Skip to content

Commit

Permalink
add demo echo_server and client
Browse files Browse the repository at this point in the history
  • Loading branch information
Dueplay committed Sep 26, 2023
1 parent f7c4c77 commit 69c66d7
Show file tree
Hide file tree
Showing 16 changed files with 688 additions and 538 deletions.
Binary file added demo/echo_client
Binary file not shown.
44 changes: 44 additions & 0 deletions demo/echo_client.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#include <sys/socket.h>
#include <unistd.h>

#include <memory>

#include "core/connection.h"
#include "core/net_addr.h"
#include "core/socket.h"
#include "core/thread_pool.h"

#define BUF_SIZE 2048
namespace Next {
class EchoClient {
public:
explicit EchoClient(NetAddress server_address) {
auto client_sock = std::make_unique<Socket>();
client_sock->Connect(server_address);
client_connection = std::make_unique<Connection>(std::move(client_sock));
}

void Begin() {
char buf[BUF_SIZE + 1];
memset(buf, 0, sizeof(buf));
int fd = client_connection->GetFd();
while (true) {
auto actual_read = read(STDIN_FILENO, buf, BUF_SIZE);
send(fd, buf, actual_read, 0);
memset(buf, 0, sizeof(buf));
auto actual_recv = recv(fd, buf, BUF_SIZE, 0);
write(STDOUT_FILENO, buf, actual_recv);
memset(buf, 0, sizeof(buf));
}
}

private:
std::unique_ptr<Connection> client_connection;
};
} // namespace Next
int main() {
Next::NetAddress local_address("127.0.0.1", 20080);
Next::EchoClient echo_client(local_address);
echo_client.Begin();
return 0;
}
Binary file added demo/echo_server
Binary file not shown.
22 changes: 22 additions & 0 deletions demo/echo_server.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#include "core/next_server.h"

int main() {
Next::NetAddress local_address("127.0.0.1", 20080);
Next::NextServer echo_server(local_address);
echo_server
.OnHandle([&](Next::Connection *client_conn) {
int from_fd = client_conn->GetFd();
std::pair<size_t, bool> ret = client_conn->Recv();
if (ret.second) {
client_conn->GetLooper()->DeleteConnection(from_fd);
return;
}
if (ret.first) {
client_conn->WriteToWriteBuffer(client_conn->ReadAsString());
client_conn->Send();
client_conn->ClearReadBuffer();
}
})
.Begin();
return 0;
}
106 changes: 59 additions & 47 deletions src/core/acceptor.cpp
Original file line number Diff line number Diff line change
@@ -1,66 +1,78 @@
#include "core/acceptor.h"
#include "core/net_addr.h"
#include "core/connection.h"
#include "core/socket.h"
#include "core/looper.h"
#include "core/net_addr.h"
#include "core/poller.h"
#include "core/socket.h"
#include "log/logger.h"
#include "core/looper.h"

