Skip to content

Commit

Permalink
Simplify IO logic
Browse files Browse the repository at this point in the history
  • Loading branch information
lxzan committed Aug 5, 2023
1 parent d53a3f6 commit 8f423e9
Show file tree
Hide file tree
Showing 9 changed files with 45 additions and 105 deletions.
2 changes: 1 addition & 1 deletion compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (c *compressor) Compress(src []byte, dst *bytes.Buffer) error {
defer c.Unlock()

c.fw.Reset(dst)
if err := internal.WriteN(c.fw, src, len(src)); err != nil {
if err := internal.WriteN(c.fw, src); err != nil {
return err
}
if err := c.fw.Flush(); err != nil {
Expand Down
32 changes: 14 additions & 18 deletions internal/error.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package internal

import "errors"

var ErrIOBytesLen = errors.New("unexpected bytes length")

var closeErrorMap = map[StatusCode]string{
0: "empty code",
CloseNormalClosure: "close normal",
Expand All @@ -25,46 +21,46 @@ var closeErrorMap = map[StatusCode]string{
type StatusCode uint16

const (
// 正常关闭; 无论为何目的而创建, 该链接都已成功完成任务.
// CloseNormalClosure 正常关闭; 无论为何目的而创建, 该链接都已成功完成任务.
CloseNormalClosure StatusCode = 1000

// 终端离开, 可能因为服务端错误, 也可能因为浏览器正从打开连接的页面跳转离开.
// CloseGoingAway 终端离开, 可能因为服务端错误, 也可能因为浏览器正从打开连接的页面跳转离开.
CloseGoingAway StatusCode = 1001

// 由于协议错误而中断连接.
// CloseProtocolError 由于协议错误而中断连接.
CloseProtocolError StatusCode = 1002

// 由于接收到不允许的数据类型而断开连接 (如仅接收文本数据的终端接收到了二进制数据).
// CloseUnsupported 由于接收到不允许的数据类型而断开连接 (如仅接收文本数据的终端接收到了二进制数据).
CloseUnsupported StatusCode = 1003

// 保留. 表示没有收到预期的状态码.
// CloseNoStatusReceived 保留. 表示没有收到预期的状态码.
CloseNoStatusReceived StatusCode = 1005

// 保留. 用于期望收到状态码时连接非正常关闭 (也就是说, 没有发送关闭帧).
// CloseAbnormalClosure 保留. 用于期望收到状态码时连接非正常关闭 (也就是说, 没有发送关闭帧).
CloseAbnormalClosure StatusCode = 1006

// 由于收到了格式不符的数据而断开连接 (如文本消息中包含了非 UTF-8 数据).
// CloseUnsupportedData 由于收到了格式不符的数据而断开连接 (如文本消息中包含了非 UTF-8 数据).
CloseUnsupportedData StatusCode = 1007

// 由于收到不符合约定的数据而断开连接. 这是一个通用状态码, 用于不适合使用 1003 和 1009 状态码的场景.
// ClosePolicyViolation 由于收到不符合约定的数据而断开连接. 这是一个通用状态码, 用于不适合使用 1003 和 1009 状态码的场景.
ClosePolicyViolation StatusCode = 1008

// 由于收到过大的数据帧而断开连接.
// CloseMessageTooLarge 由于收到过大的数据帧而断开连接.
CloseMessageTooLarge StatusCode = 1009

// 客户端期望服务器商定一个或多个拓展, 但服务器没有处理, 因此客户端断开连接.
// CloseMissingExtension 客户端期望服务器商定一个或多个拓展, 但服务器没有处理, 因此客户端断开连接.
CloseMissingExtension StatusCode = 1010

// 客户端由于遇到没有预料的情况阻止其完成请求, 因此服务端断开连接.
// CloseInternalServerErr 客户端由于遇到没有预料的情况阻止其完成请求, 因此服务端断开连接.
CloseInternalServerErr StatusCode = 1011

// 服务器由于重启而断开连接. [Ref]
// CloseServiceRestart 服务器由于重启而断开连接. [Ref]
CloseServiceRestart StatusCode = 1012

// 服务器由于临时原因断开连接, 如服务器过载因此断开一部分客户端连接. [Ref]
// CloseTryAgainLater 服务器由于临时原因断开连接, 如服务器过载因此断开一部分客户端连接. [Ref]
CloseTryAgainLater StatusCode = 1013

// 保留. 表示连接由于无法完成 TLS 握手而关闭 (例如无法验证服务器证书).
// CloseTLSHandshake 保留. 表示连接由于无法完成 TLS 握手而关闭 (例如无法验证服务器证书).
CloseTLSHandshake StatusCode = 1015
)

Expand Down
1 change: 0 additions & 1 deletion internal/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ func TestBufferPool(t *testing.T) {
}

pool.Put(nil, 0)
pool.Put(NewBufferWithCap(0), 0)
buffer, _ := pool.Get(256 * 1024)
as.GreaterOrEqual(buffer.Cap(), 256*1024)
}
46 changes: 8 additions & 38 deletions internal/utils.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package internal

import (
"bytes"
"crypto/sha1"
"encoding/base64"
"encoding/binary"
Expand Down Expand Up @@ -88,45 +87,16 @@ func FnvNumber[T Integer](x T) uint64 {
return h
}

func NewBufferWithCap(n uint8) *bytes.Buffer {
if n == 0 {
return bytes.NewBuffer(nil)
}
return bytes.NewBuffer(make([]byte, 0, n))
}

func CheckIOError(expectN, realN int, err error) error {
if err != nil {
return NewError(CloseInternalServerErr, err)
}
if realN != expectN {
return NewError(CloseInternalServerErr, ErrIOBytesLen)
}
return nil
}

func ReadN(reader io.Reader, data []byte, n int) error {
if n == 0 {
return nil
}
num, err := io.ReadFull(reader, data)
return CheckIOError(n, num, err)
}

func WriteN(writer io.Writer, content []byte, n int) error {
if n == 0 {
return nil
}
num, err := writer.Write(content)
return CheckIOError(n, num, err)
// ReadN 精准地读取len(data)个字节, 否则返回错误
func ReadN(reader io.Reader, data []byte) error {
_, err := io.ReadFull(reader, data)
return err
}

func CopyN(dst io.Writer, src io.Reader, n int64) error {
if n == 0 {
return nil
}
num, err := io.CopyN(dst, src, n)
return CheckIOError(int(n), int(num), err)
// WriteN 精准地写入len(data)个字节, 否则返回错误
func WriteN(writer io.Writer, content []byte) error {
_, err := writer.Write(content)
return err
}

func MaskXOR(b []byte, key []byte) {
Expand Down
36 changes: 4 additions & 32 deletions internal/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,53 +81,25 @@ func TestFNV64(t *testing.T) {
func TestIOUtil(t *testing.T) {
var as = assert.New(t)

t.Run("", func(t *testing.T) {
var dst = bytes.NewBuffer(nil)
var src = bytes.NewBuffer(make([]byte, 0))
var err = CopyN(dst, src, 0)
as.NoError(err)
})

t.Run("", func(t *testing.T) {
var dst = bytes.NewBuffer(nil)
var src = bytes.NewBuffer(make([]byte, 6))
var err = CopyN(dst, src, 12)
as.Error(err)
})

t.Run("", func(t *testing.T) {
var reader = strings.NewReader("hello")
var p = make([]byte, 0)
var err = ReadN(reader, p, 0)
as.NoError(err)
})

t.Run("", func(t *testing.T) {
var reader = strings.NewReader("hello")
var p = make([]byte, 5)
var err = ReadN(reader, p, 10)
as.Error(err)
var err = ReadN(reader, p)
as.Nil(err)
})

t.Run("", func(t *testing.T) {
var writer = bytes.NewBufferString("")
var err = WriteN(writer, nil, 0)
var err = WriteN(writer, nil)
as.NoError(err)
})

t.Run("", func(t *testing.T) {
var writer = bytes.NewBufferString("")
var p = []byte("hello")
var err = WriteN(writer, p, 5)
var err = WriteN(writer, p)
as.NoError(err)
})

t.Run("", func(t *testing.T) {
var buf1 = NewBufferWithCap(0)
as.Equal(0, buf1.Cap())
var buf2 = NewBufferWithCap(12)
as.Equal(12, buf2.Cap())
})
}

func TestNewMaskKey(t *testing.T) {
Expand Down
12 changes: 4 additions & 8 deletions protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,6 @@ var (
// Connection closed
ErrConnClosed = net.ErrClosed

// ErrIOBytesLen IO错误(读写的字节长度不符合预期)
// IO error (length of bytes read or written is not as expected)
ErrIOBytesLen = internal.ErrIOBytesLen

// ErrUnsupportedProtocol 不支持的网络协议
// Unsupported network protocols
ErrUnsupportedProtocol = errors.New("unsupported protocol")
Expand Down Expand Up @@ -181,21 +177,21 @@ func (c *frameHeader) GenerateHeader(isServer bool, fin bool, compress bool, opc

// Parse 解析完整协议头, 最多14byte, 返回payload长度
func (c *frameHeader) Parse(reader io.Reader) (int, error) {
if err := internal.ReadN(reader, (*c)[0:2], 2); err != nil {
if err := internal.ReadN(reader, (*c)[0:2]); err != nil {
return 0, err
}

var payloadLength = 0
var lengthCode = c.GetLengthCode()
switch lengthCode {
case 126:
if err := internal.ReadN(reader, (*c)[2:4], 2); err != nil {
if err := internal.ReadN(reader, (*c)[2:4]); err != nil {
return 0, err
}
payloadLength = int(binary.BigEndian.Uint16((*c)[2:4]))

case 127:
if err := internal.ReadN(reader, (*c)[2:10], 8); err != nil {
if err := internal.ReadN(reader, (*c)[2:10]); err != nil {
return 0, err
}
payloadLength = int(binary.BigEndian.Uint64((*c)[2:10]))
Expand All @@ -205,7 +201,7 @@ func (c *frameHeader) Parse(reader io.Reader) (int, error) {

var maskOn = c.GetMask()
if maskOn {
if err := internal.ReadN(reader, (*c)[10:14], 4); err != nil {
if err := internal.ReadN(reader, (*c)[10:14]); err != nil {
return 0, err
}
}
Expand Down
6 changes: 3 additions & 3 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (c *Conn) readControl() error {
var payload []byte
if n > 0 {
payload = make([]byte, n)
if err := internal.ReadN(c.rbuf, payload, int(n)); err != nil {
if err := internal.ReadN(c.rbuf, payload); err != nil {
return err
}
if maskEnabled := c.fh.GetMask(); maskEnabled {
Expand Down Expand Up @@ -97,7 +97,7 @@ func (c *Conn) readMessage() error {
var buf, index = myBufferPool.Get(contentLength)
var p = buf.Bytes()
p = p[:contentLength]
if err := internal.ReadN(c.rbuf, p, contentLength); err != nil {
if err := internal.ReadN(c.rbuf, p); err != nil {
return err
}
if maskEnabled {
Expand All @@ -122,7 +122,7 @@ func (c *Conn) readMessage() error {
if !c.continuationFrame.initialized {
return internal.CloseProtocolError
}
if err := internal.WriteN(c.continuationFrame.buffer, p, len(p)); err != nil {
if err := internal.WriteN(c.continuationFrame.buffer, p); err != nil {
return err
} else {
myBufferPool.Put(buf, index)
Expand Down
6 changes: 3 additions & 3 deletions writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (c *Conn) WriteAsync(opcode Opcode, payload []byte) error {
if c.isClosed() {
return
}
err = internal.WriteN(c.conn, frame.Bytes(), frame.Len())
err = internal.WriteN(c.conn, frame.Bytes())
myBufferPool.Put(frame, index)
c.emitError(err)
})
Expand All @@ -79,7 +79,7 @@ func (c *Conn) doWrite(opcode Opcode, payload []byte) error {
return err
}

err = internal.WriteN(c.conn, frame.Bytes(), frame.Len())
err = internal.WriteN(c.conn, frame.Bytes())
myBufferPool.Put(frame, index)
return err
}
Expand Down Expand Up @@ -182,7 +182,7 @@ func (c *Broadcaster) Broadcast(socket *Conn) error {
atomic.AddInt64(&c.state, 1)
socket.writeQueue.Push(func() {
if !socket.isClosed() {
socket.emitError(internal.WriteN(socket.conn, msg.frame.Bytes(), msg.frame.Len()))
socket.emitError(internal.WriteN(socket.conn, msg.frame.Bytes()))
}
if atomic.AddInt64(&c.state, -1) == 0 {
c.doClose()
Expand Down
9 changes: 8 additions & 1 deletion writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package gws

import (
"bytes"
"io"
"net"
"net/http"
"sync"
Expand Down Expand Up @@ -39,7 +40,13 @@ func testWrite(c *Conn, fin bool, opcode Opcode, payload []byte) error {
buf = append(buf, payload)
}
num, err := buf.WriteTo(c.conn)
return internal.CheckIOError(headerLength+n, int(num), err)
if err != nil {
return err
}
if int(num) < headerLength+n {
return io.ErrShortWrite
}
return nil
}

func TestWriteBigMessage(t *testing.T) {
Expand Down

0 comments on commit 8f423e9

Please sign in to comment.