
LangGraph Tutorial: Complete System Integration - Unit 2.3 Exercise 10

Master LangGraph with a complete system integration tutorial! Learn to combine multi-tool parallel execution, advanced state management, error handling, and performance monitoring to build production-ready AI systems.

🎯 What You'll Learn Today


This tutorial is also available in Google Colab here or for download here

Joint Initiative: This tutorial is part of a collaboration between AI Product Engineer and the Nebius Academy.

This tutorial demonstrates how to build a complete multi-tool system in LangGraph, integrating parallel execution, state management, and error handling into a cohesive system. Learn how to create production-ready LangGraph applications with comprehensive monitoring and control.

Key Concepts Covered

  1. System Architecture
  2. Tool Integration
  3. State Management
  4. Error Handling
  5. Performance Monitoring
!pip install langchain-core
!pip install langgraph
# numexpr powers the calculator tool; nest_asyncio is used in the final run cell
!pip install numexpr nest_asyncio

import asyncio
import time
from typing import Annotated, Any, Literal, TypedDict

from langchain_core.messages import AIMessage, BaseMessage, SystemMessage, ToolMessage
from langchain_core.tools import tool
from langgraph.channels.binop import BinaryOperatorAggregate
from langgraph.graph import END, START, StateGraph
from langgraph.graph.message import add_messages

Step 1: Core Components Implementation

Implement fundamental system components.

Why This Matters

Core components are crucial because they:

  1. Provide the system foundation
  2. Enable data consistency
  3. Support tool integration
  4. Facilitate monitoring

Debug Tips

  1. Component Structure:

    • Verify type annotations
    • Check reducer logic
    • Monitor state management
  2. Common Issues:

    • Type mismatches
    • Missing annotations
    • State inconsistencies
def dict_reducer(a: dict[str, Any], b: dict[str, Any] | None) -> dict[str, Any]:
    """Combine dictionaries for result aggregation.

    Args:
        a: Primary dictionary
        b: Secondary dictionary (may be None)

    Returns:
        Combined dictionary with values from both inputs (values from a
        take precedence on duplicate keys)

    Examples:
        >>> dict_reducer({'a': 1}, {'b': 2})
        {'b': 2, 'a': 1}
    """
    if b is None:
        return a
    return {**b, **a}
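
To see the reducer's behaviour in isolation before it is attached to the state channels, a quick scratch check like the one below can help. The dictionaries are made-up example values, not part of the system:

# Scratch check of the reducer, outside the graph
existing = {"calc_1": "2 * 3 = 6"}
incoming = {"weather_1": "Weather in London: 22°C, Sunny"}

print(dict_reducer(existing, incoming))  # both keys survive the merge
print(dict_reducer(existing, None))      # a None update leaves existing results unchanged
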
class State(TypedDict):
    """Complete system state.

    Attributes:
        messages: Conversation history
        pending_tools: Tools awaiting execution
        results: Aggregated tool results
        errors: Error messages
        stats: Execution statistics
    """

    messages: Annotated[list[BaseMessage], add_messages]
    pending_tools: list[dict[str, Any]]
    results: Annotated[
        dict[str, Any], BinaryOperatorAggregate(dict[str, Any], dict_reducer)
    ]
    errors: Annotated[
        dict[str, str], BinaryOperatorAggregate(dict[str, str], dict_reducer)
    ]
    stats: dict[str, Any]

Step 2: Tool Implementation

Implement system tools with error handling.

Why This Matters

Tool implementation is essential because it:

  1. Provides core functionality
  2. Handles domain logic
  3. Manages errors
  4. Ensures reliability

Debug Tips

  1. Tool Logic:

    • Check error handling
    • Verify return types
    • Monitor performance
  2. Common Problems:

    • Unhandled exceptions
    • Invalid inputs
    • Resource leaks
@tool
async def calculator(expression: str) -> str:
    """Calculate mathematical expressions.

    Args:
        expression: Math expression to evaluate

    Returns:
        Formatted result string

    Raises:
        ValueError: If calculation fails
    """
    try:
        import numexpr

        result = numexpr.evaluate(expression.strip()).item()
        return f"{expression} = {result}"
    except Exception as e:
        raise ValueError(f"Calculation error: {e!s}")
@tool
async def weather_tool(location: str) -> str:
    """Get simulated weather data.

    Args:
        location: Location to get weather for

    Returns:
        Simulated weather string
    """
    await asyncio.sleep(0.1)
    return f"Weather in {location}: 22ยฐC, Sunny"

Step 3: Parallel Execution Implementation

Implement core execution logic with monitoring.

Why This Matters

Parallel execution is crucial because it:

  1. Maximizes throughput
  2. Optimizes performance
  3. Manages resources
  4. Enables monitoring

Debug Tips

  1. Execution Logic:

    • Monitor task creation
    • Check error handling
    • Verify performance
  2. Common Issues:

    • Resource exhaustion
    • Task failures
    • Memory leaks
async def parallel_executor(state: State) -> State:
    """Execute tools in parallel with monitoring.

    Args:
        state: Current system state

    Returns:
        Updated state with results and stats
    """
    if not state.get("pending_tools"):
        return state

    start_time = time.time()
    tasks = []
    tools = {"calculator": calculator, "weather": weather_tool}

    for tool_call in state["pending_tools"]:
        tool = tools.get(tool_call["tool_name"])
        if not tool:
            continue
        tasks.append(tool.ainvoke(tool_call["args"]["query"]))

    results = await asyncio.gather(*tasks, return_exceptions=True)
    execution_time = time.time() - start_time

    new_results = {}
    new_errors = {}
    for tool_call, result in zip(state["pending_tools"], results, strict=False):
        if isinstance(result, Exception):
            new_errors[tool_call["id"]] = str(result)
        else:
            new_results[tool_call["id"]] = result

    stats = {
        "execution_time": execution_time,
        "total_tools": len(tasks),
        "successful": len(new_results),
        "failed": len(new_errors),
    }

    return {**state, "results": new_results, "errors": new_errors, "stats": stats}

Step 4: Result Processing Implementation

Implement result aggregation and error handling.

Why This Matters

Result processing is essential because it:

  1. Combines tool outputs
  2. Handles errors gracefully
  3. Creates user messages
  4. Maintains context

Debug Tips

  1. Processing Logic:

    • Verify result formatting
    • Check message creation
    • Monitor error handling
  2. Common Problems:

    • Missing results
    • Invalid formats
    • Message corruption
def result_aggregator(state: State) -> State:
    """Aggregate successful results and handle any errors.

    Args:
        state: Current state with results

    Returns:
        Updated state with processed messages
    """
    messages = [state["messages"][0]]

    # Process successful results
    if state["results"]:
        for tool_id, result in state["results"].items():
            messages.append(
                ToolMessage(
                    content=str(result),
                    tool_call_id=tool_id,
                    name=tool_id.split("_")[0],
                )
            )

        stats = state.get("stats", {})
        success_msg = (
            f"Successfully completed {stats.get('successful', 0)} "
            f"tools in {stats.get('execution_time', 0):.2f}s"
        )
        messages.append(AIMessage(content=success_msg))

    # Process any errors if present
    if state["errors"]:
        error_msg = "Additionally encountered errors:\n" + "\n".join(
            f"- {k}: {v}" for k, v in state["errors"].items()
        )
        messages.append(AIMessage(content=error_msg))

    return {**state, "messages": messages}
def error_handler(state: State) -> State:
    """Process execution errors.

    Args:
        state: Current state with errors

    Returns:
        Updated state with error messages
    """
    messages = list(state["messages"])

    if state["errors"]:
        error_msg = f"Encountered {len(state['errors'])} errors:\n" + "\n".join(
            f"- {k}: {v}" for k, v in state["errors"].items()
        )
        messages.append(AIMessage(content=error_msg))

    return {**state, "messages": messages}

Step 5: System Configuration

Implement system initialization and routing.

Why This Matters

System configuration is crucial because it:

  1. Sets up initial state
  2. Defines execution flow
  3. Manages routing
  4. Enables recovery

Debug Tips

  1. Configuration Logic:

    • Verify initialization
    • Check routing rules
    • Monitor state setup
  2. Common Issues:

    • Invalid initial state
    • Routing errors
    • Configuration conflicts
def route_results(state: State) -> Literal["result_processor", "error_processor"]:
    """Route based on execution results and errors.

    Args:
        state: Current system state

    Returns:
        Next processor identifier
    """
    if state["errors"] and not state["results"]:
        return "error_processor"
    return "result_processor"
def get_initial_state(state: State | None = None) -> State:
    """Initialize system state.

    Args:
        state: Optional existing state

    Returns:
        Initialized system state
    """
    return {
        "messages": [SystemMessage(content="Starting parallel execution system")],
        "pending_tools": [
            {"id": "calc_1", "tool_name": "calculator", "args": {"query": "2 * 3"}},
            {"id": "weather_1", "tool_name": "weather", "args": {"query": "London"}},
            {
                "id": "calc_error",
                "tool_name": "calculator",
                "args": {"query": "invalid"},
            },
        ],
        "results": {},
        "errors": {},
        "stats": {},
    }

Step 6: Graph Construction

Create complete system graph.

Why This Matters

Graph construction is essential because it:

  1. Defines system structure
  2. Manages execution flow
  3. Enables monitoring
  4. Facilitates testing

Debug Tips

  1. Graph Structure:

    • Verify node connections
    • Check edge conditions
    • Monitor compilation
  2. Common Problems:

    • Missing edges
    • Invalid routes
    • Compilation errors
def create_parallel_system() -> StateGraph:
    """Create complete execution system with parallel execution.

    Returns:
        Configured system graph
    """
    graph = StateGraph(State)

    # Add nodes
    graph.add_node("init", get_initial_state)
    graph.add_node("executor", parallel_executor)
    graph.add_node("result_processor", result_aggregator)
    graph.add_node("error_processor", error_handler)

    # Configure flow with conditional routing
    graph.add_edge(START, "init")
    graph.add_edge("init", "executor")

    # Conditional routing based on results/errors
    graph.add_conditional_edges(
        "executor",
        route_results,
        {"result_processor": "result_processor", "error_processor": "error_processor"},
    )

    graph.add_edge("result_processor", END)
    graph.add_edge("error_processor", END)

    return graph
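
After compilation it can help to confirm the wiring matches the intended flow. As a rough sketch (get_graph() and draw_mermaid() are available on compiled graphs in recent LangGraph/langchain-core releases; adjust to your installed version):

# Optional: inspect the compiled graph's structure
app = create_parallel_system().compile()
print(app.get_graph().draw_mermaid())  # Mermaid source for init -> executor -> result/error processors
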

Step 7: System Demonstration

Implement demonstration runner.

Why This Matters

System demonstration is crucial because it:

  1. Shows system behavior
  2. Verifies functionality
  3. Tests integration
  4. Validates output

Debug Tips

  1. Demo Execution:

    • Monitor system flow
    • Check output format
    • Verify error handling
  2. Common Issues:

    • Setup failures
    • Output errors
    • State corruption
async def demonstrate_system():
    """Run full system demonstration."""
    print("Complete System Demo")
    print("=" * 50)

    graph = create_parallel_system()
    chain = graph.compile()

    # Initialize with some tools
    initial_state = get_initial_state()
    print("\nInitial state messages:", len(initial_state["messages"]))
    for msg in initial_state["messages"]:
        print(f"Initial {type(msg).__name__}: {msg.content}")

    print("\nExecuting tools...")
    result = await chain.ainvoke(initial_state)

    print("\nResults dictionary:", result["results"])
    print("\nErrors dictionary:", result["errors"])

    print("\nFinal Messages:")
    for msg in result["messages"]:
        prefix = type(msg).__name__.replace("Message", "")
        print(f"\n{prefix}: {msg.content}")
        print(f"Message type: {type(msg)}")

    print("\nStats:", result["stats"])

Common Pitfalls

  1. Resource Management

    • Memory leaks
    • Connection pooling
    • Task cleanup
  2. Error Handling

    • Missing recovery
    • Silent failures
    • Lost context
  3. Performance Issues (see the concurrency sketch after this list)

    • Task overflow
    • Slow execution
    • Resource exhaustion
  4. State Management

    • Inconsistent updates
    • Lost messages
    • Corrupt state
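
One concrete guard against task overflow and resource exhaustion is to cap how many tool calls run concurrently. A minimal sketch, assuming an asyncio.Semaphore wrapped around the executor's existing ainvoke calls (the limit of 5 and the helper name are illustrative):

# Hypothetical concurrency cap for parallel_executor
semaphore = asyncio.Semaphore(5)  # at most five tool calls in flight

async def bounded_invoke(tool, query: str):
    async with semaphore:
        return await tool.ainvoke(query)

# In parallel_executor, tasks would then be created as:
#     tasks.append(bounded_invoke(tool, tool_call["args"]["query"]))
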

Key Takeaways

  1. System Architecture

    • Clean structure
    • Clear flow
    • Error handling
  2. Performance Management

    • Parallel execution
    • Resource control
    • Monitoring
  3. Integration Patterns

    • Tool management
    • State handling
    • Message flow

Next Steps

  1. System Enhancement

    • Add monitoring
    • Implement caching
    • Create dashboard
  2. Performance Optimization

    • Add rate limiting
    • Implement batching
    • Create queuing
  3. Reliability Features (sketched below)

    • Add retries
    • Implement fallbacks
    • Create timeouts
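
As a starting point for the reliability items above, here is a minimal sketch of a retry-plus-timeout wrapper around a single tool call. The helper name, retry count, and timeout are illustrative assumptions, not part of the tutorial system:

# Hypothetical retry/timeout wrapper for one tool call
async def invoke_with_retries(tool, query: str, retries: int = 2, timeout: float = 5.0):
    last_error = None
    for _ in range(retries + 1):
        try:
            # Abandon any single attempt that hangs longer than `timeout` seconds
            return await asyncio.wait_for(tool.ainvoke(query), timeout=timeout)
        except Exception as e:  # includes asyncio.TimeoutError
            last_error = e
    # Surface the final failure so the executor can record it in `errors`
    raise last_error
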

Expected Output

Complete System Demo
Initial state messages: 1
Initial SystemMessage: Starting parallel execution system
Executing tools...
Results dictionary: {
    'calc_1': '2 * 3 = 6',
    'weather_1': 'Weather in London: 22°C, Sunny'
}
Errors dictionary: {
    'calc_error': 'Calculation error: invalid syntax'
}

Final Messages

System: Starting parallel execution system
Tool: 2 * 3 = 6
Tool: Weather in London: 22°C, Sunny
AI: Successfully completed 2 tools in 0.12s
AI: Additionally encountered errors:

- calc_error: Calculation error: invalid syntax

Stats: {
    'execution_time': 0.12,
    'total_tools': 3,
    'successful': 2,
    'failed': 1
}
if __name__ == "__main__":
    # nest_asyncio lets asyncio.run() work where an event loop is already
    # running (e.g. Jupyter/Colab); a plain Python script does not need it.
    import nest_asyncio

    nest_asyncio.apply()
    asyncio.run(demonstrate_system())

Rod Rivera
