Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
gitpod-io
GitHub Repository: gitpod-io/gitpod
Path: blob/main/components/gitpod-protocol/src/util/nice-grpc.ts
2500 views
1
/**
 * Copyright (c) 2022 Gitpod GmbH. All rights reserved.
 * Licensed under the GNU Affero General Public License (AGPL).
 * See License.AGPL.txt in the project root for license information.
 */
6
7
import { isAbortError } from "abort-controller-x";
8
import {
9
CallOptions,
10
ClientError,
11
ClientMiddleware,
12
ClientMiddlewareCall,
13
Status,
14
MethodDescriptor,
15
} from "nice-grpc-common";
16
import { GrpcMethodType, IClientCallMetrics } from "./grpc";
17
18
/**
 * Derives the metric labels for a gRPC method descriptor.
 *
 * The call type is computed from the descriptor's `requestStream` and `responseStream` flags. The
 * service and method names are the two "/"-separated segments of `method.path` that follow the
 * first "/".
 *
 * @param method descriptor of the method being called
 * @returns an object with the `type`, `service` and `method` labels for the call
 */
function getLabels(method: MethodDescriptor) {
    const callType = method.requestStream
        ? method.responseStream
            ? "bidi_stream"
            : "client_stream"
        : method.responseStream
        ? "server_stream"
        : "unary";
    const { path } = method;
    // Skip the segment before the first "/"; the next two segments are the service and the method.
    const [serviceName, methodName] = path.split("/").slice(1);

    return {
        type: callType as GrpcMethodType,
        service: serviceName,
        method: methodName,
    };
}
35
36
/**
 * Wraps an async iterable so that `callback` is invoked once for every item that passes through.
 *
 * The callback runs before the item is yielded to the consumer; items are forwarded unchanged and
 * in their original order.
 *
 * @param iterable source of stream messages
 * @param callback invoked once per message, before that message is yielded
 * @returns an async iterable yielding the same items as `iterable`
 */
async function* incrementStreamMessagesCounter<T>(iterable: AsyncIterable<T>, callback: () => void): AsyncIterable<T> {
    for await (const item of iterable) {
        callback();
        yield item;
    }
}
42
43
/**
 * Creates a client middleware that reports call metrics to the given `metrics` sink.
 *
 * For every call the middleware:
 *  - reports the call as started and starts a handle timer,
 *  - for streaming requests / responses, reports each sent / received message,
 *  - on completion, stops the timer and reports the call as handled, both tagged with the name of
 *    the resulting status code.
 *
 * The status code is `OK` on normal completion, the error's own code for a `ClientError`,
 * `CANCELLED` for abort errors, and `UNKNOWN` for any other error. Errors are always re-thrown.
 *
 * @param metrics sink receiving the started / sent / received / handled events and the timer
 * @returns the middleware to install on a client
 */
export function prometheusClientMiddleware(metrics: IClientCallMetrics): ClientMiddleware {
    return async function* prometheusClientMiddlewareGenerator<Request, Response>(
        call: ClientMiddlewareCall<Request, Response>,
        options: CallOptions,
    ): AsyncGenerator<Response, Response | void, undefined> {
        const labels = getLabels(call.method);

        metrics.started(labels);

        const stopTimer = metrics.startHandleTimer(labels);

        // `settled` stays false only if the generator is left without completing or throwing,
        // e.g. when the consumer closes it early; the `finally` block treats that as a cancellation.
        let settled = false;
        let status: Status = Status.OK;

        try {
            let request;

            if (!call.requestStream) {
                request = call.request;
            } else {
                // Count every outgoing message of a streaming request.
                request = incrementStreamMessagesCounter(call.request, metrics.sent.bind(metrics, labels));
            }

            if (!call.responseStream) {
                const response = yield* call.next(request, options);
                settled = true;
                return response;
            } else {
                // Count every incoming message of a streaming response while forwarding it.
                yield* incrementStreamMessagesCounter(
                    call.next(request, options),
                    metrics.received.bind(metrics, labels),
                );
                settled = true;
                return;
            }
        } catch (err) {
            settled = true;
            if (err instanceof ClientError) {
                status = err.code;
            } else if (isAbortError(err)) {
                status = Status.CANCELLED;
            } else {
                status = Status.UNKNOWN;
            }
            throw err;
        } finally {
            if (!settled) {
                status = Status.CANCELLED;
            }
            // `Status[status]` is the name of the status code, used as the label value.
            stopTimer({ grpc_code: Status[status] });
            metrics.handled({ ...labels, code: Status[status] });
        }
    };
}
97
98