Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
Download
80635 views
1
"use strict";
2
3
var child_process = require('child_process');
4
var JSONStreamParser = require('./lib/JSONStreamParser');
5
var Q = require('q');
6
7
function _middleTruncate(str, cutoffLength) {
8
if (str.length > cutoffLength) {
9
var halfCutoff = Math.floor(cutoffLength / 2);
10
str =
11
str.substr(0, halfCutoff) +
12
"\n[...truncated...]\n" +
13
str.substr(-1 * halfCutoff);
14
}
15
return str;
16
}
17
18
function Worker(workerPath, workerArgs, options) {
19
options = options || {}
20
21
var child = child_process.spawn(workerPath, workerArgs);
22
child.on('exit', this._onChildExit.bind(this));
23
child.stderr.setEncoding('utf8');
24
child.stderr.on('data', this._onStderr.bind(this));
25
child.stdout.setEncoding('utf8');
26
child.stdout.on('data', this._onStdout.bind(this));
27
28
this._childProcess = child;
29
this._isDestroyed = false;
30
this._opts = options;
31
this._pendingResponseDeferred = null;
32
this._stderrData = '';
33
this._streamParser = new JSONStreamParser();
34
this._workerArgs = workerArgs;
35
this._workerPath = workerPath;
36
37
// Send init data to the child first thing
38
this._initDeferred = Q.defer();
39
this._initialized = false;
40
child.stdin.write(JSON.stringify({initData: options.initData}));
41
}
42
43
Worker.prototype._handleInitializationResponse = function(response) {
44
if (response.hasOwnProperty('initError')) {
45
throw new Error('Error initializing worker: ' + response.initError);
46
} else if (response.hasOwnProperty('initSuccess')) {
47
this._initDeferred.resolve();
48
} else {
49
throw new Error(
50
'Invalid initialization response received: ' +
51
JSON.stringify(response)
52
);
53
}
54
};
55
56
Worker.prototype._handleMessageResponse = function(response) {
57
if (response.hasOwnProperty('error')) {
58
this._pendingResponseDeferred.reject(response.error);
59
} else if (response.hasOwnProperty('response')) {
60
this._pendingResponseDeferred.resolve(response.response);
61
} else {
62
this._pendingResponseDeferred.reject(
63
new Error(
64
'Malformed child response message: ' + JSON.stringify(response)
65
)
66
);
67
}
68
this._pendingResponseDeferred = null;
69
};
70
71
Worker.prototype._onChildExit = function(code, signalStr) {
72
if (this._isDestroyed) {
73
return;
74
}
75
76
var trimmedStderr = _middleTruncate(this._stderrData.trim(), 10000);
77
78
var errorMsg =
79
' exit code: ' + code + ', exit signal: ' + signalStr + '\n' +
80
'stderr:\n' +
81
' ' + trimmedStderr + '\n';
82
83
if (this._initialized === false) {
84
throw new Error(
85
'Worker process exited before it could be initialized!' +
86
errorMsg
87
);
88
} else if (this._pendingResponseDeferred !== null) {
89
this._pendingResponseDeferred.reject(new Error(
90
'Worker process exited before responding!' +
91
errorMsg
92
));
93
}
94
95
// Try re-booting this worker
96
Worker.call(this, this._workerPath, this._workerArgs, this._opts);
97
};
98
99
Worker.prototype._onStderr = function(data) {
100
this._stderrData += data;
101
process.stderr.write(data);
102
};
103
104
Worker.prototype._onStdout = function(data) {
105
if (this._pendingResponseDeferred === null && this._initialized === true) {
106
throw new Error('Received unexpected data from child process: ' + data);
107
}
108
109
var responses;
110
try {
111
responses = this._streamParser.parse(data);
112
} catch (e) {
113
e = new Error('Unable to parse child response data: ' + this._streamParser.getBuffer());
114
if (this._initialized === false) {
115
throw e;
116
} else {
117
this._pendingResponseDeferred.reject(e);
118
return;
119
}
120
}
121
122
if (this._opts.printChildResponses) {
123
var workerName =
124
this._opts.hasOwnProperty('workerName')
125
? this._opts.workerName
126
: 'unnamed';
127
128
console.log(
129
'----Start Worker Responses (' + workerName + ')----\n' +
130
JSON.stringify(responses, null, 2) + '\n' +
131
'----End Worker Responses (' + workerName + ')----\n'
132
);
133
}
134
135
if (responses.length === 1) {
136
var response = responses[0];
137
if (this._initialized === false) {
138
this._handleInitializationResponse(response);
139
this._initialized = true;
140
} else {
141
this._handleMessageResponse(response);
142
}
143
} else if (responses.length > 1) {
144
this._pendingResponseDeferred.reject(
145
new Error(
146
'Received multiple responses when we were only expecting one: ' +
147
JSON.stringify(responses)
148
)
149
);
150
}
151
};
152
153
Worker.prototype.destroy = function() {
154
this._isDestroyed = true;
155
156
var pendingWork =
157
this._pendingResponseDeferred === null
158
? this._initDeferred.promise
159
: this._pendingResponseDeferred.promise;
160
161
return pendingWork.finally(function() {
162
this._childProcess.stdin.end();
163
this._childProcess.kill();
164
}.bind(this));
165
};
166
167
Worker.prototype.sendMessage = function(messageObj) {
168
if (this._isDestroyed) {
169
throw new Error(
170
'Attempted to send a message to a worker that has been (or is in the ' +
171
'process of being) destroyed!'
172
);
173
}
174
175
if (this._pendingResponseDeferred !== null) {
176
throw new Error(
177
'Attempted to send a message to the worker before the response from ' +
178
'the last message was received! Worker processes can only handle one ' +
179
'message at a time.'
180
);
181
}
182
this._pendingResponseDeferred = Q.defer();
183
var responsePromise = this._pendingResponseDeferred.promise;
184
185
var workerName = this._opts.workerName;
186
return this._initDeferred.promise.then(function() {
187
if (typeof messageObj !== 'object') {
188
throw new Error('Worker messages must always be an object: ' + messageObj);
189
}
190
if (messageObj === null) {
191
throw new Error('Worker messages must always be an object: null');
192
}
193
194
this._childProcess.stdin.write(JSON.stringify({message: messageObj}));
195
return responsePromise;
196
}.bind(this));
197
};
198
199
module.exports = Worker;
200
201