summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/graph-gophers/graphql-go/subscriptions.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/graph-gophers/graphql-go/subscriptions.go')
-rw-r--r--vendor/github.com/graph-gophers/graphql-go/subscriptions.go96
1 files changed, 96 insertions, 0 deletions
diff --git a/vendor/github.com/graph-gophers/graphql-go/subscriptions.go b/vendor/github.com/graph-gophers/graphql-go/subscriptions.go
new file mode 100644
index 00000000..34064dc7
--- /dev/null
+++ b/vendor/github.com/graph-gophers/graphql-go/subscriptions.go
@@ -0,0 +1,96 @@
+package graphql
+
+import (
+ "context"
+ "errors"
+
+ qerrors "github.com/graph-gophers/graphql-go/errors"
+ "github.com/graph-gophers/graphql-go/internal/common"
+ "github.com/graph-gophers/graphql-go/internal/exec"
+ "github.com/graph-gophers/graphql-go/internal/exec/resolvable"
+ "github.com/graph-gophers/graphql-go/internal/exec/selected"
+ "github.com/graph-gophers/graphql-go/internal/query"
+ "github.com/graph-gophers/graphql-go/internal/validation"
+ "github.com/graph-gophers/graphql-go/introspection"
+)
+
+// Subscribe returns a response channel for the given subscription with the schema's
+// resolver. It returns an error if the schema was created without a resolver.
+// If the context gets cancelled, the response channel will be closed and no
+// further resolvers will be called. The context error will be returned as soon
+// as possible (not immediately).
+func (s *Schema) Subscribe(ctx context.Context, queryString string, operationName string, variables map[string]interface{}) (<-chan interface{}, error) {
+ if !s.res.Resolver.IsValid() {
+ return nil, errors.New("schema created without resolver, can not subscribe")
+ }
+ if _, ok := s.schema.EntryPoints["subscription"]; !ok {
+ return nil, errors.New("no subscriptions are offered by the schema")
+ }
+ return s.subscribe(ctx, queryString, operationName, variables, s.res), nil
+}
+
+func (s *Schema) subscribe(ctx context.Context, queryString string, operationName string, variables map[string]interface{}, res *resolvable.Schema) <-chan interface{} {
+ doc, qErr := query.Parse(queryString)
+ if qErr != nil {
+ return sendAndReturnClosed(&Response{Errors: []*qerrors.QueryError{qErr}})
+ }
+
+ validationFinish := s.validationTracer.TraceValidation(ctx)
+ errs := validation.Validate(s.schema, doc, variables, s.maxDepth)
+ validationFinish(errs)
+ if len(errs) != 0 {
+ return sendAndReturnClosed(&Response{Errors: errs})
+ }
+
+ op, err := getOperation(doc, operationName)
+ if err != nil {
+ return sendAndReturnClosed(&Response{Errors: []*qerrors.QueryError{qerrors.Errorf("%s", err)}})
+ }
+
+ r := &exec.Request{
+ Request: selected.Request{
+ Doc: doc,
+ Vars: variables,
+ Schema: s.schema,
+ },
+ Limiter: make(chan struct{}, s.maxParallelism),
+ Tracer: s.tracer,
+ Logger: s.logger,
+ PanicHandler: s.panicHandler,
+ SubscribeResolverTimeout: s.subscribeResolverTimeout,
+ }
+ varTypes := make(map[string]*introspection.Type)
+ for _, v := range op.Vars {
+ t, err := common.ResolveType(v.Type, s.schema.Resolve)
+ if err != nil {
+ return sendAndReturnClosed(&Response{Errors: []*qerrors.QueryError{err}})
+ }
+ varTypes[v.Name.Name] = introspection.WrapType(t)
+ }
+
+ if op.Type == query.Query || op.Type == query.Mutation {
+ data, errs := r.Execute(ctx, res, op)
+ return sendAndReturnClosed(&Response{Data: data, Errors: errs})
+ }
+
+ responses := r.Subscribe(ctx, res, op)
+ c := make(chan interface{})
+ go func() {
+ for resp := range responses {
+ c <- &Response{
+ Data: resp.Data,
+ Errors: resp.Errors,
+ }
+ }
+ close(c)
+ }()
+
+ return c
+}
+
+func sendAndReturnClosed(resp *Response) chan interface{} {
+ c := make(chan interface{}, 1)
+ c <- resp
+ close(c)
+ return c
+}