Path: blob/main/components/ws-manager-bridge/src/cluster-service-server.ts
2498 views
/**
 * Copyright (c) 2021 Gitpod GmbH. All rights reserved.
 * Licensed under the GNU Affero General Public License (AGPL).
 * See License.AGPL.txt in the project root for license information.
 */

import { WorkspaceDB } from "@gitpod/gitpod-db/lib/workspace-db";
import { Queue } from "@gitpod/gitpod-protocol";
import { log } from "@gitpod/gitpod-protocol/lib/util/logging";
import {
    WorkspaceCluster,
    WorkspaceClusterDB,
    WorkspaceClusterState,
    TLSConfig,
    AdmissionConstraint,
    AdmissionConstraintHasPermission,
    WorkspaceClusterWoTLS,
    WorkspaceClass,
} from "@gitpod/gitpod-protocol/lib/workspace-cluster";
import {
    ClusterServiceService,
    ClusterState,
    ClusterStatus,
    DeregisterRequest,
    DeregisterResponse,
    IClusterServiceServer,
    ListRequest,
    ListResponse,
    Preferability,
    RegisterRequest,
    RegisterResponse,
    UpdateRequest,
    UpdateResponse,
    AdmissionConstraint as GRPCAdmissionConstraint,
} from "@gitpod/ws-manager-bridge-api/lib";
import { WorkspaceManagerClientProvider } from "@gitpod/ws-manager/lib/client-provider";
import {
    WorkspaceManagerClientProviderCompositeSource,
    WorkspaceManagerClientProviderSource,
} from "@gitpod/ws-manager/lib/client-provider-source";
import * as grpc from "@grpc/grpc-js";
import { inject, injectable } from "inversify";
import { BridgeController } from "./bridge-controller";
import { Configuration } from "./config";
import { GRPCError } from "./rpc";
import { isWorkspaceRegion } from "@gitpod/gitpod-protocol/lib/workspace-cluster";
import { DescribeClusterRequest, DescribeClusterResponse, WorkspaceManagerClient } from "@gitpod/ws-manager/lib";

/** Network endpoint the cluster service gRPC server binds to. */
export interface ClusterServiceServerOptions {
    port: number;
    host: string;
}

/**
 * gRPC implementation of the cluster service: registers, updates, deregisters and lists
 * workspace clusters stored in the {@link WorkspaceClusterDB}.
 *
 * Every handler enqueues its work on a shared {@link Queue} and reports its outcome through
 * the unary `callback`; errors are converted with `mapToGRPCError` before being returned.
 */
@injectable()
export class ClusterService implements IClusterServiceServer {
    // Satisfy the grpc.UntypedServiceImplementation interface.
    [name: string]: any;

    constructor(
        @inject(WorkspaceClusterDB) private readonly clusterDB: WorkspaceClusterDB,
        @inject(WorkspaceDB) private readonly workspaceDB: WorkspaceDB,
        @inject(BridgeController) private readonly bridgeController: BridgeController,
        @inject(WorkspaceManagerClientProvider) private readonly clientProvider: WorkspaceManagerClientProvider,
        @inject(WorkspaceManagerClientProviderCompositeSource)
        private readonly allClientProvider: WorkspaceManagerClientProviderSource,
    ) {}

    // using a queue to make sure we do concurrency right
    private readonly queue: Queue = new Queue();

