我正在尝试解析来自kafka主题的web服务器日志。下面是脚本
from pyspark import SparkContext
from pyspark.sql import SparkSession
print("Kafka App launched")
spark = SparkSession.builder.master("spark://mymaster:7078").appName("django_logstash").getOrCreate()
# df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "myip:9092").option("subscribe", "session-event").option("maxOffsetsPerTrigger", 10).option("partition.assignment.strategy", "range").load()
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "myip:9092").option("subscribe", "django-logs").option("startingOffsets", "latest").load()
result =df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
reuslt.printSchema()
print("Kafka Straming output is",result)
query = result.writeStream.outputMode("append").format("console").trigger(processingTime='30 seconds').start()
query.awaitTermination()
在这里,日志在批处理中被解析为字符串
Batch 1
Key value
---- ------
Null "GET /favicon.ico HTTP/1.1" 404 2737"
Null "GET /hydjobs HTTP/1.1" 301 0"
预期:我需要通过创建一个新的模式从上面的日志创建一个Dataframe。但由于上述日志包含不同的非静态事件,在这种情况下,为上述日志定义新模式的方法是什么?
感谢你的帮助?
谢谢
暂无答案!
目前还没有任何答案,快来回答吧!