1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package com.github.sworisbreathing.sfmf4j.nio2;
17
18 import com.github.sworisbreathing.sfmf4j.api.DirectoryListener;
19 import com.github.sworisbreathing.sfmf4j.api.FileMonitorService;
20 import java.io.File;
21 import java.io.IOException;
22 import java.nio.file.ClosedWatchServiceException;
23 import java.nio.file.FileSystems;
24 import java.nio.file.Path;
25 import java.nio.file.Paths;
26 import java.nio.file.StandardWatchEventKinds;
27 import java.nio.file.WatchEvent;
28 import java.nio.file.WatchEvent.Kind;
29 import java.nio.file.WatchKey;
30 import java.nio.file.WatchService;
31 import java.util.Collection;
32 import java.util.Collections;
33 import java.util.LinkedList;
34 import java.util.List;
35 import java.util.concurrent.ConcurrentHashMap;
36 import java.util.concurrent.ConcurrentMap;
37 import java.util.concurrent.ExecutorService;
38 import java.util.concurrent.Executors;
39 import java.util.concurrent.Future;
40 import java.util.concurrent.ThreadFactory;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44
45
46
47
48 public class WatchServiceFileMonitorServiceImpl implements FileMonitorService {
49
50
51
52
53 private static final Logger logger = LoggerFactory.getLogger(WatchServiceFileMonitorServiceImpl.class);
54
55
56
57
58 private Future watchFuture = null;
59
60
61
62
63 private WatchService watchService;
64
65
66
67
68
69
70
71 private static final Kind[] interested_types = new Kind[]{StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_MODIFY};
72
73
74
75
76 private ExecutorService executorService;
77
78
79
80
81
82 private volatile boolean closeWatchServiceOnShutdown = false;
83
84
85
86
87
88 private volatile boolean shutdownExecutorServiceOnShutdown = false;
89
90
91
92
93
94
95
96 ExecutorService getExecutorService() {
97 return executorService;
98 }
99
100
101
102
103
104
105 WatchService getWatchService() {
106 return watchService;
107 }
108
109
110
111
112 private final ConcurrentMap<String, WatchKey> watchKeysByPath;
113
114
115
116
117 private final ConcurrentMap<WatchKey, String> pathsByWatchKey;
118
119
120
121
122 private final ConcurrentMap<WatchKey, Collection<SFMF4JWatchListener>> listenersByWatchKey;
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137 public WatchServiceFileMonitorServiceImpl(final WatchService watchService, final ExecutorService executorService) {
138 this.watchService = watchService;
139 this.executorService = executorService;
140 this.watchKeysByPath = new ConcurrentHashMap<String, WatchKey>();
141 this.pathsByWatchKey = new ConcurrentHashMap<WatchKey, String>();
142 this.listenersByWatchKey = new ConcurrentHashMap<WatchKey, Collection<SFMF4JWatchListener>>();
143 }
144
145
146
147
148
149
150
151 private synchronized WatchKey getWatchKeyForPath(final String path) throws IOException {
152 WatchKey key = watchKeysByPath.get(path);
153 if (key == null) {
154 logger.debug("Lazy-instantiating watch key for path: {}", path);
155 key = Paths.get(path).register(watchService, interested_types);
156 watchKeysByPath.put(path, key);
157 pathsByWatchKey.put(key, path);
158 listenersByWatchKey.put(key, Collections.newSetFromMap(new ConcurrentHashMap<SFMF4JWatchListener, Boolean>()));
159 }
160 return key;
161 }
162
163 @Override
164 public void registerDirectoryListener(File directory, DirectoryListener directoryListener) {
165 String path = directory.getAbsolutePath();
166 try {
167 synchronized (this) {
168 WatchKey key = getWatchKeyForPath(path);
169 SFMF4JWatchListener listener = new SFMF4JWatchListener(directoryListener);
170 listenersByWatchKey.get(key).add(listener);
171 }
172 } catch (IOException ex) {
173 logger.error(ex.getMessage(), ex);
174 }
175 }
176
177 @Override
178 public void unregisterDirectoryListener(File directory, DirectoryListener directoryListener) {
179 String path = directory.getAbsolutePath();
180 synchronized (this) {
181 final WatchKey key = watchKeysByPath.get(path);
182 if (key != null) {
183 Collection<SFMF4JWatchListener> listeners = listenersByWatchKey.get(key);
184 listeners.remove(new SFMF4JWatchListener(directoryListener));
185 if (listeners.isEmpty()) {
186
187 cleanup(key);
188 } else {
189 logger.debug("somebody is still listening: {}", path);
190 }
191 }
192 }
193 }
194
195
196
197
198
199
200
201 private synchronized WatchEvent<Path> resolveEventWithCorrectPath(final WatchKey key, final WatchEvent<Path> event) {
202 Path correctPath = Paths.get(pathsByWatchKey.get(key));
203 return new ResolvedPathWatchEvent(event, correctPath);
204 }
205
206
207
208
209
210 @SuppressWarnings("PMD.EmptyCatchBlock")
211 private synchronized void cleanup(final WatchKey key) {
212 logger.trace("cleanUp {}", key);
213 try {
214 key.cancel();
215 }catch(Exception ex) {
216
217 }
218 Collection<SFMF4JWatchListener> listeners = listenersByWatchKey.remove(key);
219 if (listeners != null && !listeners.isEmpty()) {
220 logger.warn("Cleaning up key but listeners are still registered.");
221 }
222 String path = pathsByWatchKey.remove(key);
223 if (path != null) {
224 watchKeysByPath.remove(path);
225 }
226 }
227
228 @Override
229 public synchronized void initialize() {
230 if (watchService==null) {
231 logger.warn("No watch service was explicitly set. Setting one now.");
232 closeWatchServiceOnShutdown = true;
233 try {
234 this.watchService = FileSystems.getDefault().newWatchService();
235 }catch(IOException ex) {
236 throw new RuntimeException(ex.getLocalizedMessage(), ex);
237 }
238 }
239 if (executorService==null) {
240 logger.warn("No executor service was explicitly set. Setting one now.");
241 shutdownExecutorServiceOnShutdown = true;
242 this.executorService = Executors.newSingleThreadExecutor(new ThreadFactory(){
243
244 @Override
245 public Thread newThread(Runnable r) {
246 return new Thread(r, "NIO2FileMonitorService");
247 }
248 });
249 }
250 if (watchFuture==null || watchFuture.isCancelled() || watchFuture.isDone()) {
251 watchFuture = executorService.submit(new Runnable() {
252 @Override
253 public void run() {
254 while (true) {
255 try {
256 Collection<SFMF4JWatchListener> listeners;
257 List<WatchEvent<?>> events;
258 final WatchKey key = watchService.take();
259 synchronized (WatchServiceFileMonitorServiceImpl.this) {
260 listeners = listenersByWatchKey.get(key);
261 if (listeners != null && !listeners.isEmpty()) {
262 listeners = new LinkedList<SFMF4JWatchListener>(listeners);
263 events = key.pollEvents();
264 boolean stillValid = key.reset();
265 if (!stillValid) {
266 logger.warn("Key no longer valid.");
267 cleanup(key);
268 } else {
269 logger.debug("Key is still valid.");
270 if (events != null && !events.isEmpty()) {
271 for (WatchEvent event : events) {
272 WatchEvent<Path> resolvedEvent = resolveEventWithCorrectPath(key, event);
273 logger.debug("Event kind={} count={} path={}", new Object[]{resolvedEvent.kind().name(), resolvedEvent.count(), resolvedEvent.context()});
274 for (SFMF4JWatchListener listener : listeners) {
275 listener.onEvent(resolvedEvent);
276 }
277 }
278 }
279 }
280 } else {
281 logger.debug("No listeners found for valid key... cleaning up.");
282 cleanup(key);
283 }
284 }
285 } catch (InterruptedException ex) {
286 return;
287 } catch (ClosedWatchServiceException ex) {
288 return;
289 }
290 }
291 }
292 });
293 }
294 }
295
296 @Override
297 @SuppressWarnings("PMD.EmptyCatchBlock")
298 public synchronized void shutdown() {
299 watchFuture.cancel(true);
300 watchFuture = null;
301 for (WatchKey key : pathsByWatchKey.keySet()) {
302 cleanup(key);
303 }
304 if (shutdownExecutorServiceOnShutdown) {
305 executorService.shutdownNow();
306 executorService = null;
307 }
308 if (closeWatchServiceOnShutdown) {
309 try {
310 watchService.close();
311 }catch(IOException ex) {
312
313 }catch(ClosedWatchServiceException ex) {
314
315 }finally {
316 watchService = null;
317 }
318 }
319 }
320
321 @Override
322 public synchronized boolean isMonitoringDirectory(File directory) {
323 ExecutorService es = getExecutorService();
324 return es != null && !es.isShutdown() && watchKeysByPath.containsKey(directory.getAbsolutePath());
325 }
326 }