在spark中使用seprate查询结构化流数据的建议方法是什么

fkvaft9z  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(158)

我使用spark redis从模拟中的不同进程向spark cluster发送数据(使用redis c-client“hiredis”),我的每个数据发送器(进程)生成如下格式的数据流:

step | region_id | value

其中,step是当前模拟步骤,region\u id是每个进程的标识符。
我想使用spark structural streaming将数据从每个区域传输到外部python脚本。目前,我有一个这样的工作版本:(它将有一个数据过滤器,以便查询只处理区域id=1的行)
斯卡拉

def main(args: Array[String]): Unit = {
         val spark = SparkSession
                     .builder()
                     .getOrCreate()

         val fluids = spark
                     .readStream
                     .format("redis")
                     .option("stream.keys","fluids")
                     .schema(StructType(Array(
                           StructField("step", LongType),
                           StructField("region_id", LongType),
                           StructField("valuelist", StringType)
                      )))
                      .load()
          // filter data so that the python script will process data generated from region 1
          val region1 = fluids.select("step", "valuelist").where("region_id = 1")

          val py_command="env python3 ./run_fluiddmd.py"
          val query_py = region1
            .writeStream
            .outputMode("update")
            .format("console")
            .trigger(Trigger.ProcessingTime("10 seconds"))
            .foreachBatch { (batchDF: Dataset[Row], batchId: Long) =>
               // Transform and write batchDF 
               val rows: RDD[Row] = batchDF.rdd

               // pipe rdd
               val pipeRDD = rows.pipe(py_command)
               pipeRDD.collect().foreach(println)
            }.start()

          query_py.awaitTermination()
     } // End main

问题是,我想查询所有可用的区域(假设我从程序参数中传递\u个区域的数目)。
我有这样的初步计划(语法可能是错误的……):

filtered_regions = (fluids.select("step", "valuelist").where("region_id=regionID" ) for regionID in range(number_of_regions)))

然后我可以使用类似foreach的东西,让rueryèy迭代所有区域?
或者我可以只查询所有的数据,而将pipedrdd的数据分区(在foreachbatch循环中)?
我是scala/spark的新手,任何建议和代码示例都非常感谢!谢谢!

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题