Skip to content
This repository has been archived by the owner on Sep 11, 2020. It is now read-only.

Commit

Permalink
Column verification complete
Browse files Browse the repository at this point in the history
  • Loading branch information
Aaron Eppert committed Sep 1, 2015
1 parent 6cbc68e commit d973ecf
Show file tree
Hide file tree
Showing 13 changed files with 375 additions and 16 deletions.
331 changes: 315 additions & 16 deletions intel_linter.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
# 8-24-2015 GPL and pushed to GitHub Aaron Eppert
# 8-25-2015 Small cleanups and proper exit codes for using
# as a git pre-commit hook Aaron Eppert
# 9-1-2015 Added column-based type verifications Aaron Eppert
#
import sys
import re
Expand All @@ -24,7 +25,267 @@ def warning_line(line, *objs):
out += o
warning(out)

###############################################################################
# class bro_intel_indicator_type
#
# This class is for handling the "indicator_type" fields within a Bro Intel
# file. Note, each type of field has a specific handler.
#
class bro_intel_indicator_type:
def __init__(self):
self.__INDICATOR_TYPE_handler = {'Intel::ADDR': self.__handle_intel_addr,
'Intel::URL': self.__handle_intel_url,
'Intel::SOFTWARE': self.__handle_intel_software,
'Intel::EMAIL': self.__handle_intel_email,
'Intel::DOMAIN': self.__handle_intel_domain,
'Intel::USER_NAME': self.__handle_intel_user_name,
'Intel::FILE_HASH': self.__handle_intel_file_hash,
'Intel::FILE_NAME': self.__handle_intel_file_hash,
'Intel::CERT_HASH': self.__handle_intel_cert_hash}

def __handle_intel_addr(self, indicator):
ret = True
import socket
try:
socket.inet_aton(indicator)
except socket.error:
ret = False
return ret

# We will call this minimalist, but effective.
def __handle_intel_url(self, indicator):
ret = False
rx = re.compile(r'^https?://' # http:// or https://
r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|' # domain...
r'localhost|' # localhost...
r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})' # ...or ip
r'(?::\d+)?' # optional port
r'(?:/?|[/?]\S+)$', re.IGNORECASE)
t = rx.search(indicator)
if t:
ret = True
return ret

def __handle_intel_email(self, indicator):
ret = False
rx = r"(^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$)"
t_email = re.findall(rx, indicator)
if len(t_email) > 0:
ret = True
return ret

def __handle_intel_software(self, indicator):
ret = False
if len(indicator) > 0:
ret = True
return ret

def __handle_intel_domain(self, indicator):
ret = False
rx = r'(?=^.{4,253}$)(^((?!-)[a-zA-Z0-9-]{1,63}(?<!-)\.)+[a-zA-Z]{2,63}$)'
t_domain = re.findall(rx, indicator)
if len(t_domain) > 0:
if indicator in t_domain[0]:
ret = True
return ret

def __handle_intel_user_name(self, indicator):
ret = False
if len(indicator) > 0:
ret = True
return ret

# Pretty weak, but should suffice for now.
def __handle_intel_file_hash(self, indicator):
ret = False
VALID_HASH_LEN = {32: 'md5',
40: 'sha1',
64: 'sha256'}
if VALID_HASH_LEN.get(len(indicator), None):
ret = True
return ret

def __handle_intel_cert_hash(self, indicator):
warning('Intel::CERT_HASH - Needs additional validation')
return True

def verify_indicator_type(self, indicator_type):
ret = False
it = self.__INDICATOR_TYPE_handler.get(indicator_type, None)
if it is not None:
ret = True
return ret

def correlate(self, indicator, indicator_type):
ret = False
if len(indicator) > 1 and len(indicator_type) > 1:
h = self.__INDICATOR_TYPE_handler.get(indicator_type, None)
if h:
ret = h(indicator)
else:
ret = True
return ret


###############################################################################
# class bro_data_intel_field_values
#
# This class is for processing the individual Bro Intel fields and verifying
# their validity.
#
# Note, it may be easily expanded via adding entries to self.__VERIFY within
# the class constructor.
#
class bro_data_intel_field_values:
EMPTY_FIELD_CHAR = '-'
META_DO_NOTICE = ['T', 'F']

