Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
mikf
GitHub Repository: mikf/gallery-dl
Path: blob/master/gallery_dl/cookies.py
5457 views
1
# -*- coding: utf-8 -*-
2
3
# Copyright 2022-2025 Mike Fährmann
4
#
5
# This program is free software; you can redistribute it and/or modify
6
# it under the terms of the GNU General Public License version 2 as
7
# published by the Free Software Foundation.
8
9
# Adapted from yt-dlp's cookies module.
10
# https://github.com/yt-dlp/yt-dlp/blob/master/yt_dlp/cookies.py
11
12
import binascii
13
import ctypes
14
import logging
15
import os
16
import shutil
17
import sqlite3
18
import struct
19
import subprocess
20
import sys
21
import tempfile
22
from hashlib import pbkdf2_hmac
23
from http.cookiejar import Cookie
24
from . import aes, text, util
25
26
27
# Browser names accepted by load_cookies(), grouped by the loader
# that handles them.
SUPPORTED_BROWSERS_FIREFOX = {"firefox", "librewolf", "zen"}
SUPPORTED_BROWSERS_CHROMIUM = {
    "brave",
    "chrome",
    "chromium",
    "edge",
    "opera",
    "thorium",
    "vivaldi",
}
SUPPORTED_BROWSERS = {
    "safari",
    *SUPPORTED_BROWSERS_CHROMIUM,
    *SUPPORTED_BROWSERS_FIREFOX,
}

logger = logging.getLogger("cookies")
34
35
36
def load_cookies(browser_specification):
    """Load cookies from the browser named in 'browser_specification'.

    The specification is unpacked into _parse_browser_specification(),
    whose result is (browser name, profile, keyring, container, domain).
    The call is then dispatched to the matching browser-specific loader.

    Raises ValueError if the browser name is in none of the supported sets.
    """
    spec = _parse_browser_specification(*browser_specification)
    browser_name, profile, keyring, container, domain = spec

    if browser_name in SUPPORTED_BROWSERS_FIREFOX:
        return load_cookies_firefox(browser_name, profile, container, domain)
    if browser_name == "safari":
        return load_cookies_safari(profile, domain)
    if browser_name in SUPPORTED_BROWSERS_CHROMIUM:
        return load_cookies_chromium(browser_name, profile, keyring, domain)
    raise ValueError(f"unknown browser '{browser_name}'")
47
48
49
def load_cookies_firefox(browser_name, profile=None,
                         container=None, domain=None):
    """Read cookies from a Firefox-style 'moz_cookies' SQLite table.

    'profile' and 'container' select the database and the container
    filter (see _firefox_cookies_database). A 'domain' with a leading
    dot also matches subdomains; without it only the host itself and
    its dotted form match.

    Returns a list of http.cookiejar.Cookie objects.
    """
    db_path, container_id = _firefox_cookies_database(
        browser_name, profile, container)

    where = []
    args = []

    # container_id is False -> only cookies outside of any container
    # container_id is None  -> no container filtering at all
    # anything else truthy  -> only cookies of that container
    if container_id is False:
        where.append("NOT INSTR(originAttributes,'userContextId=')")
    elif container_id:
        pattern = f"%userContextId={container_id}"
        where.append("originAttributes LIKE ? OR originAttributes LIKE ?")
        args.extend((pattern, pattern + "&%"))

    if domain:
        if domain[0] == ".":
            where.append("host == ? OR host LIKE ?")
            args.extend((domain[1:], "%" + domain))
        else:
            where.append("host == ? OR host == ?")
            args.extend((domain, "." + domain))

    query = ("SELECT name, value, host, path, isSecure, expiry "
             "FROM moz_cookies")
    if where:
        # each condition is parenthesized so its inner OR binds first
        query = f"{query} WHERE ( {' ) AND ( '.join(where)} )"

    cookies = []
    with DatabaseConnection(db_path) as db:
        for name, value, host, path, secure, expires in db.execute(
                query, args):
            cookies.append(Cookie(
                0, name, value, None, False,
                host, bool(host),
                host[0] == "." if host else False,
                path, bool(path), secure, expires,
                False, None, None, {},
            ))

    _log_info("Extracted %s cookies from %s",
              len(cookies), browser_name.capitalize())
    return cookies
93
94
95
def load_cookies_safari(profile=None, domain=None):
    """Read cookies from Safari's 'Cookies.binarycookies' file.

    Ref.: https://github.com/libyal/dtformats/blob
          /main/documentation/Safari%20Cookies.asciidoc
    - This data appears to be out of date,
      but the important parts of the database structure are the same
    - There are a few bytes here and there
      which are skipped during parsing

    'profile' is accepted for signature parity with the other loaders
    and is not used here. 'domain', when given, restricts the result to
    cookies whose host matches it (see _safari_parse_cookies_record).

    Returns a list of http.cookiejar.Cookie objects.
    """
    with _safari_cookies_database() as fp:
        data = fp.read()
    page_sizes, body_start = _safari_parse_cookies_header(data)
    p = DataParser(data[body_start:])

    cookies = []
    for page_size in page_sizes:
        # forward the domain filter; it was previously dropped here,
        # which made Safari return cookies for every host
        _safari_parse_cookies_page(
            p.read_bytes(page_size), cookies, domain)
    _log_info("Extracted %s cookies from Safari", len(cookies))
    return cookies
