jstream: refactor how jstream.Element works to allow for double discarding

This commit is contained in:
Brian Picciano 2018-05-17 10:01:02 +00:00
parent 131ad8a718
commit 6b6355d77f
2 changed files with 77 additions and 55 deletions

View File

@ -178,7 +178,12 @@ type element struct {
// If there was an error reading the Element off the StreamReader that error is // If there was an error reading the Element off the StreamReader that error is
// kept in the Element and returned from any method call. // kept in the Element and returned from any method call.
type Element struct { type Element struct {
element Type Type
// SizeHint is the size hint which may have been optionally sent for
// ByteBlob and Stream elements, or zero. The hint is never required to be
// sent or to be accurate.
SizeHint uint
// Err will be set if the StreamReader encountered an error while reading // Err will be set if the StreamReader encountered an error while reading
// the next Element. If set then the Element is otherwise unusable. // the next Element. If set then the Element is otherwise unusable.
@ -188,90 +193,59 @@ type Element struct {
// depending on the behavior of the writer on the other end. // depending on the behavior of the writer on the other end.
Err error Err error
// needed for byte blobs and streams value json.RawMessage
sr StreamReader br io.Reader
} sr *StreamReader
// Type returns the Element's Type, or an error
func (el Element) Type() (Type, error) {
if el.Err != nil {
return "", el.Err
} else if el.element.StreamStart {
return TypeStream, nil
} else if el.element.BytesStart {
return TypeByteBlob, nil
} else if len(el.element.Value) > 0 {
return TypeJSONValue, nil
}
return "", errors.New("malformed Element, can't determine type")
} }
func (el Element) assertType(is Type) error { func (el Element) assertType(is Type) error {
typ, err := el.Type() if el.Err != nil {
if err != nil { return el.Err
return err } else if el.Type != is {
} else if typ != is { return ErrWrongType{Actual: el.Type}
return ErrWrongType{Actual: typ}
} }
return nil return nil
} }
// DecodeValue attempts to unmarshal a JSON Value Element's value into the given // DecodeValue attempts to unmarshal a JSON Value Element's value into the given
// receiver. // receiver.
//
// This method should not be called more than once.
func (el Element) DecodeValue(i interface{}) error { func (el Element) DecodeValue(i interface{}) error {
if err := el.assertType(TypeJSONValue); err != nil { if err := el.assertType(TypeJSONValue); err != nil {
return err return err
} }
return json.Unmarshal(el.element.Value, i) return json.Unmarshal(el.value, i)
}
// SizeHint returns the size hint which may have been optionally sent for
// ByteBlob and Stream elements, or zero. The hint is never required to be
// sent or to be accurate.
func (el Element) SizeHint() uint {
return el.element.SizeHint
} }
// DecodeBytes returns an io.Reader which will contain the contents of a // DecodeBytes returns an io.Reader which will contain the contents of a
// ByteBlob element. The io.Reader _must_ be read till io.EOF or ErrCanceled // ByteBlob element. The io.Reader _must_ be read till io.EOF or ErrCanceled
// before the StreamReader may be used again. // before the StreamReader may be used again.
//
// This method should not be called more than once.
func (el Element) DecodeBytes() (io.Reader, error) { func (el Element) DecodeBytes() (io.Reader, error) {
if err := el.assertType(TypeByteBlob); err != nil { if err := el.assertType(TypeByteBlob); err != nil {
return nil, err return nil, err
} }
return el.sr.readBytes(), nil return el.br, nil
} }
// DecodeStream returns the embedded stream represented by this Element as a // DecodeStream returns the embedded stream represented by this Element as a
// StreamReader. The returned StreamReader _must_ be iterated (via the Next // StreamReader. The returned StreamReader _must_ be iterated (via the Next
// method) till ErrStreamEnded or ErrCanceled is returned before the original // method) till ErrStreamEnded or ErrCanceled is returned before the original
// StreamReader may be used again. // StreamReader may be used again.
//
// This method should not be called more than once.
func (el Element) DecodeStream() (*StreamReader, error) { func (el Element) DecodeStream() (*StreamReader, error) {
if err := el.assertType(TypeStream); err != nil { if err := el.assertType(TypeStream); err != nil {
return nil, err return nil, err
} }
return &el.sr, nil return el.sr, nil
} }
// Discard reads whatever of this Element's data may be left on the StreamReader // Discard reads whatever of this Element's data may be left on the StreamReader
// it came from and discards it, making the StreamReader ready to have Next call // it came from and discards it, making the StreamReader ready to have Next
// on it again. // called on it again.
// //
// If the Element is a Byte Blob and is ended with io.EOF, or if the Element is // If the Element is a Byte Blob and is ended with io.EOF, or if the Element is
// a Stream and is ended with ErrStreamEnded then this returns nil. If either is // a Stream and is ended with ErrStreamEnded then this returns nil. If either is
// canceled this also returns nil. All other errors are returned. // canceled this also returns nil. All other errors are returned.
func (el Element) Discard() error { func (el Element) Discard() error {
typ, err := el.Type() switch el.Type {
if err != nil {
return err
}
switch typ {
case TypeByteBlob: case TypeByteBlob:
r, _ := el.DecodeBytes() r, _ := el.DecodeBytes()
_, err := io.Copy(ioutil.Discard, r) _, err := io.Copy(ioutil.Discard, r)
@ -314,6 +288,11 @@ func NewStreamReader(r io.Reader) *StreamReader {
return &StreamReader{orig: r} return &StreamReader{orig: r}
} }
func (sr *StreamReader) clone() *StreamReader {
sr2 := *sr
return &sr2
}
// pulls buffered bytes out of either the json.Decoder or byteBlobReader, if // pulls buffered bytes out of either the json.Decoder or byteBlobReader, if
// possible, and returns an io.MultiReader of those and orig. Will also set the // possible, and returns an io.MultiReader of those and orig. Will also set the
// json.Decoder/byteBlobReader to nil if that's where the bytes came from. // json.Decoder/byteBlobReader to nil if that's where the bytes came from.
@ -363,7 +342,25 @@ func (sr *StreamReader) Next() Element {
return Element{Err: err} return Element{Err: err}
} }
return Element{sr: *sr, element: el} if el.StreamStart {
return Element{
Type: TypeStream,
SizeHint: el.SizeHint,
sr: sr.clone(),
}
} else if el.BytesStart {
return Element{
Type: TypeByteBlob,
SizeHint: el.SizeHint,
br: sr.readBytes(),
}
} else if len(el.Value) > 0 {
return Element{
Type: TypeJSONValue,
value: el.Value,
}
}
return Element{Err: errors.New("malformed Element, can't determine type")}
} }
func (sr *StreamReader) readBytes() *byteBlobReader { func (sr *StreamReader) readBytes() *byteBlobReader {

View File

@ -59,9 +59,10 @@ func TestEncoderDecoder(t *T) {
cancel := cancelable && mtest.Rand.Intn(10) == 0 cancel := cancelable && mtest.Rand.Intn(10) == 0
tc := testCase{ tc := testCase{
typ: typ, typ: typ,
cancel: cancel, cancel: cancel,
discard: !cancel && mtest.Rand.Intn(20) == 0, // using cancelable here is gross, but whatever
discard: cancelable && !cancel && mtest.Rand.Intn(20) == 0,
} }
switch typ { switch typ {
@ -93,11 +94,9 @@ func TestEncoderDecoder(t *T) {
var assertRead func(*StreamReader, Element, testCase) bool var assertRead func(*StreamReader, Element, testCase) bool
assertRead = func(r *StreamReader, el Element, tc testCase) bool { assertRead = func(r *StreamReader, el Element, tc testCase) bool {
l, success := tcLog(tc), true l, success := tcLog(tc), true
typ, err := el.Type() success = success && assert.Equal(t, tc.typ, el.Type, l...)
success = success && assert.NoError(t, err, l...)
success = success && assert.Equal(t, tc.typ, typ, l...)
switch typ { switch el.Type {
case TypeJSONValue: case TypeJSONValue:
if tc.discard { if tc.discard {
success = success && assert.NoError(t, el.Discard()) success = success && assert.NoError(t, el.Discard())
@ -110,7 +109,7 @@ func TestEncoderDecoder(t *T) {
case TypeByteBlob: case TypeByteBlob:
br, err := el.DecodeBytes() br, err := el.DecodeBytes()
success = success && assert.NoError(t, err, l...) success = success && assert.NoError(t, err, l...)
success = success && assert.Equal(t, uint(len(tc.bytes)), el.SizeHint(), l...) success = success && assert.Equal(t, uint(len(tc.bytes)), el.SizeHint, l...)
// if we're discarding we read some of the bytes and then will // if we're discarding we read some of the bytes and then will
// discard the rest // discard the rest
@ -135,7 +134,7 @@ func TestEncoderDecoder(t *T) {
case TypeStream: case TypeStream:
innerR, err := el.DecodeStream() innerR, err := el.DecodeStream()
success = success && assert.NoError(t, err, l...) success = success && assert.NoError(t, err, l...)
success = success && assert.Equal(t, uint(len(tc.stream)), el.SizeHint(), l...) success = success && assert.Equal(t, uint(len(tc.stream)), el.SizeHint, l...)
// if we're discarding we read some of the elements and then will // if we're discarding we read some of the elements and then will
// discard the rest // discard the rest
@ -281,3 +280,29 @@ func TestEncoderDecoder(t *T) {
do(tc.stream...) do(tc.stream...)
} }
} }
func TestDoubleDiscardBytes(t *T) {
buf := new(bytes.Buffer)
w := NewStreamWriter(buf)
b1, b2 := mtest.RandBytes(10), mtest.RandBytes(10)
assert.NoError(t, w.EncodeBytes(uint(len(b1)), bytes.NewBuffer(b1)))
assert.NoError(t, w.EncodeBytes(uint(len(b2)), bytes.NewBuffer(b2)))
r := NewStreamReader(buf)
el1 := r.Next()
r1, err := el1.DecodeBytes()
assert.NoError(t, err)
b1got, err := ioutil.ReadAll(r1)
assert.NoError(t, err)
assert.Equal(t, b1, b1got)
// discarding an already consumed byte blob shouldn't do anything
assert.NoError(t, el1.Discard())
r2, err := r.Next().DecodeBytes()
assert.NoError(t, err)
b2got, err := ioutil.ReadAll(r2)
assert.NoError(t, err)
assert.Equal(t, b2, b2got)
}