Path: blob/master/src/applications/diffusion/protocol/DiffusionRepositoryClusterEngine.php
12242 views
<?php

/**
 * Manages repository synchronization for cluster repositories.
 *
 * @task config Configuring Synchronization
 * @task sync Cluster Synchronization
 * @task internal Internals
 */
final class DiffusionRepositoryClusterEngine extends Phobject {

  private $repository;
  private $viewer;
  private $actingAsPHID;
  private $logger;

  // State recorded by synchronizeWorkingCopyBeforeWrite() and consumed by
  // synchronizeWorkingCopyAfterWrite().
  private $clusterWriteLock;
  private $clusterWriteVersion;
  private $clusterWriteOwner;


/* -(  Configuring Synchronization  )---------------------------------------- */


  /**
   * Set the repository this engine synchronizes.
   *
   * @param PhabricatorRepository Repository to synchronize.
   * @return this
   * @task config
   */
  public function setRepository(PhabricatorRepository $repository) {
    $this->repository = $repository;
    return $this;
  }

  /**
   * @return PhabricatorRepository|null Repository set with setRepository().
   * @task config
   */
  public function getRepository() {
    return $this->repository;
  }

  /**
   * Set the viewer on whose behalf synchronization is performed.
   *
   * @param PhabricatorUser Viewing user.
   * @return this
   * @task config
   */
  public function setViewer(PhabricatorUser $viewer) {
    $this->viewer = $viewer;
    return $this;
  }

  /**
   * @return PhabricatorUser|null Viewer set with setViewer().
   * @task config
   */
  public function getViewer() {
    return $this->viewer;
  }

  /**
   * Set a log which receives progress messages and timing properties.
   *
   * Logging is optional: when no log is set, messages are discarded.
   *
   * @param DiffusionRepositoryClusterEngineLogInterface Log to write to.
   * @return this
   * @task config
   */
  public function setLog(DiffusionRepositoryClusterEngineLogInterface $log) {
    $this->logger = $log;
    return $this;
  }

  /**
   * Set the PHID to attribute writes to, overriding the viewer's PHID.
   *
   * @param string|null Acting-as PHID.
   * @return this
   * @task config
   */
  public function setActingAsPHID($acting_as_phid) {
    $this->actingAsPHID = $acting_as_phid;
    return $this;
  }

  /**
   * @return string|null PHID set with setActingAsPHID().
   * @task config
   */
  public function getActingAsPHID() {
    return $this->actingAsPHID;
  }

  /**
   * Get the PHID writes are attributed to: the explicit acting-as PHID if one
   * was set, otherwise the viewer's PHID.
   *
   * @return string|null Effective actor PHID.
   * @task config
   */
  private function getEffectiveActingAsPHID() {
    if ($this->actingAsPHID) {
      return $this->actingAsPHID;
    }

    return $this->getViewer()->getPHID();
  }


/* -(  Cluster Synchronization  )-------------------------------------------- */


  /**
   * Synchronize repository version information after creating a repository.
   *
   * This initializes working copy versions for all currently bound devices to
   * 0, so that we don't get stuck making an ambiguous choice about which
   * devices are leaders when we later synchronize before a read.
   *
   * @return this|null Null if synchronization is not enabled.
   * @task sync
   */
  public function synchronizeWorkingCopyAfterCreation() {
    if (!$this->shouldEnableSynchronization(false)) {
      return;
    }

    $repository = $this->getRepository();
    $repository_phid = $repository->getPHID();

    $service = $repository->loadAlmanacService();
    if (!$service) {
      throw new Exception(pht('Failed to load repository cluster service.'));
    }

    $bindings = $service->getActiveBindings();
    foreach ($bindings as $binding) {
      PhabricatorRepositoryWorkingCopyVersion::updateVersion(
        $repository_phid,
        $binding->getDevicePHID(),
        0);
    }

    return $this;
  }


