Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
freebsd
GitHub Repository: freebsd/phabricator
Path: blob/master/src/infrastructure/daemon/PhutilDaemon.php
12241 views
<?php

/**
 * Scaffolding for implementing robust background processing scripts.
 *
 *
 * Autoscaling
 * ===========
 *
 * Autoscaling automatically launches copies of a daemon when it is busy
 * (scaling the pool up) and stops them when they're idle (scaling the pool
 * down). This is appropriate for daemons which perform highly parallelizable
 * work.
 *
 * To make a daemon support autoscaling, the implementation should look
 * something like this:
 *
 *   while (!$this->shouldExit()) {
 *     if (work_available()) {
 *       $this->willBeginWork();
 *       do_work();
 *       $this->sleep(0);
 *     } else {
 *       $this->willBeginIdle();
 *       $this->sleep(1);
 *     }
 *   }
 *
 * In particular, call @{method:willBeginWork} before becoming busy, and
 * @{method:willBeginIdle} when no work is available. If the daemon is launched
 * into an autoscale pool, this will cause the pool to automatically scale up
 * when busy and down when idle.
 *
 * See @{class:PhutilHighIntensityIntervalDaemon} for an example of a simple
 * autoscaling daemon.
 *
 * Launching a daemon which does not make these callbacks into an autoscale
 * pool will have no effect.
 *
 * @task overseer Communicating With the Overseer
 * @task autoscale Autoscaling Daemon Pools
 */
abstract class PhutilDaemon extends Phobject {

  // Message types understood by the overseer. Each message is written to
  // stdout as one JSON-encoded line (see encodeOverseerMessage()).
  const MESSAGETYPE_STDOUT = 'stdout';
  const MESSAGETYPE_HEARTBEAT = 'heartbeat';
  const MESSAGETYPE_BUSY = 'busy';
  const MESSAGETYPE_IDLE = 'idle';
  const MESSAGETYPE_DOWN = 'down';
  const MESSAGETYPE_HIBERNATE = 'hibernate';

  // Values stored in $workState by willBeginWork() and willBeginIdle().
  const WORKSTATE_BUSY = 'busy';
  const WORKSTATE_IDLE = 'idle';

  private $argv;
  private $traceMode;
  private $traceMemory;
  private $verbose;

  // Set by onNotifySignal(); cleared at the start of every sleep().
  private $notifyReceived;

  // Set by onGracefulSignal(), or by sleep() when scaling the pool down.
  private $inGracefulShutdown;

  // One of the WORKSTATE_* constants, or null before the first transition.
  private $workState = null;

  // Unix timestamp of the last transition to idle, or null while not idle.
  private $idleSince = null;

  private $scaledownDuration;

  /**
   * Enable or disable verbose logging (see @{method:log}).
   *
   * @param bool $verbose True to emit verbose log lines.
   * @return this
   */
  final public function setVerbose($verbose) {
    $this->verbose = $verbose;
    return $this;
  }

  /**
   * @return bool|null Value previously passed to @{method:setVerbose}.
   */
  final public function getVerbose() {
    return $this->verbose;
  }

  /**
   * Set how long, in seconds, this daemon may stay idle before
   * @{method:sleep} exits it to scale the pool down. A falsey value
   * disables scaledown.
   *
   * @param int|null $scaledown_duration Idle limit in seconds.
   * @return this
   */
  final public function setScaledownDuration($scaledown_duration) {
    $this->scaledownDuration = $scaledown_duration;
    return $this;
  }

  /**
   * @return int|null Value previously passed to
   *   @{method:setScaledownDuration}.
   */
  final public function getScaledownDuration() {
    return $this->scaledownDuration;
  }

  /**
   * Store the arguments, install signal handlers, and start capturing
   * stdout.
   *
   * @param list<string> $argv Arguments for the daemon; available to
   *   subclasses through @{method:getArgv}.
   */
  final public function __construct(array $argv) {
    $this->argv = $argv;

    // The SIGTERM handler is registered under a fixed key on the shared
    // router, so it is installed at most once even if several daemons are
    // constructed.
    $router = PhutilSignalRouter::getRouter();
    $handler_key = 'daemon.term';
    if (!$router->getHandler($handler_key)) {
      $handler = new PhutilCallbackSignalHandler(
        SIGTERM,
        __CLASS__.'::onTermSignal');
      $router->installHandler($handler_key, $handler);
    }

    pcntl_signal(SIGINT, array($this, 'onGracefulSignal'));
    pcntl_signal(SIGUSR2, array($this, 'onNotifySignal'));

    // Without discard mode, this consumes unbounded amounts of memory. Keep
    // memory bounded.
    PhutilServiceProfiler::getInstance()->enableDiscardMode();

    $this->beginStdoutCapture();
  }

