Skip to content

Commit

Permalink
add writev method
Browse files Browse the repository at this point in the history
  • Loading branch information
lxzan committed Jan 22, 2024
1 parent 0f857d4 commit fe3d21b
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 13 deletions.
23 changes: 14 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,20 @@ to be processed in a non-blocking way.

- <font size=3>Simplicity and Ease of Use</font>

- **User-Friendly**: Simple and clear `WebSocket` Event API design makes server-client interaction easy.
- **Code Efficiency**: Minimizes the amount of code needed to implement complex WebSocket solutions.
- **User-Friendly**: Simple and clear `WebSocket` Event API design makes server-client interaction easy.
- **Code Efficiency**: Minimizes the amount of code needed to implement complex WebSocket solutions.

- <font size=3>High-Performance</font>

- **High IOPS Low Latency**: Designed for rapid data transmission and reception, ideal for time-sensitive
applications.
- **Low Memory Usage**: Highly optimized memory multiplexing system to minimize memory usage and reduce your cost of ownership.
- **High IOPS Low Latency**: Designed for rapid data transmission and reception, ideal for time-sensitive
applications.
- **Low Memory Usage**: Highly optimized memory multiplexing system to minimize memory usage and reduce your cost of
ownership.

- <font size=3>Reliability and Stability</font>
- **Robust Error Handling**: Advanced mechanisms to manage and mitigate errors, ensuring continuous operation.
- **Well-Developed Test Cases**: Passed all `Autobahn` test cases, fully compliant with `RFC 7692`. 99% unit test coverage, covering almost all conditional branches.
- **Robust Error Handling**: Advanced mechanisms to manage and mitigate errors, ensuring continuous operation.
- **Well-Developed Test Cases**: Passed all `Autobahn` test cases, fully compliant with `RFC 7692`. Unit test
coverage is almost 100%, covering all conditional branches.

### Benchmark

