Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
gitpod-io
GitHub Repository: gitpod-io/gitpod
Path: blob/main/components/ws-manager-bridge/src/cluster-service-server.ts
2498 views
1
/**
2
* Copyright (c) 2021 Gitpod GmbH. All rights reserved.
3
* Licensed under the GNU Affero General Public License (AGPL).
4
* See License.AGPL.txt in the project root for license information.
5
*/
6
7
import { WorkspaceDB } from "@gitpod/gitpod-db/lib/workspace-db";
8
import { Queue } from "@gitpod/gitpod-protocol";
9
import { log } from "@gitpod/gitpod-protocol/lib/util/logging";
10
import {
11
WorkspaceCluster,
12
WorkspaceClusterDB,
13
WorkspaceClusterState,
14
TLSConfig,
15
AdmissionConstraint,
16
AdmissionConstraintHasPermission,
17
WorkspaceClusterWoTLS,
18
WorkspaceClass,
19
} from "@gitpod/gitpod-protocol/lib/workspace-cluster";
20
import {
21
ClusterServiceService,
22
ClusterState,
23
ClusterStatus,
24
DeregisterRequest,
25
DeregisterResponse,
26
IClusterServiceServer,
27
ListRequest,
28
ListResponse,
29
Preferability,
30
RegisterRequest,
31
RegisterResponse,
32
UpdateRequest,
33
UpdateResponse,
34
AdmissionConstraint as GRPCAdmissionConstraint,
35
} from "@gitpod/ws-manager-bridge-api/lib";
36
import { WorkspaceManagerClientProvider } from "@gitpod/ws-manager/lib/client-provider";
37
import {
38
WorkspaceManagerClientProviderCompositeSource,
39
WorkspaceManagerClientProviderSource,
40
} from "@gitpod/ws-manager/lib/client-provider-source";
41
import * as grpc from "@grpc/grpc-js";
42
import { inject, injectable } from "inversify";
43
import { BridgeController } from "./bridge-controller";
44
import { Configuration } from "./config";
45
import { GRPCError } from "./rpc";
46
import { isWorkspaceRegion } from "@gitpod/gitpod-protocol/lib/workspace-cluster";
47
import { DescribeClusterRequest, DescribeClusterResponse, WorkspaceManagerClient } from "@gitpod/ws-manager/lib";
48
49
/**
 * Network endpoint settings for the cluster service gRPC server.
 */
export interface ClusterServiceServerOptions {
    /** Host name or address of the endpoint. */
    host: string;
    /** Port number of the endpoint. */
    port: number;
}
53
54
/**
 * gRPC handlers for managing workspace clusters: register, update, deregister and list.
 *
 * Every handler enqueues its work on a shared `Queue` and reports its outcome through the
 * gRPC `callback`. Errors that are not already a `GRPCError` are wrapped by `mapToGRPCError`.
 */
@injectable()
export class ClusterService implements IClusterServiceServer {
    // Satisfy the grpc.UntypedServiceImplementation interface.
    [name: string]: any;

    constructor(
        @inject(WorkspaceClusterDB) private readonly clusterDB: WorkspaceClusterDB,
        @inject(WorkspaceDB) private readonly workspaceDB: WorkspaceDB,
        @inject(BridgeController) private readonly bridgeController: BridgeController,
        @inject(WorkspaceManagerClientProvider) private readonly clientProvider: WorkspaceManagerClientProvider,
        @inject(WorkspaceManagerClientProviderCompositeSource)
        private readonly allClientProvider: WorkspaceManagerClientProviderSource,
    ) {}

    // using a queue to make sure we do concurrency right
    private readonly queue: Queue = new Queue();

