databricks激发了将结构化流写入大量接收器的最佳实践?

ht4b089n  于 2021-05-19  发布在  Spark
关注(0)|答案(1)|浏览(393)

我正在使用databricks spark 3.x,我正在读取大量的流(100+),每个流都有自己的契约,需要写出自己的delta/parquet/sql/whatever表。虽然这是许多流,但每个流的活动量很低—有些流一天可能只能看到数百条记录。我想流,因为我的目标是一个相当低的延迟方法。
下面是我要说的(代码为简单而缩写;我正确地使用了检查点、输出模式等)。假设 schemas 变量包含每个主题的架构。我尝试过这种方法,在这里我创建了大量的单个流,但是它需要大量的计算,而且大部分都被浪费了:

def batchprocessor(topic, schema):
  def F(df, batchId):
    sql = f'''
  MERGE INTO SOME TABLE
  USING SOME MERGE TABLE ON SOME CONDITION
  WHEN MATCHED
  UPDATE SET *
  WHEN NOT MATCHED
  INSERT *
  '''
    df.createOrReplaceTempView(f"SOME MERGE TABLE")
    df._jdf.sparkSession().sql(sql)
  return F
for topic in topics:
  query = (spark
    .readStream
    .format("delta")
    .load(f"/my-stream-one-table-per-topic/{topic}")
    .withColumn('json', from_json(col('value'),schemas[topic]))
    .select(col('json.*'))
    .writeStream
    .format("delta")
    .foreachBatch(batchProcessor(topic, schema))
    .start())

我还尝试创建一个流来进行大量过滤,但即使在我将单个消息推送到单个主题的测试环境中,性能也相当糟糕:

def batchprocessor(df, batchId):
  df.cache()
  for topic in topics:
    filteredDf = (df.filter(f"topic == '{topic}'")
      .withColumn('json', from_json(col('value'),schemas[topic]))
      .select(col('json.*')))
    sql = f'''
  MERGE INTO SOME TABLE
  USING SOME MERGE TABLE ON SOME CONDITION
  WHEN MATCHED
  UPDATE SET *
  WHEN NOT MATCHED
  INSERT *
  '''
    filteredDf.createOrReplaceTempView(f"SOME MERGE TABLE")
    filteredDf._jdf.sparkSession().sql(sql)
  df.unpersist()

query = (spark
.readStream
.format("delta")
.load(f"/my-stream-all-topics-in-one-but-partitioned")
.writeStream
.format("delta")
.foreachBatch(batchProcessor)
.start())

有没有什么好的方法可以对这样的流进行实质性的解复用?它已经被分区了,所以我假设查询规划器没有做太多的冗余工作,但是看起来仍然有大量的开销。

lh80um4z

lh80um4z1#

我运行了一系列基准测试,选项2更有效。我还不知道为什么。

相关问题