forked from rabobank-cdc/DeTTECT
-
Notifications
You must be signed in to change notification settings - Fork 2
/
eql_yaml.py
443 lines (365 loc) · 17.8 KB
/
eql_yaml.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
from generic import *
from health import *
import datetime
import sys
from pprint import pprint
import eql
from copy import deepcopy
def _traverse_dict(obj, callback=None):
    """
    Recursively walk a nested structure of dictionaries and lists, optionally transforming every node.
    Children are rebuilt first (dictionaries as a plain dict, lists as a plain list). When a callback is given it is
    applied to every leaf value and also to every rebuilt container.
    :param obj: dictionary, list or value
    :param callback: optional function that receives a (rebuilt) value and returns its replacement
    :return: the rebuilt value, or the callback's return value for it when a callback is provided
    """
    if isinstance(obj, dict):
        rebuilt = {}
        for key, child in obj.items():
            rebuilt[key] = _traverse_dict(child, callback)
    elif isinstance(obj, list):
        rebuilt = []
        for child in obj:
            rebuilt.append(_traverse_dict(child, callback))
    else:
        rebuilt = obj

    # without a callback the rebuilt structure is handed back as is
    return rebuilt if callback is None else callback(rebuilt)
def _traverse_modify_date(obj):
    """
    Return a copy of the structure in which every datetime.date value is replaced by its string representation
    :param obj: dictionary (lists and plain values are handled as well by _traverse_dict)
    :return: the result of _traverse_dict using a date-to-string callback
    """
    def _date_to_str(item):
        # invoked for every value in the structure, the rebuilt containers included.
        # datetime.datetime is a subclass of datetime.date and is therefore converted too.
        return str(item) if isinstance(item, datetime.date) else item

    return _traverse_dict(obj, callback=_date_to_str)
def _techniques_to_events(techniques, obj_type, include_all_score_objs):
    """
    Flatten visibility or detection objects into EQL 'events': one event per score object.
    Every event is a deep copy of the technique without its 'visibility' and 'detection' lists, to which a single
    visibility/detection object is added under 'obj_type'. In that object 'score_logbook' holds exactly one score
    object instead of a list. Note that the technique objects passed in are normalised in place (single objects
    become a list of one).
    :param techniques: dictionary with the key 'techniques' holding the list of technique YAML objects
    :param obj_type: 'visibility' or 'detection'
    :param include_all_score_objs: include all score objects within the score_logbook for the EQL query; when False
    only the object returned by get_latest_score_obj is used
    :return: EQL 'events' (list of dictionaries)
    """
    events = []

    for technique in techniques['techniques']:
        # a single visibility/detection object is normalised to a list holding that one object
        if not isinstance(technique[obj_type], list):
            technique[obj_type] = [technique[obj_type]]

        for dv_obj in technique[obj_type]:
            # set_yaml_dv_comments is defined outside this file - presumably it makes sure the comment
            # key-value pairs are present; verify against the 'generic' module
            dv_obj = set_yaml_dv_comments(dv_obj)

            logbook = dv_obj['score_logbook']
            if not isinstance(logbook, list):
                dv_obj['score_logbook'] = [logbook]
            if not include_all_score_objs:
                dv_obj['score_logbook'] = [get_latest_score_obj(dv_obj)]

            # every score object results in its own EQL event
            for score_obj in dv_obj['score_logbook']:
                flattened_obj = deepcopy(dv_obj)
                flattened_obj['score_logbook'] = score_obj

                event = deepcopy(technique)
                del event['visibility']
                del event['detection']
                event[obj_type] = flattened_obj

                events.append(event)

    return events
def _object_in_technique(obj_event, technique_yaml, obj_type):
    """
    Look up the detection/visibility object of an EQL event within the technique object that is being rebuilt.
    An object within the technique matches when every key-value pair of the event, except 'score_logbook', is present
    in it with an equal value. The index is needed for techniques which have multiple visibility or detection objects
    (due to 'applicable_to'), to know to which object the 'score_logbook' entry should be added.
    :param obj_event: visibility or detection EQL event
    :param technique_yaml: the technique object that's being reconstructed from the EQL events
    :param obj_type: 'visibility' or 'detection'
    :return: -1 if it does not exist, otherwise the index of the first matching object within the list
    """
    for position, candidate in enumerate(technique_yaml[obj_type]):
        # the score_logbook is left out of the comparison: it cannot match as it is still being re-created
        same_object = all(
            key == 'score_logbook' or (key in candidate and candidate[key] == val)
            for key, val in obj_event.items()
        )
        if same_object:
            return position

    return -1