  /**
   * Reset version information after a repository changes between hosted and
   * observed.
   *
   * @return this|null Null if synchronization is not enabled.
   * @task sync
   */
  public function synchronizeWorkingCopyAfterHostingChange() {
    if (!$this->shouldEnableSynchronization(false)) {
      return;
    }

    $repository = $this->getRepository();
    $repository_phid = $repository->getPHID();

    $versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions(
      $repository_phid);
    $versions = mpull($versions, null, 'getDevicePHID');

    // After converting a hosted repository to observed, or vice versa, we
    // need to reset version numbers because the clocks for observed and hosted
    // repositories run on different units.

    // We identify all the cluster leaders and reset their version to 0.
    // We identify all the cluster followers and demote them.

    // This allows the cluster to start over again at version 0 but keep the
    // same leaders.

    if ($versions) {
      $max_version = (int)max(mpull($versions, 'getRepositoryVersion'));
      foreach ($versions as $version) {
        $device_phid = $version->getDevicePHID();

        if ($version->getRepositoryVersion() == $max_version) {
          PhabricatorRepositoryWorkingCopyVersion::updateVersion(
            $repository_phid,
            $device_phid,
            0);
        } else {
          PhabricatorRepositoryWorkingCopyVersion::demoteDevice(
            $repository_phid,
            $device_phid);
        }
      }
    }

    return $this;
  }


  /**
   * Bring the working copy on this device up to date before reading from it.
   *
   * Acquires the per-device read lock (waiting up to two minutes), performs
   * the synchronization, and releases the lock again. The lock is released
   * whether synchronization succeeds or fails.
   *
   * @return int|null Repository version this device holds after
   *   synchronizing, or null if synchronization is not enabled.
   * @task sync
   */
  public function synchronizeWorkingCopyBeforeRead() {
    if (!$this->shouldEnableSynchronization(true)) {
      return;
    }

    $repository = $this->getRepository();
    $repository_phid = $repository->getPHID();

    $device = AlmanacKeys::getLiveDevice();
    $device_phid = $device->getPHID();

    $read_lock = PhabricatorRepositoryWorkingCopyVersion::getReadLock(
      $repository_phid,
      $device_phid);

    $lock_wait = phutil_units('2 minutes in seconds');

    $this->logLine(
      pht(
        'Acquiring read lock for repository "%s" on device "%s"...',
        $repository->getDisplayName(),
        $device->getName()));

    try {
      $start = PhabricatorTime::getNow();
      $read_lock->lock($lock_wait);
      $waited = (PhabricatorTime::getNow() - $start);

      if ($waited) {
        $this->logLine(
          pht(
            'Acquired read lock after %s second(s).',
            new PhutilNumber($waited)));
      } else {
        $this->logLine(
          pht(
            'Acquired read lock immediately.'));
      }
    } catch (PhutilLockException $ex) {
      throw new PhutilProxyException(
        pht(
          'Failed to acquire read lock after waiting %s second(s). You '.
          'may be able to retry later. (%s)',
          new PhutilNumber($lock_wait),
          $ex->getHint()),
        $ex);
    }

    // Release the read lock if synchronization fails, so a failed attempt
    // does not leave this process holding the lock.
    try {
      $result_version = $this->synchronizeWorkingCopyWhileReadLocked(
        $repository,
        $device);
    } catch (Exception $ex) {
      $read_lock->unlock();
      throw $ex;
    }

    $read_lock->unlock();

    return $result_version;
  }


