Book a Demo!
CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Policies · Sign Up · Sign In
mikf
GitHub Repository: mikf/gallery-dl
Path: blob/master/gallery_dl/archive.py
5457 views
1
# -*- coding: utf-8 -*-
2
3
# Copyright 2024-2025 Mike Fährmann
4
#
5
# This program is free software; you can redistribute it and/or modify
6
# it under the terms of the GNU General Public License version 2 as
7
# published by the Free Software Foundation.
8
9
"""Download Archives"""
10
11
import os
12
import logging
13
from . import util, formatter
14
15
# Module-level logger; the PostgreSQL archive classes report database
# errors through it.
log = logging.getLogger("archive")
16
17
18
def connect(path, prefix, format,
            table=None, mode=None, pragma=None, kwdict=None, cache_key=None):
    """Create and return a download archive object.

    The backend is chosen from 'path':
      - a str starting with 'postgres://' or 'postgresql://' is used as a
        PostgreSQL connection URI;
      - anything else is passed through util.expand_path() and used as the
        location of an SQLite database. If 'kwdict' is given and the path
        contains '{', it is additionally formatted with 'kwdict'.

    mode == "memory" selects the variant that collects new entries in an
    in-memory set and only writes them out in finalize().

    Archive keys are produced by formatting 'prefix + format' with an
    item's kwdict. A non-empty 'table' is formatted with 'kwdict' as well
    when 'kwdict' is given.
    """
    keygen = formatter.parse(prefix + format).format_map
    buffered = (mode == "memory")

    is_postgres = (isinstance(path, str) and
                   path.startswith(("postgres://", "postgresql://")))
    if is_postgres:
        cls = (DownloadArchivePostgresqlMemory if buffered
               else DownloadArchivePostgresql)
    else:
        path = util.expand_path(path)
        if kwdict is not None and "{" in path:
            path = formatter.parse(path).format_map(kwdict)
        cls = DownloadArchiveMemory if buffered else DownloadArchive

    # table names may contain replacement fields, too
    if table and kwdict is not None:
        table = formatter.parse(table).format_map(kwdict)

    return cls(path, keygen, table, pragma, cache_key)
41
42
43
def sanitize(name):
    """Return 'name' wrapped in double quotes for use as an SQL identifier.

    Every double quote inside 'name' is replaced by an underscore, so the
    value cannot terminate the quoted identifier early.
    """
    cleaned = name.replace('"', "_")
    return '"' + cleaned + '"'
