Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
mikf
GitHub Repository: mikf/gallery-dl
Path: blob/master/gallery_dl/cookies.py
8929 views
1
# -*- coding: utf-8 -*-
2
3
# Copyright 2022-2025 Mike Fährmann
4
#
5
# This program is free software; you can redistribute it and/or modify
6
# it under the terms of the GNU General Public License version 2 as
7
# published by the Free Software Foundation.
8
9
# Adapted from yt-dlp's cookies module.
10
# https://github.com/yt-dlp/yt-dlp/blob/master/yt_dlp/cookies.py
11
12
import binascii
13
import ctypes
14
import logging
15
import os
16
import shutil
17
import sqlite3
18
import struct
19
import subprocess
20
import sys
21
import tempfile
22
from hashlib import pbkdf2_hmac
23
from http.cookiejar import Cookie
24
from . import aes, text, util
25
26
27
SUPPORTED_BROWSERS_CHROMIUM = {
28
"brave", "chrome", "chromium", "edge", "opera", "thorium", "vivaldi"}
29
SUPPORTED_BROWSERS_FIREFOX = {"firefox", "librewolf", "zen", "floorp"}
30
SUPPORTED_BROWSERS_WEBKIT = {"safari", "orion"}
31
SUPPORTED_BROWSERS = \
32
SUPPORTED_BROWSERS_CHROMIUM \
33
| SUPPORTED_BROWSERS_FIREFOX \
34
| SUPPORTED_BROWSERS_WEBKIT
35
36
logger = logging.getLogger("cookies")
37
38
39
def load_cookies(browser_specification):
40
browser_name, profile, keyring, container, domain = \
41
_parse_browser_specification(*browser_specification)
42
if browser_name in SUPPORTED_BROWSERS_FIREFOX:
43
return load_cookies_firefox(browser_name, profile, container, domain)
44
elif browser_name in SUPPORTED_BROWSERS_WEBKIT:
45
return load_cookies_webkit(browser_name, profile, domain)
46
elif browser_name in SUPPORTED_BROWSERS_CHROMIUM:
47
return load_cookies_chromium(browser_name, profile, keyring, domain)
48
else:
49
raise ValueError(f"unknown browser '{browser_name}'")
50
51
52
def load_cookies_firefox(browser_name, profile=None,
53
container=None, domain=None):
54
path, container_id = _firefox_cookies_database(
55
browser_name, profile, container)
56
57
sql = ("SELECT name, value, host, path, isSecure, expiry "
58
"FROM moz_cookies")
59
conditions = []
60
parameters = []
61
62
if container_id is False:
63
conditions.append("NOT INSTR(originAttributes,'userContextId=')")
64
elif container_id:
65
uid = f"%userContextId={container_id}"
66
conditions.append("originAttributes LIKE ? OR originAttributes LIKE ?")
67
parameters += (uid, uid + "&%")
68
69
if domain:
70
if domain[0] == ".":
71
conditions.append("host == ? OR host LIKE ?")
72
parameters += (domain[1:], "%" + domain)
73
else:
74
conditions.append("host == ? OR host == ?")
75
parameters += (domain, "." + domain)
76
77
if conditions:
78
sql = f"{sql} WHERE ( {' ) AND ( '.join(conditions)} )"
79
80
with DatabaseConnection(path) as db:
81
cookies = [
82
Cookie(
83
0, name, value, None, False,
84
domain, True if domain else False,
85
domain[0] == "." if domain else False,
86
path, True if path else False, secure, expires,
87
False, None, None, {},
88
)
89
for name, value, domain, path, secure, expires in db.execute(
90
sql, parameters)
91
]
92
93
_log_info("Extracted %s cookies from %s",
94
len(cookies), browser_name.capitalize())
95
return cookies
96
97
98
def load_cookies_webkit(browser_name, profile=None, domain=None):
99
"""Ref.: https://github.com/libyal/dtformats/blob
100
/main/documentation/Safari%20Cookies.asciidoc
101
- This data appears to be out of date
102
but the important parts of the database structure is the same
103
- There are a few bytes here and there
104
which are skipped during parsing
105
"""
106
if browser_name == "safari":
107
with _safari_cookies_database() as fp:
108
data = fp.read()
109
elif browser_name == "orion":
110
with _orion_cookies_database() as fp:
111
data = fp.read()
112
else:
113
raise ValueError(f"unknown webkit browser '{browser_name}'")
114
115
page_sizes, body_start = _webkit_parse_cookies_header(data)
116
p = DataParser(data[body_start:])
117
118
cookies = []
119
for page_size in page_sizes:
120
_webkit_parse_cookies_page(p.read_bytes(page_size), cookies)
121
_log_info("Extracted %s cookies from %s",
122
len(cookies), browser_name.capitalize())
123
124
return cookies
125
126
127
def load_cookies_chromium(browser_name, profile=None,
128
keyring=None, domain=None):
129
config = _chromium_browser_settings(browser_name)
130
path = _chromium_cookies_database(profile, config)
131
_log_debug("Extracting cookies from %s", path)
132
133
if domain:
134
if domain[0] == ".":
135
condition = " WHERE host_key == ? OR host_key LIKE ?"
136
parameters = (domain[1:], "%" + domain)
137
else:
138
condition = " WHERE host_key == ? OR host_key == ?"
139
parameters = (domain, "." + domain)
140
else:
141
condition = ""
142
parameters = ()
143
144
with DatabaseConnection(path) as db:
145
db.text_factory = bytes
146
cursor = db.cursor()
147
148
try:
149
meta_version = int(cursor.execute(
150
"SELECT value FROM meta WHERE key = 'version'").fetchone()[0])
151
except Exception as exc:
152
_log_warning("Failed to get cookie database meta version (%s: %s)",
153
exc.__class__.__name__, exc)
154
meta_version = 0
155
156
try:
157
rows = cursor.execute(
158
"SELECT host_key, name, value, encrypted_value, path, "
159
"expires_utc, is_secure FROM cookies" + condition, parameters)
160
except sqlite3.OperationalError:
161
rows = cursor.execute(
162
"SELECT host_key, name, value, encrypted_value, path, "
163
"expires_utc, secure FROM cookies" + condition, parameters)
164
165
failed_cookies = 0
166
unencrypted_cookies = 0
167
decryptor = _chromium_cookie_decryptor(
168
config["directory"], config["keyring"], keyring, meta_version)
169
170
cookies = []
171
for domain, name, value, enc_value, path, expires, secure in rows:
172
173
if not value and enc_value: # encrypted
174
value = decryptor.decrypt(enc_value)
175
if value is None:
176
failed_cookies += 1
177
continue
178
else:
179
value = value.decode()
180
unencrypted_cookies += 1
181
182
if expires:
183
# https://stackoverflow.com/a/43520042
184
expires = int(expires) // 1000000 - 11644473600
185
else:
186
expires = None
187
188
domain = domain.decode()
189
path = path.decode()
190
name = name.decode()
191
192
cookies.append(Cookie(
193
0, name, value, None, False,
194
domain, True if domain else False,
195
domain[0] == "." if domain else False,
196
path, True if path else False, secure, expires,
197
False, None, None, {},
198
))
199
200
if failed_cookies > 0:
201
failed_message = f" ({failed_cookies} could not be decrypted)"
202
else:
203
failed_message = ""
204
205
_log_info("Extracted %s cookies from %s%s",
206
len(cookies), browser_name.capitalize(), failed_message)
207
counts = decryptor.cookie_counts
208
counts["unencrypted"] = unencrypted_cookies
209
_log_debug("version breakdown: %s", counts)
210
return cookies
211
212
213
# --------------------------------------------------------------------
214
# firefox
215
216
def _firefox_cookies_database(browser_name, profile=None, container=None):
217
if not profile:
218
search_root = _firefox_browser_directory(browser_name)
219
elif _is_path(profile):
220
search_root = profile
221
else:
222
search_root = _firefox_browser_directory(browser_name)
223
if isinstance(search_root, str):
224
search_root = os.path.join(search_root, profile)
225
else:
226
search_root = [os.path.join(dir, profile) for dir in search_root]
227
228
path = _find_most_recently_used_file(search_root, "cookies.sqlite")
229
if path is None:
230
raise FileNotFoundError(f"Unable to find {browser_name.capitalize()} "
231
f"cookies database")
232
233
_log_debug("Extracting cookies from %s", path)
234
235
if not container or container == "none":
236
container_id = False
237
_log_debug("Only loading cookies not belonging to any container")
238
239
elif container == "all":
240
container_id = None
241
242
else:
243
containers_path = os.path.join(
244
os.path.dirname(path), "containers.json")
245
246
try:
247
with open(containers_path, encoding="utf-8") as fp:
248
identities = util.json_loads(fp.read())["identities"]
249
except OSError:
250
_log_error("Unable to read Firefox container database at '%s'",
251
containers_path)
252
raise
253
except KeyError:
254
identities = ()
255
256
for context in identities:
257
if container == context.get("name") or container == text.extr(
258
context.get("l10nID", ""), "userContext", ".label"):
259
container_id = context["userContextId"]
260
break
261
else:
262
raise ValueError(f"Unable to find Firefox container '{container}'")
263
_log_debug("Only loading cookies from container '%s' (ID %s)",
264
container, container_id)
265
266
return path, container_id
267
268
269
def _firefox_browser_directory(browser_name):
270
join = os.path.join
271
272
if sys.platform in ("win32", "cygwin"):
273
appdata = os.path.expandvars("%APPDATA%")
274
return {
275
"firefox" : join(appdata, R"Mozilla\Firefox\Profiles"),
276
"librewolf": join(appdata, R"librewolf\Profiles"),
277
"zen" : join(appdata, R"zen\Profiles"),
278
"floorp" : join(appdata, R"Floorp\Profiles")
279
}[browser_name]
280
elif sys.platform == "darwin":
281
appdata = os.path.expanduser("~/Library/Application Support")
282
return {
283
"firefox" : join(appdata, R"Firefox/Profiles"),
284
"librewolf": join(appdata, R"librewolf/Profiles"),
285
"zen" : join(appdata, R"zen/Profiles"),
286
"floorp" : join(appdata, R"Floorp/Profiles")
287
}[browser_name]
288
else:
289
home = os.path.expanduser("~")
290
if browser_name == "firefox":
291
config = (os.environ.get("XDG_CONFIG_HOME") or
292
os.path.expanduser("~/.config"))
293
return (
294
# versions >= 147
295
join(config, "mozilla/firefox"),
296
# versions <= 146
297
home + "/.mozilla/firefox",
298
# Flatpak
299
home + "/.var/app/org.mozilla.firefox/config/mozilla/firefox",
300
home + "/.var/app/org.mozilla.firefox/.mozilla/firefox",
301
# Snap
302
home + "/snap/firefox/common/.mozilla/firefox",
303
)
304
return f"{home}/.{browser_name}"
305
306
307
# --------------------------------------------------------------------
308
# safari/orion/webkit
309
310
311
def _safari_cookies_database():
312
try:
313
path = os.path.expanduser("~/Library/Cookies/Cookies.binarycookies")
314
return open(path, "rb")
315
except FileNotFoundError:
316
_log_debug("Trying secondary cookie location")
317
path = os.path.expanduser("~/Library/Containers/com.apple.Safari/Data"
318
"/Library/Cookies/Cookies.binarycookies")
319
return open(path, "rb")
320
321
322
def _orion_cookies_database():
323
path = os.path.expanduser(
324
"~/Library/HTTPStorages/com.kagi.kagimacOS.binarycookies")
325
return open(path, "rb")
326
327
328
def _webkit_parse_cookies_header(data):
329
p = DataParser(data)
330
p.expect_bytes(b"cook", "database signature")
331
number_of_pages = p.read_uint(big_endian=True)
332
page_sizes = [p.read_uint(big_endian=True)
333
for _ in range(number_of_pages)]
334
return page_sizes, p.cursor
335
336
337
def _webkit_parse_cookies_page(data, cookies, domain=None):
338
p = DataParser(data)
339
p.expect_bytes(b"\x00\x00\x01\x00", "page signature")
340
number_of_cookies = p.read_uint()
341
record_offsets = [p.read_uint() for _ in range(number_of_cookies)]
342
if number_of_cookies == 0:
343
_log_debug("Cookies page of size %s has no cookies", len(data))
344
return
345
346
p.skip_to(record_offsets[0], "unknown page header field")
347
348
for i, record_offset in enumerate(record_offsets):
349
p.skip_to(record_offset, "space between records")
350
record_length = _webkit_parse_cookies_record(
351
data[record_offset:], cookies, domain)
352
p.read_bytes(record_length)
353
p.skip_to_end("space in between pages")
354
355
356
def _webkit_parse_cookies_record(data, cookies, host=None):
357
p = DataParser(data)
358
record_size = p.read_uint()
359
p.skip(4, "unknown record field 1")
360
flags = p.read_uint()
361
is_secure = True if (flags & 0x0001) else False
362
p.skip(4, "unknown record field 2")
363
domain_offset = p.read_uint()
364
name_offset = p.read_uint()
365
path_offset = p.read_uint()
366
value_offset = p.read_uint()
367
p.skip(8, "unknown record field 3")
368
expiration_date = _mac_absolute_time_to_posix(p.read_double())
369
_creation_date = _mac_absolute_time_to_posix(p.read_double()) # noqa: F841
370
371
try:
372
p.skip_to(domain_offset)
373
domain = p.read_cstring()
374
375
if host:
376
if host[0] == ".":
377
if host[1:] != domain and not domain.endswith(host):
378
return record_size
379
else:
380
if host != domain and ("." + host) != domain:
381
return record_size
382
383
p.skip_to(name_offset)
384
name = p.read_cstring()
385
386
p.skip_to(path_offset)
387
path = p.read_cstring()
388
389
p.skip_to(value_offset)
390
value = p.read_cstring()
391
except UnicodeDecodeError:
392
_log_warning("Failed to parse WebKit cookie")
393
return record_size
394
395
p.skip_to(record_size, "space at the end of the record")
396
397
cookies.append(Cookie(
398
0, name, value, None, False,
399
domain, True if domain else False,
400
domain[0] == "." if domain else False,
401
path, True if path else False, is_secure, expiration_date,
402
False, None, None, {},
403
))
404
405
return record_size
406
407
408
# --------------------------------------------------------------------
409
# chromium
410
411
def _chromium_cookies_database(profile, config):
412
if profile is None:
413
search_root = config["directory"]
414
elif _is_path(profile):
415
search_root = profile
416
config["directory"] = (os.path.dirname(profile)
417
if config["profiles"] else profile)
418
elif config["profiles"]:
419
search_root = os.path.join(config["directory"], profile)
420
else:
421
_log_warning("%s does not support profiles", config["browser"])
422
search_root = config["directory"]
423
424
path = _find_most_recently_used_file(search_root, "Cookies")
425
if path is None:
426
raise FileNotFoundError(f"Unable to find {config['browser']} cookies "
427
f"database in '{search_root}'")
428
return path
429
430
431
def _chromium_browser_settings(browser_name):
432
# https://chromium.googlesource.com/chromium
433
# /src/+/HEAD/docs/user_data_dir.md
434
join = os.path.join
435
436
if sys.platform in ("win32", "cygwin"):
437
appdata_local = os.path.expandvars("%LOCALAPPDATA%")
438
appdata_roaming = os.path.expandvars("%APPDATA%")
439
browser_dir = {
440
"brave" : join(appdata_local,
441
R"BraveSoftware\Brave-Browser\User Data"),
442
"chrome" : join(appdata_local, R"Google\Chrome\User Data"),
443
"chromium": join(appdata_local, R"Chromium\User Data"),
444
"edge" : join(appdata_local, R"Microsoft\Edge\User Data"),
445
"opera" : join(appdata_roaming, R"Opera Software\Opera Stable"),
446
"thorium" : join(appdata_local, R"Thorium\User Data"),
447
"vivaldi" : join(appdata_local, R"Vivaldi\User Data"),
448
}[browser_name]
449
450
elif sys.platform == "darwin":
451
appdata = os.path.expanduser("~/Library/Application Support")
452
browser_dir = {
453
"brave" : join(appdata, "BraveSoftware/Brave-Browser"),
454
"chrome" : join(appdata, "Google/Chrome"),
455
"chromium": join(appdata, "Chromium"),
456
"edge" : join(appdata, "Microsoft Edge"),
457
"opera" : join(appdata, "com.operasoftware.Opera"),
458
"thorium" : join(appdata, "Thorium"),
459
"vivaldi" : join(appdata, "Vivaldi"),
460
}[browser_name]
461
462
else:
463
config = (os.environ.get("XDG_CONFIG_HOME") or
464
os.path.expanduser("~/.config"))
465
browser_dir = {
466
"brave" : join(config, "BraveSoftware/Brave-Browser"),
467
"chrome" : join(config, "google-chrome"),
468
"chromium": join(config, "chromium"),
469
"edge" : join(config, "microsoft-edge"),
470
"opera" : join(config, "opera"),
471
"thorium" : join(config, "Thorium"),
472
"vivaldi" : join(config, "vivaldi"),
473
}[browser_name]
474
475
# Linux keyring names can be determined by snooping on dbus
476
# while opening the browser in KDE:
477
# dbus-monitor "interface="org.kde.KWallet"" "type=method_return"
478
keyring_name = {
479
"brave" : "Brave",
480
"chrome" : "Chrome",
481
"chromium": "Chromium",
482
"edge" : "Microsoft Edge" if sys.platform == "darwin" else
483
"Chromium",
484
"opera" : "Opera" if sys.platform == "darwin" else "Chromium",
485
"thorium" : "Thorium",
486
"vivaldi" : "Vivaldi" if sys.platform == "darwin" else "Chrome",
487
}[browser_name]
488
489
browsers_without_profiles = {"opera"}
490
491
return {
492
"browser" : browser_name,
493
"directory": browser_dir,
494
"keyring" : keyring_name,
495
"profiles" : browser_name not in browsers_without_profiles
496
}
497
498
499
def _chromium_cookie_decryptor(
500
browser_root, browser_keyring_name, keyring=None, meta_version=0):
501
if sys.platform in ("win32", "cygwin"):
502
return WindowsChromiumCookieDecryptor(
503
browser_root, meta_version)
504
elif sys.platform == "darwin":
505
return MacChromiumCookieDecryptor(
506
browser_keyring_name, meta_version)
507
else:
508
return LinuxChromiumCookieDecryptor(
509
browser_keyring_name, keyring, meta_version)
510
511
512
class ChromiumCookieDecryptor:
513
"""
514
Overview:
515
516
Linux:
517
- cookies are either v10 or v11
518
- v10: AES-CBC encrypted with a fixed key
519
- v11: AES-CBC encrypted with an OS protected key (keyring)
520
- v11 keys can be stored in various places depending on the
521
activate desktop environment [2]
522
523
Mac:
524
- cookies are either v10 or not v10
525
- v10: AES-CBC encrypted with an OS protected key (keyring)
526
and more key derivation iterations than linux
527
- not v10: "old data" stored as plaintext
528
529
Windows:
530
- cookies are either v10 or not v10
531
- v10: AES-GCM encrypted with a key which is encrypted with DPAPI
532
- not v10: encrypted with DPAPI
533
534
Sources:
535
- [1] https://chromium.googlesource.com/chromium/src/+/refs/heads
536
/main/components/os_crypt/
537
- [2] https://chromium.googlesource.com/chromium/src/+/refs/heads
538
/main/components/os_crypt/key_storage_linux.cc
539
- KeyStorageLinux::CreateService
540
"""
541
542
def decrypt(self, encrypted_value):
543
raise NotImplementedError("Must be implemented by sub classes")
544
545
@property
546
def cookie_counts(self):
547
raise NotImplementedError("Must be implemented by sub classes")
548
549
550
class LinuxChromiumCookieDecryptor(ChromiumCookieDecryptor):
551
def __init__(self, browser_keyring_name, keyring=None, meta_version=0):
552
password = _get_linux_keyring_password(browser_keyring_name, keyring)
553
self._empty_key = self.derive_key(b"")
554
self._v10_key = self.derive_key(b"peanuts")
555
self._v11_key = None if password is None else self.derive_key(password)
556
self._cookie_counts = {"v10": 0, "v11": 0, "other": 0}
557
self._offset = (32 if meta_version >= 24 else 0)
558
559
def derive_key(self, password):
560
# values from
561
# https://chromium.googlesource.com/chromium/src/+/refs/heads
562
# /main/components/os_crypt/os_crypt_linux.cc
563
return pbkdf2_sha1(password, salt=b"saltysalt",
564
iterations=1, key_length=16)
565
566
@property
567
def cookie_counts(self):
568
return self._cookie_counts
569
570
def decrypt(self, encrypted_value):
571
version = encrypted_value[:3]
572
ciphertext = encrypted_value[3:]
573
574
if version == b"v10":
575
self._cookie_counts["v10"] += 1
576
value = _decrypt_aes_cbc(ciphertext, self._v10_key, self._offset)
577
578
elif version == b"v11":
579
self._cookie_counts["v11"] += 1
580
if self._v11_key is None:
581
_log_warning("Unable to decrypt v11 cookies: no key found")
582
return None
583
value = _decrypt_aes_cbc(ciphertext, self._v11_key, self._offset)
584
585
else:
586
self._cookie_counts["other"] += 1
587
return None
588
589
if value is None:
590
value = _decrypt_aes_cbc(ciphertext, self._empty_key, self._offset)
591
if value is None:
592
_log_warning("Failed to decrypt cookie (AES-CBC)")
593
return value
594
595
596
class MacChromiumCookieDecryptor(ChromiumCookieDecryptor):
597
def __init__(self, browser_keyring_name, meta_version=0):
598
password = _get_mac_keyring_password(browser_keyring_name)
599
self._v10_key = None if password is None else self.derive_key(password)
600
self._cookie_counts = {"v10": 0, "other": 0}
601
self._offset = (32 if meta_version >= 24 else 0)
602
603
def derive_key(self, password):
604
# values from
605
# https://chromium.googlesource.com/chromium/src/+/refs/heads
606
# /main/components/os_crypt/os_crypt_mac.mm
607
return pbkdf2_sha1(password, salt=b"saltysalt",
608
iterations=1003, key_length=16)
609
610
@property
611
def cookie_counts(self):
612
return self._cookie_counts
613
614
def decrypt(self, encrypted_value):
615
version = encrypted_value[:3]
616
ciphertext = encrypted_value[3:]
617
618
if version == b"v10":
619
self._cookie_counts["v10"] += 1
620
if self._v10_key is None:
621
_log_warning("Unable to decrypt v10 cookies: no key found")
622
return None
623
return _decrypt_aes_cbc(ciphertext, self._v10_key, self._offset)
624
625
else:
626
self._cookie_counts["other"] += 1
627
# other prefixes are considered "old data",
628
# which were stored as plaintext
629
# https://chromium.googlesource.com/chromium/src/+/refs/heads
630
# /main/components/os_crypt/os_crypt_mac.mm
631
return encrypted_value
632
633
634
class WindowsChromiumCookieDecryptor(ChromiumCookieDecryptor):
635
def __init__(self, browser_root, meta_version=0):
636
self._v10_key = _get_windows_v10_key(browser_root)
637
self._cookie_counts = {"v10": 0, "other": 0}
638
self._offset = (32 if meta_version >= 24 else 0)
639
640
@property
641
def cookie_counts(self):
642
return self._cookie_counts
643
644
def decrypt(self, encrypted_value):
645
version = encrypted_value[:3]
646
ciphertext = encrypted_value[3:]
647
648
if version == b"v10":
649
self._cookie_counts["v10"] += 1
650
if self._v10_key is None:
651
_log_warning("Unable to decrypt v10 cookies: no key found")
652
return None
653
654
# https://chromium.googlesource.com/chromium/src/+/refs/heads
655
# /main/components/os_crypt/os_crypt_win.cc
656
# kNonceLength
657
nonce_length = 96 // 8
658
# boringssl
659
# EVP_AEAD_AES_GCM_TAG_LEN
660
authentication_tag_length = 16
661
662
raw_ciphertext = ciphertext
663
nonce = raw_ciphertext[:nonce_length]
664
ciphertext = raw_ciphertext[
665
nonce_length:-authentication_tag_length]
666
authentication_tag = raw_ciphertext[-authentication_tag_length:]
667
668
return _decrypt_aes_gcm(
669
ciphertext, self._v10_key, nonce, authentication_tag,
670
self._offset)
671
672
else:
673
self._cookie_counts["other"] += 1
674
# any other prefix means the data is DPAPI encrypted
675
# https://chromium.googlesource.com/chromium/src/+/refs/heads
676
# /main/components/os_crypt/os_crypt_win.cc
677
return _decrypt_windows_dpapi(encrypted_value).decode()
678
679
680
# --------------------------------------------------------------------
681
# keyring
682
683
def _choose_linux_keyring():
684
"""
685
https://chromium.googlesource.com/chromium/src/+/refs/heads
686
/main/components/os_crypt/key_storage_util_linux.cc
687
SelectBackend
688
"""
689
desktop_environment = _get_linux_desktop_environment(os.environ)
690
_log_debug("Detected desktop environment: %s", desktop_environment)
691
if desktop_environment == DE_KDE:
692
return KEYRING_KWALLET
693
if desktop_environment == DE_OTHER:
694
return KEYRING_BASICTEXT
695
return KEYRING_GNOMEKEYRING
696
697
698
def _get_kwallet_network_wallet():
699
""" The name of the wallet used to store network passwords.
700
701
https://chromium.googlesource.com/chromium/src/+/refs/heads
702
/main/components/os_crypt/kwallet_dbus.cc
703
KWalletDBus::NetworkWallet
704
which does a dbus call to the following function:
705
https://api.kde.org/frameworks/kwallet/html/classKWallet_1_1Wallet.html
706
Wallet::NetworkWallet
707
"""
708
default_wallet = "kdewallet"
709
try:
710
proc, stdout = Popen_communicate(
711
"dbus-send", "--session", "--print-reply=literal",
712
"--dest=org.kde.kwalletd5",
713
"/modules/kwalletd5",
714
"org.kde.KWallet.networkWallet"
715
)
716
717
if proc.returncode != 0:
718
_log_warning("Failed to read NetworkWallet")
719
return default_wallet
720
else:
721
network_wallet = stdout.decode().strip()
722
_log_debug("NetworkWallet = '%s'", network_wallet)
723
return network_wallet
724
except Exception as exc:
725
_log_warning("Error while obtaining NetworkWallet (%s: %s)",
726
exc.__class__.__name__, exc)
727
return default_wallet
728
729
730
def _get_kwallet_password(browser_keyring_name):
731
_log_debug("Using kwallet-query to obtain password from kwallet")
732
733
if shutil.which("kwallet-query") is None:
734
_log_error(
735
"kwallet-query command not found. KWallet and kwallet-query "
736
"must be installed to read from KWallet. kwallet-query should be "
737
"included in the kwallet package for your distribution")
738
return b""
739
740
network_wallet = _get_kwallet_network_wallet()
741
742
try:
743
proc, stdout = Popen_communicate(
744
"kwallet-query",
745
"--read-password", browser_keyring_name + " Safe Storage",
746
"--folder", browser_keyring_name + " Keys",
747
network_wallet,
748
)
749
750
if proc.returncode != 0:
751
_log_error(f"kwallet-query failed with return code "
752
f"{proc.returncode}. Please consult the kwallet-query "
753
f"man page for details")
754
return b""
755
756
if stdout.lower().startswith(b"failed to read"):
757
_log_debug("Failed to read password from kwallet. "
758
"Using empty string instead")
759
# This sometimes occurs in KDE because chrome does not check
760
# hasEntry and instead just tries to read the value (which
761
# kwallet returns "") whereas kwallet-query checks hasEntry.
762
# To verify this:
763
# dbus-monitor "interface="org.kde.KWallet"" "type=method_return"
764
# while starting chrome.
765
# This may be a bug, as the intended behaviour is to generate a
766
# random password and store it, but that doesn't matter here.
767
return b""
768
else:
769
if stdout[-1:] == b"\n":
770
stdout = stdout[:-1]
771
return stdout
772
except Exception as exc:
773
_log_warning("Error when running kwallet-query (%s: %s)",
774
exc.__class__.__name__, exc)
775
return b""
776
777
778
def _get_gnome_keyring_password(browser_keyring_name):
779
try:
780
import secretstorage
781
except ImportError:
782
_log_error("'secretstorage' Python package not available")
783
return b""
784
785
# Gnome keyring does not seem to organise keys in the same way as KWallet,
786
# using `dbus-monitor` during startup, it can be observed that chromium
787
# lists all keys and presumably searches for its key in the list.
788
# It appears that we must do the same.
789
# https://github.com/jaraco/keyring/issues/556
790
con = secretstorage.dbus_init()
791
try:
792
col = secretstorage.get_default_collection(con)
793
label = browser_keyring_name + " Safe Storage"
794
for item in col.get_all_items():
795
if item.get_label() == label:
796
return item.get_secret()
797
else:
798
_log_error("Failed to read from GNOME keyring")
799
return b""
800
finally:
801
con.close()
802
803
804
def _get_linux_keyring_password(browser_keyring_name, keyring):
805
# Note: chrome/chromium can be run with the following flags
806
# to determine which keyring backend it has chosen to use
807
# - chromium --enable-logging=stderr --v=1 2>&1 | grep key_storage_
808
#
809
# Chromium supports --password-store=<basic|gnome|kwallet>
810
# so the automatic detection will not be sufficient in all cases.
811
812
if not keyring:
813
keyring = _choose_linux_keyring()
814
_log_debug("Chosen keyring: %s", keyring)
815
816
if keyring == KEYRING_KWALLET:
817
return _get_kwallet_password(browser_keyring_name)
818
elif keyring == KEYRING_GNOMEKEYRING:
819
return _get_gnome_keyring_password(browser_keyring_name)
820
elif keyring == KEYRING_BASICTEXT:
821
# when basic text is chosen, all cookies are stored as v10
822
# so no keyring password is required
823
return None
824
assert False, "Unknown keyring " + keyring
825
826
827
def _get_mac_keyring_password(browser_keyring_name):
828
_log_debug("Using find-generic-password to obtain "
829
"password from OSX keychain")
830
try:
831
proc, stdout = Popen_communicate(
832
"security", "find-generic-password",
833
"-w", # write password to stdout
834
"-a", browser_keyring_name, # match "account"
835
"-s", browser_keyring_name + " Safe Storage", # match "service"
836
)
837
838
if stdout[-1:] == b"\n":
839
stdout = stdout[:-1]
840
return stdout
841
except Exception as exc:
842
_log_warning("Error when using find-generic-password (%s: %s)",
843
exc.__class__.__name__, exc)
844
return None
845
846
847
def _get_windows_v10_key(browser_root):
848
path = _find_most_recently_used_file(browser_root, "Local State")
849
if path is None:
850
_log_error("Unable to find Local State file")
851
return None
852
_log_debug("Found Local State file at '%s'", path)
853
with open(path, encoding="utf-8") as fp:
854
data = util.json_loads(fp.read())
855
try:
856
base64_key = data["os_crypt"]["encrypted_key"]
857
except KeyError:
858
_log_error("Unable to find encrypted key in Local State")
859
return None
860
encrypted_key = binascii.a2b_base64(base64_key)
861
prefix = b"DPAPI"
862
if not encrypted_key.startswith(prefix):
863
_log_error("Invalid Local State key")
864
return None
865
return _decrypt_windows_dpapi(encrypted_key[len(prefix):])
866
867
868
# --------------------------------------------------------------------
869
# utility
870
871
class ParserError(Exception):
872
pass
873
874
875
class DataParser:
876
def __init__(self, data):
877
self.cursor = 0
878
self._data = data
879
880
def read_bytes(self, num_bytes):
881
if num_bytes < 0:
882
raise ParserError(f"invalid read of {num_bytes} bytes")
883
end = self.cursor + num_bytes
884
if end > len(self._data):
885
raise ParserError("reached end of input")
886
data = self._data[self.cursor:end]
887
self.cursor = end
888
return data
889
890
def expect_bytes(self, expected_value, message):
891
value = self.read_bytes(len(expected_value))
892
if value != expected_value:
893
raise ParserError(f"unexpected value: {value} != {expected_value} "
894
f"({message})")
895
896
def read_uint(self, big_endian=False):
897
data_format = ">I" if big_endian else "<I"
898
return struct.unpack(data_format, self.read_bytes(4))[0]
899
900
def read_double(self, big_endian=False):
901
data_format = ">d" if big_endian else "<d"
902
return struct.unpack(data_format, self.read_bytes(8))[0]
903
904
def read_cstring(self):
905
buffer = []
906
while True:
907
c = self.read_bytes(1)
908
if c == b"\x00":
909
return b"".join(buffer).decode()
910
else:
911
buffer.append(c)
912
913
def skip(self, num_bytes, description="unknown"):
914
if num_bytes > 0:
915
_log_debug(f"Skipping {num_bytes} bytes ({description}): "
916
f"{self.read_bytes(num_bytes)!r}")
917
elif num_bytes < 0:
918
raise ParserError(f"Invalid skip of {num_bytes} bytes")
919
920
def skip_to(self, offset, description="unknown"):
921
self.skip(offset - self.cursor, description)
922
923
def skip_to_end(self, description="unknown"):
924
self.skip_to(len(self._data), description)
925
926
927
class DatabaseConnection():
928
929
def __init__(self, path):
930
self.path = path
931
self.database = None
932
self.directory = None
933
934
def __enter__(self):
935
try:
936
# https://www.sqlite.org/uri.html#the_uri_path
937
path = self.path.replace("?", "%3f").replace("#", "%23")
938
if util.WINDOWS:
939
path = "/" + os.path.abspath(path)
940
941
uri = f"file:{path}?mode=ro&immutable=1"
942
self.database = sqlite3.connect(
943
uri, uri=True, isolation_level=None, check_same_thread=False)
944
return self.database
945
except Exception as exc:
946
_log_debug("Falling back to temporary database copy (%s: %s)",
947
exc.__class__.__name__, exc)
948
949
try:
950
self.directory = tempfile.TemporaryDirectory(prefix="gallery-dl-")
951
path_copy = os.path.join(self.directory.name, "copy.sqlite")
952
shutil.copyfile(self.path, path_copy)
953
self.database = sqlite3.connect(
954
path_copy, isolation_level=None, check_same_thread=False)
955
return self.database
956
except BaseException:
957
if self.directory:
958
self.directory.cleanup()
959
raise
960
961
def __exit__(self, exc_type, exc_value, traceback):
962
self.database.close()
963
if self.directory:
964
self.directory.cleanup()
965
966
967
def Popen_communicate(*args):
968
proc = util.Popen(
969
args, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
970
try:
971
stdout, stderr = proc.communicate()
972
except BaseException: # Including KeyboardInterrupt
973
proc.kill()
974
proc.wait()
975
raise
976
return proc, stdout
977
978
979
"""
980
https://chromium.googlesource.com/chromium/src/+/refs/heads
981
/main/base/nix/xdg_util.h - DesktopEnvironment
982
"""
983
DE_OTHER = "other"
984
DE_CINNAMON = "cinnamon"
985
DE_GNOME = "gnome"
986
DE_KDE = "kde"
987
DE_PANTHEON = "pantheon"
988
DE_UNITY = "unity"
989
DE_XFCE = "xfce"
990
991
992
"""
993
https://chromium.googlesource.com/chromium/src/+/refs/heads
994
/main/components/os_crypt/key_storage_util_linux.h - SelectedLinuxBackend
995
"""
996
KEYRING_KWALLET = "kwallet"
997
KEYRING_GNOMEKEYRING = "gnomekeyring"
998
KEYRING_BASICTEXT = "basictext"
999
SUPPORTED_KEYRINGS = {"kwallet", "gnomekeyring", "basictext"}
1000
1001
1002
def _get_linux_desktop_environment(env):
1003
"""
1004
Ref: https://chromium.googlesource.com/chromium/src/+/refs/heads
1005
/main/base/nix/xdg_util.cc - GetDesktopEnvironment
1006
"""
1007
xdg_current_desktop = env.get("XDG_CURRENT_DESKTOP")
1008
desktop_session = env.get("DESKTOP_SESSION")
1009
1010
if xdg_current_desktop:
1011
xdg_current_desktop = (xdg_current_desktop.partition(":")[0]
1012
.strip().lower())
1013
1014
if xdg_current_desktop == "unity":
1015
if desktop_session and "gnome-fallback" in desktop_session:
1016
return DE_GNOME
1017
else:
1018
return DE_UNITY
1019
elif xdg_current_desktop == "gnome":
1020
return DE_GNOME
1021
elif xdg_current_desktop == "x-cinnamon":
1022
return DE_CINNAMON
1023
elif xdg_current_desktop == "kde":
1024
return DE_KDE
1025
elif xdg_current_desktop == "pantheon":
1026
return DE_PANTHEON
1027
elif xdg_current_desktop == "xfce":
1028
return DE_XFCE
1029
1030
if desktop_session:
1031
if desktop_session in ("mate", "gnome"):
1032
return DE_GNOME
1033
if "kde" in desktop_session:
1034
return DE_KDE
1035
if "xfce" in desktop_session:
1036
return DE_XFCE
1037
1038
if "GNOME_DESKTOP_SESSION_ID" in env:
1039
return DE_GNOME
1040
if "KDE_FULL_SESSION" in env:
1041
return DE_KDE
1042
return DE_OTHER
1043
1044
1045
def _mac_absolute_time_to_posix(timestamp):
1046
# 978307200 is timestamp of 2001-01-01 00:00:00
1047
return 978307200 + int(timestamp)
1048
1049
1050
def pbkdf2_sha1(password, salt, iterations, key_length):
1051
return pbkdf2_hmac("sha1", password, salt, iterations, key_length)
1052
1053
1054
def _decrypt_aes_cbc(ciphertext, key, offset=0,
1055
initialization_vector=b" " * 16):
1056
plaintext = aes.unpad_pkcs7(aes.aes_cbc_decrypt_bytes(
1057
ciphertext, key, initialization_vector))
1058
if offset:
1059
plaintext = plaintext[offset:]
1060
try:
1061
return plaintext.decode()
1062
except UnicodeDecodeError:
1063
return None
1064
1065
1066
def _decrypt_aes_gcm(ciphertext, key, nonce, authentication_tag, offset=0):
1067
try:
1068
plaintext = aes.aes_gcm_decrypt_and_verify_bytes(
1069
ciphertext, key, authentication_tag, nonce)
1070
if offset:
1071
plaintext = plaintext[offset:]
1072
return plaintext.decode()
1073
except UnicodeDecodeError:
1074
_log_warning("Failed to decrypt cookie (AES-GCM Unicode)")
1075
except ValueError:
1076
_log_warning("Failed to decrypt cookie (AES-GCM MAC)")
1077
return None
1078
1079
1080
def _decrypt_windows_dpapi(ciphertext):
1081
"""
1082
References:
1083
- https://docs.microsoft.com/en-us/windows
1084
/win32/api/dpapi/nf-dpapi-cryptunprotectdata
1085
"""
1086
from ctypes.wintypes import DWORD
1087
1088
class DATA_BLOB(ctypes.Structure):
1089
_fields_ = [("cbData", DWORD),
1090
("pbData", ctypes.POINTER(ctypes.c_char))]
1091
1092
buffer = ctypes.create_string_buffer(ciphertext)
1093
blob_in = DATA_BLOB(ctypes.sizeof(buffer), buffer)
1094
blob_out = DATA_BLOB()
1095
ret = ctypes.windll.crypt32.CryptUnprotectData(
1096
ctypes.byref(blob_in), # pDataIn
1097
None, # ppszDataDescr: human readable description of pDataIn
1098
None, # pOptionalEntropy: salt?
1099
None, # pvReserved: must be NULL
1100
None, # pPromptStruct: information about prompts to display
1101
0, # dwFlags
1102
ctypes.byref(blob_out) # pDataOut
1103
)
1104
if not ret:
1105
_log_warning("Failed to decrypt cookie (DPAPI)")
1106
return None
1107
1108
result = ctypes.string_at(blob_out.pbData, blob_out.cbData)
1109
ctypes.windll.kernel32.LocalFree(blob_out.pbData)
1110
return result
1111
1112
1113
def _find_most_recently_used_file(roots, filename):
1114
if isinstance(roots, str):
1115
roots = (roots,)
1116
1117
# if the provided root points to an exact profile path
1118
# check if it contains the wanted filename
1119
for root in roots:
1120
first_choice = os.path.join(root, filename)
1121
if os.path.exists(first_choice):
1122
return first_choice
1123
1124
# if there are multiple browser profiles, take the most recently used one
1125
paths = []
1126
for root in roots:
1127
for curr_root, dirs, files in os.walk(root):
1128
for file in files:
1129
if file == filename:
1130
paths.append(os.path.join(curr_root, file))
1131
if not paths:
1132
return None
1133
return max(paths, key=lambda path: os.lstat(path).st_mtime)
1134
1135
1136
def _is_path(value):
1137
return os.path.sep in value
1138
1139
1140
def _parse_browser_specification(
1141
browser, profile=None, keyring=None, container=None, domain=None):
1142
browser = browser.lower()
1143
if browser not in SUPPORTED_BROWSERS:
1144
raise ValueError(f"Unsupported browser '{browser}'")
1145
if keyring and keyring not in SUPPORTED_KEYRINGS:
1146
raise ValueError(f"Unsupported keyring '{keyring}'")
1147
if profile and _is_path(profile):
1148
profile = os.path.expanduser(profile)
1149
return browser, profile, keyring, container, domain
1150
1151
1152
_log_cache = set()
1153
_log_debug = logger.debug
1154
_log_info = logger.info
1155
1156
1157
def _log_warning(msg, *args):
1158
if msg not in _log_cache:
1159
_log_cache.add(msg)
1160
logger.warning(msg, *args)
1161
1162
1163
def _log_error(msg, *args):
1164
if msg not in _log_cache:
1165
_log_cache.add(msg)
1166
logger.error(msg, *args)
1167
1168