我是第一次使用pyspark。spark版本:2.3.0Kafka版本:2.2.0
我有一个Kafka生产者发送的avro格式的嵌套数据,我正试图写在pyspark的Spark流/结构化流的代码,将反序列化的avro从Kafka到Dataframe做转换写在Parquet格式到s3。我能够在spark/scala中找到avro转换器,但是pyspark中的支持还没有被添加。如何在pyspark中转换相同的值。谢谢。
我是第一次使用pyspark。spark版本:2.3.0Kafka版本:2.2.0
我有一个Kafka生产者发送的avro格式的嵌套数据,我正试图写在pyspark的Spark流/结构化流的代码,将反序列化的avro从Kafka到Dataframe做转换写在Parquet格式到s3。我能够在spark/scala中找到avro转换器,但是pyspark中的支持还没有被添加。如何在pyspark中转换相同的值。谢谢。
1条答案
按热度按时间f87krz0w1#
正如您所提到的,从kafka读取avro消息并通过pyspark解析,没有相同的直接库。但是我们可以通过编写小 Package 器来读取/解析avro消息,并在pyspark流式代码中将该函数作为udf调用,如下所示。
参考资料:pyspark2.4.0,使用readstream-python从kafka读取avro
注:avro是内置的,但外部数据源模块,因为spark 2.4。请按照“ApacheAvro数据源指南”的部署部分部署应用程序。
重新fererence:httpshttp://spark-test.github.io/pyspark-coverage-site/pyspark\u sql\u avro\u functions\u py.html
spark提交:
[调整软件包版本以匹配基于spark/avro版本的安装]
Pypark流代码: