Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
mikf
GitHub Repository: mikf/gallery-dl
Path: blob/master/gallery_dl/extractor/civitai.py
8970 views
1
# -*- coding: utf-8 -*-
2
3
# Copyright 2024-2025 Mike Fährmann
4
#
5
# This program is free software; you can redistribute it and/or modify
6
# it under the terms of the GNU General Public License version 2 as
7
# published by the Free Software Foundation.
8
9
"""Extractors for https://www.civitai.com/"""
10
11
from .common import Extractor, Message, Dispatch
12
from .. import text, util
13
from ..cache import memcache
14
import itertools
15
import time
16
17
# Regex prefix shared by all extractor patterns below;
# the "http://" / "https://" scheme is optional.
BASE_PATTERN = r"(?:https?://)?civitai\.com"
18
# Regex prefix for "/user/<name>" URLs; its single capture group is the
# user name segment (everything up to the next '/', '?' or '#').
USER_PATTERN = BASE_PATTERN + r"/user/([^/?#]+)"
19
20
21
class CivitaiExtractor(Extractor):
    """Base class for civitai extractors"""
    category = "civitai"
    root = "https://civitai.com"
    directory_fmt = ("{category}", "{user[username]}", "images")
    filename_fmt = "{file[id]}.{extension}"
    archive_fmt = "{file[uuid]}"
    request_interval = (0.5, 1.5)

    def _init(self):
        """Select the API backend and cache quality/metadata options"""
        # the REST backend is used only when explicitly requested
        if self.config("api") == "rest":
            self.log.debug("Using REST API")
            self.api = CivitaiRestAPI(self)
        else:
            self.log.debug("Using tRPC API")
            self.api = CivitaiTrpcAPI(self)

        # image quality: non-string values are joined with ','
        quality = self.config("quality")
        if quality:
            if not isinstance(quality, str):
                quality = ",".join(quality)
            self._image_quality = quality
            self._image_ext = "png" if quality == "original=true" else "jpg"
        else:
            self._image_quality = "original=true"
            self._image_ext = "png"

        # video quality: a leading '+' extends the image quality string
        quality_video = self.config("quality-videos")
        if quality_video:
            if not isinstance(quality_video, str):
                quality_video = ",".join(quality_video)
            if quality_video[0] == "+":
                extra = quality_video.lstrip("+,")
                quality_video = f"{self._image_quality},{extra}"
            self._video_quality = quality_video
        elif quality_video is not None and quality:
            # falsy but explicitly set: reuse the configured image quality
            self._video_quality = self._image_quality
        else:
            self._video_quality = "original=true,quality=100"
        self._video_ext = "webm"

        # metadata: comma-separated string, list/tuple of names,
        # or any other truthy value to enable everything
        metadata = self.config("metadata")
        if not metadata:
            metadata = ()
        elif isinstance(metadata, str):
            metadata = metadata.split(",")
        elif not isinstance(metadata, (list, tuple)):
            metadata = {"generation", "version", "post", "tags"}
        self._meta_generation = "generation" in metadata
        self._meta_version = "version" in metadata
        self._meta_post = "post" in metadata
        self._meta_tags = "tags" in metadata

    def items(self):
        """Yield messages from the first of models/posts/images with results"""
        if models := self.models():
            yield from self._items_models(models)
        elif posts := self.posts():
            yield from self._items_posts(posts)
        elif images := self.images():
            yield from self._items_images(images)

    def _items_models(self, models):
        """Queue every model's page for CivitaiModelExtractor"""
        data = {"_extractor": CivitaiModelExtractor}
        for model in models:
            yield Message.Queue, f"{self.root}/models/{model['id']}", data

    def _items_posts(self, posts):
        """Yield one directory plus its image files per post"""
        for stub in posts:
            post_id = stub["id"]
            images = (stub["images"] if "images" in stub else
                      self.api.images_post(post_id))

            # always fetch the full post object
            post = self.api.post(post_id)
            post["date"] = self.parse_datetime_iso(post["publishedAt"])
            user = post.pop("user")
            data = {"post": post, "user": user}
            if self._meta_version:
                model, version = self._extract_meta_version(post)
                data["model"] = model
                data["version"] = version

            yield Message.Directory, "", data
            for file in self._image_results(images):
                file.update(data)
                yield Message.Url, file["url"], file

    def _items_images(self, images):
        """Yield a directory and URL message for every image/video"""
        for file in images:
            user = file.pop("user")
            data = {"file": file, "user": user}

            if self._meta_generation:
                data["generation"] = self._extract_meta_generation(file)
            if self._meta_tags:
                data["tags"] = self._extract_meta_tags(file)
            if self._meta_version:
                model, version = self._extract_meta_version(file, False)
                data["model"] = model
                data["version"] = version
                # the version lookup may have attached the post to 'file'
                if "post" in file:
                    data["post"] = file.pop("post")
            if self._meta_post and "post" not in data:
                post = self._extract_meta_post(file)
                data["post"] = post
                if post:
                    post.pop("user", None)
            file["date"] = self.parse_datetime_iso(file["createdAt"])

            url = self._url(file)
            data["url"] = url
            text.nameext_from_url(url, data)
            if not data["extension"]:
                data["extension"] = self._fallback_extension(file)
            yield Message.Directory, "", data
            yield Message.Url, url, data

    def _fallback_extension(self, file):
        """Return the default extension for a file without one in its URL"""
        if file.get("type") == "video":
            return self._video_ext
        return self._image_ext

    def models(self):
        """Return model objects; overridden by subclasses"""
        return ()

    def posts(self):
        """Return post objects; overridden by subclasses"""
        return ()

    def images(self):
        """Return image objects; overridden by subclasses"""
        return ()

    def _url(self, image):
        """Build the download URL for 'image' and store its 'uuid'"""
        source = image["url"]
        is_video = (image.get("type") == "video")
        quality = self._video_quality if is_video else self._image_quality

        if "/" in source:
            # full URL: swap the second-to-last path segment for 'quality'
            segments = source.rsplit("/", 3)
            image["uuid"] = segments[1]
            segments[2] = quality
            return "/".join(segments)

        # bare identifier: assemble the URL from its parts
        image["uuid"] = source
        name = image.get("name")
        if not name:
            mime = image.get("mimeType")
            if mime:
                suffix = mime.rpartition("/")[2]
            else:
                suffix = self._video_ext if is_video else self._image_ext
            name = f"{image.get('id')}.{suffix}"
        return (f"https://image.civitai.com/xG1nkqKTMzGDvpLrqFT7WA"
                f"/{source}/{quality}/{name}")

    def _image_results(self, images):
        """Yield a metadata dict for each file object in 'images'"""
        num = 0
        for file in images:
            num += 1
            fields = {
                "num" : num,
                "file": file,
                "url" : self._url(file),
            }
            data = text.nameext_from_url(file["url"], fields)
            if not data["extension"]:
                data["extension"] = self._fallback_extension(file)
            # use a purely numeric filename as file ID when none is given
            if "id" not in file and data["filename"].isdecimal():
                file["id"] = text.parse_int(data["filename"])
            if "date" not in file:
                file["date"] = self.parse_datetime_iso(file["createdAt"])
            if self._meta_generation:
                file["generation"] = self._extract_meta_generation(file)
            if self._meta_tags:
                file["tags"] = self._extract_meta_tags(file)
            yield data

    def _image_reactions(self):
        """Return images for 'self.params' with reaction filters applied"""
        self._require_auth()

        query = self.params
        query["authed"] = True
        query["useIndex"] = False
        if "reactions" not in query:
            query["reactions"] = ("Like", "Dislike", "Heart", "Laugh", "Cry")
        return self.api.images(query)

    def _require_auth(self):
        """Raise AuthRequired without an API key or civitai token cookie"""
        if "Authorization" in self.api.headers:
            return
        if self.cookies.get("__Secure-civitai-token", domain=".civitai.com"):
            return
        raise self.exc.AuthRequired(("api-key", "authenticated cookies"))

    def _parse_query(self, value):
        """Parse a query string, collecting the listed keys as lists"""
        list_keys = {"tags", "reactions", "baseModels", "tools", "techniques",
                     "types", "fileFormats"}
        return text.parse_query_list(value, list_keys)

    def _extract_meta_generation(self, image):
        """Fetch generation data for 'image'; log a traceback on failure"""
        try:
            return self.api.image_generationdata(image["id"])
        except Exception as exc:
            return self.log.traceback(exc)

    def _extract_meta_post(self, image):
        """Fetch the post of 'image'; log a traceback on failure"""
        try:
            post = self.api.post(image["postId"])
            post["date"] = self.parse_datetime_iso(post["publishedAt"])
            return post
        except Exception as exc:
            return self.log.traceback(exc)

    def _extract_meta_tags(self, image):
        """Fetch votable tags for 'image'; log a traceback on failure"""
        try:
            return self.api.tag_getvotabletags(image["id"])
        except Exception as exc:
            return self.log.traceback(exc)

    def _extract_meta_version(self, item, is_post=True):
        """Return a (model, version) tuple for 'item' or (None, None)"""
        try:
            version_id = self._extract_version_id(item, is_post)
            if version_id:
                # work on a copy, since 'model' gets removed from it
                version = self.api.model_version(version_id).copy()
                model = version.pop("model", None)
                return model, version
        except Exception as exc:
            self.log.traceback(exc)
        return None, None

    def _extract_version_id(self, item, is_post=True):
        """Find a model version ID in 'item' or, for images, in its post"""
        if version_id := item.get("modelVersionId"):
            return version_id
        for key in ("modelVersionIds", "modelVersionIdsManual"):
            if version_ids := item.get(key):
                return version_ids[0]

        if is_post:
            return None

        # not found on the image itself: fetch its post and look there
        post = self.api.post(item["postId"])
        item["post"] = post
        post.pop("user", None)
        return self._extract_version_id(post)
