Skip to content

Commit

Permalink
Added options to split queries by time period to data_providers.py (#110
Browse files Browse the repository at this point in the history
)

Split queries by time period
  • Loading branch information
ianhelle authored Oct 15, 2020
1 parent 260a23c commit 524aaed
Show file tree
Hide file tree
Showing 4 changed files with 214 additions and 31 deletions.
72 changes: 57 additions & 15 deletions docs/source/data_acquisition/DataProviders.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1087,7 +1087,7 @@ can use that when running a query to automatically supply the ``start`` and
<p>5 rows × 30 columns</p>
</div>


.. _ad-hoc-queries:

Running an ad-hoc query
-----------------------
Expand Down Expand Up @@ -1129,20 +1129,6 @@ execute.
query_test.head()
.. parsed-literal::
<IPython.core.display.Javascript object>
.. parsed-literal::
<IPython.core.display.Javascript object>
.. raw:: html

<div>
Expand Down Expand Up @@ -1313,6 +1299,62 @@ execute.
</div>


Splitting Query Execution into Chunks
-------------------------------------

Some queries return too much data or take too long to execute in a
single request. The MSTICPy data providers have an option to
split a query into time ranges. Each sub-range is run as an independent
query and the results are combined before being returned as a
DataFrame.

To use this feature you must specify the keyword parameter ``split_query_by``
when executing the query function. The value of this parameter is a
string that specifies a time period. The time range specified by the
``start`` and ``end`` parameters to the query is split into sub-ranges,
each of which is the length of the split time period. For example, if you
specify ``split_query_by="1H"`` the query will be split into one hour
chunks.

.. note:: The final chunk may cover a time period larger or smaller
than the split period that you specified in the *split_query_by*
parameter. This can happen if *start* and *end* are not aligned
exactly on time boundaries (e.g. if you used a one hour split period
and *end* is 10 hours 15 min after *start*). The query split logic
will create a larger final slice if *end* is close to the final time
range or it will insert an extra time range to ensure that the full
*start* to *end* time range is covered.

The subranges are used to generate a query for each time range. The
queries are then executed in sequence and the results concatenated into
a single DataFrame before being returned.

The values acceptable for the *split_query_by* parameter have the format:

::

{N}{TimeUnit}

where N is the number of units and TimeUnit is a mnemonic of the unit, e.g.
H = hour, D = day, etc. For the full list of these see the documentation
for Timedelta in the
`pandas documentation <https://pandas.pydata.org/pandas-docs>`__.

.. warning:: There are some important caveats to this feature.

1. It currently only works with pre-defined queries (including ones
that you may create and add yourself, see :ref:`creating-new-queries`
below). It does not work with :ref:`ad hoc queries <ad-hoc-queries>`.
2. If the query contains joins, the joins will only happen within
the time ranges of each subquery.
3. It only supports queries that have *start* and *end* parameters.
4. Very large queries may return results that can exhaust the memory
on the Python client machine.
5. Duplicate records are possible at the time boundaries. The code
tries to avoid returning duplicate records occurring
exactly on the time boundaries but some data sources may not use
granular enough time stamps to avoid this.

.. _creating-new-queries:

Creating new queries
Expand Down
2 changes: 1 addition & 1 deletion msticpy/_version.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
"""Version file."""
VERSION = "0.8.3"
VERSION = "0.8.5"
128 changes: 113 additions & 15 deletions msticpy/data/data_providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,34 @@
# license information.
# --------------------------------------------------------------------------
"""Data provider loader."""
import warnings
from datetime import datetime
from functools import partial
from itertools import tee
from pathlib import Path
from typing import Union, Any, List, Dict, Optional, Iterable
import warnings
from typing import Any, Dict, Iterable, List, Optional, Union

import pandas as pd
from tqdm.auto import tqdm

from .._version import VERSION
from ..common import pkg_config as config
from ..common.utility import export, valid_pyname
from .browsers.query_browser import browse_queries
from .drivers import (
DriverBase,
KqlDriver,
SecurityGraphDriver,
MDATPDriver,
LocalDataDriver,
SplunkDriver,
MDATPDriver,
MordorDriver,
SecurityGraphDriver,
SplunkDriver,
)
from .browsers.query_browser import browse_queries
from .query_store import QueryStore
from .query_container import QueryContainer
from .param_extractor import extract_query_params
from .query_container import QueryContainer
from .query_defns import DataEnvironment

from ..common.utility import export, valid_pyname
from ..common import pkg_config as config
from .._version import VERSION
from .query_source import QuerySource
from .query_store import QueryStore

__version__ = VERSION
__author__ = "Ian Hellen"
Expand Down Expand Up @@ -309,19 +312,40 @@ def _execute_query(self, *args, **kwargs) -> Union[pd.DataFrame, Any]:
query_source.help()
raise ValueError(f"No values found for these parameters: {missing}")

