forked from apache/cassandra-cpp-driver
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcluster.hpp
More file actions
454 lines (375 loc) · 13.5 KB
/
cluster.hpp
File metadata and controls
454 lines (375 loc) · 13.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
/*
Copyright (c) DataStax, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#ifndef DATASTAX_INTERNAL_CLUSTER_HPP
#define DATASTAX_INTERNAL_CLUSTER_HPP
#include "config.hpp"
#include "control_connector.hpp"
#include "event_loop.hpp"
#include "external.hpp"
#include "metadata.hpp"
#include "monitor_reporting.hpp"
#include "prepare_host_handler.hpp"
#include "prepared.hpp"
#include <uv.h>
namespace datastax { namespace internal { namespace core {
class Cluster;
class LockedHostMap {
public:
typedef HostMap::iterator iterator;
typedef HostMap::const_iterator const_iterator;
LockedHostMap(const HostMap& hosts);
~LockedHostMap();
operator const HostMap&() const { return hosts_; }
const_iterator begin() const { return hosts_.begin(); }
const_iterator end() const { return hosts_.end(); }
const_iterator find(const Address& address) const;
Host::Ptr get(const Address& address) const;
void erase(const Address& address);
Host::Ptr& operator[](const Address& address);
LockedHostMap& operator=(const HostMap& hosts);
private:
mutable uv_mutex_t mutex_;
HostMap hosts_;
};
/**
* A listener that handles token map updates.
*/
class TokenMapListener {
public:
virtual ~TokenMapListener() {}
/**
* A callback that's called when the token map has changed. This happens as
* a result of the token map being rebuilt which can happen if keyspace metadata
* has changed or if node is added/removed from a cluster.
*
* @param token_map The updated token map.
*/
virtual void on_token_map_updated(const TokenMap::Ptr& token_map) = 0;
};
/**
* A listener that handles cluster events.
*/
class ClusterListener
: public HostListener
, public TokenMapListener {
public:
typedef Vector<ClusterListener*> Vec;
virtual ~ClusterListener() {}
/**
* A callback that's called when the control connection receives an up event.
* It means that the host might be available to handle queries, but not
* necessarily.
*
* @param host A host that may be available.
*/
virtual void on_host_maybe_up(const Host::Ptr& host) {}
/**
* A callback that's called as the result of `Cluster::notify_host_up()`.
* It's *always* called for a valid (not ignored) host that's ready to
* receive queries. The ready state means the host has had any previously
* prepared queries setup on the newly available server. If the host was
* previously ready the callback is just called.
*
* @param host A host that's ready to receive queries.
*/
virtual void on_host_ready(const Host::Ptr& host) {}
/**
* A callback that's called when the cluster connects or reconnects to a host.
*
* Note: This is mostly for testing.
*
* @param cluster The cluster object.
*/
virtual void on_reconnect(Cluster* cluster) {}
/**
* A callback that's called when the cluster has closed.
*
* @param cluster The cluster object.
*/
virtual void on_close(Cluster* cluster) = 0;
};
/**
* A class for recording host and token map events so they can be replayed.
*/
struct ClusterEvent {
typedef Vector<ClusterEvent> Vec;
enum Type {
HOST_UP,
HOST_DOWN,
HOST_ADD,
HOST_REMOVE,
HOST_MAYBE_UP,
HOST_READY,
TOKEN_MAP_UPDATE
};
ClusterEvent(Type type, const Host::Ptr& host)
: type(type)
, host(host) {}
ClusterEvent(const TokenMap::Ptr& token_map)
: type(TOKEN_MAP_UPDATE)
, token_map(token_map) {}
static void process_event(const ClusterEvent& event, ClusterListener* listener);
static void process_events(const Vec& events, ClusterListener* listener);
Type type;
Host::Ptr host;
TokenMap::Ptr token_map;
};
/**
* Cluster settings.
*/
struct ClusterSettings {
/**
* Constructor. Initialize with default settings.
*/
ClusterSettings();
/**
* Constructor. Initialize with a config object.
*
* @param config The config object.
*/
ClusterSettings(const Config& config);
/**
* The settings for the underlying control connection.
*/
ControlConnectionSettings control_connection_settings;
/**
* The load balancing policy to use for reconnecting the control
* connection.
*/
LoadBalancingPolicy::Ptr load_balancing_policy;
/**
* Load balancing policies for all profiles.
*/
LoadBalancingPolicy::Vec load_balancing_policies;
/**
* The port to use for the contact points. This setting is spread to
* the other hosts using the contact point hosts.
*/
int port;
/**
* Reconnection policy to use when attempting to reconnect the control connection.
*/
ReconnectionPolicy::Ptr reconnection_policy;
/**
* If true then cached prepared statements are prepared when a host is brought
* up or is added.
*/
bool prepare_on_up_or_add_host;
/**
* Max number of requests to be written out to the socket per write system call.
*/
unsigned max_prepares_per_flush;
/**
* If true then events are disabled on startup. Events can be explicitly
* started by calling `Cluster::start_events()`.
*/
bool disable_events_on_startup;
/**
* A factory for creating cluster metadata resolvers. A cluster metadata resolver is used to
* determine contact points and retrieve other metadata required to connect the
* cluster.
*/
ClusterMetadataResolverFactory::Ptr cluster_metadata_resolver_factory;
};
/**
* A cluster connection. This wraps and maintains a control connection to a
* cluster. If a host in the cluster fails then it re-establishes a new control
* connection to a different host. A cluster will never close without an
* explicit call to close because it repeatedly tries to re-establish its
* connection even if no hosts are available.
*/
class Cluster
: public RefCounted<Cluster>
, public ControlConnectionListener {
public:
typedef SharedRefPtr<Cluster> Ptr;
/**
* Constructor. Don't use directly.
*
* @param connection The current control connection.
* @param listener A listener to handle cluster events.
* @param event_loop The event loop.
* @param connected_host The currently connected host.
* @param hosts Available hosts for the cluster (based on load balancing
* policies).
* @param schema Current schema metadata.
* @param load_balancing_policy The default load balancing policy to use for
* determining the next control connection host.
* @param load_balancing_policies
* @param local_dc The local datacenter determined by the metadata service for initializing the
* load balancing policies.
* @param supported_options Supported options discovered during control connection.
* @param settings The control connection settings to use for reconnecting the
* control connection.
*/
Cluster(const ControlConnection::Ptr& connection, ClusterListener* listener,
EventLoop* event_loop, const Host::Ptr& connected_host, const HostMap& hosts,
const ControlConnectionSchema& schema,
const LoadBalancingPolicy::Ptr& load_balancing_policy,
const LoadBalancingPolicy::Vec& load_balancing_policies, const String& local_dc,
const StringMultimap& supported_options, const ClusterSettings& settings);
/**
* Set the listener that will handle events for the cluster
* (*NOT* thread-safe).
*
* @param listener The cluster listener.
*/
void set_listener(ClusterListener* listener = NULL);
/**
* Close the current connection and stop the re-connection process (thread-safe).
*/
void close();
/**
* Notify that a node has been determined to be available via an external
* source (thread-safe).
*
* @param address The address of the host that is now available.
*/
void notify_host_up(const Address& address);
/**
* Notify that a node has been determined to be down via an external source.
* DOWN events from the control connection are ignored so it is up to other
* sources to determine a host is unavailable (thread-safe).
*
* @param address That address of the host that is now unavailable.
*/
void notify_host_down(const Address& address);
/**
* Start host and token map events. Events that occurred during startup will be
* replayed (thread-safe).
*/
void start_events();
/**
* Start the client monitor events (thread-safe).
*
* @param client_id Client ID associated with the session.
* @param session_id Session ID associated with the session.
* @param config The config object.
*/
void start_monitor_reporting(const String& client_id, const String& session_id,
const Config& config);
/**
* Get the latest snapshot of the schema metadata (thread-safe).
*
* @return A schema metadata snapshot.
*/
Metadata::SchemaSnapshot schema_snapshot();
/**
* Look up a host by address (thread-safe).
*
* @param address The address of the host.
* @return The host object for the specified address or a null object pointer
* if the host doesn't exist.
*/
Host::Ptr find_host(const Address& address) const;
/**
* Get a prepared metadata entry for a prepared ID (thread-safe).
*
* @param id A prepared ID
* @return The prepare metadata object for the specified ID or a null object
* pointer if the entry doesn't exist.
*/
PreparedMetadata::Entry::Ptr prepared(const String& id) const;
/**
* Set the prepared metadata for a given prepared ID (thread-safe).
*
* @param id A prepared ID.
* @param entry A prepared metadata entry.
*/
void prepared(const String& id, const PreparedMetadata::Entry::Ptr& entry);
/**
* Get available hosts (determined by host distance). This filters out ignored
* hosts (*NOT* thread-safe).
*
* @return A mapping of available hosts.
*/
HostMap available_hosts() const;
public:
ProtocolVersion protocol_version() const { return connection_->protocol_version(); }
const Host::Ptr& connected_host() const { return connected_host_; }
const TokenMap::Ptr& token_map() const { return token_map_; }
const String& local_dc() const { return local_dc_; }
const VersionNumber& dse_server_version() const { return connection_->dse_server_version(); }
const StringMultimap& supported_options() const { return supported_options_; }
private:
friend class ClusterRunClose;
friend class ClusterNotifyUp;
friend class ClusterNotifyDown;
friend class ClusterStartEvents;
friend class ClusterStartClientMonitor;
private:
void update_hosts(const HostMap& hosts);
void update_schema(const ControlConnectionSchema& schema);
void update_token_map(const HostMap& hosts, const String& partitioner,
const ControlConnectionSchema& schema);
bool is_host_ignored(const Host::Ptr& host) const;
void schedule_reconnect();
void on_schedule_reconnect(Timer* timer);
void handle_schedule_reconnect();
void on_reconnect(ControlConnector* connector);
private:
void internal_close();
void handle_close();
void internal_notify_host_up(const Address& address);
void notify_host_up_after_prepare(const Host::Ptr& host);
void internal_notify_host_down(const Address& address);
void internal_start_events();
void internal_start_monitor_reporting(const String& client_id, const String& session_id,
const Config& config);
void on_monitor_reporting(Timer* timer);
void notify_host_add(const Host::Ptr& host);
void notify_host_add_after_prepare(const Host::Ptr& host);
void notify_host_remove(const Address& address);
private:
void notify_or_record(const ClusterEvent& event);
private:
bool prepare_host(const Host::Ptr& host, const PrepareHostHandler::Callback& callback);
void on_prepare_host_add(const PrepareHostHandler* handler);
void on_prepare_host_up(const PrepareHostHandler* handler);
private:
// Control connection listener methods
virtual void on_update_schema(SchemaType type, const ResultResponse::Ptr& result,
const String& keyspace_name, const String& target_name);
virtual void on_drop_schema(SchemaType type, const String& keyspace_name,
const String& target_name);
virtual void on_up(const Address& address);
virtual void on_down(const Address& address);
virtual void on_add(const Host::Ptr& host);
virtual void on_remove(const Address& address);
virtual void on_close(ControlConnection* connection);
private:
ControlConnection::Ptr connection_;
ControlConnector::Ptr reconnector_;
ClusterListener* listener_;
EventLoop* const event_loop_;
const LoadBalancingPolicy::Ptr load_balancing_policy_;
LoadBalancingPolicy::Vec load_balancing_policies_;
const ClusterSettings settings_;
ScopedPtr<QueryPlan> query_plan_;
bool is_closing_;
Host::Ptr connected_host_;
LockedHostMap hosts_;
Metadata metadata_;
PreparedMetadata prepared_metadata_;
TokenMap::Ptr token_map_;
String local_dc_;
StringMultimap supported_options_;
Timer timer_;
bool is_recording_events_;
ClusterEvent::Vec recorded_events_;
ScopedPtr<MonitorReporting> monitor_reporting_;
Timer monitor_reporting_timer_;
ScopedPtr<ReconnectionSchedule> reconnection_schedule_;
};
}}} // namespace datastax::internal::core
#endif