Path: blob/main/components/ws-manager-bridge/src/bridge.ts
/**
 * Copyright (c) 2020 Gitpod GmbH. All rights reserved.
 * Licensed under the GNU Affero General Public License (AGPL).
 * See License.AGPL.txt in the project root for license information.
 */

import { inject, injectable } from "inversify";
import {
    Disposable,
    Queue,
    WorkspaceInstancePort,
    PortVisibility,
    DisposableCollection,
    PortProtocol,
} from "@gitpod/gitpod-protocol";
import * as protocol from "@gitpod/gitpod-protocol";
import {
    WorkspaceStatus,
    WorkspacePhase,
    WorkspaceConditionBool,
    PortVisibility as WsManPortVisibility,
    PortProtocol as WsManPortProtocol,
    DescribeClusterRequest,
    WorkspaceType,
    InitializerMetrics,
    InitializerMetric,
} from "@gitpod/ws-manager/lib";
import { scrubber, TrustedValue } from "@gitpod/gitpod-protocol/lib/util/scrubbing";
import { WorkspaceDB } from "@gitpod/gitpod-db/lib/workspace-db";
import { log, LogContext } from "@gitpod/gitpod-protocol/lib/util/logging";
import { TraceContext } from "@gitpod/gitpod-protocol/lib/util/tracing";
import { IAnalyticsWriter } from "@gitpod/gitpod-protocol/lib/analytics";
import { TracedWorkspaceDB, DBWithTracing } from "@gitpod/gitpod-db/lib/traced-db";
import { Metrics } from "./metrics";
import { ClientProvider, WsmanSubscriber } from "./wsman-subscriber";
import { Timestamp } from "google-protobuf/google/protobuf/timestamp_pb";
import { Configuration } from "./config";
import { WorkspaceClass, WorkspaceCluster, WorkspaceClusterDB } from "@gitpod/gitpod-protocol/lib/workspace-cluster";
import { performance } from "perf_hooks";
import { WorkspaceInstanceController } from "./workspace-instance-controller";
import { PrebuildUpdater } from "./prebuild-updater";
import { RedisPublisher } from "@gitpod/gitpod-db/lib";
import { merge } from "ts-deepmerge";

export const WorkspaceManagerBridgeFactory = Symbol("WorkspaceManagerBridgeFactory");

function toBool(b: WorkspaceConditionBool | undefined): boolean | undefined {
    if (b === WorkspaceConditionBool.EMPTY) {
        return;
    }

    return b === WorkspaceConditionBool.TRUE;
}

export type WorkspaceClusterInfo = Pick<WorkspaceCluster, "name" | "url">;

type UpdateQueue = {
    instanceId: string;
    lastStatus?: WorkspaceStatus;
    queue: Queue;
};
namespace UpdateQueue {
    export function create(instanceId: string): UpdateQueue {
        return { instanceId, queue: new Queue() };
    }
}

@injectable()
export class WorkspaceManagerBridge implements Disposable {
    constructor(
        @inject(WorkspaceClusterDB) private readonly clusterDB: WorkspaceClusterDB,
        @inject(TracedWorkspaceDB) private readonly workspaceDB: DBWithTracing<WorkspaceDB>,
        @inject(Metrics) private readonly metrics: Metrics,
        @inject(Configuration) private readonly config: Configuration,
        @inject(IAnalyticsWriter) private readonly analytics: IAnalyticsWriter,
        @inject(PrebuildUpdater) private readonly prebuildUpdater: PrebuildUpdater,
        @inject(WorkspaceInstanceController) private readonly workspaceInstanceController: WorkspaceInstanceController, // bound in "transient" mode: we expect to receive a fresh instance here
        @inject(RedisPublisher) private readonly publisher: RedisPublisher,
    ) {}

    protected readonly disposables = new DisposableCollection();
    protected readonly queues = new Map<string, UpdateQueue>();

    protected cluster: WorkspaceClusterInfo;

    public start(cluster: WorkspaceClusterInfo, clientProvider: ClientProvider) {
        const logPayload = { name: cluster.name, url: cluster.url };
        log.info(`Starting bridge to cluster...`, logPayload);
        this.cluster = cluster;

        const startStatusUpdateHandler = () => {
            log.debug(`Starting status update handler: ${cluster.name}`, logPayload);
            /* no await */ this.startStatusUpdateHandler(clientProvider, logPayload)
                // this is a mere safe-guard: we do not expect the code inside to fail
                .catch((err) => log.error("Cannot start status update handler", err));
        };

        // notify servers and _update the DB_
        startStatusUpdateHandler();

        // the actual "governing" part
        const controllerIntervalSeconds = this.config.controllerIntervalSeconds;
        if (controllerIntervalSeconds <= 0) {
            throw new Error("controllerIntervalSeconds <= 0!");
        }

        log.debug(`Starting controller: ${cluster.name}`, logPayload);
        // Control all workspace instances, either against ws-manager or configured timeouts
        this.workspaceInstanceController.start(
            cluster.name,
            clientProvider,
            controllerIntervalSeconds,
            this.config.controllerMaxDisconnectSeconds,
        );
        this.disposables.push(this.workspaceInstanceController);

        const tim = setInterval(() => {
            this.updateWorkspaceClasses(cluster, clientProvider);
        }, controllerIntervalSeconds * 1000);
        this.disposables.push({ dispose: () => clearInterval(tim) });

        log.info(`Started bridge to cluster.`, logPayload);
    }

    public stop() {
        this.dispose();
    }

    protected async updateWorkspaceClasses(clusterInfo: WorkspaceClusterInfo, clientProvider: ClientProvider) {
        try {
            const client = await clientProvider();
            const resp = await client.describeCluster({}, new DescribeClusterRequest());

            const cluster = await this.clusterDB.findByName(clusterInfo.name);
            if (!cluster) {
                return;
            }
            cluster.availableWorkspaceClasses = resp.getWorkspaceClassesList().map((c) => {
                return <WorkspaceClass>{
                    creditsPerMinute: c.getCreditsPerMinute(),
                    description: c.getDescription(),
                    displayName: c.getDisplayName(),
                    id: c.getId(),
                };
            });
            cluster.preferredWorkspaceClass = resp.getPreferredWorkspaceClass();

            await this.clusterDB.save(cluster);
        } catch (e) {
            log.error({}, "Failed to update workspace classes", e, { clusterInfo });
        }
    }

    protected async startStatusUpdateHandler(clientProvider: ClientProvider, logPayload: {}): Promise<void> {
        const subscriber = new WsmanSubscriber(clientProvider);
        this.disposables.push(subscriber);

        const onReconnect = (ctx: TraceContext, s: WorkspaceStatus[]) => {
            log.info("ws-manager subscriber reconnected", logPayload);
            s.forEach((sx) => this.queueMessagesByInstanceId(ctx, sx));
        };
        const onStatusUpdate = (ctx: TraceContext, s: WorkspaceStatus) => {
            this.queueMessagesByInstanceId(ctx, s);
        };
        await subscriber.subscribe({ onReconnect, onStatusUpdate }, logPayload);
    }

    protected queueMessagesByInstanceId(ctx: TraceContext, status: WorkspaceStatus) {
        const instanceId = status.getId();
        if (!instanceId) {
            log.warn("Received invalid message, could not read instanceId!", { msg: status });
            return;
        }

        // We can't just handle the status update directly, but have to "serialize" it to ensure the updates stay in order.
        // If we did not do this, the async nature of our code would allow for one message to overtake the other.
        const updateQueue = this.queues.get(instanceId) || UpdateQueue.create(instanceId);
        this.queues.set(instanceId, updateQueue);

        updateQueue.queue.enqueue(async () => {
            try {
                await this.handleStatusUpdate(ctx, status, updateQueue.lastStatus);
                updateQueue.lastStatus = status;
            } catch (err) {
                log.error({ instanceId }, err);
                // if an error occurs, we want to be safe and make sure we don't accidentally skip the next update
                updateQueue.lastStatus = undefined;
            }
        });
    }

    protected async handleStatusUpdate(
        ctx: TraceContext,
        rawStatus: WorkspaceStatus,
        lastStatusUpdate: WorkspaceStatus | undefined,
    ) {
        const start = performance.now();
        const status = rawStatus.toObject();
        log.info("Handling WorkspaceStatus update", { status: new TrustedValue(filterStatus(status)) });

        if (!status.spec || !status.metadata || !status.conditions) {
            log.warn("Received invalid status update", status);
            return;
        }

        const logCtx = {
            instanceId: status.id!,
            workspaceId: status.metadata!.metaId!,
            userId: status.metadata!.owner!,
        };
        const workspaceType = toWorkspaceType(status.spec.type);

        let updateError: any | undefined;
        let skipUpdate = false;
        try {
            this.metrics.reportWorkspaceInstanceUpdateStarted(this.cluster.name, workspaceType);

            // If the last status update is identical to the current one, we can skip the update.
            skipUpdate = !!lastStatusUpdate && !hasRelevantDiff(rawStatus, lastStatusUpdate);
            if (skipUpdate) {
                log.info(logCtx, "Skipped WorkspaceInstance status update");
                return;
            }

            await this.statusUpdate(ctx, rawStatus);

            log.info(logCtx, "Successfully completed WorkspaceInstance status update");
        } catch (e) {
            updateError = e;

            log.error(logCtx, "Failed to complete WorkspaceInstance status update", e);
            throw e;
        } finally {
            const durationMs = performance.now() - start;
            this.metrics.reportWorkspaceInstanceUpdateCompleted(
                durationMs / 1000,
                this.cluster.name,
                workspaceType,
                skipUpdate,
                updateError,
            );
        }
    }

