Skip to content

Commit

Permalink
Move all to tomb.v2, remove some un-needed uses.
Browse files Browse the repository at this point in the history
  • Loading branch information
Veebers committed May 31, 2018
1 parent 795b040 commit 06507ff
Show file tree
Hide file tree
Showing 46 changed files with 297 additions and 200 deletions.
20 changes: 16 additions & 4 deletions apiserver/common/firewall/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,10 @@ type mockStringsWatcher struct {

func newMockStringsWatcher() *mockStringsWatcher {
w := &mockStringsWatcher{changes: make(chan []string, 1)}
go w.doneWhenDying()
w.Tomb.Go(func() error {
w.doneWhenDying()
return nil
})
return w
}

Expand All @@ -205,7 +208,10 @@ func newMockNotifyWatcher() *mockNotifyWatcher {
w := &mockNotifyWatcher{changes: make(chan struct{}, 1)}
// Initial event
w.changes <- struct{}{}
go w.doneWhenDying()
w.Tomb.Go(func() error {
w.doneWhenDying()
return nil
})
return w
}

Expand Down Expand Up @@ -299,7 +305,10 @@ func (r *mockRelation) WatchRelationEgressNetworks() state.StringsWatcher {

func newMockRelationUnitsWatcher() *mockRelationUnitsWatcher {
w := &mockRelationUnitsWatcher{changes: make(chan params.RelationUnitsChange, 1)}
go w.doneWhenDying()
w.Tomb.Go(func() error {
w.doneWhenDying()
return nil
})
return w
}

Expand Down Expand Up @@ -423,7 +432,10 @@ type mockAddressWatcher struct {

func newMockAddressWatcher() *mockAddressWatcher {
w := &mockAddressWatcher{changes: make(chan struct{}, 1)}
go w.doneWhenDying()
w.Tomb.Go(func() error {
w.doneWhenDying()
return nil
})
return w
}

Expand Down
11 changes: 7 additions & 4 deletions apiserver/common/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,18 @@ func NewMultiNotifyWatcher(w ...state.NotifyWatcher) *MultiNotifyWatcher {
for _, w := range w {
// Consume the first event of each watcher.
<-w.Changes()
m.tomb.Go(w.Wait)
m.tomb.Go(func() error {
defer wg.Done()
return w.Wait()
})
// Copy events from the watcher to the staging channel.
go copyEvents(staging, w.Changes(), &m.tomb)
}
go func() {
// defer m.tomb.Done()
m.tomb.Go(func() error {
m.loop(staging)
wg.Wait()
}()
return nil
})
return m
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ type mockWatcher struct {

func (w *mockWatcher) doneWhenDying() {
<-w.Tomb.Dying()
w.Tomb.Done()
}

func (w *mockWatcher) Kill() {
Expand All @@ -95,7 +94,10 @@ type mockStringsWatcher struct {

func newMockStringsWatcher() *mockStringsWatcher {
w := &mockStringsWatcher{changes: make(chan []string, 1)}
go w.doneWhenDying()
w.Tomb.Go(func() error {
w.doneWhenDying()
return nil
})
return w
}

Expand Down
6 changes: 3 additions & 3 deletions apiserver/facades/controller/crosscontroller/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ type mockNotifyWatcher struct {

func newMockNotifyWatcher() *mockNotifyWatcher {
w := &mockNotifyWatcher{changes: make(chan struct{}, 1)}
go func() {
defer w.tomb.Done()
w.tomb.Go(func() error {
<-w.tomb.Dying()
}()
return nil
})
return w
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,14 @@ type mockStringsWatcher struct {

func newMockStringsWatcher() *mockStringsWatcher {
w := &mockStringsWatcher{changes: make(chan []string, 1)}
go w.loop()
w.tomb.Go(func() error {
w.loop()
return nil
})
return w
}

func (w *mockStringsWatcher) loop() {
defer w.tomb.Done()
<-w.tomb.Dying()
}

Expand Down
16 changes: 12 additions & 4 deletions apiserver/facades/controller/firewaller/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ type mockWatcher struct {

func (w *mockWatcher) doneWhenDying() {
<-w.Tomb.Dying()
w.Tomb.Done()
}

func (w *mockWatcher) Kill() {
Expand Down Expand Up @@ -156,7 +155,10 @@ type mockStringsWatcher struct {

func newMockStringsWatcher() *mockStringsWatcher {
w := &mockStringsWatcher{changes: make(chan []string, 1)}
go w.doneWhenDying()
w.Tomb.Go(func() error {
w.doneWhenDying()
return nil
})
return w
}

Expand All @@ -169,7 +171,10 @@ func newMockNotifyWatcher() *mockNotifyWatcher {
w := &mockNotifyWatcher{changes: make(chan struct{}, 1)}
// Initial event
w.changes <- struct{}{}
go w.doneWhenDying()
w.Tomb.Go(func() error {
w.doneWhenDying()
return nil
})
return w
}

Expand Down Expand Up @@ -246,7 +251,10 @@ func (r *mockRelation) SetStatus(info status.StatusInfo) error {

func newMockRelationUnitsWatcher() *mockRelationUnitsWatcher {
w := &mockRelationUnitsWatcher{changes: make(chan params.RelationUnitsChange, 1)}
go w.doneWhenDying()
w.Tomb.Go(func() error {
w.doneWhenDying()
return nil
})
return w
}

Expand Down
11 changes: 8 additions & 3 deletions apiserver/facades/controller/remoterelations/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,6 @@ type mockWatcher struct {

func (w *mockWatcher) doneWhenDying() {
<-w.Tomb.Dying()
w.Tomb.Done()
}

func (w *mockWatcher) Kill() {
Expand All @@ -430,7 +429,10 @@ type mockStringsWatcher struct {

func newMockStringsWatcher() *mockStringsWatcher {
w := &mockStringsWatcher{changes: make(chan []string, 1)}
go w.doneWhenDying()
w.Tomb.Go(func() error {
w.doneWhenDying()
return nil
})
return w
}

Expand All @@ -448,7 +450,10 @@ func newMockRelationUnitsWatcher() *mockRelationUnitsWatcher {
w := &mockRelationUnitsWatcher{
changes: make(chan params.RelationUnitsChange, 1),
}
go w.doneWhenDying()
w.Tomb.Go(func() error {
w.doneWhenDying()
return nil
})
return w
}

Expand Down
3 changes: 3 additions & 0 deletions apiserver/pinger.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ func (pt *pingTimeout) Ping() {

// Stop terminates the ping timeout.
func (pt *pingTimeout) Stop() error {
if err := pt.tomb.Err(); err != tomb.ErrStillAlive {
return err
}
pt.tomb.Kill(nil)
return pt.tomb.Wait()
}
Expand Down
30 changes: 21 additions & 9 deletions cmd/jujud/agent/caasoperator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"gopkg.in/juju/names.v2"
"gopkg.in/juju/worker.v1"
"gopkg.in/tomb.v2"

"github.com/juju/juju/agent"
"github.com/juju/juju/cmd/jujud/agent/caasoperator"
Expand All @@ -36,13 +35,14 @@ var (
// CaasOperatorAgent is a cmd.Command responsible for running a CAAS operator agent.
type CaasOperatorAgent struct {
cmd.CommandBase
tomb tomb.Tomb
AgentConf
ApplicationName string
runner *worker.Runner
bufferedLogger *logsender.BufferedLogWriter
setupLogging func(agent.Config) error
ctx *cmd.Context
dead chan struct{}
errReason error

// Used to signal that the upgrade worker will not
// reboot the agent on startup because there are no
Expand All @@ -63,8 +63,9 @@ func NewCaasOperatorAgent(ctx *cmd.Context, bufferedLogger *logsender.BufferedLo
AgentConf: NewAgentConf(""),
ctx: ctx,
initialUpgradeCheckComplete: make(chan struct{}),
bufferedLogger: bufferedLogger,
prometheusRegistry: prometheusRegistry,
dead: make(chan struct{}),
bufferedLogger: bufferedLogger,
prometheusRegistry: prometheusRegistry,
}, nil
}

Expand Down Expand Up @@ -101,14 +102,27 @@ func (op *CaasOperatorAgent) Init(args []string) error {
return nil
}

// Wait waits for the CaasOperator agent to finish.
func (op *CaasOperatorAgent) Wait() error {
<-op.dead
return op.errReason
}

// Stop implements Worker.
func (op *CaasOperatorAgent) Stop() error {
op.runner.Kill()
return op.tomb.Wait()
return op.Wait()
}

// Finished signals the machine agent is finished
func (op *CaasOperatorAgent) Finished(err error) {
op.errReason = err
close(op.dead)
}

// Run implements Command.
func (op *CaasOperatorAgent) Run(ctx *cmd.Context) error {
func (op *CaasOperatorAgent) Run(ctx *cmd.Context) (err error) {
defer op.Finished(err)
if err := op.ReadConfig(op.Tag().String()); err != nil {
return err
}
Expand All @@ -118,9 +132,7 @@ func (op *CaasOperatorAgent) Run(ctx *cmd.Context) error {
}

op.runner.StartWorker("api", op.Workers)
err := cmdutil.AgentDone(logger, op.runner.Wait())
op.tomb.Kill(err)
return err
return cmdutil.AgentDone(logger, op.runner.Wait())
}

// Workers returns a dependency.Engine running the operator's responsibilities.
Expand Down
6 changes: 3 additions & 3 deletions cmd/jujud/agent/engine/flag.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ type staticFlagWorker struct {
// whose Check method always returns the specified value.
func NewStaticFlagWorker(value bool) worker.Worker {
w := &staticFlagWorker{value: value}
go func() {
defer w.tomb.Kill(tomb.ErrDying)
w.tomb.Go(func() error {
<-w.tomb.Dying()
}()
return tomb.ErrDying
})
return w
}

Expand Down
5 changes: 3 additions & 2 deletions cmd/jujud/agent/engine/valueworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ func NewValueWorker(value interface{}) (worker.Worker, error) {
w := &valueWorker{
value: value,
}
go func() {
w.tomb.Go(func() error {
<-w.tomb.Dying()
}()
return nil
})
return w, nil
}

Expand Down
25 changes: 16 additions & 9 deletions cmd/jujud/agent/machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"gopkg.in/juju/worker.v1"
"gopkg.in/mgo.v2"
"gopkg.in/natefinch/lumberjack.v2"
"gopkg.in/tomb.v2"

"github.com/juju/juju/agent"
"github.com/juju/juju/agent/tools"
Expand Down Expand Up @@ -285,6 +284,7 @@ func NewMachineAgent(
configChangedVal: voyeur.NewValue(true),
bufferedLogger: bufferedLogger,
workersStarted: make(chan struct{}),
dead: make(chan struct{}),
runner: runner,
rootDir: rootDir,
initialUpgradeCheckComplete: gate.NewLock(),
Expand Down Expand Up @@ -331,7 +331,8 @@ func (a *MachineAgent) registerPrometheusCollectors() error {
type MachineAgent struct {
AgentConfigWriter

tomb tomb.Tomb
dead chan struct{}
errReason error
machineId string
runner *worker.Runner
rootDir string
Expand Down Expand Up @@ -362,13 +363,20 @@ type MachineAgent struct {

// Wait waits for the machine agent to finish.
func (a *MachineAgent) Wait() error {
return a.tomb.Wait()
<-a.dead
return a.errReason
}

// Stop stops the machine agent.
func (a *MachineAgent) Stop() error {
a.runner.Kill()
return a.tomb.Wait()
return a.Wait()
}

// Finished signals the machine agent is finished
func (a *MachineAgent) Finished(err error) {
a.errReason = err
close(a.dead)
}

// upgradeCertificateDNSNames ensure that the controller certificate
Expand Down Expand Up @@ -421,7 +429,8 @@ func upgradeCertificateDNSNames(config agent.ConfigSetter) error {
}

// Run runs a machine agent.
func (a *MachineAgent) Run(*cmd.Context) error {
func (a *MachineAgent) Run(*cmd.Context) (err error) {
defer a.Finished(err)
useMultipleCPUs()
if err := a.ReadConfig(a.Tag().String()); err != nil {
return errors.Errorf("cannot read agent configuration: %v", err)
Expand Down Expand Up @@ -458,7 +467,7 @@ func (a *MachineAgent) Run(*cmd.Context) error {

// At this point, all workers will have been configured to start
close(a.workersStarted)
err := a.runner.Wait()
err = a.runner.Wait()
switch errors.Cause(err) {
case jworker.ErrTerminateAgent:
err = a.uninstallAgent()
Expand All @@ -469,9 +478,7 @@ func (a *MachineAgent) Run(*cmd.Context) error {
logger.Infof("Caught shutdown error")
err = a.executeRebootOrShutdown(params.ShouldShutdown)
}
err = cmdutil.AgentDone(logger, err)
a.tomb.Kill(err)
return err
return cmdutil.AgentDone(logger, err)
}

func (a *MachineAgent) makeEngineCreator(previousAgentVersion version.Number) func() (worker.Worker, error) {
Expand Down
1 change: 0 additions & 1 deletion cmd/jujud/agent/machine/apiworkers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ type mockWorker struct {

func (w *mockWorker) Kill() {
w.tomb.Kill(nil)
w.tomb.Done()
}

func (w *mockWorker) Wait() error {
Expand Down
Loading

0 comments on commit 06507ff

Please sign in to comment.