diff --git a/mdb/mpubsub/pubsub.go b/mdb/mpubsub/pubsub.go index fa8efb8..b768911 100644 --- a/mdb/mpubsub/pubsub.go +++ b/mdb/mpubsub/pubsub.go @@ -8,12 +8,12 @@ import ( "time" "cloud.google.com/go/pubsub" + "github.com/mediocregopher/mediocre-go-lib/mcmp" "github.com/mediocregopher/mediocre-go-lib/mctx" "github.com/mediocregopher/mediocre-go-lib/mdb" "github.com/mediocregopher/mediocre-go-lib/merr" "github.com/mediocregopher/mediocre-go-lib/mlog" "github.com/mediocregopher/mediocre-go-lib/mrun" - oldctx "golang.org/x/net/context" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -37,54 +37,71 @@ type PubSub struct { *pubsub.Client gce *mdb.GCE - ctx context.Context + cmp *mcmp.Component } -// WithPubSub returns a PubSub instance which will be initialized and configured -// when the start event is triggered on the returned Context (see mrun.Start). -// The PubSub instance will have Close called on it when the stop event is -// triggered on the returned Context(see mrun.Stop). -// -// gce is optional and can be passed in if there's an existing gce object which -// should be used, otherwise a new one will be created with mdb.MGCE. -func WithPubSub(parent context.Context, gce *mdb.GCE) (context.Context, *PubSub) { - ctx := mctx.NewChild(parent, "pubsub") - if gce == nil { - ctx, gce = mdb.WithGCE(ctx, "") +type pubsubOpts struct { + gce *mdb.GCE +} + +// PubSubOpt is a value which adjusts the behavior of InstPubSub. +type PubSubOpt func(*pubsubOpts) + +// PubSubGCE indicates that InstPubSub should use the given GCE instance rather +// than instantiate its own. +func PubSubGCE(gce *mdb.GCE) PubSubOpt { + return func(opts *pubsubOpts) { + opts.gce = gce + } +} + +// InstPubSub instantiates a PubSub which will be initialized when the Init +// event is triggered on the given Component. The PubSub instance will have +// Close called on it when the Shutdown event is triggered on the given +// Component. +func InstPubSub(cmp *mcmp.Component, options ...PubSubOpt) *PubSub { + var opts pubsubOpts + for _, opt := range options { + opt(&opts) } - ps := &PubSub{ - gce: gce, + ps := PubSub{ + gce: opts.gce, + cmp: cmp.Child("pubsub"), + } + if ps.gce == nil { + ps.gce = mdb.InstGCE(ps.cmp) } - ctx = mrun.WithStartHook(ctx, func(innerCtx context.Context) error { - ps.ctx = mctx.MergeAnnotations(ps.ctx, ps.gce.Context()) - mlog.Info("connecting to pubsub", ps.ctx) + mrun.InitHook(ps.cmp, func(ctx context.Context) error { + mlog.From(ps.cmp).Info("connecting to pubsub", ctx) var err error - ps.Client, err = pubsub.NewClient(innerCtx, ps.gce.Project, ps.gce.ClientOptions()...) - return merr.Wrap(err, ps.ctx) + ps.Client, err = pubsub.NewClient(ctx, ps.gce.Project, ps.gce.ClientOptions()...) + return merr.Wrap(err, ps.cmp.Context(), ctx) }) - ctx = mrun.WithStopHook(ctx, func(context.Context) error { + + mrun.ShutdownHook(ps.cmp, func(ctx context.Context) error { + mlog.From(ps.cmp).Info("closing pubsub", ctx) return ps.Client.Close() }) - ps.ctx = ctx - return mctx.WithChild(parent, ctx), ps + return &ps } // Topic provides methods around a particular topic in PubSub type Topic struct { + *PubSub + Name string + ctx context.Context - ps *PubSub topic *pubsub.Topic - name string } // Topic returns, after potentially creating, a topic of the given name func (ps *PubSub) Topic(ctx context.Context, name string, create bool) (*Topic, error) { t := &Topic{ - ctx: mctx.Annotate(ps.ctx, "topicName", name), - ps: ps, - name: name, + PubSub: ps, + ctx: mctx.Annotate(ps.cmp.Context(), "topicName", name), + Name: name, } var err error @@ -117,10 +134,11 @@ func (t *Topic) Publish(ctx context.Context, data []byte) error { // Subscription provides methods around a subscription to a topic in PubSub type Subscription struct { - ctx context.Context - topic *Topic - sub *pubsub.Subscription - name string + *Topic + Name string + + ctx context.Context + sub *pubsub.Subscription // only used in tests to trigger batch processing batchTestTrigger chan bool @@ -129,25 +147,25 @@ type Subscription struct { // Subscription returns a Subscription instance, after potentially creating it, // for the Topic func (t *Topic) Subscription(ctx context.Context, name string, create bool) (*Subscription, error) { - name = t.name + "_" + name + name = t.Name + "_" + name s := &Subscription{ + Topic: t, + Name: name, ctx: mctx.Annotate(t.ctx, "subName", name), - topic: t, - name: name, } var err error if create { - s.sub, err = t.ps.CreateSubscription(ctx, name, pubsub.SubscriptionConfig{ + s.sub, err = s.CreateSubscription(ctx, name, pubsub.SubscriptionConfig{ Topic: t.topic, }) if isErrAlreadyExists(err) { - s.sub = t.ps.Subscription(name) + s.sub = s.PubSub.Subscription(s.Name) } else if err != nil { return nil, merr.Wrap(err, s.ctx, ctx) } } else { - s.sub = t.ps.Subscription(name) + s.sub = s.PubSub.Subscription(s.Name) if exists, err := s.sub.Exists(ctx); err != nil { return nil, merr.Wrap(err, s.ctx, ctx) } else if !exists { @@ -199,16 +217,14 @@ func (s *Subscription) Consume(ctx context.Context, fn ConsumerFunc, opts Consum s.sub.ReceiveSettings.MaxExtension = opts.Timeout s.sub.ReceiveSettings.MaxOutstandingMessages = opts.Concurrent - octx := oldctx.Context(ctx) for { - err := s.sub.Receive(octx, func(octx oldctx.Context, msg *Message) { - innerOCtx, cancel := oldctx.WithTimeout(octx, opts.Timeout) + err := s.sub.Receive(ctx, func(ctx context.Context, msg *Message) { + innerCtx, cancel := context.WithTimeout(ctx, opts.Timeout) defer cancel() - innerCtx := context.Context(innerOCtx) ok, err := fn(innerCtx, msg) if err != nil { - mlog.Warn("error consuming pubsub message", + mlog.From(s.cmp).Warn("error consuming pubsub message", s.ctx, ctx, innerCtx, merr.Context(err)) } @@ -218,10 +234,10 @@ func (s *Subscription) Consume(ctx context.Context, fn ConsumerFunc, opts Consum msg.Nack() } }) - if octx.Err() == context.Canceled || err == nil { + if ctx.Err() == context.Canceled || err == nil { return } else if err != nil { - mlog.Warn("error consuming from pubsub", + mlog.From(s.cmp).Warn("error consuming from pubsub", s.ctx, ctx, merr.Context(err)) time.Sleep(1 * time.Second) } @@ -315,8 +331,8 @@ func (s *Subscription) BatchConsume( } ret, err := fn(thisCtx, msgs) if err != nil { - mlog.Warn("error consuming pubsub batch messages", - s.ctx, merr.Context(err)) + mlog.From(s.cmp).Warn("error consuming pubsub batch messages", + s.ctx, thisCtx, merr.Context(err)) } for i := range thisGroup { thisGroup[i].retCh <- ret // retCh is buffered diff --git a/mdb/mpubsub/pubsub_test.go b/mdb/mpubsub/pubsub_test.go index d2327ea..ab817af 100644 --- a/mdb/mpubsub/pubsub_test.go +++ b/mdb/mpubsub/pubsub_test.go @@ -13,10 +13,10 @@ import ( // this requires the pubsub emulator to be running func TestPubSub(t *T) { - ctx := mtest.Context() - ctx = mtest.WithEnv(ctx, "PUBSUB_GCE_PROJECT", "test") - ctx, ps := WithPubSub(ctx, nil) - mtest.Run(ctx, t, func() { + cmp := mtest.Component() + mtest.Env(cmp, "PUBSUB_GCE_PROJECT", "test") + ps := InstPubSub(cmp) + mtest.Run(cmp, t, func() { topicName := "testTopic_" + mrand.Hex(8) ctx := context.Background() @@ -47,12 +47,13 @@ func TestPubSub(t *T) { } func TestBatchPubSub(t *T) { - ctx := mtest.Context() - ctx = mtest.WithEnv(ctx, "PUBSUB_GCE_PROJECT", "test") - ctx, ps := WithPubSub(ctx, nil) - mtest.Run(ctx, t, func() { - + cmp := mtest.Component() + mtest.Env(cmp, "PUBSUB_GCE_PROJECT", "test") + ps := InstPubSub(cmp) + mtest.Run(cmp, t, func() { topicName := "testBatchTopic_" + mrand.Hex(8) + ctx := context.Background() + topic, err := ps.Topic(ctx, topicName, true) require.NoError(t, err)