Skip to content

Commit

Permalink
database/sql: reduce lock contention in Stmt.connStmt
Browse files Browse the repository at this point in the history
Previouslly, Stmt.connStmt calls DB.connIfFree on each Stmt.css.
Since Stmt.connStmt locks Stmt.mu, a concurrent use of Stmt causes lock
contention on Stmt.mu.
Additionally, DB.connIfFree locks DB.mu which is shared by DB.addDep and
DB.removeDep.

This change removes DB.connIfFree and makes use of a first unused
connection in idle connection pool to reduce lock contention
without making it complicated.

Fixes #9484

On EC2 c3.8xlarge (E5-2680 v2 @ 2.80GHz * 32 vCPU):

benchmark                           old ns/op     new ns/op     delta
BenchmarkManyConcurrentQuery-8      40249         34721         -13.73%
BenchmarkManyConcurrentQuery-16     45610         40176         -11.91%
BenchmarkManyConcurrentQuery-32     109831        43179         -60.69%

benchmark                           old allocs     new allocs     delta
BenchmarkManyConcurrentQuery-8      25             25             +0.00%
BenchmarkManyConcurrentQuery-16     25             25             +0.00%
BenchmarkManyConcurrentQuery-32     25             25             +0.00%

benchmark                           old bytes     new bytes     delta
BenchmarkManyConcurrentQuery-8      3980          3969          -0.28%
BenchmarkManyConcurrentQuery-16     3980          3982          +0.05%
BenchmarkManyConcurrentQuery-32     3993          3990          -0.08%

Change-Id: Ic96296922c465bac38a260018c58324dae1531d9
Reviewed-on: https://go-review.googlesource.com/2207
Reviewed-by: Mikio Hara <[email protected]>
  • Loading branch information
methane authored and cixtor committed Jan 24, 2015
1 parent 3acb9fd commit 1b61a97
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 111 deletions.
103 changes: 42 additions & 61 deletions src/database/sql/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"runtime"
"sort"
"sync"
"sync/atomic"
)

