diff --git a/src/main/java/stirling/software/SPDF/config/AppConfig.java b/src/main/java/stirling/software/SPDF/config/AppConfig.java index 16618e1e..3723e4f8 100644 --- a/src/main/java/stirling/software/SPDF/config/AppConfig.java +++ b/src/main/java/stirling/software/SPDF/config/AppConfig.java @@ -2,8 +2,10 @@ package stirling.software.SPDF.config; import java.io.IOException; import java.nio.file.Files; +import java.nio.file.Path; import java.nio.file.Paths; import java.util.Properties; +import java.util.function.Predicate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingClass; @@ -108,4 +110,28 @@ public class AppConfig { public boolean missingActivSecurity() { return false; } + + @Bean(name = "watchedFoldersDir") + public String watchedFoldersDir() { + return "./pipeline/watchedFolders/"; + } + + @Bean(name = "finishedFoldersDir") + public String finishedFoldersDir() { + return "./pipeline/finishedFolders/"; + } + + @Bean(name = "directoryFilter") + public Predicate processPDFOnlyFilter() { + return path -> { + if (Files.isDirectory(path)) { + return !path.toString() + .contains( + "processing"); + } else { + String fileName = path.getFileName().toString(); + return fileName.endsWith(".pdf"); + } + }; + } } diff --git a/src/main/java/stirling/software/SPDF/controller/api/pipeline/PipelineDirectoryProcessor.java b/src/main/java/stirling/software/SPDF/controller/api/pipeline/PipelineDirectoryProcessor.java index c61b29e9..ce7e1b94 100644 --- a/src/main/java/stirling/software/SPDF/controller/api/pipeline/PipelineDirectoryProcessor.java +++ b/src/main/java/stirling/software/SPDF/controller/api/pipeline/PipelineDirectoryProcessor.java @@ -19,6 +19,7 @@ import java.util.stream.Stream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.core.io.ByteArrayResource; import org.springframework.core.io.Resource; import org.springframework.scheduling.annotation.Scheduled; @@ -28,6 +29,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import stirling.software.SPDF.model.PipelineConfig; import stirling.software.SPDF.model.PipelineOperation; +import stirling.software.SPDF.utils.FileMonitor; @Service public class PipelineDirectoryProcessor { @@ -35,11 +37,18 @@ public class PipelineDirectoryProcessor { private static final Logger logger = LoggerFactory.getLogger(PipelineDirectoryProcessor.class); @Autowired private ObjectMapper objectMapper; @Autowired private ApiDocService apiDocService; - - final String watchedFoldersDir = "./pipeline/watchedFolders/"; - final String finishedFoldersDir = "./pipeline/finishedFolders/"; - @Autowired PipelineProcessor processor; + @Autowired FileMonitor fileMonitor; + + final String watchedFoldersDir; + final String finishedFoldersDir; + + public PipelineDirectoryProcessor( + @Qualifier("watchedFoldersDir") String watchedFoldersDir, + @Qualifier("finishedFoldersDir") String finishedFoldersDir) { + this.watchedFoldersDir = watchedFoldersDir; + this.finishedFoldersDir = finishedFoldersDir; + } @Scheduled(fixedRate = 60000) public void scanFolders() { @@ -130,7 +139,7 @@ public class PipelineDirectoryProcessor { throws IOException { try (Stream paths = Files.list(dir)) { if ("automated".equals(operation.getParameters().get("fileInput"))) { - return paths.filter(path -> !Files.isDirectory(path) && !path.equals(jsonFile)) + return paths.filter(path -> !Files.isDirectory(path) && !path.equals(jsonFile) && fileMonitor.isFileReadyForProcessing(path)) .map(Path::toFile) .toArray(File[]::new); } else { diff --git a/src/main/java/stirling/software/SPDF/utils/FileMonitor.java b/src/main/java/stirling/software/SPDF/utils/FileMonitor.java new file mode 100644 index 00000000..c11352ef --- /dev/null +++ b/src/main/java/stirling/software/SPDF/utils/FileMonitor.java @@ -0,0 +1,167 @@ +package stirling.software.SPDF.utils; + +import static java.nio.file.StandardWatchEventKinds.*; + +import java.io.IOException; +import java.nio.file.*; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Predicate; +import java.util.stream.Stream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +@Component +public class FileMonitor { + private static final Logger logger = LoggerFactory.getLogger(FileMonitor.class); + private final Map path2KeyMapping; + private final Set newlyDiscoveredFiles; + private final ConcurrentHashMap.KeySetView readyForProcessingFiles; + private final WatchService watchService; + private final Predicate pathFilter; + private final Path rootDir; + private Set stagingFiles; + + /** + * @param rootDirectory the root directory to monitor + * @param pathFilter the filter to apply to the paths, return true if the path should be monitored, false otherwise + */ + @Autowired + public FileMonitor( + @Qualifier("watchedFoldersDir") String rootDirectory, + @Qualifier("directoryFilter") Predicate pathFilter) + throws IOException { + this.newlyDiscoveredFiles = new HashSet<>(); + this.path2KeyMapping = new HashMap<>(); + this.stagingFiles = new HashSet<>(); + this.pathFilter = pathFilter; + this.readyForProcessingFiles = ConcurrentHashMap.newKeySet(); + this.watchService = FileSystems.getDefault().newWatchService(); + this.rootDir = Path.of(rootDirectory); + } + + private boolean shouldNotProcess(Path path) { + return !pathFilter.test(path); + } + + private void recursivelyRegisterEntry(Path dir) throws IOException { + WatchKey key = dir.register(watchService, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY); + path2KeyMapping.put(dir, key); + logger.info("Registered directory: {}", dir); + + try (Stream directoryVisitor = Files.walk(dir, 1)) { + final Iterator iterator = directoryVisitor.iterator(); + while (iterator.hasNext()) { + Path path = iterator.next(); + if (path.equals(dir) || shouldNotProcess(path)) continue; + + if (Files.isDirectory(path)) { + recursivelyRegisterEntry(path); + } else if (Files.isRegularFile(path)) { + handleFileCreation(path); + } + } + } + } + + @Scheduled(fixedRate = 5000) + public void trackFiles() { + /* + All files observed changes in the last iteration will be considered as staging files. + If those files are not modified in current iteration, they will be considered as ready for processing. + */ + stagingFiles = new HashSet<>(newlyDiscoveredFiles); + readyForProcessingFiles.clear(); + + if (path2KeyMapping.isEmpty()) { + logger.warn( + "not monitoring any directory, even the root directory itself: {}", rootDir); + if (Files.exists( + rootDir)) { // if the root directory exists, re-register the root directory + try { + recursivelyRegisterEntry(rootDir); + } catch (IOException e) { + logger.error("unable to register monitoring", e); + } + } + } + + WatchKey key; + while ((key = watchService.poll()) != null) { + final Path watchingDir = (Path) key.watchable(); + key.pollEvents() + .forEach( + (evt) -> { + final Path path = (Path) evt.context(); + final WatchEvent.Kind kind = evt.kind(); + if (shouldNotProcess(path)) return; + + try { + if (Files.isDirectory(path)) { + if (kind == ENTRY_CREATE) { + handleDirectoryCreation(path); + } + /* + we don't need to handle directory deletion or modification + - directory deletion will be handled by key.reset() + - directory modification indicates a new file creation or deletion, which is handled by below + */ + } + Path relativePathFromRoot = watchingDir.resolve(path); + if (kind == ENTRY_CREATE) { + handleFileCreation(relativePathFromRoot); + } else if (kind == ENTRY_DELETE) { + handleFileRemoval(relativePathFromRoot); + } else if (kind == ENTRY_MODIFY) { + handleFileModification(relativePathFromRoot); + } + } catch (Exception e) { + logger.error("Error while processing file: {}", path, e); + } + }); + + boolean isKeyValid = key.reset(); + if (!isKeyValid) { // key is invalid when the directory itself is no longer exists + path2KeyMapping.remove((Path) key.watchable()); + } + } + readyForProcessingFiles.addAll(stagingFiles); + } + + private void handleDirectoryCreation(Path dir) throws IOException { + WatchKey key = dir.register(watchService, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY); + path2KeyMapping.put(dir, key); + } + + private void handleFileRemoval(Path path) { + newlyDiscoveredFiles.remove(path); + stagingFiles.remove(path); + } + + private void handleFileCreation(Path path) { + newlyDiscoveredFiles.add(path); + stagingFiles.remove(path); + } + + private void handleFileModification(Path path) { + // the logic is the same + handleFileCreation(path); + } + + /** + * Check if the file is ready for processing. + * + *

A file is ready for processing if it is not being modified for 5000ms. + * + * @param path the path of the file + * @return true if the file is ready for processing, false otherwise + */ + public boolean isFileReadyForProcessing(Path path) { + return readyForProcessingFiles.contains(path); + } +}