Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
mikf
GitHub Repository: mikf/gallery-dl
Path: blob/master/gallery_dl/extractor/facebook.py
8891 views
1
# -*- coding: utf-8 -*-
2
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License version 2 as
5
# published by the Free Software Foundation.
6
7
"""Extractors for https://www.facebook.com/"""
8
9
from .common import Extractor, Message, Dispatch
10
from .. import text, util, exception
11
from ..cache import memcache
12
13
# Matches any facebook.com host, with or without scheme/subdomain
BASE_PATTERN = r"(?:https?://)?(?:[\w-]+\.)?facebook\.com"
# Profile URLs: reject media/photo/watch/permalink paths, then capture
# a profile.php id, a people/... slug, or a plain username
USER_PATTERN = (BASE_PATTERN +
                r"/(?!media/|photo/|photo.php|watch/|permalink.php)"
                r"(?:profile\.php\?id=|people/[^/?#]+/)?([^/?&#]+)")
19
class FacebookExtractor(Extractor):
    """Base class for Facebook extractors"""
    category = "facebook"
    root = "https://www.facebook.com"
    # {username}, {title} and {set_id} are produced by parse_set_page()
    directory_fmt = ("{category}", "{username}", "{title} ({set_id})")
    filename_fmt = "{id}.{extension}"
    archive_fmt = "{id}.{extension}"
26
27
def _init(self):
28
headers = self.session.headers
29
headers["Accept"] = (
30
"text/html,application/xhtml+xml,application/xml;q=0.9,"
31
"image/avif,image/webp,image/png,image/svg+xml,*/*;q=0.8"
32
)
33
headers["Sec-Fetch-Dest"] = "empty"
34
headers["Sec-Fetch-Mode"] = "navigate"
35
headers["Sec-Fetch-Site"] = "same-origin"
36
37
self.fallback_retries = self.config("fallback-retries", 2)
38
self.videos = self.config("videos", True)
39
self.author_followups = self.config("author-followups", False)
40
41
def decode_all(self, txt):
42
return text.unescape(
43
txt.encode().decode("unicode_escape")
44
.encode("utf_16", "surrogatepass").decode("utf_16")
45
).replace("\\/", "/")
46
47
    def parse_set_page(self, set_page):
        """Collect directory metadata from a media-set HTML page.

        Returns a dict with the set ID, owner name/ID, set title
        and the ID of the set's first photo.
        """
        directory = {
            # the set token appears under two different key spellings
            "set_id": text.extr(
                set_page, '"mediaSetToken":"', '"'
            ) or text.extr(
                set_page, '"mediasetToken":"', '"'
            ),
            "username": self.decode_all(
                text.extr(
                    set_page, '"user":{"__isProfile":"User","name":"', '","'
                ) or text.extr(
                    set_page, '"actors":[{"__typename":"User","name":"', '","'
                )
            ),
            "user_id": text.extr(
                set_page, '"owner":{"__typename":"User","id":"', '"'
            ),
            # filled in below when the owner ID is an opaque pfbid token
            "user_pfbid": "",
            "title": self.decode_all(text.extr(
                set_page, '"title":{"text":"', '"'
            )),
            "first_photo_id": text.extr(
                set_page,
                '{"__typename":"Photo","__isMedia":"Photo","',
                '","creation_story"'
            ).rsplit('"id":"', 1)[-1] or
            text.extr(
                set_page, '{"__typename":"Photo","id":"', '"'
            )
        }

        if directory["user_id"].startswith("pfbid"):
            # keep the pfbid and try to recover the numeric user ID
            directory["user_pfbid"] = directory["user_id"]
            directory["user_id"] = (
                text.extr(
                    set_page, '"actors":[{"__typename":"User","id":"', '"') or
                text.extr(
                    set_page, '"userID":"', '"') or
                directory["set_id"].split(".")[1])

        return directory
