Fixed issue with whois lookups on only local IPs (#506)
* Fixed issue with whois lookups on only local IPs

* Added more whois resilience

* Fixed issue with result formatting

* Added handling of invalid IP passed

* A few fixes for ip_utils.py whois functions.

Adding fixes for two broken HTML links in docs.

* Factored out rdap lookup section of _whois_lookup

* Addressing a mypy warning.

I tidied up the ipwhois_result to use a named tuple so that it's clearer what is going on, and to provide stronger type hints.

Co-authored-by: Ian Hellen <[email protected]>
petebryan and ianhelle authored Oct 6, 2022
1 parent 302fdb4 commit 4451555
Showing 3 changed files with 125 additions and 99 deletions.
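The named-tuple refactor described in the commit message replaces positional tuple access (ipwhois_result[1]["asn"]) with named fields. A minimal sketch of the resulting pattern, using the class definition from the ip_utils.py diff below (the example values are hypothetical):

    from typing import Any, Dict, NamedTuple, Optional

    class _IpWhoIsResult(NamedTuple):
        """Named tuple for IPWhoIs Result."""

        name: Optional[str]
        properties: Dict[str, Any] = {}

    # A fresh dict is passed explicitly, as the new _whois_lookup does,
    # to avoid sharing the mutable default across instances.
    result = _IpWhoIsResult("GOOGLE, US", {})
    result.properties["asn"] = "15169"
    print(result.name, result.properties["asn"])  # GOOGLE, US 15169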
2 changes: 1 addition & 1 deletion docs/source/data_acquisition/MordorData.rst
@@ -301,7 +301,7 @@ for API details.
 
 For more explanation of the data items shown in the browser, please see
 the `Mordor GitHub repo <https://github.com/OTRF/mordor>`__ and the
-`Threat Hunter Playbook <https://threathunterplaybook.com/introduction.html>`__
+`Threat Hunter Playbook <https://threathunterplaybook.com>`__
 
 .. code:: ipython3
8 changes: 4 additions & 4 deletions docs/source/visualization/MorphCharts.rst
@@ -48,16 +48,16 @@ call display() and pass it the data, and chart name:
     morph.display(data=query_data, chart_name="SigninsChart")
 
-This will format the data and chart configuration file and put them in a direcory named
+This will format the data and chart configuration file and put them in a directory named
 'morphchart_package' in the current working directory. It will also return an IFrame
 displaying http://morphcharts.com/. To load the charts within the IFrame click
 "Choose Files" under "Load Package" heading. Select the 'description.json' and
-'query_data.csv' files under the 'morphchart_package' folder to load the customised charts.
+'query_data.csv' files under the 'morphchart_package' folder to load the customized charts.
 
 Creating New Chart Templates
 ----------------------------
 You can create a add new chart templates for use by this module. The best way to do this
-is by creating a customized chart set using http://morphcharts.com/designer.html. Once complete
+is by creating a customized chart set using http://morphcharts.com. Once complete
 save the chart, which will download a filed called 'description.json'. To add this as a template
 by creating a YAML file with the following format:
 
@@ -70,4 +70,4 @@ by creating a YAML file with the following format:
 DescriptionFile: <The JSON blob from your description.json file>
 Then include the file in the msticpy/data/morph_charts folder and it will be discovered next time you
-intialize an MorphCharts object.
+initialize an MorphCharts object.
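For orientation, the display workflow these documentation fixes describe looks roughly like this (a sketch; the import path and sample data are assumptions, not part of this commit):

    import pandas as pd
    from msticpy.vis.morph_charts import MorphCharts  # assumed import path

    morph = MorphCharts()
    query_data = pd.DataFrame({"UserPrincipalName": ["user@example.com"], "SigninCount": [10]})

    # Writes the chart package to ./morphchart_package and returns an IFrame
    # showing morphcharts.com, where the package files are loaded manually.
    morph.display(data=query_data, chart_name="SigninsChart")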
214 changes: 120 additions & 94 deletions msticpy/context/ip_utils.py
@@ -18,7 +18,7 @@
 import warnings
 from functools import lru_cache
 from time import sleep
-from typing import Dict, List, Optional, Set, Tuple, Union
+from typing import Any, Dict, List, NamedTuple, Optional, Set, Tuple, Union
 
 import httpx
 import pandas as pd
@@ -320,20 +320,24 @@ def get_whois_df(
     """
     del show_progress
     whois_data = ip_whois(data[ip_column].drop_duplicates())
-    if not isinstance(whois_data, pd.DataFrame):
-        return data.assign(ASNDescription="No data returned")
-    data = data.merge(
-        whois_data,  # type: ignore
-        how="left",
-        left_on=ip_column,
-        right_on="query",
-        suffixes=("", "_whois"),
-    )
-    data[whois_col] = data[whois_data.columns].apply(lambda x: x.to_dict(), axis=1)
-    data[asn_col] = data["asn_description"]
-    if not all_columns:
-        return data.drop(columns=whois_data.columns)
-    return data
+    if (
+        isinstance(whois_data, pd.DataFrame)
+        and not whois_data.empty
+        and "query" in whois_data.columns
+    ):
+        data = data.merge(
+            whois_data,  # type: ignore
+            how="left",
+            left_on=ip_column,
+            right_on="query",
+            suffixes=("", "_whois"),
+        )
+        data[whois_col] = data[whois_data.columns].apply(lambda x: x.to_dict(), axis=1)
+        data[asn_col] = data["asn_description"]
+        if not all_columns:
+            return data.drop(columns=whois_data.columns)
+        return data
+    return data.assign(ASNDescription="No data returned")
 
 
 @pd.api.extensions.register_dataframe_accessor("mp_whois")
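A sketch of how the reworked guard behaves for callers, assuming the parameters shown in the hunk above (IP values are illustrative):

    import pandas as pd
    from msticpy.context.ip_utils import get_whois_df

    df = pd.DataFrame({"IpAddress": ["8.8.8.8", "10.0.0.1"]})

    # Public IPs merge in whois columns keyed on "query"; if the lookup returns
    # nothing usable (e.g. the input is only local IPs), the frame just gains
    # ASNDescription="No data returned" instead of failing in the merge.
    whois_df = get_whois_df(df, ip_column="IpAddress", asn_col="ASN", whois_col="WhoIs")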
@@ -424,13 +428,13 @@ def ip_whois(
         rate_limit = len(ip) > 50
         if rate_limit:
             print("Large number of lookups, this may take some time.")
-        whois_results = []
+        whois_results: Dict[str, Any] = {}
         for ip_addr in ip:
             if rate_limit:
                 sleep(query_rate)
-            whois_results.append(
-                _whois_lookup(ip_addr, raw=raw, retry_count=retry_count)
-            )
+            whois_results[ip_addr] = _whois_lookup(
+                ip_addr, raw=raw, retry_count=retry_count
+            ).properties
         return _whois_result_to_pandas(whois_results)
     if isinstance(ip, (str, IpAddress)):
         return _whois_lookup(ip, raw=raw)
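With the loop reshaped to build a dict, the return type still depends on the input type; a usage sketch (addresses illustrative):

    from msticpy.context.ip_utils import ip_whois

    # Single IP -> an _IpWhoIsResult named tuple (name plus properties dict)
    single = ip_whois("8.8.8.8")

    # List of IPs -> a DataFrame built from the {ip: properties} mapping;
    # more than 50 addresses triggers sleep(query_rate) between lookups.
    many = ip_whois(["8.8.8.8", "1.1.1.1"])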
@@ -529,51 +533,71 @@ def get_asn_from_ip(
     ip = ip.strip()
     query = f" -v {ip}\r\n"
     ip_response = _cymru_query(query)
-    keys = ip_response.split("\n")[0].split("|")
+    keys = ip_response.split("\n", maxsplit=1)[0].split("|")
     values = ip_response.split("\n")[1].split("|")
     return {key.strip(): value.strip() for key, value in zip(keys, values)}
 
 
+class _IpWhoIsResult(NamedTuple):
+    """Named tuple for IPWhoIs Result."""
+
+    name: Optional[str]
+    properties: Dict[str, Any] = {}
+
+
 @lru_cache(maxsize=1024)
 def _whois_lookup(
     ip_addr: Union[str, IpAddress], raw: bool = False, retry_count: int = 5
-) -> Tuple:
+) -> _IpWhoIsResult:
     """Conduct lookup of IP Whois information."""
     if isinstance(ip_addr, IpAddress):
         ip_addr = ip_addr.Address
     asn_items = get_asn_from_ip(ip_addr.strip())
-    ipwhois_result = (asn_items["AS Name"], {})  # type: ignore
-    ipwhois_result[1]["asn"] = asn_items["AS"]
-    ipwhois_result[1]["query"] = asn_items["IP"]
-    ipwhois_result[1]["asn_cidr"] = asn_items["BGP Prefix"]
-    ipwhois_result[1]["asn_country_code"] = asn_items["CC"]
-    ipwhois_result[1]["asn_registry"] = asn_items["Registry"]
-    ipwhois_result[1]["asn_date"] = asn_items["Allocated"]
-    ipwhois_result[1]["asn_description"] = asn_items["AS Name"]
-    if ipwhois_result[1]["asn_registry"] in _REGISTRIES:
-        registry_urls = _REGISTRIES[ipwhois_result[1]["asn_registry"]]
-    else:
-        return (None, None)
-    rdap_data = _rdap_lookup(
-        url=registry_urls["url"] + str(ip_addr), retry_count=retry_count
-    )
-    if rdap_data.status_code == 200:
-        rdap_data_content = rdap_data.json()
-        net = _create_net(rdap_data_content)
-        ipwhois_result[1]["nets"] = [net]
-        for link in rdap_data_content["links"]:
-            if link["rel"] == "up":
-                up_data_link = link["href"]
-                up_rdap_data = httpx.get(up_data_link)
-                up_rdap_data_content = up_rdap_data.json()
-                up_net = _create_net(up_rdap_data_content)
-                ipwhois_result[1]["nets"].append(up_net)
-        if raw:
-            ipwhois_result[1]["raw"] = rdap_data
-    elif rdap_data.status_code == 429:
-        sleep(5)
-        _whois_lookup(ip_addr)
-    else:
+    if asn_items and "Error: no ASN or IP match on line 1." not in asn_items:
+        ipwhois_result = _IpWhoIsResult(asn_items["AS Name"], {})  # type: ignore
+        ipwhois_result.properties["asn"] = asn_items["AS"]
+        ipwhois_result.properties["query"] = asn_items["IP"]
+        ipwhois_result.properties["asn_cidr"] = asn_items["BGP Prefix"]
+        ipwhois_result.properties["asn_country_code"] = asn_items["CC"]
+        ipwhois_result.properties["asn_registry"] = asn_items["Registry"]
+        ipwhois_result.properties["asn_date"] = asn_items["Allocated"]
+        ipwhois_result.properties["asn_description"] = asn_items["AS Name"]
+    registry_url = _REGISTRIES.get(asn_items.get("Registry", ""), {}).get("url")
+    if not asn_items or not registry_url:
+        return _IpWhoIsResult(None)
+    return _add_rdap_data(
+        ipwhois_result=ipwhois_result,
+        rdap_reg_url=f"{registry_url}{ip_addr}",
+        retry_count=retry_count,
+        raw=raw,
+    )
+
+
+def _add_rdap_data(
+    ipwhois_result: _IpWhoIsResult, rdap_reg_url: str, retry_count: int, raw: bool
+) -> _IpWhoIsResult:
+    """Add RDAP data to WhoIs result."""
+    retries = 0
+    while retries < retry_count:
+        rdap_data = _rdap_lookup(url=rdap_reg_url, retry_count=retry_count)
+        if rdap_data.status_code == 200:
+            rdap_data_content = rdap_data.json()
+            net = _create_net(rdap_data_content)
+            ipwhois_result.properties["nets"] = [net]
+            for link in rdap_data_content["links"]:
+                if link["rel"] == "up":
+                    up_data_link = link["href"]
+                    up_rdap_data = httpx.get(up_data_link)
+                    up_rdap_data_content = up_rdap_data.json()
+                    up_net = _create_net(up_rdap_data_content)
+                    ipwhois_result.properties["nets"].append(up_net)
+            if raw:
+                ipwhois_result.properties["raw"] = rdap_data
+            break
+        if rdap_data.status_code == 429:
+            sleep(3)
+            retries += 1
+            continue
+        raise MsticpyConnectionError(f"Error code: {rdap_data.status_code}")
+    return ipwhois_result

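The maxsplit=1 change in get_asn_from_ip only limits the header-row split; the parsing itself is unchanged. A self-contained illustration with a fabricated response in Team Cymru's verbose format:

    ip_response = (
        "AS     | IP      | BGP Prefix | CC | Registry | Allocated  | AS Name\n"
        "15169  | 8.8.8.8 | 8.8.8.0/24 | US | arin     | 2000-03-30 | GOOGLE, US"
    )
    keys = ip_response.split("\n", maxsplit=1)[0].split("|")
    values = ip_response.split("\n")[1].split("|")
    asn_items = {key.strip(): value.strip() for key, value in zip(keys, values)}
    assert asn_items["AS Name"] == "GOOGLE, US"
    assert asn_items["Registry"] == "arin"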
@@ -588,42 +612,45 @@ def _rdap_lookup(url: str, retry_count: int = 5) -> httpx.Response:
         retry_count -= 1
     if not rdap_data:
         raise MsticpyException(
-            "Rate limt exceeded - try adjusting query_rate parameter to slow down requests"
+            "Rate limit exceeded - try adjusting query_rate parameter to slow down requests"
         )
     return rdap_data
 
 
-def _whois_result_to_pandas(results: Union[str, List]) -> pd.DataFrame:
+def _whois_result_to_pandas(results: Union[str, List, Dict]) -> pd.DataFrame:
     """Transform whois results to a Pandas DataFrame."""
-    if isinstance(results, list):
-        new_results = [result[1] for result in results]
-    else:
-        new_results = results[1]  # type: ignore
-    return pd.DataFrame(new_results)
+    if isinstance(results, dict):
+        return pd.DataFrame(
+            [result or {"query": idx} for idx, result in results.items()]
+        )
+    raise NotImplementedError("Only dict type current supported for `results`.")
 
 
 def _find_address(
     entity: dict,
 ) -> Union[str, None]:  # pylint: disable=inconsistent-return-statements
     """Find an orgs address from an RDAP entity."""
-    if "vcardArray" in entity:
-        for vcard in entity["vcardArray"]:
-            if isinstance(vcard, list):
-                for vcard_sub in vcard:
-                    if vcard_sub[0] == "adr":
-                        return vcard_sub[1]["label"]
+    if "vcardArray" not in entity:
+        return None
+    for vcard in [vcard for vcard in entity["vcardArray"] if isinstance(vcard, list)]:
+        for vcard_sub in vcard:
+            if vcard_sub[0] == "adr":
+                return vcard_sub[1]["label"]
+    return None
 
 
 def _create_net(data: Dict) -> Dict:
     """Create a network object from RDAP data."""
-    net_cidr = (
-        str(data["cidr0_cidrs"][0]["v4prefix"])
-        + "/"
-        + str(data["cidr0_cidrs"][0]["length"])
-        if "cidr0_cidrs" in data
-        else None
-    )
+    net_data = data.get("cidr0_cidrs", [None])[0] or {}
+    net_prefixes = net_data.keys() & {"v4prefix", "v6prefix"}
+
+    if not net_data or not net_prefixes:
+        net_cidr = "No network data retrieved."
+    else:
+        net_cidr = " ".join(
+            f"{net_data[net_prefix]}/{net_data.get('length', '')}"
+            for net_prefix in net_prefixes
+        )
     address = ""
     for item in data["events"]:
         created = item["eventDate"] if item["eventAction"] == "last changed" else None
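The new dict branch of _whois_result_to_pandas keeps one row per queried IP even when a lookup produced nothing; a short sketch (inputs fabricated):

    import pandas as pd

    results = {
        "8.8.8.8": {"query": "8.8.8.8", "asn": "15169", "asn_description": "GOOGLE, US"},
        "10.0.0.1": {},  # a local IP whose lookup returned empty properties
    }
    # Falsy values collapse to {"query": idx}, so the local IP still gets a row.
    df = pd.DataFrame([result or {"query": idx} for idx, result in results.items()])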
@@ -647,28 +674,27 @@
 
 def _asn_whois_query(query, server, port=43, retry_count=5) -> str:
     """Connect to whois server and send query."""
-    conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-    conn.connect((server, port))
-    conn.send(query.encode())
-    response = ""
-    response_data = None
-    while retry_count > 0 and not response_data:
-        try:
-            response_data = conn.recv(4096).decode()
-            if "error" in response_data:
-                raise MsticpyConnectionError(
-                    "An error occured during lookup, please try again."
-                )
-            if "rate limit exceeded" in response_data:
-                raise MsticpyConnectionError(
-                    "Rate limit exceeded please wait and try again."
-                )
-            response += response_data
-        except (UnicodeDecodeError, ConnectionResetError):
-            retry_count -= 1
-            response_data = None
-    conn.close()
-    return response
+    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as conn:
+        conn.connect((server, port))
+        conn.send(query.encode())
+        response = []
+        response_data = None
+        while retry_count > 0 and not response_data:
+            try:
+                response_data = conn.recv(4096).decode()
+                if "error" in response_data:
+                    raise MsticpyConnectionError(
+                        "An error occurred during lookup, please try again."
+                    )
+                if "rate limit exceeded" in response_data:
+                    raise MsticpyConnectionError(
+                        "Rate limit exceeded please wait and try again."
+                    )
+                response.append(response_data)
+            except (UnicodeDecodeError, ConnectionResetError):
+                retry_count -= 1
+                response_data = None
+    return "".join(response)
 
 
 def _cymru_query(query):

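The rewritten _asn_whois_query follows the usual whois-over-TCP exchange, now with the socket managed by a with block. A minimal standalone sketch of the same pattern (the server name is an assumption; in the module, _cymru_query supplies the real endpoint):

    import socket

    def whois_query(query: str, server: str, port: int = 43) -> str:
        """Send a whois query and collect the response chunks."""
        chunks = []
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as conn:
            conn.connect((server, port))
            conn.sendall(query.encode())
            while True:
                data = conn.recv(4096)
                if not data:  # server closed the connection
                    break
                chunks.append(data.decode(errors="replace"))
        return "".join(chunks)

    # e.g. whois_query(" -v 8.8.8.8\r\n", "whois.cymru.com")  # assumed Cymru endpoint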