Skip to content

Commit

Permalink
Simplified Connection Pool Management
Browse files Browse the repository at this point in the history
  • Loading branch information
lxzan committed Nov 9, 2023
1 parent 0dda584 commit 37aaa6c
Show file tree
Hide file tree
Showing 12 changed files with 181 additions and 101 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,5 @@ jobs:
go-version: 1.18
- name: Test
run: go test -v ./...
- name: Bench
run: make bench
4 changes: 2 additions & 2 deletions benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func BenchmarkConn_ReadMessage(b *testing.B) {
conn: &benchConn{},
config: upgrader.option.getConfig(),
}
var buf, _, _ = conn1.genFrame(OpcodeText, githubData)
var buf, _ = conn1.genFrame(OpcodeText, githubData)

var reader = bytes.NewBuffer(buf.Bytes())
var conn2 = &Conn{
Expand Down Expand Up @@ -94,7 +94,7 @@ func BenchmarkConn_ReadMessage(b *testing.B) {
compressor: config.compressors.Select(),
decompressor: config.decompressors.Select(),
}
var buf, _, _ = conn1.genFrame(OpcodeText, githubData)
var buf, _ = conn1.genFrame(OpcodeText, githubData)

var reader = bytes.NewBuffer(buf.Bytes())
var conn2 = &Conn{
Expand Down
8 changes: 4 additions & 4 deletions compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,16 +109,16 @@ func (c *decompressor) reset(r io.Reader) {
}

// Decompress 解压
func (c *decompressor) Decompress(src *bytes.Buffer) (*bytes.Buffer, int, error) {
func (c *decompressor) Decompress(src *bytes.Buffer) (*bytes.Buffer, error) {
c.Lock()
defer c.Unlock()

_, _ = src.Write(flateTail)
c.reset(src)
if _, err := c.fr.(io.WriterTo).WriteTo(c.b); err != nil {
return nil, 0, err
return nil, err
}
var dst, idx = binaryPool.Get(c.b.Len())
var dst = binaryPool.Get(c.b.Len())
_, _ = c.b.WriteTo(dst)
return dst, idx, nil
return dst, nil
}
4 changes: 2 additions & 2 deletions compress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestFlate(t *testing.T) {

var buf = bytes.NewBufferString("")
buf.Write(compressedBuf.Bytes())
plainText, _, err := dps.Decompress(buf)
plainText, err := dps.Decompress(buf)
if err != nil {
as.NoError(err)
return
Expand All @@ -49,7 +49,7 @@ func TestFlate(t *testing.T) {
var buf = bytes.NewBufferString("")
buf.Write(compressedBuf.Bytes())
buf.WriteString("1234")
_, _, err := dps.Decompress(buf)
_, err := dps.Decompress(buf)
as.Error(err)
})
}
Expand Down
78 changes: 58 additions & 20 deletions internal/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,43 +20,81 @@ const (
)

type BufferPool struct {
pools [poolSize]*sync.Pool
limits [poolSize]int
pools []*sync.Pool
limits []int
}

func NewBufferPool() *BufferPool {
var p BufferPool
p.limits = [poolSize]int{0, Lv1, Lv2, Lv3, Lv4, Lv5, Lv6, Lv7, Lv8, Lv9}
var p = &BufferPool{
pools: make([]*sync.Pool, poolSize),
limits: []int{0, Lv1, Lv2, Lv3, Lv4, Lv5, Lv6, Lv7, Lv8, Lv9},
}
for i := 1; i < poolSize; i++ {
var capacity = p.limits[i]
p.pools[i] = &sync.Pool{New: func() any {
return bytes.NewBuffer(make([]byte, 0, capacity))
}}
}
return &p
return p
}

func (p *BufferPool) Put(b *bytes.Buffer, index int) {
if index == 0 || b == nil {
func (p *BufferPool) Put(b *bytes.Buffer) {
if b == nil || b.Cap() == 0 {
return
}
if b.Cap() <= p.limits[index] {
p.pools[index].Put(b)
if i := p.getIndex(uint32(b.Cap())); i > 0 {
p.pools[i].Put(b)
}
}

func (p *BufferPool) Get(n int) (*bytes.Buffer, int) {
for i := 1; i < poolSize; i++ {
if n <= p.limits[i] {
b := p.pools[i].Get().(*bytes.Buffer)
if b.Cap() < n {
b.Grow(p.limits[i])
}
b.Reset()
return b, i
}
func (p *BufferPool) Get(n int) *bytes.Buffer {
var index = p.getIndex(uint32(n))
if index == 0 {
return bytes.NewBuffer(make([]byte, 0, n))
}

buf := p.pools[index].Get().(*bytes.Buffer)
if buf.Cap() < n {
buf.Grow(p.limits[index])
}
buf.Reset()
return buf
}

func (p *BufferPool) getIndex(v uint32) int {
if v > Lv9 {
return 0
}
if v <= 128 {
return 1
}

v--
v |= v >> 1
v |= v >> 2
v |= v >> 4
v |= v >> 8
v |= v >> 16
v++

switch v {
case Lv3:
return 3
case Lv4:
return 4
case Lv5:
return 5
case Lv6:
return 6
case Lv7:
return 7
case Lv8:
return 8
case Lv9:
return 9
default:
return 2
}
return bytes.NewBuffer(make([]byte, 0, n)), 0
}

func NewPool[T any](f func() T) *Pool[T] {
Expand Down
81 changes: 65 additions & 16 deletions internal/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,45 +12,39 @@ func TestBufferPool(t *testing.T) {

for i := 0; i < 10; i++ {
var n = AlphabetNumeric.Intn(126)
var buf, index = pool.Get(n)
var buf = pool.Get(n)
as.Equal(128, buf.Cap())
as.Equal(0, buf.Len())
as.Equal(index, 1)
}
for i := 0; i < 10; i++ {
var buf, index = pool.Get(500)
var buf = pool.Get(500)
as.Equal(Lv2, buf.Cap())
as.Equal(0, buf.Len())
as.Equal(index, 2)
}
for i := 0; i < 10; i++ {
var buf, index = pool.Get(2000)
var buf = pool.Get(2000)
as.Equal(Lv3, buf.Cap())
as.Equal(0, buf.Len())
as.Equal(index, 3)
}
for i := 0; i < 10; i++ {
var buf, index = pool.Get(5000)
var buf = pool.Get(5000)
as.Equal(Lv5, buf.Cap())
as.Equal(0, buf.Len())
as.Equal(index, 5)
}

{
pool.Put(bytes.NewBuffer(make([]byte, 2)), 2)
b, index := pool.Get(120)
pool.Put(bytes.NewBuffer(make([]byte, 2)))
b := pool.Get(120)
as.GreaterOrEqual(b.Cap(), 120)
as.Equal(index, 1)
}
{
pool.Put(bytes.NewBuffer(make([]byte, 2000)), 4)
b, index := pool.Get(3000)
pool.Put(bytes.NewBuffer(make([]byte, 2000)))
b := pool.Get(3000)
as.GreaterOrEqual(b.Cap(), 3000)
as.Equal(index, 4)
}

pool.Put(nil, 0)
buffer, _ := pool.Get(256 * 1024)
pool.Put(nil)
buffer := pool.Get(256 * 1024)
as.GreaterOrEqual(buffer.Cap(), 256*1024)
}

Expand All @@ -61,3 +55,58 @@ func TestPool(t *testing.T) {
assert.Equal(t, 0, p.Get())
p.Put(1)
}

func TestBufferPool_GetIndex(t *testing.T) {
var p = NewBufferPool()
assert.Equal(t, p.getIndex(200*1024), 0)

assert.Equal(t, p.getIndex(0), 1)
assert.Equal(t, p.getIndex(1), 1)
assert.Equal(t, p.getIndex(10), 1)
assert.Equal(t, p.getIndex(100), 1)
assert.Equal(t, p.getIndex(128), 1)

assert.Equal(t, p.getIndex(200), 2)
assert.Equal(t, p.getIndex(1000), 2)
assert.Equal(t, p.getIndex(500), 2)
assert.Equal(t, p.getIndex(1024), 2)

assert.Equal(t, p.getIndex(2*1024), 3)
assert.Equal(t, p.getIndex(2000), 3)
assert.Equal(t, p.getIndex(1025), 3)

assert.Equal(t, p.getIndex(4*1024), 4)
assert.Equal(t, p.getIndex(3000), 4)
assert.Equal(t, p.getIndex(2*1024+1), 4)

assert.Equal(t, p.getIndex(8*1024), 5)
assert.Equal(t, p.getIndex(5000), 5)
assert.Equal(t, p.getIndex(4*1024+1), 5)

assert.Equal(t, p.getIndex(16*1024), 6)
assert.Equal(t, p.getIndex(10000), 6)
assert.Equal(t, p.getIndex(8*1024+1), 6)

assert.Equal(t, p.getIndex(32*1024), 7)
assert.Equal(t, p.getIndex(20000), 7)
assert.Equal(t, p.getIndex(16*1024+1), 7)

assert.Equal(t, p.getIndex(64*1024), 8)
assert.Equal(t, p.getIndex(40000), 8)
assert.Equal(t, p.getIndex(32*1024+1), 8)

assert.Equal(t, p.getIndex(128*1024), 9)
assert.Equal(t, p.getIndex(100000), 9)
assert.Equal(t, p.getIndex(64*1024+1), 9)
}

func BenchmarkPool_GetIndex(b *testing.B) {
var p = NewBufferPool()

b.ResetTimer()
for i := 0; i < b.N; i++ {
for j := 0; j < 1000000; j++ {
p.getIndex(uint32(j))
}
}
}
10 changes: 5 additions & 5 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ func (c *Conn) readMessage() error {
}

var fin = c.fh.GetFIN()
var buf, index = binaryPool.Get(contentLength + len(flateTail))
var buf = binaryPool.Get(contentLength + len(flateTail))
var p = buf.Bytes()[:contentLength]
var closer = Message{Data: buf, index: index}
var closer = Message{Data: buf}
defer closer.Close()

if err := internal.ReadN(c.br, p); err != nil {
Expand All @@ -112,9 +112,9 @@ func (c *Conn) readMessage() error {
if fin && opcode != OpcodeContinuation {
*(*[]byte)(unsafe.Pointer(buf)) = p
if !compressed {
closer.Data, closer.index = nil, 0
closer.Data = nil
}
return c.emitMessage(&Message{index: index, Opcode: opcode, Data: buf, compressed: compressed})
return c.emitMessage(&Message{Opcode: opcode, Data: buf, compressed: compressed})
}

if !fin && opcode != OpcodeContinuation {
Expand Down Expand Up @@ -149,7 +149,7 @@ func (c *Conn) dispatch(msg *Message) error {

func (c *Conn) emitMessage(msg *Message) (err error) {
if msg.compressed {
msg.Data, msg.index, err = c.decompressor.Decompress(msg.Data)
msg.Data, err = c.decompressor.Decompress(msg.Data)
if err != nil {
return internal.NewError(internal.CloseInternalServerErr, err)
}
Expand Down
2 changes: 1 addition & 1 deletion reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ func TestSegments(t *testing.T) {
go client.ReadLoop()

go func() {
frame, _, _ := client.genFrame(OpcodeText, testdata)
frame, _ := client.genFrame(OpcodeText, testdata)
data := frame.Bytes()
data[20] = 'x'
client.conn.Write(data)
Expand Down
27 changes: 0 additions & 27 deletions recovery.go

This file was deleted.

Loading

0 comments on commit 37aaa6c

Please sign in to comment.