113
114
115
def load_cookies_chromium(browser_name, profile=None,
                          keyring=None, domain=None):
    """Read and decrypt cookies from a Chromium-style 'Cookies' database.

    'keyring' is handed to the platform decryptor factory. 'domain'
    filters on 'host_key'; a leading dot also matches subdomains.
    Cookies whose value cannot be decrypted are skipped and counted.

    Returns a list of http.cookiejar.Cookie objects.
    """
    config = _chromium_browser_settings(browser_name)
    path = _chromium_cookies_database(profile, config)
    _log_debug("Extracting cookies from %s", path)

    if not domain:
        where = ""
        args = ()
    elif domain[0] == ".":
        where = " WHERE host_key == ? OR host_key LIKE ?"
        args = (domain[1:], "%" + domain)
    else:
        where = " WHERE host_key == ? OR host_key == ?"
        args = (domain, "." + domain)

    select = ("SELECT host_key, name, value, encrypted_value, path, "
              "expires_utc, ")

    with DatabaseConnection(path) as db:
        # keep every column as raw bytes; decoding is done explicitly below
        db.text_factory = bytes
        cursor = db.cursor()

        try:
            version_row = cursor.execute(
                "SELECT value FROM meta WHERE key = 'version'").fetchone()
            meta_version = int(version_row[0])
        except Exception as exc:
            _log_warning("Failed to get cookie database meta version (%s: %s)",
                         exc.__class__.__name__, exc)
            meta_version = 0

        # fall back to the 'secure' column when 'is_secure' is rejected
        try:
            rows = cursor.execute(
                select + "is_secure FROM cookies" + where, args)
        except sqlite3.OperationalError:
            rows = cursor.execute(
                select + "secure FROM cookies" + where, args)

        decryptor = _chromium_cookie_decryptor(
            config["directory"], config["keyring"], keyring, meta_version)
        failed_cookies = 0
        unencrypted_cookies = 0

        cookies = []
        for host, name, value, enc_value, cpath, expires, secure in rows:

            if value or not enc_value:
                # stored in plain text
                value = value.decode()
                unencrypted_cookies += 1
            else:
                value = decryptor.decrypt(enc_value)
                if value is None:
                    failed_cookies += 1
                    continue

            if expires:
                # https://stackoverflow.com/a/43520042
                expires = int(expires) // 1000000 - 11644473600
            else:
                expires = None

            host = host.decode()
            cpath = cpath.decode()
            name = name.decode()

            cookies.append(Cookie(
                0, name, value, None, False,
                host, bool(host),
                host[0] == "." if host else False,
                cpath, bool(cpath), secure, expires,
                False, None, None, {},
            ))

    failed_message = ""
    if failed_cookies > 0:
        failed_message = f" ({failed_cookies} could not be decrypted)"

    _log_info("Extracted %s cookies from %s%s",
              len(cookies), browser_name.capitalize(), failed_message)
    counts = decryptor.cookie_counts
    counts["unencrypted"] = unencrypted_cookies
    _log_debug("version breakdown: %s", counts)
    return cookies
199
200
201
# --------------------------------------------------------------------
202
# firefox
203
204
def _firefox_cookies_database(browser_name, profile=None, container=None):
    """Locate 'cookies.sqlite' and resolve the container selection.

    Returns a tuple (path, container_id) where container_id is
    - False for no container or "none" (cookies outside any container),
    - None for "all" (no container filtering),
    - the 'userContextId' of the container matching 'container' otherwise.

    Raises FileNotFoundError if no database is found, OSError if
    'containers.json' cannot be read, and ValueError if the requested
    container does not exist.
    """
    if not profile:
        search_root = _firefox_browser_directory(browser_name)
    elif _is_path(profile):
        search_root = profile
    else:
        search_root = os.path.join(
            _firefox_browser_directory(browser_name), profile)

    path = _find_most_recently_used_file(search_root, "cookies.sqlite")
    if path is None:
        raise FileNotFoundError(f"Unable to find {browser_name.capitalize()} "
                                f"cookies database in {search_root}")

    _log_debug("Extracting cookies from %s", path)

    if not container or container == "none":
        _log_debug("Only loading cookies not belonging to any container")
        return path, False

    if container == "all":
        return path, None

    # container definitions are stored next to the cookie database
    containers_path = os.path.join(
        os.path.dirname(path), "containers.json")

    try:
        with open(containers_path) as fp:
            identities = util.json_loads(fp.read())["identities"]
    except OSError:
        _log_error("Unable to read Firefox container database at '%s'",
                   containers_path)
        raise
    except KeyError:
        identities = ()

    # match either the plain name or the name embedded in 'l10nID'
    for identity in identities:
        if container == identity.get("name"):
            break
        label = text.extr(
            identity.get("l10nID", ""), "userContext", ".label")
        if container == label:
            break
    else:
        raise ValueError(f"Unable to find Firefox container '{container}'")

    container_id = identity["userContextId"]
    _log_debug("Only loading cookies from container '%s' (ID %s)",
               container, container_id)
    return path, container_id
252
253
254
def _firefox_browser_directory(browser_name):
255
join = os.path.join
256
257
if sys.platform in ("win32", "cygwin"):
258
appdata = os.path.expandvars("%APPDATA%")
259
return {
260
"firefox" : join(appdata, R"Mozilla\Firefox\Profiles"),
261
"librewolf": join(appdata, R"librewolf\Profiles"),
262
"zen" : join(appdata, R"zen\Profiles"),
263
}[browser_name]
264
elif sys.platform == "darwin":
265
appdata = os.path.expanduser("~/Library/Application Support")
266
return {
267
"firefox" : join(appdata, R"Firefox/Profiles"),
268
"librewolf": join(appdata, R"librewolf/Profiles"),
269
"zen" : join(appdata, R"zen/Profiles"),
270
}[browser_name]
271
else:
272
home = os.path.expanduser("~")
273
return {
274
"firefox" : join(home, R".mozilla/firefox"),
275
"librewolf": join(home, R".librewolf"),
276
"zen" : join(home, R".zen"),
277
}[browser_name]
278
279
280
# --------------------------------------------------------------------
281
# safari
282
283
def _safari_cookies_database():
    """Open Safari's binary cookie file and return the file object.

    The path under '~/Library/Cookies' is tried first; if it does not
    exist, the path inside the Safari container directory is opened
    instead. A failure to open the second path propagates to the caller.
    """
    primary = os.path.expanduser("~/Library/Cookies/Cookies.binarycookies")
    secondary = os.path.expanduser(
        "~/Library/Containers/com.apple.Safari/Data"
        "/Library/Cookies/Cookies.binarycookies")

    try:
        return open(primary, "rb")
    except FileNotFoundError:
        _log_debug("Trying secondary cookie location")
        return open(secondary, "rb")
292
293
294
def _safari_parse_cookies_header(data):
    """Parse the file header of a Safari cookie database.

    Expects the b"cook" signature, a big-endian page count and one
    big-endian size per page.

    Returns (list of page sizes, offset of the first byte after the header).
    """
    parser = DataParser(data)
    parser.expect_bytes(b"cook", "database signature")
    page_count = parser.read_uint(big_endian=True)

    page_sizes = []
    for _ in range(page_count):
        page_sizes.append(parser.read_uint(big_endian=True))
    return page_sizes, parser.cursor
