mediocre-go-lib/jstream/jstream_test.go

309 lines
7.6 KiB
Go

package jstream
import (
"bytes"
"errors"
"io"
"io/ioutil"
"sync"
. "testing"
"github.com/mediocregopher/mediocre-go-lib/mtest"
"github.com/stretchr/testify/assert"
)
type cancelBuffer struct {
lr *io.LimitedReader
}
func newCancelBuffer(b []byte) io.Reader {
return &cancelBuffer{
lr: &io.LimitedReader{
R: bytes.NewBuffer(b),
N: int64(len(b) / 2),
},
}
}
func (cb *cancelBuffer) Read(p []byte) (int, error) {
if cb.lr.N == 0 {
return 0, ErrCanceled
}
return cb.lr.Read(p)
}
func TestEncoderDecoder(t *T) {
type testCase struct {
typ Type
val interface{}
bytes []byte
stream []testCase
cancel bool
discard bool
}
var randTestCase func(Type, bool) testCase
randTestCase = func(typ Type, cancelable bool) testCase {
// if typ isn't given then use a random one
if typ == "" {
pick := mtest.Rand.Intn(5)
switch {
case pick == 0:
typ = TypeStream
case pick < 4:
typ = TypeValue
default:
typ = TypeBytes
}
}
cancel := cancelable && mtest.Rand.Intn(10) == 0
tc := testCase{
typ: typ,
cancel: cancel,
// using cancelable here is gross, but whatever
discard: cancelable && !cancel && mtest.Rand.Intn(20) == 0,
}
switch typ {
case TypeValue:
tc.val = map[string]interface{}{
mtest.RandHex(8): mtest.RandHex(8),
mtest.RandHex(8): mtest.RandHex(8),
mtest.RandHex(8): mtest.RandHex(8),
mtest.RandHex(8): mtest.RandHex(8),
mtest.RandHex(8): mtest.RandHex(8),
}
return tc
case TypeBytes:
tc.bytes = mtest.RandBytes(mtest.Rand.Intn(256))
return tc
case TypeStream:
for i := mtest.Rand.Intn(10); i > 0; i-- {
tc.stream = append(tc.stream, randTestCase("", true))
}
return tc
}
panic("shouldn't get here")
}
tcLog := func(tcs ...testCase) []interface{} {
return []interface{}{"%#v", tcs}
}
var assertRead func(*StreamReader, Element, testCase) bool
assertRead = func(r *StreamReader, el Element, tc testCase) bool {
l, success := tcLog(tc), true
success = success && assert.Equal(t, tc.typ, el.Type, l...)
switch el.Type {
case TypeValue:
if tc.discard {
success = success && assert.NoError(t, el.Discard())
break
}
var val interface{}
success = success && assert.NoError(t, el.DecodeValue(&val), l...)
success = success && assert.Equal(t, tc.val, val, l...)
case TypeBytes:
br, err := el.DecodeBytes()
success = success && assert.NoError(t, err, l...)
success = success && assert.Equal(t, uint(len(tc.bytes)), el.SizeHint, l...)
// if we're discarding we read some of the bytes and then will
// discard the rest
var discardKeep int
if tc.discard {
discardKeep = mtest.Rand.Intn(len(tc.bytes) + 1)
br = io.LimitReader(br, int64(discardKeep))
}
all, err := ioutil.ReadAll(br)
if tc.cancel {
success = success && assert.Equal(t, ErrCanceled, err, l...)
} else if tc.discard {
success = success && assert.NoError(t, err, l...)
success = success && assert.Equal(t, tc.bytes[:discardKeep], all, l...)
success = success && assert.NoError(t, el.Discard())
} else {
success = success && assert.NoError(t, err, l...)
success = success && assert.Equal(t, tc.bytes, all, l...)
}
case TypeStream:
innerR, err := el.DecodeStream()
success = success && assert.NoError(t, err, l...)
success = success && assert.Equal(t, uint(len(tc.stream)), el.SizeHint, l...)
// if we're discarding we read some of the elements and then will
// discard the rest
var discardKeep int
if tc.discard {
discardKeep = mtest.Rand.Intn(len(tc.stream) + 1)
}
n := 0
for {
if tc.discard && n == discardKeep {
break
}
el := innerR.Next()
if tc.cancel && el.Err == ErrCanceled {
break
} else if n == len(tc.stream) {
success = success && assert.Equal(t, ErrStreamEnded, el.Err, l...)
break
}
success = success && assertRead(innerR, el, tc.stream[n])
n++
}
if tc.discard {
success = success && assert.NoError(t, el.Discard())
}
}
return success
}
var assertWrite func(*StreamWriter, testCase) bool
assertWrite = func(w *StreamWriter, tc testCase) bool {
l, success := tcLog(tc), true
switch tc.typ {
case TypeValue:
success = success && assert.NoError(t, w.EncodeValue(tc.val), l...)
case TypeBytes:
if tc.cancel {
r := newCancelBuffer(tc.bytes)
err := w.EncodeBytes(uint(len(tc.bytes)), r)
success = success && assert.Equal(t, ErrCanceled, err, l...)
} else {
r := bytes.NewBuffer(tc.bytes)
err := w.EncodeBytes(uint(len(tc.bytes)), r)
success = success && assert.NoError(t, err, l...)
}
case TypeStream:
err := w.EncodeStream(uint(len(tc.stream)), func(innerW *StreamWriter) error {
if len(tc.stream) == 0 && tc.cancel {
return ErrCanceled
}
for i := range tc.stream {
if tc.cancel && i == len(tc.stream)/2 {
return ErrCanceled
} else if !assertWrite(w, tc.stream[i]) {
return errors.New("we got problems")
}
}
return nil
})
if tc.cancel {
success = success && assert.Equal(t, ErrCanceled, err, l...)
} else {
success = success && assert.NoError(t, err, l...)
}
}
return success
}
do := func(tcs ...testCase) bool {
// we keep a copy of all read/written bytes for debugging, but generally
// don't actually log them
ioR, ioW := io.Pipe()
cpR, cpW := new(bytes.Buffer), new(bytes.Buffer)
r, w := NewStreamReader(io.TeeReader(ioR, cpR)), NewStreamWriter(io.MultiWriter(ioW, cpW))
readCh, writeCh := make(chan bool, 1), make(chan bool, 1)
wg := new(sync.WaitGroup)
wg.Add(2)
go func() {
success := true
for _, tc := range tcs {
success = success && assertRead(r, r.Next(), tc)
}
success = success && assert.Equal(t, io.EOF, r.Next().Err)
readCh <- success
ioR.Close()
wg.Done()
}()
go func() {
success := true
for _, tc := range tcs {
success = success && assertWrite(w, tc)
}
writeCh <- success
ioW.Close()
wg.Done()
}()
wg.Wait()
//log.Printf("data written:%q", cpW.Bytes())
//log.Printf("data read: %q", cpR.Bytes())
if !(<-readCh && <-writeCh) {
assert.FailNow(t, "test case failed", tcLog(tcs...)...)
return false
}
return true
}
// some basic test cases
do() // empty stream
do(randTestCase(TypeValue, false))
do(randTestCase(TypeBytes, false))
do(
randTestCase(TypeValue, false),
randTestCase(TypeValue, false),
randTestCase(TypeValue, false),
)
do(
randTestCase(TypeValue, false),
randTestCase(TypeBytes, false),
randTestCase(TypeValue, false),
)
do(
randTestCase(TypeBytes, false),
randTestCase(TypeBytes, false),
randTestCase(TypeBytes, false),
)
do(
randTestCase(TypeValue, false),
randTestCase(TypeStream, false),
randTestCase(TypeValue, false),
)
// some special cases, empty elements which are canceled
do(testCase{typ: TypeStream, cancel: true})
do(testCase{typ: TypeBytes, cancel: true})
for i := 0; i < 1000; i++ {
tc := randTestCase(TypeStream, false)
do(tc.stream...)
}
}
func TestDoubleDiscardBytes(t *T) {
buf := new(bytes.Buffer)
w := NewStreamWriter(buf)
b1, b2 := mtest.RandBytes(10), mtest.RandBytes(10)
assert.NoError(t, w.EncodeBytes(uint(len(b1)), bytes.NewBuffer(b1)))
assert.NoError(t, w.EncodeBytes(uint(len(b2)), bytes.NewBuffer(b2)))
r := NewStreamReader(buf)
el1 := r.Next()
r1, err := el1.DecodeBytes()
assert.NoError(t, err)
b1got, err := ioutil.ReadAll(r1)
assert.NoError(t, err)
assert.Equal(t, b1, b1got)
// discarding an already consumed bytes shouldn't do anything
assert.NoError(t, el1.Discard())
r2, err := r.Next().DecodeBytes()
assert.NoError(t, err)
b2got, err := ioutil.ReadAll(r2)
assert.NoError(t, err)
assert.Equal(t, b2, b2got)
}