Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
mikf
GitHub Repository: mikf/gallery-dl
Path: blob/master/gallery_dl/extractor/bluesky.py
5399 views
1
# -*- coding: utf-8 -*-
2
3
# Copyright 2024-2025 Mike Fährmann
4
#
5
# This program is free software; you can redistribute it and/or modify
6
# it under the terms of the GNU General Public License version 2 as
7
# published by the Free Software Foundation.
8
9
"""Extractors for https://bsky.app/"""
10
11
from .common import Extractor, Message, Dispatch
12
from .. import text, util, exception
13
from ..cache import cache, memcache
14
15
# URL prefix shared by every Bluesky pattern: an optional http(s) scheme
# followed by one of the accepted hostnames (optionally "www."-prefixed
# "*.app" variants, or main.bsky.dev).
BASE_PATTERN = (
    r"(?:https?://)?"
    r"(?:(?:www\.)?(?:c|[fv]x)?bs[ky]y[ex]?\.app|main\.bsky\.dev)"
)

# Profile URL prefix; its capture group is the path segment after "/profile/".
USER_PATTERN = BASE_PATTERN + r"/profile/([^/?#]+)"
18
19
20
class BlueskyExtractor(Extractor):
    """Base class for bluesky extractors"""
    category = "bluesky"
    directory_fmt = ("{category}", "{author[handle]}")
    filename_fmt = "{createdAt[:19]}_{post_id}_{num}.{extension}"
    archive_fmt = "{filename}"
    root = "https://bsky.app"

    def _init(self):
        """Read the extractor options and create the API interface."""
        # "metadata" may be a comma-separated string, a list/tuple of names,
        # or any other truthy value (which enables both "user" and "facets")
        selected = self.config("metadata") or ()
        if selected:
            if isinstance(selected, str):
                selected = selected.replace(" ", "").split(",")
            elif not isinstance(selected, (list, tuple)):
                selected = ("user", "facets")
        self._metadata_user = "user" in selected
        self._metadata_facets = "facets" in selected

        self.api = BlueskyAPI(self)
        self._user = None
        self._user_did = None
        # host part of 'root', i.e. everything after "://"
        self.instance = self.root.partition("://")[2]
        self.videos = self.config("videos", True)
        self.quoted = self.config("quoted", False)

    def items(self):
        """Yield a Directory message per post and a Url message per file.

        With the "quoted" option enabled, quoted posts embedded in a post
        are processed the same way, one nesting level after another.
        """
        for entry in self.posts():
            # feed entries wrap the actual post in a "post" key
            post = entry["post"] if "post" in entry else entry

            # '_user_did' is only set when reposts should be filtered out
            own_did = self._user_did
            if own_did and post["author"]["did"] != own_did:
                self.log.debug("Skipping %s (repost)", self._pid(post))
                continue

            # fetch "embed" before merging "record" into the post
            embed = post.get("embed")
            try:
                post.update(post.pop("record"))
            except Exception:
                self.log.debug("Skipping %s (no 'record')", self._pid(post))
                continue

            while True:
                self._prepare(post)
                files = self._extract_files(post)

                yield Message.Directory, post
                if files:
                    did = post["author"]["did"]
                    endpoint = self.api.service_endpoint(did)
                    blob_url = (f"{endpoint}/xrpc"
                                f"/com.atproto.sync.getBlob?did={did}&cid=")
                    for num, file in enumerate(files, 1):
                        post["num"] = num
                        post.update(file)
                        yield Message.Url, blob_url + file["filename"], post

                # continue only if quotes are wanted and one is embedded
                if not (self.quoted and embed and "record" in embed):
                    break

                quote = embed["record"]
                quote = quote.get("record", quote)
                value = quote.pop("value", None)
                if value is None:
                    break
                quote["quote_id"] = self._pid(post)
                quote["quote_by"] = post["author"]
                # read the quote's own embed before 'value' is merged in
                embed = quote.get("embed")
                quote.update(value)
                post = quote

    def posts(self):
        """Return the posts to process; subclasses override this."""
        return ()

    def _posts_records(self, actor, collection):
        """Yield post threads referenced by an actor's repository records.

        Only records whose subject URI contains "/app.bsky.feed.post/" are
        followed. Failures for single records are logged and skipped.
        """
        depth = self.config("depth", "0")

        for record in self.api.list_records(actor, collection):
            uri = None
            try:
                uri = record["value"]["subject"]["uri"]
                if "/app.bsky.feed.post/" in uri:
                    yield from self.api.get_post_thread_uri(uri, depth)
            except exception.ControlException:
                pass  # deleted post
            except Exception as err:
                self.log.debug(record, exc_info=err)
                self.log.warning("Failed to extract %s (%s: %s)",
                                 uri or "record", err.__class__.__name__, err)

    def _pid(self, post):
        """Return the last path segment of a post's "uri"."""
        return post["uri"].rsplit("/", 1)[-1]

    @memcache(keyarg=1)
    def _instance(self, handle):
        """Return the last two dot-separated labels of 'handle'."""
        labels = handle.rsplit(".", 2)
        return ".".join(labels[-2:])

    def _prepare(self, post):
        """Add derived metadata fields to 'post' in place."""
        author = post["author"]
        author["instance"] = self._instance(author["handle"])

        if self._metadata_facets:
            if "facets" in post:
                post["hashtags"] = tags = []
                post["mentions"] = dids = []
                post["uris"] = uris = []
                # only the first feature of each facet is inspected;
                # it is sorted by the first of these keys it contains
                buckets = (("tag", tags), ("did", dids), ("uri", uris))
                for facet in post["facets"]:
                    feature = facet["features"][0]
                    for key, bucket in buckets:
                        if key in feature:
                            bucket.append(feature[key])
                            break
            else:
                post["hashtags"] = post["mentions"] = post["uris"] = ()

        if self._metadata_user:
            post["user"] = self._user or author

        post["instance"] = self.instance
        post["post_id"] = self._pid(post)
        # drop everything after the seconds before parsing
        timestamp = post["createdAt"][:19]
        post["date"] = text.parse_datetime(timestamp, "%Y-%m-%dT%H:%M:%S")

    def _extract_files(self, post):
        """Collect file metadata from a post's embed and set post["count"]."""
        if "embed" not in post:
            post["count"] = 0
            return ()

        embed = post["embed"]
        # some embeds nest their files one level deeper under "media"
        media = embed["media"] if "media" in embed else embed

        if "images" in media:
            files = [
                self._extract_media(image, "image")
                for image in media["images"]
            ]
        else:
            files = []
        if "video" in media and self.videos:
            files.append(self._extract_media(media, "video"))

        post["count"] = len(files)
        return files

    def _extract_media(self, media, key):
        """Build the metadata dict for the blob stored under media[key]."""
        try:
            aspect = media["aspectRatio"]
            width, height = aspect["width"], aspect["height"]
        except KeyError:
            # no (complete) aspect ratio available
            width = height = 0

        blob = media[key]
        try:
            cid = blob["ref"]["$link"]
        except KeyError:
            cid = blob["cid"]

        return {
            "description": media.get("alt") or "",
            "width"      : width,
            "height"     : height,
            "filename"   : cid,
            "extension"  : blob["mimeType"].rpartition("/")[2],
        }

    def _make_post(self, actor, kind):
        """Wrap a profile image ("avatar" or "banner") in a post-like dict.

        Returns an empty tuple if the profile has no 'kind' entry,
        otherwise a 1-tuple that items() can process like a feed entry.
        """
        did = self.api._did_from_actor(actor)
        profile = self.api.get_profile(did)

        if kind not in profile:
            return ()

        # last path segment of the image URL, without any "@..." suffix
        name = profile[kind].rpartition("/")[2]
        cid = name.partition("@")[0]

        image = {
            "alt": kind,
            "image": {
                "$type"   : "blob",
                "ref"     : {"$link": cid},
                "mimeType": "image/jpeg",
                "size"    : 0,
            },
            "aspectRatio": {
                "width" : 1000,
                "height": 1000,
            },
        }
        post = {
            "embed"    : {"images": [image]},
            "author"   : profile,
            "record"   : (),
            "createdAt": "",
            "uri"      : cid,
        }
        return ({"post": post},)
