Skip to content

Commit

Permalink
Merge pull request #106 from lxzan/note
Browse files Browse the repository at this point in the history
Add Comments & SetBufferThreshold
  • Loading branch information
lxzan authored Aug 16, 2024
2 parents 38b4bdf + f55a412 commit 3dc044f
Show file tree
Hide file tree
Showing 19 changed files with 837 additions and 208 deletions.
4 changes: 1 addition & 3 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ linters:
# Disable specific linter
# https://golangci-lint.run/usage/linters/#disabled-by-default
disable:
- mnd
- testpackage
- nosnakecase
- nlreturn
- gomnd
- forcetypeassert
Expand All @@ -14,15 +14,13 @@ linters:
- ineffassign
- lll
- funlen
- scopelint
- dupl
- gofumpt
- gofmt
- godot
- gci
- goimports
- gocognit
- ifshort
- gochecknoinits
- goconst
- depguard
Expand Down
28 changes: 23 additions & 5 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ import (
"github.com/lxzan/gws/internal"
)

// Dialer 拨号器接口
// Dialer interface
type Dialer interface {
// Dial 连接到指定网络上的地址
// Connects to the address on the named network
Dial(network, addr string) (c net.Conn, err error)
}

Expand All @@ -26,8 +30,8 @@ type connector struct {
secWebsocketKey string
}

