我已经使用任务执行器设置了文件轮询器
ExecutorService executorService = Executors.newFixedThreadPool(10);
LOG.info("Setting up the poller for directory {} ", finalDirectory);
StandardIntegrationFlow standardIntegrationFlow = IntegrationFlows.from(new CustomFileReadingSource(finalDirectory),
c -> c.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS, 5)
.taskExecutor(executorService)
.maxMessagesPerPoll(10)
.advice(new LoggerSourceAdvisor(finalDirectory))
))
//move file to processing first processing
.transform(new FileMoveTransformer("C:/processing", true))
.channel("fileRouter")
.get();
正如所见,我已经设置固定threadpool
每次轮询最多 10 条消息,最多 10 条消息。如果我放入 10 个文件,它仍然会一一处理。这里可能出了什么问题?
* 更新 *
在加里回答之后,它工作得很好,尽管我现在有其他问题。
我已经像这样设置了我的轮询器
setDirectory(new File(path));
DefaultDirectoryScanner scanner = new DefaultDirectoryScanner();
scanner.setFilter(new AcceptAllFileListFilter<>());
setScanner(scanner);
使用原因AcceptAll
因为同一个文件可能会再次出现,这就是我首先移动文件的原因。但是,当我启用线程执行器时,多个线程正在处理同一个文件,我假设是因为AcceptAllFile
如果我更改为AcceptOnceFileListFilter
它可以工作,但是再次出现的相同文件将不会再次被拾取!可以采取什么措施来避免这个问题?
问题/错误
在班上AbstractPersistentAcceptOnceFileListFilter
我们有这个代码
@Override
public boolean accept(F file) {
String key = buildKey(file);
synchronized (this.monitor) {
String newValue = value(file);
String oldValue = this.store.putIfAbsent(key, newValue);
if (oldValue == null) { // not in store
flushIfNeeded();
return true;
}
// same value in store
if (!isEqual(file, oldValue) && this.store.replace(key, oldValue, newValue)) {
flushIfNeeded();
return true;
}
return false;
}
}
现在,例如,如果我设置了 max per poll 5 并且有两个文件,那么两个线程可能会选取相同的文件。
假设我的代码在读取文件后就移动了文件。
但另一个线程到达accept
method
如果文件不存在,那么它将返回lastModified时间为0并且返回true。
这会导致问题,因为该文件不存在。
如果它是 0 那么它应该返回 false,因为该文件不再存在。