使用databricks spark cluster中的安全kafka

4szc88ey  于 2021-06-06  发布在  Kafka
关注(0)|答案(0)|浏览(238)

我试图使用一个安全的Kafka主题(使用sasl\u明文,scramlogin方法)。
spark版本2.3.1 scala 2.11 kafka最新版本
我正在使用spark结构化流来构造流。为此,我导入了这个库:spark-sql-kafka-0-10u2.11-2.3.1
这将导入kafka-clients.jar的旧版本(0.10.0.1)。
下面是我的客户.jaas:

KafkaClient {
 org.apache.kafka.common.security.scram.ScramLoginModule required
 mechanism=SCRAM-SHA-512
 security.protocol=SASL_PLAINTEXT
 client.id="*****"
 username="****"
 password="*****";
};

我使用的是scramloginmodule和上面指定的kafka客户机jar,甚至没有这个jar。所以,我添加了kafka客户机jar文件的更高版本kafka-clients-1.1.1-cp1.jar
在我的笔记本中,我有以下代码:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import spark.implicits._

val kafkaSource = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers",   "host:port")   
    .option("subscribe", "topic")    
  .option("startingOffsets", "earliest") 
  .option("maxOffsetsPerTrigger", 1)

  .option("kafka.sasl.mechanism","SCRAM-SHA-512")
  .option("kafka.security.protocol","SASL_PLAINTEXT")
  .load()

val kafkaStream = kafkaSource.
  select(
    $"key" cast "string",   // deserialize keys
    $"value" cast "string", // deserialize values
    $"topic",
    $"partition",
    $"offset",
    $"timestamp",
    $"timestampType")

import org.apache.spark.sql.ForeachWriter

kafkaStream.writeStream.foreach(new ForeachWriter[Row] {

  override def process(row: Row): Unit = {
    println("Processing : " + row.mkString(","))
  }

  override def close(errorOrNull: Throwable): Unit = {}

  override def open(partitionId: Long, version: Long): Boolean = {
    true
  }
}).start().awaitTermination()

从驱动程序日志,我可以看到,我能够连接良好。

18/09/05 13:09:46 INFO AbstractLogin: Successfully logged in.
18/09/05 13:09:46 INFO ConsumerConfig: ConsumerConfig values:

但在这之后,什么也没发生!!打印完以下几行后就会卡住:

18/09/05 13:09:47 INFO AppInfoParser: Kafka version : 0.10.0.1
18/09/05 13:09:47 INFO AppInfoParser: Kafka commitId : a7a17cdec9eaa6c5

上面的kafka版本也非常混乱,因为我没有使用这个版本(尽管这是kafka客户端的版本,当我添加spark-sql-kafka-0-10_2.11-2.3.1库时,它会自动导入)。但是我选择了排除kafka客户机(当我导入时)。如我所说,我使用的kafka客户机是更高版本的,我连接的kafka集群也是。
为了复制这一点,我在自己的笔记本电脑上运行的spark集群上运行了相同的代码,一切正常,在那里我可以消费和打印kafka消息。
任何经历过类似问题的人,请告知!!

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题