Skip to content

Kafka Series

Kafka Architecture - Core Concepts

Deep dive into Kafka's distributed architecture, topics, partitions, brokers, and core design decisions that enable high-throughput event streaming

Core Architecture Overview

What is Kafka?

Apache Kafka is a distributed event streaming platform designed for high-throughput, fault-tolerant, real-time data pipelines. It acts as a commit log service that stores and forwards messages between producers and consumers.

Key Value Proposition:

  • High Throughput: Millions of messages per second
  • Low Latency: Sub-millisecond for 99th percentile
  • Durability: Messages persisted to disk with replication
  • Scalability: Horizontal scaling across multiple machines
  • Fault Tolerance: Continues operating despite node failures

Core Components

KAFKA CORE COMPONENTS

Kafka Cluster Architecture

KAFKA CLUSTER ARCHITECTURE

Basic Data Flow

  1. Producers send messages to specific topics
  2. Topics are divided into partitions for scalability
  3. Brokers store partition data across the cluster
  4. Consumers read messages from topics/partitions
  5. Consumer Groups coordinate parallel consumption

Interview Key Points

“Why Kafka over traditional message queues?”

  • Persistence: Messages stored on disk, not just memory
  • Replay: Consumers can re-read historical messages
  • Multiple Consumers: Many consumers can read same data simultaneously
  • Ordering: Guarantees order within partitions
  • Scaling: Add partitions and brokers independently

“When would you use Kafka?”

  • Event-driven microservices communication
  • Real-time analytics and stream processing
  • Log aggregation and centralized logging
  • Change data capture (CDC) from databases
  • Activity tracking and user behavior analytics

See Also: Topics & Partitions, Brokers


Topics, Partitions & Segments

Topic Organization

A topic is a logical grouping of related messages (e.g., “user-events”, “payment-transactions”). Topics provide the primary abstraction for organizing data streams.

TOPIC ORGANIZATION

Partition Mechanics

Partitions enable:

  • Parallelism: Multiple consumers process different partitions simultaneously
  • Scalability: Add partitions to increase throughput
  • Ordering: Messages within a partition maintain strict order
  • Distribution: Partitions spread across different brokers

Partition Assignment:

// Producer determines partition via:
// 1. Explicit partition specification
producer.send(new ProducerRecord<>("topic", partition, key, value));

// 2. Key-based hashing (most common)
producer.send(new ProducerRecord<>("topic", key, value));
// partition = hash(key) % num_partitions

// 3. Round-robin (no key)
producer.send(new ProducerRecord<>("topic", value));

Segment Storage Structure

Each partition is divided into segments for efficient storage and retrieval:

PARTITION SEGMENT STRUCTURE

Segment Properties:

  • Size-based rollover: New segment when current reaches size limit
  • Time-based rollover: New segment after time period
  • Immutable: Old segments are read-only, enabling efficient caching
  • Cleanup: Old segments deleted based on retention policy

Storage Configuration Examples

Topic Creation with Partitioning Strategy:

# Create topic with strategic partition count
kafka-topics --create \
  --topic user-events \
  --partitions 12 \
  --replication-factor 3 \
  --config segment.ms=86400000 \
  --config retention.ms=604800000

# Partition count considerations:
# - Start with (target_throughput_MB/s ÷ partition_throughput_MB/s)
# - Consider consumer parallelism (max consumers = partitions)
# - Account for hot partitions with uneven key distribution

Retention Policies:

# Time-based retention (delete after 7 days)
--config retention.ms=604800000

# Size-based retention (delete when topic exceeds 10GB)
--config retention.bytes=10737418240

# Log compaction (keep latest value per key)
--config cleanup.policy=compact

# Combined: compaction + time retention
--config cleanup.policy=compact,delete
--config retention.ms=604800000

Partition Strategy Decision Framework

PARTITIONING STRATEGIES

Custom Partitioning:

public class CustomPartitioner implements Partitioner {
    public int partition(String topic, Object key, byte[] keyBytes,
                        Object value, byte[] valueBytes, Cluster cluster) {
        // Custom logic: geographic routing, load balancing, etc.
        if (key.toString().startsWith("VIP_")) {
            return 0; // VIP users get dedicated partition
        }
        return Utils.murmur2(keyBytes) % cluster.partitionCountForTopic(topic);
    }
}

Sizing and Capacity Planning

PARTITION SIZING GUIDELINES

Performance Implications:

  • Too few partitions: Limits consumer parallelism and throughput
  • Too many partitions: Increases metadata overhead, election time
  • Sweet spot: Start conservative, monitor, and add partitions as needed

See Also: Producer Architecture, Consumer Groups


Brokers and Clustering

