What You'll Learn Today
LangGraph Tutorial: Result Aggregation Patterns - Unit 2.3 Exercise 5
This tutorial is also available in Google Colab here or for download here
Joint Initiative: This tutorial is part of a collaboration between AI Product Engineer and the Nebius Academy.
This tutorial demonstrates how to implement efficient result aggregation in LangGraph using fan-in patterns, binary operators, and appropriate message types. You'll learn how to combine results from multiple tools and create meaningful summaries.
Key Concepts Covered
- Binary Operator Aggregation
- Message Type Management
- State Aggregation Patterns
- Fan-in Implementation
- Result Summarization
!pip install langchain-core
!pip install langgraph
!pip install nest_asyncio

from typing import Annotated, Any, TypedDict

from langchain_core.messages import AIMessage, BaseMessage, ToolMessage
from langgraph.channels.binop import BinaryOperatorAggregate
from langgraph.graph.message import add_messages
Step 1: Dictionary Reducer Implementation
Implement the core reducer function for combining dictionaries.
Why This Matters
Dictionary reduction is fundamental because:
- Enables scalable result combination
- Maintains data consistency
- Handles partial results
- Supports incremental updates
Debug Tips
- Dictionary Merging:
  - Check for None handling
  - Verify key conflicts
  - Monitor memory usage
- Common Issues:
  - KeyError: Missing dictionary keys
  - TypeError: Invalid dictionary types
  - Memory: Large dictionary merges
def dict_reducer(a: dict, b: dict | None) -> dict:
    """Combine two dictionaries for result aggregation.

    Args:
        a: First dictionary
        b: Second dictionary (may be None)

    Returns:
        Combined dictionary with values from both inputs

    Examples:
        >>> dict_reducer({'a': 1}, {'b': 2})
        {'a': 1, 'b': 2}
        >>> dict_reducer({'a': 1}, None)
        {'a': 1}
    """
    if b is None:
        return a
    return {**a, **b}
Step 2: State Definition with Aggregation
Define the state structure supporting result aggregation.
Why This Matters
Aggregation-aware state is crucial because:
- Enables parallel processing
- Supports incremental updates
- Maintains consistency
- Facilitates error handling
Debug Tips
- State Structure:
  - Verify annotations
  - Check operator configuration
  - Monitor state size
- Common Problems:
  - TypeError: Invalid annotations
  - ValueError: Incorrect operator setup
  - AttributeError: Missing state fields
class State(TypedDict):
    """State with aggregation support.

    Attributes:
        messages: Conversation history
        pending_tools: List of tools to execute
        results: Tool execution results with aggregation
        errors: Error messages with aggregation
    """

    messages: Annotated[list[BaseMessage], add_messages]
    pending_tools: list[dict]
    results: Annotated[
        dict[str, Any], BinaryOperatorAggregate(dict[str, Any], dict_reducer)
    ]
    errors: Annotated[
        dict[str, str], BinaryOperatorAggregate(dict[str, str], dict_reducer)
    ]
Step 3: Result Aggregator Implementation
Implement the main aggregation logic with message type handling.
Why This Matters
Result aggregation is essential because:
- Creates coherent summaries
- Maintains message type consistency
- Enables structured responses
- Facilitates error reporting
Debug Tips
- Message Creation:
  - Verify message types
  - Check content formatting
  - Monitor message order
- Common Issues:
  - TypeError: Invalid message types
  - ValueError: Malformed content
  - IndexError: Message list access
def result_aggregator(state: State) -> State:
    """Aggregate results from parallel tool execution.

    Creates appropriate message types for tool results and assistant responses.

    Args:
        state: Current state with results to aggregate

    Returns:
        Updated state with aggregated messages
    """
    messages = list(state["messages"])

    # Process successful results
    if state["results"]:
        # First add tool messages for each result
        for tool_id, result in state["results"].items():
            tool_message = ToolMessage(
                content=str(result),
                tool_call_id=tool_id,
                name=tool_id.split("_")[0],  # Extract tool name from ID
            )
            messages.append(tool_message)

        # Add assistant summary of results
        summary = "Here are the tool results:\n" + "\n".join(
            f"- {tool_id}: {result}" for tool_id, result in state["results"].items()
        )
        messages.append(AIMessage(content=summary))

    # Process errors if any
    if state["errors"]:
        error_summary = "Some tools encountered errors:\n" + "\n".join(
            f"- {tool_id}: {error}" for tool_id, error in state["errors"].items()
        )
        messages.append(AIMessage(content=error_summary))

    return {
        "messages": messages,
        "pending_tools": [],
        "results": state["results"],
        "errors": state["errors"],
    }
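One detail worth checking in the aggregator is the tool-name extraction, which keeps only the text before the first underscore of the ID. If a tool's own name contains underscores, that truncates it. The sketch below assumes IDs follow a `<tool_name>_<suffix>` convention, as the IDs in this tutorial do, and splits from the right instead. `extract_tool_name` is a hypothetical helper, not part of LangGraph.

```python
def extract_tool_name(tool_id: str) -> str:
    """Return the tool name from an ID shaped like '<tool_name>_<suffix>'."""
    name, sep, _suffix = tool_id.rpartition("_")
    # No underscore at all: treat the whole ID as the name.
    return name if sep else tool_id


# Compare with splitting on the first underscore.
for tool_id in ["calculator_1", "web_search_2", "plain"]:
    print(tool_id, "->", extract_tool_name(tool_id), "vs", tool_id.split("_")[0])
```

For IDs such as `calculator_1` both approaches agree; they differ only once a tool name itself contains an underscore.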
Step 4: Demonstration Implementation
Example usage showing aggregation in action.
Why This Matters
Demonstration code is valuable because:
- Shows practical usage patterns
- Illustrates message flow
- Demonstrates error handling
- Provides testing scenarios
Debug Tips
- Demo Execution:
  - Monitor message creation
  - Verify result formatting
  - Check error handling
- Common Problems:
  - RuntimeError: Async execution
  - ValueError: Invalid state
  - TypeError: Message formatting
async def demonstrate_aggregation():
    """Demonstrate result aggregation with proper message types."""
    # Initial state with some results and errors
    state = {
        "messages": [],
        "pending_tools": [],
        "results": {"calculator_1": "2 + 2 = 4", "weather_1": "22°C, Sunny"},
        "errors": {"search_1": "API rate limit exceeded"},
    }

    print("Result Aggregation Demo")
    print("=" * 50)

    result = result_aggregator(state)

    print("\nFinal Messages:")
    for msg in result["messages"]:
        if isinstance(msg, ToolMessage):
            print(f"\nTool ({msg.name}): {msg.content}")
        elif isinstance(msg, AIMessage):
            print(f"\nAssistant: {msg.content}")
Common Pitfalls
- Incorrect Message Type Usage
  - Using wrong message classes
  - Missing required fields
  - Improper type conversion
- Poor Error Aggregation
  - Missing error context
  - Inconsistent error formats
  - Lost error details
- State Management Issues
  - Inconsistent state updates
  - Missing state validation
  - State corruption
- Result Format Inconsistencies
  - Mixed data types
  - Inconsistent string formats
  - Missing type conversion
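One way to reduce the format inconsistencies listed above is to normalize every result to a string before it goes into a message. The `format_result` helper below is a hypothetical sketch: it serializes dictionaries and lists as JSON and falls back to `str()` for everything else. The sample results are made up.

```python
import json
from typing import Any


def format_result(value: Any) -> str:
    """Turn a tool result of any type into a predictable string."""
    if isinstance(value, str):
        return value  # already text: leave it untouched
    if isinstance(value, (dict, list)):
        # sort_keys gives stable output; default=str covers non-JSON values
        return json.dumps(value, sort_keys=True, default=str)
    return str(value)


# Made-up results with mixed types.
mixed_results = {
    "calculator_1": 4,
    "weather_1": {"temp_c": 22, "sky": "Sunny"},
    "search_1": ["first hit", "second hit"],
}

formatted = {tool_id: format_result(value) for tool_id, value in mixed_results.items()}
for tool_id, text in formatted.items():
    print(f"- {tool_id}: {text}")
```

Formatting in one place keeps the tool messages and the assistant summary consistent with each other.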
Key Takeaways
- Binary Operator Pattern
  - Enables scalable aggregation
  - Maintains consistency
  - Supports partial results
- Message Type Importance
  - Ensures proper message flow
  - Maintains context
  - Enables structured responses
- Error Handling Strategy
  - Comprehensive error capture
  - Structured error reporting
  - Clear error communication
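A minimal sketch of the error handling strategy above: wrap each tool call so that a failure becomes an entry under `errors` instead of an unhandled exception, using the same dictionary shape the reducers expect. `run_tool_safely` and the two toy tools are hypothetical names for illustration.

```python
from typing import Any, Callable


def run_tool_safely(tool_id: str, tool: Callable[[], Any]) -> dict[str, dict[str, Any]]:
    """Run one tool and return a state update with either a result or an error."""
    try:
        return {"results": {tool_id: tool()}}
    except Exception as exc:  # broad on purpose: every failure gets reported
        return {"errors": {tool_id: f"{type(exc).__name__}: {exc}"}}


def add() -> int:
    """Toy tool that succeeds."""
    return 2 + 2


def flaky_search() -> str:
    """Toy tool that always fails."""
    raise TimeoutError("API rate limit exceeded")


ok_update = run_tool_safely("calculator_1", add)
failed_update = run_tool_safely("search_1", flaky_search)
print(ok_update)
print(failed_update)
```

Including the exception type in the message preserves some context, which makes later categorization easier.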
Next Steps
- Enhanced Result Formatting
  - Add custom formatters
  - Implement type-specific handling
  - Create template system
- Advanced Error Management
  - Add error categorization
  - Implement recovery strategies
  - Create error hierarchies
- Custom Aggregation Patterns
  - Create specialized reducers
  - Implement filters
  - Add transformation pipelines
Expected Output
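Result Aggregation Demo
==================================================

Final Messages:

Tool (calculator): 2 + 2 = 4

Tool (weather): 22°C, Sunny

A: Here are the tool results:
- calculator_1: 2 + 2 = 4
- weather_1: 22°C, Sunny

A: Some tools encountered errors:
- search_1: API rate limit exceeded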
if __name__ == "__main__":
    import asyncio

    import nest_asyncio

    # Allow asyncio.run() inside an already running loop (e.g. a notebook)
    nest_asyncio.apply()
    asyncio.run(demonstrate_aggregation())
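The `RuntimeError` listed in the debug tips typically appears when `asyncio.run()` is called while an event loop is already running, as in a notebook; `nest_asyncio` patches around that. As an alternative sketch without the extra dependency, the hypothetical `run_sync` helper below starts a loop only when none is active and otherwise asks the caller to `await` the coroutine directly. The `demo` coroutine is a stand-in for `demonstrate_aggregation`.

```python
import asyncio
from typing import Any, Callable, Coroutine


def run_sync(make_coro: Callable[[], Coroutine[Any, Any, Any]]) -> Any:
    """Run a coroutine from synchronous code when no event loop is active."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running (plain script): safe to start one.
        return asyncio.run(make_coro())
    # A loop is already running (e.g. a notebook cell).
    raise RuntimeError("An event loop is already running; use `await` instead.")


async def demo() -> str:
    """Stand-in coroutine for the tutorial's demonstration function."""
    await asyncio.sleep(0)
    return "done"


# Works in a plain script; in a notebook, write `await demo()` instead.
outcome = run_sync(demo)
print(outcome)
```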