Tutorial Image: LangGraph Tutorial: Asynchronous Tool Execution - Unit 2.3 Exercise 3

LangGraph Tutorial: Asynchronous Tool Execution - Unit 2.3 Exercise 3

Learn how to implement asynchronous tool execution in LangGraph, including parallel processing, timeout management, and error handling, for efficient and responsive conversational systems.

Rod Rivera

๐Ÿ‡ฌ๐Ÿ‡ง Chapter

๐ŸŽฏ What You'll Learn Today

LangGraph Tutorial: Asynchronous Tool Execution - Unit 2.3 Exercise 3

Try It Yourself

Open In ColabDownload Notebook

๐Ÿ“ข Joint Initiative

This tutorial is part of a collaboration between AI Product Engineer and Nebius Academy.

This tutorial demonstrates how to implement asynchronous tool execution in LangGraph, focusing on parallel execution, error handling, and timeout management.

Key Concepts Covered

  1. Async Tool Implementation
  2. Parallel Execution
  3. Error Handling
  4. Timeout Management
import asyncio
import random
import time
from typing import Any
import numexpr
!pip install langchain-core
import nest_asyncio # Uncomment if you are using Collab Notebooks

from langchain_core.tools import tool

Step 1: Tool Implementation

Create tools with proper error handling and timing simulation.

Why This Matters

Tool implementation is crucial because

  1. Defines core functionality
  2. Handles errors properly
  3. Simulates real-world delays

Debug Tips

  1. Tool Implementation Issues:
  • Monitor execution times
  • Check error handling
  • Verify return types
@tool
def weather_tool(location: str) -> str:
    """Simulated weather tool with random delays.

    Notes:
        - Simulates API latency
        - Includes error handling
        - Returns consistent format

    Args:
        location: Target city name

    Returns:
        Formatted weather information
    """
    try:
        # Simulate API latency
        time.sleep(random.uniform(0.1, 0.5))
        return f"Weather in {location}: 22ยฐC, Sunny"
    except Exception as e:
        return f"Weather API error: {e!s}"
@tool
def calculator_tool(expression: str) -> str:
    """Safe calculator with proper error handling.

    Notes:
        - Uses numexpr for safety
        - Handles invalid input
        - Returns formatted result

    Args:
        expression: Math expression to evaluate

    Returns:
        Formatted calculation result
    """
    try:
        result = numexpr.evaluate(expression.strip()).item()
        return f"{expression} = {result}"
    except Exception as e:
        return f"Calculation error: {e!s}"

Step 2: Async Execution Implementation

Implement async tool execution with timeout handling.

Why This Matters

Async execution is essential because

  1. Enables parallel operations
  2. Manages timeouts
  3. Handles errors gracefully

Debug Tips

  1. Async Issues:
  • Check timeout handling
  • Monitor task cancellation
  • Verify error propagation
async def execute_tool(
    tool_call: dict[str, Any], timeout: float = 5.0
) -> tuple[str, Any]:
    """Execute single tool asynchronously with timeout.

    Notes:
        - Handles tool lookup
        - Manages timeouts
        - Provides error handling

    Args:
        tool_call: Tool execution details
        timeout: Maximum execution time

    Returns:
        Tool ID and result tuple
    """
    tools = {"calculator": calculator_tool, "weather": weather_tool}

    try:
        tool = tools.get(tool_call["tool_name"])
        if not tool:
            return tool_call["id"], f"Error: Tool '{tool_call['tool_name']}' not found"

        # Extract first argument value
        arg_value = next(iter(tool_call["args"].values()))

        # Execute with timeout
        result = await asyncio.wait_for(
            asyncio.to_thread(tool.invoke, arg_value), timeout=timeout
        )

        return tool_call["id"], result

    except TimeoutError:
        return tool_call["id"], f"Error: Tool execution timed out after {timeout}s"
    except Exception as e:
        return tool_call["id"], f"Error executing tool: {e!s}"

Step 3: Parallel Execution Implementation

Implement parallel tool execution manager.

Why This Matters

Parallel execution is crucial because

  1. Improves performance
  2. Manages multiple tools
  3. Handles concurrent errors

Debug Tips

  1. Parallel Issues:
  • Monitor task coordination
  • Check result aggregation
  • Verify error handling
async def execute_multiple_tools(tool_calls: list[dict[str, Any]]) -> dict[str, Any]:
    """Execute multiple tools in parallel.

    Notes:
        - Creates parallel tasks
        - Aggregates results
        - Handles errors

    Args:
        tool_calls: List of tools to execute

    Returns:
        Mapping of tool IDs to results
    """
    tasks = [execute_tool(tool_call) for tool_call in tool_calls]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return {tool_id: result for tool_id, result in results}

Step 4: System Demonstration

Demonstrate the async execution system.

Why This Matters

Demonstration verifies

  1. Proper execution
  2. Error handling
  3. Timeout management

Debug Tips

  1. Demo Issues:
  • Check all scenarios
  • Verify timeout behavior
  • Monitor error cases
async def demonstration():
    """Run system demonstration with various cases."""
    print("Async Tool Execution Demo")
    print("=" * 50)

    tool_calls = [
        {"id": "calc_1", "tool_name": "calculator", "args": {"expression": "2 + 2"}},
        {"id": "weather_1", "tool_name": "weather", "args": {"location": "London"}},
        {"id": "calc_2", "tool_name": "calculator", "args": {"expression": "invalid!"}},
        {"id": "unknown_1", "tool_name": "unknown_tool", "args": {}},
    ]

    print("\nExecuting single tool:")
    tool_id, result = await execute_tool(tool_calls[0])
    print(f"Tool {tool_id}: {result}")

    print("\nExecuting multiple tools in parallel:")
    results = await execute_multiple_tools(tool_calls)
    for tool_id, result in results.items():
        print(f"Tool {tool_id}: {result}")
async def run_demonstration():
    """Safe demonstration runner for all environments."""
    await demonstration()
def main():
    """Main entry point with environment handling."""
    try:
        asyncio.run(demonstration())
    except RuntimeError:
        loop = asyncio.get_event_loop()
        loop.run_until_complete(run_demonstration())

Common Pitfalls

  1. Not handling timeouts properly
  2. Missing error cases
  3. Improper task cancellation
  4. Poor result aggregation

Key Takeaways

  1. Async execution enables parallelism
  2. Proper error handling is crucial
  3. Timeout management prevents hangs
  4. Task coordination requires care

Next Steps

  1. Add retry mechanisms
  2. Implement rate limiting
  3. Add result caching
  4. Enhance error recovery

Expected Output

Async Tool Execution Demo

Executing single tool

Tool calc_1: 2 + 2 = 4

Executing multiple tools in parallel

Tool calc_1: 2 + 2 = 4 Tool weather_1: Weather in London: 22ยฐC, Sunny Tool calc_2: Calculation error: invalid syntax Tool unknown_1: Error: Tool 'unknown_tool' not found

if __name__ == "__main__":
    nest_asyncio.apply()
    asyncio.run(demonstration())