from __future__ import annotations
import asyncio
import gc
import os
import sys
import shutil
import signal
import sys
import psycopg
import re
import time
import mysql.connector
from typing import Any
from urllib.parse import urlparse
from lib.connection.dns import cache_dns
from lib.connection.response import BaseResponse
from lib.core.data import blacklists, options
from lib.core.decorators import locked
from lib.core.dictionary import Dictionary, get_blacklists
from lib.core.exceptions import (
CannotConnectException,
FileExistsException,
InvalidRawRequest,
InvalidURLException,
RequestException,
SkipTargetInterrupt,
QuitInterrupt,
UnpicklingError,
)
from lib.core.logger import enable_logging, logger
from lib.core.settings import (
BANNER,
DEFAULT_HEADERS,
DEFAULT_SESSION_FILE,
EXTENSION_RECOGNITION_REGEX,
MAX_CONSECUTIVE_REQUEST_ERRORS,
NEW_LINE,
SIGINT_FORCE_QUIT_THRESHOLD,
SIGINT_WINDOW_SECONDS,
STANDARD_PORTS,
START_TIME,
UNKNOWN,
)
from lib.parse.rawrequest import parse_raw
from lib.parse.url import clean_path, parse_path
from lib.report.manager import ReportManager
from lib.utils.common import lstrip_once
from lib.utils.crawl import Crawler
from lib.utils.file import FileUtils
from lib.utils.schemedet import detect_scheme
from lib.view.terminal import interface
from lib.controller.session import SessionStore
class ForceQuitHandler:
    """Strategy interface for turning repeated Ctrl+C presses into a force quit.

    Signal delivery differs across platforms, so the concrete behavior
    lives in platform-specific subclasses.
    """

    def check_force_quit(self) -> bool:
        """Return True when a force quit was triggered (the process will exit)."""
        raise NotImplementedError

    def on_pause_start(self) -> None:
        """Hook invoked when pause mode is entered."""

    def on_resume(self) -> None:
        """Hook invoked when execution resumes from pause."""
class StandardForceQuitHandler(ForceQuitHandler):
    """Force quit strategy for ordinary (non-frozen-Linux) platforms.

    Any Ctrl+C received while already paused terminates the process at once.
    """

    def check_force_quit(self) -> bool:
        # Warn without persisting to the output buffer, then hard-exit.
        interface.warning("\nForce quit!", do_save=False)
        os._exit(1)
        # Unreachable after os._exit(); kept to satisfy the interface contract.
        return True
class PyInstallerLinuxForceQuitHandler(ForceQuitHandler):
    """Force quit strategy for PyInstaller builds running on Linux.

    Signal handling in frozen Linux binaries is quirky, so a force quit
    only fires after several rapid Ctrl+C presses land inside a short
    time window. SIGKILL is used for reliable termination of the frozen
    process.
    """

    def __init__(self) -> None:
        self._sigint_count = 0
        self._last_sigint_time = 0.0

    def check_force_quit(self) -> bool:
        now = time.monotonic()
        within_window = now - self._last_sigint_time <= SIGINT_WINDOW_SECONDS
        # Presses outside the window restart the count at one.
        self._sigint_count = self._sigint_count + 1 if within_window else 1
        self._last_sigint_time = now

        if self._sigint_count >= SIGINT_FORCE_QUIT_THRESHOLD:
            interface.warning("\nForce quit!", do_save=False)
            os.kill(os.getpid(), signal.SIGKILL)
            # Fallback in case SIGKILL delivery is delayed.
            os._exit(1)
        return False

    def on_pause_start(self) -> None:
        # The Ctrl+C that triggered the pause counts as the first press.
        self._sigint_count = 1
        self._last_sigint_time = time.monotonic()

    def on_resume(self) -> None:
        self._sigint_count = 0
def _create_force_quit_handler() -> ForceQuitHandler:
    """Pick the force-quit strategy matching the current runtime."""
    # PyInstaller sets sys.frozen on frozen executables.
    frozen = getattr(sys, "frozen", False)
    if frozen and sys.platform.startswith("linux"):
        return PyInstallerLinuxForceQuitHandler()
    return StandardForceQuitHandler()