  /**
   * Flush and stop the stdout capture started by the constructor.
   */
  final public function __destruct() {
    $this->endStdoutCapture();
  }

  /**
   * Send a heartbeat message to the overseer. When memory tracing is
   * enabled, also write the current memory usage to stderr.
   *
   * @return void
   * @task overseer
   */
  final public function stillWorking() {
    $this->emitOverseerMessage(self::MESSAGETYPE_HEARTBEAT, null);

    if ($this->traceMemory) {
      $daemon = get_class($this);
      fprintf(
        STDERR,
        "%s %s %s\n",
        '<RAMS>',
        $daemon,
        pht(
          'Memory Usage: %s KB',
          new PhutilNumber(memory_get_usage() / 1024, 1)));
    }
  }

  /**
   * Test whether the daemon has begun a graceful shutdown, either because
   * it received SIGINT or because @{method:sleep} decided to scale the pool
   * down.
   *
   * @return bool|null Truthy once shutdown has started.
   */
  final public function shouldExit() {
    return $this->inGracefulShutdown;
  }

  /**
   * Decide whether the daemon should hibernate instead of sleeping, and if
   * so, send the overseer a hibernate message.
   *
   * @param int $duration Number of seconds the daemon intends to sleep.
   * @return bool True if a hibernate message was emitted.
   * @task overseer
   */
  final protected function shouldHibernate($duration) {
    // Don't hibernate if we don't have very long to sleep.
    if ($duration < 30) {
      return false;
    }

    // Never hibernate if we're part of a pool and could scale down instead.
    // We only hibernate the last process to drop the pool size to zero.
    if ($this->getScaledownDuration()) {
      return false;
    }

    // Don't hibernate for too long.
    $duration = min($duration, phutil_units('3 minutes in seconds'));

    $this->emitOverseerMessage(
      self::MESSAGETYPE_HIBERNATE,
      array(
        'duration' => $duration,
      ));

    $this->log(
      pht(
        'Preparing to hibernate for %s second(s).',
        new PhutilNumber($duration)));

    return true;
  }

  /**
   * Sleep for up to `$duration` seconds, sending heartbeats to the
   * overseer along the way.
   *
   * The sleep is split into slices of at most 60 seconds (or the scaledown
   * duration, if that is shorter). It ends early if a notify signal
   * arrives or a graceful shutdown begins. If a scaledown duration is set
   * and the daemon has been idle for longer than it, the daemon sends a
   * DOWN message and flags itself for shutdown.
   *
   * @param int $duration Number of seconds to sleep.
   * @return void
   */
  final protected function sleep($duration) {
    $this->notifyReceived = false;
    $this->willSleep($duration);
    $this->stillWorking();

    $scale_down = $this->getScaledownDuration();

    $max_sleep = 60;
    if ($scale_down) {
      $max_sleep = min($max_sleep, $scale_down);
    }

    if ($scale_down) {
      if ($this->workState == self::WORKSTATE_IDLE) {
        $dur = $this->getIdleDuration();
        $this->log(pht('Idle for %s seconds.', $dur));
      }
    }

    while ($duration > 0 &&
      !$this->notifyReceived &&
      !$this->shouldExit()) {

      // If this is an autoscaling clone and we've been idle for too long,
      // we're going to scale the pool down by exiting and not restarting. The
      // DOWN message tells the overseer that we don't want to be restarted.
      if ($scale_down) {
        if ($this->workState == self::WORKSTATE_IDLE) {
          if ($this->idleSince && ($this->idleSince + $scale_down < time())) {
            $this->inGracefulShutdown = true;
            $this->emitOverseerMessage(self::MESSAGETYPE_DOWN, null);
            $this->log(
              pht(
                'Daemon was idle for more than %s second(s), '.
                'scaling pool down.',
                new PhutilNumber($scale_down)));
            break;
          }
        }
      }

      sleep(min($duration, $max_sleep));
      $duration -= $max_sleep;
      $this->stillWorking();
    }
  }

  /**
   * Hook for subclasses, called at the start of @{method:sleep}.
   *
   * @param int $duration Number of seconds the daemon is about to sleep.
   * @return void
   */
  protected function willSleep($duration) {
    return;
  }

