
Eliminate Redundant Fetches in RSS Controller #1442

1 change: 1 addition & 0 deletions docker/conda/environments/cuda11.8_dev.yml
@@ -68,6 +68,7 @@ dependencies:
   - libgrpc>=1.49
   - librdkafka=1.9.2
   - libwebp>=1.3.2 # Required for CVE mitigation: https://nvd.nist.gov/vuln/detail/CVE-2023-4863
+  - lxml=4.9.1
   - mlflow>=2.2.1,<3
   - mrc=24.03
   - networkx>=2.8
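Reviewer note: the new `lxml=4.9.1` pin backs the parser switch in `rss_controller.py` below, where `BeautifulSoup(feed_input, 'xml')` becomes `BeautifulSoup(feed_input, 'lxml')`. A minimal sketch of what the pin enables (the feed snippet is made up):

```python
# BeautifulSoup's 'lxml' parser is backed by the lxml package, so the
# fallback parsing path in the controller needs it installed.
from bs4 import BeautifulSoup

snippet = "<rss><channel><item><description>hello</description></item></channel></rss>"

# 'lxml' selects lxml's lenient HTML parser; 'xml' would select its strict XML parser.
soup = BeautifulSoup(snippet, "lxml")
print(soup.find("item").find("description").get_text())  # -> hello
```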
78 changes: 33 additions & 45 deletions morpheus/controllers/rss_controller.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2022-2023, NVIDIA CORPORATION.
+# Copyright (c) 2022-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -102,31 +102,33 @@ def __init__(self,
         run_indefinitely = any(RSSController.is_url(f) for f in self._feed_input)

         self._run_indefinitely = run_indefinitely
+        self._enable_cache = enable_cache

         self._session = None
         if enable_cache:
             self._session = requests_cache.CachedSession(os.path.join(cache_dir, "RSSController.sqlite"),
                                                          backend="sqlite")
+        else:
+            self._session = requests.session()

+        self._session.headers.update({
+            "User-Agent":
+                "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36"
+        })

         self._feed_stats_dict = {
-            input:
+            url:
                 FeedStats(failure_count=0, success_count=0, last_failure=-1, last_success=-1, last_try_result="Unknown")
-            for input in self._feed_input
+            for url in self._feed_input
         }
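Reviewer note: after this hunk a session always exists (cached or plain), which is what lets the `session_exist` property and the `requests.get` fallback disappear below. A quick sketch of the `requests_cache` behavior that drives `cache_hit` in `_try_parse_feed` (URL and cache path are illustrative):

```python
import requests_cache

# Same pattern as the controller: a sqlite-backed session that caches GETs.
session = requests_cache.CachedSession("RSSController.sqlite", backend="sqlite")

first = session.get("https://example.com/feed.xml", timeout=2.0)
second = session.get("https://example.com/feed.xml", timeout=2.0)

# requests-cache stamps every response it returns with a `from_cache` flag.
print(first.from_cache, second.from_cache)  # typically: False True
```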

     @property
     def run_indefinitely(self):
         """Property that determines whether to run the source indefinitely"""
         return self._run_indefinitely

-    @property
-    def session_exist(self) -> bool:
-        """Property that indicates the existence of a session."""
-        return bool(self._session)

     def get_feed_stats(self, feed_url: str) -> FeedStats:
         """
-        Get feed input stats.
+        Get feed URL stats.

         Parameters
         ----------
@@ -141,30 +143,20 @@ def get_feed_stats(self, feed_url: str) -> FeedStats:
         Raises
         ------
         ValueError
-            If the feed URL is not found in the feed input provided to the constructor.
+            If the feed URL is not found among the feed URLs provided to the constructor.
"""
         if feed_url not in self._feed_stats_dict:
-            raise ValueError("The feed URL is not part of the feed input provided to the constructor.")
+            raise ValueError("The feed URL is not among the feed URLs provided to the constructor.")

         return self._feed_stats_dict[feed_url]
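Reviewer note: a usage sketch for the stats API touched above (URL is illustrative; the field names are those visible in `__init__`):

```python
from morpheus.controllers.rss_controller import RSSController

controller = RSSController(feed_input=["https://example.com/feed.xml"])
stats = controller.get_feed_stats("https://example.com/feed.xml")

# All counters hold their defaults until parse_feeds() has run.
print(stats.success_count, stats.failure_count, stats.last_try_result)  # -> 0 0 Unknown
```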

-    def _get_response_text(self, url: str) -> str:
-        if self.session_exist:
-            response = self._session.get(url)
-        else:
-            response = requests.get(url, timeout=self._request_timeout)
-
-        return response.text
-
     def _read_file_content(self, file_path: str) -> str:
         with open(file_path, 'r', encoding="utf-8") as file:
             return file.read()

-    def _try_parse_feed_with_beautiful_soup(self, feed_input: str, is_url: bool) -> "feedparser.FeedParserDict":
-
-        feed_input = self._get_response_text(feed_input) if is_url else self._read_file_content(feed_input)
+    def _try_parse_feed_with_beautiful_soup(self, feed_input: str) -> "feedparser.FeedParserDict":

-        soup = BeautifulSoup(feed_input, 'xml')
+        soup = BeautifulSoup(feed_input, 'lxml')

         # Verify whether the given feed has 'item' or 'entry' tags.
         if soup.find('item'):
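Reviewer note (the rest of this hunk is folded): the helper now receives raw feed content instead of fetching it itself. A hedged sketch of the fallback idea; the item/entry handling below is illustrative, not the controller's exact logic:

```python
import feedparser
from bs4 import BeautifulSoup

def parse_with_soup(feed_text: str) -> feedparser.FeedParserDict:
    soup = BeautifulSoup(feed_text, "lxml")
    # Mirror the check above: RSS feeds use <item>, Atom feeds use <entry>.
    items = soup.find_all("item") or soup.find_all("entry")
    if not items:
        raise ValueError("No 'item' or 'entry' tags found in feed content.")
    # Re-serialize the recognized entries into a minimal envelope feedparser accepts.
    body = "".join(str(item) for item in items)
    return feedparser.parse(f"<rss><channel>{body}</channel></rss>")
```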
@@ -205,32 +197,28 @@ def _try_parse_feed(self, url: str) -> "feedparser.FeedParserDict":

         fallback = False
         cache_hit = False
-        is_url_with_session = is_url and self.session_exist

-        if is_url_with_session:
-            response = self._session.get(url)
-            cache_hit = response.from_cache
+        if is_url:
+            response = self._session.get(url, timeout=self._request_timeout)
             feed_input = response.text
+            if self._enable_cache:
+                cache_hit = response.from_cache
         else:
             feed_input = url

         feed = feedparser.parse(feed_input)

if feed["bozo"]:
cache_hit = False

if is_url_with_session:
fallback = True
logger.info("Failed to parse feed: %s. Trying to parse using feedparser directly.", url)
feed = feedparser.parse(url)

if feed["bozo"]:
try:
logger.info("Failed to parse feed: %s, %s. Try parsing feed manually", url, feed['bozo_exception'])
feed = self._try_parse_feed_with_beautiful_soup(url, is_url)
except Exception:
logger.error("Failed to parse the feed manually: %s", url)
raise
fallback = True
try:
if not is_url:
# Read file content
feed_input = self._read_file_content(feed_input)
# Parse feed content with beautifulsoup
feed = self._try_parse_feed_with_beautiful_soup(feed_input)
except Exception:
logger.error("Failed to parse the feed manually: %s", url)
raise

logger.debug("Parsed feed: %s. Cache hit: %s. Fallback: %s", url, cache_hit, fallback)

@@ -312,17 +300,17 @@ def fetch_dataframes(self):
     @classmethod
     def is_url(cls, feed_input: str) -> bool:
         """
-        Check if the provided input is a valid URL.
+        Check if the provided feed input is a valid URL.

         Parameters
         ----------
         feed_input : str
-            The input string to be checked.
+            The URL string to be checked.

         Returns
         -------
         bool
-            True if the input is a valid URL, False otherwise.
+            True if the feed input is a valid URL, False otherwise.
         """
         try:
             parsed_url = urlparse(feed_input)
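Reviewer note: the body of `is_url` is folded out of the diff; for context, a hedged sketch of the conventional `urlparse` check (the controller's exact return expression may differ):

```python
from urllib.parse import urlparse

def is_url(feed_input: str) -> bool:
    try:
        parsed = urlparse(feed_input)
        # A usable URL needs at least a scheme (http/https) and a host.
        return bool(parsed.scheme and parsed.netloc)
    except ValueError:
        return False
```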
62 changes: 29 additions & 33 deletions tests/controllers/test_rss_controller.py
@@ -1,4 +1,4 @@
-# SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -78,10 +78,12 @@ def test_run_indefinitely_false(feed_input: list[str]):


 @pytest.mark.parametrize("feed_input", test_urls)
-def test_parse_feed_valid_url(feed_input: list[str], mock_feed: feedparser.FeedParserDict):
+def test_parse_feed_valid_url(feed_input: list[str], mock_feed: feedparser.FeedParserDict, mock_get_response: Mock):
     controller = RSSController(feed_input=feed_input)
-    with patch("morpheus.controllers.rss_controller.feedparser.parse") as mock_feedparser_parse:
-        mock_feedparser_parse.return_value = mock_feed
+
+    with patch("morpheus.controllers.rss_controller.feedparser.parse", return_value=mock_feed), \
+            patch("requests.Session.get", return_value=mock_get_response):
         feed = list(controller.parse_feeds())[0]
         assert feed.entries

@@ -112,11 +114,14 @@ def test_is_url_false(feed_input: list[str]):


 @pytest.mark.parametrize("feed_input", [test_urls, test_urls[0]])
-def test_fetch_dataframes_url(feed_input: str | list[str], mock_feed: feedparser.FeedParserDict):
+def test_fetch_dataframes_url(feed_input: str | list[str],
+                              mock_feed: feedparser.FeedParserDict,
+                              mock_get_response: Mock):
     controller = RSSController(feed_input=feed_input)

-    with patch("morpheus.controllers.rss_controller.feedparser.parse") as mock_feedparser_parse:
-        mock_feedparser_parse.return_value = mock_feed
+    with patch("morpheus.controllers.rss_controller.feedparser.parse", return_value=mock_feed), \
+            patch("requests.Session.get", return_value=mock_get_response):
         dataframes_generator = controller.fetch_dataframes()
         dataframe = next(dataframes_generator, None)
         assert isinstance(dataframe, pd.DataFrame)
@@ -142,26 +147,15 @@ def test_batch_size(feed_input: list[str], batch_size: int):
assert len(df) <= batch_size


@pytest.mark.parametrize("feed_input, is_url, enable_cache", [(test_file_paths[0], False, False),
(test_urls[0], True, True), (test_urls[0], True, False)])
def test_try_parse_feed_with_beautiful_soup(feed_input: str, is_url: bool, enable_cache: bool, mock_get_response: Mock):
@pytest.mark.parametrize("feed_input, enable_cache", [(test_file_paths[0], False), (test_urls[0], True),
(test_urls[0], False)])
def test_try_parse_feed_with_beautiful_soup(feed_input: str, enable_cache: bool, mock_get_response: Mock):
controller = RSSController(feed_input=feed_input, enable_cache=enable_cache)

if is_url:
if enable_cache:
with patch("morpheus.controllers.rss_controller.requests_cache.CachedSession.get") as mock_get:
mock_get.return_value = mock_get_response
feed_data = controller._try_parse_feed_with_beautiful_soup(feed_input, is_url)
else:
with patch("morpheus.controllers.rss_controller.requests.get") as mock_get:
mock_get.return_value = mock_get_response
feed_data = controller._try_parse_feed_with_beautiful_soup(feed_input, is_url)

else:
feed_data = controller._try_parse_feed_with_beautiful_soup(feed_input, is_url)
# When enable_cache is set to 'True', the feed content is provided as input.
feed_data = controller._try_parse_feed_with_beautiful_soup(mock_get_response.text)

assert isinstance(feed_data, feedparser.FeedParserDict)

assert len(feed_data.entries) > 0

for entry in feed_data.entries:
@@ -180,16 +174,6 @@ def test_try_parse_feed_with_beautiful_soup(feed_input: str, is_url: bool, enable_cache: bool, mock_get_response: Mock):
     assert isinstance(feed_data["entries"], list)


@pytest.mark.parametrize("enable_cache", [True, False])
def test_enable_disable_cache(enable_cache):
controller = RSSController(feed_input=test_urls, enable_cache=enable_cache)

if enable_cache:
assert controller.session_exist
else:
assert not controller.session_exist


 def test_parse_feeds(mock_feed: feedparser.FeedParserDict):
     feed_input = test_urls[0]
     cooldown_interval = 620
@@ -239,3 +223,15 @@ def test_parse_feeds(mock_feed: feedparser.FeedParserDict):

     with pytest.raises(ValueError):
         controller.get_feed_stats("http://testfeed.com")


+@pytest.mark.parametrize("feed_input", [test_urls[0]])
+def test_redundant_fetch(feed_input: str, mock_feed: feedparser.FeedParserDict, mock_get_response: Mock):
+
+    controller = RSSController(feed_input=feed_input)
+
+    with patch("morpheus.controllers.rss_controller.feedparser.parse", return_value=mock_feed), \
+            patch("requests.Session.get", return_value=mock_get_response) as mocked_session_get:
+        dataframes_generator = controller.fetch_dataframes()
+        next(dataframes_generator, None)
+        assert mocked_session_get.call_count == 1
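Reviewer note: this new test pins the PR's contract, exactly one `Session.get` call per fetch cycle. For readers without the test module handy, a hypothetical shape for the `mock_get_response` fixture the tests rely on (the real fixture lives in this test file; names and values here are illustrative):

```python
from unittest.mock import Mock

import pytest

@pytest.fixture(name="mock_get_response")
def mock_get_response_fixture() -> Mock:
    response = Mock()
    # Minimal RSS payload so the BeautifulSoup and feedparser paths have something to chew on.
    response.text = "<rss><channel><item><description>t</description></item></channel></rss>"
    response.from_cache = False  # mirrors the attribute requests-cache sets on responses
    return response
```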