-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Added ExecutionInput cancellation #3880
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
5f3e632
9c12ab3
9a1fe99
d68efe9
cf2bc61
61ae7b0
136301e
fd302f0
033ba93
bd4a098
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -483,7 +483,6 @@ private CompletableFuture<ExecutionResult> parseValidateAndExecute(ExecutionInpu | |
| private PreparsedDocumentEntry parseAndValidate(AtomicReference<ExecutionInput> executionInputRef, GraphQLSchema graphQLSchema, InstrumentationState instrumentationState) { | ||
|
|
||
| ExecutionInput executionInput = executionInputRef.get(); | ||
| String query = executionInput.getQuery(); | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. never used. Grey code. took the opportunity to remove it |
||
|
|
||
| ParseAndValidateResult parseResult = parse(executionInput, graphQLSchema, instrumentationState); | ||
| if (parseResult.isFailure()) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,11 +22,12 @@ public AbstractAsyncExecutionStrategy(DataFetcherExceptionHandler dataFetcherExc | |
| } | ||
|
|
||
| protected BiConsumer<List<Object>, Throwable> handleResults(ExecutionContext executionContext, List<String> fieldNames, CompletableFuture<ExecutionResult> overallResult) { | ||
| return (List<Object> results, Throwable exception) -> executionContext.run(() -> { | ||
| return (List<Object> results, Throwable exception) -> executionContext.run(exception, () -> { | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. whenever we call There is not point throwing a "cancelled" exception if we are already in exception |
||
| if (exception != null) { | ||
| handleNonNullException(executionContext, overallResult, exception); | ||
| return; | ||
| } | ||
|
|
||
| Map<String, Object> resolvedValuesByField = Maps.newLinkedHashMapWithExpectedSize(fieldNames.size()); | ||
| int ix = 0; | ||
| for (Object result : results) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,15 +1,45 @@ | ||
| package graphql.execution; | ||
|
|
||
| import graphql.ExecutionInput; | ||
| import graphql.ExperimentalApi; | ||
| import graphql.GraphQLContext; | ||
| import org.jspecify.annotations.NullMarked; | ||
|
|
||
| /** | ||
| * This class lets you observe the running state of the graphql-java engine. As it processes and dispatches graphql fields, | ||
| * the engine moves in and out of a running and not running state. As it does this, the callback is called with information telling you the current | ||
| * state. | ||
| * <p> | ||
| * If the engine is cancelled via {@link ExecutionInput#cancel()} then the observer will also be called to indicate that. | ||
| */ | ||
| @ExperimentalApi | ||
| @NullMarked | ||
| public interface EngineRunningObserver { | ||
|
|
||
| enum RunningState { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we add started and finished here to make it complete?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe not .., we have ways to detect that |
||
| /** | ||
| * Represents that the engine code is actively running its own code | ||
| */ | ||
| RUNNING, | ||
| /** | ||
| * Represents that the engine code is asynchronously waiting for fetching to happen | ||
| */ | ||
| NOT_RUNNING, | ||
| /** | ||
| * Represents that the engine code has been cancelled via {@link ExecutionInput#cancel()} | ||
| */ | ||
| CANCELLED | ||
| } | ||
|
|
||
|
|
||
| String ENGINE_RUNNING_OBSERVER_KEY = "__ENGINE_RUNNING_OBSERVER"; | ||
|
|
||
| void runningStateChanged(ExecutionId executionId, GraphQLContext graphQLContext, boolean runningState); | ||
|
|
||
| /** | ||
| * This will be called when the running state of the graphql-java engine changes. | ||
| * | ||
| * @param executionId the id of the current execution | ||
| * @param graphQLContext the graphql context | ||
| */ | ||
| void runningStateChanged(ExecutionId executionId, GraphQLContext graphQLContext, RunningState runningState); | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. made i an enum versus a boolean |
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -10,6 +10,7 @@ | |
| import graphql.Internal; | ||
| import graphql.PublicApi; | ||
| import graphql.collect.ImmutableKit; | ||
| import graphql.execution.EngineRunningObserver.RunningState; | ||
| import graphql.execution.incremental.IncrementalCallState; | ||
| import graphql.execution.instrumentation.Instrumentation; | ||
| import graphql.execution.instrumentation.InstrumentationState; | ||
|
|
@@ -35,6 +36,10 @@ | |
| import java.util.function.Consumer; | ||
| import java.util.function.Supplier; | ||
|
|
||
| import static graphql.execution.EngineRunningObserver.RunningState.CANCELLED; | ||
| import static graphql.execution.EngineRunningObserver.RunningState.NOT_RUNNING; | ||
| import static graphql.execution.EngineRunningObserver.RunningState.RUNNING; | ||
|
|
||
| @SuppressWarnings("TypeParameterUnusedInFormals") | ||
| @PublicApi | ||
| public class ExecutionContext { | ||
|
|
@@ -371,48 +376,89 @@ public boolean isRunning() { | |
| return isRunning.get() > 0; | ||
| } | ||
|
|
||
| public void incrementRunning() { | ||
| if (isRunning.incrementAndGet() == 1 && engineRunningObserver != null) { | ||
| engineRunningObserver.runningStateChanged(executionId, graphQLContext, true); | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. never needed to be public. Never used publically |
||
| private void incrementRunning(Throwable throwable) { | ||
| checkIsCancelled(throwable); | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we now check if we are cancelled in a common place |
||
|
|
||
| if (isRunning.incrementAndGet() == 1) { | ||
| changeOfState(RUNNING); | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. changeOfState encapsulates the null check |
||
| } | ||
| } | ||
|
|
||
| public void decrementRunning() { | ||
| if (isRunning.decrementAndGet() == 0 && engineRunningObserver != null) { | ||
| engineRunningObserver.runningStateChanged(executionId, graphQLContext, false); | ||
| private void decrementRunning(Throwable throwable) { | ||
| checkIsCancelled(throwable); | ||
|
|
||
| if (isRunning.decrementAndGet() == 0) { | ||
| changeOfState(NOT_RUNNING); | ||
| } | ||
| } | ||
|
|
||
| @Internal | ||
| public void incrementRunning(CompletableFuture<?> cf) { | ||
| cf.whenComplete((result, throwable) -> { | ||
| incrementRunning(); | ||
| incrementRunning(throwable); | ||
| }); | ||
| } | ||
|
|
||
| @Internal | ||
| public void decrementRunning(CompletableFuture<?> cf) { | ||
| cf.whenComplete((result, throwable) -> { | ||
| decrementRunning(); | ||
| decrementRunning(throwable); | ||
| }); | ||
|
|
||
| } | ||
|
|
||
| @Internal | ||
| public <T> T call(Supplier<T> callable) { | ||
| incrementRunning(); | ||
| return call(null, callable); | ||
| } | ||
|
|
||
| @Internal | ||
| public <T> T call(Throwable throwable, Supplier<T> callable) { | ||
| incrementRunning(throwable); | ||
| try { | ||
| return callable.get(); | ||
| } finally { | ||
| decrementRunning(); | ||
| decrementRunning(throwable); | ||
| } | ||
| } | ||
|
|
||
| @Internal | ||
| public void run(Runnable runnable) { | ||
| incrementRunning(); | ||
| run(null, runnable); | ||
| } | ||
|
|
||
| @Internal | ||
| public void run(Throwable throwable, Runnable runnable) { | ||
| incrementRunning(throwable); | ||
| try { | ||
| runnable.run(); | ||
| } finally { | ||
| decrementRunning(); | ||
| decrementRunning(throwable); | ||
| } | ||
| } | ||
|
|
||
| private void checkIsCancelled(Throwable currentThrowable) { | ||
| // no need to check we are cancelled if we already have an exception in play | ||
| // since it can lead to an exception being thrown when an exception has already been | ||
| // thrown | ||
| if (currentThrowable == null) { | ||
| checkIsCancelled(); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * This will abort the execution via {@link AbortExecutionException} if the {@link ExecutionInput} has been cancelled | ||
| */ | ||
| private void checkIsCancelled() { | ||
| if (executionInput.isCancelled()) { | ||
| changeOfState(CANCELLED); | ||
| throw new AbortExecutionException("Execution has been asked to be cancelled"); | ||
| } | ||
| } | ||
|
|
||
| private void changeOfState(RunningState runningState) { | ||
| if (engineRunningObserver != null) { | ||
| engineRunningObserver.runningStateChanged(executionId, graphQLContext, runningState); | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -68,19 +68,18 @@ public CompletableFuture<ExecutionResult> execute(ExecutionContext executionCont | |
|
|
||
| // | ||
| // when the upstream source event stream completes, subscribe to it and wire in our adapter | ||
| CompletableFuture<ExecutionResult> overallResult = sourceEventStream.thenApply((publisher) -> { | ||
| return executionContext.call(() -> { | ||
| if (publisher == null) { | ||
| ExecutionResultImpl executionResult = new ExecutionResultImpl(null, executionContext.getErrors()); | ||
| CompletableFuture<ExecutionResult> overallResult = sourceEventStream.thenApply((publisher) -> | ||
| executionContext.call(() -> { | ||
| if (publisher == null) { | ||
| ExecutionResultImpl executionResult = new ExecutionResultImpl(null, executionContext.getErrors()); | ||
| return executionResult; | ||
| } | ||
| Function<Object, CompletionStage<ExecutionResult>> mapperFunction = eventPayload -> executeSubscriptionEvent(executionContext, parameters, eventPayload); | ||
| boolean keepOrdered = keepOrdered(executionContext.getGraphQLContext()); | ||
| SubscriptionPublisher mapSourceToResponse = new SubscriptionPublisher(publisher, mapperFunction, keepOrdered); | ||
| ExecutionResultImpl executionResult = new ExecutionResultImpl(mapSourceToResponse, executionContext.getErrors()); | ||
| return executionResult; | ||
| } | ||
| Function<Object, CompletionStage<ExecutionResult>> mapperFunction = eventPayload -> executeSubscriptionEvent(executionContext, parameters, eventPayload); | ||
| boolean keepOrdered = keepOrdered(executionContext.getGraphQLContext()); | ||
| SubscriptionPublisher mapSourceToResponse = new SubscriptionPublisher(publisher, mapperFunction, keepOrdered); | ||
| ExecutionResultImpl executionResult = new ExecutionResultImpl(mapSourceToResponse, executionContext.getErrors()); | ||
| return executionResult; | ||
| }); | ||
| }); | ||
| })); | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. looks like formatting |
||
|
|
||
| // dispatched the subscription query | ||
| executionStrategyCtx.onDispatched(); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can rename because its private method. This is a transferring of the context to a new instance