Skip to content

Commit

Permalink
Version 2.1.0 Release - Activity Logs
Browse files Browse the repository at this point in the history
  • Loading branch information
alanisaac authored and jaekitch committed Oct 24, 2022
1 parent 3ff2c95 commit 6d20773
Show file tree
Hide file tree
Showing 18 changed files with 1,031 additions and 357 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,4 @@ dmypy.json
.pytest_cache/

.DS_Store
config.yml
7 changes: 7 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
## Change Log
### v2.1.0

- Added new Activity log type for syncing Activity logs from the Admin API.
- The default value for checkpointing is now True. See the config path under `dls_settings.api.checkpointing.enabled` to disable this functionality.
- Improved application logging.
- DLS now responds to SIGTERM signals for shutdown.
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Duologsync (v2.0.0)
Duo Log Sync (v2.1.0)
===================

[![Issues](https://img.shields.io/github/issues/duosecurity/duo_log_sync)](https://github.com/duosecurity/duo_log_sync/issues)
Expand Down Expand Up @@ -31,7 +31,6 @@ Duologsync (v2.0.0)
---

## Logging

- A logging filepath can be specified in `config.yml`. By default, logs will be stored under the `/tmp` folder with name `duologsync.log`.
- These logs are only application/system logs, and not the actual logs retrieved from Duo endpoints.

Expand Down
2 changes: 1 addition & 1 deletion duologsync/__version__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
__title__ = "duologsync"
__description__ = "Ingest + sync logs from the Duo Admin API into another system."
__url__ = "https://github.com/duosecurity/duo_log_sync"
__version__ = "2.0.0"
__version__ = "2.1.0"
__author__ = "Duo Security, Inc."
__license__ = "MIT"
__email__ = "[email protected]"
128 changes: 89 additions & 39 deletions duologsync/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,42 +28,56 @@
from duologsync.producer.telephony_producer import TelephonyProducer
from duologsync.consumer.trustmonitor_consumer import TrustMonitorConsumer
from duologsync.producer.trustmonitor_producer import TrustMonitorProducer
from duologsync.consumer.activity_consumer import ActivityConsumer
from duologsync.producer.activity_producer import ActivityProducer
from duologsync.util import create_admin, check_for_specific_endpoint
from duologsync.writer import Writer
from duologsync.config import Config
from duologsync.program import Program


def main():
"""
Kicks off DuoLogSync by setting important variables, creating and running
a Producer-Consumer pair for each log-type defined in a config file passed
to the program.
"""

arg_parser = argparse.ArgumentParser(prog='duologsync',
description="Path to config file")
arg_parser.add_argument('ConfigPath', metavar='config-path', type=str,
help='Config to start application')
arg_parser = argparse.ArgumentParser(
prog="duologsync", description="Path to config file"
)
arg_parser.add_argument(
"ConfigPath",
metavar="config-path",
type=str,
help="Config to start application",
)
args = arg_parser.parse_args()

# Handle shutting down the program via Ctrl-C
signal.signal(signal.SIGINT, sigint_handler)
signal.signal(signal.SIGINT, signal_handler)
# Handle shutting down the program via SIGTERM
signal.signal(signal.SIGTERM, signal_handler)

# Create a config Dictionary from a YAML file located at args.ConfigPath
config = Config.create_config(args.ConfigPath)
Config.set_config(config)

# Do extra checks for Trust Monitor support
is_dtm_in_config = check_for_specific_endpoint('trustmonitor', config)
is_dtm_in_config = check_for_specific_endpoint("trustmonitor", config)
log_format = Config.get_log_format()
is_msp = Config.account_is_msp()

if (is_dtm_in_config and log_format != 'JSON'):
Program.log(f"DuoLogSync: Trust Monitor endpoint only supports JSON", logging.WARNING)
if is_dtm_in_config and log_format != "JSON":
Program.log(
"DuoLogSync: Trust Monitor endpoint only supports JSON", logging.WARNING
)
return

if (is_dtm_in_config and is_msp):
Program.log(f"DuoLogSync: Trust Monitor endpoint only supports non-msp", logging.WARNING)
if is_dtm_in_config and is_msp:
Program.log(
"DuoLogSync: Trust Monitor endpoint only supports non-msp", logging.WARNING
)
return

Program.setup_logging(Config.get_log_filepath())
Expand All @@ -79,20 +93,29 @@ def main():
asyncio.get_event_loop().close()

if Program.is_logging_set():
print(f"DuoLogSync: shutdown successfully. Check "
f"{Config.get_log_filepath()} for program logs")
print(
f"DuoLogSync: shutdown successfully. Check "
f"{Config.get_log_filepath()} for program logs"
)


def sigint_handler(signal_number, stack_frame):
def signal_handler(signal_number, stack_frame):
"""
Handler for SIGINT (Ctrl-C) to gracefully shutdown DuoLogSync
Handler for signals to gracefully shutdown DuoLogSync
"""

shutdown_reason = f"received signal {signal_number} (Ctrl-C)"
if signal_number == signal.SIGINT:
shutdown_reason = f"received signal {signal_number} (Ctrl-C)"
else:
shutdown_reason = f"received signal {signal.strsignal(signal_number)}"

Program.initiate_shutdown(shutdown_reason)

if stack_frame:
Program.log(f"DuoLogSync: stack frame from Ctrl-C is {stack_frame}",
logging.INFO)
Program.log(
f"DuoLogSync: stack frame from signal is {stack_frame}", logging.INFO
)


def create_tasks(server_to_writer):
"""
Expand All @@ -101,46 +124,53 @@ def create_tasks(server_to_writer):
if the account is MSP. Return a list containing the asyncio tasks for
running those objects.
@param writer Dictionary mapping server ids to writer objects
@param server_to_writer Dictionary mapping server ids to writer objects
@return list of asyncio tasks for running the Producer and Consumer objects
"""
tasks = []

# Object with functions needed to utilize log API calls
admin = create_admin(
Config.get_account_ikey(), Config.get_account_skey(),
Config.get_account_hostname(), is_msp=Config.account_is_msp(),
proxy_server=Config.get_proxy_server(), proxy_port=Config.get_proxy_port())
Config.get_account_ikey(),
Config.get_account_skey(),
Config.get_account_hostname(),
is_msp=Config.account_is_msp(),
proxy_server=Config.get_proxy_server(),
proxy_port=Config.get_proxy_port(),
)

# This is where functionality would be added to check if an account is MSP
# (Config.account_is_msp), and then retrieve child accounts (ignoring those
# in a blocklist) if the account is indeed MSP
# TODO: Implement blocklist
if Config.account_is_msp():
child_account = admin.get_child_accounts()
child_accounts_id = [account['account_id'] for account in child_account]
child_accounts_id = [account["account_id"] for account in child_account]

for account in child_accounts_id:
# TODO: This can be made into a separate function
for mapping in Config.get_account_endpoint_server_mappings():
# Get the writer to be used for this set of endpoints
writer = server_to_writer[mapping.get('server')]
writer = server_to_writer[mapping.get("server")]

for endpoint in mapping.get('endpoints'):
new_tasks = create_consumer_producer_pair(endpoint, writer, admin, account)
for endpoint in mapping.get("endpoints"):
new_tasks = create_consumer_producer_pair(
endpoint, writer, admin, account
)
tasks.extend(new_tasks)
else:
for mapping in Config.get_account_endpoint_server_mappings():
# Get the writer to be used for this set of endpoints
writer = server_to_writer[mapping.get('server')]
writer = server_to_writer[mapping.get("server")]

for endpoint in mapping.get('endpoints'):
for endpoint in mapping.get("endpoints"):
new_tasks = create_consumer_producer_pair(endpoint, writer, admin)
tasks.extend(new_tasks)

return tasks


def create_consumer_producer_pair(endpoint, writer, admin, child_account=None):
"""
Create a pair of Producer-Consumer objects for each endpoint and return a
Expand All @@ -162,37 +192,57 @@ def create_consumer_producer_pair(endpoint, writer, admin, child_account=None):
# Create the right pair of Producer-Consumer objects based on endpoint
if endpoint == Config.AUTH:
if Config.account_is_msp():
producer = AuthlogProducer(admin.json_api_call, log_queue,
child_account_id=child_account,
url_path="/admin/v2/logs/authentication")
producer = AuthlogProducer(
admin.json_api_call,
log_queue,
child_account_id=child_account,
url_path="/admin/v2/logs/authentication",
)
else:
producer = AuthlogProducer(admin.get_authentication_log, log_queue)
consumer = AuthlogConsumer(log_format, log_queue, writer, child_account)
elif endpoint == Config.TELEPHONY:
if Config.account_is_msp():
producer = TelephonyProducer(admin.json_api_call, log_queue,
child_account_id=child_account,
url_path='/admin/v1/logs/telephony')
producer = TelephonyProducer(
admin.json_api_call,
log_queue,
child_account_id=child_account,
url_path="/admin/v1/logs/telephony",
)
else:
producer = TelephonyProducer(admin.get_telephony_log, log_queue)
consumer = TelephonyConsumer(log_format, log_queue, writer, child_account)
elif endpoint == Config.ADMIN:
if Config.account_is_msp():
producer = AdminactionProducer(admin.json_api_call, log_queue,
child_account_id=child_account,
url_path='/admin/v1/logs/administrator')
producer = AdminactionProducer(
admin.json_api_call,
log_queue,
child_account_id=child_account,
url_path="/admin/v1/logs/administrator",
)
else:
producer = AdminactionProducer(admin.get_administrator_log, log_queue)
consumer = AdminactionConsumer(log_format, log_queue, writer, child_account)
elif endpoint == Config.TRUST_MONITOR:
producer = TrustMonitorProducer(admin.get_trust_monitor_events_by_offset, log_queue)
producer = TrustMonitorProducer(
admin.get_trust_monitor_events_by_offset, log_queue
)
consumer = TrustMonitorConsumer(log_format, log_queue, writer, child_account)
elif endpoint == Config.ACTIVITY:
producer = ActivityProducer(
admin.json_api_call,
log_queue,
url_path="/admin/v2/logs/activity",
)
consumer = ActivityConsumer(log_format, log_queue, writer, child_account)
else:
Program.log(f"{endpoint} is not a recognized endpoint", logging.WARNING)
del log_queue
return []

tasks = [asyncio.ensure_future(producer.produce()),
asyncio.ensure_future(consumer.consume())]
tasks = [
asyncio.ensure_future(producer.produce()),
asyncio.ensure_future(consumer.consume()),
]

return tasks
20 changes: 11 additions & 9 deletions duologsync/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@ class Config:
AUTH = 'auth'
TELEPHONY = 'telephony'
TRUST_MONITOR = 'trustmonitor'
ACTIVITY = 'activity'

DIRECTORY_DEFAULT = '/tmp'
LOG_FILEPATH_DEFAULT = DIRECTORY_DEFAULT + '/' + 'duologsync.log'
LOG_FORMAT_DEFAULT = 'JSON'
API_OFFSET_DEFAULT = 180
API_TIMEOUT_DEFAULT = 120
CHECKPOINTING_ENABLED_DEFAULT = False
CHECKPOINTING_ENABLED_DEFAULT = True
CHECKPOINTING_DIRECTORY_DEFAULT = DIRECTORY_DEFAULT
PROXY_SERVER_DEFAULT = ''
PROXY_PORT_DEFAULT = 0
Expand Down Expand Up @@ -161,7 +162,7 @@ class Config:
'type': 'list',
'empty': False,
'required': True,
'allowed': [ADMIN, AUTH, TELEPHONY, TRUST_MONITOR]
'allowed': [ADMIN, AUTH, TELEPHONY, TRUST_MONITOR, ACTIVITY]
}
}
)
Expand Down Expand Up @@ -239,11 +240,12 @@ def get_value(cls, keys):

cls._check_config_is_set()
curr_value = cls._config
for key in keys:
curr_value = curr_value.get(key)
if curr_value:
for key in keys:
curr_value = curr_value.get(key)

if curr_value is None:
raise ValueError(f"{key} is an invalid key for this Config")
if curr_value is None:
raise ValueError(f"{key} is an invalid key for this Config")

return curr_value

Expand Down Expand Up @@ -326,7 +328,7 @@ def get_proxy_port(cls):
@classmethod
def create_config(cls, config_filepath):
"""
Attemp to read the file at config_filepath and generate a config
Attempt to read the file at config_filepath and generate a config
Dictionary object based on a defined JSON schema
@param config_filepath File from which to generate a config object
Expand All @@ -345,7 +347,7 @@ def create_config(cls, config_filepath):
config = cls._validate_and_normalize_config(config)
if config.get('dls_settings').get('api').get('timeout') < cls.API_TIMEOUT_DEFAULT:
config['dls_settings']['api']['timeout'] = cls.API_TIMEOUT_DEFAULT
Program.log('DuoLogSync: Setting default api timeout to 120 seconds.')
Program.log(f'DuoLogSync: Setting default api timeout to {cls.API_TIMEOUT_DEFAULT} seconds.')

# Will occur when given a bad filepath or a bad file
except OSError as os_error:
Expand Down Expand Up @@ -400,7 +402,7 @@ def get_value_from_keys(dictionary, keys):
Drill down into dictionary to retrieve a value given a list of keys
@param dictionary dict to retrieve a value from
@param fields List of fields to follow to retrieve a value
@param keys List of keys to follow to retrieve a value
@return value from the log found after following the list of keys given
"""
Expand Down
12 changes: 12 additions & 0 deletions duologsync/consumer/activity_consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from duologsync.config import Config
from duologsync.consumer.consumer import Consumer


class ActivityConsumer(Consumer):
"""
An implementation of the Consumer class for user activity logs
"""

def __init__(self, log_format, log_queue, writer, child_account_id=None):
super().__init__(log_format, log_queue, writer, child_account_id=child_account_id)
self.log_type = Config.ACTIVITY
Loading

0 comments on commit 6d20773

Please sign in to comment.