
Offset Management

9 min Β· Intermediate Β· Messaging

How distributed messaging systems track consumer progress through partitions using offsets, enabling fault tolerance, exactly-once processing, and replay capabilities

  • πŸ’Ό Interview relevance: ~80% of streaming interviews
  • 🏭 Production impact: prevents data loss at scale
  • ⚑ Performance: fault tolerance
  • πŸ“ˆ Scalability: historical replay

TL;DR

Offsets are monotonically increasing integers that uniquely identify each message’s position within a partition. Consumers track their progress by storing offsets, enabling fault tolerance (resume from last position), replay (restart from any point), and exactly-once semantics (commit offsets transactionally with processing).

Visual Overview

PARTITION WITH OFFSETS:

Partition 0: user-events
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚Offset 0β”‚Offset 1β”‚Offset 2β”‚Offset 3β”‚Offset 4β”‚Offset 5β”‚Offset 6β”‚
β”‚msg A   β”‚msg B   β”‚msg C   β”‚msg D   β”‚msg E   β”‚msg F   β”‚msg G   β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                              ↑                          ↑
                    Last Committed: 3          Current Position: 6
                    (safe to resume here)       (reading here now)

OFFSET STATES:
β”œβ”€β”€ Current Offset: Next message to read (6)
β”œβ”€β”€ Committed Offset: Last confirmed processed (3)
β”œβ”€β”€ Log End Offset: Offset where the next message will be written (7)
└── High Water Mark: Fully replicated up to here (7); consumers can read below this

CONSUMER OFFSET TRACKING:
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ __consumer_offsets topic (internal) β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚ Key: (group, topic, partition)      β”‚
β”‚ Value: offset metadata              β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚ (analytics, events, 0) β†’ offset: 3  β”‚
β”‚ (analytics, events, 1) β†’ offset: 8  β”‚
β”‚ (fraud, events, 0) β†’ offset: 5      β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
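
The offset states above can also be read programmatically from a consumer that has the partition assigned. A minimal sketch (the consumer setup is assumed; topic and partition follow the diagram):

TopicPartition tp = new TopicPartition("user-events", 0);

long position = consumer.position(tp);                                // next offset to read (current offset)
OffsetAndMetadata committed = consumer.committed(Set.of(tp)).get(tp); // last committed offset (null if none)
long logEnd   = consumer.endOffsets(List.of(tp)).get(tp);             // end of the readable log (high water mark)
long earliest = consumer.beginningOffsets(List.of(tp)).get(tp);       // oldest offset still retained

System.out.printf("position=%d committed=%s logEnd=%d earliest=%d%n",
        position, committed == null ? "none" : committed.offset(), logEnd, earliest);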

Core Explanation

What is an Offset?

An offset is a unique, sequential 64-bit integer assigned to each message within a partition:

  • Unique per partition: Offset 5 in Partition 0 β‰  Offset 5 in Partition 1
  • Monotonically increasing: Never decreases, always incrementing
  • Permanent: Once assigned, never changes (immutable)
  • Zero-indexed: First message is offset 0
Partition Lifecycle:
1. Message arrives β†’ Broker assigns next offset β†’ Writes to log
2. Offset starts at 0 β†’ Increments forever β†’ No reuse after deletion
3. Old messages deleted (retention) β†’ Offset counter keeps increasing

Example:
Day 1: Offsets 0-1000 written
Day 2: Offsets 1001-2000 written
Day 3: Offsets 0-500 deleted (retention), but new messages start at 2001 (not 0!)
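
The offset is assigned by the broker at append time; the producer only learns it from the acknowledgment. A minimal sketch of observing this from the producer side (broker address, topic, key, and value are illustrative assumptions):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    producer.send(new ProducerRecord<>("user-events", "user-42", "page_view"),
        (metadata, exception) -> {
            if (exception == null) {
                // The broker assigned this offset when it appended the record to the log
                System.out.printf("partition=%d offset=%d%n",
                        metadata.partition(), metadata.offset());
            }
        });
} // close() flushes any in-flight sends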

Offset Semantics: At-Most-Once vs At-Least-Once vs Exactly-Once

At-Most-Once (Commit Before Processing):

1. Consumer reads message (offset 5)
2. Consumer commits offset 6 immediately
3. Consumer processes message
   └─ If crash here: Message lost! βœ•

Risk: Data loss
Use case: Metrics, logs where loss is acceptable

At-Least-Once (Commit After Processing):

1. Consumer reads message (offset 5)
2. Consumer processes message
3. Consumer commits offset 6
   └─ If crash after step 2: Reprocesses offset 5 ⚠️

Risk: Duplicate processing
Use case: Idempotent operations (safe to retry)

Exactly-Once (Transactional Commit):

1. Consumer reads message (offset 5)
2. BEGIN TRANSACTION
   β”œβ”€ Process message (write to database)
   └─ Commit offset 6
3. COMMIT TRANSACTION
   └─ If crash anywhere: Transaction rolls back, restarts from offset 5

Risk: None (atomic processing + offset commit)
Use case: Financial transactions, state updates
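
Kafka's transactional API implements this pattern for consume-process-produce pipelines that stay within Kafka: offsets are committed through the producer, so they become part of the same transaction as the output records. A minimal sketch, assuming a producer created with a transactional.id, a consumer with enable.auto.commit=false (and downstream readers using isolation.level=read_committed), and a hypothetical transform() helper:

producer.initTransactions(); // once, at startup

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    if (records.isEmpty()) continue;

    producer.beginTransaction();
    try {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (ConsumerRecord<String, String> record : records) {
            producer.send(new ProducerRecord<>("events-processed",
                    record.key(), transform(record.value())));   // transform() is a hypothetical helper
            // The committed value is the NEXT offset to read, hence record.offset() + 1
            offsets.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1));
        }
        // Offsets ride in the same transaction as the produced records
        producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
        producer.commitTransaction();
    } catch (KafkaException e) {
        producer.abortTransaction(); // output and offsets roll back together
    }
}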

Offset Storage: __consumer_offsets Topic

Internal Kafka Topic:

Topic: __consumer_offsets
β”œβ”€β”€ 50 partitions (default, configurable)
β”œβ”€β”€ Compacted log (keeps latest offset per key)
└── Replicated (fault tolerance)

KEY FORMAT:
(group.id, topic, partition) β†’ (consumer_group, topic_name, partition_number)

VALUE FORMAT:
{
  "offset": 12345,
  "metadata": "processed at 2025-10-12T12:00:00Z",
  "timestamp": 1728739200000,
  "leaderEpoch": 5
}

EXAMPLE ENTRIES:
Key: ("analytics", "user-events", 0) β†’ Value: {"offset": 1000, ...}
Key: ("analytics", "user-events", 1) β†’ Value: {"offset": 2500, ...}
Key: ("fraud-detection", "user-events", 0) β†’ Value: {"offset": 500, ...}

Coordinator Lookup:

// Conceptual (pseudocode): how a consumer group maps to its offsets coordinator
int partition = Math.abs(groupId.hashCode()) % numOffsetsTopicPartitions;
Broker coordinator = findLeader("__consumer_offsets", partition);
// All offset commits for the "analytics" group go to the same coordinator broker

Manual vs Automatic Offset Commit

Automatic Commit (Default):

Properties props = new Properties();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000); // Every 5 seconds

// Kafka automatically commits offsets in background
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        process(record); // If crash here, might reprocess up to 5 seconds of data
    }
}

⚠️ At-least-once semantics (potential duplicates on crash)

Manual Commit (Precise Control):

Properties props = new Properties();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        process(record);

        // Commit after EACH message (safest, slowest).
        // Note: a parameterless commitSync() commits the position after the last poll(),
        // i.e. up to the end of the whole batch; for a true per-record commit, pass
        // explicit offsets (see the variant after this block).
        consumer.commitSync();
    }
}

// OR: commit once per batch (faster, but a crash may reprocess the entire batch)
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        process(record);
    }
    consumer.commitSync(); // Commit entire batch
}
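
As noted above, a parameterless commitSync() commits the consumer's position after the last poll(), i.e. the end of the whole batch, not just the record that was processed. To commit exactly one record at a time, pass an explicit offset map (a sketch, with the same consumer and process() assumptions as above):

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        process(record);
        // The committed value is the NEXT offset to read, hence offset + 1
        consumer.commitSync(Collections.singletonMap(
                new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1)));
    }
}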

Async Commit (Performance):

consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        log.error("Commit failed for offsets: " + offsets, exception);
        // Handle commit failure (retry, alert, etc.)
    }
});
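
A common compromise is asynchronous commits on the hot path with one final synchronous commit during shutdown, so the last position is not lost if an async commit was still in flight. A sketch, assuming a volatile running flag flipped by a shutdown hook and the same process() helper as above:

try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            process(record);
        }
        consumer.commitAsync(); // non-blocking; a failed attempt is superseded by later commits
    }
} finally {
    try {
        consumer.commitSync();  // blocking: make sure the final position is recorded
    } finally {
        consumer.close();
    }
}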

Offset Reset Strategies

What happens when no committed offset exists?

// EARLIEST: Start from beginning of partition
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Use case: New consumer group wants all historical data

// LATEST (default): Start from end (only new messages)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
// Use case: Only care about new events going forward

// NONE: Throw exception if no offset found
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
// Use case: Require explicit offset initialization

Example Scenarios:

Partition has offsets 0-1000:

Scenario 1: New consumer, auto.offset.reset=earliest
  β†’ Starts at offset 0 (processes all 1001 messages)

Scenario 2: New consumer, auto.offset.reset=latest
  β†’ Starts at offset 1001 (waits for new messages)

Scenario 3: Consumer crashes, had committed offset 500
  β†’ Resumes at offset 500 (regardless of auto.offset.reset)

Scenario 4: Consumer committed offset 500, but retention deleted offsets 0-600
  β†’ Committed offset is out of range, so auto.offset.reset applies:
    earliest resumes at 601 (first available offset), latest jumps to the log end

Seeking to Specific Offsets

Manual Offset Control:

// Seek to a specific offset (the partition must already be assigned to this consumer)
consumer.seek(new TopicPartition("user-events", 0), 12345);

// Seek to beginning
consumer.seekToBeginning(Collections.singletonList(
    new TopicPartition("user-events", 0)
));

// Seek to end
consumer.seekToEnd(Collections.singletonList(
    new TopicPartition("user-events", 0)
));

// Seek by timestamp (find offset at specific time)
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
timestampsToSearch.put(
    new TopicPartition("user-events", 0),
    System.currentTimeMillis() - 86400000 // 24 hours ago
);
Map<TopicPartition, OffsetAndTimestamp> offsets =
    consumer.offsetsForTimes(timestampsToSearch);
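
To apply the timestamp lookup, seek each partition to the returned offset; note that offsetsForTimes() returns null for a partition with no message at or after the requested timestamp (a sketch continuing the block above):

offsets.forEach((partition, offsetAndTimestamp) -> {
    if (offsetAndTimestamp != null) {
        consumer.seek(partition, offsetAndTimestamp.offset());
    }
    // null: nothing at or after the timestamp for this partition, so leave it alone
});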

Use Cases:

  • Debugging: Replay specific time range to reproduce issue
  • Reprocessing: Reprocess data after bug fix
  • Data Recovery: Restore state after data corruption
  • Testing: Start from known offset for integration tests

Tradeoffs

Advantages:

  • βœ“ Fault tolerance (resume from last position)
  • βœ“ Replay capability (reprocess historical data)
  • βœ“ Consumer independence (each group tracks own offsets)
  • βœ“ Flexible semantics (at-least-once, at-most-once, exactly-once)

Disadvantages:

  • βœ• Offset commits add latency overhead
  • βœ• Managing manual commits is complex
  • βœ• Duplicate processing with at-least-once semantics
  • βœ• Offset storage can become bottleneck at extreme scale

Real Systems Using This

Kafka (Apache)

  • Implementation: __consumer_offsets compacted topic with 50 partitions
  • Scale: Billions of offset commits per day at LinkedIn
  • Typical Setup: Auto-commit for simple pipelines, manual for critical data

Amazon Kinesis

  • Implementation: DynamoDB table for checkpoint storage (similar to offsets)
  • Scale: Auto-scaling checkpoint storage
  • Typical Setup: Kinesis Client Library (KCL) manages checkpoints automatically

Apache Pulsar

  • Implementation: Managed cursors stored in ledger metadata
  • Scale: Automatic cursor management with acknowledgment tracking
  • Typical Setup: Subscription cursors per consumer group

When to Use Different Commit Strategies

βœ“ Auto-Commit (Simplicity)

Scenario: Non-critical analytics pipeline, duplicates acceptable
Config: enable.auto.commit=true, auto.commit.interval.ms=5000
Trade-off: Simple code, potential duplicates on crash

βœ“ Manual Sync Commit (Safety)

Scenario: Financial transactions, payment processing
Config: enable.auto.commit=false, manual commitSync() after each message
Trade-off: Slower throughput, guaranteed at-least-once

βœ“ Manual Async Commit (Performance)

Scenario: High-throughput log aggregation
Config: enable.auto.commit=false, commitAsync() for batches
Trade-off: Risk of offset commit failure, but non-blocking

βœ“ Transactional Commit (Exactly-Once)

Scenario: Database updates from Kafka (must be atomic)
Config: Transactional producer + consumer, commit offsets in transaction
Trade-off: Complex setup, highest reliability

Interview Application

Common Interview Question 1

Q: β€œYour consumer processes a message and writes to a database, then crashes before committing the offset. What happens on restart? How do you handle this?”

Strong Answer:

β€œOn restart, the consumer will reprocess the message since the offset wasn’t committed - this is at-least-once semantics. This can cause duplicate database writes. Solutions:

  1. Idempotent processing: Use upsert instead of insert, or add message IDs to detect duplicates
  2. Store offsets with the data: write the offset to the database in the same transaction as the data, and on restart seek() to the stored offset instead of relying on Kafka’s committed offsets
  3. Exactly-once with Kafka transactions: Use isolation.level=read_committed and transactional producer/consumer

For critical systems like payments, I’d use option 2 or 3. For analytics where occasional duplicates are acceptable, option 1 is simpler.”

Why this is good:

  • Identifies the problem (duplicate processing)
  • Provides multiple solutions with tradeoffs
  • Matches solution to use case severity

Common Interview Question 2

Q: β€œHow would you reprocess the last 7 days of data from a Kafka topic?”

Strong Answer:

β€œI’d use consumer.offsetsForTimes() to find the offset from 7 days ago, then seek to that offset:

long sevenDaysAgo = System.currentTimeMillis() - (7 * 24 * 60 * 60 * 1000);
Map<TopicPartition, Long> timestamps = Map.of(
    new TopicPartition("events", 0), sevenDaysAgo
);
Map<TopicPartition, OffsetAndTimestamp> offsets =
    consumer.offsetsForTimes(timestamps);

offsets.forEach((partition, offsetAndTimestamp) -> {
    consumer.seek(partition, offsetAndTimestamp.offset());
});

Important considerations:

  • Ensure retention period is > 7 days, or data may be deleted
  • Create a NEW consumer group to avoid affecting production consumers
  • Consider data volume (7 days might be terabytes, need parallel consumers)
  • Implement idempotent processing to handle potential duplicates”

Why this is good:

  • Provides working code
  • Addresses retention concerns
  • Thinks about production impact
  • Considers scale and parallelism

Red Flags to Avoid

  • βœ• Confusing offset with message ID or timestamp
  • βœ• Not understanding at-least-once vs exactly-once semantics
  • βœ• Assuming offsets reset to 0 after deletion
  • βœ• Forgetting that offsets are per-partition, not per-topic

Quick Self-Check

Before moving on, can you:

  • Explain what an offset is in 30 seconds?
  • Draw the relationship between offsets, partitions, and consumers?
  • Explain at-least-once vs exactly-once semantics?
  • Describe how to replay data from a specific timestamp?
  • Choose appropriate commit strategy for different use cases?
  • Understand where offsets are stored?

Next Recommended: Log-Based Storage - Learn how Kafka stores messages using append-only logs