Skip to content

Commit

Permalink
Download reverse DNS map from GitHub
Browse files Browse the repository at this point in the history
  • Loading branch information
seanthegeek committed Mar 26, 2024
1 parent ed593a0 commit 18f7508
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 13 deletions.
2 changes: 2 additions & 0 deletions parsedmarc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
MAGIC_JSON = b"\7b"

IP_ADDRESS_CACHE = ExpiringDict(max_len=10000, max_age_seconds=1800)
REVERSE_DNS_MAP = dict()


class ParserError(RuntimeError):
Expand Down Expand Up @@ -840,6 +841,7 @@ def parse_forensic_report(feedback_report, sample, msg_date,

ip_address = re.split(r'\s', parsed_report["source_ip"]).pop(0)
parsed_report_source = get_ip_address_info(ip_address,
cache=IP_ADDRESS_CACHE,
ip_db_path=ip_db_path,
offline=offline,
nameservers=nameservers,
Expand Down
59 changes: 46 additions & 13 deletions parsedmarc/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import mailbox
import re
import csv
import io

try:
import importlib.resources as pkg_resources
except ImportError:
Expand All @@ -30,6 +32,7 @@
import geoip2.database
import geoip2.errors
import publicsuffixlist
import requests

from parsedmarc.log import logger
import parsedmarc.resources.dbip
Expand Down Expand Up @@ -295,36 +298,63 @@ def get_ip_address_country(ip_address, db_path=None):
return country


def get_service_from_reverse_dns_base_domain(base_domain):
def get_service_from_reverse_dns_base_domain(base_domain,
offline=False,
reverse_dns_map=None):
"""
Returns the service name of a given base domain name from reverse DNS.
Args:
base_domain (str): The base domain of the reverse DNS lookup
offline (bool): Use the built-in copy of the reverse DNS map
reverse_dns_map (dict): A reverse DNS map
Returns:
dict: A dictionary containing name and type.
If the service is unknown, the name will be
the supplied reverse_dns_base_domain and the type will be None
"""
def load_csv(_csv_file):
reader = csv.DictReader(_csv_file)
for row in reader:
key = row["base_reverse_dns"].lower().strip()
reverse_dns_map[key] = dict(
name=row["name"],
type=row["type"])

base_domain = base_domain.lower().strip()
service_map = dict()
with pkg_resources.path(parsedmarc.resources.maps,
"base_reverse_dns_map.csv") as path:
with open(path) as csv_file:
reader = csv.DictReader(csv_file)
for row in reader:
service_map[row["base_reverse_dns"].lower().strip()] = dict(
name=row["name"],
type=row["type"])
url = ("https://raw.githubusercontent.com/domainaware/parsedmarc/master/"
"parsedmarc/resources/maps/base_reverse_dns_map.csv")
if reverse_dns_map is None:
reverse_dns_map = dict()
csv_file = io.StringIO()

if not offline and len(reverse_dns_map) == 0:
try:
logger.debug(f"Trying to fetch "
f"reverse DNS map from {url}...")
csv_file.write(requests.get(url).text)
csv_file.seek(0)
load_csv(csv_file)
except requests.exceptions.RequestException as e:
logger.warning(f"Failed to fetch reverse DNS map: {e}")
if len(reverse_dns_map) == 0:
logger.info("Loading included reverse DNS map...")
with pkg_resources.path(parsedmarc.resources.maps,
"base_reverse_dns_map.csv") as path:
with open(path) as csv_file:
load_csv(csv_file)
try:
service = service_map[base_domain]
service = reverse_dns_map[base_domain]
except KeyError:
service = dict(name=base_domain, type=None)

return service


def get_ip_address_info(ip_address, ip_db_path=None, cache=None, offline=False,
def get_ip_address_info(ip_address, ip_db_path=None,
cache=None,
reverse_dns_map=None,
offline=False,
nameservers=None, timeout=2.0):
"""
Returns reverse DNS and country information for the given IP address
Expand All @@ -333,6 +363,7 @@ def get_ip_address_info(ip_address, ip_db_path=None, cache=None, offline=False,
ip_address (str): The IP address to check
ip_db_path (str): path to a MMDB file from MaxMind or DBIP
cache (ExpiringDict): Cache storage
reverse_dns_map (dict): A reverse DNS map
offline (bool): Do not make online queries for geolocation or DNS
nameservers (list): A list of one or more nameservers to use
(Cloudflare's public DNS resolvers by default)
Expand Down Expand Up @@ -368,7 +399,9 @@ def get_ip_address_info(ip_address, ip_db_path=None, cache=None, offline=False,
info["type"] = None
if reverse_dns is not None:
base_domain = get_base_domain(reverse_dns)
service = get_service_from_reverse_dns_base_domain(base_domain)
service = get_service_from_reverse_dns_base_domain(base_domain,
offline=offline,
reverse_dns_map=reverse_dns_map)
info["base_domain"] = base_domain
info["type"] = service["type"]
info["name"] = service["name"]
Expand Down

0 comments on commit 18f7508

Please sign in to comment.