Skip to content

Commit df65ac2

Browse files
committed
Replication: supported pluggable strategy through xml file
1 parent 4eedf45 commit df65ac2

16 files changed

Lines changed: 288 additions & 74 deletions

File tree

distributed/config/orientdb-dserver-config.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@
1111
<parameter name="alignment.startup" value="true" />
1212
<parameter name="alignment.timer" value="120000" />
1313
<parameter name="conflict.resolver.impl" value="com.orientechnologies.orient.server.distributed.conflict.ODefaultReplicationConflictResolver" />
14+
15+
<!-- REPLICATION STRATEGIES -->
16+
<parameter name="replication.strategy.auto"
17+
value="com.orientechnologies.orient.server.hazelcast.strategy.OAutoReplicationStrategy" />
1418
</parameters>
1519
</handler>
1620
<!-- AUTOMATIC BACKUP, TO TURN ON SET THE 'ENABLED' PARAMETER TO 'true' -->

distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastPlugin.java

Lines changed: 67 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import com.orientechnologies.orient.server.distributed.ODistributedServerLog;
5656
import com.orientechnologies.orient.server.distributed.ODistributedServerLog.DIRECTION;
5757
import com.orientechnologies.orient.server.distributed.ODistributedThreadLocal;
58+
import com.orientechnologies.orient.server.distributed.OReplicationConfig;
5859
import com.orientechnologies.orient.server.distributed.OServerOfflineException;
5960
import com.orientechnologies.orient.server.distributed.OStorageSynchronizer;
6061
import com.orientechnologies.orient.server.distributed.conflict.OReplicationConflictResolver;
@@ -72,7 +73,7 @@
7273
public class OHazelcastPlugin extends ODistributedAbstractPlugin implements MembershipListener, EntryListener<String, Object> {
7374
protected static final String DISTRIBUTED_EXECUTOR_NAME = "OHazelcastPlugin::Executor";
7475
protected static final int SEND_RETRY_MAX = 100;
75-
76+
7677
protected int nodeNumber;
7778
protected String localNodeId;
7879
protected String configFile = "hazelcast.xml";
@@ -261,23 +262,40 @@ public void onFailure(Throwable t) {
261262
throw new ODistributedException("Cannot complete the operation because the cluster is offline");
262263
}
263264

264-
public Object execute(final String iClusterName, final Object iKey, final OAbstractRemoteTask<? extends Object> iTask)
265-
throws ExecutionException {
265+
public Object execute(final String iClusterName, final Object iKey, final OAbstractRemoteTask<? extends Object> iTask,
266+
OReplicationConfig replicationData) throws ExecutionException {
266267

267268
final String dbName = iTask.getDatabaseName();
268269

269-
// ASSIGN DESTINATION NODE
270-
String masterNodeId = getMasterNode(dbName, iClusterName, iKey);
271-
iTask.setNodeDestination(masterNodeId);
272-
273-
masterNodeId = waitUntilMasterNodeIsOnline(iClusterName, iKey, dbName, masterNodeId);
270+
String masterNodeId = null;
274271

275272
try {
276-
if (isLocalNodeMaster(iKey))
277-
return executeLocallyAndPropagate((OAbstractReplicatedTask<? extends Object>) iTask);
278-
else
279-
return executeRemotelyAndApplyLocally(iClusterName, iKey, iTask, dbName);
273+
if (replicationData == null) {
274+
// NO REPLICATION: LOCAL ONLY
275+
ODistributedThreadLocal.INSTANCE.set(iTask.getNodeSource());
276+
try {
277+
// EXECUTE IT LOCALLY
278+
return ((OAbstractReplicatedTask<? extends Object>) iTask).executeOnLocalNode();
279+
} finally {
280+
// CLEAR THE NODE SOURCE SET BEFORE THE LOCAL EXECUTION
281+
ODistributedThreadLocal.INSTANCE.set(null);
282+
}
280283

284+
} else {
285+
if (replicationData != null) {
286+
// SET THE DESTINATION NODE
287+
iTask.setNodeDestination(replicationData.masterNode);
288+
replicationData.masterNode = waitUntilMasterNodeIsOnline(iClusterName, iKey, dbName, replicationData.masterNode);
289+
masterNodeId = replicationData.masterNode;
290+
}
291+
292+
if (getLocalNodeId().equals(replicationData.masterNode))
293+
// LOCAL + PROPAGATE
294+
return executeLocallyAndPropagate((OAbstractReplicatedTask<? extends Object>) iTask);
295+
else
296+
// REMOTE + LOCAL
297+
return executeRemotelyAndApplyLocally(iClusterName, iKey, iTask, dbName, replicationData);
298+
}
281299
} catch (InterruptedException e) {
282300
Thread.interrupted();
283301

@@ -293,8 +311,8 @@ public Object execute(final String iClusterName, final Object iKey, final OAbstr
293311

294312
@SuppressWarnings("unchecked")
295313
protected Object executeRemotelyAndApplyLocally(final String iClusterName, final Object iKey,
296-
final OAbstractRemoteTask<? extends Object> iTask, final String dbName) throws InterruptedException, Exception,
297-
ExecutionException {
314+
final OAbstractRemoteTask<? extends Object> iTask, final String dbName, final OReplicationConfig iReplicationData)
315+
throws InterruptedException, Exception, ExecutionException {
298316

299317
// RETRY UNTIL SUCCEED
300318
for (int retry = 0; retry < SEND_RETRY_MAX; ++retry) {
@@ -340,7 +358,7 @@ protected Object executeRemotelyAndApplyLocally(final String iClusterName, final
340358
"error on execution of operation in %s mode, because node left. Re-route it in transparent way", e,
341359
EXECUTION_MODE.SYNCHRONOUS);
342360

343-
return execute(iClusterName, iKey, iTask);
361+
return execute(iClusterName, iKey, iTask, iReplicationData);
344362

345363
} catch (ExecutionException e) {
346364
if (e.getCause() instanceof OServerOfflineException) {
@@ -419,15 +437,26 @@ public boolean isLocalNodeMaster(final Object iKey) {
419437
final Member partitionOwner = hazelcastInstance.getPartitionService().getPartition(iKey).getOwner();
420438
final boolean local = partitionOwner.equals(hazelcastInstance.getCluster().getLocalMember());
421439

422-
// ODistributedServerLog.debug(this, getLocalNodeId(), null, DIRECTION.NONE,
423-
// "network partition: check for local master: key '%s' is assigned to %s (local=%s)", iKey, getNodeId(partitionOwner), local);
440+
ODistributedServerLog.debug(this, getLocalNodeId(), null, DIRECTION.NONE,
441+
"network partition: check for local master: key '%s' is assigned to %s (local=%s)", iKey, getNodeId(partitionOwner), local);
424442

425443
return local;
426444
}
427445

428-
public String getMasterNode(final String iDatabaseName, final String iClusterName, final Object iKey) {
429-
String masterNode = getDatabaseClusterConfiguration(iDatabaseName, iClusterName).field("master");
430-
if (masterNode == null) {
446+
/**
447+
* Returns the replication data, or null if replication is not active.
448+
*/
449+
public OReplicationConfig getReplicationData(final String iDatabaseName, final String iClusterName, final Object iKey) {
450+
451+
final ODocument cfg = getDatabaseClusterConfiguration(iDatabaseName, iClusterName);
452+
final Boolean active = cfg.field("synchronization");
453+
if (active == null || !active)
454+
// NOT ACTIVE, RETURN
455+
return null;
456+
457+
final OReplicationConfig data = new OReplicationConfig();
458+
data.masterNode = cfg.field("master");
459+
if (data.masterNode == null) {
431460
ODistributedServerLog
432461
.warn(
433462
this,
@@ -436,15 +465,21 @@ public String getMasterNode(final String iDatabaseName, final String iClusterNam
436465
DIRECTION.NONE,
437466
"network partition: found wrong configuration for database '%s': cannot find the 'master' field for the cluster '%s'. '$auto' will be used",
438467
iDatabaseName, iClusterName);
439-
} else if (masterNode.equalsIgnoreCase("$auto"))
440-
// AUTO, BY HAZELCAST PARTITION SERVICE
441-
masterNode = getNodeId(hazelcastInstance.getPartitionService().getPartition(iKey).getOwner());
468+
data.masterNode = MASTER_AUTO;
469+
}
470+
471+
if (data.masterNode.startsWith("$"))
472+
// GET THE MASTER NODE BY USING THE STRATEGY FACTORY
473+
data.masterNode = getReplicationStrategy(data.masterNode).getNode(this, iClusterName, iKey);
442474

443-
final boolean local = masterNode.equals(getLocalNodeId());
475+
if (data.masterNode == null)
476+
throw new ODistributedException("Cannot find a master node for the key '" + iKey + "'");
477+
478+
final boolean local = data.masterNode.equals(getLocalNodeId());
444479
ODistributedServerLog.debug(this, getLocalNodeId(), "?", DIRECTION.OUT,
445-
"network partition: get node master for key '%s' is assigned %s (local=%s)", iKey, masterNode, local);
480+
"network partition: get node master for key '%s' is assigned %s (local=%s)", iKey, data.masterNode, local);
446481

447-
return masterNode;
482+
return data;
448483
}
449484

450485
@Override
@@ -682,7 +717,7 @@ public String getLocalNodeAlias() {
682717
return getLocalNodeId();
683718
}
684719

685-
protected String getNodeId(final Member iMember) {
720+
public String getNodeId(final Member iMember) {
686721
return iMember.getInetSocketAddress().toString().substring(1);
687722
}
688723

@@ -770,7 +805,7 @@ public int getNodeNumber() {
770805
return nodeNumber;
771806
}
772807

773-
protected HazelcastInstance getHazelcastInstance() {
808+
public HazelcastInstance getHazelcastInstance() {
774809
while (hazelcastInstance == null) {
775810
// WAIT UNTIL THE INSTANCE IS READY
776811
try {
@@ -840,15 +875,16 @@ protected String waitUntilMasterNodeIsOnline(final String iClusterName, final Ob
840875
Thread.interrupted();
841876
}
842877
// RE-READ THE KEY OWNER (IT COULD BE CHANGED DURING THE PAUSE)
843-
final String newMasterNodeId = getMasterNode(dbName, iClusterName, iKey);
878+
final OReplicationConfig newReplicationConfig = getReplicationData(dbName, iClusterName, iKey);
844879

845-
if (!newMasterNodeId.equals(masterNodeId)) {
880+
if (!newReplicationConfig.masterNode.equals(masterNodeId)) {
846881
ODistributedServerLog.warn(this, getLocalNodeId(), masterNodeId, DIRECTION.OUT,
847882
"node %s is the new owner of the requested key set", getRemoteNodeStatus(masterNodeId));
848-
masterNodeId = newMasterNodeId;
883+
masterNodeId = newReplicationConfig.masterNode;
849884
}
850885

851886
}
887+
852888
ODistributedServerLog.warn(this, getLocalNodeId(), masterNodeId, DIRECTION.OUT,
853889
"node %s is aligned. Flushing pending operations...");
854890
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright 2010-2013 Luca Garulli (l.garulli--at--orientechnologies.com)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.orientechnologies.orient.server.hazelcast.strategy;
17+
18+
import com.hazelcast.core.Member;
19+
import com.orientechnologies.orient.server.distributed.ODistributedServerManager;
20+
import com.orientechnologies.orient.server.distributed.OReplicationStrategy;
21+
import com.orientechnologies.orient.server.hazelcast.OHazelcastPlugin;
22+
23+
/**
24+
* Replication strategy that selects the node owning the Hazelcast partition of the given key.
25+
*
26+
* @author luca
27+
*
28+
*/
29+
public class OAutoReplicationStrategy implements OReplicationStrategy {
30+
@Override
31+
public String getNode(final ODistributedServerManager iManager, final String iClusterName, final Object iKey) {
32+
final Member member = ((OHazelcastPlugin) iManager).getHazelcastInstance().getPartitionService().getPartition(iKey).getOwner();
33+
return ((OHazelcastPlugin) iManager).getNodeId(member);
34+
}
35+
}

distributed/src/test/java/com/orientechnologies/orient/server/distributed/ServerClusterInsertTest.java

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,9 @@ public void executeTest() throws Exception {
7878
final ExecutorService writerExecutor = Executors.newCachedThreadPool();
7979
final ExecutorService readerExecutor = Executors.newCachedThreadPool();
8080

81+
int i = 0;
8182
for (ServerRun server : serverInstance) {
82-
Writer writer = new Writer(getDatabaseURL(server));
83+
Writer writer = new Writer(i++, getDatabaseURL(server));
8384
writerExecutor.submit(writer);
8485

8586
Reader reader = new Reader(getDatabaseURL(server));
@@ -120,8 +121,10 @@ protected String getDatabaseURL(final ServerRun server) {
120121

121122
class Writer implements Runnable {
122123
private final String databaseUrl;
124+
private int serverId;
123125

124-
public Writer(final String db) {
126+
public Writer(final int iServerId, final String db) {
127+
serverId = iServerId;
125128
databaseUrl = db;
126129
}
127130

@@ -134,11 +137,13 @@ public void run() {
134137
if (name == null)
135138
name = database.getURL();
136139

137-
if ((i + 1) % 10000 == 0)
140+
if ((i + 1) % 1 == 0)
138141
System.out.println("\nWriter " + name + " created " + (i + 1) + "/" + count + " records so far");
139142

140-
ODocument person = new ODocument("Person").fields("id", UUID.randomUUID().toString(), "name", "Billy" + i, "surname",
141-
"Mayes" + i, "birthday", new Date(), "children", i);
143+
final int uniqueId = count * serverId + i;
144+
145+
ODocument person = new ODocument("Person").fields("id", UUID.randomUUID().toString(), "name", "Billy" + uniqueId,
146+
"surname", "Mayes" + uniqueId, "birthday", new Date(), "children", uniqueId);
142147
database.save(person);
143148

144149
Thread.sleep(delayWriter);
@@ -192,14 +197,16 @@ private void printStats(final String databaseUrl) {
192197
System.out.println("\nReader " + name + " sql count: " + result.get(0) + " counting class: " + database.countClass("Person")
193198
+ " counting cluster: " + database.countClusterElements("Person"));
194199

195-
try {
196-
List<ODocument> conflicts = database.query(new OSQLSynchQuery<OIdentifiable>("select count(*) from ODistributedConflict"));
197-
long totalConflicts = (Long) conflicts.get(0).field("count");
198-
Assert.assertEquals(0l, totalConflicts);
199-
System.out.println("\nReader " + name + " conflicts: " + totalConflicts);
200-
} catch (OQueryParsingException e) {
201-
// IGNORE IT
202-
}
200+
if (database.getMetadata().getSchema().existsClass("ODistributedConflict"))
201+
try {
202+
List<ODocument> conflicts = database
203+
.query(new OSQLSynchQuery<OIdentifiable>("select count(*) from ODistributedConflict"));
204+
long totalConflicts = (Long) conflicts.get(0).field("count");
205+
Assert.assertEquals(0l, totalConflicts);
206+
System.out.println("\nReader " + name + " conflicts: " + totalConflicts);
207+
} catch (OQueryParsingException e) {
208+
// IGNORE IT
209+
}
203210

204211
} finally {
205212
database.close();

distributed/src/test/resources/hazelcast-0.xml

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
the specific language governing permissions and ~ limitations under the License. -->
1010

1111
<hazelcast
12-
xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-config-2.1.xsd"
12+
xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-config-3.0.xsd"
1313
xmlns="http://www.hazelcast.com/schema/config" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
1414
<group>
1515
<name>orientdb</name>
@@ -18,10 +18,14 @@
1818
<network>
1919
<port auto-increment="false">2434</port>
2020
<join>
21-
<multicast enabled="true">
21+
<multicast enabled="false">
2222
<multicast-group>224.2.2.3</multicast-group>
2323
<multicast-port>2434</multicast-port>
2424
</multicast>
25+
<tcp-ip enabled="true">
26+
<member>127.0.0.1:2435</member>
27+
<interface>127.0.0.1</interface>
28+
</tcp-ip>
2529
</join>
2630
</network>
2731
<executor-service>

distributed/src/test/resources/hazelcast-1.xml

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
the specific language governing permissions and ~ limitations under the License. -->
1010

1111
<hazelcast
12-
xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-config-2.1.xsd"
12+
xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-config-3.0.xsd"
1313
xmlns="http://www.hazelcast.com/schema/config" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
1414
<group>
1515
<name>orientdb</name>
@@ -18,10 +18,14 @@
1818
<network>
1919
<port auto-increment="false">2435</port>
2020
<join>
21-
<multicast enabled="true">
21+
<multicast enabled="false">
2222
<multicast-group>224.2.2.3</multicast-group>
2323
<multicast-port>2434</multicast-port>
2424
</multicast>
25+
<tcp-ip enabled="true">
26+
<member>127.0.0.1:2434</member>
27+
<interface>127.0.0.1</interface>
28+
</tcp-ip>
2529
</join>
2630
</network>
2731
<executor-service>

distributed/src/test/resources/orientdb-dserver-config-0.xml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414
value="com.orientechnologies.orient.server.distributed.conflict.ODefaultReplicationConflictResolver" />
1515
<parameter name="configuration.db.default"
1616
value="src/test/resources/default-distributed-db-config.json" />
17+
18+
<parameter name="replication.strategy.auto"
19+
value="com.orientechnologies.orient.server.hazelcast.strategy.OAutoReplicationStrategy" />
1720
</parameters>
1821
</handler>
1922
<handler

distributed/src/test/resources/orientdb-dserver-config-1.xml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414
value="com.orientechnologies.orient.server.distributed.conflict.ODefaultReplicationConflictResolver" />
1515
<parameter name="configuration.db.default"
1616
value="src/test/resources/default-distributed-db-config.json" />
17+
18+
<parameter name="replication.strategy.auto"
19+
value="com.orientechnologies.orient.server.hazelcast.strategy.OAutoReplicationStrategy" />
1720
</parameters>
1821
</handler>
1922
<handler

distributed/src/test/resources/orientdb-dserver-config-2.xml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414
value="com.orientechnologies.orient.server.distributed.conflict.ODefaultReplicationConflictResolver" />
1515
<parameter name="configuration.db.default"
1616
value="src/test/resources/default-distributed-db-config.json" />
17+
18+
<parameter name="replication.strategy.auto"
19+
value="com.orientechnologies.orient.server.hazelcast.strategy.OAutoReplicationStrategy" />
1720
</parameters>
1821
</handler>
1922
<handler

graphdb/config/orientdb-dserver-config.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@
1414
<parameter name="alignment.startup" value="true" />
1515
<parameter name="alignment.timer" value="120000" />
1616
<parameter name="conflict.resolver.impl" value="com.orientechnologies.orient.server.distributed.conflict.ODefaultReplicationConflictResolver" />
17+
18+
<!-- REPLICATION STRATEGIES -->
19+
<parameter name="replication.strategy.auto"
20+
value="com.orientechnologies.orient.server.hazelcast.strategy.OAutoReplicationStrategy" />
1721
</parameters>
1822
</handler>
1923
<!-- AUTOMATIC BACKUP, TO TURN ON SET THE 'ENABLED' PARAMETER TO 'true' -->

0 commit comments

Comments
 (0)