
Commit 0637a79

polyswarm
Signed-off-by: pranjalg1331 <[email protected]>
1 parent 6a13628 commit 0637a79

File tree

12 files changed: +415 -94 lines changed


tests/api_app/analyzers_manager/unit_tests/observable_analyzers/base_test_class.py

Lines changed: 61 additions & 94 deletions
@@ -7,90 +7,65 @@
 from api_app.analyzers_manager.exceptions import AnalyzerRunException
 from api_app.analyzers_manager.models import AnalyzerConfig

+logger = logging.getLogger(__name__)
+

 class BaseAnalyzerTest(TestCase):
     analyzer_class = None
-    # Control logging behavior in tests
     suppress_analyzer_logs = True

     def setUp(self):
-        """Set up test environment including logging configuration"""
         super().setUp()
+        logger.info("Setting up test environment")

         if self.suppress_analyzer_logs and self.analyzer_class:
-            # Suppress logs from the specific analyzer being tested
             analyzer_module = self.analyzer_class.__module__
             logging.getLogger(analyzer_module).setLevel(logging.CRITICAL)
-
-            # Also suppress common analyzer manager logs
             logging.getLogger("api_app.analyzers_manager").setLevel(logging.WARNING)

     def tearDown(self):
-        """Clean up after test"""
         super().tearDown()
+        logger.info("Tearing down test environment")

-        # Reset logging levels if they were modified
         if self.suppress_analyzer_logs and self.analyzer_class:
             analyzer_module = self.analyzer_class.__module__
             logging.getLogger(analyzer_module).setLevel(logging.NOTSET)
             logging.getLogger("api_app.analyzers_manager").setLevel(logging.NOTSET)

     @classmethod
     def get_sample_observable(cls, observable_type):
-        mapping = {
+        return {
             "domain": "example.com",
             "ip": "8.8.8.8",
             "url": "https://example.com",
             "hash": "deadbeefdeadbeefdeadbeefdeadbeef",
             "generic": "[email protected]",
-        }
-        return mapping.get(observable_type, "test")
+        }.get(observable_type, "test")

     @classmethod
     def get_extra_config(cls) -> dict:
-        """
-        Subclasses can override this to provide additional runtime configuration
-        specific to their analyzer (e.g., API keys, URLs, retry counts, etc.).
-
-        Returns:
-            dict: Extra configuration parameters for the analyzer
-        """
         return {}

     def get_mocked_response(self):
-        """
-        Subclasses override this to define expected mocked output.
-
-        Can return:
-        1. A single patch object: patch('module.function')
-        2. A list of patch objects: [patch('module.func1'), patch('module.func2')]
-        3. A context manager: patch.multiple() or ExitStack()
-        4. None: No mocking needed
-        """
         return None

     @classmethod
     def _apply_patches(cls, patches):
-        """Helper method to apply single or multiple patches"""
         if patches is None:
-            return ExitStack()  # No-op context manager
+            return ExitStack()

-        # If it's already a context manager, return as-is
         if hasattr(patches, "__enter__") and hasattr(patches, "__exit__"):
             return patches

-        # If it's a list of patches, use ExitStack to manage them
         if isinstance(patches, (list, tuple)):
             stack = ExitStack()
             for patch_obj in patches:
                 stack.enter_context(patch_obj)
             return stack

-        # Single patch object
         return patches

     def _create_mock_analyzer_job(self, observable_name, observable_type):
-        """Create a properly structured mock job object"""
         mock_tlp_enum = SimpleNamespace()
         mock_tlp_enum.CLEAR = SimpleNamespace(value="clear")
         mock_tlp_enum.GREEN = SimpleNamespace(value="green")

@@ -106,29 +81,25 @@ def _create_mock_analyzer_job(self, observable_name, observable_type):
         return mock_job

     def _setup_analyzer(self, config, observable_type, observable_value):
-        """Setup analyzer instance with proper configuration"""
+        logger.info(f"Setting up analyzer for {observable_type}: {observable_value}")
         analyzer = self.analyzer_class(config)
         analyzer.observable_name = observable_value
         analyzer.observable_classification = observable_type
         analyzer._job = self._create_mock_analyzer_job(
             observable_value, observable_type
         )

-        # Apply extra configuration
-        extra_config = self.get_extra_config()
-        for key, value in extra_config.items():
+        for key, value in self.get_extra_config().items():
             setattr(analyzer, key, value)

         return analyzer

     def _validate_response(self, response, observable_type):
-        """Validate analyzer response format and content"""
-
-        # If response is a string, try to parse it as JSON
         if isinstance(response, str):
             try:
                 response = json.loads(response)
             except json.JSONDecodeError:
+                logger.error("Invalid JSON response for %s", observable_type)
                 self.fail(
                     f"Analyzer response for {observable_type} is a string but not valid JSON"
                 )

@@ -141,12 +112,9 @@ def _validate_response(self, response, observable_type):
         self.assertTrue(
             response, f"Analyzer response for {observable_type} should not be empty"
         )
-
-        # Additional validation can be added here
-        # e.g., check for required fields, data types, etc.
+        logger.info("Valid response for %s", observable_type)

     def test_analyzer_on_supported_observables(self):
-        """Test analyzer on all supported observable types"""
         if self.analyzer_class is None:
             self.skipTest("analyzer_class is not set")


@@ -166,9 +134,8 @@ def test_analyzer_on_supported_observables(self):
                 continue

             with self.subTest(observable_type=observable_type):
-                print(f"Testing observable_type: {observable_type}")
+                logger.info("Testing observable type: %s", observable_type)

-                # Apply patches using the improved system
                 patches = self.get_mocked_response()
                 with self._apply_patches(patches):
                     observable_value = self.get_sample_observable(observable_type)

@@ -179,59 +146,59 @@
                     try:
                         response = analyzer.run()
                         self._validate_response(response, observable_type)
-                        print(f"SUCCESS {observable_type}")
-
+                        logger.info("Analyzer run successful for %s", observable_type)
                     except AnalyzerRunException as e:
+                        logger.error("AnalyzerRunException: %s", e)
                         self.fail(
-                            f"AnalyzerRunException raised for {observable_type} "
-                            f"with valid format: {str(e)}"
+                            f"AnalyzerRunException for {observable_type}: {str(e)}"
                         )
                     except Exception as e:
+                        logger.exception("Unexpected exception for %s", observable_type)
                         self.fail(
-                            f"Unexpected exception for {observable_type}: "
-                            f"{type(e).__name__}: {str(e)}"
+                            f"Unexpected exception for {observable_type}: {type(e).__name__}: {str(e)}"
+                        )
+
+    def test_analyzer_error_handling(self):
+        if self.analyzer_class is None:
+            self.skipTest("analyzer_class is not set")
+
+        configs = AnalyzerConfig.objects.filter(
+            python_module=self.analyzer_class.python_module
+        )
+
+        if not configs.exists():
+            self.skipTest(
+                f"No AnalyzerConfig found for {self.analyzer_class.python_module}"
+            )
+
+        config = configs.first()
+
+        invalid_observables = {
+            "domain": "invalid..domain",
+            "ip": "999.999.999.999",
+            "url": "not-a-url",
+            "hash": "tooshort",
+        }
+
+        for observable_type in config.observable_supported:
+            if observable_type in invalid_observables:
+                with self.subTest(observable_type=f"{observable_type}_invalid"):
+                    patches = self.get_mocked_response()
+                    with self._apply_patches(patches):
+                        invalid_value = invalid_observables[observable_type]
+                        analyzer = self._setup_analyzer(
+                            config, observable_type, invalid_value
                         )

-    # def test_analyzer_error_handling(self):
-    #     """Test analyzer behavior with invalid inputs"""
-    #     if self.analyzer_class is None:
-    #         self.skipTest("analyzer_class is not set")
-
-    #     configs = AnalyzerConfig.objects.filter(
-    #         python_module=self.analyzer_class.python_module
-    #     )
-
-    #     if not configs.exists():
-    #         self.skipTest(
-    #             f"No AnalyzerConfig found for {self.analyzer_class.python_module}"
-    #         )
-
-    #     config = configs.first()
-
-    #     # Test with invalid observable types if applicable
-    #     invalid_observables = {
-    #         "domain": "invalid..domain",
-    #         "ip": "999.999.999.999",
-    #         "url": "not-a-url",
-    #         "hash": "tooshort",
-    #     }
-
-    #     for observable_type in config.observable_supported:
-    #         if observable_type in invalid_observables:
-    #             with self.subTest(observable_type=f"{observable_type}_invalid"):
-    #                 patches = self.get_mocked_response()
-    #                 with self._apply_patches(patches):
-    #                     invalid_value = invalid_observables[observable_type]
-    #                     analyzer = self._setup_analyzer(
-    #                         config, observable_type, invalid_value
-    #                     )
-
-    #                     # Depending on your analyzer's design, this might raise
-    #                     # an exception or return an error response
-    #                     try:
-    #                         response = analyzer.run()
-    #                         # If no exception, validate it's a proper error response
-    #                         self.assertIsInstance(response, (dict, list))
-    #                     except (AnalyzerRunException, ValueError) as e:
-    #                         # Expected behavior for invalid input
-    #                         print(f"Expected error for invalid {observable_type}: {e}")
+                        try:
+                            response = analyzer.run()
+                            logger.warning(
+                                "Analyzer ran with invalid input: %s = %s",
+                                observable_type,
+                                invalid_value,
+                            )
+                            self.assertTrue(response)
+                        except (AnalyzerRunException, ValueError) as e:
+                            logger.info(
+                                "Expected failure for %s: %s", observable_type, e
+                            )
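
Note that get_mocked_response is not limited to a single patch: _apply_patches also accepts a list or tuple of patches (entered through a shared ExitStack), any ready-made context manager, or None for no mocking at all. The three test cases added in this commit each return a single patch; the following is a minimal sketch of the list form, using a hypothetical ExampleAnalyzerTestCase and placeholder patch targets that are not part of this commit:

from unittest.mock import patch

from tests.api_app.analyzers_manager.unit_tests.observable_analyzers.base_test_class import (
    BaseAnalyzerTest,
)


class ExampleAnalyzerTestCase(BaseAnalyzerTest):
    # Hypothetical subclass: a real test would set analyzer_class to the
    # analyzer under test, imported from
    # api_app.analyzers_manager.observable_analyzers.
    analyzer_class = None  # replace with the analyzer class being tested

    @staticmethod
    def get_mocked_response():
        # Returning a list: BaseAnalyzerTest._apply_patches enters every
        # entry in one ExitStack, so both patches are active while
        # analyzer.run() executes. The targets below are placeholders.
        return [
            patch("requests.get", return_value=None),
            patch("requests.post", return_value=None),
        ]

    @classmethod
    def get_extra_config(cls) -> dict:
        # Each key/value pair is applied to the analyzer instance via
        # setattr() in BaseAnalyzerTest._setup_analyzer.
        return {"_api_key_name": "mock-api-key"}
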
Lines changed: 37 additions & 0 deletions
@@ -0,0 +1,37 @@
from unittest.mock import patch

from api_app.analyzers_manager.observable_analyzers.nuclei import NucleiAnalyzer
from tests.api_app.analyzers_manager.unit_tests.observable_analyzers.base_test_class import (
    BaseAnalyzerTest,
)


class NucleiAnalyzerTestCase(BaseAnalyzerTest):
    analyzer_class = NucleiAnalyzer

    @staticmethod
    def get_mocked_response():
        mock_response = {
            "data": [
                {
                    "templateID": "cves/2022/CVE-2022-XXXXX",
                    "info": {
                        "name": "Sample CVE Test",
                        "severity": "high",
                        "description": "Mock vulnerability detected",
                    },
                    "matched-at": "https://example.com",
                }
            ]
        }

        return patch(
            "api_app.analyzers_manager.classes.DockerBasedAnalyzer._docker_run",
            return_value=mock_response,
        )

    @classmethod
    def get_extra_config(cls) -> dict:
        return {
            "template_dirs": ["cves", "http"],  # use valid dirs to avoid warnings
        }
Lines changed: 32 additions & 0 deletions
@@ -0,0 +1,32 @@
from unittest.mock import patch

from api_app.analyzers_manager.observable_analyzers.onionscan import Onionscan
from tests.api_app.analyzers_manager.unit_tests.observable_analyzers.base_test_class import (
    BaseAnalyzerTest,
)


class OnionscanTestCase(BaseAnalyzerTest):
    analyzer_class = Onionscan

    @staticmethod
    def get_mocked_response():
        mock_response = {
            "data": {
                "hiddenService": "http://exampleonion.onion",
                "webDetected": True,
                "webServerFingerprint": "nginx",
                "relatedServices": ["IRC", "FTP"],
                "pgpKeys": [],
                "sshKey": None,
            }
        }

        return patch(
            "api_app.analyzers_manager.classes.DockerBasedAnalyzer._docker_run",
            return_value=mock_response,
        )

    @classmethod
    def get_extra_config(cls) -> dict:
        return {"verbose": True, "tor_proxy_address": "127.0.0.1:9050"}
Lines changed: 30 additions & 0 deletions
@@ -0,0 +1,30 @@
from unittest.mock import patch

from api_app.analyzers_manager.observable_analyzers.onyphe import Onyphe
from tests.api_app.analyzers_manager.unit_tests.observable_analyzers.base_test_class import (
    BaseAnalyzerTest,
)
from tests.mock_utils import MockUpResponse


class OnypheTestCase(BaseAnalyzerTest):
    analyzer_class = Onyphe

    @staticmethod
    def get_mocked_response():
        mock_response = {
            "ip": "8.8.8.8",
            "results": {
                "geolocation": {
                    "country_name": "United States",
                    "city": "Mountain View",
                },
                "as": {"asn": 15169, "org": "Google LLC"},
            },
        }

        return patch("requests.get", return_value=MockUpResponse(mock_response, 200))

    @classmethod
    def get_extra_config(cls) -> dict:
        return {"_api_key_name": "mock-api-key"}
