Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
freebsd
GitHub Repository: freebsd/phabricator
Path: blob/master/src/applications/fact/extract/PhabricatorFactUpdateIterator.php
12256 views
1
<?php
2
3
/**
4
* Iterate over objects by update time in a stable way. This iterator only works
5
* for "normal" Lisk objects: objects with an auto-increment ID and a
6
* dateModified column.
7
*/
8
final class PhabricatorFactUpdateIterator extends PhutilBufferedIterator {
9
10
private $cursor;
11
private $object;
12
private $position;
13
private $ignoreUpdatesDuration = 15;
14
15
public function __construct(LiskDAO $object) {
16
$this->object = $object;
17
}
18
19
public function setPosition($position) {
20
$this->position = $position;
21
return $this;
22
}
23
24
protected function didRewind() {
25
$this->cursor = $this->position;
26
}
27
28
protected function getCursorFromObject($object) {
29
if ($object->hasProperty('dateModified')) {
30
return $object->getDateModified().':'.$object->getID();
31
} else {
32
return $object->getID();
33
}
34
}
35
36
public function key() {
37
return $this->getCursorFromObject($this->current());
38
}
39
40
protected function loadPage() {
41
if ($this->object->hasProperty('dateModified')) {
42
if ($this->cursor) {
43
list($after_epoch, $after_id) = explode(':', $this->cursor);
44
} else {
45
$after_epoch = 0;
46
$after_id = 0;
47
}
48
49
// NOTE: We ignore recent updates because once we process an update we'll
50
// never process rows behind it again. We need to read only rows which
51
// we're sure no new rows will be inserted behind. If we read a row that
52
// was updated on the current second, another update later on in this
53
// second could affect an object with a lower ID, and we'd skip that
54
// update. To avoid this, just ignore any rows which have been updated in
55
// the last few seconds. This also reduces the amount of work we need to
56
// do if an object is repeatedly updated; we will just look at the end
57
// state without processing the intermediate states. Finally, this gives
58
// us reasonable protections against clock skew between the machine the
59
// daemon is running on and any machines performing writes.
60
61
$page = $this->object->loadAllWhere(
62
'((dateModified > %d) OR (dateModified = %d AND id > %d))
63
AND (dateModified < %d - %d)
64
ORDER BY dateModified ASC, id ASC LIMIT %d',
65
$after_epoch,
66
$after_epoch,
67
$after_id,
68
time(),
69
$this->ignoreUpdatesDuration,
70
$this->getPageSize());
71
} else {
72
if ($this->cursor) {
73
$after_id = $this->cursor;
74
} else {
75
$after_id = 0;
76
}
77
78
$page = $this->object->loadAllWhere(
79
'id > %d ORDER BY id ASC LIMIT %d',
80
$after_id,
81
$this->getPageSize());
82
}
83
84
if ($page) {
85
$this->cursor = $this->getCursorFromObject(end($page));
86
}
87
88
return $page;
89
}
90
91
}
92
93