Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sisilicon
GitHub Repository: sisilicon/worldedit-be
Path: blob/master/src/library/utils/multithreading.ts
1784 views
1
import { configuration } from "../configurations.js";
2
import { system } from "@minecraft/server";
3
import { contentLog } from "./contentlog.js";
4
import { setTickTimeout, sleep } from "./scheduling.js";
5
6
/** Counter used to hand out thread ids; `Thread`'s constructor pre-increments it. */
let threadId = 1;

/** Run queue of started threads. Threads are added at the head and the scheduler takes them from the tail. */
const threads: Array<Thread> = [];
/** Generator created for each thread when it was started. */
const tasks: WeakMap<Thread, Generator> = new WeakMap();
/**
 * Per-thread promise state: the `waitForPromise` sentinel while a yielded promise is pending,
 * afterwards its resolved value or an `ErrorPackage` holding the rejection reason.
 */
const promises: WeakMap<Thread, unknown> = new WeakMap();
/** Sentinel stored in `promises` to mark a thread as blocked on a pending promise (compared by identity). */
const waitForPromise: { readonly waitingForPromise: true } = { waitingForPromise: true };
12
13
/**
 * Wrapper that marks a stored value as a promise rejection reason, so it can be
 * told apart from an ordinary resolved value and thrown back into the generator.
 */
class ErrorPackage {
    /** The rejection reason that was caught. */
    public err: unknown;

    constructor(err: unknown) {
        this.err = err;
    }
}
16
17
/**
 * A cooperatively scheduled unit of work backed by a generator.
 *
 * A thread is started once with a generator function. While multithreading is
 * enabled the module-level scheduler advances the generator step by step;
 * `join()` drives the remainder to completion and `abort()` drops it.
 */
class Thread {
    /** Identifier taken from the module-level `threadId` counter at construction. */
    public readonly id: number;

    /** True from a successful `start()` until `join()` or `abort()` is called. */
    private active = false;
    /** True until `start()` has been called; a thread can only be started once. */
    private valid = true;

    constructor() {
        this.id = ++threadId;
    }

    /** Whether the thread has been started and neither joined nor aborted yet. */
    get isActive() {
        return this.active;
    }

    /** Whether the thread can still be started, i.e. `start()` has not been called on it. */
    get isValid() {
        // Previously returned `this.active`, which made this getter a duplicate of `isActive`.
        return this.valid;
    }

    /**
     * Creates the generator from `task` and puts this thread on the run queue.
     * Does nothing if the thread was already started. When multithreading is
     * disabled in the configuration, the task is joined immediately.
     *
     * @param task Generator function containing the work; yielded promises are awaited on its behalf.
     * @param args Arguments forwarded to `task`.
     */
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    start<T extends any[]>(task: (...args: T) => Generator<unknown>, ...args: T) {
        if (!this.valid) return;

        tasks.set(this, task(...args));
        threads.unshift(this);
        this.active = true;
        this.valid = false;

        // Fire-and-forget: callers of start() do not wait for the task to finish.
        if (!configuration.multiThreadingEnabled) void this.join();
    }

    /**
     * Removes the thread from the run queue and runs the rest of its generator
     * to completion, awaiting every yielded promise. Does nothing if the thread
     * was never started or is no longer active.
     *
     * @returns A promise that settles once the generator is done; it rejects if the generator throws.
     */
    async join() {
        if (this.valid || !this.active) return;

        const queueIndex = threads.indexOf(this);
        if (queueIndex !== -1) threads.splice(queueIndex, 1);
        this.active = false;

        // The scheduler may have left a promise pending for this thread; wait until it has settled.
        while (Object.is(promises.get(this), waitForPromise)) await sleep(1);

        let promiseRes = promises.get(this);
        const task = tasks.get(this);
        promises.delete(this);
        // Feed the settled promise outcome (if any) into the generator: rejections are thrown in.
        let next = promiseRes instanceof ErrorPackage ? task.throw(promiseRes.err) : task.next(promiseRes);
        while (!next.done) {
            if (next.value instanceof Promise) {
                try {
                    promiseRes = await next.value;
                } catch (err) {
                    promiseRes = new ErrorPackage(err);
                }
            }
            next = promiseRes instanceof ErrorPackage ? task.throw(promiseRes.err) : task.next(promiseRes);
            promiseRes = undefined;
        }
    }

