Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
Download
80635 views
1
"use strict";
2
3
var Q = require('q');
4
var Worker = require('./Worker');
5
6
/**
 * A pool of worker objects that messages can be dispatched to.
 *
 * @param {number} numWorkers Upper bound on how many workers the pool boots.
 * @param {*} workerPath Forwarded unchanged to every Worker that is created.
 * @param {*} workerArgs Forwarded unchanged to every Worker that is created.
 * @param {Object} [options] Optional settings; a falsy value is treated as
 *   `{}`. This constructor itself only reads `lazyBoot`: when it is falsy all
 *   workers are booted right away, otherwise booting is left to later calls.
 */
function WorkerPool(numWorkers, workerPath, workerArgs, options) {
  var settings = options || {};

  // Static configuration captured from the arguments.
  this._numWorkers = numWorkers;
  this._workerArgs = workerArgs;
  this._workerPath = workerPath;
  this._opts = settings;

  // Mutable bookkeeping, all starting out empty.
  this._availableWorkers = [];
  this._allWorkers = [];
  this._isDestroyed = false;
  this._allPendingResponses = [];
  this._queuedMessages = [];
  this._queuedWorkerSpecificMessages = {};
  this._workerPendingResponses = {};

  if (settings.lazyBoot) {
    // Lazy pools start with no workers at all.
    return;
  }
  this._eagerBootAllWorkers();
}
26
27
/**
 * Creates one additional Worker and registers it as available.
 *
 * The new worker's ID is its index in `_allWorkers`; that same ID is passed
 * to the Worker as `workerName` and pushed onto `_availableWorkers`.
 * This method does not check `_numWorkers`; callers enforce the limit.
 */
WorkerPool.prototype._bootNewWorker = function() {
  var workerID = this._allWorkers.length;
  var worker = new Worker(this._workerPath, this._workerArgs, {
    initData: this._opts.initData,
    // Coerce to a strict boolean so an unset option is passed as `false`.
    printChildResponses: !!this._opts.printChildResponses,
    workerName: workerID
  });
  this._allWorkers.push(worker);
  this._availableWorkers.push(workerID);
};
37
38
/**
 * Keeps booting workers until `_allWorkers` holds `_numWorkers` entries.
 * Does nothing when the pool is already at (or above) that size.
 */
WorkerPool.prototype._eagerBootAllWorkers = function() {
  var pool = this;
  var hasRoomForAnotherWorker = function() {
    return pool._allWorkers.length < pool._numWorkers;
  };

  // Re-evaluate the condition on every pass: booting is what grows the list.
  for (; hasRoomForAnotherWorker(); ) {
    pool._bootNewWorker();
  }
};
43
44
/**
 * Sends `msg` to one specific worker and arranges what that worker does once
 * the response settles (whether it fulfilled or rejected):
 *
 *   1. if messages are queued specifically for this worker, send the oldest;
 *   2. otherwise, if the pool-wide queue is non-empty, send its oldest entry;
 *   3. otherwise mark the worker as available again and drop its entry from
 *      `_workerPendingResponses`.
 *
 * Queue entries have the shape `{deferred, msg}`; the entry's deferred is
 * settled with the outcome of the forwarded message.
 *
 * @param {number|string} workerID Index of the worker in `_allWorkers`.
 * @param {*} msg Passed as-is to the worker's `sendMessage`.
 * @returns {Promise} The worker's response promise (after the bookkeeping
 *   step has been attached); it is also stored in `_workerPendingResponses`.
 */
WorkerPool.prototype._sendMessageToWorker = function(workerID, msg) {
  var self = this;
  var worker = self._allWorkers[workerID];

  // Sends a queued entry to this same worker and mirrors the outcome onto the
  // entry's deferred. Using both `then` handlers settles the deferred exactly
  // once, with either the response or the error.
  function forwardQueuedMessage(queuedMsg) {
    self._sendMessageToWorker(workerID, queuedMsg.msg).then(
      function(response) {
        queuedMsg.deferred.resolve(response);
      },
      function(err) {
        queuedMsg.deferred.reject(err);
      }
    );
  }

  var pendingResponse = worker.sendMessage(msg).finally(function() {
    var workerQueues = self._queuedWorkerSpecificMessages;
    var hasWorkerSpecificWork =
      Object.prototype.hasOwnProperty.call(workerQueues, workerID) &&
      workerQueues[workerID].length > 0;

    if (hasWorkerSpecificWork) {
      // Messages addressed to this worker take priority over the shared queue.
      forwardQueuedMessage(workerQueues[workerID].shift());
    } else if (self._queuedMessages.length > 0) {
      forwardQueuedMessage(self._queuedMessages.shift());
    } else {
      self._availableWorkers.push(workerID);
      delete self._workerPendingResponses[workerID];
    }
  });

  self._workerPendingResponses[workerID] = pendingResponse;
  return pendingResponse;
};
73
74
/**
 * Sends `msg` to whichever worker is free, or queues it until one is.
 *
 * When the pool was created with `lazyBoot` and has not yet reached
 * `_numWorkers`, one more worker is booted first, so the message can be
 * dispatched to it immediately.
 *
 * @param {*} msg Message to deliver; passed through untouched.
 * @returns {Promise} Promise for the response. It is also recorded in
 *   `_allPendingResponses`, which `destroy()` waits on.
 * @throws {Error} If the pool has been (or is being) destroyed.
 */
