Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
allendowney
GitHub Repository: allendowney/cpython
Path: blob/main/Lib/csv.py
12 views
1
2
"""
3
csv.py - read/write/investigate CSV files
4
"""
5
6
import re
7
import types
8
from _csv import Error, __version__, writer, reader, register_dialect, \
9
unregister_dialect, get_dialect, list_dialects, \
10
field_size_limit, \
11
QUOTE_MINIMAL, QUOTE_ALL, QUOTE_NONNUMERIC, QUOTE_NONE, \
12
QUOTE_STRINGS, QUOTE_NOTNULL, \
13
__doc__
14
from _csv import Dialect as _Dialect
15
16
from io import StringIO
17
18
# Public names exported by ``from csv import *``.
__all__ = ["QUOTE_MINIMAL", "QUOTE_ALL", "QUOTE_NONNUMERIC", "QUOTE_NONE",
           "QUOTE_STRINGS", "QUOTE_NOTNULL",
           "Error", "Dialect", "__doc__", "excel", "excel_tab",
           "field_size_limit", "reader", "writer",
           "register_dialect", "get_dialect", "list_dialects", "Sniffer",
           "unregister_dialect", "__version__", "DictReader", "DictWriter",
           "unix_dialect"]
25
26
class Dialect:
    """Describe a CSV dialect.

    This must be subclassed (see csv.excel). Valid attributes are:
    delimiter, quotechar, escapechar, doublequote, skipinitialspace,
    lineterminator, quoting.

    """
    _name = ""
    _valid = False
    # Placeholders; concrete subclasses are expected to override these.
    delimiter = None
    quotechar = None
    escapechar = None
    doublequote = None
    skipinitialspace = None
    lineterminator = None
    quoting = None

    def __init__(self):
        # Only subclasses count as valid dialects; instantiating the
        # bare base class leaves _valid False and then fails validation
        # below (its placeholder attributes are rejected by _csv).
        if self.__class__ != Dialect:
            self._valid = True
        self._validate()

    def _validate(self):
        """Hand the attributes to the C-level _csv.Dialect constructor
        and convert any TypeError it raises into csv.Error."""
        try:
            _Dialect(self)
        except TypeError as e:
            # Re-raise as the module's Error type, suppressing the
            # chained TypeError context so users see a single, readable
            # validation error instead of a two-part traceback.
            raise Error(str(e)) from None
56
57
class excel(Dialect):
    """Describe the usual properties of Excel-generated CSV files."""
    delimiter = ','
    quotechar = '"'
    doublequote = True
    skipinitialspace = False
    lineterminator = '\r\n'
    quoting = QUOTE_MINIMAL
# "excel" is also the default dialect name used by reader()/writer().
register_dialect("excel", excel)
66
67
class excel_tab(excel):
    """Describe the usual properties of Excel-generated TAB-delimited files."""
    # Identical to ``excel`` except for the delimiter.
    delimiter = '\t'
register_dialect("excel-tab", excel_tab)
71
72
class unix_dialect(Dialect):
    """Describe the usual properties of Unix-generated CSV files."""
    delimiter = ','
    quotechar = '"'
    doublequote = True
    skipinitialspace = False
    # Unlike ``excel``: LF line endings and every field quoted.
    lineterminator = '\n'
    quoting = QUOTE_ALL
register_dialect("unix", unix_dialect)
81
82
83
class DictReader:
    """Iterate over a CSV file, yielding each row as a dict keyed by
    field names.

    Field names come from the *fieldnames* argument or, when omitted,
    from the first row of the file (read lazily on first access).
    Values beyond the known fields are collected in a list under
    *restkey*; fields missing from a short row are filled with
    *restval*.
    """

    def __init__(self, f, fieldnames=None, restkey=None, restval=None,
                 dialect="excel", *args, **kwds):
        if fieldnames is not None and iter(fieldnames) is fieldnames:
            # Materialize one-shot iterators so the names can be reused
            # on every row.
            fieldnames = list(fieldnames)
        self._fieldnames = fieldnames   # list of keys for the dict
        self.restkey = restkey          # key under which extra values go
        self.restval = restval          # filler value for short rows
        self.reader = reader(f, dialect, *args, **kwds)
        self.dialect = dialect
        self.line_num = 0

    def __iter__(self):
        return self

    @property
    def fieldnames(self):
        """The field names, read from the first row on first access
        when none were supplied to the constructor."""
        if self._fieldnames is None:
            try:
                self._fieldnames = next(self.reader)
            except StopIteration:
                # Empty input: leave fieldnames as None.
                pass
        self.line_num = self.reader.line_num
        return self._fieldnames

    @fieldnames.setter
    def fieldnames(self, value):
        self._fieldnames = value

    def __next__(self):
        if self.line_num == 0:
            # Accessed purely for its side effect of consuming the
            # header row when fieldnames were not supplied.
            _ = self.fieldnames
        row = next(self.reader)
        self.line_num = self.reader.line_num

        # Unlike the basic reader, skip blank rows entirely rather than
        # yielding a dict full of None values.
        while not row:
            row = next(self.reader)

        names = self.fieldnames
        result = dict(zip(names, row))
        n_names = len(names)
        n_values = len(row)
        if n_names < n_values:
            # Long row: stash the surplus values under restkey.
            result[self.restkey] = row[n_names:]
        elif n_names > n_values:
            # Short row: pad the remaining fields with restval.
            for name in names[n_values:]:
                result[name] = self.restval
        return result

    __class_getitem__ = classmethod(types.GenericAlias)
