Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
Azure
GitHub Repository: Azure/Azure-Sentinel-Notebooks
Path: blob/master/tutorials-and-examples/example-notebooks/M365 Defender - APIs ep3.ipynb
3253 views
Kernel: Python 3.8.5 64-bit (conda)
# ---------------------------------------------------------------------------
# Imports used throughout the notebook
# ---------------------------------------------------------------------------
import datetime
import json
import time
import urllib.parse
import urllib.request

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

# ---------------------------------------------------------------------------
# Settings for reaching the Microsoft Threat Protection API through an
# Azure application registration. Fill in the three placeholders below.
# ---------------------------------------------------------------------------
tenantId = ''    # your Tenant ID
appId = ''       # your Application ID
appSecret = ''   # the Secret generated for your Application

# JSON file holding the queries to run.
query_inputfile_fullpath = "C:\\temp\\singleQUery.json"

# Folder that receives one result file per query.
query_outputfolder_fullpath = "C:\\Temp\\output"

# Incoming-webhook URL of the Teams channel that receives the summary.
teamsurl = "https://microsoft.webhook.office.com/webhookb2/....."
def app_auth():
    """Request an Azure AD access token for the Microsoft 365 Defender API.

    Performs an OAuth2 client-credentials request using the module-level
    ``tenantId``, ``appId`` and ``appSecret`` settings.

    Returns:
        The ``access_token`` string taken from the token endpoint's JSON
        response.

    Raises:
        urllib.error.URLError: propagated from ``urlopen`` when the request
            fails (``HTTPError`` is a subclass).
        KeyError: if the response JSON has no ``access_token`` entry.
    """
    url = "https://login.windows.net/%s/oauth2/token" % (tenantId)  # Login OAUTH2 page
    resourceAppIdUri = 'https://api.security.microsoft.com'  # M365 Api
    body = {
        'resource': resourceAppIdUri,
        'client_id': appId,
        'client_secret': appSecret,
        'grant_type': 'client_credentials',
    }
    data = urllib.parse.urlencode(body).encode("utf-8")
    req = urllib.request.Request(url, data)
    # The context manager closes the HTTP response even if parsing fails.
    with urllib.request.urlopen(req) as response:
        jsonResponse = json.loads(response.read())
    # Per the original author's note, the token is usable for the next hour.
    return jsonResponse["access_token"]


# Declare a function to query the M365 Defender API
def exec_mtp_query(query):
    """Run one advanced-hunting query and return its rows as a DataFrame.

    The bearer token is read from the module-level ``aadToken`` variable,
    which must have been set (for example via ``app_auth()``) beforehand.

    Args:
        query: The query text, sent as the ``Query`` field of the JSON body.

    Returns:
        A ``pandas.DataFrame`` built from the ``Results`` entry of the
        API's JSON response.

    Raises:
        urllib.error.URLError: propagated from ``urlopen`` when the request
            fails (``HTTPError`` is a subclass).
        KeyError: if the response JSON has no ``Results`` entry.
    """
    url = "https://api.security.microsoft.com/api/advancedhunting/run"  # M365 Advanced Hunting API
    headers = {
        'Content-Type': 'application/json',
        'Accept': 'application/json',
        'Authorization': "Bearer " + aadToken,
    }
    data = json.dumps({'Query': query}).encode("utf-8")
    req = urllib.request.Request(url, data, headers)
    # The context manager closes the HTTP response even if parsing fails.
    with urllib.request.urlopen(req) as response:
        jsonResponse = json.loads(response.read())
    return pd.DataFrame(jsonResponse["Results"])


# Declaring a function to calculate the elapsed time during the query
def getTimeDifferenceFromNow(TimeStart, TimeEnd):
    """Return the time from ``TimeStart`` to ``TimeEnd`` in minutes.

    Args:
        TimeStart: Start ``datetime``.
        TimeEnd: End ``datetime``.

    Returns:
        ``(TimeEnd - TimeStart)`` expressed as a float number of minutes;
        negative when ``TimeEnd`` precedes ``TimeStart``.
    """
    timeDiff = TimeEnd - TimeStart
    return timeDiff.total_seconds() / 60