301
302
303
def _safari_parse_cookies_page(data, cookies, domain=None):
    """Parse one page of a Safari cookie database.

    Every record found in 'data' is passed to
    _safari_parse_cookies_record(), which appends matching cookies to
    the 'cookies' list. 'domain' is forwarded as that function's host
    filter.
    """
    parser = DataParser(data)
    parser.expect_bytes(b"\x00\x00\x01\x00", "page signature")
    count = parser.read_uint()
    offsets = [parser.read_uint() for _ in range(count)]

    if not offsets:
        _log_debug("Cookies page of size %s has no cookies", len(data))
        return

    parser.skip_to(offsets[0], "unknown page header field")

    for offset in offsets:
        parser.skip_to(offset, "space between records")
        length = _safari_parse_cookies_record(
            data[offset:], cookies, domain)
        # advance past the record that was just parsed from its own slice
        parser.read_bytes(length)
    parser.skip_to_end("space in between pages")
320
321
322
def _safari_parse_cookies_record(data, cookies, host=None):
    """Parse a single cookie record and append it to 'cookies'.

    'data' starts at the first byte of the record. When 'host' is given,
    records for other domains are skipped; a leading dot in 'host' also
    accepts subdomains. Records containing undecodable strings are
    skipped with a warning.

    Returns the record size read from the record header, whether or
    not a cookie was appended.
    """
    parser = DataParser(data)
    record_size = parser.read_uint()
    parser.skip(4, "unknown record field 1")
    flags = parser.read_uint()
    is_secure = bool(flags & 0x0001)
    parser.skip(4, "unknown record field 2")
    domain_offset = parser.read_uint()
    name_offset = parser.read_uint()
    path_offset = parser.read_uint()
    value_offset = parser.read_uint()
    parser.skip(8, "unknown record field 3")
    expiration_date = _mac_absolute_time_to_posix(parser.read_double())
    # creation date: read and converted like the original, but not used
    _mac_absolute_time_to_posix(parser.read_double())

    try:
        parser.skip_to(domain_offset)
        domain = parser.read_cstring()

        if host:
            if host[0] == ".":
                wanted = (host[1:] == domain or domain.endswith(host))
            else:
                wanted = (host == domain or ("." + host) == domain)
            if not wanted:
                return record_size

        parser.skip_to(name_offset)
        name = parser.read_cstring()

        parser.skip_to(path_offset)
        path = parser.read_cstring()

        parser.skip_to(value_offset)
        value = parser.read_cstring()
    except UnicodeDecodeError:
        _log_warning("Failed to parse Safari cookie")
        return record_size

    parser.skip_to(record_size, "space at the end of the record")

    cookies.append(Cookie(
        0, name, value, None, False,
        domain, bool(domain),
        domain[0] == "." if domain else False,
        path, bool(path), is_secure, expiration_date,
        False, None, None, {},
    ))

    return record_size
372
373
374
# --------------------------------------------------------------------
375
# chromium
376
377
def _chromium_cookies_database(profile, config):
    """Return the path of the most recently used 'Cookies' database.

    'config' is the dict built by _chromium_browser_settings(). When
    'profile' is a filesystem path, config["directory"] is overwritten:
    with the profile's parent directory for browsers with profile
    support, with the profile path itself otherwise.

    Raises FileNotFoundError if no database is found.
    """
    if profile is None:
        search_root = config["directory"]
    elif _is_path(profile):
        search_root = profile
        if config["profiles"]:
            config["directory"] = os.path.dirname(profile)
        else:
            config["directory"] = profile
    elif config["profiles"]:
        search_root = os.path.join(config["directory"], profile)
    else:
        _log_warning("%s does not support profiles", config["browser"])
        search_root = config["directory"]

    path = _find_most_recently_used_file(search_root, "Cookies")
    if path is not None:
        return path
    raise FileNotFoundError(f"Unable to find {config['browser']} cookies "
                            f"database in '{search_root}'")
395
396
397
def _chromium_browser_settings(browser_name):
398
# https://chromium.googlesource.com/chromium
399
# /src/+/HEAD/docs/user_data_dir.md
400
join = os.path.join
401
402
if sys.platform in ("win32", "cygwin"):
403
appdata_local = os.path.expandvars("%LOCALAPPDATA%")
404
appdata_roaming = os.path.expandvars("%APPDATA%")
405
browser_dir = {
406
"brave" : join(appdata_local,
407
R"BraveSoftware\Brave-Browser\User Data"),
408
"chrome" : join(appdata_local, R"Google\Chrome\User Data"),
409
"chromium": join(appdata_local, R"Chromium\User Data"),
410
"edge" : join(appdata_local, R"Microsoft\Edge\User Data"),
411
"opera" : join(appdata_roaming, R"Opera Software\Opera Stable"),
412
"thorium" : join(appdata_local, R"Thorium\User Data"),
413
"vivaldi" : join(appdata_local, R"Vivaldi\User Data"),
414
}[browser_name]
415
416
elif sys.platform == "darwin":
417
appdata = os.path.expanduser("~/Library/Application Support")
418
browser_dir = {
419
"brave" : join(appdata, "BraveSoftware/Brave-Browser"),
420
"chrome" : join(appdata, "Google/Chrome"),
421
"chromium": join(appdata, "Chromium"),
422
"edge" : join(appdata, "Microsoft Edge"),
423
"opera" : join(appdata, "com.operasoftware.Opera"),
424
"thorium" : join(appdata, "Thorium"),
425
"vivaldi" : join(appdata, "Vivaldi"),
426
}[browser_name]
427
428
else:
429
config = (os.environ.get("XDG_CONFIG_HOME") or
430
os.path.expanduser("~/.config"))
431
browser_dir = {
432
"brave" : join(config, "BraveSoftware/Brave-Browser"),
433
"chrome" : join(config, "google-chrome"),
434
"chromium": join(config, "chromium"),
435
"edge" : join(config, "microsoft-edge"),
436
"opera" : join(config, "opera"),
437
"thorium" : join(config, "Thorium"),
438
"vivaldi" : join(config, "vivaldi"),
439
}[browser_name]
440
441
# Linux keyring names can be determined by snooping on dbus
442
# while opening the browser in KDE:
443
# dbus-monitor "interface="org.kde.KWallet"" "type=method_return"
444
keyring_name = {
445
"brave" : "Brave",
446
"chrome" : "Chrome",
447
"chromium": "Chromium",
448
"edge" : "Microsoft Edge" if sys.platform == "darwin" else
449
"Chromium",
450
"opera" : "Opera" if sys.platform == "darwin" else "Chromium",
451
"thorium" : "Thorium",
452
"vivaldi" : "Vivaldi" if sys.platform == "darwin" else "Chrome",
453
}[browser_name]
454
455
browsers_without_profiles = {"opera"}
456
457
return {
458
"browser" : browser_name,
459
"directory": browser_dir,
460
"keyring" : keyring_name,
461
"profiles" : browser_name not in browsers_without_profiles
462
}
463
464
465
def _chromium_cookie_decryptor(
        browser_root, browser_keyring_name, keyring=None, meta_version=0):
    """Create the cookie decryptor matching the current platform.

    Windows uses 'browser_root', macOS uses 'browser_keyring_name',
    and every other platform uses 'browser_keyring_name' together with
    'keyring'. 'meta_version' is passed through in all cases.
    """
    platform = sys.platform

    if platform in ("win32", "cygwin"):
        return WindowsChromiumCookieDecryptor(browser_root, meta_version)
    if platform == "darwin":
        return MacChromiumCookieDecryptor(browser_keyring_name, meta_version)
    return LinuxChromiumCookieDecryptor(
        browser_keyring_name, keyring, meta_version)
