diff --git a/docker/conda/environments/cuda11.8_dev.yml b/docker/conda/environments/cuda11.8_dev.yml
index 5ee09141b0..0527e4277b 100644
--- a/docker/conda/environments/cuda11.8_dev.yml
+++ b/docker/conda/environments/cuda11.8_dev.yml
@@ -68,6 +68,7 @@ dependencies:
   - libgrpc>=1.49
   - librdkafka=1.9.2
   - libwebp>=1.3.2 # Required for CVE mitigation: https://nvd.nist.gov/vuln/detail/CVE-2023-4863
+  - lxml=4.9.1
   - mlflow>=2.2.1,<3
   - mrc=24.03
   - networkx>=2.8
diff --git a/morpheus/controllers/rss_controller.py b/morpheus/controllers/rss_controller.py
index cafefa17c8..c4c64876df 100644
--- a/morpheus/controllers/rss_controller.py
+++ b/morpheus/controllers/rss_controller.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2022-2023, NVIDIA CORPORATION.
+# Copyright (c) 2022-2024, NVIDIA CORPORATION.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -102,16 +102,23 @@ def __init__(self,
         run_indefinitely = any(RSSController.is_url(f) for f in self._feed_input)

         self._run_indefinitely = run_indefinitely
+        self._enable_cache = enable_cache

-        self._session = None
         if enable_cache:
             self._session = requests_cache.CachedSession(os.path.join(cache_dir, "RSSController.sqlite"),
                                                          backend="sqlite")
+        else:
+            self._session = requests.session()
+
+        self._session.headers.update({
+            "User-Agent":
+                "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36"
+        })

         self._feed_stats_dict = {
-            input:
+            url:
                 FeedStats(failure_count=0, success_count=0, last_failure=-1, last_success=-1, last_try_result="Unknown")
-            for input in self._feed_input
+            for url in self._feed_input
         }

     @property
@@ -119,14 +126,9 @@ def run_indefinitely(self):
         """Property that determines to run the source indefinitely"""
         return self._run_indefinitely

-    @property
-    def session_exist(self) -> bool:
-        """Property that indicates the existence of a session."""
-        return bool(self._session)
-
     def get_feed_stats(self, feed_url: str) -> FeedStats:
         """
-        Get feed input stats.
+        Get feed URL stats.

         Parameters
         ----------
@@ -141,30 +143,20 @@ def get_feed_stats(self, feed_url: str) -> FeedStats:
         Raises
         ------
         ValueError
-            If the feed URL is not found in the feed input provided to the constructor.
+            If the feed URL is not found in the feed URLs provided to the constructor.
         """
         if feed_url not in self._feed_stats_dict:
-            raise ValueError("The feed URL is not part of the feed input provided to the constructor.")
+            raise ValueError("The feed URL is not part of the feed URLs provided to the constructor.")

         return self._feed_stats_dict[feed_url]

-    def _get_response_text(self, url: str) -> str:
-        if self.session_exist:
-            response = self._session.get(url)
-        else:
-            response = requests.get(url, timeout=self._request_timeout)
-
-        return response.text
-
     def _read_file_content(self, file_path: str) -> str:
         with open(file_path, 'r', encoding="utf-8") as file:
             return file.read()

-    def _try_parse_feed_with_beautiful_soup(self, feed_input: str, is_url: bool) -> "feedparser.FeedParserDict":
-
-        feed_input = self._get_response_text(feed_input) if is_url else self._read_file_content(feed_input)
+    def _try_parse_feed_with_beautiful_soup(self, feed_input: str) -> "feedparser.FeedParserDict":

-        soup = BeautifulSoup(feed_input, 'xml')
+        soup = BeautifulSoup(feed_input, 'lxml')

         # Verify whether the given feed has 'item' or 'entry' tags.
         if soup.find('item'):
@@ -205,32 +197,28 @@ def _try_parse_feed(self, url: str) -> "feedparser.FeedParserDict":
         fallback = False
         cache_hit = False
-        is_url_with_session = is_url and self.session_exist

-        if is_url_with_session:
-            response = self._session.get(url)
-            cache_hit = response.from_cache
+        if is_url:
+            response = self._session.get(url, timeout=self._request_timeout)
             feed_input = response.text
+            if self._enable_cache:
+                cache_hit = response.from_cache
         else:
             feed_input = url

         feed = feedparser.parse(feed_input)

         if feed["bozo"]:
-            cache_hit = False
-
-            if is_url_with_session:
-                fallback = True
-                logger.info("Failed to parse feed: %s. Trying to parse using feedparser directly.", url)
-                feed = feedparser.parse(url)
-
-            if feed["bozo"]:
-                try:
-                    logger.info("Failed to parse feed: %s, %s. Try parsing feed manually", url, feed['bozo_exception'])
-                    feed = self._try_parse_feed_with_beautiful_soup(url, is_url)
-                except Exception:
-                    logger.error("Failed to parse the feed manually: %s", url)
-                    raise
+            fallback = True
+            try:
+                if not is_url:
+                    # Read the file content.
+                    feed_input = self._read_file_content(feed_input)
+                # Parse the feed content with BeautifulSoup.
+                feed = self._try_parse_feed_with_beautiful_soup(feed_input)
+            except Exception:
+                logger.error("Failed to parse the feed manually: %s", url)
+                raise

         logger.debug("Parsed feed: %s. Cache hit: %s. Fallback: %s", url, cache_hit, fallback)
@@ -312,17 +300,17 @@ def fetch_dataframes(self):
     @classmethod
     def is_url(cls, feed_input: str) -> bool:
         """
-        Check if the provided input is a valid URL.
+        Check if the provided string is a valid URL.

         Parameters
         ----------
         feed_input : str
-            The input string to be checked.
+            The string to be checked.

         Returns
         -------
         bool
-            True if the input is a valid URL, False otherwise.
+            True if the string is a valid URL, False otherwise.
         """
         try:
             parsed_url = urlparse(feed_input)
diff --git a/tests/controllers/test_rss_controller.py b/tests/controllers/test_rss_controller.py
index e388783fbe..94726c4783 100644
--- a/tests/controllers/test_rss_controller.py
+++ b/tests/controllers/test_rss_controller.py
@@ -1,4 +1,4 @@
-# SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 # SPDX-License-Identifier: Apache-2.0
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
@@ -78,10 +78,12 @@ def test_run_indefinitely_false(feed_input: list[str]):

 @pytest.mark.parametrize("feed_input", test_urls)
-def test_parse_feed_valid_url(feed_input: list[str], mock_feed: feedparser.FeedParserDict):
+def test_parse_feed_valid_url(feed_input: list[str], mock_feed: feedparser.FeedParserDict, mock_get_response: Mock):
     controller = RSSController(feed_input=feed_input)

-    with patch("morpheus.controllers.rss_controller.feedparser.parse") as mock_feedparser_parse:
-        mock_feedparser_parse.return_value = mock_feed
+    with patch("morpheus.controllers.rss_controller.feedparser.parse", return_value=mock_feed), \
+            patch("requests.Session.get", return_value=mock_get_response):
         feed = list(controller.parse_feeds())[0]

         assert feed.entries
@@ -112,11 +114,14 @@ def test_is_url_false(feed_input: list[str]):

 @pytest.mark.parametrize("feed_input", [test_urls, test_urls[0]])
-def test_fetch_dataframes_url(feed_input: str | list[str], mock_feed: feedparser.FeedParserDict):
+def test_fetch_dataframes_url(feed_input: str | list[str],
+                              mock_feed: feedparser.FeedParserDict,
+                              mock_get_response: Mock):
     controller = RSSController(feed_input=feed_input)

-    with patch("morpheus.controllers.rss_controller.feedparser.parse") as mock_feedparser_parse:
-        mock_feedparser_parse.return_value = mock_feed
+    with patch("morpheus.controllers.rss_controller.feedparser.parse", return_value=mock_feed), \
+            patch("requests.Session.get", return_value=mock_get_response):
         dataframes_generator = controller.fetch_dataframes()
         dataframe = next(dataframes_generator, None)
         assert isinstance(dataframe, pd.DataFrame)
@@ -142,26 +147,15 @@ def test_batch_size(feed_input: list[str], batch_size: int):
             assert len(df) <= batch_size


-@pytest.mark.parametrize("feed_input, is_url, enable_cache", [(test_file_paths[0], False, False),
-                                                              (test_urls[0], True, True), (test_urls[0], True, False)])
-def test_try_parse_feed_with_beautiful_soup(feed_input: str, is_url: bool, enable_cache: bool, mock_get_response: Mock):
+@pytest.mark.parametrize("feed_input, enable_cache", [(test_file_paths[0], False), (test_urls[0], True),
+                                                      (test_urls[0], False)])
+def test_try_parse_feed_with_beautiful_soup(feed_input: str, enable_cache: bool, mock_get_response: Mock):
     controller = RSSController(feed_input=feed_input, enable_cache=enable_cache)

-    if is_url:
-        if enable_cache:
-            with patch("morpheus.controllers.rss_controller.requests_cache.CachedSession.get") as mock_get:
-                mock_get.return_value = mock_get_response
-                feed_data = controller._try_parse_feed_with_beautiful_soup(feed_input, is_url)
-        else:
-            with patch("morpheus.controllers.rss_controller.requests.get") as mock_get:
-                mock_get.return_value = mock_get_response
-                feed_data = controller._try_parse_feed_with_beautiful_soup(feed_input, is_url)
-
-    else:
-        feed_data = controller._try_parse_feed_with_beautiful_soup(feed_input, is_url)
+    # The helper now receives the raw feed content directly, regardless of caching
+    # or whether the input was a URL or a file path.
+    feed_data = controller._try_parse_feed_with_beautiful_soup(mock_get_response.text)

     assert isinstance(feed_data, feedparser.FeedParserDict)
-    assert len(feed_data.entries) > 0

     for entry in feed_data.entries:
@@ -180,16 +174,6 @@ def test_try_parse_feed_with_beautiful_soup(feed_input: str, is_url: bool, enabl
     assert isinstance(feed_data["entries"], list)


-@pytest.mark.parametrize("enable_cache", [True, False])
-def test_enable_disable_cache(enable_cache):
-    controller = RSSController(feed_input=test_urls, enable_cache=enable_cache)
-
-    if enable_cache:
-        assert controller.session_exist
-    else:
-        assert not controller.session_exist
-
-
 def test_parse_feeds(mock_feed: feedparser.FeedParserDict):
     feed_input = test_urls[0]
     cooldown_interval = 620
@@ -239,3 +223,15 @@ def test_parse_feeds(mock_feed: feedparser.FeedParserDict):

     with pytest.raises(ValueError):
         controller.get_feed_stats("http://testfeed.com")
+
+
+@pytest.mark.parametrize("feed_input", [test_urls[0]])
+def test_redundant_fetch(feed_input: str, mock_feed: feedparser.FeedParserDict, mock_get_response: Mock):
+
+    controller = RSSController(feed_input=feed_input)
+    with patch("morpheus.controllers.rss_controller.feedparser.parse", return_value=mock_feed), \
+            patch("requests.Session.get", return_value=mock_get_response) as mocked_session_get:
+        dataframes_generator = controller.fetch_dataframes()
+        next(dataframes_generator, None)
+        assert mocked_session_get.call_count == 1
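
Reviewer sketch (not part of the patch): the diff replaces the optional session with one that always exists -- a requests_cache.CachedSession when caching is enabled, a plain requests.Session otherwise -- pins a browser User-Agent on it, and fetches each feed exactly once before falling back to a BeautifulSoup/lxml re-parse. The standalone approximation below illustrates that flow; the names make_session and fetch_feed, the cache_dir and timeout defaults, and the str(soup) round-trip are assumptions for illustration (the real controller rebuilds the feed dict from the soup itself).

    import os

    import feedparser
    import requests
    import requests_cache
    from bs4 import BeautifulSoup


    def make_session(enable_cache: bool, cache_dir: str = "./.cache/http_cache") -> requests.Session:
        # Always return a usable session; only the backend differs. (cache_dir
        # default here is illustrative, not the controller's actual default.)
        if enable_cache:
            session = requests_cache.CachedSession(os.path.join(cache_dir, "RSSController.sqlite"), backend="sqlite")
        else:
            session = requests.session()
        # Some feed servers reject requests' default User-Agent, so mimic a browser.
        session.headers.update({
            "User-Agent": ("Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
                           "(KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36")
        })
        return session


    def fetch_feed(session: requests.Session, url: str, timeout: float = 2.0) -> feedparser.FeedParserDict:
        # Fetch once and reuse the body for both the feedparser attempt and the
        # fallback; the old code re-downloaded the feed on a parse failure, which
        # is the redundancy test_redundant_fetch now guards against.
        response = session.get(url, timeout=timeout)
        feed = feedparser.parse(response.text)
        if feed["bozo"]:
            # Re-parse leniently with lxml (the reason for the new conda pin) and
            # hand the cleaned markup back to feedparser -- an approximation of
            # _try_parse_feed_with_beautiful_soup.
            soup = BeautifulSoup(response.text, "lxml")
            feed = feedparser.parse(str(soup))
        return feed

One design point the diff encodes: response.from_cache exists only on responses produced by requests_cache, which is why _try_parse_feed reads it behind the new self._enable_cache flag rather than unconditionally.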