    private async statusUpdate(ctx: TraceContext, rawStatus: WorkspaceStatus) {
        const status = rawStatus.toObject();

        if (!status.spec || !status.metadata || !status.conditions) {
            return;
        }

        const span = TraceContext.startSpan("handleStatusUpdate", ctx);
        span.setTag("status", JSON.stringify(filterStatus(status)));
        span.setTag("statusVersion", status.statusVersion);
        try {
            // Beware of the ID mapping here: What's a workspace to the ws-manager is a workspace instance to the rest of the system.
            // The workspace ID of ws-manager is the workspace instance ID in the database.
            // The meta ID of ws-manager is the workspace ID in the database.
            const instanceId = status.id!;
            const workspaceId = status.metadata!.metaId!;
            const userId = status.metadata!.owner!;
            const logContext: LogContext = {
                userId,
                instanceId,
                workspaceId,
            };

            const instance = await this.workspaceDB.trace({ span }).findInstanceById(instanceId);
            if (instance) {
                this.metrics.statusUpdateReceived(this.cluster.name, true);
            } else {
                // This scenario happens when the update for a WorkspaceInstance is picked up by a ws-manager-bridge in a different region,
                // before the periodic deleter has finished running. This is because all ws-manager-bridge instances receive updates from all WorkspaceClusters.
                // We ignore this update because we do not have anything to reconcile it against, and we assume it is handled
                // by another instance of ws-manager-bridge in the region where the WorkspaceInstance record was created.
                this.metrics.statusUpdateReceived(this.cluster.name, false);
                return;
            }

            const currentStatusVersion = instance.status.version || 0;
            if (currentStatusVersion > 0 && currentStatusVersion >= status.statusVersion) {
                // We've gotten an event which is older than one we've already processed. We shouldn't process the stale one.
                span.setTag("statusUpdate.staleEvent", true);
                this.metrics.recordStaleStatusUpdate();
                log.debug(ctx, "Stale status update received, skipping.");
            }

            if (!!status.spec.exposedPortsList) {
                instance.status.exposedPorts = status.spec.exposedPortsList.map((p) => {
                    return <WorkspaceInstancePort>{
                        port: p.port,
                        visibility: mapPortVisibility(p.visibility),
                        protocol: mapPortProtocol(p.protocol),
                        url: p.url,
                    };
                });
            }

            if (!instance.status.conditions.firstUserActivity && status.conditions.firstUserActivity) {
                // Only report this when it's observed for the first time
                const firstUserActivity = mapFirstUserActivity(rawStatus.getConditions()!.getFirstUserActivity())!;
                this.metrics.observeFirstUserActivity(instance, firstUserActivity);
            }

            instance.ideUrl = status.spec.url!;
            instance.status.version = status.statusVersion;
            instance.status.timeout = status.spec.timeout;
            if (!!instance.status.conditions.failed && !status.conditions.failed) {
                // We already have a "failed" condition, and received an empty one: This is a bug, "failed" conditions are terminal per definition.
                // Do not override!
                log.error(logContext, 'We received an empty "failed" condition overriding an existing one!', {
                    current: instance.status.conditions.failed,
                });

                // TODO(gpl) To make sure we do not break anything big time, we keep the unconditional override for now and observe for some time.
                instance.status.conditions.failed = status.conditions.failed;
            } else {
                instance.status.conditions.failed = status.conditions.failed;
            }
            instance.status.conditions.pullingImages = toBool(status.conditions.pullingImages!);
            instance.status.conditions.deployed = toBool(status.conditions.deployed);
            if (!instance.deployedTime && instance.status.conditions.deployed) {
                // This is the first time we see the workspace pod being deployed.
                // Like all other timestamps, it's set when the bridge observes it, not when it actually happened (which only ws-manager could decide).
                instance.deployedTime = new Date().toISOString();
            }
            instance.status.conditions.timeout = status.conditions.timeout;
            instance.status.conditions.firstUserActivity = mapFirstUserActivity(
                rawStatus.getConditions()!.getFirstUserActivity(),
            );
            instance.status.conditions.headlessTaskFailed = status.conditions.headlessTaskFailed;
            instance.status.conditions.stoppedByRequest = toBool(status.conditions.stoppedByRequest);
            instance.status.message = status.message;
            instance.status.nodeName = instance.status.nodeName || status.runtime?.nodeName;
            instance.status.podName = instance.status.podName || status.runtime?.podName;
            instance.status.nodeIp = instance.status.nodeIp || status.runtime?.nodeIp;
            instance.status.ownerToken = status.auth!.ownerToken;
            // TODO(gpl): fade this out in favor of only using DBWorkspaceInstanceMetrics
            instance.status.metrics = {
                image: {
                    totalSize: instance.status.metrics?.image?.totalSize || status.metadata.metrics?.image?.totalSize,
                    workspaceImageSize:
                        instance.status.metrics?.image?.workspaceImageSize ||
                        status.metadata.metrics?.image?.workspaceImageSize,
                },
            };

            let lifecycleHandler: (() => Promise<void>) | undefined;
            switch (status.phase) {
                case WorkspacePhase.PENDING:
                    instance.status.phase = "pending";
                    break;
                case WorkspacePhase.CREATING:
                    instance.status.phase = "creating";
                    break;
                case WorkspacePhase.INITIALIZING:
                    instance.status.phase = "initializing";
                    break;
                case WorkspacePhase.RUNNING:
                    if (!instance.startedTime) {
                        instance.startedTime = new Date().toISOString();
                        this.metrics.observeWorkspaceStartupTime(instance);
                        this.analytics.track({
                            event: "workspace_running",
                            messageId: `bridge-wsrun-${instance.id}`,
                            properties: { instanceId: instance.id, workspaceId: workspaceId },
                            userId,
                            timestamp: new Date(instance.startedTime),
                        });
                    }

                    instance.status.phase = "running";
                    // let's check if the state is inconsistent and be loud if it is.
                    if (instance.stoppedTime || instance.stoppingTime) {
                        log.error("Resetting already stopped workspace to running.", {
                            instanceId: instance.id,
                            stoppedTime: instance.stoppedTime,
                        });
                        instance.stoppedTime = undefined;
                        instance.stoppingTime = undefined;
                    }
                    break;
                case WorkspacePhase.INTERRUPTED:
                    instance.status.phase = "interrupted";
                    break;
                case WorkspacePhase.STOPPING:
                    if (instance.status.phase != "stopped") {
                        instance.status.phase = "stopping";
                        if (!instance.stoppingTime) {
                            // The first time a workspace enters stopping we record that as its stoppingTime.
                            // This way we don't take the time a workspace requires to stop into account when
                            // computing the time a workspace instance was running.
                            instance.stoppingTime = new Date().toISOString();
                        }
                    } else {
                        log.warn(logContext, "Got a stopping event for an already stopped workspace.", instance);
                    }
                    break;
                case WorkspacePhase.STOPPED:
                    const now = new Date().toISOString();
                    instance.stoppedTime = now;
                    instance.status.phase = "stopped";
                    if (!instance.stoppingTime) {
                        // It's possible we've never seen a stopping update, hence have not set the `stoppingTime`
                        // yet. Just for this case we need to set it now.
                        instance.stoppingTime = now;
                    }
                    lifecycleHandler = () => this.workspaceInstanceController.onStopped({ span }, userId, instance);
                    break;
            }

            span.setTag("after", JSON.stringify(instance));

            await this.workspaceDB.trace(ctx).storeInstance(instance);

            // now notify all prebuild listeners about updates - and update DB if needed
            await this.prebuildUpdater.updatePrebuiltWorkspace({ span }, userId, status);

            // store metrics
            const instanceMetrics = mapInstanceMetrics(status);
            if (instanceMetrics) {
                await this.workspaceDB
                    .trace(ctx)
                    .updateMetrics(instance.id, instanceMetrics, mergeWorkspaceInstanceMetrics);
            }

            // cleanup
            // important: call this after the DB update
            if (!!lifecycleHandler) {
                await lifecycleHandler();
            }
            await this.publisher.publishInstanceUpdate({
                ownerID: userId,
                instanceID: instance.id,
                workspaceID: instance.workspaceId,
            });
        } catch (e) {
            TraceContext.setError({ span }, e);
            throw e;
        } finally {
            span.finish();
        }
    }

