Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
wiseplat
GitHub Repository: wiseplat/python-code
Path: blob/master/ invest-robot-contest_TinkoffBotTwitch-main/venv/lib/python3.8/site-packages/twitchio/http.py
7774 views
1
"""
2
The MIT License (MIT)
3
4
Copyright (c) 2017-2021 TwitchIO
5
6
Permission is hereby granted, free of charge, to any person obtaining a
7
copy of this software and associated documentation files (the "Software"),
8
to deal in the Software without restriction, including without limitation
9
the rights to use, copy, modify, merge, publish, distribute, sublicense,
10
and/or sell copies of the Software, and to permit persons to whom the
11
Software is furnished to do so, subject to the following conditions:
12
13
The above copyright notice and this permission notice shall be included in
14
all copies or substantial portions of the Software.
15
16
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
17
OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
22
DEALINGS IN THE SOFTWARE.
23
"""
24
25
import asyncio
26
import copy
27
import datetime
28
import logging
29
from typing import TYPE_CHECKING, Union, List, Tuple, Any, Dict, Optional
30
31
import aiohttp
32
from yarl import URL
33
34
from . import errors
35
from .cooldowns import RateBucket
36
37
try:
38
import ujson as json
39
except:
40
import json
41
42
if TYPE_CHECKING:
43
from .client import Client
44
45
46
logger = logging.getLogger("twitchio.http")
47
48
49
class Route:
50
51
BASE_URL = "https://api.twitch.tv/helix"
52
53
__slots__ = "path", "body", "headers", "query", "method"
54
55
def __init__(
56
self,
57
method: str,
58
path: Union[str, URL],
59
body: Union[str, dict] = None,
60
query: List[Tuple[str, Any]] = None,
61
headers: dict = None,
62
token: str = None,
63
):
64
self.headers = headers or {}
65
self.method = method
66
self.query = query
67
68
if token:
69
self.headers["Authorization"] = "Bearer " + token
70
71
if isinstance(path, URL):
72
self.path = path
73
else:
74
self.path = URL(self.BASE_URL + "/" + path.rstrip("/"))
75
76
if query:
77
self.path = self.path.with_query(query)
78
79
if isinstance(body, dict):
80
self.body = json.dumps(body)
81
self.headers["Content-Type"] = "application/json"
82
else:
83
self.body = body
84
85
86
class TwitchHTTP:
87
88
TOKEN_BASE = "https://id.twitch.tv/oauth2/token"
89
90
def __init__(
91
self, client: "Client", *, api_token: str = None, client_secret: str = None, client_id: str = None, **kwargs
92
):
93
self.client = client
94
self.session = None
95
self.token = api_token
96
self.app_token = None
97
self._refresh_token = None
98
self.client_secret = client_secret
99
self.client_id = client_id
100
self.nick = None
101
self.user_id: Optional[int] = None
102
103
self.bucket = RateBucket(method="http")
104
self.scopes = kwargs.get("scopes", [])
105
106
async def request(self, route: Route, *, paginate=True, limit=100, full_body=False, force_app_token=False):
107
"""
108
Fulfills an API request
109
110
Parameters
111
-----------
112
route : :class:`twitchio.http.Route`
113
The route to follow
114
paginate : :class:`bool`
115
whether or not to paginate the requests where possible. Defaults to True
116
limit : :class:`int`
117
The data limit per request when paginating. Defaults to 100
118
full_body : class:`bool`
119
Whether to return the full response body or to accumulate the `data` key. Defaults to False. `paginate` must be False if this is True.
120
force_app_token : :class:`bool`
121
Forcibly use the client_id and client_secret generated token, if available. Otherwise fail the request immediately
122
"""
123
if full_body:
124
assert not paginate
125
126
if (not self.client_id or not self.nick) and self.token:
127
await self.validate(token=self.token)
128
129
if not self.client_id:
130
raise errors.NoClientID("A Client ID is required to use the Twitch API")
131
132
headers = route.headers or {}
133
134
if force_app_token and "Authorization" not in headers:
135
if not self.client_secret:
136
raise errors.NoToken(
137
"An app access token is required for this route, please provide a client id and client secret"
138
)
139
140
if self.app_token is None:
141
await self._generate_login()
142
headers["Authorization"] = f"Bearer {self.app_token}"
143
144
elif not self.token and not self.client_secret and "Authorization" not in headers:
145
raise errors.NoToken(
146
"Authorization is required to use the Twitch API. Pass token and/or client_secret to the Client constructor"
147
)
148
149
if "Authorization" not in headers:
150
if not self.token:
151
await self._generate_login()
152
153
headers["Authorization"] = f"Bearer {self.token}"
154
155
headers["Client-ID"] = self.client_id
156
157
if not self.session:
158
self.session = aiohttp.ClientSession()
159
160
if self.bucket.limited:
161
await self.bucket
162
163
cursor = None
164
data = []
165
166
def reached_limit():
167
return limit and len(data) >= limit
168
169
def get_limit():
170
if limit is None:
171
return "100"
172
173
to_get = limit - len(data)
174
return str(to_get) if to_get < 100 else "100"
175
176
is_finished = False
177
while not is_finished:
178
path = copy.copy(route.path)
179
180
if limit is not None and paginate:
181
q = route.query or []
182
if cursor is not None:
183
q = [("after", cursor), *q]
184
q = [("first", get_limit()), *q]
185
path = path.with_query(q)
186
187
body, is_text = await self._request(route, path, headers)
188
if is_text:
189
return body
190
191
if full_body:
192
return body
193
194
data += body["data"]
195
196
try:
197
cursor = body["pagination"].get("cursor", None)
198
except KeyError:
199
break
200
else:
201
if not cursor:
202
break
203
204
is_finished = reached_limit() if limit is not None else True if paginate else True
205
206
return data
207
208
async def _request(self, route, path, headers, utilize_bucket=True):
209
reason = None
210
211
for attempt in range(5):
212
if utilize_bucket and self.bucket.limited:
213
await self.bucket.wait_reset()
214
215
async with self.session.request(route.method, path, headers=headers, data=route.body) as resp:
216
try:
217
logger.debug(f"Received a response from a request with status {resp.status}: {await resp.json()}")
218
except Exception:
219
logger.debug(f"Received a response from a request with status {resp.status} and without body")
220
221
if 500 <= resp.status <= 504:
222
reason = resp.reason
223
await asyncio.sleep(2**attempt + 1)
224
continue
225
226
if utilize_bucket:
227
reset = resp.headers.get("Ratelimit-Reset")
228
remaining = resp.headers.get("Ratelimit-Remaining")
229
230
self.bucket.update(reset=reset, remaining=remaining)
231
232
if 200 <= resp.status < 300:
233
if resp.content_type == "application/json":
234
return await resp.json(), False
235
236
return await resp.text(encoding="utf-8"), True
237
238
if resp.status == 401:
239
if "WWW-Authenticate" in resp.headers:
240
try:
241
await self._generate_login()
242
except:
243
raise errors.Unauthorized(
244
"Your oauth token is invalid, and a new one could not be generated"
245
)
246
247
print(resp.reason, await resp.json(), resp)
248
raise errors.Unauthorized("You're not authorized to use this route.")
249
250
if resp.status == 429:
251
reason = "Ratelimit Reached"
252
253
if not utilize_bucket: # non Helix APIs don't have ratelimit headers
254
await asyncio.sleep(3**attempt + 1)
255
continue
256
257
raise errors.HTTPException(
258
f"Failed to fulfil request ({resp.status}).", reason=resp.reason, status=resp.status
259
)
260
261
raise errors.HTTPException("Failed to reach Twitch API", reason=reason, status=resp.status)
262
263
async def _generate_login(self):
264
try:
265
token = await self.client.event_token_expired()
266
if token is not None:
267
assert isinstance(token, str), TypeError(f"Expected a string, got {type(token)}")
268
self.token = self.app_token = token
269
return
270
except Exception as e:
271
self.client.run_event("error", e)
272
273
if not self.client_id or not self.client_secret:
274
raise errors.HTTPException("Unable to generate a token, client id and/or client secret not given")
275
276
if self._refresh_token:
277
url = (
278
self.TOKEN_BASE
279
+ "?grant_type=refresh_token&refresh_token={0}&client_id={1}&client_secret={2}".format(
280
self._refresh_token, self.client_id, self.client_secret
281
)
282
)
283
284
else:
285
url = self.TOKEN_BASE + "?client_id={0}&client_secret={1}&grant_type=client_credentials".format(
286
self.client_id, self.client_secret
287
)
288
if self.scopes:
289
url += "&scope=" + " ".join(self.scopes)
290
291
if not self.session:
292
self.session = aiohttp.ClientSession()
293
294
async with self.session.post(url) as resp:
295
if resp.status > 300 or resp.status < 200:
296
raise errors.HTTPException("Unable to generate a token: " + await resp.text())
297
298
data = await resp.json()
299
self.token = self.app_token = data["access_token"]
300
self._refresh_token = data.get("refresh_token", None)
301
logger.info("Invalid or no token found, generated new token: %s", self.token)
302
303
async def validate(self, *, token: str = None) -> dict:
304
if not token:
305
token = self.token
306
if not self.session:
307
self.session = aiohttp.ClientSession()
308
309
url = "https://id.twitch.tv/oauth2/validate"
310
headers = {"Authorization": f"OAuth {token}"}
311
312
async with self.session.get(url, headers=headers) as resp:
313
if resp.status == 401:
314
raise errors.AuthenticationError("Invalid or unauthorized Access Token passed.")
315
316
if resp.status > 300 or resp.status < 200:
317
raise errors.HTTPException("Unable to validate Access Token: " + await resp.text())
318
319
data: dict = await resp.json()
320
321
if not self.nick:
322
self.nick = data.get("login")
323
self.user_id = data.get("user_id") and int(data["user_id"])
324
self.client_id = data.get("client_id")
325
326
return data
327
328
async def post_commercial(self, token: str, broadcaster_id: str, length: int):
329
assert length in {30, 60, 90, 120, 150, 180}
330
data = await self.request(
331
Route(
332
"POST", "channels/commercial", body={"broadcaster_id": broadcaster_id, "length": length}, token=token
333
),
334
paginate=False,
335
)
336
data = data[0]
337
if data["message"]:
338
raise errors.HTTPException(data["message"], extra=data["retry_after"])
339
340
async def get_extension_analytics(
341
self,
342
token: str,
343
extension_id: str = None,
344
type: str = None,
345
started_at: datetime.datetime = None,
346
ended_at: datetime.datetime = None,
347
):
348
raise NotImplementedError # TODO
349
350
async def get_game_analytics(
351
self,
352
token: str,
353
game_id: str = None,
354
type: str = None,
355
started_at: datetime.datetime = None,
356
ended_at: datetime.datetime = None,
357
):
358
raise NotImplementedError # TODO
359
360
async def get_bits_board(
361
self, token: str, period: str = "all", user_id: str = None, started_at: datetime.datetime = None
362
):
363
assert period in {"all", "day", "week", "month", "year"}
364
route = Route(
365
"GET",
366
"bits/leaderboard",
367
"",
368
query=[
369
("period", period),
370
("started_at", started_at.isoformat() if started_at else None),
371
("user_id", user_id),
372
],
373
token=token,
374
)
375
return await self.request(route, full_body=True, paginate=False)
376
377
async def get_cheermotes(self, broadcaster_id: str):
378
return await self.request(Route("GET", "bits/cheermotes", "", query=[("broadcaster_id", broadcaster_id)]))
379
380
async def get_extension_transactions(self, extension_id: str, ids: List[Any] = None):
381
q = [("extension_id", extension_id)]
382
if ids:
383
for id in ids:
384
q.append(("id", id))
385
386
return await self.request(Route("GET", "extensions/transactions", "", query=q))
387
388
async def create_reward(
389
self,
390
token: str,
391
broadcaster_id: int,
392
title: str,
393
cost: int,
394
prompt: str = None,
395
is_enabled: bool = True,
396
background_color: str = None,
397
user_input_required: bool = False,
398
max_per_stream: int = None,
399
max_per_user: int = None,
400
global_cooldown: int = None,
401
fufill_immediatly: bool = False,
402
):
403
params = [("broadcaster_id", str(broadcaster_id))]
404
data = {
405
"title": title,
406
"cost": cost,
407
"prompt": prompt,
408
"is_enabled": is_enabled,
409
"is_user_input_required": user_input_required,
410
"should_redemptions_skip_request_queue": fufill_immediatly,
411
}
412
if max_per_stream:
413
data["max_per_stream"] = max_per_stream
414
data["max_per_stream_enabled"] = True
415
416
if max_per_user:
417
data["max_per_user_per_stream"] = max_per_user
418
data["max_per_user_per_stream_enabled"] = True
419
420
if background_color:
421
data["background_color"] = background_color
422
423
if global_cooldown:
424
data["global_cooldown_seconds"] = global_cooldown
425
data["is_global_cooldown_enabled"] = True
426
427
return await self.request(Route("POST", "channel_points/custom_rewards", query=params, body=data, token=token))
428
429
async def get_rewards(self, token: str, broadcaster_id: int, only_manageable: bool = False, ids: List[int] = None):
430
params = [("broadcaster_id", str(broadcaster_id)), ("only_manageable_rewards", str(only_manageable))]
431
432
if ids:
433
for id in ids:
434
params.append(("id", str(id)))
435
436
return await self.request(Route("GET", "channel_points/custom_rewards", query=params, token=token))
437
438
async def update_reward(
439
self,
440
token: str,
441
broadcaster_id: int,
442
reward_id: str,
443
title: str = None,
444
prompt: str = None,
445
cost: int = None,
446
background_color: str = None,
447
enabled: bool = None,
448
input_required: bool = None,
449
max_per_stream_enabled: bool = None,
450
max_per_stream: int = None,
451
max_per_user_per_stream_enabled: bool = None,
452
max_per_user_per_stream: int = None,
453
global_cooldown_enabled: bool = None,
454
global_cooldown: int = None,
455
paused: bool = None,
456
redemptions_skip_queue: bool = None,
457
):
458
data = {
459
"title": title,
460
"prompt": prompt,
461
"cost": cost,
462
"background_color": background_color,
463
"enabled": enabled,
464
"is_user_input_required": input_required,
465
"is_max_per_stream_enabled": max_per_stream_enabled,
466
"max_per_stream": max_per_stream,
467
"is_max_per_user_per_stream_enabled": max_per_user_per_stream_enabled,
468
"max_per_user_per_stream": max_per_user_per_stream,
469
"is_global_cooldown_enabled": global_cooldown_enabled,
470
"global_cooldown_seconds": global_cooldown,
471
"is_paused": paused,
472
"should_redemptions_skip_request_queue": redemptions_skip_queue,
473
}
474
475
data = {k: v for k, v in data.items() if v is not None}
476
477
if not data:
478
raise ValueError("Nothing changed!")
479
480
params = [("broadcaster_id", str(broadcaster_id)), ("id", str(reward_id))]
481
return await self.request(
482
Route(
483
"PATCH",
484
"channel_points/custom_rewards",
485
query=params,
486
headers={"Authorization": f"Bearer {token}"},
487
body=data,
488
)
489
)
490
491
async def delete_custom_reward(self, token: str, broadcaster_id: int, reward_id: str):
492
params = [("broadcaster_id", str(broadcaster_id)), ("id", reward_id)]
493
return await self.request(Route("DELETE", "channel_points/custom_rewards", query=params, token=token))
494
495
async def get_reward_redemptions(
496
self,
497
token: str,
498
broadcaster_id: int,
499
reward_id: str,
500
redemption_id: str = None,
501
status: str = None,
502
sort: str = None,
503
):
504
params = [("broadcaster_id", str(broadcaster_id)), ("reward_id", reward_id)]
505
506
if redemption_id:
507
params.append(("id", redemption_id))
508
509
if status:
510
params.append(("status", status))
511
512
if sort:
513
params.append(("sort", sort))
514
515
return await self.request(Route("GET", "channel_points/custom_rewards/redemptions", query=params, token=token))
516
517
async def update_reward_redemption_status(
518
self, token: str, broadcaster_id: int, reward_id: str, custom_reward_id: str, status: bool
519
):
520
params = [("id", custom_reward_id), ("broadcaster_id", str(broadcaster_id)), ("reward_id", reward_id)]
521
status = "FULFILLED" if status else "CANCELLED"
522
return await self.request(
523
Route(
524
"PATCH",
525
"/channel_points/custom_rewards/redemptions",
526
query=params,
527
body={"status": status},
528
token=token,
529
)
530
)
531
532
async def get_predictions(
533
self,
534
token: str,
535
broadcaster_id: int,
536
prediction_id: str = None,
537
):
538
params = [("broadcaster_id", str(broadcaster_id))]
539
540
if prediction_id:
541
params.extend(("prediction_id", prediction_id))
542
543
return await self.request(Route("GET", "predictions", query=params, token=token), paginate=False)
544
545
async def patch_prediction(
546
self, token: str, broadcaster_id: int, prediction_id: str, status: str, winning_outcome_id: str = None
547
):
548
body = {
549
"broadcaster_id": str(broadcaster_id),
550
"id": prediction_id,
551
"status": status,
552
}
553
554
if status == "RESOLVED":
555
body["winning_outcome_id"] = winning_outcome_id
556
557
return await self.request(
558
Route(
559
"PATCH",
560
"predictions",
561
body=body,
562
token=token,
563
)
564
)
565
566
async def post_prediction(
567
self, token: str, broadcaster_id: int, title: str, blue_outcome: str, pink_outcome: str, prediction_window: int
568
):
569
body = {
570
"broadcaster_id": broadcaster_id,
571
"title": title,
572
"prediction_window": prediction_window,
573
"outcomes": [
574
{
575
"title": blue_outcome,
576
},
577
{
578
"title": pink_outcome,
579
},
580
],
581
}
582
return await self.request(
583
Route("POST", "predictions", body=body, token=token),
584
paginate=False,
585
)
586
587
async def post_create_clip(self, token: str, broadcaster_id: int, has_delay=False):
588
return await self.request(
589
Route("POST", "clips", query=[("broadcaster_id", broadcaster_id), ("has_delay", has_delay)], token=token),
590
paginate=False,
591
)
592
593
async def get_clips(
594
self,
595
broadcaster_id: int = None,
596
game_id: str = None,
597
ids: List[str] = None,
598
started_at: datetime.datetime = None,
599
ended_at: datetime.datetime = None,
600
token: str = None,
601
):
602
q = [
603
("broadcaster_id", broadcaster_id),
604
("game_id", game_id),
605
("started_at", started_at.isoformat() if started_at else None),
606
("ended_at", ended_at.isoformat() if ended_at else None),
607
]
608
for id in ids:
609
q.append(("id", id))
610
611
query = [x for x in q if x[1] is not None]
612
613
return await self.request(Route("GET", "clips", query=query, token=token))
614
615
async def post_entitlements_upload(self, manifest_id: str, type="bulk_drops_grant"):
616
return await self.request(
617
Route("POST", "entitlements/upload", query=[("manifest_id", manifest_id), ("type", type)])
618
)
619
620
async def get_entitlements(self, id: str = None, user_id: str = None, game_id: str = None):
621
return await self.request(
622
Route("GET", "entitlements/drops", query=[("id", id), ("user_id", user_id), ("game_id", game_id)])
623
)
624
625
async def get_code_status(self, codes: List[str], user_id: int):
626
q = [("user_id", user_id)]
627
for code in codes:
628
q.append(("code", code))
629
630
return await self.request(Route("GET", "entitlements/codes", query=q))
631
632
async def post_redeem_code(self, user_id: int, codes: List[str]):
633
q = [("user_id", user_id)]
634
for c in codes:
635
q.append(("code", c))
636
637
return await self.request(Route("POST", "entitlements/code", query=q))
638
639
async def get_top_games(self):
640
return await self.request(Route("GET", "games/top"))
641
642
async def get_games(self, game_ids: List[Any], game_names: List[str]):
643
q = []
644
if game_ids:
645
for id in game_ids:
646
q.append(("id", id))
647
if game_names:
648
for name in game_names:
649
q.append(("name", name))
650
651
return await self.request(Route("GET", "games", query=q))
652
653
async def get_hype_train(self, broadcaster_id: str, id: str = None, token: str = None):
654
return await self.request(
655
Route(
656
"GET",
657
"hypetrain/events",
658
query=[x for x in [("broadcaster_id", broadcaster_id), ("id", id)] if x[1] is not None],
659
token=token,
660
)
661
)
662
663
async def post_automod_check(self, token: str, broadcaster_id: str, *msgs: List[Dict[str, str]]):
664
print(msgs)
665
return await self.request(
666
Route(
667
"POST",
668
"moderation/enforcements/status",
669
query=[("broadcaster_id", broadcaster_id)],
670
body={"data": msgs},
671
token=token,
672
)
673
)
674
675
async def get_channel_ban_unban_events(self, token: str, broadcaster_id: str, user_ids: List[str] = None):
676
q = [("broadcaster_id", broadcaster_id)]
677
if user_ids:
678
for id in user_ids:
679
q.append(("user_id", id))
680
681
return await self.request(Route("GET", "moderation/banned/events", query=q, token=token))
682
683
async def get_channel_bans(self, token: str, broadcaster_id: str, user_ids: List[str] = None):
684
q = [("broadcaster_id", broadcaster_id)]
685
if user_ids:
686
for id in user_ids:
687
q.append(("user_id", id))
688
689
return await self.request(Route("GET", "moderation/banned", query=q, token=token))
690
691
async def get_channel_moderators(self, token: str, broadcaster_id: str, user_ids: List[str] = None):
692
q = [("broadcaster_id", broadcaster_id)]
693
if user_ids:
694
for id in user_ids:
695
q.append(("user_id", id))
696
697
return await self.request(Route("GET", "moderation/moderators", query=q, token=token))
698
699
async def get_channel_mod_events(self, token: str, broadcaster_id: str, user_ids: List[str] = None):
700
q = [("broadcaster_id", broadcaster_id)]
701
for id in user_ids:
702
q.append(("user_id", id))
703
704
return await self.request(Route("GET", "moderation/moderators/events", query=q, token=token))
705
706
async def get_search_categories(self, query: str, token: str = None):
707
return await self.request(Route("GET", "search/categories", query=[("query", query)], token=token))
708
709
async def get_search_channels(self, query: str, token: str = None, live: bool = False):
710
return await self.request(
711
Route("GET", "search/channels", query=[("query", query), ("live_only", str(live))], token=token)
712
)
713
714
async def get_stream_key(self, token: str, broadcaster_id: str):
715
return await self.request(
716
Route("GET", "streams/key", query=[("broadcaster_id", broadcaster_id)], token=token), paginate=False
717
)
718
719
async def get_streams(
720
self,
721
game_ids: List[str] = None,
722
user_ids: List[str] = None,
723
user_logins: List[str] = None,
724
languages: List[str] = None,
725
token: str = None,
726
):
727
q = []
728
if game_ids:
729
for g in game_ids:
730
q.append(("game_id", g))
731
732
if user_ids:
733
for u in user_ids:
734
q.append(("user_id", u))
735
736
if user_logins:
737
for l in user_logins:
738
q.append(("user_login", l))
739
740
if languages:
741
for l in languages:
742
q.append(("language", l))
743
744
return await self.request(Route("GET", "streams", query=q, token=token))
745
746
async def post_stream_marker(self, token: str, user_id: str, description: str = None):
747
return await self.request(
748
Route("POST", "streams/markers", body={"user_id": user_id, "description": description}, token=token)
749
)
750
751
async def get_stream_markers(self, token: str, user_id: str = None, video_id: str = None):
752
return await self.request(
753
Route(
754
"GET",
755
"streams/markers",
756
query=[x for x in [("user_id", user_id), ("video_id", video_id)] if x[1] is not None],
757
token=token,
758
)
759
)
760
761
async def get_channels(self, broadcaster_id: str, token: str = None):
762
return await self.request(Route("GET", "channels", query=[("broadcaster_id", broadcaster_id)], token=token))
763
764
async def patch_channel(
765
self, token: str, broadcaster_id: str, game_id: str = None, language: str = None, title: str = None
766
):
767
assert any((game_id, language, title))
768
body = {
769
k: v
770
for k, v in {"game_id": game_id, "broadcaster_language": language, "title": title}.items()
771
if v is not None
772
}
773
774
return await self.request(
775
Route("PATCH", "channels", query=[("broadcaster_id", broadcaster_id)], body=body, token=token)
776
)
777
778
async def get_channel_schedule(
779
self,
780
broadcaster_id: str,
781
segment_ids: List[str] = None,
782
start_time: datetime.datetime = None,
783
utc_offset: int = None,
784
first: int = 20,
785
):
786
787
if first is not None and (first > 25 or first < 1):
788
raise ValueError("The parameter 'first' was malformed: the value must be less than or equal to 25")
789
if segment_ids is not None and len(segment_ids) > 100:
790
raise ValueError("segment_id can only have 100 entries")
791
792
if start_time:
793
start_time = start_time.strftime("%Y-%m-%dT%H:%M:%SZ")
794
795
if utc_offset:
796
utc_offset = str(utc_offset)
797
798
q = [
799
x
800
for x in [
801
("broadcaster_id", broadcaster_id),
802
("first", first),
803
("start_time", start_time),
804
("utc_offset", utc_offset),
805
]
806
if x[1] is not None
807
]
808
809
if segment_ids:
810
for id in segment_ids:
811
q.append(
812
("id", id),
813
)
814
815
return await self.request(Route("GET", "schedule", query=q), paginate=False, full_body=True)
816
817
async def get_channel_subscriptions(self, token: str, broadcaster_id: str, user_ids: List[str] = None):
818
q = [("broadcaster_id", broadcaster_id)]
819
if user_ids:
820
for u in user_ids:
821
q.append(("user_id", u))
822
823
return await self.request(Route("GET", "subscriptions", query=q, token=token))
824
825
async def get_stream_tags(self, tag_ids: List[str] = None):
826
q = []
827
if tag_ids:
828
for u in tag_ids:
829
q.append(("tag_id", u))
830
831
return await self.request(Route("GET", "tags/streams", query=q or None))
832
833
async def get_channel_tags(self, broadcaster_id: str):
834
return await self.request(Route("GET", "streams/tags", query=[("broadcaster_id", broadcaster_id)]))
835
836
async def put_replace_channel_tags(self, token: str, broadcaster_id: str, tag_ids: List[str] = None):
837
return await self.request(
838
Route(
839
"PUT",
840
"streams/tags",
841
query=[("broadcaster_id", broadcaster_id)],
842
body={"tag_ids": tag_ids},
843
token=token,
844
)
845
)
846
847
async def post_follow_channel(self, token: str, from_id: str, to_id: str, notifications=False):
848
return await self.request(
849
Route(
850
"POST",
851
"users/follows",
852
query=[("from_id", from_id), ("to_id", to_id), ("allow_notifications", str(notifications))],
853
token=token,
854
)
855
)
856
857
async def delete_unfollow_channel(self, token: str, from_id: str, to_id: str):
858
return await self.request(
859
Route("DELETE", "users/follows", query=[("from_id", from_id), ("to_id", to_id)], token=token)
860
)
861
862
async def get_users(self, ids: List[int], logins: List[str], token: str = None):
863
q = []
864
if ids:
865
for id in ids:
866
q.append(("id", id))
867
868
if logins:
869
for login in logins:
870
q.append(("login", login))
871
872
return await self.request(Route("GET", "users", query=q, token=token))
873
874
async def get_user_follows(self, from_id: str = None, to_id: str = None, token: str = None):
875
return await self.request(
876
Route(
877
"GET",
878
"users/follows",
879
query=[x for x in [("from_id", from_id), ("to_id", to_id)] if x[1] is not None],
880
token=token,
881
)
882
)
883
884
async def put_update_user(self, token: str, description: str):
885
return await self.request(Route("PUT", "users", query=[("description", description)], token=token))
886
887
async def get_channel_extensions(self, token: str):
888
return await self.request(Route("GET", "users/extensions/list", token=token))
889
890
async def get_user_active_extensions(self, token: str, user_id: str = None):
891
return (
892
await self.request(
893
Route("GET", "users/extensions", query=[("user_id", user_id)], token=token),
894
paginate=False,
895
full_body=True,
896
)
897
)["data"]
898
899
async def put_user_extensions(self, token: str, data: Dict[str, Any]):
900
return (
901
await self.request(
902
Route("PUT", "users/extensions", token=token, body={"data": data}), paginate=False, full_body=True
903
)
904
)["data"]
905
906
async def get_videos(
907
self,
908
ids: List[str] = None,
909
user_id: str = None,
910
game_id: str = None,
911
sort: str = "time",
912
type: str = "all",
913
period: str = "all",
914
language: str = None,
915
token: str = None,
916
):
917
q = [
918
x
919
for x in [
920
("user_id", user_id),
921
("game_id", game_id),
922
("sort", sort),
923
("type", type),
924
("period", period),
925
("lanaguage", language),
926
]
927
if x[1] is not None
928
]
929
930
if ids:
931
for id in ids:
932
q.append(("id", id))
933
934
return await self.request(Route("GET", "videos", query=q, token=token))
935
936
async def delete_videos(self, token: str, ids: List[int]):
937
q = [("id", str(x)) for x in ids]
938
939
return (await self.request(Route("DELETE", "videos", query=q, token=token), paginate=False, full_body=True))[
940
"data"
941
]
942
943
async def get_webhook_subs(self):
944
return await self.request(Route("GET", "webhooks/subscriptions"))
945
946
async def get_teams(self, team_name: str = None, team_id: str = None):
947
if team_name:
948
q = [("name", team_name)]
949
elif team_id:
950
q = [("id", team_id)]
951
else:
952
raise ValueError("You need to provide a team name or id")
953
return await self.request(Route("GET", "teams", query=q))
954
955
async def get_channel_teams(self, broadcaster_id: str):
956
q = [("broadcaster_id", broadcaster_id)]
957
return await self.request(Route("GET", "teams/channel", query=q))
958
959