Path: blob/main/components/gitpod-protocol/src/util/grpc.ts
2500 views
/**
 * Copyright (c) 2021 Gitpod GmbH. All rights reserved.
 * Licensed under the GNU Affero General Public License (AGPL).
 * See License.AGPL.txt in the project root for license information.
 */

import * as grpc from "@grpc/grpc-js";
import { Status } from "@grpc/grpc-js/build/src/constants";
import { log } from "./logging";
import { TrustedValue } from "./scrubbing";

/**
 * Channel options shared by gRPC clients in this code base.
 */
export const defaultGRPCOptions = {
    "grpc.keepalive_timeout_ms": 10000,
    "grpc.keepalive_time_ms": 60000,
    "grpc.http2.min_time_between_pings_ms": 10000,
    "grpc.keepalive_permit_without_calls": 1,
    "grpc-node.max_session_memory": 50,
    "grpc.max_reconnect_backoff_ms": 5000,
    "grpc.max_receive_message_length": 1024 * 1024 * 16,
    // default is 30s, which is too long for us during rollouts (where service DNS entries are updated)
    "grpc.dns_min_time_between_resolutions_ms": 2000,
};

/** The four call shapes, distinguished by whether request and/or response are streamed. */
export type GrpcMethodType = "unary" | "client_stream" | "server_stream" | "bidi_stream";

/** Labels attached to every per-call metric. */
export interface IGrpcCallMetricsLabels {
    service: string;
    method: string;
    type: GrpcMethodType;
}

/** Call labels extended with the name of the final status code. */
export interface IGrpcCallMetricsLabelsWithCode extends IGrpcCallMetricsLabels {
    code: string;
}

/** Symbol exported under the same name as the {@link IClientCallMetrics} interface. */
export const IClientCallMetrics = Symbol("IClientCallMetrics");

/**
 * Sink for client-side call metrics. `started`, `sent`, `received` and `handled` are
 * invoked by the interceptor built by {@link createClientCallMetricsInterceptor}.
 */
export interface IClientCallMetrics {
    started(labels: IGrpcCallMetricsLabels): void;
    sent(labels: IGrpcCallMetricsLabels): void;
    received(labels: IGrpcCallMetricsLabels): void;
    handled(labels: IGrpcCallMetricsLabelsWithCode): void;
    startHandleTimer(
        labels: IGrpcCallMetricsLabels,
    ): (labels?: Partial<Record<string, string | number>> | undefined) => number;
}

/**
 * Classifies a method by its streaming flags.
 *
 * @param requestStream whether the client streams requests
 * @param responseStream whether the server streams responses
 * @returns the matching {@link GrpcMethodType}
 */
export function getGrpcMethodType(requestStream: boolean, responseStream: boolean): GrpcMethodType {
    if (requestStream) {
        return responseStream ? "bidi_stream" : "client_stream";
    }
    return responseStream ? "server_stream" : "unary";
}

/**
 * Status codes that the debug log interceptor reports with `log.warn` instead of `log.debug`.
 */
const FAILURE_STATUS_CODES: ReadonlySet<Status> = new Set<Status>([
    Status.ABORTED,
    Status.CANCELLED,
    Status.DATA_LOSS,
    Status.DEADLINE_EXCEEDED,
    Status.FAILED_PRECONDITION,
    Status.INTERNAL,
    Status.PERMISSION_DENIED,
    Status.RESOURCE_EXHAUSTED,
    Status.UNAUTHENTICATED,
    Status.UNAVAILABLE,
    Status.UNIMPLEMENTED,
    Status.UNKNOWN,
]);

/**
 * Derives the call labels from a method definition.
 *
 * The method name is everything after the last "/" of `path`; the service name is what
 * lies between the first character of `path` and the "/" preceding the method name.
 */
function labelsForMethod(methodDef: {
    path: string;
    requestStream: boolean;
    responseStream: boolean;
}): IGrpcCallMetricsLabels {
    const method = methodDef.path.substring(methodDef.path.lastIndexOf("/") + 1);
    const service = methodDef.path.substring(1, methodDef.path.length - method.length - 1);
    return {
        service,
        method,
        type: getGrpcMethodType(methodDef.requestStream, methodDef.responseStream),
    };
}

/**
 * Builds a client interceptor that reports call lifecycle events to `metrics`:
 * `started` when the call starts, `sent` for each outgoing message, `received` for each
 * incoming message and `handled` (with the status code name) when the status arrives.
 *
 * Every `next(...)` is invoked from a `finally` block, so the call chain is continued
 * even if a metrics callback throws.
 *
 * @param metrics the sink receiving the events
 * @returns the interceptor
 */
