Path: blob/master/src/infrastructure/daemon/workers/PhabricatorWorker.php
12242 views
<?php12/**3* @task config Configuring Retries and Failures4*/5abstract class PhabricatorWorker extends Phobject {67private $data;8private static $runAllTasksInProcess = false;9private $queuedTasks = array();10private $currentWorkerTask;1112// NOTE: Lower priority numbers execute first. The priority numbers have to13// have the same ordering that IDs do (lowest first) so MySQL can use a14// multipart key across both of them efficiently.1516const PRIORITY_ALERTS = 1000;17const PRIORITY_DEFAULT = 2000;18const PRIORITY_COMMIT = 2500;19const PRIORITY_BULK = 3000;20const PRIORITY_INDEX = 3500;21const PRIORITY_IMPORT = 4000;2223/**24* Special owner indicating that the task has yielded.25*/26const YIELD_OWNER = '(yield)';2728/* -( Configuring Retries and Failures )----------------------------------- */293031/**32* Return the number of seconds this worker needs hold a lease on the task for33* while it performs work. For most tasks you can leave this at `null`, which34* will give you a default lease (currently 2 hours).35*36* For tasks which may take a very long time to complete, you should return37* an upper bound on the amount of time the task may require.38*39* @return int|null Number of seconds this task needs to remain leased for,40* or null for a default lease.41*42* @task config43*/44public function getRequiredLeaseTime() {45return null;46}474849/**50* Return the maximum number of times this task may be retried before it is51* considered permanently failed. By default, tasks retry indefinitely. You52* can throw a @{class:PhabricatorWorkerPermanentFailureException} to cause an53* immediate permanent failure.54*55* @return int|null Number of times the task will retry before permanent56* failure. Return `null` to retry indefinitely.57*58* @task config59*/60public function getMaximumRetryCount() {61return null;62}636465/**66* Return the number of seconds a task should wait after a failure before67* retrying. For most tasks you can leave this at `null`, which will give you68* a short default retry period (currently 60 seconds).69*70* @param PhabricatorWorkerTask The task itself. This object is probably71* useful mostly to examine the failure count72* if you want to implement staggered retries,73* or to examine the execution exception if74* you want to react to different failures in75* different ways.76* @return int|null Number of seconds to wait between retries,77* or null for a default retry period78* (currently 60 seconds).79*80* @task config81*/82public function getWaitBeforeRetry(PhabricatorWorkerTask $task) {83return null;84}8586public function setCurrentWorkerTask(PhabricatorWorkerTask $task) {87$this->currentWorkerTask = $task;88return $this;89}9091public function getCurrentWorkerTask() {92return $this->currentWorkerTask;93}9495public function getCurrentWorkerTaskID() {96$task = $this->getCurrentWorkerTask();97if (!$task) {98return null;99}100return $task->getID();101}102103abstract protected function doWork();104105final public function __construct($data) {106$this->data = $data;107}108109final protected function getTaskData() {110return $this->data;111}112113final protected function getTaskDataValue($key, $default = null) {114$data = $this->getTaskData();115if (!is_array($data)) {116throw new PhabricatorWorkerPermanentFailureException(117pht('Expected task data to be a dictionary.'));118}119return idx($data, $key, $default);120}121122final public function executeTask() {123$this->doWork();124}125126final public static function scheduleTask(127$task_class,128$data,129$options = array()) {130131PhutilTypeSpec::checkMap(132$options,133array(134'priority' => 'optional int|null',135'objectPHID' => 'optional string|null',136'containerPHID' => 'optional string|null',137'delayUntil' => 'optional int|null',138));139140$priority = idx($options, 'priority');141if ($priority === null) {142$priority = self::PRIORITY_DEFAULT;143}144$object_phid = idx($options, 'objectPHID');145$container_phid = idx($options, 'containerPHID');146147$task = id(new PhabricatorWorkerActiveTask())148->setTaskClass($task_class)149->setData($data)150->setPriority($priority)151->setObjectPHID($object_phid)152->setContainerPHID($container_phid);153154$delay = idx($options, 'delayUntil');155if ($delay) {156$task->setLeaseExpires($delay);157}158159if (self::$runAllTasksInProcess) {160// Do the work in-process.161$worker = newv($task_class, array($data));162163while (true) {164try {165$worker->executeTask();166$worker->flushTaskQueue();167168$task_result = PhabricatorWorkerArchiveTask::RESULT_SUCCESS;169break;170} catch (PhabricatorWorkerPermanentFailureException $ex) {171$proxy = new PhutilProxyException(172pht(173'In-process task ("%s") failed permanently.',174$task_class),175$ex);176177phlog($proxy);178179$task_result = PhabricatorWorkerArchiveTask::RESULT_FAILURE;180break;181} catch (PhabricatorWorkerYieldException $ex) {182phlog(183pht(184'In-process task "%s" yielded for %s seconds, sleeping...',185$task_class,186$ex->getDuration()));187sleep($ex->getDuration());188}189}190191// Now, save a task row and immediately archive it so we can return an192// object with a valid ID.193$task->openTransaction();194$task->save();195$archived = $task->archiveTask($task_result, 0);196$task->saveTransaction();197198return $archived;199} else {200$task->save();201return $task;202}203}204205206public function renderForDisplay(PhabricatorUser $viewer) {207return null;208}209210/**211* Set this flag to execute scheduled tasks synchronously, in the same212* process. This is useful for debugging, and otherwise dramatically worse213* in every way imaginable.214*/215public static function setRunAllTasksInProcess($all) {216self::$runAllTasksInProcess = $all;217}218219final protected function log($pattern /* , ... */) {220$console = PhutilConsole::getConsole();221$argv = func_get_args();222call_user_func_array(array($console, 'writeLog'), $argv);223return $this;224}225226227/**228* Queue a task to be executed after this one succeeds.229*230* The followup task will be queued only if this task completes cleanly.231*232* @param string Task class to queue.233* @param array Data for the followup task.234* @param array Options for the followup task.235* @return this236*/237final protected function queueTask(238$class,239array $data,240array $options = array()) {241$this->queuedTasks[] = array($class, $data, $options);242return $this;243}244245246/**247* Get tasks queued as followups by @{method:queueTask}.248*249* @return list<tuple<string, wild, int|null>> Queued task specifications.250*/251final protected function getQueuedTasks() {252return $this->queuedTasks;253}254255256/**257* Schedule any queued tasks, then empty the task queue.258*259* By default, the queue is flushed only if a task succeeds. You can call260* this method to force the queue to flush before failing (for example, if261* you are using queues to improve locking behavior).262*263* @param map<string, wild> Optional default options.264* @return this265*/266final public function flushTaskQueue($defaults = array()) {267foreach ($this->getQueuedTasks() as $task) {268list($class, $data, $options) = $task;269270$options = $options + $defaults;271272self::scheduleTask($class, $data, $options);273}274275$this->queuedTasks = array();276}277278279/**280* Awaken tasks that have yielded.281*282* Reschedules the specified tasks if they are currently queued in a yielded,283* unleased, unretried state so they'll execute sooner. This can let the284* queue avoid unnecessary waits.285*286* This method does not provide any assurances about when these tasks will287* execute, or even guarantee that it will have any effect at all.288*289* @param list<id> List of task IDs to try to awaken.290* @return void291*/292final public static function awakenTaskIDs(array $ids) {293if (!$ids) {294return;295}296297$table = new PhabricatorWorkerActiveTask();298$conn_w = $table->establishConnection('w');299300// NOTE: At least for now, we're keeping these tasks yielded, just301// pretending that they threw a shorter yield than they really did.302303// Overlap the windows here to handle minor client/server time differences304// and because it's likely correct to push these tasks to the head of their305// respective priorities. There is a good chance they are ready to execute.306$window = phutil_units('1 hour in seconds');307$epoch_ago = (PhabricatorTime::getNow() - $window);308309queryfx(310$conn_w,311'UPDATE %T SET leaseExpires = %d312WHERE id IN (%Ld)313AND leaseOwner = %s314AND leaseExpires > %d315AND failureCount = 0',316$table->getTableName(),317$epoch_ago,318$ids,319self::YIELD_OWNER,320$epoch_ago);321}322323protected function newContentSource() {324return PhabricatorContentSource::newForSource(325PhabricatorDaemonContentSource::SOURCECONST);326}327328}329330331