Path: blob/main/src/vs/platform/files/node/watcher/baseWatcher.ts
3296 views
/*---------------------------------------------------------------------------------------------1* Copyright (c) Microsoft Corporation. All rights reserved.2* Licensed under the MIT License. See License.txt in the project root for license information.3*--------------------------------------------------------------------------------------------*/45import { watchFile, unwatchFile, Stats } from 'fs';6import { Disposable, DisposableMap, DisposableStore, toDisposable } from '../../../../base/common/lifecycle.js';7import { ILogMessage, IRecursiveWatcherWithSubscribe, IUniversalWatchRequest, IWatchRequestWithCorrelation, IWatcher, IWatcherErrorEvent, isWatchRequestWithCorrelation, requestFilterToString } from '../../common/watcher.js';8import { Emitter, Event } from '../../../../base/common/event.js';9import { FileChangeType, IFileChange } from '../../common/files.js';10import { URI } from '../../../../base/common/uri.js';11import { DeferredPromise, ThrottledDelayer } from '../../../../base/common/async.js';12import { hash } from '../../../../base/common/hash.js';13import { onUnexpectedError } from '../../../../base/common/errors.js';1415interface ISuspendedWatchRequest {16readonly id: number;17readonly correlationId: number | undefined;18readonly path: string;19}2021export abstract class BaseWatcher extends Disposable implements IWatcher {2223protected readonly _onDidChangeFile = this._register(new Emitter<IFileChange[]>());24readonly onDidChangeFile = this._onDidChangeFile.event;2526protected readonly _onDidLogMessage = this._register(new Emitter<ILogMessage>());27readonly onDidLogMessage = this._onDidLogMessage.event;2829protected readonly _onDidWatchFail = this._register(new Emitter<IUniversalWatchRequest>());30private readonly onDidWatchFail = this._onDidWatchFail.event;3132private readonly correlatedWatchRequests = new Map<number /* request ID */, IWatchRequestWithCorrelation>();33private readonly nonCorrelatedWatchRequests = new Map<number /* request ID */, IUniversalWatchRequest>();3435private readonly suspendedWatchRequests = this._register(new DisposableMap<number /* request ID */>());36private readonly suspendedWatchRequestsWithPolling = new Set<number /* request ID */>();3738private readonly updateWatchersDelayer = this._register(new ThrottledDelayer<void>(this.getUpdateWatchersDelay()));3940protected readonly suspendedWatchRequestPollingInterval: number = 5007; // node.js default4142private joinWatch = new DeferredPromise<void>();4344constructor() {45super();4647this._register(this.onDidWatchFail(request => this.suspendWatchRequest({48id: this.computeId(request),49correlationId: this.isCorrelated(request) ? request.correlationId : undefined,50path: request.path51})));52}5354protected isCorrelated(request: IUniversalWatchRequest): request is IWatchRequestWithCorrelation {55return isWatchRequestWithCorrelation(request);56}5758private computeId(request: IUniversalWatchRequest): number {59if (this.isCorrelated(request)) {60return request.correlationId;61} else {62// Requests without correlation do not carry any unique identifier, so we have to63// come up with one based on the options of the request. This matches what the64// file service does (vs/platform/files/common/fileService.ts#L1178).65return hash(request);66}67}6869async watch(requests: IUniversalWatchRequest[]): Promise<void> {70if (!this.joinWatch.isSettled) {71this.joinWatch.complete();72}73this.joinWatch = new DeferredPromise<void>();7475try {76this.correlatedWatchRequests.clear();77this.nonCorrelatedWatchRequests.clear();7879// Figure out correlated vs. non-correlated requests80for (const request of requests) {81if (this.isCorrelated(request)) {82this.correlatedWatchRequests.set(request.correlationId, request);83} else {84this.nonCorrelatedWatchRequests.set(this.computeId(request), request);85}86}8788// Remove all suspended watch requests that are no longer watched89for (const [id] of this.suspendedWatchRequests) {90if (!this.nonCorrelatedWatchRequests.has(id) && !this.correlatedWatchRequests.has(id)) {91this.suspendedWatchRequests.deleteAndDispose(id);92this.suspendedWatchRequestsWithPolling.delete(id);93}94}9596return await this.updateWatchers(false /* not delayed */);97} finally {98this.joinWatch.complete();99}100}101102private updateWatchers(delayed: boolean): Promise<void> {103const nonSuspendedRequests: IUniversalWatchRequest[] = [];104for (const [id, request] of [...this.nonCorrelatedWatchRequests, ...this.correlatedWatchRequests]) {105if (!this.suspendedWatchRequests.has(id)) {106nonSuspendedRequests.push(request);107}108}109110return this.updateWatchersDelayer.trigger(() => this.doWatch(nonSuspendedRequests), delayed ? this.getUpdateWatchersDelay() : 0).catch(error => onUnexpectedError(error));111}112113protected getUpdateWatchersDelay(): number {114return 800;115}116117isSuspended(request: IUniversalWatchRequest): 'polling' | boolean {118const id = this.computeId(request);119return this.suspendedWatchRequestsWithPolling.has(id) ? 'polling' : this.suspendedWatchRequests.has(id);120}121122private async suspendWatchRequest(request: ISuspendedWatchRequest): Promise<void> {123if (this.suspendedWatchRequests.has(request.id)) {124return; // already suspended125}126127const disposables = new DisposableStore();128this.suspendedWatchRequests.set(request.id, disposables);129130// It is possible that a watch request fails right during watch()131// phase while other requests succeed. To increase the chance of132// reusing another watcher for suspend/resume tracking, we await133// all watch requests having processed.134135await this.joinWatch.p;136137if (disposables.isDisposed) {138return;139}140141this.monitorSuspendedWatchRequest(request, disposables);142143this.updateWatchers(true /* delay this call as we might accumulate many failing watch requests on startup */);144}145146private resumeWatchRequest(request: ISuspendedWatchRequest): void {147this.suspendedWatchRequests.deleteAndDispose(request.id);148this.suspendedWatchRequestsWithPolling.delete(request.id);149150this.updateWatchers(false);151}152153private monitorSuspendedWatchRequest(request: ISuspendedWatchRequest, disposables: DisposableStore): void {154if (this.doMonitorWithExistingWatcher(request, disposables)) {155this.trace(`reusing an existing recursive watcher to monitor ${request.path}`);156this.suspendedWatchRequestsWithPolling.delete(request.id);157} else {158this.doMonitorWithNodeJS(request, disposables);159this.suspendedWatchRequestsWithPolling.add(request.id);160}161}162163private doMonitorWithExistingWatcher(request: ISuspendedWatchRequest, disposables: DisposableStore): boolean {164const subscription = this.recursiveWatcher?.subscribe(request.path, (error, change) => {165if (disposables.isDisposed) {166return; // return early if already disposed167}168169if (error) {170this.monitorSuspendedWatchRequest(request, disposables);171} else if (change?.type === FileChangeType.ADDED) {172this.onMonitoredPathAdded(request);173}174});175176if (subscription) {177disposables.add(subscription);178179return true;180}181182return false;183}184185private doMonitorWithNodeJS(request: ISuspendedWatchRequest, disposables: DisposableStore): void {186let pathNotFound = false;187188const watchFileCallback: (curr: Stats, prev: Stats) => void = (curr, prev) => {189if (disposables.isDisposed) {190return; // return early if already disposed191}192193const currentPathNotFound = this.isPathNotFound(curr);194const previousPathNotFound = this.isPathNotFound(prev);195const oldPathNotFound = pathNotFound;196pathNotFound = currentPathNotFound;197198// Watch path created: resume watching request199if (!currentPathNotFound && (previousPathNotFound || oldPathNotFound)) {200this.onMonitoredPathAdded(request);201}202};203204this.trace(`starting fs.watchFile() on ${request.path} (correlationId: ${request.correlationId})`);205try {206watchFile(request.path, { persistent: false, interval: this.suspendedWatchRequestPollingInterval }, watchFileCallback);207} catch (error) {208this.warn(`fs.watchFile() failed with error ${error} on path ${request.path} (correlationId: ${request.correlationId})`);209}210211disposables.add(toDisposable(() => {212this.trace(`stopping fs.watchFile() on ${request.path} (correlationId: ${request.correlationId})`);213214try {215unwatchFile(request.path, watchFileCallback);216} catch (error) {217this.warn(`fs.unwatchFile() failed with error ${error} on path ${request.path} (correlationId: ${request.correlationId})`);218}219}));220}221222private onMonitoredPathAdded(request: ISuspendedWatchRequest): void {223this.trace(`detected ${request.path} exists again, resuming watcher (correlationId: ${request.correlationId})`);224225// Emit as event226const event: IFileChange = { resource: URI.file(request.path), type: FileChangeType.ADDED, cId: request.correlationId };227this._onDidChangeFile.fire([event]);228this.traceEvent(event, request);229230// Resume watching231this.resumeWatchRequest(request);232}233234private isPathNotFound(stats: Stats): boolean {235return stats.ctimeMs === 0 && stats.ino === 0;236}237238async stop(): Promise<void> {239this.suspendedWatchRequests.clearAndDisposeAll();240this.suspendedWatchRequestsWithPolling.clear();241}242243protected traceEvent(event: IFileChange, request: IUniversalWatchRequest | ISuspendedWatchRequest): void {244if (this.verboseLogging) {245const traceMsg = ` >> normalized ${event.type === FileChangeType.ADDED ? '[ADDED]' : event.type === FileChangeType.DELETED ? '[DELETED]' : '[CHANGED]'} ${event.resource.fsPath}`;246this.traceWithCorrelation(traceMsg, request);247}248}249250protected traceWithCorrelation(message: string, request: IUniversalWatchRequest | ISuspendedWatchRequest): void {251if (this.verboseLogging) {252this.trace(`${message}${typeof request.correlationId === 'number' ? ` <${request.correlationId}> ` : ``}`);253}254}255256protected requestToString(request: IUniversalWatchRequest): string {257return `${request.path} (excludes: ${request.excludes.length > 0 ? request.excludes : '<none>'}, includes: ${request.includes && request.includes.length > 0 ? JSON.stringify(request.includes) : '<all>'}, filter: ${requestFilterToString(request.filter)}, correlationId: ${typeof request.correlationId === 'number' ? request.correlationId : '<none>'})`;258}259260protected abstract doWatch(requests: IUniversalWatchRequest[]): Promise<void>;261262protected abstract readonly recursiveWatcher: IRecursiveWatcherWithSubscribe | undefined;263264protected abstract trace(message: string): void;265protected abstract warn(message: string): void;266267abstract onDidError: Event<IWatcherErrorEvent>;268269protected verboseLogging = false;270271async setVerboseLogging(enabled: boolean): Promise<void> {272this.verboseLogging = enabled;273}274}275276277