Path: blob/master/src/infrastructure/daemon/PhutilDaemon.php
12241 views
<?php12/**3* Scaffolding for implementing robust background processing scripts.4*5*6* Autoscaling7* ===========8*9* Autoscaling automatically launches copies of a daemon when it is busy10* (scaling the pool up) and stops them when they're idle (scaling the pool11* down). This is appropriate for daemons which perform highly parallelizable12* work.13*14* To make a daemon support autoscaling, the implementation should look15* something like this:16*17* while (!$this->shouldExit()) {18* if (work_available()) {19* $this->willBeginWork();20* do_work();21* $this->sleep(0);22* } else {23* $this->willBeginIdle();24* $this->sleep(1);25* }26* }27*28* In particular, call @{method:willBeginWork} before becoming busy, and29* @{method:willBeginIdle} when no work is available. If the daemon is launched30* into an autoscale pool, this will cause the pool to automatically scale up31* when busy and down when idle.32*33* See @{class:PhutilHighIntensityIntervalDaemon} for an example of a simple34* autoscaling daemon.35*36* Launching a daemon which does not make these callbacks into an autoscale37* pool will have no effect.38*39* @task overseer Communicating With the Overseer40* @task autoscale Autoscaling Daemon Pools41*/42abstract class PhutilDaemon extends Phobject {4344const MESSAGETYPE_STDOUT = 'stdout';45const MESSAGETYPE_HEARTBEAT = 'heartbeat';46const MESSAGETYPE_BUSY = 'busy';47const MESSAGETYPE_IDLE = 'idle';48const MESSAGETYPE_DOWN = 'down';49const MESSAGETYPE_HIBERNATE = 'hibernate';5051const WORKSTATE_BUSY = 'busy';52const WORKSTATE_IDLE = 'idle';5354private $argv;55private $traceMode;56private $traceMemory;57private $verbose;58private $notifyReceived;59private $inGracefulShutdown;60private $workState = null;61private $idleSince = null;62private $scaledownDuration;6364final public function setVerbose($verbose) {65$this->verbose = $verbose;66return $this;67}6869final public function getVerbose() {70return $this->verbose;71}7273final public function setScaledownDuration($scaledown_duration) {74$this->scaledownDuration = $scaledown_duration;75return $this;76}7778final public function getScaledownDuration() {79return $this->scaledownDuration;80}8182final public function __construct(array $argv) {83$this->argv = $argv;8485$router = PhutilSignalRouter::getRouter();86$handler_key = 'daemon.term';87if (!$router->getHandler($handler_key)) {88$handler = new PhutilCallbackSignalHandler(89SIGTERM,90__CLASS__.'::onTermSignal');91$router->installHandler($handler_key, $handler);92}9394pcntl_signal(SIGINT, array($this, 'onGracefulSignal'));95pcntl_signal(SIGUSR2, array($this, 'onNotifySignal'));9697// Without discard mode, this consumes unbounded amounts of memory. Keep98// memory bounded.99PhutilServiceProfiler::getInstance()->enableDiscardMode();100101$this->beginStdoutCapture();102}103104final public function __destruct() {105$this->endStdoutCapture();106}107108final public function stillWorking() {109$this->emitOverseerMessage(self::MESSAGETYPE_HEARTBEAT, null);110111if ($this->traceMemory) {112$daemon = get_class($this);113fprintf(114STDERR,115"%s %s %s\n",116'<RAMS>',117$daemon,118pht(119'Memory Usage: %s KB',120new PhutilNumber(memory_get_usage() / 1024, 1)));121}122}123124final public function shouldExit() {125return $this->inGracefulShutdown;126}127128final protected function shouldHibernate($duration) {129// Don't hibernate if we don't have very long to sleep.130if ($duration < 30) {131return false;132}133134// Never hibernate if we're part of a pool and could scale down instead.135// We only hibernate the last process to drop the pool size to zero.136if ($this->getScaledownDuration()) {137return false;138}139140// Don't hibernate for too long.141$duration = min($duration, phutil_units('3 minutes in seconds'));142143$this->emitOverseerMessage(144self::MESSAGETYPE_HIBERNATE,145array(146'duration' => $duration,147));148149$this->log(150pht(151'Preparing to hibernate for %s second(s).',152new PhutilNumber($duration)));153154return true;155}156157final protected function sleep($duration) {158$this->notifyReceived = false;159$this->willSleep($duration);160$this->stillWorking();161162$scale_down = $this->getScaledownDuration();163164$max_sleep = 60;165if ($scale_down) {166$max_sleep = min($max_sleep, $scale_down);167}168169if ($scale_down) {170if ($this->workState == self::WORKSTATE_IDLE) {171$dur = $this->getIdleDuration();172$this->log(pht('Idle for %s seconds.', $dur));173}174}175176while ($duration > 0 &&177!$this->notifyReceived &&178!$this->shouldExit()) {179180// If this is an autoscaling clone and we've been idle for too long,181// we're going to scale the pool down by exiting and not restarting. The182// DOWN message tells the overseer that we don't want to be restarted.183if ($scale_down) {184if ($this->workState == self::WORKSTATE_IDLE) {185if ($this->idleSince && ($this->idleSince + $scale_down < time())) {186$this->inGracefulShutdown = true;187$this->emitOverseerMessage(self::MESSAGETYPE_DOWN, null);188$this->log(189pht(190'Daemon was idle for more than %s second(s), '.191'scaling pool down.',192new PhutilNumber($scale_down)));193break;194}195}196}197198sleep(min($duration, $max_sleep));199$duration -= $max_sleep;200$this->stillWorking();201}202}203204protected function willSleep($duration) {205return;206}207208public static function onTermSignal($signo) {209self::didCatchSignal($signo);210}211212final protected function getArgv() {213return $this->argv;214}215216final public function execute() {217$this->willRun();218$this->run();219}220221abstract protected function run();222223final public function setTraceMemory() {224$this->traceMemory = true;225return $this;226}227228final public function getTraceMemory() {229return $this->traceMemory;230}231232final public function setTraceMode() {233$this->traceMode = true;234PhutilServiceProfiler::installEchoListener();235PhutilConsole::getConsole()->getServer()->setEnableLog(true);236$this->didSetTraceMode();237return $this;238}239240final public function getTraceMode() {241return $this->traceMode;242}243244final public function onGracefulSignal($signo) {245self::didCatchSignal($signo);246$this->inGracefulShutdown = true;247}248249final public function onNotifySignal($signo) {250self::didCatchSignal($signo);251$this->notifyReceived = true;252$this->onNotify($signo);253}254255protected function onNotify($signo) {256// This is a hook for subclasses.257}258259protected function willRun() {260// This is a hook for subclasses.261}262263protected function didSetTraceMode() {264// This is a hook for subclasses.265}266267final protected function log($message) {268if ($this->verbose) {269$daemon = get_class($this);270fprintf(STDERR, "%s %s %s\n", '<VERB>', $daemon, $message);271}272}273274private static function didCatchSignal($signo) {275$signame = phutil_get_signal_name($signo);276fprintf(277STDERR,278"%s Caught signal %s (%s).\n",279'<SGNL>',280$signo,281$signame);282}283284285/* -( Communicating With the Overseer )------------------------------------ */286287288private function beginStdoutCapture() {289ob_start(array($this, 'didReceiveStdout'), 2);290}291292private function endStdoutCapture() {293ob_end_flush();294}295296public function didReceiveStdout($data) {297if (!strlen($data)) {298return '';299}300301return $this->encodeOverseerMessage(self::MESSAGETYPE_STDOUT, $data);302}303304private function encodeOverseerMessage($type, $data) {305$structure = array($type);306307if ($data !== null) {308$structure[] = $data;309}310311return json_encode($structure)."\n";312}313314private function emitOverseerMessage($type, $data) {315$this->endStdoutCapture();316echo $this->encodeOverseerMessage($type, $data);317$this->beginStdoutCapture();318}319320public static function errorListener($event, $value, array $metadata) {321// If the caller has redirected the error log to a file, PHP won't output322// messages to stderr, so the overseer can't capture them. Install a323// listener which just echoes errors to stderr, so the overseer is always324// aware of errors.325326$console = PhutilConsole::getConsole();327$message = idx($metadata, 'default_message');328329if ($message) {330$console->writeErr("%s\n", $message);331}332if (idx($metadata, 'trace')) {333$trace = PhutilErrorHandler::formatStacktrace($metadata['trace']);334$console->writeErr("%s\n", $trace);335}336}337338339/* -( Autoscaling )-------------------------------------------------------- */340341342/**343* Prepare to become busy. This may autoscale the pool up.344*345* This notifies the overseer that the daemon has become busy. If daemons346* that are part of an autoscale pool are continuously busy for a prolonged347* period of time, the overseer may scale up the pool.348*349* @return this350* @task autoscale351*/352protected function willBeginWork() {353if ($this->workState != self::WORKSTATE_BUSY) {354$this->workState = self::WORKSTATE_BUSY;355$this->idleSince = null;356$this->emitOverseerMessage(self::MESSAGETYPE_BUSY, null);357}358359return $this;360}361362363/**364* Prepare to idle. This may autoscale the pool down.365*366* This notifies the overseer that the daemon is no longer busy. If daemons367* that are part of an autoscale pool are idle for a prolonged period of368* time, they may exit to scale the pool down.369*370* @return this371* @task autoscale372*/373protected function willBeginIdle() {374if ($this->workState != self::WORKSTATE_IDLE) {375$this->workState = self::WORKSTATE_IDLE;376$this->idleSince = time();377$this->emitOverseerMessage(self::MESSAGETYPE_IDLE, null);378}379380return $this;381}382383protected function getIdleDuration() {384if (!$this->idleSince) {385return null;386}387388$now = time();389return ($now - $this->idleSince);390}391392}393394395