-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlease.go
305 lines (255 loc) · 8.28 KB
/
lease.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
// Copyright 2014 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.
package lease
import (
"fmt"
"time"
"github.com/juju/errors"
"github.com/juju/loggo"
)
const (
	// notificationTimeout bounds how long a release notification waits
	// for its subscriber to receive it. Nothing blocks on the
	// notification goroutines, so the value can be generous; it exists
	// only so that they do not linger indefinitely.
	notificationTimeout = time.Minute

	// maxDuration is the largest value a time.Duration can hold. It
	// serves as the "nothing is due to expire" horizon when leases are
	// scanned for expiry.
	maxDuration = time.Duration(1<<63 - 1)
)
var (
// singleton is the package-wide lease manager. It is allocated in init
// and handed out by Manager.
singleton *leaseManager
// LeaseClaimDeniedErr is returned by ClaimLease when the lease ends up
// held by an id other than the caller's.
LeaseClaimDeniedErr = errors.New("lease claim denied")
// NotLeaseOwnerErr is returned by releaseLease when the namespace has no
// lease or the lease belongs to a different id. It is created with
// errors.Unauthorizedf, which is what lets ReleaseLease detect it through
// errors.IsUnauthorized.
NotLeaseOwnerErr = errors.Unauthorizedf("caller did not own lease for namespace")
// logger is the logger used throughout this package.
logger = loggo.GetLogger("juju.lease")
)
func init() {
singleton = &leaseManager{
claimLease: make(chan Token),
releaseLease: make(chan releaseLeaseMsg),
leaseReleasedSub: make(chan leaseReleasedMsg),
copyOfTokens: make(chan []Token),
}
}
// leasePersistor is the storage back end the lease manager uses to keep
// lease tokens across restarts of the worker loop.
type leasePersistor interface {
	// WriteToken stores tok under id. The worker loop passes the
	// lease's namespace as id.
	WriteToken(id string, tok Token) error

	// RemoveToken deletes the token stored under id. The worker loop
	// passes the lease's namespace as id.
	RemoveToken(id string) error

	// PersistedTokens returns every token currently held in storage.
	PersistedTokens() ([]Token, error)
}
// WorkerLoop installs persistor on the shared lease manager and returns
// that manager's loop function, suitable for running inside a worker.
// The returned function takes a stop channel and returns nil once it
// receives from that channel, or an error if the persisted tokens
// cannot be loaded at start-up.
func WorkerLoop(persistor leasePersistor) func(<-chan struct{}) error {
	manager := singleton
	manager.leasePersistor = persistor
	return manager.workerLoop
}
// Token represents a lease claim: who (Id) holds the lease on what
// (Namespace), and until when (Expiration).
type Token struct {
	// Namespace names the thing being leased; the manager keeps at
	// most one token per namespace.
	Namespace string
	// Id identifies the holder of the lease.
	Id string
	// Expiration is the instant at which the lease lapses.
	Expiration time.Time
}
// Manager returns the package's single, shared lease manager.
func Manager() *leaseManager {
	// The package init function has always run by the time any caller
	// can reach this point, so singleton is never nil here.
	return singleton
}
//
// Messages for channels.
//
// releaseLeaseMsg travels in both directions over the releaseLease
// channel: ReleaseLease sends it with Token set, and the worker loop
// replies with Err set.
type releaseLeaseMsg struct {
// Token identifies (by namespace and id) the lease to release.
Token *Token
// Err carries the outcome of the release back to the caller; it is nil
// on success.
Err error
}
// leaseReleasedMsg is a subscription request: it asks the worker loop to
// signal Watcher whenever the lease for ForNamespace is released.
type leaseReleasedMsg struct {
// Watcher is the send side of the channel that receives an empty struct
// for each release.
Watcher chan<- struct{}
// ForNamespace is the namespace whose releases the subscriber wants.
ForNamespace string
}
// leaseManager is the caller-facing side of the lease worker loop. Its
// methods do no lease bookkeeping themselves; each one sends a request
// over one of the channels below, and the worker loop services it.
type leaseManager struct {
// leasePersistor is the storage back end; it is installed by WorkerLoop.
leasePersistor leasePersistor
// retrieveLease is neither initialized nor used by the code visible in
// this file.
retrieveLease chan Token
// claimLease carries a claim to the worker loop and the resulting
// active lease back to the claimant.
claimLease chan Token
// releaseLease carries a release request to the worker loop and the
// outcome back to the requester.
releaseLease chan releaseLeaseMsg
// leaseReleasedSub carries release-notification subscriptions to the
// worker loop.
leaseReleasedSub chan leaseReleasedMsg
// copyOfTokens carries a (nil) snapshot request to the worker loop and
// the snapshot of the lease cache back to the requester.
copyOfTokens chan []Token
}
// CopyOfLeaseTokens returns a copy of the lease tokens currently held
// by the manager. The copy is produced by the worker loop, so this call
// blocks until the loop services it.
func (m *leaseManager) CopyOfLeaseTokens() []Token {
	// The value sent is irrelevant; the send itself is the request. The
	// worker loop answers on the same channel.
	m.copyOfTokens <- nil
	snapshot := <-m.copyOfTokens
	return snapshot
}
// RetrieveLease returns the lease token currently stored for the given
// namespace, or the zero Token when the namespace has no lease.
func (m *leaseManager) RetrieveLease(namespace string) Token {
	tokens := m.CopyOfLeaseTokens()
	for i := range tokens {
		if tokens[i].Namespace == namespace {
			return tokens[i]
		}
	}
	return Token{}
}
// ClaimLease claims the lease on namespace for id, valid for forDur
// from now. If the lease is currently held by a different id the claim
// is refused and LeaseClaimDeniedErr is returned; a claim by the
// current holder replaces its existing lease. In every case the id of
// the lease's holder after the call is returned.
//
// The claim is decided by the worker loop, so this call blocks until
// the loop services it.
func (m *leaseManager) ClaimLease(namespace, id string, forDur time.Duration) (leaseOwnerId string, err error) {
	request := Token{
		Namespace:  namespace,
		Id:         id,
		Expiration: time.Now().Add(forDur),
	}
	m.claimLease <- request
	granted := <-m.claimLease

	if granted.Id == id {
		return granted.Id, nil
	}
	return granted.Id, LeaseClaimDeniedErr
}
// ReleaseLease releases the lease held for namespace by id.
//
// The request is handed to the worker loop, so this call blocks until
// the loop services it. A failure is annotated with the namespace and
// id. When the failure is an "unauthorized" error (the caller did not
// hold the lease), releasing is essentially a no-op: the error is
// logged as a warning and nil is returned so the caller is not burdened
// with it.
func (m *leaseManager) ReleaseLease(namespace, id string) (err error) {
	token := Token{Namespace: namespace, Id: id}
	m.releaseLease <- releaseLeaseMsg{Token: &token}
	response := <-m.releaseLease
	if response.Err == nil {
		return nil
	}

	err = errors.Annotatef(response.Err, `could not release lease for namespace "%s", id "%s"`, namespace, id)
	if errors.IsUnauthorized(err) {
		// The message embeds caller-supplied strings, so it must be
		// passed as an argument rather than used as the format itself;
		// otherwise a '%' in namespace or id would garble the log line.
		logger.Warningf("%s", err.Error())
		return nil
	}
	return err
}
// LeaseReleasedNotifier returns a channel that receives a value each
// time the lease for namespace is released. The channel is reusable
// across releases. Each notification waits at most notificationTimeout
// for the caller to receive it; after that the notification is dropped
// and a warning is logged. Nothing in this file closes the channel.
//
// Registration is performed by the worker loop, so this call blocks
// until the loop accepts the subscription.
func (m *leaseManager) LeaseReleasedNotifier(namespace string) (notifier <-chan struct{}) {
	watcher := make(chan struct{})
	subscription := leaseReleasedMsg{
		Watcher:      watcher,
		ForNamespace: namespace,
	}
	m.leaseReleasedSub <- subscription
	return watcher
}
// workerLoop serializes all lease requests onto a single goroutine. It
// loads the persisted tokens, expires any that have already lapsed, and
// then services claims, releases, subscriptions and snapshot requests
// until stop fires. It returns nil when stopped, or an error if the
// persisted tokens cannot be loaded.
//
// Every request that expects an answer receives exactly one reply on
// the channel it arrived on.
func (m *leaseManager) workerLoop(stop <-chan struct{}) error {
	// These data structures are local so that they are only ever
	// touched from this goroutine.
	releaseSubs := make(map[string][]chan<- struct{})

	// Pull everything off our data-store & check for expirations.
	leaseCache, err := populateTokenCache(m.leasePersistor)
	if err != nil {
		return err
	}
	nextExpiration := m.expireLeases(leaseCache, releaseSubs)

	for {
		select {
		case <-stop:
			return nil

		case claim := <-m.claimLease:
			lease := claimLease(leaseCache, claim)
			if lease.Id != claim.Id {
				// Denied: report the current holder and stop here. The
				// claimant receives only once, so a second reply would
				// block this loop forever, and the holder's lease needs
				// no re-persisting.
				m.claimLease <- lease
				continue
			}
			// NOTE(review): a persistence failure is dropped here; the
			// reply channel carries only a Token and so cannot report it.
			m.leasePersistor.WriteToken(lease.Namespace, lease)
			if lease.Expiration.Before(nextExpiration) {
				nextExpiration = lease.Expiration
			}
			m.claimLease <- lease

		case claim := <-m.releaseLease:
			namespace := claim.Token.Namespace
			var response releaseLeaseMsg
			response.Err = releaseLease(leaseCache, claim.Token)
			if response.Err != nil {
				// Nothing was released: reply once and stop here, so the
				// real holder's persisted token is not removed and no
				// release is announced.
				m.releaseLease <- response
				continue
			}
			// Unwind our layers from most volatile to least.
			response.Err = m.leasePersistor.RemoveToken(namespace)
			m.releaseLease <- response
			notifyOfRelease(releaseSubs[namespace], namespace)

		case subscription := <-m.leaseReleasedSub:
			subscribe(releaseSubs, subscription)

		case <-m.copyOfTokens:
			// Create a copy of the lease cache for use by code outside
			// this goroutine.
			m.copyOfTokens <- copyTokens(leaseCache)

		case <-time.After(nextExpiration.Sub(time.Now())):
			// The earliest known expiration has arrived: sweep the cache
			// and work out when to wake up next.
			nextExpiration = m.expireLeases(leaseCache, releaseSubs)
		}
	}
}
// expireLeases sweeps cache once. Every lease whose expiration is not
// in the future is logged, removed from cache and, if the removal
// succeeded, announced to the subscribers of its namespace. The return
// value is the earliest expiration among the leases that remain, or
// "now plus maxDuration" when none remain.
func (m *leaseManager) expireLeases(
	cache map[string]Token,
	subscribers map[string][]chan<- struct{},
) time.Time {
	// Start from the farthest possible horizon and pull it in as
	// still-live leases are encountered.
	earliest := time.Now().Add(maxDuration)

	// Deleting entries from a map while ranging over it is permitted in
	// Go, so expired leases can be dropped as they are found.
	for _, lease := range cache {
		if !lease.Expiration.After(time.Now()) {
			logger.Infof(`Lease for namespace "%s" has expired.`, lease.Namespace)
			if err := releaseLease(cache, &lease); err == nil {
				notifyOfRelease(subscribers[lease.Namespace], lease.Namespace)
			}
			continue
		}

		// Still live: it may be the next lease due to expire.
		if earliest.After(lease.Expiration) {
			earliest = lease.Expiration
			// NOTE(review): this writes to stdout rather than going
			// through the package logger.
			fmt.Printf("Setting next expiration to %s\n", earliest)
		}
	}
	return earliest
}
// copyTokens returns the tokens held in cache as a freshly allocated
// slice, in unspecified (map iteration) order. It returns nil when
// cache is empty.
func copyTokens(cache map[string]Token) []Token {
	if len(cache) == 0 {
		return nil
	}
	// The final length is known, so allocate once. The local is named
	// tokens so that it does not shadow the builtin copy.
	tokens := make([]Token, 0, len(cache))
	for _, t := range cache {
		tokens = append(tokens, t)
	}
	return tokens
}
// claimLease tries to record claim in cache. If the namespace is
// already leased to a different id, cache is left untouched and the
// existing lease is returned. Otherwise (no lease yet, or the same id
// claiming again) claim is stored, logged and returned.
func claimLease(cache map[string]Token, claim Token) Token {
	holder, held := cache[claim.Namespace]
	if held && holder.Id != claim.Id {
		return holder
	}

	cache[claim.Namespace] = claim
	logger.Infof(`"%s" obtained lease for "%s"`, claim.Id, claim.Namespace)
	return claim
}
// releaseLease removes the lease for claim.Namespace from cache,
// provided that it is currently held by claim.Id. When the namespace
// has no lease, or the lease belongs to another id, cache is left
// untouched and NotLeaseOwnerErr is returned.
func releaseLease(cache map[string]Token, claim *Token) error {
	holder, held := cache[claim.Namespace]
	if held && holder.Id == claim.Id {
		delete(cache, claim.Namespace)
		logger.Infof(`"%s" released lease for namespace "%s"`, claim.Id, claim.Namespace)
		return nil
	}
	return NotLeaseOwnerErr
}
// subscribe appends the subscription's watcher to the list of watchers
// registered in subMap for the subscription's namespace.
func subscribe(subMap map[string][]chan<- struct{}, subscription leaseReleasedMsg) {
	ns := subscription.ForNamespace
	// Appending to the nil slice of a not-yet-seen namespace creates
	// its list.
	subMap[ns] = append(subMap[ns], subscription.Watcher)
}
// notifyOfRelease signals each subscriber that the lease for namespace
// has been released. It returns without waiting for delivery: every
// subscriber is signalled from its own goroutine, which gives up and
// logs a warning if the subscriber has not received within
// notificationTimeout.
func notifyOfRelease(subscribers []chan<- struct{}, namespace string) {
	logger.Infof(`Notifying namespace "%s" subscribers that its lease has been released.`, namespace)
	for i := range subscribers {
		watcher := subscribers[i]
		// Deliver from a separate goroutine so that a slow or absent
		// listener cannot stall the caller.
		go func() {
			select {
			case <-time.After(notificationTimeout):
				// TODO(kate): Remove this bad-citizen from the
				// notifier's list.
				logger.Warningf("A notification timed out after %s.", notificationTimeout)
			case watcher <- struct{}{}:
			}
		}()
	}
}
// populateTokenCache loads every token from persistor and returns them
// indexed by namespace. If two persisted tokens share a namespace, the
// later one in the returned list wins. An error from the persistor is
// returned unchanged, with a nil map.
func populateTokenCache(persistor leasePersistor) (map[string]Token, error) {
	persisted, err := persistor.PersistedTokens()
	if err != nil {
		return nil, err
	}

	cache := make(map[string]Token, len(persisted))
	for i := range persisted {
		cache[persisted[i].Namespace] = persisted[i]
	}
	return cache, nil
}