我是spark的新手,我正在使用spark结构化流媒体(spark 3.0.0,scala 2.12)读取kafka主题中的数据
我有3个spark workers,主题分为3个部分
如果我错了,请纠正我,但是当我从不同的答案和spark文档中阅读时,spark使用消费者组来阅读kafka的内容,如果我有相同数量的分区,那么应该将执行者Map为1:1
但是当我检查消费者组时,我发现只有一个ip正在消费,它是驱动程序ip,所以数据不是直接从执行器中消费的,我发现任务只被划分为两个执行器,第三个几乎处于空闲状态
所以我有两个问题:
为什么只有驱动节点从kafka消费?我能让每个工人的执行者直接使用Kafka的数据吗?
我能把工作量分配给所有的执行者吗?
object DataProcessing extends Serializable with Logging {
val topics = ""
val brokers = ""
//window config => I'm using this during processing
val windowDuration = s"2 hours"
val slideDuration = s"15 minutes"
@transient var streamingQuery: StreamingQuery = null
val spark =
SparkSession.builder
.appName("app-name")
.getOrCreate()
import spark.implicits._
def main(args: Array[String]): Unit = {
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("subscribe", topics)
.option("startingOffsets", "latest")
.option("failOnDataLoss", "false")
.option("maxOffsetsPerTrigger", "10000")
.load()
logDebug("Kafka Topic Data Schema => ")
df.printSchema()
val ds = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
//extracting columns => transformations and groupings => windowedData
val writer = new AuroraSink(url, username, password)
streamingQuery = windowedData.writeStream
.queryName("data-processor")
.foreach(writer)
.outputMode("update")
.option("checkpointLocation", s"/data/$checkpoint")
.trigger(ProcessingTime("10 seconds"))
.start()
暂无答案!
目前还没有任何答案,快来回答吧!