@@ -129,6 +129,9 @@ void IORoutine::ClearVariablesAndStatus() {
129129 connect_dest_ = " " ;
130130 connect_port_ = 0 ;
131131 client_ip_ = " " ;
132+ client_port_ = 0 ;
133+ server_ip_ = " " ;
134+ server_port_ = 0 ;
132135 req_uniq_id_ = 0 ;
133136 last_sent_request_timestamp_ = 0 ;
134137 last_received_request_timestamp_ = 0 ;
@@ -233,7 +236,7 @@ void IORoutine::SetClientFD(int fd) {
233236 ClearAll ();
234237 req_uniq_id_ = ((uint64_t )(random_engine ()) << 32 ) | random_engine ();
235238 client_fd_ = fd;
236- int ret1 = GetSockName (fd, listen_ip_, listen_port_ );
239+ int ret1 = GetSockName (fd, server_ip_, server_port_ );
237240 int ret2 = GetPeerName (fd, client_ip_, client_port_);
238241 if (ret1 == 0 && ret2 == 0 ) {
239242 LogVerbose (" requniqid %llu receive connect from [%s:%d]" ,
@@ -349,20 +352,37 @@ int IORoutine::run() {
349352 while (true ) {
350353 // first connect to master mysql to finish auth
351354 if (sqlsvr_fd_ == -1 ) {
355+ SetNoDelay (client_fd_);
356+ if (server_port_ == GetWorkerConfig ()->proxy_port_ ) {
357+ if (!GetGroupStatusCache ()->IsMember (client_ip_)) {
358+ break ;
359+ }
360+
361+ int ret = ProcessProxyHeader (client_fd_);
362+ if (ret < 0 ) {
363+ LogError (" process proxy header err: %d" , ret);
364+ break ;
365+ }
366+ LogVerbose (" process proxy header succ: %s:%d %s:%d" ,
367+ client_ip_.c_str (), client_port_, server_ip_.c_str (), server_port_);
368+ }
369+
352370 int fd = ConnectDest ();
353371 if (fd <= 0 ) {
354372 break ;
355373 }
356374 sqlsvr_fd_ = fd;
357- SetNoDelay (client_fd_);
358375 SetNoDelay (sqlsvr_fd_);
359376
360- if (listen_port_ == GetWorkerConfig ()->port_ ) {
361- string proxy_line = " PROXY TCP4 " + client_ip_ + " " + listen_ip_ + " " +
362- UIntToStr (client_port_) + " " + UIntToStr (listen_port_) + " \r\n " ;
363- int ret = WriteToDest (sqlsvr_fd_, proxy_line.c_str (), proxy_line.size ());
377+ if (config_->ProxyProtocol ()) {
378+ string proxy_header;
379+ int ret = MakeProxyHeader (proxy_header);
380+ if (ret < 0 ) {
381+ LogError (" make proxy header err: %d" , ret);
382+ break ;
383+ }
384+ ret = WriteToDest (sqlsvr_fd_, proxy_header.c_str (), proxy_header.size ());
364385 if (ret < 0 ) {
365- LogError (" send proxy line error: %s" , proxy_line.c_str ());
366386 break ;
367387 }
368388 }
@@ -416,7 +436,7 @@ int IORoutine::run() {
416436
417437 if (byte_size_tot == 0 ) {
418438 // MasterEnableReadPort=0
419- if (connect_port_ == GetWorkerConfig ()-> proxy_port_ && !GetWorkerConfig ()->is_master_port_ ) {
439+ if (connect_port_ != config_-> GetMysqlPort () && !GetWorkerConfig ()->is_master_port_ ) {
420440 LogVerbose (" %s:%d requniqid %llu mark %s failure" , __func__, __LINE__, req_uniq_id_,
421441 connect_dest_.c_str ());
422442 GetGroupStatusCache ()->MarkFailure (connect_dest_);
@@ -503,7 +523,7 @@ int MasterIORoutine::GetDestEndpoint(std::string & dest_ip, int & dest_port) {
503523 dest_port = config_->GetMysqlPort ();
504524 } else {
505525 dest_ip = master_ip;
506- dest_port = GetWorkerConfig ()->proxy_port_ ;
526+ dest_port = config_-> ProxyProtocol () ? GetWorkerConfig ()->proxy_port_ : GetWorkerConfig ()-> port_ ;
507527 }
508528
509529 LogVerbose (" %s:%d requniqid %llu ret ip [%s] port [%d]" , __func__, __LINE__, req_uniq_id_, dest_ip.c_str (),
@@ -575,7 +595,7 @@ int SlaveIORoutine::GetDestEndpoint(std::string & dest_ip, int & dest_port) {
575595 req_uniq_id_, master_ip.c_str ());
576596 return -__LINE__;
577597 }
578- dest_port = GetWorkerConfig ()->proxy_port_ ;
598+ dest_port = config_-> ProxyProtocol () ? GetWorkerConfig ()->proxy_port_ : GetWorkerConfig ()-> port_ ;
579599 } else {
580600 dest_ip = " 127.0.0.1" ;
581601 dest_port = config_->GetMysqlPort ();
@@ -622,4 +642,182 @@ WorkerConfig_t * SlaveIORoutine::GetWorkerConfig() {
622642 return config_->GetSlaveWorkerConfig ();
623643}
624644
645+ int IORoutine::ProcessProxyHeader (int fd) {
646+ union {
647+ ProxyHdrV1_t v1;
648+ ProxyHdrV2_t v2;
649+ } hdr;
650+
651+ int ret = RoutinePeekWithTimeout (fd, (char *)&hdr, sizeof (hdr), config_->ProxyProtocolTimeoutMs ());
652+ if (ret < 0 ) {
653+ return ret;
654+ }
655+
656+ if (ret >= 16 && memcmp (&hdr.v2 , PP2_SIGNATURE, 12 ) == 0 && (hdr.v2 .ver_cmd & 0xF0 ) == PP2_VERSION) {
657+ ret = ProcessProxyHeaderV2 (hdr.v2 , ret);
658+ } else if (ret >= 8 && memcmp (hdr.v1 .line , " PROXY " , 6 ) == 0 ) {
659+ ret = ProcessProxyHeaderV1 (hdr.v1 .line , ret);
660+ } else {
661+ return -__LINE__;
662+ }
663+
664+ if (ret > 0 ) {
665+ ret = RoutineReadWithTimeout (fd, (char *)&hdr, ret, config_->ProxyProtocolTimeoutMs ());
666+ }
667+ return ret;
668+ }
669+
670+ int IORoutine::ProcessProxyHeaderV1 (char * hdr, int read_count) {
671+ char * end = (char *)memchr (hdr, ' \r ' , read_count - 1 );
672+ int size = end - hdr + 2 ;
673+ if (!end || *(end + 1 ) != ' \n ' ) {
674+ return -__LINE__;
675+ }
676+
677+ vector<string> tokens = SplitStr (string (hdr, end), " " );
678+ if (tokens[1 ] == " UNKNOWN" ) {
679+ return size;
680+ }
681+ if (tokens.size () != 6 ) {
682+ return -__LINE__;
683+ }
684+
685+ const string & src_ip = tokens[2 ];
686+ const string & dst_ip = tokens[3 ];
687+ if (tokens[1 ] == " TCP4" ) {
688+ struct in_addr addr;
689+ if (inet_pton (AF_INET, src_ip.c_str (), &addr) <= 0 ) {
690+ return -__LINE__;
691+ }
692+ if (inet_pton (AF_INET, dst_ip.c_str (), &addr) <= 0 ) {
693+ return -__LINE__;
694+ }
695+ } else if (tokens[1 ] == " TCP6" ) {
696+ struct in6_addr addr;
697+ if (inet_pton (AF_INET6, src_ip.c_str (), &addr) <= 0 ) {
698+ return -__LINE__;
699+ }
700+ if (inet_pton (AF_INET6, dst_ip.c_str (), &addr) <= 0 ) {
701+ return -__LINE__;
702+ }
703+ }
704+
705+ int src_port = atoi (tokens[4 ].c_str ());
706+ int dst_port = atoi (tokens[5 ].c_str ());
707+ if (src_port < 0 || src_port > 65535 ) {
708+ return -__LINE__;
709+ }
710+ if (dst_port < 0 || dst_port > 65535 ) {
711+ return -__LINE__;
712+ }
713+
714+ client_ip_ = src_ip;
715+ server_ip_ = dst_ip;
716+ client_port_ = src_port;
717+ server_port_ = dst_port;
718+
719+ return size;
720+ }
721+
722+ int IORoutine::ProcessProxyHeaderV2 (const ProxyHdrV2_t & hdr, int read_count) {
723+ int size = 16 + ntohs (hdr.len );
724+ if (read_count < size) { // truncated or too large header
725+ return -__LINE__;
726+ }
727+ char src_ip[INET6_ADDRSTRLEN] = { 0 };
728+ char dst_ip[INET6_ADDRSTRLEN] = { 0 };
729+ switch (hdr.ver_cmd & 0xF ) {
730+ case PP2_CMD_PROXY:
731+ switch (hdr.fam ) {
732+ case PP2_FAM_TCP4:
733+ if (inet_ntop (AF_INET, &hdr.addr .ip4 .src_addr , src_ip, sizeof (src_ip)) == NULL ) {
734+ return -__LINE__;
735+ }
736+ if (inet_ntop (AF_INET, &hdr.addr .ip4 .dst_addr , dst_ip, sizeof (dst_ip)) == NULL ) {
737+ return -__LINE__;
738+ }
739+ client_ip_ = string (src_ip);
740+ server_ip_ = string (dst_ip);
741+ client_port_ = ntohs (hdr.addr .ip4 .src_port );
742+ server_port_ = ntohs (hdr.addr .ip4 .dst_port );
743+ return size;
744+ case PP2_FAM_TCP6:
745+ if (inet_ntop (AF_INET6, &hdr.addr .ip6 .src_addr , src_ip, sizeof (src_ip)) == NULL ) {
746+ return -__LINE__;
747+ }
748+ if (inet_ntop (AF_INET6, &hdr.addr .ip6 .dst_addr , dst_ip, sizeof (dst_ip)) == NULL ) {
749+ return -__LINE__;
750+ }
751+ client_ip_ = string (src_ip);
752+ server_ip_ = string (dst_ip);
753+ client_port_ = ntohs (hdr.addr .ip6 .src_port );
754+ server_port_ = ntohs (hdr.addr .ip6 .dst_port );
755+ return size;
756+ case PP2_FAM_UNSPEC:
757+ return size; // unknown protocol, keep local connection address
758+ default :
759+ return -__LINE__; // unsupport protocol
760+ }
761+ case PP2_CMD_LOCAL:
762+ return size; // keep local connection address for LOCAL
763+ default :
764+ return -__LINE__; // unsupport command
765+ }
766+ return -__LINE__;
767+ }
768+
769+ int IORoutine::MakeProxyHeader (string & header) {
770+ switch (config_->ProxyProtocol ()) {
771+ case 1 :
772+ return MakeProxyHeaderV1 (header);
773+ case 2 :
774+ return MakeProxyHeaderV2 (header);
775+ }
776+ return -__LINE__;
777+ }
778+
779+ int IORoutine::MakeProxyHeaderV1 (string & header) {
780+ header = " PROXY " ;
781+ struct in_addr addr;
782+ struct in6_addr addr6;
783+ if (inet_pton (AF_INET, client_ip_.c_str (), &addr) == 1 &&
784+ inet_pton (AF_INET, server_ip_.c_str (), &addr) == 1 ) {
785+ header += " TCP4 " ;
786+ } else if (inet_pton (AF_INET6, client_ip_.c_str (), &addr6) == 1 &&
787+ inet_pton (AF_INET6, server_ip_.c_str (), &addr6) == 1 ) {
788+ header += " TCP6 " ;
789+ } else {
790+ return -__LINE__;
791+ }
792+ header += client_ip_ + " " + server_ip_ + " " + UIntToStr (client_port_) + " " + UIntToStr (server_port_) + " \r\n " ;
793+ return 0 ;
794+ }
795+
796+ int IORoutine::MakeProxyHeaderV2 (string & header) {
797+ ProxyHdrV2_t hdr;
798+ memcpy (hdr.sig , PP2_SIGNATURE, sizeof (hdr.sig ));
799+ hdr.ver_cmd = PP2_VERSION | PP2_CMD_PROXY;
800+ if (inet_pton (AF_INET, client_ip_.c_str (), &hdr.addr .ip4 .src_addr ) == 1 &&
801+ inet_pton (AF_INET, server_ip_.c_str (), &hdr.addr .ip4 .dst_addr ) == 1 )
802+ {
803+ hdr.addr .ip4 .src_port = htons (client_port_);
804+ hdr.addr .ip4 .dst_port = htons (server_port_);
805+ hdr.fam = PP2_FAM_TCP4;
806+ hdr.len = htons (sizeof (hdr.addr .ip4 ));
807+ header = string ((char *)&hdr, 16 + sizeof (hdr.addr .ip4 ));
808+ return 0 ;
809+ }
810+ if (inet_pton (AF_INET6, client_ip_.c_str (), &hdr.addr .ip6 .src_addr ) == 1 &&
811+ inet_pton (AF_INET6, server_ip_.c_str (), &hdr.addr .ip6 .dst_addr ) == 1 )
812+ {
813+ hdr.addr .ip6 .src_port = htons (client_port_);
814+ hdr.addr .ip6 .dst_port = htons (server_port_);
815+ hdr.fam = PP2_FAM_TCP6;
816+ hdr.len = htons (sizeof (hdr.addr .ip6 ));
817+ header = string ((char *)&hdr, 16 + sizeof (hdr.addr .ip6 ));
818+ return 0 ;
819+ }
820+ return -__LINE__;
821+ }
822+
625823}
0 commit comments