export function createClientCallMetricsInterceptor(metrics: IClientCallMetrics): grpc.Interceptor {
    return (options, nextCall): grpc.InterceptingCall => {
        const labels = labelsForMethod(options.method_definition);
        const requester = new grpc.RequesterBuilder()
            .withStart((metadata, listener, next) => {
                const newListener = new grpc.ListenerBuilder()
                    .withOnReceiveStatus((status, next) => {
                        try {
                            metrics.handled({
                                ...labels,
                                code: Status[status.code],
                            });
                        } finally {
                            next(status);
                        }
                    })
                    .withOnReceiveMessage((message, next) => {
                        try {
                            metrics.received(labels);
                        } finally {
                            next(message);
                        }
                    })
                    .build();
                try {
                    metrics.started(labels);
                } finally {
                    next(metadata, newListener);
                }
            })
            .withSendMessage((message, next) => {
                try {
                    metrics.sent(labels);
                } finally {
                    next(message);
                }
            })
            .build();
        return new grpc.InterceptingCall(nextCall(options), requester);
    };
}

/**
 * Builds a client interceptor that logs call start, cancellation and final status.
 *
 * Statuses contained in `FAILURE_STATUS_CODES` are logged with `log.warn`, all others
 * with `log.debug`.
 *
 * @param additionalContextF optional callback evaluated when the status arrives; its
 *        result is attached to the status log entry. If it throws, an empty object is
 *        logged instead.
 * @returns the interceptor
 */
export function createDebugLogInterceptor(additionalContextF: (() => object) | undefined): grpc.Interceptor {
    return (options, nextCall): grpc.InterceptingCall => {
        const labels = labelsForMethod(options.method_definition);
        const requester = new grpc.RequesterBuilder()
            .withStart((metadata, listener, next) => {
                const newListener = new grpc.ListenerBuilder()
                    .withOnReceiveStatus((status, next) => {
                        // If given, call the additionalContext function and log the result
                        let additionalContext = {};
                        try {
                            if (additionalContextF) {
                                additionalContext = additionalContextF();
                            }
                        } catch (e) {
                            // Deliberately ignored: the extra context is best-effort and must
                            // not prevent the status from being logged and forwarded.
                        }

                        try {
                            const info = {
                                labels: new TrustedValue(labels),
                                metadata: new TrustedValue(metadata.toJSON()),
                                code: Status[status.code],
                                details: status.details,
                                additionalContext: new TrustedValue(additionalContext),
                            };
                            if (FAILURE_STATUS_CODES.has(status.code)) {
                                log.warn(`grpc call failed`, info);
                            } else {
                                log.debug(`grpc call status`, info);
                            }
                        } finally {
                            next(status);
                        }
                    })
                    .build();
                try {
                    log.debug(`grpc call started`, {
                        labels: new TrustedValue(labels),
                        metadata: new TrustedValue(metadata.toJSON()),
                    });
                } finally {
                    next(metadata, newListener);
                }
            })
            .withCancel((next) => {
                try {
                    log.debug(`grpc call cancelled`, { labels: new TrustedValue(labels) });
                } finally {
                    next();
                }
            })
            .build();
        return new grpc.InterceptingCall(nextCall(options), requester);
    };
}

/**
 * Type guard for values that look like a gRPC status object.
 *
 * Returns `true` only when `err` is truthy and has a truthy `code` and truthy `details`;
 * a `code` of 0 or an empty `details` string therefore yields `false`. `null` and
 * `undefined` yield `false` instead of throwing.
 *
 * @param err the value to inspect, typically something caught from a client call
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- accepts arbitrary caught values
export function isGrpcError(err: any): err is grpc.StatusObject {
    return Boolean(err && err.code && err.details);
}

/**
 * Reports whether the client's channel is in a usable or recoverable state.
 *
 * The connectivity state is read via `getConnectivityState(false)`.
 *
 * @param client the client whose channel is inspected
 * @returns `true` if the state is CONNECTING, IDLE or READY; `false` otherwise
 */
export function isConnectionAlive(client: grpc.Client): boolean {
    const cs = client.getChannel().getConnectivityState(false);
    return (
        cs === grpc.connectivityState.CONNECTING ||
        cs === grpc.connectivityState.IDLE ||
        cs === grpc.connectivityState.READY
    );
}