@@ -16,14 +16,14 @@ import (
16
16
"sync/atomic"
17
17
"time"
18
18
19
+ "github.com/gorilla/websocket"
19
20
"github.com/juju/errors"
20
21
"github.com/juju/loggo"
21
22
"github.com/juju/retry"
22
23
"github.com/juju/utils"
23
24
"github.com/juju/utils/clock"
24
25
"github.com/juju/utils/parallel"
25
26
"github.com/juju/version"
26
- "golang.org/x/net/websocket"
27
27
"gopkg.in/juju/names.v2"
28
28
"gopkg.in/macaroon-bakery.v1/httpbakery"
29
29
"gopkg.in/macaroon.v1"
@@ -47,6 +47,11 @@ const pingTimeout = 30 * time.Second
47
47
// modelRoot is the prefix that all model API paths begin with.
48
48
const modelRoot = "/model/"
49
49
50
+ // Use a 64k frame size for the websockets while we need to deal
51
+ // with x/net/websocket connections that don't deal with recieving
52
+ // fragmented messages.
53
+ const websocketFrameSize = 65536
54
+
50
55
var logger = loggo .GetLogger ("juju.api" )
51
56
52
57
type rpcConnection interface {
@@ -183,12 +188,12 @@ func open(
183
188
if clock == nil {
184
189
return nil , errors .NotValidf ("nil clock" )
185
190
}
186
- conn , tlsConfig , err := dialAPI (info , opts )
191
+ dialResult , err := dialAPI (info , opts )
187
192
if err != nil {
188
193
return nil , errors .Trace (err )
189
194
}
190
195
191
- client := rpc .NewConn (jsoncodec .NewWebsocket (conn ), observer .None ())
196
+ client := rpc .NewConn (jsoncodec .NewWebsocket (dialResult . conn ), observer .None ())
192
197
client .Start ()
193
198
194
199
bakeryClient := opts .BakeryClient
@@ -201,29 +206,36 @@ func open(
201
206
httpc := * bakeryClient .Client
202
207
bakeryClient .Client = & httpc
203
208
}
204
- apiHost := conn .Config ().Location .Host
209
+ apiURL , err := url .Parse (dialResult .urlStr )
210
+ if err != nil {
211
+ // This should never happen as the url would have failed during dialAPI above.
212
+ // However the code paths don't allow capture of the url.URL used.
213
+ return nil , errors .Trace (err )
214
+ }
215
+ apiHost := apiURL .Host
216
+
205
217
// Technically when there's no CACert, we don't need this
206
218
// machinery, because we could just use http.DefaultTransport
207
219
// for everything, but it's easier just to leave it in place.
208
220
bakeryClient .Client .Transport = & hostSwitchingTransport {
209
221
primaryHost : apiHost ,
210
- primary : utils .NewHttpTLSTransport (tlsConfig ),
222
+ primary : utils .NewHttpTLSTransport (dialResult . tlsConfig ),
211
223
fallback : http .DefaultTransport ,
212
224
}
213
225
214
226
st := & state {
215
227
client : client ,
216
- conn : conn ,
228
+ conn : dialResult . conn ,
217
229
clock : clock ,
218
230
addr : apiHost ,
219
231
cookieURL : & url.URL {
220
232
Scheme : "https" ,
221
- Host : conn . Config (). Location . Host ,
233
+ Host : apiHost ,
222
234
Path : "/" ,
223
235
},
224
236
pingerFacadeVersion : facadeVersions ["Pinger" ],
225
237
serverScheme : "https" ,
226
- serverRootAddress : conn . Config (). Location . Host ,
238
+ serverRootAddress : apiHost ,
227
239
// We populate the username and password before
228
240
// login because, when doing HTTP requests, we'll want
229
241
// to use the same username and password for authenticating
@@ -232,13 +244,13 @@ func open(
232
244
password : info .Password ,
233
245
macaroons : info .Macaroons ,
234
246
nonce : info .Nonce ,
235
- tlsConfig : tlsConfig ,
247
+ tlsConfig : dialResult . tlsConfig ,
236
248
bakeryClient : bakeryClient ,
237
249
modelTag : info .ModelTag ,
238
250
}
239
251
if ! info .SkipLogin {
240
252
if err := st .Login (info .Tag , info .Password , info .Nonce , info .Macaroons ); err != nil {
241
- conn .Close ()
253
+ dialResult . conn .Close ()
242
254
return nil , errors .Trace (err )
243
255
}
244
256
}
@@ -348,49 +360,65 @@ func (st *state) connectStream(path string, attrs url.Values, extraHeaders http.
348
360
// TODO(macgreagoir) IPv6. Ubuntu still always provides IPv4 loopback,
349
361
// and when/if this changes localhost should resolve to IPv6 loopback
350
362
// in any case (lp:1644009). Review.
351
- cfg , err := websocket .NewConfig (target .String (), "http://localhost/" )
352
- if err != nil {
353
- return nil , errors .Trace (err )
363
+
364
+ dialer := & websocket.Dialer {
365
+ Proxy : http .ProxyFromEnvironment ,
366
+ TLSClientConfig : st .tlsConfig ,
367
+ // In order to deal with the remote side not handling message
368
+ // fragmentation, we default to largeish frames.
369
+ ReadBufferSize : websocketFrameSize ,
370
+ WriteBufferSize : websocketFrameSize ,
354
371
}
372
+ var requestHeader http.Header
355
373
if st .tag != "" {
356
- cfg .Header = utils .BasicAuthHeader (st .tag , st .password )
374
+ requestHeader = utils .BasicAuthHeader (st .tag , st .password )
375
+ } else {
376
+ requestHeader = make (http.Header )
357
377
}
378
+ requestHeader .Set ("Origin" , "http://localhost/" )
358
379
if st .nonce != "" {
359
- cfg . Header .Set (params .MachineNonceHeader , st .nonce )
380
+ requestHeader .Set (params .MachineNonceHeader , st .nonce )
360
381
}
361
382
// Add any cookies because they will not be sent to websocket
362
383
// connections by default.
363
- err = st .addCookiesToHeader (cfg . Header )
384
+ err : = st .addCookiesToHeader (requestHeader )
364
385
if err != nil {
365
386
return nil , errors .Trace (err )
366
387
}
367
388
for header , values := range extraHeaders {
368
389
for _ , value := range values {
369
- cfg . Header .Add (header , value )
390
+ requestHeader .Add (header , value )
370
391
}
371
392
}
372
393
373
- cfg .TlsConfig = st .tlsConfig
374
- connection , err := websocketDialConfig (cfg )
394
+ connection , err := websocketDial (dialer , target .String (), requestHeader )
375
395
if err != nil {
376
396
return nil , err
377
397
}
378
398
if err := readInitialStreamError (connection ); err != nil {
399
+ connection .Close ()
379
400
return nil , errors .Trace (err )
380
401
}
381
402
return connection , nil
382
403
}
383
404
384
405
// readInitialStreamError reads the initial error response
385
406
// from a stream connection and returns it.
386
- func readInitialStreamError (conn io. Reader ) error {
407
+ func readInitialStreamError (ws base. Stream ) error {
387
408
// We can use bufio here because the websocket guarantees that a
388
409
// single read will not read more than a single frame; there is
389
410
// no guarantee that a single read might not read less than the
390
411
// whole frame though, so using a single Read call is not
391
412
// correct. By using ReadSlice rather than ReadBytes, we
392
413
// guarantee that the error can't be too big (>4096 bytes).
393
- line , err := bufio .NewReader (conn ).ReadSlice ('\n' )
414
+ messageType , reader , err := ws .NextReader ()
415
+ if err != nil {
416
+ return errors .Annotate (err , "unable to get reader" )
417
+ }
418
+ if messageType != websocket .TextMessage {
419
+ return errors .Errorf ("unexpected message type %v" , messageType )
420
+ }
421
+ line , err := bufio .NewReader (reader ).ReadSlice ('\n' )
394
422
if err != nil {
395
423
return errors .Annotate (err , "unable to read initial response" )
396
424
}
@@ -477,20 +505,21 @@ func tagToString(tag names.Tag) string {
477
505
return tag .String ()
478
506
}
479
507
508
+ type dialResult struct {
509
+ conn * websocket.Conn
510
+ urlStr string
511
+ tlsConfig * tls.Config
512
+ }
513
+
480
514
// dialAPI establishes a websocket connection to the RPC
481
515
// API websocket on the API server using Info. If multiple API addresses
482
516
// are provided in Info they will be tried concurrently - the first successful
483
517
// connection wins.
484
518
//
485
519
// It also returns the TLS configuration that it has derived from the Info.
486
- func dialAPI (info * Info , opts DialOpts ) (* websocket.Conn , * tls.Config , error ) {
487
- // Set opts.DialWebsocket here rather than in open because
488
- // some tests call dialAPI directly.
489
- if opts .DialWebsocket == nil {
490
- opts .DialWebsocket = websocket .DialConfig
491
- }
520
+ func dialAPI (info * Info , opts DialOpts ) (* dialResult , error ) {
492
521
if len (info .Addrs ) == 0 {
493
- return nil , nil , errors .New ("no API addresses to connect to" )
522
+ return nil , errors .New ("no API addresses to connect to" )
494
523
}
495
524
tlsConfig := utils .SecureTLSConfig ()
496
525
tlsConfig .InsecureSkipVerify = opts .InsecureSkipVerify
@@ -501,7 +530,7 @@ func dialAPI(info *Info, opts DialOpts) (*websocket.Conn, *tls.Config, error) {
501
530
tlsConfig .ServerName = "juju-apiserver"
502
531
certPool , err := CreateCertPool (info .CACert )
503
532
if err != nil {
504
- return nil , nil , errors .Annotate (err , "cert pool creation failed" )
533
+ return nil , errors .Annotate (err , "cert pool creation failed" )
505
534
}
506
535
tlsConfig .RootCAs = certPool
507
536
} else {
@@ -510,33 +539,62 @@ func dialAPI(info *Info, opts DialOpts) (*websocket.Conn, *tls.Config, error) {
510
539
// name in the address will be used as usual).
511
540
tlsConfig .ServerName = info .SNIHostName
512
541
}
542
+
543
+ opts .tlsConfig = tlsConfig
544
+
545
+ // Set opts.DialWebsocket here rather than in open because
546
+ // some tests call dialAPI directly.
547
+ if opts .DialWebsocket == nil {
548
+ dialer := & websocketDialerAdapter {
549
+ & websocket.Dialer {
550
+ Proxy : http .ProxyFromEnvironment ,
551
+ TLSClientConfig : tlsConfig ,
552
+ // In order to deal with the remote side not handling message
553
+ // fragmentation, we default to largeish frames.
554
+ ReadBufferSize : websocketFrameSize ,
555
+ WriteBufferSize : websocketFrameSize ,
556
+ },
557
+ }
558
+ opts .DialWebsocket = dialer .Dial
559
+ }
560
+
513
561
path , err := apiPath (info .ModelTag , "/api" )
514
562
if err != nil {
515
- return nil , nil , errors .Trace (err )
563
+ return nil , errors .Trace (err )
516
564
}
517
- conn , err := dialWebsocketMulti (info .Addrs , path , tlsConfig , opts )
565
+ conn , urlStr , err := dialWebsocketMulti (info .Addrs , path , opts )
518
566
if err != nil {
519
- return nil , nil , errors .Trace (err )
567
+ return nil , errors .Trace (err )
520
568
}
521
- logger .Infof ("connection established to %q" , conn .RemoteAddr ())
522
- return conn , tlsConfig , nil
569
+ logger .Infof ("connection established to %q" , urlStr )
570
+ return & dialResult {conn , urlStr , tlsConfig }, nil
571
+ }
572
+
573
+ type websocketDialerAdapter struct {
574
+ dialer * websocket.Dialer
575
+ }
576
+
577
+ func (a * websocketDialerAdapter ) Dial (urlStr string , tlsConfig * tls.Config , requestHeader http.Header ) (* websocket.Conn , * http.Response , error ) {
578
+ // Ignore the tlsConfig because it is set on the dialer.
579
+ // The tls.Config is only passed through for the purpose of catpure in the tests.
580
+ return a .dialer .Dial (urlStr , requestHeader )
523
581
}
524
582
525
583
// dialWebsocketMulti dials a websocket with one of the provided addresses, the
526
584
// specified URL path, TLS configuration, and dial options. Each of the
527
585
// specified addresses will be attempted concurrently, and the first
528
586
// successful connection will be returned.
529
- func dialWebsocketMulti (addrs []string , path string , tlsConfig * tls. Config , opts DialOpts ) (* websocket.Conn , error ) {
587
+ func dialWebsocketMulti (addrs []string , path string , opts DialOpts ) (* websocket.Conn , string , error ) {
530
588
// Dial all addresses at reasonable intervals.
531
589
try := parallel .NewTry (0 , nil )
532
590
defer try .Kill ()
533
591
for _ , addr := range addrs {
534
- err := startDialWebsocket (try , addr , path , opts , tlsConfig )
592
+ err := startDialWebsocket (try , addr , path , opts )
535
593
if err == parallel .ErrStopped {
536
594
break
537
595
}
538
596
if err != nil {
539
- return nil , errors .Trace (err )
597
+ return nil , "" , errors .Trace (err )
540
598
}
541
599
select {
542
600
case <- time .After (opts .DialAddressInterval ):
@@ -546,30 +604,40 @@ func dialWebsocketMulti(addrs []string, path string, tlsConfig *tls.Config, opts
546
604
try .Close ()
547
605
result , err := try .Result ()
548
606
if err != nil {
549
- return nil , errors .Trace (err )
607
+ return nil , "" , errors .Trace (err )
550
608
}
551
- return result .(* websocket.Conn ), nil
609
+ wrapper := result .(* connWrapper )
610
+ return wrapper .conn , wrapper .urlStr , nil
552
611
}
553
612
554
613
// startDialWebsocket starts websocket connection to a single address
555
614
// on the given try instance.
556
- func startDialWebsocket (try * parallel.Try , addr , path string , opts DialOpts , tlsConfig * tls. Config ) error {
615
+ func startDialWebsocket (try * parallel.Try , addr , path string , opts DialOpts ) error {
557
616
// origin is required by the WebSocket API, used for "origin policy"
558
617
// in websockets. We pass localhost to satisfy the API; it is
559
618
// inconsequential to us.
560
- const origin = "http://localhost/"
561
- cfg , err := websocket .NewConfig ("wss://" + addr + path , origin )
562
- if err != nil {
563
- return errors .Trace (err )
564
- }
565
- cfg .TlsConfig = tlsConfig
566
- return try .Start (newWebsocketDialer (cfg , opts ))
619
+ urlStr := "wss://" + addr + path
620
+ return try .Start (newWebsocketDialer (urlStr , opts ))
621
+ }
622
+
623
+ // connWrapper contains the *websocket.Conn and the urlStr that was used
624
+ // to connect to it. The gorilla/websocket code does not remember the URL
625
+ // that was used to connect to it, and many internal parts of Juju assume
626
+ // that it does.
627
+ type connWrapper struct {
628
+ conn * websocket.Conn
629
+ urlStr string
630
+ }
631
+
632
+ // This is defined for the parallel try to close other results.
633
+ func (c * connWrapper ) Close () error {
634
+ return c .conn .Close ()
567
635
}
568
636
569
637
// newWebsocketDialer0 returns a function that dials the websocket represented
570
638
// by the given configuration with the given dial options, suitable for passing
571
639
// to utils/parallel.Try.Start.
572
- func newWebsocketDialer (cfg * websocket. Config , opts DialOpts ) func (<- chan struct {}) (io.Closer , error ) {
640
+ func newWebsocketDialer (urlStr string , opts DialOpts ) func (<- chan struct {}) (io.Closer , error ) {
573
641
// TODO(katco): 2016-08-09: lp:1611427
574
642
openAttempt := utils.AttemptStrategy {
575
643
Total : opts .Timeout ,
@@ -587,11 +655,12 @@ func newWebsocketDialer(cfg *websocket.Config, opts DialOpts) func(<-chan struct
587
655
return nil , parallel .ErrStopped
588
656
default :
589
657
}
590
- logger .Debugf ("dialing %q" , cfg .Location )
591
- conn , err := opts .DialWebsocket (cfg )
658
+ logger .Debugf ("dialing %q" , urlStr )
659
+ // Not passing through any extra header information
660
+ conn , _ , err := opts .DialWebsocket (urlStr , opts .tlsConfig , nil )
592
661
if err == nil {
593
- logger .Debugf ("successfully dialed %q" , cfg . Location )
594
- return conn , nil
662
+ logger .Debugf ("successfully dialed %q" , urlStr )
663
+ return & connWrapper { conn , urlStr } , nil
595
664
}
596
665
if isCertErr := isX509Error (err ); ! a .HasNext () || isCertErr {
597
666
// We won't reconnect when there's an X509
@@ -612,25 +681,20 @@ func newWebsocketDialer(cfg *websocket.Config, opts DialOpts) func(<-chan struct
612
681
// isX509Error reports whether the given websocket error
613
682
// results from an X509 problem.
614
683
func isX509Error (err error ) bool {
615
- wsErr , ok := errors .Cause (err ).(* websocket.DialError )
616
- if ! ok {
617
- return false
618
- }
619
- switch wsErr .Err .(type ) {
620
- case x509.HostnameError ,
684
+ switch errType := err .(type ) {
685
+ case * websocket.CloseError :
686
+ return errType .Code == websocket .CloseTLSHandshake
687
+ case x509.CertificateInvalidError ,
688
+ x509.HostnameError ,
621
689
x509.InsecureAlgorithmError ,
622
690
x509.UnhandledCriticalExtension ,
623
691
x509.UnknownAuthorityError ,
624
692
x509.ConstraintViolationError ,
625
693
x509.SystemRootsError :
626
694
return true
695
+ default :
696
+ return false
627
697
}
628
- switch err {
629
- case x509 .ErrUnsupportedAlgorithm ,
630
- x509 .IncorrectPasswordError :
631
- return true
632
- }
633
- return false
634
698
}
635
699
636
700
type hasErrorCode interface {
0 commit comments