Is there a way to subtract column "a" in stream1 from column "b" in stream2?

Posted by j13ufse2 on 2021-06-06 in Kafka

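I am reading two streams (stream1 and stream2) from Kafka in Spark Structured Streaming (PySpark). I need to compute the difference between the offsets of stream1 and stream2.
I tried it like this: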

<class 'pyspark.sql.dataframe.DataFrame'>
root
|-- timestamp: timestamp (nullable = true)
|-- value: string (nullable = true)
|-- offset: double (nullable = true)
|-- string_val: string (nullable = true)
|-- ping: double (nullable = true)
|-- date: string (nullable = true)
|-- time: string (nullable = true)
|-- offset_v1: double (nullable = true)
|-- date_time: string (nullable = true)
|-- date_format: timestamp (nullable = true)

<class 'pyspark.sql.dataframe.DataFrame'>
|-- Mean: double (nullable = true)
|-- pingTime: timestamp (nullable = true)
|-- Std_Deviation: double (nullable = true)
|-- devTime: timestamp (nullable = true)
|-- offset_v2: double (nullable = true)
|-- upperBound: double (nullable = true)
|-- lowerBound: double (nullable = true)

stream2 = stream2.withColumn('difference',stream2.offset_v2-stream1.offset_v1)

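It throws an error:
pyspark.sql.utils.AnalysisException: u'resolved attribute(s) offset_v1#95 missing from upperBound#182,Std_Deviation#149,lowerBound#189,Mean#133,pingTime#129-T30000ms,devTime#144-T30000ms,offset_v2#155 in operator !Project [Mean#133, pingTime#129-T30000ms, Std_Deviation#149, devTime#144-T30000ms, offset_v2#155, upperBound#182, lowerBound#189, (offset_v2#155 - offset_v1#95) AS difference#233]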

guicsvcw


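As venki said, you first need to join the two streams so that you compare related rows. Do you have a column to join on? A date and an ID would be enough. Assuming both DataFrames have a column named join_col: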

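from pyspark.sql.functions import col

# Inner join the two streams on the shared key column (join_col is assumed to exist in both)
stream_final = stream1.join(stream2, 'join_col', 'inner')

# Now compute the difference by adding a new column 'offset_diff' (offset_v2 - offset_v1, as in the question):

stream_final = stream_final.withColumn('offset_diff', col('offset_v2') - col('offset_v1'))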
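The join-then-subtract logic can be illustrated without Spark. The following is a minimal plain-Python sketch, not how Spark implements joins: rows are dicts, `join_col` is the hypothetical key column from the answer, and `inner_join_offset_diff` is a hypothetical helper name. Each stream2 row is paired with every stream1 row that has the same key, rows without a partner on the other side are dropped (which is what an inner join does), and the difference is computed as offset_v2 - offset_v1, the direction the question asks for.

```python
from collections import defaultdict


def inner_join_offset_diff(rows1, rows2, key="join_col"):
    """Mimic an inner join of stream1 and stream2 on `key`, plus an offset_diff column.

    rows1 dicts carry 'offset_v1', rows2 dicts carry 'offset_v2'.
    Assumes the key is the only column name the two sides share.
    """
    # Index stream1 rows by their key value
    by_key = defaultdict(list)
    for r1 in rows1:
        by_key[r1[key]].append(r1)

    joined = []
    for r2 in rows2:
        # Pair each stream2 row with every stream1 row that has the same key;
        # keys present on only one side produce no output (inner join)
        for r1 in by_key.get(r2[key], []):
            row = {**r1, **r2}
            row["offset_diff"] = r2["offset_v2"] - r1["offset_v1"]
            joined.append(row)
    return joined


stream1_rows = [
    {"join_col": 1, "offset_v1": 10.0},
    {"join_col": 2, "offset_v1": 20.0},
    {"join_col": 3, "offset_v1": 5.0},  # no partner in stream2, dropped
]
stream2_rows = [
    {"join_col": 1, "offset_v2": 12.5},
    {"join_col": 2, "offset_v2": 18.0},
]

for row in inner_join_offset_diff(stream1_rows, stream2_rows):
    print(row["join_col"], row["offset_diff"])
# 1 2.5
# 2 -2.0
```

In Spark itself the equivalent is the `join` plus `withColumn` above; note that joining two streaming DataFrames (a stream-stream join) requires Spark 2.3 or later.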

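If you can't find a suitable column to join on, you have a problem: you would be comparing columns of different lengths, which I believe is what you are dealing with.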
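When the streams share no key, one common workaround (not part of the answer above) is to pair rows by event time: rows whose timestamps lie within some tolerance of each other are treated as related. Structured Streaming supports this as a stream-stream join with a time-range condition, usually combined with `withWatermark` so that old join state can be discarded. The plain-Python sketch below only illustrates the matching rule; the field name `ts`, the 30-second tolerance, and the helper name `time_range_offset_diff` are assumptions for illustration.

```python
from datetime import datetime, timedelta


def time_range_offset_diff(rows1, rows2, tolerance=timedelta(seconds=30)):
    """Pair stream1 and stream2 rows whose timestamps differ by at most `tolerance`.

    Hypothetical schema: rows1 dicts carry 'ts' and 'offset_v1',
    rows2 dicts carry 'ts' and 'offset_v2'. Every pair inside the window
    is emitted, like an inner join on a time-range condition.
    """
    pairs = []
    for r2 in rows2:
        for r1 in rows1:
            # Nested loop for clarity; a real engine would not compare every pair
            if abs(r2["ts"] - r1["ts"]) <= tolerance:
                pairs.append((r1["ts"], r2["ts"], r2["offset_v2"] - r1["offset_v1"]))
    return pairs


t0 = datetime(2021, 6, 6, 12, 0, 0)
stream1_rows = [
    {"ts": t0, "offset_v1": 1.0},
    {"ts": t0 + timedelta(minutes=5), "offset_v1": 4.0},  # too far from the stream2 row
]
stream2_rows = [{"ts": t0 + timedelta(seconds=10), "offset_v2": 1.5}]

print(time_range_offset_diff(stream1_rows, stream2_rows))
```

One row can match several rows on the other side, so the result is a set of matched pairs rather than a row-by-row subtraction of two columns.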