// NewClient 创建客户端
// Create New client
// NewClient 创建一个新的 WebSocket 客户端连接
// Creates a new WebSocket client connection
func NewClient(handler Event, option *ClientOption) (*Conn, *http.Response, error) {
option = initClientOption(option)
c := &connector{option: option, eventHandler: handler}
Expand Down Expand Up @@ -69,7 +73,7 @@ func NewClient(handler Event, option *ClientOption) (*Conn, *http.Response, erro
}

// NewClientFromConn 通过外部连接创建客户端, 支持 TCP/KCP/Unix Domain Socket
// Create New client via external connection, supports TCP/KCP/Unix Domain Socket.
// Create new client via external connection, supports TCP/KCP/Unix Domain Socket.
func NewClientFromConn(handler Event, option *ClientOption, conn net.Conn) (*Conn, *http.Response, error) {
option = initClientOption(option)
c := &connector{option: option, conn: conn, eventHandler: handler}
Expand All @@ -80,12 +84,15 @@ func NewClientFromConn(handler Event, option *ClientOption, conn net.Conn) (*Con
return client, resp, err
}

// 发送HTTP请求, 即WebSocket握手
// Sends an http request, i.e., websocket handshake
func (c *connector) request() (*http.Response, *bufio.Reader, error) {
_ = c.conn.SetDeadline(time.Now().Add(c.option.HandshakeTimeout))
ctx, cancel := context.WithTimeout(context.Background(), c.option.HandshakeTimeout)
defer cancel()

// 构建请求
// 构建HTTP请求
// building a http request
r, err := http.NewRequestWithContext(ctx, http.MethodGet, c.option.Addr, nil)
if err != nil {
return nil, nil, err
Expand All @@ -109,10 +116,12 @@ func (c *connector) request() (*http.Response, *bufio.Reader, error) {

var ch = make(chan error)

// 发送请求
// 发送http请求
// send http request
go func() { ch <- r.Write(c.conn) }()

// 同步等待请求是否发送成功
// Synchronized waiting for the request to be sent successfully
select {
case err = <-ch:
case <-ctx.Done():
Expand All @@ -123,11 +132,14 @@ func (c *connector) request() (*http.Response, *bufio.Reader, error) {
}

// 读取响应结果
// Read the response result
br := bufio.NewReaderSize(c.conn, c.option.ReadBufferSize)
resp, err := http.ReadResponse(br, r)
return resp, br, err
}

// 获取压缩拓展结果
// Get compression expansion results
func (c *connector) getPermessageDeflate(extensions string) PermessageDeflate {
serverPD := permessageNegotiation(extensions)
clientPD := c.option.PermessageDeflate
Expand All @@ -145,6 +157,8 @@ func (c *connector) getPermessageDeflate(extensions string) PermessageDeflate {
return pd
}

// 执行 WebSocket 握手操作
// Performs the WebSocket handshake operation
func (c *connector) handshake() (*Conn, *http.Response, error) {
resp, br, err := c.request()
if err != nil {
Expand Down Expand Up @@ -188,6 +202,8 @@ func (c *connector) handshake() (*Conn, *http.Response, error) {
return socket, resp, c.conn.SetDeadline(time.Time{})
}

// 从响应中获取子协议
// Retrieves the subprotocol from the response
func (c *connector) getSubProtocol(resp *http.Response) (string, error) {
a := internal.Split(c.option.RequestHeader.Get(internal.SecWebSocketProtocol.Key), ",")
b := internal.Split(resp.Header.Get(internal.SecWebSocketProtocol.Key), ",")
Expand All @@ -198,6 +214,8 @@ func (c *connector) getSubProtocol(resp *http.Response) (string, error) {
return subprotocol, nil
}

// 检查响应头以验证握手是否成功
// Checks the response headers to verify if the handshake was successful
func (c *connector) checkHeaders(resp *http.Response) error {
if resp.StatusCode != http.StatusSwitchingProtocols {
return ErrHandshake
Expand Down
33 changes: 28 additions & 5 deletions compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
"github.com/lxzan/gws/internal"
)

// FlateTail Add four bytes as specified in RFC
// Add final block to squelch unexpected EOF error from flate reader.
// deflate压缩算法的尾部标记
// The tail marker of the deflate compression algorithm
var flateTail = []byte{0x00, 0x00, 0xff, 0xff, 0x01, 0x00, 0x00, 0xff, 0xff}

type deflaterPool struct {
Expand All @@ -24,6 +24,8 @@ type deflaterPool struct {
pool []*deflater
}

// 初始化deflaterPool
// Initialize the deflaterPool
func (c *deflaterPool) initialize(options PermessageDeflate, limit int) *deflaterPool {
c.num = uint64(options.PoolSize)
for i := uint64(0); i < c.num; i++ {
Expand All @@ -32,6 +34,8 @@ func (c *deflaterPool) initialize(options PermessageDeflate, limit int) *deflate
return c
}

// Select 从deflaterPool中选择一个deflater对象
// Select a deflater object from the deflaterPool
func (c *deflaterPool) Select() *deflater {
var j = atomic.AddUint64(&c.serial, 1) & (c.num - 1)
return c.pool[j]
Expand All @@ -47,6 +51,8 @@ type deflater struct {
cpsWriter *flate.Writer
}

// 初始化deflater
// Initialize the deflater
func (c *deflater) initialize(isServer bool, options PermessageDeflate, limit int) *deflater {
c.dpsReader = flate.NewReader(nil)
c.dpsBuffer = bytes.NewBuffer(nil)
Expand All @@ -61,16 +67,19 @@ func (c *deflater) initialize(isServer bool, options PermessageDeflate, limit in
return c
}

// 重置deflate reader
// Reset the deflate reader
func (c *deflater) resetFR(r io.Reader, dict []byte) {
resetter := c.dpsReader.(flate.Resetter)
_ = resetter.Reset(r, dict) // must return a null pointer
if c.dpsBuffer.Cap() > 256*1024 {
if c.dpsBuffer.Cap() > int(bufferThreshold) {
c.dpsBuffer = bytes.NewBuffer(nil)
}
c.dpsBuffer.Reset()
}

// Decompress 解压
// Decompress data
func (c *deflater) Decompress(src *bytes.Buffer, dict []byte) (*bytes.Buffer, error) {
c.dpsLocker.Lock()
defer c.dpsLocker.Unlock()
Expand All @@ -87,6 +96,7 @@ func (c *deflater) Decompress(src *bytes.Buffer, dict []byte) (*bytes.Buffer, er
}

// Compress 压缩
// Compress data
func (c *deflater) Compress(src internal.Payload, dst *bytes.Buffer, dict []byte) error {
c.cpsLocker.Lock()
defer c.cpsLocker.Unlock()
Expand All @@ -107,12 +117,16 @@ func (c *deflater) Compress(src internal.Payload, dst *bytes.Buffer, dict []byte
return nil
}

// 滑动窗口
// Sliding window
type slideWindow struct {
enabled bool
dict []byte
size int
}

// 初始化滑动窗口
// Initialize the sliding window
func (c *slideWindow) initialize(pool *internal.Pool[[]byte], windowBits int) *slideWindow {
c.enabled = true
c.size = internal.BinaryPow(windowBits)
Expand All @@ -124,6 +138,8 @@ func (c *slideWindow) initialize(pool *internal.Pool[[]byte], windowBits int) *s
return c
}

// Write 将数据写入滑动窗口
// Write data to the sliding window
func (c *slideWindow) Write(p []byte) (int, error) {
if !c.enabled {
return 0, nil
Expand Down Expand Up @@ -153,6 +169,8 @@ func (c *slideWindow) Write(p []byte) (int, error) {
return total, nil
}

// 生成请求头
// Generate request headers
func (c *PermessageDeflate) genRequestHeader() string {
var options = make([]string, 0, 5)
options = append(options, internal.PermessageDeflate)
Expand All @@ -173,6 +191,8 @@ func (c *PermessageDeflate) genRequestHeader() string {
return strings.Join(options, "; ")
}

// 生成响应头
// Generate response headers
func (c *PermessageDeflate) genResponseHeader() string {
var options = make([]string, 0, 5)
options = append(options, internal.PermessageDeflate)
Expand All @@ -191,7 +211,8 @@ func (c *PermessageDeflate) genResponseHeader() string {
return strings.Join(options, "; ")
}

// 压缩拓展握手协商
// 压缩拓展协商
// Negotiation of compression parameters
func permessageNegotiation(str string) PermessageDeflate {
var options = PermessageDeflate{
ServerContextTakeover: true,
Expand Down Expand Up @@ -229,7 +250,9 @@ func permessageNegotiation(str string) PermessageDeflate {
return options
}

func limitReader(r io.Reader, limit int) io.Reader { return &limitedReader{R: r, M: limit} }
// 限制从io.Reader中最多读取m个字节
// Limit reading up to m bytes from io.Reader
func limitReader(r io.Reader, m int) io.Reader { return &limitedReader{R: r, M: m} }

type limitedReader struct {
R io.Reader
Expand Down
Loading

0 comments on commit 3dc044f

Please sign in to comment.