Implementation of jstream

parent 58837a8186
commit fa41ab5a01

jstream/byte_blob_reader.go (Normal file, 55 lines)
@@ -0,0 +1,55 @@
package jstream

import (
	"bytes"
	"encoding/base64"
	"io"
)

type delimReader struct {
	r     io.Reader
	delim byte
	rest  []byte
}

func (dr *delimReader) Read(b []byte) (int, error) {
	if dr.delim != 0 {
		return 0, io.EOF
	}
	n, err := dr.r.Read(b)
	if i := bytes.IndexAny(b[:n], bbDelims); i >= 0 {
		dr.delim = b[i]
		dr.rest = append([]byte(nil), b[i+1:n]...)
		return i, err
	}
	return n, err
}

type byteBlobReader struct {
	dr  *delimReader
	dec io.Reader
}

func newByteBlobReader(r io.Reader) *byteBlobReader {
	dr := &delimReader{r: r}
	return &byteBlobReader{
		dr:  dr,
		dec: base64.NewDecoder(base64.StdEncoding, dr),
	}
}

func (bbr *byteBlobReader) Read(into []byte) (int, error) {
	n, err := bbr.dec.Read(into)
	if bbr.dr.delim == bbEnd {
		return n, io.EOF
	} else if bbr.dr.delim == bbCancel {
		return n, ErrCanceled
	}
	return n, err
}

// returns the bytes which were read off the underlying io.Reader but which
// haven't been consumed yet.
func (bbr *byteBlobReader) buffered() io.Reader {
	return bytes.NewBuffer(bbr.dr.rest)
}
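A minimal sketch of how the above fits together (hypothetical example test, assuming it is compiled as part of the jstream package alongside the file above): the reader decodes the base64 body, reports io.EOF once it sees the `$` delimiter, and keeps any over-read trailing bytes available via buffered().

package jstream

import (
	"bytes"
	"encoding/base64"
	"fmt"
	"io/ioutil"
)

func Example_byteBlobReader() {
	// "hello" base64-encoded, then the end delimiter, then whitespace which
	// belongs to whatever element follows the blob.
	src := base64.StdEncoding.EncodeToString([]byte("hello")) + string(bbEnd) + "\n"

	bbr := newByteBlobReader(bytes.NewBufferString(src))

	// ReadAll stops at the io.EOF which is returned once '$' is seen.
	body, _ := ioutil.ReadAll(bbr)
	fmt.Printf("body=%q\n", body) // body="hello"

	// the trailing whitespace which was read past the delimiter is still
	// recoverable via buffered().
	rest, _ := ioutil.ReadAll(bbr.buffered())
	fmt.Printf("rest=%q\n", rest) // rest="\n"
}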

jstream/byte_blob_reader_test.go (Normal file, 186 lines)
@@ -0,0 +1,186 @@
package jstream

import (
	"bytes"
	"encoding/base64"
	"io"
	"io/ioutil"
	. "testing"

	"github.com/mediocregopher/mediocre-go-lib/mtest"
	"github.com/stretchr/testify/assert"
)

type bbrTest struct {
	wsSuffix     []byte // whitespace
	body         []byte
	shouldCancel bool
	intoSize     int
}

func randBBRTest(minBodySize, maxBodySize int) bbrTest {
	var whitespace = []byte{' ', '\n', '\t', '\r'}
	genWhitespace := func(n int) []byte {
		ws := make([]byte, n)
		for i := range ws {
			ws[i] = whitespace[mtest.Rand.Intn(len(whitespace))]
		}
		return ws
	}

	body := mtest.RandBytes(minBodySize + mtest.Rand.Intn(maxBodySize-minBodySize))
	return bbrTest{
		wsSuffix: genWhitespace(mtest.Rand.Intn(10)),
		body:     body,
		intoSize: 1 + mtest.Rand.Intn(len(body)+1),
	}
}

func (bt bbrTest) msgAndArgs() []interface{} {
	return []interface{}{"bt:%#v len(body):%d", bt, len(bt.body)}
}

func (bt bbrTest) mkBytes() []byte {
	buf := new(bytes.Buffer)
	enc := base64.NewEncoder(base64.StdEncoding, buf)

	if bt.shouldCancel {
		enc.Write(bt.body[:len(bt.body)/2])
		enc.Close()
		buf.WriteByte(bbCancel)
	} else {
		enc.Write(bt.body)
		enc.Close()
		buf.WriteByte(bbEnd)
	}

	buf.Write(bt.wsSuffix)
	return buf.Bytes()
}

func (bt bbrTest) do(t *T) bool {
	buf := bytes.NewBuffer(bt.mkBytes())
	bbr := newByteBlobReader(buf)

	into := make([]byte, bt.intoSize)
	outBuf := new(bytes.Buffer)
	_, err := io.CopyBuffer(outBuf, bbr, into)
	if bt.shouldCancel {
		return assert.Equal(t, ErrCanceled, err, bt.msgAndArgs()...)
	}
	if !assert.NoError(t, err, bt.msgAndArgs()...) {
		return false
	}
	if !assert.Equal(t, bt.body, outBuf.Bytes(), bt.msgAndArgs()...) {
		return false
	}
	fullRest := append(bbr.dr.rest, buf.Bytes()...)
	if len(bt.wsSuffix) == 0 {
		return assert.Empty(t, fullRest, bt.msgAndArgs()...)
	}
	return assert.Equal(t, bt.wsSuffix, fullRest, bt.msgAndArgs()...)
}

func TestByteBlobReader(t *T) {
	// some sanity tests
	bbrTest{
		body:     []byte{2, 3, 4, 5},
		intoSize: 4,
	}.do(t)
	bbrTest{
		body:     []byte{2, 3, 4, 5},
		intoSize: 3,
	}.do(t)
	bbrTest{
		body:         []byte{2, 3, 4, 5},
		shouldCancel: true,
		intoSize:     3,
	}.do(t)

	// fuzz this bitch
	for i := 0; i < 50000; i++ {
		bt := randBBRTest(0, 1000)
		if !bt.do(t) {
			return
		}
		bt.shouldCancel = true
		if !bt.do(t) {
			return
		}
	}
}

func BenchmarkByteBlobReader(b *B) {
	type bench struct {
		bt    bbrTest
		body  []byte
		buf   *bytes.Reader
		cpBuf []byte
	}

	mkTestSet := func(minBodySize, maxBodySize int) []bench {
		n := 100
		benches := make([]bench, n)
		for i := range benches {
			bt := randBBRTest(minBodySize, maxBodySize)
			body := bt.mkBytes()
			benches[i] = bench{
				bt:    bt,
				body:  body,
				buf:   bytes.NewReader(nil),
				cpBuf: make([]byte, bt.intoSize),
			}
		}
		return benches
	}

	testRaw := func(b *B, benches []bench) {
		j := 0
		for i := 0; i < b.N; i++ {
			if j >= len(benches) {
				j = 0
			}
			benches[j].buf.Reset(benches[j].body)
			io.CopyBuffer(ioutil.Discard, benches[j].buf, benches[j].cpBuf)
			j++
		}
	}

	testBBR := func(b *B, benches []bench) {
		j := 0
		for i := 0; i < b.N; i++ {
			if j >= len(benches) {
				j = 0
			}
			benches[j].buf.Reset(benches[j].body)
			bbr := newByteBlobReader(benches[j].buf)
			io.CopyBuffer(ioutil.Discard, bbr, benches[j].cpBuf)
			j++
		}
	}

	benches := []struct {
		name                     string
		minBodySize, maxBodySize int
	}{
		{"small", 0, 1000},
		{"medium", 1000, 10000},
		{"large", 10000, 100000},
		{"xlarge", 100000, 1000000},
	}

	b.StopTimer()
	for i := range benches {
		b.Run(benches[i].name, func(b *B) {
			set := mkTestSet(benches[i].minBodySize, benches[i].maxBodySize)
			b.StartTimer()
			b.Run("raw", func(b *B) {
				testRaw(b, set)
			})
			b.Run("bbr", func(b *B) {
				testBBR(b, set)
			})
			b.StopTimer()
		})
	}
}
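Assuming the standard Go toolchain and that the commands are run from the repository root, the fuzz-style test and the benchmarks in this file can be exercised with `go test -run TestByteBlobReader ./jstream` and `go test -bench BenchmarkByteBlobReader -run '^$' ./jstream` respectively.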

jstream/jstream.go (Normal file, 410 lines)
@@ -0,0 +1,410 @@
// Package jstream defines and implements the JSON Stream protocol
//
// Purpose
//
// The purpose of the jstream protocol is to provide a very simple layer on top
// of an existing JSON implementation to allow for streaming arbitrary numbers
// of JSON objects and byte blobs of arbitrary size in a standard way, and to
// allow for embedding streams within each other.
//
// The order of priorities when designing jstream is as follows:
// 1) Protocol simplicity
// 2) Implementation simplicity
// 3) Efficiency, both in parsing speed and bandwidth
//
// The justification for this is that protocol simplicity generally spills into
// implementation simplicity anyway, and accounts for future languages which
// have different properties than current ones. Parsing speed isn't much of a
// concern when reading data off a network (the primary use-case here), as RTT
// is always going to be the main blocker. Bandwidth can be a concern, but it's
// one better solved by wrapping the byte stream with a compressor.
//
// jstream protocol
//
// The jstream protocol is carried over a byte stream (in go: an io.Reader). To
// read the protocol a JSON object is read off the byte stream and inspected to
// determine what kind of jstream element it is.
//
// Multiple jstream elements are sequentially read off the same byte stream.
// Each element may be separated from the other by any amount of whitespace,
// with whitespace being defined as spaces, tabs, carriage returns, and/or
// newlines.
//
// jstream elements
//
// There are three jstream element types:
//
// * JSON Value: Any JSON value
// * Byte Blob: A stream of bytes of unknown, and possibly infinite, size
// * Stream: A heterogeneous sequence of jstream elements of unknown, and
//   possibly infinite, size
//
// JSON Value elements are defined as being JSON objects with a `val` field. The
// value of that field is the JSON Value.
//
// { "val":{"foo":"bar"} }
//
// Byte Blob elements are defined as being a JSON object with a `bytesStart`
// field with a value of `true`. Immediately following the JSON object are the
// bytes which are the Byte Blob, encoded using standard base64. Immediately
// following the encoded bytes is the character `$`, to indicate the bytes have
// been completely written. Alternatively the character `!` may be written
// immediately after the bytes to indicate writing was canceled prematurely by
// the writer.
//
// { "bytesStart":true }wXnxQHgUO8g=$
// { "bytesStart":true }WGYcTI8=!
//
// The JSON object may also contain a `sizeHint` field, which gives the
// estimated number of bytes in the Byte Blob (excluding the trailing
// delimiter). The hint is neither required to exist nor required to be
// accurate if it does. The trailing delimiter (`$` or `!`) is required to be
// sent even if the hint is sent.
//
// Stream elements are defined as being a JSON object with a `streamStart` field
// with a value of `true`. Immediately following the JSON object will be zero or
// more jstream elements of any type, possibly separated by whitespace. Finally
// the Stream is ended with another JSON object with a `streamEnd` field with a
// value of `true`.
//
// { "streamStart":true }
// { "val":{"foo":"bar"} }
// { "bytesStart":true }7TdlDQOnA6isxD9C$
// { "streamEnd":true }
//
// A Stream may also be prematurely canceled by the sending of a JSON object
// with the `streamCancel` field set to `true` (in place of one with `streamEnd`
// set to `true`).
//
// The Stream's original JSON object (the "head") may also have a `sizeHint`
// field, which gives the estimated number of jstream elements in the Stream.
// The hint is neither required to exist nor required to be accurate if it does.
// The tail JSON object (with the `streamEnd` field) is required even if
// `sizeHint` is given.
//
// One of the elements in a Stream may itself be a Stream. In this way Streams
// may be embedded within each other.
//
// Here's an example of a complex Stream, which carries within it two different
// streams and some other elements:
//
// { "streamStart":true }
// { "val":{"foo":"bar"} }
// { "streamStart":true, "sizeHint":2 }
// { "val":{"foo":"baz"} }
// { "val":{"foo":"biz"} }
// { "streamEnd":true }
// { "bytesStart":true }X7KCpLIjqIBJt9vA$
// { "streamStart":true }
// { "bytesStart":true }0jT+kNCuxHywUYy0$
// { "bytesStart":true }LUqjR6OACB2p1BG4$
// { "streamEnd":true }
// { "streamEnd":true }
//
// Finally, the byte stream off of which the jstream is based (i.e. the
// io.Reader) is implicitly treated as a Stream, with the Stream ending when the
// byte stream is closed.
//
package jstream

// TODO figure out how to expose the json.Encoder/Decoders so that users can set
// custom options on them (like UseNumber and whatnot)

import (
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
	"io"
)

// byte blob constants
const (
	bbEnd    = '$'
	bbCancel = '!'
	bbDelims = string(bbEnd) + string(bbCancel)
)

// Type is used to enumerate the types of jstream elements
type Type string

// The jstream element types
const (
	TypeJSONValue Type = "jsonValue"
	TypeByteBlob  Type = "byteBlob"
	TypeStream    Type = "stream"
)

// ErrWrongType is an error returned by methods on Element when the method
// called does not match the type of the element which was read. The error
// contains the actual type of the element.
type ErrWrongType struct {
	Actual Type
}

func (err ErrWrongType) Error() string {
	return fmt.Sprintf("wrong type, actual type is %q", err.Actual)
}

var (
	// ErrCanceled is returned when reading either a Byte Blob or a Stream,
	// indicating that the writer has prematurely canceled the element.
	ErrCanceled = errors.New("canceled by writer")

	// ErrStreamEnded is returned from Next when the Stream being read has been
	// ended by the writer.
	ErrStreamEnded = errors.New("stream ended")
)

type element struct {
	Value json.RawMessage `json:"val,omitempty"`

	BytesStart bool `json:"bytesStart,omitempty"`

	StreamStart  bool `json:"streamStart,omitempty"`
	StreamEnd    bool `json:"streamEnd,omitempty"`
	StreamCancel bool `json:"streamCancel,omitempty"`

	SizeHint uint `json:"sizeHint,omitempty"`
}

// Element is a single jstream element which is read off a StreamReader.
//
// If a method is called which expects a particular Element type (e.g. Value,
// which expects a JSON Value Element) but the Element is not of that type then
// an ErrWrongType will be returned.
//
// If there was an error reading the Element off the StreamReader that error is
// kept in the Element and returned from any method call.
type Element struct {
	element

	// Err will be set if the StreamReader encountered an error while reading
	// the next Element. If set then the Element is otherwise unusable.
	//
	// Err may be ErrCanceled or ErrStreamEnded, which would indicate the end of
	// the stream but would not indicate the StreamReader is no longer usable,
	// depending on the behavior of the writer on the other end.
	Err error

	// needed for byte blobs and streams
	sr *StreamReader
}

// Type returns the Element's Type, or an error
func (el Element) Type() (Type, error) {
	if el.Err != nil {
		return "", el.Err
	} else if el.element.StreamStart {
		return TypeStream, nil
	} else if el.element.BytesStart {
		return TypeByteBlob, nil
	} else if len(el.element.Value) > 0 {
		return TypeJSONValue, nil
	}
	return "", errors.New("malformed Element, can't determine type")
}

func (el Element) assertType(is Type) error {
	typ, err := el.Type()
	if err != nil {
		return err
	} else if typ != is {
		return ErrWrongType{Actual: typ}
	}
	return nil
}

// Value attempts to unmarshal a JSON Value Element's value into the given
// receiver.
//
// This method should not be called more than once.
func (el Element) Value(i interface{}) error {
	if err := el.assertType(TypeJSONValue); err != nil {
		return err
	}
	return json.Unmarshal(el.element.Value, i)
}

// SizeHint returns the size hint which may have been optionally sent for
// ByteBlob and Stream elements, or zero. The hint is never required to be
// sent or to be accurate.
func (el Element) SizeHint() uint {
	return el.element.SizeHint
}

// Bytes returns an io.Reader which will contain the contents of a ByteBlob
// element. The io.Reader _must_ be read till io.EOF or ErrCanceled before the
// StreamReader may be used again.
//
// This method should not be called more than once.
func (el Element) Bytes() (io.Reader, error) {
	if err := el.assertType(TypeByteBlob); err != nil {
		return nil, err
	}
	return el.sr.readBytes(), nil
}

// Stream returns the embedded stream represented by this Element as a
// StreamReader. The returned StreamReader _must_ be iterated (via the Next
// method) till ErrStreamEnded or ErrCanceled is returned before the original
// StreamReader may be used again.
//
// This method should not be called more than once.
func (el Element) Stream() (*StreamReader, error) {
	if err := el.assertType(TypeStream); err != nil {
		return nil, err
	}
	return el.sr, nil
}

// StreamReader represents a Stream from which Elements may be read using the
// Next method.
type StreamReader struct {
	orig io.Reader

	// only one of these can be set at a time
	dec *json.Decoder
	bbr *byteBlobReader
}

// NewStreamReader takes an io.Reader and interprets it as a jstream Stream.
func NewStreamReader(r io.Reader) *StreamReader {
	return &StreamReader{orig: r}
}

// pulls buffered bytes out of either the json.Decoder or byteBlobReader, if
// possible, and returns an io.MultiReader of those and orig. Will also set the
// json.Decoder/byteBlobReader to nil if that's where the bytes came from.
func (sr *StreamReader) multiReader() io.Reader {
	if sr.dec != nil {
		buf := sr.dec.Buffered()
		sr.dec = nil
		return io.MultiReader(buf, sr.orig)
	} else if sr.bbr != nil {
		buf := sr.bbr.buffered()
		sr.bbr = nil
		return io.MultiReader(buf, sr.orig)
	}
	return sr.orig
}

// Next reads, decodes, and returns the next Element off the StreamReader. If
// the Element is a ByteBlob or embedded Stream then it _must_ be fully consumed
// before Next is called on this StreamReader again.
//
// The returned Element's Err field will be ErrStreamEnded if the Stream was
// ended, or ErrCanceled if it was canceled, and this StreamReader should not be
// used again in those cases.
//
// If the underlying io.Reader is closed the returned Err field will be io.EOF.
func (sr *StreamReader) Next() Element {
	if sr.dec == nil {
		sr.dec = json.NewDecoder(sr.multiReader())
	}

	var el element
	var err error
	if err = sr.dec.Decode(&el); err != nil {
		// welp
	} else if el.StreamEnd {
		err = ErrStreamEnded
	} else if el.StreamCancel {
		err = ErrCanceled
	}
	if err != nil {
		return Element{Err: err}
	}
	return Element{sr: sr, element: el}
}

func (sr *StreamReader) readBytes() *byteBlobReader {
	sr.bbr = newByteBlobReader(sr.multiReader())
	return sr.bbr
}

////////////////////////////////////////////////////////////////////////////////

// StreamWriter represents a Stream to which Elements may be written using any
// of the Encode methods.
type StreamWriter struct {
	w   io.Writer
	enc *json.Encoder
}

// NewStreamWriter takes an io.Writer and returns a StreamWriter which will
// write to it.
func NewStreamWriter(w io.Writer) *StreamWriter {
	return &StreamWriter{w: w, enc: json.NewEncoder(w)}
}

// EncodeValue marshals the given value and writes it to the Stream as a
// JSONValue element.
func (sw *StreamWriter) EncodeValue(i interface{}) error {
	b, err := json.Marshal(i)
	if err != nil {
		return err
	}
	return sw.enc.Encode(element{Value: b})
}

// EncodeBytes copies the given io.Reader, until io.EOF, onto the Stream as a
// ByteBlob element. This method will block until copying is completed or an
// error is encountered.
//
// If the io.Reader returns any error which isn't io.EOF then the ByteBlob is
// canceled and that error is returned from this method. Otherwise nil is
// returned.
//
// sizeHint may be given if it's known or can be guessed how many bytes the
// io.Reader will read out.
func (sw *StreamWriter) EncodeBytes(sizeHint uint, r io.Reader) error {
	if err := sw.enc.Encode(element{
		BytesStart: true,
		SizeHint:   sizeHint,
	}); err != nil {
		return err
	}

	enc := base64.NewEncoder(base64.StdEncoding, sw.w)
	if _, err := io.Copy(enc, r); err != nil {
		// if canceling doesn't work then the whole connection is broken and
		// it's not worth doing anything about. if nothing else the brokenness
		// of it will be discovered the next time it is used.
		sw.w.Write([]byte{bbCancel})
		return err
	} else if err := enc.Close(); err != nil {
		return err
	} else if _, err := sw.w.Write([]byte{bbEnd}); err != nil {
		return err
	}

	return nil
}

// EncodeStream encodes an embedded Stream element onto the Stream. The callback
// is given a new StreamWriter which represents the embedded Stream and to which
// any elements may be written. This method blocks until the callback has
// returned.
//
// If the callback returns nil the Stream is ended normally. If it returns
// anything else the embedded Stream is canceled and that error is returned from
// this method.
//
// sizeHint may be given if it's known or can be guessed how many elements will
// be in the embedded Stream.
func (sw *StreamWriter) EncodeStream(sizeHint uint, fn func(*StreamWriter) error) error {
	if err := sw.enc.Encode(element{
		StreamStart: true,
		SizeHint:    sizeHint,
	}); err != nil {
		return err

	} else if err := fn(sw); err != nil {
		// as when canceling a byte blob, we don't really care if this errors
		sw.enc.Encode(element{StreamCancel: true})
		return err
	}
	return sw.enc.Encode(element{StreamEnd: true})
}
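A minimal usage sketch of the exported API above (hypothetical program; the import path is inferred from the repository layout, and error handling is mostly elided): a StreamWriter encodes a JSON value, a byte blob, and an embedded stream into a buffer, and a StreamReader reads the same elements back out.

package main

import (
	"bytes"
	"fmt"
	"io/ioutil"
	"strings"

	"github.com/mediocregopher/mediocre-go-lib/jstream" // assumed import path
)

func main() {
	buf := new(bytes.Buffer)
	w := jstream.NewStreamWriter(buf)

	// write a JSON Value, a Byte Blob, and an embedded Stream
	w.EncodeValue(map[string]string{"foo": "bar"})
	w.EncodeBytes(5, strings.NewReader("hello"))
	w.EncodeStream(1, func(inner *jstream.StreamWriter) error {
		return inner.EncodeValue("inner value")
	})

	r := jstream.NewStreamReader(buf)
	for {
		el := r.Next()
		if el.Err != nil { // io.EOF once the buffer is exhausted
			break
		}
		typ, _ := el.Type()
		switch typ {
		case jstream.TypeJSONValue:
			var v interface{}
			el.Value(&v)
			fmt.Println("value:", v) // value: map[foo:bar]
		case jstream.TypeByteBlob:
			br, _ := el.Bytes()
			b, _ := ioutil.ReadAll(br) // must be drained before calling Next again
			fmt.Println("bytes:", string(b)) // bytes: hello
		case jstream.TypeStream:
			inner, _ := el.Stream()
			var v interface{}
			inner.Next().Value(&v)
			fmt.Println("embedded value:", v) // embedded value: inner value
			inner.Next() // consume the trailing streamEnd element (Err is ErrStreamEnded)
		}
	}
}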

jstream/jstream_test.go (Normal file, 246 lines)
@@ -0,0 +1,246 @@
package jstream

import (
	"bytes"
	"errors"
	"io"
	"io/ioutil"
	"sync"
	. "testing"

	"github.com/mediocregopher/mediocre-go-lib/mtest"
	"github.com/stretchr/testify/assert"
)

type cancelBuffer struct {
	lr *io.LimitedReader
}

func newCancelBuffer(b []byte) io.Reader {
	return &cancelBuffer{
		lr: &io.LimitedReader{
			R: bytes.NewBuffer(b),
			N: int64(len(b) / 2),
		},
	}
}

func (cb *cancelBuffer) Read(p []byte) (int, error) {
	if cb.lr.N == 0 {
		return 0, ErrCanceled
	}
	return cb.lr.Read(p)
}

func TestEncoderDecoder(t *T) {
	type testCase struct {
		typ    Type
		val    interface{}
		bytes  []byte
		stream []testCase
		cancel bool
	}

	var randTestCase func(Type, bool) testCase
	randTestCase = func(typ Type, cancelable bool) testCase {
		// if typ isn't given then use a random one
		if typ == "" {
			pick := mtest.Rand.Intn(5)
			switch {
			case pick == 0:
				typ = TypeStream
			case pick < 4:
				typ = TypeJSONValue
			default:
				typ = TypeByteBlob
			}
		}

		tc := testCase{
			typ:    typ,
			cancel: cancelable && mtest.Rand.Intn(10) == 0,
		}

		switch typ {
		case TypeJSONValue:
			tc.val = map[string]interface{}{
				mtest.RandHex(8): mtest.RandHex(8),
				mtest.RandHex(8): mtest.RandHex(8),
				mtest.RandHex(8): mtest.RandHex(8),
				mtest.RandHex(8): mtest.RandHex(8),
				mtest.RandHex(8): mtest.RandHex(8),
			}
			return tc
		case TypeByteBlob:
			tc.bytes = mtest.RandBytes(mtest.Rand.Intn(256))
			return tc
		case TypeStream:
			for i := mtest.Rand.Intn(10); i > 0; i-- {
				tc.stream = append(tc.stream, randTestCase("", true))
			}
			return tc
		}
		panic("shouldn't get here")
	}

	tcLog := func(tcs ...testCase) []interface{} {
		return []interface{}{"%#v", tcs}
	}

	var assertRead func(*StreamReader, Element, testCase) bool
	assertRead = func(r *StreamReader, el Element, tc testCase) bool {
		l, success := tcLog(tc), true
		typ, err := el.Type()
		success = success && assert.NoError(t, err, l...)
		success = success && assert.Equal(t, tc.typ, typ, l...)

		switch typ {
		case TypeJSONValue:
			var val interface{}
			success = success && assert.NoError(t, el.Value(&val), l...)
			success = success && assert.Equal(t, tc.val, val, l...)
		case TypeByteBlob:
			br, err := el.Bytes()
			success = success && assert.NoError(t, err, l...)
			success = success && assert.Equal(t, uint(len(tc.bytes)), el.SizeHint(), l...)
			all, err := ioutil.ReadAll(br)
			if tc.cancel {
				success = success && assert.Equal(t, ErrCanceled, err, l...)
			} else {
				success = success && assert.NoError(t, err, l...)
				success = success && assert.Equal(t, tc.bytes, all, l...)
			}
		case TypeStream:
			innerR, err := el.Stream()
			success = success && assert.NoError(t, err, l...)
			success = success && assert.Equal(t, uint(len(tc.stream)), el.SizeHint(), l...)
			n := 0
			for {
				el := innerR.Next()
				if tc.cancel && el.Err == ErrCanceled {
					break
				} else if n == len(tc.stream) {
					success = success && assert.Equal(t, ErrStreamEnded, el.Err, l...)
					break
				}
				success = success && assertRead(innerR, el, tc.stream[n])
				n++
			}
		}
		return success
	}

	var assertWrite func(*StreamWriter, testCase) bool
	assertWrite = func(w *StreamWriter, tc testCase) bool {
		l, success := tcLog(tc), true
		switch tc.typ {
		case TypeJSONValue:
			success = success && assert.NoError(t, w.EncodeValue(tc.val), l...)
		case TypeByteBlob:
			if tc.cancel {
				r := newCancelBuffer(tc.bytes)
				err := w.EncodeBytes(uint(len(tc.bytes)), r)
				success = success && assert.Equal(t, ErrCanceled, err, l...)
			} else {
				r := bytes.NewBuffer(tc.bytes)
				err := w.EncodeBytes(uint(len(tc.bytes)), r)
				success = success && assert.NoError(t, err, l...)
			}
		case TypeStream:
			err := w.EncodeStream(uint(len(tc.stream)), func(innerW *StreamWriter) error {
				if len(tc.stream) == 0 && tc.cancel {
					return ErrCanceled
				}
				for i := range tc.stream {
					if tc.cancel && i == len(tc.stream)/2 {
						return ErrCanceled
					} else if !assertWrite(w, tc.stream[i]) {
						return errors.New("we got problems")
					}
				}
				return nil
			})
			if tc.cancel {
				success = success && assert.Equal(t, ErrCanceled, err, l...)
			} else {
				success = success && assert.NoError(t, err, l...)
			}
		}
		return success
	}

	do := func(tcs ...testCase) bool {
		// we keep a copy of all read/written bytes for debugging, but generally
		// don't actually log them
		ioR, ioW := io.Pipe()
		cpR, cpW := new(bytes.Buffer), new(bytes.Buffer)
		r, w := NewStreamReader(io.TeeReader(ioR, cpR)), NewStreamWriter(io.MultiWriter(ioW, cpW))

		readCh, writeCh := make(chan bool, 1), make(chan bool, 1)
		wg := new(sync.WaitGroup)
		wg.Add(2)
		go func() {
			success := true
			for _, tc := range tcs {
				success = success && assertRead(r, r.Next(), tc)
			}
			success = success && assert.Equal(t, io.EOF, r.Next().Err)
			readCh <- success
			ioR.Close()
			wg.Done()
		}()
		go func() {
			success := true
			for _, tc := range tcs {
				success = success && assertWrite(w, tc)
			}
			writeCh <- success
			ioW.Close()
			wg.Done()
		}()
		wg.Wait()

		//log.Printf("data written:%q", cpW.Bytes())
		//log.Printf("data read:   %q", cpR.Bytes())

		if !(<-readCh && <-writeCh) {
			assert.FailNow(t, "test case failed", tcLog(tcs...)...)
			return false
		}
		return true
	}

	// some basic test cases
	do() // empty stream
	do(randTestCase(TypeJSONValue, false))
	do(randTestCase(TypeByteBlob, false))
	do(
		randTestCase(TypeJSONValue, false),
		randTestCase(TypeJSONValue, false),
		randTestCase(TypeJSONValue, false),
	)
	do(
		randTestCase(TypeJSONValue, false),
		randTestCase(TypeByteBlob, false),
		randTestCase(TypeJSONValue, false),
	)
	do(
		randTestCase(TypeByteBlob, false),
		randTestCase(TypeByteBlob, false),
		randTestCase(TypeByteBlob, false),
	)
	do(
		randTestCase(TypeJSONValue, false),
		randTestCase(TypeStream, false),
		randTestCase(TypeJSONValue, false),
	)

	// some special cases, empty elements which are canceled
	do(testCase{typ: TypeStream, cancel: true})
	do(testCase{typ: TypeByteBlob, cancel: true})

	for i := 0; i < 1000; i++ {
		tc := randTestCase(TypeStream, false)
		do(tc.stream...)
	}
}