jstream: rename decode methods, ensure Discard properly handles partially consumed elements

This commit is contained in:
Brian Picciano 2018-05-17 09:13:10 +00:00
parent 92d026f53d
commit f47e8215eb
2 changed files with 63 additions and 23 deletions

View File

@ -189,7 +189,7 @@ type Element struct {
Err error Err error
// needed for byte blobs and streams // needed for byte blobs and streams
sr *StreamReader sr StreamReader
} }
// Type returns the Element's Type, or an error // Type returns the Element's Type, or an error
@ -216,11 +216,11 @@ func (el Element) assertType(is Type) error {
return nil return nil
} }
// Value attempts to unmarshal a JSON Value Element's value into the given // DecodeValue attempts to unmarshal a JSON Value Element's value into the given
// receiver. // receiver.
// //
// This method should not be called more than once. // This method should not be called more than once.
func (el Element) Value(i interface{}) error { func (el Element) DecodeValue(i interface{}) error {
if err := el.assertType(TypeJSONValue); err != nil { if err := el.assertType(TypeJSONValue); err != nil {
return err return err
} }
@ -234,29 +234,29 @@ func (el Element) SizeHint() uint {
return el.element.SizeHint return el.element.SizeHint
} }
// Bytes returns an io.Reader which will contain the contents of a ByteBlob // DecodeBytes returns an io.Reader which will contain the contents of a
// element. The io.Reader _must_ be read till io.EOF or ErrCanceled before the // ByteBlob element. The io.Reader _must_ be read till io.EOF or ErrCanceled
// StreamReader may be used again. // before the StreamReader may be used again.
// //
// This method should not be called more than once. // This method should not be called more than once.
func (el Element) Bytes() (io.Reader, error) { func (el Element) DecodeBytes() (io.Reader, error) {
if err := el.assertType(TypeByteBlob); err != nil { if err := el.assertType(TypeByteBlob); err != nil {
return nil, err return nil, err
} }
return el.sr.readBytes(), nil return el.sr.readBytes(), nil
} }
// Stream returns the embedded stream represented by this Element as a // DecodeStream returns the embedded stream represented by this Element as a
// StreamReader. The returned StreamReader _must_ be iterated (via the Next // StreamReader. The returned StreamReader _must_ be iterated (via the Next
// method) till ErrStreamEnded or ErrCanceled is returned before the original // method) till ErrStreamEnded or ErrCanceled is returned before the original
// StreamReader may be used again. // StreamReader may be used again.
// //
// This method should not be called more than once. // This method should not be called more than once.
func (el Element) Stream() (*StreamReader, error) { func (el Element) DecodeStream() (*StreamReader, error) {
if err := el.assertType(TypeStream); err != nil { if err := el.assertType(TypeStream); err != nil {
return nil, err return nil, err
} }
return el.sr, nil return &el.sr, nil
} }
// Discard reads whatever of this Element's data may be left on the StreamReader // Discard reads whatever of this Element's data may be left on the StreamReader
@ -273,14 +273,14 @@ func (el Element) Discard() error {
} }
switch typ { switch typ {
case TypeByteBlob: case TypeByteBlob:
r, _ := el.Bytes() r, _ := el.DecodeBytes()
_, err := io.Copy(ioutil.Discard, r) _, err := io.Copy(ioutil.Discard, r)
if err == ErrCanceled { if err == ErrCanceled {
return nil return nil
} }
return err return err
case TypeStream: case TypeStream:
stream, _ := el.Stream() stream, _ := el.DecodeStream()
for { for {
nextEl := stream.Next() nextEl := stream.Next()
if nextEl.Err == ErrStreamEnded || nextEl.Err == ErrCanceled { if nextEl.Err == ErrStreamEnded || nextEl.Err == ErrCanceled {
@ -304,6 +304,9 @@ type StreamReader struct {
// only one of these can be set at a time // only one of these can be set at a time
dec *json.Decoder dec *json.Decoder
bbr *byteBlobReader bbr *byteBlobReader
// once set this StreamReader will always return this error on Next
err error
} }
// NewStreamReader takes an io.Reader and interprets it as a jstream Stream. // NewStreamReader takes an io.Reader and interprets it as a jstream Stream.
@ -337,6 +340,10 @@ func (sr *StreamReader) multiReader() io.Reader {
// //
// If the underlying io.Reader is closed the returned Err field will be io.EOF. // If the underlying io.Reader is closed the returned Err field will be io.EOF.
func (sr *StreamReader) Next() Element { func (sr *StreamReader) Next() Element {
if sr.err != nil {
return Element{Err: sr.err}
}
if sr.dec == nil { if sr.dec == nil {
sr.dec = json.NewDecoder(sr.multiReader()) sr.dec = json.NewDecoder(sr.multiReader())
} }
@ -346,14 +353,17 @@ func (sr *StreamReader) Next() Element {
if err = sr.dec.Decode(&el); err != nil { if err = sr.dec.Decode(&el); err != nil {
// welp // welp
} else if el.StreamEnd { } else if el.StreamEnd {
sr.err = ErrStreamEnded
err = ErrStreamEnded err = ErrStreamEnded
} else if el.StreamCancel { } else if el.StreamCancel {
sr.err = ErrCanceled
err = ErrCanceled err = ErrCanceled
} }
if err != nil { if err != nil {
return Element{Err: err} return Element{Err: err}
} }
return Element{sr: sr, element: el}
return Element{sr: *sr, element: el}
} }
func (sr *StreamReader) readBytes() *byteBlobReader { func (sr *StreamReader) readBytes() *byteBlobReader {

View File

@ -57,10 +57,11 @@ func TestEncoderDecoder(t *T) {
} }
} }
cancel := cancelable && mtest.Rand.Intn(10) == 0
tc := testCase{ tc := testCase{
typ: typ, typ: typ,
cancel: cancelable && mtest.Rand.Intn(10) == 0, cancel: cancel,
discard: mtest.Rand.Intn(20) == 0, discard: !cancel && mtest.Rand.Intn(20) == 0,
} }
switch typ { switch typ {
@ -96,33 +97,59 @@ func TestEncoderDecoder(t *T) {
success = success && assert.NoError(t, err, l...) success = success && assert.NoError(t, err, l...)
success = success && assert.Equal(t, tc.typ, typ, l...) success = success && assert.Equal(t, tc.typ, typ, l...)
if tc.discard {
success = success && assert.NoError(t, el.Discard())
return success
}
switch typ { switch typ {
case TypeJSONValue: case TypeJSONValue:
if tc.discard {
success = success && assert.NoError(t, el.Discard())
break
}
var val interface{} var val interface{}
success = success && assert.NoError(t, el.Value(&val), l...) success = success && assert.NoError(t, el.DecodeValue(&val), l...)
success = success && assert.Equal(t, tc.val, val, l...) success = success && assert.Equal(t, tc.val, val, l...)
case TypeByteBlob: case TypeByteBlob:
br, err := el.Bytes() br, err := el.DecodeBytes()
success = success && assert.NoError(t, err, l...) success = success && assert.NoError(t, err, l...)
success = success && assert.Equal(t, uint(len(tc.bytes)), el.SizeHint(), l...) success = success && assert.Equal(t, uint(len(tc.bytes)), el.SizeHint(), l...)
// if we're discarding we read some of the bytes and then will
// discard the rest
var discardKeep int
if tc.discard {
discardKeep = mtest.Rand.Intn(len(tc.bytes) + 1)
br = io.LimitReader(br, int64(discardKeep))
}
all, err := ioutil.ReadAll(br) all, err := ioutil.ReadAll(br)
if tc.cancel { if tc.cancel {
success = success && assert.Equal(t, ErrCanceled, err, l...) success = success && assert.Equal(t, ErrCanceled, err, l...)
} else if tc.discard {
success = success && assert.NoError(t, err, l...)
success = success && assert.Equal(t, tc.bytes[:discardKeep], all, l...)
success = success && assert.NoError(t, el.Discard())
} else { } else {
success = success && assert.NoError(t, err, l...) success = success && assert.NoError(t, err, l...)
success = success && assert.Equal(t, tc.bytes, all, l...) success = success && assert.Equal(t, tc.bytes, all, l...)
} }
case TypeStream: case TypeStream:
innerR, err := el.Stream() innerR, err := el.DecodeStream()
success = success && assert.NoError(t, err, l...) success = success && assert.NoError(t, err, l...)
success = success && assert.Equal(t, uint(len(tc.stream)), el.SizeHint(), l...) success = success && assert.Equal(t, uint(len(tc.stream)), el.SizeHint(), l...)
// if we're discarding we read some of the elements and then will
// discard the rest
var discardKeep int
if tc.discard {
discardKeep = mtest.Rand.Intn(len(tc.stream) + 1)
}
n := 0 n := 0
for { for {
if tc.discard && n == discardKeep {
break
}
el := innerR.Next() el := innerR.Next()
if tc.cancel && el.Err == ErrCanceled { if tc.cancel && el.Err == ErrCanceled {
break break
@ -133,6 +160,9 @@ func TestEncoderDecoder(t *T) {
success = success && assertRead(innerR, el, tc.stream[n]) success = success && assertRead(innerR, el, tc.stream[n])
n++ n++
} }
if tc.discard {
success = success && assert.NoError(t, el.Discard())
}
} }
return success return success
} }