Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
taux1c
GitHub Repository: taux1c/onlyfans-scraper
Path: blob/main/onlyfans_scraper/scraper.py
801 views
1
r"""
2
_ __
3
___ _ __ | | _ _ / _| __ _ _ __ ___ ___ ___ _ __ __ _ _ __ ___ _ __
4
/ _ \ | '_ \ | || | | || |_ / _` || '_ \ / __| _____ / __| / __|| '__| / _` || '_ \ / _ \| '__|
5
| (_) || | | || || |_| || _|| (_| || | | |\__ \|_____|\__ \| (__ | | | (_| || |_) || __/| |
6
\___/ |_| |_||_| \__, ||_| \__,_||_| |_||___/ |___/ \___||_| \__,_|| .__/ \___||_|
7
|___/ |_|
8
"""
9
10
import argparse
11
import asyncio
12
import os
13
import sys
14
import platform
15
from time import sleep
16
17
from .api import init, highlights, me, messages, posts, profile, subscriptions, paid
18
from .db import operations
19
from .interaction import like
20
from .utils import auth, config, download, profiles, prompts
21
from revolution import Revolution
22
from .utils.nap import nap_or_sleep
23
from onlyfans_scraper.api.lists import get_list
24
25
26
@Revolution(desc='Getting messages...')
def process_messages(headers, model_id):
    """Scrape the model's messages and return the media URLs found in them.

    Returns an empty list when the model has no messages.
    """
    scraped = messages.scrape_messages(headers, model_id)
    if not scraped:
        return []
    return messages.parse_messages(scraped, model_id)
35
36
@Revolution(desc='Getting highlights...')
def process_highlights(headers, model_id):
    """Scrape the model's highlights and stories and return the story URLs.

    Highlight IDs are expanded into full stories via an async fetch before
    parsing. Returns an empty list when there are neither highlights nor stories.
    """
    raw_highlights, stories = highlights.scrape_highlights(headers, model_id)
    if not (raw_highlights or stories):
        return []
    highlight_ids = highlights.parse_highlights(raw_highlights)
    stories += asyncio.run(highlights.process_highlights_ids(headers, highlight_ids))
    return highlights.parse_stories(stories)
48
49
@Revolution(desc='Getting archived media...')
def process_archived_posts(headers, model_id):
    """Scrape the model's archived posts and return their media URLs.

    Returns an empty list when nothing is archived.
    """
    scraped = posts.scrape_archived_posts(headers, model_id)
    if not scraped:
        return []
    return posts.parse_posts(scraped)
58
59
@Revolution(desc='Getting timeline media...')
def process_timeline_posts(headers, model_id):
    """Scrape the model's timeline posts and return their media URLs.

    Returns an empty list when the timeline is empty.
    """
    scraped = posts.scrape_timeline_posts(headers, model_id)
    if not scraped:
        return []
    return posts.parse_posts(scraped)
68
69
@Revolution(desc='Getting pinned media...')
def process_pinned_posts(headers, model_id):
    """Scrape the model's pinned posts and return their media URLs.

    Returns an empty list when nothing is pinned.
    """
    scraped = posts.scrape_pinned_posts(headers, model_id)
    if not scraped:
        return []
    return posts.parse_posts(scraped)
78
79
80
def process_profile(headers, username) -> list:
    """Scrape a user's profile, print its summary, and return the profile media URLs."""
    raw_profile = profile.scrape_profile(headers, username)
    urls, info = profile.parse_profile(raw_profile)
    profile.print_profile_info(info)
    return urls
85
86
87
def process_areas_all(headers, username, model_id) -> list:
    """Collect media URLs from every content area for one model.

    Areas are scraped in a fixed order (profile, pinned, timeline, archived,
    highlights, messages) and the resulting URL lists are concatenated.
    """
    url_groups = [
        process_profile(headers, username),
        process_pinned_posts(headers, model_id),
        process_timeline_posts(headers, model_id),
        process_archived_posts(headers, model_id),
        process_highlights(headers, model_id),
        process_messages(headers, model_id),
    ]
    return [url for group in url_groups for url in group]
100
101
102
def process_areas(headers, username, model_id) -> list:
    """Prompt for which content areas to scrape and return the combined URLs.

    'All' delegates to process_areas_all; otherwise the profile is always
    scraped and each selected area contributes its URLs in a fixed order
    (profile, pinned, timeline, archived, highlights, messages).
    """
    selected_areas = prompts.areas_prompt()

    if 'All' in selected_areas:
        return process_areas_all(headers, username, model_id)

    combined = list(process_profile(headers, username))

    # Timeline selection covers pinned posts as well.
    if 'Timeline' in selected_areas:
        combined += process_pinned_posts(headers, model_id)
        combined += process_timeline_posts(headers, model_id)
    if 'Archived' in selected_areas:
        combined += process_archived_posts(headers, model_id)
    if 'Highlights' in selected_areas:
        combined += process_highlights(headers, model_id)
    if 'Messages' in selected_areas:
        combined += process_messages(headers, model_id)

    return combined
