Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
mikf
GitHub Repository: mikf/gallery-dl
Path: blob/master/gallery_dl/extractor/bluesky.py
8935 views
1
# -*- coding: utf-8 -*-
2
3
# Copyright 2024-2025 Mike Fährmann
4
#
5
# This program is free software; you can redistribute it and/or modify
6
# it under the terms of the GNU General Public License version 2 as
7
# published by the Free Software Foundation.
8
9
"""Extractors for https://bsky.app/"""
10
11
from .common import Extractor, Message, Dispatch
12
from .. import text, util, exception
13
from ..cache import cache, memcache
14
15
# URL prefix shared by all extractors: an optional scheme followed by either
# "main.bsky.dev" or a "*.app" host of the form
# [www.][c|fx|vx]bs{k,y}y[e|x].app (e.g. bsky.app, fxbsky.app, bskyx.app).
BASE_PATTERN = (
    r"(?:https?://)?"
    r"(?:(?:www\.)?(?:c|[fv]x)?bs[ky]y[ex]?\.app|main\.bsky\.dev)"
)

# Profile URLs; group 1 captures the handle or DID path segment.
USER_PATTERN = BASE_PATTERN + r"/profile/([^/?#]+)"
18
19
20
class BlueskyExtractor(Extractor):
    """Base class for bluesky extractors"""
    # Settings shared by every extractor in this module.
    category = "bluesky"

    # Format strings; the referenced keys are filled in by _prepare()
    # and by the per-file dicts produced in _extract_media().
    directory_fmt = (
        "{category}",
        "{author[handle]}",
    )
    filename_fmt = "{createdAt[:19]}_{post_id}_{num}.{extension}"
    archive_fmt = "{filename}"

    # Web front-end URL; _init() stores its host part as `self.instance`.
    root = "https://bsky.app"
27
28
def _init(self):
    """Read configuration options and set up per-run state.

    The "metadata" option may be a comma-separated string, a list/tuple
    of names, or any other truthy value (which enables both "user" and
    "facets"). A falsy value disables both.
    """
    meta = self.config("metadata") or ()
    if meta:
        if isinstance(meta, str):
            # "user, facets" -> ["user", "facets"]
            meta = meta.replace(" ", "").split(",")
        elif not isinstance(meta, (list, tuple)):
            meta = ("user", "facets")
    self._metadata_user = "user" in meta
    self._metadata_facets = "facets" in meta

    self.api = BlueskyAPI(self)
    self._user = self._user_did = None
    # host part of the root URL, e.g. "bsky.app"
    self.instance = self.root.partition("://")[2]
    self.videos = self.config("videos", True)
    self.quoted = self.config("quoted", False)
42
43
def items(self):
    """Yield Directory and Url messages for every post from posts().

    Feed entries wrapped in a "post" or "item" key are unwrapped first.
    When `self._user_did` is set, posts by any other author are skipped
    as reposts. With the "quoted" option enabled, the chain of embedded
    (quoted) records is followed and each one is emitted as its own post.
    """
    for entry in self.posts():
        if "post" in entry:
            post = entry["post"]
        elif "item" in entry:
            post = entry["item"]
        else:
            post = entry

        user_did = self._user_did
        if user_did and post["author"]["did"] != user_did:
            self.log.debug("Skipping %s (repost)", self._pid(post))
            continue

        # remember the view-level embed before the record is merged in
        embed = post.get("embed")
        try:
            post.update(post.pop("record"))
        except Exception:
            self.log.debug("Skipping %s (no 'record')", self._pid(post))
            continue

        while True:
            self._prepare(post)
            files = self._extract_files(post)

            yield Message.Directory, "", post
            if files:
                did = post["author"]["did"]
                endpoint = self.api.service_endpoint(did)
                base = (f"{endpoint}/xrpc"
                        f"/com.atproto.sync.getBlob?did={did}&cid=")
                for num, file in enumerate(files, 1):
                    post["num"] = num
                    post.update(file)
                    yield Message.Url, base + file["filename"], post

            if not (self.quoted and embed and "record" in embed):
                break

            # descend into the quoted record, if it carries a value
            quote = embed["record"]
            if "record" in quote:
                quote = quote["record"]
            value = quote.pop("value", None)
            if value is None:
                break

            quote["quote_id"] = self._pid(post)
            quote["quote_by"] = post["author"]
            # the quote's own embed must be read before `value` is merged
            embed = quote.get("embed")
            quote.update(value)
            post = quote
