Skip to content

Commit

Permalink
improve buffer pool
Browse files Browse the repository at this point in the history
  • Loading branch information
lxzan committed Jul 13, 2023
1 parent c2f2c54 commit df5efa0
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 54 deletions.
9 changes: 5 additions & 4 deletions compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"sync/atomic"
)

const compressionRate = 3

type compressors struct {
serial uint64
size uint64
Expand Down Expand Up @@ -94,15 +96,14 @@ type decompressor struct {

// Decompress 解压
// 解压过程中, 切片可能会发生扩容, 造成bufferpool get/put失衡, 故需要指定大小, 让它们命中同一个bufferpool.
func (c *decompressor) Decompress(src *bytes.Buffer, vCap int) (*bytes.Buffer, error) {
func (c *decompressor) Decompress(src *bytes.Buffer) (*bytes.Buffer, int, error) {
c.Lock()
defer c.Unlock()

_, _ = src.Write(internal.FlateTail)
resetter := c.fr.(flate.Resetter)
_ = resetter.Reset(src, nil) // must return a null pointer
var dst = myBufferPool.Get(vCap)
var dst, idx = myBufferPool.Get(src.Len() * compressionRate)
_, err := io.CopyBuffer(dst, c.fr, c.buffer[0:])
myBufferPool.Put(src, src.Cap())
return dst, err
return dst, idx, err
}
9 changes: 6 additions & 3 deletions internal/others.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,12 @@ const (
const (
Lv1 = 128
Lv2 = 1024
Lv3 = 4 * 1024
Lv4 = 16 * 1024
Lv5 = 64 * 1024
Lv3 = 2 * 1024
Lv4 = 4 * 1024
Lv5 = 8 * 1024
Lv6 = 16 * 1024
Lv7 = 32 * 1024
Lv8 = 64 * 1024
)

type (
Expand Down
42 changes: 14 additions & 28 deletions internal/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,17 @@ import (
"sync"
)

const poolSize = 9

type BufferPool struct {
pools [6]*sync.Pool
limits [6]int
pools [poolSize]*sync.Pool
limits [poolSize]int
}

func NewBufferPool() *BufferPool {
var p BufferPool
p.limits = [6]int{0, Lv1, Lv2, Lv3, Lv4, Lv5}
for i := 1; i < 6; i++ {
p.limits = [poolSize]int{0, Lv1, Lv2, Lv3, Lv4, Lv5, Lv6, Lv7, Lv8}
for i := 1; i < poolSize; i++ {
var capacity = p.limits[i]
p.pools[i] = &sync.Pool{New: func() any {
return bytes.NewBuffer(make([]byte, 0, capacity))
Expand All @@ -22,41 +24,25 @@ func NewBufferPool() *BufferPool {
return &p
}

// compressedSize: 压缩后的大小
func (p *BufferPool) GetvCap(compressedSize int) int {
if compressedSize <= Lv2 {
return Lv2
}
if compressedSize <= Lv3 {
return Lv3
}
return Lv4
}

func (p *BufferPool) Put(b *bytes.Buffer, n int) {
if b == nil || n == 0 {
func (p *BufferPool) Put(b *bytes.Buffer, index int) {
if b == nil || index == 0 {
return
}
for i := 1; i < 6; i++ {
if n <= p.limits[i] {
if b.Cap() <= 4*p.limits[i] {
p.pools[i].Put(b)
}
return
}
if b.Cap() <= 2*p.limits[index] {
p.pools[index].Put(b)
}
}

func (p *BufferPool) Get(n int) *bytes.Buffer {
for i := 1; i < 6; i++ {
func (p *BufferPool) Get(n int) (*bytes.Buffer, int) {
for i := 1; i < poolSize; i++ {
if n <= p.limits[i] {
b := p.pools[i].Get().(*bytes.Buffer)
if b.Cap() < n {
b.Grow(p.limits[i])
}
b.Reset()
return b
return b, i
}
}
return bytes.NewBuffer(make([]byte, 0, n))
return bytes.NewBuffer(make([]byte, 0, n)), 0
}
6 changes: 3 additions & 3 deletions protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,8 @@ func (c *frameHeader) GetMaskKey() []byte {
}

type Message struct {
// 虚拟容量
vCap int
// 内存池下标索引
index int

// 操作码
Opcode Opcode
Expand All @@ -211,7 +211,7 @@ func (c *Message) Bytes() []byte {

// Close recycle buffer
func (c *Message) Close() error {
myBufferPool.Put(c.Data, c.vCap)
myBufferPool.Put(c.Data, c.index)
c.Data = nil
return nil
}
Expand Down
13 changes: 7 additions & 6 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ func (c *Conn) readMessage() error {
}

var fin = c.fh.GetFIN()
var p = myBufferPool.Get(contentLength).Bytes()
var buf, index = myBufferPool.Get(contentLength)
var p = buf.Bytes()
p = p[:contentLength]
if err := internal.ReadN(c.rbuf, p, contentLength); err != nil {
return err
Expand All @@ -111,7 +112,7 @@ func (c *Conn) readMessage() error {
c.continuationFrame.initialized = true
c.continuationFrame.compressed = compressed
c.continuationFrame.opcode = opcode
c.continuationFrame.buffer = myBufferPool.Get(contentLength)
c.continuationFrame.buffer = bytes.NewBuffer(make([]byte, 0, contentLength))
}

if !fin || (fin && opcode == OpcodeContinuation) {
Expand Down Expand Up @@ -140,17 +141,17 @@ func (c *Conn) readMessage() error {
c.continuationFrame.reset()
return myerr
case OpcodeText, OpcodeBinary:
return c.emitMessage(&Message{Opcode: opcode, Data: bytes.NewBuffer(p)}, compressed)
return c.emitMessage(&Message{index: index, Opcode: opcode, Data: bytes.NewBuffer(p)}, compressed)
default:
return internal.CloseNormalClosure
}
}

func (c *Conn) emitMessage(msg *Message, compressed bool) (err error) {
msg.vCap = msg.Data.Cap()
if compressed {
msg.vCap = myBufferPool.GetvCap(msg.Data.Len())
msg.Data, err = c.config.decompressors.Select().Decompress(msg.Data, msg.vCap)
data, index := msg.Data, msg.index
msg.Data, msg.index, err = c.config.decompressors.Select().Decompress(msg.Data)
myBufferPool.Put(data, index)
if err != nil {
return internal.NewError(internal.CloseInternalServerErr, err)
}
Expand Down
20 changes: 10 additions & 10 deletions writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (c *Conn) WriteString(s string) error {
// WriteAsync 异步非阻塞地写入消息
// Write messages asynchronously and non-blockingly
func (c *Conn) WriteAsync(opcode Opcode, payload []byte) error {
msg, err := c.prepare(opcode, payload)
msg, err := c.genFrame(opcode, payload)
if err != nil {
c.emitError(err)
return err
Expand All @@ -49,7 +49,7 @@ func (c *Conn) WriteAsync(opcode Opcode, payload []byte) error {
return
}
err = internal.WriteN(c.conn, msg.Bytes(), msg.Data.Len())
myBufferPool.Put(msg.Data, msg.vCap)
myBufferPool.Put(msg.Data, msg.index)
c.emitError(err)
})
return nil
Expand All @@ -68,17 +68,18 @@ func (c *Conn) WriteMessage(opcode Opcode, payload []byte) error {
// 执行写入逻辑, 关闭状态置为1后还能写, 以便发送关闭帧
// Execute the write logic, and write after the close state is set to 1, so that the close frame can be sent
func (c *Conn) doWrite(opcode Opcode, payload []byte) error {
msg, err := c.prepare(opcode, payload)
msg, err := c.genFrame(opcode, payload)
if err != nil {
return err
}

err = internal.WriteN(c.conn, msg.Bytes(), msg.Data.Len())
myBufferPool.Put(msg.Data, msg.vCap)
myBufferPool.Put(msg.Data, msg.index)
return err
}

func (c *Conn) prepare(opcode Opcode, payload []byte) (*Message, error) {
// 帧生成
func (c *Conn) genFrame(opcode Opcode, payload []byte) (*Message, error) {
// 不要删除 opcode == OpcodeText
if opcode == OpcodeText && !c.isTextValid(opcode, payload) {
return nil, internal.NewError(internal.CloseUnsupportedData, internal.ErrTextEncoding)
Expand All @@ -96,19 +97,18 @@ func (c *Conn) prepare(opcode Opcode, payload []byte) (*Message, error) {
var header = frameHeader{}
headerLength, maskBytes := header.GenerateHeader(c.isServer, true, false, opcode, n)
var totalSize = n + headerLength
var buf = myBufferPool.Get(totalSize)
var buf, index = myBufferPool.Get(totalSize)
buf.Write(header[:headerLength])
buf.Write(payload)
var contents = buf.Bytes()
if !c.isServer {
internal.MaskXOR(contents[headerLength:], maskBytes)
}
return &Message{Opcode: opcode, vCap: buf.Cap(), Data: buf}, nil
return &Message{Opcode: opcode, index: index, Data: buf}, nil
}

func (c *Conn) compressData(opcode Opcode, payload []byte) (*Message, error) {
var vCap = myBufferPool.GetvCap(len(payload) / 3)
var buf = myBufferPool.Get(vCap)
var buf, index = myBufferPool.Get(len(payload) / compressionRate)
buf.Write(myPadding[0:])
err := c.config.compressors.Select().Compress(payload, buf)
if err != nil {
Expand All @@ -126,5 +126,5 @@ func (c *Conn) compressData(opcode Opcode, payload []byte) (*Message, error) {
}
copy(contents[frameHeaderSize-headerLength:], header[:headerLength])
buf.Next(frameHeaderSize - headerLength)
return &Message{Opcode: opcode, Data: buf, vCap: vCap}, nil
return &Message{Opcode: opcode, Data: buf, index: index}, nil
}

0 comments on commit df5efa0

Please sign in to comment.