254
255
256
class CivitaiModelExtractor(CivitaiExtractor):
    """Extractor for the files of a single civitai model"""
    subcategory = "model"
    directory_fmt = ("{category}", "{user[username]}",
                     "{model[id]}{model[name]:? //}",
                     "{version[id]}{version[name]:? //}")
    pattern = BASE_PATTERN + r"/models/(\d+)(?:/?\?modelVersionId=(\d+))?"
    example = "https://civitai.com/models/12345/TITLE"

    def items(self):
        """Yield a directory and its files for each selected version"""
        model_id, version_id = self.groups
        model = self.api.model(model_id)

        # the owner is stored under either "user" or "creator"
        owner_key = "user" if "user" in model else "creator"
        user = model.pop(owner_key)
        versions = model.pop("modelVersions")

        if version_id:
            # restrict output to the version given in the URL
            wanted = int(version_id)
            selected = next(
                (ver for ver in versions if ver["id"] == wanted), None)
            if selected is None:
                selected = self.api.model_version(wanted)
            versions = (selected,)

        for version in versions:
            version["date"] = self.parse_datetime_iso(version["createdAt"])

            data = {
                "model"  : model,
                "version": version,
                "user"   : user,
            }

            yield Message.Directory, "", data
            for file in self._extract_files(model, version, user):
                file.update(data)
                yield Message.Url, file["url"], file

    def _extract_files(self, model, version, user):
        """Return file dicts for the types listed in the 'files' option"""
        filetypes = self.config("files")
        if filetypes is None:
            return self._extract_files_image(model, version, user)

        if isinstance(filetypes, str):
            filetypes = filetypes.split(",")

        # keys are file type names with trailing 's' characters removed
        generators = {
            "model"   : self._extract_files_model,
            "image"   : self._extract_files_image,
            "gallery" : self._extract_files_gallery,
            "gallerie": self._extract_files_gallery,
        }

        def run(filetype):
            return generators[filetype.rstrip("s")](model, version, user)

        return itertools.chain.from_iterable(map(run, filetypes))

    def _extract_files_model(self, model, version, user):
        """Return a list of file dicts for the model files of 'version'"""
        return [
            self._model_file(model, version, num, file)
            for num, file in enumerate(version["files"], 1)
        ]

    def _model_file(self, model, version, num, file):
        """Build the file dict for a single model file"""
        name, sep, ext = file["name"].rpartition(".")
        if not sep:
            # no '.' in the name: use all of it and default to "bin"
            name, ext = ext, "bin"
        file["uuid"] = f"model-{model['id']}-{version['id']}-{file['id']}"

        url = file.get("downloadUrl")
        if not url:
            url = f"{self.root}/api/download/models/{version['id']}"

        return {
            "num"      : num,
            "file"     : file,
            "filename" : name,
            "extension": ext,
            "url"      : url,
            "_http_headers" : {
                "Authorization": self.api.headers.get("Authorization")},
            "_http_validate": self._validate_file_model,
        }

    def _extract_files_image(self, model, version, user):
        """Return file dicts for the images attached to 'version'"""
        if "images" in version:
            return self._image_results(version["images"])

        params = {
            "modelVersionId": version["id"],
            "prioritizedUserIds": (user["id"],),
            "period" : self.api._param_period(),
            "sort"   : self.api._param_sort(),
            "limit"  : 20,
            "pending": True,
        }
        return self._image_results(self.api.images(params, defaults=False))

    def _extract_files_gallery(self, model, version, user):
        """Return file dicts for the gallery images of 'version'"""
        return self._image_results(
            self.api.images_gallery(model, version, user))

    def _validate_file_model(self, response):
        """Return False and log a warning for text/html responses"""
        content_type = response.headers.get("Content-Type", "")
        if not content_type.startswith("text/html"):
            return True

        alert = text.extr(
            response.text, 'mantine-Alert-message">', "</div></div></div>")
        if alert:
            msg = f"\"{text.remove_html(alert)}\" - 'api-key' required"
        else:
            msg = "'api-key' required to download this file"
        self.log.warning(msg)
        return False