88
89
    def parse_photo_page(self, photo_page):
        """Collect metadata for a single photo from its HTML page.

        Returns a dict with photo/set/owner IDs, caption, date,
        download URL, the ID of the next photo in the set, and the
        IDs of photos posted by the author in the comments
        ("followups_ids").
        """
        photo = {
            "id": text.extr(
                photo_page, '"__isNode":"Photo","id":"', '"'
            ),
            "set_id": text.extr(
                photo_page,
                '"url":"https:\\/\\/www.facebook.com\\/photo\\/?fbid=',
                '"'
            ).rsplit("&set=", 1)[-1],
            "username": self.decode_all(text.extr(
                photo_page, '"owner":{"__typename":"User","name":"', '"'
            )),
            "user_id": text.extr(
                photo_page, '"owner":{"__typename":"User","id":"', '"'
            ),
            # filled in below when the owner ID is a pfbid token
            "user_pfbid": "",
            "caption": self.decode_all(text.extr(
                photo_page,
                '"message":{"delight_ranges"',
                '"},"message_preferred_body"'
            ).rsplit('],"text":"', 1)[-1]),
            "date": self.parse_timestamp(
                text.extr(photo_page, '\\"publish_time\\":', ',') or
                text.extr(photo_page, '"created_time":', ',')
            ),
            "url": self.decode_all(text.extr(
                photo_page, ',"image":{"uri":"', '","'
            )),
            "next_photo_id": text.extr(
                photo_page,
                '"nextMediaAfterNodeId":{"__typename":"Photo","id":"',
                '"'
            ) or text.extr(
                photo_page,
                '"nextMedia":{"edges":[{"node":{"__typename":"Photo","id":"',
                '"'
            )
        }

        if photo["user_id"].startswith("pfbid"):
            photo["user_pfbid"] = photo["user_id"]
            photo["user_id"] = text.extr(
                photo_page, r'\"content_owner_id_new\":\"', r'\"')

        # derive "filename"/"extension" from the download URL
        text.nameext_from_url(photo["url"], photo)

        # collect photos the original poster attached in the comments
        photo["followups_ids"] = []
        for comment_raw in text.extract_iter(
            photo_page, '{"node":{"id"', '"cursor":null}'
        ):
            if ('"is_author_original_poster":true' in comment_raw and
                    '{"__typename":"Photo","id":"' in comment_raw):
                photo["followups_ids"].append(text.extr(
                    comment_raw,
                    '{"__typename":"Photo","id":"',
                    '"'
                ))

        return photo
149
150
def parse_post_page(self, post_page):
151
first_photo_url = text.extr(
152
text.extr(
153
post_page, '"__isMedia":"Photo"', '"target_group"'
154
), '"url":"', ','
155
)
156
157
post = {
158
"set_id": text.extr(post_page, '{"mediaset_token":"', '"') or
159
text.extr(first_photo_url, 'set=', '"').rsplit("&", 1)[0]
160
}
161
162
return post
163
164
    def parse_video_page(self, video_page):
        """Collect metadata for a video from its HTML page.

        Returns a (video, audio) pair of dicts. 'video' maps each
        available resolution to its URL in "urls" and carries the
        highest-resolution URL in "url"; when no qualities are found,
        "url" is absent. 'audio' carries the separate audio stream.
        """
        video = {
            "id": text.extr(
                video_page, '\\"video_id\\":\\"', '\\"'
            ),
            "username": self.decode_all(text.extr(
                video_page, '"actors":[{"__typename":"User","name":"', '","'
            )),
            "user_id": text.extr(
                video_page, '"owner":{"__typename":"User","id":"', '"'
            ),
            "date": self.parse_timestamp(text.extr(
                video_page, '\\"publish_time\\":', ','
            )),
            "type": "video"
        }

        if not video["username"]:
            # fall back to looking the name up by the owner's user ID
            video["username"] = self.decode_all(text.extr(
                video_page,
                '"__typename":"User","id":"' + video["user_id"] + '","name":"',
                '","'
            ))

        # raw DASH manifest of the first video on the page
        first_video_raw = text.extr(
            video_page, '"permalink_url"', '\\/Period>\\u003C\\/MPD>'
        )

        audio = {
            **video,
            "url": self.decode_all(text.extr(
                text.extr(
                    first_video_raw,
                    "AudioChannelConfiguration",
                    "BaseURL>\\u003C"
                ),
                "BaseURL>", "\\u003C\\/"
            )),
            "type": "audio"
        }

        video["urls"] = {}

        # collect one URL per quality label (e.g. "720p")
        for raw_url in text.extract_iter(
            first_video_raw, 'FBQualityLabel=\\"', '\\u003C\\/BaseURL>'
        ):
            resolution = raw_url.split('\\"', 1)[0]
            video["urls"][resolution] = self.decode_all(
                raw_url.split('BaseURL>', 1)[1]
            )

        if not video["urls"]:
            return video, audio

        # pick the URL with the numerically highest resolution label
        video["url"] = max(
            video["urls"].items(),
            key=lambda x: text.parse_int(x[0][:-1])
        )[1]

        text.nameext_from_url(video["url"], video)
        audio["filename"] = video["filename"]
        audio["extension"] = "m4a"

        return video, audio