    public dispose() {
        this.disposables.dispose();
    }
}

const mapFirstUserActivity = (firstUserActivity: Timestamp | undefined): string | undefined => {
    if (!firstUserActivity) {
        return undefined;
    }

    return firstUserActivity.toDate().toISOString();
};

const mapPortVisibility = (visibility: WsManPortVisibility | undefined): PortVisibility | undefined => {
    switch (visibility) {
        case undefined:
            return undefined;
        case WsManPortVisibility.PORT_VISIBILITY_PRIVATE:
            return "private";
        case WsManPortVisibility.PORT_VISIBILITY_PUBLIC:
            return "public";
    }
};

const mapPortProtocol = (protocol: WsManPortProtocol): PortProtocol => {
    switch (protocol) {
        case WsManPortProtocol.PORT_PROTOCOL_HTTPS:
            return "https";
        default:
            return "http";
    }
};

/**
 * Filter here to avoid overloading spans
 * @param status
 */
export const filterStatus = (status: WorkspaceStatus.AsObject): Partial<WorkspaceStatus.AsObject> => {
    return {
        id: status.id,
        metadata: scrubber.scrub(status.metadata),
        phase: status.phase,
        message: status.message,
        conditions: status.conditions,
        runtime: status.runtime,
    };
};

export function hasRelevantDiff(_a: WorkspaceStatus, _b: WorkspaceStatus): boolean {
    const a = _a.cloneMessage();
    const b = _b.cloneMessage();

    // Ignore these fields
    a.setStatusVersion(0);
    b.setStatusVersion(0);

    const as = a.serializeBinary();
    const bs = b.serializeBinary();
    return !(as.length === bs.length && as.every((v, i) => v === bs[i]));
}

