🎯 What You'll Learn Today
LangGraph Tutorial: Parallel Tool Execution - Unit 2.3 Exercise 4
This tutorial is also available in Google Colab here or for download here
Joint Initiative: This tutorial is part of a collaboration between AI Product Engineer and the Nebius Academy.
This tutorial demonstrates how to implement efficient parallel tool execution in LangGraph using Python's asyncio capabilities. You'll learn how to manage concurrent operations, handle errors gracefully, and monitor performance in a production environment.
Key Concepts Covered
- Asynchronous Tool Implementation
- Parallel Execution Patterns
- State Management
- Error Handling and Recovery
- Performance Monitoring
!pip install langchain-core
!pip install langgraph
!pip install nest_asyncio

import asyncio
import time
from typing import Annotated, Any, TypedDict

from langchain_core.messages import BaseMessage
from langchain_core.tools import tool
from langgraph.graph.message import add_messages
Step 1: State Definition
Define the state structure for managing parallel tool execution.
Why This Matters
State management is crucial because it:
- Maintains consistency across parallel operations
- Tracks execution progress and results
- Enables error recovery and debugging
- Facilitates tool coordination
Debug Tips
- State Initialization:
  - Verify all fields are properly initialized
  - Check type annotations match runtime types
  - Monitor state mutations during execution
- Common Errors:
  - KeyError: Missing state fields
  - TypeError: Incorrect type annotations
  - AttributeError: Accessing undefined state properties
class State(TypedDict):
"""State for parallel tool execution.
Attributes:
messages: List of interaction messages
pending_tools: Tools waiting for execution
results: Successful tool execution results
errors: Error messages from failed executions
"""
messages: Annotated[list[BaseMessage], add_messages]
pending_tools: list[dict[str, Any]]
results: dict[str, Any]
errors: dict[str, str]
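The debug tips above call out KeyError and type mismatches as the usual failure modes. A small validation helper can surface those problems before execution starts. This is a hypothetical utility, not part of LangGraph; it checks plain dicts, so it runs without any dependencies:

```python
from typing import Any

# Expected state fields and their runtime types (mirrors the State TypedDict).
REQUIRED_FIELDS = {
    "messages": list,
    "pending_tools": list,
    "results": dict,
    "errors": dict,
}


def validate_state(state: dict[str, Any]) -> list[str]:
    """Return a list of problems found in a state dict (empty means valid)."""
    problems = []
    for field, expected in REQUIRED_FIELDS.items():
        if field not in state:
            problems.append(f"missing field: {field}")
        elif not isinstance(state[field], expected):
            problems.append(f"{field} should be {expected.__name__}")
    return problems


print(validate_state({"messages": [], "pending_tools": [], "results": {}, "errors": {}}))  # []
print(validate_state({"messages": []}))
```

Calling this at the top of a node during development turns a late KeyError deep inside the executor into an explicit, early error message.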
Step 2: Tool Implementation
Create async tools with different execution characteristics.
Why This Matters
Thoughtful tool implementation is essential because it:
- Models realistic API interactions
- Demonstrates varying response times
- Shows error handling patterns
- Illustrates async/await usage
Debug Tips
- Tool Execution:
  - Use asyncio.sleep() for testing timing
  - Monitor tool execution duration
  - Check for proper error propagation
- Common Issues:
  - RuntimeError: Missing await
  - asyncio.TimeoutError: Slow execution
  - ValueError: Invalid tool parameters
@tool
async def fast_tool(query: str) -> str:
"""Simulated fast tool.
Args:
query: Input query string
Returns:
str: Formatted result string
"""
await asyncio.sleep(0.1)
return f"Fast result for: {query}"
@tool
async def slow_tool(query: str) -> str:
"""Simulated slow tool with occasional failures.
Args:
query: Input query string
Returns:
str: Formatted result string
Raises:
ValueError: If query contains 'fail'
"""
await asyncio.sleep(1.0)
if "fail" in query:
raise ValueError("Simulated failure")
return f"Slow result for: {query}"
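Stripped of the @tool decorator (so the sketch needs nothing beyond the standard library), the two simulated tools make the timing story easy to verify: run them through asyncio.gather and the wall-clock time tracks the slowest tool, not the sum of all delays. The renamed fast/slow coroutines below are local stand-ins for the decorated tools:

```python
import asyncio
import time


async def fast(query: str) -> str:
    await asyncio.sleep(0.1)  # simulated quick API call
    return f"Fast result for: {query}"


async def slow(query: str) -> str:
    await asyncio.sleep(1.0)  # simulated slow API call
    if "fail" in query:
        raise ValueError("Simulated failure")
    return f"Slow result for: {query}"


async def main() -> float:
    start = time.time()
    # Both coroutines run concurrently; gather waits for the last to finish.
    results = await asyncio.gather(fast("a"), slow("b"))
    print(results)
    return time.time() - start


elapsed = asyncio.run(main())
print(f"{elapsed:.1f}s")  # ~1.0s, not 1.1s: total time is bounded by the slow tool
```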
Step 3: Tool Execution Handler
Implement the core tool execution logic with error handling.
Why This Matters
Error handling is critical because it:
- Ensures system resilience
- Provides meaningful error messages
- Enables debugging and monitoring
- Maintains system stability
Debug Tips
- Error Handling:
  - Check exception types
  - Verify error message formatting
  - Monitor error propagation
- Common Problems:
  - UnboundLocalError: Tool not found
  - AttributeError: Invalid tool interface
  - TypeError: Incorrect argument types
async def execute_tool(tool_call: dict[str, Any]) -> tuple[str, Any]:
"""Execute a single tool with error handling.
Args:
tool_call: Dictionary containing tool execution details
Returns:
Tuple of (tool_id, result/error)
"""
tools = {"fast_tool": fast_tool, "slow_tool": slow_tool}
try:
tool = tools.get(tool_call["tool_name"])
if not tool:
return tool_call["id"], f"Error: Unknown tool {tool_call['tool_name']}"
result = await tool.ainvoke(tool_call["args"]["query"])
return tool_call["id"], result
except Exception as e:
return tool_call["id"], f"Error: {e!s}"
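The catch-and-tag pattern in execute_tool works with any awaitable. The sketch below reproduces it with plain coroutines standing in for the LangChain tools, so it runs without langchain-core installed; the tool names (ok_tool, bad_tool) and calls are illustrative, not part of the tutorial's graph:

```python
import asyncio
from typing import Any


async def ok_tool(query: str) -> str:
    return f"ok: {query}"


async def bad_tool(query: str) -> str:
    raise ValueError("boom")


TOOLS = {"ok_tool": ok_tool, "bad_tool": bad_tool}


async def execute(call: dict[str, Any]) -> tuple[str, Any]:
    """Run one tool call, converting any failure into an (id, error-string) pair."""
    tool = TOOLS.get(call["tool_name"])
    if tool is None:
        return call["id"], f"Error: Unknown tool {call['tool_name']}"
    try:
        return call["id"], await tool(call["args"]["query"])
    except Exception as e:
        return call["id"], f"Error: {e}"


calls = [
    {"id": "a", "tool_name": "ok_tool", "args": {"query": "x"}},
    {"id": "b", "tool_name": "bad_tool", "args": {"query": "x"}},
    {"id": "c", "tool_name": "missing", "args": {"query": "x"}},
]
for call in calls:
    print(asyncio.run(execute(call)))
```

Because every code path returns a `(tool_id, result_or_error)` tuple instead of raising, the caller can dispatch results and errors uniformly, which is exactly what the parallel executor in the next step relies on.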
Step 4: Parallel Execution Implementation
Core parallel execution logic using asyncio.gather.
Why This Matters
Parallel execution is essential because it:
- Maximizes resource utilization
- Reduces overall execution time
- Handles multiple tools efficiently
- Provides performance metrics
Debug Tips
- Execution Issues:
  - Monitor task creation and completion
  - Check for resource constraints
  - Verify proper task cleanup
- Performance Problems:
  - High memory usage
  - Slow task completion
  - Task starvation
async def parallel_executor(state: State) -> State:
"""Execute multiple tools in parallel.
Args:
state: Current state with pending tools
Returns:
Updated state with results and errors
"""
if not state["pending_tools"]:
return state
tasks = [execute_tool(tool_call) for tool_call in state["pending_tools"]]
start_time = time.time()
results = await asyncio.gather(*tasks)
execution_time = time.time() - start_time
new_results = {}
new_errors = {}
for tool_id, result in results:
if isinstance(result, str) and result.startswith("Error:"):
new_errors[tool_id] = result
else:
new_results[tool_id] = result
print(f"Executed {len(tasks)} tools in {execution_time:.2f} seconds")
return {
"messages": state["messages"],
"pending_tools": [],
"results": new_results,
"errors": new_errors,
}
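parallel_executor only stays robust because execute_tool catches every exception itself. If a task can still raise, asyncio.gather propagates the first exception and the remaining results are lost, unless you pass return_exceptions=True, which delivers exceptions as ordinary result values. A minimal standalone sketch of that safety net (work and its failure condition are made up for illustration):

```python
import asyncio


async def work(n: int) -> int:
    if n == 2:
        raise ValueError("bad input")  # one deliberately failing task
    await asyncio.sleep(0.01)
    return n * 10


async def run_all() -> list:
    # return_exceptions=True delivers exceptions as values in order,
    # instead of aborting the whole batch on the first failure.
    return await asyncio.gather(*(work(n) for n in range(4)), return_exceptions=True)


results = asyncio.run(run_all())
for n, r in enumerate(results):
    status = f"failed: {r}" if isinstance(r, Exception) else f"-> {r}"
    print(f"task {n} {status}")
```

Either approach works; catching inside the task (as the tutorial does) keeps error formatting in one place, while return_exceptions=True is a useful backstop when you cannot control every tool's code.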
Step 5: Demonstration Implementation
Example usage showing practical application.
Why This Matters
Demonstration code is valuable because it:
- Shows real-world usage patterns
- Illustrates proper initialization
- Demonstrates result handling
- Provides execution examples
Debug Tips
- Setup Issues:
  - Verify environment configuration
  - Check import dependencies
  - Monitor resource availability
- Runtime Problems:
  - Invalid state initialization
  - Incorrect tool configuration
  - Resource cleanup issues
async def demonstrate_parallel_execution():
"""Demonstrate parallel tool execution."""
# Initial state with multiple tool calls
state = {
"messages": [],
"pending_tools": [
{"id": "fast_1", "tool_name": "fast_tool", "args": {"query": "task 1"}},
{"id": "slow_1", "tool_name": "slow_tool", "args": {"query": "task 2"}},
{"id": "fast_2", "tool_name": "fast_tool", "args": {"query": "task 3"}},
{"id": "error_1", "tool_name": "slow_tool", "args": {"query": "fail"}},
],
"results": {},
"errors": {},
}
print("\nParallel Execution Demo")
print("=" * 50)
result = await parallel_executor(state)
print("\nSuccessful Results:")
for tool_id, tool_result in result["results"].items():
print(f"{tool_id}: {tool_result}")
print("\nErrors:")
for tool_id, error in result["errors"].items():
print(f"{tool_id}: {error}")
Common Pitfalls
- Not handling tool timeouts
  - Tools may hang indefinitely
  - System resources can be exhausted
  - User experience degrades
- Improper error handling
  - Unhandled exceptions crash the system
  - Missing error context
  - Silent failures
- Resource management
  - Too many concurrent tasks
  - Memory leaks
  - Connection pool exhaustion
- State management issues
  - Inconsistent state updates
  - Missing state validation
  - Race conditions
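Two of the pitfalls above, hanging tools and unbounded concurrency, can be addressed together with asyncio.wait_for and asyncio.Semaphore. The limits and delays below (TIMEOUT_S, MAX_CONCURRENT, the simulated tool calls) are illustrative values chosen for the sketch, not recommendations:

```python
import asyncio

TIMEOUT_S = 0.5       # assumed per-tool timeout
MAX_CONCURRENT = 2    # assumed cap on simultaneous tool runs


async def guarded(sem: asyncio.Semaphore, tool_id: str, delay: float) -> tuple[str, str]:
    async with sem:  # at most MAX_CONCURRENT tools hold the semaphore at once
        try:
            # wait_for cancels the inner call if it exceeds the timeout.
            await asyncio.wait_for(asyncio.sleep(delay), timeout=TIMEOUT_S)
            return tool_id, "ok"
        except asyncio.TimeoutError:
            return tool_id, "Error: timed out"


async def main() -> dict[str, str]:
    calls = [("a", 0.1), ("b", 0.2), ("c", 1.0)]
    sem = asyncio.Semaphore(MAX_CONCURRENT)
    return dict(await asyncio.gather(*(guarded(sem, i, d) for i, d in calls)))


out = asyncio.run(main())
print(out)  # {'a': 'ok', 'b': 'ok', 'c': 'Error: timed out'}
```

The semaphore bounds resource usage while wait_for guarantees no single tool can stall the batch; both compose cleanly with the gather-based executor from Step 4.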
Key Takeaways
- Parallel execution significantly improves performance
  - Multiple tools run concurrently
  - Resource utilization is optimized
  - System throughput increases
- Proper error handling is crucial
  - All exceptions are caught and handled
  - Errors are properly categorized
  - System remains stable
- State management requires careful design
  - State structure is well-defined
  - Updates are atomic
  - Recovery is possible
Next Steps
- Add timeout handling
  - Implement per-tool timeouts
  - Add global execution timeouts
  - Create timeout recovery strategies
- Implement batch processing
  - Add batch size limits
  - Create batch prioritization
  - Optimize batch execution
- Enhance monitoring
  - Add detailed logging
  - Implement metrics collection
  - Create performance dashboards
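The batch-processing step can be sketched under the assumption of a fixed batch size: items are chunked, and each chunk runs fully in parallel before the next starts. BATCH_SIZE and run_one are placeholders for a real limit and a real tool call:

```python
import asyncio

BATCH_SIZE = 2  # assumed batch size limit


async def run_one(n: int) -> int:
    await asyncio.sleep(0.01)  # simulated tool call
    return n * n


async def run_batched(items: list[int]) -> list[int]:
    """Execute items in fixed-size batches; each batch runs fully in parallel."""
    out: list[int] = []
    for i in range(0, len(items), BATCH_SIZE):
        batch = items[i : i + BATCH_SIZE]
        out.extend(await asyncio.gather(*(run_one(n) for n in batch)))
    return out


print(asyncio.run(run_batched([1, 2, 3, 4, 5])))  # [1, 4, 9, 16, 25]
```

This trades some latency (later batches wait) for a hard cap on concurrent work; the semaphore approach shown earlier achieves the same cap with better pipelining.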
Expected Output
Parallel Execution Demo
==================================================
Executed 4 tools in 1.02 seconds

Successful Results:
fast_1: Fast result for: task 1
slow_1: Slow result for: task 2
fast_2: Fast result for: task 3

Errors:
error_1: Error: Simulated failure
if __name__ == "__main__":
    import nest_asyncio

    # nest_asyncio allows asyncio.run() inside environments that already
    # run an event loop, such as Jupyter or Colab notebooks.
    nest_asyncio.apply()
    asyncio.run(demonstrate_parallel_execution())