now := time.Now()
oneAndHalfSecondAgo := now.Add(time.Millisecond * -1500)
js, _ := nc.JetStream()
sub, err := js.SubscribeSync(
"foo.*",
nats.OrderedConsumer(),
nats.StartTime(oneAndHalfSecondAgo),
)
for {
msg, err := sub.NextMsg(10 * time.Second) //oldest->newer ones
if err != nil {
log.Fatal(err)
}
// 1. check timestamp of message and if its after ‘now’ then we break out of the for loop here
// 2. if the message is before now we can push it in an array here
}
2条答案
按热度按时间xiozqbni1#
根据官方文件
按开始时间搜索
第一次使用消息时,从该时间或之后的消息开始。使用者需要指定OptStartTime,即流中的开始时间。它将在该时间或之后接收最接近的可用消息。
mfpqipee2#
字符串
请注意,这种技术虽然有用,但效率很低,因为我们要逐个抓取消息。
我们可以使用.Subscribe()(异步的)来修改它,但是这样我们就会有一个不同的问题:
我们会从JetStream中过度拉取超过当前时刻的消息,然后我们必须确保我们抓取的缓冲消息确实会返回到JetStream。据我所知,没有配置选项告诉JetStream关于“MaxTime”。
当然,这种技术并不理想,但似乎没有其他方法可以做到这一点-至少在撰写本文时没有。