Spark: solving the following problem (transpose, pivot, cross join)

mutmk8jj · posted 2021-05-16 in Spark

Here is the input DataFrame:

df1_input = spark.createDataFrame([ \
    ("P1","A","B","C"), \
    ("P1","D","E","F"), \
    ("P1","G","H","I"), \
    ("P1","J","K","L") ], ["Person","L1B","B2E","J3A"])
df1_input.show()

+------+---+---+---+
|Person|L1B|B2E|J3A|
+------+---+---+---+
|    P1|  A|  B|  C|
|    P1|  D|  E|  F|
|    P1|  G|  H|  I|
|    P1|  J|  K|  L|
+------+---+---+---+

Here are the corresponding item descriptions:

df1_item_details = spark.createDataFrame([ \
    ("L1B","item Desc1","A","Detail Desc1"), \
    ("L1B","item Desc1","D","Detail Desc2"), \
    ("L1B","item Desc1","G","Detail Desc3"), \
    ("L1B","item Desc1","J","Detail Desc4"), \
    ("B2E","item Desc2","B","Detail Desc5"), \
    ("B2E","item Desc2","E","Detail Desc6"), \
    ("B2E","item Desc2","H","Detail Desc7"), \
    ("B2E","item Desc2","K","Detail Desc8"), \
    ("J3A","item Desc3","C","Detail Desc9"), \
    ("J3A","item Desc3","F","Detail Desc10"), \
    ("J3A","item Desc3","I","Detail Desc11"), \
    ("J3A","item Desc3","L","Detail Desc12")], ["Item","Item Desc","Detail","Detail Desc"])
df1_item_details.show()

+----+----------+------+-------------+
|Item| Item Desc|Detail|  Detail Desc|
+----+----------+------+-------------+
| L1B|item Desc1|     A| Detail Desc1|
| L1B|item Desc1|     D| Detail Desc2|
| L1B|item Desc1|     G| Detail Desc3|
| L1B|item Desc1|     J| Detail Desc4|
| B2E|item Desc2|     B| Detail Desc5|
| B2E|item Desc2|     E| Detail Desc6|
| B2E|item Desc2|     H| Detail Desc7|
| B2E|item Desc2|     K| Detail Desc8|
| J3A|item Desc3|     C| Detail Desc9|
| J3A|item Desc3|     F|Detail Desc10|
| J3A|item Desc3|     I|Detail Desc11|
| J3A|item Desc3|     L|Detail Desc12|
+----+----------+------+-------------+

And here is some standard information that needs to be plastered onto the final output:

df1_stdColumns = spark.createDataFrame([ \
    ("School","BMM"), \
    ("College","MSRIT"), \
    ("Workplace1","Blr"), \
    ("Workplace2","Chn")], ["StdKey","StdVal"])
df1_stdColumns.show()

+----------+------+
|    StdKey|StdVal|
+----------+------+
|    School|   BMM|
|   College| MSRIT|
|Workplace1|   Blr|
|Workplace2|   Chn|
+----------+------+

The expected output looks like this:

+--------+-----+--------------+-----+--------------+-----+---------------+--------+---------+------------+------------+
| Person | L1B | item Desc1   | B2E | item Desc2   | J3A | item Desc3    | School | College | Workplace1 | Workplace2 |
+--------+-----+--------------+-----+--------------+-----+---------------+--------+---------+------------+------------+
| P1     | A   | Detail Desc1 | B   | Detail Desc5 | C   | Detail Desc9  | BMM    | MSRIT   | Blr        | Chn        |
| P1     | D   | Detail Desc2 | E   | Detail Desc6 | F   | Detail Desc10 | BMM    | MSRIT   | Blr        | Chn        |
| P1     | G   | Detail Desc3 | H   | Detail Desc7 | I   | Detail Desc11 | BMM    | MSRIT   | Blr        | Chn        |
| P1     | J   | Detail Desc4 | K   | Detail Desc8 | L   | Detail Desc12 | BMM    | MSRIT   | Blr        | Chn        |
+--------+-----+--------------+-----+--------------+-----+---------------+--------+---------+------------+------------+

Can someone suggest an optimal approach? The input datasets are in the millions of rows. The code I currently have has been running for about 10 hours and is clearly not optimal. If possible, I am looking for some well-performing Spark (Python/Scala/SQL) code.
Edit: below is the code I have working, but it takes forever to finish when the input volume is in the millions.

from pyspark.sql.functions import monotonically_increasing_id
import pyspark.sql.functions as F
from pyspark.sql.functions import array, col, explode, lit, struct
from pyspark.sql.types import StructType, StructField, LongType
from pyspark.sql import DataFrame
from typing import Iterable 

# Databricks runtime 7.3 on spark 3.0.1 which supports AQE

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "1")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.shuffle.partitions", "10000")
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionNum","1")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "1")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "10KB")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "1B")

df1_input=df1_input.withColumn("RecordId", monotonically_increasing_id())
df1_input_2=df1_input

# Custom function to do transpose

def melt(df: DataFrame, id_vars: Iterable[str], value_vars: Iterable[str], var_name: str="variable", value_name: str="Value") -> DataFrame:
    # Create array<struct<variable: str, value: ...>>
    _vars_and_vals = array(*(
        struct(lit(c).alias(var_name), col(c).alias(value_name)) 
        for c in value_vars))
    # Add to the DataFrame and explode
    _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))
    cols = id_vars + [
            col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
    return _tmp.select(*cols)
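# For reference: Spark 3.4+ has a built-in DataFrame.unpivot that covers this; on this runtime
# (3.0.1) the custom melt helper above is still needed. Rough equivalent on 3.4+ (untested here):
# df1_input_2.unpivot(["RecordId", "Person"], ["L1B", "B2E", "J3A"], "Name", "Value")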

df9_ColsList=melt(df1_stdColumns, id_vars=['StdKey'], value_vars=df1_stdColumns.columns).filter("variable <>'StdKey'")
df9_ColsList=df9_ColsList.groupBy("variable").pivot("StdKey").agg(F.first("value")).drop("variable")
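# The same single-row frame can also be built with one pivot and no melt; df9_ColsList_alt is
# only an illustration and is not used below:
df9_ColsList_alt = df1_stdColumns.groupBy().pivot("StdKey").agg(F.first("StdVal"))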

df1_input_2=melt(df1_input_2, id_vars=['RecordId','Person'], value_vars=df1_input_2.columns).filter("variable != 'Person'").filter("variable != 'RecordId'").withColumnRenamed('variable','Name')

df_prevStepInputItemDets=(df1_input_2.join(df1_item_details,(df1_input_2.Name == df1_item_details.Item) & (df1_input_2.Value == df1_item_details.Detail)))

# Since pivot performs better when the columns are known in advance, sacrificing a collect to get them (pivot without an explicit column list was performing worse)

CurrStagePivotCols_tmp = df1_item_details.select("Item Desc").rdd.flatMap(lambda x: x).collect()
# De-duplicate while preserving order
CurrStagePivotCols = list(dict.fromkeys(CurrStagePivotCols_tmp))
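# A likely cheaper alternative: de-duplicate on the cluster and collect only the unique
# descriptions (the pivot column order may then differ from the order-preserving version above):
# CurrStagePivotCols = [r[0] for r in df1_item_details.select("Item Desc").distinct().collect()]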

df_prevStepInputItemDets = (df_prevStepInputItemDets
     .groupBy("RecordId", "Person")
     .pivot("Item Desc", CurrStagePivotCols)
     .agg(F.first("Detail Desc"))).drop("RecordId")

# combine codes and descriptions

# Add rowNumber to both dataframes so that they can be merged side-by-side

def add_rowNum(sdf): 
    new_schema = StructType(sdf.schema.fields + [StructField("RowNum", LongType(), False),])
    return sdf.rdd.zipWithIndex().map(lambda row: row[0] + (row[1],)).toDF(schema=new_schema)

ta = df1_input.alias('ta')
tb = df_prevStepInputItemDets.alias('tb')

ta = add_rowNum(ta)
tb = add_rowNum(tb)
df9_code_desc = tb.join(ta.drop("Katashiki"), on="RowNum",how='inner').drop("RowNum")
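# Alternative sketch (untested): RecordId already identifies each input row, so keeping it through
# the pivot and joining on it directly would avoid the RDD zipWithIndex round-trip. With df_joined
# standing for the melt+join result before the pivot step above:
# df_pivoted = (df_joined
#     .groupBy("RecordId", "Person")
#     .pivot("Item Desc", CurrStagePivotCols)
#     .agg(F.first("Detail Desc")))
# df9_code_desc = df1_input.join(df_pivoted.drop("Person"), on="RecordId", how="inner")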

# CrossJoin to plaster standard columns

df9_final=df9_code_desc.crossJoin(df9_ColsList).drop("RecordId")
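# df9_ColsList is a single row, so an explicit broadcast hint should let Spark plan a broadcast
# nested loop join for this cross join (automatic broadcasting was disabled above). Illustration
# only; df9_final_alt is not used below:
df9_final_alt = df9_code_desc.crossJoin(F.broadcast(df9_ColsList)).drop("RecordId")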

display(df9_final)

No answers yet.
