Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 42 additions & 44 deletions pkg/cri/server/instrumented_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,25 @@ func (in *instrumentedService) checkInitialized() error {
// GRPC service request handlers should return error before server is fully
// initialized.
// NOTE(random-liu): All following functions MUST check initialized at the beginning.
func (in *instrumentedAlphaService) checkInitialized() error {
func (in *instrumentedAlphaService) checkInitialized(ctx context.Context) error {
in.emitUsageWarning(ctx)
if in.c.initialized.IsSet() {
return nil
}
return errors.New("server is not initialized yet")
}

// emitUsageWarning emits a warning when v1alpha2 cri-api is called.
func (in *instrumentedAlphaService) emitUsageWarning(ctx context.Context) {
// Only emit the warning the first time an v1alpha2 api is called
in.emitWarning.Do(func() {
log.G(ctx).Warning("CRI API v1alpha2 is deprecated since containerd v1.7 and removed in containerd v2.0. Use CRI API v1 instead.")
if in.warn != nil {
in.warn.Emit(ctx, deprecation.CRIAPIV1Alpha2)
}
})
}

func (in *instrumentedService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandboxRequest) (res *runtime.RunPodSandboxResponse, err error) {
if err := in.checkInitialized(); err != nil {
return nil, err
Expand All @@ -90,7 +102,7 @@ func (in *instrumentedService) RunPodSandbox(ctx context.Context, r *runtime.Run
}

func (in *instrumentedAlphaService) RunPodSandbox(ctx context.Context, r *runtime_alpha.RunPodSandboxRequest) (res *runtime_alpha.RunPodSandboxResponse, err error) {
if err := in.checkInitialized(); err != nil {
if err := in.checkInitialized(ctx); err != nil {
return nil, err
}
log.G(ctx).Infof("RunPodSandbox for %+v", r.GetConfig().GetMetadata())
Expand Down Expand Up @@ -143,7 +155,7 @@ func (in *instrumentedService) ListPodSandbox(ctx context.Context, r *runtime.Li
}

func (in *instrumentedAlphaService) ListPodSandbox(ctx context.Context, r *runtime_alpha.ListPodSandboxRequest) (res *runtime_alpha.ListPodSandboxResponse, err error) {
if err := in.checkInitialized(); err != nil {
if err := in.checkInitialized(ctx); err != nil {
return nil, err
}
log.G(ctx).Tracef("ListPodSandbox with filter %+v", r.GetFilter())
Expand Down Expand Up @@ -196,7 +208,7 @@ func (in *instrumentedService) PodSandboxStatus(ctx context.Context, r *runtime.
}

func (in *instrumentedAlphaService) PodSandboxStatus(ctx context.Context, r *runtime_alpha.PodSandboxStatusRequest) (res *runtime_alpha.PodSandboxStatusResponse, err error) {
if err := in.checkInitialized(); err != nil {
if err := in.checkInitialized(ctx); err != nil {
return nil, err
}
log.G(ctx).Tracef("PodSandboxStatus for %q", r.GetPodSandboxId())
Expand Down Expand Up @@ -249,7 +261,7 @@ func (in *instrumentedService) StopPodSandbox(ctx context.Context, r *runtime.St
}

func (in *instrumentedAlphaService) StopPodSandbox(ctx context.Context, r *runtime_alpha.StopPodSandboxRequest) (res *runtime_alpha.StopPodSandboxResponse, err error) {
if err := in.checkInitialized(); err != nil {
if err := in.checkInitialized(ctx); err != nil {
return nil, err
}
log.G(ctx).Infof("StopPodSandbox for %q", r.GetPodSandboxId())
Expand Down Expand Up @@ -302,7 +314,7 @@ func (in *instrumentedService) RemovePodSandbox(ctx context.Context, r *runtime.
}

func (in *instrumentedAlphaService) RemovePodSandbox(ctx context.Context, r *runtime_alpha.RemovePodSandboxRequest) (res *runtime_alpha.RemovePodSandboxResponse, err error) {
if err := in.checkInitialized(); err != nil {
if err := in.checkInitialized(ctx); err != nil {
return nil, err
}
log.G(ctx).Infof("RemovePodSandbox for %q", r.GetPodSandboxId())
Expand Down Expand Up @@ -355,7 +367,7 @@ func (in *instrumentedService) PortForward(ctx context.Context, r *runtime.PortF
}

func (in *instrumentedAlphaService) PortForward(ctx context.Context, r *runtime_alpha.PortForwardRequest) (res *runtime_alpha.PortForwardResponse, err error) {
if err := in.checkInitialized(); err != nil {
if err := in.checkInitialized(ctx); err != nil {
return nil, err
}
log.G(ctx).Infof("Portforward for %q port %v", r.GetPodSandboxId(), r.GetPort())
Expand Down Expand Up @@ -411,7 +423,7 @@ func (in *instrumentedService) CreateContainer(ctx context.Context, r *runtime.C
}

func (in *instrumentedAlphaService) CreateContainer(ctx context.Context, r *runtime_alpha.CreateContainerRequest) (res *runtime_alpha.CreateContainerResponse, err error) {
if err := in.checkInitialized(); err != nil {
if err := in.checkInitialized(ctx); err != nil {
return nil, err
}
log.G(ctx).Infof("CreateContainer within sandbox %q for container %+v",
Expand Down Expand Up @@ -468,7 +480,7 @@ func (in *instrumentedService) StartContainer(ctx context.Context, r *runtime.St
}

func (in *instrumentedAlphaService) StartContainer(ctx context.Context, r *runtime_alpha.StartContainerRequest) (res *runtime_alpha.StartContainerResponse, err error) {
if err := in.checkInitialized(); err != nil {
if err := in.checkInitialized(ctx); err != nil {
return nil, err
}
log.G(ctx).Infof("StartContainer for %q", r.GetContainerId())
Expand Down Expand Up @@ -522,7 +534,7 @@ func (in *instrumentedService) ListContainers(ctx context.Context, r *runtime.Li
}

func (in *instrumentedAlphaService) ListContainers(ctx context.Context, r *runtime_alpha.ListContainersRequest) (res *runtime_alpha.ListContainersResponse, err error) {
if err := in.checkInitialized(); err != nil {
if err := in.checkInitialized(ctx); err != nil {
return nil, err
}
log.G(ctx).Tracef("ListContainers with filter %+v", r.GetFilter())
Expand Down Expand Up @@ -576,7 +588,7 @@ func (in *instrumentedService) ContainerStatus(ctx context.Context, r *runtime.C
}

func (in *instrumentedAlphaService) ContainerStatus(ctx context.Context, r *runtime_alpha.ContainerStatusRequest) (res *runtime_alpha.ContainerStatusResponse, err error) {
if err := in.checkInitialized(); err != nil {
if err := in.checkInitialized(ctx); err != nil {
return nil, err
}
log.G(ctx).Tracef("ContainerStatus for %q", r.GetContainerId())
Expand Down Expand Up @@ -629,7 +641,7 @@ func (in *instrumentedService) StopContainer(ctx context.Context, r *runtime.Sto
}

func (in *instrumentedAlphaService) StopContainer(ctx context.Context, r *runtime_alpha.StopContainerRequest) (res *runtime_alpha.StopContainerResponse, err error) {
if err := in.checkInitialized(); err != nil {
if err := in.checkInitialized(ctx); err != nil {
return nil, err
}
log.G(ctx).Infof("StopContainer for %q with timeout %d (s)", r.GetContainerId(), r.GetTimeout())
Expand Down Expand Up @@ -682,7 +694,7 @@ func (in *instrumentedService) RemoveContainer(ctx context.Context, r *runtime.R
}

func (in *instrumentedAlphaService) RemoveContainer(ctx context.Context, r *runtime_alpha.RemoveContainerRequest) (res *runtime_alpha.RemoveContainerResponse, err error) {
if err := in.checkInitialized(); err != nil {
if err := in.checkInitialized(ctx); err != nil {
return nil, err
}
log.G(ctx).Infof("RemoveContainer for %q", r.GetContainerId())
Expand Down Expand Up @@ -735,7 +747,7 @@ func (in *instrumentedService) ExecSync(ctx context.Context, r *runtime.ExecSync
}

func (in *instrumentedAlphaService) ExecSync(ctx context.Context, r *runtime_alpha.ExecSyncRequest) (res *runtime_alpha.ExecSyncResponse, err error) {
if err := in.checkInitialized(); err != nil {
if err := in.checkInitialized(ctx); err != nil {
return nil, err
}
log.G(ctx).Debugf("ExecSync for %q with command %+v and timeout %d (s)", r.GetContainerId(), r.GetCmd(), r.GetTimeout())
Expand Down Expand Up @@ -789,7 +801,7 @@ func (in *instrumentedService) Exec(ctx context.Context, r *runtime.ExecRequest)
}

func (in *instrumentedAlphaService) Exec(ctx context.Context, r *runtime_alpha.ExecRequest) (res *runtime_alpha.ExecResponse, err error) {
if err := in.checkInitialized(); err != nil {
if err := in.checkInitialized(ctx); err != nil {
return nil, err
}
log.G(ctx).Debugf("Exec for %q with command %+v, tty %v and stdin %v",
Expand Down Expand Up @@ -843,7 +855,7 @@ func (in *instrumentedService) Attach(ctx context.Context, r *runtime.AttachRequ
}

func (in *instrumentedAlphaService) Attach(ctx context.Context, r *runtime_alpha.AttachRequest) (res *runtime_alpha.AttachResponse, err error) {
if err := in.checkInitialized(); err != nil {
if err := in.checkInitialized(ctx); err != nil {
return nil, err
}
log.G(ctx).Debugf("Attach for %q with tty %v and stdin %v", r.GetContainerId(), r.GetTty(), r.GetStdin())
Expand Down Expand Up @@ -896,7 +908,7 @@ func (in *instrumentedService) UpdateContainerResources(ctx context.Context, r *
}

func (in *instrumentedAlphaService) UpdateContainerResources(ctx context.Context, r *runtime_alpha.UpdateContainerResourcesRequest) (res *runtime_alpha.UpdateContainerResourcesResponse, err error) {
if err := in.checkInitialized(); err != nil {
if err := in.checkInitialized(ctx); err != nil {
return nil, err
}
log.G(ctx).Infof("UpdateContainerResources for %q with Linux: %+v / Windows: %+v", r.GetContainerId(), r.GetLinux(), r.GetWindows())
Expand Down Expand Up @@ -950,7 +962,7 @@ func (in *instrumentedService) PullImage(ctx context.Context, r *runtime.PullIma
}

func (in *instrumentedAlphaService) PullImage(ctx context.Context, r *runtime_alpha.PullImageRequest) (res *runtime_alpha.PullImageResponse, err error) {
if err := in.checkInitialized(); err != nil {
if err := in.checkInitialized(ctx); err != nil {
return nil, err
}
log.G(ctx).Infof("PullImage %q", r.GetImage().GetImage())
Expand Down Expand Up @@ -1005,7 +1017,7 @@ func (in *instrumentedService) ListImages(ctx context.Context, r *runtime.ListIm
}

func (in *instrumentedAlphaService) ListImages(ctx context.Context, r *runtime_alpha.ListImagesRequest) (res *runtime_alpha.ListImagesResponse, err error) {
if err := in.checkInitialized(); err != nil {
if err := in.checkInitialized(ctx); err != nil {
return nil, err
}
log.G(ctx).Tracef("ListImages with filter %+v", r.GetFilter())
Expand Down Expand Up @@ -1060,7 +1072,7 @@ func (in *instrumentedService) ImageStatus(ctx context.Context, r *runtime.Image
}

func (in *instrumentedAlphaService) ImageStatus(ctx context.Context, r *runtime_alpha.ImageStatusRequest) (res *runtime_alpha.ImageStatusResponse, err error) {
if err := in.checkInitialized(); err != nil {
if err := in.checkInitialized(ctx); err != nil {
return nil, err
}
log.G(ctx).Tracef("ImageStatus for %q", r.GetImage().GetImage())
Expand Down Expand Up @@ -1114,7 +1126,7 @@ func (in *instrumentedService) RemoveImage(ctx context.Context, r *runtime.Remov
}

func (in *instrumentedAlphaService) RemoveImage(ctx context.Context, r *runtime_alpha.RemoveImageRequest) (res *runtime_alpha.RemoveImageResponse, err error) {
if err := in.checkInitialized(); err != nil {
if err := in.checkInitialized(ctx); err != nil {
return nil, err
}
log.G(ctx).Infof("RemoveImage %q", r.GetImage().GetImage())
Expand Down Expand Up @@ -1167,7 +1179,7 @@ func (in *instrumentedService) ImageFsInfo(ctx context.Context, r *runtime.Image
}

func (in *instrumentedAlphaService) ImageFsInfo(ctx context.Context, r *runtime_alpha.ImageFsInfoRequest) (res *runtime_alpha.ImageFsInfoResponse, err error) {
if err := in.checkInitialized(); err != nil {
if err := in.checkInitialized(ctx); err != nil {
return nil, err
}
log.G(ctx).Debugf("ImageFsInfo")
Expand Down Expand Up @@ -1220,7 +1232,7 @@ func (in *instrumentedService) PodSandboxStats(ctx context.Context, r *runtime.P
}

func (in *instrumentedAlphaService) PodSandboxStats(ctx context.Context, r *runtime_alpha.PodSandboxStatsRequest) (res *runtime_alpha.PodSandboxStatsResponse, err error) {
if err := in.checkInitialized(); err != nil {
if err := in.checkInitialized(ctx); err != nil {
return nil, err
}
log.G(ctx).Debugf("PodSandboxStats for %q", r.GetPodSandboxId())
Expand Down Expand Up @@ -1273,7 +1285,7 @@ func (in *instrumentedService) ContainerStats(ctx context.Context, r *runtime.Co
}

func (in *instrumentedAlphaService) ContainerStats(ctx context.Context, r *runtime_alpha.ContainerStatsRequest) (res *runtime_alpha.ContainerStatsResponse, err error) {
if err := in.checkInitialized(); err != nil {
if err := in.checkInitialized(ctx); err != nil {
return nil, err
}
log.G(ctx).Debugf("ContainerStats for %q", r.GetContainerId())
Expand Down Expand Up @@ -1326,7 +1338,7 @@ func (in *instrumentedService) ListPodSandboxStats(ctx context.Context, r *runti
}

func (in *instrumentedAlphaService) ListPodSandboxStats(ctx context.Context, r *runtime_alpha.ListPodSandboxStatsRequest) (res *runtime_alpha.ListPodSandboxStatsResponse, err error) {
if err := in.checkInitialized(); err != nil {
if err := in.checkInitialized(ctx); err != nil {
return nil, err
}
log.G(ctx).Tracef("ListPodSandboxStats with filter %+v", r.GetFilter())
Expand Down Expand Up @@ -1379,7 +1391,7 @@ func (in *instrumentedService) ListContainerStats(ctx context.Context, r *runtim
}

func (in *instrumentedAlphaService) ListContainerStats(ctx context.Context, r *runtime_alpha.ListContainerStatsRequest) (res *runtime_alpha.ListContainerStatsResponse, err error) {
if err := in.checkInitialized(); err != nil {
if err := in.checkInitialized(ctx); err != nil {
return nil, err
}
log.G(ctx).Tracef("ListContainerStats with filter %+v", r.GetFilter())
Expand Down Expand Up @@ -1432,14 +1444,7 @@ func (in *instrumentedService) Status(ctx context.Context, r *runtime.StatusRequ
}

func (in *instrumentedAlphaService) Status(ctx context.Context, r *runtime_alpha.StatusRequest) (res *runtime_alpha.StatusResponse, err error) {
// Only emit the warning the first time an v1alpha2 api is called
in.emitWarning.Do(func() {
log.G(ctx).Warning("CRI API v1alpha2 is deprecated since containerd v1.7 and removed in containerd v2.0. Use CRI API v1 instead.")
if in.warn != nil {
in.warn.Emit(ctx, deprecation.CRIAPIV1Alpha2)
}
})
if err := in.checkInitialized(); err != nil {
if err := in.checkInitialized(ctx); err != nil {
return nil, err
}
log.G(ctx).Tracef("Status")
Expand Down Expand Up @@ -1492,14 +1497,7 @@ func (in *instrumentedService) Version(ctx context.Context, r *runtime.VersionRe
}

func (in *instrumentedAlphaService) Version(ctx context.Context, r *runtime_alpha.VersionRequest) (res *runtime_alpha.VersionResponse, err error) {
// Only emit the warning the first time the v1alpha2 api is called
in.emitWarning.Do(func() {
log.G(ctx).Warning("CRI API v1alpha2 is deprecated since containerd v1.7 and removed in containerd v2.0. Use CRI API v1 instead.")
if in.warn != nil {
in.warn.Emit(ctx, deprecation.CRIAPIV1Alpha2)
}
})
if err := in.checkInitialized(); err != nil {
if err := in.checkInitialized(ctx); err != nil {
return nil, err
}
log.G(ctx).Tracef("Version with client side version %q", r.GetVersion())
Expand Down Expand Up @@ -1531,7 +1529,7 @@ func (in *instrumentedService) UpdateRuntimeConfig(ctx context.Context, r *runti
}

func (in *instrumentedAlphaService) UpdateRuntimeConfig(ctx context.Context, r *runtime_alpha.UpdateRuntimeConfigRequest) (res *runtime_alpha.UpdateRuntimeConfigResponse, err error) {
if err := in.checkInitialized(); err != nil {
if err := in.checkInitialized(ctx); err != nil {
return nil, err
}
log.G(ctx).Debugf("UpdateRuntimeConfig with config %+v", r.GetRuntimeConfig())
Expand Down Expand Up @@ -1584,7 +1582,7 @@ func (in *instrumentedService) ReopenContainerLog(ctx context.Context, r *runtim
}

func (in *instrumentedAlphaService) ReopenContainerLog(ctx context.Context, r *runtime_alpha.ReopenContainerLogRequest) (res *runtime_alpha.ReopenContainerLogResponse, err error) {
if err := in.checkInitialized(); err != nil {
if err := in.checkInitialized(ctx); err != nil {
return nil, err
}
log.G(ctx).Debugf("ReopenContainerLog for %q", r.GetContainerId())
Expand Down