Path: blob/main/components/dashboard/src/data/prebuilds/prebuild-logs-emitter.ts
2501 views
/**
 * Copyright (c) 2024 Gitpod GmbH. All rights reserved.
 * Licensed under the GNU Affero General Public License (AGPL).
 * See License.AGPL.txt in the project root for license information.
 */

import { useEffect, useMemo } from "react";
import { matchPrebuildError } from "@gitpod/public-api-common/lib/prebuild-utils";
import { ApplicationError, ErrorCodes } from "@gitpod/gitpod-protocol/lib/messaging/error";
import { Disposable, DisposableCollection, HEADLESS_LOG_STREAM_STATUS_CODE_REGEX } from "@gitpod/gitpod-protocol";
import { Prebuild, PrebuildPhase_Phase } from "@gitpod/public-api/lib/gitpod/v1/prebuild_pb";
import { PlainMessage } from "@bufbuild/protobuf";
import { ReplayableEventEmitter } from "../../utils";

/** Events emitted by the emitter returned from `usePrebuildLogsEmitter`, keyed by event name. */
type LogEventTypes = {
    error: [Error];
    logs: [string];
    "logs-error": [ApplicationError];
    reset: [];
};

/**
 * Watches the logs of a prebuild task by returning an EventEmitter that emits logs, logs-error, and reset events.
 *
 * A new emitter is created whenever `prebuild.id` or `taskId` changes. Streaming only starts once the prebuild
 * phase allows it (see `shouldFetchLogs` below) and is skipped if the emitter has already reached its end.
 *
 * @param prebuild The prebuild to watch; its id, phase and log URLs are read.
 * @param taskId ID of the task to watch. The special value "image-build" selects the image build logs.
 * @returns An object holding the emitter.
 * @throws ApplicationError (NOT_FOUND), from within the effect, when no log URL is available for the task.
 */
export function usePrebuildLogsEmitter(prebuild: PlainMessage<Prebuild>, taskId: string) {
    const emitter = useMemo(
        () => new ReplayableEventEmitter<LogEventTypes>(),
        // We would like to re-create the emitter when the prebuildId or taskId changes, so that logs of old tasks / prebuilds are not mixed with the new ones.
        // eslint-disable-next-line react-hooks/exhaustive-deps
        [prebuild.id, taskId],
    );

    const shouldFetchLogs = useMemo<boolean>(() => {
        const phase = prebuild.status?.phase?.name;
        // Image build logs are fetched even while the prebuild is still queued.
        if (phase === PrebuildPhase_Phase.QUEUED && taskId === "image-build") {
            return true;
        }
        switch (phase) {
            case PrebuildPhase_Phase.QUEUED:
            case PrebuildPhase_Phase.UNSPECIFIED:
                return false;
            // This is the online case: we do the actual streaming
            // All others below are terminal states, where we get re-directed to the logs stored in content-service
            case PrebuildPhase_Phase.BUILDING:
            case PrebuildPhase_Phase.AVAILABLE:
            case PrebuildPhase_Phase.FAILED:
            case PrebuildPhase_Phase.ABORTED:
            case PrebuildPhase_Phase.TIMEOUT:
                return true;
        }

        return false;
    }, [prebuild.status?.phase?.name, taskId]);

    useEffect(() => {
        if (!shouldFetchLogs || emitter.hasReachedEnd()) {
            return;
        }

        // Resolve the URL to stream from: either the image build log or the log of the matching task.
        const task = {
            taskId,
            logUrl: "",
        };
        if (taskId === "image-build") {
            if (!prebuild.status?.imageBuildLogUrl) {
                throw new ApplicationError(ErrorCodes.NOT_FOUND, `Image build logs URL not found in response`);
            }
            task.logUrl = prebuild.status?.imageBuildLogUrl;
        } else {
            const logUrl = prebuild?.status?.taskLogs?.find((log) => log.taskId === taskId)?.logUrl;
            if (!logUrl) {
                throw new ApplicationError(ErrorCodes.NOT_FOUND, `Task ${taskId} not found`);
            }

            task.logUrl = logUrl;
        }

        const disposables = new DisposableCollection();
        disposables.push(
            streamPrebuildLogs(
                taskId,
                task.logUrl,
                (chunk) => {
                    emitter.emit("logs", chunk);
                },
                (err) => {
                    emitter.emit("logs-error", err);
                },
                () => {
                    emitter.markReachedEnd();
                },
            ),
        );

        return () => {
            disposables.dispose();
            if (!emitter.hasReachedEnd()) {
                // If we haven't finished yet, but the page is re-rendered, clear the output we already got.
                emitter.emit("reset");
            }
        };
        // eslint-disable-next-line react-hooks/exhaustive-deps
    }, [emitter, prebuild.id, taskId, shouldFetchLogs]);

    return { emitter };
}

/**
 * Streams the logs behind `streamUrl` and forwards them to the given callbacks, retrying with a capped backoff
 * when the stream fails.
 *
 * @param taskId ID of the task the logs belong to (currently not read by this function).
 * @param streamUrl URL to fetch the log stream from.
 * @param onLog Called with every chunk of log output.
 * @param onError Called with prebuild errors found in the stream, except "not yet available" which triggers a retry.
 * @param onEnd Called once an attempt ends for good: the stream finished or failed with a non-retryable error.
 *              It is NOT called when a retry was scheduled or when the returned collection was disposed.
 * @returns A collection that, when disposed, cancels the active reader and stops further retries.
 */
