-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
gxj
committed
Sep 23, 2023
0 parents
commit 1c97041
Showing
25 changed files
with
1,595 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
{ | ||
"files.associations": { | ||
"cctype": "cpp", | ||
"clocale": "cpp", | ||
"cmath": "cpp", | ||
"cstdarg": "cpp", | ||
"cstddef": "cpp", | ||
"cstdio": "cpp", | ||
"cstdlib": "cpp", | ||
"cstring": "cpp", | ||
"ctime": "cpp", | ||
"cwchar": "cpp", | ||
"cwctype": "cpp", | ||
"array": "cpp", | ||
"atomic": "cpp", | ||
"*.tcc": "cpp", | ||
"chrono": "cpp", | ||
"codecvt": "cpp", | ||
"condition_variable": "cpp", | ||
"cstdint": "cpp", | ||
"deque": "cpp", | ||
"list": "cpp", | ||
"unordered_map": "cpp", | ||
"vector": "cpp", | ||
"exception": "cpp", | ||
"algorithm": "cpp", | ||
"filesystem": "cpp", | ||
"functional": "cpp", | ||
"optional": "cpp", | ||
"ratio": "cpp", | ||
"string_view": "cpp", | ||
"system_error": "cpp", | ||
"tuple": "cpp", | ||
"type_traits": "cpp", | ||
"fstream": "cpp", | ||
"future": "cpp", | ||
"initializer_list": "cpp", | ||
"iomanip": "cpp", | ||
"iosfwd": "cpp", | ||
"iostream": "cpp", | ||
"istream": "cpp", | ||
"limits": "cpp", | ||
"memory": "cpp", | ||
"mutex": "cpp", | ||
"new": "cpp", | ||
"ostream": "cpp", | ||
"numeric": "cpp", | ||
"shared_mutex": "cpp", | ||
"sstream": "cpp", | ||
"stdexcept": "cpp", | ||
"streambuf": "cpp", | ||
"thread": "cpp", | ||
"cinttypes": "cpp", | ||
"utility": "cpp", | ||
"typeinfo": "cpp", | ||
"bit": "cpp", | ||
"map": "cpp", | ||
"set": "cpp", | ||
"iterator": "cpp", | ||
"memory_resource": "cpp", | ||
"random": "cpp", | ||
"string": "cpp", | ||
"bitset": "cpp", | ||
"regex": "cpp", | ||
"variant": "cpp" | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
#include "core/acceptor.h" | ||
#include "core/net_addr.h" | ||
#include "core/connection.h" | ||
#include "core/socket.h" | ||
#include "core/poller.h" | ||
#include "log/logger.h" | ||
#include "core/looper.h" | ||
|
||
namespace Next { | ||
|
||
Acceptor::Acceptor(Looper *listener, std::vector<Looper *> reactors, NetAddress server_address) | ||
: reactors_(std::move(reactors)) { | ||
auto acceptor_sock = std::make_unique<Socket>(); | ||
acceptor_sock->Bind(server_address, SocketFlag::set_reusable); | ||
acceptor_sock->Listen(); | ||
acceptor_conn_ = std::make_unique<Connection>(std::move(acceptor_sock)); | ||
acceptor_conn_->SetEvents(POLL_READ); | ||
acceptor_conn_->SetLooper(listener); | ||
listener->AddAcceptor(acceptor_conn_.get()); | ||
SetCustomAcceptCallback([](Connection *) {}); | ||
SetCustomHandleCallback([](Connection *) {}); | ||
} | ||
/** | ||
* accept a connect,and create Connection for client | ||
*/ | ||
void Acceptor::BaseAcceptCallback(Connection *server_conn) { | ||
NetAddress client_address; | ||
int accept_fd = server_conn->GetSocket()->Accept(client_address); | ||
if (accept_fd == -1) { | ||
return; | ||
} | ||
auto client_sock = std::make_unique<Socket>(accept_fd); | ||
client_sock->SetNonBlocking(); | ||
auto client_conn = std::make_unique<Connection>(std::move(client_sock)); | ||
client_conn->SetEvents(POLL_READ | POLL_ET); | ||
client_conn->SetCallback(GetCustomHandleCallback()); | ||
|
||
int idx = rand() % reactors_.size(); | ||
LOG_INFO("new client fd=" + std::to_string(client_conn->GetFd()) + " maps to reactor " + std::to_string(idx)); | ||
client_conn->SetLooper(reactors_[idx]); | ||
reactors_[idx]->AddConnection(std::move(client_conn)); | ||
|
||
} | ||
void Acceptor::BaseHandleCallback(Connection *client_conn) { | ||
int fd = client_conn->GetFd(); | ||
if (client_conn->GetLooper()) { | ||
client_conn->GetLooper()->RefreshConnection(fd); | ||
} | ||
} | ||
void Acceptor::SetCustomAcceptCallback(std::function<void(Connection *)> custom_accept_callback) { | ||
custom_accept_callback = std::move(custom_accept_callback); | ||
acceptor_conn_->SetCallback([this](auto &&PH1) { | ||
BaseAcceptCallback(std::forward<decltype(PH1)>(PH1)); | ||
custom_accept_callback_(std::forward<decltype(PH1)>(PH1)); | ||
}); | ||
} | ||
void Acceptor::SetCustomHandleCallback(std::function<void(Connection *)> custom_handle_callback) { | ||
custom_handle_callback_ = [this, callback = std::move(custom_handle_callback)](auto &&PH1) { | ||
BaseHandleCallback(std::forward<decltype(PH1)>(PH1)); | ||
callback(std::forward<decltype(PH1)>(PH1)); | ||
}; | ||
} | ||
auto Acceptor::GetCustomHandleCallback() const noexcept -> std::function<void(Connection *)> { return custom_accept_callback_; } | ||
auto Acceptor::GetCustomHandleCallback() const noexcept-> std::function<void(Connection *)> { return custom_handle_callback_; } | ||
auto Acceptor::GetAcceptorConnection() noexcept -> Connection * { return acceptor_conn_.get(); } | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
#include "../include/core/buffer.h" | ||
|
||
namespace Next { | ||
|
||
Buffer::Buffer(size_t initial_capacity) { | ||
buf_.reserve(initial_capacity); | ||
} | ||
|
||
void Buffer::Append(const unsigned char *new_char_data, size_t data_size) { | ||
buf_.insert(buf_.end(), new_char_data, new_char_data + data_size); | ||
} | ||
|
||
void Buffer::Append(const std::string &new_string_data) { | ||
Append(reinterpret_cast<const unsigned char *>(new_string_data.c_str()), new_string_data.size()); | ||
} | ||
|
||
void Buffer::Append(std::vector<unsigned char> &&new_buf_data) { | ||
|
||
buf_.insert(buf_.end(), std::make_move_iterator(new_buf_data.begin()), std::make_move_iterator(new_buf_data.end())); | ||
} | ||
|
||
void Buffer::AppendToHead(const unsigned char *new_char_data, size_t data_size) { | ||
buf_.insert(buf_.begin(), new_char_data, new_char_data + data_size); | ||
} | ||
|
||
void Buffer::AppendToHead(const std::string &new_string_data) { | ||
AppendToHead(reinterpret_cast<const unsigned char *>(new_string_data.c_str()), new_string_data.size()); | ||
} | ||
|
||
auto Buffer::FindAndPopTill(const std::string &target) -> std::optional<std::string> { | ||
std::optional<std::string> ret = std::nullopt; | ||
auto curr_content = ToStringView(); | ||
auto pos = curr_content.find(target); | ||
if (pos != std::string::npos) { | ||
ret = curr_content.substr(0, pos + target.size()); | ||
buf_.erase(buf_.begin(), buf_.begin() + pos + target.size()); | ||
} | ||
return ret; | ||
} | ||
|
||
auto Buffer::Size() const noexcept -> size_t { return buf_.size(); } | ||
|
||
auto Buffer::Capacity() const noexcept -> size_t { return buf_.capacity(); }; | ||
|
||
auto Buffer::Data() noexcept -> const unsigned char * { return buf_.data(); }; | ||
|
||
auto Buffer::ToStringView() const noexcept -> std::string_view { | ||
// string_view只是一个字符串的视图,构造函数可以避免拷贝,做到O(1)复杂度 | ||
//std::string_view类的成员变量只包含两个:字符串指针和字符串长度。 | ||
return {reinterpret_cast<const char *>(buf_.data()), buf_.size() }; | ||
} | ||
|
||
void Buffer::Clear() noexcept { buf_.clear(); } | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
#include "core/connection.h" | ||
#include <socket.h> | ||
#include "log/logger.h" | ||
namespace Next { | ||
Connection::Connection(std::unique_ptr<Socket> socket) | ||
: socket_(std::move(socket)), read_buffer_(std::make_unique<Buffer>()), write_buffer_(std::make_unique<Buffer>()) {} | ||
|
||
auto Connection::GetFd() const noexcept -> int { return socket_->GetFd(); } | ||
auto Connection::GetSocket() noexcept -> Socket * { return socket_.get(); } | ||
|
||
void Connection::SetEvents(uint32_t events) { events_ = events; } | ||
auto Connection::GetEvents() const noexcept -> uint32_t { return events_; } | ||
void Connection::SetRevents(uint32_t revents) { revents_ = revents; } | ||
auto Connection::GetRevents() const noexcept -> uint32_t { return revents_; } | ||
|
||
void Connection::SetCallback(std::function<void(Connection *)> callback) { | ||
// 值的方式捕获callback,捕获this是lambda像成员函数一样。 | ||
// callback(this) 为可调用对象。将成员callback_设置lambda,在lam中调用callback(参数为this) | ||
callback_ = [callback, this]() { return callback(this); }; | ||
} | ||
auto Connection::GetCallback() noexcept -> std::function<void()> { return callback_; } | ||
|
||
/* for Buffer */ | ||
auto Connection::FindAndPopTill(const std::string &target) -> std::optional<std::string> { | ||
return read_buffer_->FindAndPopTill(target); | ||
} | ||
|
||
auto Connection::GetReadBufferSize() const noexcept -> size_t { return read_buffer_->Size(); } | ||
auto Connection::GetWriteBufferSize() const noexcept -> size_t { return write_buffer_->Size(); } | ||
void Connection::WriteToReadBUffer(const unsigned char *buf, size_t size) { read_buffer_->Append(buf, size); } | ||
void Connection::WriteToWrietBUffer(const unsigned char *buf, size_t size) { write_buffer_->Append(buf, size); } | ||
void Connection::WriteToReadBUffer(const std::string &str) { read_buffer_->Append(str); } | ||
void Connection::WriteToWrietBUffer(const std::string &str) { write_buffer_->Append(str); } | ||
void Connection::WriteToWrietBUffer(const std::vector<unsigned char> &&other_buf) { | ||
write_buffer_->Append(std::move(other_buf)); | ||
} | ||
|
||
auto Connection::Read() const noexcept -> const unsigned char * { return read_buffer_->Data(); } | ||
auto Connection::ReadAsString() const noexcept -> std::string { | ||
auto string_view = read_buffer_->ToStringView(); | ||
return {string_view.begin(), string_view.end()}; | ||
} | ||
|
||
/* return std::pair<How many bytes read, whether the client exits> */ | ||
auto Connection::Recv() -> std::pair<ssize_t, bool> { | ||
int from_fd = GetFd(); | ||
ssize_t read = 0, curr_read = 0; | ||
unsigned char buf[TEMP_BUF_SIZE]; | ||
memset(buf, 0, sizeof(buf)); | ||
while(true) { | ||
curr_read = recv(from_fd, buf, TEMP_BUF_SIZE, 0); | ||
if (curr_read > 0) { | ||
read += curr_read; | ||
WriteToReadBuffer(buf, curr_read); | ||
memset(buf, 0, sizeof(buf)); | ||
} else if (curr_read == 0) { | ||
// read 返回0 客户端退出 | ||
return {read, true}; | ||
} else if (curr_read == -1 && errno == EINTR) { | ||
// 被打断,这不算错误 | ||
continue; | ||
} else if(curr_read == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) { | ||
// 所有数据read | ||
break; | ||
} else { | ||
LOG_ERROR("HandleConnection: recv() error"); | ||
return {read, true}; | ||
} | ||
} | ||
return {read, false}; | ||
} | ||
auto Connection::Send() { | ||
ssize_t curr_write = 0; | ||
ssize_t write; | ||
const size_t to_write = GetWriteBufferSize(); | ||
const unsigned char *buf = write_buffer_->Data(); | ||
while (curr_write < to_write) { | ||
write = send(GetFd(), buf + curr_write, to_write - curr_write, 0); | ||
if (write < 0) { | ||
// 这三种不是错误 | ||
if (errno != EINTR || errno != EAGAIN || errno != EWOULDBLOCK) { | ||
LOG_ERROR("Error in Connection::Send()"); | ||
ClearWriteBuffer(); | ||
return; | ||
} | ||
write = 0; | ||
} | ||
curr_write += write; | ||
} | ||
ClearWriteBuffer(); | ||
} | ||
void Connection::ClearReadBuffer() noexcept { read_buffer_->Clear(); }; | ||
void Connection::ClearWriteBuffer() noexcept { write_buffer_->Clear(); } | ||
|
||
void Connection::SetLooper(Looper * looper) noexcept { owner_looper_ = looper; } | ||
auto Connection::GetLooper() noexcept -> Looper * { return owner_looper_; } | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
#include "core/looper.h" | ||
|
||
#include "core/acceptor.h" | ||
#include "core/connection.h" | ||
#include "core/poller.h" | ||
#include "core/thread_pool.h" | ||
#include "log/logger.h" | ||
namespace Next { | ||
|
||
Looper::Looper(uint64_t timer_expiration = 0) | ||
: poller_(std::make_unique<Poller>()), use_timer_(timer_expiration != 0), timer_expiration_(timer_expiration) { | ||
if (use_timer_) { | ||
poller_->AddConnection(timer_.GetTimerConnection()); | ||
} | ||
} | ||
|
||
// 通过poller_->Poll获取epoll中就绪的事件对应的connection,然后执行他们的回调conn->GetCallback()(); | ||
void Looper::Loop() { | ||
while(!exit_) { | ||
auto ready_connections = poller_->Poll(); | ||
Connection *timer_conn = nullptr; | ||
|
||
for (auto &conn : ready_connections) { | ||
if (conn = timer_.GetTimerConnection()) { | ||
timer_conn = conn; | ||
continue; | ||
} | ||
conn->GetCallback()(); | ||
} | ||
|
||
if (timer_conn != nullptr) { | ||
timer_conn->GetCallback()(); | ||
} | ||
} | ||
|
||
} | ||
|
||
void Looper::AddAcceptor(Connection *acceptor_conn) { | ||
std::unique_lock<std::mutex> lock(mtx_); | ||
poller_->AddConnection(acceptor_conn); | ||
} | ||
|
||
void Looper::AddConnection(std::unique_ptr<Connection> new_conn) { | ||
std::unique_lock<std::mutex> lock(mtx_); | ||
poller_->AddConnection(new_conn.get()); | ||
inf fd = new_conn->GetFd(); | ||
connections_.insert({fd,std::move(new_conn)}); | ||
if (use_timer_) { | ||
auto singletimer = timer_.AddSingleTimer(timer_expiration_, [this, fd = fd]() { | ||
LOG_INFO("client fd =" + std::to_string(fd) + "has expired and will be kicked out"); | ||
DeleteConnection(fd); | ||
}) | ||
} | ||
timers_mapping_.insert({fd, singletimer}); | ||
} | ||
|
||
auto Looper::RefreshConnection(int fd) noexcept -> bool { | ||
if (!use_timer_) { | ||
return false; | ||
} | ||
std::unique_lock<std::mutex> lock(mtx_); | ||
auto it = timers_mapping_.find(fd); | ||
if (use_timer_ && it != timers_mapping_.end()) { | ||
auto new_timer = timer_.RefreshSingleTimer(it->second, timer_expiration_); | ||
if (new_timer != nullptr) { | ||
timers_mapping_.insert({fd,new_timer}); | ||
} | ||
return true; | ||
} | ||
return false; | ||
} | ||
|
||
auto Looper::DeleteConnection(int fd) noexcept -> bool { | ||
std::unique_lock<std::mutex> lock(mtx_); | ||
auto it = connections_.find(fd); | ||
if (it == connections_.end()) { | ||
return false; | ||
} | ||
connections_.erase(it); | ||
// 再删除这个连接对应的timer | ||
if (use_timer_) { | ||
auto timer_it = timers_mapping_.find(fd); | ||
if (timer_it != timers_mapping_.end()) { | ||
timer_.RemoveSingleTimer(it->second); | ||
timers_mapping_.erase(timer_it); | ||
} else { | ||
LOG_ERROR("Looper: DeleteConnection() the fd " + std::to_string(fd) + " not in timers_mapping_"); | ||
} | ||
} | ||
return true; | ||
} | ||
|
||
void Looper::Exit() noexcept { exit_ = true; } | ||
} |
Oops, something went wrong.