"""Download Archives"""
import os
import logging
from . import util, formatter
# Module-level logger named "archive", used for archive error messages
log = logging.getLogger("archive")
def connect(path, prefix, format,
            table=None, mode=None, pragma=None, kwdict=None, cache_key=None):
    """Create and return the archive object matching 'path' and 'mode'

    path      -- "postgres://" / "postgresql://" URI or a filesystem path
    prefix    -- string prepended to 'format' before it is parsed
    format    -- format string; 'prefix + format' yields the key generator
    table     -- optional table name; formatted with 'kwdict' if given
    mode      -- "memory" selects the in-memory buffering archive classes
    pragma    -- forwarded unchanged to the archive class
    kwdict    -- optional mapping used to format 'path' and 'table'
    cache_key -- forwarded unchanged to the archive class
    """
    keygen = formatter.parse(prefix + format).format_map
    buffered = (mode == "memory")
    have_kwdict = kwdict is not None

    uri_schemes = ("postgres://", "postgresql://")
    if isinstance(path, str) and path.startswith(uri_schemes):
        # PostgreSQL URIs are handed to the backend as they are
        cls = (DownloadArchivePostgresqlMemory if buffered
               else DownloadArchivePostgresql)
    else:
        path = util.expand_path(path)
        # only treat the path as a format string if it has a replacement field
        if have_kwdict and "{" in path:
            path = formatter.parse(path).format_map(kwdict)
        cls = DownloadArchiveMemory if buffered else DownloadArchive

    if table and have_kwdict:
        table = formatter.parse(table).format_map(kwdict)

    return cls(path, keygen, table, pragma, cache_key)
def sanitize(name):
    """Wrap 'name' in double quotes, replacing embedded '"' with '_'"""
    cleaned = name.replace('"', '_')
    return '"{}"'.format(cleaned)
class DownloadArchive():
    """Download archive storing one text key per entry in an SQLite table"""
    # 'sqlite3' module object; imported on first instantiation
    _sqlite3 = None

    def __init__(self, path, keygen, table=None, pragma=None, cache_key=None):
        """Open the archive database at 'path', creating it if necessary

        path      -- database location passed to sqlite3.connect()
        keygen    -- callable returning the archive key for a kwdict
        table     -- table name, quoted with sanitize(); "archive" if None
        pragma    -- optional iterable of statements run as "PRAGMA <stmt>"
        cache_key -- kwdict key under which check() stores the generated
                     archive key; "_archive_key" if not given
        """
        if self._sqlite3 is None:
            DownloadArchive._sqlite3 = __import__("sqlite3")
        sqlite3 = self._sqlite3

        try:
            con = sqlite3.connect(
                path, timeout=60, check_same_thread=False)
        except sqlite3.OperationalError:
            # Assume the parent directory is missing: create it and retry.
            # 'exist_ok' and the empty-dirname guard make sure a failure
            # with a different cause is reported by the retried connect()
            # as the real sqlite3 error and not as a FileExistsError or
            # FileNotFoundError raised by makedirs().
            dirname = os.path.dirname(path)
            if dirname:
                os.makedirs(dirname, exist_ok=True)
            con = sqlite3.connect(
                path, timeout=60, check_same_thread=False)
        # autocommit: the sqlite3 module opens no implicit transactions
        con.isolation_level = None

        self.keygen = keygen
        self.connection = con
        self.close = con.close
        self.cursor = cursor = con.cursor()
        self._cache_key = cache_key or "_archive_key"

        table = "archive" if table is None else sanitize(table)
        self._stmt_select = (
            f"SELECT 1 "
            f"FROM {table} "
            f"WHERE entry=? "
            f"LIMIT 1")
        self._stmt_insert = (
            f"INSERT OR IGNORE INTO {table} "
            f"(entry) VALUES (?)")

        if pragma:
            for stmt in pragma:
                cursor.execute(f"PRAGMA {stmt}")

        try:
            cursor.execute(f"CREATE TABLE IF NOT EXISTS {table} "
                           f"(entry TEXT PRIMARY KEY) WITHOUT ROWID")
        except sqlite3.OperationalError:
            # fallback without 'WITHOUT ROWID'
            # (presumably for SQLite versions lacking support for it)
            cursor.execute(f"CREATE TABLE IF NOT EXISTS {table} "
                           f"(entry TEXT PRIMARY KEY)")

    def add(self, kwdict):
        """Add item described by 'kwdict' to archive"""
        # reuse the key cached by check() when there is one
        key = kwdict.get(self._cache_key) or self.keygen(kwdict)
        self.cursor.execute(self._stmt_insert, (key,))

    def check(self, kwdict):
        """Return a row if the item described by 'kwdict' exists, else None

        The generated key is stored in 'kwdict' under the cache key,
        where a later add() call picks it up.
        """
        key = kwdict[self._cache_key] = self.keygen(kwdict)
        self.cursor.execute(self._stmt_select, (key,))
        return self.cursor.fetchone()

    def finalize(self):
        """Do nothing; add() already writes every entry to the database"""
        pass