function streamPrebuildLogs(
    taskId: string,
    streamUrl: string,
    onLog: (chunk: Uint8Array) => void,
    onError: (err: Error) => void,
    onEnd?: () => void,
): DisposableCollection {
    const disposables = new DisposableCollection();

    // initializing non-empty here to use this as a stopping signal for the retries down below
    disposables.push(Disposable.NULL);

    // retry configuration goes here
    const initialDelaySeconds = 1;
    const backoffFactor = 1.2;
    const maxBackoffSeconds = 5;
    let delayInSeconds = initialDelaySeconds;

    const startWatchingLogs = async () => {
        // Set as soon as this attempt hands over to a retry; the follow-up attempt then owns the end-of-stream signal.
        let retrying = false;

        const retryBackoff = async (reason: string, err?: unknown) => {
            retrying = true;
            // Note: the delay is increased before waiting, so even the first retry waits longer than the initial delay.
            delayInSeconds = Math.min(delayInSeconds * backoffFactor, maxBackoffSeconds);

            console.debug("re-trying headless-logs because: " + reason, err);
            await new Promise((resolve) => {
                setTimeout(resolve, delayInSeconds * 1000);
            });
            if (disposables.disposed) {
                return; // and stop retrying
            }
            startWatchingLogs().catch(console.error);
        };

        let response: Response | undefined = undefined;
        let reader: ReadableStreamDefaultReader<Uint8Array> | undefined = undefined;
        try {
            disposables.push({
                dispose: () => {
                    // `reader` is looked up lazily: it is only assigned once the fetch below has resolved.
                    reader?.cancel().catch(console.debug);
                },
            });
            console.debug("fetching from streamUrl: " + streamUrl);
            response = await fetch(streamUrl, {
                method: "GET",
                cache: "no-store", // we don't want the browser to a) look at the cache, or b) update the cache (which would interrupt any running fetches to that resource!)
                credentials: "include",
                headers: {
                    TE: "trailers", // necessary to receive stream status code
                },
                redirect: "follow",
            });
            reader = response.body?.getReader();
            if (!reader) {
                await retryBackoff("no reader");
                return;
            }

            const decoder = new TextDecoder("utf-8");
            let chunk = await reader.read();
            let received200 = false;
            while (!chunk.done) {
                if (disposables.disposed) {
                    // stop reading when disposed
                    return;
                }

                // In an ideal world, we'd use res.addTrailers()/response.trailer here. But despite being introduced with HTTP/1.1 in 1999, trailers are not supported by popular proxies (nginx, for example).
                // So we resort to this hand-written solution:
                const msg = decoder.decode(chunk.value, { stream: true });
                const matches = msg.match(HEADLESS_LOG_STREAM_STATUS_CODE_REGEX);
                const prebuildMatches = matchPrebuildError(msg);
                if (matches) {
                    if (matches.length < 2) {
                        console.debug("error parsing log stream status code. msg: " + msg);
                    } else {
                        // Forward whatever log output precedes the status code marker in this chunk.
                        const prefix = msg.substring(0, matches.index);
                        if (prefix) {
                            const prefixChunk = new TextEncoder().encode(prefix);
                            onLog(prefixChunk);
                        }
                        const code = parseStatusCode(matches[1]);
                        if (code !== 200) {
                            throw new StreamError(code);
                        }
                        received200 = true;
                        break;
                    }
                } else if (prebuildMatches) {
                    if (prebuildMatches.code === ErrorCodes.HEADLESS_LOG_NOT_YET_AVAILABLE) {
                        // reset backoff because this error is expected
                        delayInSeconds = initialDelaySeconds;
                        throw prebuildMatches;
                    }
                    onError(prebuildMatches);
                } else {
                    onLog(chunk.value);
                }

                chunk = await reader.read();
            }
            console.info("[stream] end of stream", { received200 });
        } catch (err) {
            if (err instanceof DOMException && err.name === "AbortError") {
                console.debug("stopped watching headless logs, not retrying: method got disposed of");
                return;
            }
            // Release the failed stream right away; the retry below waits before `finally` gets to run.
            reader?.cancel().catch(console.debug);
            if (errorHasCode(err, 400)) {
                // sth is really off, and we _should not_ retry
                console.error("stopped watching headless logs", err);
                return;
            }
            await retryBackoff("error while listening to stream", err);
        } finally {
            reader?.cancel().catch(console.debug);
            // Only report the end when this attempt was the last word: a scheduled retry will report its own end,
            // and a disposed stream was abandoned by the caller rather than finished.
            if (!retrying && !disposables.disposed) {
                onEnd?.();
            }
        }
    };
    startWatchingLogs().catch(console.error);

    return disposables;
}

/** Returns true if `err` is an object carrying a `code` property strictly equal to `code`. */
function errorHasCode(err: unknown, code: number): boolean {
    return typeof err === "object" && err !== null && (err as { code?: unknown }).code === code;
}

/** Error thrown when the log stream reports a status code other than 200. */
class StreamError extends Error {
    constructor(readonly code?: number) {
        super(`stream status code: ${code}`);
    }
}

/**
 * Parses a status code captured from the log stream.
 *
 * @param code The captured text, if any.
 * @returns The code as a base-10 integer, or `undefined` if `code` is empty or not a number.
 */
function parseStatusCode(code: string | undefined): number | undefined {
    if (!code) {
        return undefined;
    }
    // `Number.parseInt` never throws; it signals failure by returning NaN.
    const parsed = Number.parseInt(code, 10);
    return Number.isNaN(parsed) ? undefined : parsed;
}