function toWorkspaceType(type: WorkspaceType): protocol.WorkspaceType {
    switch (type) {
        case WorkspaceType.REGULAR:
            return "regular";
        case WorkspaceType.IMAGEBUILD:
            return "imagebuild";
        case WorkspaceType.PREBUILD:
            return "prebuild";
    }
    throw new Error("invalid WorkspaceType: " + type);
}

function mergeWorkspaceInstanceMetrics(
    current: protocol.WorkspaceInstanceMetrics,
    update: protocol.WorkspaceInstanceMetrics,
): protocol.WorkspaceInstanceMetrics {
    const merged = merge.withOptions({ mergeArrays: false, allowUndefinedOverrides: false }, current, update);
    return merged;
}

function mapInstanceMetrics(status: WorkspaceStatus.AsObject): protocol.WorkspaceInstanceMetrics | undefined {
    let result: protocol.WorkspaceInstanceMetrics | undefined = undefined;

    if (status.metadata?.metrics?.image) {
        result = result || {};
        result.image = {
            totalSize: status.metadata.metrics.image.totalSize,
            workspaceImageSize: status.metadata.metrics.image.workspaceImageSize,
        };
    }
    if (status.initializerMetrics) {
        result = result || {};
        result.initializerMetrics = mapInitializerMetrics(status.initializerMetrics);
    }

    return result;
}

function mapInitializerMetrics(metrics: InitializerMetrics.AsObject): protocol.InitializerMetrics {
    const result: protocol.InitializerMetrics = {};
    if (metrics.git) {
        result.git = mapInitializerMetric(metrics.git);
    }
    if (metrics.fileDownload) {
        result.fileDownload = mapInitializerMetric(metrics.fileDownload);
    }
    if (metrics.snapshot) {
        result.snapshot = mapInitializerMetric(metrics.snapshot);
    }
    if (metrics.backup) {
        result.backup = mapInitializerMetric(metrics.backup);
    }
    if (metrics.prebuild) {
        result.prebuild = mapInitializerMetric(metrics.prebuild);
    }
    if (metrics.composite) {
        result.composite = mapInitializerMetric(metrics.composite);
    }

    return result;
}

function mapInitializerMetric(metric: InitializerMetric.AsObject | undefined): protocol.InitializerMetric | undefined {
    if (!metric || !metric.duration) {
        return undefined;
    }

    return {
        duration: metric.duration.seconds * 1000 + metric.duration.nanos / 1000000,
        size: metric.size,
    };
}