Skip to content

Commit

Permalink
Changed timesketch import to support event filters
Browse files Browse the repository at this point in the history
  • Loading branch information
joachimmetz committed Oct 16, 2021
1 parent fdfd0fb commit fe82602
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 53 deletions.
11 changes: 10 additions & 1 deletion cli_client/python/timesketch_cli_client/commands/importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,20 @@


@click.command('import')
@click.option(
'--event-filter', default=None,
help='Optional event filter to pass to psort.')
@click.option('--name', help='Name of the timeline.')
@click.option(
'--timeout', type=int, default=600, help='Seconds to wait for indexing.')
@click.argument('file_path', type=click.Path(exists=True))
@click.pass_context
def importer(ctx, name, timeout, file_path):
def importer(ctx, event_filter, name, timeout, file_path):
"""Import timeline.
Args:
ctx: Click CLI context object.
event_filter: Event filter to pass to psort.
name: Name of the timeline to create.
timeout: Seconds to wait for indexing.
file_path: File path to the file to import.
Expand All @@ -48,7 +52,12 @@ def importer(ctx, name, timeout, file_path):
# TODO: Consider using the whole command as upload context instead
# of the file path.
streamer.set_upload_context(file_path)
streamer.set_event_filter(event_filter)

# Note that the event filter must be set before _upload_binary_file()
# is invoked by add_file().
streamer.add_file(file_path)

timeline = streamer.timeline
if not timeline:
click.echo('Error creating timeline, please try again.')
Expand Down
16 changes: 12 additions & 4 deletions importer_client/python/timesketch_import_client/importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import numpy
import pandas

from timesketch_api_client import timeline
from timesketch_api_client import definitions
from timesketch_api_client import timeline
from timesketch_import_client import utils

logger = logging.getLogger('timesketch_importer.importer')
Expand Down Expand Up @@ -62,6 +62,7 @@ def __init__(self):
self._data_lines = []
self._data_type = None
self._datetime_field = None
self._event_filter = None
self._format_string = None
self._index = ''
self._last_response = None
Expand Down Expand Up @@ -373,6 +374,9 @@ def _upload_binary_file(self, file_path):
'provider': self._provider,
'data_label': self._data_label,
}
if self._event_filter:
data['event_filter'] = self._event_filter

if self._index:
data['index_name'] = self._index

Expand Down Expand Up @@ -579,7 +583,7 @@ def add_excel_file(self, filepath, **kwargs):
self.add_data_frame(data_frame)

def add_file(self, filepath, delimiter=','):
"""Add a CSV, JSONL or a PLASO file to the buffer.
"""Add a CSV, JSONL or a Plaso storage file to the buffer.
Args:
filepath: the path to the file to add.
Expand Down Expand Up @@ -614,6 +618,7 @@ def add_file(self, filepath, delimiter=','):
fh, delimiter=delimiter,
chunksize=self._threshold_entry):
self.add_data_frame(chunk_frame, part_of_iter=True)

elif file_ending == 'plaso':
self._upload_binary_file(filepath)

Expand Down Expand Up @@ -733,6 +738,10 @@ def set_entry_threshold(self, threshold):
"""Set the threshold for number of entries per chunk."""
self._threshold_entry = threshold

def set_event_filter(self, event_filter):
    """Set the event filter to pass to psort.

    The filter is stored on the streamer and later sent along with a
    Plaso storage file upload (see _upload_binary_file, which adds it
    to the upload form data). It must therefore be set before add_file()
    is called for a Plaso file.

    Args:
        event_filter: event filter forwarded to psort — presumably a
            Plaso filter expression string; confirm against psort's
            supported filter syntax.
    """
    self._event_filter = event_filter

def set_filesize_threshold(self, threshold):
    """Set the threshold for file size per chunk.

    Args:
        threshold: maximum size of each uploaded chunk — presumably in
            bytes; confirm against the chunking code that reads
            self._threshold_filesize.
    """
    self._threshold_filesize = threshold
Expand Down Expand Up @@ -807,13 +816,12 @@ def timeline(self):
logger.warning('No timeline ID has been stored as of yet.')
return None

timeline_obj = timeline.Timeline(
return timeline.Timeline(
timeline_id=self._timeline_id,
sketch_id=self._sketch.id,
api=self._sketch.api,
name=self._timeline_name,
searchindex=self._index)
return timeline_obj

def __enter__(self):
"""Make it possible to use "with" statement."""
Expand Down
11 changes: 7 additions & 4 deletions timesketch/api/v1/resources/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,16 +227,19 @@ def _upload_and_index(
db_session.add(timeline)
db_session.commit()

event_filter = form.get('event_filter', None)

sketch_id = sketch.id
# Start Celery pipeline for indexing and analysis.
# Import here to avoid circular imports.
# pylint: disable=import-outside-toplevel
from timesketch.lib import tasks
pipeline = tasks.build_index_pipeline(
file_path=file_path, events=events, timeline_name=timeline_name,
index_name=searchindex.index_name, file_extension=file_extension,
sketch_id=sketch_id, only_index=enable_stream,
timeline_id=timeline.id)
event_filter=event_filter, events=events,
file_extension=file_extension, file_path=file_path,
index_name=searchindex.index_name, only_index=enable_stream,
sketch_id=sketch_id, timeline_id=timeline.id,
timeline_name=timeline_name)
task_id = uuid.uuid4().hex
pipeline.apply_async(task_id=task_id)

Expand Down
82 changes: 38 additions & 44 deletions timesketch/lib/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,61 +201,53 @@ def _set_timeline_status(timeline_id, status, error_msg=None):
db_session.commit()


def _get_index_task_class(file_extension):
"""Get correct index task function for the supplied file type.
Args:
file_extension (str): File type based on filename extension.
Returns:
A task function.
Raises:
KeyError if no task class can be found.
"""
if file_extension == 'plaso':
index_class = run_plaso
elif file_extension in ['csv', 'jsonl']:
index_class = run_csv_jsonl
else:
raise KeyError('No task that supports {0:s}'.format(file_extension))
return index_class


def build_index_pipeline(
file_path='', events='', timeline_name='', index_name='',
file_extension='', sketch_id=None, only_index=False, timeline_id=None):
event_filter=None, events='', file_extension='', file_path='',
index_name='', only_index=False, sketch_id=None, timeline_id=None,
timeline_name=''):
"""Build a pipeline for index and analysis.
Args:
file_path: The full path to a file to upload, either a file_path or
or events need to be defined.
event_filter: Event filter to pass to psort.
events: String with the event data, either file_path or events
needs to be defined.
timeline_name: Name of the timeline to create.
index_name: Name of the index to index to.
file_extension: The file extension of the file.
sketch_id: The ID of the sketch to analyze.
file_path: The full path to a file to upload, either a file_path or
or events need to be defined.
index_name: Name of the index to index to.
only_index: If set to true then only indexing tasks are run, not
analyzers. This is to be used when uploading data in chunks,
we don't want to run the analyzers until all chunks have been
uploaded.
sketch_id: The ID of the sketch to analyze.
timeline_id: Optional ID of the timeline object this data belongs to.
timeline_name: Name of the timeline to create.
Returns:
Celery chain with indexing task (or single indexing task) and analyzer
task group.
Raises:
RuntimeError: if no file path or events were specified.
"""
if not (file_path or events):
raise RuntimeError(
'Unable to upload data, missing either a file or events.')
index_task_class = _get_index_task_class(file_extension)

if file_extension not in ('csv', 'jsonl', 'plaso'):
raise KeyError('No task that supports {0:s}'.format(file_extension))

sketch_analyzer_chain = None
searchindex = SearchIndex.query.filter_by(index_name=index_name).first()

index_task = index_task_class.s(
file_path, events, timeline_name, index_name, file_extension,
timeline_id)
if file_extension in ('csv', 'jsonl'):
index_task = run_csv_jsonl.s(
file_path, events, timeline_name, index_name, file_extension,
timeline_id)
else:
index_task = run_plaso.s(
file_path, event_filter, timeline_name, index_name, file_extension,
timeline_id)

# TODO: Check if a scenario is set or an investigative question
# is in the sketch, and then enable data finder on the newly
Expand Down Expand Up @@ -496,38 +488,36 @@ def run_sketch_analyzer(

@celery.task(track_started=True, base=SqlAlchemyTask)
def run_plaso(
file_path, events, timeline_name, index_name, source_type, timeline_id):
file_path, event_filter, timeline_name, index_name, source_type,
timeline_id):
"""Create a Celery task for processing Plaso storage file.
Args:
file_path: Path to the plaso file on disk.
events: String with event data, invalid for plaso files.
event_filter: Event filter to pass to psort.
timeline_name: Name of the Timesketch timeline.
index_name: Name of the datastore index.
source_type: Type of file, csv or jsonl.
timeline_id: ID of the timeline object this data belongs to.
Raises:
RuntimeError: If the function is called using events, plaso
is not installed or is of unsupported version.
Returns:
Name (str) of the index.
Raises:
RuntimeError: If Plaso is not installed or is of unsupported version.
"""
if not plaso:
raise RuntimeError(
'Plaso isn\'t installed, unable to continue processing plaso '
'files.')

plaso_version = int(plaso.__version__)
plaso_version = int(plaso.__version__, 10)
if plaso_version <= PLASO_MINIMUM_VERSION:
raise RuntimeError(
'Plaso version is out of date (version {0:d}, please upgrade to a '
'version that is later than {1:d}'.format(
plaso_version, PLASO_MINIMUM_VERSION))

if events:
raise RuntimeError('Plaso uploads needs a file, not events.')

event_type = 'generic_event' # Document type for Elasticsearch

mappings = None
Expand Down Expand Up @@ -593,10 +583,9 @@ def run_plaso(
psort_path = 'psort.py'

cmd = [
psort_path, '-o', 'elastic_ts', file_path, '--server', elastic_server,
psort_path, '-o', 'elastic_ts', '--server', elastic_server,
'--port', str(elastic_port), '--status_view', 'none',
'--index_name', index_name,
]
'--index_name', index_name]

if mappings_file_path:
cmd.extend(['--elastic_mappings', mappings_file_path])
Expand All @@ -620,6 +609,11 @@ def run_plaso(
if psort_memory:
cmd.extend(['--process_memory_limit', str(psort_memory)])

cmd.append(file_path)

if event_filter:
cmd.append(event_filter)

# Run psort.py
try:
subprocess.check_output(
Expand Down

0 comments on commit fe82602

Please sign in to comment.