Implement jsonrpc2 package, intended for use over the daemon socket

This commit is contained in:
Brian Picciano 2024-06-22 17:37:15 +02:00
parent 4e5d3b28ab
commit 4664ec4a70
10 changed files with 826 additions and 0 deletions

View File

@ -0,0 +1,19 @@
package jsonrpc2
import (
"context"
)
// Client is used to perform requests against a JSONRPC2 server. The Client
// should be discarded after any error which is not an Error.
type Client interface {
// Call performs a single round-trip request.
//
// The result of the request will be JSON unmarshaled into the given
// receiver pointer, unless it is nil in which case the result will be
// discarded.
//
// If an error result is returned from the server that will be returned as
// an Error struct.
Call(ctx context.Context, rcv any, method string, params any) error
}

View File

@ -0,0 +1,63 @@
package jsonrpc2
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
)
type httpClient struct {
c *http.Client
url string
}
// NewHTTPClient returns a Client which will use HTTP POST requests against the
// given URL as a transport for JSONRPC2 method calls.
func NewHTTPClient(url string) Client {
return &httpClient{
c: &http.Client{
Transport: http.DefaultTransport.(*http.Transport).Clone(),
},
url: url,
}
}
func (c *httpClient) Call(
ctx context.Context, rcv any, method string, params any,
) error {
var (
body = new(bytes.Buffer)
enc = json.NewEncoder(body)
)
id, err := encodeRequest(enc, method, params)
if err != nil {
return fmt.Errorf("encoding request: %w", err)
}
req, err := http.NewRequestWithContext(ctx, "POST", c.url, body)
if err != nil {
return fmt.Errorf("constructing POST request to %q: %w", c.url, err)
}
res, err := c.c.Do(req)
if err != nil {
return fmt.Errorf("performing request: %w", err)
}
defer res.Body.Close()
dec := json.NewDecoder(res.Body)
resID, err := decodeResponse(dec, rcv)
if err != nil {
return fmt.Errorf("decoding response: %w", err)
} else if resID != id {
return fmt.Errorf(
"expected response with ID %q, got %q", id, resID,
)
}
return nil
}

View File

@ -0,0 +1,39 @@
package jsonrpc2
import (
"context"
"encoding/json"
"fmt"
"io"
)
type rwClient struct {
enc *json.Encoder
dec *json.Decoder
}
// NewReadWriterClient returns a Client which will use the given ReadWriter to
// perform requests.
func NewReadWriterClient(rw io.ReadWriter) Client {
return rwClient{json.NewEncoder(rw), json.NewDecoder(rw)}
}
func (c rwClient) Call(
ctx context.Context, rcv any, method string, params any,
) error {
id, err := encodeRequest(c.enc, method, params)
if err != nil {
return fmt.Errorf("encoding request: %w", err)
}
resID, err := decodeResponse(c.dec, rcv)
if err != nil {
return fmt.Errorf("decoding response: %w", err)
} else if resID != id {
return fmt.Errorf(
"expected response with ID %q, got %q", id, resID,
)
}
return nil
}

View File

@ -0,0 +1,111 @@
package jsonrpc2
import (
"context"
"encoding/json"
"errors"
"fmt"
"reflect"
)
var (
ctxT = reflect.TypeOf((*context.Context)(nil)).Elem()
errT = reflect.TypeOf((*error)(nil)).Elem()
)
type methodDispatchFunc func(context.Context, Request) (any, error)
func newMethodDispatchFunc(
method reflect.Value,
) methodDispatchFunc {
paramT := method.Type().In(1)
return func(ctx context.Context, req Request) (any, error) {
var (
ctxV = reflect.ValueOf(ctx)
paramPtrV = reflect.New(paramT)
)
err := json.Unmarshal(req.Params, paramPtrV.Interface())
if err != nil {
// The JSON has already been validated, so this is not an
// errCodeParse situation. We assume it's an invalid param then,
// unless the error says otherwise via an UnmarshalJSON method
// returning an Error of its own.
if !errors.As(err, new(Error)) {
err = NewInvalidParamsError(
"JSON unmarshaling params into %T: %v", paramT, err,
)
}
return nil, err
}
var (
callResV = method.Call([]reflect.Value{ctxV, paramPtrV.Elem()})
resV = callResV[0]
errV = callResV[1]
)
if errV.IsNil() {
return resV.Interface(), nil
}
return nil, errV.Interface().(error)
}
}
type dispatcher struct {
methods map[string]methodDispatchFunc
}
// NewDispatchHandler returns a Handler which will use methods on the passed in
// value to dispatch RPC calls. The passed in value must be a pointer. All
// exported methods which look like:
//
// MethodName(context.Context, ParamType) (ResponseType, error)
//
// will be available via RPC calls.
func NewDispatchHandler(i any) Handler {
v := reflect.ValueOf(i)
if v.Kind() != reflect.Pointer {
panic(fmt.Sprintf("expected pointer but got type %T", i))
}
v = v.Elem()
var (
t = v.Type()
numMethods = t.NumMethod()
methods = make(map[string]methodDispatchFunc, numMethods)
)
for i := range numMethods {
var (
method = t.Method(i)
methodV = v.Method(i)
methodT = methodV.Type()
)
if !method.IsExported() ||
methodT.NumIn() != 2 ||
methodT.In(0) != ctxT ||
methodT.NumOut() != 2 ||
methodT.Out(1) != errT {
continue
}
methods[method.Name] = newMethodDispatchFunc(methodV)
}
return &dispatcher{methods}
}
func (d *dispatcher) ServeRPC(ctx context.Context, req Request) (any, error) {
fn, ok := d.methods[req.Method]
if !ok {
return nil, NewError(
errCodeMethodNotFound, "unknown method %q", req.Method,
)
}
return fn(ctx, req)
}

