diff --git a/docs/source/data_acquisition/MordorData.rst b/docs/source/data_acquisition/MordorData.rst index f079a70f6..43d417350 100644 --- a/docs/source/data_acquisition/MordorData.rst +++ b/docs/source/data_acquisition/MordorData.rst @@ -301,7 +301,7 @@ for API details. For more explanation of the data items shown in the browser, please see the `Mordor GitHub repo `__ and the -`Threat Hunter Playbook `__ +`Threat Hunter Playbook `__ .. code:: ipython3 diff --git a/docs/source/visualization/MorphCharts.rst b/docs/source/visualization/MorphCharts.rst index 10823ff30..6f815194c 100644 --- a/docs/source/visualization/MorphCharts.rst +++ b/docs/source/visualization/MorphCharts.rst @@ -48,16 +48,16 @@ call display() and pass it the data, and chart name: morph.display(data=query_data, chart_name="SigninsChart") -This will format the data and chart configuration file and put them in a direcory named +This will format the data and chart configuration file and put them in a directory named 'morphchart_package' in the current working directory. It will also return an IFrame displaying http://morphcharts.com/. To load the charts within the IFrame click "Choose Files" under "Load Package" heading. Select the 'description.json' and -'query_data.csv' files under the 'morphchart_package' folder to load the customised charts. +'query_data.csv' files under the 'morphchart_package' folder to load the customized charts. Creating New Chart Templates ---------------------------- You can create a add new chart templates for use by this module. The best way to do this -is by creating a customized chart set using http://morphcharts.com/designer.html. Once complete +is by creating a customized chart set using http://morphcharts.com. Once complete save the chart, which will download a filed called 'description.json'. 
To add this as a template by creating a YAML file with the following format: @@ -70,4 +70,4 @@ by creating a YAML file with the following format: DescriptionFile: Then include the file in the msticpy/data/morph_charts folder and it will be discovered next time you -intialize an MorphCharts object. +initialize an MorphCharts object. diff --git a/msticpy/context/ip_utils.py b/msticpy/context/ip_utils.py index ea0429f41..fb32365c6 100644 --- a/msticpy/context/ip_utils.py +++ b/msticpy/context/ip_utils.py @@ -18,7 +18,7 @@ import warnings from functools import lru_cache from time import sleep -from typing import Dict, List, Optional, Set, Tuple, Union +from typing import Any, Dict, List, NamedTuple, Optional, Set, Tuple, Union import httpx import pandas as pd @@ -320,20 +320,24 @@ def get_whois_df( """ del show_progress whois_data = ip_whois(data[ip_column].drop_duplicates()) - if not isinstance(whois_data, pd.DataFrame): - return data.assign(ASNDescription="No data returned") - data = data.merge( - whois_data, # type: ignore - how="left", - left_on=ip_column, - right_on="query", - suffixes=("", "_whois"), - ) - data[whois_col] = data[whois_data.columns].apply(lambda x: x.to_dict(), axis=1) - data[asn_col] = data["asn_description"] - if not all_columns: - return data.drop(columns=whois_data.columns) - return data + if ( + isinstance(whois_data, pd.DataFrame) + and not whois_data.empty + and "query" in whois_data.columns + ): + data = data.merge( + whois_data, # type: ignore + how="left", + left_on=ip_column, + right_on="query", + suffixes=("", "_whois"), + ) + data[whois_col] = data[whois_data.columns].apply(lambda x: x.to_dict(), axis=1) + data[asn_col] = data["asn_description"] + if not all_columns: + return data.drop(columns=whois_data.columns) + return data + return data.assign(ASNDescription="No data returned") @pd.api.extensions.register_dataframe_accessor("mp_whois") @@ -424,13 +428,13 @@ def ip_whois( rate_limit = len(ip) > 50 if rate_limit: print("Large number 
of lookups, this may take some time.") - whois_results = [] + whois_results: Dict[str, Any] = {} for ip_addr in ip: if rate_limit: sleep(query_rate) - whois_results.append( - _whois_lookup(ip_addr, raw=raw, retry_count=retry_count) - ) + whois_results[ip_addr] = _whois_lookup( + ip_addr, raw=raw, retry_count=retry_count + ).properties return _whois_result_to_pandas(whois_results) if isinstance(ip, (str, IpAddress)): return _whois_lookup(ip, raw=raw) @@ -529,51 +533,71 @@ def get_asn_from_ip( ip = ip.strip() query = f" -v {ip}\r\n" ip_response = _cymru_query(query) - keys = ip_response.split("\n")[0].split("|") + keys = ip_response.split("\n", maxsplit=1)[0].split("|") values = ip_response.split("\n")[1].split("|") return {key.strip(): value.strip() for key, value in zip(keys, values)} +class _IpWhoIsResult(NamedTuple): + """Named tuple for IPWhoIs Result.""" + + name: Optional[str] + properties: Dict[str, Any] = {} + + @lru_cache(maxsize=1024) def _whois_lookup( ip_addr: Union[str, IpAddress], raw: bool = False, retry_count: int = 5 -) -> Tuple: +) -> _IpWhoIsResult: """Conduct lookup of IP Whois information.""" if isinstance(ip_addr, IpAddress): ip_addr = ip_addr.Address asn_items = get_asn_from_ip(ip_addr.strip()) - ipwhois_result = (asn_items["AS Name"], {}) # type: ignore - ipwhois_result[1]["asn"] = asn_items["AS"] - ipwhois_result[1]["query"] = asn_items["IP"] - ipwhois_result[1]["asn_cidr"] = asn_items["BGP Prefix"] - ipwhois_result[1]["asn_country_code"] = asn_items["CC"] - ipwhois_result[1]["asn_registry"] = asn_items["Registry"] - ipwhois_result[1]["asn_date"] = asn_items["Allocated"] - ipwhois_result[1]["asn_description"] = asn_items["AS Name"] - if ipwhois_result[1]["asn_registry"] in _REGISTRIES: - registry_urls = _REGISTRIES[ipwhois_result[1]["asn_registry"]] - else: - return (None, None) - rdap_data = _rdap_lookup( - url=registry_urls["url"] + str(ip_addr), retry_count=retry_count + if asn_items and "Error: no ASN or IP match on line 1." 
not in asn_items: + ipwhois_result = _IpWhoIsResult(asn_items["AS Name"], {}) # type: ignore + ipwhois_result.properties["asn"] = asn_items["AS"] + ipwhois_result.properties["query"] = asn_items["IP"] + ipwhois_result.properties["asn_cidr"] = asn_items["BGP Prefix"] + ipwhois_result.properties["asn_country_code"] = asn_items["CC"] + ipwhois_result.properties["asn_registry"] = asn_items["Registry"] + ipwhois_result.properties["asn_date"] = asn_items["Allocated"] + ipwhois_result.properties["asn_description"] = asn_items["AS Name"] + registry_url = _REGISTRIES.get(asn_items.get("Registry", ""), {}).get("url") + if not asn_items or not registry_url: + return _IpWhoIsResult(None) + return _add_rdap_data( + ipwhois_result=ipwhois_result, + rdap_reg_url=f"{registry_url}{ip_addr}", + retry_count=retry_count, + raw=raw, ) - if rdap_data.status_code == 200: - rdap_data_content = rdap_data.json() - net = _create_net(rdap_data_content) - ipwhois_result[1]["nets"] = [net] - for link in rdap_data_content["links"]: - if link["rel"] == "up": - up_data_link = link["href"] - up_rdap_data = httpx.get(up_data_link) - up_rdap_data_content = up_rdap_data.json() - up_net = _create_net(up_rdap_data_content) - ipwhois_result[1]["nets"].append(up_net) - if raw: - ipwhois_result[1]["raw"] = rdap_data - elif rdap_data.status_code == 429: - sleep(5) - _whois_lookup(ip_addr) - else: + + +def _add_rdap_data( + ipwhois_result: _IpWhoIsResult, rdap_reg_url: str, retry_count: int, raw: bool +) -> _IpWhoIsResult: + """Add RDAP data to WhoIs result.""" + retries = 0 + while retries < retry_count: + rdap_data = _rdap_lookup(url=rdap_reg_url, retry_count=retry_count) + if rdap_data.status_code == 200: + rdap_data_content = rdap_data.json() + net = _create_net(rdap_data_content) + ipwhois_result.properties["nets"] = [net] + for link in rdap_data_content["links"]: + if link["rel"] == "up": + up_data_link = link["href"] + up_rdap_data = httpx.get(up_data_link) + up_rdap_data_content = up_rdap_data.json() 
+ up_net = _create_net(up_rdap_data_content) + ipwhois_result.properties["nets"].append(up_net) + if raw: + ipwhois_result.properties["raw"] = rdap_data + break + if rdap_data.status_code == 429: + sleep(3) + retries += 1 + continue raise MsticpyConnectionError(f"Error code: {rdap_data.status_code}") return ipwhois_result @@ -588,42 +612,45 @@ def _rdap_lookup(url: str, retry_count: int = 5) -> httpx.Response: retry_count -= 1 if not rdap_data: raise MsticpyException( - "Rate limt exceeded - try adjusting query_rate parameter to slow down requests" + "Rate limit exceeded - try adjusting query_rate parameter to slow down requests" ) return rdap_data -def _whois_result_to_pandas(results: Union[str, List]) -> pd.DataFrame: +def _whois_result_to_pandas(results: Union[str, List, Dict]) -> pd.DataFrame: """Transform whois results to a Pandas DataFrame.""" - if isinstance(results, list): - new_results = [result[1] for result in results] - else: - new_results = results[1] # type: ignore - return pd.DataFrame(new_results) + if isinstance(results, dict): + return pd.DataFrame( + [result or {"query": idx} for idx, result in results.items()] + ) + raise NotImplementedError("Only dict type currently supported for `results`.") def _find_address( entity: dict, ) -> Union[str, None]: # pylint: disable=inconsistent-return-statements """Find an orgs address from an RDAP entity.""" - if "vcardArray" in entity: - for vcard in entity["vcardArray"]: - if isinstance(vcard, list): - for vcard_sub in vcard: - if vcard_sub[0] == "adr": - return vcard_sub[1]["label"] + if "vcardArray" not in entity: + return None + for vcard in [vcard for vcard in entity["vcardArray"] if isinstance(vcard, list)]: + for vcard_sub in vcard: + if vcard_sub[0] == "adr": + return vcard_sub[1]["label"] return None def _create_net(data: Dict) -> Dict: """Create a network object from RDAP data.""" - net_cidr = ( - str(data["cidr0_cidrs"][0]["v4prefix"]) - + "/" - + str(data["cidr0_cidrs"][0]["length"]) - if 
"cidr0_cidrs" in data - else None - ) + net_data = data.get("cidr0_cidrs", [None])[0] or {} + net_prefixes = net_data.keys() & {"v4prefix", "v6prefix"} + + if not net_data or not net_prefixes: + net_cidr = "No network data retrieved." + else: + net_cidr = " ".join( + f"{net_data[net_prefix]}/{net_data.get('length', '')}" + for net_prefix in net_prefixes + ) address = "" for item in data["events"]: created = item["eventDate"] if item["eventAction"] == "last changed" else None @@ -647,28 +674,27 @@ def _create_net(data: Dict) -> Dict: def _asn_whois_query(query, server, port=43, retry_count=5) -> str: """Connect to whois server and send query.""" - conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - conn.connect((server, port)) - conn.send(query.encode()) - response = "" - response_data = None - while retry_count > 0 and not response_data: - try: - response_data = conn.recv(4096).decode() - if "error" in response_data: - raise MsticpyConnectionError( - "An error occured during lookup, please try again." - ) - if "rate limit exceeded" in response_data: - raise MsticpyConnectionError( - "Rate limit exceeded please wait and try again." - ) - response += response_data - except (UnicodeDecodeError, ConnectionResetError): - retry_count -= 1 - response_data = None - conn.close() - return response + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as conn: + conn.connect((server, port)) + conn.send(query.encode()) + response = [] + response_data = None + while retry_count > 0 and not response_data: + try: + response_data = conn.recv(4096).decode() + if "error" in response_data: + raise MsticpyConnectionError( + "An error occurred during lookup, please try again." + ) + if "rate limit exceeded" in response_data: + raise MsticpyConnectionError( + "Rate limit exceeded please wait and try again." 
+ ) + response.append(response_data) + except (UnicodeDecodeError, ConnectionResetError): + retry_count -= 1 + response_data = None + return "".join(response) def _cymru_query(query):