scala - How to replace the values of all columns starting with "stage_{col's}" with null when a condition is satisfied

gwo2fgha  posted on 2021-05-27  in Spark

I have a scenario where the final DataFrame, which is the result of joining the stage and base data, looks like this:

+-------------------+------------------------+---------------+-----------------------+------------+------------+------------+-----------------------+------------+------------+------------+
|ID_key             |ICC_key                 |suff_key       |stage_{timestamp}      |stage_{code}|stage_{dol1}|stage_{dol2}|final_{timestamp}      |final_{code}|final_{dol1}|final_{dol2}|
+-------------------+------------------------+---------------+-----------------------+------------+------------+------------+-----------------------+------------+------------+------------+
|222                |222                     |1              |2019-02-02 21:50:25.585|9123        |20.00       |1000.00     |2019-03-02 21:50:25.585|7123        |30.00       |200.00      |
|333                |333                     |1              |2020-03-03 21:50:25.585|8123        |30.00       |200.00      |2020-01-03 21:50:25.585|823         |30.00       |200.00      |
|444                |444                     |1              |2020-04-03 21:50:25.585|8123        |30.00       |200.00      |null                   |null        |null        |null        |
|555                |333                     |1              |null                   |null        |null        |null        |2020-05-03 21:50:25.585|813         |30.00       |200.00      |
|111                |111                     |1              |2020-01-01 21:50:25.585|A123        |10.00       |99.00       |null                   |null        |null        |null        |
+-------------------+------------------------+---------------+-----------------------+------------+------------+------------+-----------------------+------------+------------+------------+

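I am looking for logic that, for every row where final_{timestamp} > stage_{timestamp}, replaces the values of all columns whose names start with stage_{ with null.
The expected result is shown below: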

+-------------------+------------------------+---------------+-----------------------+------------+------------+------------+-----------------------+------------+------------+------------+
|ID_key             |ICC_key                 |suff_key       |stage_{timestamp}      |stage_{code}|stage_{dol1}|stage_{dol2}|final_{timestamp}      |final_{code}|final_{dol1}|final_{dol2}|
+-------------------+------------------------+---------------+-----------------------+------------+------------+------------+-----------------------+------------+------------+------------+
|222                |222                     |1              |null                   |null        |null        |null        |2019-03-02 21:50:25.585|7123        |30.00       |200.00      |
|333                |333                     |1              |2020-03-03 21:50:25.585|8123        |30.00       |200.00      |2020-01-03 21:50:25.585|823         |30.00       |200.00      |
|444                |444                     |1              |2020-04-03 21:50:25.585|8123        |30.00       |200.00      |null                   |null        |null        |null        |
|555                |333                     |1              |null                   |null        |null        |null        |2020-05-03 21:50:25.585|813         |30.00       |200.00      |
|111                |111                     |1              |2020-01-01 21:50:25.585|A123        |10.00       |99.00       |null                   |null        |null        |null        |
+-------------------+------------------------+---------------+-----------------------+------------+------------+------------+-----------------------+------------+------------+------------+

It would be great if you could help me with the logic.

ryhaxcpt #1

Check the code below.
Condition

scala> val expr = col("final_{timestamp}") > col("stage_{timestamp}")

Condition Matched

scala> val matched = df
                      .columns
                      .filter(_.startsWith("stage"))
                      .map(c => (when(expr,lit(null)).otherwise(col(c))).as(c))

Condition Not Matched

scala> val notMatched = df
                         .columns
                         .filter(!_.startsWith("stage"))
                         .map(c => col(c).as(c))

Combine Not Matched & Matched

scala> val allColumns = notMatched ++ matched

Final result

scala> df.select(allColumns:_*).show(false)
+------+-------+--------+-----------------------+------------+------------+------------+-----------------------+------------+------------+------------+
|ID_key|ICC_key|suff_key|final_{timestamp}      |final_{code}|final_{dol1}|final_{dol2}|stage_{timestamp}      |stage_{code}|stage_{dol1}|stage_{dol2}|
+------+-------+--------+-----------------------+------------+------------+------------+-----------------------+------------+------------+------------+
|222   |222    |1       |2019-03-02 21:50:25.585|7123        |30.00       |200.00      |null                   |null        |null        |null        |
|333   |333    |1       |2020-01-03 21:50:25.585|823         |30.00       |200.00      |2020-03-03 21:50:25.585|8123        |30.00       |200.00      |
|444   |444    |1       |null                   |null        |null        |null        |null                   |null        |null        |null        |
|555   |333    |1       |2020-05-03 21:50:25.585|813         |30.00       |200.00      |null                   |null        |null        |null        |
|111   |111    |1       |null                   |null        |null        |null        |null                   |null        |null        |null        |
+------+-------+--------+-----------------------+------------+------------+------------+-----------------------+------------+------------+------------+

t9eec4r0 #2

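The code is as follows:

import org.apache.spark.sql.functions.{col, lit, when}

// Select all stage columns from the DataFrame
val stageColumns = df.columns.filter(_.startsWith("stage"))

// Keep stage values when there is no final record or the final record is not newer
val keepStage = col("final_{timestamp}").isNull ||
  col("final_{timestamp}") <= col("stage_{timestamp}")

// Null out every stage column on rows where the condition above does not hold
val result = stageColumns.foldLeft(df) { (acc, c) =>
  acc.withColumn(c, when(keepStage, col(c)).otherwise(lit(null)))
}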
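
One subtlety with a fold over withColumn is that stage_{timestamp} is itself one of the columns being overwritten, so later iterations evaluate the condition against the already-updated value. A minimal sketch of one way to make the fold independent of column order is to compute the condition once into a temporary flag column and drop it at the end. The flag column name, the helper name nullifyStageWithFlag, the local SparkSession, and the cut-down sample data are assumptions made for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{coalesce, col, lit, when}

// Local session for the sketch; inside spark-shell, getOrCreate() returns the existing session.
val spark = SparkSession.builder().master("local[*]").appName("nullify-stage-fold").getOrCreate()
import spark.implicits._

// Cut-down sample data (assumption: string timestamps in one fixed-width format).
val sample = Seq[(String, Option[String], Option[String], Option[String])](
  ("222", Some("2019-02-02 21:50:25.585"), Some("9123"), Some("2019-03-02 21:50:25.585")),
  ("333", Some("2020-03-03 21:50:25.585"), Some("8123"), Some("2020-01-03 21:50:25.585")),
  ("444", Some("2020-04-03 21:50:25.585"), Some("8123"), None)
).toDF("ID_key", "stage_{timestamp}", "stage_{code}", "final_{timestamp}")

// Hypothetical helper: evaluate the condition once, then fold over the stage columns.
def nullifyStageWithFlag(df: DataFrame, flag: String = "__final_is_newer"): DataFrame = {
  // coalesce turns a null comparison (either timestamp missing) into false.
  val flagged = df.withColumn(
    flag,
    coalesce(col("final_{timestamp}") > col("stage_{timestamp}"), lit(false))
  )
  val stageColumns = df.columns.filter(_.startsWith("stage_"))

  // Each step reads only the precomputed flag, never the columns being overwritten.
  stageColumns
    .foldLeft(flagged) { (acc, c) =>
      acc.withColumn(c, when(col(flag), lit(null)).otherwise(col(c)))
    }
    .drop(flag)
}

nullifyStageWithFlag(sample).show(false)
```

Since withColumn replaces an existing column in place, the column order of the input is preserved once the flag is dropped.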

2hh7jdfx #3

PySpark solution:


from pyspark.sql import functions as F

# Test data
tst = sqlContext.createDataFrame(
    [
        (867, 0.12, 'G', '2020-07-01 17-49-32', '2020-07-02 17-49-32'),
        (430, 0.72, 'R', '2020-07-01 17-49-32', '2020-07-02 17-49-32'),
        (658, 0.32, 'A', '2020-07-01 17-49-32', '2020-06-01 17-49-32'),
        (157, 0.83, 'R', '2020-07-01 17-49-32', '2020-06-01 17-49-32'),
        (521, 0.12, 'G', '2020-07-01 17-49-32', '2020-08-01 17-49-32'),
        (867, 0.49, 'A', '2020-07-01 16-45-32', '2020-08-01 17-49-32'),
        (430, 0.14, 'G', '2020-07-01 16-45-32', '2020-07-01 17-49-32'),
        (867, 0.12, 'G', '2020-07-01 16-45-32', '2020-07-01 17-49-32'),
    ],
    schema=['stage_1', 'stage_2', 'RAG', 'timestamp', 'timestamp1'],
)

# Parse the strings with their full pattern, then truncate them to dates
ts_fmt = 'yyyy-MM-dd HH-mm-ss'
tst_format = (
    tst.withColumn('timestamp', F.to_date(F.to_timestamp('timestamp', ts_fmt)))
       .withColumn('timestamp1', F.to_date(F.to_timestamp('timestamp1', ts_fmt)))
)

# Split the columns into the 'stage_' ones and the rest
col_info = [x for x in tst_format.columns if 'stage_' in x]
col_orig = list(set(tst_format.columns) - set(col_info))

# Build the select expressions: keep stage values only where timestamp1 > timestamp
expr = col_orig + [
    F.when(F.col('timestamp1') > F.col('timestamp'), F.col(x)).otherwise(F.lit(None)).alias(x)
    for x in col_info
]

# Execute the query
tst_res = tst_format.select(*expr)
tst_res.show()

The result is:

+---+----------+----------+-------+-------+
|RAG|timestamp1| timestamp|stage_1|stage_2|
+---+----------+----------+-------+-------+
|  G|2020-07-02|2020-07-01|    867|   0.12|
|  R|2020-07-02|2020-07-01|    430|   0.72|
|  A|2020-06-01|2020-07-01|   null|   null|
|  R|2020-06-01|2020-07-01|   null|   null|
|  G|2020-08-01|2020-07-01|    521|   0.12|
|  A|2020-08-01|2020-07-01|    867|   0.49|
|  G|2020-07-01|2020-07-01|   null|   null|
|  G|2020-07-01|2020-07-01|   null|   null|
+---+----------+----------+-------+-------+
