Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
Azure
GitHub Repository: Azure/Azure-Sentinel-Notebooks
Path: blob/master/scenario-notebooks/Hunting-Notebooks/AutomateTools_ParquetGenerator.ipynb
3253 views
Kernel: Synapse PySpark

Automate Tools - Parquet Files Generator

Notebook Version: 1.0
Python Version: Python 3.8
Apache Spark Version: 3.1
Required Packages: azure-monitor-query, azure-mgmt-loganalytics
Platforms Supported: Azure Synapse Analytics

Description

Table of Contents

  1. Warm-up

  2. Azure Log Analytics Data Queries

  3. Save result to Azure Log Analytics Custom Table

1. Warm-up

# Load Python libraries that will be used in this notebook from azure.mgmt.loganalytics import LogAnalyticsManagementClient from azure.monitor.query import LogsQueryClient, MetricsQueryClient, LogsQueryStatus from azure.monitor.ingestion import LogsIngestionClient from azure.identity import AzureCliCredential, DefaultAzureCredential, ClientSecretCredential from azure.core.exceptions import HttpResponseError import sys from datetime import datetime, timezone, timedelta import requests import pandas as pd import numpy import json import math import ipywidgets from IPython.display import display, HTML, Markdown
# User input: the Log Analytics workspace that serves as the data SOURCE for querying.
# Fill these in before running; the ARM resource ID is derived from the three parts.
subscription_id_source = ""
resource_group_name_source = ""
workspace_name_source = ""
workspace_id_source = ""
workspace_resource_id_source = (
    f"/subscriptions/{subscription_id_source}"
    f"/resourceGroups/{resource_group_name_source}"
    f"/providers/Microsoft.OperationalInsights/workspaces/{workspace_name_source}"
)
# User input: the Log Analytics workspace used as the data-INGESTION target,
# plus the Data Collection Endpoint/Rule and Key Vault settings used later
# for authentication. Fill these in before running.
tenant_id = ""
subscription_id = ""
workspace_id = ""
resource_group_name = ""
location = ""
workspace_name = ''
workspace_resource_id = (
    f"/subscriptions/{subscription_id}"
    f"/resourceGroups/{resource_group_name}"
    f"/providers/Microsoft.OperationalInsights/workspaces/{workspace_name}"
)
data_collection_endpoint_name = ""
data_collection_rule_name = ""
custom_table_name = ""
# Ingestion streams for custom tables are always named "Custom-<table>".
stream_name = f"Custom-{custom_table_name}"
immutable_rule_id = ""
dce_endpoint = ""
# Azure Key Vault lookup settings for the service-principal secrets.
akv_name = ""
client_id_name = ""
client_secret_name = ""
akv_link_name = ""
# You may need to change resource_uri for various cloud environments. resource_uri = "https://api.loganalytics.io" client_id = mssparkutils.credentials.getSecret(akv_name, client_id_name, akv_link_name) client_secret = mssparkutils.credentials.getSecret(akv_name, client_secret_name, akv_link_name) credential = ClientSecretCredential( tenant_id=tenant_id, client_id=client_id, client_secret=client_secret) access_token = credential.get_token(resource_uri + "/.default") token = access_token[0]

2. Azure Log Analytics Data Queries

# Functions for query
def query_la(workspace_id_query, query):
    """Run a KQL query against a Log Analytics workspace.

    Args:
        workspace_id_query: Workspace (customer) ID to run the query against.
        query: KQL query text.

    Returns:
        pandas.DataFrame built from the result table (the last table when the
        service returns several), or an empty DataFrame when the query fails.
    """
    la_data_client = LogsQueryClient(credential=credential)
    # Fixed 15-day timespan; the KQL itself narrows further via ago() filters.
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(15)
    query_result = la_data_client.query_workspace(
        workspace_id=workspace_id_query,
        query=query,
        timespan=(start_time, end_time))
    # FIX: original assigned the class `pd.DataFrame` (missing call parentheses);
    # instantiate an empty frame so the failure path returns a real DataFrame.
    df_la_query = pd.DataFrame()
    # FIX: original left `data` unbound on the failure branch (NameError below)
    # and dereferenced `query_result.tables` even when the query had failed.
    data = []
    if query_result.status == LogsQueryStatus.SUCCESS:
        data = query_result.tables
        if len(data) > 1:
            print('You have more than one table to process')
    elif query_result.status == LogsQueryStatus.PARTIAL:
        data = query_result.partial_data
        print(query_result.partial_error)
        if len(data) > 1:
            print('You have more than one table to process')
    else:
        print(query_result.error)
    # Note: when several tables come back, only the LAST one is kept
    # (preserved from the original behavior).
    for table in data:
        df_la_query = pd.DataFrame(data=table.rows, columns=table.columns)
    return df_la_query


