Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add file handler to capture langchain agent logging #1515

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion examples/llm/agents/simple_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ def _build_agent_executor(model_name: str) -> AgentExecutor:

tools = load_tools(["serpapi", "llm-math"], llm=llm)

agent_executor = initialize_agent(tools, llm, agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION, verbose=True)
agent_executor = initialize_agent(tools,
llm,
agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
verbose=logger.isEnabledFor(logging.DEBUG))

return agent_executor

Expand Down
21 changes: 19 additions & 2 deletions morpheus/llm/nodes/langchain_agent_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

from morpheus.llm import LLMContext
from morpheus.llm import LLMNodeBase
from morpheus.utils.logger import get_llm_agent_logger

logger = logging.getLogger(__name__)
agent_logger = get_llm_agent_logger()

if typing.TYPE_CHECKING:
from langchain.agents import AgentExecutor
Expand Down Expand Up @@ -66,9 +68,24 @@ async def _run_single(self, **kwargs: dict[str, typing.Any]) -> dict[str, typing
return results

# We are not dealing with a list, so run single
return await self._agent_executor.arun(**kwargs)
output = []

async def execute(self, context: LLMContext) -> LLMContext:
# langchain 0.19 doesn't have the astream method, newer versions do
astream = getattr(self._agent_executor, "astream", None)
if astream is None:
return await self._agent_executor.arun(**kwargs)

async for chunk in astream(**kwargs):
try:
output.append(chunk["output"])
except KeyError:
pass

agent_logger.debug(" ".join(f"{k}: {v}" for (k, v) in chunk.items()))

return output

async def execute(self, context: LLMContext) -> LLMContext: # pylint: disable=invalid-overridden-method

input_dict = context.get_inputs()

Expand Down
42 changes: 38 additions & 4 deletions morpheus/utils/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from tqdm import tqdm

LogLevels = Enum('LogLevels', logging._nameToLevel)
LLM_AGENT_LOG_NAME = "morpheus:llmagent"


class TqdmLoggingHandler(logging.Handler):
Expand Down Expand Up @@ -125,16 +126,23 @@ def _configure_from_log_level(log_level: int):
# handler
morpheus_logger.addHandler(morpheus_queue_handler)

llm_agent_logger = get_llm_agent_logger()

# This logger will always log at the debug level, don't propagate upstream
llm_agent_logger.propagate = False
llm_agent_logger.setLevel(logging.DEBUG)

log_file = os.path.join(appdirs.user_log_dir(appauthor="NVIDIA", appname="morpheus"), "morpheus.log")
llm_agent_log_file = os.path.join(appdirs.user_log_dir(appauthor="NVIDIA", appname="morpheus"), "agent.log")

# Ensure the log directory exists
os.makedirs(os.path.dirname(log_file), exist_ok=True)

# Now we build all of the handlers for the queue listener
file_handler = logging.handlers.RotatingFileHandler(filename=log_file, backupCount=5, maxBytes=1000000)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(
logging.Formatter('%(asctime)s - [%(levelname)s]: %(message)s {%(name)s, %(threadName)s}'))
file_handler = _get_file_handler(log_file)
llm_agent_file_handler = _get_file_handler(llm_agent_log_file)
llm_agent_file_handler.addFilter(logging.Filter(name=LLM_AGENT_LOG_NAME))
llm_agent_logger.addHandler(llm_agent_file_handler)
Comment on lines +143 to +145
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fyi - This PR updates logger so that you can pass in additional handlers.


# Tqdm stream handler (avoids messing with progress bars)
console_handler = TqdmLoggingHandler()
Expand All @@ -143,6 +151,7 @@ def _configure_from_log_level(log_level: int):
queue_listener = logging.handlers.QueueListener(morpheus_logging_queue,
console_handler,
file_handler,
llm_agent_file_handler,
respect_handler_level=True)
queue_listener.start()
queue_listener._thread.name = "Logging Thread"
Expand Down Expand Up @@ -225,3 +234,28 @@ def deprecated_message_warning(logger, cls, new_cls):
("The '%s' message has been deprecated and will be removed in a future version. Please use '%s' instead."),
cls.__name__,
new_cls.__name__)


def get_llm_agent_logger() -> logging.Logger:
    """
    Return the dedicated logger for LangChain agent output.

    Agent logs are routed through this named logger (separate from the main
    Morpheus logger) so they can be captured by their own file handler and
    kept out of the rest of the Morpheus log stream.

    Returns
    -------
    logging.Logger
        The logger registered under ``LLM_AGENT_LOG_NAME``.
    """
    return logging.getLogger(LLM_AGENT_LOG_NAME)