def _value_in_dict_list(dict_list, dict_key, dict_value):
    """
    Tell whether any dictionary within the list has the provided value for the provided key
    :param dict_list: list of dictionaries; every dictionary must contain 'dict_key' and its value must be hashable
    :param dict_key: key name
    :param dict_value: key value to match on
    :return: True or False
    """
    # all values are collected first, so a dictionary lacking the key raises a KeyError even if a match exists
    seen_values = {entry[dict_key] for entry in dict_list}
    return dict_value in seen_values
def _get_technique_from_list(techniques, tech_id):
    """
    Find the first technique object within a list of technique objects that has the provided technique ID
    :param techniques: list of techniques (dictionaries having the key 'technique_id')
    :param tech_id: technique_id
    :return: technique object or None if no match is found
    """
    matches = (candidate for candidate in techniques if candidate['technique_id'] == tech_id)
    return next(matches, None)
def _events_to_yaml(query_results, obj_type):
    """
    Transform the EQL 'events' back to valid YAML objects.
    Date strings are turned back into datetime objects. A string that does not match REGEX_YAML_VALID_DATE is left
    untouched for all object types (earlier a non-matching date string within a data source raised an uncaught
    AttributeError, whereas for visibility/detection objects it was already left as is).
    :param query_results: list with EQL 'events'. The event dictionaries are modified in place.
    :param obj_type: data_sources, detection or visibility EQL 'events'
    :return: list containing YAML objects or None when the events could not be turned into a valid YAML object.
    None is also returned for any other value of 'obj_type'.
    """
    if obj_type == 'data_sources':
        try:
            for ds_event in query_results:
                for date_key in ('date_registered', 'date_connected'):
                    # empty values (e.g. None) are kept as they are; a missing key raises the KeyError handled below
                    if ds_event[date_key]:
                        ds_event[date_key] = _eql_date_to_datetime(ds_event[date_key])
        except KeyError:
            print(EQL_INVALID_RESULT_DS)
            pprint(query_results)
            # when using an EQL query that does not result in a dict having valid YAML 'data_source' objects.
            return None

        # Set 'src_eql' to true. EQL results will not contain the platform, but just data source YAML objects.
        # In addition, the search may have excluded certain data sources
        if check_health_data_sources(None, {'data_sources': query_results}, health_is_called=False, no_print=True,
                                     src_eql=True):
            print(EQL_INVALID_RESULT_DS)
            pprint(query_results)
            return None

        return query_results

    if obj_type in ['visibility', 'detection']:
        try:
            techniques_yaml = []

            # loop over all events and reconstruct the YAML file
            for tech_event in query_results:
                tech_id = tech_event['technique_id']
                tech_name = tech_event['technique_name']
                obj_event = tech_event[obj_type]
                score_logbook_event = obj_event['score_logbook']

                # get the technique dict for this technique ID, create it if not already created
                tech_yaml = _get_technique_from_list(techniques_yaml, tech_id)
                if tech_yaml is None:
                    tech_yaml = {
                        'technique_id': tech_id, 'technique_name': tech_name, 'detection': [], 'visibility': []
                    }
                    techniques_yaml.append(tech_yaml)

                # figure out if the detection/visibility dict already exists
                obj_idx = _object_in_technique(obj_event, tech_yaml, obj_type)

                # create the score object
                score_obj_yaml = {k: _eql_date_to_datetime(v) for k, v in score_logbook_event.items()}

                if obj_idx == -1:
                    # The detection/visibility dict is missing. Create it.
                    obj_event['score_logbook'] = [score_obj_yaml]
                    tech_yaml[obj_type].append(obj_event)
                else:
                    # add the score object to the score_logbook within the proper detection/visibility object
                    tech_yaml[obj_type][obj_idx]['score_logbook'].append(score_obj_yaml)

            return techniques_yaml
        except KeyError:
            print(EQL_INVALID_RESULT_TECH + obj_type + ' object(s):')
            pprint(query_results)
            # when using an EQL query that does not result in a valid technique administration file.
            return None

    return None


