-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathminimumunits.go
199 lines (187 loc) · 6.34 KB
/
minimumunits.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
// Copyright 2012, 2013 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.
package state
import (
"github.com/juju/errors"
jujutxn "github.com/juju/txn"
"gopkg.in/mgo.v2/bson"
"gopkg.in/mgo.v2/txn"
)
// minUnitsDoc keeps track of relevant changes on the service's MinUnits field
// and on the number of alive units for the service.
//
// Lifecycle of a document:
//   - it is created when MinUnits is set to a non zero value;
//   - it is deleted when either the associated service is destroyed or
//     MinUnits is restored to zero;
//   - its Revno is increased when either MinUnits for the service is
//     increased or a unit is destroyed.
//
// TODO(frankban): the MinUnitsWatcher reacts to changes by sending events,
// each one describing one or more services. A worker reacts to those events
// ensuring the number of alive units for the service is never less than the
// service's MinUnits: new units are added if required.
type minUnitsDoc struct {
// DocID is stored as the document's _id.
DocID string `bson:"_id"`
// ServiceName is safe to be used here in place of its globalKey, since
// the referred entity type is always the Service.
ServiceName string
// EnvUUID is filled with the state's environment UUID when the document
// is inserted by setMinUnitsOps.
EnvUUID string `bson:"env-uuid"`
// Revno is a change counter: minUnitsTriggerOp bumps it with a $inc on
// the "revno" key.
Revno int
}
// SetMinUnits changes the number of minimum units required by the service.
// Negative values are rejected, and an error is returned if the service is
// no longer alive. When the call succeeds, the MinUnits value held in memory
// by s is updated to minUnits.
func (s *Service) SetMinUnits(minUnits int) (err error) {
	defer errors.DeferredAnnotatef(&err, "cannot set minimum units for service %q", s)
	defer func() {
		// Only mirror the new value locally once the change went through.
		if err != nil {
			return
		}
		s.doc.MinUnits = minUnits
	}()
	if minUnits < 0 {
		return errors.New("cannot set a negative minimum number of units")
	}
	// A separate Service value is used to build the transaction; it is the
	// one refreshed between attempts.
	current := &Service{st: s.st, doc: s.doc}
	// Removing the document never fails. Racing clients trying to create the
	// document generate one failure, but the second attempt should succeed.
	// If one client tries to update the document, and a racing client removes
	// it, the former should be able to re-create the document in the second
	// attempt. If the referred-to service advanced its life cycle to a not
	// alive state, an error is returned after the first failing attempt.
	buildTxn := func(attempt int) ([]txn.Op, error) {
		if attempt > 0 {
			if refreshErr := current.Refresh(); refreshErr != nil {
				return nil, refreshErr
			}
		}
		switch {
		case current.doc.Life != Alive:
			return nil, errors.New("service is no longer alive")
		case current.doc.MinUnits == minUnits:
			// Nothing to change.
			return nil, jujutxn.ErrNoOperations
		}
		return setMinUnitsOps(current, minUnits), nil
	}
	return s.st.run(buildTxn)
}
// setMinUnitsOps builds the transaction operations that store minUnits on the
// service document and keep the matching minUnits document in sync.
//
// The first operation always asserts the service is alive and sets its
// "minunits" field. Depending on the previous and the new value, one more
// operation may follow:
//   - previous value is zero: the minUnits document is inserted, asserting
//     it does not exist yet;
//   - new value is zero: the minUnits document is removed;
//   - new value is greater than the previous one: the document revno is
//     increased, asserting the document exists.
//
// Callers are expected to skip the case where the value does not change, as
// SetMinUnits does.
func setMinUnitsOps(service *Service, minUnits int) []txn.Op {
	st := service.st
	name := service.Name()
	previous := service.doc.MinUnits

	ops := []txn.Op{{
		C:      servicesC,
		Id:     st.docID(name),
		Assert: isAliveDoc,
		Update: bson.D{{"$set", bson.D{{"minunits", minUnits}}}},
	}}

	switch {
	case previous == 0:
		ops = append(ops, txn.Op{
			C:      minUnitsC,
			Id:     st.docID(name),
			Assert: txn.DocMissing,
			Insert: &minUnitsDoc{
				ServiceName: name,
				EnvUUID:     st.EnvironUUID(),
			},
		})
	case minUnits == 0:
		ops = append(ops, minUnitsRemoveOp(st, name))
	case minUnits > previous:
		bump := minUnitsTriggerOp(st, name)
		bump.Assert = txn.DocExists
		ops = append(ops, bump)
	}
	return ops
}
// minUnitsTriggerOp returns the operation that increments by one the revno of
// the minimum units document for the given service. No assertion is set on
// the operation, so the case of the document not existing is ignored: if the
// service does not require a minimum number of units the operation is a noop.
// This is included in the operations performed when a unit is destroyed.
func minUnitsTriggerOp(st *State, serviceName string) txn.Op {
	bumpRevno := bson.D{{"$inc", bson.D{{"revno", 1}}}}
	return txn.Op{
		C:      minUnitsC,
		Id:     st.docID(serviceName),
		Update: bumpRevno,
	}
}
// minUnitsRemoveOp returns the operation that deletes the minimum units
// document of the given service from MongoDB.
func minUnitsRemoveOp(st *State, serviceName string) txn.Op {
	var op txn.Op
	op.C = minUnitsC
	op.Id = st.docID(serviceName)
	op.Remove = true
	return op
}
// MinUnits reports the minimum units count recorded in the service document
// held in memory by s.
func (s *Service) MinUnits() int {
	minUnits := s.doc.MinUnits
	return minUnits
}
// EnsureMinUnits adds new units, one per loop iteration, until the number of
// alive units is no longer lower than the service's MinUnits value. It
// returns nil right away when MinUnits is not set or is already satisfied,
// and an error when the service is not alive.
func (s *Service) EnsureMinUnits() (err error) {
	defer errors.DeferredAnnotatef(&err, "cannot ensure minimum units for service %q", s)
	st := s.st
	// A separate Service value is used in the loop; it is the one refreshed
	// between iterations.
	current := &Service{st: st, doc: s.doc}
	for {
		// Only alive services can get new units.
		if current.doc.Life != Alive {
			return errors.New("service is not alive")
		}
		// Nothing to ensure when MinUnits is not set.
		required := current.doc.MinUnits
		if required == 0 {
			return nil
		}
		// Work out how many units are still missing.
		alive, countErr := aliveUnitsCount(current)
		if countErr != nil {
			return countErr
		}
		shortfall := required - alive
		if shortfall <= 0 {
			return nil
		}
		unitName, ops, opsErr := ensureMinUnitsOps(current)
		if opsErr != nil {
			return opsErr
		}
		// Try to add one unit. An aborted transaction is not fatal: the
		// service is refreshed below and the loop starts over.
		txnErr := st.runTransaction(ops)
		if txnErr != nil && txnErr != txn.ErrAborted {
			return txnErr
		}
		if txnErr == nil {
			// The unit was created: assign it.
			unit, unitErr := st.Unit(unitName)
			if unitErr != nil {
				return unitErr
			}
			if assignErr := st.AssignUnit(unit, AssignNew); assignErr != nil {
				return assignErr
			}
			// No need to refresh and loop again if this was the
			// last/only missing unit.
			if shortfall == 1 {
				return nil
			}
		}
		if refreshErr := current.Refresh(); refreshErr != nil {
			return refreshErr
		}
	}
}
// aliveUnitsCount returns the number of alive units for the service, counting
// the documents of the units collection whose "service" field is the service
// name and whose "life" field is Alive.
func aliveUnitsCount(service *Service) (int, error) {
	unitsColl, closeColl := service.st.getCollection(unitsC)
	defer closeColl()

	aliveOfService := bson.D{
		{"service", service.doc.Name},
		{"life", Alive},
	}
	count, err := unitsColl.Find(aliveOfService).Count()
	return count, err
}
// ensureMinUnitsOps returns the name for a new unit of the service together
// with the operations required to add it in MongoDB. The operations assert
// the txn-revno currently held in the service document, so the resulting
// transaction will be aborted if the service document changes in the
// meantime.
func ensureMinUnitsOps(service *Service) (string, []txn.Op, error) {
	unchanged := bson.D{{"txn-revno", service.doc.TxnRevno}}
	name, ops, err := service.addUnitOps("", unchanged)
	return name, ops, err
}