Skip to content

Commit 024ce65

Browse files
committed
proxy protocol v2
1 parent 63f81b1 commit 024ce65

10 files changed

Lines changed: 341 additions & 13 deletions

File tree

phxrpc_package_config/tools/etc_template/phxsqlproxy.conf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ SlaveForkProcCnt=1
66
SlaveWorkerThread=30
77
SlaveIORoutineCnt=100
88
MasterEnableReadPort=1
9+
ProxyProtocol=2
910

1011
IP = $InnerIP
1112
Port = 54321

phxsqlproxy/io_routine.cpp

Lines changed: 208 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,9 @@ void IORoutine::ClearVariablesAndStatus() {
129129
connect_dest_ = "";
130130
connect_port_ = 0;
131131
client_ip_ = "";
132+
client_port_ = 0;
133+
server_ip_ = "";
134+
server_port_ = 0;
132135
req_uniq_id_ = 0;
133136
last_sent_request_timestamp_ = 0;
134137
last_received_request_timestamp_ = 0;
@@ -233,7 +236,7 @@ void IORoutine::SetClientFD(int fd) {
233236
ClearAll();
234237
req_uniq_id_ = ((uint64_t)(random_engine()) << 32) | random_engine();
235238
client_fd_ = fd;
236-
int ret1 = GetSockName(fd, listen_ip_, listen_port_);
239+
int ret1 = GetSockName(fd, server_ip_, server_port_);
237240
int ret2 = GetPeerName(fd, client_ip_, client_port_);
238241
if (ret1 == 0 && ret2 == 0) {
239242
LogVerbose("requniqid %llu receive connect from [%s:%d]",
@@ -349,20 +352,37 @@ int IORoutine::run() {
349352
while (true) {
350353
//first connect to master mysql to finish auth
351354
if (sqlsvr_fd_ == -1) {
355+
SetNoDelay(client_fd_);
356+
if (server_port_ == GetWorkerConfig()->proxy_port_) {
357+
if (!GetGroupStatusCache()->IsMember(client_ip_)) {
358+
break;
359+
}
360+
361+
int ret = ProcessProxyHeader(client_fd_);
362+
if (ret < 0) {
363+
LogError("process proxy header err: %d", ret);
364+
break;
365+
}
366+
LogVerbose("process proxy header succ: %s:%d %s:%d",
367+
client_ip_.c_str(), client_port_, server_ip_.c_str(), server_port_);
368+
}
369+
352370
int fd = ConnectDest();
353371
if (fd <= 0) {
354372
break;
355373
}
356374
sqlsvr_fd_ = fd;
357-
SetNoDelay(client_fd_);
358375
SetNoDelay(sqlsvr_fd_);
359376

360-
if (listen_port_ == GetWorkerConfig()->port_) {
361-
string proxy_line = "PROXY TCP4 " + client_ip_ + " " + listen_ip_ + " " +
362-
UIntToStr(client_port_) + " " + UIntToStr(listen_port_) + "\r\n";
363-
int ret = WriteToDest(sqlsvr_fd_, proxy_line.c_str(), proxy_line.size());
377+
if (config_->ProxyProtocol()) {
378+
string proxy_header;
379+
int ret = MakeProxyHeader(proxy_header);
380+
if (ret < 0) {
381+
LogError("make proxy header err: %d", ret);
382+
break;
383+
}
384+
ret = WriteToDest(sqlsvr_fd_, proxy_header.c_str(), proxy_header.size());
364385
if (ret < 0) {
365-
LogError("send proxy line error: %s", proxy_line.c_str());
366386
break;
367387
}
368388
}
@@ -416,7 +436,7 @@ int IORoutine::run() {
416436

417437
if (byte_size_tot == 0) {
418438
// MasterEnableReadPort=0
419-
if (connect_port_ == GetWorkerConfig()->proxy_port_ && !GetWorkerConfig()->is_master_port_) {
439+
if (connect_port_ != config_->GetMysqlPort() && !GetWorkerConfig()->is_master_port_) {
420440
LogVerbose("%s:%d requniqid %llu mark %s failure", __func__, __LINE__, req_uniq_id_,
421441
connect_dest_.c_str());
422442
GetGroupStatusCache()->MarkFailure(connect_dest_);
@@ -503,7 +523,7 @@ int MasterIORoutine::GetDestEndpoint(std::string & dest_ip, int & dest_port) {
503523
dest_port = config_->GetMysqlPort();
504524
} else {
505525
dest_ip = master_ip;
506-
dest_port = GetWorkerConfig()->proxy_port_;
526+
dest_port = config_->ProxyProtocol() ? GetWorkerConfig()->proxy_port_ : GetWorkerConfig()->port_;
507527
}
508528

509529
LogVerbose("%s:%d requniqid %llu ret ip [%s] port [%d]", __func__, __LINE__, req_uniq_id_, dest_ip.c_str(),
@@ -575,7 +595,7 @@ int SlaveIORoutine::GetDestEndpoint(std::string & dest_ip, int & dest_port) {
575595
req_uniq_id_, master_ip.c_str());
576596
return -__LINE__;
577597
}
578-
dest_port = GetWorkerConfig()->proxy_port_;
598+
dest_port = config_->ProxyProtocol() ? GetWorkerConfig()->proxy_port_ : GetWorkerConfig()->port_;
579599
} else {
580600
dest_ip = "127.0.0.1";
581601
dest_port = config_->GetMysqlPort();
@@ -622,4 +642,182 @@ WorkerConfig_t * SlaveIORoutine::GetWorkerConfig() {
622642
return config_->GetSlaveWorkerConfig();
623643
}
624644

645+
int IORoutine::ProcessProxyHeader(int fd) {
646+
union {
647+
ProxyHdrV1_t v1;
648+
ProxyHdrV2_t v2;
649+
} hdr;
650+
651+
int ret = RoutinePeekWithTimeout(fd, (char *)&hdr, sizeof(hdr), config_->ProxyProtocolTimeoutMs());
652+
if (ret < 0) {
653+
return ret;
654+
}
655+
656+
if (ret >= 16 && memcmp(&hdr.v2, PP2_SIGNATURE, 12) == 0 && (hdr.v2.ver_cmd & 0xF0) == PP2_VERSION) {
657+
ret = ProcessProxyHeaderV2(hdr.v2, ret);
658+
} else if (ret >= 8 && memcmp(hdr.v1.line, "PROXY ", 6) == 0) {
659+
ret = ProcessProxyHeaderV1(hdr.v1.line, ret);
660+
} else {
661+
return -__LINE__;
662+
}
663+
664+
if (ret > 0) {
665+
ret = RoutineReadWithTimeout(fd, (char *)&hdr, ret, config_->ProxyProtocolTimeoutMs());
666+
}
667+
return ret;
668+
}
669+
670+
int IORoutine::ProcessProxyHeaderV1(char * hdr, int read_count) {
671+
char * end = (char *)memchr(hdr, '\r', read_count - 1);
672+
int size = end - hdr + 2;
673+
if (!end || *(end + 1) != '\n') {
674+
return -__LINE__;
675+
}
676+
677+
vector<string> tokens = SplitStr(string(hdr, end), " ");
678+
if (tokens[1] == "UNKNOWN") {
679+
return size;
680+
}
681+
if (tokens.size() != 6) {
682+
return -__LINE__;
683+
}
684+
685+
const string & src_ip = tokens[2];
686+
const string & dst_ip = tokens[3];
687+
if (tokens[1] == "TCP4") {
688+
struct in_addr addr;
689+
if (inet_pton(AF_INET, src_ip.c_str(), &addr) <= 0) {
690+
return -__LINE__;
691+
}
692+
if (inet_pton(AF_INET, dst_ip.c_str(), &addr) <= 0) {
693+
return -__LINE__;
694+
}
695+
} else if (tokens[1] == "TCP6") {
696+
struct in6_addr addr;
697+
if (inet_pton(AF_INET6, src_ip.c_str(), &addr) <= 0) {
698+
return -__LINE__;
699+
}
700+
if (inet_pton(AF_INET6, dst_ip.c_str(), &addr) <= 0) {
701+
return -__LINE__;
702+
}
703+
}
704+
705+
int src_port = atoi(tokens[4].c_str());
706+
int dst_port = atoi(tokens[5].c_str());
707+
if (src_port < 0 || src_port > 65535) {
708+
return -__LINE__;
709+
}
710+
if (dst_port < 0 || dst_port > 65535) {
711+
return -__LINE__;
712+
}
713+
714+
client_ip_ = src_ip;
715+
server_ip_ = dst_ip;
716+
client_port_ = src_port;
717+
server_port_ = dst_port;
718+
719+
return size;
720+
}
721+
722+
int IORoutine::ProcessProxyHeaderV2(const ProxyHdrV2_t & hdr, int read_count) {
723+
int size = 16 + ntohs(hdr.len);
724+
if (read_count < size) { // truncated or too large header
725+
return -__LINE__;
726+
}
727+
char src_ip[INET6_ADDRSTRLEN] = { 0 };
728+
char dst_ip[INET6_ADDRSTRLEN] = { 0 };
729+
switch (hdr.ver_cmd & 0xF) {
730+
case PP2_CMD_PROXY:
731+
switch (hdr.fam) {
732+
case PP2_FAM_TCP4:
733+
if (inet_ntop(AF_INET, &hdr.addr.ip4.src_addr, src_ip, sizeof(src_ip)) == NULL) {
734+
return -__LINE__;
735+
}
736+
if (inet_ntop(AF_INET, &hdr.addr.ip4.dst_addr, dst_ip, sizeof(dst_ip)) == NULL) {
737+
return -__LINE__;
738+
}
739+
client_ip_ = string(src_ip);
740+
server_ip_ = string(dst_ip);
741+
client_port_ = ntohs(hdr.addr.ip4.src_port);
742+
server_port_ = ntohs(hdr.addr.ip4.dst_port);
743+
return size;
744+
case PP2_FAM_TCP6:
745+
if (inet_ntop(AF_INET6, &hdr.addr.ip6.src_addr, src_ip, sizeof(src_ip)) == NULL) {
746+
return -__LINE__;
747+
}
748+
if (inet_ntop(AF_INET6, &hdr.addr.ip6.dst_addr, dst_ip, sizeof(dst_ip)) == NULL) {
749+
return -__LINE__;
750+
}
751+
client_ip_ = string(src_ip);
752+
server_ip_ = string(dst_ip);
753+
client_port_ = ntohs(hdr.addr.ip6.src_port);
754+
server_port_ = ntohs(hdr.addr.ip6.dst_port);
755+
return size;
756+
case PP2_FAM_UNSPEC:
757+
return size; // unknown protocol, keep local connection address
758+
default:
759+
return -__LINE__; // unsupport protocol
760+
}
761+
case PP2_CMD_LOCAL:
762+
return size; // keep local connection address for LOCAL
763+
default:
764+
return -__LINE__; // unsupport command
765+
}
766+
return -__LINE__;
767+
}
768+
769+
int IORoutine::MakeProxyHeader(string & header) {
770+
switch (config_->ProxyProtocol()) {
771+
case 1:
772+
return MakeProxyHeaderV1(header);
773+
case 2:
774+
return MakeProxyHeaderV2(header);
775+
}
776+
return -__LINE__;
777+
}
778+
779+
int IORoutine::MakeProxyHeaderV1(string & header) {
780+
header = "PROXY ";
781+
struct in_addr addr;
782+
struct in6_addr addr6;
783+
if (inet_pton(AF_INET, client_ip_.c_str(), &addr) == 1 &&
784+
inet_pton(AF_INET, server_ip_.c_str(), &addr) == 1) {
785+
header += "TCP4 ";
786+
} else if (inet_pton(AF_INET6, client_ip_.c_str(), &addr6) == 1 &&
787+
inet_pton(AF_INET6, server_ip_.c_str(), &addr6) == 1) {
788+
header += "TCP6 ";
789+
} else {
790+
return -__LINE__;
791+
}
792+
header += client_ip_ + " " + server_ip_ + " " + UIntToStr(client_port_) + " " + UIntToStr(server_port_) + "\r\n";
793+
return 0;
794+
}
795+
796+
int IORoutine::MakeProxyHeaderV2(string & header) {
797+
ProxyHdrV2_t hdr;
798+
memcpy(hdr.sig, PP2_SIGNATURE, sizeof(hdr.sig));
799+
hdr.ver_cmd = PP2_VERSION | PP2_CMD_PROXY;
800+
if (inet_pton(AF_INET, client_ip_.c_str(), &hdr.addr.ip4.src_addr) == 1 &&
801+
inet_pton(AF_INET, server_ip_.c_str(), &hdr.addr.ip4.dst_addr) == 1)
802+
{
803+
hdr.addr.ip4.src_port = htons(client_port_);
804+
hdr.addr.ip4.dst_port = htons(server_port_);
805+
hdr.fam = PP2_FAM_TCP4;
806+
hdr.len = htons(sizeof(hdr.addr.ip4));
807+
header = string((char *)&hdr, 16 + sizeof(hdr.addr.ip4));
808+
return 0;
809+
}
810+
if (inet_pton(AF_INET6, client_ip_.c_str(), &hdr.addr.ip6.src_addr) == 1 &&
811+
inet_pton(AF_INET6, server_ip_.c_str(), &hdr.addr.ip6.dst_addr) == 1)
812+
{
813+
hdr.addr.ip6.src_port = htons(client_port_);
814+
hdr.addr.ip6.dst_port = htons(server_port_);
815+
hdr.fam = PP2_FAM_TCP6;
816+
hdr.len = htons(sizeof(hdr.addr.ip6));
817+
header = string((char *)&hdr, 16 + sizeof(hdr.addr.ip6));
818+
return 0;
819+
}
820+
return -__LINE__;
821+
}
822+
625823
}

phxsqlproxy/io_routine.h

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
#include "co_routine.h"
1414
#include "phxcoroutine.h"
15+
#include "proxy_protocol.h"
1516
#include <string>
1617
#include <stack>
1718

@@ -72,6 +73,15 @@ class IORoutine : public Coroutine {
7273
void GetDBNameFromAuthBuf(const char * buf, int buf_size);
7374

7475
void GetDBNameFromReqBuf(const char * buf, int buf_size);
76+
77+
private:
78+
int ProcessProxyHeader(int fd);
79+
int ProcessProxyHeaderV1(char * hdr, int read_count);
80+
int ProcessProxyHeaderV2(const ProxyHdrV2_t & hdr, int read_count);
81+
int MakeProxyHeader(std::string & header);
82+
int MakeProxyHeaderV1(std::string & header);
83+
int MakeProxyHeaderV2(std::string & header);
84+
7585
protected:
7686
uint64_t req_uniq_id_;
7787

@@ -88,10 +98,10 @@ class IORoutine : public Coroutine {
8898

8999
int client_fd_;
90100
int sqlsvr_fd_;
91-
std::string listen_ip_;
92-
int listen_port_;
93101
std::string client_ip_;
94102
int client_port_;
103+
std::string server_ip_;
104+
int server_port_;
95105
uint64_t last_received_request_timestamp_;
96106
uint64_t last_sent_request_timestamp_;
97107
int last_read_fd_;

phxsqlproxy/phxcoroutine.cpp

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,5 +177,29 @@ int Coroutine::RoutineWriteWithTimeout(int dest_fd, const char * buf, int write_
177177
return written_once;
178178
}
179179

180+
int Coroutine::RoutinePeekWithTimeout(int source_fd, char * buf, int buf_size, int timeout_ms) {
181+
assert(IsNonBlock(source_fd));
182+
struct pollfd pf[1];
183+
int nfds = 0;
184+
memset(pf, 0, sizeof(pf));
185+
pf[0].fd = source_fd;
186+
pf[0].events = (POLLIN | POLLERR | POLLHUP);
187+
nfds++;
188+
189+
int return_fd_count = co_poll(co_get_epoll_ct(), pf, nfds, timeout_ms);
190+
if (return_fd_count < 0) {
191+
return return_fd_count;
192+
}
193+
194+
if (pf[0].revents & POLLIN) {
195+
return recv(source_fd, buf, buf_size, MSG_PEEK);
196+
} else if (pf[0].revents & POLLHUP) {
197+
return return_fd_count;
198+
} else if (pf[0].revents & POLLERR) {
199+
return return_fd_count;
200+
}
201+
return return_fd_count;
202+
}
203+
180204
}
181205

phxsqlproxy/phxcoroutine.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,9 @@ class Coroutine {
3838

3939
static int RoutineWriteWithTimeout(int dest_fd, const char * buf, int write_size, int timeout_ms);
4040

41-
static int RoutineReadWithTimeout(int source_fd, char * buf, int write_size, int timeout_ms);
41+
static int RoutineReadWithTimeout(int source_fd, char * buf, int buf_size, int timeout_ms);
42+
43+
static int RoutinePeekWithTimeout(int source_fd, char * buf, int buf_size, int timeout_ms);
4244

4345
private:
4446
stCoRoutine_t * routine_;

phxsqlproxy/phxsqlproxyconfig.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ void PHXSqlProxyConfig::ReadConfig() {
4848
sleep_ = GetInteger("Server", "Sleep", 0);
4949
connect_timeout_ms_ = GetInteger("Server", "ConnectTimeoutMs", 200);
5050
write_timeout_ms_ = GetInteger("Server", "WriteTimeoutMs", 1000);
51+
proxy_protocol_ = GetInteger("Server", "ProxyProtocol", 0);
52+
proxy_protocol_timeout_ms_ = GetInteger("Server", "ProxyProtocolTimeoutMs", 1000);
5153

5254
/*
5355
phxsql_config_ = PhxMySqlConfig :: GetDefault();
@@ -154,4 +156,12 @@ uint32_t PHXSqlProxyConfig::WriteTimeoutMs() {
154156
return write_timeout_ms_;
155157
}
156158

159+
int PHXSqlProxyConfig::ProxyProtocol() {
160+
return proxy_protocol_;
161+
}
162+
163+
uint32_t PHXSqlProxyConfig::ProxyProtocolTimeoutMs() {
164+
return proxy_protocol_timeout_ms_;
165+
}
166+
157167
}

0 commit comments

Comments
 (0)