Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
Der-Henning
GitHub Repository: Der-Henning/tgtg
Path: blob/main/tgtg_scanner/__main__.py
1494 views
1
import argparse
2
import datetime
3
import http.client as http_client
4
import json
5
import logging
6
import os
7
import platform
8
import signal
9
import sys
10
from os import getenv
11
from pathlib import Path
12
from typing import Any, NoReturn
13
14
import colorlog
15
import requests
16
from packaging import version
17
from requests.exceptions import RequestException
18
19
from tgtg_scanner._version import __author__, __description__, __url__, __version__
20
from tgtg_scanner.errors import ConfigurationError, TgtgAPIError
21
from tgtg_scanner.models import Config
22
from tgtg_scanner.scanner import Scanner
23
24
# GitHub API endpoint queried for the most recently published release.
VERSION_URL = "https://api.github.com/repos/Der-Henning/tgtg/releases/latest"

# Banner lines; logged one after another in the welcome message.
HEADER = (
    r" ____ ___ ____ ___ ____ ___ __ __ _ __ _ ____ ____ ",
    r" (_ _)/ __)(_ _)/ __) / ___) / __) / _\ ( ( \( ( \( __)( _ \ ",
    r" )( ( (_ \ )( ( (_ \ \___ \( (__ / \/ // / ) _) ) / ",
    r" (__) \___/ (__) \___/ (____/ \___)\_/\_/\_)__)\_)__)(____)(__\_) ",
)

# Change to 1 to debug http headers.
http_client.HTTPConnection.debuglevel = 0

# Platform detection used when registering signal handlers.
SYS_PLATFORM = platform.system()
IS_WINDOWS = SYS_PLATFORM.lower() in {"windows", "cygwin"}

# Truthy only when the interpreter is marked as frozen and exposes
# ``sys._MEIPASS`` (presumably a bundled executable - confirm with the
# packaging setup).
IS_EXECUTABLE = getattr(sys, "frozen", False) and hasattr(sys, "_MEIPASS")

# Base folder: the current working directory, or the folder containing the
# executable when running as a bundled executable.
PROG_PATH = Path.cwd() if not IS_EXECUTABLE else Path(sys.executable).parent

# The DOCKER environment variable switches docker mode on (case-insensitive).
IS_DOCKER = getenv("DOCKER", "False").lower() in {"true", "1", "t", "y", "yes"}

