Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/main/java/graphql/execution/AsyncExecutionStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,14 @@ public CompletableFuture<ExecutionResult> execute(ExecutionContext executionCont
for (FieldValueInfo completeValueInfo : completeValueInfos) {
fieldValuesFutures.addObject(completeValueInfo.getFieldValueObject());
}
dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesInfo(completeValueInfos, parameters);
dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesInfo(completeValueInfos);
executionStrategyCtx.onFieldValuesInfo(completeValueInfos);
fieldValuesFutures.await().whenComplete(handleResultsConsumer);
}).exceptionally((ex) -> {
// if there are any issues with combining/handling the field results,
// complete the future at all costs and bubble up any thrown exception so
// the execution does not hang.
dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesException(ex, parameters);
dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesException(ex);
executionStrategyCtx.onFieldValuesException();
overallResult.completeExceptionally(ex);
return null;
Expand Down
23 changes: 21 additions & 2 deletions src/main/java/graphql/execution/AsyncSerialExecutionStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public AsyncSerialExecutionStrategy(DataFetcherExceptionHandler exceptionHandler
@Override
@SuppressWarnings({"TypeParameterUnusedInFormals", "FutureReturnValueIgnored"})
public CompletableFuture<ExecutionResult> execute(ExecutionContext executionContext, ExecutionStrategyParameters parameters) throws NonNullableFieldWasNullException {
executionContext.getDataLoaderDispatcherStrategy().executionStrategy(executionContext, parameters);
DataLoaderDispatchStrategy dataLoaderDispatcherStrategy = executionContext.getDataLoaderDispatcherStrategy();

Instrumentation instrumentation = executionContext.getInstrumentation();
InstrumentationExecutionStrategyParameters instrumentationParameters = new InstrumentationExecutionStrategyParameters(executionContext, parameters);
Expand All @@ -54,7 +54,8 @@ public CompletableFuture<ExecutionResult> execute(ExecutionContext executionCont
ResultPath fieldPath = parameters.getPath().segment(mkNameForPath(currentField));
ExecutionStrategyParameters newParameters = parameters
.transform(builder -> builder.field(currentField).path(fieldPath));
return resolveField(executionContext, newParameters);

return resolveSerialField(executionContext, dataLoaderDispatcherStrategy, newParameters);
});

CompletableFuture<ExecutionResult> overallResult = new CompletableFuture<>();
Expand All @@ -65,4 +66,22 @@ public CompletableFuture<ExecutionResult> execute(ExecutionContext executionCont
return overallResult;
}

private Object resolveSerialField(ExecutionContext executionContext,
DataLoaderDispatchStrategy dataLoaderDispatcherStrategy,
ExecutionStrategyParameters newParameters) {
dataLoaderDispatcherStrategy.executionSerialStrategy(executionContext, newParameters);

Object fieldWithInfo = resolveFieldWithInfo(executionContext, newParameters);
if (fieldWithInfo instanceof CompletableFuture) {
//noinspection unchecked
return ((CompletableFuture<FieldValueInfo>) fieldWithInfo).thenCompose(fvi -> {
dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesInfo(List.of(fvi));
return fvi.getFieldValueFuture();
});
} else {
FieldValueInfo fvi = (FieldValueInfo) fieldWithInfo;
dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesInfo(List.of(fvi));
return fvi.getFieldValueObject();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@ default void executionStrategy(ExecutionContext executionContext, ExecutionStrat

}

default void executionStrategyOnFieldValuesInfo(List<FieldValueInfo> fieldValueInfoList, ExecutionStrategyParameters parameters) {
default void executionSerialStrategy(ExecutionContext executionContext, ExecutionStrategyParameters parameters) {

}

default void executionStrategyOnFieldValuesException(Throwable t, ExecutionStrategyParameters parameters) {
default void executionStrategyOnFieldValuesInfo(List<FieldValueInfo> fieldValueInfoList) {

}

default void executionStrategyOnFieldValuesException(Throwable t) {

}

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/graphql/execution/Execution.java
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ private DataLoaderDispatchStrategy createDataLoaderDispatchStrategy(ExecutionCon
if (executionContext.getDataLoaderRegistry() == EMPTY_DATALOADER_REGISTRY || doNotAutomaticallyDispatchDataLoader) {
return DataLoaderDispatchStrategy.NO_OP;
}
if (executionStrategy instanceof AsyncExecutionStrategy) {
if (! executionContext.isSubscriptionOperation()) {
Copy link
Member Author

@bbakerman bbakerman Nov 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

code under here then uses PerLevelDataLoaderDispatchStrategy

boolean deferEnabled = Optional.ofNullable(executionContext.getGraphQLContext())
.map(graphqlContext -> graphqlContext.getBoolean(ExperimentalApi.ENABLE_INCREMENTAL_SUPPORT))
.orElse(false);
Expand Down
28 changes: 28 additions & 0 deletions src/main/java/graphql/execution/ExecutionContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,34 @@ public ValueUnboxer getValueUnboxer() {
return valueUnboxer;
}

/**
* @return true if the current operation is a Query
*/
public boolean isQueryOperation() {
return isOpType(OperationDefinition.Operation.QUERY);
}

/**
* @return true if the current operation is a Mutation
*/
public boolean isMutationOperation() {
return isOpType(OperationDefinition.Operation.MUTATION);
}

/**
* @return true if the current operation is a Subscription
*/
public boolean isSubscriptionOperation() {
return isOpType(OperationDefinition.Operation.SUBSCRIPTION);
}

private boolean isOpType(OperationDefinition.Operation operation) {
if (operationDefinition != null) {
return operation.equals(operationDefinition.getOperation());
}
return false;
}

/**
* This method will only put one error per field path.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@


/**
* Used when the execution strategy is not an AsyncExecutionStrategy: simply dispatch always after each DF.
* Used when we cant guarantee the fields will be counted right: simply dispatch always after each DF.
*/
@Internal
public class FallbackDataLoaderDispatchStrategy implements DataLoaderDispatchStrategy {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import graphql.execution.ExecutionContext;
import graphql.execution.ExecutionStrategyParameters;
import graphql.execution.FieldValueInfo;
import graphql.execution.MergedField;
import graphql.schema.DataFetcher;
import graphql.util.LockKit;
import org.dataloader.DataLoaderRegistry;
Expand All @@ -27,55 +26,81 @@ private static class CallStack {
private final LockKit.ReentrantLock lock = new LockKit.ReentrantLock();
private final LevelMap expectedFetchCountPerLevel = new LevelMap();
private final LevelMap fetchCountPerLevel = new LevelMap();
private final LevelMap expectedStrategyCallsPerLevel = new LevelMap();
private final LevelMap happenedStrategyCallsPerLevel = new LevelMap();

private final LevelMap expectedExecuteObjectCallsPerLevel = new LevelMap();
private final LevelMap happenedExecuteObjectCallsPerLevel = new LevelMap();

private final LevelMap happenedOnFieldValueCallsPerLevel = new LevelMap();

private final Set<Integer> dispatchedLevels = new LinkedHashSet<>();

public CallStack() {
expectedStrategyCallsPerLevel.set(1, 1);
expectedExecuteObjectCallsPerLevel.set(1, 1);
}

void increaseExpectedFetchCount(int level, int count) {
expectedFetchCountPerLevel.increment(level, count);
}

void clearExpectedFetchCount() {
expectedFetchCountPerLevel.clear();
}

void increaseFetchCount(int level) {
fetchCountPerLevel.increment(level, 1);
}

void increaseExpectedStrategyCalls(int level, int count) {
expectedStrategyCallsPerLevel.increment(level, count);
void clearFetchCount() {
fetchCountPerLevel.clear();
}

void increaseExpectedExecuteObjectCalls(int level, int count) {
expectedExecuteObjectCallsPerLevel.increment(level, count);
}

void increaseHappenedStrategyCalls(int level) {
happenedStrategyCallsPerLevel.increment(level, 1);
void clearExpectedObjectCalls() {
expectedExecuteObjectCallsPerLevel.clear();
}

void increaseHappenedExecuteObjectCalls(int level) {
happenedExecuteObjectCallsPerLevel.increment(level, 1);
}

void clearHappenedExecuteObjectCalls() {
happenedExecuteObjectCallsPerLevel.clear();
}

void increaseHappenedOnFieldValueCalls(int level) {
happenedOnFieldValueCallsPerLevel.increment(level, 1);
}

boolean allStrategyCallsHappened(int level) {
return happenedStrategyCallsPerLevel.get(level) == expectedStrategyCallsPerLevel.get(level);
void clearHappenedOnFieldValueCalls() {
happenedOnFieldValueCallsPerLevel.clear();
}

boolean allExecuteObjectCallsHappened(int level) {
return happenedExecuteObjectCallsPerLevel.get(level) == expectedExecuteObjectCallsPerLevel.get(level);
}

boolean allOnFieldCallsHappened(int level) {
return happenedOnFieldValueCallsPerLevel.get(level) == expectedStrategyCallsPerLevel.get(level);
return happenedOnFieldValueCallsPerLevel.get(level) == expectedExecuteObjectCallsPerLevel.get(level);
}

boolean allFetchesHappened(int level) {
return fetchCountPerLevel.get(level) == expectedFetchCountPerLevel.get(level);
}

void clearDispatchLevels() {
dispatchedLevels.clear();
}

@Override
public String toString() {
return "CallStack{" +
"expectedFetchCountPerLevel=" + expectedFetchCountPerLevel +
", fetchCountPerLevel=" + fetchCountPerLevel +
", expectedStrategyCallsPerLevel=" + expectedStrategyCallsPerLevel +
", happenedStrategyCallsPerLevel=" + happenedStrategyCallsPerLevel +
", expectedExecuteObjectCallsPerLevel=" + expectedExecuteObjectCallsPerLevel +
", happenedExecuteObjectCallsPerLevel=" + happenedExecuteObjectCallsPerLevel +
", happenedOnFieldValueCallsPerLevel=" + happenedOnFieldValueCallsPerLevel +
", dispatchedLevels" + dispatchedLevels +
'}';
Expand Down Expand Up @@ -105,33 +130,37 @@ public void executeDeferredOnFieldValueInfo(FieldValueInfo fieldValueInfo, Execu
@Override
public void executionStrategy(ExecutionContext executionContext, ExecutionStrategyParameters parameters) {
int curLevel = parameters.getExecutionStepInfo().getPath().getLevel() + 1;
increaseCallCounts(curLevel, parameters);
increaseHappenedExecuteObjectAndIncreaseExpectedFetchCount(curLevel, parameters);
}

@Override
public void executionStrategyOnFieldValuesInfo(List<FieldValueInfo> fieldValueInfoList, ExecutionStrategyParameters parameters) {
int curLevel = parameters.getPath().getLevel() + 1;
onFieldValuesInfoDispatchIfNeeded(fieldValueInfoList, curLevel, parameters);
public void executionSerialStrategy(ExecutionContext executionContext, ExecutionStrategyParameters parameters) {
resetCallStack();
increaseHappenedExecuteObjectAndIncreaseExpectedFetchCount(1, 1);
}

@Override
public void executionStrategyOnFieldValuesInfo(List<FieldValueInfo> fieldValueInfoList) {
onFieldValuesInfoDispatchIfNeeded(fieldValueInfoList, 1);
}

public void executionStrategyOnFieldValuesException(Throwable t, ExecutionStrategyParameters executionStrategyParameters) {
int curLevel = executionStrategyParameters.getPath().getLevel() + 1;
public void executionStrategyOnFieldValuesException(Throwable t) {
callStack.lock.runLocked(() ->
callStack.increaseHappenedOnFieldValueCalls(curLevel)
callStack.increaseHappenedOnFieldValueCalls(1)
);
}


@Override
public void executeObject(ExecutionContext executionContext, ExecutionStrategyParameters parameters) {
int curLevel = parameters.getExecutionStepInfo().getPath().getLevel() + 1;
increaseCallCounts(curLevel, parameters);
increaseHappenedExecuteObjectAndIncreaseExpectedFetchCount(curLevel, parameters);
}

@Override
public void executeObjectOnFieldValuesInfo(List<FieldValueInfo> fieldValueInfoList, ExecutionStrategyParameters parameters) {
int curLevel = parameters.getPath().getLevel() + 1;
onFieldValuesInfoDispatchIfNeeded(fieldValueInfoList, curLevel, parameters);
onFieldValuesInfoDispatchIfNeeded(fieldValueInfoList, curLevel);
}


Expand All @@ -143,16 +172,30 @@ public void executeObjectOnFieldValuesException(Throwable t, ExecutionStrategyPa
);
}

private void increaseHappenedExecuteObjectAndIncreaseExpectedFetchCount(int curLevel, ExecutionStrategyParameters executionStrategyParameters) {
increaseHappenedExecuteObjectAndIncreaseExpectedFetchCount(curLevel, executionStrategyParameters.getFields().size());
}

private void increaseCallCounts(int curLevel, ExecutionStrategyParameters executionStrategyParameters) {
int fieldCount = executionStrategyParameters.getFields().size();
private void increaseHappenedExecuteObjectAndIncreaseExpectedFetchCount(int curLevel, int fieldCount) {
callStack.lock.runLocked(() -> {
callStack.increaseHappenedExecuteObjectCalls(curLevel);
callStack.increaseExpectedFetchCount(curLevel, fieldCount);
callStack.increaseHappenedStrategyCalls(curLevel);
});
}

private void onFieldValuesInfoDispatchIfNeeded(List<FieldValueInfo> fieldValueInfoList, int curLevel, ExecutionStrategyParameters parameters) {
private void resetCallStack() {
callStack.lock.runLocked(() -> {
callStack.clearDispatchLevels();
callStack.clearExpectedObjectCalls();
callStack.clearExpectedFetchCount();
callStack.clearFetchCount();
callStack.clearHappenedExecuteObjectCalls();
callStack.clearHappenedOnFieldValueCalls();
callStack.expectedExecuteObjectCallsPerLevel.set(1, 1);
});
}

private void onFieldValuesInfoDispatchIfNeeded(List<FieldValueInfo> fieldValueInfoList, int curLevel) {
boolean dispatchNeeded = callStack.lock.callLocked(() ->
handleOnFieldValuesInfo(fieldValueInfoList, curLevel)
);
Expand All @@ -166,18 +209,21 @@ private void onFieldValuesInfoDispatchIfNeeded(List<FieldValueInfo> fieldValueIn
//
private boolean handleOnFieldValuesInfo(List<FieldValueInfo> fieldValueInfos, int curLevel) {
callStack.increaseHappenedOnFieldValueCalls(curLevel);
int expectedStrategyCalls = getCountForList(fieldValueInfos);
callStack.increaseExpectedStrategyCalls(curLevel + 1, expectedStrategyCalls);
int expectedOnObjectCalls = getObjectCountForList(fieldValueInfos);
callStack.increaseExpectedExecuteObjectCalls(curLevel + 1, expectedOnObjectCalls);
return dispatchIfNeeded(curLevel + 1);
}

private int getCountForList(List<FieldValueInfo> fieldValueInfos) {
/**
* the amount of (non nullable) objects that will require an execute object call
*/
private int getObjectCountForList(List<FieldValueInfo> fieldValueInfos) {
int result = 0;
for (FieldValueInfo fieldValueInfo : fieldValueInfos) {
if (fieldValueInfo.getCompleteValueType() == FieldValueInfo.CompleteValueType.OBJECT) {
result += 1;
} else if (fieldValueInfo.getCompleteValueType() == FieldValueInfo.CompleteValueType.LIST) {
result += getCountForList(fieldValueInfo.getFieldValueInfos());
result += getObjectCountForList(fieldValueInfo.getFieldValueInfos());
}
}
return result;
Expand Down Expand Up @@ -221,7 +267,7 @@ private boolean levelReady(int level) {
return callStack.allFetchesHappened(1);
}
if (levelReady(level - 1) && callStack.allOnFieldCallsHappened(level - 1)
&& callStack.allStrategyCallsHappened(level) && callStack.allFetchesHappened(level)) {
&& callStack.allExecuteObjectCallsHappened(level) && callStack.allFetchesHappened(level)) {

return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,19 +137,17 @@ public void executeObject(ExecutionContext executionContext, ExecutionStrategyPa
}

@Override
public void executionStrategyOnFieldValuesInfo(List<FieldValueInfo> fieldValueInfoList, ExecutionStrategyParameters parameters) {
public void executionStrategyOnFieldValuesInfo(List<FieldValueInfo> fieldValueInfoList) {
if (this.startedDeferredExecution.get()) {
this.dispatch();
}
int curLevel = parameters.getPath().getLevel() + 1;
onFieldValuesInfoDispatchIfNeeded(fieldValueInfoList, curLevel, parameters);
onFieldValuesInfoDispatchIfNeeded(fieldValueInfoList, 1);
}

@Override
public void executionStrategyOnFieldValuesException(Throwable t, ExecutionStrategyParameters executionStrategyParameters) {
int curLevel = executionStrategyParameters.getPath().getLevel() + 1;
public void executionStrategyOnFieldValuesException(Throwable t) {
callStack.lock.runLocked(() ->
callStack.increaseHappenedOnFieldValueCalls(curLevel)
callStack.increaseHappenedOnFieldValueCalls(1)
);
}

Expand All @@ -159,7 +157,7 @@ public void executeObjectOnFieldValuesInfo(List<FieldValueInfo> fieldValueInfoLi
this.dispatch();
}
int curLevel = parameters.getPath().getLevel() + 1;
onFieldValuesInfoDispatchIfNeeded(fieldValueInfoList, curLevel, parameters);
onFieldValuesInfoDispatchIfNeeded(fieldValueInfoList, curLevel);
}


Expand Down Expand Up @@ -207,7 +205,7 @@ private void increaseCallCounts(int curLevel, ExecutionStrategyParameters parame
});
}

private void onFieldValuesInfoDispatchIfNeeded(List<FieldValueInfo> fieldValueInfoList, int curLevel, ExecutionStrategyParameters parameters) {
private void onFieldValuesInfoDispatchIfNeeded(List<FieldValueInfo> fieldValueInfoList, int curLevel) {
boolean dispatchNeeded = callStack.lock.callLocked(() ->
handleOnFieldValuesInfo(fieldValueInfoList, curLevel)
);
Expand Down
Loading