Skip to content

Commit

Permalink
Update the pubsub dependency.
Browse files Browse the repository at this point in the history
  • Loading branch information
howbazaar committed Mar 31, 2017
1 parent 5f4de65 commit 0ed12fa
Show file tree
Hide file tree
Showing 9 changed files with 22 additions and 24 deletions.
4 changes: 2 additions & 2 deletions api/pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,10 @@ func (s *PubSubIntegrationSuite) connect(c *gc.C) apipubsub.MessageWriter {

func (s *PubSubIntegrationSuite) TestMessages(c *gc.C) {
writer := s.connect(c)
var topic pubsub.Topic = "test.message"
topic := "test.message"
messages := []map[string]interface{}{}
done := make(chan struct{})
_, err := s.hub.Subscribe(pubsub.MatchAll, func(t pubsub.Topic, payload map[string]interface{}) {
_, err := s.hub.SubscribeMatch(pubsub.MatchAll, func(t string, payload map[string]interface{}) {
c.Check(t, gc.Equals, topic)
messages = append(messages, payload)
if len(messages) == 2 {
Expand Down
5 changes: 2 additions & 3 deletions apiserver/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/gorilla/websocket"
"github.com/juju/errors"
"github.com/juju/pubsub"
"github.com/juju/utils/featureflag"

"github.com/juju/juju/apiserver/common"
Expand All @@ -21,7 +20,7 @@ import (
// Hub defines the publish method that the handler uses to publish
// messages on the centralhub of the apiserver.
type Hub interface {
Publish(pubsub.Topic, interface{}) (<-chan struct{}, error)
Publish(string, interface{}) (<-chan struct{}, error)
}

func newPubSubHandler(h httpContext, hub Hub) http.Handler {
Expand Down Expand Up @@ -102,7 +101,7 @@ func (h *pubsubHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
case m := <-messageCh:
logger.Tracef("topic: %q, data: %v", m.Topic, m.Data)
_, err := h.hub.Publish(pubsub.Topic(m.Topic), m.Data)
_, err := h.hub.Publish(m.Topic, m.Data)
if err != nil {
logger.Errorf("publish failed: %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions apiserver/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,10 @@ func (s *pubsubSuite) TestMessage(c *gc.C) {
done := make(chan struct{})
loggo.GetLogger("pubsub").SetLogLevel(loggo.TRACE)
loggo.GetLogger("juju.apiserver").SetLogLevel(loggo.TRACE)
_, err := s.hub.Subscribe(pubsub.MatchAll, func(topic pubsub.Topic, data map[string]interface{}) {
_, err := s.hub.SubscribeMatch(pubsub.MatchAll, func(topic string, data map[string]interface{}) {
c.Logf("topic: %q, data: %v", topic, data)
messages = append(messages, params.PubSubMessage{
Topic: string(topic),
Topic: topic,
Data: data,
})
done <- struct{}{}
Expand Down
2 changes: 1 addition & 1 deletion dependencies.tsv
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ github.com/juju/loggo git 21bc4c63e8b435779a080e39e592969b7b90b889 2017-02-22T12
github.com/juju/mempool git 24974d6c264fe5a29716e7d56ea24c4bd904b7cc 2016-02-05T10:49:27Z
github.com/juju/mutex git 59c26ee163447c5c57f63ff71610d433862013de 2016-06-17T01:09:07Z
github.com/juju/persistent-cookiejar git 5243747bf8f2d0897f6c7a52799327dc97d585e8 2016-11-15T13:33:28Z
github.com/juju/pubsub git 9dcaca7eb4340dbf685aa7b3ad4cc4f8691a33d4 2016-07-28T03:00:34Z
github.com/juju/pubsub git f4dfa62f30adc6955341b3dd73dde7c8d9b23b9e 2017-03-31T03:24:24Z
github.com/juju/replicaset git 6b5becf2232ce76656ea765d8d915d41755a1513 2016-11-25T16:08:49Z
github.com/juju/retry git 62c62032529169c7ec02fa48f93349604c345e1f 2015-10-29T02:48:21Z
github.com/juju/rfc git ebdbbdb950cd039a531d15cdc2ac2cbd94f068ee 2016-07-11T02:42:13Z
Expand Down
4 changes: 1 addition & 3 deletions pubsub/apiserver/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@

package apiserver

import "github.com/juju/pubsub"

// DetailsTopic is the topic name for the published message when the details
// of the api servers change. This message is normally published by the
// peergrouper when the set of API servers changes.
const DetailsTopic pubsub.Topic = "apiserver.details"
const DetailsTopic = "apiserver.details"

// APIServer contains the machine id and addresses of a single API server machine.
type APIServer struct {
Expand Down
2 changes: 2 additions & 0 deletions pubsub/centralhub/centralhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package centralhub

import (
"github.com/juju/errors"
"github.com/juju/loggo"
"github.com/juju/pubsub"
"github.com/juju/utils"
"gopkg.in/juju/names.v2"
Expand All @@ -18,6 +19,7 @@ func New(origin names.MachineTag) *pubsub.StructuredHub {

return pubsub.NewStructuredHub(
&pubsub.StructuredHubConfig{
Logger: loggo.GetLogger("juju.centralhub"),
Marshaller: &yamlMarshaller{},
Annotations: map[string]interface{}{
"origin": origin.String(),
Expand Down
18 changes: 9 additions & 9 deletions pubsub/centralhub/centralhub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ func (*CentralHubSuite) waitForSubscribers(c *gc.C, done <-chan struct{}) {

func (s *CentralHubSuite) TestSetsOrigin(c *gc.C) {
hub := centralhub.New(names.NewMachineTag("42"))
topic := pubsub.Topic("testing")
topic := "testing"
var called bool
unsub, err := hub.Subscribe(pubsub.MatchAll, func(t pubsub.Topic, data map[string]interface{}) {
unsub, err := hub.SubscribeMatch(pubsub.MatchAll, func(t string, data map[string]interface{}) {
c.Check(t, gc.Equals, topic)
expected := map[string]interface{}{
"key": "value",
Expand All @@ -42,7 +42,7 @@ func (s *CentralHubSuite) TestSetsOrigin(c *gc.C) {
})

c.Assert(err, jc.ErrorIsNil)
defer unsub.Unsubscribe()
defer unsub()

done, err := hub.Publish(topic, map[string]interface{}{"key": "value"})
c.Assert(err, jc.ErrorIsNil)
Expand All @@ -56,9 +56,9 @@ type IntStruct struct {

func (s *CentralHubSuite) TestYAMLMarshalling(c *gc.C) {
hub := centralhub.New(names.NewMachineTag("42"))
topic := pubsub.Topic("testing")
topic := "testing"
var called bool
unsub, err := hub.Subscribe(pubsub.MatchAll, func(t pubsub.Topic, data map[string]interface{}) {
unsub, err := hub.SubscribeMatch(pubsub.MatchAll, func(t string, data map[string]interface{}) {
c.Check(t, gc.Equals, topic)
expected := map[string]interface{}{
"key": 1234,
Expand All @@ -69,7 +69,7 @@ func (s *CentralHubSuite) TestYAMLMarshalling(c *gc.C) {
})

c.Assert(err, jc.ErrorIsNil)
defer unsub.Unsubscribe()
defer unsub()

// With the default JSON marshalling, integers are marshalled to floats into the map.
done, err := hub.Publish(topic, IntStruct{1234})
Expand All @@ -88,9 +88,9 @@ func (s *CentralHubSuite) TestPostProcessingMaps(c *gc.C) {
// need to be map[string]interface{} not map[interface{}]interface{},
// which is what the YAML marshaller will give us.
hub := centralhub.New(names.NewMachineTag("42"))
topic := pubsub.Topic("testing")
topic := "testing"
var called bool
unsub, err := hub.Subscribe(pubsub.MatchAll, func(t pubsub.Topic, data map[string]interface{}) {
unsub, err := hub.SubscribeMatch(pubsub.MatchAll, func(t string, data map[string]interface{}) {
c.Check(t, gc.Equals, topic)
expected := map[string]interface{}{
"key": "value",
Expand All @@ -104,7 +104,7 @@ func (s *CentralHubSuite) TestPostProcessingMaps(c *gc.C) {
})

c.Assert(err, jc.ErrorIsNil)
defer unsub.Unsubscribe()
defer unsub()

// With the default JSON marshalling, integers are marshalled to floats into the map.
done, err := hub.Publish(topic, NestedStruct{
Expand Down
3 changes: 1 addition & 2 deletions worker/peergrouper/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/juju/errors"
"github.com/juju/loggo"
"github.com/juju/pubsub"
"github.com/juju/replicaset"
"github.com/juju/utils/clock"
worker "gopkg.in/juju/worker.v1"
Expand Down Expand Up @@ -83,7 +82,7 @@ var (
// Hub defines the only method of the apiserver centralhub that
// the peer grouper uses.
type Hub interface {
Publish(topic pubsub.Topic, data interface{}) (<-chan struct{}, error)
Publish(topic string, data interface{}) (<-chan struct{}, error)
}

// pgWorker is a worker which watches the controller machines in state
Expand Down
4 changes: 2 additions & 2 deletions worker/peergrouper/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ func (s *workerSuite) TestControllersArePublishedOverHub(c *gc.C) {
InitState(c, st, 3, testIPv4)
hub := pubsub.NewStructuredHub(nil)
event := make(chan apiserver.Details)
_, err := hub.Subscribe(apiserver.DetailsTopic, func(topic pubsub.Topic, data apiserver.Details, err error) {
_, err := hub.Subscribe(apiserver.DetailsTopic, func(topic string, data apiserver.Details, err error) {
c.Check(err, jc.ErrorIsNil)
event <- data
})
Expand Down Expand Up @@ -774,7 +774,7 @@ func (noPublisher) publishAPIServers(apiServers [][]network.HostPort, instanceId

type noOpHub struct{}

func (h *noOpHub) Publish(topic pubsub.Topic, data interface{}) (<-chan struct{}, error) {
func (h *noOpHub) Publish(topic string, data interface{}) (<-chan struct{}, error) {
return nil, nil
}

Expand Down

0 comments on commit 0ed12fa

Please sign in to comment.