2121#include < shared_mutex>
2222#include < string>
2323#include < sys/epoll.h>
24+ #include < sys/eventfd.h>
2425#include < sys/sendfile.h>
2526#include < sys/socket.h>
2627#include < sys/types.h>
@@ -65,20 +66,32 @@ void TcpTransport::start() {
6566 num_proc = std::min (num_proc, MAX_IO_THREADS);
6667 isServing = true ;
6768
69+ /*
70+ * Create an eventfd that is added to the epoll interest list of every RX
71+ * and TX thread. Writing to it wakes up epoll_wait() even when no other
72+ * fd is registered or active.
73+ */
74+ eventFd = eventfd (0 , 0 );
75+
6876 for (unsigned int id = 0 ; id < MAX_IO_THREADS; id++) {
6977 txThreads.push_back (std::make_unique<std::thread>([this , id] { this ->tcpTxThread (id); }));
7078 rxThreads.push_back (std::make_unique<std::thread>([this , id] { this ->tcpRxThread (id); }));
7179 _buffers.push (new (std::align_val_t (BUFFER_ALIGNMENT)) uint8_t [MIN_SENDFILE_SIZE]);
7280 }
7381 ioStatsThread = std::make_unique<std::thread>([this ] { this ->updateIoStats (); });
7482
75- LOG_DEBUG (" TCP service start " );
83+ LOG_DEBUG (" TCP service started " );
7684}
7785
7886void TcpTransport::stop () {
87+ LOG_DEBUG (" Stopping TCP Service" );
7988 isServing = false ;
8089
81- LOG_DEBUG (" Stopping TCP Service" );
90+ if (eventFd > 0 ) {
91+ u_int64_t buf = 1 ;
92+ LOG_DEBUG (" TCP Transport: write CTL fd" );
93+ write (eventFd, &buf, 8 );
94+ }
8295
8396 std::vector<std::shared_ptr<TcpPeer>> tcpPeerV;
8497 tcpPeers.forall ([&tcpPeerV](std::shared_ptr<TcpPeer> &tp) { tcpPeerV.push_back (tp); });
@@ -103,6 +116,10 @@ void TcpTransport::stop() {
103116 while (_buffers.pop (buffer)) {
104117 delete[] buffer;
105118 }
119+ if (eventFd > 0 )
120+ close (eventFd);
121+
122+ eventFd = -1 ;
106123 LOG_DEBUG (" TCP Transport stopped" );
107124}
108125
@@ -301,20 +318,32 @@ bool TcpPeer::processEndpointSend(std::shared_ptr<TcpEndpoint> tep) {
301318
302319void TcpTransport::tcpTxThread (unsigned int id) {
303320 struct epoll_event events[EPOLL_MAXEVENTS]; // NOLINT
304-
305- LOG_DEBUG (" TX thread " , id, " starting" );
306-
307321 int poll_fd = ::epoll_create1 (0 );
308322 if (poll_fd < 0 ) {
309323 perror (" epoll_create: " );
310324 return ;
311325 }
326+ LOG_DEBUG (" TCP TX thread " , id, " starting" );
312327 epoll_wfd[id] = poll_fd;
328+ if (eventFd > 0 ) {
329+ struct epoll_event ev{};
330+ ev.events = EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR;
331+ ev.data .fd = eventFd;
332+ if (epoll_ctl (poll_fd, EPOLL_CTL_ADD, eventFd, &ev)) {
333+ LOG_ERROR (" WARNING: Cannot register ctl socket for TX epoll" );
334+ perror (" epoll_ctl: " );
335+ }
336+ }
313337 do {
314- int cnt = ::epoll_wait (poll_fd, events, EPOLL_MAXEVENTS, 500 );
338+ int cnt = ::epoll_wait (poll_fd, events, EPOLL_MAXEVENTS, - 1 );
315339
316340 for (int i = 0 ; i < cnt; i++) {
317341 struct epoll_event *ev = &events[i];
342+
343+ if (ev->data .fd == eventFd) {
344+ LOG_DEBUG (" TCP TX: epoll CTL" );
345+ continue ;
346+ }
318347 epoll_epid_t ep_id = {};
319348 ep_id.data = ev->data .u64 ;
320349 int sock = ep_id.id .sock ;
@@ -327,10 +356,10 @@ void TcpTransport::tcpTxThread(unsigned int id) {
327356 }
328357 auto it = tcpPeers.get (epId);
329358 if (it.has_value ()) {
330- LOG_DEBUG (" Found TX peer for: " , sock);
359+ LOG_DEBUG (" TX: Found peer for: " , sock);
331360 tcpPeer = *it;
332361 } else {
333- LOG_DEBUG ( " No TX peer for: " , sock);
362+ LOG_ERROR ( " TX: No peer for: " , sock);
334363 deactivateEndpoint (poll_fd, sock, ALL_CLOSED);
335364 continue ;
336365 }
@@ -355,14 +384,21 @@ void TcpTransport::tcpTxThread(unsigned int id) {
355384 }
356385 }
357386 } while (isServing);
358- LOG_DEBUG (" TX thread " , id, " exiting" );
387+ if (eventFd > 0 )
388+ epoll_ctl (poll_fd, EPOLL_CTL_DEL, eventFd, NULL );
389+
390+ close (poll_fd);
391+ LOG_DEBUG (" TCP TX thread " , id, " exiting" );
359392}
360393
361394bool TcpPeer::SocketStateChange (int sock, uint32_t change) {
362395 auto lock = getWriteLock ();
363396 auto it = endpoints.find (sock);
364397 if (it == endpoints.end ()) {
365- return true ;
398+ LOG_ERROR (" Unassigned socket: " , sock);
399+ close (sock);
400+ // Return false to keep unassociated peer alive
401+ return false ;
366402 }
367403 auto tep = it->second ;
368404 bool dead = false ;
@@ -377,6 +413,8 @@ bool TcpPeer::SocketStateChange(int sock, uint32_t change) {
377413 dead = true ;
378414 close (sock);
379415 endpoints.erase (it);
416+ LOG_DEBUG (" TCP Peer: " , this ->hostname , " , erased socket: " ,
417+ sock, " , change: " , change, " , num sockets now: " , endpoints.size ());
380418 }
381419 }
382420 if (endpoints.size () != 0 )
@@ -646,21 +684,34 @@ void TcpTransport::updateIoStats() {
646684
647685void TcpTransport::tcpRxThread (unsigned int id) {
648686 struct epoll_event events[EPOLL_MAXEVENTS]; // NOLINT
649-
650- LOG_DEBUG (" RX thread " , id, " starting" );
651-
652687 int poll_fd = ::epoll_create1 (0 );
653688 if (poll_fd < 0 ) {
654689 perror (" epoll_create: " );
655690 return ;
656691 }
692+ LOG_DEBUG (" TCP RX thread " , id, " starting" );
657693 epoll_rfd[id] = poll_fd;
658694
695+ if (eventFd > 0 ) {
696+ struct epoll_event ev{};
697+ ev.events = EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR;
698+ ev.data .fd = eventFd;
699+ if (epoll_ctl (poll_fd, EPOLL_CTL_ADD, eventFd, &ev)) {
700+ LOG_ERROR (" WARNING: Cannot register ctl socket for RX epoll" );
701+ perror (" epoll_ctl: " );
702+ }
703+ }
704+
659705 do {
660- int cnt = ::epoll_wait (poll_fd, events, EPOLL_MAXEVENTS, 500 );
706+ int cnt = ::epoll_wait (poll_fd, events, EPOLL_MAXEVENTS, - 1 );
661707
662708 for (int i = 0 ; i < cnt; i++) {
663709 struct epoll_event *ev = &events[i];
710+
711+ if (ev->data .fd == eventFd) {
712+ LOG_DEBUG (" TCP RX: epoll CTL" );
713+ continue ;
714+ }
664715 epoll_epid_t ep_id = {};
665716 ep_id.data = ev->data .u64 ;
666717 int sock = ep_id.id .sock ;
@@ -675,11 +726,12 @@ void TcpTransport::tcpRxThread(unsigned int id) {
675726 if (it.has_value ()) {
676727 tcpPeer = *it;
677728 } else {
678- LOG_ERROR (" No peer for: " , sock);
729+ LOG_ERROR (" RX: No peer for: " , sock);
679730 deactivateEndpoint (poll_fd, sock, ALL_CLOSED);
680731 continue ;
681732 }
682733 if (ev->events & (EPOLLHUP | EPOLLRDHUP | EPOLLERR)) {
734+ LOG_DEBUG (" Peer closes: " , sock);
683735 deactivateEndpoint (poll_fd, sock, RX_CLOSED);
684736 if (tcpPeer->SocketStateChange (sock, RX_CLOSED)) {
685737 tcpPeers.remove (tcpPeer->Id );
@@ -700,14 +752,19 @@ void TcpTransport::tcpRxThread(unsigned int id) {
700752 }
701753 }
702754 } while (isServing);
703- LOG_DEBUG (" RX thread " , id, " exiting" );
755+ if (eventFd > 0 )
756+ epoll_ctl (poll_fd, EPOLL_CTL_DEL, eventFd, NULL );
757+
758+ close (poll_fd);
759+ LOG_DEBUG (" TCP RX thread " , id, " exiting" );
704760}
705761
706762std::shared_ptr<TcpTransport> TcpTransport::factory (std::shared_ptr<GEDS> geds) {
707763 return std::shared_ptr<TcpTransport>(new TcpTransport (geds));
708764}
709765
710766void TcpTransport::deactivateEndpoint (int poll_fd, int sock, uint32_t state) {
767+ LOG_DEBUG (" TCP deactivate EP: " , sock, " , state: " , state);
711768 if (state & TX_CLOSED)
712769 epoll_ctl (poll_fd, EPOLL_CTL_DEL, sock, nullptr );
713770 if (state & RX_CLOSED)
@@ -743,7 +800,7 @@ bool TcpTransport::activateEndpoint(std::shared_ptr<TcpEndpoint> tep,
743800 perror (" epoll_ctl send: " );
744801 return false ;
745802 }
746- LOG_DEBUG (" Registered socket " , tep->sock , " for epoll. " );
803+ LOG_DEBUG (" TCP activated EP, socket: " , tep->sock , " , host: " , peer-> hostname );
747804 return true ;
748805}
749806
@@ -803,6 +860,7 @@ std::shared_ptr<TcpPeer> TcpTransport::getPeer(sockaddr *peer) {
803860 auto it = tcpPeers.get (epId);
804861 if (it.has_value ()) {
805862 tcpPeer = *it;
863+ LOG_DEBUG (" Already connected: " , hostname, " ::" , inaddr->sin_port );
806864 return tcpPeer;
807865 }
808866 if (peer->sa_family != AF_INET) {
@@ -816,7 +874,7 @@ std::shared_ptr<TcpPeer> TcpTransport::getPeer(sockaddr *peer) {
816874 }
817875 rv = ::connect (sock, peer, addrlen);
818876 if (rv) {
819- LOG_DEBUG ( " cannot connect " , hostname);
877+ LOG_ERROR ( " Cannot connect: " , hostname, " :: " , inaddr-> sin_port );
820878 ::close (sock);
821879 return nullptr ;
822880 }
@@ -826,19 +884,19 @@ std::shared_ptr<TcpPeer> TcpTransport::getPeer(sockaddr *peer) {
826884 */
827885 rv = ::fcntl (sock, F_SETFL, fcntl (sock, F_GETFL, 0 ) | O_NONBLOCK);
828886 if (rv) {
829- LOG_DEBUG ( " cannot set socket non-blocking " , hostname);
887+ LOG_ERROR ( " Cannot set socket non-blocking " , hostname, " :: " , inaddr-> sin_port );
830888 close (sock);
831889 return nullptr ;
832890 }
833891 struct linger lg = {.l_onoff = 0 , .l_linger = 0 };
834892 if (::setsockopt (sock, SOL_SOCKET, SO_LINGER, &lg, sizeof lg)) {
835- perror ( " SO_LINGER: " );
893+ LOG_ERROR ( " Cannot set NO_LINGER " , hostname, " :: " , inaddr-> sin_port );
836894 close (sock);
837895 return nullptr ;
838896 }
839897 std::shared_ptr<TcpEndpoint> tep = std::make_shared<TcpEndpoint>();
840898
841- LOG_DEBUG (" connected " , hostname, " ::" , inaddr->sin_port );
899+ LOG_DEBUG (" Connected, num Ep: " , num_ep, " hostname: " , hostname, " ::" , inaddr->sin_port );
842900 tep->sock = sock;
843901 if (num_ep == 0 ) {
844902 tcpPeer = std::make_shared<TcpPeer>(hostname, _geds, *this );
@@ -847,7 +905,7 @@ std::shared_ptr<TcpPeer> TcpTransport::getPeer(sockaddr *peer) {
847905 tcpPeer->addEndpoint (tep);
848906 activateEndpoint (tep, tcpPeer);
849907 }
850- LOG_DEBUG (" Client connected to " , hostname);
908+ LOG_DEBUG (" Client connected to " , hostname, " :: " , inaddr-> sin_port );
851909 return tcpPeer;
852910}
853911
0 commit comments