From d20ad7830cbae3ccc2724a4a03a7c1e1f4e1244d Mon Sep 17 00:00:00 2001 From: Brian Picciano Date: Sun, 27 May 2018 03:38:03 +0000 Subject: [PATCH] jstream: normalize type names to make a bit more sense everywhere --- .../{byte_blob_reader.go => bytes_reader.go} | 18 ++-- ...ob_reader_test.go => bytes_reader_test.go} | 40 ++++---- jstream/jstream.go | 98 ++++++++++--------- jstream/jstream_test.go | 46 ++++----- mrpc/proto.md | 18 ++-- 5 files changed, 111 insertions(+), 109 deletions(-) rename jstream/{byte_blob_reader.go => bytes_reader.go} (66%) rename jstream/{byte_blob_reader_test.go => bytes_reader_test.go} (83%) diff --git a/jstream/byte_blob_reader.go b/jstream/bytes_reader.go similarity index 66% rename from jstream/byte_blob_reader.go rename to jstream/bytes_reader.go index 236c068..7d72be5 100644 --- a/jstream/byte_blob_reader.go +++ b/jstream/bytes_reader.go @@ -25,24 +25,24 @@ func (dr *delimReader) Read(b []byte) (int, error) { return n, err } -type byteBlobReader struct { +type bytesReader struct { dr *delimReader dec io.Reader } -func newByteBlobReader(r io.Reader) *byteBlobReader { +func newBytesReader(r io.Reader) *bytesReader { dr := &delimReader{r: r} - return &byteBlobReader{ + return &bytesReader{ dr: dr, dec: base64.NewDecoder(base64.StdEncoding, dr), } } -func (bbr *byteBlobReader) Read(into []byte) (int, error) { - n, err := bbr.dec.Read(into) - if bbr.dr.delim == bbEnd { +func (br *bytesReader) Read(into []byte) (int, error) { + n, err := br.dec.Read(into) + if br.dr.delim == bbEnd { return n, io.EOF - } else if bbr.dr.delim == bbCancel { + } else if br.dr.delim == bbCancel { return n, ErrCanceled } return n, err @@ -50,6 +50,6 @@ func (bbr *byteBlobReader) Read(into []byte) (int, error) { // returns the bytes which were read off the underlying io.Reader but which // haven't been consumed yet. -func (bbr *byteBlobReader) buffered() io.Reader { - return bytes.NewBuffer(bbr.dr.rest) +func (br *bytesReader) buffered() io.Reader { + return bytes.NewBuffer(br.dr.rest) } diff --git a/jstream/byte_blob_reader_test.go b/jstream/bytes_reader_test.go similarity index 83% rename from jstream/byte_blob_reader_test.go rename to jstream/bytes_reader_test.go index 9480dae..c5dbfab 100644 --- a/jstream/byte_blob_reader_test.go +++ b/jstream/bytes_reader_test.go @@ -11,14 +11,14 @@ import ( "github.com/stretchr/testify/assert" ) -type bbrTest struct { +type brTest struct { wsSuffix []byte // whitespace body []byte shouldCancel bool intoSize int } -func randBBRTest(minBodySize, maxBodySize int) bbrTest { +func randBRTest(minBodySize, maxBodySize int) brTest { var whitespace = []byte{' ', '\n', '\t', '\r'} genWhitespace := func(n int) []byte { ws := make([]byte, n) @@ -29,18 +29,18 @@ func randBBRTest(minBodySize, maxBodySize int) bbrTest { } body := mtest.RandBytes(minBodySize + mtest.Rand.Intn(maxBodySize-minBodySize)) - return bbrTest{ + return brTest{ wsSuffix: genWhitespace(mtest.Rand.Intn(10)), body: body, intoSize: 1 + mtest.Rand.Intn(len(body)+1), } } -func (bt bbrTest) msgAndArgs() []interface{} { +func (bt brTest) msgAndArgs() []interface{} { return []interface{}{"bt:%#v len(body):%d", bt, len(bt.body)} } -func (bt bbrTest) mkBytes() []byte { +func (bt brTest) mkBytes() []byte { buf := new(bytes.Buffer) enc := base64.NewEncoder(base64.StdEncoding, buf) @@ -58,13 +58,13 @@ func (bt bbrTest) mkBytes() []byte { return buf.Bytes() } -func (bt bbrTest) do(t *T) bool { +func (bt brTest) do(t *T) bool { buf := bytes.NewBuffer(bt.mkBytes()) - bbr := newByteBlobReader(buf) + br := newBytesReader(buf) into := make([]byte, bt.intoSize) outBuf := new(bytes.Buffer) - _, err := io.CopyBuffer(outBuf, bbr, into) + _, err := io.CopyBuffer(outBuf, br, into) if bt.shouldCancel { return assert.Equal(t, ErrCanceled, err, bt.msgAndArgs()...) } @@ -74,7 +74,7 @@ func (bt bbrTest) do(t *T) bool { if !assert.Equal(t, bt.body, outBuf.Bytes(), bt.msgAndArgs()...) { return false } - fullRest := append(bbr.dr.rest, buf.Bytes()...) + fullRest := append(br.dr.rest, buf.Bytes()...) if len(bt.wsSuffix) == 0 { return assert.Empty(t, fullRest, bt.msgAndArgs()...) } @@ -83,15 +83,15 @@ func (bt bbrTest) do(t *T) bool { func TestByteBlobReader(t *T) { // some sanity tests - bbrTest{ + brTest{ body: []byte{2, 3, 4, 5}, intoSize: 4, }.do(t) - bbrTest{ + brTest{ body: []byte{2, 3, 4, 5}, intoSize: 3, }.do(t) - bbrTest{ + brTest{ body: []byte{2, 3, 4, 5}, shouldCancel: true, intoSize: 3, @@ -99,7 +99,7 @@ func TestByteBlobReader(t *T) { // fuzz this bitch for i := 0; i < 50000; i++ { - bt := randBBRTest(0, 1000) + bt := randBRTest(0, 1000) if !bt.do(t) { return } @@ -112,7 +112,7 @@ func TestByteBlobReader(t *T) { func BenchmarkByteBlobReader(b *B) { type bench struct { - bt bbrTest + bt brTest body []byte buf *bytes.Reader cpBuf []byte @@ -122,7 +122,7 @@ func BenchmarkByteBlobReader(b *B) { n := 100 benches := make([]bench, n) for i := range benches { - bt := randBBRTest(minBodySize, maxBodySize) + bt := randBRTest(minBodySize, maxBodySize) body := bt.mkBytes() benches[i] = bench{ bt: bt, @@ -146,15 +146,15 @@ func BenchmarkByteBlobReader(b *B) { } } - testBBR := func(b *B, benches []bench) { + testBR := func(b *B, benches []bench) { j := 0 for i := 0; i < b.N; i++ { if j >= len(benches) { j = 0 } benches[j].buf.Reset(benches[j].body) - bbr := newByteBlobReader(benches[j].buf) - io.CopyBuffer(ioutil.Discard, bbr, benches[j].cpBuf) + br := newBytesReader(benches[j].buf) + io.CopyBuffer(ioutil.Discard, br, benches[j].cpBuf) j++ } } @@ -177,8 +177,8 @@ func BenchmarkByteBlobReader(b *B) { b.Run("raw", func(b *B) { testRaw(b, set) }) - b.Run("bbr", func(b *B) { - testBBR(b, set) + b.Run("br", func(b *B) { + testBR(b, set) }) b.StopTimer() }) diff --git a/jstream/jstream.go b/jstream/jstream.go index 8887d6c..9b27bdc 100644 --- a/jstream/jstream.go +++ b/jstream/jstream.go @@ -34,32 +34,30 @@ // // There are three jstream element types: // -// * JSON Value: Any JSON value -// * Byte Blob: A stream of bytes of unknown, and possibly infinite, size +// * Value: Any JSON value +// * Bytes: A stream of bytes of unknown, and possibly infinite, size // * Stream: A heterogenous sequence of jstream elements of unknown, and // possibly infinite, size // -// JSON Value elements are defined as being JSON objects with a `val` field. The +// Value elements are defined as being JSON objects with a `val` field. The // value of that field is the JSON Value. // // { "val":{"foo":"bar"} } // -// Byte Blob elements are defined as being a JSON object with a `bytesStart` -// field with a value of `true`. Immediately following the JSON object are the -// bytes which are the Byte Blob, encoded using standard base64. Immediately -// following the encoded bytes is the character `$`, to indicate the bytes have -// been completely written. Alternatively the character `!` may be written -// immediately after the bytes to indicate writing was canceled prematurely by -// the writer. +// Bytes elements are defined as being a JSON object with a `bytesStart` field +// with a value of `true`. Immediately following the JSON object are the bytes +// encoded using standard base64. Immediately following the encoded bytes is the +// character `$`, to indicate the bytes have been completely written. +// Alternatively the character `!` may be written immediately after the bytes to +// indicate writing was canceled prematurely by the writer. // // { "bytesStart":true }wXnxQHgUO8g=$ // { "bytesStart":true }WGYcTI8=! // // The JSON object may also contain a `sizeHint` field, which gives the -// estimated number of bytes in the Byte Blob (excluding the trailing -// delimiter). The hint is neither required to exist or be accurate if it does. -// The trailing delimeter (`$` or `!`) is required to be sent even if the hint -// is sent. +// estimated number of bytes (excluding the trailing delimiter). The hint is +// neither required to exist or be accurate if it does. The trailing delimeter +// (`$` or `!`) is required to be sent even if the hint is sent. // // Stream elements are defined as being a JSON object with a `streamStart` field // with a value of `true`. Immediately following the JSON object will be zero @@ -110,6 +108,10 @@ package jstream // TODO figure out how to expose the json.Encoder/Decoders so that users can set // custom options on them (like UseNumber and whatnot) +// TODO attempt to refactor this into using channels? or write a layer on top +// which does so? +// TODO TypeNil? + import ( "encoding/base64" "encoding/json" @@ -119,7 +121,7 @@ import ( "io/ioutil" ) -// byte blob constants +// bytes constants const ( bbEnd = '$' bbCancel = '!' @@ -131,9 +133,9 @@ type Type string // The jstream element types const ( - TypeJSONValue Type = "jsonValue" - TypeByteBlob Type = "byteBlob" - TypeStream Type = "stream" + TypeValue Type = "value" + TypeBytes Type = "bytes" + TypeStream Type = "stream" ) // ErrWrongType is an error returned by the Decode* methods on Decoder when the @@ -148,7 +150,7 @@ func (err ErrWrongType) Error() string { } var ( - // ErrCanceled is returned when reading either a Byte Blob or a Stream, + // ErrCanceled is returned when reading either a Bytes or a Stream element, // indicating that the writer has prematurely canceled the element. ErrCanceled = errors.New("canceled by writer") @@ -172,8 +174,8 @@ type element struct { // Element is a single jstream element which is read off a StreamReader. // // If a method is called which expects a particular Element type (e.g. -// DecodeValue, which expects a JSONValue Element) but the Element is not of -// that type then an ErrWrongType will be returned. +// DecodeValue, which expects a Value Element) but the Element is not of that +// type then an ErrWrongType will be returned. // // If there was an error reading the Element off the StreamReader that error is // kept in the Element and returned from any method call. @@ -207,20 +209,20 @@ func (el Element) assertType(is Type) error { return nil } -// DecodeValue attempts to unmarshal a JSON Value Element's value into the given +// DecodeValue attempts to unmarshal a Value Element's value into the given // receiver. func (el Element) DecodeValue(i interface{}) error { - if err := el.assertType(TypeJSONValue); err != nil { + if err := el.assertType(TypeValue); err != nil { return err } return json.Unmarshal(el.value, i) } // DecodeBytes returns an io.Reader which will contain the contents of a -// ByteBlob element. The io.Reader _must_ be read till io.EOF or ErrCanceled -// before the StreamReader may be used again. +// Bytes element. The io.Reader _must_ be read till io.EOF or ErrCanceled is +// returned before the StreamReader may be used again. func (el Element) DecodeBytes() (io.Reader, error) { - if err := el.assertType(TypeByteBlob); err != nil { + if err := el.assertType(TypeBytes); err != nil { return nil, err } return el.br, nil @@ -241,12 +243,12 @@ func (el Element) DecodeStream() (*StreamReader, error) { // it came from and discards it, making the StreamReader ready to have Next // called on it again. // -// If the Element is a Byte Blob and is ended with io.EOF, or if the Element is -// a Stream and is ended with ErrStreamEnded then this returns nil. If either is +// If the Element is a Bytes and is ended with io.EOF, or if the Element is a +// Stream and is ended with ErrStreamEnded then this returns nil. If either is // canceled this also returns nil. All other errors are returned. func (el Element) Discard() error { switch el.Type { - case TypeByteBlob: + case TypeBytes: r, _ := el.DecodeBytes() _, err := io.Copy(ioutil.Discard, r) if err == ErrCanceled { @@ -265,7 +267,7 @@ func (el Element) Discard() error { return err } } - default: // TypeJSONValue + default: // TypeValue return nil } } @@ -277,7 +279,7 @@ type StreamReader struct { // only one of these can be set at a time dec *json.Decoder - bbr *byteBlobReader + br *bytesReader // once set this StreamReader will always return this error on Next err error @@ -293,25 +295,25 @@ func (sr *StreamReader) clone() *StreamReader { return &sr2 } -// pulls buffered bytes out of either the json.Decoder or byteBlobReader, if +// pulls buffered bytes out of either the json.Decoder or bytesReader, if // possible, and returns an io.MultiReader of those and orig. Will also set the -// json.Decoder/byteBlobReader to nil if that's where the bytes came from. +// json.Decoder/bytesReader to nil if that's where the bytes came from. func (sr *StreamReader) multiReader() io.Reader { if sr.dec != nil { buf := sr.dec.Buffered() sr.dec = nil return io.MultiReader(buf, sr.orig) - } else if sr.bbr != nil { - buf := sr.bbr.buffered() - sr.bbr = nil + } else if sr.br != nil { + buf := sr.br.buffered() + sr.br = nil return io.MultiReader(buf, sr.orig) } return sr.orig } // Next reads, decodes, and returns the next Element off the StreamReader. If -// the Element is a ByteBlob or embedded Stream then it _must_ be fully consumed -// before Next is called on this StreamReader again. +// the Element is a Bytes or embedded Stream Element then it _must_ be fully +// consumed before Next is called on this StreamReader again. // // The returned Element's Err field will be ErrStreamEnd if the Stream was // ended, or ErrCanceled if it was canceled, and this StreamReader should not be @@ -350,22 +352,22 @@ func (sr *StreamReader) Next() Element { } } else if el.BytesStart { return Element{ - Type: TypeByteBlob, + Type: TypeBytes, SizeHint: el.SizeHint, br: sr.readBytes(), } } else if len(el.Value) > 0 { return Element{ - Type: TypeJSONValue, + Type: TypeValue, value: el.Value, } } return Element{Err: errors.New("malformed Element, can't determine type")} } -func (sr *StreamReader) readBytes() *byteBlobReader { - sr.bbr = newByteBlobReader(sr.multiReader()) - return sr.bbr +func (sr *StreamReader) readBytes() *bytesReader { + sr.br = newBytesReader(sr.multiReader()) + return sr.br } //////////////////////////////////////////////////////////////////////////////// @@ -384,7 +386,7 @@ func NewStreamWriter(w io.Writer) *StreamWriter { } // EncodeValue marshals the given value and writes it to the Stream as a -// JSONValue element. +// Value element. func (sw *StreamWriter) EncodeValue(i interface{}) error { b, err := json.Marshal(i) if err != nil { @@ -394,11 +396,11 @@ func (sw *StreamWriter) EncodeValue(i interface{}) error { } // EncodeBytes copies the given io.Reader, until io.EOF, onto the Stream as a -// ByteBlob element. This method will block until copying is completed or an -// error is encountered. +// Bytes element. This method will block until copying is completed or an error +// is encountered. // -// If the io.Reader returns any error which isn't io.EOF then the ByteBlob is -// canceled and that error is returned from this method. Otherwise nil is +// If the io.Reader returns any error which isn't io.EOF then the Bytes element +// is canceled and that error is returned from this method. Otherwise nil is // returned. // // sizeHint may be given if it's known or can be guessed how many bytes the diff --git a/jstream/jstream_test.go b/jstream/jstream_test.go index 5df63d0..b67ee52 100644 --- a/jstream/jstream_test.go +++ b/jstream/jstream_test.go @@ -51,9 +51,9 @@ func TestEncoderDecoder(t *T) { case pick == 0: typ = TypeStream case pick < 4: - typ = TypeJSONValue + typ = TypeValue default: - typ = TypeByteBlob + typ = TypeBytes } } @@ -66,7 +66,7 @@ func TestEncoderDecoder(t *T) { } switch typ { - case TypeJSONValue: + case TypeValue: tc.val = map[string]interface{}{ mtest.RandHex(8): mtest.RandHex(8), mtest.RandHex(8): mtest.RandHex(8), @@ -75,7 +75,7 @@ func TestEncoderDecoder(t *T) { mtest.RandHex(8): mtest.RandHex(8), } return tc - case TypeByteBlob: + case TypeBytes: tc.bytes = mtest.RandBytes(mtest.Rand.Intn(256)) return tc case TypeStream: @@ -97,7 +97,7 @@ func TestEncoderDecoder(t *T) { success = success && assert.Equal(t, tc.typ, el.Type, l...) switch el.Type { - case TypeJSONValue: + case TypeValue: if tc.discard { success = success && assert.NoError(t, el.Discard()) break @@ -106,7 +106,7 @@ func TestEncoderDecoder(t *T) { var val interface{} success = success && assert.NoError(t, el.DecodeValue(&val), l...) success = success && assert.Equal(t, tc.val, val, l...) - case TypeByteBlob: + case TypeBytes: br, err := el.DecodeBytes() success = success && assert.NoError(t, err, l...) success = success && assert.Equal(t, uint(len(tc.bytes)), el.SizeHint, l...) @@ -170,9 +170,9 @@ func TestEncoderDecoder(t *T) { assertWrite = func(w *StreamWriter, tc testCase) bool { l, success := tcLog(tc), true switch tc.typ { - case TypeJSONValue: + case TypeValue: success = success && assert.NoError(t, w.EncodeValue(tc.val), l...) - case TypeByteBlob: + case TypeBytes: if tc.cancel { r := newCancelBuffer(tc.bytes) err := w.EncodeBytes(uint(len(tc.bytes)), r) @@ -248,32 +248,32 @@ func TestEncoderDecoder(t *T) { // some basic test cases do() // empty stream - do(randTestCase(TypeJSONValue, false)) - do(randTestCase(TypeByteBlob, false)) + do(randTestCase(TypeValue, false)) + do(randTestCase(TypeBytes, false)) do( - randTestCase(TypeJSONValue, false), - randTestCase(TypeJSONValue, false), - randTestCase(TypeJSONValue, false), + randTestCase(TypeValue, false), + randTestCase(TypeValue, false), + randTestCase(TypeValue, false), ) do( - randTestCase(TypeJSONValue, false), - randTestCase(TypeByteBlob, false), - randTestCase(TypeJSONValue, false), + randTestCase(TypeValue, false), + randTestCase(TypeBytes, false), + randTestCase(TypeValue, false), ) do( - randTestCase(TypeByteBlob, false), - randTestCase(TypeByteBlob, false), - randTestCase(TypeByteBlob, false), + randTestCase(TypeBytes, false), + randTestCase(TypeBytes, false), + randTestCase(TypeBytes, false), ) do( - randTestCase(TypeJSONValue, false), + randTestCase(TypeValue, false), randTestCase(TypeStream, false), - randTestCase(TypeJSONValue, false), + randTestCase(TypeValue, false), ) // some special cases, empty elements which are canceled do(testCase{typ: TypeStream, cancel: true}) - do(testCase{typ: TypeByteBlob, cancel: true}) + do(testCase{typ: TypeBytes, cancel: true}) for i := 0; i < 1000; i++ { tc := randTestCase(TypeStream, false) @@ -297,7 +297,7 @@ func TestDoubleDiscardBytes(t *T) { assert.NoError(t, err) assert.Equal(t, b1, b1got) - // discarding an already consumed byte blob shouldn't do anything + // discarding an already consumed bytes shouldn't do anything assert.NoError(t, el1.Discard()) r2, err := r.Next().DecodeBytes() diff --git a/mrpc/proto.md b/mrpc/proto.md index d49d471..28bce7b 100644 --- a/mrpc/proto.md +++ b/mrpc/proto.md @@ -50,13 +50,13 @@ completely consumed and the pipe may be used for a new request. The two elements of the request stream are specified as follows: -* The first element, the head, is a JSON value with an object containing a +* The first element, the head, is a value element with an object containing a `name` field, which identifies the call being made, and optionally a `debug` field. -* The second element is the argument to the call. This may be a JSON value, a - byte blob, or an embedded stream containing even more elements, depending on - the call. It's up to the client and server to coordinate beforehand what to +* The second element is the argument to the call. This may be a value, a + bytes, or an embedded stream element containing even more elements, depending + on the call. It's up to the client and server to coordinate beforehand what to expect here. ## Call response @@ -67,12 +67,12 @@ completely consumed and the pipe may be used for a new request. The two elements of the response stream are specified as follows: -* The first element is the response from the call. This may be a JSON value, a - byte blob, or an embedded stream containing even more elements, depending on - the call. It's up to the client and server to coordinate beforehand what to - expect here. +* The first element is the response from the call. This may be a value, a bytes, + or an embedded stream element containing even more elements, depending on the + call. It's up to the client and server to coordinate beforehand what to expect + here. -* The second element, the tail, is a JSON value with an object optionally +* The second element, the tail, is a value element with an object optionally containing an `err` field, and optionally containing a `debug` field. The value of `err` may be any JSON value which is meaningful to the client and server. This element is required even if there's no `err` or `debug` data.