namespace Next {

Acceptor::Acceptor(Looper *listener, std::vector<Looper *> reactors, NetAddress server_address)
: reactors_(std::move(reactors)) {
auto acceptor_sock = std::make_unique<Socket>();
acceptor_sock->Bind(server_address, SocketFlag::set_reusable);
acceptor_sock->Listen();
acceptor_conn_ = std::make_unique<Connection>(std::move(acceptor_sock));
acceptor_conn_->SetEvents(POLL_READ);
acceptor_conn_->SetLooper(listener);
listener->AddAcceptor(acceptor_conn_.get());
SetCustomAcceptCallback([](Connection *) {});
SetCustomHandleCallback([](Connection *) {});
Acceptor::Acceptor(Looper *listener, std::vector<Looper *> reactors,
NetAddress server_address)
: reactors_(std::move(reactors)) {
auto acceptor_sock = std::make_unique<Socket>();
acceptor_sock->Bind(server_address, SocketFlag::set_reusable);
acceptor_sock->Listen();
acceptor_conn_ = std::make_unique<Connection>(std::move(acceptor_sock));
acceptor_conn_->SetEvents(POLL_READ);
acceptor_conn_->SetLooper(listener);
listener->AddAcceptor(acceptor_conn_.get());
SetCustomAcceptCallback([](Connection *) {});
SetCustomHandleCallback([](Connection *) {});
}
/**
* accept a connect,and create Connection for client
*/
*/
void Acceptor::BaseAcceptCallback(Connection *server_conn) {
NetAddress client_address;
int accept_fd = server_conn->GetSocket()->Accept(client_address);
if (accept_fd == -1) {
return;
}
auto client_sock = std::make_unique<Socket>(accept_fd);
client_sock->SetNonBlocking();
auto client_conn = std::make_unique<Connection>(std::move(client_sock));
client_conn->SetEvents(POLL_READ | POLL_ET);
client_conn->SetCallback(GetCustomHandleCallback());

int idx = rand() % reactors_.size();
LOG_INFO("new client fd=" + std::to_string(client_conn->GetFd()) + " maps to reactor " + std::to_string(idx));
client_conn->SetLooper(reactors_[idx]);
reactors_[idx]->AddConnection(std::move(client_conn));
NetAddress client_address;
int accept_fd = server_conn->GetSocket()->Accept(client_address);
if (accept_fd == -1) {
return;
}
auto client_sock = std::make_unique<Socket>(accept_fd);
client_sock->SetNonBlocking();
auto client_conn = std::make_unique<Connection>(std::move(client_sock));
client_conn->SetEvents(POLL_READ | POLL_ET);
client_conn->SetCallback(GetCustomHandleCallback());

int idx = rand() % reactors_.size();
LOG_INFO("new client fd=" + std::to_string(client_conn->GetFd()) +
" maps to reactor " + std::to_string(idx));
client_conn->SetLooper(reactors_[idx]);
reactors_[idx]->AddConnection(std::move(client_conn));
}
void Acceptor::BaseHandleCallback(Connection *client_conn) {
int fd = client_conn->GetFd();
if (client_conn->GetLooper()) {
client_conn->GetLooper()->RefreshConnection(fd);
}
int fd = client_conn->GetFd();
if (client_conn->GetLooper()) {
client_conn->GetLooper()->RefreshConnection(fd);
}
}
void Acceptor::SetCustomAcceptCallback(std::function<void(Connection *)> custom_accept_callback) {
custom_accept_callback = std::move(custom_accept_callback);
acceptor_conn_->SetCallback([this](auto &&PH1) {
BaseAcceptCallback(std::forward<decltype(PH1)>(PH1));
custom_accept_callback_(std::forward<decltype(PH1)>(PH1));
});
void Acceptor::SetCustomAcceptCallback(
std::function<void(Connection *)> custom_accept_callback) {
custom_accept_callback = std::move(custom_accept_callback);
acceptor_conn_->SetCallback([this](auto &&PH1) {
BaseAcceptCallback(std::forward<decltype(PH1)>(PH1));
custom_accept_callback_(std::forward<decltype(PH1)>(PH1));
});
}
void Acceptor::SetCustomHandleCallback(std::function<void(Connection *)> custom_handle_callback) {
custom_handle_callback_ = [this, callback = std::move(custom_handle_callback)](auto &&PH1) {
void Acceptor::SetCustomHandleCallback(
std::function<void(Connection *)> custom_handle_callback) {
custom_handle_callback_ =
[this, callback = std::move(custom_handle_callback)](auto &&PH1) {
BaseHandleCallback(std::forward<decltype(PH1)>(PH1));
callback(std::forward<decltype(PH1)>(PH1));
};
};
}
auto Acceptor::GetCustomAcceptCallback() const noexcept
-> std::function<void(Connection *)> {
return custom_accept_callback_;
}
auto Acceptor::GetCustomHandleCallback() const noexcept
-> std::function<void(Connection *)> {
return custom_handle_callback_;
}
auto Acceptor::GetAcceptorConnection() noexcept -> Connection * {
return acceptor_conn_.get();
}
auto Acceptor::GetCustomHandleCallback() const noexcept -> std::function<void(Connection *)> { return custom_accept_callback_; }
auto Acceptor::GetCustomHandleCallback() const noexcept-> std::function<void(Connection *)> { return custom_handle_callback_; }
auto Acceptor::GetAcceptorConnection() noexcept -> Connection * { return acceptor_conn_.get(); }
}
} // namespace Next
2 changes: 1 addition & 1 deletion src/core/cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ auto Cache::CacheNode::GetTimeStamp() const noexcept -> uint64_t {
return last_access_;
}

Cache::Cache(size_t capacity = DEFAULT_CACHE_CAPACITY) noexcept
Cache::Cache(size_t capacity) noexcept
: capacity_(capacity), header_(std::make_unique<CacheNode>()),
tailer_(std::make_unique<CacheNode>()) {
header_->next_ = tailer_.get();
Expand Down
147 changes: 83 additions & 64 deletions src/core/connection.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
#include "core/connection.h"
#include <socket.h>
#include "log/logger.h"
#include <sys/socket.h>
namespace Next {
Connection::Connection(std::unique_ptr<Socket> socket)
: socket_(std::move(socket)), read_buffer_(std::make_unique<Buffer>()), write_buffer_(std::make_unique<Buffer>()) {}
Connection::Connection(std::unique_ptr<Socket> socket)
: socket_(std::move(socket)), read_buffer_(std::make_unique<Buffer>()),
write_buffer_(std::make_unique<Buffer>()) {}

auto Connection::GetFd() const noexcept -> int { return socket_->GetFd(); }
auto Connection::GetSocket() noexcept -> Socket * { return socket_.get(); }
Expand All @@ -14,84 +15,102 @@ void Connection::SetRevents(uint32_t revents) { revents_ = revents; }
auto Connection::GetRevents() const noexcept -> uint32_t { return revents_; }

void Connection::SetCallback(std::function<void(Connection *)> callback) {
// 值的方式捕获callback,捕获this是lambda像成员函数一样。
// callback(this) 为可调用对象。将成员callback_设置lambda,在lam中调用callback(参数为this)
callback_ = [callback, this]() { return callback(this); };
// 值的方式捕获callback,捕获this是lambda像成员函数一样。
// callback(this)
// 为可调用对象。将成员callback_设置lambda,在lam中调用callback(参数为this)
callback_ = [callback, this]() { return callback(this); };
}
auto Connection::GetCallback() noexcept -> std::function<void()> {
return callback_;
}
auto Connection::GetCallback() noexcept -> std::function<void()> { return callback_; }

/* for Buffer */
auto Connection::FindAndPopTill(const std::string &target) -> std::optional<std::string> {
return read_buffer_->FindAndPopTill(target);
auto Connection::FindAndPopTill(const std::string &target)
-> std::optional<std::string> {
return read_buffer_->FindAndPopTill(target);
}

auto Connection::GetReadBufferSize() const noexcept -> size_t { return read_buffer_->Size(); }
auto Connection::GetWriteBufferSize() const noexcept -> size_t { return write_buffer_->Size(); }
void Connection::WriteToReadBUffer(const unsigned char *buf, size_t size) { read_buffer_->Append(buf, size); }
void Connection::WriteToWrietBUffer(const unsigned char *buf, size_t size) { write_buffer_->Append(buf, size); }
void Connection::WriteToReadBUffer(const std::string &str) { read_buffer_->Append(str); }
void Connection::WriteToWrietBUffer(const std::string &str) { write_buffer_->Append(str); }
void Connection::WriteToWrietBUffer(const std::vector<unsigned char> &&other_buf) {
write_buffer_->Append(std::move(other_buf));
auto Connection::GetReadBufferSize() const noexcept -> size_t {
return read_buffer_->Size();
}
auto Connection::GetWriteBufferSize() const noexcept -> size_t {
return write_buffer_->Size();
}
void Connection::WriteToReadBuffer(const unsigned char *buf, size_t size) {
read_buffer_->Append(buf, size);
}
void Connection::WriteToWriteBuffer(const unsigned char *buf, size_t size) {
write_buffer_->Append(buf, size);
}
void Connection::WriteToReadBuffer(const std::string &str) {
read_buffer_->Append(str);
}
void Connection::WriteToWriteBuffer(const std::string &str) {
write_buffer_->Append(str);
}
void Connection::WriteToWriteBuffer(std::vector<unsigned char> &&other_buf) {
write_buffer_->Append(std::move(other_buf));
}

auto Connection::Read() const noexcept -> const unsigned char * { return read_buffer_->Data(); }
auto Connection::Read() const noexcept -> const unsigned char * {
return read_buffer_->Data();
}
auto Connection::ReadAsString() const noexcept -> std::string {
auto string_view = read_buffer_->ToStringView();
return {string_view.begin(), string_view.end()};
auto string_view = read_buffer_->ToStringView();
return {string_view.begin(), string_view.end()};
}

/* return std::pair<How many bytes read, whether the client exits> */
auto Connection::Recv() -> std::pair<ssize_t, bool> {
int from_fd = GetFd();
ssize_t read = 0, curr_read = 0;
unsigned char buf[TEMP_BUF_SIZE];
memset(buf, 0, sizeof(buf));
while(true) {
curr_read = recv(from_fd, buf, TEMP_BUF_SIZE, 0);
if (curr_read > 0) {
read += curr_read;
WriteToReadBuffer(buf, curr_read);
memset(buf, 0, sizeof(buf));
} else if (curr_read == 0) {
// read 返回0 客户端退出
return {read, true};
} else if (curr_read == -1 && errno == EINTR) {
// 被打断,这不算错误
continue;
} else if(curr_read == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
// 所有数据read
break;
} else {
LOG_ERROR("HandleConnection: recv() error");
return {read, true};
}
int from_fd = GetFd();
ssize_t read = 0, curr_read = 0;
unsigned char buf[TEMP_BUF_SIZE];
memset(buf, 0, sizeof(buf));
while (true) {
curr_read = recv(from_fd, buf, TEMP_BUF_SIZE, 0);
if (curr_read > 0) {
read += curr_read;
WriteToReadBuffer(buf, curr_read);
memset(buf, 0, sizeof(buf));
} else if (curr_read == 0) {
// read 返回0 客户端退出
return {read, true};
} else if (curr_read == -1 && errno == EINTR) {
// 被打断,这不算错误
continue;
} else if (curr_read == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
// 所有数据read
break;
} else {
LOG_ERROR("HandleConnection: recv() error");
return {read, true};
}
return {read, false};
}
return {read, false};
}
auto Connection::Send() {
ssize_t curr_write = 0;
ssize_t write;
const size_t to_write = GetWriteBufferSize();
const unsigned char *buf = write_buffer_->Data();
while (curr_write < to_write) {
write = send(GetFd(), buf + curr_write, to_write - curr_write, 0);
if (write < 0) {
// 这三种不是错误
if (errno != EINTR || errno != EAGAIN || errno != EWOULDBLOCK) {
LOG_ERROR("Error in Connection::Send()");
ClearWriteBuffer();
return;
}
write = 0;
}
curr_write += write;
void Connection::Send() {
ssize_t curr_write = 0;
ssize_t write;
const size_t to_write = GetWriteBufferSize();
const unsigned char *buf = write_buffer_->Data();
while (curr_write < to_write) {
write = send(GetFd(), buf + curr_write, to_write - curr_write, 0);
if (write < 0) {
// 这三种不是错误
if (errno != EINTR || errno != EAGAIN || errno != EWOULDBLOCK) {
LOG_ERROR("Error in Connection::Send()");
ClearWriteBuffer();
return;
}
write = 0;
}
ClearWriteBuffer();
curr_write += write;
}
ClearWriteBuffer();
}
void Connection::ClearReadBuffer() noexcept { read_buffer_->Clear(); };
void Connection::ClearWriteBuffer() noexcept { write_buffer_->Clear(); }

void Connection::SetLooper(Looper * looper) noexcept { owner_looper_ = looper; }
void Connection::SetLooper(Looper *looper) noexcept { owner_looper_ = looper; }
auto Connection::GetLooper() noexcept -> Looper * { return owner_looper_; }
}
} // namespace Next
Loading

0 comments on commit 69c66d7

Please sign in to comment.