374
375
376
class CivitaiImageExtractor(CivitaiExtractor):
    """Extractor for a single image, selected by its numeric ID"""
    subcategory = "image"
    pattern = BASE_PATTERN + r"/images/(\d+)"
    example = "https://civitai.com/images/12345"

    def images(self):
        """Return the API result for the image ID from the URL"""
        image_id = self.groups[0]
        return self.api.image(image_id)
383
384
385
class CivitaiCollectionExtractor(CivitaiExtractor):
    """Extractor for the images of a collection"""
    subcategory = "collection"
    directory_fmt = ("{category}", "{user_collection[username]}",
                     "collections", "{collection[id]}{collection[name]:? //}")
    pattern = BASE_PATTERN + r"/collections/(\d+)"
    example = "https://civitai.com/collections/12345"

    def images(self):
        """Store collection metadata in 'kwdict' and return its images"""
        collection_id = int(self.groups[0])

        # the collection's "user" entry is moved to its own kwdict field
        collection = self.api.collection(collection_id)
        self.kwdict["collection"] = collection
        self.kwdict["user_collection"] = collection.pop("user", None)

        api = self.api
        params = {
            "collectionId" : collection_id,
            "period"       : api._param_period(),
            "sort"         : api._param_sort(),
            "browsingLevel": api.nsfw,
            "include"      : ("cosmetics",),
        }
        return api.images(params, defaults=False)
