Skip to content

Commit 68ec447

Browse files
committed
Distirbuted: managed new "hotAlignment" setting. If it's "false" then the db is not shared unless "autoDeploy" is true
1 parent 6ceffd2 commit 68ec447

8 files changed

Lines changed: 112 additions & 22 deletions

File tree

distributed/config/default-distributed-db-config.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
{
22
"replication": true,
33
"autoDeploy": true,
4+
"hotAlignment": true,
45
"resyncEvery": 15,
56
"clusters": {
67
"internal": {

distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastDistributedDatabase.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -536,4 +536,45 @@ protected void checkLocalNodeInConfiguration() {
536536
}
537537
}
538538

539+
protected void removeNodeInConfiguration(final String iNode, final boolean iForce) {
540+
// LOAD DATABASE FILE IF ANY
541+
manager.loadDatabaseConfiguration(databaseName, manager.getDistributedConfigFile(databaseName));
542+
543+
final ODistributedConfiguration cfg = manager.getDatabaseConfiguration(databaseName);
544+
545+
if (!iForce && cfg.isHotAlignment())
546+
// DO NOTHING
547+
return;
548+
549+
boolean dirty = false;
550+
for (String clusterName : cfg.getClusterNames()) {
551+
final List<List<String>> partitions = cfg.getPartitions(clusterName);
552+
if (partitions != null) {
553+
for (int p = 0; p < partitions.size(); ++p) {
554+
final List<String> partition = partitions.get(p);
555+
556+
for (int n = 0; n < partition.size(); ++n) {
557+
final String node = partition.get(n);
558+
559+
if (node.equals(iNode)) {
560+
// FOUND: REMOVE IT
561+
ODistributedServerLog.info(this, manager.getLocalNodeName(), null, DIRECTION.NONE,
562+
"removing node '%s' in partition: %s.%s.%d", iNode, databaseName, clusterName, p);
563+
564+
partition.remove(n);
565+
dirty = true;
566+
break;
567+
}
568+
}
569+
}
570+
}
571+
}
572+
573+
if (dirty) {
574+
final ODocument doc = cfg.serialize();
575+
manager.getConfigurationMap().put(OHazelcastPlugin.CONFIG_DATABASE_PREFIX + databaseName, doc);
576+
manager.updateDatabaseConfiguration(databaseName, doc);
577+
}
578+
}
579+
539580
}

distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastDistributedMessageService.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,17 @@ protected <T> IQueue<T> getQueue(final String iQueueName) {
244244
}
245245
}
246246

247+
/**
248+
* Remove the queue.
249+
*/
250+
protected void removeQueue(final String iQueueName) {
251+
synchronized (queues) {
252+
queues.remove(iQueueName);
253+
IQueue<?> queue = manager.getHazelcastInstance().getQueue(iQueueName);
254+
queue.clear();
255+
}
256+
}
257+
247258
public void registerRequest(final long id, final ODistributedResponseManager currentResponseMgr) {
248259
responsesByRequestIds.put(id, currentResponseMgr);
249260
}

distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastPlugin.java

