如何从kafka主题为pyspark structured streaming中的web服务器日志创建Dataframe?

zi8p0yeb  于 2021-05-26  发布在  Spark
关注(0)|答案(0)|浏览(166)

我正在尝试解析来自kafka主题的web服务器日志。下面是脚本

from pyspark import SparkContext
from pyspark.sql import SparkSession

print("Kafka App launched")
spark = SparkSession.builder.master("spark://mymaster:7078").appName("django_logstash").getOrCreate()

# df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "myip:9092").option("subscribe", "session-event").option("maxOffsetsPerTrigger", 10).option("partition.assignment.strategy", "range").load()

df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "myip:9092").option("subscribe", "django-logs").option("startingOffsets", "latest").load()

result =df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

reuslt.printSchema()

print("Kafka Straming output is",result)
query = result.writeStream.outputMode("append").format("console").trigger(processingTime='30 seconds').start()
query.awaitTermination()

在这里,日志在批处理中被解析为字符串

Batch 1
Key     value
----    ------
Null    "GET /favicon.ico HTTP/1.1" 404 2737"
Null    "GET /hydjobs HTTP/1.1" 301 0"

预期:我需要通过创建一个新的模式从上面的日志创建一个Dataframe。但由于上述日志包含不同的非静态事件,在这种情况下,为上述日志定义新模式的方法是什么?

感谢你的帮助?
谢谢

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题