Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sqlmapproject
GitHub Repository: sqlmapproject/sqlmap
Path: blob/master/lib/utils/sqlalchemy.py
2989 views
1
#!/usr/bin/env python
2
3
"""
4
Copyright (c) 2006-2025 sqlmap developers (https://sqlmap.org)
5
See the file 'LICENSE' for copying permission
6
"""
7
8
import importlib
9
import logging
10
import os
11
import re
12
import sys
13
import traceback
14
import warnings
15
16
_path = list(sys.path)
17
_sqlalchemy = None
18
try:
19
sys.path = sys.path[1:]
20
module = importlib.import_module("sqlalchemy")
21
if hasattr(module, "dialects"):
22
_sqlalchemy = module
23
warnings.simplefilter(action="ignore", category=_sqlalchemy.exc.SAWarning)
24
except:
25
pass
26
finally:
27
sys.path = _path
28
29
try:
30
import MySQLdb # used by SQLAlchemy in case of MySQL
31
warnings.filterwarnings("error", category=MySQLdb.Warning)
32
except (ImportError, AttributeError):
33
pass
34
35
from lib.core.data import conf
36
from lib.core.data import logger
37
from lib.core.exception import SqlmapConnectionException
38
from lib.core.exception import SqlmapFilePathException
39
from lib.core.exception import SqlmapMissingDependence
40
from plugins.generic.connector import Connector as GenericConnector
41
from thirdparty import six
42
from thirdparty.six.moves import urllib as _urllib
43
44
def getSafeExString(ex, encoding=None): # Cross-referenced function
45
raise NotImplementedError
46
47
class SQLAlchemy(GenericConnector):
48
def __init__(self, dialect=None):
49
GenericConnector.__init__(self)
50
51
self.dialect = dialect
52
self.address = conf.direct
53
54
if conf.dbmsUser:
55
self.address = self.address.replace("'%s':" % conf.dbmsUser, "%s:" % _urllib.parse.quote(conf.dbmsUser))
56
self.address = self.address.replace("%s:" % conf.dbmsUser, "%s:" % _urllib.parse.quote(conf.dbmsUser))
57
58
if conf.dbmsPass:
59
self.address = self.address.replace(":'%s'@" % conf.dbmsPass, ":%s@" % _urllib.parse.quote(conf.dbmsPass))
60
self.address = self.address.replace(":%s@" % conf.dbmsPass, ":%s@" % _urllib.parse.quote(conf.dbmsPass))
61
62
if self.dialect:
63
self.address = re.sub(r"\A.+://", "%s://" % self.dialect, self.address)
64
65
def connect(self):
66
if _sqlalchemy:
67
self.initConnection()
68
69
try:
70
if not self.port and self.db:
71
if not os.path.exists(self.db):
72
raise SqlmapFilePathException("the provided database file '%s' does not exist" % self.db)
73
74
_ = self.address.split("//", 1)
75
self.address = "%s////%s" % (_[0], os.path.abspath(self.db))
76
77
if self.dialect == "sqlite":
78
engine = _sqlalchemy.create_engine(self.address, connect_args={"check_same_thread": False})
79
elif self.dialect == "oracle":
80
engine = _sqlalchemy.create_engine(self.address)
81
else:
82
engine = _sqlalchemy.create_engine(self.address, connect_args={})
83
84
self.connector = engine.connect()
85
except (TypeError, ValueError):
86
if "_get_server_version_info" in traceback.format_exc():
87
try:
88
import pymssql
89
if int(pymssql.__version__[0]) < 2:
90
raise SqlmapConnectionException("SQLAlchemy connection issue (obsolete version of pymssql ('%s') is causing problems)" % pymssql.__version__)
91
except ImportError:
92
pass
93
elif "invalid literal for int() with base 10: '0b" in traceback.format_exc():
94
raise SqlmapConnectionException("SQLAlchemy connection issue ('https://bitbucket.org/zzzeek/sqlalchemy/issues/3975')")
95
else:
96
pass
97
except SqlmapFilePathException:
98
raise
99
except Exception as ex:
100
raise SqlmapConnectionException("SQLAlchemy connection issue ('%s')" % getSafeExString(ex))
101
102
self.printConnected()
103
else:
104
raise SqlmapMissingDependence("SQLAlchemy not available (e.g. 'pip%s install SQLAlchemy')" % ('3' if six.PY3 else ""))
105
106
def fetchall(self):
107
try:
108
retVal = []
109
for row in self.cursor.fetchall():
110
retVal.append(tuple(row))
111
return retVal
112
except _sqlalchemy.exc.ProgrammingError as ex:
113
logger.log(logging.WARN if conf.dbmsHandler else logging.DEBUG, "(remote) %s" % getSafeExString(ex))
114
return None
115
116
def execute(self, query):
117
retVal = False
118
119
# Reference: https://stackoverflow.com/a/69491015
120
if hasattr(_sqlalchemy, "text"):
121
query = _sqlalchemy.text(query)
122
123
try:
124
self.cursor = self.connector.execute(query)
125
retVal = True
126
except (_sqlalchemy.exc.OperationalError, _sqlalchemy.exc.ProgrammingError) as ex:
127
logger.log(logging.WARN if conf.dbmsHandler else logging.DEBUG, "(remote) %s" % getSafeExString(ex))
128
except _sqlalchemy.exc.InternalError as ex:
129
raise SqlmapConnectionException(getSafeExString(ex))
130
131
return retVal
132
133
def select(self, query):
134
retVal = None
135
136
if self.execute(query):
137
retVal = self.fetchall()
138
139
return retVal
140
141