GWS
A simple, fast, and reliable WebSocket server and client
[![awesome](https://awesome.re/mentioned-badge-flat.svg)](https://github.com/avelino/awesome-go#networking) [![codecov](https://codecov.io/gh/lxzan/gws/graph/badge.svg?token=DJU7YXWN05)](https://codecov.io/gh/lxzan/gws) [![Go Test](https://github.com/lxzan/gws/actions/workflows/go.yml/badge.svg?branch=main)](https://github.com/lxzan/gws/actions/workflows/go.yml) [![go-reportcard](https://goreportcard.com/badge/github.com/lxzan/gws)](https://goreportcard.com/report/github.com/lxzan/gws) [![license](https://img.shields.io/badge/license-Apache%202.0-blue.svg)](LICENSE) [![go-version](https://img.shields.io/badge/go-%3E%3D1.18-30dff3?style=flat-square&logo=go)](https://github.com/lxzan/gws)
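### Introduction
GWS (Go WebSocket) is a very simple, fast, reliable, and feature-rich WebSocket implementation written in Go. It is designed for high-concurrency environments and for building services such as "API", "proxy", "game", "streaming media", and "message publish/subscribe". It provides a very simple API, so you can easily write your own server or client.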
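### Why choose GWS
- Simple and easy to use
  - **User-friendly**: A clear and concise `WebSocket` event interface makes interaction between server and client easy.
  - **Coding efficiency**: Minimizes the amount of code needed to implement complex solutions.
- Excellent performance
  - **High throughput, low latency**: Designed for fast transmission and reception of data, which makes it a good fit for time-sensitive applications.
  - **Low memory footprint**: A highly optimized memory-reuse system minimizes memory usage and lowers your costs.
- Stable and reliable
  - **Robust error handling**: Advanced mechanisms for managing and mitigating errors keep the service running continuously.
  - **Thorough test cases**: Passes all `Autobahn` test cases and complies with `RFC 7692`. Unit test coverage is almost 100% and covers all conditional branches.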
### Benchmark
#### IOPS (Echo Server)
GOMAXPROCS=4, Connection=1000, CompressEnabled=false
![performance](assets/performance-compress-disabled.png)
#### GoBench
```go
go test -benchmem -run=^$ -bench . github.com/lxzan/gws
goos: linux
goarch: amd64
pkg: github.com/lxzan/gws
cpu: AMD Ryzen 5 PRO 4650G with Radeon Graphics
BenchmarkConn_WriteMessage/compress_disabled-12 5263632 232.3 ns/op 24 B/op 1 allocs/op
BenchmarkConn_WriteMessage/compress_enabled-12 99663 11265 ns/op 386 B/op 1 allocs/op
BenchmarkConn_ReadMessage/compress_disabled-12 7809654 152.4 ns/op 8 B/op 0 allocs/op
BenchmarkConn_ReadMessage/compress_enabled-12 326257 3133 ns/op 81 B/op 1 allocs/op
PASS
ok github.com/lxzan/gws 17.231s
```
### Index
- [Introduction](#introduction)
- [Why choose GWS](#why-choose-gws)
- [Benchmark](#benchmark)
  - [IOPS (Echo Server)](#iops-echo-server)
  - [GoBench](#gobench)
- [Index](#index)
- [Features](#features)
- [Notice](#notice)
- [Install](#install)
- [Event](#event)
- [Quick Start](#quick-start)
- [Best Practice](#best-practice)
- [More Examples](#more-examples)
  - [KCP](#kcp)
  - [Proxy](#proxy)
  - [Broadcast](#broadcast)
  - [Write Timeout](#write-timeout)
  - [Pub/Sub](#pubsub)
- [Autobahn Test](#autobahn-test)
- [Communication](#communication)
- [Acknowledgments](#acknowledgments)
### Features
- [x] Event-driven API
- [x] Broadcast
- [x] Dial via proxy
- [x] Context takeover
- [x] Concurrent and asynchronous non-blocking writes
- [x] Passes all Autobahn test cases [Server](https://lxzan.github.io/gws/reports/servers/) / [Client](https://lxzan.github.io/gws/reports/clients/)
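### Notice
- Errors returned by the exported methods of `gws.Conn` can be ignored; they are already handled properly internally.
- Transferring large files risks blocking the connection.
- If you reuse an HTTP server, it is recommended to call `ReadLoop` in a new goroutine so that the memory held by the request context can be reclaimed promptly.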
### Install
```bash
go get -v github.com/lxzan/gws@latest
```
### Event
```go
type Event interface {
OnOpen(socket *Conn) // connection is established
OnClose(socket *Conn, err error) // received a close frame or input/output error occurs
OnPing(socket *Conn, payload []byte) // received a ping frame
OnPong(socket *Conn, payload []byte) // received a pong frame
OnMessage(socket *Conn, message *Message) // received a text/binary frame
}
```
### Quick Start
```go
package main
import "github.com/lxzan/gws"
func main() {
gws.NewServer(&gws.BuiltinEventHandler{}, nil).Run(":6666")
}
```
### Best Practice
```go
package main

import (
	"net/http"
	"time"

	"github.com/lxzan/gws"
)

const (
	PingInterval = 5 * time.Second
	PingWait     = 10 * time.Second
)

func main() {
	upgrader := gws.NewUpgrader(&Handler{}, &gws.ServerOption{
		ParallelEnabled:   true,                                 // enable parallel message processing
		Recovery:          gws.Recovery,                         // enable panic recovery
		PermessageDeflate: gws.PermessageDeflate{Enabled: true}, // enable compression
	})
	http.HandleFunc("/connect", func(writer http.ResponseWriter, request *http.Request) {
		socket, err := upgrader.Upgrade(writer, request)
		if err != nil {
			return
		}
		go func() {
			// ReadLoop blocks; calling it directly in the handler would keep
			// the request context from being garbage-collected.
			socket.ReadLoop()
		}()
	})
	http.ListenAndServe(":6666", nil)
}

type Handler struct{}

func (c *Handler) OnOpen(socket *gws.Conn) {
	_ = socket.SetDeadline(time.Now().Add(PingInterval + PingWait))
}

func (c *Handler) OnClose(socket *gws.Conn, err error) {}

func (c *Handler) OnPing(socket *gws.Conn, payload []byte) {
	_ = socket.SetDeadline(time.Now().Add(PingInterval + PingWait))
	_ = socket.WritePong(nil)
}

func (c *Handler) OnPong(socket *gws.Conn, payload []byte) {}

func (c *Handler) OnMessage(socket *gws.Conn, message *gws.Message) {
	defer message.Close()
	socket.WriteMessage(message.Opcode, message.Bytes())
}
```
### More Examples
#### KCP
- server
```go
package main

import (
	"log"

	"github.com/lxzan/gws"
	kcp "github.com/xtaci/kcp-go"
)

func main() {
	listener, err := kcp.Listen(":6666")
	if err != nil {
		log.Println(err.Error())
		return
	}
	app := gws.NewServer(&gws.BuiltinEventHandler{}, nil)
	app.RunListener(listener)
}
```
- client
```go
package main

import (
	"log"

	"github.com/lxzan/gws"
	kcp "github.com/xtaci/kcp-go"
)

func main() {
	conn, err := kcp.Dial("127.0.0.1:6666")
	if err != nil {
		log.Println(err.Error())
		return
	}
	app, _, err := gws.NewClientFromConn(&gws.BuiltinEventHandler{}, nil, conn)
	if err != nil {
		log.Println(err.Error())
		return
	}
	app.ReadLoop()
}
```
#### Proxy
Dial through a proxy, using the socks5 protocol.
```go
package main

import (
	"crypto/tls"
	"log"

	"github.com/lxzan/gws"
	"golang.org/x/net/proxy"
)

func main() {
	socket, _, err := gws.NewClient(new(gws.BuiltinEventHandler), &gws.ClientOption{
		Addr:      "wss://example.com/connect",
		TlsConfig: &tls.Config{InsecureSkipVerify: true},
		NewDialer: func() (gws.Dialer, error) {
			return proxy.SOCKS5("tcp", "127.0.0.1:1080", nil, nil)
		},
		PermessageDeflate: gws.PermessageDeflate{
			Enabled:               true,
			ServerContextTakeover: true,
			ClientContextTakeover: true,
		},
	})
	if err != nil {
		log.Println(err.Error())
		return
	}
	socket.ReadLoop()
}
```
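#### Broadcast
First create a Broadcaster instance, then call its Broadcast method in a loop to send the message to each client, and finally close the broadcaster to reclaim memory. The message is compressed only once during the whole process.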
```go
func Broadcast(conns []*gws.Conn, opcode gws.Opcode, payload []byte) {
	var b = gws.NewBroadcaster(opcode, payload)
	defer b.Close()
	for _, item := range conns {
		_ = b.Broadcast(item)
	}
}
```
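The "compress once, send to many" idea can be illustrated without gws at all. The following is a minimal standard-library sketch, not the Broadcaster implementation: `compressOnce`, `fanOut`, and `demo` are hypothetical names, and plain `io.Writer` values stand in for client connections.

```go
package main

import (
	"bytes"
	"compress/flate"
	"fmt"
	"io"
)

// compressOnce deflates payload a single time and returns the compressed bytes.
// (Hypothetical helper for illustration; not part of gws.)
func compressOnce(payload []byte) ([]byte, error) {
	var buf bytes.Buffer
	w, err := flate.NewWriter(&buf, flate.BestSpeed)
	if err != nil {
		return nil, err
	}
	if _, err := w.Write(payload); err != nil {
		return nil, err
	}
	if err := w.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// fanOut writes the same pre-compressed frame to every destination and
// returns the number of successful writes.
func fanOut(dsts []io.Writer, frame []byte) int {
	ok := 0
	for _, d := range dsts {
		if _, err := d.Write(frame); err == nil {
			ok++
		}
	}
	return ok
}

// demo compresses one payload and sends the identical bytes to two "clients".
func demo() (sent int, smaller, identical bool) {
	payload := bytes.Repeat([]byte("hello gws "), 100)
	frame, err := compressOnce(payload)
	if err != nil {
		panic(err)
	}
	a, b := &bytes.Buffer{}, &bytes.Buffer{}
	sent = fanOut([]io.Writer{a, b}, frame)
	smaller = len(frame) < len(payload)
	identical = bytes.Equal(a.Bytes(), b.Bytes())
	return
}

func main() {
	fmt.Println(demo())
}
```

The compression cost is paid once per message rather than once per recipient, which is why a per-connection `WriteMessage` loop is the more expensive choice when compression is enabled.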
#### Write Timeout
`SetDeadline` covers most use cases. To control the timeout of each individual write precisely, you need to wrap your own `WriteWithTimeout` function. Note that creating and destroying the `timer` incurs some extra overhead.
```go
func WriteWithTimeout(socket *gws.Conn, p []byte, timeout time.Duration) error {
	var sig = atomic.Uint32{}
	var timer = time.AfterFunc(timeout, func() {
		// The timer won the race: the write is still in flight, so close the connection.
		if sig.CompareAndSwap(0, 1) {
			socket.WriteClose(1000, []byte("write timeout"))
		}
	})
	var err = socket.WriteMessage(gws.OpcodeText, p)
	// The write won the race: cancel the pending timer.
	if sig.CompareAndSwap(0, 1) {
		timer.Stop()
	}
	return err
}
```
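The compare-and-swap guard used above can be tried in isolation. This is a minimal sketch under stated assumptions: `runWithTimeout` and `demo` are hypothetical names, the `write` and `onTimeout` callbacks stand in for `WriteMessage` and `WriteClose`, and the slow write is modelled as blocking until the "connection" is closed by the timeout.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// runWithTimeout runs write and calls onTimeout if write has not returned
// within timeout. The CAS on sig lets exactly one side win: either the timer
// fires onTimeout, or the write path stops the timer.
func runWithTimeout(timeout time.Duration, write func() error, onTimeout func()) error {
	var sig atomic.Uint32
	timer := time.AfterFunc(timeout, func() {
		if sig.CompareAndSwap(0, 1) {
			onTimeout()
		}
	})
	err := write()
	if sig.CompareAndSwap(0, 1) {
		timer.Stop()
	}
	return err
}

// demo exercises both outcomes and reports whether each write timed out.
func demo() (fastTimedOut, slowTimedOut bool) {
	// Fast write: returns immediately, so the timer is stopped before firing.
	var fired atomic.Bool
	_ = runWithTimeout(time.Hour, func() error { return nil }, func() { fired.Store(true) })
	fastTimedOut = fired.Load()

	// Slow write: blocks until the timeout callback "closes the connection".
	closed := make(chan struct{})
	err := runWithTimeout(10*time.Millisecond,
		func() error { <-closed; return errors.New("connection closed") },
		func() { close(closed) })
	slowTimedOut = err != nil
	return
}

func main() {
	fmt.Println(demo())
}
```

Because both paths go through the same CAS, the timeout callback can never run after a write that already completed in time, and a completed write never stops a timer that has already fired.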
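#### Pub/Sub
Use the event_emitter package to implement the publish/subscribe pattern. Wrap `gws.Conn` in a struct and implement the GetSubscriberID method to return the subscriber ID, which must be unique. The subscriber ID identifies the subscriber, and a subscriber only receives messages for the topics it has subscribed to.

This example is useful for building chat rooms or push messaging with gws. A user can subscribe to one or more topics over WebSocket, and when a message is published to a topic, every user subscribed to it receives the message.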
```go
package main

import (
	"github.com/lxzan/event_emitter"
	"github.com/lxzan/gws"
)

type Subscriber gws.Conn

func NewSubscriber(conn *gws.Conn) *Subscriber { return (*Subscriber)(conn) }

func (c *Subscriber) GetSubscriberID() int64 {
	userId, _ := c.GetMetadata().Load("userId")
	return userId.(int64)
}

func (c *Subscriber) GetMetadata() event_emitter.Metadata { return c.Conn().Session() }

func (c *Subscriber) Conn() *gws.Conn { return (*gws.Conn)(c) }

func Subscribe(em *event_emitter.EventEmitter[int64, *Subscriber], s *Subscriber, topic string) {
	em.Subscribe(s, topic, func(msg any) {
		_ = msg.(*gws.Broadcaster).Broadcast(s.Conn())
	})
}

func Publish(em *event_emitter.EventEmitter[int64, *Subscriber], topic string, msg []byte) {
	var broadcaster = gws.NewBroadcaster(gws.OpcodeText, msg)
	defer broadcaster.Close()
	em.Publish(topic, broadcaster)
}
```
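To make the role of the unique subscriber ID concrete, here is a minimal standard-library sketch of topic-based delivery. It is an illustrative stand-in and makes no claim about how event_emitter works internally: the `hub` type, its methods, and `demo` are hypothetical names, and string callbacks take the place of WebSocket connections.

```go
package main

import (
	"fmt"
	"sync"
)

// hub maps topic -> subscriber ID -> callback.
// (Illustrative stand-in; not event_emitter's implementation.)
type hub struct {
	mu     sync.RWMutex
	topics map[string]map[int64]func(msg string)
}

func newHub() *hub {
	return &hub{topics: make(map[string]map[int64]func(msg string))}
}

// Subscribe registers fn under (topic, id). Reusing an ID on the same topic
// replaces the earlier callback, which is why IDs must be unique per subscriber.
func (h *hub) Subscribe(id int64, topic string, fn func(msg string)) {
	h.mu.Lock()
	defer h.mu.Unlock()
	if h.topics[topic] == nil {
		h.topics[topic] = make(map[int64]func(msg string))
	}
	h.topics[topic][id] = fn
}

// Publish delivers msg to every subscriber of topic and returns how many
// callbacks were invoked. Callbacks must not call Subscribe on the same hub,
// because the read lock is held while they run.
func (h *hub) Publish(topic, msg string) int {
	h.mu.RLock()
	defer h.mu.RUnlock()
	for _, fn := range h.topics[topic] {
		fn(msg)
	}
	return len(h.topics[topic])
}

// demo subscribes user 1 to "chat" and user 2 to "chat" and "news".
func demo() (chatDelivered, newsDelivered int, inbox1, inbox2 []string) {
	h := newHub()
	h.Subscribe(1, "chat", func(m string) { inbox1 = append(inbox1, m) })
	h.Subscribe(2, "chat", func(m string) { inbox2 = append(inbox2, m) })
	h.Subscribe(2, "news", func(m string) { inbox2 = append(inbox2, m) })
	chatDelivered = h.Publish("chat", "hi")
	newsDelivered = h.Publish("news", "headline")
	return
}

func main() {
	fmt.Println(demo())
}
```

In this sketch user 1 never sees the "news" message, which mirrors the rule that a subscriber only receives messages for the topics it has subscribed to.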
### Autobahn Test
```bash
cd examples/autobahn
mkdir reports
docker run -it --rm \
-v ${PWD}/config:/config \
-v ${PWD}/reports:/reports \
crossbario/autobahn-testsuite \
wstest -m fuzzingclient -s /config/fuzzingclient.json
```
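### Communication
> On WeChat you need to be added as a friend before you can be invited to the group; please mention that you come from GitHub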
### Acknowledgments
- [crossbario/autobahn-testsuite](https://github.com/crossbario/autobahn-testsuite)
- [klauspost/compress](https://github.com/klauspost/compress)
- [lesismal/nbio](https://github.com/lesismal/nbio)