The minimal, composable library for building LLM-powered agents and pipes.
agentpipe provides a simple, intuitive, and "Pythonic" API for chaining together LLM calls, Python functions, and other operations. The design is heavily inspired by the elegance of zenllm and prioritizes a declarative approach that makes your pipes easy to read and reason about.
- **Everything is a `Runnable`**: A `Runnable` is any operation in a pipe: an LLM call, a tool, or a chain of other `Runnable`s.
- **Composition with `|`**: The pipe operator `|` is used to chain `Runnable`s together, creating a new, more powerful `Runnable`. This represents a clear data flow.
- **Execution with `()`**: A `Runnable` pipe is executed by calling it like a function: `pipe(...)`.
- **Implicit State**: Intermediate results can be saved to a state dictionary using `.as_("name")`, making them automatically available to later steps in the chain.
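The composition model is easy to picture in plain Python. The sketch below illustrates the `Runnable`/`|` pattern only; it is not agentpipe's actual implementation (the class is named `Chainable` here to avoid confusion with the real `Runnable`).

```python
# Minimal sketch of the Runnable-style composition pattern
# (an illustration only, not agentpipe's implementation).
class Chainable:
    def __init__(self, fn):
        self.fn = fn

    def __call__(self, x):
        return self.fn(x)

    def __or__(self, other):
        # a | b runs a first, then feeds its output to b.
        return Chainable(lambda x: other(self(x)))

double = Chainable(lambda x: x * 2)
increment = Chainable(lambda x: x + 1)
pipe = double | increment
pipe(3)  # → 7 (double first, then increment)
```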
- **Composable Pipes**: Chain operations with the `|` operator.
- **Parallel Execution**: Run multiple `Runnable`s concurrently with `parallel()`.
- **Tool Integration**: Seamlessly integrate Python functions as tools with the `@tool` decorator.
- **Dynamic Routing**: Create conditional workflows with `route()` to execute different paths based on input.
- **Implicit State Management**: Effortlessly pass data between non-adjacent steps.
- **Minimalist API**: The entire library surface is small, expressive, and easy to learn.
```shell
pip install agentpipe
```

Note: agentpipe uses zenllm as its default backend for making LLM calls. It is installed automatically as a dependency, but you will still need to configure it with your desired LLM provider.
Let's create a pipe that summarizes a piece of text and then translates that summary into French.
Prompts in agentpipe use {{variable}} style placeholders. This makes it easy to include literal curly braces {} in your prompts (e.g., for JSON examples) without needing to escape them.
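To make the substitution rule concrete, here is a minimal stand-alone sketch of double-brace templating. This is an illustration of the idea, not agentpipe's actual renderer:

```python
import re

# Illustration only: substitute {{name}} placeholders, leaving single
# braces (e.g. literal JSON examples) untouched.
def render(template, values):
    return re.sub(
        r"\{\{\s*(\w+)\s*\}\}",
        lambda m: str(values[m.group(1)]),
        template,
    )

render('Return JSON like {"city": ...} for: {{input}}', {"input": "Rome"})
# → 'Return JSON like {"city": ...} for: Rome'
```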
```python
from agentpipe import instruct

# 1. Define Runnables.
# An 'instruct' Runnable represents a call to an LLM.
summarizer = instruct("Summarize this text: {{input}}")
translator = instruct("Translate this summary into French: {{input}}")

# 2. Compose them into a pipe with the pipe operator.
# The output of `summarizer` is implicitly passed as the {{input}} to `translator`.
pipe = summarizer | translator

# 3. Execute the pipe by calling it like a function.
text = (
    "The sky is blue because of the way the atmosphere scatters light "
    "from the sun. This is known as Rayleigh scattering."
)
french_summary = pipe(text)
print(french_summary)
# Expected output: La diffusion de Rayleigh rend le ciel bleu.
```

Sometimes a later step in a pipe needs data from a much earlier step. The `.as_()` method saves a result to a named state variable, making it available to all subsequent steps.
`passthrough()` is a special `Runnable` that passes its input through unchanged. It's very useful for saving the initial input at the start of a pipe.
```python
from agentpipe import instruct, passthrough

# 1. Define the pipe.
email_pipe = (
    passthrough().as_("original_request")  # Save the original input to state
    | instruct("Summarize this user request in one sentence: {{input}}").as_("summary")
    | instruct(
        "Draft a polite, professional email based on the following summary. "
        "Start by referencing the original request briefly.\n\n"
        "Original Request: {{original_request}}\n"
        "Summary: {{summary}}"
    )
)

# 2. Execute with a dictionary input. 'input' is the initial pipe value.
request = {
    "input": "Hey can you check the Q3 sales figures and see if they're higher "
             "than Q2? Also need the forecast for Q4. Thx."
}
email_draft = email_pipe(request)
print(email_draft)
```

Run multiple `Runnable`s at the same time on the same input. The results are collected into a dictionary.
```python
import json

from agentpipe import instruct, parallel

# Define a parallel analysis pipe.
analyzer = parallel(
    sentiment=instruct("What is the sentiment of this text: {{input}}"),
    entities=instruct("Extract the key entities from this text: {{input}}"),
)

# Execute the pipe.
analysis_result = analyzer("The sky is blue due to Rayleigh scattering.")
print(json.dumps(analysis_result, indent=2))
# Expected output:
# {
#   "sentiment": "Positive",
#   "entities": "Rayleigh scattering, sky"
# }
```

LLMs can be instructed to return structured data, but their output isn't always perfectly clean. `from_xml()` is a robust parser that uses regular expressions to extract text from an XML-like tag, ignoring surrounding text or formatting issues (like Markdown).
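The extraction idea can be sketched in a few lines. This is an assumption about the approach, not `from_xml`'s actual code:

```python
import re

# Hedged sketch of tolerant tag extraction: grab the first
# <tag>...</tag> span and ignore everything around it.
def extract_tag(text, tag):
    match = re.search(rf"<{tag}>(.*?)</{tag}>", text, re.DOTALL)
    return match.group(1).strip() if match else None

extract_tag("Of course! <answer>Rome</answer>.", "answer")  # → 'Rome'
```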
```python
from agentpipe import instruct, from_xml

# Define a pipe that asks a question and then extracts the answer
# from a specific tag.
qa_pipe = (
    instruct(
        "Answer the user's question. "
        "IMPORTANT: Please wrap your final, concise answer in <answer> tags. "
        "For example: <answer>The answer</answer>. The user's question: {{input}}"
    )
    | from_xml("answer")
)

# Execute the pipe.
# The LLM might return: "Of course! The answer is <answer>Rome</answer>."
# The `from_xml` runnable will cleanly extract just "Rome".
capital = qa_pipe("What is the capital of Italy?")
print(f"The capital is: {capital}")
# Expected output: The capital is: Rome
```

You can make any Python function available to your LLM by decorating it with `@tool`. agentpipe automatically generates a JSON schema that allows the LLM to understand how and when to use your function.
When an LLM decides to call a tool, the pipe's output will be a dictionary describing the call, like `{'name': 'get_current_weather', 'arguments': {'location': 'Boston, MA'}}`.

A true agent needs to handle this output, execute the corresponding Python function, and feed the result back to the LLM. This is exactly what the `loop()` runnable is designed for.
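To make that handling concrete, here is a hypothetical dispatcher showing how such a dictionary maps onto a plain Python call (the `dispatch` helper is an illustration, not part of agentpipe's API):

```python
# Hypothetical helper: look up the named function and apply the
# LLM-provided arguments as keyword arguments.
def dispatch(tool_call, tools):
    fn = tools[tool_call["name"]]
    return fn(**tool_call["arguments"])

def get_current_weather(location):
    return f"Sunny in {location}"

call = {"name": "get_current_weather", "arguments": {"location": "Boston, MA"}}
dispatch(call, {"get_current_weather": get_current_weather})
# → 'Sunny in Boston, MA'
```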
The true power of agentpipe is unlocked when you combine tool use with a loop to create autonomous agents. The library provides a generic `loop()` runnable that can execute a sub-pipe repeatedly until a condition is met.

This allows you to build agents that can reason, use tools, and self-correct in a loop until they have accomplished their goal.
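The control flow here is the classic loop-until-done combinator. A plain-Python sketch of the assumed semantics (not agentpipe's `loop()` implementation):

```python
# Sketch: repeat `body` while `condition` holds, with a hard cap
# on iterations as a safety net.
def loop(condition, body, max_iterations=5):
    def run(value):
        for _ in range(max_iterations):
            if not condition(value):
                break
            value = body(value)
        return value
    return run

countdown = loop(condition=lambda n: n > 0, body=lambda n: n - 1)
countdown(3)  # → 0
```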
Let's build a simple research agent that can search the web and read pages to answer a question.
```python
import json

from agentpipe import instruct, is_tool_call, loop, passthrough, route, tool

# 1. Define the agent's tools.
@tool
def search_web(query: str) -> str:
    """Searches the web for a query and returns results."""
    print(f"--- TOOL: Searching web for '{query}' ---")
    # In a real app, this would call a search API. We'll return mock data.
    return json.dumps([
        {"url": "https://example.com/alphafold_report", "title": "AlphaFold 2023 Report"},
        {"url": "https://example.com/news", "title": "Latest AI News"},
    ])

@tool
def read_page(url: str) -> str:
    """Reads the content of a URL and returns the text."""
    print(f"--- TOOL: Reading page '{url}' ---")
    if "alphafold" in url:
        return "The 2023 AlphaFold report detailed new models for protein-ligand interactions."
    return "This page could not be read."

# 2. is_tool_call() is a built-in condition that checks if the LLM's
#    last response was a request to call a tool.

# 3. Define the "body" of the loop. This pipe is executed on each turn
#    where the agent decides to use a tool.
agent_turn = (
    passthrough().as_("tool_call")  # Save the tool call request
    # Route to the correct tool based on its 'name'.
    | route(
        classifier=lambda call: call["name"],  # Simple lambda classifier
        paths={"search_web": search_web, "read_page": read_page},
    ).as_("tool_output")
    # After running the tool, feed the result back to the LLM and ask for the next step.
    | instruct(
        "You are a research assistant. You just ran the tool '{{tool_call.name}}' "
        "and got this result: {{tool_output}}\n\n"
        "What is your next step? Either call another tool, or if you have enough "
        "information, provide the final answer to the original question: {{original_question}}"
    )
)

# 4. Define the complete agent pipe.
research_agent = (
    passthrough().as_("original_question")
    # Start with an initial prompt to the LLM.
    | instruct(
        "Answer the following question. You can use tools to search the web and read pages. "
        "Question: {{input}}",
        tools=[search_web, read_page],
    )
    # Loop as long as the LLM wants to call tools.
    | loop(
        condition=is_tool_call(),
        body=agent_turn,
        max_iterations=5,  # Safety first!
    )
)

# 5. Execute the agent.
question = "What was the 2023 AlphaFold report about?"
final_answer = research_agent(question)
print(f"\n✅ Final Answer:\n{final_answer}")
```

agentpipe's built-in `Runnable`s like `instruct`, `passthrough`, and `route` are powerful, but the true strength of the library lies in its extensibility. You can create your own `Runnable` to perform any custom logic, such as complex state manipulation.
Let's extend our research agent to have a memory. We want it to remember every tool it has used and the results it got. This will help it avoid repeating work and allow it to synthesize information from multiple sources.
To do this, we'll create a custom `Runnable` that updates a `research_log` in the state after each tool call. Any class that inherits from `Runnable` and implements the `_invoke` method can be used in a pipe.
```python
# In your application, you would add these imports at the top of your file.
from agentpipe.context import PipeContext
from agentpipe.runnable import Runnable

# 1. Define a custom Runnable to manage state.
class UpdateResearchLog(Runnable):
    """
    A custom Runnable that appends the last tool call and its result
    to a 'research_log' list in the state.
    """

    def __repr__(self):
        return "update_research_log()"

    def _invoke(self, context: PipeContext) -> PipeContext:
        # Get the previous log, defaulting to an empty list.
        research_log = context.state.get("research_log", [])

        # Get info about the turn from the state.
        tool_call = context.state.get("tool_call", {})
        tool_output = str(context.state.get("tool_output", ""))

        # Format the new entry for the list.
        log_entry = {
            "tool_name": tool_call.get("name"),
            "arguments": tool_call.get("arguments"),
            "output": tool_output,
        }

        # Use an immutable pattern: create a new list instead of mutating in place.
        new_log = research_log + [log_entry]
        context.state["research_log"] = new_log

        return context
```

Now we can integrate this `UpdateResearchLog` into our agent. Thanks to the Jinja2-powered templating, we can loop over the `research_log` list directly in our prompt.
```python
# 2. Update the agent to use the new Runnable and memory.

# Create an instance of our custom runnable.
update_log = UpdateResearchLog()

# The new prompt uses a Jinja2 for-loop to display the research log.
memory_prompt = instruct(
    """You are a research assistant. Here is your research log so far:
{% for entry in research_log %}
Step {{ loop.index }}: Called tool `{{ entry.tool_name }}` with arguments `{{ entry.arguments }}`.
Result: {{ entry.output }}
{% endfor %}
You just ran the tool '{{tool_call.name}}' and got this result: {{tool_output}}

Based on your log and this new result, what is your next step? Either call another tool, or if you have enough information, provide the final answer to the original question: {{original_question}}"""
)

# We modify the original `agent_turn` to add our custom logging step.
agent_turn_with_memory = (
    passthrough().as_("tool_call")
    | route(
        classifier=lambda call: call["name"],
        paths={"search_web": search_web, "read_page": read_page},
    ).as_("tool_output")
    | update_log     # <-- Our custom runnable is added here!
    | memory_prompt  # <-- The prompt that now uses the memory
)

# 3. Define the complete agent pipe using the new loop body.
research_agent_with_memory = (
    passthrough().as_("original_question")
    | instruct(
        "Answer the following question. You can use tools to search the web and read pages. "
        "Question: {{input}}",
        tools=[search_web, read_page],
    )
    | loop(
        condition=is_tool_call(),
        body=agent_turn_with_memory,  # <-- Use the new turn definition
        max_iterations=5,
    )
)

# 4. Execute the agent, providing an initial empty log.
question = "What was the 2023 AlphaFold report about?"

# The initial state must include the `input` and an empty `research_log`.
initial_state = {
    "input": question,
    "research_log": [],
}
final_answer = research_agent_with_memory(initial_state)
print(f"\n✅ Final Answer:\n{final_answer}")
```

Create agent-like behavior by routing input to different pipes based on an initial classification.
```python
from agentpipe import instruct, route

# 1. Define the possible paths the pipe can take.
paths = {
    "translation": instruct("Translate to French: {{input}}"),
    "summary": instruct("Summarize this text: {{input}}"),
    "other": instruct("I'm not sure how to help. Rephrase this: {{input}}"),
}

# 2. Define a classifier to choose the path.
classifier = instruct(
    "Does the user want a 'translation' or a 'summary'? "
    "Classify the following request, responding with only one word: {{input}}"
)

# 3. Create the router.
router = route(classifier, paths)

# 4. Execute with an input that should trigger the translation path.
result = router("Your wish is my command.")
print(result)
# Expected output: Votre souhait est mon ordre.
```

Every pipe execution returns a special `PipeResult` object. This object behaves just like the final result (e.g., you can `print()` it directly), but it also contains the full history of the run, which is invaluable for debugging.
```python
from agentpipe import instruct, from_xml

# Define a simple pipe.
pipe = (
    instruct("Translate '{{input}}' to Spanish. Answer in an <answer> tag.")
    | from_xml("answer")
)

# Execute the pipe.
result = pipe("hello")

# The result object prints like a string.
print(f"Final answer: {result}")

# But you can also inspect the history.
result.print_history()

# Expected output:
#
# Final answer: hola
#
# --- Pipe Execution History ---
#
# [Step 1: instruct('Translate \'{{input}}\' to Spanish. Answer in an <answer> tag.')]
# Input: 'hello'
# Output: '<answer>hola</answer>'
#
# [Step 2: from_xml('answer')]
# Input: '<answer>hola</answer>'
# Output: 'hola'
#
# --- End of History ---
```

Important: while the `PipeResult` object prints like the final output, it is not the output itself. To use the result in your application logic (e.g., passing it to another function or saving it to a database), you should always access the data directly via the `.final_output` attribute:
```python
# Correct way to access the final data for further processing.
final_data = result.final_output
if isinstance(final_data, str):
    print(final_data.upper())
```

agentpipe uses Python's standard `logging` module to provide detailed, real-time insights into pipe execution. By default, these logs are silent. To enable them, simply configure the root logger in your application. This is especially useful for debugging complex chains or seeing the exact prompts sent to the LLM.
```python
import logging

from agentpipe import instruct, passthrough

# The application developer configures logging for their entire app.
# They can choose the level, format, and where to send the logs.
logging.basicConfig(
    level=logging.DEBUG,  # Set to DEBUG to see all agentpipe step-by-step logs
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)

# Define a pipe.
pipe = (
    passthrough().as_("original")
    | instruct("Summarize this: {{input}}").as_("summary")
)

# Run it.
result = pipe("The quick brown fox jumps over the lazy dog.")

# With logging configured, your console output would look similar to this:
#
# YYYY-MM-DD HH:MM:SS,123 - agentpipe.runnable - INFO - Starting pipe execution with: passthrough().as_('original') | instruct('Summarize this: {{input}}').as_('summary')
# YYYY-MM-DD HH:MM:SS,123 - agentpipe.runnable - DEBUG - Executing step in Pipe: passthrough().as_('original')
# YYYY-MM-DD HH:MM:SS,124 - agentpipe.runnable - DEBUG - Saving output to state key 'original'.
# YYYY-MM-DD HH:MM:SS,124 - agentpipe.runnable - DEBUG - Executing step in Pipe: instruct('Summarize this: {{input}}').as_('summary')
# YYYY-MM-DD HH:MM:SS,125 - agentpipe.tasks - DEBUG - Formatted prompt sent to LLM: Summarize this: The quick brown fox jumps over the lazy dog.
# YYYY-MM-DD HH:MM:SS,500 - agentpipe.tasks - INFO - Received response from LLM.
# YYYY-MM-DD HH:MM:SS,501 - agentpipe.runnable - DEBUG - Saving output to state key 'summary'.
# YYYY-MM-DD HH:MM:SS,502 - agentpipe.runnable - INFO - Pipe execution finished.
```

Contributions are welcome! If you'd like to contribute, please feel free to fork the repository, make your changes, and submit a pull request.
This project is licensed under the MIT License. See the LICENSE file for details.
Authored by Koen van Eijk ([email protected]).