From d3259d45f8e42e04684dc1f1fafa5c3accf7496c Mon Sep 17 00:00:00 2001
From: Brian Picciano
Date: Thu, 15 Feb 2018 22:47:18 +0000
Subject: [PATCH] implement mdb.PubSub

---
 env.test       |   5 +
 mdb/mdb.go     |   3 +
 mdb/ps.go      | 380 +++++++++++++++++++++++++++++++++++++++++++++++++
 mdb/ps_test.go | 171 ++++++++++++++++++++++
 4 files changed, 559 insertions(+)
 create mode 100644 env.test
 create mode 100644 mdb/mdb.go
 create mode 100644 mdb/ps.go
 create mode 100644 mdb/ps_test.go

diff --git a/env.test b/env.test
new file mode 100644
index 0000000..e1691a1
--- /dev/null
+++ b/env.test
@@ -0,0 +1,5 @@
+if [ "$(ps aux | grep '[p]ubsub-emulator')" = "" ]; then
+	echo "starting pubsub emulator"
+	yes | gcloud beta emulators pubsub start >/dev/null 2>&1 &
+fi
+$(gcloud beta emulators pubsub env-init)
diff --git a/mdb/mdb.go b/mdb/mdb.go
new file mode 100644
index 0000000..b85b1fe
--- /dev/null
+++ b/mdb/mdb.go
@@ -0,0 +1,3 @@
+// Package mdb contains a number of database wrappers for databases I commonly
+// use
+package mdb
diff --git a/mdb/ps.go b/mdb/ps.go
new file mode 100644
index 0000000..8235521
--- /dev/null
+++ b/mdb/ps.go
@@ -0,0 +1,380 @@
+package mdb
+
+import (
+	"context"
+	"errors"
+	"sync"
+	"time"
+
+	"cloud.google.com/go/pubsub"
+	"github.com/mediocregopher/mediocre-go-lib/mcfg"
+	"github.com/mediocregopher/mediocre-go-lib/mlog"
+	oldctx "golang.org/x/net/context"
+	"google.golang.org/api/option"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+func isErrAlreadyExists(err error) bool {
+	if err == nil {
+		return false
+	}
+	s, ok := status.FromError(err)
+	return ok && s.Code() == codes.AlreadyExists
+}
+
+// Message aliases the type in the official driver
+type Message = pubsub.Message
+
+// PubSub holds onto the info needed to communicate with PubSub and wraps a
+// *pubsub.Client
+type PubSub struct {
+	proj, credFile string
+	*pubsub.Client
+}
+
+// NewPubSub initializes and returns a PubSub instance for the given projectID,
+// using the credentials file (if given)
+func NewPubSub(ctx context.Context, projectID, credFile string) (*PubSub, error) {
+	ps := &PubSub{
+		proj:     projectID,
+		credFile: credFile,
+	}
+	var err error
+	ps.Client, err = pubsub.NewClient(ctx, ps.proj, ps.clientOpts()...)
+	return ps, err
+}
+
+// CfgPubSub configures and returns a PubSub instance which will be usable once
+// Run is called on the passed-in Cfg instance.
+//
+// defaultProject is optional and can be given to indicate the default GCE
+// project id for the configuration parameter which is created.
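+//
+// A rough usage sketch (the project id, topic/subscription names, and handler
+// here are hypothetical placeholders; error handling is elided and ctx is
+// assumed to already exist):
+//
+//	cfg := mcfg.New()
+//	ps := CfgPubSub(cfg, "some-project")
+//	cfg.TestRun() // as in ps_test.go; a real program would call Run
+//
+//	topic, _ := ps.Topic(ctx, "someTopic", true)
+//	sub, _ := topic.Subscription(ctx, "someSub", true)
+//	go sub.Consume(ctx, func(ctx context.Context, msg *Message) (bool, error) {
+//		// process msg.Data; true acks the message, false nacks it
+//		return true, nil
+//	}, ConsumerOpts{})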
+func CfgPubSub(cfg *mcfg.Cfg, defaultProject string) *PubSub {
+	cfg = cfg.Child("pubsub")
+	proj := cfg.ParamString("project", defaultProject, "name of the project in gce")
+	credFile := cfg.ParamString("cred-file", "", "path to pubsub credentials json file, if any")
+
+	var ps PubSub
+	cfg.Start.Then(func(ctx context.Context) error {
+		ps.proj = *proj
+		ps.credFile = *credFile
+		mlog.Info("connecting to pubsub", &ps)
+		psInner, err := NewPubSub(ctx, *proj, *credFile)
+		if err != nil {
+			return mlog.ErrWithKV(err, &ps)
+		}
+		ps = *psInner
+		return nil
+	})
+	return &ps
+}
+
+func (ps *PubSub) clientOpts() []option.ClientOption {
+	var opts []option.ClientOption
+	//opts := []option.ClientOption{
+	//	option.WithGRPCConnectionPool(ps.NumConns),
+	//}
+	if ps.credFile != "" {
+		opts = append(opts, option.WithCredentialsFile(ps.credFile))
+	}
+	return opts
+}
+
+// KV implements the mlog.KVer interface
+func (ps *PubSub) KV() mlog.KV {
+	return mlog.KV{
+		"psProj": ps.proj,
+	}
+}
+
+// Topic provides methods around a particular topic in PubSub
+type Topic struct {
+	ps    *PubSub
+	topic *pubsub.Topic
+	name  string
+}
+
+// Topic returns, after potentially creating, a topic of the given name
+func (ps *PubSub) Topic(ctx context.Context, name string, create bool) (*Topic, error) {
+	var t *pubsub.Topic
+	var err error
+	if create {
+		t, err = ps.Client.CreateTopic(ctx, name)
+		if isErrAlreadyExists(err) {
+			t = ps.Client.Topic(name)
+		} else if err != nil {
+			return nil, err
+		}
+	} else {
+		t = ps.Client.Topic(name)
+		if exists, err := t.Exists(ctx); err != nil {
+			return nil, err
+		} else if !exists {
+			return nil, mlog.ErrWithKV(errors.New("topic dne"), ps.KV().Set("topicName", name))
+		}
+	}
+	return &Topic{
+		ps:    ps,
+		topic: t,
+		name:  name,
+	}, nil
+}
+
+// KV implements the mlog.KVer interface
+func (t *Topic) KV() mlog.KV {
+	kv := t.ps.KV()
+	kv["topicName"] = t.name
+	return kv
+}
+
+// Publish publishes a message with the given data as its body to the Topic
+func (t *Topic) Publish(ctx context.Context, data []byte) error {
+	_, err := t.topic.Publish(ctx, &Message{Data: data}).Get(ctx)
+	if err != nil {
+		return mlog.ErrWithKV(err, t)
+	}
+	return nil
+}
+
+// Subscription provides methods around a subscription to a topic in PubSub
+type Subscription struct {
+	topic *Topic
+	sub   *pubsub.Subscription
+	name  string
+
+	// only used in tests to trigger batch processing
+	batchTestTrigger chan bool
+}
+
+// Subscription returns a Subscription instance, after potentially creating it,
+// for the Topic
+func (t *Topic) Subscription(ctx context.Context, name string, create bool) (*Subscription, error) {
+	name = t.name + "_" + name
+	var s *pubsub.Subscription
+	var err error
+	if create {
+		s, err = t.ps.CreateSubscription(ctx, name, pubsub.SubscriptionConfig{
+			Topic: t.topic,
+		})
+		if isErrAlreadyExists(err) {
+			s = t.ps.Subscription(name)
+		} else if err != nil {
+			return nil, err
+		}
+	} else {
+		s = t.ps.Subscription(name)
+		if exists, err := s.Exists(ctx); err != nil {
+			return nil, err
+		} else if !exists {
+			return nil, mlog.ErrWithKV(errors.New("sub dne"), t.KV().Set("subName", name))
+		}
+	}
+	return &Subscription{
+		topic: t,
+		sub:   s,
+		name:  name,
+	}, nil
+}
+
+// KV implements the mlog.KVer interface
+func (s *Subscription) KV() mlog.KV {
+	kv := s.topic.KV()
+	kv["subName"] = s.name
+	return kv
+}
+
+// ConsumerFunc is a function to which messages being consumed will be passed.
+// The returned boolean and returned error are independent.
+// If the bool is false the message will be returned to the queue for retrying
+// later. If an error is returned it will be logged.
+//
+// The Context will be canceled once the deadline has been reached (as set when
+// Consume is called).
+type ConsumerFunc func(context.Context, *Message) (bool, error)
+
+// ConsumerOpts are options which affect the behavior of a Consume method call
+type ConsumerOpts struct {
+	// Default 30s. The timeout each message has to complete before its context
+	// is canceled and the server re-publishes it
+	Timeout time.Duration
+
+	// Default 1. Number of concurrent messages to consume at a time
+	Concurrent int
+
+	// TODO DisableBatchAutoTrigger
+	// Currently there is no auto-trigger behavior, batches only get processed
+	// on a dumb ticker. This is necessary for the way I plan to have the
+	// datastore writing, but it's not the expected behavior of a batch getting
+	// triggered every time messages come in.
+}
+
+func (co ConsumerOpts) withDefaults() ConsumerOpts {
+	if co.Timeout == 0 {
+		co.Timeout = 30 * time.Second
+	}
+	if co.Concurrent == 0 {
+		co.Concurrent = 1
+	}
+	return co
+}
+
+// Consume uses the given ConsumerFunc and ConsumerOpts to process messages off
+// the Subscription
+func (s *Subscription) Consume(ctx context.Context, fn ConsumerFunc, opts ConsumerOpts) {
+	opts = opts.withDefaults()
+	s.sub.ReceiveSettings.MaxExtension = opts.Timeout
+	s.sub.ReceiveSettings.MaxOutstandingMessages = opts.Concurrent
+
+	octx := oldctx.Context(ctx)
+	for {
+		err := s.sub.Receive(octx, func(octx oldctx.Context, msg *Message) {
+			innerCtx, cancel := oldctx.WithTimeout(octx, opts.Timeout)
+			defer cancel()
+
+			ok, err := fn(context.Context(innerCtx), msg)
+			if err != nil {
+				mlog.Warn("error consuming pubsub message", s, mlog.ErrKV(err))
+			}
+
+			if ok {
+				msg.Ack()
+			} else {
+				msg.Nack()
+			}
+		})
+		if octx.Err() == context.Canceled || err == nil {
+			return
+		} else if err != nil {
+			mlog.Warn("error consuming from pubsub", s, mlog.ErrKV(err))
+		}
+	}
+}
+
+// BatchConsumerFunc is similar to ConsumerFunc, except it takes in a batch of
+// multiple messages at once. The boolean returned will apply to every message
+// in the batch.
+type BatchConsumerFunc func(context.Context, []*Message) (bool, error)
+
+// BatchGroupFunc is an optional param to BatchConsume which allows for grouping
+// messages into separate groups. Each message received is attempted to be
+// placed in a group. Grouping is done by calling this function with the
+// received message and a random message from a group, and if this function
+// returns true then the received message is placed into that group. If this
+// returns false for all groups then a new group is created.
+//
+// This function should be a pure function.
+type BatchGroupFunc func(a, b *Message) bool
+
+// BatchConsume is like Consume, except it groups incoming messages together,
+// allowing them to be processed in batches instead of individually.
+//
+// BatchConsume first collects messages internally for half the
+// ConsumerOpts.Timeout value. Once that time has passed it will group all
+// messages based on the BatchGroupFunc (if nil then all collected messages form
+// one big group). The BatchConsumerFunc is called for each group, with the
+// context passed in having a timeout of ConsumerOpts.Timeout/2.
+//
+// The ConsumerOpts.Concurrent value determines the maximum number of messages
+// collected during the first section of the process (before the
+// BatchConsumerFunc is called).
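+//
+// A rough usage sketch (grouping by the first data byte is a hypothetical
+// example, similar to the grouping used in ps_test.go):
+//
+//	go sub.BatchConsume(ctx,
+//		func(ctx context.Context, msgs []*Message) (bool, error) {
+//			// process the whole group; true acks every message in it
+//			return true, nil
+//		},
+//		func(a, b *Message) bool { return a.Data[0] == b.Data[0] },
+//		ConsumerOpts{Concurrent: 10},
+//	)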
+func (s *Subscription) BatchConsume(
+	ctx context.Context,
+	fn BatchConsumerFunc, gfn BatchGroupFunc,
+	opts ConsumerOpts,
+) {
+	opts = opts.withDefaults()
+
+	type promise struct {
+		msg   *Message
+		retCh chan bool // must be buffered by one
+	}
+
+	var groups [][]promise
+	var groupsL sync.Mutex
+
+	groupProm := func(prom promise) {
+		groupsL.Lock()
+		defer groupsL.Unlock()
+		for i := range groups {
+			if gfn == nil || gfn(groups[i][0].msg, prom.msg) {
+				groups[i] = append(groups[i], prom)
+				return
+			}
+		}
+		groups = append(groups, []promise{prom})
+	}
+
+	wg := new(sync.WaitGroup)
+	defer wg.Wait()
+
+	processGroups := func() {
+		groupsL.Lock()
+		thisGroups := groups
+		groups = nil
+		groupsL.Unlock()
+
+		// we do a waitgroup chain so as to properly handle the cancel
+		// function. We hold wg (by adding one) until all routines spawned
+		// here have finished, and once they have we release wg and call cancel
+		thisCtx, cancel := context.WithTimeout(ctx, opts.Timeout/2)
+		thisWG := new(sync.WaitGroup)
+		thisWG.Add(1)
+		wg.Add(1)
+		go func() {
+			thisWG.Wait()
+			cancel()
+			wg.Done()
+		}()
+
+		for i := range thisGroups {
+			thisGroup := thisGroups[i]
+			thisWG.Add(1)
+			go func() {
+				defer thisWG.Done()
+				msgs := make([]*Message, len(thisGroup))
+				for i := range thisGroup {
+					msgs[i] = thisGroup[i].msg
+				}
+				ret, err := fn(thisCtx, msgs)
+				if err != nil {
+					mlog.Warn("error consuming pubsub batch messages", s, mlog.ErrKV(err))
+				}
+				for i := range thisGroup {
+					thisGroup[i].retCh <- ret // retCh is buffered
+				}
+			}()
+		}
+		thisWG.Done()
+	}
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		tick := time.NewTicker(opts.Timeout / 2)
+		defer tick.Stop()
+		for {
+			select {
+			case <-tick.C:
+				processGroups()
+			case <-s.batchTestTrigger:
+				processGroups()
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+
+	s.Consume(ctx, func(ctx context.Context, msg *Message) (bool, error) {
+		retCh := make(chan bool, 1)
+		groupProm(promise{msg: msg, retCh: retCh})
+		select {
+		case ret := <-retCh:
+			return ret, nil
+		case <-ctx.Done():
+			return false, errors.New("reading from batch grouping process timed out")
+		}
+	}, opts)
+}
diff --git a/mdb/ps_test.go b/mdb/ps_test.go
new file mode 100644
index 0000000..32ac8c8
--- /dev/null
+++ b/mdb/ps_test.go
@@ -0,0 +1,171 @@
+package mdb
+
+import (
+	"context"
+	. "testing"
+	"time"
+
+	"github.com/mediocregopher/mediocre-go-lib/mcfg"
+	"github.com/mediocregopher/mediocre-go-lib/mtest"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+var testPS *PubSub
+
+func init() {
+	cfg := mcfg.New()
+	testPS = CfgPubSub(cfg, "test")
+	cfg.TestRun()
+}
+
+// this requires the pubsub emulator to be running
+func TestPubSub(t *T) {
+	topicName := "testTopic_" + mtest.RandHex(8)
+	ctx := context.Background()
+
+	// Topic shouldn't exist yet
+	_, err := testPS.Topic(ctx, topicName, false)
+	require.Error(t, err)
+
+	// ...so create it
+	topic, err := testPS.Topic(ctx, topicName, true)
+	require.NoError(t, err)
+
+	// Create a subscription and consumer
+	sub, err := topic.Subscription(ctx, "testSub", true)
+	require.NoError(t, err)
+
+	msgCh := make(chan *Message)
+	go sub.Consume(ctx, func(ctx context.Context, m *Message) (bool, error) {
+		msgCh <- m
+		return true, nil
+	}, ConsumerOpts{})
+	time.Sleep(1 * time.Second) // give consumer time to actually start
+
+	// publish a message and make sure it gets consumed
+	assert.NoError(t, topic.Publish(ctx, []byte("foo")))
+	msg := <-msgCh
+	assert.Equal(t, []byte("foo"), msg.Data)
+}
+
+func TestBatchPubSub(t *T) {
+	ctx := context.Background()
+	topicName := "testBatchTopic_" + mtest.RandHex(8)
+	topic, err := testPS.Topic(ctx, topicName, true)
+	require.NoError(t, err)
+
+	readBatch := func(ch chan []*Message) map[byte]int {
+		select {
+		case <-time.After(1 * time.Second):
+			assert.Fail(t, "waited too long to read batch")
+			return nil
+		case mm := <-ch:
+			ret := map[byte]int{}
+			for _, m := range mm {
+				ret[m.Data[0]]++
+			}
+			return ret
+		}
+	}
+
+	// we use the same sub across the next two sections to ensure that cleanup
+	// also works
+	sub, err := topic.Subscription(ctx, "testSub", true)
+	require.NoError(t, err)
+	sub.batchTestTrigger = make(chan bool)
+
+	{ // no grouping
+		// Create a consumer
+		ctx, cancel := context.WithCancel(ctx)
+		ch := make(chan []*Message)
+		go func() {
+			sub.BatchConsume(ctx,
+				func(ctx context.Context, mm []*Message) (bool, error) {
+					ch <- mm
+					return true, nil
+				},
+				nil,
+				ConsumerOpts{Concurrent: 5},
+			)
+			close(ch)
+		}()
+		time.Sleep(1 * time.Second) // give consumer time to actually start
+
+		exp := map[byte]int{}
+		for i := byte(0); i <= 9; i++ {
+			require.NoError(t, topic.Publish(ctx, []byte{i}))
+			exp[i] = 1
+		}
+
+		time.Sleep(1 * time.Second)
+		sub.batchTestTrigger <- true
+		gotA := readBatch(ch)
+		assert.Len(t, gotA, 5)
+
+		time.Sleep(1 * time.Second)
+		sub.batchTestTrigger <- true
+		gotB := readBatch(ch)
+		assert.Len(t, gotB, 5)
+
+		for i, c := range gotB {
+			gotA[i] += c
+		}
+		assert.Equal(t, exp, gotA)
+
+		time.Sleep(1 * time.Second) // give time to ack before cancelling
+		cancel()
+		<-ch
+	}
+
+	{ // with grouping
+		ctx, cancel := context.WithCancel(ctx)
+		ch := make(chan []*Message)
+		go func() {
+			sub.BatchConsume(ctx,
+				func(ctx context.Context, mm []*Message) (bool, error) {
+					ch <- mm
+					return true, nil
+				},
+				func(a, b *Message) bool { return a.Data[0]%2 == b.Data[0]%2 },
+				ConsumerOpts{Concurrent: 10},
+			)
+			close(ch)
+		}()
+		time.Sleep(1 * time.Second) // give consumer time to actually start
+
+		exp := map[byte]int{}
+		for i := byte(0); i <= 9; i++ {
+			require.NoError(t, topic.Publish(ctx, []byte{i}))
+			exp[i] = 1
+		}
+
+		time.Sleep(1 * time.Second)
+		sub.batchTestTrigger <- true
+		gotA := readBatch(ch)
+		assert.Len(t, gotA, 5)
+		gotB := readBatch(ch)
+		assert.Len(t, gotB, 5)
+
+		assertGotGrouped := func(got map[byte]int) {
+			prev := byte(255)
+			for i := range got {
+				if prev != 255 {
+					assert.Equal(t, prev%2, i%2)
+				}
+				prev = i
+			}
+		}
+
+		assertGotGrouped(gotA)
+		assertGotGrouped(gotB)
+		for i, c := range gotB {
+			gotA[i] += c
+		}
+		assert.Equal(t, exp, gotA)
+
+		time.Sleep(1 * time.Second) // give time to ack before cancelling
+		cancel()
+		<-ch
+	}
+}