Broker Roles and Responsibilities

A broker is a Kafka server that stores data and serves client requests. In a cluster, brokers coordinate to provide distributed storage and fault tolerance.

BROKER RESPONSIBILITIES

Cluster Coordination: ZooKeeper vs KRaft

Traditional ZooKeeper Mode:

ZOOKEEPER MODE

Modern KRaft Mode (KIP-500):

KRAFT MODE (SELF-MANAGING)

Leader Election and Partition Leadership

PARTITION LEADERSHIP MODEL

Broker Configuration Examples

Essential Broker Settings:

# server.properties

# Unique broker identifier
broker.id=1

# Network and connectivity
listeners=PLAINTEXT://kafka1.company.com:9092
advertised.listeners=PLAINTEXT://kafka1.company.com:9092

# Storage configuration
log.dirs=/var/kafka-logs-1,/var/kafka-logs-2,/var/kafka-logs-3
num.network.threads=8
num.io.threads=16

# Replication settings
default.replication.factor=3
min.insync.replicas=2

# Log management
log.retention.hours=168
log.segment.bytes=1073741824
log.cleanup.policy=delete

# ZooKeeper connection (if not using KRaft)
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181/kafka

Production Hardening:

# JVM heap sizing (typically 6-10GB max)
export KAFKA_HEAP_OPTS="-Xmx8g -Xms8g"

# OS-level optimizations
# File descriptor limits: ulimit -n 100000
# Disable swap: vm.swappiness=1

# Network optimization
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.replica.fetchers=4

Failure Scenarios and Recovery

BROKER FAILURE HANDLING
CONTROLLER ELECTION

Cluster Expansion and Maintenance

Adding New Brokers:

# 1. Start new broker with unique broker.id
# 2. Create partition reassignment plan
kafka-reassign-partitions --generate \
  --topics-to-move-json-file topics-to-move.json \
  --broker-list "1,2,3,4" # including new broker

# 3. Execute reassignment
kafka-reassign-partitions --execute \
  --reassignment-json-file reassignment-plan.json

# 4. Verify completion
kafka-reassign-partitions --verify \
  --reassignment-json-file reassignment-plan.json

Rolling Restarts (Zero Downtime):

# For each broker:
# 1. Drain leadership (optional)
kafka-leader-election --election-type preferred \
  --all-topic-partitions

# 2. Stop broker gracefully
kafka-server-stop

# 3. Apply configuration changes
# 4. Start broker
kafka-server-start server.properties

# 5. Verify broker rejoined and caught up
# 6. Proceed to next broker

Interview Scenarios

“How does Kafka handle a broker failure?”

  1. Controller detects failure via missed heartbeats
  2. Failed broker removed from ISR for its partitions
  3. New leaders elected from remaining ISR replicas
  4. Clients automatically discover new leaders
  5. Failed broker rejoins when recovered

“What happens if the controller fails?”

  1. New controller election among remaining brokers
  2. New controller reads full cluster state
  3. Continues managing leader elections and metadata
  4. Brief pause in metadata operations during election

See Also: Replication & Consistency, Operational Considerations


Producer Architecture

Producer Request Flow

The Kafka producer is responsible for publishing messages to topics with configurable reliability, performance, and ordering guarantees.

PRODUCER INTERNAL FLOW

Batching and Performance Optimization

Batch Configuration Strategy:

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092");

// Batching controls
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);        // 32KB batches
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);            // Wait 10ms for batching
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864);  // 64MB total buffer

// Compression (reduces network I/O)
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");  // or snappy, gzip

// Parallel requests per connection
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
PRODUCER BATCHING TIMELINE

Acknowledgment Strategies and Reliability

ACK Configuration Levels:

// acks=0: Fire and forget (highest throughput, no durability)
props.put(ProducerConfig.ACKS_CONFIG, "0");

// acks=1: Leader acknowledgment (balanced)
props.put(ProducerConfig.ACKS_CONFIG, "1");

// acks=all: Full ISR acknowledgment (highest durability)
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.MIN_IN_SYNC_REPLICAS, 2); // At broker level
RELIABILITY vs PERFORMANCE TRADEOFFS

Idempotency and Exactly-Once Semantics

Idempotent Producer Configuration:

// Enables automatic deduplication
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

// Idempotency requires:
// acks = all (automatically set)
// max.in.flight.requests.per.connection <= 5 (automatically set)
// retries > 0 (automatically set)

// Transactional producer (for exactly-once)
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");

Exactly-Once Producer Pattern:

public class ExactlyOnceProducer {
    private KafkaProducer<String, String> producer;

    public void initTransactions() {
        producer.initTransactions();
    }

