From 6b6355d77f79bd0a14d760ca0406bf39339e1be7 Mon Sep 17 00:00:00 2001 From: Brian Picciano Date: Thu, 17 May 2018 10:01:02 +0000 Subject: [PATCH] jstream: refactor how jstream.Element works to allow for double discarding --- jstream/jstream.go | 89 ++++++++++++++++++++--------------------- jstream/jstream_test.go | 43 +++++++++++++++----- 2 files changed, 77 insertions(+), 55 deletions(-) diff --git a/jstream/jstream.go b/jstream/jstream.go index 63df39e..8887d6c 100644 --- a/jstream/jstream.go +++ b/jstream/jstream.go @@ -178,7 +178,12 @@ type element struct { // If there was an error reading the Element off the StreamReader that error is // kept in the Element and returned from any method call. type Element struct { - element + Type Type + + // SizeHint is the size hint which may have been optionally sent for + // ByteBlob and Stream elements, or zero. The hint is never required to be + // sent or to be accurate. + SizeHint uint // Err will be set if the StreamReader encountered an error while reading // the next Element. If set then the Element is otherwise unusable. @@ -188,90 +193,59 @@ type Element struct { // depending on the behavior of the writer on the other end. Err error - // needed for byte blobs and streams - sr StreamReader -} - -// Type returns the Element's Type, or an error -func (el Element) Type() (Type, error) { - if el.Err != nil { - return "", el.Err - } else if el.element.StreamStart { - return TypeStream, nil - } else if el.element.BytesStart { - return TypeByteBlob, nil - } else if len(el.element.Value) > 0 { - return TypeJSONValue, nil - } - return "", errors.New("malformed Element, can't determine type") + value json.RawMessage + br io.Reader + sr *StreamReader } func (el Element) assertType(is Type) error { - typ, err := el.Type() - if err != nil { - return err - } else if typ != is { - return ErrWrongType{Actual: typ} + if el.Err != nil { + return el.Err + } else if el.Type != is { + return ErrWrongType{Actual: el.Type} } return nil } // DecodeValue attempts to unmarshal a JSON Value Element's value into the given // receiver. -// -// This method should not be called more than once. func (el Element) DecodeValue(i interface{}) error { if err := el.assertType(TypeJSONValue); err != nil { return err } - return json.Unmarshal(el.element.Value, i) -} - -// SizeHint returns the size hint which may have been optionally sent for -// ByteBlob and Stream elements, or zero. The hint is never required to be -// sent or to be accurate. -func (el Element) SizeHint() uint { - return el.element.SizeHint + return json.Unmarshal(el.value, i) } // DecodeBytes returns an io.Reader which will contain the contents of a // ByteBlob element. The io.Reader _must_ be read till io.EOF or ErrCanceled // before the StreamReader may be used again. -// -// This method should not be called more than once. func (el Element) DecodeBytes() (io.Reader, error) { if err := el.assertType(TypeByteBlob); err != nil { return nil, err } - return el.sr.readBytes(), nil + return el.br, nil } // DecodeStream returns the embedded stream represented by this Element as a // StreamReader. The returned StreamReader _must_ be iterated (via the Next // method) till ErrStreamEnded or ErrCanceled is returned before the original // StreamReader may be used again. -// -// This method should not be called more than once. func (el Element) DecodeStream() (*StreamReader, error) { if err := el.assertType(TypeStream); err != nil { return nil, err } - return &el.sr, nil + return el.sr, nil } // Discard reads whatever of this Element's data may be left on the StreamReader -// it came from and discards it, making the StreamReader ready to have Next call -// on it again. +// it came from and discards it, making the StreamReader ready to have Next +// called on it again. // // If the Element is a Byte Blob and is ended with io.EOF, or if the Element is // a Stream and is ended with ErrStreamEnded then this returns nil. If either is // canceled this also returns nil. All other errors are returned. func (el Element) Discard() error { - typ, err := el.Type() - if err != nil { - return err - } - switch typ { + switch el.Type { case TypeByteBlob: r, _ := el.DecodeBytes() _, err := io.Copy(ioutil.Discard, r) @@ -314,6 +288,11 @@ func NewStreamReader(r io.Reader) *StreamReader { return &StreamReader{orig: r} } +func (sr *StreamReader) clone() *StreamReader { + sr2 := *sr + return &sr2 +} + // pulls buffered bytes out of either the json.Decoder or byteBlobReader, if // possible, and returns an io.MultiReader of those and orig. Will also set the // json.Decoder/byteBlobReader to nil if that's where the bytes came from. @@ -363,7 +342,25 @@ func (sr *StreamReader) Next() Element { return Element{Err: err} } - return Element{sr: *sr, element: el} + if el.StreamStart { + return Element{ + Type: TypeStream, + SizeHint: el.SizeHint, + sr: sr.clone(), + } + } else if el.BytesStart { + return Element{ + Type: TypeByteBlob, + SizeHint: el.SizeHint, + br: sr.readBytes(), + } + } else if len(el.Value) > 0 { + return Element{ + Type: TypeJSONValue, + value: el.Value, + } + } + return Element{Err: errors.New("malformed Element, can't determine type")} } func (sr *StreamReader) readBytes() *byteBlobReader { diff --git a/jstream/jstream_test.go b/jstream/jstream_test.go index 5a758bc..5df63d0 100644 --- a/jstream/jstream_test.go +++ b/jstream/jstream_test.go @@ -59,9 +59,10 @@ func TestEncoderDecoder(t *T) { cancel := cancelable && mtest.Rand.Intn(10) == 0 tc := testCase{ - typ: typ, - cancel: cancel, - discard: !cancel && mtest.Rand.Intn(20) == 0, + typ: typ, + cancel: cancel, + // using cancelable here is gross, but whatever + discard: cancelable && !cancel && mtest.Rand.Intn(20) == 0, } switch typ { @@ -93,11 +94,9 @@ func TestEncoderDecoder(t *T) { var assertRead func(*StreamReader, Element, testCase) bool assertRead = func(r *StreamReader, el Element, tc testCase) bool { l, success := tcLog(tc), true - typ, err := el.Type() - success = success && assert.NoError(t, err, l...) - success = success && assert.Equal(t, tc.typ, typ, l...) + success = success && assert.Equal(t, tc.typ, el.Type, l...) - switch typ { + switch el.Type { case TypeJSONValue: if tc.discard { success = success && assert.NoError(t, el.Discard()) @@ -110,7 +109,7 @@ func TestEncoderDecoder(t *T) { case TypeByteBlob: br, err := el.DecodeBytes() success = success && assert.NoError(t, err, l...) - success = success && assert.Equal(t, uint(len(tc.bytes)), el.SizeHint(), l...) + success = success && assert.Equal(t, uint(len(tc.bytes)), el.SizeHint, l...) // if we're discarding we read some of the bytes and then will // discard the rest @@ -135,7 +134,7 @@ func TestEncoderDecoder(t *T) { case TypeStream: innerR, err := el.DecodeStream() success = success && assert.NoError(t, err, l...) - success = success && assert.Equal(t, uint(len(tc.stream)), el.SizeHint(), l...) + success = success && assert.Equal(t, uint(len(tc.stream)), el.SizeHint, l...) // if we're discarding we read some of the elements and then will // discard the rest @@ -281,3 +280,29 @@ func TestEncoderDecoder(t *T) { do(tc.stream...) } } + +func TestDoubleDiscardBytes(t *T) { + buf := new(bytes.Buffer) + w := NewStreamWriter(buf) + b1, b2 := mtest.RandBytes(10), mtest.RandBytes(10) + assert.NoError(t, w.EncodeBytes(uint(len(b1)), bytes.NewBuffer(b1))) + assert.NoError(t, w.EncodeBytes(uint(len(b2)), bytes.NewBuffer(b2))) + + r := NewStreamReader(buf) + el1 := r.Next() + r1, err := el1.DecodeBytes() + assert.NoError(t, err) + + b1got, err := ioutil.ReadAll(r1) + assert.NoError(t, err) + assert.Equal(t, b1, b1got) + + // discarding an already consumed byte blob shouldn't do anything + assert.NoError(t, el1.Discard()) + + r2, err := r.Next().DecodeBytes() + assert.NoError(t, err) + b2got, err := ioutil.ReadAll(r2) + assert.NoError(t, err) + assert.Equal(t, b2, b2got) +}