Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
microsoft
GitHub Repository: microsoft/vscode
Path: blob/main/src/vs/platform/files/node/watcher/baseWatcher.ts
3296 views
1
/*---------------------------------------------------------------------------------------------
2
* Copyright (c) Microsoft Corporation. All rights reserved.
3
* Licensed under the MIT License. See License.txt in the project root for license information.
4
*--------------------------------------------------------------------------------------------*/
5
6
import { watchFile, unwatchFile, Stats } from 'fs';
7
import { Disposable, DisposableMap, DisposableStore, toDisposable } from '../../../../base/common/lifecycle.js';
8
import { ILogMessage, IRecursiveWatcherWithSubscribe, IUniversalWatchRequest, IWatchRequestWithCorrelation, IWatcher, IWatcherErrorEvent, isWatchRequestWithCorrelation, requestFilterToString } from '../../common/watcher.js';
9
import { Emitter, Event } from '../../../../base/common/event.js';
10
import { FileChangeType, IFileChange } from '../../common/files.js';
11
import { URI } from '../../../../base/common/uri.js';
12
import { DeferredPromise, ThrottledDelayer } from '../../../../base/common/async.js';
13
import { hash } from '../../../../base/common/hash.js';
14
import { onUnexpectedError } from '../../../../base/common/errors.js';
15
16
/**
 * Minimal description of a watch request that is currently suspended,
 * carrying only what is needed to monitor the path and to resume the
 * request later on.
 */
interface ISuspendedWatchRequest {

	/** Identifier of the request as used for the request maps of the watcher. */
	readonly id: number;

	/** Correlation identifier of the request, or `undefined` for non-correlated requests. */
	readonly correlationId: number | undefined;

	/** File system path the request is targeting. */
	readonly path: string;
}
21
22
/**
 * Base class for file watchers that layers suspend/resume handling on top
 * of the concrete watching logic implemented by subclasses in `doWatch`.
 *
 * A request reported through `_onDidWatchFail` is taken out of the set of
 * requests handed to `doWatch` ("suspended") and its path is monitored,
 * either through the `recursiveWatcher` (if one is available and accepts
 * the subscription) or through `fs.watchFile()` polling. Once the path is
 * detected, an `ADDED` event is emitted and the request is resumed.
 */
export abstract class BaseWatcher extends Disposable implements IWatcher {

	protected readonly _onDidChangeFile = this._register(new Emitter<IFileChange[]>());
	readonly onDidChangeFile = this._onDidChangeFile.event;

	protected readonly _onDidLogMessage = this._register(new Emitter<ILogMessage>());
	readonly onDidLogMessage = this._onDidLogMessage.event;

	protected readonly _onDidWatchFail = this._register(new Emitter<IUniversalWatchRequest>());
	private readonly onDidWatchFail = this._onDidWatchFail.event;

	// All requests of the most recent `watch()` call, split by whether
	// they carry a correlation identifier or not.
	private readonly correlatedWatchRequests = new Map<number /* request ID */, IWatchRequestWithCorrelation>();
	private readonly nonCorrelatedWatchRequests = new Map<number /* request ID */, IUniversalWatchRequest>();

	// Suspended requests by ID (the value holds the monitoring resources)
	// and the subset of those that is monitored via `fs.watchFile()`.
	private readonly suspendedWatchRequests = this._register(new DisposableMap<number /* request ID */>());
	private readonly suspendedWatchRequestsWithPolling = new Set<number /* request ID */>();

	private readonly updateWatchersDelayer = this._register(new ThrottledDelayer<void>(this.getUpdateWatchersDelay()));

	protected readonly suspendedWatchRequestPollingInterval: number = 5007; // node.js default

	// Completed whenever a `watch()` call has finished so that suspending
	// a request can wait for all requests to have been processed.
	private joinWatch = new DeferredPromise<void>();

	constructor() {
		super();

		// Any request that failed to watch gets suspended
		this._register(this.onDidWatchFail(failedRequest => {
			const suspended: ISuspendedWatchRequest = {
				id: this.computeId(failedRequest),
				correlationId: this.isCorrelated(failedRequest) ? failedRequest.correlationId : undefined,
				path: failedRequest.path
			};

			return this.suspendWatchRequest(suspended);
		}));
	}