    /**
     * Registers a new workspace cluster.
     *
     * Fails with INVALID_ARGUMENT for an invalid region, unknown preferability or missing TLS
     * config, with ALREADY_EXISTS if the name or URL is already stored, and with
     * FAILED_PRECONDITION if the cluster cannot be reached with the given config.
     * On success the cluster is saved and a reconcile is triggered.
     */
    public register(
        call: grpc.ServerUnaryCall<RegisterRequest, RegisterResponse>,
        callback: grpc.sendUnaryData<RegisterResponse>,
    ) {
        log.info("requested clusters.register", getClientInfo(call));
        this.queue.enqueue(async () => {
            try {
                const req = call.request.toObject();

                if (!isWorkspaceRegion(req.region)) {
                    throw new GRPCError(grpc.status.INVALID_ARGUMENT, `Invalid value for workspace region.`);
                }

                // check if the name or URL are already registered/in use
                const [clusterByName, clusterByUrl] = await Promise.all([
                    this.clusterDB.findByName(req.name),
                    this.clusterDB.findFiltered({ url: req.url }),
                ]);
                if (clusterByName) {
                    throw new GRPCError(
                        grpc.status.ALREADY_EXISTS,
                        `a WorkspaceCluster with name ${req.name} already exists in the DB`,
                    );
                }
                if (clusterByUrl && clusterByUrl.length > 0) {
                    throw new GRPCError(
                        grpc.status.ALREADY_EXISTS,
                        `a WorkspaceCluster with url ${req.url} already exists in the DB`,
                    );
                }

                // Derive score and state from the (optional) registration hints.
                // NOTE: `perfereability` is the field name used by the request object; do not "fix" it here.
                let preferability = Preferability.NONE;
                let state: WorkspaceClusterState = "available";
                if (req.hints) {
                    preferability = req.hints.perfereability;
                    state = mapCordoned(req.hints.cordoned);
                }
                const score = mapPreferabilityToScore(preferability);
                if (score === undefined) {
                    throw new GRPCError(grpc.status.INVALID_ARGUMENT, `unknown preferability ${preferability}`);
                }

                if (!req.tls) {
                    throw new GRPCError(grpc.status.INVALID_ARGUMENT, "missing required TLS config");
                }
                // we assume that clients have already base64-encoded their input!
                const tls: TLSConfig = {
                    ca: req.tls.ca,
                    crt: req.tls.crt,
                    key: req.tls.key,
                };

                // Constraints that cannot be mapped are dropped.
                const admissionConstraints = call.request
                    .getAdmissionConstraintsList()
                    .map(mapAdmissionConstraint)
                    .filter((c): c is AdmissionConstraint => !!c);

                const newCluster: WorkspaceCluster = {
                    name: req.name,
                    url: req.url,
                    region: req.region,
                    state,
                    score,
                    maxScore: 100,
                    govern: true,
                    tls,
                    admissionConstraints,
                };

                // Make a test call to the cluster to validate the config (rejects if it fails).
                // We use this test call to gather the available workspace classes.
                const clusterDesc = await this.probeCluster(newCluster);
                newCluster.preferredWorkspaceClass = clusterDesc.getPreferredWorkspaceClass();
                newCluster.availableWorkspaceClasses = clusterDesc.getWorkspaceClassesList().map((c) => {
                    return <WorkspaceClass>{
                        creditsPerMinute: c.getCreditsPerMinute(),
                        description: c.getDescription(),
                        displayName: c.getDisplayName(),
                        id: c.getId(),
                    };
                });

                // store the ws-manager into the database
                await this.clusterDB.save(newCluster);
                log.info({}, "cluster registered", { cluster: req.name });
                this.triggerReconcile("register", req.name);

                callback(null, new RegisterResponse());
            } catch (err) {
                callback(mapToGRPCError(err), null);
            }
        });
    }