228
229
def photo_page_request_wrapper(self, url, **kwargs):
230
LEFT_OFF_TXT = "" if url.endswith("&set=") else (
231
"\nYou can use this URL to continue from "
232
"where you left off (added \"&setextract\"): "
233
"\n" + url + "&setextract"
234
)
235
236
res = self.request(url, **kwargs)
237
238
if res.url.startswith(self.root + "/login"):
239
raise exception.AuthRequired(
240
message=("You must be logged in to continue viewing images." +
241
LEFT_OFF_TXT))
242
243
if b'{"__dr":"CometErrorRoot.react"}' in res.content:
244
raise exception.AbortExtraction(
245
"You've been temporarily blocked from viewing images.\n"
246
"Please try using a different account, "
247
"using a VPN or waiting before you retry." + LEFT_OFF_TXT)
248
249
return res
250
251
    def extract_set(self, set_data):
        """Yield every photo of a set by following "next photo" links.

        'set_data' must contain "set_id" and "first_photo_id"; its
        remaining entries are merged into each photo's metadata.
        """
        set_id = set_data["set_id"]
        all_photo_ids = [set_data["first_photo_id"]]

        retries = 0
        i = 0

        # 'all_photo_ids' grows while iterating (followups, next links)
        while i < len(all_photo_ids):
            photo_id = all_photo_ids[i]
            photo_url = f"{self.root}/photo/?fbid={photo_id}&set={set_id}"
            photo_page = self.photo_page_request_wrapper(photo_url).text

            photo = self.parse_photo_page(photo_page)
            photo["num"] = i + 1

            if self.author_followups:
                for followup_id in photo["followups_ids"]:
                    if followup_id not in all_photo_ids:
                        self.log.debug(
                            "Found a followup in comments: %s", followup_id
                        )
                        all_photo_ids.append(followup_id)

            if not photo["url"]:
                # a missing download URL is treated as rate limiting:
                # wait and retry the same photo before giving up on it
                if retries < self.fallback_retries and self._interval_429:
                    seconds = self._interval_429()
                    self.log.warning(
                        "Failed to find photo download URL for %s. "
                        "Retrying in %s seconds.", photo_url, seconds,
                    )
                    self.wait(seconds=seconds, reason="429 Too Many Requests")
                    retries += 1
                    continue
                else:
                    self.log.error(
                        "Failed to find photo download URL for " + photo_url +
                        ". Skipping."
                    )
                    retries = 0
            else:
                retries = 0
                photo.update(set_data)
                yield Message.Directory, "", photo
                yield Message.Url, photo["url"], photo

            if not photo["next_photo_id"]:
                self.log.debug(
                    "Can't find next image in the set. "
                    "Extraction is over."
                )
            elif photo["next_photo_id"] in all_photo_ids:
                if photo["next_photo_id"] != photo["id"]:
                    self.log.debug(
                        "Detected a loop in the set, it's likely finished. "
                        "Extraction is over."
                    )
            elif int(photo["next_photo_id"]) > int(photo["id"]) + i*120:
                # a large ID jump suggests a wrap-around to the set start;
                # only follow it when the "loop" option is enabled
                self.log.info(
                    "Detected jump to the beginning of the set. (%s -> %s)",
                    photo["id"], photo["next_photo_id"])
                if self.config("loop", False):
                    all_photo_ids.append(photo["next_photo_id"])
            else:
                all_photo_ids.append(photo["next_photo_id"])

            i += 1
