Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
Azure
GitHub Repository: Azure/Azure-Sentinel-Notebooks
Path: blob/master/tutorials-and-examples/example-notebooks/mp_data.py
3253 views
1
# -------------------------------------------------------------------------
2
# Copyright (c) Microsoft Corporation. All rights reserved.
3
# Licensed under the MIT License. See License.txt in the project root for
4
# license information.
5
# --------------------------------------------------------------------------
6
"""Demo QueryProvider."""
7
from pathlib import Path
8
import pickle
9
from typing import Any, Iterable
10
from time import sleep
11
12
import pandas as pd
13
14
15
def read_pd_df(data_file, query_name):
    """
    Read a DataFrame from a CSV or pickle file.

    Parameters
    ----------
    data_file : str or os.PathLike
        Location of the data file. Files with a ``.csv`` extension
        (case-insensitive) are parsed as CSV with the ``TimeGenerated``
        column converted to datetimes; anything else is read as a
        pandas pickle.
    query_name : str
        Name of the query the data belongs to (used in error messages only).

    Returns
    -------
    pd.DataFrame
        The contents of the data file.

    Raises
    ------
    FileNotFoundError
        If no data file was supplied or the file does not exist.

    """
    if data_file is None:
        # Callers that map a query name to a file may pass None for an
        # unknown name - report that clearly instead of a TypeError.
        raise FileNotFoundError(f"No data file defined for query {query_name}.")

    # Normalise to Path so that both str and Path inputs are accepted.
    data_path = Path(data_file)
    if not data_path.is_file():
        raise FileNotFoundError(
            f"Data file {data_file} for query {query_name} not found."
        )

    if data_path.suffix.lower() == ".csv":
        # "infer_datetime_format" is deprecated in pandas 2.x (format
        # inference is the default there), so it is no longer passed.
        return pd.read_csv(data_path, parse_dates=["TimeGenerated"])
    return pd.read_pickle(data_path)
27
28
29
class TILookupDemo:
    """TILookup demo class."""

    # Maps each supported IoC type to the pickle file holding its results.
    _DATA_DEFS = {
        "ipv4": "data/ti_results_ipv4.pkl",
        "url": "data/ti_results_url.pkl",
    }

    def lookup_ioc(self, ioc_type, **kwargs):
        """
        Lookup single IoC.

        Parameters
        ----------
        ioc_type : str
            The IoC type to look up; must be a key of ``_DATA_DEFS``.
        **kwargs
            Accepted and ignored.

        Returns
        -------
        pd.DataFrame
            The data read from the file registered for `ioc_type`.

        Raises
        ------
        ValueError
            If `ioc_type` has no registered data file.

        """
        del kwargs
        data_file = self._DATA_DEFS.get(ioc_type)
        if data_file is None:
            # Fail fast (before the delay below) with a message that names
            # the supported types, rather than passing None on as a path.
            supported = ", ".join(sorted(self._DATA_DEFS))
            raise ValueError(
                f"Unsupported ioc_type '{ioc_type}'. Supported types: {supported}."
            )
        sleep(1)
        return read_pd_df(data_file, ioc_type)

    @staticmethod
    def result_to_df(results):
        """
        Convert IoC results to DataFrame.

        Parameters
        ----------
        results : Any
            The results to convert.

        Returns
        -------
        pd.DataFrame
            `results` itself if it is already a DataFrame, otherwise an
            empty DataFrame.

        """
        if isinstance(results, pd.DataFrame):
            return results
        return pd.DataFrame()
48
49
50
class GeoLiteLookupDemo:
    """GeoLiteLookup demo class."""

    # Location of the pickled data returned by every lookup.
    _DATA_DEFS = {"ip_locs": "data/ip_locations.pkl"}

    def lookup_ip(
        self,
        ip_address: str = None,
        ip_addr_list: Iterable = None,
        ip_entity: Any = None,
    ):
        """
        Look up location.

        All three arguments are accepted and discarded: the same pickled
        object is loaded from the ``ip_locs`` data file on every call.

        Returns
        -------
        tuple
            ``(str(obj), obj)`` where ``obj`` is the unpickled content of
            the data file.

        """
        # The inputs play no part in the result.
        del ip_address, ip_addr_list, ip_entity
        locations_path = Path(self._DATA_DEFS["ip_locs"])
        locations = pickle.loads(locations_path.read_bytes())
        return str(locations), locations
68
69
70
# Demo Whois/ASN table, read from a pickle file once when the module is
# imported. get_whois_info_demo reads its "AllExtIPs" and "ExtASN" columns.
_ASN_DATA = pd.read_pickle("data/az_whois.df.pkl")
71
72
73
def get_whois_info_demo(ip_addr, show_progress=False):
    """
    Lookup Whois data from dataframe.

    Parameters
    ----------
    ip_addr : str
        The IP address to look up in the ``AllExtIPs`` column of the
        module-level ``_ASN_DATA`` table.
    show_progress : bool, optional
        If True, print a "." (without newline) for each call.

    Returns
    -------
    tuple
        ``(asn_text, {})``. `asn_text` is the first distinct ``ExtASN``
        value for the address (or its first element if that value is a
        tuple), or "Unknown" if the table lacks the required columns or
        has no row for `ip_addr`.

    """
    sleep(0.02)
    if show_progress:
        print(".", end="")
    if not {"ExtASN", "AllExtIPs"}.issubset(_ASN_DATA.columns):
        return "Unknown", {}

    is_match = _ASN_DATA["AllExtIPs"] == ip_addr
    asn_values = _ASN_DATA.loc[is_match, "ExtASN"].unique()
    if len(asn_values) == 0:
        # No row for this address - use the same fallback as for a
        # missing column rather than failing on an empty array index.
        return "Unknown", {}

    asn_text = asn_values[0]
    if isinstance(asn_text, tuple):
        return asn_text[0], {}
    return asn_text, {}
85
86