# -*- coding: utf-8 -*-

# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.

"""Extractors for https://www.fanbox.cc/"""

from .common import Extractor, Message
from .. import text, util
from ..cache import memcache

BASE_PATTERN = r"(?:https?://)?(?:www\.)?fanbox\.cc"
USER_PATTERN = (
    r"(?:https?://)?(?:"
    r"(?!www\.)([\w-]+)\.fanbox\.cc|"
    r"(?:www\.)?fanbox\.cc/@([\w-]+))"
)

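# USER_PATTERN provides two alternative capture groups for the creator ID:
# group 1 matches "https://USER.fanbox.cc/...", group 2 matches
# "https://www.fanbox.cc/@USER/..."; extractors read whichever one matched
# via "self.groups[0] or self.groups[1]".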

class FanboxExtractor(Extractor):
    """Base class for Fanbox extractors"""
    category = "fanbox"
    root = "https://www.fanbox.cc"
    directory_fmt = ("{category}", "{creatorId}")
    filename_fmt = "{id}_{num}.{extension}"
    archive_fmt = "{id}_{num}"
    browser = "firefox"
    _warning = True
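    # _init() reads the user-facing config options: "embeds" (whether to
    # process embedded content, or "ytdl" to hand video embeds to ytdl),
    # "metadata" (attach "user"/"plan"/"comments" metadata), and
    # "comments" (shorthand for enabling comment metadata). "_warning" is
    # a class-level flag, so the missing-cookie warning is emitted only
    # once per run.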
    def _init(self):
        self.headers = {
            "Accept" : "application/json, text/plain, */*",
            "Origin" : "https://www.fanbox.cc",
            "Referer": "https://www.fanbox.cc/",
            "Cookie" : None,
            "Sec-Fetch-Dest": "empty",
            "Sec-Fetch-Mode": "cors",
            "Sec-Fetch-Site": "same-site",
        }
        self.embeds = self.config("embeds", True)

        if includes := self.config("metadata"):
            if isinstance(includes, str):
                includes = includes.split(",")
            elif not isinstance(includes, (list, tuple)):
                includes = ("user", "plan")
            self._meta_user = ("user" in includes)
            self._meta_plan = ("plan" in includes)
            self._meta_comments = ("comments" in includes)
        else:
            self._meta_user = self._meta_plan = self._meta_comments = False

        if self.config("comments"):
            self._meta_comments = True

        if self._warning:
            if not self.cookies_check(("FANBOXSESSID",)):
                self.log.warning("no 'FANBOXSESSID' cookie set")
            FanboxExtractor._warning = False

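    # items() implements the extractor protocol: one
    # (Message.Directory, metadata) tuple per post, followed by a
    # (Message.Url, file_url, metadata) tuple for each file to download.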
    def items(self):
        fee_max = self.config("fee-max")

        for item in self.posts():
            if fee_max is not None and fee_max < item["feeRequired"]:
                self.log.warning("Skipping post %s (feeRequired of %s > %s)",
                                 item["id"], item["feeRequired"], fee_max)
                continue

            try:
                url = ("https://api.fanbox.cc/post.info?postId=" +
                       item["id"])
                item = self.request_json(url, headers=self.headers)["body"]
            except Exception as exc:
                self.log.warning("Skipping post %s (%s: %s)",
                                 item["id"], exc.__class__.__name__, exc)
                continue

            content_body, post = self._extract_post(item)
            yield Message.Directory, post
            yield from self._get_urls_from_post(content_body, post)

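    # Subclasses override posts() to supply abbreviated post objects;
    # items() only requires each to carry "id" and "feeRequired".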
    def posts(self):
        """Return all relevant post objects"""

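    # Assumes the usual fanbox API pagination envelope:
    # {"body": {"items": [...], "nextUrl": ...}}, where a missing or
    # null "nextUrl" ends the loop.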
    def _pagination(self, url):
        while url:
            url = text.ensure_http_scheme(url)
            body = self.request_json(url, headers=self.headers)["body"]

            yield from body["items"]

            url = body["nextUrl"]

    def _extract_post(self, post):
        """Fetch and process post data"""
        post["archives"] = ()

        if content_body := post.pop("body", None):
            if "html" in content_body:
                post["html"] = content_body["html"]
            if post["type"] == "article":
                post["articleBody"] = content_body.copy()
            if "blocks" in content_body:
                content = []  # text content
                images = []   # image IDs in 'body' order
                files = []    # file IDs in 'body' order

                for block in content_body["blocks"]:
                    if "text" in block:
                        content.append(block["text"])
                    if "links" in block:
                        for link in block["links"]:
                            content.append(link["url"])
                    if "imageId" in block:
                        images.append(block["imageId"])
                    if "fileId" in block:
                        files.append(block["fileId"])

                post["content"] = "\n".join(content)

                self._sort_map(content_body, "imageMap", images)
                if file_map := self._sort_map(content_body, "fileMap", files):
                    exts = util.EXTS_ARCHIVE
                    post["archives"] = [
                        file
                        for file in file_map.values()
                        if file.get("extension", "").lower() in exts
                    ]

        try:
            post["date"] = self.parse_datetime_iso(post["publishedDatetime"])
        except Exception:
            post["date"] = None
        post["text"] = content_body.get("text") if content_body else None
        post["isCoverImage"] = False

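        # Optional metadata enrichment: plan lookup first tries an exact
        # fee match, then falls back to the cheapest plan whose fee
        # covers "feeRequired", and finally synthesizes an entry from
        # the free plan (key 0) if no plan is expensive enough.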
        cid = post.get("creatorId")
        if self._meta_user and cid is not None:
            post["user"] = self._get_user_data(cid)
        if self._meta_plan and cid is not None:
            plans = self._get_plan_data(cid)
            fee = post.get("feeRequired") or 0
            try:
                post["plan"] = plans[fee]
            except KeyError:
                if fees := [f for f in plans if f >= fee]:
                    plan = plans[min(fees)]
                else:
                    plan = plans[0].copy()
                    plan["fee"] = fee
                post["plan"] = plans[fee] = plan
        if self._meta_comments:
            if post.get("commentCount"):
                post["comments"] = self._get_comment_data(post["id"])
            else:
                post["comments"] = ()

        return content_body, post

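    # _sort_map() reorders an image/file map to match the order in which
    # its IDs appear in the post body, dropping entries whose IDs are not
    # referenced by any block.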
    def _sort_map(self, body, key, ids):
        orig = body.get(key)
        if not orig:
            return {} if orig is None else orig

        body[key] = new = {
            id: orig[id]
            for id in ids
            if id in orig
        }

        return new

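    # @memcache(keyarg=1) caches results per creator_id, so profile and
    # plan data are requested at most once per creator during a run.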
    @memcache(keyarg=1)
    def _get_user_data(self, creator_id):
        url = "https://api.fanbox.cc/creator.get"
        params = {"creatorId": creator_id}
        data = self.request_json(url, params=params, headers=self.headers)

        user = data["body"]
        user.update(user.pop("user"))

        return user

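    # Plans are keyed by their fee; key 0 is a synthetic "free" plan
    # that also serves as the fallback template in _extract_post().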
    @memcache(keyarg=1)
    def _get_plan_data(self, creator_id):
        url = "https://api.fanbox.cc/plan.listCreator"
        params = {"creatorId": creator_id}
        data = self.request_json(url, params=params, headers=self.headers)

        plans = {0: {
            "id"             : "",
            "title"          : "",
            "fee"            : 0,
            "description"    : "",
            "coverImageUrl"  : "",
            "creatorId"      : creator_id,
            "hasAdultContent": None,
            "paymentMethod"  : None,
        }}
        for plan in data["body"]:
            del plan["user"]
            plans[plan["fee"]] = plan

        return plans

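    # Comments are fetched in pages of 10; any error merely logs a debug
    # message and returns whatever was collected so far, since comment
    # metadata is non-essential.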
    def _get_comment_data(self, post_id):
        url = ("https://api.fanbox.cc/post.getComments"
               "?limit=10&postId=" + post_id)

        comments = []
        try:
            while url:
                comlist = self.request_json(
                    text.ensure_http_scheme(url), headers=self.headers,
                )["body"]["commentList"]
                comments.extend(comlist["items"])
                url = comlist["nextUrl"]
        except Exception as exc:
            self.log.debug("comments: %s: %s", exc.__class__.__name__, exc)
        return comments

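    # Files are yielded in a fixed order (cover image, HTML links,
    # images, file attachments, embeds), with "num" as a running index
    # shared across all of them; it feeds filename_fmt and archive_fmt.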
    def _get_urls_from_post(self, content_body, post):
        num = 0
        if cover_image := post.get("coverImageUrl"):
            cover_image = text.re("/c/[0-9a-z_]+").sub("", cover_image)
            final_post = post.copy()
            final_post["isCoverImage"] = True
            final_post["fileUrl"] = cover_image
            text.nameext_from_url(cover_image, final_post)
            final_post["num"] = num
            num += 1
            yield Message.Url, cover_image, final_post

        if not content_body:
            return

        if "html" in content_body:
            html_urls = []

            for href in text.extract_iter(content_body["html"], 'href="', '"'):
                if "fanbox.pixiv.net/images/entry" in href:
                    html_urls.append(href)
                elif "downloads.fanbox.cc" in href:
                    html_urls.append(href)
            for src in text.extract_iter(content_body["html"],
                                         'data-src-original="', '"'):
                html_urls.append(src)

            for url in html_urls:
                final_post = post.copy()
                text.nameext_from_url(url, final_post)
                final_post["fileUrl"] = url
                final_post["num"] = num
                num += 1
                yield Message.Url, url, final_post

        for group in ("images", "imageMap"):
            if group in content_body:
                for item in content_body[group]:
                    if group == "imageMap":
                        # imageMap is a dict with image objects as values
                        item = content_body[group][item]

                    final_post = post.copy()
                    final_post["fileUrl"] = item["originalUrl"]
                    text.nameext_from_url(item["originalUrl"], final_post)
                    if "extension" in item:
                        final_post["extension"] = item["extension"]
                    final_post["fileId"] = item.get("id")
                    final_post["width"] = item.get("width")
                    final_post["height"] = item.get("height")
                    final_post["num"] = num
                    num += 1
                    yield Message.Url, item["originalUrl"], final_post

        for group in ("files", "fileMap"):
            if group in content_body:
                for item in content_body[group]:
                    if group == "fileMap":
                        # fileMap is a dict with file objects as values
                        item = content_body[group][item]

                    final_post = post.copy()
                    final_post["fileUrl"] = item["url"]
                    text.nameext_from_url(item["url"], final_post)
                    if "extension" in item:
                        final_post["extension"] = item["extension"]
                    if "name" in item:
                        final_post["filename"] = item["name"]
                    final_post["fileId"] = item.get("id")
                    final_post["num"] = num
                    num += 1
                    yield Message.Url, item["url"], final_post

        if self.embeds:
            embeds_found = []
            if "video" in content_body:
                embeds_found.append(content_body["video"])
            embeds_found.extend(content_body.get("embedMap", {}).values())

            for embed in embeds_found:
                # embed_result is (message type, url, metadata dict)
                embed_result = self._process_embed(post, embed)
                if not embed_result:
                    continue
                embed_result[2]["num"] = num
                num += 1
                yield embed_result

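    # _process_embed() returns (message type, url, metadata) for known
    # providers and None otherwise. The is_video flag only matters when
    # the "embeds" option is "ytdl": video embeds then become direct
    # Message.Url downloads instead of queued extractor jobs.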
    def _process_embed(self, post, embed):
        final_post = post.copy()
        provider = embed["serviceProvider"]
        content_id = embed.get("videoId") or embed.get("contentId")
        prefix = "ytdl:" if self.embeds == "ytdl" else ""
        url = None
        is_video = False

        if provider == "soundcloud":
            url = prefix+"https://soundcloud.com/"+content_id
            is_video = True
        elif provider == "youtube":
            url = prefix+"https://youtube.com/watch?v="+content_id
            is_video = True
        elif provider == "vimeo":
            url = prefix+"https://vimeo.com/"+content_id
            is_video = True
        elif provider == "fanbox":
            # this is an old URL format that redirects
            # to a proper Fanbox URL
            url = "https://www.pixiv.net/fanbox/"+content_id
            # resolve redirect
            try:
                url = self.request_location(url)
            except Exception as exc:
                url = None
                self.log.warning("Unable to extract fanbox embed %s (%s: %s)",
                                 content_id, exc.__class__.__name__, exc)
            else:
                final_post["_extractor"] = FanboxPostExtractor
        elif provider == "twitter":
            url = "https://twitter.com/_/status/"+content_id
        elif provider == "google_forms":
            url = ("https://docs.google.com/forms/d/e/"
                   f"{content_id}/viewform?usp=sf_link")
        else:
            self.log.warning("service not recognized: %s", provider)

        if url:
            final_post["embed"] = embed
            final_post["embedUrl"] = url
            text.nameext_from_url(url, final_post)
            msg_type = Message.Queue
            if is_video and self.embeds == "ytdl":
                msg_type = Message.Url
            return msg_type, url, final_post


class FanboxCreatorExtractor(FanboxExtractor):
    """Extractor for a Fanbox creator's works"""
    subcategory = "creator"
    pattern = USER_PATTERN + r"(?:/posts)?/?$"
    example = "https://USER.fanbox.cc/"

    def posts(self):
        url = "https://api.fanbox.cc/post.paginateCreator?creatorId="
        creator_id = self.groups[0] or self.groups[1]
        return self._pagination_creator(url + creator_id)

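    # post.paginateCreator returns a list of page URLs rather than posts.
    # The "offset" option is mapped onto pages via divmod(offset, 10)
    # (pages appear to hold 10 posts each): whole pages are skipped via
    # the quotient, and the first fetched page is trimmed by the remainder.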
    def _pagination_creator(self, url):
        urls = self.request_json(url, headers=self.headers)["body"]
        if offset := self.config("offset"):
            quotient, remainder = divmod(offset, 10)
            if quotient:
                urls = urls[quotient:]
        else:
            remainder = None

        for url in urls:
            url = text.ensure_http_scheme(url)
            posts = self.request_json(url, headers=self.headers)["body"]
            if remainder:
                posts = posts[remainder:]
                remainder = None
            yield from posts


class FanboxPostExtractor(FanboxExtractor):
    """Extractor for media from a single Fanbox post"""
    subcategory = "post"
    pattern = USER_PATTERN + r"/posts/(\d+)"
    example = "https://USER.fanbox.cc/posts/12345"

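    # groups[2] is the post ID captured by pattern; "feeRequired": 0
    # means an explicitly requested post is not skipped by the fee-max
    # check in items().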
    def posts(self):
        return ({"id": self.groups[2], "feeRequired": 0},)


class FanboxHomeExtractor(FanboxExtractor):
    """Extractor for your Fanbox home feed"""
    subcategory = "home"
    pattern = BASE_PATTERN + r"/?$"
    example = "https://fanbox.cc/"

    def posts(self):
        url = "https://api.fanbox.cc/post.listHome?limit=10"
        return self._pagination(url)


class FanboxSupportingExtractor(FanboxExtractor):
    """Extractor for your supported Fanbox users feed"""
    subcategory = "supporting"
    pattern = BASE_PATTERN + r"/home/supporting"
    example = "https://fanbox.cc/home/supporting"

    def posts(self):
        url = "https://api.fanbox.cc/post.listSupporting?limit=10"
        return self._pagination(url)


class FanboxRedirectExtractor(Extractor):
    """Extractor for pixiv redirects to fanbox.cc"""
    category = "fanbox"
    subcategory = "redirect"
    cookies_domain = None
    pattern = r"(?:https?://)?(?:www\.)?pixiv\.net/fanbox/creator/(\d+)"
    example = "https://www.pixiv.net/fanbox/creator/12345"

    def items(self):
        url = "https://www.pixiv.net/fanbox/creator/" + self.groups[0]
        location = self.request_location(url, notfound="user")
        yield Message.Queue, location, {"_extractor": FanboxCreatorExtractor}