View File

@ -0,0 +1,75 @@
package jsonrpc2
import (
"bytes"
"encoding/json"
"fmt"
)
const (
// Invalid JSON was received by the server.
// An error occurred on the server while parsing the JSON text.
errCodeParse = -32700
// The JSON sent is not a valid Request object.
errCodeInvalidRequest = -32600
// The method does not exist / is not available.
errCodeMethodNotFound = -32601
// Invalid method parameter(s).
errCodeInvalidParams = -32602
// Reserved for implementation-defined server-errors.
errCodeServerError = -32000
)
// Error implements the error stuct type defined in the spec, as well as the go
// error interface and the `Is` method for the errors package. Note that the
// `Is` method will check both the type and Code of the given target in order to
// check same-ness.
type Error struct {
// Code is an identifier for the kind of error being returned. All
// application-defined error codes should be positive.
Code int `json:"code"`
Message string `json:"message"`
Data any `json:"data,omitempty"`
}
// NewError returns an error with the given Code and formatted Message.
func NewError(code int, msgFmt string, msgArgs ...any) Error {
return Error{Code: code, Message: fmt.Sprintf(msgFmt, msgArgs...)}
}
// NewInvalidParamsError returns an error indicating that the provided
// parameters were invalid.
//
// Parameters are considered invalid if they could never possibly be valid in
// the format they were given and always indicate a bug in the caller. For
// example a timedate string in the wrong format.
//
// If a parameter is invalid due to the context it was used in, for example a
// wrong password, a specific error code should be used to indicate that to the
// caller.
func NewInvalidParamsError(msgFmt string, msgArgs ...any) Error {
return NewError(errCodeInvalidParams, msgFmt, msgArgs...)
}
func (e Error) Error() string {
buf := new(bytes.Buffer)
fmt.Fprintf(buf, "[%d] %s", e.Code, e.Message)
if e.Data != nil {
fmt.Fprint(buf, "\nExtra data: ")
if err := json.NewEncoder(buf).Encode(e.Data); err != nil {
panic(fmt.Sprintf("json encoding Error.Data %#v: %v", e.Data, err))
}
}
return buf.String()
}
func (e Error) Is(target error) bool {
if tErr, ok := target.(Error); ok {
return tErr.Code == e.Code
}
return false
}

View File