def format_session_path(path: str) -> str:
    """Expand the {date} and {datetime} placeholders in a session path.

    Tokens are derived from START_TIME; colons/spaces are replaced so the
    result is filesystem-safe.
    """
    tokens = {
        "{date}": START_TIME.split()[0],
        "{datetime}": START_TIME.replace(" ", "_").replace(":", "-"),
    }
    for placeholder, value in tokens.items():
        path = path.replace(placeholder, value)
    return path
class Controller:
    """Top-level scan driver.

    Owns session import/export, per-target setup, the fuzzing loop (sync
    or asyncio based), pause/resume signal handling, progress reporting
    and directory-recursion bookkeeping.
    """

    def __init__(self) -> None:
        self._handling_pause = False
        self._force_quit_handler = _create_force_quit_handler()
        # The asyncio loop is only created in run() when async mode is on.
        self.loop = None

        if options["session_file"]:
            self._import(options["session_file"])
            # Older payloads may not restore the flag; a resumed session
            # defaults to "old" so banner/target headers are not reprinted.
            if not hasattr(self, "old_session"):
                self.old_session = True
        else:
            self.setup()
            self.old_session = False

        self.run()

    def _import(self, session_file: str) -> None:
        """Restore options and controller state from a saved session file.

        Rejects legacy pickle sessions and exits the process on malformed
        payloads. On success, replays the previous run's console output.
        """
        try:
            if os.path.isfile(session_file) and session_file.endswith((".pickle", ".pkl")):
                interface.warning(
                    "Pickle session files are no longer supported. "
                    "Please start a new scan to create a JSON session."
                )
                sys.exit(1)

            session_store = SessionStore(options)
            payload = session_store.load(session_file)
            loaded_session_file = session_file
            # restore_options() may overwrite "session_file"; keep the path
            # we actually loaded from.
            options.update(session_store.restore_options(payload["options"]))
            options["session_file"] = loaded_session_file

            if options["log_file"]:
                try:
                    FileUtils.create_dir(FileUtils.parent(options["log_file"]))
                    if not FileUtils.can_write(options["log_file"]):
                        raise Exception
                    enable_logging()
                except Exception:
                    interface.error(
                        f'Couldn\'t create log file at {options["log_file"]}'
                    )
                    sys.exit(1)

            output_history = payload.get("output_history") or []
            if not output_history:
                # Older sessions stored a single "last_output" string;
                # adapt it to the history format.
                legacy_output = payload.get("last_output", "")
                if legacy_output:
                    start_time = payload.get("controller", {}).get("start_time")
                    output_history = [
                        {"start_time": start_time, "output": legacy_output}
                    ]
            self.output_history = output_history

            if output_history:
                last_output = self._format_output_history(output_history)
            else:
                last_output = ""

            session_store.apply_to_controller(self, payload)
            self._confirm_session_overwrite(session_file)
        except (OSError, KeyError, TypeError, UnpicklingError):
            interface.error(
                f"{session_file} is not a valid session file or it's in an old format"
            )
            sys.exit(1)

        # Replay the previous run's console output.
        print(last_output)

    def _format_output_history(self, output_history: list[dict[str, Any]]) -> str:
        """Join saved run outputs, each prefixed by a run header line."""
        formatted: list[str] = []
        for entry in output_history:
            if not isinstance(entry, dict):
                continue
            output = entry.get("output")
            if not output:
                continue
            start_time = entry.get("start_time")
            if isinstance(start_time, (int, float)):
                start_label = time.strftime(
                    "%Y-%m-%d %H:%M:%S", time.localtime(start_time)
                )
                formatted.append(f"--- Previous run started: {start_label} ---")
            else:
                formatted.append("--- Previous run ---")
            formatted.append(output.rstrip())
        return "\n".join(formatted).rstrip()

    def _confirm_session_overwrite(self, session_file: str) -> None:
        """Ask whether saving should overwrite the resumed session file."""
        interface.in_line(
            f"Resume session from {session_file}. Overwrite on save? [o]verwrite/[n]ew: "
        )
        choice = input().strip().lower()
        if choice == "n":
            # A later save will create a fresh session file instead.
            options["session_file"] = None

    def _export(self, session_file: str) -> None:
        """Persist the current scan state and console output to disk."""
        last_output = interface.buffer.rstrip()
        session_file = format_session_path(session_file)
        parent_dir = FileUtils.parent(session_file)
        if parent_dir:
            FileUtils.create_dir(parent_dir)
        session_store = SessionStore(options)
        session_store.save(self, session_file, last_output)

    def setup(self) -> None:
        """Prepare a fresh scan: wordlists, logging, reporters, counters."""
        blacklists.update(get_blacklists())

        if options["raw_file"]:
            try:
                options.update(
                    zip(
                        ["urls", "http_method", "headers", "data"],
                        parse_raw(options["raw_file"]),
                    )
                )
            except InvalidRawRequest as e:
                print(str(e))
                sys.exit(1)
        else:
            # User-supplied headers take precedence over the defaults.
            options["headers"] = {**DEFAULT_HEADERS, **options["headers"]}

        self.dictionary = Dictionary(files=options["wordlists"])
        self.start_time = time.time()
        self.passed_urls: set[str] = set()
        self.directories: list[str] = []
        self.jobs_processed = 0
        self.errors = 0
        self.consecutive_errors = 0

        if options["log_file"]:
            try:
                FileUtils.create_dir(FileUtils.parent(options["log_file"]))
                if not FileUtils.can_write(options["log_file"]):
                    raise Exception
                enable_logging()
            except Exception:
                interface.error(
                    f'Couldn\'t create log file at {options["log_file"]}'
                )
                sys.exit(1)

        interface.header(BANNER)
        interface.config(len(self.dictionary))

        try:
            self.reporter = ReportManager(options["output_formats"])
        except (
            InvalidURLException,
            mysql.connector.Error,
            psycopg.Error,
        ) as e:
            logger.exception(e)
            interface.error(str(e))
            sys.exit(1)

        if options["log_file"]:
            interface.log_file(options["log_file"])

    def run(self) -> None:
        """Scan every URL in the queue, then clean up the session file."""
        if options["async_mode"]:
            from lib.connection.requester import AsyncRequester as Requester
            from lib.core.fuzzer import AsyncFuzzer as Fuzzer

            try:
                # uvloop is an optional drop-in faster event loop.
                import uvloop

                asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
            except ImportError:
                pass
        else:
            from lib.connection.requester import Requester
            from lib.core.fuzzer import Fuzzer

        match_callbacks = (
            self.match_callback, self.reporter.save, self.reset_consecutive_errors
        )
        not_found_callbacks = (
            self.update_progress_bar, self.reset_consecutive_errors
        )
        error_callbacks = (self.raise_error, self.append_error_log)

        self.requester = Requester()
        if options["async_mode"]:
            self.loop = asyncio.new_event_loop()

        # Both SIGINT and SIGTERM funnel into the interactive pause handler.
        signal.signal(signal.SIGINT, lambda *_: self.handle_pause())
        signal.signal(signal.SIGTERM, lambda *_: self.handle_pause())

        while options["urls"]:
            url = options["urls"][0]
            self.fuzzer = Fuzzer(
                self.requester,
                self.dictionary,
                match_callbacks=match_callbacks,
                not_found_callbacks=not_found_callbacks,
                error_callbacks=error_callbacks,
            )

            try:
                self.set_target(url)
                if not self.directories:
                    for subdir in options["subdirs"]:
                        self.add_directory(self.base_path + subdir)
                if not self.old_session:
                    interface.target(self.url)
                    self.reporter.prepare(self.url)
                self.start()
            except (
                CannotConnectException,
                FileExistsException,
                InvalidURLException,
                RequestException,
                SkipTargetInterrupt,
                KeyboardInterrupt,
            ) as e:
                # Target-level failures: reset state and move to the next URL.
                self.directories.clear()
                self.dictionary.reset()
                if e.args:
                    interface.error(str(e))
            except QuitInterrupt as e:
                self.reporter.finish()
                interface.error(e.args[0])
                sys.exit(0)
            finally:
                options["urls"].pop(0)

        interface.warning("\nTask Completed")
        self.reporter.finish()

        # A completed scan no longer needs its resume file.
        if options["session_file"]:
            try:
                if os.path.isdir(options["session_file"]):
                    shutil.rmtree(options["session_file"])
                else:
                    os.remove(options["session_file"])
            except Exception:
                interface.error("Failed to delete old session file, remove it to free some space")

    def start(self) -> None:
        """Fuzz every queued directory for the current target."""
        start_time = time.time()

        while self.directories:
            try:
                gc.collect()
                current_directory = self.directories[0]

                if not self.old_session:
                    current_time = time.strftime("%H:%M:%S")
                    msg = f"{NEW_LINE}[{current_time}] Scanning: {current_directory}"
                    interface.warning(msg)

                self.fuzzer.set_base_path(current_directory)
                if options["async_mode"]:
                    # pause_future is resolved by handle_pause() to interrupt
                    # the running fuzzer task.
                    self.pause_future = self.loop.create_future()
                    self.loop.run_until_complete(self.start_coroutines(start_time))
                else:
                    self.fuzzer.start()
                    self.process(start_time)
            except (KeyboardInterrupt, asyncio.CancelledError):
                pass
            finally:
                self.dictionary.reset()
                self.directories.pop(0)
                self.jobs_processed += 1
                self.old_session = False

    async def start_coroutines(self, start_time: float) -> None:
        """Run the fuzzer task until done, paused, or over a time budget.

        Raises QuitInterrupt when the global limit is exceeded and
        SkipTargetInterrupt when only the per-target limit is exceeded.
        """
        task = self.loop.create_task(self.fuzzer.start())

        # Remaining time budget: the smallest positive remainder of the
        # global and per-target limits, or None when no limit is set.
        timeout = min(
            t for t in [
                options["max_time"] - (time.time() - self.start_time),
                options["target_max_time"] - (time.time() - start_time),
            ] if t > 0
        ) if options["max_time"] or options["target_max_time"] else None

        try:
            await asyncio.wait_for(
                asyncio.wait(
                    [self.pause_future, task],
                    return_when=asyncio.FIRST_COMPLETED,
                ),
                timeout=timeout,
            )
        except asyncio.TimeoutError:
            if time.time() - self.start_time > options["max_time"] > 0:
                raise QuitInterrupt("Runtime exceeded the maximum set by the user")
            raise SkipTargetInterrupt("Runtime for target exceeded the maximum set by the user")

        if self.pause_future.done():
            task.cancel()
            # Propagate the interrupt handle_pause() set on the future, if any.
            await self.pause_future

        await task

    def process(self, start_time: float) -> None:
        """Poll the threaded fuzzer until it finishes, enforcing time limits."""
        while True:
            while not self.fuzzer.is_finished():
                now = time.time()
                if now - self.start_time > options["max_time"] > 0:
                    raise QuitInterrupt(
                        "Runtime exceeded the maximum set by the user"
                    )
                if now - start_time > options["target_max_time"] > 0:
                    raise SkipTargetInterrupt(
                        "Runtime for target exceeded the maximum set by the user"
                    )
                time.sleep(0.5)
            break

    def set_target(self, url: str) -> None:
        """Parse *url*, resolve scheme/port and point the requester at it.

        Raises InvalidURLException for unsupported schemes or bad ports.
        """
        # Normalize: ensure a scheme (placeholder if unknown) and a
        # trailing slash.
        if "://" not in url:
            url = f'{options["scheme"] or UNKNOWN}://{url}'
        if not url.endswith("/"):
            url += "/"

        parsed = urlparse(url)
        self.base_path = lstrip_once(parsed.path, "/")

        if "@" in parsed.netloc:
            # ParseResult is an immutable namedtuple, so never assign to
            # parsed.netloc (the previous tuple-unpack assignment raised
            # AttributeError for any URL carrying credentials). rsplit also
            # tolerates "@" inside the password, and parsed.hostname /
            # parsed.port already exclude the userinfo part.
            cred = parsed.netloc.rsplit("@", 1)[0]
            self.requester.set_auth("basic", cred)

        if parsed.scheme not in (UNKNOWN, "https", "http"):
            raise InvalidURLException(f"Unsupported URI scheme: {parsed.scheme}")

        port = parsed.port
        if not port:
            port = STANDARD_PORTS.get(parsed.scheme, None)
        elif not 0 < port < 65536:
            raise InvalidURLException(f"Invalid port number: {port}")

        if options["ip"]:
            cache_dns(parsed.hostname, port, options["ip"])

        try:
            scheme = (
                parsed.scheme
                if parsed.scheme != UNKNOWN
                else detect_scheme(parsed.hostname, port)
            )
        except ValueError:
            # Scheme detection on the given port failed; retry on 443 and
            # adopt that scheme's standard port.
            scheme = detect_scheme(parsed.hostname, 443)
            port = STANDARD_PORTS[scheme]

        # Omit the port from the displayed URL when it is the standard one.
        self.url = f"{scheme}://{parsed.hostname}"
        if port != STANDARD_PORTS[scheme]:
            self.url += f":{port}"
        self.url += "/"

        self.requester.set_url(self.url)

    def reset_consecutive_errors(self, response: BaseResponse) -> None:
        """Any successful response breaks the consecutive-error streak."""
        self.consecutive_errors = 0

    def match_callback(self, response: BaseResponse) -> None:
        """Handle a matched path: report, recurse, replay and crawl it."""
        if response.status in options["skip_on_status"]:
            raise SkipTargetInterrupt(
                f"Skipped the target due to {response.status} status code"
            )

        interface.status_report(response, options["full_url"])

        if response.status in options["recursion_status_codes"] and any(
            (
                options["recursive"],
                options["deep_recursive"],
                options["force_recursive"],
            )
        ):
            if response.redirect:
                new_path = clean_path(parse_path(response.redirect))
                added_to_queue = self.recur_for_redirect(response.path, new_path)
            elif len(response.history):
                # The request was redirected along the way; recurse from
                # the original path to the final one.
                old_path = clean_path(parse_path(response.history[0]))
                added_to_queue = self.recur_for_redirect(old_path, response.path)
            else:
                added_to_queue = self.recur(response.path)

            if added_to_queue:
                interface.new_directories(added_to_queue)

        if options["replay_proxy"]:
            if options["async_mode"]:
                self.loop.create_task(self.requester.replay_request(response.full_path, proxy=options["replay_proxy"]))
            else:
                self.requester.request(response.full_path, proxy=options["replay_proxy"])

        if options["crawl"]:
            # Feed crawled paths back into the wordlist.
            for path in Crawler.crawl(response):
                if not self.dictionary.is_valid(path):
                    continue
                path = lstrip_once(path, self.base_path)
                self.dictionary.add_extra(path)

    def update_progress_bar(self, response: BaseResponse) -> None:
        """Refresh the progress line after a not-found response."""
        # Total jobs: remaining subdirs for other targets plus this
        # target's queued directories plus already-finished jobs.
        jobs_count = (
            len(options["subdirs"]) * (len(options["urls"]) - 1)
            + len(self.directories)
            + self.jobs_processed
        )

        interface.last_path(
            self.dictionary.index,
            len(self.dictionary),
            self.jobs_processed + 1,
            jobs_count,
            self.requester.rate,
            self.errors,
        )

    def raise_error(self, exception: RequestException) -> None:
        """Track request errors, aborting on too many consecutive ones."""
        if options["exit_on_error"]:
            raise QuitInterrupt("Canceled due to an error")

        self.errors += 1
        self.consecutive_errors += 1
        if self.consecutive_errors > MAX_CONSECUTIVE_REQUEST_ERRORS:
            raise SkipTargetInterrupt("Too many request errors")

    def append_error_log(self, exception: RequestException) -> None:
        """Record the request error (with traceback) in the log file."""
        logger.exception(exception)

    def _force_exit(self) -> None:
        """Force process termination, stopping asyncio loop if running."""
        interface.warning("\nForce quit!", do_save=False)
        if self.loop and self.loop.is_running():
            try:
                self.loop.stop()
            except Exception:
                pass
        os._exit(1)

    def handle_pause(self) -> None:
        """Handle SIGINT (Ctrl+C) by pausing execution and showing options."""
        if self._handling_pause:
            # A further Ctrl+C while already paused may force quit.
            self._force_quit_handler.check_force_quit()
            return

        self._handling_pause = True
        self._force_quit_handler.on_pause_start()

        try:
            interface.warning(
                "CTRL+C detected: Pausing threads, please wait...", do_save=False
            )
            if not self.fuzzer.pause():
                interface.warning(
                    "Could not pause all threads (some may be blocked on I/O). "
                    "Press CTRL+C again to force quit.",
                    do_save=False
                )
        except Exception:
            # Best-effort pause; the interactive menu is shown regardless.
            pass

        while True:
            msg = "[q]uit / [c]ontinue"
            if len(self.directories) > 1:
                msg += " / [n]ext"
            if len(options["urls"]) > 1:
                msg += " / [s]kip target"
            interface.in_line(msg + ": ")

            option = input()

            if option.lower() == "q":
                interface.in_line("[s]ave / [q]uit without saving: ")
                option = input()
                if option.lower() == "s":
                    default_session_path = format_session_path(
                        options["session_file"] or DEFAULT_SESSION_FILE
                    )
                    msg = f"Save to file [{default_session_path}]: "
                    interface.in_line(msg)
                    session_file = format_session_path(input() or default_session_path)
                    self._export(session_file)
                    quitexc = QuitInterrupt(f"Session saved to: {session_file}")
                    if options["async_mode"]:
                        # Deliver the interrupt through the pause future so
                        # the event loop can unwind cleanly.
                        self.pause_future.set_exception(quitexc)
                        break
                    else:
                        raise quitexc
                elif option.lower() == "q":
                    quitexc = QuitInterrupt("Canceled by the user")
                    if options["async_mode"]:
                        self.pause_future.set_exception(quitexc)
                        break
                    else:
                        raise quitexc
            elif option.lower() == "c":
                self._handling_pause = False
                self._force_quit_handler.on_resume()
                self.fuzzer.play()
                break
            elif option.lower() == "n" and len(self.directories) > 1:
                # NOTE(review): _handling_pause stays True on this branch, so
                # a later Ctrl+C goes straight to the force-quit check —
                # confirm this is intentional.
                self.fuzzer.quit()
                break
            elif option.lower() == "s" and len(options["urls"]) > 1:
                skipexc = SkipTargetInterrupt("Target skipped by the user")
                if options["async_mode"]:
                    self.pause_future.set_exception(skipexc)
                    break
                else:
                    raise skipexc

    def add_directory(self, path: str) -> None:
        """Add directory to the recursion queue"""
        if any(
            path.startswith(dir) or "/" + dir in path
            for dir in options["exclude_subdirs"]
        ):
            return

        url = self.url + path
        # Respect the recursion depth limit and skip already-queued URLs.
        if (
            path.count("/") - self.base_path.count("/") > options["recursion_depth"] > 0
            or url in self.passed_urls
        ):
            return

        self.directories.append(path)
        self.passed_urls.add(url)

    @locked
    def recur(self, path: str) -> list[str]:
        """Queue recursion candidates derived from *path*.

        Returns the directories that were newly added to the queue.
        """
        dirs_count = len(self.directories)
        path = clean_path(path)

        if options["force_recursive"] and not path.endswith("/"):
            path += "/"

        if options["deep_recursive"]:
            # Queue every ancestor directory of the path.
            i = 0
            for _ in range(path.count("/")):
                i = path.index("/", i) + 1
                self.add_directory(path[:i])
        elif (
            options["recursive"]
            and path.endswith("/")
            # Skip paths that look like files with a recognized extension.
            and re.search(EXTENSION_RECOGNITION_REGEX, path[:-1]) is None
        ):
            self.add_directory(path)

        return self.directories[dirs_count:]

    def recur_for_redirect(self, path: str, redirect_path: str) -> list[str]:
        """Recurse only for the directory-style redirect "path" -> "path/"."""
        if redirect_path == path + "/":
            return self.recur(redirect_path)
        return []