From 6450d904439a9504af1628463a0d8988feed392a Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Sun, 17 Dec 2023 21:14:01 -0800 Subject: [PATCH 1/5] Fixed redundant fetches --- morpheus/controllers/rss_controller.py | 53 ++++++++++++++---------- tests/controllers/test_rss_controller.py | 30 ++++---------- 2 files changed, 38 insertions(+), 45 deletions(-) diff --git a/morpheus/controllers/rss_controller.py b/morpheus/controllers/rss_controller.py index 3774bac9d4..149dea97e6 100644 --- a/morpheus/controllers/rss_controller.py +++ b/morpheus/controllers/rss_controller.py @@ -102,11 +102,18 @@ def __init__(self, run_indefinitely = any(RSSController.is_url(f) for f in self._feed_input) self._run_indefinitely = run_indefinitely + self._enable_cache = enable_cache - self._session = None if enable_cache: self._session = requests_cache.CachedSession(os.path.join(cache_dir, "RSSController.sqlite"), backend="sqlite") + else: + self._session = requests.session() + + self._session.headers.update({ + "User-Agent": + "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36" + }) self._feed_stats_dict = { input: @@ -119,11 +126,6 @@ def run_indefinitely(self): """Property that determines to run the source indefinitely""" return self._run_indefinitely - @property - def session_exist(self) -> bool: - """Property that indicates the existence of a session.""" - return bool(self._session) - def get_feed_stats(self, feed_url: str) -> FeedStats: """ Get feed input stats. 
@@ -148,22 +150,27 @@ def get_feed_stats(self, feed_url: str) -> FeedStats: return self._feed_stats_dict[feed_url] - def _get_response_text(self, url: str) -> str: - if self.session_exist: - response = self._session.get(url) - else: - response = requests.get(url, timeout=self._request_timeout) - - return response.text - def _read_file_content(self, file_path: str) -> str: with open(file_path, 'r', encoding="utf-8") as file: return file.read() - def _try_parse_feed_with_beautiful_soup(self, feed_input: str, is_url: bool) -> "feedparser.FeedParserDict": + def _fetch_feed_content(self, feed_input: str, is_url: bool) -> str: + # If input is an URL. + if is_url: + if not self._enable_cache: + # If cache is not enabled, fetch feed directly using requests.Session. + response = self._session.get(feed_input, timeout=self._request_timeout) + return response.text + + # If we are here, feed_input is an actual feed content retrieved from the cache. + return feed_input - feed_input = self._get_response_text(feed_input) if is_url else self._read_file_content(feed_input) + # If original input is not an URL, then read the content from the file path. + return self._read_file_content(feed_input) + + def _try_parse_feed_with_beautiful_soup(self, feed_input: str, is_url: bool) -> "feedparser.FeedParserDict": + feed_input = self._fetch_feed_content(feed_input, is_url) soup = BeautifulSoup(feed_input, 'xml') # Verify whether the given feed has 'item' or 'entry' tags. 
@@ -205,10 +212,10 @@ def _try_parse_feed(self, url: str) -> "feedparser.FeedParserDict": fallback = False cache_hit = False - is_url_with_session = is_url and self.session_exist + use_cache = is_url and self._enable_cache - if is_url_with_session: - response = self._session.get(url) + if use_cache: + response = self._session.get(url, timeout=self._request_timeout) cache_hit = response.from_cache feed_input = response.text else: @@ -219,15 +226,15 @@ def _try_parse_feed(self, url: str) -> "feedparser.FeedParserDict": if feed["bozo"]: cache_hit = False - if is_url_with_session: + if use_cache: fallback = True - logger.info("Failed to parse feed: %s. Trying to parse using feedparser directly.", url) + logger.info("Parsing the cached feed for URL '%s' failed. Attempting direct parsing with feedparser.", + url) feed = feedparser.parse(url) if feed["bozo"]: try: - logger.info("Failed to parse feed: %s, %s. Try parsing feed manually", url, feed['bozo_exception']) - feed = self._try_parse_feed_with_beautiful_soup(url, is_url) + feed = self._parse_feed_with_beautiful_soup(feed_input, is_url) except Exception: logger.error("Failed to parse the feed manually: %s", url) raise diff --git a/tests/controllers/test_rss_controller.py b/tests/controllers/test_rss_controller.py index e388783fbe..90e9d8f7fc 100644 --- a/tests/controllers/test_rss_controller.py +++ b/tests/controllers/test_rss_controller.py @@ -21,6 +21,7 @@ import feedparser import pandas as pd import pytest +import requests from _utils import TEST_DIRS from morpheus.controllers.rss_controller import FeedStats @@ -147,21 +148,16 @@ def test_batch_size(feed_input: list[str], batch_size: int): def test_try_parse_feed_with_beautiful_soup(feed_input: str, is_url: bool, enable_cache: bool, mock_get_response: Mock): controller = RSSController(feed_input=feed_input, enable_cache=enable_cache) - if is_url: - if enable_cache: - with patch("morpheus.controllers.rss_controller.requests_cache.CachedSession.get") as mock_get: - 
mock_get.return_value = mock_get_response - feed_data = controller._try_parse_feed_with_beautiful_soup(feed_input, is_url) - else: - with patch("morpheus.controllers.rss_controller.requests.get") as mock_get: - mock_get.return_value = mock_get_response - feed_data = controller._try_parse_feed_with_beautiful_soup(feed_input, is_url) - + if is_url and not enable_cache: + with patch.object(requests.Session, 'get') as mock_get: + mock_get.return_value = mock_get_response + feed_data = controller._try_parse_feed_with_beautiful_soup(feed_input, is_url) else: - feed_data = controller._try_parse_feed_with_beautiful_soup(feed_input, is_url) + # When enable_cache is set to 'True', the feed content is provided as input. + feed_data = controller._try_parse_feed_with_beautiful_soup( + mock_get_response.text if enable_cache else feed_input, is_url) assert isinstance(feed_data, feedparser.FeedParserDict) - assert len(feed_data.entries) > 0 for entry in feed_data.entries: @@ -180,16 +176,6 @@ def test_try_parse_feed_with_beautiful_soup(feed_input: str, is_url: bool, enabl assert isinstance(feed_data["entries"], list) -@pytest.mark.parametrize("enable_cache", [True, False]) -def test_enable_disable_cache(enable_cache): - controller = RSSController(feed_input=test_urls, enable_cache=enable_cache) - - if enable_cache: - assert controller.session_exist - else: - assert not controller.session_exist - - def test_parse_feeds(mock_feed: feedparser.FeedParserDict): feed_input = test_urls[0] cooldown_interval = 620 From 064a727420df75df7dc339abae5e0dca063cb786 Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Sun, 17 Dec 2023 21:33:18 -0800 Subject: [PATCH 2/5] Updated private function name --- morpheus/controllers/rss_controller.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/morpheus/controllers/rss_controller.py b/morpheus/controllers/rss_controller.py index 149dea97e6..4ea4f9e3b3 100644 --- a/morpheus/controllers/rss_controller.py +++ 
b/morpheus/controllers/rss_controller.py @@ -234,7 +234,7 @@ def _try_parse_feed(self, url: str) -> "feedparser.FeedParserDict": if feed["bozo"]: try: - feed = self._parse_feed_with_beautiful_soup(feed_input, is_url) + feed = self._try_parse_feed_with_beautiful_soup(feed_input, is_url) except Exception: logger.error("Failed to parse the feed manually: %s", url) raise From c8f95a1706c5c4a1e0ba7b702a7f8507133c890e Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Wed, 3 Jan 2024 11:58:05 -0500 Subject: [PATCH 3/5] Simplified rss feed parsing --- morpheus/controllers/rss_controller.py | 64 ++++++++---------------- tests/controllers/test_rss_controller.py | 33 ++++++------ 2 files changed, 39 insertions(+), 58 deletions(-) diff --git a/morpheus/controllers/rss_controller.py b/morpheus/controllers/rss_controller.py index f5cbe6b373..0b7082b0ca 100644 --- a/morpheus/controllers/rss_controller.py +++ b/morpheus/controllers/rss_controller.py @@ -116,9 +116,9 @@ def __init__(self, }) self._feed_stats_dict = { - input: + url: FeedStats(failure_count=0, success_count=0, last_failure=-1, last_success=-1, last_try_result="Unknown") - for input in self._feed_input + for url in self._feed_input } @property @@ -128,7 +128,7 @@ def run_indefinitely(self): def get_feed_stats(self, feed_url: str) -> FeedStats: """ - Get feed input stats. + Get feed URL stats. Parameters ---------- @@ -143,10 +143,10 @@ def get_feed_stats(self, feed_url: str) -> FeedStats: Raises ------ ValueError - If the feed URL is not found in the feed input provided to the constructor. + If the feed URL is not found in the feed URLs provided to the constructor. 
""" if feed_url not in self._feed_stats_dict: - raise ValueError("The feed URL is not part of the feed input provided to the constructor.") + raise ValueError("The feed URL is not part of the feed url provided to the constructor.") return self._feed_stats_dict[feed_url] @@ -154,23 +154,8 @@ def _read_file_content(self, file_path: str) -> str: with open(file_path, 'r', encoding="utf-8") as file: return file.read() - def _fetch_feed_content(self, feed_input: str, is_url: bool) -> str: - # If input is an URL. - if is_url: - if not self._enable_cache: - # If cache is not enabled, fetch feed directly using requests.Session. - response = self._session.get(feed_input, timeout=self._request_timeout) - return response.text - - # If we are here, feed_input is an actual feed content retrieved from the cache. - return feed_input - - # If original input is not an URL, then read the content from the file path. - return self._read_file_content(feed_input) - - def _try_parse_feed_with_beautiful_soup(self, feed_input: str, is_url: bool) -> "feedparser.FeedParserDict": + def _try_parse_feed_with_beautiful_soup(self, feed_input: str) -> "feedparser.FeedParserDict": - feed_input = self._fetch_feed_content(feed_input, is_url) soup = BeautifulSoup(feed_input, 'xml') # Verify whether the given feed has 'item' or 'entry' tags. @@ -211,12 +196,9 @@ def _try_parse_feed(self, url: str) -> "feedparser.FeedParserDict": is_url = RSSController.is_url(url) fallback = False - cache_hit = False - use_cache = is_url and self._enable_cache - if use_cache: + if is_url: response = self._session.get(url, timeout=self._request_timeout) - cache_hit = response.from_cache feed_input = response.text else: feed_input = url @@ -224,22 +206,18 @@ def _try_parse_feed(self, url: str) -> "feedparser.FeedParserDict": feed = feedparser.parse(feed_input) if feed["bozo"]: - cache_hit = False - - if use_cache: - fallback = True - logger.info("Parsing the cached feed for URL '%s' failed. 
Attempting direct parsing with feedparser.", - url) - feed = feedparser.parse(url) - - if feed["bozo"]: - try: - feed = self._try_parse_feed_with_beautiful_soup(feed_input, is_url) - except Exception: - logger.error("Failed to parse the feed manually: %s", url) - raise + fallback = True + try: + if not is_url: + # Read file content + feed_input = self._read_file_content(feed_input) + # Parse feed content with beautifulsoup + feed = self._try_parse_feed_with_beautiful_soup(feed_input) + except Exception: + logger.error("Failed to parse the feed manually: %s", url) + raise - logger.debug("Parsed feed: %s. Cache hit: %s. Fallback: %s", url, cache_hit, fallback) + logger.debug("Parsed feed: %s. Fallback: %s", url, fallback) return feed @@ -319,17 +297,17 @@ def fetch_dataframes(self): @classmethod def is_url(cls, feed_input: str) -> bool: """ - Check if the provided input is a valid URL. + Check if the provided url is a valid URL. Parameters ---------- feed_input : str - The input string to be checked. + The url string to be checked. Returns ------- bool - True if the input is a valid URL, False otherwise. + True if the url is a valid URL, False otherwise. 
""" try: parsed_url = urlparse(feed_input) diff --git a/tests/controllers/test_rss_controller.py b/tests/controllers/test_rss_controller.py index 90e9d8f7fc..94f5ee9555 100644 --- a/tests/controllers/test_rss_controller.py +++ b/tests/controllers/test_rss_controller.py @@ -79,9 +79,13 @@ def test_run_indefinitely_false(feed_input: list[str]): @pytest.mark.parametrize("feed_input", test_urls) -def test_parse_feed_valid_url(feed_input: list[str], mock_feed: feedparser.FeedParserDict): +def test_parse_feed_valid_url(feed_input: list[str], mock_feed: feedparser.FeedParserDict, mock_get_response: Mock): controller = RSSController(feed_input=feed_input) - with patch("morpheus.controllers.rss_controller.feedparser.parse") as mock_feedparser_parse: + + mock_feedparser_parse = patch("morpheus.controllers.rss_controller.feedparser.parse") + mock_get = patch.object(requests.Session, 'get') + with mock_feedparser_parse, mock_get: + mock_get.return_value = mock_get_response mock_feedparser_parse.return_value = mock_feed feed = list(controller.parse_feeds())[0] assert feed.entries @@ -113,10 +117,15 @@ def test_is_url_false(feed_input: list[str]): @pytest.mark.parametrize("feed_input", [test_urls, test_urls[0]]) -def test_fetch_dataframes_url(feed_input: str | list[str], mock_feed: feedparser.FeedParserDict): +def test_fetch_dataframes_url(feed_input: str | list[str], + mock_feed: feedparser.FeedParserDict, + mock_get_response: Mock): controller = RSSController(feed_input=feed_input) - with patch("morpheus.controllers.rss_controller.feedparser.parse") as mock_feedparser_parse: + mock_feedparser_parse = patch("morpheus.controllers.rss_controller.feedparser.parse") + mock_get = patch.object(requests.Session, 'get') + with mock_feedparser_parse, mock_get: + mock_get.return_value = mock_get_response mock_feedparser_parse.return_value = mock_feed dataframes_generator = controller.fetch_dataframes() dataframe = next(dataframes_generator, None) @@ -143,19 +152,13 @@ def 
test_batch_size(feed_input: list[str], batch_size: int): assert len(df) <= batch_size -@pytest.mark.parametrize("feed_input, is_url, enable_cache", [(test_file_paths[0], False, False), - (test_urls[0], True, True), (test_urls[0], True, False)]) -def test_try_parse_feed_with_beautiful_soup(feed_input: str, is_url: bool, enable_cache: bool, mock_get_response: Mock): +@pytest.mark.parametrize("feed_input, enable_cache", [(test_file_paths[0], False), (test_urls[0], True), + (test_urls[0], False)]) +def test_try_parse_feed_with_beautiful_soup(feed_input: str, enable_cache: bool, mock_get_response: Mock): controller = RSSController(feed_input=feed_input, enable_cache=enable_cache) - if is_url and not enable_cache: - with patch.object(requests.Session, 'get') as mock_get: - mock_get.return_value = mock_get_response - feed_data = controller._try_parse_feed_with_beautiful_soup(feed_input, is_url) - else: - # When enable_cache is set to 'True', the feed content is provided as input. - feed_data = controller._try_parse_feed_with_beautiful_soup( - mock_get_response.text if enable_cache else feed_input, is_url) + # When enable_cache is set to 'True', the feed content is provided as input. + feed_data = controller._try_parse_feed_with_beautiful_soup(mock_get_response.text) assert isinstance(feed_data, feedparser.FeedParserDict) assert len(feed_data.entries) > 0 From 2af66b34d67cd5c149f6ffa9cdd552838d75bb1c Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Fri, 5 Jan 2024 13:32:40 -0500 Subject: [PATCH 4/5] Simplified rss feed parsing --- morpheus/controllers/rss_controller.py | 2 +- tests/controllers/test_rss_controller.py | 13 +++++-------- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/morpheus/controllers/rss_controller.py b/morpheus/controllers/rss_controller.py index 0b7082b0ca..72e2a4a476 100644 --- a/morpheus/controllers/rss_controller.py +++ b/morpheus/controllers/rss_controller.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022-2023, NVIDIA CORPORATION. 
+# Copyright (c) 2022-2024, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/tests/controllers/test_rss_controller.py b/tests/controllers/test_rss_controller.py index 94f5ee9555..5adf8c7859 100644 --- a/tests/controllers/test_rss_controller.py +++ b/tests/controllers/test_rss_controller.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -21,7 +21,6 @@ import feedparser import pandas as pd import pytest -import requests from _utils import TEST_DIRS from morpheus.controllers.rss_controller import FeedStats @@ -83,9 +82,8 @@ def test_parse_feed_valid_url(feed_input: list[str], mock_feed: feedparser.FeedP controller = RSSController(feed_input=feed_input) mock_feedparser_parse = patch("morpheus.controllers.rss_controller.feedparser.parse") - mock_get = patch.object(requests.Session, 'get') - with mock_feedparser_parse, mock_get: - mock_get.return_value = mock_get_response + + with mock_feedparser_parse, patch("requests.Session.get", return_value=mock_get_response): mock_feedparser_parse.return_value = mock_feed feed = list(controller.parse_feeds())[0] assert feed.entries @@ -123,9 +121,8 @@ def test_fetch_dataframes_url(feed_input: str | list[str], controller = RSSController(feed_input=feed_input) mock_feedparser_parse = patch("morpheus.controllers.rss_controller.feedparser.parse") - mock_get = patch.object(requests.Session, 'get') - with mock_feedparser_parse, mock_get: - mock_get.return_value = mock_get_response + + with mock_feedparser_parse, patch("requests.Session.get", return_value=mock_get_response): mock_feedparser_parse.return_value = mock_feed dataframes_generator = 
controller.fetch_dataframes() dataframe = next(dataframes_generator, None) From 69817716d1cfa642e7f944271cc4e6bebe6ec433 Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Tue, 16 Jan 2024 08:48:42 -0800 Subject: [PATCH 5/5] Added tests --- docker/conda/environments/cuda11.8_dev.yml | 1 + morpheus/controllers/rss_controller.py | 7 +++++-- tests/controllers/test_rss_controller.py | 22 ++++++++++++++++------ 3 files changed, 22 insertions(+), 8 deletions(-) diff --git a/docker/conda/environments/cuda11.8_dev.yml b/docker/conda/environments/cuda11.8_dev.yml index 5ee09141b0..0527e4277b 100644 --- a/docker/conda/environments/cuda11.8_dev.yml +++ b/docker/conda/environments/cuda11.8_dev.yml @@ -68,6 +68,7 @@ dependencies: - libgrpc>=1.49 - librdkafka=1.9.2 - libwebp>=1.3.2 # Required for CVE mitigation: https://nvd.nist.gov/vuln/detail/CVE-2023-4863 + - lxml=4.9.1 - mlflow>=2.2.1,<3 - mrc=24.03 - networkx>=2.8 diff --git a/morpheus/controllers/rss_controller.py b/morpheus/controllers/rss_controller.py index 72e2a4a476..c4c64876df 100644 --- a/morpheus/controllers/rss_controller.py +++ b/morpheus/controllers/rss_controller.py @@ -156,7 +156,7 @@ def _read_file_content(self, file_path: str) -> str: def _try_parse_feed_with_beautiful_soup(self, feed_input: str) -> "feedparser.FeedParserDict": - soup = BeautifulSoup(feed_input, 'xml') + soup = BeautifulSoup(feed_input, 'lxml') # Verify whether the given feed has 'item' or 'entry' tags. 
if soup.find('item'): @@ -196,10 +196,13 @@ def _try_parse_feed(self, url: str) -> "feedparser.FeedParserDict": is_url = RSSController.is_url(url) fallback = False + cache_hit = False if is_url: response = self._session.get(url, timeout=self._request_timeout) feed_input = response.text + if self._enable_cache: + cache_hit = response.from_cache else: feed_input = url @@ -217,7 +220,7 @@ def _try_parse_feed(self, url: str) -> "feedparser.FeedParserDict": logger.error("Failed to parse the feed manually: %s", url) raise - logger.debug("Parsed feed: %s. Fallback: %s", url, fallback) + logger.debug("Parsed feed: %s. Cache hit: %s. Fallback: %s", url, cache_hit, fallback) return feed diff --git a/tests/controllers/test_rss_controller.py b/tests/controllers/test_rss_controller.py index 5adf8c7859..94726c4783 100644 --- a/tests/controllers/test_rss_controller.py +++ b/tests/controllers/test_rss_controller.py @@ -81,10 +81,9 @@ def test_run_indefinitely_false(feed_input: list[str]): def test_parse_feed_valid_url(feed_input: list[str], mock_feed: feedparser.FeedParserDict, mock_get_response: Mock): controller = RSSController(feed_input=feed_input) - mock_feedparser_parse = patch("morpheus.controllers.rss_controller.feedparser.parse") + patch("morpheus.controllers.rss_controller.feedparser.parse", return_value=mock_feed) - with mock_feedparser_parse, patch("requests.Session.get", return_value=mock_get_response): - mock_feedparser_parse.return_value = mock_feed + with patch("requests.Session.get", return_value=mock_get_response): feed = list(controller.parse_feeds())[0] assert feed.entries @@ -120,10 +119,9 @@ def test_fetch_dataframes_url(feed_input: str | list[str], mock_get_response: Mock): controller = RSSController(feed_input=feed_input) - mock_feedparser_parse = patch("morpheus.controllers.rss_controller.feedparser.parse") + patch("morpheus.controllers.rss_controller.feedparser.parse", return_value=mock_feed) - with mock_feedparser_parse, patch("requests.Session.get", 
return_value=mock_get_response): - mock_feedparser_parse.return_value = mock_feed + with patch("requests.Session.get", return_value=mock_get_response): dataframes_generator = controller.fetch_dataframes() dataframe = next(dataframes_generator, None) assert isinstance(dataframe, pd.DataFrame) @@ -225,3 +223,15 @@ def test_parse_feeds(mock_feed: feedparser.FeedParserDict): with pytest.raises(ValueError): controller.get_feed_stats("http://testfeed.com") + + +@pytest.mark.parametrize("feed_input", [test_urls[0]]) +def test_redundant_fetch(feed_input: str, mock_feed: feedparser.FeedParserDict, mock_get_response: Mock): + + controller = RSSController(feed_input=feed_input) + mock_feedparser_parse = patch("morpheus.controllers.rss_controller.feedparser.parse") + with mock_feedparser_parse, patch("requests.Session.get", return_value=mock_get_response) as mocked_session_get: + mock_feedparser_parse.return_value = mock_feed + dataframes_generator = controller.fetch_dataframes() + next(dataframes_generator, None) + assert mocked_session_get.call_count == 1