我正在尝试用spark结构化流媒体处理一些事件。
传入事件如下所示:
事件1:
urlhttp://first/path/to/read/from...
事件2:
urlhttp://second/path/to/read/from...
等等。
我的目标是读取每个url并生成一个新的df。到目前为止,我已经用这样的代码做了,我做了一个 collect()
.
def createDF(url):
file_url = "abfss://" + container + "@" + az_storage_account + ".dfs.core.windows.net/" + az_storage_folder + "/" + url
""" Read data """
binary = spark.read.format("binaryFile").load(file_url)
""" Do other operations """
...
""" save the data """
# write it into blob again
return something
def loadData(batchDf, batchId):
"""
batchDf:
+--------------------+---------+-----------+--------------+--------------------+---------+------------+--------------------+----------------+--------------------+
| body|partition| offset|sequenceNumber| enqueuedTime|publisher|partitionKey| properties|systemProperties| url|
+--------------------+---------+-----------+--------------+--------------------+---------+------------+--------------------+----------------+--------------------+
|[{"topic":"/subsc...| 0|30084343744| 55489|2021-03-03 14:21:...| null| null|[aeg-event-type -...| []|http://path...|
+--------------------+---------+-----------+--------------+--------------------+---------+------------+--------------------+----------------+--------------------+
"""
""" Before ....
df = batchDf.select("url")
url = df.collect()
[createDF(item) for item in url]
"""
# Now without collect()
# Select the url field of the df
url_select_df = batchDf.select("url")
# Read url value
result = url_select_df.rdd.map(lambda x: createDF(x.url))
query = df \
.writeStream \
.foreachBatch(loadData) \
.outputMode("update") \
.queryName("test") \
.start() \
.awaitTermination()
但是,当我想提取不带collect的url时,会收到以下错误消息:
似乎您正试图引用广播中的sparkcontext。
会发生什么?
非常感谢你的帮助
1条答案
按热度按时间r7s23pms1#
没有上帝的召唤
collect
Dataframeurl_select_df
分配给执行人。当你打电话的时候map
,lambda表达式在执行器上执行。因为lambda表达式正在调用createDF
如果使用sparkcontext,则会出现异常,因为无法在执行器上使用sparkcontext。看来你已经想出了解决办法
collect
将Dataframe添加到驱动程序,并在那里应用lambda表达式。只要确保你没有超载(基于内存)你的驱动程序。