我正试图发送一个csv文件从Kafka到Spark流应用程序,我不知道怎么做。我在这里读了很多帖子,但没人帮我。
我想我的Kafka生产者发送csv和分裂它在应用程序(消费者),但这并不重要。我试图创建一个rdd并发送给spark。这对普通的字符串消息有效,但对csv无效。
这是我的制作人:
message =sc.textFile("/home/guest/host/Seeds.csv")
producer.send('test', message)
我的spark消费者:
ssc = StreamingContext(sc, 5)
kvs = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer", {'test': 1}) data = kvs.map(lambda x: x[1]) counts = data.flatMap(lambda line: line.split(";")) \
```
.map(lambda word: (word, 1))
.reduceByKey(lambda a, b: a+b)
问题是,通过发送csv,spark streaming不会收到任何事件。有人能帮我了解一下格式或概念吗?
我用python在docker容器下运行producer和consumer。
谢谢您。
2条答案
按热度按时间2cmtqfgy1#
在您的producer中,消息是一个rdd(分布在集群中的csv文件行的集合),它被延迟地评估,也就是说,在您对它执行操作之前,它不会做任何事情。所以你需要在送到Kafka之前收集rdd。请看下面的链接。如何正确使用pyspark向kafka代理发送数据?
eagi6jfj2#
在我的工作中,我把csv转换成json,
这里有一个例子,你可以让它在你的膝盖上(我的意思是没有任何限制)
import json
)然后你可以用下一个答案https://stackoverflow.com/a/47457985/6796393