Skip to content

Commit 2fa2661

Browse files
committed
Inserted output callback and improved backup and restore by skipping .wal files
1 parent 2efa8d2 commit 2fa2661

16 files changed

Lines changed: 248 additions & 133 deletions

File tree

client/src/main/java/com/orientechnologies/orient/client/remote/OStorageRemote.java

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,20 @@
1919
import java.io.InputStream;
2020
import java.io.OutputStream;
2121
import java.net.UnknownHostException;
22-
import java.util.*;
23-
import java.util.concurrent.*;
22+
import java.util.ArrayList;
23+
import java.util.Arrays;
24+
import java.util.Collection;
25+
import java.util.HashMap;
26+
import java.util.HashSet;
27+
import java.util.Hashtable;
28+
import java.util.List;
29+
import java.util.Map;
30+
import java.util.Set;
31+
import java.util.concurrent.Callable;
32+
import java.util.concurrent.ConcurrentHashMap;
33+
import java.util.concurrent.ExecutorService;
34+
import java.util.concurrent.Executors;
35+
import java.util.concurrent.FutureTask;
2436

2537
import javax.naming.NamingException;
2638
import javax.naming.directory.Attribute;
@@ -39,6 +51,7 @@
3951
import com.orientechnologies.orient.core.OConstants;
4052
import com.orientechnologies.orient.core.Orient;
4153
import com.orientechnologies.orient.core.cache.OCacheLevelTwoLocatorRemote;
54+
import com.orientechnologies.orient.core.command.OCommandOutputListener;
4255
import com.orientechnologies.orient.core.command.OCommandRequestAsynch;
4356
import com.orientechnologies.orient.core.command.OCommandRequestText;
4457
import com.orientechnologies.orient.core.config.OContextConfiguration;
@@ -61,13 +74,25 @@
6174
import com.orientechnologies.orient.core.serialization.OSerializableStream;
6275
import com.orientechnologies.orient.core.serialization.serializer.record.string.ORecordSerializerStringAbstract;
6376
import com.orientechnologies.orient.core.serialization.serializer.stream.OStreamSerializerAnyStreamable;
64-
import com.orientechnologies.orient.core.storage.*;
77+
import com.orientechnologies.orient.core.storage.OCluster;
78+
import com.orientechnologies.orient.core.storage.ODataSegment;
79+
import com.orientechnologies.orient.core.storage.OPhysicalPosition;
80+
import com.orientechnologies.orient.core.storage.ORawBuffer;
81+
import com.orientechnologies.orient.core.storage.ORecordCallback;
82+
import com.orientechnologies.orient.core.storage.ORecordMetadata;
83+
import com.orientechnologies.orient.core.storage.OStorageAbstract;
84+
import com.orientechnologies.orient.core.storage.OStorageOperationResult;
85+
import com.orientechnologies.orient.core.storage.OStorageProxy;
6586
import com.orientechnologies.orient.core.tx.OTransaction;
6687
import com.orientechnologies.orient.core.tx.OTransactionAbstract;
6788
import com.orientechnologies.orient.core.version.ORecordVersion;
6889
import com.orientechnologies.orient.core.version.OVersionFactory;
6990
import com.orientechnologies.orient.enterprise.channel.OChannel;
70-
import com.orientechnologies.orient.enterprise.channel.binary.*;
91+
import com.orientechnologies.orient.enterprise.channel.binary.OChannelBinaryAsynchClient;
92+
import com.orientechnologies.orient.enterprise.channel.binary.OChannelBinaryProtocol;
93+
import com.orientechnologies.orient.enterprise.channel.binary.OChannelListener;
94+
import com.orientechnologies.orient.enterprise.channel.binary.ONetworkProtocolException;
95+
import com.orientechnologies.orient.enterprise.channel.binary.ORemoteServerEventListener;
7196

