我正在使用kafka流式api(kakfa版本:0.10.2.0)尝试让一个简单的wordcount示例工作:wordcount app gist。我同时负责制作者和控制台使用者: ./kafka-console-producer.sh -topic input-topic --broker-list localhost:9092
```
./kafka-console-consumer.sh --topic output-topic --bootstrap-server localhost:9092 --from-beginning
启动应用程序,一切似乎都正常工作,但当我在控制台生产者中键入一些字符串时,消费者什么也没有收到。如果我将应用程序更改为对输入执行简单的touppercase,那么消费者将收到流(修改为大写): `//The following code works fine: val uppercasedWithMapValues: KStream[String, String] = textLines.mapValues(_.toUpperCase()) uppercasedWithMapValues.to("output-topic")` 有人知道为什么我在字数统计的例子里什么都没收到吗?我应该在使用者上指定任何序列化程序吗?在上一次测试中,控制台使用者处理了我通过控制台发送的消息,但没有显示它们,请参见下面的输出:
➜ bin ./kafka-console-consumer.sh
--topic output-topic
--bootstrap-server localhost:9092
--from-beginning
[2017-08-02 07:48:20,187]WARN Error while fetching metadata with correlation id 2 :
{output-topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2017-08-02 07:48:20,197] WARN The following subscribed topics are not assigned
to any members in the group console-consumer-91651 : [output-topic]
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
^C总共处理了7条消息
1条答案
按热度按时间fcy6dtqo1#
KStream
工作,因为它不使用缓存。为了KTable
你得等一等,不然就要定了cache.max.bytes.buffering
至0
(但不在生产代码中!)