Path: blob/main/components/image-builder-mk3/pkg/orchestrator/orchestrator.go
2500 views
// Copyright (c) 2021 Gitpod GmbH. All rights reserved.1// Licensed under the GNU Affero General Public License (AGPL).2// See License.AGPL.txt in the project root for license information.34package orchestrator56import (7"context"8"crypto/sha256"9"encoding/json"10"errors"11"fmt"12"io"13"net/http"14"os"15"path/filepath"16"sort"17"strings"18"sync"19"time"2021"github.com/aws/aws-sdk-go-v2/service/ecr"22"github.com/distribution/reference"23"github.com/google/uuid"24"github.com/hashicorp/go-retryablehttp"25"github.com/opentracing/opentracing-go"26"github.com/sirupsen/logrus"27"golang.org/x/xerrors"28"google.golang.org/grpc"29"google.golang.org/grpc/codes"30"google.golang.org/grpc/credentials"31"google.golang.org/grpc/credentials/insecure"32"google.golang.org/grpc/status"3334awsconfig "github.com/aws/aws-sdk-go-v2/config"35common_grpc "github.com/gitpod-io/gitpod/common-go/grpc"36"github.com/gitpod-io/gitpod/common-go/log"37"github.com/gitpod-io/gitpod/common-go/tracing"38csapi "github.com/gitpod-io/gitpod/content-service/api"39"github.com/gitpod-io/gitpod/image-builder/api"40protocol "github.com/gitpod-io/gitpod/image-builder/api"41"github.com/gitpod-io/gitpod/image-builder/api/config"42"github.com/gitpod-io/gitpod/image-builder/pkg/auth"43"github.com/gitpod-io/gitpod/image-builder/pkg/resolve"44wsmanapi "github.com/gitpod-io/gitpod/ws-manager/api"45)4647const (48// buildWorkspaceManagerID identifies the manager for the workspace49buildWorkspaceManagerID = "image-builder"5051// maxBuildRuntime is the maximum time a build is allowed to take52maxBuildRuntime = 60 * time.Minute5354// workspaceBuildProcessVersion controls how we build workspace images.55// Incrementing this value will trigger a rebuild of all workspace images.56workspaceBuildProcessVersion = 257)5859// NewOrchestratingBuilder creates a new orchestrating image builder60func NewOrchestratingBuilder(cfg config.Configuration) (res *Orchestrator, err error) {61var authentication auth.CompositeAuth62if cfg.PullSecretFile != "" {63fn := cfg.PullSecretFile64if tproot := os.Getenv("TELEPRESENCE_ROOT"); tproot != "" {65fn = filepath.Join(tproot, fn)66}6768ath, err := auth.NewDockerConfigFileAuth(fn)69if err != nil {70return nil, err71}72authentication = append(authentication, ath)73}74if cfg.EnableAdditionalECRAuth {75awsCfg, err := awsconfig.LoadDefaultConfig(context.Background())76if err != nil {77return nil, err78}79ecrc := ecr.NewFromConfig(awsCfg)80authentication = append(authentication, auth.NewECRAuthenticator(ecrc))81}8283var wsman wsmanapi.WorkspaceManagerClient84if c, ok := cfg.WorkspaceManager.Client.(wsmanapi.WorkspaceManagerClient); ok {85wsman = c86} else {87grpcOpts := common_grpc.DefaultClientOptions()88if cfg.WorkspaceManager.TLS.Authority != "" || cfg.WorkspaceManager.TLS.Certificate != "" && cfg.WorkspaceManager.TLS.PrivateKey != "" {89tlsConfig, err := common_grpc.ClientAuthTLSConfig(90cfg.WorkspaceManager.TLS.Authority, cfg.WorkspaceManager.TLS.Certificate, cfg.WorkspaceManager.TLS.PrivateKey,91common_grpc.WithSetRootCAs(true),92common_grpc.WithServerName("ws-manager"),93)94if err != nil {95log.WithField("config", cfg.WorkspaceManager.TLS).Error("Cannot load ws-manager certs - this is a configuration issue.")96return nil, xerrors.Errorf("cannot load ws-manager certs: %w", err)97}9899grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))100} else {101grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))102}103conn, err := grpc.Dial(cfg.WorkspaceManager.Address, grpcOpts...)104if err != nil {105return nil, err106}107wsman = wsmanapi.NewWorkspaceManagerClient(conn)108}109110retryResolveClient := NewRetryTimeoutClient()111112o := &Orchestrator{113Config: cfg,114Auth: authentication,115AuthResolver: auth.Resolver{116BaseImageRepository: cfg.BaseImageRepository,117WorkspaceImageRepository: cfg.WorkspaceImageRepository,118},119RefResolver: &resolve.StandaloneRefResolver{},120121retryResolveClient: retryResolveClient,122123wsman: wsman,124buildListener: make(map[string]map[buildListener]struct{}),125logListener: make(map[string]map[logListener]struct{}),126censorship: make(map[string][]string),127metrics: newMetrics(),128}129o.monitor = newBuildMonitor(o, o.wsman)130131return o, nil132}133134// Orchestrator runs image builds by orchestrating headless build workspaces135type Orchestrator struct {136Config config.Configuration137Auth auth.RegistryAuthenticator138AuthResolver auth.Resolver139RefResolver resolve.DockerRefResolver140141retryResolveClient *http.Client142143wsman wsmanapi.WorkspaceManagerClient144145buildListener map[string]map[buildListener]struct{}146logListener map[string]map[logListener]struct{}147censorship map[string][]string148mu sync.RWMutex149150monitor *buildMonitor151152metrics *metrics153154protocol.UnimplementedImageBuilderServer155}156157// Start fires up the internals of this image builder158func (o *Orchestrator) Start(ctx context.Context) error {159go o.monitor.Run()160return nil161}162163// ResolveBaseImage returns the "digest" form of a Docker image tag thereby making it absolute.164func (o *Orchestrator) ResolveBaseImage(ctx context.Context, req *protocol.ResolveBaseImageRequest) (resp *protocol.ResolveBaseImageResponse, err error) {165span, ctx := opentracing.StartSpanFromContext(ctx, "ResolveBaseImage")166defer tracing.FinishSpan(span, &err)167tracing.LogRequestSafe(span, req)168169reqauth := o.AuthResolver.ResolveRequestAuth(ctx, req.Auth)170171refstr, err := o.getAbsoluteImageRef(ctx, req.Ref, reqauth, req.GetUseRetryClient())172if err != nil {173return nil, err174}175176return &protocol.ResolveBaseImageResponse{177Ref: refstr,178}, nil179}180181// ResolveWorkspaceImage returns information about a build configuration without actually attempting to build anything.182func (o *Orchestrator) ResolveWorkspaceImage(ctx context.Context, req *protocol.ResolveWorkspaceImageRequest) (resp *protocol.ResolveWorkspaceImageResponse, err error) {183span, ctx := opentracing.StartSpanFromContext(ctx, "ResolveWorkspaceImage")184defer tracing.FinishSpan(span, &err)185tracing.LogRequestSafe(span, req)186187reqauth := o.AuthResolver.ResolveRequestAuth(ctx, req.Auth)188useRetryClient := req.GetUseRetryClient()189baseref, err := o.getBaseImageRef(ctx, req.Source, reqauth, useRetryClient)190if _, ok := status.FromError(err); err != nil && ok {191return nil, err192}193if err != nil {194return nil, status.Errorf(codes.Internal, "cannot resolve base image: %s", err.Error())195}196refstr, err := o.getWorkspaceImageRef(ctx, baseref)197if err != nil {198return nil, status.Errorf(codes.InvalidArgument, "cannot produce image ref: %v", err)199}200span.LogKV("refstr", refstr, "baseref", baseref)201202// to check if the image exists we must have access to the image caching registry and the refstr we check here does not come203// from the user. Thus we can safely use auth.AllowedAuthForAll here.204auth, err := auth.AllowedAuthForAll().GetAuthFor(ctx, o.Auth, refstr)205if err != nil {206return nil, status.Errorf(codes.Internal, "cannot get workspace image authentication: %v", err)207}208exists, err := o.checkImageExists(ctx, refstr, auth, useRetryClient)209if err != nil {210return nil, status.Errorf(codes.Internal, "cannot resolve workspace image: %s", err.Error())211}212213var status protocol.BuildStatus214if exists {215status = protocol.BuildStatus_done_success216} else {217status = protocol.BuildStatus_unknown218}219220return &protocol.ResolveWorkspaceImageResponse{221Status: status,222Ref: refstr,223}, nil224}225226// Build initiates the build of a Docker image using a build configuration. If a build of this227// configuration is already ongoing no new build will be started.228func (o *Orchestrator) Build(req *protocol.BuildRequest, resp protocol.ImageBuilder_BuildServer) (err error) {229span, ctx := opentracing.StartSpanFromContext(resp.Context(), "Build")230defer tracing.FinishSpan(span, &err)231tracing.LogRequestSafe(span, req)232233if req.Source == nil {234return status.Errorf(codes.InvalidArgument, "build source is missing")235}236237// resolve build request authentication238reqauth := o.AuthResolver.ResolveRequestAuth(ctx, req.Auth)239useRetryClient := req.GetUseRetryClient()240log.WithField("forceRebuild", req.GetForceRebuild()).WithField("baseImageNameResolved", req.BaseImageNameResolved).WithField("useRetryClient", useRetryClient).Info("build request")241242// resolve to ref to baseImageNameResolved (if it exists)243if req.BaseImageNameResolved != "" && !req.GetForceRebuild() {244if req.Auth != nil && req.Auth.GetSelective() != nil {245// allow access to baseImage repository so we can look it up later246req.Auth.GetSelective().AllowBaserep = true247reqauth = o.AuthResolver.ResolveRequestAuth(ctx, req.Auth)248}249250wsrefstr, err := o.getWorkspaceImageRef(ctx, req.BaseImageNameResolved)251if err != nil {252return status.Errorf(codes.Internal, "cannot produce workspace image ref: %q", err)253}254wsrefAuth, err := reqauth.GetAuthFor(ctx, o.Auth, wsrefstr)255if err != nil {256return status.Errorf(codes.Internal, "cannot get workspace image authentication: %q", err)257}258259// check if needs build -> early return260exists, err := o.checkImageExists(ctx, wsrefstr, wsrefAuth, useRetryClient)261if err != nil {262return status.Errorf(codes.Internal, "cannot check if image is already built: %q", err)263}264if exists {265err = resp.Send(&protocol.BuildResponse{266Status: protocol.BuildStatus_done_success,267Ref: wsrefstr,268BaseRef: req.BaseImageNameResolved,269})270if err != nil {271return handleFailedBuildStreamResponse(err, "cannot send build response")272}273return nil274}275baseref, err := o.getAbsoluteImageRef(ctx, req.BaseImageNameResolved, reqauth, useRetryClient)276if err == nil {277req.Source.From = &protocol.BuildSource_Ref{278Ref: &protocol.BuildSourceReference{279Ref: baseref,280},281}282}283}284285log.Info("falling through to old way of building")286baseref, err := o.getBaseImageRef(ctx, req.Source, reqauth, useRetryClient)287if _, ok := status.FromError(err); err != nil && ok {288log.WithError(err).Error("gRPC status error")289return err290}291if err != nil {292log.WithError(err).Error("cannot get base image ref")293return status.Errorf(codes.Internal, "cannot resolve base image: %s", err.Error())294}295296wsrefstr, err := o.getWorkspaceImageRef(ctx, baseref)297if err != nil {298return status.Errorf(codes.Internal, "cannot produce workspace image ref: %q", err)299}300wsrefAuth, err := auth.AllowedAuthForAll().GetAuthFor(ctx, o.Auth, wsrefstr)301if err != nil {302return status.Errorf(codes.Internal, "cannot get workspace image authentication: %q", err)303}304305// check if needs build -> early return306exists, err := o.checkImageExists(ctx, wsrefstr, wsrefAuth, req.GetUseRetryClient())307if err != nil {308return status.Errorf(codes.Internal, "cannot check if image is already built: %q", err)309}310if exists && !req.GetForceRebuild() {311// image has already been built - no need for us to start building312err = resp.Send(&protocol.BuildResponse{313Status: protocol.BuildStatus_done_success,314Ref: wsrefstr,315BaseRef: baseref,316})317if err != nil {318return handleFailedBuildStreamResponse(err, "cannot send build response")319}320return nil321}322323o.metrics.BuildStarted()324325// Once a build is running we don't want it cancelled becuase the server disconnected i.e. during deployment.326// Instead we want to impose our own timeout/lifecycle on the build. Using context.WithTimeout does not shadow its parent's327// cancelation (see https://play.golang.org/p/N3QBIGlp8Iw for an example/experiment).328ctx, cancel := context.WithTimeout(&parentCantCancelContext{Delegate: ctx}, maxBuildRuntime)329defer cancel()330331randomUUID, err := uuid.NewRandom()332if err != nil {333return status.Errorf(codes.Internal, "failed to generate build ID: %v", err)334}335buildID := randomUUID.String()336log := log.WithField("buildID", buildID)337338var (339buildBase = "false"340contextPath = "."341dockerfilePath = "Dockerfile"342)343var initializer *csapi.WorkspaceInitializer = &csapi.WorkspaceInitializer{344Spec: &csapi.WorkspaceInitializer_Empty{345Empty: &csapi.EmptyInitializer{},346},347}348if fsrc := req.Source.GetFile(); fsrc != nil {349buildBase = "true"350initializer = fsrc.Source351contextPath = fsrc.ContextPath352dockerfilePath = fsrc.DockerfilePath353}354dockerfilePath = filepath.Join("/workspace", dockerfilePath)355356if contextPath == "" {357contextPath = filepath.Dir(dockerfilePath)358}359contextPath = filepath.Join("/workspace", strings.TrimPrefix(contextPath, "/workspace"))360361o.censor(buildID, []string{362wsrefstr,363baseref,364strings.Split(wsrefstr, ":")[0],365strings.Split(baseref, ":")[0],366})367368// push some log to the client before starting the job, just in case the build workspace takes a while to start up369o.PublishLog(buildID, "starting image build")370371retryIfUnavailable1 := func(err error) bool {372if st, ok := status.FromError(err); ok && st.Code() == codes.Unavailable {373return true374}375return false376}377378pbaseref, err := reference.ParseNormalizedNamed(baseref)379if err != nil {380return status.Errorf(codes.InvalidArgument, "cannot parse baseref: %v", err)381}382bobBaseref := "localhost:8080/base"383if r, ok := pbaseref.(reference.Digested); ok {384bobBaseref += "@" + r.Digest().String()385} else {386bobBaseref += ":latest"387}388wsref, err := reference.ParseNamed(wsrefstr)389var additionalAuth []byte390if err == nil {391ath := reqauth.GetImageBuildAuthFor(ctx, o.Auth, []string{reference.Domain(pbaseref), auth.DummyECRRegistryDomain}, []string{392reference.Domain(wsref),393})394additionalAuth, err = json.Marshal(ath)395if err != nil {396return status.Errorf(codes.InvalidArgument, "cannot marshal additional auth: %v", err)397}398}399400var swr *wsmanapi.StartWorkspaceResponse401err = retry(ctx, func(ctx context.Context) (err error) {402swr, err = o.wsman.StartWorkspace(ctx, &wsmanapi.StartWorkspaceRequest{403Id: buildID,404ServicePrefix: buildID,405Metadata: &wsmanapi.WorkspaceMetadata{406MetaId: buildID,407Annotations: map[string]string{408annotationRef: wsrefstr,409annotationBaseRef: baseref,410annotationManagedBy: buildWorkspaceManagerID,411},412Owner: req.GetTriggeredBy(),413},414Spec: &wsmanapi.StartWorkspaceSpec{415Initializer: initializer,416Timeout: maxBuildRuntime.String(),417WorkspaceImage: o.Config.BuilderImage,418IdeImage: &wsmanapi.IDEImage{419WebRef: o.Config.BuilderImage,420SupervisorRef: req.SupervisorRef,421},422WorkspaceLocation: contextPath,423Envvars: []*wsmanapi.EnvironmentVariable{424{Name: "BOB_TARGET_REF", Value: "localhost:8080/target:latest"},425{Name: "BOB_BASE_REF", Value: bobBaseref},426{Name: "BOB_BUILD_BASE", Value: buildBase},427{Name: "BOB_DOCKERFILE_PATH", Value: dockerfilePath},428{Name: "BOB_CONTEXT_DIR", Value: contextPath},429{Name: "GITPOD_TASKS", Value: `[{"name": "build", "init": "sudo -E /app/bob build"}]`},430{Name: "WORKSPACEKIT_RING2_ENCLAVE", Value: "/app/bob proxy"},431{Name: "WORKSPACEKIT_BOBPROXY_BASEREF", Value: baseref},432{Name: "WORKSPACEKIT_BOBPROXY_TARGETREF", Value: wsrefstr},433{434Name: "WORKSPACEKIT_BOBPROXY_AUTH",435Secret: &wsmanapi.EnvironmentVariable_SecretKeyRef{436SecretName: o.Config.PullSecret,437Key: ".dockerconfigjson",438},439},440{441Name: "WORKSPACEKIT_BOBPROXY_ADDITIONALAUTH",442Value: string(additionalAuth),443},444{Name: "SUPERVISOR_DEBUG_ENABLE", Value: fmt.Sprintf("%v", log.Logger.IsLevelEnabled(logrus.DebugLevel))},445},446},447Type: wsmanapi.WorkspaceType_IMAGEBUILD,448})449return450}, retryIfUnavailable1, 1*time.Second, 10)451if status.Code(err) == codes.AlreadyExists {452// build is already running - do not add it to the list of builds453} else if errors.Is(err, errOutOfRetries) {454return status.Error(codes.Unavailable, "workspace services are currently unavailable")455} else if err != nil {456return status.Errorf(codes.Internal, "cannot start build: %q", err)457} else {458o.monitor.RegisterNewBuild(buildID, wsrefstr, baseref, swr.Url, swr.OwnerToken)459o.PublishLog(buildID, "starting image build ...\n")460}461462updates, cancel := o.registerBuildListener(buildID)463defer cancel()464for {465update := <-updates466if update == nil {467// channel was closed unexpectatly468return status.Error(codes.Aborted, "subscription canceled - please try again")469}470471// The failed condition of ws-manager is not stable, hence we might wrongly report that the472// build was successful when in fact it wasn't. This would break workspace startup with a strange473// "cannot pull from reg.gitpod.io" error message. Instead the image-build should fail properly.474// To do this, we resolve the built image afterwards to ensure it was actually built.475if update.Status == protocol.BuildStatus_done_success {476exists, err := o.checkImageExists(ctx, wsrefstr, wsrefAuth, useRetryClient)477if err != nil {478update.Status = protocol.BuildStatus_done_failure479update.Message = fmt.Sprintf("cannot check if workspace image exists after the build: %v", err)480} else if !exists {481update.Status = protocol.BuildStatus_done_failure482update.Message = "image build did not produce a workspace image"483}484}485486err := resp.Send(update)487if err != nil {488log.WithError(err).Info("cannot forward build update - dropping listener")489return handleFailedBuildStreamResponse(err, "cannot send update")490}491492if update.Status == protocol.BuildStatus_done_failure || update.Status == protocol.BuildStatus_done_success {493// build is done494o.clearListener(buildID)495o.metrics.BuildDone(update.Status == protocol.BuildStatus_done_success)496if update.Status != protocol.BuildStatus_done_success {497log.WithField("UserID", req.GetTriggeredBy()).Error("image build done failed for user")498}499break500}501}502503return nil504}505506// publishStatus broadcasts a build status update to all listeners507func (o *Orchestrator) PublishStatus(buildID string, resp *api.BuildResponse) {508o.mu.RLock()509listener, ok := o.buildListener[buildID]510o.mu.RUnlock()511512// we don't have any log listener for this build513if !ok {514return515}516517log.WithField("buildID", buildID).WithField("resp", resp).Debug("publishing status")518519for l := range listener {520select {521case l <- resp:522continue523524case <-time.After(5 * time.Second):525log.Warn("timeout while forwarding status to listener - dropping listener")526o.mu.Lock()527ll := o.buildListener[buildID]528// In the meantime the listener list may have been removed/cleared by a call to clearListener.529// We don't have to do any work in this case.530if ll != nil {531close(l)532delete(ll, l)533}534o.mu.Unlock()535}536}537}538539// Logs listens to the build output of an ongoing Docker build identified build the build ID540func (o *Orchestrator) Logs(req *protocol.LogsRequest, resp protocol.ImageBuilder_LogsServer) (err error) {541span, ctx := opentracing.StartSpanFromContext(resp.Context(), "Logs")542defer tracing.FinishSpan(span, &err)543tracing.LogRequestSafe(span, req)544545rb, err := o.monitor.GetAllRunningBuilds(ctx)546var buildID string547for _, bld := range rb {548if bld.Info.Ref == req.BuildRef {549buildID = bld.Info.BuildId550break551}552}553if buildID == "" {554return status.Error(codes.NotFound, "build not found")555}556557logs, cancel := o.registerLogListener(buildID)558defer cancel()559for {560update := <-logs561if update == nil {562break563}564565err := resp.Send(update)566if err != nil {567log.WithError(err).Info("cannot forward log output - dropping listener")568return handleFailedBuildStreamResponse(err, "cannot send log output")569}570}571572return573}574575// ListBuilds returns a list of currently running builds576func (o *Orchestrator) ListBuilds(ctx context.Context, req *protocol.ListBuildsRequest) (resp *protocol.ListBuildsResponse, err error) {577span, ctx := opentracing.StartSpanFromContext(ctx, "ListBuilds")578defer tracing.FinishSpan(span, &err)579580builds, err := o.monitor.GetAllRunningBuilds(ctx)581if err != nil {582return583}584585res := make([]*protocol.BuildInfo, 0, len(builds))586for _, ws := range builds {587res = append(res, &ws.Info)588}589590return &protocol.ListBuildsResponse{Builds: res}, nil591}592593func (o *Orchestrator) checkImageExists(ctx context.Context, ref string, authentication *auth.Authentication, useRetryClient bool) (exists bool, err error) {594span, ctx := opentracing.StartSpanFromContext(ctx, "checkImageExists")595defer tracing.FinishSpan(span, &err)596span.SetTag("ref", ref)597598_, err = o.RefResolver.Resolve(ctx, ref, resolve.WithAuthentication(authentication), o.withRetryIfEnabled(useRetryClient))599if errors.Is(err, resolve.ErrNotFound) {600return false, nil601}602if errors.Is(err, resolve.ErrUnauthorized) {603return false, status.Errorf(codes.Unauthenticated, "cannot check if image exists: %q", err)604}605if err != nil {606return false, err607}608609return true, nil610}611612// getAbsoluteImageRef returns the "digest" form of an image, i.e. contains no mutable image tags613func (o *Orchestrator) getAbsoluteImageRef(ctx context.Context, ref string, allowedAuth auth.AllowedAuthFor, useRetryClient bool) (res string, err error) {614span, ctx := opentracing.StartSpanFromContext(ctx, "getAbsoluteImageRefWithResolver")615defer tracing.FinishSpan(span, &err)616span.LogKV("ref", ref)617span.LogKV("useRetryClient", useRetryClient)618619log.WithField("ref", ref).WithField("useRetryClient", useRetryClient).Debug("getAbsoluteImageRefWithResolver")620auth, err := allowedAuth.GetAuthFor(ctx, o.Auth, ref)621if err != nil {622return "", status.Errorf(codes.InvalidArgument, "cannt resolve base image ref: %v", err)623}624625ref, err = o.RefResolver.Resolve(ctx, ref, resolve.WithAuthentication(auth), o.withRetryIfEnabled(useRetryClient))626if errors.Is(err, resolve.ErrNotFound) {627return "", status.Error(codes.NotFound, "cannot resolve image")628}629if errors.Is(err, resolve.ErrUnauthorized) {630if auth == nil {631log.WithField("ref", ref).Warn("auth was nil")632} else if auth.Auth == "" && auth.Password == "" {633log.WithField("ref", ref).Warn("auth was empty")634}635return "", status.Error(codes.Unauthenticated, "cannot resolve image")636}637if resolve.TooManyRequestsMatcher(err) {638return "", status.Errorf(codes.Unavailable, "upstream registry responds with 'too many request': %v", err)639}640if err != nil {641return "", status.Errorf(codes.Internal, "cannot resolve image: %v", err)642}643return ref, nil644}645646func (o *Orchestrator) withRetryIfEnabled(useRetryClient bool) resolve.DockerRefResolverOption {647if useRetryClient {648return resolve.WithHttpClient(o.retryResolveClient)649}650return resolve.WithHttpClient(nil)651}652653func (o *Orchestrator) getBaseImageRef(ctx context.Context, bs *protocol.BuildSource, allowedAuth auth.AllowedAuthFor, useRetryClient bool) (res string, err error) {654span, ctx := opentracing.StartSpanFromContext(ctx, "getBaseImageRef")655defer tracing.FinishSpan(span, &err)656657switch src := bs.From.(type) {658case *protocol.BuildSource_Ref:659return o.getAbsoluteImageRef(ctx, src.Ref.Ref, allowedAuth, useRetryClient)660661case *protocol.BuildSource_File:662manifest := map[string]string{663"DockerfilePath": src.File.DockerfilePath,664"DockerfileVersion": src.File.DockerfileVersion,665"ContextPath": src.File.ContextPath,666}667// workspace starter will only ever send us Git sources. Should that ever change, we'll need to add668// manifest support for the other initializer types.669if src.File.Source.GetGit() != nil {670fsrc := src.File.Source.GetGit()671manifest["Source"] = "git"672manifest["CloneTarget"] = fsrc.CloneTaget673manifest["RemoteURI"] = fsrc.RemoteUri674} else {675return "", xerrors.Errorf("unsupported context initializer")676}677// Go maps do NOT maintain their order - we must sort the keys to maintain a stable order678var keys []string679for k := range manifest {680keys = append(keys, k)681}682sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })683var dfl string684for _, k := range keys {685dfl += fmt.Sprintf("%s: %s\n", k, manifest[k])686}687span.LogKV("manifest", dfl)688689hash := sha256.New()690n, err := hash.Write([]byte(dfl))691if err != nil {692return "", xerrors.Errorf("cannot compute src image ref: %w", err)693}694if n < len(dfl) {695return "", xerrors.Errorf("cannot compute src image ref: short write")696}697698// the mkII image builder supported an image hash salt. That salt broke other assumptions,699// which is why this mkIII implementation does not support it anymore. We need to stay compatible700// with the previous means of computing the hash though. This is why we add an extra breakline here,701// basically defaulting to an empty salt string.702_, err = fmt.Fprintln(hash, "")703if err != nil {704return "", xerrors.Errorf("cannot compute src image ref: %w", err)705}706707return fmt.Sprintf("%s:%x", o.Config.BaseImageRepository, hash.Sum([]byte{})), nil708709default:710return "", xerrors.Errorf("invalid base image")711}712}713714func (o *Orchestrator) getWorkspaceImageRef(ctx context.Context, baseref string) (ref string, err error) {715cnt := []byte(fmt.Sprintf("%s\n%d\n", baseref, workspaceBuildProcessVersion))716hash := sha256.New()717n, err := hash.Write(cnt)718if err != nil {719return "", xerrors.Errorf("cannot produce workspace image name: %w", err)720}721if n < len(cnt) {722return "", xerrors.Errorf("cannot produce workspace image name: %w", io.ErrShortWrite)723}724725dst := hash.Sum([]byte{})726return fmt.Sprintf("%s:%x", o.Config.WorkspaceImageRepository, dst), nil727}728729func handleFailedBuildStreamResponse(err error, msg string) error {730if err == nil {731// OK is OK732return nil733}734735// If the error is a context.DeadlineExceeded, we return nil (OK) as requested.736if errors.Is(err, context.DeadlineExceeded) {737// Return nil (OK) for DeadlineExceeded738return nil739}740741// If it's already a gRPC status error, check for DeadlineExceeded742if st, ok := status.FromError(err); ok {743if st.Code() == codes.DeadlineExceeded {744// Return nil (OK) for DeadlineExceeded as requested745return nil746}747748log.WithError(err).WithField("code", status.Code(err)).Error(fmt.Sprintf("unexpected error while sending build response: %s", msg))749return err750}751752log.WithError(err).Error(fmt.Sprintf("unexpected error while sending build response: %s", msg))753return status.Errorf(codes.Unavailable, "%s: %v", msg, err)754}755756// parentCantCancelContext is a bit of a hack. We have some operations which we want to keep alive even after clients757// disconnect. gRPC cancels the context once a client disconnects, thus we intercept the cancelation and act as if758// nothing had happened.759//760// This cannot be the best way to do this. Ideally we'd like to intercept client disconnect, but maintain the usual761// cancelation mechanism such as deadlines, timeouts, explicit cancelation.762type parentCantCancelContext struct {763Delegate context.Context764done chan struct{}765}766767func (*parentCantCancelContext) Deadline() (deadline time.Time, ok bool) {768// return ok==false which means there's no deadline set769return time.Time{}, false770}771772func (c *parentCantCancelContext) Done() <-chan struct{} {773return c.done774}775776func (c *parentCantCancelContext) Err() error {777err := c.Delegate.Err()778if err == context.Canceled {779return nil780}781782return err783}784785func (c *parentCantCancelContext) Value(key interface{}) interface{} {786return c.Delegate.Value(key)787}788789type buildListener chan *api.BuildResponse790791type logListener chan *api.LogsResponse792793func (o *Orchestrator) registerBuildListener(buildID string) (c <-chan *api.BuildResponse, cancel func()) {794o.mu.Lock()795defer o.mu.Unlock()796797l := make(buildListener)798ls := o.buildListener[buildID]799if ls == nil {800ls = make(map[buildListener]struct{})801}802ls[l] = struct{}{}803o.buildListener[buildID] = ls804805cancel = func() {806o.mu.Lock()807defer o.mu.Unlock()808ls := o.buildListener[buildID]809if ls == nil {810return811}812delete(ls, l)813o.buildListener[buildID] = ls814}815return l, cancel816}817818func (o *Orchestrator) registerLogListener(buildID string) (c <-chan *api.LogsResponse, cancel func()) {819o.mu.Lock()820defer o.mu.Unlock()821822l := make(logListener)823ls := o.logListener[buildID]824if ls == nil {825ls = make(map[logListener]struct{})826}827ls[l] = struct{}{}828o.logListener[buildID] = ls829log.WithField("buildID", buildID).WithField("listener", len(ls)).Debug("registered log listener")830831cancel = func() {832o.mu.Lock()833defer o.mu.Unlock()834ls := o.logListener[buildID]835if ls == nil {836return837}838delete(ls, l)839o.logListener[buildID] = ls840841log.WithField("buildID", buildID).WithField("listener", len(ls)).Debug("deregistered log listener")842}843return l, cancel844}845846// clearListener removes all listener for a particular build847func (o *Orchestrator) clearListener(buildID string) {848o.mu.Lock()849defer o.mu.Unlock()850851delete(o.buildListener, buildID)852delete(o.logListener, buildID)853delete(o.censorship, buildID)854}855856// censor registers tokens that are censored in the log output857func (o *Orchestrator) censor(buildID string, words []string) {858o.mu.Lock()859defer o.mu.Unlock()860861o.censorship[buildID] = words862}863864// PublishLog broadcasts log output to all registered listener865func (o *Orchestrator) PublishLog(buildID string, message string) {866o.mu.RLock()867listener, ok := o.logListener[buildID]868o.mu.RUnlock()869870// we don't have any log listener for this build871if !ok {872return873}874875o.mu.RLock()876wds := o.censorship[buildID]877o.mu.RUnlock()878for _, w := range wds {879message = strings.ReplaceAll(message, w, "")880}881882for l := range listener {883select {884case l <- &api.LogsResponse{885Content: []byte(message),886}:887continue888889case <-time.After(5 * time.Second):890log.WithField("buildID", buildID).Warn("timeout while forwarding log to listener - dropping listener")891o.mu.Lock()892ll := o.logListener[buildID]893// In the meantime the listener list may have been removed/cleared by a call to clearListener.894// We don't have to do any work in this case.895if ll != nil {896close(l)897delete(ll, l)898}899o.mu.Unlock()900}901}902}903904func NewRetryTimeoutClient() *http.Client {905retryClient := retryablehttp.NewClient()906retryClient.Backoff = retryablehttp.LinearJitterBackoff907retryClient.HTTPClient.Timeout = 15 * time.Second908retryClient.RetryMax = 3909retryClient.RetryWaitMin = 500 * time.Millisecond910retryClient.RetryWaitMax = 2 * time.Microsecond911retryClient.CheckRetry = retryablehttp.DefaultRetryPolicy912retryClient.Logger = log.WithField("retry", "true")913914// Use a custom transport to handle retries and timeouts915retryClient.HTTPClient.Transport = &http.Transport{916DisableKeepAlives: true, // Disable keep-alives to ensure fresh connections917}918919return retryClient.StandardClient()920}921922923