Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
malwaredllc
GitHub Repository: malwaredllc/byob
Path: blob/master/web-gui/tests/unit/test_dao.py
1292 views
1
import pytest
2
from hashlib import md5
3
from random import getrandbits
4
from datetime import datetime
5
6
from buildyourownbotnet.core.dao import user_dao, session_dao, task_dao, payload_dao, file_dao
7
from buildyourownbotnet.models import db, bcrypt, User, Payload, Session, Task, ExfiltratedFile
8
from ..conftest import app_client, new_user, new_session
9
10
def test_add_user(app_client):
    """
    Given a username and hashed password,
    when the user_dao.add_user method is called,
    check the user data is added to the database correctly.
    """
    test_username = 'test_user'
    test_password = 'test_password'
    # hash outside the try block so a bcrypt failure is not reported as an add_user failure
    test_hashed_password = bcrypt.generate_password_hash(test_password).decode('utf-8')

    try:
        user = user_dao.add_user(username=test_username, hashed_password=test_hashed_password)
    except Exception as e:
        pytest.fail("user_dao.add_user returned exception: " + str(e))

    try:
        assert user.username == test_username
        assert user.password == test_hashed_password
        assert bcrypt.check_password_hash(user.password, test_password)
    finally:
        # clean up even when an assertion fails, so the test user is not left behind
        User.query.delete()
        db.session.commit()
30
31
def test_get_user(app_client, new_user):
    """
    Given a user id or a username,
    when user_dao.get_user is called with either identifier,
    check that a user record is fetched from the database.
    """
    # lookup by primary key
    found_by_id = user_dao.get_user(user_id=new_user.id)
    assert found_by_id is not None

    # lookup by username
    found_by_name = user_dao.get_user(username=new_user.username)
    assert found_by_name is not None
39
40
def test_get_user_sessions(app_client, new_user):
    """
    Given a user,
    when session_dao.get_user_sessions is called,
    check that user sessions are returned from the database correctly.
    """
    # valid user: the fixture user is expected to have no sessions
    sessions_for_user = session_dao.get_user_sessions(new_user.id)
    assert len(sessions_for_user) == 0

    # invalid user: -1 is used as a non-existent user id
    sessions_for_unknown = session_dao.get_user_sessions(-1)
    assert len(sessions_for_unknown) == 0
51
52
def test_get_user_sessions_new(app_client, new_session):
    """
    Given a user who owns a session,
    when session_dao.get_user_sessions_new is called,
    check the user's new sessions are fetched and that the 'new' attribute
    of each of the user's sessions is False afterwards.
    """
    # look up the user who owns the fixture session
    owner = user_dao.get_user(username=new_session.owner)
    assert owner is not None

    # fetch the user's new sessions; at least one must come back
    fetched_sessions = session_dao.get_user_sessions_new(owner.id)
    assert len(fetched_sessions) > 0

    # every session of the owner must now have 'new' set to False
    for owned_session in owner.sessions:
        assert owned_session.new is False
66
67
def _build_test_session_dict(owner, public_ip, uid=None):
    """
    Build the session metadata dict passed to session_dao.handle_session.

    `owner` and `public_ip` fill the matching keys; every other field is a
    fixed test value. The 'uid' key is only present when `uid` is given.
    """
    session_dict = {} if uid is None else {"uid": uid}
    session_dict.update({
        "online": True,
        "joined": datetime.utcnow(),
        "last_online": datetime.utcnow(),
        "public_ip": public_ip,
        "local_ip": '192.1.1.168',
        "mac_address": '00:0A:95:9D:68:16',
        "username": 'test_user',
        "administrator": True,
        "platform": 'linux2',
        "device": 'test_device',
        "architecture": 'x32',
        "latitude": 0.00,
        "longitude": 0.00,
        "owner": owner,
    })
    return session_dict