def _eql_date_to_datetime(value):
    """
    Convert a date string, as present within an EQL event, back to a datetime object
    :param value: any value
    :return: a datetime.datetime when 'value' is a string matching REGEX_YAML_VALID_DATE, otherwise 'value' unchanged
    """
    if not isinstance(value, str):
        return value

    date_match = REGEX_YAML_VALID_DATE.match(value)
    if date_match is None:
        return value

    # the first capture group of the regex is parsed as a '%Y-%m-%d' date
    return datetime.datetime.strptime(date_match.group(1), '%Y-%m-%d')
def _merge_yaml(yaml_content_org, yaml_content_visibility=None, yaml_content_detection=None):
    """
    Merge (possibly filtered) detection and visibility objects into a valid technique administration YAML 'file'.
    The technique objects passed in are extended in place and 'yaml_content_org' itself is modified and returned.
    At least one of the two lists must be non-empty: otherwise no technique list is built and an UnboundLocalError
    is raised.
    :param yaml_content_org: original technique administration 'file'
    :param yaml_content_visibility: list of technique YAML objects holding visibility objects
    :param yaml_content_detection: list of technique YAML objects holding detection objects
    :return: technique administration YAML 'file' (i.e. dict)
    """
    if yaml_content_visibility and yaml_content_detection:
        # an EQL query was provided for both the visibility and the detection objects
        techniques_yaml = []

        # every technique with visibility gets its filtered detection, or the default detection object
        for vis_tech in yaml_content_visibility:
            det_match = _get_technique_from_list(yaml_content_detection, vis_tech['technique_id'])
            vis_tech['detection'] = det_match['detection'] if det_match else deepcopy(YAML_OBJ_DETECTION)
            techniques_yaml.append(vis_tech)

        # techniques only present in the detection results get the default visibility object
        for det_tech in yaml_content_detection:
            if _value_in_dict_list(techniques_yaml, 'technique_id', det_tech['technique_id']):
                continue
            det_tech['visibility'] = deepcopy(YAML_OBJ_VISIBILITY)
            techniques_yaml.append(det_tech)

    elif yaml_content_visibility:
        # only a visibility EQL query was provided: take the detection from the original file
        techniques_yaml = yaml_content_visibility
        org_techniques = yaml_content_org['techniques']
        for tech in techniques_yaml:
            tech['detection'] = _get_technique_from_list(org_techniques, tech['technique_id'])['detection']

    elif yaml_content_detection:
        # only a detection EQL query was provided: take the visibility from the original file
        techniques_yaml = yaml_content_detection
        org_techniques = yaml_content_org['techniques']
        for tech in techniques_yaml:
            tech['visibility'] = _get_technique_from_list(org_techniques, tech['technique_id'])['visibility']

    # create the final technique administration YAML 'file'/dict
    yaml_content_org['techniques'] = techniques_yaml
    return yaml_content_org
def _prepare_yaml_file(filename, obj_type, include_all_score_objs):
    """
    Load the YAML file and turn its content into events that can be used by the EQL engine
    :param filename: file location of the YAML file
    :param obj_type: 'data_sources' for a data source administration file, 'visibility' or 'detection' for a
    technique administration file. Any other value results in an empty list of events.
    :param include_all_score_objs: include all score objects within the score_logbook (only used for 'visibility'
    and 'detection')
    :return: a tuple: the list of EQL events (built from a copy of the content in which dates are strings) and the
    YAML content as loaded from the file
    """
    yaml_loader = init_yaml()
    with open(filename, 'r') as yaml_file:
        yaml_content = yaml_loader.load(yaml_file)

    # dates become strings within a rebuilt copy; 'yaml_content' itself is returned as loaded
    eql_ready = _traverse_modify_date(yaml_content)

    if obj_type == 'data_sources':
        # every data source object becomes an event of the type 'data_sources'
        events = [eql.Event(obj_type, 0, data_source) for data_source in eql_ready[obj_type]]
    elif obj_type in ['visibility', 'detection']:
        # flatten the technique administration file to events of the type 'techniques'
        flattened = _techniques_to_events(eql_ready, obj_type, include_all_score_objs)
        events = [eql.Event('techniques', 0, tech_event) for tech_event in flattened]
    else:
        events = []

    return events, yaml_content
