GitHub Repository: gitpod-io/gitpod
Path: blob/main/components/ws-manager-bridge/src/bridge.ts
/**
 * Copyright (c) 2020 Gitpod GmbH. All rights reserved.
 * Licensed under the GNU Affero General Public License (AGPL).
 * See License.AGPL.txt in the project root for license information.
 */

import { inject, injectable } from "inversify";
import {
    Disposable,
    Queue,
    WorkspaceInstancePort,
    PortVisibility,
    DisposableCollection,
    PortProtocol,
} from "@gitpod/gitpod-protocol";
import * as protocol from "@gitpod/gitpod-protocol";
import {
    WorkspaceStatus,
    WorkspacePhase,
    WorkspaceConditionBool,
    PortVisibility as WsManPortVisibility,
    PortProtocol as WsManPortProtocol,
    DescribeClusterRequest,
    WorkspaceType,
    InitializerMetrics,
    InitializerMetric,
} from "@gitpod/ws-manager/lib";
import { scrubber, TrustedValue } from "@gitpod/gitpod-protocol/lib/util/scrubbing";
import { WorkspaceDB } from "@gitpod/gitpod-db/lib/workspace-db";
import { log, LogContext } from "@gitpod/gitpod-protocol/lib/util/logging";
import { TraceContext } from "@gitpod/gitpod-protocol/lib/util/tracing";
import { IAnalyticsWriter } from "@gitpod/gitpod-protocol/lib/analytics";
import { TracedWorkspaceDB, DBWithTracing } from "@gitpod/gitpod-db/lib/traced-db";
import { Metrics } from "./metrics";
import { ClientProvider, WsmanSubscriber } from "./wsman-subscriber";
import { Timestamp } from "google-protobuf/google/protobuf/timestamp_pb";
import { Configuration } from "./config";
import { WorkspaceClass, WorkspaceCluster, WorkspaceClusterDB } from "@gitpod/gitpod-protocol/lib/workspace-cluster";
import { performance } from "perf_hooks";
import { WorkspaceInstanceController } from "./workspace-instance-controller";
import { PrebuildUpdater } from "./prebuild-updater";
import { RedisPublisher } from "@gitpod/gitpod-db/lib";
import { merge } from "ts-deepmerge";

export const WorkspaceManagerBridgeFactory = Symbol("WorkspaceManagerBridgeFactory");
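/**
 * Maps the ws-manager tri-state condition to the protocol's optional boolean:
 * EMPTY becomes undefined, TRUE becomes true, everything else becomes false.
 */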
function toBool(b: WorkspaceConditionBool | undefined): boolean | undefined {
    if (b === WorkspaceConditionBool.EMPTY) {
        return;
    }

    return b === WorkspaceConditionBool.TRUE;
}

export type WorkspaceClusterInfo = Pick<WorkspaceCluster, "name" | "url">;

type UpdateQueue = {
    instanceId: string;
    lastStatus?: WorkspaceStatus;
    queue: Queue;
};
namespace UpdateQueue {
    export function create(instanceId: string): UpdateQueue {
        return { instanceId, queue: new Queue() };
    }
}
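/**
 * Receives WorkspaceStatus updates from a single workspace cluster (ws-manager), serializes them
 * per workspace instance, and mirrors them into the workspace database. It also drives the
 * workspace instance controller for that cluster and publishes resulting instance updates via Redis.
 */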
@injectable()
export class WorkspaceManagerBridge implements Disposable {
    constructor(
        @inject(WorkspaceClusterDB) private readonly clusterDB: WorkspaceClusterDB,
        @inject(TracedWorkspaceDB) private readonly workspaceDB: DBWithTracing<WorkspaceDB>,
        @inject(Metrics) private readonly metrics: Metrics,
        @inject(Configuration) private readonly config: Configuration,
        @inject(IAnalyticsWriter) private readonly analytics: IAnalyticsWriter,
        @inject(PrebuildUpdater) private readonly prebuildUpdater: PrebuildUpdater,
        @inject(WorkspaceInstanceController) private readonly workspaceInstanceController: WorkspaceInstanceController, // bound in "transient" mode: we expect to receive a fresh instance here
        @inject(RedisPublisher) private readonly publisher: RedisPublisher,
    ) {}

    protected readonly disposables = new DisposableCollection();
    protected readonly queues = new Map<string, UpdateQueue>();

    protected cluster: WorkspaceClusterInfo;
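    /**
     * Starts the bridge for the given cluster: subscribes to ws-manager status updates, starts the
     * workspace instance controller, and periodically refreshes the cluster's available workspace classes.
     */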
    public start(cluster: WorkspaceClusterInfo, clientProvider: ClientProvider) {
        const logPayload = { name: cluster.name, url: cluster.url };
        log.info(`Starting bridge to cluster...`, logPayload);
        this.cluster = cluster;

        const startStatusUpdateHandler = () => {
            log.debug(`Starting status update handler: ${cluster.name}`, logPayload);
            /* no await */ this.startStatusUpdateHandler(clientProvider, logPayload)
                // this is a mere safe-guard: we do not expect the code inside to fail
                .catch((err) => log.error("Cannot start status update handler", err));
        };

        // notify servers and _update the DB_
        startStatusUpdateHandler();

        // the actual "governing" part
        const controllerIntervalSeconds = this.config.controllerIntervalSeconds;
        if (controllerIntervalSeconds <= 0) {
            throw new Error("controllerIntervalSeconds <= 0!");
        }

        log.debug(`Starting controller: ${cluster.name}`, logPayload);
        // Control all workspace instances, either against ws-manager or configured timeouts
        this.workspaceInstanceController.start(
            cluster.name,
            clientProvider,
            controllerIntervalSeconds,
            this.config.controllerMaxDisconnectSeconds,
        );
        this.disposables.push(this.workspaceInstanceController);

        const tim = setInterval(() => {
            this.updateWorkspaceClasses(cluster, clientProvider);
        }, controllerIntervalSeconds * 1000);
        this.disposables.push({ dispose: () => clearInterval(tim) });

        log.info(`Started bridge to cluster.`, logPayload);
    }

    public stop() {
        this.dispose();
    }
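    /**
     * Fetches the cluster description from ws-manager and mirrors the advertised workspace classes
     * and the preferred class into this cluster's record in the cluster DB.
     */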
    protected async updateWorkspaceClasses(clusterInfo: WorkspaceClusterInfo, clientProvider: ClientProvider) {
        try {
            const client = await clientProvider();
            const resp = await client.describeCluster({}, new DescribeClusterRequest());

            const cluster = await this.clusterDB.findByName(clusterInfo.name);
            if (!cluster) {
                return;
            }
            cluster.availableWorkspaceClasses = resp.getWorkspaceClassesList().map((c) => {
                return <WorkspaceClass>{
                    creditsPerMinute: c.getCreditsPerMinute(),
                    description: c.getDescription(),
                    displayName: c.getDisplayName(),
                    id: c.getId(),
                };
            });
            cluster.preferredWorkspaceClass = resp.getPreferredWorkspaceClass();

            await this.clusterDB.save(cluster);
        } catch (e) {
            log.error({}, "Failed to update workspace classes", e, { clusterInfo });
        }
    }

    protected async startStatusUpdateHandler(clientProvider: ClientProvider, logPayload: {}): Promise<void> {
        const subscriber = new WsmanSubscriber(clientProvider);
        this.disposables.push(subscriber);

        const onReconnect = (ctx: TraceContext, s: WorkspaceStatus[]) => {
            log.info("ws-manager subscriber reconnected", logPayload);
            s.forEach((sx) => this.queueMessagesByInstanceId(ctx, sx));
        };
        const onStatusUpdate = (ctx: TraceContext, s: WorkspaceStatus) => {
            this.queueMessagesByInstanceId(ctx, s);
        };
        await subscriber.subscribe({ onReconnect, onStatusUpdate }, logPayload);
    }

    protected queueMessagesByInstanceId(ctx: TraceContext, status: WorkspaceStatus) {
        const instanceId = status.getId();
        if (!instanceId) {
            log.warn("Received invalid message, could not read instanceId!", { msg: status });
            return;
        }

        // We can't just handle the status update directly, but have to "serialize" it to ensure the updates stay in order.
        // If we did not do this, the async nature of our code would allow for one message to overtake the other.
        const updateQueue = this.queues.get(instanceId) || UpdateQueue.create(instanceId);
        this.queues.set(instanceId, updateQueue);

        updateQueue.queue.enqueue(async () => {
            try {
                await this.handleStatusUpdate(ctx, status, updateQueue.lastStatus);
                updateQueue.lastStatus = status;
            } catch (err) {
                log.error({ instanceId }, err);
                // if an error occurs, we want to be safe and make sure we don't accidentally skip the next update
                updateQueue.lastStatus = undefined;
            }
        });
    }
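    /**
     * Processes a single status update for a workspace instance: updates that carry no relevant
     * diff compared to the previously handled one are skipped, everything else is applied via
     * statusUpdate(). Duration and outcome are reported as metrics in both cases.
     */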
    protected async handleStatusUpdate(
        ctx: TraceContext,
        rawStatus: WorkspaceStatus,
        lastStatusUpdate: WorkspaceStatus | undefined,
    ) {
        const start = performance.now();
        const status = rawStatus.toObject();
        log.info("Handling WorkspaceStatus update", { status: new TrustedValue(filterStatus(status)) });

        if (!status.spec || !status.metadata || !status.conditions) {
            log.warn("Received invalid status update", status);
            return;
        }

        const logCtx = {
            instanceId: status.id!,
            workspaceId: status.metadata!.metaId!,
            userId: status.metadata!.owner!,
        };
        const workspaceType = toWorkspaceType(status.spec.type);

        let updateError: any | undefined;
        let skipUpdate = false;
        try {
            this.metrics.reportWorkspaceInstanceUpdateStarted(this.cluster.name, workspaceType);

            // If the last status update is identical to the current one, we can skip the update.
            skipUpdate = !!lastStatusUpdate && !hasRelevantDiff(rawStatus, lastStatusUpdate);
            if (skipUpdate) {
                log.info(logCtx, "Skipped WorkspaceInstance status update");
                return;
            }

            await this.statusUpdate(ctx, rawStatus);

            log.info(logCtx, "Successfully completed WorkspaceInstance status update");
        } catch (e) {
            updateError = e;

            log.error(logCtx, "Failed to complete WorkspaceInstance status update", e);
            throw e;
        } finally {
            const durationMs = performance.now() - start;
            this.metrics.reportWorkspaceInstanceUpdateCompleted(
                durationMs / 1000,
                this.cluster.name,
                workspaceType,
                skipUpdate,
                updateError,
            );
        }
    }
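    /**
     * Applies a WorkspaceStatus from ws-manager to the corresponding WorkspaceInstance record:
     * ports, conditions, phase, and timestamps are copied over, the instance is stored, prebuild
     * listeners are notified, and the update is published via Redis. When the workspace reaches
     * "stopped", the controller's onStopped lifecycle handler is invoked after the DB update.
     */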
    private async statusUpdate(ctx: TraceContext, rawStatus: WorkspaceStatus) {
        const status = rawStatus.toObject();

        if (!status.spec || !status.metadata || !status.conditions) {
            return;
        }

        const span = TraceContext.startSpan("handleStatusUpdate", ctx);
        span.setTag("status", JSON.stringify(filterStatus(status)));
        span.setTag("statusVersion", status.statusVersion);
        try {
            // Beware of the ID mapping here: What's a workspace to the ws-manager is a workspace instance to the rest of the system.
            // The workspace ID of ws-manager is the workspace instance ID in the database.
            // The meta ID of ws-manager is the workspace ID in the database.
            const instanceId = status.id!;
            const workspaceId = status.metadata!.metaId!;
            const userId = status.metadata!.owner!;
            const logContext: LogContext = {
                userId,
                instanceId,
                workspaceId,
            };

            const instance = await this.workspaceDB.trace({ span }).findInstanceById(instanceId);
            if (instance) {
                this.metrics.statusUpdateReceived(this.cluster.name, true);
            } else {
                // This scenario happens when the update for a WorkspaceInstance is picked up by a ws-manager-bridge in a different region,
                // before the periodic deleter finished running. This is because all ws-manager-bridge instances receive updates from all WorkspaceClusters.
                // We ignore this update because we do not have anything to reconcile this update against, but also because we assume it is handled
                // by another instance of ws-manager-bridge that is in the region where the WorkspaceInstance record was created.
                this.metrics.statusUpdateReceived(this.cluster.name, false);
                return;
            }

            const currentStatusVersion = instance.status.version || 0;
            if (currentStatusVersion > 0 && currentStatusVersion >= status.statusVersion) {
                // We've gotten an event which is older than one we've already processed. We shouldn't process the stale one.
                span.setTag("statusUpdate.staleEvent", true);
                this.metrics.recordStaleStatusUpdate();
                log.debug(ctx, "Stale status update received, skipping.");
            }

            if (!!status.spec.exposedPortsList) {
                instance.status.exposedPorts = status.spec.exposedPortsList.map((p) => {
                    return <WorkspaceInstancePort>{
                        port: p.port,
                        visibility: mapPortVisibility(p.visibility),
                        protocol: mapPortProtocol(p.protocol),
                        url: p.url,
                    };
                });
            }
            if (!instance.status.conditions.firstUserActivity && status.conditions.firstUserActivity) {
                // Only report this when it's observed the first time
                const firstUserActivity = mapFirstUserActivity(rawStatus.getConditions()!.getFirstUserActivity())!;
                this.metrics.observeFirstUserActivity(instance, firstUserActivity);
            }

            instance.ideUrl = status.spec.url!;
            instance.status.version = status.statusVersion;
            instance.status.timeout = status.spec.timeout;
            if (!!instance.status.conditions.failed && !status.conditions.failed) {
                // We already have a "failed" condition, and received an empty one: This is a bug, "failed" conditions are terminal per definition.
                // Do not override!
                log.error(logContext, 'We received an empty "failed" condition overriding an existing one!', {
                    current: instance.status.conditions.failed,
                });

                // TODO(gpl) To make sure we do not break anything big time, we keep the unconditional override for now and observe for some time.
                instance.status.conditions.failed = status.conditions.failed;
            } else {
                instance.status.conditions.failed = status.conditions.failed;
            }
            instance.status.conditions.pullingImages = toBool(status.conditions.pullingImages!);
            instance.status.conditions.deployed = toBool(status.conditions.deployed);
            if (!instance.deployedTime && instance.status.conditions.deployed) {
                // This is the first time we see the workspace pod being deployed.
                // Like all other timestamps, it's set when bridge observes it, not when it actually happened (which only ws-manager could decide).
                instance.deployedTime = new Date().toISOString();
            }
            instance.status.conditions.timeout = status.conditions.timeout;
            instance.status.conditions.firstUserActivity = mapFirstUserActivity(
                rawStatus.getConditions()!.getFirstUserActivity(),
            );
            instance.status.conditions.headlessTaskFailed = status.conditions.headlessTaskFailed;
            instance.status.conditions.stoppedByRequest = toBool(status.conditions.stoppedByRequest);
            instance.status.message = status.message;
            instance.status.nodeName = instance.status.nodeName || status.runtime?.nodeName;
            instance.status.podName = instance.status.podName || status.runtime?.podName;
            instance.status.nodeIp = instance.status.nodeIp || status.runtime?.nodeIp;
            instance.status.ownerToken = status.auth!.ownerToken;
            // TODO(gpl): fade this out in favor of only using DBWorkspaceInstanceMetrics
            instance.status.metrics = {
                image: {
                    totalSize: instance.status.metrics?.image?.totalSize || status.metadata.metrics?.image?.totalSize,
                    workspaceImageSize:
                        instance.status.metrics?.image?.workspaceImageSize ||
                        status.metadata.metrics?.image?.workspaceImageSize,
                },
            };
            let lifecycleHandler: (() => Promise<void>) | undefined;
            switch (status.phase) {
                case WorkspacePhase.PENDING:
                    instance.status.phase = "pending";
                    break;
                case WorkspacePhase.CREATING:
                    instance.status.phase = "creating";
                    break;
                case WorkspacePhase.INITIALIZING:
                    instance.status.phase = "initializing";
                    break;
                case WorkspacePhase.RUNNING:
                    if (!instance.startedTime) {
                        instance.startedTime = new Date().toISOString();
                        this.metrics.observeWorkspaceStartupTime(instance);
                        this.analytics.track({
                            event: "workspace_running",
                            messageId: `bridge-wsrun-${instance.id}`,
                            properties: { instanceId: instance.id, workspaceId: workspaceId },
                            userId,
                            timestamp: new Date(instance.startedTime),
                        });
                    }

                    instance.status.phase = "running";
                    // let's check if the state is inconsistent and be loud if it is.
                    if (instance.stoppedTime || instance.stoppingTime) {
                        log.error("Resetting already stopped workspace to running.", {
                            instanceId: instance.id,
                            stoppedTime: instance.stoppedTime,
                        });
                        instance.stoppedTime = undefined;
                        instance.stoppingTime = undefined;
                    }
                    break;
                case WorkspacePhase.INTERRUPTED:
                    instance.status.phase = "interrupted";
                    break;
                case WorkspacePhase.STOPPING:
                    if (instance.status.phase != "stopped") {
                        instance.status.phase = "stopping";
                        if (!instance.stoppingTime) {
                            // The first time a workspace enters stopping we record that as its stoppingTime.
                            // This way we don't take the time a workspace requires to stop into account when
                            // computing the time a workspace instance was running.
                            instance.stoppingTime = new Date().toISOString();
                        }
                    } else {
                        log.warn(logContext, "Got a stopping event for an already stopped workspace.", instance);
                    }
                    break;
                case WorkspacePhase.STOPPED:
                    const now = new Date().toISOString();
                    instance.stoppedTime = now;
                    instance.status.phase = "stopped";
                    if (!instance.stoppingTime) {
                        // It's possible we've never seen a stopping update, hence have not set the `stoppingTime`
                        // yet. Just for this case we need to set it now.
                        instance.stoppingTime = now;
                    }
                    lifecycleHandler = () => this.workspaceInstanceController.onStopped({ span }, userId, instance);
                    break;
            }
            span.setTag("after", JSON.stringify(instance));

            await this.workspaceDB.trace(ctx).storeInstance(instance);

            // now notify all prebuild listeners about updates - and update DB if needed
            await this.prebuildUpdater.updatePrebuiltWorkspace({ span }, userId, status);

            // store metrics
            const instanceMetrics = mapInstanceMetrics(status);
            if (instanceMetrics) {
                await this.workspaceDB
                    .trace(ctx)
                    .updateMetrics(instance.id, instanceMetrics, mergeWorkspaceInstanceMetrics);
            }

            // cleanup
            // important: call this after the DB update
            if (!!lifecycleHandler) {
                await lifecycleHandler();
            }
            await this.publisher.publishInstanceUpdate({
                ownerID: userId,
                instanceID: instance.id,
                workspaceID: instance.workspaceId,
            });
        } catch (e) {
            TraceContext.setError({ span }, e);
            throw e;
        } finally {
            span.finish();
        }
    }

    public dispose() {
        this.disposables.dispose();
    }
}
const mapFirstUserActivity = (firstUserActivity: Timestamp | undefined): string | undefined => {
    if (!firstUserActivity) {
        return undefined;
    }

    return firstUserActivity.toDate().toISOString();
};

const mapPortVisibility = (visibility: WsManPortVisibility | undefined): PortVisibility | undefined => {
    switch (visibility) {
        case undefined:
            return undefined;
        case WsManPortVisibility.PORT_VISIBILITY_PRIVATE:
            return "private";
        case WsManPortVisibility.PORT_VISIBILITY_PUBLIC:
            return "public";
    }
};

const mapPortProtocol = (protocol: WsManPortProtocol): PortProtocol => {
    switch (protocol) {
        case WsManPortProtocol.PORT_PROTOCOL_HTTPS:
            return "https";
        default:
            return "http";
    }
};

/**
 * Filter here to avoid overloading spans
 * @param status
 */
export const filterStatus = (status: WorkspaceStatus.AsObject): Partial<WorkspaceStatus.AsObject> => {
    return {
        id: status.id,
        metadata: scrubber.scrub(status.metadata),
        phase: status.phase,
        message: status.message,
        conditions: status.conditions,
        runtime: status.runtime,
    };
};
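/**
 * Returns true if the two WorkspaceStatus messages differ in anything other than statusVersion.
 * The comparison is done on the binary serialization of both messages with statusVersion zeroed out.
 */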
export function hasRelevantDiff(_a: WorkspaceStatus, _b: WorkspaceStatus): boolean {
    const a = _a.cloneMessage();
    const b = _b.cloneMessage();

    // Ignore these fields
    a.setStatusVersion(0);
    b.setStatusVersion(0);

    const as = a.serializeBinary();
    const bs = b.serializeBinary();
    return !(as.length === bs.length && as.every((v, i) => v === bs[i]));
}

function toWorkspaceType(type: WorkspaceType): protocol.WorkspaceType {
    switch (type) {
        case WorkspaceType.REGULAR:
            return "regular";
        case WorkspaceType.IMAGEBUILD:
            return "imagebuild";
        case WorkspaceType.PREBUILD:
            return "prebuild";
    }
    throw new Error("invalid WorkspaceType: " + type);
}
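/**
 * Deep-merges the stored metrics with an update. Arrays are replaced rather than merged, and
 * undefined values in the update do not override existing values.
 */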
function mergeWorkspaceInstanceMetrics(
    current: protocol.WorkspaceInstanceMetrics,
    update: protocol.WorkspaceInstanceMetrics,
): protocol.WorkspaceInstanceMetrics {
    const merged = merge.withOptions({ mergeArrays: false, allowUndefinedOverrides: false }, current, update);
    return merged;
}

function mapInstanceMetrics(status: WorkspaceStatus.AsObject): protocol.WorkspaceInstanceMetrics | undefined {
    let result: protocol.WorkspaceInstanceMetrics | undefined = undefined;

    if (status.metadata?.metrics?.image) {
        result = result || {};
        result.image = {
            totalSize: status.metadata.metrics.image.totalSize,
            workspaceImageSize: status.metadata.metrics.image.workspaceImageSize,
        };
    }
    if (status.initializerMetrics) {
        result = result || {};
        result.initializerMetrics = mapInitializerMetrics(status.initializerMetrics);
    }

    return result;
}

function mapInitializerMetrics(metrics: InitializerMetrics.AsObject): protocol.InitializerMetrics {
    const result: protocol.InitializerMetrics = {};
    if (metrics.git) {
        result.git = mapInitializerMetric(metrics.git);
    }
    if (metrics.fileDownload) {
        result.fileDownload = mapInitializerMetric(metrics.fileDownload);
    }
    if (metrics.snapshot) {
        result.snapshot = mapInitializerMetric(metrics.snapshot);
    }
    if (metrics.backup) {
        result.backup = mapInitializerMetric(metrics.backup);
    }
    if (metrics.prebuild) {
        result.prebuild = mapInitializerMetric(metrics.prebuild);
    }
    if (metrics.composite) {
        result.composite = mapInitializerMetric(metrics.composite);
    }

    return result;
}
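/**
 * Converts a protobuf InitializerMetric (duration in seconds + nanos) into the protocol
 * representation, expressing the duration in milliseconds.
 */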
function mapInitializerMetric(metric: InitializerMetric.AsObject | undefined): protocol.InitializerMetric | undefined {
    if (!metric || !metric.duration) {
        return undefined;
    }

    return {
        duration: metric.duration.seconds * 1000 + metric.duration.nanos / 1000000,
        size: metric.size,
    };
}