Path: blob/master/src/applications/fact/extract/PhabricatorFactUpdateIterator.php
12256 views
<?php

/**
 * Iterate over objects by update time in a stable way. This iterator only works
 * for "normal" Lisk objects: objects with an auto-increment ID and a
 * dateModified column.
 *
 * If the prototype object reports no `dateModified` property, pages are
 * ordered by ID alone instead.
 *
 * The iteration position is exposed as an opaque cursor (see @{method:key})
 * which can be fed back in with @{method:setPosition} to resume later.
 */
final class PhabricatorFactUpdateIterator extends PhutilBufferedIterator {

  /** Cursor of the last row loaded so far; falsy means "start from the top". */
  private $cursor;

  /** Prototype object used to issue `loadAllWhere()` queries. */
  private $object;

  /** Starting cursor supplied by the caller via setPosition(), if any. */
  private $position;

  /**
   * Rows modified within this many seconds of the current time are not
   * returned. See the NOTE in loadPage() for the reasoning.
   */
  private $ignoreUpdatesDuration = 15;

  /**
   * @param LiskDAO Prototype object of the type to iterate over.
   */
  public function __construct(LiskDAO $object) {
    $this->object = $object;
  }

  /**
   * Set the cursor to resume from. The value is copied into the active cursor
   * when the iterator is rewound.
   *
   * @param string|int|null Cursor in the format produced by key():
   *                        "<dateModified>:<id>" for objects with a
   *                        dateModified property, or a bare ID otherwise.
   * @return this
   */
  public function setPosition($position) {
    $this->position = $position;
    return $this;
  }

  /**
   * Reset the active cursor to the configured start position.
   *
   * NOTE(review): presumably invoked by PhutilBufferedIterator on rewind();
   * the parent class is not visible from this file.
   */
  protected function didRewind() {
    $this->cursor = $this->position;
  }

  /**
   * Build the cursor value which identifies an object's place in the
   * iteration order.
   *
   * @param object Object loaded by this iterator.
   * @return string|int "<dateModified>:<id>" if the object has a dateModified
   *                    property, otherwise just its ID.
   */
  protected function getCursorFromObject($object) {
    if ($object->hasProperty('dateModified')) {
      return $object->getDateModified().':'.$object->getID();
    } else {
      return $object->getID();
    }
  }

  /**
   * Return the cursor for the current object, so that iteration keys can be
   * stored and later passed to setPosition().
   */
  public function key() {
    return $this->getCursorFromObject($this->current());
  }

  /**
   * Load the next page of objects after the active cursor and advance the
   * cursor to the last object on that page.
   *
   * @return list<object> Result of `loadAllWhere()`; the cursor is left
   *                      unchanged when this is empty.
   */
  protected function loadPage() {
    if ($this->object->hasProperty('dateModified')) {
      if ($this->cursor) {
        // Cursor format is "<dateModified>:<id>"; see getCursorFromObject().
        list($after_epoch, $after_id) = explode(':', $this->cursor);
      } else {
        $after_epoch = 0;
        $after_id = 0;
      }

      // NOTE: We ignore recent updates because once we process an update we'll
      // never process rows behind it again. We need to read only rows which
      // we're sure no new rows will be inserted behind. If we read a row that
      // was updated on the current second, another update later on in this
      // second could affect an object with a lower ID, and we'd skip that
      // update. To avoid this, just ignore any rows which have been updated in
      // the last few seconds. This also reduces the amount of work we need to
      // do if an object is repeatedly updated; we will just look at the end
      // state without processing the intermediate states. Finally, this gives
      // us reasonable protections against clock skew between the machine the
      // daemon is running on and any machines performing writes.

      $page = $this->object->loadAllWhere(
        '((dateModified > %d) OR (dateModified = %d AND id > %d))
          AND (dateModified < %d - %d)
          ORDER BY dateModified ASC, id ASC LIMIT %d',
        $after_epoch,
        $after_epoch,
        $after_id,
        time(),
        $this->ignoreUpdatesDuration,
        $this->getPageSize());
    } else {
      // No dateModified property: fall back to paging by ID only.
      if ($this->cursor) {
        $after_id = $this->cursor;
      } else {
        $after_id = 0;
      }

      $page = $this->object->loadAllWhere(
        'id > %d ORDER BY id ASC LIMIT %d',
        $after_id,
        $this->getPageSize());
    }

    if ($page) {
      // Remember where this page ended so the next call continues after it.
      $this->cursor = $this->getCursorFromObject(end($page));
    }

    return $page;
  }

}