无法使用控制台使用者从kafka主题读取消息

1tu0hz3e  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(359)

我创造了 stream1KSQL (5.0 beta版)带有支持主题 topic1 以及 avro 架构。我能看懂网上所有的留言 topic1 使用 kafka-avro-console-consumer .
然后我创造了 stream2KSQL 这是基于 stream1 但与 json 名为的消息和备份主题的格式 topic2 . 我能看懂网上所有的留言 topic2 使用 kafka-console-consumer 我创造了 stream3KSQL 基于 stream2json 消息格式和主题名为 topic3 . 但是,我无法阅读上的消息 topic3 使用 kafka-console-consumer .
使用 kafkacat 我得到了不同分区的偏移量 topic3 但没有一条真正的信息被打印出来。
看起来信息好像在主题中,但两者都不是 kafkacat 不是 kafka-console-consumer 能够打印出来。
尝试使用 --from-beginning 以及 --offset earliest --partition 0 没有运气。
以下是ksql语句

CREATE STREAM stream1(p_id STRING, location STRING, u_id STRING, r_id STRING, b_id STRING, recorded_dtm STRING, 
v_type STRING, value STRING) WITH (kafka_topic='topic1', value_format='AVRO');

CREATE STREAM stream2 WITH (KAFKA_topic='topic2', VALUE_FORMAT='json', TIMESTAMP='RECORDED_TIMESTAMP') 
AS select P_ID+'-'+LOCATION+'-'+U_ID+'-'+R_ID+'-'+B_ID+'-'+V_TYPE as PARTITION_KEY, 
LOCATION, U_ID, R_ID, V_TYPE, B_ID, STRINGTOTIMESTAMP(recorded_dtm, 'yyyyMMddHHmmss') as RECORDED_TIMESTAMP, 
P_ID, VALUE, RECORDED_DTM,'NM' as DATA_TYPE 
FROM stream1 PARTITION BY PARTITION_KEY;

CREATE STREAM stream3 WITH (KAFKA_topic='topic3', VALUE_FORMAT='json', TIMESTAMP='RECORDED_TIMESTAMP') 
AS select PARTITION_KEY, LOCATION, U_ID, R_ID, V_TYPE, B_ID, RECORDED_TIMESTAMP, 
P_ID, VALUE, RECORDED_DTM FROM stream2 PARTITION BY PARTITION_KEY;

附加信息
ksql 如果我跑了 SET 'auto.offset.reset'='earliest'; 然后跑 select * from stream1 limit 5; 或者 select * from stream2 limit 5 我看到打印的记录,但是 select * from stream3 limit 5 不返回任何记录。
如果我跑了 describe extended stream3 我明白了
消息总数:212条
正好是我发给topic1的信息数

3lxsmp7m

3lxsmp7m1#

根本原因是 TimestampSTREAM3 的值 recorded_dtm 发送到主题1上的邮件的列早于 log.retention.hours 中设置的值 kafka server.properties .
我们的 log.retention.hours 值设置为 24 hours 而且记录的dtm值早于24小时。这导致了 STREAM3 以及 topic3 根据保留策略立即删除。

相关问题