mediocre-go-lib/mdb/mpubsub/pubsub_test.go
Brian Picciano 4b446a0efc mctx: refactor so that contexts no longer carry mutable data
This change required refactoring nearly every package in this project,
but it does a lot to simplify mctx and make other code using it easier
to think about.

Other code, such as mlog and mcfg, had to be slightly modified for this
change to work as well.
2019-02-07 19:42:12 -05:00

174 lines
4.1 KiB
Go

package mpubsub
import (
"context"
. "testing"
"time"
"github.com/mediocregopher/mediocre-go-lib/mrand"
"github.com/mediocregopher/mediocre-go-lib/mtest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// this requires the pubsub emulator to be running
func TestPubSub(t *T) {
ctx := mtest.NewCtx()
ctx = mtest.SetEnv(ctx, "GCE_PROJECT", "test")
ctx, ps := MNew(ctx, nil)
mtest.Run(ctx, t, func() {
topicName := "testTopic_" + mrand.Hex(8)
ctx := context.Background()
// Topic shouldn't exist yet
_, err := ps.Topic(ctx, topicName, false)
require.Error(t, err)
// ...so create it
topic, err := ps.Topic(ctx, topicName, true)
require.NoError(t, err)
// Create a subscription and consumer
sub, err := topic.Subscription(ctx, "testSub", true)
require.NoError(t, err)
msgCh := make(chan *Message)
go sub.Consume(ctx, func(ctx context.Context, m *Message) (bool, error) {
msgCh <- m
return true, nil
}, ConsumerOpts{})
time.Sleep(1 * time.Second) // give consumer time to actually start
// publish a message and make sure it gets consumed
assert.NoError(t, topic.Publish(ctx, []byte("foo")))
msg := <-msgCh
assert.Equal(t, []byte("foo"), msg.Data)
})
}
func TestBatchPubSub(t *T) {
ctx := mtest.NewCtx()
ctx = mtest.SetEnv(ctx, "GCE_PROJECT", "test")
ctx, ps := MNew(ctx, nil)
mtest.Run(ctx, t, func() {
topicName := "testBatchTopic_" + mrand.Hex(8)
topic, err := ps.Topic(ctx, topicName, true)
require.NoError(t, err)
readBatch := func(ch chan []*Message) map[byte]int {
select {
case <-time.After(1 * time.Second):
assert.Fail(t, "waited too long to read batch")
return nil
case mm := <-ch:
ret := map[byte]int{}
for _, m := range mm {
ret[m.Data[0]]++
}
return ret
}
}
// we use the same sub across the next two sections to ensure that cleanup
// also works
sub, err := topic.Subscription(ctx, "testSub", true)
require.NoError(t, err)
sub.batchTestTrigger = make(chan bool)
{ // no grouping
// Create a subscription and consumer
ctx, cancel := context.WithCancel(ctx)
ch := make(chan []*Message)
go func() {
sub.BatchConsume(ctx,
func(ctx context.Context, mm []*Message) (bool, error) {
ch <- mm
return true, nil
},
nil,
ConsumerOpts{Concurrent: 5},
)
close(ch)
}()
time.Sleep(1 * time.Second) // give consumer time to actually start
exp := map[byte]int{}
for i := byte(0); i <= 9; i++ {
require.NoError(t, topic.Publish(ctx, []byte{i}))
exp[i] = 1
}
time.Sleep(1 * time.Second)
sub.batchTestTrigger <- true
gotA := readBatch(ch)
assert.Len(t, gotA, 5)
time.Sleep(1 * time.Second)
sub.batchTestTrigger <- true
gotB := readBatch(ch)
assert.Len(t, gotB, 5)
for i, c := range gotB {
gotA[i] += c
}
assert.Equal(t, exp, gotA)
time.Sleep(1 * time.Second) // give time to ack before cancelling
cancel()
<-ch
}
{ // with grouping
ctx, cancel := context.WithCancel(ctx)
ch := make(chan []*Message)
go func() {
sub.BatchConsume(ctx,
func(ctx context.Context, mm []*Message) (bool, error) {
ch <- mm
return true, nil
},
func(a, b *Message) bool { return a.Data[0]%2 == b.Data[0]%2 },
ConsumerOpts{Concurrent: 10},
)
close(ch)
}()
time.Sleep(1 * time.Second) // give consumer time to actually start
exp := map[byte]int{}
for i := byte(0); i <= 9; i++ {
require.NoError(t, topic.Publish(ctx, []byte{i}))
exp[i] = 1
}
time.Sleep(1 * time.Second)
sub.batchTestTrigger <- true
gotA := readBatch(ch)
assert.Len(t, gotA, 5)
gotB := readBatch(ch)
assert.Len(t, gotB, 5)
assertGotGrouped := func(got map[byte]int) {
prev := byte(255)
for i := range got {
if prev != 255 {
assert.Equal(t, prev%2, i%2)
}
prev = i
}
}
assertGotGrouped(gotA)
assertGotGrouped(gotB)
for i, c := range gotB {
gotA[i] += c
}
assert.Equal(t, exp, gotA)
time.Sleep(1 * time.Second) // give time to ack before cancelling
cancel()
<-ch
}
})
}