def _assert_test_session_stored(uid, owner, public_ip):
    """
    Fetch the session with `uid` via session_dao.get_session and check each
    stored field against the values written by _build_test_session_dict.
    """
    session = session_dao.get_session(uid)
    assert session is not None
    assert session.owner == owner
    assert session.uid == uid
    assert session.online is True
    # total_seconds() instead of .seconds: .seconds drops the days component,
    # so a timestamp that is days old could still pass the freshness check
    assert (datetime.utcnow() - session.joined).total_seconds() <= 5
    assert (datetime.utcnow() - session.last_online).total_seconds() <= 5
    assert session.public_ip == public_ip
    assert session.local_ip == '192.1.1.168'
    assert session.mac_address == '00:0A:95:9D:68:16'
    assert session.username == 'test_user'
    assert session.administrator is True
    assert session.platform == 'linux2'
    assert session.device == 'test_device'
    assert session.architecture == 'x32'
    assert session.longitude == 0.00
    assert session.latitude == 0.00


def test_handle_session(app_client, new_user):
    """
    Given a new user,
    when a session is created via the session_dao.handle_session function,
    then check the session metadata is stored in the database correctly.

    Two cases are covered: a session dict without a 'uid' key and a session
    dict that already carries a 'uid'.
    """
    # 1. add test session (without uid)
    input_session_dict = _build_test_session_dict(new_user.username, '1.2.3.4')
    try:
        output_session_dict = session_dao.handle_session(input_session_dict)
    except Exception as e:
        pytest.fail("dao.handle_session exception handling new session: " + str(e))

    # check server assigned uid
    assert 'uid' in output_session_dict
    _assert_test_session_stored(output_session_dict['uid'], new_user.username, '1.2.3.4')

    # 2. add test session (with uid)
    # Encode the random number as text before hashing: in Python 3, bytes(int)
    # yields that many zero bytes rather than an encoding of the number.
    uid = md5(str(getrandbits(64)).encode('utf-8')).hexdigest()
    input_session_dict = _build_test_session_dict(new_user.username, '5.6.7.8', uid=uid)
    try:
        output_session_dict = session_dao.handle_session(input_session_dict)
    except Exception as e:
        pytest.fail("dao.handle_session exception handling existing session: " + str(e))

    _assert_test_session_stored(uid, new_user.username, '5.6.7.8')
159
160
def test_delete_session(app_client, new_session):
    """
    Given a session,
    when session_dao.delete_session is called with its uid,
    check that the session can no longer be fetched from the database.
    """
    try:
        session_dao.delete_session(new_session.uid)
    except Exception as err:
        pytest.fail("session_dao.delete_session returned an exception: " + str(err))
    else:
        # a lookup after deletion must come back empty
        remaining = session_dao.get_session(new_session.uid)
        assert remaining is None
171
172
def test_handle_task(app_client, new_session):
    """
    Given a session,
    when the task_dao.handle_task method is called from a session,
    check the new task is issued a UID, an issued timestamp,
    and the metadata is stored in the database correctly.
    """
    # 1. test new task
    input_task_dict = {
        "session": new_session.uid,
        "task": "whoami",
    }
    try:
        output_task_dict = task_dao.handle_task(input_task_dict)
    except Exception as e:
        pytest.fail("dao.handle_task exception handling new task: " + str(e))

    # run tests
    tasks = task_dao.get_session_tasks(new_session.uid)
    assert len(tasks) == 1

    task = task_dao.get_task(output_task_dict['uid'])
    # fail with a clear assertion rather than an AttributeError if nothing was stored
    assert task is not None
    assert len(task.uid) == 32
    assert task.session == new_session.uid
    assert task.task == 'whoami'
    # total_seconds() instead of .seconds: .seconds drops the days component,
    # so a timestamp that is days old could still pass the freshness check
    assert (datetime.utcnow() - task.issued).total_seconds() <= 2
197
198
def test_handle_completed_task(app_client, new_session):
    """
    Given a session,
    when the task_dao.handle_task method is called for a completed task,
    ensure the existing task metadata is updated correctly in the database.
    """
    # issue test task
    input_task_dict = {
        "session": new_session.uid,
        "task": "whoami",
    }
    output_task_dict = task_dao.handle_task(input_task_dict)

    # complete test task by sending the issued task back with a result attached
    output_task_dict['result'] = 'test_result'
    try:
        completed_task_dict = task_dao.handle_task(output_task_dict)
    except Exception as e:
        pytest.fail("dao.handle_task exception handling completed task: " + str(e))

    # run tests
    assert 'uid' in completed_task_dict
    task = task_dao.get_task(output_task_dict['uid'])
    # fail with a clear assertion rather than an AttributeError if nothing was stored
    assert task is not None
    assert task.result == 'test_result'
    assert task.completed is not None
    # total_seconds() instead of .seconds: .seconds drops the days component,
    # so a timestamp that is days old could still pass the freshness check
    assert (datetime.utcnow() - task.completed).total_seconds() <= 5