Expand Down Expand Up @@ -317,9 +319,12 @@ func WriteWithTimeout(socket *gws.Conn, p []byte, timeout time.Duration) error {

#### Pub / Sub

Use the event_emitter package to implement the publish-subscribe model. Wrap `gws.Conn` in a structure and implement the GetSubscriberID method to get the subscription ID, which must be unique. The subscription ID is used to identify the subscriber, who can only receive messages on the subject of his subscription.
Use the event_emitter package to implement the publish-subscribe model. Wrap `gws.Conn` in a structure and implement the
GetSubscriberID method to get the subscription ID, which must be unique. The subscription ID is used to identify the
subscriber, who can only receive messages on the subject of his subscription.

This example is useful for building chat rooms or push messages using gws. This means that a user can subscribe to one or more topics via websocket, and when a message is posted to that topic, all subscribers will receive the message.
This example is useful for building chat rooms or push messages using gws. This means that a user can subscribe to one
or more topics via websocket, and when a message is posted to that topic, all subscribers will receive the message.

```go
package main
Expand Down
2 changes: 1 addition & 1 deletion README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ GWS(Go WebSocket)是一个用 Go 编写的非常简单、快速、可靠且

- <font size=3>稳定可靠</font>
- **健壮的错误处理**: 管理和减少错误的先进机制,确保持续运行.
- **完善的测试用例**: 通过了所有 `Autobahn` 测试用例, 符合 `RFC 7692` 标准. 单元测试覆盖率达到 99%, 几乎覆盖所有条件分支.
- **完善的测试用例**: 通过了所有 `Autobahn` 测试用例, 符合 `RFC 7692` 标准. 单元测试覆盖率几乎达到 100%, 覆盖所有条件分支.

### 基准测试

Expand Down
7 changes: 7 additions & 0 deletions internal/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,3 +259,10 @@ func CheckErrors(errs ...error) error {
}
return nil
}

func Reduce[T any, S any](arr []T, initialValue S, reducer func(s S, i int, v T) S) S {
for index, value := range arr {
initialValue = reducer(initialValue, index, value)
}
return initialValue
}
30 changes: 30 additions & 0 deletions internal/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,3 +280,33 @@ func TestCheckErrors(t *testing.T) {
assert.Error(t, CheckErrors(err0, err1, err2))
assert.True(t, errors.Is(CheckErrors(err0, err1, err2), err2))
}

func TestReduce(t *testing.T) {
t.Run("", func(t *testing.T) {
var arr = []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
var sum = Reduce(arr, 0, func(summarize int, i int, item int) int {
return summarize + item
})
assert.Equal(t, sum, 55)
})

t.Run("", func(t *testing.T) {
var arr []int
var sum = Reduce(arr, 0, func(summarize int, i int, item int) int {
return summarize + item
})
assert.Equal(t, sum, 0)
})

t.Run("", func(t *testing.T) {
var payloads = [][]byte{
AlphabetNumeric.Generate(10),
AlphabetNumeric.Generate(20),
AlphabetNumeric.Generate(30),
}
var n = Reduce(payloads, 0, func(s int, i int, v []byte) int {
return s + len(v)
})
assert.Equal(t, n, 60)
})
}
17 changes: 14 additions & 3 deletions writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,25 @@ func (c *Conn) WriteMessage(opcode Opcode, payload []byte) error {
return err
}

// WriteV 批量写入文本/二进制消息, 文本消息应该使用UTF8编码
// writes batch text/binary messages, text messages should be encoded in UTF8.
func (c *Conn) WriteV(opcode Opcode, payloads ...[]byte) error {
var n = internal.Reduce(payloads, 0, func(s int, i int, v []byte) int { return s + len(v) })
var buf = binaryPool.Get(n)
for _, item := range payloads {
buf.Write(item)
}
var err = c.WriteMessage(opcode, buf.Bytes())
binaryPool.Put(buf)
return err
}

// WriteAsync 异步写
// 异步非阻塞地将消息写入到任务队列, 收到回调后才允许回收payload内存
// Asynchronously and non-blockingly write the message to the task queue, allowing the payload memory to be reclaimed only after a callback is received.
func (c *Conn) WriteAsync(opcode Opcode, payload []byte, callback func(error)) {
c.writeQueue.Push(func() {
var err = c.doWrite(opcode, payload)
c.emitError(err)
if callback != nil {
if err := c.WriteMessage(opcode, payload); callback != nil {
callback(err)
}
})
Expand Down
70 changes: 70 additions & 0 deletions writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,3 +328,73 @@ func TestRecovery(t *testing.T) {
as.NoError(client.WriteString("hi"))
time.Sleep(100 * time.Millisecond)
}

func TestConn_WriteV(t *testing.T) {
t.Run("", func(t *testing.T) {
var serverHandler = new(webSocketMocker)
var clientHandler = new(webSocketMocker)
var serverOption = &ServerOption{}
var clientOption = &ClientOption{}
var wg = &sync.WaitGroup{}
wg.Add(1)

serverHandler.onMessage = func(socket *Conn, message *Message) {
if bytes.Equal(message.Bytes(), []byte("hello, world!")) {
wg.Done()
}
}

server, client := newPeer(serverHandler, serverOption, clientHandler, clientOption)
go server.ReadLoop()
go client.ReadLoop()

var err = client.WriteV(OpcodeText, [][]byte{
[]byte("he"),
[]byte("llo"),
[]byte(", world!"),
}...)
assert.NoError(t, err)
wg.Wait()
})

t.Run("", func(t *testing.T) {
var serverHandler = new(webSocketMocker)
var clientHandler = new(webSocketMocker)
var serverOption = &ServerOption{
PermessageDeflate: PermessageDeflate{
Enabled: true,
ServerContextTakeover: true,
ClientContextTakeover: true,
Threshold: 1,
},
}
var clientOption = &ClientOption{
PermessageDeflate: PermessageDeflate{
Enabled: true,
ServerContextTakeover: true,
ClientContextTakeover: true,
Threshold: 1,
},
}
var wg = &sync.WaitGroup{}
wg.Add(1)

serverHandler.onMessage = func(socket *Conn, message *Message) {
if bytes.Equal(message.Bytes(), []byte("hello, world!")) {
wg.Done()
}
}

server, client := newPeer(serverHandler, serverOption, clientHandler, clientOption)
go server.ReadLoop()
go client.ReadLoop()

var err = client.WriteV(OpcodeText, [][]byte{
[]byte("he"),
[]byte("llo"),
[]byte(", world!"),
}...)
assert.NoError(t, err)
wg.Wait()
})
}

0 comments on commit fe3d21b

Please sign in to comment.