Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
taux1c
GitHub Repository: taux1c/onlyfans-scraper
Path: blob/main/onlyfans_scraper/utils/auth.py
961 views
1
r"""
2
_ __
3
___ _ __ | | _ _ / _| __ _ _ __ ___ ___ ___ _ __ __ _ _ __ ___ _ __
4
/ _ \ | '_ \ | || | | || |_ / _` || '_ \ / __| _____ / __| / __|| '__| / _` || '_ \ / _ \| '__|
5
| (_) || | | || || |_| || _|| (_| || | | |\__ \|_____|\__ \| (__ | | | (_| || |_) || __/| |
6
\___/ |_| |_||_| \__, ||_| \__,_||_| |_||___/ |___/ \___||_| \__,_|| .__/ \___||_|
7
|___/ |_|
8
"""
9
10
import hashlib
11
import json
12
import pathlib
13
import time
14
from urllib.parse import urlparse
15
16
import httpx
17
18
from .profiles import get_current_profile
19
from .prompts import auth_prompt, ask_make_auth_prompt
20
from ..constants import configPath, authFile, DC_EP, requestAuth
21
22
23
def read_auth():
    """Load and return the current profile's auth data.

    Calls ``make_request_auth()`` first, makes sure the profile directory
    exists, then parses the auth file as JSON. If the file is missing,
    a notice is printed, ``make_auth`` is asked to create it, and the
    read is attempted again until it succeeds.

    Returns:
        The object decoded from the profile's auth file.
    """
    make_request_auth()

    profile_name = get_current_profile()
    profile_dir = pathlib.Path.home() / configPath / profile_name
    # exist_ok=True makes this a no-op when the directory is already there.
    profile_dir.mkdir(parents=True, exist_ok=True)

    while True:
        try:
            with open(profile_dir / authFile, 'r') as fh:
                return json.load(fh)
        except FileNotFoundError:
            print(
                "You don't seem to have an `auth.json` file. Please fill the following out:")
            make_auth(profile_dir)
42
43
44
def edit_auth():
    """Re-run the auth prompt against the current profile's auth file.

    When the auth file exists, its contents are passed to ``make_auth`` as
    the starting values and a confirmation is printed afterwards. When it
    does not exist, ``ask_make_auth_prompt()`` decides whether a new file is
    created.
    """
    profile_name = get_current_profile()
    profile_dir = pathlib.Path.home() / configPath / profile_name
    profile_dir.mkdir(parents=True, exist_ok=True)

    try:
        with open(profile_dir / authFile, 'r') as fh:
            existing = json.load(fh)
        make_auth(profile_dir, existing)

        print('Your `auth.json` file has been edited.')
    except FileNotFoundError:
        wants_new_file = ask_make_auth_prompt()
        if wants_new_file:
            make_auth(profile_dir)
60
61
62
def make_auth(path, auth=None):
    """Collect auth values via ``auth_prompt`` and write them to the auth file.

    Args:
        path: Directory (a ``pathlib.Path``-like object supporting ``/``) in
            which the auth file is written.
        auth: Optional mapping with an ``'auth'`` sub-mapping used as the
            starting values. When falsy, a template with the default
            app token and empty strings for every other field is used.
            A mapping passed in is updated in place.
    """
    if not auth:
        # Same key order as the on-disk layout: app-token first, then blanks.
        blank_fields = dict.fromkeys(
            ('sess', 'auth_id', 'auth_uid_', 'user_agent', 'x-bc'), '')
        auth = {
            'auth': {
                'app-token': '33d57ade8c02dbc5a333db99ff9ae26a',
                **blank_fields,
            }
        }

    answers = auth_prompt(auth['auth'])
    auth['auth'].update(answers)

    with open(path / authFile, 'w') as fh:
        fh.write(json.dumps(auth, indent=4))
79
80
81
def get_auth_id() -> str:
    """Return the ``auth_id`` entry from the data loaded by ``read_auth()``."""
    return read_auth()['auth']['auth_id']
84
85
86
def make_headers(auth):
    """Build the base request headers from loaded auth data.

    Args:
        auth: Mapping whose ``'auth'`` entry provides ``'app-token'``,
            ``'auth_id'``, ``'x-bc'`` and ``'user_agent'``.

    Returns:
        A new dict of header names to values.
    """
    creds = auth['auth']
    return {
        'accept': 'application/json, text/plain, */*',
        'app-token': creds['app-token'],
        'user-id': creds['auth_id'],
        'x-bc': creds['x-bc'],
        'referer': 'https://onlyfans.com',
        'user-agent': creds['user_agent'],
    }