476
477
478
class ChromiumCookieDecryptor:
    """Interface shared by the platform-specific cookie decryptors.

    Overview of the encryption schemes:

        Linux:
        - cookies are either v10 or v11
            - v10: AES-CBC encrypted with a fixed key
            - v11: AES-CBC encrypted with an OS protected key (keyring)
            - v11 keys can be stored in various places depending on the
              active desktop environment [2]

        Mac:
        - cookies are either v10 or not v10
            - v10: AES-CBC encrypted with an OS protected key (keyring)
              and more key derivation iterations than linux
            - not v10: "old data" stored as plaintext

        Windows:
        - cookies are either v10 or not v10
            - v10: AES-GCM encrypted with a key which is encrypted with DPAPI
            - not v10: encrypted with DPAPI

    Sources:
    - [1] https://chromium.googlesource.com/chromium/src/+/refs/heads
          /main/components/os_crypt/
    - [2] https://chromium.googlesource.com/chromium/src/+/refs/heads
          /main/components/os_crypt/key_storage_linux.cc
        - KeyStorageLinux::CreateService
    """

    def decrypt(self, encrypted_value):
        """Decrypt one 'encrypted_value'; provided by each subclass."""
        raise NotImplementedError("Must be implemented by sub classes")

    @property
    def cookie_counts(self):
        """Per-version counters of processed cookies; see subclasses."""
        raise NotImplementedError("Must be implemented by sub classes")
514
515
516
class LinuxChromiumCookieDecryptor(ChromiumCookieDecryptor):
    """Decryptor for 'v10' and 'v11' AES-CBC cookie values."""

    def __init__(self, browser_keyring_name, keyring=None, meta_version=0):
        password = _get_linux_keyring_password(browser_keyring_name, keyring)
        # fallback key that is tried when the regular key fails
        self._empty_key = self.derive_key(b"")
        # v10 uses a key derived from a fixed password
        self._v10_key = self.derive_key(b"peanuts")
        if password is None:
            self._v11_key = None
        else:
            self._v11_key = self.derive_key(password)
        self._cookie_counts = {"v10": 0, "v11": 0, "other": 0}
        # number of leading plaintext bytes dropped after decryption
        self._offset = (32 if meta_version >= 24 else 0)

    def derive_key(self, password):
        """Derive a 16-byte AES key from 'password'."""
        # values from
        # https://chromium.googlesource.com/chromium/src/+/refs/heads
        #   /main/components/os_crypt/os_crypt_linux.cc
        return pbkdf2_sha1(password, salt=b"saltysalt",
                           iterations=1, key_length=16)

    @property
    def cookie_counts(self):
        return self._cookie_counts

    def decrypt(self, encrypted_value):
        """Return the decrypted value as str, or None on failure."""
        prefix, ciphertext = encrypted_value[:3], encrypted_value[3:]

        if prefix == b"v10":
            self._cookie_counts["v10"] += 1
            key = self._v10_key

        elif prefix == b"v11":
            self._cookie_counts["v11"] += 1
            key = self._v11_key
            if key is None:
                _log_warning("Unable to decrypt v11 cookies: no key found")
                return None

        else:
            self._cookie_counts["other"] += 1
            return None

        value = _decrypt_aes_cbc(ciphertext, key, self._offset)
        if value is not None:
            return value

        # second attempt with the key derived from an empty password
        value = _decrypt_aes_cbc(ciphertext, self._empty_key, self._offset)
        if value is None:
            _log_warning("Failed to decrypt cookie (AES-CBC)")
        return value
560
561
562
class MacChromiumCookieDecryptor(ChromiumCookieDecryptor):
    """Decryptor for 'v10' AES-CBC cookie values with a keychain password.

    Values with any other prefix are treated as plaintext.
    """

    def __init__(self, browser_keyring_name, meta_version=0):
        password = _get_mac_keyring_password(browser_keyring_name)
        self._v10_key = None if password is None else self.derive_key(password)
        self._cookie_counts = {"v10": 0, "other": 0}
        # number of leading plaintext bytes dropped after decryption
        self._offset = (32 if meta_version >= 24 else 0)

    def derive_key(self, password):
        """Derive a 16-byte AES key from 'password'."""
        # values from
        # https://chromium.googlesource.com/chromium/src/+/refs/heads
        #   /main/components/os_crypt/os_crypt_mac.mm
        return pbkdf2_sha1(password, salt=b"saltysalt",
                           iterations=1003, key_length=16)

    @property
    def cookie_counts(self):
        return self._cookie_counts

    def decrypt(self, encrypted_value):
        """Return the cookie value as str, or None on failure."""
        version = encrypted_value[:3]
        ciphertext = encrypted_value[3:]

        if version == b"v10":
            self._cookie_counts["v10"] += 1
            if self._v10_key is None:
                _log_warning("Unable to decrypt v10 cookies: no key found")
                return None
            return _decrypt_aes_cbc(ciphertext, self._v10_key, self._offset)

        self._cookie_counts["other"] += 1
        # other prefixes are considered "old data",
        # which were stored as plaintext
        # https://chromium.googlesource.com/chromium/src/+/refs/heads
        #   /main/components/os_crypt/os_crypt_mac.mm
        #
        # Decode so this branch returns str like every other path does;
        # undecodable data is reported as a failure (None), the same way
        # _decrypt_aes_cbc does.
        try:
            return encrypted_value.decode()
        except UnicodeDecodeError:
            _log_warning("Failed to decode plaintext cookie value")
            return None