def _get_file_handler(
        log_file: str,
        backup_count: int = 5,
        max_bytes: int = 1000000,
        level: int = logging.DEBUG,
        fmt: str = '%(asctime)s - [%(levelname)s]: %(message)s {%(name)s, %(threadName)s}') -> logging.Handler:
    """
    Build a size-rotating file handler with a level and formatter applied.

    Parameters
    ----------
    log_file : str
        Path of the log file to write to. The parent directory must already exist.
    backup_count : int, optional
        Number of rotated backup files to keep, by default 5.
    max_bytes : int, optional
        Maximum file size in bytes before rollover occurs, by default 1000000.
    level : int, optional
        Minimum record level the handler emits, by default ``logging.DEBUG``.
    fmt : str, optional
        Format string applied to each emitted record.

    Returns
    -------
    logging.Handler
        A configured ``RotatingFileHandler``.
    """
    handler = logging.handlers.RotatingFileHandler(filename=log_file, backupCount=backup_count, maxBytes=max_bytes)
    handler.setLevel(level)
    handler.setFormatter(logging.Formatter(fmt))
    return handler
29 changes: 29 additions & 0 deletions tests/_utils/llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import asyncio
import typing
from unittest import mock

from morpheus.llm import InputMap
from morpheus.llm import LLMContext
Expand Down Expand Up @@ -73,3 +74,31 @@ def execute_task_handler(task_handler: LLMTaskHandler,
message = asyncio.run(task_handler.try_handle(context))

return message


def mock_langchain_agent_executor(return_values: typing.Optional[list] = None) -> tuple[mock.MagicMock, bool]:
    """
    Create a mocked LangChain ``AgentExecutor``.

    Parameters
    ----------
    return_values : list, optional
        Values the mocked agent produces. ``arun`` returns the list as-is, while
        ``astream`` (when present) yields one ``{"output": value}`` chunk per entry.
        Defaults to an empty list.

    Returns
    -------
    tuple[mock.MagicMock, bool]
        The mocked executor and a flag indicating whether the installed langchain
        version provides the ``astream`` method.
    """
    from langchain.agents import AgentExecutor

    # Create a mock AgentExecutor with the original class as a spec, which ensures that the mocked object will not
    # return any false positives from calling hasattr & getattr.
    mock_agent_ex = mock.MagicMock(AgentExecutor)
    mock_agent_ex.return_value = mock_agent_ex
    mock_agent_ex.input_keys = ["prompt"]
    mock_agent_ex.arun = mock.AsyncMock()

    if return_values is None:
        return_values = []

    mock_agent_ex.arun.return_value = return_values

    # Because the mock is spec'd on the real class, this reflects the installed
    # langchain version rather than always reporting True.
    has_astream = hasattr(mock_agent_ex, "astream")

    if has_astream:  # astream is a newer method in langchain

        async def async_iter(*args, **kwargs):  # pylint: disable=unused-argument
            for i in return_values:
                yield {"output": i}

        mock_agent_ex.astream.side_effect = async_iter

    return (mock_agent_ex, has_astream)
16 changes: 14 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -1033,7 +1033,7 @@ def nemollm_fixture(fail_missing: bool):
"""
skip_reason = ("Tests for the NeMoLLMService require the nemollm package to be installed, to install this run:\n"
"`conda env update --solver=libmamba -n morpheus "
"--file morpheus/conda/environments/dev_cuda-121_arch-x86_64.yaml --prune`")
"--file conda/environments/all_cuda-121_arch-x86_64.yaml --prune`")
yield import_or_skip("nemollm", reason=skip_reason, fail_missing=fail_missing)


Expand All @@ -1044,10 +1044,22 @@ def openai_fixture(fail_missing: bool):
"""
skip_reason = ("Tests for the OpenAIChatService require the openai package to be installed, to install this run:\n"
"`conda env update --solver=libmamba -n morpheus "
"--file morpheus/conda/environments/dev_cuda-121_arch-x86_64.yaml --prune`")
"--file conda/environments/all_cuda-121_arch-x86_64.yaml --prune`")
yield import_or_skip("openai", reason=skip_reason, fail_missing=fail_missing)


@pytest.fixture(name="langchain", scope='session')
def langchain_fixture(fail_missing: bool):
    """Session-scoped fixture that skips (or fails, per ``fail_missing``) when langchain is unavailable."""
    reason = ("Tests for langchain nodes require the langchain package to be installed, to install this run:\n"
              "`conda env update --solver=libmamba -n morpheus "
              "--file conda/environments/all_cuda-121_arch-x86_64.yaml --prune`")
    yield import_or_skip("langchain", reason=reason, fail_missing=fail_missing)


@pytest.mark.usefixtures("openai")
@pytest.fixture(name="mock_chat_completion")
def mock_chat_completion_fixture():
Expand Down
9 changes: 2 additions & 7 deletions tests/examples/llm/common/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import pytest

from _utils import TEST_DIRS
from _utils import import_or_skip


@pytest.fixture(scope="function")
Expand Down Expand Up @@ -53,12 +52,8 @@ def import_content_extractor_module(restore_sys_path): # pylint: disable=unused


@pytest.fixture(name="langchain", autouse=True, scope='session')
def langchain_fixture(fail_missing: bool):
def langchain_fixture(langchain):
"""
All the tests in this subdir require langchain
"""

