Skip to content

Commit

Permalink
first add all src file
Browse files Browse the repository at this point in the history
  • Loading branch information
gxj committed Sep 23, 2023
0 parents commit 1c97041
Show file tree
Hide file tree
Showing 25 changed files with 1,595 additions and 0 deletions.
67 changes: 67 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
{
"files.associations": {
"cctype": "cpp",
"clocale": "cpp",
"cmath": "cpp",
"cstdarg": "cpp",
"cstddef": "cpp",
"cstdio": "cpp",
"cstdlib": "cpp",
"cstring": "cpp",
"ctime": "cpp",
"cwchar": "cpp",
"cwctype": "cpp",
"array": "cpp",
"atomic": "cpp",
"*.tcc": "cpp",
"chrono": "cpp",
"codecvt": "cpp",
"condition_variable": "cpp",
"cstdint": "cpp",
"deque": "cpp",
"list": "cpp",
"unordered_map": "cpp",
"vector": "cpp",
"exception": "cpp",
"algorithm": "cpp",
"filesystem": "cpp",
"functional": "cpp",
"optional": "cpp",
"ratio": "cpp",
"string_view": "cpp",
"system_error": "cpp",
"tuple": "cpp",
"type_traits": "cpp",
"fstream": "cpp",
"future": "cpp",
"initializer_list": "cpp",
"iomanip": "cpp",
"iosfwd": "cpp",
"iostream": "cpp",
"istream": "cpp",
"limits": "cpp",
"memory": "cpp",
"mutex": "cpp",
"new": "cpp",
"ostream": "cpp",
"numeric": "cpp",
"shared_mutex": "cpp",
"sstream": "cpp",
"stdexcept": "cpp",
"streambuf": "cpp",
"thread": "cpp",
"cinttypes": "cpp",
"utility": "cpp",
"typeinfo": "cpp",
"bit": "cpp",
"map": "cpp",
"set": "cpp",
"iterator": "cpp",
"memory_resource": "cpp",
"random": "cpp",
"string": "cpp",
"bitset": "cpp",
"regex": "cpp",
"variant": "cpp"
}
}
66 changes: 66 additions & 0 deletions src/core/acceptor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#include "core/acceptor.h"
#include "core/net_addr.h"
#include "core/connection.h"
#include "core/socket.h"
#include "core/poller.h"
#include "log/logger.h"
#include "core/looper.h"

namespace Next {

Acceptor::Acceptor(Looper *listener, std::vector<Looper *> reactors, NetAddress server_address)
: reactors_(std::move(reactors)) {
auto acceptor_sock = std::make_unique<Socket>();
acceptor_sock->Bind(server_address, SocketFlag::set_reusable);
acceptor_sock->Listen();
acceptor_conn_ = std::make_unique<Connection>(std::move(acceptor_sock));
acceptor_conn_->SetEvents(POLL_READ);
acceptor_conn_->SetLooper(listener);
listener->AddAcceptor(acceptor_conn_.get());
SetCustomAcceptCallback([](Connection *) {});
SetCustomHandleCallback([](Connection *) {});
}
/**
* accept a connect,and create Connection for client
*/
void Acceptor::BaseAcceptCallback(Connection *server_conn) {
NetAddress client_address;
int accept_fd = server_conn->GetSocket()->Accept(client_address);
if (accept_fd == -1) {
return;
}
auto client_sock = std::make_unique<Socket>(accept_fd);
client_sock->SetNonBlocking();
auto client_conn = std::make_unique<Connection>(std::move(client_sock));
client_conn->SetEvents(POLL_READ | POLL_ET);
client_conn->SetCallback(GetCustomHandleCallback());

int idx = rand() % reactors_.size();
LOG_INFO("new client fd=" + std::to_string(client_conn->GetFd()) + " maps to reactor " + std::to_string(idx));
client_conn->SetLooper(reactors_[idx]);
reactors_[idx]->AddConnection(std::move(client_conn));

}
void Acceptor::BaseHandleCallback(Connection *client_conn) {
int fd = client_conn->GetFd();
if (client_conn->GetLooper()) {
client_conn->GetLooper()->RefreshConnection(fd);
}
}
void Acceptor::SetCustomAcceptCallback(std::function<void(Connection *)> custom_accept_callback) {
custom_accept_callback = std::move(custom_accept_callback);
acceptor_conn_->SetCallback([this](auto &&PH1) {
BaseAcceptCallback(std::forward<decltype(PH1)>(PH1));
custom_accept_callback_(std::forward<decltype(PH1)>(PH1));
});
}
void Acceptor::SetCustomHandleCallback(std::function<void(Connection *)> custom_handle_callback) {
custom_handle_callback_ = [this, callback = std::move(custom_handle_callback)](auto &&PH1) {
BaseHandleCallback(std::forward<decltype(PH1)>(PH1));
callback(std::forward<decltype(PH1)>(PH1));
};
}
auto Acceptor::GetCustomHandleCallback() const noexcept -> std::function<void(Connection *)> { return custom_accept_callback_; }
auto Acceptor::GetCustomHandleCallback() const noexcept-> std::function<void(Connection *)> { return custom_handle_callback_; }
auto Acceptor::GetAcceptorConnection() noexcept -> Connection * { return acceptor_conn_.get(); }
}
55 changes: 55 additions & 0 deletions src/core/buffer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#include "../include/core/buffer.h"

