summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/graph-gophers/graphql-go/internal/exec/subscribe.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/graph-gophers/graphql-go/internal/exec/subscribe.go')
-rw-r--r--vendor/github.com/graph-gophers/graphql-go/internal/exec/subscribe.go179
1 files changed, 179 insertions, 0 deletions
diff --git a/vendor/github.com/graph-gophers/graphql-go/internal/exec/subscribe.go b/vendor/github.com/graph-gophers/graphql-go/internal/exec/subscribe.go
new file mode 100644
index 00000000..37ebacbc
--- /dev/null
+++ b/vendor/github.com/graph-gophers/graphql-go/internal/exec/subscribe.go
@@ -0,0 +1,179 @@
+package exec
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "fmt"
+ "reflect"
+ "time"
+
+ "github.com/graph-gophers/graphql-go/errors"
+ "github.com/graph-gophers/graphql-go/internal/exec/resolvable"
+ "github.com/graph-gophers/graphql-go/internal/exec/selected"
+ "github.com/graph-gophers/graphql-go/types"
+)
+
+type Response struct {
+ Data json.RawMessage
+ Errors []*errors.QueryError
+}
+
+func (r *Request) Subscribe(ctx context.Context, s *resolvable.Schema, op *types.OperationDefinition) <-chan *Response {
+ var result reflect.Value
+ var f *fieldToExec
+ var err *errors.QueryError
+ func() {
+ defer r.handlePanic(ctx)
+
+ sels := selected.ApplyOperation(&r.Request, s, op)
+ var fields []*fieldToExec
+ collectFieldsToResolve(sels, s, s.Resolver, &fields, make(map[string]*fieldToExec))
+
+ // TODO: move this check into validation.Validate
+ if len(fields) != 1 {
+ err = errors.Errorf("%s", "can subscribe to at most one subscription at a time")
+ return
+ }
+ f = fields[0]
+
+ var in []reflect.Value
+ if f.field.HasContext {
+ in = append(in, reflect.ValueOf(ctx))
+ }
+ if f.field.ArgsPacker != nil {
+ in = append(in, f.field.PackedArgs)
+ }
+ callOut := f.resolver.Method(f.field.MethodIndex).Call(in)
+ result = callOut[0]
+
+ if f.field.HasError && !callOut[1].IsNil() {
+ switch resolverErr := callOut[1].Interface().(type) {
+ case *errors.QueryError:
+ err = resolverErr
+ case error:
+ err = errors.Errorf("%s", resolverErr)
+ err.ResolverError = resolverErr
+ default:
+ panic(fmt.Errorf("can only deal with *QueryError and error types, got %T", resolverErr))
+ }
+ }
+ }()
+
+ // Handles the case where the locally executed func above panicked
+ if len(r.Request.Errs) > 0 {
+ return sendAndReturnClosed(&Response{Errors: r.Request.Errs})
+ }
+
+ if f == nil {
+ return sendAndReturnClosed(&Response{Errors: []*errors.QueryError{err}})
+ }
+
+ if err != nil {
+ if _, nonNullChild := f.field.Type.(*types.NonNull); nonNullChild {
+ return sendAndReturnClosed(&Response{Errors: []*errors.QueryError{err}})
+ }
+ return sendAndReturnClosed(&Response{Data: []byte(fmt.Sprintf(`{"%s":null}`, f.field.Alias)), Errors: []*errors.QueryError{err}})
+ }
+
+ if ctxErr := ctx.Err(); ctxErr != nil {
+ return sendAndReturnClosed(&Response{Errors: []*errors.QueryError{errors.Errorf("%s", ctxErr)}})
+ }
+
+ c := make(chan *Response)
+ // TODO: handle resolver nil channel better?
+ if result.IsZero() {
+ close(c)
+ return c
+ }
+
+ go func() {
+ for {
+ // Check subscription context
+ chosen, resp, ok := reflect.Select([]reflect.SelectCase{
+ {
+ Dir: reflect.SelectRecv,
+ Chan: reflect.ValueOf(ctx.Done()),
+ },
+ {
+ Dir: reflect.SelectRecv,
+ Chan: result,
+ },
+ })
+ switch chosen {
+ // subscription context done
+ case 0:
+ close(c)
+ return
+ // upstream received
+ case 1:
+ // upstream closed
+ if !ok {
+ close(c)
+ return
+ }
+
+ subR := &Request{
+ Request: selected.Request{
+ Doc: r.Request.Doc,
+ Vars: r.Request.Vars,
+ Schema: r.Request.Schema,
+ },
+ Limiter: r.Limiter,
+ Tracer: r.Tracer,
+ Logger: r.Logger,
+ }
+ var out bytes.Buffer
+ func() {
+ timeout := r.SubscribeResolverTimeout
+ if timeout == 0 {
+ timeout = time.Second
+ }
+
+ subCtx, cancel := context.WithTimeout(ctx, timeout)
+ defer cancel()
+
+ // resolve response
+ func() {
+ defer subR.handlePanic(subCtx)
+
+ var buf bytes.Buffer
+ subR.execSelectionSet(subCtx, f.sels, f.field.Type, &pathSegment{nil, f.field.Alias}, s, resp, &buf)
+
+ propagateChildError := false
+ if _, nonNullChild := f.field.Type.(*types.NonNull); nonNullChild && resolvedToNull(&buf) {
+ propagateChildError = true
+ }
+
+ if !propagateChildError {
+ out.WriteString(fmt.Sprintf(`{"%s":`, f.field.Alias))
+ out.Write(buf.Bytes())
+ out.WriteString(`}`)
+ }
+ }()
+
+ if err := subCtx.Err(); err != nil {
+ c <- &Response{Errors: []*errors.QueryError{errors.Errorf("%s", err)}}
+ return
+ }
+
+ // Send response within timeout
+ // TODO: maybe block until sent?
+ select {
+ case <-subCtx.Done():
+ case c <- &Response{Data: out.Bytes(), Errors: subR.Errs}:
+ }
+ }()
+ }
+ }
+ }()
+
+ return c
+}
+
+func sendAndReturnClosed(resp *Response) chan *Response {
+ c := make(chan *Response, 1)
+ c <- resp
+ close(c)
+ return c
+}