1919import com .datastax .dse .driver .api .core .cql .continuous .ContinuousAsyncResultSet ;
2020import com .datastax .dse .driver .internal .core .cql .DseConversions ;
2121import com .datastax .dse .protocol .internal .response .result .DseRowsMetadata ;
22+ import com .datastax .oss .driver .api .core .cql .ColumnDefinitions ;
2223import com .datastax .oss .driver .api .core .cql .ExecutionInfo ;
2324import com .datastax .oss .driver .api .core .cql .Row ;
2425import com .datastax .oss .driver .api .core .cql .Statement ;
25- import com .datastax .oss .driver .api .core .session .throttling .Throttled ;
26- import com .datastax .oss .driver .internal .core .channel .ResponseCallback ;
26+ import com .datastax .oss .driver .api .core .metadata .Node ;
2727import com .datastax .oss .driver .internal .core .context .InternalDriverContext ;
2828import com .datastax .oss .driver .internal .core .cql .DefaultExecutionInfo ;
2929import com .datastax .oss .driver .internal .core .cql .DefaultRow ;
3535import com .datastax .oss .protocol .internal .response .result .Rows ;
3636import edu .umd .cs .findbugs .annotations .NonNull ;
3737import edu .umd .cs .findbugs .annotations .Nullable ;
38- import io .netty .util .concurrent .Future ;
39- import io .netty .util .concurrent .GenericFutureListener ;
4038import java .nio .ByteBuffer ;
4139import java .time .Duration ;
4240import java .util .List ;
4947 */
5048@ ThreadSafe
5149public class ContinuousCqlRequestHandler
52- extends ContinuousRequestHandlerBase <Statement , ContinuousAsyncResultSet , ExecutionInfo >
53- implements ResponseCallback , GenericFutureListener <Future <java .lang .Void >>, Throttled {
50+ extends ContinuousRequestHandlerBase <Statement , ContinuousAsyncResultSet , ExecutionInfo > {
5451
5552 private final Message message ;
5653 private final Duration firstPageTimeout ;
@@ -63,9 +60,8 @@ public class ContinuousCqlRequestHandler
6360 @ NonNull DefaultSession session ,
6461 @ NonNull InternalDriverContext context ,
6562 @ NonNull String sessionLogPrefix ) {
66- super (statement , session , context , sessionLogPrefix , ContinuousAsyncResultSet . class );
63+ super (statement , session , context , sessionLogPrefix , false );
6764 message = DseConversions .toContinuousPagingMessage (statement , executionProfile , context );
68- throttler .register (this );
6965 firstPageTimeout =
7066 executionProfile .getDuration (DseDriverOption .CONTINUOUS_PAGING_TIMEOUT_FIRST_PAGE );
7167 otherPagesTimeout =
@@ -77,19 +73,19 @@ public class ContinuousCqlRequestHandler
7773
7874 @ NonNull
7975 @ Override
80- protected Duration getGlobalTimeout () {
76+ protected Duration getGlobalTimeoutDuration () {
8177 return Duration .ZERO ;
8278 }
8379
8480 @ NonNull
8581 @ Override
86- protected Duration getPageTimeout (int pageNumber ) {
82+ protected Duration getPageTimeoutDuration (int pageNumber ) {
8783 return pageNumber == 1 ? firstPageTimeout : otherPagesTimeout ;
8884 }
8985
9086 @ NonNull
9187 @ Override
92- protected Duration getReviseRequestTimeout () {
88+ protected Duration getReviseRequestTimeoutDuration () {
9389 return otherPagesTimeout ;
9490 }
9591
@@ -129,14 +125,17 @@ protected ContinuousAsyncResultSet createEmptyResultSet(@NonNull ExecutionInfo e
129125 @ NonNull
130126 @ Override
131127 protected DefaultExecutionInfo createExecutionInfo (
132- @ NonNull Result result , @ Nullable Frame response ) {
128+ @ NonNull Node node ,
129+ @ Nullable Result result ,
130+ @ Nullable Frame response ,
131+ int successfulExecutionIndex ) {
133132 ByteBuffer pagingState =
134133 result instanceof Rows ? ((Rows ) result ).getMetadata ().pagingState : null ;
135134 return new DefaultExecutionInfo (
136135 statement ,
137136 node ,
138- 0 ,
139- 0 ,
137+ startedSpeculativeExecutionsCount . get () ,
138+ successfulExecutionIndex ,
140139 errors ,
141140 pagingState ,
142141 response ,
@@ -149,7 +148,9 @@ protected DefaultExecutionInfo createExecutionInfo(
149148 @ NonNull
150149 @ Override
151150 protected DefaultContinuousAsyncResultSet createResultSet (
152- @ NonNull Rows rows , @ NonNull ExecutionInfo executionInfo ) {
151+ @ NonNull Rows rows ,
152+ @ NonNull ExecutionInfo executionInfo ,
153+ @ NonNull ColumnDefinitions columnDefinitions ) {
153154 Queue <List <ByteBuffer >> data = rows .getData ();
154155 CountingIterator <Row > iterator =
155156 new CountingIterator <Row >(data .size ()) {
0 commit comments