Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
mikf
GitHub Repository: mikf/gallery-dl
Path: blob/master/gallery_dl/oauth.py
5457 views
1
# -*- coding: utf-8 -*-
2
3
# Copyright 2018-2020 Mike Fährmann
4
#
5
# This program is free software; you can redistribute it and/or modify
6
# it under the terms of the GNU General Public License version 2 as
7
# published by the Free Software Foundation.
8
9
"""OAuth helper functions and classes"""
10
11
import hmac
12
import time
13
import random
14
import string
15
import hashlib
16
import binascii
17
import urllib.parse
18
19
import requests
20
import requests.auth
21
22
from . import text
23
from .cache import cache
24
25
26
def nonce(size, alphabet=string.ascii_letters):
27
"""Generate a nonce value with 'size' characters"""
28
return "".join(random.choice(alphabet) for _ in range(size))
29
30
31
def quote(value, quote=urllib.parse.quote):
32
"""Quote 'value' according to the OAuth1.0 standard"""
33
return quote(value, "~")
34
35
36
def concat(*args):
37
"""Concatenate 'args' as expected by OAuth1.0"""
38
return "&".join(quote(item) for item in args)
39
40
41
class OAuth1Session(requests.Session):
42
"""Extension to requests.Session to support OAuth 1.0"""
43
44
def __init__(self, consumer_key, consumer_secret,
45
token=None, token_secret=None):
46
47
requests.Session.__init__(self)
48
self.auth = OAuth1Client(
49
consumer_key, consumer_secret,
50
token, token_secret,
51
)
52
53
def rebuild_auth(self, prepared_request, response):
54
if "Authorization" in prepared_request.headers:
55
del prepared_request.headers["Authorization"]
56
prepared_request.prepare_auth(self.auth)
57
58
59
class OAuth1Client(requests.auth.AuthBase):
60
"""OAuth1.0a authentication"""
61
62
def __init__(self, consumer_key, consumer_secret,
63
token=None, token_secret=None):
64
65
self.consumer_key = consumer_key
66
self.consumer_secret = consumer_secret
67
self.token = token
68
self.token_secret = token_secret
69
70
def __call__(self, request):
71
oauth_params = [
72
("oauth_consumer_key", self.consumer_key),
73
("oauth_nonce", nonce(16)),
74
("oauth_signature_method", "HMAC-SHA1"),
75
("oauth_timestamp", str(int(time.time()))),
76
("oauth_version", "1.0"),
77
]
78
if self.token:
79
oauth_params.append(("oauth_token", self.token))
80
81
signature = self.generate_signature(request, oauth_params)
82
oauth_params.append(("oauth_signature", signature))
83
84
request.headers["Authorization"] = "OAuth " + ",".join(
85
key + '="' + value + '"' for key, value in oauth_params)
86
87
return request
88
89
def generate_signature(self, request, params):
90
"""Generate 'oauth_signature' value"""
91
url, _, query = request.url.partition("?")
92
93
params = params.copy()
94
for key, value in text.parse_query(query).items():
95
params.append((quote(key), quote(value)))
96
params.sort()
97
query = "&".join("=".join(item) for item in params)
98
99
message = concat(request.method, url, query).encode()
100
key = concat(self.consumer_secret, self.token_secret or "").encode()
101
signature = hmac.new(key, message, hashlib.sha1).digest()
102
103
return quote(binascii.b2a_base64(signature)[:-1].decode())
104
105
106
class OAuth1API():
107
"""Base class for OAuth1.0 based API interfaces"""
108
API_KEY = None
109
API_SECRET = None
110
111
def __init__(self, extractor):
112
self.log = extractor.log
113
self.extractor = extractor
114
115
api_key = extractor.config("api-key", self.API_KEY)
116
api_secret = extractor.config("api-secret", self.API_SECRET)
117
token = extractor.config("access-token")
118
token_secret = extractor.config("access-token-secret")
119
key_type = "default" if api_key == self.API_KEY else "custom"
120
121
if token is None or token == "cache":
122
key = (extractor.category, api_key)
123
token, token_secret = _token_cache(key)
124
125
if api_key and api_secret and token and token_secret:
126
self.log.debug("Using %s OAuth1.0 authentication", key_type)
127
self.session = OAuth1Session(
128
api_key, api_secret, token, token_secret)
129
self.api_key = None
130
else:
131
self.log.debug("Using %s api_key authentication", key_type)
132
self.session = extractor.session
133
self.api_key = api_key
134
135
def request(self, url, **kwargs):
136
kwargs["fatal"] = None
137
kwargs["session"] = self.session
138
return self.extractor.request(url, **kwargs)
139
140
141
@cache(maxage=36500*86400, keyarg=0)
142
def _token_cache(key):
143
return None, None
144
145