Skip to content

Commit

Permalink
Merge pull request juju#8385 from jameinel/2.3-presence-1747367
Browse files Browse the repository at this point in the history
  • Loading branch information
jujubot authored Feb 15, 2018
2 parents 5f65e5e + ba26f5c commit d7a0009
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 0 deletions.
5 changes: 5 additions & 0 deletions state/presence/presence.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,11 @@ func (w *Watcher) flush() {
return
case req := <-w.request:
w.handle(req)
// handle may append to the w.pending array, and/or it may unwatch something that was previously pending
// thus changing e.ch to nil while we are waiting to send the request. We need to make sure we are using
// the correct 'e' object
// See TestRobustness which fails if this line doesn't exist
e = &w.pending[i]
continue
case e.ch <- Change{e.key, e.alive}:
}
Expand Down
56 changes: 56 additions & 0 deletions state/presence/presence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
package presence_test

import (
"fmt"
"strconv"
"sync"
"sync/atomic"
stdtesting "testing"
"time"

Expand Down Expand Up @@ -637,3 +640,56 @@ func (s *PresenceSuite) TestMultiplePingersForEntity(c *gc.C) {
w.Sync()
c.Check(w.BeingLoads(), gc.Equals, loads)
}

func (s *PresenceSuite) TestRobustness(c *gc.C) {
// There used to be a potential condition, where during a flush() we wait for a channel send, and while we're
// waiting for it, we would handle events, which might cause us to grow our pending array, which would realloc
// the slice. If while that happened the original watch was unwatched, then we nil the channel, but the object
// we were hung pending on part of the reallocated slice.
w := presence.NewWatcher(s.presence, s.modelTag)
defer assertStopped(c, w)
// Start a watch for changes to 'key'. Never listen for actual events on that channel, though, so we know flush()
// will always be blocked, but allowing other events while waiting to send that event.
rootKey := "key"
keyChan := make(chan presence.Change, 0)
w.Watch(rootKey, keyChan)
// Whenever we successfully watch in the main loop(), it starts a flush. We should now be able to build up more
// watches while waiting. Create enough of these that we know the slice gets reallocated
var wg sync.WaitGroup
defer wg.Wait()
var observed uint32
const numKeys = 10
for i := 0; i < numKeys; i++ {
k := fmt.Sprintf("k%d", i)
kChan := make(chan presence.Change, 0)
w.Watch("key", kChan)
wg.Add(1)
go func() {
defer wg.Done()
select {
case <-kChan:
atomic.AddUint32(&observed, 1)
return
case <-time.After(testing.LongWait):
c.Fatalf("timed out waiting %s for %q to see its event", testing.LongWait, k)
}
}()
}
// None of them should actually have triggered, since the very first pending object has not been listened to
// And now we unwatch that object
time.Sleep(testing.ShortWait)
c.Check(atomic.LoadUint32(&observed), gc.Equals, uint32(0))
w.Unwatch(rootKey, keyChan)
// This should unblock all of them, and everything should go to observed
failTime := time.After(testing.LongWait)
o := atomic.LoadUint32(&observed)
for o != numKeys {
select {
case <-time.After(time.Millisecond):
o = atomic.LoadUint32(&observed)
case <-failTime:
c.Fatalf("only observed %d changes (expected %d) after %s time", atomic.LoadUint32(&observed), numKeys, testing.LongWait)
}
}
c.Check(atomic.LoadUint32(&observed), gc.Equals, uint32(numKeys))
}

0 comments on commit d7a0009

Please sign in to comment.