  /**
   * Perform the body of a pre-read synchronization. The caller is expected to
   * hold the read lock for this repository and device.
   *
   * @param PhabricatorRepository Repository being synchronized.
   * @param wild Live device, as returned by AlmanacKeys::getLiveDevice().
   * @return int Repository version this device holds after synchronizing.
   * @task internal
   */
  private function synchronizeWorkingCopyWhileReadLocked(
    PhabricatorRepository $repository,
    $device) {

    $repository_phid = $repository->getPHID();
    $device_phid = $device->getPHID();

    $versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions(
      $repository_phid);
    $versions = mpull($versions, null, 'getDevicePHID');

    $this_version = idx($versions, $device_phid);
    if ($this_version) {
      $this_version = (int)$this_version->getRepositoryVersion();
    } else {
      $this_version = null;
    }

    if ($versions) {
      // This is the normal case, where we have some version information and
      // can identify which nodes are leaders. If the current node is not a
      // leader, we want to fetch from a leader and then update our version.

      $max_version = (int)max(mpull($versions, 'getRepositoryVersion'));
      if (($this_version === null) || ($max_version > $this_version)) {
        if ($repository->isHosted()) {
          $fetchable = array();
          foreach ($versions as $version) {
            if ($version->getRepositoryVersion() == $max_version) {
              $fetchable[] = $version->getDevicePHID();
            }
          }

          $this->synchronizeWorkingCopyFromDevices(
            $fetchable,
            $this_version,
            $max_version);
        } else {
          $this->synchronizeWorkingCopyFromRemote();
        }

        PhabricatorRepositoryWorkingCopyVersion::updateVersion(
          $repository_phid,
          $device_phid,
          $max_version);
      } else {
        $this->logLine(
          pht(
            'Device "%s" is already a cluster leader and does not need '.
            'to be synchronized.',
            $device->getName()));
      }

      return $max_version;
    }

    // If no version records exist yet, we need to be careful, because we
    // can not tell which nodes are leaders.

    // There might be several nodes with arbitrary existing data, and we have
    // no way to tell which one has the "right" data. If we pick wrong, we
    // might erase some or all of the data in the repository.

    // Since this is dangerous, we refuse to guess unless there is only one
    // device. If we're the only device in the group, we obviously must be
    // a leader.

    $service = $repository->loadAlmanacService();
    if (!$service) {
      throw new Exception(pht('Failed to load repository cluster service.'));
    }

    $bindings = $service->getActiveBindings();
    $device_map = array();
    foreach ($bindings as $binding) {
      $device_map[$binding->getDevicePHID()] = true;
    }

    if (count($device_map) > 1) {
      throw new Exception(
        pht(
          'Repository "%s" exists on more than one device, but no device '.
          'has any repository version information. There is no way for the '.
          'software to determine which copy of the existing data is '.
          'authoritative. Promote a device or see "Ambiguous Leaders" in '.
          'the documentation.',
          $repository->getDisplayName()));
    }

    if (empty($device_map[$device->getPHID()])) {
      throw new Exception(
        pht(
          'Repository "%s" is being synchronized on device "%s", but '.
          'this device is not bound to the corresponding cluster '.
          'service ("%s").',
          $repository->getDisplayName(),
          $device->getName(),
          $service->getName()));
    }

    // The current device is the only device in service, so it must be a
    // leader. We can safely have any future nodes which come online read
    // from it.
    PhabricatorRepositoryWorkingCopyVersion::updateVersion(
      $repository_phid,
      $device_phid,
      0);

    return 0;
  }


