Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
mikf
GitHub Repository: mikf/gallery-dl
Path: blob/master/gallery_dl/extractor/civitai.py
5399 views
1
# -*- coding: utf-8 -*-
2
3
# Copyright 2024-2025 Mike Fährmann
4
#
5
# This program is free software; you can redistribute it and/or modify
6
# it under the terms of the GNU General Public License version 2 as
7
# published by the Free Software Foundation.
8
9
"""Extractors for https://www.civitai.com/"""
10
11
from .common import Extractor, Message, Dispatch
12
from .. import text, util, exception
13
from ..cache import memcache
14
import itertools
15
import time
16
17
BASE_PATTERN = r"(?:https?://)?civitai\.com"
18
USER_PATTERN = BASE_PATTERN + r"/user/([^/?#]+)"
19
20
21
class CivitaiExtractor(Extractor):
22
"""Base class for civitai extractors"""
23
category = "civitai"
24
root = "https://civitai.com"
25
directory_fmt = ("{category}", "{user[username]}", "images")
26
filename_fmt = "{file[id]}.{extension}"
27
archive_fmt = "{file[uuid]}"
28
request_interval = (0.5, 1.5)
29
30
def _init(self):
31
if self.config("api") == "rest":
32
self.log.debug("Using REST API")
33
self.api = CivitaiRestAPI(self)
34
else:
35
self.log.debug("Using tRPC API")
36
self.api = CivitaiTrpcAPI(self)
37
38
if quality := self.config("quality"):
39
if not isinstance(quality, str):
40
quality = ",".join(quality)
41
self._image_quality = quality
42
self._image_ext = ("png" if quality == "original=true" else "jpg")
43
else:
44
self._image_quality = "original=true"
45
self._image_ext = "png"
46
47
if quality_video := self.config("quality-videos"):
48
if not isinstance(quality_video, str):
49
quality_video = ",".join(quality_video)
50
if quality_video[0] == "+":
51
quality_video = (self._image_quality + "," +
52
quality_video.lstrip("+,"))
53
self._video_quality = quality_video
54
elif quality_video is not None and quality:
55
self._video_quality = self._image_quality
56
else:
57
self._video_quality = "quality=100"
58
self._video_ext = "webm"
59
60
if metadata := self.config("metadata"):
61
if isinstance(metadata, str):
62
metadata = metadata.split(",")
63
elif not isinstance(metadata, (list, tuple)):
64
metadata = ("generation", "version", "post")
65
self._meta_generation = ("generation" in metadata)
66
self._meta_version = ("version" in metadata)
67
self._meta_post = ("post" in metadata)
68
else:
69
self._meta_generation = self._meta_version = self._meta_post = \
70
False
71
72
def items(self):
73
if models := self.models():
74
data = {"_extractor": CivitaiModelExtractor}
75
for model in models:
76
url = f"{self.root}/models/{model['id']}"
77
yield Message.Queue, url, data
78
return
79
80
if posts := self.posts():
81
for post in posts:
82
83
if "images" in post:
84
images = post["images"]
85
else:
86
images = self.api.images_post(post["id"])
87
88
post = self.api.post(post["id"])
89
post["date"] = text.parse_datetime(
90
post["publishedAt"], "%Y-%m-%dT%H:%M:%S.%fZ")
91
data = {
92
"post": post,
93
"user": post.pop("user"),
94
}
95
if self._meta_version:
96
data["model"], data["version"] = \
97
self._extract_meta_version(post)
98
99
yield Message.Directory, data
100
for file in self._image_results(images):
101
file.update(data)
102
yield Message.Url, file["url"], file
103
return
104
105
if images := self.images():
106
for file in images:
107
108
data = {
109
"file": file,
110
"user": file.pop("user"),
111
}
112
113
if self._meta_generation:
114
data["generation"] = \
115
self._extract_meta_generation(file)
116
if self._meta_version:
117
data["model"], data["version"] = \
118
self._extract_meta_version(file, False)
119
if "post" in file:
120
data["post"] = file.pop("post")
121
if self._meta_post and "post" not in data:
122
data["post"] = post = self._extract_meta_post(file)
123
if post:
124
post.pop("user", None)
125
file["date"] = text.parse_datetime(
126
file["createdAt"], "%Y-%m-%dT%H:%M:%S.%fZ")
127
128
data["url"] = url = self._url(file)
129
text.nameext_from_url(url, data)
130
if not data["extension"]:
131
data["extension"] = (
132
self._video_ext if file.get("type") == "video" else
133
self._image_ext)
134
yield Message.Directory, data
135
yield Message.Url, url, data
136
return
137
138
def models(self):
139
return ()
140
141
def posts(self):
142
return ()
143
144
def images(self):
145
return ()
146
147
def _url(self, image):
148
url = image["url"]
149
video = image.get("type") == "video"
150
quality = self._video_quality if video else self._image_quality
151
152
if "/" in url:
153
parts = url.rsplit("/", 3)
154
image["uuid"] = parts[1]
155
parts[2] = quality
156
return "/".join(parts)
157
158
image["uuid"] = url
159
name = image.get("name")
160
if not name:
161
if mime := image.get("mimeType"):
162
name = f"{image.get('id')}.{mime.rpartition('/')[2]}"
163
else:
164
ext = self._video_ext if video else self._image_ext
165
name = f"{image.get('id')}.{ext}"
166
return (f"https://image.civitai.com/xG1nkqKTMzGDvpLrqFT7WA"
167
f"/{url}/{quality}/{name}")
168
169
def _image_results(self, images):
170
for num, file in enumerate(images, 1):
171
data = text.nameext_from_url(file["url"], {
172
"num" : num,
173
"file": file,
174
"url" : self._url(file),
175
})
176
if not data["extension"]:
177
data["extension"] = (
178
self._video_ext if file.get("type") == "video" else
179
self._image_ext)
180
if "id" not in file and data["filename"].isdecimal():
181
file["id"] = text.parse_int(data["filename"])
182
if "date" not in file:
183
file["date"] = text.parse_datetime(
184
file["createdAt"], "%Y-%m-%dT%H:%M:%S.%fZ")
185
if self._meta_generation:
186
file["generation"] = self._extract_meta_generation(file)
187
yield data
188
189
def _image_reactions(self):
190
self._require_auth()
191
192
params = self.params
193
params["authed"] = True
194
params["useIndex"] = False
195
if "reactions" not in params:
196
params["reactions"] = ("Like", "Dislike", "Heart", "Laugh", "Cry")
197
return self.api.images(params)
198
199
def _require_auth(self):
200
if "Authorization" not in self.api.headers and \
201
not self.cookies.get(
202
"__Secure-civitai-token", domain=".civitai.com"):
203
raise exception.AuthRequired(("api-key", "authenticated cookies"))
204
205
def _parse_query(self, value):
206
return text.parse_query_list(
207
value, {"tags", "reactions", "baseModels", "tools", "techniques",
208
"types", "fileFormats"})
209
210
def _extract_meta_generation(self, image):
211
try:
212
return self.api.image_generationdata(image["id"])
213
except Exception as exc:
214
return self.log.debug("", exc_info=exc)
215
216
def _extract_meta_post(self, image):
217
try:
218
post = self.api.post(image["postId"])
219
post["date"] = text.parse_datetime(
220
post["publishedAt"], "%Y-%m-%dT%H:%M:%S.%fZ")
221
return post
222
except Exception as exc:
223
return self.log.debug("", exc_info=exc)
224
225
def _extract_meta_version(self, item, is_post=True):
226
try:
227
if version_id := self._extract_version_id(item, is_post):
228
version = self.api.model_version(version_id).copy()
229
return version.pop("model", None), version
230
except Exception as exc:
231
self.log.debug("", exc_info=exc)
232
return None, None
233
234
def _extract_version_id(self, item, is_post=True):
235
if version_id := item.get("modelVersionId"):
236
return version_id
237
if version_ids := item.get("modelVersionIds"):
238
return version_ids[0]
239
if version_ids := item.get("modelVersionIdsManual"):
240
return version_ids[0]
241
242
if is_post:
243
return None
244
245
item["post"] = post = self.api.post(item["postId"])
246
post.pop("user", None)
247
return self._extract_version_id(post)
248
249
250
class CivitaiModelExtractor(CivitaiExtractor):
251
subcategory = "model"
252
directory_fmt = ("{category}", "{user[username]}",
253
"{model[id]}{model[name]:? //}",
254
"{version[id]}{version[name]:? //}")
255
pattern = BASE_PATTERN + r"/models/(\d+)(?:/?\?modelVersionId=(\d+))?"
256
example = "https://civitai.com/models/12345/TITLE"
257
258
def items(self):
259
model_id, version_id = self.groups
260
model = self.api.model(model_id)
261
262
if "user" in model:
263
user = model["user"]
264
del model["user"]
265
else:
266
user = model["creator"]
267
del model["creator"]
268
versions = model["modelVersions"]
269
del model["modelVersions"]
270
271
if version_id:
272
version_id = int(version_id)
273
for version in versions:
274
if version["id"] == version_id:
275
break
276
else:
277
version = self.api.model_version(version_id)
278
versions = (version,)
279
280
for version in versions:
281
version["date"] = text.parse_datetime(
282
version["createdAt"], "%Y-%m-%dT%H:%M:%S.%fZ")
283
284
data = {
285
"model" : model,
286
"version": version,
287
"user" : user,
288
}
289
290
yield Message.Directory, data
291
for file in self._extract_files(model, version, user):
292
file.update(data)
293
yield Message.Url, file["url"], file
294
295
def _extract_files(self, model, version, user):
296
filetypes = self.config("files")
297
if filetypes is None:
298
return self._extract_files_image(model, version, user)
299
300
generators = {
301
"model" : self._extract_files_model,
302
"image" : self._extract_files_image,
303
"gallery" : self._extract_files_gallery,
304
"gallerie": self._extract_files_gallery,
305
}
306
if isinstance(filetypes, str):
307
filetypes = filetypes.split(",")
308
309
return itertools.chain.from_iterable(
310
generators[ft.rstrip("s")](model, version, user)
311
for ft in filetypes
312
)
313
314
def _extract_files_model(self, model, version, user):
315
files = []
316
317
for num, file in enumerate(version["files"], 1):
318
name, sep, ext = file["name"].rpartition(".")
319
if not sep:
320
name = ext
321
ext = "bin"
322
file["uuid"] = f"model-{model['id']}-{version['id']}-{file['id']}"
323
files.append({
324
"num" : num,
325
"file" : file,
326
"filename" : name,
327
"extension": ext,
328
"url" : (
329
file.get("downloadUrl") or
330
f"{self.root}/api/download/models/{version['id']}"),
331
"_http_headers" : {
332
"Authorization": self.api.headers.get("Authorization")},
333
"_http_validate": self._validate_file_model,
334
})
335
336
return files
337
338
def _extract_files_image(self, model, version, user):
339
if "images" in version:
340
images = version["images"]
341
else:
342
params = {
343
"modelVersionId": version["id"],
344
"prioritizedUserIds": (user["id"],),
345
"period": "AllTime",
346
"sort": "Most Reactions",
347
"limit": 20,
348
"pending": True,
349
}
350
images = self.api.images(params, defaults=False)
351
352
return self._image_results(images)
353
354
def _extract_files_gallery(self, model, version, user):
355
images = self.api.images_gallery(model, version, user)
356
return self._image_results(images)
357
358
def _validate_file_model(self, response):
359
if response.headers.get("Content-Type", "").startswith("text/html"):
360
alert = text.extr(
361
response.text, 'mantine-Alert-message">', "</div></div></div>")
362
if alert:
363
msg = f"\"{text.remove_html(alert)}\" - 'api-key' required"
364
else:
365
msg = "'api-key' required to download this file"
366
self.log.warning(msg)
367
return False
368
return True
369
370
371
class CivitaiImageExtractor(CivitaiExtractor):
372
subcategory = "image"
373
pattern = BASE_PATTERN + r"/images/(\d+)"
374
example = "https://civitai.com/images/12345"
375
376
def images(self):
377
return self.api.image(self.groups[0])
378
379
380
class CivitaiCollectionExtractor(CivitaiExtractor):
381
subcategory = "collection"
382
directory_fmt = ("{category}", "{user_collection[username]}",
383
"collections", "{collection[id]}{collection[name]:? //}")
384
pattern = BASE_PATTERN + r"/collections/(\d+)"
385
example = "https://civitai.com/collections/12345"
386
387
def images(self):
388
cid = int(self.groups[0])
389
self.kwdict["collection"] = col = self.api.collection(cid)
390
self.kwdict["user_collection"] = col.pop("user", None)
391
392
params = {
393
"collectionId" : cid,
394
"period" : "AllTime",
395
"sort" : "Newest",
396
"browsingLevel" : self.api.nsfw,
397
"include" : ("cosmetics",),
398
}
399
return self.api.images(params, defaults=False)
400
401
402
class CivitaiPostExtractor(CivitaiExtractor):
403
subcategory = "post"
404
directory_fmt = ("{category}", "{username|user[username]}", "posts",
405
"{post[id]}{post[title]:? //}")
406
pattern = BASE_PATTERN + r"/posts/(\d+)"
407
example = "https://civitai.com/posts/12345"
408
409
def posts(self):
410
return ({"id": int(self.groups[0])},)
411
412
413
class CivitaiTagExtractor(CivitaiExtractor):
414
subcategory = "tag"
415
pattern = BASE_PATTERN + r"/tag/([^/?&#]+)"
416
example = "https://civitai.com/tag/TAG"
417
418
def models(self):
419
tag = text.unquote(self.groups[0])
420
return self.api.models_tag(tag)
421
422
423
class CivitaiSearchModelsExtractor(CivitaiExtractor):
424
subcategory = "search-models"
425
pattern = BASE_PATTERN + r"/search/models\?([^#]+)"
426
example = "https://civitai.com/search/models?query=QUERY"
427
428
def models(self):
429
params = self._parse_query(self.groups[0])
430
return CivitaiSearchAPI(self).search_models(
431
params.get("query"), params.get("sortBy"), self.api.nsfw)
432
433
434
class CivitaiSearchImagesExtractor(CivitaiExtractor):
435
subcategory = "search-images"
436
pattern = BASE_PATTERN + r"/search/images\?([^#]+)"
437
example = "https://civitai.com/search/images?query=QUERY"
438
439
def images(self):
440
params = self._parse_query(self.groups[0])
441
return CivitaiSearchAPI(self).search_images(
442
params.get("query"), params.get("sortBy"), self.api.nsfw)
443
444
445
class CivitaiModelsExtractor(CivitaiExtractor):
446
subcategory = "models"
447
pattern = BASE_PATTERN + r"/models(?:/?\?([^#]+))?(?:$|#)"
448
example = "https://civitai.com/models"
449
450
def models(self):
451
params = self._parse_query(self.groups[0])
452
return self.api.models(params)
453
454
455
class CivitaiImagesExtractor(CivitaiExtractor):
456
subcategory = "images"
457
pattern = BASE_PATTERN + r"/images(?:/?\?([^#]+))?(?:$|#)"
458
example = "https://civitai.com/images"
459
460
def images(self):
461
params = self._parse_query(self.groups[0])
462
params["types"] = ("image",)
463
return self.api.images(params)
464
465
466
class CivitaiVideosExtractor(CivitaiExtractor):
467
subcategory = "videos"
468
pattern = BASE_PATTERN + r"/videos(?:/?\?([^#]+))?(?:$|#)"
469
example = "https://civitai.com/videos"
470
471
def images(self):
472
params = self._parse_query(self.groups[0])
473
params["types"] = ("video",)
474
return self.api.images(params)
475
476
477
class CivitaiPostsExtractor(CivitaiExtractor):
478
subcategory = "posts"
479
pattern = BASE_PATTERN + r"/posts(?:/?\?([^#]+))?(?:$|#)"
480
example = "https://civitai.com/posts"
481
482
def posts(self):
483
params = self._parse_query(self.groups[0])
484
return self.api.posts(params)
485
486
487
class CivitaiUserExtractor(Dispatch, CivitaiExtractor):
488
pattern = USER_PATTERN + r"/?(?:$|\?|#)"
489
example = "https://civitai.com/user/USER"
490
491
def items(self):
492
base = f"{self.root}/user/{self.groups[0]}/"
493
return self._dispatch_extractors((
494
(CivitaiUserModelsExtractor, base + "models"),
495
(CivitaiUserPostsExtractor , base + "posts"),
496
(CivitaiUserImagesExtractor, base + "images"),
497
(CivitaiUserVideosExtractor, base + "videos"),
498
(CivitaiUserCollectionsExtractor, base + "collections"),
499
), ("user-images", "user-videos"))
500
501
502
class CivitaiUserModelsExtractor(CivitaiExtractor):
503
subcategory = "user-models"
504
pattern = USER_PATTERN + r"/models/?(?:\?([^#]+))?"
505
example = "https://civitai.com/user/USER/models"
506
507
def models(self):
508
user, query = self.groups
509
params = self._parse_query(query)
510
params["username"] = text.unquote(user)
511
return self.api.models(params)
512
513
514
class CivitaiUserPostsExtractor(CivitaiExtractor):
515
subcategory = "user-posts"
516
directory_fmt = ("{category}", "{username|user[username]}", "posts",
517
"{post[id]}{post[title]:? //}")
518
pattern = USER_PATTERN + r"/posts/?(?:\?([^#]+))?"
519
example = "https://civitai.com/user/USER/posts"
520
521
def posts(self):
522
user, query = self.groups
523
params = self._parse_query(query)
524
params["username"] = text.unquote(user)
525
return self.api.posts(params)
526
527
528
class CivitaiUserImagesExtractor(CivitaiExtractor):
529
subcategory = "user-images"
530
pattern = USER_PATTERN + r"/images/?(?:\?([^#]+))?"
531
example = "https://civitai.com/user/USER/images"
532
533
def __init__(self, match):
534
user, query = match.groups()
535
self.params = self._parse_query(query)
536
self.params["types"] = ("image",)
537
if self.params.get("section") == "reactions":
538
self.subcategory = "reactions-images"
539
self.images = self._image_reactions
540
else:
541
self.params["username"] = text.unquote(user)
542
CivitaiExtractor.__init__(self, match)
543
544
def images(self):
545
return self.api.images(self.params)
546
547
548
class CivitaiUserVideosExtractor(CivitaiExtractor):
549
subcategory = "user-videos"
550
directory_fmt = ("{category}", "{username|user[username]}", "videos")
551
pattern = USER_PATTERN + r"/videos/?(?:\?([^#]+))?"
552
example = "https://civitai.com/user/USER/videos"
553
554
def __init__(self, match):
555
user, query = match.groups()
556
self.params = self._parse_query(query)
557
self.params["types"] = ("video",)
558
if self.params.get("section") == "reactions":
559
self.subcategory = "reactions-videos"
560
self.images = self._image_reactions
561
else:
562
self.params["username"] = text.unquote(user)
563
CivitaiExtractor.__init__(self, match)
564
565
images = CivitaiUserImagesExtractor.images
566
567
568
class CivitaiUserCollectionsExtractor(CivitaiExtractor):
569
subcategory = "user-collections"
570
pattern = USER_PATTERN + r"/collections/?(?:\?([^#]+))?"
571
example = "https://civitai.com/user/USER/collections"
572
573
def items(self):
574
user, query = self.groups
575
params = self._parse_query(query)
576
params["userId"] = self.api.user(text.unquote(user))[0]["id"]
577
578
base = f"{self.root}/collections/"
579
for collection in self.api.collections(params):
580
collection["_extractor"] = CivitaiCollectionExtractor
581
yield Message.Queue, f"{base}{collection['id']}", collection
582
583
584
class CivitaiGeneratedExtractor(CivitaiExtractor):
585
"""Extractor for your generated files feed"""
586
subcategory = "generated"
587
filename_fmt = "{filename}.{extension}"
588
directory_fmt = ("{category}", "generated")
589
pattern = f"{BASE_PATTERN}/generate"
590
example = "https://civitai.com/generate"
591
592
def items(self):
593
self._require_auth()
594
595
for gen in self.api.orchestrator_queryGeneratedImages():
596
gen["date"] = text.parse_datetime(
597
gen["createdAt"], "%Y-%m-%dT%H:%M:%S.%fZ")
598
yield Message.Directory, gen
599
for step in gen.pop("steps", ()):
600
for image in step.pop("images", ()):
601
data = {"file": image, **step, **gen}
602
url = image["url"]
603
yield Message.Url, url, text.nameext_from_url(url, data)
604
605
606
class CivitaiRestAPI():
607
"""Interface for the Civitai Public REST API
608
609
https://developer.civitai.com/docs/api/public-rest
610
"""
611
612
def __init__(self, extractor):
613
self.extractor = extractor
614
self.root = extractor.root + "/api"
615
self.headers = {"Content-Type": "application/json"}
616
617
if api_key := extractor.config("api-key"):
618
extractor.log.debug("Using api_key authentication")
619
self.headers["Authorization"] = "Bearer " + api_key
620
621
nsfw = extractor.config("nsfw")
622
if nsfw is None or nsfw is True:
623
nsfw = "X"
624
elif not nsfw:
625
nsfw = "Safe"
626
self.nsfw = nsfw
627
628
def image(self, image_id):
629
return self.images({
630
"imageId": image_id,
631
})
632
633
def images(self, params):
634
endpoint = "/v1/images"
635
if "nsfw" not in params:
636
params["nsfw"] = self.nsfw
637
return self._pagination(endpoint, params)
638
639
def images_gallery(self, model, version, user):
640
return self.images({
641
"modelId" : model["id"],
642
"modelVersionId": version["id"],
643
})
644
645
def model(self, model_id):
646
endpoint = f"/v1/models/{model_id}"
647
return self._call(endpoint)
648
649
@memcache(keyarg=1)
650
def model_version(self, model_version_id):
651
endpoint = f"/v1/model-versions/{model_version_id}"
652
return self._call(endpoint)
653
654
def models(self, params):
655
return self._pagination("/v1/models", params)
656
657
def models_tag(self, tag):
658
return self.models({"tag": tag})
659
660
def _call(self, endpoint, params=None):
661
if endpoint[0] == "/":
662
url = self.root + endpoint
663
else:
664
url = endpoint
665
666
response = self.extractor.request(
667
url, params=params, headers=self.headers)
668
return response.json()
669
670
def _pagination(self, endpoint, params):
671
while True:
672
data = self._call(endpoint, params)
673
yield from data["items"]
674
675
try:
676
endpoint = data["metadata"]["nextPage"]
677
except KeyError:
678
return
679
params = None
680
681
682
class CivitaiTrpcAPI():
683
"""Interface for the Civitai tRPC API"""
684
685
def __init__(self, extractor):
686
self.extractor = extractor
687
self.root = extractor.root + "/api/trpc/"
688
self.headers = {
689
"content-type" : "application/json",
690
"x-client-version": "5.0.954",
691
"x-client-date" : "",
692
"x-client" : "web",
693
"x-fingerprint" : "undefined",
694
}
695
if api_key := extractor.config("api-key"):
696
extractor.log.debug("Using api_key authentication")
697
self.headers["Authorization"] = "Bearer " + api_key
698
699
nsfw = extractor.config("nsfw")
700
if nsfw is None or nsfw is True:
701
nsfw = 31
702
elif not nsfw:
703
nsfw = 1
704
self.nsfw = nsfw
705
706
def image(self, image_id):
707
endpoint = "image.get"
708
params = {"id": int(image_id)}
709
return (self._call(endpoint, params),)
710
711
def image_generationdata(self, image_id):
712
endpoint = "image.getGenerationData"
713
params = {"id": int(image_id)}
714
return self._call(endpoint, params)
715
716
def images(self, params, defaults=True):
717
endpoint = "image.getInfinite"
718
719
if defaults:
720
params = self._merge_params(params, {
721
"useIndex" : True,
722
"period" : "AllTime",
723
"sort" : "Newest",
724
"withMeta" : False, # Metadata Only
725
"fromPlatform" : False, # Made On-Site
726
"browsingLevel": self.nsfw,
727
"include" : ("cosmetics",),
728
})
729
730
params = self._type_params(params)
731
return self._pagination(endpoint, params)
732
733
def images_gallery(self, model, version, user):
734
endpoint = "image.getImagesAsPostsInfinite"
735
params = {
736
"period" : "AllTime",
737
"sort" : "Newest",
738
"modelVersionId": version["id"],
739
"modelId" : model["id"],
740
"hidden" : False,
741
"limit" : 50,
742
"browsingLevel" : self.nsfw,
743
}
744
745
for post in self._pagination(endpoint, params):
746
yield from post["images"]
747
748
def images_post(self, post_id):
749
params = {
750
"postId" : int(post_id),
751
"pending": True,
752
}
753
return self.images(params)
754
755
def model(self, model_id):
756
endpoint = "model.getById"
757
params = {"id": int(model_id)}
758
return self._call(endpoint, params)
759
760
@memcache(keyarg=1)
761
def model_version(self, model_version_id):
762
endpoint = "modelVersion.getById"
763
params = {"id": int(model_version_id)}
764
return self._call(endpoint, params)
765
766
def models(self, params, defaults=True):
767
endpoint = "model.getAll"
768
769
if defaults:
770
params = self._merge_params(params, {
771
"period" : "AllTime",
772
"periodMode" : "published",
773
"sort" : "Newest",
774
"pending" : False,
775
"hidden" : False,
776
"followed" : False,
777
"earlyAccess" : False,
778
"fromPlatform" : False,
779
"supportsGeneration": False,
780
"browsingLevel": self.nsfw,
781
})
782
783
return self._pagination(endpoint, params)
784
785
def models_tag(self, tag):
786
return self.models({"tagname": tag})
787
788
def post(self, post_id):
789
endpoint = "post.get"
790
params = {"id": int(post_id)}
791
return self._call(endpoint, params)
792
793
def posts(self, params, defaults=True):
794
endpoint = "post.getInfinite"
795
meta = {"cursor": ("Date",)}
796
797
if defaults:
798
params = self._merge_params(params, {
799
"browsingLevel": self.nsfw,
800
"period" : "AllTime",
801
"periodMode" : "published",
802
"sort" : "Newest",
803
"followed" : False,
804
"draftOnly" : False,
805
"pending" : True,
806
"include" : ("cosmetics",),
807
})
808
809
params = self._type_params(params)
810
return self._pagination(endpoint, params, meta)
811
812
def collection(self, collection_id):
813
endpoint = "collection.getById"
814
params = {"id": int(collection_id)}
815
return self._call(endpoint, params)["collection"]
816
817
def collections(self, params, defaults=True):
818
endpoint = "collection.getInfinite"
819
820
if defaults:
821
params = self._merge_params(params, {
822
"browsingLevel": self.nsfw,
823
"sort" : "Newest",
824
})
825
826
params = self._type_params(params)
827
return self._pagination(endpoint, params)
828
829
def user(self, username):
830
endpoint = "user.getCreator"
831
params = {"username": username}
832
return (self._call(endpoint, params),)
833
834
def orchestrator_queryGeneratedImages(self):
835
endpoint = "orchestrator.queryGeneratedImages"
836
params = {
837
"ascending": False,
838
"tags" : ("gen",),
839
"authed" : True,
840
}
841
return self._pagination(endpoint, params)
842
843
def _call(self, endpoint, params, meta=None):
844
url = self.root + endpoint
845
headers = self.headers
846
847
if meta:
848
input = {"json": params, "meta": {"values": meta}}
849
else:
850
input = {"json": params}
851
852
params = {"input": util.json_dumps(input)}
853
headers["x-client-date"] = str(int(time.time() * 1000))
854
return self.extractor.request_json(
855
url, params=params, headers=headers)["result"]["data"]["json"]
856
857
def _pagination(self, endpoint, params, meta=None):
858
if "cursor" not in params:
859
params["cursor"] = None
860
meta_ = {"cursor": ("undefined",)}
861
862
while True:
863
data = self._call(endpoint, params, meta_)
864
yield from data["items"]
865
866
try:
867
if not data["nextCursor"]:
868
return
869
except KeyError:
870
return
871
872
params["cursor"] = data["nextCursor"]
873
meta_ = meta
874
875
def _merge_params(self, params_user, params_default):
876
"""Combine 'params_user' with 'params_default'"""
877
params_default.update(params_user)
878
return params_default
879
880
def _type_params(self, params):
881
"""Convert 'params' values to expected types"""
882
types = {
883
"tags" : int,
884
"tools" : int,
885
"techniques" : int,
886
"modelId" : int,
887
"modelVersionId": int,
888
"remixesOnly" : _bool,
889
"nonRemixesOnly": _bool,
890
"withMeta" : _bool,
891
"fromPlatform" : _bool,
892
"supportsGeneration": _bool,
893
}
894
895
for name, value in params.items():
896
if name not in types:
897
continue
898
elif isinstance(value, str):
899
params[name] = types[name](value)
900
elif isinstance(value, list):
901
type = types[name]
902
params[name] = [type(item) for item in value]
903
return params
904
905
906
def _bool(value):
907
return value == "true"
908
909
910
class CivitaiSearchAPI():
911
912
def __init__(self, extractor):
913
self.extractor = extractor
914
self.root = "https://search-new.civitai.com"
915
916
if auth := extractor.config("token"):
917
if " " not in auth:
918
auth = f"Bearer {auth}"
919
else:
920
auth = ("Bearer 8c46eb2508e21db1e9828a97968d"
921
"91ab1ca1caa5f70a00e88a2ba1e286603b61")
922
923
self.headers = {
924
"Authorization": auth,
925
"Content-Type": "application/json",
926
"X-Meilisearch-Client": "Meilisearch instant-meilisearch (v0.13.5)"
927
" ; Meilisearch JavaScript (v0.34.0)",
928
"Origin": extractor.root,
929
"Sec-Fetch-Dest": "empty",
930
"Sec-Fetch-Mode": "cors",
931
"Sec-Fetch-Site": "same-site",
932
"Priority": "u=4",
933
}
934
935
def search(self, query, type, facets, nsfw=31):
936
endpoint = "/multi-search"
937
938
query = {
939
"q" : query,
940
"indexUid": type,
941
"facets" : facets,
942
"attributesToHighlight": (),
943
"highlightPreTag" : "__ais-highlight__",
944
"highlightPostTag": "__/ais-highlight__",
945
"limit" : 51,
946
"offset": 0,
947
"filter": (self._generate_filter(nsfw),),
948
}
949
950
return self._pagination(endpoint, query)
951
952
def search_models(self, query, type=None, nsfw=31):
953
facets = (
954
"category.name",
955
"checkpointType",
956
"fileFormats",
957
"lastVersionAtUnix",
958
"tags.name",
959
"type",
960
"user.username",
961
"version.baseModel",
962
)
963
return self.search(query, type or "models_v9", facets, nsfw)
964
965
def search_images(self, query, type=None, nsfw=31):
966
facets = (
967
"aspectRatio",
968
"baseModel",
969
"createdAtUnix",
970
"tagNames",
971
"techniqueNames",
972
"toolNames",
973
"type",
974
"user.username",
975
)
976
return self.search(query, type or "images_v6", facets, nsfw)
977
978
def _call(self, endpoint, query):
979
url = self.root + endpoint
980
params = util.json_dumps({"queries": (query,)})
981
982
data = self.extractor.request_json(
983
url, method="POST", headers=self.headers, data=params)
984
985
return data["results"][0]
986
987
def _pagination(self, endpoint, query):
988
limit = query["limit"] - 1
989
threshold = limit // 2
990
991
while True:
992
data = self._call(endpoint, query)
993
994
items = data["hits"]
995
yield from items
996
997
if len(items) < threshold:
998
return
999
query["offset"] += limit
1000
1001
def _generate_filter(self, level):
1002
fltr = []
1003
1004
if level & 1:
1005
fltr.append("1")
1006
if level & 2:
1007
fltr.append("2")
1008
if level & 4:
1009
fltr.append("4")
1010
if level & 8:
1011
fltr.append("8")
1012
if level & 16:
1013
fltr.append("16")
1014
1015
if not fltr:
1016
return "()"
1017
return "(nsfwLevel=" + " OR nsfwLevel=".join(fltr) + ")"
1018
1019