
Keeping State Consistent: Database Transactions in LangGraph Workflows

Published:  at  10:00 AM
TL;DR

• LangGraph checkpoints and database transactions are independent state stores that can diverge on crashes

• Three patterns for consistency: idempotent tasks (simplest), read-before-write (state machine), transactional outbox (strongest guarantees)

• Design for re-execution: assume your code will run multiple times and structure operations so that's safe

When building AI agents with LangGraph, you’ll inevitably need to update external systems—databases, APIs, third-party services. The challenge? LangGraph maintains its own state through checkpoints, and your database maintains its own state through transactions. These two systems don’t inherently know about each other, creating opportunities for inconsistency.

Note: The get_db_connection() function used throughout this guide is pseudocode representing your database connection logic, as are helpers such as generate_payment_id(), ship_order(), and publish_event(). Replace them with your actual connection management (e.g., SQLAlchemy sessions, connection pools, or ORM contexts) and business logic.

This guide explores three patterns for keeping LangGraph workflow state and database state synchronized, each suited to different requirements around complexity, consistency guarantees, and system architecture.

The Consistency Problem

Consider a simple scenario: an agent processes a customer request and needs to update their account balance in a database.

from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import interrupt

@task
def update_balance(customer_id: str, amount: float) -> dict:
    with get_db_connection() as conn:
        conn.execute("UPDATE accounts SET balance = balance + %s WHERE id = %s", (amount, customer_id))
        conn.commit()
    return {"updated": True, "customer_id": customer_id}

@entrypoint(checkpointer=InMemorySaver())
def process_request(request: dict) -> dict:
    result = update_balance(request["customer_id"], request["amount"]).result()
    approved = interrupt({"action": "confirm_update", "result": result})
    return {"result": result, "approved": approved}

What happens if the system crashes after the database commits but before LangGraph checkpoints the task result? On resume, the task re-executes, and the balance is updated twice.

Task starts → DB commits (+£100) → [CRASH] → Resume → Task re-runs → DB commits (+£100 again)

The customer now has £200 instead of £100. The database and the workflow’s understanding of reality have diverged.
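To see the failure mode concretely, here is a self-contained sketch of a non-idempotent update running twice, using sqlite3 in place of the pseudocode get_db_connection() so it runs anywhere:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE accounts (id TEXT PRIMARY KEY, balance REAL)")
conn.execute("INSERT INTO accounts VALUES ('c1', 0)")
conn.commit()

def update_balance(conn, customer_id, amount):
    # No idempotency guard: every call blindly adds to the balance
    conn.execute(
        "UPDATE accounts SET balance = balance + ? WHERE id = ?",
        (amount, customer_id),
    )
    conn.commit()

update_balance(conn, "c1", 100.0)  # original run: commits, then the process crashes
update_balance(conn, "c1", 100.0)  # resume: task re-executes the same update
balance = conn.execute("SELECT balance FROM accounts WHERE id='c1'").fetchone()[0]
# balance is now 200.0 — the update applied twice
```

The rest of this guide is about making that second call harmless.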

Pattern 1: Idempotent Tasks

The most universally applicable pattern is making database operations safe to retry. If running the same operation twice produces the same result, crashes become non-events.

Using Idempotency Keys

Track which operations have been applied using a unique identifier:

import uuid
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver

@task
def update_balance(customer_id: str, amount: float, operation_id: str) -> dict:
    """
    Idempotent balance update. The operation_id ensures this specific
    update can only be applied once, regardless of how many times
    the task executes.
    """
    with get_db_connection() as conn:
        with conn.cursor() as cur:
            # Check if we've already processed this operation
            cur.execute(
                "SELECT 1 FROM operation_log WHERE operation_id = %s",
                (operation_id,)
            )

            if cur.fetchone():
                # Already applied—fetch current state and return
                cur.execute(
                    "SELECT balance FROM accounts WHERE id = %s",
                    (customer_id,)
                )
                return {
                    "status": "already_applied",
                    "balance": cur.fetchone()[0]
                }

            # Apply the update and log it atomically
            cur.execute(
                """
                UPDATE accounts
                SET balance = balance + %s
                WHERE id = %s
                RETURNING balance
                """,
                (amount, customer_id)
            )
            new_balance = cur.fetchone()[0]

            cur.execute(
                "INSERT INTO operation_log (operation_id, applied_at) VALUES (%s, NOW())",
                (operation_id,)
            )

            conn.commit()
            return {"status": "applied", "balance": new_balance}

@entrypoint(checkpointer=InMemorySaver())
def process_payment(request: dict) -> dict:
    # Prefer an operation_id supplied by the caller: the entrypoint body
    # re-executes on resume, so a uuid4() generated here is only stable
    # once the task result below has been checkpointed. For full crash
    # safety, pass operation_id in the request (or derive it
    # deterministically, e.g. from the thread_id).
    operation_id = request.get("operation_id", str(uuid.uuid4()))

    result = update_balance(
        request["customer_id"],
        request["amount"],
        operation_id
    ).result()

    return result

The key insight: the operation_id ties the workflow invocation to the database operation. Even if the task runs multiple times, the database update happens exactly once.
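A minimal, runnable sketch of the same idea, again substituting sqlite3 for the pseudocode connection layer (table names and helpers here are illustrative):

```python
import sqlite3
import uuid

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE accounts (id TEXT PRIMARY KEY, balance REAL)")
conn.execute("CREATE TABLE operation_log (operation_id TEXT PRIMARY KEY)")
conn.execute("INSERT INTO accounts VALUES ('c1', 0)")
conn.commit()

def apply_once(conn, operation_id, customer_id, amount):
    """Apply a balance update at most once, keyed by operation_id."""
    cur = conn.cursor()
    cur.execute("SELECT 1 FROM operation_log WHERE operation_id = ?", (operation_id,))
    if cur.fetchone():
        # Already applied on a previous execution: return current state
        cur.execute("SELECT balance FROM accounts WHERE id = ?", (customer_id,))
        return {"status": "already_applied", "balance": cur.fetchone()[0]}
    # Apply the update and record the operation in the same transaction
    cur.execute(
        "UPDATE accounts SET balance = balance + ? WHERE id = ?",
        (amount, customer_id),
    )
    cur.execute("INSERT INTO operation_log (operation_id) VALUES (?)", (operation_id,))
    conn.commit()
    cur.execute("SELECT balance FROM accounts WHERE id = ?", (customer_id,))
    return {"status": "applied", "balance": cur.fetchone()[0]}

op = str(uuid.uuid4())
first = apply_once(conn, op, "c1", 100.0)   # applies: balance 100.0
second = apply_once(conn, op, "c1", 100.0)  # simulated re-execution: no-op
```

The second call returns "already_applied" with the unchanged balance, which is exactly what a resumed task should see.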

Using Optimistic Locking

If your tables already have version columns, you can use them to prevent duplicate updates:

@task
def update_with_version(
    customer_id: str,
    amount: float,
    expected_version: int
) -> dict:
    """
    Only applies the update if the row hasn't changed since we read it.
    On retry, the version won't match, so the duplicate is rejected.
    """
    with get_db_connection() as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                UPDATE accounts
                SET balance = balance + %s, version = version + 1
                WHERE id = %s AND version = %s
                RETURNING balance, version
                """,
                (amount, customer_id, expected_version)
            )

            row = cur.fetchone()
            conn.commit()

            if row is None:
                # Version mismatch—either we already updated, or someone else did
                cur.execute(
                    "SELECT balance, version FROM accounts WHERE id = %s",
                    (customer_id,)
                )
                current = cur.fetchone()
                return {
                    "status": "version_conflict",
                    "current_balance": current[0],
                    "current_version": current[1]
                }

            return {
                "status": "applied",
                "balance": row[0],
                "version": row[1]
            }

This works well when you need to prevent both duplicate updates and concurrent modifications from other processes.
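The rejection behaviour is easy to verify in isolation. This sketch (sqlite3 again standing in for the real database) shows a stale retry affecting zero rows:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE accounts (id TEXT PRIMARY KEY, balance REAL, version INTEGER)")
conn.execute("INSERT INTO accounts VALUES ('c1', 0, 1)")
conn.commit()

def guarded_update(conn, customer_id, amount, expected_version):
    # The version check in WHERE rejects stale retries: rowcount is 0
    # when the row has moved past expected_version.
    cur = conn.execute(
        "UPDATE accounts SET balance = balance + ?, version = version + 1 "
        "WHERE id = ? AND version = ?",
        (amount, customer_id, expected_version),
    )
    conn.commit()
    return cur.rowcount == 1

applied = guarded_update(conn, "c1", 100.0, expected_version=1)   # True: version 1 → 2
replayed = guarded_update(conn, "c1", 100.0, expected_version=1)  # False: version is now 2
balance = conn.execute("SELECT balance FROM accounts WHERE id='c1'").fetchone()[0]
```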

When to Use Idempotent Tasks

This pattern suits most scenarios. Use it when:

• Operations target a single database you control

• You can add an operation log table or version column to your schema

• You want crash safety without extra infrastructure

• Latency matters—there is no polling or external worker involved

Pattern 2: Read-Before-Write State Synchronization

Sometimes the simplest approach is the right one: read the current database state at the start of each workflow execution, and use that to determine what work remains.

This pattern treats the database as the source of truth. Instead of trying to prevent duplicate writes, you structure the workflow to be driven by what the database currently shows.

The Core Idea

from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import interrupt
from enum import Enum

class OrderStatus(str, Enum):
    PENDING = "pending"
    PAYMENT_PROCESSED = "payment_processed"
    INVENTORY_RESERVED = "inventory_reserved"
    SHIPPED = "shipped"
    COMPLETED = "completed"
    CANCELLED = "cancelled"

@task
def read_order_state(order_id: str) -> dict:
    """Read current order state from database."""
    with get_db_connection() as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT status, payment_id, inventory_reservation_id, shipping_id
                FROM orders WHERE id = %s
                """,
                (order_id,)
            )
            row = cur.fetchone()
            return {
                "status": row[0],
                "payment_id": row[1],
                "inventory_reservation_id": row[2],
                "shipping_id": row[3]
            }

@task
def process_payment(order_id: str) -> dict:
    """Process payment and update order status atomically."""
    with get_db_connection() as conn:
        with conn.cursor() as cur:
            # Only process if still pending
            cur.execute(
                """
                UPDATE orders
                SET status = %s, payment_id = %s
                WHERE id = %s AND status = %s
                RETURNING id
                """,
                (OrderStatus.PAYMENT_PROCESSED, generate_payment_id(), order_id, OrderStatus.PENDING)
            )
            updated = cur.fetchone() is not None
            conn.commit()
            return {"processed": updated}

@task
def reserve_inventory(order_id: str) -> dict:
    """Reserve inventory and update order status atomically."""
    with get_db_connection() as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                UPDATE orders
                SET status = %s, inventory_reservation_id = %s
                WHERE id = %s AND status = %s
                RETURNING id
                """,
                (OrderStatus.INVENTORY_RESERVED, generate_reservation_id(), order_id, OrderStatus.PAYMENT_PROCESSED)
            )
            updated = cur.fetchone() is not None
            conn.commit()
            return {"reserved": updated}

@entrypoint(checkpointer=InMemorySaver())
def fulfill_order(request: dict) -> dict:
    """
    Order fulfillment workflow that syncs with database state on each execution.

    On resume after a crash, we read current state and skip completed steps.
    The database status column acts as a durable record of progress.
    """
    order_id = request["order_id"]

    # Always start by reading current state
    current_state = read_order_state(order_id).result()
    status = current_state["status"]

    # Process payment if not yet done
    if status == OrderStatus.PENDING:
        process_payment(order_id).result()
        status = OrderStatus.PAYMENT_PROCESSED

    # Reserve inventory if payment complete but inventory not reserved
    if status == OrderStatus.PAYMENT_PROCESSED:
        reserve_inventory(order_id).result()
        status = OrderStatus.INVENTORY_RESERVED

    # Human approval checkpoint
    if status == OrderStatus.INVENTORY_RESERVED:
        approved = interrupt({
            "action": "approve_shipment",
            "order_id": order_id
        })

        if approved:
            ship_order(order_id).result()
            status = OrderStatus.SHIPPED

    return {"order_id": order_id, "final_status": status}

Handling the Read-Write Race

There’s a subtle issue: what if the database state changes between reading and writing? Each task should use conditional updates:

@task
def process_payment(order_id: str) -> dict:
    with get_db_connection() as conn:
        with conn.cursor() as cur:
            # This UPDATE only succeeds if status is still PENDING
            cur.execute(
                """
                UPDATE orders
                SET status = 'payment_processed', payment_id = %s
                WHERE id = %s AND status = 'pending'
                RETURNING id
                """,
                (generate_payment_id(), order_id)
            )

            if cur.fetchone() is None:
                # Status already changed—read current state
                cur.execute("SELECT status FROM orders WHERE id = %s", (order_id,))
                current = cur.fetchone()[0]
                conn.commit()
                return {"processed": False, "current_status": current}

            conn.commit()
            return {"processed": True}

The conditional WHERE status = 'pending' ensures idempotency: if we’ve already processed payment (perhaps in a previous execution before a crash), the UPDATE affects zero rows and we move on.
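The guarded transition can be reduced to a reusable helper. A sketch using sqlite3 in place of the real connection layer:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id TEXT PRIMARY KEY, status TEXT)")
conn.execute("INSERT INTO orders VALUES ('o1', 'pending')")
conn.commit()

def try_transition(conn, order_id, from_status, to_status):
    # Conditional update: succeeds only while the order is still in
    # from_status, so a replayed transition after a crash affects zero rows.
    cur = conn.execute(
        "UPDATE orders SET status = ? WHERE id = ? AND status = ?",
        (to_status, order_id, from_status),
    )
    conn.commit()
    return cur.rowcount == 1

first = try_transition(conn, "o1", "pending", "payment_processed")   # True
replay = try_transition(conn, "o1", "pending", "payment_processed")  # False: already moved on
```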

State Machine Progression

This pattern works particularly well when your workflow follows a linear state machine:

PENDING → PAYMENT_PROCESSED → INVENTORY_RESERVED → SHIPPED → COMPLETED

Each state transition is guarded by a conditional update. The workflow reads current state, determines which transition to attempt next, and uses the database’s conditional update to ensure exactly-once semantics.

@entrypoint(checkpointer=InMemorySaver())
def state_machine_workflow(request: dict) -> dict:
    order_id = request["order_id"]

    while True:
        # Read current state
        state = read_order_state(order_id).result()

        match state["status"]:
            case OrderStatus.PENDING:
                process_payment(order_id).result()

            case OrderStatus.PAYMENT_PROCESSED:
                reserve_inventory(order_id).result()

            case OrderStatus.INVENTORY_RESERVED:
                approved = interrupt({"action": "approve", "order_id": order_id})
                if approved:
                    ship_order(order_id).result()
                else:
                    cancel_order(order_id).result()
                    return {"status": "cancelled"}

            case OrderStatus.SHIPPED:
                mark_completed(order_id).result()

            case OrderStatus.COMPLETED:
                return {"status": "completed", "order_id": order_id}

            case OrderStatus.CANCELLED:
                return {"status": "cancelled", "order_id": order_id}

When to Use Read-Before-Write

This pattern excels when:

• Your workflow follows a clear linear progression of states

• The database already records progress (a status column or similar)

• Multiple processes might act on the same records concurrently

• You want the status column to double as an audit trail of progress

The tradeoff is that every workflow execution starts with a database read, and you’re coupling your workflow logic tightly to database schema.


Pattern 3: Transactional Outbox with Pause/Resume

For systems requiring stronger consistency guarantees or involving multiple services, the transactional outbox pattern decouples the “request” from the “execution” while maintaining exactly-once semantics.

The Architecture

┌─────────────────────────────────────────────────────────────────────────┐
│                           LangGraph Workflow                            │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  1. Write request to outbox  ──►  2. Interrupt (pause workflow)         │
│                                                                         │
│  5. Resume with response  ◄──────  (waiting for external completion)    │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘
                │                                       ▲
                │                                       │
                ▼                                       │
┌───────────────────────────┐       ┌───────────────────────────────────┐
│        Database           │       │       Message Queue / Events      │
├───────────────────────────┤       ├───────────────────────────────────┤
│                           │       │                                   │
│  Outbox Table             │──────►│  4. Publish completion event      │
│  ┌─────────────────────┐  │       │                                   │
│  │ id: uuid            │  │       │                                   │
│  │ request_type: str   │  │       └───────────────────────────────────┘
│  │ payload: jsonb      │  │                       ▲
│  │ status: pending     │  │                       │
│  │ thread_id: str      │  │                       │
│  │ created_at: ts      │  │                       │
│  └─────────────────────┘  │                       │
│                           │       ┌───────────────────────────────────┐
└───────────────────────────┘       │       Worker Service              │
                                    ├───────────────────────────────────┤
                                    │                                   │
                                    │  3. Process outbox entries        │
                                    │     - Execute actual operation    │
                                    │     - Update outbox status        │
                                    │     - Publish completion event    │
                                    │                                   │
                                    └───────────────────────────────────┘

Implementation

First, define the outbox schema:

CREATE TABLE outbox (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    request_type VARCHAR(100) NOT NULL,
    payload JSONB NOT NULL,
    status VARCHAR(20) DEFAULT 'pending',
    thread_id VARCHAR(255) NOT NULL,  -- Links back to LangGraph workflow
    result JSONB,
    created_at TIMESTAMP DEFAULT NOW(),
    processed_at TIMESTAMP
);

-- PostgreSQL defines indexes separately, not inline in CREATE TABLE
CREATE INDEX idx_outbox_status ON outbox (status);
CREATE INDEX idx_outbox_thread ON outbox (thread_id);

The workflow writes to the outbox and pauses:

from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import interrupt, Command
from langchain_core.runnables import RunnableConfig
from psycopg2.extras import Json  # adapts dicts to JSONB parameters
import uuid

@task
def write_to_outbox(
    request_type: str,
    payload: dict,
    thread_id: str
) -> dict:
    """
    Write a request to the outbox table within a transaction.
    This is the only write the workflow makes directly.
    """
    outbox_id = str(uuid.uuid4())

    with get_db_connection() as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                INSERT INTO outbox (id, request_type, payload, thread_id, status)
                VALUES (%s, %s, %s, %s, 'pending')
                RETURNING id
                """,
                (outbox_id, request_type, Json(payload), thread_id)
            )
            conn.commit()

    return {"outbox_id": outbox_id, "status": "pending"}

@entrypoint(checkpointer=InMemorySaver())
def transfer_funds(request: dict, *, config: RunnableConfig) -> dict:
    """
    Fund transfer workflow using transactional outbox pattern.

    The workflow never directly modifies account balances. Instead, it:
    1. Writes a transfer request to the outbox
    2. Pauses and waits for external processing
    3. Resumes when the transfer service completes the operation
    """
    thread_id = config["configurable"]["thread_id"]

    # Step 1: Write the transfer request to outbox
    outbox_entry = write_to_outbox(
        request_type="fund_transfer",
        payload={
            "from_account": request["from_account"],
            "to_account": request["to_account"],
            "amount": request["amount"],
            "reference": request.get("reference", str(uuid.uuid4()))
        },
        thread_id=thread_id
    ).result()

    # Step 2: Pause workflow—external service will process and resume
    # The interrupt payload tells the orchestration layer what we're waiting for
    transfer_result = interrupt({
        "waiting_for": "fund_transfer_completion",
        "outbox_id": outbox_entry["outbox_id"],
        "message": "Waiting for transfer service to process request"
    })

    # Step 3: Workflow resumes here with the transfer result
    if transfer_result["status"] == "completed":
        return {
            "status": "success",
            "transfer_id": transfer_result["transfer_id"],
            "from_account": request["from_account"],
            "to_account": request["to_account"],
            "amount": request["amount"]
        }
    else:
        return {
            "status": "failed",
            "error": transfer_result.get("error"),
            "outbox_id": outbox_entry["outbox_id"]
        }

The Worker Service

A separate service polls the outbox and processes requests:

# worker_service.py
import time
import uuid

from psycopg2.extras import Json  # adapts dicts to JSONB parameters

def process_outbox():
    """
    Worker that processes outbox entries.
    Runs as a separate service, not within LangGraph.
    """
    while True:
        with get_db_connection() as conn:
            with conn.cursor() as cur:
                # Claim a pending entry (with row locking for concurrency)
                cur.execute(
                    """
                    UPDATE outbox
                    SET status = 'processing'
                    WHERE id = (
                        SELECT id FROM outbox
                        WHERE status = 'pending'
                        ORDER BY created_at
                        LIMIT 1
                        FOR UPDATE SKIP LOCKED
                    )
                    RETURNING id, request_type, payload, thread_id
                    """
                )

                entry = cur.fetchone()
                if not entry:
                    conn.commit()
                    time.sleep(1)  # No work, wait before polling again
                    continue

                outbox_id, request_type, payload, thread_id = entry
                conn.commit()

        # Process based on request type
        try:
            if request_type == "fund_transfer":
                result = execute_fund_transfer(payload)
            else:
                result = {"status": "error", "error": f"Unknown request type: {request_type}"}

            # Update outbox with result
            with get_db_connection() as conn:
                with conn.cursor() as cur:
                    cur.execute(
                        """
                        UPDATE outbox
                        SET status = 'completed', result = %s, processed_at = NOW()
                        WHERE id = %s
                        """,
                        (Json(result), outbox_id)
                    )
                    conn.commit()

            # Resume the LangGraph workflow
            resume_workflow(thread_id, result)

        except Exception as e:
            # Mark as failed, workflow can handle retry logic
            with get_db_connection() as conn:
                with conn.cursor() as cur:
                    cur.execute(
                        """
                        UPDATE outbox
                        SET status = 'failed', result = %s, processed_at = NOW()
                        WHERE id = %s
                        """,
                        (Json({"status": "error", "error": str(e)}), outbox_id)
                    )
                    conn.commit()

            resume_workflow(thread_id, {"status": "error", "error": str(e)})


def execute_fund_transfer(payload: dict) -> dict:
    """
    Execute the actual fund transfer within a database transaction.
    This is where the real business logic lives.
    """
    with get_db_connection() as conn:
        with conn.cursor() as cur:
            # Check idempotency—has this reference been processed?
            cur.execute(
                "SELECT transfer_id FROM transfers WHERE reference = %s",
                (payload["reference"],)
            )
            existing = cur.fetchone()
            if existing:
                return {"status": "completed", "transfer_id": existing[0], "note": "already_processed"}

            # Execute transfer atomically
            cur.execute(
                "UPDATE accounts SET balance = balance - %s WHERE id = %s AND balance >= %s RETURNING id",
                (payload["amount"], payload["from_account"], payload["amount"])
            )
            if not cur.fetchone():
                conn.rollback()
                return {"status": "error", "error": "insufficient_funds"}

            cur.execute(
                "UPDATE accounts SET balance = balance + %s WHERE id = %s",
                (payload["amount"], payload["to_account"])
            )

            # Record the transfer
            transfer_id = str(uuid.uuid4())
            cur.execute(
                """
                INSERT INTO transfers (id, reference, from_account, to_account, amount, created_at)
                VALUES (%s, %s, %s, %s, %s, NOW())
                """,
                (transfer_id, payload["reference"], payload["from_account"],
                 payload["to_account"], payload["amount"])
            )

            conn.commit()
            return {"status": "completed", "transfer_id": transfer_id}


def resume_workflow(thread_id: str, result: dict):
    """Resume the paused LangGraph workflow with the processing result."""
    from langgraph.types import Command

    # Load the workflow (you'd have a reference to your entrypoint)
    config = {"configurable": {"thread_id": thread_id}}

    # Resume with the result
    for chunk in transfer_funds.stream(Command(resume=result), config):
        pass  # Process to completion

Event-Driven Variant

Instead of the worker directly resuming workflows, you can use a message queue for better decoupling:

# Worker publishes completion events
def process_outbox_with_events():
    # ... (same claim logic as before)

    result = execute_fund_transfer(payload)

    # Publish event instead of direct resume
    publish_event(
        topic="workflow.resume",
        payload={
            "thread_id": thread_id,
            "outbox_id": outbox_id,
            "result": result
        }
    )

# Separate service handles workflow resumption
def workflow_resume_handler():
    for event in subscribe("workflow.resume"):
        config = {"configurable": {"thread_id": event["thread_id"]}}
        for chunk in transfer_funds.stream(Command(resume=event["result"]), config):
            pass

Handling Multiple Outbox Requests

For workflows that need multiple external operations:

@entrypoint(checkpointer=InMemorySaver())
def complex_order_workflow(request: dict, *, config: RunnableConfig) -> dict:
    thread_id = config["configurable"]["thread_id"]
    order_id = request["order_id"]

    # Request 1: Reserve inventory
    inventory_outbox = write_to_outbox(
        request_type="reserve_inventory",
        payload={"order_id": order_id, "items": request["items"]},
        thread_id=thread_id
    ).result()

    inventory_result = interrupt({
        "waiting_for": "inventory_reservation",
        "outbox_id": inventory_outbox["outbox_id"]
    })

    if inventory_result["status"] != "completed":
        return {"status": "failed", "step": "inventory", "error": inventory_result.get("error")}

    # Request 2: Process payment
    payment_outbox = write_to_outbox(
        request_type="process_payment",
        payload={
            "order_id": order_id,
            "amount": request["total"],
            "reservation_id": inventory_result["reservation_id"]
        },
        thread_id=thread_id
    ).result()

    payment_result = interrupt({
        "waiting_for": "payment_processing",
        "outbox_id": payment_outbox["outbox_id"]
    })

    if payment_result["status"] != "completed":
        # Compensation: release inventory
        write_to_outbox(
            request_type="release_inventory",
            payload={"reservation_id": inventory_result["reservation_id"]},
            thread_id=thread_id
        ).result()
        interrupt({"waiting_for": "inventory_release"})
        return {"status": "failed", "step": "payment", "error": payment_result.get("error")}

    return {
        "status": "completed",
        "order_id": order_id,
        "reservation_id": inventory_result["reservation_id"],
        "payment_id": payment_result["payment_id"]
    }

When to Use the Outbox Pattern

This pattern is ideal when:

• Operations span multiple services that each own their own data

• You need strong consistency and a built-in audit trail (the outbox table)

• Long-running or human-gated operations must survive restarts

• Compensation logic is part of your failure handling

The tradeoff is architectural complexity: you need a worker service, possibly message queues, and careful handling of the resume flow.


Pattern Comparison

Aspect             Idempotent Tasks    Read-Before-Write    Transactional Outbox
Complexity         Low                 Low-Medium           High
Consistency        Eventual            Eventual             Strong
Best for           Simple operations   State machines       Multi-service coordination
Latency            Lowest              Low                  Higher (async)
Audit trail        Manual              Via status column    Built-in (outbox table)
Compensation       Manual              Manual               Natural fit
External services  Direct calls        Direct calls         Decoupled via outbox

Choosing the Right Pattern

Start with idempotent tasks if you’re building straightforward workflows with direct database access. It’s the simplest approach and handles most crash scenarios.

Use read-before-write when your workflow naturally maps to a state machine with clear progression stages. The database becomes your source of truth, and the workflow simply advances the state.

Adopt the outbox pattern when you need to coordinate across multiple services, require strong consistency guarantees, or want clean separation between “what to do” and “how to do it.” It’s more infrastructure, but it scales to complex distributed scenarios.

In practice, you might combine patterns: idempotent tasks for simple operations, with the outbox pattern reserved for critical external integrations that require exactly-once delivery guarantees.


Connection Management Best Practices

Regardless of which pattern you choose, keep database connections short-lived:

# Wrong: Connection held across interrupt
@entrypoint(checkpointer=checkpointer)
def bad_workflow(inputs: dict):
    conn = get_db_connection()
    try:
        result = update_something(conn, inputs)
        interrupt("review")  # Connection held indefinitely!
        finalize(conn, result)
    finally:
        conn.close()

# Correct: Each task manages its own connection
@entrypoint(checkpointer=checkpointer)
def good_workflow(inputs: dict):
    result = update_task(inputs).result()  # Connection opened/closed within task
    interrupt("review")  # No connection held
    finalize_task(result).result()  # Fresh connection for this task

Workflows can pause for minutes, hours, or days at interrupt points. Holding database connections across these boundaries wastes resources and causes connection pool exhaustion.


Summary

Building reliable AI agents that interact with databases requires careful thought about consistency. LangGraph’s checkpoint system and your database’s transaction system are independent state stores, and keeping them synchronized requires explicit patterns.

The three approaches covered here—idempotent tasks, read-before-write synchronization, and the transactional outbox—give you a toolkit for handling everything from simple CRUD operations to complex multi-service workflows. Choose based on your consistency requirements, architectural constraints, and operational complexity tolerance.

The common thread across all patterns: design for re-execution. Assume your code will run multiple times and structure operations so that’s safe. Whether through idempotency keys, conditional updates, or the outbox pattern, the goal is making retries invisible to your business logic.