    public void sendTransactionally(List<ProducerRecord<String, String>> records) {
        producer.beginTransaction();
        try {
            for (ProducerRecord<String, String> record : records) {
                producer.send(record);
            }
            producer.commitTransaction();
        } catch (Exception e) {
            producer.abortTransaction();
            throw e;
        }
    }
}

Error Handling and Retry Logic

Comprehensive Error Handling:

public class RobustProducer {
    private KafkaProducer<String, String> producer;
    private final int maxRetries = 3;

    public void sendWithRetry(ProducerRecord<String, String> record) {
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                handleSendError(record, exception, 0);
            } else {
                log.info("Message sent to partition {} at offset {}",
                    metadata.partition(), metadata.offset());
            }
        });
    }

    private void handleSendError(ProducerRecord<String, String> record,
                                Exception exception, int attempt) {
        if (exception instanceof RetriableException && attempt < maxRetries) {
            // Automatic retry for retriable exceptions
            scheduleRetry(record, attempt + 1);
        } else if (exception instanceof SerializationException) {
            // Non-retriable: log and possibly send to DLQ
            sendToDeadLetterQueue(record, exception);
        } else {
            // Network timeout, authentication, etc.
            handleNonRetriableError(record, exception);
        }
    }
}

Producer Configuration for Reliability:

// Retry configuration
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 300000); // 5 minutes total

// Request timeout (per request)
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000); // 30 seconds

// Connection and metadata
props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 50);
props.put(ProducerConfig.METADATA_MAX_AGE_MS_CONFIG, 300000); // Refresh topology

Message Ordering Guarantees

Ordering Configuration:

// For strict ordering within partition:
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

// Key-based routing ensures related messages go to same partition
ProducerRecord<String, String> record =
    new ProducerRecord<>("orders", customerId, orderData);
ORDER PRESERVATION SCENARIOS

Interview Questions and Answers

“How do you ensure exactly-once semantics?”

  1. Enable idempotent producer (enable.idempotence=true)
  2. Use transactions with transactional.id
  3. Coordinate with exactly-once consumers
  4. Handle duplicates at application level if needed

“What’s the tradeoff between throughput and durability?”

  • acks=0: Highest throughput, risk of data loss
  • acks=1: Balanced, risk if leader fails before replication
  • acks=all: Highest durability, lower throughput

“How do you handle producer failures?”

  1. Configure appropriate retries and timeouts
  2. Implement error callbacks for handling failures
  3. Use circuit breakers for downstream failures
  4. Monitor producer metrics for health

See Also: Topics & Partitions, Replication & Consistency


Consumer Groups & Offsets

Consumer Group Coordination

Consumer groups enable parallel processing and fault tolerance by coordinating multiple consumer instances to share the work of consuming from a topic’s partitions.

CONSUMER GROUP COORDINATION

Partition Assignment Strategies

RANGE ASSIGNOR (DEFAULT)
ROUND-ROBIN ASSIGNOR

Sticky Assignor (Recommended):

ROUND-ROBIN ASSIGNOR

Offset Management Strategies

Automatic Offset Management:

Properties props = new Properties();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000); // 5 seconds

// Offset commit timing options:
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // or "latest"

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("user-events"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        processRecord(record); // Process message
        // Offset automatically committed every 5 seconds
    }
}

Manual Offset Management (Recommended for Production):

Properties props = new Properties();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // Disable auto-commit

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("user-events"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

    for (ConsumerRecord<String, String> record : records) {
        try {
            processRecord(record); // Process message

            // Option 1: Commit after each message (lower throughput)
            consumer.commitSync();

        } catch (Exception e) {
            handleProcessingError(record, e);
            // Don't commit offset on error - message will be retried
        }
    }

    // Option 2: Commit batch of messages (higher throughput)
    try {
        consumer.commitSync(); // Commit all processed messages
    } catch (CommitFailedException e) {
        handleCommitError(e);
    }
}

Rebalancing Protocol and Handling

REBALANCING TRIGGERS
REBALANCING PROCESS

Rebalancing Listener Implementation:

public class RebalanceListener implements ConsumerRebalanceListener {

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        log.info("Partitions revoked: {}", partitions);

        // 1. Finish processing current messages
        finishCurrentWork();

        // 2. Commit current offsets
        try {
            consumer.commitSync();
        } catch (CommitFailedException e) {
            log.error("Failed to commit offsets during rebalance", e);
        }

        // 3. Clean up resources for revoked partitions
        partitions.forEach(this::cleanupPartition);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        log.info("Partitions assigned: {}", partitions);

        // 1. Initialize resources for new partitions
        partitions.forEach(this::initializePartition);