# Folder for the log file; can be overridden with the LOGS_PATH variable.
LOGS_PATH = Path(getenv("LOGS_PATH", PROG_PATH))
43
44
45
def main():
    """Command line entry point for the scanner and its helper commands.

    Registers the exit signal handlers, parses the command line, resets the
    logging configuration (coloured console handler plus log file handler)
    and loads the configuration. Depending on the selected option it then
    prints credentials or favorites, modifies the favorites, or starts the
    scanner.

    The process exits with status 1 on a ``ConfigurationError``, a
    ``TgtgAPIError`` or a ``SystemExit`` raised inside the guarded section,
    and with status 0 after a ``KeyboardInterrupt``.
    """
    _register_signals()

    config_file = _get_config_file()
    log_file = Path(LOGS_PATH, "scanner.log")

    parser = argparse.ArgumentParser(description=__description__)
    parser.add_argument("-v", "--version", action="version", version=f"v{__version__}")
    parser.add_argument("-d", "--debug", action="store_true", help="activate debugging mode")
    parser.add_argument(
        "-c",
        "--config",
        metavar="config_file",
        type=Path,
        default=config_file,
        help="path to config file (default: config.ini)",
    )
    parser.add_argument(
        "-l",
        "--log_file",
        metavar="log_file",
        type=Path,
        default=log_file,
        help="path to log file (default: scanner.log)",
    )
    # Helper commands exclude each other; each one performs its action and exits.
    helper_group = parser.add_mutually_exclusive_group(required=False)
    helper_group.add_argument(
        "-t",
        "--tokens",
        action="store_true",
        help="display your current access tokens and exit",
    )
    helper_group.add_argument("-f", "--favorites", action="store_true", help="display your favorites and exit")
    helper_group.add_argument(
        "-F",
        "--favorite_ids",
        action="store_true",
        help="display the item ids of your favorites and exit",
    )
    helper_group.add_argument(
        "-a",
        "--add",
        nargs="+",
        metavar="item_id",
        help="add item ids to favorites and exit",
    )
    helper_group.add_argument(
        "-r",
        "--remove",
        nargs="+",
        metavar="item_id",
        help="remove item ids from favorites and exit",
    )
    helper_group.add_argument("-R", "--remove_all", action="store_true", help="remove all favorites and exit")
    json_group = parser.add_mutually_exclusive_group(required=False)
    json_group.add_argument("-j", "--json", action="store_true", help="output as plain json")
    json_group.add_argument("-J", "--json_pretty", action="store_true", help="output as pretty json")
    parser.add_argument("--base_url", default=None, help="Overwrite TGTG API URL for testing")
    args = parser.parse_args()

    # Disable logging for json output
    if args.json or args.json_pretty:
        logging.disable(logging.CRITICAL)

    # Remove all handlers. Iterate over a copy: removeHandler() mutates the
    # very list being walked, which would otherwise skip every second handler.
    for handler in list(logging.root.handlers):
        logging.root.removeHandler(handler)

    # Set all already known loggers to level CRITICAL
    for logger_name in logging.root.manager.loggerDict:
        logging.getLogger(logger_name).setLevel(logging.CRITICAL)

    # Define stream formatter and handler
    stream_formatter = colorlog.ColoredFormatter(
        fmt=("%(cyan)s%(asctime)s%(reset)s %(log_color)s%(levelname)-8s%(reset)s %(message)s"),
        datefmt="%Y-%m-%d %H:%M:%S",
        log_colors={
            "DEBUG": "purple",
            "INFO": "green",
            "WARNING": "yellow",
            "ERROR": "red",
            "CRITICAL": "red",
        },
    )
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(stream_formatter)
    logging.root.addHandler(stream_handler)

    # Define file formatter and handler (mode "w": the log file is
    # overwritten on every start)
    args.log_file.parent.mkdir(parents=True, exist_ok=True)
    file_handler = logging.FileHandler(args.log_file, mode="w", encoding="utf-8")
    file_formatter = logging.Formatter(
        fmt=("[%(asctime)s][%(name)s][%(filename)s:%(funcName)s:%(lineno)d][%(levelname)s] %(message)s"),
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    file_handler.setFormatter(file_formatter)
    logging.root.addHandler(file_handler)

    # Create tgtg logger
    log = logging.getLogger("tgtg")
    log.setLevel(logging.INFO)

    # Bound before the try block so that the KeyboardInterrupt handler can
    # tell whether a scanner exists; an interrupt may arrive while the
    # configuration is still being loaded.
    scanner: Scanner | None = None
    try:
        # Load config
        config = Config(args.config)
        config.docker = IS_DOCKER

        # Activate debugging mode
        if args.debug:
            config.debug = True
        if config.debug:
            for logger_name in logging.root.manager.loggerDict:
                logging.getLogger(logger_name).setLevel(logging.DEBUG)
            log.info("Debugging mode enabled")

        if args.base_url is not None:
            config.tgtg.base_url = args.base_url

        scanner = Scanner(config)
        if args.tokens:
            credentials = scanner.get_credentials()
            if args.json:
                print(json.dumps(credentials, sort_keys=True))
            elif args.json_pretty:
                print(json.dumps(credentials, sort_keys=True, indent=4))
            else:
                print()
                print("Your TGTG credentials:")
                print("Email: ", credentials.get("email"))
                print("Access Token: ", credentials.get("access_token"))
                print("Refresh Token: ", credentials.get("refresh_token"))
                print("Datadome Cookie:", credentials.get("datadome_cookie"))
                print()
        elif args.favorites:
            favorites = scanner.get_favorites()
            if args.json:
                print(json.dumps(favorites, sort_keys=True))
            elif args.json_pretty:
                print(json.dumps(favorites, sort_keys=True, indent=4))
            else:
                print()
                print("Your favorites:")
                print(json.dumps(favorites, sort_keys=True, indent=4))
                print()
        elif args.favorite_ids:
            favorites = scanner.get_favorites()
            item_ids = [fav.get("item", {}).get("item_id") for fav in favorites]
            if args.json:
                print(json.dumps(item_ids, sort_keys=True))
            elif args.json_pretty:
                print(json.dumps(item_ids, sort_keys=True, indent=4))
            else:
                print()
                print("Item IDs:")
                print(" ".join(item_ids))
                print()
        elif args.add is not None:
            for item_id in args.add:
                scanner.set_favorite(item_id)
            print("done.")
        elif args.remove is not None:
            for item_id in args.remove:
                scanner.unset_favorite(item_id)
            print("done.")
        elif args.remove_all:
            if query_yes_no("Remove all favorites from your account?", default="no"):
                scanner.unset_all_favorites()
                print("done.")
        else:
            _run_scanner(scanner)
    except ConfigurationError as err:
        log.error("Configuration Error: %s", err)
        sys.exit(1)
    except TgtgAPIError as err:
        log.error("TGTG API Error: %s", err)
        sys.exit(1)
    except KeyboardInterrupt:
        log.info("Shutting down scanner ...")
        # Nothing to stop if the interrupt came before the scanner was created.
        if scanner is not None:
            scanner.stop()
        sys.exit(0)
    except SystemExit:
        sys.exit(1)
228
229
230
def _get_config_file() -> Path | None:
    """Locate an existing ``config.ini`` in one of the known places.

    The candidates are probed in priority order and the first one that is a
    regular file wins:

    1. ``PROG_PATH`` (current working directory or next to the executable),
    2. the project folder (same place as ``config.sample.ini``),
    3. the legacy ``src`` folder inside the project folder.

    Returns:
        The path of the first candidate found, or ``None`` if none exists.
    """

    def _candidates():
        # Generated lazily so later locations are only computed when the
        # earlier ones did not match.
        yield Path(PROG_PATH, "config.ini")
        project_dir = Path(__file__).parents[1]
        yield project_dir / "config.ini"
        yield project_dir / "src" / "config.ini"

    return next((path for path in _candidates() if path.is_file()), None)
244
245
246
def _run_scanner(scanner: Scanner) -> NoReturn:
    """Print the start-up messages and hand control to the scanner.

    Logs the welcome banner and the version check first. If the scanner's
    configuration asks for quiet mode and debugging is off, every known
    logger is then limited to level ERROR before ``scanner.run()`` is called.

    Args:
        scanner: The scanner to run; its ``config`` supplies the ``quiet``
            and ``debug`` flags.

    Note:
        Annotated as never returning - presumably ``scanner.run()`` loops
        until the process is interrupted; confirm against ``Scanner``.
    """
    _print_welcome_message()
    _print_version_check()

    errors_only = scanner.config.quiet and not scanner.config.debug
    if errors_only:
        for name in logging.root.manager.loggerDict:
            logging.getLogger(name).setLevel(logging.ERROR)

    scanner.run()
253
254
255
def _get_new_version() -> dict | None:
    """Fetch the latest release and return it if it is newer than this version.

    The release description is requested from ``VERSION_URL`` and its
    ``tag_name`` is compared with ``__version__``. The check is best effort:
    request failures, undecodable responses, unparsable versions and release
    descriptions without a usable ``tag_name`` are logged as a warning and
    never raised.

    Returns:
        The decoded release description if its tag is newer than the running
        version, otherwise ``None`` (also on any failure).
    """
    log = logging.getLogger("tgtg")
    try:
        res = requests.get(VERSION_URL, timeout=60)
        res.raise_for_status()
        latest_release = res.json()
        # Validate the payload before parsing: version.parse() raises an
        # uncaught TypeError for a missing (None) tag, and a payload that is
        # not a JSON object has no .get() at all.
        tag_name = latest_release.get("tag_name") if isinstance(latest_release, dict) else None
        if not isinstance(tag_name, str):
            raise ValueError("release information contains no tag name")
        if version.parse(__version__) < version.parse(tag_name):
            return latest_release
    except (RequestException, version.InvalidVersion, ValueError) as err:
        log.warning("Failed getting latest version! - %s", err)
    return None
266
267
268
def _print_version_check() -> None:
    """Log a hint when a newer release than the running version exists.

    Nothing is logged when ``_get_new_version()`` returns ``None``. An
    unparsable version raised while building the message is reported as a
    warning instead of being raised.
    """
    logger = logging.getLogger("tgtg")
    try:
        release = _get_new_version()
        if release is None:
            return
        newest = version.parse(release.get("tag_name"))
        logger.info("New Version %s available!", newest)
        logger.info("Please visit %s", release.get("html_url"))
        logger.info("")
    except (version.InvalidVersion, ValueError) as exc:
        logger.warning("Failed checking for new Version! - %s", exc)
278
279
280
def _print_welcome_message() -> None:
    """Log the banner followed by version, copyright and project URL."""
    logger = logging.getLogger("tgtg")

    # Banner, one log record per line.
    for banner_line in HEADER:
        logger.info(banner_line)
    logger.info("")

    # Version, copyright notice with the current year, and support link.
    logger.info("Version %s", __version__)
    logger.info("©%s, %s", datetime.date.today().year, __author__)
    logger.info("For documentation and support please visit %s", __url__)
    logger.info("")
290
291
292
def _register_signals() -> None:
    """Route the process termination signals to ``_handle_exit_signal``.

    SIGINT and SIGTERM are always registered, SIGBREAK only where the
    ``signal`` module defines it, and SIGHUP and SIGQUIT only when
    ``IS_WINDOWS`` is false.
    """
    # TODO: Define SIGUSR1, SIGUSR2
    for sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(sig, _handle_exit_signal)

    break_signal = getattr(signal, "SIGBREAK", None)
    if break_signal is not None:
        signal.signal(break_signal, _handle_exit_signal)

    if IS_WINDOWS:
        return

    # TODO: SIGQUIT is ideally meant to terminate with core dumps
    posix_signals = (
        signal.SIGHUP,  # type: ignore[attr-defined]
        signal.SIGQUIT,  # type: ignore[attr-defined]
    )
    for sig in posix_signals:
        signal.signal(sig, _handle_exit_signal)
302
303
304
def _handle_exit_signal(signum: int, _frame: Any) -> None:
    """Signal handler that turns a received signal into ``KeyboardInterrupt``.

    Args:
        signum: Number of the received signal; logged at debug level.
        _frame: Second argument passed to signal handlers; unused.

    Raises:
        KeyboardInterrupt: Always.
    """
    logging.getLogger("tgtg").debug("Received signal %d", signum)
    raise KeyboardInterrupt
308
309
310
def query_yes_no(question, default="yes") -> bool:
    """Ask a yes/no question on the console and return the answer.

    The question is printed together with a hint about the default, then one
    line is read from standard input. This repeats until a valid answer is
    given.

    Args:
        question: Text presented to the user.
        default: Answer assumed when the user just hits <Enter>. Must be
            "yes" (the default), "no" or None (an explicit answer is
            required).

    Returns:
        True for "yes", False for "no".

    Raises:
        ValueError: If ``default`` is not "yes", "no" or None.
    """
    prompts = {None: " [y/n] ", "yes": " [Y/n] ", "no": " [y/N] "}
    try:
        prompt = prompts[default]
    except (KeyError, TypeError):
        raise ValueError(f"invalid default answer: '{default}'") from None

    answers = {"yes": True, "y": True, "ye": True, "no": False, "n": False}
    while True:
        print(question + prompt)
        reply = input().lower()
        # An empty reply selects the default, if there is one.
        if not reply and default is not None:
            return answers[default]
        try:
            return answers[reply]
        except KeyError:
            print("Please respond with 'yes' or 'no' (or 'y' or 'n').")
338
339
340
# Start the command line interface only when this file is executed directly,
# not when it is imported.
if __name__ == "__main__":
    main()
342
343