Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
gitpod-io
GitHub Repository: gitpod-io/gitpod
Path: blob/main/components/ws-daemon/pkg/container/containerd.go
2499 views
1
// Copyright (c) 2020 Gitpod GmbH. All rights reserved.
2
// Licensed under the GNU Affero General Public License (AGPL).
3
// See License.AGPL.txt in the project root for license information.
4
5
package container
6
7
import (
8
"context"
9
"encoding/json"
10
"errors"
11
"fmt"
12
"path/filepath"
13
"regexp"
14
"strconv"
15
"strings"
16
"sync"
17
"time"
18
19
"github.com/containerd/containerd"
20
"github.com/containerd/containerd/api/events"
21
"github.com/containerd/containerd/api/services/tasks/v1"
22
"github.com/containerd/containerd/api/types"
23
"github.com/containerd/containerd/api/types/task"
24
"github.com/containerd/containerd/containers"
25
"github.com/containerd/containerd/errdefs"
26
"github.com/containerd/containerd/images"
27
"github.com/containerd/platforms"
28
"github.com/containerd/typeurl/v2"
29
ocispecs "github.com/opencontainers/runtime-spec/specs-go"
30
"github.com/opentracing/opentracing-go"
31
"golang.org/x/xerrors"
32
33
wsk8s "github.com/gitpod-io/gitpod/common-go/kubernetes"
34
"github.com/gitpod-io/gitpod/common-go/log"
35
"github.com/gitpod-io/gitpod/common-go/tracing"
36
workspacev1 "github.com/gitpod-io/gitpod/ws-manager/api/crd/v1"
37
)
38
39
const (
	// kubernetesNamespace is the containerd namespace under which Kubernetes (CRI) containers live
	kubernetesNamespace = "k8s.io"

	// containerLabelCRIKind distinguishes "sandbox" (pod) containers from workload "container"s
	containerLabelCRIKind = "io.cri-containerd.kind"
	// containerLabelK8sContainerName carries the container's name from the pod spec
	containerLabelK8sContainerName = "io.kubernetes.container.name"
	// containerLabelK8sPodName carries the name of the pod the container belongs to
	containerLabelK8sPodName = "io.kubernetes.pod.name"
)
45
46
// NewContainerd creates a new containerd adapter
47
func NewContainerd(cfg *ContainerdConfig, pathMapping PathMapping, registryFacadeHost string) (*Containerd, error) {
48
cc, err := containerd.New(cfg.SocketPath, containerd.WithDefaultNamespace(kubernetesNamespace))
49
if err != nil {
50
return nil, xerrors.Errorf("cannot connect to containerd at %s: %w", cfg.SocketPath, err)
51
}
52
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
53
defer cancel()
54
_, err = cc.Version(ctx)
55
if err != nil {
56
return nil, xerrors.Errorf("cannot connect to containerd: %w", err)
57
}
58
59
res := &Containerd{
60
Client: cc,
61
Mapping: pathMapping,
62
63
cond: sync.NewCond(&sync.Mutex{}),
64
cntIdx: make(map[string]*containerInfo),
65
podIdx: make(map[string]*containerInfo),
66
wsiIdx: make(map[string]*containerInfo),
67
68
registryFacadeHost: registryFacadeHost,
69
}
70
go res.start()
71
72
return res, nil
73
}
74
75
// Containerd implements the ws-daemon CRI for containerd
type Containerd struct {
	// Client is the containerd client used for all daemon interactions
	Client *containerd.Client
	// Mapping translates container paths to node paths
	Mapping PathMapping

	// cond guards the three index maps below; Broadcast signals index changes
	cond *sync.Cond
	// podIdx indexes containerInfo by pod name
	podIdx map[string]*containerInfo
	// wsiIdx indexes containerInfo by workspace instance ID
	wsiIdx map[string]*containerInfo
	// cntIdx indexes containerInfo by container ID
	cntIdx map[string]*containerInfo

	// registryFacadeHost, if set, is probed in IsContainerdReady
	registryFacadeHost string
}
87
88
// containerInfo accumulates everything we learn about a workspace container
// from its sandbox, the workspace container itself, and its task.
type containerInfo struct {
	WorkspaceID string
	InstanceID  string
	OwnerID     string
	// ID is the workspace container's ID (not the sandbox's) once seen
	ID          string
	Snapshotter string
	SnapshotKey string
	PodName     string
	// SeenTask is true once handleNewTask has populated PID/UpperDir
	SeenTask bool
	Rootfs   string
	// UpperDir is the overlayfs upper directory extracted from the task's rootfs mounts
	UpperDir   string
	CGroupPath string
	PID        uint32
	ImageRef   string
}
103
104
// start listening to containerd. It runs forever: each outer iteration
// (re)establishes a connection, resynchronizes the index maps from the current
// container/task state, then blocks consuming the event stream until it breaks.
func (s *Containerd) start() {
	// Using the filter expression for subscribe does not seem to work. We simply don't get any events.
	// That's ok as the event handler below are capable of ignoring any event that's not for them.

	reconnectionInterval := 2 * time.Second
	for {
		// anonymous func so the deferred cancel fires at the end of each iteration,
		// not at the (never-reached) end of start
		func() {
			ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
			defer cancel()

			isServing, err := s.Client.IsServing(ctx)
			if err != nil {
				log.WithError(err).Error("cannot check if containerd is available")
				time.Sleep(reconnectionInterval)
				return
			}

			if !isServing {
				err := s.Client.Reconnect()
				if err != nil {
					log.WithError(err).Error("cannot reconnect to containerd")
					time.Sleep(reconnectionInterval)
					return
				}
			}

			// resync: list all existing containers so we don't miss ones created
			// while we were disconnected
			cs, err := s.Client.ContainerService().List(ctx)
			if err != nil {
				log.WithError(err).Error("cannot list container")
				time.Sleep(reconnectionInterval)
				return
			}

			// we have to loop through the containers twice because we don't know in which order
			// the sandbox and workspace container are in. handleNewContainer expects to see the
			// sandbox before the actual workspace. Hence, the first pass is for the sandboxes,
			// the second pass for workspaces.
			for _, c := range cs {
				s.handleNewContainer(c)
			}
			for _, c := range cs {
				s.handleNewContainer(c)
			}

			// resync the running tasks as well (rootfs is nil here; handleNewTask
			// falls back to querying the snapshot service for the mounts)
			tsks, err := s.Client.TaskService().List(ctx, &tasks.ListTasksRequest{})
			if err != nil {
				log.WithError(err).Error("cannot list tasks")
				time.Sleep(reconnectionInterval)
				return
			}
			for _, t := range tsks.Tasks {
				s.handleNewTask(t.ID, nil, t.Pid)
			}

			// subscribe with a background context - the 30s ctx above must not
			// cut the long-lived event stream short
			evts, errchan := s.Client.Subscribe(context.Background())
			log.Info("containerd subscription established")
		LOOP:
			for {
				select {
				case evt := <-evts:
					ev, err := typeurl.UnmarshalAny(evt.Event)
					if err != nil {
						log.WithError(err).Warn("cannot unmarshal containerd event")
						continue
					}
					s.handleContainerdEvent(ev)
				case err := <-errchan:
					// stream broke - leave the inner loop so the outer loop reconnects
					log.WithError(err).Error("lost connection to containerd - will attempt to reconnect")
					time.Sleep(reconnectionInterval)
					break LOOP
				}
			}
		}()
	}
}
180
181
// handleContainerdEvent dispatches a single decoded containerd event to the
// appropriate handler. Events that are not relevant to workspaces fall through
// the handlers' own filtering.
func (s *Containerd) handleContainerdEvent(ev interface{}) {
	switch evt := ev.(type) {
	case *events.ContainerCreate:
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		// the create event only carries ID and image - fetch the full container
		// record (labels, spec, snapshot info) before indexing it
		c, err := s.Client.ContainerService().Get(ctx, evt.ID)
		if err != nil {
			log.WithError(err).WithField("ID", evt.ID).WithField("containerImage", evt.Image).Warn("cannot find container we just received a create event for")
			return
		}
		s.handleNewContainer(c)
	case *events.TaskCreate:
		s.handleNewTask(evt.ContainerID, evt.Rootfs, evt.Pid)

	case *events.TaskDelete:
		// deliberately ignored: index cleanup happens on ContainerDelete below

	case *events.ContainerDelete:
		s.cond.L.Lock()
		defer s.cond.L.Unlock()

		info, ok := s.cntIdx[evt.ID]
		if !ok {
			// not a container we tracked
			return
		}
		// drop the container from all three indices; WaitForContainerStop
		// observes this via the absence of the wsiIdx entry
		delete(s.cntIdx, evt.ID)
		delete(s.wsiIdx, info.InstanceID)
		delete(s.podIdx, info.PodName)
	}
}
211
212
// handleNewContainer indexes a container depending on its CRI kind:
// sandboxes (pods) seed the per-pod containerInfo; the actual "workspace"
// container then enriches that info with its ID, snapshot and cgroup data.
// It must therefore see a pod's sandbox before its workspace container.
func (s *Containerd) handleNewContainer(c containers.Container) {
	// TODO(cw): check kubernetes namespace
	podName := c.Labels[containerLabelK8sPodName]
	if podName == "" {
		// not a Kubernetes-managed container - ignore
		return
	}

	if c.Labels[containerLabelCRIKind] == "sandbox" && c.Labels[wsk8s.WorkspaceIDLabel] != "" {
		s.cond.L.Lock()
		defer s.cond.L.Unlock()

		if _, ok := s.podIdx[podName]; ok {
			// we've already seen the pod - no need to add it to the info again,
			// thereby possibly overwriting previously attached info.
			return
		}

		// the two workspace manager generations use different labels for the
		// instance and workspace IDs - the "gpwsman" label marks mk1
		var info *containerInfo
		if _, ok := c.Labels["gpwsman"]; ok {
			// this is a ws-manager-mk1 workspace
			info = &containerInfo{
				InstanceID:  c.Labels[wsk8s.WorkspaceIDLabel],
				OwnerID:     c.Labels[wsk8s.OwnerLabel],
				WorkspaceID: c.Labels[wsk8s.MetaIDLabel],
				PodName:     podName,
			}
		} else {
			// this is a ws-manager-mk2 workspace
			info = &containerInfo{
				InstanceID:  c.Labels["gitpod.io/instanceID"],
				OwnerID:     c.Labels[wsk8s.OwnerLabel],
				WorkspaceID: c.Labels[wsk8s.WorkspaceIDLabel],
				PodName:     podName,
			}
		}

		if info.Snapshotter == "" {
			// c.Snapshotter is optional
			info.Snapshotter = "overlayfs"
		}

		// Beware: the ID at this point is NOT the same as the ID of the actual workspace container.
		// Here we're talking about the sandbox, not the "workspace" container.
		s.podIdx[podName] = info
		s.wsiIdx[info.InstanceID] = info

		log.WithField("podname", podName).WithFields(log.OWI(info.OwnerID, info.WorkspaceID, info.InstanceID)).Debug("found sandbox - adding to label cache")
		return
	}

	if c.Labels[containerLabelCRIKind] == "container" && c.Labels[containerLabelK8sContainerName] == "workspace" {
		s.cond.L.Lock()
		defer s.cond.L.Unlock()
		if _, ok := s.cntIdx[c.ID]; ok {
			// we've already seen this container - no need to add it to the info again,
			// thereby possibly overwriting previously attached info.
			return
		}

		info, ok := s.podIdx[podName]
		if !ok {
			// we haven't seen this container's sandbox, hence have no info about it
			return
		}

		// best effort: a failed cgroup-path extraction is logged but does not
		// prevent indexing the container
		var err error
		info.CGroupPath, err = ExtractCGroupPathFromContainer(c)
		if err != nil {
			log.WithError(err).WithFields(log.OWI(info.OwnerID, info.WorkspaceID, info.InstanceID)).Warn("cannot extract cgroup path")
		}

		// enrich the pod's info with the actual workspace container's details
		info.ID = c.ID
		info.SnapshotKey = c.SnapshotKey
		info.Snapshotter = c.Snapshotter
		info.ImageRef = c.Image

		s.cntIdx[c.ID] = info
		log.WithField("podname", podName).WithFields(log.OWI(info.OwnerID, info.WorkspaceID, info.InstanceID)).WithField("ID", c.ID).Debug("found workspace container - updating label cache")
	}
}
292
293
// handleNewTask records a container task's PID and overlayfs upper directory
// on the indexed containerInfo, and wakes any WaitForContainer callers.
// rootfs may be nil (e.g. when resyncing from TaskService().List), in which
// case the mounts are fetched from the snapshot service instead.
func (s *Containerd) handleNewTask(cid string, rootfs []*types.Mount, pid uint32) {
	s.cond.L.Lock()
	defer s.cond.L.Unlock()

	info, ok := s.cntIdx[cid]
	if !ok {
		// we don't care for this task as we haven't seen a workspace container for it
		return
	}
	if info.SeenTask {
		// we've already seen this task - no need to add it to the info again,
		// thereby possibly overwriting previously attached info.
		return
	}

	if rootfs == nil {
		// no mounts passed in - ask the snapshotter for them
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		mnts, err := s.Client.SnapshotService(info.Snapshotter).Mounts(ctx, info.SnapshotKey)
		cancel()
		if err != nil {
			// best effort: we proceed without mounts; UpperDir simply stays empty
			log.WithError(err).WithFields(log.OWI(info.OwnerID, info.WorkspaceID, info.InstanceID)).Warnf("cannot get mounts for container %v", cid)
		}
		for _, m := range mnts {
			rootfs = append(rootfs, &types.Mount{
				Source:  m.Source,
				Options: m.Options,
				Type:    m.Type,
			})
		}
	}

	// find the first mount of the container's snapshotter type that carries
	// an "upperdir=" option and remember that path
	for _, rfs := range rootfs {
		if rfs.Type != info.Snapshotter {
			continue
		}
		for _, opt := range rfs.Options {
			if !strings.HasPrefix(opt, "upperdir=") {
				continue
			}
			info.UpperDir = strings.TrimPrefix(opt, "upperdir=")
			break
		}
		if info.UpperDir != "" {
			break
		}
	}

	info.PID = pid
	info.SeenTask = true

	log.WithFields(log.OWI(info.OwnerID, info.WorkspaceID, info.InstanceID)).WithField("cid", cid).WithField("upperdir", info.UpperDir).WithField("rootfs", info.Rootfs).Debug("found task")
	// wake WaitForContainer/WaitForContainerStop waiters
	s.cond.Broadcast()
}
346
347
// WaitForContainer waits for workspace container to come into existence.
// It returns the container's ID once the container's task has been seen
// (i.e. handleNewTask has run for it), or ctx.Err() on cancellation/timeout.
func (s *Containerd) WaitForContainer(ctx context.Context, workspaceInstanceID string) (cid ID, err error) {
	//nolint:ineffassign
	span, ctx := opentracing.StartSpanFromContext(ctx, "WaitForContainer")
	span.LogKV("workspaceInstanceID", workspaceInstanceID)
	defer tracing.FinishSpan(span, &err)

	rchan := make(chan ID, 1)
	go func() {
		s.cond.L.Lock()
		defer s.cond.L.Unlock()

		for {
			info, ok := s.wsiIdx[workspaceInstanceID]

			if ok && info.SeenTask {
				select {
				case rchan <- ID(info.ID):
				default:
					// just to make sure this isn't blocking and we're not holding
					// the cond Lock too long.
				}

				break
			}

			// NOTE(review): ctx cancellation is only observed after the next
			// Broadcast wakes this goroutine - until then it lingers in cond.Wait.
			if ctx.Err() != nil {
				break
			}

			s.cond.Wait()
		}
	}()

	select {
	case cid = <-rchan:
		return
	case <-ctx.Done():
		err = ctx.Err()
		return
	}
}
389
390
// WaitForContainerStop waits for workspace container to be deleted.
// It returns nil once the instance disappears from the index (which happens
// on ContainerDelete), or ctx.Err() on cancellation/timeout.
func (s *Containerd) WaitForContainerStop(ctx context.Context, workspaceInstanceID string) (err error) {
	//nolint:ineffassign
	span, ctx := opentracing.StartSpanFromContext(ctx, "WaitForContainerStop")
	span.LogKV("workspaceInstanceID", workspaceInstanceID)
	defer tracing.FinishSpan(span, &err)

	rchan := make(chan struct{}, 1)
	go func() {
		s.cond.L.Lock()
		defer s.cond.L.Unlock()

		_, ok := s.wsiIdx[workspaceInstanceID]
		if !ok {
			// container is already gone
			// NOTE(review): rchan is never signalled here, so the outer select
			// only returns via ctx.Done() in this case - confirm intended.
			return
		}

		for {
			// wait for the next index change, then re-check for removal
			s.cond.Wait()
			_, ok := s.wsiIdx[workspaceInstanceID]

			if !ok {
				select {
				case rchan <- struct{}{}:
				default:
					// just to make sure this isn't blocking and we're not holding
					// the cond Lock too long.
				}

				break
			}

			if ctx.Err() != nil {
				break
			}
		}
	}()

	select {
	case <-rchan:
		return
	case <-ctx.Done():
		err = ctx.Err()
		return
	}
}
437
438
// DisposeContainer deletes the workspace container from containerd (if it
// still exists) and removes it from all index maps. It is a no-op when the
// instance is not indexed.
func (s *Containerd) DisposeContainer(ctx context.Context, workspaceInstanceID string) {
	log := log.WithContext(ctx)

	log.Debug("containerd: disposing container")

	// NOTE(review): the cond lock is held across the Delete RPC below, which
	// blocks event handling for its duration - confirm that's acceptable.
	s.cond.L.Lock()
	defer s.cond.L.Unlock()

	info, ok := s.wsiIdx[workspaceInstanceID]
	if !ok {
		// seems we are already done here
		log.Debug("containerd: disposing container skipped")
		return
	}
	defer log.Debug("containerd: disposing container done")

	if info.ID != "" {
		err := s.Client.ContainerService().Delete(ctx, info.ID)
		if err != nil && !errors.Is(err, errdefs.ErrNotFound) {
			// already-gone containers are fine; anything else is logged but
			// we still drop the index entries below
			log.WithField("containerId", info.ID).WithError(err).Error("cannot delete containerd container")
		}
	}

	delete(s.wsiIdx, info.InstanceID)
	delete(s.podIdx, info.PodName)
	delete(s.cntIdx, info.ID)
}
465
466
// ContainerExists finds out if a container with the given ID exists.
467
func (s *Containerd) ContainerExists(ctx context.Context, id ID) (exists bool, err error) {
468
_, err = s.Client.ContainerService().Get(ctx, string(id))
469
if err == errdefs.ErrNotFound {
470
return false, nil
471
}
472
if err == nil {
473
return false, err
474
}
475
476
return true, nil
477
}
478
479
// ContainerRootfs finds the workspace container's rootfs.
480
func (s *Containerd) ContainerRootfs(ctx context.Context, id ID, opts OptsContainerRootfs) (loc string, err error) {
481
_, ok := s.cntIdx[string(id)]
482
if !ok {
483
return "", ErrNotFound
484
}
485
486
rootfs := fmt.Sprintf("/run/containerd/io.containerd.runtime.v2.task/k8s.io/%v/rootfs", id)
487
488
if opts.Unmapped {
489
return rootfs, nil
490
}
491
492
return s.Mapping.Translate(rootfs)
493
}
494
495
// ContainerCGroupPath finds the container's cgroup path suffix
496
func (s *Containerd) ContainerCGroupPath(ctx context.Context, id ID) (loc string, err error) {
497
info, ok := s.cntIdx[string(id)]
498
if !ok {
499
return "", ErrNotFound
500
}
501
502
if info.CGroupPath == "" {
503
return "", ErrNoCGroup
504
}
505
506
return info.CGroupPath, nil
507
}
508
509
// ContainerPID finds the workspace container's PID
510
func (s *Containerd) ContainerPID(ctx context.Context, id ID) (pid uint64, err error) {
511
info, ok := s.cntIdx[string(id)]
512
if !ok {
513
return 0, ErrNotFound
514
}
515
516
return uint64(info.PID), nil
517
}
518
519
func (s *Containerd) GetContainerImageInfo(ctx context.Context, id ID) (*workspacev1.WorkspaceImageInfo, error) {
520
info, ok := s.cntIdx[string(id)]
521
if !ok {
522
return nil, ErrNotFound
523
}
524
525
image, err := s.Client.GetImage(ctx, info.ImageRef)
526
if err != nil {
527
return nil, err
528
}
529
size, err := image.Size(ctx)
530
if err != nil {
531
return nil, err
532
}
533
534
wsImageInfo := &workspacev1.WorkspaceImageInfo{
535
TotalSize: size,
536
}
537
538
// Fetch the manifest
539
manifest, err := images.Manifest(ctx, s.Client.ContentStore(), image.Target(), platforms.Default())
540
if err != nil {
541
log.WithError(err).WithField("image", info.ImageRef).Error("Failed to get manifest")
542
return wsImageInfo, nil
543
}
544
if manifest.Annotations != nil {
545
wsImageInfo.WorkspaceImageRef = manifest.Annotations["io.gitpod.workspace-image.ref"]
546
if size, err := strconv.Atoi(manifest.Annotations["io.gitpod.workspace-image.size"]); err == nil {
547
wsImageInfo.WorkspaceImageSize = int64(size)
548
}
549
}
550
return wsImageInfo, nil
551
}
552
553
// IsContainerdReady checks whether containerd is serving. If a registry facade
// host is configured, it additionally verifies that containerd can reach the
// registry facade (by expecting a clean "not found" for a known-bogus image).
func (s *Containerd) IsContainerdReady(ctx context.Context) (bool, error) {
	if len(s.registryFacadeHost) == 0 {
		// no registry facade configured - plain daemon liveness suffices
		return s.Client.IsServing(ctx)
	}

	// check registry facade can reach containerd and returns image not found.
	isServing, err := s.Client.IsServing(ctx)
	if err != nil {
		return false, err
	}

	if !isServing {
		return false, nil
	}

	_, err = s.Client.GetImage(ctx, fmt.Sprintf("%v/not-a-valid-image:latest", s.registryFacadeHost))
	if err != nil {
		if errdefs.IsNotFound(err) {
			// "not found" is the expected answer for a bogus image - the
			// facade is reachable, so we're ready
			return true, nil
		}

		// NOTE(review): any other error is swallowed and reported as "not
		// ready" - presumably deliberate best-effort; confirm before changing.
		return false, nil
	}

	return true, nil
}
579
580
func (s *Containerd) GetContainerTaskInfo(ctx context.Context, id ID) (*task.Process, error) {
581
task, err := s.Client.TaskService().Get(ctx, &tasks.GetRequest{
582
ContainerID: string(id),
583
})
584
if err != nil {
585
return nil, err
586
}
587
if task.Process == nil {
588
return nil, fmt.Errorf("task has no process")
589
}
590
return task.Process, nil
591
}
592
593
func (s *Containerd) ForceKillContainerTask(ctx context.Context, id ID) error {
594
_, err := s.Client.TaskService().Kill(ctx, &tasks.KillRequest{
595
ContainerID: string(id),
596
Signal: 9,
597
All: true,
598
})
599
return err
600
}
601
602
// kubepodsQoSRegexp matches systemd cgroup segments of the form "<root>-<qos-class>-pod..."
var kubepodsQoSRegexp = regexp.MustCompile(`([^/]+)-([^/]+)-pod`)