@ -0,0 +1,94 @@
package jsonrpc2
import (
"context"
"errors"
"github.com/mediocregopher/mediocre-go-lib/v2/mctx"
"github.com/mediocregopher/mediocre-go-lib/v2/mlog"
)
// Handler is any type which is capable of handling arbitrary RPC calls. The
// value returned from the ServeRPC method will be JSON encoded, wrapped in the
// response object dictated by the spec, and returned as the response.
//
// If an Error is returned as the error field then that will be wrapped and
// returned to the caller instead. If a non-Error error is returned then an
// error response indicating that a server-side error occurred will be returned
// to the client.
type Handler interface {
ServeRPC(context.Context, Request) (any, error)
}
// HandlerFunc implements Handler on a stand-alone function.
type HandlerFunc func(context.Context, Request) (any, error)
func (hf HandlerFunc) ServeRPC(ctx context.Context, req Request) (any, error) {
return hf(ctx, req)
}
// Middleware is used to transparently wrap a Handler with some functionality.
type Middleware func(Handler) Handler
// Chain combines a sequence of Middlewares into a single one. The Middlewares
// will apply to the wrapped handler such that the first given Middleware is the
// outermost one.
func Chain(mm ...Middleware) Middleware {
return func(h Handler) Handler {
for i := range mm {
h = mm[len(mm)-1-i](h)
}
return h
}
}
// NewMLogMiddleware returns a Middleware which will log an Info message on
// every successful RPC request, a Warn message if an application-level error is
// being returned (denoted by an Error being returned from the inner Handler),
// and an Error message otherwise.
//
// If the Logger has debug logging enabled then the full request parameters will
// also be logged.
func NewMLogMiddleware(logger *mlog.Logger) Middleware {
return func(h Handler) Handler {
return HandlerFunc(func(ctx context.Context, req Request) (any, error) {
ctx = mctx.Annotate(
ctx,
"rpcRequestID", req.ID,
"rpcRequestMethod", req.Method,
)
if logger.MaxLevel() >= mlog.LevelDebug.Int() {
ctx := mctx.Annotate(
ctx, "rpcRequestParams", string(req.Params),
)
logger.Debug(ctx, "Handling RPC request")
}
res, err := h.ServeRPC(ctx, req)
if jErr := (Error{}); errors.As(err, &jErr) && jErr.Code != errCodeServerError {
logger.Warn(ctx, "Returning error to client", err)
} else if err != nil {
logger.Error(ctx, "Unexpected server-side error", err)
} else {
logger.Info(ctx, "Handled RPC request")
}
return res, err
})
}
}
// ExposeServerSideErrorsMiddleware causes non-Error error messages to be
// exposed to the caller in the error message they receive.
var ExposeServerSideErrorsMiddleware Middleware = func(h Handler) Handler {
return HandlerFunc(func(ctx context.Context, req Request) (any, error) {
res, err := h.ServeRPC(ctx, req)
if err != nil && !errors.As(err, new(Error)) {
err = NewError(
errCodeServerError, "unexpected server-side error: %v", err,
)
}
return res, err
})
}

View File

@ -0,0 +1,145 @@
// Package jsonrpc2 implements server/client interfaces for a JSONRPC2
// transport, as defined by https://www.jsonrpc.org/specification.
package jsonrpc2
import (
"crypto/rand"
"encoding/json"
"fmt"
"time"
)
const version = "2.0"
// Request encodes an RPC request according to the spec.
type Request struct {
Version string `json:"jsonrpc"` // must be "2.0"
Method string `json:"method"`
Params json.RawMessage `json:"params,omitempty"`
ID string `json:"id"`
}
type response[Result any] struct {
Version string `json:"jsonrpc"` // must be "2.0"
ID string `json:"id"`
Result Result `json:"result,omitempty"`
Error *Error `json:"error,omitempty"`
}
func newID() string {
b := make([]byte, 8)
if _, err := rand.Read(b); err != nil {
panic(err)
}
return fmt.Sprintf("%d-%x", time.Now().UnixNano(), b)
}
// encodeRequest writes a request to an io.Writer, returning the ID of the
// request.
func encodeRequest(
enc *json.Encoder, method string, params any,
) (
string, error,
) {
paramsB, err := json.Marshal(params)
if err != nil {
return "", fmt.Errorf("encoding params as JSON: %w", err)
}
var (
id = newID()
reqEnvelope = Request{
Version: version,
Method: method,
Params: paramsB,
ID: id,
}
)
if err := enc.Encode(reqEnvelope); err != nil {
return "", fmt.Errorf("encoding as JSON: %w", err)
}
return id, nil
}
// decodeRequest decodes a request from an io.Reader, or returns an error code
// and an error describing what went wrong.
func decodeRequest(dec *json.Decoder) (Request, Error) {
var reqEnvelope Request
err := dec.Decode(&reqEnvelope)
switch {
case err != nil:
return reqEnvelope, NewError(
errCodeParse, "decoding request as JSON: %v", err,
)
case reqEnvelope.Version != version:
return reqEnvelope, NewError(
errCodeInvalidRequest,
"unknown JSONRPC version: %q",
reqEnvelope.Version,
)
case reqEnvelope.ID == "":
return reqEnvelope, NewError(
errCodeInvalidRequest, "request is missing an ID",
)
}
return reqEnvelope, Error{}
}
// encodeRequest writes a successful response to an io.Writer.
func encodeResponse(enc *json.Encoder, id string, res any) error {
resEnvelope := response[any]{
Version: version,
ID: id,
Result: res,
}
if err := enc.Encode(resEnvelope); err != nil {
return fmt.Errorf("encoding as JSON: %w", err)
}
return nil
}
// encodeErrorRequest writes an error response to an io.Writer.
func encodeErrorResponse(enc *json.Encoder, id string, err Error) error {
resEnvelope := response[any]{
Version: version,
ID: id,
Error: &err,
}
if err := enc.Encode(resEnvelope); err != nil {
return fmt.Errorf("encoding as JSON: %w", err)
}
return nil
}
// decodeResponse decodes a response from an io.Reader, unmarshaling the result
// field into the given pointer receiver. If the pointer receiver is nil then
// the result is discarded. If an error response is decoded then an Error struct
// is returned. The ID of the response is returned in both cases.
func decodeResponse(dec *json.Decoder, rcv any) (string, error) {
var rcvEnvelope response[json.RawMessage]
err := dec.Decode(&rcvEnvelope)
switch {
case err != nil:
return "", fmt.Errorf("decoding response as JSON: %w", err)
case rcvEnvelope.Version != version:
return "", fmt.Errorf(
"unknown JSONRPC version: %q", rcvEnvelope.Version,
)
case rcvEnvelope.Error != nil:
return rcvEnvelope.ID, *rcvEnvelope.Error
}
if rcv != nil {
if err := json.Unmarshal(rcvEnvelope.Result, rcv); err != nil {
return rcvEnvelope.ID, fmt.Errorf(
"decoding result into receiver of type %T: %w", rcv, err,
)
}
}
return rcvEnvelope.ID, nil
}