598
599
600
class WindowsChromiumCookieDecryptor(ChromiumCookieDecryptor):
    """Decryptor for 'v10' AES-GCM cookie values and DPAPI values.

    The AES-GCM key comes from _get_windows_v10_key(); values with any
    other prefix are passed to _decrypt_windows_dpapi() as a whole.
    """

    def __init__(self, browser_root, meta_version=0):
        self._v10_key = _get_windows_v10_key(browser_root)
        self._cookie_counts = {"v10": 0, "other": 0}
        # number of leading plaintext bytes dropped after decryption
        self._offset = (32 if meta_version >= 24 else 0)

    @property
    def cookie_counts(self):
        return self._cookie_counts

    def decrypt(self, encrypted_value):
        """Return the decrypted value, or None on failure."""
        version = encrypted_value[:3]
        ciphertext = encrypted_value[3:]

        if version == b"v10":
            self._cookie_counts["v10"] += 1
            if self._v10_key is None:
                _log_warning("Unable to decrypt v10 cookies: no key found")
                return None

            # https://chromium.googlesource.com/chromium/src/+/refs/heads
            #   /main/components/os_crypt/os_crypt_win.cc
            #   kNonceLength
            nonce_length = 96 // 8
            # boringssl
            #   EVP_AEAD_AES_GCM_TAG_LEN
            authentication_tag_length = 16

            # layout: nonce | ciphertext | authentication tag
            raw_ciphertext = ciphertext
            nonce = raw_ciphertext[:nonce_length]
            ciphertext = raw_ciphertext[
                nonce_length:-authentication_tag_length]
            authentication_tag = raw_ciphertext[-authentication_tag_length:]

            return _decrypt_aes_gcm(
                ciphertext, self._v10_key, nonce, authentication_tag,
                self._offset)

        self._cookie_counts["other"] += 1
        # any other prefix means the data is DPAPI encrypted
        # https://chromium.googlesource.com/chromium/src/+/refs/heads
        #   /main/components/os_crypt/os_crypt_win.cc
        plaintext = _decrypt_windows_dpapi(encrypted_value)
        # NOTE(review): _decrypt_windows_dpapi() is defined elsewhere;
        # its result is None-checked when used as the v10 key, so it
        # presumably signals failure with None - confirm. Guard here so a
        # failed decryption is counted instead of raising AttributeError.
        if plaintext is None:
            return None
        return plaintext.decode()
644
645
646
# --------------------------------------------------------------------
647
# keyring
648
649
def _choose_linux_keyring():
    """Pick the keyring backend for the detected desktop environment.

    KDE selects KWallet, an unrecognized environment selects the basic
    text store and everything else selects the GNOME keyring.

    https://chromium.googlesource.com/chromium/src/+/refs/heads
    /main/components/os_crypt/key_storage_util_linux.cc
    SelectBackend
    """
    desktop = _get_linux_desktop_environment(os.environ)
    _log_debug("Detected desktop environment: %s", desktop)

    special_cases = {
        DE_KDE  : KEYRING_KWALLET,
        DE_OTHER: KEYRING_BASICTEXT,
    }
    return special_cases.get(desktop, KEYRING_GNOMEKEYRING)
662
663
664
def _get_kwallet_network_wallet():
    """Return the name of the wallet used to store network passwords.

    The name is requested from kwalletd5 via 'dbus-send'. If the command
    fails or raises, a warning is logged and "kdewallet" is returned.

    https://chromium.googlesource.com/chromium/src/+/refs/heads
    /main/components/os_crypt/kwallet_dbus.cc
    KWalletDBus::NetworkWallet
    which does a dbus call to the following function:
    https://api.kde.org/frameworks/kwallet/html/classKWallet_1_1Wallet.html
    Wallet::NetworkWallet
    """
    fallback = "kdewallet"
    command = (
        "dbus-send", "--session", "--print-reply=literal",
        "--dest=org.kde.kwalletd5",
        "/modules/kwalletd5",
        "org.kde.KWallet.networkWallet",
    )

    try:
        proc, stdout = Popen_communicate(*command)

        if proc.returncode == 0:
            wallet = stdout.decode().strip()
            _log_debug("NetworkWallet = '%s'", wallet)
            return wallet

        _log_warning("Failed to read NetworkWallet")
    except Exception as exc:
        _log_warning("Error while obtaining NetworkWallet (%s: %s)",
                     exc.__class__.__name__, exc)
    return fallback
694
695
696
def _get_kwallet_password(browser_keyring_name):
    """Read the browser's 'Safe Storage' password through kwallet-query.

    Returns the password as bytes with one trailing newline removed.
    Returns b"" if kwallet-query is missing, fails, reports that it
    could not read the entry, or raises.
    """
    _log_debug("Using kwallet-query to obtain password from kwallet")

    if shutil.which("kwallet-query") is None:
        _log_error(
            "kwallet-query command not found. KWallet and kwallet-query "
            "must be installed to read from KWallet. kwallet-query should be "
            "included in the kwallet package for your distribution")
        return b""

    network_wallet = _get_kwallet_network_wallet()
    command = (
        "kwallet-query",
        "--read-password", browser_keyring_name + " Safe Storage",
        "--folder", browser_keyring_name + " Keys",
        network_wallet,
    )

    try:
        proc, stdout = Popen_communicate(*command)

        if proc.returncode != 0:
            _log_error(f"kwallet-query failed with return code "
                       f"{proc.returncode}. Please consult the kwallet-query "
                       f"man page for details")
            return b""

        if stdout.lower().startswith(b"failed to read"):
            _log_debug("Failed to read password from kwallet. "
                       "Using empty string instead")
            # This sometimes occurs in KDE because chrome does not check
            # hasEntry and instead just tries to read the value (which
            # kwallet returns "") whereas kwallet-query checks hasEntry.
            # To verify this:
            # dbus-monitor "interface='org.kde.KWallet'" "type=method_return"
            # while starting chrome.
            # This may be a bug, as the intended behaviour is to generate a
            # random password and store it, but that doesn't matter here.
            return b""

        # strip exactly one trailing newline
        return stdout[:-1] if stdout.endswith(b"\n") else stdout
    except Exception as exc:
        _log_warning("Error when running kwallet-query (%s: %s)",
                     exc.__class__.__name__, exc)
        return b""
