Execution

GraphQL Execution

The graphql.execution package is responsible for the execution phase of fulfilling a GraphQL request.

graphql.execution.execute(schema: GraphQLSchema, document: DocumentNode, root_value: Any = None, context_value: Any = None, variable_values: dict[str, Any] | None = None, operation_name: str | None = None, field_resolver: GraphQLFieldResolver | None = None, type_resolver: GraphQLTypeResolver | None = None, subscribe_field_resolver: GraphQLFieldResolver | None = None, middleware: Middleware | None = None, execution_context_class: type[ExecutionContext] | None = None, is_awaitable: Callable[[Any], bool] | None = None) AwaitableOrValue[ExecutionResult]

Execute a GraphQL operation.

Implements the “Executing requests” section of the GraphQL specification.

Returns an ExecutionResult (if all encountered resolvers are synchronous), or a coroutine object eventually yielding an ExecutionResult.

If the arguments to this function do not result in a legal execution context, a GraphQLError explaining the invalid input is raised immediately.

This function does not support incremental delivery (@defer and @stream). If an operation that defers or streams data is executed with this function, it raises an error instead. Use experimental_execute_incrementally if you want to support incremental delivery.

graphql.execution.experimental_execute_incrementally(schema: GraphQLSchema, document: DocumentNode, root_value: Any = None, context_value: Any = None, variable_values: dict[str, Any] | None = None, operation_name: str | None = None, field_resolver: GraphQLFieldResolver | None = None, type_resolver: GraphQLTypeResolver | None = None, subscribe_field_resolver: GraphQLFieldResolver | None = None, middleware: Middleware | None = None, execution_context_class: type[ExecutionContext] | None = None, is_awaitable: Callable[[Any], bool] | None = None) AwaitableOrValue[ExecutionResult | ExperimentalIncrementalExecutionResults]

Execute GraphQL operation incrementally (internal implementation).

Implements the “Executing requests” section of the GraphQL specification, including @defer and @stream as proposed in https://github.com/graphql/graphql-spec/pull/742

This function returns an awaitable that is either a single ExecutionResult or an ExperimentalIncrementalExecutionResults object, containing an initial_result and a stream of subsequent_results.

graphql.execution.execute_sync(schema: GraphQLSchema, document: DocumentNode, root_value: Any = None, context_value: Any = None, variable_values: dict[str, Any] | None = None, operation_name: str | None = None, field_resolver: GraphQLFieldResolver | None = None, type_resolver: GraphQLTypeResolver | None = None, middleware: Middleware | None = None, execution_context_class: type[ExecutionContext] | None = None, check_sync: bool = False) ExecutionResult

Execute a GraphQL operation synchronously.

Also implements the “Executing requests” section of the GraphQL specification.

However, it is guaranteed to complete synchronously (or raise an error), assuming that all field resolvers are also synchronous.

Set check_sync to True to still run checks that no awaitable values are returned.

graphql.execution.default_field_resolver(source: Any, info: GraphQLResolveInfo, **args: Any) Any

Default field resolver.

If a resolve function is not given, a default resolve behavior is used: it takes the property of the source object that has the same name as the field and returns it as the result or, if that property is a function, returns the result of calling that function while passing along args and context.

For dictionaries, the field names are used as keys; for all other objects, they are used as attribute names.

graphql.execution.default_type_resolver(value: Any, info: GraphQLResolveInfo, abstract_type: GraphQLAbstractType) AwaitableOrValue[str | None]

Default type resolver function.

If a resolve_type function is not given, then a default resolve behavior is used which attempts two strategies:

First, see if the provided value has a __typename field defined; if so, use that value as the name of the resolved type.

Otherwise, test each possible type for the abstract type by calling is_type_of() for the object being coerced, returning the first type that matches.

class graphql.execution.ExecutionContext(schema: GraphQLSchema, fragments: dict[str, graphql.language.ast.FragmentDefinitionNode], root_value: Any, context_value: Any, operation: OperationDefinitionNode, variable_values: dict[str, Any], field_resolver: Callable[[...], Any], type_resolver: Callable[[Any, GraphQLResolveInfo, Union[GraphQLInterfaceType, GraphQLUnionType]], Optional[Union[Awaitable[Optional[str]], str]]], subscribe_field_resolver: Callable[[...], Any], subsequent_payloads: dict[Union[graphql.execution.incremental_publisher.DeferredFragmentRecord, graphql.execution.incremental_publisher.StreamItemsRecord], None], errors: list[graphql.error.graphql_error.GraphQLError], middleware_manager: graphql.execution.middleware.MiddlewareManager | None, is_awaitable: Optional[Callable[[Any], bool]])

Bases: IncrementalPublisherMixin

Data that must be available at all points during query execution.

Namely, the schema of the type system that is currently executing and the fragments defined in the query document.

__init__(schema: GraphQLSchema, fragments: dict[str, graphql.language.ast.FragmentDefinitionNode], root_value: Any, context_value: Any, operation: OperationDefinitionNode, variable_values: dict[str, Any], field_resolver: Callable[[...], Any], type_resolver: Callable[[Any, GraphQLResolveInfo, Union[GraphQLInterfaceType, GraphQLUnionType]], Optional[Union[Awaitable[Optional[str]], str]]], subscribe_field_resolver: Callable[[...], Any], subsequent_payloads: dict[Union[graphql.execution.incremental_publisher.DeferredFragmentRecord, graphql.execution.incremental_publisher.StreamItemsRecord], None], errors: list[graphql.error.graphql_error.GraphQLError], middleware_manager: graphql.execution.middleware.MiddlewareManager | None, is_awaitable: Optional[Callable[[Any], bool]]) None
classmethod build(schema: GraphQLSchema, document: DocumentNode, root_value: Any = None, context_value: Any = None, raw_variable_values: dict[str, Any] | None = None, operation_name: str | None = None, field_resolver: GraphQLFieldResolver | None = None, type_resolver: GraphQLTypeResolver | None = None, subscribe_field_resolver: GraphQLFieldResolver | None = None, middleware: Middleware | None = None, is_awaitable: Callable[[Any], bool] | None = None) list[GraphQLError] | ExecutionContext

Build an execution context

Constructs an ExecutionContext object from the arguments passed to execute, which we will pass throughout the other execution methods.

If a valid execution context cannot be created, the return annotation indicates that a list of GraphQLError objects is returned in place of the context.

For internal use only.

build_per_event_execution_context(payload: Any) ExecutionContext

Create a copy of the execution context for usage with subscribe events.

build_resolve_info(field_def: GraphQLField, field_group: list[graphql.language.ast.FieldNode], parent_type: GraphQLObjectType, path: Path) GraphQLResolveInfo

Build the GraphQLResolveInfo object.

For internal use only.

static build_response(data: dict[str, Any] | None, errors: list[graphql.error.graphql_error.GraphQLError]) ExecutionResult

Build response.

Given a completed execution context and data, build the (data, errors) response defined by the “Response” section of the GraphQL spec.

collect_and_execute_subfields(return_type: GraphQLObjectType, field_group: FieldGroup, path: Path, result: Any, incremental_data_record: IncrementalDataRecord | None) AwaitableOrValue[dict[str, Any]]

Collect sub-fields to execute to complete this value.

collect_subfields(return_type: GraphQLObjectType, field_group: list[graphql.language.ast.FieldNode]) FieldsAndPatches

Collect subfields.

A cached collection of relevant subfields with regard to the return type is kept in the execution context as _subfields_cache. This ensures the subfields are not repeatedly calculated, which saves overhead when resolving lists of values.

complete_abstract_value(return_type: GraphQLAbstractType, field_group: FieldGroup, info: GraphQLResolveInfo, path: Path, result: Any, incremental_data_record: IncrementalDataRecord | None) AwaitableOrValue[Any]

Complete an abstract value.

Complete a value of an abstract type by determining the runtime object type of that value, then complete the value for that type.

async complete_async_iterator_value(item_type: Union[GraphQLScalarType, GraphQLObjectType, GraphQLInterfaceType, GraphQLUnionType, GraphQLEnumType, GraphQLList, GraphQLNonNull[Union[GraphQLScalarType, GraphQLObjectType, GraphQLInterfaceType, GraphQLUnionType, GraphQLEnumType, GraphQLList]]], field_group: list[graphql.language.ast.FieldNode], info: GraphQLResolveInfo, path: Path, async_iterator: AsyncIterator[Any], incremental_data_record: Optional[Union[DeferredFragmentRecord, StreamItemsRecord]]) list[Any]

Complete an async iterator.

Complete an async iterator value by completing the result and calling recursively until all the results are completed.

async complete_awaitable_value(return_type: Union[GraphQLScalarType, GraphQLObjectType, GraphQLInterfaceType, GraphQLUnionType, GraphQLEnumType, GraphQLList, GraphQLNonNull[Union[GraphQLScalarType, GraphQLObjectType, GraphQLInterfaceType, GraphQLUnionType, GraphQLEnumType, GraphQLList]]], field_group: list[graphql.language.ast.FieldNode], info: GraphQLResolveInfo, path: Path, result: Any, incremental_data_record: Optional[Union[DeferredFragmentRecord, StreamItemsRecord]] = None) Any

Complete an awaitable value.

static complete_leaf_value(return_type: Union[GraphQLScalarType, GraphQLEnumType], result: Any) Any

Complete a leaf value.

Complete a Scalar or Enum by serializing to a valid value, returning null if serialization is not possible.

complete_list_item_value(item: Any, complete_results: list[Any], item_type: Union[GraphQLScalarType, GraphQLObjectType, GraphQLInterfaceType, GraphQLUnionType, GraphQLEnumType, GraphQLList, GraphQLNonNull[Union[GraphQLScalarType, GraphQLObjectType, GraphQLInterfaceType, GraphQLUnionType, GraphQLEnumType, GraphQLList]]], field_group: list[graphql.language.ast.FieldNode], info: GraphQLResolveInfo, item_path: Path, incremental_data_record: Optional[Union[DeferredFragmentRecord, StreamItemsRecord]]) bool

Complete a list item value by adding it to the completed results.

Returns True if the value is awaitable.

complete_list_value(return_type: GraphQLList[GraphQLOutputType], field_group: FieldGroup, info: GraphQLResolveInfo, path: Path, result: AsyncIterable[Any] | Iterable[Any], incremental_data_record: IncrementalDataRecord | None) AwaitableOrValue[list[Any]]

Complete a list value.

Complete a list value by completing each item in the list with the inner type.

complete_object_value(return_type: GraphQLObjectType, field_group: FieldGroup, info: GraphQLResolveInfo, path: Path, result: Any, incremental_data_record: IncrementalDataRecord | None) AwaitableOrValue[dict[str, Any]]

Complete an Object value by executing all sub-selections.

complete_value(return_type: GraphQLOutputType, field_group: FieldGroup, info: GraphQLResolveInfo, path: Path, result: Any, incremental_data_record: IncrementalDataRecord | None) AwaitableOrValue[Any]

Complete a value.

Implements the instructions for completeValue as defined in the “Value completion” section of the spec.

If the field type is Non-Null, then this recursively completes the value for the inner type. It raises a field error if that completion returns null, as per the “Nullability” section of the spec.

If the field type is a List, then this recursively completes the value for the inner type on each item in the list.

If the field type is a Scalar or Enum, ensures the completed value is a legal value of the type by calling the serialize method of GraphQL type definition.

If the field is an abstract type, determine the runtime type of the value and then complete based on that type.

Otherwise, the field type expects a sub-selection set, and will complete the value by evaluating all sub-selections.

context_value: Any
ensure_valid_runtime_type(runtime_type_name: Any, return_type: Union[GraphQLInterfaceType, GraphQLUnionType], field_group: list[graphql.language.ast.FieldNode], info: GraphQLResolveInfo, result: Any) GraphQLObjectType

Ensure that the given type is valid at runtime.

errors: list[graphql.error.graphql_error.GraphQLError]
execute_deferred_fragment(parent_type: GraphQLObjectType, source_value: Any, fields: dict[str, list[graphql.language.ast.FieldNode]], label: Optional[str] = None, path: Optional[Path] = None, parent_context: Optional[Union[DeferredFragmentRecord, StreamItemsRecord]] = None) None

Execute deferred fragment.

execute_field(parent_type: GraphQLObjectType, source: Any, field_group: FieldGroup, path: Path, incremental_data_record: IncrementalDataRecord | None = None) AwaitableOrValue[Any]

Resolve the field on the given source object.

Implements the “Executing fields” section of the spec.

In particular, this method figures out the value that the field returns by calling its resolve function, then calls complete_value to await coroutine objects, serialize scalars, or execute the sub-selection-set for objects.

execute_fields(parent_type: GraphQLObjectType, source_value: Any, path: Path | None, grouped_field_set: GroupedFieldSet, incremental_data_record: IncrementalDataRecord | None = None) AwaitableOrValue[dict[str, Any]]

Execute the given fields concurrently.

Implements the “Executing selection sets” section of the spec for fields that may be executed in parallel.

execute_fields_serially(parent_type: GraphQLObjectType, source_value: Any, path: Path | None, grouped_field_set: GroupedFieldSet) AwaitableOrValue[dict[str, Any]]

Execute the given fields serially.

Implements the “Executing selection sets” section of the spec for fields that must be executed serially.

execute_operation() AwaitableOrValue[dict[str, Any]]

Execute an operation.

Implements the “Executing operations” section of the spec.

async execute_stream_async_iterator(initial_index: int, async_iterator: AsyncIterator[Any], field_group: list[graphql.language.ast.FieldNode], info: GraphQLResolveInfo, item_type: Union[GraphQLScalarType, GraphQLObjectType, GraphQLInterfaceType, GraphQLUnionType, GraphQLEnumType, GraphQLList, GraphQLNonNull[Union[GraphQLScalarType, GraphQLObjectType, GraphQLInterfaceType, GraphQLUnionType, GraphQLEnumType, GraphQLList]]], path: Path, label: Optional[str] = None, parent_context: Optional[Union[DeferredFragmentRecord, StreamItemsRecord]] = None) None

Execute stream iterator.

async execute_stream_async_iterator_item(async_iterator: AsyncIterator[Any], field_group: list[graphql.language.ast.FieldNode], info: GraphQLResolveInfo, item_type: Union[GraphQLScalarType, GraphQLObjectType, GraphQLInterfaceType, GraphQLUnionType, GraphQLEnumType, GraphQLList, GraphQLNonNull[Union[GraphQLScalarType, GraphQLObjectType, GraphQLInterfaceType, GraphQLUnionType, GraphQLEnumType, GraphQLList]]], incremental_data_record: StreamItemsRecord, path: Path, item_path: Path) Any

Execute stream iterator item.

execute_stream_field(path: Path, item_path: Path, item: AwaitableOrValue[Any], field_group: FieldGroup, info: GraphQLResolveInfo, item_type: GraphQLOutputType, label: str | None = None, parent_context: IncrementalDataRecord | None = None) IncrementalDataRecord

Execute stream field.

field_resolver: Callable[[...], Any]
filter_subsequent_payloads(null_path: Path, current_incremental_data_record: IncrementalDataRecord | None = None) None

Filter subsequent payloads.

fragments: dict[str, graphql.language.ast.FragmentDefinitionNode]
get_completed_incremental_results() list[Union[graphql.execution.incremental_publisher.IncrementalDeferResult, graphql.execution.incremental_publisher.IncrementalStreamResult]]

Get completed incremental results.

get_stream_values(field_group: list[graphql.language.ast.FieldNode], path: Path) graphql.execution.execute.StreamArguments | None

Get stream values.

Returns an object containing the @stream arguments if the field should be streamed, that is, if the experimental flag is enabled, the stream directive is present, and it is not disabled by the “if” argument.

handle_field_error(raw_error: Exception, return_type: Union[GraphQLScalarType, GraphQLObjectType, GraphQLInterfaceType, GraphQLUnionType, GraphQLEnumType, GraphQLList, GraphQLNonNull[Union[GraphQLScalarType, GraphQLObjectType, GraphQLInterfaceType, GraphQLUnionType, GraphQLEnumType, GraphQLList]]], field_group: list[graphql.language.ast.FieldNode], path: Path, incremental_data_record: Optional[Union[DeferredFragmentRecord, StreamItemsRecord]] = None) None

Handle error properly according to the field type.

static is_awaitable(value: Any) TypeGuard[Awaitable]

Return True if object can be passed to an await expression.

Instead of testing whether the object is an instance of abc.Awaitable, we check the existence of an __await__ attribute. This is much faster.
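
The attribute check can be sketched with the standard library alone. The helper has_await_attribute is a simplified stand-in written for this example, not the library's actual implementation.

```python
from collections.abc import Awaitable


def has_await_attribute(value):
    """Simplified stand-in: an object is awaitable if it exposes __await__."""
    return hasattr(value, "__await__")


async def fetch():
    return 42


coroutine = fetch()

# Both checks agree on a coroutine object and on a plain value.
coroutine_checks = (has_await_attribute(coroutine), isinstance(coroutine, Awaitable))
plain_checks = (has_await_attribute(42), isinstance(42, Awaitable))

# Close the never-awaited coroutine to avoid a RuntimeWarning.
coroutine.close()
print(coroutine_checks, plain_checks)
```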

map_source_to_response(result_or_stream: Union[ExecutionResult, AsyncIterable[Any]]) Union[AsyncGenerator[ExecutionResult, None], ExecutionResult]

Map source result to response.

For each payload yielded from a subscription, map it over the normal GraphQL execute() function, with payload as the root_value. This implements the “MapSourceToResponseEvent” algorithm described in the GraphQL specification. The execute() function provides the “ExecuteSubscriptionEvent” algorithm, as it is nearly identical to the “ExecuteQuery” algorithm, for which execute() is also used.

middleware_manager: graphql.execution.middleware.MiddlewareManager | None
operation: OperationDefinitionNode
root_value: Any
schema: GraphQLSchema
subscribe_field_resolver: Callable[[...], Any]
subsequent_payloads: dict[Union[graphql.execution.incremental_publisher.DeferredFragmentRecord, graphql.execution.incremental_publisher.StreamItemsRecord], None]
type_resolver: Callable[[Any, GraphQLResolveInfo, Union[GraphQLInterfaceType, GraphQLUnionType]], Optional[Union[Awaitable[Optional[str]], str]]]
variable_values: dict[str, Any]
async yield_subsequent_payloads() AsyncGenerator[SubsequentIncrementalExecutionResult, None]

Yield subsequent payloads.

class graphql.execution.ExecutionResult(data: Optional[dict[str, Any]] = None, errors: Optional[list[graphql.error.graphql_error.GraphQLError]] = None, extensions: Optional[dict[str, Any]] = None)

Bases: object

The result of GraphQL execution.

  • data is the result of a successful execution of the query.

  • errors is included as a non-empty list when any errors occurred.

  • extensions is reserved for adding non-standard properties.

__init__(data: Optional[dict[str, Any]] = None, errors: Optional[list[graphql.error.graphql_error.GraphQLError]] = None, extensions: Optional[dict[str, Any]] = None) None
data: dict[str, Any] | None
errors: list[graphql.error.graphql_error.GraphQLError] | None
extensions: dict[str, Any] | None
property formatted: FormattedExecutionResult

Get execution result formatted according to the specification.

class graphql.execution.FormattedExecutionResult

Bases: TypedDict

Formatted execution result

data: dict[str, Any] | None
errors: list[graphql.error.graphql_error.GraphQLFormattedError]
extensions: dict[str, Any]
class graphql.execution.ExperimentalIncrementalExecutionResults(initial_result: InitialIncrementalExecutionResult, subsequent_results: AsyncGenerator[SubsequentIncrementalExecutionResult, None])

Bases: NamedTuple

Execution results when retrieved incrementally.

__init__()
count(value, /)

Return number of occurrences of value.

index(value, start=0, stop=9223372036854775807, /)

Return first index of value.

Raises ValueError if the value is not present.

initial_result: InitialIncrementalExecutionResult

Alias for field number 0

subsequent_results: AsyncGenerator[SubsequentIncrementalExecutionResult, None]

Alias for field number 1

class graphql.execution.InitialIncrementalExecutionResult(data: Optional[dict[str, Any]] = None, errors: Optional[list[graphql.error.graphql_error.GraphQLError]] = None, incremental: Optional[Sequence[Union[IncrementalDeferResult, IncrementalStreamResult]]] = None, has_next: bool = False, extensions: Optional[dict[str, Any]] = None)

Bases: object

Initial incremental execution result.

  • has_next is True if a future payload is expected.

  • incremental is a list of the results from defer/stream directives.

__init__(data: Optional[dict[str, Any]] = None, errors: Optional[list[graphql.error.graphql_error.GraphQLError]] = None, incremental: Optional[Sequence[Union[IncrementalDeferResult, IncrementalStreamResult]]] = None, has_next: bool = False, extensions: Optional[dict[str, Any]] = None) None
data: dict[str, Any] | None
errors: list[graphql.error.graphql_error.GraphQLError] | None
extensions: dict[str, Any] | None
property formatted: FormattedInitialIncrementalExecutionResult

Get execution result formatted according to the specification.

has_next: bool
incremental: Optional[Sequence[Union[IncrementalDeferResult, IncrementalStreamResult]]]
class graphql.execution.FormattedInitialIncrementalExecutionResult

Bases: TypedDict

Formatted initial incremental execution result

data: dict[str, Any] | None
errors: list[graphql.error.graphql_error.GraphQLFormattedError]
extensions: dict[str, Any]
hasNext: bool
incremental: list[Union[graphql.execution.incremental_publisher.FormattedIncrementalDeferResult, graphql.execution.incremental_publisher.FormattedIncrementalStreamResult]]
class graphql.execution.SubsequentIncrementalExecutionResult(incremental: Optional[Sequence[Union[IncrementalDeferResult, IncrementalStreamResult]]] = None, has_next: bool = False, extensions: Optional[dict[str, Any]] = None)

Bases: object

Subsequent incremental execution result.

  • has_next is True if a future payload is expected.

  • incremental is a list of the results from defer/stream directives.

__init__(incremental: Optional[Sequence[Union[IncrementalDeferResult, IncrementalStreamResult]]] = None, has_next: bool = False, extensions: Optional[dict[str, Any]] = None) None
extensions: dict[str, Any] | None
property formatted: FormattedSubsequentIncrementalExecutionResult

Get execution result formatted according to the specification.

has_next: bool
incremental: Optional[Sequence[Union[IncrementalDeferResult, IncrementalStreamResult]]]
class graphql.execution.FormattedSubsequentIncrementalExecutionResult

Bases: TypedDict

Formatted subsequent incremental execution result

extensions: dict[str, Any]
hasNext: bool
incremental: list[Union[graphql.execution.incremental_publisher.FormattedIncrementalDeferResult, graphql.execution.incremental_publisher.FormattedIncrementalStreamResult]]
class graphql.execution.IncrementalDeferResult(data: dict[str, Any] | None = None, errors: list[GraphQLError] | None = None, path: list[str | int] | None = None, label: str | None = None, extensions: dict[str, Any] | None = None)

Bases: object

Incremental deferred execution result

__init__(data: dict[str, Any] | None = None, errors: list[GraphQLError] | None = None, path: list[str | int] | None = None, label: str | None = None, extensions: dict[str, Any] | None = None) None
data: dict[str, Any] | None
errors: list[GraphQLError] | None
extensions: dict[str, Any] | None
property formatted: FormattedIncrementalDeferResult

Get execution result formatted according to the specification.

label: str | None
path: list[str | int] | None
class graphql.execution.FormattedIncrementalDeferResult

Bases: TypedDict

Formatted incremental deferred execution result

data: dict[str, Any] | None
errors: list[GraphQLFormattedError]
extensions: dict[str, Any]
label: str
path: list[str | int]
class graphql.execution.IncrementalStreamResult(items: list[Any] | None = None, errors: list[GraphQLError] | None = None, path: list[str | int] | None = None, label: str | None = None, extensions: dict[str, Any] | None = None)

Bases: object

Incremental streamed execution result

__init__(items: list[Any] | None = None, errors: list[GraphQLError] | None = None, path: list[str | int] | None = None, label: str | None = None, extensions: dict[str, Any] | None = None) None
errors: list[GraphQLError] | None
extensions: dict[str, Any] | None
property formatted: FormattedIncrementalStreamResult

Get execution result formatted according to the specification.

items: list[Any] | None
label: str | None
path: list[str | int] | None
class graphql.execution.FormattedIncrementalStreamResult

Bases: TypedDict

Formatted incremental stream execution result

errors: list[GraphQLFormattedError]
extensions: dict[str, Any]
label: str
path: list[str | int]
graphql.execution.IncrementalResult

alias of Union[IncrementalDeferResult, IncrementalStreamResult]

graphql.execution.FormattedIncrementalResult

alias of Union[FormattedIncrementalDeferResult, FormattedIncrementalStreamResult]

graphql.execution.subscribe(schema: GraphQLSchema, document: DocumentNode, root_value: Any = None, context_value: Any = None, variable_values: dict[str, Any] | None = None, operation_name: str | None = None, field_resolver: GraphQLFieldResolver | None = None, type_resolver: GraphQLTypeResolver | None = None, subscribe_field_resolver: GraphQLFieldResolver | None = None, execution_context_class: type[ExecutionContext] | None = None) AwaitableOrValue[AsyncIterator[ExecutionResult] | ExecutionResult]

Create a GraphQL subscription.

Implements the “Subscribe” algorithm described in the GraphQL spec.

Returns a coroutine object which yields either an AsyncIterator (if successful) or an ExecutionResult (client error). The coroutine will raise an exception if a server error occurs.

If the client-provided arguments to this function do not result in a compliant subscription, a GraphQL Response (ExecutionResult) with descriptive errors and no data will be returned.

If the source stream could not be created due to faulty subscription resolver logic or underlying systems, the coroutine object will yield a single ExecutionResult containing errors and no data.

If the operation succeeded, the coroutine will yield an AsyncIterator, which yields a stream of ExecutionResults representing the response stream.

This function does not support incremental delivery (@defer and @stream). If an operation that defers or streams data is executed with this function, a field error will be raised at the location of the @defer or @stream directive.

graphql.execution.create_source_event_stream(schema: GraphQLSchema, document: DocumentNode, root_value: Any = None, context_value: Any = None, variable_values: dict[str, Any] | None = None, operation_name: str | None = None, field_resolver: GraphQLFieldResolver | None = None, type_resolver: GraphQLTypeResolver | None = None, subscribe_field_resolver: GraphQLFieldResolver | None = None, execution_context_class: type[ExecutionContext] | None = None) AwaitableOrValue[AsyncIterable[Any] | ExecutionResult]

Create source event stream

Implements the “CreateSourceEventStream” algorithm described in the GraphQL specification, resolving the subscription source event stream.

Returns a coroutine that yields an AsyncIterable.

If the client-provided arguments to this function do not result in a compliant subscription, a GraphQL Response (ExecutionResult) with descriptive errors and no data will be returned.

If the source stream could not be created due to faulty subscription resolver logic or underlying systems, the coroutine object will yield a single ExecutionResult containing errors and no data.

A source event stream represents a sequence of events, each of which triggers a GraphQL execution for that event.

This may be useful when hosting the stateful subscription service in a different process or machine than the stateless GraphQL execution engine, or otherwise separating these two steps. For more on this, see the “Supporting Subscriptions at Scale” information in the GraphQL spec.

graphql.execution.Middleware

alias of Optional[Union[Tuple, List, MiddlewareManager]]

class graphql.execution.MiddlewareManager(*middlewares: Any)

Bases: object

Manager for the middleware chain.

This class helps to wrap resolver functions with the provided middleware functions and/or objects. The functions take the next middleware function as their first argument. If middleware is provided as an object, it must provide a resolve method that is used as the middleware function.

Note that since resolvers return “AwaitableOrValue”s, all middleware functions must be aware of this and check whether values are awaitable before awaiting them.

__init__(*middlewares: Any) None
get_field_resolver(field_resolver: Callable[[...], Any]) Callable[[...], Any]

Wrap the provided resolver with the middleware.

Returns a function that chains the middleware functions with the provided resolver function.

middlewares
graphql.execution.get_directive_values(directive_def: GraphQLDirective, node: Union[EnumValueDefinitionNode, ExecutableDefinitionNode, FieldDefinitionNode, InputValueDefinitionNode, SelectionNode, SchemaDefinitionNode, TypeDefinitionNode, TypeExtensionNode], variable_values: Optional[dict[str, Any]] = None) dict[str, Any] | None

Get coerced argument values based on provided nodes.

Prepares a dict of argument values given a directive definition and an AST node which may contain directives. Optionally also accepts a dict of variable values.

If the directive does not exist on the node, returns None.

graphql.execution.get_variable_values(schema: GraphQLSchema, var_def_nodes: Collection[VariableDefinitionNode], inputs: dict[str, Any], max_errors: Optional[int] = None) Union[List[GraphQLError], Dict[str, Any]]

Get coerced variable values based on provided definitions.

Prepares a dict of variable values of the correct type based on the provided variable definitions and arbitrary input. If the input cannot be parsed to match the variable definitions, the return annotation indicates that a list of GraphQLError objects is returned in place of the dict.