Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
freebsd
GitHub Repository: freebsd/phabricator
Path: blob/master/src/infrastructure/daemon/workers/PhabricatorWorker.php
12242 views
1
<?php
2
3
/**
 * Abstract base class for units of work which can be scheduled as tasks.
 *
 * Subclasses implement @{method:doWork}. Work is scheduled with
 * @{method:scheduleTask}, which normally saves a new active task row. If
 * @{method:setRunAllTasksInProcess} has been enabled, the work is instead
 * executed immediately in the calling process.
 *
 * @task config Configuring Retries and Failures
 */
abstract class PhabricatorWorker extends Phobject {

  // Data payload passed to the constructor; read via getTaskData().
  private $data;

  // When true, scheduleTask() executes work synchronously instead of only
  // saving a task row. See setRunAllTasksInProcess().
  private static $runAllTasksInProcess = false;

  // Followup task specifications: list of (class, data, options) tuples
  // appended by queueTask() and consumed by flushTaskQueue().
  private $queuedTasks = array();

  // Task object this worker is currently associated with, if any. See
  // setCurrentWorkerTask().
  private $currentWorkerTask;

  // NOTE: Lower priority numbers execute first. The priority numbers have to
  // have the same ordering that IDs do (lowest first) so MySQL can use a
  // multipart key across both of them efficiently.

  const PRIORITY_ALERTS   = 1000;
  const PRIORITY_DEFAULT  = 2000;
  const PRIORITY_COMMIT   = 2500;
  const PRIORITY_BULK     = 3000;
  const PRIORITY_INDEX    = 3500;
  const PRIORITY_IMPORT   = 4000;

  /**
   * Special owner indicating that the task has yielded.
   */
  const YIELD_OWNER = '(yield)';


/* -(  Configuring Retries and Failures  )----------------------------------- */


  /**
   * Return the number of seconds this worker needs to hold a lease on the
   * task for while it performs work. For most tasks you can leave this at
   * `null`, which will give you a default lease (currently 2 hours).
   *
   * For tasks which may take a very long time to complete, you should return
   * an upper bound on the amount of time the task may require.
   *
   * @return int|null  Number of seconds this task needs to remain leased for,
   *                   or null for a default lease.
   *
   * @task config
   */
  public function getRequiredLeaseTime() {
    return null;
  }


  /**
   * Return the maximum number of times this task may be retried before it is
   * considered permanently failed. By default, tasks retry indefinitely. You
   * can throw a @{class:PhabricatorWorkerPermanentFailureException} to cause an
   * immediate permanent failure.
   *
   * @return int|null  Number of times the task will retry before permanent
   *                   failure. Return `null` to retry indefinitely.
   *
   * @task config
   */
  public function getMaximumRetryCount() {
    return null;
  }


  /**
   * Return the number of seconds a task should wait after a failure before
   * retrying. For most tasks you can leave this at `null`, which will give you
   * a short default retry period (currently 60 seconds).
   *
   * @param  PhabricatorWorkerTask  The task itself. This object is probably
   *                                useful mostly to examine the failure count
   *                                if you want to implement staggered retries,
   *                                or to examine the execution exception if
   *                                you want to react to different failures in
   *                                different ways.
   * @return int|null               Number of seconds to wait between retries,
   *                                or null for a default retry period
   *                                (currently 60 seconds).
   *
   * @task config
   */
  public function getWaitBeforeRetry(PhabricatorWorkerTask $task) {
    return null;
  }


  /**
   * Associate this worker with the task object it is executing.
   *
   * @param  PhabricatorWorkerTask  Task to associate with this worker.
   * @return this
   */
  public function setCurrentWorkerTask(PhabricatorWorkerTask $task) {
    $this->currentWorkerTask = $task;
    return $this;
  }


  /**
   * Get the task previously set with @{method:setCurrentWorkerTask}.
   *
   * @return PhabricatorWorkerTask|null  Current task, or null if none was set.
   */
  public function getCurrentWorkerTask() {
    return $this->currentWorkerTask;
  }


  /**
   * Get the ID of the current task, if there is one.
   *
   * @return wild  Result of `getID()` on the current task, or null if no
   *               current task has been set.
   */
  public function getCurrentWorkerTaskID() {
    $task = $this->getCurrentWorkerTask();
    if (!$task) {
      return null;
    }
    return $task->getID();
  }


  /**
   * Perform the actual work. Subclasses must implement this method; it is
   * invoked by @{method:executeTask}.
   *
   * @return void
   */
  abstract protected function doWork();


