Go语言 NATS JetStream:是否可以显式地从JetStream请求(重新)发送它在subject foo.* 中收到的最后几条消息?

7qhs6swi  于 5个月前  发布在  Go
关注(0)|答案(2)|浏览(59)

从本质上讲,主题是什么。
我想知道JetStream是否可以以一种允许我们重新获取主题为“foo.”的最后15条消息或JetStream在过去1.5秒内收到的主题为“foo.”的消息的方式进行查询。
如果可能的话,任何代码样本或链接到代码样本是赞赏。

xiozqbni

xiozqbni1#

根据官方文件

  • 可以从特定时间开始抓取消息:在过去的1.5秒内。
    按开始时间搜索

第一次使用消息时,从该时间或之后的消息开始。使用者需要指定OptStartTime,即流中的开始时间。它将在该时间或之后接收最接近的可用消息。

  • 另一个要求,最后15条信息,我认为这是不可能的
mfpqipee

mfpqipee2#

  • 在JetStream中有一种方法可以实现与时间相关的检索。
now := time.Now()
oneAndHalfSecondAgo := now.Add(time.Millisecond * -1500)

js, _ := nc.JetStream()
sub, err := js.SubscribeSync(
     "foo.*",
     nats.OrderedConsumer(),
     nats.StartTime(oneAndHalfSecondAgo),
)

for {
    msg, err := sub.NextMsg(10 * time.Second) //oldest->newer ones
    if err != nil {
        log.Fatal(err)
    }

    // 1. check timestamp of message and if its after ‘now’ then we break out of the for loop here

    // 2. if the message is before now we can push it in an array here
}

字符串
请注意,这种技术虽然有用,但效率很低,因为我们要逐个抓取消息。
我们可以使用.Subscribe()(异步的)来修改它,但是这样我们就会有一个不同的问题:
我们会从JetStream中过度拉取超过当前时刻的消息,然后我们必须确保我们抓取的缓冲消息确实会返回到JetStream。据我所知,没有配置选项告诉JetStream关于“MaxTime”。

  • 至于如何“获取最新的N条消息”,可以修改上面的代码示例,以便他将获得相当高的消息块(例如,最后5秒或10秒或30秒内的所有消息),并且在获得到目前为止的所有消息之后,他可以获取最新的“N”条消息。

当然,这种技术并不理想,但似乎没有其他方法可以做到这一点-至少在撰写本文时没有。

相关问题