Skip to content

Commit 40fe726

Browse files
committed
Support port discovery for C* 4.0
1 parent 1f2103f commit 40fe726

15 files changed

Lines changed: 408 additions & 104 deletions

CHANGELOG.rst

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ Unreleased
44

55
Features
66
--------
7-
Transient Replication Support (PYTHON-1207)
7+
* Transient Replication Support (PYTHON-1207)
8+
* Support system.peers_v2 and port discovery for C* 4.0 (PYTHON-700)
89

910
Bug Fixes
1011
---------

build.yaml

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ schedules:
2121
matrix:
2222
exclude:
2323
- python: [3.4, 3.6, 3.7, 3.8]
24-
- cassandra: ['2.1', '3.0', '4.0', 'test-dse']
24+
- cassandra: ['2.1', '3.0', 'test-dse']
2525

2626
commit_branches:
2727
schedule: per_commit
@@ -34,7 +34,7 @@ schedules:
3434
matrix:
3535
exclude:
3636
- python: [3.4, 3.6, 3.7, 3.8]
37-
- cassandra: ['2.1', '3.0', '4.0', 'test-dse']
37+
- cassandra: ['2.1', '3.0', 'test-dse']
3838

3939
commit_branches_dev:
4040
schedule: per_commit
@@ -184,9 +184,11 @@ build:
184184
pip install --upgrade pip
185185
pip install -U setuptools
186186
187+
pip install git+ssh://[email protected]/riptano/ccm-private.git@cassandra-7544-native-ports-with-dse-fix
188+
187189
# Remove this pyyaml installation when removing Python 3.4 support
188190
pip install PyYAML==5.2
189-
pip install $HOME/ccm
191+
#pip install $HOME/ccm
190192
191193
if [ -n "$CCM_IS_DSE" ]; then
192194
pip install -r test-datastax-requirements.txt

cassandra/cluster.py

