Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
mikf
GitHub Repository: mikf/gallery-dl
Path: blob/master/gallery_dl/extractor/fanbox.py
5399 views
1
# -*- coding: utf-8 -*-
2
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License version 2 as
5
# published by the Free Software Foundation.
6
7
"""Extractors for https://www.fanbox.cc/"""
8
9
from .common import Extractor, Message
10
from .. import text, util
11
from ..cache import memcache
12
13
BASE_PATTERN = r"(?:https?://)?(?:www\.)?fanbox\.cc"
14
USER_PATTERN = (
15
r"(?:https?://)?(?:"
16
r"(?!www\.)([\w-]+)\.fanbox\.cc|"
17
r"(?:www\.)?fanbox\.cc/@([\w-]+))"
18
)
19
20
21
class FanboxExtractor(Extractor):
22
"""Base class for Fanbox extractors"""
23
category = "fanbox"
24
root = "https://www.fanbox.cc"
25
directory_fmt = ("{category}", "{creatorId}")
26
filename_fmt = "{id}_{num}.{extension}"
27
archive_fmt = "{id}_{num}"
28
browser = "firefox"
29
_warning = True
30
31
def _init(self):
32
self.headers = {
33
"Accept" : "application/json, text/plain, */*",
34
"Origin" : "https://www.fanbox.cc",
35
"Referer": "https://www.fanbox.cc/",
36
"Cookie" : None,
37
"Sec-Fetch-Dest": "empty",
38
"Sec-Fetch-Mode": "cors",
39
"Sec-Fetch-Site": "same-site",
40
}
41
self.embeds = self.config("embeds", True)
42
43
if includes := self.config("metadata"):
44
if isinstance(includes, str):
45
includes = includes.split(",")
46
elif not isinstance(includes, (list, tuple)):
47
includes = ("user", "plan")
48
self._meta_user = ("user" in includes)
49
self._meta_plan = ("plan" in includes)
50
self._meta_comments = ("comments" in includes)
51
else:
52
self._meta_user = self._meta_plan = self._meta_comments = False
53
54
if self.config("comments"):
55
self._meta_comments = True
56
57
if self._warning:
58
if not self.cookies_check(("FANBOXSESSID",)):
59
self.log.warning("no 'FANBOXSESSID' cookie set")
60
FanboxExtractor._warning = False
61
62
def items(self):
63
fee_max = self.config("fee-max")
64
65
for item in self.posts():
66
if fee_max is not None and fee_max < item["feeRequired"]:
67
self.log.warning("Skipping post %s (feeRequired of %s > %s)",
68
item["id"], item["feeRequired"], fee_max)
69
continue
70
71
try:
72
url = "https://api.fanbox.cc/post.info?postId=" + item["id"]
73
body = self.request_json(url, headers=self.headers)["body"]
74
content_body, post = self._extract_post(body)
75
except Exception as exc:
76
self.log.warning("Skipping post %s (%s: %s)",
77
item["id"], exc.__class__.__name__, exc)
78
continue
79
80
yield Message.Directory, post
81
yield from self._get_urls_from_post(content_body, post)
82
83
def posts(self):
84
"""Return all relevant post objects"""
85
86
def _pagination(self, url):
87
while url:
88
url = text.ensure_http_scheme(url)
89
body = self.request_json(url, headers=self.headers)["body"]
90
91
yield from body["items"]
92
93
url = body["nextUrl"]
94
95
def _extract_post(self, post):
96
"""Fetch and process post data"""
97
post["archives"] = ()
98
99
if content_body := post.pop("body", None):
100
if "html" in content_body:
101
post["html"] = content_body["html"]
102
if post["type"] == "article":
103
post["articleBody"] = content_body.copy()
104
if "blocks" in content_body:
105
content = [] # text content
106
images = [] # image IDs in 'body' order
107
files = [] # file IDs in 'body' order
108
109
for block in content_body["blocks"]:
110
if "text" in block:
111
content.append(block["text"])
112
if "links" in block:
113
for link in block["links"]:
114
content.append(link["url"])
115
if "imageId" in block:
116
images.append(block["imageId"])
117
if "fileId" in block:
118
files.append(block["fileId"])
119
120
post["content"] = "\n".join(content)
121
122
self._sort_map(content_body, "imageMap", images)
123
if file_map := self._sort_map(content_body, "fileMap", files):
124
exts = util.EXTS_ARCHIVE
125
post["archives"] = [
126
file
127
for file in file_map.values()
128
if file.get("extension", "").lower() in exts
129
]
130
131
post["date"] = text.parse_datetime(post["publishedDatetime"])
132
post["text"] = content_body.get("text") if content_body else None
133
post["isCoverImage"] = False
134
135
if self._meta_user:
136
post["user"] = self._get_user_data(post["creatorId"])
137
if self._meta_plan:
138
plans = self._get_plan_data(post["creatorId"])
139
fee = post["feeRequired"]
140
try:
141
post["plan"] = plans[fee]
142
except KeyError:
143
if fees := [f for f in plans if f >= fee]:
144
plan = plans[min(fees)]
145
else:
146
plan = plans[0].copy()
147
plan["fee"] = fee
148
post["plan"] = plans[fee] = plan
149
if self._meta_comments:
150
if post["commentCount"]:
151
post["comments"] = list(self._get_comment_data(post["id"]))
152
else:
153
post["commentd"] = ()
154
155
return content_body, post
156
157
def _sort_map(self, body, key, ids):
158
orig = body.get(key)
159
if not orig:
160
return {} if orig is None else orig
161
162
body[key] = new = {
163
id: orig[id]
164
for id in ids
165
if id in orig
166
}
167
168
return new
169
170
@memcache(keyarg=1)
171
def _get_user_data(self, creator_id):
172
url = "https://api.fanbox.cc/creator.get"
173
params = {"creatorId": creator_id}
174
data = self.request_json(url, params=params, headers=self.headers)
175
176
user = data["body"]
177
user.update(user.pop("user"))
178
179
return user
180
181
@memcache(keyarg=1)
182
def _get_plan_data(self, creator_id):
183
url = "https://api.fanbox.cc/plan.listCreator"
184
params = {"creatorId": creator_id}
185
data = self.request_json(url, params=params, headers=self.headers)
186
187
plans = {0: {
188
"id" : "",
189
"title" : "",
190
"fee" : 0,
191
"description" : "",
192
"coverImageUrl" : "",
193
"creatorId" : creator_id,
194
"hasAdultContent": None,
195
"paymentMethod" : None,
196
}}
197
for plan in data["body"]:
198
del plan["user"]
199
plans[plan["fee"]] = plan
200
201
return plans
202
203
def _get_comment_data(self, post_id):
204
url = ("https://api.fanbox.cc/post.getComments"
205
"?limit=10&postId=" + post_id)
206
207
comments = []
208
while url:
209
url = text.ensure_http_scheme(url)
210
body = self.request_json(url, headers=self.headers)["body"]
211
data = body["commentList"]
212
comments.extend(data["items"])
213
url = data["nextUrl"]
214
return comments
215
216
def _get_urls_from_post(self, content_body, post):
217
num = 0
218
if cover_image := post.get("coverImageUrl"):
219
cover_image = util.re("/c/[0-9a-z_]+").sub("", cover_image)
220
final_post = post.copy()
221
final_post["isCoverImage"] = True
222
final_post["fileUrl"] = cover_image
223
text.nameext_from_url(cover_image, final_post)
224
final_post["num"] = num
225
num += 1
226
yield Message.Url, cover_image, final_post
227
228
if not content_body:
229
return
230
231
if "html" in content_body:
232
html_urls = []
233
234
for href in text.extract_iter(content_body["html"], 'href="', '"'):
235
if "fanbox.pixiv.net/images/entry" in href:
236
html_urls.append(href)
237
elif "downloads.fanbox.cc" in href:
238
html_urls.append(href)
239
for src in text.extract_iter(content_body["html"],
240
'data-src-original="', '"'):
241
html_urls.append(src)
242
243
for url in html_urls:
244
final_post = post.copy()
245
text.nameext_from_url(url, final_post)
246
final_post["fileUrl"] = url
247
final_post["num"] = num
248
num += 1
249
yield Message.Url, url, final_post
250
251
for group in ("images", "imageMap"):
252
if group in content_body:
253
for item in content_body[group]:
254
if group == "imageMap":
255
# imageMap is a dict with image objects as values
256
item = content_body[group][item]
257
258
final_post = post.copy()
259
final_post["fileUrl"] = item["originalUrl"]
260
text.nameext_from_url(item["originalUrl"], final_post)
261
if "extension" in item:
262
final_post["extension"] = item["extension"]
263
final_post["fileId"] = item.get("id")
264
final_post["width"] = item.get("width")
265
final_post["height"] = item.get("height")
266
final_post["num"] = num
267
num += 1
268
yield Message.Url, item["originalUrl"], final_post
269
270
for group in ("files", "fileMap"):
271
if group in content_body:
272
for item in content_body[group]:
273
if group == "fileMap":
274
# fileMap is a dict with file objects as values
275
item = content_body[group][item]
276
277
final_post = post.copy()
278
final_post["fileUrl"] = item["url"]
279
text.nameext_from_url(item["url"], final_post)
280
if "extension" in item:
281
final_post["extension"] = item["extension"]
282
if "name" in item:
283
final_post["filename"] = item["name"]
284
final_post["fileId"] = item.get("id")
285
final_post["num"] = num
286
num += 1
287
yield Message.Url, item["url"], final_post
288
289
if self.embeds:
290
embeds_found = []
291
if "video" in content_body:
292
embeds_found.append(content_body["video"])
293
embeds_found.extend(content_body.get("embedMap", {}).values())
294
295
for embed in embeds_found:
296
# embed_result is (message type, url, metadata dict)
297
embed_result = self._process_embed(post, embed)
298
if not embed_result:
299
continue
300
embed_result[2]["num"] = num
301
num += 1
302
yield embed_result
303
304
def _process_embed(self, post, embed):
305
final_post = post.copy()
306
provider = embed["serviceProvider"]
307
content_id = embed.get("videoId") or embed.get("contentId")
308
prefix = "ytdl:" if self.embeds == "ytdl" else ""
309
url = None
310
is_video = False
311
312
if provider == "soundcloud":
313
url = prefix+"https://soundcloud.com/"+content_id
314
is_video = True
315
elif provider == "youtube":
316
url = prefix+"https://youtube.com/watch?v="+content_id
317
is_video = True
318
elif provider == "vimeo":
319
url = prefix+"https://vimeo.com/"+content_id
320
is_video = True
321
elif provider == "fanbox":
322
# this is an old URL format that redirects
323
# to a proper Fanbox URL
324
url = "https://www.pixiv.net/fanbox/"+content_id
325
# resolve redirect
326
try:
327
url = self.request_location(url)
328
except Exception as exc:
329
url = None
330
self.log.warning("Unable to extract fanbox embed %s (%s: %s)",
331
content_id, exc.__class__.__name__, exc)
332
else:
333
final_post["_extractor"] = FanboxPostExtractor
334
elif provider == "twitter":
335
url = "https://twitter.com/_/status/"+content_id
336
elif provider == "google_forms":
337
url = (f"https://docs.google.com/forms/d/e/"
338
f"{content_id}/viewform?usp=sf_link")
339
else:
340
self.log.warning(f"service not recognized: {provider}")
341
342
if url:
343
final_post["embed"] = embed
344
final_post["embedUrl"] = url
345
text.nameext_from_url(url, final_post)
346
msg_type = Message.Queue
347
if is_video and self.embeds == "ytdl":
348
msg_type = Message.Url
349
return msg_type, url, final_post
350
351
352
class FanboxCreatorExtractor(FanboxExtractor):
353
"""Extractor for a Fanbox creator's works"""
354
subcategory = "creator"
355
pattern = USER_PATTERN + r"(?:/posts)?/?$"
356
example = "https://USER.fanbox.cc/"
357
358
def posts(self):
359
url = "https://api.fanbox.cc/post.paginateCreator?creatorId="
360
creator_id = self.groups[0] or self.groups[1]
361
return self._pagination_creator(url + creator_id)
362
363
def _pagination_creator(self, url):
364
urls = self.request_json(url, headers=self.headers)["body"]
365
for url in urls:
366
url = text.ensure_http_scheme(url)
367
yield from self.request_json(url, headers=self.headers)["body"]
368
369
370
class FanboxPostExtractor(FanboxExtractor):
371
"""Extractor for media from a single Fanbox post"""
372
subcategory = "post"
373
pattern = USER_PATTERN + r"/posts/(\d+)"
374
example = "https://USER.fanbox.cc/posts/12345"
375
376
def posts(self):
377
return ({"id": self.groups[2], "feeRequired": 0},)
378
379
380
class FanboxHomeExtractor(FanboxExtractor):
381
"""Extractor for your Fanbox home feed"""
382
subcategory = "home"
383
pattern = BASE_PATTERN + r"/?$"
384
example = "https://fanbox.cc/"
385
386
def posts(self):
387
url = "https://api.fanbox.cc/post.listHome?limit=10"
388
return self._pagination(url)
389
390
391
class FanboxSupportingExtractor(FanboxExtractor):
392
"""Extractor for your supported Fanbox users feed"""
393
subcategory = "supporting"
394
pattern = BASE_PATTERN + r"/home/supporting"
395
example = "https://fanbox.cc/home/supporting"
396
397
def posts(self):
398
url = "https://api.fanbox.cc/post.listSupporting?limit=10"
399
return self._pagination(url)
400
401
402
class FanboxRedirectExtractor(Extractor):
403
"""Extractor for pixiv redirects to fanbox.cc"""
404
category = "fanbox"
405
subcategory = "redirect"
406
pattern = r"(?:https?://)?(?:www\.)?pixiv\.net/fanbox/creator/(\d+)"
407
example = "https://www.pixiv.net/fanbox/creator/12345"
408
409
def items(self):
410
url = "https://www.pixiv.net/fanbox/creator/" + self.groups[0]
411
location = self.request_location(url, notfound="user")
412
yield Message.Queue, location, {"_extractor": FanboxCreatorExtractor}
413
414