此主题应仅为每个x保存最新的“document x updated”事件。但我无法正确配置主题,它保留了多个副本。
我的想法是保持段小,以及所有相关的超时,刷新和保留时间。
主题设置(我对每个选项应用的位置和前缀没有足够清楚的了解,因此可能有几个未使用和不相关的选项以及夸大的数字-欢迎更正):
"cleanup.policy" -> "compact",
"file.delete.delay.ms" -> "10",
"segment.bytes" -> "10000",
"delete.retention.ms" -> "10",
"retention.bytes" -> "10000",
"segment.ms" -> "10",
"retention.ms" -> "10",
"min.cleanable.dirty.ratio" -> "0.001",
"flush.messages" -> "1",
"flush.ms" -> "10",
"min.compaction.lag.ms" -> "1",
"log.cleaner.min.compaction.lag.ms" -> "1"
我用Kafka的《 akka 流》来介绍这个主题:
val ids = List("12345", ...)
val publish: Future[Done] = Source(ids ++ ids ++ ids ++ ids ++ ids)
.map { id =>
ProducerMessage.Message(new ProducerRecord[String, String](topic, id, id), id)
}
.via(producerFlow)
.map(logResult)
.runWith(Sink.ignore)
Await.result(publish, 3.seconds)
等了几秒钟后,我数了数留言:
var count = 0
val runCount = Consumer
.plainSource(consumerSettings, Subscriptions.topics(topic))
.map { t =>
count += 1
t
}
.runWith(Sink.ignore)
Try { Await.result(runCount, timeout) }
我希望消费者能收到 ids.length
消息,但它总是在第一次运行时接收所有生成的消息,在随后的运行中接收更多的消息。
一些压缩确实发生了-如果我运行测试几次,消耗的消息计数停止增长,我在kafka日志中看到了段删除-但是每个键仍然有多个消息。
如何使用kafka主题作为快照存储?
使用Kafka0.10.2.1
谢谢您。
2条答案
按热度按时间eivnm1vs1#
根据kafka规范:“日志压缩确保kafka在单个主题分区的数据日志中始终至少保留每个消息键的最后一个已知值”。i、 Kafka不保证每个密钥只保留一条消息,但是它保证每个密钥都有最新的消息版本。
w8rqjzmb2#
您可以尝试使用配置来查看是否可以实现所需的功能(请参阅本文),但我建议您在应用程序级别进行处理,只使用带有该键的最新消息作为有效消息,因为日志压缩是在一个单独的线程上运行的,所以无法在每次更新之后触发它(即使有办法,也不会非常有效)。