kafka流字计数应用程序

gorkyyrv  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(316)

我正在使用kafka流式api(kakfa版本:0.10.2.0)尝试让一个简单的wordcount示例工作:wordcount app gist。我同时负责制作者和控制台使用者: ./kafka-console-producer.sh -topic input-topic --broker-list localhost:9092 ```
./kafka-console-consumer.sh --topic output-topic --bootstrap-server localhost:9092 --from-beginning

启动应用程序,一切似乎都正常工作,但当我在控制台生产者中键入一些字符串时,消费者什么也没有收到。如果我将应用程序更改为对输入执行简单的touppercase,那么消费者将收到流(修改为大写): `//The following code works fine: val uppercasedWithMapValues: KStream[String, String] = textLines.mapValues(_.toUpperCase()) uppercasedWithMapValues.to("output-topic")` 有人知道为什么我在字数统计的例子里什么都没收到吗?我应该在使用者上指定任何序列化程序吗?在上一次测试中,控制台使用者处理了我通过控制台发送的消息,但没有显示它们,请参见下面的输出:

➜ bin ./kafka-console-consumer.sh
--topic output-topic
--bootstrap-server localhost:9092
--from-beginning
[2017-08-02 07:48:20,187]WARN Error while fetching metadata with correlation id 2 :
{output-topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2017-08-02 07:48:20,197] WARN The following subscribed topics are not assigned
to any members in the group console-consumer-91651 : [output-topic]
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

^C总共处理了7条消息
fcy6dtqo

fcy6dtqo1#

KStream 工作,因为它不使用缓存。为了 KTable 你得等一等,不然就要定了 cache.max.bytes.buffering0 (但不在生产代码中!)

相关问题