Skip to content

Commit 99ad005

Browse files
committed
Ported Distributed module to Hazelcast 3.0 SNAPSHOT
1 parent 9e27e96 commit 99ad005

5 files changed

Lines changed: 39 additions & 36 deletions

File tree

distributed/config/hazelcast.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
required by applicable law or agreed to in writing, software ~ distributed under the License is distributed on an "AS IS" BASIS, ~ WITHOUT WARRANTIES OR
55
CONDITIONS OF ANY KIND, either express or implied. ~ See the License for the specific language governing permissions and ~ limitations under the License. -->
66

7-
<hazelcast xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-config-2.1.xsd" xmlns="http://www.hazelcast.com/schema/config"
7+
<hazelcast xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-config-3.0.xsd" xmlns="http://www.hazelcast.com/schema/config"
88
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
99
<group>
1010
<name>orientdb</name>

distributed/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
<dependency>
3939
<groupId>com.hazelcast</groupId>
4040
<artifactId>hazelcast</artifactId>
41-
<version>2.1.2</version>
41+
<version>3.0-SNAPSHOT</version>
4242
</dependency>
4343
</dependencies>
4444

distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastPhysicalPosition.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,11 @@
1515
*/
1616
package com.orientechnologies.orient.server.hazelcast;
1717

18-
import java.io.DataInput;
19-
import java.io.DataOutput;
2018
import java.io.IOException;
2119

22-
import com.hazelcast.nio.DataSerializable;
20+
import com.hazelcast.nio.ObjectDataInput;
21+
import com.hazelcast.nio.ObjectDataOutput;
22+
import com.hazelcast.nio.serialization.DataSerializable;
2323
import com.orientechnologies.orient.core.id.OClusterPositionFactory;
2424
import com.orientechnologies.orient.core.storage.OPhysicalPosition;
2525

@@ -41,13 +41,13 @@ public OHazelcastPhysicalPosition(final OPhysicalPosition iFrom) {
4141
}
4242

