From 3f11659e555e2c98591fa72470525b766b7dc24c Mon Sep 17 00:00:00 2001 From: Peter Adkins <74542596+hcpadkins@users.noreply.github.com> Date: Sun, 18 Jun 2023 13:27:41 +0100 Subject: [PATCH 01/12] Add processors. Move to pyproject.toml. --- .vscode/settings.json | 28 +- grove/__about__.py | 1 - grove/__init__.py | 25 +- grove/connectors/__init__.py | 309 ++++++++++++------ grove/connectors/github/audit_log.py | 6 +- grove/connectors/gsuite/activities.py | 2 +- grove/connectors/local/heartbeat.py | 4 +- grove/connectors/okta/system_log.py | 2 +- grove/connectors/onepassword/api.py | 3 +- grove/connectors/oomnitza/activities.py | 8 +- grove/connectors/oomnitza/api.py | 1 + grove/connectors/sf/event_log.py | 2 +- grove/connectors/tfc/api.py | 2 +- grove/connectors/torq/api.py | 12 +- grove/connectors/twilio/messages.py | 2 +- grove/connectors/twilio/monitor_events.py | 2 +- grove/connectors/workday/activity_logging.py | 10 +- grove/connectors/zoom/activities.py | 2 +- grove/connectors/zoom/api.py | 10 +- grove/connectors/zoom/operationlogs.py | 2 +- grove/constants.py | 1 + grove/entrypoints/base.py | 4 +- grove/exceptions.py | 6 +- grove/helpers/parsing.py | 112 ++++++- grove/helpers/plugin.py | 7 +- grove/models.py | 46 ++- grove/outputs/__init__.py | 67 +++- grove/outputs/aws_s3.py | 124 ++++--- grove/outputs/local_file.py | 70 ++-- grove/outputs/local_stdout.py | 14 +- grove/processors/__init__.py | 57 ++++ grove/processors/extract_paths.py | 97 ++++++ grove/processors/filter_paths.py | 44 +++ grove/processors/split_path.py | 55 ++++ pyproject.toml | 183 +++++++++++ setup.cfg | 160 --------- setup.py | 77 ----- .../example_logs.py | 4 +- .../connectors/local_heartbeat.json | 5 +- tests/fixtures/gsuite/activities/001.json | 17 +- tests/mocks/__init__.py | 6 +- tests/mocks/output.py | 5 +- .../test_connectors_atlassian_audit_events.py | 4 +- tests/test_connectors_deduplicate.py | 14 +- tests/test_connectors_github_audit.py | 2 +- tests/test_connectors_gsuite_activities.py | 2 +- tests/test_connectors_gsuite_alerts.py | 2 +- tests/test_connectors_okta_system_log.py | 2 +- ...est_connectors_onepassword_events_audit.py | 5 +- ...onnectors_onepassword_events_itemusages.py | 4 +- ...ctors_onepassword_events_signinattempts.py | 4 +- tests/test_connectors_oomnitza_activities.py | 7 +- ...test_connectors_pagerduty_audit_records.py | 4 +- tests/test_connectors_sf_event_log.py | 2 +- tests/test_connectors_sfmc_audit_events.py | 4 +- tests/test_connectors_sfmc_security_events.py | 4 +- tests/test_connectors_slack_audit.py | 4 +- tests/test_connectors_tfc_audit_trails.py | 4 +- tests/test_connectors_torq_activity_logs.py | 4 +- tests/test_connectors_torq_audit_logs.py | 4 +- ...est_connectors_workday_activity_logging.py | 6 +- tests/test_connectors_zoom_activities.py | 4 +- tests/test_connectors_zoom_operation.py | 4 +- tests/test_helpers_parsing.py | 87 +++++ tests/test_outputs_base.py | 2 + tests/test_processors_extract_paths.py | 80 +++++ tests/test_processors_filter_paths.py | 55 ++++ tests/test_processors_split_path.py | 50 +++ 68 files changed, 1393 insertions(+), 565 deletions(-) create mode 100644 grove/processors/__init__.py create mode 100644 grove/processors/extract_paths.py create mode 100644 grove/processors/filter_paths.py create mode 100644 grove/processors/split_path.py create mode 100644 pyproject.toml delete mode 100644 setup.cfg delete mode 100644 setup.py create mode 100644 tests/test_helpers_parsing.py create mode 100644 tests/test_processors_extract_paths.py create mode 100644 
tests/test_processors_filter_paths.py create mode 100644 tests/test_processors_split_path.py diff --git a/.vscode/settings.json b/.vscode/settings.json index 3254ecd..4e20929 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,25 +1,16 @@ { - "python.formatting.provider": "black", "python.linting.enabled": true, - "python.linting.flake8Enabled": true, - "python.linting.flake8Args": [ - "--config setup.cfg" - ], - "python.linting.mypyEnabled": true, - "python.linting.mypyArgs": [ - "--config-file setup.cfg" - ], - "isort.args": [ - "-sp setup.cfg" - ], - "python.linting.pylintEnabled": false, + "python.formatting.provider": "none", "[python]": { + "editor.formatOnSave": true, "editor.codeActionsOnSave": { - "source.organizeImports": true + "source.organizeImports": true, }, - "editor.tabSize": 4, - "editor.formatOnSave": true + "editor.defaultFormatter": "ms-python.black-formatter", }, + "editor.rulers": [ + 88 + ], "[terraform]": { "editor.tabSize": 2, "editor.formatOnSave": true @@ -33,9 +24,4 @@ "editor.tabSize": 2, "editor.autoIndent": "advanced" }, - "python.testing.pytestArgs": [ - "tests" - ], - "python.testing.unittestEnabled": false, - "python.testing.pytestEnabled": true } diff --git a/grove/__about__.py b/grove/__about__.py index 587b648..42141f4 100644 --- a/grove/__about__.py +++ b/grove/__about__.py @@ -2,6 +2,5 @@ __version__ = "1.0.0rc4" __title__ = "grove" -__author__ = "HashiCorp Security (TDR)" __license__ = "Mozilla Public License 2.0" __copyright__ = "Copyright 2023 HashiCorp, Inc." diff --git a/grove/__init__.py b/grove/__init__.py index de9c844..e831b7e 100644 --- a/grove/__init__.py +++ b/grove/__init__.py @@ -3,15 +3,18 @@ """A framework for collecting and transforming SaaS logs.""" -from grove import caches # noqa: F401 -from grove import configs # noqa: F401 -from grove import connectors # noqa: F401 -from grove import constants # noqa: F401 -from grove import entrypoints # noqa: F401 -from grove import exceptions # noqa: F401 -from grove import helpers # noqa: F401 -from grove import logging # noqa: F401 -from grove import models # noqa: F401 -from grove import outputs # noqa: F401 -from grove import types # noqa: F401 +from grove import ( + caches, # noqa: F401 + configs, # noqa: F401 + connectors, # noqa: F401 + constants, # noqa: F401 + entrypoints, # noqa: F401 + exceptions, # noqa: F401 + helpers, # noqa: F401 + logging, # noqa: F401 + models, # noqa: F401 + outputs, # noqa: F401 + processors, # noqa: F401 + types, # noqa: F401 +) from grove.__about__ import * # noqa: F401, F403 diff --git a/grove/connectors/__init__.py b/grove/connectors/__init__.py index 05ae23d..c0df849 100644 --- a/grove/connectors/__init__.py +++ b/grove/connectors/__init__.py @@ -10,10 +10,10 @@ import logging import os from typing import Any, Dict, List, Optional -from grove.__about__ import __version__ import jmespath +from grove.__about__ import __version__ from grove.constants import ( CACHE_KEY_LOCK, CACHE_KEY_POINTER, @@ -33,17 +33,20 @@ LOCK_DATE_FORMAT, PLUGIN_GROUP_CACHE, PLUGIN_GROUP_OUTPUT, + PLUGIN_GROUP_PROCESSOR, REVERSE_CHRONOLOGICAL, ) from grove.exceptions import ( AccessException, ConcurrencyException, + ConfigurationException, DataFormatException, GroveException, NotFoundException, + ProcessorError, ) -from grove.helpers import plugin -from grove.models import ConnectorConfig +from grove.helpers import parsing, plugin +from grove.models import ConnectorConfig, OutputStream class BaseConnector: @@ -78,14 +81,31 @@ def __init__(self, config: 
ConnectorConfig, context: Dict[str, str]): } # Let the caller handle exceptions from failure to load handlers directly. - self._output = plugin.load_handler( - os.environ.get(ENV_GROVE_OUTPUT_HANDLER, DEFAULT_OUTPUT_HANDLER), - PLUGIN_GROUP_OUTPUT, - ) self._cache = plugin.load_handler( os.environ.get(ENV_GROVE_CACHE_HANDLER, DEFAULT_CACHE_HANDLER), PLUGIN_GROUP_CACHE, ) + self._output = plugin.load_handler( + os.environ.get(ENV_GROVE_OUTPUT_HANDLER, DEFAULT_OUTPUT_HANDLER), + PLUGIN_GROUP_OUTPUT, + ) + self._output.setup() + + # Processors are only setup once for each connector instance. + self._processors = {} + + for processor in self.configuration.processors: + try: + self._processors[processor.name] = plugin.load_handler( + processor.processor, + PLUGIN_GROUP_PROCESSOR, + processor, + ) + except ConfigurationException as err: + raise ProcessorError( + f"Failed to initialise processor '{processor.name}' " + f"({processor.processor}). {err}", + ) # The time that our current lock expires, if we have one. self._lock_expiry: Optional[datetime.datetime] = None @@ -97,7 +117,7 @@ def __init__(self, config: ConnectorConfig, context: Dict[str, str]): except ValueError as err: self.logger.warning( f"Lock duration ('{ENV_GROVE_LOCK_DURATION}') must be a number.", - extra={"exception": err}, + extra={"exception": err, **self.log_context}, ) # Determines if the start of a 'window' has been passed during a collection. @@ -107,13 +127,16 @@ def __init__(self, config: ConnectorConfig, context: Dict[str, str]): self._window_start = str() self._window_end = str() - # Tracks the total number of saved log entries. - self._saved = 0 - # Paginated / chunked data needs an incrementing identifier to keep things # orderly. self._part = 0 + # Track the number of output logs by the configured output destination stream. + # This allows statistics to be generated on deduplication, splitting, etc. + self._saved = {} + for descriptor, _ in self.configuration.outputs.items(): + self._saved[descriptor] = 0 + # Tracks hashes of unique log entries, keyed by their pointer value. self._hashes: Dict[str, set[str]] = {} @@ -145,7 +168,7 @@ def run(self): self.collect() except GroveException as err: self.logger.error( - "Connector was unable to collect logs.", + "Connector was unable to complete collection successfully.", extra={"exception": err, **self.log_context}, ) self.unlock() @@ -163,9 +186,10 @@ def run(self): def _run_chronological(self): """Performs chronological specific post collection operations.""" + # TODO: Move to processor. try: self.logger.debug( - "Saving deduplication hashes to cache", + "Saving deduplication hashes to cache.", extra=self.log_context, ) self.save_hashes() @@ -193,7 +217,7 @@ def _run_reverse_chronological(self): return except NotFoundException: self.logger.debug( - "Skipping pointer swap and clean-up as there is no next-pointer", + "Skipping pointer swap and clean-up as there is no next-pointer.", extra={**self.log_context}, ) return @@ -224,6 +248,7 @@ def _run_reverse_chronological(self): ) return + # TODO: Move to processor. try: self.logger.debug( "Saving deduplication hashes to cache", @@ -242,16 +267,77 @@ def collect(self): """Provides a stub for a connector to initiate a collection.""" pass - def save(self, candidates: List[Any]): - """Saves log candidates, and updates the pointer in the cache. + def process_and_write(self, entries: List[Any]): + """Write log entries them to the configured output handler. - :param candidates: List of log candidates to save. 
+ :param entries: List of log entries to process. + """ + # Allow failures to bubble all the way up and fail the run. If processing fails + # we want to defer collection, to allow retry later. We always pass a copy of + # the entries to prevent accidental overwriting of the collected raw data by + # a processor. + processed = self.process(entries) + + for descriptor, stream in self.configuration.outputs.items(): + # Ensure the output uses the correct stream. + to_save = entries + if stream == OutputStream.processed: + to_save = processed + + number_of_entries = len(to_save) + if number_of_entries < 1: + self.logger.info( + "No log entries to output, skipping.", + extra=self.log_context, + ) + continue + + try: + self._output.submit( + data=self._output.serialize( + data=to_save, + metadata=self.metadata(), + ), + part=self._part, + operation=self.operation, + connector=self.NAME, + identity=self.identity, + descriptor=descriptor, + ) + + # Update counters. + self._saved[descriptor] += number_of_entries + + self.logger.info( + "Log submitted successfully to output.", + extra={ + "part": self._part, + "stream": stream, + "descriptor": descriptor, + "entries": number_of_entries, + **self.log_context, + }, + ) + except AccessException as err: + self.logger.error( + "Failed to write logs to output, cannot continue.", + extra={ + "part": self._part, + "exception": err, + "stream": stream, + "descriptor": descriptor, + **self.log_context, + }, + ) + raise - :raises GroveException: The LOG_ORDER defined by the Connector is not valid. + def save(self, entries: List[Any]): + """Saves log entries, and updates the pointer in the cache. - :return: A count of entries saved. + :param entries: List of log entries to save. """ - entries = self.deduplicate_by_hash(candidates) + # TODO: Move deduplication into a processor. + entries = self.deduplicate_by_hash(entries) if len(entries) < 1: self.logger.warning( @@ -265,51 +351,29 @@ def save(self, candidates: List[Any]): self.lock() if self.LOG_ORDER == CHRONOLOGICAL: - return self._save_chronological(entries) + self._save_chronological(entries) if self.LOG_ORDER == REVERSE_CHRONOLOGICAL: - return self._save_reverse_chronological(entries) + self._save_reverse_chronological(entries) - # Fall through for anything not supported / incorrectly specified. - raise GroveException(f"Connector LOG_ORDER '{self.LOG_ORDER}' is not valid.") + self.finalize() - def _save_chronological(self, candidates: List[Any]): + def _save_chronological(self, entries: List[Any]): """Saves log entries when retrieved logs are in chronological order. - :param candidates: List of log entries to save. + :param entries: List of log entries to save. """ - newest = jmespath.search(self.POINTER_PATH, candidates[-1]) - + # Pointers are extracted prior to processing as processing may modify the + # structure, or remove entries entirely. + newest = jmespath.search(self.POINTER_PATH, entries[-1]) if newest is None: - self.logger.error( - "Pointer path was not found in returned logs, cannot continue.", - extra={"pointer_path": self.POINTER_PATH, **self.log_context}, + raise GroveException( + f"Pointer path ({self.POINTER_PATH}) was not found in returned logs." ) - return - # Generate metadata for the candidate log entries, and save to the output - # handler. 
- try: - self._output.submit( - data=self._output.serialize( - data=candidates, - metadata=self.metadata(), - ), - part=self._part, - operation=self.operation, - connector=self.NAME, - identity=self.identity, - ) - self.logger.info( - "Log submitted successfully to output.", - extra={"part": self._part, **self.log_context}, - ) - except AccessException as err: - self.logger.error( - "Failed to write logs to output, cannot continue.", - extra={"exception": err, **self.log_context}, - ) - return + # Exceptions are allowed to bubble up here to ensure connectors exit on error, + # rather than silently dropping batches of log entries. + self.process_and_write(entries) # Once uploaded, then update the pointer. NOTE: There is an opportunity for # issues to occur between the output and pointer update which would lead to @@ -325,11 +389,10 @@ def _save_chronological(self, candidates: List[Any]): "Failed to save pointer to cache, cannot continue.", extra={"exception": err, **self.log_context}, ) - return + raise - # Get ready for the next block of candidate log entries (if required). + # Get ready for the next batch of candidate log entries (if required). self._part += 1 - self._saved += len(candidates) def _save_reverse_chronological(self, candidates: List[Any]): # noqa: C901 """Save log entries when logs are in reverse chronological order. @@ -348,11 +411,9 @@ def _save_reverse_chronological(self, candidates: List[Any]): # noqa: C901 newest = jmespath.search(self.POINTER_PATH, candidates[0]) if oldest is None or newest is None: - self.logger.error( - "Pointer path was not found in logs entry, cannot continue.", - extra={"pointer_path": self.POINTER_PATH, **self.log_context}, + raise GroveException( + f"Pointer path ({self.POINTER_PATH}) was not found in returned logs." ) - return # If a window start is in the cache then a previous collection is incomplete. # We'll skip entries until we find our window, and then only collect entries @@ -371,11 +432,9 @@ def _save_reverse_chronological(self, candidates: List[Any]): # noqa: C901 current_pointer = jmespath.search(self.POINTER_PATH, entry) if current_pointer is None: - self.logger.error( - "Pointer path was not found in logs entry, cannot continue.", - extra={"pointer_path": self.POINTER_PATH, **self.log_context}, + raise GroveException( + f"Pointer path ({self.POINTER_PATH}) not found in log entry." ) - return # We need to track FROM the window end, inclusive, to ensure that we # don't miss any logs. This is required in cases where the timestamp @@ -409,32 +468,12 @@ def _save_reverse_chronological(self, candidates: List[Any]): # noqa: C901 if len(entries) < 1: return - # Generate metadata for the entries, and save to the output handler. - try: - self._output.submit( - data=self._output.serialize( - data=entries, - metadata=self.metadata(), - ), - part=self._part, - operation=self.operation, - connector=self.NAME, - identity=self.identity, - ) - self.logger.info( - "Log submitted successfully to output.", - extra={"part": self._part, **self.log_context}, - ) - except AccessException as err: - self.logger.error( - "Failed to write logs to output, cannot continue.", - extra={"exception": err, **self.log_context}, - ) - return + # Exceptions are allowed to bubble up here to ensure connectors exit on error, + # rather than silently dropping batches of log entries. + self.process_and_write(entries) # Get ready for the next block of entries (if required). 
self._part += 1 - self._saved += len(entries) # Save the new window geometry to cache but only AFTER data is saved, and only # save the window start when it's updated. @@ -566,7 +605,7 @@ def deduplicate_by_hash(self, candidates: List[Any]): return entries - def deduplicate_by_pointer(self, candidates: List[Any]): + def deduplicate_by_pointer(self, entries: List[Any]): """Deduplicate log entries by pointer values. Deduplicates records which occur before or after a pointer on the current @@ -578,27 +617,27 @@ def deduplicate_by_pointer(self, candidates: List[Any]): For example, some provider's only allow filtering on a date (YYYY-MM-DD) while returning log entries with timestamps that have millisecond precision. - :param candidates: A list of log entries to deduplicate. + :param entries: A list of log entries to deduplicate. :return: A deduplicated list of log entries. """ if self.LOG_ORDER == CHRONOLOGICAL: - return self._deduplicate_by_pointer_chronological(candidates) + return self._deduplicate_by_pointer_chronological(entries) if self.LOG_ORDER == REVERSE_CHRONOLOGICAL: - return self._deduplicate_by_pointer_reverse_chronological(candidates) + return self._deduplicate_by_pointer_reverse_chronological(entries) - def _deduplicate_by_pointer_chronological(self, candidates: List[Any]): + def _deduplicate_by_pointer_chronological(self, entries: List[Any]): """Deduplicates chronological log entries by their pointer. - :param candidates: A list of log entries to deduplicate. + :param entries: A list of log entries to deduplicate. :return: A deduplicated list of log entries. """ - entries = [] + results = [] pointer_passed = False - for candidate in candidates: + for candidate in entries: candidate_pointer = str(jmespath.search(self.POINTER_PATH, candidate)) if candidate_pointer == self.pointer: @@ -606,28 +645,28 @@ def _deduplicate_by_pointer_chronological(self, candidates: List[Any]): # Only track chronological records on and after the pointer. if pointer_passed: - entries.append(candidate) + results.append(candidate) # If we never encountered the pointer, don't filter the records at all. This may # cause some duplicates if the pointer is on a subsequent page, but we always # prefer duplicates in these cases. if not pointer_passed: - entries = candidates + results = entries - return entries + return results - def _deduplicate_by_pointer_reverse_chronological(self, candidates: List[Any]): + def _deduplicate_by_pointer_reverse_chronological(self, entries: List[Any]): """Deduplicates reverse chronological log entries by their pointer. - :param candidates: A list of log entries to deduplicate. + :param entries: A list of log entries to deduplicate. :return: A deduplicated list of log entries. """ - entries = [] + results = [] pointer_found = False pointer_passed = False - for candidate in candidates: + for candidate in entries: candidate_pointer = jmespath.search(self.POINTER_PATH, candidate) if candidate_pointer == self.pointer: @@ -638,15 +677,65 @@ def _deduplicate_by_pointer_reverse_chronological(self, candidates: List[Any]): break if not pointer_passed: - entries.append(candidate) + results.append(candidate) # If we never encountered the pointer, don't filter the records at all. This may # cause some duplicates if the pointer is on a subsequent page, but we always # prefer duplicates in these cases. 
if not pointer_passed: - entries = candidates + results = entries - return entries + return results + + def process(self, entries: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """Process log entries prior to saving. + + :param entries: A list of log entries to process. + + :return: A processed list of log entries. + """ + # Shortcut where there are no processors configured. + if len(self._processors) < 1: + return entries + + # As processors can modify the number of entries, we need to loop over them + # multiple times. + processed = parsing.quick_copy(entries) + + for name, processor in self._processors.items(): + for index, _ in enumerate(processed): + try: + processed[index:index] = processor.process(processed.pop(index)) + except Exception as err: + raise ProcessorError( + f"Processor '{name}' ({processor}) failed during " + f" processing. {err}" + ) + + return processed + + def finalize(self): + """Performs final steps after each save operation has complete.""" + + # Finalize all processors. + for name, processor in self._processors.items(): + # Once again this exception handler is exceptionally (!) broad, to ensure + # that any unhandled exception, including from downstream libraries, are + # caught and handled consistently (except for BaseException derived). + try: + processor.finalize() + except Exception as err: + # As this runs after saving data and pointers, all we can really do is + # log this and continue. + self.logger.error( + "Processor failed during finalization.", + extra={ + "identity": name, + "processor": processor, + "exception": err, + **self.log_context, + }, + ) @property def hashes(self) -> Dict[str, set[str]]: @@ -661,7 +750,10 @@ def hashes(self) -> Dict[str, set[str]]: try: self._hashes[self.pointer] = set( json.loads( - self._cache.get(self.cache_key(CACHE_KEY_SEEN), self.operation) + self._cache.get( + self.cache_key(CACHE_KEY_SEEN), + self.operation, + ) ) ) except (TypeError, json.decoder.JSONDecodeError) as err: @@ -946,3 +1038,4 @@ def unlock(self): # Bye-bye lock. self._lock_expiry = None + self._lock_expiry = None diff --git a/grove/connectors/github/audit_log.py b/grove/connectors/github/audit_log.py index da1d646..0a02b31 100644 --- a/grove/connectors/github/audit_log.py +++ b/grove/connectors/github/audit_log.py @@ -40,7 +40,7 @@ def delay(self): :return: The "delay" component of the connector configuration. """ try: - candidate = self.configuration.delay # type: ignore + candidate = self.configuration.delay except AttributeError: return 0 @@ -63,7 +63,7 @@ def scope(self): :return: The "scope" component of the connector configuration. """ try: - candidate = self.configuration.scope # type: ignore + candidate = self.configuration.scope except AttributeError: return "orgs" @@ -84,7 +84,7 @@ def fqdn(self): :return: The "fqdn" component of the connector configuration. """ try: - return self.configuration.fqdn # type: ignore + return self.configuration.fqdn except AttributeError: return "api.github.com" diff --git a/grove/connectors/gsuite/activities.py b/grove/connectors/gsuite/activities.py index 42f4c49..3a60d5e 100644 --- a/grove/connectors/gsuite/activities.py +++ b/grove/connectors/gsuite/activities.py @@ -59,7 +59,7 @@ def delay(self): :return: The "delay" component of the connector configuration. 
""" try: - candidate = self.configuration.delay # type: ignore + candidate = self.configuration.delay except AttributeError: return 0 diff --git a/grove/connectors/local/heartbeat.py b/grove/connectors/local/heartbeat.py index 24bea6f..cf5e0ad 100644 --- a/grove/connectors/local/heartbeat.py +++ b/grove/connectors/local/heartbeat.py @@ -25,7 +25,7 @@ def count(self): :return: The number of heartbeat messages to emit. """ try: - return int(self.configuration.count) # type: ignore + return int(self.configuration.count) except (AttributeError, ValueError): return 5 @@ -36,7 +36,7 @@ def interval(self): :return: The heartbeat interval, in seconds. """ try: - return int(self.configuration.interval) # type: ignore + return int(self.configuration.interval) except (AttributeError, ValueError): return 1 diff --git a/grove/connectors/okta/system_log.py b/grove/connectors/okta/system_log.py index f0e1ec6..c1d4b2c 100644 --- a/grove/connectors/okta/system_log.py +++ b/grove/connectors/okta/system_log.py @@ -27,7 +27,7 @@ def domain(self): :return: The "domain" portion of the connector's configuration. """ try: - return self.configuration.domain # type: ignore + return self.configuration.domain except AttributeError: return "okta.com" diff --git a/grove/connectors/onepassword/api.py b/grove/connectors/onepassword/api.py index 1fbb654..4d6fc6f 100644 --- a/grove/connectors/onepassword/api.py +++ b/grove/connectors/onepassword/api.py @@ -8,6 +8,7 @@ from typing import Any, Dict, Optional import requests + from grove.exceptions import RateLimitException, RequestFailedException from grove.types import AuditLogEntries, HTTPResponse @@ -154,7 +155,7 @@ def get_auditevents( cursor: Optional[str] = None, start_time: Optional[str] = None, ) -> AuditLogEntries: - """Fetches a list of of actions performed by team members within a 1Password account. + """Fetches a list of actions performed by members of a 1Password account. :param cursor: Cursor to use when fetching results. Supersedes other parameters. :param start_time: The ISO Format timestamp to query logs since. diff --git a/grove/connectors/oomnitza/activities.py b/grove/connectors/oomnitza/activities.py index 9be2dae..6d95cc0 100644 --- a/grove/connectors/oomnitza/activities.py +++ b/grove/connectors/oomnitza/activities.py @@ -28,8 +28,8 @@ def collect(self): cursor = 0 # If no pointer is stored then a previous run hasn't been performed, so set the - # pointer to 2 days ago. In the case of the Oomnitza activities API the pointer is - # the value of the "timestamp" field from the latest record retrieved from + # pointer to 2 days ago. In the case of the Oomnitza activities API the pointer + # is the value of the "timestamp" field from the latest record retrieved from # the API - which is in epoch. The Oomnitza API doesnt account for milliseconds. now = datetime.fromtimestamp(time.time()).strftime("%s") @@ -40,8 +40,8 @@ def collect(self): datetime.fromtimestamp(time.time()) - timedelta(days=2) ).strftime("%s") - # Get log data from the upstream API. A "start_date" and "end_date" datetime query - # parameters are required. + # Get log data from the upstream API. A "start_date" and "end_date" datetime + # query parameters are required. 
while True: log = client.get_activites( start_date=self.pointer, end_date=now, cursor=cursor diff --git a/grove/connectors/oomnitza/api.py b/grove/connectors/oomnitza/api.py index ade4b83..e47f4fa 100644 --- a/grove/connectors/oomnitza/api.py +++ b/grove/connectors/oomnitza/api.py @@ -7,6 +7,7 @@ from typing import Dict, Optional import requests + from grove.exceptions import RequestFailedException from grove.types import AuditLogEntries, HTTPResponse diff --git a/grove/connectors/sf/event_log.py b/grove/connectors/sf/event_log.py index bab1e4f..2b405a7 100644 --- a/grove/connectors/sf/event_log.py +++ b/grove/connectors/sf/event_log.py @@ -47,7 +47,7 @@ def token(self): :return: The "token" portion of the connector's configuration. """ try: - return self.configuration.token # type: ignore + return self.configuration.token except AttributeError: return None diff --git a/grove/connectors/tfc/api.py b/grove/connectors/tfc/api.py index 7d07e8a..3033e3f 100644 --- a/grove/connectors/tfc/api.py +++ b/grove/connectors/tfc/api.py @@ -80,7 +80,7 @@ def get_trails( ) -> AuditLogEntries: """Fetches a list of audit logs which match the provided filters. - :param since: The ISO8601 format of the most recent event to include (inclusive). + :param since: The ISO8601 date of the most recent event to include (inclusive). :param cursor: The page to fetch. If omitted, endpoint returns first page. :param page_size: Number of audit events per page. Defaults to 1000. diff --git a/grove/connectors/torq/api.py b/grove/connectors/torq/api.py index 28b6c86..3d3a4f9 100644 --- a/grove/connectors/torq/api.py +++ b/grove/connectors/torq/api.py @@ -144,8 +144,8 @@ def get_logs( ) -> AuditLogEntries: """Fetches a list of logs from Torq which match the provided filters. - :param start_time: The required date and time of the earliest log entry. - Timestamps are in RFC 3339 format, for example, 2022-03-09T08:40:18.490771179Z. + :param start_time: The required date and time of the earliest log entry. Start + times are in RFC3339 format, for example, 2022-03-09T08:40:18.490771179Z. :param result_field: The key name for the list of logs in the returned json. :param to_date: The required date and time in UTC of the latest log entry. :param limit: The maximum number of items to include in a single response. @@ -178,8 +178,8 @@ def get_audit_logs( ) -> AuditLogEntries: """Fetches a list of audit logs from Torq which match the provided filters. - :param start_time: The required date and time of the earliest log entry. - Timestamps are in RFC 3339 format, for example, 2022-03-09T08:40:18.490771179Z. + :param start_time: The required date and time of the earliest log entry. Start + times are in RFC 3339 format, for example, 2022-03-09T08:40:18.490771179Z. :param to_date: The required date and time in UTC of the latest log entry. :param cursor: The cursor to use when paging. @@ -197,8 +197,8 @@ def get_activity_logs( ) -> AuditLogEntries: """Fetches a list of activity logs from Torq which match the provided filters. - :param start_time: The required date and time of the earliest log entry. - Timestamps are in RFC 3339 format, for example, 2022-03-09T08:40:18.490771179Z. + :param start_time: The required date and time of the earliest log entry. Start + times are in RFC 3339 format, for example, 2022-03-09T08:40:18.490771179Z. :param to_date: The required date and time in UTC of the latest log entry. :param cursor: The cursor to use when paging. 
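The Torq client methods above all take start_time values in RFC 3339 format, for example 2022-03-09T08:40:18.490771179Z. A minimal illustrative sketch of producing a compatible timestamp in Python (assuming microsecond rather than nanosecond precision is acceptable to the upstream API):

from datetime import datetime, timezone

def rfc3339_utc_now() -> str:
    # Illustrative only: %f yields microseconds, and the trailing "Z" marks UTC.
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")

print(rfc3339_utc_now())  # e.g. "2023-06-18T13:27:41.123456Z"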
diff --git a/grove/connectors/twilio/messages.py b/grove/connectors/twilio/messages.py index 1459d97..2988ca3 100644 --- a/grove/connectors/twilio/messages.py +++ b/grove/connectors/twilio/messages.py @@ -30,7 +30,7 @@ def secret(self): :return: The value of the 'secret' field from the configuration. """ try: - return self.configuration.secret # type: ignore + return self.configuration.secret except AttributeError: return None diff --git a/grove/connectors/twilio/monitor_events.py b/grove/connectors/twilio/monitor_events.py index bb3d52f..8a00bda 100644 --- a/grove/connectors/twilio/monitor_events.py +++ b/grove/connectors/twilio/monitor_events.py @@ -28,7 +28,7 @@ def secret(self): :return: The value of the 'secret' field from the configuration. """ try: - return self.configuration.secret # type: ignore + return self.configuration.secret except AttributeError: return None diff --git a/grove/connectors/workday/activity_logging.py b/grove/connectors/workday/activity_logging.py index ce87b02..db57add 100644 --- a/grove/connectors/workday/activity_logging.py +++ b/grove/connectors/workday/activity_logging.py @@ -28,7 +28,7 @@ def base_url(self): :return: The "base_url" portion of the connector's configuration. """ try: - return self.configuration.base_url # type: ignore + return self.configuration.base_url except AttributeError: return None @@ -41,7 +41,7 @@ def client_id(self): :return: The "client_id" portion of the connector's configuration. """ try: - return self.configuration.client_id # type: ignore + return self.configuration.client_id except AttributeError: return None @@ -54,7 +54,7 @@ def client_secret(self): :return: The "client_secret" portion of the connector's configuration. """ try: - return self.configuration.client_secret # type: ignore + return self.configuration.client_secret except AttributeError: return None @@ -92,7 +92,9 @@ def collect(self): # parameters are required. while True: log = client.get_activity_logging( - from_date=self.pointer, to_date=now, cursor=cursor + from_date=self.pointer, + to_date=now, + cursor=cursor, ) # Save this batch of log entries. diff --git a/grove/connectors/zoom/activities.py b/grove/connectors/zoom/activities.py index e16d495..4658e0f 100644 --- a/grove/connectors/zoom/activities.py +++ b/grove/connectors/zoom/activities.py @@ -25,7 +25,7 @@ def client_id(self): This is required as this is a third authentication element required by Zoom. """ try: - return self.configuration.client_id # type: ignore + return self.configuration.client_id except AttributeError: return None diff --git a/grove/connectors/zoom/api.py b/grove/connectors/zoom/api.py index 86cce48..16d1fc7 100644 --- a/grove/connectors/zoom/api.py +++ b/grove/connectors/zoom/api.py @@ -3,8 +3,8 @@ """Zoom API client. -As the Python Zoom client does not currently support Audit API, this client has been created in -the interim. +As the Python Zoom client does not currently support Audit API, this client has been +created in the interim. """ import base64 @@ -151,9 +151,9 @@ def get_logs( :return: AuditLogEntries object containing a pagination cursor, and log entries. """ - # The endpoint returns the same total value of results regardless of the limit and - # offset parameters. The pagination parameters determine the amount of content - # in the data[] array. + # The endpoint returns the same total value of results regardless of the limit + # and offset parameters. The pagination parameters determine the amount of + # content in the data[] array. 
result = self._get( f"{API_BASE_URI}/{endpoint}", diff --git a/grove/connectors/zoom/operationlogs.py b/grove/connectors/zoom/operationlogs.py index 29e2de1..44567d2 100644 --- a/grove/connectors/zoom/operationlogs.py +++ b/grove/connectors/zoom/operationlogs.py @@ -25,7 +25,7 @@ def client_id(self): This is required as this is a third authentication element required by Zoom. """ try: - return self.configuration.client_id # type: ignore + return self.configuration.client_id except AttributeError: return None diff --git a/grove/constants.py b/grove/constants.py index 014bd66..9d56dbe 100644 --- a/grove/constants.py +++ b/grove/constants.py @@ -41,6 +41,7 @@ PLUGIN_GROUP_CACHE = "grove.caches" PLUGIN_GROUP_OUTPUT = "grove.outputs" PLUGIN_GROUP_CONFIG = "grove.configs" +PLUGIN_GROUP_PROCESSOR = "grove.processors" PLUGIN_GROUP_SECRET = "grove.secrets" # noqa: S105 PLUGIN_GROUP_CONNECTOR = "grove.connectors" diff --git a/grove/entrypoints/base.py b/grove/entrypoints/base.py index 08a56bb..d00b719 100644 --- a/grove/entrypoints/base.py +++ b/grove/entrypoints/base.py @@ -84,7 +84,7 @@ def entrypoint(context: Dict[str, Any]): try: configurations = configure() except GroveException as err: - logger.fatal( + logger.critical( "Failed to initialise configuration handler", extra={"exception": err} ) return @@ -93,7 +93,7 @@ def entrypoint(context: Dict[str, Any]): try: workers = int(os.environ.get(ENV_GROVE_WORKER_COUNT, DEFAULT_WORKER_COUNT)) except ValueError as err: - logger.fatal( + logger.critical( f"Worker count ('{ENV_GROVE_WORKER_COUNT}') must be a number.", extra={"exception": err}, ) diff --git a/grove/exceptions.py b/grove/exceptions.py index 267ec38..f52b61b 100644 --- a/grove/exceptions.py +++ b/grove/exceptions.py @@ -17,7 +17,7 @@ class ConnectorMissingException(GroveException): class ConcurrencyException(GroveException): - """Indidates that Grove may be running in another location concurrently.""" + """Indicates that Grove may be running in another location concurrently.""" class NotImplementedException(GroveException): @@ -42,3 +42,7 @@ class AccessException(GroveException): class DataFormatException(GroveException): """Indicates an issue occurred while attempting to process data.""" + + +class ProcessorError(GroveException): + """Indicates that an error occurred when setting up or calling a processor.""" diff --git a/grove/helpers/parsing.py b/grove/helpers/parsing.py index 6a2898a..ba9c59f 100644 --- a/grove/helpers/parsing.py +++ b/grove/helpers/parsing.py @@ -3,6 +3,10 @@ """Provides helpers for parsing.""" +import json +import re +from typing import Any, Dict, List + from pydantic import ValidationError @@ -13,15 +17,119 @@ def validation_error(exc: ValidationError): :return: The exception as a string, including fields with validation errors. """ - prefix = exc.model.Config.env_prefix # type: ignore message = "Handler configuration is not valid" + try: + prefix = exc.model.Config.env_prefix # type: ignore + except AttributeError: + prefix = "" # Ensure the validation errors are included in the logged error message. for error in exc.errors(): - field = error["loc"][0].upper() # type: ignore + field = str(error["loc"][0]).upper() problem = error["msg"] # Add the environment variable prefix onto the field name. message = f"{message}, {prefix}{field} {problem}" return message + + +def quick_copy(value: Any): + """Performs a quick deep copy by marshalling and unmarshalling to JSON. + + This operation, although strange, is significantly quicker than copy.deepcopy(). 
+ This has been moved into a helper to enable potential performance improvements in + future without code changes in processors being required. + + :param value: The value to perform a deep copy of. + + :return: The deep copy of the input value. + """ + return json.loads(json.dumps(value)) + + +def quote_aware_split(value: str, delimiter=".") -> List[str]: + """Splits a value by delimiter, returning a list. + + This function is quote aware, ensuring that splitting will not occur inside of a + value quoted with single-quotes. + + :param value: The value to split. + :param delimiter: The delimiter to split using. + + :return: A list of elements split from the input value. + """ + fields = [] + + for field in re.split(rf"({re.escape(delimiter)}|'.*?')", value): + # Drop empty and delimiter only fields. + field = field.strip(delimiter) + field = field.strip() + field = re.sub(r"^'(.*)'$", r"\1", field) + + if field: + fields.append(field) + + return fields + + +def update_path( + candidate: Dict[str, Any], + path: List[str], + value: Any, + replace: bool = False, +) -> Dict[str, Any]: + """Updates or deletes values under the specified path for the provided candidate. + + A path is a list of strings delimited string which express a location within the + candidate data. If the location is not nested, a single element list should be + provided. + + As an example of this, a path of `["A", "B", "C"]` expresses that the specified + value should be set under `{"A": {"B": {"C": value } } }` within the candidate + dictionary. + + This function recursively walks the provided candidate dictionary until the location + specified by the path has been located. Once found, the provided value will perform + on of the following operations: + + 1. By default, the provided value will be combined with the existing value. + 2. If `replace` is `True`, the existing value will be replaced with the new. + 3. If `None` is provided as the new value, the specified path will be deleted. + + :param candidate: The dictionary to update. + :param path: The path to the key to update, as a list of strings. + :param value: The value to assign to the destination path, or None to delete. + :param replace: Whether to replace the current value with the new value, or combine. + + :return: The updated dictionary. + """ + key = path.pop(0) + + # Set the value on the last recursion. + if len(path) < 1: + if value is None: + del candidate[key] + return candidate + + # If replace is set, don't combine the new value with the existing. + if replace: + candidate[key] = value + return candidate + + # By default, combine the new value with the existing value(s) - making sure to + # handle dictionaries as well as lists. + if key in candidate and type(candidate[key]) == list: + candidate[key].append(value) + else: + candidate = {**candidate, key: value} + + return candidate + + # If recursing, ensure the child we're trying to recurse into exists. + if key not in candidate: + candidate[key] = {} + + candidate[key] = update_path(candidate[key], path, value, replace=replace) + + return candidate diff --git a/grove/helpers/plugin.py b/grove/helpers/plugin.py index 34570df..3e39273 100644 --- a/grove/helpers/plugin.py +++ b/grove/helpers/plugin.py @@ -35,15 +35,16 @@ def lookup_handler(name: str, group: str) -> EntryPoint: ) -def load_handler(name: str, group: str) -> Any: +def load_handler(name: str, group: str, *args: Any, **kwargs: Any) -> Any: """Attempts to locate and load the requested plugin handler. 
This is a convenience method which wrappers the lookup operation, and performs the - load and instantiation required to hydrate a 'real' handler. + load and instantiation required to hydrate a 'real' handler. Any additional + arguments passed to load_handler are pass through to the handler during creation. :param name: The name of the handler to load (e.g. 'aws_ssm'). :param group: The group the handler belongs to (e.g. 'grove.outputs'). """ cls = lookup_handler(name, group).load() - return cls() + return cls(*args, **kwargs) diff --git a/grove/models.py b/grove/models.py index fb9ee1f..878383d 100644 --- a/grove/models.py +++ b/grove/models.py @@ -5,7 +5,8 @@ import base64 import binascii -from typing import Dict +from enum import Enum +from typing import Dict, List from pydantic import BaseModel, Extra, Field, root_validator, validator @@ -36,8 +37,36 @@ def decode(value: str, encoding: str) -> str: raise DataFormatException(f"Unknown encoding method '{encoding}'") +class ProcessorConfig(BaseModel, extra=Extra.allow): + """A processor configuration object. + + A processor configuration object represents information used by processors to + perform some set of operations on log entries. This base configuration object + is bare-bones as processors may define their own required configuration fields. + """ + + # Name is an arbitrary name administrators can provide to processors to enable + # better tracking and identification of processors. + name: str + + # Processor defines the processor which should be run. This must match the plugin + # entrypoint name. + processor: str + + +class OutputStream(str, Enum): + """Defines supported output 'streams'. + + This is used to allow routing of original / raw collected data differently to + post processed data. + """ + + raw = "raw" + processed = "processed" + + class ConnectorConfig(BaseModel, extra=Extra.allow): - """A connector configuration object. + """Defines the connector configuration structure. A configuration object represents information which Grove uses to call a given connector. All connectors must have at least a name, key, identity, and connector @@ -73,6 +102,19 @@ class ConnectorConfig(BaseModel, extra=Extra.allow): # from API endpoints which allow filtering records to return. operation: str = Field(OPERATION_DEFAULT) + # Processors allow processing of data during collection. + processors: List[ProcessorConfig] = Field([]) + + # Outputs allows specification of what type of data to output, and with what + # descriptor. By default, any processed logs will be output with a descriptor of + # 'processed', and raw logs with a descriptor of 'logs'. + outputs: Dict[str, OutputStream] = Field( + { + "logs": OutputStream.raw, + "processed": OutputStream.processed, + } + ) + @validator("key") def _validate_key_or_secret(cls, value, values, field): # noqa: B902 """Ensures that 'key' is set directly or a reference is present in 'secrets'. 
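The new processors and outputs fields above combine in a connector configuration document roughly as follows. This is a hedged sketch: all values are hypothetical, the processor value must match a registered grove.processors entrypoint, and the outputs keys are arbitrary descriptors mapped onto the raw or processed stream.

example_configuration = {
    "name": "example",
    "connector": "local_heartbeat",      # hypothetical connector entrypoint
    "identity": "example-identity",
    "key": "example-key",
    "processors": [
        {
            "name": "shape-logs",         # arbitrary administrator-chosen name
            "processor": "extract_paths", # plugin entrypoint added by this patch
            # Processor-specific fields (e.g. the path mappings used by
            # extract_paths) are omitted here for brevity.
        },
    ],
    "outputs": {
        "logs": "raw",             # original collected entries
        "processed": "processed",  # entries after processors have run
    },
}

When no outputs mapping is supplied, ConnectorConfig falls back to the same two descriptors shown here, so both raw and processed data are written by default.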
diff --git a/grove/outputs/__init__.py b/grove/outputs/__init__.py index 28da9a7..b4c9121 100644 --- a/grove/outputs/__init__.py +++ b/grove/outputs/__init__.py @@ -6,13 +6,44 @@ import abc import gzip import json -from typing import Any, Dict, List +import logging +from typing import Any, Dict, List, Optional + +from pydantic import BaseSettings, Extra, ValidationError from grove.constants import GROVE_METADATA_KEY -from grove.exceptions import DataFormatException +from grove.exceptions import ConfigurationException, DataFormatException +from grove.helpers import parsing class BaseOutput(abc.ABC): + """The basis for all Grove output handlers.""" + + class Configuration(BaseSettings, extra=Extra.allow): + """Defines the configuration directives required by all output handlers.""" + + pass + + def __init__(self): + """Implements core logic which applies to all handlers. + + This includes configuration of logging, and parsing of configuration. + """ + self.logger = logging.getLogger(__name__) + + # Wrap validation errors to keep them in the Grove exception hierarchy. + try: + self.config = self.Configuration() + except ValidationError as err: + raise ConfigurationException(parsing.validation_error(err)) + + def setup(self): + """Implements logic to setup any required clients, sockets, or connections. + + If not required for the given output handler, this may be a no-op. + """ + pass + @abc.abstractmethod def submit( self, @@ -20,6 +51,9 @@ def submit( connector: str, identity: str, operation: str, + part: int = 0, + suffix: Optional[str] = None, + descriptor: Optional[str] = None, ): """Implements logic require to write collected log data to the given backend. @@ -27,14 +61,24 @@ def submit( :param connector: Name of the connector which retrieved the data. :param identity: Identity the collected data was collect for. :param operation: Operation the collected logs are associated with. + :param part: Number indicating which part of the same log stream this file + contains data for. This is used to indicate that the logs are from the same + collection, but have been broken into smaller files for downstream + processing. + :param suffix: An optional suffix to allow propagation of file type information + or other relevant features. + :param descriptor: An optional and arbitrary descriptor associated with the + log data. This may be used by handlers for construction / specification of + file paths, URLs, or database tables. """ pass - def serialize(self, data: List[Any], metadata: Dict[str, Any]) -> bytes: - """Serialize data to a standard format (gzipped NDJSON). + def serialize(self, data: List[Any], metadata: Dict[str, Any] = {}) -> bytes: + """Implements serialization of log entries to a gzipped NDJSON. :param data: A list of log entries to serialize to JSON. - :param metadata: Metadata to append to JSON before serialisation. + :param metadata: Metadata to append to each log entry before serialization. If + not specified no metadata will be added. :return: Log data serialized as gzipped NDJSON (as bytes). @@ -46,14 +90,21 @@ def serialize(self, data: List[Any], metadata: Dict[str, Any]) -> bytes: # This is expensive but we can't just json.dumps into gzip.compress as that # will not yield NDJSON. for entry in data: - entry[GROVE_METADATA_KEY] = metadata + # Skip entry log entries. 
+ if entry is None: + continue + + if metadata: + entry[GROVE_METADATA_KEY] = { + **metadata, + **entry.get(GROVE_METADATA_KEY, {}), + } # We don't want to silently drop and lose single records, so drop the entire # batch if there is bad data (which will trigger a retry next run). try: candidate.append(json.dumps(entry, separators=(",", ":"))) except TypeError as err: - message = f"Unable to serialize to JSON: {err}" - raise DataFormatException(message) + raise DataFormatException(f"Unable to serialize to JSON: {err}") return gzip.compress(bytes("\r\n".join(candidate), "utf-8")) diff --git a/grove/outputs/aws_s3.py b/grove/outputs/aws_s3.py index 4ff740f..2384f2a 100644 --- a/grove/outputs/aws_s3.py +++ b/grove/outputs/aws_s3.py @@ -4,60 +4,69 @@ """Grove AWS S3 output handler.""" import datetime -import logging import os from typing import Optional from boto3.session import Session from botocore.exceptions import BotoCoreError, ClientError -from pydantic import BaseSettings, Field, ValidationError +from pydantic import Field from grove.constants import DATESTAMP_FORMAT -from grove.exceptions import AccessException, ConfigurationException -from grove.helpers import parsing +from grove.exceptions import AccessException from grove.outputs import BaseOutput OBJECT_KEY = ( - "logs/{connector}/{identity}/{year}/{month}/{day}/" - "{operation}-{datestamp}.{part}.json.gz" + "{descriptor}{connector}/{identity}/{year}/{month}/{day}/" + "{operation}-{datestamp}.{part}{kind}" ) -class Configuration(BaseSettings): - """Defines environment variables used to configure the AWS S3 handler. - - This should also include any appropriate default values for fields which are not - required. - """ - - bucket: str = Field( - description="The name of the S3 bucket to output logs to.", - ) - assume_role_arn: Optional[str] = Field( - description="An optional AWS role to assume when authenticating with AWS.", - default=None, - ) - bucket_region: Optional[str] = Field( - description="The region that S3 the bucket exists in (default us-east-1)", - default=os.environ.get("AWS_REGION", "us-east-1"), - ) +class Handler(BaseOutput): + """This output handler allows Grove to write collected logs to an AWS S3 bucket.""" - class Config: - """Allow environment variable override of configuration fields. + class Configuration(BaseOutput.Configuration): + """Defines environment variables used to configure the AWS S3 handler. - This also enforce a prefix for all environment variables for this handler. As - an example the field `bucket` would be set using the environment variable - `GROVE_OUTPUT_AWS_S3_BUCKET`. + This should also include any appropriate default values for fields which are not + required. 
""" - env_prefix = "GROVE_OUTPUT_AWS_S3_" - case_insensitive = True - - -class Handler(BaseOutput): - """This output handler allows Grove to write collected logs to an AWS S3 bucket.""" - - def __init__(self): + bucket: str = Field( + description="The name of the S3 bucket to output logs to.", + ) + aws_access_key_id: Optional[str] = Field( + description="An optional AWS access key to use when authenticating", + default=os.environ.get("AWS_ACCESS_KEY_ID"), + ) + aws_secret_access_key: Optional[str] = Field( + description="An optional AWS secret key to use when authenticating", + default=os.environ.get("AWS_SECRET_ACCESS_KEY"), + ) + aws_session_token: Optional[str] = Field( + description="An optional AWS session token to use when authenticating", + default=os.environ.get("AWS_SESSION_TOKEN"), + ) + assume_role_arn: Optional[str] = Field( + description="An optional AWS role to assume when authenticating with AWS.", + default=None, + ) + bucket_region: Optional[str] = Field( + description="The region that S3 the bucket exists in (default us-east-1)", + default=os.environ.get("AWS_REGION", "us-east-1"), + ) + + class Config: + """Allow environment variable override of configuration fields. + + This also enforce a prefix for all environment variables for this handler. + As an example the field `bucket` would be set using the environment variable + `GROVE_OUTPUT_AWS_S3_BUCKET`. + """ + + env_prefix = "GROVE_OUTPUT_AWS_S3_" + case_insensitive = True + + def setup(self): """Sets up access to S3. This handler also attempt to assume a configured role in order to allow @@ -66,24 +75,34 @@ def __init__(self): :raises ConfigurationException: There was an issue with configuration. :raises AccessException: An issue occurred when accessing S3. """ - self.logger = logging.getLogger(__name__) - - # Wrap validation errors to keep them in the Grove exception hierarchy. - try: - self.config = Configuration() # type: ignore - except ValidationError as err: - raise ConfigurationException(parsing.validation_error(err)) - # Explicit calls to session are mostly used to allow mocks during testing. session = Session() + # Only add in optional arguments if configured. + client_kwargs = {} + + if self.config.aws_access_key_id: + client_kwargs["aws_access_key_id"] = self.config.aws_access_key_id + client_kwargs["aws_secret_access_key"] = self.config.aws_secret_access_key + + if self.config.aws_session_token: + client_kwargs["aws_session_token"] = self.config.aws_session_token + # If a role was specified, ensure we assume it and use STS tokens to interact # with S3. try: if not self.config.assume_role_arn: - self.s3 = session.client("s3", region_name=self.config.bucket_region) + self.s3 = session.client( + "s3", + region_name=self.config.bucket_region, + **client_kwargs, + ) else: - sts = session.client("sts") + sts = session.client( + "sts", + region_name=self.config.bucket_region, + **client_kwargs, + ) role = sts.assume_role( RoleArn=self.config.assume_role_arn, RoleSessionName="GroveOutputWriter", @@ -105,6 +124,8 @@ def submit( identity: str, operation: str, part: int = 0, + kind: Optional[str] = ".json.gz", + descriptor: Optional[str] = "logs/", ): """Persists captured data to an S3 compatible object store. @@ -116,9 +137,16 @@ def submit( contains data for. This is used to indicate that the logs are from the same collection, but have been broken into smaller files for downstream processing. + :param kind: An optional file suffix to use for objects written. 
+ :param descriptor: An optional path to append to the beginning of the output + S3 key. :raises AccessException: An issue occurred when accessing S3. """ + # Append a trailing slash to the descriptor if set - to form a path. + if descriptor and not descriptor.endswith("/"): + descriptor = f"{descriptor}/" + try: datestamp = datetime.datetime.utcnow() self.s3.put_object( @@ -133,6 +161,8 @@ def submit( month=datestamp.strftime("%m"), day=datestamp.strftime("%d"), datestamp=datestamp.strftime(DATESTAMP_FORMAT), + descriptor=descriptor, + kind=kind, ), ) except ClientError as err: diff --git a/grove/outputs/local_file.py b/grove/outputs/local_file.py index 276c3ed..f6a3e7c 100644 --- a/grove/outputs/local_file.py +++ b/grove/outputs/local_file.py @@ -4,63 +4,52 @@ """Grove local file path output handler.""" import datetime -import logging import os +from typing import Optional -from pydantic import BaseSettings, Field, ValidationError +from pydantic import Field from grove.constants import DATESTAMP_FORMAT -from grove.exceptions import AccessException, ConfigurationException -from grove.helpers import parsing +from grove.exceptions import AccessException from grove.outputs import BaseOutput OBJECT_PATH = ( - "logs/{connector}/{identity}/{year}/{month}/{day}/" - "{operation}-{datestamp}.{part}.json.gz" + "{descriptor}{connector}/{identity}/{year}/{month}/{day}/" + "{operation}-{datestamp}.{part}{kind}" ) -class Configuration(BaseSettings): - """Defines environment variables used to configure the local file handler. - - This should also include any appropriate default values for fields which are not - required. - """ +class Handler(BaseOutput): + class Configuration(BaseOutput.Configuration): + """Defines environment variables used to configure the local file handler. - path: str = Field( - description="The path to the directory to write collected logs to.", - ) + This should also include any appropriate default values for fields which are not + required. + """ - class Config: - """Allow environment variable override of configuration fields. + path: str = Field( + description="The path to the directory to write collected logs to.", + ) - This also enforce a prefix for all environment variables for this handler. As - an example the field `path` would be set using the environment variable - `GROVE_OUTPUT_LOCAL_FILE_PATH`. - """ + class Config: + """Allow environment variable override of configuration fields. - env_prefix = "GROVE_OUTPUT_LOCAL_FILE_" - case_insensitive = True + This also enforce a prefix for all environment variables for this handler. + As an example the field `path` would be set using the environment variable + `GROVE_OUTPUT_LOCAL_FILE_PATH`. + """ + env_prefix = "GROVE_OUTPUT_LOCAL_FILE_" + case_insensitive = True -class Handler(BaseOutput): - def __init__(self): + def setup(self): """Set up access to local filesystem path. This also checks that an output directory is configured, and it is initially accessible and writable. - :raises ConfigurationException: There was an issue with output configuration. - :raises AccessException: There was an issue accessing to the specified file path. + :raises AccessException: There was an issue accessing to the provided file path. """ - self.logger = logging.getLogger(__name__) - - # Wrap validation errors to keep them in the Grove exception hierarchy. 
- try: - self.config = Configuration() # type: ignore - except ValidationError as err: - raise ConfigurationException(parsing.validation_error(err)) - # Perform a spot check to see if the directory is writable now. Although this # can change, we'd like to bail before we collect any data if it's a simple # permissions related misconfiguration. @@ -81,6 +70,8 @@ def submit( identity: str, operation: str, part: int = 0, + kind: Optional[str] = ".json.gz", + descriptor: Optional[str] = "logs/", ): """Persists captured data to a local file path. @@ -92,9 +83,16 @@ def submit( contains data for. This is used to indicate that the logs are from the same collection, but have been broken into smaller files for downstream processing. + :param kind: An optional file suffix to use for files written. + :param descriptor: An optional path to append to the beginning of the output + file path. :raises AccessException: An issue occurred when writing data. """ + # Append a trailing slash to the descriptor if set - to form a path. + if descriptor and not descriptor.endswith("/"): + descriptor = f"{descriptor}/" + # Each log file is output under a particular hierarchy to assist with sharding # and ingestion / finding of log data. datestamp = datetime.datetime.utcnow() @@ -108,6 +106,8 @@ def submit( month=datestamp.strftime("%m"), day=datestamp.strftime("%d"), datestamp=datestamp.strftime(DATESTAMP_FORMAT), + kind=kind, + descriptor=descriptor, ) # Quick and dirty directory traversal check. diff --git a/grove/outputs/local_stdout.py b/grove/outputs/local_stdout.py index dee0d3d..49142ba 100644 --- a/grove/outputs/local_stdout.py +++ b/grove/outputs/local_stdout.py @@ -5,7 +5,7 @@ import datetime import json -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional from grove.constants import DATESTAMP_FORMAT, GROVE_METADATA_KEY from grove.exceptions import DataFormatException @@ -20,6 +20,8 @@ def submit( identity: str, operation: str, part: int = 0, + kind: Optional[str] = "json", + descriptor: Optional[str] = "raw", ): """Print captured data to stdout. @@ -31,6 +33,9 @@ def submit( contains data for. This is used to indicate that the logs are from the same collection, but have been broken into smaller files for downstream processing. + :param kind: The format of the data being output. + :param descriptor: An arbitrary descriptor to identify the data being output. + """ datestamp = datetime.datetime.utcnow() @@ -38,6 +43,8 @@ def submit( json.dumps( { "part": part, + "kind": kind, + "descriptor": descriptor, "connector": connector, "identity": identity, "operation": operation, @@ -48,11 +55,12 @@ def submit( flush=True, ) - def serialize(self, data: List[Any], metadata: Dict[str, Any]) -> bytes: + def serialize(self, data: List[Any], metadata: Dict[str, Any] = {}) -> bytes: """Serialize data to a standard format (NDJSON). :param data: A list of log entries to serialize to JSON. - :param metadata: Metadata to append to JSON before serialisation. + :param metadata: Metadata to append to each log entry before serialization. If + not specified no metadata will be added. :return: Log data serialized as NDJSON. diff --git a/grove/processors/__init__.py b/grove/processors/__init__.py new file mode 100644 index 0000000..1309148 --- /dev/null +++ b/grove/processors/__init__.py @@ -0,0 +1,57 @@ +# Copyright (c) HashiCorp, Inc. 
+# SPDX-License-Identifier: MPL-2.0 + +"""Provides processors for collected log entries.""" + +import abc +import logging +from typing import Any, Dict, List + +from pydantic import Extra, ValidationError + +from grove.exceptions import ConfigurationException +from grove.helpers import parsing +from grove.models import ProcessorConfig + + +class BaseProcessor(abc.ABC): + """Provides an abstract base processor which all processors must inherit from.""" + + class Configuration(ProcessorConfig, extra=Extra.forbid): + """Defines the required configuration and validators for the processor.""" + + pass + + def __init__(self, config: Dict[str, Any]): + """Sets up a Grove processor. + + :param config: The configuration document for this processor, as a dict. + """ + self.logger = logging.getLogger(__name__) + + # Load and validate configuration. We perform a bit of a strange operation here + # but our caller needs to have loaded the configuration into a ProcessorConfig + # already, but we want to re-validate it here. As a result, we convert to a dict + # and back again. + try: + self.configuration = self.Configuration(**config.dict()) + except ValidationError as err: + raise ConfigurationException( + f"Processor configuration is invalid. {parsing.validation_error(err)}" + ) + + @abc.abstractmethod + def process(self, entry: Dict[str, Any]) -> List[Dict[str, Any]]: + """Performs a set of processes against a log entry. + + :param entry: A collected log entry. + + :returns: The processed log entry in a list. If only a single entry is required + the list should contain a single element. If the log entry is to be dropped, + an empty list should be used. + """ + pass + + def finalize(self): + """Performs a final set of operations after logs have been saved.""" + return diff --git a/grove/processors/extract_paths.py b/grove/processors/extract_paths.py new file mode 100644 index 0000000..726b9ec --- /dev/null +++ b/grove/processors/extract_paths.py @@ -0,0 +1,97 @@ +# Copyright (c) HashiCorp, Inc. +# SPDX-License-Identifier: MPL-2.0 + +"""Grove processor to extract and map fields using JMESPaths. + +This processor is intended to be used to transform raw log entries into a common schema. +This is especially useful for ensuring that all collected log entries from differing +upstream vendors are in a consistent format - whether industry standard, or bespoke. +""" + +import json +from typing import Any, Dict, List, Optional + +import jmespath +from pydantic import BaseModel, Extra, validator + +from grove.helpers import parsing +from grove.models import ProcessorConfig +from grove.processors import BaseProcessor + + +class Mapping(BaseModel, extra=Extra.forbid): + """Expresses the configuration fields used to specify path mapping.""" + + # Destination specifies where to write extracted or specified values into. This + # can be a nested path, with subsequent dimensions specified with dots (`.`). + destination: str + + # Sources defines a list of JMESPaths to map into the destination. If multiple + # are provided, the sources are processed in order with the first match winning. + sources: List[str] = [] + + # Static allows a static field to be written into the destination, rather than + # extraction from the source. This field is incompatible with sources. 
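+    # For example, a mapping with a destination of "'ecs.version'" and a static value
+    # of "8.8" would write the literal string "8.8" into the `ecs.version` field of
+    # every processed entry, rather than extracting a value from the source log entry.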
+ static: Optional[str] + + @validator("static") + def static_or_sources(cls, value, values): + """Ensures that either sources or static is set, not both.""" + if value and len(values.get("sources")) > 0: + raise ValueError("Either sources or static should be set, not both.") + + return value + + +class Handler(BaseProcessor): + """Extract and map fields using JMESPaths.""" + + class Configuration(ProcessorConfig, extra=Extra.forbid): + """Expresses the configuration and associated validators for the processor.""" + + # Remap the original event as a string under the provided path. If not set, any + # field not mapped will be dropped. + raw: Optional[str] + + # Defines the field mapping. + fields: List[Mapping] + + def process(self, entry: Dict[str, Any]) -> List[Dict[str, Any]]: + """Attempt to extract and map fields from the log entry. + + :param entry: A collected log entry. + + :return: The processed log entry with fields mapped, as a list. + """ + result: Dict[str, Any] = {} + + # Map the entire log entry under the given path - if configured. + if self.configuration.raw: + result = parsing.update_path( + result, + parsing.quote_aware_split(self.configuration.raw), + json.dumps(entry, separators=(",", ":")), + ) + + for field in self.configuration.fields: + value = field.static + destination = parsing.quote_aware_split(field.destination) + + # If a static value is defined it should be used over any source fields. + if not value: + # Mappings may contain multiple sources to attempt to map. These are + # evaluated from the first entry to the last, with the first match + # winning. + for source in field.sources: + value = jmespath.search(source, entry) + if value: + break + + # Combine the extracted value with the data nested under the same path - or + # create the path if not present. + result = parsing.update_path(result, destination, value) + + # Return the newly processed entry. A list is always used, even if only a single + # element is returned, to allow support for dropping log entries, or splitting a + # single log entry into multiple. + return [result] diff --git a/grove/processors/filter_paths.py b/grove/processors/filter_paths.py new file mode 100644 index 0000000..afaa06f --- /dev/null +++ b/grove/processors/filter_paths.py @@ -0,0 +1,44 @@ +# Copyright (c) HashiCorp, Inc. +# SPDX-License-Identifier: MPL-2.0 + +"""Grove processor to filter (delete) fields from log entries based on provided paths. + +This processor is intended to allow removal of superfluous or duplicated data from +log entries. This may be used after a processing stage to remove the original source +data, or used to prune down a log entry from a particularly verbose vendor. +""" + +from typing import Any, Dict, List + +from pydantic import Extra + +from grove.helpers import parsing +from grove.models import ProcessorConfig +from grove.processors import BaseProcessor + + +class Handler(BaseProcessor): + """Filter (delete) fields from log entries based on provided paths.""" + + class Configuration(ProcessorConfig, extra=Extra.forbid): + """Expresses the configuration and associated validators for the processor.""" + + # Source defines a list of paths to field to drop (delete). These should be + # defined as a JMESPaths. + sources: List[str] + + def process(self, entry: Dict[str, Any]) -> List[Dict[str, Any]]: + """Attempt to drop a configured field from the log entry. + + :param entry: A collected log entry. + + :return: The processed log entry, with fields dropped. 
+        """
+        for source in self.configuration.sources:
+            entry = parsing.update_path(
+                entry,
+                parsing.quote_aware_split(source),
+                None,
+            )
+
+        return [entry]
diff --git a/grove/processors/split_path.py b/grove/processors/split_path.py
new file mode 100644
index 0000000..ad8fd4c
--- /dev/null
+++ b/grove/processors/split_path.py
@@ -0,0 +1,55 @@
+# Copyright (c) HashiCorp, Inc.
+# SPDX-License-Identifier: MPL-2.0
+
+"""Grove processor to split a log entry into N log entries by the specified JMESPath.
+
+This processor is intended to allow "fanning-out" a single log entry which contains
+several related operations into distinct log entries per item. The remainder of the
+log entry outside of the split path will not be modified.
+"""
+
+from typing import Any, Dict, List
+
+import jmespath
+from pydantic import Extra
+
+from grove.helpers import parsing
+from grove.models import ProcessorConfig
+from grove.processors import BaseProcessor
+
+
+class Handler(BaseProcessor):
+    """Split a log entry into N log entries by the specified JMESPath."""
+
+    class Configuration(ProcessorConfig, extra=Extra.forbid):
+        """Expresses the configuration and associated validators for the processor."""
+
+        # Source defines the path to split the log entry by. This should be defined as a
+        # JMESPath. The field referenced by this path should be a list.
+        source: str
+
+    def process(self, entry: Dict[str, Any]) -> List[Dict[str, Any]]:
+        """Attempt to split the log entry by the configured path.
+
+        :param entry: A collected log entry.
+
+        :return: The split log entries, as a list.
+        """
+        # In this instance we WANT to mutate the copy outside of the processor.
+        processed = []
+        children = jmespath.search(self.configuration.source, entry)
+
+        if not children or len(children) <= 1:
+            return [entry]
+
+        for child in children:
+            processed.append(
+                parsing.update_path(
+                    parsing.quick_copy(entry),
+                    parsing.quote_aware_split(self.configuration.source),
+                    [child],
+                    replace=True,
+                )
+            )
+
+        return processed
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..d527125
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,183 @@
+[build-system]
+requires = ["setuptools", "setuptools-scm"]
+build-backend = "setuptools.build_meta"
+
+[project]
+name = "grove"
+readme = "README.md"
+description = "A Software as a Service (SaaS) log collection framework."
+requires-python = ">=3.9" +dynamic = ["version"] +authors = [{name = "HashiCorp Security (TDR)"}] +license = {text = "MPL-2.0"} +classifiers = [ + "Programming Language :: Python :: 3.9", + "Natural Language :: English", +] +dependencies = [ + "urllib3<2.0", + "aws-lambda-powertools>=2.0,<3.0", + "boto3>=1.26,<2.0", + "requests>=2.28,<3.0", + "google-api-python-client>=2.68,<3.0", + "simple-salesforce>=1.12,<2.0", + "twilio>=7.15,<8.0", + "pydantic>=1.10,<2.0", + "jmespath>=1.0.0,<2.0", +] + +[project.optional-dependencies] +tests = [ + "black", + "coverage", + "ruff", + "types-setuptools", + "isort", + "mypy", + "pip-tools", + "mock", + "pytest", + "pytest-cov", + "responses", + "tox", + "sphinx", + "furo", + "moto[s3,ssm]", + "types-requests", +] + +[tool.setuptools.packages.find] +include = ["grove*"] + +[tool.setuptools.dynamic] +version = {attr = "grove.__about__.__version__"} + +[project.scripts] +grove = "grove.entrypoints.local_process:entrypoint" + +[project.entry-points."grove.entrypoints"] +aws_lambda = "grove.entrypoints.aws_lambda:entrypoint" +local_process = "grove.entrypoints.local_process:entrypoint" + +[project.entry-points."grove.connectors"] +atlassian_audit_events = "grove.connectors.atlassian.audit_events:Connector" +github_audit_log = "grove.connectors.github.audit_log:Connector" +gsuite_activities = "grove.connectors.gsuite.activities:Connector" +local_heartbeat = "grove.connectors.local.heartbeat:Connector" +gsuite_alerts = "grove.connectors.gsuite.alerts:Connector" +okta_system_log = "grove.connectors.okta.system_log:Connector" +onepassword_events_itemusages = "grove.connectors.onepassword.events_itemusages:Connector" +onepassword_events_signinattempts = "grove.connectors.onepassword.events_signinattempts:Connector" +onepassword_events_audit = "grove.connectors.onepassword.events_audit:Connector" +pagerduty_audit_records = "grove.connectors.pagerduty.audit_records:Connector" +sf_event_log = "grove.connectors.sf.event_log:Connector" +sfmc_audit_events = "grove.connectors.sfmc.audit_events:Connector" +sfmc_security_events = "grove.connectors.sfmc.security_events:Connector" +slack_audit_logs = "grove.connectors.slack.audit_logs:Connector" +tfc_audit_trails = "grove.connectors.tfc.audit_trails:Connector" +torq_activity_logs = "grove.connectors.torq.activity_logs:Connector" +torq_audit_logs = "grove.connectors.torq.audit_logs:Connector" +twilio_monitor_events = "grove.connectors.twilio.monitor_events:Connector" +twilio_messages = "grove.connectors.twilio.messages:Connector" +workday_activity_logging = "grove.connectors.workday.activity_logging:Connector" +zoom_activities = "grove.connectors.zoom.activities:Connector" +zoom_operationlogs = "grove.connectors.zoom.operationlogs:Connector" +oomnitza_activities = "grove.connectors.oomnitza.activities:Connector" + +[project.entry-points."grove.caches"] +aws_dynamodb = "grove.caches.aws_dynamodb:Handler" +local_memory = "grove.caches.local_memory:Handler" + +[project.entry-points."grove.outputs"] +aws_s3 = "grove.outputs.aws_s3:Handler" +local_file = "grove.outputs.local_file:Handler" +local_stdout = "grove.outputs.local_stdout:Handler" + +[project.entry-points."grove.configs"] +aws_ssm = "grove.configs.aws_ssm:Handler" +local_file = "grove.configs.local_file:Handler" + +[project.entry-points."grove.secrets"] +aws_ssm = "grove.secrets.aws_ssm:Handler" +hashicorp_vault = "grove.secrets.hashicorp_vault:Handler" + +[project.entry-points."grove.processors"] +extract_paths = "grove.processors.extract_paths:Handler" +filter_paths = 
"grove.processors.filter_paths:Handler" +split_path = "grove.processors.split_path:Handler" + +[tool.mypy] +files = [ + "./grove/**/*.py", + "./tests/**/*.py" +] +disable_error_code = "attr-defined" +allow_redefinition = false +check_untyped_defs = true +disallow_any_generics = true +disallow_untyped_calls = false +ignore_errors = false +ignore_missing_imports = true +implicit_reexport = false +local_partial_types = true +strict_optional = true +strict_equality = true +no_implicit_optional = true +warn_no_return = true +warn_unused_ignores = true +warn_redundant_casts = true +warn_unused_configs = true +warn_unreachable = true + +[tool.isort] +multi_line_output = 3 +profile = "black" + +[tool.pytest.ini_options] +junit_family = "xunit2" +norecursedirs = ".*" +self-contained-html = true +testpaths = [ + "tests" +] +addopts = """ + --strict + --tb=auto + --cov=grove + --cov-report=term-missing:skip-covered + --cov-branch + -p no:doctest + -p no:warnings + -s +""" + +[tool.tox] +legacy_tox_ini = """ + [tox] + envlist = linters,py3 + + [testenv] + pip_version = pip + extras = tests + commands = pytest -c pyproject.toml + srcs = grove + + [testenv:linters] + basepython = python3 + usedevelop = true + commands = + {[testenv:ruff]commands} + {[testenv:mypy]commands} + + [testenv:ruff] + basepython = python3 + skip_install = true + commands = + ruff check {[testenv]srcs} + + [testenv:mypy] + basepython3 = python3 + skip_install = true + commands = + - mypy --config-file pyproject.toml {[testenv]srcs} +""" diff --git a/setup.cfg b/setup.cfg deleted file mode 100644 index 91be562..0000000 --- a/setup.cfg +++ /dev/null @@ -1,160 +0,0 @@ -[metadata] -name = grove -description = A Software as a Service (SaaS) log collection framework. -versioning = build-id -classifiers = - Programming Language :: Python :: 3.9 - Natural Language :: English - -[options] -python_requires = >= 3.9 -install_requires = - urllib3<2.0 - aws-lambda-powertools>=2.0,<3.0 - boto3>=1.26,<2.0 - requests>=2.28,<3.0 - google-api-python-client>=2.68,<3.0 - simple-salesforce>=1.12,<2.0 - twilio>=7.15,<8.0 - pydantic>=1.10,<2.0 - -[options.extras_require] -tests = - black - coverage - flake8 - flake8-black - flake8-blind-except - flake8-bugbear - flake8-builtins - flake8-comprehensions - flake8-docstrings - flake8-isort - flake8_tuple - types-bleach - types-requests - types-setuptools - isort - mypy - pip-tools - mock - moto[ssm,s3] - pytest - pytest-cov - responses - tox - sphinx - furo - -; flake8 for linting. -[flake8] -max-complexity = 10 -import-order-style = edited -application-import-names = grove -max-line-length = 88 -select = B,C,D,E,F,P,T4,W,B9 -exclude = - *.egg-info, - *.pyc, - .cache, - .coverage.*, - .gradle, - .tox, - build, - dist, - htmlcov.* -ignore = - # Exception chaining is automatic inside of except blocks. - B904, - # Don't prefer !r / !s in string interpolation. 
- B028, B907, - # See https://github.com/PyCQA/pycodestyle/issues/373 - E203, - # Ignore too many leading '#' for block comment - E266, - # Ignore Line too long (82 > 79) in favor of bugbear - E501, - # Ignore Line break before binary operator (not PEP8) - W503, - # Ignore 1 blank line required before/after class docstring and summary - D203,D204,D205, - # Ignore multi-line docstring summary should start at the first line - D212 - # Ignore First line should end with a period - D400 - # Ignore First line should be in imperative mood - D401 - # Ignore missing module, class, public method, function, - # public package, magic method, and __init__ docstrings. - # ...probably want to enable these at some point... - D100,D101,D102,D103,D104,D105,D107 - -; mypy for type checking. -[mypy] -files = ./grove/**/*.py,./tests/**/*.py -allow_redefinition = False -check_untyped_defs = True -disallow_any_generics = True -disallow_untyped_calls = False -ignore_errors = False -ignore_missing_imports = True -implicit_reexport = False -local_partial_types = True -strict_optional = True -strict_equality = True -no_implicit_optional = True -warn_no_return = True -warn_unused_ignores = True -warn_redundant_casts = True -warn_unused_configs = True -warn_unreachable = True - -; isort for import sorting. -[tool:isort] -multi_line_output = 3 -profile = black - -; pytest for Testing. -[tool:pytest] -junit_family = xunit2 -norecursedirs =.* -self-contained-html = true -testpaths = tests -addopts = - --strict - --tb=auto - --cov=grove - --cov-report=term-missing:skip-covered - --cov-branch - -p no:doctest - -p no:warnings - -s - -; Tox for linter and test execution. -[tox:tox] -envlist = linters,py3 - -[testenv] -pip_version = pip -extras = tests -commands = pytest -c setup.cfg -srcs = setup.py grove - -[testenv:linters] -basepython = python3 -usedevelop = true -commands = - {[testenv:flake8]commands} - {[testenv:mypy]commands} - -[testenv:flake8] -basepython = python3 -skip_install = true -commands = - flake8 --config setup.cfg {[testenv]srcs} - -[testenv:mypy] -basepython3 = python3 -skip_install = true -commands = - - mypy --config-file setup.cfg {[testenv]srcs} diff --git a/setup.py b/setup.py deleted file mode 100644 index 1e6d794..0000000 --- a/setup.py +++ /dev/null @@ -1,77 +0,0 @@ -# Copyright (c) HashiCorp, Inc. -# SPDX-License-Identifier: MPL-2.0 - -"""Minimal setup for grove.""" -import os - -from setuptools import find_packages, setup - -# These will be overwritten by the values from __about__.py -__version__ = "0.0.0" -__author__ = "Not Defined" - -path = os.path.dirname(os.path.abspath(__file__)) -exec(open(os.path.join(path, "grove/__about__.py")).read()) # noqa: S102 - -# Load the long description for PyPi. 
-long_description = open(os.path.join(path, "README.md")).read() - -setup( - name="grove", - version=__version__, - author=__author__, - packages=find_packages(include=["grove", "grove.*"]), - long_description=long_description, - long_description_content_type="text/markdown", - entry_points={ - "console_scripts": [ - "grove = grove.entrypoints.local_process:entrypoint", - ], - "grove.entrypoints": [ - "aws_lambda = grove.entrypoints.aws_lambda:entrypoint", - "local_process = grove.entrypoints.local_process:entrypoint", - ], - "grove.connectors": [ - "atlassian_audit_events = grove.connectors.atlassian.audit_events:Connector", - "github_audit_log = grove.connectors.github.audit_log:Connector", - "gsuite_activities = grove.connectors.gsuite.activities:Connector", - "local_heartbeat = grove.connectors.local.heartbeat:Connector", - "gsuite_alerts = grove.connectors.gsuite.alerts:Connector", - "okta_system_log = grove.connectors.okta.system_log:Connector", - "onepassword_events_itemusages = grove.connectors.onepassword.events_itemusages:Connector", # noqa: B950 - "onepassword_events_signinattempts = grove.connectors.onepassword.events_signinattempts:Connector", # noqa: B950 - "onepassword_events_audit = grove.connectors.onepassword.events_audit:Connector", # noqa: B950 - "pagerduty_audit_records = grove.connectors.pagerduty.audit_records:Connector", - "sf_event_log = grove.connectors.sf.event_log:Connector", - "sfmc_audit_events = grove.connectors.sfmc.audit_events:Connector", - "sfmc_security_events = grove.connectors.sfmc.security_events:Connector", - "slack_audit_logs = grove.connectors.slack.audit_logs:Connector", - "tfc_audit_trails = grove.connectors.tfc.audit_trails:Connector", - "torq_activity_logs = grove.connectors.torq.activity_logs:Connector", - "torq_audit_logs = grove.connectors.torq.audit_logs:Connector", - "twilio_monitor_events = grove.connectors.twilio.monitor_events:Connector", - "twilio_messages = grove.connectors.twilio.messages:Connector", - "workday_activity_logging = grove.connectors.workday.activity_logging:Connector", - "zoom_activities = grove.connectors.zoom.activities:Connector", - "zoom_operationlogs = grove.connectors.zoom.operationlogs:Connector", - "oomnitza_activities = grove.connectors.oomnitza.activities:Connector", - ], - "grove.caches": [ - "aws_dynamodb = grove.caches.aws_dynamodb:Handler", - "local_memory = grove.caches.local_memory:Handler", - ], - "grove.outputs": [ - "aws_s3 = grove.outputs.aws_s3:Handler", - "local_file = grove.outputs.local_file:Handler", - "local_stdout = grove.outputs.local_stdout:Handler", - ], - "grove.configs": [ - "aws_ssm = grove.configs.aws_ssm:Handler", - "local_file = grove.configs.local_file:Handler", - ], - "grove.secrets": [ - "aws_ssm = grove.secrets.aws_ssm:Handler", - "hashicorp_vault = grove.secrets.hashicorp_vault:Handler", - ], - }, -) diff --git a/templates/code/{{ cookiecutter.project_name }}/{{ cookiecutter.project_slug }}/example_logs.py b/templates/code/{{ cookiecutter.project_name }}/{{ cookiecutter.project_slug }}/example_logs.py index f7675b4..6ab2f35 100644 --- a/templates/code/{{ cookiecutter.project_name }}/{{ cookiecutter.project_slug }}/example_logs.py +++ b/templates/code/{{ cookiecutter.project_name }}/{{ cookiecutter.project_slug }}/example_logs.py @@ -20,7 +20,7 @@ def optional_setting(self): :return: The "optional_setting" component of the connector configuration. 
""" try: - return self.configuration.optional_setting # type: ignore + return self.configuration.optional_setting except AttributeError: return "Some Default value" @@ -50,6 +50,6 @@ def collect(self): self.save(log.entries) # Break out of loop when there are no more pages. - cursor = log.cursor + cursor = log.cursor # type: ignore if cursor is None: break diff --git a/templates/deployment/local-quick-start/connectors/local_heartbeat.json b/templates/deployment/local-quick-start/connectors/local_heartbeat.json index 0a549aa..b09983b 100644 --- a/templates/deployment/local-quick-start/connectors/local_heartbeat.json +++ b/templates/deployment/local-quick-start/connectors/local_heartbeat.json @@ -4,5 +4,8 @@ "connector": "local_heartbeat", "key": "", "interval": 5, - "count": 5 + "count": 5, + "outputs": { + "logs": "raw" + } } \ No newline at end of file diff --git a/tests/fixtures/gsuite/activities/001.json b/tests/fixtures/gsuite/activities/001.json index f9302d9..abb25d2 100644 --- a/tests/fixtures/gsuite/activities/001.json +++ b/tests/fixtures/gsuite/activities/001.json @@ -19,11 +19,24 @@ "name": "ADD_GROUP_MEMBER", "parameters": [{ "name": "USER_EMAIL", - "value": "example@hashicorp.com" + "value": "added@hashicorp.com" }, { "name": "GROUP_EMAIL", - "value": "example@hashicorp.com" + "value": "added@hashicorp.com" + } + ] + }, + { + "type": "GROUP_SETTINGS", + "name": "REMOVE_GROUP_MEMBER", + "parameters": [{ + "name": "USER_EMAIL", + "value": "replaced@hashicorp.com" + }, + { + "name": "GROUP_EMAIL", + "value": "replaced@hashicorp.com" } ] }] diff --git a/tests/mocks/__init__.py b/tests/mocks/__init__.py index d355fa9..21e44d7 100644 --- a/tests/mocks/__init__.py +++ b/tests/mocks/__init__.py @@ -7,13 +7,17 @@ from grove.caches import local_memory from grove.constants import PLUGIN_GROUP_CACHE, PLUGIN_GROUP_OUTPUT +from grove.helpers import plugin from tests.mocks import output # noqa: F401 -def load_handler(_: str, group: str) -> Any: +def load_handler(name: str, group: str, *args, **kwargs) -> Any: """Wraps handler loading to load predefined mocks for a given group.""" if group == PLUGIN_GROUP_OUTPUT: return output.TestHandler() if group == PLUGIN_GROUP_CACHE: return local_memory.Handler() + + cls = plugin.lookup_handler(name, group).load() + return cls(*args, **kwargs) diff --git a/tests/mocks/output.py b/tests/mocks/output.py index fb57fdd..7cb96b2 100644 --- a/tests/mocks/output.py +++ b/tests/mocks/output.py @@ -7,6 +7,7 @@ class TestHandler(BaseOutput): - def submit(self, *arg, **kwargs): + __test__ = False + + def submit(self, *args, **kwargs): """Does nothing, successfully.""" - return diff --git a/tests/test_connectors_atlassian_audit_events.py b/tests/test_connectors_atlassian_audit_events.py index 3b9a9b4..d20bc0a 100644 --- a/tests/test_connectors_atlassian_audit_events.py +++ b/tests/test_connectors_atlassian_audit_events.py @@ -69,7 +69,7 @@ def test_collect_pagination(self): ) self.connector.run() - self.assertEqual(self.connector._saved, 31) + self.assertEqual(self.connector._saved["logs"], 31) self.assertEqual(self.connector.pointer, "2022-05-12T19:13:13Z") @responses.activate @@ -90,5 +90,5 @@ def test_collect_no_pagination(self): ) self.connector.run() - self.assertEqual(self.connector._saved, 1) + self.assertEqual(self.connector._saved["logs"], 1) self.assertEqual(self.connector.pointer, "2022-05-12T19:13:13Z") diff --git a/tests/test_connectors_deduplicate.py b/tests/test_connectors_deduplicate.py index ca9aafd..ed5103a 100644 --- 
a/tests/test_connectors_deduplicate.py +++ b/tests/test_connectors_deduplicate.py @@ -61,7 +61,7 @@ def test_deduplication_chronological(self): # Run a full collection first and ensure all is as we expect. first_collection.run() - self.assertEqual(first_collection._saved, 7) + self.assertEqual(first_collection._saved["logs"], 7) self.assertEqual(first_collection.pointer, "7") # Perform a collection with the latest pointer, and ensure no new records are @@ -71,7 +71,7 @@ def test_deduplication_chronological(self): # In-memory cache does not support locking as it's only intended for local # "one-shot" execution, and development use. As a result, we have to currently - # clone the state of one cache to the other to simulate this. + # alias one to the other to simulate this. # # TODO: Remove the need for this, as it's going to cause confusion in future. second_collection._cache._data = first_collection._cache._data @@ -81,7 +81,7 @@ def test_deduplication_chronological(self): ) second_collection.run() - self.assertEqual(second_collection._saved, 0) + self.assertEqual(second_collection._saved["logs"], 0) self.assertEqual(second_collection.pointer, "7") @responses.activate @@ -101,6 +101,8 @@ def test_deduplication_reverse_chronological(self): # Hot patch the connector to work in reverse chronological order. first_collection = Connector(config=config, context=context) + + # This is very naughty. first_collection.LOG_ORDER = constants.REVERSE_CHRONOLOGICAL # Load all simulated responses in order. @@ -113,7 +115,7 @@ def test_deduplication_reverse_chronological(self): # Run a full collection first and ensure all is as we expect. first_collection.run() - self.assertEqual(first_collection._saved, 7) + self.assertEqual(first_collection._saved["logs"], 7) self.assertEqual(first_collection.pointer, "7") # Perform a collection with the latest pointer, and ensure that only new records @@ -122,7 +124,7 @@ def test_deduplication_reverse_chronological(self): # In-memory cache does not support locking as it's only intended for local # "one-shot" execution, and development use. As a result, we have to currently - # clone the state of one cache to the other to simulate this. + # alias the state of one cache to the other to simulate this. # # TODO: Remove the need for this, as it's going to cause confusion in future. second_collection._cache._data = first_collection._cache._data @@ -132,5 +134,5 @@ def test_deduplication_reverse_chronological(self): ) second_collection.run() - self.assertEqual(second_collection._saved, 1) + self.assertEqual(second_collection._saved["logs"], 1) self.assertEqual(second_collection.pointer, "7") diff --git a/tests/test_connectors_github_audit.py b/tests/test_connectors_github_audit.py index 78a49ea..393bd32 100644 --- a/tests/test_connectors_github_audit.py +++ b/tests/test_connectors_github_audit.py @@ -89,5 +89,5 @@ def test_collect_no_pagination(self): # Ensure the correct number of value are returned, and the pointer properly set. 
self.connector.run() - self.assertEqual(self.connector._saved, 2) + self.assertEqual(self.connector._saved["logs"], 2) self.assertEqual(self.connector.pointer, "1625045793361") diff --git a/tests/test_connectors_gsuite_activities.py b/tests/test_connectors_gsuite_activities.py index a23f139..8e838a7 100644 --- a/tests/test_connectors_gsuite_activities.py +++ b/tests/test_connectors_gsuite_activities.py @@ -65,5 +65,5 @@ def test_collect_pagination(self, mock_transport, mock_request, mock_auth): ) self.connector.run() - self.assertEqual(self.connector._saved, 2) + self.assertEqual(self.connector._saved["logs"], 2) self.assertEqual(self.connector.pointer, "2021-10-27T23:59:31.657Z") diff --git a/tests/test_connectors_gsuite_alerts.py b/tests/test_connectors_gsuite_alerts.py index e8a7ff5..7f5e8c1 100644 --- a/tests/test_connectors_gsuite_alerts.py +++ b/tests/test_connectors_gsuite_alerts.py @@ -63,5 +63,5 @@ def test_collect_pagination(self, mock_transport, mock_request, mock_auth): ], ) self.connector.run() - self.assertEqual(self.connector._saved, 2) + self.assertEqual(self.connector._saved["logs"], 2) self.assertEqual(self.connector.pointer, "2021-04-03T14:05:39.950458Z") diff --git a/tests/test_connectors_okta_system_log.py b/tests/test_connectors_okta_system_log.py index 33b2936..85b397d 100644 --- a/tests/test_connectors_okta_system_log.py +++ b/tests/test_connectors_okta_system_log.py @@ -68,7 +68,7 @@ def test_collect_no_pagination(self): ) # Ensure only a single value is returned, and the pointer is properly set. self.connector.run() - self.assertEqual(self.connector._saved, 1) + self.assertEqual(self.connector._saved["logs"], 1) self.assertEqual(self.connector.pointer, "2021-06-24T00:04:08.123Z") @responses.activate diff --git a/tests/test_connectors_onepassword_events_audit.py b/tests/test_connectors_onepassword_events_audit.py index c7ddb85..ce72b03 100644 --- a/tests/test_connectors_onepassword_events_audit.py +++ b/tests/test_connectors_onepassword_events_audit.py @@ -9,6 +9,7 @@ from unittest.mock import patch import responses + from grove.connectors.onepassword.events_audit import Connector from grove.models import ConnectorConfig from tests import mocks @@ -89,7 +90,7 @@ def test_collect_no_pagination(self): # Ensure only a single value is returned, and the pointer is properly set. self.connector.run() - self.assertEqual(self.connector._saved, 3) + self.assertEqual(self.connector._saved["logs"], 3) self.assertEqual(self.connector.pointer, "2023-03-15T16:50:50-03:00") @responses.activate @@ -129,5 +130,5 @@ def test_collect_pagination(self): # Ensure only a single value is returned, and the pointer is properly set. self.connector.run() - self.assertEqual(self.connector._saved, 1) + self.assertEqual(self.connector._saved["logs"], 1) self.assertEqual(self.connector.pointer, "2023-03-15T16:33:50-03:00") diff --git a/tests/test_connectors_onepassword_events_itemusages.py b/tests/test_connectors_onepassword_events_itemusages.py index 12e2df5..8d5cb2b 100644 --- a/tests/test_connectors_onepassword_events_itemusages.py +++ b/tests/test_connectors_onepassword_events_itemusages.py @@ -90,7 +90,7 @@ def test_collect_no_pagination(self): # Ensure only a single value is returned, and the pointer is properly set. 
self.connector.run() - self.assertEqual(self.connector._saved, 3) + self.assertEqual(self.connector._saved["logs"], 3) self.assertEqual(self.connector.pointer, "2020-06-11T16:42:55-03:00") @responses.activate @@ -130,5 +130,5 @@ def test_collect_pagination(self): # Ensure only a single value is returned, and the pointer is properly set. self.connector.run() - self.assertEqual(self.connector._saved, 2) + self.assertEqual(self.connector._saved["logs"], 2) self.assertEqual(self.connector.pointer, "2020-06-11T16:52:55-03:00") diff --git a/tests/test_connectors_onepassword_events_signinattempts.py b/tests/test_connectors_onepassword_events_signinattempts.py index 8465e6c..5bc0ff3 100644 --- a/tests/test_connectors_onepassword_events_signinattempts.py +++ b/tests/test_connectors_onepassword_events_signinattempts.py @@ -90,7 +90,7 @@ def test_collect_no_pagination(self): # Ensure only a single value is returned, and the pointer is properly set. self.connector.run() - self.assertEqual(self.connector._saved, 2) + self.assertEqual(self.connector._saved["logs"], 2) self.assertEqual(self.connector.pointer, "2021-03-01T16:42:50-03:00") @responses.activate @@ -130,5 +130,5 @@ def test_collect_pagination(self): # Ensure only a single value is returned, and the pointer is properly set. self.connector.run() - self.assertEqual(self.connector._saved, 2) + self.assertEqual(self.connector._saved["logs"], 2) self.assertEqual(self.connector.pointer, "2021-03-01T16:42:50-03:00") diff --git a/tests/test_connectors_oomnitza_activities.py b/tests/test_connectors_oomnitza_activities.py index c20afd2..a11130c 100644 --- a/tests/test_connectors_oomnitza_activities.py +++ b/tests/test_connectors_oomnitza_activities.py @@ -9,6 +9,7 @@ from unittest.mock import patch import responses + from grove.connectors.oomnitza.activities import Connector from grove.models import ConnectorConfig from tests import mocks @@ -69,7 +70,7 @@ def test_collect_pagination(self): # Check the pointer matches the latest execution_time value, and that the # expected number of logs were returned. self.connector.run() - self.assertEqual(self.connector._saved, 205) + self.assertEqual(self.connector._saved["logs"], 205) self.assertEqual(self.connector.pointer, "1682538024") @responses.activate @@ -91,7 +92,7 @@ def test_collect_no_pagination(self): # Ensure only a single value is returned, and the pointer is properly set. self.connector.run() - self.assertEqual(self.connector._saved, 5) + self.assertEqual(self.connector._saved["logs"], 5) self.assertEqual(self.connector.pointer, "1680895957") @responses.activate @@ -111,4 +112,4 @@ def test_collect_no_results(self): ), ) self.connector.run() - self.assertEqual(self.connector._saved, 0) + self.assertEqual(self.connector._saved["logs"], 0) diff --git a/tests/test_connectors_pagerduty_audit_records.py b/tests/test_connectors_pagerduty_audit_records.py index 92f5502..fde60a4 100644 --- a/tests/test_connectors_pagerduty_audit_records.py +++ b/tests/test_connectors_pagerduty_audit_records.py @@ -104,7 +104,7 @@ def test_collect_pagination(self): # Check the pointer matches the latest execution_time value, and that the # expected number of logs were returned. self.connector.run() - self.assertEqual(self.connector._saved, 5) + self.assertEqual(self.connector._saved["logs"], 5) self.assertEqual(self.connector.pointer, "2021-09-08T18:03:32.120Z") @responses.activate @@ -126,5 +126,5 @@ def test_collect_no_pagination(self): # Set the chunk size large enough that no chunking is required. 
self.connector.run() - self.assertEqual(self.connector._saved, 4) + self.assertEqual(self.connector._saved["logs"], 4) self.assertEqual(self.connector.pointer, "2021-09-08T18:05:45.120Z") diff --git a/tests/test_connectors_sf_event_log.py b/tests/test_connectors_sf_event_log.py index e3f6dc2..599be74 100644 --- a/tests/test_connectors_sf_event_log.py +++ b/tests/test_connectors_sf_event_log.py @@ -115,5 +115,5 @@ def test_collect_no_pagination(self): # Check the pointer matches the latest value, and that the expected number of # logs were returned. self.connector.collect() - self.assertEqual(self.connector._saved, 2) + self.assertEqual(self.connector._saved["logs"], 2) self.assertEqual(self.connector.pointer, "2038-01-19T03:00:00.000Z") diff --git a/tests/test_connectors_sfmc_audit_events.py b/tests/test_connectors_sfmc_audit_events.py index 2b60f38..30344f3 100644 --- a/tests/test_connectors_sfmc_audit_events.py +++ b/tests/test_connectors_sfmc_audit_events.py @@ -69,7 +69,7 @@ def test_collect_pagination(self): # Check the pointer matches the latest value, and that the expected number of # logs were returned. self.connector.collect() - self.assertEqual(self.connector._saved, 2) + self.assertEqual(self.connector._saved["logs"], 2) self.assertEqual(self.connector.pointer, "2019-01-02T12:00:00.00") @responses.activate @@ -89,5 +89,5 @@ def test_collect_no_pagination(self): ) self.connector.collect() - self.assertEqual(self.connector._saved, 7) + self.assertEqual(self.connector._saved["logs"], 7) self.assertEqual(self.connector.pointer, "2019-01-07T12:00:00.00") diff --git a/tests/test_connectors_sfmc_security_events.py b/tests/test_connectors_sfmc_security_events.py index 73ea042..f2e3371 100644 --- a/tests/test_connectors_sfmc_security_events.py +++ b/tests/test_connectors_sfmc_security_events.py @@ -69,7 +69,7 @@ def test_collect_pagination(self): ) self.connector.run() - self.assertEqual(self.connector._saved, 2) + self.assertEqual(self.connector._saved["logs"], 2) self.assertEqual(self.connector.pointer, "2019-01-02T12:00:00.00") @responses.activate @@ -90,5 +90,5 @@ def test_collect_no_pagination(self): ) self.connector.run() - self.assertEqual(self.connector._saved, 7) + self.assertEqual(self.connector._saved["logs"], 7) self.assertEqual(self.connector.pointer, "2019-01-07T12:00:00.00") diff --git a/tests/test_connectors_slack_audit.py b/tests/test_connectors_slack_audit.py index a36fe14..45ee8b5 100644 --- a/tests/test_connectors_slack_audit.py +++ b/tests/test_connectors_slack_audit.py @@ -102,7 +102,7 @@ def test_collect_pagination(self): ) self.connector.run() - self.assertEqual(self.connector._saved, 2) + self.assertEqual(self.connector._saved["logs"], 2) self.assertEqual(self.connector.pointer, "1521214344") @responses.activate @@ -122,5 +122,5 @@ def test_collect_no_pagination(self): ) self.connector.run() - self.assertEqual(self.connector._saved, 19) + self.assertEqual(self.connector._saved["logs"], 19) self.assertEqual(self.connector.pointer, "1521214944") diff --git a/tests/test_connectors_tfc_audit_trails.py b/tests/test_connectors_tfc_audit_trails.py index 0d788d5..733f91c 100644 --- a/tests/test_connectors_tfc_audit_trails.py +++ b/tests/test_connectors_tfc_audit_trails.py @@ -99,7 +99,7 @@ def test_collect_pagination(self): ) self.connector.run() - self.assertEqual(self.connector._saved, 3) + self.assertEqual(self.connector._saved["logs"], 3) self.assertEqual(self.connector.pointer, "2020-06-30T17:52:46.000Z") @responses.activate @@ -119,5 +119,5 @@ def 
test_collect_no_pagination(self): ) self.connector.run() - self.assertEqual(self.connector._saved, 5) + self.assertEqual(self.connector._saved["logs"], 5) self.assertEqual(self.connector.pointer, "2020-06-30T17:52:46.000Z") diff --git a/tests/test_connectors_torq_activity_logs.py b/tests/test_connectors_torq_activity_logs.py index ebad313..61fa2e2 100644 --- a/tests/test_connectors_torq_activity_logs.py +++ b/tests/test_connectors_torq_activity_logs.py @@ -92,7 +92,7 @@ def test_collect_pagination(self): ) self.connector.run() - self.assertEqual(self.connector._saved, 2) + self.assertEqual(self.connector._saved["logs"], 2) # it's reverse chronological so the earlier timestamp should be recorded self.assertEqual(self.connector.pointer, "2022-06-24T18:15:07.380622Z") @@ -134,5 +134,5 @@ def test_collect_no_pagination(self): ) self.connector.run() - self.assertEqual(self.connector._saved, 1) + self.assertEqual(self.connector._saved["logs"], 1) self.assertEqual(self.connector.pointer, "2022-06-24T18:15:06.380622Z") diff --git a/tests/test_connectors_torq_audit_logs.py b/tests/test_connectors_torq_audit_logs.py index 8a8565b..bbd68f8 100644 --- a/tests/test_connectors_torq_audit_logs.py +++ b/tests/test_connectors_torq_audit_logs.py @@ -92,7 +92,7 @@ def test_collect_pagination(self): ) self.connector.run() - self.assertEqual(self.connector._saved, 4) + self.assertEqual(self.connector._saved["logs"], 4) # it's reverse chronological so the earlier timestamp should be recorded self.assertEqual(self.connector.pointer, "2022-06-27T11:35:10.681687Z") @@ -134,5 +134,5 @@ def test_collect_no_pagination(self): ) self.connector.run() - self.assertEqual(self.connector._saved, 1) + self.assertEqual(self.connector._saved["logs"], 1) self.assertEqual(self.connector.pointer, "2022-06-07T11:35:11.681687Z") diff --git a/tests/test_connectors_workday_activity_logging.py b/tests/test_connectors_workday_activity_logging.py index 66fb19d..b9fb838 100644 --- a/tests/test_connectors_workday_activity_logging.py +++ b/tests/test_connectors_workday_activity_logging.py @@ -194,7 +194,7 @@ def test_collect_pagination(self): # Check the pointer matches the latest execution_time value, and that the # expected number of logs were returned. self.connector.run() - self.assertEqual(self.connector._saved, 113) + self.assertEqual(self.connector._saved["logs"], 113) self.assertEqual(self.connector.pointer, "2021-10-12T23:50:09.752Z") @responses.activate @@ -234,7 +234,7 @@ def test_collect_no_pagination(self): # Ensure only a single value is returned, and the pointer is properly set. 
self.connector.run() - self.assertEqual(self.connector._saved, 13) + self.assertEqual(self.connector._saved["logs"], 13) self.assertEqual(self.connector.pointer, "2021-10-12T23:50:09.752Z") @responses.activate @@ -272,4 +272,4 @@ def test_collect_no_results(self): ), ) self.connector.run() - self.assertEqual(self.connector._saved, 0) + self.assertEqual(self.connector._saved["logs"], 0) diff --git a/tests/test_connectors_zoom_activities.py b/tests/test_connectors_zoom_activities.py index 5d34826..1e505ae 100644 --- a/tests/test_connectors_zoom_activities.py +++ b/tests/test_connectors_zoom_activities.py @@ -84,7 +84,7 @@ def test_collect_pagination(self): ) self.connector.run() - self.assertEqual(self.connector._saved, 31) + self.assertEqual(self.connector._saved["logs"], 31) self.assertEqual(self.connector.pointer, "2022-08-23T14:45:54Z") @responses.activate @@ -121,5 +121,5 @@ def test_collect_no_pagination(self): ) self.connector.run() - self.assertEqual(self.connector._saved, 1) + self.assertEqual(self.connector._saved["logs"], 1) self.assertEqual(self.connector.pointer, "2022-08-23T14:45:54Z") diff --git a/tests/test_connectors_zoom_operation.py b/tests/test_connectors_zoom_operation.py index 452fd16..0245c33 100644 --- a/tests/test_connectors_zoom_operation.py +++ b/tests/test_connectors_zoom_operation.py @@ -84,7 +84,7 @@ def test_collect_pagination(self): ) self.connector.run() - self.assertEqual(self.connector._saved, 31) + self.assertEqual(self.connector._saved["logs"], 31) self.assertEqual(self.connector.pointer, "2022-08-23T14:46:17Z") @responses.activate @@ -121,5 +121,5 @@ def test_collect_no_pagination(self): ) self.connector.run() - self.assertEqual(self.connector._saved, 1) + self.assertEqual(self.connector._saved["logs"], 1) self.assertEqual(self.connector.pointer, "2022-08-23T14:46:17Z") diff --git a/tests/test_helpers_parsing.py b/tests/test_helpers_parsing.py new file mode 100644 index 0000000..6b403a9 --- /dev/null +++ b/tests/test_helpers_parsing.py @@ -0,0 +1,87 @@ +# Copyright (c) HashiCorp, Inc. +# SPDX-License-Identifier: MPL-2.0 + +"""Implements tests for parsing helpers.""" + +import unittest + +from grove.helpers import parsing + + +class ParsingHelpersTestCase(unittest.TestCase): + """Implements tests for parsing helpers.""" + + def test_update_by_path(self): + """Ensures path updating operates as expected.""" + # Multi-dimension. + expected_multi = { + "A": { + "B": { + "C": { + "D": { + "E": "injected", + "deepest": True, + }, + "adjacent": True, + } + } + }, + "top": True, + } + self.assertDictEqual( + expected_multi, + parsing.update_path( + { + "A": {"B": {"C": {"D": {"deepest": True}, "adjacent": True}}}, + "top": True, + }, + "A.B.C.D.E".split("."), + "injected", + ), + ) + + # Replacement. + expected_replace = { + "A": {"B": {"C": "replaced"}}, + } + self.assertDictEqual( + expected_replace, + parsing.update_path( + {"A": {"B": {"C": "initial"}}}, + "A.B.C".split("."), + "replaced", + ), + ) + + # Single dimension. + expected_single = { + "A": "value", + } + self.assertDictEqual( + expected_single, + parsing.update_path( + {}, + "A".split("."), + "value", + ), + ) + + # Deletion of nested keys. + self.assertDictEqual( + {"A": 1}, + parsing.update_path( + {"A": 1, "B": {"C": [1, 2, 3], "D": {"E": "F"}}}, + "B".split("."), + None, + ), + ) + + # Deletion of deeply nested keys. 
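+        # Passing a value of None instructs update_path to remove the key at the
+        # given path, rather than setting it to None - this is the behaviour the
+        # filter_paths processor relies on to drop fields.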
+ self.assertDictEqual( + {"A": 1, "B": {"D": {"E": "F"}}}, + parsing.update_path( + {"A": 1, "B": {"C": [1, 2, 3], "D": {"E": "F"}}}, + "B.C".split("."), + None, + ), + ) diff --git a/tests/test_outputs_base.py b/tests/test_outputs_base.py index 240410b..84f5f63 100644 --- a/tests/test_outputs_base.py +++ b/tests/test_outputs_base.py @@ -12,6 +12,8 @@ # Required as BaseOutput is an ABC, so without defining submit we will not be able to # instantiate it to validate methods on the base class. class TestOutput(BaseOutput): + __test__ = False + def submit(self, *args, **kwargs): pass diff --git a/tests/test_processors_extract_paths.py b/tests/test_processors_extract_paths.py new file mode 100644 index 0000000..c1c62a6 --- /dev/null +++ b/tests/test_processors_extract_paths.py @@ -0,0 +1,80 @@ +# Copyright (c) HashiCorp, Inc. +# SPDX-License-Identifier: MPL-2.0 + +"""Implements unit tests for the path extracting processor.""" + +import json +import os +import unittest +from unittest.mock import patch + +from grove.models import ProcessorConfig +from grove.processors import extract_paths +from tests import mocks + + +class ProcessorPathExtratTestCase(unittest.TestCase): + """Implements unit tests for the path extracting processor.""" + + @patch("grove.helpers.plugin.load_handler", mocks.load_handler) + def setUp(self): + """Setup the processor and associated configuration for testing.""" + self.dir = os.path.dirname(os.path.abspath(__file__)) + + # Create a mapping compatible with the Okta system log fixture. + self.processor = extract_paths.Handler( + ProcessorConfig( + name="ecs", + processor="extract_paths", + raw="event.original", + fields=[ + { + "destination": "@timestamp", + "sources": [ + "published", + ], + }, + { + "destination": "'source.ip'", + "sources": [ + "client.ipAddress", + ], + }, + { + "destination": "'ecs.version'", + "static": "8.8", + }, + { + "destination": "nested.key", + "static": "example", + }, + { + "destination": "another.nested.key", + "sources": [ + "client.device", + ], + }, + ], + ) + ) + + def test_extract_paths(self): + """Ensure path extraction operates as expected.""" + # Load and process the fixture. + entries = json.load( + open(os.path.join(self.dir, "fixtures/okta/system_log/001.json"), "r") + ) + + # Process a single target. + target = entries[0] + results = self.processor.process(target) + + # Ensure fields are is mapped correctly. + self.assertEqual(results[0]["source.ip"], "000.000.00.000") + self.assertEqual(results[0]["@timestamp"], "2021-06-24T00:04:08.123Z") + self.assertEqual(results[0]["ecs.version"], "8.8") + self.assertEqual(results[0]["nested"]["key"], "example") + self.assertEqual(results[0]["another"]["nested"]["key"], "Computer") + + # Ensure the raw message is present. + self.assertGreater(len(results[0]["event"]["original"]), 0) diff --git a/tests/test_processors_filter_paths.py b/tests/test_processors_filter_paths.py new file mode 100644 index 0000000..f47dde5 --- /dev/null +++ b/tests/test_processors_filter_paths.py @@ -0,0 +1,55 @@ +# Copyright (c) HashiCorp, Inc. 
+# SPDX-License-Identifier: MPL-2.0
+
+"""Implements unit tests for the path filtering processor."""
+
+import json
+import os
+import unittest
+from unittest.mock import patch
+
+from grove.models import ProcessorConfig
+from grove.processors import filter_paths
+from tests import mocks
+
+
+class ProcessorPathFilterTestCase(unittest.TestCase):
+    """Implements unit tests for the path filtering processor."""
+
+    @patch("grove.helpers.plugin.load_handler", mocks.load_handler)
+    def setUp(self):
+        """Set up the processor and associated configuration for testing."""
+        self.dir = os.path.dirname(os.path.abspath(__file__))
+
+        # Create a filter configuration compatible with the Okta system log fixture.
+        self.processor = filter_paths.Handler(
+            ProcessorConfig(
+                name="Filter debugContext",
+                processor="filter_paths",
+                sources=[
+                    "debugContext",
+                    "client.geographicalContext",
+                ],
+            )
+        )
+
+    def test_filter_paths(self):
+        """Ensure path filtering operates as expected."""
+        # Load and process the fixture.
+        entries = json.load(
+            open(
+                os.path.join(self.dir, "fixtures/okta/system_log/001.json"),
+                "r",
+            )
+        )
+
+        # Firstly, ensure the 'debugContext' field exists to begin with.
+        self.assertTrue("debugContext" in entries[0])
+        self.assertTrue("geographicalContext" in entries[0]["client"])
+
+        # Process a single log entry.
+        records = self.processor.process(entries[0])
+
+        # Ensure the configured fields were removed from the processed record.
+        self.assertFalse("debugContext" in records[0])
+        self.assertFalse("geographicalContext" in records[0]["client"])
diff --git a/tests/test_processors_split_path.py b/tests/test_processors_split_path.py
new file mode 100644
index 0000000..3cd5977
--- /dev/null
+++ b/tests/test_processors_split_path.py
@@ -0,0 +1,50 @@
+# Copyright (c) HashiCorp, Inc.
+# SPDX-License-Identifier: MPL-2.0
+
+"""Implements unit tests for the path splitting processor."""
+
+import json
+import os
+import unittest
+from unittest.mock import patch
+
+from grove.models import ProcessorConfig
+from grove.processors import split_path
+from tests import mocks
+
+
+class ProcessorPathMapperTestCase(unittest.TestCase):
+    """Implements unit tests for the path splitting processor."""
+
+    @patch("grove.helpers.plugin.load_handler", mocks.load_handler)
+    def setUp(self):
+        """Set up the processor and associated configuration for testing."""
+        self.dir = os.path.dirname(os.path.abspath(__file__))
+
+        # Create a split configuration compatible with the GSuite activities fixture.
+        self.processor = split_path.Handler(
+            ProcessorConfig(
+                name="Fan Out",
+                processor="split_path",
+                source="events",
+            )
+        )
+
+    def test_split_path(self):
+        """Ensure path splitting operates as expected."""
+        # Load and process the fixture.
+        entries = json.load(
+            open(
+                os.path.join(self.dir, "fixtures/gsuite/activities/001.json"),
+                "r",
+            )
+        )
+
+        # Confirm the fixture contains a single log entry (with two nested events).
+        self.assertEqual(len(entries["items"]), 1)
+
+        # Process a single log entry.
+        records = self.processor.process(entries["items"][0])
+
+        # Confirm that splitting the entry resulted in two records.
+        self.assertEqual(len(records), 2)

From 50cd97b41e4b1c8593cd7d9a24f00ef828fe8f20 Mon Sep 17 00:00:00 2001
From: Peter Adkins <74542596+hcpadkins@users.noreply.github.com>
Date: Sat, 8 Jul 2023 13:16:17 +0100
Subject: [PATCH 02/12] Cache calls to quote_aware_split.
--- grove/helpers/parsing.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/grove/helpers/parsing.py b/grove/helpers/parsing.py index ba9c59f..e6ccc0a 100644 --- a/grove/helpers/parsing.py +++ b/grove/helpers/parsing.py @@ -5,6 +5,7 @@ import json import re +from functools import cache from typing import Any, Dict, List from pydantic import ValidationError @@ -48,6 +49,7 @@ def quick_copy(value: Any): return json.loads(json.dumps(value)) +@cache def quote_aware_split(value: str, delimiter=".") -> List[str]: """Splits a value by delimiter, returning a list. From 8e0aec1e0e097fdef83c58aadaf6463e575756a7 Mon Sep 17 00:00:00 2001 From: Peter Adkins <74542596+hcpadkins@users.noreply.github.com> Date: Sat, 8 Jul 2023 13:36:00 +0100 Subject: [PATCH 03/12] Update processor base class to allow finalize only This was possible prior to this commit, but required a stub method for process to be implemented. --- grove/connectors/__init__.py | 2 ++ grove/processors/__init__.py | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/grove/connectors/__init__.py b/grove/connectors/__init__.py index c0df849..ea46c56 100644 --- a/grove/connectors/__init__.py +++ b/grove/connectors/__init__.py @@ -348,6 +348,8 @@ def save(self, entries: List[Any]): # Always refresh our lock while saving. This allows us to grab a new lock for # every page of data to try and prevent our lock expiring before we've performed # a full collection. + # + # Unlock is not called here, as it's performed by the caller. self.lock() if self.LOG_ORDER == CHRONOLOGICAL: diff --git a/grove/processors/__init__.py b/grove/processors/__init__.py index 1309148..d6203fe 100644 --- a/grove/processors/__init__.py +++ b/grove/processors/__init__.py @@ -40,7 +40,6 @@ def __init__(self, config: Dict[str, Any]): f"Processor configuration is invalid. {parsing.validation_error(err)}" ) - @abc.abstractmethod def process(self, entry: Dict[str, Any]) -> List[Dict[str, Any]]: """Performs a set of processes against a log entry. @@ -50,8 +49,9 @@ def process(self, entry: Dict[str, Any]) -> List[Dict[str, Any]]: the list should contain a single element. If the log entry is to be dropped, an empty list should be used. """ - pass + return [entry] def finalize(self): """Performs a final set of operations after logs have been saved.""" + return From b9b8338bfa6ea5717eabc415b6b0bf031d854ac3 Mon Sep 17 00:00:00 2001 From: Peter Adkins <74542596+hcpadkins@users.noreply.github.com> Date: Mon, 10 Jul 2023 12:45:16 +0100 Subject: [PATCH 04/12] Add local file secret backend. --- grove/secrets/local_file.py | 75 ++++++++++++++++++++++++++++++++ pyproject.toml | 3 +- tests/test_secrets_local_file.py | 49 +++++++++++++++++++++ 3 files changed, 126 insertions(+), 1 deletion(-) create mode 100644 grove/secrets/local_file.py create mode 100644 tests/test_secrets_local_file.py diff --git a/grove/secrets/local_file.py b/grove/secrets/local_file.py new file mode 100644 index 0000000..b32262b --- /dev/null +++ b/grove/secrets/local_file.py @@ -0,0 +1,75 @@ +# Copyright (c) HashiCorp, Inc. +# SPDX-License-Identifier: MPL-2.0 + +"""Grove local file secrets handler.""" + +import logging +import os + +from pydantic import BaseSettings, Field, ValidationError + +from grove.exceptions import AccessException, ConfigurationException +from grove.helpers import parsing +from grove.secrets import BaseSecret + + +class Configuration(BaseSettings): + """Defines environment variables used to configure the local file handler. 
+
+    This should also include any appropriate default values for fields which are not
+    required.
+    """
+
+    path_prefix: str = Field(
+        str(),
+        description="An optional prefix to prepend to configured secret paths.",
+    )
+
+    class Config:
+        """Allow environment variable override of configuration fields.
+
+        This also enforces a prefix for all environment variables for this handler.
+        As an example, the field `path_prefix` would be set using the environment
+        variable `GROVE_SECRET_LOCAL_FILE_PATH_PREFIX`.
+        """
+
+        env_prefix = "GROVE_SECRET_LOCAL_FILE_"
+        case_insensitive = True
+
+
+class Handler(BaseSecret):
+    """A secret handler to read secrets from local files."""
+
+    def __init__(self):
+        self.logger = logging.getLogger(__name__)
+
+        # Wrap validation errors to keep them in the Grove exception hierarchy.
+        try:
+            self.config = Configuration()  # type: ignore
+        except ValidationError as err:
+            raise ConfigurationException(parsing.validation_error(err))
+
+    def get(self, id: str) -> str:
+        """Gets and returns a secret from the specified file path.
+
+        If a path prefix is configured it will be prepended to the configured file
+        path. However, if the secret path begins with a '/' the path prefix will be
+        ignored, as the path is considered fully-qualified.
+
+        :param id: The file to read the secret from.
+
+        :return: The plain-text secret, read from the specified file.
+        """
+        secret = str()
+        path = os.path.join(self.config.path_prefix, id)
+
+        try:
+            with open(path, "rb") as f:
+                secret = str(f.read(), "utf-8").rstrip()
+        except (ValidationError, OSError) as err:
+            raise AccessException(
+                f"Unable to read secret from configured '{path}'. {err}"
+            )
+
+        return secret
diff --git a/pyproject.toml b/pyproject.toml
index d527125..39e11fc 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -100,6 +100,7 @@ local_file = "grove.configs.local_file:Handler"
 [project.entry-points."grove.secrets"]
 aws_ssm = "grove.secrets.aws_ssm:Handler"
 hashicorp_vault = "grove.secrets.hashicorp_vault:Handler"
+local_file = "grove.secrets.local_file:Handler"
 
 [project.entry-points."grove.processors"]
 extract_paths = "grove.processors.extract_paths:Handler"
@@ -159,7 +160,7 @@ legacy_tox_ini = """
     [testenv]
     pip_version = pip
     extras = tests
-    commands = pytest -c pyproject.toml
+    commands = pytest -c pyproject.toml {posargs}
     srcs = grove
 
    [testenv:linters]
diff --git a/tests/test_secrets_local_file.py b/tests/test_secrets_local_file.py
new file mode 100644
index 0000000..f390999
--- /dev/null
+++ b/tests/test_secrets_local_file.py
@@ -0,0 +1,49 @@
+# Copyright (c) HashiCorp, Inc.
+# SPDX-License-Identifier: MPL-2.0
+
+"""Implements tests for the local file secrets backend."""
+
+import os
+import tempfile
+import unittest
+
+from grove.secrets.local_file import Handler
+
+
+class SecretsLocalFileTestCase(unittest.TestCase):
+    """Implements tests for the local file secrets backend."""
+
+    def setUp(self):
+        self.fixtures = os.path.abspath(
+            os.path.join(os.path.dirname(__file__), "fixtures/")
+        )
+
+    def test_relative_path(self):
+        """Ensures a secret can be read from a relative file path."""
+        expected = "_Super_S3cret_Stuff."
+
+        with tempfile.NamedTemporaryFile("w") as fout:
+            fout.write(expected)
+            fout.write("\n")
+            fout.flush()
+
+            # Validate loading with a path prefix.
+            os.environ["GROVE_SECRET_LOCAL_FILE_PATH_PREFIX"] = os.path.dirname(
+                fout.name
+            )
+
+            self.secrets = Handler()
+            self.assertEqual(self.secrets.get(os.path.basename(fout.name)), expected)
+
+    def test_absolute_path(self):
+        """Ensures a secret can be read from an absolute file path."""
+        expected = "_Super_S3cret_Stuff."
+
+        with tempfile.NamedTemporaryFile("w") as fout:
+            fout.write(expected)
+            fout.write("\n")
+            fout.flush()
+
+            # Validate loading with an absolute path, where no prefix is applied.
+            self.secrets = Handler()
+            self.assertEqual(self.secrets.get(fout.name), expected)

From c100f23490c80b04cf0f914c67cb23a7adae3bfa Mon Sep 17 00:00:00 2001
From: Peter Adkins <74542596+hcpadkins@users.noreply.github.com>
Date: Mon, 10 Jul 2023 12:46:07 +0100
Subject: [PATCH 05/12] Remove caching to prevent unexpected mutation.

---
 grove/helpers/parsing.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/grove/helpers/parsing.py b/grove/helpers/parsing.py
index e6ccc0a..ba9c59f 100644
--- a/grove/helpers/parsing.py
+++ b/grove/helpers/parsing.py
@@ -5,7 +5,6 @@
 
 import json
 import re
-from functools import cache
 from typing import Any, Dict, List
 
 from pydantic import ValidationError
@@ -49,7 +48,6 @@ def quick_copy(value: Any):
     return json.loads(json.dumps(value))
 
 
-@cache
 def quote_aware_split(value: str, delimiter=".") -> List[str]:
     """Splits a value by delimiter, returning a list.

From d1b941235b203e5bc6c536a0cc12b8680a53eb37 Mon Sep 17 00:00:00 2001
From: Peter Adkins <74542596+hcpadkins@users.noreply.github.com>
Date: Mon, 10 Jul 2023 13:49:03 +0100
Subject: [PATCH 06/12] Documentation updates.

---
 README.md                              | 14 +++++++--
 docs/api.rst                           |  1 +
 docs/conf.py                           | 21 +++----------
 docs/grove.rst                         |  1 +
 docs/index.rst                         | 26 ++++++++++++---
 docs/internals.rst                     | 26 +++++++++------
 docs/static/custom.css                 | 42 ++++++++++++++++++-------
 docs/static/grove-logo-light.png       | Bin 0 -> 19674 bytes
 docs/static/grove-logo-small-light.png | Bin 0 -> 16787 bytes
 docs/static/grove-logo-small.png       | Bin 0 -> 19248 bytes
 docs/static/grove-logo.png             | Bin 0 -> 23315 bytes
 docs/static/grove-support-light.png    | Bin 0 -> 73194 bytes
 docs/static/grove-support.png          | Bin 0 -> 78562 bytes
 13 files changed, 85 insertions(+), 46 deletions(-)
 create mode 100644 docs/static/grove-logo-light.png
 create mode 100644 docs/static/grove-logo-small-light.png
 create mode 100644 docs/static/grove-logo-small.png
 create mode 100644 docs/static/grove-logo.png
 create mode 100644 docs/static/grove-support-light.png
 create mode 100644 docs/static/grove-support.png

diff --git a/README.md b/README.md
index bff16c0..5dfa805 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,8 @@
-## Grove
-
-> Grove is not an official HashiCorp project.
+
+
+
+
+
+
+
+
+