我正在使用PySpark2.1。下面是我的输入Dataframe。我很喜欢从不同的Dataframe中获取动态偏移值请帮助
df1型=
类别值
1 3
2 2
4 5
df2型
类别年月周数lag\u属性运行
1 0 0 0 0 2
1 2019 1 1 1 0
1 2019 1 2 2 0
1 2019 1 3 3 0
1 2019 1 4 4 1
1 2019 1 5 5 2
1 2019 1 6 6 3
1 2019 1 7 7 4
1 2019 1 8 8 5
1 2019 1 9 9 6
2 0 0 0 9 0
2 2018 1 1 2 0
2 2018 1 2 3 2
2 2018 1 3 4 3
2 2018 1 3 5 4
如上例所示,df1是我的查找表,它有偏移值,对于1,偏移值是3,对于2类,偏移值是2。
在df2中,runs是我的输出列,因此对于df1中的每个类别值,如果lag值是3,那么从dataframe2[df2]应该考虑lag\u attrbute和lag down的3个值,因此可以看到每3个lag\u属性值都重复运行
我试过了,但没用。请帮忙
df1=df1.registerTempTable("df1")
df2=df2.registerTempTable("df2")
sqlCtx.sql("select st.category,st.Year,st.Month,st.weekyear,st.lag_attribute,LAG(st.lag_attribute,df1.value, 0) OVER (PARTITION BY st.cagtegory ORDER BY st.Year,st.Month,st.weekyear) as return_test from df1 st,df2 lkp where df1.category=df2.category")
请帮我跨过这个障碍
1条答案
按热度按时间7ajki6be1#
lag
接受一个列对象和一个整数(python integer),如函数的签名所示:的值
count
不可能是火种IntegerType
(列对象)。不过,有一些解决方法,让我们从示例数据开始:你能做什么,如果
df1
不是太大(意味着少量的categories
每一个都有很多潜在的价值category
),是转换df1
添加到列表并创建if-elif-elif。。。基于其值的条件:注:这是如果
c
以及l
是整数,如果是字符串,则:现在我们可以应用条件:
如果
df1
大了就可以自己加入了df2
建立在一个lag
列:首先我们要把
values
从df1
至df2
使用联接:如果
df1
不是太大,你应该broadcast
信息技术:现在我们将枚举每个
partition
建立一个lag
列:注意:您的电脑有问题
runs
滞后值,例如catagory=2
它只是滞后1
而不是2
例如。还有一些行具有相同的顺序(例如,示例Dataframe中的最后两行)df2
有相同的category, year, month and weeknumber
)在您的Dataframe中,由于涉及洗牌,所以每次运行代码时可能会得到不同的结果。