camel消息在错误的SEDA端点结束

q8l4jmvw  于 9个月前  发布在  Apache
关注(0)|答案(1)|浏览(84)

我对 Camel 的SEDA端点有点困惑。显然有些事情我不明白,但到目前为止我还不知道是什么。
下面是我的设置(抽象):

from("direct:some-entry-point")
    .setBody {
        // Route sets a byte array
        myByteArray
    }
    .to("seda:file-upload")     // The message goes on to upload the byte array
    .choice()
        .`when` { doTheOtherThing }   // Under certain circumstances, something needs to be done additionally. The body can already be uploaded, however.
        .to("direct:do-the-other-thing")

from("direct:do-the-other-thing")
    .setBody {
        // This route adds a file handle, because this one is big and won't fit into memory
       myFileHandle
    }
    .to("seda:streaming-upload")  //Since this file is big, it is forwarded to another endpoint to be uploaded as a stream.

from("seda:streaming-upload?queue=#FiloBlockingQueue")
    .process {
        // Opens a stream to the filehandle in the body and uploads from that.
    }

from("seda:file-upload?queue=#FiloBlockingQueue")
    .process {
         // Uploads the byte array in the body
    }

总而言之,没有什么太复杂的我想。但我有个问题。出于某种疯狂的原因,当some-entry-point将消息发送到文件上传端点时,它最终会在流上传端点处结束,在那里它会崩溃,因为它需要一个文件句柄并得到一个字节数组。
我已经做了所有我能做的日志记录和断点,我所能告诉你的是,这确实是发生的事情。消息不会在文件上传时到达,也不会在做其他事情时到达,即使已经给出了发生这种情况的条件。我还验证了do-the-other-thing是转发到流媒体上传的唯一途径。我能解释这种行为的唯一方法是,转发到file-upload的消息最终到达了错误的端点。有人能告诉我为什么吗?
编辑:一些新的见解:将消息发送到错误队列的行为不一致。事实上,这似乎完全是随机的。我向任何SEDA队列端点发送消息,这是一个将实际接收消息的cointoss。从行为判断,我怀疑我完全误解了这里的底层架构。在我的应用程序中似乎只有一个SEDA队列示例,并且两个端点都轮询同一个示例。第一个投票的人是得到消息的人。这并不是我从“端点”中直观地期望的行为,但它可以解释很多事情。有谁认识 Camel 的能给我解释一下吗?

lqfhib0f

lqfhib0f1#

事实证明,这两个端点都由同一个队列支持。我对 Camel 的工作原理了解得太少了。当向SEDA端点发送内容时,它会被放在指定的队列中,SEDA端点将以一定的间隔轮询它们的队列。如果两个端点共享同一个队列,您无法控制哪一个端点将首先接收消息。
因此,如果你想要两个不同的端点,* 是 * 实际上独立的端点,你需要给予它们自己的队列,方法是为它们创建一个bean,然后将其注入camel,就像这样:

@Configuration
class QueueConfig(
    @Value("\${capture.queueSize}") private val queueSize: Int,
    private val eventPublisher: ApplicationEventPublisher
) {
    @Bean("FirstBlockingQueue")
    fun createFirstQueue(): BlockingQueue<Exchange> =
        TODO("instantiate queue here")

    @Bean("SecondBlockingQueue")
    fun createSecondQueue(): BlockingQueue<Exchange> =
        TODO("instantiate other queue here")

}

在路由生成器中:

from("seda:first-enpoint?queue=#FirstBlockingQueue")
    // do things

from("seda:second-endpoint?queue=#SecondBlockingQueue")
    // do things

现在,两个端点都有自己的队列支持,并且不会窃取彼此的消息。

相关问题