Skip to content

Commit

Permalink
Adding an AnalyzerOutput class (#2706)
Browse files Browse the repository at this point in the history
* Adding a class for a more structured way of analyzer outputs.
* Validates output via jsonschema
* returns a json string for the database

---------

Co-authored-by: janosch <[email protected]>
Co-authored-by: Johan Berggren <[email protected]>
Co-authored-by: Janosch <[email protected]>
  • Loading branch information
4 people authored May 16, 2023
1 parent 63afb2d commit 5c5b83d
Showing 1 changed file with 219 additions and 0 deletions.
219 changes: 219 additions & 0 deletions timesketch/lib/analyzers/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@
import random
import time
import traceback


import yaml

import opensearchpy
from flask import current_app
from jsonschema import validate, ValidationError, SchemaError

import pandas

Expand Down Expand Up @@ -884,6 +887,18 @@ def __init__(self, index_name, sketch_id, timeline_id=None):
port=current_app.config["OPENSEARCH_PORT"],
)

# Add AnalyzerOutput instance and set all attributes that can be set
# automatically
self.output = AnalyzerOutput(
analyzer_identifier=self.NAME,
analyzer_name=self.DISPLAY_NAME,
timesketch_instance=current_app.config.get(
"EXTERNAL_HOST_URL", "https://localhost"
),
sketch_id=sketch_id,
timeline_id=timeline_id,
)

if not hasattr(self, "sketch"):
self.sketch = None

Expand Down Expand Up @@ -1150,3 +1165,207 @@ def get_kwargs(cls):
def run(self):
"""Entry point for the analyzer."""
raise NotImplementedError


class AnalyzerOutputException(Exception):
"""Analyzer output exception."""


class AnalyzerOutput:
"""A class to record timesketch analyzer output.
Attributes:
platform (str): [Required] Analyzer platfrom.
analyzer_identifier (str): [Required] Unique analyzer identifier.
analyzer_name (str): [Required] Analyzer display name.
result_status (str): [Required] Analyzer result status.
Valid values are success or error.
result_priority (str): [Required] Priority of the result based on the
analysis findings. Valid values are NOTE (default), LOW, MEDIUM, HIGH.
result_summary (str): [Required] A summary statement of the analyzer
finding. A result summary must exist even if there is no finding.
result_markdown (str): [Optional] A detailed information about the
analyzer finding in a markdown format.
references (List[str]): [Optional] A list of references about the
analyzer or the issue the analyzer attempts to address.
result_attributes (dict): [Optional] A dict of key : value pairs that
holds additional finding details.
platform_meta_data: (dict): [Required] A dict of key : value pairs that
holds the following information:
timesketch_instance (str): [Required] The Timesketch instance URL.
sketch_id (int): [Required] Timesketch sketch ID for this analyzer.
timeline_id (int): [Required] Timesketch timeline ID for this analyzer.
saved_views (List[int]): [Optional] Views generatred by the analyzer.
saved_stories (List[int]): [Optional] Stories generated by the analyzer.
saved_graphs (List[int]): [Optional] Graphs generated by the analyzer.
saved_aggregations (List[int]): [Optional] Aggregations generated
by the analyzer.
created_tags (List[str]): [Optional] Tags created by the analyzer.
"""

def __init__(
self,
analyzer_identifier,
analyzer_name,
timesketch_instance,
sketch_id,
timeline_id,
):
"""Initialize analyzer output."""
self.platform = "timesketch"
self.analyzer_identifier = analyzer_identifier
self.analyzer_name = analyzer_name
self.result_status = "" # TODO: link to analyzer status/error?
self.result_priority = "NOTE"
self.result_summary = ""
self.result_markdown = ""
self.references = []
self.result_attributes = {}
self.platform_meta_data = {
"timesketch_instance": timesketch_instance,
"sketch_id": sketch_id,
"timeline_id": timeline_id,
"saved_views": [],
"saved_stories": [],
"saved_graphs": [],
"saved_aggregations": [],
"created_tags": [],
}

def validate(self):
"""Validates the analyzer output and raises exception."""
schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"platform": {"type": "string", "enum": ["timesketch"]},
"analyzer_identifier": {"type": "string", "minLength": 1},
"analyzer_name": {"type": "string", "minLength": 1},
"result_status": {
"type": "string",
"enum": ["SUCCESS", "NO-FINDINGS", "ERROR"],
},
"result_priority": {
"type": "string",
"default": "NOTE",
"enum": ["HIGH", "MEDIUM", "LOW", "NOTE"],
},
"result_summary": {"type": "string", "minLength": 1},
"result_markdown": {"type": "string", "minLength": 1},
"references": {
"type": "array",
"items": [{"type": "string", "minLength": 1}],
},
"result_attributes": {"type": "object"},
"platform_meta_data": {
"type": "object",
"properties": {
"timesketch_instance": {"type": "string", "minLength": 1},
"sketch_id": {"type": "integer"},
"timeline_id": {"type": "integer"},
"saved_views": {
"type": "array",
"items": [
{"type": "integer"},
],
},
"saved_stories": {
"type": "array",
"items": [{"type": "integer"}],
},
"saved_aggregations": {
"type": "array",
"items": [
{"type": "integer"},
],
},
"created_tags": {
"type": "array",
"items": [
{"type": "string"},
],
},
},
"required": [
"timesketch_instance",
"sketch_id",
"timeline_id",
],
},
},
"required": [
"platform",
"analyzer_identifier",
"analyzer_name",
"result_status",
"result_priority",
"result_summary",
"platform_meta_data",
],
}

try:
validate(instance=self.to_json(), schema=schema)
return True
except (ValidationError, SchemaError) as e:
raise AnalyzerOutputException(f"json schema error: {e}") from e

def to_json(self) -> dict:
"""Returns JSON output of AnalyzerOutput. Filters out empty values."""
# add required fields
output = {
"platform": self.platform,
"analyzer_identifier": self.analyzer_identifier,
"analyzer_name": self.analyzer_name,
"result_status": self.result_status.upper(),
"result_priority": self.result_priority.upper(),
"result_summary": self.result_summary,
"platform_meta_data": {
"timesketch_instance": self.platform_meta_data["timesketch_instance"],
"sketch_id": self.platform_meta_data["sketch_id"],
"timeline_id": self.platform_meta_data["timeline_id"],
},
}

# add optional fields if they are not empty
if self.result_markdown and self.result_markdown != "":
output["result_markdown"] = self.result_markdown

if self.references:
output["references"] = self.references

if self.result_attributes:
output["result_attributes"] = self.result_attributes

if self.platform_meta_data["saved_views"]:
output["platform_meta_data"]["saved_views"] = self.platform_meta_data[
"saved_views"
]

if self.platform_meta_data["saved_stories"]:
output["platform_meta_data"]["saved_stories"] = self.platform_meta_data[
"saved_stories"
]

if self.platform_meta_data["saved_graphs"]:
output["platform_meta_data"]["saved_graphs"] = self.platform_meta_data[
"saved_graphs"
]

if self.platform_meta_data["saved_aggregations"]:
output["platform_meta_data"][
"saved_aggregations"
] = self.platform_meta_data["saved_aggregations"]

if self.platform_meta_data["created_tags"]:
output["platform_meta_data"]["created_tags"] = self.platform_meta_data[
"created_tags"
]

return output

def __str__(self) -> str:
"""Returns string output of AnalyzerOutput."""
if self.validate():
return json.dumps(self.to_json())
return ""

0 comments on commit 5c5b83d

Please sign in to comment.