Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
gitpod-io
GitHub Repository: gitpod-io/gitpod
Path: blob/main/components/ws-manager-bridge/src/wsman-subscriber.ts
2498 views
1
/**
2
* Copyright (c) 2020 Gitpod GmbH. All rights reserved.
3
* Licensed under the GNU Affero General Public License (AGPL).
4
* See License.AGPL.txt in the project root for license information.
5
*/
6
7
import {
8
WorkspaceStatus,
9
SubscribeRequest,
10
SubscribeResponse,
11
GetWorkspacesRequest,
12
PromisifiedWorkspaceManagerClient,
13
} from "@gitpod/ws-manager/lib";
14
import { Disposable } from "@gitpod/gitpod-protocol";
15
import { ClientReadableStream } from "@grpc/grpc-js";
16
import { log, LogPayload } from "@gitpod/gitpod-protocol/lib/util/logging";
17
import { TraceContext } from "@gitpod/gitpod-protocol/lib/util/tracing";
18
import * as opentracing from "opentracing";
19
20
export type ClientProvider = () => Promise<PromisifiedWorkspaceManagerClient>;
21
22
export class WsmanSubscriber implements Disposable {
23
protected run = true;
24
protected sub: ClientReadableStream<SubscribeResponse> | undefined;
25
26
constructor(protected readonly clientProvider: ClientProvider) {}
27
28
public async subscribe(
29
callbacks: {
30
onStatusUpdate: (ctx: TraceContext, s: WorkspaceStatus) => void;
31
onReconnect: (ctx: TraceContext, s: WorkspaceStatus[]) => void;
32
},
33
logPayload?: LogPayload,
34
) {
35
const payload = logPayload || ({} as LogPayload);
36
while (this.run) {
37
await new Promise<void>(async (resolve, reject) => {
38
log.info("Attempting to establish wsman subscription", payload);
39
let client: PromisifiedWorkspaceManagerClient | undefined = undefined;
40
try {
41
client = await this.clientProvider();
42
43
// take stock of the existing workspaces
44
const workspaces = await client.getWorkspaces({}, new GetWorkspacesRequest());
45
callbacks.onReconnect({}, workspaces.getStatusList());
46
47
// start subscription
48
const req = new SubscribeRequest();
49
this.sub = await client.subscribe({}, req);
50
log.info("wsman subscription established", payload);
51
52
this.sub.on("data", (incoming: SubscribeResponse) => {
53
const status = incoming.getStatus();
54
if (!!status) {
55
const header: any = {};
56
if (!!incoming.getHeaderMap()) {
57
incoming.getHeaderMap().forEach((v: string, k: string) => (header[k] = v));
58
}
59
const spanCtx = opentracing.globalTracer().extract(opentracing.FORMAT_HTTP_HEADERS, header);
60
const span = !!spanCtx
61
? opentracing.globalTracer().startSpan("incomingSubscriptionResponse", {
62
references: [opentracing.childOf(spanCtx!)],
63
})
64
: undefined;
65
66
try {
67
callbacks.onStatusUpdate({ span }, status);
68
} catch (err) {
69
if (span) {
70
TraceContext.setError({ span }, err);
71
}
72
log.error("Error handling onStatusUpdate", err, payload);
73
} finally {
74
if (span) {
75
span.finish();
76
}
77
}
78
}
79
});
80
this.sub.on("end", function () {
81
log.info("wsman subscription ended", payload);
82
resolve();
83
});
84
this.sub.on("error", function (e) {
85
log.error("wsman subscription error", e, payload);
86
resolve();
87
});
88
} catch (err) {
89
log.error("Cannot maintain subscription to wsman", err, payload);
90
resolve();
91
} finally {
92
if (client) {
93
client.dispose();
94
}
95
}
96
});
97
98
if (!this.run) {
99
log.info("Shutting down wsman subscriber", payload);
100
return;
101
} else {
102
// we have been disconnected forcefully - wait for some time and try again
103
await new Promise((resolve) => setTimeout(resolve, 1000));
104
}
105
}
106
}
107
108
public dispose() {
109
this.run = false;
110
if (!!this.sub) {
111
this.sub.cancel();
112
}
113
}
114
}
115
116