diff --git a/mdb/mredis/stream.go b/mdb/mredis/stream.go index 0a7f868..0ee9fa1 100644 --- a/mdb/mredis/stream.go +++ b/mdb/mredis/stream.go @@ -128,6 +128,15 @@ func NewStream(r *Redis, opts StreamOpts) *Stream { } } +func (s *Stream) getNumPending() (int64, error) { + var res []interface{} + err := s.client.Do(radix.Cmd(&res, "XPENDING", s.opts.Key, s.opts.Group)) + if err != nil { + return 0, merr.Wrap(err, s.client.cmp.Context()) + } + return res[0].(int64), nil +} + func (s *Stream) init() error { // MKSTREAM is not documented, but will make the stream if it doesn't // already exist. Only the most elite redis gurus know of it's @@ -139,6 +148,12 @@ func (s *Stream) init() error { return merr.Wrap(err, s.client.cmp.Context()) } + numPending, err := s.getNumPending() + if err != nil { + return err + } + atomic.StoreInt64(&s.numPending, numPending) + // if we're here it means init succeeded, mark as such and gtfo s.hasInit = true return nil diff --git a/mdb/mredis/stream_test.go b/mdb/mredis/stream_test.go index 9135ae2..873ec5f 100644 --- a/mdb/mredis/stream_test.go +++ b/mdb/mredis/stream_test.go @@ -135,11 +135,10 @@ func TestStream(t *T) { // call XPENDING to see if anything comes back, nothing should. t.Log("checking for leftover pending entries") - var xpendingRes []interface{} - err := redis.Do(radix.Cmd(&xpendingRes, "XPENDING", streamKey, group)) + numPending, err := stream.getNumPending() if err != nil { t.Fatalf("error calling XPENDING: %v", err) - } else if numPending := xpendingRes[0].(int64); numPending != 0 { + } else if numPending > 0 { t.Fatalf("XPENDING says there's %v pending msgs, there should be 0", numPending) }