Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
Azure
GitHub Repository: Azure/Azure-Sentinel-Notebooks
Path: blob/master/scenario-notebooks/AutomatedNotebooks-Manager.ipynb
3250 views
Kernel: papermill
import os from datetime import datetime from pathlib import Path import logging import nbformat from azure.mgmt.resource import ResourceManagementClient from azure.storage.fileshare import ShareFileClient, ShareServiceClient from azure.storage.queue import QueueServiceClient import papermill as pm from msticpy.common.azure_auth import az_connect from msticpy.common.keyvault_client import BHKeyVaultClient from msticpy.data.azure_sentinel import AzureSentinel from azure.common.exceptions import CloudError
# Open a log file that tracks notebook executions path = Path.cwd().joinpath("notebook_execution.log") logging.basicConfig(filename=path)
# Populate with details relating to your environment # Tenant ID ten_id = "YOUR TENANT ID" # The name of the Key Vault containing AFS key vault_name = "YOUR KV NAME" # The secret name that the AFS key is stored in kv_sec_name = "YOUR SECRET NAME" # Subscription ID of the Microsoft Sentinel Workspace to get incidents from subscriptionId = "YOUR SUBSCRIPTION ID" # The name of the Resource Group of the Microsoft Sentinel Workspace to get incidents from resourceGroupName = "YOUR RG NAME" # The name of the Microsoft Sentinel Workspace to get incidents from workspaceName = "YOUR WORKSPACE NAME" # The name of the Microsoft Sentinel Workspace ID to get incidents from ws_id = "YOUR WORKSPACE ID" # The name of the Azure Storage Queue account used (if used) q_account = "YOUR QUEUE ACCOUNT" # The name of the Azure Storage Queue account used (if used) q_name = "YOUR QUEUE NAME" # Details of the Azure Machine Learning workspace to be used (sub_id = Subscription ID, RG = Resource Group name, AMLWorkspace = AML Workspace Name) AML_details = { "sub_id": "YOUR SUB ID", "RG": "YOUR RG NAME", "AMLWorkspace": "YOUR WORKSPACE NAME", "ten_id": ten_id, }
# Authenticate to Azure using Azure CLI or Managed Identity creds = az_connect(["cli", "msi"]) token = creds.modern.get_token("https://management.azure.com/.default") def get_api_headers(): token = creds.modern.get_token("https://management.azure.com/.default") """Return authorization header with current token.""" return { "Authorization": f"Bearer {token.token}", "Content-Type": "application/json", } # Access key vault and get Azure Storage access key kv_c = BHKeyVaultClient(tenant_id=ten_id, vault_name=vault_name) afs_cred = kv_c.get_secret(kv_sec_name)
# Connect to Microsoft Sentinel azs = AzureSentinel() azs.connect() logging.info(f"{datetime.now()} - Successfully connected to Microsoft Sentinel")
# Get recent Incidents from API try: incidents = azs.get_incidents( sub_id=subscriptionId, res_grp=resourceGroupName, ws_name=workspaceName ) incident_ids = incidents["name"].tolist() except CloudError: logging.info(f"{datetime.now()} - Unable to retreive incidents") incident_ids = []
# If using Queue method get incidents from queue - uncomment following cells to use this method # queue_service_client = QueueServiceClient( # account_url=q_account, credential=creds.modern, api_version="2019-07-07" # ) # q_client = queue_service_client.get_queue_client(q_name) # messages = q_client.receive_messages() # logging.info(f"{datetime.now()} - Getting incidents from queue") # incident_ids = [message["content"] for message in messages] # q_client.clear_messages()
# Use a local archive to avoid processing of incidents more than once. try: with open("incident_archive", "r") as input_file: incident_archive = input_file.read().splitlines() except FileNotFoundError: incident_archive = []
incident_file = open("incident_archive", "w") out_files = [] # For each incident, if it has not already been processed then run the incident triage notebook with that ID if incident_ids: for incident_id in incident_ids: if incident_id not in incident_archive: logging.info(f"{datetime.now()} - Processing incident {incident_id}") (Path.cwd() / 'out').mkdir(parents=True, exist_ok=True) out_path = Path.cwd().joinpath(f"out/{incident_id}.ipynb") # If execution error occurs continue but record this in the log try: pm.execute_notebook( "AutomatedNotebooks-IncidentTriage.ipynb", str(out_path), parameters={ "incident_id": incident_id, "ten_id": ten_id, "ws_id": ws_id, } ) out_files.append(out_path) except pm.PapermillExecutionError: logging.info( f"{datetime.now()} - Unable to process incident {incident_id} - skipping" ) # Once processed add incident to archive incident_file.write(incident_id + "\n") else: logging.info( f"{datetime.now()} - Incident {incident_id} has already been processed - skipping" ) incident_file.close()
# Function to move a notebook from local path to Azure File Storage def move_to_afs(path, incident_id): with open(path) as notebook: notebook = notebook.read() account = get_storage_acct() notebook_name = path.name share_name = get_share(account) file_client = ShareFileClient( account_url=f"{account}.file.core.windows.net", share_name=share_name, file_path=f"Users/TriageNbs/{notebook_name}", credential=afs_cred, ) file_client.upload_file(notebook) path = (f"https://ml.azure.com/fileexplorerAzNB?wsid=/subscriptions/{AML_details['sub_id']}" f"/resourcegroups/{AML_details['RG']}/workspaces/AzureMLWorkspace&tid={AML_details['ten_id']}" f"&activeFilePath=Users/{notebook_name}") write_to_incident(incident_id, path) update_incident(incident_id) # Function to find the Azure Storage Account used by Azure ML def get_storage_acct(): res_client = ResourceManagementClient(creds.legacy, AML_details["sub_id"]) res = res_client.resources.get( AML_details["RG"], "", "Microsoft.MachineLearningServices/workspaces", "", AML_details["AMLWorkspace"], "2021-01-01", ) account = res.properties["storageAccount"].split("/")[-1] return account # Function to get the correct file share to store notebook in def get_share(account): ssc = ShareServiceClient(f"{account}.file.core.windows.net", afs_cred) for share in list(ssc.list_shares()): if share["name"].startswith("code-"): return share["name"] # Function to write a comment to the Microsoft Sentinel Incident that contains a link to the notebook def write_to_incident(incidentId, path): html = f"<a href='{path}'>View incident triage notebook in AML</a>" logging.info( f"{datetime.now()} - Adding link to notebook for incident: {incidentId}" ) azs.post_comment( incident_id=incidentId, comment=html, sub_id=subscriptionId, res_grp=resourceGroupName, ws_name=workspaceName, ) # Function to update incident severity to High if triage determines a risk def update_incident(incidentId): logging.info(f"{datetime.now()} - Updating severity for {incidentId}") azs.update_incident( incident_id=incidentId, update_items={"severity": "High", "status": "New"}, sub_id=subscriptionId, res_grp=resourceGroupName, ws_name=workspaceName, ) # For each processed incident check if there was valuable output in the notebook and process it if out_files: for path in out_files: incident_id = str(path.name).split(".")[0] try: nb = nbformat.read(path, as_version=2) for cell in nb["worksheets"][0]["cells"]: if "output" in cell["metadata"]["tags"] and cell["outputs"]: logging.info( f"{datetime.now()} - Storing notebook for {incident_id} in AML" ) move_to_afs(path, incident_id) break os.remove(str(path)) except FileNotFoundError: continue