Brian Picciano 4b446a0efc mctx: refactor so that contexts no longer carry mutable data
This change required refactoring nearly every package in this project,
but it does a lot to simplify mctx and make other code using it easier
to think about.

Other code, such as mlog and mcfg, had to be slightly modified for this
change to work as well.
2019-02-07 19:42:12 -05:00

373 lines
10 KiB

// Package mpubsub implements connecting to Google's PubSub service and
// simplifying a number of interactions with it.
package mpubsub
import (
oldctx ""
// TODO Consume (and probably BatchConsume) don't properly handle the Client
// being closed.
func isErrAlreadyExists(err error) bool {
if err == nil {
return false
s, ok := status.FromError(err)
return ok && s.Code() == codes.AlreadyExists
// Message aliases the type in the official driver
type Message = pubsub.Message
// PubSub is a wrapper around a pubsub client providing more functionality.
type PubSub struct {
gce *mdb.GCE
ctx context.Context
// MNew returns a PubSub instance which will be initialized and configured when
// the start event is triggered on the returned Context (see mrun.Start). The
// PubSub instance will have Close called on it when the stop event is triggered
// on the returned Context(see mrun.Stop).
// gce is optional and can be passed in if there's an existing gce object which
// should be used, otherwise a new one will be created with mdb.MGCE.
func MNew(ctx context.Context, gce *mdb.GCE) (context.Context, *PubSub) {
if gce == nil {
ctx, gce = mdb.MGCE(ctx, "")
ps := &PubSub{
gce: gce,
ctx: mctx.NewChild(ctx, "pubsub"),
// TODO the equivalent functionality as here will be added with annotations
// ps.log.SetKV(ps)
ps.ctx = mrun.OnStart(ps.ctx, func(innerCtx context.Context) error {
mlog.Info(ps.ctx, "connecting to pubsub")
var err error
ps.Client, err = pubsub.NewClient(innerCtx, ps.gce.Project, ps.gce.ClientOptions()...)
return merr.WithKV(err, ps.KV())
ps.ctx = mrun.OnStop(ps.ctx, func(context.Context) error {
return ps.Client.Close()
return mctx.WithChild(ctx, ps.ctx), ps
// KV implements the mlog.KVer interface
func (ps *PubSub) KV() map[string]interface{} {
return ps.gce.KV()
// Topic provides methods around a particular topic in PubSub
type Topic struct {
ps *PubSub
topic *pubsub.Topic
name string
// Topic returns, after potentially creating, a topic of the given name
func (ps *PubSub) Topic(ctx context.Context, name string, create bool) (*Topic, error) {
t := &Topic{
ps: ps,
name: name,
var err error
if create {
t.topic, err = ps.Client.CreateTopic(ctx, name)
if isErrAlreadyExists(err) {
t.topic = ps.Client.Topic(name)
} else if err != nil {
return nil, merr.WithKV(err, t.KV())
} else {
t.topic = ps.Client.Topic(name)
if exists, err := t.topic.Exists(ctx); err != nil {
return nil, merr.WithKV(err, t.KV())
} else if !exists {
err := merr.New("topic dne")
return nil, merr.WithKV(err, t.KV())
return t, nil
// KV implements the mlog.KVer interface
func (t *Topic) KV() map[string]interface{} {
kv :=
kv["topicName"] =
return kv
// Publish publishes a message with the given data as its body to the Topic
func (t *Topic) Publish(ctx context.Context, data []byte) error {
_, err := t.topic.Publish(ctx, &Message{Data: data}).Get(ctx)
if err != nil {
return merr.WithKV(err, t.KV())
return nil
// Subscription provides methods around a subscription to a topic in PubSub
type Subscription struct {
topic *Topic
sub *pubsub.Subscription
name string
// only used in tests to trigger batch processing
batchTestTrigger chan bool
// Subscription returns a Subscription instance, after potentially creating it,
// for the Topic
func (t *Topic) Subscription(ctx context.Context, name string, create bool) (*Subscription, error) {
name = + "_" + name
s := &Subscription{
topic: t,
name: name,
var err error
if create {
s.sub, err =, name, pubsub.SubscriptionConfig{
Topic: t.topic,
if isErrAlreadyExists(err) {
s.sub =
} else if err != nil {
return nil, merr.WithKV(err, s.KV())
} else {
s.sub =
if exists, err := s.sub.Exists(ctx); err != nil {
return nil, merr.WithKV(err, s.KV())
} else if !exists {
err := merr.New("sub dne")
return nil, merr.WithKV(err, s.KV())
return s, nil
// KV implements the mlog.KVer interface
func (s *Subscription) KV() map[string]interface{} {
kv := s.topic.KV()
kv["subName"] =
return kv
// ConsumerFunc is a function which messages being consumed will be passed. The
// returned boolean and returned error are independent. If the bool is false the
// message will be returned to the queue for retrying later. If an error is
// returned it will be logged.
// The Context will be canceled once the deadline has been reached (as set when
// Consume is called).
type ConsumerFunc func(context.Context, *Message) (bool, error)
// ConsumerOpts are options which effect the behavior of a Consume method call
type ConsumerOpts struct {
// Default 30s. The timeout each message has to complete before its context
// is cancelled and the server re-publishes it
Timeout time.Duration
// Default 1. Number of concurrent messages to consume at a time
Concurrent int
// TODO DisableBatchAutoTrigger
// Currently there is no auto-trigger behavior, batches only get processed
// on a dumb ticker. This is necessary for the way I plan to have the
// datastore writing, but it's not the expected behavior of a batch getting
// triggered everytime <Concurrent> messages come in.
func (co ConsumerOpts) withDefaults() ConsumerOpts {
if co.Timeout == 0 {
co.Timeout = 30 * time.Second
if co.Concurrent == 0 {
co.Concurrent = 1
return co
// Consume uses the given ConsumerFunc and ConsumerOpts to process messages off
// the Subscription
func (s *Subscription) Consume(ctx context.Context, fn ConsumerFunc, opts ConsumerOpts) {
opts = opts.withDefaults()
s.sub.ReceiveSettings.MaxExtension = opts.Timeout
s.sub.ReceiveSettings.MaxOutstandingMessages = opts.Concurrent
octx := oldctx.Context(ctx)
for {
err := s.sub.Receive(octx, func(octx oldctx.Context, msg *Message) {
innerCtx, cancel := oldctx.WithTimeout(octx, opts.Timeout)
defer cancel()
ok, err := fn(context.Context(innerCtx), msg)
if err != nil {
mlog.Warn(, "error consuming pubsub message", s, merr.KV(err))
if ok {
} else {
if octx.Err() == context.Canceled || err == nil {
} else if err != nil {
mlog.Warn(, "error consuming from pubsub", s, merr.KV(err))
time.Sleep(1 * time.Second)
// BatchConsumerFunc is similar to ConsumerFunc, except it takes in a batch of
// multiple messages at once. If the boolean returned will apply to every
// message in the batch.
type BatchConsumerFunc func(context.Context, []*Message) (bool, error)
// BatchGroupFunc is an optional param to BatchConsume which allows for grouping
// messages into separate groups. Each message received is attempted to be
// placed in a group. Grouping is done by calling this function with the
// received message and a random message from a group, and if this function
// returns true then the received message is placed into that group. If this
// returns false for all groups then a new group is created.
// This function should be a pure function.
type BatchGroupFunc func(a, b *Message) bool
// BatchConsume is like Consume, except it groups incoming messages together,
// allowing them to be processed in batches instead of individually.
// BatchConsume first collects messages internally for half the
// ConsumerOpts.Timeout value. Once that time has passed it will group all
// messages based on the BatchGroupFunc (if nil then all collected messages form
// one big group). The BatchConsumerFunc is called for each group, with the
// context passed in having a timeout of ConsumerOpts.Timeout/2.
// The ConsumerOpts.Concurrent value determines the maximum number of messages
// collected during the first section of the process (before BatchConsumerFn is
// called).
func (s *Subscription) BatchConsume(
ctx context.Context,
fn BatchConsumerFunc, gfn BatchGroupFunc,
opts ConsumerOpts,
) {
opts = opts.withDefaults()
type promise struct {
msg *Message
retCh chan bool // must be buffered by one
var groups [][]promise
var groupsL sync.Mutex
groupProm := func(prom promise) {
defer groupsL.Unlock()
for i := range groups {
if gfn == nil || gfn(groups[i][0].msg, prom.msg) {
groups[i] = append(groups[i], prom)
groups = append(groups, []promise{prom})
wg := new(sync.WaitGroup)
defer wg.Wait()
processGroups := func() {
thisGroups := groups
groups = nil
// we do a waitgroup chain so as to properly handle the cancel
// function. We hold wg (by adding one) until all routines spawned
// here have finished, and once they have release wg and cancel
thisCtx, cancel := context.WithTimeout(ctx, opts.Timeout/2)
thisWG := new(sync.WaitGroup)
go func() {
for i := range thisGroups {
thisGroup := thisGroups[i]
go func() {
defer thisWG.Done()
msgs := make([]*Message, len(thisGroup))
for i := range thisGroup {
msgs[i] = thisGroup[i].msg
ret, err := fn(thisCtx, msgs)
if err != nil {
mlog.Warn(, "error consuming pubsub batch messages", s, merr.KV(err))
for i := range thisGroup {
thisGroup[i].retCh <- ret // retCh is buffered
go func() {
defer wg.Done()
tick := time.NewTicker(opts.Timeout / 2)
defer tick.Stop()
for {
select {
case <-tick.C:
case <-s.batchTestTrigger:
case <-ctx.Done():
s.Consume(ctx, func(ctx context.Context, msg *Message) (bool, error) {
retCh := make(chan bool, 1)
groupProm(promise{msg: msg, retCh: retCh})
select {
case ret := <-retCh:
return ret, nil
case <-ctx.Done():
return false, errors.New("reading from batch grouping process timed out")
}, opts)