742
743
744
def _get_gnome_keyring_password(browser_keyring_name):
    """Read the browser's 'Safe Storage' secret via 'secretstorage'.

    Returns the secret of the first item in the default collection
    whose label matches. Returns b"" if the 'secretstorage' package is
    missing or no item matches.
    """
    try:
        import secretstorage
    except ImportError:
        _log_error("'secretstorage' Python package not available")
        return b""

    # Gnome keyring does not seem to organise keys in the same way as KWallet,
    # using `dbus-monitor` during startup, it can be observed that chromium
    # lists all keys and presumably searches for its key in the list.
    # It appears that we must do the same.
    # https://github.com/jaraco/keyring/issues/556
    connection = secretstorage.dbus_init()
    try:
        wanted = browser_keyring_name + " Safe Storage"
        collection = secretstorage.get_default_collection(connection)
        for item in collection.get_all_items():
            if item.get_label() == wanted:
                return item.get_secret()

        _log_error("Failed to read from GNOME keyring")
        return b""
    finally:
        connection.close()
768
769
770
def _get_linux_keyring_password(browser_keyring_name, keyring):
    """Return the keyring password for 'browser_keyring_name'.

    'keyring' names the backend to use; a falsy value selects one
    automatically from the desktop environment. Returns None for the
    basic text backend, which needs no password.

    Raises ValueError for an unknown keyring name.
    """
    # Note: chrome/chromium can be run with the following flags
    # to determine which keyring backend it has chosen to use
    # - chromium --enable-logging=stderr --v=1 2>&1 | grep key_storage_
    #
    # Chromium supports --password-store=<basic|gnome|kwallet>
    # so the automatic detection will not be sufficient in all cases.

    if not keyring:
        keyring = _choose_linux_keyring()
    _log_debug("Chosen keyring: %s", keyring)

    if keyring == KEYRING_KWALLET:
        return _get_kwallet_password(browser_keyring_name)
    if keyring == KEYRING_GNOMEKEYRING:
        return _get_gnome_keyring_password(browser_keyring_name)
    if keyring == KEYRING_BASICTEXT:
        # when basic text is chosen, all cookies are stored as v10
        # so no keyring password is required
        return None

    # raise explicitly: an 'assert' is removed under 'python -O', which
    # would turn an unknown keyring into a silent 'return None'
    raise ValueError(f"Unknown keyring {keyring}")
791
792
793
def _get_mac_keyring_password(browser_keyring_name):
    """Read the browser's 'Safe Storage' password from the macOS keychain.

    Runs 'security find-generic-password' and returns its output as
    bytes with one trailing newline removed. Returns None if the
    command exits with a non-zero status or raises.
    """
    _log_debug("Using find-generic-password to obtain "
               "password from OSX keychain")
    try:
        proc, stdout = Popen_communicate(
            "security", "find-generic-password",
            "-w",  # write password to stdout
            "-a", browser_keyring_name,  # match "account"
            "-s", browser_keyring_name + " Safe Storage",  # match "service"
        )

        # A failed lookup produces no usable password. Without this check
        # the empty output was returned and a key was derived from it.
        if proc.returncode != 0:
            _log_warning("find-generic-password failed with return code %s",
                         proc.returncode)
            return None

        if stdout[-1:] == b"\n":
            stdout = stdout[:-1]
        return stdout
    except Exception as exc:
        _log_warning("Error when using find-generic-password (%s: %s)",
                     exc.__class__.__name__, exc)
        return None
811
812
813
def _get_windows_v10_key(browser_root):
    """Load and decrypt the cookie encryption key from 'Local State'.

    The most recently used 'Local State' file below 'browser_root' is
    parsed as JSON; its base64 key must carry a b"DPAPI" prefix, which
    is stripped before the rest is passed to _decrypt_windows_dpapi().

    Returns that function's result, or None if the file or key is
    missing or the prefix does not match.
    """
    state_path = _find_most_recently_used_file(browser_root, "Local State")
    if state_path is None:
        _log_error("Unable to find Local State file")
        return None
    _log_debug("Found Local State file at '%s'", state_path)

    with open(state_path, encoding="utf-8") as fp:
        local_state = util.json_loads(fp.read())

    try:
        encoded_key = local_state["os_crypt"]["encrypted_key"]
    except KeyError:
        _log_error("Unable to find encrypted key in Local State")
        return None

    marker = b"DPAPI"
    key_blob = binascii.a2b_base64(encoded_key)
    if key_blob.startswith(marker):
        return _decrypt_windows_dpapi(key_blob[len(marker):])

    _log_error("Invalid Local State key")
    return None
832
833
834
# --------------------------------------------------------------------
835
# utility
836
837
class ParserError(Exception):
    """Raised by DataParser for invalid reads, skips or unexpected data."""
