Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
mikf
GitHub Repository: mikf/gallery-dl
Path: blob/master/gallery_dl/extractor/arcalive.py
8900 views
1
# -*- coding: utf-8 -*-
2
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License version 2 as
5
# published by the Free Software Foundation.
6
7
"""Extractors for https://arca.live/"""
8
9
from .common import Extractor, Message
10
from .. import text, util, exception
11
12
# Scheme/host prefix shared by the extractor URL patterns in this module:
# an optional "http://" or "https://", an optional "www.", then the literal
# host "arca.live".
BASE_PATTERN = r"(?:https?://)?(?:www\.)?arca\.live"
13
14
15
class ArcaliveExtractor(Extractor):
    """Common base for all arca.live extractors

    items() relies on two things this class does not define itself:
    an articles() method returning an iterable of article dicts, and a
    'board' attribute (possibly None) holding the board slug.
    """
    category = "arcalive"
    root = "https://arca.live"
    # NOTE(review): looks like the identifier of the Android app;
    # presumably what the "/api/app" endpoints expect -- confirm
    useragent = "net.umanle.arca.android.playstore/0.9.75"
    request_interval = (0.5, 1.5)

    def _init(self):
        # API client bound to this extractor instance
        self.api = ArcaliveAPI(self)

    def items(self):
        """Queue every article for processing by ArcalivePostExtractor"""
        for entry in self.articles():
            entry["_extractor"] = ArcalivePostExtractor
            # slug priority: extractor's board, article's own, "breaking"
            slug = self.board or entry.get("boardSlug") or "breaking"
            post_url = f"{self.root}/b/{slug}/{entry['id']}"
            yield Message.Queue, post_url, entry
31
32
33
class ArcalivePostExtractor(ArcaliveExtractor):
    """Extractor for an arca.live post"""
    subcategory = "post"
    directory_fmt = ("{category}", "{boardSlug}")
    filename_fmt = "{id}_{num}{title:? //[b:230]}.{extension}"
    archive_fmt = "{id}_{num}"
    pattern = BASE_PATTERN + r"/b/(?:\w+)/(\d+)"
    example = "https://arca.live/b/breaking/123456789"

    def items(self):
        """Yield a Directory message for the post and one Url per file

        Reads the 'emoticons' and 'gifs' options, fetches the post via
        the API, and adds 'count', 'date', 'post_url' and '_http_headers'
        to the post metadata before yielding.
        """
        self.emoticons = self.config("emoticons", False)
        self.gifs = gifs = self.config("gifs", True)
        if gifs:
            # "check": verify a .gif exists with a HEAD request;
            # any other truthy value: assume it exists, keep a fallback
            self.gifs_fallback = (gifs != "check")

        post = self.api.post(self.groups[0])
        files = self._extract_files(post)

        post["count"] = len(files)
        # only the first 19 characters ("YYYY-MM-DDTHH:MM:SS") are parsed
        post["date"] = self.parse_datetime_iso(post["createdAt"][:19])
        post["post_url"] = post_url = \
            f"{self.root}/b/{post['boardSlug']}/{post['id']}"
        post["_http_headers"] = {"Referer": post_url + "?p=1"}

        yield Message.Directory, "", post
        # the same 'post' dict is updated in place and re-yielded per file
        for post["num"], file in enumerate(files, 1):
            post.update(file)
            url = file["url"]
            yield Message.Url, url, text.nameext_from_url(url, post)

    def _extract_files(self, post):
        """Collect file entries from the <img>/<video> tags of a post

        Returns a list of dicts with 'url', 'width', 'height' and
        '_fallback' (a tuple of alternative URLs, possibly empty).
        Emoticons are skipped unless the 'emoticons' option is enabled.
        Tags without a usable source path are skipped.
        """
        files = []

        # group 1 is "o" for <video> tags and empty for <img> tags
        for video, media in text.re(r"<(?:img|vide(o)) ([^>]+)").findall(
                post["content"]):
            if not self.emoticons and 'class="arca-emoticon"' in media:
                continue

            src = (text.extr(media, 'data-originalurl="', '"') or
                   text.extr(media, 'src="', '"'))
            if not src:
                continue

            src, _, query = text.unescape(src).partition("?")
            if not src:
                # the attribute held nothing but a query string
                continue

            if src.startswith("//"):
                # protocol-relative URL
                # NOTE(review): presumably "ac-p" serves previews and
                # "ac-o" originals -- confirm
                url = "https:" + src.replace(
                    "//ac-p.namu", "//ac-o.namu", 1)
            elif src.startswith("/"):
                # site-relative path
                url = self.root + src
            else:
                url = src

            fallback = ()
            query = "?type=orig&" + query
            if orig := text.extr(media, 'data-orig="', '"'):
                # tag names the original extension: prefer it and keep
                # the advertised URL as fallback
                path, _, ext = url.rpartition(".")
                if ext != orig:
                    fallback = (url + query,)
                    url = path + "." + orig
            elif video and self.gifs:
                url_gif = url.rpartition(".")[0] + ".gif"
                if self.gifs_fallback:
                    # try the .gif first, fall back to the video URL
                    fallback = (url + query,)
                    url = url_gif
                else:
                    # "check" mode: only switch to .gif if it exists
                    response = self.request(
                        url_gif + query, method="HEAD", fatal=False)
                    if response.status_code < 400:
                        fallback = (url + query,)
                        url = url_gif

            files.append({
                "url"   : url + query,
                "width" : text.parse_int(text.extr(media, 'width="', '"')),
                "height": text.parse_int(text.extr(media, 'height="', '"')),
                "_fallback": fallback,
            })

        return files
