diff options
Diffstat (limited to 'vendor/github.com/graph-gophers/graphql-go/subscriptions.go')
-rw-r--r-- | vendor/github.com/graph-gophers/graphql-go/subscriptions.go | 96 |
1 files changed, 96 insertions, 0 deletions
diff --git a/vendor/github.com/graph-gophers/graphql-go/subscriptions.go b/vendor/github.com/graph-gophers/graphql-go/subscriptions.go new file mode 100644 index 00000000..34064dc7 --- /dev/null +++ b/vendor/github.com/graph-gophers/graphql-go/subscriptions.go @@ -0,0 +1,96 @@ +package graphql + +import ( + "context" + "errors" + + qerrors "github.com/graph-gophers/graphql-go/errors" + "github.com/graph-gophers/graphql-go/internal/common" + "github.com/graph-gophers/graphql-go/internal/exec" + "github.com/graph-gophers/graphql-go/internal/exec/resolvable" + "github.com/graph-gophers/graphql-go/internal/exec/selected" + "github.com/graph-gophers/graphql-go/internal/query" + "github.com/graph-gophers/graphql-go/internal/validation" + "github.com/graph-gophers/graphql-go/introspection" +) + +// Subscribe returns a response channel for the given subscription with the schema's +// resolver. It returns an error if the schema was created without a resolver. +// If the context gets cancelled, the response channel will be closed and no +// further resolvers will be called. The context error will be returned as soon +// as possible (not immediately). +func (s *Schema) Subscribe(ctx context.Context, queryString string, operationName string, variables map[string]interface{}) (<-chan interface{}, error) { + if !s.res.Resolver.IsValid() { + return nil, errors.New("schema created without resolver, can not subscribe") + } + if _, ok := s.schema.EntryPoints["subscription"]; !ok { + return nil, errors.New("no subscriptions are offered by the schema") + } + return s.subscribe(ctx, queryString, operationName, variables, s.res), nil +} + +func (s *Schema) subscribe(ctx context.Context, queryString string, operationName string, variables map[string]interface{}, res *resolvable.Schema) <-chan interface{} { + doc, qErr := query.Parse(queryString) + if qErr != nil { + return sendAndReturnClosed(&Response{Errors: []*qerrors.QueryError{qErr}}) + } + + validationFinish := s.validationTracer.TraceValidation(ctx) + errs := validation.Validate(s.schema, doc, variables, s.maxDepth) + validationFinish(errs) + if len(errs) != 0 { + return sendAndReturnClosed(&Response{Errors: errs}) + } + + op, err := getOperation(doc, operationName) + if err != nil { + return sendAndReturnClosed(&Response{Errors: []*qerrors.QueryError{qerrors.Errorf("%s", err)}}) + } + + r := &exec.Request{ + Request: selected.Request{ + Doc: doc, + Vars: variables, + Schema: s.schema, + }, + Limiter: make(chan struct{}, s.maxParallelism), + Tracer: s.tracer, + Logger: s.logger, + PanicHandler: s.panicHandler, + SubscribeResolverTimeout: s.subscribeResolverTimeout, + } + varTypes := make(map[string]*introspection.Type) + for _, v := range op.Vars { + t, err := common.ResolveType(v.Type, s.schema.Resolve) + if err != nil { + return sendAndReturnClosed(&Response{Errors: []*qerrors.QueryError{err}}) + } + varTypes[v.Name.Name] = introspection.WrapType(t) + } + + if op.Type == query.Query || op.Type == query.Mutation { + data, errs := r.Execute(ctx, res, op) + return sendAndReturnClosed(&Response{Data: data, Errors: errs}) + } + + responses := r.Subscribe(ctx, res, op) + c := make(chan interface{}) + go func() { + for resp := range responses { + c <- &Response{ + Data: resp.Data, + Errors: resp.Errors, + } + } + close(c) + }() + + return c +} + +func sendAndReturnClosed(resp *Response) chan interface{} { + c := make(chan interface{}, 1) + c <- resp + close(c) + return c +} |