2018-02-15 22:47:18 +00:00

172 lines
3.8 KiB

package mdb
import (
. "testing"
// testPS is the PubSub instance shared by every test in this file; it is
// assigned once in init.
var testPS *PubSub
// init builds testPS from a freshly created mcfg config, passing "test" as
// the second argument to CfgPubSub.
// NOTE(review): the closing brace (and possibly further statements that use
// cfg) are not visible in this copy of the file — confirm against version
// control.
func init() {
cfg := mcfg.New()
testPS = CfgPubSub(cfg, "test")
// this requires the pubsub emulator to be running
func TestPubSub(t *T) {
topicName := "testTopic_" + mtest.RandHex(8)
ctx := context.Background()
// Topic shouldn't exist yet
_, err := testPS.Topic(ctx, topicName, false)
require.Error(t, err)
// create it
topic, err := testPS.Topic(ctx, topicName, true)
require.NoError(t, err)
// Create a subscription and consumer
sub, err := topic.Subscription(ctx, "testSub", true)
require.NoError(t, err)
msgCh := make(chan *Message)
go sub.Consume(ctx, func(ctx context.Context, m *Message) (bool, error) {
msgCh <- m
return true, nil
}, ConsumerOpts{})
time.Sleep(1 * time.Second) // give consumer time to actually start
// publish a message and make sure it gets consumed
assert.NoError(t, topic.Publish(ctx, []byte("foo")))
msg := <-msgCh
assert.Equal(t, []byte("foo"), msg.Data)
// TestBatchPubSub publishes ten single-byte messages (values 0 through 9) to a
// freshly created, randomly named topic and checks the batches that arrive on
// the consumer channel. It does this twice against the same subscription:
// once without a grouping function and once with one.
//
// NOTE(review): this copy of the function is incomplete — closing braces, the
// call that starts the batch consumer inside each goroutine, and some loop
// bodies are not visible. Restore from version control before editing logic.
func TestBatchPubSub(t *T) {
ctx := context.Background()
topicName := "testBatchTopic_" + mtest.RandHex(8)
topic, err := testPS.Topic(ctx, topicName, true)
require.NoError(t, err)
// readBatch waits up to one second for a batch on ch. On timeout it records
// a test failure and returns nil; otherwise it returns a map keyed by byte
// built from the received messages.
readBatch := func(ch chan []*Message) map[byte]int {
select {
case <-time.After(1 * time.Second):
assert.Fail(t, "waited too long to read batch")
return nil
case mm := <-ch:
ret := map[byte]int{}
// NOTE(review): the loop body is missing here — presumably it counts each
// message under its first data byte; confirm.
for _, m := range mm {
return ret
// we use the same sub across the next two sections to ensure that cleanup
// also works
sub, err := topic.Subscription(ctx, "testSub", true)
require.NoError(t, err)
// Test-only hook: each section below sends on this channel before reading a
// batch. Presumably this forces the consumer to flush a batch — TODO confirm.
sub.batchTestTrigger = make(chan bool)
{ // no grouping
// Create a subscription and consumer
// ctx is shadowed by a cancellable child for this section only.
ctx, cancel := context.WithCancel(ctx)
ch := make(chan []*Message)
go func() {
// Handler: forwards every received batch to ch and returns (true, nil).
func(ctx context.Context, mm []*Message) (bool, error) {
ch <- mm
return true, nil
ConsumerOpts{Concurrent: 5},
time.Sleep(1 * time.Second) // give consumer time to actually start
// exp records that each published byte is expected exactly once overall.
exp := map[byte]int{}
for i := byte(0); i <= 9; i++ {
require.NoError(t, topic.Publish(ctx, []byte{i}))
exp[i] = 1
time.Sleep(1 * time.Second)
sub.batchTestTrigger <- true
// The ten messages are expected to arrive as two batches of five distinct
// bytes each.
gotA := readBatch(ch)
assert.Len(t, gotA, 5)
time.Sleep(1 * time.Second)
sub.batchTestTrigger <- true
gotB := readBatch(ch)
assert.Len(t, gotB, 5)
// Merge the second batch into the first; together they must equal exp.
for i, c := range gotB {
gotA[i] += c
assert.Equal(t, exp, gotA)
time.Sleep(1 * time.Second) // give time to ack before cancelling
{ // with grouping
ctx, cancel := context.WithCancel(ctx)
ch := make(chan []*Message)
go func() {
// Handler: forwards every received batch to ch and returns (true, nil).
func(ctx context.Context, mm []*Message) (bool, error) {
ch <- mm
return true, nil
// Grouping function: two messages match when their first data bytes have the
// same parity.
func(a, b *Message) bool { return a.Data[0]%2 == b.Data[0]%2 },
ConsumerOpts{Concurrent: 10},
time.Sleep(1 * time.Second) // give consumer time to actually start
exp := map[byte]int{}
for i := byte(0); i <= 9; i++ {
require.NoError(t, topic.Publish(ctx, []byte{i}))
exp[i] = 1
time.Sleep(1 * time.Second)
// Unlike the ungrouped section, a single trigger precedes both reads here.
sub.batchTestTrigger <- true
gotA := readBatch(ch)
assert.Len(t, gotA, 5)
gotB := readBatch(ch)
assert.Len(t, gotB, 5)
// assertGotGrouped asserts that consecutive keys visited while ranging over
// got share the same parity. 255 serves as the "no previous key yet"
// sentinel; the bytes published above are all in 0..9.
// NOTE(review): no call to assertGotGrouped is visible in this copy.
assertGotGrouped := func(got map[byte]int) {
prev := byte(255)
for i := range got {
if prev != 255 {
assert.Equal(t, prev%2, i%2)
prev = i
// Merge the second batch into the first; together they must equal exp.
for i, c := range gotB {
gotA[i] += c
assert.Equal(t, exp, gotA)
time.Sleep(1 * time.Second) // give time to ack before cancelling