View File

@ -0,0 +1,158 @@
package jsonrpc2
import (
"context"
"errors"
"io"
"net/http/httptest"
"strings"
"sync"
"testing"
"time"
"github.com/mediocregopher/mediocre-go-lib/v2/mlog"
)
type DivideParams struct {
Top, Bottom int
}
var ErrDivideByZero = Error{
Code: 1,
Message: "Cannot divide by zero",
}
type dividerImpl struct{}
func (dividerImpl) Divide(ctx context.Context, p DivideParams) (int, error) {
if p.Bottom == 0 {
return 0, ErrDivideByZero
}
if p.Top%p.Bottom != 0 {
return 0, errors.New("numbers don't divide evenly, cannot compute!")
}
return p.Top / p.Bottom, nil
}
func (dividerImpl) Hidden(ctx context.Context, p struct{}) (int, error) {
return 0, errors.New("Shouldn't be possible to call this!")
}
type divider interface {
Divide(ctx context.Context, p DivideParams) (int, error)
}
var testHandler = func() Handler {
var (
logger = mlog.NewLogger(&mlog.LoggerOpts{
MaxLevel: mlog.LevelDebug.Int(),
})
d = divider(dividerImpl{})
)
return Chain(
NewMLogMiddleware(logger),
ExposeServerSideErrorsMiddleware,
)(
NewDispatchHandler(&d),
)
}()
func testClient(t *testing.T, client Client) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
t.Run("success/no_result", func(t *testing.T) {
err := client.Call(ctx, nil, "Divide", DivideParams{12, 4})
if err != nil {
t.Fatal(err)
}
})
t.Run("success/with_result", func(t *testing.T) {
var res int
err := client.Call(ctx, &res, "Divide", DivideParams{6, 3})
if err != nil {
t.Fatal(err)
} else if res != 2 {
t.Fatalf("expected 2, got %d", res)
}
})
t.Run("err/application", func(t *testing.T) {
err := client.Call(ctx, nil, "Divide", DivideParams{})
if !errors.Is(err, ErrDivideByZero) {
t.Fatalf("expected application error but got: %#v", err)
}
})
t.Run("err/calling hidden method", func(t *testing.T) {
err := client.Call(ctx, nil, "Hidden", struct{}{})
if jErr := (Error{}); !errors.As(err, &jErr) {
t.Fatalf("expected RPC error but got: %#v", err)
} else if jErr.Code != errCodeMethodNotFound {
t.Fatalf("expected method not found error but got: %#v", jErr)
}
})
t.Run("err/sever-side error", func(t *testing.T) {
err := client.Call(ctx, nil, "Divide", DivideParams{6, 4})
t.Log(err)
if jErr := (Error{}); !errors.As(err, &jErr) {
t.Fatalf("expected RPC error but got: %#v", err)
} else if jErr.Code != errCodeServerError {
t.Fatalf("expected server error but got: %#v", jErr)
} else if !strings.Contains(jErr.Message, "cannot compute!") {
t.Fatalf("expected server-side error message to be propagated but got: %#v", jErr)
}
})
}
func TestReadWriter(t *testing.T) {
type (
rw struct {
io.Reader
io.Writer
}
)
var (
ctx = context.Background()
clientReader, handlerWriter = io.Pipe()
handlerReader, clientWriter = io.Pipe()
clientRW = rw{clientReader, clientWriter}
handlerRW = rw{handlerReader, handlerWriter}
server = NewReadWriterServer(testHandler, handlerRW)
client = NewReadWriterClient(clientRW)
wg = new(sync.WaitGroup)
)
defer wg.Wait()
defer clientWriter.Close()
defer handlerWriter.Close()
wg.Add(1)
go func() {
defer wg.Done()
for {
if err := server.HandleNext(ctx); errors.Is(err, io.EOF) {
return
} else if err != nil {
return
}
}
}()
testClient(t, client)
}
func TestHTTP(t *testing.T) {
server := httptest.NewServer(NewHTTPHandler(testHandler))
t.Cleanup(server.Close)
testClient(t, NewHTTPClient(server.URL))
}

