Commit

Merge pull request #109 from lxzan/writeReader

WriteFile

lxzan authored Aug 19, 2024
2 parents 3dc044f + bf42d3f commit a6641ec
Showing 16 changed files with 656 additions and 62 deletions.
1 change: 1 addition & 0 deletions .golangci.yaml
@@ -3,6 +3,7 @@ linters:
# Disable specific linter
# https://golangci-lint.run/usage/linters/#disabled-by-default
disable:
- maintidx
- mnd
- testpackage
- nlreturn
3 changes: 2 additions & 1 deletion README.md
@@ -102,8 +102,9 @@ ok github.com/lxzan/gws 17.231s
- [x] Broadcast
- [x] Dial via Proxy
- [x] Context-Takeover
- [x] Passed Autobahn Test Cases [Server](https://lxzan.github.io/gws/reports/servers/) / [Client](https://lxzan.github.io/gws/reports/clients/)
- [x] Concurrent & Asynchronous Non-Blocking Write
- [x] Segmented Writing of Large Files
- [x] Passed Autobahn Test Cases [Server](https://lxzan.github.io/gws/reports/servers/) / [Client](https://lxzan.github.io/gws/reports/clients/)

### Attention

1 change: 1 addition & 0 deletions README_CN.md
@@ -92,6 +92,7 @@ ok github.com/lxzan/gws 17.231s
- [x] 广播
- [x] 代理拨号
- [x] 上下文接管
- [x] 大文件分段写入
- [x] 支持并发和异步非阻塞写入
- [x] 通过所有 Autobahn 测试用例 [Server](https://lxzan.github.io/gws/reports/servers/) / [Client](https://lxzan.github.io/gws/reports/clients/)

14 changes: 12 additions & 2 deletions benchmark_test.go
@@ -68,7 +68,12 @@ func BenchmarkConn_ReadMessage(b *testing.B) {
conn: &benchConn{},
config: upgrader.option.getConfig(),
}
var buf, _ = conn1.genFrame(OpcodeText, internal.Bytes(githubData), false)
var buf, _ = conn1.genFrame(OpcodeText, internal.Bytes(githubData), frameConfig{
fin: true,
compress: conn1.pd.Enabled,
broadcast: false,
checkEncoding: false,
})

var reader = bytes.NewBuffer(buf.Bytes())
var conn2 = &Conn{
@@ -98,7 +103,12 @@ func BenchmarkConn_ReadMessage(b *testing.B) {
deflater: new(deflater),
}
conn1.deflater.initialize(false, conn1.pd, config.ReadMaxPayloadSize)
var buf, _ = conn1.genFrame(OpcodeText, internal.Bytes(githubData), false)
var buf, _ = conn1.genFrame(OpcodeText, internal.Bytes(githubData), frameConfig{
fin: true,
compress: conn1.pd.Enabled,
broadcast: false,
checkEncoding: false,
})

var reader = bytes.NewBuffer(buf.Bytes())
var conn2 = &Conn{
232 changes: 232 additions & 0 deletions bigfile.go
@@ -0,0 +1,232 @@
package gws

import (
"bytes"
"encoding/binary"
"errors"
"io"
"math"

"github.com/klauspost/compress/flate"
"github.com/lxzan/gws/internal"
)

const segmentSize = 128 * 1024

// 获取大文件压缩器
// Get bigDeflater
func (c *Conn) getBigDeflater() *bigDeflater {
if c.isServer {
return c.config.bdPool.Get()
}
return (*bigDeflater)(c.deflater.cpsWriter)
}

// 回收大文件压缩器
// Recycle bigDeflater
func (c *Conn) putBigDeflater(d *bigDeflater) {
if c.isServer {
c.config.bdPool.Put(d)
}
}

// 拆分io.Reader为小切片
// Split io.Reader into small slices
func (c *Conn) splitReader(r io.Reader, f func(index int, eof bool, p []byte) error) error {
var buf = binaryPool.Get(segmentSize)
defer binaryPool.Put(buf)

var p = buf.Bytes()[:segmentSize]
var n, index = 0, 0
var err error
for n, err = r.Read(p); err == nil || errors.Is(err, io.EOF); n, err = r.Read(p) {
eof := errors.Is(err, io.EOF)
if err = f(index, eof, p[:n]); err != nil {
return err
}
index++
if eof {
break
}
}
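// On success err is nil here: the callback's return value overwrites the io.EOF from the final Read,
// so only genuine read or callback errors are propagated.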
return err
}

// WriteFile 大文件写入
// 采用分段写入技术, 减少写入过程中的内存占用
// WriteFile writes a large payload from an io.Reader.
// It uses segmented writes to reduce memory usage during the write process.
func (c *Conn) WriteFile(opcode Opcode, payload io.Reader) error {
err := c.doWriteFile(opcode, payload)
c.emitError(err)
return err
}

func (c *Conn) doWriteFile(opcode Opcode, payload io.Reader) error {
c.mu.Lock()
defer c.mu.Unlock()

var cb = func(index int, eof bool, p []byte) error {
if index > 0 {
opcode = OpcodeContinuation
}
frame, err := c.genFrame(opcode, internal.Bytes(p), frameConfig{
fin: eof,
compress: false,
broadcast: false,
checkEncoding: false,
})
if err != nil {
return err
}
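// Set the RSV1 bit (0x40) on the first frame to mark the message as compressed (permessage-deflate, RFC 7692).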
if c.pd.Enabled && index == 0 {
frame.Bytes()[0] |= uint8(64)
}
if c.isClosed() {
return ErrConnClosed
}
err = internal.WriteN(c.conn, frame.Bytes())
binaryPool.Put(frame)
return err
}

if c.pd.Enabled {
var deflater = c.getBigDeflater()
var fw = &flateWriter{cb: cb}
err := deflater.Compress(payload, fw, c.getCpsDict(false), &c.cpsWindow)
c.putBigDeflater(deflater)
return err
} else {
return c.splitReader(payload, cb)
}
}

// 大文件压缩器
// Deflater for large payloads
type bigDeflater flate.Writer

// 创建大文件压缩器
// Create a bigDeflater
func newBigDeflater(isServer bool, options PermessageDeflate) *bigDeflater {
windowBits := internal.SelectValue(isServer, options.ServerMaxWindowBits, options.ClientMaxWindowBits)
if windowBits == 15 {
cpsWriter, _ := flate.NewWriter(nil, options.Level)
return (*bigDeflater)(cpsWriter)
} else {
cpsWriter, _ := flate.NewWriterWindow(nil, internal.BinaryPow(windowBits))
return (*bigDeflater)(cpsWriter)
}
}

func (c *bigDeflater) FlateWriter() *flate.Writer { return (*flate.Writer)(c) }

// Compress 压缩
// Compress compresses the contents of src and writes them to dst
func (c *bigDeflater) Compress(src io.Reader, dst *flateWriter, dict []byte, sw *slideWindow) error {
if err := compressTo(c.FlateWriter(), &readerWrapper{r: src, sw: sw}, dst, dict); err != nil {
return err
}
return dst.Flush()
}

// 写入代理
// 将切片透传给回调函数, 以实现分段写入功能
// Write proxy
// Passes slices through to the callback function to implement segmented writes.
type flateWriter struct {
index int
buffers []*bytes.Buffer
cb func(index int, eof bool, p []byte) error
}

// 是否可以执行回调函数
// Whether the callback function can be executed
func (c *flateWriter) shouldCall() bool {
var n = len(c.buffers)
if n < 2 {
return false
}
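// Only emit a buffer while at least 4 more bytes remain queued, presumably so the trailing
// flate flush marker is never sent early and Flush can still truncate it.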
var sum = 0
for i := 1; i < n; i++ {
sum += c.buffers[i].Len()
}
return sum >= 4
}

// 聚合写入, 减少syscall.write调用次数
// Aggregate writes, reducing the number of syscall.write calls
func (c *flateWriter) write(p []byte) {
if len(c.buffers) == 0 {
c.buffers = append(c.buffers, binaryPool.Get(segmentSize))
}
var n = len(c.buffers)
var tail = c.buffers[n-1]
if tail.Len()+len(p)+frameHeaderSize > tail.Cap() {
tail = binaryPool.Get(segmentSize)
c.buffers = append(c.buffers, tail)
}
tail.Write(p)
}

func (c *flateWriter) Write(p []byte) (n int, err error) {
c.write(p)
if c.shouldCall() {
err = c.cb(c.index, false, c.buffers[0].Bytes())
binaryPool.Put(c.buffers[0])
c.buffers = c.buffers[1:]
c.index++
}
return n, err
}

func (c *flateWriter) Flush() error {
var buf = c.buffers[0]
for i := 1; i < len(c.buffers); i++ {
buf.Write(c.buffers[i].Bytes())
binaryPool.Put(c.buffers[i])
}
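// Strip the trailing 0x00 0x00 0xFF 0xFF left by the final flate flush; permessage-deflate
// requires it to be omitted from the wire (RFC 7692).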
if n := buf.Len(); n >= 4 {
if tail := buf.Bytes()[n-4:]; binary.BigEndian.Uint32(tail) == math.MaxUint16 {
buf.Truncate(n - 4)
}
}
var err = c.cb(c.index, true, buf.Bytes())
c.index++
binaryPool.Put(buf)
return err
}

// 将io.Reader包装为io.WriterTo
// Wrapping io.Reader as io.WriterTo
type readerWrapper struct {
r io.Reader
sw *slideWindow
}

// WriteTo 写入内容, 并更新字典
// Write the contents, and update the dictionary
func (c *readerWrapper) WriteTo(w io.Writer) (int64, error) {
var buf = binaryPool.Get(segmentSize)
defer binaryPool.Put(buf)

var p = buf.Bytes()[:segmentSize]
var sum, n = 0, 0
var err error
for n, err = c.r.Read(p); err == nil || errors.Is(err, io.EOF); n, err = c.r.Read(p) {
eof := errors.Is(err, io.EOF)
if _, err = w.Write(p[:n]); err != nil {
return int64(sum), err
}
sum += n
_, _ = c.sw.Write(p[:n])
if eof {
break
}
}
return int64(sum), err
}

func compressTo(cpsWriter *flate.Writer, r io.WriterTo, w io.Writer, dict []byte) error {
cpsWriter.ResetDict(w, dict)
if _, err := r.WriteTo(cpsWriter); err != nil {
return err
}
return cpsWriter.Flush()
}
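
For context, below is a minimal usage sketch of the new API. The server setup follows the pattern from the project's README; the handler, file path, and listen address are illustrative assumptions rather than part of this commit.

```go
package main

import (
	"net/http"
	"os"

	"github.com/lxzan/gws"
)

type Handler struct{ gws.BuiltinEventHandler }

func (h *Handler) OnMessage(socket *gws.Conn, message *gws.Message) {
	defer message.Close()

	// Hypothetical file: WriteFile streams it in 128 KiB segments
	// instead of buffering the whole payload in memory.
	file, err := os.Open("/tmp/big.bin")
	if err != nil {
		return
	}
	defer file.Close()
	_ = socket.WriteFile(gws.OpcodeBinary, file)
}

func main() {
	upgrader := gws.NewUpgrader(&Handler{}, &gws.ServerOption{
		// With compression enabled, the stream is also compressed segment by segment.
		PermessageDeflate: gws.PermessageDeflate{Enabled: true},
	})
	http.HandleFunc("/connect", func(w http.ResponseWriter, r *http.Request) {
		socket, err := upgrader.Upgrade(w, r)
		if err != nil {
			return
		}
		go socket.ReadLoop()
	})
	_ = http.ListenAndServe(":6666", nil)
}
```

Note that doWriteFile holds the connection's write mutex for the entire transfer, so other writers on the same connection block until the file has been sent.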
3 changes: 3 additions & 0 deletions client.go
@@ -190,6 +190,9 @@ func (c *connector) handshake() (*Conn, *http.Response, error) {
writeQueue: workerQueue{maxConcurrency: 1},
readQueue: make(channel, c.option.ParallelGolimit),
}

// 压缩字典和解压字典内存开销比较大, 故使用懒加载
// The compression and decompression dictionaries have a large memory overhead, so they are initialized lazily.
if pd.Enabled {
socket.deflater.initialize(false, pd, c.option.ReadMaxPayloadSize)
if pd.ServerContextTakeover {
10 changes: 2 additions & 8 deletions compress.go
@@ -100,17 +100,11 @@ func (c *deflater) Decompress(src *bytes.Buffer, dict []byte) (*bytes.Buffer, er
func (c *deflater) Compress(src internal.Payload, dst *bytes.Buffer, dict []byte) error {
c.cpsLocker.Lock()
defer c.cpsLocker.Unlock()

c.cpsWriter.ResetDict(dst, dict)
if _, err := src.WriteTo(c.cpsWriter); err != nil {
return err
}
if err := c.cpsWriter.Flush(); err != nil {
if err := compressTo(c.cpsWriter, src, dst, dict); err != nil {
return err
}
if n := dst.Len(); n >= 4 {
compressedContent := dst.Bytes()
if tail := compressedContent[n-4:]; binary.BigEndian.Uint32(tail) == math.MaxUint16 {
if tail := dst.Bytes()[n-4:]; binary.BigEndian.Uint32(tail) == math.MaxUint16 {
dst.Truncate(n - 4)
}
}
4 changes: 4 additions & 0 deletions compress_test.go
@@ -259,3 +259,7 @@ func (c *writerTo) Len() int {
func (c *writerTo) WriteTo(w io.Writer) (n int64, err error) {
return 0, errors.New("1")
}

func (c *writerTo) Read(p []byte) (n int, err error) {
return 0, errors.New("1")
}
2 changes: 1 addition & 1 deletion examples/chatroom/main.go
@@ -61,7 +61,7 @@ func main() {

func MustLoad[T any](session gws.SessionStorage, key string) (v T) {
if value, exist := session.Load(key); exist {
v = value.(T)
v, _ = value.(T)
}
return
}