Kafka Series

Transactions and Exactly-Once Semantics

Deep dive into Kafka transactions, idempotent producers, exactly-once semantics, and atomic read-process-write patterns

Exactly-Once Semantics Foundations

Understanding Exactly-Once Guarantees

Exactly-once semantics (EOS) in Kafka combines idempotent producers with transactions: retries do not create duplicates, and writes to multiple partitions, together with consumer offset commits, become visible atomically or not at all:

EXACTLY-ONCE SEMANTIC BOUNDARIES:

┌─────────────────────────────────────────────────────────────────────┐
│                      KAFKA EOS GUARANTEE SCOPE                      │
├─────────────────────────────────────────────────────────────────────┤
│ Producer ──▶ [Transaction] ──▶ Kafka ──▶ [Transaction] ──▶ Consumer │
│                    │                         │                      │
│              ┌─────────────┐          ┌─────────────┐               │
│              │• Idempotent │          │• Read       │               │
│              │  writes     │          │  committed  │               │
│              │• Atomic     │          │• Isolation  │               │
│              │  batches    │          │  levels     │               │
│              └─────────────┘          └─────────────┘               │
└─────────────────────────────────────────────────────────────────────┘

EOS GUARANTEES:
├── Each message delivered exactly once within transaction scope
├── No duplicates due to retries or failures
├── Atomic commit/abort across multiple partitions/topics
├── Read isolation prevents consuming uncommitted data
└── Zombie producer fencing via epochs and producer IDs
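
The "no duplicates due to retries" guarantee rests on the broker remembering the last sequence number it accepted from each producer. The following is a minimal in-memory sketch of that idea, not broker code: `PartitionLogModel` and its methods are hypothetical names, and the real broker tracks sequences per batch and per partition with more bookkeeping than shown here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of one partition log that deduplicates by (producerId, sequence).
// PartitionLogModel is a hypothetical name, not a Kafka class.
final class PartitionLogModel {
    private final List<String> log = new ArrayList<>();
    // Last sequence number appended for each producer ID.
    private final Map<Long, Integer> lastSequence = new HashMap<>();

    /** Returns true if the record was appended, false if it was a duplicate retry. */
    boolean append(long producerId, int sequence, String value) {
        int last = lastSequence.getOrDefault(producerId, -1);
        if (sequence <= last) {
            return false; // Already written: acknowledge without appending again.
        }
        if (sequence != last + 1) {
            // A gap means an earlier record never arrived; the real broker
            // reports an out-of-order sequence error in this situation.
            throw new IllegalStateException("Out-of-order sequence: " + sequence);
        }
        log.add(value);
        lastSequence.put(producerId, sequence);
        return true;
    }

    /** Snapshot of the records currently in the log. */
    List<String> records() {
        return List.copyOf(log);
    }
}
```

A retried send carries the same sequence number as the original attempt, so the second `append` call is acknowledged but leaves the log unchanged.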

System Requirements and Limitations

Configuration Requirements:

// Producer configuration for EOS
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "payment-processor-1");

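// Set automatically when transactional.id is configured; conflicting
// overrides are rejected with a ConfigException at producer creation:
// enable.idempotence = true
// acks = "all"
// retries > 0 (default Integer.MAX_VALUE)
// max.in.flight.requests.per.connection <= 5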

// Consumer configuration for EOS
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
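
The producer-side constraints can be checked before a client is ever constructed. The sketch below is a hypothetical pre-flight validator: the string keys are the documented Kafka configuration names, but `EosConfigCheck` itself is illustrative and assumes all values were stored with `setProperty` as strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical pre-flight check for the settings a transactional producer needs.
// Not part of the Kafka client; it assumes values are stored as strings.
final class EosConfigCheck {
    static List<String> violations(Properties p) {
        List<String> problems = new ArrayList<>();
        if (p.getProperty("transactional.id", "").isEmpty()) {
            problems.add("transactional.id must be set");
        }
        if (!"true".equals(p.getProperty("enable.idempotence", "true"))) {
            problems.add("enable.idempotence must be true");
        }
        String acks = p.getProperty("acks", "all");
        if (!acks.equals("all") && !acks.equals("-1")) {
            problems.add("acks must be all");
        }
        if (Integer.parseInt(p.getProperty("retries", "2147483647")) <= 0) {
            problems.add("retries must be greater than 0");
        }
        if (Integer.parseInt(p.getProperty("max.in.flight.requests.per.connection", "5")) > 5) {
            problems.add("max.in.flight.requests.per.connection must be at most 5");
        }
        return problems;
    }
}
```

An empty list means the properties are compatible with a transactional producer; unset keys fall back to the defaults that the client would apply.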

Scope Limitations (Critical Understanding):

public class EOSScopeLimitations {

    void understandLimitations() {
        // ✓ EOS COVERS:
        // - Kafka-to-Kafka processing within single transaction
        // - Producer idempotency across retries
        // - Consumer reading only committed data
        // - Atomic writes across multiple partitions

        // ✕ EOS DOES NOT COVER:
        // - External system interactions (databases, APIs)
        // - Network failures outside Kafka
        // - Application-level processing failures
        // - Cross-cluster operations
        // - Consumer-side processing failures

        // Example limitation (record, database and producer are placeholders):
        Result result = processRecord(record);  // ← May run again after a failure and retry
        database.save(result);          // ← External write is outside the Kafka transaction
        producer.send(outputRecord);    // ← Only this is covered by EOS
    }
}

Transaction Coordinator Architecture

Coordinator Architecture and State Management

The transaction coordinator manages distributed transaction state using a specialized internal topic:

TRANSACTION COORDINATOR ARCHITECTURE:

__transaction_state Topic (50 partitions by default)
┌─────────────────────────────────────────────────────────────┐
│ Partition 0: [TxnId-A:ONGOING] [TxnId-B:COMMITTED] ...     │
│ Partition 1: [TxnId-C:ABORTED] [TxnId-D:ONGOING] ...       │
│ ...                                                         │
│ Partition 49: [TxnId-Y:COMMITTED] [TxnId-Z:ONGOING] ...    │
└─────────────────────────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────┐
│              TRANSACTION COORDINATOR                        │
├─────────────────────────────────────────────────────────────┤
│ • Maps transactional.id → Coordinator (hash-based)         │
│ • Maintains transaction state (ONGOING/COMMITTED/ABORTED)   │
│ • Handles 2PC protocol with partition leaders              │
│ • Manages producer epochs for zombie fencing               │
│ • Coordinates transaction markers across partitions        │
└─────────────────────────────────────────────────────────────┘
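
The hash-based mapping from transactional.id to a coordinator can be sketched in a few lines. This assumes the broker hashes the ID with `String.hashCode()` and takes the result modulo the `__transaction_state` partition count; the leader of the resulting partition acts as coordinator. Treat the exact formula as an assumption to verify against your Kafka version; `CoordinatorLookup` is a hypothetical name.

```java
// Sketch of how a transactional.id could map to a __transaction_state partition.
// Assumption: hash of the ID modulo the partition count (50 by default).
final class CoordinatorLookup {
    static int partitionFor(String transactionalId, int numPartitions) {
        int hash = transactionalId.hashCode();
        // Integer.MIN_VALUE has no positive counterpart, so map it to 0.
        int positive = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
        return positive % numPartitions;
    }

    public static void main(String[] args) {
        int partition = partitionFor("payment-processor-1", 50);
        System.out.println("payment-processor-1 -> __transaction_state-" + partition);
    }
}
```

Because the mapping is deterministic, every instance that uses the same transactional.id reaches the same coordinator, which is what makes epoch-based fencing possible.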

Two-Phase Commit Protocol Implementation

Phase 1: Prepare (Transaction Log Updates)

public class TransactionCoordinatorFlow {

    void demonstrateTwoPhaseCommit() {
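        // PHASE 1: PREPARE
        // 1. Producer sends an EndTxn request (commit) to the coordinator
        // 2. Coordinator validates transaction state and producer epoch
        // 3. Coordinator writes PREPARE_COMMIT to __transaction_state
        //    (once this write is replicated, the commit decision is final)
        // 4. Coordinator identifies all partitions in transaction

        // TransactionState is an illustrative type, not a Kafka client class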
        TransactionState txnState = TransactionState.builder()
            .transactionalId("payment-processor-1")
            .producerId(12345L)
            .epoch(5)
            .status(TransactionStatus.PREPARE_COMMIT)
            .partitions(Set.of(
                new TopicPartition("payments", 0),
                new TopicPartition("orders", 2),
                new TopicPartition("audit", 1)
            ))
            .build();

        // PHASE 2: COMMIT (Marker Distribution)
        // 5. Coordinator sends COMMIT markers to all partition leaders
        // 6. Partition leaders write markers to their logs
        // 7. Partition leaders acknowledge marker writes
        // 8. Coordinator writes COMPLETE_COMMIT to __transaction_state
        // 9. Transaction is complete; the outcome was already decided in step 3
    }
}
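
The two phases above amount to a small state machine per transactional.id. The enum below is a simplified sketch of the legal transitions, using the state names from this article; the real coordinator has further states (for example for expired IDs) that are left out, and `TxnState` is a hypothetical name.

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Simplified model of the coordinator's per-transaction state machine.
// TxnState is a hypothetical type; some real broker states are omitted.
enum TxnState {
    EMPTY, ONGOING, PREPARE_COMMIT, PREPARE_ABORT, COMPLETE_COMMIT, COMPLETE_ABORT;

    private static final Map<TxnState, Set<TxnState>> ALLOWED = Map.of(
        EMPTY, EnumSet.of(ONGOING),
        ONGOING, EnumSet.of(PREPARE_COMMIT, PREPARE_ABORT),
        PREPARE_COMMIT, EnumSet.of(COMPLETE_COMMIT),
        PREPARE_ABORT, EnumSet.of(COMPLETE_ABORT),
        COMPLETE_COMMIT, EnumSet.of(ONGOING),
        COMPLETE_ABORT, EnumSet.of(ONGOING));

    /** True if the coordinator may move from this state to the next one. */
    boolean canMoveTo(TxnState next) {
        return ALLOWED.get(this).contains(next);
    }

    /** Once a PREPARE_* state is logged, the outcome can no longer change. */
    boolean outcomeDecided() {
        return this != EMPTY && this != ONGOING;
    }
}
```

The key property is that `PREPARE_COMMIT` has exactly one successor: after the prepare record is written, a crash can delay the markers but cannot turn the commit into an abort.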

Transaction Marker Flow:

TRANSACTION MARKER DISTRIBUTION:

Coordinator                    Partition Leaders
     │                              │
     │──── COMMIT_MARKER ──────────▶ payments-0
     │──── COMMIT_MARKER ──────────▶ orders-2
     │──── COMMIT_MARKER ──────────▶ audit-1
     │                              │
     │◄─── MARKER_ACK ──────────────│ payments-0
     │◄─── MARKER_ACK ──────────────│ orders-2
     │◄─── MARKER_ACK ──────────────│ audit-1
     │                              │
     │ [Write COMPLETE_COMMIT]      │

Transaction Markers in Log:
[UserRecord][UserRecord][COMMIT_MARKER][UserRecord][UserRecord]
                           │
                           └── Invisible to consumers
                               Marks transaction boundary

Producer Epoch and Zombie Fencing

Zombie Producer Prevention:

public class ZombieFencingMechanism {

    void demonstrateEpochFencing() {
        // Scenario: Network partition causes "zombie" producer
        // (ProducerSession is an illustrative type, not a Kafka client class)

        // Original producer (becomes zombie due to network partition)
        ProducerSession originalProducer = ProducerSession.builder()
            .transactionalId("payment-processor-1")
            .producerId(12345L)
            .epoch(5)              // ← Original epoch
            .build();

        // New producer instance (after timeout/recovery)
        ProducerSession newProducer = ProducerSession.builder()
            .transactionalId("payment-processor-1") // ← Same transactional ID
            .producerId(12345L)     // ← Same producer ID
            .epoch(6)               // ← Incremented epoch
            .build();

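        // When zombie tries to write:
        // 1. Zombie sends request with epoch=5
        // 2. Broker sees current epoch=6 > zombie epoch=5
        // 3. Broker rejects the request with INVALID_PRODUCER_EPOCH (newer
        //    brokers may report PRODUCER_FENCED); the client surfaces this
        //    as ProducerFencedException
        // 4. Zombie producer is permanently fenced out and must be closed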

        // Benefits:
        // - Prevents duplicate writes from zombie producers
        // - Maintains exactly-once guarantees during failures
        // - Automatic without manual intervention
    }
}
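
The epoch comparison in the scenario above fits in a few lines. This is a toy model under stated assumptions: `FencingCoordinator` is a hypothetical name, `register` stands in for what `initTransactions()` triggers, and `acceptWrite` stands in for the broker-side epoch check.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of epoch-based fencing; not Kafka code.
final class FencingCoordinator {
    private final Map<String, Short> currentEpoch = new HashMap<>();

    /** Models initTransactions(): registers the ID and bumps its epoch. */
    short register(String transactionalId) {
        short next = (short) (currentEpoch.getOrDefault(transactionalId, (short) -1) + 1);
        currentEpoch.put(transactionalId, next);
        return next;
    }

    /** Models the broker-side check: only the latest epoch may write. */
    boolean acceptWrite(String transactionalId, short epoch) {
        Short current = currentEpoch.get(transactionalId);
        return current != null && epoch == current;
    }
}
```

Registering the same transactional.id a second time is what fences the first instance: its epoch is now stale, so every later write it attempts is rejected.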

Producer-Consumer Transaction Patterns

Transactional Producer Implementation

public class TransactionalProducerPattern {
    private final KafkaProducer<String, String> producer;

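    public TransactionalProducerPattern(String transactionalId) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // Implied by transactional.id

        this.producer = new KafkaProducer<>(props);
        this.producer.initTransactions(); // Registers the ID, bumps the epoch, fences older instances
    }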

    public void sendTransactionally(List<ProducerRecord<String, String>> records) {
        producer.beginTransaction();

        try {
            // Send all records as part of single transaction
            for (ProducerRecord<String, String> record : records) {
                producer.send(record); // Asynchronous; don't block on individual sends
            }

            // Atomic commit - flushes pending sends, then all records become
            // visible together (no separate flush() call is needed)
            producer.commitTransaction();

        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // Fatal: this producer instance cannot continue and must be closed
            producer.close();
            throw e;
        } catch (KafkaException e) {
            // Abortable: no records from this transaction become visible
            producer.abortTransaction();
            throw e;
        }
    }

    // Advanced: Transaction with offset commits (consume-transform-produce)
    public void processTransactionally(
            Map<TopicPartition, OffsetAndMetadata> offsets,
            List<ProducerRecord<String, String>> outputRecords,
            ConsumerGroupMetadata groupMetadata) {   // From consumer.groupMetadata()

        producer.beginTransaction();

        try {
            // Send output records
            outputRecords.forEach(producer::send);

            // Commit consumer offsets as part of transaction. Each offset must be
            // the position of the NEXT record to read (last processed offset + 1).
            producer.sendOffsetsToTransaction(offsets, groupMetadata);

            // Atomic commit: both outputs and offsets committed together
            producer.commitTransaction();

        } catch (ProducerFencedException e) {
            producer.close(); // Fatal: another instance owns this transactional.id
            throw e;
        } catch (KafkaException e) {
            producer.abortTransaction();
            throw e;
        }
    }
}
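
Why offsets belong inside the transaction is easiest to see in a toy model: outputs and the consumed position are staged together and become visible together, or are discarded together. `AtomicStep` below is a hypothetical in-memory stand-in, not a Kafka API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a consume-transform-produce step; not Kafka code.
final class AtomicStep {
    final List<String> committedOutputs = new ArrayList<>();
    final Map<String, Long> committedOffsets = new HashMap<>();

    private final List<String> pendingOutputs = new ArrayList<>();
    private final Map<String, Long> pendingOffsets = new HashMap<>();

    /** Stages an output record inside the open transaction. */
    void send(String output) {
        pendingOutputs.add(output);
    }

    /** Stages the next offset to read for an input partition. */
    void sendOffset(String partition, long nextOffset) {
        pendingOffsets.put(partition, nextOffset);
    }

    /** Makes staged outputs and offsets visible in one step. */
    void commit() {
        committedOutputs.addAll(pendingOutputs);
        committedOffsets.putAll(pendingOffsets);
        clearPending();
    }

    /** Discards everything staged since the last commit. */
    void abort() {
        clearPending();
    }

    private void clearPending() {
        pendingOutputs.clear();
        pendingOffsets.clear();
    }
}
```

After an abort the committed offset has not moved, so the same input is read again and reprocessed. No duplicate output appears downstream because the aborted output was never made visible.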

Read-Committed Consumer Pattern

public class ReadCommittedConsumerPattern {
    private final KafkaConsumer<String, String> consumer;

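    public ReadCommittedConsumerPattern(String groupId) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); // Default is read_uncommitted
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        this.consumer = new KafkaConsumer<>(props);
    }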

    public void processWithReadCommitted() {
        consumer.subscribe(Arrays.asList("input-topic"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

            // Only committed records returned - no uncommitted data
            for (ConsumerRecord<String, String> record : records) {

                // Process record (guaranteed to be from a committed transaction
                // or from a non-transactional producer)
                processRecord(record);

                // Transaction markers are control records: the consumer filters
                // them out, so they never appear here. They only show up as
                // gaps in the offset sequence.
            }

            // Manual offset commit after processing. On its own this gives
            // at-least-once processing: a crash before commitSync() replays the batch.
            consumer.commitSync();
        }
    }

    private void processRecord(ConsumerRecord<String, String> record) {
        // Business logic here
        // Guaranteed: This record is from a committed transaction
        System.out.printf("Processing committed record: %s%n", record.value());
    }
}
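
What read_committed hides can be modeled on a single partition. The sketch below assumes one transaction per producer ID to keep the logic short; `ReadCommittedView` and `Entry` are hypothetical names, and the real consumer relies on the broker's last stable offset and aborted-transaction metadata rather than scanning the log like this.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of read_committed filtering on one partition; not Kafka code.
final class ReadCommittedView {
    enum Kind { DATA, COMMIT, ABORT }

    static final class Entry {
        final long producerId;
        final Kind kind;
        final String value;

        Entry(long producerId, Kind kind, String value) {
            this.producerId = producerId;
            this.kind = kind;
            this.value = value;
        }
    }

    /** Returns the values a read_committed consumer would see, in log order. */
    static List<String> visible(List<Entry> log) {
        Set<Long> committed = new HashSet<>();
        Set<Long> resolved = new HashSet<>();
        for (Entry e : log) {
            if (e.kind != Kind.DATA) {
                resolved.add(e.producerId);
                if (e.kind == Kind.COMMIT) {
                    committed.add(e.producerId);
                }
            }
        }
        List<String> out = new ArrayList<>();
        for (Entry e : log) {
            if (e.kind != Kind.DATA) {
                continue; // Markers are never returned to the application.
            }
            if (!resolved.contains(e.producerId)) {
                break; // First record of an open transaction: nothing beyond it is readable yet.
            }
            if (committed.contains(e.producerId)) {
                out.add(e.value); // Records of aborted transactions are skipped.
            }
        }
        return out;
    }
}
```

One consequence worth noting: a single long-running open transaction holds back every later record on the partition for read_committed consumers, including records from transactions that have already committed.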

Consistency vs Performance Tradeoffs

Performance Impact Analysis

Transactional vs Non-Transactional Performance:

public class PerformanceTradeoffAnalysis {

    void measureTransactionalOverhead() {
        // Illustrative figures, not measurements; real overhead depends on
        // transaction size, message size, and commit frequency:

        BenchmarkResults nonTransactional = BenchmarkResults.builder()
            .throughput("500k messages/sec")
            .latencyP99("2ms")
            .cpuOverhead("baseline")
            .networkOverhead("baseline")
            .build();

        BenchmarkResults transactional = BenchmarkResults.builder()
            .throughput("300k messages/sec")      // ~40% reduction
            .latencyP99("8ms")                    // ~4x increase
            .cpuOverhead("+25%")                  // Additional coordination
            .networkOverhead("+15%")              // Transaction markers
            .build();

        // Overhead sources:
        // 1. Two-phase commit protocol coordination
        // 2. Transaction state management
        // 3. Producer ID and epoch tracking
        // 4. Additional network round trips
        // 5. Transaction marker writes
    }

    void optimizeTransactionalPerformance() {
        Properties optimizedProps = new Properties();

        // Batch more aggressively for transactions
        optimizedProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);      // 64KB
        optimizedProps.put(ProducerConfig.LINGER_MS_CONFIG, 50);          // 50ms

        // Buffer larger transactions
        optimizedProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 134217728); // 128MB

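        // Keep in-flight requests at the default of 5: idempotence preserves
        // ordering up to that limit, and lowering it to 1 only reduces throughput
        optimizedProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);

        // Result (illustrative): roughly 60% of non-transactional throughput;
        // larger transactions amortize the per-commit cost further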
    }
}
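
Most of the overhead sources listed above are paid once per transaction rather than once per message, so the number of messages per transaction dominates the result. The model below is a back-of-the-envelope sketch only: `TxnOverheadModel` is a hypothetical name, and the cost parameters are inputs you would have to measure, not Kafka constants.

```java
// Back-of-the-envelope model: a fixed per-commit cost amortized over a batch.
final class TxnOverheadModel {
    /** Messages per second when each transaction carries batchSize messages. */
    static double throughput(int batchSize, double perMessageMicros, double perCommitMicros) {
        double totalMicros = batchSize * perMessageMicros + perCommitMicros;
        return batchSize / totalMicros * 1_000_000.0;
    }

    public static void main(String[] args) {
        // Hypothetical costs: 2 microseconds per message, 1000 per commit.
        for (int batch : new int[] {1, 10, 100, 1000, 10000}) {
            System.out.printf("batch=%d -> %.0f msgs/sec%n", batch, throughput(batch, 2.0, 1000.0));
        }
    }
}
```

The trade-off is latency: larger transactions commit less often, so read_committed consumers wait longer before the records become visible.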

Decision Framework for EOS Adoption

public class EOSDecisionFramework {

    public boolean shouldUseExactlyOnce(SystemRequirements requirements) {

        // STRONG YES - Use EOS:
        if (requirements.isMissionCritical() &&
            requirements.requiresZeroDuplicates() &&
            requirements.canAcceptPerformanceReduction(40)) {
            return true;
        }

        // STRONG NO - Don't use EOS:
        if (requirements.isHighThroughputSystem() &&
            requirements.canTolerateRareDuplicates() &&
            requirements.requiresSubMillisecondLatency()) {
            return false;
        }

        // CONDITIONAL - Consider alternatives:
        return evaluateAlternatives(requirements);
    }

    private boolean evaluateAlternatives(SystemRequirements requirements) {
        // Alternative 1: Idempotent processing at application level
        if (canImplementIdempotency(requirements)) {
            return false; // Use idempotent processors instead
        }

        // Alternative 2: At-least-once with deduplication
        if (canImplementDeduplication(requirements)) {
            return false; // Use dedup storage/cache
        }

        // Alternative 3: Accept duplicates with monitoring
        if (requirements.getDuplicateImpact() == Impact.LOW) {
            return false; // Monitor and accept rare duplicates
        }

        return true; // EOS is the right choice
    }
}

Failure Modes and Recovery Mechanisms

Transaction Coordinator Failure Scenarios

Coordinator Failover Process:

public class CoordinatorFailureRecovery {

    void demonstrateCoordinatorFailover() {
        // Failure scenario: Transaction coordinator broker fails

        // Before failure (TransactionState is an illustrative type):
        TransactionState beforeFailure = TransactionState.builder()
            .coordinator("broker-2")
            .transactionalId("payment-processor-1")
            .status(TransactionStatus.ONGOING)
            .epoch(5)
            .partitions(Set.of(
                new TopicPartition("payments", 0),
                new TopicPartition("orders", 1)
            ))
            .build();

        // After coordinator failure and recovery:
        // 1. A new leader is elected for the __transaction_state partition
        //    and becomes the coordinator
        // 2. New coordinator rebuilds transaction state by replaying the log
        // 3. Transactions in PREPARE_COMMIT or PREPARE_ABORT are driven to
        //    completion; ONGOING transactions stay open and are aborted only
        //    if they exceed transaction.timeout.ms
        // 4. Producers rediscover the coordinator and continue with the same epoch

        TransactionState afterRecovery = TransactionState.builder()
            .coordinator("broker-3")           // New coordinator
            .transactionalId("payment-processor-1")
            .status(TransactionStatus.ONGOING) // State recovered from the log
            .epoch(5)                          // Epoch unchanged by failover
            .build();
    }

    void handleProducerRecovery() {
        // Producer-side handling when its epoch has been superseded:
        try {
            producer.beginTransaction();
            // ... transaction operations
            producer.commitTransaction();

        } catch (ProducerFencedException e) {
            // A newer instance with the same transactional.id has called
            // initTransactions(); this instance is a zombie and must stop
            producer.close();

            // Restart with the same transactional ID only if this instance is
            // meant to take over; doing so fences the other instance in turn
            producer = createNewProducer(SAME_TRANSACTIONAL_ID);
            producer.initTransactions(); // Gets new epoch

            // Retry transaction with new producer instance
        }
    }
}
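
Recovery works because the coordinator's state is itself a replicated log: the new coordinator replays it and the last entry per transactional.id wins. The sketch below assumes a drastically simplified entry format (`id=STATE` strings) and the behavior that prepared transactions are completed while ongoing ones are left for the producer or the timeout; `CoordinatorRecovery` is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of coordinator recovery by log replay; not Kafka code.
final class CoordinatorRecovery {
    /** Replays "transactionalId=STATE" entries; the last entry per ID wins. */
    static Map<String, String> rebuild(List<String> logEntries) {
        Map<String, String> state = new LinkedHashMap<>();
        for (String entry : logEntries) {
            String[] parts = entry.split("=", 2);
            state.put(parts[0], parts[1]);
        }
        return state;
    }

    /** What the new coordinator does next for a transaction in the given state. */
    static String nextAction(String state) {
        switch (state) {
            case "PREPARE_COMMIT": return "resend COMMIT markers";
            case "PREPARE_ABORT":  return "resend ABORT markers";
            case "ONGOING":        return "wait for producer or timeout";
            default:               return "nothing to do";
        }
    }
}
```

Resending markers is safe because writing the same marker twice does not change the outcome of the transaction.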

Network Partition Handling

public class NetworkPartitionRecovery {

    void demonstratePartitionRecovery() {
        // Scenario: Producer partitioned from coordinator

        // During partition:
        // 1. Producer cannot commit transactions
        // 2. Open transactions that exceed transaction.timeout.ms are aborted
        //    by the coordinator
        // 3. Producer accumulates records in buffer (send() blocks once
        //    buffer.memory is full, for up to max.block.ms)

        try {
            producer.beginTransaction();

            // These records buffered locally during partition
            producer.send(new ProducerRecord<>("topic", "message1"));
            producer.send(new ProducerRecord<>("topic", "message2"));

            // Blocks for up to max.block.ms waiting for the coordinator
            producer.commitTransaction(); // May throw TimeoutException

        } catch (TimeoutException e) {
            // The commit may still be in progress on the broker, so calling
            // abortTransaction() here is not allowed. Retry the same commit
            // with backoff; if that also fails, close the producer and let a
            // new instance's initTransactions() resolve the open transaction.
            retryWithBackoff(producer::commitTransaction);
        }
    }
}
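
The listing calls `retryWithBackoff` without defining it. One possible shape for such a helper is sketched below using only the standard library; `Backoff`, its parameters, and the cap on the shift are illustrative choices, not part of the Kafka client.

```java
import java.util.function.Supplier;

// Hypothetical exponential-backoff helper; not part of the Kafka client.
final class Backoff {
    /** Delay before retrying after the given 0-based attempt: base * 2^attempt, capped. */
    static long delayMillis(int attempt, long baseMillis, long maxMillis) {
        long delay = baseMillis << Math.min(attempt, 20); // Cap the shift to avoid overflow.
        return Math.min(delay, maxMillis);
    }

    /** Runs the action until it succeeds or maxAttempts is reached. */
    static <T> T retry(Supplier<T> action, int maxAttempts, long baseMillis, long maxMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                last = e; // Remember the failure and fall through to the backoff.
            }
            if (attempt + 1 < maxAttempts) {
                try {
                    Thread.sleep(delayMillis(attempt, baseMillis, maxMillis));
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while backing off", ie);
                }
            }
        }
        throw last; // All attempts failed: surface the last error.
    }
}
```

A void operation such as a commit can be wrapped as `Backoff.retry(() -> { producer.commitTransaction(); return null; }, 5, 100, 5_000)`. In real code the catch should be narrowed to retriable errors so that fatal ones such as fencing are not retried.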

Stream Processing Integration

Kafka Streams Exactly-Once Integration

public class StreamsExactlyOnceExample {

    public void configureExactlyOnceStreams() {
        Properties streamsConfig = new Properties();

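        // Enable exactly-once processing
        streamsConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
                         StreamsConfig.EXACTLY_ONCE_V2); // Requires brokers 2.5 or newer

        // Required for exactly-once
        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "payment-processor");
        streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Tuning for exactly-once: a longer commit interval means fewer, larger
        // transactions, but read_committed consumers see output later
        // (under EOS the default commit interval is 100 ms)
        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000); // 10s commits
        // Newer releases rename this setting to statestore.cache.max.bytes
        streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024); // 10MB cache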

        StreamsBuilder builder = new StreamsBuilder();

        // Exactly-once stream processing topology
        KStream<String, String> payments = builder.stream("payments");

        payments
            .mapValues(this::processPayment)          // Transform
            .filter((key, value) -> isValid(value))  // Filter
            .to("processed-payments");               // Output - exactly-once guaranteed

        KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig);
        streams.start();
    }

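    private String processPayment(String payment) {
        // May run more than once if a task fails and is retried, but only the
        // results of the committed attempt become visible in Kafka. Keep this
        // free of external side effects.
        return enhancePaymentData(payment);
    }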
}

Multi-System Transaction Patterns

Outbox Pattern with Kafka Transactions

public class OutboxPatternWithTransactions {

    @Transactional
    public void processOrderWithOutbox(OrderRequest request) {
        // Phase 1: Database transaction with outbox

        // Save business entity
        Order order = orderService.createOrder(request);

        // Save outbox event in SAME database transaction
        OutboxEvent outboxEvent = OutboxEvent.builder()
            .aggregateId(order.getId())
            .eventType("OrderCreated")
            .payload(order.toJson())
            .status(OutboxStatus.PENDING)
            .build();

        outboxRepository.save(outboxEvent);

        // Database transaction commits - order and outbox event durable
    }

    @Component
    public class OutboxPublisher {
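        // Must be configured with a transactional.id, and initTransactions()
        // must have been called once before the first beginTransaction()
        private final KafkaProducer<String, String> transactionalProducer;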

        @Scheduled(fixedDelay = 1000) // Every second
        public void publishOutboxEvents() {
            List<OutboxEvent> pendingEvents = outboxRepository.findPendingEvents();

            if (!pendingEvents.isEmpty()) {
                transactionalProducer.beginTransaction();

                try {
                    // Publish events to Kafka
                    for (OutboxEvent event : pendingEvents) {
                        ProducerRecord<String, String> record = new ProducerRecord<>(
                            "order-events",
                            event.getAggregateId(),
                            event.getPayload()
                        );
                        transactionalProducer.send(record);
                    }

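                    // Commit Kafka transaction first
                    transactionalProducer.commitTransaction();

                } catch (Exception e) {
                    transactionalProducer.abortTransaction();
                    return; // Events remain pending for retry
                }

                // Mark events as published only after the Kafka commit succeeded.
                // A crash between the two steps republishes the events on the next
                // run, so delivery is at-least-once and consumers should
                // deduplicate by event ID.
                outboxRepository.markAsPublished(pendingEvents);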
            }
        }
    }
}
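
Because the database update and the Kafka commit are two separate steps, an outbox publisher can deliver the same event twice after a crash. A common complement is an idempotent handler on the consuming side. The sketch below keeps seen IDs in memory for brevity; `DedupingHandler` is a hypothetical name, and a production version would persist the IDs, ideally in the same database transaction as the handler's own writes.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical idempotent handler: processes each event ID at most once.
final class DedupingHandler {
    private final Set<String> seenEventIds = new HashSet<>();
    private final Consumer<String> delegate;

    DedupingHandler(Consumer<String> delegate) {
        this.delegate = delegate;
    }

    /** Returns true if the payload was processed, false if the ID was seen before. */
    boolean handle(String eventId, String payload) {
        if (!seenEventIds.add(eventId)) {
            return false; // Duplicate delivery: skip the side effect.
        }
        delegate.accept(payload);
        return true;
    }
}
```

With this in place, a redelivered outbox event reaches the handler but its side effect runs only once.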

Alternative Approaches Comparison

EOS vs Other Consistency Models

public class ConsistencyModelComparison {

    void compareApproaches() {
        // Throughput and latency figures below are illustrative, not measured

        // Kafka EOS (Two-Phase Commit)
        ConsistencyApproach kafkaEOS = ConsistencyApproach.builder()
            .guarantees("Exactly-once within Kafka scope")
            .performance("300k msgs/sec, 8ms p99 latency")
            .complexity("High - requires transaction coordination")
            .scope("Kafka producers/consumers only")
            .failureHandling("Automatic with coordinator failover")
            .build();

        // Saga Pattern (Choreography)
        ConsistencyApproach sagaChoreography = ConsistencyApproach.builder()
            .guarantees("Eventual consistency with compensation")
            .performance("500k msgs/sec, 3ms p99 latency")
            .complexity("Medium - distributed state machines")
            .scope("Cross-service, cross-system")
            .failureHandling("Manual compensation logic required")
            .build();

        // Event Sourcing + CQRS
        ConsistencyApproach eventSourcing = ConsistencyApproach.builder()
            .guarantees("Strong consistency within aggregate")
            .performance("Variable - depends on projection complexity")
            .complexity("High - requires event design and projections")
            .scope("Application-level across read/write models")
            .failureHandling("Replay capability, temporal queries")
            .build();

        // Application-Level Idempotency
        ConsistencyApproach applicationIdempotency = ConsistencyApproach.builder()
            .guarantees("Idempotent processing with duplicate detection")
            .performance("450k msgs/sec, 4ms p99 latency")
            .complexity("Medium - requires deduplication logic")
            .scope("Application-specific")
            .failureHandling("Custom retry and dedup logic")
            .build();
    }
}

See Also: [[Event_Sourcing_CQRS]], [[Microservices_Saga_Patterns]], [[Stream_Processing_Frameworks]], [[Database_Consistency_Models]]