Path: blob/main/components/image-builder-mk3/pkg/orchestrator/monitor.go
2500 views
// Copyright (c) 2021 Gitpod GmbH. All rights reserved.1// Licensed under the GNU Affero General Public License (AGPL).2// See License.AGPL.txt in the project root for license information.34package orchestrator56import (7"context"8"encoding/json"9"errors"10"fmt"11"io"12"io/ioutil"13"net/http"14"strings"15"sync"16"time"1718wsmanapi "github.com/gitpod-io/gitpod/ws-manager/api"19"github.com/hashicorp/go-retryablehttp"20"golang.org/x/xerrors"2122"github.com/gitpod-io/gitpod/common-go/log"23"github.com/gitpod-io/gitpod/common-go/tracing"24"github.com/gitpod-io/gitpod/image-builder/api"25)2627const (28annotationRef = "ref"29annotationBaseRef = "baseref"30annotationManagedBy = "managed-by"31)3233type orchestrator interface {34PublishStatus(buildID string, resp *api.BuildResponse)35PublishLog(buildID string, message string)36}3738func newBuildMonitor(o orchestrator, wsman wsmanapi.WorkspaceManagerClient) *buildMonitor {39return &buildMonitor{40O: o,41wsman: wsman,42runningBuilds: make(map[string]*runningBuild),43logs: map[string]context.CancelFunc{},44}45}4647type buildMonitor struct {48O orchestrator4950wsman wsmanapi.WorkspaceManagerClient51runningBuilds map[string]*runningBuild52runningBuildsMu sync.RWMutex5354logs map[string]context.CancelFunc55}5657type runningBuild struct {58Info api.BuildInfo59Logs buildLogs60}6162type buildLogs struct {63IdeURL string64OwnerToken string65}6667// Run subscribes to the ws-manager, listens for build updates and distributes them internally68func (m *buildMonitor) Run() {69ctx := context.Background()70for {71wss, err := m.wsman.GetWorkspaces(ctx, &wsmanapi.GetWorkspacesRequest{72MustMatch: &wsmanapi.MetadataFilter{73Annotations: map[string]string{74annotationManagedBy: buildWorkspaceManagerID,75},76},77})78if err != nil {79log.WithError(err).Info("cannot get running builds from ws-manager - retrying")80time.Sleep(1 * time.Second)81continue82}8384m.runningBuildsMu.Lock()85m.runningBuilds = make(map[string]*runningBuild, len(wss.Status))86m.runningBuildsMu.Unlock()87for _, ws := range wss.Status {88m.handleStatusUpdate(ws)89}9091sub, err := m.wsman.Subscribe(ctx, &wsmanapi.SubscribeRequest{92MustMatch: &wsmanapi.MetadataFilter{93Annotations: map[string]string{94annotationManagedBy: buildWorkspaceManagerID,95},96},97})98if err != nil {99log.WithError(err).Info("connection to ws-manager lost - retrying")100time.Sleep(5 * time.Second)101continue102}103104for {105msg, err := sub.Recv()106if err != nil {107log.WithError(err).Info("connection to ws-manager lost - retrying")108time.Sleep(1 * time.Second)109break110}111112status := msg.GetStatus()113if status == nil {114continue115}116117m.handleStatusUpdate(status)118}119}120}121122func (m *buildMonitor) handleStatusUpdate(status *wsmanapi.WorkspaceStatus) {123var (124bld = extractRunningBuild(status)125resp = extractBuildResponse(status)126)127m.runningBuildsMu.Lock()128if resp.Status != api.BuildStatus_running {129delete(m.runningBuilds, status.Id)130} else {131m.runningBuilds[status.Id] = bld132}133m.runningBuildsMu.Unlock()134135m.O.PublishStatus(status.Id, resp)136137// handleStatusUpdate is called from a single go-routine, hence there's no need to synchronize138// access to m.logs139if bld.Info.Status == api.BuildStatus_running {140if _, ok := m.logs[status.Id]; !ok {141// we don't have a headless log listener yet, but need one142ctx, cancel := context.WithCancel(context.Background())143go listenToHeadlessLogs(ctx, bld.Logs.IdeURL, bld.Logs.OwnerToken, m.handleHeadlessLogs(status.Id))144m.logs[status.Id] = cancel145}146} else {147if cancel, ok := m.logs[status.Id]; ok {148// we have a headless log listener, and need to stop it149cancel()150delete(m.logs, status.Id)151}152}153}154155func (m *buildMonitor) handleHeadlessLogs(buildID string) listenToHeadlessLogsCallback {156return func(content []byte, err error) {157if err != nil && !errors.Is(err, context.Canceled) {158log.WithError(err).WithField("buildID", buildID).Warn("headless log listener failed")159m.O.PublishLog(buildID, "Build log listener failed. The image build is still running, but you won't see any log output.")160return161}162163if len(content) > 0 {164m.O.PublishLog(buildID, string(content))165}166}167}168169var errOutOfRetries = xerrors.Errorf("out of retries")170171// retry makes multiple attempts to execute op if op returns an UNAVAILABLE gRPC status code172func retry(ctx context.Context, op func(ctx context.Context) error, retry func(err error) bool, initialBackoff time.Duration, retries int) (err error) {173span, ctx := tracing.FromContext(ctx, "retryIfUnavailable")174defer tracing.FinishSpan(span, &err)175176for i := 0; i < retries; i++ {177err := op(ctx)178span.LogKV("attempt", i)179180if retry(err) {181time.Sleep(initialBackoff * time.Duration(1+i))182continue183}184if err != nil {185return err186}187return nil188}189190// we've maxed out our retry attempts191return errOutOfRetries192}193194func extractBuildStatus(status *wsmanapi.WorkspaceStatus) *api.BuildInfo {195s := api.BuildStatus_running196if status.Phase == wsmanapi.WorkspacePhase_STOPPING || status.Phase == wsmanapi.WorkspacePhase_STOPPED {197if status.Conditions.Failed == "" && status.Conditions.HeadlessTaskFailed == "" {198s = api.BuildStatus_done_success199} else {200s = api.BuildStatus_done_failure201}202}203204return &api.BuildInfo{205BuildId: status.Metadata.MetaId,206Ref: status.Metadata.Annotations[annotationRef],207BaseRef: status.Metadata.Annotations[annotationBaseRef],208Status: s,209StartedAt: status.Metadata.StartedAt.Seconds,210LogInfo: &api.LogInfo{211Url: status.Spec.Url,212Headers: map[string]string{213"x-gitpod-owner-token": status.Auth.OwnerToken,214},215},216}217}218219func extractRunningBuild(status *wsmanapi.WorkspaceStatus) *runningBuild {220return &runningBuild{221Info: *extractBuildStatus(status),222Logs: buildLogs{223IdeURL: status.Spec.Url,224OwnerToken: status.Auth.OwnerToken,225},226}227}228229func extractBuildResponse(status *wsmanapi.WorkspaceStatus) *api.BuildResponse {230var (231info = extractBuildStatus(status)232msg = status.Message233)234if status.Phase == wsmanapi.WorkspacePhase_STOPPING || status.Phase == wsmanapi.WorkspacePhase_STOPPED {235if status.Conditions.Failed != "" {236msg = status.Conditions.Failed237} else if status.Conditions.HeadlessTaskFailed != "" {238msg = status.Conditions.HeadlessTaskFailed239}240}241242return &api.BuildResponse{243Ref: info.Ref, // set for backwards compatibilty - new clients should consume Info244BaseRef: info.BaseRef, // set for backwards compatibilty - new clients should consume Info245Status: info.Status,246Message: msg,247Info: info,248}249}250251func (m *buildMonitor) GetAllRunningBuilds(ctx context.Context) (res []*runningBuild, err error) {252m.runningBuildsMu.RLock()253defer m.runningBuildsMu.RUnlock()254255res = make([]*runningBuild, 0, len(m.runningBuilds))256for _, ws := range m.runningBuilds {257res = append(res, ws)258}259260return261}262263func (m *buildMonitor) RegisterNewBuild(buildID string, ref, baseRef, url, ownerToken string) {264m.runningBuildsMu.Lock()265defer m.runningBuildsMu.Unlock()266267bld := &runningBuild{268Info: api.BuildInfo{269BuildId: buildID,270Ref: ref,271BaseRef: baseRef,272Status: api.BuildStatus_running,273StartedAt: time.Now().Unix(),274},275Logs: buildLogs{276IdeURL: url,277OwnerToken: ownerToken,278},279}280m.runningBuilds[buildID] = bld281log.WithField("build", bld).WithField("buildID", buildID).Debug("new build registered")282}283284type listenToHeadlessLogsCallback func(content []byte, err error)285286func listenToHeadlessLogs(ctx context.Context, url, authToken string, callback listenToHeadlessLogsCallback) {287var err error288defer func() {289if err != nil {290callback(nil, err)291}292}()293294var (295logURL string296noTerminalErr = fmt.Errorf("no terminal")297)298err = retry(ctx, func(ctx context.Context) error {299logURL, err = findTaskLogURL(ctx, url, authToken)300if err != nil {301return err302}303if strings.HasSuffix(logURL, "/listen/") {304return noTerminalErr305}306return nil307}, func(err error) bool {308return err == noTerminalErr309}, 1*time.Second, 10)310if err != nil {311return312}313log.WithField("logURL", logURL).Debug("found log URL")314callback([]byte("Connecting to log output ...\n"), nil)315316req, err := http.NewRequestWithContext(ctx, "GET", logURL, nil)317if err != nil {318return319}320req.Header.Set("x-gitpod-owner-token", authToken)321req.Header.Set("Cache", "no-cache")322323client := retryablehttp.NewClient()324client.HTTPClient = &http.Client{325Timeout: 2 * time.Second,326}327328resp, err := client.StandardClient().Do(req)329if err != nil {330return331}332log.WithField("logURL", logURL).Debug("terminal log response received")333callback([]byte("Connected to log output ...\n"), nil)334_ = resp.Body.Close()335336resp, err = http.DefaultClient.Do(req)337if err != nil {338return339}340defer resp.Body.Close()341342var line struct {343Result struct {344Data []byte `json:"data"`345} `json:"result"`346}347348dec := json.NewDecoder(resp.Body)349for err == nil {350err = dec.Decode(&line)351if errors.Is(err, io.EOF) {352// EOF is not an error in this case353err = nil354break355}356if err != nil {357break358}359360callback(line.Result.Data, nil)361}362}363364func findTaskLogURL(ctx context.Context, ideURL, authToken string) (taskLogURL string, err error) {365ideURL = strings.TrimSuffix(ideURL, "/")366tasksURL := ideURL + "/_supervisor/v1/status/tasks"367req, err := http.NewRequestWithContext(ctx, "GET", tasksURL, nil)368if err != nil {369return "", err370}371req.Header.Set("x-gitpod-owner-token", authToken)372req.Header.Set("Cache", "no-cache")373374client := retryablehttp.NewClient()375client.RetryMax = 10376client.Logger = nil377client.CheckRetry = func(ctx context.Context, resp *http.Response, err error) (bool, error) {378if errors.Is(err, io.EOF) {379// the network is not reliable380return true, nil381}382if err != nil && strings.Contains(err.Error(), "received non-200 status") {383// gRPC-web race in supervisor?384return true, nil385}386if resp != nil && resp.StatusCode == http.StatusNotFound {387// We're too quick to connect - ws-proxy doesn't keep up388return true, nil389}390return retryablehttp.DefaultRetryPolicy(ctx, resp, err)391}392393resp, err := client.StandardClient().Do(req)394if err != nil {395return "", xerrors.Errorf("cannot connect to supervisor: %w", err)396}397defer resp.Body.Close()398399if resp.StatusCode != http.StatusOK {400return "", xerrors.Errorf("received non-200 status from %s: %v", tasksURL, resp.StatusCode)401}402403msg, err := ioutil.ReadAll(resp.Body)404if err != nil {405return "", err406}407408var respb struct {409Result struct {410Tasks []struct {411Terminal string `json:"terminal"`412} `json:"tasks"`413} `json:"result"`414}415err = json.Unmarshal(msg, &respb)416if err != nil {417return "", xerrors.Errorf("cannot decode supervisor status response: %w", err)418}419420if len(respb.Result.Tasks) == 0 {421return "", xerrors.Errorf("build workspace has no tasks")422}423return fmt.Sprintf("%s/_supervisor/v1/terminal/listen/%s", ideURL, respb.Result.Tasks[0].Terminal), nil424}425426427