3232import java .util .concurrent .locks .Lock ;
3333
3434import com .hazelcast .config .FileSystemXmlConfig ;
35- import com .hazelcast .core .DistributedTask ;
3635import com .hazelcast .core .EntryEvent ;
3736import com .hazelcast .core .EntryListener ;
3837import com .hazelcast .core .ExecutionCallback ;
7069 *
7170 */
7271public class OHazelcastPlugin extends ODistributedAbstractPlugin implements MembershipListener , EntryListener <String , Object > {
72+ private static final String DISTRIBUTED_EXECUTOR_NAME = "OHazelcastPlugin::Executor" ;
7373 private static final int SEND_RETRY_MAX = 100 ;
7474 private int nodeNumber ;
7575 private String localNodeId ;
@@ -80,6 +80,7 @@ public class OHazelcastPlugin extends ODistributedAbstractPlugin implements Memb
8080 private volatile String status = "starting" ;
8181 private Map <String , Boolean > pendingAlignments = new HashMap <String , Boolean >();
8282 private TimerTask alignmentTask ;
83+ private String membershipListenerRegistration ;
8384
8485 private volatile HazelcastInstance hazelcastInstance ;
8586
@@ -149,12 +150,14 @@ public void shutdown() {
149150 super .shutdown ();
150151
151152 remoteClusterNodes .clear ();
152- hazelcastInstance .getCluster ().removeMembershipListener (this );
153+ if (membershipListenerRegistration != null ) {
154+ hazelcastInstance .getCluster ().removeMembershipListener (membershipListenerRegistration );
155+ }
153156 }
154157
155158 @ Override
156159 public long incrementDistributedSerial (final String iDatabaseName ) {
157- return hazelcastInstance .getAtomicNumber ("db." + iDatabaseName ).incrementAndGet ();
160+ return hazelcastInstance .getAtomicLong ("db." + iDatabaseName ).incrementAndGet ();
158161 }
159162
160163 @ Override
@@ -200,30 +203,24 @@ public Object sendOperation2Node(final String iNodeId, final OAbstractDistribute
200203
201204 final Member clusterMember = member ;
202205
203- DistributedTask <Object > task = new DistributedTask <Object >((Callable <Object >) iTask , clusterMember );
204-
205206 ExecutionCallback <Object > callback = null ;
206207 if (iTask .getMode () == EXECUTION_MODE .ASYNCHRONOUS )
207208 callback = new ExecutionCallback <Object >() {
208- @ SuppressWarnings ("unused" )
209209 @ Override
210- public void done (Future <Object > future ) {
211- try {
212- if (!future .isCancelled ()) {
213- // CHECK FOR CONFLICTS
214- Object result = future .get ();
215- }
216- } catch (Exception e ) {
217- ODistributedServerLog .error (this , getLocalNodeId (), iNodeId , DIRECTION .OUT ,
218- "error on execution of operation in ASYNCH mode" , e );
219- }
210+ public void onResponse (Object result ) {
211+ }
212+
213+ @ Override
214+ public void onFailure (Throwable t ) {
215+ ODistributedServerLog .error (this , getLocalNodeId (), iNodeId , DIRECTION .OUT ,
216+ "error on execution of operation in ASYNCH mode" , t );
220217 }
221218 };
222219
223220 for (int retry = 0 ; retry < SEND_RETRY_MAX ; ++retry ) {
224221 try {
225222
226- Object result = executeOperation (task , iTask .getMode (), callback );
223+ Object result = executeOperation (( Callable < Object >) iTask , clusterMember , iTask .getMode (), callback );
227224
228225 // OK
229226 return result ;
@@ -242,8 +239,6 @@ public void done(Future<Object> future) {
242239 Thread .interrupted ();
243240 }
244241
245- task = new DistributedTask <Object >((Callable <Object >) iTask , clusterMember );
246-
247242 } else {
248243 ODistributedServerLog .error (this , getLocalNodeId (), iNodeId , DIRECTION .OUT , "error on execution of operation in %s mode" ,
249244 e , EXECUTION_MODE .SYNCHRONOUS );
@@ -314,8 +309,7 @@ public Object routeOperation2Node(final String iClusterName, final Object iKey,
314309 "execution operation %s against db '%s' remotely type=%s..." , iTask .getName ().toUpperCase (), dbName ,
315310 iTask .getExecutionType ());
316311
317- final DistributedTask <Object > task = new DistributedTask <Object >((Callable <Object >) iTask , iKey );
318- remoteResult = executeOperation (task , EXECUTION_MODE .SYNCHRONOUS , null );
312+ remoteResult = executeOperation ((Callable <Object >) iTask , iKey , EXECUTION_MODE .SYNCHRONOUS , null );
319313 } else
320314 remoteResult = null ;
321315
@@ -508,7 +502,7 @@ public void setStatus(final String iStatus) {
508502 }
509503
510504 public void registerAndAlignNodes () {
511- hazelcastInstance .getCluster ().addMembershipListener (this );
505+ membershipListenerRegistration = hazelcastInstance .getCluster ().addMembershipListener (this );
512506
513507 // COLLECTS THE MEMBER LIST
514508 for (Member clusterMember : hazelcastInstance .getCluster ().getMembers ()) {
@@ -763,15 +757,24 @@ public Class<? extends OReplicationConflictResolver> getConfictResolverClass() {
763757 return confictResolverClass ;
764758 }
765759
766- protected Object executeOperation (final DistributedTask <Object > task , final EXECUTION_MODE iMode ,
760+ protected Object executeOperation (final Callable <Object > task , final Object iKey , final EXECUTION_MODE iMode ,
761+ final ExecutionCallback <Object > callback ) throws ExecutionException , InterruptedException {
762+ Member member = hazelcastInstance .getPartitionService ().getPartition (iKey ).getOwner ();
763+ return executeOperation (task , member , iMode , callback );
764+ }
765+
766+ protected Object executeOperation (final Callable <Object > task , Member member , final EXECUTION_MODE iMode ,
767767 final ExecutionCallback <Object > callback ) throws ExecutionException , InterruptedException {
768- if (iMode == EXECUTION_MODE .ASYNCHRONOUS && callback != null )
769- task .setExecutionCallback (callback );
770768
771- hazelcastInstance .getExecutorService ().execute (task );
769+ if (iMode == EXECUTION_MODE .ASYNCHRONOUS && callback != null ) {
770+ hazelcastInstance .getExecutorService (DISTRIBUTED_EXECUTOR_NAME ).submitToMember (task , member , callback );
771+ return null ;
772+ }
773+
774+ Future <Object > future = hazelcastInstance .getExecutorService (DISTRIBUTED_EXECUTOR_NAME ).submitToMember (task , member );
772775
773776 if (iMode == EXECUTION_MODE .SYNCHRONOUS )
774- return task .get ();
777+ return future .get ();
775778
776779 return null ;
777780 }
0 commit comments