Skip to content

Commit 5966152

Browse files
committed
Add pubsub/centralhub package.
1 parent c3bd5c9 commit 5966152

File tree

4 files changed

+178
-0
lines changed

4 files changed

+178
-0
lines changed

dependencies.tsv

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ github.com/juju/loggo git 3b7ece48644d35850f4ced4c2cbc2cf8413f58e0 2016-08-18T02
3737
github.com/juju/mempool git 24974d6c264fe5a29716e7d56ea24c4bd904b7cc 2016-02-05T10:49:27Z
3838
github.com/juju/mutex git 59c26ee163447c5c57f63ff71610d433862013de 2016-06-17T01:09:07Z
3939
github.com/juju/persistent-cookiejar git 5243747bf8f2d0897f6c7a52799327dc97d585e8 2016-11-15T13:33:28Z
40+
github.com/juju/pubsub git 9dcaca7eb4340dbf685aa7b3ad4cc4f8691a33d4 2016-07-28T03:00:34Z
4041
github.com/juju/replicaset git 6b5becf2232ce76656ea765d8d915d41755a1513 2016-11-25T16:08:49Z
4142
github.com/juju/retry git 62c62032529169c7ec02fa48f93349604c345e1f 2015-10-29T02:48:21Z
4243
github.com/juju/rfc git ebdbbdb950cd039a531d15cdc2ac2cbd94f068ee 2016-07-11T02:42:13Z

pubsub/centralhub/centralhub.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
// Copyright 2016 Canonical Ltd.
2+
// Licensed under the AGPLv3, see LICENCE file for details.
3+
4+
package centralhub
5+
6+
import (
7+
"github.com/juju/errors"
8+
"github.com/juju/pubsub"
9+
"github.com/juju/utils"
10+
"gopkg.in/juju/names.v2"
11+
"gopkg.in/yaml.v2"
12+
)
13+
14+
// New returns a new structured hub using yaml marshalling with an origin
15+
// specified. The post processing ensures that the maps all have string keys
16+
// so they messages can be marshalled between apiservers.
17+
func New(origin names.MachineTag) *pubsub.StructuredHub {
18+
19+
return pubsub.NewStructuredHub(
20+
&pubsub.StructuredHubConfig{
21+
Marshaller: &yamlMarshaller{},
22+
Annotations: map[string]interface{}{
23+
"origin": origin.String(),
24+
},
25+
PostProcess: ensureStringMaps,
26+
})
27+
}
28+
29+
type yamlMarshaller struct{}
30+
31+
// Marshal implements Marshaller.
32+
func (*yamlMarshaller) Marshal(v interface{}) ([]byte, error) {
33+
return yaml.Marshal(v)
34+
}
35+
36+
// Unmarshal implements Marshaller.
37+
func (*yamlMarshaller) Unmarshal(data []byte, v interface{}) error {
38+
return yaml.Unmarshal(data, v)
39+
}
40+
41+
func ensureStringMaps(in map[string]interface{}) (map[string]interface{}, error) {
42+
out, err := utils.ConformYAML(in)
43+
if err != nil {
44+
return nil, errors.Trace(err)
45+
}
46+
return out.(map[string]interface{}), nil
47+
}

