Path: blob/master/scenario-notebooks/Hunting-Notebooks/AutomateTools_ParquetGenerator.ipynb
Kernel: Synapse PySpark
Automate Tools - Parquet Files Generator
Notebook Version: 1.0
Python Version: Python 3.8
Apache Spark Version: 3.1
Required Packages: azure-monitor-query, azure-mgmt-loganalytics, azure-monitor-ingestion, azure-identity
Platforms Supported: Azure Synapse Analytics
Description
This notebook queries data from an Azure Log Analytics workspace, slices large queries into smaller time windows to stay under the query API row limit, and saves the results as Parquet files on Azure Data Lake Storage for downstream processing.
Table of Contents
Warm-up
Azure Log Analytics Data Queries
Save result to Azure Log Analytics Custom Table
1. Warm-up
In [ ]:
# Load Python libraries that will be used in this notebook
from azure.mgmt.loganalytics import LogAnalyticsManagementClient
from azure.monitor.query import LogsQueryClient, MetricsQueryClient, LogsQueryStatus
from azure.monitor.ingestion import LogsIngestionClient
from azure.identity import AzureCliCredential, DefaultAzureCredential, ClientSecretCredential
from azure.core.exceptions import HttpResponseError

import sys
from datetime import datetime, timezone, timedelta
import requests
import pandas as pd
import numpy
import json
import math
import ipywidgets
from IPython.display import display, HTML, Markdown
In [ ]:
# User input for Log Analytics workspace as the data source for querying
subscription_id_source = ""
resource_group_name_source = ""
workspace_name_source = ""
workspace_id_source = ""
workspace_resource_id_source = "/subscriptions/{0}/resourceGroups/{1}/providers/Microsoft.OperationalInsights/workspaces/{2}".format(
    subscription_id_source, resource_group_name_source, workspace_name_source)
In [ ]:
# User input for Log Analytics workspace for data ingestion
tenant_id = ""
subscription_id = ""
workspace_id = ""
resource_group_name = ""
location = ""
workspace_name = ""
workspace_resource_id = "/subscriptions/{0}/resourceGroups/{1}/providers/Microsoft.OperationalInsights/workspaces/{2}".format(
    subscription_id, resource_group_name, workspace_name)
data_collection_endpoint_name = ""
data_collection_rule_name = ""
custom_table_name = ""
stream_name = "Custom-" + custom_table_name
immutable_rule_id = ""
dce_endpoint = ""
akv_name = ""
client_id_name = ""
client_secret_name = ""
akv_link_name = ""
In [ ]:
# You may need to change resource_uri for various cloud environments.
resource_uri = "https://api.loganalytics.io"
client_id = mssparkutils.credentials.getSecret(akv_name, client_id_name, akv_link_name)
client_secret = mssparkutils.credentials.getSecret(akv_name, client_secret_name, akv_link_name)

credential = ClientSecretCredential(
    tenant_id=tenant_id,
    client_id=client_id,
    client_secret=client_secret)
access_token = credential.get_token(resource_uri + "/.default")
token = access_token[0]
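The cell above assumes the notebook runs on Synapse, where mssparkutils can read the service principal secrets from Key Vault. If you adapt it to run elsewhere, a minimal fallback sketch (the DefaultAzureCredential path is an assumption, not part of the original notebook):

# Sketch: use DefaultAzureCredential when mssparkutils / Key Vault-backed secrets are unavailable.
# Assumes an Azure CLI login or managed identity is present in the environment.
try:
    client_id = mssparkutils.credentials.getSecret(akv_name, client_id_name, akv_link_name)
    client_secret = mssparkutils.credentials.getSecret(akv_name, client_secret_name, akv_link_name)
    credential = ClientSecretCredential(tenant_id=tenant_id, client_id=client_id, client_secret=client_secret)
except NameError:
    credential = DefaultAzureCredential()
access_token = credential.get_token(resource_uri + "/.default")
token = access_token.token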
2. Azure Log Analytics Data Queries
In [ ]:
# Functions for querying Log Analytics
def query_la(workspace_id_query, query):
    "Run a KQL query against the given Log Analytics workspace and return the result as a pandas DataFrame."
    la_data_client = LogsQueryClient(credential=credential)
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(15)
    query_result = la_data_client.query_workspace(
        workspace_id=workspace_id_query,
        query=query,
        timespan=(start_time, end_time))

    df_la_query = pd.DataFrame()
    data = []
    if query_result.status == LogsQueryStatus.SUCCESS:
        if hasattr(query_result, 'tables'):
            data = query_result.tables
            if len(query_result.tables) > 1:
                print('You have more than one table to process')
    elif query_result.status == LogsQueryStatus.PARTIAL:
        data = query_result.partial_data
        print(query_result.partial_error)
    else:
        print(query_result.error)

    for table in data:
        df_la_query = pd.DataFrame(data=table.rows, columns=table.columns)
    return df_la_query

def slice_query_la(query, lookback_start, lookback_end='0', lookback_unit='h', query_row_limit=400000, split_factor=2):
    "Slice the query time range so each sub-query returns fewer rows than query_row_limit (the API caps results at 500K)."
    # First run a count-only version of the query to see how many rows the full time range returns
    count_query = query.format(lookback_start, lookback_unit, lookback_end) + ' | summarize count()'
    df_count = query_la(workspace_id_source, count_query)
    row_count = df_count['count_'][0]
    print(count_query)
    print(row_count)

    df_final = pd.DataFrame()
    if row_count > query_row_limit:
        # Determine how many slices are needed by repeatedly dividing the row count by split_factor
        number_of_divide = 0
        while row_count > query_row_limit:
            row_count = row_count / split_factor
            number_of_divide = number_of_divide + 1
        factor = split_factor ** number_of_divide

        # Work out the time step per slice; switch from hours to minutes if the slices are sub-hour
        step_number = math.ceil(int(lookback_start) / factor)
        if factor > int(lookback_start) and lookback_unit == 'h':
            lookback_unit = 'm'
            number_of_minutes = 60
            step_number = math.ceil(int(lookback_start) * number_of_minutes / factor)

        try:
            # Query each time slice and append the results
            for i in range(int(lookback_end), factor + 1, 1):
                if i > 0:
                    current_query = query.format(i * step_number, lookback_unit, (i - 1) * step_number)
                    print(current_query)
                    df_la_query = query_la(workspace_id_source, current_query)
                    print(df_la_query.shape[0])
                    df_final = pd.concat([df_final, df_la_query])
        except Exception:
            print("query failed")
            raise
    else:
        # The full range fits within the row limit, so run it as a single query
        df_final = query_la(workspace_id_source, query.format(lookback_start, lookback_unit, lookback_end))
    return df_final
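To make the slicing arithmetic in slice_query_la concrete, here is a small standalone illustration (the row counts are hypothetical, not from the notebook): a 4-hour lookback that would return about 1.2 million rows against a 400,000-row limit ends up split into 4 one-hour sub-queries.

# Hypothetical numbers to trace the slicing logic above
import math

row_count, query_row_limit, split_factor = 1_200_000, 400_000, 2
lookback_start, lookback_unit = 4, 'h'

# Halve the row count until it fits under the limit; the number of halvings gives the slice count
number_of_divide = 0
while row_count > query_row_limit:
    row_count /= split_factor
    number_of_divide += 1
factor = split_factor ** number_of_divide      # 4 slices

# One-hour step per slice; slice_query_la would switch to minutes if the step fell below an hour
step_number = math.ceil(lookback_start / factor)
print(factor, step_number, lookback_unit)      # -> 4 1 h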
Slice data for query
In [ ]:
# Use Dror's test LA table
query_template = "let t1 = SecurityAlert | extend ent = parse_json(Entities) | extend ip = tostring(ent[0]['Address']) | project-keep TimeGenerated, ip; let t2 = CommonSecurityLog | where TimeGenerated > ago({0}{1}) and TimeGenerated <= ago({2}{1}) | project ip = DestinationIP; t1 | join kind=innerunique t2 on ip"
lookback_start = '4'
df_final = slice_query_la(query_template, lookback_start)
print(df_final.shape[0])
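If your workspace does not contain SecurityAlert or CommonSecurityLog data, any table with a TimeGenerated column can be used with the same placeholder convention ({0} = lookback start, {1} = unit, {2} = lookback end); a hypothetical example against the Heartbeat table:

# Hypothetical alternative template; the Heartbeat table and its columns are assumptions about your workspace.
query_template = "Heartbeat | where TimeGenerated > ago({0}{1}) and TimeGenerated <= ago({2}{1}) | project TimeGenerated, Computer, ComputerIP"
df_final = slice_query_la(query_template, lookback_start='4')
print(df_final.shape[0])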
In [ ]:
# Enable Arrow-based columnar transfer when converting the pandas DataFrame to a Spark DataFrame
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark_final = spark.createDataFrame(df_final)
spark_final.printSchema()
spark_final.show()
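On Spark 3.x the spark.sql.execution.arrow.enabled key still works but is deprecated; if you see a deprecation warning, the current key name is the one below (same effect on the pandas-to-Spark conversion):

# Newer configuration key for Arrow-based pandas <-> Spark conversion (Spark 3.0+)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")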
In [ ]:
# Destination path on ADLS Gen2; replace <container> and <storage-account> with your own values
path = 'abfss://<container>@<storage-account>.dfs.core.windows.net/demodata/df_final/{0}'.format(datetime.now().strftime('%Y%m%d%H%M%S'))
In [ ]:
# Write the result to the storage path as Parquet, overwriting any existing output
spark_final.write.parquet(path, mode='overwrite')
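If the result set grows over time, partitioning the output can make downstream reads cheaper; a sketch, assuming the DataFrame still carries the TimeGenerated column projected in the KQL query above:

# Sketch: partition the Parquet output by event date instead of one folder per run.
from pyspark.sql import functions as F

(spark_final
    .withColumn("event_date", F.to_date("TimeGenerated"))
    .write
    .partitionBy("event_date")
    .mode("overwrite")
    .parquet(path))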
In [ ]:
# Read the Parquet output back and count rows to verify the write
spark.read.parquet(path).count()
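As a quick sanity check, the read-back count can be compared with the pandas source; a minimal sketch:

# Sketch: confirm the Parquet round-trip preserved the row count of the source DataFrame
written_count = spark.read.parquet(path).count()
assert written_count == df_final.shape[0], f"Row count mismatch: {written_count} vs {df_final.shape[0]}"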