WIP mrpc/jstreamrpc trying to make the types work with jstreamrpc

This commit is contained in:
Brian Picciano 2018-05-16 12:56:06 +00:00
parent bb0eeaff16
commit 92d026f53d
2 changed files with 81 additions and 38 deletions

View File

@ -5,7 +5,6 @@ package jstreamrpc
import ( import (
"context" "context"
"encoding/json"
"errors" "errors"
"io" "io"
@ -13,19 +12,12 @@ import (
"github.com/mediocregopher/mediocre-go-lib/mrpc" "github.com/mediocregopher/mediocre-go-lib/mrpc"
) )
// TODO Debug
// - ReqHead
// - client encodes into context
// - handler decodes from context
// - ResTail
// - handler ?
// - client ?
// TODO Error? // TODO Error?
// TODO SizeHints // TODO SizeHints
// TODO it'd be nice if the types here played nice with mrpc.ReflectClient
type debug struct { type debug struct {
Debug map[string]map[string]json.RawMessage `json:"debug,omitempty"` Debug mrpc.Debug `json:"debug,omitempty"`
} }
type reqHead struct { type reqHead struct {
@ -35,7 +27,7 @@ type reqHead struct {
type resTail struct { type resTail struct {
debug debug
Error interface{} `json:"err,omitempty"` Error error `json:"err,omitempty"`
} }
type ctxVal int type ctxVal int
@ -86,6 +78,9 @@ func HandleCall(
w *jstream.StreamWriter, w *jstream.StreamWriter,
h mrpc.Handler, h mrpc.Handler,
) error { ) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
var head reqHead var head reqHead
if err := r.Next().Value(&head); err != nil { if err := r.Next().Value(&head); err != nil {
return err return err
@ -96,28 +91,32 @@ func HandleCall(
var didReadBody bool var didReadBody bool
ctx = context.WithValue(ctx, ctxValR, r) ctx = context.WithValue(ctx, ctxValR, r)
ctx = context.WithValue(ctx, ctxValW, w) ctx = context.WithValue(ctx, ctxValW, w)
ret, err := h.ServeRPC(mrpc.Call{
rw := new(mrpc.ResponseWriter)
h.ServeRPC(mrpc.Request{
Context: ctx, Context: ctx,
Method: head.Method, Method: head.Method,
UnmarshalArgs: func(i interface{}) error { Unmarshal: func(i interface{}) error {
didReadBody = true didReadBody = true
return unmarshalBody(i, r.Next()) return unmarshalBody(i, r.Next())
}, },
}) Debug: head.debug.Debug,
// TODO that error is ignored, need a way to differentiate a recoverable }, rw)
// error from a non-recoverable one
// TODO the writing and reading of the next section could be done in // TODO unmarshaling request and marshaling response should be in
// parallel? // their own go-routines, just in case they are streams/bytes which depend
// on each other
// TODO if ret is a byte blob or stream there may be user-spawned resErr, resErrOk := rw.Response.(error)
// go-routines waiting to write to it, but if this errors out before if resErrOk {
// marshalBody is called they will block forever. Probably need to cancel if err := w.EncodeValue(nil); err != nil {
// the context to let them know?
if err := marshalBody(w, ret); err != nil {
return err return err
} }
} else {
if err := marshalBody(w, rw.Response); err != nil {
return err
}
}
// TODO to reduce chance of user error maybe Discard should discard any // TODO to reduce chance of user error maybe Discard should discard any
// remaining data on the Element, not on Elements which haven't been read // remaining data on the Element, not on Elements which haven't been read
@ -127,11 +126,49 @@ func HandleCall(
// Reading the tail (and maybe discarding the body) should only be done once // Reading the tail (and maybe discarding the body) should only be done once
// marshalBody has finished // marshalBody has finished
if !didReadBody { if !didReadBody {
// TODO what if this errors? if err := r.Next().Discard(); err != nil {
r.Next().Discard() return err
}
} }
// TODO write response tail if err := w.EncodeValue(resTail{
debug: debug{Debug: rw.Debug},
Error: resErr,
}); err != nil {
return err
}
return nil return nil
} }
/*
func sqr(r mrpc.Request, rw *mrpc.ResponseWriter) {
ch := make(chan int)
rw.Response = func(w *jstream.StreamWriter) error {
for i := range ch {
if err := w.EncodeValue(i * i); err != nil {
return err
}
}
return nil
}
go func() {
defer close(ch)
err := r.Unmarshal(func(r *jstream.StreamReader) error {
for {
var i int
if err := r.Next().Value(&i); err == jstream.ErrStreamEnded {
return nil
} else if err != nil {
return err
}
ch <- i
}
})
if err != nil {
panic("TODO")
}
}()
}
*/

View File

@ -15,14 +15,8 @@ import (
"reflect" "reflect"
) )
// TODO Request is making things difficult. It wants to be split into two types,
// like the Response is, but naming them is annoying. ClientRequest and
// HandlerRequest? RequestReader? Maybe 4 arguments to Client isn't that much...
// Request describes an RPC request being processed by a Handler // Request describes an RPC request being processed by a Handler
type Request struct { type Request struct {
// Depending on the implementation of Client, Context may be canceled to
// indicate the Client has canceled the request.
Context context.Context Context context.Context
// The name of the RPC method being called. // The name of the RPC method being called.
@ -34,7 +28,7 @@ type Request struct {
Unmarshal func(interface{}) error Unmarshal func(interface{}) error
// Debugging information being carried with the Request. See Debug's docs // Debugging information being carried with the Request. See Debug's docs
// for more on how it is intended to be used // for more on how it is intended to be used.
Debug Debug Debug Debug
} }
@ -68,6 +62,9 @@ type Response struct {
// Handler is a type which serves RPC calls. For each incoming Requests the // Handler is a type which serves RPC calls. For each incoming Requests the
// ServeRPC method is called with a ResponseWriter which will write the call's // ServeRPC method is called with a ResponseWriter which will write the call's
// response back to the client. // response back to the client.
//
// Any go-routines spawned by ServeRPC should expect to terminate if the
// Request's Context is canceled
type Handler interface { type Handler interface {
ServeRPC(Request, *ResponseWriter) ServeRPC(Request, *ResponseWriter)
} }
@ -84,16 +81,23 @@ func (hf HandlerFunc) ServeRPC(r Request, rw *ResponseWriter) {
// Client is an entity which can perform RPC calls against a remote endpoint. // Client is an entity which can perform RPC calls against a remote endpoint.
type Client interface { type Client interface {
CallRPC(Request) Response CallRPC(ctx context.Context, method string, args interface{}, debug Debug) Response
} }
// ClientFunc can be used to wrap an individual function which fits the CallRPC // ClientFunc can be used to wrap an individual function which fits the CallRPC
// signature, and use that function as a Client // signature, and use that function as a Client
type ClientFunc func(Request) Response type ClientFunc func(context.Context, string, interface{}, Debug) Response
// CallRPC implements the method for the Client interface by calling the // CallRPC implements the method for the Client interface by calling the
// underlying function // underlying function
func (cf ClientFunc) CallRPC(r Request) { return cf(r) } func (cf ClientFunc) CallRPC(
ctx context.Context,
method string,
args interface{},
debug Debug,
) Response {
return cf(ctx, method, args, debug)
}
// ReflectClient returns a Client whose CallRPC method will use reflection to // ReflectClient returns a Client whose CallRPC method will use reflection to
// call the given Handler's ServeRPC method directly, using reflect.Value's Set // call the given Handler's ServeRPC method directly, using reflect.Value's Set
@ -115,11 +119,13 @@ func ReflectClient(h Handler) Client {
ctx context.Context, ctx context.Context,
method string, method string,
args interface{}, args interface{},
debug Debug,
) Response { ) Response {
req := Request{ req := Request{
Context: ctx, Context: ctx,
Method: method, Method: method,
Unmarshal: func(i interface{}) error { return into(i, args) }, Unmarshal: func(i interface{}) error { return into(i, args) },
Debug: debug,
} }
rw := ResponseWriter{} rw := ResponseWriter{}
h.ServeRPC(req, &rw) h.ServeRPC(req, &rw)