class DownloadArchiveMemory(DownloadArchive):
    """SQLite archive that collects new keys in a set until finalize()"""

    def __init__(self, path, keygen, table=None, pragma=None, cache_key=None):
        super().__init__(path, keygen, table, pragma, cache_key)
        # keys added during this run and not yet written to the database
        self.keys = set()

    def add(self, kwdict):
        """Remember the key of 'kwdict' without writing to the database"""
        key = kwdict.get(self._cache_key) or self.keygen(kwdict)
        self.keys.add(key)

    def check(self, kwdict):
        """Return a true value if 'kwdict' is in the key set or the database"""
        key = self.keygen(kwdict)
        kwdict[self._cache_key] = key

        if key not in self.keys:
            self.cursor.execute(self._stmt_select, (key,))
            return self.cursor.fetchone()
        return True

    def finalize(self):
        """Write all collected keys to the database"""
        pending = self.keys
        if not pending:
            return

        db = self.cursor
        insert = self._stmt_insert

        # the connection's context manager commits on success
        # and rolls back if an exception escapes the block
        with self.connection:
            try:
                db.execute("BEGIN")
            except self._sqlite3.OperationalError:
                # e.g. a transaction is already open; insert regardless
                pass

            if len(pending) >= 100:
                db.executemany(insert, ((entry,) for entry in pending))
            else:
                for entry in pending:
                    db.execute(insert, (entry,))
class DownloadArchivePostgresql():
    """Download archive storing one text key per entry in PostgreSQL"""
    # 'psycopg' module object; imported on first instantiation
    _psycopg = None

    def __init__(self, uri, keygen, table=None, pragma=None, cache_key=None):
        """Connect to 'uri' and make sure the archive table exists

        'pragma' is accepted for signature compatibility and not used.
        """
        if self._psycopg is None:
            DownloadArchivePostgresql._psycopg = __import__("psycopg")

        con = self._psycopg.connect(uri)
        self.connection = con
        cursor = con.cursor()
        self.cursor = cursor
        self.close = con.close
        self.keygen = keygen
        self._cache_key = cache_key or "_archive_key"

        if table is None:
            table = "archive"
        else:
            table = sanitize(table)

        self._stmt_select = (
            f"SELECT true FROM {table} WHERE entry=%s LIMIT 1")
        self._stmt_insert = (
            f"INSERT INTO {table} (entry) VALUES (%s) ON CONFLICT DO NOTHING")
        create_table = (
            f"CREATE TABLE IF NOT EXISTS {table} (entry TEXT PRIMARY KEY)")

        try:
            cursor.execute(create_table)
            con.commit()
        except Exception as exc:
            # report the failure, undo the transaction, and let the
            # caller see the original exception
            log.error("%s: %s when creating '%s' table: %s",
                      con, exc.__class__.__name__, table, exc)
            con.rollback()
            raise

    def add(self, kwdict):
        """Insert the key of 'kwdict' and commit; errors are only logged"""
        # reuse the key cached by check() when there is one
        key = kwdict.get(self._cache_key) or self.keygen(kwdict)
        try:
            self.cursor.execute(self._stmt_insert, (key,))
            self.connection.commit()
        except Exception as exc:
            log.error("%s: %s when writing entry: %s",
                      self.connection, exc.__class__.__name__, exc)
            self.connection.rollback()

    def check(self, kwdict):
        """Return the matching row for 'kwdict', None, or False on error

        The generated key is stored in 'kwdict' under the cache key.
        """
        key = self.keygen(kwdict)
        kwdict[self._cache_key] = key
        try:
            self.cursor.execute(self._stmt_select, (key,))
            row = self.cursor.fetchone()
        except Exception as exc:
            log.error("%s: %s when checking entry: %s",
                      self.connection, exc.__class__.__name__, exc)
            self.connection.rollback()
            return False
        else:
            return row

    def finalize(self):
        """Do nothing; add() already commits every entry"""
        pass
class DownloadArchivePostgresqlMemory(DownloadArchivePostgresql):
    """PostgreSQL archive that collects new keys in a set until finalize()"""

    def __init__(self, path, keygen, table=None, pragma=None, cache_key=None):
        super().__init__(path, keygen, table, pragma, cache_key)
        # keys added during this run and not yet written to the database
        self.keys = set()

    def add(self, kwdict):
        """Remember the key of 'kwdict' without writing to the database"""
        key = kwdict.get(self._cache_key) or self.keygen(kwdict)
        self.keys.add(key)

    def check(self, kwdict):
        """Return a true value if 'kwdict' is known; False on database error"""
        key = self.keygen(kwdict)
        kwdict[self._cache_key] = key

        if key in self.keys:
            return True

        try:
            self.cursor.execute(self._stmt_select, (key,))
            row = self.cursor.fetchone()
        except Exception as exc:
            log.error("%s: %s when checking entry: %s",
                      self.connection, exc.__class__.__name__, exc)
            self.connection.rollback()
            return False
        else:
            return row

    def finalize(self):
        """Write all collected keys in one batch; errors are only logged"""
        pending = self.keys
        if not pending:
            return

        try:
            self.cursor.executemany(
                self._stmt_insert, ((entry,) for entry in pending))
            self.connection.commit()
        except Exception as exc:
            log.error("%s: %s when writing entries: %s",
                      self.connection, exc.__class__.__name__, exc)
            self.connection.rollback()