Skip to content

Commit

Permalink
avoid buffer expansion
Browse files Browse the repository at this point in the history
  • Loading branch information
lxzan committed Jul 24, 2024
1 parent 38b4bdf commit b85d0de
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 41 deletions.
86 changes: 49 additions & 37 deletions examples/push/main.go
Original file line number Diff line number Diff line change
@@ -1,67 +1,79 @@
package main

import (
"bufio"
"fmt"
"github.com/lxzan/gws"
"log"
"net"
"net/http"

"github.com/lxzan/gws"
)

func main() {
var app = gws.NewServer(new(Handler), nil)
var h = &Handler{conns: gws.NewConcurrentMap[string, *gws.Conn]()}

var upgrader = gws.NewUpgrader(h, &gws.ServerOption{
PermessageDeflate: gws.PermessageDeflate{
Enabled: true,
ServerContextTakeover: true,
ClientContextTakeover: true,
},
})

app.OnRequest = func(conn net.Conn, br *bufio.Reader, r *http.Request) {
socket, err := app.GetUpgrader().UpgradeFromConn(conn, br, r)
http.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
socket, err := upgrader.Upgrade(writer, request)
if err != nil {
log.Print(err.Error())
log.Println(err.Error())
return
}
var channel = make(chan []byte, 8)
var closer = make(chan struct{})
socket.Session().Store("channel", channel)
socket.Session().Store("closer", closer)
go socket.ReadLoop()
websocketKey := request.Header.Get("Sec-WebSocket-Key")
socket.Session().Store("websocketKey", websocketKey)
h.conns.Store(websocketKey, socket)
go func() {
for {
select {
case p := <-channel:
_ = socket.WriteMessage(gws.OpcodeText, p)
case <-closer:
return
}
}
socket.ReadLoop()
}()
})

go func() {
if err := http.ListenAndServe(":8000", nil); err != nil {
return
}
}()

for {
var msg = ""
if _, err := fmt.Scanf("%s\n", &msg); err != nil {
log.Println(err.Error())
return
}
h.Broadcast(msg)
}
}

log.Fatalf("%v", app.Run(":8000"))
func getSession[T any](s gws.SessionStorage, key string) (val T) {
if v, ok := s.Load(key); ok {
val, _ = v.(T)
}
return
}

type Handler struct {
gws.BuiltinEventHandler
conns *gws.ConcurrentMap[string, *gws.Conn]
}

func (c *Handler) getSession(socket *gws.Conn, key string) any {
v, _ := socket.Session().Load(key)
return v
}

func (c *Handler) Send(socket *gws.Conn, payload []byte) {
var channel = c.getSession(socket, "channel").(chan []byte)
select {
case channel <- payload:
default:
return
}
func (c *Handler) Broadcast(msg string) {
var b = gws.NewBroadcaster(gws.OpcodeText, []byte(msg))
c.conns.Range(func(key string, conn *gws.Conn) bool {
_ = b.Broadcast(conn)
return true
})
_ = b.Close()
}

func (c *Handler) OnClose(socket *gws.Conn, err error) {
var closer = c.getSession(socket, "closer").(chan struct{})
closer <- struct{}{}
websocketKey := getSession[string](socket.Session(), "websocketKey")
c.conns.Delete(websocketKey)
}

func (c *Handler) OnMessage(socket *gws.Conn, message *gws.Message) {
defer message.Close()
_ = socket.WriteMessage(message.Opcode, message.Bytes())
}
7 changes: 3 additions & 4 deletions writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (c *Conn) genFrame(opcode Opcode, payload internal.Payload, isBroadcast boo
return nil, internal.CloseMessageTooLarge
}

var buf = binaryPool.Get(n + frameHeaderSize)
var buf = binaryPool.Get(n*11/10 + frameHeaderSize)
buf.Write(framePadding[0:])

if c.pd.Enabled && opcode.isDataFrame() && n >= c.pd.Threshold {
Expand Down Expand Up @@ -203,9 +203,7 @@ func (c *Broadcaster) Broadcast(socket *Conn) error {
var idx = internal.SelectValue(socket.pd.Enabled, 1, 0)
var msg = c.msgs[idx]

msg.once.Do(func() {
msg.frame, msg.err = socket.genFrame(c.opcode, internal.Bytes(c.payload), true)
})
msg.once.Do(func() { msg.frame, msg.err = socket.genFrame(c.opcode, internal.Bytes(c.payload), true) })
if msg.err != nil {
return msg.err
}
Expand All @@ -225,6 +223,7 @@ func (c *Broadcaster) doClose() {
for _, item := range c.msgs {
if item != nil {
binaryPool.Put(item.frame)
item.frame = nil
}
}
}
Expand Down

0 comments on commit b85d0de

Please sign in to comment.