Path: blob/main/components/ws-manager-bridge/src/wsman-subscriber.ts
2498 views
/**
 * Copyright (c) 2020 Gitpod GmbH. All rights reserved.
 * Licensed under the GNU Affero General Public License (AGPL).
 * See License.AGPL.txt in the project root for license information.
 */

import {
    WorkspaceStatus,
    SubscribeRequest,
    SubscribeResponse,
    GetWorkspacesRequest,
    PromisifiedWorkspaceManagerClient,
} from "@gitpod/ws-manager/lib";
import { Disposable } from "@gitpod/gitpod-protocol";
import { ClientReadableStream } from "@grpc/grpc-js";
import { log, LogPayload } from "@gitpod/gitpod-protocol/lib/util/logging";
import { TraceContext } from "@gitpod/gitpod-protocol/lib/util/tracing";
import * as opentracing from "opentracing";

/** Factory that yields a ws-manager client; invoked once per connection attempt. */
export type ClientProvider = () => Promise<PromisifiedWorkspaceManagerClient>;

/**
 * Maintains a subscription to ws-manager status updates and re-establishes it
 * whenever the stream ends or fails, until {@link WsmanSubscriber.dispose} is called.
 */
export class WsmanSubscriber implements Disposable {
    /** Cleared by `dispose()`; the reconnect loop stops once this is false. */
    protected run = true;
    /** The most recently opened subscription stream, if any. */
    protected sub: ClientReadableStream<SubscribeResponse> | undefined;

    constructor(protected readonly clientProvider: ClientProvider) {}

    /**
     * Runs the subscription loop. Each iteration makes one connection attempt
     * (see `runSubscription`) and, unless the subscriber has been disposed in
     * the meantime, waits one second before reconnecting.
     *
     * @param callbacks `onReconnect` receives the status list fetched right
     *   before every (re)subscription; `onStatusUpdate` receives each status
     *   delivered on the stream.
     * @param logPayload optional payload attached to every log line emitted here.
     * @returns a promise that resolves once the subscriber has been disposed
     *   and the current connection attempt has finished.
     */
    public async subscribe(
        callbacks: {
            onStatusUpdate: (ctx: TraceContext, s: WorkspaceStatus) => void;
            onReconnect: (ctx: TraceContext, s: WorkspaceStatus[]) => void;
        },
        logPayload?: LogPayload,
    ): Promise<void> {
        const payload = logPayload || ({} as LogPayload);
        while (this.run) {
            await this.runSubscription(callbacks, payload);

            if (!this.run) {
                log.info("Shutting down wsman subscriber", payload);
                return;
            }

            // we have been disconnected forcefully - wait for some time and try again
            await new Promise((resolve) => setTimeout(resolve, 1000));
        }
    }

    /**
     * Stops the reconnect loop and cancels the current stream, if one is open.
     * A stream that is still being established when this is called is
     * cancelled by `runSubscription` as soon as it has been opened.
     */
    public dispose() {
        this.run = false;
        if (this.sub) {
            this.sub.cancel();
        }
    }

    /**
     * Performs a single connection attempt: obtains a client, reports the
     * current workspace statuses via `onReconnect`, opens the subscription
     * stream and waits until that stream ends or errors. Failures are logged
     * and never thrown, so the caller can simply retry.
     */
    protected async runSubscription(
        callbacks: Parameters<WsmanSubscriber["subscribe"]>[0],
        payload: LogPayload,
    ): Promise<void> {
        log.info("Attempting to establish wsman subscription", payload);

        // Deferred that is settled when this attempt is over (stream end,
        // stream error, or a failure while connecting).
        let markClosed: () => void = () => undefined;
        const closed = new Promise<void>((resolve) => {
            markClosed = resolve;
        });

        let client: PromisifiedWorkspaceManagerClient | undefined = undefined;
        try {
            client = await this.clientProvider();

            // take stock of the existing workspaces
            const workspaces = await client.getWorkspaces({}, new GetWorkspacesRequest());
            callbacks.onReconnect({}, workspaces.getStatusList());

            // start subscription
            const sub = await client.subscribe({}, new SubscribeRequest());
            this.sub = sub;
            log.info("wsman subscription established", payload);

            sub.on("data", (incoming: SubscribeResponse) => this.handleIncoming(incoming, callbacks, payload));
            sub.on("end", () => {
                log.info("wsman subscription ended", payload);
                markClosed();
            });
            sub.on("error", (e) => {
                log.error("wsman subscription error", e, payload);
                markClosed();
            });

            // dispose() may have run while we were still connecting; at that
            // point it could only cancel the previous stream (or nothing), so
            // cancel the one we just opened. Done after the handlers are
            // attached so the resulting end/error event settles `closed`.
            if (!this.run) {
                sub.cancel();
            }
        } catch (err) {
            log.error("Cannot maintain subscription to wsman", err, payload);
            markClosed();
        } finally {
            // NOTE(review): the client is disposed right after the stream handlers
            // are attached, not after the stream has ended. This timing is kept
            // from the previous implementation; presumably dispose() does not
            // tear down an open stream - confirm against the client implementation.
            if (client) {
                try {
                    client.dispose();
                } catch (err) {
                    // Keep the reconnect loop alive even if cleanup fails.
                    log.error("Error disposing wsman client", err, payload);
                }
            }
        }

        await closed;
    }

    /**
     * Handles one message from the subscription stream. Messages without a
     * status are ignored. If the message headers carry a span context, the
     * status is delivered inside a child span named
     * "incomingSubscriptionResponse". Errors thrown synchronously by
     * `onStatusUpdate` are logged (and recorded on the span), not rethrown.
     */
    protected handleIncoming(
        incoming: SubscribeResponse,
        callbacks: Parameters<WsmanSubscriber["subscribe"]>[0],
        payload: LogPayload,
    ): void {
        const status = incoming.getStatus();
        if (!status) {
            return;
        }

        // Copy the header map into a plain object to use as the tracing carrier.
        const header: Record<string, string> = {};
        const headerMap = incoming.getHeaderMap();
        if (headerMap) {
            headerMap.forEach((v: string, k: string) => {
                header[k] = v;
            });
        }

        const tracer = opentracing.globalTracer();
        const spanCtx = tracer.extract(opentracing.FORMAT_HTTP_HEADERS, header);
        const span = spanCtx
            ? tracer.startSpan("incomingSubscriptionResponse", {
                  references: [opentracing.childOf(spanCtx)],
              })
            : undefined;

        try {
            callbacks.onStatusUpdate({ span }, status);
        } catch (err) {
            if (span) {
                TraceContext.setError({ span }, err);
            }
            log.error("Error handling onStatusUpdate", err, payload);
        } finally {
            if (span) {
                span.finish();
            }
        }
    }
}