209
210
211
class BlueskyUserExtractor(Dispatch, BlueskyExtractor):
    # Matches a bare profile URL and hands it to the per-section extractors
    pattern = USER_PATTERN + r"$"
    example = "https://bsky.app/profile/HANDLE"

    def items(self):
        """Dispatch a profile URL to its section extractors.

        The default section is "posts" when the "quoted" or "reposts"
        option is enabled and "media" otherwise.
        """
        base = f"{self.root}/profile/{self.groups[0]}/"

        if self.config("quoted", False) or self.config("reposts", False):
            default = "posts"
        else:
            default = "media"

        sections = (
            (BlueskyInfoExtractor      , "info"),
            (BlueskyAvatarExtractor    , "avatar"),
            (BlueskyBackgroundExtractor, "banner"),
            (BlueskyPostsExtractor     , "posts"),
            (BlueskyRepliesExtractor   , "replies"),
            (BlueskyMediaExtractor     , "media"),
            (BlueskyVideoExtractor     , "video"),
            (BlueskyLikesExtractor     , "likes"),
        )
        targets = tuple((extr, base + name) for extr, name in sections)
        return self._dispatch_extractors(targets, (default,))
229
230
231
class BlueskyPostsExtractor(BlueskyExtractor):
    # Author feed of a profile, filter "posts_and_author_threads"
    subcategory = "posts"
    pattern = USER_PATTERN + r"/posts"
    example = "https://bsky.app/profile/HANDLE/posts"

    def posts(self):
        """Return the author feed of the profile matched by the URL."""
        actor = self.groups[0]
        return self.api.get_author_feed(actor, "posts_and_author_threads")
