Skip to content

Commit ee2243c

Browse files
committed
Revert "Revert "Merge pull request apache#298 from datastax/PYTHON-108""
This reverts commit dfa91b8. Conflicts: cassandra/cluster.py cassandra/connection.py cassandra/io/asyncorereactor.py cassandra/io/eventletreactor.py cassandra/io/geventreactor.py cassandra/io/libevreactor.py cassandra/io/twistedreactor.py cassandra/query.py
1 parent da7727b commit ee2243c

File tree

12 files changed

+460
-196
lines changed

12 files changed

+460
-196
lines changed

cassandra/cluster.py

Lines changed: 51 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1442,8 +1442,7 @@ class Session(object):
14421442
"""
14431443
A default timeout, measured in seconds, for queries executed through
14441444
:meth:`.execute()` or :meth:`.execute_async()`. This default may be
1445-
overridden with the `timeout` parameter for either of those methods
1446-
or the `timeout` parameter for :meth:`.ResponseFuture.result()`.
1445+
overridden with the `timeout` parameter for either of those methods.
14471446
14481447
Setting this to :const:`None` will cause no timeouts to be set by default.
14491448
@@ -1581,17 +1580,14 @@ def execute(self, query, parameters=None, timeout=_NOT_SET, trace=False, custom_
15811580
If `query` is a Statement with its own custom_payload. The message payload
15821581
will be a union of the two, with the values specified here taking precedence.
15831582
"""
1584-
if timeout is _NOT_SET:
1585-
timeout = self.default_timeout
1586-
15871583
if trace and not isinstance(query, Statement):
15881584
raise TypeError(
15891585
"The query argument must be an instance of a subclass of "
15901586
"cassandra.query.Statement when trace=True")
15911587

