将rdd的值作为变量传递给另一个rdd-spark#pyspark

ecr0jaav  于 2021-06-26  发布在  Hive
关注(0)|答案(2)|浏览(312)

这个问题在这里已经有答案了

如何从sparkDataframe中的row对象获取值(3个答案)
两年前关门了。
我目前正在探索如何通过sqlcontext调用大型hql文件(包含100行insert into select语句)。
另一件事是,hqls文件是参数化的,所以在从sqlcontext调用它时,我也想传递参数。
已经浏览了大量的博客和帖子,但没有找到任何答案。
我正在尝试的另一件事是,将rdd的输出存储到一个变量中。
Pypark公司

max_date=sqlContext.sql("select max(rec_insert_date) from table")

现在要将max\u date作为变量传递给下一个rdd

incremetal_data=sqlConext.sql(s"select count(1) from table2 where rec_insert_date > $max_dat")

这是行不通的,更何况 max_date 表示为=

u[row-('20018-05-19 00:00:00')]

现在还不清楚如何修剪这些额外的字符。

tkqqtvp1

tkqqtvp11#

你不应该用吗 max(rec_insert_date) 而不是 count(rec_insert_date) ?
将从一个查询返回的值传递给另一个查询有两个选项:
使用collect,它将触发计算并将返回值赋给变量 max_date = sqlContext.sql("select max(rec_insert_date) from table").collect()[0][0] # max_date has actual date assigned to it incremetal_data = sqlConext.sql(s"select count(1) from table2 where rec_insert_date > '{}'".format(max_date)) 另一个更好的选择是使用dataframeapi from pyspark.sql.functions import col, lit incremental_data = sqlContext.table("table2").filter(col("rec_insert_date") > lit(max_date)) 使用交叉连接-如果第一个查询有多个结果,则应避免交叉连接。优点是不会破坏处理图,因此可以通过spark优化一切。 max_date_df = sqlContext.sql("select max(rec_insert_date) as max_date from table") # max_date_df is a dataframe with just one row incremental_data = sqlContext.table("table2").join(max_date_df).filter(col("rec_insert_date") > col("max_date")) 至于您的第一个问题是如何从spark调用大型hql文件:
如果您使用的是spark 1.6,则需要创建hivecontexthttps://spark.apache.org/docs/1.6.1/sql-programming-guide.html#hive-表
如果您使用的是spark 2.x,那么在创建sparksession时,您需要启用配置单元支持https://spark.apache.org/docs/latest/sql-programming-guide.html#hive-表
你可以先把即时消息插入 sqlContext.sql(...) 方法,根据我的经验,这通常是可行的,并且是将逻辑重写为dataframes/datasetsapi的一个很好的起点。在集群中运行它时可能会出现一些问题,因为您的查询将由spark的sql引擎(catalyst)执行,而不会传递到配置单元。

k97glaaz

k97glaaz2#

sql上下文返回数据集[行]。你可以从那里得到你的价值

max_date=sqlContext.sql("select count(rec_insert_date) from table").first()[0]

在spark 2.0+中,使用spark会话可以

max_date=spark.sql("select count(rec_insert_date) from table").rdd.first()[0]

从返回的Dataframe获取基础rdd

相关问题