405
406
407
class CivitaiPostExtractor(CivitaiExtractor):
    """Extractor for a single post, selected by its numeric ID"""
    subcategory = "post"
    directory_fmt = ("{category}", "{username|user[username]}", "posts",
                     "{post[id]}{post[title]:? //}")
    pattern = BASE_PATTERN + r"/posts/(\d+)"
    example = "https://civitai.com/posts/12345"

    def posts(self):
        """Return a one-element tuple with a stub for the requested post"""
        post_id = int(self.groups[0])
        return ({"id": post_id},)
416
417
418
class CivitaiTagExtractor(CivitaiExtractor):
    """Extractor for models with a given tag"""
    subcategory = "tag"
    pattern = BASE_PATTERN + r"/tag/([^/?&#]+)"
    example = "https://civitai.com/tag/TAG"

    def models(self):
        """Return models for the unquoted tag name from the URL"""
        tag_name = text.unquote(self.groups[0])
        return self.api.models_tag(tag_name)
426
427
428
class CivitaiSearchModelsExtractor(CivitaiExtractor):
    """Extractor for model search results"""
    subcategory = "search-models"
    pattern = BASE_PATTERN + r"/search/models\?([^#]+)"
    example = "https://civitai.com/search/models?query=QUERY"

    def models(self):
        """Search models using the URL's 'query' and 'sortBy' values"""
        params = self._parse_query(self.groups[0])
        search_api = CivitaiSearchAPI(self)
        query = params.get("query")
        sort_by = params.get("sortBy")
        return search_api.search_models(query, sort_by, self.api.nsfw)
437
438
439
class CivitaiSearchImagesExtractor(CivitaiExtractor):
    """Extractor for image search results"""
    subcategory = "search-images"
    pattern = BASE_PATTERN + r"/search/images\?([^#]+)"
    example = "https://civitai.com/search/images?query=QUERY"

    def images(self):
        """Search images using the URL's 'query' and 'sortBy' values"""
        params = self._parse_query(self.groups[0])
        search_api = CivitaiSearchAPI(self)
        query = params.get("query")
        sort_by = params.get("sortBy")
        return search_api.search_images(query, sort_by, self.api.nsfw)
448
449
450
class CivitaiModelsExtractor(CivitaiExtractor):
    """Extractor for the site-wide models listing"""
    subcategory = "models"
    pattern = BASE_PATTERN + r"/models(?:/?\?([^#]+))?(?:$|#)"
    example = "https://civitai.com/models"

    def models(self):
        """Return models matching the URL's query parameters"""
        query = self.groups[0]
        return self.api.models(self._parse_query(query))
458
459
460
class CivitaiImagesExtractor(CivitaiExtractor):
    """Extractor for the site-wide images listing"""
    subcategory = "images"
    pattern = BASE_PATTERN + r"/images(?:/?\?([^#]+))?(?:$|#)"
    example = "https://civitai.com/images"

    def images(self):
        """Return files for the URL's query, forced to type 'image'"""
        query = self._parse_query(self.groups[0])
        query["types"] = ("image",)
        return self.api.images(query)
469
470
471
class CivitaiVideosExtractor(CivitaiExtractor):
    """Extractor for the site-wide videos listing"""
    subcategory = "videos"
    pattern = BASE_PATTERN + r"/videos(?:/?\?([^#]+))?(?:$|#)"
    example = "https://civitai.com/videos"

    def images(self):
        """Return files for the URL's query, forced to type 'video'"""
        query = self._parse_query(self.groups[0])
        query["types"] = ("video",)
        return self.api.images(query)
480
481
482
class CivitaiPostsExtractor(CivitaiExtractor):
    """Extractor for the site-wide posts listing"""
    subcategory = "posts"
    pattern = BASE_PATTERN + r"/posts(?:/?\?([^#]+))?(?:$|#)"
    example = "https://civitai.com/posts"

    def posts(self):
        """Return posts matching the URL's query parameters"""
        query = self.groups[0]
        return self.api.posts(self._parse_query(query))
490
491
492
class CivitaiUserExtractor(Dispatch, CivitaiExtractor):
    """Dispatcher for the sub-pages of a user profile"""
    pattern = USER_PATTERN + r"/?(?:$|\?|#)"
    example = "https://civitai.com/user/USER"

    def items(self):
        """Hand the user's sub-page URLs to '_dispatch_extractors'"""
        base = f"{self.root}/user/{self.groups[0]}/"
        targets = (
            (CivitaiUserModelsExtractor     , f"{base}models"),
            (CivitaiUserPostsExtractor      , f"{base}posts"),
            (CivitaiUserImagesExtractor     , f"{base}images"),
            (CivitaiUserVideosExtractor     , f"{base}videos"),
            (CivitaiUserCollectionsExtractor, f"{base}collections"),
        )
        default = ("user-images", "user-videos")
        return self._dispatch_extractors(targets, default)