    /**
     * Registers a new workspace cluster.
     *
     * Responds with INVALID_ARGUMENT for an invalid region, an unknown preferability or a
     * missing TLS config, with ALREADY_EXISTS if the name or URL is already stored, and with
     * FAILED_PRECONDITION if the cluster cannot be reached. On success the cluster is saved
     * and a reconcile is triggered.
     *
     * @param call the unary call carrying the RegisterRequest
     * @param callback invoked exactly once with either an error or a RegisterResponse
     */
    public register(
        call: grpc.ServerUnaryCall<RegisterRequest, RegisterResponse>,
        callback: grpc.sendUnaryData<RegisterResponse>,
    ) {
        log.info("requested clusters.register", getClientInfo(call));
        this.queue.enqueue(async () => {
            try {
                const req = call.request.toObject();

                if (!isWorkspaceRegion(req.region)) {
                    throw new GRPCError(grpc.status.INVALID_ARGUMENT, `Invalid value for workspace region.`);
                }

                // check if the name or URL are already registered/in use
                const [clusterByName, clusterByUrl] = await Promise.all([
                    this.clusterDB.findByName(req.name),
                    this.clusterDB.findFiltered({ url: req.url }),
                ]);

                if (!!clusterByName) {
                    throw new GRPCError(
                        grpc.status.ALREADY_EXISTS,
                        `a WorkspaceCluster with name ${req.name} already exists in the DB`,
                    );
                }
                if (!!clusterByUrl && clusterByUrl.length > 0) {
                    throw new GRPCError(
                        grpc.status.ALREADY_EXISTS,
                        `a WorkspaceCluster with url ${req.url} already exists in the DB`,
                    );
                }

                // store the ws-manager into the database
                // ("perfereability" is the field name used by the request object; keep the spelling)
                let perfereability = Preferability.NONE;
                const govern = true;
                let state: WorkspaceClusterState = "available";
                if (req.hints) {
                    perfereability = req.hints.perfereability;
                    state = mapCordoned(req.hints.cordoned);
                }
                const score = mapPreferabilityToScore(perfereability);
                if (score === undefined) {
                    throw new GRPCError(grpc.status.INVALID_ARGUMENT, `unknown preferability ${perfereability}`);
                }

                if (!req.tls) {
                    throw new GRPCError(grpc.status.INVALID_ARGUMENT, "missing required TLS config");
                }
                // we assume that client's have already base64-encoded their input!
                const tls: TLSConfig = {
                    ca: req.tls.ca,
                    crt: req.tls.crt,
                    key: req.tls.key,
                };

                // Constraints that cannot be mapped are dropped.
                const admissionConstraints = call.request
                    .getAdmissionConstraintsList()
                    .map(mapAdmissionConstraint)
                    .filter((c) => !!c) as AdmissionConstraint[];

                const newCluster: WorkspaceCluster = {
                    name: req.name,
                    url: req.url,
                    region: req.region,
                    state,
                    score,
                    maxScore: 100,
                    govern,
                    tls,
                    admissionConstraints,
                };

                // Make a test call to the cluster to validate the TLS config. Throws if it fails.
                // We use this test call to gather the available workspace classes.
                const clusterDesc = await this.probeCluster(newCluster);
                newCluster.preferredWorkspaceClass = clusterDesc.getPreferredWorkspaceClass();
                newCluster.availableWorkspaceClasses = clusterDesc.getWorkspaceClassesList().map((c) => {
                    return <WorkspaceClass>{
                        creditsPerMinute: c.getCreditsPerMinute(),
                        description: c.getDescription(),
                        displayName: c.getDisplayName(),
                        id: c.getId(),
                    };
                });

                await this.clusterDB.save(newCluster);
                log.info({}, "cluster registered", { cluster: req.name });
                this.triggerReconcile("register", req.name);

                callback(null, new RegisterResponse());
            } catch (err) {
                callback(mapToGRPCError(err), null);
            }
        });
    }

