Skip to content

Commit 5aea458

Browse files
committed
Fixed console "replication" commands
1 parent 0cd7e0b commit 5aea458

7 files changed

Lines changed: 178 additions & 50 deletions

File tree

client/src/main/java/com/orientechnologies/orient/client/remote/OServerAdmin.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -466,12 +466,13 @@ public synchronized OServerAdmin replicationStop(final String iDatabaseName, fin
466466
* involved. Example {"10022":"1#10:3"}
467467
* @throws IOException
468468
*/
469-
public synchronized ODocument getReplicationJournal(final String iDatabaseName, final String iRemoteServer) throws IOException {
469+
public synchronized ODocument getReplicationJournal(final String iDatabaseName, final String iRemoteServer, final int iMaxRecords)
470+
throws IOException {
470471
OLogManager.instance().debug(this, "Retrieving the replication log for database '%s' from server '%s'...", iDatabaseName,
471472
storage.getURL());
472473

473474
final ODocument response = sendRequest(OChannelBinaryProtocol.REQUEST_REPLICATION,
474-
new ODocument().field("operation", "getJournal").field("node", iRemoteServer).field("db", iDatabaseName),
475+
new ODocument().fields("operation", "getJournal", "node", iRemoteServer, "db", iDatabaseName, "limit", iMaxRecords),
475476
"Retrieve replication log");
476477

477478
OLogManager.instance().debug(this,

server/src/main/java/com/orientechnologies/orient/server/distributed/conflict/ODefaultReplicationConflictResolver.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,17 +174,28 @@ public ODocument getAllConflicts() {
174174
+ DISTRIBUTED_CONFLICT_CLASS));
175175

176176
// EARLY LOAD CONTENT
177-
final ODocument result = new ODocument().field("entries", entries);
177+
final ODocument result = new ODocument().field("result", entries);
178178
for (int i = 0; i < entries.size(); ++i) {
179179
final ODocument record = entries.get(i).getRecord();
180180
record.setClassName(null);
181181
record.addOwner(result);
182182
record.getIdentity().reset();
183+
184+
final Byte operation = record.field("operation");
185+
record.field("operation", ORecordOperation.getName(operation));
186+
183187
entries.set(i, record);
184188
}
185189
return result;
186190
}
187191

192+
@Override
193+
public ODocument reset() {
194+
ODatabaseRecordThreadLocal.INSTANCE.set((ODatabaseRecord) database);
195+
final int deleted = database.command(new OSQLSynchQuery<OIdentifiable>("delete from " + DISTRIBUTED_CONFLICT_CLASS));
196+
return new ODocument().field("result", deleted);
197+
}
198+
188199
/**
189200
* Searches for a conflict by RID.
190201
*

server/src/main/java/com/orientechnologies/orient/server/distributed/conflict/OReplicationConflictResolver.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,4 +43,6 @@ public void handleUpdateConflict(String iRemoteNodeId, ORecordId iCurrentRID, OR
4343
public void handleCommandConflict(String iRemoteNodeId, Object iCommand, Object iLocalResult, Object iRemoteResult);
4444

4545
public boolean existConflictsForRecord(final ORecordId iRID);
46+
47+
public ODocument reset();
4648
}

server/src/main/java/com/orientechnologies/orient/server/journal/ODatabaseJournal.java

Lines changed: 106 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@
2626
import com.orientechnologies.common.concur.resource.OSharedResourceAdaptiveExternal;
2727
import com.orientechnologies.orient.core.config.OGlobalConfiguration;
2828
import com.orientechnologies.orient.core.id.OClusterPositionFactory;
29+
import com.orientechnologies.orient.core.id.OClusterPositionLong;
2930
import com.orientechnologies.orient.core.id.ORecordId;
31+
import com.orientechnologies.orient.core.record.impl.ODocument;
3032
import com.orientechnologies.orient.core.serialization.OBinaryProtocol;
3133
import com.orientechnologies.orient.core.sql.OCommandSQL;
3234
import com.orientechnologies.orient.core.storage.ORawBuffer;
@@ -123,6 +125,15 @@ public ODatabaseJournal(final OServer iServer, final ODistributedServerManager i
123125
file.create(DEF_START_SIZE);
124126
}
125127

128+
public void reset() throws IOException {
129+
lock.acquireExclusiveLock();
130+
try {
131+
file.shrink(0);
132+
} finally {
133+
lock.releaseExclusiveLock();
134+
}
135+
}
136+
126137
public OStorage getStorage() {
127138
return storage;
128139
}
@@ -183,6 +194,9 @@ public OAbstractReplicatedTask<?> getOperation(final long iOffsetEndOperation) t
183194
task = new OSQLCommandTask(runId, operationId, new String(buffer));
184195
break;
185196
}
197+
198+
default:
199+
return null;
186200
}
187201

188202
if (task != null)
@@ -280,7 +294,7 @@ public Iterator<Long> browseLastOperations(final long[] iRemoteLastOperationId,
280294
while ((localOperationId[0] > iRemoteLastOperationId[0])
281295
|| (localOperationId[0] == iRemoteLastOperationId[0] && localOperationId[1] > iRemoteLastOperationId[1])) {
282296

283-
if (getOperationStatus(fileOffset) == iStatus && !rids.contains(fileOffset)) {
297+
if ((iStatus == null || iStatus == getOperationStatus(fileOffset)) && !rids.contains(fileOffset)) {
284298
// COLLECT CURRENT POSITION AS GOOD
285299
result.add(fileOffset);
286300
rids.add(fileOffset);
@@ -437,6 +451,97 @@ public long append(final OAbstractReplicatedTask<?> task) throws IOException {
437451
}
438452
}
439453

454+
public Iterable<ODocument> query(final OPERATION_STATUS iStatus, final int iMaxItems) throws IOException {
455+
LinkedList<ODocument> result = new LinkedList<ODocument>();
456+
457+
lock.acquireExclusiveLock();
458+
try {
459+
460+
final Iterator<Long> iter = browseLastOperations(new long[] { -1, -1 }, null, iMaxItems);
461+
while (iter.hasNext()) {
462+
final long pos = iter.next();
463+
464+
final long runId = file.readLong(pos - OFFSET_BACK_RUNID);
465+
final long operationId = file.readLong(pos - OFFSET_BACK_OPERATID);
466+
final int varSize = file.readInt(pos - OFFSET_BACK_SIZE);
467+
final long offset = pos - OFFSET_BACK_SIZE - varSize - OFFSET_VARDATA;
468+
469+
final OPERATION_STATUS status = OPERATION_STATUS.values()[file.readByte(offset)];
470+
471+
if (iStatus != null && status != iStatus)
472+
continue;
473+
474+
final OPERATION_TYPES operationType = OPERATION_TYPES.values()[file.readByte(offset + OFFSET_OPERATION_TYPE)];
475+
476+
final ODocument doc = new ODocument();
477+
doc.setIdentity(-2, new OClusterPositionLong(result.size()));
478+
doc.fields("serial", runId + "." + operationId, "status", status, "type", operationType.toString());
479+
480+
switch (operationType) {
481+
case RECORD_CREATE: {
482+
final ORecordId rid = new ORecordId(file.readShort(offset + OFFSET_VARDATA),
483+
OClusterPositionFactory.INSTANCE.valueOf(file.readLong(offset + OFFSET_VARDATA + OBinaryProtocol.SIZE_SHORT)));
484+
485+
if (rid.isNew())
486+
// GET LAST RID
487+
rid.clusterPosition = storage.getClusterDataRange(rid.clusterId)[1];
488+
489+
doc.fields("rid", "" + rid);
490+
491+
final ORawBuffer record = storage.readRecord(rid, null, false, null, false).getResult();
492+
if (record != null)
493+
doc.fields("size", record.buffer.length, "version", record.version, "recordType", record.recordType);
494+
495+
break;
496+
}
497+
498+
case RECORD_UPDATE: {
499+
final ORecordId rid = new ORecordId(file.readShort(offset + OFFSET_VARDATA),
500+
OClusterPositionFactory.INSTANCE.valueOf(file.readLong(offset + OFFSET_VARDATA + OBinaryProtocol.SIZE_SHORT)));
501+
502+
doc.fields("rid", rid);
503+
504+
final ORawBuffer record = storage.readRecord(rid, null, false, null, false).getResult();
505+
if (record != null)
506+
doc.fields("size", record.buffer.length, "version", record.version, "recordType", record.recordType);
507+
508+
break;
509+
}
510+
511+
case RECORD_DELETE: {
512+
final ORecordId rid = new ORecordId(file.readShort(offset + OFFSET_VARDATA),
513+
OClusterPositionFactory.INSTANCE.valueOf(file.readLong(offset + OFFSET_VARDATA + OBinaryProtocol.SIZE_SHORT)));
514+
515+
doc.fields("rid", rid);
516+
517+
final ORawBuffer record = storage.readRecord(rid, null, false, null, false).getResult();
518+
if (record != null)
519+
doc.fields("version", record.version);
520+
break;
521+
}
522+
523+
case SQL_COMMAND: {
524+
final byte[] buffer = new byte[varSize];
525+
file.read(offset + OFFSET_VARDATA, buffer, buffer.length);
526+
doc.fields("command", new String(buffer));
527+
break;
528+
}
529+
}
530+
531+
if (iMaxItems > -1 && result.size() >= iMaxItems)
532+
// LIMIT REACHED
533+
break;
534+
535+
if (doc != null)
536+
result.add(0, doc);
537+
}
538+
} finally {
539+
lock.releaseExclusiveLock();
540+
}
541+
542+
return result;
543+
}
544+
440545
/**
441546
* Appends a new log by writing the header without the payload (variable data)
442547
*

server/src/main/java/com/orientechnologies/orient/server/network/protocol/binary/ONetworkProtocolBinary.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -799,9 +799,10 @@ protected void replicationDatabase() throws IOException {
799799
if (dManager == null)
800800
throw new OConfigurationException("No distributed manager configured");
801801

802+
final String operation = request.field("operation");
803+
802804
ODocument response = null;
803805

804-
final String operation = request.field("operation");
805806
if (operation.equals("start")) {
806807
checkServerAccess("server.replication.start");
807808

@@ -814,16 +815,30 @@ protected void replicationDatabase() throws IOException {
814815
} else if (operation.equals("getJournal")) {
815816
checkServerAccess("server.replication.getJournal");
816817

818+
final Integer limit = request.field("limit");
819+
820+
final OStorageSynchronizer dbSynch = dManager.getDatabaseSynchronizer((String) request.field("db"));
821+
822+
final Iterable<ODocument> result = dbSynch.getLog().query(null, limit != null ? limit : -1);
823+
response = new ODocument().field("result", result, OType.EMBEDDEDLIST);
824+
817825
} else if (operation.equals("resetJournal")) {
818826
checkServerAccess("server.replication.resetJournal");
819827

828+
final OStorageSynchronizer dbSynch = dManager.getDatabaseSynchronizer((String) request.field("db"));
829+
dbSynch.getLog().reset();
830+
820831
} else if (operation.equals("getAllConflicts")) {
821832
final OStorageSynchronizer dbSynch = dManager.getDatabaseSynchronizer((String) request.field("db"));
822833
response = dbSynch.getConflictResolver().getAllConflicts();
823834

824-
}
835+
} else if (operation.equals("resetConflicts")) {
836+
final OStorageSynchronizer dbSynch = dManager.getDatabaseSynchronizer((String) request.field("db"));
837+
response = dbSynch.getConflictResolver().reset();
825838

839+
}
826840
sendResponse(response);
841+
827842
}
828843

829844
protected void distributedCluster() throws IOException {

tools/src/main/java/com/orientechnologies/orient/console/OConsoleDatabaseApp.java

Lines changed: 19 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,11 @@
2222
import java.io.InputStream;
2323
import java.io.InputStreamReader;
2424
import java.lang.reflect.Array;
25-
import java.text.SimpleDateFormat;
2625
import java.util.ArrayList;
2726
import java.util.Arrays;
2827
import java.util.Collection;
2928
import java.util.Collections;
3029
import java.util.Comparator;
31-
import java.util.Date;
3230
import java.util.Iterator;
3331
import java.util.List;
3432
import java.util.Map;
@@ -58,7 +56,6 @@
5856
import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx;
5957
import com.orientechnologies.orient.core.db.record.ODatabaseRecordAbstract;
6058
import com.orientechnologies.orient.core.db.record.OIdentifiable;
61-
import com.orientechnologies.orient.core.db.record.ORecordOperation;
6259
import com.orientechnologies.orient.core.db.tool.ODatabaseCompare;
6360
import com.orientechnologies.orient.core.db.tool.ODatabaseExport;
6461
import com.orientechnologies.orient.core.db.tool.ODatabaseExportException;
@@ -74,7 +71,6 @@
7471
import com.orientechnologies.orient.core.iterator.ORecordIteratorCluster;
7572
import com.orientechnologies.orient.core.metadata.schema.OClass;
7673
import com.orientechnologies.orient.core.metadata.schema.OProperty;
77-
import com.orientechnologies.orient.core.metadata.schema.OType;
7874
import com.orientechnologies.orient.core.metadata.security.OUser;
7975
import com.orientechnologies.orient.core.record.ORecordInternal;
8076
import com.orientechnologies.orient.core.record.impl.ODocument;
@@ -1327,35 +1323,28 @@ public void copyDatabase(
13271323
@ConsoleCommand(description = "Gets the replication journal for a database against a remote server")
13281324
public void replicationGetJournal(
13291325
@ConsoleParameter(name = "db-name", description = "Name of the database") final String iDatabaseName,
1330-
@ConsoleParameter(name = "server-name", description = "Remote server's name as <address>:<port>") final String iRemoteName)
1326+
@ConsoleParameter(name = "server-name", description = "Remote server's name as <address>:<port>") final String iRemoteName,
1327+
@ConsoleParameter(name = "limit", description = "Limit as maximum number of records starting from the end", optional = true) final String iLimit)
13311328
throws IOException {
13321329

13331330
checkForRemoteServer();
13341331

1335-
try {
1336-
final ODocument response = serverAdmin.getReplicationJournal(iDatabaseName, iRemoteName);
1332+
final long start = System.currentTimeMillis();
1333+
1334+
int limit = iLimit == null ? -1 : Integer.parseInt(iLimit);
13371335

1338-
if (response.fieldNames().length == 0)
1336+
try {
1337+
final ODocument response = serverAdmin.getReplicationJournal(iDatabaseName, iRemoteName, limit);
1338+
currentResultSet = response.field("result");
1339+
if (currentResultSet.size() == 0)
13391340
out.println("Replication journal for database '" + iDatabaseName + "' is empty");
13401341
else {
1341-
final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
1342-
1343-
out.println("Replication journal for database '" + iDatabaseName + "'\n");
1344-
out.printf("+-----------+-----------+---------------+-------------------------+\n");
1345-
out.printf("| SERIAL | OPERATION | RECORD ID | DATE |\n");
1346-
out.printf("+-----------+-----------+---------------+-------------------------+\n");
1347-
for (String k : response.fieldNames()) {
1348-
final long opSerial = Long.parseLong(k);
1349-
1350-
final String[] split = ((String) response.field(k)).split("-");
1351-
final byte opType = Byte.valueOf((byte) Integer.parseInt(split[0]));
1352-
final String opRID = split[1];
1353-
final String date = split[2];
1354-
1355-
out.printf("| %-10d| %9s | %-14s| %23s |\n", opSerial, ORecordOperation.getName(opType), opRID,
1356-
format.format(new Date(Long.parseLong(date))));
1357-
}
1358-
out.printf("+-----------+--------+---------------+-------------------------+\n");
1342+
float elapsedSeconds = getElapsedSecs(start);
1343+
1344+
new OTableFormatter(out).hideRID(true).setMaxWidthSize(Integer.parseInt(properties.get("width")))
1345+
.writeRecords(currentResultSet, -1);
1346+
1347+
out.println("\n" + currentResultSet.size() + " item(s) found. Query executed in " + elapsedSeconds + " sec(s).");
13591348
}
13601349
out.println();
13611350

@@ -1390,23 +1379,13 @@ public void replicationGetConflicts(@ConsoleParameter(name = "db-name", descript
13901379

13911380
try {
13921381
final ODocument response = serverAdmin.getReplicationConflicts(iDatabaseName);
1393-
final List<ODocument> entries = response.field("entries");
1382+
currentResultSet = response.field("result");
13941383

1395-
if (entries == null || entries.size() == 0)
1384+
if (currentResultSet == null || currentResultSet.size() == 0)
13961385
out.println("There are not replication conflicts for database '" + iDatabaseName + "'");
13971386
else {
1398-
final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
1399-
1400-
out.println("Replication conflicts for database '" + iDatabaseName + "'. Found " + entries.size() + " entries.\n");
1401-
out.printf("+---------------+--------------------+----------+-------------------------+--------------+---------------+----------------+\n");
1402-
out.printf("| RECORD ID | SERVER NODE |OPERATION | DATE | CURR VERSION | OTHER VERSION | OTHER RID |\n");
1403-
out.printf("+---------------+--------------------+----------+-------------------------+--------------+---------------+----------------+\n");
1404-
for (ODocument doc : entries) {
1405-
out.printf("| %-14s| %18s | %-8s | %23s | %-12d | %-13d | %-14s |\n", doc.field("record", OType.LINK), doc.field("node"),
1406-
ORecordOperation.getName((Byte) doc.field("operation")), format.format(new Date((Long) doc.field("date"))),
1407-
doc.field("currentVersion"), doc.field("otherVersion"), doc.field("otherRID"));
1408-
}
1409-
out.printf("+---------------+--------------------+----------+-------------------------+--------------+---------------+----------------+\n");
1387+
new OTableFormatter(out).hideRID(true).setMaxWidthSize(Integer.parseInt(properties.get("width")))
1388+
.writeRecords(currentResultSet, -1);
14101389
}
14111390
out.println();
14121391

0 commit comments

Comments
 (0)