🎯 What You'll Learn Today
LangGraph Tutorial: Graph Configuration and Routing - Unit 2.3 Exercise 8
Joint Initiative: This tutorial is part of a collaboration between AI Product Engineer and the Nebius Academy.
This tutorial demonstrates how to construct and configure execution graphs in LangGraph, focusing on parallel tool execution, state management, and conditional routing. Learn how to create robust, scalable graph structures for complex workflows.
Key Concepts Covered
- Graph Structure Definition
- State Management
- Node Configuration
- Conditional Routing
- Parallel Execution
import asyncio
from typing import Annotated, Any, Literal, TypedDict
#!pip install langchain-core
#!pip install langgraph
from langchain_core.messages import AIMessage, BaseMessage, SystemMessage, ToolMessage
from langgraph.channels.binop import BinaryOperatorAggregate
from langgraph.graph import END, START, StateGraph
from langgraph.graph.message import add_messages
Step 1: State Definition with Aggregation
Define state structure with support for result aggregation.
Why This Matters
State definition is crucial because:
- Enables parallel processing
- Supports result aggregation
- Maintains execution context
- Facilitates error handling
Debug Tips
- State Structure:
  - Verify aggregation operators
  - Check type annotations
  - Monitor state growth
- Common Issues:
  - Missing aggregation fields
  - Type mismatches
  - Memory leaks
class State(TypedDict):
"""State container for graph execution.
Attributes:
messages: Conversation history
pending_tools: Tools awaiting execution
results: Aggregated tool results
errors: Aggregated error messages
tool_configs: Tool-specific configurations
"""
messages: Annotated[list[BaseMessage], add_messages]
pending_tools: list[dict[str, Any]]
results: Annotated[
dict[str, Any],
        BinaryOperatorAggregate(dict[str, Any], lambda a, b: {**(a or {}), **(b or {})}),
]
errors: Annotated[
dict[str, str],
        BinaryOperatorAggregate(dict[str, str], lambda a, b: {**(a or {}), **(b or {})}),
]
tool_configs: dict[str, dict[str, Any]]
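To see what the aggregation annotations buy you, here is a minimal, framework-free sketch of a dict-merge reducer of the same shape (written so that the newer update wins on key collisions) combining partial results from two parallel branches:

```python
# Standalone sketch of a dict-merge reducer like the ones in State above.
# Pure Python; no LangGraph required. Newer updates win on key clashes.
def merge_dicts(current: dict, update: dict) -> dict:
    """Combine partial results from parallel branches into one dict."""
    return {**(current or {}), **(update or {})}

branch_a = {"tool_0": "Result A"}
branch_b = {"tool_1": "Result B"}

merged = merge_dicts(merge_dicts({}, branch_a), branch_b)
print(merged)  # {'tool_0': 'Result A', 'tool_1': 'Result B'}
```

Because the reducer is associative, LangGraph can apply updates from parallel branches in any order and every branch's results survive the merge.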
Step 2: Node Implementation
Implement core processing nodes for the graph.
Why This Matters
Node implementation is essential because:
- Defines processing logic
- Handles state transitions
- Manages tool execution
- Processes results/errors
Debug Tips
- Node Logic:
  - Verify state updates
  - Check async operations
  - Monitor resource usage
- Common Problems:
  - State corruption
  - Resource leaks
  - Concurrency issues
async def parallel_executor(state: State) -> State:
"""Execute pending tools in parallel.
Args:
state: Current execution state
Returns:
Updated state with results/errors
"""
if not state["pending_tools"]:
return state
    # One simulated task per pending tool; gather runs them concurrently
    tasks = [asyncio.sleep(0.1) for _ in state["pending_tools"]]
    await asyncio.gather(*tasks)
# Simulate results/errors
new_results = {}
new_errors = {}
for tool in state["pending_tools"]:
if "error" in tool["args"].get("query", ""):
new_errors[tool["id"]] = "Simulated error"
else:
new_results[tool["id"]] = f"Result for {tool['args']['query']}"
return {**state, "results": new_results, "errors": new_errors}
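The `asyncio.gather` call is what makes the executor parallel. A self-contained timing sketch (independent of LangGraph, with a hypothetical `fake_tool` coroutine standing in for real tool calls) shows that three 0.1-second tasks finish in roughly 0.1 seconds total rather than 0.3:

```python
import asyncio
import time

async def fake_tool(query: str) -> str:
    await asyncio.sleep(0.1)  # simulate I/O-bound tool work
    return f"Result for {query}"

async def run_all() -> list[str]:
    # gather schedules all coroutines at once and awaits them together
    return await asyncio.gather(*(fake_tool(f"q{i}") for i in range(3)))

start = time.perf_counter()
results = asyncio.run(run_all())
elapsed = time.perf_counter() - start

print(results)
print(f"elapsed: {elapsed:.2f}s")  # roughly 0.1s: the sleeps overlap
```

`gather` also preserves input order in its result list, which is why the simulated results above line up with the queries that produced them.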
def result_aggregator(state: State) -> State:
"""Aggregate successful results.
Args:
state: Current state with results
Returns:
Updated state with aggregated messages
"""
messages = list(state["messages"])
for tool_id, result in state["results"].items():
messages.append(
ToolMessage(
content=str(result), tool_call_id=tool_id, name=tool_id.split("_")[0]
)
)
messages.append(
AIMessage(content=f"Processed {len(state['results'])} results successfully.")
)
return {**state, "messages": messages}
def error_handler(state: State) -> State:
"""Process execution errors.
Args:
state: Current state with errors
Returns:
Updated state with error messages
"""
messages = list(state["messages"])
for tool_id, error in state["errors"].items():
messages.append(AIMessage(content=f"Error in {tool_id}: {error}"))
return {**state, "messages": messages}
Step 3: State and Routing Management
Implement state initialization and routing logic.
Why This Matters
State management is critical because:
- Ensures proper initialization
- Controls execution flow
- Maintains consistency
- Enables recovery
Debug Tips
- State Management:
  - Verify initialization
  - Check state transitions
  - Monitor routing decisions
- Common Issues:
  - Invalid initial state
  - Routing errors
  - State inconsistencies
def get_initial_state(state: State | None = None) -> State:
"""Initialize state with test tools.
Args:
state: Optional existing state
Returns:
Initialized state with test configuration
"""
return {
"messages": [SystemMessage(content="Starting execution")],
"pending_tools": [
{
"id": f"tool_{i}",
"tool_name": "test_tool",
"args": {"query": f"test_query_{i}"},
}
for i in range(3)
],
"results": {},
"errors": {},
"tool_configs": {},
}
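Each entry in `pending_tools` carries the `id` / `tool_name` / `args` shape that `parallel_executor` later consumes; a quick framework-free check of the same test payload makes that contract explicit:

```python
# Mirror of get_initial_state's test payload (no framework needed).
pending_tools = [
    {"id": f"tool_{i}", "tool_name": "test_tool", "args": {"query": f"test_query_{i}"}}
    for i in range(3)
]

# Every entry must expose the keys the executor reads.
for tool in pending_tools:
    assert {"id", "tool_name", "args"} <= tool.keys()

print([t["id"] for t in pending_tools])  # ['tool_0', 'tool_1', 'tool_2']
```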
def route_results(state: State) -> Literal["result_aggregator", "error_handler"]:
"""Route based on execution results.
Args:
state: Current execution state
Returns:
Next node identifier
"""
return "error_handler" if state.get("errors") else "result_aggregator"
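Because `route_results` only inspects the `errors` field, its behavior is easy to verify in isolation; the sketch below repeats the same one-line function outside the graph and probes it with two hand-built states:

```python
def route_results(state: dict) -> str:
    """Framework-free mirror of the routing function above."""
    return "error_handler" if state.get("errors") else "result_aggregator"

clean_state = {"results": {"tool_0": "ok"}, "errors": {}}
failed_state = {"results": {}, "errors": {"tool_1": "Simulated error"}}

print(route_results(clean_state))   # result_aggregator
print(route_results(failed_state))  # error_handler
```

Note that an empty `errors` dict is falsy, so only a state with at least one recorded error is diverted to the error handler.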
Step 4: Graph Construction
Implement graph creation and configuration.
Why This Matters
Graph construction is essential because:
- Defines execution flow
- Establishes node relationships
- Enables conditional routing
- Supports parallel processing
Debug Tips
- Graph Structure:
  - Verify node connections
  - Check edge conditions
  - Monitor graph compilation
- Common Problems:
  - Missing edges
  - Invalid routes
  - Dead ends
def create_parallel_graph() -> StateGraph:
"""Create execution graph with parallel processing.
Graph Structure:
    START → init → parallel_executor → [result_aggregator|error_handler] → END
Returns:
Configured StateGraph
"""
graph = StateGraph(State)
# Add nodes
graph.add_node("init", get_initial_state)
graph.add_node("parallel_executor", parallel_executor)
graph.add_node("result_aggregator", result_aggregator)
graph.add_node("error_handler", error_handler)
# Configure edges
graph.add_edge(START, "init")
graph.add_edge("init", "parallel_executor")
# Add conditional routing
graph.add_conditional_edges(
"parallel_executor",
route_results,
{"result_aggregator": "result_aggregator", "error_handler": "error_handler"},
)
# Connect to END
graph.add_edge("result_aggregator", END)
graph.add_edge("error_handler", END)
return graph
Step 5: Demonstration Implementation
Example usage showing graph execution.
Why This Matters
Demonstration code is valuable because:
- Shows practical usage patterns
- Illustrates execution flow
- Demonstrates error handling
- Provides testing scenarios
Debug Tips
- Demo Execution:
  - Monitor graph flow
  - Verify state updates
  - Check message generation
- Common Issues:
  - Initialization errors
  - Execution failures
  - Message formatting
async def demonstrate_graph():
"""Demonstrate graph execution."""
print("Graph Execution Demo")
print("=" * 50)
graph = create_parallel_graph()
chain = graph.compile()
    # Start with empty channels; the init node supplies the system message
    # and the pending tools, so seeding a message here would duplicate it.
    initial_state = {
        "messages": [],
        "pending_tools": [],
        "results": {},
        "errors": {},
        "tool_configs": {},
    }
result = await chain.ainvoke(initial_state)
print("\nExecution Messages:")
for msg in result["messages"]:
prefix = type(msg).__name__.replace("Message", "")
print(f"\n{prefix}: {msg.content}")
print("\nResults:", result["results"])
print("Errors:", result["errors"])
Common Pitfalls
- Graph Structure Issues
  - Missing node connections
  - Invalid routing conditions
  - Unreachable nodes
- State Management Problems
  - Incomplete initialization
  - State corruption
  - Missing updates
- Execution Flow Issues
  - Deadlocks
  - Infinite loops
  - Resource exhaustion
- Error Handling Gaps
  - Unhandled exceptions
  - Lost errors
  - Missing recovery paths
Key Takeaways
- Graph Architecture
  - Clear node structure
  - Proper routing
  - Error handling
- State Management
  - Consistent updates
  - Proper aggregation
  - Type safety
- Execution Patterns
  - Parallel processing
  - Conditional routing
  - Resource management
Next Steps
- Enhanced Graph Structure
  - Add recovery nodes
  - Implement caching
  - Create visualizations
- Advanced Routing
  - Add priorities
  - Create fallbacks
  - Implement retries
- Monitoring
  - Add metrics
  - Create logging
  - Implement tracing
Expected Output
Graph Execution Demo
==================================================
Execution Messages:
System: Starting execution
Tool: Result for test_query_0
Tool: Result for test_query_1
Tool: Result for test_query_2
AI: Processed 3 results successfully.
Results: {'tool_0': 'Result for test_query_0', 'tool_1': 'Result for test_query_1', 'tool_2': 'Result for test_query_2'}
Errors: {}
if __name__ == "__main__":
    try:
        # Only needed where an event loop is already running (e.g. notebooks)
        import nest_asyncio
        nest_asyncio.apply()
    except ImportError:
        pass
    asyncio.run(demonstrate_graph())