// kubepodsRegexp matches systemd cgroup segments of the form "<root>-pod..." (no QoS class)
var kubepodsRegexp = regexp.MustCompile(`([^/]+)-pod`)
604
605
// ExtractCGroupPathFromContainer retrieves the CGroupPath from the linux section
// in a container's OCI spec. For systemd-style "slice:prefix:id" paths it
// reconstructs the actual filesystem path under the cgroup hierarchy; other
// (cgroupfs-style) paths are returned verbatim.
func ExtractCGroupPathFromContainer(container containers.Container) (cgroupPath string, err error) {
	var spec ocispecs.Spec
	err = json.Unmarshal(container.Spec.GetValue(), &spec)
	if err != nil {
		return
	}
	if spec.Linux == nil {
		return "", xerrors.Errorf("container spec has no Linux section")
	}

	// systemd: /kubepods.slice/kubepods-<QoS-class>.slice/kubepods-<QoS-class>-pod<pod-UID>.slice:<prefix>:<container-iD>
	// systemd: /kubepods.slice/kubepods-pod<pod-UID>.slice:<prefix>:<container-iD>
	// cgroupfs: /kubepods/<QoS-class>/pod<pod-UID>/<container-iD>
	fields := strings.SplitN(spec.Linux.CgroupsPath, ":", 3)
	if len(fields) != 3 {
		// not the systemd "slice:prefix:id" shape - return the path as-is
		return spec.Linux.CgroupsPath, nil
	}

	// systemd path with a QoS class: build
	// /<root>.slice/<root>-<class>.slice/<slice>/<prefix>-<id>.scope
	if match := kubepodsQoSRegexp.FindStringSubmatch(fields[0]); len(match) == 3 {
		root, class := match[1], match[2]
		return filepath.Join(
			"/",
			fmt.Sprintf("%v.slice", root),
			fmt.Sprintf("%v-%v.slice", root, class),
			fields[0],
			fmt.Sprintf("%v-%v.scope", fields[1], fields[2]),
		), nil
	}

	// systemd path without a QoS class: build
	// /<root>.slice/<slice>/<prefix>-<id>.scope
	if match := kubepodsRegexp.FindStringSubmatch(fields[0]); len(match) == 2 {
		root := match[1]
		return filepath.Join(
			"/",
			fmt.Sprintf("%v.slice", root),
			fields[0],
			fmt.Sprintf("%v-%v.scope", fields[1], fields[2]),
		), nil
	}

	// slice did not match either kubepods pattern - fall back to the raw path
	return spec.Linux.CgroupsPath, nil
}
649
650