Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
mikf
GitHub Repository: mikf/gallery-dl
Path: blob/master/gallery_dl/extractor/blogger.py
5399 views
1
# -*- coding: utf-8 -*-
2
3
# Copyright 2019-2025 Mike Fährmann
4
#
5
# This program is free software; you can redistribute it and/or modify
6
# it under the terms of the GNU General Public License version 2 as
7
# published by the Free Software Foundation.
8
9
"""Extractors for Blogger blogs"""
10
11
from .common import BaseExtractor, Message
12
from .. import text, util
13
14
15
def original(url):
16
return (util.re(r"(/|=)(?:[sw]\d+|w\d+-h\d+)(?=/|$)")
17
.sub(r"\1s0", url)
18
.replace("http:", "https:", 1))
19
20
21
class BloggerExtractor(BaseExtractor):
22
"""Base class for blogger extractors"""
23
basecategory = "blogger"
24
directory_fmt = ("blogger", "{blog[name]}",
25
"{post[date]:%Y-%m-%d} {post[title]}")
26
filename_fmt = "{num:>03}.{extension}"
27
archive_fmt = "{post[id]}_{num}"
28
29
def _init(self):
30
self.api = BloggerAPI(self)
31
self.blog = self.root.rpartition("/")[2]
32
self.videos = self.config("videos", True)
33
34
if self.videos:
35
self.findall_video = util.re(
36
r"""src=["'](https?://www\.blogger\.com"""
37
r"""/video\.g\?token=[^"']+)""").findall
38
39
def items(self):
40
blog = self.api.blog_by_url("http://" + self.blog)
41
blog["pages"] = blog["pages"]["totalItems"]
42
blog["posts"] = blog["posts"]["totalItems"]
43
blog["date"] = text.parse_datetime(blog["published"])
44
del blog["selfLink"]
45
46
findall_image = util.re(
47
r'src="(https?://(?:'
48
r'blogger\.googleusercontent\.com/img|'
49
r'lh\d+(?:-\w+)?\.googleusercontent\.com|'
50
r'\d+\.bp\.blogspot\.com)/[^"]+)').findall
51
metadata = self.metadata()
52
53
for post in self.posts(blog):
54
content = post["content"]
55
56
files = findall_image(content)
57
for idx, url in enumerate(files):
58
files[idx] = original(url)
59
60
if self.videos and (
61
'id="BLOG_video-' in content or
62
'class="BLOG_video_' in content):
63
self._extract_videos(files, post)
64
65
post["author"] = post["author"]["displayName"]
66
post["replies"] = post["replies"]["totalItems"]
67
post["content"] = text.remove_html(content)
68
post["date"] = text.parse_datetime(post["published"])
69
del post["selfLink"]
70
del post["blog"]
71
72
data = {"blog": blog, "post": post}
73
if metadata:
74
data.update(metadata)
75
yield Message.Directory, data
76
77
for data["num"], url in enumerate(files, 1):
78
data["url"] = url
79
yield Message.Url, url, text.nameext_from_url(url, data)
80
81
def posts(self, blog):
82
"""Return an iterable with all relevant post objects"""
83
84
def metadata(self):
85
"""Return additional metadata"""
86
87
def _extract_videos(self, files, post):
88
url = f"https://{self.blog}/feeds/posts/default/{post['id']}"
89
params = {
90
"alt" : "json",
91
"v" : "2",
92
"dynamicviews" : "1",
93
"rewriteforssl": "true",
94
}
95
96
data = self.request_json(url, params=params)
97
html = data["entry"]["content"]["$t"]
98
99
for url in self.findall_video(html):
100
page = self.request(url).text
101
video_config = util.json_loads(text.extr(
102
page, 'var VIDEO_CONFIG =', '\n'))
103
files.append(max(
104
video_config["streams"],
105
key=lambda x: x["format_id"],
106
)["play_url"])
107
108
109
BASE_PATTERN = BloggerExtractor.update({
110
"blogspot": {
111
"root": None,
112
"pattern": r"[\w-]+\.blogspot\.com",
113
},
114
})
115
116
117
class BloggerPostExtractor(BloggerExtractor):
118
"""Extractor for a single blog post"""
119
subcategory = "post"
120
pattern = BASE_PATTERN + r"(/\d\d\d\d/\d\d/[^/?#]+\.html)"
121
example = "https://BLOG.blogspot.com/1970/01/TITLE.html"
122
123
def posts(self, blog):
124
return (self.api.post_by_path(blog["id"], self.groups[-1]),)
125
126
127
class BloggerBlogExtractor(BloggerExtractor):
128
"""Extractor for an entire Blogger blog"""
129
subcategory = "blog"
130
pattern = BASE_PATTERN + r"/?$"
131
example = "https://BLOG.blogspot.com/"
132
133
def posts(self, blog):
134
return self.api.blog_posts(blog["id"])
135
136
137
class BloggerSearchExtractor(BloggerExtractor):
138
"""Extractor for Blogger search resuls"""
139
subcategory = "search"
140
pattern = BASE_PATTERN + r"/search/?\?q=([^&#]+)"
141
example = "https://BLOG.blogspot.com/search?q=QUERY"
142
143
def metadata(self):
144
self.query = query = text.unquote(self.groups[-1])
145
return {"query": query}
146
147
def posts(self, blog):
148
return self.api.blog_search(blog["id"], self.query)
149
150
151
class BloggerLabelExtractor(BloggerExtractor):
152
"""Extractor for Blogger posts by label"""
153
subcategory = "label"
154
pattern = BASE_PATTERN + r"/search/label/([^/?#]+)"
155
example = "https://BLOG.blogspot.com/search/label/LABEL"
156
157
def metadata(self):
158
self.label = label = text.unquote(self.groups[-1])
159
return {"label": label}
160
161
def posts(self, blog):
162
return self.api.blog_posts(blog["id"], self.label)
163
164
165
class BloggerAPI():
166
"""Minimal interface for the Blogger API v3
167
168
https://developers.google.com/blogger
169
"""
170
API_KEY = "AIzaSyCN9ax34oMMyM07g_M-5pjeDp_312eITK8"
171
172
def __init__(self, extractor):
173
self.extractor = extractor
174
self.api_key = extractor.config("api-key") or self.API_KEY
175
176
def blog_by_url(self, url):
177
return self._call("/blogs/byurl", {"url": url}, "blog")
178
179
def blog_posts(self, blog_id, label=None):
180
endpoint = f"/blogs/{blog_id}/posts"
181
params = {"labels": label}
182
return self._pagination(endpoint, params)
183
184
def blog_search(self, blog_id, query):
185
endpoint = f"/blogs/{blog_id}/posts/search"
186
params = {"q": query}
187
return self._pagination(endpoint, params)
188
189
def post_by_path(self, blog_id, path):
190
endpoint = f"/blogs/{blog_id}/posts/bypath"
191
return self._call(endpoint, {"path": path}, "post")
192
193
def _call(self, endpoint, params, notfound=None):
194
url = "https://www.googleapis.com/blogger/v3" + endpoint
195
params["key"] = self.api_key
196
return self.extractor.request_json(
197
url, params=params, notfound=notfound)
198
199
def _pagination(self, endpoint, params):
200
while True:
201
data = self._call(endpoint, params)
202
if "items" in data:
203
yield from data["items"]
204
if "nextPageToken" not in data:
205
return
206
params["pageToken"] = data["nextPageToken"]
207
208