namespace Next {

Buffer::Buffer(size_t initial_capacity) {
buf_.reserve(initial_capacity);
}

void Buffer::Append(const unsigned char *new_char_data, size_t data_size) {
buf_.insert(buf_.end(), new_char_data, new_char_data + data_size);
}

void Buffer::Append(const std::string &new_string_data) {
Append(reinterpret_cast<const unsigned char *>(new_string_data.c_str()), new_string_data.size());
}

void Buffer::Append(std::vector<unsigned char> &&new_buf_data) {

buf_.insert(buf_.end(), std::make_move_iterator(new_buf_data.begin()), std::make_move_iterator(new_buf_data.end()));
}

void Buffer::AppendToHead(const unsigned char *new_char_data, size_t data_size) {
buf_.insert(buf_.begin(), new_char_data, new_char_data + data_size);
}

void Buffer::AppendToHead(const std::string &new_string_data) {
AppendToHead(reinterpret_cast<const unsigned char *>(new_string_data.c_str()), new_string_data.size());
}

auto Buffer::FindAndPopTill(const std::string &target) -> std::optional<std::string> {
std::optional<std::string> ret = std::nullopt;
auto curr_content = ToStringView();
auto pos = curr_content.find(target);
if (pos != std::string::npos) {
ret = curr_content.substr(0, pos + target.size());
buf_.erase(buf_.begin(), buf_.begin() + pos + target.size());
}
return ret;
}

auto Buffer::Size() const noexcept -> size_t { return buf_.size(); }

auto Buffer::Capacity() const noexcept -> size_t { return buf_.capacity(); };

auto Buffer::Data() noexcept -> const unsigned char * { return buf_.data(); };

auto Buffer::ToStringView() const noexcept -> std::string_view {
// string_view只是一个字符串的视图,构造函数可以避免拷贝,做到O(1)复杂度
//std::string_view类的成员变量只包含两个:字符串指针和字符串长度。
return {reinterpret_cast<const char *>(buf_.data()), buf_.size() };
}

void Buffer::Clear() noexcept { buf_.clear(); }

}
97 changes: 97 additions & 0 deletions src/core/connection.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
#include "core/connection.h"
#include <socket.h>
#include "log/logger.h"
namespace Next {
Connection::Connection(std::unique_ptr<Socket> socket)
: socket_(std::move(socket)), read_buffer_(std::make_unique<Buffer>()), write_buffer_(std::make_unique<Buffer>()) {}

auto Connection::GetFd() const noexcept -> int { return socket_->GetFd(); }
auto Connection::GetSocket() noexcept -> Socket * { return socket_.get(); }

void Connection::SetEvents(uint32_t events) { events_ = events; }
auto Connection::GetEvents() const noexcept -> uint32_t { return events_; }
void Connection::SetRevents(uint32_t revents) { revents_ = revents; }
auto Connection::GetRevents() const noexcept -> uint32_t { return revents_; }

void Connection::SetCallback(std::function<void(Connection *)> callback) {
// 值的方式捕获callback,捕获this是lambda像成员函数一样。
// callback(this) 为可调用对象。将成员callback_设置lambda,在lam中调用callback(参数为this)
callback_ = [callback, this]() { return callback(this); };
}
auto Connection::GetCallback() noexcept -> std::function<void()> { return callback_; }

/* for Buffer */
auto Connection::FindAndPopTill(const std::string &target) -> std::optional<std::string> {
return read_buffer_->FindAndPopTill(target);
}

auto Connection::GetReadBufferSize() const noexcept -> size_t { return read_buffer_->Size(); }
auto Connection::GetWriteBufferSize() const noexcept -> size_t { return write_buffer_->Size(); }
void Connection::WriteToReadBUffer(const unsigned char *buf, size_t size) { read_buffer_->Append(buf, size); }
void Connection::WriteToWrietBUffer(const unsigned char *buf, size_t size) { write_buffer_->Append(buf, size); }
void Connection::WriteToReadBUffer(const std::string &str) { read_buffer_->Append(str); }
void Connection::WriteToWrietBUffer(const std::string &str) { write_buffer_->Append(str); }
void Connection::WriteToWrietBUffer(const std::vector<unsigned char> &&other_buf) {
write_buffer_->Append(std::move(other_buf));
}

auto Connection::Read() const noexcept -> const unsigned char * { return read_buffer_->Data(); }
auto Connection::ReadAsString() const noexcept -> std::string {
auto string_view = read_buffer_->ToStringView();
return {string_view.begin(), string_view.end()};
}

/* return std::pair<How many bytes read, whether the client exits> */
auto Connection::Recv() -> std::pair<ssize_t, bool> {
int from_fd = GetFd();
ssize_t read = 0, curr_read = 0;
unsigned char buf[TEMP_BUF_SIZE];
memset(buf, 0, sizeof(buf));
while(true) {
curr_read = recv(from_fd, buf, TEMP_BUF_SIZE, 0);
if (curr_read > 0) {
read += curr_read;
WriteToReadBuffer(buf, curr_read);
memset(buf, 0, sizeof(buf));
} else if (curr_read == 0) {
// read 返回0 客户端退出
return {read, true};
} else if (curr_read == -1 && errno == EINTR) {
// 被打断,这不算错误
continue;
} else if(curr_read == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
// 所有数据read
break;
} else {
LOG_ERROR("HandleConnection: recv() error");
return {read, true};
}
}
return {read, false};
}
auto Connection::Send() {
ssize_t curr_write = 0;
ssize_t write;
const size_t to_write = GetWriteBufferSize();
const unsigned char *buf = write_buffer_->Data();
while (curr_write < to_write) {
write = send(GetFd(), buf + curr_write, to_write - curr_write, 0);
if (write < 0) {
// 这三种不是错误
if (errno != EINTR || errno != EAGAIN || errno != EWOULDBLOCK) {
LOG_ERROR("Error in Connection::Send()");
ClearWriteBuffer();
return;
}
write = 0;
}
curr_write += write;
}
ClearWriteBuffer();
}
void Connection::ClearReadBuffer() noexcept { read_buffer_->Clear(); };
void Connection::ClearWriteBuffer() noexcept { write_buffer_->Clear(); }

void Connection::SetLooper(Looper * looper) noexcept { owner_looper_ = looper; }
auto Connection::GetLooper() noexcept -> Looper * { return owner_looper_; }
}
94 changes: 94 additions & 0 deletions src/core/looper.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
#include "core/looper.h"