    /**
     * Updates an existing workspace cluster (max score, score, cordoned state, admission
     * constraints and/or TLS config, depending on which fields the request carries).
     *
     * Fails with NOT_FOUND if no cluster with the given name exists, with INVALID_ARGUMENT if a
     * TLS update is incomplete, and with FAILED_PRECONDITION if the cluster cannot be reached
     * with the new TLS config.
     */
    public update(
        call: grpc.ServerUnaryCall<UpdateRequest, UpdateResponse>,
        callback: grpc.sendUnaryData<UpdateResponse>,
    ) {
        log.info("requested clusters.update", getClientInfo(call));
        this.queue.enqueue(async () => {
            try {
                const req = call.request.toObject();
                const cluster = await this.clusterDB.findByName(req.name);
                if (!cluster) {
                    throw new GRPCError(
                        grpc.status.NOT_FOUND,
                        `a WorkspaceCluster with name ${req.name} does not exist in the DB!`,
                    );
                }

                if (call.request.hasMaxScore()) {
                    cluster.maxScore = req.maxScore;
                }
                if (call.request.hasScore()) {
                    cluster.score = req.score;
                }
                if (call.request.hasCordoned()) {
                    cluster.state = mapCordoned(req.cordoned);
                }
                if (call.request.hasAdmissionConstraint()) {
                    const modification = call.request.getAdmissionConstraint();
                    const constraint = mapAdmissionConstraint(modification?.getConstraint());
                    if (modification && constraint) {
                        if (modification.getAdd()) {
                            cluster.admissionConstraints = (cluster.admissionConstraints || []).concat([constraint]);
                        } else {
                            // Remove: drop every stored constraint that matches the given one.
                            cluster.admissionConstraints = cluster.admissionConstraints?.filter((existing) => {
                                if (existing.type !== constraint.type) {
                                    return true;
                                }
                                switch (existing.type) {
                                    case "has-feature-preview":
                                        return false;
                                    case "has-permission":
                                        return (
                                            existing.permission !==
                                            (constraint as AdmissionConstraintHasPermission).permission
                                        );
                                    default:
                                        return true;
                                }
                            });
                        }
                    }
                }
                if (call.request.hasTls()) {
                    const tls = req.tls;
                    if (!tls?.ca || !tls?.crt || !tls?.key) {
                        throw new GRPCError(grpc.status.INVALID_ARGUMENT, "missing required TLS config");
                    }
                    if (tls.ca === cluster.tls?.ca && tls.crt === cluster.tls?.crt && tls.key === cluster.tls?.key) {
                        // TLS config is unchanged: succeed without saving or reconciling.
                        // NOTE(review): any other field changes applied above are not persisted on this
                        // path — presumably a request only ever carries one kind of change; confirm.
                        callback(null, new UpdateResponse());
                        return;
                    }

                    // Probe with a copy of the stored cluster that carries the new TLS config.
                    const probe: WorkspaceCluster = {
                        name: req.name,
                        url: cluster.url,
                        region: cluster.region,
                        state: cluster.state,
                        score: cluster.score,
                        maxScore: cluster.maxScore,
                        govern: cluster.govern,
                        tls: tls,
                        admissionConstraints: cluster.admissionConstraints,
                    };

                    // try to connect to validate the config. Rejects if it fails.
                    await this.probeCluster(probe);
                    cluster.tls = tls;
                }
                await this.clusterDB.save(cluster);
                log.info({}, "cluster updated", { cluster: req.name });
                this.triggerReconcile("update", req.name);

                callback(null, new UpdateResponse());
            } catch (err) {
                callback(mapToGRPCError(err), null);
            }
        });
    }

    /**
     * Removes a workspace cluster from the DB.
     *
     * If regular running instances are still associated with the cluster, the request fails with
     * FAILED_PRECONDITION unless `force` is set; a forced removal is logged.
     */
    public deregister(
        call: grpc.ServerUnaryCall<DeregisterRequest, DeregisterResponse>,
        callback: grpc.sendUnaryData<DeregisterResponse>,
    ) {
        log.info("requested clusters.deregister", getClientInfo(call));
        this.queue.enqueue(async () => {
            try {
                const req = call.request.toObject();

                const instances = await this.workspaceDB.findRegularRunningInstances();
                const relevantInstances = instances.filter((i) => i.region === req.name);
                if (relevantInstances.length > 0) {
                    if (!req.force) {
                        throw new GRPCError(
                            grpc.status.FAILED_PRECONDITION,
                            `cluster is not empty (${relevantInstances.length} instances remaining)[${relevantInstances
                                .map((i) => i.id)
                                .join(",")}]`,
                        );
                    }
                    // Only reached when the caller explicitly forced the deregistration.
                    log.info({}, "forced cluster deregistration even though there are still instances running", {
                        cluster: req.name,
                    });
                }

                await this.clusterDB.deleteByName(req.name);
                log.info({}, "cluster deregistered", { cluster: req.name });
                this.triggerReconcile("deregister", req.name);

                callback(null, new DeregisterResponse());
            } catch (err) {
                callback(mapToGRPCError(err), null);
            }
        });
    }

