@@ -1442,8 +1442,7 @@ class Session(object):
14421442 """
14431443 A default timeout, measured in seconds, for queries executed through
14441444 :meth:`.execute()` or :meth:`.execute_async()`. This default may be
1445- overridden with the `timeout` parameter for either of those methods
1446- or the `timeout` parameter for :meth:`.ResponseFuture.result()`.
1445+ overridden with the `timeout` parameter for either of those methods.
14471446
14481447 Setting this to :const:`None` will cause no timeouts to be set by default.
14491448
@@ -1581,17 +1580,14 @@ def execute(self, query, parameters=None, timeout=_NOT_SET, trace=False, custom_
15811580 If `query` is a Statement with its own custom_payload. The message payload
15821581 will be a union of the two, with the values specified here taking precedence.
15831582 """
1584- if timeout is _NOT_SET :
1585- timeout = self .default_timeout
1586-
15871583 if trace and not isinstance (query , Statement ):
15881584 raise TypeError (
15891585 "The query argument must be an instance of a subclass of "
15901586 "cassandra.query.Statement when trace=True" )
15911587
1592- future = self .execute_async (query , parameters , trace , custom_payload )
1588+ future = self .execute_async (query , parameters , trace , custom_payload , timeout )
15931589 try :
1594- result = future .result (timeout )
1590+ result = future .result ()
15951591 finally :
15961592 if trace :
15971593 try :
@@ -1601,7 +1597,7 @@ def execute(self, query, parameters=None, timeout=_NOT_SET, trace=False, custom_
16011597
16021598 return result
16031599
1604- def execute_async (self , query , parameters = None , trace = False , custom_payload = None ):
1600+ def execute_async (self , query , parameters = None , trace = False , custom_payload = None , timeout = _NOT_SET ):
16051601 """
16061602 Execute the given query and return a :class:`~.ResponseFuture` object
16071603 which callbacks may be attached to for asynchronous response
@@ -1646,11 +1642,14 @@ def execute_async(self, query, parameters=None, trace=False, custom_payload=None
16461642 ... log.exception("Operation failed:")
16471643
16481644 """
1649- future = self ._create_response_future (query , parameters , trace , custom_payload )
1645+ if timeout is _NOT_SET :
1646+ timeout = self .default_timeout
1647+
1648+ future = self ._create_response_future (query , parameters , trace , custom_payload , timeout )
16501649 future .send_request ()
16511650 return future
16521651
1653- def _create_response_future (self , query , parameters , trace , custom_payload ):
1652+ def _create_response_future (self , query , parameters , trace , custom_payload , timeout ):
16541653 """ Returns the ResponseFuture before calling send_request() on it """
16551654
16561655 prepared_statement = None
@@ -1704,7 +1703,7 @@ def _create_response_future(self, query, parameters, trace, custom_payload):
17041703 message .update_custom_payload (custom_payload )
17051704
17061705 return ResponseFuture (
1707- self , message , query , self . default_timeout , metrics = self ._metrics ,
1706+ self , message , query , timeout , metrics = self ._metrics ,
17081707 prepared_statement = prepared_statement )
17091708
17101709 def prepare (self , query , custom_payload = None ):
@@ -1737,11 +1736,10 @@ def prepare(self, query, custom_payload=None):
17371736 message. See :ref:`custom_payload`.
17381737 """
17391738 message = PrepareMessage (query = query )
1740- message .custom_payload = custom_payload
1741- future = ResponseFuture (self , message , query = None )
1739+ future = ResponseFuture (self , message , query = None , timeout = self .default_timeout )
17421740 try :
17431741 future .send_request ()
1744- query_id , column_metadata , pk_indexes = future .result (self . default_timeout )
1742+ query_id , column_metadata , pk_indexes = future .result ()
17451743 except Exception :
17461744 log .exception ("Error preparing query:" )
17471745 raise
@@ -1767,7 +1765,7 @@ def prepare_on_all_hosts(self, query, excluded_host):
17671765 futures = []
17681766 for host in self ._pools .keys ():
17691767 if host != excluded_host and host .is_up :
1770- future = ResponseFuture (self , PrepareMessage (query = query ), None )
1768+ future = ResponseFuture (self , PrepareMessage (query = query ), None , self . default_timeout )
17711769
17721770 # we don't care about errors preparing against specific hosts,
17731771 # since we can always prepare them as needed when the prepared
@@ -1788,7 +1786,7 @@ def prepare_on_all_hosts(self, query, excluded_host):
17881786
17891787 for host , future in futures :
17901788 try :
1791- future .result (self . default_timeout )
1789+ future .result ()
17921790 except Exception :
17931791 log .exception ("Error preparing query for host %s:" , host )
17941792
@@ -2832,13 +2830,14 @@ class ResponseFuture(object):
28322830 _paging_state = None
28332831 _custom_payload = None
28342832 _warnings = None
2833+ _timer = None
28352834
2836- def __init__ (self , session , message , query , default_timeout = None , metrics = None , prepared_statement = None ):
2835+ def __init__ (self , session , message , query , timeout , metrics = None , prepared_statement = None ):
28372836 self .session = session
28382837 self .row_factory = session .row_factory
28392838 self .message = message
28402839 self .query = query
2841- self .default_timeout = default_timeout
2840+ self .timeout = timeout
28422841 self ._metrics = metrics
28432842 self .prepared_statement = prepared_statement
28442843 self ._callback_lock = Lock ()
@@ -2849,6 +2848,18 @@ def __init__(self, session, message, query, default_timeout=None, metrics=None,
28492848 self ._errors = {}
28502849 self ._callbacks = []
28512850 self ._errbacks = []
2851+ self ._start_timer ()
2852+
2853+ def _start_timer (self ):
2854+ if self .timeout is not None :
2855+ self ._timer = self .session .cluster .connection_class .create_timer (self .timeout , self ._on_timeout )
2856+
2857+ def _cancel_timer (self ):
2858+ if self ._timer :
2859+ self ._timer .cancel ()
2860+
2861+ def _on_timeout (self ):
2862+ self ._set_final_exception (OperationTimedOut (self ._errors , self ._current_host ))
28522863
28532864 def _make_query_plan (self ):
28542865 # convert the list/generator/etc to an iterator so that subsequent
@@ -2973,6 +2984,7 @@ def start_fetching_next_page(self):
29732984 self ._event .clear ()
29742985 self ._final_result = _NOT_SET
29752986 self ._final_exception = None
2987+ self ._start_timer ()
29762988 self .send_request ()
29772989
29782990 def _reprepare (self , prepare_message ):
@@ -3187,6 +3199,7 @@ def _execute_after_prepare(self, response):
31873199 "statement on host %s: %s" % (self ._current_host , response )))
31883200
31893201 def _set_final_result (self , response ):
3202+ self ._cancel_timer ()
31903203 if self ._metrics is not None :
31913204 self ._metrics .request_timer .addValue (time .time () - self ._start_time )
31923205
@@ -3201,6 +3214,7 @@ def _set_final_result(self, response):
32013214 fn (response , * args , ** kwargs )
32023215
32033216 def _set_final_exception (self , response ):
3217+ self ._cancel_timer ()
32043218 if self ._metrics is not None :
32053219 self ._metrics .request_timer .addValue (time .time () - self ._start_time )
32063220
@@ -3244,6 +3258,11 @@ def result(self, timeout=_NOT_SET):
32443258 encountered. If the final result or error has not been set
32453259 yet, this method will block until that time.
32463260
3261+ .. versionchanged:: 2.6.0
3262+
3263+ **`timeout` is deprecated. Use timeout in the Session execute functions instead.
3264+ The following description applies to deprecated behavior:**
3265+
32473266 You may set a timeout (in seconds) with the `timeout` parameter.
32483267 By default, the :attr:`~.default_timeout` for the :class:`.Session`
32493268 this was created through will be used for the timeout on this
@@ -3257,11 +3276,6 @@ def result(self, timeout=_NOT_SET):
32573276 This is a client-side timeout. For more information
32583277 about server-side coordinator timeouts, see :class:`.policies.RetryPolicy`.
32593278
3260- **Important**: This timeout currently has no effect on callbacks registered
3261- on a :class:`~.ResponseFuture` through :meth:`.ResponseFuture.add_callback` or
3262- :meth:`.ResponseFuture.add_errback`; even if a query exceeds this default
3263- timeout, neither the registered callback or errback will be called.
3264-
32653279 Example usage::
32663280
32673281 >>> future = session.execute_async("SELECT * FROM mycf")
@@ -3275,27 +3289,24 @@ def result(self, timeout=_NOT_SET):
32753289 ... log.exception("Operation failed:")
32763290
32773291 """
3278- if timeout is _NOT_SET :
3279- timeout = self .default_timeout
3292+ if timeout is not _NOT_SET :
3293+ msg = "ResponseFuture.result timeout argument is deprecated. Specify the request timeout via Session.execute[_async]."
3294+ warnings .warn (msg , DeprecationWarning )
3295+ log .warning (msg )
3296+ else :
3297+ timeout = None
32803298
3299+ self ._event .wait (timeout )
3300+ # TODO: remove this conditional when deprecated timeout parameter is removed
3301+ if not self ._event .is_set ():
3302+ self ._on_timeout ()
32813303 if self ._final_result is not _NOT_SET :
32823304 if self ._paging_state is None :
32833305 return self ._final_result
32843306 else :
3285- return PagedResult (self , self ._final_result , timeout )
3286- elif self ._final_exception :
3287- raise self ._final_exception
3307+ return PagedResult (self , self ._final_result )
32883308 else :
3289- self ._event .wait (timeout = timeout )
3290- if self ._final_result is not _NOT_SET :
3291- if self ._paging_state is None :
3292- return self ._final_result
3293- else :
3294- return PagedResult (self , self ._final_result , timeout )
3295- elif self ._final_exception :
3296- raise self ._final_exception
3297- else :
3298- raise OperationTimedOut (errors = self ._errors , last_host = self ._current_host )
3309+ raise self ._final_exception
32993310
33003311 def get_query_trace (self , max_wait = None ):
33013312 """
@@ -3450,10 +3461,9 @@ class will be returned.
34503461
34513462 response_future = None
34523463
3453- def __init__ (self , response_future , initial_response , timeout = _NOT_SET ):
3464+ def __init__ (self , response_future , initial_response ):
34543465 self .response_future = response_future
34553466 self .current_response = iter (initial_response )
3456- self .timeout = timeout
34573467
34583468 def __iter__ (self ):
34593469 return self
@@ -3466,7 +3476,7 @@ def next(self):
34663476 raise
34673477
34683478 self .response_future .start_fetching_next_page ()
3469- result = self .response_future .result (self . timeout )
3479+ result = self .response_future .result ()
34703480 if self .response_future .has_more_pages :
34713481 self .current_response = result .current_response
34723482 else :
0 commit comments