ApacheCamel -知道流处理何时完成吗?

svmlkihl  于 2023-03-08  发布在  Apache
关注(0)|答案(1)|浏览(120)

我在dependencie camel-spring-boot-dependencies 3.10.0和3.11.0版本之间遇到问题。
对于此代码:
一个一个
版本3.10.0的输出为:

c.e.demo.IndexeurAggregateBehavior       : Started IndexeurAggregateBehavior in 7.346 seconds (JVM running for 8.148)
quartz                                   : Quartz lance
route_enrich_A                           : Exchange[ExchangePattern: InOut, BodyType: java.util.ArrayList, Body: 1,5]
route_enrich_A                           : Exchange[ExchangePattern: InOut, BodyType: java.util.ArrayList, Body: 6,7]
route_enrich_A                           : Exchange[ExchangePattern: InOut, BodyType: java.util.ArrayList, Body: 0,2]
route_enrich_A                           : Exchange[ExchangePattern: InOut, BodyType: java.util.ArrayList, Body: 8,9]
route_enrich_A                           : Exchange[ExchangePattern: InOut, BodyType: java.util.ArrayList, Body: 4,3]
route_enrich_B                           : Exchange[ExchangePattern: InOut, BodyType: java.util.ArrayList, Body: 0,2]
route_enrich_B                           : Exchange[ExchangePattern: InOut, BodyType: java.util.ArrayList, Body: 1,5]
route_enrich_B                           : Exchange[ExchangePattern: InOut, BodyType: java.util.ArrayList, Body: 6,7]
route_enrich_B                           : Exchange[ExchangePattern: InOut, BodyType: java.util.ArrayList, Body: 4,3]
route_enrich_B                           : Exchange[ExchangePattern: InOut, BodyType: java.util.ArrayList, Body: 8,9]
route_stage                              : This is the end

对于版本3.11.0为:

c.e.demo.IndexeurAggregateBehavior       : Started IndexeurAggregateBehavior in 5.455 seconds (JVM running for 6.141)
quartz                                   : Quartz lance
route_stage                              : This is the end
route_enrich_A                           : Exchange[ExchangePattern: InOut, BodyType: java.util.ArrayList, Body: 2,1]
route_enrich_B                           : Exchange[ExchangePattern: InOut, BodyType: java.util.ArrayList, Body: 2,1]
route_enrich_A                           : Exchange[ExchangePattern: InOut, BodyType: java.util.ArrayList, Body: 5,3]
route_enrich_B                           : Exchange[ExchangePattern: InOut, BodyType: java.util.ArrayList, Body: 5,3]
route_enrich_A                           : Exchange[ExchangePattern: InOut, BodyType: java.util.ArrayList, Body: 0,7]
route_enrich_B                           : Exchange[ExchangePattern: InOut, BodyType: java.util.ArrayList, Body: 0,7]
route_enrich_A                           : Exchange[ExchangePattern: InOut, BodyType: java.util.ArrayList, Body: 4,6]
route_enrich_B                           : Exchange[ExchangePattern: InOut, BodyType: java.util.ArrayList, Body: 4,6]
route_enrich_A                           : Exchange[ExchangePattern: InOut, BodyType: java.util.ArrayList, Body: 8,9]
route_enrich_B                           : Exchange[ExchangePattern: InOut, BodyType: java.util.ArrayList, Body: 8,9]

对于版本3.11.0,处理顺序不同。
我的问题是我想知道什么时候最后一个项目是完成。当日志消息This is the end在版本3.10.0是附加。我搜索文档,但我没有找到任何帮助我。
谢谢你的帮助

b09cbbtk

b09cbbtk1#

您可以尝试以下几个选项:

  • 您可以尝试定义This is the end日志语句或任何代码,只要路由被视为通过.onCompletion() DSL的交换完成
  • 编写您自己的route policy,按照文档中的说明将其添加到路由中。该策略作用于Camel的onExchangeDone(Route route, Exchange exchange)方法调用。如果您希望在每个路由上运行该策略,也可以使用自定义RoutePolicyFactory
  • 您还可以定义一个自定义EventNotifier实现(即通过EventNotifierSupport),并实现notify(CamelEvent event)以作用于ExchangeCompletedEvent。由于此事件通知程序添加到Camel的ManagementStrategy(即camelContext.getManagementStrategy().addEventNotifier(customEventNotifier))中,因此它基本上针对所有已完成的交换触发,然后在每个路由上触发,因此您可能需要筛选由您感兴趣的特定路由触发的事件

我将从上到下进行尝试,因为提到的每种方法都在增加复杂性

相关问题