Skip to content

Commit

Permalink
fix(spanner): wait for things to complete (#10095)
Browse files Browse the repository at this point in the history
As a good rule of thumb every single goroutine needs to be waited
to be completed, otherwise it's easy to introduce issues where the
created goroutine outlives the parent and accesses variables or
services that have been shutdown.
  • Loading branch information
egonelbre authored Aug 21, 2024
1 parent 1c80ee7 commit 7785cad
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 5 deletions.
20 changes: 16 additions & 4 deletions spanner/sessionclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ type sessionConsumer interface {
// will ensure that the sessions that are created are evenly distributed over
// all available channels.
type sessionClient struct {
waitWorkers sync.WaitGroup
mu sync.Mutex
closed bool
disableRouteToLeader bool
Expand Down Expand Up @@ -120,10 +121,17 @@ func newSessionClient(connPool gtransport.ConnPool, database, userAgent string,
}

func (sc *sessionClient) close() error {
sc.mu.Lock()
defer sc.mu.Unlock()
sc.closed = true
return sc.connPool.Close()
defer sc.waitWorkers.Wait()

var err error
func() {
sc.mu.Lock()
defer sc.mu.Unlock()

sc.closed = true
err = sc.connPool.Close()
}()
return err
}

// createSession creates one session for the database of the sessionClient. The
Expand Down Expand Up @@ -231,6 +239,7 @@ func (sc *sessionClient) batchCreateSessions(createSessionCount int32, distribut
createCountForChannel += remainder
}
if createCountForChannel > 0 {
sc.waitWorkers.Add(1)
go sc.executeBatchCreateSessions(client, createCountForChannel, sc.sessionLabels, sc.md, consumer)
numBeingCreated += createCountForChannel
}
Expand All @@ -241,11 +250,14 @@ func (sc *sessionClient) batchCreateSessions(createSessionCount int32, distribut
// executeBatchCreateSessions executes the gRPC call for creating a batch of
// sessions.
func (sc *sessionClient) executeBatchCreateSessions(client *vkit.Client, createCount int32, labels map[string]string, md metadata.MD, consumer sessionConsumer) {
defer sc.waitWorkers.Done()

ctx, cancel := context.WithTimeout(context.Background(), sc.batchTimeout)
defer cancel()
ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.BatchCreateSessions")
defer func() { trace.EndSpan(ctx, nil) }()
trace.TracePrintf(ctx, nil, "Creating a batch of %d sessions", createCount)

remainingCreateCount := createCount
for {
sc.mu.Lock()
Expand Down
2 changes: 1 addition & 1 deletion spanner/spannertest/inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func NewServer(laddr string) (*Server, error) {
s := &Server{
Addr: l.Addr().String(),
l: l,
srv: grpc.NewServer(),
srv: grpc.NewServer(grpc.WaitForHandlers(true)),
s: &server{
logf: func(format string, args ...interface{}) {
log.Printf("spannertest.inmem: "+format, args...)
Expand Down

0 comments on commit 7785cad

Please sign in to comment.