    /**
     * Updates properties of an existing cluster: max score, score, cordoned state, a single
     * admission constraint (add or remove) and/or the TLS config.
     *
     * Responds with NOT_FOUND if no cluster with the given name is stored, with
     * INVALID_ARGUMENT for an incomplete TLS config, and with FAILED_PRECONDITION if the
     * cluster cannot be reached with a new TLS config.
     *
     * @param call the unary call carrying the UpdateRequest
     * @param callback invoked exactly once with either an error or an UpdateResponse
     */
    public update(
        call: grpc.ServerUnaryCall<UpdateRequest, UpdateResponse>,
        callback: grpc.sendUnaryData<UpdateResponse>,
    ) {
        log.info("requested clusters.update", getClientInfo(call));
        this.queue.enqueue(async () => {
            try {
                const req = call.request.toObject();
                const cluster = await this.clusterDB.findByName(req.name);
                if (!cluster) {
                    throw new GRPCError(
                        grpc.status.NOT_FOUND,
                        `a WorkspaceCluster with name ${req.name} does not exist in the DB!`,
                    );
                }

                if (call.request.hasMaxScore()) {
                    cluster.maxScore = req.maxScore;
                }
                if (call.request.hasScore()) {
                    cluster.score = req.score;
                }
                if (call.request.hasCordoned()) {
                    cluster.state = mapCordoned(req.cordoned);
                }
                if (call.request.hasAdmissionConstraint()) {
                    const mod = call.request.getAdmissionConstraint()!;
                    const c = mapAdmissionConstraint(mod.getConstraint());
                    if (!!c) {
                        if (mod.getAdd()) {
                            cluster.admissionConstraints = (cluster.admissionConstraints || []).concat([c]);
                        } else {
                            // Remove: keep every constraint that does not match `c`.
                            cluster.admissionConstraints = cluster.admissionConstraints?.filter((v) => {
                                if (v.type !== c.type) {
                                    return true;
                                }

                                switch (v.type) {
                                    case "has-feature-preview":
                                        return false;
                                    case "has-permission":
                                        if (v.permission === (c as AdmissionConstraintHasPermission).permission) {
                                            return false;
                                        }
                                        break;
                                }
                                return true;
                            });
                        }
                    }
                }
                if (call.request.hasTls()) {
                    const tls = req.tls;
                    if (!tls?.ca || !tls?.crt || !tls?.key) {
                        throw new GRPCError(grpc.status.INVALID_ARGUMENT, "missing required TLS config");
                    }
                    if (tls.ca === cluster.tls?.ca && tls.crt === cluster.tls?.crt && tls.key === cluster.tls?.key) {
                        // NOTE(review): this returns without saving, so any other field set in the
                        // same request is dropped. Harmless only if a request carries a single
                        // property at a time — confirm against the API definition.
                        callback(null, new UpdateResponse());
                        return;
                    }

                    const newCluster: WorkspaceCluster = {
                        name: req.name,
                        url: cluster.url,
                        region: cluster.region,
                        state: cluster.state,
                        score: cluster.score,
                        maxScore: 100,
                        govern: cluster.govern,
                        tls: tls,
                        admissionConstraints: cluster.admissionConstraints,
                    };

                    // try to connect to validate the config. Throws an exception if it fails.
                    await this.probeCluster(newCluster);
                    cluster.tls = tls;
                }
                await this.clusterDB.save(cluster);
                log.info({}, "cluster updated", { cluster: req.name });
                this.triggerReconcile("update", req.name);

                callback(null, new UpdateResponse());
            } catch (err) {
                callback(mapToGRPCError(err), null);
            }
        });
    }

    /**
     * Removes a cluster from the DB by name.
     *
     * If regular running instances whose `region` equals the cluster name still exist, the
     * request is rejected with FAILED_PRECONDITION unless `force` is set, in which case the
     * deregistration proceeds and the fact is logged.
     *
     * @param call the unary call carrying the DeregisterRequest
     * @param callback invoked exactly once with either an error or a DeregisterResponse
     */
    public deregister(
        call: grpc.ServerUnaryCall<DeregisterRequest, DeregisterResponse>,
        callback: grpc.sendUnaryData<DeregisterResponse>,
    ) {
        log.info("requested clusters.deregister", getClientInfo(call));
        this.queue.enqueue(async () => {
            try {
                const req = call.request.toObject();

                const instances = await this.workspaceDB.findRegularRunningInstances();
                const relevantInstances = instances.filter((i) => i.region === req.name);
                if (relevantInstances.length > 0) {
                    if (!req.force) {
                        throw new GRPCError(
                            grpc.status.FAILED_PRECONDITION,
                            `cluster is not empty (${relevantInstances.length} instances remaining)[${relevantInstances
                                .map((i) => i.id)
                                .join(",")}]`,
                        );
                    }
                    // Only reached when the caller explicitly forced the deregistration.
                    log.info({}, "forced cluster deregistration even though there are still instances running", {
                        cluster: req.name,
                    });
                }

                await this.clusterDB.deleteByName(req.name);
                log.info({}, "cluster deregistered", { cluster: req.name });
                this.triggerReconcile("deregister", req.name);

                callback(null, new DeregisterResponse());
            } catch (err) {
                callback(mapToGRPCError(err), null);
            }
        });
    }

