Skip to content

Commit

Permalink
optimize async io
Browse files Browse the repository at this point in the history
  • Loading branch information
lixizan committed Feb 26, 2023
1 parent b1a6beb commit 577e777
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 23 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ bench:
go test -benchmem -bench ^Benchmark github.com/lxzan/gws

build:
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o bin/gws-server-linux-amd64 github.com/lxzan/gws/examples/testsuite
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o bin/gws-linux-amd64 github.com/lxzan/gws/examples/testsuite

run-testsuite-server:
go run github.com/lxzan/gws/examples/testsuite
Expand Down
25 changes: 13 additions & 12 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,12 @@ type Conn struct {
closed uint32
// write lock
wmu *sync.Mutex
// async io task queue
aiomq *workerQueue
// write message queue
wmq *writeQueue
// async read task queue
readTaskQ *workerQueue
// async write task queue
writeTaskQ *workerQueue
// messages waiting for writing
wMessages *writeQueue
}

func serveWebSocket(config *Upgrader, r *Request, netConn net.Conn, brw *bufio.ReadWriter, handler Event, compressEnabled bool) *Conn {
Expand All @@ -63,8 +65,9 @@ func serveWebSocket(config *Upgrader, r *Request, netConn net.Conn, brw *bufio.R
rbuf: brw.Reader,
fh: frameHeader{},
handler: handler,
aiomq: newWorkerQueue(int64(config.AsyncIOGoLimit)),
wmq: &writeQueue{},
readTaskQ: newWorkerQueue(int64(config.AsyncReadGoLimit)),
writeTaskQ: newWorkerQueue(1),
wMessages: &writeQueue{},
}
if c.compressEnabled {
c.compressor = newCompressor(config.CompressLevel)
Expand Down Expand Up @@ -128,9 +131,8 @@ func (c *Conn) emitError(err error) {
content = content[:internal.ThresholdV1]
}
if atomic.CompareAndSwapUint32(&c.closed, 0, 1) {
c.wmq.Push(messageWrapper{opcode: OpcodeCloseConnection, payload: content})
c.aiomq.AddJob(asyncJob{Do: c.doWriteAsync})
c.aiomq.Wait(defaultCloseTimeout)
c.addWriteTask(OpcodeCloseConnection, content)
c.writeTaskQ.Wait(defaultCloseTimeout)
c.handler.OnError(c, responseErr)
}
}
Expand Down Expand Up @@ -167,9 +169,8 @@ func (c *Conn) emitClose(buf *bytes.Buffer) error {
}
}
if atomic.CompareAndSwapUint32(&c.closed, 0, 1) {
c.wmq.Push(messageWrapper{opcode: OpcodeCloseConnection, payload: responseCode.Bytes()})
c.aiomq.AddJob(asyncJob{Do: c.doWriteAsync})
c.aiomq.Wait(defaultCloseTimeout)
c.addWriteTask(OpcodeCloseConnection, responseCode.Bytes())
c.writeTaskQ.Wait(defaultCloseTimeout)
c.handler.OnClose(c, realCode, buf.Bytes())
}
return internal.CloseNormalClosure
Expand Down
14 changes: 13 additions & 1 deletion option.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package gws

import "net/http"
import (
"net/http"
"time"
)

type Option func(c *Upgrader)

Expand Down Expand Up @@ -70,3 +73,12 @@ func WithCheckOrigin(f func(r *Request) bool) Option {
c.CheckOrigin = f
}
}

// WithCloseTimeout 关闭连接等待超时时间
// 当IO出现异常时, 连接内可能还有一些未写入数据, 所以需要等待
// 为了快速关闭异常连接, 这个值不要设置太大
func WithCloseTimeout(timeout time.Duration) Option {
return func(c *Upgrader) {
c.CloseTimeout = timeout
}
}
4 changes: 4 additions & 0 deletions option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"compress/flate"
"github.com/stretchr/testify/assert"
"testing"
"time"
)

