Skip to content

Commit

Permalink
Fix http input decode error (#654)
Browse files Browse the repository at this point in the history
* add general exception for decode error
* add test
* fix and refactor http input tests
---------

Co-authored-by: djkhl <[email protected]>
  • Loading branch information
ekneg54 and djkhl authored Aug 27, 2024
1 parent 4b5694d commit 16257ca
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 18 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,18 @@
### Breaking
### Features
### Improvements
### Bugfix

## 13.1.1
### Improvements

* adds ability to bypass the processing of events if there is no pipeline. This is useful for pure connector deployments.
* adds an experimental feature to bypass the rule tree by setting the `LOGPREP_BYPASS_RULE_TREE` environment variable

### Bugfix

* fixes a bug in the `http_output` used by the http generator, where the timeout parameter only set the read_timeout and not the write_timeout
* fixes a bug in the `http_input` where decode errors were not handled

## 13.1.0
### Features
Expand Down
12 changes: 9 additions & 3 deletions logprep/connector/http/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
from attrs import define, field, validators
from falcon import ( # pylint: disable=no-name-in-module
HTTP_200,
HTTPBadRequest,
HTTPMethodNotAllowed,
HTTPTooManyRequests,
HTTPUnauthorized,
Expand Down Expand Up @@ -140,8 +141,12 @@ async def func_wrapper(*args, **kwargs):
return
else:
raise HTTPMethodNotAllowed(["POST"])
except queue.Full as exc:
raise HTTPTooManyRequests(description="Logprep Message Queue is full.") from exc
except HTTPUnauthorized as error:
raise error from error
except queue.Full as error:
raise HTTPTooManyRequests(description="Logprep Message Queue is full.") from error
except Exception as error: # pylint: disable=broad-except
raise HTTPBadRequest(str(error)) from error
return func_wrapper

return func_wrapper
Expand Down Expand Up @@ -272,7 +277,8 @@ class JSONLHttpEndpoint(HttpEndpoint):
async def __call__(self, req, resp, **kwargs): # pylint: disable=arguments-differ
"""jsonl endpoint method"""
self.collect_metrics()
events = self._decoder.decode_lines(await self.get_data(req))
data = await self.get_data(req)
events = self._decoder.decode_lines(data)
for event in events:
self.messages.put(event | kwargs["metadata"], block=False, batch_size=len(events))

Expand Down
76 changes: 61 additions & 15 deletions tests/unit/connector/test_http_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import gzip
import json
import multiprocessing
import queue
import random
import re
from copy import deepcopy
Expand Down Expand Up @@ -116,16 +117,10 @@ def test_get_method_returns_200_with_authentication(self):

def test_get_error_code_too_many_requests(self):
data = {"message": "my log message"}
self.object.messages.put = mock.MagicMock()
self.object.messages.put.side_effect = queue.Full()
session = requests.Session()
for _ in range(100):
resp = session.post(url=f"{self.target}/json", json=data, timeout=0.5)
assert self.object.messages.qsize() == 100
resp = requests.post(url=f"{self.target}/json", json=data, timeout=0.5)
assert self.object.messages._maxsize == 100
assert resp.status_code == 429
for _ in range(100):
resp = session.post(url=f"{self.target}/json", json=data, timeout=0.5)
resp = requests.get(url=f"{self.target}/json", json=data, timeout=0.5)
resp = session.post(url=f"{self.target}/json", json=data, timeout=0.5)
assert resp.status_code == 429

def test_json_endpoint_accepts_post_request(self):
Expand Down Expand Up @@ -322,26 +317,71 @@ def test_get_next_with_hmac_of_raw_message(self):
connector_next_msg, _ = connector.get_next(1)
assert connector_next_msg == expected_event, "Output event with hmac is not as expected"

def test_endpoint_has_basic_auth(self, credentials_file_path):
def test_endpoint_returns_401_if_authorization_not_provided(self, credentials_file_path):
mock_env = {ENV_NAME_LOGPREP_CREDENTIALS_FILE: credentials_file_path}
data = {"message": "my log message"}
with mock.patch.dict("os.environ", mock_env):
new_connector = Factory.create({"test connector": self.CONFIG})
new_connector.pipeline_index = 1
new_connector.setup()
resp = requests.post(url=f"{self.target}/auth-json-file", timeout=0.5)
resp = requests.post(
url=f"{self.target}/auth-json-file", timeout=0.5, data=json.dumps(data)
)
assert resp.status_code == 401

def test_endpoint_returns_401_on_wrong_authorization(self, credentials_file_path):
mock_env = {ENV_NAME_LOGPREP_CREDENTIALS_FILE: credentials_file_path}
data = {"message": "my log message"}
with mock.patch.dict("os.environ", mock_env):
new_connector = Factory.create({"test connector": self.CONFIG})
new_connector.pipeline_index = 1
new_connector.setup()
basic = HTTPBasicAuth("wrong", "credentials")
resp = requests.post(url=f"{self.target}/auth-json-file", auth=basic, timeout=0.5)
resp = requests.post(
url=f"{self.target}/auth-json-file", auth=basic, timeout=0.5, json=data
)
assert resp.status_code == 401

def test_endpoint_returns_200_on_correct_authorization_with_password_from_file(
self, credentials_file_path
):
mock_env = {ENV_NAME_LOGPREP_CREDENTIALS_FILE: credentials_file_path}
data = {"message": "my log message"}
with mock.patch.dict("os.environ", mock_env):
new_connector = Factory.create({"test connector": self.CONFIG})
new_connector.pipeline_index = 1
new_connector.setup()
basic = HTTPBasicAuth("user", "file_password")
resp = requests.post(url=f"{self.target}/auth-json-file", auth=basic, timeout=0.5)
resp = requests.post(
url=f"{self.target}/auth-json-file", auth=basic, timeout=0.5, json=data
)
assert resp.status_code == 200

def test_endpoint_returns_200_on_correct_authorization_with_password_within_credentials_file(
self, credentials_file_path
):
mock_env = {ENV_NAME_LOGPREP_CREDENTIALS_FILE: credentials_file_path}
data = {"message": "my log message"}
with mock.patch.dict("os.environ", mock_env):
new_connector = Factory.create({"test connector": self.CONFIG})
new_connector.pipeline_index = 1
new_connector.setup()
basic = HTTPBasicAuth("user", "secret_password")
resp = requests.post(url=f"{self.target}/auth-json-secret", auth=basic, timeout=0.5)
resp = requests.post(
url=f"{self.target}/auth-json-secret", auth=basic, timeout=0.5, json=data
)
assert resp.status_code == 200

def test_endpoint_returns_200_on_correct_authorization_for_subpath(self, credentials_file_path):
mock_env = {ENV_NAME_LOGPREP_CREDENTIALS_FILE: credentials_file_path}
data = {"message": "my log message"}
with mock.patch.dict("os.environ", mock_env):
new_connector = Factory.create({"test connector": self.CONFIG})
new_connector.pipeline_index = 1
new_connector.setup()
basic = HTTPBasicAuth("user", "password")
resp = requests.post(
url=f"{self.target}/auth-json-secret/AB/json", auth=basic, timeout=0.5
url=f"{self.target}/auth-json-secret/AB/json", auth=basic, timeout=0.5, json=data
)
assert resp.status_code == 200

Expand Down Expand Up @@ -408,3 +448,9 @@ def test_endpoint_handles_gzip_compression(self, endpoint):
timeout=0.5,
)
assert resp.status_code == 200

@pytest.mark.parametrize("endpoint", ["json", "jsonl"])
def test_raises_http_bad_request_on_decode_error(self, endpoint):
data = "this is not a valid json nor jsonl"
resp = requests.post(url=f"{self.target}/{endpoint}", data=data, timeout=0.5)
assert resp.status_code == 400

0 comments on commit 16257ca

Please sign in to comment.