Skip to content

Commit

Permalink
improve buffer pool
Browse files Browse the repository at this point in the history
  • Loading branch information
lxzan committed Jul 14, 2023
1 parent df5efa0 commit e88b91a
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 49 deletions.
4 changes: 2 additions & 2 deletions compress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestFlate(t *testing.T) {

var buf = bytes.NewBufferString("")
buf.Write(compressedBuf.Bytes())
plainText, err := dps.Decompress(buf, 0)
plainText, _, err := dps.Decompress(buf)
if err != nil {
as.NoError(err)
return
Expand All @@ -50,7 +50,7 @@ func TestFlate(t *testing.T) {
var buf = bytes.NewBufferString("")
buf.Write(compressedBuf.Bytes())
buf.WriteString("1234")
_, err := dps.Decompress(buf, 0)
_, _, err := dps.Decompress(buf)
as.Error(err)
})
}
Expand Down
12 changes: 0 additions & 12 deletions internal/others.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,6 @@ const (
ThresholdV3 = math.MaxUint64
)

// buffer level
const (
Lv1 = 128
Lv2 = 1024
Lv3 = 2 * 1024
Lv4 = 4 * 1024
Lv5 = 8 * 1024
Lv6 = 16 * 1024
Lv7 = 32 * 1024
Lv8 = 64 * 1024
)

