From bdfd7500fbe4c4289471e75e4add353c6494b452 Mon Sep 17 00:00:00 2001 From: Brian Picciano Date: Sat, 14 Apr 2018 10:45:58 +0000 Subject: [PATCH] WIP jstreamrpc began implementation, found lots of weird holes in the spec that need to be addressed --- mrpc/jstreamrpc/jstreamrpc.go | 130 ++++++++++++++++++++++++++++++++++ mrpc/proto.md | 18 ++--- 2 files changed, 139 insertions(+), 9 deletions(-) create mode 100644 mrpc/jstreamrpc/jstreamrpc.go diff --git a/mrpc/jstreamrpc/jstreamrpc.go b/mrpc/jstreamrpc/jstreamrpc.go new file mode 100644 index 0000000..dd3f85b --- /dev/null +++ b/mrpc/jstreamrpc/jstreamrpc.go @@ -0,0 +1,130 @@ +// Package jstreamrpc implements the mrpc interfaces using the jstream protocol. +// This implementation makes a few design decisions which are not specified in +// the protocol, but which are good best practices none-the-less. +package jstreamrpc + +import ( + "context" + "encoding/json" + "errors" + "io" + + "github.com/mediocregopher/mediocre-go-lib/jstream" + "github.com/mediocregopher/mediocre-go-lib/mrpc" +) + +// TODO Debug +// TODO Error? +// TODO SizeHints +// TODO it seems like request tail and response head aren't useful nor +// convenient to use, might be better to leave them out + +type headTail struct { + Debug map[string]map[string]json.RawMessage `json:"debug,omitempty"` +} + +type reqHead struct { + headTail + Method string `json:"method"` +} + +type ctxVal int + +const ( + ctxValR ctxVal = iota + ctxValW +) + +func unmarshalBody(i interface{}, el jstream.Element) error { + switch iT := i.(type) { + case func(*jstream.StreamReader) error: + stream, err := el.Stream() + if err != nil { + return err + } + return iT(stream) + case *io.Reader: + ioR, err := el.Bytes() + if err != nil { + return err + } + *iT = ioR + return nil + default: + return el.Value(i) + } +} + +func marshalBody(w *jstream.StreamWriter, i interface{}) error { + switch iT := i.(type) { + case func(*jstream.StreamWriter) error: + return w.EncodeStream(0, iT) + case io.Reader: + return w.EncodeBytes(0, iT) + default: + return w.EncodeValue(iT) + } +} + +// HandleCall TODO +// +// If this returns an error then both r and w should be discarded and no longer +// used. +func HandleCall( + ctx context.Context, + r *jstream.StreamReader, + w *jstream.StreamWriter, + h mrpc.Handler, +) error { + var head reqHead + if err := r.Next().Value(&head); err != nil { + return err + } else if head.Method == "" { + return errors.New("request head missing 'method' field") + } + + var didReadBody bool + ctx = context.WithValue(ctx, ctxValR, r) + ctx = context.WithValue(ctx, ctxValW, w) + ret, err := h.ServeRPC(mrpc.Call{ + Context: ctx, + Method: head.Method, + UnmarshalArgs: func(i interface{}) error { + didReadBody = true + return unmarshalBody(i, r.Next()) + }, + }) + // TODO that error is ignored, need a way to differentiate a recoverable + // error from a non-recoverable one + + // TODO the writing and reading of the next section could be done in + // parallel? + + // TODO if ret is a byte blob or stream there may be user-spawned + // go-routines waiting to write to it, but if this errors out before + // marshalBody is called they will block forever. Probably need to cancel + // the context to let them know? + + if err := w.EncodeValue(headTail{}); err != nil { + return err + } else if err := marshalBody(w, ret); err != nil { + return err + } + + // TODO to reduce chance of user error maybe Discard should discard any + // remaining data on the Element, not on Elements which haven't been read + // from yet. Then it could always be called on the request body at this + // point. + + // Reading the tail (and maybe discarding the body) should only be done once + // marshalBody has finished + if !didReadBody { + // TODO if this errors then presumably reading the tail will too? + r.Next().Discard() + } + if err := r.Next().Discard(); err != nil { + return err + } + + return nil +} diff --git a/mrpc/proto.md b/mrpc/proto.md index 4bfea16..50473d2 100644 --- a/mrpc/proto.md +++ b/mrpc/proto.md @@ -15,7 +15,7 @@ this spec: * An "RPC call", or just "call", is composed of two events: a "request" and a "response". -* The entity which initiates the call by sending a request is the "caller". +* The entity which initiates the call by sending a request is the "client". * The entity which serves the call by responding to a request is the "server". @@ -30,7 +30,7 @@ Many components of this RPC protocol carry a `debug` field, whose value may be some arbitrary set of data as desired by the user. The use and purpose of the `debug` field will be different for everyone, but some example use-cases would be a unique ID useful for tracing, metadata useful for logging in case of an -error, and request/response timings from both the caller and server sides +error, and request/response timings from both the client and server sides (useful for determining RTT). When determining if some piece of data should be considered debug data or part @@ -41,7 +41,7 @@ should run _identically_ in that scenario as in real-life. In other words: if some field in `debug` effects the behavior of a call directly then it should not be carried via `debug`. This could mean duplicating data between `debug` and the request/response proper, e.g. the IP address of the -caller. +client. ## Call request @@ -57,7 +57,7 @@ The three elements of the request stream are specified as follows: * The second element is the argument to the call. This may be a JSON value, a byte blob, or an embedded stream containing even more elements, depending on - the call. It's up to the caller and server to coordinate beforehand what to + the call. It's up to the client and server to coordinate beforehand what to expect here. * The third element, the tail, is a JSON value with an object optionally @@ -70,7 +70,7 @@ lack of `name` field in the head, and the addition of the `err` field in the tail. A call response is defined as being three jstream elements read off the pipe by -the caller. Once all three elements have been read the response is considered to +the client. Once all three elements have been read the response is considered to be completely consumed and the pipe may be used for a new request. The three elements of the response stream are specified as follows: @@ -80,23 +80,23 @@ The three elements of the response stream are specified as follows: * The second element is the response from the call. This may be a JSON value, a byte blob, or an embedded stream containing even more elements, depending on - the call. It's up to the caller and server to coordinate beforehand what to + the call. It's up to the client and server to coordinate beforehand what to expect here. * The third element, the tail, is a JSON value with an object optionally containing an `err` field, and optionally containing `debug` field. The value - of `err` may be any JSON value which is meaningful to the caller and server. + of `err` may be any JSON value which is meaningful to the client and server. ## Pipelining The protocol allows for the server to begin sending back a response, and even to send back a complete response, _as soon as_ it receives the request head. In effect this means that the server can be sending back response data while the -caller is still sending request data. +client is still sending request data. Once the server has sent the response tail it can assume the call has completed successfully and ignore all subsequent request data (though it must still fully read the three request elements off the pipe in order to use it again). -Likewise, once a caller receives the response tail it can cancel whatever it's +Likewise, once a client receives the response tail it can cancel whatever it's doing, finish sending the request argument and tail as soon as possible, and assume the call has been completed.