@@ -805,7 +805,6 @@ bool TcpTransport::activateEndpoint(std::shared_ptr<TcpEndpoint> tep,
805
805
}
806
806
807
807
bool TcpTransport::addEndpointPassive (int sock) {
808
- std::shared_ptr<TcpEndpoint> tep = std::make_shared<TcpEndpoint>();
809
808
struct sockaddr peer_sockaddr = {};
810
809
auto *in_peer = (sockaddr_in *)&peer_sockaddr;
811
810
@@ -826,23 +825,36 @@ bool TcpTransport::addEndpointPassive(int sock) {
826
825
perror (" getpeername: " );
827
826
return false ;
828
827
}
829
- tep->sock = sock;
830
828
831
829
std::string hostname = inet_ntoa (in_peer->sin_addr );
832
830
std::shared_ptr<TcpPeer> tcpPeer;
833
831
unsigned int epId = SStringHash (hostname);
832
+
833
+ getWriteLock ();
834
834
auto it = tcpPeers.get (epId);
835
835
if (!it.has_value ()) {
836
836
tcpPeer = std::make_shared<TcpPeer>(hostname, _geds, *this );
837
837
tcpPeers.insertOrReplace (epId, tcpPeer);
838
838
} else {
839
839
tcpPeer = *it;
840
840
}
841
- tcpPeer->addEndpoint (tep);
842
- activateEndpoint (tep, tcpPeer);
843
- LOG_DEBUG (" Server connected to " , hostname, " ::" , in_peer->sin_port );
844
-
845
- return true ;
841
+ if (tcpPeer->endpoints .size () < num_proc) {
842
+ std::shared_ptr<TcpEndpoint> tep = std::make_shared<TcpEndpoint>();
843
+
844
+ tep->sock = sock;
845
+ tcpPeer->addEndpoint (tep);
846
+ activateEndpoint (tep, tcpPeer);
847
+ LOG_DEBUG (" Server with " , tcpPeer->endpoints .size (),
848
+ " connections to " , hostname, " ::" , in_peer->sin_port );
849
+ return true ;
850
+ }
851
+ /*
852
+ * Will kill this extra connection probably created during cross-connect
853
+ * from both sides
854
+ */
855
+ LOG_DEBUG (" Server with " , tcpPeer->endpoints .size () + 1 ,
856
+ " connections to " , hostname, " ::" , in_peer->sin_port , " , dropping new" );
857
+ return false ;
846
858
}
847
859
848
860
std::shared_ptr<TcpPeer> TcpTransport::getPeer (sockaddr *peer) {
@@ -851,7 +863,13 @@ std::shared_ptr<TcpPeer> TcpTransport::getPeer(sockaddr *peer) {
851
863
size_t addrlen = sizeof *peer;
852
864
int sock = -1 , rv = 0 ;
853
865
unsigned int epId = SStringHash (hostname);
866
+
867
+ if (peer->sa_family != AF_INET) {
868
+ LOG_ERROR (" Address family not supported: " , peer->sa_family );
869
+ return nullptr ;
870
+ }
854
871
auto lock = getWriteLock ();
872
+
855
873
/*
856
874
* Check if we are already connected to that address. No new peer in
857
875
* this case.
@@ -860,23 +878,23 @@ std::shared_ptr<TcpPeer> TcpTransport::getPeer(sockaddr *peer) {
860
878
auto it = tcpPeers.get (epId);
861
879
if (it.has_value ()) {
862
880
tcpPeer = *it;
863
- LOG_DEBUG (" Already connected: " , hostname, " ::" , inaddr->sin_port );
864
- return tcpPeer;
865
- }
866
- if (peer->sa_family != AF_INET) {
867
- LOG_ERROR (" Address family not supported: " , peer->sa_family );
868
- return nullptr ;
881
+ } else {
882
+ tcpPeer = std::make_shared<TcpPeer>(hostname, _geds, *this );
883
+ tcpPeers.insertOrReplace (epId, tcpPeer);
869
884
}
870
- for (unsigned int num_ep = 0 ; num_ep < num_proc; num_ep++) {
885
+ unsigned int num_ep = tcpPeer->endpoints .size ();
886
+
887
+ while (num_ep < num_proc) {
871
888
sock = ::socket (AF_INET, SOCK_STREAM, 0 );
872
889
if (sock < 0 ) {
873
- return nullptr ;
890
+ LOG_ERROR (" Cannot create socket: " , hostname, " ::" , inaddr->sin_port );
891
+ break ;
874
892
}
875
893
rv = ::connect (sock, peer, addrlen);
876
894
if (rv) {
877
895
LOG_ERROR (" Cannot connect: " , hostname, " ::" , inaddr->sin_port );
878
896
::close (sock);
879
- return nullptr ;
897
+ break ;
880
898
}
881
899
/*
882
900
* Mark socket non-blocking to allow efficient handling of
@@ -886,27 +904,29 @@ std::shared_ptr<TcpPeer> TcpTransport::getPeer(sockaddr *peer) {
886
904
if (rv) {
887
905
LOG_ERROR (" Cannot set socket non-blocking " , hostname, " ::" , inaddr->sin_port );
888
906
close (sock);
889
- return nullptr ;
907
+ break ;
890
908
}
891
909
struct linger lg = {.l_onoff = 0 , .l_linger = 0 };
892
910
if (::setsockopt (sock, SOL_SOCKET, SO_LINGER, &lg, sizeof lg)) {
893
911
LOG_ERROR (" Cannot set NO_LINGER " , hostname, " ::" , inaddr->sin_port );
894
912
close (sock);
895
- return nullptr ;
913
+ break ;
896
914
}
897
915
std::shared_ptr<TcpEndpoint> tep = std::make_shared<TcpEndpoint>();
898
-
899
- LOG_DEBUG (" Connected, num Ep: " , num_ep, " hostname: " , hostname, " ::" , inaddr->sin_port );
900
916
tep->sock = sock;
901
- if (num_ep == 0 ) {
902
- tcpPeer = std::make_shared<TcpPeer>(hostname, _geds, *this );
903
- tcpPeers.insertOrReplace (epId, tcpPeer);
904
- }
905
917
tcpPeer->addEndpoint (tep);
906
918
activateEndpoint (tep, tcpPeer);
919
+
920
+ num_ep++;
921
+
922
+ LOG_DEBUG (" Client with " , num_ep, " connections to " , hostname, " ::" , inaddr->sin_port );
907
923
}
908
- LOG_DEBUG (" Client connected to " , hostname, " ::" , inaddr->sin_port );
909
- return tcpPeer;
924
+ if (num_ep)
925
+ return tcpPeer;
926
+
927
+ tcpPeers.remove (tcpPeer->Id );
928
+ LOG_ERROR (" Cannot connect peer: " , hostname, " ::" , inaddr->sin_port );
929
+ return nullptr ;
910
930
}
911
931
912
932
void TcpPeer::updateIoStats () {
0 commit comments