86
87
def posts(self):
    """Return the posts to process; the base class has none."""
    no_posts = ()
    return no_posts
89
90
def _posts_records(self, actor, collection):
91
depth = self.config("depth", "0")
92
93
for record in self.api.list_records(actor, collection):
94
uri = None
95
try:
96
uri = record["value"]["subject"]["uri"]
97
if "/app.bsky.feed.post/" in uri:
98
yield from self.api.get_post_thread_uri(uri, depth)
99
except exception.ControlException:
100
pass # deleted post
101
except Exception as exc:
102
self.log.debug(record, exc_info=exc)
103
self.log.warning("Failed to extract %s (%s: %s)",
104
uri or "record", exc.__class__.__name__, exc)
105
106
def _pid(self, post):
107
return post["uri"].rpartition("/")[2]
108
109
@memcache(keyarg=1)
def _instance(self, handle):
    """Return the last two dot-separated labels of `handle`.

    E.g. "alice.bsky.social" -> "bsky.social".
    """
    labels = handle.rsplit(".", 2)
    return ".".join(labels[-2:])
112
113
def _prepare(self, post):
114
author = post["author"]
115
author["instance"] = self._instance(author["handle"])
116
117
if self._metadata_facets:
118
if "facets" in post:
119
post["hashtags"] = tags = []
120
post["mentions"] = dids = []
121
post["uris"] = uris = []
122
for facet in post["facets"]:
123
features = facet["features"][0]
124
if "tag" in features:
125
tags.append(features["tag"])
126
elif "did" in features:
127
dids.append(features["did"])
128
elif "uri" in features:
129
uris.append(features["uri"])
130
else:
131
post["hashtags"] = post["mentions"] = post["uris"] = ()
132
133
if self._metadata_user:
134
post["user"] = self._user or author
135
136
post["instance"] = self.instance
137
post["post_id"] = self._pid(post)
138
post["date"] = self.parse_datetime_iso(post["createdAt"][:19])
139
140
def _extract_files(self, post):
141
if "embed" not in post:
142
post["count"] = 0
143
return ()
144
145
files = []
146
media = post["embed"]
147
if "media" in media:
148
media = media["media"]
149
150
if "images" in media:
151
for image in media["images"]:
152
try:
153
files.append(self._extract_media(image, "image"))
154
except Exception:
155
pass
156
if "video" in media and self.videos:
157
try:
158
files.append(self._extract_media(media, "video"))
159
except Exception:
160
pass
161
162
post["count"] = len(files)
163
return files
164
165
def _extract_media(self, media, key):
166
try:
167
aspect = media["aspectRatio"]
168
width = aspect["width"]
169
height = aspect["height"]
170
except KeyError:
171
width = height = 0
172
173
data = media[key]
174
try:
175
cid = data["ref"]["$link"]
176
except KeyError:
177
cid = data["cid"]
178
179
return {
180
"description": media.get("alt") or "",
181
"width" : width,
182
"height" : height,
183
"filename" : cid,
184
"extension" : data["mimeType"].rpartition("/")[2],
185
}
186
187
def _make_post(self, actor, kind):
188
did = self.api._did_from_actor(actor)
189
profile = self.api.get_profile(did)
190
191
if kind not in profile:
192
return ()
193
cid = profile[kind].rpartition("/")[2].partition("@")[0]
194
195
return ({
196
"post": {
197
"embed": {"images": [{
198
"alt": kind,
199
"image": {
200
"$type" : "blob",
201
"ref" : {"$link": cid},
202
"mimeType": "image/jpeg",
203
"size" : 0,
204
},
205
"aspectRatio": {
206
"width" : 1000,
207
"height": 1000,
208
},
209
}]},
210
"author" : profile,
211
"record" : (),
212
"createdAt": "",
213
"uri" : cid,
214
},
215
},)
216
217
218
class BlueskyUserExtractor(Dispatch, BlueskyExtractor):
    # Dispatcher for bare profile URLs: hands off to the per-section
    # extractors below via Dispatch._dispatch_extractors().
    pattern = USER_PATTERN + r"$"
    example = "https://bsky.app/profile/HANDLE"

    def items(self):
        """Dispatch to the sub-extractors for this profile.

        The default section is "posts" when the "quoted" or "reposts"
        option is enabled and "media" otherwise.
        """
        base = f"{self.root}/profile/{self.groups[0]}/"

        if self.config("quoted", False) or self.config("reposts", False):
            default = "posts"
        else:
            default = "media"

        sections = (
            (BlueskyInfoExtractor      , "info"),
            (BlueskyAvatarExtractor    , "avatar"),
            (BlueskyBackgroundExtractor, "banner"),
            (BlueskyPostsExtractor     , "posts"),
            (BlueskyRepliesExtractor   , "replies"),
            (BlueskyMediaExtractor     , "media"),
            (BlueskyVideoExtractor     , "video"),
            (BlueskyLikesExtractor     , "likes"),
        )
        targets = tuple((cls, base + name) for cls, name in sections)
        return self._dispatch_extractors(targets, (default,))