View File

@ -0,0 +1,62 @@
package jsonrpc2
import (
"encoding/json"
"errors"
"net/http"
)
type httpHandler struct {
handler Handler
}
// NewHTTPHandler returns an http.Handler which will decode RPC calls out of the
// HTTP request body, and return the result as the response body. It will try to
// intelligently set the status code on the response as well.
func NewHTTPHandler(h Handler) http.Handler {
return httpHandler{h}
}
func (h httpHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
var (
ctx = r.Context()
enc = json.NewEncoder(rw)
dec = json.NewDecoder(r.Body)
)
rw.Header().Set("Content-Type", "application/json")
req, decErr := decodeRequest(dec)
if decErr.Code != 0 {
rw.WriteHeader(http.StatusBadRequest)
_ = encodeErrorResponse(enc, req.ID, decErr)
return
}
var rpcErr Error
res, err := h.handler.ServeRPC(ctx, req)
if err != nil && !errors.As(err, &rpcErr) {
// We deliberately don't include the actual error in the response. If
// doing so is desired this can be achieved using a middleware which
// translates the non-Error into an Error prior to it reaching this
// point.
rpcErr = NewError(errCodeServerError, "unexpected server-side error")
}
switch rpcErr.Code {
case 0: // no error
_ = encodeResponse(enc, req.ID, res)
return
case errCodeMethodNotFound:
rw.WriteHeader(http.StatusNotFound)
case errCodeServerError:
rw.WriteHeader(http.StatusInternalServerError)
default:
rw.WriteHeader(http.StatusBadRequest)
}
_ = encodeErrorResponse(enc, req.ID, rpcErr)
}

View File

@ -0,0 +1,60 @@
package jsonrpc2
import (
"context"
"encoding/json"
"errors"
"io"
)
// ReadWriterServer wraps an io.ReadWriter so that it can handle RPC calls.
//
// The HandleNext method will process a single request and response. It should
// be called in a loop until it returns an error.
type ReadWriterServer struct {
handler Handler
enc *json.Encoder
dec *json.Decoder
}
// NewReadWriterServer returns a ReadWriterServer which will read/write on the
// given io.ReadWriter.
//
// All methods exported from rpcInterface which accept a context and one other
// value, and return a value and an error, will be considered to be available
// RPC methods, and the ReadWriterServer will execute calls to those methods
// against the given rpcInterface instance.
func NewReadWriterServer(handler Handler, rw io.ReadWriter) *ReadWriterServer {
return &ReadWriterServer{
handler: handler,
enc: json.NewEncoder(rw),
dec: json.NewDecoder(rw),
}
}
// HandleNext blocks until a single RPC call has been handled. If an error is
// returned, including context errors, then the ReadWriterServer should be
// considered in an unusable state and discarded.
func (h *ReadWriterServer) HandleNext(ctx context.Context) error {
req, decErr := decodeRequest(h.dec)
if decErr.Code != 0 {
encErr := encodeErrorResponse(h.enc, req.ID, decErr)
return errors.Join(decErr, encErr)
}
var rpcErr Error
res, err := h.handler.ServeRPC(ctx, req)
if err != nil && !errors.As(err, &rpcErr) {
// We deliberately don't include the actual error in the response. If
// doing so is desired this can be achieved using a middleware which
// translates the non-Error into an Error prior to it reaching this
// point.
rpcErr = NewError(errCodeServerError, "unexpected server-side error")
}
if rpcErr.Code != 0 {
return encodeErrorResponse(h.enc, req.ID, rpcErr)
}
return encodeResponse(h.enc, req.ID, res)
}