Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 37 additions & 4 deletions src/main/java/graphql/ExecutionInput.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@

import java.util.Locale;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

import static graphql.Assert.assertNotNull;
import static graphql.execution.instrumentation.dataloader.EmptyDataLoaderRegistryInstance.EMPTY_DATALOADER_REGISTRY;
Expand All @@ -29,6 +29,7 @@ public class ExecutionInput {
private final DataLoaderRegistry dataLoaderRegistry;
private final ExecutionId executionId;
private final Locale locale;
private final AtomicBoolean cancelled;


@Internal
Expand All @@ -44,6 +45,7 @@ private ExecutionInput(Builder builder) {
this.locale = builder.locale != null ? builder.locale : Locale.getDefault(); // always have a locale in place
this.localContext = builder.localContext;
this.extensions = builder.extensions;
this.cancelled = builder.cancelled;
}

/**
Expand Down Expand Up @@ -139,6 +141,28 @@ public Map<String, Object> getExtensions() {
return extensions;
}


/**
* The graphql engine will check this frequently and if that is true, it will
* throw a {@link graphql.execution.AbortExecutionException} to cancel the execution.
* <p>
* This is a cooperative cancellation. Some asynchronous data fetching code may still continue to
* run but there will be no more efforts run future field fetches say.
*
* @return true if the execution should be cancelled
*/
public boolean isCancelled() {
return cancelled.get();
}

/**
* This can be called to cancel the graphql execution. Remember this is a cooperative cancellation
* and the graphql engine needs to be running on a thread to allow is to respect this flag.
*/
public void cancel() {
cancelled.set(true);
}

/**
* This helps you transform the current ExecutionInput object into another one by starting a builder with all
* the current values and allows you to transform it how you want.
Expand All @@ -152,7 +176,8 @@ public ExecutionInput transform(Consumer<Builder> builderConsumer) {
.query(this.query)
.operationName(this.operationName)
.context(this.context)
.transfer(this.graphQLContext)
.internalTransferContext(this.graphQLContext)
.internalTransferCancelBoolean(this.cancelled)
.localContext(this.localContext)
.root(this.root)
.dataLoaderRegistry(this.dataLoaderRegistry)
Expand Down Expand Up @@ -208,14 +233,15 @@ public static class Builder {
private Object localContext;
private Object root;
private RawVariables rawVariables = RawVariables.emptyVariables();
public Map<String, Object> extensions = ImmutableKit.emptyMap();
private Map<String, Object> extensions = ImmutableKit.emptyMap();
//
// this is important - it allows code to later known if we never really set a dataloader and hence it can optimize
// dataloader field tracking away.
//
private DataLoaderRegistry dataLoaderRegistry = EMPTY_DATALOADER_REGISTRY;
private Locale locale = Locale.getDefault();
private ExecutionId executionId;
private AtomicBoolean cancelled = new AtomicBoolean(false);

public Builder query(String query) {
this.query = assertNotNull(query, () -> "query can't be null");
Expand Down Expand Up @@ -306,11 +332,18 @@ public Builder graphQLContext(Map<?, Object> mapOfContext) {
}

// hidden on purpose
private Builder transfer(GraphQLContext graphQLContext) {
private Builder internalTransferContext(GraphQLContext graphQLContext) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can rename because its private method. This is a transferring of the context to a new instance

this.graphQLContext = Assert.assertNotNull(graphQLContext);
return this;
}

// hidden on purpose
private Builder internalTransferCancelBoolean(AtomicBoolean cancelled) {
this.cancelled = cancelled;
return this;
}


public Builder root(Object root) {
this.root = root;
return this;
Expand Down
1 change: 0 additions & 1 deletion src/main/java/graphql/GraphQL.java
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,6 @@ private CompletableFuture<ExecutionResult> parseValidateAndExecute(ExecutionInpu
private PreparsedDocumentEntry parseAndValidate(AtomicReference<ExecutionInput> executionInputRef, GraphQLSchema graphQLSchema, InstrumentationState instrumentationState) {

ExecutionInput executionInput = executionInputRef.get();
String query = executionInput.getQuery();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

never used. Grey code. took the opportunity to remove it


ParseAndValidateResult parseResult = parse(executionInput, graphQLSchema, instrumentationState);
if (parseResult.isFailure()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ public AbstractAsyncExecutionStrategy(DataFetcherExceptionHandler dataFetcherExc
}

protected BiConsumer<List<Object>, Throwable> handleResults(ExecutionContext executionContext, List<String> fieldNames, CompletableFuture<ExecutionResult> overallResult) {
return (List<Object> results, Throwable exception) -> executionContext.run(() -> {
return (List<Object> results, Throwable exception) -> executionContext.run(exception, () -> {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whenever we call run here we need to respect if an exception was thrown on the inside when we check if we are cancelled.

There is not point throwing a "cancelled" exception if we are already in exception

if (exception != null) {
handleNonNullException(executionContext, overallResult, exception);
return;
}

Map<String, Object> resolvedValuesByField = Maps.newLinkedHashMapWithExpectedSize(fieldNames.size());
int ix = 0;
for (Object result : results) {
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/graphql/execution/AsyncExecutionStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public CompletableFuture<ExecutionResult> execute(ExecutionContext executionCont
executionStrategyCtx.onDispatched();

futures.await().whenComplete((completeValueInfos, throwable) -> {
executionContext.run(() -> {
executionContext.run(throwable,() -> {
List<String> fieldsExecutedOnInitialResult = deferredExecutionSupport.getNonDeferredFieldNames(fieldNames);

BiConsumer<List<Object>, Throwable> handleResultsConsumer = handleResults(executionContext, fieldsExecutedOnInitialResult, overallResult);
Expand All @@ -78,7 +78,7 @@ public CompletableFuture<ExecutionResult> execute(ExecutionContext executionCont
executionStrategyCtx.onFieldValuesInfo(completeValueInfos);
fieldValuesFutures.await().whenComplete(handleResultsConsumer);
});
}).exceptionally((ex) -> executionContext.call(() -> {
}).exceptionally((ex) -> executionContext.call(ex,() -> {
// if there are any issues with combining/handling the field results,
// complete the future at all costs and bubble up any thrown exception so
// the execution does not hang.
Expand Down
32 changes: 31 additions & 1 deletion src/main/java/graphql/execution/EngineRunningObserver.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,45 @@
package graphql.execution;

import graphql.ExecutionInput;
import graphql.ExperimentalApi;
import graphql.GraphQLContext;
import org.jspecify.annotations.NullMarked;

/**
* This class lets you observe the running state of the graphql-java engine. As it processes and dispatches graphql fields,
* the engine moves in and out of a running and not running state. As it does this, the callback is called with information telling you the current
* state.
* <p>
* If the engine is cancelled via {@link ExecutionInput#cancel()} then the observer will also be called to indicate that.
*/
@ExperimentalApi
@NullMarked
public interface EngineRunningObserver {

enum RunningState {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add started and finished here to make it complete?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe not .., we have ways to detect that

/**
* Represents that the engine code is actively running its own code
*/
RUNNING,
/**
* Represents that the engine code is asynchronously waiting for fetching to happen
*/
NOT_RUNNING,
/**
* Represents that the engine code has been cancelled via {@link ExecutionInput#cancel()}
*/
CANCELLED
}


String ENGINE_RUNNING_OBSERVER_KEY = "__ENGINE_RUNNING_OBSERVER";

void runningStateChanged(ExecutionId executionId, GraphQLContext graphQLContext, boolean runningState);

/**
* This will be called when the running state of the graphql-java engine changes.
*
* @param executionId the id of the current execution
* @param graphQLContext the graphql context
*/
void runningStateChanged(ExecutionId executionId, GraphQLContext graphQLContext, RunningState runningState);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

made i an enum versus a boolean

}
70 changes: 58 additions & 12 deletions src/main/java/graphql/execution/ExecutionContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import graphql.Internal;
import graphql.PublicApi;
import graphql.collect.ImmutableKit;
import graphql.execution.EngineRunningObserver.RunningState;
import graphql.execution.incremental.IncrementalCallState;
import graphql.execution.instrumentation.Instrumentation;
import graphql.execution.instrumentation.InstrumentationState;
Expand All @@ -35,6 +36,10 @@
import java.util.function.Consumer;
import java.util.function.Supplier;

import static graphql.execution.EngineRunningObserver.RunningState.CANCELLED;
import static graphql.execution.EngineRunningObserver.RunningState.NOT_RUNNING;
import static graphql.execution.EngineRunningObserver.RunningState.RUNNING;

@SuppressWarnings("TypeParameterUnusedInFormals")
@PublicApi
public class ExecutionContext {
Expand Down Expand Up @@ -371,48 +376,89 @@ public boolean isRunning() {
return isRunning.get() > 0;
}

public void incrementRunning() {
if (isRunning.incrementAndGet() == 1 && engineRunningObserver != null) {
engineRunningObserver.runningStateChanged(executionId, graphQLContext, true);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

never needed to be public. Never used publically

private void incrementRunning(Throwable throwable) {
checkIsCancelled(throwable);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we now check if we are cancelled in a common place


if (isRunning.incrementAndGet() == 1) {
changeOfState(RUNNING);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changeOfState encapsulates the null check

}
}

public void decrementRunning() {
if (isRunning.decrementAndGet() == 0 && engineRunningObserver != null) {
engineRunningObserver.runningStateChanged(executionId, graphQLContext, false);
private void decrementRunning(Throwable throwable) {
checkIsCancelled(throwable);

if (isRunning.decrementAndGet() == 0) {
changeOfState(NOT_RUNNING);
}
}

@Internal
public void incrementRunning(CompletableFuture<?> cf) {
cf.whenComplete((result, throwable) -> {
incrementRunning();
incrementRunning(throwable);
});
}

@Internal
public void decrementRunning(CompletableFuture<?> cf) {
cf.whenComplete((result, throwable) -> {
decrementRunning();
decrementRunning(throwable);
});

}

@Internal
public <T> T call(Supplier<T> callable) {
incrementRunning();
return call(null, callable);
}

@Internal
public <T> T call(Throwable throwable, Supplier<T> callable) {
incrementRunning(throwable);
try {
return callable.get();
} finally {
decrementRunning();
decrementRunning(throwable);
}
}

@Internal
public void run(Runnable runnable) {
incrementRunning();
run(null, runnable);
}

@Internal
public void run(Throwable throwable, Runnable runnable) {
incrementRunning(throwable);
try {
runnable.run();
} finally {
decrementRunning();
decrementRunning(throwable);
}
}

private void checkIsCancelled(Throwable currentThrowable) {
// no need to check we are cancelled if we already have an exception in play
// since it can lead to an exception being thrown when an exception has already been
// thrown
if (currentThrowable == null) {
checkIsCancelled();
}
}

/**
* This will abort the execution via {@link AbortExecutionException} if the {@link ExecutionInput} has been cancelled
*/
private void checkIsCancelled() {
if (executionInput.isCancelled()) {
changeOfState(CANCELLED);
throw new AbortExecutionException("Execution has been asked to be cancelled");
}
}

private void changeOfState(RunningState runningState) {
if (engineRunningObserver != null) {
engineRunningObserver.runningStateChanged(executionId, graphQLContext, runningState);
}
}
}
8 changes: 4 additions & 4 deletions src/main/java/graphql/execution/ExecutionStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ protected Object executeObject(ExecutionContext executionContext, ExecutionStrat
if (fieldValueInfosResult instanceof CompletableFuture) {
CompletableFuture<List<FieldValueInfo>> fieldValueInfos = (CompletableFuture<List<FieldValueInfo>>) fieldValueInfosResult;
fieldValueInfos.whenComplete((completeValueInfos, throwable) -> {
executionContext.run(() -> {
executionContext.run(throwable,() -> {
if (throwable != null) {
handleResultsConsumer.accept(null, throwable);
return;
Expand Down Expand Up @@ -280,7 +280,7 @@ protected Object executeObject(ExecutionContext executionContext, ExecutionStrat

private BiConsumer<List<Object>, Throwable> buildFieldValueMap(List<String> fieldNames, CompletableFuture<Map<String, Object>> overallResult, ExecutionContext executionContext) {
return (List<Object> results, Throwable exception) -> {
executionContext.run(() -> {
executionContext.run(exception,() -> {
if (exception != null) {
handleValueException(overallResult, exception, executionContext);
return;
Expand Down Expand Up @@ -488,7 +488,7 @@ private Object fetchField(GraphQLFieldDefinition fieldDef, ExecutionContext exec
CompletableFuture<Object> fetchedValue = (CompletableFuture<Object>) fetchedObject;
executionContext.decrementRunning(fetchedValue);
CompletableFuture<FetchedValue> fetchedValueCF = fetchedValue
.handle((result, exception) -> executionContext.call(() -> {
.handle((result, exception) -> executionContext.call(exception,() -> {
fetchCtx.onCompleted(result, exception);
if (exception != null) {
CompletableFuture<Object> handleFetchingExceptionResult = handleFetchingException(dataFetchingEnvironment.get(), parameters, exception);
Expand Down Expand Up @@ -819,7 +819,7 @@ protected FieldValueInfo completeValueForList(ExecutionContext executionContext,
overallResult.whenComplete(completeListCtx::onCompleted);

resultsFuture.whenComplete((results, exception) -> {
executionContext.run(() -> {
executionContext.run(exception,() -> {
if (exception != null) {
handleValueException(overallResult, exception, executionContext);
return;
Expand Down
23 changes: 11 additions & 12 deletions src/main/java/graphql/execution/SubscriptionExecutionStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,19 +68,18 @@ public CompletableFuture<ExecutionResult> execute(ExecutionContext executionCont

//
// when the upstream source event stream completes, subscribe to it and wire in our adapter
CompletableFuture<ExecutionResult> overallResult = sourceEventStream.thenApply((publisher) -> {
return executionContext.call(() -> {
if (publisher == null) {
ExecutionResultImpl executionResult = new ExecutionResultImpl(null, executionContext.getErrors());
CompletableFuture<ExecutionResult> overallResult = sourceEventStream.thenApply((publisher) ->
executionContext.call(() -> {
if (publisher == null) {
ExecutionResultImpl executionResult = new ExecutionResultImpl(null, executionContext.getErrors());
return executionResult;
}
Function<Object, CompletionStage<ExecutionResult>> mapperFunction = eventPayload -> executeSubscriptionEvent(executionContext, parameters, eventPayload);
boolean keepOrdered = keepOrdered(executionContext.getGraphQLContext());
SubscriptionPublisher mapSourceToResponse = new SubscriptionPublisher(publisher, mapperFunction, keepOrdered);
ExecutionResultImpl executionResult = new ExecutionResultImpl(mapSourceToResponse, executionContext.getErrors());
return executionResult;
}
Function<Object, CompletionStage<ExecutionResult>> mapperFunction = eventPayload -> executeSubscriptionEvent(executionContext, parameters, eventPayload);
boolean keepOrdered = keepOrdered(executionContext.getGraphQLContext());
SubscriptionPublisher mapSourceToResponse = new SubscriptionPublisher(publisher, mapperFunction, keepOrdered);
ExecutionResultImpl executionResult = new ExecutionResultImpl(mapSourceToResponse, executionContext.getErrors());
return executionResult;
});
});
}));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like formatting


// dispatched the subscription query
executionStrategyCtx.onDispatched();
Expand Down
Loading
Loading