  /**
   * @param wild  Task data, later available via @{method:getTaskData}.
   */
  final public function __construct($data) {
    $this->data = $data;
  }


  /**
   * Get the raw task data this worker was constructed with.
   *
   * @return wild  Task data.
   */
  final protected function getTaskData() {
    return $this->data;
  }


  /**
   * Read a single value from dictionary task data.
   *
   * @param  string  Key to read.
   * @param  wild    Default to return if the key is not present.
   * @return wild    Value for the key, or the default.
   *
   * @throws PhabricatorWorkerPermanentFailureException  If the task data is
   *         not an array. This is treated as permanent because retrying can
   *         not change the data.
   */
  final protected function getTaskDataValue($key, $default = null) {
    $data = $this->getTaskData();
    if (!is_array($data)) {
      throw new PhabricatorWorkerPermanentFailureException(
        pht('Expected task data to be a dictionary.'));
    }
    return idx($data, $key, $default);
  }


  /**
   * Execute this worker by running @{method:doWork}. Any exception raised by
   * the work propagates to the caller.
   *
   * @return void
   */
  final public function executeTask() {
    $this->doWork();
  }


  /**
   * Schedule a new task.
   *
   * Normally, this saves a new @{class:PhabricatorWorkerActiveTask} and
   * returns it. If in-process execution has been enabled with
   * @{method:setRunAllTasksInProcess}, the work runs immediately in this
   * process and an archived task is returned instead.
   *
   * Recognized options are `priority` (defaults to `PRIORITY_DEFAULT`),
   * `objectPHID`, `containerPHID` and `delayUntil`.
   *
   * @param  string             Name of the worker class to run.
   * @param  wild               Data to pass to the worker's constructor.
   * @param  map<string, wild>  Optional scheduling options.
   * @return wild               The saved active task, or the result of
   *                            archiving it when the work ran in-process.
   */
  final public static function scheduleTask(
    $task_class,
    $data,
    $options = array()) {

    PhutilTypeSpec::checkMap(
      $options,
      array(
        'priority' => 'optional int|null',
        'objectPHID' => 'optional string|null',
        'containerPHID' => 'optional string|null',
        'delayUntil' => 'optional int|null',
      ));

    $priority = idx($options, 'priority');
    if ($priority === null) {
      $priority = self::PRIORITY_DEFAULT;
    }
    $object_phid = idx($options, 'objectPHID');
    $container_phid = idx($options, 'containerPHID');

    $task = id(new PhabricatorWorkerActiveTask())
      ->setTaskClass($task_class)
      ->setData($data)
      ->setPriority($priority)
      ->setObjectPHID($object_phid)
      ->setContainerPHID($container_phid);

    // A delayed task is stored with its lease expiry set to the requested
    // time. Note that the in-process path below does not wait for it.
    $delay = idx($options, 'delayUntil');
    if ($delay) {
      $task->setLeaseExpires($delay);
    }

    if (self::$runAllTasksInProcess) {
      // Do the work in-process.
      $worker = newv($task_class, array($data));

      // Loop until the work either succeeds or fails permanently. A yield is
      // neither: we sleep for the requested duration and run the same worker
      // again. Any other exception escapes to the caller.
      while (true) {
        try {
          $worker->executeTask();
          $worker->flushTaskQueue();

          $task_result = PhabricatorWorkerArchiveTask::RESULT_SUCCESS;
          break;
        } catch (PhabricatorWorkerPermanentFailureException $ex) {
          $proxy = new PhutilProxyException(
            pht(
              'In-process task ("%s") failed permanently.',
              $task_class),
            $ex);

          phlog($proxy);

          $task_result = PhabricatorWorkerArchiveTask::RESULT_FAILURE;
          break;
        } catch (PhabricatorWorkerYieldException $ex) {
          phlog(
            pht(
              'In-process task "%s" yielded for %s seconds, sleeping...',
              $task_class,
              $ex->getDuration()));
          sleep($ex->getDuration());
        }
      }

      // Now, save a task row and immediately archive it so we can return an
      // object with a valid ID.
      // NOTE(review): the second archiveTask() argument looks like an
      // execution duration; confirm against its definition.
      $task->openTransaction();
        $task->save();
        $archived = $task->archiveTask($task_result, 0);
      $task->saveTransaction();

      return $archived;
    } else {
      $task->save();
      return $task;
    }
  }


