From fa41ab5a01e9bec04dc6b68d0f01b0bbe214f751 Mon Sep 17 00:00:00 2001 From: Brian Picciano Date: Mon, 26 Mar 2018 12:32:43 +0000 Subject: [PATCH] Implementation of jstream --- jstream/byte_blob_reader.go | 55 +++++ jstream/byte_blob_reader_test.go | 186 ++++++++++++++ jstream/jstream.go | 410 +++++++++++++++++++++++++++++++ jstream/jstream_test.go | 246 +++++++++++++++++++ 4 files changed, 897 insertions(+) create mode 100644 jstream/byte_blob_reader.go create mode 100644 jstream/byte_blob_reader_test.go create mode 100644 jstream/jstream.go create mode 100644 jstream/jstream_test.go diff --git a/jstream/byte_blob_reader.go b/jstream/byte_blob_reader.go new file mode 100644 index 0000000..236c068 --- /dev/null +++ b/jstream/byte_blob_reader.go @@ -0,0 +1,55 @@ +package jstream + +import ( + "bytes" + "encoding/base64" + "io" +) + +type delimReader struct { + r io.Reader + delim byte + rest []byte +} + +func (dr *delimReader) Read(b []byte) (int, error) { + if dr.delim != 0 { + return 0, io.EOF + } + n, err := dr.r.Read(b) + if i := bytes.IndexAny(b[:n], bbDelims); i >= 0 { + dr.delim = b[i] + dr.rest = append([]byte(nil), b[i+1:n]...) + return i, err + } + return n, err +} + +type byteBlobReader struct { + dr *delimReader + dec io.Reader +} + +func newByteBlobReader(r io.Reader) *byteBlobReader { + dr := &delimReader{r: r} + return &byteBlobReader{ + dr: dr, + dec: base64.NewDecoder(base64.StdEncoding, dr), + } +} + +func (bbr *byteBlobReader) Read(into []byte) (int, error) { + n, err := bbr.dec.Read(into) + if bbr.dr.delim == bbEnd { + return n, io.EOF + } else if bbr.dr.delim == bbCancel { + return n, ErrCanceled + } + return n, err +} + +// returns the bytes which were read off the underlying io.Reader but which +// haven't been consumed yet. +func (bbr *byteBlobReader) buffered() io.Reader { + return bytes.NewBuffer(bbr.dr.rest) +} diff --git a/jstream/byte_blob_reader_test.go b/jstream/byte_blob_reader_test.go new file mode 100644 index 0000000..9480dae --- /dev/null +++ b/jstream/byte_blob_reader_test.go @@ -0,0 +1,186 @@ +package jstream + +import ( + "bytes" + "encoding/base64" + "io" + "io/ioutil" + . "testing" + + "github.com/mediocregopher/mediocre-go-lib/mtest" + "github.com/stretchr/testify/assert" +) + +type bbrTest struct { + wsSuffix []byte // whitespace + body []byte + shouldCancel bool + intoSize int +} + +func randBBRTest(minBodySize, maxBodySize int) bbrTest { + var whitespace = []byte{' ', '\n', '\t', '\r'} + genWhitespace := func(n int) []byte { + ws := make([]byte, n) + for i := range ws { + ws[i] = whitespace[mtest.Rand.Intn(len(whitespace))] + } + return ws + } + + body := mtest.RandBytes(minBodySize + mtest.Rand.Intn(maxBodySize-minBodySize)) + return bbrTest{ + wsSuffix: genWhitespace(mtest.Rand.Intn(10)), + body: body, + intoSize: 1 + mtest.Rand.Intn(len(body)+1), + } +} + +func (bt bbrTest) msgAndArgs() []interface{} { + return []interface{}{"bt:%#v len(body):%d", bt, len(bt.body)} +} + +func (bt bbrTest) mkBytes() []byte { + buf := new(bytes.Buffer) + enc := base64.NewEncoder(base64.StdEncoding, buf) + + if bt.shouldCancel { + enc.Write(bt.body[:len(bt.body)/2]) + enc.Close() + buf.WriteByte(bbCancel) + } else { + enc.Write(bt.body) + enc.Close() + buf.WriteByte(bbEnd) + } + + buf.Write(bt.wsSuffix) + return buf.Bytes() +} + +func (bt bbrTest) do(t *T) bool { + buf := bytes.NewBuffer(bt.mkBytes()) + bbr := newByteBlobReader(buf) + + into := make([]byte, bt.intoSize) + outBuf := new(bytes.Buffer) + _, err := io.CopyBuffer(outBuf, bbr, into) + if bt.shouldCancel { + return assert.Equal(t, ErrCanceled, err, bt.msgAndArgs()...) + } + if !assert.NoError(t, err, bt.msgAndArgs()...) { + return false + } + if !assert.Equal(t, bt.body, outBuf.Bytes(), bt.msgAndArgs()...) { + return false + } + fullRest := append(bbr.dr.rest, buf.Bytes()...) + if len(bt.wsSuffix) == 0 { + return assert.Empty(t, fullRest, bt.msgAndArgs()...) + } + return assert.Equal(t, bt.wsSuffix, fullRest, bt.msgAndArgs()...) +} + +func TestByteBlobReader(t *T) { + // some sanity tests + bbrTest{ + body: []byte{2, 3, 4, 5}, + intoSize: 4, + }.do(t) + bbrTest{ + body: []byte{2, 3, 4, 5}, + intoSize: 3, + }.do(t) + bbrTest{ + body: []byte{2, 3, 4, 5}, + shouldCancel: true, + intoSize: 3, + }.do(t) + + // fuzz this bitch + for i := 0; i < 50000; i++ { + bt := randBBRTest(0, 1000) + if !bt.do(t) { + return + } + bt.shouldCancel = true + if !bt.do(t) { + return + } + } +} + +func BenchmarkByteBlobReader(b *B) { + type bench struct { + bt bbrTest + body []byte + buf *bytes.Reader + cpBuf []byte + } + + mkTestSet := func(minBodySize, maxBodySize int) []bench { + n := 100 + benches := make([]bench, n) + for i := range benches { + bt := randBBRTest(minBodySize, maxBodySize) + body := bt.mkBytes() + benches[i] = bench{ + bt: bt, + body: body, + buf: bytes.NewReader(nil), + cpBuf: make([]byte, bt.intoSize), + } + } + return benches + } + + testRaw := func(b *B, benches []bench) { + j := 0 + for i := 0; i < b.N; i++ { + if j >= len(benches) { + j = 0 + } + benches[j].buf.Reset(benches[j].body) + io.CopyBuffer(ioutil.Discard, benches[j].buf, benches[j].cpBuf) + j++ + } + } + + testBBR := func(b *B, benches []bench) { + j := 0 + for i := 0; i < b.N; i++ { + if j >= len(benches) { + j = 0 + } + benches[j].buf.Reset(benches[j].body) + bbr := newByteBlobReader(benches[j].buf) + io.CopyBuffer(ioutil.Discard, bbr, benches[j].cpBuf) + j++ + } + } + + benches := []struct { + name string + minBodySize, maxBodySize int + }{ + {"small", 0, 1000}, + {"medium", 1000, 10000}, + {"large", 10000, 100000}, + {"xlarge", 100000, 1000000}, + } + + b.StopTimer() + for i := range benches { + b.Run(benches[i].name, func(b *B) { + set := mkTestSet(benches[i].minBodySize, benches[i].maxBodySize) + b.StartTimer() + b.Run("raw", func(b *B) { + testRaw(b, set) + }) + b.Run("bbr", func(b *B) { + testBBR(b, set) + }) + b.StopTimer() + }) + } +} diff --git a/jstream/jstream.go b/jstream/jstream.go new file mode 100644 index 0000000..fb4c7cc --- /dev/null +++ b/jstream/jstream.go @@ -0,0 +1,410 @@ +// Package jstream defines and implements the JSON Stream protocol +// +// Purpose +// +// The purpose of the jstream protocol is to provide a very simple layer on top +// of an existing JSON implementation to allow for streaming arbitrary numbers +// of JSON objects and byte blobs of arbitrary size in a standard way, and to +// allow for embedding streams within each other. +// +// The order of priorities when designing jstream is as follows: +// 1) Protocol simplicity +// 2) Implementation simplicity +// 3) Efficiency, both in parsing speed and bandwidth +// +// The justification for this is that protocol simplicity generally spills into +// implementation simplicity anyway, and accounts for future languages which +// have different properties than current ones. Parsing speed isn't much of a +// concern when reading data off a network (the primary use-case here), as RTT +// is always going to be the main blocker. Bandwidth can be a concern, but it's +// one better solved by wrapping the byte stream with a compressor. +// +// jstream protocol +// +// The jstream protocol is carried over a byte stream (in go: an io.Reader). To +// read the protocol a JSON object is read off the byte stream and inspected to +// determine what kind of jstream element it is. +// +// Multiple jstream elements are sequentially read off the same byte stream. +// Each element may be separated from the other by any amount of whitespace, +// with whitespace being defined as spaces, tabs, carriage returns, and/or +// newlines. +// +// jstream elements +// +// There are three jstream element types: +// +// * JSON Value: Any JSON value +// * Byte Blob: A stream of bytes of unknown, and possibly infinite, size +// * Stream: A heterogenous sequence of jstream elements of unknown, and +// possibly infinite, size +// +// JSON Value elements are defined as being JSON objects with a `val` field. The +// value of that field is the JSON Value. +// +// { "val":{"foo":"bar"} } +// +// Byte Blob elements are defined as being a JSON object with a `bytesStart` +// field with a value of `true`. Immediately following the JSON object are the +// bytes which are the Byte Blob, encoded using standard base64. Immediately +// following the encoded bytes is the character `$`, to indicate the bytes have +// been completely written. Alternatively the character `!` may be written +// immediately after the bytes to indicate writing was canceled prematurely by +// the writer. +// +// { "bytesStart":true }wXnxQHgUO8g=$ +// { "bytesStart":true }WGYcTI8=! +// +// The JSON object may also contain a `sizeHint` field, which gives the +// estimated number of bytes in the Byte Blob (excluding the trailing +// delimiter). The hint is neither required to exist or be accurate if it does. +// The trailing delimeter (`$` or `!`) is required to be sent even if the hint +// is sent. +// +// Stream elements are defined as being a JSON object with a `streamStart` field +// with a value of `true`. Immediately following the JSON object will be zero +// more jstream elements of any type, possibly separated by whitespace. Finally +// the Stream is ended with another JSON object with a `streamEnd` field with a +// value of `true`. +// +// { "streamStart":true } +// { "val":{"foo":"bar"} } +// { "bytesStart":true }7TdlDQOnA6isxD9C$ +// { "streamEnd":true } +// +// A Stream may also be prematurely canceled by the sending of a JSON object +// with the `streamCancel` field set to `true` (in place of one with `streamEnd` +// set to `true`). +// +// The Stream's original JSON object (the "head") may also have a `sizeHint` +// field, which gives the estimated number of jstream elements in the Stream. +// The hint is neither required to exist or be accurate if it does. The tail +// JSON object (with the `streamEnd` field) is required even if `sizeHint` is +// given. +// +// One of the elements in a Stream may itself be a Stream. In this way Streams +// may be embedded within each other. +// +// Here's an example of a complex Stream, which carries within it two different +// streams and some other elements: +// +// { "streamStart":true } +// { "val":{"foo":"bar" } +// { "streamStart":true, "sizeHint":2 } +// { "val":{"foo":"baz"} } +// { "val":{"foo":"biz"} } +// { "streamEnd":true } +// { "bytesStart":true }X7KCpLIjqIBJt9vA$ +// { "streamStart":true } +// { "bytesStart":true }0jT+kNCuxHywUYy0$ +// { "bytesStart":true }LUqjR6OACB2p1BG4$ +// { "streamEnd":true } +// { "streamEnd":true } +// +// Finally, the byte stream off of which the jstream is based (i.e. the +// io.Reader) is implicitly treated as a Stream, with the Stream ending when the +// byte stream is closed. +// +package jstream + +// TODO figure out how to expose the json.Encoder/Decoders so that users can set +// custom options on them (like UseNumber and whatnot) + +import ( + "encoding/base64" + "encoding/json" + "errors" + "fmt" + "io" +) + +// byte blob constants +const ( + bbEnd = '$' + bbCancel = '!' + bbDelims = string(bbEnd) + string(bbCancel) +) + +// Type is used to enumerate the types of jstream elements +type Type string + +// The jstream element types +const ( + TypeJSONValue Type = "jsonValue" + TypeByteBlob Type = "byteBlob" + TypeStream Type = "stream" +) + +// ErrWrongType is an error returned by the Decode* methods on Decoder when the +// wrong decoding method has been called for the element which was read. The +// error contains the actual type of the element. +type ErrWrongType struct { + Actual Type +} + +func (err ErrWrongType) Error() string { + return fmt.Sprintf("wrong type, actual type is %q", err.Actual) +} + +var ( + // ErrCanceled is returned when reading either a Byte Blob or a Stream, + // indicating that the writer has prematurely canceled the element. + ErrCanceled = errors.New("canceled by writer") + + // ErrStreamEnded is returned from Next when the Stream being read has been + // ended by the writer. + ErrStreamEnded = errors.New("stream ended") +) + +type element struct { + Value json.RawMessage `json:"val,omitempty"` + + BytesStart bool `json:"bytesStart,omitempty"` + + StreamStart bool `json:"streamStart,omitempty"` + StreamEnd bool `json:"streamEnd,omitempty"` + StreamCancel bool `json:"streamCancel,omitempty"` + + SizeHint uint `json:"sizeHint,omitempty"` +} + +// Element is a single jstream element which is read off a StreamReader. +// +// If a method is called which expects a particular Element type (e.g. +// DecodeValue, which expects a JSONValue Element) but the Element is not of +// that type then an ErrWrongType will be returned. +// +// If there was an error reading the Element off the StreamReader that error is +// kept in the Element and returned from any method call. +type Element struct { + element + + // Err will be set if the StreamReader encountered an error while reading + // the next Element. If set then the Element is otherwise unusable. + // + // Err may be ErrCanceled or ErrStreamEnded, which would indicate the end of + // the stream but would not indicate the StreamReader is no longer usable, + // depending on the behavior of the writer on the other end. + Err error + + // needed for byte blobs and streams + sr *StreamReader +} + +// Type returns the Element's Type, or an error +func (el Element) Type() (Type, error) { + if el.Err != nil { + return "", el.Err + } else if el.element.StreamStart { + return TypeStream, nil + } else if el.element.BytesStart { + return TypeByteBlob, nil + } else if len(el.element.Value) > 0 { + return TypeJSONValue, nil + } + return "", errors.New("malformed Element, can't determine type") +} + +func (el Element) assertType(is Type) error { + typ, err := el.Type() + if err != nil { + return err + } else if typ != is { + return ErrWrongType{Actual: typ} + } + return nil +} + +// Value attempts to unmarshal a JSON Value Element's value into the given +// receiver. +// +// This method should not be called more than once. +func (el Element) Value(i interface{}) error { + if err := el.assertType(TypeJSONValue); err != nil { + return err + } + return json.Unmarshal(el.element.Value, i) +} + +// SizeHint returns the size hint which may have been optionally sent for +// ByteBlob and Stream elements, or zero. The hint is never required to be +// sent or to be accurate. +func (el Element) SizeHint() uint { + return el.element.SizeHint +} + +// Bytes returns an io.Reader which will contain the contents of a ByteBlob +// element. The io.Reader _must_ be read till io.EOF or ErrCanceled before the +// StreamReader may be used again. +// +// This method should not be called more than once. +func (el Element) Bytes() (io.Reader, error) { + if err := el.assertType(TypeByteBlob); err != nil { + return nil, err + } + return el.sr.readBytes(), nil +} + +// Stream returns the embedded stream represented by this Element as a +// StreamReader. The returned StreamReader _must_ be iterated (via the Next +// method) till ErrStreamEnded or ErrCanceled is returned before the original +// StreamReader may be used again. +// +// This method should not be called more than once. +func (el Element) Stream() (*StreamReader, error) { + if err := el.assertType(TypeStream); err != nil { + return nil, err + } + return el.sr, nil +} + +// StreamReader represents a Stream from which Elements may be read using the +// Next method. +type StreamReader struct { + orig io.Reader + + // only one of these can be set at a time + dec *json.Decoder + bbr *byteBlobReader +} + +// NewStreamReader takes an io.Reader and interprets it as a jstream Stream. +func NewStreamReader(r io.Reader) *StreamReader { + return &StreamReader{orig: r} +} + +// pulls buffered bytes out of either the json.Decoder or byteBlobReader, if +// possible, and returns an io.MultiReader of those and orig. Will also set the +// json.Decoder/byteBlobReader to nil if that's where the bytes came from. +func (sr *StreamReader) multiReader() io.Reader { + if sr.dec != nil { + buf := sr.dec.Buffered() + sr.dec = nil + return io.MultiReader(buf, sr.orig) + } else if sr.bbr != nil { + buf := sr.bbr.buffered() + sr.bbr = nil + return io.MultiReader(buf, sr.orig) + } + return sr.orig +} + +// Next reads, decodes, and returns the next Element off the StreamReader. If +// the Element is a ByteBlob or embedded Stream then it _must_ be fully consumed +// before Next is called on this StreamReader again. +// +// The returned Element's Err field will be ErrStreamEnd if the Stream was +// ended, or ErrCanceled if it was canceled, and this StreamReader should not be +// used again in those cases. +// +// If the underlying io.Reader is closed the returned Err field will be io.EOF. +func (sr *StreamReader) Next() Element { + if sr.dec == nil { + sr.dec = json.NewDecoder(sr.multiReader()) + } + + var el element + var err error + if err = sr.dec.Decode(&el); err != nil { + // welp + } else if el.StreamEnd { + err = ErrStreamEnded + } else if el.StreamCancel { + err = ErrCanceled + } + if err != nil { + return Element{Err: err} + } + return Element{sr: sr, element: el} +} + +func (sr *StreamReader) readBytes() *byteBlobReader { + sr.bbr = newByteBlobReader(sr.multiReader()) + return sr.bbr +} + +//////////////////////////////////////////////////////////////////////////////// + +// StreamWriter represents a Stream to which Elements may be written using any +// of the Encode methods. +type StreamWriter struct { + w io.Writer + enc *json.Encoder +} + +// NewStreamWriter takes an io.Writer and returns a StreamWriter which will +// write to it. +func NewStreamWriter(w io.Writer) *StreamWriter { + return &StreamWriter{w: w, enc: json.NewEncoder(w)} +} + +// EncodeValue marshals the given value and writes it to the Stream as a +// JSONValue element. +func (sw *StreamWriter) EncodeValue(i interface{}) error { + b, err := json.Marshal(i) + if err != nil { + return err + } + return sw.enc.Encode(element{Value: b}) +} + +// EncodeBytes copies the given io.Reader, until io.EOF, onto the Stream as a +// ByteBlob element. This method will block until copying is completed or an +// error is encountered. +// +// If the io.Reader returns any error which isn't io.EOF then the ByteBlob is +// canceled and that error is returned from this method. Otherwise nil is +// returned. +// +// sizeHint may be given if it's known or can be guessed how many bytes the +// io.Reader will read out. +func (sw *StreamWriter) EncodeBytes(sizeHint uint, r io.Reader) error { + if err := sw.enc.Encode(element{ + BytesStart: true, + SizeHint: sizeHint, + }); err != nil { + return err + + } + + enc := base64.NewEncoder(base64.StdEncoding, sw.w) + if _, err := io.Copy(enc, r); err != nil { + // if canceling doesn't work then the whole connection is broken and + // it's not worth doing anything about. if nothing else the brokeness of + // it will be discovered the next time it is used. + sw.w.Write([]byte{bbCancel}) + return err + } else if err := enc.Close(); err != nil { + return err + } else if _, err := sw.w.Write([]byte{bbEnd}); err != nil { + return err + } + + return nil +} + +// EncodeStream encodes an embedded Stream element onto the Stream. The callback +// is given a new StreamWriter which represents the embedded Stream and to which +// any elemens may be written. This methods blocks until the callback has +// returned. +// +// If the callback returns nil the Stream is ended normally. If it returns +// anything else the embedded Stream is canceled and that error is returned from +// this method. +// +// sizeHint may be given if it's known or can be guessed how many elements will +// be in the embedded Stream. +func (sw *StreamWriter) EncodeStream(sizeHint uint, fn func(*StreamWriter) error) error { + if err := sw.enc.Encode(element{ + StreamStart: true, + SizeHint: sizeHint, + }); err != nil { + return err + + } else if err := fn(sw); err != nil { + // as when canceling a byte blob, we don't really care if this errors + sw.enc.Encode(element{StreamCancel: true}) + return err + } + return sw.enc.Encode(element{StreamEnd: true}) +} diff --git a/jstream/jstream_test.go b/jstream/jstream_test.go new file mode 100644 index 0000000..3bd9480 --- /dev/null +++ b/jstream/jstream_test.go @@ -0,0 +1,246 @@ +package jstream + +import ( + "bytes" + "errors" + "io" + "io/ioutil" + "sync" + . "testing" + + "github.com/mediocregopher/mediocre-go-lib/mtest" + "github.com/stretchr/testify/assert" +) + +type cancelBuffer struct { + lr *io.LimitedReader +} + +func newCancelBuffer(b []byte) io.Reader { + return &cancelBuffer{ + lr: &io.LimitedReader{ + R: bytes.NewBuffer(b), + N: int64(len(b) / 2), + }, + } +} + +func (cb *cancelBuffer) Read(p []byte) (int, error) { + if cb.lr.N == 0 { + return 0, ErrCanceled + } + return cb.lr.Read(p) +} + +func TestEncoderDecoder(t *T) { + type testCase struct { + typ Type + val interface{} + bytes []byte + stream []testCase + cancel bool + } + + var randTestCase func(Type, bool) testCase + randTestCase = func(typ Type, cancelable bool) testCase { + // if typ isn't given then use a random one + if typ == "" { + pick := mtest.Rand.Intn(5) + switch { + case pick == 0: + typ = TypeStream + case pick < 4: + typ = TypeJSONValue + default: + typ = TypeByteBlob + } + } + + tc := testCase{ + typ: typ, + cancel: cancelable && mtest.Rand.Intn(10) == 0, + } + + switch typ { + case TypeJSONValue: + tc.val = map[string]interface{}{ + mtest.RandHex(8): mtest.RandHex(8), + mtest.RandHex(8): mtest.RandHex(8), + mtest.RandHex(8): mtest.RandHex(8), + mtest.RandHex(8): mtest.RandHex(8), + mtest.RandHex(8): mtest.RandHex(8), + } + return tc + case TypeByteBlob: + tc.bytes = mtest.RandBytes(mtest.Rand.Intn(256)) + return tc + case TypeStream: + for i := mtest.Rand.Intn(10); i > 0; i-- { + tc.stream = append(tc.stream, randTestCase("", true)) + } + return tc + } + panic("shouldn't get here") + } + + tcLog := func(tcs ...testCase) []interface{} { + return []interface{}{"%#v", tcs} + } + + var assertRead func(*StreamReader, Element, testCase) bool + assertRead = func(r *StreamReader, el Element, tc testCase) bool { + l, success := tcLog(tc), true + typ, err := el.Type() + success = success && assert.NoError(t, err, l...) + success = success && assert.Equal(t, tc.typ, typ, l...) + + switch typ { + case TypeJSONValue: + var val interface{} + success = success && assert.NoError(t, el.Value(&val), l...) + success = success && assert.Equal(t, tc.val, val, l...) + case TypeByteBlob: + br, err := el.Bytes() + success = success && assert.NoError(t, err, l...) + success = success && assert.Equal(t, uint(len(tc.bytes)), el.SizeHint(), l...) + all, err := ioutil.ReadAll(br) + if tc.cancel { + success = success && assert.Equal(t, ErrCanceled, err, l...) + } else { + success = success && assert.NoError(t, err, l...) + success = success && assert.Equal(t, tc.bytes, all, l...) + } + case TypeStream: + innerR, err := el.Stream() + success = success && assert.NoError(t, err, l...) + success = success && assert.Equal(t, uint(len(tc.stream)), el.SizeHint(), l...) + n := 0 + for { + el := innerR.Next() + if tc.cancel && el.Err == ErrCanceled { + break + } else if n == len(tc.stream) { + success = success && assert.Equal(t, ErrStreamEnded, el.Err, l...) + break + } + success = success && assertRead(innerR, el, tc.stream[n]) + n++ + } + } + return success + } + + var assertWrite func(*StreamWriter, testCase) bool + assertWrite = func(w *StreamWriter, tc testCase) bool { + l, success := tcLog(tc), true + switch tc.typ { + case TypeJSONValue: + success = success && assert.NoError(t, w.EncodeValue(tc.val), l...) + case TypeByteBlob: + if tc.cancel { + r := newCancelBuffer(tc.bytes) + err := w.EncodeBytes(uint(len(tc.bytes)), r) + success = success && assert.Equal(t, ErrCanceled, err, l...) + } else { + r := bytes.NewBuffer(tc.bytes) + err := w.EncodeBytes(uint(len(tc.bytes)), r) + success = success && assert.NoError(t, err, l...) + } + case TypeStream: + err := w.EncodeStream(uint(len(tc.stream)), func(innerW *StreamWriter) error { + if len(tc.stream) == 0 && tc.cancel { + return ErrCanceled + } + for i := range tc.stream { + if tc.cancel && i == len(tc.stream)/2 { + return ErrCanceled + } else if !assertWrite(w, tc.stream[i]) { + return errors.New("we got problems") + } + } + return nil + }) + if tc.cancel { + success = success && assert.Equal(t, ErrCanceled, err, l...) + } else { + success = success && assert.NoError(t, err, l...) + } + } + return success + } + + do := func(tcs ...testCase) bool { + // we keep a copy of all read/written bytes for debugging, but generally + // don't actually log them + ioR, ioW := io.Pipe() + cpR, cpW := new(bytes.Buffer), new(bytes.Buffer) + r, w := NewStreamReader(io.TeeReader(ioR, cpR)), NewStreamWriter(io.MultiWriter(ioW, cpW)) + + readCh, writeCh := make(chan bool, 1), make(chan bool, 1) + wg := new(sync.WaitGroup) + wg.Add(2) + go func() { + success := true + for _, tc := range tcs { + success = success && assertRead(r, r.Next(), tc) + } + success = success && assert.Equal(t, io.EOF, r.Next().Err) + readCh <- success + ioR.Close() + wg.Done() + }() + go func() { + success := true + for _, tc := range tcs { + success = success && assertWrite(w, tc) + } + writeCh <- success + ioW.Close() + wg.Done() + }() + wg.Wait() + + //log.Printf("data written:%q", cpW.Bytes()) + //log.Printf("data read: %q", cpR.Bytes()) + + if !(<-readCh && <-writeCh) { + assert.FailNow(t, "test case failed", tcLog(tcs...)...) + return false + } + return true + } + + // some basic test cases + do() // empty stream + do(randTestCase(TypeJSONValue, false)) + do(randTestCase(TypeByteBlob, false)) + do( + randTestCase(TypeJSONValue, false), + randTestCase(TypeJSONValue, false), + randTestCase(TypeJSONValue, false), + ) + do( + randTestCase(TypeJSONValue, false), + randTestCase(TypeByteBlob, false), + randTestCase(TypeJSONValue, false), + ) + do( + randTestCase(TypeByteBlob, false), + randTestCase(TypeByteBlob, false), + randTestCase(TypeByteBlob, false), + ) + do( + randTestCase(TypeJSONValue, false), + randTestCase(TypeStream, false), + randTestCase(TypeJSONValue, false), + ) + + // some special cases, empty elements which are canceled + do(testCase{typ: TypeStream, cancel: true}) + do(testCase{typ: TypeByteBlob, cancel: true}) + + for i := 0; i < 1000; i++ { + tc := randTestCase(TypeStream, false) + do(tc.stream...) + } +}