mediocre-go-lib/mdb/mpubsub/pubsub_test.go

175 lines
4.1 KiB
Go
Raw Normal View History

package mpubsub
2018-02-15 22:47:18 +00:00
import (
"context"
. "testing"
"time"
"github.com/mediocregopher/mediocre-go-lib/mrand"
"github.com/mediocregopher/mediocre-go-lib/mtest"
2018-02-15 22:47:18 +00:00
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// this requires the pubsub emulator to be running
func TestPubSub(t *T) {
2019-06-23 18:56:03 +00:00
cmp := mtest.Component()
mtest.Env(cmp, "PUBSUB_GCE_PROJECT", "test")
ps := InstPubSub(cmp)
mtest.Run(cmp, t, func() {
topicName := "testTopic_" + mrand.Hex(8)
ctx := context.Background()
2018-02-15 22:47:18 +00:00
// Topic shouldn't exist yet
_, err := ps.Topic(ctx, topicName, false)
require.Error(t, err)
2018-02-15 22:47:18 +00:00
// ...so create it
topic, err := ps.Topic(ctx, topicName, true)
require.NoError(t, err)
2018-02-15 22:47:18 +00:00
// Create a subscription and consumer
sub, err := topic.Subscription(ctx, "testSub", true)
require.NoError(t, err)
msgCh := make(chan *Message)
go sub.Consume(ctx, func(ctx context.Context, m *Message) (bool, error) {
msgCh <- m
return true, nil
}, ConsumerOpts{})
2018-02-15 22:47:18 +00:00
time.Sleep(1 * time.Second) // give consumer time to actually start
// publish a message and make sure it gets consumed
assert.NoError(t, topic.Publish(ctx, []byte("foo")))
msg := <-msgCh
assert.Equal(t, []byte("foo"), msg.Data)
})
}
func TestBatchPubSub(t *T) {
2019-06-23 18:56:03 +00:00
cmp := mtest.Component()
mtest.Env(cmp, "PUBSUB_GCE_PROJECT", "test")
ps := InstPubSub(cmp)
mtest.Run(cmp, t, func() {
topicName := "testBatchTopic_" + mrand.Hex(8)
2019-06-23 18:56:03 +00:00
ctx := context.Background()
topic, err := ps.Topic(ctx, topicName, true)
require.NoError(t, err)
readBatch := func(ch chan []*Message) map[byte]int {
select {
case <-time.After(1 * time.Second):
assert.Fail(t, "waited too long to read batch")
return nil
case mm := <-ch:
ret := map[byte]int{}
for _, m := range mm {
ret[m.Data[0]]++
}
return ret
}
2018-02-15 22:47:18 +00:00
}
// we use the same sub across the next two sections to ensure that cleanup
// also works
sub, err := topic.Subscription(ctx, "testSub", true)
require.NoError(t, err)
sub.batchTestTrigger = make(chan bool)
{ // no grouping
// Create a subscription and consumer
ctx, cancel := context.WithCancel(ctx)
ch := make(chan []*Message)
go func() {
sub.BatchConsume(ctx,
func(ctx context.Context, mm []*Message) (bool, error) {
ch <- mm
return true, nil
},
nil,
ConsumerOpts{Concurrent: 5},
)
close(ch)
}()
time.Sleep(1 * time.Second) // give consumer time to actually start
exp := map[byte]int{}
for i := byte(0); i <= 9; i++ {
require.NoError(t, topic.Publish(ctx, []byte{i}))
exp[i] = 1
}
2018-02-15 22:47:18 +00:00
time.Sleep(1 * time.Second)
sub.batchTestTrigger <- true
gotA := readBatch(ch)
assert.Len(t, gotA, 5)
2018-02-15 22:47:18 +00:00
time.Sleep(1 * time.Second)
sub.batchTestTrigger <- true
gotB := readBatch(ch)
assert.Len(t, gotB, 5)
2018-02-15 22:47:18 +00:00
for i, c := range gotB {
gotA[i] += c
}
assert.Equal(t, exp, gotA)
time.Sleep(1 * time.Second) // give time to ack before cancelling
cancel()
<-ch
2018-02-15 22:47:18 +00:00
}
{ // with grouping
ctx, cancel := context.WithCancel(ctx)
ch := make(chan []*Message)
go func() {
sub.BatchConsume(ctx,
func(ctx context.Context, mm []*Message) (bool, error) {
ch <- mm
return true, nil
},
func(a, b *Message) bool { return a.Data[0]%2 == b.Data[0]%2 },
ConsumerOpts{Concurrent: 10},
)
close(ch)
}()
time.Sleep(1 * time.Second) // give consumer time to actually start
exp := map[byte]int{}
for i := byte(0); i <= 9; i++ {
require.NoError(t, topic.Publish(ctx, []byte{i}))
exp[i] = 1
}
time.Sleep(1 * time.Second)
sub.batchTestTrigger <- true
gotA := readBatch(ch)
assert.Len(t, gotA, 5)
gotB := readBatch(ch)
assert.Len(t, gotB, 5)
assertGotGrouped := func(got map[byte]int) {
prev := byte(255)
for i := range got {
if prev != 255 {
assert.Equal(t, prev%2, i%2)
}
prev = i
2018-02-15 22:47:18 +00:00
}
}
assertGotGrouped(gotA)
assertGotGrouped(gotB)
for i, c := range gotB {
gotA[i] += c
}
assert.Equal(t, exp, gotA)
2018-02-15 22:47:18 +00:00
time.Sleep(1 * time.Second) // give time to ack before cancelling
cancel()
<-ch
}
})
2018-02-15 22:47:18 +00:00
}