Path: blob/main/components/ws-daemon/pkg/container/containerd.go
2499 views
// Copyright (c) 2020 Gitpod GmbH. All rights reserved.1// Licensed under the GNU Affero General Public License (AGPL).2// See License.AGPL.txt in the project root for license information.34package container56import (7"context"8"encoding/json"9"errors"10"fmt"11"path/filepath"12"regexp"13"strconv"14"strings"15"sync"16"time"1718"github.com/containerd/containerd"19"github.com/containerd/containerd/api/events"20"github.com/containerd/containerd/api/services/tasks/v1"21"github.com/containerd/containerd/api/types"22"github.com/containerd/containerd/api/types/task"23"github.com/containerd/containerd/containers"24"github.com/containerd/containerd/errdefs"25"github.com/containerd/containerd/images"26"github.com/containerd/platforms"27"github.com/containerd/typeurl/v2"28ocispecs "github.com/opencontainers/runtime-spec/specs-go"29"github.com/opentracing/opentracing-go"30"golang.org/x/xerrors"3132wsk8s "github.com/gitpod-io/gitpod/common-go/kubernetes"33"github.com/gitpod-io/gitpod/common-go/log"34"github.com/gitpod-io/gitpod/common-go/tracing"35workspacev1 "github.com/gitpod-io/gitpod/ws-manager/api/crd/v1"36)3738const (39kubernetesNamespace = "k8s.io"40containerLabelCRIKind = "io.cri-containerd.kind"41containerLabelK8sContainerName = "io.kubernetes.container.name"42containerLabelK8sPodName = "io.kubernetes.pod.name"43)4445// NewContainerd creates a new containerd adapter46func NewContainerd(cfg *ContainerdConfig, pathMapping PathMapping, registryFacadeHost string) (*Containerd, error) {47cc, err := containerd.New(cfg.SocketPath, containerd.WithDefaultNamespace(kubernetesNamespace))48if err != nil {49return nil, xerrors.Errorf("cannot connect to containerd at %s: %w", cfg.SocketPath, err)50}51ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)52defer cancel()53_, err = cc.Version(ctx)54if err != nil {55return nil, xerrors.Errorf("cannot connect to containerd: %w", err)56}5758res := &Containerd{59Client: cc,60Mapping: pathMapping,6162cond: sync.NewCond(&sync.Mutex{}),63cntIdx: make(map[string]*containerInfo),64podIdx: make(map[string]*containerInfo),65wsiIdx: make(map[string]*containerInfo),6667registryFacadeHost: registryFacadeHost,68}69go res.start()7071return res, nil72}7374// Containerd implements the ws-daemon CRI for containerd75type Containerd struct {76Client *containerd.Client77Mapping PathMapping7879cond *sync.Cond80podIdx map[string]*containerInfo81wsiIdx map[string]*containerInfo82cntIdx map[string]*containerInfo8384registryFacadeHost string85}8687type containerInfo struct {88WorkspaceID string89InstanceID string90OwnerID string91ID string92Snapshotter string93SnapshotKey string94PodName string95SeenTask bool96Rootfs string97UpperDir string98CGroupPath string99PID uint32100ImageRef string101}102103// start listening to containerd104func (s *Containerd) start() {105// Using the filter expression for subscribe does not seem to work. We simply don't get any events.106// That's ok as the event handler below are capable of ignoring any event that's not for them.107108reconnectionInterval := 2 * time.Second109for {110func() {111ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)112defer cancel()113114isServing, err := s.Client.IsServing(ctx)115if err != nil {116log.WithError(err).Error("cannot check if containerd is available")117time.Sleep(reconnectionInterval)118return119}120121if !isServing {122err := s.Client.Reconnect()123if err != nil {124log.WithError(err).Error("cannot reconnect to containerd")125time.Sleep(reconnectionInterval)126return127}128}129130cs, err := s.Client.ContainerService().List(ctx)131if err != nil {132log.WithError(err).Error("cannot list container")133time.Sleep(reconnectionInterval)134return135}136137// we have to loop through the containers twice because we don't know in which order138// the sandbox and workspace container are in. handleNewContainer expects to see the139// sandbox before the actual workspace. Hence, the first pass is for the sandboxes,140// the second pass for workspaces.141for _, c := range cs {142s.handleNewContainer(c)143}144for _, c := range cs {145s.handleNewContainer(c)146}147148tsks, err := s.Client.TaskService().List(ctx, &tasks.ListTasksRequest{})149if err != nil {150log.WithError(err).Error("cannot list tasks")151time.Sleep(reconnectionInterval)152return153}154for _, t := range tsks.Tasks {155s.handleNewTask(t.ID, nil, t.Pid)156}157158evts, errchan := s.Client.Subscribe(context.Background())159log.Info("containerd subscription established")160LOOP:161for {162select {163case evt := <-evts:164ev, err := typeurl.UnmarshalAny(evt.Event)165if err != nil {166log.WithError(err).Warn("cannot unmarshal containerd event")167continue168}169s.handleContainerdEvent(ev)170case err := <-errchan:171log.WithError(err).Error("lost connection to containerd - will attempt to reconnect")172time.Sleep(reconnectionInterval)173break LOOP174}175}176}()177}178}179180func (s *Containerd) handleContainerdEvent(ev interface{}) {181switch evt := ev.(type) {182case *events.ContainerCreate:183ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)184defer cancel()185186c, err := s.Client.ContainerService().Get(ctx, evt.ID)187if err != nil {188log.WithError(err).WithField("ID", evt.ID).WithField("containerImage", evt.Image).Warn("cannot find container we just received a create event for")189return190}191s.handleNewContainer(c)192case *events.TaskCreate:193s.handleNewTask(evt.ContainerID, evt.Rootfs, evt.Pid)194195case *events.TaskDelete:196197case *events.ContainerDelete:198s.cond.L.Lock()199defer s.cond.L.Unlock()200201info, ok := s.cntIdx[evt.ID]202if !ok {203return204}205delete(s.cntIdx, evt.ID)206delete(s.wsiIdx, info.InstanceID)207delete(s.podIdx, info.PodName)208}209}210211func (s *Containerd) handleNewContainer(c containers.Container) {212// TODO(cw): check kubernetes namespace213podName := c.Labels[containerLabelK8sPodName]214if podName == "" {215return216}217218if c.Labels[containerLabelCRIKind] == "sandbox" && c.Labels[wsk8s.WorkspaceIDLabel] != "" {219s.cond.L.Lock()220defer s.cond.L.Unlock()221222if _, ok := s.podIdx[podName]; ok {223// we've already seen the pod - no need to add it to the info again,224// thereby possibly overwriting previously attached info.225return226}227228var info *containerInfo229if _, ok := c.Labels["gpwsman"]; ok {230// this is a ws-manager-mk1 workspace231info = &containerInfo{232InstanceID: c.Labels[wsk8s.WorkspaceIDLabel],233OwnerID: c.Labels[wsk8s.OwnerLabel],234WorkspaceID: c.Labels[wsk8s.MetaIDLabel],235PodName: podName,236}237} else {238// this is a ws-manager-mk2 workspace239info = &containerInfo{240InstanceID: c.Labels["gitpod.io/instanceID"],241OwnerID: c.Labels[wsk8s.OwnerLabel],242WorkspaceID: c.Labels[wsk8s.WorkspaceIDLabel],243PodName: podName,244}245}246247if info.Snapshotter == "" {248// c.Snapshotter is optional249info.Snapshotter = "overlayfs"250}251252// Beware: the ID at this point is NOT the same as the ID of the actual workspace container.253// Here we're talking about the sandbox, not the "workspace" container.254s.podIdx[podName] = info255s.wsiIdx[info.InstanceID] = info256257log.WithField("podname", podName).WithFields(log.OWI(info.OwnerID, info.WorkspaceID, info.InstanceID)).Debug("found sandbox - adding to label cache")258return259}260261if c.Labels[containerLabelCRIKind] == "container" && c.Labels[containerLabelK8sContainerName] == "workspace" {262s.cond.L.Lock()263defer s.cond.L.Unlock()264if _, ok := s.cntIdx[c.ID]; ok {265// we've already seen this container - no need to add it to the info again,266// thereby possibly overwriting previously attached info.267return268}269270info, ok := s.podIdx[podName]271if !ok {272// we haven't seen this container's sandbox, hence have no info about it273return274}275276var err error277info.CGroupPath, err = ExtractCGroupPathFromContainer(c)278if err != nil {279log.WithError(err).WithFields(log.OWI(info.OwnerID, info.WorkspaceID, info.InstanceID)).Warn("cannot extract cgroup path")280}281282info.ID = c.ID283info.SnapshotKey = c.SnapshotKey284info.Snapshotter = c.Snapshotter285info.ImageRef = c.Image286287s.cntIdx[c.ID] = info288log.WithField("podname", podName).WithFields(log.OWI(info.OwnerID, info.WorkspaceID, info.InstanceID)).WithField("ID", c.ID).Debug("found workspace container - updating label cache")289}290}291292func (s *Containerd) handleNewTask(cid string, rootfs []*types.Mount, pid uint32) {293s.cond.L.Lock()294defer s.cond.L.Unlock()295296info, ok := s.cntIdx[cid]297if !ok {298// we don't care for this task as we haven't seen a workspace container for it299return300}301if info.SeenTask {302// we've already seen this task - no need to add it to the info again,303// thereby possibly overwriting previously attached info.304return305}306307if rootfs == nil {308ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)309mnts, err := s.Client.SnapshotService(info.Snapshotter).Mounts(ctx, info.SnapshotKey)310cancel()311if err != nil {312log.WithError(err).WithFields(log.OWI(info.OwnerID, info.WorkspaceID, info.InstanceID)).Warnf("cannot get mounts for container %v", cid)313}314for _, m := range mnts {315rootfs = append(rootfs, &types.Mount{316Source: m.Source,317Options: m.Options,318Type: m.Type,319})320}321}322323for _, rfs := range rootfs {324if rfs.Type != info.Snapshotter {325continue326}327for _, opt := range rfs.Options {328if !strings.HasPrefix(opt, "upperdir=") {329continue330}331info.UpperDir = strings.TrimPrefix(opt, "upperdir=")332break333}334if info.UpperDir != "" {335break336}337}338339info.PID = pid340info.SeenTask = true341342log.WithFields(log.OWI(info.OwnerID, info.WorkspaceID, info.InstanceID)).WithField("cid", cid).WithField("upperdir", info.UpperDir).WithField("rootfs", info.Rootfs).Debug("found task")343s.cond.Broadcast()344}345346// WaitForContainer waits for workspace container to come into existence.347func (s *Containerd) WaitForContainer(ctx context.Context, workspaceInstanceID string) (cid ID, err error) {348//nolint:ineffassign349span, ctx := opentracing.StartSpanFromContext(ctx, "WaitForContainer")350span.LogKV("workspaceInstanceID", workspaceInstanceID)351defer tracing.FinishSpan(span, &err)352353rchan := make(chan ID, 1)354go func() {355s.cond.L.Lock()356defer s.cond.L.Unlock()357358for {359info, ok := s.wsiIdx[workspaceInstanceID]360361if ok && info.SeenTask {362select {363case rchan <- ID(info.ID):364default:365// just to make sure this isn't blocking and we're not holding366// the cond Lock too long.367}368369break370}371372if ctx.Err() != nil {373break374}375376s.cond.Wait()377}378}()379380select {381case cid = <-rchan:382return383case <-ctx.Done():384err = ctx.Err()385return386}387}388389// WaitForContainerStop waits for workspace container to be deleted.390func (s *Containerd) WaitForContainerStop(ctx context.Context, workspaceInstanceID string) (err error) {391//nolint:ineffassign392span, ctx := opentracing.StartSpanFromContext(ctx, "WaitForContainerStop")393span.LogKV("workspaceInstanceID", workspaceInstanceID)394defer tracing.FinishSpan(span, &err)395396rchan := make(chan struct{}, 1)397go func() {398s.cond.L.Lock()399defer s.cond.L.Unlock()400401_, ok := s.wsiIdx[workspaceInstanceID]402if !ok {403// container is already gone404return405}406407for {408s.cond.Wait()409_, ok := s.wsiIdx[workspaceInstanceID]410411if !ok {412select {413case rchan <- struct{}{}:414default:415// just to make sure this isn't blocking and we're not holding416// the cond Lock too long.417}418419break420}421422if ctx.Err() != nil {423break424}425}426}()427428select {429case <-rchan:430return431case <-ctx.Done():432err = ctx.Err()433return434}435}436437func (s *Containerd) DisposeContainer(ctx context.Context, workspaceInstanceID string) {438log := log.WithContext(ctx)439440log.Debug("containerd: disposing container")441442s.cond.L.Lock()443defer s.cond.L.Unlock()444445info, ok := s.wsiIdx[workspaceInstanceID]446if !ok {447// seems we are already done here448log.Debug("containerd: disposing container skipped")449return450}451defer log.Debug("containerd: disposing container done")452453if info.ID != "" {454err := s.Client.ContainerService().Delete(ctx, info.ID)455if err != nil && !errors.Is(err, errdefs.ErrNotFound) {456log.WithField("containerId", info.ID).WithError(err).Error("cannot delete containerd container")457}458}459460delete(s.wsiIdx, info.InstanceID)461delete(s.podIdx, info.PodName)462delete(s.cntIdx, info.ID)463}464465// ContainerExists finds out if a container with the given ID exists.466func (s *Containerd) ContainerExists(ctx context.Context, id ID) (exists bool, err error) {467_, err = s.Client.ContainerService().Get(ctx, string(id))468if err == errdefs.ErrNotFound {469return false, nil470}471if err == nil {472return false, err473}474475return true, nil476}477478// ContainerRootfs finds the workspace container's rootfs.479func (s *Containerd) ContainerRootfs(ctx context.Context, id ID, opts OptsContainerRootfs) (loc string, err error) {480_, ok := s.cntIdx[string(id)]481if !ok {482return "", ErrNotFound483}484485rootfs := fmt.Sprintf("/run/containerd/io.containerd.runtime.v2.task/k8s.io/%v/rootfs", id)486487if opts.Unmapped {488return rootfs, nil489}490491return s.Mapping.Translate(rootfs)492}493494// ContainerCGroupPath finds the container's cgroup path suffix495func (s *Containerd) ContainerCGroupPath(ctx context.Context, id ID) (loc string, err error) {496info, ok := s.cntIdx[string(id)]497if !ok {498return "", ErrNotFound499}500501if info.CGroupPath == "" {502return "", ErrNoCGroup503}504505return info.CGroupPath, nil506}507508// ContainerPID finds the workspace container's PID509func (s *Containerd) ContainerPID(ctx context.Context, id ID) (pid uint64, err error) {510info, ok := s.cntIdx[string(id)]511if !ok {512return 0, ErrNotFound513}514515return uint64(info.PID), nil516}517518func (s *Containerd) GetContainerImageInfo(ctx context.Context, id ID) (*workspacev1.WorkspaceImageInfo, error) {519info, ok := s.cntIdx[string(id)]520if !ok {521return nil, ErrNotFound522}523524image, err := s.Client.GetImage(ctx, info.ImageRef)525if err != nil {526return nil, err527}528size, err := image.Size(ctx)529if err != nil {530return nil, err531}532533wsImageInfo := &workspacev1.WorkspaceImageInfo{534TotalSize: size,535}536537// Fetch the manifest538manifest, err := images.Manifest(ctx, s.Client.ContentStore(), image.Target(), platforms.Default())539if err != nil {540log.WithError(err).WithField("image", info.ImageRef).Error("Failed to get manifest")541return wsImageInfo, nil542}543if manifest.Annotations != nil {544wsImageInfo.WorkspaceImageRef = manifest.Annotations["io.gitpod.workspace-image.ref"]545if size, err := strconv.Atoi(manifest.Annotations["io.gitpod.workspace-image.size"]); err == nil {546wsImageInfo.WorkspaceImageSize = int64(size)547}548}549return wsImageInfo, nil550}551552func (s *Containerd) IsContainerdReady(ctx context.Context) (bool, error) {553if len(s.registryFacadeHost) == 0 {554return s.Client.IsServing(ctx)555}556557// check registry facade can reach containerd and returns image not found.558isServing, err := s.Client.IsServing(ctx)559if err != nil {560return false, err561}562563if !isServing {564return false, nil565}566567_, err = s.Client.GetImage(ctx, fmt.Sprintf("%v/not-a-valid-image:latest", s.registryFacadeHost))568if err != nil {569if errdefs.IsNotFound(err) {570return true, nil571}572573return false, nil574}575576return true, nil577}578579func (s *Containerd) GetContainerTaskInfo(ctx context.Context, id ID) (*task.Process, error) {580task, err := s.Client.TaskService().Get(ctx, &tasks.GetRequest{581ContainerID: string(id),582})583if err != nil {584return nil, err585}586if task.Process == nil {587return nil, fmt.Errorf("task has no process")588}589return task.Process, nil590}591592func (s *Containerd) ForceKillContainerTask(ctx context.Context, id ID) error {593_, err := s.Client.TaskService().Kill(ctx, &tasks.KillRequest{594ContainerID: string(id),595Signal: 9,596All: true,597})598return err599}600601var kubepodsQoSRegexp = regexp.MustCompile(`([^/]+)-([^/]+)-pod`)602var kubepodsRegexp = regexp.MustCompile(`([^/]+)-pod`)603604// ExtractCGroupPathFromContainer retrieves the CGroupPath from the linux section605// in a container's OCI spec.606func ExtractCGroupPathFromContainer(container containers.Container) (cgroupPath string, err error) {607var spec ocispecs.Spec608err = json.Unmarshal(container.Spec.GetValue(), &spec)609if err != nil {610return611}612if spec.Linux == nil {613return "", xerrors.Errorf("container spec has no Linux section")614}615616// systemd: /kubepods.slice/kubepods-<QoS-class>.slice/kubepods-<QoS-class>-pod<pod-UID>.slice:<prefix>:<container-iD>617// systemd: /kubepods.slice/kubepods-pod<pod-UID>.slice:<prefix>:<container-iD>618// cgroupfs: /kubepods/<QoS-class>/pod<pod-UID>/<container-iD>619fields := strings.SplitN(spec.Linux.CgroupsPath, ":", 3)620if len(fields) != 3 {621622return spec.Linux.CgroupsPath, nil623}624625if match := kubepodsQoSRegexp.FindStringSubmatch(fields[0]); len(match) == 3 {626root, class := match[1], match[2]627return filepath.Join(628"/",629fmt.Sprintf("%v.slice", root),630fmt.Sprintf("%v-%v.slice", root, class),631fields[0],632fmt.Sprintf("%v-%v.scope", fields[1], fields[2]),633), nil634}635636if match := kubepodsRegexp.FindStringSubmatch(fields[0]); len(match) == 2 {637root := match[1]638return filepath.Join(639"/",640fmt.Sprintf("%v.slice", root),641fields[0],642fmt.Sprintf("%v-%v.scope", fields[1], fields[2]),643), nil644}645646return spec.Linux.CgroupsPath, nil647}648649650