135
136
137
class DictWriter:
    """Write dicts to a CSV file, ordering the values by *fieldnames*.

    Keys missing from a row dict are written as *restval*.  Keys not in
    *fieldnames* either raise ValueError or are silently dropped,
    according to *extrasaction* ("raise" or "ignore").
    """

    def __init__(self, f, fieldnames, restval="", extrasaction="raise",
                 dialect="excel", *args, **kwds):
        if fieldnames is not None and iter(fieldnames) is fieldnames:
            # Materialize one-shot iterators so the names can be reused
            # on every row.
            fieldnames = list(fieldnames)
        self.fieldnames = fieldnames    # list of keys for the dict
        self.restval = restval          # filler for missing keys
        extrasaction = extrasaction.lower()
        if extrasaction not in ("raise", "ignore"):
            raise ValueError("extrasaction (%s) must be 'raise' or 'ignore'"
                             % extrasaction)
        self.extrasaction = extrasaction
        self.writer = writer(f, dialect, *args, **kwds)

    def writeheader(self):
        """Write a header row built from the field names themselves."""
        return self.writerow(dict(zip(self.fieldnames, self.fieldnames)))

    def _dict_to_list(self, rowdict):
        # Validate eagerly so the error surfaces before anything is
        # written; the values themselves are produced lazily.
        if self.extrasaction == "raise":
            wrong_fields = rowdict.keys() - self.fieldnames
            if wrong_fields:
                raise ValueError("dict contains fields not in fieldnames: "
                                 + ", ".join(map(repr, wrong_fields)))
        return (rowdict.get(name, self.restval) for name in self.fieldnames)

    def writerow(self, rowdict):
        return self.writer.writerow(self._dict_to_list(rowdict))

    def writerows(self, rowdicts):
        return self.writer.writerows(map(self._dict_to_list, rowdicts))

    __class_getitem__ = classmethod(types.GenericAlias)