4343
@Override
44-
public void readData(final DataInput in) throws IOException {
44+
public void readData(final ObjectDataInput in) throws IOException {
4545
clusterPosition = OClusterPositionFactory.INSTANCE.fromStream(in);
4646
recordVersion.getSerializer().readFrom(in, recordVersion);
4747
}
4848

4949
@Override
50-
public void writeData(final DataOutput out) throws IOException {
50+
public void writeData(final ObjectDataOutput out) throws IOException {
5151
out.write(clusterPosition.toStream());
5252
recordVersion.getSerializer().writeTo(out, recordVersion);
5353
}

distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastPlugin.java

Lines changed: 30 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import java.util.concurrent.locks.Lock;
3333

3434
import com.hazelcast.config.FileSystemXmlConfig;
35-
import com.hazelcast.core.DistributedTask;
3635
import com.hazelcast.core.EntryEvent;
3736
import com.hazelcast.core.EntryListener;
3837
import com.hazelcast.core.ExecutionCallback;
@@ -70,6 +69,7 @@
7069
*
7170
*/
7271
public class OHazelcastPlugin extends ODistributedAbstractPlugin implements MembershipListener, EntryListener<String, Object> {
72+
private static final String DISTRIBUTED_EXECUTOR_NAME = "OHazelcastPlugin::Executor";
7373
private static final int SEND_RETRY_MAX = 100;
7474
private int nodeNumber;
7575
private String localNodeId;
@@ -80,6 +80,7 @@ public class OHazelcastPlugin extends ODistributedAbstractPlugin implements Memb
8080
private volatile String status = "starting";
8181
private Map<String, Boolean> pendingAlignments = new HashMap<String, Boolean>();
8282
private TimerTask alignmentTask;
83+
private String membershipListenerRegistration;
8384

8485
private volatile HazelcastInstance hazelcastInstance;
8586

@@ -149,12 +150,14 @@ public void shutdown() {
149150
super.shutdown();
150151

151152
remoteClusterNodes.clear();
152-
hazelcastInstance.getCluster().removeMembershipListener(this);
153+
if (membershipListenerRegistration != null) {
154+
hazelcastInstance.getCluster().removeMembershipListener(membershipListenerRegistration);
155+
}
153156
}
154157

155158
@Override
156159
public long incrementDistributedSerial(final String iDatabaseName) {
157-
return hazelcastInstance.getAtomicNumber("db." + iDatabaseName).incrementAndGet();
160+
return hazelcastInstance.getAtomicLong("db." + iDatabaseName).incrementAndGet();
158161
}
159162

160163
@Override
@@ -200,30 +203,24 @@ public Object sendOperation2Node(final String iNodeId, final OAbstractDistribute
200203

201204
final Member clusterMember = member;
202205

203-
DistributedTask<Object> task = new DistributedTask<Object>((Callable<Object>) iTask, clusterMember);
204-
205206
ExecutionCallback<Object> callback = null;
206207
if (iTask.getMode() == EXECUTION_MODE.ASYNCHRONOUS)
207208
callback = new ExecutionCallback<Object>() {
208-
@SuppressWarnings("unused")
209209
@Override
210-
public void done(Future<Object> future) {
211-
try {
212-
if (!future.isCancelled()) {
213-
// CHECK FOR CONFLICTS
214-
Object result = future.get();
215-
}
216-
} catch (Exception e) {
217-
ODistributedServerLog.error(this, getLocalNodeId(), iNodeId, DIRECTION.OUT,
218-
"error on execution of operation in ASYNCH mode", e);
219-
}
210+
public void onResponse(Object result) {
211+
}
212+
213+
@Override
214+
public void onFailure(Throwable t) {
215+
ODistributedServerLog.error(this, getLocalNodeId(), iNodeId, DIRECTION.OUT,
216+
"error on execution of operation in ASYNCH mode", t);
220217
}
221218
};
222219

223220
for (int retry = 0; retry < SEND_RETRY_MAX; ++retry) {
224221
try {
225222

226-
Object result = executeOperation(task, iTask.getMode(), callback);
223+
Object result = executeOperation((Callable<Object>) iTask, clusterMember, iTask.getMode(), callback);
227224

228225
// OK
229226
return result;
@@ -242,8 +239,6 @@ public void done(Future<Object> future) {
242239
Thread.interrupted();
243240
}
244241

245-
task = new DistributedTask<Object>((Callable<Object>) iTask, clusterMember);
246-
247242
} else {
248243
ODistributedServerLog.error(this, getLocalNodeId(), iNodeId, DIRECTION.OUT, "error on execution of operation in %s mode",
249244
e, EXECUTION_MODE.SYNCHRONOUS);
@@ -314,8 +309,7 @@ public Object routeOperation2Node(final String iClusterName, final Object iKey,
314309
"execution operation %s against db '%s' remotely type=%s...", iTask.getName().toUpperCase(), dbName,
315310
iTask.getExecutionType());
316311

317-
final DistributedTask<Object> task = new DistributedTask<Object>((Callable<Object>) iTask, iKey);
318-
remoteResult = executeOperation(task, EXECUTION_MODE.SYNCHRONOUS, null);
312+
remoteResult = executeOperation((Callable<Object>) iTask, iKey, EXECUTION_MODE.SYNCHRONOUS, null);
319313
} else
320314
remoteResult = null;
321315

@@ -508,7 +502,7 @@ public void setStatus(final String iStatus) {
508502
}
509503

510504
public void registerAndAlignNodes() {
511-
hazelcastInstance.getCluster().addMembershipListener(this);
505+
membershipListenerRegistration = hazelcastInstance.getCluster().addMembershipListener(this);
512506

513507
// COLLECTS THE MEMBER LIST
514508
for (Member clusterMember : hazelcastInstance.getCluster().getMembers()) {
@@ -763,15 +757,24 @@ public Class<? extends OReplicationConflictResolver> getConfictResolverClass() {
763757
return confictResolverClass;
764758
}
765759

766-
protected Object executeOperation(final DistributedTask<Object> task, final EXECUTION_MODE iMode,
760+
protected Object executeOperation(final Callable<Object> task, final Object iKey, final EXECUTION_MODE iMode,
761+
final ExecutionCallback<Object> callback) throws ExecutionException, InterruptedException {
762+
Member member = hazelcastInstance.getPartitionService().getPartition(iKey).getOwner();
763+
return executeOperation(task, member, iMode, callback);
764+
}
765+
766+
protected Object executeOperation(final Callable<Object> task, Member member, final EXECUTION_MODE iMode,
767767
final ExecutionCallback<Object> callback) throws ExecutionException, InterruptedException {
768-
if (iMode == EXECUTION_MODE.ASYNCHRONOUS && callback != null)
769-
task.setExecutionCallback(callback);
770768

771-
hazelcastInstance.getExecutorService().execute(task);
769+
if (iMode == EXECUTION_MODE.ASYNCHRONOUS && callback != null) {
770+
hazelcastInstance.getExecutorService(DISTRIBUTED_EXECUTOR_NAME).submitToMember(task, member, callback);
771+
return null;
772+
}
773+
774+
Future<Object> future = hazelcastInstance.getExecutorService(DISTRIBUTED_EXECUTOR_NAME).submitToMember(task, member);
772775

773776
if (iMode == EXECUTION_MODE.SYNCHRONOUS)
774-
return task.get();
777+
return future.get();
775778

776779
return null;
777780
}

distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/sharding/hazelcast/OHazelcastDHTNodeProxy.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import java.util.concurrent.ExecutionException;
99
import java.util.concurrent.Future;
1010

11-
import com.hazelcast.core.DistributedTask;
1211
import com.hazelcast.core.HazelcastInstance;
1312
import com.hazelcast.core.Member;
1413
import com.orientechnologies.common.log.OLogManager;
@@ -27,6 +26,7 @@
2726
* @since 17.08.12
2827
*/
2928
public class OHazelcastDHTNodeProxy implements ODHTNode {
29+
private static final String DHT_EXECUTOR = "OHazelcastDHTNodeProxy::DHT";
3030
private final long nodeId;
3131
private final Member member;
3232
private final HazelcastInstance hazelcastInstance;
@@ -105,7 +105,7 @@ public boolean isLocal() {
105105

106106
private <T> T callOnRemoteMember(final NodeCall<T> call, boolean async) {
107107
try {
108-
Future<T> future = (Future<T>) hazelcastInstance.getExecutorService().submit(new DistributedTask<T>(call, member));
108+
Future<T> future = (Future<T>) hazelcastInstance.getExecutorService(DHT_EXECUTOR).submitToMember(call, member);
109109

110110
if (async)
111111
return null;

0 commit comments

Comments
 (0)