Skip to content

Commit 1bfe33f

Browse files
Alexandre Dutraadutra
authored andcommitted
JAVA-2472: Enable speculative executions for paged Graph Queries
1 parent bce94b4 commit 1bfe33f

File tree

11 files changed

+1987
-944
lines changed

11 files changed

+1987
-944
lines changed

changelog/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
### NGDG (in progress)
66

7+
- [improvement] JAVA-2472: Enable speculative executions for paged graph queries
78
- [improvement] JAVA-1579: Change default result format to latest GraphSON format
89
- [improvement] JAVA-2496: Revisit timeouts for paged graph queries
910
- [bug] JAVA-2510: Fix GraphBinaryDataTypesTest Codec registry initialization

core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousCqlRequestAsyncProcessor.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,7 @@ public CompletionStage<ContinuousAsyncResultSet> process(
4444
DefaultSession session,
4545
InternalDriverContext context,
4646
String sessionLogPrefix) {
47-
return new ContinuousCqlRequestHandler(request, session, context, sessionLogPrefix)
48-
.dequeueOrCreatePending();
47+
return new ContinuousCqlRequestHandler(request, session, context, sessionLogPrefix).handle();
4948
}
5049

5150
@Override

core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousCqlRequestHandler.java

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@
1919
import com.datastax.dse.driver.api.core.cql.continuous.ContinuousAsyncResultSet;
2020
import com.datastax.dse.driver.internal.core.cql.DseConversions;
2121
import com.datastax.dse.protocol.internal.response.result.DseRowsMetadata;
22+
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
2223
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
2324
import com.datastax.oss.driver.api.core.cql.Row;
2425
import com.datastax.oss.driver.api.core.cql.Statement;
25-
import com.datastax.oss.driver.api.core.session.throttling.Throttled;
26-
import com.datastax.oss.driver.internal.core.channel.ResponseCallback;
26+
import com.datastax.oss.driver.api.core.metadata.Node;
2727
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
2828
import com.datastax.oss.driver.internal.core.cql.DefaultExecutionInfo;
2929
import com.datastax.oss.driver.internal.core.cql.DefaultRow;
@@ -35,8 +35,6 @@
3535
import com.datastax.oss.protocol.internal.response.result.Rows;
3636
import edu.umd.cs.findbugs.annotations.NonNull;
3737
import edu.umd.cs.findbugs.annotations.Nullable;
38-
import io.netty.util.concurrent.Future;
39-
import io.netty.util.concurrent.GenericFutureListener;
4038
import java.nio.ByteBuffer;
4139
import java.time.Duration;
4240
import java.util.List;
@@ -49,8 +47,7 @@
4947
*/
5048
@ThreadSafe
5149
public class ContinuousCqlRequestHandler
52-
extends ContinuousRequestHandlerBase<Statement, ContinuousAsyncResultSet, ExecutionInfo>
53-
implements ResponseCallback, GenericFutureListener<Future<java.lang.Void>>, Throttled {
50+
extends ContinuousRequestHandlerBase<Statement, ContinuousAsyncResultSet, ExecutionInfo> {
5451

5552
private final Message message;
5653
private final Duration firstPageTimeout;
@@ -63,9 +60,8 @@ public class ContinuousCqlRequestHandler
6360
@NonNull DefaultSession session,
6461
@NonNull InternalDriverContext context,
6562
@NonNull String sessionLogPrefix) {
66-
super(statement, session, context, sessionLogPrefix, ContinuousAsyncResultSet.class);
63+
super(statement, session, context, sessionLogPrefix, false);
6764
message = DseConversions.toContinuousPagingMessage(statement, executionProfile, context);
68-
throttler.register(this);
6965
firstPageTimeout =
7066
executionProfile.getDuration(DseDriverOption.CONTINUOUS_PAGING_TIMEOUT_FIRST_PAGE);
7167
otherPagesTimeout =
@@ -77,19 +73,19 @@ public class ContinuousCqlRequestHandler
7773

7874
@NonNull
7975
@Override
80-
protected Duration getGlobalTimeout() {
76+
protected Duration getGlobalTimeoutDuration() {
8177
return Duration.ZERO;
8278
}
8379

8480
@NonNull
8581
@Override
86-
protected Duration getPageTimeout(int pageNumber) {
82+
protected Duration getPageTimeoutDuration(int pageNumber) {
8783
return pageNumber == 1 ? firstPageTimeout : otherPagesTimeout;
8884
}
8985

9086
@NonNull
9187
@Override
92-
protected Duration getReviseRequestTimeout() {
88+
protected Duration getReviseRequestTimeoutDuration() {
9389
return otherPagesTimeout;
9490
}
9591

@@ -129,14 +125,17 @@ protected ContinuousAsyncResultSet createEmptyResultSet(@NonNull ExecutionInfo e
129125
@NonNull
130126
@Override
131127
protected DefaultExecutionInfo createExecutionInfo(
132-
@NonNull Result result, @Nullable Frame response) {
128+
@NonNull Node node,
129+
@Nullable Result result,
130+
@Nullable Frame response,
131+
int successfulExecutionIndex) {
133132
ByteBuffer pagingState =
134133
result instanceof Rows ? ((Rows) result).getMetadata().pagingState : null;
135134
return new DefaultExecutionInfo(
136135
statement,
137136
node,
138-
0,
139-
0,
137+
startedSpeculativeExecutionsCount.get(),
138+
successfulExecutionIndex,
140139
errors,
141140
pagingState,
142141
response,
@@ -149,7 +148,9 @@ protected DefaultExecutionInfo createExecutionInfo(
149148
@NonNull
150149
@Override
151150
protected DefaultContinuousAsyncResultSet createResultSet(
152-
@NonNull Rows rows, @NonNull ExecutionInfo executionInfo) {
151+
@NonNull Rows rows,
152+
@NonNull ExecutionInfo executionInfo,
153+
@NonNull ColumnDefinitions columnDefinitions) {
153154
Queue<List<ByteBuffer>> data = rows.getData();
154155
CountingIterator<Row> iterator =
155156
new CountingIterator<Row>(data.size()) {

0 commit comments

Comments
 (0)