Lines changed: 41 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,8 @@ public void startup() {
161161
timeOffset = System.currentTimeMillis() - hazelcastInstance.getCluster().getClusterTime();
162162
cachedClusterNodes.put(getLocalNodeName(), hazelcastInstance.getCluster().getLocalMember());
163163

164+
membershipListenerRegistration = hazelcastInstance.getCluster().addMembershipListener(this);
165+
164166
OServer.registerServerInstance(getLocalNodeName(), serverInstance);
165167

166168
final IMap<String, Object> configurationMap = getConfigurationMap();
@@ -175,13 +177,13 @@ public void startup() {
175177

176178
messageService = new OHazelcastDistributedMessageService(this);
177179

180+
// PUBLISH LOCAL NODE CFG
181+
getConfigurationMap().put(CONFIG_NODE_PREFIX + getLocalNodeId(), getLocalNodeConfiguration());
182+
178183
loadDistributedDatabases();
179184

180185
installNewDatabases();
181186

182-
// REGISTER CURRENT MEMBERS
183-
setStatus(STATUS.ONLINE);
184-
185187
super.startup();
186188

187189
} catch (FileNotFoundException e) {
@@ -214,6 +216,8 @@ public void shutdown() {
214216
hazelcastInstance.getCluster().removeMembershipListener(membershipListenerRegistration);
215217
}
216218
setStatus(STATUS.OFFLINE);
219+
220+
getConfigurationMap().remove(CONFIG_NODE_PREFIX + getLocalNodeId());
217221
}
218222

219223
@Override
@@ -249,7 +253,6 @@ public ODocument getLocalNodeConfiguration() {
249253

250254
nodeCfg.field("id", getLocalNodeId());
251255
nodeCfg.field("name", getLocalNodeName());
252-
nodeCfg.field("status", getStatus());
253256
nodeCfg.field("startedOn", startedOn);
254257

255258
List<Map<String, Object>> listeners = new ArrayList<Map<String, Object>>();
@@ -287,7 +290,8 @@ public void setStatus(final STATUS iStatus) {
287290

288291
status = iStatus;
289292

290-
getConfigurationMap().put(CONFIG_NODE_PREFIX + getLocalNodeId(), getLocalNodeConfiguration());
293+
// DON'T PUT THE STATUS IN CFG ANYMORE
294+
// getConfigurationMap().put(CONFIG_NODE_PREFIX + getLocalNodeId(), getLocalNodeConfiguration());
291295

292296
ODistributedServerLog.warn(this, getLocalNodeName(), null, DIRECTION.NONE, "updated node status to '%s'", status);
293297
}
@@ -421,7 +425,15 @@ public void memberRemoved(final MembershipEvent iEvent) {
421425
getNodeName(iEvent.getMember()));
422426

423427
final Member member = iEvent.getMember();
424-
cachedClusterNodes.remove(getNodeName(member));
428+
429+
final String nodeName = getNodeName(member);
430+
cachedClusterNodes.remove(nodeName);
431+
432+
for (String dbName : messageService.getDatabases()) {
433+
final OHazelcastDistributedDatabase db = messageService.getDatabase(dbName);
434+
db.removeNodeInConfiguration(nodeName, false);
435+
}
436+
425437
OClientConnectionManager.instance().pushDistribCfg2Clients(getClusterConfiguration());
426438
}
427439

@@ -465,10 +477,11 @@ public void entryUpdated(EntryEvent<String, Object> iEvent) {
465477
public void entryRemoved(EntryEvent<String, Object> iEvent) {
466478
final String key = iEvent.getKey();
467479
if (key.startsWith(CONFIG_NODE_PREFIX)) {
480+
final String nName = getNodeName(iEvent.getMember());
468481
ODistributedServerLog.info(this, getLocalNodeName(), null, DIRECTION.NONE, "removed node configuration id=%s name=%s",
469-
iEvent.getMember(), getNodeName(iEvent.getMember()));
482+
iEvent.getMember(), nName);
470483

471-
cachedClusterNodes.remove(getNodeName(iEvent.getMember()));
484+
cachedClusterNodes.remove(nName);
472485

473486
} else if (key.startsWith(CONFIG_DATABASE_PREFIX)) {
474487
synchronized (cachedDatabaseConfiguration) {
@@ -486,9 +499,8 @@ public boolean isNodeAvailable(final String iNodeName) {
486499
return cachedClusterNodes.containsKey(iNodeName);
487500
}
488501

489-
public boolean isOfflineNodeById(final String iNodeId) {
490-
final ODocument cfg = getNodeConfigurationById(iNodeId);
491-
return cfg == null || !cfg.field("status").equals(STATUS.ONLINE.toString());
502+
public boolean isOffline() {
503+
return status != STATUS.ONLINE;
492504
}
493505

494506
public void waitUntilOnline() throws InterruptedException {
@@ -573,16 +585,25 @@ protected void installNewDatabases() {
573585

574586
// LOCKING THIS RESOURCE PREVENT CONCURRENT INSTALL OF THE SAME DB
575587
synchronized (installDatabaseLock) {
576-
final Set<String> configuredDatabases = serverInstance.getAvailableStorageNames().keySet();
577-
578588
for (Entry<String, Object> entry : getConfigurationMap().entrySet()) {
579589
if (entry.getKey().startsWith(CONFIG_DATABASE_PREFIX)) {
580590
final String databaseName = entry.getKey().substring(CONFIG_DATABASE_PREFIX.length());
581591

582-
if (!configuredDatabases.contains(databaseName)) {
583-
final ODocument config = (ODocument) entry.getValue();
584-
final Boolean autoDeploy = config.field("autoDeploy");
585-
if (autoDeploy != null && autoDeploy) {
592+
final ODocument config = (ODocument) entry.getValue();
593+
final Boolean autoDeploy = config.field("autoDeploy");
594+
595+
if (autoDeploy != null && autoDeploy) {
596+
final Boolean hotAlignment = config.field("hotAlignment");
597+
final String dbPath = serverInstance.getDatabaseDirectory() + databaseName;
598+
599+
final Set<String> configuredDatabases = serverInstance.getAvailableStorageNames().keySet();
600+
if (configuredDatabases.contains(databaseName) && hotAlignment != null && !hotAlignment) {
601+
// DROP THE DATABASE ON CURRENT NODE
602+
final ODatabaseDocumentTx db = new ODatabaseDocumentTx("local:" + dbPath);
603+
db.drop();
604+
}
605+
606+
if (!configuredDatabases.contains(databaseName)) {
586607

587608
final OHazelcastDistributedDatabase distrDatabase = messageService.registerDatabase(databaseName);
588609

@@ -593,8 +614,6 @@ protected void installNewDatabases() {
593614
final Map<String, OBuffer> results = (Map<String, OBuffer>) sendRequest(databaseName, null,
594615
new ODeployDatabaseTask(), EXECUTION_MODE.RESPONSE);
595616

596-
final String dbPath = serverInstance.getDatabaseDirectory() + databaseName;
597-
598617
// EXTRACT THE REAL RESULT
599618
OBuffer result = null;
600619
for (Entry<String, OBuffer> r : results.entrySet())
@@ -637,6 +656,9 @@ protected void installNewDatabases() {
637656
}
638657
}
639658
}
659+
660+
// REGISTER CURRENT MEMBERS
661+
setStatus(STATUS.ONLINE);
640662
}
641663

642664
@Override

distributed/src/test/resources/default-distributed-db-config.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
{
22
"replication": true,
33
"autoDeploy": true,
4+
"hotAlignment": true,
45
"resyncEvery": 15,
56
"clusters": {
67
"internal": {

server/src/main/java/com/orientechnologies/orient/server/distributed/ODistributedConfiguration.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,20 @@ public boolean isReplicationActive(final String iClusterName) {
5151
}
5252
}
5353

54+
/**
55+
* Returns true if hot alignment is supported.
56+
*
57+
* @return
58+
*/
59+
public boolean isHotAlignment() {
60+
synchronized (configuration) {
61+
final Boolean value = configuration.field("hotAlignment");
62+
if (value != null)
63+
return value;
64+
return true;
65+
}
66+
}
67+
5468
/**
5569
* Returns the read quorum.
5670
*/

server/src/main/java/com/orientechnologies/orient/server/distributed/ODistributedServerManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public enum STATUS {
4545

4646
public boolean isNodeAvailable(final String iNodeName);
4747

48-
public boolean isOfflineNodeById(String iNodeName);
48+
public boolean isOffline();
4949

5050
public String getLocalNodeId();
5151

server/src/main/java/com/orientechnologies/orient/server/network/protocol/ONetworkProtocol.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,9 @@ public OServer getServer() {
5858
public void waitNodeIsOnline() throws OTimeoutException {
5959
// WAIT THE NODE IS ONLINE AGAIN
6060
final ODistributedServerManager mgr = server.getDistributedManager();
61-
if (mgr != null && mgr.isEnabled() && mgr.isOfflineNodeById(mgr.getLocalNodeId())) {
61+
if (mgr != null && mgr.isEnabled() && mgr.isOffline()) {
6262
for (int retry = 0; retry < MAX_RETRIES; ++retry) {
63-
if (mgr != null && mgr.isOfflineNodeById(mgr.getLocalNodeId())) {
63+
if (mgr != null && mgr.isOffline()) {
6464
// NODE NOT ONLINE YET, REFUSE THE CONNECTION
6565
OLogManager.instance().info(this, "Node is not online yet (status=%s), blocking the command until it's online %d/%d",
6666
mgr.getStatus(), retry + 1, MAX_RETRIES);

0 commit comments

Comments
 (0)