Skip to content

Commit

Permalink
Refactor the peergrouper to use clock.Clock, and publish an event ove…
Browse files Browse the repository at this point in the history
…r pubsub when the collection of apiservers change.
  • Loading branch information
howbazaar committed Dec 7, 2016
1 parent bd97666 commit 4966a89
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 70 deletions.
2 changes: 1 addition & 1 deletion cmd/jujud/agent/machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -980,7 +980,7 @@ func (a *MachineAgent) startStateWorkers(st *state.State) (worker.Worker, error)
return nil, errors.Annotate(err, "getting environ from state")
}
supportsSpaces := environs.SupportsSpaces(env)
w, err := peergrouperNew(st, supportsSpaces)
w, err := peergrouperNew(st, clock.WallClock, supportsSpaces, a.centralHub)
if err != nil {
return nil, errors.Annotate(err, "cannot start peergrouper worker")
}
Expand Down
21 changes: 21 additions & 0 deletions pubsub/apiserver/messages.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright 2016 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.

package apiserver

// DetailsTopic is the topic name for the published message when the details of
// the api servers change. This message is normally published by the peergrouper
// when the set of API servers changes.
const DetailsTopic = "apiserver.details"

// APIServer defines a single api server machine.
type APIServer struct {
Id string `yaml:"id"`
Addresses []string `yaml:"addresses"`
}

// Details defines the message structure for the apiserver.details topic.
type Details struct {
Servers []APIServer `yaml:"servers"`
LocalOnly bool `yaml:"local-only"`
}
65 changes: 46 additions & 19 deletions worker/peergrouper/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@ import (

"github.com/juju/errors"
"github.com/juju/loggo"
"github.com/juju/pubsub"
"github.com/juju/replicaset"
"github.com/juju/utils/clock"

"github.com/juju/juju/instance"
"github.com/juju/juju/network"
"github.com/juju/juju/pubsub/apiserver"
"github.com/juju/juju/state"
"github.com/juju/juju/status"
"github.com/juju/juju/worker"
Expand Down Expand Up @@ -76,6 +79,12 @@ var (
pollInterval = 1 * time.Minute
)

// Hub defines the only method of the apiserver centralhub that
// the peer grouper uses.
type Hub interface {
Publish(topic pubsub.Topic, data interface{}) (<-chan struct{}, error)
}

// pgWorker is a worker which watches the controller machines in state
// as well as the MongoDB replicaset configuration, adding and
// removing controller machines as they change or are added and
Expand All @@ -85,7 +94,8 @@ type pgWorker struct {

// st represents the State. It is an interface so we can swap
// out the implementation during testing.
st stateInterface
st stateInterface
clock clock.Clock

// machineChanges receives events from the machineTrackers when
// controller machines change in ways that are relevant to the
Expand All @@ -101,11 +111,15 @@ type pgWorker struct {
publisher publisherInterface

providerSupportsSpaces bool

// hub is the central hub of the apiserver, and is used to pusblish the
// details of the api servers.
hub Hub
}

// New returns a new worker that maintains the mongo replica set
// with respect to the given state.
func New(st *state.State, supportsSpaces bool) (worker.Worker, error) {
func New(st *state.State, clock clock.Clock, supportsSpaces bool, hub Hub) (worker.Worker, error) {
cfg, err := st.ControllerConfig()
if err != nil {
return nil, err
Expand All @@ -115,16 +129,18 @@ func New(st *state.State, supportsSpaces bool) (worker.Worker, error) {
mongoPort: cfg.StatePort(),
apiPort: cfg.APIPort(),
}
return newWorker(shim, newPublisher(st), supportsSpaces)
return newWorker(shim, clock, newPublisher(st), supportsSpaces, hub)
}

func newWorker(st stateInterface, pub publisherInterface, supportsSpaces bool) (worker.Worker, error) {
func newWorker(st stateInterface, clock clock.Clock, pub publisherInterface, supportsSpaces bool, hub Hub) (worker.Worker, error) {
w := &pgWorker{
st: st,
clock: clock,
machineChanges: make(chan struct{}),
machineTrackers: make(map[string]*machineTracker),
publisher: pub,
providerSupportsSpaces: supportsSpaces,
hub: hub,
}
err := catacomb.Invoke(catacomb.Plan{
Site: &w.catacomb,
Expand Down Expand Up @@ -158,24 +174,25 @@ func (w *pgWorker) loop() error {
case <-w.catacomb.Dying():
return w.catacomb.ErrDying()
case <-controllerChanges:
logger.Tracef("<-controllerChanges")
changed, err := w.updateControllerMachines()
if err != nil {
return errors.Trace(err)
}
if changed {
// A controller machine was added or removed, update
// the replica set immediately.
// TODO(fwereade): 2016-03-17 lp:1558657
updateChan = time.After(0)
updateChan = w.clock.After(0)
}

case <-w.machineChanges:
logger.Tracef("<-w.machineChanges")
// One of the controller machines changed, update the
// replica set immediately.
// TODO(fwereade): 2016-03-17 lp:1558657
updateChan = time.After(0)
updateChan = w.clock.After(0)

case <-updateChan:
logger.Tracef("<-updateChan")
ok := true
servers, instanceIds, err := w.apiPublishInfo()
if err != nil {
Expand All @@ -196,22 +213,24 @@ func (w *pgWorker) loop() error {
// Update the replica set members occasionally
// to keep them up to date with the current
// replica set member statuses.
// TODO(fwereade): 2016-03-17 lp:1558657
updateChan = time.After(pollInterval)
updateChan = w.clock.After(pollInterval)
retryInterval = initialRetryInterval
} else {
// TODO(fwereade): 2016-03-17 lp:1558657
updateChan = time.After(retryInterval)
retryInterval *= 2
if retryInterval > maxRetryInterval {
retryInterval = maxRetryInterval
}
updateChan = w.clock.After(retryInterval)
retryInterval = scaleRetry(retryInterval)
}

}
}
}

func scaleRetry(value time.Duration) time.Duration {
value *= 2
if value > maxRetryInterval {
value = maxRetryInterval
}
return value
}

// watchForControllerChanges starts two watchers pertaining to changes
// to the controllers, returning a channel which will receive events
// if either watcher fires.
Expand Down Expand Up @@ -317,20 +336,28 @@ func inStrings(t string, ss []string) bool {
}

func (w *pgWorker) apiPublishInfo() ([][]network.HostPort, []instance.Id, error) {
details := apiserver.Details{LocalOnly: true}
servers := make([][]network.HostPort, 0, len(w.machineTrackers))
instanceIds := make([]instance.Id, 0, len(w.machineTrackers))
for _, m := range w.machineTrackers {
if len(m.APIHostPorts()) == 0 {
hostPorts := m.APIHostPorts()
server := apiserver.APIServer{Id: m.Id()}
if len(hostPorts) == 0 {
continue
}
for _, hp := range network.FilterUnusableHostPorts(hostPorts) {
server.Addresses = append(server.Addresses, hp.String())
}

instanceId, err := m.stm.InstanceId()
if err != nil {
return nil, nil, err
}
instanceIds = append(instanceIds, instanceId)
servers = append(servers, m.APIHostPorts())

details.Servers = append(details.Servers, server)
}
w.hub.Publish(apiserver.DetailsTopic, details)
return servers, instanceIds, nil
}

Expand Down
Loading

0 comments on commit 4966a89

Please sign in to comment.