通过id将消息限制到Camel中的线程

ncgqoxb0  于 9个月前  发布在  Apache
关注(0)|答案(1)|浏览(72)

在我的应用程序中,我从队列中消费消息,处理它们,将结果写入数据库并发送到另一个队列。由于有很多IO,我希望并行处理消息以提高性能。
但应用程序的架构有一个限制:为了防止数据竞争,不能同时处理属于同一实体(例如“客户”)的消息。
最直观的解决方案是使用Sticky Load Balancer

from("jms:queue:in")
    .loadBalance().sticky(header("customerId"))
        .to("seda:x")
        .to("seda:y")
        .to("seda:z")
    .end();

但这种方法有两个缺点:

  • 我不能仅仅通过更改配置属性来更改处理线程的数量
  • 对于这里的每个seda端点,我必须复制粘贴完全相同的路由(或者至少我不知道如何避免复制粘贴)

你能告诉我一个更好的方法吗?

nbewdwxp

nbewdwxp1#

camel-jms组件具有asyncConsumerconcurrentConsumersmaxConcurrentConsumers选项,您可以使用这些选项来控制是否异步使用消息,以及希望使用多少个PMAC消费者。这可能会消除使用seda的需要。

from("amqp:queue:{{example.queue.name}}" 
    + "?asyncConsumer={{example.queue.consumer.async}}"
    + "&concurrentConsumers={{example.queue.consumer.count}}")
    // ...

至于粘性负载均衡器,它不会真正帮助您防止“数据竞争”或并发问题,因为它的工作只是平衡两个或多个端点之间的负载。
为此,您可能不得不使用某种同步和同步进程来处理不会导致这些竞争条件的批处理。
然后可以使用Java同步来处理这个问题。
还尝试查看camels aggregate是否可以通过使用header customerId作为correlationExpression来可靠地处理这个问题,但如果处理时间很长,似乎aggregate仍然有可能触发两次。

相关问题