Skip to content

Commit

Permalink
feat: parallet
Browse files Browse the repository at this point in the history
  • Loading branch information
rfyiamcool committed Sep 30, 2022
1 parent 77e913e commit c7ab938
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 34 deletions.
13 changes: 4 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,14 @@ func init() {
}

func randString(n int) string {
const letterBytes = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
b := make([]byte, n)
for i := range b {
b[i] = letterBytes[rand.Intn(len(letterBytes))]
}
return string(b)
return strings.Repeat(".", n)
}

func main() {
var (
count = 100000 // 10w
bucketBytes = 10 * 1024 * 1024 // 100mb
maxBuckets = 10 // 100mb * 10
count = 100000 // 10w
bucketBytes = 100 * 1024 * 1024 // 100mb
maxBuckets = 10 // 100mb * 10
)

queue := bigqueue.NewQueueChains(bucketBytes, maxBuckets)
Expand Down
13 changes: 4 additions & 9 deletions example/concurrent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,14 @@ import (
)

func randString(n int) string {
const letterBytes = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
b := make([]byte, n)
for i := range b {
b[i] = letterBytes[rand.Intn(len(letterBytes))]
}
return string(b)
return strings.Repeat(".", n)
}

func main() {
var (
count = 100000 // 10w
bucketBytes = 10 * 1024 * 1024 // 100mb
maxBuckets = 10 // 100mb * 10
count = 100000 // 10w
bucketBytes = 100 * 1024 * 1024 // 100mb
maxBuckets = 10 // 100mb * 10
wg = sync.WaitGroup{}
)

Expand Down
108 changes: 108 additions & 0 deletions example/parallel/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package main

import (
"context"
"log"
"math/rand"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/rfyiamcool/go-bigqueue"
)

var (
empty = struct{}{}
)

func randString(n int) string {
return strings.Repeat(".", n)
}

func main() {
var (
ctx, cancel = context.WithCancel(context.TODO())

bucketBytes = 100 * 1024 * 1024 // 100mb
maxBuckets = 10 // 100mb * 10 = 1gb
noticeChan = make(chan struct{}, 20000)
count int64 = 2000000 // 200w
workerNum = 20

start = time.Now()
wg = sync.WaitGroup{}
)

defer cancel()

queue := bigqueue.NewQueueChains(bucketBytes, maxBuckets)

// start consumers
var readCounter int64
for i := 0; i < workerNum; i++ {
wg.Add(1)
go func() {
defer wg.Done()

for {
select {
case <-ctx.Done():
return
case <-noticeChan:
}

val, err := queue.Pop()
for err == bigqueue.ErrEmptyQueue {
continue
}

str := string(val)
if strings.HasSuffix(str, "}}") && strings.HasPrefix(str, "{{") {
atomic.AddInt64(&readCounter, 1)
continue
}
}
}()
}

// start producers
var incr int64
for i := 0; i < workerNum; i++ {
wg.Add(1)
go func() {
defer wg.Done()

for {
val := atomic.AddInt64(&incr, 1)
if val > count {
break
}

length := rand.Intn(1024)
bs := "{{" + randString(length) + "}}"
queue.Push([]byte(bs))

select {
case noticeChan <- empty:
}
}
}()
}

for {
if atomic.LoadInt64(&readCounter) == count {
cancel()
break
}
time.Sleep(1 * time.Millisecond)
}

wg.Wait()
if readCounter != count {
log.Panicf("counter error")
}

log.Printf("start %v producers, start consumers %v \n", workerNum, workerNum)
log.Printf("push %v msgs, consume %v msgs, cost: %v \n", count, readCounter, time.Since(start))
}
13 changes: 4 additions & 9 deletions example/simple/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,14 @@ func init() {
}

func randString(n int) string {
const letterBytes = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
b := make([]byte, n)
for i := range b {
b[i] = letterBytes[rand.Intn(len(letterBytes))]
}
return string(b)
return strings.Repeat(".", n)
}

func main() {
var (
count = 100000 // 10w
bucketBytes = 10 * 1024 * 1024 // 100mb
maxBuckets = 10 // 100mb * 10
count = 100000 // 10w
bucketBytes = 100 * 1024 * 1024 // 100mb
maxBuckets = 10 // 100mb * 10
)

queue := bigqueue.NewQueueChains(bucketBytes, maxBuckets)
Expand Down
8 changes: 1 addition & 7 deletions queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,6 @@ func TestBigQueueChain3(t *testing.T) {
t.Log(incr)
}

const letterBytes = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

func randString(n int) string {
b := make([]byte, n)
for i := range b {
b[i] = letterBytes[rand.Intn(len(letterBytes))]
}
return string(b)
return strings.Repeat(".", n)
}

0 comments on commit c7ab938

Please sign in to comment.