Path: blob/main/cyberbattle/simulation/commandcontrol.py
960 views
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

"""A 'Command & control'-like interface exposing to a human player
the attacker view and actions of the game.
This includes commands to visualize the parts of the environment
that were explored so far, and for each node where the attacker client
is installed, execute actions on the machine.
"""

import networkx as nx
from typing import List, Optional, Dict, Union, Tuple, Set
from plotly.graph_objects import Scatter, Figure, layout  # type: ignore

from . import model, actions


class CommandControl:
    """The Command and Control interface to the simulation.

    This represents a server that centralizes information and secrets
    retrieved from the individual clients running on the network nodes.
    """

    # Global set aggregating all credentials gathered so far, from any node in the network
    __gathered_credentials: Set[model.CachedCredential]
    # Object through which every attacker action is executed
    _actuator: actions.AgentActions
    # Environment the actuator operates on
    __environment: model.Environment
    # Sum of the rewards returned by all the actions run through this object
    __total_reward: float

    def __init__(self, environment_or_actuator: Union[model.Environment, actions.AgentActions]):
        """Create the interface from an environment or from an existing actuator.

        When given an environment, a new `AgentActions` is created for it with
        `throws_on_invalid_actions=True`. When given an actuator, it is reused
        together with its environment.

        Raises:
            ValueError: if the argument is neither an `Environment` nor an `AgentActions`.
        """
        if isinstance(environment_or_actuator, model.Environment):
            self.__environment = environment_or_actuator
            self._actuator = actions.AgentActions(self.__environment, throws_on_invalid_actions=True)
        elif isinstance(environment_or_actuator, actions.AgentActions):
            self.__environment = environment_or_actuator._environment
            self._actuator = environment_or_actuator
        else:
            raise ValueError("Invalid type: expecting Union[model.Environment, actions.AgentActions])")

        self.__gathered_credentials = set()
        self.__total_reward = 0

    def __save_credentials(self, outcome: model.VulnerabilityOutcome) -> None:
        """Save credentials obtained from exploiting a vulnerability.

        Outcomes that are not `LeakedCredentials` are ignored.
        """
        if isinstance(outcome, model.LeakedCredentials):
            self.__gathered_credentials.update(outcome.credentials)

    def __accumulate_reward(self, reward: actions.Reward) -> None:
        """Accumulate new reward"""
        self.__total_reward += reward

    def total_reward(self) -> actions.Reward:
        """Return the current accumulated reward"""
        return self.__total_reward

    def list_nodes(self) -> List[actions.DiscoveredNodeInfo]:
        """Returns the list of nodes ID that were discovered or owned by the attacker."""
        return self._actuator.list_nodes()

    def get_node_color(self, node_info: model.NodeInfo) -> str:
        """Return the plot color of a node: red if the agent is installed on it, green otherwise."""
        return "red" if node_info.agent_installed else "green"

    def plot_nodes(self) -> None:
        """Plot the sub-graph of nodes either so far
        discovered (their ID is known by the agent)
        or owned (i.e. where the attacker client is installed)."""
        discovered_nodes = [node_id for node_id, _ in self._actuator.discovered_nodes()]
        sub_graph = self.__environment.network.subgraph(discovered_nodes)
        nx.draw(
            sub_graph,
            with_labels=True,
            node_color=[self.get_node_color(self.__environment.get_node(i)) for i in sub_graph.nodes],
        )

    def known_vulnerabilities(self) -> model.VulnerabilityLibrary:
        """Return the global list of known vulnerabilities."""
        return self.__environment.vulnerability_library

    def list_remote_attacks(self, node_id: model.NodeID) -> List[model.VulnerabilityID]:
        """Return list of all remote attacks that the Command&Control may
        execute onto the specified node."""
        return self._actuator.list_remote_attacks(node_id)

    def list_local_attacks(self, node_id: model.NodeID) -> List[model.VulnerabilityID]:
        """Return list of all local attacks that the Command&Control may
        execute onto the specified node."""
        return self._actuator.list_local_attacks(node_id)

    def list_attacks(self, node_id: model.NodeID) -> List[model.VulnerabilityID]:
        """Return list of all attacks that the Command&Control may
        execute on the specified node."""
        return self._actuator.list_attacks(node_id)

    def list_all_attacks(self) -> List[Dict[str, object]]:
        """List all possible attacks from all the nodes currently owned by the attacker"""
        return self._actuator.list_all_attacks()

    def print_all_attacks(self) -> None:
        """Pretty print list of all possible attacks from all the nodes currently owned by the attacker"""
        return self._actuator.print_all_attacks()

    def run_attack(self, node_id: model.NodeID, vulnerability_id: model.VulnerabilityID) -> Optional[model.VulnerabilityOutcome]:
        """Run an attack and attempt to exploit a vulnerability on the specified node.

        Any leaked credentials are recorded and the reward of the action is
        added to the total, whether or not the attack produced an outcome.
        Returns the outcome of the attack, or None if there was none.
        """
        result = self._actuator.exploit_local_vulnerability(node_id, vulnerability_id)
        if result.outcome is not None:
            self.__save_credentials(result.outcome)
        self.__accumulate_reward(result.reward)
        return result.outcome

    def run_remote_attack(
        self,
        node_id: model.NodeID,
        target_node_id: model.NodeID,
        vulnerability_id: model.VulnerabilityID,
    ) -> Optional[model.VulnerabilityOutcome]:
        """Run a remote attack from the specified node to exploit a remote vulnerability
        in the specified target node.

        Any leaked credentials are recorded and the reward of the action is
        added to the total, whether or not the attack produced an outcome.
        Returns the outcome of the attack, or None if there was none.
        """
        result = self._actuator.exploit_remote_vulnerability(node_id, target_node_id, vulnerability_id)
        if result.outcome is not None:
            self.__save_credentials(result.outcome)
        self.__accumulate_reward(result.reward)
        return result.outcome

    def connect_and_infect(
        self,
        source_node_id: model.NodeID,
        target_node_id: model.NodeID,
        port_name: model.PortName,
        credentials: model.CredentialID,
    ) -> bool:
        """Install the agent on a remote machine using the
        provided credentials.

        The reward of the action is added to the total.
        Returns True if the connection produced an outcome, False otherwise.
        """
        result = self._actuator.connect_to_remote_machine(source_node_id, target_node_id, port_name, credentials)
        self.__accumulate_reward(result.reward)
        return result.outcome is not None

    @property
    def credentials_gathered_so_far(self) -> Set[model.CachedCredential]:
        """Returns the set of credentials gathered so far by the
        attacker (from any node)"""
        return self.__gathered_credentials