  /**
   * Prepare this device to receive a write.
   *
   * Acquires the global write lock for the repository (waiting up to roughly
   * two minutes and logging the active writer while waiting), refuses to
   * continue if a previous write was interrupted, synchronizes the working
   * copy, then records a durable write marker. The lock, version, and owner
   * are retained for @{method:synchronizeWorkingCopyAfterWrite}.
   *
   * If any step after acquiring the write lock and before recording the
   * durable marker fails, the write lock is released before the exception
   * propagates.
   *
   * @return void
   * @task sync
   */
  public function synchronizeWorkingCopyBeforeWrite() {
    if (!$this->shouldEnableSynchronization(true)) {
      return;
    }

    $repository = $this->getRepository();
    $viewer = $this->getViewer();

    $repository_phid = $repository->getPHID();

    $device = AlmanacKeys::getLiveDevice();
    $device_phid = $device->getPHID();

    $table = new PhabricatorRepositoryWorkingCopyVersion();
    $locked_connection = $table->establishConnection('w');

    $write_lock = PhabricatorRepositoryWorkingCopyVersion::getWriteLock(
      $repository_phid);

    $write_lock->setExternalConnection($locked_connection);

    $this->logLine(
      pht(
        'Acquiring write lock for repository "%s"...',
        $repository->getDisplayName()));

    // See T13590. On the HTTP pathway, it's possible for us to hit the script
    // time limit while holding the durable write lock if a user makes a big
    // push. Remove the time limit before we acquire the durable lock.
    set_time_limit(0);

    $lock_wait = phutil_units('2 minutes in seconds');
    try {
      $write_wait_start = microtime(true);

      $start = PhabricatorTime::getNow();
      $step_wait = 1;

      while (true) {
        try {
          $write_lock->lock((int)floor($step_wait));
          $write_wait_end = microtime(true);
          break;
        } catch (PhutilLockException $ex) {
          $waited = (PhabricatorTime::getNow() - $start);
          if ($waited > $lock_wait) {
            throw $ex;
          }
          $this->logActiveWriter($viewer, $repository);
        }

        // Wait a little longer before the next message we print.
        $step_wait = $step_wait + 0.5;
        $step_wait = min($step_wait, 3);
      }

      $waited = (PhabricatorTime::getNow() - $start);
      if ($waited) {
        $this->logLine(
          pht(
            'Acquired write lock after %s second(s).',
            new PhutilNumber($waited)));
      } else {
        $this->logLine(
          pht(
            'Acquired write lock immediately.'));
      }
    } catch (PhutilLockException $ex) {
      throw new PhutilProxyException(
        pht(
          'Failed to acquire write lock after waiting %s second(s). You '.
          'may be able to retry later. (%s)',
          new PhutilNumber($lock_wait),
          $ex->getHint()),
        $ex);
    }

    // From here on we hold the write lock. If anything goes wrong before the
    // durable marker is written, release the lock before propagating.
    try {
      $versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions(
        $repository_phid);
      foreach ($versions as $version) {
        if (!$version->getIsWriting()) {
          continue;
        }

        throw new Exception(
          pht(
            'A previous write to this repository was interrupted; refusing '.
            'new writes. This issue requires operator intervention to '.
            'resolve, see "Write Interruptions" in the "Cluster: '.
            'Repositories" in the documentation for instructions.'));
      }

      $read_wait_start = microtime(true);
      $max_version = $this->synchronizeWorkingCopyBeforeRead();
      $read_wait_end = microtime(true);
    } catch (Exception $ex) {
      $write_lock->unlock();
      throw $ex;
    }

    // Identify this writer so the durable marker can be matched up again in
    // synchronizeWorkingCopyAfterWrite().
    $pid = getmypid();
    $hash = Filesystem::readRandomCharacters(12);
    $this->clusterWriteOwner = "{$pid}.{$hash}";

    PhabricatorRepositoryWorkingCopyVersion::willWrite(
      $locked_connection,
      $repository_phid,
      $device_phid,
      array(
        'userPHID' => $this->getEffectiveActingAsPHID(),
        'epoch' => PhabricatorTime::getNow(),
        'devicePHID' => $device_phid,
      ),
      $this->clusterWriteOwner);

    $this->clusterWriteVersion = $max_version;
    $this->clusterWriteLock = $write_lock;

    $write_wait = ($write_wait_end - $write_wait_start);
    $read_wait = ($read_wait_end - $read_wait_start);

    $log = $this->logger;
    if ($log) {
      $log->writeClusterEngineLogProperty('writeWait', $write_wait);
      $log->writeClusterEngineLogProperty('readWait', $read_wait);
    }
  }


