Skip to content

Commit c0df24b

Browse files
committed
Supported sharded queries with the management of projections/aggregations
Now functions like max(), count(), etc. works distributed for a real map reduce.
1 parent 2c051a8 commit c0df24b

5 files changed

Lines changed: 189 additions & 94 deletions

File tree

core/src/main/java/com/orientechnologies/orient/core/sql/OCommandExecutorSQLResultsetDelegate.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,11 @@
1515
*/
1616
package com.orientechnologies.orient.core.sql;
1717

18+
import com.orientechnologies.orient.core.db.record.OIdentifiable;
19+
1820
import java.util.Iterator;
1921
import java.util.Map;
20-
21-
import com.orientechnologies.orient.core.db.record.OIdentifiable;
22+
import java.util.Set;
2223

2324
/**
2425
* SQL UPDATE command.
@@ -30,6 +31,11 @@
3031
public class OCommandExecutorSQLResultsetDelegate extends OCommandExecutorSQLDelegate implements OIterableRecordSource,
3132
Iterable<OIdentifiable> {
3233

34+
@Override
35+
public Set<String> getInvolvedClusters() {
36+
return ((OCommandExecutorSQLResultsetAbstract) delegate).getInvolvedClusters();
37+
}
38+
3339
@Override
3440
public Iterator<OIdentifiable> iterator() {
3541
return ((OCommandExecutorSQLResultsetAbstract) delegate).iterator();

core/src/main/java/com/orientechnologies/orient/core/sql/OCommandExecutorSQLSelect.java

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,6 @@
1515
*/
1616
package com.orientechnologies.orient.core.sql;
1717

18-
import java.util.ArrayList;
19-
import java.util.Collection;
20-
import java.util.Collections;
21-
import java.util.Comparator;
22-
import java.util.HashSet;
23-
import java.util.Iterator;
24-
import java.util.LinkedHashMap;
25-
import java.util.List;
26-
import java.util.Locale;
27-
import java.util.Map;
28-
import java.util.Map.Entry;
29-
import java.util.Set;
30-
3118
import com.orientechnologies.common.collection.OMultiCollectionIterator;
3219
import com.orientechnologies.common.collection.OMultiValue;
3320
import com.orientechnologies.common.concur.resource.OSharedResource;
@@ -74,6 +61,9 @@
7461
import com.orientechnologies.orient.core.sql.query.OSQLQuery;
7562
import com.orientechnologies.orient.core.storage.OStorage;
7663

64+
import java.util.*;
65+
import java.util.Map.Entry;
66+
7767
/**
7868
* Executes the SQL SELECT statement. the parse() method compiles the query and builds the meta information needed by the execute().
7969
* If the query contains the ORDER BY clause, the results are temporary collected internally, then ordered and finally returned all
@@ -359,11 +349,16 @@ public Set<String> getInvolvedClusters() {
359349

360350
final ODatabaseRecord db = getDatabase();
361351

362-
if (parsedTarget.getTargetRecords() != null) {
352+
if (parsedTarget.getTargetQuery() != null) {
353+
// SUB QUERY, PROPAGATE THE CALL
354+
clusters.addAll(parsedTarget.getTargetQuery().getInvolvedClusters());
355+
} else if (parsedTarget.getTargetRecords() != null) {
356+
// SINGLE RECORDS: BROWSE ALL (COULD BE EXPENSIVE).
363357
for (OIdentifiable identifiable : parsedTarget.getTargetRecords()) {
364358
clusters.add(db.getClusterNameById(identifiable.getIdentity().getClusterId()).toLowerCase());
365359
}
366360
}
361+
367362
if (parsedTarget.getTargetClasses() != null) {
368363
for (String clazz : parsedTarget.getTargetClasses().values()) {
369364
final OClass cls = db.getMetadata().getSchema().getClass(clazz);
@@ -380,7 +375,19 @@ public Set<String> getInvolvedClusters() {
380375
}
381376
}
382377
if (parsedTarget.getTargetIndex() != null) {
383-
// TODO indexes??
378+
// EXTRACT THE CLASS NAME -> CLUSTERS FROM THE INDEX DEFINITION
379+
final OIndex<?> idx = db.getMetadata().getIndexManager().getIndex(parsedTarget.getTargetIndex());
380+
if (idx != null) {
381+
final String clazz = idx.getDefinition().getClassName();
382+
383+
if (clazz != null) {
384+
final OClass cls = db.getMetadata().getSchema().getClass(clazz);
385+
if (cls != null)
386+
for (int clId : cls.getClusterIds()) {
387+
clusters.add(db.getClusterNameById(clId).toLowerCase());
388+
}
389+
}
390+
}
384391
}
385392
return clusters;
386393
}

core/src/main/java/com/orientechnologies/orient/core/sql/filter/OSQLTarget.java

Lines changed: 65 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,8 @@
1515
*/
1616
package com.orientechnologies.orient.core.sql.filter;
1717

