pyspark -如何向ArrayType列添加新元素

svmlkihl  于 4个月前  发布在  Spark
关注(0)|答案(1)|浏览(90)

ItemStruct = StructType([StructField("BomId", StringType()), StructField("price", StringType())])
BomStruct = StructType([StructField("OrderId",StringType()), StructField("items", ArrayType(ItemStruct))])
sampledata_sof = [Row("123-A", [Row("Bom-11", 120), Row("Bom-12", 140)]), Row("100-A", [Row("Bom-23", 170), Row("Bom-24", 190)])]

dfSampleBom = spark.createDataFrame(spark.sparkContext.parallelize(sampledata_sof), BomStruct)
dfSampleBom.printSchema()
dfSampleBom.show()```

字符串
[Output from jupyter notebook](https://i.stack.imgur.com/XzBhG.png)
问:给定上述结构,如何实现以下内容?如果Bom-11在items中,则添加item Bom-99(price $99)。预期输出:OrderId = 123-A的行应在items列表中包含{Bom-99,99}。换句话说,希望生成并有条件地添加一个或几个元素到items ArrayType列中。
尝试使用

df.rdd.map(lambda x: generateItems(x))

型
但得到了错误 *pyspark.errors.exceptions.base。PySparkRuntimeError:[CONTEXT_ONLY_VALID_ON_DRIVER]似乎您正在尝试从广播变量、操作或转换引用SparkContext。SparkContext只能在驱动程序上使用,而不能在它在worker上运行的代码中使用。有关详细信息,请参阅SPARK-5063。*
DF中的项目数量为1000个,因此希望有一个可以激发的解决方案,可以本地分发和有效处理。(阅读UDF可能无法跨工作节点分发,因此不确定这是否是一个选项)
ztmd8pv5

ztmd8pv51#

您可以先使用filter来确定items是否有Bom-11,然后使用array_insertconcat将结构体插入到现有数组中。

Pyspark 3.4+

item_to_ingest = F.struct(F.lit('Bom-99').alias('BomId'), F.lit(99).alias('price'))

df = (dfSampleBom.select(
          'OrderId',
          F.when(F.size(F.filter('items', lambda x: x['BomId'] == 'Bom-11')) > 0, 
                 F.array_insert('items', -1, item_to_ingest))
          .otherwise(F.col('items')).alias('items')))

字符串

Pyspark 3.1+

item_to_ingest = F.struct(F.lit('Bom-99').alias('BomId'), F.lit(99).alias('price'))

df = (dfSampleBom.select(
          'OrderId',
          F.when(F.size(F.filter('items', lambda x: x['BomId'] == 'Bom-11')) > 0, 
                 F.concat('items', F.array(item_to_ingest)))
          .otherwise(F.col('items')).alias('items')))

相关问题