  /**
   * Record a newly discovered version for this device.
   *
   * Does nothing for hosted repositories. The stored version is only ever
   * moved forward: it is updated when this device has no version record or
   * when the new version is greater than the recorded one.
   *
   * @param int Newly discovered repository version.
   * @return void
   * @task sync
   */
  public function synchronizeWorkingCopyAfterDiscovery($new_version) {
    if (!$this->shouldEnableSynchronization(true)) {
      return;
    }

    $repository = $this->getRepository();
    $repository_phid = $repository->getPHID();
    if ($repository->isHosted()) {
      return;
    }

    $device = AlmanacKeys::getLiveDevice();
    $device_phid = $device->getPHID();

    // NOTE: We are not holding a lock here because this method is only called
    // from PhabricatorRepositoryDiscoveryEngine, which already holds a device
    // lock. Even if we do race here and record an older version, the
    // consequences are mild: we only do extra work to correct it later.

    $versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions(
      $repository_phid);
    $versions = mpull($versions, null, 'getDevicePHID');

    $this_version = idx($versions, $device_phid);
    if ($this_version) {
      $this_version = (int)$this_version->getRepositoryVersion();
    } else {
      $this_version = null;
    }

    if (($this_version === null) || ($new_version > $this_version)) {
      PhabricatorRepositoryWorkingCopyVersion::updateVersion(
        $repository_phid,
        $device_phid,
        $new_version);
    }
  }


  /**
   * Release the write lock taken by
   * @{method:synchronizeWorkingCopyBeforeWrite} and record the new version.
   *
   * Retries for up to five minutes if the database connection has been lost.
   * Throws if no write lock is held, or if the durable marker could not be
   * released within that window.
   *
   * @return void
   * @task sync
   */
  public function synchronizeWorkingCopyAfterWrite() {
    if (!$this->shouldEnableSynchronization(true)) {
      return;
    }

    if (!$this->clusterWriteLock) {
      throw new Exception(
        pht(
          'Trying to synchronize after write, but not holding a write '.
          'lock!'));
    }

    $repository = $this->getRepository();
    $repository_phid = $repository->getPHID();

    $device = AlmanacKeys::getLiveDevice();
    $device_phid = $device->getPHID();

    // It is possible that we've lost the global lock while receiving the push.
    // For example, the master database may have been restarted between the
    // time we acquired the global lock and now, when the push has finished.

    // We wrote a durable lock while we were holding the global lock,
    // essentially upgrading our lock. We can still safely release this upgraded
    // lock even if we're no longer holding the global lock.

    // If we fail to release the lock, the repository will be frozen until
    // an operator can figure out what happened, so we try pretty hard to
    // reconnect to the database and release the lock.

    $now = PhabricatorTime::getNow();
    $duration = phutil_units('5 minutes in seconds');
    $try_until = $now + $duration;

    $did_release = false;
    $already_failed = false;
    while (PhabricatorTime::getNow() <= $try_until) {
      try {
        // NOTE: This means we're still bumping the version when pushes fail. We
        // could select only un-rejected events instead to bump a little less
        // often.

        $new_log = id(new PhabricatorRepositoryPushEventQuery())
          ->setViewer(PhabricatorUser::getOmnipotentUser())
          ->withRepositoryPHIDs(array($repository_phid))
          ->setLimit(1)
          ->executeOne();

        $old_version = $this->clusterWriteVersion;
        if ($new_log) {
          $new_version = $new_log->getID();
        } else {
          $new_version = $old_version;
        }

        PhabricatorRepositoryWorkingCopyVersion::didWrite(
          $repository_phid,
          $device_phid,
          $this->clusterWriteVersion,
          $new_version,
          $this->clusterWriteOwner);
        $did_release = true;
        break;
      } catch (AphrontConnectionQueryException $ex) {
        $connection_exception = $ex;
      } catch (AphrontConnectionLostQueryException $ex) {
        $connection_exception = $ex;
      }

      // Only print the alarming banner once, on the first failure.
      if (!$already_failed) {
        $already_failed = true;
        $this->logLine(
          pht('CRITICAL. Failed to release cluster write lock!'));

        $this->logLine(
          pht(
            'The connection to the master database was lost while receiving '.
            'the write.'));

        $this->logLine(
          pht(
            'This process will spend %s more second(s) attempting to '.
            'recover, then give up.',
            new PhutilNumber($duration)));
      }

      sleep(1);
    }

    if ($did_release) {
      if ($already_failed) {
        $this->logLine(
          pht('RECOVERED. Link to master database was restored.'));
      }
      $this->logLine(pht('Released cluster write lock.'));
    } else {
      // The display name comes from the repository: this engine does not
      // define getDisplayName() itself.
      throw new Exception(
        pht(
          'Failed to reconnect to master database and release held write '.
          'lock ("%s") on device "%s" for repository "%s" after trying '.
          'for %s second(s). This repository will be frozen.',
          $this->clusterWriteOwner,
          $device->getName(),
          $repository->getDisplayName(),
          new PhutilNumber($duration)));
    }

    // We can continue even if we've lost this lock, everything is still
    // consistent.
    try {
      $this->clusterWriteLock->unlock();
    } catch (Exception $ex) {
      // Ignore.
    }

    $this->clusterWriteLock = null;
    $this->clusterWriteOwner = null;
  }


/* -(  Internals  )---------------------------------------------------------- */


