Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
microsoft
GitHub Repository: microsoft/cyberbattlesim
Path: blob/main/cyberbattle/simulation/generate_network.py
960 views
1
# Copyright (c) Microsoft Corporation.
2
# Licensed under the MIT License.
3
4
"""Generating random graphs"""
5
6
from cyberbattle.simulation.model import Identifiers, NodeID, CredentialID, PortName, FirewallConfiguration, FirewallRule, RulePermission
7
import numpy as np
8
import networkx as nx
9
from cyberbattle.simulation import model as m
10
import random
11
from typing import List, Optional, Tuple, DefaultDict
12
13
from collections import defaultdict
14
15
# Identifiers (node properties, port names and vulnerability names) attached to
# the environments created by `new_environment` in this module.
ENV_IDENTIFIERS = Identifiers(
    properties=["breach_node"],
    ports=["SMB", "HTTP", "RDP"],
    local_vulnerabilities=[
        "ScanWindowsCredentialManagerForRDP",
        "ScanWindowsExplorerRecentFiles",
        "ScanWindowsCredentialManagerForSMB",
    ],
    remote_vulnerabilities=["Traceroute"],
)
21
22
23
def generate_random_traffic_network(
    n_clients: int = 200,
    n_servers={
        "SMB": 1,
        "HTTP": 1,
        "RDP": 1,
    },
    seed: Optional[int] = 0,
    tolerance: np.float32 = np.float32(1e-3),
    alpha=np.array([(0.1, 0.3), (0.18, 0.09)], dtype=float),
    beta=np.array([(100, 10), (10, 100)], dtype=float),
) -> nx.DiGraph:
    """Sample a random directed graph of fictitious SMB, HTTP and RDP traffic.

    For every protocol a two-block (clients, servers) stochastic block model
    is drawn. The edges of all protocols are then merged into one directed
    graph in which each edge carries the set of protocols seen on it.

    Arguments:
        n_clients: number of workstation nodes that can initiate sessions with server nodes
        n_servers: dictionary mapping each protocol name to the number of server nodes listening on it
        seed: seed for the pseudo-random number generators; numpy's global generator is
            re-seeded with it for every protocol and it is also handed to networkx
        tolerance: absolute tolerance bounding the edge probabilities within [tolerance, 1 - tolerance]
        alpha: `a` parameters of the beta distributions from which the 2x2 block edge
            probabilities are sampled (mean alpha / (alpha + beta) before scaling and clipping)
        beta: `b` parameters of those beta distributions

    Returns:
        (nx.DiGraph): the randomly generated network; every edge has a `protocol`
        attribute holding the set of protocol names observed on that edge
    """
    protocols_by_edge = defaultdict(set)  # (source, target) -> set of protocol names

    for protocol, server_count in n_servers.items():
        # Draw the block edge probabilities from a beta distribution. The global
        # numpy generator is re-seeded on every iteration, so with an integer
        # seed each protocol starts from the same generator state.
        np.random.seed(seed)
        block_probs: np.ndarray = np.random.beta(a=alpha, b=beta, size=(2, 2))

        # Protocol-specific scaling of the edge probabilities
        if protocol == "SMB":
            block_probs = 3 * block_probs
        elif protocol == "RDP":
            block_probs = 4 * block_probs

        # Keep the probabilities away from exactly zero or one
        upper_bound = np.float32(1.0 - tolerance)
        block_probs = np.clip(block_probs, a_min=tolerance, a_max=upper_bound)

        # Sample this protocol's edges from the block model
        protocol_graph = nx.stochastic_block_model(
            sizes=[n_clients, server_count],
            p=block_probs,
            directed=True,
            seed=seed,
        )
        for edge in protocol_graph.edges:
            protocols_by_edge[edge].add(protocol)

    digraph = nx.DiGraph()
    digraph.add_edges_from(
        (source, target, {"protocol": protocols})
        for (source, target), protocols in protocols_by_edge.items()
    )
    return digraph
