@@ -254,6 +254,15 @@ def test_async_paging_verify_writes(self):
254254 self .assertSequenceEqual (range (1 , 101 ), value_array )
255255
256256 def test_paging_callbacks (self ):
257+ """
258+ Test to validate callback api
259+ @since 3.9.0
260+ @jira_ticket PYTHON-733
261+ @expected_result callbacks shouldn't be called twice per message
262+ and the fetch_size should be handled in a transparent way to the user
263+
264+ @test_category queries
265+ """
257266 statements_and_params = zip (cycle (["INSERT INTO test3rf.test (k, v) VALUES (%s, 0)" ]),
258267 [(i , ) for i in range (100 )])
259268 execute_concurrent (self .session , list (statements_and_params ))
@@ -266,8 +275,10 @@ def test_paging_callbacks(self):
266275
267276 event = Event ()
268277 counter = count ()
278+ number_of_calls = count ()
269279
270- def handle_page (rows , future , counter ):
280+ def handle_page (rows , future , counter , number_of_calls ):
281+ next (number_of_calls )
271282 for row in rows :
272283 next (counter )
273284
@@ -280,26 +291,34 @@ def handle_error(err):
280291 event .set ()
281292 self .fail (err )
282293
283- future .add_callbacks (callback = handle_page , callback_args = (future , counter ), errback = handle_error )
294+ future .add_callbacks (callback = handle_page , callback_args = (future , counter , number_of_calls ),
295+ errback = handle_error )
284296 event .wait ()
297+ self .assertEqual (next (number_of_calls ), 100 // fetch_size + 1 )
285298 self .assertEqual (next (counter ), 100 )
286299
287300 # simple statement
288301 future = self .session .execute_async (SimpleStatement ("SELECT * FROM test3rf.test" ), timeout = 20 )
289302 event .clear ()
290303 counter = count ()
304+ number_of_calls = count ()
291305
292- future .add_callbacks (callback = handle_page , callback_args = (future , counter ), errback = handle_error )
306+ future .add_callbacks (callback = handle_page , callback_args = (future , counter , number_of_calls ),
307+ errback = handle_error )
293308 event .wait ()
309+ self .assertEqual (next (number_of_calls ), 100 // fetch_size + 1 )
294310 self .assertEqual (next (counter ), 100 )
295311
296312 # prepared statement
297313 future = self .session .execute_async (prepared , timeout = 20 )
298314 event .clear ()
299315 counter = count ()
316+ number_of_calls = count ()
300317
301- future .add_callbacks (callback = handle_page , callback_args = (future , counter ), errback = handle_error )
318+ future .add_callbacks (callback = handle_page , callback_args = (future , counter , number_of_calls ),
319+ errback = handle_error )
302320 event .wait ()
321+ self .assertEqual (next (number_of_calls ), 100 // fetch_size + 1 )
303322 self .assertEqual (next (counter ), 100 )
304323
305324 def test_concurrent_with_paging (self ):
0 commit comments