pubsub/centralhub/centralhub_test.go

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
// Copyright 2016 Canonical Ltd.
2+
// Licensed under the AGPLv3, see LICENCE file for details.
3+
4+
package centralhub_test
5+
6+
import (
7+
"time"
8+
9+
"github.com/juju/pubsub"
10+
jc "github.com/juju/testing/checkers"
11+
gc "gopkg.in/check.v1"
12+
"gopkg.in/juju/names.v2"
13+
14+
"github.com/juju/juju/pubsub/centralhub"
15+
"github.com/juju/juju/testing"
16+
)
17+
18+
type CentralHubSuite struct{}
19+
20+
var _ = gc.Suite(&CentralHubSuite{})
21+
22+
func (*CentralHubSuite) waitForSubscribers(c *gc.C, done <-chan struct{}) {
23+
select {
24+
case <-done:
25+
case <-time.After(testing.ShortWait):
26+
c.Fatal("subscribers not finished")
27+
}
28+
}
29+
30+
func (s *CentralHubSuite) TestSetsOrigin(c *gc.C) {
31+
hub := centralhub.New(names.NewMachineTag("42"))
32+
topic := pubsub.Topic("testing")
33+
var called bool
34+
unsub, err := hub.Subscribe(pubsub.MatchAll, func(t pubsub.Topic, data map[string]interface{}) {
35+
c.Check(t, gc.Equals, topic)
36+
expected := map[string]interface{}{
37+
"key": "value",
38+
"origin": "machine-42",
39+
}
40+
c.Check(data, jc.DeepEquals, expected)
41+
called = true
42+
})
43+
44+
c.Assert(err, jc.ErrorIsNil)
45+
defer unsub.Unsubscribe()
46+
47+
done, err := hub.Publish(topic, map[string]interface{}{"key": "value"})
48+
c.Assert(err, jc.ErrorIsNil)
49+
s.waitForSubscribers(c, done)
50+
c.Assert(called, jc.IsTrue)
51+
}
52+
53+
type IntStruct struct {
54+
Key int `json:"key"`
55+
}
56+
57+
func (s *CentralHubSuite) TestYAMLMarshalling(c *gc.C) {
58+
hub := centralhub.New(names.NewMachineTag("42"))
59+
topic := pubsub.Topic("testing")
60+
var called bool
61+
unsub, err := hub.Subscribe(pubsub.MatchAll, func(t pubsub.Topic, data map[string]interface{}) {
62+
c.Check(t, gc.Equals, topic)
63+
expected := map[string]interface{}{
64+
"key": 1234,
65+
"origin": "machine-42",
66+
}
67+
c.Check(data, jc.DeepEquals, expected)
68+
called = true
69+
})
70+
71+
c.Assert(err, jc.ErrorIsNil)
72+
defer unsub.Unsubscribe()
73+
74+
// With the default JSON marshalling, integers are marshalled to floats into the map.
75+
done, err := hub.Publish(topic, IntStruct{1234})
76+
c.Assert(err, jc.ErrorIsNil)
77+
s.waitForSubscribers(c, done)
78+
c.Assert(called, jc.IsTrue)
79+
}
80+
81+
type NestedStruct struct {
82+
Key string `yaml:"key"`
83+
Nested IntStruct `yaml:"nested"`
84+
}
85+
86+
func (s *CentralHubSuite) TestPostProcessingMaps(c *gc.C) {
87+
// Due to the need to send the resulting maps over the API, nested structs
88+
// need to be map[string]interface{} not map[interface{}]interface{},
89+
// which is what the YAML marshaller will give us.
90+
hub := centralhub.New(names.NewMachineTag("42"))
91+
topic := pubsub.Topic("testing")
92+
var called bool
93+
unsub, err := hub.Subscribe(pubsub.MatchAll, func(t pubsub.Topic, data map[string]interface{}) {
94+
c.Check(t, gc.Equals, topic)
95+
expected := map[string]interface{}{
96+
"key": "value",
97+
"nested": map[string]interface{}{
98+
"key": 1234,
99+
},
100+
"origin": "machine-42",
101+
}
102+
c.Check(data, jc.DeepEquals, expected)
103+
called = true
104+
})
105+
106+
c.Assert(err, jc.ErrorIsNil)
107+
defer unsub.Unsubscribe()
108+
109+
// With the default JSON marshalling, integers are marshalled to floats into the map.
110+
done, err := hub.Publish(topic, NestedStruct{
111+
Key: "value",
112+
Nested: IntStruct{1234}})
113+
c.Assert(err, jc.ErrorIsNil)
114+
s.waitForSubscribers(c, done)
115+
c.Assert(called, jc.IsTrue)
116+
}

pubsub/centralhub/package_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
// Copyright 2016 Canonical Ltd.
2+
// Licensed under the AGPLv3, see LICENCE file for details.
3+
4+
package centralhub_test
5+
6+
import (
7+
"testing"
8+
9+
gc "gopkg.in/check.v1"
10+
)
11+
12+
func TestPackage(t *testing.T) {
13+
gc.TestingT(t)
14+
}

0 commit comments

Comments
 (0)