
Checkpointing

7 min · Intermediate · Patterns

Periodically saving processing state to enable recovery from failures without reprocessing all data from the beginning.

💼 Interview Relevance: ~60% of streaming interviews
🏭 Production Impact: Powers systems built on Flink, Spark, and Kafka Streams
⚡ Performance: Fast recovery from failures
📈 Scalability: Enables exactly-once semantics

TL;DR

Checkpointing is the process of periodically saving the processing state of a stream processing application, enabling it to resume from the last checkpoint rather than restart from scratch after failures. Essential for exactly-once semantics in systems like Apache Flink, Spark Streaming, and Kafka Streams.

Visual Overview

WITHOUT CHECKPOINTING (restart from the beginning)
┌────────────────────────────────────────────────┐
│  Stream Processing Job                         │
│                                                │
│  T0: Start processing from offset 0            │
│  T1: Processed 1,000,000 messages              │
│  T2: Processed 5,000,000 messages              │
│  T3: CRASH! ⚡                                 │
│  T4: Restart from offset 0 ✕                   │
│      → Reprocess all 5,000,000 messages        │
│      → Hours of lost work                      │
│      → Possible duplicate outputs              │
└────────────────────────────────────────────────┘

WITH CHECKPOINTING (resume from the last checkpoint)
┌────────────────────────────────────────────────┐
│  Stream Processing Job with Checkpoints        │
│                                                │
│  T0: Start, checkpoint offset=0                │
│  T1: Process 1M messages, checkpoint offset=1M │
│  T2: Process 4M more, checkpoint offset=5M     │
│  T3: CRASH! ⚡                                 │
│  T4: Restore from checkpoint ✓                 │
│      → Resume from offset=5M                   │
│      → Only reprocess since last checkpoint    │
│      → Fast recovery (seconds vs hours)        │
│      → No duplicates (exactly-once)            │
└────────────────────────────────────────────────┘

CHECKPOINT LIFECYCLE
┌────────────────────────────────────────────────┐
│  Checkpoint Coordinator                        │
│         ↓                                      │
│  1. Trigger Checkpoint                         │
│     Insert barrier into stream                 │
│         ↓                                      │
│  Stream: [msg1, msg2, BARRIER, msg3, msg4]     │
│                      ↓                         │
│  2. Operator receives barrier                  │
│     - Save current state to storage            │
│     - Save current offset                      │
│     - Acknowledge checkpoint                   │
│         ↓                                      │
│  3. All operators acknowledged?                │
│     YES → Checkpoint COMPLETE ✓                │
│     NO  → Wait or timeout                      │
│         ↓                                      │
│  4. Commit checkpoint metadata                 │
│     Store: {                                   │
│       checkpoint_id: 123                       │
│       timestamp: T                             │
│       state_location: s3://...                 │
│       offsets: {partition0: 5000, ...}         │
│     }                                          │
└────────────────────────────────────────────────┘

RECOVERY FLOW
┌────────────────────────────────────────────────┐
│  Job Failure Detected                          │
│         ↓                                      │
│  1. Find Latest Successful Checkpoint          │
│     checkpoint_id: 123                         │
│     timestamp: 2 minutes ago                   │
│         ↓                                      │
│  2. Restore Operator State                     │
│     Load state from: s3://checkpoint-123/      │
│     - Counters, windows, caches                │
│     - Application state                        │
│         ↓                                      │
│  3. Reset Stream Positions                     │
│     Kafka offsets: {partition0: 5000, ...}     │
│         ↓                                      │
│  4. Resume Processing                          │
│     Start from checkpoint offsets              │
│     Reprocess messages since checkpoint        │
│                                                │
│  Recovery Time Objective (RTO):                │
│  - Checkpoint interval: 1 minute               │
│  - State restore: 10 seconds                   │
│  - Reprocess: up to 1 minute of data           │
│  Total: ~90 seconds of downtime                │
└────────────────────────────────────────────────┘

Core Explanation

What is Checkpointing?

Checkpointing is a fault-tolerance mechanism that periodically saves the state of a distributed computation to durable storage. For stream processing, this includes:

  1. Application State: Counters, aggregations, windows, caches
  2. Input Positions: Kafka offsets, file positions, database cursors
  3. Metadata: Checkpoint ID, timestamp, version
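
As a rough illustration, these three pieces can be bundled into a single checkpoint record (a minimal sketch; the field names are illustrative, not taken from any particular framework):

from dataclasses import dataclass, field
from typing import Any, Dict

@dataclass
class Checkpoint:
    # Metadata: identifies and orders the snapshot
    checkpoint_id: int
    timestamp: float
    # Input positions: where to resume reading after recovery
    offsets: Dict[str, int]                      # e.g., {"orders-0": 5000}
    # Application state: operator state captured at the barrier
    state: Dict[str, Any] = field(default_factory=dict)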

Key Property: Consistency

A checkpoint must capture a consistent snapshot across all parallel operators, ensuring:

  • No messages are lost (completeness)
  • No messages are duplicated (exactly-once)
  • State is consistent across operators

Checkpoint Barriers (Chandy-Lamport Algorithm)

Stream processing systems use barriers to coordinate consistent snapshots without stopping the stream:

How Barriers Work:

Source Operator injects barriers into stream:
┌────────────────────────────────────────┐
│  Kafka Topic                           │
│  [msg1, msg2, msg3, BARRIER_10, msg4]  │
│         ↓          ↓           ↓       │
│    Operator A  processes messages      │
│    When BARRIER_10 arrives:            │
│    1. Save state (e.g., count=150)     │
│    2. Forward barrier downstream       │
│    3. Continue processing              │
└────────────────────────────────────────┘

Multiple Input Streams (Barrier Alignment):
┌────────────────────────────────────────┐
│  Stream 1: [msg1, BARRIER_10, msg2]    │
│  Stream 2: [msg3, msg4, BARRIER_10]    │
│         ↓           ↓                  │
│    Join Operator                       │
│    ↓                                   │
│  Wait for BARRIER_10 from BOTH streams │
│  (buffer messages from faster stream)  │
│    ↓                                   │
│  All barriers received → Save state    │
└────────────────────────────────────────┘

Why Barriers?
✓ No need to pause the stream (live checkpointing)
✓ Consistent snapshot across distributed operators
✓ Minimal impact on throughput (<5% overhead)

Checkpoint Interval Trade-offs

Frequent Checkpoints (e.g., every 10 seconds):
Pros:
✓ Fast recovery (less data to reprocess)
✓ Small recovery window
Cons:
✗ Higher overhead (I/O, CPU for serialization)
✗ More storage costs
✗ Can slow down processing

Infrequent Checkpoints (e.g., every 10 minutes):
Pros:
✓ Lower overhead
✓ Better throughput
Cons:
✗ Slower recovery (more data to reprocess)
✗ Larger recovery window
✗ Higher risk of data loss

Recommended: 1-5 minutes for most applications
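
The trade-off can be made concrete with a back-of-the-envelope model (a sketch; the cost and replay numbers below are illustrative assumptions, not measurements):

def checkpoint_tradeoff(interval_s: float, checkpoint_cost_s: float,
                        restore_s: float, replay_speedup: float = 2.0):
    """Rough model of checkpoint overhead vs. worst-case recovery time."""
    # Fraction of processing capacity spent on checkpointing
    overhead = checkpoint_cost_s / interval_s
    # Worst case: crash just before a checkpoint completes, so almost one
    # full interval of data must be replayed (at replay_speedup x real time)
    worst_case_recovery_s = restore_s + interval_s / replay_speedup
    return overhead, worst_case_recovery_s

for interval in (10, 60, 600):  # 10 s, 1 min, 10 min
    ovh, rec = checkpoint_tradeoff(interval, checkpoint_cost_s=2, restore_s=10)
    print(f"interval={interval:>4}s  overhead={ovh:6.1%}  recovery<={rec:6.1f}s")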

Checkpoint Storage

Where to Store Checkpoints:

1. Distributed File Systems:
   - HDFS, S3, GCS, Azure Blob
   - Pros: Scalable, durable, cost-effective
   - Cons: Higher latency (~100ms writes)
   - Use case: Production systems

2. Distributed Databases:
   - RocksDB (local + replicated)
   - Cassandra, DynamoDB
   - Pros: Fast writes (~10ms)
   - Cons: Higher cost, operational complexity
   - Use case: Low-latency requirements

3. In-Memory (with replication):
   - Redis, Memcached with persistence
   - Pros: Very fast (~1ms)
   - Cons: Limited capacity, expensive
   - Use case: Small state, ultra-low latency

Storage Structure:
checkpoint_dir/
├── checkpoint-000010/
│   ├── operator-state-0
│   ├── operator-state-1
│   └── metadata
├── checkpoint-000011/
│   └── ...
└── _metadata  (latest checkpoint info)

Incremental Checkpointing

Full Checkpoint (naive approach):
Every checkpoint saves the entire state
State size: 10 GB
Checkpoint interval: 1 minute
I/O: 10 GB/minute ≈ 166 MB/s ✕ (expensive!)

Incremental Checkpoint (RocksDB-based):
Only save state changed since the last checkpoint
┌────────────────────────────────────────┐
│  Checkpoint 10: Save full state (10GB) │
│  Checkpoint 11: Save delta (100MB)     │
│  Checkpoint 12: Save delta (150MB)     │
│  Checkpoint 13: Save delta (120MB)     │
│  ↓                                     │
│  Recovery: Restore checkpoint 10       │
│            + Apply deltas 11,12,13     │
└────────────────────────────────────────┘

Benefits:
✓ Reduce I/O by 10-100x
✓ Faster checkpoint completion
✓ Lower storage costs

Implementation: Apache Flink RocksDB state backend
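
A minimal sketch of the idea (simplified: deletions are ignored, and real backends like Flink's RocksDB state backend track changed SST files rather than individual keys):

class IncrementalState:
    """Key-value state that checkpoints only entries changed since the last snapshot."""

    def __init__(self):
        self.data = {}      # live state
        self.base = {}      # last full snapshot
        self.deltas = []    # saved deltas (stand-in for remote storage)
        self.dirty = set()  # keys modified since the last checkpoint

    def put(self, key, value):
        self.data[key] = value
        self.dirty.add(key)

    def checkpoint(self, full=False):
        if full:
            self.base = dict(self.data)  # periodic full snapshot
            self.deltas = []
        else:
            # Save only the entries touched since the last checkpoint
            self.deltas.append({k: self.data[k] for k in self.dirty})
        self.dirty.clear()

    def restore(self):
        recovered = dict(self.base)      # start from the last full snapshot...
        for delta in self.deltas:
            recovered.update(delta)      # ...then apply deltas in order
        return recovered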

Exactly-Once Semantics with Checkpointing

How Checkpointing Enables Exactly-Once:

1. Atomic Commits:
┌────────────────────────────────────────┐
│  Process messages 1-100                │
│  Update state (count += 100)           │
│  ↓                                     │
│  Checkpoint:                           │
│  - Save state: count=100               │
│  - Save offset: 100                    │
│  - Commit offset to Kafka (atomic)     │
│  ↓                                     │
│  If crash before checkpoint:           │
│  → Restore offset=0, count=0           │
│  → Reprocess messages 1-100 ✓          │
│                                        │
│  If crash after checkpoint:            │
│  → Restore offset=100, count=100       │
│  → Skip messages 1-100 ✓               │
└────────────────────────────────────────┘

2. Two-Phase Commit (for sinks):
┌────────────────────────────────────────┐
│  Phase 1: Pre-commit                   │
│  - Write to staging table              │
│  - Don't make visible yet              │
│  ↓                                     │
│  Checkpoint completes ✓                │
│  ↓                                     │
│  Phase 2: Commit                       │
│  - Move from staging to production     │
│  - Make writes visible                 │
│                                        │
│  If crash before Phase 2:              │
│  → Staging writes discarded ✓          │
│  → Reprocess and retry                 │
└────────────────────────────────────────┘
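
A sketch of the sink side of this protocol (illustrative only; Flink packages the same idea as its TwoPhaseCommitSinkFunction):

class TwoPhaseCommitSink:
    """Buffers writes per checkpoint; makes them visible only after commit."""

    def __init__(self):
        self.staging = []    # pre-committed writes, not yet visible
        self.committed = []  # visible output

    def write(self, record):
        self.staging.append(record)  # normal processing path

    def pre_commit(self):
        # Phase 1: hand staged writes to the checkpoint as a transaction;
        # they are durable but still invisible to readers
        txn, self.staging = self.staging, []
        return txn

    def commit(self, txn):
        # Phase 2: runs only after the checkpoint completed everywhere
        self.committed.extend(txn)

    def abort(self, txn):
        # Crash before Phase 2: staged writes are simply discarded;
        # the records will be reprocessed from the checkpoint
        pass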

Real Systems Using Checkpointing

| System                     | Checkpoint Mechanism          | Default Interval   | State Backend   | Use Case                 |
|----------------------------|-------------------------------|--------------------|-----------------|--------------------------|
| Apache Flink               | Chandy-Lamport barriers       | Disabled (manual)  | RocksDB, Heap   | Real-time analytics, ETL |
| Spark Structured Streaming | Micro-batch checkpointing     | Per micro-batch    | HDFS, S3        | Batch + streaming        |
| Kafka Streams              | State stores + offset commits | 30 seconds         | RocksDB         | Stream processing        |
| Apache Storm               | Record-level acking           | N/A (acking-based) | N/A             | Low-latency streaming    |
| Apache Samza               | Changelog-based               | 1 minute           | RocksDB + Kafka | Stateful streaming       |

Flink Checkpoint Configuration:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Enable checkpointing every 60 seconds
env.enableCheckpointing(60000);

// Checkpoint configuration
CheckpointConfig config = env.getCheckpointConfig();

// Exactly-once mode (vs at-least-once)
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// Minimum pause between checkpoints (prevent overload)
config.setMinPauseBetweenCheckpoints(30000);

// Checkpoint timeout (fail if takes too long)
config.setCheckpointTimeout(600000);

// Max concurrent checkpoints
config.setMaxConcurrentCheckpoints(1);

// Retain checkpoints on cancellation (for savepoints)
config.enableExternalizedCheckpoints(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);

// State backend (where to store state).
// Note: this is the pre-1.13 API; newer Flink versions use
// EmbeddedRocksDBStateBackend plus setCheckpointStorage("s3://checkpoints/").
env.setStateBackend(new RocksDBStateBackend("s3://checkpoints/"));

Checkpoint Flow in Flink:
┌────────────────────────────────────────┐
│  1. JobManager triggers checkpoint     │
│     - Increment checkpoint ID          │
│     - Send trigger to sources          │
│  ↓                                     │
│  2. Sources inject barriers            │
│     - Insert barrier into stream       │
│     - Save current offset              │
│  ↓                                     │
│  3. Barriers flow through operators    │
│     - Each operator saves state        │
│     - Acknowledges checkpoint          │
│  ↓                                     │
│  4. Sinks receive barriers             │
│     - Pre-commit external writes       │
│     - Acknowledge checkpoint           │
│  ↓                                     │
│  5. JobManager collects acks           │
│     - All tasks acknowledged? ✓        │
│     - Commit checkpoint metadata       │
│     - Finalize external writes         │
└────────────────────────────────────────┘
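
Steps 1 and 5 (triggering and collecting acknowledgements) can be sketched as a toy coordinator (illustrative only, not Flink's actual JobManager logic):

import time

class CheckpointCoordinator:
    """Triggers checkpoints and commits once every task has acknowledged."""

    def __init__(self, task_ids, timeout_s=600.0):
        self.task_ids = set(task_ids)
        self.timeout_s = timeout_s
        self.checkpoint_id = 0
        self.pending_acks = set()
        self.started_at = 0.0

    def trigger(self):
        self.checkpoint_id += 1
        self.pending_acks = set(self.task_ids)
        self.started_at = time.time()
        return self.checkpoint_id  # sources inject barriers carrying this id

    def acknowledge(self, task_id, checkpoint_id):
        """Returns True once the checkpoint is complete."""
        if checkpoint_id != self.checkpoint_id:
            return False  # ack for a stale checkpoint; ignore it
        if time.time() - self.started_at > self.timeout_s:
            raise TimeoutError(f"checkpoint {checkpoint_id} timed out")
        self.pending_acks.discard(task_id)
        # All tasks acknowledged -> commit metadata, finalize external writes
        return not self.pending_acks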

Recovery in Flink:
1. Job fails (e.g., task exception, node crash)
2. JobManager restarts job from latest checkpoint
3. All tasks restore state from checkpoint
4. Sources reset to checkpoint offsets
5. Processing resumes (exactly-once guaranteed)

Case Study: Kafka Streams Checkpointing

Kafka Streams uses:
- Local RocksDB for state storage
- Kafka topics for changelog (state replication)
- Kafka offsets for position tracking

Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
config.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");

// Commit interval (checkpoint frequency)
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);

// State store configuration
config.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, 600000);

How it works:
┌────────────────────────────────────────┐
│  State Changes:                        │
│  1. Write to local RocksDB             │
│  2. Write to changelog topic (Kafka)   │
│  3. Commit offset every 30s            │
│  ↓                                     │
│  Recovery:                             │
│  1. Restore RocksDB from changelog     │
│  2. Resume from committed offset       │
│  3. Replay uncommitted messages        │
└────────────────────────────────────────┘

Exactly-once in Kafka Streams (EOS):
- Transactional writes to output topics
- Atomic offset commits
- Changelog updates in same transaction
- Result: End-to-end exactly-once
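
Conceptually, restoring a state store from its changelog is just an ordered replay (a simplified sketch; the real store is RocksDB and the changelog is a compacted Kafka topic):

def restore_from_changelog(changelog):
    """Rebuild a key-value store by replaying changelog records in order.

    changelog: iterable of (key, value) pairs; value None marks a deletion,
    mirroring Kafka's tombstone convention for compacted topics.
    """
    store = {}
    for key, value in changelog:
        if value is None:
            store.pop(key, None)  # tombstone: remove the key
        else:
            store[key] = value    # last write wins
    return store

# The replayed store reflects only the latest value per key:
records = [("user1", 5), ("user2", 3), ("user1", 7), ("user2", None)]
print(restore_from_changelog(records))  # {'user1': 7}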

When to Use Checkpointing

✓ Perfect Use Cases

Long-Running Stream Processing Jobs

Scenario: Real-time analytics running 24/7
Requirement: No data loss, exactly-once semantics
Solution: Checkpoint every 1 minute to S3
Benefit: Recover from failures in < 2 minutes

Stateful Aggregations

Scenario: Counting events per user over 24-hour windows
State size: 100 GB
Solution: Incremental checkpointing with RocksDB
Benefit: Preserve state across restarts, avoid recomputation

Complex Event Processing

Scenario: Multi-stage pipeline with joins, enrichment
Requirement: Consistent state across operators
Solution: Distributed checkpointing with barriers
Benefit: Consistent snapshots without pausing stream

✕ When NOT to Use (or Use Carefully)

Stateless Processing

Problem: No state to checkpoint, pure transformation
Example: Filter, map, simple parsing
Alternative: Just reprocess from source (no checkpointing overhead)

Ultra-Low Latency Requirements (< 1ms)

Problem: Checkpointing adds latency (barrier alignment)
Alternative: At-least-once processing with deduplication
Example: High-frequency trading, real-time bidding

Small Batch Jobs (< 1 minute)

Problem: Checkpoint overhead > job duration
Alternative: Just rerun the job on failure
Example: Scheduled micro-batch jobs

Interview Application

Common Interview Question

Q: “Design a real-time fraud detection system processing millions of transactions/second. How would you ensure exactly-once processing and fast recovery from failures?”

Strong Answer:

“I’d design a checkpointed stream processing system:

Architecture:

  • Framework: Apache Flink (for exactly-once guarantees)
  • State Backend: RocksDB with incremental checkpoints
  • Checkpoint Storage: S3 (durable, scalable)
  • Checkpoint Interval: 1 minute (balance overhead vs recovery time)

State Management:

  • User Profile Cache: Latest transactions per user (for pattern detection)
  • Fraud Rules State: Configurable thresholds, ML model parameters
  • Windows: 5-minute sliding windows for transaction aggregations
  • State Size Estimate: 50 GB (10M users × 5KB profile)

Checkpointing Strategy:

  1. Incremental Checkpoints:
    • Full checkpoint hourly
    • Incremental deltas every minute
    • Reduces I/O from 50 GB/min to ~500 MB/min
  2. Asynchronous Snapshots:
    • Checkpoint in background threads
    • Minimal impact on throughput (<5%)
  3. Barrier Alignment:
    • Use Chandy-Lamport barriers for consistency
    • Handle out-of-order messages correctly

Exactly-Once Guarantees:

  1. Source: Read from Kafka with transactional semantics
  2. Processing: Flink exactly-once mode
  3. Sink: Two-phase commit to output database
    • Pre-commit during checkpoint
    • Finalize after checkpoint complete

Recovery Flow:

  1. Failure Detection: Flink JobManager detects task failure
  2. Checkpoint Restore:
    • Restore state from latest checkpoint (S3)
    • Reset Kafka offsets to checkpoint position
    • Time: ~30 seconds
  3. Reprocessing:
    • Replay 1 minute of messages since checkpoint
    • Time: ~30 seconds (depends on throughput)
  4. Total RTO: ~90 seconds

Optimization:

  • Local State: RocksDB on SSD for fast access
  • State Sharding: Partition state by user_id (key-by user_id)
  • Compression: Enable Snappy compression for checkpoints
  • Monitoring: Alert on checkpoint duration > 60s

Trade-offs:

  • Checkpoint interval: 1 min = 5% overhead, 90s recovery
  • Faster checkpoints (30s): 8% overhead, 60s recovery
  • Slower checkpoints (5 min): 2% overhead, 5-6 min recovery
  • Chose 1 min as balance for fraud detection SLA

Disaster Recovery:

  • Savepoints: Manual checkpoints for version upgrades
  • Cross-region replication: Replicate checkpoints to DR region
  • Retention: Keep last 10 checkpoints (10 hours of history)”

Code Example

Simple Checkpointing Implementation

import time
import json
import hashlib
from typing import Dict, Any
from pathlib import Path

class CheckpointManager:
    """
    Simple checkpointing system for stream processing
    """
    def __init__(self, checkpoint_dir: str, interval_seconds: int = 60):
        self.checkpoint_dir = Path(checkpoint_dir)
        self.checkpoint_dir.mkdir(parents=True, exist_ok=True)
        self.interval_seconds = interval_seconds
        self.last_checkpoint_time = 0
        self.checkpoint_id = 0

    def should_checkpoint(self) -> bool:
        """Check if it's time to create a checkpoint"""
        return time.time() - self.last_checkpoint_time >= self.interval_seconds

    def create_checkpoint(self, state: Dict[str, Any], offset: int) -> int:
        """
        Create a checkpoint by saving state and offset
        Returns checkpoint ID
        """
        self.checkpoint_id += 1
        checkpoint_path = self.checkpoint_dir / f"checkpoint-{self.checkpoint_id}"
        checkpoint_path.mkdir(parents=True, exist_ok=True)

        # Save application state
        state_file = checkpoint_path / "state.json"
        with open(state_file, 'w') as f:
            json.dump(state, f, indent=2)

        # Save stream offset
        offset_file = checkpoint_path / "offset.txt"
        with open(offset_file, 'w') as f:
            f.write(str(offset))

        # Save checkpoint metadata
        metadata = {
            'checkpoint_id': self.checkpoint_id,
            'timestamp': time.time(),
            'offset': offset,
            'state_size': state_file.stat().st_size,
            'checksum': self._compute_checksum(state_file)
        }

        metadata_file = checkpoint_path / "metadata.json"
        with open(metadata_file, 'w') as f:
            json.dump(metadata, f, indent=2)

        # Update last checkpoint pointer
        latest_file = self.checkpoint_dir / "_latest"
        with open(latest_file, 'w') as f:
            f.write(str(self.checkpoint_id))

        self.last_checkpoint_time = time.time()

        print(f"βœ“ Checkpoint {self.checkpoint_id} created: "
              f"offset={offset}, state_size={metadata['state_size']} bytes")

        return self.checkpoint_id

    def restore_latest_checkpoint(self) -> tuple[Dict[str, Any], int]:
        """
        Restore state and offset from latest checkpoint
        Returns (state, offset) tuple
        """
        latest_file = self.checkpoint_dir / "_latest"

        if not latest_file.exists():
            print("No checkpoint found, starting from beginning")
            return {}, 0

        # Read latest checkpoint ID
        with open(latest_file, 'r') as f:
            checkpoint_id = int(f.read().strip())

        checkpoint_path = self.checkpoint_dir / f"checkpoint-{checkpoint_id}"

        # Verify checkpoint integrity
        metadata_file = checkpoint_path / "metadata.json"
        with open(metadata_file, 'r') as f:
            metadata = json.load(f)

        state_file = checkpoint_path / "state.json"
        checksum = self._compute_checksum(state_file)

        if checksum != metadata['checksum']:
            raise Exception(f"Checkpoint {checkpoint_id} corrupted! "
                          f"Checksum mismatch")

        # Restore state
        with open(state_file, 'r') as f:
            state = json.load(f)

        # Restore offset
        offset_file = checkpoint_path / "offset.txt"
        with open(offset_file, 'r') as f:
            offset = int(f.read().strip())

        self.checkpoint_id = checkpoint_id

        print(f"βœ“ Restored checkpoint {checkpoint_id}: "
              f"offset={offset}, state_size={len(state)} items")

        return state, offset

    def _compute_checksum(self, file_path: Path) -> str:
        """Compute SHA256 checksum of file"""
        sha256 = hashlib.sha256()
        with open(file_path, 'rb') as f:
            for chunk in iter(lambda: f.read(4096), b''):
                sha256.update(chunk)
        return sha256.hexdigest()

    def cleanup_old_checkpoints(self, keep_last: int = 3):
        """Remove old checkpoints, keeping only the last N"""
        checkpoints = sorted(
            [d for d in self.checkpoint_dir.iterdir() if d.is_dir()],
            key=lambda d: int(d.name.split('-')[1])
        )

        # Remove old checkpoints
        for checkpoint in checkpoints[:-keep_last]:
            print(f"Removing old checkpoint: {checkpoint.name}")
            for file in checkpoint.iterdir():
                file.unlink()
            checkpoint.rmdir()

# Stream Processor with Checkpointing
class StreamProcessor:
    """
    Stream processor with checkpoint support
    """
    def __init__(self, checkpoint_dir: str):
        self.checkpoint_manager = CheckpointManager(checkpoint_dir)
        self.state: Dict[str, int] = {}  # word -> count
        self.current_offset = 0

        # Restore from last checkpoint
        self.state, self.current_offset = \
            self.checkpoint_manager.restore_latest_checkpoint()

    def process_message(self, message: str):
        """Process a single message (word count)"""
        words = message.lower().split()

        for word in words:
            self.state[word] = self.state.get(word, 0) + 1

        self.current_offset += 1

        # Checkpoint if interval elapsed
        if self.checkpoint_manager.should_checkpoint():
            self.checkpoint()

    def checkpoint(self):
        """Create a checkpoint"""
        self.checkpoint_manager.create_checkpoint(
            self.state,
            self.current_offset
        )

    def get_word_count(self, word: str) -> int:
        """Query current state"""
        return self.state.get(word, 0)

# Usage Example
if __name__ == '__main__':
    processor = StreamProcessor('/tmp/checkpoints')

    # Simulate stream processing
    messages = [
        "hello world",
        "hello kafka",
        "stream processing",
        "checkpoint test",
        "hello checkpoint"
    ]

    print("\n=== Processing Stream ===")
    for i, msg in enumerate(messages):
        print(f"Message {i}: {msg}")
        processor.process_message(msg)

        # Simulate checkpoint every 2 messages
        if (i + 1) % 2 == 0:
            processor.checkpoint()

    # Query state
    print("\n=== Query State ===")
    print(f"Count of 'hello': {processor.get_word_count('hello')}")
    print(f"Count of 'checkpoint': {processor.get_word_count('checkpoint')}")

    # Simulate crash and recovery
    print("\n=== Simulating Crash and Recovery ===")
    del processor  # "crash"

    processor_recovered = StreamProcessor('/tmp/checkpoints')
    print(f"Count of 'hello' after recovery: "
          f"{processor_recovered.get_word_count('hello')}")
    print(f"Current offset after recovery: "
          f"{processor_recovered.current_offset}")

    # Cleanup
    processor_recovered.checkpoint_manager.cleanup_old_checkpoints(keep_last=2)

Barrier Alignment Implementation

from dataclasses import dataclass
from typing import Union

@dataclass
class Message:
    """Regular stream message"""
    data: str
    offset: int

@dataclass
class Barrier:
    """Checkpoint barrier"""
    checkpoint_id: int

class BarrierBuffer:
    """
    Buffer messages until all input barriers received
    (for operators with multiple inputs)
    """
    def __init__(self, num_inputs: int):
        self.num_inputs = num_inputs
        self.pending_checkpoint = None
        self.received_barriers = set()
        self.buffered_messages = {i: [] for i in range(num_inputs)}

    def process(self, message: Union[Message, Barrier], input_id: int):
        """
        Process message or barrier from input stream
        Returns: (messages_to_process, should_checkpoint)
        """
        if isinstance(message, Barrier):
            if self.pending_checkpoint is None:
                # First barrier for this checkpoint
                self.pending_checkpoint = message.checkpoint_id
                self.received_barriers = {input_id}

                # Release buffered messages from this input
                buffered = self.buffered_messages[input_id]
                self.buffered_messages[input_id] = []

                # A single-input operator already has all its barriers
                if len(self.received_barriers) == self.num_inputs:
                    self.pending_checkpoint = None
                    self.received_barriers = set()
                    return buffered, True
                return buffered, False

            elif message.checkpoint_id == self.pending_checkpoint:
                # Another barrier for same checkpoint
                self.received_barriers.add(input_id)

                # Return buffered messages from this input
                buffered = self.buffered_messages[input_id]
                self.buffered_messages[input_id] = []

                # All barriers received?
                if len(self.received_barriers) == self.num_inputs:
                    # Checkpoint!
                    self.pending_checkpoint = None
                    self.received_barriers = set()
                    return buffered, True
                else:
                    return buffered, False

            else:
                # Barrier for a different checkpoint while one is still
                # pending: overlapping checkpoints are out of scope here
                raise ValueError("Overlapping checkpoints not supported")

        else:  # Regular message
            if self.pending_checkpoint is not None and \
               input_id not in self.received_barriers:
                # Buffer message (barrier not yet received from this input)
                self.buffered_messages[input_id].append(message)
                return [], False
            else:
                # Process message immediately
                return [message], False

class CheckpointedOperator:
    """
    Stream operator with checkpoint support
    """
    def __init__(self, num_inputs: int = 1):
        self.state = {}
        self.barrier_buffer = BarrierBuffer(num_inputs)
        # Reuses CheckpointManager from the previous example
        self.checkpoint_manager = CheckpointManager('/tmp/operator-checkpoints')

    def on_message(self, message: Union[Message, Barrier], input_id: int = 0):
        """Handle incoming message or barrier"""
        messages, should_checkpoint = \
            self.barrier_buffer.process(message, input_id)

        # Process buffered/immediate messages
        for msg in messages:
            self._process_data(msg.data)

        # Checkpoint if all barriers received
        if should_checkpoint:
            self._checkpoint()

    def _process_data(self, data: str):
        """Application logic (word count)"""
        self.state[data] = self.state.get(data, 0) + 1

    def _checkpoint(self):
        """Save state to checkpoint"""
        print(f"β†’ Operator checkpointing: state size = {len(self.state)}")
        self.checkpoint_manager.create_checkpoint(self.state, 0)

# Example: Two-input operator (join)
if __name__ == '__main__':
    operator = CheckpointedOperator(num_inputs=2)

    # Stream 1 messages
    operator.on_message(Message("hello", 0), input_id=0)
    operator.on_message(Message("world", 1), input_id=0)
    operator.on_message(Barrier(checkpoint_id=1), input_id=0)

    # Stream 2 messages (delayed barrier)
    operator.on_message(Message("kafka", 0), input_id=1)
    operator.on_message(Message("flink", 1), input_id=1)

    # This completes the checkpoint
    operator.on_message(Barrier(checkpoint_id=1), input_id=1)

    print(f"Final state: {operator.state}")

Used In Systems:

  • Apache Flink: Distributed checkpointing with barriers
  • Spark Structured Streaming: Micro-batch checkpointing
  • Kafka Streams: State stores with changelog

Explained In Detail:

  • Stream Processing Deep Dive - Checkpointing implementation details

Quick Self-Check

  • Can explain checkpointing in 60 seconds?
  • Understand how checkpoint barriers work?
  • Know the trade-offs between checkpoint frequencies?
  • Can explain how checkpointing enables exactly-once semantics?
  • Understand incremental vs full checkpointing?
  • Can design a checkpoint strategy for given requirements?