224
225
def test_handle_invalid_task(app_client):
    """
    Given input that is not a task dictionary,
    when task_dao.handle_task is called with it,
    check that no exception escapes and a dict with an error result is returned.
    """
    bad_input = 'invalid task - not a dict'
    try:
        response = task_dao.handle_task(bad_input)
    except Exception as err:
        pytest.fail("dao.handle_task exception handling invalid task: " + str(err))
    else:
        # the handler must answer with a dict whose 'result' mentions the error
        assert isinstance(response, dict)
        assert 'result' in response
        assert 'Error' in response['result']
238
239
def test_update_session_status(app_client, new_session):
    """
    Given a session,
    when the session_dao.update_session_status is called,
    check that the 'online' attribute of session metadata is correctly updated in the database.
    """
    # toggle online/offline status
    new_status = not new_session.online
    session_dao.update_session_status(new_session.uid, new_status)

    # check if it was updated correctly
    session = session_dao.get_session(new_session.uid)
    assert session is not None
    assert session.online == new_status
254
255
def test_add_user_payload(app_client, new_user):
    """
    Given a user,
    when the payload_dao.add_user_payload method is called,
    check that the payload metadata is added to the database correctly.
    """
    try:
        payload = payload_dao.add_user_payload(new_user.id, 'test.py', 'nix', 'x32')
    except Exception as e:
        pytest.fail("payload_dao.add_user_payload returned exception: " + str(e))

    try:
        assert payload.owner == new_user.username
        assert payload.filename == 'test.py'
        assert payload.operating_system == 'nix'
        assert payload.architecture == 'x32'
    finally:
        # clean up even when an assertion fails, so the payload row is not left behind
        Payload.query.delete()
        db.session.commit()
273
274
def test_get_user_payloads(app_client, new_user):
    """
    Given a user,
    when the payload_dao.get_user_payloads method is called,
    check that all user payload metadata are returned from the database correctly.
    """
    # add test payload (setup; kept out of the try block so a setup failure
    # is not reported as a get_user_payloads failure)
    payload_dao.add_user_payload(new_user.id, 'test.py', 'nix', 'x32')

    try:
        payloads = payload_dao.get_user_payloads(new_user.id)
    except Exception as e:
        pytest.fail("payload_dao.get_user_payloads returned exception: " + str(e))

    try:
        assert len(payloads) != 0
    finally:
        # clean up even when the assertion fails, so the payload row is not left behind
        Payload.query.delete()
        db.session.commit()
291
292
def test_add_user_file(app_client, new_user, new_session):
    """
    Given a user,
    when the file_dao.add_user_file method is called,
    check the file metadata is added to the database correctly.
    """
    try:
        test_file = file_dao.add_user_file(new_user.username, 'test.txt', new_session.public_ip, 'test_module')
    except Exception as e:
        pytest.fail("file_dao.add_user_file returned exception: " + str(e))

    try:
        assert test_file is not None
        assert test_file.owner == new_user.username
        # the session's public IP is what was passed as the file's session value above
        assert test_file.session == new_session.public_ip
        assert test_file.module == 'test_module'
        assert test_file.filename == 'test.txt'
    finally:
        # clean up even when an assertion fails, so the file row is not left behind
        ExfiltratedFile.query.delete()
        db.session.commit()
311
312
def test_get_user_files(app_client, new_user, new_session):
    """
    Given a user and session,
    when the file_dao.get_user_files method is called,
    check that all metadata for user's files is returned from the database correctly.
    """
    # add test file (setup; kept out of the try block so a setup failure
    # is not reported as a get_user_files failure)
    file_dao.add_user_file(new_user.username, 'test.txt', new_session.public_ip, 'test_module')

    try:
        files = file_dao.get_user_files(new_user.id)
    except Exception as e:
        pytest.fail("file_dao.get_user_files returned exception: " + str(e))

    try:
        assert len(files) != 0
    finally:
        # clean up even when the assertion fails, so the file row is not left behind
        ExfiltratedFile.query.delete()
        db.session.commit()
329