How distributed messaging systems guarantee each message is processed exactly once, eliminating duplicates while ensuring atomicity across multiple operations
Exactly-once semantics (EOS) in distributed messaging ensures each message is processed precisely one time, even in the presence of failures, retries, and network issues. It combines idempotent producers (deduplicating retries) with transactions (atomic multi-message operations) to eliminate duplicates while maintaining strong consistency guarantees.
Visual Overview
Three Delivery Guarantee Levels
AT-MOST-ONCE (Fire and Forget):
Producer ──▶ [Message] ──▶ Broker [FAIL] message lost
Result: 0 or 1 deliveries (message may be lost)
AT-LEAST-ONCE (Retry until success):
Producer ──▶ [Message] ──▶ Broker [ACK] received
──▶ [Message] ──▶ Broker [ACK] Retry on timeout (DUPLICATE!)
Result: 1+ deliveries (duplicates possible)
EXACTLY-ONCE (Idempotent + Transactional):
Producer ──▶ [Message seq=1] ──▶ Broker [STORED]
──▶ [Message seq=1] ──▶ Broker [IGNORED] Duplicate detected
Result: Exactly 1 delivery (no duplicates, no loss)
EXACTLY-ONCE COMPONENTS:
1. Idempotent Producer
   ├── Producer ID (PID)
   ├── Sequence numbers per partition
   └── Broker-side deduplication

2. Transactions
   ├── Transaction coordinator
   ├── Two-phase commit protocol
   ├── Atomic multi-partition writes
   └── Read isolation (read_committed)

3. Zombie Fencing
   ├── Producer epochs
   ├── Fencing old producer instances
   └── Preventing split-brain scenarios
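The three delivery levels map directly onto producer configuration. Below is a minimal sketch using the standard Kafka producer property keys (`acks`, `retries`, `enable.idempotence`, `transactional.id`); the `transactional.id` value is a placeholder, and in a real application you would also set `bootstrap.servers` and serializers:

```java
import java.util.Properties;

public class DeliveryGuaranteeConfigs {

    // At-most-once: fire and forget -- no retries, don't wait for acks,
    // so a message can be lost but never duplicated.
    public static Properties atMostOnce() {
        Properties p = new Properties();
        p.put("acks", "0");    // don't wait for broker acknowledgment
        p.put("retries", "0"); // never retry, so nothing can duplicate
        return p;
    }

    // At-least-once: retry until acknowledged -- nothing is lost,
    // but a retry after a lost ACK produces a duplicate.
    public static Properties atLeastOnce() {
        Properties p = new Properties();
        p.put("acks", "all"); // wait for all in-sync replicas
        p.put("retries", Integer.toString(Integer.MAX_VALUE));
        return p;
    }

    // Exactly-once: idempotence dedups retries by (PID, sequence number);
    // a transactional.id additionally enables atomic multi-partition
    // writes and zombie fencing.
    public static Properties exactlyOnce() {
        Properties p = new Properties();
        p.put("enable.idempotence", "true"); // implies acks=all, safe retries
        p.put("transactional.id", "my-app-1"); // placeholder id
        return p;
    }
}
```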
Core Explanation
What is Exactly-Once Semantics?
Exactly-once semantics guarantees that:
Every message sent is delivered to the consumer
No message is delivered more than once
Messages are processed atomically across multiple operations
This is achieved through two mechanisms:
Exactly-Once Mechanisms
MECHANISM 1: IDEMPOTENT PRODUCER (Eliminates duplicate writes)
Producer Instance
├── Producer ID (PID): 12345
└── Sequence Numbers:
    ├── Partition 0: seq=[0, 1, 2, 3, ...]
    ├── Partition 1: seq=[0, 1, 2, 3, ...]
    └── Partition 2: seq=[0, 1, 2, 3, ...]
Retry Scenario:
T=0: Send msg (seq=5) ──▶ Broker [STORED]
T=1: Network timeout, no ACK received
T=2: Retry msg (seq=5) ──▶ Broker sees PID=12345, seq=5 already stored
▶ Ignores duplicate, returns success
Result: Message stored exactly once

MECHANISM 2: TRANSACTIONS (Atomic multi-message operations)
Transaction {
Write to topic A, partition 0
Write to topic B, partition 2
Write to topic C, partition 1
} ──▶ All succeed OR all fail atomically
Consumer with isolation.level=read_committed:
├── Sees only committed transactions
├── Never sees partial/aborted transactions
└── Guaranteed consistent view
How Idempotent Producers Work
Producer ID and Sequence Numbers:
PRODUCER INITIALIZATION:
1. Producer starts up
2. Requests a Producer ID (PID) from the broker
3. Broker assigns unique PID: 12345
4. Producer maintains sequence counters per partition:
Partition 0: next_seq = 0
Partition 1: next_seq = 0
Partition 2: next_seq = 0
SENDING MESSAGES:
Producer.send(topic="orders", partition=0, msg="order-123")
├── Attach PID=12345
├── Attach seq=0 (for partition 0)
├── Increment partition 0 seq to 1
└── Send to broker
Broker receives (PID=12345, partition=0, seq=0):
├──Check: Is this a duplicate?
├── Last seq for (PID=12345, partition=0) = -1 (no previous)
├── Accept: 0 > -1, this is new
├── Store message
└── Update last seq to 0
Producer.send(topic="orders", partition=0, msg="order-456")
├── Attach PID=12345
├── Attach seq=1 (incremented)
└──Send to broker
RETRY SCENARIO (Network failure):
Producer.send(topic="orders", partition=0, msg="order-789")
├── Send with seq=2
├── Broker stores it
├── ACK packet lost in network [FAIL]
└── Producer doesn't receive ACK
Producer retries:
├── Resend with same seq=2 (didn't increment)
└── Send to broker
Broker receives (PID=12345, partition=0, seq=2):
├── Check: Last seq = 2 (already stored)
├── Reject as duplicate: seq=2 is not > 2
├── Return success ACK (idempotent)
└── No duplicate stored!
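The broker-side bookkeeping above can be captured in a few lines. This is a toy model of the dedup check, not actual Kafka broker code; `SequenceDeduplicator` and `append` are illustrative names:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of broker-side idempotent-producer deduplication:
// the broker tracks the last sequence number seen per (PID, partition)
// and silently acknowledges retries that carry an already-stored sequence.
public class SequenceDeduplicator {
    private final Map<String, Integer> lastSeq = new HashMap<>();

    /** Returns true if the message was stored, false if it was a duplicate retry. */
    public boolean append(long pid, int partition, int seq) {
        String key = pid + "-" + partition;
        int last = lastSeq.getOrDefault(key, -1);
        if (seq <= last) {
            return false; // duplicate: already stored, ACK without re-appending
        }
        lastSeq.put(key, seq); // new message: store it and advance the sequence
        return true;
    }
}
```

A real broker is stricter than this sketch: a sequence that skips ahead (e.g. seq=4 arriving when the last stored was seq=2) is rejected with an out-of-order-sequence error rather than accepted, so only the duplicate-retry path is modeled here.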
TRANSACTION COORDINATOR SYSTEM:
__transaction_state Topic (Internal, 50 partitions)
Stores transaction metadata:
- transactional.id → Producer ID mapping
- Transaction status (ONGOING/COMMITTED/ABORTED)
- Partitions involved in transaction
- Producer epochs (for zombie fencing)

Transaction Flow:
1. Producer: initTransactions()
   ├── Request Producer ID
   ├── Increment epoch (fence old producers)
   └── Get transaction coordinator assignment
2. Producer: beginTransaction()
   └── Mark transaction as ONGOING locally
3. Producer: send() messages
   ├── Send to partition leaders
   └── Coordinator tracks partitions involved
4. Producer: commitTransaction()
   ├── Write PREPARE_COMMIT to __transaction_state
   ├── Send COMMIT markers to all partitions
   ├── Wait for partition ACKs
   ├── Write COMPLETE_COMMIT
   └── Transaction complete [SUCCESS]
Two-Phase Commit Protocol:
TWO-PHASE COMMIT FLOW:
PHASE 1: PREPARE
────────────────────────────────────────────────
Producer ──▶ Transaction Coordinator
"I want to commit transaction X"
Coordinator:
1. Validate transaction state (must be ONGOING)
2. Write PREPARE_COMMIT to __transaction_state
3. Identify all partitions in transaction:
- topic-A, partition-0
- topic-B, partition-2
- topic-C, partition-1
PHASE 2: COMMIT
────────────────────────────────────────────────
Coordinator ──▶ Partition Leaders
"Write COMMIT markers"
Partition Leaders:
topic-A, partition-0: [msg1][msg2][COMMIT_MARKER]
topic-B, partition-2: [msg3][COMMIT_MARKER]
topic-C, partition-1: [msg4][msg5][COMMIT_MARKER]
↑
Transaction boundary marker
Coordinator receives all ACKs:
1. Write COMPLETE_COMMIT to __transaction_state
2. Transaction is now durable and visible
3. Consumers with read_committed see all messages

ABORT SCENARIO:
────────────────────────────────────────────────
If ANY step fails:
1. Coordinator writes PREPARE_ABORT
2. Send ABORT markers to all partitions
3. Write COMPLETE_ABORT
4. Consumers never see aborted messages
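The effect of commit/abort markers on a read_committed consumer can be shown with a toy log model. `ReadCommittedFilter` and `Entry` are illustrative names, and this is a simplification: a real consumer buffers records until the relevant marker arrives and respects the last stable offset, rather than scanning the whole log twice:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of read_committed filtering: a partition log holds data records
// plus COMMIT/ABORT markers, and the consumer only surfaces records whose
// transaction has a COMMIT marker in the log.
public class ReadCommittedFilter {
    record Entry(long txnId, String value, boolean isMarker, boolean committed) {}

    public static List<String> readCommitted(List<Entry> log) {
        // Pass 1: collect transaction ids that have a COMMIT marker
        Set<Long> committedTxns = new HashSet<>();
        for (Entry e : log) {
            if (e.isMarker() && e.committed()) committedTxns.add(e.txnId());
        }
        // Pass 2: surface only data records from committed transactions
        List<String> visible = new ArrayList<>();
        for (Entry e : log) {
            if (!e.isMarker() && committedTxns.contains(e.txnId())) {
                visible.add(e.value());
            }
        }
        return visible;
    }
}
```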
Code Example:
```java
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ExactlyOnceProcessor {

    private final KafkaProducer<String, String> producer;
    private final KafkaConsumer<String, String> consumer;

    public ExactlyOnceProcessor() {
        // Producer setup
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "payment-processor-1");
        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        producer = new KafkaProducer<>(producerProps);
        producer.initTransactions(); // Initialize transaction state

        // Consumer setup
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "payment-group");
        consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        consumer = new KafkaConsumer<>(consumerProps);
    }

    public void processExactlyOnce() {
        consumer.subscribe(Arrays.asList("input-topic"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            if (!records.isEmpty()) {
                // Start transaction
                producer.beginTransaction();
                try {
                    // Process records and produce outputs
                    for (ConsumerRecord<String, String> record : records) {
                        String result = processRecord(record);
                        producer.send(new ProducerRecord<>("output-topic", record.key(), result));
                    }

                    // Commit offsets as part of the transaction
                    Map<TopicPartition, OffsetAndMetadata> offsets = getOffsetsToCommit(records);
                    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());

                    // Atomic commit: outputs + offsets together
                    producer.commitTransaction();

                    // Exactly-once guarantee:
                    // - Input consumed exactly once (offset committed)
                    // - Output produced exactly once (transaction committed)
                    // - Both atomic (all or nothing)
                } catch (Exception e) {
                    // Abort transaction on any failure
                    producer.abortTransaction();
                    // Offsets NOT committed, will reprocess from last commit
                    // Output messages NOT visible to consumers
                }
            }
        }
    }

    private String processRecord(ConsumerRecord<String, String> record) {
        return record.value(); // placeholder for real business logic
    }

    private Map<TopicPartition, OffsetAndMetadata> getOffsetsToCommit(ConsumerRecords<String, String> records) {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            offsets.put(partition, new OffsetAndMetadata(lastOffset + 1)); // next offset to read
        }
        return offsets;
    }
}
```
Zombie Producer Fencing
The Zombie Problem:
SCENARIO: Producer appears to fail, new instance starts
Timeline:
T=0: Producer-A (PID=123, epoch=5) is running
T=10: Network partition, Producer-A isolated
T=20: Application restarts Producer-B (same transactional.id)
T=21: Producer-B gets (PID=123, epoch=6) ← Epoch incremented
T=30: Network heals, Producer-A reconnects (zombie!)
Without Fencing:
Producer-A: Writes with epoch=5
Producer-B: Writes with epoch=6
Result: Both write, duplicates! [PROBLEM]
With Fencing:
Producer-A: Sends request with epoch=5
Broker: Current epoch=6, reject with INVALID_PRODUCER_EPOCH
Producer-A: Permanently fenced, stops writes [FENCED]
Producer-B: Only valid producer, no duplicates
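The fencing rule itself is small enough to sketch. This toy coordinator (`EpochFencer` is an illustrative name, not Kafka code) bumps the epoch for a transactional.id on every initTransactions() and rejects writes that carry a stale epoch, mirroring Kafka's INVALID_PRODUCER_EPOCH behavior:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of zombie fencing: the coordinator tracks the current epoch
// per transactional.id; only writes carrying the current epoch are accepted.
public class EpochFencer {
    private final Map<String, Integer> currentEpoch = new HashMap<>();

    /** Called on initTransactions(): bumps and returns the new epoch. */
    public int register(String transactionalId) {
        int next = currentEpoch.getOrDefault(transactionalId, -1) + 1;
        currentEpoch.put(transactionalId, next);
        return next;
    }

    /** Called on every write: stale epochs are fenced out. */
    public boolean validateWrite(String transactionalId, int epoch) {
        return epoch == currentEpoch.getOrDefault(transactionalId, -1);
    }
}
```

Because the epoch is bumped by the *new* instance's initTransactions(), the zombie needs no cooperation: its very next write fails the epoch check and it is permanently fenced.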
E-commerce Order Fulfillment
Scenario: E-commerce order fulfillment
Why EOS: Duplicate orders = angry customers + loss
Config: Full transactions with offset commits
Example: Amazon order pipeline, Shopify checkouts
Database Change Data Capture (CDC)
Scenario: Replicating database changes to data warehouse
Why EOS: Duplicate records corrupt analytics
Config: Transactional producer with idempotent writes
Example: Debezium → Kafka → Snowflake pipelines
Audit and Compliance Logs
Scenario: Financial audit trails, healthcare records
Why EOS: Legal requirement for accurate records
Config: acks=all, transactions, long retention
Example: Banking transaction logs, HIPAA-compliant systems
When NOT to Use
High-Volume Metrics/Logs
Problem: 60% throughput hit unacceptable for logs
Alternative: At-least-once + application-level dedup
Example: Observability data, clickstream analytics
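A sketch of that alternative, assuming every message carries a unique ID (class and method names are illustrative): a bounded LRU cache drops redeliveries at the application layer without paying the transactional overhead.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// At-least-once + application-level dedup: a bounded, access-ordered
// cache of recently seen message IDs drops redeliveries cheaply.
public class DedupCache {
    private final Map<String, Boolean> seen;

    public DedupCache(int capacity) {
        // access-order LinkedHashMap evicts the least-recently-seen ID
        this.seen = new LinkedHashMap<>(capacity, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    /** Returns true the first time an ID is seen, false for duplicates. */
    public boolean firstTime(String messageId) {
        return seen.put(messageId, Boolean.TRUE) == null;
    }
}
```

The trade-off: if a redelivery arrives after the ID has been evicted, the duplicate slips through, so this is "effectively-once" within a window, which is usually acceptable for logs and clickstreams.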
Performance-Critical Real-Time Systems
Problem: 4x latency increase breaks SLA
Alternative: At-least-once with dedup cache
Example: Ad bidding, real-time recommendations
Idempotent Consumers
Problem: Consumer already handles duplicates
Alternative: No need for EOS overhead
Example: Incrementing counters (idempotent operation)
Interview Application
Common Interview Question 1
Q: “Explain how Kafka achieves exactly-once semantics and why it’s difficult in distributed systems.”
Strong Answer:
“Exactly-once is challenging because distributed systems face network failures, crashes, and retries that naturally cause duplicates. Kafka solves this with two mechanisms: