WIP jstream: working on channel functionality in jstream
This commit is contained in:
parent
d20ad7830c
commit
196e119c30
96
jstream/chan.go
Normal file
96
jstream/chan.go
Normal file
@ -0,0 +1,96 @@
|
||||
package jstream
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"io"
|
||||
)
|
||||
|
||||
type chanElement struct {
|
||||
Type Type
|
||||
SizeHint uint
|
||||
Err error
|
||||
Value json.RawMessage
|
||||
Bytes io.Reader
|
||||
Stream chan chanElement
|
||||
}
|
||||
|
||||
func chanWriter(sw *StreamWriter, ch <-chan chanElement) <-chan error {
|
||||
errCh := make(chan error, 1)
|
||||
go func() {
|
||||
defer close(errCh)
|
||||
for el := range ch {
|
||||
var err error
|
||||
switch {
|
||||
case el.Err != nil:
|
||||
err = el.Err
|
||||
case el.Value != nil:
|
||||
err = sw.EncodeValue(el.Value)
|
||||
case el.Bytes != nil:
|
||||
err = sw.EncodeBytes(el.SizeHint, el.Bytes)
|
||||
case el.Stream != nil:
|
||||
err = sw.EncodeStream(el.SizeHint, func(innerSW *StreamWriter) error {
|
||||
// TODO this is a sticking point. The error which occurs in
|
||||
// here should be available to whatever is writing to
|
||||
// el.Stream, so it can know if it's wasting its time and
|
||||
// should be canceled
|
||||
innerErrCh := chanWriter(innerSW, el.Stream)
|
||||
return <-innerErrCh
|
||||
})
|
||||
default:
|
||||
err = errors.New("malformed chanElement, no fields set")
|
||||
}
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
return errCh
|
||||
}
|
||||
|
||||
func chanReader(ch chan<- chanElement, sr *StreamReader) <-chan error {
|
||||
errCh := make(chan error, 1)
|
||||
go func() {
|
||||
defer close(errCh)
|
||||
for {
|
||||
el := sr.Next()
|
||||
var err error
|
||||
switch {
|
||||
case el.Err == ErrStreamEnded:
|
||||
return
|
||||
case el.Err != nil:
|
||||
err = el.Err
|
||||
case el.Type == TypeValue:
|
||||
ch <- chanElement{
|
||||
Type: TypeValue,
|
||||
Value: el.value,
|
||||
}
|
||||
case el.Type == TypeBytes:
|
||||
ch <- chanElement{
|
||||
Type: TypeBytes,
|
||||
SizeHint: el.SizeHint,
|
||||
Bytes: el.br,
|
||||
}
|
||||
case el.Type == TypeStream:
|
||||
// TODO this sucks for two reasons:
|
||||
// 1) the user can't set a buffer on this channel, like they
|
||||
// could with the outermost one.
|
||||
// 2) the error returned from chanReader shouldn't get passed up
|
||||
// if it's ErrCanceled, as that will cancel the outer channel
|
||||
// too, which isn't necessary. But in that case there's no
|
||||
// way to let the user know that the inner one was closed due
|
||||
// to being canceled.
|
||||
innerCh := make(chan chanElement)
|
||||
ch <- chanElement{
|
||||
Type: TypeStream,
|
||||
SizeHint: el.SizeHint,
|
||||
Stream: innerCh,
|
||||
}
|
||||
innerSR, _ := el.DecodeStream() // already checked Err previously
|
||||
err = <-chanReader(innerCh, innerSR)
|
||||
}
|
||||
}
|
||||
}()
|
||||
return errCh
|
||||
}
|
@ -108,8 +108,6 @@ package jstream
|
||||
// TODO figure out how to expose the json.Encoder/Decoders so that users can set
|
||||
// custom options on them (like UseNumber and whatnot)
|
||||
|
||||
// TODO attempt to refactor this into using channels? or write a layer on top
|
||||
// which does so?
|
||||
// TODO TypeNil?
|
||||
|
||||
import (
|
||||
|
Loading…
Reference in New Issue
Block a user