    /**
     * Lists all clusters: first those stored in the DB, then those reported by the composite
     * client provider source whose name is not in the DB; the latter are flagged as static.
     *
     * @param call the unary call carrying the ListRequest
     * @param callback invoked exactly once with either an error or a ListResponse
     */
    public list(call: grpc.ServerUnaryCall<ListRequest, ListResponse>, callback: grpc.sendUnaryData<ListResponse>) {
        log.info("requested clusters.list", getClientInfo(call));
        this.queue.enqueue(async () => {
            try {
                const response = new ListResponse();

                const dbClusterNames = new Set<string>();
                const allDBClusters = await this.clusterDB.findFiltered({});
                for (const cluster of allDBClusters) {
                    response.addStatus(convertToGRPC(cluster));
                    dbClusterNames.add(cluster.name);
                }

                const allCluster = await this.allClientProvider.getAllWorkspaceClusters();
                for (const cluster of allCluster) {
                    if (dbClusterNames.has(cluster.name)) {
                        continue;
                    }

                    const clusterStatus = convertToGRPC(cluster);
                    clusterStatus.setStatic(true);
                    response.addStatus(clusterStatus);
                }

                callback(null, response);
            } catch (err) {
                callback(mapToGRPCError(err), null);
            }
        });
    }

    /**
     * Opens a connection to the given cluster and issues a DescribeCluster call.
     *
     * @param cluster the cluster config (URL, TLS) to connect with
     * @returns the DescribeCluster response
     * @throws GRPCError with FAILED_PRECONDITION if the call reports an error
     */
    private probeCluster(cluster: WorkspaceCluster): Promise<DescribeClusterResponse> {
        return new Promise<DescribeClusterResponse>((resolve, reject) => {
            const client = this.clientProvider.createConnection(WorkspaceManagerClient, cluster);
            client.describeCluster(new DescribeClusterRequest(), (err: any, resp: DescribeClusterResponse) => {
                if (err) {
                    reject(
                        new GRPCError(grpc.status.FAILED_PRECONDITION, `cannot reach ${cluster.url}: ${err.message}`),
                    );
                } else {
                    resolve(resp);
                }
            });
        });
    }

    /**
     * Kicks off an immediate reconcile on the bridge controller without waiting for it;
     * a failure is logged and does not affect the gRPC response.
     */
    private triggerReconcile(action: string, name: string) {
        const payload = { action, name };
        log.info("reconcile: on request", payload);
        this.bridgeController
            .runReconcileNow()
            .catch((err) => log.error("error during forced reconcile", err, payload));
    }
}
363
364
/**
 * Builds the gRPC `ClusterStatus` message for a workspace cluster.
 *
 * Copies name, URL, state, score, max score, governed flag and region, and converts each
 * admission constraint of a known type; constraints of any other type are skipped.
 *
 * @param ws the cluster (without TLS material) to convert
 * @returns the populated ClusterStatus message
 */
function convertToGRPC(ws: WorkspaceClusterWoTLS): ClusterStatus {
    const status = new ClusterStatus();
    status.setName(ws.name);
    status.setUrl(ws.url);
    status.setState(mapClusterState(ws.state));
    status.setScore(ws.score);
    status.setMaxScore(ws.maxScore);
    status.setGoverned(ws.govern);
    status.setRegion(ws.region);

    for (const source of ws.admissionConstraints ?? []) {
        const target = new GRPCAdmissionConstraint();
        switch (source.type) {
            case "has-feature-preview": {
                target.setHasFeaturePreview(new GRPCAdmissionConstraint.FeaturePreview());
                break;
            }
            case "has-permission": {
                const hasPermission = new GRPCAdmissionConstraint.HasPermission();
                hasPermission.setPermission(source.permission);
                target.setHasPermission(hasPermission);
                break;
            }
            default:
                // unknown constraint type: do not add it to the status
                continue;
        }
        status.addAdmissionConstraint(target);
    }

    return status;
}
392
393
/**
 * Converts a gRPC admission constraint into its protocol counterpart.
 *
 * @param c the gRPC constraint, possibly absent
 * @returns a "has-feature-preview" or "has-permission" constraint, or `undefined` when `c`
 *          is absent, carries neither variant, or carries a permission variant with an
 *          empty/missing permission
 */
function mapAdmissionConstraint(c: GRPCAdmissionConstraint | undefined): AdmissionConstraint | undefined {
    if (!c) {
        return undefined;
    }

    if (c.hasHasFeaturePreview()) {
        return { type: "has-feature-preview" } as AdmissionConstraint;
    }

    if (!c.hasHasPermission()) {
        return undefined;
    }

    const permission = c.getHasPermission()?.getPermission();
    return permission ? ({ type: "has-permission", permission } as AdmissionConstraintHasPermission) : undefined;
}
411
412
/**
 * Translates a preferability hint into a numeric cluster score.
 *
 * @param p the preferability value from the request
 * @returns 100 for PREFER, 50 for NONE, 0 for DONTSCHEDULE, `undefined` for any other value
 */