134
135
136
def do_download_content(headers, username, model_id, ignore_prompt=False):
    """Gather a model's media URLs and download them.

    When ignore_prompt is True every content area is scraped without asking;
    otherwise the user is prompted to choose areas first.
    """
    if ignore_prompt:
        urls = process_areas_all(headers, username, model_id)
    else:
        urls = process_areas(headers, username, model_id)

    asyncio.run(download.process_urls(headers, username, model_id, urls))
150
151
152
def do_database_migration(path, model_id):
    """Import records from a foreign (old-format) database into the model's database."""
    foreign_rows = operations.read_foreign_database(path)
    operations.write_from_foreign_database(foreign_rows, model_id)
155
156
157
def get_usernames(parsed_subscriptions: list) -> list:
    """Return the username (first field) of each parsed subscription record."""
    return [username for (username, *_) in parsed_subscriptions]
160
161
162
def get_model(parsed_subscriptions: list) -> tuple:
    """
    Prints user's subscriptions to console and accepts input from user corresponding
    to the model whose content they would like to scrape.
    """
    subscriptions.print_subscriptions(parsed_subscriptions)

    print('\nEnter the number next to the user whose content you would like to download:')
    # Keep asking until the user supplies a valid 1-based index.
    while True:
        try:
            selection = int(input('> '))
            return parsed_subscriptions[selection - 1]
        except ValueError:
            print("Incorrect value. Please enter an actual number.")
        except IndexError:
            print("Value out of range. Please pick a number that's in range")
178
179
180
def get_models(headers, subscribe_count) -> list:
    """
    Get user's subscriptions in form of a list.
    """
    # The spinner context manager shows progress while the async fetch runs.
    with Revolution(desc='Getting your subscriptions (this may take awhile)...') as _:
        raw_subscriptions = asyncio.run(
            subscriptions.get_subscriptions(headers, subscribe_count)
        )
        return subscriptions.parse_subscriptions(raw_subscriptions)
190
191
192
def process_me(headers):
    """Fetch the authenticated user's profile, print it, and return their subscription count."""
    my_profile = me.scrape_user(headers)
    name, username, subscribe_count = me.parse_user(my_profile)
    me.print_user(name, username)
    return subscribe_count
197
198
199
def process_prompts():
    """Interactive main menu.

    Displays the main prompt, dispatches on the chosen option, then recurses
    (via ``loop``) to show the menu again. Options: download content (0),
    like/unlike posts (1/2), database migration (3), edit auth/config (4/5),
    download purchased content (6), profiles submenu (7), export a list (8).

    Fix: the profiles submenu dispatch previously sat AFTER the main elif
    chain while ``result_profiles_prompt`` is only assigned inside option 7 —
    options 6 and 8 fell through into it and raised NameError. The submenu is
    now nested under option 7 where its prompt result is defined.
    """
    loop = process_prompts

    profiles.print_current_profile()
    headers = auth.make_headers(auth.read_auth())
    init.print_sign_status(headers)

    result_main_prompt = prompts.main_prompt()

    if result_main_prompt == 0:
        # Download content from user
        result_username_or_list_prompt = prompts.username_or_list_prompt()

        # Print a list of users:
        if result_username_or_list_prompt == 0:
            subscribe_count = process_me(headers)
            parsed_subscriptions = get_models(headers, subscribe_count)
            username, model_id, *_ = get_model(parsed_subscriptions)

            do_download_content(headers, username, model_id)

        # Ask for a username to be entered:
        elif result_username_or_list_prompt == 1:
            username = prompts.username_prompt()
            model_id = profile.get_id(headers, username)

            do_download_content(headers, username, model_id)

        else:
            # Ask if we should scrape all users
            result_verify_all_users = prompts.verify_all_users_username_or_list_prompt()
            if result_verify_all_users:
                subscribe_count = process_me(headers)
                parsed_subscriptions = get_models(headers, subscribe_count)
                usernames = get_usernames(parsed_subscriptions)

                for username in usernames:
                    try:
                        model_id = profile.get_id(headers, username)
                        do_download_content(
                            headers, username, model_id, ignore_prompt=True)
                    except Exception as e:
                        print(f"There was an error with profile {username}.\nWe encountered the following exception: \n\n{e}")

    elif result_main_prompt == 1:
        # Like a user's posts
        username = prompts.username_prompt()
        model_id = profile.get_id(headers, username)

        # Named posts_ so the imported `posts` module is not shadowed.
        posts_ = like.get_posts(headers, model_id)
        unfavorited_posts = like.filter_for_unfavorited(posts_)
        post_ids = like.get_post_ids(unfavorited_posts)
        like.like(headers, model_id, username, post_ids)

    elif result_main_prompt == 2:
        # Unlike a user's posts
        username = prompts.username_prompt()
        model_id = profile.get_id(headers, username)

        posts_ = like.get_posts(headers, model_id)
        favorited_posts = like.filter_for_favorited(posts_)
        post_ids = like.get_post_ids(favorited_posts)
        like.unlike(headers, model_id, username, post_ids)

    elif result_main_prompt == 3:
        # Migrate from old database
        path, username = prompts.database_prompt()
        model_id = profile.get_id(headers, username)
        do_database_migration(path, model_id)

        loop()

    elif result_main_prompt == 4:
        # Edit `auth.json` file
        auth.edit_auth()

        loop()

    elif result_main_prompt == 5:
        # Edit `config.json` file
        config.edit_config()

        loop()

    elif result_main_prompt == 6:
        # Download individually purchased (paid) content
        paid_content = paid.scrape_paid()
        paid.download_paid(paid_content)

    elif result_main_prompt == 7:
        # Display `Profiles` menu
        result_profiles_prompt = prompts.profiles_prompt()
        print('This feature has been retired.')

        if result_profiles_prompt == 0:
            # Change profiles
            profiles.change_profile()

        if result_profiles_prompt == 1:
            # Edit a profile
            profiles_ = profiles.get_profiles()

            old_profile_name = prompts.edit_profiles_prompt(profiles_)
            new_profile_name = prompts.new_name_edit_profiles_prompt(
                old_profile_name)

            profiles.edit_profile_name(old_profile_name, new_profile_name)

        elif result_profiles_prompt == 2:
            # Create a new profile
            profile_path = profiles.get_profile_path()
            profile_name = prompts.create_profiles_prompt()

            profiles.create_profile(profile_path, profile_name)

        elif result_profiles_prompt == 3:
            # Delete a profile
            profiles.delete_profile()

        elif result_profiles_prompt == 4:
            # View profiles
            profiles.print_profiles()

    elif result_main_prompt == 8:
        # Export a list to an excel file
        export_file = input('Enter a name for the output excel file: ')
        get_list(export_file = export_file)

    loop()
