Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sqlmapproject
GitHub Repository: sqlmapproject/sqlmap
Path: blob/master/extra/vulnserver/vulnserver.py
2992 views
1
#!/usr/bin/env python
2
3
"""
4
vulnserver.py - Trivial SQLi vulnerable HTTP server (Note: for testing purposes)
5
6
Copyright (c) 2006-2025 sqlmap developers (https://sqlmap.org)
7
See the file 'LICENSE' for copying permission
8
"""
9
10
from __future__ import print_function
11
12
import base64
13
import json
14
import re
15
import sqlite3
16
import sys
17
import threading
18
import traceback
19
20
PY3 = sys.version_info >= (3, 0)
21
UNICODE_ENCODING = "utf-8"
22
DEBUG = False
23
24
if PY3:
25
from http.client import INTERNAL_SERVER_ERROR
26
from http.client import NOT_FOUND
27
from http.client import OK
28
from http.server import BaseHTTPRequestHandler
29
from http.server import HTTPServer
30
from socketserver import ThreadingMixIn
31
from urllib.parse import parse_qs
32
from urllib.parse import unquote_plus
33
else:
34
from BaseHTTPServer import BaseHTTPRequestHandler
35
from BaseHTTPServer import HTTPServer
36
from httplib import INTERNAL_SERVER_ERROR
37
from httplib import NOT_FOUND
38
from httplib import OK
39
from SocketServer import ThreadingMixIn
40
from urlparse import parse_qs
41
from urllib import unquote_plus
42
43
SCHEMA = """
44
CREATE TABLE users (
45
id INTEGER,
46
name TEXT,
47
surname TEXT,
48
PRIMARY KEY (id)
49
);
50
INSERT INTO users (id, name, surname) VALUES (1, 'luther', 'blisset');
51
INSERT INTO users (id, name, surname) VALUES (2, 'fluffy', 'bunny');
52
INSERT INTO users (id, name, surname) VALUES (3, 'wu', '179ad45c6ce2cb97cf1029e212046e81');
53
INSERT INTO users (id, name, surname) VALUES (4, 'sqlmap/1.0-dev (https://sqlmap.org)', 'user agent header');
54
INSERT INTO users (id, name, surname) VALUES (5, NULL, 'nameisnull');
55
"""
56
57
LISTEN_ADDRESS = "localhost"
58
LISTEN_PORT = 8440
59
60
_conn = None
61
_cursor = None
62
_lock = None
63
_server = None
64
_alive = False
65
66
def init(quiet=False):
67
global _conn
68
global _cursor
69
global _lock
70
71
_conn = sqlite3.connect(":memory:", isolation_level=None, check_same_thread=False)
72
_cursor = _conn.cursor()
73
_lock = threading.Lock()
74
75
_cursor.executescript(SCHEMA)
76
77
if quiet:
78
global print
79
80
def _(*args, **kwargs):
81
pass
82
83
print = _
84
85
class ThreadingServer(ThreadingMixIn, HTTPServer):
86
def finish_request(self, *args, **kwargs):
87
try:
88
HTTPServer.finish_request(self, *args, **kwargs)
89
except Exception:
90
if DEBUG:
91
traceback.print_exc()
92
93
class ReqHandler(BaseHTTPRequestHandler):
94
def do_REQUEST(self):
95
path, query = self.path.split('?', 1) if '?' in self.path else (self.path, "")
96
params = {}
97
98
if query:
99
params.update(parse_qs(query))
100
101
if "<script>" in unquote_plus(query):
102
self.send_response(INTERNAL_SERVER_ERROR)
103
self.send_header("X-Powered-By", "Express")
104
self.send_header("Connection", "close")
105
self.end_headers()
106
self.wfile.write("CLOUDFLARE_ERROR_500S_BOX".encode(UNICODE_ENCODING))
107
return
108
109
if hasattr(self, "data"):
110
if self.data.startswith('{') and self.data.endswith('}'):
111
params.update(json.loads(self.data))
112
elif self.data.startswith('<') and self.data.endswith('>'):
113
params.update(dict((_[0], _[1].replace("&apos;", "'").replace("&quot;", '"').replace("&lt;", '<').replace("&gt;", '>').replace("&amp;", '&')) for _ in re.findall(r'name="([^"]+)" value="([^"]*)"', self.data)))
114
else:
115
self.data = self.data.replace(';', '&') # Note: seems that Python3 started ignoring parameter splitting with ';'
116
params.update(parse_qs(self.data))
117
118
for name in self.headers:
119
params[name.lower()] = self.headers[name]
120
121
if "cookie" in params:
122
for part in params["cookie"].split(';'):
123
part = part.strip()
124
if '=' in part:
125
name, value = part.split('=', 1)
126
params[name.strip()] = unquote_plus(value.strip())
127
128
for key in params:
129
if params[key] and isinstance(params[key], (tuple, list)):
130
params[key] = params[key][-1]
131
132
self.url, self.params = path, params
133
134
if self.url == '/':
135
if not any(_ in self.params for _ in ("id", "query")):
136
self.send_response(OK)
137
self.send_header("Content-type", "text/html; charset=%s" % UNICODE_ENCODING)
138
self.send_header("Connection", "close")
139
self.end_headers()
140
self.wfile.write(b"<!DOCTYPE html><html><head><title>vulnserver</title></head><body><h3>GET:</h3><a href='/?id=1'>link</a><hr><h3>POST:</h3><form method='post'>ID: <input type='text' name='id'><input type='submit' value='Submit'></form></body></html>")
141
else:
142
code, output = OK, ""
143
144
try:
145
if self.params.get("echo", ""):
146
output += "%s<br>" % self.params["echo"]
147
148
if self.params.get("reflect", ""):
149
output += "%s<br>" % self.params.get("id")
150
151
with _lock:
152
if "query" in self.params:
153
_cursor.execute(self.params["query"])
154
elif "id" in self.params:
155
if "base64" in self.params:
156
_cursor.execute("SELECT * FROM users WHERE id=%s LIMIT 0, 1" % base64.b64decode("%s===" % self.params["id"], altchars=self.params.get("altchars")).decode())
157
else:
158
_cursor.execute("SELECT * FROM users WHERE id=%s LIMIT 0, 1" % self.params["id"])
159
results = _cursor.fetchall()
160
161
output += "<b>SQL results:</b><br>\n"
162
163
if self.params.get("code", ""):
164
if not results:
165
code = INTERNAL_SERVER_ERROR
166
else:
167
if results:
168
output += "<table border=\"1\">\n"
169
170
for row in results:
171
output += "<tr>"
172
for value in row:
173
output += "<td>%s</td>" % value
174
output += "</tr>\n"
175
176
output += "</table>\n"
177
else:
178
output += "no results found"
179
180
output += "</body></html>"
181
except Exception as ex:
182
code = INTERNAL_SERVER_ERROR
183
output = "%s: %s" % (re.search(r"'([^']+)'", str(type(ex))).group(1), ex)
184
185
self.send_response(code)
186
187
self.send_header("Content-type", "text/html")
188
self.send_header("Connection", "close")
189
190
if self.raw_requestline.startswith(b"HEAD"):
191
self.send_header("Content-Length", str(len(output)))
192
self.end_headers()
193
else:
194
self.end_headers()
195
self.wfile.write(output if isinstance(output, bytes) else output.encode(UNICODE_ENCODING))
196
else:
197
self.send_response(NOT_FOUND)
198
self.send_header("Connection", "close")
199
self.end_headers()
200
201
def do_GET(self):
202
self.do_REQUEST()
203
204
def do_PUT(self):
205
self.do_POST()
206
207
def do_HEAD(self):
208
self.do_REQUEST()
209
210
def do_POST(self):
211
length = int(self.headers.get("Content-length", 0))
212
if length:
213
data = self.rfile.read(length)
214
data = unquote_plus(data.decode(UNICODE_ENCODING, "ignore"))
215
self.data = data
216
elif self.headers.get("Transfer-encoding") == "chunked":
217
data, line = b"", b""
218
count = 0
219
220
while True:
221
line += self.rfile.read(1)
222
if line.endswith(b'\n'):
223
if count % 2 == 1:
224
current = line.rstrip(b"\r\n")
225
if not current:
226
break
227
else:
228
data += current
229
230
count += 1
231
line = b""
232
233
self.data = data.decode(UNICODE_ENCODING, "ignore")
234
235
self.do_REQUEST()
236
237
def log_message(self, format, *args):
238
return
239
240
def run(address=LISTEN_ADDRESS, port=LISTEN_PORT):
241
global _alive
242
global _server
243
try:
244
_alive = True
245
_server = ThreadingServer((address, port), ReqHandler)
246
print("[i] running HTTP server at 'http://%s:%d'" % (address, port))
247
_server.serve_forever()
248
except KeyboardInterrupt:
249
_server.socket.close()
250
raise
251
finally:
252
_alive = False
253
254
if __name__ == "__main__":
255
try:
256
init()
257
run(sys.argv[1] if len(sys.argv) > 1 else LISTEN_ADDRESS, int(sys.argv[2] if len(sys.argv) > 2 else LISTEN_PORT))
258
except KeyboardInterrupt:
259
print("\r[x] Ctrl-C received")
260
261