        // 2. Seek to specific offsets if needed
        for (TopicPartition partition : partitions) {
            long savedOffset = getOffsetFromDatabase(partition);
            if (savedOffset >= 0) {
                consumer.seek(partition, savedOffset);
            }
        }
    }
}

// Usage:
consumer.subscribe(Arrays.asList("user-events"), new RebalanceListener());

Consumer Configuration Best Practices

Essential Consumer Settings:

Properties props = new Properties();

// Group and client identification
props.put(ConsumerConfig.GROUP_ID_CONFIG, "payment-processors");
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "payment-processor-" + instanceId);

// Offset management
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

// Session and heartbeat configuration
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);      // 30 seconds
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);   // 10 seconds
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);   // 5 minutes

// Fetch configuration for performance
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);          // 1KB minimum
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);         // 500ms max wait
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576); // 1MB max per partition

Consumer Performance Tuning:

// Large batch processing
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000); // Process 1000 records per poll

// Network optimization
props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 131072);   // 128KB
props.put(ConsumerConfig.SEND_BUFFER_CONFIG, 131072);      // 128KB

// For high-throughput scenarios
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 50000);   // Wait for 50KB
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 100);   // But no more than 100ms

Error Handling and Recovery Patterns

Retry with Dead Letter Queue:

public class RobustConsumer {
    private final KafkaConsumer<String, String> consumer;
    private final KafkaProducer<String, String> dlqProducer;
    private final String dlqTopic;
    private final int maxRetries = 3;

    public void processRecords() {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

        for (ConsumerRecord<String, String> record : records) {
            boolean processed = false;
            int attempts = 0;

            while (!processed && attempts < maxRetries) {
                try {
                    processMessage(record);
                    processed = true;
                } catch (RetriableException e) {
                    attempts++;
                    if (attempts >= maxRetries) {
                        sendToDeadLetterQueue(record, e);
                        processed = true; // Don't retry further
                    } else {
                        waitBeforeRetry(attempts);
                    }
                } catch (NonRetriableException e) {
                    sendToDeadLetterQueue(record, e);
                    processed = true;
                }
            }
        }

        // Commit offsets only after all messages processed
        consumer.commitSync();
    }

    private void sendToDeadLetterQueue(ConsumerRecord<String, String> record, Exception e) {
        Headers headers = record.headers();
        headers.add("error.message", e.getMessage().getBytes());
        headers.add("error.timestamp", String.valueOf(System.currentTimeMillis()).getBytes());

        ProducerRecord<String, String> dlqRecord =
            new ProducerRecord<>(dlqTopic, record.key(), record.value(), headers);

        dlqProducer.send(dlqRecord);
    }
}

Consumer Lag Monitoring

Key Metrics to Track:

# Consumer lag per partition
kafka-consumer-groups --bootstrap-server localhost:9092 \
  --describe --group payment-processors

TOPIC     PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG    CONSUMER-ID
orders    0         1500           1750           250    consumer-1-abc123
orders    1         2300           2310           10     consumer-2-def456
orders    2         3100           3100           0      consumer-3-ghi789

Programmatic Lag Monitoring:

public class ConsumerLagMonitor {
    private final AdminClient adminClient;
    private final String groupId;

    public Map<TopicPartition, Long> getConsumerLag() {
        Map<TopicPartition, Long> lagMap = new HashMap<>();

        // Get consumer group offsets
        Map<TopicPartition, OffsetAndMetadata> consumerOffsets =
            adminClient.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();

        // Get log end offsets
        Map<TopicPartition, Long> logEndOffsets =
            adminClient.getConsumer().endOffsets(consumerOffsets.keySet());

        // Calculate lag
        for (TopicPartition tp : consumerOffsets.keySet()) {
            long consumerOffset = consumerOffsets.get(tp).offset();
            long logEndOffset = logEndOffsets.get(tp);
            lagMap.put(tp, logEndOffset - consumerOffset);
        }

        return lagMap;
    }
}

Interview Scenarios

“What happens when a consumer in a group fails?”

  1. Consumer stops sending heartbeats
  2. Group coordinator triggers rebalancing after session timeout
  3. Failed consumer’s partitions redistributed to remaining consumers
  4. Remaining consumers may experience brief pause during rebalancing

“How do you handle slow consumers?”

  1. Increase max.poll.interval.ms for longer processing times
  2. Reduce max.poll.records to process smaller batches
  3. Scale horizontally by adding more consumer instances
  4. Optimize message processing logic

“Explain exactly-once consumption.”

  1. Disable auto-commit (enable.auto.commit=false)
  2. Process message and commit offset in single transaction
  3. Handle idempotency at application level for duplicates
  4. Use external storage for offset management if needed

See Also: Brokers & Clustering, Operational Considerations


