我使用spark redis从模拟中的不同进程向spark cluster发送数据(使用redis c-client“hiredis”),我的每个数据发送器(进程)生成如下格式的数据流:
step | region_id | value
其中,step是当前模拟步骤,region\u id是每个进程的标识符。
我想使用spark structural streaming将数据从每个区域传输到外部python脚本。目前,我有一个这样的工作版本:(它将有一个数据过滤器,以便查询只处理区域id=1的行)
斯卡拉
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.getOrCreate()
val fluids = spark
.readStream
.format("redis")
.option("stream.keys","fluids")
.schema(StructType(Array(
StructField("step", LongType),
StructField("region_id", LongType),
StructField("valuelist", StringType)
)))
.load()
// filter data so that the python script will process data generated from region 1
val region1 = fluids.select("step", "valuelist").where("region_id = 1")
val py_command="env python3 ./run_fluiddmd.py"
val query_py = region1
.writeStream
.outputMode("update")
.format("console")
.trigger(Trigger.ProcessingTime("10 seconds"))
.foreachBatch { (batchDF: Dataset[Row], batchId: Long) =>
// Transform and write batchDF
val rows: RDD[Row] = batchDF.rdd
// pipe rdd
val pipeRDD = rows.pipe(py_command)
pipeRDD.collect().foreach(println)
}.start()
query_py.awaitTermination()
} // End main
问题是,我想查询所有可用的区域(假设我从程序参数中传递\u个区域的数目)。
我有这样的初步计划(语法可能是错误的……):
filtered_regions = (fluids.select("step", "valuelist").where("region_id=regionID" ) for regionID in range(number_of_regions)))
然后我可以使用类似foreach的东西,让rueryèy迭代所有区域?
或者我可以只查询所有的数据,而将pipedrdd的数据分区(在foreachbatch循环中)?
我是scala/spark的新手,任何建议和代码示例都非常感谢!谢谢!
暂无答案!
目前还没有任何答案,快来回答吧!