param_formatters = self._query_provider.formatters
query_str = query_source.create_query(formatters=param_formatters, **params)
split_by = kwargs.pop("split_query_by", None)
if split_by:
split_result = self._exec_split_query(
split_by=split_by,
query_source=query_source,
query_params=params,
args=args,
**kwargs,
)
if split_result is not None:
return split_result
# if split queries could not be created, fall back to default
query_str = query_source.create_query(
formatters=self._query_provider.formatters, **params
)
if "print" in args or "query" in args:
return query_str

# Handle any query options passed
query_options = self._get_query_options(params, kwargs)
return self._query_provider.query(query_str, query_source, **query_options)

@staticmethod
def _get_query_options(
    params: Dict[str, Any], kwargs: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Return the options to pass through to the query provider driver.

    Parameters
    ----------
    params : Dict[str, Any]
        The query parameters; any key present here is excluded
        from the returned options.
    kwargs : Dict[str, Any]
        The keyword arguments supplied to the query call.
        Note: a "query_options" entry, if present, is removed
        from this dictionary.

    Returns
    -------
    Dict[str, Any]
        The value of the "query_options" item in `kwargs` if it is
        supplied and non-empty, otherwise all remaining `kwargs`
        items whose keys are not in `params`.

    """
    query_options = kwargs.pop("query_options", {})
    if query_options:
        # Explicitly supplied options take precedence.
        return query_options
    # Any kwargs left over we send to the query provider driver
    return {key: val for key, val in kwargs.items() if key not in params}

def _read_queries_from_paths(self, query_paths) -> Dict[str, QueryStore]:
"""Fetch queries from YAML files in specified paths."""
Expand Down Expand Up @@ -395,6 +419,80 @@ def _add_driver_queries(self, queries: Iterable[Dict[str, str]]):
# queries it should not be noticeable.
self._add_query_functions()

def _exec_split_query(
    self,
    split_by: str,
    query_source: "QuerySource",
    query_params: Dict[str, Any],
    args,
    **kwargs,
) -> Union[pd.DataFrame, str, None]:
    """
    Run a query as a sequence of sub-queries split by time period.

    Parameters
    ----------
    split_by : str
        Time period (pandas Timedelta string, e.g. "1H") used to
        split the `start` - `end` range into sub-ranges.
    query_source : QuerySource
        The query definition used to create each sub-query.
    query_params : Dict[str, Any]
        The parameters for the query. Must contain both "start" and
        "end" values. This dictionary is not modified.
    args :
        Positional arguments passed to the query function. If "print"
        or "query" is included the query text is returned instead of
        executing the queries.

    Other Parameters
    ----------------
    kwargs :
        Additional keyword arguments. Items not in `query_params`
        are passed to the driver as query options.

    Returns
    -------
    Union[pd.DataFrame, str, None]
        The concatenated results of the sub-queries, the text of the
        sub-queries (separated by blank lines) if "print" or "query"
        was specified, or None if the query cannot be split.

    """
    # Read (rather than pop) start/end so that the caller's parameter
    # dictionary is still complete if it has to fall back to running
    # the query without splitting.
    start = query_params.get("start")
    end = query_params.get("end")
    if not (start and end):
        print(
            "Cannot split a query that does not have 'start' and 'end' parameters"
        )
        return None
    try:
        split_delta = pd.Timedelta(split_by)
    except ValueError:
        # Best effort: fall back to one day chunks but tell the user.
        print(f"Could not parse split period '{split_by}'. Using '1D' instead.")
        split_delta = pd.Timedelta("1D")

    ranges = self._calc_split_ranges(start, end, split_delta)

    # start and end are supplied per sub-range, so remove them from
    # the parameters that are common to all of the sub-queries.
    common_params = {
        name: value
        for name, value in query_params.items()
        if name not in ("start", "end")
    }
    split_queries = [
        query_source.create_query(
            formatters=self._query_provider.formatters,
            start=q_start,
            end=q_end,
            **common_params,
        )
        for q_start, q_end in ranges
    ]
    if "print" in args or "query" in args:
        return "\n\n".join(split_queries)

    # Retrieve any query options passed (other than query params)
    # and send to query function. The full set of query params is used
    # here so that 'start' and 'end' are not treated as driver options.
    query_options = self._get_query_options(query_params, kwargs)
    query_dfs = [
        self._query_provider.query(query_str, query_source, **query_options)
        for query_str in tqdm(split_queries, unit="sub-queries", desc="Running")
    ]

    return pd.concat(query_dfs)

@staticmethod
def _calc_split_ranges(start: datetime, end: datetime, split_delta: pd.Timedelta):
    """
    Return a list of time ranges split by `split_delta`.

    Parameters
    ----------
    start : datetime
        Start of the time range to split.
    end : datetime
        End of the time range to split.
    split_delta : pd.Timedelta
        The length of each sub-range.

    Returns
    -------
    List[Tuple[datetime, datetime]]
        List of (start, end) tuples covering `start` to `end`.
        If the range is shorter than `split_delta` a single
        (start, end) tuple is returned.

    """
    # Use pandas date_range and split the result into 2 iterables
    s_ranges, e_ranges = tee(pd.date_range(start, end, freq=split_delta))
    next(e_ranges, None)  # skip to the next item in the 2nd iterable
    # Zip them together to get a list of (start, end) tuples of ranges
    # Note: we subtract 1 nanosecond from the 'end' value of each range
    # to avoid getting duplicated records at the boundaries of the ranges.
    # Some providers don't have nanosecond granularity so we might
    # get duplicates in these cases
    one_ns = pd.Timedelta("1ns")
    ranges = [
        (s_time, e_time - one_ns) for s_time, e_time in zip(s_ranges, e_ranges)
    ]

    if not ranges:
        # The requested range is shorter than a single split period
        # (date_range produced fewer than two boundaries) so there
        # is nothing to split - return the whole range as one chunk.
        return [(start, end)]

    # Since the generated time ranges are based on deltas from 'start'
    # we need to adjust the end time on the final range.
    # If the difference between the calculated last range end and
    # the query 'end' that the user requested is small (< 10% of a delta),
    # we just replace the last "end" time with our query end time.
    # NOTE(review): the calculated last range end is never later than
    # 'end', so this difference is never positive and the first branch
    # is always taken (the final range is always stretched to 'end').
    # Reversing the subtraction would enable the 'else' branch but changes
    # the number of ranges returned - confirm intended behavior first.
    last_start, last_end = ranges[-1]
    if (last_end - end) < (split_delta / 10):
        ranges[-1] = last_start, end
    else:
        # otherwise append a new range starting after the last range
        # in ranges and ending in 'end'
        # note - we need to add back our subtracted 1 nanosecond
        ranges.append((last_end + one_ns, end))
    return ranges

@classmethod
def _resolve_package_path(cls, config_path: str) -> Optional[str]:
"""Resolve path relative to current package."""
Expand Down
43 changes: 43 additions & 0 deletions tests/data/test_dataqueries.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# license information.
# --------------------------------------------------------------------------
"""Data query test class."""
from datetime import datetime
import unittest
from functools import partial
from pathlib import Path
Expand Down Expand Up @@ -337,3 +338,45 @@ def test_connect_queries_dotted(self):
q_store = data_provider._query_store
q_src = q_store.get_query("Saved.Searches.test.query3")
self.assertEqual(q_src.query, dotted_container_qs[2]["query"])

def test_split_ranges(self):
    """Check that a time span is divided into the expected sub-ranges."""
    start = datetime.utcnow() - pd.Timedelta("5H")
    end = datetime.utcnow() + pd.Timedelta("5min")
    delta = pd.Timedelta("1H")

    # Span is 5 hours + 5 minutes: expect five ranges with the
    # last one extended to the requested end time.
    split_ranges = QueryProvider._calc_split_ranges(start, end, delta)
    self.assertEqual(len(split_ranges), 5)
    first_range = split_ranges[0]
    last_range = split_ranges[-1]
    self.assertEqual(first_range[0], start)
    self.assertEqual(last_range[1], end)

    # No range should end at exactly the time that any range starts.
    range_starts = [rng_start for rng_start, _ in split_ranges]
    range_ends = (rng_end for _, rng_end in split_ranges)
    for rng_end in range_ends:
        self.assertNotIn(rng_end, range_starts)

    # Lengthen the span by a further 20 minutes and re-check.
    end = end + pd.Timedelta("20min")
    split_ranges = QueryProvider._calc_split_ranges(start, end, delta)
    self.assertEqual(len(split_ranges), 5)
    first_range = split_ranges[0]
    last_range = split_ranges[-1]
    self.assertEqual(first_range[0], start)
    self.assertEqual(last_range[1], end)

def test_split_queries(self):
    """Check that a query is split into one sub-query per time range."""

    def _iso_utc(time_val):
        # Format a time the way it is expected to appear in the query text.
        return time_val.isoformat(sep="T") + "Z"

    provider = self.la_provider

    start = datetime.utcnow() - pd.Timedelta("5H")
    end = datetime.utcnow() + pd.Timedelta("5min")
    delta = pd.Timedelta("1H")

    expected_ranges = QueryProvider._calc_split_ranges(start, end, delta)
    # "print" returns the text of the queries rather than running them.
    query_text = provider.all_queries.list_alerts(
        "print", start=start, end=end, split_query_by="1H"
    )
    sub_queries = query_text.split("\n\n")
    self.assertEqual(len(sub_queries), 5)

    # Each sub-query should contain the bounds of its own time range.
    for idx, time_range in enumerate(expected_ranges):
        range_start, range_end = time_range
        sub_query = sub_queries[idx]
        self.assertIn(_iso_utc(range_start), sub_query)
        self.assertIn(_iso_utc(range_end), sub_query)
    # The overall start and end bound the first and last sub-queries.
    self.assertIn(_iso_utc(start), sub_queries[0])
    self.assertIn(_iso_utc(end), sub_queries[-1])

0 comments on commit 524aaed

Please sign in to comment.