6464 RESULT_KIND_SET_KEYSPACE , RESULT_KIND_ROWS ,
6565 RESULT_KIND_SCHEMA_CHANGE , ProtocolHandler ,
6666 RESULT_KIND_VOID )
67- from cassandra .metadata import Metadata , protect_name , murmur3
67+ from cassandra .metadata import Metadata , protect_name , murmur3 , _NodeInfo
6868from cassandra .policies import (TokenAwarePolicy , DCAwareRoundRobinPolicy , SimpleConvictionPolicy ,
6969 ExponentialReconnectionPolicy , HostDistance ,
7070 RetryPolicy , IdentityTranslator , NoSpeculativeExecutionPlan ,
@@ -581,7 +581,7 @@ class Cluster(object):
581581 contact_points = ['127.0.0.1' ]
582582 """
583583 The list of contact points to try connecting for cluster discovery. A
584- contact point can be a string (ip, hostname) or a
584+ contact point can be a string (ip or hostname), a tuple (ip/ hostname, port ) or a
585585 :class:`.connection.EndPoint` instance.
586586
587587 Defaults to loopback interface.
@@ -1152,20 +1152,24 @@ def __init__(self,
11521152 self .endpoint_factory = endpoint_factory or DefaultEndPointFactory (port = self .port )
11531153 self .endpoint_factory .configure (self )
11541154
1155- raw_contact_points = [cp for cp in self .contact_points if not isinstance (cp , EndPoint )]
1155+ raw_contact_points = []
1156+ for cp in [cp for cp in self .contact_points if not isinstance (cp , EndPoint )]:
1157+ raw_contact_points .append (cp if isinstance (cp , tuple ) else (cp , port ))
1158+
11561159 self .endpoints_resolved = [cp for cp in self .contact_points if isinstance (cp , EndPoint )]
11571160 self ._endpoint_map_for_insights = {repr (ep ): '{ip}:{port}' .format (ip = ep .address , port = ep .port )
11581161 for ep in self .endpoints_resolved }
11591162
1160- strs_resolved_map = _resolve_contact_points_to_string_map (raw_contact_points , port )
1163+ strs_resolved_map = _resolve_contact_points_to_string_map (raw_contact_points )
11611164 self .endpoints_resolved .extend (list (chain (
11621165 * [
1163- [DefaultEndPoint (x , port ) for x in xs if x is not None ]
1166+ [DefaultEndPoint (ip , port ) for ip , port in xs if ip is not None ]
11641167 for xs in strs_resolved_map .values () if xs is not None
11651168 ]
11661169 )))
1170+
11671171 self ._endpoint_map_for_insights .update (
1168- {key : ['{ip}:{port}' .format (ip = ip , port = port ) for ip in value ]
1172+ {key : ['{ip}:{port}' .format (ip = ip , port = port ) for ip , port in value ]
11691173 for key , value in strs_resolved_map .items () if value is not None }
11701174 )
11711175
@@ -3420,8 +3424,17 @@ class ControlConnection(object):
34203424 _SELECT_SCHEMA_PEERS_TEMPLATE = "SELECT peer, host_id, {nt_col_name}, schema_version FROM system.peers"
34213425 _SELECT_SCHEMA_LOCAL = "SELECT schema_version FROM system.local WHERE key='local'"
34223426
3427+ _SELECT_PEERS_V2 = "SELECT * FROM system.peers_v2"
3428+ _SELECT_PEERS_NO_TOKENS_V2 = "SELECT host_id, peer, peer_port, data_center, rack, native_address, native_port, release_version, schema_version FROM system.peers_v2"
3429+ _SELECT_SCHEMA_PEERS_V2 = "SELECT host_id, peer, peer_port, native_address, native_port, schema_version FROM system.peers_v2"
3430+
34233431 _MINIMUM_NATIVE_ADDRESS_DSE_VERSION = Version ("6.0.0" )
34243432
3433+ class PeersQueryType (object ):
3434+ """internal Enum for _peers_query"""
3435+ PEERS = 0
3436+ PEERS_SCHEMA = 1
3437+
34253438 _is_shutdown = False
34263439 _timeout = None
34273440 _protocol_version = None
@@ -3433,6 +3446,8 @@ class ControlConnection(object):
34333446 _schema_meta_enabled = True
34343447 _token_meta_enabled = True
34353448
3449+ _uses_peers_v2 = True
3450+
34363451 # for testing purposes
34373452 _time = time
34383453
@@ -3547,13 +3562,25 @@ def _try_connect(self, host):
35473562 "SCHEMA_CHANGE" : partial (_watch_callback , self_weakref , '_handle_schema_change' )
35483563 }, register_timeout = self ._timeout )
35493564
3550- sel_peers = self ._peers_query_for_version ( connection , self ._SELECT_PEERS_NO_TOKENS_TEMPLATE )
3565+ sel_peers = self ._get_peers_query ( self .PeersQueryType . PEERS , connection )
35513566 sel_local = self ._SELECT_LOCAL if self ._token_meta_enabled else self ._SELECT_LOCAL_NO_TOKENS
35523567 peers_query = QueryMessage (query = sel_peers , consistency_level = ConsistencyLevel .ONE )
35533568 local_query = QueryMessage (query = sel_local , consistency_level = ConsistencyLevel .ONE )
3554- shared_results = connection .wait_for_responses (
3555- peers_query , local_query , timeout = self ._timeout )
3569+ (peers_success , peers_result ), (local_success , local_result ) = connection .wait_for_responses (
3570+ peers_query , local_query , timeout = self ._timeout , fail_on_error = False )
3571+
3572+ if not local_success :
3573+ raise local_result
35563574
3575+ if not peers_success :
3576+ # error with the peers v2 query, fallback to peers v1
3577+ self ._uses_peers_v2 = False
3578+ sel_peers = self ._get_peers_query (self .PeersQueryType .PEERS , connection )
3579+ peers_query = QueryMessage (query = sel_peers , consistency_level = ConsistencyLevel .ONE )
3580+ peers_result = connection .wait_for_response (
3581+ peers_query , timeout = self ._timeout )
3582+
3583+ shared_results = (peers_result , local_result )
35573584 self ._refresh_node_list_and_token_map (connection , preloaded_results = shared_results )
35583585 self ._refresh_schema (connection , preloaded_results = shared_results , schema_agreement_wait = - 1 )
35593586 except Exception :
@@ -3675,20 +3702,18 @@ def refresh_node_list_and_token_map(self, force_token_rebuild=False):
36753702
36763703 def _refresh_node_list_and_token_map (self , connection , preloaded_results = None ,
36773704 force_token_rebuild = False ):
3678-
36793705 if preloaded_results :
36803706 log .debug ("[control connection] Refreshing node list and token map using preloaded results" )
36813707 peers_result = preloaded_results [0 ]
36823708 local_result = preloaded_results [1 ]
36833709 else :
36843710 cl = ConsistencyLevel .ONE
3711+ sel_peers = self ._get_peers_query (self .PeersQueryType .PEERS , connection )
36853712 if not self ._token_meta_enabled :
36863713 log .debug ("[control connection] Refreshing node list without token map" )
3687- sel_peers = self ._peers_query_for_version (connection , self ._SELECT_PEERS_NO_TOKENS_TEMPLATE )
36883714 sel_local = self ._SELECT_LOCAL_NO_TOKENS
36893715 else :
36903716 log .debug ("[control connection] Refreshing node list and token map" )
3691- sel_peers = self ._SELECT_PEERS
36923717 sel_local = self ._SELECT_LOCAL
36933718 peers_query = QueryMessage (query = sel_peers , consistency_level = cl )
36943719 local_query = QueryMessage (query = sel_local , consistency_level = cl )
@@ -3718,13 +3743,17 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
37183743 self ._update_location_info (host , datacenter , rack )
37193744 host .host_id = local_row .get ("host_id" )
37203745 host .listen_address = local_row .get ("listen_address" )
3721- host .broadcast_address = local_row .get ("broadcast_address" )
3746+ host .listen_port = local_row .get ("listen_port" )
3747+ host .broadcast_address = _NodeInfo .get_broadcast_address (local_row )
3748+ host .broadcast_port = _NodeInfo .get_broadcast_port (local_row )
37223749
3723- host .broadcast_rpc_address = self ._address_from_row (local_row )
3750+ host .broadcast_rpc_address = _NodeInfo .get_broadcast_rpc_address (local_row )
3751+ host .broadcast_rpc_port = _NodeInfo .get_broadcast_rpc_port (local_row )
37243752 if host .broadcast_rpc_address is None :
37253753 if self ._token_meta_enabled :
37263754 # local rpc_address is not available, use the connection endpoint
37273755 host .broadcast_rpc_address = connection .endpoint .address
3756+ host .broadcast_rpc_port = connection .endpoint .port
37283757 else :
37293758 # local rpc_address has not been queried yet, try to fetch it
37303759 # separately, which might fail because C* < 2.1.6 doesn't have rpc_address
@@ -3737,9 +3766,11 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
37373766 row = dict_factory (
37383767 local_rpc_address_result .column_names ,
37393768 local_rpc_address_result .parsed_rows )
3740- host .broadcast_rpc_address = row [0 ]['rpc_address' ]
3769+ host .broadcast_rpc_address = _NodeInfo .get_broadcast_rpc_address (row [0 ])
3770+ host .broadcast_rpc_port = _NodeInfo .get_broadcast_rpc_port (row [0 ])
37413771 else :
37423772 host .broadcast_rpc_address = connection .endpoint .address
3773+ host .broadcast_rpc_port = connection .endpoint .port
37433774
37443775 host .release_version = local_row .get ("release_version" )
37453776 host .dse_version = local_row .get ("dse_version" )
@@ -3777,8 +3808,10 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
37773808 should_rebuild_token_map |= self ._update_location_info (host , datacenter , rack )
37783809
37793810 host .host_id = row .get ("host_id" )
3780- host .broadcast_address = row .get ("peer" )
3781- host .broadcast_rpc_address = self ._address_from_row (row )
3811+ host .broadcast_address = _NodeInfo .get_broadcast_address (row )
3812+ host .broadcast_port = _NodeInfo .get_broadcast_port (row )
3813+ host .broadcast_rpc_address = _NodeInfo .get_broadcast_rpc_address (row )
3814+ host .broadcast_rpc_port = _NodeInfo .get_broadcast_rpc_port (row )
37823815 host .release_version = row .get ("release_version" )
37833816 host .dse_version = row .get ("dse_version" )
37843817 host .dse_workload = row .get ("workload" )
@@ -3834,7 +3867,8 @@ def _refresh_nodes_if_not_up(self, host):
38343867
38353868 def _handle_topology_change (self , event ):
38363869 change_type = event ["change_type" ]
3837- host = self ._cluster .metadata .get_host (event ["address" ][0 ])
3870+ addr , port = event ["address" ]
3871+ host = self ._cluster .metadata .get_host (addr , port )
38383872 if change_type == "NEW_NODE" or change_type == "MOVED_NODE" :
38393873 if self ._topology_event_refresh_window >= 0 :
38403874 delay = self ._delay_for_event_type ('topology_change' , self ._topology_event_refresh_window )
@@ -3844,7 +3878,8 @@ def _handle_topology_change(self, event):
38443878
38453879 def _handle_status_change (self , event ):
38463880 change_type = event ["change_type" ]
3847- host = self ._cluster .metadata .get_host (event ["address" ][0 ])
3881+ addr , port = event ["address" ]
3882+ host = self ._cluster .metadata .get_host (addr , port )
38483883 if change_type == "UP" :
38493884 delay = self ._delay_for_event_type ('status_change' , self ._status_event_refresh_window )
38503885 if host is None :
@@ -3898,7 +3933,7 @@ def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wai
38983933 elapsed = 0
38993934 cl = ConsistencyLevel .ONE
39003935 schema_mismatches = None
3901- select_peers_query = self ._peers_query_for_version ( connection , self ._SELECT_SCHEMA_PEERS_TEMPLATE )
3936+ select_peers_query = self ._get_peers_query ( self .PeersQueryType . PEERS_SCHEMA , connection )
39023937
39033938 while elapsed < total_timeout :
39043939 peers_query = QueryMessage (query = select_peers_query , consistency_level = cl )
@@ -3955,43 +3990,50 @@ def _get_schema_mismatches(self, peers_result, local_result, local_address):
39553990
39563991 return dict ((version , list (nodes )) for version , nodes in six .iteritems (versions ))
39573992
3958- def _address_from_row (self , row ):
3993+ def _get_peers_query (self , peers_query_type , connection = None ):
39593994 """
3960- Parse the broadcast rpc address from a row and return it untranslated.
3961- """
3962- addr = None
3963- if "rpc_address" in row :
3964- addr = row .get ("rpc_address" ) # peers and local
3965- if "native_transport_address" in row :
3966- addr = row .get ("native_transport_address" )
3967- if not addr or addr in ["0.0.0.0" , "::" ]:
3968- addr = row .get ("peer" )
3969- return addr
3995+ Determine the peers query to use.
3996+
3997+ :param peers_query_type: Should be one of PeersQueryType enum.
3998+
3999+ If _uses_peers_v2 is True, return the proper peers_v2 query (no templating).
4000+ Else, apply the logic below to choose the peers v1 address column name:
39704001
3971- def _peers_query_for_version (self , connection , peers_query_template ):
3972- """
39734002 Given a connection:
39744003
39754004 - find the server product version running on the connection's host,
39764005 - use that to choose the column name for the transport address (see APOLLO-1130), and
39774006 - use that column name in the provided peers query template.
3978-
3979- The provided template should be a string with a format replacement
3980- field named nt_col_name.
39814007 """
3982- host_release_version = self ._cluster .metadata .get_host (connection .endpoint ).release_version
3983- host_dse_version = self ._cluster .metadata .get_host (connection .endpoint ).dse_version
3984- uses_native_address_query = (
3985- host_dse_version and Version (host_dse_version ) >= self ._MINIMUM_NATIVE_ADDRESS_DSE_VERSION )
4008+ if peers_query_type not in (self .PeersQueryType .PEERS , self .PeersQueryType .PEERS_SCHEMA ):
4009+ raise ValueError ("Invalid peers query type: %s" % peers_query_type )
39864010
3987- if uses_native_address_query :
3988- select_peers_query = peers_query_template .format (nt_col_name = "native_transport_address" )
3989- elif host_release_version :
3990- select_peers_query = peers_query_template .format (nt_col_name = "rpc_address" )
4011+ if self ._uses_peers_v2 :
4012+ if peers_query_type == self .PeersQueryType .PEERS :
4013+ query = self ._SELECT_PEERS_V2 if self ._token_meta_enabled else self ._SELECT_PEERS_NO_TOKENS_V2
4014+ else :
4015+ query = self ._SELECT_SCHEMA_PEERS_V2
39914016 else :
3992- select_peers_query = self ._SELECT_PEERS
4017+ if peers_query_type == self .PeersQueryType .PEERS and self ._token_meta_enabled :
4018+ query = self ._SELECT_PEERS
4019+ else :
4020+ query_template = (self ._SELECT_SCHEMA_PEERS_TEMPLATE
4021+ if peers_query_type == self .PeersQueryType .PEERS_SCHEMA
4022+ else self ._SELECT_PEERS_NO_TOKENS_TEMPLATE )
4023+
4024+ host_release_version = self ._cluster .metadata .get_host (connection .endpoint ).release_version
4025+ host_dse_version = self ._cluster .metadata .get_host (connection .endpoint ).dse_version
4026+ uses_native_address_query = (
4027+ host_dse_version and Version (host_dse_version ) >= self ._MINIMUM_NATIVE_ADDRESS_DSE_VERSION )
4028+
4029+ if uses_native_address_query :
4030+ query = query_template .format (nt_col_name = "native_transport_address" )
4031+ elif host_release_version :
4032+ query = query_template .format (nt_col_name = "rpc_address" )
4033+ else :
4034+ query = self ._SELECT_PEERS
39934035
3994- return select_peers_query
4036+ return query
39954037
39964038 def _signal_error (self ):
39974039 with self ._lock :
@@ -4181,7 +4223,7 @@ class ResponseFuture(object):
41814223
41824224 coordinator_host = None
41834225 """
4184- The host from which we recieved a response
4226+ The host from which we received a response
41854227 """
41864228
41874229 attempted_hosts = None
0 commit comments