Apache Camel 3.14.9:CLIENT_ACKNOWLEDGE模式下处理器内部的异常处理

x7rlezfr  于 9个月前  发布在  Apache
关注(0)|答案(1)|浏览(68)

我有一个路由器,使用聚合器->拆分器->重排序器EIP。当重新排序器批量将交换一个接一个地发送到处理器时,在我有一个交换异常之后,我必须将其余的消息发送到不同的队列。有可能吗?
任何示例代码块将是一个很大的帮助在这里。
我正在使用Sping Boot 与Apache Camel和Java DSL。
//在Julian的评论之后编辑
1.需要CLIENT_ACKNOWLEGEMENT,因为在处理聚合批处理期间,路由可能会崩溃。
1.你能解释一下“整批”和“只是一批”的区别吗?
1.这里是路由器实现from("inputQueue?concurrentConsumers=5&acknowledgementModeName=CLIENT_ACKNOWLEDGE") //CLIENT_ACKNOWLEDGE set .routeId("Consumer") .aggregate(header("AGG_ID"), new AggregationStrategy(){ // 1.Aggregating @Override public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { oldExchange.getIn().setBody(oldBody + SEPARATOR + newBody); return oldExchange; } }) .completionTimeout(5000L) .completionSize(5) //2. Aggregate for 5 messages or wait till 5 seconds .split(simple("${body}")).delimiter("|~~|").parallelProcessing(false) // 3. Splitting .resequence(xpath("//*[local-name()='TimeStamp']")) // 4. resequencing based on TimeStamp in Message .batch(new BatchResequencerConfig(5, 5000L)).allowDuplicates() //5. Aggregate and resequence batch size are same .process(new Processor() { @Override public void process(Exchange exchange) throws Exception { //processing logic :: external service call // 6. Exception conneting to external system, I must reject others messages inside the current batch and //send send them to another queue. //There will be onExeption().handle(true) defined inside the router, so that client acknowledgement send and broker removed the message from queue. } }) .to("outputQueue");所以主要意图是停止处理一批5条消息的下一条消息,当任何消息都有异常时。如果路由崩溃,则所有消息必须在输入队列中可用。

cclgggtu

cclgggtu1#

如果我很好地理解了您的需求,您希望您的resequencer将交换定向到某个Processor,但如果发生异常,您希望将消息传递到不同的端点。
如果不确切地知道您的需求是什么,就很难给予代码示例。rest of the messages的定义是什么?你指的是整批还是整批?
无论如何,解决这个问题的方法是相同的,但根据您的确切需求会有不同的口味。你可以实现一个singleton来存储路径,通常是direct:default,但当发生错误时,将路径切换到direct:error。在resequencer的出口处有一个处理器,它将根据存储在单例中的当前路径路由消息。枚举是一个很好的单例实现,所以这里有一种方法:

public Enum Destination {
    PATH;

    @Getter
    private String value = "direct:default";

    public void switchToError() {
       value = "direct:error"; 
    }
    
    public void switchToDefault() {
        value = "direct:default";
    }
}

所以,在你的主要路线,你会有这样的东西:

resequence(body()).batch()
.process(exchange -> exchange.getIn().setHeader("next", Destination.PATH.getValue()))
.routingSlip(header("next"));

然后在错误处理部分将路径设置为direct:error

onException(Exception.class)
.process(exchange -> Destination.PATH.switchToError())
.handled(true)
.to("direct:error");

当然,当一个新的聚合完成时,您可以swithToDefault
希望它能帮助即使我不清楚在所有什么CLIENT_ACKNOWLEDGE意味着从你的问题描述。

相关问题