如何在camel-ftp组件中定义良好的错误处理?

k2arahey  于 2023-03-23  发布在  Apache
关注(0)|答案(1)|浏览(85)

我们的一个应用程序是Camel Spring Boot应用程序。
路线相当简单:它使用camel-ftp组件从FTP服务器获取文件,并将其推送到JMS队列。
Camel版本:2.20.2 Spring-Boot是:2.7.8

@Component
@Slf4j
public class FTPListenerRoute extends RouteBuilder {

    @Value("${ems.activemq.queue.name}")
    private String activeMqTargetQueueName;

    @Value("${ems.error-uri:error}")
    private String errorUri;

    @Value("${ems.max-redeliveries}")
    private int maximumRetries;

    private final FTPConfiguration ftpConfiguration;

    @Autowired
    public FTPListenerRoute(FTPConfiguration ftpConfiguration) {
        this.ftpConfiguration = ftpConfiguration;
    }

    @Override
    public void configure() throws Exception {

        errorHandler(
                springTransactionErrorHandler()
                        .loggingLevel(LoggingLevel.DEBUG)
                        .maximumRedeliveries(0)
                        .log(log).logHandled(true));

        String uri = ftpConfiguration.buildUri();
        from(uri)
                .routeId("listener-main-route")
                //.transacted()
                .doTry()
                .to("direct:msgInTransaction")
                .doCatch(RuntimeCamelException.class)
                .log(LoggingLevel.ERROR, log, "caugt exception, rerouting to : {}", errorUri)
                .to(errorUri)
                .endDoTry();

        from("direct:msgInTransaction")
                .routeId("ftplistener-transacted-route")
                .log(LoggingLevel.INFO, "Processing ${id} with headers: ${headers}")
                .transacted()
                .to("jms:queue:" + activeMqTargetQueueName);
    }

    @Bean
    public PlatformTransactionManager jmsTransactionManager(@Qualifier("jmsConnectionFactory") ConnectionFactory connectionFactory) {
        return new JmsTransactionManager(connectionFactory);
    }
}
package be.fgov.minfin.esbsoa.ems.ftp.listener.configuration;

import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.utils.URIBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.net.URISyntaxException;
import java.util.Optional;

@Component
public class FTPConfiguration {

    // FTP Properties
    @Value("${ems.ftp.username}")
    private String ftpUsername;
    @Value("${ems.ftp.password}")
    private String ftpPassword;
    @Value("${ems.ftp.host}")
    private String ftpHost;
    @Value("${ems.ftp.port:}")
    private Optional<Integer> ftpPort;
    @Value("${ems.ftp.path}")
    private String ftpPath;
    @Value("${ems.ftp.path.error:error}")
    private String ftpErrorPath;
    @Value("${ems.ftp.path.completed:completed}")
    private String ftpCompletedPath;
    @Value("${ems.ftp.delay:30000}")
    private String ftpDelay;
    @Value("${ems.ftp.filter.file.name:}")
    private String fileNameFilter;
    @Value("${ems.ftp.deleteFile:false}")
    private boolean isFilesDeletionAfterCompletion;
    @Value("${ems.ftp.filter.file.size:50000000}")
    private int maxFileSize;
    @Value("${ems.ftp.protocol:ftp}")
    private String protocol;
    @Value("${ems.ftp.passiveMode:true}")
    private String passiveMode;

    public String buildUri() throws URISyntaxException {
        URIBuilder ftpUriBuilder = getUri(ftpPath);
        ftpUriBuilder.addParameter("moveFailed", ftpErrorPath)
                .addParameter("delay", ftpDelay)
                .addParameter("binary", "true")
                .addParameter("initialDelay", "5")
                .addParameter("filterFile", "${file:size} <= " + maxFileSize)
                .addParameter("readLock", "changed");

        if (this.isFilesDeletionAfterCompletion) {
            ftpUriBuilder.addParameter("delete", "true");
        } else {
            ftpUriBuilder.addParameter("move", ftpCompletedPath);
        }

        if (StringUtils.isNotBlank(fileNameFilter)) {
            ftpUriBuilder.addParameter("include", fileNameFilter);
        }
        return ftpUriBuilder.build().toString();
    }

    private URIBuilder getUri(String path) {
        URIBuilder uriBuilder = new URIBuilder()
                .setScheme(protocol)
                .setHost(ftpHost)
                .setUserInfo(ftpUsername, ftpPassword)
                .setPath(path)
                .addParameter("passiveMode", passiveMode);
        ftpPort.ifPresent(uriBuilder::setPort);
        return uriBuilder;
    }
}

在维护周末期间,服务器通常会重新启动,此路由失败并显示以下错误:

2023-03-14 07:50:53.596  INFO 1 --- [tra/Coda_Edepo/] ftplistener-main-route                   : Processing 3DAABB587130ED0-000000000000026E with headers: {CamelFileAbsolute=false, CamelFileAbsolutePath=Coda_Edepo/FIL.EMPCFF.679200407454.20230313.DEP, CamelFileHost=ftphost, CamelFileLastModified=1678754100000, CamelFileLength=516516, CamelFileName=FIL.EMPCFF.679200407454.20230313.DEP, CamelFileNameConsumed=FIL.EMPCFF.679200407454.20230313.DEP, CamelFileNameOnly=FIL.EMPCFF.679200407454.20230313.DEP, CamelFileParent=Coda_Edepo, CamelFilePath=Coda_Edepo//FIL.EMPCFF.679200407454.20230313.DEP, CamelFileRelativePath=FIL.EMPCFF.679200407454.20230313.DEP, CamelFtpReplyCode=226, CamelFtpReplyString=226 Transfer complete.
, CamelMessageTimestamp=1678754100000}
2023-03-14 07:50:53.617  WARN 1 --- [tra/Coda_Edepo/] o.a.c.c.file.GenericFileOnCompletion     : Error during commit. Exchange[3DAABB587130ED0-000000000000026E]. Caused by: [org.apache.camel.component.file.GenericFileOperationFailedException - Cannot rename file: RemoteFile[FIL.EMPCFF.679200407454.20230313.DEP] to: RemoteFile[/Coda_Edepo//completed/FIL.EMPCFF.679200407454.20230313.DEP]]

org.apache.camel.component.file.GenericFileOperationFailedException: Cannot rename file: RemoteFile[FIL.EMPCFF.679200407454.20230313.DEP] to: RemoteFile[/Coda_Edepo//completed/FIL.EMPCFF.679200407454.20230313.DEP]
    at org.apache.camel.component.file.strategy.GenericFileProcessStrategySupport.renameFile(GenericFileProcessStrategySupport.java:147) ~[camel-file-3.12.0.jar:3.12.0]
    at org.apache.camel.component.file.strategy.GenericFileRenameProcessStrategy.commit(GenericFileRenameProcessStrategy.java:121) ~[camel-file-3.12.0.jar:3.12.0]
    at org.apache.camel.component.file.GenericFileOnCompletion.processStrategyCommit(GenericFileOnCompletion.java:134) ~[camel-file-3.12.0.jar:3.12.0]
    at org.apache.camel.component.file.GenericFileOnCompletion.onCompletion(GenericFileOnCompletion.java:86) ~[camel-file-3.12.0.jar:3.12.0]
    at org.apache.camel.component.file.GenericFileOnCompletion.onComplete(GenericFileOnCompletion.java:60) ~[camel-file-3.12.0.jar:3.12.0]
    at org.apache.camel.support.UnitOfWorkHelper.doneSynchronization(UnitOfWorkHelper.java:99) ~[camel-support-3.12.0.jar:3.12.0]
    at org.apache.camel.support.UnitOfWorkHelper.doneSynchronizations(UnitOfWorkHelper.java:88) ~[camel-support-3.12.0.jar:3.12.0]
    at org.apache.camel.impl.engine.DefaultUnitOfWork.done(DefaultUnitOfWork.java:238) ~[camel-base-engine-3.12.0.jar:3.12.0]
    at org.apache.camel.support.UnitOfWorkHelper.doneUow(UnitOfWorkHelper.java:59) ~[camel-support-3.12.0.jar:3.12.0]
    at org.apache.camel.impl.engine.CamelInternalProcessor$UnitOfWorkProcessorAdvice.after(CamelInternalProcessor.java:775) ~[camel-base-engine-3.12.0.jar:3.12.0]
    at org.apache.camel.impl.engine.CamelInternalProcessor$UnitOfWorkProcessorAdvice.after(CamelInternalProcessor.java:710) ~[camel-base-engine-3.12.0.jar:3.12.0]
    at org.apache.camel.impl.engine.CamelInternalProcessor$AsyncAfterTask.done(CamelInternalProcessor.java:263) ~[camel-base-engine-3.12.0.jar:3.12.0]
    at org.apache.camel.AsyncCallback.run(AsyncCallback.java:44) ~[camel-api-3.12.0.jar:3.12.0]
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:179) ~[camel-base-engine-3.12.0.jar:3.12.0]
    at org.apache.camel.impl.engine.DefaultReactiveExecutor.schedule(DefaultReactiveExecutor.java:59) ~[camel-base-engine-3.12.0.jar:3.12.0]
    at org.apache.camel.impl.engine.CamelInternalProcessor$AsyncAfterTask.done(CamelInternalProcessor.java:275) ~[camel-base-engine-3.12.0.jar:3.12.0]
    at org.apache.camel.spring.spi.TransactionErrorHandler.process(TransactionErrorHandler.java:138) ~[camel-spring-3.12.0.jar:3.12.0]
    at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:399) ~[camel-base-engine-3.12.0.jar:3.12.0]
    at org.apache.camel.processor.Pipeline$PipelineTask.run(Pipeline.java:109) ~[camel-core-processor-3.12.0.jar:3.12.0]
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:179) ~[camel-base-engine-3.12.0.jar:3.12.0]
    at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:64) ~[camel-base-engine-3.12.0.jar:3.12.0]
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:184) ~[camel-core-processor-3.12.0.jar:3.12.0]
    at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:399) ~[camel-base-engine-3.12.0.jar:3.12.0]
    at org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:492) ~[camel-file-3.12.0.jar:3.12.0]
    at org.apache.camel.component.file.remote.RemoteFileConsumer.processExchange(RemoteFileConsumer.java:156) ~[camel-ftp-3.12.0.jar:3.12.0]
    at org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:245) ~[camel-file-3.12.0.jar:3.12.0]
    at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:206) ~[camel-file-3.12.0.jar:3.12.0]
    at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:191) ~[camel-support-3.12.0.jar:3.12.0]
    at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:108) ~[camel-support-3.12.0.jar:3.12.0]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.runAndReset(Unknown Source) ~[na:na]
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[na:na]
    at java.base/java.lang.Thread.run(Unknown Source) ~[na:na]