#include "core/acceptor.h"
#include "core/connection.h"
#include "core/poller.h"
#include "core/thread_pool.h"
#include "log/logger.h"
namespace Next {

Looper::Looper(uint64_t timer_expiration = 0)
: poller_(std::make_unique<Poller>()), use_timer_(timer_expiration != 0), timer_expiration_(timer_expiration) {
if (use_timer_) {
poller_->AddConnection(timer_.GetTimerConnection());
}
}

// 通过poller_->Poll获取epoll中就绪的事件对应的connection,然后执行他们的回调conn->GetCallback()();
void Looper::Loop() {
while(!exit_) {
auto ready_connections = poller_->Poll();
Connection *timer_conn = nullptr;

for (auto &conn : ready_connections) {
if (conn = timer_.GetTimerConnection()) {
timer_conn = conn;
continue;
}
conn->GetCallback()();
}

if (timer_conn != nullptr) {
timer_conn->GetCallback()();
}
}

}

void Looper::AddAcceptor(Connection *acceptor_conn) {
std::unique_lock<std::mutex> lock(mtx_);
poller_->AddConnection(acceptor_conn);
}

void Looper::AddConnection(std::unique_ptr<Connection> new_conn) {
std::unique_lock<std::mutex> lock(mtx_);
poller_->AddConnection(new_conn.get());
inf fd = new_conn->GetFd();
connections_.insert({fd,std::move(new_conn)});
if (use_timer_) {
auto singletimer = timer_.AddSingleTimer(timer_expiration_, [this, fd = fd]() {
LOG_INFO("client fd =" + std::to_string(fd) + "has expired and will be kicked out");
DeleteConnection(fd);
})
}
timers_mapping_.insert({fd, singletimer});
}

auto Looper::RefreshConnection(int fd) noexcept -> bool {
if (!use_timer_) {
return false;
}
std::unique_lock<std::mutex> lock(mtx_);
auto it = timers_mapping_.find(fd);
if (use_timer_ && it != timers_mapping_.end()) {
auto new_timer = timer_.RefreshSingleTimer(it->second, timer_expiration_);
if (new_timer != nullptr) {
timers_mapping_.insert({fd,new_timer});
}
return true;
}
return false;
}

auto Looper::DeleteConnection(int fd) noexcept -> bool {
std::unique_lock<std::mutex> lock(mtx_);
auto it = connections_.find(fd);
if (it == connections_.end()) {
return false;
}
connections_.erase(it);
// 再删除这个连接对应的timer
if (use_timer_) {
auto timer_it = timers_mapping_.find(fd);
if (timer_it != timers_mapping_.end()) {
timer_.RemoveSingleTimer(it->second);
timers_mapping_.erase(timer_it);
} else {
LOG_ERROR("Looper: DeleteConnection() the fd " + std::to_string(fd) + " not in timers_mapping_");
}
}
return true;
}

void Looper::Exit() noexcept { exit_ = true; }
}
Loading

0 comments on commit 1c97041

Please sign in to comment.