18-
import java.util.ArrayList;
19-
import java.util.HashMap;
20-
import java.util.List;
21-
import java.util.Map;
22-
2318
import com.orientechnologies.common.parser.OBaseParser;
2419
import com.orientechnologies.orient.core.command.OCommandContext;
25-
import com.orientechnologies.orient.core.command.OCommandExecutor;
2620
import com.orientechnologies.orient.core.command.OCommandManager;
2721
import com.orientechnologies.orient.core.db.ODatabaseRecordThreadLocal;
2822
import com.orientechnologies.orient.core.db.record.OIdentifiable;
@@ -32,24 +26,31 @@
3226
import com.orientechnologies.orient.core.metadata.schema.OClass;
3327
import com.orientechnologies.orient.core.serialization.serializer.OStringSerializerHelper;
3428
import com.orientechnologies.orient.core.sql.OCommandExecutorSQLAbstract;
29+
import com.orientechnologies.orient.core.sql.OCommandExecutorSQLResultsetDelegate;
3530
import com.orientechnologies.orient.core.sql.OCommandSQL;
3631
import com.orientechnologies.orient.core.sql.OCommandSQLParsingException;
3732
import com.orientechnologies.orient.core.sql.OCommandSQLResultset;
3833

34+
import java.util.ArrayList;
35+
import java.util.HashMap;
36+
import java.util.List;
37+
import java.util.Map;
38+
3939
/**
4040
* Target parser.
4141
*
4242
* @author Luca Garulli
4343
*
4444
*/
4545
public class OSQLTarget extends OBaseParser {
46-
protected String targetVariable;
47-
protected Iterable<? extends OIdentifiable> targetRecords;
48-
protected Map<String, String> targetClusters;
49-
protected Map<OClass, String> targetClasses;
50-
protected String targetIndex;
51-
protected final boolean empty;
52-
protected final OCommandContext context;
46+
protected final boolean empty;
47+
protected final OCommandContext context;
48+
protected String targetVariable;
49+
protected OCommandExecutorSQLResultsetDelegate targetQuery;
50+
protected Iterable<? extends OIdentifiable> targetRecords;
51+
protected Map<String, String> targetClusters;
52+
protected Map<OClass, String> targetClasses;
53+
protected String targetIndex;
5354

5455
public OSQLTarget(final String iText, final OCommandContext iContext, final String iFilterKeyword) {
5556
super();
@@ -71,6 +72,54 @@ public OSQLTarget(final String iText, final OCommandContext iContext, final Stri
7172
}
7273
}
7374

75+
public Map<String, String> getTargetClusters() {
76+
return targetClusters;
77+
}
78+
79+
public Map<OClass, String> getTargetClasses() {
80+
return targetClasses;
81+
}
82+
83+
public Iterable<? extends OIdentifiable> getTargetRecords() {
84+
return targetRecords;
85+
}
86+
87+
public OCommandExecutorSQLResultsetDelegate getTargetQuery() {
88+
return targetQuery;
89+
}
90+
91+
public String getTargetIndex() {
92+
return targetIndex;
93+
}
94+
95+
@Override
96+
public String toString() {
97+
if (targetClasses != null)
98+
return "class " + targetClasses.keySet();
99+
else if (targetClusters != null)
100+
return "cluster " + targetClusters.keySet();
101+
if (targetIndex != null)
102+
return "index " + targetIndex;
103+
if (targetRecords != null)
104+
return "records from " + targetRecords.getClass().getSimpleName();
105+
if (targetVariable != null)
106+
return "variable " + targetVariable;
107+
return "?";
108+
}
109+
110+
public String getTargetVariable() {
111+
return targetVariable;
112+
}
113+
114+
public boolean isEmpty() {
115+
return empty;
116+
}
117+
118+
@Override
119+
protected void throwSyntaxErrorException(String iText) {
120+
throw new OCommandSQLParsingException(iText + ". Use " + getSyntax(), parserText, parserGetPreviousPosition());
121+
}
122+
74123
@SuppressWarnings("unchecked")
75124
private boolean extractTargets() {
76125
parserSkipWhiteSpaces();
@@ -94,7 +143,8 @@ private boolean extractTargets() {
94143
parserSetCurrentPosition(OStringSerializerHelper.getEmbedded(parserText, parserGetCurrentPosition(), -1, subText) + 1);
95144
final OCommandSQL subCommand = new OCommandSQLResultset(subText.toString());
96145

97-
final OCommandExecutor executor = OCommandManager.instance().getExecutor(subCommand);
146+
final OCommandExecutorSQLResultsetDelegate executor = (OCommandExecutorSQLResultsetDelegate) OCommandManager.instance()
147+
.getExecutor(subCommand);
98148
executor.setProgressListener(subCommand.getProgressListener());
99149
executor.parse(subCommand);
100150
context.setChild(executor.getContext());
@@ -103,6 +153,7 @@ private boolean extractTargets() {
103153
throw new OCommandSQLParsingException("Sub-query cannot be iterated because doesn't implement the Iterable interface: "
104154
+ subCommand);
105155

156+
targetQuery = executor;
106157
targetRecords = (Iterable<? extends OIdentifiable>) executor;
107158

108159
} else if (c == OStringSerializerHelper.LIST_BEGIN) {
@@ -180,48 +231,4 @@ private boolean extractTargets() {
180231

181232
return !parserIsEnded();
182233
}
183-
184-
public Map<String, String> getTargetClusters() {
185-
return targetClusters;
186-
}
187-
188-
public Map<OClass, String> getTargetClasses() {
189-
return targetClasses;
190-
}
191-
192-
public Iterable<? extends OIdentifiable> getTargetRecords() {
193-
return targetRecords;
194-
}
195-
196-
public String getTargetIndex() {
197-
return targetIndex;
198-
}
199-
200-
@Override
201-
public String toString() {
202-
if (targetClasses != null)
203-
return "class " + targetClasses.keySet();
204-
else if (targetClusters != null)
205-
return "cluster " + targetClusters.keySet();
206-
if (targetIndex != null)
207-
return "index " + targetIndex;
208-
if (targetRecords != null)
209-
return "records from " + targetRecords.getClass().getSimpleName();
210-
if (targetVariable != null)
211-
return "variable " + targetVariable;
212-
return "?";
213-
}
214-
215-
public String getTargetVariable() {
216-
return targetVariable;
217-
}
218-
219-
@Override
220-
protected void throwSyntaxErrorException(String iText) {
221-
throw new OCommandSQLParsingException(iText + ". Use " + getSyntax(), parserText, parserGetPreviousPosition());
222-
}
223-
224-
public boolean isEmpty() {
225-
return empty;
226-
}
227234
}

distributed/src/test/java/com/orientechnologies/orient/server/distributed/TestSharding.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,14 @@ protected void executeTest() throws Exception {
4848

4949
final int clId = vertices[i].getIdentity().getClusterId();
5050

51-
System.out.println("Create vertex, class: " + vertices[i].getLabel() + ", cluster: " + clId);
52-
5351
Assert
5452
.assertEquals("Error on assigning cluster client_" + i, clId, graph.getRawGraph().getClusterIdByName("client_" + i));
5553

5654
vertices[i].setProperty("name", "shard_" + i);
5755
vertices[i].setProperty("amount", i * 10000);
56+
57+
System.out.println("Create vertex, class: " + vertices[i].getLabel() + ", cluster: " + clId + " -> "
58+
+ vertices[i].getRecord());
5859
}
5960
} finally {
6061
graph.shutdown();
@@ -82,6 +83,27 @@ protected void executeTest() throws Exception {
8283
}
8384
}
8485

86+
// TEST DISTRIBUTED QUERY + AGGREGATION + SUB_QUERY AGAINST ALL 3 DATABASES TO TEST MAP/REDUCE
87+
for (int server = 0; server < vertices.length; ++server) {
88+
OrientGraphFactory f = new OrientGraphFactory("plocal:target/server" + 0 + "/databases/" + getDatabaseName());
89+
OrientGraphNoTx g = f.getNoTx();
90+
try {
91+
// MISC QUERIES
92+
Iterable<OrientVertex> result = g.command(new OCommandSQL("select sum(amount) from ( select from Client )")).execute();
93+
94+
int count = 0;
95+
for (OrientVertex v : result) {
96+
System.out.println("select sum(amount) from ( select from Client ) -> " + v.getRecord());
97+
count++;
98+
}
99+
100+
Assert.assertEquals("Returned wrong vertices count on server " + server, 1, count);
101+
102+
} finally {
103+
g.shutdown();
104+
}
105+
}
106+
85107
// TEST DISTRIBUTED QUERY AGAINST ALL 3 DATABASES TO TEST MAP/REDUCE
86108
for (int server = 0; server < vertices.length; ++server) {
87109
OrientGraphFactory f = new OrientGraphFactory("plocal:target/server" + server + "/databases/" + getDatabaseName());
@@ -107,11 +129,14 @@ protected void executeTest() throws Exception {
107129

108130
Iterable<OrientVertex> result = g.command(new OCommandSQL("select max(amount), avg(amount), sum(amount) from Client"))
109131
.execute();
132+
110133
int count = 0;
111-
for (OrientVertex v : result)
134+
for (OrientVertex v : result) {
135+
System.out.println("select max(amount), avg(amount), sum(amount) from Client -> " + v.getRecord());
112136
count++;
137+
}
113138

114-
Assert.assertEquals("Returned wrong vertices count on server " + server, 3, count);
139+
Assert.assertEquals("Returned wrong vertices count on server " + server, 1, count);
115140
} finally {
116141
g.shutdown();
117142
}

0 commit comments

Comments
 (0)