
Kafka Series

Retention and Log Compaction

Deep dive into time-based and size-based retention policies, log compaction mechanics, and production configuration strategies

Concepts Covered in This Article

Retention Policies and Mechanisms

Time-Based Retention Deep Dive

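Kafka’s time-based retention operates at the segment level. A sealed segment becomes eligible for deletion once its newest record timestamp is older than the retention period; the file modification time is only a fallback for segments without record timestamps (it was the sole criterion before Kafka 0.10.1):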

RETENTION TIMELINE:

Segment Creation ──▶ Active Writing ──▶ Sealed Segment ──▶ Retention Check ──▶ Deletion
     │                    │                  │                    │              │
     t=0             t=segment.ms        t=sealed          t=retention.ms    t=cleanup
     │                    │                  │                    │              │
  New segment         Segment full      Stop writing       Eligible for      Delete
  created            or time limit      Start new         cleanup based      segment
                                        segment           on timestamps       files

Advanced Time-Based Configuration:

# Production retention configuration
log.retention.ms=604800000              # 7 days (overrides hours/minutes)
log.retention.check.interval.ms=300000  # Check every 5 minutes
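log.roll.ms=86400000                    # New segment daily (24 hours); topic-level name is segment.ms

# Retention vs segment interaction
# Retention = 7 days, Segment = 1 day
# Result: about 7 sealed segments plus the active one. A segment is deleted only
# when its newest record exceeds retention, so data can live up to ~8 days.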
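
The retention and segment interaction can be sketched as a small eligibility check. This is a simplified model rather than Kafka's implementation: `Segment`, `isExpired`, and `retainedCount` are hypothetical names, and the sketch assumes expiry is judged by the newest record timestamp of a sealed segment while the active segment is never deleted.

```java
import java.util.ArrayList;
import java.util.List;

final class TimeRetentionSketch {
    /** Hypothetical model of a segment; not a Kafka class. */
    record Segment(long baseOffset, long maxTimestampMs, boolean active) {}

    /** A sealed segment expires once its newest record is older than retentionMs. */
    static boolean isExpired(Segment s, long nowMs, long retentionMs) {
        return !s.active() && nowMs - s.maxTimestampMs() > retentionMs;
    }

    /** Counts the segments still on disk after a retention check at nowMs. */
    static long retainedCount(List<Segment> segments, long nowMs, long retentionMs) {
        return segments.stream().filter(s -> !isExpired(s, nowMs, retentionMs)).count();
    }

    public static void main(String[] args) {
        long day = 86_400_000L;
        long now = 9 * day + day / 2;                 // halfway through day 10
        List<Segment> segments = new ArrayList<>();
        for (int i = 0; i < 9; i++) {                 // nine sealed daily segments
            segments.add(new Segment(i * 1000L, (i + 1) * day, false));
        }
        segments.add(new Segment(9000L, now, true));  // the active segment
        System.out.println(retainedCount(segments, now, 7 * day)); // 7 sealed + 1 active
    }
}
```

In this run the oldest surviving segment still holds records that are 7.5 days old, which is why retention.ms should be read as a lower bound on data age at deletion time, not an exact cutoff.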

Size-Based Retention Implementation

public class SizeBasedRetentionCalculator {

    public long calculateRetentionBytes(
            long dailyIngressGB,
            int retentionDays,
            int replicationFactor,
            double compressionRatio) {

        // Base calculation
        long dailyBytes = dailyIngressGB * 1024L * 1024L * 1024L;
        long totalUncompressed = dailyBytes * retentionDays;
        long totalCompressed = (long)(totalUncompressed / compressionRatio);
        long totalWithReplication = totalCompressed * replicationFactor;

        // Add safety margin
        long retentionBytes = (long)(totalWithReplication * 1.2); // 20% buffer

        return retentionBytes;
    }

    void exampleCalculation() {
        // Example: Financial transaction logging
        long retentionBytes = calculateRetentionBytes(
            100,    // 100 GB/day ingress
            30,     // 30 days retention
            3,      // 3x replication
            2.5     // 2.5:1 compression ratio
        );

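        // Result (decimal units for readability):
        // 100GB * 30 days = 3TB raw
        // 3TB / 2.5 compression = 1.2TB compressed
        // 1.2TB * 3 replication = 3.6TB across all replicas
        // 3.6TB * 1.2 safety = 4.32TB cluster-wide disk budget
        // Note: retention.bytes is enforced per partition on each replica, so do
        // not set it to this total; divide the compressed size by the partition count.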
    }
}
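
Because retention.bytes is applied to each partition log on each replica, the value to configure is smaller than the cluster-wide total. A minimal sketch of that derivation follows; the class and method names are hypothetical, and it assumes traffic is spread evenly across partitions, which real key distributions rarely guarantee.

```java
final class PerPartitionRetentionSketch {
    /**
     * Replication is deliberately left out: every replica enforces the
     * same per-partition limit on its own copy of the log.
     */
    static long retentionBytesPerPartition(long dailyIngressBytes, int retentionDays,
                                           double compressionRatio, int partitions) {
        double compressedTotal = (double) dailyIngressBytes * retentionDays / compressionRatio;
        return (long) Math.ceil(compressedTotal / partitions);
    }

    public static void main(String[] args) {
        long gib = 1024L * 1024L * 1024L;
        // 100 GiB/day, 30 days, 2.5:1 compression, 12 partitions (assumed count)
        long perPartition = retentionBytesPerPartition(100 * gib, 30, 2.5, 12);
        System.out.println("retention.bytes=" + perPartition);
    }
}
```

With skewed keys, size the limit for the busiest partition or add headroom, otherwise hot partitions lose data earlier than the time limit suggests.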

Policy Combination Strategies

Multi-Policy Retention Configuration:

# Broker-wide defaults with both time and size limits
# (per-topic overrides use retention.ms and retention.bytes)
log.retention.ms=2592000000        # 30 days
log.retention.bytes=1073741824000  # 1000 GiB (~1 TB) per partition

# Cleanup triggers when EITHER condition met:
# - Data older than 30 days, OR
# - Partition size exceeds 1TB

# Use cases:
# - Burst traffic protection (size limit)
# - Compliance requirements (time limit)
# - Storage cost control (size limit)
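
How the two limits combine can be sketched as follows. This is a simplified model under stated assumptions, not broker code: `Segment` and `deletable` are hypothetical names, only sealed segments are candidates, deletion always removes a prefix of the log (oldest first), and the size rule removes a segment only if the partition stays at or above the size limit afterwards.

```java
import java.util.ArrayList;
import java.util.List;

final class CombinedRetentionSketch {
    /** Hypothetical model of a sealed segment; not a Kafka class. */
    record Segment(long baseOffset, long sizeBytes, long maxTimestampMs) {}

    /** Returns base offsets of the sealed segments that either rule would delete. */
    static List<Long> deletable(List<Segment> sealedOldestFirst, long activeBytes,
                                long nowMs, long retentionMs, long retentionBytes) {
        long total = activeBytes;
        for (Segment s : sealedOldestFirst) total += s.sizeBytes();
        long excess = total - retentionBytes;   // bytes above the size limit
        List<Long> out = new ArrayList<>();
        for (Segment s : sealedOldestFirst) {
            // A negative limit disables the corresponding rule.
            boolean tooOld = retentionMs >= 0 && nowMs - s.maxTimestampMs() > retentionMs;
            boolean overSize = retentionBytes >= 0 && excess >= s.sizeBytes();
            if (!tooOld && !overSize) break;    // stop at the first segment worth keeping
            excess -= s.sizeBytes();
            out.add(s.baseOffset());
        }
        return out;
    }
}
```

Note the consequence for compliance use cases: when a size limit is set, a traffic burst can remove data that is younger than the time limit.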

Production Retention Patterns:

import java.util.Properties;

public class RetentionPatternCatalog {

    // Keys below are topic-level names. The broker-wide defaults are
    // log.retention.ms, log.retention.bytes, and log.roll.ms.

    // Pattern 1: High-value transactional data
    void configureFinancialRetention() {
        Properties config = new Properties();
        config.put("retention.ms", "31536000000");      // 365 days
        config.put("retention.bytes", "-1");            // No size limit
        config.put("segment.ms", "86400000");           // Daily segments
        // High durability, but with replication factor 3 any replica outage
        // blocks acks=all producers; 2 is the usual choice for RF=3.
        config.put("min.insync.replicas", "3");
    }

    // Pattern 2: High-volume logs with cost control
    void configureLogRetention() {
        Properties config = new Properties();
        config.put("retention.ms", "604800000");        // 7 days
        config.put("retention.bytes", "107374182400");  // 100 GiB per partition
        config.put("segment.ms", "3600000");            // Hourly segments
        config.put("compression.type", "lz4");          // Reduce storage
    }

    // Pattern 3: Real-time metrics (short retention)
    void configureMetricsRetention() {
        Properties config = new Properties();
        config.put("retention.ms", "86400000");         // 1 day
        config.put("retention.bytes", "10737418240");   // 10 GiB per partition
        config.put("segment.ms", "300000");             // 5-minute segments
    }
}

Log Compaction Deep Dive

Compaction Algorithm and Process Flow

Log compaction maintains the latest value for each key while removing outdated records:

COMPACTION PROCESS:

Original Log:
[key1:v1][key2:v1][key1:v2][key3:v1][key2:v2][key1:v3]
    │        │        │        │        │        │
    t1       t2       t3       t4       t5       t6

After Compaction:
[key3:v1][key2:v2][key1:v3]
    │        │        │
    t4       t5       t6

Only the latest value per key is retained; surviving records keep their original offsets and relative order
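
The effect of compaction on this log can be reproduced with a two-pass sketch. It is an in-memory illustration only: `Rec` and `compact` are hypothetical names, the input is assumed to be in ascending offset order, and a plain `HashMap` stands in for the cleaner's offset map.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class CompactionSketch {
    /** Hypothetical record model; not a Kafka class. */
    record Rec(long offset, String key, String value) {}

    /** Keeps the highest-offset record per key; survivors keep offset and order. */
    static List<Rec> compact(List<Rec> log) {
        // Pass 1: map each key to the offset of its latest record.
        Map<String, Long> latest = new HashMap<>();
        for (Rec r : log) {
            latest.put(r.key(), r.offset());
        }
        // Pass 2: copy only the records that are the latest for their key.
        List<Rec> out = new ArrayList<>();
        for (Rec r : log) {
            if (latest.get(r.key()) == r.offset()) {
                out.add(r);
            }
        }
        return out;
    }
}
```

Feeding it the six records from the diagram yields key3:v1, key2:v2, key1:v3 at offsets 3, 4, and 5: nothing is reordered and the removed offsets simply become gaps.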

Log Cleaner Architecture:

LOG CLEANER INTERNAL FLOW:

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   Dirty Logs    │───▶│  Log Cleaner    │───▶│   Clean Logs    │
│   (uncomp)      │    │                 │    │   (compacted)   │
│                 │    │ ┌─────────────┐ │    │                 │
│ - Head segments │    │ │ Key Index   │ │    │ - Tail segments │
│   (not active)  │    │ │ (OffsetMap) │ │    │ - Latest values │
└─────────────────┘    │ └─────────────┘ │    └─────────────────┘
                       │                 │
                       │ ┌─────────────┐ │
                       │ │Thread Pool  │ │
                       │ │(configurable│ │
                       │ │ parallelism)│ │
                       │ └─────────────┘ │
                       └─────────────────┘

Advanced Compaction Configuration

Production Compaction Tuning:

# Log cleaner configuration
log.cleanup.policy=compact
log.cleaner.threads=8                           # Parallel cleaner threads
log.cleaner.io.max.bytes.per.second=1048576000  # 1GB/s I/O throttling
log.cleaner.dedupe.buffer.size=134217728        # 128MB dedup buffer

# Compaction triggers
log.cleaner.min.compaction.lag.ms=60000         # Wait 1 min before compact
log.cleaner.max.compaction.lag.ms=86400000      # Force compact within 24h
log.roll.ms=86400000                            # 24h segments (topic-level: segment.ms)
log.cleaner.min.cleanable.ratio=0.5             # Compact when 50% dirty

# Advanced settings
log.cleaner.io.buffer.size=524288               # 512KB I/O buffer
log.cleaner.io.buffer.load.factor=0.9           # Max fill fraction of the dedupe buffer
log.cleaner.backoff.ms=15000                    # Backoff between runs
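
The dedupe buffer and thread count interact: the buffer is shared across all cleaner threads, so adding threads shrinks the number of distinct keys each cleaning pass can index. The estimate below is a sketch under one stated assumption, that each offset-map entry costs 24 bytes (a 16-byte key hash plus an 8-byte offset); the class and method names are hypothetical.

```java
final class DedupeBufferSketch {
    // Assumption: 16-byte key hash + 8-byte offset per offset-map entry.
    static final int BYTES_PER_ENTRY = 24;

    /** Approximate number of distinct keys one cleaner thread can index per pass. */
    static long keysPerCleanerPass(long dedupeBufferBytes, int cleanerThreads, double loadFactor) {
        long perThreadBytes = dedupeBufferBytes / cleanerThreads; // buffer is split across threads
        long slots = perThreadBytes / BYTES_PER_ENTRY;
        return (long) (slots * loadFactor);                       // usable slots before the map is "full"
    }

    public static void main(String[] args) {
        // Values from the configuration above: 128MB buffer, 8 threads, 0.9 load factor
        System.out.println(keysPerCleanerPass(134_217_728L, 8, 0.9));
    }
}
```

Under these assumptions the configuration above indexes roughly 630 thousand keys per thread and pass, so topics with many more unique keys per dirty section would need a larger buffer or fewer threads.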

Compaction Performance Analysis:

public class CompactionPerformanceMonitor {

    public CompactionMetrics analyzeCompactionEffectiveness() {
        return CompactionMetrics.builder()
            .dirtyRatio(calculateDirtyRatio())
            .compactionRate(getCompactionRate())
            .keySpaceReduction(getKeySpaceReduction())
            .storageReclaimed(getStorageReclaimed())
            .ioImpact(getIOImpact())
            .build();
    }

    private double calculateDirtyRatio() {
        // dirty ratio = dirty_bytes / (clean_bytes + dirty_bytes)
        // The cleaner selects a log once this exceeds min.cleanable.ratio.
        // 0.3 - 0.7 is a rule-of-thumb band, not a Kafka-defined optimum.

        long dirtyBytes = getDirtyLogSize();
        long cleanBytes = getCleanLogSize();
        long totalBytes = dirtyBytes + cleanBytes;
        return totalBytes == 0 ? 0.0 : (double) dirtyBytes / totalBytes;
    }

    void optimizeCompactionScheduling() {
        double dirtyRatio = calculateDirtyRatio();

        // min.compaction.lag.ms only guarantees how long a record stays uncompacted;
        // min.cleanable.ratio is the setting that controls how eagerly the cleaner runs.
        if (dirtyRatio > 0.8) {
            // High dirty ratio - the cleaner is falling behind
            updateConfig("log.cleaner.min.cleanable.ratio", "0.3");      // Clean sooner
            updateConfig("log.cleaner.threads", "12");                   // More threads

        } else if (dirtyRatio < 0.2) {
            // Low dirty ratio - cleaning this often wastes I/O
            updateConfig("log.cleaner.min.cleanable.ratio", "0.7");      // Clean later
            updateConfig("log.cleaner.threads", "4");                    // Fewer threads
        }
    }
}

Key-Based Compaction Strategies

Effective Key Design Patterns:

public class CompactionKeyStrategies {

    // Pattern 1: Entity state management
    void entityStateKeys() {
        // Key: user_id
        // Value: Complete user state (JSON/Avro)
        // Result: Latest state per user maintained

        String key = "user_123";
        UserState value = UserState.builder()
            .userId("user_123")
            .email("user@example.com")
            .lastLogin(Instant.now())
            .build();
    }

    // Pattern 2: Configuration management
    void configurationKeys() {
        // Key: config_category:setting_name
        // Value: Setting value
        // Result: Latest config per setting

        String key = "database:connection_timeout";
        String value = "30000";
    }

    // Pattern 3: State machine events (anti-pattern)
    void stateMachineAntiPattern() {
        // WRONG: State transition events with timestamp keys
        // Key: user_123:timestamp  ← Each event has unique key
        // Result: No compaction benefit, all events retained

        // CORRECT: Current state with entity key
        // Key: user_123  ← Same key for all state updates
        // Result: Only latest state retained
    }
}

Segment Management and Cleanup

Segment Lifecycle Management

Segment Creation and Rollover:

SEGMENT LIFECYCLE:

Active Segment                 Sealed Segment               Eligible for Cleanup
     │                              │                              │
     ▼                              ▼                              ▼
┌──────────┐  Rollover      ┌──────────────┐  Time/Size     ┌──────────────┐
│.log      │  Triggers:     │.log (sealed) │  Triggers:     │.log (old)    │
│.index    │  - Size limit  │.index        │  - retention   │.index        │
│.timeindex│  - Time limit  │.timeindex    │  - compaction  │.timeindex    │
└──────────┘  - Force roll  └──────────────┘                └──────────────┘

Production Segment Configuration:

# Segment size management
log.segment.bytes=1073741824        # 1GB segments (balance between size & granularity)
log.index.size.max.bytes=10485760   # 10MB index files
log.index.interval.bytes=4096       # Index entry every 4KB

# Segment time management
log.roll.ms=86400000                # 24h segments (daily rollover); topic-level name is segment.ms
log.roll.jitter.ms=3600000          # 1h jitter (spread rollover load)

# Segment cleanup optimization
log.retention.check.interval.ms=300000  # Check every 5 minutes
log.segment.delete.delay.ms=60000        # 1 minute delay before deletion
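
How the size limit, time limit, and jitter combine into a rollover decision can be sketched as below. This is a simplified model with hypothetical names: it ignores the index-full trigger, and `jitterMs` stands for the random per-segment value drawn from the range allowed by log.roll.jitter.ms.

```java
final class SegmentRollSketch {
    /**
     * Roll when the incoming batch would overflow the size limit, or when a
     * non-empty segment has outlived the time limit minus its jitter.
     */
    static boolean shouldRoll(long segmentBytes, long incomingBytes, long segmentAgeMs,
                              long maxSegmentBytes, long maxSegmentMs, long jitterMs) {
        boolean sizeLimit = segmentBytes + incomingBytes > maxSegmentBytes;
        boolean timeLimit = segmentBytes > 0 && segmentAgeMs > maxSegmentMs - jitterMs;
        return sizeLimit || timeLimit;
    }
}
```

The jitter is subtracted from the time limit, so partitions created at the same moment roll at slightly different times instead of all at once. An empty segment never rolls on time alone, which is why idle partitions do not accumulate empty files.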

File System Layout and Optimization

Kafka Directory Structure:

# Optimal file system layout
/var/kafka-logs/
└── topic-partition-0/
    ├── 00000000000000000000.log        # Segment 0 (oldest)
    ├── 00000000000000000000.index      # Offset index
    ├── 00000000000000000000.timeindex  # Time index
    ├── 00000000000000100000.log        # Segment 1
    ├── 00000000000000100000.index
    ├── 00000000000000100000.timeindex
    ├── 00000000000000200000.log        # Segment 2 (active)
    ├── 00000000000000200000.index
    ├── 00000000000000200000.timeindex
    └── leader-epoch-checkpoint         # Leadership changes
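
Each file name in the listing is the segment's base offset, zero-padded to 20 digits, which lets a reader find the segment holding a given offset with a floor lookup. A small sketch of that naming scheme (the class and method names are hypothetical):

```java
import java.util.Locale;
import java.util.NavigableSet;

final class SegmentFileNameSketch {
    /** Formats a base offset the way the listing above names its .log files. */
    static String logFileName(long baseOffset) {
        return String.format(Locale.ROOT, "%020d.log", baseOffset);
    }

    /** Recovers the base offset from a segment file name of any extension. */
    static long baseOffsetOf(String fileName) {
        return Long.parseLong(fileName.substring(0, fileName.indexOf('.')));
    }

    /** Base offset of the segment containing the offset, or null if it precedes the log. */
    static Long segmentContaining(NavigableSet<Long> baseOffsets, long offset) {
        return baseOffsets.floor(offset);
    }
}
```

For the directory above, offset 150000 resolves to the segment with base offset 100000, since that is the largest base offset not exceeding it.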

File System Optimization:

# Production file system tuning
# Mount options for Kafka log directories
mount -o noatime,nodiratime /dev/sdb1 /var/kafka-logs

# File system selection
# XFS: Recommended for large files, better performance
# EXT4: Acceptable alternative, wider compatibility

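# I/O scheduler optimization
# Kernels using blk-mq (the default since Linux 5.0) call this scheduler mq-deadline;
# list the available options with: cat /sys/block/sdb/queue/scheduler
echo mq-deadline > /sys/block/sdb/queue/scheduler  # Better for sequential I/O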

# File descriptor limits
echo "kafka soft nofile 100000" >> /etc/security/limits.conf
echo "kafka hard nofile 100000" >> /etc/security/limits.conf

Cleanup Process Optimization

public class SegmentCleanupOptimizer {

    public void optimizeCleanupScheduling() {
        // Cleanup process phases:
        // 1. Identify segments eligible for deletion
        // 2. Mark segments for deletion (rename to .deleted)
        // 3. Asynchronous deletion after delay

        Properties config = new Properties();

        // Balance cleanup frequency vs I/O impact
        config.put("log.retention.check.interval.ms", "300000");    // 5 minutes
        config.put("log.segment.delete.delay.ms", "60000");         // 1 minute delay

        // Spread cleanup load across time
        config.put("log.roll.jitter.ms", "3600000");                // 1 hour jitter
    }

    public void monitorCleanupHealth() {
        // Key metrics to track:
        long pendingDeletes = countPendingDeletes();
        double deletionRate = calculateDeletionRate();
        long diskSpaceReclaimed = getDiskSpaceReclaimed();

        // Alert conditions:
        if (pendingDeletes > 1000) {
            alert("High number of pending segment deletes");
        }

        if (deletionRate < expectedDeletionRate * 0.5) {
            alert("Segment deletion falling behind retention schedule");
        }
    }
}

Capacity Planning and Cost Optimization

Storage Capacity Modeling

public class StorageCapacityPlanner {

    public StorageRequirements calculateStorageNeeds(
            TopicProfile profile,
            RetentionPolicy retention,
            ClusterConfig cluster) {

        // Base storage calculation
        long dailyBytes = profile.messagesPerDay * profile.avgMessageSizeBytes;
        long retentionBytes = dailyBytes * retention.retentionDays;

        // Apply compression
        long compressedBytes = (long)(retentionBytes / profile.compressionRatio);

        // Apply replication
        long replicatedBytes = compressedBytes * cluster.replicationFactor;

        // Add overhead (indexes, metadata)
        double overheadRatio = 0.15; // 15% overhead typical
        long totalBytes = (long)(replicatedBytes * (1 + overheadRatio));

        // Add growth and safety margins
        long safetyMargin = (long)(totalBytes * 0.25); // 25% safety
        long growthProjection = calculateGrowthProjection(totalBytes, 12); // 12 months

        return StorageRequirements.builder()
            .baseRequirement(totalBytes)
            .withSafetyMargin(totalBytes + safetyMargin)
            .withGrowthProjection(totalBytes + safetyMargin + growthProjection)
            .build();
    }

    public void optimizeStorageCosts() {
        // Cost optimization strategies:

        // 1. Right-size retention policies
        optimizeRetentionPolicies();

        // 2. Implement tiered storage
        configureTieredStorage();

        // 3. Optimize compression
        tuneCompressionSettings();

        // 4. Monitor and alert on storage growth
        setupStorageAlerting();
    }

    private void optimizeRetentionPolicies() {
        // Analyze actual data access patterns
        Map<String, AccessPattern> accessPatterns = analyzeAccessPatterns();

        for (String topic : accessPatterns.keySet()) {
            AccessPattern pattern = accessPatterns.get(topic);

            if (pattern.getLastAccessDays() < 7 && pattern.getCurrentRetentionDays() > 30) {
                // Consumers only read data from the last 7 days, yet the topic
                // keeps more than 30: recommend shrinking retention to match
                recommendRetentionChange(topic, 7);
            }
        }
    }
}
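
A worked version of the capacity arithmetic, with plain parameters in place of the profile objects, may help when checking numbers by hand. Everything here is a sketch: the names are hypothetical, and `growthProjection` is only one possible reading of `calculateGrowthProjection`, assuming compound monthly growth at a rate the caller supplies.

```java
final class CapacityWorkedExample {
    /** Raw volume, then compression, replication, and index/metadata overhead. */
    static long baseRequirement(long messagesPerDay, long avgMessageBytes, int retentionDays,
                                double compressionRatio, int replicationFactor,
                                double overheadRatio) {
        long raw = messagesPerDay * avgMessageBytes * retentionDays;
        long compressed = (long) (raw / compressionRatio);
        long replicated = compressed * replicationFactor;
        return (long) (replicated * (1 + overheadRatio));
    }

    /** Extra bytes needed after the given number of months of compound growth (assumed model). */
    static long growthProjection(long baseBytes, int months, double monthlyGrowthRate) {
        return Math.round(baseBytes * (Math.pow(1 + monthlyGrowthRate, months) - 1));
    }

    public static void main(String[] args) {
        // 1 billion 1KB messages per day, 7 days, 2:1 compression, RF 3, 15% overhead
        long base = baseRequirement(1_000_000_000L, 1_000L, 7, 2.0, 3, 0.15);
        long growth = growthProjection(base, 12, 0.05); // 5% per month is an assumption
        System.out.println("base=" + base + " growth=" + growth);
    }
}
```

Compound growth dominates quickly: at 5% per month the twelve-month increment is about 80% of the base requirement, which is larger than the 25% safety margin.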

Cost Model and Analysis

Storage Cost Breakdown:

public class StorageCostAnalyzer {

    public CostBreakdown analyzeMonthlyCosts(
            long totalStorageGB,
            StorageTier tier,
            Region region) {

        // Base storage costs in USD per GB-month (illustrative AWS us-east-1 list prices; verify current pricing)
        double gpSSDCostPerGB = 0.08;    // gp3 SSD
        double ioPSSDCostPerGB = 0.125;  // io2 SSD
        double s3CostPerGB = 0.023;      // S3 Standard

        double monthlyCost = switch(tier) {
            case HOT_SSD -> totalStorageGB * gpSSDCostPerGB;
            case PERFORMANCE_SSD -> totalStorageGB * ioPSSDCostPerGB;
            case COLD_S3 -> totalStorageGB * s3CostPerGB;
        };

        // Add I/O costs
        double ioCosts = calculateIOCosts(totalStorageGB, tier);

        // Add network costs for replication
        double networkCosts = calculateNetworkCosts(totalStorageGB);

        return CostBreakdown.builder()
            .storageCosts(monthlyCost)
            .ioCosts(ioCosts)
            .networkCosts(networkCosts)
            .totalCosts(monthlyCost + ioCosts + networkCosts)
            .build();
    }

    public void generateCostOptimizationReport() {
        // Identify cost optimization opportunities:

        // 1. Topics with high storage cost but low access
        List<String> overRetainedTopics = findOverRetainedTopics();

        // 2. Topics suitable for compression optimization
        List<String> compressionCandidates = findCompressionCandidates();

        // 3. Topics suitable for tiered storage
        List<String> tieringCandidates = findTieringCandidates();

        // 4. Unused or low-value topics
        List<String> unusedTopics = findUnusedTopics();

        generateReport(overRetainedTopics, compressionCandidates,
                      tieringCandidates, unusedTopics);
    }
}

Monitoring and Troubleshooting

Essential Retention Metrics

public class RetentionMonitoring {

    public void setupRetentionAlerting() {
        // Critical storage alerts
        // Size is reported per topic-partition; sum the values per broker before comparing
        alertOnMetric("kafka.log:type=Log,name=Size",
                     totalBytes -> totalBytes > getMaxDiskCapacity() * 0.85,
                     "Disk usage critical - approaching capacity limit");

        alertOnMetric("kafka.log:type=LogManager,name=OfflineLogDirectoryCount",
                     count -> count > 0,
                     "Log directory offline - data loss risk");

        // Retention process health
        alertOnMetric("kafka.log:type=LogCleanerManager,name=uncleanable-partitions-count",
                     count -> count > 0,
                     "Partitions unable to be cleaned");

        alertOnMetric("kafka.log:type=LogCleanerManager,name=max-dirty-percent",
                     percent -> percent > 80,   // gauge is reported on a 0-100 scale
                     "Log cleaner falling behind - high dirty ratio");

        // Segment management
        alertOnMetric("kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs",
                     rate -> rate < expectedFlushRate * 0.5,
                     "Log flush rate degraded");
    }

    public RetentionHealthReport generateHealthReport() {
        return RetentionHealthReport.builder()
            .storageUtilization(calculateStorageUtilization())
            .retentionCompliance(checkRetentionCompliance())
            .compactionEffectiveness(measureCompactionEffectiveness())
            .segmentHealth(assessSegmentHealth())
            .cleanupPerformance(measureCleanupPerformance())
            .build();
    }
}
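
The per-partition size gauge carries topic and partition tags, so summing disk usage per broker needs a name pattern instead of a single MBean name. The sketch below only builds and matches the names with the JDK's `javax.management.ObjectName`; connecting to a broker's JMX port is left out, and the class and method names are hypothetical.

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

final class MetricNameSketch {
    /** MBean name of the Size gauge for one topic-partition. */
    static ObjectName partitionSize(String topic, int partition) {
        return parse("kafka.log:type=Log,name=Size,topic=" + topic + ",partition=" + partition);
    }

    /** Pattern matching the Size gauge of every topic-partition. */
    static ObjectName allPartitionSizes() {
        return parse("kafka.log:type=Log,name=Size,*");
    }

    private static ObjectName parse(String name) {
        try {
            return new ObjectName(name);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("Invalid MBean name: " + name, e);
        }
    }
}
```

A pattern like this can be passed to `MBeanServerConnection.queryNames` to enumerate the gauges, whose values are then summed before being compared with disk capacity.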

Troubleshooting Playbook

Common Issues and Solutions:

public class RetentionTroubleshooting {

    public void troubleshootHighDiskUsage() {
        System.out.println("DISK USAGE TROUBLESHOOTING:");
        System.out.println("1. Check retention policies:");
        System.out.println("   kafka-topics --bootstrap-server <host:port> --describe --topic <topic>");
        System.out.println("2. Verify cleanup is running:");
        System.out.println("   Check JMX: kafka.log:type=LogCleaner,name=cleaner-recopy-percent");
        System.out.println("3. Look for stuck segments:");
        System.out.println("   ls -la /kafka-logs/*/");
        System.out.println("4. Check for errors:");
        System.out.println("   grep ERROR <broker-app-log-dir>/log-cleaner.log  (application logs, not the data directory)");
    }

    public void troubleshootCompactionIssues() {
        System.out.println("COMPACTION TROUBLESHOOTING:");
        System.out.println("1. Check compaction lag:");
        System.out.println("   JMX: kafka.log:type=LogCleaner,name=max-compaction-delay-secs");
        System.out.println("2. Verify key distribution:");
        System.out.println("   kafka-dump-log --print-data-log --files /kafka-logs/topic-0/00000.log");
        System.out.println("3. Check cleaner thread status:");
        System.out.println("   JMX: kafka.log:type=LogCleaner,name=DeadThreadCount (should be 0)");
        System.out.println("4. Review memory allocation:");
        System.out.println("   Check log.cleaner.dedupe.buffer.size setting");
    }

    public void troubleshootSlowCleanup() {
        // Step 1: Check I/O throttling
        double ioThrottle = getIOThrottleLimit();
        if (ioThrottle < 100_000_000) { // 100MB/s
            System.out.println("Consider increasing log.cleaner.io.max.bytes.per.second");
        }

        // Step 2: Check thread allocation
        int cleanerThreads = getCleanerThreadCount();
        int availableCores = Runtime.getRuntime().availableProcessors();
        if (cleanerThreads < availableCores / 4) {
            System.out.println("Consider increasing log.cleaner.threads");
        }

        // Step 3: Check segment sizing
        analyzeSegmentSizing();
    }
}

In summary: time-based and size-based retention bound how long and how much data each partition keeps, log compaction preserves the latest value per key, and segment sizing determines how precisely both are enforced. Capacity planning, cost modeling, and the metrics and troubleshooting steps above turn those settings into a storage strategy that can be operated at scale.

See Also: [[Producer_Mechanics]], [[Consumer_Groups]], [[Event_Sourcing_Patterns]], [[Tiered_Storage_Strategies]]