Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
malwaredllc
GitHub Repository: malwaredllc/byob
Path: blob/master/web-gui/tests/unit/test_session_api.py
1292 views
1
import pytest
2
from hashlib import md5
3
from datetime import datetime
4
from random import getrandbits
5
from buildyourownbotnet import c2
6
from buildyourownbotnet.core.dao import session_dao
7
from buildyourownbotnet.server import SessionThread
8
from ..conftest import app_client, new_user, login, cleanup
9
10
11
def test_api_session_new(app_client, new_user):
    """
    Given a user,
    when a POST request is sent to /api/session/new endpoint with valid session parameters,
    check that the request succeeds and the session metadata is returned as JSON.

    Only the HTTP response is inspected here: every key/value submitted in the
    request must be echoed back unchanged in the returned JSON object.
    The database is not queried directly by this test.
    """
    session_dict = {
        "id": 1,
        "online": True,
        "public_ip": '1.2.3.4',
        "local_ip": '192.1.1.168',
        "mac_address": '00:0A:95:9D:68:16',
        "username": 'test_user',
        "administrator": True,
        "platform": 'linux2',
        "device": 'test_device',
        "architecture": 'x32',
        "latitude": 0.00,
        "longitude": 0.00,
        "owner": new_user.username
    }
    try:
        res = app_client.post('/api/session/new', json=session_dict)
        assert res.status_code == 200

        session_metadata = res.json
        assert isinstance(session_metadata, dict)
        for key, val in session_dict.items():
            assert session_metadata.get(key) == val
    finally:
        # Run cleanup even when an assertion above fails; otherwise a failing
        # run would skip it entirely.
        # NOTE(review): cleanup() comes from conftest - presumably it removes
        # the records this test created; confirm against conftest.
        cleanup()
41
42
def test_api_session_remove(app_client, new_user, new_session):
    """
    Given a logged-in user who owns a session,
    when a POST request is sent to /api/session/remove with that session's UID,
    check that the session can no longer be fetched through session_dao
    and is no longer listed under the user's entry in c2.sessions.
    """
    login(app_client, new_user.username, 'test_password')

    # Keep our own copies: new_session is expected to be deleted by the request.
    uid = new_session.uid
    owner = new_user.username

    # Register a stand-in session thread (no real connection) for this owner
    # in the C2's in-memory session table.
    fake_thread = SessionThread(id=1, c2=c2, connection=None)
    fake_thread.info = dict(new_session.serialize())
    c2.sessions[owner] = {uid: fake_thread}

    form_headers = {"Content-Type": "application/x-www-form-urlencoded"}
    response = app_client.post(
        '/api/session/remove',
        data={'session_uid': uid},
        headers=form_headers,
        follow_redirects=True,
    )

    assert response.status_code == 200
    assert session_dao.get_session(uid) is None
    assert uid not in c2.sessions[owner]
66
67
def test_api_session_remove_invalid(app_client, new_user, new_session):
    """
    Given a user and a session,
    when a POST request is sent to /api/session/remove with an invalid session UID,
    check that the request still completes with HTTP 200
    and that the user's existing session is not removed from the database.
    """
    login(app_client, new_user.username, 'test_password')

    # remember the real session's uid so it can be looked up after the request
    existing_uid = new_session.uid

    # invalid uid
    res = app_client.post('/api/session/remove',
            data={'session_uid': '123'},
            follow_redirects=True,
            headers = {"Content-Type":"application/x-www-form-urlencoded"}
    )
    assert res.status_code == 200

    # a request naming an unknown uid must leave the existing session in place
    assert session_dao.get_session(existing_uid) is not None
82
83
def test_api_session_remove_unauthenticated(app_client, new_user, new_session):
    """
    Given an existing session and a client that has not logged in,
    when a POST request is sent to /api/session/remove for that session,
    check that the response status is HTTP 403 and that the session
    can still be fetched through session_dao afterwards.
    """
    # No login() call here: the request is deliberately sent unauthenticated.
    form_headers = {"Content-Type": "application/x-www-form-urlencoded"}
    payload = {'session_uid': new_session.uid}

    response = app_client.post(
        '/api/session/remove',
        data=payload,
        headers=form_headers,
        follow_redirects=True,
    )

    assert response.status_code == 403

    still_stored = session_dao.get_session(new_session.uid)
    assert still_stored is not None
96
97
98
def test_api_session_poll(app_client, new_user, new_session):
    """
    Given an authenticated user with at least 1 session,
    when a GET request is sent to /api/session/poll,
    check that any new sessions' metadata is returned in JSON format,
    and that a second poll no longer returns the same session.
    """
    login(app_client, new_user.username, 'test_password')

    # Snapshot the fixture's metadata before polling so the response can be
    # compared against an independent source rather than against itself.
    expected_metadata = dict(new_session.serialize())

    # check valid response
    res = app_client.get("/api/session/poll")
    assert res.status_code == 200

    # check correct data type returned with correct number of new sessions
    sessions_list = res.json
    assert isinstance(sessions_list, list)
    assert len(sessions_list) == 1

    # check session metadata is accurate
    session_metadata = sessions_list[0]
    for key, val in expected_metadata.items():
        # Only JSON-native scalars survive a JSON round trip unchanged.
        # NOTE(review): any other value types in serialize() (e.g. datetimes)
        # would be re-encoded by the JSON layer, so they are skipped here -
        # confirm which fields serialize() actually returns.
        if val is None or isinstance(val, (str, int, float, bool)):
            assert session_metadata.get(key) == val

    # check subsequent polls don't return the same old session
    res = app_client.get("/api/session/poll")
    assert res.status_code == 200

    # check correct data type returned with correct number of new sessions
    sessions_list = res.json
    assert isinstance(sessions_list, list)
    assert len(sessions_list) == 0
129