mediocre-go-lib/mdb/ps_test.go

172 lines
3.8 KiB
Go
Raw Permalink Normal View History

2018-02-15 22:47:18 +00:00
package mdb
import (
"context"
. "testing"
"time"
"github.com/mediocregopher/mediocre-go-lib/mcfg"
"github.com/mediocregopher/mediocre-go-lib/mtest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
var testPS *PubSub
func init() {
cfg := mcfg.New()
testPS = CfgPubSub(cfg, "test")
cfg.TestRun()
}
// this requires the pubsub emulator to be running
func TestPubSub(t *T) {
topicName := "testTopic_" + mtest.RandHex(8)
ctx := context.Background()
// Topic shouldn't exist yet
_, err := testPS.Topic(ctx, topicName, false)
require.Error(t, err)
// ...so create it
topic, err := testPS.Topic(ctx, topicName, true)
require.NoError(t, err)
// Create a subscription and consumer
sub, err := topic.Subscription(ctx, "testSub", true)
require.NoError(t, err)
msgCh := make(chan *Message)
go sub.Consume(ctx, func(ctx context.Context, m *Message) (bool, error) {
msgCh <- m
return true, nil
}, ConsumerOpts{})
time.Sleep(1 * time.Second) // give consumer time to actually start
// publish a message and make sure it gets consumed
assert.NoError(t, topic.Publish(ctx, []byte("foo")))
msg := <-msgCh
assert.Equal(t, []byte("foo"), msg.Data)
}
func TestBatchPubSub(t *T) {
ctx := context.Background()
topicName := "testBatchTopic_" + mtest.RandHex(8)
topic, err := testPS.Topic(ctx, topicName, true)
require.NoError(t, err)
readBatch := func(ch chan []*Message) map[byte]int {
select {
case <-time.After(1 * time.Second):
assert.Fail(t, "waited too long to read batch")
return nil
case mm := <-ch:
ret := map[byte]int{}
for _, m := range mm {
ret[m.Data[0]]++
}
return ret
}
}
// we use the same sub across the next two sections to ensure that cleanup
// also works
sub, err := topic.Subscription(ctx, "testSub", true)
require.NoError(t, err)
sub.batchTestTrigger = make(chan bool)
{ // no grouping
// Create a subscription and consumer
ctx, cancel := context.WithCancel(ctx)
ch := make(chan []*Message)
go func() {
sub.BatchConsume(ctx,
func(ctx context.Context, mm []*Message) (bool, error) {
ch <- mm
return true, nil
},
nil,
ConsumerOpts{Concurrent: 5},
)
close(ch)
}()
time.Sleep(1 * time.Second) // give consumer time to actually start
exp := map[byte]int{}
for i := byte(0); i <= 9; i++ {
require.NoError(t, topic.Publish(ctx, []byte{i}))
exp[i] = 1
}
time.Sleep(1 * time.Second)
sub.batchTestTrigger <- true
gotA := readBatch(ch)
assert.Len(t, gotA, 5)
time.Sleep(1 * time.Second)
sub.batchTestTrigger <- true
gotB := readBatch(ch)
assert.Len(t, gotB, 5)
for i, c := range gotB {
gotA[i] += c
}
assert.Equal(t, exp, gotA)
time.Sleep(1 * time.Second) // give time to ack before cancelling
cancel()
<-ch
}
{ // with grouping
ctx, cancel := context.WithCancel(ctx)
ch := make(chan []*Message)
go func() {
sub.BatchConsume(ctx,
func(ctx context.Context, mm []*Message) (bool, error) {
ch <- mm
return true, nil
},
func(a, b *Message) bool { return a.Data[0]%2 == b.Data[0]%2 },
ConsumerOpts{Concurrent: 10},
)
close(ch)
}()
time.Sleep(1 * time.Second) // give consumer time to actually start
exp := map[byte]int{}
for i := byte(0); i <= 9; i++ {
require.NoError(t, topic.Publish(ctx, []byte{i}))
exp[i] = 1
}
time.Sleep(1 * time.Second)
sub.batchTestTrigger <- true
gotA := readBatch(ch)
assert.Len(t, gotA, 5)
gotB := readBatch(ch)
assert.Len(t, gotB, 5)
assertGotGrouped := func(got map[byte]int) {
prev := byte(255)
for i := range got {
if prev != 255 {
assert.Equal(t, prev%2, i%2)
}
prev = i
}
}
assertGotGrouped(gotA)
assertGotGrouped(gotB)
for i, c := range gotB {
gotA[i] += c
}
assert.Equal(t, exp, gotA)
time.Sleep(1 * time.Second) // give time to ack before cancelling
cancel()
<-ch
}
}