在dataflow上运行的apache beam管道无法从kafkaio读取:ssl握手失败

olmpazwi  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(390)

我正在构建一个apachebeam管道,将kafka作为一个无限源进行读取。
我可以用directrunner在本地运行它。
但是,当在云上使用googleclouddataflowrunner运行时,管道将失败,并附带异常堆栈跟踪。
似乎最终是conscrypt java库 javax.net.ssl.SSLException: Unable to parse TLS packet header . 我真的不知道如何解决这个问题。

java.io.IOException: Failed to start reading from source: org.apache.beam.sdk.io.kafka.KafkaUnboundedSource@33b5ff70
        com.google.cloud.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:783)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:360)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:193)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:158)
        com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1227)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:135)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
        org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:126)
        com.google.cloud.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:778)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:360)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:193)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:158)
        com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1227)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:135)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
        java.util.concurrent.FutureTask.report(FutureTask.java:122)
        java.util.concurrent.FutureTask.get(FutureTask.java:206)
        org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:112)
        com.google.cloud.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:778)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:360)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:193)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:158)
        com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1227)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:135)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
Caused by: javax.net.ssl.SSLException: Unable to parse TLS packet header
        org.conscrypt.ConscryptEngine.unwrap(ConscryptEngine.java:782)
        org.conscrypt.ConscryptEngine.unwrap(ConscryptEngine.java:723)
        org.conscrypt.ConscryptEngine.unwrap(ConscryptEngine.java:688)
        org.conscrypt.Java8EngineWrapper.unwrap(Java8EngineWrapper.java:236)
        org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:464)
        org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:328)
        org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:255)
        org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:79)
        org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:460)
        org.apache.kafka.common.network.Selector.poll(Selector.java:398)
        org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
        org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:238)
        org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
        org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:190)
        org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:219)
        org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:205)
        org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.fetchCommittedOffsets(ConsumerCoordinator.java:468)
        org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.refreshCommittedOffsetsIfNeeded(ConsumerCoordinator.java:450)
        org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1772)
        org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1411)
        org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.setupInitialOffset(KafkaUnboundedReader.java:641)
        org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.lambda$start$0(KafkaUnboundedReader.java:106)
        java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        java.util.concurrent.FutureTask.run(FutureTask.java:266)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
wqlqzqxt

wqlqzqxt1#

看起来conscrypt在许多这样的cotext中导致了ssl错误。beam 2.9.0中的dataflow worker有一个选项来禁用此功能。请试一试。 --experiment=disable_conscrypt_security_provider . 或者,您可以尝试beam 2.4.x,它不启用conscrypt。

相关问题