META_IF_IN = ['-',
'Conn::IN_ORIG',
'Conn::IN_RESP',
'Files::IN_HASH',
'Files::IN_NAME',
'DNS::IN_REQUEST',
'DNS::IN_RESPONSE',
'HTTP::IN_HOST_HEADER',
'HTTP::IN_REFERRER_HEADER',
'HTTP::IN_USER_AGENT_HEADER',
'HTTP::IN_X_FORWARDED_FOR_HEADER',
'HTTP::IN_URL',
'SMTP::IN_MAIL_FROM',
'SMTP::IN_RCPT_TO',
'SMTP::IN_FROM',
'SMTP::IN_TO',
'SMTP::IN_RECEIVED_HEADER',
'SMTP::IN_REPLY_TO',
'SMTP::IN_X_ORIGINATING_IP_HEADER',
'SMTP::IN_MESSAGE',
'SSL::IN_SERVER_CERT',
'SSL::IN_CLIENT_CERT',
'SSL::IN_SERVER_NAME',
'SMTP::IN_HEADER']

def __init__(self):
self.__VERIFY = {'indicator': self.verify_indicator,
'indicator_type': self.verify_indicator_type,
'meta.do_notice': self.verify_meta_do_notice,
'meta.if_in': self.verify_meta_if_in,
'meta.desc': self.verify_meta_desc,
'meta.source': self.verify_meta_source,
'meta.cif_confidence': self.verify_meta_cif_confidence,
'meta.url': self.verify_meta_url,
'meta.whitelist': self.verify_meta_whitelist,
'meta.severity': self.verify_meta_severity,
'meta.cif_severity': self.verify_meta_cif_severity,
'meta.cif_impact': self.verify_meta_cif_impact}

self.biit = bro_intel_indicator_type()

def get_verifier(self, v):
return self.__VERIFY.get(v, self.default)

def __is_ignore_field(self, t):
return self.EMPTY_FIELD_CHAR in t

def verify_indicator(self, t):
ret = False
if len(t) > 1:
ret = True
return ret

def verify_indicator_type(self, t):
return self.biit.verify_indicator_type(t)

def correlate_indictor_and_indicator_type(self, i, it):
return self.biit.correlate(i, it)

def verify_meta_do_notice(self, t):
return t in bro_data_intel_field_values.META_DO_NOTICE

def verify_meta_if_in(self, t):
return t in bro_data_intel_field_values.META_IF_IN

def verify_meta_cif_confidence(self, t):
ret = False
try:
t_int = int(t)
if isinstance(t_int, (int, long)) and (t_int > 0 and t_int < 10):
ret = True
except ValueError:
ret = False
return ret

def verify_meta_desc(self, t):
ret = False
if self.__is_ignore_field(t):
ret = True
elif len(t) > 1:
ret = True
return ret

def verify_meta_source(self, t):
ret = False
if self.__is_ignore_field(t):
ret = True
elif len(t) > 1:
ret = True
return ret

def verify_meta_url(self, t):
ret = False
if self.__is_ignore_field(t):
ret = True
elif len(t) > 1:
ret = True
return ret

def verify_meta_whitelist(self, t):
ret = False
if self.__is_ignore_field(t):
ret = True
elif len(t) > 1:
ret = True
return ret

def verify_meta_severity(self, t):
ret = False
try:
t_int = int(t)
if isinstance(t_int, (int, long)) and (t_int > 0 and t_int < 10):
ret = True
except ValueError:
ret = False
return ret

def verify_meta_cif_severity(self, t):
ret = False
VALID_SEVERITY = ['low', 'medium', 'med', 'high']
if t in VALID_SEVERITY:
ret = True
return ret

def verify_meta_cif_impact(self, t):
ret = False
if self.__is_ignore_field(t):
ret = True
elif len(t) > 1:
ret = True
return ret

def default(self, t):
ret = False
warning("Running default handler for: %s" % (t))
if self.__is_ignore_field(t):
ret = True
elif len(t) > 1:
ret = True
return ret