505
506
507
class CivitaiUserModelsExtractor(CivitaiExtractor):
    """Extractor for the models of a user"""
    subcategory = "user-models"
    pattern = USER_PATTERN + r"/models/?(?:\?([^#]+))?"
    example = "https://civitai.com/user/USER/models"

    def models(self):
        """Return models filtered by the user name from the URL"""
        username, query = self.groups
        filters = self._parse_query(query)
        filters["username"] = text.unquote(username)
        return self.api.models(filters)
517
518
519
class CivitaiUserPostsExtractor(CivitaiExtractor):
    """Extractor for the posts of a user"""
    subcategory = "user-posts"
    directory_fmt = ("{category}", "{username|user[username]}", "posts",
                     "{post[id]}{post[title]:? //}")
    pattern = USER_PATTERN + r"/posts/?(?:\?([^#]+))?"
    example = "https://civitai.com/user/USER/posts"

    def posts(self):
        """Return posts filtered by the user name from the URL"""
        username, query = self.groups
        filters = self._parse_query(query)
        filters["username"] = text.unquote(username)
        return self.api.posts(filters)
531
532
533
class CivitaiUserImagesExtractor(CivitaiExtractor):
    """Extractor for the images of a user or their image reactions"""
    subcategory = "user-images"
    pattern = USER_PATTERN + r"/images/?(?:\?([^#]+))?"
    example = "https://civitai.com/user/USER/images"

    def __init__(self, match):
        """Prepare API parameters before running the base initializer"""
        username, query = match.groups()

        params = self._parse_query(query)
        params["types"] = ("image",)
        self.params = params

        if params.get("section") == "reactions":
            # "section=reactions" switches to the reactions feed
            self.subcategory = "reactions-images"
            self.images = self._image_reactions
        else:
            params["username"] = text.unquote(username)

        CivitaiExtractor.__init__(self, match)

    def images(self):
        """Return files for the parameters prepared in '__init__'"""
        return self.api.images(self.params)
551
552
553
class CivitaiUserVideosExtractor(CivitaiExtractor):
    """Extractor for the videos of a user or their video reactions"""
    subcategory = "user-videos"
    directory_fmt = ("{category}", "{username|user[username]}", "videos")
    pattern = USER_PATTERN + r"/videos/?(?:\?([^#]+))?"
    example = "https://civitai.com/user/USER/videos"

    def __init__(self, match):
        """Prepare API parameters before running the base initializer"""
        username, query = match.groups()

        params = self._parse_query(query)
        params["types"] = ("video",)
        self.params = params

        if params.get("section") == "reactions":
            # "section=reactions" switches to the reactions feed
            self.subcategory = "reactions-videos"
            self.images = self._image_reactions
        else:
            params["username"] = text.unquote(username)

        CivitaiExtractor.__init__(self, match)

    # same implementation as for user images: query with 'self.params'
    images = CivitaiUserImagesExtractor.images
571
572
573
class CivitaiUserCollectionsExtractor(CivitaiExtractor):
    """Extractor for the collections of a user"""
    subcategory = "user-collections"
    pattern = USER_PATTERN + r"/collections/?(?:\?([^#]+))?"
    example = "https://civitai.com/user/USER/collections"

    def items(self):
        """Queue each of the user's collections for extraction"""
        username, query = self.groups
        params = self._parse_query(query)

        # collections are queried by user ID, so look that up first
        creator = self.api.user(text.unquote(username))[0]
        params["userId"] = creator["id"]

        prefix = self.root + "/collections/"
        for collection in self.api.collections(params):
            collection["_extractor"] = CivitaiCollectionExtractor
            url = prefix + str(collection["id"])
            yield Message.Queue, url, collection
587
588
589
class CivitaiGeneratedExtractor(CivitaiExtractor):
    """Extractor for your generated files feed"""
    subcategory = "generated"
    filename_fmt = "{filename}.{extension}"
    directory_fmt = ("{category}", "generated")
    pattern = BASE_PATTERN + "/generate"
    example = "https://civitai.com/generate"

    def items(self):
        """Yield a directory per generation and a URL per generated image"""
        self._require_auth()

        for gen in self.api.orchestrator_queryGeneratedImages():
            gen["date"] = self.parse_datetime_iso(gen["createdAt"])
            yield Message.Directory, "", gen

            # "steps" and "images" are removed from their parent objects
            # before those are merged into each file's metadata
            steps = gen.pop("steps", ())
            for step in steps:
                images = step.pop("images", ())
                for image in images:
                    data = {"file": image}
                    data.update(step)
                    data.update(gen)
                    url = image["url"]
                    yield Message.Url, url, text.nameext_from_url(url, data)
