Reading Avro messages from Kafka in Spark Streaming / Structured Streaming

yfwxisqw  posted on 2021-06-06 in Kafka

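I am using PySpark for the first time. Spark version: 2.3.0, Kafka version: 2.2.0.
I have a Kafka producer that sends nested data in Avro format. I am trying to write Spark Streaming / Structured Streaming code in PySpark that deserializes the Avro messages from Kafka into a DataFrame, applies transformations, and writes the result to S3 in Parquet format. I was able to find Avro converters for Spark/Scala, but support in PySpark has not been added yet. How can I do the same conversion in PySpark? Thanks.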

f87krz0w  #1

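As you mentioned, there is no equivalent ready-made library for reading Avro messages from Kafka and parsing them in PySpark. However, we can write a small wrapper that parses the Avro messages and call that wrapper from the PySpark streaming code, as shown below.
Reference: Pyspark 2.4.0, read avro from kafka with read stream - Python
Note: Avro has been a built-in but external data source module since Spark 2.4, so the approach below needs Spark 2.4 or later rather than the 2.3.0 mentioned in the question. Please deploy the application as described in the deployment section of the "Apache Avro Data Source Guide".
Reference: https://spark-test.github.io/pyspark-coverage-site/pyspark_sql_avro_functions_py.html
spark-submit (the example below starts the interactive pyspark shell; the same --packages flag applies to spark-submit):
[Adjust the package versions to match your installed Spark/Avro versions]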

/usr/hdp/2.6.1.0-129/spark2/bin/pyspark --packages org.apache.spark:spark-avro_2.11:2.4.3 --conf spark.ui.port=4064
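
The package coordinate in the command above follows the pattern spark-avro_<Scala version>:<Spark version>. As a minimal sketch of how to adjust it for another installation, the helper below (spark_avro_package is a hypothetical name, not part of Spark) builds the coordinate from the two versions:

```python
def spark_avro_package(scala_version, spark_version):
    """Build the Maven coordinate of the spark-avro module.

    Hypothetical helper for illustration only: the Scala binary version
    (e.g. "2.11") and the Spark version (e.g. "2.4.3") must match the
    Spark build that is actually installed on the cluster.
    """
    return "org.apache.spark:spark-avro_{}:{}".format(scala_version, spark_version)


if __name__ == "__main__":
    # Coordinate to pass to --packages for a Scala 2.11 build of Spark 2.4.3
    print(spark_avro_package("2.11", "2.4.3"))
```

The resulting string can be passed to --packages as in the command above, or set through the spark.jars.packages configuration property.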

PySpark streaming code:

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.column import Column, _to_java_column
from pyspark.sql.functions import col, struct, udf
from pyspark.streaming import StreamingContext

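# Spark session (entry point for Structured Streaming).
# The StreamingContext belongs to the older DStream API;
# readStream/writeStream do not depend on it.

spark = SparkSession.builder.appName('streamingdata').getOrCreate()
sc = spark.sparkContext
ssc = StreamingContext(sc, 20)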

# Kafka Topic Details :

KAFKA_TOPIC_NAME_CONS = "topicname"
KAFKA_OUTPUT_TOPIC_NAME_CONS = "topic_to_hdfs"
KAFKA_BOOTSTRAP_SERVERS_CONS = 'localhost.com:9093'

# Creating  readstream DataFrame :

df = spark.readStream \
     .format("kafka") \
     .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS_CONS) \
     .option("subscribe", KAFKA_TOPIC_NAME_CONS) \
     .option("startingOffsets", "latest") \
     .option("failOnDataLoss" ,"false")\
     .option("kafka.security.protocol","SASL_SSL")\
     .option("kafka.client.id" ,"MCI-CIL")\
     .option("kafka.sasl.kerberos.service.name","kafka")\
     .option("kafka.ssl.truststore.location", "/path/kafka_trust.jks") \
     .option("kafka.ssl.truststore.password", "changeit") \
     .option("kafka.sasl.kerberos.keytab","/path/bdpda.headless.keytab") \
     .option("kafka.sasl.kerberos.principal","bdpda") \
     .load()

# Keep the Kafka value as binary: from_avro expects the raw Avro bytes
# (BinaryType), so do not cast the column to STRING.
df1 = df.select("value")

# Avro deserialization helper.
# It wraps the JVM from_avro that ships with the spark-avro package for Spark 2.4.
# (On Spark 3.x, pyspark.sql.avro.functions.from_avro can be used instead.)

from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column

def from_avro(col):
    # Avro schema of the message value; replace with the producer's schema.
    jsonFormatSchema = """
    {
      "type": "record",
      "name": "struct",
      "fields": [
        {"name": "col1", "type": "long"},
        {"name": "col2", "type": "string"}
      ]
    }"""
    sc = SparkContext._active_spark_context
    avro = sc._jvm.org.apache.spark.sql.avro
    f = getattr(getattr(avro, "package$"), "MODULE$").from_avro
    return Column(f(_to_java_column(col), jsonFormatSchema))

# from_avro returns a Column expression evaluated on the JVM, so apply it
# directly in select() instead of registering it as a Python UDF.
df2 = df1.select(from_avro("value").alias("data")).select("data.*")

# Writing the stream to Parquet :

query = df2.coalesce(1).writeStream \
   .format("parquet") \
   .option("checkpointLocation","/path/chk31") \
   .outputMode("append") \
   .start("/path/stream/tgt31")

# Block until the streaming query stops.
query.awaitTermination()
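
For background on why the Kafka value has to stay binary: an Avro-encoded record is a compact byte layout, not text. The sketch below is a pure-Python illustration, assuming the message body is the bare Avro encoding of the flat two-field schema used above (col1 as long, col2 as string). The function names are hypothetical, and this is not what Spark runs internally; it only shows the layout that from_avro has to parse.

```python
import io


def encode_long(n):
    """Avro long: zigzag-encode the sign, then emit 7 bits per byte, low bits first."""
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while z > 0x7F:
        out.append((z & 0x7F) | 0x80)  # high bit set: more bytes follow
        z >>= 7
    out.append(z)
    return bytes(out)


def decode_long(buf):
    """Read one zigzag varint from a binary stream and undo the zigzag mapping."""
    shift = 0
    z = 0
    while True:
        b = buf.read(1)[0]
        z |= (b & 0x7F) << shift
        if not b & 0x80:
            break
        shift += 7
    return (z >> 1) ^ -(z & 1)


def encode_record(col1, col2):
    """Record = fields in schema order: long col1, then length-prefixed UTF-8 col2."""
    text = col2.encode("utf-8")
    return encode_long(col1) + encode_long(len(text)) + text


def decode_record(data):
    """Decode bytes produced by encode_record back into a dict."""
    buf = io.BytesIO(data)
    col1 = decode_long(buf)
    length = decode_long(buf)
    col2 = buf.read(length).decode("utf-8")
    return {"col1": col1, "col2": col2}


if __name__ == "__main__":
    payload = encode_record(300, "abc")
    print(payload.hex(), decode_record(payload))
    # Treating the same bytes as UTF-8 text and converting back is lossy:
    as_text = payload.decode("utf-8", errors="replace")
    print(as_text.encode("utf-8") == payload)
```

Because the bytes of a long are generally not valid UTF-8, a round trip through a string type can change them, which is one reason to pass the untouched value column to from_avro.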
