Skip to content

Commit 65a2f77

Browse files
Implement a exclusive lock claimer
The claimer is based on the leases subsystem. We can use the leases to provide an exclusive lock to prevent us from adding or removing a file if there is a worker cleaning up files in the background. The base objectstore with lock requires some tests, but the rest is in place.
1 parent 2d02e99 commit 65a2f77

20 files changed

Lines changed: 869 additions & 136 deletions

cmd/jujud/agent/machine/manifolds.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -822,6 +822,7 @@ func commonManifolds(config ManifoldsConfig) dependency.Manifolds {
822822
StateName: stateName,
823823
TraceName: traceName,
824824
ServiceFactoryName: serviceFactoryName,
825+
LeaseManagerName: leaseManagerName,
825826
Clock: config.Clock,
826827
Logger: loggo.GetLogger("juju.worker.objectstore"),
827828
NewObjectStoreWorker: internalobjectstore.ObjectStoreFactory,

core/lease/interface.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@ const (
1515
// SingularControllerNamespace is the namespace used to manage
1616
// controller leases.
1717
SingularControllerNamespace = "singular-controller"
18+
19+
// ObjectStoreNamespace is the namespace used to manage
20+
// object store files.
21+
ObjectStoreNamespace = "object-store"
1822
)
1923

2024
// Claimer exposes lease acquisition and expiry notification capabilities.

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ require (
108108
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0
109109
go.opentelemetry.io/otel/sdk v1.21.0
110110
go.opentelemetry.io/otel/trace v1.21.0
111+
go.uber.org/goleak v1.3.0
111112
go.uber.org/mock v0.4.0
112113
golang.org/x/crypto v0.17.0
113114
golang.org/x/net v0.19.0

internal/objectstore/base.go

Lines changed: 52 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7,21 +7,25 @@ import (
77
"context"
88
"time"
99

10+
"github.com/juju/clock"
1011
"github.com/juju/errors"
11-
"github.com/juju/juju/core/objectstore"
1212
"gopkg.in/tomb.v2"
13+
14+
"github.com/juju/juju/core/objectstore"
1315
)
1416

15-
// Locker is the interface that is used to lock a file.
16-
type Locker interface {
17-
// Lock locks the file with the given hash.
18-
Lock(ctx context.Context, hash string) (LockExtender, error)
19-
// Unlock unlocks the file with the given hash.
20-
Unlock(ctx context.Context, hash string) error
17+
// Claimer is the interface that is used to claim an exclusive lock on a file.
18+
// The lock is used to prevent concurrent access to the same file for put
19+
// and remove operations.
20+
type Claimer interface {
21+
// Claim locks the file with the given hash.
22+
Claim(ctx context.Context, hash string) (ClaimExtender, error)
23+
// Release releases the file with the given hash.
24+
Release(ctx context.Context, hash string) error
2125
}
2226

23-
// LockExtender is the interface that is used to extend a lock.
24-
type LockExtender interface {
27+
// ClaimExtender is the interface that is used to extend a lock.
28+
type ClaimExtender interface {
2529
// Extend extends the lock for the given hash.
2630
Extend(ctx context.Context) error
2731

@@ -32,8 +36,19 @@ type LockExtender interface {
3236
type baseObjectStore struct {
3337
tomb tomb.Tomb
3438
metadataService objectstore.ObjectStoreMetadata
35-
locker Locker
39+
claimer Claimer
3640
logger Logger
41+
clock clock.Clock
42+
}
43+
44+
// Kill implements the worker.Worker interface.
45+
func (s *baseObjectStore) Kill() {
46+
s.tomb.Kill(nil)
47+
}
48+
49+
// Wait implements the worker.Worker interface.
50+
func (s *baseObjectStore) Wait() error {
51+
return s.tomb.Wait()
3752
}
3853

3954
// scopedContext returns a context that is in the scope of the worker lifetime.
@@ -45,53 +60,66 @@ func (w *baseObjectStore) scopedContext() (context.Context, context.CancelFunc)
4560
}
4661

4762
func (w *baseObjectStore) withLock(ctx context.Context, hash string, f func(context.Context) error) error {
63+
// If the context is already done, then don't waste any cycles trying
64+
// to claim the lock.
65+
if err := ctx.Err(); err != nil {
66+
return errors.Trace(err)
67+
}
68+
4869
// Lock the file with the given hash, so that we can't remove the file
4970
// while we're writing it.
50-
extender, err := w.locker.Lock(ctx, hash)
71+
extender, err := w.claimer.Claim(ctx, hash)
5172
if err != nil {
5273
return errors.Trace(err)
5374
}
5475

55-
defer w.locker.Unlock(ctx, hash)
76+
// Always release the lock when we're done. This is optimistic, because
77+
// when the duration of the lock has expired, the lock will be released
78+
// anyway.
79+
defer func() {
80+
_ = w.claimer.Release(ctx, hash)
81+
}()
5682

57-
newCtx, cancel := context.WithCancel(ctx)
83+
runnerCtx, cancel := context.WithCancel(ctx)
5884
defer cancel()
5985

6086
// Extend the lock for the duration of the operation.
6187
var runner tomb.Tomb
6288
runner.Go(func() error {
6389
defer cancel()
6490

65-
return f(newCtx)
91+
return f(runnerCtx)
6692
})
6793
runner.Go(func() error {
6894
defer cancel()
6995

70-
timer := time.NewTimer(extender.Duration())
71-
defer timer.Stop()
72-
7396
for {
7497
select {
75-
case <-newCtx.Done():
98+
case <-w.tomb.Dying():
7699
return nil
77-
case <-timer.C:
78-
if err := extender.Extend(newCtx); err != nil {
100+
case <-runnerCtx.Done():
101+
return nil
102+
103+
case <-w.clock.After(extender.Duration()):
104+
// Attempt to extend the lock if the function is still running.
105+
if err := extender.Extend(runnerCtx); err != nil {
79106
return errors.Trace(err)
80107
}
81-
case <-w.tomb.Dying():
82-
return nil
83108
}
84109
}
85110
})
86111

87112
select {
113+
case <-ctx.Done():
114+
return ctx.Err()
88115
case <-runner.Dying():
89116
return runner.Err()
90117
case <-w.tomb.Dying():
91118
// Ensure that we cancel the context if the runner is dying.
92119
runner.Kill(nil)
93-
<-runner.Dying()
94-
120+
if err := runner.Wait(); err != nil {
121+
return errors.Trace(err)
122+
}
95123
return tomb.ErrDying
96124
}
97125
}

0 commit comments

Comments
 (0)