608
609
610
class CivitaiRestAPI():
    """Interface for the Civitai Public REST API

    https://developer.civitai.com/docs/api/public-rest
    """

    def __init__(self, extractor):
        """Store 'extractor' and derive headers and NSFW level from it"""
        self.extractor = extractor
        self.root = extractor.root + "/api"
        self.headers = {"Content-Type": "application/json"}

        if api_key := extractor.config("api-key"):
            extractor.log.debug("Using api_key authentication")
            self.headers["Authorization"] = "Bearer " + api_key

        # unset or True -> "X"; any other falsy value -> "Safe";
        # other truthy values are passed through unchanged
        nsfw = extractor.config("nsfw")
        if nsfw is None or nsfw is True:
            nsfw = "X"
        elif not nsfw:
            nsfw = "Safe"
        self.nsfw = nsfw

    def image(self, image_id):
        """Return an iterator over the results for a single image ID"""
        return self.images({
            "imageId": image_id,
        })

    def images(self, params, defaults=True):
        """Return an iterator over image objects matching 'params'

        'defaults' mirrors the keyword of CivitaiTrpcAPI.images(), so
        callers using 'images(params, defaults=False)' work with both
        backends. The configured NSFW level is only filled in when
        'defaults' is true and 'params' has no "nsfw" entry.
        """
        endpoint = "/v1/images"
        if defaults and "nsfw" not in params:
            params["nsfw"] = self.nsfw
        return self._pagination(endpoint, params)

    def images_gallery(self, model, version, user):
        """Return an iterator over images of a model version"""
        return self.images({
            "modelId"       : model["id"],
            "modelVersionId": version["id"],
        })

    def model(self, model_id):
        """Return the JSON object for a model"""
        endpoint = "/v1/models/" + str(model_id)
        return self._call(endpoint)

    @memcache(keyarg=1)
    def model_version(self, model_version_id):
        """Return the JSON object for a model version"""
        endpoint = "/v1/model-versions/" + str(model_version_id)
        return self._call(endpoint)

    def models(self, params):
        """Return an iterator over model objects matching 'params'"""
        return self._pagination("/v1/models", params)

    def models_tag(self, tag):
        """Return an iterator over models with the given tag"""
        return self.models({"tag": tag})

    def _call(self, endpoint, params=None):
        """Request 'endpoint' and return the decoded JSON response

        An 'endpoint' starting with '/' is relative to the API root;
        anything else is used as a complete URL.
        """
        if endpoint[0] == "/":
            url = self.root + endpoint
        else:
            url = endpoint

        response = self.extractor.request(
            url, params=params, headers=self.headers)
        return response.json()

    def _pagination(self, endpoint, params):
        """Yield all "items" entries, following "metadata.nextPage" links"""
        while True:
            data = self._call(endpoint, params)
            yield from data["items"]

            try:
                endpoint = data["metadata"]["nextPage"]
            except KeyError:
                return
            if not endpoint:
                # a null or empty 'nextPage' also marks the last page
                return
            # 'nextPage' is requested as is, without the initial params
            params = None
