我正在使用databricks spark 3.x,我正在读取大量的流(100+),每个流都有自己的契约,需要写出自己的delta/parquet/sql/whatever表。虽然这是许多流,但每个流的活动量很低—有些流一天可能只能看到数百条记录。我想流,因为我的目标是一个相当低的延迟方法。
下面是我要说的(代码为简单而缩写;我正确地使用了检查点、输出模式等)。假设 schemas
变量包含每个主题的架构。我尝试过这种方法,在这里我创建了大量的单个流,但是它需要大量的计算,而且大部分都被浪费了:
def batchprocessor(topic, schema):
def F(df, batchId):
sql = f'''
MERGE INTO SOME TABLE
USING SOME MERGE TABLE ON SOME CONDITION
WHEN MATCHED
UPDATE SET *
WHEN NOT MATCHED
INSERT *
'''
df.createOrReplaceTempView(f"SOME MERGE TABLE")
df._jdf.sparkSession().sql(sql)
return F
for topic in topics:
query = (spark
.readStream
.format("delta")
.load(f"/my-stream-one-table-per-topic/{topic}")
.withColumn('json', from_json(col('value'),schemas[topic]))
.select(col('json.*'))
.writeStream
.format("delta")
.foreachBatch(batchProcessor(topic, schema))
.start())
我还尝试创建一个流来进行大量过滤,但即使在我将单个消息推送到单个主题的测试环境中,性能也相当糟糕:
def batchprocessor(df, batchId):
df.cache()
for topic in topics:
filteredDf = (df.filter(f"topic == '{topic}'")
.withColumn('json', from_json(col('value'),schemas[topic]))
.select(col('json.*')))
sql = f'''
MERGE INTO SOME TABLE
USING SOME MERGE TABLE ON SOME CONDITION
WHEN MATCHED
UPDATE SET *
WHEN NOT MATCHED
INSERT *
'''
filteredDf.createOrReplaceTempView(f"SOME MERGE TABLE")
filteredDf._jdf.sparkSession().sql(sql)
df.unpersist()
query = (spark
.readStream
.format("delta")
.load(f"/my-stream-all-topics-in-one-but-partitioned")
.writeStream
.format("delta")
.foreachBatch(batchProcessor)
.start())
有没有什么好的方法可以对这样的流进行实质性的解复用?它已经被分区了,所以我假设查询规划器没有做太多的冗余工作,但是看起来仍然有大量的开销。
1条答案
按热度按时间lh80um4z1#
我运行了一系列基准测试,选项2更有效。我还不知道为什么。