    /**
     * Stops the thread without running the rest of its generator: removes it
     * from the run queue and discards any stored promise state. Does nothing if
     * the thread was never started or is no longer active.
     */
    abort() {
        if (this.valid || !this.active) return;
        const queueIndex = threads.indexOf(this);
        if (queueIndex !== -1) threads.splice(queueIndex, 1);
        promises.delete(this);
        this.active = false;
    }

    /** Short label containing the thread id, e.g. for log output. */
    toString() {
        return `[thread #${this.id}]`;
    }
}
83
84
/** Thread whose generator is being advanced right now; `undefined` outside of a scheduler step. */
let currentThread: Thread = undefined;

/**
 * Scheduler: on every interval run, advances queued threads one generator step
 * at a time until the queue is empty or the configured time budget
 * (`configuration.multiThreadingTimeBudget`, compared against `Date.now()`
 * differences) is used up.
 */
system.runInterval(() => {
    const ms = Date.now();
    while (Date.now() - ms < configuration.multiThreadingTimeBudget && threads.length) {
        const thread = threads.pop();
        const task = tasks.get(thread);
        currentThread = thread;
        try {
            const promiseRes = promises.get(thread);
            if (Object.is(promiseRes, waitForPromise)) {
                // Still blocked on a promise: re-queue later (setTickTimeout presumably defers to a
                // following tick - confirm in scheduling.js). Skip the re-queue if the thread was
                // joined or aborted in the meantime, otherwise it would be resurrected.
                setTickTimeout(() => {
                    if (thread.isActive) threads.unshift(thread);
                });
                continue;
            }
            promises.delete(thread);
            // Resume the generator with the settled promise outcome; rejections are thrown into it.
            const next = promiseRes instanceof ErrorPackage ? task.throw(promiseRes.err) : task.next(promiseRes);
            if (next.done) {
                // Generator finished; join() takes the thread out of the active state.
                void thread.join();
                continue;
            } else if (next.value instanceof Promise) {
                promises.set(thread, waitForPromise);
                next.value.then((result) => promises.set(thread, result)).catch((err) => promises.set(thread, new ErrorPackage(err)));
            }
            threads.unshift(thread);
        } catch (e) {
            contentLog.error(e);
            // A generator that threw is already finished, so throwing `e` into it again would only
            // rethrow it out of this callback. Just retire the thread.
            thread.abort();
        } finally {
            // Runs on the `continue` paths as well, so no stale thread is reported after a step.
            currentThread = undefined;
        }
    }
});
115
116
/** Number of `iterateChunk` generators run since the last yield. */
let iterCount = 0;

/**
 * Throttled yield helper: running the returned generator yields `val` only on
 * every 18th run (counted across all calls); the other runs finish without
 * yielding. The shared counter is reset whenever a yield happens.
 *
 * @param val Value to yield when this run hits the threshold.
 */
function* iterateChunk<T>(val: T) {
    iterCount += 1;
    if (iterCount <= 17) return;

    iterCount = 0;
    yield val;
}
123
124
/**
 * Returns the thread whose generator the scheduler is advancing at the moment
 * of the call, or `undefined` when no thread step is in progress.
 */
function getCurrentThread() {
    const running = currentThread;
    return running;
}
127
128
/**
 * Empties the scheduler's run queue in place, so no queued thread is advanced
 * by the scheduler anymore. The thread objects themselves are not modified.
 */
function shutdownThreads() {
    threads.splice(0, threads.length);
}
131
132
export { Thread, iterateChunk, getCurrentThread, shutdownThreads };
133
134