Lines changed: 89 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@
6464
RESULT_KIND_SET_KEYSPACE, RESULT_KIND_ROWS,
6565
RESULT_KIND_SCHEMA_CHANGE, ProtocolHandler,
6666
RESULT_KIND_VOID)
67-
from cassandra.metadata import Metadata, protect_name, murmur3
67+
from cassandra.metadata import Metadata, protect_name, murmur3, _NodeInfo
6868
from cassandra.policies import (TokenAwarePolicy, DCAwareRoundRobinPolicy, SimpleConvictionPolicy,
6969
ExponentialReconnectionPolicy, HostDistance,
7070
RetryPolicy, IdentityTranslator, NoSpeculativeExecutionPlan,
@@ -581,7 +581,7 @@ class Cluster(object):
581581
contact_points = ['127.0.0.1']
582582
"""
583583
The list of contact points to try connecting for cluster discovery. A
584-
contact point can be a string (ip, hostname) or a
584+
contact point can be a string (ip or hostname), a tuple (ip/hostname, port) or a
585585
:class:`.connection.EndPoint` instance.
586586
587587
Defaults to loopback interface.
@@ -1152,20 +1152,24 @@ def __init__(self,
11521152
self.endpoint_factory = endpoint_factory or DefaultEndPointFactory(port=self.port)
11531153
self.endpoint_factory.configure(self)
11541154

1155-
raw_contact_points = [cp for cp in self.contact_points if not isinstance(cp, EndPoint)]
1155+
raw_contact_points = []
1156+
for cp in [cp for cp in self.contact_points if not isinstance(cp, EndPoint)]:
1157+
raw_contact_points.append(cp if isinstance(cp, tuple) else (cp, port))
1158+
11561159
self.endpoints_resolved = [cp for cp in self.contact_points if isinstance(cp, EndPoint)]
11571160
self._endpoint_map_for_insights = {repr(ep): '{ip}:{port}'.format(ip=ep.address, port=ep.port)
11581161
for ep in self.endpoints_resolved}
11591162

1160-
strs_resolved_map = _resolve_contact_points_to_string_map(raw_contact_points, port)
1163+
strs_resolved_map = _resolve_contact_points_to_string_map(raw_contact_points)
11611164
self.endpoints_resolved.extend(list(chain(
11621165
*[
1163-
[DefaultEndPoint(x, port) for x in xs if x is not None]
1166+
[DefaultEndPoint(ip, port) for ip, port in xs if ip is not None]
11641167
for xs in strs_resolved_map.values() if xs is not None
11651168
]
11661169
)))
1170+
11671171
self._endpoint_map_for_insights.update(
1168-
{key: ['{ip}:{port}'.format(ip=ip, port=port) for ip in value]
1172+
{key: ['{ip}:{port}'.format(ip=ip, port=port) for ip, port in value]
11691173
for key, value in strs_resolved_map.items() if value is not None}
11701174
)
11711175

@@ -3420,8 +3424,17 @@ class ControlConnection(object):
34203424
_SELECT_SCHEMA_PEERS_TEMPLATE = "SELECT peer, host_id, {nt_col_name}, schema_version FROM system.peers"
34213425
_SELECT_SCHEMA_LOCAL = "SELECT schema_version FROM system.local WHERE key='local'"
34223426

3427+
_SELECT_PEERS_V2 = "SELECT * FROM system.peers_v2"
3428+
_SELECT_PEERS_NO_TOKENS_V2 = "SELECT host_id, peer, peer_port, data_center, rack, native_address, native_port, release_version, schema_version FROM system.peers_v2"
3429+
_SELECT_SCHEMA_PEERS_V2 = "SELECT host_id, peer, peer_port, native_address, native_port, schema_version FROM system.peers_v2"
3430+
34233431
_MINIMUM_NATIVE_ADDRESS_DSE_VERSION = Version("6.0.0")
34243432

3433+
class PeersQueryType(object):
3434+
"""internal Enum for _peers_query"""
3435+
PEERS = 0
3436+
PEERS_SCHEMA = 1
3437+
34253438
_is_shutdown = False
34263439
_timeout = None
34273440
_protocol_version = None
@@ -3433,6 +3446,8 @@ class ControlConnection(object):
34333446
_schema_meta_enabled = True
34343447
_token_meta_enabled = True
34353448

3449+
_uses_peers_v2 = True
3450+
34363451
# for testing purposes
34373452
_time = time
34383453

@@ -3547,13 +3562,25 @@ def _try_connect(self, host):
35473562
"SCHEMA_CHANGE": partial(_watch_callback, self_weakref, '_handle_schema_change')
35483563
}, register_timeout=self._timeout)
35493564

3550-
sel_peers = self._peers_query_for_version(connection, self._SELECT_PEERS_NO_TOKENS_TEMPLATE)
3565+
sel_peers = self._get_peers_query(self.PeersQueryType.PEERS, connection)
35513566
sel_local = self._SELECT_LOCAL if self._token_meta_enabled else self._SELECT_LOCAL_NO_TOKENS
35523567
peers_query = QueryMessage(query=sel_peers, consistency_level=ConsistencyLevel.ONE)
35533568
local_query = QueryMessage(query=sel_local, consistency_level=ConsistencyLevel.ONE)
3554-
shared_results = connection.wait_for_responses(
3555-
peers_query, local_query, timeout=self._timeout)
3569+
(peers_success, peers_result), (local_success, local_result) = connection.wait_for_responses(
3570+
peers_query, local_query, timeout=self._timeout, fail_on_error=False)
3571+
3572+
if not local_success:
3573+
raise local_result
35563574

3575+
if not peers_success:
3576+
# error with the peers v2 query, fallback to peers v1
3577+
self._uses_peers_v2 = False
3578+
sel_peers = self._get_peers_query(self.PeersQueryType.PEERS, connection)
3579+
peers_query = QueryMessage(query=sel_peers, consistency_level=ConsistencyLevel.ONE)
3580+
peers_result = connection.wait_for_response(
3581+
peers_query, timeout=self._timeout)
3582+
3583+
shared_results = (peers_result, local_result)
35573584
self._refresh_node_list_and_token_map(connection, preloaded_results=shared_results)
35583585
self._refresh_schema(connection, preloaded_results=shared_results, schema_agreement_wait=-1)
35593586
except Exception:
@@ -3675,20 +3702,18 @@ def refresh_node_list_and_token_map(self, force_token_rebuild=False):
36753702

36763703
def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
36773704
force_token_rebuild=False):
3678-
36793705
if preloaded_results:
36803706
log.debug("[control connection] Refreshing node list and token map using preloaded results")
36813707
peers_result = preloaded_results[0]
36823708
local_result = preloaded_results[1]
36833709
else:
36843710
cl = ConsistencyLevel.ONE
3711+
sel_peers = self._get_peers_query(self.PeersQueryType.PEERS, connection)
36853712
if not self._token_meta_enabled:
36863713
log.debug("[control connection] Refreshing node list without token map")
3687-
sel_peers = self._peers_query_for_version(connection, self._SELECT_PEERS_NO_TOKENS_TEMPLATE)
36883714
sel_local = self._SELECT_LOCAL_NO_TOKENS
36893715
else:
36903716
log.debug("[control connection] Refreshing node list and token map")
3691-
sel_peers = self._SELECT_PEERS
36923717
sel_local = self._SELECT_LOCAL
36933718
peers_query = QueryMessage(query=sel_peers, consistency_level=cl)
36943719
local_query = QueryMessage(query=sel_local, consistency_level=cl)
@@ -3718,13 +3743,17 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
37183743
self._update_location_info(host, datacenter, rack)
37193744
host.host_id = local_row.get("host_id")
37203745
host.listen_address = local_row.get("listen_address")
3721-
host.broadcast_address = local_row.get("broadcast_address")
3746+
host.listen_port = local_row.get("listen_port")
3747+
host.broadcast_address = _NodeInfo.get_broadcast_address(local_row)
3748+
host.broadcast_port = _NodeInfo.get_broadcast_port(local_row)
37223749

3723-
host.broadcast_rpc_address = self._address_from_row(local_row)
3750+
host.broadcast_rpc_address = _NodeInfo.get_broadcast_rpc_address(local_row)
3751+
host.broadcast_rpc_port = _NodeInfo.get_broadcast_rpc_port(local_row)
37243752
if host.broadcast_rpc_address is None:
37253753
if self._token_meta_enabled:
37263754
# local rpc_address is not available, use the connection endpoint
37273755
host.broadcast_rpc_address = connection.endpoint.address
3756+
host.broadcast_rpc_port = connection.endpoint.port
37283757
else:
37293758
# local rpc_address has not been queried yet, try to fetch it
37303759
# separately, which might fail because C* < 2.1.6 doesn't have rpc_address
@@ -3737,9 +3766,11 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
37373766
row = dict_factory(
37383767
local_rpc_address_result.column_names,
37393768
local_rpc_address_result.parsed_rows)
3740-
host.broadcast_rpc_address = row[0]['rpc_address']
3769+
host.broadcast_rpc_address = _NodeInfo.get_broadcast_rpc_address(row[0])
3770+
host.broadcast_rpc_port = _NodeInfo.get_broadcast_rpc_port(row[0])
37413771
else:
37423772
host.broadcast_rpc_address = connection.endpoint.address
3773+
host.broadcast_rpc_port = connection.endpoint.port
37433774

37443775
host.release_version = local_row.get("release_version")
37453776
host.dse_version = local_row.get("dse_version")
@@ -3777,8 +3808,10 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
37773808
should_rebuild_token_map |= self._update_location_info(host, datacenter, rack)
37783809

37793810
host.host_id = row.get("host_id")
3780-
host.broadcast_address = row.get("peer")
3781-
host.broadcast_rpc_address = self._address_from_row(row)
3811+
host.broadcast_address = _NodeInfo.get_broadcast_address(row)
3812+
host.broadcast_port = _NodeInfo.get_broadcast_port(row)
3813+
host.broadcast_rpc_address = _NodeInfo.get_broadcast_rpc_address(row)
3814+
host.broadcast_rpc_port = _NodeInfo.get_broadcast_rpc_port(row)
37823815
host.release_version = row.get("release_version")
37833816
host.dse_version = row.get("dse_version")
37843817
host.dse_workload = row.get("workload")
@@ -3834,7 +3867,8 @@ def _refresh_nodes_if_not_up(self, host):
38343867

38353868
def _handle_topology_change(self, event):
38363869
change_type = event["change_type"]
3837-
host = self._cluster.metadata.get_host(event["address"][0])
3870+
addr, port = event["address"]
3871+
host = self._cluster.metadata.get_host(addr, port)
38383872
if change_type == "NEW_NODE" or change_type == "MOVED_NODE":
38393873
if self._topology_event_refresh_window >= 0:
38403874
delay = self._delay_for_event_type('topology_change', self._topology_event_refresh_window)
@@ -3844,7 +3878,8 @@ def _handle_topology_change(self, event):
38443878

38453879
def _handle_status_change(self, event):
38463880
change_type = event["change_type"]
3847-
host = self._cluster.metadata.get_host(event["address"][0])
3881+
addr, port = event["address"]
3882+
host = self._cluster.metadata.get_host(addr, port)
38483883
if change_type == "UP":
38493884
delay = self._delay_for_event_type('status_change', self._status_event_refresh_window)
38503885
if host is None:
@@ -3898,7 +3933,7 @@ def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wai
38983933
elapsed = 0
38993934
cl = ConsistencyLevel.ONE
39003935
schema_mismatches = None
3901-
select_peers_query = self._peers_query_for_version(connection, self._SELECT_SCHEMA_PEERS_TEMPLATE)
3936+
select_peers_query = self._get_peers_query(self.PeersQueryType.PEERS_SCHEMA, connection)
39023937

39033938
while elapsed < total_timeout:
39043939
peers_query = QueryMessage(query=select_peers_query, consistency_level=cl)
@@ -3955,43 +3990,50 @@ def _get_schema_mismatches(self, peers_result, local_result, local_address):
39553990

39563991
return dict((version, list(nodes)) for version, nodes in six.iteritems(versions))
39573992

3958-
def _address_from_row(self, row):
3993+
def _get_peers_query(self, peers_query_type, connection=None):
39593994
"""
3960-
Parse the broadcast rpc address from a row and return it untranslated.
3961-
"""
3962-
addr = None
3963-
if "rpc_address" in row:
3964-
addr = row.get("rpc_address") # peers and local
3965-
if "native_transport_address" in row:
3966-
addr = row.get("native_transport_address")
3967-
if not addr or addr in ["0.0.0.0", "::"]:
3968-
addr = row.get("peer")
3969-
return addr
3995+
Determine the peers query to use.
3996+
3997+
:param peers_query_type: Should be one of PeersQueryType enum.
3998+
3999+
If _uses_peers_v2 is True, return the proper peers_v2 query (no templating).
4000+
Else, apply the logic below to choose the peers v1 address column name:
39704001
3971-
def _peers_query_for_version(self, connection, peers_query_template):
3972-
"""
39734002
Given a connection:
39744003
39754004
- find the server product version running on the connection's host,
39764005
- use that to choose the column name for the transport address (see APOLLO-1130), and
39774006
- use that column name in the provided peers query template.
3978-
3979-
The provided template should be a string with a format replacement
3980-
field named nt_col_name.
39814007
"""
3982-
host_release_version = self._cluster.metadata.get_host(connection.endpoint).release_version
3983-
host_dse_version = self._cluster.metadata.get_host(connection.endpoint).dse_version
3984-
uses_native_address_query = (
3985-
host_dse_version and Version(host_dse_version) >= self._MINIMUM_NATIVE_ADDRESS_DSE_VERSION)
4008+
if peers_query_type not in (self.PeersQueryType.PEERS, self.PeersQueryType.PEERS_SCHEMA):
4009+
raise ValueError("Invalid peers query type: %s" % peers_query_type)
39864010

3987-
if uses_native_address_query:
3988-
select_peers_query = peers_query_template.format(nt_col_name="native_transport_address")
3989-
elif host_release_version:
3990-
select_peers_query = peers_query_template.format(nt_col_name="rpc_address")
4011+
if self._uses_peers_v2:
4012+
if peers_query_type == self.PeersQueryType.PEERS:
4013+
query = self._SELECT_PEERS_V2 if self._token_meta_enabled else self._SELECT_PEERS_NO_TOKENS_V2
4014+
else:
4015+
query = self._SELECT_SCHEMA_PEERS_V2
39914016
else:
3992-
select_peers_query = self._SELECT_PEERS
4017+
if peers_query_type == self.PeersQueryType.PEERS and self._token_meta_enabled:
4018+
query = self._SELECT_PEERS
4019+
else:
4020+
query_template = (self._SELECT_SCHEMA_PEERS_TEMPLATE
4021+
if peers_query_type == self.PeersQueryType.PEERS_SCHEMA
4022+
else self._SELECT_PEERS_NO_TOKENS_TEMPLATE)
4023+
4024+
host_release_version = self._cluster.metadata.get_host(connection.endpoint).release_version
4025+
host_dse_version = self._cluster.metadata.get_host(connection.endpoint).dse_version
4026+
uses_native_address_query = (
4027+
host_dse_version and Version(host_dse_version) >= self._MINIMUM_NATIVE_ADDRESS_DSE_VERSION)
4028+
4029+
if uses_native_address_query:
4030+
query = query_template.format(nt_col_name="native_transport_address")
4031+
elif host_release_version:
4032+
query = query_template.format(nt_col_name="rpc_address")
4033+
else:
4034+
query = self._SELECT_PEERS
39934035

3994-
return select_peers_query
4036+
return query
39954037

39964038
def _signal_error(self):
39974039
with self._lock:
@@ -4181,7 +4223,7 @@ class ResponseFuture(object):
41814223

41824224
coordinator_host = None
41834225
"""
4184-
The host from which we recieved a response
4226+
The host from which we received a response
41854227
"""
41864228

41874229
attempted_hosts = None

cassandra/connection.py

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -214,25 +214,26 @@ class DefaultEndPointFactory(EndPointFactory):
214214

215215
port = None
216216
"""
217-
If set, force all endpoints to use this port.
217+
If no port is discovered in the row, this is the default port
218+
used for endpoint creation.
218219
"""
219220

220221
def __init__(self, port=None):
221222
self.port = port
222223

223224
def create(self, row):
224-
addr = None
225-
if "rpc_address" in row:
226-
addr = row.get("rpc_address")
227-
if "native_transport_address" in row:
228-
addr = row.get("native_transport_address")
229-
if not addr or addr in ["0.0.0.0", "::"]:
230-
addr = row.get("peer")
225+
# TODO next major... move this class so we don't need this kind of hack
226+
from cassandra.metadata import _NodeInfo
227+
addr = _NodeInfo.get_broadcast_rpc_address(row)
228+
port = _NodeInfo.get_broadcast_rpc_port(row)
229+
if port is None:
230+
port = self.port if self.port else 9042
231231

232232
# create the endpoint with the translated address
233+
# TODO next major, create a TranslatedEndPoint type
233234
return DefaultEndPoint(
234235
self.cluster.address_translator.translate(addr),
235-
self.port if self.port is not None else 9042)
236+
port)
236237

237238

238239
@total_ordering

0 commit comments

Comments
 (0)