🎯 What You'll Learn Today
LangGraph Tutorial: Complete System Integration - Unit 2.3 Exercise 10
This tutorial is also available in Google Colab here or for download here
Joint Initiative: This tutorial is part of a collaboration between AI Product Engineer and the Nebius Academy.
This tutorial demonstrates how to build a complete multi-tool system in LangGraph, integrating parallel execution, state management, and error handling into a cohesive system. Learn how to create production-ready LangGraph applications with comprehensive monitoring and control.
Key Concepts Covered
- System Architecture
- Tool Integration
- State Management
- Error Handling
- Performance Monitoring
!pip install langchain-core
!pip install langgraph
!pip install numexpr  # used by the calculator tool in Step 2
!pip install nest_asyncio  # used in the __main__ guard to re-enter the notebook event loop

import asyncio
import time
from typing import Annotated, Any, Literal, TypedDict

from langchain_core.messages import AIMessage, BaseMessage, SystemMessage, ToolMessage
from langchain_core.tools import tool
from langgraph.channels.binop import BinaryOperatorAggregate
from langgraph.graph import END, START, StateGraph
from langgraph.graph.message import add_messages
Step 1: Core Components Implementation
Implement fundamental system components.
Why This Matters
Core components are crucial because they:
- Provide the system foundation
- Enable data consistency
- Support tool integration
- Facilitate monitoring
Debug Tips
- Component Structure:
  - Verify type annotations
  - Check reducer logic
  - Monitor state management
- Common Issues:
  - Type mismatches
  - Missing annotations
  - State inconsistencies
def dict_reducer(a: dict[str, Any], b: dict[str, Any] | None) -> dict[str, Any]:
"""Combine dictionaries for result aggregation.
Args:
a: Primary dictionary
b: Secondary dictionary (may be None)
Returns:
Combined dictionary with values from both inputs
Examples:
>>> dict_reducer({'a': 1}, {'b': 2})
{'b': 2, 'a': 1}
"""
if b is None:
return a
return {**b, **a}
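The reducer can be exercised on its own before it is wired into a channel. A minimal check of the merge behavior (the example values are illustrative):

# Two partial updates from parallel branches; the reducer merges their keys.
branch_a = {"calc_1": "2 * 3 = 6"}
branch_b = {"weather_1": "Weather in London: 22°C, Sunny"}
print(dict_reducer(branch_a, branch_b))
# {'weather_1': 'Weather in London: 22°C, Sunny', 'calc_1': '2 * 3 = 6'}

# None is treated as "no update", so the existing value passes through unchanged.
print(dict_reducer(branch_a, None))  # {'calc_1': '2 * 3 = 6'}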
class State(TypedDict):
"""Complete system state.
Attributes:
messages: Conversation history
pending_tools: Tools awaiting execution
results: Aggregated tool results
errors: Error messages
stats: Execution statistics
"""
messages: Annotated[list[BaseMessage], add_messages]
pending_tools: list[dict[str, Any]]
results: Annotated[
dict[str, Any], BinaryOperatorAggregate(dict[str, Any], dict_reducer)
]
errors: Annotated[
dict[str, str], BinaryOperatorAggregate(dict[str, str], dict_reducer)
]
stats: dict[str, Any]
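Note that `add_messages` behaves differently from `dict_reducer`: it appends new messages to the history and de-duplicates by message ID rather than merging dictionary keys. It can also be called directly, as in this small illustration:

# add_messages(existing_history, update) appends and de-duplicates by message ID.
history = [SystemMessage(content="Starting parallel execution system", id="sys_1")]
update = [AIMessage(content="Successfully completed 2 tools in 0.12s")]

merged = add_messages(history, update)
print([type(m).__name__ for m in merged])  # ['SystemMessage', 'AIMessage']

# Re-submitting a message with the same ID does not duplicate it.
print(len(add_messages(merged, history)))  # still 2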
Step 2: Tool Implementation
Implement system tools with error handling.
Why This Matters
Tool implementation is essential because it:
- Provides core functionality
- Handles domain logic
- Manages errors
- Ensures reliability
Debug Tips
- Tool Logic:
  - Check error handling
  - Verify return types
  - Monitor performance
- Common Problems:
  - Unhandled exceptions
  - Invalid inputs
  - Resource leaks
@tool
async def calculator(expression: str) -> str:
"""Calculate mathematical expressions.
Args:
expression: Math expression to evaluate
Returns:
Formatted result string
Raises:
ValueError: If calculation fails
"""
try:
import numexpr
result = numexpr.evaluate(expression.strip()).item()
return f"{expression} = {result}"
except Exception as e:
raise ValueError(f"Calculation error: {e!s}")
@tool
async def weather_tool(location: str) -> str:
"""Get simulated weather data.
Args:
location: Location to get weather for
Returns:
Simulated weather string
"""
await asyncio.sleep(0.1)
return f"Weather in {location}: 22ยฐC, Sunny"
Step 3: Parallel Execution Implementation
Implement core execution logic with monitoring.
Why This Matters
Parallel execution is crucial because it:
- Maximizes throughput
- Optimizes performance
- Manages resources
- Enables monitoring
Debug Tips
- Execution Logic:
  - Monitor task creation
  - Check error handling
  - Verify performance
- Common Issues:
  - Resource exhaustion
  - Task failures
  - Memory leaks
async def parallel_executor(state: State) -> State:
"""Execute tools in parallel with monitoring.
Args:
state: Current system state
Returns:
Updated state with results and stats
"""
if not state.get("pending_tools"):
return state
start_time = time.time()
    tasks = []
    valid_calls = []  # tool calls that actually produced a task, kept in task order
    tools = {"calculator": calculator, "weather": weather_tool}
    for tool_call in state["pending_tools"]:
        tool_fn = tools.get(tool_call["tool_name"])  # avoid shadowing the @tool decorator
        if not tool_fn:
            continue
        valid_calls.append(tool_call)
        tasks.append(tool_fn.ainvoke(tool_call["args"]["query"]))
    results = await asyncio.gather(*tasks, return_exceptions=True)
    execution_time = time.time() - start_time
    new_results = {}
    new_errors = {}
    # Pair each result with the call that produced it; strict=True catches any mismatch
    for tool_call, result in zip(valid_calls, results, strict=True):
if isinstance(result, Exception):
new_errors[tool_call["id"]] = str(result)
else:
new_results[tool_call["id"]] = result
stats = {
"execution_time": execution_time,
"total_tools": len(tasks),
"successful": len(new_results),
"failed": len(new_errors),
}
return {**state, "results": new_results, "errors": new_errors, "stats": stats}
Step 4: Result Processing Implementation
Implement result aggregation and error handling.
Why This Matters
Result processing is essential because it:
- Combines tool outputs
- Handles errors gracefully
- Creates user messages
- Maintains context
Debug Tips
- Processing Logic:
  - Verify result formatting
  - Check message creation
  - Monitor error handling
- Common Problems:
  - Missing results
  - Invalid formats
  - Message corruption
def result_aggregator(state: State) -> State:
"""Aggregate successful results and handle any errors.
Args:
state: Current state with results
Returns:
Updated state with processed messages
"""
messages = [state["messages"][0]]
# Process successful results
if state["results"]:
for tool_id, result in state["results"].items():
messages.append(
ToolMessage(
content=str(result),
tool_call_id=tool_id,
name=tool_id.split("_")[0],
)
)
stats = state.get("stats", {})
success_msg = (
f"Successfully completed {stats.get('successful', 0)} "
f"tools in {stats.get('execution_time', 0):.2f}s"
)
messages.append(AIMessage(content=success_msg))
# Process any errors if present
if state["errors"]:
error_msg = "Additionally encountered errors:\n" + "\n".join(
f"- {k}: {v}" for k, v in state["errors"].items()
)
messages.append(AIMessage(content=error_msg))
return {**state, "messages": messages}
def error_handler(state: State) -> State:
"""Process execution errors.
Args:
state: Current state with errors
Returns:
Updated state with error messages
"""
messages = list(state["messages"])
if state["errors"]:
error_msg = f"Encountered {len(state['errors'])} errors:\n" + "\n".join(
f"- {k}: {v}" for k, v in state["errors"].items()
)
messages.append(AIMessage(content=error_msg))
return {**state, "messages": messages}
Step 5: System Configuration
Implement system initialization and routing.
Why This Matters
System configuration is crucial because it:
- Sets up initial state
- Defines execution flow
- Manages routing
- Enables recovery
Debug Tips
- Configuration Logic:
  - Verify initialization
  - Check routing rules
  - Monitor state setup
- Common Issues:
  - Invalid initial state
  - Routing errors
  - Configuration conflicts
def route_results(state: State) -> Literal["result_processor", "error_processor"]:
"""Route based on execution results and errors.
Args:
state: Current system state
Returns:
Next processor identifier
"""
if state["errors"] and not state["results"]:
return "error_processor"
return "result_processor"
def get_initial_state(state: State | None = None) -> State:
"""Initialize system state.
Args:
state: Optional existing state
Returns:
Initialized system state
"""
return {
"messages": [SystemMessage(content="Starting parallel execution system")],
"pending_tools": [
{"id": "calc_1", "tool_name": "calculator", "args": {"query": "2 * 3"}},
{"id": "weather_1", "tool_name": "weather", "args": {"query": "London"}},
{
"id": "calc_error",
"tool_name": "calculator",
"args": {"query": "invalid"},
},
],
"results": {},
"errors": {},
"stats": {},
}
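The router only inspects the `results` and `errors` keys, so it can be sanity-checked without compiling anything:

# Mixed outcome (some results, some errors) still goes to the result processor.
mixed = {**get_initial_state(), "results": {"calc_1": "2 * 3 = 6"}, "errors": {"calc_error": "boom"}}
print(route_results(mixed))  # result_processor

# Errors with no usable results route to the error processor.
failed = {**get_initial_state(), "errors": {"calc_error": "boom"}}
print(route_results(failed))  # error_processor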
Step 6: Graph Construction
Create complete system graph.
Why This Matters
Graph construction is essential because it:
- Defines system structure
- Manages execution flow
- Enables monitoring
- Facilitates testing
Debug Tips
- Graph Structure:
  - Verify node connections
  - Check edge conditions
  - Monitor compilation
- Common Problems:
  - Missing edges
  - Invalid routes
  - Compilation errors
def create_parallel_system() -> StateGraph:
"""Create complete execution system with parallel execution.
Returns:
Configured system graph
"""
graph = StateGraph(State)
# Add nodes
graph.add_node("init", get_initial_state)
graph.add_node("executor", parallel_executor)
graph.add_node("result_processor", result_aggregator)
graph.add_node("error_processor", error_handler)
# Configure flow with conditional routing
graph.add_edge(START, "init")
graph.add_edge("init", "executor")
# Conditional routing based on results/errors
graph.add_conditional_edges(
"executor",
route_results,
{"result_processor": "result_processor", "error_processor": "error_processor"},
)
graph.add_edge("result_processor", END)
graph.add_edge("error_processor", END)
return graph
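After construction, compiling the graph and printing its structure is a quick way to confirm the conditional edge is wired as intended. This sketch assumes a `langgraph` version that exposes `get_graph().draw_mermaid()` on compiled graphs:

compiled = create_parallel_system().compile()

# Emit a Mermaid description of the nodes and edges; paste it into any Mermaid renderer.
print(compiled.get_graph().draw_mermaid())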
Step 7: System Demonstration
Implement demonstration runner.
Why This Matters
System demonstration is crucial because it:
- Shows system behavior
- Verifies functionality
- Tests integration
- Validates output
Debug Tips
- Demo Execution:
  - Monitor system flow
  - Check output format
  - Verify error handling
- Common Issues:
  - Setup failures
  - Output errors
  - State corruption
async def demonstrate_system():
"""Run full system demonstration."""
print("Complete System Demo")
print("=" * 50)
graph = create_parallel_system()
chain = graph.compile()
# Initialize with some tools
initial_state = get_initial_state()
print("\nInitial state messages:", len(initial_state["messages"]))
for msg in initial_state["messages"]:
print(f"Initial {type(msg).__name__}: {msg.content}")
print("\nExecuting tools...")
result = await chain.ainvoke(initial_state)
print("\nResults dictionary:", result["results"])
print("\nErrors dictionary:", result["errors"])
print("\nFinal Messages:")
for msg in result["messages"]:
prefix = type(msg).__name__.replace("Message", "")
print(f"\n{prefix}: {msg.content}")
print(f"Message type: {type(msg)}")
print("\nStats:", result["stats"])
Common Pitfalls
- Resource Management
  - Memory leaks
  - Connection pooling
  - Task cleanup
- Error Handling
  - Missing recovery
  - Silent failures
  - Lost context
- Performance Issues (see the concurrency sketch after this list)
  - Task overflow
  - Slow execution
  - Resource exhaustion
- State Management
  - Inconsistent updates
  - Lost messages
  - Corrupt state
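One way to guard against task overflow and resource exhaustion is to cap how many tool calls run concurrently. A hedged sketch; `bounded_gather` and `max_concurrency` are illustrative names, not part of the tutorial's API:

async def bounded_gather(coros, max_concurrency: int = 5):
    """Run coroutines concurrently, but never more than max_concurrency at a time."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(coro):
        async with semaphore:
            return await coro

    return await asyncio.gather(*(guarded(c) for c in coros), return_exceptions=True)

# Possible drop-in replacement inside parallel_executor:
# results = await bounded_gather(tasks, max_concurrency=5)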
Key Takeaways
- System Architecture
  - Clean structure
  - Clear flow
  - Error handling
- Performance Management
  - Parallel execution
  - Resource control
  - Monitoring
- Integration Patterns
  - Tool management
  - State handling
  - Message flow
Next Steps
- System Enhancement
  - Add monitoring
  - Implement caching
  - Create dashboard
- Performance Optimization
  - Add rate limiting
  - Implement batching
  - Create queuing
- Reliability Features (a starting sketch follows this list)
  - Add retries
  - Implement fallbacks
  - Create timeouts
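As a starting point for the reliability items above, each tool invocation can be wrapped with a per-attempt timeout and a simple retry loop. A hedged sketch; `invoke_with_retry`, `retries`, and `timeout_s` are illustrative, not existing tutorial APIs:

async def invoke_with_retry(tool_fn, query: str, retries: int = 2, timeout_s: float = 5.0) -> str:
    """Call a tool with a timeout on each attempt, retrying with a small backoff."""
    last_error: Exception | None = None
    for attempt in range(retries + 1):
        try:
            return await asyncio.wait_for(tool_fn.ainvoke(query), timeout=timeout_s)
        except Exception as e:  # includes asyncio.TimeoutError
            last_error = e
            await asyncio.sleep(0.2 * (attempt + 1))  # linear backoff between attempts
    raise RuntimeError(f"Tool failed after {retries + 1} attempts: {last_error}")

# Inside parallel_executor, tasks could then be built as:
# tasks.append(invoke_with_retry(tool_fn, tool_call["args"]["query"]))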
Expected Output
Complete System Demo
Initial state messages: 1
Initial SystemMessage: Starting parallel execution system
Executing tools...
Results dictionary: {
'calc_1': '2 * 3 = 6',
    'weather_1': 'Weather in London: 22°C, Sunny'
}
Errors dictionary: {
'calc_error': 'Calculation error: invalid syntax'
}
Final Messages
System: Starting parallel execution system
Tool: 2 * 3 = 6
Tool: Weather in London: 22°C, Sunny
AI: Successfully completed 2 tools in 0.12s
AI: Additionally encountered errors:
- calc_error: Calculation error: invalid syntax
Stats: {
'execution_time': 0.12,
'total_tools': 3,
'successful': 2,
'failed': 1
}
if __name__ == "__main__":
import nest_asyncio
nest_asyncio.apply()
asyncio.run(demonstrate_system())