45
46
47
class DownloadArchive():
    """Download archive backed by an SQLite database.

    Every archived item is one row whose 'entry' column holds the key that
    'keygen' produces for the item's kwdict.
    """
    _sqlite3 = None  # 'sqlite3' module, imported on first instantiation

    def __init__(self, path, keygen, table=None, pragma=None, cache_key=None):
        """Open (or create) the archive database at 'path'.

        Args:
            path: location of the SQLite database file; a missing parent
                directory is created if the first connection attempt fails.
            keygen: callable mapping a kwdict to its archive key.
            table: table name, quoted via sanitize(); when None, the
                unquoted name 'archive' is used.
            pragma: iterable of strings, each executed as 'PRAGMA <str>'
                right after connecting.
            cache_key: kwdict field in which check() stores the computed
                key so add() can reuse it; defaults to "_archive_key".

        Raises:
            sqlite3.OperationalError: if the database cannot be opened
                even after trying to create its parent directory.
        """
        if self._sqlite3 is None:
            DownloadArchive._sqlite3 = __import__("sqlite3")

        try:
            con = self._sqlite3.connect(
                path, timeout=60, check_same_thread=False)
        except self._sqlite3.OperationalError:
            # Most likely a missing parent directory: create it and retry.
            # 'exist_ok=True' and the "." fallback (for bare file names
            # without a directory part) keep os.makedirs() from raising
            # FileExistsError / FileNotFoundError when the directory is not
            # the problem, so the retry below re-raises the real error.
            os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
            con = self._sqlite3.connect(
                path, timeout=60, check_same_thread=False)
        # autocommit mode: no implicit transactions are opened
        con.isolation_level = None

        self.keygen = keygen
        self.connection = con
        self.close = con.close
        self.cursor = cursor = con.cursor()
        self._cache_key = cache_key or "_archive_key"

        table = "archive" if table is None else sanitize(table)
        self._stmt_select = (
            "SELECT 1 "
            f"FROM {table} "
            "WHERE entry=? "
            "LIMIT 1")
        self._stmt_insert = (
            f"INSERT OR IGNORE INTO {table} "
            "(entry) VALUES (?)")

        if pragma:
            for stmt in pragma:
                cursor.execute(f"PRAGMA {stmt}")

        try:
            cursor.execute(f"CREATE TABLE IF NOT EXISTS {table} "
                           "(entry TEXT PRIMARY KEY) WITHOUT ROWID")
        except self._sqlite3.OperationalError:
            # fallback for missing WITHOUT ROWID support (#553)
            cursor.execute(f"CREATE TABLE IF NOT EXISTS {table} "
                           "(entry TEXT PRIMARY KEY)")

    def add(self, kwdict):
        """Add item described by 'kwdict' to archive.

        Reuses the key cached in 'kwdict' by check() when present;
        duplicates are ignored ('INSERT OR IGNORE').
        """
        key = kwdict.get(self._cache_key) or self.keygen(kwdict)
        self.cursor.execute(self._stmt_insert, (key,))

    def check(self, kwdict):
        """Return a truthy row tuple if 'kwdict' exists in archive, else None.

        The computed key is stored in 'kwdict' under the cache key.
        """
        key = kwdict[self._cache_key] = self.keygen(kwdict)
        self.cursor.execute(self._stmt_select, (key,))
        return self.cursor.fetchone()

    def finalize(self):
        """Do nothing; add() writes entries immediately."""
        pass
104
105
106
class DownloadArchiveMemory(DownloadArchive):
    """SQLite download archive that buffers new entries in memory.

    add() only records keys in the set 'self.keys'; finalize() writes them
    to the database inside one transaction.
    """

    def __init__(self, path, keygen, table=None, pragma=None, cache_key=None):
        DownloadArchive.__init__(
            self, path, keygen, table, pragma, cache_key)
        self.keys = set()  # keys added so far, not yet written

    def add(self, kwdict):
        """Remember the key of 'kwdict' for writing in finalize()"""
        key = kwdict.get(self._cache_key) or self.keygen(kwdict)
        self.keys.add(key)

    def check(self, kwdict):
        """Return True if the key is buffered, else the database row/None.

        The computed key is stored in 'kwdict' under the cache key.
        """
        key = self.keygen(kwdict)
        kwdict[self._cache_key] = key
        if key not in self.keys:
            cur = self.cursor
            cur.execute(self._stmt_select, (key,))
            return cur.fetchone()
        return True

    def finalize(self):
        """Write all buffered keys to the database in one transaction"""
        pending = self.keys
        if pending:
            cur = self.cursor
            # the connection's context manager commits on success and
            # rolls back if an exception escapes the block
            with self.connection:
                try:
                    cur.execute("BEGIN")
                except self._sqlite3.OperationalError:
                    # ignored; the inserts run without an explicit BEGIN
                    pass

                insert = self._stmt_insert
                if len(pending) >= 100:
                    cur.executemany(insert, ((k,) for k in pending))
                else:
                    for k in pending:
                        cur.execute(insert, (k,))