1592-
future = self.execute_async(query, parameters, trace, custom_payload)
1588+
future = self.execute_async(query, parameters, trace, custom_payload, timeout)
15931589
try:
1594-
result = future.result(timeout)
1590+
result = future.result()
15951591
finally:
15961592
if trace:
15971593
try:
@@ -1601,7 +1597,7 @@ def execute(self, query, parameters=None, timeout=_NOT_SET, trace=False, custom_
16011597

16021598
return result
16031599

1604-
def execute_async(self, query, parameters=None, trace=False, custom_payload=None):
1600+
def execute_async(self, query, parameters=None, trace=False, custom_payload=None, timeout=_NOT_SET):
16051601
"""
16061602
Execute the given query and return a :class:`~.ResponseFuture` object
16071603
which callbacks may be attached to for asynchronous response
@@ -1646,11 +1642,14 @@ def execute_async(self, query, parameters=None, trace=False, custom_payload=None
16461642
... log.exception("Operation failed:")
16471643
16481644
"""
1649-
future = self._create_response_future(query, parameters, trace, custom_payload)
1645+
if timeout is _NOT_SET:
1646+
timeout = self.default_timeout
1647+
1648+
future = self._create_response_future(query, parameters, trace, custom_payload, timeout)
16501649
future.send_request()
16511650
return future
16521651

1653-
def _create_response_future(self, query, parameters, trace, custom_payload):
1652+
def _create_response_future(self, query, parameters, trace, custom_payload, timeout):
16541653
""" Returns the ResponseFuture before calling send_request() on it """
16551654

16561655
prepared_statement = None
@@ -1704,7 +1703,7 @@ def _create_response_future(self, query, parameters, trace, custom_payload):
17041703
message.update_custom_payload(custom_payload)
17051704

17061705
return ResponseFuture(
1707-
self, message, query, self.default_timeout, metrics=self._metrics,
1706+
self, message, query, timeout, metrics=self._metrics,
17081707
prepared_statement=prepared_statement)
17091708

17101709
def prepare(self, query, custom_payload=None):
@@ -1737,11 +1736,10 @@ def prepare(self, query, custom_payload=None):
17371736
message. See :ref:`custom_payload`.
17381737
"""
17391738
message = PrepareMessage(query=query)
1740-
message.custom_payload = custom_payload
1741-
future = ResponseFuture(self, message, query=None)
1739+
future = ResponseFuture(self, message, query=None, timeout=self.default_timeout)
17421740
try:
17431741
future.send_request()
1744-
query_id, column_metadata, pk_indexes = future.result(self.default_timeout)
1742+
query_id, column_metadata, pk_indexes = future.result()
17451743
except Exception:
17461744
log.exception("Error preparing query:")
17471745
raise
@@ -1767,7 +1765,7 @@ def prepare_on_all_hosts(self, query, excluded_host):
17671765
futures = []
17681766
for host in self._pools.keys():
17691767
if host != excluded_host and host.is_up:
1770-
future = ResponseFuture(self, PrepareMessage(query=query), None)
1768+
future = ResponseFuture(self, PrepareMessage(query=query), None, self.default_timeout)
17711769

17721770
# we don't care about errors preparing against specific hosts,
17731771
# since we can always prepare them as needed when the prepared
@@ -1788,7 +1786,7 @@ def prepare_on_all_hosts(self, query, excluded_host):
17881786

17891787
for host, future in futures:
17901788
try:
1791-
future.result(self.default_timeout)
1789+
future.result()
17921790
except Exception:
17931791
log.exception("Error preparing query for host %s:", host)
17941792

@@ -2832,13 +2830,14 @@ class ResponseFuture(object):
28322830
_paging_state = None
28332831
_custom_payload = None
28342832
_warnings = None
2833+
_timer = None
28352834

2836-
def __init__(self, session, message, query, default_timeout=None, metrics=None, prepared_statement=None):
2835+
def __init__(self, session, message, query, timeout, metrics=None, prepared_statement=None):
28372836
self.session = session
28382837
self.row_factory = session.row_factory
28392838
self.message = message
28402839
self.query = query
2841-
self.default_timeout = default_timeout
2840+
self.timeout = timeout
28422841
self._metrics = metrics
28432842
self.prepared_statement = prepared_statement
28442843
self._callback_lock = Lock()
@@ -2849,6 +2848,18 @@ def __init__(self, session, message, query, default_timeout=None, metrics=None,
28492848
self._errors = {}
28502849
self._callbacks = []
28512850
self._errbacks = []
2851+
self._start_timer()
2852+
2853+
def _start_timer(self):
2854+
if self.timeout is not None:
2855+
self._timer = self.session.cluster.connection_class.create_timer(self.timeout, self._on_timeout)
2856+
2857+
def _cancel_timer(self):
2858+
if self._timer:
2859+
self._timer.cancel()
2860+
2861+
def _on_timeout(self):
2862+
self._set_final_exception(OperationTimedOut(self._errors, self._current_host))
28522863

28532864
def _make_query_plan(self):
28542865
# convert the list/generator/etc to an iterator so that subsequent
@@ -2973,6 +2984,7 @@ def start_fetching_next_page(self):
29732984
self._event.clear()
29742985
self._final_result = _NOT_SET
29752986
self._final_exception = None
2987+
self._start_timer()
29762988
self.send_request()
29772989

29782990
def _reprepare(self, prepare_message):
@@ -3187,6 +3199,7 @@ def _execute_after_prepare(self, response):
31873199
"statement on host %s: %s" % (self._current_host, response)))
31883200

31893201
def _set_final_result(self, response):
3202+
self._cancel_timer()
31903203
if self._metrics is not None:
31913204
self._metrics.request_timer.addValue(time.time() - self._start_time)
31923205

@@ -3201,6 +3214,7 @@ def _set_final_result(self, response):
32013214
fn(response, *args, **kwargs)
32023215

32033216
def _set_final_exception(self, response):
3217+
self._cancel_timer()
32043218
if self._metrics is not None:
32053219
self._metrics.request_timer.addValue(time.time() - self._start_time)
32063220

@@ -3244,6 +3258,11 @@ def result(self, timeout=_NOT_SET):
32443258
encountered. If the final result or error has not been set
32453259
yet, this method will block until that time.
32463260
3261+
.. versionchanged:: 2.6.0
3262+
3263+
**`timeout` is deprecated. Use timeout in the Session execute functions instead.
3264+
The following description applies to deprecated behavior:**
3265+
32473266
You may set a timeout (in seconds) with the `timeout` parameter.
32483267
By default, the :attr:`~.default_timeout` for the :class:`.Session`
32493268
this was created through will be used for the timeout on this
@@ -3257,11 +3276,6 @@ def result(self, timeout=_NOT_SET):
32573276
This is a client-side timeout. For more information
32583277
about server-side coordinator timeouts, see :class:`.policies.RetryPolicy`.
32593278
3260-
**Important**: This timeout currently has no effect on callbacks registered
3261-
on a :class:`~.ResponseFuture` through :meth:`.ResponseFuture.add_callback` or
3262-
:meth:`.ResponseFuture.add_errback`; even if a query exceeds this default
3263-
timeout, neither the registered callback or errback will be called.
3264-
32653279
Example usage::
32663280
32673281
>>> future = session.execute_async("SELECT * FROM mycf")
@@ -3275,27 +3289,24 @@ def result(self, timeout=_NOT_SET):
32753289
... log.exception("Operation failed:")
32763290
32773291
"""
3278-
if timeout is _NOT_SET:
3279-
timeout = self.default_timeout
3292+
if timeout is not _NOT_SET:
3293+
msg = "ResponseFuture.result timeout argument is deprecated. Specify the request timeout via Session.execute[_async]."
3294+
warnings.warn(msg, DeprecationWarning)
3295+
log.warning(msg)
3296+
else:
3297+
timeout = None
32803298

3299+
self._event.wait(timeout)
3300+
# TODO: remove this conditional when deprecated timeout parameter is removed
3301+
if not self._event.is_set():
3302+
self._on_timeout()
32813303
if self._final_result is not _NOT_SET:
32823304
if self._paging_state is None:
32833305
return self._final_result
32843306
else:
3285-
return PagedResult(self, self._final_result, timeout)
3286-
elif self._final_exception:
3287-
raise self._final_exception
3307+
return PagedResult(self, self._final_result)
32883308
else:
3289-
self._event.wait(timeout=timeout)
3290-
if self._final_result is not _NOT_SET:
3291-
if self._paging_state is None:
3292-
return self._final_result
3293-
else:
3294-
return PagedResult(self, self._final_result, timeout)
3295-
elif self._final_exception:
3296-
raise self._final_exception
3297-
else:
3298-
raise OperationTimedOut(errors=self._errors, last_host=self._current_host)
3309+
raise self._final_exception
32993310

33003311
def get_query_trace(self, max_wait=None):
33013312
"""
@@ -3450,10 +3461,9 @@ class will be returned.
34503461

34513462
response_future = None
34523463

3453-
def __init__(self, response_future, initial_response, timeout=_NOT_SET):
3464+
def __init__(self, response_future, initial_response):
34543465
self.response_future = response_future
34553466
self.current_response = iter(initial_response)
3456-
self.timeout = timeout
34573467

34583468
def __iter__(self):
34593469
return self
@@ -3466,7 +3476,7 @@ def next(self):
34663476
raise
34673477

34683478
self.response_future.start_fetching_next_page()
3469-
result = self.response_future.result(self.timeout)
3479+
result = self.response_future.result()
34703480
if self.response_future.has_more_pages:
34713481
self.current_response = result.current_response
34723482
else:

cassandra/connection.py

Lines changed: 100 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from collections import defaultdict, deque, namedtuple
1717
import errno
1818
from functools import wraps, partial
19+
from heapq import heappush, heappop
1920
import io
2021
import logging
2122
import socket
@@ -44,7 +45,8 @@
4445
QueryMessage, ResultMessage, decode_response,
4546
InvalidRequestException, SupportedMessage,
4647
AuthResponseMessage, AuthChallengeMessage,
47-
AuthSuccessMessage, ProtocolException, MAX_SUPPORTED_VERSION)
48+
AuthSuccessMessage, ProtocolException,
49+
MAX_SUPPORTED_VERSION, RegisterMessage)
4850
from cassandra.util import OrderedDict
4951

5052

@@ -219,6 +221,7 @@ def __init__(self, host='127.0.0.1', port=9042, authenticator=None,
219221
self.is_control_connection = is_control_connection
220222
self.user_type_map = user_type_map
221223
self._push_watchers = defaultdict(set)
224+
self._callbacks = {}
222225
self._iobuf = io.BytesIO()
223226

224227
if protocol_version >= 3:
@@ -233,6 +236,7 @@ def __init__(self, host='127.0.0.1', port=9042, authenticator=None,
233236
self.highest_request_id = self.max_request_id
234237

235238
self.lock = RLock()
239+
self.connected_event = Event()
236240

237241
@classmethod
238242
def initialize_reactor(self):
@@ -250,6 +254,10 @@ def handle_fork(self):
250254
"""
251255
pass
252256

257+
@classmethod
258+
def create_timer(cls, timeout, callback):
259+
raise NotImplementedError()
260+
253261
@classmethod
254262
def factory(cls, host, timeout, *args, **kwargs):
255263
"""
@@ -407,11 +415,24 @@ def wait_for_responses(self, *msgs, **kwargs):
407415
self.defunct(exc)
408416
raise
409417

410-
def register_watcher(self, event_type, callback):
411-
raise NotImplementedError()
418+
def register_watcher(self, event_type, callback, register_timeout=None):
419+
"""
420+
Register a callback for a given event type.
421+
"""
422+
self._push_watchers[event_type].add(callback)
423+
self.wait_for_response(
424+
RegisterMessage(event_list=[event_type]),
425+
timeout=register_timeout)
412426

413-
def register_watchers(self, type_callback_dict):
414-
raise NotImplementedError()
427+
def register_watchers(self, type_callback_dict, register_timeout=None):
428+
"""
429+
Register multiple callback/event type pairs, expressed as a dict.
430+
"""
431+
for event_type, callback in type_callback_dict.items():
432+
self._push_watchers[event_type].add(callback)
433+
self.wait_for_response(
434+
RegisterMessage(event_list=type_callback_dict.keys()),
435+
timeout=register_timeout)
415436

416437
def control_conn_disposed(self):
417438
self.is_control_connection = False
@@ -907,3 +928,77 @@ def stop(self):
907928
def _raise_if_stopped(self):
908929
if self._shutdown_event.is_set():
909930
raise self.ShutdownException()
931+
932+
933+
class Timer(object):
934+
935+
canceled = False
936+
937+
def __init__(self, timeout, callback):
938+
self.end = time.time() + timeout
939+
self.callback = callback
940+
if timeout < 0:
941+
self.callback()
942+
943+
def cancel(self):
944+
self.canceled = True
945+
946+
def finish(self, time_now):
947+
if self.canceled:
948+
return True
949+
950+
if time_now >= self.end:
951+
self.callback()
952+
return True
953+
954+
return False
955+
956+
957+
class TimerManager(object):
958+
959+
def __init__(self):
960+
self._queue = []
961+
self._new_timers = []
962+
963+
def add_timer(self, timer):
964+
"""
965+
called from client thread with a Timer object
966+
"""
967+
self._new_timers.append((timer.end, timer))
968+
969+
def service_timeouts(self):
970+
"""
971+
run callbacks on all expired timers
972+
Called from the event thread
973+
:return: next end time, or None
974+
"""
975+
queue = self._queue
976+
new_timers = self._new_timers
977+
while new_timers:
978+
heappush(queue, new_timers.pop())
979+
980+
now = time.time()
981+
while queue:
982+
try:
983+
timer = queue[0][1]
984+
if timer.finish(now):
985+
heappop(queue)
986+
else:
987+
return timer.end
988+
except Exception:
989+
log.exception("Exception while servicing timeout callback: ")
990+
991+
@property
992+
def next_timeout(self):
993+
try:
994+
return self._queue[0][0]
995+
except IndexError:
996+
pass
997+
998+
@property
999+
def next_offset(self):
1000+
try:
1001+
next_end = self._queue[0][0]
1002+
return next_end - time.time()
1003+
except IndexError:
1004+
pass

0 commit comments

Comments
 (0)