236
237
238
class BlueskyPostsExtractor(BlueskyExtractor):
    # Author feed using the "posts_and_author_threads" filter
    subcategory = "posts"
    pattern = USER_PATTERN + r"/posts"
    example = "https://bsky.app/profile/HANDLE/posts"

    def posts(self):
        """Return the author feed filtered to posts and author threads."""
        actor = self.groups[0]
        return self.api.get_author_feed(actor, "posts_and_author_threads")
246
247
248
class BlueskyRepliesExtractor(BlueskyExtractor):
    # Author feed using the "posts_with_replies" filter
    subcategory = "replies"
    pattern = USER_PATTERN + r"/replies"
    example = "https://bsky.app/profile/HANDLE/replies"

    def posts(self):
        """Return the author feed including replies."""
        actor = self.groups[0]
        return self.api.get_author_feed(actor, "posts_with_replies")
256
257
258
class BlueskyMediaExtractor(BlueskyExtractor):
    # Author feed using the "posts_with_media" filter
    subcategory = "media"
    pattern = USER_PATTERN + r"/media"
    example = "https://bsky.app/profile/HANDLE/media"

    def posts(self):
        """Return the author feed filtered to posts with media."""
        actor = self.groups[0]
        return self.api.get_author_feed(actor, "posts_with_media")
266
267
268
class BlueskyVideoExtractor(BlueskyExtractor):
    # Author feed using the "posts_with_video" filter
    subcategory = "video"
    pattern = USER_PATTERN + r"/video"
    example = "https://bsky.app/profile/HANDLE/video"

    def posts(self):
        """Return the author feed filtered to posts with video."""
        actor = self.groups[0]
        return self.api.get_author_feed(actor, "posts_with_video")
276
277
278
class BlueskyLikesExtractor(BlueskyExtractor):
    # Posts liked by a user
    subcategory = "likes"
    pattern = USER_PATTERN + r"/likes"
    example = "https://bsky.app/profile/HANDLE/likes"

    def posts(self):
        """Return the actor's liked posts.

        Uses the getActorLikes API endpoint when the "endpoint" option
        is set to "getActorLikes"; otherwise walks the actor's
        "app.bsky.feed.like" repository records.
        """
        actor = self.groups[0]
        use_api = self.config("endpoint") == "getActorLikes"
        if not use_api:
            return self._posts_records(actor, "app.bsky.feed.like")
        return self.api.get_actor_likes(actor)
287
288
289
class BlueskyFeedExtractor(BlueskyExtractor):
    # Posts from a feed generator owned by a user
    subcategory = "feed"
    pattern = USER_PATTERN + r"/feed/([^/?#]+)"
    example = "https://bsky.app/profile/HANDLE/feed/NAME"

    def posts(self):
        """Return the posts of feed NAME published by HANDLE."""
        owner, name = self.groups
        return self.api.get_feed(owner, name)
297
298
299
class BlueskyListExtractor(BlueskyExtractor):
    # Posts from a list owned by a user
    subcategory = "list"
    pattern = USER_PATTERN + r"/lists/([^/?#]+)"
    example = "https://bsky.app/profile/HANDLE/lists/ID"

    def posts(self):
        """Return the feed of list ID owned by HANDLE."""
        owner, identifier = self.groups
        return self.api.get_list_feed(owner, identifier)
307
308
309
class BlueskyFollowingExtractor(BlueskyExtractor):
    # Accounts followed by a user, queued as profile URLs
    subcategory = "following"
    pattern = USER_PATTERN + r"/follows"
    example = "https://bsky.app/profile/HANDLE/follows"

    def items(self):
        """Yield one Queue message per followed account.

        Each user dict is tagged with "_extractor" so the queued URL is
        handled by BlueskyUserExtractor.
        """
        actor = self.groups[0]
        for followed in self.api.get_follows(actor):
            profile_url = "https://bsky.app/profile/" + followed["did"]
            followed["_extractor"] = BlueskyUserExtractor
            yield Message.Queue, profile_url, followed
