Skip to content

Commit

Permalink
Merge pull request juju#12161 from wallyworld/k8s-leadership-flapping
Browse files Browse the repository at this point in the history
juju#12161

Fix 2 key issues with the k8s operator and uniter workers.
The first is the permission check used for the facade authoriser. If the unit was removed, a permission denied error was returned, causing the entire operator worker to bounce and mess up unit leadership. Instead, the tag of the requested unit is checked to see if it matches the authenticated application agent. Any subsequent sate api call will return not found if the unit is accessed. The logic to do the permission checks was duplicated in a few places, so is not extracted to a common method.
Also, the Remove() api call is now a no op if the unit has already been removed.

Secondly, when a unit goes dying and is destroyed, we check to see if the unit agent had started - if not we can short circuit the cleanup and remove directly. However, if there is contention for the machine execution lock and a lot of k8s unit churn caused by many pod-spec-set calls, the unit agent may not have updated agent status even though it had started. This would lead to juju removing the unit and leaving the unit agent running. This then leads to the issue in item one, where an operation on the unit gets a permission error and the entire operator worker restarts. This causes the leadership tracker worker to flip the unit leader to a minion and messes up leadership tracking.
The issue is fixed by adjusting the initial agent state to "installing agent". This is then updated to "initialising agent" when the unit agent starts, allowing juju to know whether to short circuit a unit destroy operation or not.

## QA steps

bootstrap k8s
deploy kubeflow

## Bug reference

https://bugs.launchpad.net/bugs/1898966
  • Loading branch information
jujubot authored Oct 20, 2020
2 parents e0f20aa + a3ce027 commit 717fb79
Show file tree
Hide file tree
Showing 24 changed files with 262 additions and 128 deletions.
6 changes: 5 additions & 1 deletion api/caasoperator/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,11 @@ func (c *Client) RemoveUnit(unitName string) error {
if err != nil {
return err
}
return result.OneError()
resultErr := result.OneError()
if params.IsCodeNotFound(resultErr) {
return nil
}
return resultErr
}

// Life returns the lifecycle state for the specified CAAS application
Expand Down
28 changes: 28 additions & 0 deletions api/caasoperator/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,34 @@ func (s *operatorSuite) TestRemoveUnit(c *gc.C) {
c.Assert(called, jc.IsTrue)
}

func (s *operatorSuite) TestRemoveUnitNotFound(c *gc.C) {
called := false
apiCaller := basetesting.APICallerFunc(func(objType string, version int, id, request string, arg, result interface{}) error {
c.Check(objType, gc.Equals, "CAASOperator")
c.Check(version, gc.Equals, 0)
c.Check(id, gc.Equals, "")
c.Check(request, gc.Equals, "Remove")
c.Check(arg, jc.DeepEquals, params.Entities{
Entities: []params.Entity{{
Tag: "unit-gitlab-0",
}},
})
c.Assert(result, gc.FitsTypeOf, &params.ErrorResults{})
*(result.(*params.ErrorResults)) = params.ErrorResults{
Results: []params.ErrorResult{{
Error: &params.Error{Code: params.CodeNotFound},
}},
}
called = true
return nil
})

client := caasoperator.NewClient(apiCaller)
err := client.RemoveUnit("gitlab/0")
c.Assert(err, jc.ErrorIsNil)
c.Assert(called, jc.IsTrue)
}

