Skip to content

Commit ad34f6c

Browse files
BernardMetzlerpspoerri
authored andcommitted
TCP RPC: Fix socket close and introduce ctlfd for epoll.
This commit fixes two things: 1. Fix handling socket peer close if not attached to peer anymore. The peer must not be affected by that event, and not be closed. Unexpected peer socket close happens if both ends try to connect at the same time. 2. epoll_wait() may wait forever if no socket is in watchlist. This prevented RX and TX threads from termination during transport service stop. We introduce an evenfd which is in the watchlist of all TX and RX threads. Writing to that eventfd wakes up epoll_wait(). It also clarifies some LOG_DEBUG messages. Signed-off-by: Bernard Metzler <[email protected]>
1 parent d9a8f75 commit ad34f6c

File tree

3 files changed

+86
-26
lines changed

3 files changed

+86
-26
lines changed

src/libgeds/FileTransferService.cpp

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,6 @@ absl::Status FileTransferService::connect() {
8282

8383
if (peer) {
8484
_tcpPeer = peer;
85-
// for now, make just one connection
8685
break;
8786
}
8887
}
@@ -144,18 +143,18 @@ absl::StatusOr<size_t> FileTransferService::readBytes(const std::string &bucket,
144143
{
145144
auto lock = getReadLock();
146145
if (_tcpPeer.expired()) {
147-
return absl::UnavailableError("No TCP for " + nodeAddress);
146+
return absl::UnavailableError("TCP readBytes: no peer: " + nodeAddress);
148147
}
149148

150-
LOG_DEBUG("Found TCP peer for ", nodeAddress);
149+
LOG_DEBUG("TCP readBytes: ", nodeAddress, ", REQ: ", length);
151150
auto peer = _tcpPeer.lock();
152151
lock.unlock();
153152
auto prom = peer->sendRpcRequest((uint64_t)buffer, bucket + "/" + key, position, length);
154153
fut = prom->get_future();
155154
}
156155
auto status = fut.get();
157156
if (status.ok()) {
158-
LOG_DEBUG("TCP readBytes OK, bytes: ", length);
157+
LOG_DEBUG("TCP readBytes: ", nodeAddress, ", DONE: ", length);
159158
return *status;
160159
}
161160
// Close the FileTransferService on error.

src/libgeds/TcpTransport.cpp

Lines changed: 80 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include <shared_mutex>
2222
#include <string>
2323
#include <sys/epoll.h>
24+
#include <sys/eventfd.h>
2425
#include <sys/sendfile.h>
2526
#include <sys/socket.h>
2627
#include <sys/types.h>
@@ -65,20 +66,32 @@ void TcpTransport::start() {
6566
num_proc = std::min(num_proc, MAX_IO_THREADS);
6667
isServing = true;
6768

69+
/*
70+
* Create eventfd to be integrated into epoll interest list for RX and TX
71+
* threads. Writing to it will wakeup epoll_wait() if no other fd available
72+
* or active
73+
*/
74+
eventFd = eventfd(0, 0);
75+
6876
for (unsigned int id = 0; id < MAX_IO_THREADS; id++) {
6977
txThreads.push_back(std::make_unique<std::thread>([this, id] { this->tcpTxThread(id); }));
7078
rxThreads.push_back(std::make_unique<std::thread>([this, id] { this->tcpRxThread(id); }));
7179
_buffers.push(new (std::align_val_t(BUFFER_ALIGNMENT)) uint8_t[MIN_SENDFILE_SIZE]);
7280
}
7381
ioStatsThread = std::make_unique<std::thread>([this] { this->updateIoStats(); });
7482

75-
LOG_DEBUG("TCP service start");
83+
LOG_DEBUG("TCP service started");
7684
}
7785

7886
void TcpTransport::stop() {
87+
LOG_DEBUG("Stopping TCP Service");
7988
isServing = false;
8089

81-
LOG_DEBUG("Stopping TCP Service");
90+
if (eventFd > 0) {
91+
u_int64_t buf = 1;
92+
LOG_DEBUG("TCP Transport: write CTL fd");
93+
write(eventFd, &buf, 8);
94+
}
8295

8396
std::vector<std::shared_ptr<TcpPeer>> tcpPeerV;
8497
tcpPeers.forall([&tcpPeerV](std::shared_ptr<TcpPeer> &tp) { tcpPeerV.push_back(tp); });
@@ -103,6 +116,10 @@ void TcpTransport::stop() {
103116
while (_buffers.pop(buffer)) {
104117
delete[] buffer;
105118
}
119+
if (eventFd > 0)
120+
close(eventFd);
121+
122+
eventFd = -1;
106123
LOG_DEBUG("TCP Transport stopped");
107124
}
108125

@@ -301,20 +318,32 @@ bool TcpPeer::processEndpointSend(std::shared_ptr<TcpEndpoint> tep) {
301318

302319
void TcpTransport::tcpTxThread(unsigned int id) {
303320
struct epoll_event events[EPOLL_MAXEVENTS]; // NOLINT
304-
305-
LOG_DEBUG("TX thread ", id, " starting");
306-
307321
int poll_fd = ::epoll_create1(0);
308322
if (poll_fd < 0) {
309323
perror("epoll_create: ");
310324
return;
311325
}
326+
LOG_DEBUG("TCP TX thread ", id, " starting");
312327
epoll_wfd[id] = poll_fd;
328+
if (eventFd > 0) {
329+
struct epoll_event ev{};
330+
ev.events = EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR;
331+
ev.data.fd = eventFd;
332+
if (epoll_ctl(poll_fd, EPOLL_CTL_ADD, eventFd, &ev)) {
333+
LOG_ERROR("WARNING: Cannot register ctl socket for TX epoll");
334+
perror("epoll_ctl: ");
335+
}
336+
}
313337
do {
314-
int cnt = ::epoll_wait(poll_fd, events, EPOLL_MAXEVENTS, 500);
338+
int cnt = ::epoll_wait(poll_fd, events, EPOLL_MAXEVENTS, -1);
315339

316340
for (int i = 0; i < cnt; i++) {
317341
struct epoll_event *ev = &events[i];
342+
343+
if (ev->data.fd == eventFd) {
344+
LOG_DEBUG("TCP TX: epoll CTL");
345+
continue;
346+
}
318347
epoll_epid_t ep_id = {};
319348
ep_id.data = ev->data.u64;
320349
int sock = ep_id.id.sock;
@@ -327,10 +356,10 @@ void TcpTransport::tcpTxThread(unsigned int id) {
327356
}
328357
auto it = tcpPeers.get(epId);
329358
if (it.has_value()) {
330-
LOG_DEBUG("Found TX peer for: ", sock);
359+
LOG_DEBUG("TX: Found peer for: ", sock);
331360
tcpPeer = *it;
332361
} else {
333-
LOG_DEBUG("No TX peer for: ", sock);
362+
LOG_ERROR("TX: No peer for: ", sock);
334363
deactivateEndpoint(poll_fd, sock, ALL_CLOSED);
335364
continue;
336365
}
@@ -355,14 +384,21 @@ void TcpTransport::tcpTxThread(unsigned int id) {
355384
}
356385
}
357386
} while (isServing);
358-
LOG_DEBUG("TX thread ", id, " exiting");
387+
if (eventFd > 0)
388+
epoll_ctl(poll_fd, EPOLL_CTL_DEL, eventFd, NULL);
389+
390+
close(poll_fd);
391+
LOG_DEBUG("TCP TX thread ", id, " exiting");
359392
}
360393

361394
bool TcpPeer::SocketStateChange(int sock, uint32_t change) {
362395
auto lock = getWriteLock();
363396
auto it = endpoints.find(sock);
364397
if (it == endpoints.end()) {
365-
return true;
398+
LOG_ERROR("Unassigned socket: ", sock);
399+
close(sock);
400+
// Return false to keep unassociated peer alive
401+
return false;
366402
}
367403
auto tep = it->second;
368404
bool dead = false;
@@ -377,6 +413,8 @@ bool TcpPeer::SocketStateChange(int sock, uint32_t change) {
377413
dead = true;
378414
close(sock);
379415
endpoints.erase(it);
416+
LOG_DEBUG("TCP Peer: ", this->hostname, ", erased socket: ",
417+
sock, ", change: ", change, ", num sockets now: ", endpoints.size());
380418
}
381419
}
382420
if (endpoints.size() != 0)
@@ -646,21 +684,34 @@ void TcpTransport::updateIoStats() {
646684

647685
void TcpTransport::tcpRxThread(unsigned int id) {
648686
struct epoll_event events[EPOLL_MAXEVENTS]; // NOLINT
649-
650-
LOG_DEBUG("RX thread ", id, " starting");
651-
652687
int poll_fd = ::epoll_create1(0);
653688
if (poll_fd < 0) {
654689
perror("epoll_create: ");
655690
return;
656691
}
692+
LOG_DEBUG("TCP RX thread ", id, " starting");
657693
epoll_rfd[id] = poll_fd;
658694

695+
if (eventFd > 0) {
696+
struct epoll_event ev{};
697+
ev.events = EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR;
698+
ev.data.fd = eventFd;
699+
if (epoll_ctl(poll_fd, EPOLL_CTL_ADD, eventFd, &ev)) {
700+
LOG_ERROR("WARNING: Cannot register ctl socket for RX epoll");
701+
perror("epoll_ctl: ");
702+
}
703+
}
704+
659705
do {
660-
int cnt = ::epoll_wait(poll_fd, events, EPOLL_MAXEVENTS, 500);
706+
int cnt = ::epoll_wait(poll_fd, events, EPOLL_MAXEVENTS, -1);
661707

662708
for (int i = 0; i < cnt; i++) {
663709
struct epoll_event *ev = &events[i];
710+
711+
if (ev->data.fd == eventFd) {
712+
LOG_DEBUG("TCP RX: epoll CTL");
713+
continue;
714+
}
664715
epoll_epid_t ep_id = {};
665716
ep_id.data = ev->data.u64;
666717
int sock = ep_id.id.sock;
@@ -675,11 +726,12 @@ void TcpTransport::tcpRxThread(unsigned int id) {
675726
if (it.has_value()) {
676727
tcpPeer = *it;
677728
} else {
678-
LOG_ERROR("No peer for: ", sock);
729+
LOG_ERROR("RX: No peer for: ", sock);
679730
deactivateEndpoint(poll_fd, sock, ALL_CLOSED);
680731
continue;
681732
}
682733
if (ev->events & (EPOLLHUP | EPOLLRDHUP | EPOLLERR)) {
734+
LOG_DEBUG("Peer closes: ", sock);
683735
deactivateEndpoint(poll_fd, sock, RX_CLOSED);
684736
if (tcpPeer->SocketStateChange(sock, RX_CLOSED)) {
685737
tcpPeers.remove(tcpPeer->Id);
@@ -700,14 +752,19 @@ void TcpTransport::tcpRxThread(unsigned int id) {
700752
}
701753
}
702754
} while (isServing);
703-
LOG_DEBUG("RX thread ", id, " exiting");
755+
if (eventFd > 0)
756+
epoll_ctl(poll_fd, EPOLL_CTL_DEL, eventFd, NULL);
757+
758+
close(poll_fd);
759+
LOG_DEBUG("TCP RX thread ", id, " exiting");
704760
}
705761

706762
std::shared_ptr<TcpTransport> TcpTransport::factory(std::shared_ptr<GEDS> geds) {
707763
return std::shared_ptr<TcpTransport>(new TcpTransport(geds));
708764
}
709765

710766
void TcpTransport::deactivateEndpoint(int poll_fd, int sock, uint32_t state) {
767+
LOG_DEBUG("TCP deactivate EP: ", sock, ", state: ", state);
711768
if (state & TX_CLOSED)
712769
epoll_ctl(poll_fd, EPOLL_CTL_DEL, sock, nullptr);
713770
if (state & RX_CLOSED)
@@ -743,7 +800,7 @@ bool TcpTransport::activateEndpoint(std::shared_ptr<TcpEndpoint> tep,
743800
perror("epoll_ctl send: ");
744801
return false;
745802
}
746-
LOG_DEBUG("Registered socket ", tep->sock, " for epoll.");
803+
LOG_DEBUG("TCP activated EP, socket: ", tep->sock, ", host: ", peer->hostname);
747804
return true;
748805
}
749806

@@ -803,6 +860,7 @@ std::shared_ptr<TcpPeer> TcpTransport::getPeer(sockaddr *peer) {
803860
auto it = tcpPeers.get(epId);
804861
if (it.has_value()) {
805862
tcpPeer = *it;
863+
LOG_DEBUG("Already connected: ", hostname, "::", inaddr->sin_port);
806864
return tcpPeer;
807865
}
808866
if (peer->sa_family != AF_INET) {
@@ -816,7 +874,7 @@ std::shared_ptr<TcpPeer> TcpTransport::getPeer(sockaddr *peer) {
816874
}
817875
rv = ::connect(sock, peer, addrlen);
818876
if (rv) {
819-
LOG_DEBUG("cannot connect ", hostname);
877+
LOG_ERROR("Cannot connect: ", hostname, "::", inaddr->sin_port);
820878
::close(sock);
821879
return nullptr;
822880
}
@@ -826,19 +884,19 @@ std::shared_ptr<TcpPeer> TcpTransport::getPeer(sockaddr *peer) {
826884
*/
827885
rv = ::fcntl(sock, F_SETFL, fcntl(sock, F_GETFL, 0) | O_NONBLOCK);
828886
if (rv) {
829-
LOG_DEBUG("cannot set socket non-blocking ", hostname);
887+
LOG_ERROR("Cannot set socket non-blocking ", hostname, "::", inaddr->sin_port);
830888
close(sock);
831889
return nullptr;
832890
}
833891
struct linger lg = {.l_onoff = 0, .l_linger = 0};
834892
if (::setsockopt(sock, SOL_SOCKET, SO_LINGER, &lg, sizeof lg)) {
835-
perror("SO_LINGER: ");
893+
LOG_ERROR("Cannot set NO_LINGER ", hostname, "::", inaddr->sin_port);
836894
close(sock);
837895
return nullptr;
838896
}
839897
std::shared_ptr<TcpEndpoint> tep = std::make_shared<TcpEndpoint>();
840898

841-
LOG_DEBUG("connected ", hostname, "::", inaddr->sin_port);
899+
LOG_DEBUG("Connected, num Ep: ", num_ep, " hostname: ", hostname, "::", inaddr->sin_port);
842900
tep->sock = sock;
843901
if (num_ep == 0) {
844902
tcpPeer = std::make_shared<TcpPeer>(hostname, _geds, *this);
@@ -847,7 +905,7 @@ std::shared_ptr<TcpPeer> TcpTransport::getPeer(sockaddr *peer) {
847905
tcpPeer->addEndpoint(tep);
848906
activateEndpoint(tep, tcpPeer);
849907
}
850-
LOG_DEBUG("Client connected to ", hostname);
908+
LOG_DEBUG("Client connected to ", hostname, "::", inaddr->sin_port);
851909
return tcpPeer;
852910
}
853911

src/libgeds/TcpTransport.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,9 @@ class TcpTransport : public std::enable_shared_from_this<TcpTransport> {
218218
int epoll_rfd[MAX_IO_THREADS] = {}; // for epoll() receive
219219
int epoll_wfd[MAX_IO_THREADS] = {}; // for epoll() send
220220

221+
/* fd to signal threads in epoll to interrupt wait */
222+
int eventFd = -1;
223+
221224
void deactivateEndpoint(int poll_fd, int sock, uint32_t state);
222225
bool activateEndpoint(std::shared_ptr<TcpEndpoint>, std::shared_ptr<TcpPeer>);
223226
utility::ConcurrentMap<unsigned int, std::shared_ptr<TcpPeer>> tcpPeers;

0 commit comments

Comments
 (0)