319
320
321
class BlueskyPostExtractor(BlueskyExtractor):
    # A single post (and, depending on "depth", its replies)
    subcategory = "post"
    pattern = USER_PATTERN + r"/post/([^/?#]+)"
    example = "https://bsky.app/profile/HANDLE/post/ID"

    def posts(self):
        """Return the thread for post ID by HANDLE."""
        author, identifier = self.groups
        return self.api.get_post_thread(author, identifier)
329
330
331
class BlueskyInfoExtractor(BlueskyExtractor):
    # Emits only the user's profile metadata
    subcategory = "info"
    pattern = USER_PATTERN + r"/info"
    example = "https://bsky.app/profile/HANDLE/info"

    def items(self):
        """Return a single Directory message carrying the user profile."""
        # Force the profile lookup: _did_from_actor() fills `self._user`
        # only while `_metadata_user` is true.
        self._metadata_user = True
        self.api._did_from_actor(self.groups[0])

        message = (Message.Directory, "", self._user)
        return iter((message,))
340
341
342
class BlueskyAvatarExtractor(BlueskyExtractor):
    # A user's avatar image
    subcategory = "avatar"
    filename_fmt = "avatar_{post_id}.{extension}"
    pattern = USER_PATTERN + r"/avatar"
    example = "https://bsky.app/profile/HANDLE/avatar"

    def posts(self):
        """Return the avatar wrapped as a synthetic post."""
        actor = self.groups[0]
        return self._make_post(actor, "avatar")
350
351
352
class BlueskyBackgroundExtractor(BlueskyExtractor):
    # A user's banner image; ".../banner" and ".../background" both match
    subcategory = "background"
    filename_fmt = "background_{post_id}.{extension}"
    pattern = USER_PATTERN + r"/ba(?:nner|ckground)"
    example = "https://bsky.app/profile/HANDLE/banner"

    def posts(self):
        """Return the banner wrapped as a synthetic post."""
        actor = self.groups[0]
        return self._make_post(actor, "banner")
360
361
362
class BlueskySearchExtractor(BlueskyExtractor):
    # Post search results
    subcategory = "search"
    pattern = BASE_PATTERN + r"/search(?:/|\?q=)(.+)"
    example = "https://bsky.app/search?q=QUERY"

    def posts(self):
        """Return search results for the URL-decoded query."""
        # '+' stands for a space in the query string
        raw = self.groups[0].replace("+", " ")
        query = text.unquote(raw)
        return self.api.search_posts(query)
370
371
372
class BlueskyHashtagExtractor(BlueskyExtractor):
    # Posts for a hashtag, optionally ordered by "top" or "latest"
    subcategory = "hashtag"
    pattern = BASE_PATTERN + r"/hashtag/([^/?#]+)(?:/(top|latest))?"
    example = "https://bsky.app/hashtag/NAME"

    def posts(self):
        """Search for '#NAME'; the order is None when not in the URL."""
        name, ordering = self.groups
        query = "#" + name
        return self.api.search_posts(query, ordering)
380
381
382
class BlueskyBookmarkExtractor(BlueskyExtractor):
    # Posts returned by the bookmarks endpoint
    subcategory = "bookmark"
    pattern = BASE_PATTERN + r"/saved"
    example = "https://bsky.app/saved"

    def posts(self):
        """Return the bookmarked posts reported by the API."""
        api = self.api
        return api.get_bookmarks()
389
390
391
class BlueskyAPI():
    """Interface for the Bluesky API

    https://docs.bsky.app/docs/category/http-reference
    """

    def __init__(self, extractor):
        """Bind the API client to `extractor` and pick the API server.

        Server selection, in order of precedence:
          1. the "api-server" config option (trailing slashes removed)
          2. "https://bsky.social" when a username is configured
          3. "https://api.bsky.app" for anonymous access

        Without a username there is nothing to log in with, so
        authenticate() is replaced by a no-op in that case. This also
        covers a custom "api-server" used anonymously, which previously
        attempted a login with username None.
        """
        self.extractor = extractor
        self.log = extractor.log
        self.headers = {"Accept": "application/json"}

        self.username, self.password = extractor._get_auth_info()
        if srv := extractor.config("api-server", False):
            self.root = srv.rstrip("/")
        elif self.username:
            self.root = "https://bsky.social"
        else:
            self.root = "https://api.bsky.app"

        if not self.username:
            # instance attribute shadows the method, so _call() skips login
            self.authenticate = util.noop