77
78
79
def cyberbattle_model_from_traffic_graph(
    traffic_graph: nx.DiGraph,
    cached_smb_password_probability=0.75,
    cached_rdp_password_probability=0.8,
    cached_accessed_network_shares_probability=0.6,
    cached_password_has_changed_probability=0.1,
    traceroute_discovery_probability=0.5,
    probability_two_nodes_use_same_password_to_access_given_resource=0.8,
) -> nx.DiGraph:
    """Generate a random CyberBattle network model from a specified traffic (directed) graph.

    The input graph can for instance be generated with `generate_random_traffic_network`.
    Each edge of the input graph indicates that a communication took place
    between the two nodes with the protocol(s) listed in the edge's `protocol` attribute.

    Returns a CyberBattle network with the same nodes (IDs converted to strings),
    no edges, and implanted vulnerabilities, to be used to instantiate a
    CyberBattleSim gym. Every node carries its `m.NodeInfo` under the `data`
    attribute. One randomly picked node is the agent entry node.

    Arguments:
        traffic_graph:
            directed graph whose edges have a `protocol` attribute containing protocol names
        cached_smb_password_probability, cached_rdp_password_probability:
            probability that a password used for authenticated traffic was cached by the OS for SMB and RDP
        cached_accessed_network_shares_probability:
            probability that a network share accessed by the system was cached by the OS
        cached_password_has_changed_probability:
            probability that a given password cached on a node has been rotated on the target node
            (typically low as people tend to change their password infrequently)
        probability_two_nodes_use_same_password_to_access_given_resource:
            probability that a cached valid credential reuses a password already
            assigned to the target node/port instead of being a newly assigned one
        traceroute_discovery_probability:
            probability that a target node of an SMB/RDP connection gets exposed by a traceroute attack

    Raises:
        ValueError: if `traffic_graph` has no node (no entry node can be picked)
    """
    # convert node IDs to string
    graph = nx.relabel_nodes(traffic_graph, {i: str(i) for i in traffic_graph.nodes})

    password_counter: int = 0

    def generate_password() -> CredentialID:
        """Return a password that was never returned before."""
        nonlocal password_counter
        password_counter = password_counter + 1
        return f"unique_pwd{password_counter}"

    def traffic_targets(source_node: NodeID, protocol: str) -> List[NodeID]:
        """List the nodes that `source_node` talked to using `protocol`."""
        # Read the out-neighbors of the source directly instead of scanning
        # every edge of the graph; the resulting order is the same.
        return [
            target
            for target, edge_data in graph.adj[source_node].items()
            if protocol in edge_data["protocol"]
        ]

    # Map (node, port name) -> valid passwords assigned to that node/port
    assigned_passwords: DefaultDict[Tuple[NodeID, PortName], List[CredentialID]] = defaultdict(list)

    def assign_new_valid_password(node: NodeID, port: PortName) -> CredentialID:
        """Create a new password and register it as valid for the node/port."""
        pwd = generate_password()
        assigned_passwords[node, port].append(pwd)
        return pwd

    def reuse_valid_password(node: NodeID, port: PortName) -> CredentialID:
        """Reuse a password already assigned to that node and port; if none is
        assigned yet, create and assign a new valid password"""
        if (node, port) not in assigned_passwords:
            return assign_new_valid_password(node, port)

        # reuse any of the existing valid passwords assigned to that node/port
        return random.choice(assigned_passwords[node, port])

    def create_cached_credential(node: NodeID, port: PortName) -> CredentialID:
        """Return the password cached on some machine for accessing node/port."""
        if random.random() < cached_password_has_changed_probability:
            # the password was rotated: the cached one is never registered as
            # valid, hence it is an invalid password
            return generate_password()
        if random.random() < probability_two_nodes_use_same_password_to_access_given_resource:
            return reuse_valid_password(node, port)
        return assign_new_valid_password(node, port)

    def add_leak_neighbors_vulnerability(node_id: m.NodeID, library: Optional[m.VulnerabilityLibrary] = None) -> m.VulnerabilityLibrary:
        """Create random vulnerabilities
        that reveal immediate traffic neighbors from a given node"""

        if library is None:
            library = {}

        rdp_neighbors = traffic_targets(node_id, "RDP")

        if rdp_neighbors:
            library["ScanWindowsCredentialManagerForRDP"] = m.VulnerabilityInfo(
                description="Look for RDP credentials in the Windows Credential Manager",
                type=m.VulnerabilityType.LOCAL,
                outcome=m.LeakedCredentials(
                    credentials=[
                        m.CachedCredential(node=target_node, port="RDP", credential=create_cached_credential(target_node, "RDP"))
                        for target_node in rdp_neighbors
                        if random.random() < cached_rdp_password_probability
                    ]
                ),
                reward_string="Discovered creds in the Windows Credential Manager",
                cost=2.0,
            )

        smb_neighbors = traffic_targets(node_id, "SMB")

        if smb_neighbors:
            library["ScanWindowsExplorerRecentFiles"] = m.VulnerabilityInfo(
                description="Look for network shares in the Windows Explorer Recent files",
                type=m.VulnerabilityType.LOCAL,
                outcome=m.LeakedNodesId(
                    [
                        target_node
                        for target_node in smb_neighbors
                        if random.random() < cached_accessed_network_shares_probability
                    ]
                ),
                reward_string="Windows Explorer Recent Files revealed network shares",
                cost=1.0,
            )

            library["ScanWindowsCredentialManagerForSMB"] = m.VulnerabilityInfo(
                description="Look for network credentials in the Windows Credential Manager",
                type=m.VulnerabilityType.LOCAL,
                outcome=m.LeakedCredentials(
                    credentials=[
                        m.CachedCredential(node=target_node, port="SMB", credential=create_cached_credential(target_node, "SMB"))
                        for target_node in smb_neighbors
                        if random.random() < cached_smb_password_probability
                    ]
                ),
                reward_string="Discovered SMB creds in the Windows Credential Manager",
                cost=2.0,
            )

        if smb_neighbors and rdp_neighbors:
            # Traceroute may expose the targets of both SMB and RDP traffic.
            # `smb_neighbors or rdp_neighbors` would only ever yield the SMB
            # list here, so merge both lists, keeping each node at most once.
            traceroute_candidates = list(dict.fromkeys(smb_neighbors + rdp_neighbors))
            library["Traceroute"] = m.VulnerabilityInfo(
                description="Attempt to discvover network nodes using Traceroute",
                type=m.VulnerabilityType.REMOTE,
                outcome=m.LeakedNodesId(
                    [
                        target_node
                        for target_node in traceroute_candidates
                        if random.random() < traceroute_discovery_probability
                    ]
                ),
                reward_string="Discovered new network nodes via traceroute",
                cost=5.0,
            )

        return library

    def create_vulnerabilities_from_traffic_data(node_id: m.NodeID):
        """Build the vulnerability library of a node from its outgoing traffic."""
        return add_leak_neighbors_vulnerability(node_id=node_id)

    def listening_services(node_id: m.NodeID):
        """List one service per port for which the node currently has valid passwords."""
        return [
            m.ListeningService(name=port, allowedCredentials=credentials)
            for (target_node, port), credentials in assigned_passwords.items()
            if target_node == node_id
        ]

    # The same rules are used for the incoming and the outgoing traffic
    firewall_conf = FirewallConfiguration(
        [FirewallRule("RDP", RulePermission.ALLOW), FirewallRule("SMB", RulePermission.ALLOW)],
        [FirewallRule("RDP", RulePermission.ALLOW), FirewallRule("SMB", RulePermission.ALLOW)],
    )

    # Pick a random node as the agent entry node
    entry_node_index = random.randrange(len(graph.nodes))
    entry_node_id = list(graph.nodes)[entry_node_index]
    graph.nodes[entry_node_id].clear()
    graph.nodes[entry_node_id].update(
        {
            "data": m.NodeInfo(
                services=[],
                value=0,
                properties=["breach_node"],
                vulnerabilities=create_vulnerabilities_from_traffic_data(entry_node_id),
                agent_installed=True,
                firewall=firewall_conf,
                reimagable=False,
            )
        }
    )

    def create_node_data_without_vulnerabilities(node_id: m.NodeID):
        """Create the node information of a non-entry node, vulnerabilities excluded."""
        return m.NodeInfo(
            services=listening_services(node_id),
            value=random.randint(0, 100),
            agent_installed=False,
            firewall=firewall_conf,
        )

    # Step 1: Create all the nodes with associated services and firewall configuration
    for node in list(graph.nodes):
        if node != entry_node_id:
            graph.nodes[node].clear()
            graph.nodes[node].update({"data": create_node_data_without_vulnerabilities(node)})

    # Step 2: Assign vulnerabilities to each node.
    # Generating the vulnerabilities is what assigns passwords to the
    # target nodes/ports (see `create_cached_credential`).
    for node in list(graph.nodes):
        if node != entry_node_id:
            node_data = graph.nodes[node]["data"]
            node_data.vulnerabilities = create_vulnerabilities_from_traffic_data(node)
            graph.nodes[node].update({"data": node_data})

    # Step 3: Refresh the listening services now that every password is assigned.
    # A node/port that received its first password during Step 2 had no
    # service created in Step 1, which would make its leaked credentials unusable.
    for node in list(graph.nodes):
        if node != entry_node_id:
            graph.nodes[node]["data"].services = listening_services(node)

    # remove all the edges inherited from the network graph
    graph.clear_edges()

    return graph