142
143
144
class DownloadArchivePostgresql():
    """Download archive stored in a PostgreSQL table, accessed via psycopg.

    add() commits every entry immediately. Database errors in add() and
    check() are logged and rolled back instead of being raised.
    """
    _psycopg = None  # 'psycopg' module, imported on first instantiation

    def __init__(self, uri, keygen, table=None, pragma=None, cache_key=None):
        """Connect to 'uri' and create the archive table if necessary.

        'pragma' is accepted for signature compatibility with the SQLite
        archive and is not used. An error while creating the table is
        logged, rolled back, and re-raised.
        """
        if self._psycopg is None:
            DownloadArchivePostgresql._psycopg = __import__("psycopg")

        con = self._psycopg.connect(uri)
        cursor = con.cursor()
        self.connection = con
        self.cursor = cursor
        self.close = con.close
        self.keygen = keygen
        self._cache_key = cache_key or "_archive_key"

        name = "archive" if table is None else sanitize(table)
        self._stmt_select = f"SELECT true FROM {name} WHERE entry=%s LIMIT 1"
        self._stmt_insert = (f"INSERT INTO {name} (entry) VALUES (%s) "
                             "ON CONFLICT DO NOTHING")

        try:
            cursor.execute(
                f"CREATE TABLE IF NOT EXISTS {name} (entry TEXT PRIMARY KEY)")
            con.commit()
        except Exception as exc:
            log.error("%s: %s when creating '%s' table: %s",
                      con, type(exc).__name__, name, exc)
            con.rollback()
            raise

    def add(self, kwdict):
        """Insert the key of 'kwdict' and commit; failures are logged"""
        key = kwdict.get(self._cache_key) or self.keygen(kwdict)
        con = self.connection
        try:
            self.cursor.execute(self._stmt_insert, (key,))
            con.commit()
        except Exception as exc:
            log.error("%s: %s when writing entry: %s",
                      con, type(exc).__name__, exc)
            con.rollback()

    def check(self, kwdict):
        """Return the matching row (or None); False if the query failed.

        The computed key is stored in 'kwdict' under the cache key.
        """
        key = self.keygen(kwdict)
        kwdict[self._cache_key] = key
        cur = self.cursor
        try:
            cur.execute(self._stmt_select, (key,))
            return cur.fetchone()
        except Exception as exc:
            con = self.connection
            log.error("%s: %s when checking entry: %s",
                      con, type(exc).__name__, exc)
            con.rollback()
            return False

    def finalize(self):
        """Do nothing; add() commits entries immediately."""
201
202
203
class DownloadArchivePostgresqlMemory(DownloadArchivePostgresql):
    """PostgreSQL download archive that buffers new entries in memory.

    add() only records keys in the set 'self.keys'; finalize() inserts them
    all and commits once. Database errors are logged and rolled back.
    """

    def __init__(self, path, keygen, table=None, pragma=None, cache_key=None):
        DownloadArchivePostgresql.__init__(
            self, path, keygen, table, pragma, cache_key)
        self.keys = set()  # keys added so far, not yet written

    def add(self, kwdict):
        """Remember the key of 'kwdict' for writing in finalize()"""
        key = kwdict.get(self._cache_key) or self.keygen(kwdict)
        self.keys.add(key)

    def check(self, kwdict):
        """Return True if buffered, else the database row/None.

        Returns False if the database query failed. The computed key is
        stored in 'kwdict' under the cache key.
        """
        key = self.keygen(kwdict)
        kwdict[self._cache_key] = key
        if key in self.keys:
            return True

        cur = self.cursor
        try:
            cur.execute(self._stmt_select, (key,))
            return cur.fetchone()
        except Exception as exc:
            con = self.connection
            log.error("%s: %s when checking entry: %s",
                      con, type(exc).__name__, exc)
            con.rollback()
            return False

    def finalize(self):
        """Insert all buffered keys and commit once; failures are logged"""
        pending = self.keys
        if not pending:
            return

        con = self.connection
        rows = ((key,) for key in pending)
        try:
            self.cursor.executemany(self._stmt_insert, rows)
            con.commit()
        except Exception as exc:
            log.error("%s: %s when writing entries: %s",
                      con, type(exc).__name__, exc)
            con.rollback()
240
241