Skip to content

Commit f06c3e9

Browse files
authored
Merge pull request #7013 from howbazaar/gorilla-websocket
Replace x/net/websocket with gorilla/websocket. There was a tech-board decision some time in the last two years when we were discussing websocket libraries and why we had two. The decision then was to standardise on gorilla/websocket, but nothing happened. I'm hoping that using the more standards-compliant websocket library will help with some of the leaks we are seeing. Even if it doesn't help right now, the new library gives the client code access to the websocket ping/pong frames, which the old library didn't. This should allow us to have better recognition when the other end goes away. ## QA steps Deploy an old controller, make sure there are workload agents. Upgrade the controller and look at the logs. All older clients, CLI and agents should continue to work as normal. New juju CLI should work with an older controller. ## Documentation changes No user impact at all.
2 parents 3c535e6 + 0d3d1ac commit f06c3e9

30 files changed

+698
-534
lines changed

api/apiclient.go

Lines changed: 128 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,14 @@ import (
1616
"sync/atomic"
1717
"time"
1818

19+
"github.com/gorilla/websocket"
1920
"github.com/juju/errors"
2021
"github.com/juju/loggo"
2122
"github.com/juju/retry"
2223
"github.com/juju/utils"
2324
"github.com/juju/utils/clock"
2425
"github.com/juju/utils/parallel"
2526
"github.com/juju/version"
26-
"golang.org/x/net/websocket"
2727
"gopkg.in/juju/names.v2"
2828
"gopkg.in/macaroon-bakery.v1/httpbakery"
2929
"gopkg.in/macaroon.v1"
@@ -47,6 +47,11 @@ const pingTimeout = 30 * time.Second
4747
// modelRoot is the prefix that all model API paths begin with.
4848
const modelRoot = "/model/"
4949

50+
// Use a 64k frame size for the websockets while we need to deal
51+
// with x/net/websocket connections that don't deal with receiving
52+
// fragmented messages.
53+
const websocketFrameSize = 65536
54+
5055
var logger = loggo.GetLogger("juju.api")
5156

5257
type rpcConnection interface {
@@ -183,12 +188,12 @@ func open(
183188
if clock == nil {
184189
return nil, errors.NotValidf("nil clock")
185190
}
186-
conn, tlsConfig, err := dialAPI(info, opts)
191+
dialResult, err := dialAPI(info, opts)
187192
if err != nil {
188193
return nil, errors.Trace(err)
189194
}
190195

191-
client := rpc.NewConn(jsoncodec.NewWebsocket(conn), observer.None())
196+
client := rpc.NewConn(jsoncodec.NewWebsocket(dialResult.conn), observer.None())
192197
client.Start()
193198

194199
bakeryClient := opts.BakeryClient
@@ -201,29 +206,36 @@ func open(
201206
httpc := *bakeryClient.Client
202207
bakeryClient.Client = &httpc
203208
}
204-
apiHost := conn.Config().Location.Host
209+
apiURL, err := url.Parse(dialResult.urlStr)
210+
if err != nil {
211+
// This should never happen as the url would have failed during dialAPI above.
212+
// However the code paths don't allow capture of the url.URL used.
213+
return nil, errors.Trace(err)
214+
}
215+
apiHost := apiURL.Host
216+
205217
// Technically when there's no CACert, we don't need this
206218
// machinery, because we could just use http.DefaultTransport
207219
// for everything, but it's easier just to leave it in place.
208220
bakeryClient.Client.Transport = &hostSwitchingTransport{
209221
primaryHost: apiHost,
210-
primary: utils.NewHttpTLSTransport(tlsConfig),
222+
primary: utils.NewHttpTLSTransport(dialResult.tlsConfig),
211223
fallback: http.DefaultTransport,
212224
}
213225

214226
st := &state{
215227
client: client,
216-
conn: conn,
228+
conn: dialResult.conn,
217229
clock: clock,
218230
addr: apiHost,
219231
cookieURL: &url.URL{
220232
Scheme: "https",
221-
Host: conn.Config().Location.Host,
233+
Host: apiHost,
222234
Path: "/",
223235
},
224236
pingerFacadeVersion: facadeVersions["Pinger"],
225237
serverScheme: "https",
226-
serverRootAddress: conn.Config().Location.Host,
238+
serverRootAddress: apiHost,
227239
// We populate the username and password before
228240
// login because, when doing HTTP requests, we'll want
229241
// to use the same username and password for authenticating
@@ -232,13 +244,13 @@ func open(
232244
password: info.Password,
233245
macaroons: info.Macaroons,
234246
nonce: info.Nonce,
235-
tlsConfig: tlsConfig,
247+
tlsConfig: dialResult.tlsConfig,
236248
bakeryClient: bakeryClient,
237249
modelTag: info.ModelTag,
238250
}
239251
if !info.SkipLogin {
240252
if err := st.Login(info.Tag, info.Password, info.Nonce, info.Macaroons); err != nil {
241-
conn.Close()
253+
dialResult.conn.Close()
242254
return nil, errors.Trace(err)
243255
}
244256
}
@@ -348,49 +360,65 @@ func (st *state) connectStream(path string, attrs url.Values, extraHeaders http.
348360
// TODO(macgreagoir) IPv6. Ubuntu still always provides IPv4 loopback,
349361
// and when/if this changes localhost should resolve to IPv6 loopback
350362
// in any case (lp:1644009). Review.
351-
cfg, err := websocket.NewConfig(target.String(), "http://localhost/")
352-
if err != nil {
353-
return nil, errors.Trace(err)
363+
364+
dialer := &websocket.Dialer{
365+
Proxy: http.ProxyFromEnvironment,
366+
TLSClientConfig: st.tlsConfig,
367+
// In order to deal with the remote side not handling message
368+
// fragmentation, we default to largeish frames.
369+
ReadBufferSize: websocketFrameSize,
370+
WriteBufferSize: websocketFrameSize,
354371
}
372+
var requestHeader http.Header
355373
if st.tag != "" {
356-
cfg.Header = utils.BasicAuthHeader(st.tag, st.password)
374+
requestHeader = utils.BasicAuthHeader(st.tag, st.password)
375+
} else {
376+
requestHeader = make(http.Header)
357377
}
378+
requestHeader.Set("Origin", "http://localhost/")
358379
if st.nonce != "" {
359-
cfg.Header.Set(params.MachineNonceHeader, st.nonce)
380+
requestHeader.Set(params.MachineNonceHeader, st.nonce)
360381
}
361382
// Add any cookies because they will not be sent to websocket
362383
// connections by default.
363-
err = st.addCookiesToHeader(cfg.Header)
384+
err := st.addCookiesToHeader(requestHeader)
364385
if err != nil {
365386
return nil, errors.Trace(err)
366387
}
367388
for header, values := range extraHeaders {
368389
for _, value := range values {
369-
cfg.Header.Add(header, value)
390+
requestHeader.Add(header, value)
370391
}
371392
}
372393

373-
cfg.TlsConfig = st.tlsConfig
374-
connection, err := websocketDialConfig(cfg)
394+
connection, err := websocketDial(dialer, target.String(), requestHeader)
375395
if err != nil {
376396
return nil, err
377397
}
378398
if err := readInitialStreamError(connection); err != nil {
399+
connection.Close()
379400
return nil, errors.Trace(err)
380401
}
381402
return connection, nil
382403
}
383404

384405
// readInitialStreamError reads the initial error response
385406
// from a stream connection and returns it.
386-
func readInitialStreamError(conn io.Reader) error {
407+
func readInitialStreamError(ws base.Stream) error {
387408
// We can use bufio here because the websocket guarantees that a
388409
// single read will not read more than a single frame; there is
389410
// no guarantee that a single read might not read less than the
390411
// whole frame though, so using a single Read call is not
391412
// correct. By using ReadSlice rather than ReadBytes, we
392413
// guarantee that the error can't be too big (>4096 bytes).
393-
line, err := bufio.NewReader(conn).ReadSlice('\n')
414+
messageType, reader, err := ws.NextReader()
415+
if err != nil {
416+
return errors.Annotate(err, "unable to get reader")
417+
}
418+
if messageType != websocket.TextMessage {
419+
return errors.Errorf("unexpected message type %v", messageType)
420+
}
421+
line, err := bufio.NewReader(reader).ReadSlice('\n')
394422
if err != nil {
395423
return errors.Annotate(err, "unable to read initial response")
396424
}
@@ -477,20 +505,21 @@ func tagToString(tag names.Tag) string {
477505
return tag.String()
478506
}
479507

508+
type dialResult struct {
509+
conn *websocket.Conn
510+
urlStr string
511+
tlsConfig *tls.Config
512+
}
513+
480514
// dialAPI establishes a websocket connection to the RPC
481515
// API websocket on the API server using Info. If multiple API addresses
482516
// are provided in Info they will be tried concurrently - the first successful
483517
// connection wins.
484518
//
485519
// It also returns the TLS configuration that it has derived from the Info.
486-
func dialAPI(info *Info, opts DialOpts) (*websocket.Conn, *tls.Config, error) {
487-
// Set opts.DialWebsocket here rather than in open because
488-
// some tests call dialAPI directly.
489-
if opts.DialWebsocket == nil {
490-
opts.DialWebsocket = websocket.DialConfig
491-
}
520+
func dialAPI(info *Info, opts DialOpts) (*dialResult, error) {
492521
if len(info.Addrs) == 0 {
493-
return nil, nil, errors.New("no API addresses to connect to")
522+
return nil, errors.New("no API addresses to connect to")
494523
}
495524
tlsConfig := utils.SecureTLSConfig()
496525
tlsConfig.InsecureSkipVerify = opts.InsecureSkipVerify
@@ -501,7 +530,7 @@ func dialAPI(info *Info, opts DialOpts) (*websocket.Conn, *tls.Config, error) {
501530
tlsConfig.ServerName = "juju-apiserver"
502531
certPool, err := CreateCertPool(info.CACert)
503532
if err != nil {
504-
return nil, nil, errors.Annotate(err, "cert pool creation failed")
533+
return nil, errors.Annotate(err, "cert pool creation failed")
505534
}
506535
tlsConfig.RootCAs = certPool
507536
} else {
@@ -510,33 +539,62 @@ func dialAPI(info *Info, opts DialOpts) (*websocket.Conn, *tls.Config, error) {
510539
// name in the address will be used as usual).
511540
tlsConfig.ServerName = info.SNIHostName
512541
}
542+
543+
opts.tlsConfig = tlsConfig
544+
545+
// Set opts.DialWebsocket here rather than in open because
546+
// some tests call dialAPI directly.
547+
if opts.DialWebsocket == nil {
548+
dialer := &websocketDialerAdapter{
549+
&websocket.Dialer{
550+
Proxy: http.ProxyFromEnvironment,
551+
TLSClientConfig: tlsConfig,
552+
// In order to deal with the remote side not handling message
553+
// fragmentation, we default to largeish frames.
554+
ReadBufferSize: websocketFrameSize,
555+
WriteBufferSize: websocketFrameSize,
556+
},
557+
}
558+
opts.DialWebsocket = dialer.Dial
559+
}
560+
513561
path, err := apiPath(info.ModelTag, "/api")
514562
if err != nil {
515-
return nil, nil, errors.Trace(err)
563+
return nil, errors.Trace(err)
516564
}
517-
conn, err := dialWebsocketMulti(info.Addrs, path, tlsConfig, opts)
565+
conn, urlStr, err := dialWebsocketMulti(info.Addrs, path, opts)
518566
if err != nil {
519-
return nil, nil, errors.Trace(err)
567+
return nil, errors.Trace(err)
520568
}
521-
logger.Infof("connection established to %q", conn.RemoteAddr())
522-
return conn, tlsConfig, nil
569+
logger.Infof("connection established to %q", urlStr)
570+
return &dialResult{conn, urlStr, tlsConfig}, nil
571+
}
572+
573+
type websocketDialerAdapter struct {
574+
dialer *websocket.Dialer
575+
}
576+
577+
func (a *websocketDialerAdapter) Dial(urlStr string, tlsConfig *tls.Config, requestHeader http.Header) (*websocket.Conn, *http.Response, error) {
578+
// Ignore the tlsConfig because it is set on the dialer.
579+
// The tls.Config is only passed through for the purpose of capture in the tests.
580+
return a.dialer.Dial(urlStr, requestHeader)
523581
}
524582

525583
// dialWebsocketMulti dials a websocket with one of the provided addresses, the
526584
// specified URL path, and dial options (which carry the TLS configuration). Each of the
527585
// specified addresses will be attempted concurrently, and the first
528586
// successful connection will be returned.
529-
func dialWebsocketMulti(addrs []string, path string, tlsConfig *tls.Config, opts DialOpts) (*websocket.Conn, error) {
587+
func dialWebsocketMulti(addrs []string, path string, opts DialOpts) (*websocket.Conn, string, error) {
530588
// Dial all addresses at reasonable intervals.
531589
try := parallel.NewTry(0, nil)
532590
defer try.Kill()
533591
for _, addr := range addrs {
534-
err := startDialWebsocket(try, addr, path, opts, tlsConfig)
592+
err := startDialWebsocket(try, addr, path, opts)
535593
if err == parallel.ErrStopped {
536594
break
537595
}
538596
if err != nil {
539-
return nil, errors.Trace(err)
597+
return nil, "", errors.Trace(err)
540598
}
541599
select {
542600
case <-time.After(opts.DialAddressInterval):
@@ -546,30 +604,40 @@ func dialWebsocketMulti(addrs []string, path string, tlsConfig *tls.Config, opts
546604
try.Close()
547605
result, err := try.Result()
548606
if err != nil {
549-
return nil, errors.Trace(err)
607+
return nil, "", errors.Trace(err)
550608
}
551-
return result.(*websocket.Conn), nil
609+
wrapper := result.(*connWrapper)
610+
return wrapper.conn, wrapper.urlStr, nil
552611
}
553612

554613
// startDialWebsocket starts websocket connection to a single address
555614
// on the given try instance.
556-
func startDialWebsocket(try *parallel.Try, addr, path string, opts DialOpts, tlsConfig *tls.Config) error {
615+
func startDialWebsocket(try *parallel.Try, addr, path string, opts DialOpts) error {
557616
// origin is required by the WebSocket API, used for "origin policy"
558617
// in websockets. We pass localhost to satisfy the API; it is
559618
// inconsequential to us.
560-
const origin = "http://localhost/"
561-
cfg, err := websocket.NewConfig("wss://"+addr+path, origin)
562-
if err != nil {
563-
return errors.Trace(err)
564-
}
565-
cfg.TlsConfig = tlsConfig
566-
return try.Start(newWebsocketDialer(cfg, opts))
619+
urlStr := "wss://" + addr + path
620+
return try.Start(newWebsocketDialer(urlStr, opts))
621+
}
622+
623+
// connWrapper contains the *websocket.Conn and the urlStr that was used
624+
// to connect to it. The gorilla/websocket code does not remember the URL
625+
// that was used to connect to it, and many internal parts of Juju assume
626+
// that it does.
627+
type connWrapper struct {
628+
conn *websocket.Conn
629+
urlStr string
630+
}
631+
632+
// This is defined for the parallel try to close other results.
633+
func (c *connWrapper) Close() error {
634+
return c.conn.Close()
567635
}
568636

569637
// newWebsocketDialer returns a function that dials the websocket represented
570638
// by the given configuration with the given dial options, suitable for passing
571639
// to utils/parallel.Try.Start.
572-
func newWebsocketDialer(cfg *websocket.Config, opts DialOpts) func(<-chan struct{}) (io.Closer, error) {
640+
func newWebsocketDialer(urlStr string, opts DialOpts) func(<-chan struct{}) (io.Closer, error) {
573641
// TODO(katco): 2016-08-09: lp:1611427
574642
openAttempt := utils.AttemptStrategy{
575643
Total: opts.Timeout,
@@ -587,11 +655,12 @@ func newWebsocketDialer(cfg *websocket.Config, opts DialOpts) func(<-chan struct
587655
return nil, parallel.ErrStopped
588656
default:
589657
}
590-
logger.Debugf("dialing %q", cfg.Location)
591-
conn, err := opts.DialWebsocket(cfg)
658+
logger.Debugf("dialing %q", urlStr)
659+
// Not passing through any extra header information
660+
conn, _, err := opts.DialWebsocket(urlStr, opts.tlsConfig, nil)
592661
if err == nil {
593-
logger.Debugf("successfully dialed %q", cfg.Location)
594-
return conn, nil
662+
logger.Debugf("successfully dialed %q", urlStr)
663+
return &connWrapper{conn, urlStr}, nil
595664
}
596665
if isCertErr := isX509Error(err); !a.HasNext() || isCertErr {
597666
// We won't reconnect when there's an X509
@@ -612,25 +681,20 @@ func newWebsocketDialer(cfg *websocket.Config, opts DialOpts) func(<-chan struct
612681
// isX509Error reports whether the given websocket error
613682
// results from an X509 problem.
614683
func isX509Error(err error) bool {
615-
wsErr, ok := errors.Cause(err).(*websocket.DialError)
616-
if !ok {
617-
return false
618-
}
619-
switch wsErr.Err.(type) {
620-
case x509.HostnameError,
684+
switch errType := err.(type) {
685+
case *websocket.CloseError:
686+
return errType.Code == websocket.CloseTLSHandshake
687+
case x509.CertificateInvalidError,
688+
x509.HostnameError,
621689
x509.InsecureAlgorithmError,
622690
x509.UnhandledCriticalExtension,
623691
x509.UnknownAuthorityError,
624692
x509.ConstraintViolationError,
625693
x509.SystemRootsError:
626694
return true
695+
default:
696+
return false
627697
}
628-
switch err {
629-
case x509.ErrUnsupportedAlgorithm,
630-
x509.IncorrectPasswordError:
631-
return true
632-
}
633-
return false
634698
}
635699

636700
type hasErrorCode interface {

0 commit comments

Comments
 (0)