410
411
def get_actor_likes(self, actor):
    """Paginate over app.bsky.feed.getActorLikes for `actor`."""
    did = self._did_from_actor(actor)
    query = {"actor": did, "limit": "100"}
    return self._pagination(
        "app.bsky.feed.getActorLikes", query, check_empty=True)
418
419
def get_author_feed(self, actor, filter="posts_and_author_threads"):
    """Paginate over app.bsky.feed.getAuthorFeed for `actor`.

    The actor is resolved with user_did=True, which lets
    _did_from_actor() record it as the extractor's user DID.
    """
    did = self._did_from_actor(actor, True)
    query = {
        "actor" : did,
        "filter": filter,
        "limit" : "100",
    }
    return self._pagination("app.bsky.feed.getAuthorFeed", query)
427
428
def get_bookmarks(self):
    """Paginate over app.bsky.bookmark.getBookmarks ("bookmarks" key)."""
    return self._pagination(
        "app.bsky.bookmark.getBookmarks", {}, "bookmarks",
        check_empty=True)
431
432
def get_feed(self, actor, feed):
    """Paginate over app.bsky.feed.getFeed for a feed generator."""
    did = self._did_from_actor(actor)
    feed_uri = f"at://{did}/app.bsky.feed.generator/{feed}"
    query = {"feed": feed_uri, "limit": "100"}
    return self._pagination("app.bsky.feed.getFeed", query)
438
439
def get_follows(self, actor):
    """Paginate over app.bsky.graph.getFollows ("follows" key)."""
    did = self._did_from_actor(actor)
    query = {"actor": did, "limit": "100"}
    return self._pagination("app.bsky.graph.getFollows", query, "follows")
446
447
def get_list_feed(self, actor, list):
    """Paginate over app.bsky.feed.getListFeed for a user's list.

    The parameter name `list` shadows the builtin; it is kept for
    compatibility with existing callers.
    """
    did = self._did_from_actor(actor)
    list_uri = f"at://{did}/app.bsky.graph.list/{list}"
    query = {"list": list_uri, "limit": "100"}
    return self._pagination("app.bsky.feed.getListFeed", query)
452
453
def get_post_thread(self, actor, post_id):
    """Fetch the thread of a post identified by actor and record key.

    The reply depth comes from the extractor's "depth" option
    (default "0").
    """
    did = self._did_from_actor(actor)
    post_uri = f"at://{did}/app.bsky.feed.post/{post_id}"
    depth = self.extractor.config("depth", "0")
    return self.get_post_thread_uri(post_uri, depth)
458
459
def get_post_thread_uri(self, uri, depth="0"):
    """Fetch a post thread and flatten it breadth-first.

    Returns a one-element tuple when the root has no "replies" key,
    otherwise a list containing the root followed by all nested
    replies, level by level.
    """
    query = {
        "uri"         : uri,
        "depth"       : depth,
        "parentHeight": "0",
    }
    thread = self._call("app.bsky.feed.getPostThread", query)["thread"]

    if "replies" not in thread:
        return (thread,)

    flat = []
    level = [thread]
    while level:
        flat.extend(level)
        # gather the children of the current level, in order
        level = [
            reply
            for node in level if "replies" in node
            for reply in node["replies"]
        ]
    return flat
479
480
@memcache(keyarg=1)
def get_profile(self, did):
    """Return the app.bsky.actor.getProfile response for `did`."""
    return self._call("app.bsky.actor.getProfile", {"actor": did})
485
486
def list_records(self, actor, collection):
    """Paginate over com.atproto.repo.listRecords ("records" key).

    Requests go to the service endpoint resolved for the actor's DID
    rather than to the default API root.
    """
    did = self._did_from_actor(actor)
    query = {
        "repo"      : did,
        "collection": collection,
        "limit"     : "100",
    }
    pds = self.service_endpoint(did)
    return self._pagination(
        "com.atproto.repo.listRecords", query, "records", pds)
497
498
@memcache(keyarg=1)
def resolve_handle(self, handle):
    """Return the "did" from com.atproto.identity.resolveHandle."""
    response = self._call(
        "com.atproto.identity.resolveHandle", {"handle": handle})
    return response["did"]
