Skip to content

Commit 4cd6c82

Browse files
author
Michael Penick
committed
CPP-510/CPP-183 - Try all contact points until one succeeds
* Also connects to all contact points concurrently and cancels the others after the first successful connection.
* Fixed the error message format (added a ":") in the control connector.
* Start the connection settings with a default authentication provider.
1 parent 61c687d commit 4cd6c82

File tree

6 files changed

+106
-43
lines changed

6 files changed

+106
-43
lines changed

cpp-driver/src/cluster_connector.cpp

Lines changed: 83 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -57,14 +57,16 @@ class RunCancelCluster : public Task {
5757
ClusterConnector::ClusterConnector(const ContactPointList& contact_points,
5858
ProtocolVersion protocol_version,
5959
const Callback& callback)
60-
: contact_points_(contact_points)
60+
: remaining_connector_count_(0)
61+
, contact_points_(contact_points)
6162
, protocol_version_(protocol_version)
6263
, listener_(NULL)
6364
, event_loop_(NULL)
6465
, random_(NULL)
6566
, metrics_(NULL)
6667
, callback_(callback)
67-
, error_code_(CLUSTER_OK) { }
68+
, error_code_(CLUSTER_OK)
69+
, ssl_error_code_(CASS_OK) { }
6870

6971
void ClusterConnector::connect(EventLoop* event_loop) {
7072
event_loop_ = event_loop;
@@ -113,29 +115,42 @@ void ClusterConnector::internal_resolve_and_connect() {
113115
}
114116

115117
if (!resolver_) {
116-
contact_points_resolved_it_ = contact_points_resolved_.begin();
117-
internal_connect();
118+
internal_connect_all();
118119
}
119120
}
120121

121-
void ClusterConnector::internal_connect() {
122-
if (contact_points_resolved_it_ == contact_points_resolved_.end()) {
123-
on_error(CLUSTER_ERROR_NO_HOSTS_AVAILABLE, "Unable to connect to any contact points");
124-
return;
125-
}
126-
connector_.reset(Memory::allocate<ControlConnector>(*contact_points_resolved_it_,
127-
protocol_version_,
128-
bind_callback(&ClusterConnector::on_connect, this)));
129-
connector_
122+
void ClusterConnector::internal_connect(const Address& address, ProtocolVersion version) {
123+
ControlConnector::Ptr connector(Memory::allocate<ControlConnector>(address,
124+
version,
125+
bind_callback(&ClusterConnector::on_connect, this)));
126+
connectors_[address] = connector; // Keep track of the connectors so they can be canceled.
127+
connector
130128
->with_metrics(metrics_)
131129
->with_settings(settings_.control_connection_settings)
132130
->connect(event_loop_->loop());
133131
}
134132

133+
void ClusterConnector::internal_connect_all() {
134+
if (contact_points_resolved_.empty()) {
135+
error_code_ = CLUSTER_ERROR_NO_HOSTS_AVAILABLE;
136+
error_message_ = "Unable to connect to any contact points";
137+
finish();
138+
return;
139+
}
140+
remaining_connector_count_ = contact_points_resolved_.size();
141+
for (AddressVec::const_iterator it = contact_points_resolved_.begin(),
142+
end = contact_points_resolved_.end(); it != end; ++it) {
143+
internal_connect(*it, protocol_version_);
144+
}
145+
}
146+
135147
void ClusterConnector::internal_cancel() {
136148
error_code_ = CLUSTER_CANCELED;
137149
if (resolver_) resolver_->cancel();
138-
if (connector_) connector_->cancel();
150+
for (ConnectorMap::iterator it = connectors_.begin(),
151+
end = connectors_.end(); it != end; ++it) {
152+
it->second->cancel();
153+
}
139154
if (cluster_) cluster_->close();
140155
}
141156

@@ -144,19 +159,23 @@ void ClusterConnector::finish() {
144159
if (cluster_) cluster_->close();
145160
// Explicitly release resources on the event loop thread.
146161
resolver_.reset();
147-
connector_.reset();
162+
connectors_.clear();
148163
cluster_.reset();
149164
dec_ref();
150165
}
151166

167+
void ClusterConnector::maybe_finish() {
168+
if (remaining_connector_count_ > 0 && --remaining_connector_count_ == 0) {
169+
finish();
170+
}
171+
}
172+
152173
void ClusterConnector::on_error(ClusterConnector::ClusterError code,
153174
const String& message) {
154175
assert(code != CLUSTER_OK && "Notified error without an error");
155-
if (error_code_ == CLUSTER_OK) { // Only perform this once
156-
error_message_ = message;
157-
error_code_ = code;
158-
finish();
159-
}
176+
error_message_ = message;
177+
error_code_ = code;
178+
maybe_finish();
160179
}
161180

162181
void ClusterConnector::on_resolve(MultiResolver* resolver) {
@@ -189,16 +208,24 @@ void ClusterConnector::on_resolve(MultiResolver* resolver) {
189208
}
190209
}
191210

192-
contact_points_resolved_it_ = contact_points_resolved_.begin();
193-
internal_connect();
211+
internal_connect_all();
194212
}
195213

196214
void ClusterConnector::on_connect(ControlConnector* connector) {
197-
if (is_canceled()) {
198-
finish();
215+
if (!connector->is_ok() && !connector->is_canceled()) {
216+
LOG_ERROR("Unable to establish a control connection to host %s because of the following error: %s",
217+
connector->address().to_string().c_str(),
218+
connector->error_message().c_str());
219+
}
220+
221+
// If the cluster object is successfully initialized or if the connector is
222+
// canceled then attempt to finish the connection process.
223+
if (cluster_ || is_canceled()) {
224+
maybe_finish();
199225
return;
200226
}
201227

228+
// Otherwise, initialize the cluster and handle errors.
202229
if (connector->is_ok()) {
203230
const HostMap& hosts(connector->hosts());
204231
LoadBalancingPolicy::Vec policies;
@@ -209,9 +236,9 @@ void ClusterConnector::on_connect(ControlConnector* connector) {
209236
// metadata is corrupt or missing and the control connection process
210237
// would've probably failed before this happens.
211238
LOG_ERROR("Current control connection host %s not found in hosts metadata",
212-
connector_->address().to_string().c_str());
213-
++contact_points_resolved_it_; // Move to the next contact point
214-
internal_connect();
239+
connector->address().to_string().c_str());
240+
on_error(CLUSTER_ERROR_NO_HOSTS_AVAILABLE,
241+
"Control connection host is not found in hosts metadata");
215242
return;
216243
}
217244
Host::Ptr connected_host(host_it->second);
@@ -234,6 +261,11 @@ void ClusterConnector::on_connect(ControlConnector* connector) {
234261

235262
ScopedPtr<QueryPlan> query_plan(default_policy->new_query_plan("", NULL, NULL));
236263
if (!query_plan->compute_next()) { // No hosts in the query plan
264+
LOG_ERROR("Current control connection host %s has no hosts available in "
265+
"it's query plan for the configured load balancing policy. If "
266+
"using DC-aware check to see if the local datacenter is valid.",
267+
connector->address().to_string().c_str());
268+
237269
const char* message;
238270
if (dynamic_cast<DCAwarePolicy::DCAwareQueryPlan*>(query_plan.get()) != NULL) { // Check if DC-aware
239271
message = "No hosts available for the control connection using the " \
@@ -256,25 +288,38 @@ void ClusterConnector::on_connect(ControlConnector* connector) {
256288
default_policy,
257289
policies,
258290
settings_));
259-
finish();
291+
292+
// Clear any connection errors and set the final negotiated protocol version.
293+
error_code_ = CLUSTER_OK;
294+
error_message_.clear();
295+
protocol_version_ = connector->protocol_version();
296+
297+
// The cluster is initialized so the rest of the connectors can be canceled.
298+
for (ConnectorMap::iterator it = connectors_.begin(),
299+
end = connectors_.end(); it != end; ++it) {
300+
if (it->first != connector->address()) { // Not the current connector.
301+
it->second->cancel();
302+
}
303+
}
304+
305+
maybe_finish();
260306
} else if (connector->is_invalid_protocol()) {
261-
if (!protocol_version_.attempt_lower_supported(contact_points_resolved_it_->to_string())) {
262-
on_error(CLUSTER_ERROR_INVALID_PROTOCOL, "Unable to find supported protocol version");
307+
ProtocolVersion lower_version(connector->protocol_version());
308+
if (!lower_version.attempt_lower_supported(connector->address().to_string())) {
309+
on_error(CLUSTER_ERROR_INVALID_PROTOCOL,
310+
"Unable to find supported protocol version");
263311
return;
264312
}
265-
internal_connect();
313+
internal_connect(connector->address(), lower_version);
266314
} else if(connector->is_ssl_error()) {
315+
ssl_error_code_ = connector->ssl_error_code();
267316
on_error(CLUSTER_ERROR_SSL_ERROR, connector->error_message());
268317
} else if(connector->is_auth_error()) {
269318
on_error(CLUSTER_ERROR_AUTH_ERROR, connector->error_message());
270319
} else {
271-
LOG_ERROR("Unable to establish a control connection to host %s because of the following error: %s",
272-
connector->address().to_string().c_str(),
273-
connector->error_message().c_str());
274-
275-
// Possibly a recoverable error or timeout try the next host
276-
++contact_points_resolved_it_; // Move to the next contact point
277-
internal_connect();
320+
assert(!connector->is_canceled() &&
321+
"The control connector should have an error and not be canceled");
322+
on_error(CLUSTER_ERROR_NO_HOSTS_AVAILABLE, connector->error_message());
278323
}
279324
}
280325

cpp-driver/src/cluster_connector.hpp

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -137,30 +137,41 @@ class ClusterConnector : public RefCounted<ClusterConnector> {
137137

138138
ClusterError error_code() const { return error_code_; }
139139
const String& error_message() const { return error_message_; }
140-
CassError ssl_error_code() { return connector_->ssl_error_code(); }
140+
CassError ssl_error_code() { return ssl_error_code_; }
141141

142142
private:
143143
friend class RunResolveAndConnectCluster;
144144
friend class RunCancelCluster;
145145

146146
private:
147147
void internal_resolve_and_connect();
148-
void internal_connect();
148+
void internal_connect(const Address& address, ProtocolVersion version);
149+
void internal_connect_all();
149150
void internal_cancel();
150151

151152
void finish();
153+
void maybe_finish();
152154

153155
void on_error(ClusterError code, const String& message);
154156
void on_resolve(MultiResolver* resolver);
155157
void on_connect(ControlConnector* connector);
156158

159+
private:
160+
class ConnectorMap : public DenseHashMap<Address, ControlConnector::Ptr, AddressHash> {
161+
public:
162+
ConnectorMap() {
163+
set_empty_key(Address::EMPTY_KEY);
164+
set_deleted_key(Address::DELETED_KEY);
165+
}
166+
};
167+
157168
private:
158169
Cluster::Ptr cluster_;
159170
MultiResolver::Ptr resolver_;
160-
ControlConnector::Ptr connector_;
171+
ConnectorMap connectors_;
172+
size_t remaining_connector_count_;
161173
ContactPointList contact_points_;
162174
AddressVec contact_points_resolved_;
163-
AddressVec::const_iterator contact_points_resolved_it_;
164175
ProtocolVersion protocol_version_;
165176
ClusterListener* listener_;
166177
EventLoop* event_loop_;
@@ -172,6 +183,7 @@ class ClusterConnector : public RefCounted<ClusterConnector> {
172183

173184
ClusterError error_code_;
174185
String error_message_;
186+
CassError ssl_error_code_;
175187
};
176188

177189
} // namespace cass

cpp-driver/src/connector.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ void StartupCallback::on_result_response(ResponseMessage* response) {
152152

153153
ConnectionSettings::ConnectionSettings()
154154
: connect_timeout_ms(CASS_DEFAULT_CONNECT_TIMEOUT_MS)
155+
, auth_provider(Memory::allocate<AuthProvider>())
155156
, idle_timeout_secs(CASS_DEFAULT_IDLE_TIMEOUT_SECS)
156157
, heartbeat_interval_secs(CASS_DEFAULT_HEARTBEAT_INTERVAL_SECS)
157158
, no_compact(CASS_DEFAULT_NO_COMPACT) { }

cpp-driver/src/connector.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ class Connector
164164
uv_loop_t* loop() { return loop_; }
165165

166166
const Address& address() const { return socket_connector_->address(); }
167+
const ProtocolVersion protocol_version() const { return protocol_version_; }
167168

168169

169170
bool is_ok() const {

cpp-driver/src/control_connector.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ class HostsConnectorRequestCallback : public ChainedRequestCallback {
3838

3939
virtual void on_chain_error(CassError code, const String& message) {
4040
connector_->on_error(ControlConnector::CONTROL_CONNECTION_ERROR_HOSTS,
41-
"Error running host queries on control connection " + message);
41+
"Error running host queries on control connection: " + message);
4242
}
4343

4444
virtual void on_chain_timeout() {

cpp-driver/src/control_connector.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,10 @@ class ControlConnector : public RefCounted<ControlConnector>
201201
return connector_->address();
202202
}
203203

204+
const ProtocolVersion protocol_version() const {
205+
return connector_->protocol_version();
206+
}
207+
204208
bool is_ok() const { return error_code_ == CONTROL_CONNECTION_OK; }
205209
bool is_canceled() const { return error_code_ == CONTROL_CONNECTION_CANCELED; }
206210
bool is_invalid_protocol() const {

0 commit comments

Comments
 (0)