  /**
   * Decide whether cluster synchronization applies to this repository.
   *
   * Synchronization is disabled when the repository has no Almanac service
   * PHID, when the repository reports that it does not support
   * synchronization, or (if a device is required) when there is no live
   * device.
   *
   * @param bool True to also require a live Almanac device.
   * @return bool True if synchronization should be performed.
   * @task internal
   */
  private function shouldEnableSynchronization($require_device) {
    $repository = $this->getRepository();

    $service_phid = $repository->getAlmanacServicePHID();
    if (!$service_phid) {
      return false;
    }

    if (!$repository->supportsSynchronization()) {
      return false;
    }

    if ($require_device) {
      $device = AlmanacKeys::getLiveDevice();
      if (!$device) {
        return false;
      }
    }

    return true;
  }


  /**
   * Fetch into the local working copy from the repository's remote URI.
   *
   * Only Git repositories are supported. Fetch failures are logged and then
   * rethrown.
   *
   * @return void
   * @task internal
   */
  private function synchronizeWorkingCopyFromRemote() {
    $repository = $this->getRepository();
    $device = AlmanacKeys::getLiveDevice();

    $local_path = $repository->getLocalPath();
    $fetch_uri = $repository->getRemoteURIEnvelope();

    if ($repository->isGit()) {
      $this->requireWorkingCopy();

      $argv = array(
        'fetch --prune -- %P %s',
        $fetch_uri,
        '+refs/*:refs/*',
      );
    } else {
      throw new Exception(pht('Remote sync only supported for git!'));
    }

    $future = DiffusionCommandEngine::newCommandEngine($repository)
      ->setArgv($argv)
      ->setSudoAsDaemon(true)
      ->setCredentialPHID($repository->getCredentialPHID())
      ->setURI($repository->getRemoteURIObject())
      ->newFuture();

    $future->setCWD($local_path);

    try {
      $future->resolvex();
    } catch (Exception $ex) {
      $this->logLine(
        pht(
          'Synchronization of "%s" from remote failed: %s',
          $device->getName(),
          $ex->getMessage()));
      throw $ex;
    }
  }


  /**
   * Fetch into the local working copy from one of a set of up-to-date
   * devices.
   *
   * Candidate bindings are tried in random order until one succeeds. If all
   * of them fail, the exception from the last attempt is thrown.
   *
   * @param list<string> PHIDs of devices which hold the newest version.
   * @param int|null Version this device currently holds, if any.
   * @param int Version held by the listed devices.
   * @return void
   * @task internal
   */
  private function synchronizeWorkingCopyFromDevices(
    array $device_phids,
    $local_version,
    $remote_version) {

    $repository = $this->getRepository();

    $service = $repository->loadAlmanacService();
    if (!$service) {
      throw new Exception(pht('Failed to load repository cluster service.'));
    }

    $device_map = array_fuse($device_phids);
    $bindings = $service->getActiveBindings();

    $fetchable = array();
    foreach ($bindings as $binding) {
      // We can't fetch from nodes which don't have the newest version.
      $device_phid = $binding->getDevicePHID();
      if (empty($device_map[$device_phid])) {
        continue;
      }

      // TODO: For now, only fetch over SSH. We could support fetching over
      // HTTP eventually.
      if ($binding->getAlmanacPropertyValue('protocol') != 'ssh') {
        continue;
      }

      $fetchable[] = $binding;
    }

    if (!$fetchable) {
      throw new Exception(
        pht(
          'Leader lost: no up-to-date nodes in repository cluster are '.
          'fetchable.'));
    }

    // If we can synchronize from multiple sources, choose one at random.
    shuffle($fetchable);

    $caught = null;
    foreach ($fetchable as $binding) {
      try {
        $this->synchronizeWorkingCopyFromBinding(
          $binding,
          $local_version,
          $remote_version);
        $caught = null;
        break;
      } catch (Exception $ex) {
        $caught = $ex;
      }
    }

    if ($caught) {
      throw $caught;
    }
  }