264
265
266
def new_environment(n_servers_per_protocol: int):
    """Build a new simulation environment on top of a randomly generated network topology.

    Arguments:
        n_servers_per_protocol: number of server nodes requested for each of
            the SMB, HTTP and RDP protocols

    NOTE: the probabilities and parameter values used
    here for the statistical generative model
    were arbitrarily picked. We recommend exploring different values for those parameters.
    """
    servers_per_protocol = {
        protocol: n_servers_per_protocol for protocol in ("SMB", "HTTP", "RDP")
    }
    beta_a_parameters = np.array([(1, 1), (0.2, 0.5)], dtype=float)
    beta_b_parameters = np.array([(1000, 10), (10, 100)], dtype=float)

    # seed=None: the generated traffic is not pinned to a fixed seed
    traffic_graph = generate_random_traffic_network(
        n_clients=50,
        n_servers=servers_per_protocol,
        seed=None,
        alpha=beta_a_parameters,
        beta=beta_b_parameters,
    )

    network = cyberbattle_model_from_traffic_graph(
        traffic_graph,
        cached_smb_password_probability=0.7,
        cached_rdp_password_probability=0.8,
        cached_accessed_network_shares_probability=0.8,
        cached_password_has_changed_probability=0.01,
        probability_two_nodes_use_same_password_to_access_given_resource=0.9,
    )

    return m.Environment(
        network=network,
        vulnerability_library={},
        identifiers=ENV_IDENTIFIERS,
    )
295
296