PySpark RDD: the number of columns doesn't match

qkf9rpyu posted on 2021-05-27 in Spark

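I want to use PySpark to build a DataFrame in which one column is the SipHash of two other columns of the dataset. To do this, I wrote a function that is called from rdd.map(), as follows: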

import siphash
from pyspark.sql import Row
from pyspark.sql import SQLContext
from pyspark.sql.types import *

sqlContext = SQLContext( spark )

# Hashing function

def hash_two_columns( row ):
    # Transform row to a dict
    row_dict = row.asDict()
    # Concat col1 and col2
    concat_str = 'E'.join( [str(row_dict['col1']), str(row_dict['col2'])] )
    # Fill string with 0 to get 16 bytes (otherwise error is raised)
    sixteenBytes_str = concat_str.zfill(16)
    # Preserve concatenated value for testing (this can be removed later)
    row_dict["hashcols_str"] = sixteenBytes_str
    # Calculate siphash
    row_dict["hashcols_id"] = siphash.SipHash_2_4( sixteenBytes_str.encode('utf-8') ).hash()
    return Row(**row_dict )

# Create test dataframe

test_df = spark.createDataFrame([
         (1,"text1",58965,11111),
         (3,"text2",78652,888888),
         (4,"text3",78652,888888),              
    ], ("id","item","col1","col2"))

# Build the schema

# Using this to avoid "ValueError: Some of types cannot be determined by the first 100 rows" when pyspark tries to infer the schema by itself

test_df_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("item", StringType(), True),
    StructField("col1", IntegerType(), True),
    StructField("col2", IntegerType(), True),
    StructField("hashcols_str", StringType(), True),
    StructField("hashcols_id", LongType(), True)
])

# Create the final Dataframe

final_test_df = sqlContext \
     .createDataFrame(
          test_df.rdd.map(hash_two_columns).collect(), 
          test_df_schema) \
     .toDF()

final_test_df.show(truncate=False)

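Although the schema definition matches the structure of the final DataFrame, running this code fails with the following error:
IllegalArgumentException: requirement failed: The number of columns doesn't match. Old column names (6): id, item, col1, col2, hashcols_str, hashcols_id New column names (0): (java.lang.RuntimeException)
Does anyone know how to implement this correctly? Many thanks for your support.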

8nuwlpux #1

I found a solution based on this post:
Update the function this way:

def hash_two_columns( col1, col2 ):
    # Concat col1 and col2 (cast to str first: the test columns hold integers)
    concat_str = 'E'.join( [str(col1), str(col2)] )
    # Fill string with 0 to get 16 bytes (otherwise error is raised)
    sixteenBytes_str = concat_str.zfill(16)
    # Calculate siphash
    hashcols_id = siphash.SipHash_2_4( sixteenBytes_str.encode('utf-8') ).hash()
    return hashcols_id
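
Two details of this function can be checked in plain Python, without Spark or the siphash package. The sketch below is only an illustration: `make_key` and `to_signed_64` are hypothetical helper names, not part of the original code. It assumes that `.hash()` returns an unsigned 64-bit integer (SipHash-2-4 produces a 64-bit output), which may not always fit Spark's signed 64-bit LongType.

```python
def make_key(col1, col2, width=16):
    """Join two values with 'E' and left-pad with zeros to `width` characters."""
    # str() is required because the test columns hold integers, not strings.
    joined = "E".join([str(col1), str(col2)])
    padded = joined.zfill(width)
    # zfill only pads, it never truncates: longer inputs stay longer,
    # so the result is checked explicitly.
    if len(padded.encode("utf-8")) != width:
        raise ValueError(f"key is not {width} bytes: {padded!r}")
    return padded


def to_signed_64(value):
    """Reinterpret an unsigned 64-bit integer as a signed 64-bit integer."""
    # Assumption: `value` lies in the range 0 .. 2**64 - 1.
    return value - (1 << 64) if value >= (1 << 63) else value


print(make_key(58965, 11111))      # 0000058965E11111
print(to_signed_64(2**64 - 1))     # -1
```

If wider values are possible in col1 or col2, the padded string can exceed 16 bytes, so a length check like the one above may be worth keeping.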

Then use a UDF (user-defined function) to add the new column to the DataFrame with the withColumn function.

from pyspark.sql.functions import udf
from pyspark.sql.types import LongType

example_udf = udf( hash_two_columns, LongType() )

test_df = test_df \
    .withColumn( "hashcols_id", example_udf( test_df.col1, test_df.col2 ) )

test_df.show()
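
For reference, the error in the question looks consistent with the trailing `.toDF()` call: with no arguments, it asks Spark to rename the six existing columns to zero new names, which matches "Old column names (6) ... New column names (0)". Below is a minimal sketch of the RDD route without that call. It assumes `spark` is a SparkSession; `build_hashed_df` is a hypothetical helper name, not something from the original code.

```python
def build_hashed_df(spark, source_df, row_fn, schema):
    """Map every Row of source_df with row_fn and rebuild a DataFrame.

    Assumptions: `spark` is a SparkSession, `row_fn` turns one Row into
    one Row (or tuple) matching `schema`, and `schema` is a StructType.
    """
    # createDataFrame accepts an RDD directly, so collect() is not needed.
    mapped_rdd = source_df.rdd.map(row_fn)
    # No trailing .toDF(): the explicit schema already names every column.
    return spark.createDataFrame(mapped_rdd, schema)
```

With the definitions from the question, this would be called as `build_hashed_df(spark, test_df, hash_two_columns, test_df_schema)`, using the row-based `hash_two_columns` from the question rather than the two-argument version above. The UDF approach is usually simpler when only one column is added.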
