
Kafka Series

Consumer Groups and Rebalancing

Deep dive into consumer group coordination, partition assignment strategies, rebalancing protocols, and offset management


Consumer Group Fundamentals

Group Coordination Architecture

Consumer groups enable fault-tolerant parallel processing by coordinating multiple consumer instances to share partition consumption:

CONSUMER GROUP ARCHITECTURE:

Topic: user-events (4 partitions)
┌─────────┬─────────┬─────────┬─────────┐
│Part 0   │Part 1   │Part 2   │Part 3   │
└─────────┴─────────┴─────────┴─────────┘
     │        │        │        │
     │        │        │        │
     ▼        ▼        ▼        ▼
┌─────────┬─────────┬─────────┬─────────┐
│Consumer │Consumer │Consumer │Consumer │
│   A     │   B     │   C     │   D     │
└─────────┴─────────┴─────────┴─────────┘

Group: "analytics-processors"
Coordinator: Broker-2 (leader of the __consumer_offsets partition selected by hash(group.id))

KEY GUARANTEES:
├── Each partition assigned to exactly ONE consumer in group
├── Each consumer can handle multiple partitions
├── Automatic rebalancing on member changes
└── Fault tolerance through coordinator failover

Coordinator Selection and Responsibilities

Group Coordinator Selection:

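// Coordinator determination algorithm (same arithmetic the broker uses)
int hash = groupId.hashCode();
int positiveHash = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash); // like Kafka's Utils.abs
int coordinatorPartition = positiveHash % numOffsetsTopicPartitions;  // offsets.topic.num.partitions, default 50
// Hypothetical helper: the coordinator is the LEADER of that partition
Broker groupCoordinator = findLeaderForPartition("__consumer_offsets", coordinatorPartition);

// Example (hash value is illustrative):
// groupId="payment-processors" → hash=1823456
// __consumer_offsets has 50 partitions
// coordinator partition = 1823456 % 50 = 6
// Group coordinator = leader of __consumer_offsets partition 6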
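The lookup is easy to reproduce outside the broker. Below is a minimal sketch, assuming the default `offsets.topic.num.partitions=50`; the `abs` helper follows the semantics of Kafka's `Utils.abs` (it maps `Integer.MIN_VALUE` to 0, where `Math.abs` stays negative), and the class and method names are illustrative.

```java
public final class CoordinatorLookup {

    // Same semantics as Kafka's Utils.abs: Integer.MIN_VALUE maps to 0
    // instead of staying negative as it does with Math.abs.
    static int abs(int n) {
        return (n == Integer.MIN_VALUE) ? 0 : Math.abs(n);
    }

    // Index of the __consumer_offsets partition that owns this group;
    // the leader of that partition acts as the group coordinator.
    static int coordinatorPartition(String groupId, int offsetsTopicPartitions) {
        return abs(groupId.hashCode()) % offsetsTopicPartitions;
    }

    public static void main(String[] args) {
        int partitions = 50; // default offsets.topic.num.partitions
        for (String group : new String[] {"payment-processors", "analytics-processors"}) {
            System.out.printf("%s -> __consumer_offsets-%d%n",
                    group, coordinatorPartition(group, partitions));
        }
        // Why not plain Math.abs: this prints -48, an invalid partition index
        System.out.println(Math.abs(Integer.MIN_VALUE) % partitions);
    }
}
```

Because the mapping depends only on the group ID and the partition count of `__consumer_offsets`, changing that count after deployment would move existing groups to different coordinators, which is why the Kafka documentation says it should not change once the cluster is running.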

Coordinator Responsibilities:

GROUP COORDINATOR DUTIES:

Member Management:
├── Track group membership
├── Handle join/leave requests
├── Detect failed members via heartbeat
└── Trigger rebalancing when needed

Offset Management:
├── Store committed offsets in __consumer_offsets
├── Handle offset fetch/commit requests
├── Provide offset reset capabilities
└── Manage offset retention

Rebalancing Coordination:
├── Lead rebalancing protocol
├── Collect member metadata
├── Coordinate assignment distribution
└── Ensure consistent state across members

Consumer Lifecycle and State Management

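import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerLifecycleExample {
    private KafkaConsumer<String, String> consumer; // not final: created in startConsumer()
    private final String groupId = "order-processors";

    public void startConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);      // 30s without heartbeats → evicted
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);   // 10s, at most 1/3 of session timeout
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);   // 5min max gap between poll() calls

        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("orders", "payments"));

        // Client-side member state transitions (the join happens lazily in the first poll()):
        // 1. UNJOINED → Not yet a member of the group
        // 2. PREPARING_REBALANCE → JoinGroup sent, waiting for the other members
        // 3. COMPLETING_REBALANCE → SyncGroup sent, waiting for the assignment
        // 4. STABLE → Normal processing; heartbeats continue from a background thread
    }
}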
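The three timeouts above interact: heartbeats sent from the background thread keep the member alive within `session.timeout.ms`, while `max.poll.interval.ms` bounds the gap between `poll()` calls on the processing thread. A small validation sketch, assuming the consumer documentation's guidance that `heartbeat.interval.ms` be no higher than one third of `session.timeout.ms`, and the broker defaults `group.min.session.timeout.ms=6000` and `group.max.session.timeout.ms=1800000`. The `TimeoutCheck` class and the worst-case batch time parameter are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public final class TimeoutCheck {

    // Broker-side defaults bounding session.timeout.ms (assumed not overridden)
    static final int GROUP_MIN_SESSION_TIMEOUT_MS = 6_000;
    static final int GROUP_MAX_SESSION_TIMEOUT_MS = 1_800_000;

    static List<String> validate(int sessionTimeoutMs, int heartbeatIntervalMs,
                                 int maxPollIntervalMs, int worstCaseBatchMs) {
        List<String> problems = new ArrayList<>();
        if (sessionTimeoutMs < GROUP_MIN_SESSION_TIMEOUT_MS
                || sessionTimeoutMs > GROUP_MAX_SESSION_TIMEOUT_MS) {
            problems.add("session.timeout.ms outside the broker's allowed range");
        }
        if (heartbeatIntervalMs >= sessionTimeoutMs) {
            problems.add("heartbeat.interval.ms must be lower than session.timeout.ms");
        } else if (3L * heartbeatIntervalMs > sessionTimeoutMs) {
            problems.add("heartbeat.interval.ms above 1/3 of session.timeout.ms");
        }
        if (worstCaseBatchMs > maxPollIntervalMs) {
            // The consumer would leave the group mid-batch and trigger a rebalance
            problems.add("processing one poll() batch can exceed max.poll.interval.ms");
        }
        return problems;
    }

    public static void main(String[] args) {
        // ConsumerLifecycleExample values, assuming a batch takes at most 60s to process
        System.out.println(validate(30_000, 10_000, 300_000, 60_000)); // prints []
        System.out.println(validate(30_000, 20_000, 300_000, 60_000));
    }
}
```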

Partition Assignment Strategies

Assignor Algorithm Deep Dive

Range Assignor (Default):

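// RangeAssignor: splits each topic separately into contiguous ranges
// (Kafka 3.0+ defaults to [RangeAssignor, CooperativeStickyAssignor], so range is used)
public class RangeAssignorExample {
    void demonstrateRangeAssignment() {
        // Topic: orders (7 partitions)
        // Consumers: 3 members, sorted by member ID
        // 7 / 3 = 2 each, and the remainder of 1 goes to the first consumer

        // Result:
        // Consumer-1: [0, 1, 2]     (3 partitions)
        // Consumer-2: [3, 4]        (2 partitions)
        // Consumer-3: [5, 6]        (2 partitions)

        // Pros: Simple, predictable
        // Cons: Skew compounds across topics, because the same first consumer
        //       receives the extra partition of EVERY subscribed topic
    }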
}
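To see why range assignment skews across topics, here is a self-contained sketch of the per-topic range logic: consumers sorted by ID, each topic split independently, and the first `partitions % consumers` members receiving one extra partition. It is a simplified model written for illustration (Java 9+ for `Map.of`/`List.of`), not Kafka's `RangeAssignor` source; topic and member names are hypothetical.

```java
import java.util.*;

public final class RangeAssignmentSketch {

    // Each topic is split on its own: consumers sorted by ID, contiguous ranges,
    // and the first (partitions % consumers) consumers receive one extra partition.
    static Map<String, List<String>> assign(Map<String, Integer> partitionsPerTopic,
                                            List<String> consumers) {
        List<String> sorted = new ArrayList<>(consumers);
        Collections.sort(sorted);
        Map<String, List<String>> result = new TreeMap<>();
        for (String c : sorted) {
            result.put(c, new ArrayList<>());
        }
        for (Map.Entry<String, Integer> e : new TreeMap<>(partitionsPerTopic).entrySet()) {
            int perConsumer = e.getValue() / sorted.size();
            int extra = e.getValue() % sorted.size();
            int next = 0;
            for (int i = 0; i < sorted.size(); i++) {
                int count = perConsumer + (i < extra ? 1 : 0);
                for (int p = next; p < next + count; p++) {
                    result.get(sorted.get(i)).add(e.getKey() + "-" + p);
                }
                next += count;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> result = assign(
                Map.of("orders", 7, "payments", 7),
                List.of("consumer-1", "consumer-2", "consumer-3"));
        result.forEach((c, parts) -> System.out.println(c + " (" + parts.size() + "): " + parts));
        // consumer-1 owns 6 partitions, consumer-2 and consumer-3 own 4 each
    }
}
```

With a single topic the imbalance is one partition; with two identical topics it doubles, which is the multi-topic drawback noted above.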

Sticky Assignor (Recommended):

public class StickyAssignorBenefits {
    void demonstrateStickiness() {
        // Before rebalancing:
        // Consumer-A: [0, 1, 2]
        // Consumer-B: [3, 4, 5]
        // Consumer-C: [6, 7, 8]

        // Consumer-B fails:
        // Sticky Assignor result:
        // Consumer-A: [0, 1, 2, 4]  ← keeps original + gets 1 from failed
        // Consumer-C: [6, 7, 8, 3, 5] ← keeps original + gets remaining

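        // Benefits:
        // ├── Minimizes partition movement
        // ├── Preserves consumer-local state/caches
        // ├── Less state to rebuild after each rebalance
        // └── Note: StickyAssignor still uses the eager protocol (everything is
        //     revoked first); CooperativeStickyAssignor adds incremental rebalancing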
    }
}
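A simplified model of what stickiness means when a member leaves: survivors keep everything they own, and only the orphaned partitions are handed out, each to the currently least-loaded survivor (ties broken by member ID). Kafka's `StickyAssignor` runs a more elaborate balancing algorithm; this sketch only illustrates the property, and where each orphaned partition lands can differ from the example above.

```java
import java.util.*;

public final class StickyReassignSketch {

    static Map<String, List<Integer>> reassign(Map<String, List<Integer>> previous, String failed) {
        Map<String, List<Integer>> next = new TreeMap<>();
        previous.forEach((member, parts) -> {
            if (!member.equals(failed)) {
                next.put(member, new ArrayList<>(parts)); // survivors keep what they own
            }
        });
        for (int orphan : previous.getOrDefault(failed, List.of())) {
            // Hand each orphaned partition to the survivor with the fewest partitions
            String target = Collections.min(next.keySet(),
                    Comparator.comparingInt((String m) -> next.get(m).size()).thenComparing(m -> m));
            next.get(target).add(orphan);
        }
        return next;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> before = Map.of(
                "consumer-A", List.of(0, 1, 2),
                "consumer-B", List.of(3, 4, 5),
                "consumer-C", List.of(6, 7, 8));
        System.out.println(reassign(before, "consumer-B"));
        // prints {consumer-A=[0, 1, 2, 3, 5], consumer-C=[6, 7, 8, 4]}
    }
}
```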

Custom Assignor Implementation

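import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
import org.apache.kafka.common.TopicPartition;

// AbstractPartitionAssignor provides the (partitionsPerTopic, subscriptions) form of assign();
// ConsumerPartitionAssignor itself only declares assign(Cluster, GroupSubscription).
public class GeographicAssignor extends AbstractPartitionAssignor {

    // Region advertised by this instance; the environment variable name is illustrative
    private final String localRegion = System.getenv().getOrDefault("CONSUMER_REGION", "us-east");

    @Override
    public String name() {
        return "geographic";
    }

    // Each member ships its region to the group leader as subscription userData
    @Override
    public ByteBuffer subscriptionUserData(Set<String> topics) {
        return ByteBuffer.wrap(localRegion.getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public Map<String, List<TopicPartition>> assign(
            Map<String, Integer> partitionsPerTopic,
            Map<String, Subscription> subscriptions) {

        Map<String, List<TopicPartition>> assignments = new HashMap<>();
        Map<String, List<String>> membersByRegion = new TreeMap<>();

        // Extract geographic region from consumer metadata
        for (Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
            assignments.put(entry.getKey(), new ArrayList<>());
            String region = extractRegion(entry.getValue().userData()); // us-east, us-west, eu-west
            membersByRegion.computeIfAbsent(region, r -> new ArrayList<>()).add(entry.getKey());
        }

        // Caveat: partitions of a region with no live consumer are left unassigned
        for (Map.Entry<String, List<String>> regionEntry : membersByRegion.entrySet()) {
            List<String> members = regionEntry.getValue();
            Collections.sort(members); // deterministic order

            for (Map.Entry<String, Integer> topicEntry : partitionsPerTopic.entrySet()) {
                String topic = topicEntry.getKey();
                // Only members of this region that subscribe to the topic are eligible
                List<String> eligible = members.stream()
                        .filter(m -> subscriptions.get(m).topics().contains(topic))
                        .collect(Collectors.toList());
                if (eligible.isEmpty()) {
                    continue;
                }
                List<Integer> regionPartitions =
                        getPartitionsForRegion(topicEntry.getValue(), regionEntry.getKey());
                // Round-robin inside the region so no partition goes to two members
                for (int i = 0; i < regionPartitions.size(); i++) {
                    String member = eligible.get(i % eligible.size());
                    assignments.get(member).add(new TopicPartition(topic, regionPartitions.get(i)));
                }
            }
        }

        return assignments;
    }

    private static String extractRegion(ByteBuffer userData) {
        if (userData == null) {
            return "unknown";
        }
        byte[] bytes = new byte[userData.remaining()];
        userData.duplicate().get(bytes); // leave the original buffer position untouched
        return new String(bytes, StandardCharsets.UTF_8);
    }

    private static List<Integer> getPartitionsForRegion(int numPartitions, String region) {
        // Example with 9 partitions: us-east gets 0-2, us-west gets 3-5, eu-west gets 6-8
        switch (region) {
            case "us-east": return IntStream.range(0, numPartitions / 3).boxed().collect(Collectors.toList());
            case "us-west": return IntStream.range(numPartitions / 3, 2 * numPartitions / 3).boxed().collect(Collectors.toList());
            case "eu-west": return IntStream.range(2 * numPartitions / 3, numPartitions).boxed().collect(Collectors.toList());
            default: return Collections.emptyList(); // unknown region: no partitions
        }
    }
}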

Rebalancing Protocol Mechanics

Rebalancing Trigger Scenarios

Common Rebalancing Triggers:

public class RebalancingTriggers {

    // 1. Consumer joins group (scale up)
    void consumerJoins() {
        // New KafkaConsumer calls subscribe() and starts polling
        // Coordinator detects new member via JoinGroup request
        // Triggers rebalancing for entire group
    }

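    // 2. Consumer leaves gracefully
    void consumerLeaves() {
        consumer.close(); // Sends LeaveGroup request (static members with group.instance.id do not)
        // Rebalance starts immediately instead of after the session timeout
    }

    // 3. Consumer failure (heartbeat timeout)
    void consumerFails() {
        // Consumer stops sending heartbeats
        // Coordinator waits for session.timeout.ms (default: 45s since Kafka 3.0, 10s before)
        // Removes failed member and triggers rebalancing
        // Related: if poll() is not called within max.poll.interval.ms, the consumer
        // leaves the group itself, which also triggers a rebalance
    }

    // 4. Partition count change
    void partitionAdded() {
        // Admin adds partitions: kafka-topics.sh --bootstrap-server <broker> --alter --topic orders --partitions 12
        // The group leader notices on its next metadata refresh (metadata.max.age.ms, default 5 min)
        // Rebalancing redistributes new partitions
    }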

    // 5. Subscription change
    void subscriptionChange() {
        // Consumer calls subscribe() with different topic list
        // Forces rebalancing to handle new subscription
    }
}
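The failure path in scenario 3 can be modeled as a coordinator that records the last heartbeat time per member and evicts anyone silent for longer than `session.timeout.ms`. A toy sketch, assuming a caller-supplied clock in milliseconds; `HeartbeatTracker` and its methods are hypothetical and this is not the broker's implementation.

```java
import java.util.*;

public final class HeartbeatTracker {
    private final long sessionTimeoutMs;
    private final Map<String, Long> lastHeartbeat = new HashMap<>();

    HeartbeatTracker(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    void heartbeat(String memberId, long nowMs) {
        lastHeartbeat.put(memberId, nowMs);
    }

    // Removes expired members; a non-empty result means a rebalance is needed
    List<String> expireMembers(long nowMs) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastHeartbeat.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (nowMs - e.getValue() > sessionTimeoutMs) {
                expired.add(e.getKey());
                it.remove();
            }
        }
        Collections.sort(expired);
        return expired;
    }

    public static void main(String[] args) {
        HeartbeatTracker tracker = new HeartbeatTracker(45_000);
        tracker.heartbeat("consumer-A", 0);
        tracker.heartbeat("consumer-B", 0);
        tracker.heartbeat("consumer-A", 40_000);            // consumer-B has gone silent
        System.out.println(tracker.expireMembers(44_000)); // prints []
        System.out.println(tracker.expireMembers(46_000)); // prints [consumer-B]
    }
}
```

The gap between the last heartbeat and eviction is the window during which the failed member's partitions are not processed at all.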

Rebalancing Protocol Flow

EAGER REBALANCING PROTOCOL:

Phase 1: PREPARING_REBALANCE
┌──────────────────────────────────────────────────────────────┐
│ Coordinator signals a rebalance via heartbeat responses      │
│ ├── All consumers stop processing                            │
│ ├── Consumers commit current offsets                         │
│ ├── Consumers revoke ALL assigned partitions                 │
│ └── Consumers send JoinGroup requests                        │
└──────────────────────────────────────────────────────────────┘


Phase 2: COMPLETING_REBALANCE
┌──────────────────────────────────────────────────────────────┐
│ Coordinator picks a leader (first member to join)            │
│ ├── Leader receives member list and subscriptions            │
│ ├── Leader computes new partition assignments                │
│ ├── Leader sends assignments in a SyncGroup request          │
│ ├── Coordinator returns each member's share (SyncGroup)      │
│ └── All members start consuming assigned partitions          │
└──────────────────────────────────────────────────────────────┘

DOWNTIME: The whole group stops consuming until the rebalance completes (seconds to minutes)

Cooperative Rebalancing (Incremental)

public class CooperativeRebalancingExample {

    void configureCooperativeRebalancing() {
        Properties props = new Properties();

        // Enable cooperative (incremental) rebalancing
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
            "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");

        // Cooperative rebalancing benefits:
        // ├── Only partitions that move between members pause
        // ├── Unaffected partitions continue processing
        // ├── Significantly reduced downtime
        // └── Better for real-time processing systems
        // Cost: moving a partition takes two rebalance rounds (revoke, then assign)
        // Migrating a live group off an eager-only assignor list requires rolling
        // bounces with both assignors listed during the transition
    }

    void cooperativeFlow() {
        // Example scenario: Consumer-B fails in 3-consumer group

        // Traditional eager rebalancing:
        // ├── ALL consumers stop processing
        // ├── ALL partitions revoked and reassigned
        // └── Downtime: the whole group pauses until the rebalance completes

        // Cooperative rebalancing:
        // ├── Consumer-A continues processing partitions [0,1,2]
        // ├── Consumer-C continues processing partitions [6,7,8]
        // ├── Only partitions [3,4,5] from failed Consumer-B are reassigned
        // └── Only those partitions wait (session.timeout.ms plus the rebalance itself)
    }
}
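The difference between the two protocols comes down to which partitions a surviving member must give up. Under eager rebalancing it revokes everything it owns; under cooperative rebalancing it revokes only partitions that the new assignment moves elsewhere. A small sketch of that set arithmetic in plain Java (class and method names are hypothetical):

```java
import java.util.*;

public final class RevocationSketch {

    // Eager: every owned partition is revoked before the new assignment is known
    // (newAssignment is deliberately ignored).
    static Set<Integer> eagerRevoked(Set<Integer> owned, Set<Integer> newAssignment) {
        return new TreeSet<>(owned);
    }

    // Cooperative: only partitions the new assignment no longer includes.
    static Set<Integer> cooperativeRevoked(Set<Integer> owned, Set<Integer> newAssignment) {
        Set<Integer> revoked = new TreeSet<>(owned);
        revoked.removeAll(newAssignment);
        return revoked;
    }

    public static void main(String[] args) {
        // Consumer-A before and after Consumer-B fails (A picks up orphans 3 and 4)
        Set<Integer> owned = Set.of(0, 1, 2);
        Set<Integer> next = Set.of(0, 1, 2, 3, 4);
        System.out.println("eager revokes:       " + eagerRevoked(owned, next));       // [0, 1, 2]
        System.out.println("cooperative revokes: " + cooperativeRevoked(owned, next)); // []

        // Scale-up: a new member takes partitions 3 and 4 away from Consumer-A
        System.out.println(cooperativeRevoked(Set.of(0, 1, 2, 3, 4), Set.of(0, 1, 2))); // [3, 4]
    }
}
```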

Rebalancing Optimization Strategies

Static Membership Configuration

public class StaticMembershipOptimization {

    void configureStaticMembership() {
        Properties props = new Properties();

        // Assign unique static member ID
        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "processor-pod-1");

        // Session timeout long enough to cover one restart
        // (the broker caps it at group.max.session.timeout.ms)
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 180000); // 3 minutes

        // Static membership benefits:
        // ├── Rolling restarts don't trigger rebalancing
        // ├── Temporary network issues tolerated better
        // ├── Preserves partition assignments across restarts
        // └── Reduces rebalancing frequency significantly
        // Trade-off: a crashed member's partitions sit idle for the full session timeout
    }

    void handleRollingDeployment() {
        // Traditional membership (dynamic):
        // Pod restart → consumer leaves → rebalance → consumer rejoins → rebalance
        // Total rebalances: 2 per pod restart

        // Static membership:
        // Pod restart → close() sends no LeaveGroup → same group.instance.id rejoins
        // before session.timeout.ms expires → coordinator returns the cached assignment
        // Total rebalances: 0 for quick restarts (< session timeout)

        // Best practice: set the session timeout longer than one pod's restart time
    }
}
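On Kubernetes, a StatefulSet gives each pod a stable hostname, which makes a natural `group.instance.id`. A minimal sketch using plain `java.util.Properties` and the documented config keys; reading the pod name from the `HOSTNAME` environment variable is an assumption about the deployment, and the fallback value is hypothetical.

```java
import java.util.Properties;

public final class StaticMembershipConfig {

    static Properties build(String podName) {
        Properties props = new Properties();
        props.put("group.id", "order-processors");
        // Stable per-pod identity: unique within the group and unchanged across restarts
        props.put("group.instance.id", podName);
        // Long enough to cover one pod restart, short enough to detect real crashes
        props.put("session.timeout.ms", "180000");
        return props;
    }

    public static void main(String[] args) {
        String podName = System.getenv().getOrDefault("HOSTNAME", "processor-pod-1");
        System.out.println(build(podName));
    }
}
```

Two pods sharing one `group.instance.id` would fence each other out of the group, so the ID must come from something that is unique per replica, not per deployment.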

Graceful Shutdown Implementation

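import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

public class GracefulShutdownPattern {
    private final AtomicBoolean shutdown = new AtomicBoolean(false);
    private final CountDownLatch closed = new CountDownLatch(1);
    private final KafkaConsumer<String, String> consumer;
    // Offsets of records that were actually processed (next offset to read)
    private final Map<TopicPartition, OffsetAndMetadata> processed = new HashMap<>();

    public GracefulShutdownPattern(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    public void startGracefulConsumer() {
        // Register shutdown hook: signal the poll loop, then wait until it has closed
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("Starting graceful shutdown...");
            shutdown.set(true);
            consumer.wakeup(); // Thread-safe; aborts a blocking poll() or commitSync()
            try {
                closed.await(30, TimeUnit.SECONDS); // the JVM exits once this hook returns
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));

        try {
            while (!shutdown.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

                for (ConsumerRecord<String, String> record : records) {
                    if (shutdown.get()) {
                        break; // Stop processing on shutdown signal
                    }
                    processRecord(record);
                    processed.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1));
                }

                // Commit only processed records: a bare commitSync() commits the whole
                // polled batch, including records skipped by the break above
                if (!processed.isEmpty()) {
                    consumer.commitSync(processed);
                    processed.clear();
                }
            }
        } catch (WakeupException e) {
            // Expected during shutdown
            if (!shutdown.get()) {
                throw e;
            }
        } finally {
            try {
                // Final commit of anything processed but not yet committed
                if (!processed.isEmpty()) {
                    consumer.commitSync(processed);
                }
            } finally {
                // Clean shutdown: sends LeaveGroup (unless group.instance.id is set)
                consumer.close();
                System.out.println("Consumer closed gracefully");
                closed.countDown();
            }
        }
    }

    private void processRecord(ConsumerRecord<String, String> record) {
        // Application-specific processing
    }
}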

Offset Management Patterns

Advanced Offset Commit Strategies

Manual Offset Management with Error Handling:

import java.time.Duration;
import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

public class RobustOffsetManagement {
    private KafkaConsumer<String, String> consumer; // created in processWithManualOffsets()
    private final Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

    public void processWithManualOffsets() {
        Properties props = new Properties();
        // bootstrap.servers, group.id and deserializers omitted for brevity
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); // skip aborted transactional records

        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("orders"));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

                for (ConsumerRecord<String, String> record : records) {
                    try {
                        // Process message (application code)
                        processMessage(record);

                        // Track successful processing: commit the NEXT offset to read
                        currentOffsets.put(
                            new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1)
                        );

                    } catch (ProcessingException e) { // application-defined exception
                        // This record's offset is not tracked, but a later success in the
                        // same partition commits past it, so it must be resolved here
                        handleProcessingFailure(record, e);

                        // Option 1: Skip message and continue (log it)
                        // Option 2: Send to dead letter queue, then continue
                        // Option 3: Retry with backoff (stay under max.poll.interval.ms)
                    }
                }

                // Batch commit successful offsets
                if (!currentOffsets.isEmpty()) {
                    try {
                        consumer.commitSync(currentOffsets);
                    } catch (CommitFailedException e) {
                        // The group rebalanced and these partitions may now belong to
                        // another member; uncommitted records will be redelivered
                        handleCommitFailure(e);
                    } finally {
                        currentOffsets.clear(); // never retry stale offsets after a rebalance
                    }
                }
            }
        } finally {
            consumer.close();
        }
    }
}
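One subtlety in the loop above: offsets are tracked per partition as the latest successfully processed offset plus one, so a failed record followed by a successful one in the same partition is committed past. A self-contained sketch of that bookkeeping; the `Rec` type is hypothetical and the `record` syntax needs Java 16+.

```java
import java.util.*;

public final class OffsetTrackingSketch {

    record Rec(int partition, long offset, boolean fails) {}

    // Returns the offset that would be committed for each partition
    static Map<Integer, Long> offsetsToCommit(List<Rec> batch) {
        Map<Integer, Long> toCommit = new TreeMap<>();
        for (Rec r : batch) {
            if (r.fails()) {
                continue; // failure handled elsewhere; its offset is not recorded
            }
            toCommit.put(r.partition(), r.offset() + 1); // next offset to read
        }
        return toCommit;
    }

    public static void main(String[] args) {
        List<Rec> batch = List.of(
                new Rec(0, 41, false),
                new Rec(0, 42, true),   // processing fails
                new Rec(0, 43, false)); // later success on the same partition
        System.out.println(offsetsToCommit(batch)); // prints {0=44}
    }
}
```

Committing 44 means offset 42 is never redelivered, which is why the failure handler has to dead-letter, log, or retry the record before the loop moves on.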

External Offset Storage Pattern

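import java.time.Duration;
import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.springframework.transaction.support.TransactionTemplate;

public class ExternalOffsetStorage {
    private KafkaConsumer<String, String> consumer;        // created in processWithExternalOffsets()
    private final OffsetRepository offsetRepository;       // Database/Redis (application-defined)
    private final BusinessRepository businessRepository;   // application-defined
    private final TransactionTemplate transactionTemplate; // Spring programmatic transactions

    public ExternalOffsetStorage(OffsetRepository offsets, BusinessRepository business,
                                 TransactionTemplate tx) {
        this.offsetRepository = offsets;
        this.businessRepository = business;
        this.transactionTemplate = tx;
    }

    public void processWithExternalOffsets() {
        // Disable Kafka's offset management (no group.id is needed with assign())
        Properties props = new Properties();
        // bootstrap.servers and deserializers omitted for brevity
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        consumer = new KafkaConsumer<>(props);

        // Manual partition assignment (no group coordination, no rebalancing)
        List<TopicPartition> partitions = Arrays.asList(
            new TopicPartition("orders", 0),
            new TopicPartition("orders", 1)
        );
        consumer.assign(partitions);

        // Restore offsets from external storage
        for (TopicPartition partition : partitions) {
            Long storedOffset = offsetRepository.getOffset(partition);
            if (storedOffset != null) {
                consumer.seek(partition, storedOffset);
            } else {
                consumer.seekToBeginning(Collections.singletonList(partition));
            }
        }

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

            for (ConsumerRecord<String, String> record : records) {
                // Process in transaction with offset storage
                processInTransaction(record);
            }
        }
    }

    private void processInTransaction(ConsumerRecord<String, String> record) {
        // Programmatic transaction: @Transactional on a private, self-invoked method is
        // bypassed by Spring's proxy, so the two writes would not be atomic
        transactionTemplate.executeWithoutResult(status -> {
            // Process business logic (application-defined)
            BusinessResult result = processBusinessLogic(record.value());

            // Store result in database
            businessRepository.save(result);

            // Store the NEXT offset to read in the same transaction
            TopicPartition tp = new TopicPartition(record.topic(), record.partition());
            offsetRepository.saveOffset(tp, record.offset() + 1);
        });
        // Both writes commit or roll back together, so a crash never leaves a result
        // without its offset; on restart, processing resumes right after the last commit
    }
}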
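The stored value is `record.offset() + 1` because it is the position to resume from: seeking there after a restart neither skips nor repeats a record. A Kafka-free simulation, assuming an in-memory list and field stand in for the two repositories and that a single method call stands in for one database transaction; all names are hypothetical.

```java
import java.util.*;

public final class ResumeFromStoredOffset {
    final List<String> results = new ArrayList<>(); // stands in for businessRepository
    Long storedOffset = null;                       // stands in for offsetRepository

    // Result and next offset are written together, as one transaction would
    void processAtomically(long offset, String value) {
        results.add(value.toUpperCase());
        storedOffset = offset + 1;
    }

    // Resume from the stored position (or the beginning) and stop after maxRecords
    void run(List<String> log, int maxRecords) {
        long position = (storedOffset != null) ? storedOffset : 0L; // seek / seekToBeginning
        for (int n = 0; position < log.size() && n < maxRecords; position++, n++) {
            processAtomically(position, log.get((int) position));
        }
    }

    public static void main(String[] args) {
        List<String> log = List.of("a", "b", "c", "d", "e");
        ResumeFromStoredOffset consumer = new ResumeFromStoredOffset();
        consumer.run(log, 2);   // "crash" after two records
        consumer.run(log, 100); // restart resumes at the stored offset
        System.out.println(consumer.results);      // prints [A, B, C, D, E]
        System.out.println(consumer.storedOffset); // prints 5
    }
}
```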

Consumer Group Scaling Patterns

Auto-Scaling Based on Lag

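import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// scaleUp(), scaleDown(), getCurrentConsumerCount(), getPartitionCount() and
// ConsumerGroupLagMetrics are deployment-specific and not shown here
public class ConsumerAutoScaler {
    private final AdminClient adminClient;
    private final String groupId;

    public ConsumerAutoScaler(AdminClient adminClient, String groupId) {
        this.adminClient = adminClient;
        this.groupId = groupId;
    }

    public void monitorAndScale() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                ConsumerGroupLagMetrics lagMetrics = measureConsumerLag();

                // Every scaling action changes membership and triggers a rebalance,
                // so conservative thresholds help avoid flapping
                if (shouldScaleUp(lagMetrics)) {
                    scaleUp();
                } else if (shouldScaleDown(lagMetrics)) {
                    scaleDown();
                }

                Thread.sleep(30000); // Check every 30 seconds

            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // Exit the loop
            } catch (Exception e) {
                logger.error("Error in auto-scaling loop", e);
            }
        }
    }

    private ConsumerGroupLagMetrics measureConsumerLag()
            throws ExecutionException, InterruptedException {
        // Committed offsets of the consumer group
        Map<TopicPartition, OffsetAndMetadata> consumerOffsets =
            adminClient.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();

        // Latest (end) offsets for the same partitions
        Map<TopicPartition, OffsetSpec> request = new HashMap<>();
        for (TopicPartition tp : consumerOffsets.keySet()) {
            request.put(tp, OffsetSpec.latest());
        }
        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latestOffsets =
            adminClient.listOffsets(request).all().get();

        // Calculate lag per partition
        long totalLag = 0;
        long maxLag = 0;

        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : consumerOffsets.entrySet()) {
            if (entry.getValue() == null) {
                continue; // No committed offset for this partition
            }
            long lag = latestOffsets.get(entry.getKey()).offset() - entry.getValue().offset();

            totalLag += lag;
            maxLag = Math.max(maxLag, lag);
        }

        return new ConsumerGroupLagMetrics(totalLag, maxLag, consumerOffsets.size());
    }

    private boolean shouldScaleUp(ConsumerGroupLagMetrics metrics) {
        // Scale up conditions:
        boolean lagging = metrics.getTotalLag() > 100000 ||  // Total lag > 100k messages
                          metrics.getMaxLag() > 50000 ||     // Any partition > 50k lag
                          metrics.getAvgLag() > 10000;       // Average lag > 10k
        // Consumers beyond the partition count would only sit idle
        return lagging && getCurrentConsumerCount() < getPartitionCount();
    }

    private boolean shouldScaleDown(ConsumerGroupLagMetrics metrics) {
        // Scale down conditions (conservative):
        return metrics.getTotalLag() < 1000 &&             // Very low lag
               metrics.getMaxLag() < 500 &&                // All partitions caught up
               getCurrentConsumerCount() > getPartitionCount(); // Idle consumers present
    }
}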
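The lag arithmetic and the decision rules can be exercised without a cluster. A sketch with plain maps standing in for the committed and end offsets the Admin client returns; the thresholds are the ones used above, the string results are hypothetical, and a partition without a commit is assumed to start at offset 0 (retention can make the real log start later).

```java
import java.util.*;

public final class LagScalingSketch {

    // Lag per partition = end offset - committed offset
    static Map<String, Long> lagPerPartition(Map<String, Long> committed, Map<String, Long> endOffsets) {
        Map<String, Long> lag = new TreeMap<>();
        endOffsets.forEach((tp, end) -> lag.put(tp, end - committed.getOrDefault(tp, 0L)));
        return lag;
    }

    static String decide(Map<String, Long> lag, int consumers) {
        long total = lag.values().stream().mapToLong(Long::longValue).sum();
        long max = lag.values().stream().mapToLong(Long::longValue).max().orElse(0);
        long avg = lag.isEmpty() ? 0 : total / lag.size();
        int partitions = lag.size();
        boolean behind = total > 100_000 || max > 50_000 || avg > 10_000;
        if (behind && consumers < partitions) {
            return "SCALE_UP";
        }
        if (total < 1_000 && max < 500 && consumers > partitions) {
            return "SCALE_DOWN";
        }
        return "HOLD";
    }

    public static void main(String[] args) {
        Map<String, Long> committed = Map.of("orders-0", 1_000L, "orders-1", 2_000L);
        Map<String, Long> end = Map.of("orders-0", 61_000L, "orders-1", 2_100L);
        Map<String, Long> lag = lagPerPartition(committed, end);
        System.out.println(lag);            // prints {orders-0=60000, orders-1=100}
        System.out.println(decide(lag, 1)); // prints SCALE_UP
        System.out.println(decide(lag, 2)); // prints HOLD (each partition already has a consumer)
    }
}
```

The second decision shows the limit of lag-based scaling: when one hot partition is behind and every partition already has a consumer, adding members cannot help; only faster processing or more partitions can.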

Partition-to-Consumer Ratio Planning

public class CapacityPlanning {

    public ConsumerGroupConfig planConsumerGroup(
            long messagesPerSecond,
            int processingTimeMs,
            int targetLagSeconds) {

        // Processing capacity per consumer (assumes records are processed one at a time)
        double messagesPerConsumerPerSecond = 1000.0 / processingTimeMs;

        // Required consumers to keep up with the arrival rate
        int consumersForThroughput = (int) Math.ceil(messagesPerSecond / messagesPerConsumerPerSecond);

        // Lag target: while capacity >= arrival rate, steady-state lag stays bounded, so this
        // formula reduces to the throughput one. maxAcceptableLag is more useful as an alert
        // threshold: lag above it means the group is more than targetLagSeconds behind.
        long maxAcceptableLag = messagesPerSecond * targetLagSeconds;
        int consumersForLag = (int) Math.ceil(messagesPerSecond * processingTimeMs / 1000.0);

        // Take maximum of throughput and lag requirements
        int recommendedConsumers = Math.max(consumersForThroughput, consumersForLag);

        // Round partitions up to a multiple of 12 for scaling headroom; 12 divides evenly
        // among 1, 2, 3, 4, 6 or 12 consumers, but not every count (60 over 50 is uneven)
        int recommendedPartitions = Math.max(recommendedConsumers,
            roundUpToNearestMultiple(recommendedConsumers, 12));

        return new ConsumerGroupConfig(recommendedConsumers, recommendedPartitions);
    }

    // Smallest multiple of `multiple` that is >= value
    private static int roundUpToNearestMultiple(int value, int multiple) {
        return ((value + multiple - 1) / multiple) * multiple;
    }

    // Example calculations:
    void examplePlanning() {
        // Scenario: Order processing system
        // 1000 messages/sec, 50ms processing time, 30s lag target

        ConsumerGroupConfig config = planConsumerGroup(1000, 50, 30);

        // Results:
        // messagesPerConsumerPerSecond = 20 (1000ms / 50ms)
        // consumersForThroughput = 50 (1000 / 20)
        // consumersForLag = 50 (1000 * 50 / 1000), identical by construction
        // maxAcceptableLag = 30,000 messages (1000/s * 30s)
        // recommendedConsumers = 50 (each at 100% utilization, so no burst headroom)
        // recommendedPartitions = 60 (10 consumers own 2 partitions, 40 own 1)
    }
}
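The worked example can be checked with a runnable version of the arithmetic. Here `roundUpToNearestMultiple` is implemented as a ceiling to the next multiple, which is an assumption about the helper's intent; the sketch also reports how unevenly 60 partitions spread over 50 consumers.

```java
public final class CapacityMath {

    // Assumed definition: smallest multiple of `multiple` that is >= value
    static int roundUpToNearestMultiple(int value, int multiple) {
        return ((value + multiple - 1) / multiple) * multiple;
    }

    // A consumer processing records sequentially handles 1000 / processingTimeMs per second
    static int consumersFor(long messagesPerSecond, int processingTimeMs) {
        double perConsumer = 1000.0 / processingTimeMs;
        return (int) Math.ceil(messagesPerSecond / perConsumer);
    }

    public static void main(String[] args) {
        int consumers = consumersFor(1000, 50);                  // 50
        int partitions = roundUpToNearestMultiple(consumers, 12); // 60
        int base = partitions / consumers;                        // 1
        int extra = partitions % consumers;                       // 10
        System.out.printf("consumers=%d partitions=%d%n", consumers, partitions);
        System.out.printf("%d consumers own %d partitions, %d own %d%n",
                extra, base + 1, consumers - extra, base);
    }
}
```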

Monitoring and Observability

Consumer Group Health Metrics

public class ConsumerGroupMonitoring {

    public ConsumerGroupHealth assessGroupHealth() {
        return ConsumerGroupHealth.builder()
            .lagMetrics(measureLag())
            .rebalanceFrequency(measureRebalanceFrequency())
            .memberStability(measureMemberStability())
            .processingRate(measureProcessingRate())
            .build();
    }

    private ConsumerLagMetrics measureLag() {
        // Key lag metrics to monitor (the Java consumer also reports records-lag-max itself):
        return ConsumerLagMetrics.builder()
            .totalLag(getTotalLag())                    // Sum across all partitions
            .maxLag(getMaxLag())                        // Highest lag partition
            .avgLag(getAverageLag())                    // Average across partitions
            .lagTrend(getLagTrend())                    // Increasing/decreasing
            .lagBeyondThreshold(getLagBeyondThreshold()) // Partitions > SLA
            .build();
    }

    private RebalanceMetrics measureRebalanceFrequency() {
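        // Rebalancing health indicators (the Java consumer reports rebalance-rate-per-hour,
        // rebalance-latency-avg and failed-rebalance-rate-per-hour in consumer-coordinator-metrics):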
        return RebalanceMetrics.builder()
            .rebalancesPerHour(getRebalancesPerHour())
            .avgRebalanceDuration(getAvgRebalanceDuration())
            .rebalanceSuccessRate(getRebalanceSuccessRate())
            .timeSpentRebalancing(getTimeSpentRebalancing()) // % of time
            .build();
    }

    void setupAlerting() {
        // Critical alerts:
        alertIf("Consumer lag > SLA", () -> getMaxLag() > 50000);
        alertIf("Frequent rebalancing", () -> getRebalancesPerHour() > 4);
        alertIf("Consumer group empty", () -> getActiveConsumers() == 0);
        alertIf("Processing stopped", () -> getProcessingRate() == 0);

        // Warning alerts:
        alertIf("High rebalance duration", () -> getAvgRebalanceDuration() > 30000); // ms, i.e. > 30s
        alertIf("Member instability", () -> getMemberChurnRate() > 0.1);
        alertIf("Uneven partition distribution", () -> getPartitionDistributionSkew() > 0.3);
    }
}
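The snippet does not define `getPartitionDistributionSkew()`. One reasonable definition, assumed here rather than taken from any Kafka metric, is (max - min) / mean partitions per member: 0 for a perfectly even group, growing as one member carries more than its share. A minimal sketch:

```java
import java.util.*;

public final class DistributionSkew {

    // (max - min) / mean of partitions-per-member; 0.0 means perfectly even
    static double skew(Map<String, Integer> partitionsPerMember) {
        if (partitionsPerMember.isEmpty()) {
            return 0.0;
        }
        IntSummaryStatistics s = partitionsPerMember.values().stream()
                .mapToInt(Integer::intValue).summaryStatistics();
        return s.getAverage() == 0 ? 0.0 : (s.getMax() - s.getMin()) / s.getAverage();
    }

    public static void main(String[] args) {
        // Range assignment of two 7-partition topics over 3 consumers: 6 / 4 / 4
        System.out.println(skew(Map.of("c1", 6, "c2", 4, "c3", 4))); // ~0.43, above the 0.3 alert
        System.out.println(skew(Map.of("c1", 5, "c2", 5, "c3", 4))); // ~0.21
    }
}
```

Under this definition the multi-topic range skew from the assignor section would trip the 0.3 warning, while the best achievable split of 14 partitions over 3 members would not.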

This article combines interview-ready concepts with production-ready implementation patterns for Kafka consumer groups and rebalancing, pairing the theory of coordination and assignment with the operational knowledge needed for senior-level design discussions and implementations.

See Also: [[Producer_Mechanics]], [[Kafka_Transactions]], [[Stream_Processing_Frameworks]], [[Multi_Region_Patterns]]