mdb/mpubsub: refactor to use new mctx/mlog/mcfg stuff

Brian Picciano 2019-02-03 16:05:46 -05:00
parent 8ddf1348db
commit 5dc8d92987
2 changed files with 216 additions and 197 deletions


@@ -9,15 +9,20 @@ import (
     "time"

     "cloud.google.com/go/pubsub"
-    "github.com/mediocregopher/mediocre-go-lib/m"
+    "github.com/mediocregopher/mediocre-go-lib/mctx"
+    "github.com/mediocregopher/mediocre-go-lib/mcfg"
     "github.com/mediocregopher/mediocre-go-lib/mdb"
+    "github.com/mediocregopher/mediocre-go-lib/merr"
     "github.com/mediocregopher/mediocre-go-lib/mlog"
+    "github.com/mediocregopher/mediocre-go-lib/mrun"
     oldctx "golang.org/x/net/context"
     "google.golang.org/grpc/codes"
     "google.golang.org/grpc/status"
 )

+// TODO this package still uses context.Context in the callback functions
+
+// TODO Consume (and probably BatchConsume) don't properly handle the Client
+// being closed.
+
 func isErrAlreadyExists(err error) bool {
     if err == nil {
         return false
@@ -37,24 +42,39 @@ type PubSub struct {
     log *mlog.Logger
 }

-// Cfg configures and returns a PubSub instance which will be usable once
-// StartRun is called on the passed in Cfg instance.
-func Cfg(cfg *mcfg.Cfg) *PubSub {
-    cfg = cfg.Child("pubsub")
-    var ps PubSub
-    ps.gce = mdb.CfgGCE(cfg)
-    ps.log = m.Log(cfg, &ps)
-    cfg.Start.Then(func(ctx context.Context) error {
+// MNew returns a PubSub instance which will be initialized and configured when
+// the start event is triggered on ctx (see mrun.Start). The PubSub instance
+// will have Close called on it when the stop event is triggered on ctx (see
+// mrun.Stop).
+//
+// gce is optional and can be passed in if there's an existing gce object which
+// should be used, otherwise a new one will be created with mdb.MGCE.
+func MNew(ctx mctx.Context, gce *mdb.GCE) *PubSub {
+    if gce == nil {
+        gce = mdb.MGCE(ctx, "")
+    }
+    ctx = mctx.ChildOf(ctx, "pubsub")
+    ps := &PubSub{
+        gce: gce,
+        log: mlog.From(ctx),
+    }
+    ps.log.SetKV(ps)
+    mrun.OnStart(ctx, func(innerCtx mctx.Context) error {
         ps.log.Info("connecting to pubsub")
         var err error
-        ps.Client, err = pubsub.NewClient(ctx, ps.gce.Project, ps.gce.ClientOptions()...)
-        return mlog.ErrWithKV(err, &ps)
+        ps.Client, err = pubsub.NewClient(innerCtx, ps.gce.Project, ps.gce.ClientOptions()...)
+        return merr.WithKV(err, ps.KV())
     })
-    return &ps
+    mrun.OnStop(ctx, func(mctx.Context) error {
+        return ps.Client.Close()
+    })
+    return ps
 }

 // KV implements the mlog.KVer interface
-func (ps *PubSub) KV() mlog.KV {
+func (ps *PubSub) KV() map[string]interface{} {
     return ps.gce.KV()
 }
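Taken together, the new lifecycle is: construct with MNew, connect on the start event, close on the stop event. A minimal sketch of how a caller might wire that up, assuming an import path of mdb/mpubsub (taken from the commit title) and mctx.New/mrun.Start/mrun.Stop entry points (the latter two are referenced by the doc comment above; their exact signatures are assumptions, not part of this diff):

package main

import (
    "log"

    "github.com/mediocregopher/mediocre-go-lib/mctx"
    "github.com/mediocregopher/mediocre-go-lib/mdb/mpubsub"
    "github.com/mediocregopher/mediocre-go-lib/mrun"
)

func main() {
    ctx := mctx.New() // assumed root-context constructor
    // nil gce: MNew creates one itself via mdb.MGCE
    ps := mpubsub.MNew(ctx, nil)

    // Triggers the OnStart hook registered by MNew, which dials pubsub
    // and populates ps.Client.
    if err := mrun.Start(ctx); err != nil {
        log.Fatal(err)
    }
    // Triggers the OnStop hook, which closes ps.Client.
    defer mrun.Stop(ctx)

    // ... use ps here ...
}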
@@ -67,44 +87,43 @@ type Topic struct {
 // Topic returns, after potentially creating, a topic of the given name
 func (ps *PubSub) Topic(ctx context.Context, name string, create bool) (*Topic, error) {
-    kv := mlog.KVerFunc(func() mlog.KV {
-        return ps.KV().Set("topicName", name)
-    })
-    var t *pubsub.Topic
+    t := &Topic{
+        ps:   ps,
+        name: name,
+    }
     var err error
     if create {
-        t, err = ps.Client.CreateTopic(ctx, name)
+        t.topic, err = ps.Client.CreateTopic(ctx, name)
         if isErrAlreadyExists(err) {
-            t = ps.Client.Topic(name)
+            t.topic = ps.Client.Topic(name)
         } else if err != nil {
-            return nil, mlog.ErrWithKV(err, kv)
+            return nil, merr.WithKV(err, t.KV())
         }
     } else {
-        t = ps.Client.Topic(name)
-        if exists, err := t.Exists(ctx); err != nil {
-            return nil, mlog.ErrWithKV(err, kv)
+        t.topic = ps.Client.Topic(name)
+        if exists, err := t.topic.Exists(ctx); err != nil {
+            return nil, merr.WithKV(err, t.KV())
         } else if !exists {
-            return nil, mlog.ErrWithKV(errors.New("topic dne"), kv)
+            err := merr.New("topic dne")
+            return nil, merr.WithKV(err, t.KV())
         }
     }
-    return &Topic{
-        ps:    ps,
-        topic: t,
-        name:  name,
-    }, nil
+    return t, nil
 }

 // KV implements the mlog.KVer interface
-func (t *Topic) KV() mlog.KV {
-    return t.ps.KV().Set("topicName", t.name)
+func (t *Topic) KV() map[string]interface{} {
+    kv := t.ps.KV()
+    kv["topicName"] = t.name
+    return kv
 }

 // Publish publishes a message with the given data as its body to the Topic
 func (t *Topic) Publish(ctx context.Context, data []byte) error {
     _, err := t.topic.Publish(ctx, &Message{Data: data}).Get(ctx)
     if err != nil {
-        return mlog.ErrWithKV(err, t)
+        return merr.WithKV(err, t.KV())
     }
     return nil
 }
@@ -123,39 +142,38 @@ type Subscription struct {
 // for the Topic
 func (t *Topic) Subscription(ctx context.Context, name string, create bool) (*Subscription, error) {
     name = t.name + "_" + name
-    kv := mlog.KVerFunc(func() mlog.KV {
-        return t.KV().Set("subName", name)
-    })
-    var s *pubsub.Subscription
+    s := &Subscription{
+        topic: t,
+        name:  name,
+    }
     var err error
     if create {
-        s, err = t.ps.CreateSubscription(ctx, name, pubsub.SubscriptionConfig{
+        s.sub, err = t.ps.CreateSubscription(ctx, name, pubsub.SubscriptionConfig{
             Topic: t.topic,
         })
         if isErrAlreadyExists(err) {
-            s = t.ps.Subscription(name)
+            s.sub = t.ps.Subscription(name)
         } else if err != nil {
-            return nil, mlog.ErrWithKV(err, kv)
+            return nil, merr.WithKV(err, s.KV())
         }
     } else {
-        s = t.ps.Subscription(name)
-        if exists, err := s.Exists(ctx); err != nil {
-            return nil, mlog.ErrWithKV(err, kv)
+        s.sub = t.ps.Subscription(name)
+        if exists, err := s.sub.Exists(ctx); err != nil {
+            return nil, merr.WithKV(err, s.KV())
         } else if !exists {
-            return nil, mlog.ErrWithKV(errors.New("sub dne"), kv)
+            err := merr.New("sub dne")
+            return nil, merr.WithKV(err, s.KV())
         }
     }
-    return &Subscription{
-        topic: t,
-        sub:   s,
-        name:  name,
-    }, nil
+    return s, nil
 }

 // KV implements the mlog.KVer interface
-func (s *Subscription) KV() mlog.KV {
-    return s.topic.KV().Set("subName", s.name)
+func (s *Subscription) KV() map[string]interface{} {
+    kv := s.topic.KV()
+    kv["subName"] = s.name
+    return kv
 }
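One behavior preserved from the old version but easy to miss: subscription names are prefixed with the topic name (name = t.name + "_" + name), so the KV attached to errors reports the fully-qualified name. Continuing the sketch:

// "worker" is a made-up name; the underlying pubsub subscription ends up
// named "events_worker" because of the prefixing above.
sub, err := topic.Subscription(ctx, "worker", true)
if err != nil {
    // err's KV includes topicName="events" and subName="events_worker"
    log.Fatal(err)
}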
// ConsumerFunc is a function which messages being consumed will be passed. The

@@ -208,7 +226,7 @@ func (s *Subscription) Consume(ctx context.Context, fn ConsumerFunc, opts Consum
         ok, err := fn(context.Context(innerCtx), msg)
         if err != nil {
-            s.topic.ps.log.Warn("error consuming pubsub message", s, mlog.ErrKV(err))
+            s.topic.ps.log.Warn("error consuming pubsub message", s, merr.KV(err))
         }

         if ok {
@ -220,7 +238,8 @@ func (s *Subscription) Consume(ctx context.Context, fn ConsumerFunc, opts Consum
if octx.Err() == context.Canceled || err == nil { if octx.Err() == context.Canceled || err == nil {
return return
} else if err != nil { } else if err != nil {
s.topic.ps.log.Warn("error consuming from pubsub", s, mlog.ErrKV(err)) s.topic.ps.log.Warn("error consuming from pubsub", s, merr.KV(err))
time.Sleep(1 * time.Second)
} }
} }
} }
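Besides swapping mlog.ErrKV for merr.KV, the second hunk adds a one-second sleep before retrying, so a persistently failing Receive no longer spins in a tight loop. For reference, a consumer callback looks roughly like this (the ack-on-true behavior is inferred from the surrounding Consume code and the tests below, not stated in this diff):

// process is a made-up handler; returning true acks the message, while
// returning false (with or without an error) lets it be redelivered.
go sub.Consume(ctx, func(ctx context.Context, msg *Message) (bool, error) {
    if err := process(msg.Data); err != nil {
        return false, err // logged as "error consuming pubsub message"
    }
    return true, nil
}, ConsumerOpts{})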
@@ -312,7 +331,7 @@ func (s *Subscription) BatchConsume(
         }
         ret, err := fn(thisCtx, msgs)
         if err != nil {
-            s.topic.ps.log.Warn("error consuming pubsub batch messages", s, mlog.ErrKV(err))
+            s.topic.ps.log.Warn("error consuming pubsub batch messages", s, merr.KV(err))
         }
         for i := range thisGroup {
             thisGroup[i].retCh <- ret // retCh is buffered
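BatchConsume gets the same logging treatment. As the tests below show, it takes one extra argument over Consume: an optional predicate deciding whether two messages may share a batch (nil allows any grouping). A sketch mirroring the tests' parity-based grouping:

// handleBatch is a made-up handler for a whole batch of messages.
sub.BatchConsume(ctx,
    func(ctx context.Context, msgs []*Message) (bool, error) {
        if err := handleBatch(msgs); err != nil {
            return false, err
        }
        return true, nil
    },
    // group messages whose first bytes share parity, as in the test below
    func(a, b *Message) bool { return a.Data[0]%2 == b.Data[0]%2 },
    ConsumerOpts{Concurrent: 10},
)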


@@ -5,169 +5,169 @@ import (
     . "testing"
     "time"

-    "github.com/mediocregopher/mediocre-go-lib/mcfg"
-    "github.com/mediocregopher/mediocre-go-lib/mdb"
     "github.com/mediocregopher/mediocre-go-lib/mrand"
+    "github.com/mediocregopher/mediocre-go-lib/mtest"
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
 )

-var testPS *PubSub
-
-func init() {
-    mdb.DefaultGCEProject = "test"
-    cfg := mcfg.New()
-    testPS = Cfg(cfg)
-    cfg.StartTestRun()
-}
-
 // this requires the pubsub emulator to be running
 func TestPubSub(t *T) {
-    topicName := "testTopic_" + mrand.Hex(8)
-    ctx := context.Background()
-
-    // Topic shouldn't exist yet
-    _, err := testPS.Topic(ctx, topicName, false)
-    require.Error(t, err)
-
-    // ...so create it
-    topic, err := testPS.Topic(ctx, topicName, true)
-    require.NoError(t, err)
-
-    // Create a subscription and consumer
-    sub, err := topic.Subscription(ctx, "testSub", true)
-    require.NoError(t, err)
-
-    msgCh := make(chan *Message)
-    go sub.Consume(ctx, func(ctx context.Context, m *Message) (bool, error) {
-        msgCh <- m
-        return true, nil
-    }, ConsumerOpts{})
-    time.Sleep(1 * time.Second) // give consumer time to actually start
-
-    // publish a message and make sure it gets consumed
-    assert.NoError(t, topic.Publish(ctx, []byte("foo")))
-    msg := <-msgCh
-    assert.Equal(t, []byte("foo"), msg.Data)
+    ctx := mtest.NewCtx()
+    mtest.SetEnv(ctx, "GCE_PROJECT", "test")
+    ps := MNew(ctx, nil)
+    mtest.Run(ctx, t, func() {
+        topicName := "testTopic_" + mrand.Hex(8)
+        ctx := context.Background()
+
+        // Topic shouldn't exist yet
+        _, err := ps.Topic(ctx, topicName, false)
+        require.Error(t, err)
+
+        // ...so create it
+        topic, err := ps.Topic(ctx, topicName, true)
+        require.NoError(t, err)
+
+        // Create a subscription and consumer
+        sub, err := topic.Subscription(ctx, "testSub", true)
+        require.NoError(t, err)
+
+        msgCh := make(chan *Message)
+        go sub.Consume(ctx, func(ctx context.Context, m *Message) (bool, error) {
+            msgCh <- m
+            return true, nil
+        }, ConsumerOpts{})
+        time.Sleep(1 * time.Second) // give consumer time to actually start
+
+        // publish a message and make sure it gets consumed
+        assert.NoError(t, topic.Publish(ctx, []byte("foo")))
+        msg := <-msgCh
+        assert.Equal(t, []byte("foo"), msg.Data)
+    })
 }

 func TestBatchPubSub(t *T) {
-    ctx := context.Background()
-    topicName := "testBatchTopic_" + mrand.Hex(8)
-    topic, err := testPS.Topic(ctx, topicName, true)
-    require.NoError(t, err)
-
-    readBatch := func(ch chan []*Message) map[byte]int {
-        select {
-        case <-time.After(1 * time.Second):
-            assert.Fail(t, "waited too long to read batch")
-            return nil
-        case mm := <-ch:
-            ret := map[byte]int{}
-            for _, m := range mm {
-                ret[m.Data[0]]++
-            }
-            return ret
-        }
-    }
-
-    // we use the same sub across the next two sections to ensure that cleanup
-    // also works
-    sub, err := topic.Subscription(ctx, "testSub", true)
-    require.NoError(t, err)
-    sub.batchTestTrigger = make(chan bool)
-
-    { // no grouping
-        // Create a subscription and consumer
-        ctx, cancel := context.WithCancel(ctx)
-        ch := make(chan []*Message)
-        go func() {
-            sub.BatchConsume(ctx,
-                func(ctx context.Context, mm []*Message) (bool, error) {
-                    ch <- mm
-                    return true, nil
-                },
-                nil,
-                ConsumerOpts{Concurrent: 5},
-            )
-            close(ch)
-        }()
-        time.Sleep(1 * time.Second) // give consumer time to actually start
-
-        exp := map[byte]int{}
-        for i := byte(0); i <= 9; i++ {
-            require.NoError(t, topic.Publish(ctx, []byte{i}))
-            exp[i] = 1
-        }
-
-        time.Sleep(1 * time.Second)
-        sub.batchTestTrigger <- true
-        gotA := readBatch(ch)
-        assert.Len(t, gotA, 5)
-
-        time.Sleep(1 * time.Second)
-        sub.batchTestTrigger <- true
-        gotB := readBatch(ch)
-        assert.Len(t, gotB, 5)
-
-        for i, c := range gotB {
-            gotA[i] += c
-        }
-        assert.Equal(t, exp, gotA)
-
-        time.Sleep(1 * time.Second) // give time to ack before cancelling
-        cancel()
-        <-ch
-    }
-
-    { // with grouping
-        ctx, cancel := context.WithCancel(ctx)
-        ch := make(chan []*Message)
-        go func() {
-            sub.BatchConsume(ctx,
-                func(ctx context.Context, mm []*Message) (bool, error) {
-                    ch <- mm
-                    return true, nil
-                },
-                func(a, b *Message) bool { return a.Data[0]%2 == b.Data[0]%2 },
-                ConsumerOpts{Concurrent: 10},
-            )
-            close(ch)
-        }()
-        time.Sleep(1 * time.Second) // give consumer time to actually start
-
-        exp := map[byte]int{}
-        for i := byte(0); i <= 9; i++ {
-            require.NoError(t, topic.Publish(ctx, []byte{i}))
-            exp[i] = 1
-        }
-
-        time.Sleep(1 * time.Second)
-        sub.batchTestTrigger <- true
-        gotA := readBatch(ch)
-        assert.Len(t, gotA, 5)
-        gotB := readBatch(ch)
-        assert.Len(t, gotB, 5)
-
-        assertGotGrouped := func(got map[byte]int) {
-            prev := byte(255)
-            for i := range got {
-                if prev != 255 {
-                    assert.Equal(t, prev%2, i%2)
-                }
-                prev = i
-            }
-        }
-        assertGotGrouped(gotA)
-        assertGotGrouped(gotB)
-
-        for i, c := range gotB {
-            gotA[i] += c
-        }
-        assert.Equal(t, exp, gotA)
-
-        time.Sleep(1 * time.Second) // give time to ack before cancelling
-        cancel()
-        <-ch
-    }
+    ctx := mtest.NewCtx()
+    mtest.SetEnv(ctx, "GCE_PROJECT", "test")
+    ps := MNew(ctx, nil)
+    mtest.Run(ctx, t, func() {
+        topicName := "testBatchTopic_" + mrand.Hex(8)
+        topic, err := ps.Topic(ctx, topicName, true)
+        require.NoError(t, err)
+
+        readBatch := func(ch chan []*Message) map[byte]int {
+            select {
+            case <-time.After(1 * time.Second):
+                assert.Fail(t, "waited too long to read batch")
+                return nil
+            case mm := <-ch:
+                ret := map[byte]int{}
+                for _, m := range mm {
+                    ret[m.Data[0]]++
+                }
+                return ret
+            }
+        }
+
+        // we use the same sub across the next two sections to ensure that cleanup
+        // also works
+        sub, err := topic.Subscription(ctx, "testSub", true)
+        require.NoError(t, err)
+        sub.batchTestTrigger = make(chan bool)
+
+        { // no grouping
+            // Create a subscription and consumer
+            ctx, cancel := context.WithCancel(ctx)
+            ch := make(chan []*Message)
+            go func() {
+                sub.BatchConsume(ctx,
+                    func(ctx context.Context, mm []*Message) (bool, error) {
+                        ch <- mm
+                        return true, nil
+                    },
+                    nil,
+                    ConsumerOpts{Concurrent: 5},
+                )
+                close(ch)
+            }()
+            time.Sleep(1 * time.Second) // give consumer time to actually start
+
+            exp := map[byte]int{}
+            for i := byte(0); i <= 9; i++ {
+                require.NoError(t, topic.Publish(ctx, []byte{i}))
+                exp[i] = 1
+            }
+
+            time.Sleep(1 * time.Second)
+            sub.batchTestTrigger <- true
+            gotA := readBatch(ch)
+            assert.Len(t, gotA, 5)
+
+            time.Sleep(1 * time.Second)
+            sub.batchTestTrigger <- true
+            gotB := readBatch(ch)
+            assert.Len(t, gotB, 5)
+
+            for i, c := range gotB {
+                gotA[i] += c
+            }
+            assert.Equal(t, exp, gotA)
+
+            time.Sleep(1 * time.Second) // give time to ack before cancelling
+            cancel()
+            <-ch
+        }
+
+        { // with grouping
+            ctx, cancel := context.WithCancel(ctx)
+            ch := make(chan []*Message)
+            go func() {
+                sub.BatchConsume(ctx,
+                    func(ctx context.Context, mm []*Message) (bool, error) {
+                        ch <- mm
+                        return true, nil
+                    },
+                    func(a, b *Message) bool { return a.Data[0]%2 == b.Data[0]%2 },
+                    ConsumerOpts{Concurrent: 10},
+                )
+                close(ch)
+            }()
+            time.Sleep(1 * time.Second) // give consumer time to actually start
+
+            exp := map[byte]int{}
+            for i := byte(0); i <= 9; i++ {
+                require.NoError(t, topic.Publish(ctx, []byte{i}))
+                exp[i] = 1
+            }
+
+            time.Sleep(1 * time.Second)
+            sub.batchTestTrigger <- true
+            gotA := readBatch(ch)
+            assert.Len(t, gotA, 5)
+            gotB := readBatch(ch)
+            assert.Len(t, gotB, 5)
+
+            assertGotGrouped := func(got map[byte]int) {
+                prev := byte(255)
+                for i := range got {
+                    if prev != 255 {
+                        assert.Equal(t, prev%2, i%2)
+                    }
+                    prev = i
+                }
+            }
+            assertGotGrouped(gotA)
+            assertGotGrouped(gotB)
+
+            for i, c := range gotB {
+                gotA[i] += c
+            }
+            assert.Equal(t, exp, gotA)
+
+            time.Sleep(1 * time.Second) // give time to ack before cancelling
+            cancel()
+            <-ch
+        }
+    })
 }