Skip to content

Commit 83eb682

Browse files
committed
Refactoring of AutoSharded storage interface
1 parent 34908be commit 83eb682

7 files changed

Lines changed: 68 additions & 39 deletions

File tree

core/src/main/java/com/orientechnologies/orient/core/sql/functions/OSQLFunctionAbstract.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,11 @@
1515
*/
1616
package com.orientechnologies.orient.core.sql.functions;
1717

18-
import java.util.List;
19-
2018
import com.orientechnologies.orient.core.db.ODatabaseRecordThreadLocal;
2119
import com.orientechnologies.orient.core.storage.OAutoshardedStorage;
2220

21+
import java.util.List;
22+
2323
/**
2424
* Abstract class to extend to build Custom SQL Functions. Extend it and register it with:
2525
* <code>OSQLParser.getInstance().registerStatelessFunction()</code> or
@@ -96,7 +96,7 @@ protected boolean returnDistributedResult() {
9696
return ODatabaseRecordThreadLocal.INSTANCE.get().getStorage() instanceof OAutoshardedStorage;
9797
}
9898

99-
protected long getDistributedStorageId() {
99+
protected String getDistributedStorageId() {
100100
return ((OAutoshardedStorage) ODatabaseRecordThreadLocal.INSTANCE.get().getStorage()).getStorageId();
101101
}
102102
}

core/src/main/java/com/orientechnologies/orient/core/storage/OAutoshardedStorage.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,12 @@
2121
* @author edegtyarenko
2222
* @since 15.10.12 10:27
2323
*/
24-
public interface OAutoshardedStorage extends OStorage {
24+
public interface OAutoshardedStorage {
2525

2626
/**
27-
* Storage unique id
27+
* Storage unique id, made by node name + database name
2828
*
2929
* @return storage unique id
3030
*/
31-
long getStorageId();
31+
String getStorageId();
3232
}

distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastDistributedDatabase.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,11 +130,15 @@ public ODistributedResponse send2Nodes(final ODistributedRequest iRequest, final
130130
}
131131

132132
final int queueSize = iNodes.size();
133+
final boolean groupByResponse;
133134
int expectedSynchronousResponses = quorum > 0 ? Math.min(quorum, availableNodes) : 1;
134-
if (iRequest.getTask().getResultStrategy() == OAbstractRemoteTask.RESULT_STRATEGY.UNION)
135+
136+
if (iRequest.getTask().getResultStrategy() == OAbstractRemoteTask.RESULT_STRATEGY.UNION) {
135137
expectedSynchronousResponses = availableNodes;
138+
groupByResponse = false;
139+
} else
140+
groupByResponse = true;
136141

137-
final boolean groupByResponse = iRequest.getTask().getResultStrategy() != OAbstractRemoteTask.RESULT_STRATEGY.MERGE;
138142
final boolean waitLocalNode = waitForLocalNode(cfg, iClusterNames, iNodes);
139143

140144
// CREATE THE RESPONSE MANAGER

distributed/src/test/java/com/orientechnologies/orient/server/distributed/TestSharding.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ protected void executeTest() throws Exception {
5454
.assertEquals("Error on assigning cluster client_" + i, clId, graph.getRawGraph().getClusterIdByName("client_" + i));
5555

5656
vertices[i].setProperty("name", "shard_" + i);
57+
vertices[i].setProperty("amount", i * 10000);
5758
}
5859
} finally {
5960
graph.shutdown();
@@ -97,6 +98,24 @@ protected void executeTest() throws Exception {
9798
g.shutdown();
9899
}
99100
}
101+
102+
// TEST DISTRIBUTED QUERY AGAINST ALL 3 DATABASES TO TEST AGGREGATION
103+
for (int server = 0; server < vertices.length; ++server) {
104+
OrientGraphFactory f = new OrientGraphFactory("plocal:target/server" + server + "/databases/" + getDatabaseName());
105+
OrientGraphNoTx g = f.getNoTx();
106+
try {
107+
108+
Iterable<OrientVertex> result = g.command(new OCommandSQL("select max(amount), avg(amount), sum(amount) from Client"))
109+
.execute();
110+
int count = 0;
111+
for (OrientVertex v : result)
112+
count++;
113+
114+
Assert.assertEquals("Returned wrong vertices count on server " + server, 3, count);
115+
} finally {
116+
g.shutdown();
117+
}
118+
}
100119
} catch (Exception e) {
101120
e.printStackTrace();
102121

server/src/main/java/com/orientechnologies/orient/server/distributed/ODistributedResponseManager.java

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -397,25 +397,6 @@ public ODistributedResponse getFinalResponse() {
397397
// DEFAULT: RETURN BEST ANSWER
398398
break;
399399

400-
case MERGE: {
401-
// MERGE THE RESULT IN A UNIQUE OBJECT OR COLLECTION
402-
Object result = null;
403-
for (Object response : responses.values()) {
404-
final Object responsePayload = response instanceof ODistributedResponse ? ((ODistributedResponse) response).getPayload()
405-
: response;
406-
407-
if (result == null)
408-
result = responsePayload;
409-
else
410-
result = OMultiValue.add(result, responsePayload);
411-
}
412-
413-
final ODistributedResponse response = (ODistributedResponse) responses.values().iterator().next();
414-
response.setExecutorNodeName(responses.keySet().toString());
415-
response.setPayload(result);
416-
return response;
417-
}
418-
419400
case UNION: {
420401
// COLLECT ALL THE RESPONSE IN A MAP OF <NODE, RESULT>
421402
final Map<String, Object> payloads = new HashMap<String, Object>();

server/src/main/java/com/orientechnologies/orient/server/distributed/ODistributedStorage.java

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -44,15 +44,8 @@
4444
import com.orientechnologies.orient.core.id.ORecordId;
4545
import com.orientechnologies.orient.core.record.ORecordInternal;
4646
import com.orientechnologies.orient.core.sql.OCommandExecutorSQLDelegate;
47-
import com.orientechnologies.orient.core.storage.OCluster;
48-
import com.orientechnologies.orient.core.storage.ODataSegment;
49-
import com.orientechnologies.orient.core.storage.OPhysicalPosition;
50-
import com.orientechnologies.orient.core.storage.ORawBuffer;
51-
import com.orientechnologies.orient.core.storage.ORecordCallback;
52-
import com.orientechnologies.orient.core.storage.ORecordMetadata;
53-
import com.orientechnologies.orient.core.storage.OStorage;
54-
import com.orientechnologies.orient.core.storage.OStorageEmbedded;
55-
import com.orientechnologies.orient.core.storage.OStorageOperationResult;
47+
import com.orientechnologies.orient.core.sql.OCommandExecutorSQLSelect;
48+
import com.orientechnologies.orient.core.storage.*;
5649
import com.orientechnologies.orient.core.storage.impl.local.OFreezableStorage;
5750
import com.orientechnologies.orient.core.tx.OTransaction;
5851
import com.orientechnologies.orient.core.version.ORecordVersion;
@@ -70,6 +63,7 @@
7063
import java.io.IOException;
7164
import java.io.InputStream;
7265
import java.io.OutputStream;
66+
import java.util.ArrayList;
7367
import java.util.Collection;
7468
import java.util.HashSet;
7569
import java.util.Iterator;
@@ -86,7 +80,7 @@
8680
*
8781
* @author Luca Garulli (l.garulli--at--orientechnologies.com)
8882
*/
89-
public class ODistributedStorage implements OStorage, OFreezableStorage {
83+
public class ODistributedStorage implements OStorage, OFreezableStorage, OAutoshardedStorage {
9084
protected final OServer serverInstance;
9185
protected final ODistributedServerManager dManager;
9286
protected final OStorageEmbedded wrapped;
@@ -186,7 +180,7 @@ public Object command(final OCommandRequestText iCommand) {
186180
result = dManager.sendRequest(getName(), involvedClusters, nodes, task, EXECUTION_MODE.RESPONSE);
187181
} else {
188182
// SHARDED, GET ONLY ONE NODE PER INVOLVED CLUSTER
189-
task.setResultStrategy(OAbstractRemoteTask.RESULT_STRATEGY.MERGE);
183+
task.setResultStrategy(OAbstractRemoteTask.RESULT_STRATEGY.UNION);
190184

191185
nodes = dbCfg.getOneServerPerCluster(involvedClusters, dManager.getLocalNodeName());
192186

@@ -196,6 +190,32 @@ public Object command(final OCommandRequestText iCommand) {
196190
return wrapped.command(iCommand);
197191

198192
result = dManager.sendRequest(getName(), involvedClusters, nodes, task, EXECUTION_MODE.RESPONSE);
193+
194+
if (result instanceof Map) {
195+
if (executor instanceof OCommandExecutorSQLDelegate
196+
&& ((OCommandExecutorSQLDelegate) executor).getDelegate() instanceof OCommandExecutorSQLSelect) {
197+
final OCommandExecutorSQLSelect cmd = (OCommandExecutorSQLSelect) ((OCommandExecutorSQLDelegate) executor)
198+
.getDelegate();
199+
200+
final Map<String, Object> proj = cmd.getProjections();
201+
// if (proj != null) {
202+
// final List<Object> list = new ArrayList<Object>();
203+
// result = list;
204+
// } else {
205+
if (((Map<String, Object>) result).size() == 1)
206+
result = ((Map<String, Object>) result).values().iterator().next();
207+
else {
208+
// MIX & FILTER RESULT SET AVOIDING DUPLICATES
209+
// TODO: FILTER PER CLUSTER WITH QUERY
210+
final Set<Object> set = new HashSet<Object>();
211+
for (Map.Entry<String, Object> entry : ((Map<String, Object>) result).entrySet()) {
212+
set.addAll((Collection<?>) entry.getValue());
213+
}
214+
result = new ArrayList<Object>(set);
215+
}
216+
// }
217+
}
218+
}
199219
}
200220

201221
if (result instanceof ONeedRetryException)
@@ -796,6 +816,11 @@ public String getClusterNameByRID(final ORecordId iRid) {
796816
return cluster != null ? cluster.getName() : "*";
797817
}
798818

819+
@Override
820+
public String getStorageId() {
821+
return dManager.getLocalNodeName() + "." + getName();
822+
}
823+
799824
protected void handleDistributedException(final String iMessage, final Exception e, final Object... iParams) {
800825
OLogManager.instance().error(this, iMessage, e, iParams);
801826
final Throwable t = e.getCause();

server/src/main/java/com/orientechnologies/orient/server/distributed/task/OAbstractRemoteTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public abstract class OAbstractRemoteTask implements Externalizable {
3232
private static final long serialVersionUID = 1L;
3333

3434
public enum RESULT_STRATEGY {
35-
ANY, MERGE, UNION
35+
ANY, UNION
3636
}
3737

3838
public enum QUORUM_TYPE {

0 commit comments

Comments
 (0)