无法通过kafka api连接到eventhub

o7jaxewo  于 2021-06-30  发布在  Java
关注(0)|答案(2)|浏览(427)

我在通过Kafka库连接事件中心时遇到以下异常。

Caused by: org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'sasl_auth_bytes': Bytes size -1 cannot be negative

ERROR [pool-6-thread-1] STREAM - code="SAE-SP-A-1000: Stream processing failed, exiting ...",exception="Invalid SASL mechanism response, server may be expecting a different protocol"
 org.apache.kafka.common.errors.IllegalSaslStateException: Invalid SASL mechanism response, server may be expecting a different protocol
 Caused by: org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'sasl_auth_bytes': Bytes size -1 cannot be negative
      at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:77) ~[kafka-clients-2.2.1.jar!/:?]
      at org.apache.kafka.common.protocol.ApiKeys.parseResponse(ApiKeys.java:298) ~[kafka-clients-2.2.1.jar!/:?]
      at org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:687) ~[kafka-clients-2.2.1.jar!/:?]
      at org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:678) ~[kafka-clients-2.2.1.jar!/:?]
      at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveKafkaResponse(SaslClientAuthenticator.java:501) ~[kafka-clients-2.2.1.jar!/:?]
      at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveToken(SaslClientAuthenticator.java:435) ~[kafka-clients-2.2.1.jar!/:?]
      at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:259) ~[kafka-clients-2.2.1.jar!/:?]
      at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:173) ~[kafka-clients-2.2.1.jar!/:?]
      at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:547) ~[kafka-clients-2.2.1.jar!/:?]
      at org.apache.kafka.common.network.Selector.poll(Selector.java:483) ~[kafka-clients-2.2.1.jar!/:?]
      at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535) ~[kafka-clients-2.2.1.jar!/:?]
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265) ~[kafka-clients-2.2.1.jar!/:?]
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[kafka-clients-2.2.1.jar!/:?]
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215) ~[kafka-clients-2.2.1.jar!/:?]
      at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:235) ~[kafka-clients-2.2.1.jar!/:?]
      at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:317) ~[kafka-clients-2.2.1.jar!/:?]
      at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1226) ~[kafka-clients-2.2.1.jar!/:?]
      at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1191) ~[kafka-clients-2.2.1.jar!/:?]
      at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1176) ~[kafka-clients-2.2.1.jar!/:?]

消费者属性如下所示:

bootstrap.servers=XXX-topics.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://XXXX-topics.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=**************************";
ih99xse1

ih99xse11#

您还需要使用ssl.ca.cert=因为您使用的是sasl\u ssl而不是sasl\u明文。我们需要使用sasl\u ssl连接到azure事件中心。

xa9qqrwz

xa9qqrwz2#

发布到基本计划事件中心时会发生此错误,因为基本计划不支持通过kafka协议进行交互。
https://azure.microsoft.com/de-de/pricing/details/event-hubs/
升级到标准计划可以解决此问题。

相关问题