317
318
@memcache(keyarg=1)
319
def _extract_profile(self, profile, set_id=False):
320
if set_id:
321
url = f"{self.root}/{profile}/photos_by"
322
else:
323
url = f"{self.root}/{profile}"
324
return self._extract_profile_page(url)
325
326
    def _extract_profile_page(self, url):
        """Fetch 'url' and extract profile data, retrying empty pages.

        Returns the user dict with "set_id" added, or {} when the page
        does not exist or stays empty after all retries.
        """
        for _ in range(self.fallback_retries + 1):
            page = self.request(url).text

            # nonexistent profile: give up immediately
            if page.find('>Page Not Found</title>', 0, 3000) > 0:
                break
            if ('"props":{"title":"This content isn\'t available right now"' in
                    page):
                raise exception.AuthRequired(
                    "authenticated cookies", "profile",
                    "This content isn't available right now")

            set_id = self._extract_profile_set_id(page)
            user = self._extract_profile_user(page)
            if set_id or user:
                user["set_id"] = set_id
                return user

            self.log.debug("Got empty profile photos page, retrying...")
        return {}
346
347
def _extract_profile_set_id(self, profile_photos_page):
348
set_ids_raw = text.extr(
349
profile_photos_page, '"pageItems"', '"page_info"'
350
)
351
352
set_id = text.extr(
353
set_ids_raw, 'set=', '"'
354
).rsplit("&", 1)[0] or text.extr(
355
set_ids_raw, '\\/photos\\/', '\\/'
356
)
357
358
return set_id
359
360
    def _extract_profile_user(self, page):
        """Parse the embedded "user" JSON object of a profile page.

        Returns the normalized user dict; on parse failure returns
        whatever could be extracted (possibly {}).
        """
        data = text.extr(page, '","user":{"', '},"viewer":{')

        user = None
        try:
            user = util.json_loads(f'{{"{data}}}')
            if user["id"].startswith("pfbid"):
                # keep the pfbid token and recover the numeric user ID
                user["user_pfbid"] = user["id"]
                user["id"] = text.extr(page, '"userID":"', '"')
            user["username"] = (text.extr(page, '"userVanity":"', '"') or
                                text.extr(page, '"vanity":"', '"'))
            user["profile_tabs"] = [
                edge["node"]
                for edge in (user["profile_tabs"]["profile_user"]
                             ["timeline_nav_app_sections"]["edges"])
            ]

            # biography: three fallbacks of decreasing reliability
            if bio := text.extr(page, '"best_description":{"text":"', '"'):
                user["biography"] = self.decode_all(bio)
            elif (pos := page.find(
                    '"__module_operation_ProfileCometTileView_profileT')) >= 0:
                user["biography"] = self.decode_all(text.rextr(
                    page, '"text":"', '"', pos))
            else:
                user["biography"] = text.unescape(text.remove_html(text.extr(
                    page, "</span></span></h2>", "<ul>")))
        except Exception:
            # only a failed JSON parse leaves 'user' as None; later
            # failures keep the partially filled dict
            if user is None:
                self.log.debug("Failed to extract user data: %s", data)
                user = {}
        return user