	/**
	 * Type guard telling whether the request carries a correlation identifier.
	 */
	protected isCorrelated(request: IUniversalWatchRequest): request is IWatchRequestWithCorrelation {
		return isWatchRequestWithCorrelation(request);
	}

	/**
	 * Returns the identifier under which a request is tracked.
	 *
	 * Correlated requests use their correlation identifier. Requests without
	 * correlation do not carry any unique identifier, so one is derived by
	 * hashing the request. This matches what the file service does
	 * (vs/platform/files/common/fileService.ts#L1178).
	 */
	private computeId(request: IUniversalWatchRequest): number {
		return this.isCorrelated(request) ? request.correlationId : hash(request);
	}

	/**
	 * Replaces the set of watched requests with the given ones, drops all
	 * suspended requests that are no longer part of that set and then
	 * updates the underlying watchers without delay.
	 */
	async watch(requests: IUniversalWatchRequest[]): Promise<void> {

		// Release anyone still waiting on a previous `watch()` call
		// before starting a fresh join promise for this call.
		if (!this.joinWatch.isSettled) {
			this.joinWatch.complete();
		}
		this.joinWatch = new DeferredPromise<void>();

		try {
			this.correlatedWatchRequests.clear();
			this.nonCorrelatedWatchRequests.clear();

			// Sort requests into correlated and non-correlated ones
			for (const request of requests) {
				if (this.isCorrelated(request)) {
					this.correlatedWatchRequests.set(request.correlationId, request);
					continue;
				}

				this.nonCorrelatedWatchRequests.set(this.computeId(request), request);
			}

			// Forget suspended requests that are not watched anymore
			for (const [suspendedId] of this.suspendedWatchRequests) {
				const stillWatched = this.nonCorrelatedWatchRequests.has(suspendedId) || this.correlatedWatchRequests.has(suspendedId);
				if (stillWatched) {
					continue;
				}

				this.suspendedWatchRequests.deleteAndDispose(suspendedId);
				this.suspendedWatchRequestsWithPolling.delete(suspendedId);
			}

			await this.updateWatchers(false /* not delayed */);
		} finally {
			this.joinWatch.complete();
		}
	}

	/**
	 * Hands all requests that are currently not suspended to `doWatch`
	 * through the throttled delayer. Errors are routed to
	 * `onUnexpectedError` instead of rejecting the returned promise.
	 *
	 * @param delayed whether to wait for `getUpdateWatchersDelay()`
	 * milliseconds before updating or to use a delay of `0`.
	 */
	private updateWatchers(delayed: boolean): Promise<void> {
		const allRequests: [number, IUniversalWatchRequest][] = [...this.nonCorrelatedWatchRequests, ...this.correlatedWatchRequests];
		const activeRequests = allRequests
			.filter(([requestId]) => !this.suspendedWatchRequests.has(requestId))
			.map(([, request]) => request);

		const delay = delayed ? this.getUpdateWatchersDelay() : 0;

		return this.updateWatchersDelayer
			.trigger(() => this.doWatch(activeRequests), delay)
			.catch(error => onUnexpectedError(error));
	}

	/**
	 * Delay in milliseconds used for the delayer and for delayed updates.
	 */
	protected getUpdateWatchersDelay(): number {
		return 800;
	}

	/**
	 * Reports whether the request is suspended: `'polling'` when it is
	 * monitored via `fs.watchFile()`, otherwise `true` or `false`.
	 */
	isSuspended(request: IUniversalWatchRequest): 'polling' | boolean {
		const requestId = this.computeId(request);
		if (this.suspendedWatchRequestsWithPolling.has(requestId)) {
			return 'polling';
		}

		return this.suspendedWatchRequests.has(requestId);
	}

	/**
	 * Marks the request as suspended, starts monitoring its path once the
	 * current `watch()` call has finished and schedules a delayed update
	 * of the watchers.
	 */
	private async suspendWatchRequest(request: ISuspendedWatchRequest): Promise<void> {
		const { id } = request;
		if (this.suspendedWatchRequests.has(id)) {
			return; // already suspended
		}

		const monitoring = new DisposableStore();
		this.suspendedWatchRequests.set(id, monitoring);

		// A watch request may fail right in the middle of the watch()
		// phase while other requests still succeed. Waiting for all
		// requests to have been processed increases the chance that
		// another watcher can be reused for suspend/resume tracking.
		await this.joinWatch.p;

		// The request may have been resumed or removed in the meantime
		if (!monitoring.isDisposed) {
			this.monitorSuspendedWatchRequest(request, monitoring);

			this.updateWatchers(true /* delay this call as we might accumulate many failing watch requests on startup */);
		}
	}

	/**
	 * Removes the request from the suspended set, disposing its monitoring
	 * resources, and updates the watchers right away.
	 */
	private resumeWatchRequest(request: ISuspendedWatchRequest): void {
		const { id } = request;
		this.suspendedWatchRequests.deleteAndDispose(id);
		this.suspendedWatchRequestsWithPolling.delete(id);

		this.updateWatchers(false);
	}

	/**
	 * Starts monitoring the path of a suspended request, preferring the
	 * existing recursive watcher and falling back to `fs.watchFile()`.
	 */
	private monitorSuspendedWatchRequest(request: ISuspendedWatchRequest, disposables: DisposableStore): void {
		const reusedWatcher = this.doMonitorWithExistingWatcher(request, disposables);
		if (!reusedWatcher) {
			this.doMonitorWithNodeJS(request, disposables);
			this.suspendedWatchRequestsWithPolling.add(request.id);

			return;
		}

		this.trace(`reusing an existing recursive watcher to monitor ${request.path}`);
		this.suspendedWatchRequestsWithPolling.delete(request.id);
	}

	/**
	 * Tries to subscribe to the path via the `recursiveWatcher`.
	 *
	 * @returns `true` if a subscription was obtained and registered with
	 * `disposables`, `false` otherwise.
	 */
	private doMonitorWithExistingWatcher(request: ISuspendedWatchRequest, disposables: DisposableStore): boolean {
		const subscription = this.recursiveWatcher?.subscribe(request.path, (error, change) => {
			if (disposables.isDisposed) {
				return; // return early if already disposed
			}

			// On error, go through monitoring selection again
			if (error) {
				this.monitorSuspendedWatchRequest(request, disposables);
				return;
			}

			if (change?.type === FileChangeType.ADDED) {
				this.onMonitoredPathAdded(request);
			}
		});

		if (!subscription) {
			return false;
		}

		disposables.add(subscription);

		return true;
	}

	/**
	 * Polls the path via `fs.watchFile()` and resumes the request once the
	 * path is found after having been reported as not found before. The
	 * polling is stopped when `disposables` is disposed.
	 */
	private doMonitorWithNodeJS(request: ISuspendedWatchRequest, disposables: DisposableStore): void {

		// Result of the "not found" check from the previous callback run
		let lastSeenNotFound = false;

		const watchFileCallback: (curr: Stats, prev: Stats) => void = (curr, prev) => {
			if (disposables.isDisposed) {
				return; // return early if already disposed
			}

			const notFoundNow = this.isPathNotFound(curr);
			const notFoundBefore = this.isPathNotFound(prev) || lastSeenNotFound;
			lastSeenNotFound = notFoundNow;

			// Watch path created: resume watching request
			if (notFoundBefore && !notFoundNow) {
				this.onMonitoredPathAdded(request);
			}
		};

		this.trace(`starting fs.watchFile() on ${request.path} (correlationId: ${request.correlationId})`);
		try {
			watchFile(request.path, { persistent: false, interval: this.suspendedWatchRequestPollingInterval }, watchFileCallback);
		} catch (error) {
			this.warn(`fs.watchFile() failed with error ${error} on path ${request.path} (correlationId: ${request.correlationId})`);
		}

		const stopPolling = toDisposable(() => {
			this.trace(`stopping fs.watchFile() on ${request.path} (correlationId: ${request.correlationId})`);

			try {
				unwatchFile(request.path, watchFileCallback);
			} catch (error) {
				this.warn(`fs.unwatchFile() failed with error ${error} on path ${request.path} (correlationId: ${request.correlationId})`);
			}
		});
		disposables.add(stopPolling);
	}

