Sending a CSV from Kafka to Spark Streaming

ozxc1zmp · posted 2021-06-07 in Kafka

I am trying to send a CSV file from Kafka to a Spark Streaming application and I can't figure out how to do it. I've read a lot of posts here, but none of them helped me.
I would like my Kafka producer to send the CSV and split it in the application (the consumer), but that part isn't essential. I tried creating an RDD and sending it to Spark. This works for plain string messages, but not for the CSV.
This is my producer:

message = sc.textFile("/home/guest/host/Seeds.csv")
producer.send('test', message)

My Spark consumer:

ssc = StreamingContext(sc, 5)

kvs = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer", {'test': 1})
data = kvs.map(lambda x: x[1])
counts = data.flatMap(lambda line: line.split(";")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)

The problem is that when the CSV is sent this way, Spark Streaming doesn't receive any events. Can someone help me understand the right format, or the concept in general?
I run the producer and the consumer with Python inside Docker containers.
Thank you.

2cmtqfgy1#

In your producer, the message is an RDD (a collection of the CSV file's lines distributed across the cluster), and it is evaluated lazily, i.e. it does nothing until you perform an action on it. So you need to collect the RDD before sending it to Kafka. See the following link: How to correctly use pyspark to send data to a kafka broker?
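
A minimal sketch of that idea (reusing the asker's SparkContext `sc` and the 'test' topic; the kafka-python producer and the broker address are assumptions): collecting the RDD pulls the lines to the driver so each one can be sent as a plain byte message.

from kafka import KafkaProducer

# assumed broker address; replace with your own bootstrap server
producer = KafkaProducer(bootstrap_servers='localhost:9092')

# collect() is an action: it materializes the RDD as a local list of lines
lines = sc.textFile("/home/guest/host/Seeds.csv").collect()
for line in lines:
    # Kafka expects bytes (or a configured serializer), not an RDD
    producer.send('test', line.encode('utf-8'))
producer.flush()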


eagi6jfj2#

In my work I convert the CSV to JSON.
Here is an example you can throw together quickly (I mean without even an `import json`):

from kafka import KafkaProducer
import time,csv

'''
input csv example

AT,BE,BG,CH,CY,CZ,DE,DK,EE,ES,FI,FR,EL,HR,HU,IE,IT,LT,LU,LV,NL,NO,PL,PT,RO,SI,SK,SE,UK
0.15104895104895097,0.155978142670726,0.0,0.132959173102667,0,0.0261248185776488,0.0314454263905056,0.0,0.0,0.22130378970001602,0.0,0.0881265984931488,0.09026049932169501,0.056874262941565,0.0841602727424313,0.0494006197388216,0.0912473405767843,0.0,0.0656217442366246,0.0,0.0432966804004962,0.0,0.0,0.19138755980861197,0.0,0.0521335743946527,0.0,0.0,0.0434660616908725

'''

# create a producer connection to Kafka

producer = KafkaProducer(bootstrap_servers='89.218.20.173:9092')

# define the *.csv file and the character that separates the values

fname = "input.csv"
divider_char = ','

# open file

with open(fname) as fp:  
    # read header (first line of the input file)
    line = fp.readline()
    header = line.split(divider_char)

    #loop other data rows 
    line = fp.readline()    
    while line:
        # start to prepare data row to send
        data_to_send = ""
        values = line.split(divider_char)
        len_header = len(header)
        for i in range(len_header):
            data_to_send += "\""+header[i].strip()+"\""+":"+"\""+values[i].strip()+"\""
            if i<len_header-1 :
                data_to_send += ","
        data_to_send = "{"+data_to_send+"}"

        '''
        example of outputs is valid JSON row 
        {
            "AT":"0.148251748251748",
            "BE":"0.052603706790461",
                ...
            "SE":"0.0826699344612236",
            "UK":"0.10951678628072099"
        }
        '''

        # send data via producer
        producer.send('test', bytes(data_to_send, encoding='utf-8'))
        line = fp.readline()
        # and this one is just in case )))
        #time.sleep(1)
producer.close()

Then you can use this answer next: https://stackoverflow.com/a/47457985/6796393
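
For reference, here is a minimal sketch of the consuming side along the lines of that linked answer, reusing the asker's StreamingContext setup and the 'test' topic (the ZooKeeper address and group id come from the question; everything else is an assumption): each Kafka message is parsed as JSON, so every row becomes a dict of country code to value.

import json
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

ssc = StreamingContext(sc, 5)
kvs = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer", {'test': 1})

# each message value is the JSON string built by the producer above
rows = kvs.map(lambda x: json.loads(x[1]))
rows.pprint()

ssc.start()
ssc.awaitTermination()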
