mediocre-go-lib/jstream/chan.go

97 lines
2.3 KiB
Go
Raw Permalink Normal View History

package jstream
import (
"encoding/json"
"errors"
"io"
)
type chanElement struct {
Type Type
SizeHint uint
Err error
Value json.RawMessage
Bytes io.Reader
Stream chan chanElement
}
func chanWriter(sw *StreamWriter, ch <-chan chanElement) <-chan error {
errCh := make(chan error, 1)
go func() {
defer close(errCh)
for el := range ch {
var err error
switch {
case el.Err != nil:
err = el.Err
case el.Value != nil:
err = sw.EncodeValue(el.Value)
case el.Bytes != nil:
err = sw.EncodeBytes(el.SizeHint, el.Bytes)
case el.Stream != nil:
err = sw.EncodeStream(el.SizeHint, func(innerSW *StreamWriter) error {
// TODO this is a sticking point. The error which occurs in
// here should be available to whatever is writing to
// el.Stream, so it can know if it's wasting its time and
// should be canceled
innerErrCh := chanWriter(innerSW, el.Stream)
return <-innerErrCh
})
default:
err = errors.New("malformed chanElement, no fields set")
}
if err != nil {
errCh <- err
return
}
}
}()
return errCh
}
func chanReader(ch chan<- chanElement, sr *StreamReader) <-chan error {
errCh := make(chan error, 1)
go func() {
defer close(errCh)
for {
el := sr.Next()
var err error
switch {
case el.Err == ErrStreamEnded:
return
case el.Err != nil:
err = el.Err
case el.Type == TypeValue:
ch <- chanElement{
Type: TypeValue,
Value: el.value,
}
case el.Type == TypeBytes:
ch <- chanElement{
Type: TypeBytes,
SizeHint: el.SizeHint,
Bytes: el.br,
}
case el.Type == TypeStream:
// TODO this sucks for two reasons:
// 1) the user can't set a buffer on this channel, like they
// could with the outermost one.
// 2) the error returned from chanReader shouldn't get passed up
// if it's ErrCanceled, as that will cancel the outer channel
// too, which isn't necessary. But in that case there's no
// way to let the user know that the inner one was closed due
// to being canceled.
innerCh := make(chan chanElement)
ch <- chanElement{
Type: TypeStream,
SizeHint: el.SizeHint,
Stream: innerCh,
}
innerSR, _ := el.DecodeStream() // already checked Err previously
err = <-chanReader(innerCh, innerSR)
}
}
}()
return errCh
}