NATS JetStream: Is it possible to explicitly ask JetStream to (re)send the last few messages it received on subject foo.*?
Essentially what the subject says.
I'm wondering if JetStream can be queried in a way that allows us to refetch either the last 15 messages of subject "foo.*" or the messages that JetStream received on subject "foo.*" in the last 1.5 seconds.
If that's possible, any code samples or links to code samples are appreciated.
According to the official docs
- It is possible to fetch messages starting from a certain time, e.g. everything received in the last 1.5 seconds:
DeliverByStartTime
When first consuming messages, start with messages on or after this time. The consumer is required to specify OptStartTime, the time in the stream to start at. It will receive the closest available message on or after that time.
- The other requirement, the last 15 messages, is not possible as far as I can tell.
-
There is a way to achieve the time-related retrieval in JetStream.
    now := time.Now()
    oneAndHalfSecondAgo := now.Add(-1500 * time.Millisecond)

    js, _ := nc.JetStream()
    sub, err := js.SubscribeSync(
        "foo.*",
        nats.OrderedConsumer(),
        nats.StartTime(oneAndHalfSecondAgo),
    )
    if err != nil {
        log.Fatal(err)
    }

    for {
        msg, err := sub.NextMsg(10 * time.Second) // oldest -> newest
        if err != nil {
            log.Fatal(err)
        }
        // 1. Check the timestamp of the message; if it is after 'now', break out of the for loop here.
        // 2. If the message is before 'now', append it to a slice here.
    }
Note that this technique, though useful, is quite inefficient because we grab messages one by one.
We could modify it to use .Subscribe() (which is asynchronous), but then we would have a different problem:
We would overpull from JetStream past the present moment, and we would then have to make sure that the buffered messages we grabbed are returned to JetStream. As far as I can tell, there is no configuration option to tell JetStream about a "MaxTime".
-
As for how to get the latest N messages, one could modify the code sample above to fetch a reasonably large chunk of messages (e.g. all messages from the last 5, 10, or 30 seconds) and, after collecting everything up to the present moment, keep only the latest N messages.
This technique is not ideal, of course, but there doesn't seem to be another way to do this, at least not at the time of writing.
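The final "keep only the latest N" step can be sketched as a small helper. `LastN` is a hypothetical name, and the slice is assumed to already hold the collected window in oldest-to-newest order (generics require Go 1.18 or later).

```go
package main

import "fmt"

// LastN returns the final n elements of items, or all of them if there are
// fewer than n. The result shares its backing array with items.
func LastN[T any](items []T, n int) []T {
	if n <= 0 {
		return nil
	}
	if n >= len(items) {
		return items
	}
	return items[len(items)-n:]
}

func main() {
	// Pretend these are the messages collected from the time window,
	// ordered oldest to newest.
	window := []string{"m1", "m2", "m3", "m4", "m5"}
	fmt.Println(LastN(window, 3)) // prints [m3 m4 m5]
}
```

If the window turns out to hold fewer than N messages, the caller would have to widen it and fetch again, which is part of why this approach is not ideal.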