Path: blob/main/onlyfans_scraper/db/operations.py
961 views
"""SQLite helpers that store per-model media records.

Every model gets its own table, named after the model id, holding
``(media_id, filename)`` rows.  Unless a path is given explicitly, the
database file lives at ``~/<configPath>/<current profile>/<databaseFile>``.
"""

import contextlib
import glob
import pathlib
import sqlite3
from itertools import chain

from ..constants import configPath, databaseFile
from ..utils import separate, profiles


def _default_database_path() -> pathlib.Path:
    """Return the database file path for the currently selected profile."""
    profile = profiles.get_current_profile()
    return pathlib.Path.home() / configPath / profile / databaseFile


def _quote_identifier(name) -> str:
    """Return *name* quoted as an SQL identifier.

    Identifiers cannot be bound as ``?`` parameters, so the table name has to
    be interpolated into the statement.  Double quotes are the standard
    identifier quoting; embedded double quotes are escaped by doubling them so
    the name can never terminate the identifier early.
    """
    return '"' + str(name).replace('"', '""') + '"'


def _insert_sql(model_id) -> str:
    """Return the INSERT statement for one ``(media_id, filename)`` row."""
    return (
        f'INSERT INTO {_quote_identifier(model_id)}(media_id, filename) '
        'VALUES (?, ?);'
    )


def create_database(model_id, path=None):
    """Create the table for *model_id* if it does not exist yet.

    Args:
        model_id: Identifier of the model; used as the table name.
        path: Database file to use.  When falsy, the current profile's
            default database file is used.

    Raises:
        sqlite3.Error: If the database cannot be opened or written to.  An
            already existing table is not an error.
    """
    path = path or _default_database_path()

    create_sql = f"""
        CREATE TABLE IF NOT EXISTS {_quote_identifier(model_id)}(
            id INTEGER PRIMARY KEY,
            media_id INTEGER NOT NULL,
            filename VARCHAR NOT NULL
        );"""

    with contextlib.closing(sqlite3.connect(path)) as conn, \
            contextlib.closing(conn.cursor()) as cur:
        # IF NOT EXISTS makes repeated calls harmless without hiding
        # unrelated operational errors (locked or read-only database, ...).
        cur.execute(create_sql)
        conn.commit()


def write_from_data(data: tuple, model_id):
    """Insert a single ``(media_id, filename)`` row into *model_id*'s table.

    Args:
        data: Two-item sequence ``(media_id, filename)`` bound to the INSERT.
        model_id: Identifier of the model whose table receives the row.
    """
    database_path = _default_database_path()

    with contextlib.closing(sqlite3.connect(database_path)) as conn, \
            contextlib.closing(conn.cursor()) as cur:
        cur.execute(_insert_sql(model_id), data)
        conn.commit()


def read_foreign_database(path) -> list:
    """Collect ``(media_id, filename)`` rows from every ``*.db`` file in *path*.

    Args:
        path: Directory containing the foreign database files.  Surrounding
            single or double quotes are stripped before use.

    Returns:
        A list of ``(media_id, filename)`` tuples read from the ``medias``
        table of each database file found.
    """
    directory = str(path).strip('\'\"')
    # Escape the directory so characters such as "[" or "*" in its name are
    # matched literally instead of being interpreted as glob syntax.
    database_files = glob.glob(glob.escape(directory) + '/*.db')

    read_sql = """SELECT media_id, filename FROM medias"""

    database_results = []
    for file in database_files:
        with contextlib.closing(sqlite3.connect(file)) as conn, \
                contextlib.closing(conn.cursor()) as cur:
            cur.execute(read_sql)
            database_results.extend(cur.fetchall())

    return database_results


def write_from_foreign_database(results: list, model_id):
    """Copy foreign rows into *model_id*'s table, skipping known media ids.

    Args:
        results: ``(media_id, filename)`` rows, e.g. as returned by
            ``read_foreign_database``.
        model_id: Identifier of the model whose table receives the rows.
    """
    database_path = _default_database_path()

    # Create the database table in case it doesn't exist:
    create_database(model_id, database_path)

    # Filter results to avoid adding duplicates to database:
    media_ids = get_media_ids(model_id)
    filtered_results = separate.separate_database_results_by_id(
        results, media_ids)

    # Insert results into our database:
    with contextlib.closing(sqlite3.connect(database_path)) as conn, \
            contextlib.closing(conn.cursor()) as cur:
        cur.executemany(_insert_sql(model_id), filtered_results)
        conn.commit()

    print(f'Migration complete. Migrated {len(filtered_results)} items.')


def get_media_ids(model_id) -> list:
    """Return every ``media_id`` stored in *model_id*'s table as a flat list.

    Args:
        model_id: Identifier of the model whose table is read.
    """
    database_path = _default_database_path()

    media_ids_sql = f'SELECT media_id FROM {_quote_identifier(model_id)}'

    with contextlib.closing(sqlite3.connect(database_path)) as conn, \
            contextlib.closing(conn.cursor()) as cur:
        cur.execute(media_ids_sql)
        rows = cur.fetchall()

    # Rows are 1-tuples; flatten them into a list of plain values:
    return list(chain.from_iterable(rows))