Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sqlmapproject
GitHub Repository: sqlmapproject/sqlmap
Path: blob/master/lib/utils/har.py
2989 views
1
#!/usr/bin/env python
2
3
"""
4
Copyright (c) 2006-2025 sqlmap developers (https://sqlmap.org)
5
See the file 'LICENSE' for copying permission
6
"""
7
8
import base64
9
import datetime
10
import io
11
import re
12
import time
13
14
from lib.core.bigarray import BigArray
15
from lib.core.convert import getBytes
16
from lib.core.convert import getText
17
from lib.core.settings import VERSION
18
from thirdparty.six.moves import BaseHTTPServer as _BaseHTTPServer
19
from thirdparty.six.moves import http_client as _http_client
20
21
# Reference: https://dvcs.w3.org/hg/webperf/raw-file/tip/specs/HAR/Overview.html
22
# http://www.softwareishard.com/har/viewer/
23
24
class HTTPCollectorFactory(object):
    """
    Factory producing HTTPCollector instances
    """

    def __init__(self, harFile=False):
        # Kept on the instance only; create() does not consult it
        self.harFile = harFile

    def create(self):
        """
        Returns a brand new (empty) HTTPCollector
        """

        collector = HTTPCollector()
        return collector
30
31
class HTTPCollector(object):
    """
    Gathers raw request/response messages and renders them as a HAR log
    """

    def __init__(self):
        self.messages = BigArray()
        self.extendedArguments = {}

    def setExtendedArguments(self, arguments):
        """
        Replaces the extra arguments passed along with every pair collected
        after this call
        """

        self.extendedArguments = arguments

    def collectRequest(self, requestMessage, responseMessage, startTime=None, endTime=None):
        """
        Stores a single request/response exchange (with optional timestamps)
        """

        pair = RawPair(
            requestMessage,
            responseMessage,
            startTime=startTime,
            endTime=endTime,
            extendedArguments=self.extendedArguments,
        )
        self.messages.append(pair)

    def obtain(self):
        """
        Returns everything collected so far as a HAR (version 1.2) dictionary
        """

        creator = {"name": "sqlmap", "version": VERSION}

        entries = []
        for pair in self.messages:
            entries.append(pair.toEntry().toDict())

        return {"log": {"version": "1.2", "creator": creator, "entries": entries}}
50
51
class RawPair(object):
    """
    Unparsed request/response pair together with its timing information
    """

    def __init__(self, request, response, startTime=None, endTime=None, extendedArguments=None):
        # Both messages are kept as bytes; parsing is deferred to toEntry()
        self.request = getBytes(request)
        self.response = getBytes(response)

        self.startTime = startTime
        self.endTime = endTime

        self.extendedArguments = extendedArguments or {}

    def toEntry(self):
        """
        Parses both raw messages and wraps the result into an Entry
        """

        parsedRequest = Request.parse(self.request)
        parsedResponse = Response.parse(self.response)

        return Entry(
            request=parsedRequest,
            response=parsedResponse,
            startTime=self.startTime,
            endTime=self.endTime,
            extendedArguments=self.extendedArguments,
        )
63
64
class Entry(object):
    """
    Single HAR "entry": a parsed request/response pair plus its timing
    """

    def __init__(self, request, response, startTime, endTime, extendedArguments):
        self.request = request
        self.response = response

        # Missing timestamps are normalized to 0 so that the arithmetic in
        # toDict() never has to deal with None
        self.startTime = startTime or 0
        self.endTime = endTime or 0

        # Same normalization for the extras: a None here used to make
        # toDict() fail inside dict.update()
        self.extendedArguments = {} if extendedArguments is None else extendedArguments

    def toDict(self):
        """
        Returns the entry as a dictionary ready for serialization

        "time" is the elapsed time in milliseconds. "startedDateTime" is the
        local ISO formatted start time followed by the local UTC offset (or
        None when no start time is known). Keys found in extendedArguments
        are merged last, thus overriding the standard ones
        """

        startedDateTime = None
        if self.startTime:
            stamp = datetime.datetime.fromtimestamp(self.startTime).isoformat()
            startedDateTime = "%s%s" % (stamp, time.strftime("%z"))

        out = {
            "request": self.request.toDict(),
            "response": self.response.toDict(),
            "cache": {},
            "timings": {
                "send": -1,
                "wait": -1,
                "receive": -1,
            },
            "time": int(1000 * (self.endTime - self.startTime)),
            "startedDateTime": startedDateTime,
        }

        out.update(self.extendedArguments)

        return out
87
88
class Request(object):
    """
    Parsed HTTP request, convertible to a HAR "request" object
    """

    def __init__(self, method, path, httpVersion, headers, postBody=None, raw=None, comment=None):
        self.raw = raw
        self.method = method
        self.path = path
        self.httpVersion = httpVersion
        self.headers = headers or {}
        self.postBody = postBody

        # Surrounding whitespace is dropped only from a non-empty comment
        if comment:
            comment = comment.strip()
        self.comment = comment

    @classmethod
    def parse(cls, raw):
        """
        Builds a Request out of the raw (bytes) request message
        """

        parsed = HTTPRequest(raw)

        return cls(
            method=parsed.command,
            path=parsed.path,
            httpVersion=parsed.request_version,
            headers=parsed.headers,
            # Whatever is left after the headers is taken as the body
            postBody=parsed.rfile.read(),
            comment=parsed.comment,
            raw=raw,
        )

    @property
    def url(self):
        """
        URL assembled from the Host header ("unknown" if absent) and the path
        """

        return "http://%s%s" % (self.headers.get("Host", "unknown"), self.path)

    def toDict(self):
        """
        Returns the request as a dictionary ready for serialization

        Note: "postData" is present only for a non-empty body
        """

        headers = []
        for key, value in self.headers.items():
            headers.append(dict(name=key.capitalize(), value=value))

        out = {
            "httpVersion": self.httpVersion,
            "method": self.method,
            "url": self.url,
            "headers": headers,
            "cookies": [],
            "queryString": [],
            "headersSize": -1,
            "bodySize": -1,
            "comment": getText(self.comment),
        }

        if self.postBody:
            out["postData"] = {
                "mimeType": self.headers.get("Content-Type"),
                "text": getText(self.postBody).rstrip("\r\n"),
            }

        return out