# This function will send data to teams
def send_message_to_teams(messagetosend):
    """Post a text message to the Teams webhook configured in ``teamsurl``.

    Args:
        messagetosend: Text placed in the ``text`` field of the JSON payload.

    Returns:
        The ``requests.Response`` object from the POST.
    """
    # Imported here because requests is not among the notebook's top-level imports.
    import requests

    payload = {"text": messagetosend}
    headers = {'Content-Type': 'application/json'}
    response = requests.post(teamsurl, headers=headers, data=json.dumps(payload))
    # The code treats a response body of exactly "1" as success.
    if response.text == "1":
        print("Message Successfully posted in teams")
    else:
        print("Message post in teams failed")
    return response
# Run every query listed in the input JSON file, show a preview of each
# non-empty result, export it as a TSV file, and build a per-query summary.

# Display options do not change between queries, so set them once.
# Making all the columns visible
pd.set_option('display.max_columns', None)
# Expanding the output of the display to be more visible
pd.set_option('max_colwidth', 200)

TimeStart = datetime.datetime.now()
TimeEnd = datetime.datetime.now()
aadToken = app_auth()

# Load the whole query list up front so the file is not held open while
# the (potentially long) query loop runs.
with open(query_inputfile_fullpath) as json_file:
    data = json.load(json_file)

# One [query name, row count] entry per query that returned rows. Collected
# in a list so the summary exists even when the first query (or every
# query) returns nothing or fails.
summary_rows = []

for i, p in enumerate(data, start=1):
    str_per = (i / len(data)) * 100

    # check if we need a new token
    Elapsed_Minutes = getTimeDifferenceFromNow(TimeStart, TimeEnd)
    if round(Elapsed_Minutes) < 45:
        print('Using existing AzAD token')
    else:
        print('Generate a new AzAD token')
        aadToken = app_auth()
        TimeStart = datetime.datetime.now()

    print('\x1b[0;30;44m' + "Running",i,"of",len(data)," queries, we are at the ",'%.2f' %str_per,"% of processing rules"+ '\x1b[0m')

    # Set the Query name from JSON source file
    query_name = p['Name']
    print('\x1b[6;30;42m' + "Running the",query_name, "query" + '\x1b[0m')

    # Define the query from JSON source file
    kql_query = p['Query']
    if kql_query:
        print('\x1b[0;30;47m' + kql_query + '\x1b[0m')
        try:
            query_results = exec_mtp_query(kql_query)
        except Exception as query_error:
            # Best effort: report the failure and move on to the next query.
            print(query_error)
        else:
            if len(query_results) == 0:
                print('\x1b[6;30;42m' + "No results for the",query_name, "query" + '\x1b[0m')
            else:
                if len(query_results) == 10000:
                    print('\x1b[6;30;42m' + "The results for the",query_name, "query are exeeding the 10,000 limits" + '\x1b[0m')
                print('\x1b[6;30;42m' + "Results for the",query_name, "query" + '\x1b[0m')
                print('\x1b[6;30;42m' + "The query returns",len(query_results), "results" + '\x1b[0m')
                summary_rows.append([query_name, len(query_results)])
                display(query_results.head())

                # save to Tsv
                file_out = query_outputfolder_fullpath+'\\'+query_name+'.tsv'
                try:
                    # The write is the step that can fail (missing folder,
                    # characters in the query name that are invalid in a
                    # file name, ...), so it is the step that is guarded.
                    query_results.to_csv(file_out, sep="\t")
                except OSError as file_export_error:
                    print(file_export_error)

        # wait 4 seconds after every API call so we don't exceed the
        # 15 queries per 60 seconds limit, whatever the call's outcome
        time.sleep(4)

    TimeEnd = datetime.datetime.now()

QueryResultsSummary = pd.DataFrame(summary_rows, columns=['QueryName', 'QueryResults'])

# Send message summary to Teams
messagetosend = QueryResultsSummary.to_html(index = False)
#send_message_to_teams(messagetosend)