|
| 1 | +// Copyright 2016 Canonical Ltd. |
| 2 | +// Licensed under the AGPLv3, see LICENCE file for details. |
| 3 | + |
| 4 | +package centralhub_test |
| 5 | + |
| 6 | +import ( |
| 7 | + "time" |
| 8 | + |
| 9 | + "github.com/juju/pubsub" |
| 10 | + jc "github.com/juju/testing/checkers" |
| 11 | + gc "gopkg.in/check.v1" |
| 12 | + "gopkg.in/juju/names.v2" |
| 13 | + |
| 14 | + "github.com/juju/juju/pubsub/centralhub" |
| 15 | + "github.com/juju/juju/testing" |
| 16 | +) |
| 17 | + |
| 18 | +type CentralHubSuite struct{} |
| 19 | + |
| 20 | +var _ = gc.Suite(&CentralHubSuite{}) |
| 21 | + |
| 22 | +func (*CentralHubSuite) waitForSubscribers(c *gc.C, done <-chan struct{}) { |
| 23 | + select { |
| 24 | + case <-done: |
| 25 | + case <-time.After(testing.ShortWait): |
| 26 | + c.Fatal("subscribers not finished") |
| 27 | + } |
| 28 | +} |
| 29 | + |
| 30 | +func (s *CentralHubSuite) TestSetsOrigin(c *gc.C) { |
| 31 | + hub := centralhub.New(names.NewMachineTag("42")) |
| 32 | + topic := pubsub.Topic("testing") |
| 33 | + var called bool |
| 34 | + unsub, err := hub.Subscribe(pubsub.MatchAll, func(t pubsub.Topic, data map[string]interface{}) { |
| 35 | + c.Check(t, gc.Equals, topic) |
| 36 | + expected := map[string]interface{}{ |
| 37 | + "key": "value", |
| 38 | + "origin": "machine-42", |
| 39 | + } |
| 40 | + c.Check(data, jc.DeepEquals, expected) |
| 41 | + called = true |
| 42 | + }) |
| 43 | + |
| 44 | + c.Assert(err, jc.ErrorIsNil) |
| 45 | + defer unsub.Unsubscribe() |
| 46 | + |
| 47 | + done, err := hub.Publish(topic, map[string]interface{}{"key": "value"}) |
| 48 | + c.Assert(err, jc.ErrorIsNil) |
| 49 | + s.waitForSubscribers(c, done) |
| 50 | + c.Assert(called, jc.IsTrue) |
| 51 | +} |
| 52 | + |
| 53 | +type IntStruct struct { |
| 54 | + Key int `json:"key"` |
| 55 | +} |
| 56 | + |
| 57 | +func (s *CentralHubSuite) TestYAMLMarshalling(c *gc.C) { |
| 58 | + hub := centralhub.New(names.NewMachineTag("42")) |
| 59 | + topic := pubsub.Topic("testing") |
| 60 | + var called bool |
| 61 | + unsub, err := hub.Subscribe(pubsub.MatchAll, func(t pubsub.Topic, data map[string]interface{}) { |
| 62 | + c.Check(t, gc.Equals, topic) |
| 63 | + expected := map[string]interface{}{ |
| 64 | + "key": 1234, |
| 65 | + "origin": "machine-42", |
| 66 | + } |
| 67 | + c.Check(data, jc.DeepEquals, expected) |
| 68 | + called = true |
| 69 | + }) |
| 70 | + |
| 71 | + c.Assert(err, jc.ErrorIsNil) |
| 72 | + defer unsub.Unsubscribe() |
| 73 | + |
| 74 | + // With the default JSON marshalling, integers are marshalled to floats into the map. |
| 75 | + done, err := hub.Publish(topic, IntStruct{1234}) |
| 76 | + c.Assert(err, jc.ErrorIsNil) |
| 77 | + s.waitForSubscribers(c, done) |
| 78 | + c.Assert(called, jc.IsTrue) |
| 79 | +} |
| 80 | + |
| 81 | +type NestedStruct struct { |
| 82 | + Key string `yaml:"key"` |
| 83 | + Nested IntStruct `yaml:"nested"` |
| 84 | +} |
| 85 | + |
| 86 | +func (s *CentralHubSuite) TestPostProcessingMaps(c *gc.C) { |
| 87 | + // Due to the need to send the resulting maps over the API, nested structs |
| 88 | + // need to be map[string]interface{} not map[interface{}]interface{}, |
| 89 | + // which is what the YAML marshaller will give us. |
| 90 | + hub := centralhub.New(names.NewMachineTag("42")) |
| 91 | + topic := pubsub.Topic("testing") |
| 92 | + var called bool |
| 93 | + unsub, err := hub.Subscribe(pubsub.MatchAll, func(t pubsub.Topic, data map[string]interface{}) { |
| 94 | + c.Check(t, gc.Equals, topic) |
| 95 | + expected := map[string]interface{}{ |
| 96 | + "key": "value", |
| 97 | + "nested": map[string]interface{}{ |
| 98 | + "key": 1234, |
| 99 | + }, |
| 100 | + "origin": "machine-42", |
| 101 | + } |
| 102 | + c.Check(data, jc.DeepEquals, expected) |
| 103 | + called = true |
| 104 | + }) |
| 105 | + |
| 106 | + c.Assert(err, jc.ErrorIsNil) |
| 107 | + defer unsub.Unsubscribe() |
| 108 | + |
| 109 | + // With the default JSON marshalling, integers are marshalled to floats into the map. |
| 110 | + done, err := hub.Publish(topic, NestedStruct{ |
| 111 | + Key: "value", |
| 112 | + Nested: IntStruct{1234}}) |
| 113 | + c.Assert(err, jc.ErrorIsNil) |
| 114 | + s.waitForSubscribers(c, done) |
| 115 | + c.Assert(called, jc.IsTrue) |
| 116 | +} |
0 commit comments