diff --git a/go.mod b/go.mod index fe1fc49..eb7a00b 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ require ( github.com/boombuler/barcode v1.0.0 // indirect github.com/go-sql-driver/mysql v1.4.0 github.com/jmoiron/sqlx v1.2.0 - github.com/mediocregopher/radix/v3 v3.3.1-0.20190716044027-c72935c74ab3 + github.com/mediocregopher/radix/v3 v3.3.2 github.com/pquerna/otp v1.1.0 github.com/stretchr/testify v1.3.0 golang.org/x/net v0.0.0-20190213061140-3a22650c66bd // indirect diff --git a/go.sum b/go.sum index 43f1364..516241f 100644 --- a/go.sum +++ b/go.sum @@ -62,8 +62,8 @@ github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/mattn/go-sqlite3 v1.9.0 h1:pDRiWfl+++eC2FEFRy6jXmQlvp4Yh3z1MJKg4UeYM/4= github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= -github.com/mediocregopher/radix/v3 v3.3.1-0.20190716044027-c72935c74ab3 h1:UjRk86epRQI7V3yFJaR7nYiahIoCMLI/1YjtH5ANfkw= -github.com/mediocregopher/radix/v3 v3.3.1-0.20190716044027-c72935c74ab3/go.mod h1:RsC7cELtyL4TGkg0nwRPTa+J2TXZ0dh/ruohD3rnjMk= +github.com/mediocregopher/radix/v3 v3.3.2 h1:2gAC5aDBWQr1LBgaNQiVLb2LGX4lvkARDkfjsuonKJE= +github.com/mediocregopher/radix/v3 v3.3.2/go.mod h1:RsC7cELtyL4TGkg0nwRPTa+J2TXZ0dh/ruohD3rnjMk= github.com/microcosm-cc/bluemonday v1.0.1/go.mod h1:hsXNsILzKxV+sX77C5b8FSuKF00vh2OMYv+xgHpAMF4= github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJEU3ofeGjhHklVoIGuVj85JJwZ6kWPaJwCIxgnFmo= github.com/neelance/sourcemap v0.0.0-20151028013722-8c68805598ab/go.mod h1:Qr6/a/Q4r9LP1IltGz7tA7iOK1WonHEYhu1HRBA7ZiM= diff --git a/mdb/mredis/redis.go b/mdb/mredis/redis.go index 77211d9..688c3bb 100644 --- a/mdb/mredis/redis.go +++ b/mdb/mredis/redis.go @@ -17,11 +17,31 @@ type Redis struct { cmp *mcmp.Component } +type redisOpts struct { + dialOpts []radix.DialOpt +} + +// RedisOption is a value which adjusts the behavior of InstRedis. +type RedisOption func(*redisOpts) + +// RedisDialOpts specifies that the given set of DialOpts should be used when +// creating any new connections. +func RedisDialOpts(dialOpts ...radix.DialOpt) RedisOption { + return func(opts *redisOpts) { + opts.dialOpts = dialOpts + } +} + // InstRedis instantiates a Redis instance which will be initialized when the // Init event is triggered on the given Component. The redis client will have // Close called on it when the Shutdown event is triggered on the given // Component. -func InstRedis(parent *mcmp.Component) *Redis { +func InstRedis(parent *mcmp.Component, options ...RedisOption) *Redis { + var opts redisOpts + for _, opt := range options { + opt(&opts) + } + cmp := parent.Child("redis") client := new(struct{ radix.Client }) @@ -35,7 +55,12 @@ func InstRedis(parent *mcmp.Component) *Redis { cmp.Annotate("addr", *addr, "poolSize", *poolSize) mlog.From(cmp).Info("connecting to redis", ctx) var err error - client.Client, err = radix.NewPool("tcp", *addr, *poolSize) + client.Client, err = radix.NewPool( + "tcp", *addr, *poolSize, + radix.PoolConnFunc(func(network, addr string) (radix.Conn, error) { + return radix.Dial(network, addr, opts.dialOpts...) + }), + ) return err }) mrun.ShutdownHook(cmp, func(ctx context.Context) error { diff --git a/mdb/mredis/stream.go b/mdb/mredis/stream.go new file mode 100644 index 0000000..0a7f868 --- /dev/null +++ b/mdb/mredis/stream.go @@ -0,0 +1,231 @@ +package mredis + +import ( + "bufio" + "errors" + "strconv" + "strings" + "sync/atomic" + "time" + + "github.com/mediocregopher/mediocre-go-lib/mctx" + "github.com/mediocregopher/mediocre-go-lib/merr" + + "github.com/mediocregopher/radix/v3" + "github.com/mediocregopher/radix/v3/resp/resp2" +) + +// borrowed from radix +type streamReaderEntry struct { + stream []byte + entries []radix.StreamEntry +} + +func (s *streamReaderEntry) UnmarshalRESP(br *bufio.Reader) error { + var ah resp2.ArrayHeader + if err := ah.UnmarshalRESP(br); err != nil { + return err + } + if ah.N != 2 { + return errors.New("invalid xread[group] response") + } + + var stream resp2.BulkStringBytes + stream.B = s.stream[:0] + if err := stream.UnmarshalRESP(br); err != nil { + return err + } + s.stream = stream.B + + return (resp2.Any{I: &s.entries}).UnmarshalRESP(br) +} + +// StreamEntry wraps radix's StreamEntry type in order to provde some extra +// functionality. +type StreamEntry struct { + radix.StreamEntry + + // Ack is used in order to acknowledge that a stream message has been + // successfully consumed and should not be consumed again. + Ack func() error + + // Nack is used to declare that a stream message was not successfully + // consumed and it needs to be consumed again. + Nack func() +} + +// StreamOpts are options used to initialize a Stream instance. Fields are +// required unless otherwise noted. +type StreamOpts struct { + // Key is the redis key at which the redis stream resides. + Key string + + // Group is the name of the consumer group which will consume from Key. + Group string + + // Consumer is the name of this particular consumer. This value should + // remain the same across restarts of the process. + Consumer string + + // (Optional) InitialCursor is only used when the consumer group is first + // being created, and indicates where in the stream the consumer group + // should start consuming from. + // + // "0" indicates the group should consume from the start of the stream. "$" + // indicates the group should not consume any old messages, only those added + // after the group is initialized. A specific message id can be given to + // consume only those messages with greater ids. + // + // Defaults to "$". + InitialCursor string + + // (Optional) ReadCount indicates the max number of messages which should be + // read on every XREADGROUP call. 0 indicates no limit. + ReadCount int + + // (Optional) Block indicates what BLOCK value is sent to XREADGROUP calls. + // This value _must_ be less than the ReadtTimeout the redis client is + // using. + // + // Defaults to 5 * time.Second + Block time.Duration +} + +func (opts *StreamOpts) fillDefaults() { + if opts.InitialCursor == "" { + opts.InitialCursor = "$" + } + if opts.Block == 0 { + opts.Block = 5 * time.Second + } +} + +// Stream wraps a Redis instance in order to provide an abstraction over +// consuming messages from a single redis stream. Stream is intended to be used +// in a single-threaded manner, and doesn't spawn any go-routines. +// +// See https://redis.io/topics/streams-intro +type Stream struct { + client *Redis + opts StreamOpts + + // entries are stored to buf in id decreasing order, and then read from it + // from back-to-front. This allows us to not have to re-allocate the buffer + // during runtime. + buf []StreamEntry + + hasInit bool + numPending int64 +} + +// NewStream initializes and returns a Stream instance using the given options. +func NewStream(r *Redis, opts StreamOpts) *Stream { + opts.fillDefaults() + return &Stream{ + client: r, + opts: opts, + buf: make([]StreamEntry, 0, opts.ReadCount), + } +} + +func (s *Stream) init() error { + // MKSTREAM is not documented, but will make the stream if it doesn't + // already exist. Only the most elite redis gurus know of it's + // existence, don't tell anyone. + err := s.client.Do(radix.Cmd(nil, "XGROUP", "CREATE", s.opts.Key, s.opts.Group, s.opts.InitialCursor, "MKSTREAM")) + if err == nil { + // cool + } else if errStr := err.Error(); !strings.HasPrefix(errStr, `BUSYGROUP Consumer Group name already exists`) { + return merr.Wrap(err, s.client.cmp.Context()) + } + + // if we're here it means init succeeded, mark as such and gtfo + s.hasInit = true + return nil +} + +func (s *Stream) wrapEntry(entry radix.StreamEntry) StreamEntry { + return StreamEntry{ + StreamEntry: entry, + Ack: func() error { + return s.client.Do(radix.Cmd(nil, "XACK", s.opts.Key, s.opts.Group, entry.ID.String())) + }, + Nack: func() { atomic.AddInt64(&s.numPending, 1) }, + } +} + +func (s *Stream) fillBufFrom(id string) error { + args := []string{"GROUP", s.opts.Group, s.opts.Consumer} + if s.opts.ReadCount > 0 { + args = append(args, "COUNT", strconv.Itoa(s.opts.ReadCount)) + } + args = append(args, "BLOCK", strconv.Itoa(int(s.opts.Block.Seconds()))) + args = append(args, "STREAMS", s.opts.Key, id) + + var srEntries []streamReaderEntry + err := s.client.Do(radix.Cmd(&srEntries, "XREADGROUP", args...)) + if err != nil { + return merr.Wrap(err, s.client.cmp.Context()) + } else if len(srEntries) == 0 { + return nil // no messages + } else if len(srEntries) != 1 || string(srEntries[0].stream) != s.opts.Key { + return merr.New("malformed return from XREADGROUP", + mctx.Annotate(s.client.cmp.Context(), "srEntries", srEntries)) + } + entries := srEntries[0].entries + + for i := len(entries) - 1; i >= 0; i-- { + s.buf = append(s.buf, s.wrapEntry(entries[i])) + } + return nil +} + +func (s *Stream) fillBuf() error { + if len(s.buf) > 0 { + return nil + } else if !s.hasInit { + if err := s.init(); err != nil { + return err + } else if !s.hasInit { + return nil + } + } + + numPending := atomic.LoadInt64(&s.numPending) + if numPending > 0 { + if err := s.fillBufFrom("0"); err != nil { + return err + } else if len(s.buf) > 0 { + return nil + } + + // no pending entries, we can mark Stream as such and continue. This + // _might_ fail if another routine called Nack in between originally + // loading numPending and now, in which case we should leave the buffer + // alone and let it get filled again later. + if !atomic.CompareAndSwapInt64(&s.numPending, numPending, 0) { + return nil + } + } + + return s.fillBufFrom(">") +} + +// Next returns the next StreamEntry which needs processing, or false. This +// method is expected to block for up to the value of the Block field in +// StreamOpts. +// +// If an error is returned it's up to the caller whether or not they want to +// keep retrying. +func (s *Stream) Next() (StreamEntry, bool, error) { + if err := s.fillBuf(); err != nil { + return StreamEntry{}, false, err + } else if len(s.buf) == 0 { + return StreamEntry{}, false, nil + } + + l := len(s.buf) + entry := s.buf[l-1] + s.buf = s.buf[:l-1] + return entry, true, nil +} diff --git a/mdb/mredis/stream_test.go b/mdb/mredis/stream_test.go new file mode 100644 index 0000000..9135ae2 --- /dev/null +++ b/mdb/mredis/stream_test.go @@ -0,0 +1,159 @@ +package mredis + +import ( + "reflect" + "sync" + . "testing" + "time" + + "github.com/mediocregopher/mediocre-go-lib/mrand" + "github.com/mediocregopher/mediocre-go-lib/mtest" + + "github.com/mediocregopher/radix/v3" +) + +func TestStream(t *T) { + cmp := mtest.Component() + redis := InstRedis(cmp) + + streamKey := "stream-" + mrand.Hex(8) + group := "group-" + mrand.Hex(8) + stream := NewStream(redis, StreamOpts{ + Key: streamKey, + Group: group, + Consumer: "consumer-" + mrand.Hex(8), + InitialCursor: "0", + }) + + mtest.Run(cmp, t, func() { + // once the test is ready to be finished up this will be closed + finishUpCh := make(chan struct{}) + + // continually publish messages, adding them to the expEntries + t.Log("creating publisher") + pubDone := make(chan struct{}) + expEntries := map[radix.StreamEntryID]radix.StreamEntry{} + go func() { + defer close(pubDone) + tick := time.NewTicker(50 * time.Millisecond) + defer tick.Stop() + + for { + var id radix.StreamEntryID + key, val := mrand.Hex(8), mrand.Hex(8) + if err := redis.Do(radix.Cmd(&id, "XADD", streamKey, "*", key, val)); err != nil { + t.Fatalf("error XADDing: %v", err) + } + expEntries[id] = radix.StreamEntry{ + ID: id, + Fields: map[string]string{key: val}, + } + + select { + case <-tick.C: + continue + case <-finishUpCh: + return + } + } + }() + + gotEntriesL := new(sync.Mutex) + gotEntries := map[radix.StreamEntryID]radix.StreamEntry{} + + // spawn some workers which will process the StreamEntry's. We do this + // to try and suss out any race conditions with Nack'ing. Each worker + // will have a random chance of Nack'ing, until finishUpCh is closed and + // then they will Ack everything. + t.Log("creating workers") + const numWorkers = 5 + wg := new(sync.WaitGroup) + entryCh := make(chan StreamEntry, numWorkers*10) + for i := 0; i < numWorkers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for entry := range entryCh { + select { + case <-finishUpCh: + default: + if mrand.Intn(10) == 0 { + entry.Nack() + continue + } + } + + if err := entry.Ack(); err != nil { + t.Fatalf("error calling Ack: %v", err) + } + gotEntriesL.Lock() + gotEntries[entry.ID] = entry.StreamEntry + gotEntriesL.Unlock() + } + }() + } + + t.Log("consuming...") + waitTimer := time.After(5 * time.Second) + loop: + for { + select { + case <-waitTimer: + break loop + default: + } + + entry, ok, err := stream.Next() + if err != nil { + t.Fatalf("error calling Next (1): %v", err) + } else if ok { + entryCh <- entry + } + } + + // after 5 seconds we declare that it's time to finish up + t.Log("finishing up...") + close(finishUpCh) + <-pubDone + + // Keep consuming until all messages have come in, then tell the workers + // to clean themselves up. + t.Log("consuming last of the entries") + for { + entry, ok, err := stream.Next() + if err != nil { + t.Fatalf("error calling Next (2): %v", err) + } else if ok { + entryCh <- entry + } else { + break // must be empty + } + } + close(entryCh) + wg.Wait() + t.Log("all workers cleaned up") + + // call XPENDING to see if anything comes back, nothing should. + t.Log("checking for leftover pending entries") + var xpendingRes []interface{} + err := redis.Do(radix.Cmd(&xpendingRes, "XPENDING", streamKey, group)) + if err != nil { + t.Fatalf("error calling XPENDING: %v", err) + } else if numPending := xpendingRes[0].(int64); numPending != 0 { + t.Fatalf("XPENDING says there's %v pending msgs, there should be 0", numPending) + } + + if len(expEntries) != len(gotEntries) { + t.Errorf("len(expEntries):%d != len(gotEntries):%d", len(expEntries), len(gotEntries)) + } + + for id, expEntry := range expEntries { + gotEntry, ok := gotEntries[id] + if !ok { + t.Errorf("did not consume entry %s", id) + } else if !reflect.DeepEqual(gotEntry, expEntry) { + t.Errorf("expEntry:%#v != gotEntry:%#v", expEntry, gotEntry) + } + } + }) +}