Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
maurosoria
GitHub Repository: maurosoria/dirsearch
Path: blob/master/lib/controller/session.py
896 views
# -*- coding: utf-8 -*-
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.
#
# Author: Mauro Soria
18
19
from __future__ import annotations
20
21
import json
22
import os
23
from typing import Any
24
25
import mysql.connector
26
import psycopg
27
28
from lib.core.exceptions import InvalidURLException, UnpicklingError
29
from lib.core.logger import logger
30
from lib.report.manager import ReportManager
31
from lib.utils.file import FileUtils
32
from lib.view.terminal import interface
33
34
35
class SessionStore:
    """Save, load and list scan sessions stored as JSON.

    Two on-disk layouts are understood:

    * a session directory holding one JSON file per section (see ``FILES``);
      this is the layout ``save`` writes;
    * a single JSON file holding every section in one object; ``load`` and
      ``list_sessions`` accept it, ``save`` never produces it.
    """

    # Format version written by ``save`` and required when reading.
    SESSION_VERSION = 1
    # Options stored as JSON lists that ``restore_options`` turns back into sets.
    SESSION_OPTION_SET_KEYS = {
        "recursion_status_codes",
        "include_status_codes",
        "exclude_status_codes",
        "exclude_sizes",
        "skip_on_status",
    }
    # Options stored as JSON lists that ``restore_options`` turns back into tuples.
    SESSION_OPTION_TUPLE_KEYS = {
        "extensions",
        "exclude_extensions",
        "prefixes",
        "suffixes",
    }
    # Section name -> file name inside a session directory.
    FILES = {
        "meta": "meta.json",
        "controller": "controller.json",
        "dictionary": "dictionary.json",
        "options": "options.json",
    }

    def __init__(self, options: dict[str, Any]) -> None:
        """Keep a reference to the option mapping that ``save`` serializes."""
        self.options = options

    def list_sessions(self, base_path: str) -> list[dict[str, Any]]:
        """Return summaries of the sessions found at or below *base_path*.

        If *base_path* is a file it is treated as a single-file session. If
        it is a directory, every file directly inside it is tried as a
        single-file session and every directory in the tree that contains a
        meta file is tried as a session directory. Unreadable or invalid
        candidates are skipped. The result is sorted by the ``"path"`` key;
        an empty list is returned when *base_path* does not exist.
        """
        if os.path.isfile(base_path):
            summary = self._summarize_session_file(base_path)
            return [summary] if summary else []

        found: list[dict[str, Any]] = []
        if not os.path.isdir(base_path):
            return found

        for root, dirs, files in os.walk(base_path):
            # Single-file sessions are only looked for at the top level.
            if root == base_path:
                for file_name in files:
                    summary = self._summarize_session_file(
                        FileUtils.build_path(root, file_name)
                    )
                    if summary:
                        found.append(summary)

            if self.FILES["meta"] in files:
                summary = self._summarize_session_dir(root)
                if summary:
                    found.append(summary)
                # Emptying ``dirs`` in place stops os.walk from descending
                # below a directory that holds a meta file.
                dirs.clear()

        found.sort(key=lambda item: item["path"])
        return found

    def load(self, session_path: str) -> dict[str, Any]:
        """Read the session at *session_path* and return its payload.

        The returned dict has the keys ``version``, ``controller``,
        ``dictionary`` and ``options``; for a session directory it also has
        ``last_output`` and ``output_history``.

        Raises:
            UnpicklingError: a file cannot be read or parsed, the version is
                unsupported, or a required section is missing or malformed.
        """
        if os.path.isfile(session_path):
            payload = self._read_json(session_path)
            self._validate_payload(payload)
            return payload

        session_dir = self._get_session_dir(session_path)

        def read_section(name: str) -> dict[str, Any]:
            return self._read_json(
                FileUtils.build_path(session_dir, self.FILES[name])
            )

        meta_payload = read_section("meta")
        payload = {
            # ``.get`` rather than indexing: a meta file without a version is
            # then rejected by _validate_payload with UnpicklingError instead
            # of escaping as a KeyError.
            "version": meta_payload.get("version"),
            "last_output": meta_payload.get("last_output", ""),
            "output_history": meta_payload.get("output_history", []),
            "controller": read_section("controller"),
            "dictionary": read_section("dictionary"),
            "options": read_section("options"),
        }
        self._validate_payload(payload)
        return payload

    def save(self, controller: Any, session_path: str, last_output: str) -> None:
        """Write the state of *controller* to the session directory.

        The output history is taken from ``controller.output_history`` when
        that is a list, otherwise from the meta file already on disk. When
        *last_output* is non-empty an entry for it is appended. The resulting
        list is stored back on ``controller.output_history`` and written to
        the meta file together with the version and *last_output*.
        """
        session_dir = self._get_session_dir(session_path)

        output_history = self._get_controller_history(controller)
        if output_history is None:
            output_history = self._load_output_history(session_dir)
        else:
            # Copy so the append below does not mutate the caller's list.
            output_history = list(output_history)
        if last_output:
            output_history.append(
                {"start_time": controller.start_time, "output": last_output}
            )
        controller.output_history = output_history

        # Everything is serialized before anything touches the disk.
        sections = {
            "meta": {
                "version": self.SESSION_VERSION,
                "last_output": last_output,
                "output_history": output_history,
            },
            "controller": self._serialize_controller_state(controller),
            "dictionary": self._serialize_dictionary(controller),
            "options": self._serialize_options(),
        }

        FileUtils.create_dir(session_dir)
        for name, section in sections.items():
            self._write_json(
                FileUtils.build_path(session_dir, self.FILES[name]), section
            )

    def apply_to_controller(self, controller: Any, payload: dict[str, Any]) -> None:
        """Restore *controller* from a payload returned by ``load``.

        Sets the scan-progress attributes, replaces ``controller.dictionary``
        with a fresh instance restored from the saved state and creates a
        new ``ReportManager`` from ``self.options["output_formats"]``.

        Raises:
            SystemExit: with code 1 when the report manager cannot be
                created (the error is logged and shown first).
        """
        controller_state = payload["controller"]
        controller.start_time = controller_state["start_time"]
        controller.passed_urls = set(controller_state.get("passed_urls", []))
        controller.directories = controller_state.get("directories", [])
        controller.jobs_processed = controller_state.get("jobs_processed", 0)
        controller.errors = controller_state.get("errors", 0)
        controller.consecutive_errors = controller_state.get("consecutive_errors", 0)
        controller.base_path = controller_state.get("base_path", "")
        controller.url = controller_state.get("url", "")
        controller.old_session = controller_state.get("old_session", True)

        if getattr(controller, "dictionary", None) is None:
            from lib.core.dictionary import Dictionary

            controller.dictionary = Dictionary()
        else:
            # Start from an empty instance of whatever class is in use.
            controller.dictionary = controller.dictionary.__class__()

        dictionary_state = payload["dictionary"]
        controller.dictionary.__setstate__(
            (
                dictionary_state["items"],
                dictionary_state["index"],
                dictionary_state.get("extra", []),
                dictionary_state.get("extra_index", 0),
            )
        )

        try:
            controller.reporter = ReportManager(self.options["output_formats"])
        except (
            InvalidURLException,
            mysql.connector.Error,
            psycopg.Error,
        ) as error:
            logger.exception(error)
            interface.error(str(error))
            raise SystemExit(1)

    def restore_options(self, serialized: dict[str, Any]) -> dict[str, Any]:
        """Return a copy of *serialized* with container types restored.

        Values under ``SESSION_OPTION_SET_KEYS`` become sets and values under
        ``SESSION_OPTION_TUPLE_KEYS`` become tuples; ``None`` values and all
        other keys are copied unchanged.
        """
        restored: dict[str, Any] = {}
        for key, value in serialized.items():
            if value is None:
                restored[key] = value
            elif key in self.SESSION_OPTION_SET_KEYS:
                restored[key] = set(value)
            elif key in self.SESSION_OPTION_TUPLE_KEYS:
                restored[key] = tuple(value)
            else:
                restored[key] = value
        return restored

    def _serialize_controller_state(self, controller: Any) -> dict[str, Any]:
        """Return the scan-progress attributes of *controller* as a dict."""
        return {
            "start_time": controller.start_time,
            # Sorted so the stored list does not depend on set ordering.
            "passed_urls": sorted(controller.passed_urls),
            "directories": list(controller.directories),
            "jobs_processed": controller.jobs_processed,
            "errors": controller.errors,
            "consecutive_errors": controller.consecutive_errors,
            "base_path": controller.base_path,
            "url": controller.url,
            "old_session": controller.old_session,
        }

    def _serialize_dictionary(self, controller: Any) -> dict[str, Any]:
        """Return the four-part state of ``controller.dictionary`` as a dict."""
        items, index, extra, extra_index = controller.dictionary.__getstate__()
        return {
            "items": items,
            "index": index,
            "extra": extra,
            "extra_index": extra_index,
        }

    def _serialize_options(self) -> dict[str, Any]:
        """Return ``self.options`` with set and tuple values turned into lists.

        ``frozenset`` is converted as well, because ``json`` cannot encode
        it any more than it can encode ``set``.
        """
        return {
            key: list(value) if isinstance(value, (set, frozenset, tuple)) else value
            for key, value in self.options.items()
        }

    def _get_session_dir(self, session_path: str) -> str:
        """Return the directory used for *session_path* (currently the path itself)."""
        return session_path

    def _read_json(self, path: str) -> dict[str, Any]:
        """Read *path* and return the JSON object it contains.

        Raises:
            UnpicklingError: the file cannot be opened, decoded or parsed, or
                its top-level value is not a JSON object.
        """
        try:
            with open(path, "r", encoding="utf-8") as file_handle:
                document = json.load(file_handle)
        except (
            OSError,
            json.JSONDecodeError,
            TypeError,
            UnicodeDecodeError,
        ) as error:
            raise UnpicklingError(str(error)) from error

        # Callers use ``.get`` on the result; reject lists, strings, numbers
        # and null here so they see UnpicklingError, not AttributeError.
        if not isinstance(document, dict):
            raise UnpicklingError(f"Expected a JSON object in {path}")
        return document

    def _write_json(self, path: str, payload: dict[str, Any]) -> None:
        """Write *payload* to *path* as indented UTF-8 JSON."""
        with open(path, "w", encoding="utf-8") as file_handle:
            json.dump(payload, file_handle, indent=2, ensure_ascii=False)

    def _validate_payload(self, payload: dict[str, Any]) -> None:
        """Check the version and the required sections of *payload*.

        Raises:
            UnpicklingError: the version differs from ``SESSION_VERSION``, or
                one of ``controller``, ``dictionary`` or ``options`` is
                missing or is not a dict.
        """
        if payload.get("version") != self.SESSION_VERSION:
            raise UnpicklingError("Unsupported session format version")
        for key in ("controller", "dictionary", "options"):
            if key not in payload:
                raise UnpicklingError("Missing required session data")
            if not isinstance(payload[key], dict):
                raise UnpicklingError("Invalid session data")

    def _get_controller_history(self, controller: Any) -> list[dict[str, Any]] | None:
        """Return ``controller.output_history`` if it is a list, else ``None``."""
        history = getattr(controller, "output_history", None)
        return history if isinstance(history, list) else None

    def _load_output_history(self, session_dir: str) -> list[dict[str, Any]]:
        """Return the output history stored in *session_dir*.

        Returns an empty list when the meta file is missing, unreadable or
        of another version. When the meta file has no ``output_history``
        list, a one-entry history is built from ``last_output`` (if set),
        using the ``start_time`` from the controller file when readable.
        """
        meta_path = FileUtils.build_path(session_dir, self.FILES["meta"])
        if not os.path.isfile(meta_path):
            return []
        try:
            meta_payload = self._read_json(meta_path)
        except UnpicklingError:
            return []
        if meta_payload.get("version") != self.SESSION_VERSION:
            return []

        history_payload = meta_payload.get("output_history")
        if isinstance(history_payload, list):
            # Keep only well-formed entries and only the two known keys.
            return [
                {"start_time": entry.get("start_time"), "output": entry["output"]}
                for entry in history_payload
                if isinstance(entry, dict) and entry.get("output") is not None
            ]

        last_output = meta_payload.get("last_output")
        if not last_output:
            return []

        start_time = None
        controller_path = FileUtils.build_path(session_dir, self.FILES["controller"])
        if os.path.isfile(controller_path):
            try:
                start_time = self._read_json(controller_path).get("start_time")
            except UnpicklingError:
                start_time = None

        return [{"start_time": start_time, "output": last_output}]

    def _summarize_session_dir(self, session_dir: str) -> dict[str, Any] | None:
        """Return a summary of the session directory, or ``None`` if unusable.

        ``None`` is returned when the meta file is missing, when any of the
        meta, controller or options files cannot be read, or when the
        version differs from ``SESSION_VERSION``.
        """
        meta_path = FileUtils.build_path(session_dir, self.FILES["meta"])
        if not os.path.isfile(meta_path):
            return None
        try:
            meta_payload = self._read_json(meta_path)
            if meta_payload.get("version") != self.SESSION_VERSION:
                return None
            controller_payload = self._read_json(
                FileUtils.build_path(session_dir, self.FILES["controller"])
            )
            options_payload = self._read_json(
                FileUtils.build_path(session_dir, self.FILES["options"])
            )
        except UnpicklingError:
            return None
        return self._build_summary(
            session_dir, meta_path, controller_payload, options_payload
        )

    def _summarize_session_file(self, session_file: str) -> dict[str, Any] | None:
        """Return a summary of a single-file session, or ``None`` if unusable.

        ``None` is returned when the file cannot be read as a JSON object,
        when the version differs from ``SESSION_VERSION``, or when the
        ``controller`` or ``options`` section is missing or not a dict.
        """
        try:
            payload = self._read_json(session_file)
        except UnpicklingError:
            return None
        if payload.get("version") != self.SESSION_VERSION:
            return None

        controller_payload = payload.get("controller")
        options_payload = payload.get("options")
        # isinstance also covers the missing (None) case and keeps
        # _build_summary from calling ``.get`` on a list or string.
        if not isinstance(controller_payload, dict):
            return None
        if not isinstance(options_payload, dict):
            return None
        return self._build_summary(
            session_file, session_file, controller_payload, options_payload
        )

    def _build_summary(
        self,
        session_path: str,
        meta_path: str,
        controller_state: dict[str, Any],
        options_state: dict[str, Any],
    ) -> dict[str, Any]:
        """Build the summary dict returned by ``list_sessions``.

        *meta_path* is the file whose modification time is reported under
        ``"modified"``.
        """
        return {
            "path": session_path,
            "url": controller_state.get("url", ""),
            # ``or []`` also covers keys that are present but null.
            "targets_left": len(options_state.get("urls") or []),
            "directories_left": len(controller_state.get("directories") or []),
            "jobs_processed": controller_state.get("jobs_processed", 0),
            "errors": controller_state.get("errors", 0),
            "modified": os.path.getmtime(meta_path),
        }
363
364