Skip to content

Commit

Permalink
Broken test.
Browse files Browse the repository at this point in the history
  • Loading branch information
Veebers committed Apr 23, 2018
1 parent d238cee commit c4f5bea
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/juju/errors"
"github.com/juju/utils/set"
"gopkg.in/juju/names.v2"
"gopkg.in/tomb.v1"
"gopkg.in/tomb.v2"

"github.com/juju/juju/state"
"github.com/juju/juju/state/watcher"
Expand Down Expand Up @@ -54,13 +54,12 @@ func (fw Watchers) WatchMachineManagedFilesystems(m names.MachineTag) state.Stri
modelVolumesAttached: make(set.Tags),
modelVolumeFilesystems: make(map[names.VolumeTag]names.FilesystemTag),
}
go func() {
defer w.tomb.Done()
w.tomb.Go(func() error {
defer watcher.Stop(w.machineFilesystems, &w.tomb)
defer watcher.Stop(w.modelFilesystems, &w.tomb)
defer watcher.Stop(w.modelVolumeAttachments, &w.tomb)
w.tomb.Kill(w.loop())
}()
return w.loop()
})
return w
}

Expand Down Expand Up @@ -232,13 +231,13 @@ func (fw Watchers) WatchMachineManagedFilesystemAttachments(m names.MachineTag)
modelVolumesAttached: make(set.Tags),
modelVolumeFilesystemAttachments: make(map[names.VolumeTag]string),
}
go func() {
defer w.tomb.Done()

w.tomb.Go(func() error {
defer watcher.Stop(w.machineFilesystemAttachments, &w.tomb)
defer watcher.Stop(w.modelFilesystemAttachments, &w.tomb)
defer watcher.Stop(w.modelVolumeAttachments, &w.tomb)
w.tomb.Kill(w.loop())
}()
return w.loop()
})
return w
}

Expand Down Expand Up @@ -396,11 +395,10 @@ func newFilteredStringsWatcher(w state.StringsWatcher, filter func(string) (bool
w: w,
filter: filter,
}
go func() {
defer fw.tomb.Done()
fw.tomb.Go(func() error {
defer watcher.Stop(fw.w, &fw.tomb)
fw.tomb.Kill(fw.loop())
}()
return fw.loop()
})
return fw
}

Expand Down
9 changes: 4 additions & 5 deletions mongo/oplog.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/juju/errors"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
"gopkg.in/tomb.v1"
"gopkg.in/tomb.v2"
)

// OplogDoc represents a document in the oplog.rs collection.
Expand Down Expand Up @@ -166,14 +166,13 @@ func NewOplogTailer(
initialTs: NewMongoTimestamp(initialTs),
outCh: make(chan *OplogDoc),
}
go func() {
t.tomb.Go(func() error {
defer func() {
close(t.outCh)
t.tomb.Done()
session.Close()
}()
t.tomb.Kill(t.loop())
}()
return t.loop()
})
return t
}

