Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
taux1c
GitHub Repository: taux1c/onlyfans-scraper
Path: blob/main/onlyfans_scraper/db/operations.py
961 views
1
r"""
2
_ __
3
___ _ __ | | _ _ / _| __ _ _ __ ___ ___ ___ _ __ __ _ _ __ ___ _ __
4
/ _ \ | '_ \ | || | | || |_ / _` || '_ \ / __| _____ / __| / __|| '__| / _` || '_ \ / _ \| '__|
5
| (_) || | | || || |_| || _|| (_| || | | |\__ \|_____|\__ \| (__ | | | (_| || |_) || __/| |
6
\___/ |_| |_||_| \__, ||_| \__,_||_| |_||___/ |___/ \___||_| \__,_|| .__/ \___||_|
7
|___/ |_|
8
"""
9
10
import contextlib
11
import glob
12
import pathlib
13
import sqlite3
14
from itertools import chain
15
16
from ..constants import configPath, databaseFile
17
from ..utils import separate, profiles
18
19
20
def create_database(model_id, path=None):
    """Ensure the media table for ``model_id`` exists in the SQLite database.

    The table is named after ``model_id`` and has three columns: an integer
    primary key ``id``, and NOT NULL ``media_id`` and ``filename`` columns.
    Calling this again for a table that already exists is a no-op.

    Args:
        model_id: Value used (after ``str()``) as the table name.
        path: Database file to open. When falsy, the file
            ``~/<configPath>/<current profile>/<databaseFile>`` is used.

    Raises:
        sqlite3.OperationalError: If SQLite cannot create the table for a
            reason other than it already existing. (An "already exists"
            condition is handled by ``IF NOT EXISTS`` rather than by
            swallowing every operational error.)
    """
    if not path:
        # The profile is only needed to build the default location, so it is
        # not looked up when the caller already supplied a path.
        profile = profiles.get_current_profile()
        path = pathlib.Path.home() / configPath / profile / databaseFile

    # Table names cannot be bound as `?` parameters. Quote the name as an
    # SQL identifier (double quotes, embedded quotes doubled) so ids that
    # contain quote characters cannot break or alter the statement.
    table = '"' + str(model_id).replace('"', '""') + '"'
    model_sql = f"""
        CREATE TABLE IF NOT EXISTS {table}(
            id INTEGER PRIMARY KEY,
            media_id INTEGER NOT NULL,
            filename VARCHAR NOT NULL
        );"""

    with contextlib.closing(sqlite3.connect(path)) as conn:
        with contextlib.closing(conn.cursor()) as cur:
            cur.execute(model_sql)
        conn.commit()
37
38
39
def write_from_data(data: tuple, model_id):
    """Insert a single media record into the table named after ``model_id``.

    The database file is
    ``~/<configPath>/<current profile>/<databaseFile>``.

    Args:
        data: Two values bound, in order, to the ``media_id`` and
            ``filename`` columns.
        model_id: Value used (after ``str()``) as the table name.
    """
    profile = profiles.get_current_profile()
    database_path = pathlib.Path.home() / configPath / profile / databaseFile

    # Table names cannot be bound as `?` parameters. Quote the name as an
    # SQL identifier (double quotes, embedded quotes doubled) so ids that
    # contain quote characters cannot break or alter the statement.
    table = '"' + str(model_id).replace('"', '""') + '"'
    model_insert_sql = f"""
        INSERT INTO {table}(
            media_id, filename
        )
        VALUES (?, ?);"""

    with contextlib.closing(sqlite3.connect(database_path)) as conn:
        with contextlib.closing(conn.cursor()) as cur:
            cur.execute(model_insert_sql, data)
        conn.commit()
53
54
55
def read_foreign_database(path) -> list:
    """Collect media rows from every ``*.db`` file directly inside ``path``.

    Each database is expected to contain a ``medias`` table with
    ``media_id`` and ``filename`` columns.

    Args:
        path: Directory to scan, as a string. Surrounding single or double
            quote characters are stripped first.

    Returns:
        A list of ``(media_id, filename)`` tuples from all matching files,
        read in sorted file-name order.

    Raises:
        sqlite3.Error: If a matching file cannot be queried, for example
            because it has no ``medias`` table.
    """
    directory = path.strip('\'\"')

    # Escape the directory part so characters such as `[`, `]`, `*` or `?`
    # in a folder name are matched literally instead of being treated as
    # glob wildcards. Sorting makes the result order deterministic.
    database_files = sorted(glob.glob(glob.escape(directory) + '/*.db'))

    read_sql = """SELECT media_id, filename FROM medias"""

    database_results = []
    for db_file in database_files:
        with contextlib.closing(sqlite3.connect(db_file)) as conn:
            with contextlib.closing(conn.cursor()) as cur:
                cur.execute(read_sql)
                database_results.extend(cur.fetchall())

    return database_results
68
69
70
def write_from_foreign_database(results: list, model_id):
    """Migrate rows read from foreign databases into the model's table.

    The target database file is
    ``~/<configPath>/<current profile>/<databaseFile>``. The table is
    created first if needed, rows are filtered against the media ids
    already stored, the remainder is inserted, and a summary line is
    printed.

    Args:
        results: Rows to migrate; each row supplies the ``media_id`` and
            ``filename`` values, in that order.
        model_id: Value used (after ``str()``) as the table name.
    """
    profile = profiles.get_current_profile()
    database_path = pathlib.Path.home() / configPath / profile / databaseFile

    # Create the database table in case it doesn't exist:
    create_database(model_id, database_path)

    # Filter results to avoid adding duplicates to database:
    media_ids = get_media_ids(model_id)
    filtered_results = separate.separate_database_results_by_id(
        results, media_ids)

    # Table names cannot be bound as `?` parameters. Quote the name as an
    # SQL identifier (double quotes, embedded quotes doubled) so ids that
    # contain quote characters cannot break or alter the statement.
    table = '"' + str(model_id).replace('"', '""') + '"'
    model_insert_sql = f"""
        INSERT INTO {table}(
            media_id, filename
        )
        VALUES (?, ?);"""

    # Insert results into our database:
    with contextlib.closing(sqlite3.connect(database_path)) as conn:
        with contextlib.closing(conn.cursor()) as cur:
            cur.executemany(model_insert_sql, filtered_results)
        conn.commit()

    print(f'Migration complete. Migrated {len(filtered_results)} items.')
95
96
97
def get_media_ids(model_id) -> list:
    """Return every ``media_id`` stored in the table named after ``model_id``.

    The database file is
    ``~/<configPath>/<current profile>/<databaseFile>``.

    Args:
        model_id: Value used (after ``str()``) as the table name.

    Returns:
        A flat list of the ``media_id`` values (not one-element tuples).

    Raises:
        sqlite3.OperationalError: If the table does not exist.
    """
    profile = profiles.get_current_profile()
    database_path = pathlib.Path.home() / configPath / profile / databaseFile

    # Table names cannot be bound as `?` parameters. Quote the name as an
    # SQL identifier (double quotes, embedded quotes doubled) so ids that
    # contain quote characters cannot break or alter the statement.
    table = '"' + str(model_id).replace('"', '""') + '"'
    media_ids_sql = f"""SELECT media_id FROM {table}"""

    with contextlib.closing(sqlite3.connect(database_path)) as conn:
        with contextlib.closing(conn.cursor()) as cur:
            cur.execute(media_ids_sql)
            rows = cur.fetchall()

    # Each fetched row is a one-element tuple; unwrap to plain values.
    return [row[0] for row in rows]
110
111