🎯 What You'll Learn Today
LangGraph Tutorial: Complete System Integration - Unit 2.3 Exercise 10
📢 Joint Initiative
This tutorial is part of a collaboration between AIPE and Nebius Academy.
This tutorial demonstrates how to build a complete multi-tool system in LangGraph that combines parallel tool execution, state management, and error handling. It also shows how to record execution statistics and route between success and error paths, which are the building blocks for monitoring and controlling a LangGraph application.
Key Concepts Covered
- System Architecture
- Tool Integration
- State Management
- Error Handling
- Performance Monitoring
!pip install langchain-core
!pip install langgraph
# numexpr is used by the calculator tool; nest_asyncio by the demo runner
!pip install numexpr nest_asyncio

import asyncio
import time
from typing import Annotated, Any, Literal, TypedDict

from langchain_core.messages import AIMessage, BaseMessage, SystemMessage, ToolMessage
from langchain_core.tools import tool
from langgraph.channels.binop import BinaryOperatorAggregate
from langgraph.graph import END, START, StateGraph
from langgraph.graph.message import add_messages

Step 1: Core Components Implementation
Implement fundamental system components.
Why This Matters
Core components are crucial because they:
- Provide the system foundation
- Enable data consistency
- Support tool integration
- Facilitate monitoring
Debug Tips
- Component Structure:
  - Verify type annotations
  - Check reducer logic
  - Monitor state management
- Common Issues:
  - Type mismatches
  - Missing annotations
  - State inconsistencies
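
To make the "check reducer logic" tip concrete, here is a minimal plain-Python sketch of how a dictionary reducer like the one in this step folds successive updates together. It does not use LangGraph; the `merge_updates` helper and the sample update dictionaries are hypothetical and exist only for illustration. The point to notice is that with `{**b, **a}` the value already held in state wins when both sides share a key.

```python
from functools import reduce
from typing import Any, Optional


def dict_reducer(a: dict[str, Any], b: Optional[dict[str, Any]]) -> dict[str, Any]:
    """Merge b into a; keys already present in a take precedence."""
    if b is None:
        return a
    return {**b, **a}


def merge_updates(updates: list) -> dict[str, Any]:
    """Hypothetical helper: fold a sequence of updates, starting from an empty dict."""
    return reduce(dict_reducer, updates, {})


merged = merge_updates(
    [
        {"calc_1": "6"},
        None,  # a missing update leaves the accumulated value unchanged
        {"weather_1": "Sunny"},
        {"calc_1": "later value"},  # ignored: calc_1 is already set
    ]
)
print(merged)
```

If later updates should overwrite earlier ones instead, the merge order would need to be `{**a, **b}`; which behaviour is right depends on how the state is meant to be used.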
 
def dict_reducer(a: dict[str, Any], b: dict[str, Any] | None) -> dict[str, Any]:
    """Combine dictionaries for result aggregation.
    Args:
        a: Primary dictionary
        b: Secondary dictionary (may be None)
    Returns:
        Combined dictionary with values from both inputs
    Examples:
        >>> dict_reducer({'a': 1}, {'b': 2})
        {'b': 2, 'a': 1}
    """
    if b is None:
        return a
    # Keys already present in `a` take precedence over keys in `b`.
    return {**b, **a}


class State(TypedDict):
    """Complete system state.
    Attributes:
        messages: Conversation history
        pending_tools: Tools awaiting execution
        results: Aggregated tool results
        errors: Error messages
        stats: Execution statistics
    """
    messages: Annotated[list[BaseMessage], add_messages]
    pending_tools: list[dict[str, Any]]
    results: Annotated[
        dict[str, Any], BinaryOperatorAggregate(dict[str, Any], dict_reducer)
    ]
    errors: Annotated[
        dict[str, str], BinaryOperatorAggregate(dict[str, str], dict_reducer)
    ]
    stats: dict[str, Any]

Step 2: Tool Implementation
Implement system tools with error handling.
Why This Matters
Tool implementation is essential because it:
- Provides core functionality
- Handles domain logic
- Manages errors
- Ensures reliability
Debug Tips
- Tool Logic:
  - Check error handling
  - Verify return types
  - Monitor performance
- Common Problems:
  - Unhandled exceptions
  - Invalid inputs
  - Resource leaks
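
The "unhandled exceptions" and "invalid inputs" problems can be explored without any third-party packages. The sketch below is a hypothetical stand-in for a calculator tool: it swaps `numexpr` for a small `ast`-based evaluator that supports only numbers and `+ - * /` (an assumption made for this example), and it wraps every failure in a `ValueError` so callers see one predictable exception type. The names `safe_calculator`, `_eval_node`, and `_OPS` are illustrative and not part of LangChain or LangGraph.

```python
import ast
import asyncio
import operator

# Operators supported by this sketch (an assumption, far smaller than numexpr's grammar).
_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
}


def _eval_node(node: ast.AST) -> float:
    """Evaluate a parsed expression made only of numbers and + - * /."""
    if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
        return node.value
    if isinstance(node, ast.BinOp) and type(node.op) in _OPS:
        return _OPS[type(node.op)](_eval_node(node.left), _eval_node(node.right))
    raise ValueError(f"unsupported expression element: {type(node).__name__}")


async def safe_calculator(expression: str) -> str:
    """Hypothetical calculator: every failure surfaces as ValueError."""
    try:
        tree = ast.parse(expression.strip(), mode="eval")
        result = _eval_node(tree.body)
    except Exception as e:
        # Chain the original error so the root cause stays visible.
        raise ValueError(f"Calculation error: {e!s}") from e
    return f"{expression} = {result}"


print(asyncio.run(safe_calculator("2 * 3")))
```

Rejecting anything outside a small whitelist keeps invalid input from reaching the evaluator at all, which is usually easier to reason about than catching failures afterwards.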
 
@tool
async def calculator(expression: str) -> str:
    """Calculate mathematical expressions.
    Args:
        expression: Math expression to evaluate
    Returns:
        Formatted result string
    Raises:
        ValueError: If calculation fails
    """
    try:
        import numexpr
        result = numexpr.evaluate(expression.strip()).item()
        return f"{expression} = {result}"
    except Exception as e:
        # Chain the original exception so the root cause stays in the traceback.
        raise ValueError(f"Calculation error: {e!s}") from e


@tool
async def weather_tool(location: str) -> str:
    """Get simulated weather data.
    Args:
        location: Location to get weather for
    Returns:
        Simulated weather string
    """
    await asyncio.sleep(0.1)
    return f"Weather in {location}: 22°C, Sunny"

Step 3: Parallel Execution Implementation
Implement core execution logic with monitoring.
Why This Matters
Parallel execution is crucial because it:
- Maximizes throughput
- Optimizes performance
- Manages resources
- Enables monitoring
Debug Tips
- Execution Logic:
  - Monitor task creation
  - Check error handling
  - Verify performance
- Common Issues:
  - Resource exhaustion
  - Task failures
  - Memory leaks
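
The core pattern behind this step is `asyncio.gather(..., return_exceptions=True)`, which returns exceptions as ordinary values so that one failing task does not cancel the rest. The following sketch isolates that pattern with the standard library only. The task functions, the `run_all` helper, and the call dictionaries are hypothetical; the important detail is that results are paired with the same list of calls that produced the tasks, so ids never drift out of alignment.

```python
import asyncio
import time


async def ok_task(value: str) -> str:
    await asyncio.sleep(0.01)
    return f"done: {value}"


async def failing_task(value: str) -> str:
    await asyncio.sleep(0.01)
    raise ValueError(f"cannot process {value!r}")


async def run_all(calls: list) -> tuple:
    """Run every call concurrently and split outcomes by call id."""
    handlers = {"ok": ok_task, "fail": failing_task}
    start = time.perf_counter()
    outcomes = await asyncio.gather(
        *(handlers[c["kind"]](c["arg"]) for c in calls),
        return_exceptions=True,  # exceptions come back as values instead of propagating
    )
    elapsed = time.perf_counter() - start
    results, errors = {}, {}
    # gather preserves input order, so zip pairs each call with its own outcome.
    for call, outcome in zip(calls, outcomes):
        if isinstance(outcome, Exception):
            errors[call["id"]] = str(outcome)
        else:
            results[call["id"]] = outcome
    return results, errors, elapsed


calls = [
    {"id": "a_1", "kind": "ok", "arg": "x"},
    {"id": "b_1", "kind": "fail", "arg": "y"},
    {"id": "a_2", "kind": "ok", "arg": "z"},
]
results, errors, elapsed = asyncio.run(run_all(calls))
print(results)
print(errors)
```

Because the three tasks sleep concurrently, the total elapsed time should be close to a single sleep rather than the sum of all three.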
 
async def parallel_executor(state: State) -> State:
    """Execute tools in parallel with monitoring.
    Args:
        state: Current system state
    Returns:
        Updated state with results and stats
    """
    if not state.get("pending_tools"):
        return state
    start_time = time.time()
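    tasks = []
    scheduled = []  # tool calls that actually produced a task, in task order
    tools = {"calculator": calculator, "weather": weather_tool}
    for tool_call in state["pending_tools"]:
        selected_tool = tools.get(tool_call["tool_name"])
        if not selected_tool:
            # Unknown tool names are skipped and produce no task.
            continue
        scheduled.append(tool_call)
        tasks.append(selected_tool.ainvoke(tool_call["args"]["query"]))
    # return_exceptions=True keeps one failing tool from cancelling the others.
    results = await asyncio.gather(*tasks, return_exceptions=True)
    execution_time = time.time() - start_time
    new_results = {}
    new_errors = {}
    # Pair results with the scheduled calls (not all pending calls) so that a
    # skipped tool cannot shift results onto the wrong id.
    for tool_call, result in zip(scheduled, results, strict=True):
        if isinstance(result, Exception):
            new_errors[tool_call["id"]] = str(result)
        else:
            new_results[tool_call["id"]] = result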
    stats = {
        "execution_time": execution_time,
        "total_tools": len(tasks),
        "successful": len(new_results),
        "failed": len(new_errors),
    }
    return {**state, "results": new_results, "errors": new_errors, "stats": stats}

Step 4: Result Processing Implementation
Implement result aggregation and error handling.
Why This Matters
Result processing is essential because it:
- Combines tool outputs
- Handles errors gracefully
- Creates user messages
- Maintains context
Debug Tips
- Processing Logic:
  - Verify result formatting
  - Check message creation
  - Monitor error handling
- Common Problems:
  - Missing results
  - Invalid formats
  - Message corruption
 
def result_aggregator(state: State) -> State:
    """Aggregate successful results and handle any errors.
    Args:
        state: Current state with results
    Returns:
        Updated state with processed messages
    """
    messages = [state["messages"][0]]
    # Process successful results
    if state["results"]:
        for tool_id, result in state["results"].items():
            messages.append(
                ToolMessage(
                    content=str(result),
                    tool_call_id=tool_id,
                    name=tool_id.split("_")[0],
                )
            )
        stats = state.get("stats", {})
        success_msg = (
            f"Successfully completed {stats.get('successful', 0)} "
            f"tools in {stats.get('execution_time', 0):.2f}s"
        )
        messages.append(AIMessage(content=success_msg))
    # Process any errors if present
    if state["errors"]:
        error_msg = "Additionally encountered errors:\n" + "\n".join(
            f"- {k}: {v}" for k, v in state["errors"].items()
        )
        messages.append(AIMessage(content=error_msg))
    return {**state, "messages": messages}


def error_handler(state: State) -> State:
    """Process execution errors.
    Args:
        state: Current state with errors
    Returns:
        Updated state with error messages
    """
    messages = list(state["messages"])
    if state["errors"]:
        error_msg = f"Encountered {len(state['errors'])} errors:\n" + "\n".join(
            f"- {k}: {v}" for k, v in state["errors"].items()
        )
        messages.append(AIMessage(content=error_msg))
    return {**state, "messages": messages}

Step 5: System Configuration
Implement system initialization and routing.
Why This Matters
System configuration is crucial because it:
- Sets up initial state
- Defines execution flow
- Manages routing
- Enables recovery
Debug Tips
- Configuration Logic:
  - Verify initialization
  - Check routing rules
  - Monitor state setup
- Common Issues:
  - Invalid initial state
  - Routing errors
  - Configuration conflicts
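
One way to "check routing rules" is to run the routing function against every combination of empty and non-empty results and errors. The sketch below applies the routing rule from this step to plain dictionaries instead of the full `State`; the `cases` table and its sample values are hypothetical test data.

```python
from typing import Literal


def route_results(state: dict) -> Literal["result_processor", "error_processor"]:
    """Route to the error path only when there are errors and no results."""
    if state["errors"] and not state["results"]:
        return "error_processor"
    return "result_processor"


# Hypothetical states covering each combination of results and errors.
cases = {
    "only results": {"results": {"calc_1": "6"}, "errors": {}},
    "only errors": {"results": {}, "errors": {"calc_error": "bad input"}},
    "mixed": {"results": {"calc_1": "6"}, "errors": {"calc_error": "bad input"}},
    "empty": {"results": {}, "errors": {}},
}
routes = {name: route_results(state) for name, state in cases.items()}
for name, target in routes.items():
    print(f"{name:>12} -> {target}")
```

Note that a mixed outcome goes to the result path, so the result processor has to report errors as well; only a run with errors and no results reaches the dedicated error processor.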
 
def route_results(state: State) -> Literal["result_processor", "error_processor"]:
    """Route based on execution results and errors.
    Args:
        state: Current system state
    Returns:
        Next processor identifier
    """
    if state["errors"] and not state["results"]:
        return "error_processor"
    return "result_processor"


def get_initial_state(state: State | None = None) -> State:
    """Initialize system state.
    Args:
        state: Optional existing state
    Returns:
        Initialized system state
    """
    return {
        "messages": [SystemMessage(content="Starting parallel execution system")],
        "pending_tools": [
            {"id": "calc_1", "tool_name": "calculator", "args": {"query": "2 * 3"}},
            {"id": "weather_1", "tool_name": "weather", "args": {"query": "London"}},
            {
                "id": "calc_error",
                "tool_name": "calculator",
                "args": {"query": "invalid"},
            },
        ],
        "results": {},
        "errors": {},
        "stats": {},
    }

Step 6: Graph Construction
Create the complete system graph.
Why This Matters
Graph construction is essential because it:
- Defines system structure
- Manages execution flow
- Enables monitoring
- Facilitates testing
Debug Tips
- Graph Structure:
  - Verify node connections
  - Check edge conditions
  - Monitor compilation
- Common Problems:
  - Missing edges
  - Invalid routes
  - Compilation errors
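
Missing edges are easier to spot when the wiring is written down as data. The sketch below mirrors this step's flow (start, `init`, `executor`, the two processors, end) in a plain dictionary and checks it with a breadth-first search. It is an independent sanity check, not a LangGraph API: `EDGES`, `reachable`, and `dead_ends` are hypothetical names, and the strings `"START"` and `"END"` stand in for the real sentinel constants.

```python
from collections import deque

# Plain-dict mirror of the graph wiring (hypothetical helper data).
EDGES = {
    "START": ["init"],
    "init": ["executor"],
    "executor": ["result_processor", "error_processor"],
    "result_processor": ["END"],
    "error_processor": ["END"],
}


def reachable(edges: dict, source: str) -> set:
    """Return every node reachable from source via breadth-first search."""
    seen = {source}
    queue = deque([source])
    while queue:
        node = queue.popleft()
        for nxt in edges.get(node, []):
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    return seen


def dead_ends(edges: dict) -> set:
    """Nodes reachable from START that have no outgoing edge and are not END."""
    return {n for n in reachable(edges, "START") if n != "END" and not edges.get(n)}


print(sorted(reachable(EDGES, "START")))
print(dead_ends(EDGES))
```

An empty set from `dead_ends` means every reachable node eventually leads somewhere; removing the edge from `error_processor` to the end would make that node show up in the result.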
 
def create_parallel_system() -> StateGraph:
    """Create complete execution system with parallel execution.
    Returns:
        Configured system graph
    """
    graph = StateGraph(State)
    # Add nodes
    graph.add_node("init", get_initial_state)
    graph.add_node("executor", parallel_executor)
    graph.add_node("result_processor", result_aggregator)
    graph.add_node("error_processor", error_handler)
    # Configure flow with conditional routing
    graph.add_edge(START, "init")
    graph.add_edge("init", "executor")
    # Conditional routing based on results/errors
    graph.add_conditional_edges(
        "executor",
        route_results,
        {"result_processor": "result_processor", "error_processor": "error_processor"},
    )
    graph.add_edge("result_processor", END)
    graph.add_edge("error_processor", END)
    return graph

Step 7: System Demonstration
Implement the demonstration runner.
Why This Matters
System demonstration is crucial because it:
- Shows system behavior
- Verifies functionality
- Tests integration
- Validates output
Debug Tips
- Demo Execution:
  - Monitor system flow
  - Check output format
  - Verify error handling
- Common Issues:
  - Setup failures
  - Output errors
  - State corruption
 
async def demonstrate_system():
    """Run full system demonstration."""
    print("Complete System Demo")
    print("=" * 50)
    graph = create_parallel_system()
    chain = graph.compile()
    # Initialize with some tools
    initial_state = get_initial_state()
    print("\nInitial state messages:", len(initial_state["messages"]))
    for msg in initial_state["messages"]:
        print(f"Initial {type(msg).__name__}: {msg.content}")
    print("\nExecuting tools...")
    result = await chain.ainvoke(initial_state)
    print("\nResults dictionary:", result["results"])
    print("\nErrors dictionary:", result["errors"])
    print("\nFinal Messages:")
    for msg in result["messages"]:
        prefix = type(msg).__name__.replace("Message", "")
        print(f"\n{prefix}: {msg.content}")
        print(f"Message type: {type(msg)}")
    print("\nStats:", result["stats"])

Common Pitfalls
- Resource Management
  - Memory leaks
  - Connection pooling
  - Task cleanup
- Error Handling
  - Missing recovery
  - Silent failures
  - Lost context
- Performance Issues
  - Task overflow
  - Slow execution
  - Resource exhaustion
- State Management
  - Inconsistent updates
  - Lost messages
  - Corrupt state
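
Task overflow and resource exhaustion, listed above, usually come from launching every task at once. One common mitigation is to cap concurrency with `asyncio.Semaphore`. The sketch below is a standalone illustration under that assumption; `bounded_gather`, `work`, and the `tracker` dictionary are hypothetical names, and the limit of 3 is arbitrary.

```python
import asyncio

# Shared counters used only to observe how many tasks run at the same time.
tracker = {"active": 0, "peak": 0}


async def work(i: int) -> int:
    tracker["active"] += 1
    tracker["peak"] = max(tracker["peak"], tracker["active"])
    await asyncio.sleep(0.01)
    tracker["active"] -= 1
    return i * i


async def bounded_gather(coros, limit: int):
    """Await all coroutines, but let at most `limit` of them run concurrently."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(coro):
        # The wrapped coroutine only starts once a semaphore slot is free.
        async with semaphore:
            return await coro

    return await asyncio.gather(*(guarded(c) for c in coros), return_exceptions=True)


squares = asyncio.run(bounded_gather([work(i) for i in range(10)], limit=3))
print(squares)
print("peak concurrency:", tracker["peak"])
```

The results still come back in input order, so the same id-pairing approach used by the executor continues to work when a limit is applied.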
 
Key Takeaways
- System Architecture
  - Clean structure
  - Clear flow
  - Error handling
- Performance Management
  - Parallel execution
  - Resource control
  - Monitoring
- Integration Patterns
  - Tool management
  - State handling
  - Message flow
 
Next Steps
- System Enhancement
  - Add monitoring
  - Implement caching
  - Create dashboard
- Performance Optimization
  - Add rate limiting
  - Implement batching
  - Create queuing
- Reliability Features
  - Add retries
  - Implement fallbacks
  - Create timeouts
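
As a starting point for the reliability features listed above, retries and timeouts can be combined with `asyncio.wait_for`. This is a minimal sketch rather than a recommended production policy: `call_with_retry`, its default values, and the `flaky` coroutine are all hypothetical, and the linear backoff is just one simple choice.

```python
import asyncio


async def call_with_retry(make_call, *, attempts: int = 3, timeout: float = 0.5, delay: float = 0.01):
    """Await make_call() with a per-attempt timeout, retrying on failure."""
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            # A fresh coroutine is created for every attempt.
            return await asyncio.wait_for(make_call(), timeout=timeout)
        except Exception as e:  # timeouts raised by wait_for land here too
            last_error = e
            if attempt < attempts:
                await asyncio.sleep(delay * attempt)  # simple linear backoff
    raise RuntimeError(f"all {attempts} attempts failed") from last_error


calls = {"count": 0}


async def flaky() -> str:
    """Hypothetical operation that fails twice before succeeding."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient failure")
    return "ok"


outcome = asyncio.run(call_with_retry(flaky))
print(outcome, "after", calls["count"], "attempts")
```

A wrapper like this could sit around each tool invocation before the calls are gathered, so that only errors that survive all attempts end up in the errors dictionary.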
 
Expected Output
Complete System Demo
Initial state messages: 1
Initial SystemMessage: Starting parallel execution system
Executing tools...
Results dictionary: {
    'calc_1': '2 * 3 = 6',
    'weather_1': 'Weather in London: 22°C, Sunny'
}
Errors dictionary: {
    'calc_error': 'Calculation error: invalid syntax'
}
Final Messages
System: Starting parallel execution system
Tool: 2 * 3 = 6
Tool: Weather in London: 22°C, Sunny
AI: Successfully completed 2 tools in 0.12s
AI: Additionally encountered errors:
- calc_error: Calculation error: invalid syntax
Stats: {
    'execution_time': 0.12,
    'total_tools': 3,
    'successful': 2,
    'failed': 1
}

if __name__ == "__main__":
    import nest_asyncio
    nest_asyncio.apply()
    asyncio.run(demonstrate_system())

