• LangGraph checkpoints and database transactions are independent state stores that can diverge on crashes
• Three patterns for consistency: idempotent tasks (simplest), read-before-write (state machine), transactional outbox (strongest guarantees)
• Design for re-execution: assume your code will run multiple times and structure operations so that's safe
When building AI agents with LangGraph, you’ll inevitably need to update external systems—databases, APIs, third-party services. The challenge? LangGraph maintains its own state through checkpoints, and your database maintains its own state through transactions. These two systems don’t inherently know about each other, creating opportunities for inconsistency.
Note: The get_db_connection() function used throughout this guide is pseudocode representing your database connection logic. Replace it with your actual connection management (e.g., SQLAlchemy sessions, connection pools, or ORM contexts).
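As a concrete stand-in, here is a minimal sketch using Python's built-in sqlite3 module. Only the function name matches the pseudocode above; everything else is illustrative:

```python
import sqlite3
from contextlib import contextmanager

@contextmanager
def get_db_connection(db_path: str = ":memory:"):
    """Illustrative stand-in for real connection management.

    Yields a short-lived sqlite3 connection and always closes it,
    mirroring the open-use-close discipline this guide assumes.
    """
    conn = sqlite3.connect(db_path)
    try:
        yield conn
    finally:
        conn.close()
```

In production you would swap this for a pooled or ORM-managed connection; the important property is that each with-block opens and closes its own connection.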
This guide explores three patterns for keeping LangGraph workflow state and database state synchronized, each suited to different requirements around complexity, consistency guarantees, and system architecture.
The Consistency Problem
Consider a simple scenario: an agent processes a customer request and needs to update their account balance in a database.
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import interrupt

@task
def update_balance(customer_id: str, amount: float) -> dict:
    with get_db_connection() as conn:
        conn.execute("UPDATE accounts SET balance = balance + %s WHERE id = %s", (amount, customer_id))
        conn.commit()
    return {"updated": True, "customer_id": customer_id}

@entrypoint(checkpointer=InMemorySaver())
def process_request(request: dict) -> dict:
    result = update_balance(request["customer_id"], request["amount"]).result()
    approved = interrupt({"action": "confirm_update", "result": result})
    return {"result": result, "approved": approved}
What happens if the system crashes after the database commits but before LangGraph checkpoints the task result? On resume, the task re-executes, and the balance is updated twice.
Task starts → DB commits (+£100) → [CRASH] → Resume → Task re-runs → DB commits (+£100 again)
The customer now has £200 instead of £100. The database and the workflow’s understanding of reality have diverged.
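The double-apply is easy to reproduce in isolation. This sketch (sqlite3 as a stand-in database, names hypothetical) runs the same non-idempotent update twice, exactly as a crash-and-resume would:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE accounts (id TEXT PRIMARY KEY, balance REAL)")
conn.execute("INSERT INTO accounts VALUES ('cust-1', 0)")

def naive_update(amount: float) -> None:
    # No idempotency guard: every call mutates the balance.
    conn.execute(
        "UPDATE accounts SET balance = balance + ? WHERE id = 'cust-1'",
        (amount,),
    )
    conn.commit()

naive_update(100.0)  # original execution: commits before the checkpoint
naive_update(100.0)  # re-execution after the crash
balance = conn.execute("SELECT balance FROM accounts").fetchone()[0]
print(balance)  # 200.0 rather than the intended 100.0
```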
Pattern 1: Idempotent Tasks
The most universally applicable pattern is making database operations safe to retry. If running the same operation twice produces the same result, crashes become non-events.
Using Idempotency Keys
Track which operations have been applied using a unique identifier:
import uuid
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver

@task
def update_balance(customer_id: str, amount: float, operation_id: str) -> dict:
    """
    Idempotent balance update. The operation_id ensures this specific
    update can only be applied once, regardless of how many times
    the task executes.
    """
    with get_db_connection() as conn:
        with conn.cursor() as cur:
            # Check if we've already processed this operation
            cur.execute(
                "SELECT 1 FROM operation_log WHERE operation_id = %s",
                (operation_id,)
            )
            if cur.fetchone():
                # Already applied—fetch current state and return
                cur.execute(
                    "SELECT balance FROM accounts WHERE id = %s",
                    (customer_id,)
                )
                return {
                    "status": "already_applied",
                    "balance": cur.fetchone()[0]
                }

            # Apply the update and log it atomically
            cur.execute(
                """
                UPDATE accounts
                SET balance = balance + %s
                WHERE id = %s
                RETURNING balance
                """,
                (amount, customer_id)
            )
            new_balance = cur.fetchone()[0]
            cur.execute(
                "INSERT INTO operation_log (operation_id, applied_at) VALUES (%s, NOW())",
                (operation_id,)
            )
            conn.commit()
            return {"status": "applied", "balance": new_balance}

@entrypoint(checkpointer=InMemorySaver())
def process_payment(request: dict) -> dict:
    # Generate operation_id once at the start—it persists across resumes
    # because the entrypoint input is part of the checkpoint
    operation_id = request.get("operation_id", str(uuid.uuid4()))
    result = update_balance(
        request["customer_id"],
        request["amount"],
        operation_id
    ).result()
    return result
The key insight: the operation_id ties the workflow invocation to the database operation. Even if the task runs multiple times, the database update happens exactly once.
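Stripped of the LangGraph machinery, the guard can be demonstrated end to end. This self-contained sketch (sqlite3 stand-in; update_balance_once is a hypothetical name) replays the same operation_id and shows the update landing exactly once:

```python
import sqlite3
import uuid

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE accounts (id TEXT PRIMARY KEY, balance REAL)")
conn.execute("CREATE TABLE operation_log (operation_id TEXT PRIMARY KEY)")
conn.execute("INSERT INTO accounts VALUES ('cust-1', 0)")

def update_balance_once(customer_id: str, amount: float, operation_id: str) -> float:
    # If the operation_id is already logged, skip the mutation entirely.
    if conn.execute(
        "SELECT 1 FROM operation_log WHERE operation_id = ?", (operation_id,)
    ).fetchone():
        return conn.execute(
            "SELECT balance FROM accounts WHERE id = ?", (customer_id,)
        ).fetchone()[0]
    conn.execute(
        "UPDATE accounts SET balance = balance + ? WHERE id = ?",
        (amount, customer_id),
    )
    conn.execute("INSERT INTO operation_log VALUES (?)", (operation_id,))
    conn.commit()  # update and log land in the same transaction
    return conn.execute(
        "SELECT balance FROM accounts WHERE id = ?", (customer_id,)
    ).fetchone()[0]

op_id = str(uuid.uuid4())
update_balance_once("cust-1", 100.0, op_id)          # first execution
final = update_balance_once("cust-1", 100.0, op_id)  # replay after a crash
print(final)  # 100.0, not 200.0
```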
Using Optimistic Locking
If your tables already have version columns, you can use them to prevent duplicate updates:
@task
def update_with_version(
    customer_id: str,
    amount: float,
    expected_version: int
) -> dict:
    """
    Only applies the update if the row hasn't changed since we read it.
    On retry, the version won't match, so the duplicate is rejected.
    """
    with get_db_connection() as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                UPDATE accounts
                SET balance = balance + %s, version = version + 1
                WHERE id = %s AND version = %s
                RETURNING balance, version
                """,
                (amount, customer_id, expected_version)
            )
            row = cur.fetchone()
            conn.commit()
            if row is None:
                # Version mismatch—either we already updated, or someone else did
                cur.execute(
                    "SELECT balance, version FROM accounts WHERE id = %s",
                    (customer_id,)
                )
                current = cur.fetchone()
                return {
                    "status": "version_conflict",
                    "current_balance": current[0],
                    "current_version": current[1]
                }
            return {
                "status": "applied",
                "balance": row[0],
                "version": row[1]
            }
This works well when you need to prevent both duplicate updates and concurrent modifications from other processes.
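A miniature version makes the retry behaviour concrete (sqlite3 stand-in, single row, names illustrative): a replay that still carries the old expected_version is rejected:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE accounts (id TEXT PRIMARY KEY, balance REAL, version INTEGER)"
)
conn.execute("INSERT INTO accounts VALUES ('cust-1', 0, 1)")

def update_with_version(amount: float, expected_version: int) -> str:
    # The WHERE clause is the whole trick: zero rows means the version moved on.
    cur = conn.execute(
        "UPDATE accounts SET balance = balance + ?, version = version + 1 "
        "WHERE id = 'cust-1' AND version = ?",
        (amount, expected_version),
    )
    conn.commit()
    return "applied" if cur.rowcount else "version_conflict"

first = update_with_version(100.0, 1)   # succeeds, version becomes 2
second = update_with_version(100.0, 1)  # stale version: rejected
print(first, second)  # applied version_conflict
```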
When to Use Idempotent Tasks
This pattern suits most scenarios. Use it when:
- You control the database schema (can add operation logs or version columns)
- Operations are relatively simple and self-contained
- You want minimal architectural complexity
- Latency requirements are modest (the check-then-update adds overhead)
Pattern 2: Read-Before-Write State Synchronization
Sometimes the simplest approach is the right one: read the current database state at the start of each workflow execution, and use that to determine what work remains.
This pattern treats the database as the source of truth. Instead of trying to prevent duplicate writes, you structure the workflow to be driven by what the database currently shows.
The Core Idea
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import interrupt
from enum import Enum

class OrderStatus(str, Enum):
    PENDING = "pending"
    PAYMENT_PROCESSED = "payment_processed"
    INVENTORY_RESERVED = "inventory_reserved"
    SHIPPED = "shipped"
    COMPLETED = "completed"
    CANCELLED = "cancelled"

@task
def read_order_state(order_id: str) -> dict:
    """Read current order state from database."""
    with get_db_connection() as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT status, payment_id, inventory_reservation_id, shipping_id
                FROM orders WHERE id = %s
                """,
                (order_id,)
            )
            row = cur.fetchone()
            return {
                "status": row[0],
                "payment_id": row[1],
                "inventory_reservation_id": row[2],
                "shipping_id": row[3]
            }

@task
def process_payment(order_id: str) -> dict:
    """Process payment and update order status atomically."""
    # generate_payment_id() (and generate_reservation_id(), ship_order()
    # below) are placeholders for your own helpers.
    with get_db_connection() as conn:
        with conn.cursor() as cur:
            # Only process if still pending
            cur.execute(
                """
                UPDATE orders
                SET status = %s, payment_id = %s
                WHERE id = %s AND status = %s
                RETURNING id
                """,
                (OrderStatus.PAYMENT_PROCESSED, generate_payment_id(), order_id, OrderStatus.PENDING)
            )
            updated = cur.fetchone() is not None
            conn.commit()
            return {"processed": updated}

@task
def reserve_inventory(order_id: str) -> dict:
    """Reserve inventory and update order status atomically."""
    with get_db_connection() as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                UPDATE orders
                SET status = %s, inventory_reservation_id = %s
                WHERE id = %s AND status = %s
                RETURNING id
                """,
                (OrderStatus.INVENTORY_RESERVED, generate_reservation_id(), order_id, OrderStatus.PAYMENT_PROCESSED)
            )
            updated = cur.fetchone() is not None
            conn.commit()
            return {"reserved": updated}

@entrypoint(checkpointer=InMemorySaver())
def fulfill_order(request: dict) -> dict:
    """
    Order fulfillment workflow that syncs with database state on each execution.
    On resume after a crash, we read current state and skip completed steps.
    The database status column acts as a durable record of progress.
    """
    order_id = request["order_id"]

    # Always start by reading current state
    current_state = read_order_state(order_id).result()
    status = current_state["status"]

    # Process payment if not yet done
    if status == OrderStatus.PENDING:
        process_payment(order_id).result()
        status = OrderStatus.PAYMENT_PROCESSED

    # Reserve inventory if payment complete but inventory not reserved
    if status == OrderStatus.PAYMENT_PROCESSED:
        reserve_inventory(order_id).result()
        status = OrderStatus.INVENTORY_RESERVED

    # Human approval checkpoint
    if status == OrderStatus.INVENTORY_RESERVED:
        approved = interrupt({
            "action": "approve_shipment",
            "order_id": order_id
        })
        if approved:
            ship_order(order_id).result()
            status = OrderStatus.SHIPPED

    return {"order_id": order_id, "final_status": status}
Handling the Read-Write Race
There’s a subtle issue: what if the database state changes between reading and writing? Each task should use conditional updates:
@task
def process_payment(order_id: str) -> dict:
    with get_db_connection() as conn:
        with conn.cursor() as cur:
            # This UPDATE only succeeds if status is still PENDING
            cur.execute(
                """
                UPDATE orders
                SET status = 'payment_processed', payment_id = %s
                WHERE id = %s AND status = 'pending'
                RETURNING id
                """,
                (generate_payment_id(), order_id)
            )
            if cur.fetchone() is None:
                # Status already changed—read current state
                cur.execute("SELECT status FROM orders WHERE id = %s", (order_id,))
                current = cur.fetchone()[0]
                conn.commit()
                return {"processed": False, "current_status": current}
            conn.commit()
            return {"processed": True}
The conditional WHERE status = 'pending' ensures idempotency: if we’ve already processed payment (perhaps in a previous execution before a crash), the UPDATE affects zero rows and we move on.
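The zero-rows-on-replay behaviour is the whole safety argument, so it is worth seeing in miniature (sqlite3 stand-in; try_process_payment is a hypothetical name):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id TEXT PRIMARY KEY, status TEXT)")
conn.execute("INSERT INTO orders VALUES ('order-1', 'pending')")

def try_process_payment(order_id: str) -> bool:
    # The transition fires only while the guard status still holds.
    cur = conn.execute(
        "UPDATE orders SET status = 'payment_processed' "
        "WHERE id = ? AND status = 'pending'",
        (order_id,),
    )
    conn.commit()
    return cur.rowcount == 1

first = try_process_payment("order-1")   # transition applied
second = try_process_payment("order-1")  # replay: guard fails, zero rows
print(first, second)  # True False
```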
State Machine Progression
This pattern works particularly well when your workflow follows a linear state machine:
PENDING → PAYMENT_PROCESSED → INVENTORY_RESERVED → SHIPPED → COMPLETED
Each state transition is guarded by a conditional update. The workflow reads current state, determines which transition to attempt next, and uses the database’s conditional update to ensure exactly-once semantics.
@entrypoint(checkpointer=InMemorySaver())
def state_machine_workflow(request: dict) -> dict:
    # ship_order, cancel_order, mark_completed are further @task functions
    # following the same conditional-update shape as above.
    order_id = request["order_id"]
    while True:
        # Read current state
        state = read_order_state(order_id).result()
        match state["status"]:
            case OrderStatus.PENDING:
                process_payment(order_id).result()
            case OrderStatus.PAYMENT_PROCESSED:
                reserve_inventory(order_id).result()
            case OrderStatus.INVENTORY_RESERVED:
                approved = interrupt({"action": "approve", "order_id": order_id})
                if approved:
                    ship_order(order_id).result()
                else:
                    cancel_order(order_id).result()
                    return {"status": "cancelled"}
            case OrderStatus.SHIPPED:
                mark_completed(order_id).result()
            case OrderStatus.COMPLETED:
                return {"status": "completed", "order_id": order_id}
            case OrderStatus.CANCELLED:
                return {"status": "cancelled", "order_id": order_id}
When to Use Read-Before-Write
This pattern excels when:
- Your workflow maps naturally to a state machine
- The database already tracks entity status
- You want the database to be the single source of truth
- Multiple systems might interact with the same entities
- You prefer simplicity over distributed coordination
The tradeoff is that every workflow execution starts with a database read, and your workflow logic becomes tightly coupled to the database schema.
Pattern 3: Transactional Outbox with Pause/Resume
For systems requiring stronger consistency guarantees or involving multiple services, the transactional outbox pattern decouples the “request” from the “execution” while maintaining exactly-once semantics.
The Architecture
┌─────────────────────────────────────────────────────────────────────────┐
│ LangGraph Workflow │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 1. Write request to outbox ──► 2. Interrupt (pause workflow) │
│ │
│ 5. Resume with response ◄────── (waiting for external completion) │
│ │
└─────────────────────────────────────────────────────────────────────────┘
│ ▲
│ │
▼ │
┌───────────────────────────┐ ┌───────────────────────────────────┐
│ Database │ │ Message Queue / Events │
├───────────────────────────┤ ├───────────────────────────────────┤
│ │ │ │
│ Outbox Table │──────►│ 4. Publish completion event │
│ ┌─────────────────────┐ │ │ │
│ │ id: uuid │ │ │ │
│ │ request_type: str │ │ └───────────────────────────────────┘
│ │ payload: jsonb │ │ ▲
│ │ status: pending │ │ │
│ │ thread_id: str │ │ │
│ │ created_at: ts │ │ │
│ └─────────────────────┘ │ │
│ │ ┌───────────────────────────────────┐
└───────────────────────────┘ │ Worker Service │
├───────────────────────────────────┤
│ │
│ 3. Process outbox entries │
│ - Execute actual operation │
│ - Update outbox status │
│ - Publish completion event │
│ │
└───────────────────────────────────┘
Implementation
First, define the outbox schema:
CREATE TABLE outbox (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    request_type VARCHAR(100) NOT NULL,
    payload JSONB NOT NULL,
    status VARCHAR(20) DEFAULT 'pending',
    thread_id VARCHAR(255) NOT NULL, -- Links back to LangGraph workflow
    result JSONB,
    created_at TIMESTAMP DEFAULT NOW(),
    processed_at TIMESTAMP
);

-- Postgres requires indexes as separate statements
CREATE INDEX idx_outbox_status ON outbox (status);
CREATE INDEX idx_outbox_thread ON outbox (thread_id);
The workflow writes to the outbox and pauses:
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import interrupt, Command
from langchain_core.runnables import RunnableConfig
from psycopg2.extras import Json
import uuid

@task
def write_to_outbox(
    request_type: str,
    payload: dict,
    thread_id: str
) -> dict:
    """
    Write a request to the outbox table within a transaction.
    This is the only write the workflow makes directly.
    """
    outbox_id = str(uuid.uuid4())
    with get_db_connection() as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                INSERT INTO outbox (id, request_type, payload, thread_id, status)
                VALUES (%s, %s, %s, %s, 'pending')
                RETURNING id
                """,
                (outbox_id, request_type, Json(payload), thread_id)
            )
            conn.commit()
    return {"outbox_id": outbox_id, "status": "pending"}

@entrypoint(checkpointer=InMemorySaver())
def transfer_funds(request: dict, *, config: RunnableConfig) -> dict:
    """
    Fund transfer workflow using transactional outbox pattern.
    The workflow never directly modifies account balances. Instead, it:
    1. Writes a transfer request to the outbox
    2. Pauses and waits for external processing
    3. Resumes when the transfer service completes the operation
    """
    thread_id = config["configurable"]["thread_id"]

    # Step 1: Write the transfer request to outbox
    outbox_entry = write_to_outbox(
        request_type="fund_transfer",
        payload={
            "from_account": request["from_account"],
            "to_account": request["to_account"],
            "amount": request["amount"],
            "reference": request.get("reference", str(uuid.uuid4()))
        },
        thread_id=thread_id
    ).result()

    # Step 2: Pause workflow—external service will process and resume
    # The interrupt payload tells the orchestration layer what we're waiting for
    transfer_result = interrupt({
        "waiting_for": "fund_transfer_completion",
        "outbox_id": outbox_entry["outbox_id"],
        "message": "Waiting for transfer service to process request"
    })

    # Step 3: Workflow resumes here with the transfer result
    if transfer_result["status"] == "completed":
        return {
            "status": "success",
            "transfer_id": transfer_result["transfer_id"],
            "from_account": request["from_account"],
            "to_account": request["to_account"],
            "amount": request["amount"]
        }
    else:
        return {
            "status": "failed",
            "error": transfer_result.get("error"),
            "outbox_id": outbox_entry["outbox_id"]
        }
The Worker Service
A separate service polls the outbox and processes requests:
# worker_service.py
import time
import uuid

from psycopg2.extras import Json

def process_outbox():
    """
    Worker that processes outbox entries.
    Runs as a separate service, not within LangGraph.
    """
    while True:
        with get_db_connection() as conn:
            with conn.cursor() as cur:
                # Claim a pending entry (with row locking for concurrency)
                cur.execute(
                    """
                    UPDATE outbox
                    SET status = 'processing'
                    WHERE id = (
                        SELECT id FROM outbox
                        WHERE status = 'pending'
                        ORDER BY created_at
                        LIMIT 1
                        FOR UPDATE SKIP LOCKED
                    )
                    RETURNING id, request_type, payload, thread_id
                    """
                )
                entry = cur.fetchone()
                if not entry:
                    conn.commit()
                    time.sleep(1)  # No work, wait before polling again
                    continue
                outbox_id, request_type, payload, thread_id = entry
                conn.commit()

        # Process based on request type
        try:
            if request_type == "fund_transfer":
                result = execute_fund_transfer(payload)
            else:
                result = {"status": "error", "error": f"Unknown request type: {request_type}"}

            # Update outbox with result
            with get_db_connection() as conn:
                with conn.cursor() as cur:
                    cur.execute(
                        """
                        UPDATE outbox
                        SET status = 'completed', result = %s, processed_at = NOW()
                        WHERE id = %s
                        """,
                        (Json(result), outbox_id)
                    )
                    conn.commit()

            # Resume the LangGraph workflow
            resume_workflow(thread_id, result)
        except Exception as e:
            # Mark as failed, workflow can handle retry logic
            with get_db_connection() as conn:
                with conn.cursor() as cur:
                    cur.execute(
                        """
                        UPDATE outbox
                        SET status = 'failed', result = %s, processed_at = NOW()
                        WHERE id = %s
                        """,
                        (Json({"status": "error", "error": str(e)}), outbox_id)
                    )
                    conn.commit()
            resume_workflow(thread_id, {"status": "error", "error": str(e)})

def execute_fund_transfer(payload: dict) -> dict:
    """
    Execute the actual fund transfer within a database transaction.
    This is where the real business logic lives.
    """
    with get_db_connection() as conn:
        with conn.cursor() as cur:
            # Check idempotency—has this reference been processed?
            cur.execute(
                "SELECT transfer_id FROM transfers WHERE reference = %s",
                (payload["reference"],)
            )
            existing = cur.fetchone()
            if existing:
                return {"status": "completed", "transfer_id": existing[0], "note": "already_processed"}

            # Execute transfer atomically
            cur.execute(
                "UPDATE accounts SET balance = balance - %s WHERE id = %s AND balance >= %s RETURNING id",
                (payload["amount"], payload["from_account"], payload["amount"])
            )
            if not cur.fetchone():
                conn.rollback()
                return {"status": "error", "error": "insufficient_funds"}
            cur.execute(
                "UPDATE accounts SET balance = balance + %s WHERE id = %s",
                (payload["amount"], payload["to_account"])
            )

            # Record the transfer
            transfer_id = str(uuid.uuid4())
            cur.execute(
                """
                INSERT INTO transfers (id, reference, from_account, to_account, amount, created_at)
                VALUES (%s, %s, %s, %s, %s, NOW())
                """,
                (transfer_id, payload["reference"], payload["from_account"],
                 payload["to_account"], payload["amount"])
            )
            conn.commit()
            return {"status": "completed", "transfer_id": transfer_id}

def resume_workflow(thread_id: str, result: dict):
    """Resume the paused LangGraph workflow with the processing result."""
    from langgraph.types import Command
    # Load the workflow (you'd have a reference to your entrypoint)
    config = {"configurable": {"thread_id": thread_id}}
    # Resume with the result
    for chunk in transfer_funds.stream(Command(resume=result), config):
        pass  # Process to completion
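The load-bearing step in the worker is the claim: flipping an entry from pending to processing must be atomic so two workers never pick up the same request. Postgres gets this from FOR UPDATE SKIP LOCKED; the same first-writer-wins property can be sketched with a plain conditional update (sqlite3 stand-in, single connection for brevity):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE outbox (id TEXT PRIMARY KEY, status TEXT)")
conn.execute("INSERT INTO outbox VALUES ('entry-1', 'pending')")

def claim(entry_id: str) -> bool:
    # Only the first caller flips pending -> processing; later callers see 0 rows.
    cur = conn.execute(
        "UPDATE outbox SET status = 'processing' "
        "WHERE id = ? AND status = 'pending'",
        (entry_id,),
    )
    conn.commit()
    return cur.rowcount == 1

first_worker = claim("entry-1")   # wins the claim
second_worker = claim("entry-1")  # cannot double-claim
print(first_worker, second_worker)  # True False
```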
Event-Driven Variant
Instead of the worker directly resuming workflows, you can use a message queue for better decoupling:
# Worker publishes completion events
from langgraph.types import Command

def process_outbox_with_events():
    # ... (same claim logic as before)
    result = execute_fund_transfer(payload)
    # Publish event instead of direct resume
    # (publish_event / subscribe are placeholder message-bus helpers)
    publish_event(
        topic="workflow.resume",
        payload={
            "thread_id": thread_id,
            "outbox_id": outbox_id,
            "result": result
        }
    )

# Separate service handles workflow resumption
def workflow_resume_handler():
    for event in subscribe("workflow.resume"):
        config = {"configurable": {"thread_id": event["thread_id"]}}
        for chunk in transfer_funds.stream(Command(resume=event["result"]), config):
            pass
Handling Multiple Outbox Requests
For workflows that need multiple external operations:
@entrypoint(checkpointer=InMemorySaver())
def complex_order_workflow(request: dict, *, config: RunnableConfig) -> dict:
    thread_id = config["configurable"]["thread_id"]
    order_id = request["order_id"]

    # Request 1: Reserve inventory
    inventory_outbox = write_to_outbox(
        request_type="reserve_inventory",
        payload={"order_id": order_id, "items": request["items"]},
        thread_id=thread_id
    ).result()
    inventory_result = interrupt({
        "waiting_for": "inventory_reservation",
        "outbox_id": inventory_outbox["outbox_id"]
    })
    if inventory_result["status"] != "completed":
        return {"status": "failed", "step": "inventory", "error": inventory_result.get("error")}

    # Request 2: Process payment
    payment_outbox = write_to_outbox(
        request_type="process_payment",
        payload={
            "order_id": order_id,
            "amount": request["total"],
            "reservation_id": inventory_result["reservation_id"]
        },
        thread_id=thread_id
    ).result()
    payment_result = interrupt({
        "waiting_for": "payment_processing",
        "outbox_id": payment_outbox["outbox_id"]
    })
    if payment_result["status"] != "completed":
        # Compensation: release inventory
        write_to_outbox(
            request_type="release_inventory",
            payload={"reservation_id": inventory_result["reservation_id"]},
            thread_id=thread_id
        ).result()
        interrupt({"waiting_for": "inventory_release"})
        return {"status": "failed", "step": "payment", "error": payment_result.get("error")}

    return {
        "status": "completed",
        "order_id": order_id,
        "reservation_id": inventory_result["reservation_id"],
        "payment_id": payment_result["payment_id"]
    }
When to Use the Outbox Pattern
This pattern is ideal when:
- You need strong consistency guarantees across services
- Operations must be processed by specialized services (payment gateways, external APIs)
- You want to decouple workflow logic from execution details
- You need audit trails of all requested operations
- Operations may take significant time (minutes to hours)
- You’re building saga patterns with compensation logic
The tradeoff is architectural complexity: you need a worker service, possibly message queues, and careful handling of the resume flow.
Pattern Comparison
| Aspect | Idempotent Tasks | Read-Before-Write | Transactional Outbox |
|---|---|---|---|
| Complexity | Low | Low-Medium | High |
| Consistency | Eventual | Eventual | Strong |
| Best for | Simple operations | State machines | Multi-service coordination |
| Latency | Lowest | Low | Higher (async) |
| Audit trail | Manual | Via status column | Built-in (outbox table) |
| Compensation | Manual | Manual | Natural fit |
| External services | Direct calls | Direct calls | Decoupled via outbox |
Choosing the Right Pattern
Start with idempotent tasks if you’re building straightforward workflows with direct database access. It’s the simplest approach and handles most crash scenarios.
Use read-before-write when your workflow naturally maps to a state machine with clear progression stages. The database becomes your source of truth, and the workflow simply advances the state.
Adopt the outbox pattern when you need to coordinate across multiple services, require strong consistency guarantees, or want clean separation between “what to do” and “how to do it.” It’s more infrastructure, but it scales to complex distributed scenarios.
In practice, you might combine patterns: idempotent tasks for simple operations, with the outbox pattern reserved for critical external integrations that require exactly-once delivery guarantees.
Connection Management Best Practices
Regardless of which pattern you choose, keep database connections short-lived:
# Wrong: Connection held across interrupt
@entrypoint(checkpointer=checkpointer)
def bad_workflow(inputs: dict):
    conn = get_db_connection()
    try:
        result = update_something(conn, inputs)
        interrupt("review")  # Connection held indefinitely!
        finalize(conn, result)
    finally:
        conn.close()

# Correct: Each task manages its own connection
@entrypoint(checkpointer=checkpointer)
def good_workflow(inputs: dict):
    result = update_task(inputs).result()  # Connection opened/closed within task
    interrupt("review")  # No connection held
    finalize_task(result).result()  # Fresh connection for this task
Workflows can pause for minutes, hours, or days at interrupt points. Holding database connections across these boundaries wastes resources and causes connection pool exhaustion.
Summary
Building reliable AI agents that interact with databases requires careful thought about consistency. LangGraph’s checkpoint system and your database’s transaction system are independent state stores, and keeping them synchronized requires explicit patterns.
The three approaches covered here—idempotent tasks, read-before-write synchronization, and the transactional outbox—give you a toolkit for handling everything from simple CRUD operations to complex multi-service workflows. Choose based on your consistency requirements, architectural constraints, and operational complexity tolerance.
The common thread across all patterns: design for re-execution. Assume your code will run multiple times and structure operations so that’s safe. Whether through idempotency keys, conditional updates, or the outbox pattern, the goal is making retries invisible to your business logic.
Related Posts
- From Process Managers to Stable Agent Workflows — stability patterns including idempotent execution, human-in-the-loop pauses, and observable execution
- Agents Are Still Just Software — why agent systems are fundamentally distributed systems problems