func (s *operatorSuite) TestRemoveUnitInvalidUnitName(c *gc.C) {
client := caasoperator.NewClient(basetesting.APICallerFunc(func(_ string, _ int, _, _ string, _, _ interface{}) error {
return errors.New("should not be called")
Expand Down
2 changes: 1 addition & 1 deletion apiserver/common/remove.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (r *Remover) removeEntity(tag names.Tag) error {
if !ok {
return NotSupportedError(tag, "removal")
}
// Only remove entites that are not Alive.
// Only remove entities that are not Alive.
if life := remover.Life(); life == state.Alive {
return fmt.Errorf("cannot remove entity %q: still alive", tag.String())
}
Expand Down
72 changes: 72 additions & 0 deletions apiserver/common/unitcommon/accessor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright 2020 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.

package unitcommon

import (
"github.com/juju/errors"
"github.com/juju/names/v4"

"github.com/juju/juju/apiserver/common"
"github.com/juju/juju/apiserver/facade"
"github.com/juju/juju/state"
)

// ApplicationGetter provides a method
// to determine if an application exists.
type ApplicationGetter interface {
ApplicationExists(string) error
}

type stateApplicationGetter interface {
Application(string) (*state.Application, error)
}

// Backend returns an application abstraction for a
// given state.State instance.
func Backend(st stateApplicationGetter) ApplicationGetter {
return backend{st}
}

type backend struct {
stateApplicationGetter
}

// Application implements ApplicationGetter.
func (b backend) ApplicationExists(name string) error {
_, err := b.stateApplicationGetter.Application(name)
return err
}

// UnitAccessor returns an auth function which determines if the
// authenticated entity can access a unit or application.
func UnitAccessor(authorizer facade.Authorizer, st ApplicationGetter) common.GetAuthFunc {
return func() (common.AuthFunc, error) {
switch tag := authorizer.GetAuthTag().(type) {
case names.ApplicationTag:
// If called by an application agent, any of the units
// belonging to that application can be accessed.
appName := tag.Name
err := st.ApplicationExists(appName)
if err != nil {
return nil, errors.Trace(err)
}
return func(tag names.Tag) bool {
if tag.Kind() != names.UnitTagKind {
return false
}
unitApp, err := names.UnitApplication(tag.Id())
if err != nil {
return false
}
return unitApp == appName
}, nil
case names.UnitTag:
return func(tag names.Tag) bool {
return authorizer.AuthOwner(tag)
}, nil
default:
return nil, errors.Errorf("expected names.UnitTag or names.ApplicationTag, got %T", tag)
}
}
}
69 changes: 69 additions & 0 deletions apiserver/common/unitcommon/accessor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2020 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.

package unitcommon_test

import (
"github.com/juju/errors"
"github.com/juju/names/v4"
"github.com/juju/testing"
jc "github.com/juju/testing/checkers"
gc "gopkg.in/check.v1"

"github.com/juju/juju/apiserver/common/unitcommon"
apiservertesting "github.com/juju/juju/apiserver/testing"
)

type UnitAccessorSuite struct {
testing.IsolationSuite
}

var _ = gc.Suite(&UnitAccessorSuite{})

type appGetter struct {
exits bool
}

func (a appGetter) ApplicationExists(name string) error {
if a.exits {
return nil
}
return errors.NotFoundf("application %q", name)
}

func (s *UnitAccessorSuite) TestApplicationAgent(c *gc.C) {
auth := apiservertesting.FakeAuthorizer{
Tag: names.NewApplicationTag("gitlab"),
}
getAuthFunc := unitcommon.UnitAccessor(auth, appGetter{true})
authFunc, err := getAuthFunc()
c.Assert(err, jc.ErrorIsNil)
ok := authFunc(names.NewUnitTag("gitlab/0"))
c.Assert(ok, jc.IsTrue)
ok = authFunc(names.NewUnitTag("mysql/0"))
c.Assert(ok, jc.IsFalse)
}

func (s *UnitAccessorSuite) TestApplicationNotFound(c *gc.C) {
auth := apiservertesting.FakeAuthorizer{
Tag: names.NewApplicationTag("gitlab"),
}
getAuthFunc := unitcommon.UnitAccessor(auth, appGetter{false})
_, err := getAuthFunc()
c.Assert(err, jc.Satisfies, errors.IsNotFound)
}

func (s *UnitAccessorSuite) TestUnitAgent(c *gc.C) {
auth := apiservertesting.FakeAuthorizer{
Tag: names.NewUnitTag("gitlab/0"),
}
getAuthFunc := unitcommon.UnitAccessor(auth, appGetter{true})
authFunc, err := getAuthFunc()
c.Assert(err, jc.ErrorIsNil)
ok := authFunc(names.NewUnitTag("gitlab/0"))
c.Assert(ok, jc.IsTrue)
ok = authFunc(names.NewUnitTag("gitlab/1"))
c.Assert(ok, jc.IsFalse)
ok = authFunc(names.NewUnitTag("mysql/0"))
c.Assert(ok, jc.IsFalse)
}
14 changes: 14 additions & 0 deletions apiserver/common/unitcommon/package_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright 2020 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.

package unitcommon_test

import (
"testing"

gc "gopkg.in/check.v1"
)

func TestPackage(t *testing.T) {
gc.TestingT(t)
}
21 changes: 13 additions & 8 deletions apiserver/facades/agent/caasoperator/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func newMockState() *mockState {
st := &mockState{
entities: make(map[string]state.Entity),
app: mockApplication{
name: "gitlab",
life: state.Alive,
charm: mockCharm{
url: charm.MustParseURL("cs:gitlab-1"),
Expand Down Expand Up @@ -78,6 +79,17 @@ func (st *mockState) Application(id string) (caasoperator.Application, error) {
return &st.app, nil
}

func (st *mockState) ApplicationExists(id string) error {
st.MethodCall(st, "ApplicationExists", id)
if err := st.NextErr(); err != nil {
return err
}
if st.app.name != id {
return errors.NotFoundf("application %q", id)
}
return nil
}

func (st *mockState) Model() (caasoperator.Model, error) {
st.MethodCall(st, "Model")
if err := st.NextErr(); err != nil {
Expand Down Expand Up @@ -133,6 +145,7 @@ func (st *mockModel) Containers(providerIds ...string) ([]state.CloudContainer,

type mockApplication struct {
testing.Stub
name string
life state.Life
charm mockCharm
forceUpgrade bool
Expand Down Expand Up @@ -179,14 +192,6 @@ func (a *mockApplication) Watch() state.NotifyWatcher {
return a.watcher
}

func (a *mockApplication) AllUnits() ([]caasoperator.Unit, error) {
a.MethodCall(a, "AllUnits")
if err := a.NextErr(); err != nil {
return nil, err
}
return []caasoperator.Unit{&mockUnit{}}, nil
}

func (a *mockApplication) AgentTools() (*tools.Tools, error) {
return nil, errors.NotImplementedf("AgentTools")
}
Expand Down
32 changes: 6 additions & 26 deletions apiserver/facades/agent/caasoperator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/juju/names/v4"

"github.com/juju/juju/apiserver/common"
"github.com/juju/juju/apiserver/common/unitcommon"
"github.com/juju/juju/apiserver/facade"
"github.com/juju/juju/apiserver/params"
"github.com/juju/juju/caas"
Expand Down Expand Up @@ -54,7 +55,9 @@ func NewStateFacade(ctx facade.Context) (*Facade, error) {
if err != nil {
return nil, errors.Annotate(err, "getting leadership client")
}
return NewFacade(resources, authorizer, stateShim{ctx.State()},
return NewFacade(resources, authorizer,
stateShim{ctx.State()},
unitcommon.Backend(ctx.State()),
caasBroker, leadershipRevoker)
}

Expand All @@ -63,6 +66,7 @@ func NewFacade(
resources facade.Resources,
authorizer facade.Authorizer,
st CAASOperatorState,
appGetter unitcommon.ApplicationGetter,
broker CAASBrokerInterface,
leadershipRevoker leadership.Revoker,
) (*Facade, error) {
Expand All @@ -77,31 +81,7 @@ func NewFacade(
common.AuthFuncForTagKind(names.ApplicationTagKind),
common.AuthFuncForTagKind(names.UnitTagKind),
)
accessUnit := func() (common.AuthFunc, error) {
switch tag := authorizer.GetAuthTag().(type) {
case names.ApplicationTag:
// Any of the units belonging to
// the application can be accessed.
app, err := st.Application(tag.Name)
if err != nil {
return nil, errors.Trace(err)
}
allUnits, err := app.AllUnits()
if err != nil {
return nil, errors.Trace(err)
}
return func(tag names.Tag) bool {
for _, u := range allUnits {
if u.Tag() == tag {
return true
}
}
return false
}, nil
default:
return nil, errors.Errorf("expected names.ApplicationTag, got %T", tag)
}
}
accessUnit := unitcommon.UnitAccessor(authorizer, appGetter)
return &Facade{
LifeGetter: common.NewLifeGetter(st, canRead),
APIAddresser: common.NewAPIAddresser(st, resources),
Expand Down
4 changes: 2 additions & 2 deletions apiserver/facades/agent/caasoperator/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (s *CAASOperatorSuite) SetUpTest(c *gc.C) {
s.broker = &mockBroker{}
s.revoker = &mockLeadershipRevoker{revoked: set.NewStrings()}

facade, err := caasoperator.NewFacade(s.resources, s.authorizer, s.st, s.broker, s.revoker)
facade, err := caasoperator.NewFacade(s.resources, s.authorizer, s.st, s.st, s.broker, s.revoker)
c.Assert(err, jc.ErrorIsNil)
s.facade = facade
}
Expand All @@ -63,7 +63,7 @@ func (s *CAASOperatorSuite) TestPermission(c *gc.C) {
s.authorizer = &apiservertesting.FakeAuthorizer{
Tag: names.NewMachineTag("0"),
}
_, err := caasoperator.NewFacade(s.resources, s.authorizer, s.st, s.broker, nil)
_, err := caasoperator.NewFacade(s.resources, s.authorizer, s.st, s.st, s.broker, nil)
c.Assert(err, gc.ErrorMatches, "permission denied")
}

Expand Down
1 change: 0 additions & 1 deletion apiserver/facades/agent/caasoperator/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ type Application interface {
CharmModifiedVersion() int
SetOperatorStatus(status.StatusInfo) error
WatchUnits() state.StringsWatcher
AllUnits() ([]Unit, error)
}

// Charm provides the subset of charm state required by the
Expand Down
Loading

0 comments on commit 717fb79

Please sign in to comment.