def slice_query_la(query, lookback_start, lookback_end='0', lookback_unit='h',
                   query_row_limit=400000, split_factor=2):
    """Slice the time to render records <= 500K.

    Counts the rows the templated query would return; if over
    `query_row_limit`, repeatedly halves (by `split_factor`) the lookback
    window and runs the query once per sub-window, concatenating the results.

    Args:
        query: KQL template with three placeholders:
            {0} window start offset, {1} time unit, {2} window end offset.
        lookback_start: Lookback start offset (string, e.g. '4').
        lookback_end: Lookback end offset. Defaults to '0' (now).
        lookback_unit: 'h' for hours; switched to 'm' when windows get sub-hour.
        query_row_limit: Max rows per single query before slicing kicks in.
        split_factor: Divisor applied per halving step.

    Returns:
        pandas.DataFrame with the concatenated query results.
    """
    count_query = query.format(lookback_start, lookback_unit, lookback_end) + ' | summarize count()'
    df_count = query_la(workspace_id_source, count_query)
    row_count = df_count['count_'][0]
    print(count_query)
    print(row_count)
    df_final = pd.DataFrame()
    if row_count > query_row_limit:
        # Find the smallest power of split_factor that brings each slice
        # under the row limit (assumes rows are roughly uniform over time).
        number_of_divide = 0
        while row_count > query_row_limit:
            row_count = row_count / split_factor
            number_of_divide += 1
        factor = split_factor ** number_of_divide
        step_number = math.ceil(int(lookback_start) / factor)
        if factor > int(lookback_start) and lookback_unit == 'h':
            # More slices than hours: fall back to minute-granularity windows.
            lookback_unit = 'm'
            number_of_minutes = 60
            step_number = math.ceil(int(lookback_start) * number_of_minutes / factor)
        try:
            for i in range(int(lookback_end), factor + 1, 1):
                if i > 0:
                    current_query = query.format(i * step_number, lookback_unit, (i - 1) * step_number)
                    print(current_query)
                    df_la_query = query_la(workspace_id_source, current_query)
                    print(df_la_query.shape[0])
                    df_final = pd.concat([df_final, df_la_query])
        except Exception:
            # FIX: narrowed from a bare `except:`; still surfaces the error.
            print("query failed")
            raise
    else:
        df_final = query_la(workspace_id_source,
                            query.format(lookback_start, lookback_unit, lookback_end))
    return df_final

Slice data for query

# Use Dror's test LA table query_template = "let t1 = SecurityAlert | extend ent = parse_json(Entities)| extend ip = tostring(ent[0]['Address']) | project-keep TimeGenerated, ip; let t2 = CommonSecurityLog | where TimeGenerated > ago({0}{1}) and TimeGenerated <= ago({2}{1}) | project ip = DestinationIP; t1 | join kind=innerunique t2 on ip" lookback_start = '4' df_final = slice_query_la(query_template, lookback_start) print(df_final.shape[0])
# Convert the pandas result to a Spark DataFrame, enabling Arrow for a faster
# pandas->Spark conversion. `spark` is the ambient Synapse Spark session.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark_final = spark.createDataFrame(df_final)
# Sanity-check the inferred schema and preview the first rows.
spark_final.printSchema()
spark_final.show()
# Persist the result to ADLS Gen2 as Parquet, under a timestamped folder so
# repeated runs never collide, then read it back to verify the row count.
run_stamp = datetime.now().strftime('%Y%m%d%H%M%S')
path = f'abfss://[email protected]/demodata/df_final/{run_stamp}'
spark_final.write.parquet(path, mode='overwrite')
spark.read.parquet(path).count()