Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sqlmapproject
GitHub Repository: sqlmapproject/sqlmap
Path: blob/master/lib/utils/httpd.py
2989 views
1
#!/usr/bin/env python
2
3
"""
4
Copyright (c) 2006-2025 sqlmap developers (https://sqlmap.org)
5
See the file 'LICENSE' for copying permission
6
"""
7
8
from __future__ import print_function
9
10
import mimetypes
11
import gzip
12
import os
13
import re
14
import sys
15
import threading
16
import time
17
import traceback
18
19
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")))
20
21
from lib.core.enums import HTTP_HEADER
22
from lib.core.settings import UNICODE_ENCODING
23
from lib.core.settings import VERSION_STRING
24
from thirdparty import six
25
from thirdparty.six.moves import BaseHTTPServer as _BaseHTTPServer
26
from thirdparty.six.moves import http_client as _http_client
27
from thirdparty.six.moves import socketserver as _socketserver
28
from thirdparty.six.moves import urllib as _urllib
29
30
HTTP_ADDRESS = "0.0.0.0"
31
HTTP_PORT = 8951
32
DEBUG = True
33
HTML_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "data", "html"))
34
DISABLED_CONTENT_EXTENSIONS = (".py", ".pyc", ".md", ".txt", ".bak", ".conf", ".zip", "~")
35
36
class ThreadingServer(_socketserver.ThreadingMixIn, _BaseHTTPServer.HTTPServer):
37
def finish_request(self, *args, **kwargs):
38
try:
39
_BaseHTTPServer.HTTPServer.finish_request(self, *args, **kwargs)
40
except Exception:
41
if DEBUG:
42
traceback.print_exc()
43
44
class ReqHandler(_BaseHTTPServer.BaseHTTPRequestHandler):
45
def do_GET(self):
46
path, query = self.path.split('?', 1) if '?' in self.path else (self.path, "")
47
params = {}
48
content = None
49
50
if query:
51
params.update(_urllib.parse.parse_qs(query))
52
53
for key in params:
54
if params[key]:
55
params[key] = params[key][-1]
56
57
self.url, self.params = path, params
58
59
if path == '/':
60
path = "index.html"
61
62
path = path.strip('/')
63
64
path = path.replace('/', os.path.sep)
65
path = os.path.abspath(os.path.join(HTML_DIR, path)).strip()
66
67
if not os.path.isfile(path) and os.path.isfile("%s.html" % path):
68
path = "%s.html" % path
69
70
if ".." not in os.path.relpath(path, HTML_DIR) and os.path.isfile(path) and not path.endswith(DISABLED_CONTENT_EXTENSIONS):
71
content = open(path, "rb").read()
72
self.send_response(_http_client.OK)
73
self.send_header(HTTP_HEADER.CONNECTION, "close")
74
self.send_header(HTTP_HEADER.CONTENT_TYPE, mimetypes.guess_type(path)[0] or "application/octet-stream")
75
else:
76
content = ("<!DOCTYPE html><html lang=\"en\"><head><title>404 Not Found</title></head><body><h1>Not Found</h1><p>The requested URL %s was not found on this server.</p></body></html>" % self.path.split('?')[0]).encode(UNICODE_ENCODING)
77
self.send_response(_http_client.NOT_FOUND)
78
self.send_header(HTTP_HEADER.CONNECTION, "close")
79
80
if content is not None:
81
for match in re.finditer(b"<!(\\w+)!>", content):
82
name = match.group(1)
83
_ = getattr(self, "_%s" % name.lower(), None)
84
if _:
85
content = self._format(content, **{name: _()})
86
87
if "gzip" in self.headers.get(HTTP_HEADER.ACCEPT_ENCODING):
88
self.send_header(HTTP_HEADER.CONTENT_ENCODING, "gzip")
89
_ = six.BytesIO()
90
compress = gzip.GzipFile("", "w+b", 9, _)
91
compress._stream = _
92
compress.write(content)
93
compress.flush()
94
compress.close()
95
content = compress._stream.getvalue()
96
97
self.send_header(HTTP_HEADER.CONTENT_LENGTH, str(len(content)))
98
99
self.end_headers()
100
101
if content:
102
self.wfile.write(content)
103
104
self.wfile.flush()
105
106
def _format(self, content, **params):
107
if content:
108
for key, value in params.items():
109
content = content.replace("<!%s!>" % key, value)
110
111
return content
112
113
def version_string(self):
114
return VERSION_STRING
115
116
def log_message(self, format, *args):
117
return
118
119
def finish(self):
120
try:
121
_BaseHTTPServer.BaseHTTPRequestHandler.finish(self)
122
except Exception:
123
if DEBUG:
124
traceback.print_exc()
125
126
def start_httpd():
127
server = ThreadingServer((HTTP_ADDRESS, HTTP_PORT), ReqHandler)
128
thread = threading.Thread(target=server.serve_forever)
129
thread.daemon = True
130
thread.start()
131
132
print("[i] running HTTP server at '%s:%d'" % (HTTP_ADDRESS, HTTP_PORT))
133
134
if __name__ == "__main__":
135
try:
136
start_httpd()
137
138
while True:
139
time.sleep(1)
140
except KeyboardInterrupt:
141
pass
142
143