def _check_query_results(query_results, obj_type):
    """
    Check if the EQL query provided usable results and print the outcome
    :param query_results: EQL events, or None when the query was not compatible with the schema
    :param obj_type: 'data_sources', 'visibility' or 'detection'
    :return: True when there is at least one result, otherwise False
    """
    # the EQL query was not compatible with the schema (nothing is printed here)
    if query_results is None:
        return False

    count = len(query_results)

    # show an error to the user when the query resulted in zero results
    if count == 0:
        print('[!] The search returned 0 ' + obj_type + ' objects. Refine your search to return 1 or more '
              + obj_type + ' objects.')
        return False

    suffix = ' result.' if count == 1 else ' results.'
    print('The ' + obj_type + ' query executed successfully and provided ' + str(count) + suffix)
    return True
def _execute_eql_query(events, query):
    """
    Run an EQL query against the provided events
    :param events: EQL events
    :param query: EQL query
    :return: list with the data of the events in the query results, or None when the query raised an eql.EqlError
    (e.g. when it does not match the schema)
    """
    # learn the schema from the events themselves
    schema = eql.Schema.learn(events)
    matched = []

    def _collect(results):
        # output hook: keep the data of every event within the results
        matched.extend(event.data for event in results.events)

    engine = eql.PythonEngine()
    with schema:
        # the query is parsed while the learned schema is loaded
        try:
            engine.add_query(eql.parse_query(query, implied_any=True, implied_base=True))
        except eql.EqlError as err:
            print(err, file=sys.stderr)
            print('\nTake into account the following schema:')
            pprint(schema.schema)
            # when using an EQL query that does not match the schema, return None.
            return None
        engine.add_output_hook(_collect)

    # execute the query
    engine.stream_events(events)

    return matched
def techniques_search(filename, query_visibility=None, query_detection=None, include_all_score_objs=False):
    """
    Perform an EQL search on the technique administration file.
    :param filename: file location of the YAML file on disk
    :param query_visibility: EQL query for the visibility YAML objects
    :param query_detection: EQL query for the detection YAML objects
    :param include_all_score_objs: include all score objects within the score_logbook for the EQL query
    :return: a filtered technique administration YAML 'file' (i.e. dict), None when a query was not successful, or
    'filename' itself when no query was provided at all
    """
    yaml_per_type = {'visibility': None, 'detection': None}

    # visibility is handled first, then detection; the YAML file is (re)loaded for every query that is provided
    for obj_type, query in (('visibility', query_visibility), ('detection', query_detection)):
        if not query:
            continue
        events, yaml_content_org = _prepare_yaml_file(filename, obj_type,
                                                      include_all_score_objs=include_all_score_objs)
        results = _execute_eql_query(events, query)
        if not _check_query_results(results, obj_type):
            return None  # the EQL query was not compatible with the schema, or it had zero results
        yaml_per_type[obj_type] = _events_to_yaml(results, obj_type)

    results_visibility_yaml = yaml_per_type['visibility']
    results_detection_yaml = yaml_per_type['detection']

    # a query was provided, but its results could not be turned into valid technique administration YAML content
    if (query_visibility and not results_visibility_yaml) or (query_detection and not results_detection_yaml):
        return None

    if query_visibility and query_detection:
        return _merge_yaml(yaml_content_org, results_visibility_yaml, results_detection_yaml)
    if results_visibility_yaml:
        return _merge_yaml(yaml_content_org, yaml_content_visibility=results_visibility_yaml)
    if results_detection_yaml:
        return _merge_yaml(yaml_content_org, yaml_content_detection=results_detection_yaml)

    # no query was provided: hand back the file location untouched
    return filename
def data_source_search(filename, query=''):
    """
    Perform an EQL search on a data source administration file
    :param filename: file location of the YAML file on disk
    :param query: EQL query
    :return: the loaded YAML 'file' (i.e. dict) with 'data_sources' replaced by the query results, or None when the
    query was not successful
    """
    events, yaml_content = _prepare_yaml_file(filename, 'data_sources', include_all_score_objs=False)
    results = _execute_eql_query(events, query)

    if not _check_query_results(results, 'data_sources'):
        return None  # the EQL query was not compatible with the schema, or it had zero results

    data_sources_yaml = _events_to_yaml(results, 'data_sources')
    if not data_sources_yaml:
        # when using an EQL query that does not result in a dict having valid YAML objects, return None
        return None

    yaml_content['data_sources'] = data_sources_yaml
    return yaml_content