Skip to content

Commit

Permalink
Ensure DetailsRequests are only sent locally
Browse files Browse the repository at this point in the history
We don't want them to be forwarded to other controllers (each one is
running its own peergrouper).

Also fix an intermittent failure in the clusterer tests - use a channel
rather than a stub so we can wait for the message.
  • Loading branch information
babbageclunk committed Apr 12, 2018
1 parent fa4a17f commit 7492b9c
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 9 deletions.
1 change: 1 addition & 0 deletions pubsub/apiserver/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const DetailsRequestTopic = "apiserver.details-request"
// to be sent.
type DetailsRequest struct {
Requester string `yaml:"requester"`
LocalOnly bool `yaml:"local-only"`
}

// ConnectTopic is the topic name for the published message
Expand Down
5 changes: 4 additions & 1 deletion worker/pubsub/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,10 @@ func newSubscriber(config WorkerConfig) (*subscriber, error) {
sub.unsubServerDetails = unsub

// Ask for the current server details now that we're subscribed.
detailsRequest := apiserver.DetailsRequest{Requester: "pubsub-forwarder"}
detailsRequest := apiserver.DetailsRequest{
Requester: "pubsub-forwarder",
LocalOnly: true,
}
if _, err := config.Hub.Publish(apiserver.DetailsRequestTopic, detailsRequest); err != nil {
return nil, errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions worker/pubsub/subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ func (s *SubscriberSuite) TestRequestsDetailsOnceSubscribed(c *gc.C) {
subscribed := make(chan apiserver.DetailsRequest)
s.config.Hub.Subscribe(apiserver.DetailsRequestTopic,
func(_ string, req apiserver.DetailsRequest, err error) {
c.Assert(err, jc.ErrorIsNil)
c.Check(err, jc.ErrorIsNil)
subscribed <- req
},
)
Expand All @@ -339,7 +339,7 @@ func (s *SubscriberSuite) TestRequestsDetailsOnceSubscribed(c *gc.C) {

select {
case req := <-subscribed:
c.Assert(req, gc.Equals, apiserver.DetailsRequest{Requester: "pubsub-forwarder"})
c.Assert(req, gc.Equals, apiserver.DetailsRequest{Requester: "pubsub-forwarder", LocalOnly: true})
case <-time.After(coretesting.LongWait):
c.Fatalf("timed out waiting for details request")
}
Expand Down
5 changes: 4 additions & 1 deletion worker/raft/raftclusterer/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ func NewWorker(config Config) (worker.Worker, error) {
return nil, errors.Annotate(err, "subscribing to apiserver details")
}
// Now that we're subscribed, request the current API server details.
req := apiserver.DetailsRequest{Requester: "raft-clusterer"}
req := apiserver.DetailsRequest{
Requester: "raft-clusterer",
LocalOnly: true,
}
if _, err := config.Hub.Publish(apiserver.DetailsRequestTopic, req); err != nil {
return nil, errors.Annotate(err, "requesting current apiserver details")
}
Expand Down
19 changes: 14 additions & 5 deletions worker/raft/raftclusterer/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/hashicorp/raft"
"github.com/juju/pubsub"
"github.com/juju/testing"
jc "github.com/juju/testing/checkers"
gc "gopkg.in/check.v1"
"gopkg.in/juju/names.v2"
Expand Down Expand Up @@ -77,19 +76,20 @@ func (s *WorkerValidationSuite) testValidateError(c *gc.C, f func(*raftclusterer
type WorkerSuite struct {
workerFixture
worker worker.Worker
stub testing.Stub
reqs chan apiserver.DetailsRequest
}

var _ = gc.Suite(&WorkerSuite{})

func (s *WorkerSuite) SetUpTest(c *gc.C) {
s.workerFixture.SetUpTest(c)

s.stub.ResetCalls()
s.reqs = make(chan apiserver.DetailsRequest, 10)
s.hub.Subscribe(
apiserver.DetailsRequestTopic,
func(topic string, req apiserver.DetailsRequest, err error) {
s.stub.AddCall("DetailsRequest", req, err)
c.Check(err, jc.ErrorIsNil)
s.reqs <- req
},
)

Expand Down Expand Up @@ -255,7 +255,16 @@ func (s *WorkerSuite) TestChangeLocalServer(c *gc.C) {
}

func (s *WorkerSuite) TestRequestsDetails(c *gc.C) {
s.stub.CheckCall(c, 0, "DetailsRequest", apiserver.DetailsRequest{Requester: "raft-clusterer"}, nil)
// The worker is started in SetUpTest.
select {
case req := <-s.reqs:
c.Assert(req, gc.Equals, apiserver.DetailsRequest{
Requester: "raft-clusterer",
LocalOnly: true,
})
case <-time.After(coretesting.LongWait):
c.Fatalf("timed out waiting for details request")
}
}

func (s *WorkerSuite) publishDetails(c *gc.C, details apiserver.Details) {
Expand Down

0 comments on commit 7492b9c

Please sign in to comment.