503
504
@memcache(keyarg=1)
def service_endpoint(self, did):
    """Return the service endpoint URL listed in the DID document.

    "did:web:" identifiers are resolved via the host's
    /.well-known/did.json; all others via plc.directory. Falls back to
    "https://bsky.social" when the document cannot be fetched or
    contains no "AtprotoPersonalDataServer" service entry.
    """
    fallback = "https://bsky.social"

    if did.startswith('did:web:'):
        # strip the 8-character "did:web:" prefix to get the host
        url = "https://" + did[8:] + "/.well-known/did.json"
    else:
        url = "https://plc.directory/" + did

    try:
        document = self.extractor.request_json(url)
        for entry in document["service"]:
            if entry["type"] == "AtprotoPersonalDataServer":
                return entry["serviceEndpoint"]
    except Exception:
        # best effort: any lookup failure falls through to the default
        pass
    return fallback
519
520
def search_posts(self, query, sort=None):
    """Paginate over app.bsky.feed.searchPosts ("posts" key)."""
    search = {
        "q"    : query,
        "limit": "100",
        "sort" : sort,
    }
    return self._pagination("app.bsky.feed.searchPosts", search, "posts")
528
529
def _did_from_actor(self, actor, user_did=False):
530
if actor.startswith("did:"):
531
did = actor
532
else:
533
did = self.resolve_handle(actor)
534
535
extr = self.extractor
536
if user_did and not extr.config("reposts", False):
537
extr._user_did = did
538
if extr._metadata_user:
539
extr._user = user = self.get_profile(did)
540
user["instance"] = extr._instance(user["handle"])
541
542
return did
543
544
def authenticate(self):
    """Store the Authorization header value for the configured user."""
    token = self._authenticate_impl(self.username)
    self.headers["Authorization"] = token
546
547
@cache(maxage=3600, keyarg=1)
def _authenticate_impl(self, username):
    """Obtain an access token and return it as a "Bearer ..." string.

    Refreshes the session when a refresh token is cached for
    `username`, otherwise logs in with username and password. The new
    refresh token is written back to the refresh-token cache.

    Raises exception.AuthenticationError on any non-200 response.
    """
    refresh_token = _refresh_token_cache(username)

    if not refresh_token:
        self.log.info("Logging in as %s", username)
        endpoint = "com.atproto.server.createSession"
        headers = None
        payload = {
            "identifier": username,
            "password"  : self.password,
        }
    else:
        self.log.info("Refreshing access token for %s", username)
        endpoint = "com.atproto.server.refreshSession"
        headers = {"Authorization": "Bearer " + refresh_token}
        payload = None

    response = self.extractor.request(
        f"{self.root}/xrpc/{endpoint}",
        method="POST", headers=headers, json=payload, fatal=None)
    result = response.json()

    if response.status_code != 200:
        self.log.debug("Server response: %s", result)
        raise exception.AuthenticationError(
            f"\"{result.get('error')}: {result.get('message')}\"")

    _refresh_token_cache.update(self.username, result["refreshJwt"])
    return "Bearer " + result["accessJwt"]
577
578
def _call(self, endpoint, params, root=None):
579
if root is None:
580
root = self.root
581
url = f"{root}/xrpc/{endpoint}"
582
583
while True:
584
self.authenticate()
585
response = self.extractor.request(
586
url, params=params, headers=self.headers, fatal=None)
587
588
if response.status_code < 400:
589
return response.json()
590
if response.status_code == 429:
591
until = response.headers.get("RateLimit-Reset")
592
self.extractor.wait(until=until)
593
continue
594
595
msg = "API request failed"
596
try:
597
data = response.json()
598
msg = f"{msg} ('{data['error']}: {data['message']}')"
599
except Exception:
600
msg = f"{msg} ({response.status_code} {response.reason})"
601
602
self.extractor.log.debug("Server response: %s", response.text)
603
raise exception.AbortExtraction(msg)
604
605
def _pagination(self, endpoint, params,
606
key="feed", root=None, check_empty=False):
607
while True:
608
data = self._call(endpoint, params, root)
609
610
if check_empty and not data[key]:
611
return
612
yield from data[key]
613
614
cursor = data.get("cursor")
615
if not cursor:
616
return
617
params["cursor"] = cursor
618
619
620
@cache(maxage=84*86400, keyarg=0)
def _refresh_token_cache(username):
    """Cache slot for a user's refresh token (maxage: 84 days).

    The function body only supplies the "nothing cached" value; tokens
    are stored by calling `.update()` on this object in
    BlueskyAPI._authenticate_impl().
    """
    # no refresh token stored for this username
    return None
623
624