func TestNewUpgrader(t *testing.T) {
Expand All @@ -14,6 +15,7 @@ func TestNewUpgrader(t *testing.T) {
as.Equal(false, config.CheckTextEncoding)
as.Equal(defaultAsyncReadGoLimit, config.AsyncReadGoLimit)
as.Equal(defaultMaxContentLength, config.MaxContentLength)
as.Equal(defaultCloseTimeout, config.CloseTimeout)
as.NotNil(config.EventHandler)
as.NotNil(config.ResponseHeader)
as.NotNil(config.CheckOrigin)
Expand All @@ -26,6 +28,7 @@ func TestOptions(t *testing.T) {
WithAsyncReadEnabled(),
WithAsyncReadGoLimit(16),
WithMaxContentLength(256),
WithCloseTimeout(100*time.Millisecond),
WithCheckTextEncoding(),
)
as.Equal(true, config.CompressEnabled)
Expand All @@ -34,6 +37,7 @@ func TestOptions(t *testing.T) {

as.Equal(true, config.AsyncReadEnabled)
as.Equal(16, config.AsyncReadGoLimit)
as.Equal(100*time.Millisecond, config.CloseTimeout)
as.Equal(256, config.MaxContentLength)
as.Equal(true, config.CheckTextEncoding)
}
2 changes: 1 addition & 1 deletion reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (c *Conn) emitMessage(msg *Message, compressed bool) error {
}

if c.config.AsyncReadEnabled {
c.aiomq.AddJob(asyncJob{
c.readTaskQ.AddJob(asyncJob{
Args: msg,
Do: func(args interface{}) error {
c.handler.OnMessage(c, args.(*Message))
Expand Down
9 changes: 8 additions & 1 deletion updrader.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const (
defaultCompressLevel = flate.BestSpeed
defaultMaxContentLength = 16 * 1024 * 1024 // 16MiB
defaultCompressionThreshold = 512 // 512 Byte
defaultCloseTimeout = 5 * time.Second // 5s
defaultCloseTimeout = time.Second // 1s
)

type (
Expand All @@ -35,6 +35,10 @@ type (
// goroutine limits for concurrent onmessage
AsyncReadGoLimit int

// maximum wait time to close a connection. for fast closing of abnormal connections, do not set this value too high.
// when an IO exception occurs, there may be some unwritten data in the connection, so you need to wait
CloseTimeout time.Duration

// whether to compress data
CompressEnabled bool

Expand Down Expand Up @@ -85,6 +89,9 @@ func (c *Upgrader) Initialize() {
if c.AsyncReadGoLimit <= 0 {
c.AsyncReadGoLimit = defaultAsyncReadGoLimit
}
if c.CloseTimeout <= 0 {
c.CloseTimeout = defaultCloseTimeout
}
}

func NewUpgrader(options ...Option) *Upgrader {
Expand Down
18 changes: 11 additions & 7 deletions writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,20 +72,24 @@ func (c *Conn) WriteAsync(opcode Opcode, payload []byte) {
if atomic.LoadUint32(&c.closed) == 1 {
return
}
c.addWriteTask(opcode, payload)
}

c.wmq.Push(messageWrapper{opcode: opcode, payload: payload})
c.aiomq.AddJob(asyncJob{Do: c.doWriteAsync})
// 添加异步写的任务
func (c *Conn) addWriteTask(opcode Opcode, payload []byte) {
c.wMessages.Push(messageWrapper{opcode: opcode, payload: payload})
c.writeTaskQ.AddJob(asyncJob{Do: c.doWriteAsync})
}

func (c *Conn) doWriteAsync(args interface{}) error {
if c.wmq.Len() == 0 {
if c.wMessages.Len() == 0 {
return nil
}

c.wmq.Lock()
msgs := c.wmq.data
c.wmq.data = []messageWrapper{}
c.wmq.Unlock()
c.wMessages.Lock()
msgs := c.wMessages.data
c.wMessages.data = []messageWrapper{}
c.wMessages.Unlock()

myerr := func() error {
c.wmu.Lock()
Expand Down

0 comments on commit 577e777

Please sign in to comment.