Skip to content

Latest commit

 

History

History
167 lines (129 loc) · 2.83 KB

README.md

File metadata and controls

167 lines (129 loc) · 2.83 KB

go-bigqueue

golang bigqueue is the same as redis quicklist. bigqueue is made up of ringbuffer and linkedlist.

feature:

  • thread safe, simple api, safe memory.
  • compared with chan []byte, bigqueue reduce gc latency.
  • compared with a big single ringbuffer []byte, bigqueue is easier to expand and shrink.

go-bigqueue

usage

func NewQueue(capacity int, maxCapacity int) *BigQueue
func NewQueueChains(bucketBytes int, maxBuckets int) *BigQueueChains
func Reset()
func Push(data []byte)
func Pop() []byte
func Len() int
func BucketLength() int
func Full() bool

example

simple

package main

import (
	"log"
	"math/rand"
	"strings"
	"time"

	"github.com/rfyiamcool/go-bigqueue"
)

func init() {
	rand.Seed(time.Now().Unix())
}

func randString(n int) string {
	return strings.Repeat(".", n)
}

func main() {
	var (
		count       = 100000            // 10w
		bucketBytes = 100 * 1024 * 1024 // 100mb
		maxBuckets  = 10                // 100mb * 10
	)

	queue := bigqueue.NewQueueChains(bucketBytes, maxBuckets)

	for i := 0; i < count; i++ {
		length := rand.Intn(1024)
		bs := "{{" + randString(length) + "}}"
		queue.Push([]byte(bs))
	}

	incr := 0
	for i := 0; i < count; i++ {
		val, err := queue.Pop()
		if err != nil {
			break
		}

		str := string(val)
		if strings.HasSuffix(str, "}}") && strings.HasPrefix(str, "{{") {
			incr++
			continue
		}

		panic("error")
	}

	if incr != count {
		log.Panicf("counter error")
	}

	log.Println("ok")
}

concurrent

package main

import (
	"log"
	"math/rand"
	"strings"
	"sync"
	"time"

	"github.com/rfyiamcool/go-bigqueue"
)

func randString(n int) string {
	const letterBytes = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
	b := make([]byte, n)
	for i := range b {
		b[i] = letterBytes[rand.Intn(len(letterBytes))]
	}
	return string(b)
}

func main() {
	var (
		count       = 100000           // 10w
		bucketBytes = 10 * 1024 * 1024 // 100mb
		maxBuckets  = 10               // 100mb * 10
		wg          = sync.WaitGroup{}
	)

	queue := bigqueue.NewQueueChains(bucketBytes, maxBuckets)

	incr := 0
	wg.Add(1)
	go func() {
		defer wg.Done()

		for i := 0; i < count; i++ {
			time.Sleep(time.Duration(rand.Intn(5)) * time.Microsecond) // < 10us

			val, err := queue.Pop()
			for err == bigqueue.ErrEmptyQueue {
				time.Sleep(1 * time.Microsecond)
				val, err = queue.Pop()
			}

			str := string(val)
			if strings.HasSuffix(str, "}}") && strings.HasPrefix(str, "{{") {
				incr++
				continue
			}

			return
		}
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()

		for i := 0; i < count; i++ {
			time.Sleep(time.Duration(rand.Intn(5)) * time.Microsecond) // < 10us

			length := rand.Intn(1024)
			bs := "{{" + randString(length) + "}}"
			queue.Push([]byte(bs))
		}
	}()
	wg.Wait()

	if incr != count {
		log.Panicf("counter error")
	}

	log.Println("ok")
}