Skip to content

Commit e19c041

Browse files
committed
Add idle_heartbeat_timeout cluster option to tune how long to wait for heartbeat responses
1 parent e313946 commit e19c041

5 files changed

Lines changed: 27 additions & 8 deletions

File tree

CHANGELOG.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
3.11
22
====
33

4+
Features
5+
--------
6+
* Add idle_heartbeat_timeout cluster option to tune how long to wait for heartbeat responses. (PYTHON-762)
7+
48
Bug Fixes
59
---------
610
* is_idempotent flag is not propagated from PreparedStatement to BoundStatement (PYTHON-736)

cassandra/cluster.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -590,6 +590,12 @@ def default_retry_policy(self, policy):
590590
Setting to zero disables heartbeats.
591591
"""
592592

593+
idle_heartbeat_timeout = 30
594+
"""
595+
Timeout, in seconds, on which the heartbeat wait for idle connection responses.
596+
Lowering this value can help to discover bad connections earlier.
597+
"""
598+
593599
schema_event_refresh_window = 2
594600
"""
595601
Window, in seconds, within which a schema component will be refreshed after
@@ -756,7 +762,8 @@ def __init__(self,
756762
reprepare_on_up=True,
757763
execution_profiles=None,
758764
allow_beta_protocol_version=False,
759-
timestamp_generator=None):
765+
timestamp_generator=None,
766+
idle_heartbeat_timeout=30):
760767
"""
761768
``executor_threads`` defines the number of threads in a pool for handling asynchronous tasks such as
762769
extablishing connection pools or refreshing metadata.
@@ -847,6 +854,7 @@ def __init__(self,
847854
self.max_schema_agreement_wait = max_schema_agreement_wait
848855
self.control_connection_timeout = control_connection_timeout
849856
self.idle_heartbeat_interval = idle_heartbeat_interval
857+
self.idle_heartbeat_timeout = idle_heartbeat_timeout
850858
self.schema_event_refresh_window = schema_event_refresh_window
851859
self.topology_event_refresh_window = topology_event_refresh_window
852860
self.status_event_refresh_window = status_event_refresh_window
@@ -1187,7 +1195,11 @@ def connect(self, keyspace=None, wait_for_all_pools=False):
11871195
self.profile_manager.check_supported() # todo: rename this method
11881196

11891197
if self.idle_heartbeat_interval:
1190-
self._idle_heartbeat = ConnectionHeartbeat(self.idle_heartbeat_interval, self.get_connection_holders)
1198+
self._idle_heartbeat = ConnectionHeartbeat(
1199+
self.idle_heartbeat_interval,
1200+
self.get_connection_holders,
1201+
timeout=self.idle_heartbeat_timeout
1202+
)
11911203
self._is_setup = True
11921204

11931205
session = self._new_session(keyspace)

cassandra/connection.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -951,9 +951,10 @@ def _options_callback(self, response):
951951

952952
class ConnectionHeartbeat(Thread):
953953

954-
def __init__(self, interval_sec, get_connection_holders):
954+
def __init__(self, interval_sec, get_connection_holders, timeout):
955955
Thread.__init__(self, name="Connection heartbeat")
956956
self._interval = interval_sec
957+
self._timeout = timeout
957958
self._get_connection_holders = get_connection_holders
958959
self._shutdown_event = Event()
959960
self.daemon = True
@@ -990,8 +991,8 @@ def run(self):
990991
owner.return_connection(connection)
991992
self._raise_if_stopped()
992993

993-
# Wait max `self._interval` seconds for all HeartbeatFutures to complete
994-
timeout = self._interval
994+
# Wait max `self._timeout` seconds for all HeartbeatFutures to complete
995+
timeout = self._timeout
995996
start_time = time.time()
996997
for f in futures:
997998
self._raise_if_stopped()
@@ -1007,7 +1008,7 @@ def run(self):
10071008
id(connection), connection.host)
10081009
failed_connections.append((f.connection, f.owner, e))
10091010

1010-
timeout = self._interval - (time.time() - start_time)
1011+
timeout = self._timeout - (time.time() - start_time)
10111012

10121013
for connection, owner, exc in failed_connections:
10131014
self._raise_if_stopped()

docs/api/cassandra/cluster.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@
4646

4747
.. autoattribute:: idle_heartbeat_interval
4848

49+
.. autoattribute:: idle_heartbeat_timeout
50+
4951
.. autoattribute:: schema_event_refresh_window
5052

5153
.. autoattribute:: topology_event_refresh_window

tests/unit/test_connection.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -277,8 +277,8 @@ def make_get_holders(len):
277277
get_holders = Mock(return_value=holders)
278278
return get_holders
279279

280-
def run_heartbeat(self, get_holders_fun, count=2, interval=0.05):
281-
ch = ConnectionHeartbeat(interval, get_holders_fun)
280+
def run_heartbeat(self, get_holders_fun, count=2, interval=0.05, timeout=0.05):
281+
ch = ConnectionHeartbeat(interval, get_holders_fun, timeout=timeout)
282282
time.sleep(interval * count)
283283
ch.stop()
284284
self.assertTrue(get_holders_fun.call_count)

0 commit comments

Comments
 (0)