function mapPreferabilityToScore(p: Preferability): number | undefined {
    if (p === Preferability.PREFER) {
        return 100;
    }
    if (p === Preferability.NONE) {
        return 50;
    }
    if (p === Preferability.DONTSCHEDULE) {
        return 0;
    }
    return undefined;
}
424
425
/**
 * Maps the boolean "cordoned" flag onto a cluster state.
 *
 * @param cordoned whether the cluster is cordoned
 * @returns "cordoned" when the flag is truthy, "available" otherwise
 */
function mapCordoned(cordoned: boolean): WorkspaceClusterState {
    if (cordoned) {
        return "cordoned";
    }
    return "available";
}
428
429
/**
 * Maps a protocol cluster state string onto the gRPC `ClusterState` enum.
 *
 * The switch has no default branch: it handles "available", "cordoned" and "draining",
 * and any other runtime value falls through, yielding `undefined`.
 *
 * @param state the cluster state as stored on the cluster
 * @returns the matching ClusterState enum member
 */
function mapClusterState(state: WorkspaceClusterState): ClusterState {
    switch (state) {
        case "available": {
            return ClusterState.AVAILABLE;
        }
        case "cordoned": {
            return ClusterState.CORDONED;
        }
        case "draining": {
            return ClusterState.DRAINING;
        }
    }
}
439
440
/**
 * Ensures an error is a `GRPCError` before it is handed to a gRPC callback.
 *
 * @param err any thrown value
 * @returns `err` itself when it already is a GRPCError, otherwise a new GRPCError with
 *          status INTERNAL wrapping it
 */
function mapToGRPCError(err: any): any {
    if (GRPCError.isGRPCError(err)) {
        return err;
    }
    return new GRPCError(grpc.status.INTERNAL, err);
}
446
447
/**
 * Extracts caller details from a unary call for logging.
 *
 * @param call the incoming gRPC call
 * @returns the peer address plus the first "client-name" and "user-agent" metadata values
 *          (empty strings when a header is absent)
 */
function getClientInfo(call: grpc.ServerUnaryCall<any, any>) {
    // First value of a metadata key as a string, or "" if the key has no values.
    const firstValue = (key: string): string => {
        const values = call.metadata.get(key);
        return values.length !== 0 ? values[0].toString() : "";
    };

    const clientName = firstValue("client-name");
    const userAgent = firstValue("user-agent");
    const clientIP = call.getPeer();

    return { clientIP, clientName, userAgent };
}
461
462
// "grpc" does not allow additional methods on its "ServiceServer"s, so we have an additional wrapper here
463
/**
 * Owns the gRPC server that exposes `ClusterService` on the configured host and port.
 */
@injectable()
export class ClusterServiceServer {
    constructor(
        @inject(Configuration) private readonly config: Configuration,
        @inject(ClusterService) private readonly service: ClusterService,
    ) {}

    private server: grpc.Server | undefined = undefined;

    /**
     * Creates the gRPC server, binds it (without transport security) to
     * `config.clusterService.host:port` and starts it.
     *
     * The returned promise settles only once binding has finished, so callers that await it
     * can observe a bind failure instead of it being thrown from inside the bind callback.
     *
     * @throws the bind error reported by `bindAsync`, as a rejection
     */
    public async start(): Promise<void> {
        // Default value for maxSessionMemory is 10 which is low for this gRPC server
        // See https://nodejs.org/api/http2.html#http2_http2_connect_authority_options_listener.
        const server = new grpc.Server({
            "grpc-node.max_session_memory": 50,
        });
        // @ts-ignore
        server.addService(ClusterServiceService, this.service);
        this.server = server;

        const bindTo = `${this.config.clusterService.host}:${this.config.clusterService.port}`;
        await new Promise<void>((resolve, reject) => {
            server.bindAsync(bindTo, grpc.ServerCredentials.createInsecure(), (err) => {
                if (err) {
                    reject(err);
                    return;
                }

                log.info(`gRPC server listening on: ${bindTo}`);
                server.start();
                resolve();
            });
        });
    }

    /**
     * Shuts the server down via `tryShutdown` and forgets it. Does nothing if the server was
     * never started. A shutdown error is logged but does not reject, keeping stop best-effort.
     */
    public async stop(): Promise<void> {
        const server = this.server;
        if (server !== undefined) {
            await new Promise<void>((resolve) => {
                server.tryShutdown((err) => {
                    if (err) {
                        log.error("error during gRPC server shutdown", err);
                    }
                    resolve();
                });
            });
            this.server = undefined;
        }
    }
}
503
504