似乎您正在尝试从广播引用sparkcontext

fgw7neuy  于 2021-07-12  发布在  Spark
关注(0)|答案(1)|浏览(270)

我正在尝试用spark结构化流媒体处理一些事件。
传入事件如下所示:
事件1:
urlhttp://first/path/to/read/from...
事件2:
urlhttp://second/path/to/read/from...
等等。
我的目标是读取每个url并生成一个新的df。到目前为止,我已经用这样的代码做了,我做了一个 collect() .

def createDF(url):

    file_url = "abfss://" + container + "@" + az_storage_account + ".dfs.core.windows.net/" + az_storage_folder + "/" + url

    """ Read data """
    binary = spark.read.format("binaryFile").load(file_url)
    """ Do other operations """
    ...

    """ save the data """
    # write it into blob again

    return something

def loadData(batchDf, batchId):

    """
    batchDf:
        +--------------------+---------+-----------+--------------+--------------------+---------+------------+--------------------+----------------+--------------------+
        |                body|partition|     offset|sequenceNumber|        enqueuedTime|publisher|partitionKey|          properties|systemProperties|                 url|
        +--------------------+---------+-----------+--------------+--------------------+---------+------------+--------------------+----------------+--------------------+
        |[{"topic":"/subsc...|        0|30084343744|         55489|2021-03-03 14:21:...|     null|        null|[aeg-event-type -...|              []|http://path...|
        +--------------------+---------+-----------+--------------+--------------------+---------+------------+--------------------+----------------+--------------------+
    """

    """ Before .... 

    df = batchDf.select("url")
    url = df.collect()

    [createDF(item) for item in url]
    """
    # Now without collect()
    # Select the url field of the df
    url_select_df = batchDf.select("url")

    # Read url value
    result = url_select_df.rdd.map(lambda x: createDF(x.url))

query  = df \
    .writeStream \
    .foreachBatch(loadData) \
    .outputMode("update") \
    .queryName("test") \
    .start() \
    .awaitTermination()

但是,当我想提取不带collect的url时,会收到以下错误消息:
似乎您正试图引用广播中的sparkcontext。
会发生什么?
非常感谢你的帮助

r7s23pms

r7s23pms1#

没有上帝的召唤 collect Dataframe url_select_df 分配给执行人。当你打电话的时候 map ,lambda表达式在执行器上执行。因为lambda表达式正在调用 createDF 如果使用sparkcontext,则会出现异常,因为无法在执行器上使用sparkcontext。
看来你已经想出了解决办法 collect 将Dataframe添加到驱动程序,并在那里应用lambda表达式。
只要确保你没有超载(基于内存)你的驱动程序。

相关问题