confluent schema registry在运行spark submit时引发未经授权的异常401

5jdjgkvh  于 2021-07-13  发布在  Spark
关注(0)|答案(0)|浏览(336)

我试图运行一个spark-dstream作业,它试图从kafka读取数据,并用合流avro模式注册表反序列化它。我们在创建kafkautils.createdirectstream时配置了schema registry基本身份验证和传递配置

val kafkaConfig = Map("basic.auth.credentials.source" -> "URL","key.deserializer" -> classOf[KafkaAvroDeserializer], "value.deserializer" -> classOf[KafkaAvroDeserializer], "bootstrap.servers" -> "localhost:9092", "schema.registry.url" -> "http://username:password@localhost:8081")
val stream = KafkaUtils.createDirectStream[String, GenericData.Record](
      ssc,
      LocationStrategies.PreferConsistent,
      Subscribe[String, GenericData.Record](topics, kafkaConfig)
    )

当我尝试在intellij中运行此命令时,它的工作正常,但是当我尝试使用spark submit命令运行此命令时,我得到了schema registry unauthorized error。。?有人面临同样的问题吗?
错误跟踪:

21/02/21 16:03:42 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 8)
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition topic_name-2 at offset 110. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 161
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401
        at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:230)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:256)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:486)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:479)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:177)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:256)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:235)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:107)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:79)
        at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
        at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
        at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1261)
        at org.apache.kafka.clients.consumer.internals.Fetcher.access$3600(Fetcher.java:124)
        at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1488)
        at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1600(Fetcher.java:1328)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:641)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:602)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1294)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
        at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.poll(KafkaDataConsumer.scala:206)
        at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:135)
        at org.apache.spark.streaming.kafka010.KafkaDataConsumer.get(KafkaDataConsumer.scala:39)
        at org.apache.spark.streaming.kafka010.KafkaDataConsumer.get$(KafkaDataConsumer.scala:38)
        at org.apache.spark.streaming.kafka010.KafkaDataConsumer$NonCachedKafkaDataConsumer.get(KafkaDataConsumer.scala:224)
        at org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:257)
        at org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:225)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:512)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:511)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

我正在使用spark 3.0和schema registry 5.3.1

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题