diff --git a/jstream/jstream.go b/jstream/jstream.go index fb4c7cc..f6af740 100644 --- a/jstream/jstream.go +++ b/jstream/jstream.go @@ -116,6 +116,7 @@ import ( "errors" "fmt" "io" + "io/ioutil" ) // byte blob constants @@ -258,6 +259,43 @@ func (el Element) Stream() (*StreamReader, error) { return el.sr, nil } +// Discard reads whatever of this Element's data may be left on the StreamReader +// it came from and discards it, making the StreamReader ready to have Next call +// on it again. +// +// If the Element is a Byte Blob and is ended with io.EOF, or if the Element is +// a Stream and is ended with ErrStreamEnded then this returns nil. If either is +// canceled this also returns nil. All other errors are returned. +func (el Element) Discard() error { + typ, err := el.Type() + if err != nil { + return err + } + switch typ { + case TypeByteBlob: + r, _ := el.Bytes() + _, err := io.Copy(ioutil.Discard, r) + if err == ErrCanceled { + return nil + } + return err + case TypeStream: + stream, _ := el.Stream() + for { + nextEl := stream.Next() + if nextEl.Err == ErrStreamEnded || nextEl.Err == ErrCanceled { + return nil + } else if nextEl.Err != nil { + return nextEl.Err + } else if err := nextEl.Discard(); err != nil { + return err + } + } + default: // TypeJSONValue + return nil + } +} + // StreamReader represents a Stream from which Elements may be read using the // Next method. type StreamReader struct { diff --git a/jstream/jstream_test.go b/jstream/jstream_test.go index 3bd9480..a562720 100644 --- a/jstream/jstream_test.go +++ b/jstream/jstream_test.go @@ -34,11 +34,12 @@ func (cb *cancelBuffer) Read(p []byte) (int, error) { func TestEncoderDecoder(t *T) { type testCase struct { - typ Type - val interface{} - bytes []byte - stream []testCase - cancel bool + typ Type + val interface{} + bytes []byte + stream []testCase + cancel bool + discard bool } var randTestCase func(Type, bool) testCase @@ -57,8 +58,9 @@ func TestEncoderDecoder(t *T) { } tc := testCase{ - typ: typ, - cancel: cancelable && mtest.Rand.Intn(10) == 0, + typ: typ, + cancel: cancelable && mtest.Rand.Intn(10) == 0, + discard: mtest.Rand.Intn(20) == 0, } switch typ { @@ -94,6 +96,11 @@ func TestEncoderDecoder(t *T) { success = success && assert.NoError(t, err, l...) success = success && assert.Equal(t, tc.typ, typ, l...) + if tc.discard { + success = success && assert.NoError(t, el.Discard()) + return success + } + switch typ { case TypeJSONValue: var val interface{}