  /**
   * Fetch into the local working copy from a specific cluster binding, and
   * record a sync event describing the outcome.
   *
   * Only Git repositories are supported. On failure, the sync event is saved
   * with the failure details, the failure is logged, and the exception is
   * rethrown.
   *
   * @param AlmanacBinding Binding to fetch from.
   * @param int|null Version this device currently holds, if any.
   * @param int Version held by the device behind the binding.
   * @return void
   * @task internal
   */
  private function synchronizeWorkingCopyFromBinding(
    AlmanacBinding $binding,
    $local_version,
    $remote_version) {

    $repository = $this->getRepository();
    $device = AlmanacKeys::getLiveDevice();

    $this->logLine(
      pht(
        'Synchronizing this device ("%s") from cluster leader ("%s").',
        $device->getName(),
        $binding->getDevice()->getName()));

    $fetch_uri = $repository->getClusterRepositoryURIFromBinding($binding);
    $local_path = $repository->getLocalPath();

    if ($repository->isGit()) {
      $this->requireWorkingCopy();

      $argv = array(
        'fetch --prune -- %s %s',
        $fetch_uri,
        '+refs/*:refs/*',
      );
    } else {
      throw new Exception(pht('Binding sync only supported for git!'));
    }

    $future = DiffusionCommandEngine::newCommandEngine($repository)
      ->setArgv($argv)
      ->setConnectAsDevice(true)
      ->setSudoAsDaemon(true)
      ->setURI($fetch_uri)
      ->newFuture();

    $future->setCWD($local_path);

    $log = PhabricatorRepositorySyncEvent::initializeNewEvent()
      ->setRepositoryPHID($repository->getPHID())
      ->setEpoch(PhabricatorTime::getNow())
      ->setDevicePHID($device->getPHID())
      ->setFromDevicePHID($binding->getDevice()->getPHID())
      ->setDeviceVersion($local_version)
      ->setFromDeviceVersion($remote_version);

    $sync_start = microtime(true);

    try {
      $future->resolvex();
    } catch (Exception $ex) {
      $log->setSyncWait(phutil_microseconds_since($sync_start));

      if ($ex instanceof CommandException) {
        if ($future->getWasKilledByTimeout()) {
          $result_type = PhabricatorRepositorySyncEvent::RESULT_TIMEOUT;
        } else {
          $result_type = PhabricatorRepositorySyncEvent::RESULT_ERROR;
        }

        $log
          ->setResultCode($ex->getError())
          ->setResultType($result_type)
          ->setProperty('stdout', $ex->getStdout())
          ->setProperty('stderr', $ex->getStderr());
      } else {
        $log
          ->setResultCode(1)
          ->setResultType(PhabricatorRepositorySyncEvent::RESULT_EXCEPTION)
          ->setProperty('message', $ex->getMessage());
      }

      $log->save();

      $this->logLine(
        pht(
          'Synchronization of "%s" from leader "%s" failed: %s',
          $device->getName(),
          $binding->getDevice()->getName(),
          $ex->getMessage()));

      throw $ex;
    }

    $log
      ->setSyncWait(phutil_microseconds_since($sync_start))
      ->setResultCode(0)
      ->setResultType(PhabricatorRepositorySyncEvent::RESULT_SYNC)
      ->save();
  }


  /**
   * Write a single message to the log as a "# "-prefixed line.
   *
   * @param string Message to log.
   * @return this
   * @task internal
   */
  private function logLine($message) {
    return $this->logText("# {$message}\n");
  }


