GitHub Repository: gitpod-io/gitpod
Path: blob/main/components/image-builder-mk3/pkg/orchestrator/orchestrator.go
// Copyright (c) 2021 Gitpod GmbH. All rights reserved.
// Licensed under the GNU Affero General Public License (AGPL).
// See License.AGPL.txt in the project root for license information.

package orchestrator

import (
	"context"
	"crypto/sha256"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net/http"
	"os"
	"path/filepath"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/aws/aws-sdk-go-v2/service/ecr"
	"github.com/distribution/reference"
	"github.com/google/uuid"
	"github.com/hashicorp/go-retryablehttp"
	"github.com/opentracing/opentracing-go"
	"github.com/sirupsen/logrus"
	"golang.org/x/xerrors"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/status"

	awsconfig "github.com/aws/aws-sdk-go-v2/config"
	common_grpc "github.com/gitpod-io/gitpod/common-go/grpc"
	"github.com/gitpod-io/gitpod/common-go/log"
	"github.com/gitpod-io/gitpod/common-go/tracing"
	csapi "github.com/gitpod-io/gitpod/content-service/api"
	"github.com/gitpod-io/gitpod/image-builder/api"
	protocol "github.com/gitpod-io/gitpod/image-builder/api"
	"github.com/gitpod-io/gitpod/image-builder/api/config"
	"github.com/gitpod-io/gitpod/image-builder/pkg/auth"
	"github.com/gitpod-io/gitpod/image-builder/pkg/resolve"
	wsmanapi "github.com/gitpod-io/gitpod/ws-manager/api"
)

const (
	// buildWorkspaceManagerID identifies the manager for the workspace
	buildWorkspaceManagerID = "image-builder"

	// maxBuildRuntime is the maximum time a build is allowed to take
	maxBuildRuntime = 60 * time.Minute

	// workspaceBuildProcessVersion controls how we build workspace images.
	// Incrementing this value will trigger a rebuild of all workspace images.
	workspaceBuildProcessVersion = 2
)
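
// Illustrative sketch only (not wired into the orchestrator, the function name is hypothetical):
// the process version is folded into the workspace image tag, which is why bumping
// workspaceBuildProcessVersion forces a rebuild - every workspace image tag changes.
// It mirrors getWorkspaceImageRef further below.
func exampleWorkspaceImageTag(repo, baseref string) string {
	hash := sha256.Sum256([]byte(fmt.Sprintf("%s\n%d\n", baseref, workspaceBuildProcessVersion)))
	return fmt.Sprintf("%s:%x", repo, hash[:])
}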

// NewOrchestratingBuilder creates a new orchestrating image builder
func NewOrchestratingBuilder(cfg config.Configuration) (res *Orchestrator, err error) {
	var authentication auth.CompositeAuth
	if cfg.PullSecretFile != "" {
		fn := cfg.PullSecretFile
		if tproot := os.Getenv("TELEPRESENCE_ROOT"); tproot != "" {
			fn = filepath.Join(tproot, fn)
		}

		ath, err := auth.NewDockerConfigFileAuth(fn)
		if err != nil {
			return nil, err
		}
		authentication = append(authentication, ath)
	}
	if cfg.EnableAdditionalECRAuth {
		awsCfg, err := awsconfig.LoadDefaultConfig(context.Background())
		if err != nil {
			return nil, err
		}
		ecrc := ecr.NewFromConfig(awsCfg)
		authentication = append(authentication, auth.NewECRAuthenticator(ecrc))
	}

	var wsman wsmanapi.WorkspaceManagerClient
	if c, ok := cfg.WorkspaceManager.Client.(wsmanapi.WorkspaceManagerClient); ok {
		wsman = c
	} else {
		grpcOpts := common_grpc.DefaultClientOptions()
		if cfg.WorkspaceManager.TLS.Authority != "" || cfg.WorkspaceManager.TLS.Certificate != "" && cfg.WorkspaceManager.TLS.PrivateKey != "" {
			tlsConfig, err := common_grpc.ClientAuthTLSConfig(
				cfg.WorkspaceManager.TLS.Authority, cfg.WorkspaceManager.TLS.Certificate, cfg.WorkspaceManager.TLS.PrivateKey,
				common_grpc.WithSetRootCAs(true),
				common_grpc.WithServerName("ws-manager"),
			)
			if err != nil {
				log.WithField("config", cfg.WorkspaceManager.TLS).Error("Cannot load ws-manager certs - this is a configuration issue.")
				return nil, xerrors.Errorf("cannot load ws-manager certs: %w", err)
			}

			grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
		} else {
			grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
		}
		conn, err := grpc.Dial(cfg.WorkspaceManager.Address, grpcOpts...)
		if err != nil {
			return nil, err
		}
		wsman = wsmanapi.NewWorkspaceManagerClient(conn)
	}

	retryResolveClient := NewRetryTimeoutClient()

	o := &Orchestrator{
		Config: cfg,
		Auth:   authentication,
		AuthResolver: auth.Resolver{
			BaseImageRepository:      cfg.BaseImageRepository,
			WorkspaceImageRepository: cfg.WorkspaceImageRepository,
		},
		RefResolver: &resolve.StandaloneRefResolver{},

		retryResolveClient: retryResolveClient,

		wsman:         wsman,
		buildListener: make(map[string]map[buildListener]struct{}),
		logListener:   make(map[string]map[logListener]struct{}),
		censorship:    make(map[string][]string),
		metrics:       newMetrics(),
	}
	o.monitor = newBuildMonitor(o, o.wsman)

	return o, nil
}

// Orchestrator runs image builds by orchestrating headless build workspaces
type Orchestrator struct {
	Config       config.Configuration
	Auth         auth.RegistryAuthenticator
	AuthResolver auth.Resolver
	RefResolver  resolve.DockerRefResolver

	retryResolveClient *http.Client

	wsman wsmanapi.WorkspaceManagerClient

	buildListener map[string]map[buildListener]struct{}
	logListener   map[string]map[logListener]struct{}
	censorship    map[string][]string
	mu            sync.RWMutex

	monitor *buildMonitor

	metrics *metrics

	protocol.UnimplementedImageBuilderServer
}

// Start fires up the internals of this image builder
func (o *Orchestrator) Start(ctx context.Context) error {
	go o.monitor.Run()
	return nil
}
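
// Minimal usage sketch, assuming a config.Configuration has already been loaded by the caller.
// The function name and wiring are hypothetical; gRPC service registration happens elsewhere.
func exampleStartOrchestrator(ctx context.Context, cfg config.Configuration) error {
	o, err := NewOrchestratingBuilder(cfg)
	if err != nil {
		return err
	}
	// Start only launches the internal build monitor; it does not block.
	return o.Start(ctx)
}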

// ResolveBaseImage returns the "digest" form of a Docker image tag thereby making it absolute.
func (o *Orchestrator) ResolveBaseImage(ctx context.Context, req *protocol.ResolveBaseImageRequest) (resp *protocol.ResolveBaseImageResponse, err error) {
	span, ctx := opentracing.StartSpanFromContext(ctx, "ResolveBaseImage")
	defer tracing.FinishSpan(span, &err)
	tracing.LogRequestSafe(span, req)

	reqauth := o.AuthResolver.ResolveRequestAuth(ctx, req.Auth)

	refstr, err := o.getAbsoluteImageRef(ctx, req.Ref, reqauth, req.GetUseRetryClient())
	if err != nil {
		return nil, err
	}

	return &protocol.ResolveBaseImageResponse{
		Ref: refstr,
	}, nil
}
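
// Hypothetical client-side sketch: resolving a tag to its digest form through this service.
// It assumes an established *grpc.ClientConn to the image-builder and the generated
// protocol.NewImageBuilderClient constructor; none of this is part of the orchestrator itself.
func exampleResolveBase(ctx context.Context, conn *grpc.ClientConn, ref string) (string, error) {
	c := protocol.NewImageBuilderClient(conn)
	resp, err := c.ResolveBaseImage(ctx, &protocol.ResolveBaseImageRequest{Ref: ref})
	if err != nil {
		return "", err
	}
	return resp.Ref, nil
}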

// ResolveWorkspaceImage returns information about a build configuration without actually attempting to build anything.
func (o *Orchestrator) ResolveWorkspaceImage(ctx context.Context, req *protocol.ResolveWorkspaceImageRequest) (resp *protocol.ResolveWorkspaceImageResponse, err error) {
	span, ctx := opentracing.StartSpanFromContext(ctx, "ResolveWorkspaceImage")
	defer tracing.FinishSpan(span, &err)
	tracing.LogRequestSafe(span, req)

	reqauth := o.AuthResolver.ResolveRequestAuth(ctx, req.Auth)
	useRetryClient := req.GetUseRetryClient()
	baseref, err := o.getBaseImageRef(ctx, req.Source, reqauth, useRetryClient)
	if _, ok := status.FromError(err); err != nil && ok {
		return nil, err
	}
	if err != nil {
		return nil, status.Errorf(codes.Internal, "cannot resolve base image: %s", err.Error())
	}
	refstr, err := o.getWorkspaceImageRef(ctx, baseref)
	if err != nil {
		return nil, status.Errorf(codes.InvalidArgument, "cannot produce image ref: %v", err)
	}
	span.LogKV("refstr", refstr, "baseref", baseref)

	// to check if the image exists we must have access to the image caching registry and the refstr we check here does not come
	// from the user. Thus we can safely use auth.AllowedAuthForAll here.
	auth, err := auth.AllowedAuthForAll().GetAuthFor(ctx, o.Auth, refstr)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "cannot get workspace image authentication: %v", err)
	}
	exists, err := o.checkImageExists(ctx, refstr, auth, useRetryClient)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "cannot resolve workspace image: %s", err.Error())
	}

	var status protocol.BuildStatus
	if exists {
		status = protocol.BuildStatus_done_success
	} else {
		status = protocol.BuildStatus_unknown
	}

	return &protocol.ResolveWorkspaceImageResponse{
		Status: status,
		Ref:    refstr,
	}, nil
}

// Build initiates the build of a Docker image using a build configuration. If a build of this
// configuration is already ongoing no new build will be started.
func (o *Orchestrator) Build(req *protocol.BuildRequest, resp protocol.ImageBuilder_BuildServer) (err error) {
	span, ctx := opentracing.StartSpanFromContext(resp.Context(), "Build")
	defer tracing.FinishSpan(span, &err)
	tracing.LogRequestSafe(span, req)

	if req.Source == nil {
		return status.Errorf(codes.InvalidArgument, "build source is missing")
	}

	// resolve build request authentication
	reqauth := o.AuthResolver.ResolveRequestAuth(ctx, req.Auth)
	useRetryClient := req.GetUseRetryClient()
	log.WithField("forceRebuild", req.GetForceRebuild()).WithField("baseImageNameResolved", req.BaseImageNameResolved).WithField("useRetryClient", useRetryClient).Info("build request")

	// resolve the ref to baseImageNameResolved (if it exists)
	if req.BaseImageNameResolved != "" && !req.GetForceRebuild() {
		if req.Auth != nil && req.Auth.GetSelective() != nil {
			// allow access to baseImage repository so we can look it up later
			req.Auth.GetSelective().AllowBaserep = true
			reqauth = o.AuthResolver.ResolveRequestAuth(ctx, req.Auth)
		}

		wsrefstr, err := o.getWorkspaceImageRef(ctx, req.BaseImageNameResolved)
		if err != nil {
			return status.Errorf(codes.Internal, "cannot produce workspace image ref: %q", err)
		}
		wsrefAuth, err := reqauth.GetAuthFor(ctx, o.Auth, wsrefstr)
		if err != nil {
			return status.Errorf(codes.Internal, "cannot get workspace image authentication: %q", err)
		}

		// check if needs build -> early return
		exists, err := o.checkImageExists(ctx, wsrefstr, wsrefAuth, useRetryClient)
		if err != nil {
			return status.Errorf(codes.Internal, "cannot check if image is already built: %q", err)
		}
		if exists {
			err = resp.Send(&protocol.BuildResponse{
				Status:  protocol.BuildStatus_done_success,
				Ref:     wsrefstr,
				BaseRef: req.BaseImageNameResolved,
			})
			if err != nil {
				return handleFailedBuildStreamResponse(err, "cannot send build response")
			}
			return nil
		}
		baseref, err := o.getAbsoluteImageRef(ctx, req.BaseImageNameResolved, reqauth, useRetryClient)
		if err == nil {
			req.Source.From = &protocol.BuildSource_Ref{
				Ref: &protocol.BuildSourceReference{
					Ref: baseref,
				},
			}
		}
	}

	log.Info("falling through to old way of building")
	baseref, err := o.getBaseImageRef(ctx, req.Source, reqauth, useRetryClient)
	if _, ok := status.FromError(err); err != nil && ok {
		log.WithError(err).Error("gRPC status error")
		return err
	}
	if err != nil {
		log.WithError(err).Error("cannot get base image ref")
		return status.Errorf(codes.Internal, "cannot resolve base image: %s", err.Error())
	}

	wsrefstr, err := o.getWorkspaceImageRef(ctx, baseref)
	if err != nil {
		return status.Errorf(codes.Internal, "cannot produce workspace image ref: %q", err)
	}
	wsrefAuth, err := auth.AllowedAuthForAll().GetAuthFor(ctx, o.Auth, wsrefstr)
	if err != nil {
		return status.Errorf(codes.Internal, "cannot get workspace image authentication: %q", err)
	}

	// check if needs build -> early return
	exists, err := o.checkImageExists(ctx, wsrefstr, wsrefAuth, req.GetUseRetryClient())
	if err != nil {
		return status.Errorf(codes.Internal, "cannot check if image is already built: %q", err)
	}
	if exists && !req.GetForceRebuild() {
		// image has already been built - no need for us to start building
		err = resp.Send(&protocol.BuildResponse{
			Status:  protocol.BuildStatus_done_success,
			Ref:     wsrefstr,
			BaseRef: baseref,
		})
		if err != nil {
			return handleFailedBuildStreamResponse(err, "cannot send build response")
		}
		return nil
	}

	o.metrics.BuildStarted()

	// Once a build is running we don't want it cancelled because the server disconnected i.e. during deployment.
	// Instead we want to impose our own timeout/lifecycle on the build. Using context.WithTimeout does not shadow its parent's
	// cancelation (see https://play.golang.org/p/N3QBIGlp8Iw for an example/experiment).
	ctx, cancel := context.WithTimeout(&parentCantCancelContext{Delegate: ctx}, maxBuildRuntime)
	defer cancel()

	randomUUID, err := uuid.NewRandom()
	if err != nil {
		return status.Errorf(codes.Internal, "failed to generate build ID: %v", err)
	}
	buildID := randomUUID.String()
	log := log.WithField("buildID", buildID)

	var (
		buildBase      = "false"
		contextPath    = "."
		dockerfilePath = "Dockerfile"
	)
	var initializer *csapi.WorkspaceInitializer = &csapi.WorkspaceInitializer{
		Spec: &csapi.WorkspaceInitializer_Empty{
			Empty: &csapi.EmptyInitializer{},
		},
	}
	if fsrc := req.Source.GetFile(); fsrc != nil {
		buildBase = "true"
		initializer = fsrc.Source
		contextPath = fsrc.ContextPath
		dockerfilePath = fsrc.DockerfilePath
	}
	dockerfilePath = filepath.Join("/workspace", dockerfilePath)

	if contextPath == "" {
		contextPath = filepath.Dir(dockerfilePath)
	}
	contextPath = filepath.Join("/workspace", strings.TrimPrefix(contextPath, "/workspace"))

	o.censor(buildID, []string{
		wsrefstr,
		baseref,
		strings.Split(wsrefstr, ":")[0],
		strings.Split(baseref, ":")[0],
	})

	// push some log to the client before starting the job, just in case the build workspace takes a while to start up
	o.PublishLog(buildID, "starting image build")

	retryIfUnavailable1 := func(err error) bool {
		if st, ok := status.FromError(err); ok && st.Code() == codes.Unavailable {
			return true
		}
		return false
	}

	pbaseref, err := reference.ParseNormalizedNamed(baseref)
	if err != nil {
		return status.Errorf(codes.InvalidArgument, "cannot parse baseref: %v", err)
	}
	bobBaseref := "localhost:8080/base"
	if r, ok := pbaseref.(reference.Digested); ok {
		bobBaseref += "@" + r.Digest().String()
	} else {
		bobBaseref += ":latest"
	}
	wsref, err := reference.ParseNamed(wsrefstr)
	var additionalAuth []byte
	if err == nil {
		ath := reqauth.GetImageBuildAuthFor(ctx, o.Auth, []string{reference.Domain(pbaseref), auth.DummyECRRegistryDomain}, []string{
			reference.Domain(wsref),
		})
		additionalAuth, err = json.Marshal(ath)
		if err != nil {
			return status.Errorf(codes.InvalidArgument, "cannot marshal additional auth: %v", err)
		}
	}

	var swr *wsmanapi.StartWorkspaceResponse
	err = retry(ctx, func(ctx context.Context) (err error) {
		swr, err = o.wsman.StartWorkspace(ctx, &wsmanapi.StartWorkspaceRequest{
			Id:            buildID,
			ServicePrefix: buildID,
			Metadata: &wsmanapi.WorkspaceMetadata{
				MetaId: buildID,
				Annotations: map[string]string{
					annotationRef:       wsrefstr,
					annotationBaseRef:   baseref,
					annotationManagedBy: buildWorkspaceManagerID,
				},
				Owner: req.GetTriggeredBy(),
			},
			Spec: &wsmanapi.StartWorkspaceSpec{
				Initializer:    initializer,
				Timeout:        maxBuildRuntime.String(),
				WorkspaceImage: o.Config.BuilderImage,
				IdeImage: &wsmanapi.IDEImage{
					WebRef:        o.Config.BuilderImage,
					SupervisorRef: req.SupervisorRef,
				},
				WorkspaceLocation: contextPath,
				Envvars: []*wsmanapi.EnvironmentVariable{
					{Name: "BOB_TARGET_REF", Value: "localhost:8080/target:latest"},
					{Name: "BOB_BASE_REF", Value: bobBaseref},
					{Name: "BOB_BUILD_BASE", Value: buildBase},
					{Name: "BOB_DOCKERFILE_PATH", Value: dockerfilePath},
					{Name: "BOB_CONTEXT_DIR", Value: contextPath},
					{Name: "GITPOD_TASKS", Value: `[{"name": "build", "init": "sudo -E /app/bob build"}]`},
					{Name: "WORKSPACEKIT_RING2_ENCLAVE", Value: "/app/bob proxy"},
					{Name: "WORKSPACEKIT_BOBPROXY_BASEREF", Value: baseref},
					{Name: "WORKSPACEKIT_BOBPROXY_TARGETREF", Value: wsrefstr},
					{
						Name: "WORKSPACEKIT_BOBPROXY_AUTH",
						Secret: &wsmanapi.EnvironmentVariable_SecretKeyRef{
							SecretName: o.Config.PullSecret,
							Key:        ".dockerconfigjson",
						},
					},
					{
						Name:  "WORKSPACEKIT_BOBPROXY_ADDITIONALAUTH",
						Value: string(additionalAuth),
					},
					{Name: "SUPERVISOR_DEBUG_ENABLE", Value: fmt.Sprintf("%v", log.Logger.IsLevelEnabled(logrus.DebugLevel))},
				},
			},
			Type: wsmanapi.WorkspaceType_IMAGEBUILD,
		})
		return
	}, retryIfUnavailable1, 1*time.Second, 10)
	if status.Code(err) == codes.AlreadyExists {
		// build is already running - do not add it to the list of builds
	} else if errors.Is(err, errOutOfRetries) {
		return status.Error(codes.Unavailable, "workspace services are currently unavailable")
	} else if err != nil {
		return status.Errorf(codes.Internal, "cannot start build: %q", err)
	} else {
		o.monitor.RegisterNewBuild(buildID, wsrefstr, baseref, swr.Url, swr.OwnerToken)
		o.PublishLog(buildID, "starting image build ...\n")
	}

	updates, cancel := o.registerBuildListener(buildID)
	defer cancel()
	for {
		update := <-updates
		if update == nil {
			// channel was closed unexpectedly
			return status.Error(codes.Aborted, "subscription canceled - please try again")
		}

		// The failed condition of ws-manager is not stable, hence we might wrongly report that the
		// build was successful when in fact it wasn't. This would break workspace startup with a strange
		// "cannot pull from reg.gitpod.io" error message. Instead the image-build should fail properly.
		// To do this, we resolve the built image afterwards to ensure it was actually built.
		if update.Status == protocol.BuildStatus_done_success {
			exists, err := o.checkImageExists(ctx, wsrefstr, wsrefAuth, useRetryClient)
			if err != nil {
				update.Status = protocol.BuildStatus_done_failure
				update.Message = fmt.Sprintf("cannot check if workspace image exists after the build: %v", err)
			} else if !exists {
				update.Status = protocol.BuildStatus_done_failure
				update.Message = "image build did not produce a workspace image"
			}
		}

		err := resp.Send(update)
		if err != nil {
			log.WithError(err).Info("cannot forward build update - dropping listener")
			return handleFailedBuildStreamResponse(err, "cannot send update")
		}

		if update.Status == protocol.BuildStatus_done_failure || update.Status == protocol.BuildStatus_done_success {
			// build is done
			o.clearListener(buildID)
			o.metrics.BuildDone(update.Status == protocol.BuildStatus_done_success)
			if update.Status != protocol.BuildStatus_done_success {
				log.WithField("UserID", req.GetTriggeredBy()).Error("image build done failed for user")
			}
			break
		}
	}

	return nil
}
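
// Hypothetical client-side sketch of consuming the Build stream: send a build request and read
// status updates until a terminal state arrives. It assumes the generated
// protocol.NewImageBuilderClient and its streaming Build method; not part of the orchestrator.
func exampleBuildAndWait(ctx context.Context, conn *grpc.ClientConn, req *protocol.BuildRequest) (*protocol.BuildResponse, error) {
	stream, err := protocol.NewImageBuilderClient(conn).Build(ctx, req)
	if err != nil {
		return nil, err
	}
	for {
		update, err := stream.Recv()
		if err != nil {
			return nil, err
		}
		// done_success and done_failure are the terminal build states.
		if update.Status == protocol.BuildStatus_done_success || update.Status == protocol.BuildStatus_done_failure {
			return update, nil
		}
	}
}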

// PublishStatus broadcasts a build status update to all listeners
func (o *Orchestrator) PublishStatus(buildID string, resp *api.BuildResponse) {
	o.mu.RLock()
	listener, ok := o.buildListener[buildID]
	o.mu.RUnlock()

	// we don't have any build listener for this build
	if !ok {
		return
	}

	log.WithField("buildID", buildID).WithField("resp", resp).Debug("publishing status")

	for l := range listener {
		select {
		case l <- resp:
			continue

		case <-time.After(5 * time.Second):
			log.Warn("timeout while forwarding status to listener - dropping listener")
			o.mu.Lock()
			ll := o.buildListener[buildID]
			// In the meantime the listener list may have been removed/cleared by a call to clearListener.
			// We don't have to do any work in this case.
			if ll != nil {
				close(l)
				delete(ll, l)
			}
			o.mu.Unlock()
		}
	}
}

// Logs listens to the build output of an ongoing Docker build identified by the build ID
func (o *Orchestrator) Logs(req *protocol.LogsRequest, resp protocol.ImageBuilder_LogsServer) (err error) {
	span, ctx := opentracing.StartSpanFromContext(resp.Context(), "Logs")
	defer tracing.FinishSpan(span, &err)
	tracing.LogRequestSafe(span, req)

	rb, err := o.monitor.GetAllRunningBuilds(ctx)
	var buildID string
	for _, bld := range rb {
		if bld.Info.Ref == req.BuildRef {
			buildID = bld.Info.BuildId
			break
		}
	}
	if buildID == "" {
		return status.Error(codes.NotFound, "build not found")
	}

	logs, cancel := o.registerLogListener(buildID)
	defer cancel()
	for {
		update := <-logs
		if update == nil {
			break
		}

		err := resp.Send(update)
		if err != nil {
			log.WithError(err).Info("cannot forward log output - dropping listener")
			return handleFailedBuildStreamResponse(err, "cannot send log output")
		}
	}

	return
}

// ListBuilds returns a list of currently running builds
func (o *Orchestrator) ListBuilds(ctx context.Context, req *protocol.ListBuildsRequest) (resp *protocol.ListBuildsResponse, err error) {
	span, ctx := opentracing.StartSpanFromContext(ctx, "ListBuilds")
	defer tracing.FinishSpan(span, &err)

	builds, err := o.monitor.GetAllRunningBuilds(ctx)
	if err != nil {
		return
	}

	res := make([]*protocol.BuildInfo, 0, len(builds))
	for _, ws := range builds {
		res = append(res, &ws.Info)
	}

	return &protocol.ListBuildsResponse{Builds: res}, nil
}

func (o *Orchestrator) checkImageExists(ctx context.Context, ref string, authentication *auth.Authentication, useRetryClient bool) (exists bool, err error) {
	span, ctx := opentracing.StartSpanFromContext(ctx, "checkImageExists")
	defer tracing.FinishSpan(span, &err)
	span.SetTag("ref", ref)

	_, err = o.RefResolver.Resolve(ctx, ref, resolve.WithAuthentication(authentication), o.withRetryIfEnabled(useRetryClient))
	if errors.Is(err, resolve.ErrNotFound) {
		return false, nil
	}
	if errors.Is(err, resolve.ErrUnauthorized) {
		return false, status.Errorf(codes.Unauthenticated, "cannot check if image exists: %q", err)
	}
	if err != nil {
		return false, err
	}

	return true, nil
}

// getAbsoluteImageRef returns the "digest" form of an image, i.e. contains no mutable image tags
func (o *Orchestrator) getAbsoluteImageRef(ctx context.Context, ref string, allowedAuth auth.AllowedAuthFor, useRetryClient bool) (res string, err error) {
	span, ctx := opentracing.StartSpanFromContext(ctx, "getAbsoluteImageRefWithResolver")
	defer tracing.FinishSpan(span, &err)
	span.LogKV("ref", ref)
	span.LogKV("useRetryClient", useRetryClient)

	log.WithField("ref", ref).WithField("useRetryClient", useRetryClient).Debug("getAbsoluteImageRefWithResolver")
	auth, err := allowedAuth.GetAuthFor(ctx, o.Auth, ref)
	if err != nil {
		return "", status.Errorf(codes.InvalidArgument, "cannot resolve base image ref: %v", err)
	}

	ref, err = o.RefResolver.Resolve(ctx, ref, resolve.WithAuthentication(auth), o.withRetryIfEnabled(useRetryClient))
	if errors.Is(err, resolve.ErrNotFound) {
		return "", status.Error(codes.NotFound, "cannot resolve image")
	}
	if errors.Is(err, resolve.ErrUnauthorized) {
		if auth == nil {
			log.WithField("ref", ref).Warn("auth was nil")
		} else if auth.Auth == "" && auth.Password == "" {
			log.WithField("ref", ref).Warn("auth was empty")
		}
		return "", status.Error(codes.Unauthenticated, "cannot resolve image")
	}
	if resolve.TooManyRequestsMatcher(err) {
		return "", status.Errorf(codes.Unavailable, "upstream registry responds with 'too many requests': %v", err)
	}
	if err != nil {
		return "", status.Errorf(codes.Internal, "cannot resolve image: %v", err)
	}
	return ref, nil
}

func (o *Orchestrator) withRetryIfEnabled(useRetryClient bool) resolve.DockerRefResolverOption {
	if useRetryClient {
		return resolve.WithHttpClient(o.retryResolveClient)
	}
	return resolve.WithHttpClient(nil)
}

func (o *Orchestrator) getBaseImageRef(ctx context.Context, bs *protocol.BuildSource, allowedAuth auth.AllowedAuthFor, useRetryClient bool) (res string, err error) {
	span, ctx := opentracing.StartSpanFromContext(ctx, "getBaseImageRef")
	defer tracing.FinishSpan(span, &err)

	switch src := bs.From.(type) {
	case *protocol.BuildSource_Ref:
		return o.getAbsoluteImageRef(ctx, src.Ref.Ref, allowedAuth, useRetryClient)

	case *protocol.BuildSource_File:
		manifest := map[string]string{
			"DockerfilePath":    src.File.DockerfilePath,
			"DockerfileVersion": src.File.DockerfileVersion,
			"ContextPath":       src.File.ContextPath,
		}
		// workspace starter will only ever send us Git sources. Should that ever change, we'll need to add
		// manifest support for the other initializer types.
		if src.File.Source.GetGit() != nil {
			fsrc := src.File.Source.GetGit()
			manifest["Source"] = "git"
			manifest["CloneTarget"] = fsrc.CloneTaget
			manifest["RemoteURI"] = fsrc.RemoteUri
		} else {
			return "", xerrors.Errorf("unsupported context initializer")
		}
		// Go maps do NOT maintain their order - we must sort the keys to maintain a stable order
		var keys []string
		for k := range manifest {
			keys = append(keys, k)
		}
		sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
		var dfl string
		for _, k := range keys {
			dfl += fmt.Sprintf("%s: %s\n", k, manifest[k])
		}
		span.LogKV("manifest", dfl)

		hash := sha256.New()
		n, err := hash.Write([]byte(dfl))
		if err != nil {
			return "", xerrors.Errorf("cannot compute src image ref: %w", err)
		}
		if n < len(dfl) {
			return "", xerrors.Errorf("cannot compute src image ref: short write")
		}

		// the mkII image builder supported an image hash salt. That salt broke other assumptions,
		// which is why this mkIII implementation does not support it anymore. We need to stay compatible
		// with the previous means of computing the hash though. This is why we add an extra line break here,
		// basically defaulting to an empty salt string.
		_, err = fmt.Fprintln(hash, "")
		if err != nil {
			return "", xerrors.Errorf("cannot compute src image ref: %w", err)
		}

		return fmt.Sprintf("%s:%x", o.Config.BaseImageRepository, hash.Sum([]byte{})), nil

	default:
		return "", xerrors.Errorf("invalid base image")
	}
}

func (o *Orchestrator) getWorkspaceImageRef(ctx context.Context, baseref string) (ref string, err error) {
	cnt := []byte(fmt.Sprintf("%s\n%d\n", baseref, workspaceBuildProcessVersion))
	hash := sha256.New()
	n, err := hash.Write(cnt)
	if err != nil {
		return "", xerrors.Errorf("cannot produce workspace image name: %w", err)
	}
	if n < len(cnt) {
		return "", xerrors.Errorf("cannot produce workspace image name: %w", io.ErrShortWrite)
	}

	dst := hash.Sum([]byte{})
	return fmt.Sprintf("%s:%x", o.Config.WorkspaceImageRepository, dst), nil
}

func handleFailedBuildStreamResponse(err error, msg string) error {
	if err == nil {
		// OK is OK
		return nil
	}

	// If the error is a context.DeadlineExceeded, we return nil (OK) as requested.
	if errors.Is(err, context.DeadlineExceeded) {
		// Return nil (OK) for DeadlineExceeded
		return nil
	}

	// If it's already a gRPC status error, check for DeadlineExceeded
	if st, ok := status.FromError(err); ok {
		if st.Code() == codes.DeadlineExceeded {
			// Return nil (OK) for DeadlineExceeded as requested
			return nil
		}

		log.WithError(err).WithField("code", status.Code(err)).Error(fmt.Sprintf("unexpected error while sending build response: %s", msg))
		return err
	}

	log.WithError(err).Error(fmt.Sprintf("unexpected error while sending build response: %s", msg))
	return status.Errorf(codes.Unavailable, "%s: %v", msg, err)
}

// parentCantCancelContext is a bit of a hack. We have some operations which we want to keep alive even after clients
// disconnect. gRPC cancels the context once a client disconnects, thus we intercept the cancelation and act as if
// nothing had happened.
//
// This cannot be the best way to do this. Ideally we'd like to intercept client disconnect, but maintain the usual
// cancelation mechanism such as deadlines, timeouts, explicit cancelation.
type parentCantCancelContext struct {
	Delegate context.Context
	done     chan struct{}
}

func (*parentCantCancelContext) Deadline() (deadline time.Time, ok bool) {
	// return ok==false which means there's no deadline set
	return time.Time{}, false
}

func (c *parentCantCancelContext) Done() <-chan struct{} {
	return c.done
}

func (c *parentCantCancelContext) Err() error {
	err := c.Delegate.Err()
	if err == context.Canceled {
		return nil
	}

	return err
}

func (c *parentCantCancelContext) Value(key interface{}) interface{} {
	return c.Delegate.Value(key)
}
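
// Minimal sketch of the pattern described above, as used in Build: the incoming request context
// is wrapped so that client disconnects no longer cancel the work, while a fresh timeout still
// bounds it. The helper name is hypothetical and not used elsewhere in this package.
func exampleDetachFromClientCancel(parent context.Context) (context.Context, context.CancelFunc) {
	return context.WithTimeout(&parentCantCancelContext{Delegate: parent}, maxBuildRuntime)
}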

type buildListener chan *api.BuildResponse

type logListener chan *api.LogsResponse

func (o *Orchestrator) registerBuildListener(buildID string) (c <-chan *api.BuildResponse, cancel func()) {
	o.mu.Lock()
	defer o.mu.Unlock()

	l := make(buildListener)
	ls := o.buildListener[buildID]
	if ls == nil {
		ls = make(map[buildListener]struct{})
	}
	ls[l] = struct{}{}
	o.buildListener[buildID] = ls

	cancel = func() {
		o.mu.Lock()
		defer o.mu.Unlock()
		ls := o.buildListener[buildID]
		if ls == nil {
			return
		}
		delete(ls, l)
		o.buildListener[buildID] = ls
	}
	return l, cancel
}

func (o *Orchestrator) registerLogListener(buildID string) (c <-chan *api.LogsResponse, cancel func()) {
	o.mu.Lock()
	defer o.mu.Unlock()

	l := make(logListener)
	ls := o.logListener[buildID]
	if ls == nil {
		ls = make(map[logListener]struct{})
	}
	ls[l] = struct{}{}
	o.logListener[buildID] = ls
	log.WithField("buildID", buildID).WithField("listener", len(ls)).Debug("registered log listener")

	cancel = func() {
		o.mu.Lock()
		defer o.mu.Unlock()
		ls := o.logListener[buildID]
		if ls == nil {
			return
		}
		delete(ls, l)
		o.logListener[buildID] = ls

		log.WithField("buildID", buildID).WithField("listener", len(ls)).Debug("deregistered log listener")
	}
	return l, cancel
}

// clearListener removes all listeners for a particular build
func (o *Orchestrator) clearListener(buildID string) {
	o.mu.Lock()
	defer o.mu.Unlock()

	delete(o.buildListener, buildID)
	delete(o.logListener, buildID)
	delete(o.censorship, buildID)
}

// censor registers tokens that are censored in the log output
func (o *Orchestrator) censor(buildID string, words []string) {
	o.mu.Lock()
	defer o.mu.Unlock()

	o.censorship[buildID] = words
}

// PublishLog broadcasts log output to all registered listeners
func (o *Orchestrator) PublishLog(buildID string, message string) {
	o.mu.RLock()
	listener, ok := o.logListener[buildID]
	o.mu.RUnlock()

	// we don't have any log listener for this build
	if !ok {
		return
	}

	o.mu.RLock()
	wds := o.censorship[buildID]
	o.mu.RUnlock()
	for _, w := range wds {
		message = strings.ReplaceAll(message, w, "")
	}

	for l := range listener {
		select {
		case l <- &api.LogsResponse{
			Content: []byte(message),
		}:
			continue

		case <-time.After(5 * time.Second):
			log.WithField("buildID", buildID).Warn("timeout while forwarding log to listener - dropping listener")
			o.mu.Lock()
			ll := o.logListener[buildID]
			// In the meantime the listener list may have been removed/cleared by a call to clearListener.
			// We don't have to do any work in this case.
			if ll != nil {
				close(l)
				delete(ll, l)
			}
			o.mu.Unlock()
		}
	}
}
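
// Hypothetical illustration of the censorship flow: tokens registered via censor are stripped
// from any log line subsequently published for that build before listeners receive it.
func exampleCensoredPublish(o *Orchestrator, buildID string) {
	o.censor(buildID, []string{"super-secret-token"})
	// Listeners registered via registerLogListener would receive "pushing with  credentials\n".
	o.PublishLog(buildID, "pushing with super-secret-token credentials\n")
}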

func NewRetryTimeoutClient() *http.Client {
	retryClient := retryablehttp.NewClient()
	retryClient.Backoff = retryablehttp.LinearJitterBackoff
	retryClient.HTTPClient.Timeout = 15 * time.Second
	retryClient.RetryMax = 3
	retryClient.RetryWaitMin = 500 * time.Millisecond
	retryClient.RetryWaitMax = 2 * time.Second
	retryClient.CheckRetry = retryablehttp.DefaultRetryPolicy
	retryClient.Logger = log.WithField("retry", "true")

	// Use a custom transport so each retry attempt gets a fresh connection
	retryClient.HTTPClient.Transport = &http.Transport{
		DisableKeepAlives: true, // Disable keep-alives to ensure fresh connections
	}

	return retryClient.StandardClient()
}
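
// Sketch of how the retrying client might be exercised directly; in this package it is only
// handed to the ref resolver via resolve.WithHttpClient. The helper below is hypothetical.
func exampleRetryGet(ctx context.Context, url string) (*http.Response, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}
	// Each attempt is bounded by the 15s per-request timeout; retries use linear jittered backoff.
	return NewRetryTimeoutClient().Do(req)
}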