170
171
172
class Sniffer:
    '''
    "Sniffs" the format of a CSV file (i.e. delimiter, quotechar)
    Returns a Dialect object.
    '''
    def __init__(self):
        # in case there is more than one possible delimiter
        self.preferred = [',', '\t', ';', ' ', ':']


    def sniff(self, sample, delimiters=None):
        """
        Returns a dialect (or None) corresponding to the sample

        Tries quote-based detection first, then falls back to
        character-frequency analysis; raises Error if neither finds a
        delimiter.
        """

        quotechar, doublequote, delimiter, skipinitialspace = \
                   self._guess_quote_and_delimiter(sample, delimiters)
        if not delimiter:
            # No quoted fields were found in the sample; fall back to
            # statistical per-character frequency analysis.
            delimiter, skipinitialspace = self._guess_delimiter(sample,
                                                                delimiters)

        if not delimiter:
            raise Error("Could not determine delimiter")

        class dialect(Dialect):
            _name = "sniffed"
            lineterminator = '\r\n'
            quoting = QUOTE_MINIMAL
            # escapechar = ''

        # NOTE: the guessed attributes are set on the class, not an
        # instance; sniff() returns the dialect *class* itself.
        dialect.doublequote = doublequote
        dialect.delimiter = delimiter
        # _csv.reader won't accept a quotechar of ''
        dialect.quotechar = quotechar or '"'
        dialect.skipinitialspace = skipinitialspace

        return dialect


    def _guess_quote_and_delimiter(self, data, delimiters):
        """
        Looks for text enclosed between two identical quotes
        (the probable quotechar) which are preceded and followed
        by the same character (the probable delimiter).
        For example:
                         ,'some text',
        The quote with the most wins, same with the delimiter.
        If there is no quotechar the delimiter can't be determined
        this way.

        Returns a (quotechar, doublequote, delimiter, skipinitialspace)
        tuple.
        """

        matches = []
        # Try progressively weaker patterns: quoted field with delimiter
        # on both sides, then at start/end of line, then a bare quoted
        # line.  The first pattern that matches anything wins.
        for restr in (r'(?P<delim>[^\w\n"\'])(?P<space> ?)(?P<quote>["\']).*?(?P=quote)(?P=delim)', # ,".*?",
                      r'(?:^|\n)(?P<quote>["\']).*?(?P=quote)(?P<delim>[^\w\n"\'])(?P<space> ?)',   #  ".*?",
                      r'(?P<delim>[^\w\n"\'])(?P<space> ?)(?P<quote>["\']).*?(?P=quote)(?:$|\n)',   # ,".*?"
                      r'(?:^|\n)(?P<quote>["\']).*?(?P=quote)(?:$|\n)'):                            #  ".*?" (no delim, no space)
            regexp = re.compile(restr, re.DOTALL | re.MULTILINE)
            matches = regexp.findall(data)
            if matches:
                break

        if not matches:
            # (quotechar, doublequote, delimiter, skipinitialspace)
            return ('', False, None, 0)
        quotes = {}
        delims = {}
        spaces = 0
        # Map named groups back to tuple positions in findall() results;
        # the later patterns lack 'delim'/'space' groups, hence the
        # KeyError handling below.
        groupindex = regexp.groupindex
        for m in matches:
            n = groupindex['quote'] - 1
            key = m[n]
            if key:
                quotes[key] = quotes.get(key, 0) + 1
            try:
                n = groupindex['delim'] - 1
                key = m[n]
            except KeyError:
                continue
            if key and (delimiters is None or key in delimiters):
                delims[key] = delims.get(key, 0) + 1
            try:
                n = groupindex['space'] - 1
            except KeyError:
                continue
            if m[n]:
                spaces += 1

        # Most frequently seen quote character wins.
        quotechar = max(quotes, key=quotes.get)

        if delims:
            delim = max(delims, key=delims.get)
            # If every delimiter was followed by a space, assume
            # skipinitialspace.
            skipinitialspace = delims[delim] == spaces
            if delim == '\n': # most likely a file with a single column
                delim = ''
        else:
            # there is *no* delimiter, it's a single column of quoted data
            delim = ''
            skipinitialspace = 0

        # if we see an extra quote between delimiters, we've got a
        # double quoted format
        dq_regexp = re.compile(
                               r"((%(delim)s)|^)\W*%(quote)s[^%(delim)s\n]*%(quote)s[^%(delim)s\n]*%(quote)s\W*((%(delim)s)|$)" % \
                               {'delim':re.escape(delim), 'quote':quotechar}, re.MULTILINE)



        if dq_regexp.search(data):
            doublequote = True
        else:
            doublequote = False

        return (quotechar, doublequote, delim, skipinitialspace)


    def _guess_delimiter(self, data, delimiters):
        """
        The delimiter /should/ occur the same number of times on
        each row. However, due to malformed data, it may not. We don't want
        an all or nothing approach, so we allow for small variations in this
        number.
          1) build a table of the frequency of each character on every line.
          2) build a table of frequencies of this frequency (meta-frequency?),
             e.g.  'x occurred 5 times in 10 rows, 6 times in 1000 rows,
             7 times in 2 rows'
          3) use the mode of the meta-frequency to determine the /expected/
             frequency for that character
          4) find out how often the character actually meets that goal
          5) the character that best meets its goal is the delimiter
        For performance reasons, the data is evaluated in chunks, so it can
        try and evaluate the smallest portion of the data possible, evaluating
        additional chunks as necessary.

        Returns a (delimiter, skipinitialspace) tuple; ('', 0) when no
        candidate is found.
        """

        data = list(filter(None, data.split('\n')))

        ascii = [chr(c) for c in range(127)] # 7-bit ASCII

        # build frequency tables
        chunkLength = min(10, len(data))
        iteration = 0
        charFrequency = {}
        modes = {}
        delims = {}
        start, end = 0, chunkLength
        while start < len(data):
            iteration += 1
            for line in data[start:end]:
                for char in ascii:
                    metaFrequency = charFrequency.get(char, {})
                    # must count even if frequency is 0
                    freq = line.count(char)
                    # value is the mode
                    metaFrequency[freq] = metaFrequency.get(freq, 0) + 1
                    charFrequency[char] = metaFrequency

            for char in charFrequency.keys():
                items = list(charFrequency[char].items())
                # Skip characters that never appear on any line.
                if len(items) == 1 and items[0][0] == 0:
                    continue
                # get the mode of the frequencies
                if len(items) > 1:
                    modes[char] = max(items, key=lambda x: x[1])
                    # adjust the mode - subtract the sum of all
                    # other frequencies
                    items.remove(modes[char])
                    modes[char] = (modes[char][0], modes[char][1]
                                   - sum(item[1] for item in items))
                else:
                    modes[char] = items[0]

            # build a list of possible delimiters
            modeList = modes.items()
            total = float(min(chunkLength * iteration, len(data)))
            # (rows of consistent data) / (number of rows) = 100%
            consistency = 1.0
            # minimum consistency threshold
            threshold = 0.9
            # Relax the required consistency in 1% steps until at least
            # one candidate delimiter qualifies or the threshold is hit.
            while len(delims) == 0 and consistency >= threshold:
                for k, v in modeList:
                    if v[0] > 0 and v[1] > 0:
                        if ((v[1]/total) >= consistency and
                            (delimiters is None or k in delimiters)):
                            delims[k] = v
                consistency -= 0.01

            if len(delims) == 1:
                delim = list(delims.keys())[0]
                skipinitialspace = (data[0].count(delim) ==
                                    data[0].count("%c " % delim))
                return (delim, skipinitialspace)

            # analyze another chunkLength lines
            start = end
            end += chunkLength

        if not delims:
            return ('', 0)

        # if there's more than one, fall back to a 'preferred' list
        if len(delims) > 1:
            for d in self.preferred:
                if d in delims.keys():
                    skipinitialspace = (data[0].count(d) ==
                                        data[0].count("%c " % d))
                    return (d, skipinitialspace)

        # nothing else indicates a preference, pick the character that
        # dominates(?)
        items = [(v,k) for (k,v) in delims.items()]
        items.sort()
        delim = items[-1][1]

        skipinitialspace = (data[0].count(delim) ==
                            data[0].count("%c " % delim))
        return (delim, skipinitialspace)


    def has_header(self, sample):
        # Creates a dictionary of types of data in each column. If any
        # column is of a single type (say, integers), *except* for the first
        # row, then the first row is presumed to be labels. If the type
        # can't be determined, it is assumed to be a string in which case
        # the length of the string is the determining factor: if all of the
        # rows except for the first are the same length, it's a header.
        # Finally, a 'vote' is taken at the end for each column, adding or
        # subtracting from the likelihood of the first row being a header.

        # NOTE: sniff() may raise Error here if no delimiter can be found.
        rdr = reader(StringIO(sample), self.sniff(sample))

        header = next(rdr) # assume first row is header

        columns = len(header)
        columnTypes = {}
        for i in range(columns): columnTypes[i] = None

        checked = 0
        for row in rdr:
            # arbitrary number of rows to check, to keep it sane
            if checked > 20:
                break
            checked += 1

            if len(row) != columns:
                continue # skip rows that have irregular number of columns

            for col in list(columnTypes.keys()):
                # complex() accepts ints and floats too, so it serves as
                # a catch-all "is this numeric?" probe.
                thisType = complex
                try:
                    thisType(row[col])
                except (ValueError, OverflowError):
                    # fallback to length of string
                    thisType = len(row[col])

                if thisType != columnTypes[col]:
                    if columnTypes[col] is None: # add new column type
                        columnTypes[col] = thisType
                    else:
                        # type is inconsistent, remove column from
                        # consideration
                        del columnTypes[col]

        # finally, compare results against first row and "vote"
        # on whether it's a header
        hasHeader = 0
        for col, colType in columnTypes.items():
            if isinstance(colType, int): # it's a length
                if len(header[col]) != colType:
                    hasHeader += 1
                else:
                    hasHeader -= 1
            else: # attempt typecast
                try:
                    colType(header[col])
                except (ValueError, TypeError):
                    hasHeader += 1
                else:
                    hasHeader -= 1

        return hasHeader > 0
452
453