diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml new file mode 100644 index 0000000..1072f52 --- /dev/null +++ b/.github/workflows/tests.yml @@ -0,0 +1,25 @@ +name: sandbox-workflow + +on: [push] + +jobs: + build: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ["2.7", "3.8", "3.9"] + + steps: + - uses: actions/checkout@v3 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: | + pip install -r requirements.txt + pip install pytest pytest-mock coverage requests-mock responses collective.checkdocs Pygments nose + - name: Test scripts + run: | + coverage run -m unittest discover + nosetests tests/* \ No newline at end of file diff --git a/.gitignore b/.gitignore index 54ffadf..0316c54 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,6 @@ dist/ _build/ /build_dist.sh .vscode/ +/.virtualenv/ +/.pytest_cache/ +Pipfile.lock diff --git a/.travis.yml b/.travis.yml index 2b6c2cf..eb22c8a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,22 +1,22 @@ language: python -python: -# - "2.6" # -- responses module requires 2.7+ - - "2.7" -# TODO: resolve issues with older 3.x versions - # - "3.3" - # - "3.4" - - "3.5" -install: - - "pip install -r requirements.txt" - - "pip install nose" - - "pip install responses" - - "pip install coveralls" - - "pip install codacy-coverage" - - "pip install collective.checkdocs Pygments" -script: - - nosetests --with-coverage --cover-package=sandboxapi - - python setup.py checkdocs -after_success: - - coveralls - - coverage xml - - bash <(curl -Ls https://coverage.codacy.com/get.sh) report -r coverage.xml +jobs: + include: + - name: "Python 3.9" + python: 3.9 + install: + - "pip install -r requirements.txt" + - "pip install pytest pytest-mock coverage requests-mock responses collective.checkdocs Pygments" + script: + - coverage run -m pytest + - python setup.py checkdocs + after_success: + - coveralls + - coverage xml + - if [ "$TRAVIS_BRANCH" = "master" ]; then bash <(curl -Ls https://coverage.codacy.com/get.sh) report -r coverage.xml; fi + - name: "Python 2.7" + python: 2.7 + install: + - "pip install -r requirements.txt" + - "pip install nose mock requests-mock responses collective.checkdocs Pygments" + script: + - nosetests diff --git a/Pipfile b/Pipfile new file mode 100644 index 0000000..3a8c89a --- /dev/null +++ b/Pipfile @@ -0,0 +1,19 @@ +[[source]] +url = "https://pypi.org/simple" +verify_ssl = true +name = "pypi" + +[packages] +requests = "*" +jbxapi = "*" +xmltodict = "*" + +[dev-packages] +pytest = "*" +coverage = "*" +responses = "*" +"collective.checkdocs" = "*" +pygments = "*" + +[requires] +python_version = "3.9" diff --git a/README.rst b/README.rst index 3ab247c..67bc8cc 100644 --- a/README.rst +++ b/README.rst @@ -7,6 +7,12 @@ sandboxapi .. image:: https://app.travis-ci.com/InQuest/python-sandboxapi.svg?branch=master :target: https://app.travis-ci.com/InQuest/python-sandboxapi :alt: Build Status +.. image:: https://github.com/InQuest/python-sandboxapi/workflows/sandbox-workflow/badge.svg?branch=master + :target: https://github.com/InQuest/python-sandboxapi/actions + :alt: Build Status (GitHub Workflow) +.. image:: https://github.com/InQuest/python-sandboxapi/workflows/sandbox-workflow/badge.svg?branch=develop + :target: https://github.com/InQuest/python-sandboxapi/actions + :alt: Build Status - Dev (GitHub Workflow) .. image:: https://readthedocs.org/projects/sandboxapi/badge/?version=latest :target: https://inquest.readthedocs.io/projects/sandboxapi/en/latest/?badge=latest :alt: Documentation Status diff --git a/requirements.txt b/requirements.txt index b8eaee9..270849c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ requests -jbxapi==2.10.1 +jbxapi xmltodict diff --git a/sandboxapi/fireeye.py b/sandboxapi/fireeye.py index 240fb58..befeef4 100644 --- a/sandboxapi/fireeye.py +++ b/sandboxapi/fireeye.py @@ -144,25 +144,21 @@ def is_available(self): :rtype: bool :return: True if service is available, False otherwise. """ - # if the availability flag is raised, return True immediately. - # NOTE: subsequent API failures will lower this flag. we do this here - # to ensure we don't keep hitting FireEye with requests while - # availability is there. - if self.server_available: - return True - - # otherwise, we have to check with the cloud. - else: - try: - response = self._request("/config") - # we've got fireeye. - if response.status_code == 200: - self.server_available = True - return True + try: + response = self._request("/config") - except sandboxapi.SandboxError: - pass + # Successfully connected to FireEye + if response.status_code == 200: + self.server_available = True + return True + + # Unable to connect to FireEye + if response.status_code >= 500: + self.server_available = False + return False + except sandboxapi.SandboxError: + pass self.server_available = False return False diff --git a/sandboxapi/joe.py b/sandboxapi/joe.py index 081c1be..396d757 100644 --- a/sandboxapi/joe.py +++ b/sandboxapi/joe.py @@ -1,7 +1,5 @@ import json - import jbxapi - import sandboxapi class JoeAPI(sandboxapi.SandboxAPI): @@ -10,9 +8,11 @@ class JoeAPI(sandboxapi.SandboxAPI): This class is actually just a convenience wrapper around jbxapi.JoeSandbox. """ - def __init__(self, apikey, apiurl, accept_tac, timeout=None, verify_ssl=True, retries=3, **kwargs): + def __init__(self, apikey, apiurl, accept_tac, timeout=None, verify_ssl=True, retries=3, chunked=False, **kwargs): """Initialize the interface to Joe Sandbox API.""" sandboxapi.SandboxAPI.__init__(self) + if not jbxapi.__version__.startswith("2"): + self._chunked = chunked self.jbx = jbxapi.JoeSandbox(apikey, apiurl or jbxapi.API_URL, accept_tac, timeout, bool(int(verify_ssl)), retries, **kwargs) def analyze(self, handle, filename): @@ -30,7 +30,10 @@ def analyze(self, handle, filename): handle.seek(0) try: - return self.jbx.submit_sample(handle)['webids'][0] + if not jbxapi.__version__.startswith("2"): + return self.jbx.submit_sample(handle, _chunked_upload=self._chunked)['submission_id'] + else: + return self.jbx.submit_sample(handle)['webids'][0] except (jbxapi.JoeException, KeyError, IndexError) as e: raise sandboxapi.SandboxError("error in analyze: {e}".format(e=e)) @@ -44,12 +47,13 @@ def check(self, item_id): :return: Boolean indicating if a report is done or not. """ try: - return self.jbx.info(item_id).get('status').lower() == 'finished' + if not jbxapi.__version__.startswith("2"): + return self.jbx.analysis_info(item_id).get('status').lower() == 'finished' + else: + return self.jbx.info(item_id).get('status').lower() == 'finished' except jbxapi.JoeException: return False - return False - def is_available(self): """Determine if the Joe Sandbox API server is alive. @@ -93,7 +97,10 @@ def report(self, item_id, report_format="json"): report_format = "jsonfixed" try: - return json.loads(self.jbx.download(item_id, report_format)[1].decode('utf-8')) + if not jbxapi.__version__.startswith("2"): + return json.loads(self.jbx.analysis_download(item_id, report_format)[1].decode('utf-8')) + else: + return json.loads(self.jbx.download(item_id, report_format)[1].decode('utf-8')) except (jbxapi.JoeException, ValueError, IndexError) as e: raise sandboxapi.SandboxError("error in report fetch: {e}".format(e=e)) diff --git a/setup.py b/setup.py index bd7fe79..d86784e 100644 --- a/setup.py +++ b/setup.py @@ -12,7 +12,7 @@ setup( name='sandboxapi', - version='1.6.0', + version='1.6.1', include_package_data=True, packages=[ 'sandboxapi', diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..fe9bf3b --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1,6 @@ +import os +import json + +def read_resource(resource): + with open(os.path.join('tests', 'resources', '{r}.json'.format(r=resource)), 'r') as f: + return json.loads(f.read()) diff --git a/tests/resources/joe_submission_new.json b/tests/resources/joe_submission_new.json new file mode 100644 index 0000000..be119b6 --- /dev/null +++ b/tests/resources/joe_submission_new.json @@ -0,0 +1,5 @@ +{ + "data": { + "submission_id": "100001" + } +} \ No newline at end of file diff --git a/tests/test_cuckoo.py b/tests/test_cuckoo.py new file mode 100644 index 0000000..7de0f98 --- /dev/null +++ b/tests/test_cuckoo.py @@ -0,0 +1,98 @@ +import io +from unittest import TestCase + +try: + from unittest.mock import patch, ANY as MOCK_ANY +except ImportError: + from mock import patch, ANY as MOCK_ANY + +import responses +import sandboxapi.cuckoo +from . import read_resource + +class TestCuckoo(TestCase): + + def setUp(self): + self.sandbox = sandboxapi.cuckoo.CuckooAPI('http://cuckoo.mock:8090/') + + @responses.activate + def test_analyses(self): + responses.add(responses.GET, 'http://cuckoo.mock:8090/tasks/list', + json=read_resource('cuckoo_tasks_list')) + self.assertEqual(len(self.sandbox.analyses()), 2) + + @responses.activate + def test_analyze(self): + responses.add(responses.POST, 'http://cuckoo.mock:8090/tasks/create/file', + json=read_resource('cuckoo_tasks_create_file')) + self.assertEqual(self.sandbox.analyze(io.BytesIO('test'.encode('ascii')), 'filename'), '1') + + @responses.activate + def test_check(self): + responses.add(responses.GET, 'http://cuckoo.mock:8090/tasks/view/1', + json=read_resource('cuckoo_tasks_view')) + self.assertEqual(self.sandbox.check('1'), True) + + @responses.activate + def test_is_available(self): + responses.add(responses.GET, 'http://cuckoo.mock:8090/cuckoo/status', + json=read_resource('cuckoo_status')) + self.assertTrue(self.sandbox.is_available()) + + @responses.activate + def test_not_is_available(self): + self.assertFalse(self.sandbox.is_available()) + responses.add(responses.GET, 'http://cuckoo.mock:8090/cuckoo/status', + status=500) + self.assertFalse(self.sandbox.is_available()) + + @responses.activate + def test_report(self): + responses.add(responses.GET, 'http://cuckoo.mock:8090/tasks/report/8/json', + json=read_resource('cuckoo_tasks_report')) + self.assertEqual(self.sandbox.report(8)['info']['id'], 8) + + @responses.activate + def test_score(self): + responses.add(responses.GET, 'http://cuckoo.mock:8090/tasks/report/8/json', + json=read_resource('cuckoo_tasks_report')) + self.assertEqual(self.sandbox.score(self.sandbox.report(8)), 5) + + @patch('requests.post') + @patch('requests.get') + def test_proxies_is_passed_to_requests(self, m_get, m_post): + + m_get.return_value.status_code = 200 + m_post.return_value.status_code = 200 + + proxies = { + 'http': 'http://10.10.1.10:3128', + 'https': 'http://10.10.1.10:1080', + } + + api = sandboxapi.cuckoo.CuckooAPI('cuckoo.mock', + proxies=proxies) + api._request('/test') + + m_get.assert_called_once_with(api.api_url + '/test', auth=MOCK_ANY, + headers=MOCK_ANY, params=MOCK_ANY, + proxies=proxies, verify=MOCK_ANY) + + api._request('/test', method='POST') + + m_post.assert_called_once_with(api.api_url + '/test', auth=MOCK_ANY, + headers=MOCK_ANY, data=MOCK_ANY, + files=None, proxies=proxies, + verify=MOCK_ANY) + + @responses.activate + def test_cuckoo_old_style_host_port_path(self): + sandbox = sandboxapi.cuckoo.CuckooAPI('cuckoo.mock') + responses.add(responses.GET, 'http://cuckoo.mock:8090/tasks/list', + json=read_resource('cuckoo_tasks_list')) + self.assertEqual(len(self.sandbox.analyses()), 2) + + sandbox = sandboxapi.cuckoo.CuckooAPI('cuckoo.mock', 9090, '/test') + responses.add(responses.GET, 'http://cuckoo.mock:9090/test/tasks/list', + json=read_resource('cuckoo_tasks_list')) + self.assertEqual(len(self.sandbox.analyses()), 2) diff --git a/tests/test_falcon.py b/tests/test_falcon.py new file mode 100644 index 0000000..f042a30 --- /dev/null +++ b/tests/test_falcon.py @@ -0,0 +1,81 @@ +import io +from unittest import TestCase + +try: + from unittest.mock import patch, ANY as MOCK_ANY +except ImportError: + from mock import patch, ANY as MOCK_ANY + +import responses +import sandboxapi.falcon + +from . import read_resource + +class TestFalcon(TestCase): + + def setUp(self): + self.sandbox = sandboxapi.falcon.FalconAPI('key', 'http://falcon.mock/api/v2') + + @responses.activate + def test_analyze(self): + responses.add(responses.POST, 'http://falcon.mock/api/v2/submit/file', + json=read_resource('falcon_submit_file'), status=201) + self.assertEqual(self.sandbox.analyze(io.BytesIO('test'.encode('ascii')), 'filename'), '1') + + @responses.activate + def test_check(self): + responses.add(responses.GET, 'http://falcon.mock/api/v2/report/1/state', + json=read_resource('falcon_report_state')) + self.assertEqual(self.sandbox.check('1'), True) + + @responses.activate + def test_is_available(self): + responses.add(responses.GET, 'http://falcon.mock/api/v2/system/heartbeat', + json=read_resource('falcon_system_heartbeat')) + self.assertTrue(self.sandbox.is_available()) + + @responses.activate + def test_not_is_available(self): + self.assertFalse(self.sandbox.is_available()) + responses.add(responses.GET, 'http://falcon.mock/api/v2/system/heartbeat', + status=500) + self.assertFalse(self.sandbox.is_available()) + + @responses.activate + def test_report(self): + responses.add(responses.GET, 'http://falcon.mock/api/v2/report/1/summary', + json=read_resource('falcon_report_summary')) + self.assertEqual(self.sandbox.report(1)['job_id'], '1') + + @responses.activate + def test_score(self): + responses.add(responses.GET, 'http://falcon.mock/api/v2/report/1/summary', + json=read_resource('falcon_report_summary')) + self.assertEqual(self.sandbox.score(self.sandbox.report(1)), 5) + + @patch('requests.post') + @patch('requests.get') + def test_proxies_is_passed_to_requests(self, m_get, m_post): + + m_get.return_value.status_code = 200 + m_post.return_value.status_code = 200 + + proxies = { + 'http': 'http://10.10.1.10:3128', + 'https': 'http://10.10.1.10:1080', + } + + api = sandboxapi.falcon.FalconAPI('key', self.sandbox.api_url, + proxies=proxies) + api._request('/test') + + m_get.assert_called_once_with(api.api_url + '/test', auth=MOCK_ANY, + headers=MOCK_ANY, params=MOCK_ANY, + proxies=proxies, verify=MOCK_ANY) + + api._request('/test', method='POST') + + m_post.assert_called_once_with(api.api_url + '/test', auth=MOCK_ANY, + headers=MOCK_ANY, data=MOCK_ANY, + files=None, proxies=proxies, + verify=MOCK_ANY) diff --git a/tests/test_fireeye.py b/tests/test_fireeye.py new file mode 100644 index 0000000..bbefd94 --- /dev/null +++ b/tests/test_fireeye.py @@ -0,0 +1,167 @@ +import io +from unittest import TestCase + +try: + from unittest.mock import patch, ANY as MOCK_ANY +except ImportError: + from mock import patch, ANY as MOCK_ANY + +import responses +import sandboxapi.fireeye +from . import read_resource + +class TestFireEye(TestCase): + sandbox = sandboxapi.fireeye.FireEyeAPI('username', 'password', 'http://fireeye.mock', 'profile') + + @responses.activate + def test_analyze(self): + responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.2.0/auth/login', + headers={'X-FeApi-Token': 'MOCK'}) + responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.2.0/submissions', + json=read_resource('fireeye_submissions')) + self.assertEqual(self.sandbox.analyze(io.BytesIO('test'.encode('ascii')), 'filename'), 1) + + @responses.activate + def test_check(self): + responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.2.0/auth/login', + headers={'X-FeApi-Token': 'MOCK'}) + responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.2.0/submissions/status/1', + json=read_resource('fireeye_submissions_status')) + self.assertEqual(self.sandbox.check('1'), True) + + @responses.activate + def test_is_available(self): + responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.2.0/auth/login', + headers={'X-FeApi-Token': 'MOCK'}) + responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.2.0/config', + json=read_resource('fireeye_config')) + self.assertTrue(self.sandbox.is_available()) + + @responses.activate + def test_not_is_available(self): + self.assertFalse(self.sandbox.is_available()) + responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.2.0/auth/login', + headers={'X-FeApi-Token': 'MOCK'}) + responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.2.0/config', + status=500) + self.assertFalse(self.sandbox.is_available()) + + @responses.activate + def test_report(self): + responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.2.0/auth/login', + headers={'X-FeApi-Token': 'MOCK'}) + responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.2.0/submissions/results/1', + json=read_resource('fireeye_submissions_results')) + self.assertEqual(self.sandbox.report(1)['msg'], 'concise') + + @responses.activate + def test_score(self): + responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.2.0/auth/login', + headers={'X-FeApi-Token': 'MOCK'}) + responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.2.0/submissions/results/1', + json=read_resource('fireeye_submissions_results')) + self.assertEqual(self.sandbox.score(self.sandbox.report(1)), 8) + + # Core functionality. + @patch('requests.post') + @patch('requests.get') + def test_proxies_is_passed_to_requests(self, m_get, m_post): + + m_get.return_value.status_code = 200 + m_get.return_value.content = b'' + m_post.return_value.status_code = 200 + m_post.return_value.content = b'' + + proxies = { + 'http': 'http://10.10.1.10:3128', + 'https': 'http://10.10.1.10:1080', + } + + api = sandboxapi.fireeye.FireEyeAPI('username', 'password', + self.sandbox.api_url, 'profile', + proxies=proxies) + api._request('/test') + + m_get.assert_called_once_with(api.api_url + '/test', auth=MOCK_ANY, + headers=MOCK_ANY, params=MOCK_ANY, + proxies=proxies, verify=MOCK_ANY) + + api._request('/test', method='POST') + + m_post.assert_called_with(api.api_url + '/test', auth=MOCK_ANY, + headers=MOCK_ANY, data=MOCK_ANY, + files=None, proxies=proxies, + verify=MOCK_ANY) + + @responses.activate + def test_reauthenticates_if_logged_out_http_401(self): + responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.2.0/auth/login', + headers={'X-FeApi-Token': 'MOCK'}) + responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.2.0/submissions/status/1', + status=401) + responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.2.0/submissions/status/1', + json=read_resource('fireeye_submissions_status')) + self.assertEqual(self.sandbox.check('1'), True) + + @responses.activate + def test_reauthenticates_if_logged_out_json_401(self): + responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.2.0/auth/login', + headers={'X-FeApi-Token': 'MOCK'}) + responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.2.0/submissions/status/1', + json=read_resource('fireeye_unauthorized')) + responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.2.0/submissions/status/1', + json=read_resource('fireeye_submissions_status')) + self.assertEqual(self.sandbox.check('1'), True) + +class TestFireEyeLegacy(TestCase): + legacy_sandbox = sandboxapi.fireeye.FireEyeAPI('username', 'password', 'http://fireeye.mock', 'profile', legacy_api=True) + + # Legacy API support + @responses.activate + def legacy_test_analyze(self): + responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.1.0/auth/login', + headers={'X-FeApi-Token': 'MOCK'}) + responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.1.0/submissions', + json=read_resource('fireeye_submissions')) + self.assertEqual(self.legacy_sandbox.analyze(io.BytesIO('test'.encode('ascii')), 'filename'), 1) + + @responses.activate + def legacy_test_check(self): + responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.1.0/auth/login', + headers={'X-FeApi-Token': 'MOCK'}) + responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.1.0/submissions/status/1', + json=read_resource('fireeye_submissions_status')) + self.assertEqual(self.legacy_sandbox.check('1'), True) + + @responses.activate + def legacy_test_is_available(self): + responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.1.0/auth/login', + headers={'X-FeApi-Token': 'MOCK'}) + responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.1.0/config', + json=read_resource('fireeye_config')) + self.assertTrue(self.legacy_sandbox.is_available()) + + @responses.activate + def legacy_test_not_is_available(self): + self.assertFalse(self.legacy_sandbox.is_available()) + responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.1.0/auth/login', + headers={'X-FeApi-Token': 'MOCK'}) + responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.1.0/config', + status=500) + self.assertFalse(self.legacy_sandbox.is_available()) + + @responses.activate + def legacy_test_report(self): + responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.1.0/auth/login', + headers={'X-FeApi-Token': 'MOCK'}) + responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.1.0/submissions/results/1', + json=read_resource('fireeye_submissions_results')) + self.assertEqual(self.legacy_sandbox.report(1)['msg'], 'concise') + + @responses.activate + def legacy_test_score(self): + responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.1.0/auth/login', + headers={'X-FeApi-Token': 'MOCK'}) + responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.1.0/submissions/results/1', + json=read_resource('fireeye_submissions_results')) + self.assertEqual(self.legacy_sandbox.score(self.legacy_sandbox.report(1)), 8) \ No newline at end of file diff --git a/tests/test_joe.py b/tests/test_joe.py new file mode 100644 index 0000000..97909b8 --- /dev/null +++ b/tests/test_joe.py @@ -0,0 +1,78 @@ +import io +from unittest import TestCase + +try: + from unittest.mock import patch +except ImportError: + from mock import patch + +import responses +import sandboxapi.joe +import jbxapi +from . import read_resource + +class TestJoe(TestCase): + + def setUp(self): + self.sandbox = sandboxapi.joe.JoeAPI('key', 'http://joe.mock/api', True) + + @responses.activate + def test_analyze(self): + if not jbxapi.__version__.startswith("2"): + responses.add(responses.POST, 'http://joe.mock/api/v2/submission/new', + json=read_resource('joe_submission_new')) + else: + responses.add(responses.POST, 'http://joe.mock/api/v2/analysis/submit', + json=read_resource('joe_analysis_submit')) + self.assertEqual(self.sandbox.analyze(io.BytesIO('test'.encode('ascii')), 'filename'), '100001') + + @responses.activate + def test_check(self): + if not jbxapi.__version__.startswith("2"): + responses.add(responses.POST, 'http://joe.mock/api/v2/analysis/info', + json=read_resource('joe_analysis_info')) + else: + responses.add(responses.POST, 'http://joe.mock/api/v2/analysis/info', + json=read_resource('joe_analysis_info')) + self.assertEqual(self.sandbox.check('1'), True) + + @responses.activate + def test_is_available(self): + responses.add(responses.POST, 'http://joe.mock/api/v2/server/online', + json=read_resource('joe_server_online')) + self.assertTrue(self.sandbox.is_available()) + + @responses.activate + def test_not_is_available(self): + self.assertFalse(self.sandbox.is_available()) + responses.add(responses.POST, 'http://joe.mock/api/v2/server/online', + status=500) + self.assertFalse(self.sandbox.is_available()) + + @responses.activate + def test_report(self): + responses.add(responses.POST, 'http://joe.mock/api/v2/analysis/download', + json=read_resource('joe_analysis_download')) + self.assertEqual(self.sandbox.report(8)['analysis']['signaturedetections']['strategy'][1]['score'], 1) + + @responses.activate + def test_score(self): + responses.add(responses.POST, 'http://joe.mock/api/v2/analysis/download', + json=read_resource('joe_analysis_download')) + self.assertEqual(self.sandbox.score(self.sandbox.report(1)), 1) + + @patch('requests.post') + @patch('requests.get') + def test_proxies_is_passed_to_requests(self, m_get, m_post): + + m_get.return_value.status_code = 200 + m_post.return_value.status_code = 200 + + proxies = { + 'http': 'http://10.10.1.10:3128', + 'https': 'http://10.10.1.10:1080', + } + + api = sandboxapi.joe.JoeAPI('key', self.sandbox.jbx.apiurl, True, + proxies=proxies) + self.assertEqual(api.jbx.session.proxies, proxies) diff --git a/tests/test_sandboxapi.py b/tests/test_sandboxapi.py new file mode 100644 index 0000000..77e7dbc --- /dev/null +++ b/tests/test_sandboxapi.py @@ -0,0 +1,35 @@ +from unittest import TestCase + +try: + from unittest.mock import patch, ANY as MOCK_ANY +except ImportError: + from mock import patch, ANY as MOCK_ANY + +import sandboxapi + +class TestSandboxAPI(TestCase): + + @patch('requests.post') + @patch('requests.get') + def test_proxies_is_passed_to_requests(self, m_get, m_post): + m_get.return_value.status_code = 200 + m_post.return_value.status_code = 200 + + proxies = { + 'http': 'http://10.10.1.10:3128', + 'https': 'http://10.10.1.10:1080', + } + + api = sandboxapi.SandboxAPI(proxies=proxies) + api.api_url = 'http://sandbox.mock' + api._request('/test') + + m_get.assert_called_once_with('http://sandbox.mock/test', auth=None, + headers=None, params=None, proxies=proxies, + verify=True) + + api._request('/test', method='POST') + + m_post.assert_called_once_with('http://sandbox.mock/test', auth=None, + headers=None, data=None, files=None, + proxies=proxies, verify=True) diff --git a/tests/test_triage.py b/tests/test_triage.py new file mode 100644 index 0000000..0b59a1f --- /dev/null +++ b/tests/test_triage.py @@ -0,0 +1,73 @@ +import io +from unittest import TestCase + +try: + from unittest.mock import patch, ANY as MOCK_ANY +except ImportError: + from mock import patch, ANY as MOCK_ANY + +import responses +import sandboxapi.triage +from . import read_resource + +class TestTriage(TestCase): + def setUp(self): + self.sandbox = sandboxapi.triage.TriageAPI("key", + "http://api.triage.mock") + + @responses.activate + def test_analyze(self): + responses.add(responses.POST, + 'http://api.triage.mock/v0/samples', + json=read_resource('triage_analyze'), status=200) + triage_id = self.sandbox.analyze(io.BytesIO('test'.encode('ascii')), + "testfile") + self.assertEqual(triage_id, "200707-pht1cwk3ls") + + @responses.activate + def test_check(self): + responses.add(responses.GET, + 'http://api.triage.mock/v0/samples/test/status', + json=read_resource('triage_check'), status=200) + self.assertTrue(self.sandbox.check("test")) + + @responses.activate + def test_is_available(self): + responses.add(responses.GET, 'http://api.triage.mock/v0/samples', + json=read_resource('triage_available'), status=200) + self.assertTrue(self.sandbox.is_available()) + + @responses.activate + def test_report(self): + responses.add(responses.GET, + 'http://api.triage.mock/v0/samples/test/summary', + json=read_resource('triage_report'), status=200) + data = self.sandbox.report("test") + self.assertEqual( + 10, data["tasks"]["200615-8jbndpgg9n-behavioral1"]["score"]) + + @responses.activate + def test_score(self): + responses.add(responses.GET, + 'http://api.triage.mock/v0/samples/test/summary', + json=read_resource('triage_report'), status=200) + score = self.sandbox.score("test") + self.assertEqual(10, score) + + @responses.activate + def test_full_report(self): + responses.add(responses.GET, + 'http://api.triage.mock/v0/samples/200615-8jbndpgg9n/summary', + json=read_resource('triage_report'), status=200) + responses.add(responses.GET, + 'http://api.triage.mock/v0/samples/200615-8jbndpgg9n/behavioral1/report_triage.json', + json=read_resource('triage_behavioral1'), status=200) + responses.add(responses.GET, + 'http://api.triage.mock/v0/samples/200615-8jbndpgg9n/behavioral2/report_triage.json', + json=read_resource('triage_behavioral2'), status=200) + + full_report = self.sandbox.full_report("200615-8jbndpgg9n") + self.assertTrue(full_report["tasks"]["behavioral1"]["sample"]["score"], + 10) + self.assertTrue(full_report["tasks"]["behavioral2"]["sample"]["score"], + 10) diff --git a/tests/test_vmray.py b/tests/test_vmray.py new file mode 100644 index 0000000..b0b4501 --- /dev/null +++ b/tests/test_vmray.py @@ -0,0 +1,91 @@ +import io +from unittest import TestCase + +try: + from unittest.mock import patch, ANY as MOCK_ANY +except ImportError: + from mock import patch, ANY as MOCK_ANY + +import responses +import sandboxapi.vmray +from . import read_resource + +class TestVMRay(TestCase): + + def setUp(self): + self.sandbox = sandboxapi.vmray.VMRayAPI('key', 'http://vmray.mock') + + @responses.activate + def test_analyze(self): + responses.add(responses.POST, 'http://vmray.mock/rest/sample/submit', + json=read_resource('vmray_sample_submit')) + self.assertEqual(self.sandbox.analyze(io.BytesIO('test'.encode('ascii')), 'filename'), 1169850) + + @responses.activate + def test_analyze_with_errors(self): + responses.add(responses.POST, 'http://vmray.mock/rest/sample/submit', + json=read_resource('vmray_sample_submit_errors')) + with self.assertRaises(sandboxapi.SandboxError): + self.assertEqual(self.sandbox.analyze(io.BytesIO('test'.encode('ascii')), 'filename')) + + @responses.activate + def test_check(self): + responses.add(responses.GET, 'http://vmray.mock/rest/submission/sample/1', + json=read_resource('vmray_submission_sample')) + self.assertEqual(self.sandbox.check('1'), True) + + @responses.activate + def test_is_available(self): + responses.add(responses.GET, 'http://vmray.mock/rest/system_info', + json=read_resource('vmray_system_info')) + self.assertTrue(self.sandbox.is_available()) + + @responses.activate + def test_not_is_available(self): + self.assertFalse(self.sandbox.is_available()) + responses.add(responses.GET, 'http://vmray.mock/rest/system_info', + status=500) + self.assertFalse(self.sandbox.is_available()) + + @responses.activate + def test_report(self): + responses.add(responses.GET, 'http://vmray.mock/rest/analysis/sample/1', + json=read_resource('vmray_analysis_sample')) + responses.add(responses.GET, 'http://vmray.mock/rest/analysis/1097123/archive/logs/summary.json', + json=read_resource('vmray_analysis_archive_logs_summary')) + self.assertEqual(self.sandbox.report(1)['version'], 1) + + @responses.activate + def test_score(self): + responses.add(responses.GET, 'http://vmray.mock/rest/analysis/sample/1', + json=read_resource('vmray_analysis_sample')) + responses.add(responses.GET, 'http://vmray.mock/rest/analysis/1097123/archive/logs/summary.json', + json=read_resource('vmray_analysis_archive_logs_summary')) + self.assertEqual(self.sandbox.score(self.sandbox.report(1)), 20) + + @patch('requests.post') + @patch('requests.get') + def test_proxies_is_passed_to_requests(self, m_get, m_post): + + m_get.return_value.status_code = 200 + m_post.return_value.status_code = 200 + + proxies = { + 'http': 'http://10.10.1.10:3128', + 'https': 'http://10.10.1.10:1080', + } + + api = sandboxapi.vmray.VMRayAPI('key', self.sandbox.api_url, + proxies=proxies) + api._request('/test') + + m_get.assert_called_once_with(api.api_url + '/test', auth=MOCK_ANY, + headers=MOCK_ANY, params=MOCK_ANY, + proxies=proxies, verify=MOCK_ANY) + + api._request('/test', method='POST') + + m_post.assert_called_once_with(api.api_url + '/test', auth=MOCK_ANY, + headers=MOCK_ANY, data=MOCK_ANY, + files=None, proxies=proxies, + verify=MOCK_ANY) diff --git a/tests/tests.py b/tests/tests.py deleted file mode 100644 index 23f6794..0000000 --- a/tests/tests.py +++ /dev/null @@ -1,570 +0,0 @@ -import io -import os -import json -import unittest -try: - from unittest.mock import patch, ANY as MOCK_ANY -except ImportError: - from mock import patch, ANY as MOCK_ANY - -import responses - -import sandboxapi.cuckoo -import sandboxapi.fireeye -import sandboxapi.joe -import sandboxapi.vmray -import sandboxapi.falcon -import sandboxapi.triage - -def read_resource(resource): - with open(os.path.join('tests', 'resources', '{r}.json'.format(r=resource)), 'r') as f: - return json.loads(f.read()) - - -class TestCuckoo(unittest.TestCase): - - def setUp(self): - self.sandbox = sandboxapi.cuckoo.CuckooAPI('http://cuckoo.mock:8090/') - - @responses.activate - def test_analyses(self): - responses.add(responses.GET, 'http://cuckoo.mock:8090/tasks/list', - json=read_resource('cuckoo_tasks_list')) - self.assertEquals(len(self.sandbox.analyses()), 2) - - @responses.activate - def test_analyze(self): - responses.add(responses.POST, 'http://cuckoo.mock:8090/tasks/create/file', - json=read_resource('cuckoo_tasks_create_file')) - self.assertEquals(self.sandbox.analyze(io.BytesIO('test'.encode('ascii')), 'filename'), '1') - - @responses.activate - def test_check(self): - responses.add(responses.GET, 'http://cuckoo.mock:8090/tasks/view/1', - json=read_resource('cuckoo_tasks_view')) - self.assertEquals(self.sandbox.check('1'), True) - - @responses.activate - def test_is_available(self): - responses.add(responses.GET, 'http://cuckoo.mock:8090/cuckoo/status', - json=read_resource('cuckoo_status')) - self.assertTrue(self.sandbox.is_available()) - - @responses.activate - def test_not_is_available(self): - self.assertFalse(self.sandbox.is_available()) - responses.add(responses.GET, 'http://cuckoo.mock:8090/cuckoo/status', - status=500) - self.assertFalse(self.sandbox.is_available()) - - @responses.activate - def test_report(self): - responses.add(responses.GET, 'http://cuckoo.mock:8090/tasks/report/8/json', - json=read_resource('cuckoo_tasks_report')) - self.assertEquals(self.sandbox.report(8)['info']['id'], 8) - - @responses.activate - def test_score(self): - responses.add(responses.GET, 'http://cuckoo.mock:8090/tasks/report/8/json', - json=read_resource('cuckoo_tasks_report')) - self.assertEquals(self.sandbox.score(self.sandbox.report(8)), 5) - - @patch('requests.post') - @patch('requests.get') - def test_proxies_is_passed_to_requests(self, m_get, m_post): - - m_get.return_value.status_code = 200 - m_post.return_value.status_code = 200 - - proxies = { - 'http': 'http://10.10.1.10:3128', - 'https': 'http://10.10.1.10:1080', - } - - api = sandboxapi.cuckoo.CuckooAPI('cuckoo.mock', - proxies=proxies) - api._request('/test') - - m_get.assert_called_once_with(api.api_url + '/test', auth=MOCK_ANY, - headers=MOCK_ANY, params=MOCK_ANY, - proxies=proxies, verify=MOCK_ANY) - - api._request('/test', method='POST') - - m_post.assert_called_once_with(api.api_url + '/test', auth=MOCK_ANY, - headers=MOCK_ANY, data=MOCK_ANY, - files=None, proxies=proxies, - verify=MOCK_ANY) - - @responses.activate - def test_cuckoo_old_style_host_port_path(self): - sandbox = sandboxapi.cuckoo.CuckooAPI('cuckoo.mock') - responses.add(responses.GET, 'http://cuckoo.mock:8090/tasks/list', - json=read_resource('cuckoo_tasks_list')) - self.assertEquals(len(self.sandbox.analyses()), 2) - - sandbox = sandboxapi.cuckoo.CuckooAPI('cuckoo.mock', 9090, '/test') - responses.add(responses.GET, 'http://cuckoo.mock:9090/test/tasks/list', - json=read_resource('cuckoo_tasks_list')) - self.assertEquals(len(self.sandbox.analyses()), 2) - - - -class TestJoe(unittest.TestCase): - - def setUp(self): - self.sandbox = sandboxapi.joe.JoeAPI('key', 'http://joe.mock/api', True) - - @responses.activate - def test_analyze(self): - responses.add(responses.POST, 'http://joe.mock/api/v2/analysis/submit', - json=read_resource('joe_analysis_submit')) - self.assertEquals(self.sandbox.analyze(io.BytesIO('test'.encode('ascii')), 'filename'), '100001') - - @responses.activate - def test_check(self): - responses.add(responses.POST, 'http://joe.mock/api/v2/analysis/info', - json=read_resource('joe_analysis_info')) - self.assertEquals(self.sandbox.check('1'), True) - - @responses.activate - def test_is_available(self): - responses.add(responses.POST, 'http://joe.mock/api/v2/server/online', - json=read_resource('joe_server_online')) - self.assertTrue(self.sandbox.is_available()) - - @responses.activate - def test_not_is_available(self): - self.assertFalse(self.sandbox.is_available()) - responses.add(responses.POST, 'http://joe.mock/api/v2/server/online', - status=500) - self.assertFalse(self.sandbox.is_available()) - - @responses.activate - def test_report(self): - responses.add(responses.POST, 'http://joe.mock/api/v2/analysis/download', - json=read_resource('joe_analysis_download')) - self.assertEquals(self.sandbox.report(8)['analysis']['signaturedetections']['strategy'][1]['score'], 1) - - @responses.activate - def test_score(self): - responses.add(responses.POST, 'http://joe.mock/api/v2/analysis/download', - json=read_resource('joe_analysis_download')) - self.assertEquals(self.sandbox.score(self.sandbox.report(1)), 1) - - @patch('requests.post') - @patch('requests.get') - def test_proxies_is_passed_to_requests(self, m_get, m_post): - - m_get.return_value.status_code = 200 - m_post.return_value.status_code = 200 - - proxies = { - 'http': 'http://10.10.1.10:3128', - 'https': 'http://10.10.1.10:1080', - } - - api = sandboxapi.joe.JoeAPI('key', self.sandbox.jbx.apiurl, True, - proxies=proxies) - self.assertEquals(api.jbx.session.proxies, proxies) - - -class TestFireEye(unittest.TestCase): - - def setUp(self): - self.sandbox = sandboxapi.fireeye.FireEyeAPI('username', 'password', 'http://fireeye.mock', 'profile') - self.legacy_sandbox = sandboxapi.fireeye.FireEyeAPI('username', 'password', - 'http://fireeye.mock', 'profile', - legacy_api=True) - - @responses.activate - def test_analyze(self): - responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.2.0/auth/login', - headers={'X-FeApi-Token': 'MOCK'}) - responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.2.0/submissions', - json=read_resource('fireeye_submissions')) - self.assertEquals(self.sandbox.analyze(io.BytesIO('test'.encode('ascii')), 'filename'), 1) - - @responses.activate - def test_check(self): - responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.2.0/auth/login', - headers={'X-FeApi-Token': 'MOCK'}) - responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.2.0/submissions/status/1', - json=read_resource('fireeye_submissions_status')) - self.assertEquals(self.sandbox.check('1'), True) - - @responses.activate - def test_is_available(self): - responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.2.0/auth/login', - headers={'X-FeApi-Token': 'MOCK'}) - responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.2.0/config', - json=read_resource('fireeye_config')) - self.assertTrue(self.sandbox.is_available()) - - @responses.activate - def test_not_is_available(self): - self.assertFalse(self.sandbox.is_available()) - responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.2.0/auth/login', - headers={'X-FeApi-Token': 'MOCK'}) - responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.2.0/config', - status=500) - self.assertFalse(self.sandbox.is_available()) - - @responses.activate - def test_report(self): - responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.2.0/auth/login', - headers={'X-FeApi-Token': 'MOCK'}) - responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.2.0/submissions/results/1', - json=read_resource('fireeye_submissions_results')) - self.assertEquals(self.sandbox.report(1)['msg'], 'concise') - - @responses.activate - def test_score(self): - responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.2.0/auth/login', - headers={'X-FeApi-Token': 'MOCK'}) - responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.2.0/submissions/results/1', - json=read_resource('fireeye_submissions_results')) - self.assertEquals(self.sandbox.score(self.sandbox.report(1)), 8) - - # Legacy API support. - @responses.activate - def test_analyze(self): - responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.1.0/auth/login', - headers={'X-FeApi-Token': 'MOCK'}) - responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.1.0/submissions', - json=read_resource('fireeye_submissions')) - self.assertEquals(self.legacy_sandbox.analyze(io.BytesIO('test'.encode('ascii')), 'filename'), 1) - - @responses.activate - def test_check(self): - responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.1.0/auth/login', - headers={'X-FeApi-Token': 'MOCK'}) - responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.1.0/submissions/status/1', - json=read_resource('fireeye_submissions_status')) - self.assertEquals(self.legacy_sandbox.check('1'), True) - - @responses.activate - def test_is_available(self): - responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.1.0/auth/login', - headers={'X-FeApi-Token': 'MOCK'}) - responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.1.0/config', - json=read_resource('fireeye_config')) - self.assertTrue(self.legacy_sandbox.is_available()) - - @responses.activate - def test_not_is_available(self): - self.assertFalse(self.legacy_sandbox.is_available()) - responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.1.0/auth/login', - headers={'X-FeApi-Token': 'MOCK'}) - responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.1.0/config', - status=500) - self.assertFalse(self.legacy_sandbox.is_available()) - - @responses.activate - def test_report(self): - responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.1.0/auth/login', - headers={'X-FeApi-Token': 'MOCK'}) - responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.1.0/submissions/results/1', - json=read_resource('fireeye_submissions_results')) - self.assertEquals(self.legacy_sandbox.report(1)['msg'], 'concise') - - @responses.activate - def test_score(self): - responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.1.0/auth/login', - headers={'X-FeApi-Token': 'MOCK'}) - responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.1.0/submissions/results/1', - json=read_resource('fireeye_submissions_results')) - self.assertEquals(self.legacy_sandbox.score(self.legacy_sandbox.report(1)), 8) - - # Core functionality. - @patch('requests.post') - @patch('requests.get') - def test_proxies_is_passed_to_requests(self, m_get, m_post): - - m_get.return_value.status_code = 200 - m_get.return_value.content = b'' - m_post.return_value.status_code = 200 - m_post.return_value.content = b'' - - proxies = { - 'http': 'http://10.10.1.10:3128', - 'https': 'http://10.10.1.10:1080', - } - - api = sandboxapi.fireeye.FireEyeAPI('username', 'password', - self.sandbox.api_url, 'profile', - proxies=proxies) - api._request('/test') - - m_get.assert_called_once_with(api.api_url + '/test', auth=MOCK_ANY, - headers=MOCK_ANY, params=MOCK_ANY, - proxies=proxies, verify=MOCK_ANY) - - api._request('/test', method='POST') - - m_post.assert_called_with(api.api_url + '/test', auth=MOCK_ANY, - headers=MOCK_ANY, data=MOCK_ANY, - files=None, proxies=proxies, - verify=MOCK_ANY) - - @responses.activate - def test_reauthenticates_if_logged_out_http_401(self): - responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.2.0/auth/login', - headers={'X-FeApi-Token': 'MOCK'}) - responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.2.0/submissions/status/1', - status=401) - responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.2.0/submissions/status/1', - json=read_resource('fireeye_submissions_status')) - self.assertEquals(self.sandbox.check('1'), True) - - @responses.activate - def test_reauthenticates_if_logged_out_json_401(self): - responses.add(responses.POST, 'http://fireeye.mock/wsapis/v1.2.0/auth/login', - headers={'X-FeApi-Token': 'MOCK'}) - responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.2.0/submissions/status/1', - json=read_resource('fireeye_unauthorized')) - responses.add(responses.GET, 'http://fireeye.mock/wsapis/v1.2.0/submissions/status/1', - json=read_resource('fireeye_submissions_status')) - self.assertEquals(self.sandbox.check('1'), True) - - -class TestVMRay(unittest.TestCase): - - def setUp(self): - self.sandbox = sandboxapi.vmray.VMRayAPI('key', 'http://vmray.mock') - - @responses.activate - def test_analyze(self): - responses.add(responses.POST, 'http://vmray.mock/rest/sample/submit', - json=read_resource('vmray_sample_submit')) - self.assertEquals(self.sandbox.analyze(io.BytesIO('test'.encode('ascii')), 'filename'), 1169850) - - @responses.activate - def test_analyze_with_errors(self): - responses.add(responses.POST, 'http://vmray.mock/rest/sample/submit', - json=read_resource('vmray_sample_submit_errors')) - with self.assertRaises(sandboxapi.SandboxError): - self.assertEquals(self.sandbox.analyze(io.BytesIO('test'.encode('ascii')), 'filename')) - - @responses.activate - def test_check(self): - responses.add(responses.GET, 'http://vmray.mock/rest/submission/sample/1', - json=read_resource('vmray_submission_sample')) - self.assertEquals(self.sandbox.check('1'), True) - - @responses.activate - def test_is_available(self): - responses.add(responses.GET, 'http://vmray.mock/rest/system_info', - json=read_resource('vmray_system_info')) - self.assertTrue(self.sandbox.is_available()) - - @responses.activate - def test_not_is_available(self): - self.assertFalse(self.sandbox.is_available()) - responses.add(responses.GET, 'http://vmray.mock/rest/system_info', - status=500) - self.assertFalse(self.sandbox.is_available()) - - @responses.activate - def test_report(self): - responses.add(responses.GET, 'http://vmray.mock/rest/analysis/sample/1', - json=read_resource('vmray_analysis_sample')) - responses.add(responses.GET, 'http://vmray.mock/rest/analysis/1097123/archive/logs/summary.json', - json=read_resource('vmray_analysis_archive_logs_summary')) - self.assertEquals(self.sandbox.report(1)['version'], 1) - - @responses.activate - def test_score(self): - responses.add(responses.GET, 'http://vmray.mock/rest/analysis/sample/1', - json=read_resource('vmray_analysis_sample')) - responses.add(responses.GET, 'http://vmray.mock/rest/analysis/1097123/archive/logs/summary.json', - json=read_resource('vmray_analysis_archive_logs_summary')) - self.assertEquals(self.sandbox.score(self.sandbox.report(1)), 20) - - @patch('requests.post') - @patch('requests.get') - def test_proxies_is_passed_to_requests(self, m_get, m_post): - - m_get.return_value.status_code = 200 - m_post.return_value.status_code = 200 - - proxies = { - 'http': 'http://10.10.1.10:3128', - 'https': 'http://10.10.1.10:1080', - } - - api = sandboxapi.vmray.VMRayAPI('key', self.sandbox.api_url, - proxies=proxies) - api._request('/test') - - m_get.assert_called_once_with(api.api_url + '/test', auth=MOCK_ANY, - headers=MOCK_ANY, params=MOCK_ANY, - proxies=proxies, verify=MOCK_ANY) - - api._request('/test', method='POST') - - m_post.assert_called_once_with(api.api_url + '/test', auth=MOCK_ANY, - headers=MOCK_ANY, data=MOCK_ANY, - files=None, proxies=proxies, - verify=MOCK_ANY) - - -class TestTriage(unittest.TestCase): - def setUp(self): - self.sandbox = sandboxapi.triage.TriageAPI("key", - "http://api.triage.mock") - - @responses.activate - def test_analyze(self): - responses.add(responses.POST, - 'http://api.triage.mock/v0/samples', - json=read_resource('triage_analyze'), status=200) - triage_id = self.sandbox.analyze(io.BytesIO('test'.encode('ascii')), - "testfile") - self.assertEquals(triage_id, "200707-pht1cwk3ls") - - @responses.activate - def test_check(self): - responses.add(responses.GET, - 'http://api.triage.mock/v0/samples/test/status', - json=read_resource('triage_check'), status=200) - self.assertTrue(self.sandbox.check("test")) - - @responses.activate - def test_is_available(self): - responses.add(responses.GET, 'http://api.triage.mock/v0/samples', - json=read_resource('triage_available'), status=200) - self.assertTrue(self.sandbox.is_available()) - - @responses.activate - def test_report(self): - responses.add(responses.GET, - 'http://api.triage.mock/v0/samples/test/summary', - json=read_resource('triage_report'), status=200) - data = self.sandbox.report("test") - self.assertEquals( - 10, data["tasks"]["200615-8jbndpgg9n-behavioral1"]["score"]) - - @responses.activate - def test_score(self): - responses.add(responses.GET, - 'http://api.triage.mock/v0/samples/test/summary', - json=read_resource('triage_report'), status=200) - score = self.sandbox.score("test") - self.assertEquals(10, score) - - @responses.activate - def test_full_report(self): - responses.add(responses.GET, - 'http://api.triage.mock/v0/samples/200615-8jbndpgg9n/summary', - json=read_resource('triage_report'), status=200) - responses.add(responses.GET, - 'http://api.triage.mock/v0/samples/200615-8jbndpgg9n/behavioral1/report_triage.json', - json=read_resource('triage_behavioral1'), status=200) - responses.add(responses.GET, - 'http://api.triage.mock/v0/samples/200615-8jbndpgg9n/behavioral2/report_triage.json', - json=read_resource('triage_behavioral2'), status=200) - - full_report = self.sandbox.full_report("200615-8jbndpgg9n") - self.assertTrue(full_report["tasks"]["behavioral1"]["sample"]["score"], - 10) - self.assertTrue(full_report["tasks"]["behavioral2"]["sample"]["score"], - 10) - - -class TestFalcon(unittest.TestCase): - - def setUp(self): - self.sandbox = sandboxapi.falcon.FalconAPI('key', 'http://falcon.mock/api/v2') - - @responses.activate - def test_analyze(self): - responses.add(responses.POST, 'http://falcon.mock/api/v2/submit/file', - json=read_resource('falcon_submit_file'), status=201) - self.assertEquals(self.sandbox.analyze(io.BytesIO('test'.encode('ascii')), 'filename'), '1') - - @responses.activate - def test_check(self): - responses.add(responses.GET, 'http://falcon.mock/api/v2/report/1/state', - json=read_resource('falcon_report_state')) - self.assertEquals(self.sandbox.check('1'), True) - - @responses.activate - def test_is_available(self): - responses.add(responses.GET, 'http://falcon.mock/api/v2/system/heartbeat', - json=read_resource('falcon_system_heartbeat')) - self.assertTrue(self.sandbox.is_available()) - - @responses.activate - def test_not_is_available(self): - self.assertFalse(self.sandbox.is_available()) - responses.add(responses.GET, 'http://falcon.mock/api/v2/system/heartbeat', - status=500) - self.assertFalse(self.sandbox.is_available()) - - @responses.activate - def test_report(self): - responses.add(responses.GET, 'http://falcon.mock/api/v2/report/1/summary', - json=read_resource('falcon_report_summary')) - self.assertEquals(self.sandbox.report(1)['job_id'], '1') - - @responses.activate - def test_score(self): - responses.add(responses.GET, 'http://falcon.mock/api/v2/report/1/summary', - json=read_resource('falcon_report_summary')) - self.assertEquals(self.sandbox.score(self.sandbox.report(1)), 5) - - @patch('requests.post') - @patch('requests.get') - def test_proxies_is_passed_to_requests(self, m_get, m_post): - - m_get.return_value.status_code = 200 - m_post.return_value.status_code = 200 - - proxies = { - 'http': 'http://10.10.1.10:3128', - 'https': 'http://10.10.1.10:1080', - } - - api = sandboxapi.falcon.FalconAPI('key', self.sandbox.api_url, - proxies=proxies) - api._request('/test') - - m_get.assert_called_once_with(api.api_url + '/test', auth=MOCK_ANY, - headers=MOCK_ANY, params=MOCK_ANY, - proxies=proxies, verify=MOCK_ANY) - - api._request('/test', method='POST') - - m_post.assert_called_once_with(api.api_url + '/test', auth=MOCK_ANY, - headers=MOCK_ANY, data=MOCK_ANY, - files=None, proxies=proxies, - verify=MOCK_ANY) - - -class TestSandboxAPI(unittest.TestCase): - - @patch('requests.post') - @patch('requests.get') - def test_proxies_is_passed_to_requests(self, m_get, m_post): - m_get.return_value.status_code = 200 - m_post.return_value.status_code = 200 - - proxies = { - 'http': 'http://10.10.1.10:3128', - 'https': 'http://10.10.1.10:1080', - } - - api = sandboxapi.SandboxAPI(proxies=proxies) - api.api_url = 'http://sandbox.mock' - api._request('/test') - - m_get.assert_called_once_with('http://sandbox.mock/test', auth=None, - headers=None, params=None, proxies=proxies, - verify=True) - - api._request('/test', method='POST') - - m_post.assert_called_once_with('http://sandbox.mock/test', auth=None, - headers=None, data=None, files=None, - proxies=proxies, verify=True)