  /**
   * SIGTERM callback installed by the constructor. Reports the signal on
   * stderr.
   *
   * @param int $signo Signal number.
   * @return void
   */
  public static function onTermSignal($signo) {
    self::didCatchSignal($signo);
  }

  /**
   * @return list<string> Arguments passed to the constructor.
   */
  final protected function getArgv() {
    return $this->argv;
  }

  /**
   * Run the daemon: call the @{method:willRun} hook, then @{method:run}.
   *
   * @return void
   */
  final public function execute() {
    $this->willRun();
    $this->run();
  }

  /**
   * Daemon body, implemented by subclasses.
   */
  abstract protected function run();

  /**
   * Enable memory tracing; see @{method:stillWorking}.
   *
   * @return this
   */
  final public function setTraceMemory() {
    $this->traceMemory = true;
    return $this;
  }

  /**
   * @return bool|null True once @{method:setTraceMemory} has been called.
   */
  final public function getTraceMemory() {
    return $this->traceMemory;
  }

  /**
   * Enable trace mode: install the service profiler echo listener, enable
   * console server logging, and call the @{method:didSetTraceMode} hook.
   *
   * @return this
   */
  final public function setTraceMode() {
    $this->traceMode = true;
    PhutilServiceProfiler::installEchoListener();
    PhutilConsole::getConsole()->getServer()->setEnableLog(true);
    $this->didSetTraceMode();
    return $this;
  }

  /**
   * @return bool|null True once @{method:setTraceMode} has been called.
   */
  final public function getTraceMode() {
    return $this->traceMode;
  }

  /**
   * SIGINT callback: report the signal and flag the daemon for graceful
   * shutdown, so that @{method:shouldExit} becomes true.
   *
   * @param int $signo Signal number.
   * @return void
   */
  final public function onGracefulSignal($signo) {
    self::didCatchSignal($signo);
    $this->inGracefulShutdown = true;
  }

  /**
   * SIGUSR2 callback: report the signal, interrupt any sleep in progress,
   * and call the @{method:onNotify} hook.
   *
   * @param int $signo Signal number.
   * @return void
   */
  final public function onNotifySignal($signo) {
    self::didCatchSignal($signo);
    $this->notifyReceived = true;
    $this->onNotify($signo);
  }

  /**
   * Called after a notify signal has been received.
   *
   * @param int $signo Signal number.
   * @return void
   */
  protected function onNotify($signo) {
    // This is a hook for subclasses.
  }

  /**
   * Called by @{method:execute} immediately before @{method:run}.
   *
   * @return void
   */
  protected function willRun() {
    // This is a hook for subclasses.
  }

  /**
   * Called by @{method:setTraceMode} after trace mode has been enabled.
   *
   * @return void
   */
  protected function didSetTraceMode() {
    // This is a hook for subclasses.
  }

  /**
   * Write a message to stderr, but only in verbose mode.
   *
   * @param string $message Message to log.
   * @return void
   */
  final protected function log($message) {
    if ($this->verbose) {
      $daemon = get_class($this);
      fprintf(STDERR, "%s %s %s\n", '<VERB>', $daemon, $message);
    }
  }

  /**
   * Report a caught signal, by number and name, on stderr.
   *
   * @param int $signo Signal number.
   * @return void
   */
  private static function didCatchSignal($signo) {
    $signame = phutil_get_signal_name($signo);
    fprintf(
      STDERR,
      "%s Caught signal %s (%s).\n",
      '<SGNL>',
      $signo,
      $signame);
  }


/* -( Communicating With the Overseer )------------------------------------ */


  /**
   * Start an output buffer that routes stdout through
   * @{method:didReceiveStdout}.
   *
   * @return void
   * @task overseer
   */
  private function beginStdoutCapture() {
    ob_start(array($this, 'didReceiveStdout'), 2);
  }

  /**
   * Flush and close the current output buffer, if there is one.
   *
   * @return void
   * @task overseer
   */
  private function endStdoutCapture() {
    // ob_end_flush() raises an E_NOTICE when no buffer is active. This
    // method also runs from __destruct(), where we can not be certain a
    // buffer still exists, so check first.
    if (ob_get_level() > 0) {
      ob_end_flush();
    }
  }

