Kafka Architecture - Core Concepts
Deep dive into Kafka's distributed architecture, topics, partitions, brokers, and core design decisions that enable high-throughput event streaming
Core Architecture Overview
What is Kafka?
Apache Kafka is a distributed event streaming platform designed for high-throughput, fault-tolerant, real-time data pipelines. It acts as a commit log service that stores and forwards messages between producers and consumers.
Key Value Proposition:
- High Throughput: Millions of messages per second
- Low Latency: End-to-end latency in the low milliseconds on a well-tuned cluster
- Durability: Messages persisted to disk with replication
- Scalability: Horizontal scaling across multiple machines
- Fault Tolerance: Continues operating despite node failures
Core Components
Kafka Cluster Architecture
Basic Data Flow
- Producers send messages to specific topics
- Topics are divided into partitions for scalability
- Brokers store partition data across the cluster
- Consumers read messages from topics/partitions
- Consumer Groups coordinate parallel consumption
Interview Key Points
“Why Kafka over traditional message queues?”
- Persistence: Messages stored on disk, not just memory
- Replay: Consumers can re-read historical messages
- Multiple Consumers: Many consumers can read same data simultaneously
- Ordering: Guarantees order within partitions
- Scaling: Add partitions and brokers independently
“When would you use Kafka?”
- Event-driven microservices communication
- Real-time analytics and stream processing
- Log aggregation and centralized logging
- Change data capture (CDC) from databases
- Activity tracking and user behavior analytics
See Also: Topics & Partitions, Brokers
Topics, Partitions & Segments
Topic Organization
A topic is a logical grouping of related messages (e.g., “user-events”, “payment-transactions”). Topics provide the primary abstraction for organizing data streams.
Partition Mechanics
Partitions enable:
- Parallelism: Multiple consumers process different partitions simultaneously
- Scalability: Add partitions to increase throughput
- Ordering: Messages within a partition maintain strict order
- Distribution: Partitions spread across different brokers
Partition Assignment:
// Producer determines partition via:
// 1. Explicit partition specification
producer.send(new ProducerRecord<>("topic", partition, key, value));
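// 2. Key-based hashing (most common)
producer.send(new ProducerRecord<>("topic", key, value));
// partition = toPositive(murmur2(keyBytes)) % num_partitions
// 3. No key: the default partitioner spreads records across partitions
// (round-robin in older clients, sticky per-batch assignment since Kafka 2.4)
producer.send(new ProducerRecord<>("topic", value));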
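The key-based rule can be tried outside the client library. The sketch below is a hypothetical stand-in, not Kafka's partitioner: it uses CRC32 from the Python standard library in place of murmur2, and `choose_partition` and `remapped_keys` are names invented for this example. It shows the two properties that matter in practice: a given key always maps to the same partition, and changing the partition count remaps keys.

```python
import zlib


def choose_partition(key: bytes, num_partitions: int) -> int:
    """Pick a partition for a key using a stable hash.

    CRC32 is a stand-in chosen for this sketch; Kafka's Java client
    hashes the serialized key with murmur2 instead.
    """
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    # Keep the hash non-negative so the modulo is a valid partition index.
    return (zlib.crc32(key) & 0x7FFFFFFF) % num_partitions


def remapped_keys(keys, old_count: int, new_count: int) -> int:
    """Count keys whose partition changes when the partition count changes."""
    return sum(
        1 for k in keys
        if choose_partition(k, old_count) != choose_partition(k, new_count)
    )


keys = [f"customer-{i}".encode() for i in range(1000)]
print("partition for customer-1:", choose_partition(b"customer-1", 12))
print("keys remapped going from 12 to 16 partitions:", remapped_keys(keys, 12, 16))
```

Because the partition depends on the partition count, adding partitions to a keyed topic breaks per-key ordering for keys that move.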
Segment Storage Structure
Each partition is divided into segments for efficient storage and retrieval:
Segment Properties:
- Size-based rollover: New segment when current reaches size limit
- Time-based rollover: New segment after time period
- Immutable: Old segments are read-only, enabling efficient caching
- Cleanup: Old segments deleted based on retention policy
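The segment properties above can be modelled in a few lines. This is a toy model under stated assumptions, not the broker's log implementation: `PartitionLog`, `Segment` and their fields are hypothetical names, time is passed in explicitly, and retention is judged by the newest record in each closed segment.

```python
from dataclasses import dataclass, field


@dataclass
class Segment:
    base_offset: int          # offset of the first record in the segment
    created_ms: int
    size_bytes: int = 0
    last_append_ms: int = 0


@dataclass
class PartitionLog:
    segment_bytes: int        # size-based rollover threshold
    segment_ms: int           # time-based rollover threshold
    retention_ms: int         # age after which closed segments are deleted
    segments: list = field(default_factory=list)
    next_offset: int = 0

    def append(self, record_size: int, now_ms: int) -> int:
        """Append one record, rolling to a new segment when a limit is hit."""
        active = self.segments[-1] if self.segments else None
        if (active is None
                or active.size_bytes + record_size > self.segment_bytes
                or now_ms - active.created_ms >= self.segment_ms):
            active = Segment(base_offset=self.next_offset, created_ms=now_ms)
            self.segments.append(active)
        active.size_bytes += record_size
        active.last_append_ms = now_ms
        offset = self.next_offset
        self.next_offset += 1
        return offset

    def enforce_retention(self, now_ms: int) -> int:
        """Delete closed segments whose newest record exceeds retention_ms."""
        closed, active = self.segments[:-1], self.segments[-1:]
        kept = [s for s in closed
                if now_ms - s.last_append_ms <= self.retention_ms]
        deleted = len(closed) - len(kept)
        self.segments = kept + active
        return deleted
```

Deleting whole segments rather than individual records is what keeps cleanup cheap: retention never rewrites a file, it only drops old ones.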
Storage Configuration Examples
Topic Creation with Partitioning Strategy:
# Create topic with strategic partition count
kafka-topics --bootstrap-server localhost:9092 --create \
--topic user-events \
--partitions 12 \
--replication-factor 3 \
--config segment.ms=86400000 \
--config retention.ms=604800000
# Partition count considerations:
# - Start with (target_throughput_MB/s ÷ partition_throughput_MB/s)
# - Consider consumer parallelism (max consumers = partitions)
# - Account for hot partitions with uneven key distribution
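The partition-count considerations above reduce to a small calculation. The helper below is a sketch with hypothetical names (`estimate_partitions`, `headroom`); the per-partition throughput figure is something you would measure on your own hardware, not a Kafka constant.

```python
import math


def estimate_partitions(target_mb_s: float,
                        per_partition_mb_s: float,
                        consumers: int = 1,
                        headroom: float = 1.0) -> int:
    """Estimate a starting partition count.

    target_mb_s:         peak throughput the topic must sustain
    per_partition_mb_s:  measured throughput of a single partition
    consumers:           consumers that should be able to work in parallel
    headroom:            multiplier (>= 1.0) for growth or hot partitions
    """
    if target_mb_s < 0 or per_partition_mb_s <= 0 or headroom < 1.0:
        raise ValueError("invalid sizing inputs")
    by_throughput = math.ceil(target_mb_s * headroom / per_partition_mb_s)
    # A group cannot use more active consumers than partitions,
    # so the consumer count is also a lower bound.
    return max(by_throughput, consumers, 1)


print(estimate_partitions(200, 25))                 # throughput-bound
print(estimate_partitions(200, 25, consumers=12))   # consumer-bound
```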
Retention Policies:
# Time-based retention (delete after 7 days)
--config retention.ms=604800000
# Size-based retention (delete oldest segments once a partition exceeds 10GB; the limit is per partition, not per topic)
--config retention.bytes=10737418240
# Log compaction (keep latest value per key)
--config cleanup.policy=compact
# Combined: compaction + time retention
--config cleanup.policy=compact,delete
--config retention.ms=604800000
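What `cleanup.policy=compact` keeps can be shown with a small model. This is a simplified sketch, not the broker's log cleaner: `compact` is a hypothetical function, records are `(offset, key, value)` tuples, and a `None` value plays the role of a tombstone. Real brokers keep tombstones for a configurable period before dropping them; the sketch drops them immediately.

```python
def compact(records):
    """Keep only the latest record per key, dropping deleted keys.

    records: iterable of (offset, key, value); value None marks a delete.
    Returns the surviving records in offset order.
    """
    latest = {}
    for offset, key, value in records:
        latest[key] = (offset, value)   # later offsets overwrite earlier ones
    survivors = [(offset, key, value)
                 for key, (offset, value) in latest.items()
                 if value is not None]
    return sorted(survivors)            # offsets are unique, so this orders by offset


log = [
    (0, "a", "1"),
    (1, "b", "2"),
    (2, "a", "3"),      # supersedes offset 0
    (3, "b", None),     # tombstone: deletes key "b"
    (4, "c", "5"),
]
print(compact(log))
```

Surviving records keep their original offsets, which is why a compacted log can have gaps in its offset sequence.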
Partition Strategy Decision Framework
Custom Partitioning:
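public class CustomPartitioner implements Partitioner {
    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        if (keyBytes == null) {
            throw new IllegalArgumentException("This partitioner requires a record key");
        }
        // Custom logic: geographic routing, load balancing, etc.
        if (key.toString().startsWith("VIP_")) {
            return 0; // VIP users are pinned to partition 0 (other keys can still hash to it)
        }
        // murmur2 may return a negative int, so make it positive before the modulo
        return Utils.toPositive(Utils.murmur2(keyBytes)) % cluster.partitionCountForTopic(topic);
    }
    @Override
    public void close() {}
}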
Sizing and Capacity Planning
Performance Implications:
- Too few partitions: Limits consumer parallelism and throughput
- Too many partitions: Increases metadata overhead, election time
- Sweet spot: Start conservative, monitor, and add partitions as needed
See Also: Producer Architecture, Consumer Groups
Brokers and Clustering
Broker Roles and Responsibilities
A broker is a Kafka server that stores data and serves client requests. In a cluster, brokers coordinate to provide distributed storage and fault tolerance.
Cluster Coordination: ZooKeeper vs KRaft
Traditional ZooKeeper Mode:
Modern KRaft Mode (KIP-500):
Leader Election and Partition Leadership
Broker Configuration Examples
Essential Broker Settings:
# server.properties
# Unique broker identifier
broker.id=1
# Network and connectivity
listeners=PLAINTEXT://kafka1.company.com:9092
advertised.listeners=PLAINTEXT://kafka1.company.com:9092
# Storage configuration
log.dirs=/var/kafka-logs-1,/var/kafka-logs-2,/var/kafka-logs-3
num.network.threads=8
num.io.threads=16
# Replication settings
default.replication.factor=3
min.insync.replicas=2
# Log management
log.retention.hours=168
log.segment.bytes=1073741824
log.cleanup.policy=delete
# ZooKeeper connection (if not using KRaft)
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181/kafka
Production Hardening:
# JVM heap sizing (typically 6-10GB max)
export KAFKA_HEAP_OPTS="-Xmx8g -Xms8g"
# OS-level optimizations
# File descriptor limits: ulimit -n 100000
# Minimize swapping: vm.swappiness=1 (discourages swap use without disabling it)
# Network optimization
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.replica.fetchers=4
Failure Scenarios and Recovery
Cluster Expansion and Maintenance
Adding New Brokers:
# 1. Start new broker with unique broker.id
# 2. Generate a candidate reassignment plan (printed to stdout; save it as reassignment-plan.json)
kafka-reassign-partitions --bootstrap-server localhost:9092 --generate \
--topics-to-move-json-file topics-to-move.json \
--broker-list "1,2,3,4" # including new broker
# 3. Execute reassignment
kafka-reassign-partitions --bootstrap-server localhost:9092 --execute \
--reassignment-json-file reassignment-plan.json
# 4. Verify completion
kafka-reassign-partitions --bootstrap-server localhost:9092 --verify \
--reassignment-json-file reassignment-plan.json
Rolling Restarts (Zero Downtime):
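# For each broker:
# 1. Stop broker gracefully; with controlled.shutdown.enable=true (the default)
# the broker hands off partition leadership before it exits
kafka-server-stop
# 2. Apply configuration changes
# 3. Start broker
kafka-server-start server.properties
# 4. Verify broker rejoined and caught up (no under-replicated partitions)
# 5. Optionally move leadership back to the preferred replicas
kafka-leader-election --bootstrap-server localhost:9092 \
--election-type preferred --all-topic-partitions
# 6. Proceed to next broker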
Interview Scenarios
“How does Kafka handle a broker failure?”
- Controller detects the failure (expired ZooKeeper session in ZooKeeper mode, missed heartbeats in KRaft mode)
- Failed broker removed from ISR for its partitions
- New leaders elected from remaining ISR replicas
- Clients automatically discover new leaders
- Failed broker rejoins when recovered
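The leader-election step in the list above can be sketched as a pure function. This is an illustrative model with hypothetical names (`elect_leader`, `allow_unclean`), not controller code; it assumes the controller walks the replica assignment in order and picks the first replica that is both alive and in the ISR.

```python
def elect_leader(replicas, isr, live_brokers, allow_unclean=False):
    """Return the new leader's broker id, or None if the partition goes offline.

    replicas:      replica assignment in order (first entry = preferred replica)
    isr:           set of in-sync replica ids
    live_brokers:  set of broker ids currently alive
    allow_unclean: mirrors unclean.leader.election.enable
    """
    for broker in replicas:
        if broker in isr and broker in live_brokers:
            return broker
    if allow_unclean:
        # Last resort: any live replica, accepting possible data loss.
        for broker in replicas:
            if broker in live_brokers:
                return broker
    return None


# Broker 1 (the leader) fails; brokers 2 and 3 are still in sync.
print(elect_leader([1, 2, 3], isr={1, 2, 3}, live_brokers={2, 3}))
```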
“What happens if the controller fails?”
- New controller election among remaining brokers
- New controller reads full cluster state
- Continues managing leader elections and metadata
- Brief pause in metadata operations during election
See Also: Replication & Consistency, Operational Considerations
Producer Architecture
Producer Request Flow
The Kafka producer is responsible for publishing messages to topics with configurable reliability, performance, and ordering guarantees.
Batching and Performance Optimization
Batch Configuration Strategy:
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092");
// Batching controls
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768); // 32KB batches
props.put(ProducerConfig.LINGER_MS_CONFIG, 10); // Wait 10ms for batching
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); // 64MB total buffer
// Compression (reduces network I/O)
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); // or snappy, gzip
// Parallel requests per connection
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
Acknowledgment Strategies and Reliability
ACK Configuration Levels:
// acks=0: Fire and forget (highest throughput, no durability)
props.put(ProducerConfig.ACKS_CONFIG, "0");
// acks=1: Leader acknowledgment (balanced)
props.put(ProducerConfig.ACKS_CONFIG, "1");
// acks=all: Full ISR acknowledgment (highest durability)
props.put(ProducerConfig.ACKS_CONFIG, "all");
// Pair with min.insync.replicas=2, which is a broker/topic setting, not a producer property
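How the three acknowledgment levels interact with the in-sync replica count can be written down as a decision function. This is a simplified model with hypothetical names (`produce_outcome` and its result strings); it ignores timeouts and retries and only captures when a write is accepted.

```python
def produce_outcome(acks: str, isr_size: int, min_insync_replicas: int,
                    leader_alive: bool = True) -> str:
    """Describe what the producer learns about a write in this toy model."""
    if acks not in ("0", "1", "all"):
        raise ValueError("acks must be '0', '1' or 'all'")
    if acks == "0":
        # Fire and forget: the producer never waits for a response.
        return "sent, no acknowledgment"
    if not leader_alive:
        return "error: leader not available"
    if acks == "1":
        # Leader wrote the record; followers may not have it yet.
        return "acknowledged by leader"
    # acks=all: min.insync.replicas is only enforced at this level.
    if isr_size < min_insync_replicas:
        return "error: not enough in-sync replicas"
    return "acknowledged by all in-sync replicas"


print(produce_outcome("all", isr_size=1, min_insync_replicas=2))
print(produce_outcome("1", isr_size=1, min_insync_replicas=2))
```

The second call succeeds even though the ISR has shrunk, which is why `min.insync.replicas` protects nothing unless producers also use `acks=all`.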
Idempotency and Exactly-Once Semantics
Idempotent Producer Configuration:
// Enables automatic deduplication
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Idempotency requires:
// acks = all
// max.in.flight.requests.per.connection <= 5
// retries > 0
// The defaults satisfy these; with idempotence explicitly enabled,
// a conflicting explicit value raises a ConfigException
// Transactional producer (for exactly-once)
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");
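The deduplication behind the idempotent producer rests on per-producer sequence numbers. The model below is a deliberately reduced sketch: `PartitionLeader` is a hypothetical class, it tracks one sequence number per record, and it leaves out producer epochs and batch-level bookkeeping.

```python
class PartitionLeader:
    """Toy partition leader that rejects duplicate or out-of-order appends."""

    def __init__(self):
        self.log = []
        self.last_seq = {}   # producer_id -> last sequence number appended

    def append(self, producer_id: int, seq: int, value: str) -> str:
        last = self.last_seq.get(producer_id, -1)
        if seq <= last:
            # A retry of something already written: acknowledge, do not append.
            return "duplicate"
        if seq != last + 1:
            # A gap means an earlier record was lost; refuse to reorder.
            return "out_of_order"
        self.log.append(value)
        self.last_seq[producer_id] = seq
        return "appended"


leader = PartitionLeader()
print(leader.append(7, 0, "a"))   # first write
print(leader.append(7, 0, "a"))   # retried after a lost acknowledgment
print(leader.log)
```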
Exactly-Once Producer Pattern:
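public class ExactlyOnceProducer {
    private KafkaProducer<String, String> producer;

    public void initTransactions() {
        producer.initTransactions();
    }

    public void sendTransactionally(List<ProducerRecord<String, String>> records) {
        try {
            producer.beginTransaction();
            for (ProducerRecord<String, String> record : records) {
                producer.send(record);
            }
            producer.commitTransaction();
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // Fatal errors: the transaction cannot be aborted, so close the producer
            producer.close();
            throw e;
        } catch (KafkaException e) {
            // Recoverable: abort so read_committed consumers never see these records
            producer.abortTransaction();
            throw e;
        }
    }
}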
Error Handling and Retry Logic
Comprehensive Error Handling:
public class RobustProducer {
private KafkaProducer<String, String> producer;
private final int maxRetries = 3;
public void sendWithRetry(ProducerRecord<String, String> record) {
producer.send(record, (metadata, exception) -> {
if (exception != null) {
handleSendError(record, exception, 0);
} else {
log.info("Message sent to partition {} at offset {}",
metadata.partition(), metadata.offset());
}
});
}
private void handleSendError(ProducerRecord<String, String> record,
Exception exception, int attempt) {
if (exception instanceof RetriableException && attempt < maxRetries) {
// The producer has already retried internally (retries, delivery.timeout.ms);
// this is an application-level retry on top of that
scheduleRetry(record, attempt + 1);
} else if (exception instanceof SerializationException) {
// Non-retriable: log and possibly send to DLQ
sendToDeadLetterQueue(record, exception);
} else {
// Authentication, authorization, record too large, etc.
handleNonRetriableError(record, exception);
}
}
}
Producer Configuration for Reliability:
// Retry configuration
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 300000); // 5 minutes total
// Request timeout (per request)
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000); // 30 seconds
// Connection and metadata
props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 50);
props.put(ProducerConfig.METADATA_MAX_AGE_MS_CONFIG, 300000); // Refresh topology
Message Ordering Guarantees
Ordering Configuration:
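// With idempotence enabled, ordering within a partition is preserved
// with up to 5 in-flight requests, even across retries:
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
// Without idempotence, limit in-flight requests to 1 so retries cannot reorder batches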
// Key-based routing ensures related messages go to same partition
ProducerRecord<String, String> record =
new ProducerRecord<>("orders", customerId, orderData);
Interview Questions and Answers
“How do you ensure exactly-once semantics?”
- Enable idempotent producer (enable.idempotence=true)
- Use transactions with transactional.id
- Coordinate with exactly-once consumers
- Handle duplicates at application level if needed
“What’s the tradeoff between throughput and durability?”
- acks=0: Highest throughput, risk of data loss
- acks=1: Balanced, risk if leader fails before replication
- acks=all: Highest durability, lower throughput
“How do you handle producer failures?”
- Configure appropriate retries and timeouts
- Implement error callbacks for handling failures
- Use circuit breakers for downstream failures
- Monitor producer metrics for health
See Also: Topics & Partitions, Replication & Consistency
Consumer Groups & Offsets
Consumer Group Coordination
Consumer groups enable parallel processing and fault tolerance by coordinating multiple consumer instances to share the work of consuming from a topic’s partitions.
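How a group shares partitions can be sketched with a simple round-robin scheme. This is an illustration only: `assign_round_robin` is a hypothetical function, and the real assignors (range, round-robin, sticky, cooperative sticky) apply more rules than this.

```python
def assign_round_robin(partitions, consumers):
    """Deal partitions out to consumers one at a time, in sorted order.

    Returns a dict mapping each consumer id to its list of partitions.
    """
    members = sorted(consumers)
    assignment = {member: [] for member in members}
    if not members:
        return assignment
    for index, partition in enumerate(sorted(partitions)):
        assignment[members[index % len(members)]].append(partition)
    return assignment


# Four partitions shared by three consumers.
print(assign_round_robin(range(4), ["c1", "c2", "c3"]))
# More consumers than partitions: the extra consumer sits idle.
print(assign_round_robin(range(2), ["c1", "c2", "c3"]))
```

The second call is the reason the partition count caps useful parallelism within one group.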
Partition Assignment Strategies
Sticky Assignor (Recommended):
Offset Management Strategies
Automatic Offset Management:
Properties props = new Properties();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000); // 5 seconds
// Where to start reading when the group has no committed offset (not a commit-timing setting):
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // or "latest"
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("user-events"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processRecord(record); // Process message
// Offsets are committed automatically inside a later poll() call, at most every 5 seconds
}
}
Manual Offset Management (Recommended for Production):
Properties props = new Properties();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // Disable auto-commit
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("user-events"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        try {
            processRecord(record); // Process message
            // Option 1: Commit after each message (lower throughput).
            // A bare commitSync() would commit the whole polled batch, so pass
            // the explicit offset of the next record to read.
            consumer.commitSync(Collections.singletonMap(
                new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1)));
        } catch (Exception e) {
            handleProcessingError(record, e);
            // Skipping the commit alone does not retry the record: a later commit
            // moves past it. Retry here, seek() back, or route it to a DLQ.
        }
    }
    // Option 2 (instead of Option 1): Commit the whole batch once (higher throughput)
    try {
        consumer.commitSync(); // Commits the offsets returned by the last poll()
    } catch (CommitFailedException e) {
        handleCommitError(e);
    }
}
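Committing after processing gives at-least-once delivery: a crash between processing and commit causes redelivery, never loss. The simulation below is a sketch with hypothetical names (`run_consumer`, `crash_at`); the log is a plain list and a "commit" is just an integer holding the next offset to read.

```python
def run_consumer(log, committed_offset, batch_size, crash_at=None):
    """Process `log` from committed_offset, committing once per batch.

    crash_at: offset after whose processing the consumer dies, before
              the batch containing it has been committed.
    Returns (offsets processed in this run, committed offset afterwards).
    """
    processed = []
    position = committed_offset
    while position < len(log):
        batch = range(position, min(position + batch_size, len(log)))
        for offset in batch:
            processed.append(offset)          # "process" the record
            if offset == crash_at:
                return processed, committed_offset
        position = batch[-1] + 1
        committed_offset = position           # commit the next offset to read
    return processed, committed_offset


log = list("abcdefg")
first_run, committed = run_consumer(log, 0, batch_size=3, crash_at=4)
second_run, committed_after = run_consumer(log, committed, batch_size=3)
print(first_run, committed)
print(second_run, committed_after)
```

Offsets 3 and 4 are processed in both runs, which is why processing needs to be idempotent under this commit strategy.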
Rebalancing Protocol and Handling
Rebalancing Listener Implementation:
public class RebalanceListener implements ConsumerRebalanceListener {
    // The listener needs the consumer it belongs to in order to commit and seek
    private final KafkaConsumer<String, String> consumer;

    public RebalanceListener(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        log.info("Partitions revoked: {}", partitions);
        // 1. Finish processing current messages
        finishCurrentWork();
        // 2. Commit current offsets
        try {
            consumer.commitSync();
        } catch (CommitFailedException e) {
            log.error("Failed to commit offsets during rebalance", e);
        }
        // 3. Clean up resources for revoked partitions
        partitions.forEach(this::cleanupPartition);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        log.info("Partitions assigned: {}", partitions);
        // 1. Initialize resources for new partitions
        partitions.forEach(this::initializePartition);
        // 2. Seek to specific offsets if needed
        for (TopicPartition partition : partitions) {
            long savedOffset = getOffsetFromDatabase(partition);
            if (savedOffset >= 0) {
                consumer.seek(partition, savedOffset);
            }
        }
    }
    // finishCurrentWork, cleanupPartition, initializePartition and
    // getOffsetFromDatabase are application-specific helpers (not shown)
}
// Usage:
consumer.subscribe(Arrays.asList("user-events"), new RebalanceListener(consumer));
Consumer Configuration Best Practices
Essential Consumer Settings:
Properties props = new Properties();
// Group and client identification
props.put(ConsumerConfig.GROUP_ID_CONFIG, "payment-processors");
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "payment-processor-" + instanceId);
// Offset management
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Session and heartbeat configuration
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000); // 30 seconds
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000); // 10 seconds
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // 5 minutes
// Fetch configuration for performance
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024); // 1KB minimum
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500); // 500ms max wait
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576); // 1MB max per partition
Consumer Performance Tuning:
// Large batch processing
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000); // Process 1000 records per poll
// Network optimization
props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 131072); // 128KB
props.put(ConsumerConfig.SEND_BUFFER_CONFIG, 131072); // 128KB
// For high-throughput scenarios
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 50000); // Wait for 50KB
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 100); // But no more than 100ms
Error Handling and Recovery Patterns
Retry with Dead Letter Queue:
public class RobustConsumer {
private final KafkaConsumer<String, String> consumer;
private final KafkaProducer<String, String> dlqProducer;
private final String dlqTopic;
private final int maxRetries = 3;
public void processRecords() {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
boolean processed = false;
int attempts = 0;
while (!processed && attempts < maxRetries) {
try {
processMessage(record);
processed = true;
} catch (RetriableException e) {
attempts++;
if (attempts >= maxRetries) {
sendToDeadLetterQueue(record, e);
processed = true; // Don't retry further
} else {
waitBeforeRetry(attempts);
}
} catch (NonRetriableException e) {
sendToDeadLetterQueue(record, e);
processed = true;
}
}
}
// Commit offsets only after all messages processed
consumer.commitSync();
}
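private void sendToDeadLetterQueue(ConsumerRecord<String, String> record, Exception e) {
// Copy the headers so the original record is not mutated
Headers headers = new RecordHeaders(record.headers());
headers.add("error.message", String.valueOf(e.getMessage()).getBytes(StandardCharsets.UTF_8));
headers.add("error.timestamp", String.valueOf(System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8));
// ProducerRecord has no (topic, key, value, headers) constructor;
// pass a null partition so the partitioner still chooses one
ProducerRecord<String, String> dlqRecord =
new ProducerRecord<>(dlqTopic, (Integer) null, record.key(), record.value(), headers);
dlqProducer.send(dlqRecord);
}
}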
Consumer Lag Monitoring
Key Metrics to Track:
# Consumer lag per partition
kafka-consumer-groups --bootstrap-server localhost:9092 \
--describe --group payment-processors
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID
orders 0 1500 1750 250 consumer-1-abc123
orders 1 2300 2310 10 consumer-2-def456
orders 2 3100 3100 0 consumer-3-ghi789
Programmatic Lag Monitoring:
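public class ConsumerLagMonitor {
    private final AdminClient adminClient;
    private final String groupId;

    public ConsumerLagMonitor(AdminClient adminClient, String groupId) {
        this.adminClient = adminClient;
        this.groupId = groupId;
    }

    public Map<TopicPartition, Long> getConsumerLag()
            throws ExecutionException, InterruptedException {
        Map<TopicPartition, Long> lagMap = new HashMap<>();
        // Get consumer group offsets
        Map<TopicPartition, OffsetAndMetadata> consumerOffsets =
            adminClient.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();
        // Get log end offsets (AdminClient exposes no consumer, so use listOffsets)
        Map<TopicPartition, OffsetSpec> latest = new HashMap<>();
        consumerOffsets.keySet().forEach(tp -> latest.put(tp, OffsetSpec.latest()));
        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> logEndOffsets =
            adminClient.listOffsets(latest).all().get();
        // Calculate lag
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : consumerOffsets.entrySet()) {
            long logEndOffset = logEndOffsets.get(entry.getKey()).offset();
            lagMap.put(entry.getKey(), logEndOffset - entry.getValue().offset());
        }
        return lagMap;
    }
}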
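The lag arithmetic itself is just log end offset minus committed offset, per partition. The sketch below reuses the numbers from the `kafka-consumer-groups` output shown earlier; `consumer_lag` is a hypothetical helper and the dictionaries stand in for what the admin API would return.

```python
def consumer_lag(committed, log_end):
    """Return per-partition lag for every partition with a committed offset."""
    return {
        partition: max(log_end[partition] - offset, 0)
        for partition, offset in committed.items()
    }


committed = {("orders", 0): 1500, ("orders", 1): 2300, ("orders", 2): 3100}
log_end = {("orders", 0): 1750, ("orders", 1): 2310, ("orders", 2): 3100}

lag = consumer_lag(committed, log_end)
print(lag)
print("total lag:", sum(lag.values()))
```

A single lag reading says little on its own; lag that keeps growing across readings is the signal that consumers are falling behind.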
Interview Scenarios
“What happens when a consumer in a group fails?”
- Consumer stops sending heartbeats
- Group coordinator triggers rebalancing after session timeout
- Failed consumer’s partitions redistributed to remaining consumers
- Remaining consumers may experience brief pause during rebalancing
“How do you handle slow consumers?”
- Increase max.poll.interval.ms for longer processing times
- Reduce max.poll.records to process smaller batches
- Scale horizontally by adding more consumer instances
- Optimize message processing logic
“Explain exactly-once consumption.”
- Disable auto-commit (enable.auto.commit=false)
- Process message and commit offset in single transaction
- Handle idempotency at application level for duplicates
- Use external storage for offset management if needed
See Also: Brokers & Clustering, Operational Considerations
Replication and Consistency
Replication Architecture
Kafka provides fault tolerance through partition replication across multiple brokers. Each partition has one leader and multiple followers, ensuring data survives broker failures.
In-Sync Replicas (ISR) Management
ISR Configuration:
# Broker configuration (in server.properties, comments must sit on their own lines)
# A follower lagging more than 30 seconds is dropped from the ISR
replica.lag.time.max.ms=30000
# Minimum ISR size for acks=all writes
min.insync.replicas=2
# Only ISR members can become leader
unclean.leader.election.enable=false
# Topic-level override
kafka-configs --bootstrap-server localhost:9092 --alter \
--entity-type topics --entity-name critical-topic \
--add-config min.insync.replicas=3
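ISR membership under `replica.lag.time.max.ms` can be modelled as a time check per follower. This is a simplified sketch with hypothetical names (`in_sync_replicas`, `last_caught_up_ms`): it assumes the leader records when each follower was last fully caught up, and it ignores how followers rejoin.

```python
def in_sync_replicas(leader, last_caught_up_ms, now_ms,
                     replica_lag_time_max_ms=30000):
    """Return the set of broker ids currently counted as in sync.

    last_caught_up_ms: follower id -> time it last caught up to the leader
    """
    isr = {leader}                      # the leader is always in its own ISR
    for broker, caught_up_at in last_caught_up_ms.items():
        if broker != leader and now_ms - caught_up_at <= replica_lag_time_max_ms:
            isr.add(broker)
    return isr


isr = in_sync_replicas(leader=1,
                       last_caught_up_ms={2: 95_000, 3: 60_000},
                       now_ms=100_000)
print(isr)
# With min.insync.replicas=2, acks=all writes are still accepted:
print(len(isr) >= 2)
```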
Consistency Guarantees and Trade-offs
Replication Configuration Examples
High Durability Configuration:
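# Topic creation for financial transactions
# Note: with min.insync.replicas equal to the replication factor, acks=all writes
# are rejected whenever any one replica is out of sync; min.insync.replicas=2 is
# the usual choice when availability also matters
kafka-topics --bootstrap-server localhost:9092 --create \
--topic financial-transactions \
--partitions 12 \
--replication-factor 3 \
--config min.insync.replicas=3 \
--config unclean.leader.election.enable=false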
# Producer configuration
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
High Performance Configuration:
# Topic for metrics/logging
kafka-topics --bootstrap-server localhost:9092 --create \
--topic application-metrics \
--partitions 24 \
--replication-factor 2 \
--config min.insync.replicas=1
# Producer configuration
props.put(ProducerConfig.ACKS_CONFIG, "1");
props.put(ProducerConfig.LINGER_MS_CONFIG, 100);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);
Failure Scenarios and Recovery
Cross-Datacenter Replication
MirrorMaker Configuration:
# Source and target clusters
clusters=primary,secondary
primary.bootstrap.servers=kafka1-east:9092,kafka2-east:9092
secondary.bootstrap.servers=kafka1-west:9092,kafka2-west:9092
# Replication flows
primary->secondary.enabled=true
primary->secondary.topics=user-events,transactions,.*-logs
# Topic naming in target cluster
replication.policy.class=org.apache.kafka.connect.mirror.DefaultReplicationPolicy
# Results in: primary.user-events, primary.transactions
# Sync settings
sync.topic.acls.enabled=true
sync.topic.configs.enabled=true
# Emit checkpoints so consumer group offsets can be translated during failover
emit.checkpoints.enabled=true
Monitoring Replication Health
Key Replication Metrics:
# ISR size monitoring
kafka-topics --bootstrap-server localhost:9092 --describe --topic critical-topic
Topic: critical-topic
Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3 ← Healthy
Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3 ← Degraded
# Under-replicated partitions
kafka-topics --bootstrap-server localhost:9092 --describe --under-replicated-partitions
# Preferred replica election (moves leadership back to the preferred replicas)
kafka-leader-election --bootstrap-server localhost:9092 \
--election-type preferred --all-topic-partitions
JMX Metrics for Replication:
// Key metrics to monitor
kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions // Should be 0
kafka.server:type=ReplicaManager,name=PartitionCount // Partitions per broker
kafka.controller:type=KafkaController,name=OfflinePartitionsCount // Should be 0
kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs // Election frequency
Interview Questions
“How does Kafka ensure no data loss?”
- Configure acks=all for producers
- Set min.insync.replicas > 1 at topic/broker level
- Disable unclean.leader.election.enable
- Monitor ISR health and handle shrinking ISRs
“What happens if all replicas fail?”
- Partition becomes unavailable for reads/writes
- If unclean.leader.election.enable=false: Wait for ISR replica to recover
- If unclean.leader.election.enable=true: Allow non-ISR replica to become leader (data loss possible)
- Monitor and alert on ISR size to prevent this scenario
“Explain Kafka’s consistency model.”
- Read-after-write consistency: Consumers can read a record once it is committed (replicated to all in-sync replicas). With acks=all an acknowledged record is already readable; with acks=1 the acknowledgment can arrive before the record becomes visible
- Monotonic read consistency: Consumers see messages in order within partitions
- No cross-partition consistency: No guarantees across different partitions
- Configurable durability: Trade-off between durability and performance via acks
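The visibility rule behind this consistency model is the high watermark: the lowest log end offset among the in-sync replicas. The sketch below is illustrative, with hypothetical names (`high_watermark`, `visible_records`), and treats the leader's log as a plain list.

```python
def high_watermark(log_end_offsets, isr):
    """Offset up to which every in-sync replica has the data.

    log_end_offsets: broker id -> next offset that broker would write
    isr:             set of in-sync broker ids (includes the leader)
    """
    return min(log_end_offsets[broker] for broker in isr)


def visible_records(leader_log, hw):
    """Consumers may only read records below the high watermark."""
    return leader_log[:hw]


leader_log = list(range(10))                 # leader holds offsets 0..9
log_end_offsets = {1: 10, 2: 8, 3: 5}        # broker 1 is the leader

hw = high_watermark(log_end_offsets, isr={1, 2})
print(hw, visible_records(leader_log, hw))
```

A slow follower that remains in the ISR holds the high watermark back, which is one reason lagging replicas are eventually removed from it.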
See Also: Brokers & Clustering, Producer Architecture
Performance Characteristics
Throughput and Latency Metrics
Scaling Patterns and Bottlenecks
Common Bottlenecks and Solutions:
| Bottleneck | Symptoms | Solutions |
|---|---|---|
| Producer Batching | Low throughput, high CPU | Increase batch.size, tune linger.ms |
| Network I/O | High latency, bandwidth limits | Enable compression, increase buffers |
| Disk I/O | Slow writes, high latency | Use SSD, tune OS page cache |
| Consumer Lag | Processing slower than ingestion | Scale consumers, optimize processing |
| Replication | High latency with acks=all | Optimize network, tune ISR settings |
| GC Pauses | Periodic latency spikes | Tune JVM heap, use G1 collector |
Configuration Tuning Guide
Producer Performance Tuning:
Properties producerProps = new Properties();
// Throughput optimization
producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536); // 64KB batches
producerProps.put(ProducerConfig.LINGER_MS_CONFIG, 20); // 20ms batching window
producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); // Fast compression
producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 134217728); // 128MB buffer
// Network optimization
producerProps.put(ProducerConfig.SEND_BUFFER_CONFIG, 131072); // 128KB
producerProps.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, 32768); // 32KB
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
// Reliability vs performance balance
producerProps.put(ProducerConfig.ACKS_CONFIG, "1"); // Leader ack only
producerProps.put(ProducerConfig.RETRIES_CONFIG, 3);
Consumer Performance Tuning:
Properties consumerProps = new Properties();
// Fetch optimization
consumerProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 50000); // 50KB minimum
consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 100); // 100ms max wait
consumerProps.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576); // 1MB max
// Processing optimization
consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2000); // Larger batches
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // Manual commits
// Network buffers
consumerProps.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 262144); // 256KB
consumerProps.put(ConsumerConfig.SEND_BUFFER_CONFIG, 131072); // 128KB
Broker Performance Tuning:
# server.properties
# Network threads (typically 8-16)
num.network.threads=16
num.io.threads=16
# Socket buffers
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# Log segment configuration
# 1GB segments
log.segment.bytes=1073741824
# 10MB indexes
log.index.size.max.bytes=10485760
# Replication performance
# Parallel fetcher threads
num.replica.fetchers=8
# 1MB per fetch
replica.fetch.max.bytes=1048576
# Compression and cleanup
# Broker-side codec; the default value "producer" keeps the producer's codec and avoids recompression
compression.type=lz4
log.cleanup.policy=delete
log.retention.check.interval.ms=300000
Hardware Recommendations
Performance Monitoring and Alerting
Key Performance Metrics:
# Throughput monitoring
kafka-consumer-perf-test --topic test-topic --bootstrap-server localhost:9092 \
--messages 1000000
# Producer performance test
kafka-producer-perf-test --topic test-topic --num-records 1000000 \
--record-size 1024 --throughput 100000 \
--producer-props bootstrap.servers=localhost:9092
# JMX metrics to monitor
kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce
kafka.server:type=ReplicaManager,name=LeaderCount
Production Monitoring Dashboard:
Key Metrics to Dashboard:
- Throughput: Messages/sec, Bytes/sec per topic
- Latency: 99th percentile produce/consume latency
- Consumer Lag: Per partition and total
- Broker Health: CPU, memory, disk usage
- Replication: ISR shrinking, under-replicated partitions
- Network: Bandwidth utilization, request rate
Interview Performance Questions
“How do you optimize Kafka for high throughput?”
- Increase batch sizes and enable compression
- Tune linger.ms for better batching
- Scale partitions and consumers horizontally
- Use appropriate hardware (SSD, high bandwidth)
- Monitor and eliminate bottlenecks systematically
“What causes latency in Kafka?”
- Producer batching (linger.ms setting)
- Network RTT between components
- Disk sync behavior and storage performance
- Replication overhead with acks=all
- JVM garbage collection pauses
“How do you handle traffic spikes?”
- Over-provision partitions for scaling headroom
- Implement producer-side backpressure
- Use consumer auto-scaling based on lag metrics
- Pre-warm page caches during off-peak hours
- Monitor leading indicators (queue depth, latency)
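Lag-based auto-scaling from the list above comes down to a capped division. The function below is a rough sketch with hypothetical names and inputs (`desired_consumers`, `drain_seconds`); it ignores the incoming message rate, so treat the result as a floor rather than a complete sizing rule.

```python
import math


def desired_consumers(total_lag: int,
                      records_per_consumer_per_s: float,
                      drain_seconds: float,
                      partitions: int) -> int:
    """Consumers needed to drain `total_lag` within `drain_seconds`.

    The result is capped at the partition count, since extra consumers
    in the same group would sit idle.
    """
    if records_per_consumer_per_s <= 0 or drain_seconds <= 0 or partitions <= 0:
        raise ValueError("rates, time window and partition count must be positive")
    needed = math.ceil(total_lag / (records_per_consumer_per_s * drain_seconds))
    return max(1, min(needed, partitions))


print(desired_consumers(600_000, 1000, 60, partitions=12))
print(desired_consumers(6_000_000, 1000, 60, partitions=12))
```

The cap in the second call is why over-provisioning partitions ahead of time matters: the scaling ceiling is fixed when the topic is created or expanded.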
See Also: Producer Architecture, Consumer Groups, Operational Considerations
Operational Considerations
Essential Monitoring Metrics
Monitoring Implementation:
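// JMX metric collection example
// Reads the platform MBean server, so it only sees broker metrics when it runs
// inside the broker JVM; use a remote JMX connection to poll from another process
public class KafkaMetricsCollector {
private final MBeanServer server = ManagementFactory.getPlatformMBeanServer();
// ObjectName construction and getAttribute() throw checked JMX exceptions
public Map<String, Object> getClusterHealth() throws Exception {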
Map<String, Object> metrics = new HashMap<>();
// Under-replicated partitions (critical)
ObjectName underReplicatedName = new ObjectName(
"kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions");
metrics.put("underReplicatedPartitions",
server.getAttribute(underReplicatedName, "Value"));
// Offline partitions (critical)
ObjectName offlinePartitionsName = new ObjectName(
"kafka.controller:type=KafkaController,name=OfflinePartitionsCount");
metrics.put("offlinePartitions",
server.getAttribute(offlinePartitionsName, "Value"));
// Producer request rate
ObjectName produceRequestName = new ObjectName(
"kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce");
metrics.put("produceRequestRate",
server.getAttribute(produceRequestName, "OneMinuteRate"));
return metrics;
}
}
Troubleshooting Common Issues
Deployment and Configuration Management
Zero-Downtime Deployment Process:
#!/bin/bash
# Rolling update script
BROKERS=("broker1" "broker2" "broker3")
NEW_VERSION="2.8.0"
for broker in "${BROKERS[@]}"; do
echo "Updating $broker..."
# 1. Gracefully shut down broker
ssh $broker "kafka-server-stop.sh"
# 2. Wait for partition leadership migration
sleep 30
# 3. Update Kafka binaries
ssh $broker "tar -xzf kafka-${NEW_VERSION}.tgz -C /opt/"
# 4. Start broker with new version
ssh $broker "kafka-server-start.sh -daemon server.properties"
# 5. Verify broker rejoined cluster
kafka-broker-api-versions.sh --bootstrap-server $broker:9092
# 6. Wait before proceeding to next broker
sleep 60
done
Configuration Management Best Practices:
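# server.properties template with environment variables
# (Kafka does not expand ${VAR:-default} itself; this assumes a templating step that does)
broker.id=${BROKER_ID}
listeners=${LISTENERS}
log.dirs=${LOG_DIRS}
# Environment-specific settings
default.replication.factor=${REPLICATION_FACTOR:-3}
min.insync.replicas=${MIN_ISR:-2}
# Security configuration (the broker-side key is security.inter.broker.protocol)
security.inter.broker.protocol=${SECURITY_PROTOCOL:-PLAINTEXT}
ssl.keystore.location=${SSL_KEYSTORE_PATH}
ssl.truststore.location=${SSL_TRUSTSTORE_PATH}
# Monitoring integration
# JMX is enabled through the JMX_PORT environment variable (e.g. 9999), not a server.properties key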
Backup and Disaster Recovery
Backup Strategies:
# 1. Topic configuration backup
kafka-topics --list --bootstrap-server localhost:9092 > topics.txt
while read topic; do
kafka-configs --describe --entity-type topics --entity-name $topic \
--bootstrap-server localhost:9092 >> topic-configs.txt
done < topics.txt
# 2. Consumer group offset backup
kafka-consumer-groups --list --bootstrap-server localhost:9092 > groups.txt
while read group; do
kafka-consumer-groups --describe --group $group \
--bootstrap-server localhost:9092 >> group-offsets.txt
done < groups.txt
# 3. Metadata backup (ZooKeeper)
zkCli.sh -server zk1:2181 <<EOF
ls /brokers
ls /config
ls /controller
EOF
Cross-Region Disaster Recovery:
DR Setup with MirrorMaker 2.0:
PRIMARY_REGION (us-east):
├── Kafka Cluster (3 brokers)
├── Application producers/consumers
└── MirrorMaker 2.0 → DR_REGION
DR_REGION (us-west):
├── Kafka Cluster (3 brokers)
├── Standby applications (read-only)
├── Mirrored topics: primary.topic-name
└── Offset sync for consumer failover
FAILOVER_PROCESS:
1. Stop applications in primary region
2. Switch DNS/load balancers to DR region
3. Start applications in DR region
4. Begin reverse replication (DR → PRIMARY)
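The `primary.topic-name` naming above follows MirrorMaker 2.0's default replication policy, which prefixes each mirrored topic with the source cluster alias. A small sketch of how a failover runbook might derive the topic list for DR consumers, assuming the default "." separator and the alias `primary` (both helper names are hypothetical):

```python
def mirrored_topic(source_alias, topic, separator="."):
    """Name of the mirrored copy of `topic` on the target cluster."""
    return f"{source_alias}{separator}{topic}"

def dr_subscription(topics, source_alias="primary"):
    """Topics a DR consumer reads after failover: the mirrored history plus
    the local topic that producers write to once they run in the DR region."""
    names = []
    for topic in topics:
        names.append(mirrored_topic(source_alias, topic))
        names.append(topic)
    return names
```

Whether consumers should read both the mirrored and the local topic depends on how producers are redirected during failover; this sketch assumes they write to the unprefixed topic in the DR cluster.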
Security Operations
Authentication and Authorization Setup:
# Enable SASL/SCRAM authentication
sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
# SSL configuration
ssl.client.auth=required
ssl.keystore.location=/etc/kafka/ssl/kafka.server.keystore.jks
ssl.truststore.location=/etc/kafka/ssl/kafka.server.truststore.jks
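# ACL authorization (ZooKeeper mode; the older SimpleAclAuthorizer was removed in Kafka 3.0,
# and KRaft clusters use org.apache.kafka.metadata.authorizer.StandardAuthorizer)
authorizer.class.name=kafka.security.authorizer.AclAuthorizer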
super.users=User:kafka;User:admin
Common Security Operations:
# Create SCRAM user
kafka-configs --bootstrap-server localhost:9092 --alter \
  --add-config 'SCRAM-SHA-256=[password=secret]' \
  --entity-type users --entity-name alice
# Grant ACLs
kafka-acls --bootstrap-server localhost:9092 --add \
  --allow-principal User:alice \
  --operation Read --operation Write --topic payments
# List current ACLs
kafka-acls --bootstrap-server localhost:9092 --list --principal User:alice
# Rotate SSL certificates
# 1. Generate new certificates
# 2. Add to truststore
# 3. Rolling restart with new keystore
# 4. Remove old certificates
Capacity Planning and Growth Management
Capacity Planning Framework:
# Capacity calculation example
import math

def calculate_kafka_capacity(requirements):
    daily_messages = requirements['daily_messages']
    message_size = requirements['avg_message_size_kb']
    retention_days = requirements['retention_days']
    replication_factor = requirements['replication_factor']
    # Storage calculation (KB -> GB)
    daily_storage_gb = (daily_messages * message_size) / (1024 * 1024)
    total_storage_gb = daily_storage_gb * retention_days * replication_factor
    # Add safety margin
    recommended_storage = total_storage_gb * 1.5
    # Partition calculation: average ingest rate, converted from GB/day to MB/s
    target_throughput_mb = daily_storage_gb * 1024 / (24 * 3600)
    partition_throughput_mb = 25  # Conservative estimate
    # Round up: partition counts must be whole numbers
    min_partitions = max(math.ceil(target_throughput_mb / partition_throughput_mb),
                         requirements.get('min_consumers', 1))
    return {
        'storage_gb': recommended_storage,
        'partitions': min_partitions,
        'brokers': max(3, math.ceil(min_partitions / 100))  # 100 partitions per broker
    }
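To make the arithmetic concrete, here is a worked example with illustrative numbers: 500 million messages per day at 2 KB each, 7-day retention, replication factor 3. All inputs are hypothetical, and the 1.5x margin and 25 MB/s per-partition figure are the same rough assumptions used above. It sizes for the daily average, so peak traffic would need additional headroom.

```python
import math

# Hypothetical workload; every number here is an illustrative assumption.
DAILY_MESSAGES = 500_000_000
AVG_MESSAGE_KB = 2
RETENTION_DAYS = 7
REPLICATION_FACTOR = 3
SAFETY_MARGIN = 1.5
PARTITION_MB_PER_SEC = 25

def size_cluster():
    # KB -> GB: divide by 1024 twice
    daily_gb = DAILY_MESSAGES * AVG_MESSAGE_KB / (1024 * 1024)
    # Every retained day is stored once per replica, plus the safety margin
    stored_gb = daily_gb * RETENTION_DAYS * REPLICATION_FACTOR * SAFETY_MARGIN
    # GB/day -> MB/s, averaged over the whole day
    avg_mb_per_sec = daily_gb * 1024 / (24 * 3600)
    partitions = max(1, math.ceil(avg_mb_per_sec / PARTITION_MB_PER_SEC))
    return {
        "daily_gb": daily_gb,
        "stored_gb": stored_gb,
        "avg_mb_per_sec": avg_mb_per_sec,
        "partitions": partitions,
    }

if __name__ == "__main__":
    for key, value in size_cluster().items():
        print(f"{key}: {value:,.2f}")
```

With these inputs, storage rather than throughput is the binding constraint: the average ingest rate is about 11 MB/s, while the retained data comes to roughly 30 TB. In practice the partition count would then be driven by consumer parallelism, not by the throughput floor.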
Growth Management:
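# Adding partitions (irreversible; also changes the key-to-partition mapping for keyed messages)
kafka-topics --bootstrap-server localhost:9092 --alter \
  --topic user-events --partitions 20
# Adding brokers and rebalancing
# 1. Start new brokers
# 2. Generate reassignment plan (printed to stdout; save the proposed
#    assignment as expand-cluster-reassignment.json)
kafka-reassign-partitions --bootstrap-server localhost:9092 --generate \
  --topics-to-move-json-file all-topics.json \
  --broker-list "1,2,3,4,5" # Include new brokers
# 3. Execute rebalancing (gradual)
kafka-reassign-partitions --bootstrap-server localhost:9092 --execute \
  --reassignment-json-file expand-cluster-reassignment.json \
  --throttle 50000000 # 50MB/s throttle
# 4. Verify completion (also removes the throttle once reassignment has finished)
kafka-reassign-partitions --bootstrap-server localhost:9092 --verify \
  --reassignment-json-file expand-cluster-reassignment.json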
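The `--generate` step reads a small JSON file naming the topics to move. A minimal sketch for producing that file, assuming the topic names come from a plain Python list (for example, the lines of `topics.txt`); the helper names are hypothetical:

```python
import json

def topics_to_move(topics):
    """Build the document expected by --topics-to-move-json-file."""
    unique = sorted(set(topics))  # de-duplicate and keep the output stable
    return {"version": 1, "topics": [{"topic": name} for name in unique]}

def write_topics_file(topics, path="all-topics.json"):
    """Write the topics-to-move document to `path`."""
    with open(path, "w", encoding="utf-8") as handle:
        json.dump(topics_to_move(topics), handle, indent=2)
```

Generating the file from the live topic list avoids hand-editing JSON when many topics need to be rebalanced at once.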
Interview Operational Questions
“How do you monitor Kafka in production?”
- JMX metrics collection (broker, producer, consumer metrics)
- Consumer lag monitoring and alerting
- Cluster health checks (ISR, offline partitions)
- Resource monitoring (CPU, memory, disk, network)
- End-to-end latency and throughput monitoring
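Consumer lag, the most commonly alerted metric in the list above, is the gap between a partition's log end offset and the group's committed offset. A minimal sketch of the calculation, assuming both offset maps have already been collected (for example, from `kafka-consumer-groups --describe` output or a client API); the function names and threshold are hypothetical:

```python
def partition_lag(end_offsets, committed_offsets):
    """Lag per partition = log end offset - committed offset.

    Partitions with no committed offset are treated as lagging from offset 0,
    which is a simplification (the log start offset may be higher).
    """
    lag = {}
    for partition, end_offset in end_offsets.items():
        committed = committed_offsets.get(partition, 0)
        lag[partition] = max(end_offset - committed, 0)
    return lag

def partitions_over_threshold(lag, threshold):
    """Partitions whose lag exceeds the alert threshold, worst first."""
    over = [(p, value) for p, value in lag.items() if value > threshold]
    return sorted(over, key=lambda item: item[1], reverse=True)
```

Alerting on sustained growth of these values over several samples, instead of on a single reading, tends to avoid noise from short bursts.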
“Describe your disaster recovery plan for Kafka.”
- Cross-region replication with MirrorMaker 2.0
- Regular metadata and configuration backups
- Automated failover procedures with DNS switching
- Consumer offset synchronization for seamless failover
- Regular DR testing and runbook maintenance
“How do you handle a failed broker?”
- Immediate: Check if partitions are still available via ISR
- Short-term: Allow automatic leader election and recovery
- Investigation: Identify root cause (hardware, network, configuration)
- Recovery: Fix underlying issue and restart broker
- Verification: Confirm broker rejoined cluster and replicated data
See Also: Performance Characteristics, Replication & Consistency
Summary and Integration
Key Takeaways
Kafka’s Core Strengths:
- High Throughput: Millions of messages per second through batching and sequential I/O
- Fault Tolerance: Replication and ISR management ensure data durability
- Scalability: Horizontal scaling via partitions and brokers
- Durability: Configurable persistence with acks and min.insync.replicas
- Ordering: Guarantees within partition boundaries
Critical Design Decisions:
- Partitioning Strategy: Impacts parallelism, ordering, and scaling
- Replication Factor: Balances durability with performance and storage costs
- Producer acks: Trade durability guarantees against latency and throughput
- Consumer Group Design: Affects processing parallelism and failure handling
System Design Integration Points
When to Choose Kafka:
- High-throughput event streaming (>100k messages/sec)
- Event-driven microservices architecture
- Real-time analytics and stream processing
- Log aggregation and centralized logging
- Change data capture from databases
Kafka vs Alternatives:
- RabbitMQ: Better for complex routing, lower throughput requirements
- AWS SQS: Managed service, simpler operations, vendor lock-in
- Apache Pulsar: Multi-tenancy, geo-replication, newer ecosystem
- Redis Streams: Lower latency, simpler deployment, less durability
Production Readiness Checklist
✓ Infrastructure:
- Multi-broker cluster (minimum 3 brokers)
- Dedicated ZooKeeper ensemble or KRaft controller quorum
- SSD storage with adequate IOPS
- Network bandwidth planning (10Gbps+)
- Security setup (SSL, SASL, ACLs)
✓ Configuration:
- Appropriate replication factors (3+ for critical data)
- min.insync.replicas configured
- Retention policies aligned with business needs
- JVM tuning (heap size, garbage collector)
- OS-level optimizations (file descriptors, swappiness)
✓ Monitoring:
- JMX metrics collection and dashboards
- Consumer lag monitoring and alerting
- Cluster health checks automation
- End-to-end latency monitoring
- Capacity and growth trend analysis
✓ Operations:
- Deployment automation and rolling updates
- Backup and disaster recovery procedures
- Incident response runbooks
- Performance baseline and SLAs defined
- Team training and knowledge transfer
This knowledge chunk provides the foundation for implementing, operating, and scaling Kafka in production environments while being prepared for senior-level technical discussions and system design interviews.
Total Study Time: 45 minutes
Next Steps: Practice system design scenarios, implement hands-on labs, explore advanced topics like exactly-once semantics and stream processing frameworks.