    /**
     * Lists all known clusters: first those stored in the DB, then those reported by the
     * composite client provider source that are not in the DB (flagged as `static`).
     */
    public list(call: grpc.ServerUnaryCall<ListRequest, ListResponse>, callback: grpc.sendUnaryData<ListResponse>) {
        log.info("requested clusters.list", getClientInfo(call));
        this.queue.enqueue(async () => {
            try {
                const response = new ListResponse();

                const dbClusterNames = new Set<string>();
                const allDBClusters = await this.clusterDB.findFiltered({});
                for (const cluster of allDBClusters) {
                    response.addStatus(convertToGRPC(cluster));
                    dbClusterNames.add(cluster.name);
                }

                const allClusters = await this.allClientProvider.getAllWorkspaceClusters();
                for (const cluster of allClusters) {
                    if (dbClusterNames.has(cluster.name)) {
                        continue;
                    }

                    const clusterStatus = convertToGRPC(cluster);
                    clusterStatus.setStatic(true);
                    response.addStatus(clusterStatus);
                }

                callback(null, response);
            } catch (err) {
                callback(mapToGRPCError(err), null);
            }
        });
    }

    /**
     * Opens a connection to the given cluster and issues a DescribeCluster call.
     *
     * @returns the cluster's description
     * @throws GRPCError with FAILED_PRECONDITION (via promise rejection) if the call fails
     */
    private probeCluster(cluster: WorkspaceCluster): Promise<DescribeClusterResponse> {
        return new Promise<DescribeClusterResponse>((resolve, reject) => {
            const client = this.clientProvider.createConnection(WorkspaceManagerClient, cluster);
            client.describeCluster(new DescribeClusterRequest(), (err: any, resp: DescribeClusterResponse) => {
                if (err) {
                    reject(
                        new GRPCError(grpc.status.FAILED_PRECONDITION, `cannot reach ${cluster.url}: ${err.message}`),
                    );
                } else {
                    resolve(resp);
                }
            });
        });
    }

    /** Kicks off a reconcile without waiting for it; failures are only logged. */
    private triggerReconcile(action: string, name: string) {
        const payload = { action, name };
        log.info("reconcile: on request", payload);
        this.bridgeController
            .runReconcileNow()
            .catch((err) => log.error("error during forced reconcile", err, payload));
    }
}

/** Converts a stored cluster (without TLS data) into its gRPC `ClusterStatus` representation. */
function convertToGRPC(ws: WorkspaceClusterWoTLS): ClusterStatus {
    const clusterStatus = new ClusterStatus();
    clusterStatus.setName(ws.name);
    clusterStatus.setUrl(ws.url);
    clusterStatus.setState(mapClusterState(ws.state));
    clusterStatus.setScore(ws.score);
    clusterStatus.setMaxScore(ws.maxScore);
    clusterStatus.setGoverned(ws.govern);
    clusterStatus.setRegion(ws.region);

    ws.admissionConstraints?.forEach((c) => {
        const constraint = new GRPCAdmissionConstraint();
        switch (c.type) {
            case "has-feature-preview":
                constraint.setHasFeaturePreview(new GRPCAdmissionConstraint.FeaturePreview());
                break;
            case "has-permission": {
                const perm = new GRPCAdmissionConstraint.HasPermission();
                perm.setPermission(c.permission);
                constraint.setHasPermission(perm);
                break;
            }
            default:
                // Constraint types without a gRPC counterpart are skipped.
                return;
        }
        clusterStatus.addAdmissionConstraint(constraint);
    });
    return clusterStatus;
}

/**
 * Converts a gRPC admission constraint into the protocol representation.
 *
 * @returns `undefined` if no constraint is given, if it is neither a feature-preview nor a
 *          permission constraint, or if a permission constraint carries no permission.
 */
