Simulating an RDD DStream in PySpark from a sequence of offline events

Asked by avwztpqn on 2021-06-07, tagged Kafka

I need to inject events that were saved to HDFS from an online Kafka stream back into a DStream in PySpark, so that the same algorithmic processing can run on them. I found a code sample by holdenkarau that is "equivalent to a checkpointable, replayable, reliable message queue like Kafka". I would like to know whether it is possible to implement it in PySpark:

package com.holdenkarau.spark.testing
import org.apache.spark.streaming._
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._

import scala.language.implicitConversions
import scala.reflect.ClassTag
import org.apache.spark.streaming.dstream.FriendlyInputDStream

/**
 * This is a input stream just for the testsuites. This is equivalent to a
 * checkpointable, replayable, reliable message queue like Kafka.
 * It requires a sequence as input, and returns the i_th element at the i_th
 * batch under manual clock.
 *
 * Based on TestInputStream class from TestSuiteBase in the Apache Spark project.
 */

class TestInputStream[T: ClassTag](@transient var sc: SparkContext,
  ssc_ : StreamingContext, input: Seq[Seq[T]], numPartitions: Int)
  extends FriendlyInputDStream[T](ssc_) {

  def start() {}

  def stop() {}

  def compute(validTime: Time): Option[RDD[T]] = {
    logInfo("Computing RDD for time " + validTime)
    val index = ((validTime - ourZeroTime) / slideDuration - 1).toInt
    val selectedInput = if (index < input.size) input(index) else Seq[T]()

    // lets us test cases where RDDs are not created
    Option(selectedInput).map{si =>
      val rdd = sc.makeRDD(si, numPartitions)
      logInfo("Created RDD " + rdd.id + " with " + selectedInput)
      rdd
    }
  }
}
Answer #1 (kq0g1dla)

Spark provides two built-in DStream implementations that can be used for testing, and in most cases you don't need anything external: ConstantInputDStream and StreamingContext.queueStream.
The second one has a PySpark counterpart, pyspark.streaming.StreamingContext.queueStream:

from pyspark.streaming import StreamingContext

# A batch interval is required; 1 second is used here as an example.
ssc = StreamingContext(sc, 1)
ssc.queueStream([
    sc.range(0, 1000),
    sc.range(1000, 2000),
    sc.range(2000, 3000)
])
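
To replay events that were previously saved to HDFS, one possible approach is to load one RDD per saved batch and feed them to queueStream, which then emits the i-th RDD at the i-th batch, much like TestInputStream. The following is only a minimal sketch, not part of the original answer: the HDFS paths, the one-directory-per-batch layout, and the map step are assumptions.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="replay-offline-events")
ssc = StreamingContext(sc, batchDuration=1)

# Hypothetical layout: each saved batch sits in its own HDFS directory.
batch_paths = [
    "hdfs:///events/batch-000",
    "hdfs:///events/batch-001",
    "hdfs:///events/batch-002",
]
batch_rdds = [sc.textFile(p) for p in batch_paths]

# oneAtATime=True (the default) emits one queued RDD per batch interval,
# so the i-th RDD is processed at the i-th batch.
replayed = ssc.queueStream(batch_rdds, oneAtATime=True)

# Placeholder for the algorithm that also runs on the live Kafka DStream.
replayed.map(lambda line: line.upper()).pprint()

ssc.start()
ssc.awaitTerminationOrTimeout(len(batch_rdds) + 5)
ssc.stop(stopSparkContext=True, stopGraceFully=False)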

If that is not enough, you can always have a separate thread write serialized data atomically to the file system and then read it back with the standard file-based DStream.
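
A minimal sketch of that idea follows; the directory names, JSON serialization, local paths, and the feeder thread are assumptions, not part of the original answer. Each batch is written to a staging directory and then renamed into the directory watched by textFileStream, so the stream never observes a partially written file (rename is atomic on the same filesystem).

import json
import os
import threading
import time

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def feeder(batches, staging_dir, watched_dir):
    # Write each batch to the staging directory first, then rename it
    # into the watched directory.
    for i, batch in enumerate(batches):
        tmp_path = os.path.join(staging_dir, "batch-%05d" % i)
        final_path = os.path.join(watched_dir, "batch-%05d" % i)
        with open(tmp_path, "w") as f:
            for event in batch:
                f.write(json.dumps(event) + "\n")
        os.rename(tmp_path, final_path)
        time.sleep(1)  # roughly one file per batch interval

sc = SparkContext(appName="file-based-replay")
ssc = StreamingContext(sc, batchDuration=1)

# Both directories must already exist and live on the same filesystem.
events = ssc.textFileStream("incoming").map(json.loads)
events.pprint()

ssc.start()
threading.Thread(
    target=feeder,
    args=([[{"id": 1}], [{"id": 2}], [{"id": 3}]], "_staging", "incoming"),
    daemon=True,
).start()
ssc.awaitTerminationOrTimeout(10)
ssc.stop(stopSparkContext=True, stopGraceFully=False)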