684
685
686
class CivitaiTrpcAPI():
    """Interface for the Civitai tRPC API"""

    def __init__(self, extractor):
        """Store 'extractor' and derive headers and NSFW level from it"""
        self.extractor = extractor
        self.root = extractor.root + "/api/trpc/"
        self.headers = {
            "content-type"    : "application/json",
            "x-client-version": "5.0.1386",
            "x-client-date"   : "",
            "x-client"        : "web",
            "x-fingerprint"   : "undefined",
        }
        if api_key := extractor.config("api-key"):
            extractor.log.debug("Using api_key authentication")
            self.headers["Authorization"] = "Bearer " + api_key

        # unset or True -> 31; any other falsy value -> 1;
        # other truthy values are passed through unchanged
        nsfw = extractor.config("nsfw")
        if nsfw is None or nsfw is True:
            nsfw = 31
        elif not nsfw:
            nsfw = 1
        self.nsfw = nsfw

    def image(self, image_id):
        """Return a one-element tuple with the object for 'image_id'"""
        endpoint = "image.get"
        params = {"id": int(image_id)}
        return (self._call(endpoint, params),)

    def image_generationdata(self, image_id):
        """Return the generation data for 'image_id'"""
        endpoint = "image.getGenerationData"
        params = {"id": int(image_id)}
        return self._call(endpoint, params)

    def images(self, params, defaults=True):
        """Return an iterator over image objects matching 'params'

        With 'defaults' enabled, 'params' is merged on top of a set
        of default values.
        """
        endpoint = "image.getInfinite"

        if defaults:
            params = self._merge_params(params, {
                "useIndex"     : True,
                "period"       : self._param_period(),
                "sort"         : self._param_sort(),
                "withMeta"     : False,  # Metadata Only
                "fromPlatform" : False,  # Made On-Site
                "browsingLevel": self.nsfw,
                "include"      : ("cosmetics",),
            })

        params = self._type_params(params)
        return self._pagination(endpoint, params)

    def images_gallery(self, model, version, user):
        """Yield the images of all gallery posts for a model version"""
        endpoint = "image.getImagesAsPostsInfinite"
        params = {
            "period"        : self._param_period(),
            "sort"          : self._param_sort(),
            "modelVersionId": version["id"],
            "modelId"       : model["id"],
            "hidden"        : False,
            "limit"         : 50,
            "browsingLevel" : self.nsfw,
        }

        for post in self._pagination(endpoint, params):
            yield from post["images"]

    def images_post(self, post_id):
        """Return an iterator over the images of a post"""
        params = {
            "postId" : int(post_id),
            "pending": True,
        }
        return self.images(params)

    def model(self, model_id):
        """Return the object for 'model_id'"""
        endpoint = "model.getById"
        params = {"id": int(model_id)}
        return self._call(endpoint, params)

    @memcache(keyarg=1)
    def model_version(self, model_version_id):
        """Return the object for 'model_version_id'"""
        endpoint = "modelVersion.getById"
        params = {"id": int(model_version_id)}
        return self._call(endpoint, params)

    def models(self, params, defaults=True):
        """Return an iterator over model objects matching 'params'"""
        endpoint = "model.getAll"

        if defaults:
            params = self._merge_params(params, {
                "period"       : self._param_period(),
                "periodMode"   : "published",
                "sort"         : self._param_sort(),
                "pending"      : False,
                "hidden"       : False,
                "followed"     : False,
                "earlyAccess"  : False,
                "fromPlatform" : False,
                "supportsGeneration": False,
                "browsingLevel": self.nsfw,
            })

        return self._pagination(endpoint, params)

    def models_tag(self, tag):
        """Return an iterator over models with the given tag name"""
        return self.models({"tagname": tag})

    def post(self, post_id):
        """Return the object for 'post_id'"""
        endpoint = "post.get"
        params = {"id": int(post_id)}
        return self._call(endpoint, params)

    def posts(self, params, defaults=True):
        """Return an iterator over post objects matching 'params'"""
        endpoint = "post.getInfinite"

        if defaults:
            params = self._merge_params(params, {
                "browsingLevel": self.nsfw,
                "period"       : self._param_period(),
                "periodMode"   : "published",
                "sort"         : self._param_sort(),
                "followed"     : False,
                "draftOnly"    : False,
                "pending"      : True,
                "include"      : ("cosmetics",),
            })

        params = self._type_params(params)
        # with a "username" filter, _pagination() verifies the first result
        return self._pagination(endpoint, params, user=("username" in params))

    def collection(self, collection_id):
        """Return the "collection" object for 'collection_id'"""
        endpoint = "collection.getById"
        params = {"id": int(collection_id)}
        return self._call(endpoint, params)["collection"]

    def collections(self, params, defaults=True):
        """Return an iterator over collection objects matching 'params'"""
        endpoint = "collection.getInfinite"

        if defaults:
            params = self._merge_params(params, {
                "browsingLevel": self.nsfw,
                "sort"         : self._param_sort(),
            })

        params = self._type_params(params)
        return self._pagination(endpoint, params)

    def tag_getvotabletags(self, image_id):
        """Return the votable tags of the image 'image_id'"""
        endpoint = "tag.getVotableTags"
        params = {"id": int(image_id), "type": "image"}
        return self._call(endpoint, params)

    def user(self, username):
        """Return a one-element tuple with the creator 'username'"""
        endpoint = "user.getCreator"
        params = {"username": username}
        return (self._call(endpoint, params),)

    def orchestrator_queryGeneratedImages(self):
        """Return an iterator over the generated-images feed"""
        endpoint = "orchestrator.queryGeneratedImages"
        params = {
            "ascending": self._param_sort() == "Oldest",
            "tags"     : ("gen",),
            "authed"   : True,
        }
        return self._pagination(endpoint, params)

    def _call(self, endpoint, params, meta=None):
        """Call 'endpoint' and return the "result.data.json" payload

        'params' (and 'meta', when given) are JSON-encoded into the
        "input" query parameter. The shared "x-client-date" header is
        refreshed with the current time in milliseconds for each call.
        """
        url = self.root + endpoint
        headers = self.headers

        if meta:
            payload = {"json": params, "meta": {"values": meta}}
        else:
            payload = {"json": params}

        params = {"input": util.json_dumps(payload)}
        headers["x-client-date"] = str(int(time.time() * 1000))
        return self.extractor.request_json(
            url, params=params, headers=headers)["result"]["data"]["json"]

    def _pagination(self, endpoint, params, meta=None, user=False):
        """Yield all "items" entries, following "nextCursor" values

        Without a "cursor" in 'params', the first request uses a null
        cursor marked as "undefined" in the request's meta values.
        A caller-supplied cursor is sent with 'meta' as is.

        With 'user' enabled, nothing is yielded if the first result
        belongs to a different user than params["username"].
        """
        if "cursor" in params:
            # previously left 'meta_' unbound -> UnboundLocalError
            meta_ = meta
        else:
            params["cursor"] = None
            meta_ = {"cursor": ("undefined",)}

        data = self._call(endpoint, params, meta_)
        if user and data["items"] and \
                data["items"][0]["user"]["username"] != params["username"]:
            return

        while True:
            yield from data["items"]

            # a missing, null or empty cursor marks the last page
            cursor = data.get("nextCursor")
            if not cursor:
                return

            params["cursor"] = cursor
            data = self._call(endpoint, params, meta)

    def _merge_params(self, params_user, params_default):
        """Combine 'params_user' with 'params_default'"""
        params_default.update(params_user)
        return params_default

    def _type_params(self, params):
        """Convert 'params' values to expected types"""
        types = {
            "tags"          : int,
            "tools"         : int,
            "techniques"    : int,
            "modelId"       : int,
            "modelVersionId": int,
            "remixesOnly"   : _bool,
            "nonRemixesOnly": _bool,
            "withMeta"      : _bool,
            "fromPlatform"  : _bool,
            "supportsGeneration": _bool,
        }

        # only values of existing keys get replaced,
        # so updating 'params' while iterating over it is safe
        for name, value in params.items():
            if name not in types:
                continue
            convert = types[name]
            if isinstance(value, str):
                params[name] = convert(value)
            elif isinstance(value, list):
                params[name] = [convert(item) for item in value]
        return params

    def _param_period(self):
        """Return the 'period' option or "AllTime" if unset"""
        if period := self.extractor.config("period"):
            return period
        return "AllTime"

    def _param_sort(self):
        """Return the sort order derived from the 'sort' option

        Values starting with 'd', 'r' or 'n' (any case) map to "Newest",
        values starting with 'a' or 'o' to "Oldest". Any other value is
        returned unchanged; the default is "Newest".
        """
        if sort := self.extractor.config("sort"):
            s = sort[0].lower()
            if s in "drn":
                return "Newest"
            if s in "ao":
                return "Oldest"
            return sort
        return "Newest"
