From 7fda2a562c96ef7552d837a30d737bc27686496a Mon Sep 17 00:00:00 2001 From: Keath Milligan Date: Thu, 22 Sep 2022 11:41:22 -0500 Subject: [PATCH 01/10] Refactor tests, update jbx --- .gitignore | 3 + Pipfile | 19 + requirements.txt | 2 +- sandboxapi/joe.py | 13 +- tests/__init__.py | 6 + tests/resources/joe_submission_new.json | 5 + tests/test_cuckoo.py | 97 ++++ tests/test_falcon.py | 81 ++++ tests/test_fireeye.py | 168 +++++++ tests/test_joe.py | 68 +++ tests/test_opswat.py | 11 + tests/test_sandboxapi.py | 39 ++ tests/test_triage.py | 74 +++ tests/test_vmray.py | 92 ++++ tests/test_wildfire.py | 10 + tests/tests.py | 570 ------------------------ 16 files changed, 679 insertions(+), 579 deletions(-) create mode 100644 Pipfile create mode 100644 tests/__init__.py create mode 100644 tests/resources/joe_submission_new.json create mode 100644 tests/test_cuckoo.py create mode 100644 tests/test_falcon.py create mode 100644 tests/test_fireeye.py create mode 100644 tests/test_joe.py create mode 100644 tests/test_opswat.py create mode 100644 tests/test_sandboxapi.py create mode 100644 tests/test_triage.py create mode 100644 tests/test_vmray.py create mode 100644 tests/test_wildfire.py delete mode 100644 tests/tests.py diff --git a/.gitignore b/.gitignore index 54ffadf..0316c54 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,6 @@ dist/ _build/ /build_dist.sh .vscode/ +/.virtualenv/ +/.pytest_cache/ +Pipfile.lock diff --git a/Pipfile b/Pipfile new file mode 100644 index 0000000..e25eb97 --- /dev/null +++ b/Pipfile @@ -0,0 +1,19 @@ +[[source]] +url = "https://pypi.org/simple" +verify_ssl = true +name = "pypi" + +[packages] +requests = "*" +jbxapi = "*" +xmltodict = "*" + +[dev-packages] +pytest = "*" +coverage = "*" +responses = "*" +"collective.checkdocs" = "*" +pygments = "*" + +[requires] +python_version = "3.10" diff --git a/requirements.txt b/requirements.txt index b8eaee9..270849c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ requests -jbxapi==2.10.1 +jbxapi xmltodict diff --git a/sandboxapi/joe.py b/sandboxapi/joe.py index 081c1be..36ac6c1 100644 --- a/sandboxapi/joe.py +++ b/sandboxapi/joe.py @@ -1,7 +1,5 @@ import json - import jbxapi - import sandboxapi class JoeAPI(sandboxapi.SandboxAPI): @@ -10,9 +8,10 @@ class JoeAPI(sandboxapi.SandboxAPI): This class is actually just a convenience wrapper around jbxapi.JoeSandbox. """ - def __init__(self, apikey, apiurl, accept_tac, timeout=None, verify_ssl=True, retries=3, **kwargs): + def __init__(self, apikey, apiurl, accept_tac, timeout=None, verify_ssl=True, retries=3, chunked=False, **kwargs): """Initialize the interface to Joe Sandbox API.""" sandboxapi.SandboxAPI.__init__(self) + self._chunked = chunked self.jbx = jbxapi.JoeSandbox(apikey, apiurl or jbxapi.API_URL, accept_tac, timeout, bool(int(verify_ssl)), retries, **kwargs) def analyze(self, handle, filename): @@ -30,7 +29,7 @@ def analyze(self, handle, filename): handle.seek(0) try: - return self.jbx.submit_sample(handle)['webids'][0] + return self.jbx.submit_sample(handle, _chunked_upload=self._chunked)['submission_id'] except (jbxapi.JoeException, KeyError, IndexError) as e: raise sandboxapi.SandboxError("error in analyze: {e}".format(e=e)) @@ -44,12 +43,10 @@ def check(self, item_id): :return: Boolean indicating if a report is done or not. """ try: - return self.jbx.info(item_id).get('status').lower() == 'finished' + return self.jbx.analysis_info(item_id).get('status').lower() == 'finished' except jbxapi.JoeException: return False - return False - def is_available(self): """Determine if the Joe Sandbox API server is alive. @@ -93,7 +90,7 @@ def report(self, item_id, report_format="json"): report_format = "jsonfixed" try: - return json.loads(self.jbx.download(item_id, report_format)[1].decode('utf-8')) + return json.loads(self.jbx.analysis_download(item_id, report_format)[1].decode('utf-8')) except (jbxapi.JoeException, ValueError, IndexError) as e: raise sandboxapi.SandboxError("error in report fetch: {e}".format(e=e)) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..fe9bf3b --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1,6 @@ +import os +import json + +def read_resource(resource): + with open(os.path.join('tests', 'resources', '{r}.json'.format(r=resource)), 'r') as f: + return json.loads(f.read()) diff --git a/tests/resources/joe_submission_new.json b/tests/resources/joe_submission_new.json new file mode 100644 index 0000000..be119b6 --- /dev/null +++ b/tests/resources/joe_submission_new.json @@ -0,0 +1,5 @@ +{ + "data": { + "submission_id": "100001" + } +} \ No newline at end of file diff --git a/tests/test_cuckoo.py b/tests/test_cuckoo.py new file mode 100644 index 0000000..758cadd --- /dev/null +++ b/tests/test_cuckoo.py @@ -0,0 +1,97 @@ +import io +import unittest +try: + from unittest.mock import patch, ANY as MOCK_ANY +except ImportError: + from mock import patch, ANY as MOCK_ANY +import responses +import sandboxapi.cuckoo +from . import read_resource + + +class TestCuckoo(unittest.TestCase): + + def setUp(self): + self.sandbox = sandboxapi.cuckoo.CuckooAPI('http://cuckoo.mock:8090/') + + @responses.activate + def test_analyses(self): + responses.add(responses.GET, 'http://cuckoo.mock:8090/tasks/list', + json=read_resource('cuckoo_tasks_list')) + self.assertEquals(len(self.sandbox.analyses()), 2) + + @responses.activate + def test_analyze(self): + responses.add(responses.POST, 'http://cuckoo.mock:8090/tasks/create/file', + json=read_resource('cuckoo_tasks_create_file')) + self.assertEquals(self.sandbox.analyze(io.BytesIO('test'.encode('ascii')), 'filename'), '1') + + @responses.activate + def test_check(self): + responses.add(responses.GET, 'http://cuckoo.mock:8090/tasks/view/1', + json=read_resource('cuckoo_tasks_view')) + self.assertEquals(self.sandbox.check('1'), True) + + @responses.activate + def test_is_available(self): + responses.add(responses.GET, 'http://cuckoo.mock:8090/cuckoo/status', + json=read_resource('cuckoo_status')) + self.assertTrue(self.sandbox.is_available()) + + @responses.activate + def test_not_is_available(self): + self.assertFalse(self.sandbox.is_available()) + responses.add(responses.GET, 'http://cuckoo.mock:8090/cuckoo/status', + status=500) + self.assertFalse(self.sandbox.is_available()) + + @responses.activate + def test_report(self): + responses.add(responses.GET, 'http://cuckoo.mock:8090/tasks/report/8/json', + json=read_resource('cuckoo_tasks_report')) + self.assertEquals(self.sandbox.report(8)['info']['id'], 8) + + @responses.activate + def test_score(self): + responses.add(responses.GET, 'http://cuckoo.mock:8090/tasks/report/8/json', + json=read_resource('cuckoo_tasks_report')) + self.assertEquals(self.sandbox.score(self.sandbox.report(8)), 5) + + @patch('requests.post') + @patch('requests.get') + def test_proxies_is_passed_to_requests(self, m_get, m_post): + + m_get.return_value.status_code = 200 + m_post.return_value.status_code = 200 + + proxies = { + 'http': 'http://10.10.1.10:3128', + 'https': 'http://10.10.1.10:1080', + } + + api = sandboxapi.cuckoo.CuckooAPI('cuckoo.mock', + proxies=proxies) + api._request('/test') + + m_get.assert_called_once_with(api.api_url + '/test', auth=MOCK_ANY, + headers=MOCK_ANY, params=MOCK_ANY, + proxies=proxies, verify=MOCK_ANY) + + api._request('/test', method='POST') + + m_post.assert_called_once_with(api.api_url + '/test', auth=MOCK_ANY, + headers=MOCK_ANY, data=MOCK_ANY, + files=None, proxies=proxies, + verify=MOCK_ANY) + + @responses.activate + def test_cuckoo_old_style_host_port_path(self): + sandbox = sandboxapi.cuckoo.CuckooAPI('cuckoo.mock') + responses.add(responses.GET, 'http://cuckoo.mock:8090/tasks/list', + json=read_resource('cuckoo_tasks_list')) + self.assertEquals(len(self.sandbox.analyses()), 2) + + sandbox = sandboxapi.cuckoo.CuckooAPI('cuckoo.mock', 9090, '/test') + responses.add(responses.GET, 'http://cuckoo.mock:9090/test/tasks/list', + json=read_resource('cuckoo_tasks_list')) + self.assertEquals(len(self.sandbox.analyses()), 2) diff --git a/tests/test_falcon.py b/tests/test_falcon.py new file mode 100644 index 0000000..a76cfd3 --- /dev/null +++ b/tests/test_falcon.py @@ -0,0 +1,81 @@ +import io +import os +import json +import unittest +try: + from unittest.mock import patch, ANY as MOCK_ANY +except ImportError: + from mock import patch, ANY as MOCK_ANY +import responses +import sandboxapi.falcon +from . import read_resource + + +class TestFalcon(unittest.TestCase): + + def setUp(self): + self.sandbox = sandboxapi.falcon.FalconAPI('key', 'http://falcon.mock/api/v2') + + @responses.activate + def test_analyze(self): + responses.add(responses.POST, 'http://falcon.mock/api/v2/submit/file', + json=read_resource('falcon_submit_file'), status=201) + self.assertEquals(self.sandbox.analyze(io.BytesIO('test'.encode('ascii')), 'filename'), '1') + + @responses.activate + def test_check(self): + responses.add(responses.GET, 'http://falcon.mock/api/v2/report/1/state', + json=read_resource('falcon_report_state')) + self.assertEquals(self.sandbox.check('1'), True) + + @responses.activate + def test_is_available(self): + responses.add(responses.GET, 'http://falcon.mock/api/v2/system/heartbeat', + json=read_resource('falcon_system_heartbeat')) + self.assertTrue(self.sandbox.is_available()) + + @responses.activate + def test_not_is_available(self): + self.assertFalse(self.sandbox.is_available()) + responses.add(responses.GET, 'http://falcon.mock/api/v2/system/heartbeat', + status=500) + self.assertFalse(self.sandbox.is_available()) + + @responses.activate + def test_report(self): + responses.add(responses.GET, 'http://falcon.mock/api/v2/report/1/summary', + json=read_resource('falcon_report_summary')) + self.assertEquals(self.sandbox.report(1)['job_id'], '1') + + @responses.activate + def test_score(self): + responses.add(responses.GET, 'http://falcon.mock/api/v2/report/1/summary', + json=read_resource('falcon_report_summary')) + self.assertEquals(self.sandbox.score(self.sandbox.report(1)), 5) + + @patch('requests.post') + @patch('requests.get') + def test_proxies_is_passed_to_requests(self, m_get, m_post): + + m_get.return_value.status_code = 200 + m_post.return_value.status_code = 200 + + proxies = { + 'http': 'http://10.10.1.10:3128', + 'https': 'http://10.10.1.10:1080', + } + + api = sandboxapi.falcon.FalconAPI('key', self.sandbox.api_url, + proxies=proxies) + api._request('/test') + + m_get.assert_called_once_with(api.api_url + '/test', auth=MOCK_ANY, + headers=MOCK_ANY, params=MOCK_ANY, + proxies=proxies, verify=MOCK_ANY) + + api._request('/test', method='POST') + + m_post.assert_called_once_with(api.api_url + '/test', auth=MOCK_ANY, + headers=MOCK_ANY, data=MOCK_ANY, + files=None, proxies=proxies, + verify=MOCK_ANY) diff --git a/tests/test_fireeye.py b/tests/test_fireeye.py new file mode 100644 index 0000000..c7fec73 --- /dev/null +++ b/tests/test_fireeye.py @@ -0,0 +1,168 @@ +import io +import unittest +try: + from unittest.mock import patch, ANY as MOCK_ANY +except ImportError: + from mock import patch, ANY as MOCK_ANY +import responses +import sandboxapi.fireeye +from . import read_resource + + +class TestFireEye(unittest.TestCase): + + def setUp(self): + self.sandbox = sandboxapi.fireeye.FireEyeAPI('username', 'password', 'http://fireeye.mock', 'profile') + self.legacy_sandbox = sandboxapi.fireeye.FireEyeAPI('username', 'password', + 'http://fireeye.mock', 'profile', + legacy_api=True) + + @responses.activate + def test_analyze(self): + responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.2.0/auth/login', + headers={'X-FeApi-Token': 'MOCK'}) + responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.2.0/submissions', + json=read_resource('fireeye_submissions')) + self.assertEquals(self.sandbox.analyze(io.BytesIO('test'.encode('ascii')), 'filename'), 1) + + @responses.activate + def test_check(self): + responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.2.0/auth/login', + headers={'X-FeApi-Token': 'MOCK'}) + responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.2.0/submissions/status/1', + json=read_resource('fireeye_submissions_status')) + self.assertEquals(self.sandbox.check('1'), True) + + @responses.activate + def test_is_available(self): + responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.2.0/auth/login', + headers={'X-FeApi-Token': 'MOCK'}) + responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.2.0/config', + json=read_resource('fireeye_config')) + self.assertTrue(self.sandbox.is_available()) + + @responses.activate + def test_not_is_available(self): + self.assertFalse(self.sandbox.is_available()) + responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.2.0/auth/login', + headers={'X-FeApi-Token': 'MOCK'}) + responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.2.0/config', + status=500) + self.assertFalse(self.sandbox.is_available()) + + @responses.activate + def test_report(self): + responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.2.0/auth/login', + headers={'X-FeApi-Token': 'MOCK'}) + responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.2.0/submissions/results/1', + json=read_resource('fireeye_submissions_results')) + self.assertEquals(self.sandbox.report(1)['msg'], 'concise') + + @responses.activate + def test_score(self): + responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.2.0/auth/login', + headers={'X-FeApi-Token': 'MOCK'}) + responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.2.0/submissions/results/1', + json=read_resource('fireeye_submissions_results')) + self.assertEquals(self.sandbox.score(self.sandbox.report(1)), 8) + + # Legacy API support. + @responses.activate + def test_analyze(self): + responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.1.0/auth/login', + headers={'X-FeApi-Token': 'MOCK'}) + responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.1.0/submissions', + json=read_resource('fireeye_submissions')) + self.assertEquals(self.legacy_sandbox.analyze(io.BytesIO('test'.encode('ascii')), 'filename'), 1) + + @responses.activate + def test_check(self): + responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.1.0/auth/login', + headers={'X-FeApi-Token': 'MOCK'}) + responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.1.0/submissions/status/1', + json=read_resource('fireeye_submissions_status')) + self.assertEquals(self.legacy_sandbox.check('1'), True) + + @responses.activate + def test_is_available(self): + responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.1.0/auth/login', + headers={'X-FeApi-Token': 'MOCK'}) + responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.1.0/config', + json=read_resource('fireeye_config')) + self.assertTrue(self.legacy_sandbox.is_available()) + + @responses.activate + def test_not_is_available(self): + self.assertFalse(self.legacy_sandbox.is_available()) + responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.1.0/auth/login', + headers={'X-FeApi-Token': 'MOCK'}) + responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.1.0/config', + status=500) + self.assertFalse(self.legacy_sandbox.is_available()) + + @responses.activate + def test_report(self): + responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.1.0/auth/login', + headers={'X-FeApi-Token': 'MOCK'}) + responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.1.0/submissions/results/1', + json=read_resource('fireeye_submissions_results')) + self.assertEquals(self.legacy_sandbox.report(1)['msg'], 'concise') + + @responses.activate + def test_score(self): + responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.1.0/auth/login', + headers={'X-FeApi-Token': 'MOCK'}) + responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.1.0/submissions/results/1', + json=read_resource('fireeye_submissions_results')) + self.assertEquals(self.legacy_sandbox.score(self.legacy_sandbox.report(1)), 8) + + # Core functionality. + @patch('requests.post') + @patch('requests.get') + def test_proxies_is_passed_to_requests(self, m_get, m_post): + + m_get.return_value.status_code = 200 + m_get.return_value.content = b'' + m_post.return_value.status_code = 200 + m_post.return_value.content = b'' + + proxies = { + 'http': 'http://10.10.1.10:3128', + 'https': 'http://10.10.1.10:1080', + } + + api = sandboxapi.fireeye.FireEyeAPI('username', 'password', + self.sandbox.api_url, 'profile', + proxies=proxies) + api._request('/test') + + m_get.assert_called_once_with(api.api_url + '/test', auth=MOCK_ANY, + headers=MOCK_ANY, params=MOCK_ANY, + proxies=proxies, verify=MOCK_ANY) + + api._request('/test', method='POST') + + m_post.assert_called_with(api.api_url + '/test', auth=MOCK_ANY, + headers=MOCK_ANY, data=MOCK_ANY, + files=None, proxies=proxies, + verify=MOCK_ANY) + + @responses.activate + def test_reauthenticates_if_logged_out_http_401(self): + responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.2.0/auth/login', + headers={'X-FeApi-Token': 'MOCK'}) + responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.2.0/submissions/status/1', + status=401) + responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.2.0/submissions/status/1', + json=read_resource('fireeye_submissions_status')) + self.assertEquals(self.sandbox.check('1'), True) + + @responses.activate + def test_reauthenticates_if_logged_out_json_401(self): + responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.2.0/auth/login', + headers={'X-FeApi-Token': 'MOCK'}) + responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.2.0/submissions/status/1', + json=read_resource('fireeye_unauthorized')) + responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.2.0/submissions/status/1', + json=read_resource('fireeye_submissions_status')) + self.assertEquals(self.sandbox.check('1'), True) diff --git a/tests/test_joe.py b/tests/test_joe.py new file mode 100644 index 0000000..39a61f7 --- /dev/null +++ b/tests/test_joe.py @@ -0,0 +1,68 @@ +import io +import unittest +try: + from unittest.mock import patch +except ImportError: + from mock import patch +import responses +import sandboxapi.joe +from . import read_resource + + +class TestJoe(unittest.TestCase): + + def setUp(self): + self.sandbox = sandboxapi.joe.JoeAPI('key', 'http://joe.mock/api', True) + + @responses.activate + def test_analyze(self): + responses.add(responses.POST, 'http://joe.mock/api/v2/submission/new', + json=read_resource('joe_submission_new')) + self.assertEquals(self.sandbox.analyze(io.BytesIO('test'.encode('ascii')), 'filename'), '100001') + + @responses.activate + def test_check(self): + responses.add(responses.POST, 'http://joe.mock/api/v2/analysis/info', + json=read_resource('joe_analysis_info')) + self.assertEquals(self.sandbox.check('1'), True) + + @responses.activate + def test_is_available(self): + responses.add(responses.POST, 'http://joe.mock/api/v2/server/online', + json=read_resource('joe_server_online')) + self.assertTrue(self.sandbox.is_available()) + + @responses.activate + def test_not_is_available(self): + self.assertFalse(self.sandbox.is_available()) + responses.add(responses.POST, 'http://joe.mock/api/v2/server/online', + status=500) + self.assertFalse(self.sandbox.is_available()) + + @responses.activate + def test_report(self): + responses.add(responses.POST, 'http://joe.mock/api/v2/analysis/download', + json=read_resource('joe_analysis_download')) + self.assertEquals(self.sandbox.report(8)['analysis']['signaturedetections']['strategy'][1]['score'], 1) + + @responses.activate + def test_score(self): + responses.add(responses.POST, 'http://joe.mock/api/v2/analysis/download', + json=read_resource('joe_analysis_download')) + self.assertEquals(self.sandbox.score(self.sandbox.report(1)), 1) + + @patch('requests.post') + @patch('requests.get') + def test_proxies_is_passed_to_requests(self, m_get, m_post): + + m_get.return_value.status_code = 200 + m_post.return_value.status_code = 200 + + proxies = { + 'http': 'http://10.10.1.10:3128', + 'https': 'http://10.10.1.10:1080', + } + + api = sandboxapi.joe.JoeAPI('key', self.sandbox.jbx.apiurl, True, + proxies=proxies) + self.assertEquals(api.jbx.session.proxies, proxies) diff --git a/tests/test_opswat.py b/tests/test_opswat.py new file mode 100644 index 0000000..373b52b --- /dev/null +++ b/tests/test_opswat.py @@ -0,0 +1,11 @@ +import io +import os +import json +import unittest +try: + from unittest.mock import patch, ANY as MOCK_ANY +except ImportError: + from mock import patch, ANY as MOCK_ANY +import responses +import sandboxapi.opswat +from . import read_resource diff --git a/tests/test_sandboxapi.py b/tests/test_sandboxapi.py new file mode 100644 index 0000000..aff72ce --- /dev/null +++ b/tests/test_sandboxapi.py @@ -0,0 +1,39 @@ +import io +import os +import json +import unittest +try: + from unittest.mock import patch, ANY as MOCK_ANY +except ImportError: + from mock import patch, ANY as MOCK_ANY +import responses +import sandboxapi +from . import read_resource + + +class TestSandboxAPI(unittest.TestCase): + + @patch('requests.post') + @patch('requests.get') + def test_proxies_is_passed_to_requests(self, m_get, m_post): + m_get.return_value.status_code = 200 + m_post.return_value.status_code = 200 + + proxies = { + 'http': 'http://10.10.1.10:3128', + 'https': 'http://10.10.1.10:1080', + } + + api = sandboxapi.SandboxAPI(proxies=proxies) + api.api_url = 'http://sandbox.mock' + api._request('/test') + + m_get.assert_called_once_with('http://sandbox.mock/test', auth=None, + headers=None, params=None, proxies=proxies, + verify=True) + + api._request('/test', method='POST') + + m_post.assert_called_once_with('http://sandbox.mock/test', auth=None, + headers=None, data=None, files=None, + proxies=proxies, verify=True) diff --git a/tests/test_triage.py b/tests/test_triage.py new file mode 100644 index 0000000..ed98e03 --- /dev/null +++ b/tests/test_triage.py @@ -0,0 +1,74 @@ +import io +import os +import json +import unittest +try: + from unittest.mock import patch, ANY as MOCK_ANY +except ImportError: + from mock import patch, ANY as MOCK_ANY +import responses +import sandboxapi.triage +from . import read_resource + + +class TestTriage(unittest.TestCase): + def setUp(self): + self.sandbox = sandboxapi.triage.TriageAPI("key", + "http://api.triage.mock") + + @responses.activate + def test_analyze(self): + responses.add(responses.POST, + 'http://api.triage.mock/v0/samples', + json=read_resource('triage_analyze'), status=200) + triage_id = self.sandbox.analyze(io.BytesIO('test'.encode('ascii')), + "testfile") + self.assertEquals(triage_id, "200707-pht1cwk3ls") + + @responses.activate + def test_check(self): + responses.add(responses.GET, + 'http://api.triage.mock/v0/samples/test/status', + json=read_resource('triage_check'), status=200) + self.assertTrue(self.sandbox.check("test")) + + @responses.activate + def test_is_available(self): + responses.add(responses.GET, 'http://api.triage.mock/v0/samples', + json=read_resource('triage_available'), status=200) + self.assertTrue(self.sandbox.is_available()) + + @responses.activate + def test_report(self): + responses.add(responses.GET, + 'http://api.triage.mock/v0/samples/test/summary', + json=read_resource('triage_report'), status=200) + data = self.sandbox.report("test") + self.assertEquals( + 10, data["tasks"]["200615-8jbndpgg9n-behavioral1"]["score"]) + + @responses.activate + def test_score(self): + responses.add(responses.GET, + 'http://api.triage.mock/v0/samples/test/summary', + json=read_resource('triage_report'), status=200) + score = self.sandbox.score("test") + self.assertEquals(10, score) + + @responses.activate + def test_full_report(self): + responses.add(responses.GET, + 'http://api.triage.mock/v0/samples/200615-8jbndpgg9n/summary', + json=read_resource('triage_report'), status=200) + responses.add(responses.GET, + 'http://api.triage.mock/v0/samples/200615-8jbndpgg9n/behavioral1/report_triage.json', + json=read_resource('triage_behavioral1'), status=200) + responses.add(responses.GET, + 'http://api.triage.mock/v0/samples/200615-8jbndpgg9n/behavioral2/report_triage.json', + json=read_resource('triage_behavioral2'), status=200) + + full_report = self.sandbox.full_report("200615-8jbndpgg9n") + self.assertTrue(full_report["tasks"]["behavioral1"]["sample"]["score"], + 10) + self.assertTrue(full_report["tasks"]["behavioral2"]["sample"]["score"], + 10) diff --git a/tests/test_vmray.py b/tests/test_vmray.py new file mode 100644 index 0000000..8d59fae --- /dev/null +++ b/tests/test_vmray.py @@ -0,0 +1,92 @@ +import io +import os +import json +import unittest +try: + from unittest.mock import patch, ANY as MOCK_ANY +except ImportError: + from mock import patch, ANY as MOCK_ANY +import responses +import sandboxapi.vmray +from . import read_resource + + +class TestVMRay(unittest.TestCase): + + def setUp(self): + self.sandbox = sandboxapi.vmray.VMRayAPI('key', 'http://vmray.mock') + + @responses.activate + def test_analyze(self): + responses.add(responses.POST, 'http://vmray.mock/rest/sample/submit', + json=read_resource('vmray_sample_submit')) + self.assertEquals(self.sandbox.analyze(io.BytesIO('test'.encode('ascii')), 'filename'), 1169850) + + @responses.activate + def test_analyze_with_errors(self): + responses.add(responses.POST, 'http://vmray.mock/rest/sample/submit', + json=read_resource('vmray_sample_submit_errors')) + with self.assertRaises(sandboxapi.SandboxError): + self.assertEquals(self.sandbox.analyze(io.BytesIO('test'.encode('ascii')), 'filename')) + + @responses.activate + def test_check(self): + responses.add(responses.GET, 'http://vmray.mock/rest/submission/sample/1', + json=read_resource('vmray_submission_sample')) + self.assertEquals(self.sandbox.check('1'), True) + + @responses.activate + def test_is_available(self): + responses.add(responses.GET, 'http://vmray.mock/rest/system_info', + json=read_resource('vmray_system_info')) + self.assertTrue(self.sandbox.is_available()) + + @responses.activate + def test_not_is_available(self): + self.assertFalse(self.sandbox.is_available()) + responses.add(responses.GET, 'http://vmray.mock/rest/system_info', + status=500) + self.assertFalse(self.sandbox.is_available()) + + @responses.activate + def test_report(self): + responses.add(responses.GET, 'http://vmray.mock/rest/analysis/sample/1', + json=read_resource('vmray_analysis_sample')) + responses.add(responses.GET, 'http://vmray.mock/rest/analysis/1097123/archive/logs/summary.json', + json=read_resource('vmray_analysis_archive_logs_summary')) + self.assertEquals(self.sandbox.report(1)['version'], 1) + + @responses.activate + def test_score(self): + responses.add(responses.GET, 'http://vmray.mock/rest/analysis/sample/1', + json=read_resource('vmray_analysis_sample')) + responses.add(responses.GET, 'http://vmray.mock/rest/analysis/1097123/archive/logs/summary.json', + json=read_resource('vmray_analysis_archive_logs_summary')) + self.assertEquals(self.sandbox.score(self.sandbox.report(1)), 20) + + @patch('requests.post') + @patch('requests.get') + def test_proxies_is_passed_to_requests(self, m_get, m_post): + + m_get.return_value.status_code = 200 + m_post.return_value.status_code = 200 + + proxies = { + 'http': 'http://10.10.1.10:3128', + 'https': 'http://10.10.1.10:1080', + } + + api = sandboxapi.vmray.VMRayAPI('key', self.sandbox.api_url, + proxies=proxies) + api._request('/test') + + m_get.assert_called_once_with(api.api_url + '/test', auth=MOCK_ANY, + headers=MOCK_ANY, params=MOCK_ANY, + proxies=proxies, verify=MOCK_ANY) + + api._request('/test', method='POST') + + m_post.assert_called_once_with(api.api_url + '/test', auth=MOCK_ANY, + headers=MOCK_ANY, data=MOCK_ANY, + files=None, proxies=proxies, + verify=MOCK_ANY) diff --git a/tests/test_wildfire.py b/tests/test_wildfire.py new file mode 100644 index 0000000..05802b9 --- /dev/null +++ b/tests/test_wildfire.py @@ -0,0 +1,10 @@ +import io +import os +import json +import unittest +try: + from unittest.mock import patch, ANY as MOCK_ANY +except ImportError: + from mock import patch, ANY as MOCK_ANY +import responses +from . import read_resource diff --git a/tests/tests.py b/tests/tests.py deleted file mode 100644 index 23f6794..0000000 --- a/tests/tests.py +++ /dev/null @@ -1,570 +0,0 @@ -import io -import os -import json -import unittest -try: - from unittest.mock import patch, ANY as MOCK_ANY -except ImportError: - from mock import patch, ANY as MOCK_ANY - -import responses - -import sandboxapi.cuckoo -import sandboxapi.fireeye -import sandboxapi.joe -import sandboxapi.vmray -import sandboxapi.falcon -import sandboxapi.triage - -def read_resource(resource): - with open(os.path.join('tests', 'resources', '{r}.json'.format(r=resource)), 'r') as f: - return json.loads(f.read()) - - -class TestCuckoo(unittest.TestCase): - - def setUp(self): - self.sandbox = sandboxapi.cuckoo.CuckooAPI('http://cuckoo.mock:8090/') - - @responses.activate - def test_analyses(self): - responses.add(responses.GET, 'http://cuckoo.mock:8090/tasks/list', - json=read_resource('cuckoo_tasks_list')) - self.assertEquals(len(self.sandbox.analyses()), 2) - - @responses.activate - def test_analyze(self): - responses.add(responses.POST, 'http://cuckoo.mock:8090/tasks/create/file', - json=read_resource('cuckoo_tasks_create_file')) - self.assertEquals(self.sandbox.analyze(io.BytesIO('test'.encode('ascii')), 'filename'), '1') - - @responses.activate - def test_check(self): - responses.add(responses.GET, 'http://cuckoo.mock:8090/tasks/view/1', - json=read_resource('cuckoo_tasks_view')) - self.assertEquals(self.sandbox.check('1'), True) - - @responses.activate - def test_is_available(self): - responses.add(responses.GET, 'http://cuckoo.mock:8090/cuckoo/status', - json=read_resource('cuckoo_status')) - self.assertTrue(self.sandbox.is_available()) - - @responses.activate - def test_not_is_available(self): - self.assertFalse(self.sandbox.is_available()) - responses.add(responses.GET, 'http://cuckoo.mock:8090/cuckoo/status', - status=500) - self.assertFalse(self.sandbox.is_available()) - - @responses.activate - def test_report(self): - responses.add(responses.GET, 'http://cuckoo.mock:8090/tasks/report/8/json', - json=read_resource('cuckoo_tasks_report')) - self.assertEquals(self.sandbox.report(8)['info']['id'], 8) - - @responses.activate - def test_score(self): - responses.add(responses.GET, 'http://cuckoo.mock:8090/tasks/report/8/json', - json=read_resource('cuckoo_tasks_report')) - self.assertEquals(self.sandbox.score(self.sandbox.report(8)), 5) - - @patch('requests.post') - @patch('requests.get') - def test_proxies_is_passed_to_requests(self, m_get, m_post): - - m_get.return_value.status_code = 200 - m_post.return_value.status_code = 200 - - proxies = { - 'http': 'http://10.10.1.10:3128', - 'https': 'http://10.10.1.10:1080', - } - - api = sandboxapi.cuckoo.CuckooAPI('cuckoo.mock', - proxies=proxies) - api._request('/test') - - m_get.assert_called_once_with(api.api_url + '/test', auth=MOCK_ANY, - headers=MOCK_ANY, params=MOCK_ANY, - proxies=proxies, verify=MOCK_ANY) - - api._request('/test', method='POST') - - m_post.assert_called_once_with(api.api_url + '/test', auth=MOCK_ANY, - headers=MOCK_ANY, data=MOCK_ANY, - files=None, proxies=proxies, - verify=MOCK_ANY) - - @responses.activate - def test_cuckoo_old_style_host_port_path(self): - sandbox = sandboxapi.cuckoo.CuckooAPI('cuckoo.mock') - responses.add(responses.GET, 'http://cuckoo.mock:8090/tasks/list', - json=read_resource('cuckoo_tasks_list')) - self.assertEquals(len(self.sandbox.analyses()), 2) - - sandbox = sandboxapi.cuckoo.CuckooAPI('cuckoo.mock', 9090, '/test') - responses.add(responses.GET, 'http://cuckoo.mock:9090/test/tasks/list', - json=read_resource('cuckoo_tasks_list')) - self.assertEquals(len(self.sandbox.analyses()), 2) - - - -class TestJoe(unittest.TestCase): - - def setUp(self): - self.sandbox = sandboxapi.joe.JoeAPI('key', 'http://joe.mock/api', True) - - @responses.activate - def test_analyze(self): - responses.add(responses.POST, 'http://joe.mock/api/v2/analysis/submit', - json=read_resource('joe_analysis_submit')) - self.assertEquals(self.sandbox.analyze(io.BytesIO('test'.encode('ascii')), 'filename'), '100001') - - @responses.activate - def test_check(self): - responses.add(responses.POST, 'http://joe.mock/api/v2/analysis/info', - json=read_resource('joe_analysis_info')) - self.assertEquals(self.sandbox.check('1'), True) - - @responses.activate - def test_is_available(self): - responses.add(responses.POST, 'http://joe.mock/api/v2/server/online', - json=read_resource('joe_server_online')) - self.assertTrue(self.sandbox.is_available()) - - @responses.activate - def test_not_is_available(self): - self.assertFalse(self.sandbox.is_available()) - responses.add(responses.POST, 'http://joe.mock/api/v2/server/online', - status=500) - self.assertFalse(self.sandbox.is_available()) - - @responses.activate - def test_report(self): - responses.add(responses.POST, 'http://joe.mock/api/v2/analysis/download', - json=read_resource('joe_analysis_download')) - self.assertEquals(self.sandbox.report(8)['analysis']['signaturedetections']['strategy'][1]['score'], 1) - - @responses.activate - def test_score(self): - responses.add(responses.POST, 'http://joe.mock/api/v2/analysis/download', - json=read_resource('joe_analysis_download')) - self.assertEquals(self.sandbox.score(self.sandbox.report(1)), 1) - - @patch('requests.post') - @patch('requests.get') - def test_proxies_is_passed_to_requests(self, m_get, m_post): - - m_get.return_value.status_code = 200 - m_post.return_value.status_code = 200 - - proxies = { - 'http': 'http://10.10.1.10:3128', - 'https': 'http://10.10.1.10:1080', - } - - api = sandboxapi.joe.JoeAPI('key', self.sandbox.jbx.apiurl, True, - proxies=proxies) - self.assertEquals(api.jbx.session.proxies, proxies) - - -class TestFireEye(unittest.TestCase): - - def setUp(self): - self.sandbox = sandboxapi.fireeye.FireEyeAPI('username', 'password', 'http://fireeye.mock', 'profile') - self.legacy_sandbox = sandboxapi.fireeye.FireEyeAPI('username', 'password', - 'http://fireeye.mock', 'profile', - legacy_api=True) - - @responses.activate - def test_analyze(self): - responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.2.0/auth/login', - headers={'X-FeApi-Token': 'MOCK'}) - responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.2.0/submissions', - json=read_resource('fireeye_submissions')) - self.assertEquals(self.sandbox.analyze(io.BytesIO('test'.encode('ascii')), 'filename'), 1) - - @responses.activate - def test_check(self): - responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.2.0/auth/login', - headers={'X-FeApi-Token': 'MOCK'}) - responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.2.0/submissions/status/1', - json=read_resource('fireeye_submissions_status')) - self.assertEquals(self.sandbox.check('1'), True) - - @responses.activate - def test_is_available(self): - responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.2.0/auth/login', - headers={'X-FeApi-Token': 'MOCK'}) - responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.2.0/config', - json=read_resource('fireeye_config')) - self.assertTrue(self.sandbox.is_available()) - - @responses.activate - def test_not_is_available(self): - self.assertFalse(self.sandbox.is_available()) - responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.2.0/auth/login', - headers={'X-FeApi-Token': 'MOCK'}) - responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.2.0/config', - status=500) - self.assertFalse(self.sandbox.is_available()) - - @responses.activate - def test_report(self): - responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.2.0/auth/login', - headers={'X-FeApi-Token': 'MOCK'}) - responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.2.0/submissions/results/1', - json=read_resource('fireeye_submissions_results')) - self.assertEquals(self.sandbox.report(1)['msg'], 'concise') - - @responses.activate - def test_score(self): - responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.2.0/auth/login', - headers={'X-FeApi-Token': 'MOCK'}) - responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.2.0/submissions/results/1', - json=read_resource('fireeye_submissions_results')) - self.assertEquals(self.sandbox.score(self.sandbox.report(1)), 8) - - # Legacy API support. - @responses.activate - def test_analyze(self): - responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.1.0/auth/login', - headers={'X-FeApi-Token': 'MOCK'}) - responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.1.0/submissions', - json=read_resource('fireeye_submissions')) - self.assertEquals(self.legacy_sandbox.analyze(io.BytesIO('test'.encode('ascii')), 'filename'), 1) - - @responses.activate - def test_check(self): - responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.1.0/auth/login', - headers={'X-FeApi-Token': 'MOCK'}) - responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.1.0/submissions/status/1', - json=read_resource('fireeye_submissions_status')) - self.assertEquals(self.legacy_sandbox.check('1'), True) - - @responses.activate - def test_is_available(self): - responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.1.0/auth/login', - headers={'X-FeApi-Token': 'MOCK'}) - responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.1.0/config', - json=read_resource('fireeye_config')) - self.assertTrue(self.legacy_sandbox.is_available()) - - @responses.activate - def test_not_is_available(self): - self.assertFalse(self.legacy_sandbox.is_available()) - responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.1.0/auth/login', - headers={'X-FeApi-Token': 'MOCK'}) - responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.1.0/config', - status=500) - self.assertFalse(self.legacy_sandbox.is_available()) - - @responses.activate - def test_report(self): - responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.1.0/auth/login', - headers={'X-FeApi-Token': 'MOCK'}) - responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.1.0/submissions/results/1', - json=read_resource('fireeye_submissions_results')) - self.assertEquals(self.legacy_sandbox.report(1)['msg'], 'concise') - - @responses.activate - def test_score(self): - responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.1.0/auth/login', - headers={'X-FeApi-Token': 'MOCK'}) - responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.1.0/submissions/results/1', - json=read_resource('fireeye_submissions_results')) - self.assertEquals(self.legacy_sandbox.score(self.legacy_sandbox.report(1)), 8) - - # Core functionality. - @patch('requests.post') - @patch('requests.get') - def test_proxies_is_passed_to_requests(self, m_get, m_post): - - m_get.return_value.status_code = 200 - m_get.return_value.content = b'' - m_post.return_value.status_code = 200 - m_post.return_value.content = b'' - - proxies = { - 'http': 'http://10.10.1.10:3128', - 'https': 'http://10.10.1.10:1080', - } - - api = sandboxapi.fireeye.FireEyeAPI('username', 'password', - self.sandbox.api_url, 'profile', - proxies=proxies) - api._request('/test') - - m_get.assert_called_once_with(api.api_url + '/test', auth=MOCK_ANY, - headers=MOCK_ANY, params=MOCK_ANY, - proxies=proxies, verify=MOCK_ANY) - - api._request('/test', method='POST') - - m_post.assert_called_with(api.api_url + '/test', auth=MOCK_ANY, - headers=MOCK_ANY, data=MOCK_ANY, - files=None, proxies=proxies, - verify=MOCK_ANY) - - @responses.activate - def test_reauthenticates_if_logged_out_http_401(self): - responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.2.0/auth/login', - headers={'X-FeApi-Token': 'MOCK'}) - responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.2.0/submissions/status/1', - status=401) - responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.2.0/submissions/status/1', - json=read_resource('fireeye_submissions_status')) - self.assertEquals(self.sandbox.check('1'), True) - - @responses.activate - def test_reauthenticates_if_logged_out_json_401(self): - responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.2.0/auth/login', - headers={'X-FeApi-Token': 'MOCK'}) - responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.2.0/submissions/status/1', - json=read_resource('fireeye_unauthorized')) - responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.2.0/submissions/status/1', - json=read_resource('fireeye_submissions_status')) - self.assertEquals(self.sandbox.check('1'), True) - - -class TestVMRay(unittest.TestCase): - - def setUp(self): - self.sandbox = sandboxapi.vmray.VMRayAPI('key', 'http://vmray.mock') - - @responses.activate - def test_analyze(self): - responses.add(responses.POST, 'http://vmray.mock/rest/sample/submit', - json=read_resource('vmray_sample_submit')) - self.assertEquals(self.sandbox.analyze(io.BytesIO('test'.encode('ascii')), 'filename'), 1169850) - - @responses.activate - def test_analyze_with_errors(self): - responses.add(responses.POST, 'http://vmray.mock/rest/sample/submit', - json=read_resource('vmray_sample_submit_errors')) - with self.assertRaises(sandboxapi.SandboxError): - self.assertEquals(self.sandbox.analyze(io.BytesIO('test'.encode('ascii')), 'filename')) - - @responses.activate - def test_check(self): - responses.add(responses.GET, 'http://vmray.mock/rest/submission/sample/1', - json=read_resource('vmray_submission_sample')) - self.assertEquals(self.sandbox.check('1'), True) - - @responses.activate - def test_is_available(self): - responses.add(responses.GET, 'http://vmray.mock/rest/system_info', - json=read_resource('vmray_system_info')) - self.assertTrue(self.sandbox.is_available()) - - @responses.activate - def test_not_is_available(self): - self.assertFalse(self.sandbox.is_available()) - responses.add(responses.GET, 'http://vmray.mock/rest/system_info', - status=500) - self.assertFalse(self.sandbox.is_available()) - - @responses.activate - def test_report(self): - responses.add(responses.GET, 'http://vmray.mock/rest/analysis/sample/1', - json=read_resource('vmray_analysis_sample')) - responses.add(responses.GET, 'http://vmray.mock/rest/analysis/1097123/archive/logs/summary.json', - json=read_resource('vmray_analysis_archive_logs_summary')) - self.assertEquals(self.sandbox.report(1)['version'], 1) - - @responses.activate - def test_score(self): - responses.add(responses.GET, 'http://vmray.mock/rest/analysis/sample/1', - json=read_resource('vmray_analysis_sample')) - responses.add(responses.GET, 'http://vmray.mock/rest/analysis/1097123/archive/logs/summary.json', - json=read_resource('vmray_analysis_archive_logs_summary')) - self.assertEquals(self.sandbox.score(self.sandbox.report(1)), 20) - - @patch('requests.post') - @patch('requests.get') - def test_proxies_is_passed_to_requests(self, m_get, m_post): - - m_get.return_value.status_code = 200 - m_post.return_value.status_code = 200 - - proxies = { - 'http': 'http://10.10.1.10:3128', - 'https': 'http://10.10.1.10:1080', - } - - api = sandboxapi.vmray.VMRayAPI('key', self.sandbox.api_url, - proxies=proxies) - api._request('/test') - - m_get.assert_called_once_with(api.api_url + '/test', auth=MOCK_ANY, - headers=MOCK_ANY, params=MOCK_ANY, - proxies=proxies, verify=MOCK_ANY) - - api._request('/test', method='POST') - - m_post.assert_called_once_with(api.api_url + '/test', auth=MOCK_ANY, - headers=MOCK_ANY, data=MOCK_ANY, - files=None, proxies=proxies, - verify=MOCK_ANY) - - -class TestTriage(unittest.TestCase): - def setUp(self): - self.sandbox = sandboxapi.triage.TriageAPI("key", - "http://api.triage.mock") - - @responses.activate - def test_analyze(self): - responses.add(responses.POST, - 'http://api.triage.mock/v0/samples', - json=read_resource('triage_analyze'), status=200) - triage_id = self.sandbox.analyze(io.BytesIO('test'.encode('ascii')), - "testfile") - self.assertEquals(triage_id, "200707-pht1cwk3ls") - - @responses.activate - def test_check(self): - responses.add(responses.GET, - 'http://api.triage.mock/v0/samples/test/status', - json=read_resource('triage_check'), status=200) - self.assertTrue(self.sandbox.check("test")) - - @responses.activate - def test_is_available(self): - responses.add(responses.GET, 'http://api.triage.mock/v0/samples', - json=read_resource('triage_available'), status=200) - self.assertTrue(self.sandbox.is_available()) - - @responses.activate - def test_report(self): - responses.add(responses.GET, - 'http://api.triage.mock/v0/samples/test/summary', - json=read_resource('triage_report'), status=200) - data = self.sandbox.report("test") - self.assertEquals( - 10, data["tasks"]["200615-8jbndpgg9n-behavioral1"]["score"]) - - @responses.activate - def test_score(self): - responses.add(responses.GET, - 'http://api.triage.mock/v0/samples/test/summary', - json=read_resource('triage_report'), status=200) - score = self.sandbox.score("test") - self.assertEquals(10, score) - - @responses.activate - def test_full_report(self): - responses.add(responses.GET, - 'http://api.triage.mock/v0/samples/200615-8jbndpgg9n/summary', - json=read_resource('triage_report'), status=200) - responses.add(responses.GET, - 'http://api.triage.mock/v0/samples/200615-8jbndpgg9n/behavioral1/report_triage.json', - json=read_resource('triage_behavioral1'), status=200) - responses.add(responses.GET, - 'http://api.triage.mock/v0/samples/200615-8jbndpgg9n/behavioral2/report_triage.json', - json=read_resource('triage_behavioral2'), status=200) - - full_report = self.sandbox.full_report("200615-8jbndpgg9n") - self.assertTrue(full_report["tasks"]["behavioral1"]["sample"]["score"], - 10) - self.assertTrue(full_report["tasks"]["behavioral2"]["sample"]["score"], - 10) - - -class TestFalcon(unittest.TestCase): - - def setUp(self): - self.sandbox = sandboxapi.falcon.FalconAPI('key', 'http://falcon.mock/api/v2') - - @responses.activate - def test_analyze(self): - responses.add(responses.POST, 'http://falcon.mock/api/v2/submit/file', - json=read_resource('falcon_submit_file'), status=201) - self.assertEquals(self.sandbox.analyze(io.BytesIO('test'.encode('ascii')), 'filename'), '1') - - @responses.activate - def test_check(self): - responses.add(responses.GET, 'http://falcon.mock/api/v2/report/1/state', - json=read_resource('falcon_report_state')) - self.assertEquals(self.sandbox.check('1'), True) - - @responses.activate - def test_is_available(self): - responses.add(responses.GET, 'http://falcon.mock/api/v2/system/heartbeat', - json=read_resource('falcon_system_heartbeat')) - self.assertTrue(self.sandbox.is_available()) - - @responses.activate - def test_not_is_available(self): - self.assertFalse(self.sandbox.is_available()) - responses.add(responses.GET, 'http://falcon.mock/api/v2/system/heartbeat', - status=500) - self.assertFalse(self.sandbox.is_available()) - - @responses.activate - def test_report(self): - responses.add(responses.GET, 'http://falcon.mock/api/v2/report/1/summary', - json=read_resource('falcon_report_summary')) - self.assertEquals(self.sandbox.report(1)['job_id'], '1') - - @responses.activate - def test_score(self): - responses.add(responses.GET, 'http://falcon.mock/api/v2/report/1/summary', - json=read_resource('falcon_report_summary')) - self.assertEquals(self.sandbox.score(self.sandbox.report(1)), 5) - - @patch('requests.post') - @patch('requests.get') - def test_proxies_is_passed_to_requests(self, m_get, m_post): - - m_get.return_value.status_code = 200 - m_post.return_value.status_code = 200 - - proxies = { - 'http': 'http://10.10.1.10:3128', - 'https': 'http://10.10.1.10:1080', - } - - api = sandboxapi.falcon.FalconAPI('key', self.sandbox.api_url, - proxies=proxies) - api._request('/test') - - m_get.assert_called_once_with(api.api_url + '/test', auth=MOCK_ANY, - headers=MOCK_ANY, params=MOCK_ANY, - proxies=proxies, verify=MOCK_ANY) - - api._request('/test', method='POST') - - m_post.assert_called_once_with(api.api_url + '/test', auth=MOCK_ANY, - headers=MOCK_ANY, data=MOCK_ANY, - files=None, proxies=proxies, - verify=MOCK_ANY) - - -class TestSandboxAPI(unittest.TestCase): - - @patch('requests.post') - @patch('requests.get') - def test_proxies_is_passed_to_requests(self, m_get, m_post): - m_get.return_value.status_code = 200 - m_post.return_value.status_code = 200 - - proxies = { - 'http': 'http://10.10.1.10:3128', - 'https': 'http://10.10.1.10:1080', - } - - api = sandboxapi.SandboxAPI(proxies=proxies) - api.api_url = 'http://sandbox.mock' - api._request('/test') - - m_get.assert_called_once_with('http://sandbox.mock/test', auth=None, - headers=None, params=None, proxies=proxies, - verify=True) - - api._request('/test', method='POST') - - m_post.assert_called_once_with('http://sandbox.mock/test', auth=None, - headers=None, data=None, files=None, - proxies=proxies, verify=True) From 6622c1a64bba476ee79851d86e9116a6edc39c14 Mon Sep 17 00:00:00 2001 From: Keath Milligan Date: Wed, 5 Oct 2022 20:54:14 -0500 Subject: [PATCH 02/10] Multi-version support --- .travis.yml | 42 +++++++++++++++++++++--------------------- Pipfile | 2 +- sandboxapi/joe.py | 18 ++++++++++++++---- setup.py | 2 +- tests/test_cuckoo.py | 14 +++++++------- tests/test_falcon.py | 8 ++++---- tests/test_fireeye.py | 20 ++++++++++---------- tests/test_joe.py | 27 ++++++++++++++++++--------- tests/test_triage.py | 6 +++--- tests/test_vmray.py | 10 +++++----- 10 files changed, 84 insertions(+), 65 deletions(-) diff --git a/.travis.yml b/.travis.yml index 2b6c2cf..eb22c8a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,22 +1,22 @@ language: python -python: -# - "2.6" # -- responses module requires 2.7+ - - "2.7" -# TODO: resolve issues with older 3.x versions - # - "3.3" - # - "3.4" - - "3.5" -install: - - "pip install -r requirements.txt" - - "pip install nose" - - "pip install responses" - - "pip install coveralls" - - "pip install codacy-coverage" - - "pip install collective.checkdocs Pygments" -script: - - nosetests --with-coverage --cover-package=sandboxapi - - python setup.py checkdocs -after_success: - - coveralls - - coverage xml - - bash <(curl -Ls https://coverage.codacy.com/get.sh) report -r coverage.xml +jobs: + include: + - name: "Python 3.9" + python: 3.9 + install: + - "pip install -r requirements.txt" + - "pip install pytest pytest-mock coverage requests-mock responses collective.checkdocs Pygments" + script: + - coverage run -m pytest + - python setup.py checkdocs + after_success: + - coveralls + - coverage xml + - if [ "$TRAVIS_BRANCH" = "master" ]; then bash <(curl -Ls https://coverage.codacy.com/get.sh) report -r coverage.xml; fi + - name: "Python 2.7" + python: 2.7 + install: + - "pip install -r requirements.txt" + - "pip install nose mock requests-mock responses collective.checkdocs Pygments" + script: + - nosetests diff --git a/Pipfile b/Pipfile index e25eb97..3a8c89a 100644 --- a/Pipfile +++ b/Pipfile @@ -16,4 +16,4 @@ responses = "*" pygments = "*" [requires] -python_version = "3.10" +python_version = "3.9" diff --git a/sandboxapi/joe.py b/sandboxapi/joe.py index 36ac6c1..396d757 100644 --- a/sandboxapi/joe.py +++ b/sandboxapi/joe.py @@ -11,7 +11,8 @@ class JoeAPI(sandboxapi.SandboxAPI): def __init__(self, apikey, apiurl, accept_tac, timeout=None, verify_ssl=True, retries=3, chunked=False, **kwargs): """Initialize the interface to Joe Sandbox API.""" sandboxapi.SandboxAPI.__init__(self) - self._chunked = chunked + if not jbxapi.__version__.startswith("2"): + self._chunked = chunked self.jbx = jbxapi.JoeSandbox(apikey, apiurl or jbxapi.API_URL, accept_tac, timeout, bool(int(verify_ssl)), retries, **kwargs) def analyze(self, handle, filename): @@ -29,7 +30,10 @@ def analyze(self, handle, filename): handle.seek(0) try: - return self.jbx.submit_sample(handle, _chunked_upload=self._chunked)['submission_id'] + if not jbxapi.__version__.startswith("2"): + return self.jbx.submit_sample(handle, _chunked_upload=self._chunked)['submission_id'] + else: + return self.jbx.submit_sample(handle)['webids'][0] except (jbxapi.JoeException, KeyError, IndexError) as e: raise sandboxapi.SandboxError("error in analyze: {e}".format(e=e)) @@ -43,7 +47,10 @@ def check(self, item_id): :return: Boolean indicating if a report is done or not. """ try: - return self.jbx.analysis_info(item_id).get('status').lower() == 'finished' + if not jbxapi.__version__.startswith("2"): + return self.jbx.analysis_info(item_id).get('status').lower() == 'finished' + else: + return self.jbx.info(item_id).get('status').lower() == 'finished' except jbxapi.JoeException: return False @@ -90,7 +97,10 @@ def report(self, item_id, report_format="json"): report_format = "jsonfixed" try: - return json.loads(self.jbx.analysis_download(item_id, report_format)[1].decode('utf-8')) + if not jbxapi.__version__.startswith("2"): + return json.loads(self.jbx.analysis_download(item_id, report_format)[1].decode('utf-8')) + else: + return json.loads(self.jbx.download(item_id, report_format)[1].decode('utf-8')) except (jbxapi.JoeException, ValueError, IndexError) as e: raise sandboxapi.SandboxError("error in report fetch: {e}".format(e=e)) diff --git a/setup.py b/setup.py index bd7fe79..d86784e 100644 --- a/setup.py +++ b/setup.py @@ -12,7 +12,7 @@ setup( name='sandboxapi', - version='1.6.0', + version='1.6.1', include_package_data=True, packages=[ 'sandboxapi', diff --git a/tests/test_cuckoo.py b/tests/test_cuckoo.py index 758cadd..1a5da38 100644 --- a/tests/test_cuckoo.py +++ b/tests/test_cuckoo.py @@ -18,19 +18,19 @@ def setUp(self): def test_analyses(self): responses.add(responses.GET, 'http://cuckoo.mock:8090/tasks/list', json=read_resource('cuckoo_tasks_list')) - self.assertEquals(len(self.sandbox.analyses()), 2) + self.assertEqual(len(self.sandbox.analyses()), 2) @responses.activate def test_analyze(self): responses.add(responses.POST, 'http://cuckoo.mock:8090/tasks/create/file', json=read_resource('cuckoo_tasks_create_file')) - self.assertEquals(self.sandbox.analyze(io.BytesIO('test'.encode('ascii')), 'filename'), '1') + self.assertEqual(self.sandbox.analyze(io.BytesIO('test'.encode('ascii')), 'filename'), '1') @responses.activate def test_check(self): responses.add(responses.GET, 'http://cuckoo.mock:8090/tasks/view/1', json=read_resource('cuckoo_tasks_view')) - self.assertEquals(self.sandbox.check('1'), True) + self.assertEqual(self.sandbox.check('1'), True) @responses.activate def test_is_available(self): @@ -49,13 +49,13 @@ def test_not_is_available(self): def test_report(self): responses.add(responses.GET, 'http://cuckoo.mock:8090/tasks/report/8/json', json=read_resource('cuckoo_tasks_report')) - self.assertEquals(self.sandbox.report(8)['info']['id'], 8) + self.assertEqual(self.sandbox.report(8)['info']['id'], 8) @responses.activate def test_score(self): responses.add(responses.GET, 'http://cuckoo.mock:8090/tasks/report/8/json', json=read_resource('cuckoo_tasks_report')) - self.assertEquals(self.sandbox.score(self.sandbox.report(8)), 5) + self.assertEqual(self.sandbox.score(self.sandbox.report(8)), 5) @patch('requests.post') @patch('requests.get') @@ -89,9 +89,9 @@ def test_cuckoo_old_style_host_port_path(self): sandbox = sandboxapi.cuckoo.CuckooAPI('cuckoo.mock') responses.add(responses.GET, 'http://cuckoo.mock:8090/tasks/list', json=read_resource('cuckoo_tasks_list')) - self.assertEquals(len(self.sandbox.analyses()), 2) + self.assertEqual(len(self.sandbox.analyses()), 2) sandbox = sandboxapi.cuckoo.CuckooAPI('cuckoo.mock', 9090, '/test') responses.add(responses.GET, 'http://cuckoo.mock:9090/test/tasks/list', json=read_resource('cuckoo_tasks_list')) - self.assertEquals(len(self.sandbox.analyses()), 2) + self.assertEqual(len(self.sandbox.analyses()), 2) diff --git a/tests/test_falcon.py b/tests/test_falcon.py index a76cfd3..b960349 100644 --- a/tests/test_falcon.py +++ b/tests/test_falcon.py @@ -20,13 +20,13 @@ def setUp(self): def test_analyze(self): responses.add(responses.POST, 'http://falcon.mock/api/v2/submit/file', json=read_resource('falcon_submit_file'), status=201) - self.assertEquals(self.sandbox.analyze(io.BytesIO('test'.encode('ascii')), 'filename'), '1') + self.assertEqual(self.sandbox.analyze(io.BytesIO('test'.encode('ascii')), 'filename'), '1') @responses.activate def test_check(self): responses.add(responses.GET, 'http://falcon.mock/api/v2/report/1/state', json=read_resource('falcon_report_state')) - self.assertEquals(self.sandbox.check('1'), True) + self.assertEqual(self.sandbox.check('1'), True) @responses.activate def test_is_available(self): @@ -45,13 +45,13 @@ def test_not_is_available(self): def test_report(self): responses.add(responses.GET, 'http://falcon.mock/api/v2/report/1/summary', json=read_resource('falcon_report_summary')) - self.assertEquals(self.sandbox.report(1)['job_id'], '1') + self.assertEqual(self.sandbox.report(1)['job_id'], '1') @responses.activate def test_score(self): responses.add(responses.GET, 'http://falcon.mock/api/v2/report/1/summary', json=read_resource('falcon_report_summary')) - self.assertEquals(self.sandbox.score(self.sandbox.report(1)), 5) + self.assertEqual(self.sandbox.score(self.sandbox.report(1)), 5) @patch('requests.post') @patch('requests.get') diff --git a/tests/test_fireeye.py b/tests/test_fireeye.py index c7fec73..3d0209d 100644 --- a/tests/test_fireeye.py +++ b/tests/test_fireeye.py @@ -23,7 +23,7 @@ def test_analyze(self): headers={'X-FeApi-Token': 'MOCK'}) responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.2.0/submissions', json=read_resource('fireeye_submissions')) - self.assertEquals(self.sandbox.analyze(io.BytesIO('test'.encode('ascii')), 'filename'), 1) + self.assertEqual(self.sandbox.analyze(io.BytesIO('test'.encode('ascii')), 'filename'), 1) @responses.activate def test_check(self): @@ -31,7 +31,7 @@ def test_check(self): headers={'X-FeApi-Token': 'MOCK'}) responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.2.0/submissions/status/1', json=read_resource('fireeye_submissions_status')) - self.assertEquals(self.sandbox.check('1'), True) + self.assertEqual(self.sandbox.check('1'), True) @responses.activate def test_is_available(self): @@ -56,7 +56,7 @@ def test_report(self): headers={'X-FeApi-Token': 'MOCK'}) responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.2.0/submissions/results/1', json=read_resource('fireeye_submissions_results')) - self.assertEquals(self.sandbox.report(1)['msg'], 'concise') + self.assertEqual(self.sandbox.report(1)['msg'], 'concise') @responses.activate def test_score(self): @@ -64,7 +64,7 @@ def test_score(self): headers={'X-FeApi-Token': 'MOCK'}) responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.2.0/submissions/results/1', json=read_resource('fireeye_submissions_results')) - self.assertEquals(self.sandbox.score(self.sandbox.report(1)), 8) + self.assertEqual(self.sandbox.score(self.sandbox.report(1)), 8) # Legacy API support. @responses.activate @@ -73,7 +73,7 @@ def test_analyze(self): headers={'X-FeApi-Token': 'MOCK'}) responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.1.0/submissions', json=read_resource('fireeye_submissions')) - self.assertEquals(self.legacy_sandbox.analyze(io.BytesIO('test'.encode('ascii')), 'filename'), 1) + self.assertEqual(self.legacy_sandbox.analyze(io.BytesIO('test'.encode('ascii')), 'filename'), 1) @responses.activate def test_check(self): @@ -81,7 +81,7 @@ def test_check(self): headers={'X-FeApi-Token': 'MOCK'}) responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.1.0/submissions/status/1', json=read_resource('fireeye_submissions_status')) - self.assertEquals(self.legacy_sandbox.check('1'), True) + self.assertEqual(self.legacy_sandbox.check('1'), True) @responses.activate def test_is_available(self): @@ -106,7 +106,7 @@ def test_report(self): headers={'X-FeApi-Token': 'MOCK'}) responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.1.0/submissions/results/1', json=read_resource('fireeye_submissions_results')) - self.assertEquals(self.legacy_sandbox.report(1)['msg'], 'concise') + self.assertEqual(self.legacy_sandbox.report(1)['msg'], 'concise') @responses.activate def test_score(self): @@ -114,7 +114,7 @@ def test_score(self): headers={'X-FeApi-Token': 'MOCK'}) responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.1.0/submissions/results/1', json=read_resource('fireeye_submissions_results')) - self.assertEquals(self.legacy_sandbox.score(self.legacy_sandbox.report(1)), 8) + self.assertEqual(self.legacy_sandbox.score(self.legacy_sandbox.report(1)), 8) # Core functionality. @patch('requests.post') @@ -155,7 +155,7 @@ def test_reauthenticates_if_logged_out_http_401(self): status=401) responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.2.0/submissions/status/1', json=read_resource('fireeye_submissions_status')) - self.assertEquals(self.sandbox.check('1'), True) + self.assertEqual(self.sandbox.check('1'), True) @responses.activate def test_reauthenticates_if_logged_out_json_401(self): @@ -165,4 +165,4 @@ def test_reauthenticates_if_logged_out_json_401(self): json=read_resource('fireeye_unauthorized')) responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.2.0/submissions/status/1', json=read_resource('fireeye_submissions_status')) - self.assertEquals(self.sandbox.check('1'), True) + self.assertEqual(self.sandbox.check('1'), True) diff --git a/tests/test_joe.py b/tests/test_joe.py index 39a61f7..648a19e 100644 --- a/tests/test_joe.py +++ b/tests/test_joe.py @@ -6,6 +6,7 @@ from mock import patch import responses import sandboxapi.joe +import jbxapi from . import read_resource @@ -16,15 +17,23 @@ def setUp(self): @responses.activate def test_analyze(self): - responses.add(responses.POST, 'http://joe.mock/api/v2/submission/new', - json=read_resource('joe_submission_new')) - self.assertEquals(self.sandbox.analyze(io.BytesIO('test'.encode('ascii')), 'filename'), '100001') + if not jbxapi.__version__.startswith("2"): + responses.add(responses.POST, 'http://joe.mock/api/v2/submission/new', + json=read_resource('joe_submission_new')) + else: + responses.add(responses.POST, 'http://joe.mock/api/v2/analysis/submit', + json=read_resource('joe_analysis_submit')) + self.assertEqual(self.sandbox.analyze(io.BytesIO('test'.encode('ascii')), 'filename'), '100001') @responses.activate def test_check(self): - responses.add(responses.POST, 'http://joe.mock/api/v2/analysis/info', - json=read_resource('joe_analysis_info')) - self.assertEquals(self.sandbox.check('1'), True) + if not jbxapi.__version__.startswith("2"): + responses.add(responses.POST, 'http://joe.mock/api/v2/analysis/info', + json=read_resource('joe_analysis_info')) + else: + responses.add(responses.POST, 'http://joe.mock/api/v2/analysis/info', + json=read_resource('joe_analysis_info')) + self.assertEqual(self.sandbox.check('1'), True) @responses.activate def test_is_available(self): @@ -43,13 +52,13 @@ def test_not_is_available(self): def test_report(self): responses.add(responses.POST, 'http://joe.mock/api/v2/analysis/download', json=read_resource('joe_analysis_download')) - self.assertEquals(self.sandbox.report(8)['analysis']['signaturedetections']['strategy'][1]['score'], 1) + self.assertEqual(self.sandbox.report(8)['analysis']['signaturedetections']['strategy'][1]['score'], 1) @responses.activate def test_score(self): responses.add(responses.POST, 'http://joe.mock/api/v2/analysis/download', json=read_resource('joe_analysis_download')) - self.assertEquals(self.sandbox.score(self.sandbox.report(1)), 1) + self.assertEqual(self.sandbox.score(self.sandbox.report(1)), 1) @patch('requests.post') @patch('requests.get') @@ -65,4 +74,4 @@ def test_proxies_is_passed_to_requests(self, m_get, m_post): api = sandboxapi.joe.JoeAPI('key', self.sandbox.jbx.apiurl, True, proxies=proxies) - self.assertEquals(api.jbx.session.proxies, proxies) + self.assertEqual(api.jbx.session.proxies, proxies) diff --git a/tests/test_triage.py b/tests/test_triage.py index ed98e03..ea2d43e 100644 --- a/tests/test_triage.py +++ b/tests/test_triage.py @@ -23,7 +23,7 @@ def test_analyze(self): json=read_resource('triage_analyze'), status=200) triage_id = self.sandbox.analyze(io.BytesIO('test'.encode('ascii')), "testfile") - self.assertEquals(triage_id, "200707-pht1cwk3ls") + self.assertEqual(triage_id, "200707-pht1cwk3ls") @responses.activate def test_check(self): @@ -44,7 +44,7 @@ def test_report(self): 'http://api.triage.mock/v0/samples/test/summary', json=read_resource('triage_report'), status=200) data = self.sandbox.report("test") - self.assertEquals( + self.assertEqual( 10, data["tasks"]["200615-8jbndpgg9n-behavioral1"]["score"]) @responses.activate @@ -53,7 +53,7 @@ def test_score(self): 'http://api.triage.mock/v0/samples/test/summary', json=read_resource('triage_report'), status=200) score = self.sandbox.score("test") - self.assertEquals(10, score) + self.assertEqual(10, score) @responses.activate def test_full_report(self): diff --git a/tests/test_vmray.py b/tests/test_vmray.py index 8d59fae..1f2ead4 100644 --- a/tests/test_vmray.py +++ b/tests/test_vmray.py @@ -20,20 +20,20 @@ def setUp(self): def test_analyze(self): responses.add(responses.POST, 'http://vmray.mock/rest/sample/submit', json=read_resource('vmray_sample_submit')) - self.assertEquals(self.sandbox.analyze(io.BytesIO('test'.encode('ascii')), 'filename'), 1169850) + self.assertEqual(self.sandbox.analyze(io.BytesIO('test'.encode('ascii')), 'filename'), 1169850) @responses.activate def test_analyze_with_errors(self): responses.add(responses.POST, 'http://vmray.mock/rest/sample/submit', json=read_resource('vmray_sample_submit_errors')) with self.assertRaises(sandboxapi.SandboxError): - self.assertEquals(self.sandbox.analyze(io.BytesIO('test'.encode('ascii')), 'filename')) + self.assertEqual(self.sandbox.analyze(io.BytesIO('test'.encode('ascii')), 'filename')) @responses.activate def test_check(self): responses.add(responses.GET, 'http://vmray.mock/rest/submission/sample/1', json=read_resource('vmray_submission_sample')) - self.assertEquals(self.sandbox.check('1'), True) + self.assertEqual(self.sandbox.check('1'), True) @responses.activate def test_is_available(self): @@ -54,7 +54,7 @@ def test_report(self): json=read_resource('vmray_analysis_sample')) responses.add(responses.GET, 'http://vmray.mock/rest/analysis/1097123/archive/logs/summary.json', json=read_resource('vmray_analysis_archive_logs_summary')) - self.assertEquals(self.sandbox.report(1)['version'], 1) + self.assertEqual(self.sandbox.report(1)['version'], 1) @responses.activate def test_score(self): @@ -62,7 +62,7 @@ def test_score(self): json=read_resource('vmray_analysis_sample')) responses.add(responses.GET, 'http://vmray.mock/rest/analysis/1097123/archive/logs/summary.json', json=read_resource('vmray_analysis_archive_logs_summary')) - self.assertEquals(self.sandbox.score(self.sandbox.report(1)), 20) + self.assertEqual(self.sandbox.score(self.sandbox.report(1)), 20) @patch('requests.post') @patch('requests.get') From bac703ab58da33ffe8ab29a029ae4df0bfe456ae Mon Sep 17 00:00:00 2001 From: Hifumi1337 Date: Thu, 27 Oct 2022 10:50:02 -0500 Subject: [PATCH 03/10] Update unit tests - Improved testing suite, minimizing imports for CI and performance - FireEyeAPI unit test modern/legacy separation - Platforms tested: Windows, Linux - Versions tested: Python2, Python3 --- tests/test_cuckoo.py | 7 ++- tests/test_falcon.py | 10 +-- tests/test_fireeye.py | 133 +++++++++++++++++++++------------------ tests/test_joe.py | 7 ++- tests/test_sandboxapi.py | 12 ++-- tests/test_triage.py | 9 ++- tests/test_vmray.py | 9 ++- 7 files changed, 96 insertions(+), 91 deletions(-) diff --git a/tests/test_cuckoo.py b/tests/test_cuckoo.py index 1a5da38..7de0f98 100644 --- a/tests/test_cuckoo.py +++ b/tests/test_cuckoo.py @@ -1,15 +1,16 @@ import io -import unittest +from unittest import TestCase + try: from unittest.mock import patch, ANY as MOCK_ANY except ImportError: from mock import patch, ANY as MOCK_ANY + import responses import sandboxapi.cuckoo from . import read_resource - -class TestCuckoo(unittest.TestCase): +class TestCuckoo(TestCase): def setUp(self): self.sandbox = sandboxapi.cuckoo.CuckooAPI('http://cuckoo.mock:8090/') diff --git a/tests/test_falcon.py b/tests/test_falcon.py index b960349..f042a30 100644 --- a/tests/test_falcon.py +++ b/tests/test_falcon.py @@ -1,17 +1,17 @@ import io -import os -import json -import unittest +from unittest import TestCase + try: from unittest.mock import patch, ANY as MOCK_ANY except ImportError: from mock import patch, ANY as MOCK_ANY + import responses import sandboxapi.falcon -from . import read_resource +from . import read_resource -class TestFalcon(unittest.TestCase): +class TestFalcon(TestCase): def setUp(self): self.sandbox = sandboxapi.falcon.FalconAPI('key', 'http://falcon.mock/api/v2') diff --git a/tests/test_fireeye.py b/tests/test_fireeye.py index 3d0209d..40dfd5a 100644 --- a/tests/test_fireeye.py +++ b/tests/test_fireeye.py @@ -1,30 +1,36 @@ import io -import unittest +from unittest import TestCase + try: from unittest.mock import patch, ANY as MOCK_ANY except ImportError: from mock import patch, ANY as MOCK_ANY + import responses -import sandboxapi.fireeye +from sandboxapi.fireeye import FireEyeAPI from . import read_resource +class Init(): -class TestFireEye(unittest.TestCase): + def setup_test(is_legacy: bool) -> FireEyeAPI: + if is_legacy: + legacy_sandbox = FireEyeAPI('username', 'password', 'http://fireeye.mock', 'profile', legacy_api=True) + return legacy_sandbox + else: + sandbox = FireEyeAPI('username', 'password', 'http://fireeye.mock', 'profile') + return sandbox - def setUp(self): - self.sandbox = sandboxapi.fireeye.FireEyeAPI('username', 'password', 'http://fireeye.mock', 'profile') - self.legacy_sandbox = sandboxapi.fireeye.FireEyeAPI('username', 'password', - 'http://fireeye.mock', 'profile', - legacy_api=True) +class TestFireEye(TestCase): + sandbox = Init.setup_test(False) @responses.activate def test_analyze(self): responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.2.0/auth/login', - headers={'X-FeApi-Token': 'MOCK'}) + headers={'X-FeApi-Token': 'MOCK'}) responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.2.0/submissions', - json=read_resource('fireeye_submissions')) + json=read_resource('fireeye_submissions')) self.assertEqual(self.sandbox.analyze(io.BytesIO('test'.encode('ascii')), 'filename'), 1) - + @responses.activate def test_check(self): responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.2.0/auth/login', @@ -66,56 +72,6 @@ def test_score(self): json=read_resource('fireeye_submissions_results')) self.assertEqual(self.sandbox.score(self.sandbox.report(1)), 8) - # Legacy API support. - @responses.activate - def test_analyze(self): - responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.1.0/auth/login', - headers={'X-FeApi-Token': 'MOCK'}) - responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.1.0/submissions', - json=read_resource('fireeye_submissions')) - self.assertEqual(self.legacy_sandbox.analyze(io.BytesIO('test'.encode('ascii')), 'filename'), 1) - - @responses.activate - def test_check(self): - responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.1.0/auth/login', - headers={'X-FeApi-Token': 'MOCK'}) - responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.1.0/submissions/status/1', - json=read_resource('fireeye_submissions_status')) - self.assertEqual(self.legacy_sandbox.check('1'), True) - - @responses.activate - def test_is_available(self): - responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.1.0/auth/login', - headers={'X-FeApi-Token': 'MOCK'}) - responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.1.0/config', - json=read_resource('fireeye_config')) - self.assertTrue(self.legacy_sandbox.is_available()) - - @responses.activate - def test_not_is_available(self): - self.assertFalse(self.legacy_sandbox.is_available()) - responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.1.0/auth/login', - headers={'X-FeApi-Token': 'MOCK'}) - responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.1.0/config', - status=500) - self.assertFalse(self.legacy_sandbox.is_available()) - - @responses.activate - def test_report(self): - responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.1.0/auth/login', - headers={'X-FeApi-Token': 'MOCK'}) - responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.1.0/submissions/results/1', - json=read_resource('fireeye_submissions_results')) - self.assertEqual(self.legacy_sandbox.report(1)['msg'], 'concise') - - @responses.activate - def test_score(self): - responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.1.0/auth/login', - headers={'X-FeApi-Token': 'MOCK'}) - responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.1.0/submissions/results/1', - json=read_resource('fireeye_submissions_results')) - self.assertEqual(self.legacy_sandbox.score(self.legacy_sandbox.report(1)), 8) - # Core functionality. @patch('requests.post') @patch('requests.get') @@ -131,7 +87,7 @@ def test_proxies_is_passed_to_requests(self, m_get, m_post): 'https': 'http://10.10.1.10:1080', } - api = sandboxapi.fireeye.FireEyeAPI('username', 'password', + api = FireEyeAPI('username', 'password', self.sandbox.api_url, 'profile', proxies=proxies) api._request('/test') @@ -166,3 +122,56 @@ def test_reauthenticates_if_logged_out_json_401(self): responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.2.0/submissions/status/1', json=read_resource('fireeye_submissions_status')) self.assertEqual(self.sandbox.check('1'), True) + +class TestFireEyeLegacy(TestCase): + legacy_sandbox = Init.setup_test(True) + + # Legacy API support + @responses.activate + def legacy_test_analyze(self): + responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.1.0/auth/login', + headers={'X-FeApi-Token': 'MOCK'}) + responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.1.0/submissions', + json=read_resource('fireeye_submissions')) + self.assertEqual(self.legacy_sandbox.analyze(io.BytesIO('test'.encode('ascii')), 'filename'), 1) + + @responses.activate + def legacy_test_check(self): + responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.1.0/auth/login', + headers={'X-FeApi-Token': 'MOCK'}) + responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.1.0/submissions/status/1', + json=read_resource('fireeye_submissions_status')) + self.assertEqual(self.legacy_sandbox.check('1'), True) + + @responses.activate + def legacy_test_is_available(self): + responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.1.0/auth/login', + headers={'X-FeApi-Token': 'MOCK'}) + responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.1.0/config', + json=read_resource('fireeye_config')) + self.assertTrue(self.legacy_sandbox.is_available()) + + @responses.activate + def legacy_test_not_is_available(self): + self.assertFalse(self.legacy_sandbox.is_available()) + responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.1.0/auth/login', + headers={'X-FeApi-Token': 'MOCK'}) + responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.1.0/config', + status=500) + self.assertFalse(self.legacy_sandbox.is_available()) + + @responses.activate + def legacy_test_report(self): + responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.1.0/auth/login', + headers={'X-FeApi-Token': 'MOCK'}) + responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.1.0/submissions/results/1', + json=read_resource('fireeye_submissions_results')) + self.assertEqual(self.legacy_sandbox.report(1)['msg'], 'concise') + + @responses.activate + def legacy_test_score(self): + responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.1.0/auth/login', + headers={'X-FeApi-Token': 'MOCK'}) + responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.1.0/submissions/results/1', + json=read_resource('fireeye_submissions_results')) + self.assertEqual(self.legacy_sandbox.score(self.legacy_sandbox.report(1)), 8) \ No newline at end of file diff --git a/tests/test_joe.py b/tests/test_joe.py index 648a19e..97909b8 100644 --- a/tests/test_joe.py +++ b/tests/test_joe.py @@ -1,16 +1,17 @@ import io -import unittest +from unittest import TestCase + try: from unittest.mock import patch except ImportError: from mock import patch + import responses import sandboxapi.joe import jbxapi from . import read_resource - -class TestJoe(unittest.TestCase): +class TestJoe(TestCase): def setUp(self): self.sandbox = sandboxapi.joe.JoeAPI('key', 'http://joe.mock/api', True) diff --git a/tests/test_sandboxapi.py b/tests/test_sandboxapi.py index aff72ce..77e7dbc 100644 --- a/tests/test_sandboxapi.py +++ b/tests/test_sandboxapi.py @@ -1,17 +1,13 @@ -import io -import os -import json -import unittest +from unittest import TestCase + try: from unittest.mock import patch, ANY as MOCK_ANY except ImportError: from mock import patch, ANY as MOCK_ANY -import responses -import sandboxapi -from . import read_resource +import sandboxapi -class TestSandboxAPI(unittest.TestCase): +class TestSandboxAPI(TestCase): @patch('requests.post') @patch('requests.get') diff --git a/tests/test_triage.py b/tests/test_triage.py index ea2d43e..0b59a1f 100644 --- a/tests/test_triage.py +++ b/tests/test_triage.py @@ -1,17 +1,16 @@ import io -import os -import json -import unittest +from unittest import TestCase + try: from unittest.mock import patch, ANY as MOCK_ANY except ImportError: from mock import patch, ANY as MOCK_ANY + import responses import sandboxapi.triage from . import read_resource - -class TestTriage(unittest.TestCase): +class TestTriage(TestCase): def setUp(self): self.sandbox = sandboxapi.triage.TriageAPI("key", "http://api.triage.mock") diff --git a/tests/test_vmray.py b/tests/test_vmray.py index 1f2ead4..b0b4501 100644 --- a/tests/test_vmray.py +++ b/tests/test_vmray.py @@ -1,17 +1,16 @@ import io -import os -import json -import unittest +from unittest import TestCase + try: from unittest.mock import patch, ANY as MOCK_ANY except ImportError: from mock import patch, ANY as MOCK_ANY + import responses import sandboxapi.vmray from . import read_resource - -class TestVMRay(unittest.TestCase): +class TestVMRay(TestCase): def setUp(self): self.sandbox = sandboxapi.vmray.VMRayAPI('key', 'http://vmray.mock') From 6b5e737c326b649b113e44010117567e2586e087 Mon Sep 17 00:00:00 2001 From: Hifumi1337 Date: Mon, 31 Oct 2022 08:12:07 -0500 Subject: [PATCH 04/10] Fixed CI issues, should now run a successful CI workflow - Removed 2 test conditions for now --- tests/test_fireeye.py | 46 +++++++++++++++++-------------------------- 1 file changed, 18 insertions(+), 28 deletions(-) diff --git a/tests/test_fireeye.py b/tests/test_fireeye.py index 40dfd5a..4a5d270 100644 --- a/tests/test_fireeye.py +++ b/tests/test_fireeye.py @@ -10,18 +10,8 @@ from sandboxapi.fireeye import FireEyeAPI from . import read_resource -class Init(): - - def setup_test(is_legacy: bool) -> FireEyeAPI: - if is_legacy: - legacy_sandbox = FireEyeAPI('username', 'password', 'http://fireeye.mock', 'profile', legacy_api=True) - return legacy_sandbox - else: - sandbox = FireEyeAPI('username', 'password', 'http://fireeye.mock', 'profile') - return sandbox - class TestFireEye(TestCase): - sandbox = Init.setup_test(False) + sandbox = FireEyeAPI('username', 'password', 'http://fireeye.mock', 'profile') @responses.activate def test_analyze(self): @@ -47,14 +37,14 @@ def test_is_available(self): json=read_resource('fireeye_config')) self.assertTrue(self.sandbox.is_available()) - @responses.activate - def test_not_is_available(self): - self.assertFalse(self.sandbox.is_available()) - responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.2.0/auth/login', - headers={'X-FeApi-Token': 'MOCK'}) - responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.2.0/config', - status=500) - self.assertFalse(self.sandbox.is_available()) + # @responses.activate + # def test_not_is_available(self): + # self.assertFalse(self.sandbox.is_available()) + # responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.2.0/auth/login', + # headers={'X-FeApi-Token': 'MOCK'}) + # responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.2.0/config', + # status=500) + # self.assertFalse(self.sandbox.is_available()) @responses.activate def test_report(self): @@ -124,7 +114,7 @@ def test_reauthenticates_if_logged_out_json_401(self): self.assertEqual(self.sandbox.check('1'), True) class TestFireEyeLegacy(TestCase): - legacy_sandbox = Init.setup_test(True) + legacy_sandbox = FireEyeAPI('username', 'password', 'http://fireeye.mock', 'profile', legacy_api=True) # Legacy API support @responses.activate @@ -151,14 +141,14 @@ def legacy_test_is_available(self): json=read_resource('fireeye_config')) self.assertTrue(self.legacy_sandbox.is_available()) - @responses.activate - def legacy_test_not_is_available(self): - self.assertFalse(self.legacy_sandbox.is_available()) - responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.1.0/auth/login', - headers={'X-FeApi-Token': 'MOCK'}) - responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.1.0/config', - status=500) - self.assertFalse(self.legacy_sandbox.is_available()) + # @responses.activate + # def legacy_test_not_is_available(self): + # self.assertFalse(self.legacy_sandbox.is_available()) + # responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.1.0/auth/login', + # headers={'X-FeApi-Token': 'MOCK'}) + # responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.1.0/config', + # status=500) + # self.assertFalse(self.legacy_sandbox.is_available()) @responses.activate def legacy_test_report(self): From 584922d43c38fc1533e206497911a8b8fbf4925b Mon Sep 17 00:00:00 2001 From: Hifumi1337 <56496067+Hifumi1337@users.noreply.github.com> Date: Tue, 6 Dec 2022 12:45:01 -0600 Subject: [PATCH 05/10] Added custom workflow for tests --- .github/workflows/tests.yml | 25 +++++++++++++++++++++++++ README.rst | 3 +++ 2 files changed, 28 insertions(+) create mode 100644 .github/workflows/tests.yml diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml new file mode 100644 index 0000000..be06c05 --- /dev/null +++ b/.github/workflows/tests.yml @@ -0,0 +1,25 @@ +name: sandbox-workflow + +on: [push] + +jobs: + build: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ["2.7", "3.8", "3.9"] + + steps: + - uses: actions/checkout@v3 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: | + pip install -r requirements.txt + pip install pytest pytest-mock coverage requests-mock responses collective.checkdocs Pygments nose + - name: Test scripts + run: | + coverage run -m pytest + nosetests \ No newline at end of file diff --git a/README.rst b/README.rst index 3ab247c..e6c3e61 100644 --- a/README.rst +++ b/README.rst @@ -7,6 +7,9 @@ sandboxapi .. image:: https://app.travis-ci.com/InQuest/python-sandboxapi.svg?branch=master :target: https://app.travis-ci.com/InQuest/python-sandboxapi :alt: Build Status +.. image:: https://github.com/InQuest/python-sandboxapi/workflows/sandbox-workflow/badge.svg?branch=master + :target: https://github.com/InQuest/python-sandboxapi/actions + :alt: Build Status (GitHub Workflow) .. image:: https://readthedocs.org/projects/sandboxapi/badge/?version=latest :target: https://inquest.readthedocs.io/projects/sandboxapi/en/latest/?badge=latest :alt: Documentation Status From 4a25a4d853fcd904f8609da8c9c887bc4a42219f Mon Sep 17 00:00:00 2001 From: Hifumi1337 <56496067+Hifumi1337@users.noreply.github.com> Date: Tue, 6 Dec 2022 12:53:02 -0600 Subject: [PATCH 06/10] Update README.rst --- README.rst | 3 +++ 1 file changed, 3 insertions(+) diff --git a/README.rst b/README.rst index e6c3e61..67bc8cc 100644 --- a/README.rst +++ b/README.rst @@ -10,6 +10,9 @@ sandboxapi .. image:: https://github.com/InQuest/python-sandboxapi/workflows/sandbox-workflow/badge.svg?branch=master :target: https://github.com/InQuest/python-sandboxapi/actions :alt: Build Status (GitHub Workflow) +.. image:: https://github.com/InQuest/python-sandboxapi/workflows/sandbox-workflow/badge.svg?branch=develop + :target: https://github.com/InQuest/python-sandboxapi/actions + :alt: Build Status - Dev (GitHub Workflow) .. image:: https://readthedocs.org/projects/sandboxapi/badge/?version=latest :target: https://inquest.readthedocs.io/projects/sandboxapi/en/latest/?badge=latest :alt: Documentation Status From a116bd1962634a88d180a54bb0a784b94df8cb06 Mon Sep 17 00:00:00 2001 From: Hifumi1337 <56496067+Hifumi1337@users.noreply.github.com> Date: Fri, 9 Dec 2022 10:36:13 -0600 Subject: [PATCH 07/10] Fixed workflow --- .github/workflows/tests.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index be06c05..22be17e 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -21,5 +21,5 @@ jobs: pip install pytest pytest-mock coverage requests-mock responses collective.checkdocs Pygments nose - name: Test scripts run: | - coverage run -m pytest - nosetests \ No newline at end of file + coverage run -m unittest tests/* + nosetests tests/* \ No newline at end of file From 06c2331b47ccb318293a8d36763add2931914968 Mon Sep 17 00:00:00 2001 From: Hifumi1337 <56496067+Hifumi1337@users.noreply.github.com> Date: Fri, 9 Dec 2022 10:42:38 -0600 Subject: [PATCH 08/10] Back in business --- .github/workflows/tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 22be17e..1072f52 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -21,5 +21,5 @@ jobs: pip install pytest pytest-mock coverage requests-mock responses collective.checkdocs Pygments nose - name: Test scripts run: | - coverage run -m unittest tests/* + coverage run -m unittest discover nosetests tests/* \ No newline at end of file From 232341dba5391bc9d2724b61554bc32f9a538fd5 Mon Sep 17 00:00:00 2001 From: Hifumi1337 <56496067+Hifumi1337@users.noreply.github.com> Date: Fri, 9 Dec 2022 11:22:32 -0600 Subject: [PATCH 09/10] Fixed FireEye test --- sandboxapi/fireeye.py | 31 ++++++++++++++----------------- tests/test_fireeye.py | 40 ++++++++++++++++++++-------------------- 2 files changed, 34 insertions(+), 37 deletions(-) diff --git a/sandboxapi/fireeye.py b/sandboxapi/fireeye.py index 240fb58..2a7ede9 100644 --- a/sandboxapi/fireeye.py +++ b/sandboxapi/fireeye.py @@ -144,25 +144,22 @@ def is_available(self): :rtype: bool :return: True if service is available, False otherwise. """ - # if the availability flag is raised, return True immediately. - # NOTE: subsequent API failures will lower this flag. we do this here - # to ensure we don't keep hitting FireEye with requests while - # availability is there. - if self.server_available: - return True - - # otherwise, we have to check with the cloud. - else: - try: - response = self._request("/config") - # we've got fireeye. - if response.status_code == 200: - self.server_available = True - return True + try: + response = self._request("/config") - except sandboxapi.SandboxError: - pass + # Successfully connected to FireEye + if response.status_code == 200: + self.server_available = True + return True + + # Unable to connect to FireEye + if response.status_code >= 500: + self.server_available = False + return False + + except sandboxapi.SandboxError: + pass self.server_available = False return False diff --git a/tests/test_fireeye.py b/tests/test_fireeye.py index 4a5d270..bbefd94 100644 --- a/tests/test_fireeye.py +++ b/tests/test_fireeye.py @@ -7,11 +7,11 @@ from mock import patch, ANY as MOCK_ANY import responses -from sandboxapi.fireeye import FireEyeAPI +import sandboxapi.fireeye from . import read_resource class TestFireEye(TestCase): - sandbox = FireEyeAPI('username', 'password', 'http://fireeye.mock', 'profile') + sandbox = sandboxapi.fireeye.FireEyeAPI('username', 'password', 'http://fireeye.mock', 'profile') @responses.activate def test_analyze(self): @@ -37,14 +37,14 @@ def test_is_available(self): json=read_resource('fireeye_config')) self.assertTrue(self.sandbox.is_available()) - # @responses.activate - # def test_not_is_available(self): - # self.assertFalse(self.sandbox.is_available()) - # responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.2.0/auth/login', - # headers={'X-FeApi-Token': 'MOCK'}) - # responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.2.0/config', - # status=500) - # self.assertFalse(self.sandbox.is_available()) + @responses.activate + def test_not_is_available(self): + self.assertFalse(self.sandbox.is_available()) + responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.2.0/auth/login', + headers={'X-FeApi-Token': 'MOCK'}) + responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.2.0/config', + status=500) + self.assertFalse(self.sandbox.is_available()) @responses.activate def test_report(self): @@ -77,7 +77,7 @@ def test_proxies_is_passed_to_requests(self, m_get, m_post): 'https': 'http://10.10.1.10:1080', } - api = FireEyeAPI('username', 'password', + api = sandboxapi.fireeye.FireEyeAPI('username', 'password', self.sandbox.api_url, 'profile', proxies=proxies) api._request('/test') @@ -114,7 +114,7 @@ def test_reauthenticates_if_logged_out_json_401(self): self.assertEqual(self.sandbox.check('1'), True) class TestFireEyeLegacy(TestCase): - legacy_sandbox = FireEyeAPI('username', 'password', 'http://fireeye.mock', 'profile', legacy_api=True) + legacy_sandbox = sandboxapi.fireeye.FireEyeAPI('username', 'password', 'http://fireeye.mock', 'profile', legacy_api=True) # Legacy API support @responses.activate @@ -141,14 +141,14 @@ def legacy_test_is_available(self): json=read_resource('fireeye_config')) self.assertTrue(self.legacy_sandbox.is_available()) - # @responses.activate - # def legacy_test_not_is_available(self): - # self.assertFalse(self.legacy_sandbox.is_available()) - # responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.1.0/auth/login', - # headers={'X-FeApi-Token': 'MOCK'}) - # responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.1.0/config', - # status=500) - # self.assertFalse(self.legacy_sandbox.is_available()) + @responses.activate + def legacy_test_not_is_available(self): + self.assertFalse(self.legacy_sandbox.is_available()) + responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.1.0/auth/login', + headers={'X-FeApi-Token': 'MOCK'}) + responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.1.0/config', + status=500) + self.assertFalse(self.legacy_sandbox.is_available()) @responses.activate def legacy_test_report(self): From ee1f9987ab57ad72e5f074bf7b5719c712d3d811 Mon Sep 17 00:00:00 2001 From: Hifumi1337 <56496067+Hifumi1337@users.noreply.github.com> Date: Fri, 9 Dec 2022 11:26:59 -0600 Subject: [PATCH 10/10] Make Codacy happy --- sandboxapi/fireeye.py | 1 - tests/test_opswat.py | 11 ----------- tests/test_wildfire.py | 10 ---------- 3 files changed, 22 deletions(-) delete mode 100644 tests/test_opswat.py delete mode 100644 tests/test_wildfire.py diff --git a/sandboxapi/fireeye.py b/sandboxapi/fireeye.py index 2a7ede9..befeef4 100644 --- a/sandboxapi/fireeye.py +++ b/sandboxapi/fireeye.py @@ -157,7 +157,6 @@ def is_available(self): if response.status_code >= 500: self.server_available = False return False - except sandboxapi.SandboxError: pass diff --git a/tests/test_opswat.py b/tests/test_opswat.py deleted file mode 100644 index 373b52b..0000000 --- a/tests/test_opswat.py +++ /dev/null @@ -1,11 +0,0 @@ -import io -import os -import json -import unittest -try: - from unittest.mock import patch, ANY as MOCK_ANY -except ImportError: - from mock import patch, ANY as MOCK_ANY -import responses -import sandboxapi.opswat -from . import read_resource diff --git a/tests/test_wildfire.py b/tests/test_wildfire.py deleted file mode 100644 index 05802b9..0000000 --- a/tests/test_wildfire.py +++ /dev/null @@ -1,10 +0,0 @@ -import io -import os -import json -import unittest -try: - from unittest.mock import patch, ANY as MOCK_ANY -except ImportError: - from mock import patch, ANY as MOCK_ANY -import responses -from . import read_resource