spark流if(!rdd.partitions.isempty)不工作

kfgdxczn  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(487)

我试图从kafka服务器创建一个数据流,然后对该流进行一些转换。我已经包括了一个捕获如果流是空的( if(!rdd.partitions.isEmpty) ); 然而,即使没有任何事件被发表到Kafka的主题中 else 永远达不到语句。

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

stream.foreachRDD { rdd =>
    if(!rdd.partitions.isEmpty) {

        val message = rdd.map((x$2) => x$2._2).collect().toList.map(parser)

        val val = message(0)

    } else println("empty stream...")

    ssc.start() 
    ssc.awaitTermination()

}

在使用时,是否应该使用另一种语句来检查流是否为空 KafkaUtils.createDirectStream 而不是 createStream ?

ih99xse1

ih99xse11#

使用 RDD.isEmpty 而不是 RDD.partitions.isEmpty 它添加了一个检查,以查看底层分区是否实际包含元素:

stream.foreachRDD { rdd =>
  if(!rdd.isEmpty) {
    // Stuff
  }
}

原因是什么 RDD.partitions.isEmpty 不起作用的原因是内存中存在一个分区 RDD ,但分区本身是空的。但是从 partitions 这是一个 Array[Partition] ,它不是空的。

相关问题