Path: blob/master/elisp/emacs-for-python/rope-dist/rope/base/fscommands.py
1432 views
"""Project file system commands.12This modules implements file system operations used by rope. Different3version control systems can be supported by implementing the interface4provided by `FileSystemCommands` class. See `SubversionCommands` and5`MercurialCommands` for example.67"""8import os9import shutil10import subprocess111213def create_fscommands(root):14dirlist = os.listdir(root)15commands = {'.hg': MercurialCommands,16'.svn': SubversionCommands,17'.git': GITCommands,18'_svn': SubversionCommands,19'_darcs': DarcsCommands}20for key in commands:21if key in dirlist:22try:23return commands[key](root)24except (ImportError, OSError):25pass26return FileSystemCommands()272829class FileSystemCommands(object):3031def create_file(self, path):32open(path, 'w').close()3334def create_folder(self, path):35os.mkdir(path)3637def move(self, path, new_location):38shutil.move(path, new_location)3940def remove(self, path):41if os.path.isfile(path):42os.remove(path)43else:44shutil.rmtree(path)4546def write(self, path, data):47file_ = open(path, 'wb')48try:49file_.write(data)50finally:51file_.close()525354class SubversionCommands(object):5556def __init__(self, *args):57self.normal_actions = FileSystemCommands()58import pysvn59self.client = pysvn.Client()6061def create_file(self, path):62self.normal_actions.create_file(path)63self.client.add(path, force=True)6465def create_folder(self, path):66self.normal_actions.create_folder(path)67self.client.add(path, force=True)6869def move(self, path, new_location):70self.client.move(path, new_location, force=True)7172def remove(self, path):73self.client.remove(path, force=True)7475def write(self, path, data):76self.normal_actions.write(path, data)777879class MercurialCommands(object):8081def __init__(self, root):82self.hg = self._import_mercurial()83self.normal_actions = FileSystemCommands()84try:85self.ui = self.hg.ui.ui(86verbose=False, debug=False, quiet=True,87interactive=False, traceback=False, report_untrusted=False)88except:89self.ui = self.hg.ui.ui()90self.ui.setconfig('ui', 'interactive', 'no')91self.ui.setconfig('ui', 'debug', 'no')92self.ui.setconfig('ui', 'traceback', 'no')93self.ui.setconfig('ui', 'verbose', 'no')94self.ui.setconfig('ui', 'report_untrusted', 'no')95self.ui.setconfig('ui', 'quiet', 'yes')9697self.repo = self.hg.hg.repository(self.ui, root)9899def _import_mercurial(self):100import mercurial.commands101import mercurial.hg102import mercurial.ui103return mercurial104105def create_file(self, path):106self.normal_actions.create_file(path)107self.hg.commands.add(self.ui, self.repo, path)108109def create_folder(self, path):110self.normal_actions.create_folder(path)111112def move(self, path, new_location):113self.hg.commands.rename(self.ui, self.repo, path,114new_location, after=False)115116def remove(self, path):117self.hg.commands.remove(self.ui, self.repo, path)118119def write(self, path, data):120self.normal_actions.write(path, data)121122123class GITCommands(object):124125def __init__(self, root):126self.root = root127self._do(['version'])128self.normal_actions = FileSystemCommands()129130def create_file(self, path):131self.normal_actions.create_file(path)132self._do(['add', self._in_dir(path)])133134def create_folder(self, path):135self.normal_actions.create_folder(path)136137def move(self, path, new_location):138self._do(['mv', self._in_dir(path), self._in_dir(new_location)])139140def remove(self, path):141self._do(['rm', self._in_dir(path)])142143def write(self, path, data):144# XXX: should we use ``git add``?145self.normal_actions.write(path, data)146147def _do(self, args):148_execute(['git'] + args, cwd=self.root)149150def _in_dir(self, path):151if path.startswith(self.root):152return path[len(self.root) + 1:]153return self.root154155156class DarcsCommands(object):157158def __init__(self, root):159self.root = root160self.normal_actions = FileSystemCommands()161162def create_file(self, path):163self.normal_actions.create_file(path)164self._do(['add', path])165166def create_folder(self, path):167self.normal_actions.create_folder(path)168self._do(['add', path])169170def move(self, path, new_location):171self._do(['mv', path, new_location])172173def remove(self, path):174self.normal_actions.remove(path)175176def write(self, path, data):177self.normal_actions.write(path, data)178179def _do(self, args):180_execute(['darcs'] + args, cwd=self.root)181182183def _execute(args, cwd=None):184process = subprocess.Popen(args, cwd=cwd, stdout=subprocess.PIPE)185process.wait()186return process.returncode187188189def unicode_to_file_data(contents, encoding=None):190if not isinstance(contents, unicode):191return contents192if encoding is None:193encoding = read_str_coding(contents)194if encoding is not None:195return contents.encode(encoding)196try:197return contents.encode()198except UnicodeEncodeError:199return contents.encode('utf-8')200201def file_data_to_unicode(data, encoding=None):202result = _decode_data(data, encoding)203if '\r' in result:204result = result.replace('\r\n', '\n').replace('\r', '\n')205return result206207def _decode_data(data, encoding):208if isinstance(data, unicode):209return data210if encoding is None:211encoding = read_str_coding(data)212if encoding is None:213# there is no encoding tip, we need to guess.214# PEP263 says that "encoding not explicitly defined" means it is ascii,215# but we will use utf8 instead since utf8 fully covers ascii and btw is216# the only non-latin sane encoding.217encoding = 'utf-8'218try:219return data.decode(encoding)220except (UnicodeError, LookupError):221# fallback to latin1: it should never fail222return data.decode('latin1')223224225def read_file_coding(path):226file = open(path, 'b')227count = 0228result = []229buffsize = 10230while True:231current = file.read(10)232if not current:233break234count += current.count('\n')235result.append(current)236file.close()237return _find_coding(''.join(result))238239240def read_str_coding(source):241try:242first = source.index('\n') + 1243second = source.index('\n', first) + 1244except ValueError:245second = len(source)246return _find_coding(source[:second])247248249def _find_coding(text):250coding = 'coding'251try:252start = text.index(coding) + len(coding)253if text[start] not in '=:':254return255start += 1256while start < len(text) and text[start].isspace():257start += 1258end = start259while end < len(text):260c = text[end]261if not c.isalnum() and c not in '-_':262break263end += 1264return text[start:end]265except ValueError:266pass267268269