839
840
841
class DataParser:
    """Sequential reader over an in-memory bytes buffer.

    'cursor' is the offset of the next unread byte; every read or skip
    advances it. Reading or skipping beyond the end of the buffer, or
    by a negative amount, raises ParserError.
    """

    def __init__(self, data):
        self.cursor = 0
        self._data = data

    def read_bytes(self, num_bytes):
        """Return the next 'num_bytes' bytes and advance the cursor."""
        if num_bytes < 0:
            raise ParserError(f"invalid read of {num_bytes} bytes")
        end = self.cursor + num_bytes
        if end > len(self._data):
            raise ParserError("reached end of input")
        data = self._data[self.cursor:end]
        self.cursor = end
        return data

    def expect_bytes(self, expected_value, message):
        """Consume len(expected_value) bytes; raise if they differ."""
        value = self.read_bytes(len(expected_value))
        if value != expected_value:
            raise ParserError(f"unexpected value: {value} != {expected_value} "
                              f"({message})")

    def read_uint(self, big_endian=False):
        """Read a 4-byte unsigned integer (little-endian by default)."""
        data_format = ">I" if big_endian else "<I"
        return struct.unpack(data_format, self.read_bytes(4))[0]

    def read_double(self, big_endian=False):
        """Read an 8-byte float (little-endian by default)."""
        data_format = ">d" if big_endian else "<d"
        return struct.unpack(data_format, self.read_bytes(8))[0]

    def read_cstring(self):
        """Read a NUL-terminated string and return it decoded as str.

        The cursor ends up just past the terminator, also when decoding
        raises UnicodeDecodeError. A missing terminator consumes the
        rest of the buffer and raises ParserError.
        """
        # locate the terminator in one pass instead of
        # going through read_bytes() once per byte
        terminator = self._data.find(b"\x00", self.cursor)
        if terminator < 0:
            self.cursor = len(self._data)
            raise ParserError("reached end of input")

        raw = self._data[self.cursor:terminator]
        self.cursor = terminator + 1
        return raw.decode()

    def skip(self, num_bytes, description="unknown"):
        """Advance by 'num_bytes', logging the bytes that were skipped."""
        if num_bytes > 0:
            _log_debug(f"Skipping {num_bytes} bytes ({description}): "
                       f"{self.read_bytes(num_bytes)!r}")
        elif num_bytes < 0:
            raise ParserError(f"Invalid skip of {num_bytes} bytes")

    def skip_to(self, offset, description="unknown"):
        """Advance to the absolute position 'offset'."""
        self.skip(offset - self.cursor, description)

    def skip_to_end(self, description="unknown"):
        """Advance to the end of the buffer."""
        self.skip_to(len(self._data), description)
891
892
893
class DatabaseConnection():
    """Context manager yielding a sqlite3 connection for 'path'.

    The database is first opened in place through a read-only,
    immutable URI. If that raises, the file is copied into a temporary
    directory and the copy is opened instead. Leaving the context
    closes the connection and removes the temporary directory, if any.
    """

    def __init__(self, path):
        self.path = path
        self.database = None
        self.directory = None

    def _connect_immutable(self):
        # https://www.sqlite.org/uri.html#the_uri_path
        escaped = self.path.replace("?", "%3f").replace("#", "%23")
        if util.WINDOWS:
            escaped = "/" + os.path.abspath(escaped)

        return sqlite3.connect(
            f"file:{escaped}?mode=ro&immutable=1",
            uri=True, isolation_level=None, check_same_thread=False)

    def _connect_copy(self):
        self.directory = tempfile.TemporaryDirectory(prefix="gallery-dl-")
        copy_path = os.path.join(self.directory.name, "copy.sqlite")
        shutil.copyfile(self.path, copy_path)
        return sqlite3.connect(
            copy_path, isolation_level=None, check_same_thread=False)

    def __enter__(self):
        try:
            self.database = self._connect_immutable()
        except Exception as exc:
            _log_debug("Falling back to temporary database copy (%s: %s)",
                       exc.__class__.__name__, exc)
        else:
            return self.database

        try:
            self.database = self._connect_copy()
            return self.database
        except BaseException:
            # do not leave the temporary directory behind
            if self.directory:
                self.directory.cleanup()
            raise

    def __exit__(self, exc_type, exc_value, traceback):
        self.database.close()
        if self.directory:
            self.directory.cleanup()
