Skip to content

Commit

Permalink
CAAS unit actions wait for container start.
Browse files Browse the repository at this point in the history
- CAAS broker can now watch on specific container start events.
- CAAS unit provisioner annotates pods with the unit name, this is used
to filter container start events until pod name/uid is associated with a
unit. Fixes a pod init race condition for pods created by k8s.
- Uniter resolver triggered by CAAS container start events.
- Action resolver defers running actions until container starts.
  • Loading branch information
hpidcock committed Oct 31, 2019
1 parent dbb5c98 commit 726ef78
Show file tree
Hide file tree
Showing 52 changed files with 1,290 additions and 334 deletions.
15 changes: 11 additions & 4 deletions api/caasoperator/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,15 +264,22 @@ func (c *Client) SetVersion(appName string, v version.Binary) error {
return results.OneError()
}

// WatchUnitStart watchs for Unit starts via the CAAS provider.
func (c *Client) WatchUnitStart(application string) (watcher.StringsWatcher, error) {
// WatchContainerStart watchs for Unit starts via the CAAS provider.
func (c *Client) WatchContainerStart(application string, containerName string) (watcher.StringsWatcher, error) {
applicationTag, err := applicationTag(application)
if err != nil {
return nil, errors.Trace(err)
}
args := entities(applicationTag)
args := params.WatchContainerStartArgs{
Args: []params.WatchContainerStartArg{{
Entity: params.Entity{
Tag: applicationTag.String(),
},
Container: containerName,
}},
}
var results params.StringsWatchResults
if err := c.facade.FacadeCall("WatchUnitStart", args, &results); err != nil {
if err := c.facade.FacadeCall("WatchContainerStart", args, &results); err != nil {
return nil, err
}
if n := len(results.Results); n != 1 {
Expand Down
13 changes: 8 additions & 5 deletions api/caasoperator/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,10 +337,13 @@ func (s *operatorSuite) TestWatchUnitStart(c *gc.C) {
c.Check(objType, gc.Equals, "CAASOperator")
c.Check(version, gc.Equals, 0)
c.Check(id, gc.Equals, "")
c.Check(request, gc.Equals, "WatchUnitStart")
c.Assert(arg, jc.DeepEquals, params.Entities{
Entities: []params.Entity{{
Tag: "application-gitlab",
c.Check(request, gc.Equals, "WatchContainerStart")
c.Assert(arg, jc.DeepEquals, params.WatchContainerStartArgs{
Args: []params.WatchContainerStartArg{{
Entity: params.Entity{
Tag: "application-gitlab",
},
Container: "container",
}},
})
c.Assert(result, gc.FitsTypeOf, &params.StringsWatchResults{})
Expand All @@ -353,7 +356,7 @@ func (s *operatorSuite) TestWatchUnitStart(c *gc.C) {
})

client := caasoperator.NewClient(apiCaller)
watcher, err := client.WatchUnitStart("gitlab")
watcher, err := client.WatchContainerStart("gitlab", "container")
c.Assert(watcher, gc.IsNil)
c.Assert(err, gc.ErrorMatches, "FAIL")
}
19 changes: 10 additions & 9 deletions api/caasunitprovisioner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,23 +280,24 @@ func maybeNotFound(err *params.Error) error {

// UpdateUnits updates the state model to reflect the state of the units
// as reported by the cloud.
func (c *Client) UpdateUnits(arg params.UpdateApplicationUnits) error {
var result params.ErrorResults
func (c *Client) UpdateUnits(arg params.UpdateApplicationUnits) (*params.UpdateApplicationUnitsInfo, error) {
var result params.UpdateApplicationUnitResults
args := params.UpdateApplicationUnitArgs{Args: []params.UpdateApplicationUnits{arg}}
err := c.facade.FacadeCall("UpdateApplicationsUnits", args, &result)
if err != nil {
return errors.Trace(err)
return nil, errors.Trace(err)
}
if len(result.Results) != len(args.Args) {
return errors.Errorf("expected %d result(s), got %d", len(args.Args), len(result.Results))
return nil, errors.Errorf("expected %d result(s), got %d", len(args.Args), len(result.Results))
}
if result.Results[0].Error == nil {
return nil
firstResult := result.Results[0]
if firstResult.Error == nil {
return firstResult.Info, nil
}
if params.IsCodeForbidden(result.Results[0].Error) {
return errors.NewForbidden(result.Results[0].Error, "")
if params.IsCodeForbidden(firstResult.Error) {
return firstResult.Info, errors.NewForbidden(firstResult.Error, "")
}
return maybeNotFound(result.Results[0].Error)
return firstResult.Info, maybeNotFound(firstResult.Error)
}

// UpdateApplicationService updates the state model to reflect the state of the application's
Expand Down
27 changes: 20 additions & 7 deletions api/caasunitprovisioner/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,13 +335,19 @@ func (s *unitprovisionerSuite) TestUpdateUnits(c *gc.C) {
},
},
})
c.Assert(result, gc.FitsTypeOf, &params.ErrorResults{})
*(result.(*params.ErrorResults)) = params.ErrorResults{
Results: []params.ErrorResult{{}},
c.Assert(result, gc.FitsTypeOf, &params.UpdateApplicationUnitResults{})
*(result.(*params.UpdateApplicationUnitResults)) = params.UpdateApplicationUnitResults{
Results: []params.UpdateApplicationUnitResult{{
Info: &params.UpdateApplicationUnitsInfo{
Units: []params.ApplicationUnitInfo{
{ProviderId: "uuid", UnitTag: "unit-gitlab-0"},
},
},
}},
}
return nil
})
err := client.UpdateUnits(params.UpdateApplicationUnits{
info, err := client.UpdateUnits(params.UpdateApplicationUnits{
ApplicationTag: names.NewApplicationTag("app").String(),
Units: []params.ApplicationUnitParams{
{ProviderId: "uuid", UnitTag: "unit-gitlab-0", Address: "address", Ports: []string{"port"},
Expand All @@ -350,25 +356,32 @@ func (s *unitprovisionerSuite) TestUpdateUnits(c *gc.C) {
})
c.Check(err, jc.ErrorIsNil)
c.Check(called, jc.IsTrue)
c.Check(info, jc.DeepEquals, &params.UpdateApplicationUnitsInfo{
Units: []params.ApplicationUnitInfo{
{ProviderId: "uuid", UnitTag: "unit-gitlab-0"},
},
})
}

func (s *unitprovisionerSuite) TestUpdateUnitsCount(c *gc.C) {
client := newClient(func(objType string, version int, id, request string, a, result interface{}) error {
*(result.(*params.ErrorResults)) = params.ErrorResults{
Results: []params.ErrorResult{
c.Assert(result, gc.FitsTypeOf, &params.UpdateApplicationUnitResults{})
*(result.(*params.UpdateApplicationUnitResults)) = params.UpdateApplicationUnitResults{
Results: []params.UpdateApplicationUnitResult{
{Error: &params.Error{Message: "FAIL"}},
{Error: &params.Error{Message: "FAIL"}},
},
}
return nil
})
err := client.UpdateUnits(params.UpdateApplicationUnits{
info, err := client.UpdateUnits(params.UpdateApplicationUnits{
ApplicationTag: names.NewApplicationTag("app").String(),
Units: []params.ApplicationUnitParams{
{ProviderId: "uuid", Address: "address"},
},
})
c.Check(err, gc.ErrorMatches, `expected 1 result\(s\), got 2`)
c.Assert(info, gc.IsNil)
}

func (s *unitprovisionerSuite) TestUpdateApplicationService(c *gc.C) {
Expand Down
4 changes: 2 additions & 2 deletions apiserver/facades/agent/caasoperator/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,8 @@ type mockBroker struct {
watcher corewatcher.StringsWatcher
}

func (b *mockBroker) WatchUnitStart(appName string) (corewatcher.StringsWatcher, error) {
b.MethodCall(b, "WatchUnitStart", appName)
func (b *mockBroker) WatchContainerStart(appName string, containerName string) (corewatcher.StringsWatcher, error) {
b.MethodCall(b, "WatchContainerStart", appName, containerName)
return b.watcher, b.NextErr()
}

Expand Down
18 changes: 9 additions & 9 deletions apiserver/facades/agent/caasoperator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type Facade struct {
}

type CAASBrokerInterface interface {
WatchUnitStart(appName string) (corewatcher.StringsWatcher, error)
WatchContainerStart(appName string, containerName string) (corewatcher.StringsWatcher, error)
}

// NewStateFacade provides the signature required for facade registration.
Expand Down Expand Up @@ -245,14 +245,14 @@ func (f *Facade) watchUnits(tagString string) (string, []string, error) {
return "", nil, watcher.EnsureErr(w)
}

// WatchUnitStart starts a StringWatcher to watch for Unit start events
// on the CAAS api.
func (f *Facade) WatchUnitStart(args params.Entities) (params.StringsWatchResults, error) {
// WatchContainerStart starts a StringWatcher to watch for container start events
// on the CAAS api for a specific application and container.
func (f *Facade) WatchContainerStart(args params.WatchContainerStartArgs) (params.StringsWatchResults, error) {
results := params.StringsWatchResults{
Results: make([]params.StringsWatchResult, len(args.Entities)),
Results: make([]params.StringsWatchResult, len(args.Args)),
}
for i, arg := range args.Entities {
id, changes, err := f.watchUnitStart(arg.Tag)
for i, arg := range args.Args {
id, changes, err := f.watchContainerStart(arg.Entity.Tag, arg.Container)
if err != nil {
results.Results[i].Error = common.ServerError(err)
continue
Expand All @@ -263,12 +263,12 @@ func (f *Facade) WatchUnitStart(args params.Entities) (params.StringsWatchResult
return results, nil
}

func (f *Facade) watchUnitStart(tagString string) (string, []string, error) {
func (f *Facade) watchContainerStart(tagString string, containerName string) (string, []string, error) {
tag, err := names.ParseApplicationTag(tagString)
if err != nil {
return "", nil, errors.Trace(err)
}
w, err := f.broker.WatchUnitStart(tag.Name)
w, err := f.broker.WatchContainerStart(tag.Name, containerName)
if err != nil {
return "", nil, errors.Trace(err)
}
Expand Down
12 changes: 6 additions & 6 deletions apiserver/facades/agent/caasoperator/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ func (s *CAASOperatorSuite) TestWatchAPIHostPorts(c *gc.C) {
s.st.CheckCallNames(c, "Model", "WatchAPIHostPortsForAgents")
}

func (s *CAASOperatorSuite) TestWatchUnitStart(c *gc.C) {
func (s *CAASOperatorSuite) TestWatchContainerStart(c *gc.C) {
s.st.app.unitsChanges <- []string{"gitlab/0", "gitlab/1"}

wc := make(chan []string, 1)
Expand All @@ -352,10 +352,10 @@ func (s *CAASOperatorSuite) TestWatchUnitStart(c *gc.C) {
},
}

results, err := s.facade.WatchUnitStart(params.Entities{
Entities: []params.Entity{
{Tag: "application-gitlab"},
{Tag: "unit-gitlab-0"},
results, err := s.facade.WatchContainerStart(params.WatchContainerStartArgs{
Args: []params.WatchContainerStartArg{
{Entity: params.Entity{Tag: "application-gitlab"}, Container: "container"},
{Entity: params.Entity{Tag: "unit-gitlab-0"}, Container: "container"},
},
})
c.Assert(err, jc.ErrorIsNil)
Expand All @@ -365,7 +365,7 @@ func (s *CAASOperatorSuite) TestWatchUnitStart(c *gc.C) {
Message: `"unit-gitlab-0" is not a valid application tag`,
})

s.broker.CheckCall(c, 0, "WatchUnitStart", "gitlab")
s.broker.CheckCall(c, 0, "WatchContainerStart", "gitlab", "container")

c.Assert(results.Results[0].StringsWatcherId, gc.Equals, "1")
c.Assert(results.Results[0].Changes, jc.DeepEquals, []string{"gitlab/1"})
Expand Down
52 changes: 40 additions & 12 deletions apiserver/facades/controller/caasunitprovisioner/provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,9 +455,9 @@ func (f *Facade) getApplicationConfig(tagString string) (map[string]interface{},

// UpdateApplicationsUnits updates the Juju data model to reflect the given
// units of the specified application.
func (a *Facade) UpdateApplicationsUnits(args params.UpdateApplicationUnitArgs) (params.ErrorResults, error) {
result := params.ErrorResults{
Results: make([]params.ErrorResult, len(args.Args)),
func (a *Facade) UpdateApplicationsUnits(args params.UpdateApplicationUnitArgs) (params.UpdateApplicationUnitResults, error) {
result := params.UpdateApplicationUnitResults{
Results: make([]params.UpdateApplicationUnitResult, len(args.Args)),
}
if len(args.Args) == 0 {
return result, nil
Expand Down Expand Up @@ -487,12 +487,19 @@ func (a *Facade) UpdateApplicationsUnits(args params.UpdateApplicationUnitArgs)
continue
}
}
err = a.updateUnitsFromCloud(app, appUpdate.Scale, appUpdate.Generation, appUpdate.Units)
appUnitInfo, err := a.updateUnitsFromCloud(app, appUpdate.Scale, appUpdate.Generation, appUpdate.Units)
if err != nil {
// Mask any not found errors as the worker (caller) treats them specially
// and they are not relevant here.
result.Results[i].Error = common.ServerError(errors.Mask(err))
}

// Errors from SetScale will also include unit info.
if appUnitInfo != nil {
result.Results[i].Info = &params.UpdateApplicationUnitsInfo{
Units: appUnitInfo,
}
}
}
return result, nil
}
Expand Down Expand Up @@ -546,20 +553,21 @@ func (a *Facade) updateStatus(params params.ApplicationUnitParams) (
// source (typically a cloud update event) and merges that with the existing unit
// data model in state. The passed in units are the complete set for the cloud, so
// any existing units in state with provider ids which aren't in the set will be removed.
func (a *Facade) updateUnitsFromCloud(app Application, scale *int, generation *int64, unitUpdates []params.ApplicationUnitParams) error {
func (a *Facade) updateUnitsFromCloud(app Application, scale *int,
generation *int64, unitUpdates []params.ApplicationUnitParams) ([]params.ApplicationUnitInfo, error) {
logger.Debugf("unit updates: %#v", unitUpdates)
if scale != nil {
logger.Debugf("application scale: %v", *scale)
if *scale > 0 && len(unitUpdates) == 0 {
// no ops for empty units because we can not determine if it's stateful or not in this case.
logger.Debugf("ignoring empty k8s event for %q", app.Tag().String())
return nil
return nil, nil
}
}
// Set up the initial data structures.
existingStateUnits, err := app.AllUnits()
if err != nil {
return errors.Trace(err)
return nil, errors.Trace(err)
}
stateUnitsById := make(map[string]stateUnit)
cloudPodsById := make(map[string]params.ApplicationUnitParams)
Expand Down Expand Up @@ -596,7 +604,7 @@ func (a *Facade) updateUnitsFromCloud(app Application, scale *int, generation *i
var providerId string
info, err := u.ContainerInfo()
if err != nil && !errors.IsNotFound(err) {
return errors.Trace(err)
return nil, errors.Trace(err)
}
if err == nil {
providerId = info.ProviderId()
Expand Down Expand Up @@ -685,11 +693,31 @@ func (a *Facade) updateUnitsFromCloud(app Application, scale *int, generation *i
}

if err := a.updateStateUnits(app, unitInfo); err != nil {
return errors.Trace(err)
return nil, errors.Trace(err)
}

var providerIds []string
for _, u := range unitUpdates {
providerIds = append(providerIds, u.ProviderId)
}
m, err := a.state.Model()
if err != nil {
return nil, errors.Trace(err)
}
containers, err := m.Containers(providerIds...)
if err != nil {
return nil, errors.Trace(err)
}
var appUnitInfo []params.ApplicationUnitInfo
for _, c := range containers {
appUnitInfo = append(appUnitInfo, params.ApplicationUnitInfo{
ProviderId: c.ProviderId(),
UnitTag: names.NewUnitTag(c.Unit()).String(),
})
}

if scale == nil {
return nil
return appUnitInfo, nil
}
// Update the scale last now that the state
// model accurately reflects the cluster pods.
Expand All @@ -699,9 +727,9 @@ func (a *Facade) updateUnitsFromCloud(app Application, scale *int, generation *i
gen = *generation
}
if currentScale != *scale {
return app.SetScale(*scale, gen, false)
return appUnitInfo, app.SetScale(*scale, gen, false)
}
return nil
return appUnitInfo, nil
}

type stateUnit struct {
Expand Down
Loading

0 comments on commit 726ef78

Please sign in to comment.