LangGraph Tutorial: Asynchronous Tool Execution - Unit 2.3 Exercise 3
Joint Initiative
This tutorial is part of a collaboration between AI Product Engineer and Nebius Academy.
What You'll Learn Today
This tutorial demonstrates how to implement asynchronous tool execution in LangGraph, focusing on parallel execution, error handling, and timeout management.
Key Concepts Covered
- Async Tool Implementation
- Parallel Execution
- Error Handling
- Timeout Management
!pip install langchain-core numexpr nest_asyncio

import asyncio
import random
import time
from typing import Any

import nest_asyncio  # Required when running inside notebooks such as Google Colab
import numexpr
from langchain_core.tools import tool
Step 1: Tool Implementation
Create tools with proper error handling and timing simulation.
Why This Matters
Tool implementation is crucial because it:
- Defines core functionality
- Handles errors properly
- Simulates real-world delays
Debug Tips
- Tool Implementation Issues:
- Monitor execution times
- Check error handling
- Verify return types
@tool
def weather_tool(location: str) -> str:
"""Simulated weather tool with random delays.
Notes:
- Simulates API latency
- Includes error handling
- Returns consistent format
Args:
location: Target city name
Returns:
Formatted weather information
"""
try:
# Simulate API latency
time.sleep(random.uniform(0.1, 0.5))
return f"Weather in {location}: 22ยฐC, Sunny"
except Exception as e:
return f"Weather API error: {e!s}"
@tool
def calculator_tool(expression: str) -> str:
"""Safe calculator with proper error handling.
Notes:
- Uses numexpr for safety
- Handles invalid input
- Returns formatted result
Args:
expression: Math expression to evaluate
Returns:
Formatted calculation result
"""
try:
result = numexpr.evaluate(expression.strip()).item()
return f"{expression} = {result}"
except Exception as e:
return f"Calculation error: {e!s}"
Step 2: Async Execution Implementation
Implement async tool execution with timeout handling.
Why This Matters
Async execution is essential because it:
- Enables parallel operations
- Manages timeouts
- Handles errors gracefully
Debug Tips
- Async Issues:
- Check timeout handling
- Monitor task cancellation
- Verify error propagation
async def execute_tool(
tool_call: dict[str, Any], timeout: float = 5.0
) -> tuple[str, Any]:
"""Execute single tool asynchronously with timeout.
Notes:
- Handles tool lookup
- Manages timeouts
- Provides error handling
Args:
tool_call: Tool execution details
timeout: Maximum execution time
Returns:
Tool ID and result tuple
"""
tools = {"calculator": calculator_tool, "weather": weather_tool}
try:
tool = tools.get(tool_call["tool_name"])
if not tool:
return tool_call["id"], f"Error: Tool '{tool_call['tool_name']}' not found"
# Extract first argument value
arg_value = next(iter(tool_call["args"].values()))
# Execute with timeout
result = await asyncio.wait_for(
asyncio.to_thread(tool.invoke, arg_value), timeout=timeout
)
return tool_call["id"], result
    except asyncio.TimeoutError:  # alias of the builtin TimeoutError on Python 3.11+
return tool_call["id"], f"Error: Tool execution timed out after {timeout}s"
except Exception as e:
return tool_call["id"], f"Error executing tool: {e!s}"
Step 3: Parallel Execution Implementation
Implement parallel tool execution manager.
Why This Matters
Parallel execution is crucial because it:
- Improves performance
- Manages multiple tools
- Handles concurrent errors
Debug Tips
- Parallel Issues:
- Monitor task coordination
- Check result aggregation
- Verify error handling
async def execute_multiple_tools(tool_calls: list[dict[str, Any]]) -> dict[str, Any]:
"""Execute multiple tools in parallel.
Notes:
- Creates parallel tasks
- Aggregates results
- Handles errors
Args:
tool_calls: List of tools to execute
Returns:
Mapping of tool IDs to results
"""
    tasks = [execute_tool(tool_call) for tool_call in tool_calls]
    # return_exceptions=True keeps one failing task from cancelling the rest;
    # execute_tool already converts its own failures into error strings, so
    # every item below is a (tool_id, result) tuple.
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return {tool_id: result for tool_id, result in results}
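As a rough, optional check that the gather-based approach actually runs tools concurrently, the sketch below times a sequential loop against execute_multiple_tools. It assumes the tools and functions defined above; the exact numbers will vary with the simulated random delays:

async def compare_sequential_vs_parallel() -> None:
    """Illustrative timing comparison between sequential and parallel execution."""
    calls = [
        {"id": f"weather_{i}", "tool_name": "weather", "args": {"location": city}}
        for i, city in enumerate(["London", "Paris", "Tokyo"])
    ]

    start = time.perf_counter()
    for call in calls:  # one call at a time
        await execute_tool(call)
    sequential = time.perf_counter() - start

    start = time.perf_counter()
    await execute_multiple_tools(calls)  # all calls at once via asyncio.gather
    parallel = time.perf_counter() - start

    # With 0.1-0.5s of simulated latency per call, the sequential time is
    # roughly the sum of the delays, the parallel time roughly the slowest one.
    print(f"Sequential: {sequential:.2f}s, Parallel: {parallel:.2f}s")


# asyncio.run(compare_sequential_vs_parallel())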
Step 4: System Demonstration
Demonstrate the async execution system.
Why This Matters
The demonstration verifies:
- Proper execution
- Error handling
- Timeout management
Debug Tips
- Demo Issues:
- Check all scenarios
- Verify timeout behavior
- Monitor error cases
async def demonstration():
"""Run system demonstration with various cases."""
print("Async Tool Execution Demo")
print("=" * 50)
tool_calls = [
{"id": "calc_1", "tool_name": "calculator", "args": {"expression": "2 + 2"}},
{"id": "weather_1", "tool_name": "weather", "args": {"location": "London"}},
{"id": "calc_2", "tool_name": "calculator", "args": {"expression": "invalid!"}},
{"id": "unknown_1", "tool_name": "unknown_tool", "args": {}},
]
print("\nExecuting single tool:")
tool_id, result = await execute_tool(tool_calls[0])
print(f"Tool {tool_id}: {result}")
print("\nExecuting multiple tools in parallel:")
results = await execute_multiple_tools(tool_calls)
for tool_id, result in results.items():
print(f"Tool {tool_id}: {result}")
async def run_demonstration():
    """Safe demonstration runner for all environments."""
    await demonstration()


def main():
    """Main entry point with environment handling."""
    try:
        # Plain scripts: no event loop is running yet, so asyncio.run works.
        asyncio.run(demonstration())
    except RuntimeError:
        # Notebooks: an event loop is already running. nest_asyncio lets us
        # re-enter it so run_until_complete does not raise.
        nest_asyncio.apply()
        loop = asyncio.get_event_loop()
        loop.run_until_complete(run_demonstration())
Common Pitfalls
- Not handling timeouts properly
- Missing error cases
- Improper task cancellation (see the sketch after this list)
- Poor result aggregation
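One way to address the cancellation pitfall is to give a whole batch a hard deadline and explicitly cancel whatever has not finished. This is a sketch only; execute_with_deadline and the 2.0s deadline are illustrative names and values, not part of the tutorial's core code:

async def execute_with_deadline(
    tool_calls: list[dict[str, Any]], deadline: float = 2.0
) -> dict[str, Any]:
    """Run a batch of tool calls with a shared deadline, cancelling stragglers."""
    tasks = [asyncio.create_task(execute_tool(call)) for call in tool_calls]
    done, pending = await asyncio.wait(tasks, timeout=deadline)

    for task in pending:
        task.cancel()  # cancel instead of silently abandoning unfinished work

    results: dict[str, Any] = {}
    for task in done:
        tool_id, result = task.result()  # execute_tool never raises, so this is safe
        results[tool_id] = result
    for call, task in zip(tool_calls, tasks):
        if task in pending:
            results[call["id"]] = f"Error: cancelled after {deadline}s deadline"
    return results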
Key Takeaways
- Async execution enables parallelism
- Proper error handling is crucial
- Timeout management prevents hangs
- Task coordination requires care
Next Steps
- Add retry mechanisms (see the sketch after this list)
- Implement rate limiting (also covered in the sketch below)
- Add result caching
- Enhance error recovery
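As a starting point for the first two items, here is a combined retry and rate-limiting sketch. The function names, the linear backoff, the semaphore size, and the "error substring" check are all illustrative choices layered on top of execute_tool, not an official LangGraph API:

async def execute_tool_with_retry(
    tool_call: dict[str, Any],
    limiter: asyncio.Semaphore,
    max_attempts: int = 3,
) -> tuple[str, Any]:
    """Retry a tool call with linear backoff, bounded by a shared semaphore."""
    tool_id, result = tool_call["id"], "Error: not executed"
    for attempt in range(1, max_attempts + 1):
        async with limiter:  # rate limiting via bounded concurrency
            tool_id, result = await execute_tool(tool_call)
        if "error" not in str(result).lower():
            break  # success, stop retrying
        # A fuller implementation would retry only transient failures
        # (timeouts, network errors), not bad input or unknown tools.
        await asyncio.sleep(0.5 * attempt)  # simple linear backoff
    return tool_id, result


async def retry_demo() -> None:
    limiter = asyncio.Semaphore(5)  # at most 5 tool calls in flight at once
    calls = [
        {"id": "calc_1", "tool_name": "calculator", "args": {"expression": "2 ** 8"}},
        {"id": "weather_1", "tool_name": "weather", "args": {"location": "Berlin"}},
    ]
    results = await asyncio.gather(
        *(execute_tool_with_retry(call, limiter) for call in calls)
    )
    print(dict(results))


# asyncio.run(retry_demo())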
Expected Output
Async Tool Execution Demo
==================================================

Executing single tool:
Tool calc_1: 2 + 2 = 4

Executing multiple tools in parallel:
Tool calc_1: 2 + 2 = 4
Tool weather_1: Weather in London: 22°C, Sunny
Tool calc_2: Calculation error: invalid syntax
Tool unknown_1: Error: Tool 'unknown_tool' not found
if __name__ == "__main__":
    nest_asyncio.apply()  # Harmless in plain scripts; required inside notebooks
    asyncio.run(demonstration())