Skip to content

Commit

Permalink
Merge pull request juju#8391 from jameinel/2.3-into-develop
Browse files Browse the repository at this point in the history
  • Loading branch information
jujubot authored Feb 17, 2018
2 parents 22584d1 + f672ec3 commit 75d0f22
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 73 deletions.
28 changes: 19 additions & 9 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -795,19 +795,15 @@ func (c *configInternal) APIInfo() (*api.Info, bool) {
}
servingInfo, isController := c.StateServingInfo()
addrs := c.apiDetails.addresses
// For controller we return only localhost - we should not connect
// to other controllers if we can talk locally.
if isController {
port := servingInfo.APIPort
// TODO(macgreagoir) IPv6. Ubuntu still always provides IPv4
// loopback, and when/if this changes localhost should resolve
// to IPv6 loopback in any case (lp:1644009). Review.
localAPIAddr := net.JoinHostPort("localhost", strconv.Itoa(port))
newAddrs := []string{localAPIAddr}
for _, addr := range addrs {
if addr != localAPIAddr {
newAddrs = append(newAddrs, addr)
}
}
addrs = newAddrs
addrs = []string{localAPIAddr}
}
return &api.Info{
Addrs: addrs,
Expand All @@ -825,13 +821,27 @@ func (c *configInternal) MongoInfo() (info *mongo.MongoInfo, ok bool) {
if !ok {
return nil, false
}
// We return localhost first and then all addresses of known API
// endpoints - this lets us connect to other Mongo instances and start
// state even if our own Mongo has not started yet (see lp:1749383 #1).
// TODO(macgreagoir) IPv6. Ubuntu still always provides IPv4 loopback,
// and when/if this changes localhost should resolve to IPv6 loopback
// in any case (lp:1644009). Review.
addr := net.JoinHostPort("localhost", strconv.Itoa(ssi.StatePort))
local := net.JoinHostPort("localhost", strconv.Itoa(ssi.StatePort))
addrs := []string{local}

for _, addr := range c.apiDetails.addresses {
host, _, err := net.SplitHostPort(addr)
if err != nil {
return nil, false
}
if host := net.JoinHostPort(host, strconv.Itoa(ssi.StatePort)); host != local {
addrs = append(addrs, host)
}
}
return &mongo.MongoInfo{
Info: mongo.Info{
Addrs: []string{addr},
Addrs: addrs,
CACert: c.caCert,
},
Password: c.statePassword,
Expand Down
24 changes: 7 additions & 17 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,42 +433,32 @@ func (*suite) TestAPIInfoMissingAddress(c *gc.C) {
c.Assert(ok, jc.IsFalse)
}

func (*suite) TestAPIInfoPutsLocalhostFirstWhenServingInfoPresent(c *gc.C) {
func (*suite) TestAPIInfoServesLocalhostOnlyWhenServingInfoPresent(c *gc.C) {
attrParams := attributeParams
attrParams.APIAddresses = []string{"localhost:1235", "localhost:1236"}
servingInfo := stateServingInfo()
conf, err := agent.NewStateMachineConfig(attrParams, servingInfo)
c.Assert(err, jc.ErrorIsNil)
apiinfo, ok := conf.APIInfo()
c.Assert(ok, jc.IsTrue)
c.Check(apiinfo.Addrs, gc.HasLen, len(attrParams.APIAddresses)+1)
c.Check(apiinfo.Addrs[0], gc.Equals, "localhost:47")
}

func (*suite) TestAPIInfoMovesLocalhostFirstWhenServingInfoPresent(c *gc.C) {
attrParams := attributeParams
attrParams.APIAddresses = []string{"localhost:1235", "localhost:47"}
servingInfo := stateServingInfo()
conf, err := agent.NewStateMachineConfig(attrParams, servingInfo)
c.Assert(err, jc.ErrorIsNil)
apiinfo, ok := conf.APIInfo()
c.Assert(ok, jc.IsTrue)
c.Check(apiinfo.Addrs, gc.HasLen, len(attrParams.APIAddresses))
c.Check(apiinfo.Addrs[0], gc.Equals, "localhost:47")
c.Check(apiinfo.Addrs, gc.DeepEquals, []string{"localhost:47"})
}

func (*suite) TestMongoInfo(c *gc.C) {
attrParams := attributeParams
attrParams.APIAddresses = []string{"foo.example:1235", "bar.example:1236", "localhost:88"}
servingInfo := stateServingInfo()
conf, err := agent.NewStateMachineConfig(attrParams, servingInfo)
c.Assert(err, jc.ErrorIsNil)
mongoInfo, ok := conf.MongoInfo()
c.Assert(ok, jc.IsTrue)
c.Check(mongoInfo.Info.Addrs, jc.DeepEquals, []string{"localhost:69"})
c.Check(mongoInfo.Info.Addrs, jc.DeepEquals, []string{"localhost:69", "foo.example:69", "bar.example:69"})
c.Check(mongoInfo.Info.DisableTLS, jc.IsFalse)
}

func (*suite) TestPromotedMongoInfo(c *gc.C) {
attrParams := attributeParams
attrParams.APIAddresses = []string{"foo.example:1235", "bar.example:1236", "localhost:88"}
conf, err := agent.NewAgentConfig(attrParams)
c.Assert(err, jc.ErrorIsNil)

Expand All @@ -483,7 +473,7 @@ func (*suite) TestPromotedMongoInfo(c *gc.C) {

mongoInfo, ok = conf.MongoInfo()
c.Assert(ok, jc.IsTrue)
c.Check(mongoInfo.Info.Addrs, jc.DeepEquals, []string{"localhost:69"})
c.Check(mongoInfo.Info.Addrs, jc.DeepEquals, []string{"localhost:69", "foo.example:69", "bar.example:69"})
c.Check(mongoInfo.Info.DisableTLS, jc.IsFalse)
}

Expand Down
10 changes: 5 additions & 5 deletions cmd/juju/application/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -1172,17 +1172,17 @@ func processSingleBundleOverlay(data *charm.BundleData, bundleOverlayFile string
// actually exist in the bundle data.
for appName, bc := range config.Applications {
app, found := data.Applications[appName]
if !found {
// Add it in.
data.Applications[appName] = bc
continue
}
// If bc is nil, that means to remove it from data.
if bc == nil {
delete(data.Applications, appName)
data.Relations = removeRelations(data.Relations, appName)
continue
}
if !found {
// Add it in.
data.Applications[appName] = bc
continue
}

fieldCheck := configCheck.Applications[appName]

Expand Down
14 changes: 14 additions & 0 deletions cmd/juju/application/bundle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1956,6 +1956,20 @@ func (s *ProcessBundleOverlaySuite) TestRemoveApplication(c *gc.C) {
c.Assert(s.bundleData.Relations, gc.HasLen, 0)
}

func (s *ProcessBundleOverlaySuite) TestRemoveUnknownApplication(c *gc.C) {
config := `
applications:
unknown:
`
filename := s.writeFile(c, config)
err := processBundleOverlay(s.bundleData, filename)
c.Assert(err, jc.ErrorIsNil)
s.assertApplications(c, "django", "memcached")
c.Assert(s.bundleData.Relations, jc.DeepEquals, [][]string{
{"django", "memcached"},
})
}

func (s *ProcessBundleOverlaySuite) TestIncludes(c *gc.C) {
config := `
applications:
Expand Down
42 changes: 0 additions & 42 deletions featuretests/agent_caasoperator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@
package featuretests

import (
"os"
"path/filepath"
"time"

"github.com/juju/cmd/cmdtesting"
jc "github.com/juju/testing/checkers"
"github.com/juju/utils"
Expand All @@ -24,7 +20,6 @@ import (
"github.com/juju/juju/testing"
coretesting "github.com/juju/juju/testing"
"github.com/juju/juju/tools"
"github.com/juju/juju/worker/caasoperator/commands"
"github.com/juju/juju/worker/dependency"
"github.com/juju/juju/worker/logsender"
)
Expand Down Expand Up @@ -97,43 +92,6 @@ func (s *CAASOperatorSuite) newBufferedLogWriter() *logsender.BufferedLogWriter
return logger
}

func waitForApplicationActive(c *gc.C, dataDir string) {
timeout := time.After(coretesting.LongWait)
for {
select {
case <-timeout:
c.Fatalf("no activity detected")
case <-time.After(coretesting.ShortWait):
agentBinaryDir := filepath.Join(dataDir, "tools")
link := filepath.Join(agentBinaryDir, commands.CommandNames()[0])
if _, err := os.Lstat(link); err == nil {
target, err := os.Readlink(link)
c.Assert(err, jc.ErrorIsNil)
c.Assert(target, gc.Equals, filepath.Join(agentBinaryDir, "jujud"))
return
}
}
}
}

func (s *CAASOperatorSuite) TestRunStop(c *gc.C) {
app, config, _ := s.primeAgent(c)
a := s.newAgent(c, app)
go func() { c.Check(a.Run(nil), gc.IsNil) }()
defer func() { c.Check(a.Stop(), gc.IsNil) }()
waitForApplicationActive(c, config.DataDir())
}

func (s *CAASOperatorSuite) TestOpenStateFails(c *gc.C) {
app, config, _ := s.primeAgent(c)
a := s.newAgent(c, app)
go func() { c.Check(a.Run(nil), gc.IsNil) }()
defer func() { c.Check(a.Stop(), gc.IsNil) }()
waitForApplicationActive(c, config.DataDir())

s.AssertCannotOpenState(c, config.Tag(), config.DataDir())
}

type CAASOperatorManifoldsFunc func(config caasoperator.ManifoldsConfig) dependency.Manifolds

func TrackCAASOperator(c *gc.C, tracker *agenttest.EngineTracker, inner CAASOperatorManifoldsFunc) CAASOperatorManifoldsFunc {
Expand Down
5 changes: 5 additions & 0 deletions state/presence/presence.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,11 @@ func (w *Watcher) flush() {
return
case req := <-w.request:
w.handle(req)
// handle may append to the w.pending array, and/or it may unwatch something that was previously pending
// thus changing e.ch to nil while we are waiting to send the request. We need to make sure we are using
// the correct 'e' object
// See TestRobustness which fails if this line doesn't exist
e = &w.pending[i]
continue
case e.ch <- Change{e.key, e.alive}:
}
Expand Down
56 changes: 56 additions & 0 deletions state/presence/presence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
package presence_test

import (
"fmt"
"strconv"
"sync"
"sync/atomic"
stdtesting "testing"
"time"

Expand Down Expand Up @@ -637,3 +640,56 @@ func (s *PresenceSuite) TestMultiplePingersForEntity(c *gc.C) {
w.Sync()
c.Check(w.BeingLoads(), gc.Equals, loads)
}

func (s *PresenceSuite) TestRobustness(c *gc.C) {
// There used to be a potential condition, where during a flush() we wait for a channel send, and while we're
// waiting for it, we would handle events, which might cause us to grow our pending array, which would realloc
// the slice. If while that happened the original watch was unwatched, then we nil the channel, but the object
// we were hung pending on part of the reallocated slice.
w := presence.NewWatcher(s.presence, s.modelTag)
defer assertStopped(c, w)
// Start a watch for changes to 'key'. Never listen for actual events on that channel, though, so we know flush()
// will always be blocked, but allowing other events while waiting to send that event.
rootKey := "key"
keyChan := make(chan presence.Change, 0)
w.Watch(rootKey, keyChan)
// Whenever we successfully watch in the main loop(), it starts a flush. We should now be able to build up more
// watches while waiting. Create enough of these that we know the slice gets reallocated
var wg sync.WaitGroup
defer wg.Wait()
var observed uint32
const numKeys = 10
for i := 0; i < numKeys; i++ {
k := fmt.Sprintf("k%d", i)
kChan := make(chan presence.Change, 0)
w.Watch("key", kChan)
wg.Add(1)
go func() {
defer wg.Done()
select {
case <-kChan:
atomic.AddUint32(&observed, 1)
return
case <-time.After(testing.LongWait):
c.Fatalf("timed out waiting %s for %q to see its event", testing.LongWait, k)
}
}()
}
// None of them should actually have triggered, since the very first pending object has not been listened to
// And now we unwatch that object
time.Sleep(testing.ShortWait)
c.Check(atomic.LoadUint32(&observed), gc.Equals, uint32(0))
w.Unwatch(rootKey, keyChan)
// This should unblock all of them, and everything should go to observed
failTime := time.After(testing.LongWait)
o := atomic.LoadUint32(&observed)
for o != numKeys {
select {
case <-time.After(time.Millisecond):
o = atomic.LoadUint32(&observed)
case <-failTime:
c.Fatalf("only observed %d changes (expected %d) after %s time", atomic.LoadUint32(&observed), numKeys, testing.LongWait)
}
}
c.Check(atomic.LoadUint32(&observed), gc.Equals, uint32(numKeys))
}

0 comments on commit 75d0f22

Please sign in to comment.