WorkerPool.prototype.sendMessage = function(msg) {
  if (this._isDestroyed) {
    throw new Error(
      'Attempted to send a message after the worker pool has alread been ' +
      '(or is in the process of) shutting down!'
    );
  }

  if (this._opts.lazyBoot && this._allWorkers.length < this._numWorkers) {
    this._bootNewWorker();
  }

  var responsePromise;
  if (this._availableWorkers.length > 0) {
    responsePromise = this._sendMessageToWorker(
      this._availableWorkers.shift(),
      msg
    );
  } else {
    // Every worker is busy: park the message. Whichever worker finishes next
    // picks it up from `_queuedMessages` and settles this deferred.
    var deferred = Q.defer();
    this._queuedMessages.push({
      deferred: deferred,
      msg: msg
    });
    responsePromise = deferred.promise;
  }

  this._allPendingResponses.push(responsePromise);
  return responsePromise;
};
105
106
/**
 * Delivers `msg` to every worker the pool has booted so far.
 *
 * Workers that currently have a response pending get the message appended to
 * their worker-specific queue; workers that are available receive it right
 * away and are removed from `_availableWorkers`.
 *
 * Every individual response is recorded in `_allPendingResponses`, as
 * `sendMessage` does, so that `destroy()` waits for broadcasts too.
 *
 * @param {*} msg Message to deliver; passed through untouched.
 * @returns {Promise} `Q.all` over the per-worker responses, available
 *   workers first, then busy workers.
 * @throws {Error} If the pool has been (or is being) destroyed.
 */
WorkerPool.prototype.sendMessageToAllWorkers = function(msg) {
  if (this._isDestroyed) {
    throw new Error(
      'Attempted to send a message after the worker pool has alread been ' +
      '(or is in the process of) shutting down!'
    );
  }

  var self = this;
  var workerQueues = self._queuedWorkerSpecificMessages;

  // Queue the message up for all currently busy workers. This must happen
  // before dispatching to the available ones, because dispatching adds them
  // to `_workerPendingResponses` as well.
  var busyWorkerResponses = Object.keys(self._workerPendingResponses).map(
    function(workerID) {
      var deferred = Q.defer();
      if (!Object.prototype.hasOwnProperty.call(workerQueues, workerID)) {
        workerQueues[workerID] = [];
      }
      workerQueues[workerID].push({
        deferred: deferred,
        msg: msg
      });
      return deferred.promise;
    }
  );

  // Send out the message to all workers that aren't currently busy
  var availableWorkerResponses = self._availableWorkers.map(function(workerID) {
    return self._sendMessageToWorker(workerID, msg);
  });
  self._availableWorkers = [];

  var allResponses = availableWorkerResponses.concat(busyWorkerResponses);

  // Track each response individually (not the fail-fast aggregate) so that
  // destroy() waits until every worker has actually answered.
  Array.prototype.push.apply(self._allPendingResponses, allResponses);

  return Q.all(allResponses);
};
136
137
/**
 * Shuts the pool down.
 *
 * Marks the pool as destroyed immediately (so further sends throw), waits for
 * every response in `_allPendingResponses` to settle, whether fulfilled or
 * rejected, and only then calls `destroy()` on each worker.
 *
 * @returns {Promise} `Q.all` over the workers' `destroy()` results.
 */
WorkerPool.prototype.destroy = function() {
  var allWorkers = this._allWorkers;

  this._isDestroyed = true;

  // Keep `.then` on the same expression as the `return`: if the chain is
  // split, the function returns before worker destruction is attached.
  return Q.allSettled(this._allPendingResponses).then(function() {
    return Q.all(allWorkers.map(function(worker) {
      return worker.destroy();
    }));
  });
};
148
149
// The WorkerPool constructor is the value this module exports.
module.exports = WorkerPool;
150
151