Path: blob/master/tutorials-and-examples/example-notebooks/mp_data.py
3253 views
# -------------------------------------------------------------------------1# Copyright (c) Microsoft Corporation. All rights reserved.2# Licensed under the MIT License. See License.txt in the project root for3# license information.4# --------------------------------------------------------------------------5"""Demo QueryProvider."""6from pathlib import Path7import pickle8from typing import Any, Iterable9from time import sleep1011import pandas as pd121314def read_pd_df(data_file, query_name):15"""Read DataFrame from file."""16if not Path(data_file).is_file():17raise FileNotFoundError(18f"Data file {data_file} for query {query_name} not found."19)2021if data_file.lower().endswith("csv"):22return pd.read_csv(23data_file, infer_datetime_format=True, parse_dates=["TimeGenerated"]24)25return pd.read_pickle(data_file)262728class TILookupDemo:29"""TILookup demo class"""3031_DATA_DEFS = {32"ipv4": "data/ti_results_ipv4.pkl",33"url": "data/ti_results_url.pkl",34}3536def lookup_ioc(self, ioc_type, **kwargs):37"""Lookup single IoC."""38sleep(1)39return read_pd_df(self._DATA_DEFS.get(ioc_type), ioc_type)4041@staticmethod42def result_to_df(results):43"""Convert IoC results to DataFrame."""44if isinstance(results, pd.DataFrame):45return results46return pd.DataFrame()474849class GeoLiteLookupDemo:50"""GeoLitLookup demo class."""5152_DATA_DEFS = {53"ip_locs": "data/ip_locations.pkl",54}5556def lookup_ip(57self,58ip_address: str = None,59ip_addr_list: Iterable = None,60ip_entity: Any = None,61):62"""Look up location."""63del ip_address, ip_addr_list, ip_entity64with open(self._DATA_DEFS["ip_locs"], "rb") as iploc_file:65ip_locs = pickle.load(iploc_file)66return str(ip_locs), ip_locs676869_ASN_DATA = pd.read_pickle("data/az_whois.df.pkl")707172def get_whois_info_demo(ip_addr, show_progress=False):73"""Lookup Whois data from dataframe."""74sleep(0.02)75if show_progress:76print(".", end="")77if "ExtASN" not in _ASN_DATA.columns:78return "Unknown", {}79match_row = _ASN_DATA[_ASN_DATA["AllExtIPs"] == ip_addr]80asn_text = match_row["ExtASN"].unique()[0]81if isinstance(asn_text, tuple):82return asn_text[0], {}83return asn_text, {}848586