Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/wapython
Path: blob/main/core/kernel/src/wasm/import.ts
1067 views
1
import type { Options } from "./worker/import";
2
import { callback } from "awaiting";
3
import { EventEmitter } from "events";
4
import reuseInFlight from "./reuseInFlight";
5
import { SendToWasmAbstractBase } from "./worker/send-to-wasm";
6
import { RecvFromWasmAbstractBase } from "./worker/recv-from-wasm";
7
export { Options };
8
import { IOProvider, Stream } from "./types";
9
import { SIGINT } from "./constants";
10
import debug from "debug";
11
12
const MAX_OUTPUT_DELAY_MS = 250;
13
14
const log = debug("wasm-main");
15
16
export interface WorkerThread extends EventEmitter {
17
postMessage: (message: object) => void;
18
terminate: () => void;
19
}
20
21
// TODO: typescript actually has "export abstract class" ! No need to fake it...
22
23
// This implements WasmInstanceAsync from ./
24
export class WasmInstanceAbstractBaseClass extends EventEmitter {
25
private callId: number = 0;
26
private options: Options;
27
private ioProvider: IOProvider;
28
private outputMonitorDelay: number = MAX_OUTPUT_DELAY_MS;
29
result: any;
30
exports: any;
31
wasmSource: string;
32
33
protected worker?: WorkerThread;
34
35
public send: SendToWasmAbstractBase;
36
public recv: RecvFromWasmAbstractBase;
37
38
constructor(wasmSource: string, options: Options, IOProviderClass) {
39
super();
40
log("constructor", options);
41
this.wasmSource = wasmSource;
42
this.options = options;
43
this.init = reuseInFlight(this.init);
44
this.send = new SendToWasmAbstractBase();
45
this.recv = new RecvFromWasmAbstractBase();
46
this.ioProvider = new IOProviderClass();
47
}
48
49
signal(sig: number = SIGINT): void {
50
this.ioProvider.signal(sig);
51
}
52
53
// MUST override in derived class
54
protected initWorker(): WorkerThread {
55
abstract("initWorker");
56
return null as any; // for typescript
57
}
58
59
writeToStdin(data): void {
60
log("writeToStdin", data);
61
this.ioProvider.writeToStdin(Buffer.from(data));
62
if (data.toString().includes("\u0003")) {
63
this.signal(SIGINT);
64
// This is a hack, but for some reason everything feels better with this included:
65
this.ioProvider.writeToStdin(Buffer.from("\n"));
66
}
67
setTimeout(() => {
68
this.readOutput();
69
}, 1);
70
}
71
72
private async init() {
73
if (this.worker) return;
74
this.worker = this.initWorker();
75
if (!this.worker) throw Error("init - bug");
76
77
const options = { ...this.ioProvider.getExtraOptions(), ...this.options };
78
log("options = ", options);
79
80
this.worker.postMessage({
81
event: "init",
82
name: this.wasmSource,
83
options,
84
// debug: this passes the debug state from the main thread to the worker thread; otherwise,
85
// we would have no way to ever see any debug logging from worker. This is really nice!
86
debug: debug.load(),
87
});
88
89
this.worker.on("exit", () => this.terminate());
90
91
this.worker.on("message", (message) => {
92
if (message == null) return;
93
log("main thread got message", message);
94
// This can be useful in some low-level debugging situations:
95
// if (message.event == "stderr" || message.event == "stdout") {
96
// console.warn(new TextDecoder().decode(message.data));
97
// }
98
// message with id handled elsewhere -- used for getting data back.
99
if (message.id != null) {
100
// message with id handled elsewhere -- used for getting data back.
101
this.emit("id", message);
102
return;
103
}
104
switch (message.event) {
105
case "init":
106
this.emit("init", message);
107
return;
108
109
case "stdout":
110
this.emit("stdout", message.data);
111
break;
112
113
case "stderr":
114
this.emit("stderr", message.data);
115
break;
116
}
117
});
118
119
this.monitorOutput();
120
121
await callback((cb) =>
122
this.once("init", (message) => {
123
cb(message.error);
124
})
125
);
126
}
127
128
private async readOutput(): Promise<number> {
129
if (this.worker == null) return 0;
130
const data = await this.ioProvider.readOutput();
131
if (data.length > 0) {
132
this.outputMonitorDelay = 1;
133
this.emit(
134
data[0] == Stream.STDOUT ? "stdout" : "stderr",
135
data.subarray(1)
136
);
137
} else {
138
this.outputMonitorDelay = Math.min(
139
MAX_OUTPUT_DELAY_MS,
140
this.outputMonitorDelay * 1.3
141
);
142
}
143
return data.length;
144
}
145
146
async monitorOutput() {
147
while (this.worker != null) {
148
await this.readOutput();
149
await delay(this.outputMonitorDelay);
150
}
151
}
152
153
terminate() {
154
if (this.worker == null) return;
155
const worker = this.worker;
156
delete this.worker;
157
worker.emit("exit");
158
worker.terminate();
159
worker.removeAllListeners();
160
this.emit("terminate");
161
this.removeAllListeners();
162
}
163
164
async callWithString(
165
name: string | { name: string; dll: string },
166
str: string | string[],
167
...args
168
): Promise<any> {
169
await this.init();
170
if (!this.worker) {
171
throw Error(
172
`callWithString (name=${JSON.stringify(name)}, str='${JSON.stringify(
173
str
174
)}') - worker is not running`
175
);
176
}
177
this.callId += 1;
178
this.worker.postMessage({
179
id: this.callId,
180
event: "callWithString",
181
name,
182
str,
183
args,
184
});
185
return await this.waitForResponse(this.callId);
186
}
187
188
async waitUntilFsLoaded(): Promise<void> {
189
if (!this.worker) {
190
throw Error(`waitUntilFsLoaded - bug; worker must be defined`);
191
}
192
this.callId += 1;
193
this.worker.postMessage({
194
id: this.callId,
195
event: "waitUntilFsLoaded",
196
});
197
await this.waitForResponse(this.callId);
198
}
199
200
private async waitForResponse(id: number): Promise<any> {
201
return (
202
await callback((cb) => {
203
const removeListeners = () => {
204
this.removeListener("id", messageListener);
205
this.removeListener("sigint", sigintListener);
206
};
207
208
const messageListener = (message) => {
209
if (message.id == id) {
210
removeListeners();
211
if (message.error) {
212
cb(message.error);
213
} else {
214
cb(undefined, message);
215
}
216
}
217
};
218
this.on("id", messageListener);
219
220
const sigintListener = () => {
221
removeListeners();
222
cb("KeyboardInterrupt");
223
};
224
this.once("sigint", sigintListener);
225
226
this.worker?.on("exit", () => {
227
removeListeners();
228
cb("exit");
229
});
230
})
231
).result;
232
}
233
234
// Optionally override in derived class
235
protected configureTerminal() {}
236
237
async exec(argv: string[] = ["command"]): Promise<number> {
238
await this.init();
239
if (this.worker == null) throw Error("exec: bug - worker must be defined");
240
if (!this.options.noStdio) {
241
this.configureTerminal();
242
}
243
let r = 0;
244
try {
245
r = await this.callWithString("cowasm_exec", argv);
246
this.terminate();
247
} catch (_) {
248
// expected to fail -- call doesn't get output...
249
}
250
return r;
251
}
252
253
getFunction(_name: string, _dll?: string): Function | undefined {
254
throw Error("not implemented");
255
}
256
257
getcwd(): string {
258
throw Error("not implemented");
259
}
260
261
async fetch(
262
url: string,
263
path: string,
264
mode?: number | string
265
): Promise<void> {
266
if (this.worker == null) throw Error("fetch: bug - worker must be defined");
267
this.callId += 1;
268
this.worker.postMessage({
269
id: this.callId,
270
event: "fetch",
271
url,
272
path,
273
mode,
274
});
275
await this.waitForResponse(this.callId);
276
}
277
}
278
279
function abstract(name: string) {
280
throw Error(`${name} -- must be defined in derived class`);
281
}
282
283
function delay(milliseconds) {
284
return new Promise((resolve) => setTimeout(resolve, milliseconds));
285
}
286
287