Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
epsylon
GitHub Repository: epsylon/ufonet
Path: blob/master/core/tools/abductor.py
1208 views
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
"""
4
This file is part of the UFONet project, https://ufonet.03c8.net
5
6
Copyright (c) 2013/2020 | psy <[email protected]>
7
8
You should have received a copy of the GNU General Public License along
9
with UFONet; if not, write to the Free Software Foundation, Inc., 51
10
Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
11
"""
12
import ssl, random, socket, time, re
13
import urllib.request, urllib.error, urllib.parse
14
from urllib.parse import urlparse as urlparse
15
16
# UFONet reconnaissance (abduction) class
17
class Abductor(object):
    """Reconnaissance ("abduction") helper: fetch one target URL and print a profile.

    The printed report covers resolved addresses, port, whois data, response
    size and load time, webserver banner, WAF signatures and CVE entries
    matching the banner.

    The ``ufonet`` object passed to the constructor is used for its
    ``agents``, ``referer``, ``options.proxy`` and ``extract_proxy()``
    members; its ``user_agent`` attribute is overwritten on every request.
    """

    _NOT_FOUND = "NOT found!"
    _NO_WAF = "FIREWALL NOT PRESENT (or not discovered yet)! ;-)\n"
    _SIZE_UNITS = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
    _CVE_LINK_RE = re.compile(r'<td valign="top" nowrap="nowrap"><a href="(.+?)">')

    def __init__(self, ufonet):
        self.ufonet = ufonet
        self.start = None  # timestamp taken right before the request is sent
        self.stop = None   # timestamp taken after the reply has been read
        self.port = None   # port of the last analysed target, as a string
        # Certificate validation is disabled on purpose: targets are profiled
        # regardless of the state of their TLS certificate.
        self.ctx = ssl.create_default_context()
        self.ctx.check_hostname = False
        self.ctx.verify_mode = ssl.CERT_NONE

    def proxy_transport(self, proxy):
        """Build, install globally and return an opener that uses ``proxy``.

        The opener also carries an HTTPS handler bound to ``self.ctx`` so that
        callers can use it directly. urlopen() builds a fresh opener whenever
        a ``context`` is passed, so an opener that is only installed with
        install_opener() is not used for such calls.
        """
        proxy_url = self.ufonet.extract_proxy(proxy)
        # NOTE(review): only the 'https' scheme is mapped, as in the original;
        # plain http:// targets are not sent through this proxy. Confirm
        # whether that is intended.
        opener = urllib.request.build_opener(
            urllib.request.ProxyHandler({'https': proxy_url}),
            urllib.request.HTTPSHandler(context=self.ctx),
        )
        urllib.request.install_opener(opener)
        return opener

    def _build_opener(self):
        """Return an opener honouring ``self.ctx`` and the configured proxy, if any."""
        proxy = self.ufonet.options.proxy
        if proxy:
            return self.proxy_transport(proxy)
        return urllib.request.build_opener(urllib.request.HTTPSHandler(context=self.ctx))

    def _request_headers(self):
        """Pick a random user-agent, store it on ``ufonet`` and return the request headers."""
        self.ufonet.user_agent = random.choice(self.ufonet.agents).strip()  # shuffle user-agent
        return {'User-Agent': self.ufonet.user_agent, 'Referer': self.ufonet.referer}

    def establish_connection(self, target):
        """GET ``target`` and return ``(body_text, headers)``, or ``None`` on failure.

        ``headers`` is the list of ``(name, value)`` tuples of the reply.
        ``self.start`` / ``self.stop`` are set around the request so the caller
        can compute the load time. Any failure prints an error and yields
        ``None`` instead of raising.
        """
        # The original carried a trailing-slash "normalisation" here that
        # never changed the target (its result was discarded); it is dropped.
        headers = self._request_headers()
        try:
            req = urllib.request.Request(target, None, headers)
            opener = self._build_opener()
            self.start = time.time()
            with opener.open(req) as response:  # closes the connection on exit
                header = response.getheaders()
                raw = response.read()
            self.stop = time.time()
            # Undecodable bytes are replaced rather than treated as a
            # connection failure.
            target_reply = raw.decode('utf-8', errors='replace')
        except Exception:  # best-effort: every network/parse failure is reported the same way
            print('[Error] [AI] Unable to connect -> [Exiting!]\n')
            return None
        return target_reply, header

    def convert_size(self, size):
        """Return ``size`` (a byte count) as a human-readable string such as ``'1.5 KB'``."""
        if size == 0:
            return '0B'
        value = float(size)
        index = 0
        # Repeated division by 1024 is exact in binary floating point, unlike
        # deriving the unit index from a floating-point logarithm.
        while value >= 1024 and index < len(self._SIZE_UNITS) - 1:
            value /= 1024
            index += 1
        return '%s %s' % (round(value, 2), self._SIZE_UNITS[index])

    def convert_time(self, time):
        """Format a duration in seconds with two decimals."""
        return '%.2f' % time

    def extract_banner(self, header):  # extract webserver banner
        """Return ``(banner, via)`` taken from the ``Server`` and ``Via`` headers.

        ``header`` is an iterable of ``(name, value)`` pairs. Names are compared
        case-insensitively; when a header is repeated the last value wins.
        Missing values are reported as ``"NOT found!"``.
        """
        banner = None
        via = None
        try:
            for name, value in header or ():
                key = str(name).lower()
                if key == "server":
                    banner = value
                elif key == "via":
                    via = value
        except (TypeError, ValueError):  # header is not a sequence of pairs
            pass
        return banner or self._NOT_FOUND, via or self._NOT_FOUND

    def extract_whois(self, domain):  # extract whois data from target domain
        """Print whois data for ``domain``; silently does nothing when unavailable."""
        try:
            import whois  # optional third-party dependency
            d = whois.query(domain, ignore_returncode=True)  # ignore return code
            if d.creation_date is None:  # return when no creation date
                return
            print(" -Registrant : " + str(d.registrar))
            print(" -Creation date: " + str(d.creation_date))
            print(" -Expiration : " + str(d.expiration_date))
            print(" -Last update : " + str(d.last_updated))
        except Exception:  # best-effort: missing module, failed query or empty result
            return

    @classmethod
    def _parse_cve_reply(cls, target_reply):
        """Return the CVE links found in a search result page, or ``"NOT found!"``."""
        if "<b>0</b> CVE entries" in target_reply:  # marker for: no CVE records found
            return cls._NOT_FOUND
        return cls._CVE_LINK_RE.findall(target_reply)

    def extract_cve(self, banner):  # extract vulnerabilities related with webserver banner from CVE database
        """Search cve.mitre.org for ``banner``.

        Returns a list of relative CVE links, ``"NOT found!"`` when the page
        reports zero entries, or ``None`` when the query fails or the reply
        is empty.
        """
        query = urllib.parse.urlencode({'keyword': str(banner)})
        target = 'https://cve.mitre.org/cgi-bin/cvekey.cgi?' + query
        try:
            req = urllib.request.Request(target, None, self._request_headers())
            with self._build_opener().open(req) as response:
                target_reply = response.read().decode('utf-8', errors='replace')
        except Exception:  # best-effort lookup: any failure means "no data"
            return None
        if target_reply == "":  # no records found
            return None
        return self._parse_cve_reply(target_reply)

    @classmethod
    def _match_waf(cls, wafs, banner, target_reply):
        """Return ``"VENDOR -> <name>"`` for the first matching signature line.

        Each usable line has the form ``signature##vendor``. Lines without the
        separator or with an empty signature are skipped (an empty signature
        would match every reply). When nothing matches, the "not present"
        message is returned.
        """
        haystacks = (str(target_reply or ""), str(banner or ""))
        for line in wafs:
            parts = line.split("##")
            if len(parts) < 2 or not parts[0]:
                continue
            signature, vendor = parts[0], parts[1]
            if any(signature in text for text in haystacks):
                return "VENDOR -> " + vendor.strip()
        return cls._NO_WAF

    def waf_detection(self, banner, target_reply):
        """Look for known WAF signatures in ``banner`` and ``target_reply``.

        Signatures are read from ``core/txt/wafs.txt`` (relative to the current
        working directory); an unreadable file is treated as an empty list.
        """
        self.wafs_file = "core/txt/wafs.txt"  # set source path to retrieve 'wafs'
        try:
            with open(self.wafs_file) as f:
                wafs = f.readlines()
        except (OSError, UnicodeError):
            wafs = []
        return self._match_waf(wafs, banner, target_reply)

    @staticmethod
    def _split_target(target):
        """Return ``(domain, port)`` for a target URL.

        ``domain`` is the host name without a leading ``www.`` (``"OFF"`` when
        the URL has no host). ``port`` is a string: the explicit port of the
        URL when present, otherwise 80/443 according to the scheme, or ``None``
        for any other scheme.
        """
        parsed = urlparse(target)
        domain = parsed.hostname or ""
        if domain.startswith("www."):
            domain = domain[len("www."):]  # strip the prefix only, not inner matches
        try:
            explicit_port = parsed.port
        except ValueError:  # malformed port in the URL
            explicit_port = None
        if explicit_port:
            port = str(explicit_port)
        elif parsed.scheme == "https":
            port = "443"
        elif parsed.scheme == "http":
            port = "80"
        else:
            port = None
        return domain or "OFF", port

    def _resolve_ipv4(self, domain):
        """Resolve ``domain`` to an IPv4 address string, or ``"OFF"`` when it fails."""
        try:
            return socket.gethostbyname(domain)
        except (OSError, UnicodeError):
            pass
        # Fallback: ask public resolvers directly through dnspython, if installed.
        try:
            import dns.resolver
            resolver = dns.resolver.Resolver()
            resolver.nameservers = ['8.8.8.8', '8.8.4.4']  # google DNS resolvers
            # dnspython 2.x renamed query() to resolve(); support both.
            lookup = getattr(resolver, "resolve", None) or resolver.query
            records = [str(rd) for rd in lookup(domain, "A")]  # A record
        except Exception:  # missing module or failed lookup
            return "OFF"
        return records[-1] if records else "OFF"

    def _resolve_ipv6(self, domain, port):
        """Resolve ``domain`` to an IPv6 address string, or ``"OFF"`` when it fails."""
        try:
            # Keyword arguments: the 4th positional parameter of getaddrinfo()
            # is the socket *type*, not the protocol.
            infos = socket.getaddrinfo(domain, port, family=socket.AF_INET6,
                                       proto=socket.IPPROTO_TCP)
            return infos[0][4][0]
        except (OSError, UnicodeError, IndexError):
            return "OFF"

    def abducting(self, target):
        """Fetch ``target`` and print the full reconnaissance report.

        Returns ``None``; aborts early (after printing an error) when the
        target cannot be fetched or replies with an empty body.
        """
        result = self.establish_connection(target)
        if not result or not result[0]:
            print("[Error] [AI] Something wrong connecting to your target -> [Aborting!]\n")
            return
        target_reply, header = result
        print(' -Target URL:', target, "\n")
        domain, self.port = self._split_target(target)
        ipv4 = self._resolve_ipv4(domain)
        ipv6 = self._resolve_ipv6(domain, int(self.port) if self.port else None)
        print(' -IP :', ipv4)
        print(' -IPv6 :', ipv6)
        print(' -Port :', self.port)
        print(' \n -Domain:', domain)
        self.extract_whois(domain)
        size = self.convert_size(len(target_reply))
        load = self.convert_time(self.stop - self.start)
        banner, via = self.extract_banner(header)
        print('\n---------')
        print("\nTrying single visit broadband test (using GET)...\n")
        print(' -Bytes in :', size)
        print(' -Load time:', load, "seconds\n")
        print('---------')
        print("\nDetermining webserver fingerprint (note that this value can be a fake)...\n")
        print(' -Banner:', banner)
        print(' -Vía :', via, "\n")
        print('---------')
        print("\nSearching for extra Anti-DDoS protections...\n")
        waf = self.waf_detection(banner, target_reply)
        print(' -WAF/IDS: ' + waf)
        if 'VENDOR' in waf:
            print(' -NOTICE : This FIREWALL probably is using Anti-(D)DoS measures!', "\n")
        print('---------')
        if banner != self._NOT_FOUND:
            print("\nSearching at CVE (https://cve.mitre.org) for vulnerabilities...\n")
            cve = self.extract_cve(banner)
            if cve is None:
                print(' -Last Reports: NOT found!', "\n")
            elif cve == self._NOT_FOUND:
                print(' -Last Reports:', cve)
            else:
                print(' -Last Reports:')
                for c in cve[:10]:  # only the first ten entries are listed
                    cve_info = c.replace("/cgi-bin/cvename.cgi?name=", "")
                    print("\n +", cve_info, "->", "https://cve.mitre.org" + c)
            print('\n---------')
        print("\n[Info] [AI] Abduction finished! -> [OK!]\n")
259
260