diff --git a/mrpc/jstreamrpc/jstreamrpc.go b/mrpc/jstreamrpc/jstreamrpc.go index 714bb83..e2d247a 100644 --- a/mrpc/jstreamrpc/jstreamrpc.go +++ b/mrpc/jstreamrpc/jstreamrpc.go @@ -5,7 +5,6 @@ package jstreamrpc import ( "context" - "encoding/json" "errors" "io" @@ -13,19 +12,12 @@ import ( "github.com/mediocregopher/mediocre-go-lib/mrpc" ) -// TODO Debug -// - ReqHead -// - client encodes into context -// - handler decodes from context -// - ResTail -// - handler ? -// - client ? - // TODO Error? // TODO SizeHints +// TODO it'd be nice if the types here played nice with mrpc.ReflectClient type debug struct { - Debug map[string]map[string]json.RawMessage `json:"debug,omitempty"` + Debug mrpc.Debug `json:"debug,omitempty"` } type reqHead struct { @@ -35,7 +27,7 @@ type reqHead struct { type resTail struct { debug - Error interface{} `json:"err,omitempty"` + Error error `json:"err,omitempty"` } type ctxVal int @@ -86,6 +78,9 @@ func HandleCall( w *jstream.StreamWriter, h mrpc.Handler, ) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + var head reqHead if err := r.Next().Value(&head); err != nil { return err @@ -96,27 +91,31 @@ func HandleCall( var didReadBody bool ctx = context.WithValue(ctx, ctxValR, r) ctx = context.WithValue(ctx, ctxValW, w) - ret, err := h.ServeRPC(mrpc.Call{ + + rw := new(mrpc.ResponseWriter) + h.ServeRPC(mrpc.Request{ Context: ctx, Method: head.Method, - UnmarshalArgs: func(i interface{}) error { + Unmarshal: func(i interface{}) error { didReadBody = true return unmarshalBody(i, r.Next()) }, - }) - // TODO that error is ignored, need a way to differentiate a recoverable - // error from a non-recoverable one + Debug: head.debug.Debug, + }, rw) - // TODO the writing and reading of the next section could be done in - // parallel? + // TODO unmarshaling request and marshaling response should be in + // their own go-routines, just in case they are streams/bytes which depend + // on each other - // TODO if ret is a byte blob or stream there may be user-spawned - // go-routines waiting to write to it, but if this errors out before - // marshalBody is called they will block forever. Probably need to cancel - // the context to let them know? - - if err := marshalBody(w, ret); err != nil { - return err + resErr, resErrOk := rw.Response.(error) + if resErrOk { + if err := w.EncodeValue(nil); err != nil { + return err + } + } else { + if err := marshalBody(w, rw.Response); err != nil { + return err + } } // TODO to reduce chance of user error maybe Discard should discard any @@ -127,11 +126,49 @@ func HandleCall( // Reading the tail (and maybe discarding the body) should only be done once // marshalBody has finished if !didReadBody { - // TODO what if this errors? - r.Next().Discard() + if err := r.Next().Discard(); err != nil { + return err + } } - // TODO write response tail + if err := w.EncodeValue(resTail{ + debug: debug{Debug: rw.Debug}, + Error: resErr, + }); err != nil { + return err + } return nil } + +/* +func sqr(r mrpc.Request, rw *mrpc.ResponseWriter) { + ch := make(chan int) + rw.Response = func(w *jstream.StreamWriter) error { + for i := range ch { + if err := w.EncodeValue(i * i); err != nil { + return err + } + } + return nil + } + + go func() { + defer close(ch) + err := r.Unmarshal(func(r *jstream.StreamReader) error { + for { + var i int + if err := r.Next().Value(&i); err == jstream.ErrStreamEnded { + return nil + } else if err != nil { + return err + } + ch <- i + } + }) + if err != nil { + panic("TODO") + } + }() +} +*/ diff --git a/mrpc/mrpc.go b/mrpc/mrpc.go index 93044c5..bb823dd 100644 --- a/mrpc/mrpc.go +++ b/mrpc/mrpc.go @@ -15,14 +15,8 @@ import ( "reflect" ) -// TODO Request is making things difficult. It wants to be split into two types, -// like the Response is, but naming them is annoying. ClientRequest and -// HandlerRequest? RequestReader? Maybe 4 arguments to Client isn't that much... - // Request describes an RPC request being processed by a Handler type Request struct { - // Depending on the implementation of Client, Context may be canceled to - // indicate the Client has canceled the request. Context context.Context // The name of the RPC method being called. @@ -34,7 +28,7 @@ type Request struct { Unmarshal func(interface{}) error // Debugging information being carried with the Request. See Debug's docs - // for more on how it is intended to be used + // for more on how it is intended to be used. Debug Debug } @@ -68,6 +62,9 @@ type Response struct { // Handler is a type which serves RPC calls. For each incoming Requests the // ServeRPC method is called with a ResponseWriter which will write the call's // response back to the client. +// +// Any go-routines spawned by ServeRPC should expect to terminate if the +// Request's Context is canceled type Handler interface { ServeRPC(Request, *ResponseWriter) } @@ -84,16 +81,23 @@ func (hf HandlerFunc) ServeRPC(r Request, rw *ResponseWriter) { // Client is an entity which can perform RPC calls against a remote endpoint. type Client interface { - CallRPC(Request) Response + CallRPC(ctx context.Context, method string, args interface{}, debug Debug) Response } // ClientFunc can be used to wrap an individual function which fits the CallRPC // signature, and use that function as a Client -type ClientFunc func(Request) Response +type ClientFunc func(context.Context, string, interface{}, Debug) Response // CallRPC implements the method for the Client interface by calling the // underlying function -func (cf ClientFunc) CallRPC(r Request) { return cf(r) } +func (cf ClientFunc) CallRPC( + ctx context.Context, + method string, + args interface{}, + debug Debug, +) Response { + return cf(ctx, method, args, debug) +} // ReflectClient returns a Client whose CallRPC method will use reflection to // call the given Handler's ServeRPC method directly, using reflect.Value's Set @@ -115,11 +119,13 @@ func ReflectClient(h Handler) Client { ctx context.Context, method string, args interface{}, + debug Debug, ) Response { req := Request{ Context: ctx, Method: method, Unmarshal: func(i interface{}) error { return into(i, args) }, + Debug: debug, } rw := ResponseWriter{} h.ServeRPC(req, &rw)