
374 lines
10 KiB
Raw Normal View History

2018-07-19 19:38:56 +00:00
// Package mpubsub implements connecting to Google's PubSub service and
// simplifying a number of interactions with it.
package mpubsub
2018-02-15 22:47:18 +00:00
import (
2019-06-23 18:56:03 +00:00
2018-02-15 22:47:18 +00:00
2018-02-15 22:47:18 +00:00
// TODO Consume (and probably BatchConsume) don't properly handle the Client
// being closed.
func isErrAlreadyExists(err error) bool {
2018-02-15 22:47:18 +00:00
if err == nil {
return false
s, ok := status.FromError(err)
return ok && s.Code() == codes.AlreadyExists
// Message aliases the type in the official driver
type Message = pubsub.Message
2018-07-18 23:50:40 +00:00
// PubSub is a wrapper around a pubsub client providing more functionality.
2018-02-15 22:47:18 +00:00
type PubSub struct {
gce *mdb.GCE
2019-06-23 18:56:03 +00:00
cmp *mcmp.Component
2018-02-15 22:47:18 +00:00
2019-06-23 18:56:03 +00:00
type pubsubOpts struct {
gce *mdb.GCE
// PubSubOpt is a value which adjusts the behavior of InstPubSub.
type PubSubOpt func(*pubsubOpts)
// PubSubGCE indicates that InstPubSub should use the given GCE instance rather
// than instantiate its own.
func PubSubGCE(gce *mdb.GCE) PubSubOpt {
return func(opts *pubsubOpts) {
opts.gce = gce
2019-06-23 18:56:03 +00:00
2019-06-23 18:56:03 +00:00
// InstPubSub instantiates a PubSub which will be initialized when the Init
// event is triggered on the given Component. The PubSub instance will have
// Close called on it when the Shutdown event is triggered on the given
// Component.
func InstPubSub(cmp *mcmp.Component, options ...PubSubOpt) *PubSub {
var opts pubsubOpts
for _, opt := range options {
2019-06-23 18:56:03 +00:00
ps := PubSub{
gce: opts.gce,
cmp: cmp.Child("pubsub"),
if ps.gce == nil {
ps.gce = mdb.InstGCE(ps.cmp)
mrun.InitHook(ps.cmp, func(ctx context.Context) error {
mlog.From(ps.cmp).Info("connecting to pubsub", ctx)
var err error
2019-06-23 18:56:03 +00:00
ps.Client, err = pubsub.NewClient(ctx, ps.gce.Project, ps.gce.ClientOptions()...)
return merr.Wrap(err, ps.cmp.Context(), ctx)
2019-06-23 18:56:03 +00:00
mrun.ShutdownHook(ps.cmp, func(ctx context.Context) error {
mlog.From(ps.cmp).Info("closing pubsub", ctx)
return ps.Client.Close()
2018-02-15 22:47:18 +00:00
2019-06-23 18:56:03 +00:00
return &ps
2018-02-15 22:47:18 +00:00
// Topic provides methods around a particular topic in PubSub
type Topic struct {
2019-06-23 18:56:03 +00:00
Name string
ctx context.Context
2018-02-15 22:47:18 +00:00
topic *pubsub.Topic
// Topic returns, after potentially creating, a topic of the given name
func (ps *PubSub) Topic(ctx context.Context, name string, create bool) (*Topic, error) {
t := &Topic{
2019-06-23 18:56:03 +00:00
PubSub: ps,
ctx: mctx.Annotate(ps.cmp.Context(), "topicName", name),
Name: name,
2018-07-18 23:50:40 +00:00
2018-02-15 22:47:18 +00:00
var err error
if create {
t.topic, err = ps.Client.CreateTopic(ctx, name)
if isErrAlreadyExists(err) {
t.topic = ps.Client.Topic(name)
2018-02-15 22:47:18 +00:00
} else if err != nil {
return nil, merr.Wrap(err, t.ctx, ctx)
2018-02-15 22:47:18 +00:00
} else {
t.topic = ps.Client.Topic(name)
if exists, err := t.topic.Exists(t.ctx); err != nil {
return nil, merr.Wrap(err, t.ctx, ctx)
2018-02-15 22:47:18 +00:00
} else if !exists {
return nil, merr.New("topic dne", t.ctx, ctx)
2018-02-15 22:47:18 +00:00
return t, nil
2018-02-15 22:47:18 +00:00
// Publish publishes a message with the given data as its body to the Topic
func (t *Topic) Publish(ctx context.Context, data []byte) error {
_, err := t.topic.Publish(ctx, &Message{Data: data}).Get(ctx)
if err != nil {
return merr.Wrap(err, t.ctx, ctx)
2018-02-15 22:47:18 +00:00
return nil
// Subscription provides methods around a subscription to a topic in PubSub
type Subscription struct {
2019-06-23 18:56:03 +00:00
Name string
ctx context.Context
sub *pubsub.Subscription
2018-02-15 22:47:18 +00:00
// only used in tests to trigger batch processing
batchTestTrigger chan bool
// Subscription returns a Subscription instance, after potentially creating it,
// for the Topic
func (t *Topic) Subscription(ctx context.Context, name string, create bool) (*Subscription, error) {
2019-06-23 18:56:03 +00:00
name = t.Name + "_" + name
s := &Subscription{
2019-06-23 18:56:03 +00:00
Topic: t,
Name: name,
ctx: mctx.Annotate(t.ctx, "subName", name),
2018-07-18 23:50:40 +00:00
2018-02-15 22:47:18 +00:00
var err error
if create {
2019-06-23 18:56:03 +00:00
s.sub, err = s.CreateSubscription(ctx, name, pubsub.SubscriptionConfig{
2018-02-15 22:47:18 +00:00
Topic: t.topic,
if isErrAlreadyExists(err) {
2019-06-23 18:56:03 +00:00
s.sub = s.PubSub.Subscription(s.Name)
2018-02-15 22:47:18 +00:00
} else if err != nil {
return nil, merr.Wrap(err, s.ctx, ctx)
2018-02-15 22:47:18 +00:00
} else {
2019-06-23 18:56:03 +00:00
s.sub = s.PubSub.Subscription(s.Name)
if exists, err := s.sub.Exists(ctx); err != nil {
return nil, merr.Wrap(err, s.ctx, ctx)
2018-02-15 22:47:18 +00:00
} else if !exists {
return nil, merr.New("sub dne", s.ctx, ctx)
2018-02-15 22:47:18 +00:00
return s, nil
2018-02-15 22:47:18 +00:00
// ConsumerFunc is a function which messages being consumed will be passed. The
// returned boolean and returned error are independent. If the bool is false the
// message will be returned to the queue for retrying later. If an error is
// returned it will be logged.
// The Context will be canceled once the deadline has been reached (as set when
// Consume is called).
type ConsumerFunc func(context.Context, *Message) (bool, error)
// ConsumerOpts are options which effect the behavior of a Consume method call
type ConsumerOpts struct {
// Default 30s. The timeout each message has to complete before its context
// is cancelled and the server re-publishes it
Timeout time.Duration
// Default 1. Number of concurrent messages to consume at a time
Concurrent int
// TODO DisableBatchAutoTrigger
// Currently there is no auto-trigger behavior, batches only get processed
// on a dumb ticker. This is necessary for the way I plan to have the
// datastore writing, but it's not the expected behavior of a batch getting
// triggered everytime <Concurrent> messages come in.
func (co ConsumerOpts) withDefaults() ConsumerOpts {
if co.Timeout == 0 {
co.Timeout = 30 * time.Second
if co.Concurrent == 0 {
co.Concurrent = 1
return co
// Consume uses the given ConsumerFunc and ConsumerOpts to process messages off
// the Subscription
func (s *Subscription) Consume(ctx context.Context, fn ConsumerFunc, opts ConsumerOpts) {
opts = opts.withDefaults()
s.sub.ReceiveSettings.MaxExtension = opts.Timeout
s.sub.ReceiveSettings.MaxOutstandingMessages = opts.Concurrent
for {
2019-06-23 18:56:03 +00:00
err := s.sub.Receive(ctx, func(ctx context.Context, msg *Message) {
innerCtx, cancel := context.WithTimeout(ctx, opts.Timeout)
2018-02-15 22:47:18 +00:00
defer cancel()
ok, err := fn(innerCtx, msg)
2018-02-15 22:47:18 +00:00
if err != nil {
2019-06-23 18:56:03 +00:00
mlog.From(s.cmp).Warn("error consuming pubsub message",
s.ctx, ctx, innerCtx, merr.Context(err))
2018-02-15 22:47:18 +00:00
if ok {
} else {
2019-06-23 18:56:03 +00:00
if ctx.Err() == context.Canceled || err == nil {
2018-02-15 22:47:18 +00:00
} else if err != nil {
2019-06-23 18:56:03 +00:00
mlog.From(s.cmp).Warn("error consuming from pubsub",
s.ctx, ctx, merr.Context(err))
time.Sleep(1 * time.Second)
2018-02-15 22:47:18 +00:00
// BatchConsumerFunc is similar to ConsumerFunc, except it takes in a batch of
// multiple messages at once. If the boolean returned will apply to every
// message in the batch.
type BatchConsumerFunc func(context.Context, []*Message) (bool, error)
// BatchGroupFunc is an optional param to BatchConsume which allows for grouping
// messages into separate groups. Each message received is attempted to be
// placed in a group. Grouping is done by calling this function with the
// received message and a random message from a group, and if this function
// returns true then the received message is placed into that group. If this
// returns false for all groups then a new group is created.
// This function should be a pure function.
type BatchGroupFunc func(a, b *Message) bool
// BatchConsume is like Consume, except it groups incoming messages together,
// allowing them to be processed in batches instead of individually.
// BatchConsume first collects messages internally for half the
// ConsumerOpts.Timeout value. Once that time has passed it will group all
// messages based on the BatchGroupFunc (if nil then all collected messages form
// one big group). The BatchConsumerFunc is called for each group, with the
// context passed in having a timeout of ConsumerOpts.Timeout/2.
// The ConsumerOpts.Concurrent value determines the maximum number of messages
// collected during the first section of the process (before BatchConsumerFn is
// called).
func (s *Subscription) BatchConsume(
ctx context.Context,
fn BatchConsumerFunc, gfn BatchGroupFunc,
opts ConsumerOpts,
) {
opts = opts.withDefaults()
type promise struct {
msg *Message
retCh chan bool // must be buffered by one
var groups [][]promise
var groupsL sync.Mutex
groupProm := func(prom promise) {
defer groupsL.Unlock()
for i := range groups {
if gfn == nil || gfn(groups[i][0].msg, prom.msg) {
groups[i] = append(groups[i], prom)
groups = append(groups, []promise{prom})
wg := new(sync.WaitGroup)
defer wg.Wait()
processGroups := func() {
thisGroups := groups
groups = nil
// we do a waitgroup chain so as to properly handle the cancel
// function. We hold wg (by adding one) until all routines spawned
// here have finished, and once they have release wg and cancel
thisCtx, cancel := context.WithTimeout(ctx, opts.Timeout/2)
thisWG := new(sync.WaitGroup)
go func() {
for i := range thisGroups {
thisGroup := thisGroups[i]
go func() {
defer thisWG.Done()
msgs := make([]*Message, len(thisGroup))
for i := range thisGroup {
msgs[i] = thisGroup[i].msg
ret, err := fn(thisCtx, msgs)
if err != nil {
2019-06-23 18:56:03 +00:00
mlog.From(s.cmp).Warn("error consuming pubsub batch messages",
s.ctx, thisCtx, merr.Context(err))
2018-02-15 22:47:18 +00:00
for i := range thisGroup {
thisGroup[i].retCh <- ret // retCh is buffered
go func() {
defer wg.Done()
tick := time.NewTicker(opts.Timeout / 2)
defer tick.Stop()
for {
select {
case <-tick.C:
case <-s.batchTestTrigger:
case <-ctx.Done():
s.Consume(ctx, func(ctx context.Context, msg *Message) (bool, error) {
retCh := make(chan bool, 1)
groupProm(promise{msg: msg, retCh: retCh})
select {
case ret := <-retCh:
return ret, nil
case <-ctx.Done():
return false, merr.New("reading from batch grouping process timed out", s.ctx, ctx)
2018-02-15 22:47:18 +00:00
}, opts)