931
932
933
def Popen_communicate(*args):
    """Run the command 'args' and return (process, captured stdout).

    stderr is discarded. If waiting for the process is interrupted by
    any exception, the process is killed and reaped before the
    exception is re-raised.
    """
    proc = util.Popen(
        args, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
    try:
        output = proc.communicate()[0]
    except BaseException:  # Including KeyboardInterrupt
        proc.kill()
        proc.wait()
        raise
    return proc, output
943
944
945
# Desktop environment identifiers
# https://chromium.googlesource.com/chromium/src/+/refs/heads
# /main/base/nix/xdg_util.h - DesktopEnvironment
DE_OTHER = "other"
DE_CINNAMON = "cinnamon"
DE_GNOME = "gnome"
DE_KDE = "kde"
DE_PANTHEON = "pantheon"
DE_UNITY = "unity"
DE_XFCE = "xfce"


# Keyring backend identifiers
# https://chromium.googlesource.com/chromium/src/+/refs/heads
# /main/components/os_crypt/key_storage_util_linux.h - SelectedLinuxBackend
KEYRING_KWALLET = "kwallet"
KEYRING_GNOMEKEYRING = "gnomekeyring"
KEYRING_BASICTEXT = "basictext"
SUPPORTED_KEYRINGS = {
    KEYRING_KWALLET,
    KEYRING_GNOMEKEYRING,
    KEYRING_BASICTEXT,
}
966
967
968
def _get_linux_desktop_environment(env):
    """Determine the Linux desktop environment from environment variables.

    Ref: https://chromium.googlesource.com/chromium/src/+/refs/heads
         /main/base/nix/xdg_util.cc - GetDesktopEnvironment

    Checked in this order: XDG_CURRENT_DESKTOP, DESKTOP_SESSION, and the
    presence of GNOME_DESKTOP_SESSION_ID / KDE_FULL_SESSION.

    Args:
        env: Mapping of environment variable names to values.

    Returns:
        One of the DE_* constants; DE_OTHER when nothing matches.
    """
    xdg_current_desktop = env.get("XDG_CURRENT_DESKTOP")
    desktop_session = env.get("DESKTOP_SESSION")

    if xdg_current_desktop:
        known_desktops = {
            "gnome": DE_GNOME,
            "x-cinnamon": DE_CINNAMON,
            "kde": DE_KDE,
            "pantheon": DE_PANTHEON,
            "xfce": DE_XFCE,
        }
        # XDG_CURRENT_DESKTOP can hold several names separated by ":"
        # in priority order (e.g. "ubuntu:GNOME"), so every entry is
        # checked and the first recognized one wins.
        for name in xdg_current_desktop.split(":"):
            name = name.strip().lower()
            if name == "unity":
                if desktop_session and "gnome-fallback" in desktop_session:
                    return DE_GNOME
                return DE_UNITY
            if name in known_desktops:
                return known_desktops[name]

    if desktop_session:
        if desktop_session in ("mate", "gnome"):
            return DE_GNOME
        if "kde" in desktop_session:
            return DE_KDE
        if "xfce" in desktop_session:
            return DE_XFCE

    if "GNOME_DESKTOP_SESSION_ID" in env:
        return DE_GNOME
    if "KDE_FULL_SESSION" in env:
        return DE_KDE
    return DE_OTHER
1009
1010
1011
def _mac_absolute_time_to_posix(timestamp):
1012
# 978307200 is timestamp of 2001-01-01 00:00:00
1013
return 978307200 + int(timestamp)
1014
1015
1016
def pbkdf2_sha1(password, salt, iterations, key_length):
    """Derive a 'key_length'-byte key with PBKDF2 using HMAC-SHA1."""
    derived_key = pbkdf2_hmac(
        "sha1", password, salt, iterations, dklen=key_length)
    return derived_key
1018
1019
1020
def _decrypt_aes_cbc(ciphertext, key, offset=0,
                     initialization_vector=b" " * 16):
    """Decrypt AES-CBC 'ciphertext' and return the result as text.

    The decrypted data has its PKCS#7 padding removed and, for a non-zero
    'offset', its first 'offset' bytes dropped before it is decoded.
    The default initialization vector consists of 16 space characters.

    Returns:
        The decoded string, or None if the bytes cannot be decoded.
    """
    decrypted = aes.aes_cbc_decrypt_bytes(
        ciphertext, key, initialization_vector)
    unpadded = aes.unpad_pkcs7(decrypted)
    payload = unpadded[offset:] if offset else unpadded

    try:
        decoded = payload.decode()
    except UnicodeDecodeError:
        return None
    return decoded
1030
1031
1032
def _decrypt_aes_gcm(ciphertext, key, nonce, authentication_tag, offset=0):
    """Decrypt and verify AES-GCM 'ciphertext' and return it as text.

    For a non-zero 'offset', the first 'offset' bytes of the decrypted
    data are dropped before decoding.

    Returns:
        The decoded string, or None after logging a warning when
        decryption/verification or decoding raises a ValueError.
    """
    try:
        decrypted = aes.aes_gcm_decrypt_and_verify_bytes(
            ciphertext, key, authentication_tag, nonce)
        payload = decrypted[offset:] if offset else decrypted
        return payload.decode()
    except ValueError as exc:
        # UnicodeDecodeError is a ValueError subclass and gets its own
        # message; every other ValueError is reported as a MAC failure
        if isinstance(exc, UnicodeDecodeError):
            _log_warning("Failed to decrypt cookie (AES-GCM Unicode)")
        else:
            _log_warning("Failed to decrypt cookie (AES-GCM MAC)")
    return None
1044
1045
1046
def _decrypt_windows_dpapi(ciphertext):
    """Decrypt 'ciphertext' with the Windows CryptUnprotectData function.

    References:
        - https://docs.microsoft.com/en-us/windows
          /win32/api/dpapi/nf-dpapi-cryptunprotectdata

    Returns:
        The decrypted bytes, or None after logging a warning when
        CryptUnprotectData reports failure.
    """
    from ctypes.wintypes import DWORD

    class DATA_BLOB(ctypes.Structure):
        _fields_ = [
            ("cbData", DWORD),
            ("pbData", ctypes.POINTER(ctypes.c_char)),
        ]

    # create_string_buffer() appends a NUL byte, which sizeof() counts
    source = ctypes.create_string_buffer(ciphertext)
    encrypted = DATA_BLOB(ctypes.sizeof(source), source)
    decrypted = DATA_BLOB()

    succeeded = ctypes.windll.crypt32.CryptUnprotectData(
        ctypes.byref(encrypted),  # pDataIn
        None,  # ppszDataDescr: human readable description of pDataIn
        None,  # pOptionalEntropy: salt?
        None,  # pvReserved: must be NULL
        None,  # pPromptStruct: information about prompts to display
        0,  # dwFlags
        ctypes.byref(decrypted),  # pDataOut
    )
    if not succeeded:
        _log_warning("Failed to decrypt cookie (DPAPI)")
        return None

    # copy the output into Python bytes, then release the output buffer
    plaintext = ctypes.string_at(decrypted.pbData, decrypted.cbData)
    ctypes.windll.kernel32.LocalFree(decrypted.pbData)
    return plaintext
1077
1078
1079
def _find_most_recently_used_file(root, filename):
1080
# if the provided root points to an exact profile path
1081
# check if it contains the wanted filename
1082
first_choice = os.path.join(root, filename)
1083
if os.path.exists(first_choice):
1084
return first_choice
1085
1086
# if there are multiple browser profiles, take the most recently used one
1087
paths = []
1088
for curr_root, dirs, files in os.walk(root):
1089
for file in files:
1090
if file == filename:
1091
paths.append(os.path.join(curr_root, file))
1092
if not paths:
1093
return None
1094
return max(paths, key=lambda path: os.lstat(path).st_mtime)
1095
1096
1097
def _is_path(value):
1098
return os.path.sep in value
1099
1100
1101
def _parse_browser_specification(
        browser, profile=None, keyring=None, container=None, domain=None):
    """Validate and normalize the parts of a browser specification.

    The browser name is lowercased, and a 'profile' that looks like a
    path gets its '~' expanded. 'container' and 'domain' are passed
    through unchanged.

    Returns:
        A '(browser, profile, keyring, container, domain)' tuple.

    Raises:
        ValueError: for a browser not in SUPPORTED_BROWSERS or a
            non-empty keyring not in SUPPORTED_KEYRINGS.
    """
    browser_name = browser.lower()
    if browser_name not in SUPPORTED_BROWSERS:
        raise ValueError(f"Unsupported browser '{browser_name}'")

    if keyring:
        if keyring not in SUPPORTED_KEYRINGS:
            raise ValueError(f"Unsupported keyring '{keyring}'")

    if profile:
        if _is_path(profile):
            profile = os.path.expanduser(profile)

    return browser_name, profile, keyring, container, domain
1111
1112
1113
# message templates that were already logged as warning or error
_log_cache = set()
# debug and info messages go straight to the module logger
_log_debug, _log_info = logger.debug, logger.info
1116
1117
1118
def _log_warning(msg, *args):
    """Log a warning unless the same 'msg' was already logged.

    Deduplication uses the unformatted 'msg' only; 'args' are ignored
    for that purpose.
    """
    if msg in _log_cache:
        return
    _log_cache.add(msg)
    logger.warning(msg, *args)
1122
1123
1124
def _log_error(msg, *args):
    """Log an error unless the same 'msg' was already logged.

    Deduplication uses the unformatted 'msg' only; 'args' are ignored
    for that purpose.
    """
    if msg in _log_cache:
        return
    _log_cache.add(msg)
    logger.error(msg, *args)
1128
1129