Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
microsoft
GitHub Repository: microsoft/cyberbattlesim
Path: blob/main/cyberbattle/simulation/commandcontrol.py
960 views
1
# Copyright (c) Microsoft Corporation.
2
# Licensed under the MIT License.
3
4
"""A 'Command & control'-like interface exposing to a human player
5
the attacker view and actions of the game.
6
This includes commands to visualize the part of the environment
7
that were explored so far, and for each node where the attacker client
8
is installed, execute actions on the machine.
9
"""
10
11
import networkx as nx
12
from typing import List, Optional, Dict, Union, Tuple, Set
13
from plotly.graph_objects import Scatter, Figure, layout # type: ignore
14
15
from . import model, actions
16
17
18
class CommandControl:
19
"""The Command and Control interface to the simulation.
20
21
This represents a server that centralize information and secrets
22
retrieved from the individual clients running on the network nodes.
23
"""
24
25
# Global list aggregating all credentials gathered so far, from any node in the network
26
__gathered_credentials: Set[model.CachedCredential]
27
_actuator: actions.AgentActions
28
__environment: model.Environment
29
__total_reward: float
30
31
def __init__(self, environment_or_actuator: Union[model.Environment, actions.AgentActions]):
32
if isinstance(environment_or_actuator, model.Environment):
33
self.__environment = environment_or_actuator
34
self._actuator = actions.AgentActions(self.__environment, throws_on_invalid_actions=True)
35
elif isinstance(environment_or_actuator, actions.AgentActions):
36
self.__environment = environment_or_actuator._environment
37
self._actuator = environment_or_actuator
38
else:
39
raise ValueError("Invalid type: expecting Union[model.Environment, actions.AgentActions])")
40
41
self.__gathered_credentials = set()
42
self.__total_reward = 0
43
44
def __save_credentials(self, outcome: model.VulnerabilityOutcome) -> None:
45
"""Save credentials obtained from exploiting a vulnerability"""
46
if isinstance(outcome, model.LeakedCredentials):
47
self.__gathered_credentials.update(outcome.credentials)
48
return
49
50
def __accumulate_reward(self, reward: actions.Reward) -> None:
51
"""Accumulate new reward"""
52
self.__total_reward += reward
53
54
def total_reward(self) -> actions.Reward:
55
"""Return the current accumulated reward"""
56
return self.__total_reward
57
58
def list_nodes(self) -> List[actions.DiscoveredNodeInfo]:
59
"""Returns the list of nodes ID that were discovered or owned by the attacker."""
60
return self._actuator.list_nodes()
61
62
def get_node_color(self, node_info: model.NodeInfo) -> str:
63
if node_info.agent_installed:
64
return "red"
65
else:
66
return "green"
67
68
def plot_nodes(self) -> None:
69
"""Plot the sub-graph of nodes either so far
70
discovered (their ID is knowned by the agent)
71
or owned (i.e. where the attacker client is installed)."""
72
discovered_nodes = [node_id for node_id, _ in self._actuator.discovered_nodes()]
73
sub_graph = self.__environment.network.subgraph(discovered_nodes)
74
nx.draw(
75
sub_graph,
76
with_labels=True,
77
node_color=[self.get_node_color(self.__environment.get_node(i)) for i in sub_graph.nodes],
78
)
79
80
def known_vulnerabilities(self) -> model.VulnerabilityLibrary:
81
"""Return the global list of known vulnerability."""
82
return self.__environment.vulnerability_library
83
84
def list_remote_attacks(self, node_id: model.NodeID) -> List[model.VulnerabilityID]:
85
"""Return list of all remote attacks that the Command&Control may
86
execute onto the specified node."""
87
return self._actuator.list_remote_attacks(node_id)
88
89
def list_local_attacks(self, node_id: model.NodeID) -> List[model.VulnerabilityID]:
90
"""Return list of all local attacks that the Command&Control may
91
execute onto the specified node."""
92
return self._actuator.list_local_attacks(node_id)
93
94
def list_attacks(self, node_id: model.NodeID) -> List[model.VulnerabilityID]:
95
"""Return list of all attacks that the Command&Control may
96
execute on the specified node."""
97
return self._actuator.list_attacks(node_id)
98
99
def list_all_attacks(self) -> List[Dict[str, object]]:
100
"""List all possible attacks from all the nodes currently owned by the attacker"""
101
return self._actuator.list_all_attacks()
102
103
def print_all_attacks(self) -> None:
104
"""Pretty print list of all possible attacks from all the nodes currently owned by the attacker"""
105
return self._actuator.print_all_attacks()
106
107
def run_attack(self, node_id: model.NodeID, vulnerability_id: model.VulnerabilityID) -> Optional[model.VulnerabilityOutcome]:
108
"""Run an attack and attempt to exploit a vulnerability on the specified node."""
109
result = self._actuator.exploit_local_vulnerability(node_id, vulnerability_id)
110
if result.outcome is not None:
111
self.__save_credentials(result.outcome)
112
self.__accumulate_reward(result.reward)
113
return result.outcome
114
115
def run_remote_attack(
116
self,
117
node_id: model.NodeID,
118
target_node_id: model.NodeID,
119
vulnerability_id: model.VulnerabilityID,
120
) -> Optional[model.VulnerabilityOutcome]:
121
"""Run a remote attack from the specified node to exploit a remote vulnerability
122
in the specified target node"""
123
124
result = self._actuator.exploit_remote_vulnerability(node_id, target_node_id, vulnerability_id)
125
if result.outcome is not None:
126
self.__save_credentials(result.outcome)
127
self.__accumulate_reward(result.reward)
128
return result.outcome
129
130
def connect_and_infect(
131
self,
132
source_node_id: model.NodeID,
133
target_node_id: model.NodeID,
134
port_name: model.PortName,
135
credentials: model.CredentialID,
136
) -> bool:
137
"""Install the agent on a remote machine using the
138
provided credentials"""
139
result = self._actuator.connect_to_remote_machine(source_node_id, target_node_id, port_name, credentials)
140
self.__accumulate_reward(result.reward)
141
return result.outcome is not None
142
143
@property
144
def credentials_gathered_so_far(self) -> Set[model.CachedCredential]:
145
"""Returns the list of credentials gathered so far by the
146
attacker (from any node)"""
147
return self.__gathered_credentials
148
149
150
def get_outcome_first_credential(
151
outcome: Optional[model.VulnerabilityOutcome],
152
) -> model.CredentialID:
153
"""Return the first credential found in a given vulnerability exploit outcome"""
154
if outcome is not None and isinstance(outcome, model.LeakedCredentials):
155
return outcome.credentials[0].credential
156
else:
157
raise ValueError("Vulnerability outcome does not contain any credential")
158
159
160
class EnvironmentDebugging:
161
"""Provides debugging feature exposing internals of the environment
162
that are not normally revealed to an attacker agent according to
163
the rules of the simulation.
164
"""
165
166
__environment: model.Environment
167
__actuator: actions.AgentActions
168
169
def __init__(self, actuator_or_c2: Union[actions.AgentActions, CommandControl]):
170
if isinstance(actuator_or_c2, actions.AgentActions):
171
self.__actuator = actuator_or_c2
172
elif isinstance(actuator_or_c2, CommandControl):
173
self.__actuator = actuator_or_c2._actuator
174
else:
175
raise ValueError("Invalid type: expecting Union[actions.AgentActions, CommandControl])")
176
177
self.__environment = self.__actuator._environment
178
179
def network_as_plotly_traces(self, xref: str = "x", yref: str = "y") -> Tuple[List[Scatter], dict]:
180
known_nodes = [node_id for node_id, _ in self.__actuator.discovered_nodes()]
181
182
subgraph = self.__environment.network.subgraph(known_nodes)
183
184
# pos = nx.fruchterman_reingold_layout(subgraph)
185
pos = nx.shell_layout(subgraph, [[known_nodes[0]], known_nodes[1:]])
186
187
def edge_text(source: model.NodeID, target: model.NodeID) -> str:
188
data = self.__environment.network.get_edge_data(source, target)
189
name: str = data["kind"].name
190
return name
191
192
color_map = {
193
actions.EdgeAnnotation.LATERAL_MOVE: "red",
194
actions.EdgeAnnotation.REMOTE_EXPLOIT: "orange",
195
actions.EdgeAnnotation.KNOWS: "gray",
196
}
197
198
def edge_color(source: model.NodeID, target: model.NodeID) -> str:
199
data = self.__environment.network.get_edge_data(source, target)
200
if "kind" in data:
201
return color_map[data["kind"]]
202
return "black"
203
204
_layout: dict = dict(
205
title="CyberBattle simulation",
206
font=dict(size=10),
207
showlegend=True,
208
autosize=False,
209
width=800,
210
height=400,
211
margin=layout.Margin(l=2, r=2, b=15, t=35),
212
hovermode="closest",
213
annotations=[
214
dict(
215
ax=pos[source][0],
216
ay=pos[source][1],
217
axref=xref,
218
ayref=yref,
219
x=pos[target][0],
220
y=pos[target][1],
221
xref=xref,
222
yref=yref,
223
arrowcolor=edge_color(source, target),
224
hovertext=edge_text(source, target),
225
showarrow=True,
226
arrowhead=1,
227
arrowsize=1,
228
arrowwidth=1,
229
startstandoff=10,
230
standoff=10,
231
align="center",
232
opacity=1,
233
)
234
for (source, target) in list(subgraph.edges)
235
],
236
)
237
238
owned_nodes_coordinates = [(i, c) for i, c in pos.items() if self.get_node_information(i).agent_installed]
239
discovered_nodes_coordinates = [(i, c) for i, c in pos.items() if not self.get_node_information(i).agent_installed]
240
241
trace_owned_nodes = Scatter(
242
x=[c[0] for i, c in owned_nodes_coordinates],
243
y=[c[1] for i, c in owned_nodes_coordinates],
244
mode="markers+text",
245
name="owned",
246
marker=dict(
247
symbol="circle-dot",
248
size=5,
249
# green #0e9d00
250
color="#D32F2E", # red
251
line=dict(color="rgb(255,0,0)", width=8),
252
),
253
text=[i for i, c in owned_nodes_coordinates],
254
hoverinfo="text",
255
textposition="bottom center",
256
)
257
258
trace_discovered_nodes = Scatter(
259
x=[c[0] for i, c in discovered_nodes_coordinates],
260
y=[c[1] for i, c in discovered_nodes_coordinates],
261
mode="markers+text",
262
name="discovered",
263
marker=dict(
264
symbol="circle-dot",
265
size=5,
266
color="#0e9d00", # green
267
line=dict(color="rgb(0,255,0)", width=8),
268
),
269
text=[i for i, c in discovered_nodes_coordinates],
270
hoverinfo="text",
271
textposition="bottom center",
272
)
273
274
dummy_scatter_for_edge_legend = [Scatter(x=[0], y=[0], mode="lines", line=dict(color=color_map[a]), name=a.name) for a in actions.EdgeAnnotation]
275
276
all_scatters = dummy_scatter_for_edge_legend + [
277
trace_owned_nodes,
278
trace_discovered_nodes,
279
]
280
return (all_scatters, _layout)
281
282
def plot_discovered_network(self) -> None:
283
"""Plot the network graph with plotly"""
284
fig = Figure()
285
traces, _layout = self.network_as_plotly_traces()
286
for t in traces:
287
fig.add_trace(t)
288
fig.update_layout(_layout)
289
fig.show()
290
291
def get_node_information(self, node_id: model.NodeID) -> model.NodeInfo:
292
"""Print node information"""
293
return self.__environment.get_node(node_id)
294
295