diff --git a/mrpc/jstreamrpc/jstreamrpc.go b/mrpc/jstreamrpc/jstreamrpc.go index 6200e47..4d55dcb 100644 --- a/mrpc/jstreamrpc/jstreamrpc.go +++ b/mrpc/jstreamrpc/jstreamrpc.go @@ -124,41 +124,26 @@ func HandleCall( /* func sqr(r mrpc.Request, rw *mrpc.ResponseWriter) { - var el jstream.Element - if err := r.Unmarshal(&el); err != nil { + var inCh chan jstream.Element + if err := r.Unmarshal(&inCh); err != nil { rw.Response = err return } - sr, err := el.DecodeStream() - if err != nil { - rw.Response = err - return - } - - ch := make(chan int) + outCh := make(chan int) go func() { - defer close(ch) - for { + defer close(outCh) + for el := range inCh { var i int - if err := sr.Next().Value(&i); err == jstream.ErrStreamEnded { - return - } else if err != nil { - panic("TODO") + // TODO this is the problem right here. Getting this error out of + // here so that the response stream can be canceled + if err := el.DecodeValue(&i); err != nil { + panic(err) } - ch <- i + outCh <- i*i } }() - rw.Response = func(sw *jstream.StreamWriter) error { - return sw.EncodeStream(0, func(sw *jstream.StreamWriter) error { - for i := range ch { - if err := sw.EncodeValue(i * i); err != nil { - return err - } - } - return nil - }) - } + rw.Response = outCh } */ diff --git a/mrpc/mrpc.go b/mrpc/mrpc.go index bb823dd..8c4c10f 100644 --- a/mrpc/mrpc.go +++ b/mrpc/mrpc.go @@ -51,7 +51,7 @@ type ResponseWriter struct { type Response struct { // Unmarshal takes in a pointer value into which the Client will unmarshal // the response value. The exact nature and behavior of how the pointer - // value is treated is dependend on the RPC implementation. + // value is treated is dependent on the RPC implementation. Unmarshal func(interface{}) error // Debug will be whatever debug information was set by the server when