Skip to content

Commit cba6bef

Browse files
committed
separation of dynamic and static buffer pools
1 parent 804e329 commit cba6bef

File tree

8 files changed

+41
-69
lines changed

8 files changed

+41
-69
lines changed

compress.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ func (c *decompressor) Decompress(src *bytes.Buffer) (*bytes.Buffer, int, error)
101101
_, _ = src.Write(internal.FlateTail)
102102
resetter := c.fr.(flate.Resetter)
103103
_ = resetter.Reset(src, nil) // must return a null pointer
104-
var dst, idx = myBufferPool.Get(src.Len() * compressionRate)
104+
var dst, idx = elasticPool.Get(src.Len() * compressionRate)
105105
_, err := c.fr.(io.WriterTo).WriteTo(dst)
106106
return dst, idx, err
107107
}

conn.go

Lines changed: 15 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -14,50 +14,21 @@ import (
1414
)
1515

1616
type Conn struct {
17-
// store session information
18-
SessionStorage SessionStorage
19-
20-
// is server
21-
isServer bool
22-
23-
// sub protocol
24-
subprotocol string
25-
26-
// whether to use compression
27-
compressEnabled bool
28-
29-
// tcp connection
30-
conn net.Conn
31-
32-
// configs
33-
config *Config
34-
35-
// read buffer
36-
br *bufio.Reader
37-
38-
// continuation frame
39-
continuationFrame continuationFrame
40-
41-
// frame header for read
42-
fh frameHeader
43-
44-
// WebSocket Event Handler
45-
handler Event
46-
47-
// whether server is closed
48-
closed uint32
49-
50-
// async read task queue
51-
readQueue channel
52-
53-
// async write task queue
54-
writeQueue workerQueue
55-
56-
// flate compressor
57-
compressor *compressor
58-
59-
// flate decompressor
60-
decompressor *decompressor
17+
SessionStorage SessionStorage // 会话
18+
isServer bool // 是否为服务器
19+
subprotocol string // 子协议
20+
conn net.Conn // 底层连接
21+
config *Config // 配置
22+
br *bufio.Reader // 读缓存
23+
continuationFrame continuationFrame // 连续帧
24+
fh frameHeader // 帧头
25+
handler Event // 事件处理器
26+
closed uint32 // 是否关闭
27+
readQueue channel // 消息处理队列
28+
writeQueue workerQueue // 发送队列
29+
compressEnabled bool // 是否压缩
30+
compressor *compressor // 压缩器
31+
decompressor *decompressor // 解压器
6132
}
6233

