Skip to content

Commit

Permalink
TxStats: provides Getter/Inc functions
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Fu <[email protected]>
  • Loading branch information
fuweid committed Jan 4, 2023
1 parent b7f2da4 commit dd4458c
Show file tree
Hide file tree
Showing 8 changed files with 340 additions and 55 deletions.
8 changes: 5 additions & 3 deletions allocate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@ func TestTx_allocatePageStats(t *testing.T) {
pages: make(map[pgid]*page),
}

prePageCnt := tx.Stats().PageCount
txStats := tx.Stats()
prePageCnt := txStats.GetPageCount()
allocateCnt := f.free_count()

if _, err := tx.allocate(allocateCnt); err != nil {
t.Fatal(err)
}

if tx.Stats().PageCount != prePageCnt+int64(allocateCnt) {
t.Errorf("Allocated %d but got %d page in stats", allocateCnt, tx.Stats().PageCount)
txStats = tx.Stats()
if txStats.GetPageCount() != prePageCnt+int64(allocateCnt) {
t.Errorf("Allocated %d but got %d page in stats", allocateCnt, txStats.GetPageCount())
}
}
5 changes: 2 additions & 3 deletions bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package bbolt
import (
"bytes"
"fmt"
"sync/atomic"
"unsafe"
)

Expand Down Expand Up @@ -82,7 +81,7 @@ func (b *Bucket) Writable() bool {
// Do not use a cursor after the transaction is closed.
func (b *Bucket) Cursor() *Cursor {
// Update transaction statistics.
atomic.AddInt64(&b.tx.stats.CursorCount, 1)
b.tx.stats.IncCursorCount(1)

// Allocate and return a cursor.
return &Cursor{
Expand Down Expand Up @@ -682,7 +681,7 @@ func (b *Bucket) node(pgid pgid, parent *node) *node {
b.nodes[pgid] = n

// Update statistics.
atomic.AddInt64(&b.tx.stats.NodeCount, 1)
b.tx.stats.IncNodeCount(1)

return n
}
Expand Down
8 changes: 4 additions & 4 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1038,8 +1038,8 @@ func TestDB_Stats(t *testing.T) {
}

stats := db.Stats()
if stats.TxStats.PageCount != 2 {
t.Fatalf("unexpected TxStats.PageCount: %d", stats.TxStats.PageCount)
if stats.TxStats.GetPageCount() != 2 {
t.Fatalf("unexpected TxStats.PageCount: %d", stats.TxStats.GetPageCount())
} else if stats.FreePageN != 0 {
t.Fatalf("unexpected FreePageN != 0: %d", stats.FreePageN)
} else if stats.PendingPageN != 2 {
Expand Down Expand Up @@ -1122,8 +1122,8 @@ func TestDBStats_Sub(t *testing.T) {
b.TxStats.PageCount = 10
b.FreePageN = 14
diff := b.Sub(&a)
if diff.TxStats.PageCount != 7 {
t.Fatalf("unexpected TxStats.PageCount: %d", diff.TxStats.PageCount)
if diff.TxStats.GetPageCount() != 7 {
t.Fatalf("unexpected TxStats.PageCount: %d", diff.TxStats.GetPageCount())
}

// free page stats are copied from the receiver and not subtracted
Expand Down
12 changes: 6 additions & 6 deletions internal/btesting/btesting.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,14 +189,14 @@ func (db *DB) CopyTempFile() {
func (db *DB) PrintStats() {
var stats = db.Stats()
fmt.Printf("[db] %-20s %-20s %-20s\n",
fmt.Sprintf("pg(%d/%d)", stats.TxStats.PageCount, stats.TxStats.PageAlloc),
fmt.Sprintf("cur(%d)", stats.TxStats.CursorCount),
fmt.Sprintf("node(%d/%d)", stats.TxStats.NodeCount, stats.TxStats.NodeDeref),
fmt.Sprintf("pg(%d/%d)", stats.TxStats.GetPageCount(), stats.TxStats.GetPageAlloc()),
fmt.Sprintf("cur(%d)", stats.TxStats.GetCursorCount()),
fmt.Sprintf("node(%d/%d)", stats.TxStats.GetNodeCount(), stats.TxStats.GetNodeDeref()),
)
fmt.Printf(" %-20s %-20s %-20s\n",
fmt.Sprintf("rebal(%d/%v)", stats.TxStats.Rebalance, truncDuration(stats.TxStats.RebalanceTime)),
fmt.Sprintf("spill(%d/%v)", stats.TxStats.Spill, truncDuration(stats.TxStats.SpillTime)),
fmt.Sprintf("w(%d/%v)", stats.TxStats.Write, truncDuration(stats.TxStats.WriteTime)),
fmt.Sprintf("rebal(%d/%v)", stats.TxStats.GetRebalance(), truncDuration(stats.TxStats.GetRebalanceTime())),
fmt.Sprintf("spill(%d/%v)", stats.TxStats.GetSpill(), truncDuration(stats.TxStats.GetSpillTime())),
fmt.Sprintf("w(%d/%v)", stats.TxStats.GetWrite(), truncDuration(stats.TxStats.GetWriteTime())),
)
}

Expand Down
9 changes: 4 additions & 5 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"fmt"
"sort"
"sync/atomic"
"unsafe"
)

Expand Down Expand Up @@ -305,7 +304,7 @@ func (n *node) splitTwo(pageSize uintptr) (*node, *node) {
n.inodes = n.inodes[:splitIndex]

// Update the statistics.
atomic.AddInt64(&n.bucket.tx.stats.Split, 1)
n.bucket.tx.stats.IncSplit(1)

return n, next
}
Expand Down Expand Up @@ -392,7 +391,7 @@ func (n *node) spill() error {
}

// Update the statistics.
atomic.AddInt64(&tx.stats.Spill, 1)
tx.stats.IncSpill(1)
}

// If the root node split and created a new root then we need to spill that
Expand All @@ -414,7 +413,7 @@ func (n *node) rebalance() {
n.unbalanced = false

// Update statistics.
n.bucket.tx.stats.Rebalance++
n.bucket.tx.stats.IncRebalance(1)

// Ignore if node is above threshold (25%) and has enough keys.
var threshold = n.bucket.tx.db.pageSize / 4
Expand Down Expand Up @@ -548,7 +547,7 @@ func (n *node) dereference() {
}

// Update statistics.
atomic.AddInt64(&n.bucket.tx.stats.NodeDeref, 1)
n.bucket.tx.stats.IncNodeDeref(1)
}

// free adds the node's underlying page to the freelist.
Expand Down
192 changes: 158 additions & 34 deletions tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,8 @@ func (tx *Tx) Commit() error {
// Rebalance nodes which have had deletions.
var startTime = time.Now()
tx.root.rebalance()
if atomic.LoadInt64(&tx.stats.Rebalance) > 0 {
atomicAddDuration(&tx.stats.RebalanceTime, time.Since(startTime))
if tx.stats.GetRebalance() > 0 {
tx.stats.IncRebalanceTime(time.Since(startTime))
}

// spill data onto dirty pages.
Expand All @@ -162,7 +162,7 @@ func (tx *Tx) Commit() error {
tx.rollback()
return err
}
atomicAddDuration(&tx.stats.SpillTime, time.Since(startTime))
tx.stats.IncSpillTime(time.Since(startTime))

// Free the old root bucket.
tx.meta.root.root = tx.root.root
Expand Down Expand Up @@ -209,7 +209,7 @@ func (tx *Tx) Commit() error {
tx.rollback()
return err
}
atomicAddDuration(&tx.stats.WriteTime, time.Since(startTime))
tx.stats.IncWriteTime(time.Since(startTime))

// Finalize the transaction.
tx.close()
Expand Down Expand Up @@ -504,8 +504,8 @@ func (tx *Tx) allocate(count int) (*page, error) {
tx.pages[p.id] = p

// Update statistics.
atomic.AddInt64(&tx.stats.PageCount, int64(count))
atomic.AddInt64(&tx.stats.PageAlloc, int64(count*tx.db.pageSize))
tx.stats.IncPageCount(int64(count))
tx.stats.IncPageAlloc(int64(count * tx.db.pageSize))

return p, nil
}
Expand Down Expand Up @@ -540,7 +540,7 @@ func (tx *Tx) write() error {
}

// Update statistics.
atomic.AddInt64(&tx.stats.Write, 1)
tx.stats.IncWrite(1)

// Exit inner for loop if we've written all the chunks.
rem -= sz
Expand Down Expand Up @@ -599,7 +599,7 @@ func (tx *Tx) writeMeta() error {
}

// Update statistics.
atomic.AddInt64(&tx.stats.Write, 1)
tx.stats.IncWrite(1)

return nil
}
Expand Down Expand Up @@ -698,40 +698,164 @@ type TxStats struct {
}

func (s *TxStats) add(other *TxStats) {
s.PageCount += other.PageCount
s.PageAlloc += other.PageAlloc
s.CursorCount += other.CursorCount
s.NodeCount += other.NodeCount
s.NodeDeref += other.NodeDeref
s.Rebalance += other.Rebalance
s.RebalanceTime += other.RebalanceTime
s.Split += other.Split
s.Spill += other.Spill
s.SpillTime += other.SpillTime
s.Write += other.Write
s.WriteTime += other.WriteTime
s.IncPageCount(other.GetPageCount())
s.IncPageAlloc(other.GetPageAlloc())
s.IncCursorCount(other.GetCursorCount())
s.IncNodeCount(other.GetNodeCount())
s.IncNodeDeref(other.GetNodeDeref())
s.IncRebalance(other.GetRebalance())
s.IncRebalanceTime(other.GetRebalanceTime())
s.IncSplit(other.GetSplit())
s.IncSpill(other.GetSpill())
s.IncSpillTime(other.GetSpillTime())
s.IncWrite(other.GetWrite())
s.IncWriteTime(other.GetWriteTime())
}

// Sub calculates and returns the difference between two sets of transaction stats.
// This is useful when obtaining stats at two different points and time and
// you need the performance counters that occurred within that time span.
func (s *TxStats) Sub(other *TxStats) TxStats {
var diff TxStats
diff.PageCount = s.PageCount - other.PageCount
diff.PageAlloc = s.PageAlloc - other.PageAlloc
diff.CursorCount = s.CursorCount - other.CursorCount
diff.NodeCount = s.NodeCount - other.NodeCount
diff.NodeDeref = s.NodeDeref - other.NodeDeref
diff.Rebalance = s.Rebalance - other.Rebalance
diff.RebalanceTime = s.RebalanceTime - other.RebalanceTime
diff.Split = s.Split - other.Split
diff.Spill = s.Spill - other.Spill
diff.SpillTime = s.SpillTime - other.SpillTime
diff.Write = s.Write - other.Write
diff.WriteTime = s.WriteTime - other.WriteTime
diff.PageCount = s.GetPageCount() - other.GetPageCount()
diff.PageAlloc = s.GetPageAlloc() - other.GetPageAlloc()
diff.CursorCount = s.GetCursorCount() - other.GetCursorCount()
diff.NodeCount = s.GetNodeCount() - other.GetNodeCount()
diff.NodeDeref = s.GetNodeDeref() - other.GetNodeDeref()
diff.Rebalance = s.GetRebalance() - other.GetRebalance()
diff.RebalanceTime = s.GetRebalanceTime() - other.GetRebalanceTime()
diff.Split = s.GetSplit() - other.GetSplit()
diff.Spill = s.GetSpill() - other.GetSpill()
diff.SpillTime = s.GetSpillTime() - other.GetSpillTime()
diff.Write = s.GetWrite() - other.GetWrite()
diff.WriteTime = s.GetWriteTime() - other.GetWriteTime()
return diff
}

func atomicAddDuration(ptr *time.Duration, du time.Duration) {
atomic.AddInt64((*int64)(unsafe.Pointer(ptr)), int64(du))
// GetPageCount returns PageCount atomically.
func (s *TxStats) GetPageCount() int64 {
return atomic.LoadInt64(&s.PageCount)
}

// IncPageCount increases PageCount atomically and returns the new value.
func (s *TxStats) IncPageCount(delta int64) int64 {
return atomic.AddInt64(&s.PageCount, delta)
}

// GetPageAlloc returns PageAlloc atomically.
func (s *TxStats) GetPageAlloc() int64 {
return atomic.LoadInt64(&s.PageAlloc)
}

// IncPageAlloc increases PageAlloc atomically and returns the new value.
func (s *TxStats) IncPageAlloc(delta int64) int64 {
return atomic.AddInt64(&s.PageAlloc, delta)
}

// GetCursorCount returns CursorCount atomically.
func (s *TxStats) GetCursorCount() int64 {
return atomic.LoadInt64(&s.CursorCount)
}

// IncCursorCount increases CursorCount atomically and return the new value.
func (s *TxStats) IncCursorCount(delta int64) int64 {
return atomic.AddInt64(&s.CursorCount, delta)
}

// GetNodeCount returns NodeCount atomically.
func (s *TxStats) GetNodeCount() int64 {
return atomic.LoadInt64(&s.NodeCount)
}

// IncNodeCount increases NodeCount atomically and returns the new value.
func (s *TxStats) IncNodeCount(delta int64) int64 {
return atomic.AddInt64(&s.NodeCount, delta)
}

// GetNodeDeref returns NodeDeref atomically.
func (s *TxStats) GetNodeDeref() int64 {
return atomic.LoadInt64(&s.NodeDeref)
}

// IncNodeDeref increases NodeDeref atomically and returns the new value.
func (s *TxStats) IncNodeDeref(delta int64) int64 {
return atomic.AddInt64(&s.NodeDeref, delta)
}

// GetRebalance returns Rebalance atomically.
func (s *TxStats) GetRebalance() int64 {
return atomic.LoadInt64(&s.Rebalance)
}

// IncRebalance increases Rebalance atomically and returns the new value.
func (s *TxStats) IncRebalance(delta int64) int64 {
return atomic.AddInt64(&s.Rebalance, delta)
}

// GetRebalanceTime returns RebalanceTime atomically.
func (s *TxStats) GetRebalanceTime() time.Duration {
return atomicLoadDuration(&s.RebalanceTime)
}

// IncRebalanceTime increases RebalanceTime atomically and returns the new value.
func (s *TxStats) IncRebalanceTime(delta time.Duration) time.Duration {
return atomicAddDuration(&s.RebalanceTime, delta)
}

// GetSplit returns Split atomically.
func (s *TxStats) GetSplit() int64 {
return atomic.LoadInt64(&s.Split)
}

// IncSplit increases Split atomically and returns the new value.
func (s *TxStats) IncSplit(delta int64) int64 {
return atomic.AddInt64(&s.Split, delta)
}

// GetSpill returns Spill atomically.
func (s *TxStats) GetSpill() int64 {
return atomic.LoadInt64(&s.Spill)
}

// IncSpill increases Spill atomically and returns the new value.
func (s *TxStats) IncSpill(delta int64) int64 {
return atomic.AddInt64(&s.Spill, delta)
}

// GetSpillTime returns SpillTime atomically.
func (s *TxStats) GetSpillTime() time.Duration {
return atomicLoadDuration(&s.SpillTime)
}

// IncSpillTime increases SpillTime atomically and returns the new value.
func (s *TxStats) IncSpillTime(delta time.Duration) time.Duration {
return atomicAddDuration(&s.SpillTime, delta)
}

// GetWrite returns Write atomically.
func (s *TxStats) GetWrite() int64 {
return atomic.LoadInt64(&s.Write)
}

// IncWrite increases Write atomically and returns the new value.
func (s *TxStats) IncWrite(delta int64) int64 {
return atomic.AddInt64(&s.Write, delta)
}

// GetWriteTime returns WriteTime atomically.
func (s *TxStats) GetWriteTime() time.Duration {
return atomicLoadDuration(&s.WriteTime)
}

// IncWriteTime increases WriteTime atomically and returns the new value.
func (s *TxStats) IncWriteTime(delta time.Duration) time.Duration {
return atomicAddDuration(&s.WriteTime, delta)
}

func atomicAddDuration(ptr *time.Duration, du time.Duration) time.Duration {
return time.Duration(atomic.AddInt64((*int64)(unsafe.Pointer(ptr)), int64(du)))
}

func atomicLoadDuration(ptr *time.Duration) time.Duration {
return time.Duration(atomic.LoadInt64((*int64)(unsafe.Pointer(ptr))))
}
Loading

0 comments on commit dd4458c

Please sign in to comment.