From f618f69c6ce64084f452fb0caa37a1d9e08e3432 Mon Sep 17 00:00:00 2001
From: ramspoluri
Date: Fri, 24 May 2024 20:43:36 +0530
Subject: [PATCH] Added 'since' option to search for messages since a certain
 time

- Added a `since` option under the `mailbox` section to search only for
  messages received since a given time instead of walking the complete
  mailbox, which is useful during testing. Acceptable values: `5m|3h|2d|1w`
  (units: "m" = minutes, "h" = hours, "d" = days, "w" = weeks). Defaults to
  `1d` if an incorrect value is provided.
- Do not mark messages as read when the test option is selected (currently
  only supported for MSGraphConnection)
---
 docs/source/usage.md     |  3 ++
 parsedmarc/__init__.py   | 66 +++++++++++++++++++++++++++++++++++-----
 parsedmarc/cli.py        |  4 +++
 parsedmarc/mail/gmail.py | 44 +++++++++++++++++++--------
 parsedmarc/mail/graph.py | 18 ++++++++---
 parsedmarc/mail/imap.py  |  6 +++-
 6 files changed, 116 insertions(+), 25 deletions(-)

diff --git a/docs/source/usage.md b/docs/source/usage.md
index 5cfafddc..4d7a8567 100644
--- a/docs/source/usage.md
+++ b/docs/source/usage.md
@@ -153,6 +153,9 @@ The full set of configuration options are:
   - `check_timeout` - int: Number of seconds to wait for a IMAP IDLE response
     or the number of seconds until the next mail check (Default: `30`)
+  - `since` - str: Search for messages received since a given time
+    (examples: `5m|3h|2d|1w`; units: "m" = minutes, "h" = hours, "d" = days,
+    "w" = weeks). Defaults to `1d` if an incorrect value is provided.
 - `imap`
   - `host` - str: The IMAP server hostname or IP address
   - `port` - int: The IMAP server port (Default: `993`)
diff --git a/parsedmarc/__init__.py b/parsedmarc/__init__.py
index 8d94b428..3b273a4b 100644
--- a/parsedmarc/__init__.py
+++ b/parsedmarc/__init__.py
@@ -17,7 +17,7 @@
 from base64 import b64decode
 from collections import OrderedDict
 from csv import DictWriter
-from datetime import datetime
+from datetime import datetime, timedelta
 from io import BytesIO, StringIO
 from typing import Callable
 
@@ -28,7 +28,8 @@
 from mailsuite.smtp import send_email
 
 from parsedmarc.log import logger
-from parsedmarc.mail import MailboxConnection
+from parsedmarc.mail import MailboxConnection, IMAPConnection, \
+    MSGraphConnection, GmailConnection
 from parsedmarc.utils import get_base_domain, get_ip_address_info
 from parsedmarc.utils import is_outlook_msg, convert_outlook_msg
 from parsedmarc.utils import parse_email
@@ -1371,6 +1372,7 @@ def get_dmarc_reports_from_mailbox(connection: MailboxConnection,
                                    strip_attachment_payloads=False,
                                    results=None,
                                    batch_size=10,
+                                   since=None,
                                    create_folders=True):
     """
     Fetches and parses DMARC reports from a mailbox
@@ -1393,6 +1395,8 @@ def get_dmarc_reports_from_mailbox(connection: MailboxConnection,
         results (dict): Results from the previous run
         batch_size (int): Number of messages to read and process before saving
             (use 0 for no limit)
+        since (str): Only search for messages received since a given time
+            (units: "m" = minutes, "h" = hours, "d" = days, "w" = weeks)
         create_folders (bool): Whether to create the destination folders
             (not used in watch)
 
@@ -1405,6 +1409,9 @@ def get_dmarc_reports_from_mailbox(connection: MailboxConnection,
     if connection is None:
         raise ValueError("Must supply a connection")
 
+    # current_time is reused by the fetch_messages calls made later on
+    current_time = None
+
     aggregate_reports = []
     forensic_reports = []
     smtp_tls_reports = []
@@ -1428,12 +1435,44 @@ def get_dmarc_reports_from_mailbox(connection: MailboxConnection,
             connection.create_folder(smtp_tls_reports_folder)
             connection.create_folder(invalid_reports_folder)
 
-    messages = connection.fetch_messages(reports_folder, batch_size=batch_size)
+    if since:
+        _since = 1440  # default to one day
+        if re.match(r'\d{1,2}[mhdw]$', since):
+            s = re.split(r'(\d+)', since)
+            match s[2]:
+                case 'm': _since = int(s[1])
+                case 'h': _since = int(s[1])*60
+                case 'd': _since = int(s[1])*60*24
+                case 'w': _since = int(s[1])*60*24*7
+        else:
+            logger.warning("Incorrect format for 'since' option. "
+                           "Provided value: {0}, expected values: "
+                           "(5m|3h|2d|1w). Ignoring it and fetching "
+                           "messages from the last 24 hours".format(since))
+
+        if isinstance(connection, IMAPConnection):
+            logger.debug("IMAP SINCE searches have day resolution, so "
+                         "'since' values are rounded down to whole days")
+            since = (datetime.utcnow() - timedelta(minutes=_since)).date()
+            current_time = datetime.utcnow().date()
+        elif isinstance(connection, MSGraphConnection):
+            since = (datetime.utcnow() - timedelta(minutes=_since)) \
+                .isoformat() + 'Z'
+            current_time = datetime.utcnow().isoformat() + 'Z'
+        elif isinstance(connection, GmailConnection):
+            since = (datetime.utcnow() - timedelta(minutes=_since)) \
+                .strftime('%s')
+            current_time = datetime.utcnow().strftime('%s')
+        else:
+            pass
+
+    messages = connection.fetch_messages(reports_folder, batch_size=batch_size,
+                                         since=since)
     total_messages = len(messages)
     logger.debug("Found {0} messages in {1}".format(len(messages),
                                                     reports_folder))
 
-    if batch_size:
+    if batch_size and not since:
         message_limit = min(total_messages, batch_size)
     else:
         message_limit = total_messages
@@ -1445,7 +1484,15 @@ def get_dmarc_reports_from_mailbox(connection: MailboxConnection,
         logger.debug("Processing message {0} of {1}: UID {2}".format(
             i+1, message_limit, msg_uid
         ))
-        msg_content = connection.fetch_message(msg_uid)
+        if isinstance(connection, MSGraphConnection):
+            if test:
+                msg_content = connection.fetch_message(msg_uid,
+                                                       mark_read=False)
+            else:
+                msg_content = connection.fetch_message(msg_uid,
+                                                       mark_read=True)
+        else:
+            msg_content = connection.fetch_message(msg_uid)
         try:
             sa = strip_attachment_payloads
             parsed_email = parse_report_email(
@@ -1564,7 +1611,11 @@ def get_dmarc_reports_from_mailbox(connection: MailboxConnection,
                            ("forensic_reports", forensic_reports),
                            ("smtp_tls_reports", smtp_tls_reports)])
 
-    total_messages = len(connection.fetch_messages(reports_folder))
+    if current_time:
+        total_messages = len(connection.fetch_messages(reports_folder,
+                                                       since=current_time))
+    else:
+        total_messages = len(connection.fetch_messages(reports_folder))
 
     if not test and not batch_size and total_messages > 0:
         # Process emails that came in during the last run
@@ -1582,7 +1633,8 @@ def get_dmarc_reports_from_mailbox(connection: MailboxConnection,
            always_use_local_files=always_use_local_files,
            reverse_dns_map_path=reverse_dns_map_path,
            reverse_dns_map_url=reverse_dns_map_url,
-           offline=offline
+           offline=offline,
+           since=current_time,
         )
 
     return results
diff --git a/parsedmarc/cli.py b/parsedmarc/cli.py
index d2ebae61..681a73b4 100644
--- a/parsedmarc/cli.py
+++ b/parsedmarc/cli.py
@@ -404,6 +404,7 @@ def process_reports(reports_):
                      mailbox_test=False,
                      mailbox_batch_size=10,
                      mailbox_check_timeout=30,
+                     mailbox_since=None,
                      imap_host=None,
                      imap_skip_certificate_verification=False,
                      imap_ssl=True,
@@ -585,6 +586,8 @@ def process_reports(reports_):
             if "check_timeout" in mailbox_config:
                 opts.mailbox_check_timeout = mailbox_config.getint(
                     "check_timeout")
+            if "since" in mailbox_config:
+                opts.mailbox_since = mailbox_config["since"]
 
     if "imap" in config.sections():
"imap" in config.sections(): imap_config = config["imap"] @@ -1312,6 +1315,7 @@ def process_reports(reports_): nameservers=opts.nameservers, test=opts.mailbox_test, strip_attachment_payloads=opts.strip_attachment_payloads, + since=opts.mailbox_since, ) aggregate_reports += reports["aggregate_reports"] diff --git a/parsedmarc/mail/gmail.py b/parsedmarc/mail/gmail.py index 436e1f02..e99f816e 100644 --- a/parsedmarc/mail/gmail.py +++ b/parsedmarc/mail/gmail.py @@ -67,18 +67,33 @@ def create_folder(self, folder_name: str): else: raise e - def _fetch_all_message_ids(self, reports_label_id, page_token=None): - results = ( - self.service.users() - .messages() - .list( - userId="me", - includeSpamTrash=self.include_spam_trash, - labelIds=[reports_label_id], - pageToken=page_token, + def _fetch_all_message_ids(self, reports_label_id, page_token=None, + since=None): + if since: + results = ( + self.service.users() + .messages() + .list( + userId="me", + includeSpamTrash=self.include_spam_trash, + labelIds=[reports_label_id], + pageToken=page_token, + q=f'after:{since}', + ) + .execute() + ) + else: + results = ( + self.service.users() + .messages() + .list( + userId="me", + includeSpamTrash=self.include_spam_trash, + labelIds=[reports_label_id], + pageToken=page_token, + ) + .execute() ) - .execute() - ) messages = results.get("messages", []) for message in messages: yield message["id"] @@ -90,7 +105,12 @@ def _fetch_all_message_ids(self, reports_label_id, page_token=None): def fetch_messages(self, reports_folder: str, **kwargs) -> List[str]: reports_label_id = self._find_label_id_for_label(reports_folder) - return [id for id in self._fetch_all_message_ids(reports_label_id)] + since = kwargs.get('since') + if since: + return [id for id in self._fetch_all_message_ids(reports_label_id, + since=since)] + else: + return [id for id in self._fetch_all_message_ids(reports_label_id)] def fetch_message(self, message_id): msg = self.service.users().messages()\ diff --git a/parsedmarc/mail/graph.py b/parsedmarc/mail/graph.py index 2fc4364d..1346c404 100644 --- a/parsedmarc/mail/graph.py +++ b/parsedmarc/mail/graph.py @@ -144,17 +144,22 @@ def fetch_messages(self, folder_name: str, **kwargs) -> List[str]: folder_id = self._find_folder_id_from_folder_path(folder_name) url = f'/users/{self.mailbox_name}/mailFolders/' \ f'{folder_id}/messages' + since = kwargs.get('since') + if not since: + since = None batch_size = kwargs.get('batch_size') if not batch_size: batch_size = 0 - emails = self._get_all_messages(url, batch_size) + emails = self._get_all_messages(url, batch_size, since) return [email['id'] for email in emails] - def _get_all_messages(self, url, batch_size): + def _get_all_messages(self, url, batch_size, since): messages: list params = { '$select': 'id' } + if since: + params['$filter'] = f'receivedDateTime ge {since}' if batch_size and batch_size > 0: params['$top'] = batch_size else: @@ -165,8 +170,9 @@ def _get_all_messages(self, url, batch_size): messages = result.json()['value'] # Loop if next page is present and not obtained message limit. 
         while '@odata.nextLink' in result.json() and (
+                since is not None or (
                 batch_size == 0 or
-                batch_size - len(messages) > 0):
+                batch_size - len(messages) > 0)):
             result = self._client.get(result.json()['@odata.nextLink'])
             if result.status_code != 200:
                 raise RuntimeError(f'Failed to fetch messages {result.text}')
             messages.extend(result.json()['value'])
 
@@ -181,13 +187,15 @@ def mark_message_read(self, message_id: str):
             raise RuntimeWarning(f"Failed to mark message read"
                                  f"{resp.status_code}: {resp.json()}")
 
-    def fetch_message(self, message_id: str):
+    def fetch_message(self, message_id: str, **kwargs):
         url = f'/users/{self.mailbox_name}/messages/{message_id}/$value'
         result = self._client.get(url)
         if result.status_code != 200:
             raise RuntimeWarning(f"Failed to fetch message"
                                  f"{result.status_code}: {result.json()}")
-        self.mark_message_read(message_id)
+        mark_read = kwargs.get('mark_read')
+        if mark_read:
+            self.mark_message_read(message_id)
         return result.text
 
     def delete_message(self, message_id: str):
diff --git a/parsedmarc/mail/imap.py b/parsedmarc/mail/imap.py
index 150185e1..73c7fa0e 100644
--- a/parsedmarc/mail/imap.py
+++ b/parsedmarc/mail/imap.py
@@ -31,7 +31,11 @@ def create_folder(self, folder_name: str):
 
     def fetch_messages(self, reports_folder: str, **kwargs):
         self._client.select_folder(reports_folder)
-        return self._client.search()
+        since = kwargs.get('since')
+        if since:
+            return self._client.search([u'SINCE', since])
+        else:
+            return self._client.search()
 
     def fetch_message(self, message_id):
         return self._client.fetch_message(message_id, parse=False)
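
Usage example (not part of the patch): a minimal `parsedmarc.ini` excerpt exercising the new option could look like the sketch below. The `watch`, `test`, and `batch_size` keys are existing `mailbox` options shown only for context; only `since` is introduced by this change, and the surrounding values are illustrative.

```ini
[mailbox]
# Existing options, shown for context
watch = True
test = True
batch_size = 10
# New in this patch: only fetch messages received in the last 3 hours.
# Format is <number><unit> with unit m, h, d, or w; an invalid value
# falls back to 1d.
since = 3h
```

With `test = True`, messages are not moved or deleted, and with this patch an MSGraphConnection also leaves them unread; `since = 3h` further limits the run to mail received in the last three hours instead of scanning the whole folder.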