What You'll Learn Today
LangGraph Tutorial: Parallel Tool Execution - Unit 2.3 Exercise 4
Joint Initiative
This tutorial is part of a collaboration between AI Product Engineer and Nebius Academy.
This tutorial demonstrates how to implement efficient parallel tool execution in LangGraph using Python's asyncio capabilities. You'll learn how to manage concurrent operations, handle errors gracefully, and monitor performance in a production environment.
Key Concepts Covered
- Asynchronous Tool Implementation
- Parallel Execution Patterns
- State Management
- Error Handling and Recovery
- Performance Monitoring
!pip install langchain-core
!pip install langgraph

import asyncio
import time
from typing import Annotated, Any, TypedDict

from langchain_core.messages import BaseMessage
from langchain_core.tools import tool
from langgraph.graph.message import add_messages

Step 1: State Definition
Define the state structure for managing parallel tool execution.
Why This Matters
State management is crucial because it:
- Maintains consistency across parallel operations
- Tracks execution progress and results
- Enables error recovery and debugging
- Facilitates tool coordination
Debug Tips
- State Initialization:
  - Verify all fields are properly initialized
  - Check type annotations match runtime types
  - Monitor state mutations during execution
- Common Errors:
  - KeyError: Missing state fields
  - TypeError: Incorrect type annotations
  - AttributeError: Accessing undefined state properties
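The initialization checks above can be sketched as a small validation helper. This is a minimal sketch rather than part of the tutorial's code: `REQUIRED_FIELDS` and `validate_state` are hypothetical names, and the field names are assumed to mirror the `State` class defined in this step.

```python
from typing import Any

# Hypothetical table: expected top-level fields and their runtime container types.
REQUIRED_FIELDS: dict[str, type] = {
    "messages": list,
    "pending_tools": list,
    "results": dict,
    "errors": dict,
}


def validate_state(state: dict[str, Any]) -> list[str]:
    """Return human-readable problems; an empty list means the state looks valid."""
    problems = []
    for name, expected_type in REQUIRED_FIELDS.items():
        if name not in state:
            # Would otherwise surface later as a KeyError.
            problems.append(f"missing field: {name}")
        elif not isinstance(state[name], expected_type):
            problems.append(
                f"{name}: expected {expected_type.__name__}, "
                f"got {type(state[name]).__name__}"
            )
    return problems


good = {"messages": [], "pending_tools": [], "results": {}, "errors": {}}
bad = {"messages": [], "pending_tools": {}, "results": {}}

print(validate_state(good))
print(validate_state(bad))
```

Calling a helper like this before execution reports a missing or mistyped field up front instead of failing partway through a run.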
class State(TypedDict):
    """State for parallel tool execution.

    Attributes:
        messages: List of interaction messages
        pending_tools: Tools waiting for execution
        results: Successful tool execution results
        errors: Error messages from failed executions
    """

    messages: Annotated[list[BaseMessage], add_messages]
    pending_tools: list[dict[str, Any]]
    results: dict[str, Any]
    errors: dict[str, str]

Step 2: Tool Implementation
Create async tools with different execution characteristics.
Why This Matters
These tool implementations are useful because they:
- Model realistic API interactions
- Demonstrate varying response times
- Show error handling patterns
- Illustrate async/await usage
Debug Tips
- Tool Execution:
  - Use asyncio.sleep() for testing timing
  - Monitor tool execution duration
  - Check for proper error propagation
- Common Issues:
  - RuntimeWarning ("coroutine ... was never awaited"): Missing await
  - asyncio.TimeoutError: Slow execution
  - ValueError: Invalid tool parameters
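One way to monitor tool execution duration is a small timing wrapper. The sketch below uses only the standard library; `timed` and `fake_lookup` are hypothetical names standing in for a real tool call, and the 0.1 second delay is an arbitrary choice.

```python
import asyncio
import time
from collections.abc import Awaitable, Callable
from typing import Any


async def timed(
    label: str, func: Callable[..., Awaitable[Any]], *args: Any
) -> tuple[Any, float]:
    """Await func(*args) and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = await func(*args)
    elapsed = time.perf_counter() - start
    print(f"{label} took {elapsed:.2f}s")
    return result, elapsed


async def fake_lookup(query: str) -> str:
    # Stand-in for a real tool; the sleep simulates I/O latency.
    await asyncio.sleep(0.1)
    return f"result for: {query}"


result, elapsed = asyncio.run(timed("fake_lookup", fake_lookup, "task 1"))
```

Inside a notebook, where an event loop is already running, `await timed(...)` can be used directly instead of `asyncio.run`.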
@tool
async def fast_tool(query: str) -> str:
    """Simulated fast tool.

    Args:
        query: Input query string

    Returns:
        str: Formatted result string
    """
    await asyncio.sleep(0.1)
    return f"Fast result for: {query}"


@tool
async def slow_tool(query: str) -> str:
    """Simulated slow tool with occasional failures.

    Args:
        query: Input query string

    Returns:
        str: Formatted result string

    Raises:
        ValueError: If query contains 'fail'
    """
    await asyncio.sleep(1.0)
    if "fail" in query:
        raise ValueError("Simulated failure")
    return f"Slow result for: {query}"

Step 3: Tool Execution Handler
Implement the core tool execution logic with error handling.
Why This Matters
Error handling is critical because it:
- Ensures system resilience
- Provides meaningful error messages
- Enables debugging and monitoring
- Maintains system stability
Debug Tips
- Error Handling:
  - Check exception types
  - Verify error message formatting
  - Monitor error propagation
- Common Problems:
  - UnboundLocalError: Tool not found
  - AttributeError: Invalid tool interface
  - TypeError: Incorrect argument types
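To make exception types easier to check, a wrapper can record the type name alongside the message instead of flattening everything into one string. The following is a sketch of that idea under stated assumptions, not the handler this tutorial uses: `safe_call` and `flaky` are hypothetical names, and the dictionary layout is one possible choice.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import Any


async def safe_call(
    call_id: str, func: Callable[..., Awaitable[Any]], *args: Any
) -> dict[str, Any]:
    """Run one coroutine function and report success or failure as a dict."""
    try:
        value = await func(*args)
        return {"id": call_id, "ok": True, "value": value}
    except Exception as exc:
        # Keep the exception type so callers can tell failure kinds apart.
        return {
            "id": call_id,
            "ok": False,
            "error_type": type(exc).__name__,
            "message": str(exc),
        }


async def flaky(query: str) -> str:
    # Stand-in tool that fails on a specific input.
    if "fail" in query:
        raise ValueError("Simulated failure")
    return f"ok: {query}"


async def main() -> list[dict[str, Any]]:
    return await asyncio.gather(
        safe_call("a", flaky, "task 1"),
        safe_call("b", flaky, "fail"),
    )


outcomes = asyncio.run(main())
for outcome in outcomes:
    print(outcome)
```

A structured record like this also avoids misclassifying a tool whose legitimate output happens to start with "Error:".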
async def execute_tool(tool_call: dict[str, Any]) -> tuple[str, Any]:
    """Execute a single tool with error handling.

    Args:
        tool_call: Dictionary containing tool execution details

    Returns:
        Tuple of (tool_id, result/error)
    """
    tools = {"fast_tool": fast_tool, "slow_tool": slow_tool}
    try:
        # Named selected_tool to avoid shadowing the imported @tool decorator.
        selected_tool = tools.get(tool_call["tool_name"])
        if selected_tool is None:
            return tool_call["id"], f"Error: Unknown tool {tool_call['tool_name']}"
        # Pass the whole args dict so tools with several parameters also work.
        result = await selected_tool.ainvoke(tool_call["args"])
        return tool_call["id"], result
    except Exception as e:
        # Failures are reported as strings prefixed with "Error:".
        return tool_call["id"], f"Error: {e!s}"

Step 4: Parallel Execution Implementation
Core parallel execution logic using asyncio.gather.
Why This Matters
Parallel execution is essential because it:
- Maximizes resource utilization
- Reduces overall execution time
- Handles multiple tools efficiently
- Provides performance metrics
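The reduction in overall execution time can be seen by running the same simulated calls sequentially and then concurrently. This is a small sketch with standard-library code only; `simulated_tool` and the delay values are hypothetical stand-ins for real tool calls.

```python
import asyncio
import time


async def simulated_tool(delay: float) -> float:
    # Stand-in for an I/O-bound tool call.
    await asyncio.sleep(delay)
    return delay


async def run_sequential(delays: list[float]) -> float:
    """Await each call in turn; total time is roughly the sum of the delays."""
    start = time.perf_counter()
    for delay in delays:
        await simulated_tool(delay)
    return time.perf_counter() - start


async def run_concurrent(delays: list[float]) -> float:
    """Await all calls together; total time is roughly the longest delay."""
    start = time.perf_counter()
    await asyncio.gather(*(simulated_tool(delay) for delay in delays))
    return time.perf_counter() - start


async def main() -> tuple[float, float]:
    delays = [0.1, 0.2, 0.1]
    sequential = await run_sequential(delays)
    concurrent = await run_concurrent(delays)
    return sequential, concurrent


sequential_time, concurrent_time = asyncio.run(main())
print(f"sequential: {sequential_time:.2f}s, concurrent: {concurrent_time:.2f}s")
```

The gain applies to calls that spend their time waiting (network, disk, sleep); CPU-bound work in a single event loop would not speed up this way.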
Debug Tips
- Execution Issues:
  - Monitor task creation and completion
  - Check for resource constraints
  - Verify proper task cleanup
- Performance Problems:
  - High memory usage
  - Slow task completion
  - Task starvation
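When resource constraints are a concern, one common approach is to cap the number of calls in flight with `asyncio.Semaphore`. The sketch below is illustrative only: `MAX_CONCURRENCY`, `limited_call`, and the `tracker` dictionary are hypothetical names, and the limit of 2 is an arbitrary assumption.

```python
import asyncio

MAX_CONCURRENCY = 2  # assumed limit; tune it to the real resource being protected


async def limited_call(
    semaphore: asyncio.Semaphore, tracker: dict[str, int], name: str
) -> str:
    """Run one simulated call while holding a semaphore slot."""
    async with semaphore:
        # Track how many calls are running at once, for monitoring.
        tracker["active"] += 1
        tracker["peak"] = max(tracker["peak"], tracker["active"])
        await asyncio.sleep(0.05)  # simulated work
        tracker["active"] -= 1
        return f"done: {name}"


async def main() -> tuple[list[str], int]:
    # Create the semaphore inside the running event loop.
    semaphore = asyncio.Semaphore(MAX_CONCURRENCY)
    tracker = {"active": 0, "peak": 0}
    results = await asyncio.gather(
        *(limited_call(semaphore, tracker, f"task {i}") for i in range(5))
    )
    return results, tracker["peak"]


results, peak = asyncio.run(main())
print(results)
print(f"peak concurrency: {peak}")
```

All five calls are still scheduled at once, but at most `MAX_CONCURRENCY` of them do work at any moment.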
async def parallel_executor(state: State) -> State:
    """Execute multiple tools in parallel.

    Args:
        state: Current state with pending tools

    Returns:
        Updated state with results and errors
    """
    if not state["pending_tools"]:
        return state

    tasks = [execute_tool(tool_call) for tool_call in state["pending_tools"]]

    # perf_counter is a monotonic clock, better suited to measuring durations.
    start_time = time.perf_counter()
    # gather runs the coroutines concurrently and returns results in input order.
    results = await asyncio.gather(*tasks)
    execution_time = time.perf_counter() - start_time

    new_results = {}
    new_errors = {}
    for tool_id, result in results:
        # execute_tool reports failures as strings prefixed with "Error:".
        if isinstance(result, str) and result.startswith("Error:"):
            new_errors[tool_id] = result
        else:
            new_results[tool_id] = result

    print(f"Executed {len(tasks)} tools in {execution_time:.2f} seconds")

    return {
        "messages": state["messages"],
        "pending_tools": [],
        "results": new_results,
        "errors": new_errors,
    }

Step 5: Demonstration Implementation
Example usage showing practical application.
Why This Matters
Demonstration code is valuable because it:
- Shows real-world usage patterns
- Illustrates proper initialization
- Demonstrates result handling
- Provides execution examples
Debug Tips
- Setup Issues:
  - Verify environment configuration
  - Check import dependencies
  - Monitor resource availability
- Runtime Problems:
  - Invalid state initialization
  - Incorrect tool configuration
  - Resource cleanup issues
async def demonstrate_parallel_execution():
    """Demonstrate parallel tool execution."""
    # Initial state with multiple tool calls
    state = {
        "messages": [],
        "pending_tools": [
            {"id": "fast_1", "tool_name": "fast_tool", "args": {"query": "task 1"}},
            {"id": "slow_1", "tool_name": "slow_tool", "args": {"query": "task 2"}},
            {"id": "fast_2", "tool_name": "fast_tool", "args": {"query": "task 3"}},
            {"id": "error_1", "tool_name": "slow_tool", "args": {"query": "fail"}},
        ],
        "results": {},
        "errors": {},
    }

    print("\nParallel Execution Demo")
    print("=" * 50)

    result = await parallel_executor(state)

    print("\nSuccessful Results:")
    for tool_id, tool_result in result["results"].items():
        print(f"{tool_id}: {tool_result}")

    print("\nErrors:")
    for tool_id, error in result["errors"].items():
        print(f"{tool_id}: {error}")

Common Pitfalls
- Not handling tool timeouts
  - Tools may hang indefinitely
  - System resources can be exhausted
  - User experience degrades
- Improper error handling
  - Unhandled exceptions crash the system
  - Missing error context
  - Silent failures
- Resource management
  - Too many concurrent tasks
  - Memory leaks
  - Connection pool exhaustion
- State management issues
  - Inconsistent state updates
  - Missing state validation
  - Race conditions
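The first pitfall, a tool that hangs indefinitely, can be guarded against with `asyncio.wait_for`. The following is a minimal sketch under stated assumptions: `hanging_tool`, `quick_tool`, and `call_with_timeout` are hypothetical names, the timeout values are arbitrary, and the "Error:" prefix simply follows the convention used by this tutorial's handler.

```python
import asyncio
from collections.abc import Awaitable
from typing import Any


async def hanging_tool(query: str) -> str:
    # Stand-in for a tool that does not return in reasonable time.
    await asyncio.sleep(10)
    return f"late result for: {query}"


async def quick_tool(query: str) -> str:
    await asyncio.sleep(0.01)
    return f"quick result for: {query}"


async def call_with_timeout(awaitable: Awaitable[Any], timeout_s: float) -> Any:
    """Await the call, but cancel it and report an error after timeout_s seconds."""
    try:
        return await asyncio.wait_for(awaitable, timeout=timeout_s)
    except asyncio.TimeoutError:
        return f"Error: timed out after {timeout_s}s"


async def main() -> list[Any]:
    return await asyncio.gather(
        call_with_timeout(quick_tool("task 1"), 0.5),
        call_with_timeout(hanging_tool("task 2"), 0.1),
    )


timeout_results = asyncio.run(main())
print(timeout_results)
```

Because `wait_for` cancels the timed-out call, the slow tool does not keep running in the background after the error is reported.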
Key Takeaways
- Parallel execution significantly improves performance
  - Multiple tools run concurrently
  - Resource utilization is optimized
  - System throughput increases
- Proper error handling is crucial
  - All exceptions are caught and handled
  - Errors are properly categorized
  - System remains stable
- State management requires careful design
  - State structure is well-defined
  - Updates are atomic
  - Recovery is possible
Next Steps
- Add timeout handling
  - Implement per-tool timeouts
  - Add global execution timeouts
  - Create timeout recovery strategies
- Implement batch processing
  - Add batch size limits
  - Create batch prioritization
  - Optimize batch execution
- Enhance monitoring
  - Add detailed logging
  - Implement metrics collection
  - Create performance dashboards
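As a starting point for the batch processing step, pending work can be split into fixed-size batches that each run concurrently. This is a rough sketch only: `process_item`, `run_in_batches`, and the batch size of 3 are hypothetical, and prioritization is left out.

```python
import asyncio


async def process_item(item: int) -> int:
    # Stand-in for a real tool call.
    await asyncio.sleep(0.01)
    return item * 2


async def run_in_batches(items: list[int], batch_size: int) -> list[int]:
    """Process items batch by batch; items within a batch run concurrently."""
    results: list[int] = []
    for start in range(0, len(items), batch_size):
        batch = items[start:start + batch_size]
        # Each batch finishes completely before the next one starts.
        results.extend(await asyncio.gather(*(process_item(i) for i in batch)))
    return results


doubled = asyncio.run(run_in_batches(list(range(7)), batch_size=3))
print(doubled)  # [0, 2, 4, 6, 8, 10, 12]
```

A batch only completes when its slowest item does, so one slow call can hold up the rest of the queue; that trade-off is worth measuring before settling on a batch size.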
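Expected Output
Timing varies slightly between runs. Results are listed in submission order because asyncio.gather preserves the order of its inputs.

Parallel Execution Demo
==================================================
Executed 4 tools in 1.02 seconds

Successful Results:
fast_1: Fast result for: task 1
slow_1: Slow result for: task 2
fast_2: Fast result for: task 3

Errors:
error_1: Error: Simulated failure

if __name__ == "__main__":
    # nest_asyncio is a separate package (pip install nest_asyncio); it lets
    # asyncio.run work inside a notebook's already-running event loop.
    import nest_asyncio

    nest_asyncio.apply()
    asyncio.run(demonstrate_parallel_execution())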

