Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
freebsd
GitHub Repository: freebsd/phabricator
Path: blob/master/src/applications/diffusion/protocol/DiffusionRepositoryClusterEngine.php
12242 views
1
<?php
2
3
/**
4
* Manages repository synchronization for cluster repositories.
5
*
6
* @task config Configuring Synchronization
7
* @task sync Cluster Synchronization
8
* @task internal Internals
9
*/
10
final class DiffusionRepositoryClusterEngine extends Phobject {
11
12
private $repository;
13
private $viewer;
14
private $actingAsPHID;
15
private $logger;
16
17
private $clusterWriteLock;
18
private $clusterWriteVersion;
19
private $clusterWriteOwner;
20
21
22
/* -( Configuring Synchronization )---------------------------------------- */
23
24
25
public function setRepository(PhabricatorRepository $repository) {
26
$this->repository = $repository;
27
return $this;
28
}
29
30
public function getRepository() {
31
return $this->repository;
32
}
33
34
public function setViewer(PhabricatorUser $viewer) {
35
$this->viewer = $viewer;
36
return $this;
37
}
38
39
public function getViewer() {
40
return $this->viewer;
41
}
42
43
public function setLog(DiffusionRepositoryClusterEngineLogInterface $log) {
44
$this->logger = $log;
45
return $this;
46
}
47
48
public function setActingAsPHID($acting_as_phid) {
49
$this->actingAsPHID = $acting_as_phid;
50
return $this;
51
}
52
53
public function getActingAsPHID() {
54
return $this->actingAsPHID;
55
}
56
57
private function getEffectiveActingAsPHID() {
58
if ($this->actingAsPHID) {
59
return $this->actingAsPHID;
60
}
61
62
return $this->getViewer()->getPHID();
63
}
64
65
66
/* -( Cluster Synchronization )-------------------------------------------- */
67
68
69
/**
70
* Synchronize repository version information after creating a repository.
71
*
72
* This initializes working copy versions for all currently bound devices to
73
* 0, so that we don't get stuck making an ambiguous choice about which
74
* devices are leaders when we later synchronize before a read.
75
*
76
* @task sync
77
*/
78
public function synchronizeWorkingCopyAfterCreation() {
79
if (!$this->shouldEnableSynchronization(false)) {
80
return;
81
}
82
83
$repository = $this->getRepository();
84
$repository_phid = $repository->getPHID();
85
86
$service = $repository->loadAlmanacService();
87
if (!$service) {
88
throw new Exception(pht('Failed to load repository cluster service.'));
89
}
90
91
$bindings = $service->getActiveBindings();
92
foreach ($bindings as $binding) {
93
PhabricatorRepositoryWorkingCopyVersion::updateVersion(
94
$repository_phid,
95
$binding->getDevicePHID(),
96
0);
97
}
98
99
return $this;
100
}
101
102
103
/**
104
* @task sync
105
*/
106
public function synchronizeWorkingCopyAfterHostingChange() {
107
if (!$this->shouldEnableSynchronization(false)) {
108
return;
109
}
110
111
$repository = $this->getRepository();
112
$repository_phid = $repository->getPHID();
113
114
$versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions(
115
$repository_phid);
116
$versions = mpull($versions, null, 'getDevicePHID');
117
118
// After converting a hosted repository to observed, or vice versa, we
119
// need to reset version numbers because the clocks for observed and hosted
120
// repositories run on different units.
121
122
// We identify all the cluster leaders and reset their version to 0.
123
// We identify all the cluster followers and demote them.
124
125
// This allows the cluster to start over again at version 0 but keep the
126
// same leaders.
127
128
if ($versions) {
129
$max_version = (int)max(mpull($versions, 'getRepositoryVersion'));
130
foreach ($versions as $version) {
131
$device_phid = $version->getDevicePHID();
132
133
if ($version->getRepositoryVersion() == $max_version) {
134
PhabricatorRepositoryWorkingCopyVersion::updateVersion(
135
$repository_phid,
136
$device_phid,
137
0);
138
} else {
139
PhabricatorRepositoryWorkingCopyVersion::demoteDevice(
140
$repository_phid,
141
$device_phid);
142
}
143
}
144
}
145
146
return $this;
147
}
148
149
150
/**
151
* @task sync
152
*/
153
public function synchronizeWorkingCopyBeforeRead() {
154
if (!$this->shouldEnableSynchronization(true)) {
155
return;
156
}
157
158
$repository = $this->getRepository();
159
$repository_phid = $repository->getPHID();
160
161
$device = AlmanacKeys::getLiveDevice();
162
$device_phid = $device->getPHID();
163
164
$read_lock = PhabricatorRepositoryWorkingCopyVersion::getReadLock(
165
$repository_phid,
166
$device_phid);
167
168
$lock_wait = phutil_units('2 minutes in seconds');
169
170
$this->logLine(
171
pht(
172
'Acquiring read lock for repository "%s" on device "%s"...',
173
$repository->getDisplayName(),
174
$device->getName()));
175
176
try {
177
$start = PhabricatorTime::getNow();
178
$read_lock->lock($lock_wait);
179
$waited = (PhabricatorTime::getNow() - $start);
180
181
if ($waited) {
182
$this->logLine(
183
pht(
184
'Acquired read lock after %s second(s).',
185
new PhutilNumber($waited)));
186
} else {
187
$this->logLine(
188
pht(
189
'Acquired read lock immediately.'));
190
}
191
} catch (PhutilLockException $ex) {
192
throw new PhutilProxyException(
193
pht(
194
'Failed to acquire read lock after waiting %s second(s). You '.
195
'may be able to retry later. (%s)',
196
new PhutilNumber($lock_wait),
197
$ex->getHint()),
198
$ex);
199
}
200
201
$versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions(
202
$repository_phid);
203
$versions = mpull($versions, null, 'getDevicePHID');
204
205
$this_version = idx($versions, $device_phid);
206
if ($this_version) {
207
$this_version = (int)$this_version->getRepositoryVersion();
208
} else {
209
$this_version = null;
210
}
211
212
if ($versions) {
213
// This is the normal case, where we have some version information and
214
// can identify which nodes are leaders. If the current node is not a
215
// leader, we want to fetch from a leader and then update our version.
216
217
$max_version = (int)max(mpull($versions, 'getRepositoryVersion'));
218
if (($this_version === null) || ($max_version > $this_version)) {
219
if ($repository->isHosted()) {
220
$fetchable = array();
221
foreach ($versions as $version) {
222
if ($version->getRepositoryVersion() == $max_version) {
223
$fetchable[] = $version->getDevicePHID();
224
}
225
}
226
227
228
$this->synchronizeWorkingCopyFromDevices(
229
$fetchable,
230
$this_version,
231
$max_version);
232
} else {
233
$this->synchronizeWorkingCopyFromRemote();
234
}
235
236
PhabricatorRepositoryWorkingCopyVersion::updateVersion(
237
$repository_phid,
238
$device_phid,
239
$max_version);
240
} else {
241
$this->logLine(
242
pht(
243
'Device "%s" is already a cluster leader and does not need '.
244
'to be synchronized.',
245
$device->getName()));
246
}
247
248
$result_version = $max_version;
249
} else {
250
// If no version records exist yet, we need to be careful, because we
251
// can not tell which nodes are leaders.
252
253
// There might be several nodes with arbitrary existing data, and we have
254
// no way to tell which one has the "right" data. If we pick wrong, we
255
// might erase some or all of the data in the repository.
256
257
// Since this is dangerous, we refuse to guess unless there is only one
258
// device. If we're the only device in the group, we obviously must be
259
// a leader.
260
261
$service = $repository->loadAlmanacService();
262
if (!$service) {
263
throw new Exception(pht('Failed to load repository cluster service.'));
264
}
265
266
$bindings = $service->getActiveBindings();
267
$device_map = array();
268
foreach ($bindings as $binding) {
269
$device_map[$binding->getDevicePHID()] = true;
270
}
271
272
if (count($device_map) > 1) {
273
throw new Exception(
274
pht(
275
'Repository "%s" exists on more than one device, but no device '.
276
'has any repository version information. There is no way for the '.
277
'software to determine which copy of the existing data is '.
278
'authoritative. Promote a device or see "Ambiguous Leaders" in '.
279
'the documentation.',
280
$repository->getDisplayName()));
281
}
282
283
if (empty($device_map[$device->getPHID()])) {
284
throw new Exception(
285
pht(
286
'Repository "%s" is being synchronized on device "%s", but '.
287
'this device is not bound to the corresponding cluster '.
288
'service ("%s").',
289
$repository->getDisplayName(),
290
$device->getName(),
291
$service->getName()));
292
}
293
294
// The current device is the only device in service, so it must be a
295
// leader. We can safely have any future nodes which come online read
296
// from it.
297
PhabricatorRepositoryWorkingCopyVersion::updateVersion(
298
$repository_phid,
299
$device_phid,
300
0);
301
302
$result_version = 0;
303
}
304
305
$read_lock->unlock();
306
307
return $result_version;
308
}
309
310
311
/**
312
* @task sync
313
*/
314
public function synchronizeWorkingCopyBeforeWrite() {
315
if (!$this->shouldEnableSynchronization(true)) {
316
return;
317
}
318
319
$repository = $this->getRepository();
320
$viewer = $this->getViewer();
321
322
$repository_phid = $repository->getPHID();
323
324
$device = AlmanacKeys::getLiveDevice();
325
$device_phid = $device->getPHID();
326
327
$table = new PhabricatorRepositoryWorkingCopyVersion();
328
$locked_connection = $table->establishConnection('w');
329
330
$write_lock = PhabricatorRepositoryWorkingCopyVersion::getWriteLock(
331
$repository_phid);
332
333
$write_lock->setExternalConnection($locked_connection);
334
335
$this->logLine(
336
pht(
337
'Acquiring write lock for repository "%s"...',
338
$repository->getDisplayName()));
339
340
// See T13590. On the HTTP pathway, it's possible for us to hit the script
341
// time limit while holding the durable write lock if a user makes a big
342
// push. Remove the time limit before we acquire the durable lock.
343
set_time_limit(0);
344
345
$lock_wait = phutil_units('2 minutes in seconds');
346
try {
347
$write_wait_start = microtime(true);
348
349
$start = PhabricatorTime::getNow();
350
$step_wait = 1;
351
352
while (true) {
353
try {
354
$write_lock->lock((int)floor($step_wait));
355
$write_wait_end = microtime(true);
356
break;
357
} catch (PhutilLockException $ex) {
358
$waited = (PhabricatorTime::getNow() - $start);
359
if ($waited > $lock_wait) {
360
throw $ex;
361
}
362
$this->logActiveWriter($viewer, $repository);
363
}
364
365
// Wait a little longer before the next message we print.
366
$step_wait = $step_wait + 0.5;
367
$step_wait = min($step_wait, 3);
368
}
369
370
$waited = (PhabricatorTime::getNow() - $start);
371
if ($waited) {
372
$this->logLine(
373
pht(
374
'Acquired write lock after %s second(s).',
375
new PhutilNumber($waited)));
376
} else {
377
$this->logLine(
378
pht(
379
'Acquired write lock immediately.'));
380
}
381
} catch (PhutilLockException $ex) {
382
throw new PhutilProxyException(
383
pht(
384
'Failed to acquire write lock after waiting %s second(s). You '.
385
'may be able to retry later. (%s)',
386
new PhutilNumber($lock_wait),
387
$ex->getHint()),
388
$ex);
389
}
390
391
$versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions(
392
$repository_phid);
393
foreach ($versions as $version) {
394
if (!$version->getIsWriting()) {
395
continue;
396
}
397
398
throw new Exception(
399
pht(
400
'An previous write to this repository was interrupted; refusing '.
401
'new writes. This issue requires operator intervention to resolve, '.
402
'see "Write Interruptions" in the "Cluster: Repositories" in the '.
403
'documentation for instructions.'));
404
}
405
406
$read_wait_start = microtime(true);
407
try {
408
$max_version = $this->synchronizeWorkingCopyBeforeRead();
409
} catch (Exception $ex) {
410
$write_lock->unlock();
411
throw $ex;
412
}
413
$read_wait_end = microtime(true);
414
415
$pid = getmypid();
416
$hash = Filesystem::readRandomCharacters(12);
417
$this->clusterWriteOwner = "{$pid}.{$hash}";
418
419
PhabricatorRepositoryWorkingCopyVersion::willWrite(
420
$locked_connection,
421
$repository_phid,
422
$device_phid,
423
array(
424
'userPHID' => $this->getEffectiveActingAsPHID(),
425
'epoch' => PhabricatorTime::getNow(),
426
'devicePHID' => $device_phid,
427
),
428
$this->clusterWriteOwner);
429
430
$this->clusterWriteVersion = $max_version;
431
$this->clusterWriteLock = $write_lock;
432
433
$write_wait = ($write_wait_end - $write_wait_start);
434
$read_wait = ($read_wait_end - $read_wait_start);
435
436
$log = $this->logger;
437
if ($log) {
438
$log->writeClusterEngineLogProperty('writeWait', $write_wait);
439
$log->writeClusterEngineLogProperty('readWait', $read_wait);
440
}
441
}
442
443
444
public function synchronizeWorkingCopyAfterDiscovery($new_version) {
445
if (!$this->shouldEnableSynchronization(true)) {
446
return;
447
}
448
449
$repository = $this->getRepository();
450
$repository_phid = $repository->getPHID();
451
if ($repository->isHosted()) {
452
return;
453
}
454
455
$device = AlmanacKeys::getLiveDevice();
456
$device_phid = $device->getPHID();
457
458
// NOTE: We are not holding a lock here because this method is only called
459
// from PhabricatorRepositoryDiscoveryEngine, which already holds a device
460
// lock. Even if we do race here and record an older version, the
461
// consequences are mild: we only do extra work to correct it later.
462
463
$versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions(
464
$repository_phid);
465
$versions = mpull($versions, null, 'getDevicePHID');
466
467
$this_version = idx($versions, $device_phid);
468
if ($this_version) {
469
$this_version = (int)$this_version->getRepositoryVersion();
470
} else {
471
$this_version = null;
472
}
473
474
if (($this_version === null) || ($new_version > $this_version)) {
475
PhabricatorRepositoryWorkingCopyVersion::updateVersion(
476
$repository_phid,
477
$device_phid,
478
$new_version);
479
}
480
}
481
482
483
/**
484
* @task sync
485
*/
486
public function synchronizeWorkingCopyAfterWrite() {
487
if (!$this->shouldEnableSynchronization(true)) {
488
return;
489
}
490
491
if (!$this->clusterWriteLock) {
492
throw new Exception(
493
pht(
494
'Trying to synchronize after write, but not holding a write '.
495
'lock!'));
496
}
497
498
$repository = $this->getRepository();
499
$repository_phid = $repository->getPHID();
500
501
$device = AlmanacKeys::getLiveDevice();
502
$device_phid = $device->getPHID();
503
504
// It is possible that we've lost the global lock while receiving the push.
505
// For example, the master database may have been restarted between the
506
// time we acquired the global lock and now, when the push has finished.
507
508
// We wrote a durable lock while we were holding the the global lock,
509
// essentially upgrading our lock. We can still safely release this upgraded
510
// lock even if we're no longer holding the global lock.
511
512
// If we fail to release the lock, the repository will be frozen until
513
// an operator can figure out what happened, so we try pretty hard to
514
// reconnect to the database and release the lock.
515
516
$now = PhabricatorTime::getNow();
517
$duration = phutil_units('5 minutes in seconds');
518
$try_until = $now + $duration;
519
520
$did_release = false;
521
$already_failed = false;
522
while (PhabricatorTime::getNow() <= $try_until) {
523
try {
524
// NOTE: This means we're still bumping the version when pushes fail. We
525
// could select only un-rejected events instead to bump a little less
526
// often.
527
528
$new_log = id(new PhabricatorRepositoryPushEventQuery())
529
->setViewer(PhabricatorUser::getOmnipotentUser())
530
->withRepositoryPHIDs(array($repository_phid))
531
->setLimit(1)
532
->executeOne();
533
534
$old_version = $this->clusterWriteVersion;
535
if ($new_log) {
536
$new_version = $new_log->getID();
537
} else {
538
$new_version = $old_version;
539
}
540
541
PhabricatorRepositoryWorkingCopyVersion::didWrite(
542
$repository_phid,
543
$device_phid,
544
$this->clusterWriteVersion,
545
$new_version,
546
$this->clusterWriteOwner);
547
$did_release = true;
548
break;
549
} catch (AphrontConnectionQueryException $ex) {
550
$connection_exception = $ex;
551
} catch (AphrontConnectionLostQueryException $ex) {
552
$connection_exception = $ex;
553
}
554
555
if (!$already_failed) {
556
$already_failed = true;
557
$this->logLine(
558
pht('CRITICAL. Failed to release cluster write lock!'));
559
560
$this->logLine(
561
pht(
562
'The connection to the master database was lost while receiving '.
563
'the write.'));
564
565
$this->logLine(
566
pht(
567
'This process will spend %s more second(s) attempting to '.
568
'recover, then give up.',
569
new PhutilNumber($duration)));
570
}
571
572
sleep(1);
573
}
574
575
if ($did_release) {
576
if ($already_failed) {
577
$this->logLine(
578
pht('RECOVERED. Link to master database was restored.'));
579
}
580
$this->logLine(pht('Released cluster write lock.'));
581
} else {
582
throw new Exception(
583
pht(
584
'Failed to reconnect to master database and release held write '.
585
'lock ("%s") on device "%s" for repository "%s" after trying '.
586
'for %s seconds(s). This repository will be frozen.',
587
$this->clusterWriteOwner,
588
$device->getName(),
589
$this->getDisplayName(),
590
new PhutilNumber($duration)));
591
}
592
593
// We can continue even if we've lost this lock, everything is still
594
// consistent.
595
try {
596
$this->clusterWriteLock->unlock();
597
} catch (Exception $ex) {
598
// Ignore.
599
}
600
601
$this->clusterWriteLock = null;
602
$this->clusterWriteOwner = null;
603
}
604
605
606
/* -( Internals )---------------------------------------------------------- */
607
608
609
/**
610
* @task internal
611
*/
612
private function shouldEnableSynchronization($require_device) {
613
$repository = $this->getRepository();
614
615
$service_phid = $repository->getAlmanacServicePHID();
616
if (!$service_phid) {
617
return false;
618
}
619
620
if (!$repository->supportsSynchronization()) {
621
return false;
622
}
623
624
if ($require_device) {
625
$device = AlmanacKeys::getLiveDevice();
626
if (!$device) {
627
return false;
628
}
629
}
630
631
return true;
632
}
633
634
635
/**
636
* @task internal
637
*/
638
private function synchronizeWorkingCopyFromRemote() {
639
$repository = $this->getRepository();
640
$device = AlmanacKeys::getLiveDevice();
641
642
$local_path = $repository->getLocalPath();
643
$fetch_uri = $repository->getRemoteURIEnvelope();
644
645
if ($repository->isGit()) {
646
$this->requireWorkingCopy();
647
648
$argv = array(
649
'fetch --prune -- %P %s',
650
$fetch_uri,
651
'+refs/*:refs/*',
652
);
653
} else {
654
throw new Exception(pht('Remote sync only supported for git!'));
655
}
656
657
$future = DiffusionCommandEngine::newCommandEngine($repository)
658
->setArgv($argv)
659
->setSudoAsDaemon(true)
660
->setCredentialPHID($repository->getCredentialPHID())
661
->setURI($repository->getRemoteURIObject())
662
->newFuture();
663
664
$future->setCWD($local_path);
665
666
try {
667
$future->resolvex();
668
} catch (Exception $ex) {
669
$this->logLine(
670
pht(
671
'Synchronization of "%s" from remote failed: %s',
672
$device->getName(),
673
$ex->getMessage()));
674
throw $ex;
675
}
676
}
677
678
679
/**
680
* @task internal
681
*/
682
private function synchronizeWorkingCopyFromDevices(
683
array $device_phids,
684
$local_version,
685
$remote_version) {
686
687
$repository = $this->getRepository();
688
689
$service = $repository->loadAlmanacService();
690
if (!$service) {
691
throw new Exception(pht('Failed to load repository cluster service.'));
692
}
693
694
$device_map = array_fuse($device_phids);
695
$bindings = $service->getActiveBindings();
696
697
$fetchable = array();
698
foreach ($bindings as $binding) {
699
// We can't fetch from nodes which don't have the newest version.
700
$device_phid = $binding->getDevicePHID();
701
if (empty($device_map[$device_phid])) {
702
continue;
703
}
704
705
// TODO: For now, only fetch over SSH. We could support fetching over
706
// HTTP eventually.
707
if ($binding->getAlmanacPropertyValue('protocol') != 'ssh') {
708
continue;
709
}
710
711
$fetchable[] = $binding;
712
}
713
714
if (!$fetchable) {
715
throw new Exception(
716
pht(
717
'Leader lost: no up-to-date nodes in repository cluster are '.
718
'fetchable.'));
719
}
720
721
// If we can synchronize from multiple sources, choose one at random.
722
shuffle($fetchable);
723
724
$caught = null;
725
foreach ($fetchable as $binding) {
726
try {
727
$this->synchronizeWorkingCopyFromBinding(
728
$binding,
729
$local_version,
730
$remote_version);
731
$caught = null;
732
break;
733
} catch (Exception $ex) {
734
$caught = $ex;
735
}
736
}
737
738
if ($caught) {
739
throw $caught;
740
}
741
}
742
743
744
/**
745
* @task internal
746
*/
747
private function synchronizeWorkingCopyFromBinding(
748
AlmanacBinding $binding,
749
$local_version,
750
$remote_version) {
751
752
$repository = $this->getRepository();
753
$device = AlmanacKeys::getLiveDevice();
754
755
$this->logLine(
756
pht(
757
'Synchronizing this device ("%s") from cluster leader ("%s").',
758
$device->getName(),
759
$binding->getDevice()->getName()));
760
761
$fetch_uri = $repository->getClusterRepositoryURIFromBinding($binding);
762
$local_path = $repository->getLocalPath();
763
764
if ($repository->isGit()) {
765
$this->requireWorkingCopy();
766
767
$argv = array(
768
'fetch --prune -- %s %s',
769
$fetch_uri,
770
'+refs/*:refs/*',
771
);
772
} else {
773
throw new Exception(pht('Binding sync only supported for git!'));
774
}
775
776
$future = DiffusionCommandEngine::newCommandEngine($repository)
777
->setArgv($argv)
778
->setConnectAsDevice(true)
779
->setSudoAsDaemon(true)
780
->setURI($fetch_uri)
781
->newFuture();
782
783
$future->setCWD($local_path);
784
785
$log = PhabricatorRepositorySyncEvent::initializeNewEvent()
786
->setRepositoryPHID($repository->getPHID())
787
->setEpoch(PhabricatorTime::getNow())
788
->setDevicePHID($device->getPHID())
789
->setFromDevicePHID($binding->getDevice()->getPHID())
790
->setDeviceVersion($local_version)
791
->setFromDeviceVersion($remote_version);
792
793
$sync_start = microtime(true);
794
795
try {
796
$future->resolvex();
797
} catch (Exception $ex) {
798
$log->setSyncWait(phutil_microseconds_since($sync_start));
799
800
if ($ex instanceof CommandException) {
801
if ($future->getWasKilledByTimeout()) {
802
$result_type = PhabricatorRepositorySyncEvent::RESULT_TIMEOUT;
803
} else {
804
$result_type = PhabricatorRepositorySyncEvent::RESULT_ERROR;
805
}
806
807
$log
808
->setResultCode($ex->getError())
809
->setResultType($result_type)
810
->setProperty('stdout', $ex->getStdout())
811
->setProperty('stderr', $ex->getStderr());
812
} else {
813
$log
814
->setResultCode(1)
815
->setResultType(PhabricatorRepositorySyncEvent::RESULT_EXCEPTION)
816
->setProperty('message', $ex->getMessage());
817
}
818
819
$log->save();
820
821
$this->logLine(
822
pht(
823
'Synchronization of "%s" from leader "%s" failed: %s',
824
$device->getName(),
825
$binding->getDevice()->getName(),
826
$ex->getMessage()));
827
828
throw $ex;
829
}
830
831
$log
832
->setSyncWait(phutil_microseconds_since($sync_start))
833
->setResultCode(0)
834
->setResultType(PhabricatorRepositorySyncEvent::RESULT_SYNC)
835
->save();
836
}
837
838
839
/**
840
* @task internal
841
*/
842
private function logLine($message) {
843
return $this->logText("# {$message}\n");
844
}
845
846
847
/**
848
* @task internal
849
*/
850
private function logText($message) {
851
$log = $this->logger;
852
if ($log) {
853
$log->writeClusterEngineLogMessage($message);
854
}
855
return $this;
856
}
857
858
private function requireWorkingCopy() {
859
$repository = $this->getRepository();
860
$local_path = $repository->getLocalPath();
861
862
if (!Filesystem::pathExists($local_path)) {
863
$device = AlmanacKeys::getLiveDevice();
864
865
throw new Exception(
866
pht(
867
'Repository "%s" does not have a working copy on this device '.
868
'yet, so it can not be synchronized. Wait for the daemons to '.
869
'construct one or run `bin/repository update %s` on this host '.
870
'("%s") to build it explicitly.',
871
$repository->getDisplayName(),
872
$repository->getMonogram(),
873
$device->getName()));
874
}
875
}
876
877
private function logActiveWriter(
878
PhabricatorUser $viewer,
879
PhabricatorRepository $repository) {
880
881
$writer = PhabricatorRepositoryWorkingCopyVersion::loadWriter(
882
$repository->getPHID());
883
if (!$writer) {
884
$this->logLine(pht('Waiting on another user to finish writing...'));
885
return;
886
}
887
888
$user_phid = $writer->getWriteProperty('userPHID');
889
$device_phid = $writer->getWriteProperty('devicePHID');
890
$epoch = $writer->getWriteProperty('epoch');
891
892
$phids = array($user_phid, $device_phid);
893
$handles = $viewer->loadHandles($phids);
894
895
$duration = (PhabricatorTime::getNow() - $epoch) + 1;
896
897
$this->logLine(
898
pht(
899
'Waiting for %s to finish writing (on device "%s" for %ss)...',
900
$handles[$user_phid]->getName(),
901
$handles[$device_phid]->getName(),
902
new PhutilNumber($duration)));
903
}
904
905
public function newMaintenanceEvent() {
906
$viewer = $this->getViewer();
907
$repository = $this->getRepository();
908
$now = PhabricatorTime::getNow();
909
910
$event = PhabricatorRepositoryPushEvent::initializeNewEvent($viewer)
911
->setRepositoryPHID($repository->getPHID())
912
->setEpoch($now)
913
->setPusherPHID($this->getEffectiveActingAsPHID())
914
->setRejectCode(PhabricatorRepositoryPushLog::REJECT_ACCEPT);
915
916
return $event;
917
}
918
919
public function newMaintenanceLog() {
920
$viewer = $this->getViewer();
921
$repository = $this->getRepository();
922
$now = PhabricatorTime::getNow();
923
924
$device = AlmanacKeys::getLiveDevice();
925
if ($device) {
926
$device_phid = $device->getPHID();
927
} else {
928
$device_phid = null;
929
}
930
931
return PhabricatorRepositoryPushLog::initializeNewLog($viewer)
932
->setDevicePHID($device_phid)
933
->setRepositoryPHID($repository->getPHID())
934
->attachRepository($repository)
935
->setEpoch($now)
936
->setPusherPHID($this->getEffectiveActingAsPHID())
937
->setChangeFlags(PhabricatorRepositoryPushLog::CHANGEFLAG_MAINTENANCE)
938
->setRefType(PhabricatorRepositoryPushLog::REFTYPE_MAINTENANCE)
939
->setRefNew('');
940
}
941
942
}
943
944