之后,文件会被camel一次又一次地重新处理,直到重命名操作成功。这会导致数据库中出现重复的大问题。
我的理解是:
1.一个文件被放到ftp服务器上,然后被Camel拾取
1.事务开始
1.文件通过JMS发送到代理并成功

  1. camel尝试“重命名”文件以将其移动到completed文件夹
    1.重命名失败并抛出GenericFileOperationFailedException
    1.文件被放回处理文件夹,并从步骤1再次重新处理。
    我使用Spock Framework和MockFtpServer库创建了一个集成测试。
    我有以下成功的快乐情景:
def 'happy scenario'() {
        given: 'one original file destination, one completed file destination'
        fileSystem.add(new FileEntry("/upload/mockFile.txt", "test"))
        fileSystem.add(new FileEntry("/upload/completed/mockFile.txt", "test"))

        and: 'start the server'
        ftpServer.start()
        camelContext.start()

        when: 'the file is actually sent to the ftp server'
        remoteFile.storeFile("src/test-integration/resources/files/mockFile.txt", "/upload/mockFile.txt")

        then: 'assert that it was'
        // assert that the file is put on the ftp
        "test" == remoteFile.readFile("/upload/mockFile.txt")

        and: 'receive the file on the other end of the route'
        def receivedMessage = jmsMessagingTemplate.receive(jmsDestinationQueue)

        then: 'assert that that it is the correct file'
        "test" == new String(receivedMessage.getPayload())
    }

在错误场景中,我正在存根RENTOftp命令以抛出GenericFileOperationFailedException

def 'failed scenario - GenericFileOperationFailedException'() {
        given: 'an original file location on the FTP'
        fileSystem.add(new FileEntry("/upload/mockFile.txt", "test"))
        fileSystem.add(new FileEntry("/upload/completed/mockFile.txt"))

        and: 'a custom ERROR code for the RNTO command'
        ftpServer.setCommandHandler(CommandNames.RNTO, new ExceptionCommandHandler())

        and: 'start the server'
        ftpServer.start()
        camelContext.start()

        when: 'the file is actually sent to the ftp server'
        remoteFile.storeFile("src/test-integration/resources/files/mockFile.txt", "/upload/mockFile.txt")

        then:'assert that it was'
        "test" == remoteFile.readFile("/upload/mockFile.txt")
        and: 'assert that message was received in the destination queue'
        def receivedFromDestinationQueue = jmsMessagingTemplate.receive(jmsQueueName)
        assertNotNull(receivedFromDestinationQueue.payload)
        and: 'try to poll DLQ, original message should be sent there'
        def dlq = StringUtils.substringAfterLast(errorUri, ":")
        def receivedFromDLQ = jmsMessagingTemplate.receive(dlq)
        assertNotNull(receivedFromDLQ)
    }

此测试未通过:由于GenericFileOperationFailedException,从应该发送消息的DLQ接收不到消息。
我轮询了destinationQueue和errorQueue,因为我注意到文件被发送到了目的队列并被处理。
我试图实现的是确保文件在所有情况下都只传递给JMS代理一次,如果在过程中有重试或错误,则应该将消息发送到DLQ。

ntjbwcob

ntjbwcob1#

您是否已经尝试在路由中定义自定义onCompletionExceptionHandler,其目的是
“处理在使用者执行提交或回滚的文件完成过程中发生的任何抛出的异常”

例如:

from("ftp://...?onCompletionExceptionHandler=#MyHandler")

如果您将文件内容放入JMS队列中,请确保所使用的文件不会太大(一些JMS代理对此感到不舒服)。

相关问题