135
136
class Response(object):
    """
    Parsed HTTP response, convertible to a HAR "response" object
    """

    # Picks "(<3-digit code> <reason>)" out of the leading log line
    extract_status = re.compile(b'\\((\\d{3}) (.*)\\)')

    def __init__(self, httpVersion, status, statusText, headers, content, raw=None, comment=None):
        self.raw = raw
        self.httpVersion = httpVersion
        self.status = status
        self.statusText = statusText
        self.headers = headers
        self.content = content
        self.comment = comment.strip() if comment else comment

    @staticmethod
    def _bodyOf(raw):
        """
        Returns the part of raw located after the first blank line (i.e. the
        message body), or empty bytes when there is no blank line at all
        """

        index = raw.find(b"\r\n\r\n")

        # Without this check a failed find() (-1) would turn into offset 3
        return raw[index + 4:] if index != -1 else b""

    @staticmethod
    def _isBinary(content):
        """
        Checks whether content holds a \\0 or \\1 character

        Note: containment is used on purpose. Iterating bytes yields integers
        in Python 3, hence per-item comparison against byte strings can never
        match there
        """

        markers = (b"\0", b"\1") if isinstance(content, bytes) else (u"\0", u"\1")

        return any(marker in content for marker in markers)

    @classmethod
    def parse(cls, raw):
        """
        Builds a Response out of the raw (bytes) response message

        A message starting with the "HTTP response [" or "HTTP redirect ["
        line has that line replaced by a regular HTTP/1.0 status line (made
        of the code and reason found inside of the parentheses), while the
        original line is kept as the comment
        """

        altered = raw
        comment = b""

        if altered.startswith(b"HTTP response [") or altered.startswith(b"HTTP redirect ["):
            stream = io.BytesIO(raw)
            first_line = stream.readline()
            parts = cls.extract_status.search(first_line)
            status_line = "HTTP/1.0 %s %s" % (getText(parts.group(1)), getText(parts.group(2)))
            remain = stream.read()
            altered = getBytes(status_line) + b"\r\n" + remain
            comment = first_line

        response = _http_client.HTTPResponse(FakeSocket(altered))
        response.begin()

        body = cls._bodyOf(raw)

        # NOTE: https://github.com/sqlmapproject/sqlmap/issues/5942
        response.length = len(body)

        try:
            content = response.read()
        except _http_client.IncompleteRead:
            content = body.rstrip(b"\r\n")

        return cls(httpVersion="HTTP/1.1" if response.version == 11 else "HTTP/1.0",
                   status=response.status,
                   statusText=response.reason,
                   headers=response.msg,
                   content=content,
                   comment=comment,
                   raw=raw)

    def toDict(self):
        """
        Returns the response as a dictionary ready for serialization

        Note: content holding a \\0 or \\1 character is stored base64 encoded
        (flagged with "encoding"), anything else is stored as text
        """

        # Empty/None content is treated as an empty body all the way down
        body = self.content or b""

        content = {
            "mimeType": self.headers.get("Content-Type"),
            "size": len(body),
        }

        if self._isBinary(body):
            content["encoding"] = "base64"
            content["text"] = getText(base64.b64encode(body))
        else:
            content["text"] = getText(body)

        return {
            "httpVersion": self.httpVersion,
            "status": self.status,
            "statusText": self.statusText,
            "headers": [dict(name=key.capitalize(), value=value) for key, value in self.headers.items() if key.lower() != "uri"],
            "cookies": [],
            "content": content,
            "headersSize": -1,
            "bodySize": -1,
            "redirectURL": "",
            "comment": getText(self.comment),
        }
207
208
class FakeSocket(object):
    """
    Minimal socket stand-in serving pre-recorded bytes through makefile()
    """

    # Original source:
    # https://stackoverflow.com/questions/24728088/python-parse-http-response-string

    def __init__(self, response_text):
        buffered = io.BytesIO(response_text)
        self._file = buffered

    def makefile(self, *args, **kwargs):
        """
        Returns the very same in-memory stream on each call (arguments are
        accepted and ignored)
        """

        return self._file
217
218
class HTTPRequest(_BaseHTTPServer.BaseHTTPRequestHandler):
    """
    Offline HTTP request parser reusing the parsing code of the standard
    library request handler (no socket or server is involved)
    """

    # Original source:
    # https://stackoverflow.com/questions/4685217/parse-raw-http-headers

    def __init__(self, request_text):
        self.comment = None
        self.rfile = io.BytesIO(request_text)
        self.raw_requestline = self.rfile.readline()

        # A leading "HTTP request [" line is not part of the HTTP message; it
        # is kept as the comment and the request line is read anew
        if self.raw_requestline.startswith(b"HTTP request ["):
            self.comment = self.raw_requestline
            self.raw_requestline = self.rfile.readline()

        self.error_code = self.error_message = None
        self.parse_request()

    def send_error(self, code, message=None, explain=None):
        """
        Records the parsing error instead of writing an error response

        Note: signature follows the one of the base class, as parse_request()
        of Python 3 passes a third (explain) argument for too long/too many
        headers, which a narrower override would reject with TypeError
        """

        self.error_code = code
        self.error_message = message
237
238