Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
gitpod-io
GitHub Repository: gitpod-io/gitpod
Path: blob/main/components/gitpod-protocol/src/util/grpc.ts
2500 views
1
/**
2
* Copyright (c) 2021 Gitpod GmbH. All rights reserved.
3
* Licensed under the GNU Affero General Public License (AGPL).
4
* See License.AGPL.txt in the project root for license information.
5
*/
6
7
import * as grpc from "@grpc/grpc-js";
8
import { Status } from "@grpc/grpc-js/build/src/constants";
9
import { log } from "./logging";
10
import { TrustedValue } from "./scrubbing";
11
12
/**
 * Default gRPC channel options exported for reuse by client code.
 *
 * Keys are channel-argument names; the `_ms` suffixed ones are expressed in milliseconds.
 */
export const defaultGRPCOptions = {
    "grpc.keepalive_timeout_ms": 10000,
    "grpc.keepalive_time_ms": 60000,
    "grpc.http2.min_time_between_pings_ms": 10000,
    "grpc.keepalive_permit_without_calls": 1,
    "grpc-node.max_session_memory": 50,
    "grpc.max_reconnect_backoff_ms": 5000,
    // 16 * 1024 * 1024 = 16777216
    "grpc.max_receive_message_length": 1024 * 1024 * 16,
    // default is 30s, which is too long for us during rollouts (where service DNS entries are updated)
    "grpc.dns_min_time_between_resolutions_ms": 2000,
};
23
24
/** The four gRPC call shapes, distinguished by whether the request and/or the response is a stream. */
export type GrpcMethodType = "unary" | "client_stream" | "server_stream" | "bidi_stream";
25
26
/** Labels attached to every per-call gRPC metric. */
export interface IGrpcCallMetricsLabels {
    /** Service part of the gRPC method path (`/<service>/<method>`). */
    service: string;
    /** Method part of the gRPC method path (the segment after the last `/`). */
    method: string;
    /** Streaming shape of the call. */
    type: GrpcMethodType;
}
31
32
/** Call labels extended with the outcome of the call. */
export interface IGrpcCallMetricsLabelsWithCode extends IGrpcCallMetricsLabels {
    /** Name of the gRPC status code the call finished with (e.g. the enum member name). */
    code: string;
}
35
36
// Runtime symbol sharing its name with the `IClientCallMetrics` interface.
// NOTE(review): presumably used as a dependency-injection token — confirm against the container bindings.
export const IClientCallMetrics = Symbol("IClientCallMetrics");
37
38
/**
 * Sink for client-side gRPC call metrics.
 *
 * `createClientCallMetricsInterceptor` drives `started`, `sent`, `received` and `handled`;
 * `startHandleTimer` is part of the contract but is not invoked by that interceptor.
 */
export interface IClientCallMetrics {
    /** Reported once when a call is started. */
    started(labels: IGrpcCallMetricsLabels): void;
    /** Reported for every message sent by the client. */
    sent(labels: IGrpcCallMetricsLabels): void;
    /** Reported for every message received by the client. */
    received(labels: IGrpcCallMetricsLabels): void;
    /** Reported when the final status of a call is received. */
    handled(labels: IGrpcCallMetricsLabelsWithCode): void;
    /**
     * Starts a timer for a call.
     *
     * @returns a function that stops the timer, optionally taking extra labels, and returns a number
     *          (NOTE(review): presumably the elapsed time — confirm unit with the implementation).
     */
    startHandleTimer(
        labels: IGrpcCallMetricsLabels,
    ): (labels?: Partial<Record<string, string | number>> | undefined) => number;
}
47
48
/**
 * Maps the two streaming flags of a gRPC method definition to its call shape.
 *
 * @param requestStream whether the client streams requests
 * @param responseStream whether the server streams responses
 * @returns the matching {@link GrpcMethodType}
 */
export function getGrpcMethodType(requestStream: boolean, responseStream: boolean): GrpcMethodType {
    if (requestStream && responseStream) {
        return "bidi_stream";
    }
    if (requestStream) {
        return "client_stream";
    }
    return responseStream ? "server_stream" : "unary";
}
63
64
/**
 * Creates a client interceptor that reports call lifecycle events to `metrics`.
 *
 * For every intercepted call it reports `started` once, `sent` per outgoing message,
 * `received` per incoming message and `handled` (with the status code name) when the
 * final status arrives. Every hook forwards to the next interceptor in a `finally`
 * block, so a throwing metrics implementation never stops the call from progressing.
 *
 * @param metrics sink receiving the events
 * @returns an interceptor to be passed to a gRPC client
 */
