From 196e119c30779d73004bb5c89fa152ce6842f67e Mon Sep 17 00:00:00 2001 From: Brian Picciano Date: Sun, 27 May 2018 04:33:30 +0000 Subject: [PATCH] WIP jstream: working on channel functionality in jstream --- jstream/chan.go | 96 ++++++++++++++++++++++++++++++++++++++++++++++ jstream/jstream.go | 2 - 2 files changed, 96 insertions(+), 2 deletions(-) create mode 100644 jstream/chan.go diff --git a/jstream/chan.go b/jstream/chan.go new file mode 100644 index 0000000..d386c89 --- /dev/null +++ b/jstream/chan.go @@ -0,0 +1,96 @@ +package jstream + +import ( + "encoding/json" + "errors" + "io" +) + +type chanElement struct { + Type Type + SizeHint uint + Err error + Value json.RawMessage + Bytes io.Reader + Stream chan chanElement +} + +func chanWriter(sw *StreamWriter, ch <-chan chanElement) <-chan error { + errCh := make(chan error, 1) + go func() { + defer close(errCh) + for el := range ch { + var err error + switch { + case el.Err != nil: + err = el.Err + case el.Value != nil: + err = sw.EncodeValue(el.Value) + case el.Bytes != nil: + err = sw.EncodeBytes(el.SizeHint, el.Bytes) + case el.Stream != nil: + err = sw.EncodeStream(el.SizeHint, func(innerSW *StreamWriter) error { + // TODO this is a sticking point. The error which occurs in + // here should be available to whatever is writing to + // el.Stream, so it can know if it's wasting its time and + // should be canceled + innerErrCh := chanWriter(innerSW, el.Stream) + return <-innerErrCh + }) + default: + err = errors.New("malformed chanElement, no fields set") + } + if err != nil { + errCh <- err + return + } + } + }() + return errCh +} + +func chanReader(ch chan<- chanElement, sr *StreamReader) <-chan error { + errCh := make(chan error, 1) + go func() { + defer close(errCh) + for { + el := sr.Next() + var err error + switch { + case el.Err == ErrStreamEnded: + return + case el.Err != nil: + err = el.Err + case el.Type == TypeValue: + ch <- chanElement{ + Type: TypeValue, + Value: el.value, + } + case el.Type == TypeBytes: + ch <- chanElement{ + Type: TypeBytes, + SizeHint: el.SizeHint, + Bytes: el.br, + } + case el.Type == TypeStream: + // TODO this sucks for two reasons: + // 1) the user can't set a buffer on this channel, like they + // could with the outermost one. + // 2) the error returned from chanReader shouldn't get passed up + // if it's ErrCanceled, as that will cancel the outer channel + // too, which isn't necessary. But in that case there's no + // way to let the user know that the inner one was closed due + // to being canceled. + innerCh := make(chan chanElement) + ch <- chanElement{ + Type: TypeStream, + SizeHint: el.SizeHint, + Stream: innerCh, + } + innerSR, _ := el.DecodeStream() // already checked Err previously + err = <-chanReader(innerCh, innerSR) + } + } + }() + return errCh +} diff --git a/jstream/jstream.go b/jstream/jstream.go index 9b27bdc..44ceed5 100644 --- a/jstream/jstream.go +++ b/jstream/jstream.go @@ -108,8 +108,6 @@ package jstream // TODO figure out how to expose the json.Encoder/Decoders so that users can set // custom options on them (like UseNumber and whatnot) -// TODO attempt to refactor this into using channels? or write a layer on top -// which does so? // TODO TypeNil? import (