Expand Down
91 changes: 0 additions & 91 deletions state/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,11 +465,6 @@ func newLifecycleWatcher(
life: make(map[string]Life),
out: make(chan []string),
}
// go func() {
// defer w.tomb.Done()
// defer close(w.out)
// w.tomb.Kill(w.loop())
// }()
w.tomb.Go(func() error {
defer close(w.out)
return w.loop()
Expand Down Expand Up @@ -637,11 +632,6 @@ func newMinUnitsWatcher(backend modelBackend) StringsWatcher {
known: make(map[string]int),
out: make(chan []string),
}
// go func() {
// defer w.tomb.Done()
// defer close(w.out)
// w.tomb.Kill(w.loop())
// }()
w.tomb.Go(func() error {
defer close(w.out)
return w.loop()
Expand Down Expand Up @@ -797,11 +787,6 @@ func newRelationScopeWatcher(backend modelBackend, scope, ignore string) *Relati
ignore: ignore,
out: make(chan *RelationScopeChange),
}
// go func() {
// defer w.tomb.Done()
// defer close(w.out)
// w.tomb.Kill(w.loop())
// }()
w.tomb.Go(func() error {
defer close(w.out)
return w.loop()
Expand Down Expand Up @@ -971,10 +956,6 @@ func newRelationUnitsWatcher(backend modelBackend, sw *RelationScopeWatcher) Rel
updates: make(chan watcher.Change),
out: make(chan params.RelationUnitsChange),
}
// go func() {
// defer w.finish()
// w.tomb.Kill(w.loop())
// }()
w.tomb.Go(func() error {
defer w.finish()
return w.loop()
Expand Down Expand Up @@ -1155,11 +1136,6 @@ func newRelationLifeSuspendedWatcher(
transform: transform,
lifeSuspended: make(map[string]relationLifeSuspended),
}
// go func() {
// defer w.tomb.Done()
// defer close(w.out)
// w.tomb.Kill(w.loop())
// }()
w.tomb.Go(func() error {
defer close(w.out)
return w.loop()
Expand Down Expand Up @@ -1345,11 +1321,6 @@ func newUnitsWatcher(backend modelBackend, tag names.Tag, getUnits func() ([]str
in: make(chan watcher.Change),
out: make(chan []string),
}
// go func() {
// defer w.tomb.Done()
// defer close(w.out)
// w.tomb.Kill(w.loop(coll, id))
// }()
w.tomb.Go(func() error {
defer close(w.out)
return w.loop(coll, id)
Expand Down Expand Up @@ -1693,12 +1664,6 @@ func newDocWatcher(backend modelBackend, docKeys []docKey) NotifyWatcher {
commonWatcher: newCommonWatcher(backend),
out: make(chan struct{}),
}
// go func() {
// defer w.tomb.Done()
// defer close(w.out)
// w.tomb.Kill(w.loop(docKeys))
// }(
// )
w.tomb.Go(func() error {
defer close(w.out)
return w.loop(docKeys)
Expand Down Expand Up @@ -1791,11 +1756,6 @@ func newMachineUnitsWatcher(m *Machine) StringsWatcher {
known: make(map[string]Life),
machine: &Machine{st: m.st, doc: m.doc}, // Copy so it may be freely refreshed
}
// go func() {
// defer w.tomb.Done()
// defer close(w.out)
// w.tomb.Kill(w.loop())
// }()
w.tomb.Go(func() error {
defer close(w.out)
return w.loop()
Expand Down Expand Up @@ -1945,11 +1905,6 @@ func newMachineAddressesWatcher(m *Machine) NotifyWatcher {
out: make(chan struct{}),
machine: &Machine{st: m.st, doc: m.doc}, // Copy so it may be freely refreshed
}
// go func() {
// defer w.tomb.Done()
// defer close(w.out)
// w.tomb.Kill(w.loop())
// }()
w.tomb.Go(func() error {
defer close(w.out)
return w.loop()
Expand Down Expand Up @@ -2026,11 +1981,6 @@ func newActionStatusWatcher(backend modelBackend, receivers []ActionReceiver, st
statusFilter: statusInCollectionOp(statusSet...),
}

// go func() {
// defer w.tomb.Done()
// defer close(w.sink)
// w.tomb.Kill(w.loop())
// }()
w.tomb.Go(func() error {
defer close(w.sink)
return w.loop()
Expand Down Expand Up @@ -2252,12 +2202,6 @@ func newCollectionWatcher(backend modelBackend, cfg colWCfg) StringsWatcher {
sink: make(chan []string),
}

// go func() {
// defer w.tomb.Done()
// defer close(w.sink)
// defer close(w.source)
// w.tomb.Kill(w.loop())
// }()
w.tomb.Go(func() error {
defer close(w.sink)
defer close(w.source)
Expand Down Expand Up @@ -2538,11 +2482,6 @@ func newOpenedPortsWatcher(backend modelBackend) StringsWatcher {
known: make(map[string]int64),
out: make(chan []string),
}
// go func() {
// defer w.tomb.Done()
// defer close(w.out)
// w.tomb.Kill(w.loop())
// }()
w.tomb.Go(func() error {
defer close(w.out)
return w.loop()
Expand Down Expand Up @@ -2697,11 +2636,6 @@ func newBlockDevicesWatcher(backend modelBackend, machineId string) NotifyWatche
machineId: machineId,
out: make(chan struct{}),
}
// go func() {
// defer w.tomb.Done()
// defer close(w.out)
// w.tomb.Kill(w.loop())
// }()
w.tomb.Go(func() error {
defer close(w.out)
return w.loop()
Expand Down Expand Up @@ -2772,11 +2706,6 @@ func newMigrationActiveWatcher(st *State) NotifyWatcher {
id: st.ModelUUID(),
sink: make(chan struct{}),
}
// go func() {
// defer w.tomb.Done()
// defer close(w.sink)
// w.tomb.Kill(w.loop())
// }()
w.tomb.Go(func() error {
defer close(w.sink)
return w.loop()
Expand Down Expand Up @@ -2864,11 +2793,6 @@ func newNotifyCollWatcher(backend modelBackend, collName string, filter func(int
filter: filter,
sink: make(chan struct{}),
}
// go func() {
// defer w.tomb.Done()
// defer close(w.sink)
// w.tomb.Kill(w.loop())
// }()
w.tomb.Go(func() error {
defer close(w.sink)
return w.loop()
Expand Down Expand Up @@ -3037,11 +2961,6 @@ func newrelationNetworksWatcher(st modelBackend, relationKey, direction string)
knownCidrs: set.NewStrings(),
out: make(chan []string),
}
// go func() {
// defer w.tomb.Done()
// defer close(w.out)
// w.tomb.Kill(w.loop())
// }()
w.tomb.Go(func() error {
defer close(w.out)
return w.loop()
Expand Down Expand Up @@ -3152,11 +3071,6 @@ func newExternalControllersWatcher(st *State) StringsWatcher {
coll: collFactory(st.db(), externalControllersC),
out: make(chan []string),
}
// go func() {
// defer w.tomb.Done()
// defer close(w.out)
// w.tomb.Kill(w.loop())
// }()
w.tomb.Go(func() error {
defer close(w.out)
return w.loop()
Expand Down Expand Up @@ -3263,11 +3177,6 @@ func newContainerAddressesWatcher(u *Unit) NotifyWatcher {
out: make(chan struct{}),
unit: u,
}
// go func() {
// defer w.tomb.Done()
// defer close(w.out)
// w.tomb.Kill(w.loop())
// }()
w.tomb.Go(func() error {
defer close(w.out)
return w.loop()
Expand Down
3 changes: 3 additions & 0 deletions state/watcher/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package watcher

import (
"fmt"

"github.com/juju/errors"
"gopkg.in/tomb.v2"
)
Expand All @@ -28,6 +30,7 @@ func Stop(w Stopper, t *tomb.Tomb) {
// wrap any other error.
err = errors.Trace(err)
}
fmt.Printf(">> Error is: %s\n", err)
t.Kill(err)
}
}
Expand Down
11 changes: 6 additions & 5 deletions watcher/legacy/notifyworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ package legacy

import (
worker "gopkg.in/juju/worker.v1"
"gopkg.in/tomb.v1"
"gopkg.in/tomb.v2"

"github.com/juju/juju/state"
"github.com/juju/juju/state/watcher"
Expand Down Expand Up @@ -50,10 +50,11 @@ func NewNotifyWorker(handler NotifyWatchHandler) worker.Worker {
handler: handler,
}

go func() {
defer nw.tomb.Done()
nw.tomb.Kill(nw.loop())
}()
// go func() {
// defer nw.tomb.Done()
// nw.tomb.Kill(nw.loop())
// }()
nw.tomb.Go(nw.loop)
return nw
}

Expand Down
8 changes: 7 additions & 1 deletion watcher/legacy/notifyworker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,20 @@ import (
"sync"
"time"

"github.com/juju/loggo"
jc "github.com/juju/testing/checkers"
gc "gopkg.in/check.v1"
worker "gopkg.in/juju/worker.v1"
"gopkg.in/tomb.v1"
"gopkg.in/tomb.v2"

"github.com/juju/juju/state"
"github.com/juju/juju/state/watcher"
coretesting "github.com/juju/juju/testing"
"github.com/juju/juju/watcher/legacy"
)

var logger = loggo.GetLogger("juju.watcher.legacy")

type NotifyWorkerSuite struct {
coretesting.BaseSuite
worker worker.Worker
Expand Down Expand Up @@ -143,9 +146,11 @@ func (tnw *testNotifyWatcher) Err() error {
}

func (tnw *testNotifyWatcher) Stop() error {
logger.Criticalf(">> Stop!")
tnw.mu.Lock()
defer tnw.mu.Unlock()
if !tnw.stopped {
logger.Criticalf(">> Stopping changes.")
close(tnw.changes)
}
tnw.stopped = true
Expand All @@ -154,6 +159,7 @@ func (tnw *testNotifyWatcher) Stop() error {

func (tnw *testNotifyWatcher) SetStopError(err error) {
tnw.mu.Lock()
logger.Criticalf(">> setting error: %s", err)
tnw.stopError = err
tnw.mu.Unlock()
}
Expand Down
Loading

0 comments on commit c4f5bea

Please sign in to comment.