Handle edge case as described in issue #328 (#335) (#338)
* handle edge case as described in issue #328

* address review comments
rishabh-11 authored Nov 25, 2024
1 parent c257e96 commit 7d34b21
Showing 4 changed files with 136 additions and 62 deletions.
11 changes: 1 addition & 10 deletions cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go
@@ -225,20 +225,11 @@ func (mcm *mcmCloudProvider) checkMCMAvailableReplicas() error {
// Refresh is called before every main loop and can be used to dynamically update cloud provider state.
// In particular the list of node groups returned by NodeGroups can change as a result of CloudProvider.Refresh().
func (mcm *mcmCloudProvider) Refresh() error {
-
	err := mcm.checkMCMAvailableReplicas()
	if err != nil {
		return err
	}
-
-	for _, machineDeployment := range mcm.machinedeployments {
-		err := mcm.mcmManager.resetPriorityForNotToBeDeletedMachines(machineDeployment.Name)
-		if err != nil {
-			klog.Errorf("failed to reset priority for machines in MachineDeployment %s, err: %v", machineDeployment.Name, err.Error())
-			return err
-		}
-	}
-	return nil
+	return mcm.mcmManager.Refresh()
}

// GPULabel returns the label added to nodes with GPU resource.
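
Aside: the provider-side reset loop is gone, and Refresh now funnels through McmManager.Refresh, which owns the machinepriority.machine.sapcloud.io annotation end to end. A minimal standalone sketch of the annotation contract this relies on (constant values as introduced in mcm_manager.go below; the helper name is hypothetical):

```go
package main

import "fmt"

const (
	machinePriorityAnnotation         = "machinepriority.machine.sapcloud.io"
	defaultPriorityValue              = "3" // MCM's default for machines it creates
	priorityValueForCandidateMachines = "1" // CA sets this on machines it wants removed first
)

// markForDeletion is a hypothetical helper: demoting a machine to priority 1
// makes MCM prefer it over priority-3 peers when the deployment scales in.
func markForDeletion(annotations map[string]string) {
	annotations[machinePriorityAnnotation] = priorityValueForCandidateMachines
}

func main() {
	ann := map[string]string{machinePriorityAnnotation: defaultPriorityValue}
	markForDeletion(ann)
	fmt.Println(ann[machinePriorityAnnotation]) // 1
}
```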
71 changes: 59 additions & 12 deletions cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider_test.go
@@ -153,7 +153,7 @@ func TestDeleteNodes(t *testing.T) {
},
},
{
"should not scale down when machine deployment update call times out",
"should not scale down when machine deployment update call times out and should reset priority of the corresponding machine",
setup{
nodes: newNodes(2, "fakeID", []bool{true, false}),
machines: newMachines(2, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3", "3"}, []bool{false, false}),
@@ -168,10 +168,10 @@ func TestDeleteNodes(t *testing.T) {
},
action{node: newNodes(1, "fakeID", []bool{true})[0]},
expect{
-				machines:   newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}),
+				machines:   newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3"}, []bool{false}),
mdName: "machinedeployment-1",
mdReplicas: 2,
-				err:        fmt.Errorf("unable to scale in machine deployment machinedeployment-1, Error: %v", mdUpdateErrorMsg),
+				err:        errors.Join(nil, fmt.Errorf("unable to scale in machine deployment machinedeployment-1, Error: %w", errors.New(mdUpdateErrorMsg))),
},
},
{
@@ -332,13 +332,13 @@ func TestDeleteNodes(t *testing.T) {
flag := false
for _, entryMachineItem := range entry.expect.machines {
if entryMachineItem.Name == machine.Name {
-					g.Expect(machine.Annotations[priorityAnnotationKey]).To(Equal(entryMachineItem.Annotations[priorityAnnotationKey]))
+					g.Expect(machine.Annotations[machinePriorityAnnotation]).To(Equal(entryMachineItem.Annotations[machinePriorityAnnotation]))
flag = true
break
}
}
if !flag {
-				g.Expect(machine.Annotations[priorityAnnotationKey]).To(Equal("3"))
+				g.Expect(machine.Annotations[machinePriorityAnnotation]).To(Equal("3"))
}
}
})
@@ -357,7 +357,6 @@ func TestRefresh(t *testing.T) {
}
table := []data{
{
-
"should return an error if MCM has zero available replicas",
setup{
nodes: newNodes(1, "fakeID", []bool{false}),
@@ -371,7 +370,6 @@ func TestRefresh(t *testing.T) {
},
},
{
-
"should return an error if MCM deployment is not found",
setup{
nodes: newNodes(1, "fakeID", []bool{false}),
@@ -384,8 +382,7 @@ func TestRefresh(t *testing.T) {
},
},
{
-
-			"should reset priority of a machine with node without ToBeDeletedTaint to 3",
+			"should reset priority of a machine to 3 if machine deployment is not scaled in",
setup{
nodes: newNodes(1, "fakeID", []bool{false}),
machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}),
@@ -399,14 +396,64 @@ func TestRefresh(t *testing.T) {
},
},
{
"should not reset priority of a machine to 3 if the node has ToBeDeleted taint",
"should reset priority of a machine to 3 if machine deployment is not scaled in even if ToBeDeletedTaint is present on the corresponding node",
setup{
nodes: newNodes(1, "fakeID", []bool{true}),
machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}),
machineDeployments: newMachineDeployments(1, 1, nil, nil, nil),
nodeGroups: []string{nodeGroup2},
mcmDeployment: newMCMDeployment(1),
},
expect{
machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3"}, []bool{false}),
err: nil,
+			},
+		},
+		{
+			"should NOT skip paused machine deployment",
+			setup{
+				nodes:    newNodes(1, "fakeID", []bool{false}),
+				machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}),
+				machineDeployments: newMachineDeployments(1, 1, &v1alpha1.MachineDeploymentStatus{
+					Conditions: []v1alpha1.MachineDeploymentCondition{
+						{Type: v1alpha1.MachineDeploymentProgressing, Status: v1alpha1.ConditionUnknown, Reason: machineDeploymentPausedReason},
+					},
+				}, nil, nil),
+				nodeGroups:    []string{nodeGroup2},
+				mcmDeployment: newMCMDeployment(1),
+			},
+			expect{
+				machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3"}, []bool{false}),
+				err:      nil,
+			},
+		},
+		{
+			"should ignore terminating/failed machines in checking if number of annotated machines is more than desired",
+			setup{
+				nodes: newNodes(1, "fakeID", []bool{true}),
+				machines: newMachines(1, "fakeID", &v1alpha1.MachineStatus{
+					CurrentStatus: v1alpha1.CurrentStatus{Phase: v1alpha1.MachineFailed},
+				}, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}),
+				machineDeployments: newMachineDeployments(1, 1, nil, nil, nil),
+				nodeGroups:         []string{nodeGroup2},
+				mcmDeployment:      newMCMDeployment(1),
+			},
+			expect{
+				machines: newMachines(1, "fakeID", &v1alpha1.MachineStatus{
+					CurrentStatus: v1alpha1.CurrentStatus{Phase: v1alpha1.MachineFailed},
+				}, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}),
+				err: nil,
+			},
+		},
+		{
+			"should not reset priority of a machine to 3 if machine deployment is scaled in",
+			setup{
+				nodes:              newNodes(1, "fakeID", []bool{true}),
+				machines:           newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}),
+				machineDeployments: newMachineDeployments(1, 0, nil, nil, nil),
+				nodeGroups:         []string{nodeGroup2},
+				mcmDeployment:      newMCMDeployment(1),
+			},
+			expect{
+				machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}),
+				err:      nil,
@@ -428,7 +475,7 @@ func TestRefresh(t *testing.T) {
},
expect{
machines: []*v1alpha1.Machine{newMachine("machine-1", "fakeID-1", nil, "machinedeployment-1", "machineset-1", "1", false, true)},
-				err:      errors.Join(fmt.Errorf("could not reset priority annotation on machine machine-1, Error: %v", mcUpdateErrorMsg)),
+				err:      errors.Join(nil, errors.Join(fmt.Errorf("could not reset priority annotation on machine machine-1, Error: %v", mcUpdateErrorMsg))),
},
},
}
@@ -461,7 +508,7 @@ func TestRefresh(t *testing.T) {
for _, mc := range entry.expect.machines {
machine, err := m.machineClient.Machines(m.namespace).Get(context.TODO(), mc.Name, metav1.GetOptions{})
g.Expect(err).To(BeNil())
-			g.Expect(mc.Annotations[priorityAnnotationKey]).To(Equal(machine.Annotations[priorityAnnotationKey]))
+			g.Expect(mc.Annotations[machinePriorityAnnotation]).To(Equal(machine.Annotations[machinePriorityAnnotation]))
}
})
}
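
For context: the updated err expectations above wrap failures with errors.Join (for example errors.Join(nil, fmt.Errorf(...))). A small self-contained sketch of the standard-library join semantics those assertions rely on (illustrative values, not code from this commit):

```go
package main

import (
	"errors"
	"fmt"
)

func main() {
	var collective error
	// nil operands are discarded, so successful steps contribute nothing.
	collective = errors.Join(collective, nil)
	collective = errors.Join(collective, errors.New("machinedeployment-1: update timed out"))

	fmt.Println(collective)                   // machinedeployment-1: update timed out
	fmt.Println(errors.Join(nil, nil) == nil) // true: a join of only nils stays nil
}
```

This is why an expected error built as errors.Join(nil, err) carries the same message as the single wrapped error.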
109 changes: 73 additions & 36 deletions cluster-autoscaler/cloudprovider/mcm/mcm_manager.go
@@ -59,7 +59,6 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/config/dynamic"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
"k8s.io/client-go/discovery"
appsinformers "k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers"
@@ -77,8 +76,10 @@ const (
// defaultResetAnnotationTimeout is the timeout for resetting the priority annotation of a machine
defaultResetAnnotationTimeout = 10 * time.Second
// defaultPriorityValue is the default value for the priority annotation used by CA. It is set to 3 because MCM defaults the priority of machine it creates to 3.
-	defaultPriorityValue   = "3"
-	minResyncPeriodDefault = 1 * time.Hour
+	defaultPriorityValue              = "3"
+	// priorityValueForCandidateMachines is the priority annotation value set on machines that the CA wants to be deleted. Its value is set to 1.
+	priorityValueForCandidateMachines = "1"
+	minResyncPeriodDefault            = 1 * time.Hour
// machinePriorityAnnotation is the annotation to set machine priority while deletion
machinePriorityAnnotation = "machinepriority.machine.sapcloud.io"
// kindMachineClass is the kind for generic machine class used by the OOT providers
@@ -91,13 +92,10 @@ const (
machineGroup = "machine.sapcloud.io"
// machineGroup is the API version used to identify machine API group objects
machineVersion = "v1alpha1"
-	// machineDeploymentProgressing tells that deployment is progressing. Progress for a MachineDeployment is considered when a new machine set is created or adopted, and when new machines scale up or old machines scale down.
-	// Progress is not estimated for paused MachineDeployments. It is also updated if progressDeadlineSeconds is not specified(treated as infinite deadline), in which case it would never be updated to "false".
-	machineDeploymentProgressing v1alpha1.MachineDeploymentConditionType = "Progressing"
	// newISAvailableReason is the reason in "Progressing" condition when machineDeployment rollout is complete
	newISAvailableReason = "NewMachineSetAvailable"
-	// conditionTrue means the given condition status is true
-	conditionTrue v1alpha1.ConditionStatus = "True"
+	// machineDeploymentPausedReason is the reason in "Progressing" condition when machineDeployment is paused
+	machineDeploymentPausedReason = "DeploymentPaused"
// machineDeploymentNameLabel key for Machine Deployment name in machine labels
machineDeploymentNameLabel = "name"
)
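
Side note: a paused MachineDeployment reports the Progressing condition with status Unknown and reason DeploymentPaused. isRollingUpdateFinished (further down in this file) now treats that state as finished, so Refresh does not skip paused deployments. A hedged sketch of the match (hypothetical struct; the real types live in the MCM v1alpha1 API):

```go
package main

import "fmt"

type condition struct{ Type, Status, Reason string }

// isPausedCondition mirrors the new case added to isRollingUpdateFinished.
func isPausedCondition(c condition) bool {
	return c.Type == "Progressing" && c.Status == "Unknown" && c.Reason == "DeploymentPaused"
}

func main() {
	c := condition{Type: "Progressing", Status: "Unknown", Reason: "DeploymentPaused"}
	fmt.Println(isPausedCondition(c)) // true
}
```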
@@ -411,9 +409,52 @@ func (m *McmManager) GetMachineDeploymentForMachine(machine *Ref) (*MachineDeplo
}, nil
}

-// Refresh does nothing at the moment.
+// Refresh method, for each machine deployment, will reset the priority of the machines if the number of annotated machines is more than desired.
+// It will select the machines to reset the priority based on the descending order of creation timestamp.
func (m *McmManager) Refresh() error {
-	return nil
+	machineDeployments, err := m.machineDeploymentLister.MachineDeployments(m.namespace).List(labels.Everything())
+	if err != nil {
+		klog.Errorf("[Refresh] unable to list machine deployments")
+		return err
+	}
+	var collectiveError error
+	for _, machineDeployment := range machineDeployments {
+		// ignore the machine deployment if it is in rolling update
+		if !isRollingUpdateFinished(machineDeployment) {
+			klog.Infof("[Refresh] machine deployment %s is under rolling update, skipping", machineDeployment.Name)
+			continue
+		}
+		replicas := machineDeployment.Spec.Replicas
+		// check if number of annotated machine objects is more than desired and correspondingly reset the priority annotation value if needed.
+		machines, err := m.getMachinesForMachineDeployment(machineDeployment.Name)
+		if err != nil {
+			klog.Errorf("[Refresh] failed to get machines for machine deployment %s, hence skipping it. Err: %v", machineDeployment.Name, err.Error())
+			collectiveError = errors.Join(collectiveError, err)
+			continue
+		}
+		var machinesMarkedForDeletion []*v1alpha1.Machine
+		for _, machine := range machines {
+			// no need to reset priority for machines already in termination or failed phase
+			if machine.Status.CurrentStatus.Phase == v1alpha1.MachineTerminating || machine.Status.CurrentStatus.Phase == v1alpha1.MachineFailed {
+				continue
+			}
+			if annotValue, ok := machine.Annotations[machinePriorityAnnotation]; ok && annotValue == priorityValueForCandidateMachines {
+				machinesMarkedForDeletion = append(machinesMarkedForDeletion, machine)
+			}
+		}
+		if int(replicas) > len(machines)-len(machinesMarkedForDeletion) {
+			slices.SortStableFunc(machinesMarkedForDeletion, func(m1, m2 *v1alpha1.Machine) int {
+				return -m1.CreationTimestamp.Compare(m2.CreationTimestamp.Time)
+			})
+			diff := int(replicas) - len(machines) + len(machinesMarkedForDeletion)
+			targetRefs := make([]*Ref, 0, diff)
+			for i := 0; i < min(diff, len(machinesMarkedForDeletion)); i++ {
+				targetRefs = append(targetRefs, &Ref{Name: machinesMarkedForDeletion[i].Name, Namespace: machinesMarkedForDeletion[i].Namespace})
+			}
+			collectiveError = errors.Join(collectiveError, m.resetPriorityForMachines(targetRefs))
+		}
+	}
+	return collectiveError
}

// Cleanup does nothing at the moment.
@@ -449,7 +490,7 @@ func (m *McmManager) SetMachineDeploymentSize(ctx context.Context, machinedeploy
Expand Down Expand Up @@ -449,7 +490,7 @@ func (m *McmManager) SetMachineDeploymentSize(ctx context.Context, machinedeploy
return true, err
}

-// DeleteMachines deletes the Machines and also reduces the desired replicas of the MachineDeployment in parallel.
+// DeleteMachines annotates the target machines and also reduces the desired replicas of the MachineDeployment.
func (m *McmManager) DeleteMachines(targetMachineRefs []*Ref) error {
if len(targetMachineRefs) == 0 {
return nil
@@ -468,7 +509,7 @@ func (m *McmManager) DeleteMachines(targetMachineRefs []*Ref) error {
return fmt.Errorf("MachineDeployment %s is under rolling update , cannot reduce replica count", commonMachineDeployment.Name)
}
// update priorities of machines to be deleted except the ones already in termination to 1
-	scaleDownAmount, err := m.prioritizeMachinesForDeletion(targetMachineRefs, commonMachineDeployment.Name)
+	scaleDownAmount, err := m.prioritizeMachinesForDeletion(targetMachineRefs)
if err != nil {
return err
}
@@ -477,33 +518,26 @@ func (m *McmManager) DeleteMachines(targetMachineRefs []*Ref) error {
return m.scaleDownMachineDeployment(ctx, commonMachineDeployment.Name, scaleDownAmount)
}, "MachineDeployment", "update", commonMachineDeployment.Name)
if err != nil {
klog.Errorf("unable to scale in machine deployment %s, Error: %v", commonMachineDeployment.Name, err)
return fmt.Errorf("unable to scale in machine deployment %s, Error: %v", commonMachineDeployment.Name, err)
klog.Errorf("unable to scale in machine deployment %s, will reset priority of target machines, Error: %v", commonMachineDeployment.Name, err)
return errors.Join(err, m.resetPriorityForMachines(targetMachineRefs))
}
return nil
}

-// resetPriorityForNotToBeDeletedMachines resets the priority of machines with nodes without ToBeDeleted taint to 3
-func (m *McmManager) resetPriorityForNotToBeDeletedMachines(mdName string) error {
-	allMachinesForMachineDeployment, err := m.getMachinesForMachineDeployment(mdName)
-	if err != nil {
-		return fmt.Errorf("unable to list all machines for node group %s, Error: %v", mdName, err)
-	}
+// resetPriorityForMachines resets the priority of machines passed in the argument to defaultPriorityValue
+func (m *McmManager) resetPriorityForMachines(mcRefs []*Ref) error {
var collectiveError error
-	for _, machine := range allMachinesForMachineDeployment {
+	for _, mcRef := range mcRefs {
+		machine, err := m.machineLister.Machines(m.namespace).Get(mcRef.Name)
+		if err != nil {
+			collectiveError = errors.Join(collectiveError, fmt.Errorf("unable to get Machine object %s, Error: %v", mcRef, err))
+			continue
+		}
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(defaultResetAnnotationTimeout))
-		err := func() error {
+		err = func() error {
defer cancelFn()
val, ok := machine.Annotations[machinePriorityAnnotation]
if ok && val != defaultPriorityValue {
-			nodeName := machine.Labels[v1alpha1.NodeLabelKey]
-			node, err := m.nodeLister.Get(nodeName)
-			if err != nil && !kube_errors.IsNotFound(err) {
-				return fmt.Errorf("unable to get Node object %s for machine %s, Error: %v", nodeName, machine.Name, err)
-			} else if err == nil && taints.HasToBeDeletedTaint(node) {
-				// Don't update priority annotation if the taint is present on the node
-				return nil
-			}
_, err = m.updateAnnotationOnMachine(ctx, machine.Name, machinePriorityAnnotation, defaultPriorityValue)
return err
}
@@ -518,7 +552,7 @@ func (m *McmManager) resetPriorityForNotToBeDeletedMachines(mdName string) error
}

// prioritizeMachinesForDeletion prioritizes the targeted machines by updating their priority annotation to 1
-func (m *McmManager) prioritizeMachinesForDeletion(targetMachineRefs []*Ref, mdName string) (int, error) {
+func (m *McmManager) prioritizeMachinesForDeletion(targetMachineRefs []*Ref) (int, error) {
var expectedToTerminateMachineNodePairs = make(map[string]string)
for _, machineRef := range targetMachineRefs {
// Trying to update the priority of machineRef till m.maxRetryTimeout
@@ -536,7 +570,7 @@ func (m *McmManager) prioritizeMachinesForDeletion(targetMachineRefs []*Ref, mdN
return false, nil
}
expectedToTerminateMachineNodePairs[mc.Name] = mc.Labels["node"]
return m.updateAnnotationOnMachine(ctx, mc.Name, machinePriorityAnnotation, "1")
return m.updateAnnotationOnMachine(ctx, mc.Name, machinePriorityAnnotation, priorityValueForCandidateMachines)
}, "Machine", "update", machineRef.Name); err != nil {
klog.Errorf("could not prioritize machine %s for deletion, aborting scale in of machine deployment, Error: %v", machineRef.Name, err)
return 0, fmt.Errorf("could not prioritize machine %s for deletion, aborting scale in of machine deployment, Error: %v", machineRef.Name, err)
@@ -570,7 +604,7 @@ func (m *McmManager) updateAnnotationOnMachine(ctx context.Context, mcName strin
}
_, err = m.machineClient.Machines(machine.Namespace).Update(ctx, clone, metav1.UpdateOptions{})
if err == nil {
klog.Infof("Machine %s marked with priority 1 successfully", mcName)
klog.Infof("Machine %s marked with priority %s successfully", mcName, val)
}
return true, err
}
@@ -594,7 +628,7 @@ func (m *McmManager) scaleDownMachineDeployment(ctx context.Context, mdName stri
mdclone.Spec.Replicas = expectedReplicas
_, err = m.machineClient.MachineDeployments(mdclone.Namespace).Update(ctx, mdclone, metav1.UpdateOptions{})
if err != nil {
-		return true, err
+		return true, fmt.Errorf("unable to scale in machine deployment %s, Error: %w", mdName, err)
}
klog.V(2).Infof("MachineDeployment %s size decreased to %d ", mdclone.Name, mdclone.Spec.Replicas)
return false, nil
@@ -885,9 +919,12 @@ func (m *McmManager) GetMachineDeploymentNodeTemplate(machinedeployment *Machine
func isRollingUpdateFinished(md *v1alpha1.MachineDeployment) bool {
for _, cond := range md.Status.Conditions {
switch {
-		case cond.Type == machineDeploymentProgressing && cond.Status == conditionTrue && cond.Reason == newISAvailableReason:
+		case cond.Type == v1alpha1.MachineDeploymentProgressing && cond.Status == v1alpha1.ConditionTrue && cond.Reason == newISAvailableReason:
return true
+		// NOTE:- This check is for paused machine deployments as is taken from MCM. If the check in MCM changes, this should be updated.
+		case cond.Type == v1alpha1.MachineDeploymentProgressing && cond.Status == v1alpha1.ConditionUnknown && cond.Reason == machineDeploymentPausedReason:
+			return true
-		case cond.Type == machineDeploymentProgressing:
+		case cond.Type == v1alpha1.MachineDeploymentProgressing:
return false
}
}
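
For reference: the core of McmManager.Refresh above is a counting check. A standalone sketch with hypothetical numbers (in the real code, terminating and failed machines are never counted as deletion candidates):

```go
package main

import "fmt"

func main() {
	replicas := 3          // machineDeployment.Spec.Replicas
	totalMachines := 5     // machines currently backing the deployment
	markedForDeletion := 3 // machines annotated with priority "1"

	// Machines that would survive a scale-in = total - marked. If fewer than
	// the desired replicas remain, too many machines carry the annotation.
	if replicas > totalMachines-markedForDeletion {
		diff := replicas - totalMachines + markedForDeletion
		fmt.Printf("reset the %d newest candidate(s) back to priority 3\n", diff) // diff = 1
	}
}
```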
7 changes: 3 additions & 4 deletions cluster-autoscaler/cloudprovider/mcm/test_utils.go
@@ -30,9 +30,8 @@ import (
)

var (
-	testNamespace         = "test-namespace"
-	priorityAnnotationKey = "machinepriority.machine.sapcloud.io"
-	testTaintValue        = fmt.Sprint(time.Now().Unix())
+	testNamespace  = "test-namespace"
+	testTaintValue = fmt.Sprint(time.Now().Unix())
)

func newMachineDeployments(
@@ -135,7 +134,7 @@ func newMachines(
{Name: msName},
},
Labels: map[string]string{machineDeploymentNameLabel: mdName},
-				Annotations:       map[string]string{priorityAnnotationKey: priorityAnnotationValues[i]},
+				Annotations:       map[string]string{machinePriorityAnnotation: priorityAnnotationValues[i]},
CreationTimestamp: metav1.Now(),
},
}
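
Context: the newest-first ordering used by Refresh (slices.SortStableFunc with a negated Compare) picks which excess candidates get their priority reset. A minimal sketch, with plain time.Time standing in for metav1.Time:

```go
package main

import (
	"fmt"
	"slices"
	"time"
)

type machine struct {
	name    string
	created time.Time
}

func main() {
	now := time.Now()
	machines := []machine{
		{"machine-old", now.Add(-2 * time.Hour)},
		{"machine-new", now},
		{"machine-mid", now.Add(-time.Hour)},
	}
	// time.Time.Compare returns -1/0/+1; negating it sorts descending.
	slices.SortStableFunc(machines, func(a, b machine) int {
		return -a.created.Compare(b.created)
	})
	fmt.Println(machines[0].name) // machine-new: the newest candidate is reset first
}
```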