	/**
	 * Emits an `ADDED` file change for the path of the suspended request
	 * and then resumes the request.
	 */
	private onMonitoredPathAdded(request: ISuspendedWatchRequest): void {
		this.trace(`detected ${request.path} exists again, resuming watcher (correlationId: ${request.correlationId})`);

		// Let listeners know about the path being added
		const addedEvent: IFileChange = { resource: URI.file(request.path), type: FileChangeType.ADDED, cId: request.correlationId };
		this._onDidChangeFile.fire([addedEvent]);
		this.traceEvent(addedEvent, request);

		// Back to regular watching
		this.resumeWatchRequest(request);
	}

	/**
	 * Treats stats with both `ctimeMs` and `ino` being `0` as "path not found".
	 */
	private isPathNotFound(stats: Stats): boolean {
		if (stats.ctimeMs !== 0) {
			return false;
		}

		return stats.ino === 0;
	}

	/**
	 * Disposes the monitoring of all suspended requests and forgets them.
	 */
	async stop(): Promise<void> {
		this.suspendedWatchRequests.clearAndDisposeAll();
		this.suspendedWatchRequestsWithPolling.clear();
	}

	/**
	 * Traces a normalized file change event when verbose logging is enabled.
	 */
	protected traceEvent(event: IFileChange, request: IUniversalWatchRequest | ISuspendedWatchRequest): void {
		if (!this.verboseLogging) {
			return;
		}

		let changeLabel = '[CHANGED]';
		if (event.type === FileChangeType.ADDED) {
			changeLabel = '[ADDED]';
		} else if (event.type === FileChangeType.DELETED) {
			changeLabel = '[DELETED]';
		}

		this.traceWithCorrelation(` >> normalized ${changeLabel} ${event.resource.fsPath}`, request);
	}

	/**
	 * Traces the message when verbose logging is enabled, appending the
	 * correlation identifier of the request if it has a numeric one.
	 */
	protected traceWithCorrelation(message: string, request: IUniversalWatchRequest | ISuspendedWatchRequest): void {
		if (!this.verboseLogging) {
			return;
		}

		const correlationSuffix = typeof request.correlationId === 'number' ? ` <${request.correlationId}> ` : ``;
		this.trace(`${message}${correlationSuffix}`);
	}

	/**
	 * Renders a request as a human readable string including its path,
	 * excludes, includes, filter and correlation identifier.
	 */
	protected requestToString(request: IUniversalWatchRequest): string {
		const excludes = request.excludes.length > 0 ? request.excludes : '<none>';
		const includes = request.includes && request.includes.length > 0 ? JSON.stringify(request.includes) : '<all>';
		const filter = requestFilterToString(request.filter);
		const correlationId = typeof request.correlationId === 'number' ? request.correlationId : '<none>';

		return `${request.path} (excludes: ${excludes}, includes: ${includes}, filter: ${filter}, correlationId: ${correlationId})`;
	}

	/** Performs the actual watching for the given (non-suspended) requests. */
	protected abstract doWatch(requests: IUniversalWatchRequest[]): Promise<void>;

	/** Optional recursive watcher that can be reused to monitor suspended requests. */
	protected abstract readonly recursiveWatcher: IRecursiveWatcherWithSubscribe | undefined;

	protected abstract trace(message: string): void;
	protected abstract warn(message: string): void;

	abstract onDidError: Event<IWatcherErrorEvent>;

	protected verboseLogging = false;

	/**
	 * Turns verbose logging on or off.
	 */
	async setVerboseLogging(enabled: boolean): Promise<void> {
		this.verboseLogging = enabled;
	}
}
276
277