View Javadoc

1   /*
2    * Copyright 2012-2013 Steven Swor.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package com.github.sworisbreathing.sfmf4j.nio2;
17  
18  import com.github.sworisbreathing.sfmf4j.api.DirectoryListener;
19  import com.github.sworisbreathing.sfmf4j.api.FileMonitorService;
20  import java.io.File;
21  import java.io.IOException;
22  import java.nio.file.ClosedWatchServiceException;
23  import java.nio.file.FileSystems;
24  import java.nio.file.Path;
25  import java.nio.file.Paths;
26  import java.nio.file.StandardWatchEventKinds;
27  import java.nio.file.WatchEvent;
28  import java.nio.file.WatchEvent.Kind;
29  import java.nio.file.WatchKey;
30  import java.nio.file.WatchService;
31  import java.util.Collection;
32  import java.util.Collections;
33  import java.util.LinkedList;
34  import java.util.List;
35  import java.util.concurrent.ConcurrentHashMap;
36  import java.util.concurrent.ConcurrentMap;
37  import java.util.concurrent.ExecutorService;
38  import java.util.concurrent.Executors;
39  import java.util.concurrent.Future;
40  import java.util.concurrent.ThreadFactory;
41  import org.slf4j.Logger;
42  import org.slf4j.LoggerFactory;
43  
44  /**
45   * jpathwatch implementation of a file monitor service.
46   * @author Steven Swor
47   */
48  public class WatchServiceFileMonitorServiceImpl implements FileMonitorService {
49  
50      /**
51       * The logger.
52       */
53      private static final Logger logger = LoggerFactory.getLogger(WatchServiceFileMonitorServiceImpl.class);
54  
55      /**
56       * Future for the asynchronous task which polls the watch service.
57       */
58      private Future watchFuture = null;
59  
60      /**
61       * The watch service.
62       */
63      private WatchService watchService;
64  
65      /**
66       * The watch kinds we are interested in.
67       * @see StandardWatchEventKind#ENTRY_CREATE
68       * @see StandardWatchEventKind#ENTRY_DELETE
69       * @see StandardWatchEventKind#ENTRY_MODIFY
70       */
71      private static final Kind[] interested_types = new Kind[]{StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_MODIFY};
72  
73      /**
74       * The executor service which runs the task of polling the watch service.
75       */
76      private ExecutorService executorService;
77  
78      /**
79       * Flag to indicate that the watch service should be stopped during
80       * shutdown.
81       */
82      private volatile boolean closeWatchServiceOnShutdown = false;
83  
84      /**
85       * Flag to indiate that the executor service should be stopped during
86       * shutdown.
87       */
88      private volatile boolean shutdownExecutorServiceOnShutdown = false;
89  
90  
91      /**
92       * Package-protected getter (for automated testing).
93       *
94       * @return
95       */
96      ExecutorService getExecutorService() {
97          return executorService;
98      }
99  
100     /**
101      * Package-protected getter (for automated testing).
102      *
103      * @return
104      */
105     WatchService getWatchService() {
106         return watchService;
107     }
108 
109     /**
110      * The registered watch keys for each path.
111      */
112     private final ConcurrentMap<String, WatchKey> watchKeysByPath;
113 
114     /**
115      * The registered paths for each watch key.
116      */
117     private final ConcurrentMap<WatchKey, String> pathsByWatchKey;
118 
119     /**
120      * The listeners for each watch key.
121      */
122     private final ConcurrentMap<WatchKey, Collection<SFMF4JWatchListener>> listenersByWatchKey;
123 
124     /**
125      * Creates a new WatchServiceFileMonitorServiceImpl.
126      * @param watchService the watch service to use.  When this argument is
127      * {@code null}, the watch service will be created during {@link
128      * #initialize()} and stopped during {@link #shutdown()}.  If this argument
129      * is not {@code null}, then it is assumed that the watch service is running
130      * and will be closed elsewhere (for example, an IoC container)
131      * @param executorService the executor service to use.  When this argument
132      * is {@code null}, the executor service will be created during {@link
133      * #initialize()} and stopped during {@link #shutdown()}.  If this argument
134      * is not {@code null}, then it is assumed that the executor service is
135      * running and will be closed elsewhere (for example, in an IoC container)
136      */
137     public WatchServiceFileMonitorServiceImpl(final WatchService watchService, final ExecutorService executorService) {
138         this.watchService = watchService;
139         this.executorService = executorService;
140         this.watchKeysByPath = new ConcurrentHashMap<String, WatchKey>();
141         this.pathsByWatchKey = new ConcurrentHashMap<WatchKey, String>();
142         this.listenersByWatchKey = new ConcurrentHashMap<WatchKey, Collection<SFMF4JWatchListener>>();
143     }
144 
145     /**
146      * Gets the watch key for a path with lazy initialization.
147      * @param path the path
148      * @return a registered watch key for the path
149      * @throws IOException if lazy initialization fails
150      */
151     private synchronized WatchKey getWatchKeyForPath(final String path) throws IOException {
152         WatchKey key = watchKeysByPath.get(path);
153         if (key == null) {
154             logger.debug("Lazy-instantiating watch key for path: {}", path);
155             key = Paths.get(path).register(watchService, interested_types);
156             watchKeysByPath.put(path, key);
157             pathsByWatchKey.put(key, path);
158             listenersByWatchKey.put(key, Collections.newSetFromMap(new ConcurrentHashMap<SFMF4JWatchListener, Boolean>()));
159         }
160         return key;
161     }
162 
163     @Override
164     public void registerDirectoryListener(File directory, DirectoryListener directoryListener) {
165         String path = directory.getAbsolutePath();
166         try {
167             synchronized (this) {
168                 WatchKey key = getWatchKeyForPath(path);
169                 SFMF4JWatchListener listener = new SFMF4JWatchListener(directoryListener);
170                 listenersByWatchKey.get(key).add(listener);
171             }
172         } catch (IOException ex) {
173             logger.error(ex.getMessage(), ex);
174         }
175     }
176 
177     @Override
178     public void unregisterDirectoryListener(File directory, DirectoryListener directoryListener) {
179         String path = directory.getAbsolutePath();
180         synchronized (this) {
181             final WatchKey key = watchKeysByPath.get(path);
182             if (key != null) {
183                 Collection<SFMF4JWatchListener> listeners = listenersByWatchKey.get(key);
184                 listeners.remove(new SFMF4JWatchListener(directoryListener)); // note the equals implementation
185                 if (listeners.isEmpty()) {
186                     //no longer listening on that path.
187                     cleanup(key);
188                 } else {
189                     logger.debug("somebody is still listening: {}", path);
190                 }
191             }
192         }
193     }
194 
195     /**
196      * Resolves a watch event with its absolute path.
197      * @param key the watch key (used to look up the parent path)
198      * @param event the event to resolve
199      * @return a copy of the event, with a resolved path
200      */
201     private synchronized WatchEvent<Path> resolveEventWithCorrectPath(final WatchKey key, final WatchEvent<Path> event) {
202         Path correctPath = Paths.get(pathsByWatchKey.get(key));
203         return new ResolvedPathWatchEvent(event, correctPath);
204     }
205 
206     /**
207      * Properly unregisters and removes a watch key.
208      * @param key the watch key
209      */
210      @SuppressWarnings("PMD.EmptyCatchBlock")
211     private synchronized void cleanup(final WatchKey key) {
212         logger.trace("cleanUp {}", key);
213         try {
214             key.cancel();
215         }catch(Exception ex) {
216 
217         }
218         Collection<SFMF4JWatchListener> listeners = listenersByWatchKey.remove(key);
219         if (listeners != null && !listeners.isEmpty()) {
220             logger.warn("Cleaning up key but listeners are still registered.");
221         }
222         String path = pathsByWatchKey.remove(key);
223         if (path != null) {
224             watchKeysByPath.remove(path);
225         }
226     }
227 
228     @Override
229     public synchronized void initialize() {
230         if (watchService==null) {
231             logger.warn("No watch service was explicitly set.  Setting one now.");
232             closeWatchServiceOnShutdown = true;
233             try {
234                 this.watchService = FileSystems.getDefault().newWatchService();
235             }catch(IOException ex) {
236                 throw new RuntimeException(ex.getLocalizedMessage(), ex);
237             }
238         }
239         if (executorService==null) {
240             logger.warn("No executor service was explicitly set.  Setting one now.");
241             shutdownExecutorServiceOnShutdown = true;
242             this.executorService = Executors.newSingleThreadExecutor(new ThreadFactory(){
243 
244                 @Override
245                 public Thread newThread(Runnable r) {
246                     return new Thread(r, "NIO2FileMonitorService");
247                 }
248             });
249         }
250         if (watchFuture==null || watchFuture.isCancelled() || watchFuture.isDone()) {
251             watchFuture = executorService.submit(new Runnable() {
252                 @Override
253                 public void run() {
254                     while (true) {
255                         try {
256                             Collection<SFMF4JWatchListener> listeners;
257                             List<WatchEvent<?>> events;
258                             final WatchKey key = watchService.take();
259                             synchronized (WatchServiceFileMonitorServiceImpl.this) {
260                                 listeners = listenersByWatchKey.get(key);
261                                 if (listeners != null && !listeners.isEmpty()) {
262                                     listeners = new LinkedList<SFMF4JWatchListener>(listeners);
263                                     events = key.pollEvents();
264                                     boolean stillValid = key.reset();
265                                     if (!stillValid) {
266                                         logger.warn("Key no longer valid.");
267                                         cleanup(key);
268                                     } else {
269                                         logger.debug("Key is still valid.");
270                                         if (events != null && !events.isEmpty()) {
271                                             for (WatchEvent event : events) {
272                                                 WatchEvent<Path> resolvedEvent = resolveEventWithCorrectPath(key, event);
273                                                 logger.debug("Event kind={} count={} path={}", new Object[]{resolvedEvent.kind().name(), resolvedEvent.count(), resolvedEvent.context()});
274                                                 for (SFMF4JWatchListener listener : listeners) {
275                                                     listener.onEvent(resolvedEvent);
276                                                 }
277                                             }
278                                         }
279                                     }
280                                 } else {
281                                     logger.debug("No listeners found for valid key... cleaning up.");
282                                     cleanup(key);
283                                 }
284                             }
285                         } catch (InterruptedException ex) {
286                             return;
287                         } catch (ClosedWatchServiceException ex) {
288                             return;
289                         }
290                     }
291                 }
292             });
293         }
294     }
295 
296     @Override
297     @SuppressWarnings("PMD.EmptyCatchBlock")
298     public synchronized void shutdown() {
299         watchFuture.cancel(true);
300         watchFuture = null;
301         for (WatchKey key : pathsByWatchKey.keySet()) {
302             cleanup(key);
303         }
304         if (shutdownExecutorServiceOnShutdown) {
305             executorService.shutdownNow();
306             executorService = null;
307         }
308         if (closeWatchServiceOnShutdown) {
309             try {
310                 watchService.close();
311             }catch(IOException ex) {
312                 //trap
313             }catch(ClosedWatchServiceException ex) {
314                 //trap
315             }finally {
316                 watchService = null;
317             }
318         }
319     }
320 
321     @Override
322     public synchronized boolean isMonitoringDirectory(File directory) {
323         ExecutorService es = getExecutorService();
324         return es != null && !es.isShutdown() && watchKeysByPath.containsKey(directory.getAbsolutePath());
325     }
326 }