7297
/**
7398
* This object is bound to each remote ODatabase instances.
@@ -413,7 +438,8 @@ public ORecordMetadata getRecordMetadata(final ORID rid) {
413438
} while (true);
414439
}
415440

416-
public OStorageOperationResult<ORawBuffer> readRecord(final ORecordId iRid, final String iFetchPlan, final boolean iIgnoreCache, final ORecordCallback<ORawBuffer> iCallback, boolean loadTombstones, LOCKING_STRATEGY iLockingStrategy) {
441+
public OStorageOperationResult<ORawBuffer> readRecord(final ORecordId iRid, final String iFetchPlan, final boolean iIgnoreCache,
442+
final ORecordCallback<ORawBuffer> iCallback, boolean loadTombstones, LOCKING_STRATEGY iLockingStrategy) {
417443
checkConnection();
418444

419445
if (OStorageRemoteThreadLocal.INSTANCE.get().commandExecuting)
@@ -583,12 +609,14 @@ public boolean cleanOutRecord(ORecordId recordId, ORecordVersion recordVersion,
583609
}
584610

585611
@Override
586-
public void backup(OutputStream out, Map<String, Object> options, Callable<Object> callable) throws IOException {
612+
public void backup(OutputStream out, Map<String, Object> options, Callable<Object> callable,
613+
final OCommandOutputListener iListener) throws IOException {
587614
throw new UnsupportedOperationException("backup");
588615
}
589616

590617
@Override
591-
public void restore(InputStream in, Map<String, Object> options, Callable<Object> callable) throws IOException {
618+
public void restore(InputStream in, Map<String, Object> options, Callable<Object> callable, final OCommandOutputListener iListener)
619+
throws IOException {
592620
throw new UnsupportedOperationException("restore");
593621
}
594622

client/src/main/java/com/orientechnologies/orient/client/remote/OStorageRemoteThread.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,22 @@
2727
import com.orientechnologies.common.concur.resource.OSharedResourceAdaptiveExternal;
2828
import com.orientechnologies.orient.core.Orient;
2929
import com.orientechnologies.orient.core.cache.OLevel2RecordCache;
30+
import com.orientechnologies.orient.core.command.OCommandOutputListener;
3031
import com.orientechnologies.orient.core.command.OCommandRequestText;
3132
import com.orientechnologies.orient.core.config.OStorageConfiguration;
3233
import com.orientechnologies.orient.core.id.OClusterPosition;
3334
import com.orientechnologies.orient.core.id.ORID;
3435
import com.orientechnologies.orient.core.id.ORecordId;
3536
import com.orientechnologies.orient.core.record.impl.ODocument;
36-
import com.orientechnologies.orient.core.storage.*;
37+
import com.orientechnologies.orient.core.storage.OCluster;
38+
import com.orientechnologies.orient.core.storage.ODataSegment;
39+
import com.orientechnologies.orient.core.storage.OPhysicalPosition;
40+
import com.orientechnologies.orient.core.storage.ORawBuffer;
41+
import com.orientechnologies.orient.core.storage.ORecordCallback;
42+
import com.orientechnologies.orient.core.storage.ORecordMetadata;
43+
import com.orientechnologies.orient.core.storage.OStorage;
44+
import com.orientechnologies.orient.core.storage.OStorageOperationResult;
45+
import com.orientechnologies.orient.core.storage.OStorageProxy;
3746
import com.orientechnologies.orient.core.tx.OTransaction;
3847
import com.orientechnologies.orient.core.version.ORecordVersion;
3948
import com.orientechnologies.orient.core.version.OVersionFactory;
@@ -199,12 +208,14 @@ public Set<String> getClusterNames() {
199208
}
200209

201210
@Override
202-
public void backup(OutputStream out, Map<String, Object> options, final Callable<Object> callable) throws IOException {
211+
public void backup(OutputStream out, Map<String, Object> options, final Callable<Object> callable,
212+
final OCommandOutputListener iListener) throws IOException {
203213
throw new UnsupportedOperationException("backup");
204214
}
205215

206216
@Override
207-
public void restore(InputStream in, Map<String, Object> options, final Callable<Object> callable) throws IOException {
217+
public void restore(InputStream in, Map<String, Object> options, final Callable<Object> callable,
218+
final OCommandOutputListener iListener) throws IOException {
208219
throw new UnsupportedOperationException("restore");
209220
}
210221

@@ -220,7 +231,8 @@ public OStorageOperationResult<OPhysicalPosition> createRecord(final int iDataSe
220231
}
221232
}
222233

223-
public OStorageOperationResult<ORawBuffer> readRecord(final ORecordId iRid, final String iFetchPlan, boolean iIgnoreCache, ORecordCallback<ORawBuffer> iCallback, boolean loadTombstones, LOCKING_STRATEGY iLockingStrategy) {
234+
public OStorageOperationResult<ORawBuffer> readRecord(final ORecordId iRid, final String iFetchPlan, boolean iIgnoreCache,
235+
ORecordCallback<ORawBuffer> iCallback, boolean loadTombstones, LOCKING_STRATEGY iLockingStrategy) {
224236
pushSession();
225237
try {
226238
return delegate.readRecord(iRid, iFetchPlan, iIgnoreCache, null, loadTombstones, LOCKING_STRATEGY.DEFAULT);

core/src/main/java/com/orientechnologies/orient/core/compression/impl/OZIPCompressionUtil.java

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,32 +28,29 @@
2828
import java.util.zip.ZipOutputStream;
2929

3030
import com.orientechnologies.common.io.OIOUtils;
31+
import com.orientechnologies.orient.core.command.OCommandOutputListener;
3132

3233
/**
3334
* @author Luca Garulli (l.garulli--at--orientechnologies.com)
3435
*/
3536
public class OZIPCompressionUtil {
36-
public static int compressDirectory(final String sourceFolderName, final OutputStream output) throws IOException {
37+
public static int compressDirectory(final String sourceFolderName, final OutputStream output, final String[] iSkipFileExtensions,
38+
final OCommandOutputListener iOutput) throws IOException {
3739

3840
final ZipOutputStream zos = new ZipOutputStream(output);
3941
try {
4042
zos.setLevel(9);
41-
return addFolder(zos, sourceFolderName, sourceFolderName);
43+
return addFolder(zos, sourceFolderName, sourceFolderName, iSkipFileExtensions, iOutput);
4244
} finally {
4345
zos.close();
4446
}
4547
}
4648

4749
/***
4850
* Extract zipfile to outdir with complete directory structure
49-
*
50-
* @param zipfile
51-
* Input .zip file
52-
* @param outdir
53-
* Output directory
54-
* @throws IOException
5551
*/
56-
public static void uncompressDirectory(final InputStream in, final String out) throws IOException {
52+
public static void uncompressDirectory(final InputStream in, final String out, final OCommandOutputListener iListener)
53+
throws IOException {
5754
final File outdir = new File(out);
5855
final ZipInputStream zin = new ZipInputStream(in);
5956
try {
@@ -72,14 +69,18 @@ public static void uncompressDirectory(final InputStream in, final String out) t
7269
if (dir != null)
7370
mkdirs(outdir, dir);
7471

75-
extractFile(zin, outdir, name);
72+
extractFile(zin, outdir, name, iListener);
7673
}
7774
} finally {
7875
zin.close();
7976
}
8077
}
8178

82-
private static void extractFile(final ZipInputStream in, final File outdir, final String name) throws IOException {
79+
private static void extractFile(final ZipInputStream in, final File outdir, final String name,
80+
final OCommandOutputListener iListener) throws IOException {
81+
if (iListener != null)
82+
iListener.onMessage("- Uncompressing file " + name + "...");
83+
8384
final BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(new File(outdir, name)));
8485
try {
8586
OIOUtils.copyStream(in, out, -1);
@@ -99,20 +100,30 @@ private static String getDirectoryPart(final String name) {
99100
return s == -1 ? null : name.substring(0, s);
100101
}
101102

102-
private static int addFolder(ZipOutputStream zos, String folderName, String baseFolderName) throws IOException {
103+
private static int addFolder(ZipOutputStream zos, String folderName, String baseFolderName, final String[] iSkipFileExtensions,
104+
final OCommandOutputListener iOutput) throws IOException {
103105
int total = 0;
104106

105107
File f = new File(folderName);
106108
if (f.exists()) {
107109
if (f.isDirectory()) {
108110
File f2[] = f.listFiles();
109111
for (int i = 0; i < f2.length; i++) {
110-
total += addFolder(zos, f2[i].getAbsolutePath(), baseFolderName);
112+
total += addFolder(zos, f2[i].getAbsolutePath(), baseFolderName, iSkipFileExtensions, iOutput);
111113
}
112114
} else {
113115
// add file
114116
// extract the relative name for entry purpose
115117
String entryName = folderName.substring(baseFolderName.length() + 1, folderName.length());
118+
119+
if (iSkipFileExtensions != null)
120+
for (String skip : iSkipFileExtensions)
121+
if (entryName.endsWith(skip))
122+
return 0;
123+
124+
if (iOutput != null)
125+
iOutput.onMessage("- Compressing file " + entryName + "...");
126+
116127
ZipEntry ze = new ZipEntry(entryName);
117128
zos.putNextEntry(ze);
118129
try {

core/src/main/java/com/orientechnologies/orient/core/db/ODatabaseRecordWrapperAbstract.java

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,14 @@
1515
*/
1616
package com.orientechnologies.orient.core.db;
1717

18+
import java.io.IOException;
19+
import java.io.OutputStream;
20+
import java.util.List;
21+
import java.util.Map;
22+
import java.util.Set;
23+
import java.util.concurrent.Callable;
24+
25+
import com.orientechnologies.orient.core.command.OCommandOutputListener;
1826
import com.orientechnologies.orient.core.command.OCommandRequest;
1927
import com.orientechnologies.orient.core.db.record.ODatabaseRecord;
2028
import com.orientechnologies.orient.core.db.record.OIdentifiable;
@@ -41,14 +49,6 @@
4149
import com.orientechnologies.orient.core.tx.OTransaction.TXTYPE;
4250
import com.orientechnologies.orient.core.version.ORecordVersion;
4351

44-
import java.io.IOException;
45-
import java.io.InputStream;
46-
import java.io.OutputStream;
47-
import java.util.List;
48-
import java.util.Map;
49-
import java.util.Set;
50-
import java.util.concurrent.Callable;
51-
5252
@SuppressWarnings("unchecked")
5353
public abstract class ODatabaseRecordWrapperAbstract<DB extends ODatabaseRecord> extends ODatabaseWrapperAbstract<DB> implements
5454
ODatabaseComplex<ORecordInternal<?>> {
@@ -226,12 +226,14 @@ public <RET extends ORecordInternal<?>> RET load(final ORID iRecordId, final Str
226226
}
227227

228228
@Override
229-
public <RET extends ORecordInternal<?>> RET load(ORID iRecordId, String iFetchPlan, boolean iIgnoreCache, boolean loadTombstone, OStorage.LOCKING_STRATEGY iLockingStrategy) {
229+
public <RET extends ORecordInternal<?>> RET load(ORID iRecordId, String iFetchPlan, boolean iIgnoreCache, boolean loadTombstone,
230+
OStorage.LOCKING_STRATEGY iLockingStrategy) {
230231
return (RET) underlying.load(iRecordId, iFetchPlan, iIgnoreCache, loadTombstone, OStorage.LOCKING_STRATEGY.DEFAULT);
231232
}
232233

233234
@Override
234-
public <RET extends ORecordInternal<?>> RET load(ORecordInternal<?> iObject, String iFetchPlan, boolean iIgnoreCache, boolean loadTombstone, OStorage.LOCKING_STRATEGY iLockingStrategy) {
235+
public <RET extends ORecordInternal<?>> RET load(ORecordInternal<?> iObject, String iFetchPlan, boolean iIgnoreCache,
236+
boolean loadTombstone, OStorage.LOCKING_STRATEGY iLockingStrategy) {
235237
return (RET) underlying.load(iObject, iFetchPlan, iIgnoreCache, loadTombstone, OStorage.LOCKING_STRATEGY.DEFAULT);
236238
}
237239

@@ -375,7 +377,8 @@ public void setDataSegmentStrategy(final ODataSegmentStrategy dataSegmentStrateg
375377
}
376378

377379
@Override
378-
public void backup(final OutputStream out, final Map<String, Object> options, final Callable<Object> callable) throws IOException {
380+
public void backup(final OutputStream out, final Map<String, Object> options, final Callable<Object> callable,
381+
final OCommandOutputListener iListener) throws IOException {
379382
underlying.backup(out, options, new Callable<Object>() {
380383

381384
@Override
@@ -388,12 +391,7 @@ public Object call() throws Exception {
388391
return callable.call();
389392
return null;
390393
}
391-
});
392-
}
393-
394-
@Override
395-
public void restore(final InputStream in, final Map<String, Object> options, final Callable<Object> callable) throws IOException {
396-
underlying.restore(in, options, callable);
394+
}, iListener);
397395
}
398396

399397
protected void checkClusterBoundedToClass(final int iClusterId) {

core/src/main/java/com/orientechnologies/orient/core/db/ODatabaseWrapperAbstract.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.orientechnologies.orient.core.Orient;
2828
import com.orientechnologies.orient.core.cache.OLevel1RecordCache;
2929
import com.orientechnologies.orient.core.cache.OLevel2RecordCache;
30+
import com.orientechnologies.orient.core.command.OCommandOutputListener;
3031
import com.orientechnologies.orient.core.exception.ODatabaseException;
3132
import com.orientechnologies.orient.core.id.ORID;
3233
import com.orientechnologies.orient.core.intent.OIntent;
@@ -70,13 +71,15 @@ public void reload() {
7071
}
7172

7273
@Override
73-
public void backup(OutputStream out, Map<String, Object> options, Callable<Object> callable) throws IOException {
74-
underlying.backup(out, options, callable);
74+
public void backup(OutputStream out, Map<String, Object> options, Callable<Object> callable,
75+
final OCommandOutputListener iListener) throws IOException {
76+
underlying.backup(out, options, callable, iListener);
7577
}
7678

7779
@Override
78-
public void restore(InputStream in, Map<String, Object> options, Callable<Object> callable) throws IOException {
79-
underlying.restore(in, options, callable);
80+
public void restore(InputStream in, Map<String, Object> options, Callable<Object> callable, final OCommandOutputListener iListener)
81+
throws IOException {
82+
underlying.restore(in, options, callable, iListener);
8083
}
8184

8285
public void close() {

core/src/main/java/com/orientechnologies/orient/core/db/raw/ODatabaseRaw.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,21 @@
1616

1717
package com.orientechnologies.orient.core.db.raw;
1818

19+
import java.io.IOException;
20+
import java.io.InputStream;
21+
import java.io.OutputStream;
22+
import java.util.*;
23+
import java.util.Map.Entry;
24+
import java.util.concurrent.Callable;
25+
1926
import com.orientechnologies.common.concur.lock.ONoLock;
2027
import com.orientechnologies.common.exception.OException;
2128
import com.orientechnologies.common.listener.OListenerManger;
2229
import com.orientechnologies.common.log.OLogManager;
2330
import com.orientechnologies.orient.core.Orient;
2431
import com.orientechnologies.orient.core.cache.OLevel1RecordCache;
2532
import com.orientechnologies.orient.core.cache.OLevel2RecordCache;
33+
import com.orientechnologies.orient.core.command.OCommandOutputListener;
2634
import com.orientechnologies.orient.core.config.OStorageEntryConfiguration;
2735
import com.orientechnologies.orient.core.db.ODatabase;
2836
import com.orientechnologies.orient.core.db.ODatabaseLifecycleListener;
@@ -50,13 +58,6 @@
5058
import com.orientechnologies.orient.core.storage.impl.local.paginated.OLocalPaginatedStorage;
5159
import com.orientechnologies.orient.core.version.ORecordVersion;
5260

53-
import java.io.IOException;
54-
import java.io.InputStream;
55-
import java.io.OutputStream;
56-
import java.util.*;
57-
import java.util.Map.Entry;
58-
import java.util.concurrent.Callable;
59-
6061
/**
6162
* Lower level ODatabase implementation. It's extended or wrapped by all the others.
6263
*
@@ -159,16 +160,19 @@ public void drop() {
159160
}
160161

161162
@Override
162-
public void backup(OutputStream out, Map<String, Object> options, Callable<Object> callable) throws IOException {
163-
getStorage().backup(out, options, callable);
163+
public void backup(OutputStream out, Map<String, Object> options, Callable<Object> callable,
164+
final OCommandOutputListener iListener) throws IOException {
165+
getStorage().backup(out, options, callable, iListener);
166+
164167
}
165168

166169
@Override
167-
public void restore(InputStream in, Map<String, Object> options, Callable<Object> callable) throws IOException {
170+
public void restore(InputStream in, Map<String, Object> options, Callable<Object> callable, final OCommandOutputListener iListener)
171+
throws IOException {
168172
if (storage == null)
169173
storage = Orient.instance().loadStorage(url);
170174

171-
getStorage().restore(in, options, callable);
175+
getStorage().restore(in, options, callable, iListener);
172176
}
173177

174178
public void reload() {

0 commit comments

Comments
 (0)