932
933
934
def _bool(value):
935
return value == "true"
936
937
938
class CivitaiSearchAPI():
    """Interface for the civitai search service"""

    def __init__(self, extractor):
        """Store 'extractor' and prepare the request headers"""
        self.extractor = extractor
        self.root = "https://search-new.civitai.com"

        token = extractor.config("token")
        if not token:
            # no token configured: use the built-in one
            token = ("Bearer 8c46eb2508e21db1e9828a97968d"
                     "91ab1ca1caa5f70a00e88a2ba1e286603b61")
        elif " " not in token:
            # a token without a space gets the "Bearer " prefix
            token = "Bearer " + token

        self.headers = {
            "Authorization": token,
            "Content-Type": "application/json",
            "X-Meilisearch-Client": "Meilisearch instant-meilisearch (v0.13.5)"
                                    " ; Meilisearch JavaScript (v0.34.0)",
            "Origin": extractor.root,
            "Sec-Fetch-Dest": "empty",
            "Sec-Fetch-Mode": "cors",
            "Sec-Fetch-Site": "same-site",
            "Priority": "u=4",
        }

    def search(self, query, type, facets, nsfw=31):
        """Return an iterator over hits for 'query' in the index 'type'"""
        nsfw_filter = self._generate_filter(nsfw)
        request = {
            "q"       : query,
            "indexUid": type,
            "facets"  : facets,
            "attributesToHighlight": (),
            "highlightPreTag" : "__ais-highlight__",
            "highlightPostTag": "__/ais-highlight__",
            "limit" : 51,
            "offset": 0,
            "filter": (nsfw_filter,),
        }
        return self._pagination("/multi-search", request)

    def search_models(self, query, type=None, nsfw=31):
        """Search models; 'type' defaults to the "models_v9" index"""
        index = type or "models_v9"
        facets = (
            "category.name",
            "checkpointType",
            "fileFormats",
            "lastVersionAtUnix",
            "tags.name",
            "type",
            "user.username",
            "version.baseModel",
        )
        return self.search(query, index, facets, nsfw)

    def search_images(self, query, type=None, nsfw=31):
        """Search images; 'type' defaults to the "images_v6" index"""
        index = type or "images_v6"
        facets = (
            "aspectRatio",
            "baseModel",
            "createdAtUnix",
            "tagNames",
            "techniqueNames",
            "toolNames",
            "type",
            "user.username",
        )
        return self.search(query, index, facets, nsfw)

    def _call(self, endpoint, query):
        """POST a single-query request and return its first result"""
        body = util.json_dumps({"queries": (query,)})
        response = self.extractor.request_json(
            self.root + endpoint,
            method="POST", headers=self.headers, data=body)
        return response["results"][0]

    def _pagination(self, endpoint, query):
        """Yield hits page by page, advancing the "offset" of 'query'"""
        step = query["limit"] - 1
        minimum = step // 2

        while True:
            hits = self._call(endpoint, query)["hits"]
            yield from hits

            # stop once a page has fewer hits than half the step size
            if len(hits) < minimum:
                return
            query["offset"] += step

    def _generate_filter(self, level):
        """Build an "nsfwLevel" filter from the bits set in 'level'"""
        enabled = [str(bit) for bit in (1, 2, 4, 8, 16) if level & bit]
        if not enabled:
            return "()"
        return "(nsfwLevel=" + " OR nsfwLevel=".join(enabled) + ")"
1046
1047