96
97
98
def add_cookies(client):
    """Set the session cookies from the current profile's auth file on *client*.

    Always sets ``sess`` and ``auth_id``; additionally sets
    ``auth_uid_<auth_id>`` when the stored ``auth_uid_`` value is non-empty.

    Args:
        client: Object exposing ``cookies.set(name, value, domain=...)``.
    """
    profile_name = get_current_profile()
    auth_path = pathlib.Path.home() / configPath / profile_name / authFile
    with open(auth_path, 'r') as fh:
        stored = json.load(fh)

    creds = stored['auth']
    cookie_domain = 'onlyfans.com'
    # The uid cookie's name embeds the account id.
    uid_cookie_name = f"auth_uid_{creds['auth_id']}"

    for name in ('sess', 'auth_id'):
        client.cookies.set(name, creds[name], domain=cookie_domain)

    uid_value = creds['auth_uid_']
    if uid_value:
        client.cookies.set(uid_cookie_name, uid_value, domain=cookie_domain)
113
114
115
def create_sign(link, headers):
    """Add ``sign`` and ``time`` entries to *headers* for a request to *link*.

    credit: DC and hippothon

    The signature is derived from the rules returned by
    ``read_request_auth()``: a SHA-1 hex digest over the static parameter,
    a millisecond timestamp, the URL path (plus query string, if any) and the
    ``user-id`` header, combined with a checksum of selected digest bytes.

    Args:
        link: URL of the request being signed.
        headers: Mapping containing ``'user-id'``; it is updated in place.

    Returns:
        The same *headers* object, now including ``'sign'`` and ``'time'``.
    """
    rules = read_request_auth()

    timestamp = str(round(time.time() * 1000))

    parsed = urlparse(link)
    target = f"{parsed.path}?{parsed.query}" if parsed.query else parsed.path

    pieces = [rules['static_param'], timestamp, target, headers['user-id']]
    digest_hex = hashlib.sha1("".join(pieces).encode("utf-8")).hexdigest()
    digest_bytes = digest_hex.encode("ascii")

    # Checksum: byte values of the hex digest at the configured positions,
    # offset by the configured constant.
    positions = rules['checksum_indexes']
    offset = rules['checksum_constant']
    checksum = sum(digest_bytes[pos] for pos in positions) + offset

    signature = rules['format'].format(digest_hex, abs(checksum))

    headers.update({'sign': signature, 'time': timestamp})
    return headers
151
152
153
def read_request_auth() -> dict:
    """Return the JSON content of the current profile's request-auth file."""
    profile_name = get_current_profile()
    rules_path = pathlib.Path.home() / configPath / profile_name / requestAuth
    with open(rules_path, 'r') as fh:
        return json.load(fh)
159
160
161
def make_request_auth():
    """Fetch the request-signing rules and save them in the profile directory.

    The values returned by ``get_request_auth()`` are written as JSON to the
    current profile's request-auth file, creating the profile directory if
    needed.

    If the fetch yields nothing (``get_request_auth()`` returns an empty
    result) and a request-auth file already exists, that file is left
    untouched: replacing previously saved rules with blank placeholders would
    make every later signature empty. Blank placeholders are only written
    when no file exists yet, so that ``read_request_auth()`` still finds a
    file to open.
    """
    # Order matches the tuple returned by get_request_auth().
    keys = ('static_param', 'format', 'checksum_indexes', 'checksum_constant')

    fetched = get_request_auth()

    profile = get_current_profile()
    profile_dir = pathlib.Path.home() / configPath / profile
    profile_dir.mkdir(parents=True, exist_ok=True)
    target = profile_dir / requestAuth

    if not fetched and target.is_file():
        # Fetch failed: keep the rules saved by an earlier successful run.
        return

    request_auth = {
        'static_param': '',
        'format': '',
        'checksum_indexes': [],
        'checksum_constant': 0,
    }
    if fetched:
        request_auth.update(zip(keys, fetched))

    with open(target, 'w') as f:
        f.write(json.dumps(request_auth, indent=4))
184
185
186
def get_request_auth():
    """Download the request-signing rules from ``DC_EP``.

    Returns:
        A tuple ``(static_param, format, checksum_indexes,
        checksum_constant)`` taken from the JSON response, or an empty list
        when the response reports an error status.
    """
    wanted = ('static_param', 'format', 'checksum_indexes', 'checksum_constant')

    with httpx.Client(http2=True) as client:
        response = client.get(DC_EP)
        if response.is_error:
            return []

        payload = response.json()
        return tuple(payload[key] for key in wanted)
198
199