LangGraph Tutorial: Result Aggregation Patterns - Unit 2.3 Exercise 5

Learn how to implement result aggregation in LangGraph using binary operators and fan-in patterns. This tutorial covers combining tool outputs, structuring error summaries, and maintaining message flow for efficient and coherent conversations.

🎯 What You'll Learn Today

Joint Initiative: This tutorial is part of a collaboration between AI Product Engineer and the Nebius Academy.

This tutorial demonstrates how to implement efficient result aggregation in LangGraph using fan-in patterns, binary operators, and appropriate message types. You'll learn how to combine results from multiple tools and create meaningful summaries.

Key Concepts Covered

  1. Binary Operator Aggregation
  2. Message Type Management
  3. State Aggregation Patterns
  4. Fan-in Implementation
  5. Result Summarization
!pip install langchain-core
!pip install langgraph

from typing import Annotated, Any, TypedDict

from langchain_core.messages import AIMessage, BaseMessage, ToolMessage
from langgraph.channels.binop import BinaryOperatorAggregate
from langgraph.graph.message import add_messages

Step 1: Dictionary Reducer Implementation

Implement the core reducer function for combining dictionaries.

Why This Matters

Dictionary reduction is fundamental because:

  1. Enables scalable result combination
  2. Maintains data consistency
  3. Handles partial results
  4. Supports incremental updates

Debug Tips

  1. Dictionary Merging:

    • Check for None handling
    • Verify key conflicts
    • Monitor memory usage
  2. Common Issues:

    • KeyError: Missing dictionary keys
    • TypeError: Invalid dictionary types
    • Memory: Large dictionary merges
def dict_reducer(a: dict, b: dict | None) -> dict:
    """Combine two dictionaries for result aggregation.

    Args:
        a: First dictionary
        b: Second dictionary (may be None)

    Returns:
        Combined dictionary with values from both inputs

    Examples:
        >>> dict_reducer({'a': 1}, {'b': 2})
        {'a': 1, 'b': 2}
        >>> dict_reducer({'a': 1}, None)
        {'a': 1}
    """
    if b is None:
        return a
    return {**a, **b}

Step 2: State Definition with Aggregation

Define the state structure supporting result aggregation.

Why This Matters

Aggregation-aware state is crucial because:

  1. Enables parallel processing
  2. Supports incremental updates
  3. Maintains consistency
  4. Facilitates error handling

Debug Tips

  1. State Structure:

    • Verify annotations
    • Check operator configuration
    • Monitor state size
  2. Common Problems:

    • TypeError: Invalid annotations
    • ValueError: Incorrect operator setup
    • AttributeError: Missing state fields
class State(TypedDict):
    """State with aggregation support.

    Attributes:
        messages: Conversation history
        pending_tools: List of tools to execute
        results: Tool execution results with aggregation
        errors: Error messages with aggregation
    """

    messages: Annotated[list[BaseMessage], add_messages]
    pending_tools: list[dict]
    results: Annotated[
        dict[str, Any], BinaryOperatorAggregate(dict[str, Any], dict_reducer)
    ]
    errors: Annotated[
        dict[str, str], BinaryOperatorAggregate(dict[str, str], dict_reducer)
    ]

Step 3: Result Aggregator Implementation

Implement the main aggregation logic with message type handling.

Why This Matters

Result aggregation is essential because:

  1. Creates coherent summaries
  2. Maintains message type consistency
  3. Enables structured responses
  4. Facilitates error reporting

Debug Tips

  1. Message Creation:

    • Verify message types
    • Check content formatting
    • Monitor message order
  2. Common Issues:

    • TypeError: Invalid message types
    • ValueError: Malformed content
    • IndexError: Message list access
def result_aggregator(state: State) -> State:
    """Aggregate results from parallel tool execution.

    Creates appropriate message types for tool results and assistant responses.

    Args:
        state: Current state with results to aggregate

    Returns:
        Updated state with aggregated messages
    """
    messages = list(state["messages"])

    # Process successful results
    if state["results"]:
        # First add tool messages for each result
        for tool_id, result in state["results"].items():
            tool_message = ToolMessage(
                content=str(result),
                tool_call_id=tool_id,
                name=tool_id.split("_")[0],  # Extract tool name from ID
            )
            messages.append(tool_message)

        # Add assistant summary of results
        summary = "Here are the tool results:\n" + "\n".join(
            f"- {tool_id}: {result}" for tool_id, result in state["results"].items()
        )
        messages.append(AIMessage(content=summary))

    # Process errors if any
    if state["errors"]:
        error_summary = "Some tools encountered errors:\n" + "\n".join(
            f"- {tool_id}: {error}" for tool_id, error in state["errors"].items()
        )
        messages.append(AIMessage(content=error_summary))

    return {
        "messages": messages,
        "pending_tools": [],
        "results": state["results"],
        "errors": state["errors"],
    }
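
The aggregator derives the tool name with `tool_id.split("_")[0]`, which works for IDs such as `calculator_1` but truncates tool names that themselves contain underscores. A possible alternative, assuming IDs always follow a `<tool name>_<counter>` shape as in the demo state, is to split on the last underscore instead. `tool_name_from_id` is a hypothetical helper, not part of LangGraph or LangChain.

```python
def tool_name_from_id(tool_id: str) -> str:
    """Drop a trailing "_<suffix>" from an ID (hypothetical helper)."""
    name, separator, _suffix = tool_id.rpartition("_")
    # rpartition returns ("", "", tool_id) when there is no underscore at all.
    return name if separator else tool_id


# Compare the original extraction with the last-underscore variant.
for tool_id in ["calculator_1", "web_search_2", "weather"]:
    print(tool_id, "->", tool_id.split("_")[0], "vs", tool_name_from_id(tool_id))
```

For `web_search_2`, the first approach yields `web` while the helper yields `web_search`; both agree on `calculator_1`.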

Step 4: Demonstration Implementation

Example usage showing aggregation in action.

Why This Matters

Demonstration code is valuable because:

  1. Shows practical usage patterns
  2. Illustrates message flow
  3. Demonstrates error handling
  4. Provides testing scenarios

Debug Tips

  1. Demo Execution:

    • Monitor message creation
    • Verify result formatting
    • Check error handling
  2. Common Problems:

    • RuntimeError: Async execution
    • ValueError: Invalid state
    • TypeError: Message formatting
async def demonstrate_aggregation():
    """Demonstrate result aggregation with proper message types."""
    # Initial state with some results and errors
    state = {
        "messages": [],
        "pending_tools": [],
        "results": {"calculator_1": "2 + 2 = 4", "weather_1": "22°C, Sunny"},
        "errors": {"search_1": "API rate limit exceeded"},
    }

    print("Result Aggregation Demo")
    print("=" * 50)

    result = result_aggregator(state)

    print("\nFinal Messages:")
    for msg in result["messages"]:
        if isinstance(msg, ToolMessage):
            print(f"\nTool ({msg.name}): {msg.content}")
        elif isinstance(msg, AIMessage):
            print(f"\nAssistant: {msg.content}")

Common Pitfalls

  1. Incorrect Message Type Usage

    • Using wrong message classes
    • Missing required fields
    • Improper type conversion
  2. Poor Error Aggregation

    • Missing error context
    • Inconsistent error formats
    • Lost error details
  3. State Management Issues

    • Inconsistent state updates
    • Missing state validation
    • State corruption
  4. Result Format Inconsistencies

    • Mixed data types
    • Inconsistent string formats
    • Missing type conversion
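
Several of these pitfalls can be caught before aggregation with a small validation pass. The following is a minimal sketch under the assumption that the state has the four fields defined in Step 2; `validate_state` is a hypothetical helper, and the specific checks are examples rather than a complete set.

```python
from typing import Any


def validate_state(state: dict[str, Any]) -> list[str]:
    """Return a list of problems; an empty list means the state looks usable.

    Hypothetical helper, not part of LangGraph.
    """
    problems = []
    expected_types = (
        ("messages", list),
        ("pending_tools", list),
        ("results", dict),
        ("errors", dict),
    )
    for field, expected in expected_types:
        if field not in state:
            problems.append(f"missing field: {field}")
        elif not isinstance(state[field], expected):
            problems.append(f"{field} should be a {expected.__name__}")

    # A tool ID in both results and errors suggests inconsistent updates.
    results = state.get("results")
    errors = state.get("errors")
    if isinstance(results, dict) and isinstance(errors, dict):
        for tool_id in sorted(results.keys() & errors.keys()):
            problems.append(f"{tool_id} appears in both results and errors")
    return problems


good = {
    "messages": [],
    "pending_tools": [],
    "results": {"calculator_1": "2 + 2 = 4"},
    "errors": {"search_1": "API rate limit exceeded"},
}
bad = {
    "messages": [],
    "results": {"search_1": "ok"},
    "errors": {"search_1": "API rate limit exceeded"},
}
print(validate_state(good))
print(validate_state(bad))
```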

Key Takeaways

  1. Binary Operator Pattern

    • Enables scalable aggregation
    • Maintains consistency
    • Supports partial results
  2. Message Type Importance

    • Ensures proper message flow
    • Maintains context
    • Enables structured responses
  3. Error Handling Strategy

    • Comprehensive error capture
    • Structured error reporting
    • Clear error communication

Next Steps

  1. Enhanced Result Formatting

    • Add custom formatters
    • Implement type-specific handling
    • Create template system
  2. Advanced Error Management

    • Add error categorization
    • Implement recovery strategies
    • Create error hierarchies
  3. Custom Aggregation Patterns

    • Create specialized reducers
    • Implement filters
    • Add transformation pipelines

Expected Output

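Result Aggregation Demo
==================================================

Final Messages:

Tool (calculator): 2 + 2 = 4

Tool (weather): 22°C, Sunny

A: Here are the tool results:
- calculator_1: 2 + 2 = 4
- weather_1: 22°C, Sunny

A: Some tools encountered errors:
- search_1: API rate limit exceeded
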
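if __name__ == "__main__":
    import asyncio

    # nest_asyncio (pip install nest_asyncio) lets asyncio.run() work inside
    # a notebook, where an event loop is already running.
    import nest_asyncio

    nest_asyncio.apply()
    asyncio.run(demonstrate_aggregation())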

Rod Rivera