function mapAdmissionConstraint(c: GRPCAdmissionConstraint | undefined): AdmissionConstraint | undefined {
    if (!c) {
        return;
    }

    if (c.hasHasFeaturePreview()) {
        return <AdmissionConstraint>{ type: "has-feature-preview" };
    }
    if (c.hasHasPermission()) {
        const permission = c.getHasPermission()?.getPermission();
        if (!permission) {
            return;
        }

        return <AdmissionConstraintHasPermission>{ type: "has-permission", permission };
    }
    return;
}

/** Maps a preferability hint to a score: PREFER → 100, NONE → 50, DONTSCHEDULE → 0, anything else → undefined. */
function mapPreferabilityToScore(p: Preferability): number | undefined {
    switch (p) {
        case Preferability.PREFER:
            return 100;
        case Preferability.NONE:
            return 50;
        case Preferability.DONTSCHEDULE:
            return 0;
        default:
            return undefined;
    }
}

/** Maps the `cordoned` flag to the corresponding cluster state. */
function mapCordoned(cordoned: boolean): WorkspaceClusterState {
    return cordoned ? "cordoned" : "available";
}

/** Maps the protocol cluster state to its gRPC enum value. */
function mapClusterState(state: WorkspaceClusterState): ClusterState {
    switch (state) {
        case "available":
            return ClusterState.AVAILABLE;
        case "cordoned":
            return ClusterState.CORDONED;
        case "draining":
            return ClusterState.DRAINING;
    }
}

/** Passes GRPCErrors through unchanged and wraps everything else in an INTERNAL GRPCError. */
function mapToGRPCError(err: any): any {
    if (!GRPCError.isGRPCError(err)) {
        return new GRPCError(grpc.status.INTERNAL, err);
    }
    return err;
}

/**
 * Extracts caller information for logging: the peer address plus the first value of the
 * `client-name` and `user-agent` metadata entries (empty string if absent).
 */
function getClientInfo(call: grpc.ServerUnaryCall<any, any>) {
    const clientNameHeader = call.metadata.get("client-name");
    const userAgentHeader = call.metadata.get("user-agent");

    const clientName = clientNameHeader.length !== 0 ? clientNameHeader[0].toString() : "";
    const userAgent = userAgentHeader.length !== 0 ? userAgentHeader[0].toString() : "";
    const clientIP = call.getPeer();
    return { clientIP, clientName, userAgent };
}

// "grpc" does not allow additional methods on its "ServiceServer"s so we have an additional wrapper here
@injectable()
export class ClusterServiceServer {
    constructor(
        @inject(Configuration) private readonly config: Configuration,
        @inject(ClusterService) private readonly service: ClusterService,
    ) {}

    private server: grpc.Server | undefined = undefined;

    /**
     * Creates the gRPC server, binds it (insecure credentials) to the configured host/port and
     * starts it.
     *
     * Resolves once the server is listening; rejects if binding fails, so callers can handle
     * the failure instead of it surfacing as an exception thrown from the bind callback.
     */
    public async start(): Promise<void> {
        // Default value for maxSessionMemory is 10 which is low for this gRPC server
        // See https://nodejs.org/api/http2.html#http2_http2_connect_authority_options_listener.
        const server = new grpc.Server({
            "grpc-node.max_session_memory": 50,
        });
        // @ts-ignore
        server.addService(ClusterServiceService, this.service);
        this.server = server;

        const bindTo = `${this.config.clusterService.host}:${this.config.clusterService.port}`;
        await new Promise<void>((resolve, reject) => {
            server.bindAsync(bindTo, grpc.ServerCredentials.createInsecure(), (err) => {
                if (err) {
                    reject(err);
                    return;
                }

                log.info(`gRPC server listening on: ${bindTo}`);
                server.start();
                resolve();
            });
        });
    }

    /** Attempts a graceful shutdown of the server (if one was started) and forgets it afterwards. */
    public async stop(): Promise<void> {
        const server = this.server;
        if (server !== undefined) {
            await new Promise<void>((resolve) => {
                // The shutdown result is intentionally not inspected; we resolve either way.
                server.tryShutdown(() => resolve());
            });
            this.server = undefined;
        }
    }
}