  /**
   * Output buffer callback: wrap captured stdout in an overseer message.
   * Public only because it is invoked as an `ob_start()` callback.
   *
   * @param string $data Captured output.
   * @return string Encoded message, or an empty string if there was no
   *   output.
   * @task overseer
   */
  public function didReceiveStdout($data) {
    if (!strlen($data)) {
      return '';
    }

    return $this->encodeOverseerMessage(self::MESSAGETYPE_STDOUT, $data);
  }

  /**
   * Encode a message as a single newline-terminated JSON line.
   *
   * The encoded structure is a list: the message type, followed by the
   * payload if the payload is not null.
   *
   * @param string $type One of the MESSAGETYPE_* constants.
   * @param mixed $data Payload, or null for none.
   * @return string Encoded message line.
   * @task overseer
   */
  private function encodeOverseerMessage($type, $data) {
    $structure = array($type);

    if ($data !== null) {
      $structure[] = $data;
    }

    // json_encode() returns false if the payload contains bytes which are
    // not valid UTF-8 (for example, binary output captured from stdout),
    // which would turn the whole message into an empty line. Where the
    // runtime supports it (PHP 7.2+), substitute invalid sequences instead
    // of dropping the message.
    $flags = 0;
    if (defined('JSON_INVALID_UTF8_SUBSTITUTE')) {
      $flags = constant('JSON_INVALID_UTF8_SUBSTITUTE');
    }

    return json_encode($structure, $flags)."\n";
  }

  /**
   * Write a message directly to stdout for the overseer.
   *
   * Stdout capture is paused around the write so the message is not itself
   * wrapped in a MESSAGETYPE_STDOUT message.
   *
   * @param string $type One of the MESSAGETYPE_* constants.
   * @param mixed $data Payload, or null for none.
   * @return void
   * @task overseer
   */
  private function emitOverseerMessage($type, $data) {
    $this->endStdoutCapture();
    echo $this->encodeOverseerMessage($type, $data);
    $this->beginStdoutCapture();
  }

  /**
   * Error listener which echoes error messages and stack traces to stderr.
   *
   * @param wild $event Unused.
   * @param wild $value Unused.
   * @param map<string, wild> $metadata Error metadata; the keys
   *   `default_message` and `trace` are read.
   * @return void
   * @task overseer
   */
  public static function errorListener($event, $value, array $metadata) {
    // If the caller has redirected the error log to a file, PHP won't output
    // messages to stderr, so the overseer can't capture them. Install a
    // listener which just echoes errors to stderr, so the overseer is always
    // aware of errors.

    $console = PhutilConsole::getConsole();
    $message = idx($metadata, 'default_message');

    if ($message) {
      $console->writeErr("%s\n", $message);
    }
    if (idx($metadata, 'trace')) {
      $trace = PhutilErrorHandler::formatStacktrace($metadata['trace']);
      $console->writeErr("%s\n", $trace);
    }
  }


/* -( Autoscaling )-------------------------------------------------------- */


  /**
   * Prepare to become busy. This may autoscale the pool up.
   *
   * This notifies the overseer that the daemon has become busy. If daemons
   * that are part of an autoscale pool are continuously busy for a prolonged
   * period of time, the overseer may scale up the pool.
   *
   * @return this
   * @task autoscale
   */
  protected function willBeginWork() {
    // Only emit a message on an actual state transition.
    if ($this->workState != self::WORKSTATE_BUSY) {
      $this->workState = self::WORKSTATE_BUSY;
      $this->idleSince = null;
      $this->emitOverseerMessage(self::MESSAGETYPE_BUSY, null);
    }

    return $this;
  }


  /**
   * Prepare to idle. This may autoscale the pool down.
   *
   * This notifies the overseer that the daemon is no longer busy. If daemons
   * that are part of an autoscale pool are idle for a prolonged period of
   * time, they may exit to scale the pool down.
   *
   * @return this
   * @task autoscale
   */
  protected function willBeginIdle() {
    // Only emit a message (and restart the idle clock) on an actual state
    // transition, so repeated calls while idle keep the original timestamp.
    if ($this->workState != self::WORKSTATE_IDLE) {
      $this->workState = self::WORKSTATE_IDLE;
      $this->idleSince = time();
      $this->emitOverseerMessage(self::MESSAGETYPE_IDLE, null);
    }

    return $this;
  }

  /**
   * Get how long the daemon has been idle.
   *
   * @return int|null Seconds since the daemon last became idle, or null if
   *   it is not currently idle.
   * @task autoscale
   */
  protected function getIdleDuration() {
    if (!$this->idleSince) {
      return null;
    }

    $now = time();
    return ($now - $this->idleSince);
  }

}
394
395