Path: blob/master/src/infrastructure/daemon/workers/PhabricatorTriggerDaemon.php
12242 views
<?php12/**3* Schedule and execute event triggers, which run code at specific times.4*5* Also performs garbage collection of old logs, caches, etc.6*7* @task garbage Garbage Collection8*/9final class PhabricatorTriggerDaemon10extends PhabricatorDaemon {1112const COUNTER_VERSION = 'trigger.version';13const COUNTER_CURSOR = 'trigger.cursor';1415private $garbageCollectors;16private $nextCollection;1718private $anyNuanceData;19private $nuanceSources;20private $nuanceCursors;2122private $calendarEngine;2324protected function run() {2526// The trigger daemon is a low-level infrastructure daemon which schedules27// and executes chronological events. Examples include a subscription which28// generates a bill on the 12th of every month, or a reminder email 1529// minutes before a meeting.3031// Only one trigger daemon can run at a time, and very little work should32// happen in the daemon process. In general, triggered events should33// just schedule a task into the normal daemon worker queue and then34// return. This allows the real work to take longer to execute without35// disrupting other triggers.3637// The trigger mechanism guarantees that events will execute exactly once,38// but does not guarantee that they will execute at precisely the specified39// time. Under normal circumstances, they should execute within a minute or40// so of the desired time, so this mechanism can be used for things like41// meeting reminders.4243// If the trigger queue backs up (for example, because it is overwhelmed by44// trigger updates, doesn't run for a while, or a trigger action is written45// inefficiently) or the daemon queue backs up (usually for similar46// reasons), events may execute an arbitrarily long time after they were47// scheduled to execute. In some cases (like billing a subscription) this48// may be desirable; in other cases (like sending a meeting reminder) the49// action may want to check the current time and see if the event is still50// relevant.5152// The trigger daemon works in two phases:53//54// 1. A scheduling phase processes recently updated triggers and55// schedules them for future execution. For example, this phase would56// see that a meeting trigger had been changed recently, determine57// when the reminder for it should execute, and then schedule the58// action to execute at that future date.59// 2. An execution phase runs the actions for any scheduled events which60// are due to execute.61//62// The major goal of this design is to deliver on the guarantee that events63// will execute exactly once. It prevents race conditions in scheduling64// and execution by ensuring there is only one writer for either of these65// phases. Without this separation of responsibilities, web processes66// trying to reschedule events after an update could race with other web67// processes or the daemon.6869// We want to start the first GC cycle right away, not wait 4 hours.70$this->nextCollection = PhabricatorTime::getNow();7172do {73PhabricatorCaches::destroyRequestCache();7475$lock = PhabricatorGlobalLock::newLock('trigger');7677try {78$lock->lock(5);79} catch (PhutilLockException $ex) {80throw new PhutilProxyException(81pht(82'Another process is holding the trigger lock. Usually, this '.83'means another copy of the trigger daemon is running elsewhere. '.84'Multiple processes are not permitted to update triggers '.85'simultaneously.'),86$ex);87}8889// Run the scheduling phase. This finds updated triggers which we have90// not scheduled yet and schedules them.91$last_version = $this->loadCurrentCursor();92$head_version = $this->loadCurrentVersion();9394// The cursor points at the next record to process, so we can only skip95// this step if we're ahead of the version number.96if ($last_version <= $head_version) {97$this->scheduleTriggers($last_version);98}99100// Run the execution phase. This finds events which are due to execute101// and runs them.102$this->executeTriggers();103104$lock->unlock();105106$sleep_duration = $this->getSleepDuration();107$sleep_duration = $this->runNuanceImportCursors($sleep_duration);108$sleep_duration = $this->runGarbageCollection($sleep_duration);109$sleep_duration = $this->runCalendarNotifier($sleep_duration);110111if ($this->shouldHibernate($sleep_duration)) {112break;113}114115$this->sleep($sleep_duration);116} while (!$this->shouldExit());117}118119120/**121* Process all of the triggers which have been updated since the last time122* the daemon ran, scheduling them into the event table.123*124* @param int Cursor for the next version update to process.125* @return void126*/127private function scheduleTriggers($cursor) {128$limit = 100;129130$query = id(new PhabricatorWorkerTriggerQuery())131->setViewer($this->getViewer())132->withVersionBetween($cursor, null)133->setOrder(PhabricatorWorkerTriggerQuery::ORDER_VERSION)134->needEvents(true)135->setLimit($limit);136while (true) {137$triggers = $query->execute();138139foreach ($triggers as $trigger) {140$event = $trigger->getEvent();141if ($event) {142$last_epoch = $event->getLastEventEpoch();143} else {144$last_epoch = null;145}146147$next_epoch = $trigger->getNextEventEpoch(148$last_epoch,149$is_reschedule = false);150151$new_event = PhabricatorWorkerTriggerEvent::initializeNewEvent($trigger)152->setLastEventEpoch($last_epoch)153->setNextEventEpoch($next_epoch);154155$new_event->openTransaction();156if ($event) {157$event->delete();158}159160// Always save the new event. Note that we save it even if the next161// epoch is `null`, indicating that it will never fire, because we162// would lose the last epoch information if we delete it.163//164// In particular, some events may want to execute exactly once.165// Retaining the last epoch allows them to do this, even if the166// trigger is updated.167$new_event->save();168169// Move the cursor forward to make sure we don't reprocess this170// trigger until it is updated again.171$this->updateCursor($trigger->getTriggerVersion() + 1);172$new_event->saveTransaction();173}174175// If we saw fewer than a full page of updated triggers, we're caught176// up, so we can move on to the execution phase.177if (count($triggers) < $limit) {178break;179}180181// Otherwise, skip past the stuff we just processed and grab another182// page of updated triggers.183$min = last($triggers)->getTriggerVersion() + 1;184$query->withVersionBetween($min, null);185186$this->stillWorking();187}188}189190191/**192* Run scheduled event triggers which are due for execution.193*194* @return void195*/196private function executeTriggers() {197198// We run only a limited number of triggers before ending the execution199// phase. If we ran until exhaustion, we could end up executing very200// out-of-date triggers if there was a long backlog: trigger changes201// during this phase are not reflected in the event table until we run202// another scheduling phase.203204// If we exit this phase with triggers still ready to execute we'll205// jump back into the scheduling phase immediately, so this just makes206// sure we don't spend an unreasonably long amount of time without207// processing trigger updates and doing rescheduling.208209$limit = 100;210$now = PhabricatorTime::getNow();211212$triggers = id(new PhabricatorWorkerTriggerQuery())213->setViewer($this->getViewer())214->setOrder(PhabricatorWorkerTriggerQuery::ORDER_EXECUTION)215->withNextEventBetween(null, $now)216->needEvents(true)217->setLimit($limit)218->execute();219foreach ($triggers as $trigger) {220$event = $trigger->getEvent();221222// Execute the trigger action.223$trigger->executeTrigger(224$event->getLastEventEpoch(),225$event->getNextEventEpoch());226227// Now that we've executed the trigger, the current trigger epoch is228// going to become the last epoch.229$last_epoch = $event->getNextEventEpoch();230231// If this is a recurring trigger, give it an opportunity to reschedule.232$reschedule_epoch = $trigger->getNextEventEpoch(233$last_epoch,234$is_reschedule = true);235236// Don't reschedule events unless the next occurrence is in the future.237if (($reschedule_epoch !== null) &&238($last_epoch !== null) &&239($reschedule_epoch <= $last_epoch)) {240throw new Exception(241pht(242'Trigger is attempting to perform a routine reschedule where '.243'the next event (at %s) does not occur after the previous event '.244'(at %s). Routine reschedules must strictly move event triggers '.245'forward through time to avoid executing a trigger an infinite '.246'number of times instantaneously.',247$reschedule_epoch,248$last_epoch));249}250251$new_event = PhabricatorWorkerTriggerEvent::initializeNewEvent($trigger)252->setLastEventEpoch($last_epoch)253->setNextEventEpoch($reschedule_epoch);254255$event->openTransaction();256// Remove the event we just processed.257$event->delete();258259// See note in the scheduling phase about this; we save the new event260// even if the next epoch is `null`.261$new_event->save();262$event->saveTransaction();263}264}265266267/**268* Get the number of seconds to sleep for before starting the next scheduling269* phase.270*271* If no events are scheduled soon, we'll sleep briefly. Otherwise,272* we'll sleep until the next scheduled event.273*274* @return int Number of seconds to sleep for.275*/276private function getSleepDuration() {277$sleep = phutil_units('3 minutes in seconds');278279$next_triggers = id(new PhabricatorWorkerTriggerQuery())280->setViewer($this->getViewer())281->setOrder(PhabricatorWorkerTriggerQuery::ORDER_EXECUTION)282->withNextEventBetween(0, null)283->setLimit(1)284->needEvents(true)285->execute();286if ($next_triggers) {287$next_trigger = head($next_triggers);288$next_epoch = $next_trigger->getEvent()->getNextEventEpoch();289$until = max(0, $next_epoch - PhabricatorTime::getNow());290$sleep = min($sleep, $until);291}292293return $sleep;294}295296297/* -( Counters )----------------------------------------------------------- */298299300private function loadCurrentCursor() {301return $this->loadCurrentCounter(self::COUNTER_CURSOR);302}303304private function loadCurrentVersion() {305return $this->loadCurrentCounter(self::COUNTER_VERSION);306}307308private function updateCursor($value) {309LiskDAO::overwriteCounterValue(310id(new PhabricatorWorkerTrigger())->establishConnection('w'),311self::COUNTER_CURSOR,312$value);313}314315private function loadCurrentCounter($counter_name) {316return (int)LiskDAO::loadCurrentCounterValue(317id(new PhabricatorWorkerTrigger())->establishConnection('w'),318$counter_name);319}320321322/* -( Garbage Collection )------------------------------------------------- */323324325/**326* Run the garbage collector for up to a specified number of seconds.327*328* @param int Number of seconds the GC may run for.329* @return int Number of seconds remaining in the time budget.330* @task garbage331*/332private function runGarbageCollection($duration) {333$run_until = (PhabricatorTime::getNow() + $duration);334335// NOTE: We always run at least one GC cycle to make sure the GC can make336// progress even if the trigger queue is busy.337do {338$more_garbage = $this->updateGarbageCollection();339if (!$more_garbage) {340// If we don't have any more collection work to perform, we're all341// done.342break;343}344} while (PhabricatorTime::getNow() <= $run_until);345346$remaining = max(0, $run_until - PhabricatorTime::getNow());347348return $remaining;349}350351352/**353* Update garbage collection, possibly collecting a small amount of garbage.354*355* @return bool True if there is more garbage to collect.356* @task garbage357*/358private function updateGarbageCollection() {359// If we're ready to start the next collection cycle, load all the360// collectors.361$next = $this->nextCollection;362if ($next && (PhabricatorTime::getNow() >= $next)) {363$this->nextCollection = null;364365$all_collectors = PhabricatorGarbageCollector::getAllCollectors();366$this->garbageCollectors = $all_collectors;367}368369// If we're in a collection cycle, continue collection.370if ($this->garbageCollectors) {371foreach ($this->garbageCollectors as $key => $collector) {372$more_garbage = $collector->runCollector();373if (!$more_garbage) {374unset($this->garbageCollectors[$key]);375}376// We only run one collection per call, to prevent triggers from being377// thrown too far off schedule if there's a lot of garbage to collect.378break;379}380381if ($this->garbageCollectors) {382// If we have more work to do, return true.383return true;384}385386// Otherwise, reschedule another cycle in 4 hours.387$now = PhabricatorTime::getNow();388$wait = phutil_units('4 hours in seconds');389$this->nextCollection = $now + $wait;390}391392return false;393}394395396/* -( Nuance Importers )--------------------------------------------------- */397398399private function runNuanceImportCursors($duration) {400$run_until = (PhabricatorTime::getNow() + $duration);401402do {403$more_data = $this->updateNuanceImportCursors();404if (!$more_data) {405break;406}407} while (PhabricatorTime::getNow() <= $run_until);408409$remaining = max(0, $run_until - PhabricatorTime::getNow());410411return $remaining;412}413414415private function updateNuanceImportCursors() {416$nuance_app = 'PhabricatorNuanceApplication';417if (!PhabricatorApplication::isClassInstalled($nuance_app)) {418return false;419}420421// If we haven't loaded sources yet, load them first.422if (!$this->nuanceSources && !$this->nuanceCursors) {423$this->anyNuanceData = false;424425$sources = id(new NuanceSourceQuery())426->setViewer($this->getViewer())427->withIsDisabled(false)428->withHasImportCursors(true)429->execute();430if (!$sources) {431return false;432}433434$this->nuanceSources = array_reverse($sources);435}436437// If we don't have any cursors, move to the next source and generate its438// cursors.439if (!$this->nuanceCursors) {440$source = array_pop($this->nuanceSources);441442$definition = $source->getDefinition()443->setViewer($this->getViewer())444->setSource($source);445446$cursors = $definition->getImportCursors();447$this->nuanceCursors = array_reverse($cursors);448}449450// Update the next cursor.451$cursor = array_pop($this->nuanceCursors);452if ($cursor) {453$more_data = $cursor->importFromSource();454if ($more_data) {455$this->anyNuanceData = true;456}457}458459if (!$this->nuanceSources && !$this->nuanceCursors) {460return $this->anyNuanceData;461}462463return true;464}465466467/* -( Calendar Notifier )-------------------------------------------------- */468469470private function runCalendarNotifier($duration) {471$run_until = (PhabricatorTime::getNow() + $duration);472473if (!$this->calendarEngine) {474$this->calendarEngine = new PhabricatorCalendarNotificationEngine();475}476477$this->calendarEngine->publishNotifications();478479$remaining = max(0, $run_until - PhabricatorTime::getNow());480return $remaining;481}482483}484485486