Replication and Consistency

Replication Architecture

Kafka provides fault tolerance through partition replication across multiple brokers. Each partition has one leader and multiple followers, ensuring data survives broker failures.

REPLICATION ARCHITECTURE

In-Sync Replicas (ISR) Management

IN-SYNC REPLICAS (ISR)

ISR Configuration:

# Broker configuration
replica.lag.time.max.ms=30000           # 30 seconds to stay in ISR
min.insync.replicas=2                   # Minimum ISR size for writes
unclean.leader.election.enable=false    # Only ISR can become leader

# Topic-level override
kafka-configs --alter --entity-type topics --entity-name critical-topic \
  --add-config min.insync.replicas=3

Consistency Guarantees and Trade-offs

KAFKA CONSISTENCY LEVELS
CAP THEOREM IN KAFKA

Replication Configuration Examples

High Durability Configuration:

# Topic creation for financial transactions
kafka-topics --create \
  --topic financial-transactions \
  --partitions 12 \
  --replication-factor 3 \
  --config min.insync.replicas=3 \
  --config unclean.leader.election.enable=false

# Producer configuration
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

High Performance Configuration:

# Topic for metrics/logging
kafka-topics --create \
  --topic application-metrics \
  --partitions 24 \
  --replication-factor 2 \
  --config min.insync.replicas=1

# Producer configuration
props.put(ProducerConfig.ACKS_CONFIG, "1");
props.put(ProducerConfig.LINGER_MS_CONFIG, 100);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);

Failure Scenarios and Recovery

LEADER FAILURE RECOVERY
SPLIT-BRAIN PREVENTION

Cross-Datacenter Replication

MIRRORMAKER 2.0 ARCHITECTURE

MirrorMaker Configuration:

# Source and target clusters
clusters=primary,secondary
primary.bootstrap.servers=kafka1-east:9092,kafka2-east:9092
secondary.bootstrap.servers=kafka1-west:9092,kafka2-west:9092

# Replication flows
primary->secondary.enabled=true
primary->secondary.topics=user-events,transactions,.*-logs

# Topic naming in target cluster
replication.policy.class=org.apache.kafka.connect.mirror.DefaultReplicationPolicy
# Results in: primary.user-events, primary.transactions

# Sync settings
sync.topic.acls.enabled=true
sync.topic.configs.enabled=true
emit.checkpoints.enabled=true  # For exactly-once

Monitoring Replication Health

Key Replication Metrics:

# ISR size monitoring
kafka-topics --describe --topic critical-topic

Topic: critical-topic
Partition: 0    Leader: 1    Replicas: 1,2,3    Isr: 1,2,3 Healthy
Partition: 1    Leader: 2    Replicas: 2,3,1    Isr: 2,3 Degraded

# Under-replicated partitions
kafka-topics --describe --under-replicated-partitions

# Preferred replica election (rebalancing)
kafka-leader-election --election-type preferred --all-topic-partitions

JMX Metrics for Replication:

// Key metrics to monitor
kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions     // Should be 0
kafka.server:type=ReplicaManager,name=PartitionCount               // Partitions per broker
kafka.controller:type=KafkaController,name=OfflinePartitionsCount  // Should be 0
kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs  // Election frequency

Interview Questions

“How does Kafka ensure no data loss?”

  1. Configure acks=all for producers
  2. Set min.insync.replicas > 1 at topic/broker level
  3. Disable unclean.leader.election
  4. Monitor ISR health and handle shrinking ISRs

“What happens if all replicas fail?”

  1. Partition becomes unavailable for reads/writes
  2. If unclean.leader.election=false: Wait for ISR replica to recover
  3. If unclean.leader.election=true: Allow non-ISR replica to become leader (data loss possible)
  4. Monitor and alert on ISR size to prevent this scenario

“Explain Kafka’s consistency model.”

  • Read-after-write consistency: Consumers see writes immediately after producer acknowledgment
  • Monotonic read consistency: Consumers see messages in order within partitions
  • No cross-partition consistency: No guarantees across different partitions
  • Configurable durability: Trade-off between consistency and performance via acks

See Also: Brokers & Clustering, Producer Architecture


Performance Characteristics

Throughput and Latency Metrics

KAFKA PERFORMANCE CHARACTERISTICS
PERFORMANCE FACTORS

Scaling Patterns and Bottlenecks

SCALING DIMENSIONS

Common Bottlenecks and Solutions:

BottleneckSymptomsSolutions
Producer BatchingLow throughput, high CPUIncrease batch.size, tune linger.ms
Network I/OHigh latency, bandwidth limitsEnable compression, increase buffers
Disk I/OSlow writes, high latencyUse SSD, tune OS page cache
Consumer LagProcessing slower than ingestionScale consumers, optimize processing
ReplicationHigh latency with acks=allOptimize network, tune ISR settings
GC PausesPeriodic latency spikesTune JVM heap, use G1 collector

