forked from UbuntuEvangelist/juju
-
Notifications
You must be signed in to change notification settings - Fork 0
/
relationunit.go
457 lines (418 loc) · 14.9 KB
/
relationunit.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
// Copyright 2013 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.
package state
import (
stderrors "errors"
"fmt"
"strings"
"github.com/juju/errors"
"github.com/juju/names"
jujutxn "github.com/juju/txn"
"gopkg.in/juju/charm.v6-unstable"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
"gopkg.in/mgo.v2/txn"
"github.com/juju/juju/network"
)
// RelationUnit holds information about a single unit in a relation, and
// allows clients to conveniently access unit-specific functionality.
type RelationUnit struct {
st *State
relation *Relation
unit *Unit
endpoint Endpoint
scope string
}
// Relation returns the relation associated with the unit.
func (ru *RelationUnit) Relation() *Relation {
return ru.relation
}
// Endpoint returns the relation endpoint that defines the unit's
// participation in the relation.
func (ru *RelationUnit) Endpoint() Endpoint {
return ru.endpoint
}
// PrivateAddress returns the private address of the unit.
func (ru *RelationUnit) PrivateAddress() (network.Address, error) {
return ru.unit.PrivateAddress()
}
// ErrCannotEnterScope indicates that a relation unit failed to enter its scope
// due to either the unit or the relation not being Alive.
var ErrCannotEnterScope = stderrors.New("cannot enter scope: unit or relation is not alive")
// ErrCannotEnterScopeYet indicates that a relation unit failed to enter its
// scope due to a required and pre-existing subordinate unit that is not Alive.
// Once that subordinate has been removed, a new one can be created.
var ErrCannotEnterScopeYet = stderrors.New("cannot enter scope yet: non-alive subordinate unit has not been removed")
// EnterScope ensures that the unit has entered its scope in the relation.
// When the unit has already entered its relation scope, EnterScope will report
// success but make no changes to state.
//
// Otherwise, assuming both the relation and the unit are alive, it will enter
// scope and create or overwrite the unit's settings in the relation according
// to the supplied map.
//
// If the unit is a principal and the relation has container scope, EnterScope
// will also create the required subordinate unit, if it does not already exist;
// this is because there's no point having a principal in scope if there is no
// corresponding subordinate to join it.
//
// Once a unit has entered a scope, it stays in scope without further
// intervention; the relation will not be able to become Dead until all units
// have departed its scopes.
func (ru *RelationUnit) EnterScope(settings map[string]interface{}) error {
db, closer := ru.st.newDB()
defer closer()
relationScopes, closer := db.GetCollection(relationScopesC)
defer closer()
// Verify that the unit is not already in scope, and abort without error
// if it is.
ruKey, err := ru.key(ru.unit.Name())
if err != nil {
return err
}
if count, err := relationScopes.FindId(ruKey).Count(); err != nil {
return err
} else if count != 0 {
return nil
}
// Collect the operations necessary to enter scope, as follows:
// * Check unit and relation state, and incref the relation.
// * TODO(fwereade): check unit status == params.StatusActive (this
// breaks a bunch of tests in a boring but noisy-to-fix way, and is
// being saved for a followup).
unitDocID, relationDocID := ru.unit.doc.DocID, ru.relation.doc.DocID
ops := []txn.Op{{
C: unitsC,
Id: unitDocID,
Assert: isAliveDoc,
}, {
C: relationsC,
Id: relationDocID,
Assert: isAliveDoc,
Update: bson.D{{"$inc", bson.D{{"unitcount", 1}}}},
}}
// * Create the unit settings in this relation, if they do not already
// exist; or completely overwrite them if they do. This must happen
// before we create the scope doc, because the existence of a scope doc
// is considered to be a guarantee of the existence of a settings doc.
settingsChanged := func() (bool, error) { return false, nil }
settingsColl, closer := db.GetCollection(settingsC)
defer closer()
if count, err := settingsColl.FindId(ruKey).Count(); err != nil {
return err
} else if count == 0 {
ops = append(ops, createSettingsOp(ruKey, settings))
} else {
var rop txn.Op
rop, settingsChanged, err = replaceSettingsOp(ru.st, ruKey, settings)
if err != nil {
return err
}
ops = append(ops, rop)
}
// * Create the scope doc.
rsDocID := ru.st.docID(ruKey)
ops = append(ops, txn.Op{
C: relationScopesC,
Id: rsDocID,
Assert: txn.DocMissing,
Insert: relationScopeDoc{
DocID: rsDocID,
Key: ruKey,
EnvUUID: ru.st.EnvironUUID(),
},
})
// * If the unit should have a subordinate, and does not, create it.
var existingSubName string
if subOps, subName, err := ru.subordinateOps(); err != nil {
return err
} else {
existingSubName = subName
ops = append(ops, subOps...)
}
// Now run the complete transaction, or figure out why we can't.
if err := ru.st.runTransaction(ops); err != txn.ErrAborted {
return err
}
if count, err := relationScopes.FindId(rsDocID).Count(); err != nil {
return err
} else if count != 0 {
// The scope document exists, so we're actually already in scope.
return nil
}
units, closer := db.GetCollection(unitsC)
defer closer()
relations, closer := db.GetCollection(relationsC)
defer closer()
// The relation or unit might no longer be Alive. (Note that there is no
// need for additional checks if we're trying to create a subordinate
// unit: this could fail due to the subordinate service's not being Alive,
// but this case will always be caught by the check for the relation's
// life (because a relation cannot be Alive if its services are not).)
if alive, err := isAliveWithSession(units, unitDocID); err != nil {
return err
} else if !alive {
return ErrCannotEnterScope
}
if alive, err := isAliveWithSession(relations, relationDocID); err != nil {
return err
} else if !alive {
return ErrCannotEnterScope
}
// Maybe a subordinate used to exist, but is no longer alive. If that is
// case, we will be unable to enter scope until that unit is gone.
if existingSubName != "" {
if alive, err := isAliveWithSession(units, existingSubName); err != nil {
return err
} else if !alive {
return ErrCannotEnterScopeYet
}
}
// It's possible that there was a pre-existing settings doc whose version
// has changed under our feet, preventing us from clearing it properly; if
// that is the case, something is seriously wrong (nobody else should be
// touching that doc under our feet) and we should bail out.
prefix := fmt.Sprintf("cannot enter scope for unit %q in relation %q: ", ru.unit, ru.relation)
if changed, err := settingsChanged(); err != nil {
return err
} else if changed {
return fmt.Errorf(prefix + "concurrent settings change detected")
}
// Apparently, all our assertions should have passed, but the txn was
// aborted: something is really seriously wrong.
return fmt.Errorf(prefix + "inconsistent state in EnterScope")
}
// subordinateOps returns any txn operations necessary to ensure sane
// subordinate state when entering scope. If a required subordinate unit
// exists and is Alive, its name will be returned as well; if one exists
// but is not Alive, ErrCannotEnterScopeYet is returned.
func (ru *RelationUnit) subordinateOps() ([]txn.Op, string, error) {
units, closer := ru.st.getCollection(unitsC)
defer closer()
if !ru.unit.IsPrincipal() || ru.endpoint.Scope != charm.ScopeContainer {
return nil, "", nil
}
related, err := ru.relation.RelatedEndpoints(ru.endpoint.ServiceName)
if err != nil {
return nil, "", err
}
if len(related) != 1 {
return nil, "", fmt.Errorf("expected single related endpoint, got %v", related)
}
serviceName, unitName := related[0].ServiceName, ru.unit.doc.Name
selSubordinate := bson.D{{"service", serviceName}, {"principal", unitName}}
var lDoc lifeDoc
if err := units.Find(selSubordinate).One(&lDoc); err == mgo.ErrNotFound {
service, err := ru.st.Service(serviceName)
if err != nil {
return nil, "", err
}
_, ops, err := service.addUnitOps(unitName, nil)
return ops, "", err
} else if err != nil {
return nil, "", err
} else if lDoc.Life != Alive {
return nil, "", ErrCannotEnterScopeYet
}
return []txn.Op{{
C: unitsC,
Id: lDoc.Id,
Assert: isAliveDoc,
}}, lDoc.Id, nil
}
// PrepareLeaveScope causes the unit to be reported as departed by watchers,
// but does not *actually* leave the scope, to avoid triggering relation
// cleanup.
func (ru *RelationUnit) PrepareLeaveScope() error {
relationScopes, closer := ru.st.getCollection(relationScopesC)
defer closer()
key, err := ru.key(ru.unit.Name())
if err != nil {
return err
}
if count, err := relationScopes.FindId(key).Count(); err != nil {
return err
} else if count == 0 {
return nil
}
ops := []txn.Op{{
C: relationScopesC,
Id: ru.st.docID(key),
Update: bson.D{{"$set", bson.D{{"departing", true}}}},
}}
return ru.st.runTransaction(ops)
}
// LeaveScope signals that the unit has left its scope in the relation.
// After the unit has left its relation scope, it is no longer a member
// of the relation; if the relation is dying when its last member unit
// leaves, it is removed immediately. It is not an error to leave a scope
// that the unit is not, or never was, a member of.
func (ru *RelationUnit) LeaveScope() error {
relationScopes, closer := ru.st.getCollection(relationScopesC)
defer closer()
key, err := ru.key(ru.unit.Name())
if err != nil {
return err
}
// The logic below is involved because we remove a dying relation
// with the last unit that leaves a scope in it. It handles three
// possible cases:
//
// 1. Relation is alive: just leave the scope.
//
// 2. Relation is dying, and other units remain: just leave the scope.
//
// 3. Relation is dying, and this is the last unit: leave the scope
// and remove the relation.
//
// In each of those cases, proper assertions are done to guarantee
// that the condition observed is still valid when the transaction is
// applied. If an abort happens, it observes the new condition and
// retries. In theory, a worst case will try at most all of the
// conditions once, because units cannot join a scope once its relation
// is dying.
//
// Keep in mind that in the first iteration of the loop it's possible
// to have a Dying relation with a smaller-than-real unit count, because
// Destroy changes the Life attribute in memory (units could join before
// the database is actually changed).
desc := fmt.Sprintf("unit %q in relation %q", ru.unit, ru.relation)
buildTxn := func(attempt int) ([]txn.Op, error) {
if attempt > 0 {
if err := ru.relation.Refresh(); errors.IsNotFound(err) {
return nil, jujutxn.ErrNoOperations
} else if err != nil {
return nil, err
}
}
count, err := relationScopes.FindId(key).Count()
if err != nil {
return nil, fmt.Errorf("cannot examine scope for %s: %v", desc, err)
} else if count == 0 {
return nil, jujutxn.ErrNoOperations
}
ops := []txn.Op{{
C: relationScopesC,
Id: ru.st.docID(key),
Assert: txn.DocExists,
Remove: true,
}}
if ru.relation.doc.Life == Alive {
ops = append(ops, txn.Op{
C: relationsC,
Id: ru.relation.doc.DocID,
Assert: bson.D{{"life", Alive}},
Update: bson.D{{"$inc", bson.D{{"unitcount", -1}}}},
})
} else if ru.relation.doc.UnitCount > 1 {
ops = append(ops, txn.Op{
C: relationsC,
Id: ru.relation.doc.DocID,
Assert: bson.D{{"unitcount", bson.D{{"$gt", 1}}}},
Update: bson.D{{"$inc", bson.D{{"unitcount", -1}}}},
})
} else {
relOps, err := ru.relation.removeOps("", ru.unit)
if err != nil {
return nil, err
}
ops = append(ops, relOps...)
}
return ops, nil
}
if err = ru.st.run(buildTxn); err != nil {
return fmt.Errorf("cannot leave scope for %s: %v", desc, err)
}
return nil
}
// InScope returns whether the relation unit has entered scope and not left it.
func (ru *RelationUnit) InScope() (bool, error) {
return ru.inScope(nil)
}
// Joined returns whether the relation unit has entered scope and neither left
// it nor prepared to leave it.
func (ru *RelationUnit) Joined() (bool, error) {
return ru.inScope(bson.D{{"departing", bson.D{{"$ne", true}}}})
}
// inScope returns whether a scope document exists satisfying the supplied
// selector.
func (ru *RelationUnit) inScope(sel bson.D) (bool, error) {
relationScopes, closer := ru.st.getCollection(relationScopesC)
defer closer()
key, err := ru.key(ru.unit.Name())
if err != nil {
return false, err
}
sel = append(sel, bson.D{{"_id", key}}...)
count, err := relationScopes.Find(sel).Count()
if err != nil {
return false, err
}
return count > 0, nil
}
// WatchScope returns a watcher which notifies of counterpart units
// entering and leaving the unit's scope.
func (ru *RelationUnit) WatchScope() *RelationScopeWatcher {
role := counterpartRole(ru.endpoint.Role)
scope := ru.scope + "#" + string(role)
return newRelationScopeWatcher(ru.st, scope, ru.unit.Name())
}
// Settings returns a Settings which allows access to the unit's settings
// within the relation.
func (ru *RelationUnit) Settings() (*Settings, error) {
key, err := ru.key(ru.unit.Name())
if err != nil {
return nil, err
}
return readSettings(ru.st, key)
}
// ReadSettings returns a map holding the settings of the unit with the
// supplied name within this relation. An error will be returned if the
// relation no longer exists, or if the unit's service is not part of the
// relation, or the settings are invalid; but mere non-existence of the
// unit is not grounds for an error, because the unit settings are
// guaranteed to persist for the lifetime of the relation, regardless
// of the lifetime of the unit.
func (ru *RelationUnit) ReadSettings(uname string) (m map[string]interface{}, err error) {
defer errors.DeferredAnnotatef(&err, "cannot read settings for unit %q in relation %q", uname, ru.relation)
if !names.IsValidUnit(uname) {
return nil, fmt.Errorf("%q is not a valid unit name", uname)
}
key, err := ru.key(uname)
if err != nil {
return nil, err
}
node, err := readSettings(ru.st, key)
if err != nil {
return nil, err
}
return node.Map(), nil
}
// key returns a string, based on the relation and the supplied unit name,
// which is used as a key for that unit within this relation in the settings,
// presence, and relationScopes collections.
func (ru *RelationUnit) key(uname string) (string, error) {
uparts := strings.Split(uname, "/")
sname := uparts[0]
ep, err := ru.relation.Endpoint(sname)
if err != nil {
return "", err
}
parts := []string{ru.scope, string(ep.Role), uname}
return strings.Join(parts, "#"), nil
}
// relationScopeDoc represents a unit which is in a relation scope.
// The relation, container, role, and unit are all encoded in the key.
type relationScopeDoc struct {
DocID string `bson:"_id"`
Key string `bson:"key"`
EnvUUID string `bson:"env-uuid"`
Departing bool
}
func (d *relationScopeDoc) unitName() string {
return unitNameFromScopeKey(d.Key)
}
func unitNameFromScopeKey(key string) string {
parts := strings.Split(key, "#")
return parts[len(parts)-1]
}