6334
func (c *Conn) init() *Conn {

init.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package gws
33
import "github.com/lxzan/gws/internal"
44

55
var (
6-
myBufferPool = internal.NewBufferPool()
7-
myPadding = frameHeader{}
6+
myPadding = frameHeader{} // 帧头填充物
7+
staticPool = internal.NewBufferPool() // 静态缓冲池
8+
elasticPool = internal.NewBufferPool() // 弹性缓冲池
89
)

internal/others.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,7 @@ var (
2727
// Add final block to squelch unexpected EOF error from flate reader.
2828
var FlateTail = []byte{0x00, 0x00, 0xff, 0xff, 0x01, 0x00, 0x00, 0xff, 0xff}
2929

30-
const (
31-
MagicNumber = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
32-
FrameHeaderSize = 14
33-
)
30+
const MagicNumber = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
3431

3532
const (
3633
ThresholdV1 = 125

internal/pool.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ import (
66
)
77

88
const (
9-
poolSize = 10
9+
poolSize = 10
10+
maxBufferSize = 256 * 1024
1011

1112
Lv1 = 128
1213
Lv2 = 1024
@@ -40,7 +41,7 @@ func (p *BufferPool) Put(b *bytes.Buffer, index int) {
4041
if index == 0 || b == nil {
4142
return
4243
}
43-
if b.Cap() <= 3*p.limits[index] {
44+
if b.Cap() <= maxBufferSize {
4445
p.pools[index].Put(b)
4546
}
4647
}

protocol.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,9 @@ func (c *frameHeader) GetMaskKey() []byte {
223223
}
224224

225225
type Message struct {
226+
// 是否压缩
227+
compressed bool
228+
226229
// 内存池下标索引
227230
index int
228231

@@ -243,7 +246,8 @@ func (c *Message) Bytes() []byte {
243246

244247
// Close recycle buffer
245248
func (c *Message) Close() error {
246-
myBufferPool.Put(c.Data, c.index)
249+
pool := internal.SelectValue(c.compressed, elasticPool, staticPool)
250+
pool.Put(c.Data, c.index)
247251
c.Data = nil
248252
return nil
249253
}

reader.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ func (c *Conn) readMessage() error {
9393
}
9494

9595
var fin = c.fh.GetFIN()
96-
var buf, index = myBufferPool.Get(contentLength)
96+
var buf, index = staticPool.Get(contentLength)
9797
var p = buf.Bytes()
9898
p = p[:contentLength]
9999
if err := internal.ReadN(c.br, p); err != nil {
@@ -109,7 +109,7 @@ func (c *Conn) readMessage() error {
109109

110110
if fin && opcode != OpcodeContinuation {
111111
internal.ResetBuffer(buf, p)
112-
return c.emitMessage(&Message{index: index, Opcode: opcode, Data: buf}, compressed)
112+
return c.emitMessage(&Message{index: index, Opcode: opcode, Data: buf, compressed: compressed})
113113
}
114114

115115
if !fin && opcode != OpcodeContinuation {
@@ -125,7 +125,7 @@ func (c *Conn) readMessage() error {
125125
if err := internal.WriteN(c.continuationFrame.buffer, p); err != nil {
126126
return err
127127
} else {
128-
myBufferPool.Put(buf, index)
128+
staticPool.Put(buf, index)
129129
}
130130
if c.continuationFrame.buffer.Len() > c.config.ReadMaxPayloadSize {
131131
return internal.CloseMessageTooLarge
@@ -134,17 +134,16 @@ func (c *Conn) readMessage() error {
134134
return nil
135135
}
136136

137-
msg := &Message{Opcode: c.continuationFrame.opcode, Data: c.continuationFrame.buffer}
138-
compressed = c.continuationFrame.compressed
137+
msg := &Message{Opcode: c.continuationFrame.opcode, Data: c.continuationFrame.buffer, compressed: c.continuationFrame.compressed}
139138
c.continuationFrame.reset()
140-
return c.emitMessage(msg, compressed)
139+
return c.emitMessage(msg)
141140
}
142141

143-
func (c *Conn) emitMessage(msg *Message, compressed bool) (err error) {
144-
if compressed {
142+
func (c *Conn) emitMessage(msg *Message) (err error) {
143+
if msg.compressed {
145144
data, index := msg.Data, msg.index
146145
msg.Data, msg.index, err = c.decompressor.Decompress(msg.Data)
147-
myBufferPool.Put(data, index)
146+
staticPool.Put(data, index)
148147
if err != nil {
149148
return internal.NewError(internal.CloseInternalServerErr, err)
150149
}

writer.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func (c *Conn) WriteAsync(opcode Opcode, payload []byte) error {
5454
return
5555
}
5656
err = internal.WriteN(c.conn, frame.Bytes())
57-
myBufferPool.Put(frame, index)
57+
staticPool.Put(frame, index)
5858
c.emitError(err)
5959
})
6060
return nil
@@ -80,7 +80,7 @@ func (c *Conn) doWrite(opcode Opcode, payload []byte) error {
8080
}
8181

8282
err = internal.WriteN(c.conn, frame.Bytes())
83-
myBufferPool.Put(frame, index)
83+
staticPool.Put(frame, index)
8484
return err
8585
}
8686

@@ -102,8 +102,7 @@ func (c *Conn) genFrame(opcode Opcode, payload []byte) (*bytes.Buffer, int, erro
102102

103103
var header = frameHeader{}
104104
headerLength, maskBytes := header.GenerateHeader(c.isServer, true, false, opcode, n)
105-
var totalSize = n + headerLength
106-
var buf, index = myBufferPool.Get(totalSize)
105+
var buf, index = staticPool.Get(n + headerLength)
107106
buf.Write(header[:headerLength])
108107
buf.Write(payload)
109108
var contents = buf.Bytes()
@@ -114,7 +113,7 @@ func (c *Conn) genFrame(opcode Opcode, payload []byte) (*bytes.Buffer, int, erro
114113
}
115114

116115
func (c *Conn) compressData(opcode Opcode, payload []byte) (*bytes.Buffer, int, error) {
117-
var buf, index = myBufferPool.Get(len(payload) / compressionRate)
116+
var buf, index = staticPool.Get(len(payload) + frameHeaderSize)
118117
buf.Write(myPadding[0:])
119118
err := c.compressor.Compress(payload, buf)
120119
if err != nil {
@@ -194,7 +193,7 @@ func (c *Broadcaster) Broadcast(socket *Conn) error {
194193
func (c *Broadcaster) doClose() {
195194
for _, item := range c.msgs {
196195
if item != nil {
197-
myBufferPool.Put(item.frame, item.index)
196+
staticPool.Put(item.frame, item.index)
198197
}
199198
}
200199
}

0 commit comments

Comments
 (0)