type (
ReadLener interface {
io.Reader
Expand Down
18 changes: 15 additions & 3 deletions internal/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,19 @@ import (
"sync"
)

const poolSize = 9
const (
poolSize = 10

Lv1 = 128
Lv2 = 1024
Lv3 = 2 * 1024
Lv4 = 4 * 1024
Lv5 = 8 * 1024
Lv6 = 16 * 1024
Lv7 = 32 * 1024
Lv8 = 64 * 1024
Lv9 = 128 * 1024
)

type BufferPool struct {
pools [poolSize]*sync.Pool
Expand All @@ -14,7 +26,7 @@ type BufferPool struct {

func NewBufferPool() *BufferPool {
var p BufferPool
p.limits = [poolSize]int{0, Lv1, Lv2, Lv3, Lv4, Lv5, Lv6, Lv7, Lv8}
p.limits = [poolSize]int{0, Lv1, Lv2, Lv3, Lv4, Lv5, Lv6, Lv7, Lv8, Lv9}
for i := 1; i < poolSize; i++ {
var capacity = p.limits[i]
p.pools[i] = &sync.Pool{New: func() any {
Expand All @@ -25,7 +37,7 @@ func NewBufferPool() *BufferPool {
}

func (p *BufferPool) Put(b *bytes.Buffer, index int) {
if b == nil || index == 0 {
if index == 0 || b == nil {
return
}
if b.Cap() <= 2*p.limits[index] {
Expand Down
34 changes: 16 additions & 18 deletions internal/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,47 +12,45 @@ func TestBufferPool(t *testing.T) {

for i := 0; i < 10; i++ {
var n = AlphabetNumeric.Intn(126)
var buf = pool.Get(n)
var buf, index = pool.Get(n)
as.Equal(128, buf.Cap())
as.Equal(0, buf.Len())
as.Equal(index, 1)
}
for i := 0; i < 10; i++ {
var buf = pool.Get(500)
var buf, index = pool.Get(500)
as.Equal(Lv2, buf.Cap())
as.Equal(0, buf.Len())
as.Equal(index, 2)
}
for i := 0; i < 10; i++ {
var buf = pool.Get(2000)
var buf, index = pool.Get(2000)
as.Equal(Lv3, buf.Cap())
as.Equal(0, buf.Len())
as.Equal(index, 3)
}
for i := 0; i < 10; i++ {
var buf = pool.Get(5000)
as.Equal(Lv4, buf.Cap())
var buf, index = pool.Get(5000)
as.Equal(Lv5, buf.Cap())
as.Equal(0, buf.Len())
as.Equal(index, 5)
}

{
pool.Put(bytes.NewBuffer(make([]byte, 2)), 2)
b := pool.Get(120)
b, index := pool.Get(120)
as.GreaterOrEqual(b.Cap(), 120)
as.Equal(index, 1)
}
{
pool.Put(bytes.NewBuffer(make([]byte, 2000)), 2000)
b := pool.Get(3000)
pool.Put(bytes.NewBuffer(make([]byte, 2000)), 4)
b, index := pool.Get(3000)
as.GreaterOrEqual(b.Cap(), 3000)
as.Equal(index, 4)
}

pool.Put(nil, 0)
pool.Put(NewBufferWithCap(0), 0)
as.GreaterOrEqual(pool.Get(128*1024).Cap(), 128*1024)
}

func TestBufferPool_GetvCap(t *testing.T) {
var as = assert.New(t)
var p = NewBufferPool()
as.Equal(Lv2, p.GetvCap(512))
as.Equal(Lv3, p.GetvCap(3*1024))
as.Equal(Lv4, p.GetvCap(8*1024))
as.Equal(Lv4, p.GetvCap(256*1024))
buffer, _ := pool.Get(128 * 1024)
as.GreaterOrEqual(buffer.Cap(), 128*1024)
}
29 changes: 15 additions & 14 deletions writer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package gws

import (
"bytes"
"errors"
"github.com/lxzan/gws/internal"
)
Expand Down Expand Up @@ -38,7 +39,7 @@ func (c *Conn) WriteString(s string) error {
// WriteAsync 异步非阻塞地写入消息
// Write messages asynchronously and non-blockingly
func (c *Conn) WriteAsync(opcode Opcode, payload []byte) error {
msg, err := c.genFrame(opcode, payload)
frame, index, err := c.genFrame(opcode, payload)
if err != nil {
c.emitError(err)
return err
Expand All @@ -48,8 +49,8 @@ func (c *Conn) WriteAsync(opcode Opcode, payload []byte) error {
if c.isClosed() {
return
}
err = internal.WriteN(c.conn, msg.Bytes(), msg.Data.Len())
myBufferPool.Put(msg.Data, msg.index)
err = internal.WriteN(c.conn, frame.Bytes(), frame.Len())
myBufferPool.Put(frame, index)
c.emitError(err)
})
return nil
Expand All @@ -68,21 +69,21 @@ func (c *Conn) WriteMessage(opcode Opcode, payload []byte) error {
// 执行写入逻辑, 关闭状态置为1后还能写, 以便发送关闭帧
// Execute the write logic, and write after the close state is set to 1, so that the close frame can be sent
func (c *Conn) doWrite(opcode Opcode, payload []byte) error {
msg, err := c.genFrame(opcode, payload)
frame, index, err := c.genFrame(opcode, payload)
if err != nil {
return err
}

err = internal.WriteN(c.conn, msg.Bytes(), msg.Data.Len())
myBufferPool.Put(msg.Data, msg.index)
err = internal.WriteN(c.conn, frame.Bytes(), frame.Len())
myBufferPool.Put(frame, index)
return err
}

// 帧生成
func (c *Conn) genFrame(opcode Opcode, payload []byte) (*Message, error) {
func (c *Conn) genFrame(opcode Opcode, payload []byte) (*bytes.Buffer, int, error) {
// 不要删除 opcode == OpcodeText
if opcode == OpcodeText && !c.isTextValid(opcode, payload) {
return nil, internal.NewError(internal.CloseUnsupportedData, internal.ErrTextEncoding)
return nil, 0, internal.NewError(internal.CloseUnsupportedData, internal.ErrTextEncoding)
}

if c.compressEnabled && opcode.isDataFrame() && len(payload) >= c.config.CompressThreshold {
Expand All @@ -91,7 +92,7 @@ func (c *Conn) genFrame(opcode Opcode, payload []byte) (*Message, error) {

var n = len(payload)
if n > c.config.WriteMaxPayloadSize {
return nil, internal.CloseMessageTooLarge
return nil, 0, internal.CloseMessageTooLarge
}

var header = frameHeader{}
Expand All @@ -104,20 +105,20 @@ func (c *Conn) genFrame(opcode Opcode, payload []byte) (*Message, error) {
if !c.isServer {
internal.MaskXOR(contents[headerLength:], maskBytes)
}
return &Message{Opcode: opcode, index: index, Data: buf}, nil
return buf, index, nil
}

func (c *Conn) compressData(opcode Opcode, payload []byte) (*Message, error) {
func (c *Conn) compressData(opcode Opcode, payload []byte) (*bytes.Buffer, int, error) {
var buf, index = myBufferPool.Get(len(payload) / compressionRate)
buf.Write(myPadding[0:])
err := c.config.compressors.Select().Compress(payload, buf)
if err != nil {
return nil, err
return nil, 0, err
}
var contents = buf.Bytes()
var payloadSize = buf.Len() - frameHeaderSize
if payloadSize > c.config.WriteMaxPayloadSize {
return nil, internal.CloseMessageTooLarge
return nil, 0, internal.CloseMessageTooLarge
}
var header = frameHeader{}
headerLength, maskBytes := header.GenerateHeader(c.isServer, true, true, opcode, payloadSize)
Expand All @@ -126,5 +127,5 @@ func (c *Conn) compressData(opcode Opcode, payload []byte) (*Message, error) {
}
copy(contents[frameHeaderSize-headerLength:], header[:headerLength])
buf.Next(frameHeaderSize - headerLength)
return &Message{Opcode: opcode, Data: buf, index: index}, nil
return buf, index, nil
}

0 comments on commit e88b91a

Please sign in to comment.