Skip to content

Commit e90a703

Browse files
committed
network connections now check and in case wait is the node is online before to execute any command
1 parent 85a39fb commit e90a703

3 files changed

Lines changed: 88 additions & 26 deletions

File tree

server/src/main/java/com/orientechnologies/orient/server/network/protocol/ONetworkProtocol.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,13 @@
1919
import java.net.Socket;
2020
import java.util.List;
2121

22+
import com.orientechnologies.common.concur.OTimeoutException;
23+
import com.orientechnologies.common.log.OLogManager;
2224
import com.orientechnologies.common.thread.OSoftThread;
2325
import com.orientechnologies.orient.core.config.OContextConfiguration;
2426
import com.orientechnologies.orient.enterprise.channel.OChannel;
2527
import com.orientechnologies.orient.server.OServer;
28+
import com.orientechnologies.orient.server.distributed.ODistributedServerManager;
2629

2730
public abstract class ONetworkProtocol extends OSoftThread {
2831
protected OServer server;
@@ -50,4 +53,24 @@ public String getListeningAddress() {
5053
public OServer getServer() {
5154
return server;
5255
}
56+
57+
public void waitNodeIsOnline() {
58+
// WAIT THE NODE IS ONLINE AGAIN
59+
final ODistributedServerManager mgr = server.getDistributedManager();
60+
if (mgr != null && mgr.isOfflineNode(mgr.getLocalNodeId())) {
61+
for (int retry = 0; retry < 10; ++retry) {
62+
if (mgr != null && mgr.isOfflineNode(mgr.getLocalNodeId())) {
63+
// NODE NOT ONLINE YET, REFUSE THE CONNECTION
64+
OLogManager.instance().info(this, "Node is not online yet (status=%s), blocking the command until it's online %d/%d",
65+
mgr.getStatus(), retry + 1, 10);
66+
pauseCurrentThread(300);
67+
} else
68+
// OK, RETURN
69+
return;
70+
}
71+
72+
// TIMEOUT
73+
throw new OTimeoutException("Cannot execute operation while the node is not online (status=" + mgr.getStatus() + ")");
74+
}
75+
}
5376
}

server/src/main/java/com/orientechnologies/orient/server/network/protocol/binary/ONetworkProtocolBinary.java

Lines changed: 52 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,8 @@ public int getVersion() {
117117

118118
@Override
119119
protected void onBeforeRequest() throws IOException {
120+
waitNodeIsOnline();
121+
120122
connection = OClientConnectionManager.instance().getConnection(clientTxId);
121123

122124
if (clientTxId < 0) {
@@ -513,7 +515,8 @@ protected ODatabaseComplex<?> openDatabase(final ODatabaseComplex<?> database, f
513515
protected void addDataSegment() throws IOException {
514516
setDataCommandInfo("Add data segment");
515517

516-
checkDatabase();
518+
if (!isConnectionAlive())
519+
return;
517520

518521
final String name = channel.readString();
519522
final String location = channel.readString();
@@ -532,7 +535,8 @@ protected void addDataSegment() throws IOException {
532535
protected void dropDataSegment() throws IOException {
533536
setDataCommandInfo("Drop data segment");
534537

535-
checkDatabase();
538+
if (!isConnectionAlive())
539+
return;
536540

537541
final String name = channel.readString();
538542

@@ -550,7 +554,8 @@ protected void dropDataSegment() throws IOException {
550554
protected void removeCluster() throws IOException {
551555
setDataCommandInfo("Remove cluster");
552556

553-
checkDatabase();
557+
if (!isConnectionAlive())
558+
return;
554559

555560
final int id = channel.readShort();
556561

@@ -573,7 +578,8 @@ protected void removeCluster() throws IOException {
573578
protected void addCluster() throws IOException {
574579
setDataCommandInfo("Add cluster");
575580

576-
checkDatabase();
581+
if (!isConnectionAlive())
582+
return;
577583

578584
final String type = channel.readString();
579585
final String name = channel.readString();
@@ -608,7 +614,8 @@ protected void addCluster() throws IOException {
608614
protected void rangeCluster() throws IOException {
609615
setDataCommandInfo("Get the begin/end range of data in cluster");
610616

611-
checkDatabase();
617+
if (!isConnectionAlive())
618+
return;
612619

613620
OClusterPosition[] pos = connection.database.getStorage().getClusterDataRange(channel.readShort());
614621

@@ -625,7 +632,8 @@ protected void rangeCluster() throws IOException {
625632
protected void isLHClustersAreUsed() throws IOException {
626633
setDataCommandInfo("Determinate whether clusters are presented as persistent list or hash map ");
627634

628-
checkDatabase();
635+
if (!isConnectionAlive())
636+
return;
629637

630638
final boolean isLHClustersAreUsed = connection.database.getStorage().isHashClustersAreUsed();
631639

@@ -641,7 +649,8 @@ protected void isLHClustersAreUsed() throws IOException {
641649
protected void countClusters() throws IOException {
642650
setDataCommandInfo("Count cluster elements");
643651

644-
checkDatabase();
652+
if (!isConnectionAlive())
653+
return;
645654

646655
int[] clusterIds = new int[channel.readShort()];
647656
for (int i = 0; i < clusterIds.length; ++i)
@@ -665,7 +674,8 @@ protected void countClusters() throws IOException {
665674
protected void reloadDatabase() throws IOException {
666675
setDataCommandInfo("Reload database information");
667676

668-
checkDatabase();
677+
if (!isConnectionAlive())
678+
return;
669679

670680
beginResponse();
671681
try {
@@ -866,7 +876,8 @@ protected void distributedCluster() throws IOException {
866876
protected void countDatabaseRecords() throws IOException {
867877
setDataCommandInfo("Database count records");
868878

869-
checkDatabase();
879+
if (!isConnectionAlive())
880+
return;
870881

871882
beginResponse();
872883
try {
@@ -880,7 +891,8 @@ protected void countDatabaseRecords() throws IOException {
880891
protected void sizeDatabase() throws IOException {
881892
setDataCommandInfo("Database size");
882893

883-
checkDatabase();
894+
if (!isConnectionAlive())
895+
return;
884896

885897
beginResponse();
886898
try {
@@ -1059,7 +1071,8 @@ protected void configGet() throws IOException {
10591071
protected void commit() throws IOException {
10601072
setDataCommandInfo("Transaction commit");
10611073

1062-
checkDatabase();
1074+
if (!isConnectionAlive())
1075+
return;
10631076

10641077
final OTransactionOptimisticProxy tx = new OTransactionOptimisticProxy((ODatabaseRecordTx) connection.database.getUnderlying(),
10651078
channel);
@@ -1135,6 +1148,9 @@ protected void command() throws IOException {
11351148
// FORCE THE SERVER'S TIMEOUT
11361149
command.setTimeout(serverTimeout, command.getTimeoutStrategy());
11371150

1151+
if (!isConnectionAlive())
1152+
return;
1153+
11381154
// ASSIGNED THE PARSED FETCHPLAN
11391155
listener.setFetchPlan(((OCommandRequestInternal) connection.database.command(command)).getFetchPlan());
11401156

@@ -1194,6 +1210,15 @@ protected void command() throws IOException {
11941210
}
11951211
}
11961212

1213+
private boolean isConnectionAlive() {
1214+
if (connection == null || connection.database == null) {
1215+
// CONNECTION/DATABASE CLOSED
1216+
OClientConnectionManager.instance().disconnect(connection);
1217+
return false;
1218+
}
1219+
return true;
1220+
}
1221+
11971222
/**
11981223
* Use DATACLUSTER_COUNT
11991224
*
@@ -1203,7 +1228,8 @@ protected void command() throws IOException {
12031228
protected void countCluster() throws IOException {
12041229
setDataCommandInfo("Count cluster records");
12051230

1206-
checkDatabase();
1231+
if (!isConnectionAlive())
1232+
return;
12071233

12081234
final String clusterName = channel.readString();
12091235
final long size = connection.database.countClusterElements(clusterName);
@@ -1220,7 +1246,8 @@ protected void countCluster() throws IOException {
12201246
protected void deleteRecord() throws IOException {
12211247
setDataCommandInfo("Delete record");
12221248

1223-
checkDatabase();
1249+
if (!isConnectionAlive())
1250+
return;
12241251

12251252
final ORID rid = channel.readRID();
12261253
final ORecordVersion version = channel.readVersion();
@@ -1242,7 +1269,8 @@ protected void deleteRecord() throws IOException {
12421269
protected void cleanOutRecord() throws IOException {
12431270
setDataCommandInfo("Clean out record");
12441271

1245-
checkDatabase();
1272+
if (!isConnectionAlive())
1273+
return;
12461274

12471275
final ORID rid = channel.readRID();
12481276
final ORecordVersion version = channel.readVersion();
@@ -1274,7 +1302,11 @@ protected void cleanOutRecord() throws IOException {
12741302
protected void updateRecord() throws IOException {
12751303
setDataCommandInfo("Update record");
12761304

1277-
checkDatabase();
1305+
if (!isConnectionAlive())
1306+
return;
1307+
1308+
if (!isConnectionAlive())
1309+
return;
12781310

12791311
final ORecordId rid = channel.readRID();
12801312
final byte[] buffer = channel.readBytes();
@@ -1298,7 +1330,8 @@ protected void updateRecord() throws IOException {
12981330
protected void createRecord() throws IOException {
12991331
setDataCommandInfo("Create record");
13001332

1301-
checkDatabase();
1333+
if (!isConnectionAlive())
1334+
return;
13021335

13031336
final int dataSegmentId = connection.data.protocolVersion >= 10 ? channel.readInt() : 0;
13041337
final ORecordId rid = new ORecordId(channel.readShort(), ORID.CLUSTER_POS_INVALID);
@@ -1340,6 +1373,9 @@ protected void readRecordMetadata() throws IOException {
13401373
protected void readRecord() throws IOException {
13411374
setDataCommandInfo("Load record");
13421375

1376+
if (!isConnectionAlive())
1377+
return;
1378+
13431379
final ORecordId rid = channel.readRID();
13441380
final String fetchPlanString = channel.readString();
13451381
boolean ignoreCache = false;
@@ -1501,14 +1537,6 @@ private boolean loadUserFromSchema(final String iUserName, final String iUserPas
15011537
return true;
15021538
}
15031539

1504-
protected void checkDatabase() {
1505-
if (connection == null)
1506-
throw new OStorageException("Connection with remote server has been lost");
1507-
1508-
if (connection.database == null)
1509-
throw new OSecurityAccessException("You need to authenticate before to execute the requested operation");
1510-
}
1511-
15121540
@Override
15131541
protected void handleConnectionError(final OChannelBinaryServer iChannel, final Throwable e) {
15141542
super.handleConnectionError(channel, e);

server/src/main/java/com/orientechnologies/orient/server/network/protocol/http/ONetworkProtocolHttpAbstract.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,24 @@
2222
import java.net.SocketException;
2323
import java.net.SocketTimeoutException;
2424
import java.net.URLDecoder;
25-
import java.util.*;
25+
import java.util.Date;
26+
import java.util.HashMap;
27+
import java.util.IllegalFormatException;
28+
import java.util.InputMismatchException;
29+
import java.util.List;
30+
import java.util.Map;
2631
import java.util.zip.GZIPInputStream;
2732

2833
import com.orientechnologies.common.concur.lock.OLockException;
2934
import com.orientechnologies.common.log.OLogManager;
3035
import com.orientechnologies.orient.core.Orient;
3136
import com.orientechnologies.orient.core.config.OContextConfiguration;
3237
import com.orientechnologies.orient.core.config.OGlobalConfiguration;
33-
import com.orientechnologies.orient.core.exception.*;
38+
import com.orientechnologies.orient.core.exception.OCommandExecutionException;
39+
import com.orientechnologies.orient.core.exception.OConcurrentModificationException;
40+
import com.orientechnologies.orient.core.exception.ODatabaseException;
41+
import com.orientechnologies.orient.core.exception.ORecordNotFoundException;
42+
import com.orientechnologies.orient.core.exception.OSecurityAccessException;
3443
import com.orientechnologies.orient.core.metadata.security.OUser;
3544
import com.orientechnologies.orient.core.serialization.OBase64Utils;
3645
import com.orientechnologies.orient.core.serialization.OBinaryProtocol;
@@ -112,6 +121,8 @@ public void service() throws ONetworkProtocolException, IOException {
112121
response.setContentEncoding(OHttpUtils.CONTENT_ACCEPT_GZIP_ENCODED);
113122
}
114123

124+
waitNodeIsOnline();
125+
115126
final long begin = System.currentTimeMillis();
116127

117128
boolean isChain;

0 commit comments

Comments
 (0)