Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
marvel
GitHub Repository: marvel/qnf
Path: blob/master/elisp/emacs-for-python/rope-dist/rope/base/fscommands.py
1432 views
1
"""Project file system commands.
2
3
This modules implements file system operations used by rope. Different
4
version control systems can be supported by implementing the interface
5
provided by `FileSystemCommands` class. See `SubversionCommands` and
6
`MercurialCommands` for example.
7
8
"""
9
import os
10
import shutil
11
import subprocess
12
13
14
def create_fscommands(root):
15
dirlist = os.listdir(root)
16
commands = {'.hg': MercurialCommands,
17
'.svn': SubversionCommands,
18
'.git': GITCommands,
19
'_svn': SubversionCommands,
20
'_darcs': DarcsCommands}
21
for key in commands:
22
if key in dirlist:
23
try:
24
return commands[key](root)
25
except (ImportError, OSError):
26
pass
27
return FileSystemCommands()
28
29
30
class FileSystemCommands(object):
31
32
def create_file(self, path):
33
open(path, 'w').close()
34
35
def create_folder(self, path):
36
os.mkdir(path)
37
38
def move(self, path, new_location):
39
shutil.move(path, new_location)
40
41
def remove(self, path):
42
if os.path.isfile(path):
43
os.remove(path)
44
else:
45
shutil.rmtree(path)
46
47
def write(self, path, data):
48
file_ = open(path, 'wb')
49
try:
50
file_.write(data)
51
finally:
52
file_.close()
53
54
55
class SubversionCommands(object):
56
57
def __init__(self, *args):
58
self.normal_actions = FileSystemCommands()
59
import pysvn
60
self.client = pysvn.Client()
61
62
def create_file(self, path):
63
self.normal_actions.create_file(path)
64
self.client.add(path, force=True)
65
66
def create_folder(self, path):
67
self.normal_actions.create_folder(path)
68
self.client.add(path, force=True)
69
70
def move(self, path, new_location):
71
self.client.move(path, new_location, force=True)
72
73
def remove(self, path):
74
self.client.remove(path, force=True)
75
76
def write(self, path, data):
77
self.normal_actions.write(path, data)
78
79
80
class MercurialCommands(object):
81
82
def __init__(self, root):
83
self.hg = self._import_mercurial()
84
self.normal_actions = FileSystemCommands()
85
try:
86
self.ui = self.hg.ui.ui(
87
verbose=False, debug=False, quiet=True,
88
interactive=False, traceback=False, report_untrusted=False)
89
except:
90
self.ui = self.hg.ui.ui()
91
self.ui.setconfig('ui', 'interactive', 'no')
92
self.ui.setconfig('ui', 'debug', 'no')
93
self.ui.setconfig('ui', 'traceback', 'no')
94
self.ui.setconfig('ui', 'verbose', 'no')
95
self.ui.setconfig('ui', 'report_untrusted', 'no')
96
self.ui.setconfig('ui', 'quiet', 'yes')
97
98
self.repo = self.hg.hg.repository(self.ui, root)
99
100
def _import_mercurial(self):
101
import mercurial.commands
102
import mercurial.hg
103
import mercurial.ui
104
return mercurial
105
106
def create_file(self, path):
107
self.normal_actions.create_file(path)
108
self.hg.commands.add(self.ui, self.repo, path)
109
110
def create_folder(self, path):
111
self.normal_actions.create_folder(path)
112
113
def move(self, path, new_location):
114
self.hg.commands.rename(self.ui, self.repo, path,
115
new_location, after=False)
116
117
def remove(self, path):
118
self.hg.commands.remove(self.ui, self.repo, path)
119
120
def write(self, path, data):
121
self.normal_actions.write(path, data)
122
123
124
class GITCommands(object):
125
126
def __init__(self, root):
127
self.root = root
128
self._do(['version'])
129
self.normal_actions = FileSystemCommands()
130
131
def create_file(self, path):
132
self.normal_actions.create_file(path)
133
self._do(['add', self._in_dir(path)])
134
135
def create_folder(self, path):
136
self.normal_actions.create_folder(path)
137
138
def move(self, path, new_location):
139
self._do(['mv', self._in_dir(path), self._in_dir(new_location)])
140
141
def remove(self, path):
142
self._do(['rm', self._in_dir(path)])
143
144
def write(self, path, data):
145
# XXX: should we use ``git add``?
146
self.normal_actions.write(path, data)
147
148
def _do(self, args):
149
_execute(['git'] + args, cwd=self.root)
150
151
def _in_dir(self, path):
152
if path.startswith(self.root):
153
return path[len(self.root) + 1:]
154
return self.root
155
156
157
class DarcsCommands(object):
158
159
def __init__(self, root):
160
self.root = root
161
self.normal_actions = FileSystemCommands()
162
163
def create_file(self, path):
164
self.normal_actions.create_file(path)
165
self._do(['add', path])
166
167
def create_folder(self, path):
168
self.normal_actions.create_folder(path)
169
self._do(['add', path])
170
171
def move(self, path, new_location):
172
self._do(['mv', path, new_location])
173
174
def remove(self, path):
175
self.normal_actions.remove(path)
176
177
def write(self, path, data):
178
self.normal_actions.write(path, data)
179
180
def _do(self, args):
181
_execute(['darcs'] + args, cwd=self.root)
182
183
184
def _execute(args, cwd=None):
185
process = subprocess.Popen(args, cwd=cwd, stdout=subprocess.PIPE)
186
process.wait()
187
return process.returncode
188
189
190
def unicode_to_file_data(contents, encoding=None):
191
if not isinstance(contents, unicode):
192
return contents
193
if encoding is None:
194
encoding = read_str_coding(contents)
195
if encoding is not None:
196
return contents.encode(encoding)
197
try:
198
return contents.encode()
199
except UnicodeEncodeError:
200
return contents.encode('utf-8')
201
202
def file_data_to_unicode(data, encoding=None):
203
result = _decode_data(data, encoding)
204
if '\r' in result:
205
result = result.replace('\r\n', '\n').replace('\r', '\n')
206
return result
207
208
def _decode_data(data, encoding):
209
if isinstance(data, unicode):
210
return data
211
if encoding is None:
212
encoding = read_str_coding(data)
213
if encoding is None:
214
# there is no encoding tip, we need to guess.
215
# PEP263 says that "encoding not explicitly defined" means it is ascii,
216
# but we will use utf8 instead since utf8 fully covers ascii and btw is
217
# the only non-latin sane encoding.
218
encoding = 'utf-8'
219
try:
220
return data.decode(encoding)
221
except (UnicodeError, LookupError):
222
# fallback to latin1: it should never fail
223
return data.decode('latin1')
224
225
226
def read_file_coding(path):
227
file = open(path, 'b')
228
count = 0
229
result = []
230
buffsize = 10
231
while True:
232
current = file.read(10)
233
if not current:
234
break
235
count += current.count('\n')
236
result.append(current)
237
file.close()
238
return _find_coding(''.join(result))
239
240
241
def read_str_coding(source):
242
try:
243
first = source.index('\n') + 1
244
second = source.index('\n', first) + 1
245
except ValueError:
246
second = len(source)
247
return _find_coding(source[:second])
248
249
250
def _find_coding(text):
251
coding = 'coding'
252
try:
253
start = text.index(coding) + len(coding)
254
if text[start] not in '=:':
255
return
256
start += 1
257
while start < len(text) and text[start].isspace():
258
start += 1
259
end = start
260
while end < len(text):
261
c = text[end]
262
if not c.isalnum() and c not in '-_':
263
break
264
end += 1
265
return text[start:end]
266
except ValueError:
267
pass
268
269