var drivers = make(map[string]driver.Driver)
Expand Down Expand Up @@ -211,6 +212,10 @@ var ErrNoRows = errors.New("sql: no rows in result set")
type DB struct {
driver driver.Driver
dsn string
// numClosed is an atomic counter which represents a total number of
// closed connections. Stmt.openStmt checks it before cleaning closed
// connections in Stmt.css.
numClosed uint64

mu sync.Mutex // protects following fields
freeConn []*driverConn
Expand Down Expand Up @@ -246,7 +251,7 @@ type driverConn struct {
// guarded by db.mu
inUse bool
onPut []func() // code (with db.mu held) run when conn is next returned
dbmuClosed bool // same as closed, but guarded by db.mu, for connIfFree
dbmuClosed bool // same as closed, but guarded by db.mu, for removeClosedStmtLocked
}

func (dc *driverConn) releaseConn(err error) {
Expand Down Expand Up @@ -329,6 +334,7 @@ func (dc *driverConn) finalClose() error {
dc.db.maybeOpenNewConnections()
dc.db.mu.Unlock()

atomic.AddUint64(&dc.db.numClosed, 1)
return err
}

Expand Down Expand Up @@ -683,42 +689,6 @@ var (
errConnBusy = errors.New("database/sql: internal sentinel error: conn is busy")
)

// connIfFree returns (wanted, nil) if wanted is still a valid conn and
// isn't in use.
//
// The error is errConnClosed if the connection if the requested connection
// is invalid because it's been closed.
//
// The error is errConnBusy if the connection is in use.
func (db *DB) connIfFree(wanted *driverConn) (*driverConn, error) {
db.mu.Lock()
defer db.mu.Unlock()
if wanted.dbmuClosed {
return nil, errConnClosed
}
if wanted.inUse {
return nil, errConnBusy
}
idx := -1
for ii, v := range db.freeConn {
if v == wanted {
idx = ii
break
}
}
if idx >= 0 {
db.freeConn = append(db.freeConn[:idx], db.freeConn[idx+1:]...)
wanted.inUse = true
return wanted, nil
}
// TODO(bradfitz): shouldn't get here. After Go 1.1, change this to:
// panic("connIfFree call requested a non-closed, non-busy, non-free conn")
// Which passes all the tests, but I'm too paranoid to include this
// late in Go 1.1.
// Instead, treat it like a busy connection:
return nil, errConnBusy
}

// putConnHook is a hook for testing.
var putConnHook func(*DB, *driverConn)

Expand Down Expand Up @@ -856,9 +826,10 @@ func (db *DB) prepare(query string) (*Stmt, error) {
return nil, err
}
stmt := &Stmt{
db: db,
query: query,
css: []connStmt{{dc, si}},
db: db,
query: query,
css: []connStmt{{dc, si}},
lastNumClosed: atomic.LoadUint64(&db.numClosed),
}
db.addDep(stmt, stmt)
db.putConn(dc, nil)
Expand Down Expand Up @@ -1293,6 +1264,10 @@ type Stmt struct {
// used if tx == nil and one is found that has idle
// connections. If tx != nil, txsi is always used.
css []connStmt

// lastNumClosed is copied from db.numClosed when Stmt is created
// without tx and closed connections in css are removed.
lastNumClosed uint64
}

// Exec executes a prepared statement with the given arguments and
Expand Down Expand Up @@ -1346,6 +1321,32 @@ func resultFromStatement(ds driverStmt, args ...interface{}) (Result, error) {
return driverResult{ds.Locker, resi}, nil
}

// removeClosedStmtLocked removes closed conns in s.css.
//
// To avoid lock contention on DB.mu, we do it only when
// s.db.numClosed - s.lastNum is large enough.
func (s *Stmt) removeClosedStmtLocked() {
t := len(s.css)/2 + 1
if t > 10 {
t = 10
}
dbClosed := atomic.LoadUint64(&s.db.numClosed)
if dbClosed-s.lastNumClosed < uint64(t) {
return
}

s.db.mu.Lock()
for i := 0; i < len(s.css); i++ {
if s.css[i].dc.dbmuClosed {
s.css[i] = s.css[len(s.css)-1]
s.css = s.css[:len(s.css)-1]
i--
}
}
s.db.mu.Unlock()
s.lastNumClosed = dbClosed
}

// connStmt returns a free driver connection on which to execute the
// statement, a function to call to release the connection, and a
// statement bound to that connection.
Expand All @@ -1372,35 +1373,15 @@ func (s *Stmt) connStmt() (ci *driverConn, releaseConn func(error), si driver.St
return ci, releaseConn, s.txsi.si, nil
}

for i := 0; i < len(s.css); i++ {
v := s.css[i]
_, err := s.db.connIfFree(v.dc)
if err == nil {
s.mu.Unlock()
return v.dc, v.dc.releaseConn, v.si, nil
}
if err == errConnClosed {
// Lazily remove dead conn from our freelist.
s.css[i] = s.css[len(s.css)-1]
s.css = s.css[:len(s.css)-1]
i--
}

}
s.removeClosedStmtLocked()
s.mu.Unlock()

// If all connections are busy, either wait for one to become available (if
// we've already hit the maximum number of open connections) or create a
// new one.
//
// TODO(bradfitz): or always wait for one? make configurable later?
dc, err := s.db.conn()
if err != nil {
return nil, nil, nil, err
}

// Do another pass over the list to see whether this statement has
// already been prepared on the connection assigned to us.
s.mu.Lock()
for _, v := range s.css {
if v.dc == dc {
Expand Down
78 changes: 28 additions & 50 deletions src/database/sql/sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1764,56 +1764,6 @@ func doConcurrentTest(t testing.TB, ct concurrentTest) {
wg.Wait()
}

func manyConcurrentQueries(t testing.TB) {
maxProcs, numReqs := 16, 500
if testing.Short() {
maxProcs, numReqs = 4, 50
}
defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(maxProcs))

db := newTestDB(t, "people")
defer closeDB(t, db)

stmt, err := db.Prepare("SELECT|people|name|")
if err != nil {
t.Fatal(err)
}
defer stmt.Close()

var wg sync.WaitGroup
wg.Add(numReqs)

reqs := make(chan bool)
defer close(reqs)

for i := 0; i < maxProcs*2; i++ {
go func() {
for range reqs {
rows, err := stmt.Query()
if err != nil {
t.Errorf("error on query: %v", err)
wg.Done()
continue
}

var name string
for rows.Next() {
rows.Scan(&name)
}
rows.Close()

wg.Done()
}
}()
}

for i := 0; i < numReqs; i++ {
reqs <- true
}

wg.Wait()
}

func TestIssue6081(t *testing.T) {
db := newTestDB(t, "people")
defer closeDB(t, db)
Expand Down Expand Up @@ -1985,3 +1935,31 @@ func BenchmarkConcurrentRandom(b *testing.B) {
doConcurrentTest(b, ct)
}
}

func BenchmarkManyConcurrentQueries(b *testing.B) {
b.ReportAllocs()
// To see lock contention in Go 1.4, 16~ cores and 128~ goroutines are required.
const parallelism = 16

db := newTestDB(b, "magicquery")
defer closeDB(b, db)
db.SetMaxIdleConns(runtime.GOMAXPROCS(0) * parallelism)

stmt, err := db.Prepare("SELECT|magicquery|op|op=?,millis=?")
if err != nil {
b.Fatal(err)
}
defer stmt.Close()

b.SetParallelism(parallelism)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
rows, err := stmt.Query("sleep", 1)
if err != nil {
b.Error(err)
return
}
rows.Close()
}
})
}

0 comments on commit 1b61a97

Please sign in to comment.