Skip to content

Commit

Permalink
Fix: support collect log files from pod pv when container runtime is …
Browse files Browse the repository at this point in the history
…containerd
  • Loading branch information
linshiyx committed Aug 8, 2023
1 parent 71e3a7e commit 162505b
Showing 1 changed file with 85 additions and 19 deletions.
104 changes: 85 additions & 19 deletions pkg/discovery/kubernetes/helper/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,17 @@ package helper
import (
"bytes"
"context"
"encoding/json"
"fmt"
dockerclient "github.com/docker/docker/client"
"github.com/loggie-io/loggie/pkg/discovery/kubernetes/runtime"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"path/filepath"
"strings"
"time"

dockerclient "github.com/docker/docker/client"
"github.com/loggie-io/loggie/pkg/discovery/kubernetes/runtime"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
criapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -437,8 +440,8 @@ func nodePathByContainerPath(pathPattern string, pod *corev1.Pod, volumeName str
return getEmptyDirNodePath(pathPattern, pod, volumeName, volumeMountPath, kubeletRootDir, subPathRes), nil
}

// If pod mount pvc as log path,we need set rootFsCollectionEnabled to true, and container runtime should be docker.
if vol.PersistentVolumeClaim != nil && rootFsCollectionEnabled && containerRuntime.Name() == runtime.RuntimeDocker {
// If pod mount pvc as log path,we need set rootFsCollectionEnabled to true.
if vol.PersistentVolumeClaim != nil && rootFsCollectionEnabled {
return getPVNodePath(pathPattern, volumeMountPath, containerId, containerRuntime)
}

Expand All @@ -463,25 +466,88 @@ func getEmptyDirNodePath(pathPattern string, pod *corev1.Pod, volumeName string,
// Find the actual path on the node based on pvc.
func getPVNodePath(pathPattern string, volumeMountPath string, containerId string, containerRuntime runtime.Runtime) (string, error) {
ctx := context.Background()
if containerRuntime == nil {
return "", errors.New("docker runtime is not initial")
}
if containerRuntime.Name() == runtime.RuntimeDocker {
cli := containerRuntime.Client().(*dockerclient.Client)
containerJson, err := cli.ContainerInspect(ctx, containerId)
if err != nil {
return "", errors.Errorf("containerId: %s, docker inspect error: %s", containerId, err)
}

cli := containerRuntime.Client().(*dockerclient.Client)
containerJson, err := cli.ContainerInspect(ctx, containerId)
if err != nil {
return "", errors.Errorf("containerId: %s, docker inspect error: %s", containerId, err)
}
for _, mnt := range containerJson.Mounts {
if !PathEqual(mnt.Destination, volumeMountPath) {
continue
}

for _, mnt := range containerJson.Mounts {
if !PathEqual(mnt.Destination, volumeMountPath) {
continue
pathSuffix := strings.TrimPrefix(pathPattern, volumeMountPath)
return filepath.Join(mnt.Source, pathSuffix), nil
}
return "", errors.New("cannot find pv volume path in node")
} else if containerRuntime.Name() == runtime.RuntimeContainerd {
cli := containerRuntime.Client().(criapi.RuntimeServiceClient)

request := &criapi.ContainerStatusRequest{
ContainerId: containerId,
Verbose: true,
}

pathSuffix := strings.TrimPrefix(pathPattern, volumeMountPath)
return filepath.Join(mnt.Source, pathSuffix), nil
response, err := cli.ContainerStatus(ctx, request)
if err != nil {
return "", errors.WithMessagef(err, "get container(id: %s) status failed", containerId)
}

infoStr, ok := response.GetInfo()["info"]
if !ok {
if log.IsDebugLevel() {
info, _ := json.Marshal(response.GetInfo())
log.Debug("get info: %s from container(id: %s)", string(info), containerId)
}
return "", errors.Errorf("cannot get info from container(id: %s) status", containerId)
}

infoMap := make(map[string]interface{})
if err := json.Unmarshal([]byte(infoStr), &infoMap); err != nil {
return "", errors.WithMessagef(err, "get info from container(id: %s)", containerId)
}
configIf, ok := infoMap["config"]
if !ok {
return "", errors.Errorf("cannot get config from container(id: %s) status", containerId)
}
configMap, ok := configIf.(map[string]interface{})
if !ok {
return "", errors.Errorf("cannot get config map from container(id: %s) status", containerId)
}
mountsIf, ok := configMap["mounts"]
if !ok {
return "", errors.Errorf("cannot get mounts from container(id: %s) status", containerId)
}
mountsSlice, ok := mountsIf.([]interface{})
if !ok {
return "", errors.Errorf("cannot get mounts slice from container(id: %s) status", containerId)
}
for _, mntIf := range mountsSlice {
mnt, ok := mntIf.(map[string]interface{})
if !ok {
return "", errors.Errorf("cannot get mount from container(id: %s) status", containerId)
}
container_path, ok := mnt["container_path"].(string)
if !ok {
return "", errors.Errorf("cannot get container_path from container(id: %s) status", containerId)
}
host_path, ok := mnt["host_path"].(string)
if !ok {
return "", errors.Errorf("cannot get host_path from container(id: %s) status", containerId)
}
if !PathEqual(container_path, volumeMountPath) {
continue
}

pathSuffix := strings.TrimPrefix(pathPattern, volumeMountPath)
return filepath.Join(host_path, pathSuffix), nil
}
return "", errors.New("cannot find pv volume path in node")
} else {
return "", errors.New("docker or containerd runtime is not initial")
}
return "", errors.New("cannot find pv volume path in node")
}

func GetMatchedPodLabel(labelKeys []string, pod *corev1.Pod) map[string]string {
Expand Down

0 comments on commit 162505b

Please sign in to comment.