  /**
   * Render this worker for display to a viewer. The base implementation
   * renders nothing; subclasses may override it.
   *
   * @param  PhabricatorUser  Viewing user.
   * @return wild             Rendered content, or null for none.
   */
  public function renderForDisplay(PhabricatorUser $viewer) {
    return null;
  }


  /**
   * Set this flag to execute scheduled tasks synchronously, in the same
   * process. This is useful for debugging, and otherwise dramatically worse
   * in every way imaginable.
   *
   * @param  bool  True to run tasks in-process when they are scheduled.
   * @return void
   */
  public static function setRunAllTasksInProcess($all) {
    self::$runAllTasksInProcess = $all;
  }


  /**
   * Write a message to the console log. All arguments are forwarded,
   * unmodified, to `writeLog()` on the @{class:PhutilConsole}.
   *
   * @param  string  Message pattern.
   * @param  ...     Zero or more values for the pattern.
   * @return this
   */
  final protected function log($pattern /* , ... */) {
    $console = PhutilConsole::getConsole();
    $argv = func_get_args();
    call_user_func_array(array($console, 'writeLog'), $argv);
    return $this;
  }


  /**
   * Queue a task to be executed after this one succeeds.
   *
   * The followup task will be queued only if this task completes cleanly.
   *
   * @param string Task class to queue.
   * @param array Data for the followup task.
   * @param array Options for the followup task.
   * @return this
   */
  final protected function queueTask(
    $class,
    array $data,
    array $options = array()) {
    $this->queuedTasks[] = array($class, $data, $options);
    return $this;
  }


  /**
   * Get tasks queued as followups by @{method:queueTask}.
   *
   * @return list<tuple<string, array, map<string, wild>>> Queued task
   *   specifications, as (class, data, options) tuples.
   */
  final protected function getQueuedTasks() {
    return $this->queuedTasks;
  }


  /**
   * Schedule any queued tasks, then empty the task queue.
   *
   * By default, the queue is flushed only if a task succeeds. You can call
   * this method to force the queue to flush before failing (for example, if
   * you are using queues to improve locking behavior).
   *
   * @param map<string, wild> Optional default options.
   * @return this
   */
  final public function flushTaskQueue($defaults = array()) {
    foreach ($this->getQueuedTasks() as $task) {
      list($class, $data, $options) = $task;

      // Array union: options set explicitly on the queued task win, and
      // defaults only fill in keys which are missing.
      $options = $options + $defaults;

      self::scheduleTask($class, $data, $options);
    }

    $this->queuedTasks = array();

    return $this;
  }


  /**
   * Awaken tasks that have yielded.
   *
   * Reschedules the specified tasks if they are currently queued in a yielded,
   * unleased, unretried state so they'll execute sooner. This can let the
   * queue avoid unnecessary waits.
   *
   * This method does not provide any assurances about when these tasks will
   * execute, or even guarantee that it will have any effect at all.
   *
   * @param list<id> List of task IDs to try to awaken.
   * @return void
   */
  final public static function awakenTaskIDs(array $ids) {
    if (!$ids) {
      return;
    }

    $table = new PhabricatorWorkerActiveTask();
    $conn_w = $table->establishConnection('w');

    // NOTE: At least for now, we're keeping these tasks yielded, just
    // pretending that they threw a shorter yield than they really did.

    // Overlap the windows here to handle minor client/server time differences
    // and because it's likely correct to push these tasks to the head of their
    // respective priorities. There is a good chance they are ready to execute.
    $window = phutil_units('1 hour in seconds');
    $epoch_ago = (PhabricatorTime::getNow() - $window);

    // Only rows which are still owned by the yield marker, have never failed,
    // and expire later than the new value are moved, so leases are only ever
    // shortened by this query.
    queryfx(
      $conn_w,
      'UPDATE %T SET leaseExpires = %d
        WHERE id IN (%Ld)
          AND leaseOwner = %s
          AND leaseExpires > %d
          AND failureCount = 0',
      $table->getTableName(),
      $epoch_ago,
      $ids,
      self::YIELD_OWNER,
      $epoch_ago);
  }


  /**
   * Build a content source identifying the daemons as the origin of changes.
   *
   * @return wild  Result of @{class:PhabricatorContentSource}'s
   *               `newForSource()` for the daemon source constant.
   */
  protected function newContentSource() {
    return PhabricatorContentSource::newForSource(
      PhabricatorDaemonContentSource::SOURCECONST);
  }

}
330
331