GitHub Repository: Azure/Azure-Sentinel-Notebooks
Path: blob/master/Sentinel Bulk Logs Export.ipynb
Kernel: Python 3.10 - AzureML

Overview

This Notebook takes a KQL query and breaks it into batches that fit within the limits of the Azure Monitor API. This allows us to export more than the default 30,000-record/64MB limits encountered when using the native interface and API directly. The export runs the batches in parallel and writes the data to local disk in the format specified in the OUTPUT_FORMAT parameter (CSV or Parquet).
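The core idea can be illustrated with a minimal, hypothetical sketch (the function name and structure below are illustrative only, not the notebook's actual implementation): a large time range is split into consecutive windows, and each window becomes one query small enough to stay under the API limits.

```python
from datetime import datetime, timedelta, timezone

def split_into_batches(start, end, batch_size):
    """Split [start, end) into consecutive time windows no longer than batch_size."""
    batches = []
    cursor = start
    while cursor < end:
        window_end = min(cursor + batch_size, end)
        batches.append((cursor, window_end))
        cursor = window_end
    return batches

# A 24-hour range with 6-hour batches yields 4 windows.
windows = split_into_batches(
    datetime(2025, 5, 1, tzinfo=timezone.utc),
    datetime(2025, 5, 2, tzinfo=timezone.utc),
    timedelta(hours=6),
)
print(len(windows))  # → 4
```

Each window is then queried independently, which is what makes parallel export possible.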

Common Use Cases

  • eDiscovery requests where a large number of rows need to be exported and sent to an external party.

  • Investigations where a large number of indicators need to be exported for external analysis.

  • Compliance and external archival scenarios where certain datasets need to be stored outside of Log Analytics.

  • Data Science/Engineering scenarios where analysts need access to a large dataset in CSV or Parquet format for additional analytics outside of KQL.

  • Make sure you are running on a Compute Instance, NOT Serverless Spark Compute (which is the default method for a new AML workspace), as that method is not currently supported. Provision a Compute Instance with at least 4 cores. For larger datasets, you can increase the cores and memory further, just remember to update the JOBS parameter to match the number of cores as needed.

  • The DefaultAzureCredential Class is used to authenticate to the Log Analytics workspace. This should automatically authenticate the user that is launching the Notebook in AML, assuming the default SSO option is enabled when provisioning the AML compute instance and that user has access to the Log Analytics workspace. Also, ensure you press the "Authenticate" button if you see the "You need to be authenticated to the compute..." banner within AML Studio. A Managed Identity assigned to the AML compute instance can be used instead, but it needs to have access to the Log Analytics workspace.

  • Use the project operator in the QUERY parameter in Step 2 to limit the amount of data being exported. This will help speed up the overall process significantly, given the current Azure Monitor API limits and the low throughput they provide. As a benchmark, it took 7 minutes to export all fields and 2.5 million rows from the SecurityEvent table (roughly 3.2GB of data) using a 32-core AML Compute Instance. This is why selecting only the fields needed is important for larger datasets.

  • Run each cell independently, and in order, to ensure each step runs without issue before moving on to the next. Later steps depend on the previous steps completing successfully.

  • Review all of the parameters and their descriptions in Step 2 to get a better sense of how to tune based on the dataset being exported.

  • Check the logs.log file within the run directory for additional troubleshooting information.
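When a batch's results exceed the API limits, the notebook halves the batch window and retries, never shrinking below MIN_BATCH_SIZE. A simplified illustration of that backoff rule (the function name is hypothetical, for illustration only):

```python
from datetime import timedelta

def shrink_batch(batch_size, min_batch_size):
    """Halve the batch window after an oversized response, clamped to the minimum."""
    return max(batch_size / 2, min_batch_size)

size = timedelta(hours=6)
minimum = timedelta(minutes=1)
size = shrink_batch(size, minimum)  # 3 hours
size = shrink_batch(size, minimum)  # 1.5 hours
print(size)
```

If a batch still cannot fit at MIN_BATCH_SIZE, it is skipped and reported at the end of the run, which is why tuning MIN_BATCH_SIZE matters for very dense datasets.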

1. Install Dependencies

Run this cell to install the required Python libraries.

import sys
!{sys.executable} -m pip install azure-monitor-query azure-identity pandas tqdm

2. Set Parameters

Modify the parameters below as necessary, then run the cell.

from datetime import datetime, timedelta, timezone

#Required parameters:
START_TIME = datetime(2025, 5, 1, tzinfo=timezone.utc) #Start time of the time range for the query.
END_TIME = datetime(2025, 5, 31, tzinfo=timezone.utc) #End time of the time range for the query.
QUERY = "SecurityEvent | project TimeGenerated, Account, Computer, EventID" #KQL query to run.

#If needed, change which Log Analytics workspace to use:
USE_DEFAULT_LAW_ID = True #If present, use the Log Analytics workspace ID from the config.json file which gets created by Sentinel Notebooks.
LAW_ID = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" #Log Analytics workspace ID to use if the config.json file is not present or USE_DEFAULT_LAW_ID is set to False.

#Optional parameters used for performance and output tuning:
JOBS = 4 #Number of jobs to run in parallel. Typically, this should match the number of cores of the VM. Because the Azure Monitor API can only run 5 concurrent queries at a time, there are diminishing returns after a certain point. Any value over 64 will revert to 64.
AUTO_BATCH = True #Attempts to automatically detect the optimal batch size (time range) to use when breaking up the query.
BATCH_SIZE = timedelta(hours=6) #If AUTO_BATCH is set to False, this batch size (time range) will be used to break up the query.
MIN_BATCH_SIZE = timedelta(minutes=1) #If the data returned cannot fit within this time range, we skip and move to the next batch.
OUTPUT_DIRECTORY = "./law_export" #Directory where results will be stored. A new directory gets created for each run.
OUTPUT_FILE_PREFIX = "query_results" #Prefix used for the data files containing the query results.
OUTPUT_FORMAT = 'CSV' #File format used to store the query results on disk. CSV or PARQUET values are supported.
COMBINE_FORMAT = 'CSV' #File format used when combining files in Step 4. CSV or PARQUET values are supported.
COMBINE_MAX_ROWS = 500000 #Sets the max number of rows per file when combining in Step 4.
COMBINE_SORT = True #Sorts the data by the specified TIMESTAMP_FIELD when combining.
TIMESTAMP_FIELD = 'TimeGenerated' #Field to use for timestamp-based batching. TimeGenerated is the default. For example, _OriginalTimeGenerated can be used for data restored via Search Job.
TG_START_TIME = datetime(2025, 5, 29, tzinfo=timezone.utc) #If using a timestamp field other than TimeGenerated, we still need to provide a time range for TimeGenerated in the request. If TIMESTAMP_FIELD is set to TimeGenerated, this parameter is ignored.
TG_END_TIME = datetime(2025, 6, 1, tzinfo=timezone.utc) #If using a timestamp field other than TimeGenerated, we still need to provide a time range for TimeGenerated in the request. If TIMESTAMP_FIELD is set to TimeGenerated, this parameter is ignored.
TIMEOUT = 3 #Number of minutes allowed before the query times out. 10 minutes is the max.
LOG_LEVEL = 'INFO' #Logging level. Supported values are the standard DEBUG, INFO, WARNING, ERROR, etc.

3. Export Data

Run the below cell to start the export process. Data will be written to local files in the directory specified in the OUTPUT_DIRECTORY parameter.

from datetime import datetime, timedelta, timezone
import pandas as pd
import time
from azure.monitor.query import LogsQueryClient, LogsQueryStatus
from azure.core.exceptions import HttpResponseError
from azure.identity import DefaultAzureCredential
import logging
import logging.handlers
import os
import glob
import json
from multiprocessing import Pool, Manager, current_process, Queue
from tqdm import tqdm
from IPython.display import clear_output

completed_jobs = []
failed_jobs = []
errors = []
skipped_batches = []
time_format: str = "%m-%d-%Y %H-%M-%S"

class time_range_class:
    def __init__(self, name, start_time, end_time):
        self.name = name
        self.start_time = start_time
        self.end_time = end_time

def get_time_ranges(start_time=datetime.now(), end_time=datetime.now() - timedelta(hours=24), number_of_ranges=5):
    ranges = []
    interval = (end_time - start_time) / number_of_ranges
    delta = timedelta(microseconds=0)
    index = 0
    for i in range(number_of_ranges):
        range_name = f"Job{str(index)}"
        range_start = end_time - ((i + 1) * interval)
        range_end = (end_time - (i * interval)) - delta
        time_range = time_range_class(range_name, range_start, range_end)
        ranges.append(time_range)
        index += 1
        delta = timedelta(microseconds=1)
    return ranges

def read_config_values(file_path):
    try:
        with open(file_path) as json_file:
            if json_file:
                json_config = json.load(json_file)
                return json_config["workspace_id"]
    except:
        return None

def write_to_file(df, export_path, prefix, format):
    match format:
        case 'PARQUET':
            path = os.path.join(export_path, f"{prefix}.parquet")
            df.to_parquet(path)
        case 'CSV':
            path = os.path.join(export_path, f"{prefix}.csv")
            df.to_csv(path, index=False)

def get_batch_size(query, law_id, start_time, end_time, timeout, timestamp_field, tg_start_time, tg_end_time):
    batch_query_summarize = (
        "| summarize NumberOfBatchesBytes = 38400000 / avg(estimate_data_size(*)), NumberOfBatchesRows = count()"
        "| project NumberOfBatchesBytes = todecimal(NumberOfBatchesRows / NumberOfBatchesBytes), NumberOfBatchesRows = todecimal(NumberOfBatchesRows) / todecimal(450000)"
        "| project NumberOfBatches = round(iff(NumberOfBatchesBytes > NumberOfBatchesRows, NumberOfBatchesBytes, NumberOfBatchesRows), 2)"
        "| project NumberOfBatches = iif(NumberOfBatches < toreal(1), toreal(1), NumberOfBatches)")
    if timestamp_field != 'TimeGenerated':
        batch_query_where = f"{query} | where {timestamp_field} between (todatetime('{start_time}') .. todatetime('{end_time}'))"
        batch_query = f"{batch_query_where} {batch_query_summarize}"
        response = client.query_workspace(workspace_id=law_id, query=batch_query, timespan=(tg_start_time, tg_end_time), timeout=timeout)
    else:
        batch_query = f"{query} {batch_query_summarize}"
        response = client.query_workspace(workspace_id=law_id, query=batch_query, timespan=(start_time, end_time), timeout=timeout)
    if response.status == LogsQueryStatus.SUCCESS:
        data = response.tables
    else:
        error = response.partial_error
        raise Exception(error.details[0]["innererror"])
    for table in data:
        df = pd.DataFrame(data=table.rows, columns=table.columns)
        return df['NumberOfBatches'].iloc[0]

def define_logger(name, log_level="INFO"):
    logger = logging.getLogger(name)
    logger.addHandler(logging.handlers.QueueHandler(log_queue))
    logger.setLevel(log_level)
    logger.handlers[0].setFormatter(logging.Formatter('%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s', datefmt='%Y-%m-%d %H:%M:%S'))
    return logger

def export_log_analytics_data(
    law_id: str,
    query: str,
    start_time: datetime = None,
    end_time: datetime = None,
    batch_size: timedelta = timedelta(hours=4),
    job_name: str = None,
    status_queue=None,
    log_queue=None,
    min_batch_size: timedelta = timedelta(minutes=15),
    client: LogsQueryClient = None,
    export_path='',
    export_prefix='query_results',
    auto_batch=True,
    export_format: str = 'CSV',
    timeout: int = 10,
    timestamp_field='TimeGenerated',
    tg_start_time: datetime = None,
    tg_end_time: datetime = None,
    log_level="INFO",
    delay: int = 0,
    max_retries: int = 5,
    export_to_file: bool = True,
    json_depth: int = 10,
):
    time_range: timedelta = end_time - start_time
    error_count: int = 0
    initial_batch_size: timedelta = batch_size
    batch_count: timedelta = timedelta()
    current_count: int = 0
    percent_complete: int = 0
    stop_time: datetime = start_time
    runs_without_error_count: int = 0
    loop_done: bool = False
    rows_returned: int = 0
    results = []
    logger = define_logger(name=f"{current_process().name}-{job_name}", log_level=log_level)
    logger.info(f"Starting new job between {start_time.strftime(time_format)} and {end_time.strftime(time_format)}.")
    if auto_batch == True:
        try:
            batch_size = time_range / get_batch_size(query, law_id, start_time, end_time, timeout=timeout, timestamp_field=timestamp_field, tg_start_time=tg_start_time, tg_end_time=tg_end_time)
            logger.info(f"Calculated auto-batch size of: {batch_size}")
        except Exception as err:
            logger.error(f"Unable to auto-batch, please check your query and the log for more info. You can disable auto-batch via the AUTO_BATCH parameter if the dataset is too large to calculate. {type(err)} {err}")
            return {'job_name': job_name, 'status': 'error'}
    else:
        logger.info(f"Using manual batch size of: {batch_size}.")
    if batch_size > time_range:
        batch_size = time_range
    while error_count <= max_retries:
        try:
            while loop_done == False:
                if batch_size < initial_batch_size and runs_without_error_count > 5:
                    batch_size *= 2
                    logger.info(f"Increasing batch size to {batch_size}.")
                start_time = end_time - batch_size
                if start_time <= stop_time:
                    start_time = stop_time
                    batch_size = end_time - start_time
                    loop_done = True
                logger.info(f"Running query between {start_time.strftime(time_format)} and {end_time.strftime(time_format)}.")
                if timestamp_field != 'TimeGenerated':
                    batch_query = f"{query} | where {timestamp_field} between (todatetime('{start_time}') .. todatetime('{end_time}'))"
                    response = client.query_workspace(workspace_id=law_id, query=batch_query, timespan=(tg_start_time, tg_end_time), timeout=timeout)
                else:
                    response = client.query_workspace(workspace_id=law_id, query=query, timespan=(start_time, end_time), timeout=timeout)
                if response.status == LogsQueryStatus.SUCCESS:
                    data = response.tables
                else:
                    error = response.partial_error
                    raise Exception(error.details[0]["innererror"])
                file_prefix = f"{export_prefix}_{start_time.strftime(time_format)}"
                for table in data:
                    df = pd.DataFrame(data=table.rows, columns=table.columns)
                    if df.shape[0] > 0:
                        write_to_file(df, export_path, file_prefix, export_format)
                    else:
                        logger.info(f"No data returned for {start_time.strftime(time_format)} and {end_time.strftime(time_format)}, skipping writing to disk.")
                batch_count += batch_size
                percent_complete_previous = percent_complete
                percent_complete = round((batch_count / time_range) * 100)
                logger.info(f"Received {df.shape[0]} rows of data and wrote to {file_prefix}.{export_format.lower()}. Percent Complete: {percent_complete}")
                status_queue.put({'job_name': job_name, 'event': 'progress_update', 'message': (percent_complete - percent_complete_previous)})
                rows_returned += int(df.shape[0])
                runs_without_error_count += 1
                end_time = start_time + timedelta(microseconds=-1)
                time.sleep(delay)
            logger.info(f"Finished exporting {rows_returned} records from Log Analytics. Percent Complete: 100")
            status_queue.put({'job_name': job_name, 'event': 'progress_update', 'message': (100 - percent_complete)})
            close_logger(logger)
            return {'job_name': job_name, 'status': 'success', 'rows_returned_total': rows_returned}
        except Exception as err:
            if "Response ended prematurely" in str(err):
                logger.warning("Response ended prematurely, retrying.")
                logger.debug(f"{type(err)} {err}")
            elif ("A recognition error occurred in the query" in str(err)
                  or "A semantic error occurred" in str(err)
                  or "The requested path does not exist" in str(err)):
                logger.error(f"There is likely an error in the query or the workspace ID. {type(err)} {err}")
                return {'job_name': job_name, 'status': 'error'}
            elif ("Maximum response size of 100000000 bytes exceeded" in str(err)
                  or 'The results of this query exceed the set limit of 64000000 bytes' in str(err)
                  or 'The results of this query exceed the set limit of 500000 records' in str(err)):
                runs_without_error_count = 0
                if batch_size == min_batch_size:
                    status_queue.put({'job_name': job_name, 'event': 'skipped_batch', 'message': f"{start_time.strftime(time_format)} - {end_time.strftime(time_format)}"})
                    logger.warning(f"Results cannot be returned in the specified minimum batch size. Skipping batch between {start_time.strftime(time_format)} and {end_time.strftime(time_format)}.")
                    logger.debug(f"{type(err)} {err}")
                    batch_count += batch_size
                    end_time = start_time + timedelta(microseconds=-1)
                    loop_done = False
                else:
                    batch_size = batch_size / 2
                    if batch_size < min_batch_size:
                        batch_size = min_batch_size
                    logger.info(f"Request was too large, reduced batch size to: {batch_size}.")
                    logger.debug(f"{type(err)} {err}")
                    loop_done = False
            else:
                logger.error(f"Unhandled Error: {type(err)} {err}")
                error_count += 1
    if error_count > max_retries:
        logger.error("Max number of retries reached, exiting.")
        return {'job_name': job_name, 'status': 'error'}

def log_result(result):
    global completed_jobs
    global failed_jobs
    if result['status'] == 'success':
        completed_jobs.append(result)
    else:
        logger.error(f"{result['job_name']} has failed. Please check log file for details.")
        failed_jobs.append(result)

def log_error(error):
    logger.error(error)

def logger_process(queue, job_directory, log_level):
    message: logging.LogRecord
    logger = logging.getLogger('logger_process')
    logger.addHandler(logging.FileHandler(f"{job_directory}/logs.log"))
    logger.handlers[0].setLevel(log_level)
    while True:
        message = queue.get()
        if message is None:
            break
        if message.levelno >= 40:
            status_queue.put({'job_name': message.processName, 'event': 'error', 'message': message})
        logger.handle(message)
    close_logger(logger)

def close_logger(logger):
    for handler in logger.handlers:
        logger.removeHandler(handler)
        handler.close()

def get_workspace():
    workspace_id = read_config_values('config.json')
    if workspace_id == None or USE_DEFAULT_LAW_ID == False:
        if LAW_ID != "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx":
            law_id = LAW_ID
        else:
            raise Exception("Please specify a valid Log Analytics workspace ID in the Parameters cell.")
    else:
        law_id = workspace_id
    return law_id

def make_directory():
    if not os.path.exists(OUTPUT_DIRECTORY):
        os.makedirs(OUTPUT_DIRECTORY)
    job_directory = f"{OUTPUT_DIRECTORY}/{datetime.now().strftime(time_format)}"
    os.mkdir(job_directory)
    return job_directory

clear_output(wait=True)
job_directory = make_directory()
if JOBS > 64:
    JOBS = 64
ranges = get_time_ranges(start_time=START_TIME, end_time=END_TIME, number_of_ranges=JOBS)
law_id = get_workspace()
credential = DefaultAzureCredential()
client = LogsQueryClient(credential)
pbar = tqdm(total=JOBS * 100, leave=True, position=0, desc=f"Splitting query into {JOBS} jobs for parallel processing", postfix={'Errors': 0, 'Skipped Batches': 0})
with Manager() as manager:
    status_queue = manager.Queue()
    log_queue = manager.Queue()
    last_queue_time = datetime.now()
    with Pool() as pool:
        logger = define_logger(name=current_process().name, log_level=LOG_LEVEL)
        logger.info("Starting logger process.")
        pool_logger_results = pool.apply_async(logger_process, args=(log_queue, job_directory, LOG_LEVEL), error_callback=log_error)
        logger.info(f"Starting {len(ranges)} query jobs between {START_TIME.strftime(time_format)} and {END_TIME.strftime(time_format)}.")
        pool_export_results = [pool.apply_async(export_log_analytics_data, [law_id, QUERY, i.start_time, i.end_time, BATCH_SIZE, i.name, status_queue, log_queue, MIN_BATCH_SIZE, client, job_directory, OUTPUT_FILE_PREFIX, AUTO_BATCH, OUTPUT_FORMAT, TIMEOUT, TIMESTAMP_FIELD, TG_START_TIME, TG_END_TIME, LOG_LEVEL], callback=log_result, error_callback=log_error) for i in ranges]
        while (len(completed_jobs) + len(failed_jobs)) < JOBS or not status_queue.empty():
            if not status_queue.empty():
                item = status_queue.get()
                last_queue_time = datetime.now()
                match item['event']:
                    case 'progress_update':
                        pbar.update(item['message'])
                    case 'skipped_batch':
                        skipped_batches.append(item)
                        pbar.set_postfix(ordered_dict={'\033[91mErrors': len(errors), 'Skipped Batches': len(skipped_batches)})
                    case 'error':
                        errors.append(item)
                        pbar.set_postfix(ordered_dict={'\033[91mErrors': len(errors), 'Skipped Batches': len(skipped_batches)})
            if datetime.now() - last_queue_time > timedelta(minutes=TIMEOUT):
                log_message = f"No input received from running job(s) for more than {TIMEOUT} minutes, check log for errors. Exiting."
                logger.error(log_message)
                pbar.set_description(log_message)
                break
        if len(completed_jobs) > 0:
            log_message = f"Completed export of {sum([item['rows_returned_total'] for item in completed_jobs])} records to {job_directory}/."
            logger.info(log_message)
            pbar.set_description(log_message)
        else:
            pbar.set_description_str(f"No jobs completed successfully. Please check log file in {job_directory} for details.")
        log_queue.put(None)
        pool.close()
        pool.join()
        close_logger(logger)
time.sleep(2)
pbar.clear()
pbar.close()
time.sleep(2)
if skipped_batches:
    print("\nThe below batches were skipped. Try lowering the MIN_BATCH_SIZE parameter or reduce the size of the dataset. Review log file for more details.\n")
    for item in skipped_batches:
        print(item['message'])
if errors:
    print("\nThe below errors were encountered. Review log file for more details.\n")
    for item in errors:
        print(item['message'].message)

4. Combine Files (Optional)

Merges rows from individual data files into one or more files based on the COMBINE_MAX_ROWS parameter. Ensure you have enough memory allocated in your compute instance to account for the size of the exported dataset.
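The COMBINE_MAX_ROWS splitting can be reduced to a small, standalone sketch (the helper name below is hypothetical, for illustration): the combined DataFrame is sliced into consecutive chunks, each at most max_rows long.

```python
import pandas as pd

def split_max_rows(df, max_rows):
    """Yield consecutive slices of df, each containing at most max_rows rows."""
    for start in range(0, len(df), max_rows):
        yield df.iloc[start:start + max_rows]

df = pd.DataFrame({'TimeGenerated': range(10)})
chunks = list(split_max_rows(df, 4))
print([len(c) for c in chunks])  # → [4, 4, 2]
```

In the cell below, each such chunk is written to its own numbered CSV or Parquet file.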

def combine_files(files, output_prefix, job_directory, output_format, combine_format, sort_column, sort, max_rows=50000):
    export_path = f'{job_directory}/combined'
    if not os.path.exists(export_path):
        os.mkdir(export_path)
    if output_format.lower() == 'csv':
        df = [pd.read_csv(file, low_memory=False) for file in tqdm(files, position=0, desc="Reading files")]
    if output_format.lower() == 'parquet':
        df = [pd.read_parquet(file) for file in tqdm(files, position=0, desc="Reading files")]
    file_index = 1
    row_index = 0
    pbar_combine = tqdm(leave=True, position=0, desc="Concatenating files, please wait")
    df = pd.concat(df, ignore_index=True)
    if sort:
        df.sort_values(by=[sort_column], inplace=True)
    total_rows = df.shape[0]
    pbar_combine.total = total_rows
    while row_index < total_rows:
        pbar_combine.set_description(f'Writing max of {max_rows} rows to each file')
        if combine_format.lower() == 'csv':
            df[row_index:(row_index + max_rows)].to_csv(f'{export_path}/{output_prefix}_combined{file_index}.csv', index=False)
        if combine_format.lower() == 'parquet':
            df[row_index:(row_index + max_rows)].to_parquet(f'{export_path}/{output_prefix}_combined{file_index}.parquet', index=False)
        row_index += max_rows
        file_index += 1
        if row_index > total_rows:
            pbar_combine.update(total_rows - (row_index - max_rows))
            pbar_combine.set_description(f'Completed combining {total_rows} rows into {file_index - 1} file(s), located in {export_path}/.')
        else:
            pbar_combine.update(max_rows)
    pbar_combine.clear()
    pbar_combine.close()

clear_output(wait=True)
if OUTPUT_FORMAT.lower() in ('csv', 'parquet'):
    files = glob.glob(job_directory + '/*.{}'.format(OUTPUT_FORMAT.lower()))
    sort_column = TIMESTAMP_FIELD  # Both branches of the original conditional assigned the same value.
    if files:
        combine_files(files=files, output_prefix=OUTPUT_FILE_PREFIX, output_format=OUTPUT_FORMAT, job_directory=job_directory, combine_format=COMBINE_FORMAT, max_rows=COMBINE_MAX_ROWS, sort_column=sort_column, sort=COMBINE_SORT)
    else:
        print("No files found.")
else:
    print("Please set OUTPUT_FORMAT parameter value to either CSV or PARQUET. Exiting.")

5. Delete Data (Optional)

Run the below cell to DELETE all run data including logs and data files.

import shutil
from IPython.display import clear_output

confirmation = input(f"Are you sure you want to delete the directory {OUTPUT_DIRECTORY} and all of its contents? (Y/N)")
clear_output(wait=True)
if confirmation.lower() in ('y', 'yes'):
    try:
        print("Deleting data...")
        shutil.rmtree(OUTPUT_DIRECTORY)
        print("Data has been deleted.")
    except Exception as err:
        print(f"Error deleting data: {err}")
else:
    print('Operation has been aborted.')