Skip to content

Commit 20d24b4

Browse files
TCP RPC: Fix: Avoid excessive RPC connections
If two sides race in actively establishing connections to its peer side, excessive connections resulted, which are to be dropped. Signed-off-by: Bernard Metzler <[email protected]>
1 parent ad34f6c commit 20d24b4

File tree

2 files changed

+47
-27
lines changed

2 files changed

+47
-27
lines changed

src/libgeds/Server.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ void Server::TcpListenThread() {
155155
}
156156
if (_geds->_tcpTransport->addEndpointPassive(newsock) == false) {
157157
::close(newsock);
158-
LOG_ERROR("Server: Adding new TCP client failed ");
158+
LOG_DEBUG("Server: Adding new TCP client failed ");
159159
}
160160
}
161161
close(sock);

src/libgeds/TcpTransport.cpp

Lines changed: 46 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -805,7 +805,6 @@ bool TcpTransport::activateEndpoint(std::shared_ptr<TcpEndpoint> tep,
805805
}
806806

807807
bool TcpTransport::addEndpointPassive(int sock) {
808-
std::shared_ptr<TcpEndpoint> tep = std::make_shared<TcpEndpoint>();
809808
struct sockaddr peer_sockaddr = {};
810809
auto *in_peer = (sockaddr_in *)&peer_sockaddr;
811810

@@ -826,23 +825,36 @@ bool TcpTransport::addEndpointPassive(int sock) {
826825
perror("getpeername: ");
827826
return false;
828827
}
829-
tep->sock = sock;
830828

831829
std::string hostname = inet_ntoa(in_peer->sin_addr);
832830
std::shared_ptr<TcpPeer> tcpPeer;
833831
unsigned int epId = SStringHash(hostname);
832+
833+
getWriteLock();
834834
auto it = tcpPeers.get(epId);
835835
if (!it.has_value()) {
836836
tcpPeer = std::make_shared<TcpPeer>(hostname, _geds, *this);
837837
tcpPeers.insertOrReplace(epId, tcpPeer);
838838
} else {
839839
tcpPeer = *it;
840840
}
841-
tcpPeer->addEndpoint(tep);
842-
activateEndpoint(tep, tcpPeer);
843-
LOG_DEBUG("Server connected to ", hostname, "::", in_peer->sin_port);
844-
845-
return true;
841+
if (tcpPeer->endpoints.size() < num_proc) {
842+
std::shared_ptr<TcpEndpoint> tep = std::make_shared<TcpEndpoint>();
843+
844+
tep->sock = sock;
845+
tcpPeer->addEndpoint(tep);
846+
activateEndpoint(tep, tcpPeer);
847+
LOG_DEBUG("Server with ", tcpPeer->endpoints.size(),
848+
" connections to ", hostname, "::", in_peer->sin_port);
849+
return true;
850+
}
851+
/*
852+
* Will kill this extra connection probably created during cross-connect
853+
* from both sides
854+
*/
855+
LOG_DEBUG("Server with ", tcpPeer->endpoints.size() + 1,
856+
" connections to ", hostname, "::", in_peer->sin_port, ", dropping new");
857+
return false;
846858
}
847859

848860
std::shared_ptr<TcpPeer> TcpTransport::getPeer(sockaddr *peer) {
@@ -851,7 +863,13 @@ std::shared_ptr<TcpPeer> TcpTransport::getPeer(sockaddr *peer) {
851863
size_t addrlen = sizeof *peer;
852864
int sock = -1, rv = 0;
853865
unsigned int epId = SStringHash(hostname);
866+
867+
if (peer->sa_family != AF_INET) {
868+
LOG_ERROR("Address family not supported: ", peer->sa_family);
869+
return nullptr;
870+
}
854871
auto lock = getWriteLock();
872+
855873
/*
856874
* Check if we are already connected to that address. No new peer in
857875
* this case.
@@ -860,23 +878,23 @@ std::shared_ptr<TcpPeer> TcpTransport::getPeer(sockaddr *peer) {
860878
auto it = tcpPeers.get(epId);
861879
if (it.has_value()) {
862880
tcpPeer = *it;
863-
LOG_DEBUG("Already connected: ", hostname, "::", inaddr->sin_port);
864-
return tcpPeer;
865-
}
866-
if (peer->sa_family != AF_INET) {
867-
LOG_ERROR("Address family not supported: ", peer->sa_family);
868-
return nullptr;
881+
} else {
882+
tcpPeer = std::make_shared<TcpPeer>(hostname, _geds, *this);
883+
tcpPeers.insertOrReplace(epId, tcpPeer);
869884
}
870-
for (unsigned int num_ep = 0; num_ep < num_proc; num_ep++) {
885+
unsigned int num_ep = tcpPeer->endpoints.size();
886+
887+
while (num_ep < num_proc) {
871888
sock = ::socket(AF_INET, SOCK_STREAM, 0);
872889
if (sock < 0) {
873-
return nullptr;
890+
LOG_ERROR("Cannot create socket: ", hostname, "::", inaddr->sin_port);
891+
break;
874892
}
875893
rv = ::connect(sock, peer, addrlen);
876894
if (rv) {
877895
LOG_ERROR("Cannot connect: ", hostname, "::", inaddr->sin_port);
878896
::close(sock);
879-
return nullptr;
897+
break;
880898
}
881899
/*
882900
* Mark socket non-blocking to allow efficient handling of
@@ -886,27 +904,29 @@ std::shared_ptr<TcpPeer> TcpTransport::getPeer(sockaddr *peer) {
886904
if (rv) {
887905
LOG_ERROR("Cannot set socket non-blocking ", hostname, "::", inaddr->sin_port);
888906
close(sock);
889-
return nullptr;
907+
break;
890908
}
891909
struct linger lg = {.l_onoff = 0, .l_linger = 0};
892910
if (::setsockopt(sock, SOL_SOCKET, SO_LINGER, &lg, sizeof lg)) {
893911
LOG_ERROR("Cannot set NO_LINGER ", hostname, "::", inaddr->sin_port);
894912
close(sock);
895-
return nullptr;
913+
break;
896914
}
897915
std::shared_ptr<TcpEndpoint> tep = std::make_shared<TcpEndpoint>();
898-
899-
LOG_DEBUG("Connected, num Ep: ", num_ep, " hostname: ", hostname, "::", inaddr->sin_port);
900916
tep->sock = sock;
901-
if (num_ep == 0) {
902-
tcpPeer = std::make_shared<TcpPeer>(hostname, _geds, *this);
903-
tcpPeers.insertOrReplace(epId, tcpPeer);
904-
}
905917
tcpPeer->addEndpoint(tep);
906918
activateEndpoint(tep, tcpPeer);
919+
920+
num_ep++;
921+
922+
LOG_DEBUG("Client with ", num_ep, " connections to ", hostname, "::", inaddr->sin_port);
907923
}
908-
LOG_DEBUG("Client connected to ", hostname, "::", inaddr->sin_port);
909-
return tcpPeer;
924+
if (num_ep)
925+
return tcpPeer;
926+
927+
tcpPeers.remove(tcpPeer->Id);
928+
LOG_ERROR("Cannot connect peer: ", hostname, "::", inaddr->sin_port);
929+
return nullptr;
910930
}
911931

912932
void TcpPeer::updateIoStats() {

0 commit comments

Comments
 (0)