-
Notifications
You must be signed in to change notification settings - Fork 0
/
shared.go
154 lines (136 loc) · 4.76 KB
/
shared.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
// Copyright 2018 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.
package apiserver
import (
"sync"
"time"
"github.com/juju/collections/set"
"github.com/juju/errors"
"github.com/juju/loggo"
jujucontroller "github.com/juju/juju/controller"
"github.com/juju/juju/core/cache"
"github.com/juju/juju/core/lease"
"github.com/juju/juju/core/multiwatcher"
"github.com/juju/juju/core/presence"
"github.com/juju/juju/pubsub/controller"
"github.com/juju/juju/state"
)
// SharedHub represents the methods of the pubsub.StructuredHub
// that are used. The context uses an interface to allow mocking
// of the hub.
type SharedHub interface {
// Publish sends data on the given topic.
// NOTE(review): the meaning of the returned channel is not visible in this
// file; presumably it signals that subscribers have handled the message -
// confirm against pubsub.StructuredHub.
Publish(topic string, data interface{}) (<-chan struct{}, error)
// Subscribe registers handler for the given topic. In this file the handler
// passed in has the shape func(topic string, data T, err error). The
// returned func is kept by the caller and invoked later to drop the
// subscription (see sharedServerContext.Close).
Subscribe(topic string, handler interface{}) (func(), error)
}
// sharedServerContext contains a number of components that are unchangeable in the API server.
// These components need to be exposed through the facade.Context. Instead of having the methods
// of newAPIHandler and newAPIRoot take ever increasing numbers of parameters, they will instead
// have a pointer to the sharedServerContext.
//
// All attributes in the context should be goroutine aware themselves, like the state pool, hub, and
// presence, or protected and only accessed through methods on this context object.
type sharedServerContext struct {
// The following fields are copied verbatim from sharedServerConfig by
// newSharedServerContext and are not reassigned anywhere in this file.
statePool *state.StatePool
controller *cache.Controller
multiwatcherFactory multiwatcher.Factory
centralHub SharedHub
presence presence.Recorder
leaseManager lease.Manager
logger loggo.Logger
// NOTE(review): cancel is not assigned by newSharedServerContext in this
// file; presumably it is set by the code that owns the context - confirm.
cancel <-chan struct{}
// configMutex guards controllerConfig and features, which are replaced
// together by onConfigChanged and read by featureEnabled and
// maxDebugLogDuration.
configMutex sync.RWMutex
// controllerConfig is the most recently seen controller config.
controllerConfig jujucontroller.Config
// features is the feature set reported by controllerConfig.Features().
features set.Strings
// unsubscribe drops the controller.ConfigChanged subscription made in
// newSharedServerContext; it is called by Close.
unsubscribe func()
}
// sharedServerConfig holds the dependencies that newSharedServerContext
// copies into a sharedServerContext. validate requires every field except
// logger to be non-nil.
type sharedServerConfig struct {
statePool *state.StatePool
controller *cache.Controller
multiwatcherFactory multiwatcher.Factory
centralHub SharedHub
presence presence.Recorder
leaseManager lease.Manager
// controllerConfig is the initial controller config; the initial feature
// set is derived from it via Features().
controllerConfig jujucontroller.Config
// logger is not checked by validate.
logger loggo.Logger
}
// validate reports a NotValid error naming the first required dependency
// that is nil, checking them in declaration order. It returns nil when
// every required dependency is present. The logger is not checked.
func (c *sharedServerConfig) validate() error {
	switch {
	case c.statePool == nil:
		return errors.NotValidf("nil statePool")
	case c.controller == nil:
		return errors.NotValidf("nil controller")
	case c.multiwatcherFactory == nil:
		return errors.NotValidf("nil multiwatcherFactory")
	case c.centralHub == nil:
		return errors.NotValidf("nil centralHub")
	case c.presence == nil:
		return errors.NotValidf("nil presence")
	case c.leaseManager == nil:
		return errors.NotValidf("nil leaseManager")
	case c.controllerConfig == nil:
		return errors.NotValidf("nil controllerConfig")
	}
	return nil
}
// newSharedServerContext validates config, builds a sharedServerContext from
// it, and subscribes the new context to controller.ConfigChanged on the
// central hub so later config changes are picked up by onConfigChanged.
// It returns a traced error if the config is invalid or the subscription
// fails; a subscription failure is additionally logged as critical.
func newSharedServerContext(config sharedServerConfig) (*sharedServerContext, error) {
	err := config.validate()
	if err != nil {
		return nil, errors.Trace(err)
	}

	shared := &sharedServerContext{
		statePool:           config.statePool,
		controller:          config.controller,
		multiwatcherFactory: config.multiwatcherFactory,
		centralHub:          config.centralHub,
		presence:            config.presence,
		leaseManager:        config.leaseManager,
		logger:              config.logger,
		controllerConfig:    config.controllerConfig,
		features:            config.controllerConfig.Features(),
	}

	// Reading the current controller config before subscribing to changes is
	// safe: changes are only ever published in response to an API call, this
	// function runs inside the newServer call that creates the API server,
	// and no API call can be made until that server has started.
	shared.unsubscribe, err = shared.centralHub.Subscribe(controller.ConfigChanged, shared.onConfigChanged)
	if err != nil {
		shared.logger.Criticalf("programming error in subscribe function: %v", err)
		return nil, errors.Trace(err)
	}
	return shared, nil
}
// Close drops the controller config subscription by invoking the
// unsubscribe func stored on the context.
func (c *sharedServerContext) Close() {
	release := c.unsubscribe
	release()
}
// onConfigChanged is the handler subscribed to controller.ConfigChanged.
// It replaces the stored controller config and the feature set derived from
// it while holding configMutex, and logs the new feature list only when the
// set of features differs from the previous one. A non-nil err is logged as
// critical and the message is otherwise ignored.
func (c *sharedServerContext) onConfigChanged(topic string, data controller.ConfigChangedMessage, err error) {
	if err != nil {
		c.logger.Criticalf("programming error in %s message data: %v", topic, err)
		return
	}

	latest := data.Config.Features()

	// Perform the swap inside a closure so the write lock covers exactly the
	// update and is dropped before any logging happens.
	changed, sorted := func() (bool, []string) {
		c.configMutex.Lock()
		defer c.configMutex.Unlock()

		previous := c.features
		c.controllerConfig = data.Config
		c.features = latest

		dropped := previous.Difference(latest)
		gained := latest.Difference(previous)
		return dropped.Size()+gained.Size() > 0, latest.SortedValues()
	}()

	if changed {
		c.logger.Infof("updating features to %v", sorted)
	}
}
// featureEnabled reports whether flag is in the current feature set. The
// set is read under the configMutex read lock.
func (c *sharedServerContext) featureEnabled(flag string) bool {
	c.configMutex.RLock()
	enabled := c.features.Contains(flag)
	c.configMutex.RUnlock()
	return enabled
}
// maxDebugLogDuration returns MaxDebugLogDuration() from the current
// controller config, holding the configMutex read lock for the whole call.
func (c *sharedServerContext) maxDebugLogDuration() time.Duration {
	mu := &c.configMutex
	mu.RLock()
	defer mu.RUnlock()

	cfg := c.controllerConfig
	return cfg.MaxDebugLogDuration()
}