Need to replace column values in Scala Spark

nuypyhwy  asked 6 months ago  in Scala

I have JSON data stored as a string column in a Spark DataFrame, and it contains some bad data, for example:

{"name":"neo",
 "age":"22",
 "city""nowhere",
 "country":""}

I need to transform the column value so that it reads "city":"nowhere" (the ':' after "city" is missing). I tried the following, but it does not work as expected. What should I change to make it work?

val regex = """(".*?")(".*?")"""
df.withColumn("updated_col", regexp_replace(col("value"), regex, "$1:$2"))


gcxthw6b1#

Check the following solution:

scala> df
  .withColumn(
    "updated",
    regexp_replace(
      $"data",
      """([^:])("")""",
      """$1\":\""""
    )
  )
  .show(false)
+------------------------------------------------------+-------------------------------------------------------+
|data                                                  |updated                                                |
+------------------------------------------------------+-------------------------------------------------------+
|{"name":"neo","age":"22","city""nowhere","country":""}|{"name":"neo","age":"22","city":"nowhere","country":""}|
+------------------------------------------------------+-------------------------------------------------------+
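The ([^:]) guard keeps the legitimately empty value in "country":"" from being rewritten, and capturing the preceding character as $1 stops it (the 'y' of "city") from being swallowed by the replacement. As a quick sanity check (a sketch, not part of the original answer), the repaired string can be fed to from_json, which in the default PERMISSIVE mode returns null for anything that is still malformed:

scala> import org.apache.spark.sql.functions.{from_json, regexp_replace}
scala> import org.apache.spark.sql.types.{MapType, StringType}

scala> df
  .withColumn("updated", regexp_replace($"data", """([^:])("")""", """$1\":\""""))
  .withColumn("parsed", from_json($"updated", MapType(StringType, StringType)))  // null if still malformed
  .select("updated", "parsed")
  .show(false)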



2lpgd9682#

Due to a limitation of Spark's regexp_replace (or perhaps a limitation in my understanding of it), the only way I managed to do this was a two-step process using a temporary unique delimiter, as shown below:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Create the session first so that its implicits can be imported
val spark = SparkSession.builder().appName("JsonFix").master("local[1]").getOrCreate()
import spark.implicits._

val df = Seq(
  (1, """{"name":"neo", "age":"22", "city""nowhere", "country":""}""")
).toDF("id", "value")

df.show(false)

// Step 1: introduce a temporary unique delimiter to isolate the broken "city" segment
val tmpDelimiter = "___TMP___"
val df1 = df.withColumn("value", regexp_replace(col("value"), "\"city\"\"", "\"city\"" + tmpDelimiter + "\""))

// Step 2: add the missing ':' for "city" by replacing the temporary delimiter
val df2 = df1.withColumn("value", regexp_replace(col("value"), "\"city\"" + tmpDelimiter, "\"city\":"))
df2.show(false)

I would also like to know about other ways of handling this.
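For what it is worth, a single-step version appears to be possible (a sketch, not from the original answer): Spark's regexp_replace follows Java regex semantics, so the replacement string can carry $1-style backreferences, which lets the missing colon be inserted in one pass:

// One-step variant: capture the "city" key and re-insert the missing ':'
val dfFixed = df.withColumn(
  "value",
  regexp_replace(col("value"), "(\"city\")\"", "$1:\"")
)
dfFixed.show(false)

This should yield the same corrected string as the two-step version above.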


gcxthw6b3#

Below is code using Spark's JSON and map functions; I have tested it. It parses the JSON string into a map and prepends the missing "city" entry. Note that the input used here simply omits the malformed "city""nowhere" pair, since from_json cannot parse the broken original string:

import spark.implicits._
import org.apache.spark.sql.functions.{from_json, col, map_concat, expr, to_json}
import org.apache.spark.sql.types.{MapType, StringType}

val jsonString = """{"name":"neo","age":"22","country":""}"""
val data = Seq((1, jsonString))
val df = data.toDF("id", "value")
df.show(truncate = false)

// Parse the JSON into a map<string,string>, prepend the missing "city" entry,
// and serialize the result back to a JSON string
val df2 = df.withColumn(
  "value",
  to_json(map_concat(expr("""map("city", "nowhere")"""), from_json(col("value"), MapType(StringType, StringType))))
)
df2.show(truncate = false)

Output:

+---+--------------------------------------+
|id |value                                 |
+---+--------------------------------------+
|1  |{"name":"neo","age":"22","country":""}|
+---+--------------------------------------+

+---+-------------------------------------------------------+
|id |value                                                  |
+---+-------------------------------------------------------+
|1  |{"city":"nowhere","name":"neo","age":"22","country":""}|
+---+-------------------------------------------------------+
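One caveat worth noting (an assumption about the data, not something the answer above relies on): since Spark 3.0, map_concat throws at runtime when its inputs share a key, so if some rows already contain a "city" entry, the duplicate-key policy has to be relaxed first, for example:

// Spark 3.x throws on duplicate map keys by default; LAST_WIN keeps the value
// from the right-most map passed to map_concat (here, the original parsed JSON)
spark.conf.set("spark.sql.mapKeyDedupPolicy", "LAST_WIN")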
