5555import com .orientechnologies .orient .server .distributed .ODistributedServerLog ;
5656import com .orientechnologies .orient .server .distributed .ODistributedServerLog .DIRECTION ;
5757import com .orientechnologies .orient .server .distributed .ODistributedThreadLocal ;
58+ import com .orientechnologies .orient .server .distributed .OReplicationConfig ;
5859import com .orientechnologies .orient .server .distributed .OServerOfflineException ;
5960import com .orientechnologies .orient .server .distributed .OStorageSynchronizer ;
6061import com .orientechnologies .orient .server .distributed .conflict .OReplicationConflictResolver ;
7273public class OHazelcastPlugin extends ODistributedAbstractPlugin implements MembershipListener , EntryListener <String , Object > {
7374 protected static final String DISTRIBUTED_EXECUTOR_NAME = "OHazelcastPlugin::Executor" ;
7475 protected static final int SEND_RETRY_MAX = 100 ;
75-
76+
7677 protected int nodeNumber ;
7778 protected String localNodeId ;
7879 protected String configFile = "hazelcast.xml" ;
@@ -261,23 +262,40 @@ public void onFailure(Throwable t) {
261262 throw new ODistributedException ("Cannot complete the operation because the cluster is offline" );
262263 }
263264
264- public Object execute (final String iClusterName , final Object iKey , final OAbstractRemoteTask <? extends Object > iTask )
265- throws ExecutionException {
265+ public Object execute (final String iClusterName , final Object iKey , final OAbstractRemoteTask <? extends Object > iTask ,
266+ OReplicationConfig replicationData ) throws ExecutionException {
266267
267268 final String dbName = iTask .getDatabaseName ();
268269
269- // ASSIGN DESTINATION NODE
270- String masterNodeId = getMasterNode (dbName , iClusterName , iKey );
271- iTask .setNodeDestination (masterNodeId );
272-
273- masterNodeId = waitUntilMasterNodeIsOnline (iClusterName , iKey , dbName , masterNodeId );
270+ String masterNodeId = null ;
274271
275272 try {
276- if (isLocalNodeMaster (iKey ))
277- return executeLocallyAndPropagate ((OAbstractReplicatedTask <? extends Object >) iTask );
278- else
279- return executeRemotelyAndApplyLocally (iClusterName , iKey , iTask , dbName );
273+ if (replicationData == null ) {
274+ // NO REPLICATION: LOCAL ONLY
275+ ODistributedThreadLocal .INSTANCE .set (iTask .getNodeSource ());
276+ try {
277+ // EXECUTE IT LOCALLY
278+ return ((OAbstractReplicatedTask <? extends Object >) iTask ).executeOnLocalNode ();
279+ } finally {
280+ // SET LAST EXECUTION SERIAL
281+ ODistributedThreadLocal .INSTANCE .set (null );
282+ }
280283
284+ } else {
285+ if (replicationData != null ) {
286+ // SET THE DESTINATION NODE
287+ iTask .setNodeDestination (replicationData .masterNode );
288+ replicationData .masterNode = waitUntilMasterNodeIsOnline (iClusterName , iKey , dbName , replicationData .masterNode );
289+ masterNodeId = replicationData .masterNode ;
290+ }
291+
292+ if (getLocalNodeId ().equals (replicationData .masterNode ))
293+ // LOCAL + PROPAGATE
294+ return executeLocallyAndPropagate ((OAbstractReplicatedTask <? extends Object >) iTask );
295+ else
296+ // REMOTE + LOCAL
297+ return executeRemotelyAndApplyLocally (iClusterName , iKey , iTask , dbName , replicationData );
298+ }
281299 } catch (InterruptedException e ) {
282300 Thread .interrupted ();
283301
@@ -293,8 +311,8 @@ public Object execute(final String iClusterName, final Object iKey, final OAbstr
293311
294312 @ SuppressWarnings ("unchecked" )
295313 protected Object executeRemotelyAndApplyLocally (final String iClusterName , final Object iKey ,
296- final OAbstractRemoteTask <? extends Object > iTask , final String dbName ) throws InterruptedException , Exception ,
297- ExecutionException {
314+ final OAbstractRemoteTask <? extends Object > iTask , final String dbName , final OReplicationConfig iReplicationData )
315+ throws InterruptedException , Exception , ExecutionException {
298316
299317 // RETRY UNTIL SUCCEED
300318 for (int retry = 0 ; retry < SEND_RETRY_MAX ; ++retry ) {
@@ -340,7 +358,7 @@ protected Object executeRemotelyAndApplyLocally(final String iClusterName, final
340358 "error on execution of operation in %s mode, because node left. Re-route it in transparent way" , e ,
341359 EXECUTION_MODE .SYNCHRONOUS );
342360
343- return execute (iClusterName , iKey , iTask );
361+ return execute (iClusterName , iKey , iTask , iReplicationData );
344362
345363 } catch (ExecutionException e ) {
346364 if (e .getCause () instanceof OServerOfflineException ) {
@@ -419,15 +437,26 @@ public boolean isLocalNodeMaster(final Object iKey) {
419437 final Member partitionOwner = hazelcastInstance .getPartitionService ().getPartition (iKey ).getOwner ();
420438 final boolean local = partitionOwner .equals (hazelcastInstance .getCluster ().getLocalMember ());
421439
422- // ODistributedServerLog.debug(this, getLocalNodeId(), null, DIRECTION.NONE,
423- // "network partition: check for local master: key '%s' is assigned to %s (local=%s)", iKey, getNodeId(partitionOwner), local);
440+ ODistributedServerLog .debug (this , getLocalNodeId (), null , DIRECTION .NONE ,
441+ "network partition: check for local master: key '%s' is assigned to %s (local=%s)" , iKey , getNodeId (partitionOwner ), local );
424442
425443 return local ;
426444 }
427445
428- public String getMasterNode (final String iDatabaseName , final String iClusterName , final Object iKey ) {
429- String masterNode = getDatabaseClusterConfiguration (iDatabaseName , iClusterName ).field ("master" );
430- if (masterNode == null ) {
446+ /**
447+ * Returns the replicaiton data, or null if replication is not active.
448+ */
449+ public OReplicationConfig getReplicationData (final String iDatabaseName , final String iClusterName , final Object iKey ) {
450+
451+ final ODocument cfg = getDatabaseClusterConfiguration (iDatabaseName , iClusterName );
452+ final Boolean active = cfg .field ("synchronization" );
453+ if (active == null || !active )
454+ // NOT ACTIVE, RETURN
455+ return null ;
456+
457+ final OReplicationConfig data = new OReplicationConfig ();
458+ data .masterNode = cfg .field ("master" );
459+ if (data .masterNode == null ) {
431460 ODistributedServerLog
432461 .warn (
433462 this ,
@@ -436,15 +465,21 @@ public String getMasterNode(final String iDatabaseName, final String iClusterNam
436465 DIRECTION .NONE ,
437466 "network partition: found wrong configuration for database '%s': cannot find the 'master' field for the cluster '%s'. '$auto' will be used" ,
438467 iDatabaseName , iClusterName );
439- } else if (masterNode .equalsIgnoreCase ("$auto" ))
440- // AUTO, BY HAZELCAST PARTITION SERVICE
441- masterNode = getNodeId (hazelcastInstance .getPartitionService ().getPartition (iKey ).getOwner ());
468+ data .masterNode = MASTER_AUTO ;
469+ }
470+
471+ if (data .masterNode .startsWith ("$" ))
472+ // GET THE MASTER NODE BY USING THE STRATEGY FACTORY
473+ data .masterNode = getReplicationStrategy (data .masterNode ).getNode (this , iClusterName , iKey );
442474
443- final boolean local = masterNode .equals (getLocalNodeId ());
475+ if (data .masterNode == null )
476+ throw new ODistributedException ("Cannot find a master node for the key '" + iKey + "'" );
477+
478+ final boolean local = data .masterNode .equals (getLocalNodeId ());
444479 ODistributedServerLog .debug (this , getLocalNodeId (), "?" , DIRECTION .OUT ,
445- "network partition: get node master for key '%s' is assigned %s (local=%s)" , iKey , masterNode , local );
480+ "network partition: get node master for key '%s' is assigned %s (local=%s)" , iKey , data . masterNode , local );
446481
447- return masterNode ;
482+ return data ;
448483 }
449484
450485 @ Override
@@ -682,7 +717,7 @@ public String getLocalNodeAlias() {
682717 return getLocalNodeId ();
683718 }
684719
685- protected String getNodeId (final Member iMember ) {
720+ public String getNodeId (final Member iMember ) {
686721 return iMember .getInetSocketAddress ().toString ().substring (1 );
687722 }
688723
@@ -770,7 +805,7 @@ public int getNodeNumber() {
770805 return nodeNumber ;
771806 }
772807
773- protected HazelcastInstance getHazelcastInstance () {
808+ public HazelcastInstance getHazelcastInstance () {
774809 while (hazelcastInstance == null ) {
775810 // WAIT UNTIL THE INSTANCE IS READY
776811 try {
@@ -840,15 +875,16 @@ protected String waitUntilMasterNodeIsOnline(final String iClusterName, final Ob
840875 Thread .interrupted ();
841876 }
842877 // RE-READ THE KEY OWNER (IT COULD BE CHANGED DURING THE PAUSE)
843- final String newMasterNodeId = getMasterNode (dbName , iClusterName , iKey );
878+ final OReplicationConfig newReplicationConfig = getReplicationData (dbName , iClusterName , iKey );
844879
845- if (!newMasterNodeId .equals (masterNodeId )) {
880+ if (!newReplicationConfig . masterNode .equals (masterNodeId )) {
846881 ODistributedServerLog .warn (this , getLocalNodeId (), masterNodeId , DIRECTION .OUT ,
847882 "node %s is the new owner of the requested key set" , getRemoteNodeStatus (masterNodeId ));
848- masterNodeId = newMasterNodeId ;
883+ masterNodeId = newReplicationConfig . masterNode ;
849884 }
850885
851886 }
887+
852888 ODistributedServerLog .warn (this , getLocalNodeId (), masterNodeId , DIRECTION .OUT ,
853889 "node %s is aligned. Flushing pending operations..." );
854890 }
0 commit comments