326
327
def download_user(username):
    """Download every content area for a single username, without prompting."""
    headers = auth.make_headers(auth.read_auth())
    model_id = profile.get_id(headers, username)
    do_download_content(headers, username, model_id, ignore_prompt=True)
330
331
332
333
def silent_run():
    """Non-interactive full scrape.

    Verifies the stored auth with a probe request, downloads purchased
    content, then downloads everything from every subscription. Per-user
    failures are reported and skipped so one bad profile cannot abort the run.

    Fix: the probe's return value was bound to an unused local (`resp`);
    the call is kept for its auth-check side effect, the dead name dropped.
    """
    headers = auth.make_headers(auth.read_auth())

    # Probe the API first so a broken auth aborts before any real work.
    try:
        me.scrape_user(headers)
    except Exception as e:
        print("Silent run failed with exception: ", e)
        return

    subscribe_count = process_me(headers)
    parsed_subscriptions = get_models(headers, subscribe_count)
    usernames = get_usernames(parsed_subscriptions)

    paid_content = paid.scrape_paid()
    paid.download_paid(paid_content)

    for username in usernames:
        try:
            model_id = profile.get_id(headers, username)
            do_download_content(
                headers, username, model_id, ignore_prompt=True)
        except Exception as e:
            print("Silent run failed with exception: ", e)
354
355
356
def daemon():
    """Run silent_run forever, sleeping between passes.

    Errors are caught and reported so the daemon recovers and tries again on
    the next cycle instead of dying.
    """
    while True:
        try:
            silent_run()
        except Exception as e:
            print("Daemon failed with exception: ", e)
        finally:
            # Always wait out the configured nap/sleep interval, even on error.
            sleep(nap_or_sleep())
367
368
369
370
371
def main():
    """CLI entry point: parse arguments and dispatch; with no actionable flags,
    fall back to the interactive menu.

    Fixes: `platform.system` was compared without being called (a function
    object never equals 'Windows', so ANSI color support was never enabled on
    Windows); the dead local `silent = True` in the --all branch is removed.
    """
    if platform.system() == 'Windows':
        # Enables ANSI escape-code handling in the Windows console.
        os.system('color')

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-e', '--edit', help='view or edit your current auth', action='store_true')
    parser.add_argument(
        '-u', '--username', help="Download content from a user or list of users (name,name2)"
    )
    parser.add_argument(
        '-a', '--all', help='scrape the content of all users', action='store_true')
    parser.add_argument(
        '-d', '--daemon', help='This will run the program in the background and scrape everything from everyone. It will run untill manually killed.', action='store_true'
    )
    parser.add_argument(
        '-p', '--purchased', help = 'Download only individually purchased content.', action = 'store_true'
    )
    args = parser.parse_args()

    if args.edit:
        # No dedicated handling yet; editing is available via the interactive menu.
        pass
    if args.username:
        # Comma-separated list of usernames; download each in turn.
        for username in args.username.strip().split(','):
            download_user(username)
        sys.exit()
    if args.all:
        silent_run()
        sys.exit()
    if args.daemon:
        daemon()
    if args.purchased:
        paid_content = paid.scrape_paid()
        paid.download_paid(paid_content)
        sys.exit()

    try:
        process_prompts()
    except KeyboardInterrupt:
        sys.exit(1)
419
420
421
# Script entry point: run the CLI only when executed directly.
if __name__ == '__main__':
    main()
423
424