@@ -57,14 +57,16 @@ class RunCancelCluster : public Task {
5757ClusterConnector::ClusterConnector (const ContactPointList& contact_points,
5858 ProtocolVersion protocol_version,
5959 const Callback& callback)
60- : contact_points_(contact_points)
60+ : remaining_connector_count_(0 )
61+ , contact_points_(contact_points)
6162 , protocol_version_(protocol_version)
6263 , listener_(NULL )
6364 , event_loop_(NULL )
6465 , random_(NULL )
6566 , metrics_(NULL )
6667 , callback_(callback)
67- , error_code_(CLUSTER_OK) { }
68+ , error_code_(CLUSTER_OK)
69+ , ssl_error_code_(CASS_OK) { }
6870
6971void ClusterConnector::connect (EventLoop* event_loop) {
7072 event_loop_ = event_loop;
@@ -113,29 +115,42 @@ void ClusterConnector::internal_resolve_and_connect() {
113115 }
114116
115117 if (!resolver_) {
116- contact_points_resolved_it_ = contact_points_resolved_.begin ();
117- internal_connect ();
118+ internal_connect_all ();
118119 }
119120}
120121
121- void ClusterConnector::internal_connect () {
122- if (contact_points_resolved_it_ == contact_points_resolved_.end ()) {
123- on_error (CLUSTER_ERROR_NO_HOSTS_AVAILABLE, " Unable to connect to any contact points" );
124- return ;
125- }
126- connector_.reset (Memory::allocate<ControlConnector>(*contact_points_resolved_it_,
127- protocol_version_,
128- bind_callback (&ClusterConnector::on_connect, this )));
129- connector_
122+ void ClusterConnector::internal_connect (const Address& address, ProtocolVersion version) {
123+ ControlConnector::Ptr connector (Memory::allocate<ControlConnector>(address,
124+ version,
125+ bind_callback (&ClusterConnector::on_connect, this )));
126+ connectors_[address] = connector; // Keep track of the connectors so they can be canceled.
127+ connector
130128 ->with_metrics (metrics_)
131129 ->with_settings (settings_.control_connection_settings )
132130 ->connect (event_loop_->loop ());
133131}
134132
133+ void ClusterConnector::internal_connect_all () {
134+ if (contact_points_resolved_.empty ()) {
135+ error_code_ = CLUSTER_ERROR_NO_HOSTS_AVAILABLE;
136+ error_message_ = " Unable to connect to any contact points" ;
137+ finish ();
138+ return ;
139+ }
140+ remaining_connector_count_ = contact_points_resolved_.size ();
141+ for (AddressVec::const_iterator it = contact_points_resolved_.begin (),
142+ end = contact_points_resolved_.end (); it != end; ++it) {
143+ internal_connect (*it, protocol_version_);
144+ }
145+ }
146+
135147void ClusterConnector::internal_cancel () {
136148 error_code_ = CLUSTER_CANCELED;
137149 if (resolver_) resolver_->cancel ();
138- if (connector_) connector_->cancel ();
150+ for (ConnectorMap::iterator it = connectors_.begin (),
151+ end = connectors_.end (); it != end; ++it) {
152+ it->second ->cancel ();
153+ }
139154 if (cluster_) cluster_->close ();
140155}
141156
@@ -144,19 +159,23 @@ void ClusterConnector::finish() {
144159 if (cluster_) cluster_->close ();
145160 // Explicitly release resources on the event loop thread.
146161 resolver_.reset ();
147- connector_. reset ();
162+ connectors_. clear ();
148163 cluster_.reset ();
149164 dec_ref ();
150165}
151166
167+ void ClusterConnector::maybe_finish () {
168+ if (remaining_connector_count_ > 0 && --remaining_connector_count_ == 0 ) {
169+ finish ();
170+ }
171+ }
172+
152173void ClusterConnector::on_error (ClusterConnector::ClusterError code,
153174 const String& message) {
154175 assert (code != CLUSTER_OK && " Notified error without an error" );
155- if (error_code_ == CLUSTER_OK) { // Only perform this once
156- error_message_ = message;
157- error_code_ = code;
158- finish ();
159- }
176+ error_message_ = message;
177+ error_code_ = code;
178+ maybe_finish ();
160179}
161180
162181void ClusterConnector::on_resolve (MultiResolver* resolver) {
@@ -189,16 +208,24 @@ void ClusterConnector::on_resolve(MultiResolver* resolver) {
189208 }
190209 }
191210
192- contact_points_resolved_it_ = contact_points_resolved_.begin ();
193- internal_connect ();
211+ internal_connect_all ();
194212}
195213
196214void ClusterConnector::on_connect (ControlConnector* connector) {
197- if (is_canceled ()) {
198- finish ();
215+ if (!connector->is_ok () && !connector->is_canceled ()) {
216+ LOG_ERROR (" Unable to establish a control connection to host %s because of the following error: %s" ,
217+ connector->address ().to_string ().c_str (),
218+ connector->error_message ().c_str ());
219+ }
220+
221+ // If the cluster object is successfully initialized or if the connector is
222+ // canceled then attempt to finish the connection process.
223+ if (cluster_ || is_canceled ()) {
224+ maybe_finish ();
199225 return ;
200226 }
201227
228+ // Otherwise, initialize the cluster and handle errors.
202229 if (connector->is_ok ()) {
203230 const HostMap& hosts (connector->hosts ());
204231 LoadBalancingPolicy::Vec policies;
@@ -209,9 +236,9 @@ void ClusterConnector::on_connect(ControlConnector* connector) {
209236 // metadata is corrupt or missing and the control connection process
210237 // would've probably failed before this happens.
211238 LOG_ERROR (" Current control connection host %s not found in hosts metadata" ,
212- connector_ ->address ().to_string ().c_str ());
213- ++contact_points_resolved_it_; // Move to the next contact point
214- internal_connect ( );
239+ connector ->address ().to_string ().c_str ());
240+ on_error (CLUSTER_ERROR_NO_HOSTS_AVAILABLE,
241+ " Control connection host is not found in hosts metadata " );
215242 return ;
216243 }
217244 Host::Ptr connected_host (host_it->second );
@@ -234,6 +261,11 @@ void ClusterConnector::on_connect(ControlConnector* connector) {
234261
235262 ScopedPtr<QueryPlan> query_plan (default_policy->new_query_plan (" " , NULL , NULL ));
236263 if (!query_plan->compute_next ()) { // No hosts in the query plan
264+ LOG_ERROR (" Current control connection host %s has no hosts available in "
265+ " it's query plan for the configured load balancing policy. If "
266+ " using DC-aware check to see if the local datacenter is valid." ,
267+ connector->address ().to_string ().c_str ());
268+
237269 const char * message;
238270 if (dynamic_cast <DCAwarePolicy::DCAwareQueryPlan*>(query_plan.get ()) != NULL ) { // Check if DC-aware
239271 message = " No hosts available for the control connection using the " \
@@ -256,25 +288,38 @@ void ClusterConnector::on_connect(ControlConnector* connector) {
256288 default_policy,
257289 policies,
258290 settings_));
259- finish ();
291+
292+ // Clear any connection errors and set the final negotiated protocol version.
293+ error_code_ = CLUSTER_OK;
294+ error_message_.clear ();
295+ protocol_version_ = connector->protocol_version ();
296+
297+ // The cluster is initialized so the rest of the connectors can be canceled.
298+ for (ConnectorMap::iterator it = connectors_.begin (),
299+ end = connectors_.end (); it != end; ++it) {
300+ if (it->first != connector->address ()) { // Not the current connector.
301+ it->second ->cancel ();
302+ }
303+ }
304+
305+ maybe_finish ();
260306 } else if (connector->is_invalid_protocol ()) {
261- if (!protocol_version_.attempt_lower_supported (contact_points_resolved_it_->to_string ())) {
262- on_error (CLUSTER_ERROR_INVALID_PROTOCOL, " Unable to find supported protocol version" );
307+ ProtocolVersion lower_version (connector->protocol_version ());
308+ if (!lower_version.attempt_lower_supported (connector->address ().to_string ())) {
309+ on_error (CLUSTER_ERROR_INVALID_PROTOCOL,
310+ " Unable to find supported protocol version" );
263311 return ;
264312 }
265- internal_connect ();
313+ internal_connect (connector-> address (), lower_version );
266314 } else if (connector->is_ssl_error ()) {
315+ ssl_error_code_ = connector->ssl_error_code ();
267316 on_error (CLUSTER_ERROR_SSL_ERROR, connector->error_message ());
268317 } else if (connector->is_auth_error ()) {
269318 on_error (CLUSTER_ERROR_AUTH_ERROR, connector->error_message ());
270319 } else {
271- LOG_ERROR (" Unable to establish a control connection to host %s because of the following error: %s" ,
272- connector->address ().to_string ().c_str (),
273- connector->error_message ().c_str ());
274-
275- // Possibly a recoverable error or timeout try the next host
276- ++contact_points_resolved_it_; // Move to the next contact point
277- internal_connect ();
320+ assert (!connector->is_canceled () &&
321+ " The control connector should have an error and not be canceled" );
322+ on_error (CLUSTER_ERROR_NO_HOSTS_AVAILABLE, connector->error_message ());
278323 }
279324}
280325
0 commit comments