基于spark中的移动和向dataframe添加批号

x9ybnkn6  于 2021-07-12  发布在  Spark
关注(0)|答案(2)|浏览(447)

我有一个需要批处理的数据集(由于api限制)。
一批的列文本长度之和不能超过1000。并且一批中的最大行数不能大于5。
为此,我想在单个批次中添加批次号,以便以后根据批次号处理数据。
如何在pyspark(databricks)中实现这一点。我对这一切都很陌生,我甚至不知道在网上找什么。
我真的很感谢你的帮助。
下表说明了我正在努力实现的目标:
原始表格
idtext长度15002400320043005100610071008100910010300
结果表
idtext长度批号150012400132002430025100261002710028100391003103003

pvcm50d1

pvcm50d11#

如果您不是在寻找最佳解决方案,而是在spark中寻找一种不太复杂的解决问题的方法,我们可以将问题分为两个步骤:
将数据分成块,每个块有5行,忽略文本长度
如果一个块中的文本长度之和太大,请将此块拆分为其他块
此解决方案不是最佳的,因为它产生过多的批次。
步骤1可以使用zipwithindex实现。在创建批id时,我们为以后划分批留下了足够的“空间”。此步骤结束时,一个块中的所有行都被分组到一个列表中,作为步骤2的输入:

df = ...

r = df.rdd.zipWithIndex().toDF() \
    .select("_1.id", "_1.text_length", "_2") \
    .withColumn("batch", F.expr("cast(_2 / 5 as long)*5")) \
    .withColumn("data", F.struct("id", "text_length", "batch")) \
    .groupBy("batch") \
    .agg(F.collect_list("data").alias("data"))

第2部分主要由一个udf组成,它检查在一个批中是否超过了最大文本长度。如果是这样,则以下元素的批处理id将增加一个。由于我们在第1部分中跳过了足够多的批处理ID,因此不会出现任何冲突。

def splitBatchIfNecessary(data):
    text_length = 0
    batch = -1
    for d in data:
        text_length = text_length + d.text_length
        if text_length > 1000:
          if batch == -1:
            text_length = 0
            batch = d.batch + 1
            yield (d.id, d.text_length, d.batch)
          else:
            text_length = d.text_length
            batch = batch + 1
            yield (d.id, d.text_length, batch)          
        else:
          if batch == -1:
            batch = d.batch
          yield (d.id, d.text_length, batch)

schema=r.schema["data"].dataType
split_udf = F.udf(splitBatchIfNecessary, schema)

r = r.withColumn("data",split_udf(F.col("data")) ) \
      .selectExpr("explode(data)") \
      .select("col.*")

输出:

+---+-----------+-----+                                                         
| id|text_length|batch|
+---+-----------+-----+
|  1|        500|    0|
|  2|        400|    0|
|  3|        200|    1|
|  4|        300|    1|
|  5|        100|    1|
|  6|        100|    5|
|  7|        100|    5|
|  8|        100|    5|
|  9|        100|    5|
| 10|        300|    5|
+---+-----------+-----+

可能的优化是 zipWithIndex 通过zipWithUniqueID(但得到的是稍微“不完整”的批处理)或使用矢量化的udf。

hjqgdpho

hjqgdpho2#

不像mck所说的,这不是“划分问题”。
问题是,1)spark与分区一起工作-不只是一个这样的分区才能有效;2)没有一个分组属性来确保“批”可以自然形成或仅在分区内自然提取。此外,我们能有负数或分数吗这是没有说明的。但最多有5个条目。
这意味着处理只需要基于一个分区,但它可能不够大。
尝试处理每个分区是没有意义的,因为所有的工作都需要在每个分区n、n+1等中完成,以抵消分区n-1中的影响。我已经在这里制定了一个解决方案,以便将分区边界考虑在内,但这违反了spark的原则,而且用例更加简单。
实际上不是spark用例。它是一种相对于并行算法的顺序算法,使用pl/sql、scala、java、c++。
唯一的办法是:
在已全局应用zipwithindex的固定大小分区上循环(为了安全)
scala成批处理-临时结果
从上一个创建的批处理中获取所有项并与下一个分区合并
从临时结果中删除最后一批
重复循环
注意:绕过数据的分区边界方面的近似方法似乎不起作用-->另一个答案事实上证明了这一点。你得到的是一个折衷的结果,而不是真正的答案。而且要纠正这一点并不容易,因为批处理有间隙,并且可能由于分组而在其他分区中。

相关问题