Spark streaming Kafka timeout

p8h8hvxi posted 2021-05-27 in Spark
Follow (0) | Answers (1) | Views (409)

I am trying to run a simple Spark + Kafka integration example from spark-shell on Amazon EMR, but I keep hitting a timeout error. However, when I use the plain org.apache.kafka client with the same settings shown below, it works without failing.
The timeout error:

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
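
For comparison, the plain-client test that succeeds looks roughly like this. This is a minimal sketch reconstructed from the settings in the Spark snippet below; host, port, and the test message are placeholders:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Same SSL settings as the Spark job below, minus the "kafka." prefix
// that Spark's Kafka source/sink uses to pass options through.
val props = new Properties()
props.put("bootstrap.servers", s"$host:$port")
props.put("security.protocol", "SSL")
props.put("ssl.endpoint.identification.algorithm", "")
props.put("ssl.truststore.location", "/home/hadoop/client.truststore.jks")
props.put("ssl.truststore.password", "password")
props.put("ssl.keystore.type", "PKCS12")
props.put("ssl.keystore.location", "/home/hadoop/client.keystore.p12")
props.put("ssl.keystore.password", "password")
props.put("ssl.key.password", "password")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)
// send() returns a java.util.concurrent.Future; get() blocks until acked
producer.send(new ProducerRecord[String, String]("test-topic", "hello")).get()
producer.close()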

I moved client.truststore.jks and client.keystore.p12 to HDFS and ran the following:

$ spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0
import org.apache.spark.sql.functions.col

val kafkaOptions = Map("kafka.bootstrap.servers" -> s"$host:$port",
        "kafka.security.protocol" -> "SSL",
        "kafka.ssl.endpoint.identification.algorithm" -> "",
        "kafka.ssl.truststore.location" -> "/home/hadoop/client.truststore.jks",
        "kafka.ssl.truststore.password" -> "password",
        "kafka.ssl.keystore.type" -> "PKCS12",
        "kafka.ssl.key.password" -> "password",
        "kafka.ssl.keystore.location" -> "/home/hadoop/client.keystore.p12",
        "kafka.ssl.keystore.password" -> "password")

 val df = spark
        .read
        .option("header", true)
        .option("escape", "\"")
        .csv("s3://bucket/file.csv")

 // Spark's Kafka sink writes the "value" column; copy it from the CSV's "body" column
 val publishToKafkaDf = df.withColumn("value", col("body"))

 publishToKafkaDf
      .selectExpr( "CAST(value AS STRING)")
      .write
      .format("kafka")
      .option("topic", "test-topic")
      .options(kafkaOptions)
      .save()

iqih9akk 1#

Solved: it was an outbound rule problem in the AWS security group attached to the worker (slave) nodes.
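
For anyone who lands here: on EMR the master and core/task (worker) nodes sit behind different security groups, so the driver can reach the broker while the executors cannot. A quick way to verify outbound connectivity from the executors themselves is a small Spark job; a minimal sketch, assuming host and port are in scope as in the question:

import java.net.{InetSocketAddress, Socket}

// Open a TCP connection to the broker from inside a Spark task, so the
// check runs on the executors (worker nodes) rather than the driver.
val checks = sc.parallelize(1 to 10, 10).map { _ =>
  val socket = new Socket()
  try {
    socket.connect(new InetSocketAddress(host, port.toInt), 5000) // 5 s timeout
    "ok"
  } catch {
    case e: Exception => s"failed: ${e.getMessage}"
  } finally socket.close()
}.collect()

checks.foreach(println)

If these connections fail while a plain producer run on the master node succeeds, the egress rules on the worker security group are the likely culprit.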
