Checkpoints

Graflow provides checkpoint/resume functionality for saving and restoring workflow state, enabling fault tolerance and recovery for long-running workflows.

Use Cases

  • Long-running workflows: ML training, data pipelines running for hours/days
  • Fault tolerance: Resume after infrastructure failures or crashes
  • Distributed execution: Workers pick up from checkpoints after restarts
  • Cost optimization: Pause expensive workflows and resume later

Key Design Decisions

| Decision | Description |
|---|---|
| Explicit checkpoints | Users call context.checkpoint() explicitly |
| Deferred execution | The checkpoint is created after the task completes, not during it |
| Three-file structure | .pkl + .state.json + .meta.json |
| Full TaskSpec persistence | Saves complete task state (data, retry count, strategy) |
| Backend agnostic | Works with MemoryChannel and RedisChannel |

Checkpoint Flow

```text
Task Execution                      WorkflowEngine
│                                   │
├── context.checkpoint()            │
│   (sets flag only)                │
│                                   │
├── return result                   │
│                                   │
└──────────────────────────────────►│
                                    │
                                    ├── mark_task_completed()
                                    │
                                    ├── if checkpoint_requested:
                                    │       create_checkpoint()
                                    │
                                    └── get_next_task()
```

Note: context.checkpoint() sets a flag. The actual checkpoint is created by the engine after the task completes successfully.
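The deferred pattern can be modeled in a few lines of plain Python. This is a toy sketch, not Graflow's implementation: ToyTaskContext and run_workflow are hypothetical stand-ins, and the "checkpoint" is just a dict appended to a list. It shows that checkpoint() only records a request, and that the engine loop acts on it after the task has returned and been marked completed.

```python
from collections import deque
from typing import Callable, Optional


class ToyTaskContext:
    """Hypothetical stand-in for a task context: checkpoint() only records a request."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.checkpoint_requested = False
        self.checkpoint_metadata: Optional[dict] = None

    def checkpoint(self, metadata: Optional[dict] = None) -> None:
        # Set the flag only; nothing is written while the task is still running.
        self.checkpoint_requested = True
        self.checkpoint_metadata = metadata or {}


def run_workflow(
    tasks: dict[str, Callable[[ToyTaskContext], object]], order: list[str]
) -> tuple[list[str], list[dict]]:
    """Hypothetical engine loop: run tasks in order, checkpoint after a task returns if requested."""
    queue = deque(order)
    completed: list[str] = []
    checkpoints: list[dict] = []
    while queue:
        task_id = queue.popleft()
        ctx = ToyTaskContext(task_id)
        tasks[task_id](ctx)             # the task body runs to completion first
        completed.append(task_id)       # mark_task_completed()
        if ctx.checkpoint_requested:    # then the deferred request is honored
            checkpoints.append({
                "task_id": task_id,
                "completed_tasks": list(completed),
                "pending_tasks": list(queue),
                "metadata": ctx.checkpoint_metadata,
            })
    return completed, checkpoints


def load(ctx: ToyTaskContext) -> str:
    ctx.checkpoint(metadata={"stage": "loaded"})
    return "loaded"


def train(ctx: ToyTaskContext) -> str:
    return "trained"


completed, checkpoints = run_workflow({"load": load, "train": train}, ["load", "train"])
```

Because the snapshot is taken after mark_task_completed(), the requesting task already appears in completed_tasks and is not re-queued on resume.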

File Structure

Three files per checkpoint:

| File | Contents |
|---|---|
| {base}.pkl | Pickled ExecutionContext (graph, plus channel data when using MemoryChannel) |
| {base}.state.json | Session info, steps, completed tasks, pending TaskSpecs |
| {base}.meta.json | Checkpoint ID, timestamps, user metadata |
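A minimal sketch of how the three files relate, using only pickle and json from the standard library. write_checkpoint_files and read_checkpoint_files are hypothetical helpers, the meta keys (checkpoint_id, created_at, user_metadata) are assumed names, and the payloads are placeholders; Graflow's actual serialization may differ.

```python
import json
import pickle
import tempfile
import time
import uuid
from pathlib import Path


def write_checkpoint_files(base: Path, context_obj: object, state: dict, user_metadata: dict) -> str:
    """Hypothetical helper: write {base}.pkl, {base}.state.json, {base}.meta.json; return the ID."""
    checkpoint_id = uuid.uuid4().hex
    base.parent.mkdir(parents=True, exist_ok=True)
    # Binary pickle of the context object (placeholder for the ExecutionContext).
    with open(f"{base}.pkl", "wb") as f:
        pickle.dump(context_obj, f)
    # Human-readable execution state.
    Path(f"{base}.state.json").write_text(json.dumps(state, indent=2))
    # Checkpoint identity, timestamp and user metadata (key names are assumptions).
    meta = {"checkpoint_id": checkpoint_id, "created_at": time.time(), "user_metadata": user_metadata}
    Path(f"{base}.meta.json").write_text(json.dumps(meta, indent=2))
    return checkpoint_id


def read_checkpoint_files(base: Path) -> tuple[object, dict, dict]:
    """Hypothetical helper: load the three files written by write_checkpoint_files."""
    with open(f"{base}.pkl", "rb") as f:
        context_obj = pickle.load(f)
    state = json.loads(Path(f"{base}.state.json").read_text())
    meta = json.loads(Path(f"{base}.meta.json").read_text())
    return context_obj, state, meta


with tempfile.TemporaryDirectory() as tmp:
    base = Path(tmp) / "checkpoints" / "session_12345"
    ckpt_id = write_checkpoint_files(
        base, {"graph": ["task1", "task2"]}, {"session_id": "12345", "steps": 42}, {"stage": "step1"}
    )
    written = sorted(p.name for p in base.parent.iterdir())
    context_obj, state, meta = read_checkpoint_files(base)
```

Keeping the JSON files separate from the pickle means state and metadata can be inspected with ordinary tools without unpickling anything.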

State JSON schema (example):

```json
{
  "schema_version": "1.0",
  "session_id": "12345",
  "steps": 42,
  "completed_tasks": ["task1", "task2"],
  "cycle_counts": {"task1": 3},
  "pending_tasks": [
    {"task_id": "task3", "retry_count": 0, "...": "..."}
  ]
}
```
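Because the state file is plain JSON, it can be checked with the standard library. The sketch below assumes only the fields in the example above: it verifies schema_version and rebuilds an in-memory queue of pending task IDs in saved order. load_pending_queue is a hypothetical helper; in Graflow the re-queueing is done by CheckpointManager on resume.

```python
import json
from collections import deque

# Assumption: only the schema version shown in the example is accepted.
SUPPORTED_SCHEMA_VERSIONS = {"1.0"}


def load_pending_queue(state_text: str) -> tuple[deque, set]:
    """Hypothetical helper: parse .state.json text into (pending queue, completed set)."""
    state = json.loads(state_text)
    version = state.get("schema_version")
    if version not in SUPPORTED_SCHEMA_VERSIONS:
        raise ValueError(f"unsupported schema_version: {version!r}")
    completed = set(state.get("completed_tasks", []))
    # Preserve the order in which pending TaskSpecs were saved.
    pending = deque(spec["task_id"] for spec in state.get("pending_tasks", []))
    return pending, completed


example = """
{
  "schema_version": "1.0",
  "session_id": "12345",
  "steps": 42,
  "completed_tasks": ["task1", "task2"],
  "cycle_counts": {"task1": 3},
  "pending_tasks": [{"task_id": "task3", "retry_count": 0}]
}
"""
pending, completed = load_pending_queue(example)
```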

Basic Usage

Create Checkpoint

```python
@task("process", inject_context=True)
def process(ctx):
    do_work()
    ctx.checkpoint(metadata={"stage": "step1"})
    # Task continues; checkpoint created after return
```

Resume from Checkpoint

```python
from graflow.core.checkpoint import CheckpointManager
from graflow.core.engine import WorkflowEngine

# Load checkpoint
context, metadata = CheckpointManager.resume_from_checkpoint(
    "checkpoints/session_12345.pkl"
)

# Resume execution (auto-resume from queue)
engine = WorkflowEngine()
engine.execute(context)
```

State Machine Pattern

For iterative tasks that use next_iteration(), keep the current stage in the channel so that a re-executed task can tell how far it already got:

```python
@task("order_processor", inject_context=True)
def process_order(ctx):
    channel = ctx.execution_context.channel
    state = channel.get("order_state") or "NEW"

    if state == "NEW":
        validate_order()
        channel.set("order_state", "VALIDATED")
        ctx.checkpoint(metadata={"stage": "validated"})
        ctx.next_iteration()

    elif state == "VALIDATED":
        process_payment()
        channel.set("order_state", "PAID")
        ctx.checkpoint(metadata={"stage": "paid"})
        ctx.next_iteration()

    elif state == "PAID":
        ship_order()
        return "ORDER_COMPLETE"
```

On resume, the task re-executes but skips to the correct state via channel data.
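The re-execution behavior can be simulated without Graflow. In this sketch a plain dict stands in for the channel, run_order_stage is a hypothetical function that runs one stage per call (one iteration), and the validate/pay/ship steps are recorded in a log list instead of doing real work. Running one stage, "crashing", and then continuing with the same channel shows the resumed run skipping validation.

```python
from typing import Optional


def run_order_stage(channel: dict, log: list[str]) -> Optional[str]:
    """Hypothetical single iteration of the order state machine; state lives in `channel`."""
    state = channel.get("order_state") or "NEW"
    if state == "NEW":
        log.append("validate")
        channel["order_state"] = "VALIDATED"
        return None  # the real task would call ctx.checkpoint() and ctx.next_iteration() here
    if state == "VALIDATED":
        log.append("pay")
        channel["order_state"] = "PAID"
        return None
    if state == "PAID":
        log.append("ship")
        return "ORDER_COMPLETE"
    raise ValueError(f"unknown state: {state!r}")


def run_until_done(channel: dict, log: list[str], max_iterations: int = 10) -> Optional[str]:
    """Call run_order_stage repeatedly, like successive next_iteration() calls."""
    for _ in range(max_iterations):
        result = run_order_stage(channel, log)
        if result is not None:
            return result
    return None


# First run: only the NEW stage executes before a simulated crash.
channel: dict = {}
first_log: list[str] = []
run_order_stage(channel, first_log)

# "Resume": the channel (as restored from the checkpoint) still says VALIDATED.
resumed_log: list[str] = []
result = run_until_done(channel, resumed_log)
```

Validation runs once in total, because the stored state routes the re-executed task straight to payment.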

Backend Comparison

| Feature | MemoryChannel | RedisChannel |
|---|---|---|
| Checkpoint size | Larger (includes channel data) | Smaller (only session_id) |
| Portability | Self-contained file | Requires the same Redis instance |
| Multi-worker resume | Not supported | Any worker can resume |
| Queue restoration | Re-queued from .state.json | Already persisted in Redis |

MemoryChannel Flow

```text
Checkpoint:
├── Save channel data to .pkl
└── Save pending_tasks to .state.json

Resume:
├── Load .pkl (restores channel data)
└── Re-queue pending_tasks from .state.json
```
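A minimal model of the MemoryChannel path, assuming channel data is a plain dict and pending tasks are identified by task_id only. The pickle carries the channel contents and the state file carries the queue that must be rebuilt on resume. save_memory_checkpoint and resume_memory_checkpoint are hypothetical helpers, not Graflow functions.

```python
import json
import pickle
import tempfile
from collections import deque
from pathlib import Path


def save_memory_checkpoint(base: Path, channel_data: dict, pending: deque) -> None:
    """Hypothetical helper: channel data goes into the pickle, the queue into .state.json."""
    # Channel data travels inside the pickle, so the checkpoint is self-contained.
    with open(f"{base}.pkl", "wb") as f:
        pickle.dump({"channel_data": dict(channel_data)}, f)
    # The pending queue is written separately so it can be re-queued on resume.
    state = {"pending_tasks": [{"task_id": task_id} for task_id in pending]}
    Path(f"{base}.state.json").write_text(json.dumps(state))


def resume_memory_checkpoint(base: Path) -> tuple[dict, deque]:
    """Hypothetical helper: restore channel data, then rebuild the task queue."""
    with open(f"{base}.pkl", "rb") as f:
        channel_data = pickle.load(f)["channel_data"]
    state = json.loads(Path(f"{base}.state.json").read_text())
    queue = deque(spec["task_id"] for spec in state["pending_tasks"])
    return channel_data, queue


with tempfile.TemporaryDirectory() as tmp:
    base = Path(tmp) / "session_12345"
    save_memory_checkpoint(base, {"order_state": "VALIDATED"}, deque(["task3", "task4"]))
    channel_data, queue = resume_memory_checkpoint(base)
```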

RedisChannel Flow

```text
Checkpoint:
├── Save session_id to .pkl (data already in Redis)
└── Save pending_tasks to .state.json

Resume:
├── Load .pkl (reconnect to Redis with session_id)
└── Data available via same Redis keys
```
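For comparison, a sketch of the RedisChannel path. A module-level dict stands in for the Redis server (no Redis client is used), and the f"{session_id}:{key}" key layout is an assumption for illustration only. The point it shows: the checkpoint payload only needs the session_id, because the data itself never leaves the shared store, so a different worker can pick it up.

```python
import pickle

# Stand-in for a shared Redis instance (assumption: a plain dict, not a Redis client).
FAKE_REDIS: dict[str, object] = {}


class SessionChannel:
    """Hypothetical channel that namespaces keys by session_id in a shared store."""

    def __init__(self, session_id: str, store: dict) -> None:
        self.session_id = session_id
        self.store = store

    def set(self, key: str, value: object) -> None:
        self.store[f"{self.session_id}:{key}"] = value

    def get(self, key: str, default: object = None) -> object:
        return self.store.get(f"{self.session_id}:{key}", default)


def checkpoint_payload(channel: SessionChannel) -> bytes:
    # Only the session_id is pickled; the data stays in the shared store.
    return pickle.dumps({"session_id": channel.session_id})


def resume_channel(payload: bytes, store: dict) -> SessionChannel:
    # Reconnect to the same store using the saved session_id.
    session_id = pickle.loads(payload)["session_id"]
    return SessionChannel(session_id, store)


worker_a = SessionChannel("12345", FAKE_REDIS)
worker_a.set("order_state", "PAID")
payload = checkpoint_payload(worker_a)

# A different worker sharing the same store resumes from the small payload.
worker_b = resume_channel(payload, FAKE_REDIS)
```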

API Reference

CheckpointManager

```python
class CheckpointManager:
    @classmethod
    def create_checkpoint(
        cls,
        context: ExecutionContext,
        path: Optional[str] = None,
        metadata: Optional[dict] = None,
    ) -> tuple[str, CheckpointMetadata]:
        """Create checkpoint. Path auto-generated if None."""

    @classmethod
    def resume_from_checkpoint(
        cls,
        checkpoint_path: str,
    ) -> tuple[ExecutionContext, CheckpointMetadata]:
        """Resume from checkpoint file."""
```

TaskExecutionContext

```python
def checkpoint(self, metadata: Optional[dict] = None) -> None:
    """Request checkpoint (created after task completes).

    Automatically includes: task_id, cycle_count, elapsed_time
    """
```

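The docstring states that task_id, cycle_count and elapsed_time are added automatically. A minimal sketch of such a merge, assuming user keys are kept alongside the automatic ones and that automatic values win on a name clash (the actual precedence in Graflow is not stated here). build_checkpoint_metadata is hypothetical, and elapsed_time is measured from a caller-supplied start time.

```python
import time
from typing import Optional


def build_checkpoint_metadata(
    task_id: str,
    cycle_count: int,
    started_at: float,
    user_metadata: Optional[dict] = None,
    now: Optional[float] = None,
) -> dict:
    """Hypothetical helper: combine user metadata with automatically recorded fields."""
    now = time.monotonic() if now is None else now
    merged = dict(user_metadata or {})
    # Assumption: automatic fields overwrite user keys with the same name.
    merged.update({
        "task_id": task_id,
        "cycle_count": cycle_count,
        "elapsed_time": now - started_at,
    })
    return merged


meta = build_checkpoint_metadata(
    "order_processor", 2, started_at=100.0, user_metadata={"stage": "paid"}, now=112.5
)
```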
Best Practices

| Practice | Description |
|---|---|
| Use the channel for state | Store progress in the channel so re-execution is idempotent |
| Checkpoint at boundaries | Checkpoint after expensive operations or state transitions |
| Use RedisChannel for distributed execution | MemoryChannel checkpoints are not portable across workers |
| Keep workflows consistent | Changing the graph between checkpoint and resume may cause resume to fail |
| Manage checkpoint files | There is no automatic cleanup; retention is up to the user |
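Since retention is left to the user, a small pruning helper can keep only the newest N checkpoints. This sketch assumes every checkpoint is a {base}.pkl / {base}.state.json / {base}.meta.json triple in a single directory and orders them by the modification time of the .meta.json file. prune_checkpoints is hypothetical; it deletes all three files of each older checkpoint together so no partial checkpoint is left behind.

```python
import os
import tempfile
from pathlib import Path

SUFFIXES = (".pkl", ".state.json", ".meta.json")


def prune_checkpoints(directory: Path, keep: int) -> list[str]:
    """Hypothetical helper: delete all but the `keep` newest checkpoints; return removed bases."""
    metas = sorted(directory.glob("*.meta.json"), key=lambda p: p.stat().st_mtime, reverse=True)
    removed = []
    for meta in metas[keep:]:
        base = meta.name[: -len(".meta.json")]
        # Remove the whole triple so no half-deleted checkpoint remains.
        for suffix in SUFFIXES:
            (directory / f"{base}{suffix}").unlink(missing_ok=True)
        removed.append(base)
    return removed


with tempfile.TemporaryDirectory() as tmp:
    d = Path(tmp)
    for i, name in enumerate(["ckpt_a", "ckpt_b", "ckpt_c"]):
        for suffix in SUFFIXES:
            path = d / f"{name}{suffix}"
            path.write_text("{}")
            os.utime(path, (1_000 + i, 1_000 + i))  # deterministic mtimes: ckpt_c is newest
    removed = prune_checkpoints(d, keep=1)
    remaining = sorted(p.name for p in d.iterdir())
```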

Limitations

  • No automatic checkpoint: Must call context.checkpoint() explicitly
  • No automatic cleanup: Old checkpoints accumulate
  • Graph consistency required: Workflow code changes may break resume
  • MemoryChannel not portable: Use RedisChannel for multi-worker scenarios