How do I convert text logs that partly contain JSON strings into structured logs in PySpark?

50pmv0ei · posted 2021-05-19 in Spark

I'm trying to create a DataFrame from unstructured logs where part of each line contains JSON:

2020-09-24T08:03:01.633Z 10.1.20.1 {"EventTime":"2020-09-24 13:33:01","sourcename":"local","Keys":-9serverkey,"Type":"status"}
2020-09-24T08:03:01.633Z 10.1.20.1 {"EventTime":"2020-09-24 13:34:01","sourcename":"local","Keys":-9serverkey,"Type":"status"}
2020-09-24T08:03:01.633Z 10.1.20.1 {"EventTime":"2020-09-24 13:35:01","sourcename":"local","Keys":-9serverkey,"Type":"status"}

I tried:

from pyspark.sql import Row

rdd = session.sparkContext.textFile("F:\\mypath\\rdd_test_log.txt")

dataFrame = rdd.map(lambda data: Row(time=data.split(" ")[0],
                                     ip=data.split(" ")[1],
                                     EventTime=data.split(":")[2])).toDF()

The result is:

+------------------------------+---------+------------------------+
|EventTime                     |ip       |time                    |
+------------------------------+---------+------------------------+
|01.633Z 10.1.20.1 {"EventTime"|10.1.20.1|2020-09-24T08:03:01.633Z|
|01.633Z 10.1.20.1 {"EventTime"|10.1.20.1|2020-09-24T08:03:01.633Z|
|01.633Z 10.1.20.1 {"EventTime"|10.1.20.1|2020-09-24T08:03:01.633Z|
+------------------------------+---------+------------------------+
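
The EventTime column comes out wrong because the line is split on every ":", so index 2 lands inside the timestamp rather than in the JSON part. A quick check in plain Python on one of the sample lines above:

line = '2020-09-24T08:03:01.633Z 10.1.20.1 {"EventTime":"2020-09-24 13:33:01","sourcename":"local","Keys":-9serverkey,"Type":"status"}'

# index 2 of the ":"-split is '01.633Z 10.1.20.1 {"EventTime"', matching the bad output above
print(line.split(":")[2])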

Expected output:

time                     |ip        |eventtime          |sourcename|Keys        |Type
2020-09-24T08:03:01.633Z |10.1.20.1 |2020-09-24 13:33:01|local     |-9serverkey |status

So how can I parse this JSON string within the RDD? Or what approach should I take?
Thanks for your help.


xfyts7mz 1#

You can use find('{') on the string to get the index where the JSON text starts, take the substring from there, and then parse that JSON.
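
For a single line, the idea looks like this in plain Python (just an illustration; the variable names are made up):

line = '2020-09-24T08:03:01.633Z 10.1.20.1 {"EventTime":"2020-09-24 13:33:01","sourcename":"local","Keys":-9serverkey,"Type":"status"}'

time_part, ip_part = line.split(" ")[0], line.split(" ")[1]
# everything from the first "{" onwards is the JSON payload
json_part = line[line.find("{"):]

Applied to the RDD from the question, the full pipeline looks like this: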

from pyspark.sql import Row, functions as f

dataFrame = (
    # pair the space-split tokens with the raw line so the JSON can be sliced out of it
    rdd.map(lambda l: (l.split(" "), l))
    .map(
        lambda data: Row(
            time=data[0][0], ip=data[0][1], EventTime=data[1][data[1].find("{") :]
        )
    )
    .toDF()
    .select(
        "time",
        "ip",
        # quote the bare Keys value so the JSON becomes valid
        f.regexp_replace(f.col("EventTime"), '"Keys":(.*),', '"Keys":"$1",').alias(
            "EventTime"
        ),
    )
)

dataFrame.show(1, False)

which shows:

+------------------------+---------+---------------------------------------------------------------------------------------------+
|time                    |ip       |EventTime                                                                                    |
+------------------------+---------+---------------------------------------------------------------------------------------------+
|2020-09-24T08:03:01.633Z|10.1.20.1|{"EventTime":"2020-09-24 13:33:01","sourcename":"local","Keys":"-9serverkey","Type":"status"}|
+------------------------+---------+---------------------------------------------------------------------------------------------+

Then you can parse EventTime into a struct that can be expanded further into separate columns:

from pyspark.sql.types import StructType, StructField, StringType

parsed = dataFrame.select(
    "time",
    "ip",
    f.from_json(
        "EventTime",
        StructType(
            [
                StructField("EventTime", StringType()),
                StructField("sourcename", StringType()),
                StructField("Keys", StringType()),
                StructField("Type", StringType()),
            ]
        ),
    ).alias("eventdetails"),
)

Now create separate columns from the parsed struct:

parsed = (
    parsed.withColumn("eventtime", parsed["eventdetails"].getItem("EventTime"))
    .withColumn("sourcename", parsed["eventdetails"].getItem("sourcename"))
    .withColumn("Keys", parsed["eventdetails"].getItem("Keys"))
    .withColumn("Type", parsed["eventdetails"].getItem("Type"))
    .drop("eventdetails")
)

parsed.show()

which gives:

+--------------------+---------+-------------------+----------+-----------+------+
|                time|       ip|          eventtime|sourcename|       Keys|  Type|
+--------------------+---------+-------------------+----------+-----------+------+
|2020-09-24T08:03:...|10.1.20.1|2020-09-24 13:33:01|     local|-9serverkey|status|
|2020-09-24T08:03:...|10.1.20.1|2020-09-24 13:34:01|     local|-9serverkey|status|
|2020-09-24T08:03:...|10.1.20.1|2020-09-24 13:35:01|     local|-9serverkey|status|
+--------------------+---------+-------------------+----------+-----------+------+

Note that I'm assuming your JSON is meant to be valid. "Keys":-9serverkey is not a valid key/value pair, which is why the regexp_replace above rewrites it as "Keys":"-9serverkey".
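
You can verify this quickly with Python's built-in json module (a minimal sketch; both sample strings are taken from the log line above):

import json

bad = '{"EventTime":"2020-09-24 13:33:01","sourcename":"local","Keys":-9serverkey,"Type":"status"}'
good = '{"EventTime":"2020-09-24 13:33:01","sourcename":"local","Keys":"-9serverkey","Type":"status"}'

try:
    json.loads(bad)
except json.JSONDecodeError as e:
    # fails: -9serverkey is not a valid JSON value
    print("invalid:", e)

# parses fine once the value is quoted
print(json.loads(good))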


l5tcr1uw 2#

Replace the RDD with a DataFrame and use text to read your file:

df = spark.read.text("F:\\mypath\\rdd_test_log.txt")

df.show()
+--------------------+
|               value|
+--------------------+
|2020-09-24T08:03:...|
|2020-09-24T08:03:...|
|2020-09-24T08:03:...|
+--------------------+

Then split each line into at most three pieces and keep the JSON part as text (the limit matters because the JSON itself contains spaces):


# Version Spark >= 3

from pyspark.sql import functions as F

df = df.withColumn("values", F.split(F.col("value"), " ", limit=3)).select(
    F.col("values").getItem(0).alias("time"),
    F.col("values").getItem(1).alias("IP"),
    F.col("values").getItem(2).alias("JSON"),
)

# OR

# Version Spark <= 2.4

from pyspark.sql import functions as F, types as T

udf_split = F.udf(lambda x: x.split(" ", 2), T.ArrayType(T.StringType()))

df = df.withColumn("values", udf_split(F.col("value"))).select(
    F.col("values").getItem(0).alias("time"),
    F.col("values").getItem(1).alias("IP"),
    F.col("values").getItem(2).alias("JSON"),
)

df.show()
+--------------------+---------+--------------------+
|                time|       IP|                JSON|
+--------------------+---------+--------------------+
|2020-09-24T08:03:...|10.1.20.1|{"EventTime":"202...|
|2020-09-24T08:03:...|10.1.20.1|{"EventTime":"202...|
|2020-09-24T08:03:...|10.1.20.1|{"EventTime":"202...|
+--------------------+---------+--------------------+

Fix the JSON:

df = df.withColumn(
    "JSON", F.regexp_replace(F.col("JSON"), r'"Keys":([^,]+)', '"Keys":"$1"')
)
df.show(truncate=False)
+------------------------+---------+---------------------------------------------------------------------------------------------+
|time                    |IP       |JSON                                                                                         |
+------------------------+---------+---------------------------------------------------------------------------------------------+
|2020-09-24T08:03:01.633Z|10.1.20.1|{"EventTime":"2020-09-24 13:33:01","sourcename":"local","Keys":"-9serverkey","Type":"status"}|
|2020-09-24T08:03:01.633Z|10.1.20.1|{"EventTime":"2020-09-24 13:34:01","sourcename":"local","Keys":"-9serverkey","Type":"status"}|
|2020-09-24T08:03:01.633Z|10.1.20.1|{"EventTime":"2020-09-24 13:35:01","sourcename":"local","Keys":"-9serverkey","Type":"status"}|
+------------------------+---------+---------------------------------------------------------------------------------------------+

Then you just have to convert the JSON string into a struct type.

from pyspark.sql import types as T  # needed if you took the Spark >= 3 branch, which only imported F

json_struct = T.StructType(
    [
        T.StructField("EventTime", T.StringType()),
        T.StructField("sourcename", T.StringType()),
        T.StructField("Keys", T.StringType()),
        T.StructField("Type", T.StringType()),
    ]
)

df = df.withColumn("JSON", F.from_json("JSON", json_struct))
df.select("time", "IP", "JSON.*").show()

+--------------------+---------+-------------------+----------+-----------+------+
|                time|       IP|          EventTime|sourcename|       Keys|  Type|
+--------------------+---------+-------------------+----------+-----------+------+
|2020-09-24T08:03:...|10.1.20.1|2020-09-24 13:33:01|     local|-9serverkey|status|
|2020-09-24T08:03:...|10.1.20.1|2020-09-24 13:34:01|     local|-9serverkey|status|
|2020-09-24T08:03:...|10.1.20.1|2020-09-24 13:35:01|     local|-9serverkey|status|
+--------------------+---------+-------------------+----------+-----------+------+
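
If you would rather not write json_struct by hand, one option (assuming Spark >= 2.4, where schema_of_json is available) is to infer the schema from one sample JSON string and pass the resulting DDL string to from_json. A sketch, with the sample literal taken from the example data, to run in place of the hand-written schema above:

sample = '{"EventTime":"2020-09-24 13:33:01","sourcename":"local","Keys":"-9serverkey","Type":"status"}'

# schema_of_json infers a DDL-formatted schema string from a literal JSON value
inferred = spark.range(1).select(F.schema_of_json(F.lit(sample)).alias("ddl")).first()["ddl"]

df = df.withColumn("JSON", F.from_json("JSON", inferred))
df.select("time", "IP", "JSON.*").show()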
