diff --git a/jstream/jstream.go b/jstream/jstream.go index f6af740..63df39e 100644 --- a/jstream/jstream.go +++ b/jstream/jstream.go @@ -189,7 +189,7 @@ type Element struct { Err error // needed for byte blobs and streams - sr *StreamReader + sr StreamReader } // Type returns the Element's Type, or an error @@ -216,11 +216,11 @@ func (el Element) assertType(is Type) error { return nil } -// Value attempts to unmarshal a JSON Value Element's value into the given +// DecodeValue attempts to unmarshal a JSON Value Element's value into the given // receiver. // // This method should not be called more than once. -func (el Element) Value(i interface{}) error { +func (el Element) DecodeValue(i interface{}) error { if err := el.assertType(TypeJSONValue); err != nil { return err } @@ -234,29 +234,29 @@ func (el Element) SizeHint() uint { return el.element.SizeHint } -// Bytes returns an io.Reader which will contain the contents of a ByteBlob -// element. The io.Reader _must_ be read till io.EOF or ErrCanceled before the -// StreamReader may be used again. +// DecodeBytes returns an io.Reader which will contain the contents of a +// ByteBlob element. The io.Reader _must_ be read till io.EOF or ErrCanceled +// before the StreamReader may be used again. // // This method should not be called more than once. -func (el Element) Bytes() (io.Reader, error) { +func (el Element) DecodeBytes() (io.Reader, error) { if err := el.assertType(TypeByteBlob); err != nil { return nil, err } return el.sr.readBytes(), nil } -// Stream returns the embedded stream represented by this Element as a +// DecodeStream returns the embedded stream represented by this Element as a // StreamReader. The returned StreamReader _must_ be iterated (via the Next // method) till ErrStreamEnded or ErrCanceled is returned before the original // StreamReader may be used again. // // This method should not be called more than once. -func (el Element) Stream() (*StreamReader, error) { +func (el Element) DecodeStream() (*StreamReader, error) { if err := el.assertType(TypeStream); err != nil { return nil, err } - return el.sr, nil + return &el.sr, nil } // Discard reads whatever of this Element's data may be left on the StreamReader @@ -273,14 +273,14 @@ func (el Element) Discard() error { } switch typ { case TypeByteBlob: - r, _ := el.Bytes() + r, _ := el.DecodeBytes() _, err := io.Copy(ioutil.Discard, r) if err == ErrCanceled { return nil } return err case TypeStream: - stream, _ := el.Stream() + stream, _ := el.DecodeStream() for { nextEl := stream.Next() if nextEl.Err == ErrStreamEnded || nextEl.Err == ErrCanceled { @@ -304,6 +304,9 @@ type StreamReader struct { // only one of these can be set at a time dec *json.Decoder bbr *byteBlobReader + + // once set this StreamReader will always return this error on Next + err error } // NewStreamReader takes an io.Reader and interprets it as a jstream Stream. @@ -337,6 +340,10 @@ func (sr *StreamReader) multiReader() io.Reader { // // If the underlying io.Reader is closed the returned Err field will be io.EOF. func (sr *StreamReader) Next() Element { + if sr.err != nil { + return Element{Err: sr.err} + } + if sr.dec == nil { sr.dec = json.NewDecoder(sr.multiReader()) } @@ -346,14 +353,17 @@ func (sr *StreamReader) Next() Element { if err = sr.dec.Decode(&el); err != nil { // welp } else if el.StreamEnd { + sr.err = ErrStreamEnded err = ErrStreamEnded } else if el.StreamCancel { + sr.err = ErrCanceled err = ErrCanceled } if err != nil { return Element{Err: err} } - return Element{sr: sr, element: el} + + return Element{sr: *sr, element: el} } func (sr *StreamReader) readBytes() *byteBlobReader { diff --git a/jstream/jstream_test.go b/jstream/jstream_test.go index a562720..5a758bc 100644 --- a/jstream/jstream_test.go +++ b/jstream/jstream_test.go @@ -57,10 +57,11 @@ func TestEncoderDecoder(t *T) { } } + cancel := cancelable && mtest.Rand.Intn(10) == 0 tc := testCase{ typ: typ, - cancel: cancelable && mtest.Rand.Intn(10) == 0, - discard: mtest.Rand.Intn(20) == 0, + cancel: cancel, + discard: !cancel && mtest.Rand.Intn(20) == 0, } switch typ { @@ -96,33 +97,59 @@ func TestEncoderDecoder(t *T) { success = success && assert.NoError(t, err, l...) success = success && assert.Equal(t, tc.typ, typ, l...) - if tc.discard { - success = success && assert.NoError(t, el.Discard()) - return success - } - switch typ { case TypeJSONValue: + if tc.discard { + success = success && assert.NoError(t, el.Discard()) + break + } + var val interface{} - success = success && assert.NoError(t, el.Value(&val), l...) + success = success && assert.NoError(t, el.DecodeValue(&val), l...) success = success && assert.Equal(t, tc.val, val, l...) case TypeByteBlob: - br, err := el.Bytes() + br, err := el.DecodeBytes() success = success && assert.NoError(t, err, l...) success = success && assert.Equal(t, uint(len(tc.bytes)), el.SizeHint(), l...) + + // if we're discarding we read some of the bytes and then will + // discard the rest + var discardKeep int + if tc.discard { + discardKeep = mtest.Rand.Intn(len(tc.bytes) + 1) + br = io.LimitReader(br, int64(discardKeep)) + } + all, err := ioutil.ReadAll(br) if tc.cancel { success = success && assert.Equal(t, ErrCanceled, err, l...) + } else if tc.discard { + success = success && assert.NoError(t, err, l...) + success = success && assert.Equal(t, tc.bytes[:discardKeep], all, l...) + success = success && assert.NoError(t, el.Discard()) + } else { success = success && assert.NoError(t, err, l...) success = success && assert.Equal(t, tc.bytes, all, l...) } case TypeStream: - innerR, err := el.Stream() + innerR, err := el.DecodeStream() success = success && assert.NoError(t, err, l...) success = success && assert.Equal(t, uint(len(tc.stream)), el.SizeHint(), l...) + + // if we're discarding we read some of the elements and then will + // discard the rest + var discardKeep int + if tc.discard { + discardKeep = mtest.Rand.Intn(len(tc.stream) + 1) + } + n := 0 for { + if tc.discard && n == discardKeep { + break + } + el := innerR.Next() if tc.cancel && el.Err == ErrCanceled { break @@ -133,6 +160,9 @@ func TestEncoderDecoder(t *T) { success = success && assertRead(innerR, el, tc.stream[n]) n++ } + if tc.discard { + success = success && assert.NoError(t, el.Discard()) + } } return success }