🎯 What You'll Learn Today
LangGraph Tutorial: Asynchronous Tool Execution - Unit 2.3 Exercise 3
📢 Joint Initiative
This tutorial is part of a collaboration between AIPE and Nebius Academy.
This tutorial demonstrates how to implement asynchronous tool execution in LangGraph, focusing on parallel execution, error handling, and timeout management.
Key Concepts Covered
- Async Tool Implementation
- Parallel Execution
- Error Handling
- Timeout Management
!pip install langchain-core numexpr nest_asyncio

import asyncio
import random
import time
from typing import Any

import nest_asyncio  # Needed in notebooks (Colab, Jupyter), where an event loop is already running
import numexpr
from langchain_core.tools import tool
Step 1: Tool Implementation
Create tools with proper error handling and timing simulation.
Why This Matters
Tool implementation is crucial because it:
- Defines core functionality
- Handles errors properly
- Simulates real-world delays
Debug Tips
- Tool Implementation Issues:
  - Monitor execution times
  - Check error handling
  - Verify return types
@tool
def weather_tool(location: str) -> str:
    """Simulated weather tool with random delays.

    Notes:
        - Simulates API latency
        - Includes error handling
        - Returns consistent format

    Args:
        location: Target city name

    Returns:
        Formatted weather information
    """
    try:
        # Simulate API latency with a blocking sleep of 0.1 to 0.5 seconds
        time.sleep(random.uniform(0.1, 0.5))
        return f"Weather in {location}: 22°C, Sunny"
    except Exception as e:
        return f"Weather API error: {e!s}"


@tool
def calculator_tool(expression: str) -> str:
    """Safe calculator with proper error handling.

    Notes:
        - Uses numexpr for safety
        - Handles invalid input
        - Returns formatted result

    Args:
        expression: Math expression to evaluate

    Returns:
        Formatted calculation result
    """
    try:
        # .item() converts the NumPy result into a plain Python scalar
        result = numexpr.evaluate(expression.strip()).item()
        return f"{expression} = {result}"
    except Exception as e:
        return f"Calculation error: {e!s}"
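The debug tips for this step (monitor execution times, verify return types) can be checked with a small timing helper. The sketch below is only an illustration: `fake_weather` is a hypothetical stand-in for `weather_tool` without the `@tool` decorator, and `timed_call` is a helper name introduced here, not part of the tutorial's code.

```python
import random
import time
from typing import Callable


def fake_weather(location: str) -> str:
    """Hypothetical stand-in for weather_tool (no @tool decorator)."""
    time.sleep(random.uniform(0.01, 0.05))  # shortened delay for quick checks
    return f"Weather in {location}: 22°C, Sunny"


def timed_call(func: Callable[[str], str], arg: str) -> tuple[str, float]:
    """Call func(arg) and return its result plus elapsed wall-clock seconds."""
    start = time.perf_counter()
    result = func(arg)
    elapsed = time.perf_counter() - start
    return result, elapsed


if __name__ == "__main__":
    result, elapsed = timed_call(fake_weather, "London")
    # Verify the return type before relying on string formatting downstream
    assert isinstance(result, str)
    print(f"{result!r} took {elapsed:.3f}s")
```

The same helper could wrap a decorated tool by passing its `invoke` method as `func`, assuming the tool accepts a single string input.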
Step 2: Async Execution Implementation
Implement async tool execution with timeout handling.
Why This Matters
Async execution is essential because it:
- Enables parallel operations
- Manages timeouts
- Handles errors gracefully
Debug Tips
- Async Issues:
  - Check timeout handling
  - Monitor task cancellation
  - Verify error propagation
async def execute_tool(
    tool_call: dict[str, Any], timeout: float = 5.0
) -> tuple[str, Any]:
    """Execute single tool asynchronously with timeout.

    Notes:
        - Handles tool lookup
        - Manages timeouts
        - Provides error handling

    Args:
        tool_call: Tool execution details
        timeout: Maximum execution time in seconds

    Returns:
        Tool ID and result tuple
    """
    tools = {"calculator": calculator_tool, "weather": weather_tool}
    try:
        selected_tool = tools.get(tool_call["tool_name"])
        if not selected_tool:
            return tool_call["id"], f"Error: Tool '{tool_call['tool_name']}' not found"
        # Run the blocking invoke() in a worker thread and bound the wait.
        # Passing the whole args dict also works for multi-argument tools.
        result = await asyncio.wait_for(
            asyncio.to_thread(selected_tool.invoke, tool_call["args"]),
            timeout=timeout,
        )
        return tool_call["id"], result
    except asyncio.TimeoutError:
        # Before Python 3.11, asyncio.TimeoutError is not the built-in TimeoutError
        return tool_call["id"], f"Error: Tool execution timed out after {timeout}s"
    except Exception as e:
        return tool_call["id"], f"Error executing tool: {e!s}"
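To check timeout handling in isolation, the following minimal sketch combines `asyncio.wait_for` with `asyncio.to_thread` using only the standard library. `slow_job` and `run_with_timeout` are hypothetical names used for illustration. Note that a timeout only abandons the wait: the worker thread keeps running until the blocking function returns.

```python
import asyncio
import time


def slow_job(seconds: float) -> str:
    """Hypothetical blocking function standing in for a slow tool."""
    time.sleep(seconds)
    return "done"


async def run_with_timeout(seconds: float, timeout: float) -> str:
    """Run slow_job in a worker thread, giving up after `timeout` seconds."""
    try:
        return await asyncio.wait_for(
            asyncio.to_thread(slow_job, seconds), timeout=timeout
        )
    except asyncio.TimeoutError:
        # The await is abandoned, but the thread itself cannot be cancelled
        # and continues until slow_job returns.
        return f"timed out after {timeout}s"


if __name__ == "__main__":
    print(asyncio.run(run_with_timeout(0.05, timeout=1.0)))  # done
    print(asyncio.run(run_with_timeout(0.5, timeout=0.1)))   # timed out after 0.1s
```

Because the thread is not interrupted, a tool that can hang indefinitely would still occupy a worker; a real implementation may need a timeout inside the tool itself.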
Step 3: Parallel Execution Implementation
Implement parallel tool execution manager.
Why This Matters
Parallel execution is crucial because it:
- Improves performance
- Manages multiple tools
- Handles concurrent errors
Debug Tips
- Parallel Issues:
  - Monitor task coordination
  - Check result aggregation
  - Verify error handling
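async def execute_multiple_tools(tool_calls: list[dict[str, Any]]) -> dict[str, Any]:
    """Execute multiple tools in parallel.

    Notes:
        - Creates parallel tasks
        - Aggregates results
        - Handles errors

    Args:
        tool_calls: List of tools to execute

    Returns:
        Mapping of tool IDs to results
    """
    tasks = [execute_tool(tool_call) for tool_call in tool_calls]
    outcomes = await asyncio.gather(*tasks, return_exceptions=True)

    results: dict[str, Any] = {}
    # gather() preserves input order, so each outcome lines up with its tool call
    for tool_call, outcome in zip(tool_calls, outcomes):
        if isinstance(outcome, BaseException):
            # With return_exceptions=True an exception object may appear here
            # instead of an (id, result) tuple, so it cannot be unpacked.
            results[tool_call["id"]] = f"Error executing tool: {outcome!s}"
        else:
            tool_id, result = outcome
            results[tool_id] = result
    return results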
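The aggregation step can be exercised without any tools installed. In this sketch, `fake_tool` is a hypothetical coroutine standing in for `execute_tool`; unlike `execute_tool`, it lets one exception escape so that the `return_exceptions=True` path is actually visible. The elapsed time illustrates that the three 0.2-second calls overlap rather than run back to back.

```python
import asyncio
import time


async def fake_tool(tool_id: str, delay: float, fail: bool = False) -> tuple[str, str]:
    """Hypothetical coroutine standing in for execute_tool."""
    await asyncio.sleep(delay)
    if fail:
        raise ValueError(f"{tool_id} failed")
    return tool_id, "ok"


async def run_all() -> tuple[dict[str, str], float]:
    """Run three fake tools concurrently; return results and elapsed seconds."""
    specs = [("a", 0.2, False), ("b", 0.2, True), ("c", 0.2, False)]
    start = time.perf_counter()
    outcomes = await asyncio.gather(
        *(fake_tool(*spec) for spec in specs), return_exceptions=True
    )
    elapsed = time.perf_counter() - start

    results: dict[str, str] = {}
    # gather() preserves input order, so zip pairs each outcome with its spec
    for (tool_id, _, _), outcome in zip(specs, outcomes):
        if isinstance(outcome, BaseException):
            results[tool_id] = f"Error: {outcome}"
        else:
            results[tool_id] = outcome[1]
    return results, elapsed


if __name__ == "__main__":
    results, elapsed = asyncio.run(run_all())
    print(results)
    print(f"elapsed: {elapsed:.2f}s (sequential would take about 0.6s)")
```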
Step 4: System Demonstration
Demonstrate the async execution system.
Why This Matters
The demonstration verifies:
- Proper execution
- Error handling
- Timeout management
Debug Tips
- Demo Issues:
  - Check all scenarios
  - Verify timeout behavior
  - Monitor error cases
async def demonstration():
    """Run system demonstration with various cases."""
    print("Async Tool Execution Demo")
    print("=" * 50)
    tool_calls = [
        {"id": "calc_1", "tool_name": "calculator", "args": {"expression": "2 + 2"}},
        {"id": "weather_1", "tool_name": "weather", "args": {"location": "London"}},
        {"id": "calc_2", "tool_name": "calculator", "args": {"expression": "invalid!"}},
        {"id": "unknown_1", "tool_name": "unknown_tool", "args": {}},
    ]
    print("\nExecuting single tool:")
    tool_id, result = await execute_tool(tool_calls[0])
    print(f"Tool {tool_id}: {result}")
    print("\nExecuting multiple tools in parallel:")
    results = await execute_multiple_tools(tool_calls)
    for tool_id, result in results.items():
        print(f"Tool {tool_id}: {result}")


async def run_demonstration():
    """Safe demonstration runner for all environments."""
    await demonstration()


def main():
    """Main entry point with environment handling."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No event loop is running (plain script): asyncio.run() works as-is
        asyncio.run(run_demonstration())
    else:
        # A loop is already running (e.g. in a notebook): patch asyncio so it can be re-entered
        nest_asyncio.apply()
        asyncio.run(run_demonstration())
Common Pitfalls
- Not handling timeouts properly
- Missing error cases
- Improper task cancellation
- Poor result aggregation
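The "improper task cancellation" pitfall usually means cancelling a task without awaiting it, or swallowing `CancelledError` inside the task. A minimal sketch of one reasonable pattern follows; `long_running` and `cancel_demo` are hypothetical names, and the `log` list exists only to make the order of events observable.

```python
import asyncio


async def long_running(log: list[str]) -> str:
    """Hypothetical long task that records its cleanup when cancelled."""
    try:
        await asyncio.sleep(10)
        return "finished"
    except asyncio.CancelledError:
        log.append("cleanup ran")
        raise  # re-raise so the task is actually marked as cancelled


async def cancel_demo() -> list[str]:
    """Start a task, cancel it, and wait for the cancellation to complete."""
    log: list[str] = []
    task = asyncio.create_task(long_running(log))
    await asyncio.sleep(0.01)  # give the task a chance to start
    task.cancel()
    try:
        await task  # without this await, cleanup may not have run yet
    except asyncio.CancelledError:
        log.append("task cancelled")
    return log


if __name__ == "__main__":
    print(asyncio.run(cancel_demo()))  # ['cleanup ran', 'task cancelled']
```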
Key Takeaways
- Async execution enables parallelism
- Proper error handling is crucial
- Timeout management prevents hangs
- Task coordination requires care
Next Steps
- Add retry mechanisms
- Implement rate limiting
- Add result caching
- Enhance error recovery
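As a starting point for the first next step, a retry mechanism could look like the sketch below. It is one possible design under stated assumptions, not part of the tutorial's code: `with_retry` and `make_flaky` are hypothetical helpers, the backoff doubles on each attempt, and every exception is treated as retryable, which a real system would likely narrow.

```python
import asyncio
from typing import Awaitable, Callable


async def with_retry(
    call: Callable[[], Awaitable[str]],
    attempts: int = 3,
    base_delay: float = 0.01,
) -> str:
    """Retry an async call with exponential backoff; re-raise the last error."""
    for attempt in range(attempts):
        try:
            return await call()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the final error
            await asyncio.sleep(base_delay * 2**attempt)
    raise ValueError("attempts must be at least 1")


def make_flaky(failures: int) -> Callable[[], Awaitable[str]]:
    """Hypothetical tool that fails `failures` times, then succeeds."""
    state = {"calls": 0}

    async def flaky() -> str:
        state["calls"] += 1
        if state["calls"] <= failures:
            raise ConnectionError("transient failure")
        return f"succeeded on call {state['calls']}"

    return flaky


if __name__ == "__main__":
    print(asyncio.run(with_retry(make_flaky(2))))  # succeeded on call 3
```

With three attempts, a tool that fails twice succeeds on the third call, while one that fails three times raises its last `ConnectionError` to the caller.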
Expected Output
Async Tool Execution Demo
==================================================

Executing single tool:
Tool calc_1: 2 + 2 = 4

Executing multiple tools in parallel:
Tool calc_1: 2 + 2 = 4
Tool weather_1: Weather in London: 22°C, Sunny
Tool calc_2: Calculation error: invalid syntax
Tool unknown_1: Error: Tool 'unknown_tool' not found

The exact wording of the calculation error may vary with the numexpr version.
if __name__ == "__main__":
    nest_asyncio.apply()
    asyncio.run(demonstration())