113
114
115
class ArcaliveBoardExtractor(ArcaliveExtractor):
    """Extractor for the posts of an arca.live board"""
    subcategory = "board"
    pattern = BASE_PATTERN + r"/b/([^/?#]+)/?(?:\?([^#]+))?$"
    example = "https://arca.live/b/breaking"

    def articles(self):
        """Return the board's articles as provided by the API"""
        # groups: (board slug, optional query string)
        slug, query = self.groups
        self.board = slug
        return self.api.board(slug, text.parse_query(query))
125
126
127
class ArcaliveUserExtractor(ArcaliveExtractor):
    """Extractor for an arca.live user's posts"""
    subcategory = "user"
    pattern = BASE_PATTERN + r"/u/@([^/?#]+)/?(?:\?([^#]+))?$"
    example = "https://arca.live/u/@USER"

    def articles(self):
        """Return the articles found by searching for the user's nickname"""
        # no fixed board for user listings
        self.board = None
        # groups: (URL-quoted user name, optional query string)
        name, query = self.groups
        search_params = text.parse_query(query)
        nickname = text.unquote(name)
        return self.api.user_posts(nickname, search_params)
138
139
140
class ArcaliveAPI():
    """Interface for the arca.live app API rooted at '<root>/api/app'"""

    def __init__(self, extractor):
        """Bind to 'extractor' and register a device token on its session"""
        self.extractor = extractor
        self.log = extractor.log
        self.root = extractor.root + "/api/app"

        # NOTE(review): presumably a fresh random token per instance --
        # confirm against util.generate_token
        extractor.session.headers["X-Device-Token"] = util.generate_token(64)

    def board(self, board_slug, params):
        """Iterate over the articles of the board 'board_slug'"""
        endpoint = "/list/channel/" + board_slug
        return self._pagination(endpoint, params, "articles")

    def post(self, post_id):
        """Return the data of a single article"""
        # always requested through the "breaking" channel, whatever
        # board the post URL named
        endpoint = "/view/article/breaking/" + str(post_id)
        return self._call(endpoint)

    def user_posts(self, username, params):
        """Iterate over articles matching a nickname search

        Adds 'target' and 'keyword' to the given 'params' dict.
        """
        endpoint = "/list/channel/breaking"
        params["target"] = "nickname"
        params["keyword"] = username
        return self._pagination(endpoint, params, "articles")

    def _call(self, endpoint, params=None):
        """Request 'endpoint' and return its decoded JSON response

        Raises exception.AbortExtraction for any status code other
        than 200, including the server's 'message' when it sends one.
        """
        url = self.root + endpoint
        response = self.extractor.request(url, params=params)

        data = response.json()
        if response.status_code == 200:
            return data

        self.log.debug("Server response: %s", data)
        if msg := data.get("message"):
            msg = "API request failed: " + msg
        else:
            msg = "API request failed"
        raise exception.AbortExtraction(msg)

    def _pagination(self, endpoint, params, key):
        """Yield the items under 'key' from every page of 'endpoint'

        'params' is updated in place with each response's "next" values.
        Iteration ends on the first page with no items under 'key' or
        with a missing or empty "next".
        """
        while True:
            data = self._call(endpoint, params)

            posts = data.get(key)
            if not posts:
                break
            yield from posts

            cursor = data.get("next")
            if not cursor:
                # without new values the next request would repeat
                # this exact page
                break
            params.update(cursor)
188
189