391
392
393
class FacebookPhotoExtractor(FacebookExtractor):
    """Extractor for individual Facebook photos"""
    subcategory = "photo"
    # Matches .../USER/photos/xyz/ID and /photo/?fbid=ID or
    # /photo.php?fbid=ID URLs; the dot in 'photo.php' is escaped so it
    # no longer matches arbitrary characters. The lookbehind excludes
    # '&setextract' resume URLs, which belong to FacebookSetExtractor.
    pattern = (BASE_PATTERN +
               r"/(?:[^/?#]+/photos/[^/?#]+/|photo(?:\.php)?/?\?"
               r"(?:[^&#]+&)*fbid=)([^/?&#]+)[^/?#]*(?<!&setextract)$")
    example = "https://www.facebook.com/photo/?fbid=PHOTO_ID"

    def items(self):
        """Yield the photo and, if enabled, author followups from
        its comment section."""
        photo_id = self.groups[0]
        photo_url = f"{self.root}/photo/?fbid={photo_id}&set="
        photo_page = self.photo_page_request_wrapper(photo_url).text

        i = 1
        photo = self.parse_photo_page(photo_page)
        photo["num"] = i

        # directory metadata comes from the photo's containing set
        set_url = f"{self.root}/media/set/?set={photo['set_id']}"
        set_page = self.request(set_url).text

        directory = self.parse_set_page(set_page)

        yield Message.Directory, "", directory
        yield Message.Url, photo["url"], photo

        if self.author_followups:
            for comment_photo_id in photo["followups_ids"]:
                comment_photo = self.parse_photo_page(
                    self.photo_page_request_wrapper(
                        f"{self.root}/photo/?fbid={comment_photo_id}&set="
                    ).text
                )
                i += 1
                comment_photo["num"] = i
                yield Message.Url, comment_photo["url"], comment_photo
428
429
430
class FacebookSetExtractor(FacebookExtractor):
    """Base class for Facebook Set extractors"""
    subcategory = "set"
    pattern = (
        BASE_PATTERN +
        r"/(?:(?:media/set|photo)/?\?(?:[^&#]+&)*set=([^&#]+)"
        r"[^/?#]*(?<!&setextract)$"
        r"|([^/?#]+/posts/[^/?#]+)"
        r"|photo/\?(?:[^&#]+&)*fbid=([^/?&#]+)&set=([^/?&#]+)&setextract)"
    )
    example = "https://www.facebook.com/media/set/?set=SET_ID"

    def items(self):
        """Extract all photos of a set, resolving post URLs first."""
        groups = self.groups
        set_id = groups[0] or groups[3]

        # a post URL needs one extra request to find its set ID
        post_path = groups[1]
        if post_path:
            post_page = self.request(f"{self.root}/{post_path}").text
            set_id = self.parse_post_page(post_page)["set_id"]

        set_page = self.request(
            f"{self.root}/media/set/?set={set_id}").text
        set_data = self.parse_set_page(set_page)

        # '&setextract' resume URLs specify where to start
        first_id = groups[2]
        if first_id:
            set_data["first_photo_id"] = first_id

        return self.extract_set(set_data)
456
457
458
class FacebookVideoExtractor(FacebookExtractor):
    """Base class for Facebook Video extractors"""
    subcategory = "video"
    directory_fmt = ("{category}", "{username}", "{subcategory}")
    pattern = BASE_PATTERN + r"/(?:[^/?#]+/videos/|watch/?\?v=)([^/?&#]+)"
    example = "https://www.facebook.com/watch/?v=VIDEO_ID"

    def items(self):
        """Yield a video and, when available, its separate audio track."""
        video_id = self.groups[0]
        video_url = f"{self.root}/watch/?v={video_id}"
        video_page = self.request(video_url).text

        video, audio = self.parse_video_page(video_page)

        # no "url" key means no download URLs could be found
        if "url" not in video:
            return

        yield Message.Directory, "", video

        if not self.videos:
            return
        if self.videos == "ytdl":
            # defer the actual download to ytdl
            yield Message.Url, "ytdl:" + video_url, video
        else:
            yield Message.Url, video["url"], video
            if audio["url"]:
                yield Message.Url, audio["url"], audio