239
240
241
class BlueskyRepliesExtractor(BlueskyExtractor):
    # Author feed of a profile, filter "posts_with_replies"
    subcategory = "replies"
    pattern = USER_PATTERN + r"/replies"
    example = "https://bsky.app/profile/HANDLE/replies"

    def posts(self):
        """Return the author feed of the profile, replies included."""
        actor = self.groups[0]
        return self.api.get_author_feed(actor, "posts_with_replies")
249
250
251
class BlueskyMediaExtractor(BlueskyExtractor):
    # Author feed of a profile, filter "posts_with_media"
    subcategory = "media"
    pattern = USER_PATTERN + r"/media"
    example = "https://bsky.app/profile/HANDLE/media"

    def posts(self):
        """Return the author feed of the profile, media posts only."""
        actor = self.groups[0]
        return self.api.get_author_feed(actor, "posts_with_media")
259
260
261
class BlueskyVideoExtractor(BlueskyExtractor):
    # Author feed of a profile, filter "posts_with_video"
    subcategory = "video"
    pattern = USER_PATTERN + r"/video"
    example = "https://bsky.app/profile/HANDLE/video"

    def posts(self):
        """Return the author feed of the profile, video posts only."""
        actor = self.groups[0]
        return self.api.get_author_feed(actor, "posts_with_video")
269
270
271
class BlueskyLikesExtractor(BlueskyExtractor):
    # Posts liked by a profile
    subcategory = "likes"
    pattern = USER_PATTERN + r"/likes"
    example = "https://bsky.app/profile/HANDLE/likes"

    def posts(self):
        """Return the posts liked by the profile matched by the URL.

        Uses the getActorLikes API call when the "endpoint" option is set
        to "getActorLikes"; otherwise walks the actor's
        "app.bsky.feed.like" repository records.
        """
        actor = self.groups[0]
        use_actor_likes = (self.config("endpoint") == "getActorLikes")
        if use_actor_likes:
            return self.api.get_actor_likes(actor)
        return self._posts_records(actor, "app.bsky.feed.like")
280
281
282
class BlueskyFeedExtractor(BlueskyExtractor):
    # Posts of a feed; the URL yields the owning actor and the feed name
    subcategory = "feed"
    pattern = USER_PATTERN + r"/feed/([^/?#]+)"
    example = "https://bsky.app/profile/HANDLE/feed/NAME"

    def posts(self):
        """Return the posts of the feed named in the URL."""
        owner, name = self.groups
        return self.api.get_feed(owner, name)
290
291
292
class BlueskyListExtractor(BlueskyExtractor):
    # Posts of a list; the URL yields the owning actor and the list ID
    subcategory = "list"
    pattern = USER_PATTERN + r"/lists/([^/?#]+)"
    example = "https://bsky.app/profile/HANDLE/lists/ID"

    def posts(self):
        """Return the feed of the list named in the URL."""
        owner, identifier = self.groups
        return self.api.get_list_feed(owner, identifier)
300
301
302
class BlueskyFollowingExtractor(BlueskyExtractor):
    # Accounts followed by a profile, queued as profile URLs
    subcategory = "following"
    pattern = USER_PATTERN + r"/follows"
    example = "https://bsky.app/profile/HANDLE/follows"

    def items(self):
        """Yield one Queue message per followed account.

        Each account's profile URL is built from its "did" and the account
        dict is tagged with BlueskyUserExtractor under "_extractor".
        """
        prefix = "https://bsky.app/profile/"
        for account in self.api.get_follows(self.groups[0]):
            target = prefix + account["did"]
            account["_extractor"] = BlueskyUserExtractor
            yield Message.Queue, target, account
