-
Notifications
You must be signed in to change notification settings - Fork 95
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
async write async write async write async write
- Loading branch information
lixizan
committed
Feb 23, 2023
1 parent
8c5adca
commit a2bb4a9
Showing
10 changed files
with
249 additions
and
11 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
package gws | ||
|
||
import ( | ||
"sync" | ||
) | ||
|
||
type ( | ||
workerQueue struct { | ||
mu *sync.Mutex // 锁 | ||
q []writeJob // 任务队列 | ||
maxConcurrency int64 // 最大并发 | ||
curConcurrency int64 // 当前并发 | ||
} | ||
|
||
writeJob struct { | ||
Args *Conn | ||
Do func(args *Conn) error | ||
} | ||
|
||
messageWrapper struct { | ||
opcode Opcode | ||
payload []byte | ||
} | ||
) | ||
|
||
// newWorkerQueue 创建一个工作队列 | ||
func newWorkerQueue(maxConcurrency int64) *workerQueue { | ||
c := &workerQueue{ | ||
mu: &sync.Mutex{}, | ||
q: make([]writeJob, 0), | ||
maxConcurrency: maxConcurrency, | ||
curConcurrency: 0, | ||
} | ||
return c | ||
} | ||
|
||
func (c *workerQueue) getJob() interface{} { | ||
c.mu.Lock() | ||
defer c.mu.Unlock() | ||
|
||
if c.curConcurrency >= c.maxConcurrency { | ||
return nil | ||
} | ||
if n := len(c.q); n == 0 { | ||
return nil | ||
} | ||
var result = c.q[0] | ||
c.q = c.q[1:] | ||
c.curConcurrency++ | ||
return result | ||
} | ||
|
||
func (c *workerQueue) decrease() { | ||
c.mu.Lock() | ||
c.curConcurrency-- | ||
c.mu.Unlock() | ||
} | ||
|
||
func (c *workerQueue) do(job writeJob) { | ||
job.Args.emitError(job.Do(job.Args)) | ||
c.decrease() | ||
if nextJob := c.getJob(); nextJob != nil { | ||
c.do(nextJob.(writeJob)) | ||
} | ||
} | ||
|
||
// AddJob 追加任务, 有资源空闲的话会立即执行 | ||
func (c *workerQueue) AddJob(job writeJob) { | ||
c.mu.Lock() | ||
c.q = append(c.q, job) | ||
c.mu.Unlock() | ||
if item := c.getJob(); item != nil { | ||
go c.do(item.(writeJob)) | ||
} | ||
} | ||
|
||
func newMessageQueue() messageQueue { | ||
return messageQueue{ | ||
mu: &sync.RWMutex{}, | ||
data: []messageWrapper{}, | ||
} | ||
} | ||
|
||
type messageQueue struct { | ||
mu *sync.RWMutex | ||
data []messageWrapper | ||
} | ||
|
||
func (c *messageQueue) Len() int { | ||
c.mu.RLock() | ||
n := len(c.data) | ||
c.mu.RUnlock() | ||
return n | ||
} | ||
|
||
func (c *messageQueue) Push(conn *Conn, m messageWrapper) { | ||
c.mu.Lock() | ||
c.data = append(c.data, m) | ||
if n := len(c.data); n == 1 { | ||
_writeQueue.AddJob(writeJob{Args: conn, Do: doWriteAsync}) | ||
} | ||
c.mu.Unlock() | ||
} | ||
|
||
func (c *messageQueue) Range(f func(msg messageWrapper) error) error { | ||
c.mu.Lock() | ||
defer c.mu.Unlock() | ||
|
||
for i, _ := range c.data { | ||
if err := f(c.data[i]); err != nil { | ||
return err | ||
} | ||
} | ||
c.data = c.data[:0] | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
package gws | ||
|
||
import ( | ||
"bufio" | ||
"github.com/stretchr/testify/assert" | ||
"net" | ||
"sync" | ||
"testing" | ||
) | ||
|
||
func newPeer(config *Upgrader) (server, client *Conn) { | ||
size := 4096 | ||
s, c := net.Pipe() | ||
{ | ||
brw := bufio.NewReadWriter(bufio.NewReaderSize(s, size), bufio.NewWriterSize(s, size)) | ||
server = serveWebSocket(config, &Request{}, s, brw, config.EventHandler, config.CompressEnabled) | ||
} | ||
{ | ||
brw := bufio.NewReadWriter(bufio.NewReaderSize(c, size), bufio.NewWriterSize(c, size)) | ||
client = serveWebSocket(config, &Request{}, c, brw, config.EventHandler, config.CompressEnabled) | ||
} | ||
return | ||
} | ||
|
||
func TestConn_WriteAsync(t *testing.T) { | ||
var as = assert.New(t) | ||
SetMaxConcurrencyForWriteQueue(8) | ||
var handler = new(webSocketMocker) | ||
var upgrader = NewUpgrader(func(c *Upgrader) { | ||
c.EventHandler = handler | ||
}) | ||
server, client := newPeer(upgrader) | ||
|
||
var message = []byte("hello") | ||
var count = 1000 | ||
|
||
go func() { | ||
for i := 0; i < count; i++ { | ||
server.WriteMessageAsync(OpcodeText, message) | ||
} | ||
}() | ||
|
||
var wg sync.WaitGroup | ||
wg.Add(count) | ||
go func() { | ||
for { | ||
var header = frameHeader{} | ||
_, err := client.conn.Read(header[:2]) | ||
if err != nil { | ||
return | ||
} | ||
var payload = make([]byte, header.GetLengthCode()) | ||
if _, err := client.conn.Read(payload); err != nil { | ||
return | ||
} | ||
as.Equal(string(message), string(payload)) | ||
wg.Done() | ||
} | ||
}() | ||
wg.Wait() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
package gws | ||
|
||
import "github.com/lxzan/gws/internal" | ||
|
||
const defaultAsyncWriteConcurrency = 128 | ||
|
||
var ( | ||
// task queue for async write | ||
_writeQueue = newWorkerQueue(defaultAsyncWriteConcurrency) | ||
|
||
// buffer pool | ||
_bpool = internal.NewBufferPool() | ||
) | ||
|
||
func SetMaxConcurrencyForWriteQueue(num int64) { | ||
if num > 0 { | ||
_writeQueue.maxConcurrency = num | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters