kafka使用者解压缩gz文件流并读取

k2fxgqgv  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(862)

kafka producer正在发送.gz文件,但无法在消费端解压和读取文件。获取错误为“ioerror:不是gzip文件”
producer-bin/kafka-console-producer.sh—代理列表localhost:9092 --topic 机场<~/downloads/stocks.json.gz
消费者-

import sys 
import gzip
import StringIO
from kafka import KafkaConsumer

consumer = KafkaConsumer(KAFKA_TOPIC, bootstrap_servers=KAFKA_BROKERS)

try:
    for message in consumer:
        f = StringIO.StringIO(message.value)
        gzip_f = gzip.GzipFile(fileobj=f)
        unzipped_content = gzip_f.read()
        content = unzipped_content.decode('utf8')
        print (content)
except KeyboardInterrupt:
    sys.exit()

消费者出错-

Traceback (most recent call last):
  File "consumer.py", line 18, in <module>
    unzipped_content = gzip_f.read()
  File "/usr/lib64/python2.6/gzip.py", line 212, in read
    self._read(readsize)
  File "/usr/lib64/python2.6/gzip.py", line 255, in _read
    self._read_gzip_header()
  File "/usr/lib64/python2.6/gzip.py", line 156, in _read_gzip_header
    raise IOError, 'Not a gzipped file'
IOError: Not a gzipped file
d4so4syb

d4so4syb1#

Kafka不是用来发送巨大的有效载荷/信息的。您应该将其视为一个分布式消息总线,它为您提供了分布式系统的所有特权。
由于以下原因,Kafka限制了可以发送的消息的大小
巨大的消息增加了代理中的内存压力。
大消息会减慢代理的速度,处理它们的成本非常高。
解决方案:
您完全可以使用基于引用的消息传递,将大量消息的位置发送给消费者,而不是按原样发送大量数据。这将允许您使用外部数据存储的功能,还可以减轻Kafka代理的压力。
您还可以将数据分块并内联发送,然后在接收器处重新组装。
使用批量大小: batch.size 以总字节(而不是消息数)度量批大小。它控制在向kafka代理发送消息之前要收集的数据字节数。在不超过可用内存的情况下,将其设置为尽可能高的值。默认值为16384。
如果您增加缓冲区的大小,它可能永远不会满。生产者最终会根据其他触发器发送信息,例如以毫秒为单位的延迟时间。虽然可以通过将缓冲区批处理大小设置得太高来降低内存使用率,但这不会影响延迟。
如果你的制作人一直在发送,你可能会得到最好的吞吐量。如果生产者经常处于空闲状态,您可能没有编写足够的数据来保证当前的资源分配。
因为,你的数据是 gzip 你可以用 Reference Based Messaging .
不要使用fetch size和message max byte size(不能覆盖所有文件大小),而是将文件存储在nfs/hdfs/s3这样的分布式文件系统上,并将引用发送给使用者。消费者可以选择位置并解压缩数据。

相关问题