GitHub Repository: AndrewVSutherland/lmfdb
Path: blob/main/scripts/abvar/fq_isog.py
#!/projects/sage/sage-7.3/local/bin/python
# -*- coding: utf-8 -*-
r""" Import abelian variety isogeny class data.

The main import function is do_import_yaml. It requires creating
a yaml file like the following:

B_ready_to_upload:
- 2:
  - 1
  - 2
  - 3
  - 4
  - 5
  - 6
- 3:
  - 1
  - 2
  - 3
  - 4
- 5:
  - 1
  - 2
  - 3
  - 4

This file will coordinate uploading among multiple processes.
As processes begin uploading, they will claim g and q, changing
the yaml file, and eventually marking them as completed.
"""

from __future__ import print_function  # needed for print(..., end=" ") under the Python 2 shebang

import os
import sys, time, datetime
import shutil
import re
import json
from subprocess import Popen
from itertools import izip_longest  # Python 2; itertools.zip_longest on Python 3
from math import sqrt

#mypath = os.path.realpath(__file__)
#while os.path.basename(mypath) != 'lmfdb':
#    mypath = os.path.dirname(mypath)
## now move up one more time...
#mypath = os.path.dirname(mypath)
#sys.path.append(mypath)

from pymongo.mongo_client import MongoClient
import yaml

## Main importing function

def do_import(ll, db, saving):
    """
    INPUT:

    - ``ll`` -- a list of 19 entries, consisting of the data to be uploaded to the database.
    - ``db`` -- an authenticated connection to the abvar.fq collection.
    - ``saving`` -- a boolean: whether to actually perform the upsert.
    """
    # The unpacking documents the expected order and raises if ll does not have 19 entries.
    label, g, q, polynomial, angle_numbers, angle_ranks, p_rank, slopes, A_counts, C_counts, known_jacobian, principally_polarizable, decomposition, brauer_invariants, places, primitive_models, number_field, galois_n, galois_t = ll
    mykeys = ['label', 'g', 'q', 'polynomial', 'angle_numbers', 'angle_ranks', 'p_rank', 'slopes', 'A_counts', 'C_counts', 'known_jacobian', 'principally_polarizable', 'decomposition', 'brauer_invariants', 'places', 'primitive_models', 'number_field', 'galois_n', 'galois_t']
    data = dict(zip(mykeys, ll))
    if saving:
        db.update({'label': label}, {"$set": data}, upsert=True)
    else:
        print(data)
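
# Example call (a sketch; the data values are hypothetical and ``db`` is assumed
# to come from authenticated_db(...).fq_isog):
#
#   ll = ['1.2.b', 1, 2, [1, 1, 2], ...]  # 19 entries, in the order of mykeys
#   do_import(ll, db, saving=False)       # dry run: prints the dict instead of upserting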


class lock_yaml():
    """
    An object preventing simultaneous access to the yaml file
    using a coarse grained lock file.
    """
    def __init__(self, rootdir=None):
        # Default to the home directory, matching the default location of
        # status.yaml; the class is used below as ``with lock_yaml():``.
        rootdir = rootdir or os.path.expanduser('~')
        self.lock_file = os.path.join(rootdir, 'yaml_lock')

    def __enter__(self):
        # Spin until the lock file is absent, then create it to claim the lock.
        while True:
            if os.path.exists(self.lock_file):
                time.sleep(0.1)
            else:
                with open(self.lock_file, 'w') as F:
                    F.write('\n')
                break

    def __exit__(self, typ, value, tb):
        os.unlink(self.lock_file)
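
# Usage (a sketch): serialize all access to the shared yaml file.  Note that the
# exists()/open() pair is not atomic; os.open with O_CREAT|O_EXCL would be
# race-free, but the coarse lock suffices for a handful of cooperating processes.
#
#   with lock_yaml():
#       with open(status_file) as F:
#           status = yaml.load(F)
#       # ... mutate status, write to a temp file, then shutil.move into place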


def do_import_one(g, q, db, status_file, datadir):
    """
    Imports all of the data from a single file.

    Intermediate progress is reported and stored in a file,
    so that uploading can be halted and resumed as desired.

    The file ``weil-all-g6-q2.txt`` (for example) must
    exist and contain json encoded lines with the relevant
    data (see the lmfdb inventory for more details on
    the data types).

    INPUT:

    - ``g`` -- the dimension of the isogeny class.
    - ``q`` -- the cardinality of the base field.
    - ``db`` -- an authenticated connection to the abvar.fq collection.
    - ``status_file`` -- the path to the yaml file coordinating the uploading.
    - ``datadir`` -- the folder where data is contained.
    """
    progress_file = os.path.join(datadir, 'weil-ultmp-g%s-q%s.txt'%(g, q))
    all_file = os.path.join(datadir, 'weil-all-g%s-q%s.txt'%(g, q))
    saving = True
    with open(all_file) as Fall:
        for num_lines, line in enumerate(Fall, 1):
            pass
    # Now num_lines is the number of lines in Fall
    with open(all_file) as Fall:
        with open(progress_file, 'a+') as Fwrite:
            with open(progress_file) as Fprog:
                print_next_time = time.time()
                sum_of_times = float(0)
                sum_of_squares = float(0)
                start_line = None
                malformed = False
                for cur_line, (line_all, line_prog) in enumerate(izip_longest(Fall, Fprog, fillvalue=None), 1):
                    if line_prog is not None:
                        # This line was uploaded in a previous run; the label recorded
                        # in the progress file should match the label in the data line.
                        if line_all[2:line_all.find(',')-1] != line_prog.strip():
                            if malformed:
                                raise RuntimeError("Multiple malformed lines")
                            malformed = True
                            # Tolerate a single truncated label (an interrupted write).
                            if not line_all[2:line_all.find(',')-1].startswith(line_prog.strip()):
                                raise RuntimeError("Label mismatch")
                        if cur_line % 1000 == 0:
                            print("Skipping previously uploaded (%s/%s)"%(cur_line, num_lines))
                        continue
                    if start_line is None:
                        start_line = cur_line
                    data = json.loads(line_all.strip())
                    t = time.time()
                    do_import(data, db, saving)
                    t = time.time() - t
                    sum_of_times += t
                    sum_of_squares += t**2
                    if time.time() > print_next_time:
                        print_next_time = time.time() + 15
                        to_print = "Uploaded (g=%s, q=%s) %s/%s."%(g, q, cur_line, num_lines)
                        if cur_line - start_line > 10:
                            # Estimate the remaining time: scale the total upload time so far
                            # by (lines remaining)/(lines done), with a two-sigma window
                            # treating the per-line times as independent.
                            elapsed_lines = cur_line - start_line + 1
                            scaling = float(num_lines - elapsed_lines - start_line + 1) / elapsed_lines
                            sigma = sqrt(sum_of_squares - sum_of_times**2 / elapsed_lines)
                            lower_bound = max(0, sum_of_times*scaling - 2*sigma*sqrt(scaling))
                            upper_bound = sum_of_times*scaling + 2*sigma*sqrt(scaling)
                            lower_bound = datetime.timedelta(seconds=int(lower_bound))
                            upper_bound = datetime.timedelta(seconds=int(upper_bound))
                            to_print += " %s to %s remaining."%(lower_bound, upper_bound)
                        print(to_print)
                    Fwrite.write(data[0] + '\n')
    os.unlink(progress_file)
    print("Upload (g=%s, q=%s) finished."%(g, q))
    sys.stdout.flush()
    # Record completion in the status file: remove g from this process's
    # in-progress entry and append it to A_uploaded.
    with lock_yaml():
        with open(status_file) as F:
            status = yaml.load(F)
        pid = os.getpid()
        in_progress = status.get('D_uploading_%s'%(pid), {})
        for qq, L in in_progress.items():
            if q != qq:
                continue
            try:
                L.remove(g)
            except ValueError:
                raise RuntimeError("g not found")
            if not L:
                del in_progress[qq]
            break
        else:
            raise RuntimeError("q not found")
        done = status['A_uploaded']
        qfound = False
        for D in done:
            for qq, L in D.items():
                if q != qq:
                    continue
                L.append(g)
                qfound = True
                break
            if qfound:
                break
        else:
            done.append({q: [g]})
        # Write to a temp file and move into place so the status file is never
        # left half-written.
        with open(status_file + '.tmp%s'%pid, 'w') as F:
            yaml.dump(status, F)
        shutil.move(status_file + '.tmp%s'%pid, status_file)
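
# Example (a sketch; the paths are the defaults assumed elsewhere in this script,
# and an ssh tunnel on port 37010 is assumed to be up already):
#
#   db = authenticated_db().fq_isog
#   do_import_one(2, 3, db, os.path.expanduser('~/status.yaml'),
#                 os.path.expanduser('~/root-unitary/data'))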


def authenticated_db(port=37010, rootdir=None):
    """
    Create a database connection to the abvar database,
    authenticated by the passwords yaml file in rootdir.
    """
    rootdir = rootdir or os.path.expanduser('~')
    C = MongoClient(port=port)
    with open(os.path.join(rootdir, "passwords.yaml")) as pw_file:
        pw_dict = yaml.load(pw_file)
    username = pw_dict['data']['username']
    password = pw_dict['data']['password']
    C['abvar'].authenticate(username, password)
    return C.abvar
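
# Expected passwords.yaml layout (a sketch inferred from the lookups above;
# the values shown are placeholders):
#
#   data:
#     username: uploader
#     password: not-a-real-password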


def do_import_yaml(port=None, status_file=None, rootdir=None, datadir=None, reset=False):
    """
    This function is designed to allow multiple processes to upload data, controlled
    by a single yaml file.

    INPUT:

    - ``port`` -- an int, the port to connect to the database.
    - ``status_file`` -- the path to the yaml file controlling the uploading.
    - ``rootdir`` -- folder in which to create various temporary files, and
                     which is assumed to contain status.yaml if status_file is
                     not specified.  Defaults to the user's home directory.
    - ``datadir`` -- folder containing the data to be uploaded.  Defaults to
                     rootdir/root-unitary/data.
    - ``reset`` -- boolean.  If True, resets status.yaml so that everything
                   not yet finished is marked as ready-to-begin (rather than
                   in-progress), then returns immediately.
    """
    rootdir = rootdir or os.path.expanduser('~')
    status_file = status_file or os.path.join(rootdir, 'status.yaml')
    if reset:
        with lock_yaml():
            with open(status_file) as F:
                status = yaml.load(F)
            ready = status.get('B_ready_to_upload', [])
            for label, val in status.items():
                if label.startswith('D_uploading_'):
                    ready.append(val)
                    del status[label]
            ready.sort(key=lambda D: D.keys()[0])
            status['B_ready_to_upload'] = ready
            with open(status_file + '.tmpreset', 'w') as F:
                yaml.dump(status, F)
            shutil.move(status_file + '.tmpreset', status_file)
        print("Status file reset")
        return
    port_file = os.path.join(rootdir, 'curport')
    if port is None:
        # Claim the next unused local port so that several uploading processes
        # can each run their own ssh tunnel.
        with lock_yaml():
            if not os.path.exists(port_file):
                port = 37010
            else:
                with open(port_file) as F:
                    for line in F:
                        port = int(line.strip()) + 1
                        break
            with open(port_file, 'w') as F:
                F.write(str(port))
    datadir = datadir or os.path.join(rootdir, 'root-unitary', 'data')
    pid = os.getpid()
    port_forwarder = Popen(["ssh", "-o", "TCPKeepAlive=yes", "-o", "ServerAliveInterval=50", "-C", "-N", "-L", "%s:localhost:37010"%port, "[email protected]"])
    try:
        db = authenticated_db(port, rootdir).fq_isog
        db.create_index('g')
        db.create_index('q')
        db.create_index('label')
        db.create_index('polynomial')
        db.create_index('p_rank')
        db.create_index('slopes')
        db.create_index('A_counts')
        db.create_index('C_counts')
        db.create_index('known_jacobian')
        db.create_index('principally_polarizable')
        db.create_index('decomposition')
        print("finished indices")

        while True:
            # Claim the next batch of (q, [g, ...]) under the lock, then upload
            # it outside the lock.
            with lock_yaml():
                with open(status_file) as F:
                    status = yaml.load(F)
                in_progress = status.get('D_uploading_%s'%(pid), {})
                if not in_progress:
                    ready = status.get('B_ready_to_upload', [])
                    if not ready:
                        print("No more data to upload")
                        break
                    in_progress = status['D_uploading_%s'%(pid)] = ready.pop(0)
                    with open(status_file + '.tmp%s'%pid, 'w') as F:
                        yaml.dump(status, F)
                    shutil.move(status_file + '.tmp%s'%pid, status_file)
            for q, L in in_progress.items():
                for g in L:
                    do_import_one(g, q, db, status_file, datadir)
    finally:
        port_forwarder.kill()
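
# Shape of status.yaml mid-run (a sketch; the pid is hypothetical).  Entries move
# from B_ready_to_upload to D_uploading_<pid> when claimed, and into A_uploaded
# as do_import_one finishes each (g, q):
#
#   A_uploaded:
#   - 2:
#     - 1
#     - 2
#   B_ready_to_upload:
#   - 5:
#     - 1
#   D_uploading_12345:
#     3:
#     - 1
#     - 2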


def update_stats(port=37010, datadir=None):
    """
    Updates the fq_isog.stats collection.

    INPUT:

    - ``port`` -- the port on which to open a connection to the database. Defaults to 37010.
    - ``datadir`` -- the directory containing the data. Defaults to $HOME/root-unitary/data.
    """
    datadir = datadir or os.path.join(os.path.expanduser('~'), 'root-unitary', 'data')
    port_forwarder = Popen(["ssh", "-o", "TCPKeepAlive=yes", "-o", "ServerAliveInterval=50", "-C", "-N", "-L", "%s:localhost:37010"%port, "[email protected]"])
    allmatcher = re.compile(r"weil-all-g(\d+)-q(\d+).txt")
    try:
        C = authenticated_db(port)
        db = C.fq_isog
        stats_collection = C.fq_isog.stats
        qs = db.distinct('q')
        gs = db.distinct('g')
        maxg = max(gs)
        counts = {}
        mismatches = []
        for q in qs:
            counts[str(q)] = [0]
            found_zero = False
            for g in range(1, maxg+1):
                print("Counting g=%s, q=%s..."%(g, q), end=" ")
                try:
                    n = db.find({'g': g, 'q': q}).count()
                except KeyError:
                    n = 0
                filename = os.path.join(datadir, 'weil-all-g%s-q%s.txt'%(g, q))
                num_lines = None
                if os.path.exists(filename):
                    with open(filename) as F:
                        for num_lines, line in enumerate(F, 1):
                            pass
                if n == 0:
                    found_zero = True
                    if num_lines:
                        print("File not uploaded!")
                        mismatches.append((g, q, 0, num_lines))
                    else:
                        print("OK.")
                else:
                    if found_zero:
                        print("Nonzero count after zero count!", end=" ")
                        mismatches.append((g, q, None, None))
                    if num_lines:
                        if num_lines == n:
                            print("OK.")
                            counts[str(q)].append(n)
                        else:
                            print("Count mismatch!")
                            mismatches.append((g, q, n, num_lines))
                    else:
                        print("Extra data in database!")
                        mismatches.append((g, q, n, None))
        print("Checking for missing files....")
        missing = False
        for filename in os.listdir(datadir):
            match = allmatcher.match(filename)
            if match:
                g, q = map(int, match.groups())
                if g not in gs or q not in qs:
                    print(filename, "not uploaded!")
                    missing = True
                    mismatches.append((g, q, None, -1))
        if not missing:
            print("No files missing.")
        if mismatches:
            print("There were errors:")
            for g, q, n, num_lines in mismatches:
                print("g=%s, q=%s,"%(g, q), end=" ")
                if n is None and num_lines is None:
                    print("nonzero count after zero count.")
                elif n is None:
                    print("file on disk but missing from the database.")
                elif n == 0:
                    print("file not uploaded.")
                elif num_lines is None:
                    print("extra data in database.")
                else:
                    print("mismatched count (%s in database, %s in file)"%(n, num_lines))
        else:
            stats_collection.update({'label': 'counts'}, {"$set": {'label': 'counts', 'counts': counts}}, upsert=True)
            print("Counts updated!")
    finally:
        port_forwarder.kill()
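
# Example (a sketch): recount every (g, q) in the database against the files in
# ~/root-unitary/data and refresh fq_isog.stats if everything matches.
#
#   update_stats()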


def label_progress(filename, label):
    """
    Utility function to look for a label within a file.
    """
    found = None
    counter = 0
    startcheck = '["%s"'%label
    with open(filename) as F:
        for counter, line in enumerate(F, 1):
            if line.startswith(startcheck):
                found = counter
    if found is None:
        print("Label %s not found" % label)
    else:
        print("Label %s is at %s/%s"%(label, found, counter))
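
# Example (a sketch; the label and counts are hypothetical):
#
#   label_progress('weil-all-g2-q3.txt', '2.3.ab_c')
#   # -> Label 2.3.ab_c is at 17/642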


#if __name__ == '__main__':
#    do_import_yaml()