Transactions and Exactly-Once Semantics
Deep dive into Kafka transactions, idempotent producers, exactly-once semantics, and atomic read-process-write patterns
Exactly-Once Semantics Foundations
Understanding Exactly-Once Guarantees
Exactly-once semantics (EOS) in Kafka combines idempotent producers with transactions: retries do not create duplicates, and writes to multiple partitions, together with consumer offset commits, become visible atomically or not at all:
EXACTLY-ONCE SEMANTIC BOUNDARIES:
┌──────────────────────────────────────────────────────────────────────┐
│                      KAFKA EOS GUARANTEE SCOPE                       │
├──────────────────────────────────────────────────────────────────────┤
│ Producer ──▶ [Transaction] ──▶ Kafka ──▶ [Transaction] ──▶ Consumer  │
│     │                                                         │      │
│ ┌─────────────┐                                      ┌─────────────┐ │
│ │• Idempotent │                                      │• Read       │ │
│ │  writes     │                                      │  committed  │ │
│ │• Atomic     │                                      │• Isolation  │ │
│ │  batches    │                                      │  levels     │ │
│ └─────────────┘                                      └─────────────┘ │
└──────────────────────────────────────────────────────────────────────┘
EOS GUARANTEES:
├── Each message delivered exactly once within transaction scope
├── No duplicates due to retries or failures
├── Atomic commit/abort across multiple partitions/topics
├── Read isolation prevents consuming uncommitted data
└── Zombie producer fencing via epochs and producer IDs
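The "no duplicates due to retries" guarantee rests on sequence numbers that the broker tracks per producer ID and partition. The following is a minimal sketch of that idea in plain Java, not the broker's implementation: `IdempotentPartitionLog` and its `Result` enum are hypothetical names, and the model tracks one record at a time rather than batches.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a single partition's log; not a Kafka class.
class IdempotentPartitionLog {
    enum Result { APPENDED, DUPLICATE, OUT_OF_ORDER }

    private final List<String> records = new ArrayList<>();
    // Last sequence number accepted from each producer ID.
    private final Map<Long, Integer> lastSequence = new HashMap<>();

    Result append(long producerId, int sequence, String value) {
        int expected = lastSequence.getOrDefault(producerId, -1) + 1;
        if (sequence < expected) {
            return Result.DUPLICATE;     // retry of a record already written: acknowledge, do not append
        }
        if (sequence > expected) {
            return Result.OUT_OF_ORDER;  // gap: an earlier record never arrived
        }
        records.add(value);
        lastSequence.put(producerId, sequence);
        return Result.APPENDED;
    }

    List<String> records() {
        return records;
    }
}

class IdempotentPartitionLogDemo {
    public static void main(String[] args) {
        IdempotentPartitionLog log = new IdempotentPartitionLog();
        System.out.println(log.append(12345L, 0, "payment-1")); // APPENDED
        System.out.println(log.append(12345L, 0, "payment-1")); // DUPLICATE (a retry)
        System.out.println(log.append(12345L, 2, "payment-3")); // OUT_OF_ORDER (sequence 1 missing)
        System.out.println(log.records());                      // [payment-1]
    }
}
```

A retried send carries the same sequence number as the original, so the log ends up with one copy regardless of how many times the producer retries.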
System Requirements and Limitations
Configuration Requirements:
// Producer configuration for EOS
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "payment-processor-1");
// Settings implied by idempotence (conflicting explicit values are rejected with a ConfigException):
// acks = "all"
// retries > 0 (defaults to Integer.MAX_VALUE)
// max.in.flight.requests.per.connection <= 5
// Consumer configuration for EOS
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
Scope Limitations (Critical Understanding):
public class EOSScopeLimitations {
void understandLimitations() {
// ✓ EOS COVERS:
// - Kafka-to-Kafka processing within single transaction
// - Producer idempotency across retries
// - Consumer reading only committed data
// - Atomic writes across multiple partitions
// ✕ EOS DOES NOT COVER:
// - External system interactions (databases, APIs)
// - Network failures outside Kafka
// - Application-level processing failures
// - Cross-cluster operations
// - Consumer-side processing failures
// Example limitation:
processRecord(record); // ← Side effects of application logic are NOT covered (may re-run after a failure)
database.save(result); // ← External database write is NOT covered
producer.send(outputRecord); // ← Only this write is covered by the Kafka transaction
}
}
Transaction Coordinator Architecture
Coordinator Architecture and State Management
The transaction coordinator manages distributed transaction state using a specialized internal topic:
TRANSACTION COORDINATOR ARCHITECTURE:
__transaction_state Topic (50 partitions by default)
┌─────────────────────────────────────────────────────────────┐
│ Partition 0: [TxnId-A:ONGOING] [TxnId-B:COMMITTED] ... │
│ Partition 1: [TxnId-C:ABORTED] [TxnId-D:ONGOING] ... │
│ ... │
│ Partition 49: [TxnId-Y:COMMITTED] [TxnId-Z:ONGOING] ... │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ TRANSACTION COORDINATOR │
├─────────────────────────────────────────────────────────────┤
│ • Maps transactional.id → Coordinator (hash-based) │
│ • Maintains transaction state (ONGOING/COMMITTED/ABORTED) │
│ • Handles 2PC protocol with partition leaders │
│ • Manages producer epochs for zombie fencing │
│ • Coordinates transaction markers across partitions │
└─────────────────────────────────────────────────────────────┘
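The hash-based mapping in the first bullet can be sketched as follows. This assumes the coordinator partition is the non-negative `String.hashCode()` of the `transactional.id` modulo the partition count of `__transaction_state`, and that the leader of that partition acts as coordinator; `CoordinatorPartitioner` is a hypothetical helper name, and the exact broker-side formula should be checked against the Kafka version in use.

```java
// Hypothetical helper; models how a transactional.id is assigned to a __transaction_state partition.
class CoordinatorPartitioner {
    static int partitionFor(String transactionalId, int partitionCount) {
        int hash = transactionalId.hashCode();
        // Math.abs(Integer.MIN_VALUE) is still negative, so map that one value to 0.
        int nonNegative = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
        return nonNegative % partitionCount;
    }

    public static void main(String[] args) {
        int partition = partitionFor("payment-processor-1", 50);
        System.out.println("payment-processor-1 -> __transaction_state-" + partition);
    }
}
```

Because the mapping is deterministic, every instance that uses the same `transactional.id` reaches the same coordinator, which is what makes epoch-based fencing possible.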
Two-Phase Commit Protocol Implementation
Phase 1: Prepare (Transaction Log Updates)
public class TransactionCoordinatorFlow {
void demonstrateTwoPhaseCommit() {
// PHASE 1: PREPARE
// 1. Producer sends EndTransaction(COMMIT) to coordinator
// 2. Coordinator validates transaction state
// 3. Coordinator writes PREPARE_COMMIT to __transaction_state
// 4. Coordinator identifies all partitions in transaction
TransactionState txnState = TransactionState.builder()
.transactionalId("payment-processor-1")
.producerId(12345L)
.epoch(5)
.status(TransactionStatus.PREPARE_COMMIT)
.partitions(Set.of(
new TopicPartition("payments", 0),
new TopicPartition("orders", 2),
new TopicPartition("audit", 1)
))
.build();
// PHASE 2: COMMIT (Marker Distribution)
// 5. Coordinator acknowledges the producer; once PREPARE_COMMIT is durable the commit is decided
// 6. Coordinator sends COMMIT markers to all partition leaders
// 7. Partition leaders write markers to their logs and acknowledge
// 8. Coordinator writes COMPLETE_COMMIT to __transaction_state
// 9. Records become visible to read_committed consumers as each partition's marker is written
}
}
Transaction Marker Flow:
TRANSACTION MARKER DISTRIBUTION:
Coordinator Partition Leaders
│ │
│──── COMMIT_MARKER ──────────▶ payments-0
│──── COMMIT_MARKER ──────────▶ orders-2
│──── COMMIT_MARKER ──────────▶ audit-1
│ │
│◄─── MARKER_ACK ──────────────│ payments-0
│◄─── MARKER_ACK ──────────────│ orders-2
│◄─── MARKER_ACK ──────────────│ audit-1
│ │
│ [Write COMPLETE_COMMIT] │
Transaction Markers in Log:
[UserRecord][UserRecord][COMMIT_MARKER][UserRecord][UserRecord]
                               │
                               └── Invisible to consumers
                                   Marks transaction boundary
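The coordinator's side of the protocol can be modeled as a small state machine. The sketch below is illustrative only: `TxnCoordinatorModel` is a hypothetical class, the state names follow the ones used in this article, and persistence to `__transaction_state` is left out.

```java
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical model of the coordinator's per-transaction state machine; not a Kafka class.
class TxnCoordinatorModel {
    enum State { EMPTY, ONGOING, PREPARE_COMMIT, PREPARE_ABORT, COMPLETE_COMMIT, COMPLETE_ABORT }

    private State state = State.EMPTY;
    private final Set<String> partitions = new LinkedHashSet<>();
    private final Set<String> pendingMarkers = new HashSet<>();

    void addPartition(String topicPartition) {
        if (state != State.EMPTY && state != State.ONGOING) {
            throw new IllegalStateException("Cannot add partitions in state " + state);
        }
        partitions.add(topicPartition);
        state = State.ONGOING;
    }

    // Phase 1: record the decision. After this call the outcome can no longer change.
    void endTransaction(boolean commit) {
        if (state != State.ONGOING) {
            throw new IllegalStateException("No ongoing transaction in state " + state);
        }
        state = commit ? State.PREPARE_COMMIT : State.PREPARE_ABORT;
        pendingMarkers.addAll(partitions);
    }

    // Phase 2: called once for each partition leader that has written its marker.
    void markerAcked(String topicPartition) {
        if (state != State.PREPARE_COMMIT && state != State.PREPARE_ABORT) {
            throw new IllegalStateException("No markers outstanding in state " + state);
        }
        pendingMarkers.remove(topicPartition);
        if (pendingMarkers.isEmpty()) {
            state = (state == State.PREPARE_COMMIT) ? State.COMPLETE_COMMIT : State.COMPLETE_ABORT;
            partitions.clear();
        }
    }

    State state() {
        return state;
    }
}

class TxnCoordinatorModelDemo {
    public static void main(String[] args) {
        TxnCoordinatorModel txn = new TxnCoordinatorModel();
        txn.addPartition("payments-0");
        txn.addPartition("orders-2");
        txn.endTransaction(true);
        System.out.println(txn.state()); // PREPARE_COMMIT
        txn.markerAcked("payments-0");
        System.out.println(txn.state()); // PREPARE_COMMIT (one marker still outstanding)
        txn.markerAcked("orders-2");
        System.out.println(txn.state()); // COMPLETE_COMMIT
    }
}
```

The model makes the asymmetry between the phases visible: the decision is taken in a single step, while marker distribution can take as long as the slowest partition leader.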
Producer Epoch and Zombie Fencing
Zombie Producer Prevention:
public class ZombieFencingMechanism {
void demonstrateEpochFencing() {
// Scenario: Network partition causes "zombie" producer
// Original producer (becomes zombie due to network partition)
ProducerSession originalProducer = ProducerSession.builder()
.transactionalId("payment-processor-1")
.producerId(12345L)
.epoch(5) // ← Original epoch
.build();
// New producer instance (after timeout/recovery)
ProducerSession newProducer = ProducerSession.builder()
.transactionalId("payment-processor-1") // ← Same transactional ID
.producerId(12345L) // ← Same producer ID
.epoch(6) // ← Incremented epoch
.build();
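// When zombie tries to write:
// 1. Zombie sends request with epoch=5
// 2. Broker sees current epoch=6 > zombie epoch=5
// 3. Broker rejects the request with a fencing error (INVALID_PRODUCER_EPOCH or PRODUCER_FENCED, depending on version)
// 4. Zombie is permanently fenced out: its client ends up with a ProducerFencedException and must be closed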
// Benefits:
// - Prevents duplicate writes from zombie producers
// - Maintains exactly-once guarantees during failures
// - Automatic without manual intervention
}
}
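The epoch comparison at the heart of fencing fits in a few lines. This is a simplified sketch under the assumption that the broker keeps one current epoch per `transactional.id`; `EpochFencingModel` and `ProducerFenced` are hypothetical names, not Kafka classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of broker-side epoch tracking; not a Kafka class.
class EpochFencingModel {
    static class ProducerFenced extends RuntimeException {
        ProducerFenced(String message) {
            super(message);
        }
    }

    private final Map<String, Integer> currentEpoch = new HashMap<>();

    // Models initTransactions(): each call for the same transactional.id bumps the epoch.
    int initTransactions(String transactionalId) {
        int next = currentEpoch.getOrDefault(transactionalId, -1) + 1;
        currentEpoch.put(transactionalId, next);
        return next;
    }

    // Models a write: requests carrying an older epoch are rejected.
    void write(String transactionalId, int epoch) {
        int current = currentEpoch.getOrDefault(transactionalId, -1);
        if (epoch < current) {
            throw new ProducerFenced("epoch " + epoch + " is older than current epoch " + current);
        }
    }
}

class EpochFencingModelDemo {
    public static void main(String[] args) {
        EpochFencingModel broker = new EpochFencingModel();
        int zombieEpoch = broker.initTransactions("payment-processor-1");
        int newEpoch = broker.initTransactions("payment-processor-1"); // replacement instance starts
        broker.write("payment-processor-1", newEpoch);                 // accepted
        try {
            broker.write("payment-processor-1", zombieEpoch);          // rejected
        } catch (EpochFencingModel.ProducerFenced e) {
            System.out.println("Zombie fenced: " + e.getMessage());
        }
    }
}
```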
Producer-Consumer Transaction Patterns
Transactional Producer Implementation
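import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalProducerPattern {
    private final KafkaProducer<String, String> producer;

    public TransactionalProducerPattern(String transactionalId) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        this.producer = new KafkaProducer<>(props);
        this.producer.initTransactions(); // Registers with the coordinator and fences older instances
    }

    public void sendTransactionally(List<ProducerRecord<String, String>> records) {
        try {
            producer.beginTransaction();
            // Send all records as part of a single transaction; do not block on individual sends
            records.forEach(producer::send);
            // commitTransaction() flushes pending sends, then commits: all records become visible together
            producer.commitTransaction();
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // Fatal errors: this producer instance cannot continue and can only be closed
            producer.close();
            throw e;
        } catch (KafkaException e) {
            // Abortable errors: no records from this transaction become visible
            producer.abortTransaction();
            throw e;
        }
    }

    // Advanced: transaction with offset commits (consume-transform-produce).
    // Pass consumer.groupMetadata() so that stale consumer group members can be fenced.
    public void processTransactionally(
            Map<TopicPartition, OffsetAndMetadata> offsets,
            List<ProducerRecord<String, String>> outputRecords,
            ConsumerGroupMetadata groupMetadata) {
        try {
            producer.beginTransaction();
            // Send output records
            outputRecords.forEach(producer::send);
            // Commit consumer offsets as part of the transaction
            producer.sendOffsetsToTransaction(offsets, groupMetadata);
            // Atomic commit: both outputs and offsets committed together
            producer.commitTransaction();
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            producer.close();
            throw e;
        } catch (KafkaException e) {
            producer.abortTransaction();
            throw e;
        }
    }
}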
Read-Committed Consumer Pattern
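import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReadCommittedConsumerPattern {
    private final KafkaConsumer<String, String> consumer;

    public ReadCommittedConsumerPattern(String groupId) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        this.consumer = new KafkaConsumer<>(props);
    }

    public void processWithReadCommitted() {
        consumer.subscribe(Arrays.asList("input-topic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            // Only non-transactional records and records from committed transactions are returned,
            // and only up to the last stable offset. Transaction markers are control records that
            // the consumer filters out; they never reach application code.
            for (ConsumerRecord<String, String> record : records) {
                processRecord(record);
            }
            // Committing after processing gives at-least-once delivery on the consumer side.
            // For exactly-once, commit offsets through producer.sendOffsetsToTransaction() instead.
            consumer.commitSync();
        }
    }

    private void processRecord(ConsumerRecord<String, String> record) {
        // Business logic here
        // Guaranteed: this record is not part of an open or aborted transaction
        System.out.printf("Processing committed record: %s%n", record.value());
    }
}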
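What `read_committed` hides can be shown with a small model of a partition log. This is a sketch rather than the consumer's actual implementation: `ReadCommittedModel` and `Entry` are hypothetical names, offsets are list indexes, and the last stable offset (LSO) is taken to be the first offset of the earliest transaction that is still open.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of read_committed filtering over one partition; not a Kafka class.
class ReadCommittedModel {
    enum Kind { DATA, COMMIT, ABORT }

    static final class Entry {
        final Kind kind;
        final long producerId;
        final String value;

        Entry(Kind kind, long producerId, String value) {
            this.kind = kind;
            this.producerId = producerId;
            this.value = value;
        }
    }

    static List<String> readCommitted(List<Entry> log) {
        Map<Long, List<Integer>> open = new HashMap<>(); // data offsets of each open transaction
        Set<Integer> committed = new HashSet<>();
        for (int offset = 0; offset < log.size(); offset++) {
            Entry e = log.get(offset);
            if (e.kind == Kind.DATA) {
                open.computeIfAbsent(e.producerId, k -> new ArrayList<>()).add(offset);
            } else {
                List<Integer> finished = open.remove(e.producerId);
                if (e.kind == Kind.COMMIT && finished != null) {
                    committed.addAll(finished);
                }
            }
        }
        // Nothing at or beyond the first offset of an open transaction is returned.
        int lastStableOffset = log.size();
        for (List<Integer> offsets : open.values()) {
            lastStableOffset = Math.min(lastStableOffset, offsets.get(0));
        }
        List<String> visible = new ArrayList<>();
        for (int offset = 0; offset < lastStableOffset; offset++) {
            if (committed.contains(offset)) {
                visible.add(log.get(offset).value);
            }
        }
        return visible;
    }

    public static void main(String[] args) {
        List<Entry> log = List.of(
            new Entry(Kind.DATA, 1L, "a1"),
            new Entry(Kind.DATA, 2L, "b1"),
            new Entry(Kind.DATA, 1L, "a2"),
            new Entry(Kind.COMMIT, 1L, null),
            new Entry(Kind.ABORT, 2L, null),
            new Entry(Kind.DATA, 3L, "c1"),   // transaction still open
            new Entry(Kind.DATA, 1L, "a3"),
            new Entry(Kind.COMMIT, 1L, null));
        System.out.println(readCommitted(log)); // [a1, a2]
    }
}
```

In the demo, `a3` is committed but sits behind the open transaction of producer 3, so it stays invisible until that transaction ends. A long-running transaction therefore delays every `read_committed` consumer of the partition.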
Consistency vs Performance Tradeoffs
Performance Impact Analysis
Transactional vs Non-Transactional Performance:
public class PerformanceTradeoffAnalysis {
void measureTransactionalOverhead() {
// Illustrative figures (actual overhead depends on transaction size, commit frequency, and hardware):
BenchmarkResults nonTransactional = BenchmarkResults.builder()
.throughput("500k messages/sec")
.latencyP99("2ms")
.cpuOverhead("baseline")
.networkOverhead("baseline")
.build();
BenchmarkResults transactional = BenchmarkResults.builder()
.throughput("300k messages/sec") // ~40% reduction
.latencyP99("8ms") // ~4x increase
.cpuOverhead("+25%") // Additional coordination
.networkOverhead("+15%") // Transaction markers
.build();
// Overhead sources:
// 1. Two-phase commit protocol coordination
// 2. Transaction state management
// 3. Producer ID and epoch tracking
// 4. Additional network round trips
// 5. Transaction marker writes
}
void optimizeTransactionalPerformance() {
Properties optimizedProps = new Properties();
// Batch more aggressively for transactions
optimizedProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536); // 64KB
optimizedProps.put(ProducerConfig.LINGER_MS_CONFIG, 50); // 50ms
// Buffer larger transactions
optimizedProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 134217728); // 128MB
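// Keep pipelining enabled: idempotence preserves ordering with up to 5 in-flight requests,
// so lowering this to 1 only reduces throughput
optimizedProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
// The largest lever is transaction size: more records per commit amortize the fixed commit cost
// Result: ~60% of non-transactional performance achievable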
}
}
Decision Framework for EOS Adoption
public class EOSDecisionFramework {
public boolean shouldUseExactlyOnce(SystemRequirements requirements) {
// STRONG YES - Use EOS:
if (requirements.isMissionCritical() &&
requirements.requiresZeroDuplicates() &&
requirements.canAcceptPerformanceReduction(40)) {
return true;
}
// STRONG NO - Don't use EOS:
if (requirements.isHighThroughputSystem() &&
requirements.canTolerateRareDuplicates() &&
requirements.requiresSubMillisecondLatency()) {
return false;
}
// CONDITIONAL - Consider alternatives:
return evaluateAlternatives(requirements);
}
private boolean evaluateAlternatives(SystemRequirements requirements) {
// Alternative 1: Idempotent processing at application level
if (canImplementIdempotency(requirements)) {
return false; // Use idempotent processors instead
}
// Alternative 2: At-least-once with deduplication
if (canImplementDeduplication(requirements)) {
return false; // Use dedup storage/cache
}
// Alternative 3: Accept duplicates with monitoring
if (requirements.getDuplicateImpact() == Impact.LOW) {
return false; // Monitor and accept rare duplicates
}
return true; // EOS is the right choice
}
}
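Alternative 2 (at-least-once with deduplication) can be as small as a bounded set of recently seen event IDs. The sketch below assumes every event carries a unique ID and that duplicates arrive close together in time; `EventDeduplicator` is a hypothetical name, and a production version would usually back this with durable storage.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory deduplicator with least-recently-used eviction.
class EventDeduplicator {
    private final Map<String, Boolean> seen;

    EventDeduplicator(final int capacity) {
        // Access-ordered LinkedHashMap that drops the least recently used ID once full.
        this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    // Returns true the first time an ID is seen, false while the ID is still remembered.
    boolean firstTime(String eventId) {
        return seen.put(eventId, Boolean.TRUE) == null;
    }
}

class EventDeduplicatorDemo {
    public static void main(String[] args) {
        EventDeduplicator dedup = new EventDeduplicator(10_000);
        for (String eventId : new String[] {"order-1", "order-2", "order-1"}) {
            if (dedup.firstTime(eventId)) {
                System.out.println("processing " + eventId);
            } else {
                System.out.println("skipping duplicate " + eventId);
            }
        }
    }
}
```

The capacity bounds memory but also bounds the guarantee: a duplicate that arrives after its ID has been evicted is processed again, which is the tradeoff to weigh against full EOS.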
Failure Modes and Recovery Mechanisms
Transaction Coordinator Failure Scenarios
Coordinator Failover Process:
public class CoordinatorFailureRecovery {
void demonstrateCoordinatorFailover() {
// Failure scenario: Transaction coordinator broker fails
// Before failure:
TransactionState beforeFailure = TransactionState.builder()
.coordinator("broker-2")
.transactionalId("payment-processor-1")
.status(TransactionStatus.ONGOING)
.epoch(5)
.partitions(Set.of(
new TopicPartition("payments", 0),
new TopicPartition("orders", 1)
))
.build();
// After coordinator failure and recovery:
// 1. Leadership of the __transaction_state partition moves to another broker, which becomes the coordinator
// 2. The new coordinator rebuilds transaction state by replaying that partition's log
// 3. Transactions in PREPARE_COMMIT or PREPARE_ABORT are completed by re-sending markers;
// ONGOING transactions stay open and are aborted only if transaction.timeout.ms expires
// 4. Producers rediscover the coordinator and continue with the same producer ID and epoch
TransactionState afterRecovery = TransactionState.builder()
.coordinator("broker-3") // New coordinator
.transactionalId("payment-processor-1")
.status(TransactionStatus.ONGOING) // State recovered from the log
.epoch(5) // Unchanged: failover alone does not bump the epoch
.build();
}
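void handleProducerRecovery() {
// A coordinator failover is retried inside the client and needs no special handling.
// ProducerFencedException is different: it means another instance with the same
// transactional.id has called initTransactions() and taken over.
try {
producer.beginTransaction();
// ... transaction operations
producer.commitTransaction();
} catch (ProducerFencedException e) {
// Fatal for this instance: it can only be closed
producer.close();
// Recreate the producer only if this instance should still own the transactional ID;
// doing so fences the instance that replaced it
producer = createNewProducer(SAME_TRANSACTIONAL_ID);
producer.initTransactions(); // Gets new epoch
// Retry transaction with new producer instance
}
}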
}
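Recovery from the transaction log amounts to replaying a partition of `__transaction_state` and keeping the last state written for each `transactional.id`. The sketch below illustrates that idea with hypothetical names (`CoordinatorRecoveryModel`, `LogEntry`); it assumes a transaction whose outcome was already recorded is rolled forward to completion, while a transaction that is merely ongoing is left open for its producer or the transaction timeout to resolve.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of coordinator recovery by log replay; not a Kafka class.
class CoordinatorRecoveryModel {
    enum State { ONGOING, PREPARE_COMMIT, PREPARE_ABORT, COMPLETE_COMMIT, COMPLETE_ABORT }

    static final class LogEntry {
        final String transactionalId;
        final State state;

        LogEntry(String transactionalId, State state) {
            this.transactionalId = transactionalId;
            this.state = state;
        }
    }

    // Replay in log order: the last entry for each transactional.id wins.
    static Map<String, State> replay(List<LogEntry> log) {
        Map<String, State> latest = new LinkedHashMap<>();
        for (LogEntry entry : log) {
            latest.put(entry.transactionalId, entry.state);
        }
        return latest;
    }

    // What the new coordinator does with a recovered state.
    static State resume(State recovered) {
        switch (recovered) {
            case PREPARE_COMMIT:
                return State.COMPLETE_COMMIT; // decision already durable: re-send COMMIT markers
            case PREPARE_ABORT:
                return State.COMPLETE_ABORT;  // decision already durable: re-send ABORT markers
            default:
                return recovered;             // ONGOING stays open; completed states are final
        }
    }

    public static void main(String[] args) {
        List<LogEntry> log = List.of(
            new LogEntry("txn-a", State.ONGOING),
            new LogEntry("txn-b", State.ONGOING),
            new LogEntry("txn-a", State.PREPARE_COMMIT));
        replay(log).forEach((id, state) ->
            System.out.println(id + ": recovered " + state + ", resumes as " + resume(state)));
    }
}
```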
Network Partition Handling
public class NetworkPartitionRecovery {
void demonstratePartitionRecovery() {
// Scenario: Producer partitioned from coordinator
// During partition:
// 1. Producer cannot commit transactions
// 2. Open transactions that exceed transaction.timeout.ms expire
// 3. Coordinator aborts expired transactions
// 4. Producer accumulates records in buffer
try {
producer.beginTransaction();
// These records buffered locally during partition
producer.send(new ProducerRecord<>("topic", "message1"));
producer.send(new ProducerRecord<>("topic", "message2"));
// Blocks for up to max.block.ms while the coordinator is unreachable
producer.commitTransaction(); // May throw org.apache.kafka.common.errors.TimeoutException
} catch (TimeoutException e) {
// The commit may already be in progress, so abortTransaction() is not allowed here.
// The only safe options are to retry commitTransaction() or to close the producer.
retryWithBackoff(producer::commitTransaction);
}
}
}
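The `retryWithBackoff` helper used in this example is not defined in the article. One possible shape, sketched in plain Java, is shown below; the class name `Backoff` and the extra parameters `maxAttempts`, `baseMs`, and `maxMs` are assumptions made for illustration.

```java
// Hypothetical retry helper with capped exponential backoff.
class Backoff {
    // Delay before the next attempt: baseMs * 2^attempt, capped at maxMs.
    static long delayMs(int attempt, long baseMs, long maxMs) {
        long factor = 1L << Math.min(attempt, 30); // cap the shift to avoid overflow
        return Math.min(maxMs, baseMs * factor);
    }

    static void retryWithBackoff(Runnable action, int maxAttempts, long baseMs, long maxMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                action.run();
                return; // success
            } catch (RuntimeException e) {
                last = e;
                if (attempt + 1 < maxAttempts) {
                    try {
                        Thread.sleep(delayMs(attempt, baseMs, maxMs));
                    } catch (InterruptedException interrupted) {
                        Thread.currentThread().interrupt(); // preserve the interrupt for callers
                        throw new RuntimeException("Interrupted while backing off", interrupted);
                    }
                }
            }
        }
        throw last; // all attempts failed: surface the last error
    }

    public static void main(String[] args) {
        int[] calls = {0};
        retryWithBackoff(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("coordinator unreachable");
            }
        }, 5, 10, 100);
        System.out.println("succeeded after " + calls[0] + " attempts"); // succeeded after 3 attempts
    }
}
```

This sketch retries on any `RuntimeException`. With a real transactional producer the retry should be limited to errors that are safe to retry, since fatal errors such as a fenced producer will fail every attempt.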
Stream Processing Integration
Kafka Streams Exactly-Once Integration
public class StreamsExactlyOnceExample {
public void configureExactlyOnceStreams() {
Properties streamsConfig = new Properties();
// Enable exactly-once processing
streamsConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2); // Latest version
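// Required for any Streams application
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "payment-processor");
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Default serdes for the String-typed topology below
streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// Tune for throughput: a longer commit interval means fewer, larger transactions, but
// read_committed consumers downstream see output only after each commit (EOS default: 100 ms)
streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000); // 10s commits
streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024); // 10MB cache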
StreamsBuilder builder = new StreamsBuilder();
// Exactly-once stream processing topology
KStream<String, String> payments = builder.stream("payments");
payments
.mapValues(this::processPayment) // Transform
.filter((key, value) -> isValid(value)) // Filter
.to("processed-payments"); // Output - exactly-once guaranteed
KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig);
streams.start();
}
private String processPayment(String payment) {
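// The effects on Kafka (output records, state stores, offsets) are exactly-once per input record.
// The method itself may run again after a failure, so keep it free of external side effects.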
return enhancePaymentData(payment);
}
}
Multi-System Transaction Patterns
Outbox Pattern with Kafka Transactions
public class OutboxPatternWithTransactions {
@Transactional
public void processOrderWithOutbox(OrderRequest request) {
// Phase 1: Database transaction with outbox
// Save business entity
Order order = orderService.createOrder(request);
// Save outbox event in SAME database transaction
OutboxEvent outboxEvent = OutboxEvent.builder()
.aggregateId(order.getId())
.eventType("OrderCreated")
.payload(order.toJson())
.status(OutboxStatus.PENDING)
.build();
outboxRepository.save(outboxEvent);
// Database transaction commits - order and outbox event durable
}
@Component
public class OutboxPublisher {
private final KafkaProducer<String, String> transactionalProducer;
@Scheduled(fixedDelay = 1000) // Every second
public void publishOutboxEvents() {
List<OutboxEvent> pendingEvents = outboxRepository.findPendingEvents();
if (!pendingEvents.isEmpty()) {
transactionalProducer.beginTransaction();
try {
// Publish events to Kafka
for (OutboxEvent event : pendingEvents) {
ProducerRecord<String, String> record = new ProducerRecord<>(
"order-events",
event.getAggregateId(),
event.getPayload()
);
transactionalProducer.send(record);
}
// Commit the Kafka transaction first
transactionalProducer.commitTransaction();
} catch (Exception e) {
transactionalProducer.abortTransaction();
return; // Events remain pending for retry
}
// Mark events as published only after the Kafka commit succeeded. If this step fails,
// the events are re-sent on the next run: delivery is at-least-once, so consumers
// should deduplicate by event ID
outboxRepository.markAsPublished(pendingEvents);
}
}
}
}
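The Kafka transaction and the database update in the publisher are two separate commits, so the order in which they happen decides what a crash between them costs. The sketch below is a toy model with hypothetical names (`OutboxOrderingModel`, `publish`) that simulates a crash between the two steps followed by a second publisher run.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of one outbox row and the topic it is published to.
class OutboxOrderingModel {
    boolean pending = true;                       // outbox row status in the database
    final List<String> topic = new ArrayList<>(); // records committed to Kafka

    // One publisher pass. With crashBetween = true the process dies after the first step.
    void publish(boolean kafkaFirst, boolean crashBetween) {
        if (!pending) {
            return; // nothing left to publish
        }
        if (kafkaFirst) {
            topic.add("OrderCreated");  // step 1: Kafka transaction commits
            if (crashBetween) return;
            pending = false;            // step 2: row marked as published
        } else {
            pending = false;            // step 1: row marked as published
            if (crashBetween) return;
            topic.add("OrderCreated");  // step 2: Kafka transaction commits
        }
    }

    public static void main(String[] args) {
        OutboxOrderingModel kafkaFirst = new OutboxOrderingModel();
        kafkaFirst.publish(true, true);   // crash after the Kafka commit
        kafkaFirst.publish(true, false);  // next scheduled run
        System.out.println("Kafka first: " + kafkaFirst.topic.size() + " copies"); // 2 copies (duplicate)

        OutboxOrderingModel dbFirst = new OutboxOrderingModel();
        dbFirst.publish(false, true);     // crash after marking the row
        dbFirst.publish(false, false);    // next scheduled run finds nothing pending
        System.out.println("DB first: " + dbFirst.topic.size() + " copies");       // 0 copies (event lost)
    }
}
```

Committing to Kafka first turns the failure into a duplicate, which consumers can filter out by event ID. Marking the row first turns it into a lost event, which cannot be repaired downstream.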
Alternative Approaches Comparison
EOS vs Other Consistency Models
public class ConsistencyModelComparison {
void compareApproaches() {
// Kafka EOS (Two-Phase Commit)
ConsistencyApproach kafkaEOS = ConsistencyApproach.builder()
.guarantees("Exactly-once within Kafka scope")
.performance("300k msgs/sec, 8ms p99 latency")
.complexity("High - requires transaction coordination")
.scope("Kafka producers/consumers only")
.failureHandling("Automatic with coordinator failover")
.build();
// Saga Pattern (Choreography)
ConsistencyApproach sagaChoreography = ConsistencyApproach.builder()
.guarantees("Eventual consistency with compensation")
.performance("500k msgs/sec, 3ms p99 latency")
.complexity("Medium - distributed state machines")
.scope("Cross-service, cross-system")
.failureHandling("Manual compensation logic required")
.build();
// Event Sourcing + CQRS
ConsistencyApproach eventSourcing = ConsistencyApproach.builder()
.guarantees("Strong consistency within aggregate")
.performance("Variable - depends on projection complexity")
.complexity("High - requires event design and projections")
.scope("Application-level across read/write models")
.failureHandling("Replay capability, temporal queries")
.build();
// Application-Level Idempotency
ConsistencyApproach applicationIdempotency = ConsistencyApproach.builder()
.guarantees("Idempotent processing with duplicate detection")
.performance("450k msgs/sec, 4ms p99 latency")
.complexity("Medium - requires deduplication logic")
.scope("Application-specific")
.failureHandling("Custom retry and dedup logic")
.build();
}
}
This interview-focused knowledge chunk covers Kafka transactions and exactly-once semantics at the depth expected in L8_PRINCIPAL level discussions. It emphasizes theoretical foundations, architectural decision-making, and system design tradeoffs, alongside practical implementation patterns for complex distributed systems.
See Also: [[Event_Sourcing_CQRS]], [[Microservices_Saga_Patterns]], [[Stream_Processing_Frameworks]], [[Database_Consistency_Models]]