-
Notifications
You must be signed in to change notification settings - Fork 0
/
pubsub.go
133 lines (117 loc) · 3.81 KB
/
pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
// Copyright 2015 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.
package apiserver
import (
"net/http"
"time"
gorillaws "github.com/gorilla/websocket"
"github.com/juju/errors"
"github.com/juju/featureflag"
"github.com/juju/loggo"
"github.com/juju/juju/apiserver/params"
"github.com/juju/juju/apiserver/websocket"
"github.com/juju/juju/feature"
)
// Hub defines the publish method that the handler uses to publish
// messages on the centralhub of the apiserver.
type Hub interface {
Publish(string, interface{}) (func(), error)
}
func newPubSubHandler(h httpContext, hub Hub) http.Handler {
return &pubsubHandler{
ctxt: h,
hub: hub,
logger: loggo.GetLogger("juju.apiserver.pubsub"),
}
}
type pubsubHandler struct {
ctxt httpContext
hub Hub
logger loggo.Logger
}
// ServeHTTP implements the http.Handler interface.
func (h *pubsubHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
handler := func(socket *websocket.Conn) {
h.logger.Debugf("start of *pubsubHandler.ServeHTTP")
defer socket.Close()
// If we get to here, no more errors to report, so we report a nil
// error. This way the first line of the socket is always a json
// formatted simple error.
h.sendError(socket, req, nil)
// Here we configure the ping/pong handling for the websocket so
// the server can notice when the client goes away.
// See the long note in logsink.go for the rationale.
_ = socket.SetReadDeadline(time.Now().Add(websocket.PongDelay))
socket.SetPongHandler(func(string) error {
_ = socket.SetReadDeadline(time.Now().Add(websocket.PongDelay))
return nil
})
ticker := time.NewTicker(websocket.PingPeriod)
defer ticker.Stop()
messageCh := h.receiveMessages(socket)
for {
select {
case <-h.ctxt.stop():
return
case <-ticker.C:
deadline := time.Now().Add(websocket.WriteWait)
if err := socket.WriteControl(gorillaws.PingMessage, []byte{}, deadline); err != nil {
// This error is expected if the other end goes away. By
// returning we close the socket through the defer call.
h.logger.Debugf("failed to write ping: %s", err)
return
}
case m := <-messageCh:
h.logger.Tracef("topic: %q, data: %v", m.Topic, m.Data)
_, err := h.hub.Publish(m.Topic, m.Data)
if err != nil {
h.logger.Errorf("publish failed: %v", err)
}
}
}
}
websocket.Serve(w, req, handler)
}
func (h *pubsubHandler) receiveMessages(socket *websocket.Conn) <-chan params.PubSubMessage {
messageCh := make(chan params.PubSubMessage)
go func() {
for {
// The message needs to be new each time through the loop to ensure
// the map is not reused.
var m params.PubSubMessage
// Receive() blocks until data arrives but will also be
// unblocked when the API handler calls socket.Close as it
// finishes.
if err := socket.ReadJSON(&m); err != nil {
// Since we don't give a list of expected error codes,
// any CloseError type is considered unexpected.
if gorillaws.IsUnexpectedCloseError(err) {
h.logger.Tracef("websocket closed")
} else {
h.logger.Errorf("pubsub receive error: %v", err)
}
return
}
// Send the log message.
select {
case <-h.ctxt.stop():
return
case messageCh <- m:
}
}
}()
return messageCh
}
// sendError sends a JSON-encoded error response.
func (h *pubsubHandler) sendError(ws *websocket.Conn, req *http.Request, err error) {
// There is no need to log the error for normal operators as there is nothing
// they can action. This is for developers.
if err != nil && featureflag.Enabled(feature.DeveloperMode) {
h.logger.Errorf("returning error from %s %s: %s", req.Method, req.URL.Path, errors.Details(err))
}
if sendErr := ws.SendInitialErrorV0(err); sendErr != nil {
h.logger.Errorf("closing websocket, %v", err)
ws.Close()
return
}
}