skip_reason = ("Tests for the WebScraperStage require the langchain package to be installed, to install this run:\n"
"`conda env update --solver=libmamba -n morpheus "
"--file morpheus/conda/environments/dev_cuda-121_arch-x86_64.yaml --prune`")
yield import_or_skip("langchain", reason=skip_reason, fail_missing=fail_missing)
yield langchain
10 changes: 4 additions & 6 deletions tests/llm/nodes/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ def mock_llm_client_fixture():


@pytest.fixture(name="mock_agent_executor")
def mock_agent_executor_fixture():
mock_agent_ex = mock.MagicMock()
mock_agent_ex.return_value = mock_agent_ex
mock_agent_ex.input_keys = ["prompt"]
mock_agent_ex.arun = mock.AsyncMock()
return mock_agent_ex
def mock_agent_executor_fixture(langchain):  # pylint: disable=unused-argument
    from _utils.llm import mock_langchain_agent_executor

    # Only the mocked executor is needed here; discard the has_astream flag.
    (executor, _has_astream) = mock_langchain_agent_executor()
    return executor
17 changes: 15 additions & 2 deletions tests/llm/nodes/test_langchain_agent_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,19 @@
import pytest

from _utils.llm import execute_node
from _utils.llm import mock_langchain_agent_executor
from morpheus.llm import LLMNodeBase
from morpheus.llm.nodes.langchain_agent_node import LangChainAgentNode


@pytest.fixture(name="langchain", autouse=True, scope='session')
def langchain_fixture(langchain):
    """Re-export the session-level langchain fixture so it applies to every test in this module."""
    yield langchain


def test_constructor(mock_agent_executor: mock.MagicMock):
    # Constructing with a mocked executor should produce a valid LLMNodeBase instance.
    assert isinstance(LangChainAgentNode(agent_executor=mock_agent_executor), LLMNodeBase)
Expand Down Expand Up @@ -50,8 +59,12 @@ def test_execute(
expected_output: list,
expected_calls: list[mock.call],
):
mock_agent_executor.arun.return_value = arun_return
(mock_agent_executor, has_astream) = mock_langchain_agent_executor(arun_return)

node = LangChainAgentNode(agent_executor=mock_agent_executor)
assert execute_node(node, **values) == expected_output
mock_agent_executor.arun.assert_has_calls(expected_calls)

if has_astream:
mock_agent_executor.astream.assert_has_calls(expected_calls)
else:
mock_agent_executor.arun.assert_has_calls(expected_calls)
25 changes: 20 additions & 5 deletions tests/llm/nodes/test_langchain_agent_node_pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@

from unittest import mock

import pytest

from _utils import assert_results
from _utils.dataset_manager import DatasetManager
from _utils.llm import mock_langchain_agent_executor
from morpheus.config import Config
from morpheus.llm import LLMEngine
from morpheus.llm.nodes.extracter_node import ExtracterNode
Expand All @@ -30,20 +33,28 @@
from morpheus.stages.preprocess.deserialize_stage import DeserializeStage


def _build_engine(mock_agent_executor: mock.MagicMock) -> LLMEngine:
@pytest.fixture(name="langchain", autouse=True, scope='session')
def langchain_fixture(langchain):
    """Re-export the session-level langchain fixture so it applies to every test in this module."""
    yield langchain


def _build_engine(agent_executor: mock.MagicMock) -> LLMEngine:
    """Build an LLMEngine that routes extracted prompts through the (mocked) LangChain agent."""
    engine = LLMEngine()
    engine.add_node("extracter", node=ExtracterNode())
    # Exactly one "chain" node wraps the agent executor (a stale duplicate line was removed).
    engine.add_node("chain", inputs=["/extracter"], node=LangChainAgentNode(agent_executor=agent_executor))
    engine.add_task_handler(inputs=["/chain"], handler=SimpleTaskHandler())

    return engine


def test_pipeline(config: Config, dataset_cudf: DatasetManager, mock_agent_executor: mock.MagicMock):
def test_pipeline(config: Config, dataset_cudf: DatasetManager):
(mock_agent_executor, has_astream) = mock_langchain_agent_executor(['frogs'])
input_df = dataset_cudf["filter_probs.csv"]
expected_df = input_df.copy(deep=True)

mock_agent_executor.arun.return_value = 'frogs'
expected_df['response'] = 'frogs'
expected_calls = [mock.call(prompt=x) for x in expected_df['v3'].values_host]

Expand All @@ -59,4 +70,8 @@ def test_pipeline(config: Config, dataset_cudf: DatasetManager, mock_agent_execu
pipe.run()

assert_results(sink.get_results())
mock_agent_executor.arun.assert_has_calls(expected_calls)

if has_astream:
mock_agent_executor.astream.assert_has_calls(expected_calls)
else:
mock_agent_executor.arun.assert_has_calls(expected_calls)
Loading