Path: blob/main/components/gitpod-protocol/src/util/nice-grpc.ts
2500 views
/**
 * Copyright (c) 2022 Gitpod GmbH. All rights reserved.
 * Licensed under the GNU Affero General Public License (AGPL).
 * See License.AGPL.txt in the project root for license information.
 */

import { isAbortError } from "abort-controller-x";
import {
    CallOptions,
    ClientError,
    ClientMiddleware,
    ClientMiddlewareCall,
    Status,
    MethodDescriptor,
} from "nice-grpc-common";
import { GrpcMethodType, IClientCallMetrics } from "./grpc";

/**
 * Derives the metric labels (call type, service name, method name) for a gRPC method.
 *
 * The call type is chosen from the descriptor's `requestStream` / `responseStream` flags.
 * Service and method names are taken from `method.path` by splitting on "/" and dropping
 * the first segment.
 * NOTE(review): this presumes paths shaped like `/<service>/<method>`; any other shape
 * yields `undefined` labels — confirm against callers.
 *
 * @param method descriptor of the gRPC method being called
 * @returns the `type`, `service` and `method` labels for that method
 */
function getLabels(method: MethodDescriptor) {
    const callType = method.requestStream
        ? method.responseStream
            ? "bidi_stream"
            : "client_stream"
        : method.responseStream
        ? "server_stream"
        : "unary";
    const { path } = method;
    // slice(1) discards whatever precedes the first "/" (empty for a leading slash).
    const [serviceName, methodName] = path.split("/").slice(1);

    return {
        type: callType as GrpcMethodType,
        service: serviceName,
        method: methodName,
    };
}

/**
 * Re-yields every item of `iterable`, invoking `callback` once per item just before
 * the item is yielded.
 *
 * @param iterable the stream of messages to pass through
 * @param callback invoked once for each item pulled from `iterable`
 * @returns an async iterable producing the same items in the same order
 */
async function* incrementStreamMessagesCounter<T>(iterable: AsyncIterable<T>, callback: () => void): AsyncIterable<T> {
    for await (const item of iterable) {
        callback();
        yield item;
    }
}

/**
 * Creates a client middleware that reports each call to the given metrics sink.
 *
 * Per call it invokes `metrics.started`, starts a handle timer, calls `metrics.sent` /
 * `metrics.received` once per streamed request / response message, and finally stops the
 * timer and calls `metrics.handled` with the name of the resulting status code.
 *
 * @param metrics sink that receives the call lifecycle events
 * @returns a middleware that forwards the call to `call.next` unchanged apart from counting
 */
export function prometheusClientMiddleware(metrics: IClientCallMetrics): ClientMiddleware {
    return async function* prometheusClientMiddlewareGenerator<Request, Response>(
        call: ClientMiddlewareCall<Request, Response>,
        options: CallOptions,
    ): AsyncGenerator<Response, Response | void, undefined> {
        const labels = getLabels(call.method);

        metrics.started(labels);

        const stopTimer = metrics.startHandleTimer(labels);

        // `settled` becomes true once the call either completed or threw; if `finally`
        // runs while it is still false, the generator was left some other way.
        let settled = false;
        let status: Status = Status.OK;

        try {
            let request;

            if (!call.requestStream) {
                request = call.request;
            } else {
                // Wrap the outgoing stream so each request message is counted as sent.
                request = incrementStreamMessagesCounter(call.request, metrics.sent.bind(metrics, labels));
            }

            if (!call.responseStream) {
                const response = yield* call.next(request, options);
                settled = true;
                return response;
            } else {
                // Wrap the incoming stream so each response message is counted as received.
                yield* incrementStreamMessagesCounter(
                    call.next(request, options),
                    metrics.received.bind(metrics, labels),
                );
                settled = true;
                return;
            }
        } catch (err) {
            settled = true;
            if (err instanceof ClientError) {
                status = err.code;
            } else if (isAbortError(err)) {
                status = Status.CANCELLED;
            } else {
                status = Status.UNKNOWN;
            }
            // Metrics are observational only: the caller still sees the original error.
            throw err;
        } finally {
            if (!settled) {
                // Neither the success path nor `catch` ran (presumably the consumer closed
                // the generator early), so the call is recorded as cancelled.
                status = Status.CANCELLED;
            }
            // Status[status] turns the status code into its enum member name for labelling.
            stopTimer({ grpc_code: Status[status] });
            metrics.handled({ ...labels, code: Status[status] });
        }
    };
}