Execution
GraphQL Execution
The graphql.execution package is responsible for the execution phase of
fulfilling a GraphQL request.
- graphql.execution.execute(schema: GraphQLSchema, document: DocumentNode, root_value: Any = None, context_value: Any = None, variable_values: dict[str, Any] | None = None, operation_name: str | None = None, field_resolver: GraphQLFieldResolver | None = None, type_resolver: GraphQLTypeResolver | None = None, subscribe_field_resolver: GraphQLFieldResolver | None = None, max_coercion_errors: int = 50, enable_early_execution: bool = False, middleware: Middleware | None = None, executor_class: type[Executor] | None = None, is_awaitable: Callable[[Any], TypeGuard[Awaitable]] | None = None, is_async_iterable: Callable[[Any], TypeGuard[AsyncIterable]] | None = None, hide_suggestions: bool = False, abort_signal: AbortSignal | None = None, hooks: ExecutionHooks | None = None, **custom_context_args: Any) AwaitableOrValue[ExecutionResult]
Execute a GraphQL operation.
Implements the “Executing requests” section of the GraphQL specification.
Returns an ExecutionResult (if all encountered resolvers are synchronous), or a coroutine object eventually yielding an ExecutionResult.
If the arguments to this function do not result in a legal executor, a GraphQLError will be thrown immediately explaining the invalid input.
This function does not support incremental delivery (@defer and @stream). If an operation that defers or streams data is executed with this function, it will throw an error instead. Use experimental_execute_incrementally if you want to support incremental delivery.
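Because the return value may be either a plain result or a coroutine object, a caller that does not know whether all resolvers are synchronous has to handle both cases. The following is a minimal sketch using only the standard library; `fake_execute` and `settle` are hypothetical names used for illustration and are not part of graphql.execution.

```python
import asyncio
import inspect
from typing import Any


def fake_execute(use_async: bool) -> Any:
    """Hypothetical stand-in: returns a plain value or a coroutine object."""
    result = {"data": {"hello": "world"}}
    if not use_async:
        return result

    async def pending() -> dict:
        await asyncio.sleep(0)  # simulate an asynchronous resolver
        return result

    return pending()


async def settle(value_or_awaitable: Any) -> Any:
    """Await the value only if it is awaitable, otherwise return it as is."""
    if inspect.isawaitable(value_or_awaitable):
        return await value_or_awaitable
    return value_or_awaitable


sync_result = asyncio.run(settle(fake_execute(use_async=False)))
async_result = asyncio.run(settle(fake_execute(use_async=True)))
```

Both calls end up with the same payload; only the path taken to obtain it differs.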
- graphql.execution.execute_root_selection_set(executor: Executor) Awaitable[ExecutionResult] | ExecutionResult
Execute the root selection set.
Implements the “Executing operations” section of the GraphQL specification, running the given executor to completion. This does not support incremental delivery (@defer and @stream).
- graphql.execution.experimental_execute_incrementally(schema: GraphQLSchema, document: DocumentNode, root_value: Any = None, context_value: Any = None, variable_values: dict[str, Any] | None = None, operation_name: str | None = None, field_resolver: GraphQLFieldResolver | None = None, type_resolver: GraphQLTypeResolver | None = None, subscribe_field_resolver: GraphQLFieldResolver | None = None, max_coercion_errors: int = 50, enable_early_execution: bool = False, middleware: Middleware | None = None, executor_class: type[Executor] | None = None, is_awaitable: Callable[[Any], TypeGuard[Awaitable]] | None = None, is_async_iterable: Callable[[Any], TypeGuard[AsyncIterable]] | None = None, hide_suggestions: bool = False, abort_signal: AbortSignal | None = None, hooks: ExecutionHooks | None = None, **custom_context_args: Any) AwaitableOrValue[ExecutionResult | ExperimentalIncrementalExecutionResults]
Execute a GraphQL operation incrementally (internal implementation).
Implements the “Executing requests” section of the GraphQL specification, including @defer and @stream as proposed in https://github.com/graphql/graphql-spec/pull/742
This function returns an awaitable that is either a single ExecutionResult or an ExperimentalIncrementalExecutionResults object, containing an initial_result and a stream of subsequent_results.
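A consumer therefore has to distinguish the two shapes after awaiting. The sketch below illustrates one way to do this with the standard library only; `IncrementalResults` is a hypothetical mirror of the two documented fields, and the payload dictionaries are made up for illustration rather than taken from the library.

```python
import asyncio
from typing import Any, AsyncGenerator, NamedTuple


class IncrementalResults(NamedTuple):
    """Hypothetical mirror of initial_result and subsequent_results."""

    initial_result: Any
    subsequent_results: AsyncGenerator[Any, None]


async def fake_incremental() -> IncrementalResults:
    """Hypothetical producer of one initial payload and one later payload."""

    async def later() -> AsyncGenerator[Any, None]:
        yield {"incremental": [{"data": {"b": 2}}], "hasNext": False}

    return IncrementalResults({"data": {"a": 1}, "hasNext": True}, later())


async def collect_payloads(result: Any) -> list:
    """Return every payload in delivery order, whichever shape was received."""
    if isinstance(result, IncrementalResults):
        payloads = [result.initial_result]
        async for patch in result.subsequent_results:
            payloads.append(patch)
        return payloads
    return [result]  # a single, non-incremental result


async def main() -> list:
    return await collect_payloads(await fake_incremental())


payloads = asyncio.run(main())
```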
- graphql.execution.execute_sync(schema: GraphQLSchema, document: DocumentNode, root_value: Any = None, context_value: Any = None, variable_values: dict[str, Any] | None = None, operation_name: str | None = None, field_resolver: Callable[[...], Any] | None = None, type_resolver: Callable[[Any, GraphQLResolveInfo, GraphQLInterfaceType | GraphQLUnionType], Awaitable[str | None] | str | None] | None = None, max_coercion_errors: int = 50, middleware: tuple | list | MiddlewareManager | None = None, executor_class: type[Executor] | None = None, check_sync: bool = False, hide_suggestions: bool = False, abort_signal: AbortSignal | None = None, hooks: ExecutionHooks | None = None) ExecutionResult
Execute a GraphQL operation synchronously.
Also implements the “Executing requests” section of the GraphQL specification.
However, it guarantees to complete synchronously (or throw an error) assuming that all field resolvers are also synchronous.
Set check_sync to True to still run checks that no awaitable values are returned.
- graphql.execution.default_field_resolver(source: Any, info: GraphQLResolveInfo, **args: Any) Any
Default field resolver.
If a resolve function is not given, then a default resolve behavior is used which takes the property of the source object of the same name as the field and returns it as the result, or if it’s a function, returns the result of calling that function while passing along args and context.
For dictionaries, the field names are used as keys; for all other objects, they are used as attribute names.
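The lookup rules described above can be sketched as follows. This is an illustration of the described behavior under simplifying assumptions, not the library's implementation; `Info` is a hypothetical stand-in for GraphQLResolveInfo that carries only a field name.

```python
from collections.abc import Mapping
from typing import Any, NamedTuple


class Info(NamedTuple):
    """Hypothetical stand-in for GraphQLResolveInfo; only field_name is used."""

    field_name: str


def sketch_field_resolver(source: Any, info: Info, **args: Any) -> Any:
    # Dictionaries are looked up by key, all other objects by attribute name.
    if isinstance(source, Mapping):
        value = source.get(info.field_name)
    else:
        value = getattr(source, info.field_name, None)
    # A callable property is invoked with the info object and the field arguments.
    if callable(value):
        return value(info, **args)
    return value


class Greeter:
    name = "Ada"

    def greeting(self, info: Info, *, punctuation: str = "!") -> str:
        return f"Hello, {self.name}{punctuation}"
```

With these definitions, a dictionary source and an object source are handled by the same resolver, and the `greeting` method receives the field arguments as keyword arguments.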
- graphql.execution.default_type_resolver(value: Any, info: GraphQLResolveInfo, abstract_type: GraphQLInterfaceType | GraphQLUnionType) Awaitable[str | None] | str | None
Default type resolver function.
If a resolve_type function is not given, then a default resolve behavior is used which attempts two strategies:
First, see if the provided value has a __typename field defined; if so, use that value as the name of the resolved type.
Otherwise, test each possible type for the abstract type by calling is_type_of() for the object being coerced, returning the first type that matches.
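The two strategies can be sketched in a few lines. The code below is a simplified illustration, not the library's implementation; `ObjectType` is a hypothetical stand-in for an object type that optionally provides an is_type_of check.

```python
from collections.abc import Mapping
from typing import Any, Callable, List, NamedTuple, Optional


class ObjectType(NamedTuple):
    """Hypothetical stand-in for an object type with an is_type_of check."""

    name: str
    is_type_of: Optional[Callable[[Any], bool]] = None


def sketch_type_resolver(value: Any, possible_types: List[ObjectType]) -> Optional[str]:
    # Strategy 1: a __typename key or attribute names the type directly.
    if isinstance(value, Mapping):
        type_name = value.get("__typename")
    else:
        type_name = getattr(value, "__typename", None)
    if isinstance(type_name, str):
        return type_name
    # Strategy 2: ask each possible type in order; the first match wins.
    for possible in possible_types:
        if possible.is_type_of is not None and possible.is_type_of(value):
            return possible.name
    return None


dog = ObjectType("Dog", lambda value: "barks" in value)
cat = ObjectType("Cat", lambda value: "meows" in value)
```

Note that an explicit `__typename` takes precedence even when an is_type_of check of another type would also match.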
- class graphql.execution.Executor(schema: GraphQLSchema, fragment_definitions: dict[str, FragmentDefinitionNode], fragments: dict[str, FragmentDetails], root_value: Any, context_value: Any, operation: OperationDefinitionNode, variable_values: VariableValues, field_resolver: GraphQLFieldResolver, type_resolver: GraphQLTypeResolver, subscribe_field_resolver: GraphQLFieldResolver, enable_early_execution: bool = False, middleware_manager: MiddlewareManager | None = None, is_awaitable: Callable[[Any], TypeGuard[Awaitable]] | None = None, is_async_iterable: Callable[[Any], TypeGuard[AsyncIterable]] | None = None, hide_suggestions: bool = False, abort_signal: AbortSignal | None = None, hooks: ExecutionHooks | None = None)
Bases: IncrementalPublisherContext
Data that must be available at all points during query execution.
Namely, schema of the type system that is currently executing, and the fragments defined in the query document.
- abort_error() Exception
Return the exception to raise when execution has been aborted.
An abort reason that is itself an exception is raised as is; any other value is reported as an unexpected error value.
- abort_signal: AbortSignal | None
- async_helpers: GraphQLResolveInfoHelpers
- async_work_finished_hook_task: Future[None] | None
- background_futures: set[Future[Any]]
- box_incremental_result(result: Awaitable[T] | T) BoxedAwaitableOrValue[T]
Box a possibly awaitable incremental result.
A pending result is registered so that it can be cancelled when the incremental execution is stopped before it has settled.
- classmethod build(schema: GraphQLSchema, document: DocumentNode, root_value: Any = None, context_value: Any = None, raw_variable_values: dict[str, Any] | None = None, operation_name: str | None = None, field_resolver: GraphQLFieldResolver | None = None, type_resolver: GraphQLTypeResolver | None = None, subscribe_field_resolver: GraphQLFieldResolver | None = None, max_coercion_errors: int = 50, enable_early_execution: bool = False, middleware: Middleware | None = None, is_awaitable: Callable[[Any], TypeGuard[Awaitable]] | None = None, is_async_iterable: Callable[[Any], TypeGuard[AsyncIterable]] | None = None, hide_suggestions: bool = False, abort_signal: AbortSignal | None = None, hooks: ExecutionHooks | None = None, **custom_args: Any) list[GraphQLError] | Executor
Build an executor.
Constructs an Executor object from the arguments passed to execute, which we will pass throughout the other execution methods.
Throws a GraphQLError if a valid executor cannot be created.
For internal use only.
- build_async_stream_item_queue(initial_index: int, stream_path: Path, async_iterator: AsyncIterator[Any], field_details_list: list[FieldDetails], info: GraphQLResolveInfo, item_type: GraphQLScalarType | GraphQLObjectType | GraphQLInterfaceType | GraphQLUnionType | GraphQLEnumType | GraphQLList | GraphQLNonNull[GraphQLScalarType | GraphQLObjectType | GraphQLInterfaceType | GraphQLUnionType | GraphQLEnumType | GraphQLList]) list[BoxedAwaitableOrValue[StreamItemResult] | Callable[[], BoxedAwaitableOrValue[StreamItemResult]]]
Build async stream item queue.
- build_data_response(data: dict[str, Any], incremental_data_records: Sequence[PendingExecutionGroup | StreamRecord] | None) ExecutionResult | ExperimentalIncrementalExecutionResults
Build the data response.
- build_per_event_executor(payload: Any) Executor
Create a copy of the executor for usage with subscribe events.
- build_resolve_info(field_def: GraphQLField, field_nodes: list[FieldNode], parent_type: GraphQLObjectType, path: Path) GraphQLResolveInfo
Build the GraphQLResolveInfo object.
For internal use only.
- build_sub_execution_plan(original_grouped_field_set: dict[str, list[FieldDetails]], defer_usage_set: RefSet[DeferUsage] | None) ExecutionPlan
Build a cached sub-execution plan.
- build_sync_stream_item_queue(initial_item: Awaitable[Any] | Any, initial_index: int, stream_path: Path, iterator: Iterable[Any], field_details_list: list[FieldDetails], info: GraphQLResolveInfo, item_type: GraphQLScalarType | GraphQLObjectType | GraphQLInterfaceType | GraphQLUnionType | GraphQLEnumType | GraphQLList | GraphQLNonNull[GraphQLScalarType | GraphQLObjectType | GraphQLInterfaceType | GraphQLUnionType | GraphQLEnumType | GraphQLList]) list[BoxedAwaitableOrValue[StreamItemResult] | Callable[[], BoxedAwaitableOrValue[StreamItemResult]]]
Build sync stream item queue.
- async cancel_incremental_work() None
Cancel all pending incremental work and close the stream sources.
Cancels the still pending incremental execution tasks first and waits for their cancellation to settle, so that no early execution continues and no iteration is pending on the stream sources any more, then triggers and awaits the early return of all remaining cancellable streams.
- cancellable_iterable(iterable: AsyncIterable[T]) AsyncIterable[T]
Wrap an async iterable so pending iteration is cancelled on abort.
When the abort signal is triggered, any pending __anext__ call returns immediately by raising the abort reason. This mirrors the JavaScript cancellableIterable; GraphQL-core needs no AbortSignalListener class since with_abort_signal() already provides the cancellation mechanism.
- cancellable_streams: set[CancellableStreamRecord] | None
- collect_and_execute_subfields(return_type: GraphQLObjectType, field_details_list: list[FieldDetails], path: Path, result: Any, incremental_context: IncrementalContext | None, defer_map: RefMap[DeferUsage, DeferredFragmentRecord] | None) Awaitable[GraphQLWrappedResult[dict[str, Any]]] | GraphQLWrappedResult[dict[str, Any]]
Collect sub-fields to execute to complete this value.
- collect_execution_groups(parent_type: GraphQLObjectType, source_value: Any, path: Path | None, parent_defer_usages: RefSet[DeferUsage] | None, new_grouped_field_sets: RefMap[RefSet[DeferUsage], dict[str, list[FieldDetails]]], defer_map: RefMap[DeferUsage, DeferredFragmentRecord]) list[PendingExecutionGroup]
Execute deferred grouped field sets.
- collect_subfields(return_type: GraphQLObjectType, field_details_list: list[FieldDetails]) CollectedFields
Collect subfields.
A memoized function collecting relevant subfields regarding the return type. Memoizing ensures the subfields are not repeatedly calculated, which saves overhead when resolving lists of values.
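The benefit of memoizing can be illustrated with a small cache keyed by the return type and the field nodes. This is a sketch under simplifying assumptions: `SubfieldCache` is a hypothetical class, and strings stand in for the type and node objects used by the real executor.

```python
from typing import Dict, List, Tuple


class SubfieldCache:
    """Hypothetical memoizing collector keyed by return type and field nodes."""

    def __init__(self) -> None:
        self._cache: Dict[Tuple[str, Tuple[str, ...]], List[str]] = {}
        self.computations = 0  # counts how often the expensive step runs

    def collect(self, return_type: str, field_nodes: Tuple[str, ...]) -> List[str]:
        key = (return_type, field_nodes)
        cached = self._cache.get(key)
        if cached is None:
            self.computations += 1
            # Stand-in for the real traversal of the selection sets.
            cached = [f"{return_type}.{node}" for node in field_nodes]
            self._cache[key] = cached
        return cached


cache = SubfieldCache()
# Resolving a list of three users asks for the same subfields three times.
collected = [cache.collect("User", ("id", "name")) for _ in range(3)]
```

Only the first call performs the computation; the other two return the cached list.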
- complete_abstract_value(return_type: GraphQLInterfaceType | GraphQLUnionType, field_details_list: list[FieldDetails], info: GraphQLResolveInfo, path: Path, result: Any, incremental_context: IncrementalContext | None, defer_map: RefMap[DeferUsage, DeferredFragmentRecord] | None) Awaitable[GraphQLWrappedResult[dict[str, Any]]] | GraphQLWrappedResult[dict[str, Any]]
Complete an abstract value.
Complete a value of an abstract type by determining the runtime object type of that value, then complete the value for that type.
- async complete_async_iterator_value(item_type: GraphQLScalarType | GraphQLObjectType | GraphQLInterfaceType | GraphQLUnionType | GraphQLEnumType | GraphQLList | GraphQLNonNull[GraphQLScalarType | GraphQLObjectType | GraphQLInterfaceType | GraphQLUnionType | GraphQLEnumType | GraphQLList], field_details_list: list[FieldDetails], info: GraphQLResolveInfo, path: Path, async_iterator: AsyncIterator[Any], incremental_context: IncrementalContext | None, defer_map: RefMap[DeferUsage, DeferredFragmentRecord] | None) GraphQLWrappedResult[list[Any]]
Complete an async iterator.
Complete an async iterator value by completing the result and calling recursively until all the results are completed.
- async complete_awaitable_list_item_value(item: Any, parent: GraphQLWrappedResult[list[Any]], item_type: GraphQLScalarType | GraphQLObjectType | GraphQLInterfaceType | GraphQLUnionType | GraphQLEnumType | GraphQLList | GraphQLNonNull[GraphQLScalarType | GraphQLObjectType | GraphQLInterfaceType | GraphQLUnionType | GraphQLEnumType | GraphQLList], field_details_list: list[FieldDetails], info: GraphQLResolveInfo, item_path: Path, incremental_context: IncrementalContext | None, defer_map: RefMap[DeferUsage, DeferredFragmentRecord] | None) Any
Complete an awaitable list item value.
- async complete_awaitable_value(return_type: GraphQLScalarType | GraphQLObjectType | GraphQLInterfaceType | GraphQLUnionType | GraphQLEnumType | GraphQLList | GraphQLNonNull[GraphQLScalarType | GraphQLObjectType | GraphQLInterfaceType | GraphQLUnionType | GraphQLEnumType | GraphQLList], field_details_list: list[FieldDetails], info: GraphQLResolveInfo, path: Path, result: Any, incremental_context: IncrementalContext | None, defer_map: RefMap[DeferUsage, DeferredFragmentRecord] | None) GraphQLWrappedResult[Any]
Complete an awaitable value.
- complete_iterable_value(item_type: GraphQLScalarType | GraphQLObjectType | GraphQLInterfaceType | GraphQLUnionType | GraphQLEnumType | GraphQLList | GraphQLNonNull[GraphQLScalarType | GraphQLObjectType | GraphQLInterfaceType | GraphQLUnionType | GraphQLEnumType | GraphQLList], field_details_list: list[FieldDetails], info: GraphQLResolveInfo, path: Path, items: Iterable[Any], incremental_context: IncrementalContext | None, defer_map: RefMap[DeferUsage, DeferredFragmentRecord] | None) Awaitable[GraphQLWrappedResult[list[Any]]] | GraphQLWrappedResult[list[Any]]
Complete an iterable value.
- static complete_leaf_value(return_type: GraphQLScalarType | GraphQLEnumType, result: Any) Any
Complete a leaf value.
Complete a Scalar or Enum by coercing to a valid value, returning null if coercion is not possible.
- complete_list_item_value(item: Any, complete_results: list[Any], parent: GraphQLWrappedResult[list[Any]], item_type: GraphQLScalarType | GraphQLObjectType | GraphQLInterfaceType | GraphQLUnionType | GraphQLEnumType | GraphQLList | GraphQLNonNull[GraphQLScalarType | GraphQLObjectType | GraphQLInterfaceType | GraphQLUnionType | GraphQLEnumType | GraphQLList], field_details_list: list[FieldDetails], info: GraphQLResolveInfo, item_path: Path, incremental_context: IncrementalContext | None, defer_map: RefMap[DeferUsage, DeferredFragmentRecord] | None) bool
Complete a list item value by adding it to the completed results.
Returns True if the value is awaitable.
- complete_list_value(return_type: GraphQLList[GraphQLScalarType | GraphQLObjectType | GraphQLInterfaceType | GraphQLUnionType | GraphQLEnumType | GraphQLList | GraphQLNonNull[GraphQLScalarType | GraphQLObjectType | GraphQLInterfaceType | GraphQLUnionType | GraphQLEnumType | GraphQLList]], field_details_list: list[FieldDetails], info: GraphQLResolveInfo, path: Path, result: AsyncIterable[Any] | Iterable[Any], incremental_context: IncrementalContext | None, defer_map: RefMap[DeferUsage, DeferredFragmentRecord] | None) Awaitable[GraphQLWrappedResult[list[Any]]] | GraphQLWrappedResult[list[Any]]
Complete a list value.
Complete a list value by completing each item in the list with the inner type.
- complete_object_value(return_type: GraphQLObjectType, field_details_list: list[FieldDetails], info: GraphQLResolveInfo, path: Path, result: Any, incremental_context: IncrementalContext | None, defer_map: RefMap[DeferUsage, DeferredFragmentRecord] | None) Awaitable[GraphQLWrappedResult[dict[str, Any]]] | GraphQLWrappedResult[dict[str, Any]]
Complete an Object value by executing all sub-selections.
- complete_stream_item(item_path: Path, item: Any, incremental_context: IncrementalContext, field_details_list: list[FieldDetails], info: GraphQLResolveInfo, item_type: GraphQLScalarType | GraphQLObjectType | GraphQLInterfaceType | GraphQLUnionType | GraphQLEnumType | GraphQLList | GraphQLNonNull[GraphQLScalarType | GraphQLObjectType | GraphQLInterfaceType | GraphQLUnionType | GraphQLEnumType | GraphQLList]) Awaitable[StreamItemResult] | StreamItemResult
Complete the stream items.
- complete_value(return_type: GraphQLScalarType | GraphQLObjectType | GraphQLInterfaceType | GraphQLUnionType | GraphQLEnumType | GraphQLList | GraphQLNonNull[GraphQLScalarType | GraphQLObjectType | GraphQLInterfaceType | GraphQLUnionType | GraphQLEnumType | GraphQLList], field_details_list: list[FieldDetails], info: GraphQLResolveInfo, path: Path, result: Any, incremental_context: IncrementalContext | None, defer_map: RefMap[DeferUsage, DeferredFragmentRecord] | None) Awaitable[GraphQLWrappedResult[Any]] | GraphQLWrappedResult[Any]
Complete a value.
Implements the instructions for completeValue as defined in the “Value completion” section of the spec.
If the field type is Non-Null, then this recursively completes the value for the inner type. It throws a field error if that completion returns null, as per the “Nullability” section of the spec.
If the field type is a List, then this recursively completes the value for the inner type on each item in the list.
If the field type is a Scalar or Enum, ensures the completed value is a legal value of the type by calling the coerce_output_value method of the GraphQL type definition.
If the field is an abstract type, determine the runtime type of the value and then complete based on that type.
Otherwise, the field type expects a sub-selection set, and will complete the value by evaluating all sub-selections.
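The recursive dispatch described above can be sketched with a handful of toy type classes. This is an illustration only: `NonNull`, `ListOf`, `Leaf` and `Obj` are hypothetical stand-ins for the GraphQL type classes, abstract types and awaitable values are left out, and every declared field is treated as selected.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict


@dataclass
class NonNull:
    of_type: Any


@dataclass
class ListOf:
    of_type: Any


@dataclass
class Leaf:
    coerce: Callable[[Any], Any]  # stand-in for output coercion


@dataclass
class Obj:
    fields: Dict[str, Any]  # field name -> field type


def complete(return_type: Any, result: Any) -> Any:
    if isinstance(return_type, NonNull):
        # Complete for the inner type, then reject a null outcome.
        completed = complete(return_type.of_type, result)
        if completed is None:
            raise ValueError("Cannot return null for non-nullable field.")
        return completed
    if result is None:
        return None
    if isinstance(return_type, ListOf):
        return [complete(return_type.of_type, item) for item in result]
    if isinstance(return_type, Leaf):
        return return_type.coerce(result)
    # Object type: complete each sub-selection (here: every declared field).
    return {
        name: complete(field_type, result.get(name))
        for name, field_type in return_type.fields.items()
    }


user = Obj({"id": NonNull(Leaf(str)), "tags": ListOf(Leaf(str)), "age": Leaf(int)})
completed_user = complete(user, {"id": 7, "tags": ["a", 1], "age": None})
```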
- context_value: Any
- create_aborted_execution_error(result: Awaitable[Any] | Any) AbortedGraphQLExecutionError
Create an aborted execution error exposing the given result.
- enable_early_execution: bool
- ensure_valid_runtime_type(runtime_type_name: Any, return_type: GraphQLInterfaceType | GraphQLUnionType, field_details_list: list[FieldDetails], info: GraphQLResolveInfo, result: Any) GraphQLObjectType
Ensure that the given type is valid at runtime.
- error_propagation: bool
- errors: list[GraphQLError] | None
- execute_execution_group(pending_execution_group: PendingExecutionGroup, parent_type: GraphQLObjectType, source_value: Any, path: Path | None, grouped_field_set: dict[str, list[FieldDetails]], incremental_context: IncrementalContext, defer_map: RefMap[DeferUsage, DeferredFragmentRecord]) Awaitable[SuccessfulExecutionGroup | FailedExecutionGroup] | SuccessfulExecutionGroup | FailedExecutionGroup
Execute deferred grouped field set.
- execute_execution_plan(return_type: GraphQLObjectType, source_value: Any, new_defer_usages: list[DeferUsage], execution_plan: ExecutionPlan, path: Path | None = None, incremental_context: IncrementalContext | None = None, defer_map: RefMap[DeferUsage, DeferredFragmentRecord] | None = None) Awaitable[GraphQLWrappedResult[dict[str, Any]]] | GraphQLWrappedResult[dict[str, Any]]
Execute an execution plan.
- execute_field(parent_type: GraphQLObjectType, source: Any, field_details_list: FieldDetailsList, path: Path, incremental_context: IncrementalContext | None, defer_map: RefMap[DeferUsage, DeferredFragmentRecord] | None) AwaitableOrValue[GraphQLWrappedResult[Any]] | UndefinedType
Resolve the field on the given source object.
Implements the “Executing fields” section of the spec.
In particular, this method figures out the value that the field returns by calling its resolve function, then calls complete_value to await coroutine objects, coerce scalars, or execute the sub-selection set for objects.
- execute_fields(parent_type: GraphQLObjectType, source_value: Any, path: Path | None, grouped_field_set: dict[str, list[FieldDetails]], incremental_context: IncrementalContext | None, defer_map: RefMap[DeferUsage, DeferredFragmentRecord] | None) Awaitable[GraphQLWrappedResult[dict[str, Any]]] | GraphQLWrappedResult[dict[str, Any]]
Execute the given fields concurrently.
Implements the “Executing selection sets” section of the spec for fields that may be executed in parallel.
- execute_fields_serially(parent_type: GraphQLObjectType, source_value: Any, path: Path | None, grouped_field_set: dict[str, list[FieldDetails]], incremental_context: IncrementalContext | None, defer_map: RefMap[DeferUsage, DeferredFragmentRecord] | None) Awaitable[GraphQLWrappedResult[dict[str, Any]]] | GraphQLWrappedResult[dict[str, Any]]
Execute the given fields serially.
Implements the “Executing selection sets” section of the spec for fields that must be executed serially.
- execute_operation(serially: bool | None = None) Awaitable[ExecutionResult | ExperimentalIncrementalExecutionResults] | ExecutionResult | ExperimentalIncrementalExecutionResults
Execute an operation.
Implements the “Executing operations” section of the spec.
Return a possible coroutine object that will eventually yield the data described by the “Response” section of the GraphQL specification.
If errors are encountered while executing a GraphQL field, only that field and its descendants will be omitted, and sibling fields will still be executed. An execution which encounters errors will still result in a coroutine object that can be executed without errors.
Errors from sub-fields of a NonNull type may propagate to the top level, at which point we still log the error and null the parent field, which in this case is the entire response.
If the operation is aborted, the whole operation is rejected with an aborted execution error rather than resolving to a partial response with located errors; the partial result that the unwinding execution can still produce is exposed on that error.
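The difference between an error in a nullable field and an error that propagates from a non-null field can be sketched as follows. This is a deliberately flat illustration with a single level of fields; `run_fields` is a hypothetical helper, and plain strings stand in for located GraphQLError objects.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

# Each entry maps a field name to (resolver, is_non_null).
FieldMap = Dict[str, Tuple[Callable[[], Any], bool]]


def run_fields(fields: FieldMap) -> Tuple[Optional[dict], List[str]]:
    data: Dict[str, Any] = {}
    errors: List[str] = []
    for name, (resolve, is_non_null) in fields.items():
        try:
            data[name] = resolve()
        except Exception as error:
            errors.append(f"{name}: {error}")
            if is_non_null:
                # The error propagates to the parent, which is nulled;
                # at the top level the parent is the entire response.
                return None, errors
            data[name] = None  # only the failing field is nulled
    return data, errors


def fail() -> Any:
    raise RuntimeError("boom")


nullable_outcome = run_fields({"a": (fail, False), "b": (lambda: 1, False)})
non_null_outcome = run_fields({"a": (fail, True), "b": (lambda: 1, False)})
```

In the first case the sibling field `b` is still resolved and the error is recorded next to partial data; in the second case the recorded error is accompanied by null data.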
- execute_root_grouped_field_set(root_type: GraphQLObjectType, root_value: Any, grouped_field_set: dict[str, list[FieldDetails]], serially: bool, defer_map: RefMap[DeferUsage, DeferredFragmentRecord] | None) Awaitable[GraphQLWrappedResult[dict[str, Any]]] | GraphQLWrappedResult[dict[str, Any]]
Execute the root grouped field set.
- field_resolver: GraphQLFieldResolver
- finish(result: T) T
Check that execution has not been aborted before returning its result.
If the operation has been aborted during otherwise synchronous execution, raise an aborted execution error exposing the already completed result.
- fragment_definitions: dict[str, FragmentDefinitionNode]
- fragments: dict[str, FragmentDetails]
- gather_async_work(values: Sequence[Awaitable[Any]]) Awaitable[list[Any]]
Concurrently await the given values as one unit of asynchronous work.
This allows resolvers to await multiple concurrent operations together. When one of the values fails, the others are cancelled and settled before the error is propagated, so that no asynchronous work is orphaned.
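The behavior described here (cancel the remaining work and let it settle before the error propagates) can be approximated with asyncio alone. The sketch below is an illustration of that pattern, not the method's implementation; `gather_as_unit` is a hypothetical name.

```python
import asyncio
from typing import Any, Awaitable, List, Sequence


async def gather_as_unit(values: Sequence[Awaitable[Any]]) -> List[Any]:
    tasks = [asyncio.ensure_future(value) for value in values]
    try:
        return await asyncio.gather(*tasks)
    except BaseException:
        # One value failed: cancel the others ...
        for task in tasks:
            task.cancel()
        # ... and wait until all of them have settled, so none is orphaned.
        await asyncio.gather(*tasks, return_exceptions=True)
        raise


log: List[str] = []


async def slow() -> None:
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        log.append("cancelled")
        raise


async def fail() -> None:
    await asyncio.sleep(0)
    raise ValueError("boom")


async def run_failing() -> str:
    try:
        await gather_as_unit([slow(), fail()])
    except ValueError as error:
        return str(error)
    return "no error"


outcome = asyncio.run(run_failing())
```

The slow coroutine observes its cancellation before the ValueError reaches the caller, so the call returns quickly instead of waiting for the sleep to finish.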
- async get_next_async_stream_item_result(stream_item_queue: list[BoxedAwaitableOrValue[StreamItemResult] | Callable[[], BoxedAwaitableOrValue[StreamItemResult]]], stream_path: Path, index: int, async_iterator: AsyncIterator[Any], field_details_list: list[FieldDetails], info: GraphQLResolveInfo, item_type: GraphQLScalarType | GraphQLObjectType | GraphQLInterfaceType | GraphQLUnionType | GraphQLEnumType | GraphQLList | GraphQLNonNull[GraphQLScalarType | GraphQLObjectType | GraphQLInterfaceType | GraphQLUnionType | GraphQLEnumType | GraphQLList]) StreamItemResult
Get the next async stream items result.
- get_stream_usage(field_details_list: list[FieldDetails], path: Path) StreamUsage | None
Get stream usage.
Returns an object containing info for streaming if a field should be streamed based on the experimental flag, stream directive present and not disabled by the “if” argument.
- handle_field_error(raw_error: Exception, return_type: GraphQLScalarType | GraphQLObjectType | GraphQLInterfaceType | GraphQLUnionType | GraphQLEnumType | GraphQLList | GraphQLNonNull[GraphQLScalarType | GraphQLObjectType | GraphQLInterfaceType | GraphQLUnionType | GraphQLEnumType | GraphQLList], field_details_list: list[FieldDetails], path: Path, incremental_context: IncrementalContext | None = None) None
Handle error properly according to the field type.
- hide_suggestions: bool
- hooks: ExecutionHooks | None
- static is_async_iterable(value: Any) TypeGuard[AsyncIterable]
Return True if object is an asynchronous iterable.
Instead of testing whether the object is an instance of abc.AsyncIterable, we check the existence of an __aiter__ attribute. This is much faster.
- static is_awaitable(value: Any) TypeGuard[Awaitable]
Return True if object can be passed to an await expression.
Instead of testing whether the object is an instance of abc.Awaitable, we check the existence of an __await__ attribute. This is much faster.
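The attribute checks mentioned for is_async_iterable and is_awaitable can be sketched as below. These helpers are hypothetical and only demonstrate the duck-typing idea; the library's own checks may cover additional cases.

```python
from typing import Any


def looks_awaitable(value: Any) -> bool:
    # Attribute lookup instead of isinstance(value, collections.abc.Awaitable).
    return hasattr(value, "__await__")


def looks_async_iterable(value: Any) -> bool:
    # Attribute lookup instead of isinstance(value, collections.abc.AsyncIterable).
    return hasattr(value, "__aiter__")


async def coroutine_function() -> int:
    return 1


async def async_generator_function():
    yield 1


coroutine = coroutine_function()
is_coroutine_awaitable = looks_awaitable(coroutine)
coroutine.close()  # dispose of the never-awaited coroutine object
```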
- middleware_manager: MiddlewareManager | None
- operation: OperationDefinitionNode
- pending_incremental_futures: set[Future[Any]]
- root_value: Any
- run_async_work_finished_hook() None
Run the hook signaling that all asynchronous work has finished.
If an async_work_finished execution hook is provided, run it as soon as all tracked pending asynchronous work has been settled - synchronously when there is no pending asynchronous work, which allows synchronous execution paths to remain synchronous. Errors raised by the hook are ignored.
- schema: GraphQLSchema
- settle_in_background(awaitables: list[Awaitable[Any]]) None
Settle the given pending awaitables in the background.
A bubbling synchronous error must not wait for pending sibling awaitables, but they must still be settled so that their errors can be observed before they would be orphaned (the Python analog of silencing JS unhandled rejections). Without a running event loop the awaitables can never run; pending coroutines are then closed instead to dispose of them.
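The two branches described above (settle in the background when an event loop is running, close pending coroutines otherwise) can be sketched with asyncio. This is an illustration of the pattern rather than the method's implementation; `sketch_settle_in_background` and the `background` set are hypothetical names.

```python
import asyncio
import inspect
from typing import Any, Awaitable, List, Set


def sketch_settle_in_background(
    awaitables: List[Awaitable[Any]], background: Set["asyncio.Future[Any]"]
) -> None:
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No running event loop: the awaitables can never run, so close
        # pending coroutines to dispose of them.
        for awaitable in awaitables:
            if inspect.iscoroutine(awaitable):
                awaitable.close()
        return

    async def settle() -> None:
        # Collect results and errors so that no error goes unobserved.
        await asyncio.gather(*awaitables, return_exceptions=True)

    task = loop.create_task(settle())
    background.add(task)  # keep a reference until the task is done
    task.add_done_callback(background.discard)


async def boom() -> None:
    raise ValueError("observed in the background")


pending = boom()
tracked: Set["asyncio.Future[Any]"] = set()
sketch_settle_in_background([pending], tracked)  # called without a running loop
```

Keeping the task in a set until it finishes prevents it from being garbage collected while it is still pending.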
- subscribe_field_resolver: GraphQLFieldResolver
- track_async_work(values: Sequence[Any]) None
Track possibly awaitable values as pending asynchronous work.
Awaitables among the given values are settled in the background, so that they are still settled and their errors observed when they would otherwise be abandoned. Non-awaitable values are ignored.
- type_resolver: GraphQLTypeResolver
- variable_values: VariableValues
- async with_abort_signal(awaitable: Awaitable[T]) T
Await a value, but cancel immediately if the abort signal is triggered.
This wraps awaitables returned by resolvers (and awaitable list items) so that a triggered abort signal interrupts execution immediately instead of only at the next field boundary. Without this, a hanging asynchronous resolver would prevent the operation from ever being cancelled.
If the abort signal fires before the awaitable settles, the underlying awaitable is cancelled and the abort reason is raised (an exception reason is raised as is, any other value is reported as an unexpected error value).
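Racing an awaitable against an abort notification can be sketched with asyncio primitives. In the sketch below an `asyncio.Event` plus a separate `reason` argument stand in for the abort signal; this is an assumption made for illustration, as is the name `sketch_with_abort`, and neither reflects the AbortSignal interface.

```python
import asyncio
from typing import Any, Awaitable


async def sketch_with_abort(
    awaitable: Awaitable[Any], aborted: asyncio.Event, reason: Any
) -> Any:
    work = asyncio.ensure_future(awaitable)
    abort = asyncio.ensure_future(aborted.wait())
    try:
        done, _ = await asyncio.wait(
            {work, abort}, return_when=asyncio.FIRST_COMPLETED
        )
        if work in done:
            return work.result()
        # Aborted first: cancel the underlying work and let it settle.
        work.cancel()
        await asyncio.gather(work, return_exceptions=True)
        if isinstance(reason, BaseException):
            raise reason  # an exception reason is raised as is
        raise RuntimeError(f"Unexpected error value: {reason!r}")
    finally:
        abort.cancel()  # stop waiting for the abort notification


async def hang() -> None:
    await asyncio.sleep(10)  # a resolver that would block for a long time


async def run_aborted() -> str:
    aborted = asyncio.Event()
    asyncio.get_running_loop().call_later(0.01, aborted.set)
    try:
        await sketch_with_abort(hang(), aborted, ValueError("stop"))
    except ValueError as error:
        return str(error)
    return "not aborted"


aborted_outcome = asyncio.run(run_aborted())
```

Without the race, the caller would have to wait for the ten-second sleep before noticing the abort.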
- async with_aborted_execution_error(awaitable: Awaitable[T]) T
Await a result, but raise an aborted execution error when aborted.
Unlike with_abort_signal(), the awaited work is not cancelled as a whole when the abort signal is triggered: only its in-flight resolvers are cancelled individually, so that it still settles into a partial result, which is exposed via the raised aborted execution error.
- with_new_execution_groups(result: Awaitable[GraphQLWrappedResult[dict[str, Any]]] | GraphQLWrappedResult[dict[str, Any]], new_pending_execution_groups: list[PendingExecutionGroup]) Awaitable[GraphQLWrappedResult[dict[str, Any]]] | GraphQLWrappedResult[dict[str, Any]]
Add new pending execution groups to result.
- class graphql.execution.ExecutionHooks(async_work_finished: Callable[[AsyncWorkFinishedInfo], None] | None = None)
Bases: NamedTuple
Hooks for observing the execution of a GraphQL operation.
The async_work_finished hook is run when all asynchronous work tracked by the execution has finished. Cancelled asynchronous work may still be running even after the result has been delivered; this hook allows interested execution harnesses to track when this asynchronous work completes. Errors raised by the hook are ignored.
- async_work_finished: Callable[[AsyncWorkFinishedInfo], None] | None
Alias for field number 0
- count(value, /)
Return number of occurrences of value.
- index(value, start=0, stop=9223372036854775807, /)
Return first index of value.
Raises ValueError if the value is not present.
- class graphql.execution.AsyncWorkFinishedInfo(executor: Executor)
Bases: NamedTuple
Information passed to the async_work_finished execution hook.
- count(value, /)
Return number of occurrences of value.
- index(value, start=0, stop=9223372036854775807, /)
Return first index of value.
Raises ValueError if the value is not present.
- exception graphql.execution.AbortedGraphQLExecutionError(reason: Any, result: AwaitableOrValue[Any])
Bases: Exception
An error raised when execution is aborted while work is still resolving.
The message is derived from the abort reason, which is also available via the reason attribute (and as __cause__ when it is an exception). The partial result that the aborted execution can still produce while unwinding is exposed as aborted_result. It is usually provided as an awaitable, since execution has not finished when the error is raised; it is provided as a plain value when the execution was aborted internally during synchronous execution.
- aborted_result: AwaitableOrValue[Any]
- add_note(note, /)
Add a note to the exception
- args
- reason: Any
- with_traceback(tb, /)
Set self.__traceback__ to tb and return self.
- class graphql.execution.ExecutionResult(data: dict[str, Any] | None = None, errors: list[GraphQLError] | None = None, extensions: dict[str, Any] | None = None)
Bases: object
The result of GraphQL execution.
data is the result of a successful execution of the query.
errors is included when any errors occurred as a non-empty list.
extensions is reserved for adding non-standard properties.
- data: dict[str, Any] | None
- errors: list[GraphQLError] | None
- extensions: dict[str, Any] | None
- property formatted: FormattedExecutionResult
Get execution result formatted according to the specification.
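How a result object might map to its formatted dictionary can be sketched as follows. This is an assumption-laden illustration, not the library's code: `SketchResult` is a hypothetical class, plain strings stand in for GraphQLError objects, and omitting the errors and extensions keys when they are unset is an assumption based on the response format of the GraphQL specification.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class SketchResult:
    data: Optional[Dict[str, Any]] = None
    errors: Optional[List[str]] = None  # strings stand in for GraphQLError
    extensions: Optional[Dict[str, Any]] = None

    @property
    def formatted(self) -> Dict[str, Any]:
        formatted: Dict[str, Any] = {"data": self.data}
        if self.errors:
            # Assumption: errors appear only as a non-empty list.
            formatted["errors"] = [{"message": message} for message in self.errors]
        if self.extensions:
            formatted["extensions"] = self.extensions
        return formatted


successful = SketchResult(data={"hello": "world"}).formatted
failed = SketchResult(data=None, errors=["boom"]).formatted
```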
- class graphql.execution.FormattedExecutionResult
Bases: TypedDict
Formatted execution result
- data: dict[str, Any] | None
- errors: list[GraphQLFormattedError]
- extensions: dict[str, Any]
- class graphql.execution.ExperimentalIncrementalExecutionResults(initial_result: InitialIncrementalExecutionResult, subsequent_results: AsyncGenerator[SubsequentIncrementalExecutionResult, None])
Bases: NamedTuple
Execution results when retrieved incrementally.
- count(value, /)
Return number of occurrences of value.
- index(value, start=0, stop=9223372036854775807, /)
Return first index of value.
Raises ValueError if the value is not present.
- initial_result: InitialIncrementalExecutionResult
Alias for field number 0
- subsequent_results: AsyncGenerator[SubsequentIncrementalExecutionResult, None]
Alias for field number 1
- class graphql.execution.InitialIncrementalExecutionResult(data: dict[str, Any] | None = None, errors: list[GraphQLError] | None = None, pending: list[PendingResult] | None = None, has_next: bool = False, extensions: dict[str, Any] | None = None)
Bases: object
Initial incremental execution result.
- data: dict[str, Any] | None
- errors: list[GraphQLError] | None
- extensions: dict[str, Any] | None
- property formatted: FormattedInitialIncrementalExecutionResult
Get execution result formatted according to the specification.
- has_next: bool
- pending: list[PendingResult]
- class graphql.execution.FormattedInitialIncrementalExecutionResult
Bases: TypedDict
Formatted initial incremental execution result
- data: NotRequired[dict[str, Any] | None]
- errors: NotRequired[list[GraphQLFormattedError]]
- extensions: NotRequired[dict[str, Any]]
- hasNext: bool
- incremental: list[FormattedIncrementalResult]
- pending: list[FormattedPendingResult]
- class graphql.execution.SubsequentIncrementalExecutionResult(has_next: bool = False, pending: list[PendingResult] | None = None, incremental: list[IncrementalDeferResult | IncrementalStreamResult] | None = None, completed: list[CompletedResult] | None = None, extensions: dict[str, Any] | None = None)
Bases: object
Subsequent incremental execution result.
- completed: list[CompletedResult] | None
- extensions: dict[str, Any] | None
- property formatted: FormattedSubsequentIncrementalExecutionResult
Get execution result formatted according to the specification.
- has_next: bool
- incremental: list[IncrementalDeferResult | IncrementalStreamResult] | None
- pending: list[PendingResult] | None
- class graphql.execution.FormattedSubsequentIncrementalExecutionResult
Bases: TypedDict
Formatted subsequent incremental execution result
- completed: NotRequired[list[FormattedCompletedResult]]
- extensions: NotRequired[dict[str, Any]]
- hasNext: bool
- incremental: NotRequired[list[FormattedIncrementalResult]]
- pending: NotRequired[list[FormattedPendingResult]]
- class graphql.execution.IncrementalDeferResult(data: dict[str, Any], id: str, sub_path: list[str | int] | None = None, errors: list[GraphQLError] | None = None, extensions: dict[str, Any] | None = None)
Bases: ExecutionGroupResult
Incremental deferred execution result
- data
- errors
- extensions: dict[str, Any] | None
- property formatted: FormattedIncrementalDeferResult
Get execution result formatted according to the specification.
- id: str
- sub_path: list[str | int] | None
- class graphql.execution.FormattedIncrementalDeferResult
Bases: TypedDict
Formatted incremental deferred execution result
- data: dict[str, Any]
- errors: NotRequired[list[GraphQLFormattedError]]
- extensions: NotRequired[dict[str, Any]]
- id: str
- subPath: NotRequired[list[str | int]]
- class graphql.execution.IncrementalStreamResult(items: list[Any], id: str, sub_path: list[str | int] | None = None, errors: list[GraphQLError] | None = None, extensions: dict[str, Any] | None = None)
Bases: StreamItemsRecordResult
Incremental streamed execution result
- errors
- extensions: dict[str, Any] | None
- property formatted: FormattedIncrementalStreamResult
Get execution result formatted according to the specification.
- id: str
- items
- sub_path: list[str | int] | None
- class graphql.execution.FormattedIncrementalStreamResult
Bases: TypedDict
Formatted incremental stream execution result
- errors: NotRequired[list[GraphQLFormattedError]]
- extensions: NotRequired[dict[str, Any]]
- id: str
- subPath: NotRequired[list[str | int]]
- graphql.execution.IncrementalResult
alias of IncrementalDeferResult | IncrementalStreamResult
- graphql.execution.FormattedIncrementalResult
alias of FormattedIncrementalDeferResult | FormattedIncrementalStreamResult
- graphql.execution.subscribe(schema: GraphQLSchema, document: DocumentNode, root_value: Any = None, context_value: Any = None, variable_values: dict[str, Any] | None = None, operation_name: str | None = None, field_resolver: Callable[[...], Any] | None = None, type_resolver: Callable[[Any, GraphQLResolveInfo, GraphQLInterfaceType | GraphQLUnionType], Awaitable[str | None] | str | None] | None = None, subscribe_field_resolver: Callable[[...], Any] | None = None, max_coercion_errors: int = 50, enable_early_execution: bool = False, executor_class: type[Executor] | None = None, middleware: MiddlewareManager | None = None, hide_suggestions: bool = False, **custom_context_args: Any) Awaitable[AsyncIterator[ExecutionResult] | ExecutionResult] | AsyncIterator[ExecutionResult] | ExecutionResult
Create a GraphQL subscription.
Implements the “Subscribe” algorithm described in the GraphQL spec.
Returns a coroutine object which yields either an AsyncIterator (if successful) or an ExecutionResult (client error). The coroutine will raise an exception if a server error occurs.
If the client-provided arguments to this function do not result in a compliant subscription, a GraphQL Response (ExecutionResult) with descriptive errors and no data will be returned.
If the source stream could not be created due to faulty subscription resolver logic or underlying systems, the coroutine object will yield a single ExecutionResult containing errors and no data.
If the operation succeeded, the coroutine will yield an AsyncIterator, which yields a stream of ExecutionResults representing the response stream.
This function does not support incremental delivery (@defer and @stream). If an operation that defers or streams data is executed with this function, a field error will be raised at the location of the @defer or @stream directive.
To customize how each subscription event is executed, compose the subscription pipeline directly instead of calling this function: build an executor with Executor.build(), resolve the source event stream with create_source_event_stream(), and map it to the response stream with map_source_to_response_event(), passing a custom root_selection_set_executor.
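The declared return type allows four shapes: an async iterator, a single result, or an awaitable of either. The sketch below shows one way to normalize them. It is duck-typed and uses only the standard library; fake_stream and fake_subscribe are hypothetical stand-ins for a real subscribe() call.

```python
import asyncio
import inspect
from collections.abc import AsyncIterator
from typing import Any


async def drain(subscription: Any) -> list:
    """Normalize a subscribe()-style return value and collect results."""
    if inspect.isawaitable(subscription):
        subscription = await subscription
    if isinstance(subscription, AsyncIterator):
        # Success: a response stream of execution results.
        return [result async for result in subscription]
    # Otherwise a single result explaining why no stream was created.
    return [subscription]


async def fake_stream():
    # Hypothetical events standing in for ExecutionResult objects.
    yield "event 1"
    yield "event 2"


async def fake_subscribe():
    # Hypothetical stand-in for an awaitable returned by subscribe().
    return fake_stream()


events = asyncio.run(drain(fake_subscribe()))
single = asyncio.run(drain("error result"))
```

A long-lived server would iterate the stream lazily instead of collecting it into a list; the list is used here only to keep the example short.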
- graphql.execution.execute_subscription_event(executor: Executor) Awaitable[ExecutionResult] | ExecutionResult
Execute a single subscription event.
This is the default root_selection_set_executor used by map_source_to_response_event(). It provides the “ExecuteSubscriptionEvent” algorithm described in the GraphQL specification, which is nearly identical to the “ExecuteQuery” algorithm. A custom executor may wrap this function to set up and tear down a per-event executor.
The passed executor should be a per-event executor as created by Executor.build_per_event_executor().
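The wrapping idea mentioned above could look roughly like the following sketch. The helper with_setup_and_teardown and its setup and teardown callbacks are hypothetical names introduced for illustration. In real use the first argument would be execute_subscription_event; a plain lambda stands in for it here so the snippet runs with the standard library alone.

```python
import asyncio
import inspect
from typing import Any, Callable


def with_setup_and_teardown(
    execute_event: Callable[[Any], Any],
    setup: Callable[[Any], None],
    teardown: Callable[[Any], None],
) -> Callable[[Any], Any]:
    """Wrap a per-event executor function with setup/teardown hooks."""

    async def run(executor: Any) -> Any:
        setup(executor)
        try:
            result = execute_event(executor)
            # The wrapped function may return a value or an awaitable.
            if inspect.isawaitable(result):
                result = await result
            return result
        finally:
            # Runs even if executing the event raised.
            teardown(executor)

    return run


log: list = []
wrapped = with_setup_and_teardown(
    lambda executor: f"result for {executor}",  # stand-in executor function
    lambda executor: log.append("setup"),
    lambda executor: log.append("teardown"),
)
outcome = asyncio.run(wrapped("per-event executor"))
```

The wrapper always returns an awaitable, which fits the documented callable shape of an executor argument returning either an awaitable or a plain result.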
- graphql.execution.create_source_event_stream(executor: Executor) Awaitable[AsyncIterable[Any] | ExecutionResult] | AsyncIterable[Any] | ExecutionResult
Create source event stream
Implements the “CreateSourceEventStream” algorithm described in the GraphQL specification, resolving the subscription source event stream for a previously built executor.
Returns a coroutine that yields an AsyncIterable.
If the built executor is invalid, or if the resolved event stream is not an async iterable, a GraphQL Response (ExecutionResult) with descriptive errors and no data will be returned.
If the source stream could not be created due to faulty subscription resolver logic or underlying systems, the coroutine object will yield a single ExecutionResult containing errors and no data.
A source event stream represents a sequence of events, each of which triggers a GraphQL execution for that event.
This may be useful when hosting the stateful subscription service in a different process or machine than the stateless GraphQL execution engine, or otherwise separating these two steps. For more on this, see the “Supporting Subscriptions at Scale” information in the GraphQL spec.
- graphql.execution.map_source_to_response_event(executor: Executor, source_event_stream: AsyncIterable[Any], root_selection_set_executor: Callable[[Executor], Awaitable[ExecutionResult] | ExecutionResult] = execute_subscription_event) AsyncGenerator[ExecutionResult, None]
Map a subscription source event stream to a response event stream.
Implements the “MapSourceToResponseEvent” algorithm described in the GraphQL specification, mapping each event from a subscription source event stream to an ExecutionResult in the response stream.
Each payload yielded from the source event stream is mapped over the normal GraphQL execute() function, with payload as the root_value. Each event is executed with the given root_selection_set_executor, which defaults to execute_subscription_event() (providing the “ExecuteSubscriptionEvent” algorithm) but can be overridden to set up and tear down a custom executor around the execution of each event.
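Conceptually, the mapping is an async generator that runs one execution per source event. The following is a simplified sketch of that idea and not the library's implementation. In particular, the hypothetical execute_event callback receives the payload directly, whereas the documented root_selection_set_executor receives a per-event Executor.

```python
import asyncio
import inspect
from collections.abc import AsyncIterable, AsyncIterator, Callable
from typing import Any


async def map_events(
    source: AsyncIterable[Any],
    execute_event: Callable[[Any], Any],
) -> AsyncIterator[Any]:
    """Yield one result per source event, awaiting async executions."""
    async for payload in source:
        result = execute_event(payload)
        if inspect.isawaitable(result):
            result = await result
        yield result


async def fake_source() -> AsyncIterator[Any]:
    # Hypothetical payloads standing in for a real source event stream.
    for payload in ({"count": 1}, {"count": 2}):
        yield payload


async def demo() -> list:
    # The lambda stands in for executing the operation with the payload
    # as the root value.
    return [r async for r in map_events(fake_source(), lambda p: {"data": p})]


responses = asyncio.run(demo())
```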
- graphql.execution.RootSelectionSetExecutor
alias of Callable[[Executor], Awaitable[ExecutionResult] | ExecutionResult]
- graphql.execution.Middleware
alias of tuple | list | MiddlewareManager | None
- class graphql.execution.MiddlewareManager(*middlewares: Any)
Bases: object
Manager for the middleware chain.
This class helps to wrap resolver functions with the provided middleware functions and/or objects. The functions take the next middleware function as first argument. If middleware is provided as an object, it must provide a method resolve that is used as the middleware function.
Note that since resolvers return “AwaitableOrValue”s, all middleware functions must be aware of this and check whether values are awaitable before awaiting them.
- get_field_resolver(field_resolver: Callable[[...], Any]) Callable[[...], Any]
Wrap the provided resolver with the middleware.
Returns a function that chains the middleware functions with the provided resolver function.
- middlewares
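A middleware function that respects the awaitable-or-value convention might look like the sketch below. The names uppercase_middleware and transform are hypothetical. To keep the snippet runnable with the standard library alone, functools.partial binds the resolver as the "next" argument by hand; this is only a stand-in for the wrapped resolver that MiddlewareManager.get_field_resolver() would return.

```python
import asyncio
import inspect
from functools import partial
from typing import Any, Callable


def transform(value: Any) -> Any:
    """Upper-case strings and pass every other value through."""
    return value.upper() if isinstance(value, str) else value


def uppercase_middleware(
    next_: Callable[..., Any], root: Any, info: Any, **args: Any
) -> Any:
    """Transform the resolved value, whether it arrives sync or async."""
    value = next_(root, info, **args)
    if inspect.isawaitable(value):
        # Defer the transformation until the awaitable has resolved.
        async def await_then_transform() -> Any:
            return transform(await value)

        return await_then_transform()
    return transform(value)


def sync_resolver(root: Any, info: Any, **args: Any) -> str:
    return "hello"


async def async_resolver(root: Any, info: Any, **args: Any) -> str:
    return "hello"


# Bind each resolver as the "next" function by hand.
sync_wrapped = partial(uppercase_middleware, sync_resolver)
async_wrapped = partial(uppercase_middleware, async_resolver)

sync_value = sync_wrapped(None, None)
async_value = asyncio.run(async_wrapped(None, None))
```

Keeping the middleware itself synchronous means a purely synchronous resolver chain stays synchronous; a coroutine is created only when the next function actually returned an awaitable.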
- graphql.execution.get_directive_values(directive_def: GraphQLDirective, node: DirectiveDefinitionNode | DirectiveExtensionNode | EnumValueDefinitionNode | ExecutableDefinitionNode | FieldDefinitionNode | InputValueDefinitionNode | SelectionNode | SchemaDefinitionNode | TypeDefinitionNode | TypeExtensionNode, variable_values: VariableValues | None = None, fragment_variable_values: FragmentVariableValues | None = None, hide_suggestions: bool = False) dict[str, Any] | None
Get coerced argument values based on provided nodes.
Prepares a dict of argument values given a directive definition and an AST node which may contain directives. Optionally also accepts a dict of variable values.
If the directive does not exist on the node, returns None.
- graphql.execution.get_variable_values(schema: GraphQLSchema, var_def_nodes: Collection[VariableDefinitionNode], inputs: dict[str, Any], max_errors: int | None = None, hide_suggestions: bool = False) VariableValuesOrErrors
Get coerced variable values based on provided definitions.
Prepares an object map of variable values of the correct type based on the provided variable definitions and arbitrary input. If the input cannot be parsed to match the variable definitions, a GraphQLError will be raised.
- class graphql.execution.VariableValues(sources: dict[str, VariableValueSource], coerced: dict[str, Any])
Bases: NamedTuple
The coerced values of the variables and their original sources.
- coerced: dict[str, Any]
Alias for field number 1
- count(value, /)
Return number of occurrences of value.
- index(value, start=0, stop=9223372036854775807, /)
Return first index of value.
Raises ValueError if the value is not present.
- sources: dict[str, VariableValueSource]
Alias for field number 0