###############################################################################
# class bro_intel_feed_verifier
#
# This is the control class for Bro Intel Feed verification
#
class bro_intel_feed_verifier:
required_fields = ['indicator',
'indicator_type',
Expand All @@ -34,6 +295,8 @@ class bro_intel_feed_verifier:
feed_rx = r'([\S]+)'
feed_sep_rx = r'(\t)+'

header_fields = []

def __init__(self, feed_file):
self.feed_file = feed_file
self.__feed_header_found = False
Expand All @@ -58,6 +321,7 @@ def __are_header_fields_valid(self, l):
continue
if item in self.required_fields:
_fields_found.append(item)
self.header_fields.append(item)

t_list_diff = list(set(self.required_fields) - set(_fields_found))
if len(t_list_diff) == 0:
Expand Down Expand Up @@ -86,15 +350,6 @@ def __verify_non_space(self, offset, l):
ret = False
return ret

def __verify_single_char_entry(self, offset, l):
val = ['-', 'T', 'F']
ret = True
r = [i for i, x in enumerate(l) if (len(x) == 1 and x not in val)]
if len(r) > 0:
warning_line(offset, 'Invalid single character field entry, offset %s' % (self.__make_one_indexed(r)))
ret = False
return ret

def __get_field_contents(self, l):
return l.split('\t')

Expand Down Expand Up @@ -132,15 +387,53 @@ def __verify_header(self, index, l):
warning_line(index, "Duplicate header found")
return ret

def __verify_fields(self, index, content):
ret = True
_fields_to_process = {}
validator = bro_data_intel_field_values()

#
# Not thrilled about this, but we need it to pull out correlatable fields
# since, order of the actual feed fields aren't guaranteed. Ugly for now,
# but workable and can likely be optimized shortly.
#
for content_index, t in enumerate(content):
_fields_to_process[self.header_fields[content_index]] = t

for k in _fields_to_process:
r = validator.get_verifier(k)(_fields_to_process[k])
if not r:
warning_line(index, 'Invalid entry \"%s\" for column \"%s\"' % (_fields_to_process[k], k))
ret = False
break

if ret:
# Special case to verify indicator with indicator_type
c = validator.correlate_indictor_and_indicator_type(_fields_to_process['indicator'],
_fields_to_process['indicator_type'])

if not c:
warning_line(index, 'Indicator type \"%s\" does not correlate with indicator: \"%s\"' % (_fields_to_process['indicator_type'], _fields_to_process['indicator']))
ret = False
return ret

def __verify_entry(self, index, l):
ret = False
contents = self.__get_field_contents(l)
if self.__verify_field_count(contents) == 0:
if self.__verify_field_sep(index, l) and self.__verify_non_space(index, contents) and self.__verify_single_char_entry(index, contents):
_content_field_count = self.__verify_field_count(contents)
_warn_str = None

if _content_field_count == 0:
if self.__verify_field_sep(index, l) and self.__verify_non_space(index, contents) and self.__verify_fields(index, contents):
ret = True
else:
warning_line(index, 'Invalid number of fields - Found: %d, Header Fields: %d - Verify EMPTY fields' %
(len(contents), self.__num_of_fields))
elif _content_field_count > 0:
_warn_str = 'Invalid number of fields - Found: %d, Header Fields: %d - Look for: EXTRA fields or tab seperators' % (len(contents), self.__num_of_fields)
elif _content_field_count < 0:
_warn_str = 'Invalid number of fields - Found: %d, Header Fields: %d - Look for: EMPTY fields' % (len(contents), self.__num_of_fields)

if _warn_str:
warning_line(index, _warn_str)

return ret

def __load_feed(self, feed):
Expand All @@ -161,10 +454,12 @@ def verify(self):
if not self.__verify_entry(index, l):
sys.exit(3)


###############################################################################
# main()
###############################################################################
def main():
parser = OptionParser()
parser.add_option('--file', dest='feed_file', help='Bro Intel Feed to Verify')
parser.add_option('-f', '--file', dest='feed_file', help='Bro Intel Feed to Verify')
(options, args) = parser.parse_args()

for o in options.__dict__.keys():
Expand All @@ -176,5 +471,9 @@ def main():
bifv = bro_intel_feed_verifier(options.feed_file)
bifv.verify()


###############################################################################
# __name__ checking
###############################################################################
if __name__ == '__main__':
main()
5 changes: 5 additions & 0 deletions tests/t.intel
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#fields indicator indicator_type meta.source meta.desc meta.url meta.do_notice meta.if_in meta.whitelist meta.severity
my.domain.com Intel::DOMAIN domain DOMAIN - F - - 6
your.com Intel::DOMAIN domain DOMAIN - F - - 6
other.domain.com Intel::DOMAIN domain DOMAIN - F - - 6

Loading

0 comments on commit d973ecf

Please sign in to comment.