Configuration Tuning Guide

Producer Performance Tuning:

Properties producerProps = new Properties();

// Throughput optimization
producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);        // 64KB batches
producerProps.put(ProducerConfig.LINGER_MS_CONFIG, 20);            // 20ms batching window
producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");   // Fast compression
producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 134217728);  // 128MB buffer

// Network optimization
producerProps.put(ProducerConfig.SEND_BUFFER_CONFIG, 131072);       // 128KB
producerProps.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, 32768);     // 32KB
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);

// Reliability vs performance balance
producerProps.put(ProducerConfig.ACKS_CONFIG, "1");                 // Leader ack only
producerProps.put(ProducerConfig.RETRIES_CONFIG, 3);

Consumer Performance Tuning:

Properties consumerProps = new Properties();

// Fetch optimization
consumerProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 50000);     // 50KB minimum
consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 100);     // 100ms max wait
consumerProps.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576); // 1MB max

// Processing optimization
consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2000);     // Larger batches
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);  // Manual commits

// Network buffers
consumerProps.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 262144);     // 256KB
consumerProps.put(ConsumerConfig.SEND_BUFFER_CONFIG, 131072);        // 128KB

Broker Performance Tuning:

# server.properties

# Network threads (typically 8-16)
num.network.threads=16
num.io.threads=16

# Socket buffers
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

# Log segment configuration
log.segment.bytes=1073741824          # 1GB segments
log.index.size.max.bytes=10485760     # 10MB indexes

# Replication performance
num.replica.fetchers=8                # Parallel fetcher threads
replica.fetch.max.bytes=1048576       # 1MB per fetch

# Compression and cleanup
compression.type=lz4                  # Broker-side compression
log.cleanup.policy=delete
log.retention.check.interval.ms=300000

Hardware Recommendations

HARDWARE RECOMMENDATIONS

Performance Monitoring and Alerting

Key Performance Metrics:

# Throughput monitoring
kafka-consumer-perf-test --topic test-topic --bootstrap-server localhost:9092

# Producer performance test
kafka-producer-perf-test --topic test-topic --num-records 1000000 \
  --record-size 1024 --throughput 100000

# JMX metrics to monitor
kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce
kafka.server:type=ReplicaManager,name=LeaderCount

Production Monitoring Dashboard:

Key Metrics to Dashboard:
  - Throughput: Messages/sec, Bytes/sec per topic
  - Latency: 99th percentile produce/consume latency
  - Consumer Lag: Per partition and total
  - Broker Health: CPU, memory, disk usage
  - Replication: ISR shrinking, under-replicated partitions
  - Network: Bandwidth utilization, request rate

Interview Performance Questions

“How do you optimize Kafka for high throughput?”

  1. Increase batch sizes and enable compression
  2. Tune linger.ms for better batching
  3. Scale partitions and consumers horizontally
  4. Use appropriate hardware (SSD, high bandwidth)
  5. Monitor and eliminate bottlenecks systematically

“What causes latency in Kafka?”

  1. Producer batching (linger.ms setting)
  2. Network RTT between components
  3. Disk sync behavior and storage performance
  4. Replication overhead with acks=all
  5. JVM garbage collection pauses

“How do you handle traffic spikes?”

  1. Over-provision partitions for scaling headroom
  2. Implement producer-side backpressure
  3. Use consumer auto-scaling based on lag metrics
  4. Pre-warm page caches during off-peak hours
  5. Monitor leading indicators (queue depth, latency)

See Also: Producer Architecture, Consumer Groups, Operational Considerations


Operational Considerations

Essential Monitoring Metrics

MONITORING ALERTS

Monitoring Implementation:

// JMX metric collection example
public class KafkaMetricsCollector {
    private final MBeanServer server = ManagementFactory.getPlatformMBeanServer();

    public Map<String, Object> getClusterHealth() {
        Map<String, Object> metrics = new HashMap<>();

        // Under-replicated partitions (critical)
        ObjectName underReplicatedName = new ObjectName(
            "kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions");
        metrics.put("underReplicatedPartitions",
            server.getAttribute(underReplicatedName, "Value"));

        // Offline partitions (critical)
        ObjectName offlinePartitionsName = new ObjectName(
            "kafka.controller:type=KafkaController,name=OfflinePartitionsCount");
        metrics.put("offlinePartitions",
            server.getAttribute(offlinePartitionsName, "Value"));

        // Producer request rate
        ObjectName produceRequestName = new ObjectName(
            "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce");
        metrics.put("produceRequestRate",
            server.getAttribute(produceRequestName, "OneMinuteRate"));

        return metrics;
    }
}