export function createClientCallMetricsInterceptor(metrics: IClientCallMetrics): grpc.Interceptor {
    return (options, nextCall): grpc.InterceptingCall => {
        const methodDef = options.method_definition;
        // The method path has the form "/<service>/<method>".
        const method = methodDef.path.substring(methodDef.path.lastIndexOf("/") + 1);
        const service = methodDef.path.substring(1, methodDef.path.length - method.length - 1);
        const labels = {
            service,
            method,
            type: getGrpcMethodType(methodDef.requestStream, methodDef.responseStream),
        };
        const requester = new grpc.RequesterBuilder()
            .withStart((metadata, listener, next) => {
                const newListener = new grpc.ListenerBuilder()
                    .withOnReceiveStatus((status, next) => {
                        try {
                            // The enum reverse lookup yields undefined for codes outside the enum;
                            // fall back to the numeric code so the label is always a string.
                            const codeName: string | undefined = Status[status.code];
                            metrics.handled({
                                ...labels,
                                code: codeName !== undefined ? codeName : String(status.code),
                            });
                        } finally {
                            next(status);
                        }
                    })
                    .withOnReceiveMessage((message, next) => {
                        try {
                            metrics.received(labels);
                        } finally {
                            next(message);
                        }
                    })
                    .build();
                try {
                    metrics.started(labels);
                } finally {
                    next(metadata, newListener);
                }
            })
            .withSendMessage((message, next) => {
                try {
                    metrics.sent(labels);
                } finally {
                    next(message);
                }
            })
            .build();
        return new grpc.InterceptingCall(nextCall(options), requester);
    };
}
112
113
/**
 * Creates a client interceptor that logs the lifecycle of every call.
 *
 * It logs "grpc call started" and "grpc call cancelled" at debug level, and the final status
 * at warn level ("grpc call failed") when the code is one of the failure codes listed below,
 * otherwise at debug level ("grpc call status"). Every hook forwards to the next interceptor
 * in a `finally` block so logging problems never stall the call.
 *
 * @param additionalContextF optional callback whose result is attached to the status log entry;
 *                           if it throws, an empty object is logged instead
 * @returns an interceptor to be passed to a gRPC client
 */
export function createDebugLogInterceptor(additionalContextF: (() => object) | undefined): grpc.Interceptor {
    // Status codes reported at warn level; all other codes are logged at debug level.
    const FAILURE_STATUS_CODES = new Set([
        Status.ABORTED,
        Status.CANCELLED,
        Status.DATA_LOSS,
        Status.DEADLINE_EXCEEDED,
        Status.FAILED_PRECONDITION,
        Status.INTERNAL,
        Status.PERMISSION_DENIED,
        Status.RESOURCE_EXHAUSTED,
        Status.UNAUTHENTICATED,
        Status.UNAVAILABLE,
        Status.UNIMPLEMENTED,
        Status.UNKNOWN,
    ]);

    return (options, nextCall): grpc.InterceptingCall => {
        const methodDef = options.method_definition;
        // The method path has the form "/<service>/<method>".
        const method = methodDef.path.substring(methodDef.path.lastIndexOf("/") + 1);
        const service = methodDef.path.substring(1, methodDef.path.length - method.length - 1);
        const labels = {
            service,
            method,
            type: getGrpcMethodType(methodDef.requestStream, methodDef.responseStream),
        };
        const requester = new grpc.RequesterBuilder()
            .withStart((metadata, listener, next) => {
                const newListener = new grpc.ListenerBuilder()
                    .withOnReceiveStatus((status, next) => {
                        // If given, call the additionalContext function and log the result
                        let additionalContext = {};
                        try {
                            if (additionalContextF) {
                                additionalContext = additionalContextF();
                            }
                        } catch (e) {
                            // Deliberately best-effort: a failing context provider must not
                            // prevent the status from being logged and forwarded.
                        }

                        try {
                            // The enum reverse lookup yields undefined for codes outside the enum;
                            // fall back to the numeric code so the entry always names a code.
                            const codeName: string | undefined = Status[status.code];
                            const info = {
                                labels: new TrustedValue(labels),
                                metadata: new TrustedValue(metadata.toJSON()),
                                code: codeName !== undefined ? codeName : String(status.code),
                                details: status.details,
                                additionalContext: new TrustedValue(additionalContext),
                            };
                            if (FAILURE_STATUS_CODES.has(status.code)) {
                                log.warn(`grpc call failed`, info);
                            } else {
                                log.debug(`grpc call status`, info);
                            }
                        } finally {
                            next(status);
                        }
                    })
                    .build();
                try {
                    log.debug(`grpc call started`, {
                        labels: new TrustedValue(labels),
                        metadata: new TrustedValue(metadata.toJSON()),
                    });
                } finally {
                    next(metadata, newListener);
                }
            })
            .withCancel((next) => {
                try {
                    log.debug(`grpc call cancelled`, { labels: new TrustedValue(labels) });
                } finally {
                    next();
                }
            })
            .build();
        return new grpc.InterceptingCall(nextCall(options), requester);
    };
}
188
189
/**
 * Type guard telling whether a caught value looks like a gRPC status error.
 *
 * A value qualifies when it is a non-null object carrying a numeric, non-OK `code`
 * and a string `details` (which may be empty). Unlike a plain truthiness check this
 * never throws for `null`/`undefined`, always returns a real boolean, and does not
 * mistake errors with a string `code` (e.g. Node.js system errors) for gRPC errors.
 *
 * @param err any caught value
 * @returns `true` if `err` can be treated as a `grpc.StatusObject`
 */
export function isGrpcError(err: any): err is grpc.StatusObject {
    if (typeof err !== "object" || err === null) {
        return false;
    }
    // Code 0 is the OK status, which never describes an error.
    return typeof err.code === "number" && err.code !== 0 && typeof err.details === "string";
}
192
193
/**
 * Tells whether the client's channel is in a state from which calls can still succeed.
 *
 * The channel counts as alive while it is CONNECTING, IDLE or READY; any other
 * connectivity state yields `false`. The state is queried with `false`, i.e. without
 * asking the channel to start connecting as a side effect of the check.
 *
 * @param client the gRPC client whose channel is inspected
 * @returns `true` if the channel state is CONNECTING, IDLE or READY
 */
export function isConnectionAlive(client: grpc.Client): boolean {
    const cs = client.getChannel().getConnectivityState(false);
    return (
        cs === grpc.connectivityState.CONNECTING ||
        cs === grpc.connectivityState.IDLE ||
        cs === grpc.connectivityState.READY
    );
}
201
202