Path: blob/main/onlyfans_scraper/interaction/like.py
961 views
r"""
               _          __
  ___   _ __  | | _   _  / _|  __ _  _ __   ___         ___   ___  _ __   __ _  _ __    ___  _ __
 / _ \ | '_ \ | || | | || |_  / _` || '_ \ / __| _____ / __| / __|| '__| / _` || '_ \  / _ \| '__|
| (_) || | | || || |_| ||  _|| (_| || | | |\__ \|_____|\__ \| (__ | |   | (_| || |_) ||  __/| |
 \___/ |_| |_||_| \__, ||_|   \__,_||_| |_||___/       |___/ \___||_|    \__,_|| .__/  \___||_|
                  |___/                                                        |_|
"""

# Helpers for collecting a model's posts and sending "favorite" requests for
# them, with a progress spinner and best-effort error reporting.

import random
import time
from typing import Union

import httpx
from revolution import Revolution

from ..api import posts
from ..constants import favoriteEP, postURL
from ..utils import auth

# Total number of POST attempts made for a single post before giving up.
_MAX_ATTEMPTS = 2


def get_posts(headers, model_id):
    """Return the model's pinned, timeline and archived posts as one list.

    The three scrapers from ``..api.posts`` are called in that order while a
    ``Revolution`` spinner is displayed; their results are concatenated.
    """
    with Revolution(desc='Getting posts...') as _:
        pinned_posts = posts.scrape_pinned_posts(headers, model_id)
        timeline_posts = posts.scrape_timeline_posts(headers, model_id)
        archived_posts = posts.scrape_archived_posts(headers, model_id)

    return pinned_posts + timeline_posts + archived_posts


# NOTE: in the three helpers below the ``posts`` parameter shadows the imported
# ``posts`` module. The name is kept because callers may pass it by keyword.

def filter_for_unfavorited(posts: list) -> list:
    """Return the posts whose ``'isFavorite'`` key is present and falsy."""
    return [post for post in posts if 'isFavorite' in post and not post['isFavorite']]


def filter_for_favorited(posts: list) -> list:
    """Return the posts whose ``'isFavorite'`` key is present and truthy."""
    return [post for post in posts if 'isFavorite' in post and post['isFavorite']]


def get_post_ids(posts: list) -> list:
    """Return the ``'id'`` of every post whose ``'isOpened'`` key is truthy.

    Posts without an ``'isOpened'`` key are skipped.
    """
    return [post['id'] for post in posts if 'isOpened' in post and post['isOpened']]


def like(headers, model_id, username, ids: list):
    """Send a favorite request for each post id, labelled as "Liking"."""
    _like(headers, model_id, username, ids, True)


def unlike(headers, model_id, username, ids: list):
    """Send a favorite request for each post id, labelled as "Unliking"."""
    _like(headers, model_id, username, ids, False)


def _like(headers, model_id, username, ids: list, like_action: bool):
    """POST to the favorite endpoint once per id, retrying a failure once.

    ``like_action`` only selects the progress title; the same request is sent
    either way.
    NOTE(review): presumably the endpoint toggles the favorite state, so the
    caller is expected to pre-filter ``ids`` -- confirm.

    Failures are reported through ``_handle_err`` and never raised.
    """
    title = "Liking" if like_action else "Unliking"
    with Revolution(desc=f'{title} posts...', total=len(ids)) as rev:
        for i in ids:
            # Human-readable location of the post, used only in error output.
            post_url = postURL.format(i, username)

            # A fresh client (and signed header set) is built for every post.
            with httpx.Client(http2=True, headers=headers) as c:
                url = favoriteEP.format(i, model_id)

                auth.add_cookies(c)
                c.headers.update(auth.create_sign(url, headers))

                for _attempt in range(_MAX_ATTEMPTS):
                    # Short randomized pause before every attempt, including
                    # the first one.
                    time.sleep(random.uniform(0.8, 0.9))
                    try:
                        r = c.post(url)
                    except httpx.TransportError as e:
                        _handle_err(e, post_url)
                        continue

                    # A 400 is treated as final: neither retried nor reported.
                    # NOTE(review): presumably "already in that state" -- confirm.
                    if not r.is_error or r.status_code == 400:
                        break
                    _handle_err(r, post_url)

            rev.update()


def _handle_err(param: Union[httpx.Response, httpx.TransportError], url: str) -> None:
    """Print a one-line description of a failed favorite request.

    For a response, the status code is always included and the message is
    taken from ``error.message`` in the JSON body when that is available;
    otherwise a generic message is used. For a transport error, the message
    is ``str(param)``.

    Args:
        param: The error response or the transport exception.
        url: Location of the affected post, appended to the message.
    """
    message = 'unable to execute action'
    status = ''

    if isinstance(param, httpx.Response):
        # Report the status even when the body cannot be parsed.
        status = f'STATUS CODE {param.status_code}: '
        try:
            payload = param.json()
        except ValueError:
            # Body is not valid JSON (or not decodable); keep the default text.
            payload = None

        error = payload.get('error') if isinstance(payload, dict) else None
        if isinstance(error, dict) and 'message' in error:
            message = error['message']
    else:
        message = str(param)

    print(f'{status}{message}, post at {url}')