implement mdb.PubSub
parent 0feba17791
commit d3259d45f8

env.test (new normal file, 5 lines)
@@ -0,0 +1,5 @@
if [ "$(ps aux | grep '[p]ubsub-emulator')" = "" ]; then
|
||||
echo "starting pubsub emulator"
|
||||
yes | gcloud beta emulators pubsub start >/dev/null 2>&1 &
|
||||
fi
|
||||
$(gcloud beta emulators pubsub env-init)
|
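
This script only starts the emulator; the env-init line exports PUBSUB_EMULATOR_HOST, which the cloud.google.com/go/pubsub client picks up automatically. As a minimal sketch (not part of this commit), a test helper could verify that the variable made it into the environment before the tests run:

package main

import (
    "fmt"
    "os"
)

func main() {
    // PUBSUB_EMULATOR_HOST is the variable `gcloud beta emulators pubsub
    // env-init` exports; the Go pubsub client connects to it when set.
    if host := os.Getenv("PUBSUB_EMULATOR_HOST"); host == "" {
        fmt.Println("emulator env not set; tests would hit real pubsub")
    } else {
        fmt.Println("pubsub emulator at", host)
    }
}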
mdb/mdb.go (new normal file, 3 lines)
@@ -0,0 +1,3 @@
// Package mdb contains a number of database wrappers for databases I commonly
// use.
package mdb
mdb/ps.go (new normal file, 380 lines)
@@ -0,0 +1,380 @@
package mdb

import (
    "context"
    "errors"
    "sync"
    "time"

    "cloud.google.com/go/pubsub"
    "github.com/mediocregopher/mediocre-go-lib/mcfg"
    "github.com/mediocregopher/mediocre-go-lib/mlog"
    oldctx "golang.org/x/net/context"
    "google.golang.org/api/option"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

func isErrAlreadyExists(err error) bool {
    if err == nil {
        return false
    }
    s, ok := status.FromError(err)
    return ok && s.Code() == codes.AlreadyExists
}

// Message aliases the type in the official driver.
type Message = pubsub.Message

// PubSub holds onto the info needed to communicate with PubSub and wraps a
// *pubsub.Client.
type PubSub struct {
    proj, credFile string
    *pubsub.Client
}
// NewPubSub initializes and returns a PubSub instance for the given projectID,
// using the credentials file (if given).
func NewPubSub(ctx context.Context, projectID, credFile string) (*PubSub, error) {
    ps := &PubSub{
        proj:     projectID,
        credFile: credFile,
    }
    var err error
    ps.Client, err = pubsub.NewClient(ctx, ps.proj, ps.clientOpts()...)
    return ps, err
}

// CfgPubSub configures and returns a PubSub instance which will be usable once
// Run is called on the passed-in Cfg instance.
//
// defaultProject is optional and can be given to indicate the default gce
// project id for the configuration parameter which is created.
func CfgPubSub(cfg *mcfg.Cfg, defaultProject string) *PubSub {
    cfg = cfg.Child("pubsub")
    proj := cfg.ParamString("project", defaultProject, "name of the project in gce")
    credFile := cfg.ParamString("cred-file", "", "path to pubsub credentials json file, if any")

    var ps PubSub
    cfg.Start.Then(func(ctx context.Context) error {
        ps.proj = *proj
        ps.credFile = *credFile
        mlog.Info("connecting to pubsub", &ps)
        psInner, err := NewPubSub(ctx, *proj, *credFile)
        if err != nil {
            return mlog.ErrWithKV(err, &ps)
        }
        ps = *psInner
        return nil
    })
    return &ps
}
func (ps *PubSub) clientOpts() []option.ClientOption {
    var opts []option.ClientOption
    //opts := []option.ClientOption{
    //    option.WithGRPCConnectionPool(ps.NumConns),
    //}
    if ps.credFile != "" {
        opts = append(opts, option.WithCredentialsFile(ps.credFile))
    }
    return opts
}

// KV implements the mlog.KVer interface.
func (ps *PubSub) KV() mlog.KV {
    return mlog.KV{
        "psProj": ps.proj,
    }
}
// Topic provides methods around a particular topic in PubSub.
type Topic struct {
    ps    *PubSub
    topic *pubsub.Topic
    name  string
}

// Topic returns a Topic instance for the topic of the given name, creating the
// topic first if create is set.
func (ps *PubSub) Topic(ctx context.Context, name string, create bool) (*Topic, error) {
    var t *pubsub.Topic
    var err error
    if create {
        t, err = ps.Client.CreateTopic(ctx, name)
        if isErrAlreadyExists(err) {
            t = ps.Client.Topic(name)
        } else if err != nil {
            return nil, err
        }
    } else {
        t = ps.Client.Topic(name)
        if exists, err := t.Exists(ctx); err != nil {
            return nil, err
        } else if !exists {
            return nil, mlog.ErrWithKV(errors.New("topic does not exist"), ps.KV().Set("topicName", name))
        }
    }
    return &Topic{
        ps:    ps,
        topic: t,
        name:  name,
    }, nil
}

// KV implements the mlog.KVer interface.
func (t *Topic) KV() mlog.KV {
    kv := t.ps.KV()
    kv["topicName"] = t.name
    return kv
}

// Publish publishes a message with the given data as its body to the Topic.
func (t *Topic) Publish(ctx context.Context, data []byte) error {
    _, err := t.topic.Publish(ctx, &Message{Data: data}).Get(ctx)
    if err != nil {
        return mlog.ErrWithKV(err, t)
    }
    return nil
}
// Subscription provides methods around a subscription to a topic in PubSub.
type Subscription struct {
    topic *Topic
    sub   *pubsub.Subscription
    name  string

    // only used in tests to trigger batch processing
    batchTestTrigger chan bool
}

// Subscription returns a Subscription instance for the subscription of the
// given name on the Topic, creating the subscription first if create is set.
func (t *Topic) Subscription(ctx context.Context, name string, create bool) (*Subscription, error) {
    name = t.name + "_" + name
    var s *pubsub.Subscription
    var err error
    if create {
        s, err = t.ps.CreateSubscription(ctx, name, pubsub.SubscriptionConfig{
            Topic: t.topic,
        })
        if isErrAlreadyExists(err) {
            s = t.ps.Subscription(name)
        } else if err != nil {
            return nil, err
        }
    } else {
        s = t.ps.Subscription(name)
        if exists, err := s.Exists(ctx); err != nil {
            return nil, err
        } else if !exists {
            return nil, mlog.ErrWithKV(errors.New("subscription does not exist"), t.KV().Set("subName", name))
        }
    }
    return &Subscription{
        topic: t,
        sub:   s,
        name:  name,
    }, nil
}

// KV implements the mlog.KVer interface.
func (s *Subscription) KV() mlog.KV {
    kv := s.topic.KV()
    kv["subName"] = s.name
    return kv
}

// ConsumerFunc is the function type which consumed messages are passed to. The
// returned boolean and returned error are independent: if the bool is false
// the message will be returned to the queue for retrying later, and if an
// error is returned it will be logged.
//
// The Context will be canceled once the deadline has been reached (as set when
// Consume is called).
type ConsumerFunc func(context.Context, *Message) (bool, error)
// ConsumerOpts are options which affect the behavior of a Consume method call.
type ConsumerOpts struct {
    // Default 30s. The timeout each message has to complete before its context
    // is canceled and the server re-publishes it.
    Timeout time.Duration

    // Default 1. Number of concurrent messages to consume at a time.
    Concurrent int

    // TODO DisableBatchAutoTrigger
    // Currently there is no auto-trigger behavior, batches only get processed
    // on a dumb ticker. This is necessary for the way I plan to have the
    // datastore writing, but it's not the expected behavior of a batch getting
    // triggered every time <Concurrent> messages come in.
}

func (co ConsumerOpts) withDefaults() ConsumerOpts {
    if co.Timeout == 0 {
        co.Timeout = 30 * time.Second
    }
    if co.Concurrent == 0 {
        co.Concurrent = 1
    }
    return co
}
// Consume uses the given ConsumerFunc and ConsumerOpts to process messages off
// the Subscription.
func (s *Subscription) Consume(ctx context.Context, fn ConsumerFunc, opts ConsumerOpts) {
    opts = opts.withDefaults()
    s.sub.ReceiveSettings.MaxExtension = opts.Timeout
    s.sub.ReceiveSettings.MaxOutstandingMessages = opts.Concurrent

    octx := oldctx.Context(ctx)
    for {
        err := s.sub.Receive(octx, func(octx oldctx.Context, msg *Message) {
            innerCtx, cancel := oldctx.WithTimeout(octx, opts.Timeout)
            defer cancel()

            ok, err := fn(context.Context(innerCtx), msg)
            if err != nil {
                mlog.Warn("error consuming pubsub message", s, mlog.ErrKV(err))
            }

            if ok {
                msg.Ack()
            } else {
                msg.Nack()
            }
        })
        if octx.Err() == context.Canceled || err == nil {
            return
        }
        mlog.Warn("error consuming from pubsub", s, mlog.ErrKV(err))
    }
}
// BatchConsumerFunc is similar to ConsumerFunc, except it takes in a batch of
// multiple messages at once. The boolean returned will apply to every message
// in the batch.
type BatchConsumerFunc func(context.Context, []*Message) (bool, error)

// BatchGroupFunc is an optional param to BatchConsume which allows for grouping
// messages into separate groups. Each message received is attempted to be
// placed in a group. Grouping is done by calling this function with the
// received message and a random message from a group, and if this function
// returns true then the received message is placed into that group. If this
// returns false for all groups then a new group is created.
//
// This function should be a pure function.
type BatchGroupFunc func(a, b *Message) bool
// BatchConsume is like Consume, except it groups incoming messages together,
// allowing them to be processed in batches instead of individually.
//
// BatchConsume first collects messages internally for half the
// ConsumerOpts.Timeout value. Once that time has passed it will group all
// messages based on the BatchGroupFunc (if nil then all collected messages form
// one big group). The BatchConsumerFunc is called for each group, with the
// context passed in having a timeout of ConsumerOpts.Timeout/2.
//
// The ConsumerOpts.Concurrent value determines the maximum number of messages
// collected during the first section of the process (before BatchConsumerFunc
// is called).
func (s *Subscription) BatchConsume(
    ctx context.Context,
    fn BatchConsumerFunc, gfn BatchGroupFunc,
    opts ConsumerOpts,
) {
    opts = opts.withDefaults()

    type promise struct {
        msg   *Message
        retCh chan bool // must be buffered by one
    }

    var groups [][]promise
    var groupsL sync.Mutex

    groupProm := func(prom promise) {
        groupsL.Lock()
        defer groupsL.Unlock()
        for i := range groups {
            if gfn == nil || gfn(groups[i][0].msg, prom.msg) {
                groups[i] = append(groups[i], prom)
                return
            }
        }
        groups = append(groups, []promise{prom})
    }

    wg := new(sync.WaitGroup)
    defer wg.Wait()

    processGroups := func() {
        groupsL.Lock()
        thisGroups := groups
        groups = nil
        groupsL.Unlock()

        // we do a waitgroup chain so as to properly handle the cancel
        // function. We hold wg (by adding one) until all routines spawned
        // here have finished, and once they have we release wg and cancel
        thisCtx, cancel := context.WithTimeout(ctx, opts.Timeout/2)
        thisWG := new(sync.WaitGroup)
        thisWG.Add(1)
        wg.Add(1)
        go func() {
            thisWG.Wait()
            cancel()
            wg.Done()
        }()

        for i := range thisGroups {
            thisGroup := thisGroups[i]
            thisWG.Add(1)
            go func() {
                defer thisWG.Done()
                msgs := make([]*Message, len(thisGroup))
                for i := range thisGroup {
                    msgs[i] = thisGroup[i].msg
                }
                ret, err := fn(thisCtx, msgs)
                if err != nil {
                    mlog.Warn("error consuming pubsub batch messages", s, mlog.ErrKV(err))
                }
                for i := range thisGroup {
                    thisGroup[i].retCh <- ret // retCh is buffered
                }
            }()
        }
        thisWG.Done()
    }

    wg.Add(1)
    go func() {
        defer wg.Done()
        tick := time.NewTicker(opts.Timeout / 2)
        defer tick.Stop()
        for {
            select {
            case <-tick.C:
                processGroups()
            case <-s.batchTestTrigger:
                processGroups()
            case <-ctx.Done():
                return
            }
        }
    }()

    s.Consume(ctx, func(ctx context.Context, msg *Message) (bool, error) {
        retCh := make(chan bool, 1)
        groupProm(promise{msg: msg, retCh: retCh})
        select {
        case ret := <-retCh:
            return ret, nil
        case <-ctx.Done():
            return false, errors.New("reading from batch grouping process timed out")
        }
    }, opts)
}
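
Taken together, a minimal end-to-end sketch of the API this file adds. The import path is an assumption (the package is referred to only as mdb in this commit), and it presumes either the emulator from env.test or real credentials are available:

package main

import (
    "context"
    "log"

    "github.com/mediocregopher/mediocre-go-lib/mdb" // assumed import path
)

func main() {
    ctx := context.Background()

    // An empty credFile falls back to default credentials (or the emulator,
    // when PUBSUB_EMULATOR_HOST is set).
    ps, err := mdb.NewPubSub(ctx, "test", "")
    if err != nil {
        log.Fatal(err)
    }

    // true means create-if-missing, mirroring the tests below.
    topic, err := ps.Topic(ctx, "myTopic", true)
    if err != nil {
        log.Fatal(err)
    }
    sub, err := topic.Subscription(ctx, "mySub", true)
    if err != nil {
        log.Fatal(err)
    }

    if err := topic.Publish(ctx, []byte("hello")); err != nil {
        log.Fatal(err)
    }

    // Consume blocks until ctx is canceled; returning true acks the message.
    sub.Consume(ctx, func(ctx context.Context, m *mdb.Message) (bool, error) {
        log.Printf("got %q", m.Data)
        return true, nil
    }, mdb.ConsumerOpts{})
}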
mdb/ps_test.go (new normal file, 171 lines)
@@ -0,0 +1,171 @@
package mdb

import (
    "context"
    . "testing"
    "time"

    "github.com/mediocregopher/mediocre-go-lib/mcfg"
    "github.com/mediocregopher/mediocre-go-lib/mtest"
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
)

var testPS *PubSub

func init() {
    cfg := mcfg.New()
    testPS = CfgPubSub(cfg, "test")
    cfg.TestRun()
}
// this requires the pubsub emulator to be running
func TestPubSub(t *T) {
    topicName := "testTopic_" + mtest.RandHex(8)
    ctx := context.Background()

    // Topic shouldn't exist yet
    _, err := testPS.Topic(ctx, topicName, false)
    require.Error(t, err)

    // ...so create it
    topic, err := testPS.Topic(ctx, topicName, true)
    require.NoError(t, err)

    // Create a subscription and consumer
    sub, err := topic.Subscription(ctx, "testSub", true)
    require.NoError(t, err)

    msgCh := make(chan *Message)
    go sub.Consume(ctx, func(ctx context.Context, m *Message) (bool, error) {
        msgCh <- m
        return true, nil
    }, ConsumerOpts{})
    time.Sleep(1 * time.Second) // give consumer time to actually start

    // publish a message and make sure it gets consumed
    assert.NoError(t, topic.Publish(ctx, []byte("foo")))
    msg := <-msgCh
    assert.Equal(t, []byte("foo"), msg.Data)
}
func TestBatchPubSub(t *T) {
    ctx := context.Background()
    topicName := "testBatchTopic_" + mtest.RandHex(8)
    topic, err := testPS.Topic(ctx, topicName, true)
    require.NoError(t, err)

    readBatch := func(ch chan []*Message) map[byte]int {
        select {
        case <-time.After(1 * time.Second):
            assert.Fail(t, "waited too long to read batch")
            return nil
        case mm := <-ch:
            ret := map[byte]int{}
            for _, m := range mm {
                ret[m.Data[0]]++
            }
            return ret
        }
    }

    // we use the same sub across the next two sections to ensure that cleanup
    // also works
    sub, err := topic.Subscription(ctx, "testSub", true)
    require.NoError(t, err)
    sub.batchTestTrigger = make(chan bool)

    { // no grouping
        // Create a subscription and consumer
        ctx, cancel := context.WithCancel(ctx)
        ch := make(chan []*Message)
        go func() {
            sub.BatchConsume(ctx,
                func(ctx context.Context, mm []*Message) (bool, error) {
                    ch <- mm
                    return true, nil
                },
                nil,
                ConsumerOpts{Concurrent: 5},
            )
            close(ch)
        }()
        time.Sleep(1 * time.Second) // give consumer time to actually start

        exp := map[byte]int{}
        for i := byte(0); i <= 9; i++ {
            require.NoError(t, topic.Publish(ctx, []byte{i}))
            exp[i] = 1
        }

        time.Sleep(1 * time.Second)
        sub.batchTestTrigger <- true
        gotA := readBatch(ch)
        assert.Len(t, gotA, 5)

        time.Sleep(1 * time.Second)
        sub.batchTestTrigger <- true
        gotB := readBatch(ch)
        assert.Len(t, gotB, 5)

        for i, c := range gotB {
            gotA[i] += c
        }
        assert.Equal(t, exp, gotA)

        time.Sleep(1 * time.Second) // give time to ack before cancelling
        cancel()
        <-ch
    }

    { // with grouping
        ctx, cancel := context.WithCancel(ctx)
        ch := make(chan []*Message)
        go func() {
            sub.BatchConsume(ctx,
                func(ctx context.Context, mm []*Message) (bool, error) {
                    ch <- mm
                    return true, nil
                },
                func(a, b *Message) bool { return a.Data[0]%2 == b.Data[0]%2 },
                ConsumerOpts{Concurrent: 10},
            )
            close(ch)
        }()
        time.Sleep(1 * time.Second) // give consumer time to actually start

        exp := map[byte]int{}
        for i := byte(0); i <= 9; i++ {
            require.NoError(t, topic.Publish(ctx, []byte{i}))
            exp[i] = 1
        }

        time.Sleep(1 * time.Second)
        sub.batchTestTrigger <- true
        gotA := readBatch(ch)
        assert.Len(t, gotA, 5)
        gotB := readBatch(ch)
        assert.Len(t, gotB, 5)

        assertGotGrouped := func(got map[byte]int) {
            prev := byte(255)
            for i := range got {
                if prev != 255 {
                    assert.Equal(t, prev%2, i%2)
                }
                prev = i
            }
        }

        assertGotGrouped(gotA)
        assertGotGrouped(gotB)
        for i, c := range gotB {
            gotA[i] += c
        }
        assert.Equal(t, exp, gotA)

        time.Sleep(1 * time.Second) // give time to ack before cancelling
        cancel()
        <-ch
    }
}
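
The tests above drive BatchConsume through the test-only trigger channel; outside of tests the Timeout/2 ticker does the triggering. As a hedged sketch of application-side usage (import path assumed as above, and the "first byte encodes a message kind" convention is purely hypothetical), a grouping function batches related messages for a single write:

package example

import (
    "context"
    "log"

    "github.com/mediocregopher/mediocre-go-lib/mdb" // assumed import path
)

// batchByKind consumes messages in batches, grouped by their first byte,
// which this sketch (hypothetically) treats as a message kind.
func batchByKind(ctx context.Context, sub *mdb.Subscription) {
    sub.BatchConsume(ctx,
        func(ctx context.Context, mm []*mdb.Message) (bool, error) {
            // Every message in mm landed in the same group; the returned
            // bool acks (true) or nacks (false) all of them at once.
            log.Printf("flushing %d messages of kind %d", len(mm), mm[0].Data[0])
            return true, nil
        },
        // Pure grouping function: two messages share a group iff their
        // first bytes match.
        func(a, b *mdb.Message) bool { return a.Data[0] == b.Data[0] },
        mdb.ConsumerOpts{Concurrent: 20},
    )
}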