GitHub Repository: Azure/Azure-Sentinel-Notebooks
Path: blob/master/machine-learning-notebooks/MasqueradingProcessNameAnomaly.ipynb
Kernel: Python 3.8 - AzureML

Masquerading Process Name Anomaly Algorithm

Notebook Version: 1.0

Python Version: Python 3.8

Required Packages: azure_sentinel_utilities, fastDamerauLevenshtein, azureml-synapse

Platforms Supported: Azure Synapse Workspace, Azure Sentinel, Azure Log Analytics Workspace, Storage Account, Azure Machine Learning Notebooks connected to Azure Synapse Workspace

Data Source Required: Yes

Data Source: SecurityEvents

Spark Version: 3.1 or above

Description

This notebook demonstrates how to apply custom machine learning algorithms to data in Azure Sentinel. It showcases a Masquerading Process Name anomaly algorithm, which looks for Windows process creation events whose process names closely resemble those of known normal processes. Masquerading is a very common attack technique: a malicious process takes a name that differs from a known normal one by a single character. Because such names are easy to miss on visual inspection, they can succeed in running malicious code on your machine. For example, scvhost.exe and svch0st.exe both masquerade as the known normal process svchost.exe.
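To make the idea of "similar names" concrete, here is a minimal pure-Python sketch of an edit-distance-based similarity score. It is an illustration only: it implements the optimal string alignment variant of the Damerau-Levenshtein distance and assumes the score is normalized by the length of the longer name, which may differ in edge cases from the fastDamerauLevenshtein package the notebook actually uses. The function names osa_distance and similarity are hypothetical.

```python
def osa_distance(a: str, b: str) -> int:
    """Edit distance counting insertions, deletions, substitutions
    and transpositions of two adjacent characters (OSA variant)."""
    rows, cols = len(a) + 1, len(b) + 1
    d = [[0] * cols for _ in range(rows)]
    for i in range(rows):
        d[i][0] = i  # delete all of a[:i]
    for j in range(cols):
        d[0][j] = j  # insert all of b[:j]
    for i in range(1, rows):
        for j in range(1, cols):
            cost = 0 if a[i - 1] == b[j - 1] else 1
            d[i][j] = min(
                d[i - 1][j] + 1,         # deletion
                d[i][j - 1] + 1,         # insertion
                d[i - 1][j - 1] + cost,  # substitution or match
            )
            # adjacent transposition, e.g. "vc" <-> "cv"
            if i > 1 and j > 1 and a[i - 1] == b[j - 2] and a[i - 2] == b[j - 1]:
                d[i][j] = min(d[i][j], d[i - 2][j - 2] + 1)
    return d[-1][-1]


def similarity(a: str, b: str) -> float:
    """Assumed normalization: 1.0 means identical, 0.0 means nothing in common."""
    longest = max(len(a), len(b))
    return 1.0 if longest == 0 else 1.0 - osa_distance(a, b) / longest


for suspect in ("scvhost", "svch0st"):
    print(suspect, osa_distance("svchost", suspect), round(similarity("svchost", suspect), 3))
```

Both example names are a single edit away from svchost (one transposition, one substitution), so under this normalization each scores 1 - 1/7, roughly 0.857.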

The data used here is from the SecurityEvents table with EventID = 4688. These correspond to process creation events from Windows machines.

You will have to export this data from your Log Analytics workspace into a storage account. Instructions for this LA export mechanism can be found here: LA export mechanism.

Here is a Blog explaining data export

Data is then loaded from this storage account container and the results are published to your Log Analytics resource.

This notebook can be run either from the AML platform or directly on Synapse. The setup differs depending on which you choose: follow section A or section B, whichever applies to you, before running the main PySpark code.

A. Running on AML

You will need to configure your environment to use a Synapse cluster with your AML workspace. This requires setting up the Synapse compute and attaching the necessary packages/wheel files. For the rest of the code, mark each cell with a %%synapse header so that it runs on Synapse.

Steps:

  1. Install AzureML Synapse package on the AML compute to use spark magics

  2. Configure AzureML and Azure Synapse Analytics

  3. Attach the required packages and wheel files to the compute.

  4. Start Spark session

from azureml.core import Workspace, LinkedService

1. Install AzureML Synapse package on the AML compute to use spark magics

You will have to set up the AML compute that is attached to your notebook with some packages so that the rest of this code can run properly.

# Run the following line and confirm that 'azureml-synapse' is not installed.
%pip list

# Now run the following line and then restart the kernel/compute so that the package is installed.
%pip install azureml-synapse

# Rerun the following line and confirm that 'azureml-synapse' is installed.
%pip list

2. Configure AzureML and Azure Synapse Analytics

Please use notebook Configurate Azure ML and Azure Synapse Analytics to configure environment.

The notebook configures an existing Azure Synapse workspace to create and connect to a Spark pool. You can then create a linked service and connect the AML workspace to the Azure Synapse workspace. You can skip point 6, which exports data from Log Analytics to Data Lake Storage Gen2, because you have already set up the data export to the storage account above.

Note: Specify the input parameters in the step below in order to connect the AML workspace to the Synapse workspace using the linked service.

amlworkspace = "<aml workspace name>" # fill in your AML workspace name
subscription_id = "<subscription id>" # fill in your subscription id
resource_group = '<resource group of AML workspace>' # fill in the resource group of your AML workspace
linkedservice = '<linked service name>' # fill in the linked service created to connect to the Synapse workspace

Authentication to Azure Resources:

In this step we connect the AML workspace to the linked service that is connected to the Azure Synapse workspace.

# Get the AML workspace
aml_workspace = Workspace.get(name=amlworkspace, subscription_id=subscription_id, resource_group=resource_group)

# Retrieve a known linked service
linked_service = LinkedService.get(aml_workspace, linkedservice)

3. Attach the required packages and wheel files to the compute.

You will have to set up the Spark pool that is attached to your notebook with some packages so that the rest of this code can run properly.

Please follow these steps:

  1. On the AML Studio left menu, navigate to Linked Services

  2. Click on the name of the linked service you want to use

  3. Select Spark pools tab

  4. Click the Spark pool you want to use.

  5. In Synapse Properties, click the Synapse workspace. It will open the workspace in a new tab.

  6. Click on 'Manage' in the left window.

  7. Click on 'Apache Spark pools' in the left window.

  8. Select the '...' in the pool you want to use and click on 'Packages'.

  9. Now upload the following two files in this blade:

a. Create a requirements.txt file containing the following line and upload it to the Requirements section:

fastDamerauLevenshtein

b. Download the azure_sentinel_utilities whl package from Repo, then upload it under 'Workspace packages' in the left tab of the original blade.

c. Return to this tab and select the uploaded package.

4. Start Spark session

Enter your Synapse Spark compute below. To find the Spark compute, please follow these steps:

  1. On the AML Studio left menu, navigate to Linked Services

  2. Click on the name of the linked service you want to use

  3. Select Spark pools tab

  4. Get the Name of the Spark pool you want to use.

synapse_spark_compute = input('Synapse Spark compute:')
# Start Spark session
%synapse start -s $subscription_id -w $amlworkspace -r $resource_group -c $synapse_spark_compute

B. Running directly on Synapse

You will need to attach the required packages and wheel files to the cluster you intend to use with this notebook. Follow Step 3 above to complete this.

Common Code

From here on, all the steps are the same for both the AML and Synapse platforms. The main difference is that if you set up through AML, you must prepend each PySpark block with the %%synapse header. For Synapse runs, do not add that header.

  1. One-time: Set credentials in KeyVault so the notebook can access them

    • Create KeyVault

    • Store the following secrets in the KeyVault

      • Storage Account connection string: the keyName should be 'saConnectionString'

      • Log Analytics workspaceSharedKey: the keyName should be 'wsSharedKey'

      • Log Analytics workspaceId: the keyName should be 'wsId'

      • Log Analytics workspaceResourceId: the keyName should be 'wsResourceId'

    • Add the KeyVault as a linked service to your Azure Synapse workspace

  2. Ensure the settings in the cell below are filled in.

%%synapse
from azure_sentinel_utilities.azure_storage import storage_blob_manager
from azure_sentinel_utilities.log_analytics import log_analytics_client

import re
import datetime as dt
import time
import random
import string

from pyspark.sql import functions as F, types as T
from pyspark.sql.window import Window

from fastDamerauLevenshtein import damerauLevenshtein

These are some customizable variables which are used further in the code.

%%synapse
frequentThreshold = 0.8 # If the percentile of a process's creation count is at or above this threshold, it is considered a frequent and normal process.
infrequentThreshold = 0.2 # If the percentile of a process's creation count is lower than this threshold, it is considered an infrequent and possibly malicious process.
levenDistThreshold = 0.85 # damerauLevenshtein is used here as a normalized similarity score (1.0 = identical), so a higher value means the two names are more similar. This threshold keeps only the very similar processes and removes the noise.

Making Connections to the Storage Account and KeyVaults for user credentials

%%synapse
# Log Analytics workspace (Sentinel) to write the results
workspaceId = mssparkutils.credentials.getSecret(keyVault = 'YOUR_KEYVAULT_HERE', keyName = 'wsId', linkedServiceName = 'YOUR_LINKED_SERVICE_HERE') # wks_guid
workspaceSharedKey = mssparkutils.credentials.getSecret(keyVault = 'YOUR_KEYVAULT_HERE', keyName = 'wsSharedKey', linkedServiceName = 'YOUR_LINKED_SERVICE_HERE')
workspaceResourceId = mssparkutils.credentials.getSecret(keyVault = 'YOUR_KEYVAULT_HERE', keyName = 'wsResourceId', linkedServiceName = 'YOUR_LINKED_SERVICE_HERE') # eg: /subscriptions/<sub_guid>/resourcegroups/<rg_name>/providers/microsoft.operationalinsights/work

# Extract storage account and key from connection string
connectionString = mssparkutils.credentials.getSecret(keyVault = 'YOUR_KEYVAULT_HERE', keyName = 'saConnectionString', linkedServiceName = 'YOUR_LINKED_SERVICE_HERE')
# The connection string and storage key are secrets, so they are not printed to the notebook output.
keyPattern = r'DefaultEndpointsProtocol=(\w+);AccountName=(\w+);AccountKey=([^;]+);'
match = re.match(keyPattern, connectionString)
if match is None:
    raise ValueError("saConnectionString does not match the expected connection string format")
storageAccount = match.group(2)
storageKey = match.group(3)
print("Storage Account is : ", storageAccount)

containerName = "am-securityevent" # This name is fixed for security events
basePath = "WorkspaceResourceId={workspaceResourceId}".format(workspaceResourceId=workspaceResourceId)
print("BasePath is : ", basePath)

# Analyze the last 24 hours of events
startTime = dt.datetime.now() - dt.timedelta(days=1)
endTime = dt.datetime.now() - dt.timedelta(days=0)
startTimeStr = startTime.strftime("%m/%d/%Y, %I:%M:%S.%f %p")
print("Start Time of Algo run is : ", startTime)
endTimeStr = endTime.strftime("%m/%d/%Y, %I:%M:%S.%f %p")
print("End Time of Algo run is : ", endTime)
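The cell above pulls the account name and key out of the connection string with a regular expression that expects the fields in a fixed order. As an alternative sketch, the string can be split into key/value pairs, which does not depend on field order. The helper name parse_connection_string and the sample values are hypothetical, and the sample key is not a real credential.

```python
def parse_connection_string(conn_str: str) -> dict:
    """Split 'Key1=Value1;Key2=Value2;...' into a dictionary."""
    parts = {}
    for segment in conn_str.split(";"):
        if not segment:
            continue  # tolerate a trailing ';'
        # Split on the first '=' only: base64 keys often end with '=' padding.
        key, sep, value = segment.partition("=")
        if not sep:
            # Do not echo the segment itself; it may contain secret material.
            raise ValueError("malformed segment in connection string")
        parts[key] = value
    return parts


sample = (
    "DefaultEndpointsProtocol=https;AccountName=examplestorage;"
    "AccountKey=abc123==;EndpointSuffix=core.windows.net"
)
fields = parse_connection_string(sample)
print(fields["AccountName"])  # print the account name only, never the key
```

With this approach, storageAccount and storageKey would be read as fields["AccountName"] and fields["AccountKey"].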

This cell defines the helper functions.

  1. calcDist() -> calculates the normalized Damerau-Levenshtein similarity between two process names. The score is derived from the edit distance (the number of single-character insertions, deletions, substitutions and adjacent transpositions needed to turn one name into the other) and the lengths of the names, so a higher value means more similar names. If both processes have the same extension, the extension is excluded from the calculation.

  2. getRandomTimeStamp() -> calculates a random timestamp. This is added to the synthetically created process events.

  3. getListGoodProcs() and getListKnownProcs() -> provide a hardcoded list of known normal processes which malicious processes may masquerade as, as a Python list and as a Spark DataFrame respectively.

  4. getSyntheticMaliciousProcs() -> creates a list of potentially malicious processes by modifying a single random letter of the normal processes to form new names.

  5. getSyntheticEvents() -> synthetically creates a list of 4688 events. It gets the known normal and synthetically created malicious process names from previous functions and creates entire events using time stamp and process path.
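The extension handling described for calcDist() can be sketched in isolation. The helper below is hypothetical (it is not part of the notebook) and only shows the stripping step: when both names end in the same extension, the comparison is made on the stems, so that a shared suffix such as ".exe" does not inflate the similarity of short names.

```python
def strip_shared_extension(one: str, two: str) -> tuple:
    """Return both names without their extension, but only if the
    extensions are identical; otherwise return the names unchanged."""
    one_dot, two_dot = one.rfind("."), two.rfind(".")
    if one_dot != -1 and two_dot != -1 and one[one_dot + 1:] == two[two_dot + 1:]:
        return one[:one_dot], two[:two_dot]
    return one, two


print(strip_shared_extension("svchost.exe", "scvhost.exe"))
print(strip_shared_extension("more.com", "mode.exe"))
```

The first pair shares ".exe", so only the stems are compared. The second pair has different extensions, so the full names are kept and the extension difference counts toward the distance.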

%%synapse
def calcDist(one, two):
    # Returns the similarity score of two process names, or 0.0 if either is null
    oneDot = -1
    twoDot = -1
    if(one is not None and two is not None):
        oneDot = one.rfind('.')
        twoDot = two.rfind('.')
        if((oneDot != -1) and (twoDot != -1)):
            oneEnd = one[oneDot+1:]
            twoEnd = two[twoDot+1:]
            # Ignore the extension when both names share it
            if(oneEnd == twoEnd):
                one = one[:oneDot]
                two = two[:twoDot]
        return damerauLevenshtein(one, two)
    return 0.0

calcDistUdf = F.udf(calcDist, T.FloatType())

def getRandomTimeStamp():
    # Picks a random timestamp between startTimeStr and endTimeStr
    input_time_format = '%m/%d/%Y, %I:%M:%S.%f %p'
    output_time_format = '%m/%d/%Y, %I:%M:%S.000 %p'
    randAdd = random.random()
    stime = time.mktime(time.strptime(startTimeStr, input_time_format))
    etime = time.mktime(time.strptime(endTimeStr, input_time_format))
    ptime = stime + randAdd * (etime - stime)
    return time.strftime(output_time_format, time.localtime(ptime))

getRandomTimeStampUdf = F.udf(getRandomTimeStamp, T.StringType())

def getListGoodProcs():
    return [
        "svchost", "csrss", "smss", "services", "winlogon", "wininit", "lsass", "spoolsv",
        "conhost", "powershell", "init", "bash", "avahi-daemon", "gnome-session", "getty",
        "acpid", "dbus-daemon", "dbus-launch", "networkmanager", "explorer", "more.com",
        "mode.com", "nbtstat", "netstat", "cscript", "cnscript", "tracert", "tracerpt",
        "systeminfo", "system_info", "aitagent", "adtagent", "wininit", "wininst",
        "userinit", "userinst", "dnscmd", "dfscmd", "nslookup", "nblookup"
    ]

def getListKnownProcs():
    goodProcs = getListGoodProcs()
    # Append '.exe' to names that do not already carry an extension
    goodProcs = [s + ('.exe' if not '.' in s else '') for s in goodProcs]
    df = spark.createDataFrame(goodProcs, T.StringType())
    df = df.withColumnRenamed("value", "Process")
    return df

def getSyntheticMaliciousProcs():
    badProcs = []
    goodProcs = getListGoodProcs()
    for process in goodProcs:
        length = len(process)
        # not changing first or last letter because that is easier to spot
        randNum = random.randint(1, length-2)
        # get a random letter
        randLetter = random.choice(string.ascii_lowercase)
        # substitute original letter with random letter
        temp = list(process)
        temp[randNum] = randLetter
        badProcs = badProcs + ["".join(temp)]
    badProcs = [s + ('.exe' if not '.' in s else '') for s in badProcs]
    badProcs = badProcs + ["scvhost.exe", "svch0st.exe"] # adding known masquerading processes
    df = spark.createDataFrame(badProcs, T.StringType())
    df = df.withColumnRenamed("value", "Process")
    return df

def getSyntheticEvents(typeOfEvent):
    processPath = ""
    numExplode = 1
    if(typeOfEvent == "normal"):
        processPath = "C:\\Windows\\System32\\"
        df = getListKnownProcs()
        numExplode = 20 # repeat normal events so that they rank as frequent
    elif(typeOfEvent == "malicious"):
        processPath = "C:\\Windows\\Temp\\"
        df = getSyntheticMaliciousProcs()
    else:
        raise ValueError("typeOfEvent must be 'normal' or 'malicious'")
    df = df.withColumn("NewProcessName", F.concat(F.lit(processPath), F.col("Process")))
    df = df.withColumn("NumExplode", F.lit(numExplode))
    df = df.withColumn("EventID", F.lit("4688"))
    new_df = df.withColumn('NumExplode', F.expr('explode(array_repeat(NumExplode,int(NumExplode)))')).drop("NumExplode")
    new_df = new_df.withColumn("TimeGenerated", getRandomTimeStampUdf())
    new_df = new_df.withColumn("From", F.lit("Hardcoded"))
    new_df = new_df.select("NewProcessName", "Process", "TimeGenerated", "From")
    return new_df
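In getSyntheticMaliciousProcs(), the random replacement letter can coincide with the original letter, and for names such as more.com the random position can land on the dot. The following is a small sketch of a variant that avoids both cases. The function mutate_name is hypothetical and not part of the notebook, and it assumes the name contains at most one dot.

```python
import random
import string


def mutate_name(name: str, rng: random.Random) -> str:
    """Replace exactly one interior letter of the stem with a different letter."""
    stem, dot, ext = name.partition(".")  # keep any extension untouched
    if len(stem) < 3:
        raise ValueError("stem too short to mutate an interior letter")
    # Skip the first and last letter: changes there are easier to spot.
    idx = rng.randint(1, len(stem) - 2)
    # Exclude the original letter so the result always differs.
    candidates = [c for c in string.ascii_lowercase if c != stem[idx]]
    chars = list(stem)
    chars[idx] = rng.choice(candidates)
    return "".join(chars) + dot + ext


rng = random.Random()  # pass random.Random(seed) for reproducible output
for good in ("svchost", "more.com", "lsass"):
    print(good, "->", mutate_name(good, rng))
```

Passing the random generator in explicitly makes the synthetic data reproducible when a fixed seed is used.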

Next, we define the schema of the input and get the raw customer 4688 events. We are using the following details: EventID, NewProcessName, Process, TimeGenerated.

%%synapse
def security_event_schema():
    return T.StructType([
        T.StructField(name = "EventID", dataType = T.StringType(), nullable = True),
        T.StructField(name = "NewProcessName", dataType = T.StringType(), nullable = True),
        T.StructField(name = "Process", dataType = T.StringType(), nullable = True),
        T.StructField(name = "TimeGenerated", dataType = T.StringType(), nullable = True),
    ])

blobManager = storage_blob_manager(connectionString)
raw_df = blobManager.get_raw_df(startTime, endTime, containerName, basePath, security_event_schema(), blobManager.get_blob_service_client(connectionString))

# Keep only process creation events and normalize process names to lower case
final = raw_df.where(F.col("EventID") == "4688")
final = final.withColumn("Process", F.lower("Process"))
final = final.drop("EventID")
final = final.withColumn("From", F.lit("User"))
final = final.cache()
print("There are ", final.count(), " events of type 4688 to process.")
final.show()

Here we append synthetically created normal and malicious process creation events. This is being done to show performance of this algorithm by ensuring some masquerading process names are caught.

%%synapse
normalEvents = getSyntheticEvents("normal")
potentiallyMaliciousEvents = getSyntheticEvents("malicious")
final = final.union(normalEvents).union(potentiallyMaliciousEvents)
print("Count of SecurityEvents + synthetically created 4688 events is : ", final.count())

We are comparing frequent to infrequent processes to decide maliciousness of a process.

The approach here is to treat processes whose creation count ranks at or above the 'frequentThreshold' percentile as normal, and those whose count ranks below the 'infrequentThreshold' percentile as potentially malicious. Processes in the middle range are excluded from the analysis because they fall in a grey area: relatively popular, but below the first threshold.

The values of these thresholds can be customized by you based on your needs.
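The percentile split can be illustrated on a handful of made-up counts. The sketch below mimics in plain Python what a percent rank computes, namely (rank - 1) / (n - 1) with tied counts sharing the lowest rank. The process counts are invented for illustration and the function name percent_ranks is hypothetical.

```python
def percent_ranks(counts: dict) -> dict:
    """Map each process to (rank - 1) / (n - 1), ranking by ascending count."""
    n = len(counts)
    ordered = sorted(counts.values())
    ranks = {}
    for name, count in counts.items():
        rank = ordered.index(count) + 1  # first position, so ties share the lowest rank
        ranks[name] = 0.0 if n == 1 else (rank - 1) / (n - 1)
    return ranks


# Invented creation counts per process name
counts = {
    "svchost.exe": 50, "explorer.exe": 30, "conhost.exe": 20,
    "cmd.exe": 10, "notepad.exe": 5, "scvhost.exe": 1,
}
frequent_threshold, infrequent_threshold = 0.8, 0.2

ranks = percent_ranks(counts)
frequent = {p for p, r in ranks.items() if r >= frequent_threshold}
infrequent = {p for p, r in ranks.items() if r < infrequent_threshold}
print("frequent:", sorted(frequent))
print("infrequent:", sorted(infrequent))
```

In this toy data only the two most common processes count as normal and only the rarest one is treated as potentially malicious. Everything in between is left out of the comparison.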

%%synapse
# Count events per process and rank the counts as percentiles
groupProcess = final.groupBy("Process").count().sort(F.desc("count"))
groupProcess = groupProcess.select("Process", "count", F.percent_rank().over(Window.partitionBy().orderBy(groupProcess['count'])).alias("percent_rank"))
groupProcess = groupProcess.sort(F.desc("percent_rank"))

frequentProcess = groupProcess.where(F.col("percent_rank") >= frequentThreshold).select("Process")
frequentProcess = frequentProcess.withColumnRenamed("Process", "frequentProcess")

infrequentProcess = groupProcess.where(F.col("percent_rank") < infrequentThreshold).select("Process")
infrequentProcess = infrequentProcess.withColumnRenamed("Process", "infrequentProcess")

print("There are ", frequentProcess.count(), " normal processes in your data")
print("There are ", infrequentProcess.count(), " potentially malicious processes in your data")
print("Examples of some potentially malicious processes: ")
infrequentProcess.show()

Next we compute the Damerau-Levenshtein similarity score between every normal and every potentially malicious process to check whether we have any masquerading processes.

%%synapse
# Compare every frequent process with every infrequent process
compare = frequentProcess.crossJoin(infrequentProcess)
compare = compare.withColumn("Dist", calcDistUdf(F.col("frequentProcess"), F.col("infrequentProcess")))
print("Showing the Levenshtein distances between various processes")
compare.show()

It is always useful to have the paths from which the processes were spawned in order to judge how malicious a process is. This cell finds the paths of all the processes, for context. We also filter based on a threshold value, which you can alter to better fit your criteria.

%%synapse
# Attach path, time and origin of the frequent (normal) process
frequentProcessWhole = compare.join(final, (final.Process == compare.frequentProcess), how = "left").drop("Process")
frequentProcessWhole = frequentProcessWhole.withColumnRenamed("NewProcessName", "frequentProcessPath")
frequentProcessWhole = frequentProcessWhole.withColumnRenamed("TimeGenerated", "frequentTimeGenerated")
frequentProcessWhole = frequentProcessWhole.withColumnRenamed("From", "frequentFrom")

# Attach path, time and origin of the infrequent (potentially malicious) process
infrequentProcessWhole = frequentProcessWhole.join(final, (final.Process == frequentProcessWhole.infrequentProcess), how = "left").drop("Process")
infrequentProcessWhole = infrequentProcessWhole.withColumnRenamed("NewProcessName", "infrequentProcessPath")
infrequentProcessWhole = infrequentProcessWhole.withColumnRenamed("TimeGenerated", "infrequentTimeGenerated")
infrequentProcessWhole = infrequentProcessWhole.withColumnRenamed("From", "infrequentFrom")

# Keep only the pairs whose names are very similar
infrequentProcessWholeFiltered = infrequentProcessWhole.where(F.col("Dist") > levenDistThreshold)

print("Your anomalies: ")
(infrequentProcessWholeFiltered.where((F.col("frequentFrom") == "User") & (F.col("infrequentFrom") == "User"))).show()
print("Hardcoded anomalies examples")
(infrequentProcessWholeFiltered.where((F.col("frequentFrom") == "Hardcoded") | (F.col("infrequentFrom") == "Hardcoded"))).show()

To remove noise, we are extracting only the process names and path information of the potentially malicious process names.

%%synapse
print("Showing potential anomalies after removing noise")
(infrequentProcessWholeFiltered.drop("frequentTimeGenerated", "infrequentTimeGenerated").distinct()).show()

Sending results to Log Analytics

%%synapse
def escape_str(s):
    # Escape backslashes so the value stays valid inside a JSON string
    return s.replace('\\','\\\\')

escape_strUdf = F.udf(escape_str, T.StringType())

def send_results_to_log_analytics(df_to_la):
    # The log type is the name of the event that is being submitted. This will show up under "Custom Logs" as log_type + '_CL'
    log_type = 'MasqueradingProcessNameResult'
    df_to_la = df_to_la.withColumn('timestamp', F.current_timestamp())

    # concatenate columns to form one json record (HH = 24-hour clock)
    json_records = df_to_la.withColumn('json_field', F.concat(
        F.lit('{'),
        F.lit(' \"TimeStamp\": \"'), F.from_unixtime(F.unix_timestamp(F.col("timestamp")), "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"), F.lit('\",'),
        F.lit(' \"NormalProcess\": \"'), escape_strUdf(F.col('frequentProcess')), F.lit('\",'),
        F.lit(' \"PotentiallyMaliciousProcess\": \"'), escape_strUdf(F.col('infrequentProcess')), F.lit('\",'),
        F.lit(' \"AnomalyScore\":'), F.col('Dist'),
        F.lit('}')
        )
    )

    # combine json record column to create the array
    json_body = json_records.agg(F.concat_ws(", ", F.collect_list('json_field')).alias('body'))
    json_payload = json_body.first()['body']
    # The aggregated body is an empty string when there are no records
    if json_payload:
        json_payload = '[' + json_payload + ']'
        payload = json_payload.encode('utf-8')
        return log_analytics_client(workspaceId, workspaceSharedKey).post_data(payload, log_type)
    else:
        return "No json data to send to LA"
%%synapse
print("Sending results to LogAnalytics")
print("Sending ", infrequentProcessWholeFiltered.count(), " results to Log Analytics")
send_results_to_log_analytics(infrequentProcessWholeFiltered)
print("Done")
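The cells above assemble the JSON body by concatenating strings and escaping backslashes by hand. If the result set is small enough to collect onto the driver, a serializer can take care of quoting and escaping instead. The sketch below assumes the rows have already been collected as (normal, suspect, score) tuples. The function build_payload is hypothetical, and posting the payload is left to the notebook's own log_analytics_client.

```python
import json
from datetime import datetime, timezone


def build_payload(rows, now=None) -> bytes:
    """Serialize (normal, suspect, score) tuples into a UTF-8 JSON array."""
    now = now or datetime.now(timezone.utc)
    # ISO-8601 style UTC timestamp with millisecond precision
    stamp = now.strftime("%Y-%m-%dT%H:%M:%S.") + f"{now.microsecond // 1000:03d}Z"
    records = [
        {
            "TimeStamp": stamp,
            "NormalProcess": normal,
            "PotentiallyMaliciousProcess": suspect,
            "AnomalyScore": score,
        }
        for normal, suspect, score in rows
    ]
    # json.dumps escapes backslashes and quotes in the values
    return json.dumps(records).encode("utf-8")


example_rows = [("svchost.exe", "scvhost.exe", 0.857)]
print(build_payload(example_rows).decode("utf-8"))
```

Because the serializer handles escaping, no separate escape_str step is needed in this variant.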
# Run the following line if you have been running through AML
%synapse stop