  /**
   * Write raw text to the log, if a log has been configured.
   *
   * @param string Text to log.
   * @return this
   * @task internal
   */
  private function logText($message) {
    $log = $this->logger;
    if ($log) {
      $log->writeClusterEngineLogMessage($message);
    }
    return $this;
  }

  /**
   * Throw unless the repository's local path exists on this device.
   *
   * @return void
   * @task internal
   */
  private function requireWorkingCopy() {
    $repository = $this->getRepository();
    $local_path = $repository->getLocalPath();

    if (!Filesystem::pathExists($local_path)) {
      $device = AlmanacKeys::getLiveDevice();

      throw new Exception(
        pht(
          'Repository "%s" does not have a working copy on this device '.
          'yet, so it can not be synchronized. Wait for the daemons to '.
          'construct one or run `bin/repository update %s` on this host '.
          '("%s") to build it explicitly.',
          $repository->getDisplayName(),
          $repository->getMonogram(),
          $device->getName()));
    }
  }

  /**
   * Log a message describing who currently appears to be writing to the
   * repository, for display while waiting on the write lock.
   *
   * @param PhabricatorUser Viewer used to load handles for the writer.
   * @param PhabricatorRepository Repository being written to.
   * @return void
   * @task internal
   */
  private function logActiveWriter(
    PhabricatorUser $viewer,
    PhabricatorRepository $repository) {

    $writer = PhabricatorRepositoryWorkingCopyVersion::loadWriter(
      $repository->getPHID());
    if (!$writer) {
      $this->logLine(pht('Waiting on another user to finish writing...'));
      return;
    }

    $user_phid = $writer->getWriteProperty('userPHID');
    $device_phid = $writer->getWriteProperty('devicePHID');
    $epoch = $writer->getWriteProperty('epoch');

    $phids = array($user_phid, $device_phid);
    $handles = $viewer->loadHandles($phids);

    $duration = (PhabricatorTime::getNow() - $epoch) + 1;

    $this->logLine(
      pht(
        'Waiting for %s to finish writing (on device "%s" for %ss)...',
        $handles[$user_phid]->getName(),
        $handles[$device_phid]->getName(),
        new PhutilNumber($duration)));
  }

  /**
   * Build a new, unsaved push event for a maintenance operation on this
   * repository, attributed to the effective actor.
   *
   * @return PhabricatorRepositoryPushEvent New push event.
   */
  public function newMaintenanceEvent() {
    $viewer = $this->getViewer();
    $repository = $this->getRepository();
    $now = PhabricatorTime::getNow();

    $event = PhabricatorRepositoryPushEvent::initializeNewEvent($viewer)
      ->setRepositoryPHID($repository->getPHID())
      ->setEpoch($now)
      ->setPusherPHID($this->getEffectiveActingAsPHID())
      ->setRejectCode(PhabricatorRepositoryPushLog::REJECT_ACCEPT);

    return $event;
  }

  /**
   * Build a new, unsaved push log for a maintenance operation on this
   * repository. The device PHID is null when there is no live device.
   *
   * @return PhabricatorRepositoryPushLog New push log.
   */
  public function newMaintenanceLog() {
    $viewer = $this->getViewer();
    $repository = $this->getRepository();
    $now = PhabricatorTime::getNow();

    $device = AlmanacKeys::getLiveDevice();
    if ($device) {
      $device_phid = $device->getPHID();
    } else {
      $device_phid = null;
    }

    return PhabricatorRepositoryPushLog::initializeNewLog($viewer)
      ->setDevicePHID($device_phid)
      ->setRepositoryPHID($repository->getPHID())
      ->attachRepository($repository)
      ->setEpoch($now)
      ->setPusherPHID($this->getEffectiveActingAsPHID())
      ->setChangeFlags(PhabricatorRepositoryPushLog::CHANGEFLAG_MAINTENANCE)
      ->setRefType(PhabricatorRepositoryPushLog::REFTYPE_MAINTENANCE)
      ->setRefNew('');
  }

}