
LangGraph Tutorial: Graph Configuration and Routing - Unit 2.3 Exercise 8

Master the essentials of building and configuring execution graphs in LangGraph. This tutorial explores graph structure design, parallel processing, conditional routing, and state management to create scalable, robust workflows for multi-tool systems.

🎯 What You'll Learn Today


This tutorial is also available in Google Colab here or for download here

Joint Initiative: This tutorial is part of a collaboration between AI Product Engineer and the Nebius Academy.

This tutorial demonstrates how to construct and configure execution graphs in LangGraph, focusing on parallel tool execution, state management, and conditional routing. Learn how to create robust, scalable graph structures for complex workflows.

Key Concepts Covered

  1. Graph Structure Definition
  2. State Management
  3. Node Configuration
  4. Conditional Routing
  5. Parallel Execution
# !pip install langchain-core
# !pip install langgraph
import asyncio
from typing import Annotated, Any, Literal, TypedDict

from langchain_core.messages import AIMessage, BaseMessage, SystemMessage, ToolMessage
from langgraph.channels.binop import BinaryOperatorAggregate
from langgraph.graph import END, START, StateGraph
from langgraph.graph.message import add_messages

Step 1: State Definition with Aggregation

Define state structure with support for result aggregation.

Why This Matters

State definition is crucial because it:

  1. Enables parallel processing
  2. Supports result aggregation
  3. Maintains execution context
  4. Facilitates error handling

Debug Tips

  1. State Structure:

    • Verify aggregation operators
    • Check type annotations
    • Monitor state growth
  2. Common Issues:

    • Missing aggregation fields
    • Type mismatches
    • Memory leaks
class State(TypedDict):
    """State container for graph execution.

    Attributes:
        messages: Conversation history
        pending_tools: Tools awaiting execution
        results: Aggregated tool results
        errors: Aggregated error messages
        tool_configs: Tool-specific configurations
    """

    messages: Annotated[list[BaseMessage], add_messages]
    pending_tools: list[dict[str, Any]]
    results: Annotated[
        dict[str, Any],
        # Dict-union reducer: later updates overwrite earlier values per key
        BinaryOperatorAggregate(dict[str, Any], lambda a, b: {**(a or {}), **(b or {})}),
    ]
    errors: Annotated[
        dict[str, str],
        BinaryOperatorAggregate(dict[str, str], lambda a, b: {**(a or {}), **(b or {})}),
    ]
    tool_configs: dict[str, dict[str, Any]]

Step 2: Node Implementation

Implement core processing nodes for the graph.

Why This Matters

Node implementation is essential because it:

  1. Defines processing logic
  2. Handles state transitions
  3. Manages tool execution
  4. Processes results/errors

Debug Tips

  1. Node Logic:

    • Verify state updates
    • Check async operations
    • Monitor resource usage
  2. Common Problems:

    • State corruption
    • Resource leaks
    • Concurrency issues
async def parallel_executor(state: State) -> State:
    """Execute pending tools in parallel.

    Args:
        state: Current execution state

    Returns:
        Updated state with results/errors
    """
    if not state["pending_tools"]:
        return state

    tasks = []
    for _ in state["pending_tools"]:
        tasks.append(
            asyncio.sleep(0.1)  # Simulate per-tool execution latency
        )

    await asyncio.gather(*tasks)

    # Simulate results/errors
    new_results = {}
    new_errors = {}
    for tool in state["pending_tools"]:
        if "error" in tool["args"].get("query", ""):
            new_errors[tool["id"]] = "Simulated error"
        else:
            new_results[tool["id"]] = f"Result for {tool['args']['query']}"

    return {**state, "results": new_results, "errors": new_errors}
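
The gather pattern above can be exercised outside the graph. This is a minimal sketch with a hypothetical `fake_tool` coroutine standing in for a real tool call; `return_exceptions=True` lets one failure be collected as an error instead of cancelling its siblings:

```python
import asyncio


async def fake_tool(query: str) -> str:
    # Hypothetical stand-in for a real tool call, mirroring the simulation above
    await asyncio.sleep(0.01)
    if "error" in query:
        raise ValueError("Simulated error")
    return f"Result for {query}"


async def run_all(queries: list[str]) -> tuple[dict, dict]:
    # return_exceptions=True collects failures instead of cancelling siblings
    outcomes = await asyncio.gather(
        *(fake_tool(q) for q in queries), return_exceptions=True
    )
    results, errors = {}, {}
    for query, outcome in zip(queries, outcomes):
        if isinstance(outcome, Exception):
            errors[query] = str(outcome)
        else:
            results[query] = outcome
    return results, errors


results, errors = asyncio.run(run_all(["a", "error_b", "c"]))
print(results)  # {'a': 'Result for a', 'c': 'Result for c'}
print(errors)   # {'error_b': 'Simulated error'}
```

`asyncio.gather` preserves input order, which is what makes the `zip` back onto `queries` safe.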
def result_aggregator(state: State) -> State:
    """Aggregate successful results.

    Args:
        state: Current state with results

    Returns:
        Updated state with aggregated messages
    """
    messages = list(state["messages"])

    for tool_id, result in state["results"].items():
        messages.append(
            ToolMessage(
                content=str(result), tool_call_id=tool_id, name=tool_id.split("_")[0]
            )
        )

    messages.append(
        AIMessage(content=f"Processed {len(state['results'])} results successfully.")
    )

    return {**state, "messages": messages}


def error_handler(state: State) -> State:
    """Process execution errors.

    Args:
        state: Current state with errors

    Returns:
        Updated state with error messages
    """
    messages = list(state["messages"])

    for tool_id, error in state["errors"].items():
        messages.append(AIMessage(content=f"Error in {tool_id}: {error}"))

    return {**state, "messages": messages}

Step 3: State and Routing Management

Implement state initialization and routing logic.

Why This Matters

State management is critical because it:

  1. Ensures proper initialization
  2. Controls execution flow
  3. Maintains consistency
  4. Enables recovery

Debug Tips

  1. State Management:

    • Verify initialization
    • Check state transitions
    • Monitor routing decisions
  2. Common Issues:

    • Invalid initial state
    • Routing errors
    • State inconsistencies
def get_initial_state(state: State | None = None) -> State:
    """Initialize state with test tools.

    Args:
        state: Optional existing state (ignored; a fresh test state is built)

    Returns:
        Initialized state with test configuration
    """
    return {
        "messages": [SystemMessage(content="Starting execution")],
        "pending_tools": [
            {
                "id": f"tool_{i}",
                "tool_name": "test_tool",
                "args": {"query": f"test_query_{i}"},
            }
            for i in range(3)
        ],
        "results": {},
        "errors": {},
        "tool_configs": {},
    }


def route_results(state: State) -> Literal["result_aggregator", "error_handler"]:
    """Route based on execution results.

    Note: any error routes the whole batch to error_handler, so partial
    successes are not aggregated on that branch.

    Args:
        state: Current execution state

    Returns:
        Next node identifier
    """
    return "error_handler" if state.get("errors") else "result_aggregator"

Step 4: Graph Construction

Implement graph creation and configuration.

Why This Matters

Graph construction is essential because it:

  1. Defines execution flow
  2. Establishes node relationships
  3. Enables conditional routing
  4. Supports parallel processing

Debug Tips

  1. Graph Structure:

    • Verify node connections
    • Check edge conditions
    • Monitor graph compilation
  2. Common Problems:

    • Missing edges
    • Invalid routes
    • Dead ends
def create_parallel_graph() -> StateGraph:
    """Create execution graph with parallel processing.

    Graph Structure:
    START → init → parallel_executor → [result_aggregator|error_handler] → END

    Returns:
        Configured StateGraph
    """
    graph = StateGraph(State)

    # Add nodes
    graph.add_node("init", get_initial_state)
    graph.add_node("parallel_executor", parallel_executor)
    graph.add_node("result_aggregator", result_aggregator)
    graph.add_node("error_handler", error_handler)

    # Configure edges
    graph.add_edge(START, "init")
    graph.add_edge("init", "parallel_executor")

    # Add conditional routing
    graph.add_conditional_edges(
        "parallel_executor",
        route_results,
        {"result_aggregator": "result_aggregator", "error_handler": "error_handler"},
    )

    # Connect to END
    graph.add_edge("result_aggregator", END)
    graph.add_edge("error_handler", END)

    return graph
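
The structural pitfalls listed under Debug Tips (missing edges, dead ends) can be caught with a quick standalone reachability check. Here `edges` is a hand-written mirror of the graph's topology, not a LangGraph API:

```python
# Hand-written mirror of the graph topology above (not a LangGraph API):
edges = {
    "START": ["init"],
    "init": ["parallel_executor"],
    "parallel_executor": ["result_aggregator", "error_handler"],
    "result_aggregator": ["END"],
    "error_handler": ["END"],
}


def reaches_end(start: str, edges: dict[str, list[str]]) -> bool:
    """Graph search; True if END is reachable from `start`."""
    frontier, seen = [start], set()
    while frontier:
        node = frontier.pop()
        if node == "END":
            return True
        if node in seen:
            continue
        seen.add(node)
        frontier.extend(edges.get(node, []))
    return False


# Every node should be able to reach END, i.e. no dead ends:
assert all(reaches_end(node, edges) for node in edges)
```

A node for which `reaches_end` returns False is a dead end: the compiled graph would stall there instead of terminating.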

Step 5: Demonstration Implementation

Example usage showing graph execution.

Why This Matters

Demonstration code is valuable because it:

  1. Shows practical usage patterns
  2. Illustrates execution flow
  3. Demonstrates error handling
  4. Provides testing scenarios

Debug Tips

  1. Demo Execution:

    • Monitor graph flow
    • Verify state updates
    • Check message generation
  2. Common Issues:

    • Initialization errors
    • Execution failures
    • Message formatting
async def demonstrate_graph():
    """Demonstrate graph execution."""
    print("Graph Execution Demo")
    print("=" * 50)

    graph = create_parallel_graph()
    chain = graph.compile()

    # Start with empty messages; the init node supplies the system message
    # (seeding it here as well would duplicate it via add_messages)
    initial_state = {
        "messages": [],
        "pending_tools": [],
        "results": {},
        "errors": {},
        "tool_configs": {},
    }

    result = await chain.ainvoke(initial_state)

    print("\nExecution Messages:")
    for msg in result["messages"]:
        prefix = type(msg).__name__.replace("Message", "")
        print(f"\n{prefix}: {msg.content}")

    print("\nResults:", result["results"])
    print("Errors:", result["errors"])

Common Pitfalls

  1. Graph Structure Issues

    • Missing node connections
    • Invalid routing conditions
    • Unreachable nodes
  2. State Management Problems

    • Incomplete initialization
    • State corruption
    • Missing updates
  3. Execution Flow Issues

    • Deadlocks
    • Infinite loops
    • Resource exhaustion
  4. Error Handling Gaps

    • Unhandled exceptions
    • Lost errors
    • Missing recovery paths

Key Takeaways

  1. Graph Architecture

    • Clear node structure
    • Proper routing
    • Error handling
  2. State Management

    • Consistent updates
    • Proper aggregation
    • Type safety
  3. Execution Patterns

    • Parallel processing
    • Conditional routing
    • Resource management

Next Steps

  1. Enhanced Graph Structure

    • Add recovery nodes
    • Implement caching
    • Create visualizations
  2. Advanced Routing

    • Add priorities
    • Create fallbacks
    • Implement retries
  3. Monitoring

    • Add metrics
    • Create logging
    • Implement tracing

Expected Output

Graph Execution Demo
==================================================

Execution Messages:

System: Starting execution

Tool: Result for test_query_0

Tool: Result for test_query_1

Tool: Result for test_query_2

AI: Processed 3 results successfully.

Results: {'tool_0': 'Result for test_query_0', 'tool_1': 'Result for test_query_1', 'tool_2': 'Result for test_query_2'}
Errors: {}

if __name__ == "__main__":
    import nest_asyncio  # only needed where an event loop is already running (e.g. notebooks)

    nest_asyncio.apply()
    asyncio.run(demonstrate_graph())

Rod Rivera
