我试图使用一个安全的Kafka主题(使用sasl\u明文,scramlogin方法)。
spark版本2.3.1 scala 2.11 kafka最新版本
我正在使用spark结构化流来构造流。为此,我导入了这个库:spark-sql-kafka-0-10u2.11-2.3.1
这将导入kafka-clients.jar的旧版本(0.10.0.1)。
下面是我的客户.jaas:
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
mechanism=SCRAM-SHA-512
security.protocol=SASL_PLAINTEXT
client.id="*****"
username="****"
password="*****";
};
我使用的是scramloginmodule和上面指定的kafka客户机jar,甚至没有这个jar。所以,我添加了kafka客户机jar文件的更高版本kafka-clients-1.1.1-cp1.jar
在我的笔记本中,我有以下代码:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import spark.implicits._
val kafkaSource = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host:port")
.option("subscribe", "topic")
.option("startingOffsets", "earliest")
.option("maxOffsetsPerTrigger", 1)
.option("kafka.sasl.mechanism","SCRAM-SHA-512")
.option("kafka.security.protocol","SASL_PLAINTEXT")
.load()
val kafkaStream = kafkaSource.
select(
$"key" cast "string", // deserialize keys
$"value" cast "string", // deserialize values
$"topic",
$"partition",
$"offset",
$"timestamp",
$"timestampType")
import org.apache.spark.sql.ForeachWriter
kafkaStream.writeStream.foreach(new ForeachWriter[Row] {
override def process(row: Row): Unit = {
println("Processing : " + row.mkString(","))
}
override def close(errorOrNull: Throwable): Unit = {}
override def open(partitionId: Long, version: Long): Boolean = {
true
}
}).start().awaitTermination()
从驱动程序日志,我可以看到,我能够连接良好。
18/09/05 13:09:46 INFO AbstractLogin: Successfully logged in.
18/09/05 13:09:46 INFO ConsumerConfig: ConsumerConfig values:
但在这之后,什么也没发生!!打印完以下几行后就会卡住:
18/09/05 13:09:47 INFO AppInfoParser: Kafka version : 0.10.0.1
18/09/05 13:09:47 INFO AppInfoParser: Kafka commitId : a7a17cdec9eaa6c5
上面的kafka版本也非常混乱,因为我没有使用这个版本(尽管这是kafka客户端的版本,当我添加spark-sql-kafka-0-10_2.11-2.3.1库时,它会自动导入)。但是我选择了排除kafka客户机(当我导入时)。如我所说,我使用的kafka客户机是更高版本的,我连接的kafka集群也是。
为了复制这一点,我在自己的笔记本电脑上运行的spark集群上运行了相同的代码,一切正常,在那里我可以消费和打印kafka消息。
任何经历过类似问题的人,请告知!!
暂无答案!
目前还没有任何答案,快来回答吧!