483
484
485
class FacebookInfoExtractor(FacebookExtractor):
    """Extractor for Facebook Profile data"""
    subcategory = "info"
    directory_fmt = ("{category}", "{username}")
    pattern = USER_PATTERN + r"/info"
    example = "https://www.facebook.com/USERNAME/info"

    def items(self):
        """Yield a single directory message carrying the profile data."""
        user = self._extract_profile(self.groups[0])
        yield Message.Directory, "", user
495
496
497
class FacebookAlbumsExtractor(FacebookExtractor):
    """Extractor for Facebook Profile albums"""
    subcategory = "albums"
    pattern = USER_PATTERN + r"/photos_albums(?:/([^/?#]+))?"
    example = "https://www.facebook.com/USERNAME/photos_albums"

    def items(self):
        # optional second group filters albums by (case-insensitive) title
        profile, name = self.groups
        url = f"{self.root}/{profile}/photos_albums"
        page = self.request(url).text

        pos = page.find(
            '"TimelineAppCollectionAlbumsRenderer","collection":{"id":"')
        if pos < 0:
            return
        if name is not None:
            name = name.lower()

        # slice out the embedded album list and re-close the JSON
        items = text.extract(page, '},"pageItems":', '}}},', pos)[0]
        edges = util.json_loads(items + "}}")["edges"]

        # TODO: use /graphql API endpoint
        for edge in edges:
            node = edge["node"]
            album = node["node"]
            album["title"] = title = node["title"]["text"]
            if name is not None and name != title.lower():
                continue
            # hand each album URL over to FacebookSetExtractor
            album["_extractor"] = FacebookSetExtractor
            album["thumbnail"] = (img := node["image"]) and img["uri"]
            yield Message.Queue, album["url"], album
528
529
530
class FacebookPhotosExtractor(FacebookExtractor):
    """Extractor for Facebook Profile Photos"""
    subcategory = "photos"
    pattern = USER_PATTERN + r"/photos(?:_by)?"
    example = "https://www.facebook.com/USERNAME/photos"

    def items(self):
        """Extract the profile's main photo set."""
        profile_data = self._extract_profile(self.groups[0], True)
        set_id = profile_data["set_id"]
        if not set_id:
            return iter(())

        page = self.request(f"{self.root}/media/set/?set={set_id}").text
        return self.extract_set(self.parse_set_page(page))
545
546
547
class FacebookAvatarExtractor(FacebookExtractor):
    """Extractor for Facebook Profile Avatars"""
    subcategory = "avatar"
    pattern = USER_PATTERN + r"/avatar"
    example = "https://www.facebook.com/USERNAME/avatar"

    def items(self):
        """Yield the profile's current avatar photo."""
        user = self._extract_profile(self.groups[0])
        page_url = user["profilePhoto"]["url"]
        page = self.photo_page_request_wrapper(page_url).text

        avatar = self.parse_photo_page(page)
        avatar["type"] = "avatar"
        avatar["count"] = avatar["num"] = 1

        # directory metadata comes from the avatar's containing set
        set_page = self.request(
            f"{self.root}/media/set/?set={avatar['set_id']}").text
        directory = self.parse_set_page(set_page)

        yield Message.Directory, "", directory
        yield Message.Url, avatar["url"], avatar
568
569
570
class FacebookUserExtractor(Dispatch, FacebookExtractor):
    """Extractor for Facebook Profiles"""
    pattern = USER_PATTERN + r"/?(?:$|\?|#)"
    example = "https://www.facebook.com/USERNAME"

    def items(self):
        # dispatch to the subcategory extractors;
        # "photos" is the default when none are selected
        base = f"{self.root}/{self.groups[0]}/"
        return self._dispatch_extractors((
            (FacebookInfoExtractor , base + "info"),
            (FacebookAvatarExtractor, base + "avatar"),
            (FacebookPhotosExtractor, base + "photos"),
            (FacebookAlbumsExtractor, base + "photos_albums"),
        ), ("photos",))
583
584