mirror of
https://github.com/Stirling-Tools/Stirling-PDF.git
synced 2024-11-11 02:10:11 +01:00
#1214 Only take files that are good for processing
This commit is contained in:
parent
e30665e7c8
commit
801dcdb463
@ -2,8 +2,10 @@ package stirling.software.SPDF.config;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.Properties;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingClass;
|
||||
@ -108,4 +110,28 @@ public class AppConfig {
|
||||
public boolean missingActivSecurity() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Bean(name = "watchedFoldersDir")
|
||||
public String watchedFoldersDir() {
|
||||
return "./pipeline/watchedFolders/";
|
||||
}
|
||||
|
||||
@Bean(name = "finishedFoldersDir")
|
||||
public String finishedFoldersDir() {
|
||||
return "./pipeline/finishedFolders/";
|
||||
}
|
||||
|
||||
@Bean(name = "directoryFilter")
|
||||
public Predicate<Path> processPDFOnlyFilter() {
|
||||
return path -> {
|
||||
if (Files.isDirectory(path)) {
|
||||
return !path.toString()
|
||||
.contains(
|
||||
"processing");
|
||||
} else {
|
||||
String fileName = path.getFileName().toString();
|
||||
return fileName.endsWith(".pdf");
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
@ -19,6 +19,7 @@ import java.util.stream.Stream;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.core.io.ByteArrayResource;
|
||||
import org.springframework.core.io.Resource;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
@ -28,6 +29,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import stirling.software.SPDF.model.PipelineConfig;
|
||||
import stirling.software.SPDF.model.PipelineOperation;
|
||||
import stirling.software.SPDF.utils.FileMonitor;
|
||||
|
||||
@Service
|
||||
public class PipelineDirectoryProcessor {
|
||||
@ -35,11 +37,18 @@ public class PipelineDirectoryProcessor {
|
||||
private static final Logger logger = LoggerFactory.getLogger(PipelineDirectoryProcessor.class);
|
||||
@Autowired private ObjectMapper objectMapper;
|
||||
@Autowired private ApiDocService apiDocService;
|
||||
|
||||
final String watchedFoldersDir = "./pipeline/watchedFolders/";
|
||||
final String finishedFoldersDir = "./pipeline/finishedFolders/";
|
||||
|
||||
@Autowired PipelineProcessor processor;
|
||||
@Autowired FileMonitor fileMonitor;
|
||||
|
||||
final String watchedFoldersDir;
|
||||
final String finishedFoldersDir;
|
||||
|
||||
/**
 * Creates the processor from the two directory paths supplied by the container.
 *
 * @param watchedFoldersDir value of the bean qualified as {@code watchedFoldersDir}
 * @param finishedFoldersDir value of the bean qualified as {@code finishedFoldersDir}
 */
public PipelineDirectoryProcessor(
        @Qualifier("watchedFoldersDir") String watchedFoldersDir,
        @Qualifier("finishedFoldersDir") String finishedFoldersDir) {
    this.finishedFoldersDir = finishedFoldersDir;
    this.watchedFoldersDir = watchedFoldersDir;
}
|
||||
|
||||
@Scheduled(fixedRate = 60000)
|
||||
public void scanFolders() {
|
||||
@ -130,7 +139,7 @@ public class PipelineDirectoryProcessor {
|
||||
throws IOException {
|
||||
try (Stream<Path> paths = Files.list(dir)) {
|
||||
if ("automated".equals(operation.getParameters().get("fileInput"))) {
|
||||
return paths.filter(path -> !Files.isDirectory(path) && !path.equals(jsonFile))
|
||||
return paths.filter(path -> !Files.isDirectory(path) && !path.equals(jsonFile) && fileMonitor.isFileReadyForProcessing(path))
|
||||
.map(Path::toFile)
|
||||
.toArray(File[]::new);
|
||||
} else {
|
||||
|
162
src/main/java/stirling/software/SPDF/utils/FileMonitor.java
Normal file
162
src/main/java/stirling/software/SPDF/utils/FileMonitor.java
Normal file
@ -0,0 +1,162 @@
|
||||
package stirling.software.SPDF.utils;
|
||||
|
||||
import static java.nio.file.StandardWatchEventKinds.*;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.*;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
public class FileMonitor {
|
||||
private static final Logger logger = LoggerFactory.getLogger(FileMonitor.class);
|
||||
private final Map<Path, WatchKey> path2KeyMapping;
|
||||
private final Set<Path> newlyDiscoveredFiles;
|
||||
private final ConcurrentHashMap.KeySetView<Path, Boolean> readyForProcessingFiles;
|
||||
private final WatchService watchService;
|
||||
private final Predicate<Path> pathFilter;
|
||||
private Set<Path> stagingFiles;
|
||||
|
||||
/**
|
||||
* @param rootDirectory the root directory to monitor
|
||||
* @param pathFilter the filter to apply to the paths, return true if the path should be
|
||||
* monitored, false otherwise
|
||||
* @throws IOException
|
||||
*/
|
||||
@Autowired
|
||||
public FileMonitor(
|
||||
@Qualifier("watchedFoldersDir") String rootDirectory,
|
||||
@Qualifier("directoryFilter") Predicate<Path> pathFilter)
|
||||
throws IOException {
|
||||
this.newlyDiscoveredFiles = new HashSet<>();
|
||||
this.path2KeyMapping = new HashMap<>();
|
||||
this.stagingFiles = new HashSet<>();
|
||||
this.pathFilter = pathFilter;
|
||||
this.readyForProcessingFiles = ConcurrentHashMap.newKeySet();
|
||||
this.watchService = FileSystems.getDefault().newWatchService();
|
||||
|
||||
Path path = Path.of(rootDirectory);
|
||||
recursivelyRegisterEntry(path);
|
||||
|
||||
logger.info("Created a new file tracker for directory: {}", rootDirectory);
|
||||
}
|
||||
|
||||
private boolean shouldNotProcess(Path path) {
|
||||
return !pathFilter.test(path);
|
||||
}
|
||||
|
||||
private void recursivelyRegisterEntry(Path dir) throws IOException {
|
||||
WatchKey key = dir.register(watchService, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
|
||||
path2KeyMapping.put(dir, key);
|
||||
logger.info("Registered directory: {}", dir);
|
||||
|
||||
try (Stream<Path> directoryVisitor = Files.walk(dir, 1)) {
|
||||
final Iterator<Path> iterator = directoryVisitor.iterator();
|
||||
while (iterator.hasNext()) {
|
||||
Path path = iterator.next();
|
||||
if (path.equals(dir) || shouldNotProcess(path)) continue;
|
||||
|
||||
if (Files.isDirectory(path)) {
|
||||
recursivelyRegisterEntry(path);
|
||||
} else if (Files.isRegularFile(path)) {
|
||||
handleFileCreation(path);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Scheduled(fixedRate = 5000)
|
||||
public void trackFiles() {
|
||||
/*
|
||||
All files observed changes in the last iteration will be considered as staging files.
|
||||
If those files are not modified in current iteration, they will be considered as ready for processing.
|
||||
*/
|
||||
stagingFiles = new HashSet<>(newlyDiscoveredFiles);
|
||||
readyForProcessingFiles.clear();
|
||||
WatchKey key;
|
||||
while ((key = watchService.poll()) != null) {
|
||||
final Path watchingDir = (Path) key.watchable();
|
||||
key.pollEvents()
|
||||
.forEach(
|
||||
(evt) -> {
|
||||
final Path path = (Path) evt.context();
|
||||
final WatchEvent.Kind<?> kind = evt.kind();
|
||||
if (shouldNotProcess(path)) return;
|
||||
|
||||
try {
|
||||
if (Files.isDirectory(path)) {
|
||||
if (kind == ENTRY_CREATE) {
|
||||
handleDirectoryCreation(path);
|
||||
}
|
||||
/*
|
||||
we don't need to handle directory deletion or modification
|
||||
- directory deletion will be handled by key.reset()
|
||||
- directory modification indicates a new file creation or deletion, which is handled by below
|
||||
*/
|
||||
}
|
||||
Path relativePathFromRoot = watchingDir.resolve(path);
|
||||
if (kind == ENTRY_CREATE) {
|
||||
handleFileCreation(relativePathFromRoot);
|
||||
} else if (kind == ENTRY_DELETE) {
|
||||
handleFileRemoval(relativePathFromRoot);
|
||||
} else if (kind == ENTRY_MODIFY) {
|
||||
handleFileModification(relativePathFromRoot);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("Error while processing file: {}", path, e);
|
||||
}
|
||||
});
|
||||
|
||||
boolean isKeyValid = key.reset();
|
||||
if (!isKeyValid) { // key is invalid when the directory itself is no longer exists
|
||||
path2KeyMapping.remove((Path) key.watchable());
|
||||
if (path2KeyMapping.isEmpty()) {
|
||||
logger.warn(
|
||||
"FileMonitor is not monitoring any directory, no even the root directory.");
|
||||
}
|
||||
}
|
||||
}
|
||||
readyForProcessingFiles.addAll(stagingFiles);
|
||||
}
|
||||
|
||||
private void handleDirectoryCreation(Path dir) throws IOException {
|
||||
WatchKey key = dir.register(watchService, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
|
||||
path2KeyMapping.put(dir, key);
|
||||
}
|
||||
|
||||
private void handleFileRemoval(Path path) {
|
||||
newlyDiscoveredFiles.remove(path);
|
||||
stagingFiles.remove(path);
|
||||
}
|
||||
|
||||
private void handleFileCreation(Path path) {
|
||||
newlyDiscoveredFiles.add(path);
|
||||
stagingFiles.remove(path);
|
||||
}
|
||||
|
||||
private void handleFileModification(Path path) {
|
||||
// the logic is the same
|
||||
handleFileCreation(path);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the file is ready for processing.
|
||||
*
|
||||
* <p>A file is ready for processing if it is not being modified for 5000ms.
|
||||
*
|
||||
* @param path the path of the file
|
||||
* @return true if the file is ready for processing, false otherwise
|
||||
*/
|
||||
public boolean isFileReadyForProcessing(Path path) {
|
||||
return readyForProcessingFiles.contains(path);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user