7 changes: 7 additions & 0 deletions pkg/api/resource.go
@@ -53,3 +53,10 @@ func (self *ResourceList) NvidiaGPU() *resource.Quantity {
}
return &resource.Quantity{}
}

func (self *ResourceList) StorageOverlay() *resource.Quantity {
if val, ok := (*self)[ResourceStorageOverlay]; ok {
return &val
}
return &resource.Quantity{}
}
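
For illustration, a minimal sketch of how this accessor might be consumed; the container variable and the enforcement stub are assumptions, not part of this diff:

// Read a container's overlay limit. A missing entry yields a zero
// Quantity, so Value() == 0 means "no limit set".
limits := container.Resources.Limits
if overlay := limits.StorageOverlay(); overlay.Value() > 0 {
// compare the limit against observed rootfs usage here
}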
7 changes: 7 additions & 0 deletions pkg/api/v1/resource.go
@@ -54,3 +54,10 @@ func (self *ResourceList) NvidiaGPU() *resource.Quantity {
}
return &resource.Quantity{}
}

func (self *ResourceList) StorageOverlay() *resource.Quantity {
if val, ok := (*self)[ResourceStorageOverlay]; ok {
return &val
}
return &resource.Quantity{}
}
1 change: 1 addition & 0 deletions pkg/api/validation/validation.go
@@ -3299,6 +3299,7 @@ func validateResourceName(value string, fldPath *field.Path) field.ErrorList {
// Refer to docs/design/resources.md for more details.
func validateContainerResourceName(value string, fldPath *field.Path) field.ErrorList {
allErrs := validateResourceName(value, fldPath)

if len(strings.Split(value, "/")) == 1 {
if !helper.IsStandardContainerResourceName(value) {
return append(allErrs, field.Invalid(fldPath, value, "must be a standard resource for containers"))
127 changes: 117 additions & 10 deletions pkg/kubelet/eviction/eviction_manager.go
@@ -33,6 +33,7 @@ import (
"k8s.io/kubernetes/pkg/api/v1"
v1qos "k8s.io/kubernetes/pkg/api/v1/helper/qos"
"k8s.io/kubernetes/pkg/features"
statsapi "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/cm"
evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
@@ -151,9 +152,9 @@ func (m *managerImpl) Start(diskInfoProvider DiskInfoProvider, podFunc ActivePod
// start the eviction manager monitoring
go func() {
for {
if evictedPod := m.synchronize(diskInfoProvider, podFunc, nodeProvider); evictedPod != nil {
glog.Infof("eviction manager: pod %s evicted, waiting for pod to be cleaned up", format.Pod(evictedPod))
m.waitForPodCleanup(podCleanedUpFunc, evictedPod)
if evictedPods := m.synchronize(diskInfoProvider, podFunc, nodeProvider); evictedPods != nil {
glog.Infof("eviction manager: pods %s evicted, waiting for pod to be cleaned up", format.Pods(evictedPods))
m.waitForPodsCleanup(podCleanedUpFunc, evictedPods)
} else {
time.Sleep(monitoringInterval)
}
@@ -210,7 +211,7 @@ func startMemoryThresholdNotifier(thresholds []evictionapi.Threshold, observatio

// synchronize is the main control loop that enforces eviction thresholds.
// Returns the pods that were killed, or nil if no pods were killed.
func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc, nodeProvider NodeProvider) *v1.Pod {
func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc, nodeProvider NodeProvider) []*v1.Pod {
// if we have nothing to do, just return
thresholds := m.config.Thresholds
if len(thresholds) == 0 {
Expand Down Expand Up @@ -309,6 +310,14 @@ func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc Act
m.lastObservations = observations
m.Unlock()

// evict pods if there is a resource usage violation from local volume temporary storage
// If eviction happens in the localStorageEviction function, skip the rest of the eviction actions
if utilfeature.DefaultFeatureGate.Enabled(features.LocalStorageCapacityIsolation) {
if evictedPods := m.localStorageEviction(activePods); len(evictedPods) > 0 {
return evictedPods
}
}

// determine the set of resources under starvation
starvedResources := getStarvedResources(thresholds)
if len(starvedResources) == 0 {
Expand Down Expand Up @@ -387,24 +396,29 @@ func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc Act
if err != nil {
glog.Warningf("eviction manager: error while evicting pod %s: %v", format.Pod(pod), err)
}
return pod
return []*v1.Pod{pod}
}
glog.Infof("eviction manager: unable to evict any pods from the node")
return nil
}

func (m *managerImpl) waitForPodCleanup(podCleanedUpFunc PodCleanedUpFunc, pod *v1.Pod) {
func (m *managerImpl) waitForPodsCleanup(podCleanedUpFunc PodCleanedUpFunc, pods []*v1.Pod) {
timeout := m.clock.NewTimer(podCleanupTimeout)
tick := m.clock.Tick(podCleanupPollFreq)
for {
select {
case <-timeout.C():
glog.Warningf("eviction manager: timed out waiting for pod %s to be cleaned up", format.Pod(pod))
glog.Warningf("eviction manager: timed out waiting for pods %s to be cleaned up", format.Pods(pods))
return
case <-tick:
if podCleanedUpFunc(pod) {
glog.Infof("eviction manager: pod %s successfully cleaned up", format.Pod(pod))
return
// on each tick, return only once every pod in the list has been cleaned up
for i, pod := range pods {
if !podCleanedUpFunc(pod) {
break
}
if i == len(pods)-1 {
glog.Infof("eviction manager: pods %s successfully cleaned up", format.Pods(pods))
return
}
}
}
}
@@ -436,3 +450,96 @@ func (m *managerImpl) reclaimNodeLevelResources(resourceToReclaim v1.ResourceNam
}
return false
}

// localStorageEviction checks the EmptyDir volume usage for each pod and determines whether it exceeds the specified limit and the pod needs
// to be evicted. It also checks every container in the pod: if a container's overlay usage exceeds its limit, the pod is evicted too.
func (m *managerImpl) localStorageEviction(pods []*v1.Pod) []*v1.Pod {
summary, err := m.summaryProvider.Get()
if err != nil {
glog.Errorf("Could not get summary provider")
return nil
}

statsFunc := cachedStatsFunc(summary.Pods)
evicted := []*v1.Pod{}
for _, pod := range pods {
podStats, ok := statsFunc(pod)
if !ok {
continue
}

if m.emptyDirLimitEviction(podStats, pod) {
evicted = append(evicted, pod)
Contributor: It's odd that the same pod might end up being appended multiple times. Is there any de-duping happening?

Contributor (author): Fixed. If emptyDirLimitEviction returns true, the containerOverlayLimitEviction check is skipped via the continue below.

continue
}

if m.containerOverlayLimitEviction(podStats, pod) {
evicted = append(evicted, pod)
}
}

return evicted
}

func (m *managerImpl) emptyDirLimitEviction(podStats statsapi.PodStats, pod *v1.Pod) bool {
podVolumeUsed := make(map[string]*resource.Quantity)
for _, volume := range podStats.VolumeStats {
// guard against stats entries that have not reported usage yet
if volume.UsedBytes == nil {
continue
}
podVolumeUsed[volume.Name] = resource.NewQuantity(int64(*volume.UsedBytes), resource.BinarySI)
}
for i := range pod.Spec.Volumes {
source := &pod.Spec.Volumes[i].VolumeSource
if source.EmptyDir != nil {
size := source.EmptyDir.SizeLimit
used := podVolumeUsed[pod.Spec.Volumes[i].Name]
if used != nil && size.Sign() == 1 && used.Cmp(size) > 0 {
// the emptyDir usage exceeds the size limit, evict the pod
return m.evictPod(pod, v1.ResourceName("EmptyDir"), fmt.Sprintf("emptyDir usage exceeds the limit %q", size.String()))
}
}
}
return false
}
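
As a hedged illustration of what this check acts on, here is a pod spec fragment (names and sizes are made up) declaring a SizeLimit on an emptyDir volume, which emptyDirLimitEviction compares against observed usage:

// Hypothetical pod: a 1Gi cap on an emptyDir scratch volume.
pod := &v1.Pod{
Spec: v1.PodSpec{
Volumes: []v1.Volume{{
Name: "scratch",
VolumeSource: v1.VolumeSource{
EmptyDir: &v1.EmptyDirVolumeSource{
SizeLimit: resource.MustParse("1Gi"),
},
},
}},
},
}
// If stats report usage of "scratch" above 1Gi, the pod is evicted.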

func (m *managerImpl) containerOverlayLimitEviction(podStats statsapi.PodStats, pod *v1.Pod) bool {
thresholdsMap := make(map[string]*resource.Quantity)
for _, container := range pod.Spec.Containers {
overlayLimit := container.Resources.Limits.StorageOverlay()
if overlayLimit != nil && overlayLimit.Value() != 0 {
thresholdsMap[container.Name] = overlayLimit
}
}

for _, containerStat := range podStats.Containers {
rootfs := diskUsage(containerStat.Rootfs)
if overlayThreshold, ok := thresholdsMap[containerStat.Name]; ok {
if overlayThreshold.Cmp(*rootfs) < 0 {
return m.evictPod(pod, v1.ResourceName("containerOverlay"), fmt.Sprintf("container's overlay usage exceeds the limit %q", overlayThreshold.String()))

}
}
}
return false
}

func (m *managerImpl) evictPod(pod *v1.Pod, resourceName v1.ResourceName, evictMsg string) bool {
if utilfeature.DefaultFeatureGate.Enabled(features.ExperimentalCriticalPodAnnotation) &&
kubelettypes.IsCriticalPod(pod) && kubepod.IsStaticPod(pod) {
glog.Errorf("eviction manager: cannot evict a critical pod %s", format.Pod(pod))
return false
}
status := v1.PodStatus{
Phase: v1.PodFailed,
Message: fmt.Sprintf(message, resourceName),
Reason: reason,
}
// record that we are evicting the pod
m.recorder.Eventf(pod, v1.EventTypeWarning, reason, evictMsg)
gracePeriod := int64(0)
err := m.killPodFunc(pod, status, &gracePeriod)
if err != nil {
glog.Errorf("eviction manager: pod %s failed to evict %v", format.Pod(pod), err)
} else {
glog.Infof("eviction manager: pod %s is evicted successfully", format.Pod(pod))
}
return true
}
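
Similarly hedged, a container whose limits carry the overlay resource that containerOverlayLimitEviction reads via the new StorageOverlay accessor (the name and size are illustrative):

// Hypothetical container: rootfs (overlay) usage capped at 512Mi.
container := v1.Container{
Name: "app",
Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceStorageOverlay: resource.MustParse("512Mi"),
},
},
}
// Rootfs usage above 512Mi evicts the enclosing pod.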
7 changes: 7 additions & 0 deletions staging/src/k8s.io/client-go/pkg/api/resource.go
@@ -53,3 +53,10 @@ func (self *ResourceList) NvidiaGPU() *resource.Quantity {
}
return &resource.Quantity{}
}

func (self *ResourceList) StorageOverlay() *resource.Quantity {
if val, ok := (*self)[ResourceStorageOverlay]; ok {
return &val
}
return &resource.Quantity{}
}
7 changes: 7 additions & 0 deletions staging/src/k8s.io/client-go/pkg/api/v1/resource.go
@@ -54,3 +54,10 @@ func (self *ResourceList) NvidiaGPU() *resource.Quantity {
}
return &resource.Quantity{}
}

func (self *ResourceList) StorageOverlay() *resource.Quantity {
if val, ok := (*self)[ResourceStorageOverlay]; ok {
return &val
}
return &resource.Quantity{}
}
2 changes: 2 additions & 0 deletions test/e2e_node/BUILD
@@ -77,6 +77,7 @@ go_test(
"kubelet_test.go",
"lifecycle_hook_test.go",
"local_storage_allocatable_eviction_test.go",
"local_storage_isolation_eviction_test.go",
"log_path_test.go",
"memory_eviction_test.go",
"mirror_pod_test.go",
@@ -137,6 +138,7 @@ go_test(
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
],
)