Camel Azure事件中心问题

eh57zj3b  于 2023-06-05  发布在  Apache
关注(0)|答案(1)|浏览(88)

我有一个Camel路由,它公开了一个REST端点,并将消息路由到Azure事件中心(Event Hub)。当路由把消息发送到事件中心时,我收到了编码错误。如有任何建议,不胜感激(路由已正常启动,且不存在连接问题)。
代码如下:

/**
 * Camel route that exposes a REST POST endpoint and hands every incoming
 * exchange straight to an Azure Event Hubs producer endpoint.
 */
@Component
public class RestEventHubRoute extends RouteBuilder {

    /** Consumer URI: REST endpoint that accepts POST requests on {@code /rest}. */
    private static final String REST_ENDPOINT_URI = "rest:post:/rest";

    /** Producer URI for the {@code azure-eventhubs} component. */
    private static final String EVENT_HUB_URI =
            "azure-eventhubs://?connectionString=Endpoint=xxx";

    /**
     * Configures the REST DSL and wires the REST consumer to the Event Hubs producer.
     *
     * @throws Exception declared by the overridden method's signature
     */
    @Override
    public void configure() throws Exception {
        // REST endpoints are served through the servlet component, binding mode "auto".
        restConfiguration()
                .component("servlet")
                .bindingMode(RestBindingMode.auto);

        // The exchange goes from consumer to producer with no step in between.
        // NOTE(review): presumably the exchange headers travel along to the producer
        // unchanged -- confirm against the component documentation.
        from(REST_ENDPOINT_URI)
                .to(EVENT_HUB_URI);
    }
}

错误:

java.lang.IllegalArgumentException: No encoding is known for map entry value of type: org.apache.catalina.connector.RequestFacade
    at org.apache.qpid.proton.codec.MapType.calculateSize(MapType.java:110) ~[proton-j-0.33.8.jar:na]
    at org.apache.qpid.proton.codec.MapType.getEncoding(MapType.java:66) ~[proton-j-0.33.8.jar:na]
    at org.apache.qpid.proton.codec.MapType.getEncoding(MapType.java:29) ~[proton-j-0.33.8.jar:na]
    at org.apache.qpid.proton.codec.AbstractPrimitiveType.write(AbstractPrimitiveType.java:27) ~[proton-j-0.33.8.jar:na]
    at org.apache.qpid.proton.codec.messaging.FastPathApplicationPropertiesType.write(FastPathApplicationPropertiesType.java:173) ~[proton-j-0.33.8.jar:na]
    at org.apache.qpid.proton.codec.messaging.FastPathApplicationPropertiesType.write(FastPathApplicationPropertiesType.java:43) ~[proton-j-0.33.8.jar:na]
    at org.apache.qpid.proton.codec.EncoderImpl.writeObject(EncoderImpl.java:734) ~[proton-j-0.33.8.jar:na]
    at org.apache.qpid.proton.message.impl.MessageImpl.encode(MessageImpl.java:736) ~[proton-j-0.33.8.jar:na]
    at org.apache.qpid.proton.message.impl.MessageImpl.encode(MessageImpl.java:696) ~[proton-j-0.33.8.jar:na]
    at com.azure.messaging.eventhubs.EventDataBatch.getSize(EventDataBatch.java:187) ~[azure-messaging-eventhubs-5.11.2.jar:5.11.2]
    at com.azure.messaging.eventhubs.EventDataBatch.tryAdd(EventDataBatch.java:121) ~[azure-messaging-eventhubs-5.11.2.jar:5.11.2]
    at com.azure.messaging.eventhubs.EventHubProducerAsyncClient$EventDataCollector.lambda$accumulator$0(EventHubProducerAsyncClient.java:698) ~[azure-messaging-eventhubs-5.11.2.jar:5.11.2]
    at reactor.core.publisher.MonoStreamCollector$StreamCollectorSubscriber.onNext(MonoStreamCollector.java:132) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.FluxIterable$IterableSubscription.fastPath(FluxIterable.java:402) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:291) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.MonoStreamCollector$StreamCollectorSubscriber.onSubscribe(MonoStreamCollector.java:121) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:201) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:83) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.MonoFromFluxOperator.subscribe(MonoFromFluxOperator.java:81) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:292) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:236) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.SerializedSubscriber.onComplete(SerializedSubscriber.java:146) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onComplete(FluxRetryWhen.java:200) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.SerializedSubscriber.onComplete(SerializedSubscriber.java:146) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.onComplete(FluxTimeout.java:234) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.FluxTakeUntil$TakeUntilPredicateSubscriber.onComplete(FluxTakeUntil.java:121) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.FluxTakeUntil$TakeUntilPredicateSubscriber.onNext(FluxTakeUntil.java:99) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:877) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:965) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.FluxReplay$ReplaySubscriber.onNext(FluxReplay.java:1344) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.FluxDistinctUntilChanged$DistinctUntilChangedSubscriber.tryOnNext(FluxDistinctUntilChanged.java:149) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.FluxDistinctUntilChanged$DistinctUntilChangedSubscriber.onNext(FluxDistinctUntilChanged.java:102) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:877) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:965) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.ReplayProcessor.tryEmitNext(ReplayProcessor.java:508) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27) ~[reactor-core-3.4.26.jar:3.4.26]
    at com.azure.core.amqp.implementation.handler.Handler.onNext(Handler.java:89) ~[azure-core-amqp-2.4.2.jar:2.4.2]
    at com.azure.core.amqp.implementation.handler.SendLinkHandler.onLinkRemoteOpen(SendLinkHandler.java:114) ~[azure-core-amqp-2.4.2.jar:2.4.2]
    at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:164) ~[proton-j-0.33.8.jar:na]
    at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108) ~[proton-j-0.33.8.jar:na]
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324) ~[proton-j-0.33.8.jar:na]
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291) ~[proton-j-0.33.8.jar:na]
    at com.azure.core.amqp.implementation.ReactorExecutor.run(ReactorExecutor.java:91) ~[azure-core-amqp-2.4.2.jar:2.4.2]
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) ~[reactor-core-3.4.26.jar:3.4.26]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

我期望该路由能够把消息转发到事件中心。

i1icjdpr

i1icjdpr1#

也许现在对你已经没有帮助了,但或许能帮到其他人。在我的情况下,这是一个序列化问题,因为Event Hubs会尝试序列化你发送给它的所有内容。我通过在路由中加入以下调用、删除Camel消息头解决了这个问题:

.removeHeaders("*") // Remove every header so that none is passed on to Event Hubs for serialization

事实证明,Camel也会把所有消息头一并发送到Event Hub,这可能会导致问题,因为Event Hub同样会尝试序列化这些消息头。删除这些消息头(或其中一部分)即可解决这个问题。

相关问题