def get_outcome_first_credential(
    outcome: Optional[model.VulnerabilityOutcome],
) -> model.CredentialID:
    """Return the first credential found in a given vulnerability exploit outcome.

    Raises:
        ValueError: if the outcome is None, is not a `LeakedCredentials`,
            or carries an empty list of credentials.
    """
    # The emptiness check keeps the failure mode uniform: without it an empty
    # credential list would surface as an IndexError instead of the ValueError below.
    if isinstance(outcome, model.LeakedCredentials) and outcome.credentials:
        return outcome.credentials[0].credential
    raise ValueError("Vulnerability outcome does not contain any credential")


class EnvironmentDebugging:
    """Provides debugging feature exposing internals of the environment
    that are not normally revealed to an attacker agent according to
    the rules of the simulation.
    """

    __environment: model.Environment
    __actuator: actions.AgentActions

    def __init__(self, actuator_or_c2: Union[actions.AgentActions, CommandControl]):
        """Create the debugging view from an actuator or from a `CommandControl`.

        Raises:
            ValueError: if the argument is neither an `AgentActions` nor a `CommandControl`.
        """
        if isinstance(actuator_or_c2, actions.AgentActions):
            self.__actuator = actuator_or_c2
        elif isinstance(actuator_or_c2, CommandControl):
            self.__actuator = actuator_or_c2._actuator
        else:
            raise ValueError("Invalid type: expecting Union[actions.AgentActions, CommandControl])")

        self.__environment = self.__actuator._environment

    def network_as_plotly_traces(self, xref: str = "x", yref: str = "y") -> Tuple[List[Scatter], dict]:
        """Build the plotly traces and layout depicting the discovered part of the network.

        Args:
            xref: axis reference used for the x coordinates of the edge annotations.
            yref: axis reference used for the y coordinates of the edge annotations.

        Returns:
            A pair `(traces, layout)`: one legend-only line trace per
            `EdgeAnnotation` value, followed by the scatter of owned nodes and
            the scatter of discovered nodes; and the layout dictionary, whose
            annotations draw one arrow per edge of the discovered sub-graph.
        """
        known_nodes = [node_id for node_id, _ in self.__actuator.discovered_nodes()]

        subgraph = self.__environment.network.subgraph(known_nodes)

        # The first known node gets a shell of its own, all the others share the second shell.
        # With no known node there is nothing to lay out (and `known_nodes[0]` would raise).
        pos = nx.shell_layout(subgraph, [[known_nodes[0]], known_nodes[1:]]) if known_nodes else {}

        color_map = {
            actions.EdgeAnnotation.LATERAL_MOVE: "red",
            actions.EdgeAnnotation.REMOTE_EXPLOIT: "orange",
            actions.EdgeAnnotation.KNOWS: "gray",
        }

        def edge_kind(source: model.NodeID, target: model.NodeID) -> Optional[actions.EdgeAnnotation]:
            # Annotation stored under the "kind" key of the edge data, or None if the edge has none
            data = self.__environment.network.get_edge_data(source, target)
            return data.get("kind") if data else None

        def edge_text(source: model.NodeID, target: model.NodeID) -> str:
            # Hover text of an edge: the name of its annotation, empty when the edge is not annotated
            kind = edge_kind(source, target)
            return kind.name if kind is not None else ""

        def edge_color(source: model.NodeID, target: model.NodeID) -> str:
            # Arrow color of an edge: black when the edge is not annotated
            return color_map.get(edge_kind(source, target), "black")

        _layout: dict = dict(
            title="CyberBattle simulation",
            font=dict(size=10),
            showlegend=True,
            autosize=False,
            width=800,
            height=400,
            margin=layout.Margin(l=2, r=2, b=15, t=35),
            hovermode="closest",
            annotations=[
                dict(
                    ax=pos[source][0],
                    ay=pos[source][1],
                    axref=xref,
                    ayref=yref,
                    x=pos[target][0],
                    y=pos[target][1],
                    xref=xref,
                    yref=yref,
                    arrowcolor=edge_color(source, target),
                    hovertext=edge_text(source, target),
                    showarrow=True,
                    arrowhead=1,
                    arrowsize=1,
                    arrowwidth=1,
                    startstandoff=10,
                    standoff=10,
                    align="center",
                    opacity=1,
                )
                for (source, target) in subgraph.edges
            ],
        )

        # Split the laid-out nodes between those where the agent is installed and the others
        owned_nodes_coordinates = []
        discovered_nodes_coordinates = []
        for node_id, coordinates in pos.items():
            if self.get_node_information(node_id).agent_installed:
                owned_nodes_coordinates.append((node_id, coordinates))
            else:
                discovered_nodes_coordinates.append((node_id, coordinates))

        trace_owned_nodes = Scatter(
            x=[c[0] for i, c in owned_nodes_coordinates],
            y=[c[1] for i, c in owned_nodes_coordinates],
            mode="markers+text",
            name="owned",
            marker=dict(
                symbol="circle-dot",
                size=5,
                color="#D32F2E",  # red
                line=dict(color="rgb(255,0,0)", width=8),
            ),
            text=[i for i, c in owned_nodes_coordinates],
            hoverinfo="text",
            textposition="bottom center",
        )

        trace_discovered_nodes = Scatter(
            x=[c[0] for i, c in discovered_nodes_coordinates],
            y=[c[1] for i, c in discovered_nodes_coordinates],
            mode="markers+text",
            name="discovered",
            marker=dict(
                symbol="circle-dot",
                size=5,
                color="#0e9d00",  # green
                line=dict(color="rgb(0,255,0)", width=8),
            ),
            text=[i for i, c in discovered_nodes_coordinates],
            hoverinfo="text",
            textposition="bottom center",
        )

        # One single-point line trace per edge annotation, so that the edge colors appear in the legend
        dummy_scatter_for_edge_legend = [Scatter(x=[0], y=[0], mode="lines", line=dict(color=color_map[a]), name=a.name) for a in actions.EdgeAnnotation]

        all_scatters = dummy_scatter_for_edge_legend + [
            trace_owned_nodes,
            trace_discovered_nodes,
        ]
        return (all_scatters, _layout)

    def plot_discovered_network(self) -> None:
        """Plot the network graph with plotly"""
        fig = Figure()
        traces, _layout = self.network_as_plotly_traces()
        for t in traces:
            fig.add_trace(t)
        fig.update_layout(_layout)
        fig.show()

    def get_node_information(self, node_id: model.NodeID) -> model.NodeInfo:
        """Return the environment's information about the specified node"""
        return self.__environment.get_node(node_id)