-
Notifications
You must be signed in to change notification settings - Fork 43
/
db_controller.py
80 lines (71 loc) · 2.95 KB
/
db_controller.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import sqlalchemy
from sqlalchemy import *
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.engine.reflection import Inspector
from malware_sample_model import MalwareIndex
import os
import re
class MalwareDbController(object):
    """Wrap a SQLAlchemy engine/session pair used to index malware samples.

    The controller is created with a configuration object and stays
    disconnected until ``initialize_db()`` is called.  All query helpers
    report failures by printing the error instead of raising.
    """

    # Anchored patterns deciding which column a search string targets:
    # 64 hex characters -> sha256, 32 hex characters -> md5.
    _SHA256_RE = re.compile(r"^([a-fA-F\d]{64})$")
    _MD5_RE = re.compile(r"^([a-fA-F\d]{32})$")

    def __init__(self, config):
        """Store the configuration; no database connection is made here.

        ``config`` must provide ``get(section, option)``; it is read by
        ``initialize_db()`` for the ``('database', 'uri')`` entry.
        """
        self.config = config
        self.db = None
        # Populated by initialize_db(); defined here so the attributes
        # always exist, even before (or after a failed) initialization.
        self.engine = None
        self.metadata = None
        self.session = None

    def initialize_db(self):
        """Create the engine, metadata and session from the configured URI.

        Returns True when the database could be reached, False otherwise
        (the error is printed).  Errors raised while reading the
        configuration itself are not caught and propagate to the caller.
        """
        # Get DB configuration
        uri = self.config.get('database', 'uri')
        try:
            self.engine = create_engine(uri)
            self.engine.echo = False  # set to True to log every SQL statement
            # NOTE(review): passing the engine positionally to MetaData is
            # believed to be unsupported from SQLAlchemy 2.0 on - confirm
            # against the version this project pins.
            self.metadata = MetaData(self.engine)
            # Connect eagerly so a bad URI/credentials fails here, then
            # close the probe connection instead of leaking it.
            probe = self.engine.connect()
            probe.close()
            session_factory = sessionmaker()
            session_factory.configure(bind=self.engine)
            self.session = session_factory()
            return True
        except Exception as err:
            print("Failed to initialize DB\nPlease verify your db settings-%s"
                  % (err,))
            return False

    @classmethod
    def _lookup_column(cls, find_string):
        """Return the column name ('sha256', 'md5' or 'name') to search."""
        if cls._SHA256_RE.match(find_string):
            return 'sha256'
        if cls._MD5_RE.match(find_string):
            return 'md5'
        return 'name'

    @staticmethod
    def _rows_as_dicts(query):
        """Return the ``__dict__`` of every object yielded by ``query``.

        These are the instances' own attribute dictionaries, not copies.
        NOTE(review): for SQLAlchemy-mapped objects this presumably also
        contains the ``_sa_instance_state`` bookkeeping entry - confirm
        whether callers rely on or filter it.
        """
        return [row.__dict__ for row in query]

    def find_sample(self, find_string):
        """Search the malware index for samples matching ``find_string``.

        A 64-character hex string is matched against ``sha256``, a
        32-character hex string against ``md5``, anything else against
        ``name`` (exact match in every case).

        Returns a list of attribute dictionaries (possibly empty), or None
        if the lookup failed; the error is printed in that case.
        """
        try:
            column = self._lookup_column(find_string)
            query = self.session.query(MalwareIndex).filter_by(
                **{column: find_string})
            return self._rows_as_dicts(query)
        except Exception as err:
            print(err)
            return None

    def recent(self, quantity='5'):
        """Return a summary of up to ``quantity`` (default 5) samples.

        ``quantity`` may be an int or a numeric string.  No ordering is
        applied to the query, so the rows are the first ``quantity`` ones
        the database returns, which is not guaranteed to be the newest.

        Returns a list of attribute dictionaries, or None if the query
        failed (including a non-numeric ``quantity``); the error is printed.
        """
        try:
            # Coerce here so both the string default and integer arguments
            # reach LIMIT as a real integer.
            limit = int(quantity)
            query = self.session.query(MalwareIndex).limit(limit)
            return self._rows_as_dicts(query)
        except Exception as err:
            print(err)
            return None

    def _rollback_quietly(self):
        """Roll back the session after a failed write, never raising."""
        if self.session is None:
            return
        try:
            self.session.rollback()
        except Exception as err:
            print("Rollback failed: %s" % (err,))

    def load_db(self, report_json):
        """Load information about the sample into the index DB.

        Every key of ``report_json`` that names an existing attribute of a
        fresh ``MalwareIndex`` instance is copied onto it; list values are
        stored as a comma-joined string.  Other keys are ignored.

        Returns True once the row is committed.  On any failure the error
        is printed, the session is rolled back so it stays usable, and
        False is returned.
        """
        try:
            # Set the values with some reflection awesomeness
            malware_sample = MalwareIndex()
            for key, value in report_json.items():
                if not hasattr(malware_sample, key):
                    continue
                if isinstance(value, list):
                    value = ",".join(value)
                setattr(malware_sample, key, value)
            # Add to DB
            self.session.add(malware_sample)
            self.session.commit()
        except Exception as err:
            print("Error %s" % (err,))
            self._rollback_quietly()
            return False
        # Reported outside the try block: a report without a 'name' key must
        # not turn an already committed insert into a reported failure.
        print("Sample %s loaded..." % (report_json.get('name'),))
        return True