Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
Der-Henning
GitHub Repository: Der-Henning/tgtg
Path: blob/main/tgtg_scanner/tgtg/tgtg_client.py
1494 views
1
# Copied and modified from https://github.com/ahivert/tgtg-python
2
3
import html
4
import json
5
import logging
6
import random
7
import re
8
import threading
9
import time
10
import uuid
11
from datetime import datetime
12
from http import HTTPStatus
13
from http.server import BaseHTTPRequestHandler, HTTPServer
14
from urllib.parse import parse_qs, urljoin, urlparse, urlsplit
15
16
import requests
17
from requests.adapters import HTTPAdapter
18
from urllib3.util import Retry
19
20
from tgtg_scanner.errors import (
21
TgtgAPIError,
22
TgtgConfigurationError,
23
TgtgLoginError,
24
TgtgPollingError,
25
)
26
27
log = logging.getLogger("tgtg")
28
BASE_URL = "https://apptoogoodtogo.com/api/"
29
API_ITEM_ENDPOINT = "item/v8/"
30
FAVORITE_ITEM_ENDPOINT = "user/favorite/v1/{}/update"
31
AUTH_BY_EMAIL_ENDPOINT = "auth/v5/authByEmail"
32
AUTH_BY_REQUEST_PIN_ENDPOINT = "auth/v5/authByRequestPin"
33
AUTH_POLLING_ENDPOINT = "auth/v5/authByRequestPollingId"
34
SIGNUP_BY_EMAIL_ENDPOINT = "auth/v5/signUpByEmail"
35
REFRESH_ENDPOINT = "token/v1/refresh"
36
ACTIVE_ORDER_ENDPOINT = "order/v8/active"
37
INACTIVE_ORDER_ENDPOINT = "order/v8/inactive"
38
CREATE_ORDER_ENDPOINT = "order/v8/create/"
39
ABORT_ORDER_ENDPOINT = "order/v8/{}/abort"
40
ORDER_STATUS_ENDPOINT = "order/v8/{}/status"
41
API_BUCKET_ENDPOINT = "discover/v1/bucket"
42
MANUFACTURERITEM_ENDPOINT = "manufactureritem/v2/"
43
USER_AGENTS = [
44
"TGTG/{} Dalvik/2.1.0 (Linux; U; Android 9; Nexus 5 Build/M4B30Z)",
45
"TGTG/{} Dalvik/2.1.0 (Linux; U; Android 10; SM-G935F Build/NRD90M)",
46
"TGTG/{} Dalvik/2.1.0 (Linux; Android 12; SM-G920V Build/MMB29K)",
47
]
48
DEFAULT_ACCESS_TOKEN_LIFETIME = 3600 * 4 # 4 hours
49
DEFAULT_MAX_POLLING_TRIES = 24 # 24 * POLLING_WAIT_TIME = 2 minutes
50
DEFAULT_POLLING_WAIT_TIME = 5 # Seconds
51
DEFAULT_MIN_TIME_BETWEEN_REQUESTS = 15 # Seconds
52
DEFAULT_APK_VERSION = "24.11.0"
53
54
APK_RE_SCRIPT = re.compile(r"AF_initDataCallback\({key:\s*'ds:5'.*?data:([\s\S]*?), sideChannel:.+<\/script")
55
56
57
class TgtgSession(requests.Session):
58
http_adapter = HTTPAdapter(
59
max_retries=Retry(
60
total=5,
61
status_forcelist=[429, 500, 502, 503, 504],
62
allowed_methods=["GET", "POST"],
63
backoff_factor=1,
64
)
65
)
66
67
correlation_id = str(uuid.uuid4())
68
69
last_api_request: datetime | None = None
70
71
# DataDome cookie cache
72
_datadome_cache_cookie: str | None = None
73
_datadome_cache_expires_at: float | None = None
74
_datadome_cache_duration_s: int = 5 * 60 # 5 minutes
75
76
def __init__(
77
self,
78
user_agent: str | None = None,
79
apk_version: str | None = None,
80
language: str = "en-UK",
81
timeout: int | None = None,
82
proxies: dict | None = None,
83
base_url: str = BASE_URL,
84
*args,
85
**kwargs,
86
) -> None:
87
super().__init__(*args, **kwargs)
88
self.mount("https://", self.http_adapter)
89
self.mount("http://", self.http_adapter)
90
self.headers = {
91
"Accept-Language": language,
92
"Accept": "application/json",
93
"Content-Type": "application/json; charset=utf-8",
94
"Accept-Encoding": "gzip",
95
"x-correlation-id": self.correlation_id,
96
}
97
if user_agent:
98
self.headers["User-Agent"] = user_agent
99
self.timeout = timeout
100
self.apk_version = apk_version
101
self.user_agent = user_agent
102
if proxies:
103
self.proxies = proxies
104
self._base_url = base_url
105
106
def send(self, request: requests.PreparedRequest, *args, **kwargs) -> requests.Response:
107
if self.last_api_request:
108
wait = max(0, DEFAULT_MIN_TIME_BETWEEN_REQUESTS - (datetime.now() - self.last_api_request).seconds)
109
log.debug(f"Waiting {wait} seconds.")
110
time.sleep(wait)
111
112
response = super().send(request, *args, **kwargs)
113
self.last_api_request = datetime.now()
114
return response
115
116
def post(self, *args, access_token: str | None = None, **kwargs) -> requests.Response:
117
if "headers" not in kwargs:
118
kwargs["headers"] = self.headers
119
if access_token:
120
kwargs["headers"]["authorization"] = f"Bearer {access_token}"
121
return super().post(*args, **kwargs)
122
123
def request(self, method, url, **kwargs):
124
time.sleep(1)
125
for key in ["timeout", "proxies"]:
126
val = kwargs.get(key)
127
if val is None and hasattr(self, key):
128
kwargs[key] = getattr(self, key)
129
# Ensure DataDome cookie exists BEFORE request gets prepared (so Cookie header includes it)
130
try:
131
self._ensure_datadome_cookie_for_url(url, headers=kwargs.get("headers"))
132
except Exception as e:
133
log.debug("DataDome auto-fetch failed (continuing without): %s", e)
134
return super().request(method, url, **kwargs)
135
136
def _ensure_datadome_cookie_for_url(self, url: str, headers: dict | None = None) -> None:
137
# If caller already set Cookie header with datadome, do nothing
138
if headers:
139
ch = headers.get("Cookie") or headers.get("cookie")
140
if ch and "datadome=" in ch:
141
return
142
if self._datadome_cache_valid() or ("datadome" in self.cookies):
143
return
144
145
cid = self._generate_datadome_cid()
146
dd = self._fetch_datadome_cookie(request_url=url, cid=cid)
147
if dd:
148
self._set_datadome_cookie_value(dd)
149
150
def _datadome_cache_valid(self) -> bool:
151
if not self._datadome_cache_cookie or not self._datadome_cache_expires_at:
152
return False
153
return time.time() < self._datadome_cache_expires_at
154
155
@staticmethod
156
def _generate_datadome_cid() -> str:
157
chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789~_"
158
return "".join(random.choice(chars) for _ in range(120))
159
160
def _set_datadome_cookie_value(self, cookie_value: str) -> None:
161
domain = urlsplit(self._base_url).hostname
162
domain = f".{'local' if domain == 'localhost' else domain}"
163
self.cookies.set("datadome", cookie_value, domain=domain, path="/", secure=True)
164
self._datadome_cache_cookie = cookie_value
165
self._datadome_cache_expires_at = time.time() + self._datadome_cache_duration_s
166
167
def invalidate_datadome_cache(self) -> None:
168
self._datadome_cache_cookie = None
169
self._datadome_cache_expires_at = None
170
# also remove cookie from jar (best-effort)
171
try:
172
if "datadome" in self.cookies:
173
del self.cookies["datadome"]
174
except Exception:
175
pass
176
177
def _ensure_datadome_cookie(self, request) -> None:
178
# If request already has a Cookie header containing datadome, do nothing
179
cookie_header = request.headers.get("Cookie") or request.headers.get("cookie")
180
if cookie_header and "datadome=" in cookie_header:
181
return
182
# If cookie jar already contains datadome and it's fresh enough, do nothing
183
if self._datadome_cache_valid():
184
if "datadome" not in self.cookies and self._datadome_cache_cookie:
185
self._set_datadome_cookie_value(self._datadome_cache_cookie)
186
return
187
if "datadome" in self.cookies:
188
# cache it (even if we didn't fetch it ourselves)
189
self._datadome_cache_cookie = self.cookies.get("datadome")
190
self._datadome_cache_expires_at = time.time() + self._datadome_cache_duration_s
191
return
192
193
# Fetch a new DataDome cookie from the SDK endpoint
194
request_url = request.url
195
cid = self._generate_datadome_cid()
196
datadome_cookie_value = self._fetch_datadome_cookie(
197
request_url=str(request_url),
198
cid=cid,
199
)
200
if datadome_cookie_value:
201
self._set_datadome_cookie_value(datadome_cookie_value)
202
203
def _fetch_datadome_cookie(self, request_url: str, cid: str) -> str | None:
204
params = {
205
"camera": '{"auth":"true", "info":"{\\"front\\":\\"2000x1500\\",\\"back\\":\\"5472x3648\\"}"}',
206
"cid": cid,
207
"ddk": "1D42C2CA6131C526E09F294FE96F94",
208
"ddv": "3.0.4",
209
"ddvc": self.apk_version,
210
"events": '[{"id":1,"message":"response validation","source":"sdk","date":' + str(int(time.time() * 1000)) + "}]",
211
"inte": "android-java-okhttp",
212
"mdl": "Pixel 7 Pro",
213
"os": "Android",
214
"osn": "UPSIDE_DOWN_CAKE",
215
"osr": "14",
216
"osv": "34",
217
"request": request_url,
218
"screen_d": "3.5",
219
"screen_x": "1440",
220
"screen_y": "3120",
221
"ua": self.user_agent,
222
}
223
url = "https://api-sdk.datadome.co/sdk/"
224
try:
225
r = requests.post(
226
url,
227
data=params,
228
headers={
229
"Content-Type": "application/x-www-form-urlencoded",
230
"Accept": "*/*",
231
"User-Agent": self.user_agent,
232
"Accept-Encoding": "gzip, deflate, br",
233
},
234
timeout=10,
235
)
236
r.raise_for_status()
237
data = r.json()
238
if data.get("status") == 200 and data.get("cookie"):
239
m = re.search(r"datadome=([^;]+)", data["cookie"])
240
if m:
241
return m.group(1) # store raw value; cookie jar will format header
242
except Exception as e:
243
log.debug("Error fetching DataDome cookie: %s", e)
244
return None
245
246
247
class TgtgClient:
248
def __init__(
249
self,
250
base_url=BASE_URL,
251
email=None,
252
access_token=None,
253
refresh_token=None,
254
datadome_cookie=None,
255
apk_version=None,
256
user_agent=None,
257
language="en-GB",
258
proxies=None,
259
timeout=None,
260
port=0,
261
access_token_lifetime=DEFAULT_ACCESS_TOKEN_LIFETIME,
262
max_polling_tries=DEFAULT_MAX_POLLING_TRIES,
263
polling_wait_time=DEFAULT_POLLING_WAIT_TIME,
264
device_type="ANDROID",
265
):
266
if base_url != BASE_URL:
267
log.warning("Using custom tgtg base url: %s", base_url)
268
269
self.base_url = base_url
270
271
self.email = email
272
self.access_token = access_token
273
self.refresh_token = refresh_token
274
self.datadome_cookie = datadome_cookie
275
276
self.last_time_token_refreshed = None
277
self.access_token_lifetime = access_token_lifetime
278
self.max_polling_tries = max_polling_tries
279
self.polling_wait_time = polling_wait_time
280
281
self.device_type = device_type
282
self.apk_version = apk_version
283
self.fixed_user_agent = user_agent
284
self.user_agent = user_agent
285
self.language = language
286
self.proxies = proxies
287
self.timeout = timeout
288
self.session = None
289
self.port = port
290
291
self.captcha_error_count = 0
292
293
def __del__(self) -> None:
294
if self.session:
295
self.session.close()
296
297
def _get_url(self, path) -> str:
298
return urljoin(self.base_url, path)
299
300
def _create_session(self) -> TgtgSession:
301
if not self.user_agent:
302
self.user_agent = self._get_user_agent()
303
return TgtgSession(
304
self.user_agent,
305
self.apk_version,
306
self.language,
307
self.timeout,
308
self.proxies,
309
self.base_url,
310
)
311
312
def get_credentials(self) -> dict:
313
"""Returns current tgtg api credentials.
314
315
Returns:
316
dict: Dictionary containing access token, refresh token and user id
317
318
"""
319
self.login()
320
return {
321
"email": self.email,
322
"access_token": self.access_token,
323
"refresh_token": self.refresh_token,
324
"datadome_cookie": self.datadome_cookie,
325
}
326
327
def _post(self, path, **kwargs) -> requests.Response:
328
if not self.session:
329
self.session = self._create_session()
330
response = self.session.post(
331
self._get_url(path),
332
access_token=self.access_token,
333
**kwargs,
334
)
335
self.datadome_cookie = self.session.cookies.get("datadome")
336
if response.status_code in (HTTPStatus.OK, HTTPStatus.ACCEPTED):
337
self.captcha_error_count = 0
338
return response
339
# Status Code == 403
340
# --> Blocked due to rate limit / wrong user_agent.
341
# 1. Try: Get latest APK Version from google
342
# 2. Try: Reset session
343
# 3. Try: Delete datadome cookie and reset session
344
# 10.Try: Sleep 10 minutes, and reset session
345
if response.status_code == 403:
346
log.debug("Captcha Error 403!")
347
self.captcha_error_count += 1
348
# If we had a DataDome cookie, invalidate it so the next retry fetches a new one
349
if self.session:
350
self.session.invalidate_datadome_cache()
351
time.sleep(0.5)
352
if self.captcha_error_count == 1:
353
self.user_agent = self._get_user_agent()
354
elif self.captcha_error_count == 2:
355
self.session = self._create_session()
356
elif self.captcha_error_count == 4:
357
self.datadome_cookie = None
358
self.session = self._create_session()
359
elif self.captcha_error_count >= 10:
360
log.warning("Too many captcha Errors! Sleeping for 10 minutes...")
361
time.sleep(10 * 60)
362
log.info("Retrying ...")
363
self.captcha_error_count = 0
364
self.session = self._create_session()
365
time.sleep(3)
366
return self._post(path, **kwargs)
367
raise TgtgAPIError(response.status_code, response.content)
368
369
def _get_user_agent(self) -> str:
370
if self.fixed_user_agent:
371
return self.fixed_user_agent
372
version = DEFAULT_APK_VERSION
373
if self.apk_version is None:
374
try:
375
version = self.get_latest_apk_version()
376
except Exception:
377
log.warning("Failed to get latest APK version!")
378
else:
379
version = self.apk_version
380
log.debug("Using APK version %s.", version)
381
return random.choice(USER_AGENTS).format(version)
382
383
@staticmethod
384
def get_latest_apk_version() -> str:
385
"""Returns latest APK version of the official Android TGTG App.
386
387
Returns:
388
str: APK Version string
389
390
"""
391
response = requests.get(
392
"https://play.google.com/store/apps/details?id=com.app.tgtg&hl=en&gl=US",
393
timeout=30,
394
)
395
match = APK_RE_SCRIPT.search(response.text)
396
if not match:
397
raise TgtgAPIError("Failed to get latest APK version from Google Play Store.")
398
data = json.loads(match.group(1))
399
return data[1][2][140][0][0][0]
400
401
@property
402
def _already_logged(self) -> bool:
403
return bool(self.access_token and self.refresh_token)
404
405
def _refresh_token(self) -> None:
406
if (
407
self.last_time_token_refreshed
408
and (datetime.now() - self.last_time_token_refreshed).seconds <= self.access_token_lifetime
409
):
410
return
411
response = self._post(REFRESH_ENDPOINT, json={"refresh_token": self.refresh_token})
412
self.access_token = response.json().get("access_token")
413
self.refresh_token = response.json().get("refresh_token")
414
self.last_time_token_refreshed = datetime.now()
415
416
def login(self) -> None:
417
if not (self.email or self.access_token and self.refresh_token):
418
raise TgtgConfigurationError("You must provide at least email or access_token and refresh_token")
419
if self._already_logged:
420
self._refresh_token()
421
else:
422
log.info("Starting login process ...")
423
response = self._post(
424
AUTH_BY_EMAIL_ENDPOINT,
425
json={
426
"device_type": self.device_type,
427
"email": self.email,
428
},
429
)
430
first_login_response = response.json()
431
if first_login_response["state"] == "TERMS":
432
raise TgtgPollingError(
433
f"This email {self.email} is not linked to a tgtg account. Please signup with this email first."
434
)
435
if first_login_response.get("state") == "WAIT":
436
pin = prompt_via_browser("Paste your pin:", title="Pin Input", port=self.port)
437
self.start_polling(first_login_response.get("polling_id"), pin)
438
else:
439
raise TgtgLoginError(response.status_code, response.content)
440
441
def auth_by_request_pin(self, polling_id: str, pin: str) -> None:
442
"""Finish login using numeric code (PIN) from email, via authByRequestPin.
443
444
Mirrors node-toogoodtogo-watcher PR #282 behavior. :contentReference[oaicite:11]{index=11}
445
"""
446
response = self._post(
447
AUTH_BY_REQUEST_PIN_ENDPOINT,
448
json={
449
"device_type": self.device_type,
450
"email": self.email,
451
"request_pin": pin,
452
"request_polling_id": polling_id,
453
},
454
)
455
if response.status_code == HTTPStatus.OK:
456
log.info("Logged in (PIN)!")
457
login_response = response.json()
458
self.access_token = login_response.get("access_token")
459
self.refresh_token = login_response.get("refresh_token")
460
self.last_time_token_refreshed = datetime.now()
461
return
462
raise TgtgLoginError(response.status_code, response.content)
463
464
def start_polling(self, polling_id: str, request_pin: str | None = None) -> None:
465
# If a pin is provided, do a single authByRequestPin call instead of polling.
466
if request_pin:
467
return self.auth_by_request_pin(polling_id, request_pin)
468
for _ in range(self.max_polling_tries):
469
response = self._post(
470
AUTH_POLLING_ENDPOINT,
471
json={
472
"device_type": self.device_type,
473
"email": self.email,
474
"request_polling_id": polling_id,
475
},
476
)
477
if response.status_code == HTTPStatus.ACCEPTED:
478
log.warning(
479
"Check your mailbox on PC to continue... (Mailbox on mobile won't work, if you have installed tgtg app.)"
480
)
481
time.sleep(self.polling_wait_time)
482
continue
483
if response.status_code == HTTPStatus.OK:
484
log.info("Logged in!")
485
login_response = response.json()
486
self.access_token = login_response.get("access_token")
487
self.refresh_token = login_response.get("refresh_token")
488
self.last_time_token_refreshed = datetime.now()
489
return
490
raise TgtgPollingError("Max polling retries reached. Try again.")
491
492
def get_items(
493
self,
494
*,
495
latitude=0.0,
496
longitude=0.0,
497
radius=21,
498
page_size=20,
499
page=1,
500
discover=False,
501
favorites_only=True,
502
item_categories=None,
503
diet_categories=None,
504
pickup_earliest=None,
505
pickup_latest=None,
506
search_phrase=None,
507
with_stock_only=False,
508
hidden_only=False,
509
we_care_only=False,
510
) -> list[dict]:
511
self.login()
512
# fields are sorted like in the app
513
data = {
514
"origin": {"latitude": latitude, "longitude": longitude},
515
"radius": radius,
516
"page_size": page_size,
517
"page": page,
518
"discover": discover,
519
"favorites_only": favorites_only,
520
"item_categories": item_categories if item_categories else [],
521
"diet_categories": diet_categories if diet_categories else [],
522
"pickup_earliest": pickup_earliest,
523
"pickup_latest": pickup_latest,
524
"search_phrase": search_phrase if search_phrase else None,
525
"with_stock_only": with_stock_only,
526
"hidden_only": hidden_only,
527
"we_care_only": we_care_only,
528
}
529
response = self._post(API_ITEM_ENDPOINT, json=data)
530
return response.json().get("items", [])
531
532
def get_item(self, item_id: str) -> dict:
533
self.login()
534
response = self._post(
535
f"{API_ITEM_ENDPOINT}/{item_id}",
536
json={"origin": None},
537
)
538
return response.json()
539
540
def get_favorites(self) -> list[dict]:
541
"""Returns favorites of the current tgtg account.
542
543
Returns:
544
List: List of items
545
546
"""
547
items = []
548
page = 1
549
page_size = 100
550
while True:
551
new_items = self.get_items(favorites_only=True, page_size=page_size, page=page)
552
items += new_items
553
if len(new_items) < page_size:
554
break
555
page += 1
556
return items
557
558
def set_favorite(self, item_id: str, is_favorite: bool) -> None:
559
self.login()
560
self._post(
561
FAVORITE_ITEM_ENDPOINT.format(item_id),
562
json={"is_favorite": is_favorite},
563
)
564
565
def create_order(self, item_id: str, item_count: int) -> dict[str, str]:
566
self.login()
567
response = self._post(f"{CREATE_ORDER_ENDPOINT}/{item_id}", json={"item_count": item_count})
568
if response.json().get("state") != "SUCCESS":
569
raise TgtgAPIError(response.status_code, response.content)
570
return response.json().get("order", {})
571
572
def get_order_status(self, order_id: str) -> dict[str, str]:
573
self.login()
574
response = self._post(ORDER_STATUS_ENDPOINT.format(order_id))
575
return response.json()
576
577
def abort_order(self, order_id: str) -> None:
578
"""Use this when your order is not yet paid."""
579
self.login()
580
response = self._post(ABORT_ORDER_ENDPOINT.format(order_id), json={"cancel_reason_id": 1})
581
if response.json().get("state") != "SUCCESS":
582
raise TgtgAPIError(response.status_code, response.content)
583
584
def get_manufactureritems(self) -> dict:
585
self.login()
586
response = self._post(
587
MANUFACTURERITEM_ENDPOINT,
588
json={
589
"action_types_accepted": ["QUERY"],
590
"display_types_accepted": ["LIST", "FILL"],
591
"element_types_accepted": [
592
"ITEM",
593
"HIGHLIGHTED_ITEM",
594
"MANUFACTURER_STORY_CARD",
595
"DUO_ITEMS",
596
"DUO_ITEMS_V2",
597
"TEXT",
598
"PARCEL_TEXT",
599
"NPS",
600
"SMALL_CARDS_CAROUSEL",
601
"ITEM_CARDS_CAROUSEL",
602
],
603
},
604
)
605
return response.json()
606
607
608
def prompt_via_browser(prompt="Enter value:", title="Input required", port=0):
609
done = threading.Event()
610
result = {"value": None}
611
612
class Handler(BaseHTTPRequestHandler):
613
def log_message(self, *args, **kwargs):
614
# silence default logging
615
pass
616
617
def _send(self, code, body, content_type="text/html; charset=utf-8"):
618
data = body.encode("utf-8")
619
self.send_response(code)
620
self.send_header("Content-Type", content_type)
621
self.send_header("Content-Length", str(len(data)))
622
self.end_headers()
623
self.wfile.write(data)
624
625
def do_GET(self):
626
if self.path.startswith("/submit"):
627
qs = parse_qs(urlparse(self.path).query)
628
result["value"] = (qs.get("value", [""])[0]).strip()
629
self._send(200, "<h3>You can close this tab.</h3>")
630
done.set()
631
return
632
633
page = f"""<!doctype html>
634
<html>
635
<head><meta charset="utf-8"><title>{html.escape(title)}</title></head>
636
<body style="font-family: system-ui; padding: 2rem;">
637
<h2>{html.escape(prompt)}</h2>
638
<form action="/submit" method="get">
639
<input name="value" autofocus style="font-size: 1.1rem; padding: .4rem; width: 28rem; max-width: 90vw;" />
640
<button type="submit" style="font-size: 1.1rem; padding: .4rem .8rem;">OK</button>
641
</form>
642
</body>
643
</html>"""
644
self._send(200, page)
645
646
# Bind to ephemeral port
647
server = HTTPServer(("0.0.0.0", port), Handler)
648
url = f"http://localhost:{server.server_port}/"
649
650
# Run server in background thread; stop after first submission
651
def serve_until_done():
652
while not done.is_set():
653
server.handle_request()
654
655
t = threading.Thread(target=serve_until_done, daemon=True)
656
t.start()
657
658
log.info(f"Enter Pin: {url}")
659
660
done.wait() # block until user submits
661
server.server_close()
662
return result["value"]
663
664