312
313
314
class BlueskyPostExtractor(BlueskyExtractor):
    # A single post; the URL yields the author and the post ID
    subcategory = "post"
    pattern = USER_PATTERN + r"/post/([^/?#]+)"
    example = "https://bsky.app/profile/HANDLE/post/ID"

    def posts(self):
        """Return the thread of the post named in the URL."""
        author, identifier = self.groups
        return self.api.get_post_thread(author, identifier)
322
323
324
class BlueskyInfoExtractor(BlueskyExtractor):
    # Profile metadata only; emits no file URLs
    subcategory = "info"
    pattern = USER_PATTERN + r"/info"
    example = "https://bsky.app/profile/HANDLE/info"

    def items(self):
        """Return an iterator over one Directory message with the profile.

        User metadata is force-enabled so that resolving the actor stores
        the profile in 'self._user'.
        """
        self._metadata_user = True
        # called for its side effect of filling 'self._user'
        self.api._did_from_actor(self.groups[0])
        message = (Message.Directory, self._user)
        return iter((message,))
333
334
335
class BlueskyAvatarExtractor(BlueskyExtractor):
    # Avatar image of a profile
    subcategory = "avatar"
    filename_fmt = "avatar_{post_id}.{extension}"
    pattern = USER_PATTERN + r"/avatar"
    example = "https://bsky.app/profile/HANDLE/avatar"

    def posts(self):
        """Return the profile's avatar wrapped as a post-like entry."""
        actor = self.groups[0]
        return self._make_post(actor, "avatar")
343
344
345
class BlueskyBackgroundExtractor(BlueskyExtractor):
    # Banner image of a profile; matches ".../banner" and ".../background"
    subcategory = "background"
    filename_fmt = "background_{post_id}.{extension}"
    pattern = USER_PATTERN + r"/ba(?:nner|ckground)"
    example = "https://bsky.app/profile/HANDLE/banner"

    def posts(self):
        """Return the profile's banner wrapped as a post-like entry."""
        actor = self.groups[0]
        return self._make_post(actor, "banner")
353
354
355
class BlueskySearchExtractor(BlueskyExtractor):
    # Search results for ".../search?q=QUERY" or ".../search/QUERY"
    subcategory = "search"
    pattern = BASE_PATTERN + r"/search(?:/|\?q=)(.+)"
    example = "https://bsky.app/search?q=QUERY"

    def posts(self):
        """Return the posts found for the query given in the URL."""
        # turn "+" into spaces first, then undo the percent-encoding
        raw = self.groups[0].replace("+", " ")
        query = text.unquote(raw)
        return self.api.search_posts(query)
363
364
365
class BlueskyHashtagExtractor(BlueskyExtractor):
    # Posts for a hashtag, optionally sorted by "top" or "latest"
    subcategory = "hashtag"
    pattern = BASE_PATTERN + r"/hashtag/([^/?#]+)(?:/(top|latest))?"
    example = "https://bsky.app/hashtag/NAME"

    def posts(self):
        """Return the search results for "#<hashtag>".

        The sort order is None when the URL does not specify one.
        """
        tag, sort = self.groups
        query = "#" + tag
        return self.api.search_posts(query, sort)
373
374
375
class BlueskyAPI():
    """Interface for the Bluesky API

    https://docs.bsky.app/docs/category/http-reference

    Requests go to "https://bsky.social" when login credentials are
    configured and to "https://api.bsky.app" (without authentication)
    otherwise.
    """

    def __init__(self, extractor):
        """Bind the API to 'extractor' and choose the API root."""
        self.extractor = extractor
        self.log = extractor.log
        self.headers = {"Accept": "application/json"}

        self.username, self.password = extractor._get_auth_info()
        if self.username:
            self.root = "https://bsky.social"
        else:
            self.root = "https://api.bsky.app"
            # no credentials: make authenticate() a no-op for this instance
            self.authenticate = util.noop

    def get_actor_likes(self, actor):
        """Yield the feed entries liked by 'actor' (handle or DID)."""
        endpoint = "app.bsky.feed.getActorLikes"
        params = {
            "actor": self._did_from_actor(actor),
            "limit": "100",
        }
        return self._pagination(endpoint, params, check_empty=True)

    def get_author_feed(self, actor, filter="posts_and_author_threads"):
        """Yield the entries of an actor's author feed.

        The actor is resolved with user_did=True, which lets the
        extractor skip reposts unless its "reposts" option is enabled.
        """
        endpoint = "app.bsky.feed.getAuthorFeed"
        params = {
            "actor" : self._did_from_actor(actor, True),
            "filter": filter,
            "limit" : "100",
        }
        return self._pagination(endpoint, params)

    def get_feed(self, actor, feed):
        """Yield the entries of the feed generator 'feed' owned by 'actor'."""
        endpoint = "app.bsky.feed.getFeed"
        uri = (f"at://{self._did_from_actor(actor)}"
               f"/app.bsky.feed.generator/{feed}")
        params = {"feed": uri, "limit": "100"}
        return self._pagination(endpoint, params)

    def get_follows(self, actor):
        """Yield the accounts followed by 'actor'."""
        endpoint = "app.bsky.graph.getFollows"
        params = {
            "actor": self._did_from_actor(actor),
            "limit": "100",
        }
        return self._pagination(endpoint, params, "follows")

    def get_list_feed(self, actor, list):
        """Yield the feed entries of the list 'list' owned by 'actor'."""
        endpoint = "app.bsky.feed.getListFeed"
        uri = f"at://{self._did_from_actor(actor)}/app.bsky.graph.list/{list}"
        params = {"list" : uri, "limit": "100"}
        return self._pagination(endpoint, params)

    def get_post_thread(self, actor, post_id):
        """Return the thread of a post, using the "depth" option."""
        uri = (f"at://{self._did_from_actor(actor)}"
               f"/app.bsky.feed.post/{post_id}")
        depth = self.extractor.config("depth", "0")
        return self.get_post_thread_uri(uri, depth)

    def get_post_thread_uri(self, uri, depth="0"):
        """Return the thread at 'uri' as a flat sequence of thread nodes.

        Returns a 1-tuple if the root node has no "replies" key; otherwise
        a list with the root first, followed by all nested replies in
        breadth-first order.
        """
        endpoint = "app.bsky.feed.getPostThread"
        params = {
            "uri"         : uri,
            "depth"       : depth,
            "parentHeight": "0",
        }

        thread = self._call(endpoint, params)["thread"]
        if "replies" not in thread:
            return (thread,)

        # 'posts' doubles as work queue: replies get appended while the
        # index walks forward over the list
        index = 0
        posts = [thread]
        while index < len(posts):
            post = posts[index]
            if "replies" in post:
                posts.extend(post["replies"])
            index += 1
        return posts

    @memcache(keyarg=1)
    def get_profile(self, did):
        """Return the profile data of the actor 'did'."""
        endpoint = "app.bsky.actor.getProfile"
        params = {"actor": did}
        return self._call(endpoint, params)

    def list_records(self, actor, collection):
        """Yield the records of 'collection' in the repository of 'actor'.

        The request is sent to the actor's own service endpoint instead
        of the default API root.
        """
        endpoint = "com.atproto.repo.listRecords"
        actor_did = self._did_from_actor(actor)
        params = {
            "repo"      : actor_did,
            "collection": collection,
            "limit"     : "100",
            # "reverse"   : "false",
        }
        return self._pagination(endpoint, params, "records",
                                self.service_endpoint(actor_did))

    @memcache(keyarg=1)
    def resolve_handle(self, handle):
        """Return the DID that 'handle' resolves to."""
        endpoint = "com.atproto.identity.resolveHandle"
        params = {"handle": handle}
        return self._call(endpoint, params)["did"]

    @memcache(keyarg=1)
    def service_endpoint(self, did):
        """Return the personal data server URL for 'did'.

        The DID document is fetched from "<host>/.well-known/did.json"
        for "did:web:" identifiers and from plc.directory otherwise.
        The "serviceEndpoint" of its first "AtprotoPersonalDataServer"
        service is returned. If the lookup fails or no such service is
        listed, "https://bsky.social" is returned as fallback.
        """
        if did.startswith('did:web:'):
            url = "https://" + did[8:] + "/.well-known/did.json"
        else:
            url = "https://plc.directory/" + did

        try:
            data = self.extractor.request_json(url)
            for service in data["service"]:
                if service["type"] == "AtprotoPersonalDataServer":
                    return service["serviceEndpoint"]
        except Exception as exc:
            # best effort: keep going with the fallback below, but leave
            # a trace of why the lookup did not succeed
            self.log.debug("Failed to get service endpoint of %s (%s: %s)",
                           did, exc.__class__.__name__, exc)
        return "https://bsky.social"

    def search_posts(self, query, sort=None):
        """Yield the posts matching 'query', optionally sorted by 'sort'."""
        endpoint = "app.bsky.feed.searchPosts"
        params = {
            "q"    : query,
            "limit": "100",
            "sort" : sort,
        }
        return self._pagination(endpoint, params, "posts")

    def _did_from_actor(self, actor, user_did=False):
        """Return the DID for 'actor', resolving handles if necessary.

        Side effects on the extractor:
        - with 'user_did' set and the "reposts" option disabled, the DID
          is stored as '_user_did'
        - with user metadata enabled, the actor's profile is fetched and
          stored as '_user', with an added "instance" field
        """
        if actor.startswith("did:"):
            did = actor
        else:
            did = self.resolve_handle(actor)

        extr = self.extractor
        if user_did and not extr.config("reposts", False):
            extr._user_did = did
        if extr._metadata_user:
            extr._user = user = self.get_profile(did)
            user["instance"] = extr._instance(user["handle"])

        return did

    def authenticate(self):
        """Set the Authorization header for the configured account."""
        self.headers["Authorization"] = self._authenticate_impl(self.username)

    @cache(maxage=3600, keyarg=1)
    def _authenticate_impl(self, username):
        """Return a "Bearer ..." Authorization value for 'username'.

        Refreshes the session if a refresh token is stored for the user
        and logs in with username and password otherwise. The refresh
        token from the response is stored for later use.

        Raises exception.AuthenticationError if the server does not
        answer with status 200.
        """
        refresh_token = _refresh_token_cache(username)

        if refresh_token:
            self.log.info("Refreshing access token for %s", username)
            endpoint = "com.atproto.server.refreshSession"
            headers = {"Authorization": "Bearer " + refresh_token}
            data = None
        else:
            self.log.info("Logging in as %s", username)
            endpoint = "com.atproto.server.createSession"
            headers = None
            data = {
                "identifier": username,
                "password"  : self.password,
            }

        url = f"{self.root}/xrpc/{endpoint}"
        response = self.extractor.request(
            url, method="POST", headers=headers, json=data, fatal=None)
        data = response.json()

        if response.status_code != 200:
            self.log.debug("Server response: %s", data)
            raise exception.AuthenticationError(
                f"\"{data.get('error')}: {data.get('message')}\"")

        # store the token under the same key it is looked up with above
        _refresh_token_cache.update(username, data["refreshJwt"])
        return "Bearer " + data["accessJwt"]

    def _call(self, endpoint, params, root=None):
        """Send a GET request to an XRPC endpoint and return its JSON data.

        'root' overrides the default API root. A 429 response waits for
        the time given by its "RateLimit-Reset" header and retries.

        Raises exception.AbortExtraction for any other response with a
        status code of 400 or above.
        """
        if root is None:
            root = self.root
        url = f"{root}/xrpc/{endpoint}"

        while True:
            self.authenticate()
            response = self.extractor.request(
                url, params=params, headers=self.headers, fatal=None)

            if response.status_code < 400:
                return response.json()
            if response.status_code == 429:
                # NOTE(review): 'until' is None when the header is
                # missing - confirm that wait() copes with that
                until = response.headers.get("RateLimit-Reset")
                self.extractor.wait(until=until)
                continue

            msg = "API request failed"
            try:
                data = response.json()
                msg = f"{msg} ('{data['error']}: {data['message']}')"
            except Exception:
                # no usable JSON error body; report the HTTP status instead
                msg = f"{msg} ({response.status_code} {response.reason})"

            self.extractor.log.debug("Server response: %s", response.text)
            raise exception.AbortExtraction(msg)

    def _pagination(self, endpoint, params,
                    key="feed", root=None, check_empty=False):
        """Yield the items under 'key' from all pages of an endpoint.

        Follows the "cursor" value of each response by writing it into
        'params' (which is modified in place) until no cursor is returned.
        With 'check_empty' set, an empty page also ends the iteration.
        """
        while True:
            data = self._call(endpoint, params, root)

            if check_empty and not data[key]:
                return
            yield from data[key]

            cursor = data.get("cursor")
            if not cursor:
                return
            params["cursor"] = cursor
596
597
598
@cache(maxage=84*86400, keyarg=0)
def _refresh_token_cache(username):
    """Return the stored refresh token for 'username'.

    The function body itself has nothing to return; actual tokens are
    written through '_refresh_token_cache.update()' after a successful
    login or session refresh.
    """
    # presumably 'maxage' is in seconds, i.e. 84 days - TODO confirm
    return
601
602