将大量sparkDataframe合并为一个

t1qtbnec  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(475)

我在for循环中使用满足不同条件的不同查询查询缓存的hive temp表超过1500次。我需要在循环中使用unionall合并它们。但是我得到了stackoverflow错误,因为spark不能跟上rdd的发展。
伪代码:

df=[from a hive table]
tableA=[from a hive table]
tableA.registerTempTable("tableA")
HiveContext.sql('CACHE TABLE tableA')

for i in range(0,2000):
    if (list[0]['column1']=='xyz'):
        df1=query something from tableA
        df=df.unionAll(df1)
    elif ():
        df1=query something from tableA
        df=df.unionAll(df1)
    elif ():
        df1=query something from tableA
        df=df.unionAll(df1)
    elif ():
        df1=query something from tableA
        df=df.unionAll(df1)
    else:
        df1=query something from tableA
        df=df.unionAll(df1)

由于rdd沿袭变得困难,这会引发stackoverflow错误。所以我尝试了如下检查点:

for i in range(0,2000):
    if (list[0]['column1']=='xyz'):
        df1=query something from tableA
        df=df.unionAll(df1)
    elif ():
        df1=query something from tableA
        df=df.unionAll(df1)
    else:
        df1=query something from tableA
        df=df.unionAll(df1)
    df.rdd.checkpoint
    df = sqlContext.createDataFrame(df.rdd, df.schema)

我也犯了同样的错误。所以我尝试了saveastable,这是我一直想要避免的,因为每个hql查询和循环中的hiveio之间的作业提交存在延迟。但这种方法效果很好。

for i in range(0,2000):
    if (list[0]['column1']=='xyz'):
        df=query something from tableA
        df.write.saveAsTable('output', mode='append')
    elif ():
        df=query something from tableA
        df.write.saveAsTable('output', mode='append')

我需要帮助避免将Dataframe保存到循环内的配置单元中。我想以某种在内存中高效的方式合并dfs。我尝试的另一个选项是将查询结果直接插入到临时表中,我得到一个错误:cannotinsert into a rdd based table。

fnvucqvd

fnvucqvd1#

也许,临时表的结果将工作。

df1="query something from tableA".registerTempTable("result")
sqlContext.sql("Insert into result query something from tableA")

相关问题