Path: blob/master/src/applications/diffusion/ssh/DiffusionGitSSHWorkflow.php
12242 views
<?php

/**
 * Shared base for Git workflows served over SSH.
 *
 * Beyond what the parent workflow provides, this class:
 *
 *   - collects cluster engine log properties and relays cluster engine log
 *     messages to the client over the error channel;
 *   - hooks the passthru command's read/write callbacks so traffic can be
 *     logged, filtered through an optional wire protocol, and counted; and
 *   - proxies a request to cluster services, retrying against the next
 *     service when a request fails before any bytes have been exchanged.
 */
abstract class DiffusionGitSSHWorkflow
  extends DiffusionSSHWorkflow
  implements DiffusionRepositoryClusterEngineLogInterface {

  /** Key/value properties recorded by the cluster engine. */
  private $engineLogProperties = array();

  /** Optional protocol log; null when protocol logging is disabled. */
  private $protocolLog;

  /** Optional wire protocol that raw bytes are passed through. */
  private $wireProtocol;

  /** Bytes emitted by the read callback (counted after the wire protocol). */
  private $ioBytesRead = 0;

  /** Bytes seen by the write callback (counted before the wire protocol). */
  private $ioBytesWritten = 0;

  /** Number of proxied requests started via didBeginRequest(). */
  private $requestAttempts = 0;

  /** Number of proxied requests that were evaluated for a retry. */
  private $requestFailures = 0;

  /**
   * Write an error message, terminated with a newline.
   *
   * @param string $message Message text, without a trailing newline.
   * @return mixed Whatever the parent implementation returns.
   */
  protected function writeError($message) {
    // Git assumes we'll add our own newlines.
    return parent::writeError($message."\n");
  }

  /**
   * Relay a cluster engine log message to the client.
   *
   * This calls the parent's writeError() directly, so the newline added by
   * this class's writeError() override is NOT appended; callers supply their
   * own line endings. The error channel is updated right after the write.
   *
   * @param string $message Message text, including any trailing newline.
   * @return void
   */
  public function writeClusterEngineLogMessage($message) {
    parent::writeError($message);
    $this->getErrorChannel()->update();
  }

  /**
   * Record a cluster engine log property, replacing any earlier value.
   *
   * @param string $key Property name.
   * @param mixed $value Property value.
   * @return void
   */
  public function writeClusterEngineLogProperty($key, $value) {
    $this->engineLogProperties[$key] = $value;
  }

  /**
   * Read back a previously recorded cluster engine log property.
   *
   * @param string $key Property name.
   * @param mixed $default Value to return when the property was never set.
   * @return mixed The recorded value, or $default.
   */
  protected function getClusterEngineLogProperty($key, $default = null) {
    return idx($this->engineLogProperties, $key, $default);
  }

  /**
   * Resolve the repository named by the first "dir" argument, requiring it
   * to be a Git repository.
   *
   * @return mixed Result of loadRepositoryWithPath() for that path.
   */
  protected function identifyRepository() {
    $args = $this->getArgs();
    $path = head($args->getArg('dir'));

    return $this->loadRepositoryWithPath(
      $path,
      PhabricatorRepositoryType::REPOSITORY_TYPE_GIT);
  }

  /**
   * Block until the Git client closes its side of the connection.
   *
   * @return void
   */
  protected function waitForGitClient() {
    $io_channel = $this->getIOChannel();

    // If we don't wait for the client to close the connection, `git` will
    // consider it an early abort and fail. Sit around until Git is comfortable
    // that it really received all the data.
    while ($io_channel->isOpenForReading()) {
      $io_channel->update();
      $this->getErrorChannel()->flush();
      PhutilChannel::waitForAny(array($io_channel));
    }
  }

  /**
   * Raise an error explaining that the repository is not a Git repository.
   *
   * @param PhabricatorRepository $repository Repository that was requested.
   * @return void This method always throws.
   * @throws Exception Always.
   */
  protected function raiseWrongVCSException(
    PhabricatorRepository $repository) {
    throw new Exception(
      pht(
        'This repository ("%s") is not a Git repository. Use "%s" to '.
        'interact with this repository.',
        $repository->getDisplayName(),
        $repository->getVersionControlSystem()));
  }

  /**
   * Build a passthru command whose read and write callbacks are routed
   * through this workflow's callbacks.
   *
   * @return mixed The command returned by the parent, with callbacks set.
   */
  protected function newPassthruCommand() {
    return parent::newPassthruCommand()
      ->setWillWriteCallback(array($this, 'willWriteMessageCallback'))
      ->setWillReadCallback(array($this, 'willReadMessageCallback'));
  }

  /**
   * Build the protocol log for this request, if any.
   *
   * As written this always returns null (logging disabled) on both paths;
   * see the inline note for enabling a log during development.
   *
   * @param bool $is_proxy True when this request is being proxied.
   * @return PhabricatorProtocolLog|null Protocol log, or null for none.
   */
  protected function newProtocolLog($is_proxy) {
    if ($is_proxy) {
      return null;
    }

    // While developing, do this to write a full protocol log to disk:
    //
    // return new PhabricatorProtocolLog('/tmp/git-protocol.log');

    return null;
  }

  /**
   * @return PhabricatorProtocolLog|null The active protocol log, if set.
   */
  final protected function getProtocolLog() {
    return $this->protocolLog;
  }

  /**
   * Set the protocol log that read/write callbacks report bytes to.
   *
   * @param PhabricatorProtocolLog $log Log to record traffic in.
   * @return $this Returned for chaining, consistent with setWireProtocol().
   */
  final protected function setProtocolLog(PhabricatorProtocolLog $log) {
    $this->protocolLog = $log;
    return $this;
  }

  /**
   * @return DiffusionGitWireProtocol|null The active wire protocol, if set.
   */
  final protected function getWireProtocol() {
    return $this->wireProtocol;
  }

  /**
   * Set the wire protocol that raw bytes are passed through.
   *
   * @param DiffusionGitWireProtocol $protocol Protocol handler.
   * @return $this
   */
  final protected function setWireProtocol(
    DiffusionGitWireProtocol $protocol) {
    $this->wireProtocol = $protocol;
    return $this;
  }

  /**
   * Passthru "will write" callback.
   *
   * Counts and logs the raw message, then lets the wire protocol (if any)
   * transform it.
   *
   * @param PhabricatorSSHPassthruCommand $command Command doing the write.
   * @param string $message Raw bytes about to be written.
   * @return mixed The message, possibly transformed by the wire protocol.
   */
  public function willWriteMessageCallback(
    PhabricatorSSHPassthruCommand $command,
    $message) {

    // Written bytes are counted before the protocol layer sees them.
    $this->ioBytesWritten += strlen($message);

    $log = $this->getProtocolLog();
    if ($log) {
      $log->didWriteBytes($message);
    }

    $protocol = $this->getWireProtocol();
    if ($protocol) {
      $message = $protocol->willWriteBytes($message);
    }

    return $message;
  }

  /**
   * Passthru "will read" callback.
   *
   * Logs the raw message, lets the wire protocol (if any) transform it, and
   * then counts the bytes which were actually emitted.
   *
   * @param PhabricatorSSHPassthruCommand $command Command doing the read.
   * @param string $message Raw bytes which were read.
   * @return mixed The message, possibly transformed by the wire protocol.
   */
  public function willReadMessageCallback(
    PhabricatorSSHPassthruCommand $command,
    $message) {

    $log = $this->getProtocolLog();
    if ($log) {
      $log->didReadBytes($message);
    }

    $protocol = $this->getWireProtocol();
    if ($protocol) {
      $message = $protocol->willReadBytes($message);
    }

    // Note that bytes aren't counted until they're emitted by the protocol
    // layer. This means the underlying command might emit bytes, but if they
    // are buffered by the protocol layer they won't count as read bytes yet.

    $this->ioBytesRead += strlen($message);

    return $message;
  }

  /**
   * @return int Total bytes counted by the read callback so far.
   */
  final protected function getIOBytesRead() {
    return $this->ioBytesRead;
  }

  /**
   * @return int Total bytes counted by the write callback so far.
   */
  final protected function getIOBytesWritten() {
    return $this->ioBytesWritten;
  }

  /**
   * Proxy this request to a cluster service, moving on to the next service
   * when a request fails and a retry is still possible.
   *
   * @param bool $for_write Passed through to getAlmanacServiceRefs().
   * @return int Error code of the last proxied command; 0 on success.
   * @throws Exception When there are no service refs to proxy to.
   */
  final protected function executeRepositoryProxyOperations($for_write) {
    $device = AlmanacKeys::getLiveDevice();

    $refs = $this->getAlmanacServiceRefs($for_write);
    $err = 1;

    // Fail with a clear message rather than handing a non-ref value to
    // getProxyCommandForServiceRef(). NOTE(review): getAlmanacServiceRefs()
    // may already guarantee a nonempty list -- confirm.
    if (!$refs) {
      throw new Exception(
        pht(
          'No repository services are available to proxy this request to.'));
    }

    while (true) {
      $ref = head($refs);

      $command = $this->getProxyCommandForServiceRef($ref);

      if ($device) {
        $this->writeClusterEngineLogMessage(
          pht(
            "# Request received by \"%s\", forwarding to cluster ".
            "host \"%s\".\n",
            $device->getName(),
            $ref->getDeviceName()));
      }

      $command = PhabricatorDaemon::sudoCommandAsDaemonUser($command);

      $future = id(new ExecFuture('%C', $command))
        ->setEnv($this->getEnvironment());

      $this->didBeginRequest();

      $err = $this->newPassthruCommand()
        ->setIOChannel($this->getIOChannel())
        ->setCommandChannelFromExecFuture($future)
        ->execute();

      // TODO: Currently, when proxying, we do not write an event log on the
      // proxy. Perhaps we should write a "proxy log". This is not very useful
      // for statistics or auditing, but could be useful for diagnostics.
      // Marking the proxy logs as proxied (and recording devicePHID on all
      // logs) would make differentiating between these use cases easier.

      if (!$err) {
        $this->waitForGitClient();
        return $err;
      }

      // Throw away this service: the request failed and we're treating the
      // failure as persistent, so we don't want to retry another request to
      // the same host.
      array_shift($refs);

      $should_retry = $this->shouldRetryRequest($refs);
      if (!$should_retry) {
        return $err;
      }

      // If we haven't bailed out yet, we'll retry the request with the next
      // service.
    }

    throw new Exception(pht('Reached an unreachable place.'));
  }

  /**
   * Record that a proxied request attempt is starting.
   *
   * @return $this
   */
  private function didBeginRequest() {
    $this->requestAttempts++;
    return $this;
  }

  /**
   * Record a failed request and decide whether another service may be tried.
   *
   * A retry is refused when no services remain, or when any bytes have
   * already been counted as read or written.
   *
   * @param array $remaining_refs Service refs which have not been tried yet.
   * @return bool True if the caller should retry with the next service.
   * @throws Exception When more failures than attempts have been recorded.
   */
  private function shouldRetryRequest(array $remaining_refs) {
    $this->requestFailures++;

    if ($this->requestFailures > $this->requestAttempts) {
      throw new Exception(
        pht(
          "Workflow has recorded more failures than attempts; there is a ".
          "missing call to \"didBeginRequest()\".\n"));
    }

    if (!$remaining_refs) {
      $this->writeClusterEngineLogMessage(
        pht(
          "# All available services failed to serve the request, ".
          "giving up.\n"));
      return false;
    }

    $read_len = $this->getIOBytesRead();
    if ($read_len) {
      $this->writeClusterEngineLogMessage(
        pht(
          "# Client already read from service (%s bytes), unable to retry.\n",
          new PhutilNumber($read_len)));
      return false;
    }

    $write_len = $this->getIOBytesWritten();
    if ($write_len) {
      $this->writeClusterEngineLogMessage(
        pht(
          "# Client already wrote to service (%s bytes), unable to retry.\n",
          new PhutilNumber($write_len)));
      return false;
    }

    $this->writeClusterEngineLogMessage(
      pht(
        "# Service request failed, retrying (making attempt %s of %s).\n",
        new PhutilNumber($this->requestAttempts + 1),
        new PhutilNumber($this->requestAttempts + count($remaining_refs))));

    return true;
  }

}