Skip to content

Commit

Permalink
Rename api state to conn
Browse files Browse the repository at this point in the history
  • Loading branch information
wallyworld committed Aug 7, 2023
1 parent d0d5409 commit f64a714
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 175 deletions.
154 changes: 77 additions & 77 deletions api/apiclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ type rpcConnection interface {
Close() error
}

// state is the internal implementation of the Connection interface.
type state struct {
// conn is the internal implementation of the Connection interface.
type conn struct {
ctx context.Context
client rpcConnection
conn jsoncodec.JSONConn
Expand Down Expand Up @@ -247,7 +247,7 @@ func Open(info *Info, opts DialOpts) (Connection, error) {
host = dialResult.addr
}

st := &state{
st := &conn{
ctx: context.Background(),
client: client,
conn: dialResult.conn,
Expand Down Expand Up @@ -320,7 +320,7 @@ func PerferredHost(info *Info) string {
// if the context is cancelled.
// TODO(rogpeppe) pass Context into Login (and all API calls) so
// that this becomes unnecessary.
func loginWithContext(ctx context.Context, st *state, info *Info) error {
func loginWithContext(ctx context.Context, st *conn, info *Info) error {
result := make(chan error, 1)
go func() {
result <- st.Login(info.Tag, info.Password, info.Nonce, info.Macaroons)
Expand Down Expand Up @@ -353,20 +353,20 @@ func (t *hostSwitchingTransport) RoundTrip(req *http.Request) (*http.Response, e
return t.fallback.RoundTrip(req)
}

// Context returns the context associated with this state.
func (st *state) Context() context.Context {
return st.ctx
// Context returns the context associated with this conn.
func (c *conn) Context() context.Context {
return c.ctx
}

// ConnectStream implements StreamConnector.ConnectStream. The stream
// returned will apply a 30-second write deadline, so WriteJSON should
// only be called from one goroutine.
func (st *state) ConnectStream(path string, attrs url.Values) (base.Stream, error) {
path, err := apiPath(st.modelTag.Id(), path)
func (c *conn) ConnectStream(path string, attrs url.Values) (base.Stream, error) {
path, err := apiPath(c.modelTag.Id(), path)
if err != nil {
return nil, errors.Trace(err)
}
conn, err := st.connectStreamWithRetry(path, attrs, nil)
conn, err := c.connectStreamWithRetry(path, attrs, nil)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -378,22 +378,22 @@ func (st *state) ConnectStream(path string, attrs url.Values) (base.Stream, erro
// endpoint needs one) can be specified in the headers. The stream
// returned will apply a 30-second write deadline, so WriteJSON should
// only be called from one goroutine.
func (st *state) ConnectControllerStream(path string, attrs url.Values, headers http.Header) (base.Stream, error) {
func (c *conn) ConnectControllerStream(path string, attrs url.Values, headers http.Header) (base.Stream, error) {
if !strings.HasPrefix(path, "/") {
return nil, errors.Errorf("path %q is not absolute", path)
}
if strings.HasPrefix(path, modelRoot) {
return nil, errors.Errorf("path %q is model-specific", path)
}
conn, err := st.connectStreamWithRetry(path, attrs, headers)
conn, err := c.connectStreamWithRetry(path, attrs, headers)
if err != nil {
return nil, errors.Trace(err)
}
return conn, nil
}

func (st *state) connectStreamWithRetry(path string, attrs url.Values, headers http.Header) (base.Stream, error) {
if !st.isLoggedIn() {
func (c *conn) connectStreamWithRetry(path string, attrs url.Values, headers http.Header) (base.Stream, error) {
if !c.isLoggedIn() {
return nil, errors.New("cannot use ConnectStream without logging in")
}
// We use the standard "macaraq" macaroon authentication dance here.
Expand All @@ -402,18 +402,18 @@ func (st *state) connectStreamWithRetry(path string, attrs url.Values, headers h
// error, the response will contain a macaroon that, when discharged,
// may allow access, so we discharge it (using bakery.Client.HandleError)
// and try the request again.
conn, err := st.connectStream(path, attrs, headers)
conn, err := c.connectStream(path, attrs, headers)
if err == nil {
return conn, err
}
if params.ErrCode(err) != params.CodeDischargeRequired {
return nil, errors.Trace(err)
}
if err := st.bakeryClient.HandleError(st.ctx, st.cookieURL, bakeryError(err)); err != nil {
if err := c.bakeryClient.HandleError(c.ctx, c.cookieURL, bakeryError(err)); err != nil {
return nil, errors.Trace(err)
}
// Try again with the discharged macaroon.
conn, err = st.connectStream(path, attrs, headers)
conn, err = c.connectStream(path, attrs, headers)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -423,10 +423,10 @@ func (st *state) connectStreamWithRetry(path string, attrs url.Values, headers h
// connectStream is the internal version of ConnectStream. It differs from
// ConnectStream only in that it will not retry the connection if it encounters
// discharge-required error.
func (st *state) connectStream(path string, attrs url.Values, extraHeaders http.Header) (base.Stream, error) {
func (c *conn) connectStream(path string, attrs url.Values, extraHeaders http.Header) (base.Stream, error) {
target := url.URL{
Scheme: "wss",
Host: st.addr,
Host: c.addr,
Path: path,
RawQuery: attrs.Encode(),
}
Expand All @@ -436,22 +436,22 @@ func (st *state) connectStream(path string, attrs url.Values, extraHeaders http.

dialer := &websocket.Dialer{
Proxy: proxy.DefaultConfig.GetProxy,
TLSClientConfig: st.tlsConfig,
TLSClientConfig: c.tlsConfig,
}
var requestHeader http.Header
if st.tag != "" {
requestHeader = jujuhttp.BasicAuthHeader(st.tag, st.password)
if c.tag != "" {
requestHeader = jujuhttp.BasicAuthHeader(c.tag, c.password)
} else {
requestHeader = make(http.Header)
}
requestHeader.Set(params.JujuClientVersion, jujuversion.Current.String())
requestHeader.Set("Origin", "http://localhost/")
if st.nonce != "" {
requestHeader.Set(params.MachineNonceHeader, st.nonce)
if c.nonce != "" {
requestHeader.Set(params.MachineNonceHeader, c.nonce)
}
// Add any cookies because they will not be sent to websocket
// connections by default.
err := st.addCookiesToHeader(requestHeader)
err := c.addCookiesToHeader(requestHeader)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -505,7 +505,7 @@ func readInitialStreamError(ws base.Stream) error {
// addCookiesToHeader adds any cookies associated with the
// API host to the given header. This is necessary because
// otherwise cookies are not sent to websocket endpoints.
func (st *state) addCookiesToHeader(h http.Header) error {
func (c *conn) addCookiesToHeader(h http.Header) error {
// net/http only allows adding cookies to a request,
// but when it sends a request to a non-http endpoint,
// it doesn't add the cookies, so make a request, starting
Expand All @@ -514,16 +514,16 @@ func (st *state) addCookiesToHeader(h http.Header) error {
req := &http.Request{
Header: h,
}
cookies := st.bakeryClient.Client.Jar.Cookies(st.cookieURL)
cookies := c.bakeryClient.Client.Jar.Cookies(c.cookieURL)
for _, c := range cookies {
req.AddCookie(c)
}
if len(cookies) == 0 && len(st.macaroons) > 0 {
if len(cookies) == 0 && len(c.macaroons) > 0 {
// These macaroons must have been added directly rather than
// obtained from a request. Add them. (For example in the
// logtransfer connection for a migration.)
// See https://bugs.launchpad.net/juju/+bug/1650451
for _, macaroon := range st.macaroons {
for _, macaroon := range c.macaroons {
cookie, err := httpbakery.NewCookie(coremacaroon.MacaroonNamespace, macaroon)
if err != nil {
return errors.Trace(err)
Expand All @@ -537,14 +537,14 @@ func (st *state) addCookiesToHeader(h http.Header) error {

// apiEndpoint returns a URL that refers to the given API slash-prefixed
// endpoint path and query parameters.
func (st *state) apiEndpoint(path, query string) (*url.URL, error) {
path, err := apiPath(st.modelTag.Id(), path)
func (c *conn) apiEndpoint(path, query string) (*url.URL, error) {
path, err := apiPath(c.modelTag.Id(), path)
if err != nil {
return nil, errors.Trace(err)
}
return &url.URL{
Scheme: st.serverScheme,
Host: st.Addr(),
Scheme: c.serverScheme,
Host: c.Addr(),
Path: path,
RawQuery: query,
}, nil
Expand All @@ -567,8 +567,8 @@ func apiURL(addr, model string) *url.URL {
}

// ping implements calls the Pinger.ping facade.
func (s *state) ping() error {
return s.APICall("Pinger", s.pingerFacadeVersion, "", "Ping", nil, nil)
func (c *conn) ping() error {
return c.APICall("Pinger", c.pingerFacadeVersion, "", "Ping", nil, nil)
}

// apiPath returns the given API endpoint path relative
Expand Down Expand Up @@ -1241,8 +1241,8 @@ func isX509Error(err error) bool {
// This fills out the rpc.Request on the given facade, version for a given
// object id, and the specific RPC method. It marshalls the Arguments, and will
// unmarshall the result into the response object that is supplied.
func (s *state) APICall(facade string, vers int, id, method string, args, response interface{}) error {
err := s.client.Call(rpc.Request{
func (c *conn) APICall(facade string, vers int, id, method string, args, response interface{}) error {
err := c.client.Call(rpc.Request{
Type: facade,
Version: vers,
Id: id,
Expand Down Expand Up @@ -1273,73 +1273,73 @@ func (s *state) APICall(facade string, vers int, id, method string, args, respon
jujuversion.Current.Major, jujuversion.Current.Minor, serverMajorVersion))
}

func (s *state) Close() error {
err := s.client.Close()
func (c *conn) Close() error {
err := c.client.Close()
select {
case <-s.closed:
case <-c.closed:
default:
close(s.closed)
close(c.closed)
}
<-s.broken
if s.proxier != nil {
s.proxier.Stop()
<-c.broken
if c.proxier != nil {
c.proxier.Stop()
}
return err
}

// BakeryClient implements api.Connection.
func (s *state) BakeryClient() base.MacaroonDischarger {
return s.bakeryClient
func (c *conn) BakeryClient() base.MacaroonDischarger {
return c.bakeryClient
}

// Broken implements api.Connection.
func (s *state) Broken() <-chan struct{} {
return s.broken
func (c *conn) Broken() <-chan struct{} {
return c.broken
}

// IsBroken implements api.Connection.
func (s *state) IsBroken() bool {
func (c *conn) IsBroken() bool {
select {
case <-s.broken:
case <-c.broken:
return true
default:
}
if err := s.ping(); err != nil {
if err := c.ping(); err != nil {
logger.Debugf("connection ping failed: %v", err)
return true
}
return false
}

// Addr returns the address used to connect to the API server.
func (s *state) Addr() string {
return s.addr
func (c *conn) Addr() string {
return c.addr
}

// IPAddr returns the resolved IP address that was used to
// connect to the API server.
func (s *state) IPAddr() string {
return s.ipAddr
func (c *conn) IPAddr() string {
return c.ipAddr
}

// IsProxied indicates if this connection was proxied
func (s *state) IsProxied() bool {
return s.proxier != nil
func (c *conn) IsProxied() bool {
return c.proxier != nil
}

// Proxy returns the proxy being used with this connection if one is being used.
func (s *state) Proxy() jujuproxy.Proxier {
return s.proxier
func (c *conn) Proxy() jujuproxy.Proxier {
return c.proxier
}

// ModelTag implements base.APICaller.ModelTag.
func (s *state) ModelTag() (names.ModelTag, bool) {
return s.modelTag, s.modelTag.Id() != ""
func (c *conn) ModelTag() (names.ModelTag, bool) {
return c.modelTag, c.modelTag.Id() != ""
}

// ControllerTag implements base.APICaller.ControllerTag.
func (s *state) ControllerTag() names.ControllerTag {
return s.controllerTag
func (c *conn) ControllerTag() names.ControllerTag {
return c.controllerTag
}

// APIHostPorts returns addresses that may be used to connect
Expand All @@ -1350,11 +1350,11 @@ func (s *state) ControllerTag() names.ControllerTag {
// Juju CLI, all addresses must be attempted, as the CLI may
// be invoked both within and outside the model (think
// private clouds).
func (s *state) APIHostPorts() []network.MachineHostPorts {
// NOTE: We're making a copy of s.hostPorts before returning it,
func (c *conn) APIHostPorts() []network.MachineHostPorts {
// NOTE: We're making a copy of c.hostPorts before returning it,
// for safety.
hostPorts := make([]network.MachineHostPorts, len(s.hostPorts))
for i, servers := range s.hostPorts {
hostPorts := make([]network.MachineHostPorts, len(c.hostPorts))
for i, servers := range c.hostPorts {
hostPorts[i] = append(network.MachineHostPorts{}, servers...)
}
return hostPorts
Expand All @@ -1364,8 +1364,8 @@ func (s *state) APIHostPorts() []network.MachineHostPorts {
// signed certificate will be used for TLS connection to the server.
// If empty, the private Juju CA certificate must be used to verify
// the connection.
func (s *state) PublicDNSName() string {
return s.publicDNSName
func (c *conn) PublicDNSName() string {
return c.publicDNSName
}

// BestFacadeVersion compares the versions of facades that we know about, and
Expand All @@ -1374,22 +1374,22 @@ func (s *state) PublicDNSName() string {
// TODO(jam) this is the eventual implementation of what version of a given
// Facade we will want to use. It needs to line up the versions that the server
// reports to us, with the versions that our client knows how to use.
func (s *state) BestFacadeVersion(facade string) int {
return bestVersion(facadeVersions[facade], s.facadeVersions[facade])
func (c *conn) BestFacadeVersion(facade string) int {
return bestVersion(facadeVersions[facade], c.facadeVersions[facade])
}

// serverRoot returns the cached API server address and port used
// to login, prefixed with "<URI scheme>://" (usually https).
func (s *state) serverRoot() string {
return s.serverScheme + "://" + s.serverRootAddress
func (c *conn) serverRoot() string {
return c.serverScheme + "://" + c.serverRootAddress
}

func (s *state) isLoggedIn() bool {
return atomic.LoadInt32(&s.loggedIn) == 1
func (c *conn) isLoggedIn() bool {
return atomic.LoadInt32(&c.loggedIn) == 1
}

func (s *state) setLoggedIn() {
atomic.StoreInt32(&s.loggedIn, 1)
func (c *conn) setLoggedIn() {
atomic.StoreInt32(&c.loggedIn, 1)
}

// emptyDNSCache implements DNSCache by
Expand Down
Loading

0 comments on commit f64a714

Please sign in to comment.