jstream: implement Element.Discard
This commit is contained in:
parent
5626f9171b
commit
7d74bafdb4
@ -116,6 +116,7 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
// byte blob constants
|
// byte blob constants
|
||||||
@ -258,6 +259,43 @@ func (el Element) Stream() (*StreamReader, error) {
|
|||||||
return el.sr, nil
|
return el.sr, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Discard reads whatever of this Element's data may be left on the StreamReader
|
||||||
|
// it came from and discards it, making the StreamReader ready to have Next call
|
||||||
|
// on it again.
|
||||||
|
//
|
||||||
|
// If the Element is a Byte Blob and is ended with io.EOF, or if the Element is
|
||||||
|
// a Stream and is ended with ErrStreamEnded then this returns nil. If either is
|
||||||
|
// canceled this also returns nil. All other errors are returned.
|
||||||
|
func (el Element) Discard() error {
|
||||||
|
typ, err := el.Type()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
switch typ {
|
||||||
|
case TypeByteBlob:
|
||||||
|
r, _ := el.Bytes()
|
||||||
|
_, err := io.Copy(ioutil.Discard, r)
|
||||||
|
if err == ErrCanceled {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
case TypeStream:
|
||||||
|
stream, _ := el.Stream()
|
||||||
|
for {
|
||||||
|
nextEl := stream.Next()
|
||||||
|
if nextEl.Err == ErrStreamEnded || nextEl.Err == ErrCanceled {
|
||||||
|
return nil
|
||||||
|
} else if nextEl.Err != nil {
|
||||||
|
return nextEl.Err
|
||||||
|
} else if err := nextEl.Discard(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
default: // TypeJSONValue
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// StreamReader represents a Stream from which Elements may be read using the
|
// StreamReader represents a Stream from which Elements may be read using the
|
||||||
// Next method.
|
// Next method.
|
||||||
type StreamReader struct {
|
type StreamReader struct {
|
||||||
|
@ -34,11 +34,12 @@ func (cb *cancelBuffer) Read(p []byte) (int, error) {
|
|||||||
|
|
||||||
func TestEncoderDecoder(t *T) {
|
func TestEncoderDecoder(t *T) {
|
||||||
type testCase struct {
|
type testCase struct {
|
||||||
typ Type
|
typ Type
|
||||||
val interface{}
|
val interface{}
|
||||||
bytes []byte
|
bytes []byte
|
||||||
stream []testCase
|
stream []testCase
|
||||||
cancel bool
|
cancel bool
|
||||||
|
discard bool
|
||||||
}
|
}
|
||||||
|
|
||||||
var randTestCase func(Type, bool) testCase
|
var randTestCase func(Type, bool) testCase
|
||||||
@ -57,8 +58,9 @@ func TestEncoderDecoder(t *T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
tc := testCase{
|
tc := testCase{
|
||||||
typ: typ,
|
typ: typ,
|
||||||
cancel: cancelable && mtest.Rand.Intn(10) == 0,
|
cancel: cancelable && mtest.Rand.Intn(10) == 0,
|
||||||
|
discard: mtest.Rand.Intn(20) == 0,
|
||||||
}
|
}
|
||||||
|
|
||||||
switch typ {
|
switch typ {
|
||||||
@ -94,6 +96,11 @@ func TestEncoderDecoder(t *T) {
|
|||||||
success = success && assert.NoError(t, err, l...)
|
success = success && assert.NoError(t, err, l...)
|
||||||
success = success && assert.Equal(t, tc.typ, typ, l...)
|
success = success && assert.Equal(t, tc.typ, typ, l...)
|
||||||
|
|
||||||
|
if tc.discard {
|
||||||
|
success = success && assert.NoError(t, el.Discard())
|
||||||
|
return success
|
||||||
|
}
|
||||||
|
|
||||||
switch typ {
|
switch typ {
|
||||||
case TypeJSONValue:
|
case TypeJSONValue:
|
||||||
var val interface{}
|
var val interface{}
|
||||||
|
Loading…
Reference in New Issue
Block a user