Troubleshooting Common Issues

CONSUMER LAG DIAGNOSIS
DISK SPACE EXHAUSTION
FREQUENT LEADER ELECTIONS

Deployment and Configuration Management

Zero-Downtime Deployment Process:

#!/bin/bash
# Rolling update script

BROKERS=("broker1" "broker2" "broker3")
NEW_VERSION="2.8.0"

for broker in "${BROKERS[@]}"; do
    echo "Updating $broker..."

    # 1. Gracefully shut down broker
    ssh $broker "kafka-server-stop.sh"

    # 2. Wait for partition leadership migration
    sleep 30

    # 3. Update Kafka binaries
    ssh $broker "tar -xzf kafka-${NEW_VERSION}.tgz -C /opt/"

    # 4. Start broker with new version
    ssh $broker "kafka-server-start.sh -daemon server.properties"

    # 5. Verify broker rejoined cluster
    kafka-broker-api-versions.sh --bootstrap-server $broker:9092

    # 6. Wait before proceeding to next broker
    sleep 60
done

Configuration Management Best Practices:

# server.properties template with environment variables

broker.id=${BROKER_ID}
listeners=${LISTENERS}
log.dirs=${LOG_DIRS}

# Environment-specific settings
default.replication.factor=${REPLICATION_FACTOR:-3}
min.insync.replicas=${MIN_ISR:-2}

# Security configuration
security.protocol=${SECURITY_PROTOCOL:-PLAINTEXT}
ssl.keystore.location=${SSL_KEYSTORE_PATH}
ssl.truststore.location=${SSL_TRUSTSTORE_PATH}

# Monitoring integration
jmx.port=${JMX_PORT:-9999}

Backup and Disaster Recovery

Backup Strategies:

# 1. Topic configuration backup
kafka-topics --list --bootstrap-server localhost:9092 > topics.txt
while read topic; do
    kafka-configs --describe --entity-type topics --entity-name $topic \
      --bootstrap-server localhost:9092 >> topic-configs.txt
done < topics.txt

# 2. Consumer group offset backup
kafka-consumer-groups --list --bootstrap-server localhost:9092 > groups.txt
while read group; do
    kafka-consumer-groups --describe --group $group \
      --bootstrap-server localhost:9092 >> group-offsets.txt
done < groups.txt

# 3. Metadata backup (ZooKeeper)
zkCli.sh -server zk1:2181 <<EOF
ls /brokers
ls /config
ls /controller
EOF

Cross-Region Disaster Recovery:

DR Setup with MirrorMaker 2.0:

PRIMARY_REGION (us-east):
├── Kafka Cluster (3 brokers)
├── Application producers/consumers
└── MirrorMaker 2.0 → DR_REGION

DR_REGION (us-west):
├── Kafka Cluster (3 brokers)
├── Standby applications (read-only)
├── Mirrored topics: primary.topic-name
└── Offset sync for consumer failover

FAILOVER_PROCESS:
1. Stop applications in primary region
2. Switch DNS/load balancers to DR region
3. Start applications in DR region
4. Begin reverse replication (DR → PRIMARY)

Security Operations

Authentication and Authorization Setup:

# Enable SASL/SCRAM authentication
sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256

# SSL configuration
ssl.client.auth=required
ssl.keystore.location=/etc/kafka/ssl/kafka.server.keystore.jks
ssl.truststore.location=/etc/kafka/ssl/kafka.server.truststore.jks

# ACL authorization
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:kafka;User:admin

Common Security Operations:

# Create SCRAM user
kafka-configs --alter --add-config 'SCRAM-SHA-256=[password=secret]' \
  --entity-type users --entity-name alice

# Grant ACLs
kafka-acls --add --allow-principal User:alice \
  --operation Read --operation Write --topic payments

# List current ACLs
kafka-acls --list --principal User:alice

# Rotate SSL certificates
# 1. Generate new certificates
# 2. Add to truststore
# 3. Rolling restart with new keystore
# 4. Remove old certificates

Capacity Planning and Growth Management

Capacity Planning Framework:

# Capacity calculation example
def calculate_kafka_capacity(requirements):
    daily_messages = requirements['daily_messages']
    message_size = requirements['avg_message_size_kb']
    retention_days = requirements['retention_days']
    replication_factor = requirements['replication_factor']

    # Storage calculation
    daily_storage_gb = (daily_messages * message_size) / (1024 * 1024)
    total_storage_gb = daily_storage_gb * retention_days * replication_factor

    # Add safety margin
    recommended_storage = total_storage_gb * 1.5

    # Partition calculation
    target_throughput_mb = daily_storage_gb / (24 * 3600) # MB/s
    partition_throughput_mb = 25  # Conservative estimate
    min_partitions = max(target_throughput_mb / partition_throughput_mb,
                        requirements.get('min_consumers', 1))

    return {
        'storage_gb': recommended_storage,
        'partitions': min_partitions,
        'brokers': max(3, min_partitions // 100)  # 100 partitions per broker
    }

Growth Management:

# Adding partitions (irreversible!)
kafka-topics --alter --topic user-events --partitions 20

# Adding brokers and rebalancing
# 1. Start new brokers
# 2. Generate reassignment plan
kafka-reassign-partitions --generate \
  --topics-to-move-json-file all-topics.json \
  --broker-list "1,2,3,4,5" # Include new brokers

# 3. Execute rebalancing (gradual)
kafka-reassign-partitions --execute \
  --reassignment-json-file expand-cluster-reassignment.json \
  --throttle 50000000  # 50MB/s throttle

Interview Operational Questions

“How do you monitor Kafka in production?”

  1. JMX metrics collection (broker, producer, consumer metrics)
  2. Consumer lag monitoring and alerting
  3. Cluster health checks (ISR, offline partitions)
  4. Resource monitoring (CPU, memory, disk, network)
  5. End-to-end latency and throughput monitoring

“Describe your disaster recovery plan for Kafka.”

  1. Cross-region replication with MirrorMaker 2.0
  2. Regular metadata and configuration backups
  3. Automated failover procedures with DNS switching
  4. Consumer offset synchronization for seamless failover
  5. Regular DR testing and runbook maintenance

“How do you handle a failed broker?”

  1. Immediate: Check if partitions are still available via ISR
  2. Short-term: Allow automatic leader election and recovery
  3. Investigation: Identify root cause (hardware, network, configuration)
  4. Recovery: Fix underlying issue and restart broker
  5. Verification: Confirm broker rejoined cluster and replicated data

See Also: Performance Characteristics, Replication & Consistency


Summary and Integration

Key Takeaways

Kafka’s Core Strengths:

  1. High Throughput: Millions of messages per second through batching and sequential I/O
  2. Fault Tolerance: Replication and ISR management ensure data durability
  3. Scalability: Horizontal scaling via partitions and brokers
  4. Durability: Configurable persistence with acks and min.insync.replicas
  5. Ordering: Guarantees within partition boundaries

Critical Design Decisions:

  • Partitioning Strategy: Impacts parallelism, ordering, and scaling
  • Replication Factor: Balances durability with performance and storage costs
  • Producer ACKs: Trades durability guarantees for latency and throughput
  • Consumer Group Design: Affects processing parallelism and failure handling

System Design Integration Points

When to Choose Kafka:

  • High-throughput event streaming (>100k messages/sec)
  • Event-driven microservices architecture
  • Real-time analytics and stream processing
  • Log aggregation and centralized logging
  • Change data capture from databases

Kafka vs Alternatives:

  • RabbitMQ: Better for complex routing, lower throughput requirements
  • AWS SQS: Managed service, simpler operations, vendor lock-in
  • Apache Pulsar: Multi-tenancy, geo-replication, newer ecosystem
  • Redis Streams: Lower latency, simpler deployment, less durability

Production Readiness Checklist

✓ Infrastructure:

  • Multi-broker cluster (minimum 3 brokers)
  • Separate ZooKeeper/KRaft cluster
  • SSD storage with adequate IOPS
  • Network bandwidth planning (10Gbps+)
  • Security setup (SSL, SASL, ACLs)

✓ Configuration:

  • Appropriate replication factors (3+ for critical data)
  • min.insync.replicas configured
  • Retention policies aligned with business needs
  • JVM tuning (heap size, GC collector)
  • OS-level optimizations (file descriptors, swappiness)

✓ Monitoring:

  • JMX metrics collection and dashboards
  • Consumer lag monitoring and alerting
  • Cluster health checks automation
  • End-to-end latency monitoring
  • Capacity and growth trend analysis

✓ Operations:

  • Deployment automation and rolling updates
  • Backup and disaster recovery procedures
  • Incident response runbooks
  • Performance baseline and SLAs defined
  • Team training and knowledge transfer

This knowledge chunk provides the foundation for implementing, operating, and scaling Kafka in production environments while being prepared for senior-level technical discussions and system design interviews.

Total Study Time: 45 minutes Next Steps: Practice system design scenarios, implement hands-on labs, explore advanced topics like exactly-once semantics and stream processing frameworks.

Concepts covered in this article

Table of Contents