Skip to content

Kafka Series

Event Sourcing and CQRS with Kafka

Deep dive into using Kafka for event sourcing and CQRS patterns: event stores, projections, state rebuilding, and architectural patterns

Event Sourcing Fundamentals and Kafka Integration

Event Sourcing Core Principles

Event Sourcing stores all changes to application state as a sequence of events rather than storing current state directly. Kafka serves as the durable, scalable event store with natural partitioning and replication:

Traditional vs Event Sourcing

Kafka as Event Store Architecture

public class KafkaEventStore {

    // Topic design for event sourcing
    void configureEventStoreTopic() {
        Properties topicConfig = new Properties();

        // Key partitioning strategy - aggregate ID based
        topicConfig.put("partitions", "12");  // Scale based on aggregate count
        topicConfig.put("replication.factor", "3");

        // Event store specific settings
        topicConfig.put("cleanup.policy", "compact,delete");     // Hybrid cleanup
        topicConfig.put("retention.ms", "31536000000");          // 1 year retention
        topicConfig.put("min.compaction.lag.ms", "86400000");    // 24h before compact

        // Ensure ordering and durability
        topicConfig.put("min.insync.replicas", "2");
        topicConfig.put("unclean.leader.election.enable", "false");
    }

    // Event schema design
    @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "eventType")
    @JsonSubTypes({
        @JsonSubTypes.Type(value = UserCreated.class, name = "UserCreated"),
        @JsonSubTypes.Type(value = EmailChanged.class, name = "EmailChanged"),
        @JsonSubTypes.Type(value = StatusUpgraded.class, name = "StatusUpgraded")
    })
    public abstract class DomainEvent {
        private final String aggregateId;
        private final long aggregateVersion;
        private final Instant timestamp;
        private final String userId;      // Who made the change

        // Events are immutable value objects
        public abstract String getEventType();
    }

    // Append events to Kafka
    public CompletableFuture<Long> appendEvents(String aggregateId,
                                               List<DomainEvent> events,
                                               long expectedVersion) {
        return CompletableFuture.supplyAsync(() -> {
            producer.beginTransaction();

            try {
                long currentVersion = expectedVersion;

                for (DomainEvent event : events) {
                    // Ensure event versioning for optimistic concurrency
                    event.setAggregateVersion(++currentVersion);

                    ProducerRecord<String, DomainEvent> record =
                        new ProducerRecord<>("events", aggregateId, event);

                    producer.send(record);
                }

                producer.commitTransaction();
                return currentVersion;

            } catch (Exception e) {
                producer.abortTransaction();
                throw new ConcurrencyException("Failed to append events", e);
            }
        });
    }
}

CQRS Architecture Patterns with Kafka

Command and Query Separation

CQRS separates write models (commands) from read models (queries), using Kafka as the event backbone between them:

CQRS with Kafka Architecture

Command Handler Implementation

@Component
public class UserCommandHandler {
    private final KafkaEventStore eventStore;
    private final UserRepository userRepository; // Write-side repository

    @CommandHandler
    public CompletableFuture<Void> handle(CreateUserCommand command) {
        return CompletableFuture.runAsync(() -> {
            // Load current state (if exists)
            User user = userRepository.findById(command.getUserId())
                .orElse(null);

            if (user != null) {
                throw new UserAlreadyExistsException(command.getUserId());
            }

            // Create domain events
            UserCreatedEvent event = UserCreatedEvent.builder()
                .aggregateId(command.getUserId())
                .email(command.getEmail())
                .timestamp(Instant.now())
                .userId(command.getRequesterId())
                .build();

            // Persist events
            eventStore.appendEvents(
                command.getUserId(),
                List.of(event),
                0 // Expected version for new aggregate
            ).join();

            // Update write-side state (optional - can be event-driven)
            User newUser = User.builder()
                .id(command.getUserId())
                .email(command.getEmail())
                .version(1)
                .build();
            userRepository.save(newUser);
        });
    }

    @CommandHandler
    public CompletableFuture<Void> handle(UpdateEmailCommand command) {
        return CompletableFuture.runAsync(() -> {
            // Load current state
            User user = userRepository.findById(command.getUserId())
                .orElseThrow(() -> new UserNotFoundException(command.getUserId()));

            // Business logic validation
            if (user.getEmail().equals(command.getNewEmail())) {
                return; // No change needed
            }

            // Generate domain event
            EmailChangedEvent event = EmailChangedEvent.builder()
                .aggregateId(command.getUserId())
                .oldEmail(user.getEmail())
                .newEmail(command.getNewEmail())
                .timestamp(Instant.now())
                .userId(command.getRequesterId())
                .build();

            // Optimistic concurrency control
            try {
                eventStore.appendEvents(
                    command.getUserId(),
                    List.of(event),
                    user.getVersion() // Expected version
                ).join();

                // Update write-side state
                user.setEmail(command.getNewEmail());
                user.setVersion(user.getVersion() + 1);
                userRepository.save(user);

            } catch (ConcurrencyException e) {
                throw new OptimisticLockingException("User was modified concurrently");
            }
        });
    }
}

Event Store Design and Optimization

Partitioning Strategies for Event Stores

public class EventStorePartitioning {

    // Strategy 1: Aggregate-based partitioning (recommended)
    void configureAggregatePartitioning() {
        // Key = aggregateId (e.g., user_123, order_456)
        // Ensures all events for an aggregate in same partition
        // Guarantees ordering within aggregate
        // Enables efficient aggregate reconstruction

        String partitionKey = event.getAggregateId();

        ProducerRecord<String, DomainEvent> record =
            new ProducerRecord<>("events", partitionKey, event);
    }

    // Strategy 2: Tenant-based partitioning (multi-tenant systems)
    void configureTenantPartitioning() {
        // Key = tenantId:aggregateId
        // Isolates tenant data
        // Enables tenant-specific scaling
        // Supports data locality requirements

        String partitionKey = event.getTenantId() + ":" + event.getAggregateId();

        ProducerRecord<String, DomainEvent> record =
            new ProducerRecord<>("events", partitionKey, event);
    }

    // Strategy 3: Time-based partitioning (for massive scale)
    void configureTimeBasedPartitioning() {
        // Key = aggregateId, but route by time
        // Enables time-based compaction and archival
        // Better for systems with billions of events

        CustomPartitioner timePartitioner = new TimeBasedPartitioner();

        Properties producerConfig = new Properties();
        producerConfig.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
                          timePartitioner.getClass());
    }
}

Event Store Performance Optimization

public class EventStoreOptimization {

    // Batch event appending for performance
    public CompletableFuture<Void> appendEventsBatch(
            Map<String, List<DomainEvent>> eventsByAggregate) {

        return CompletableFuture.runAsync(() -> {
            producer.beginTransaction();

            try {
                // Send all events in single transaction
                for (Map.Entry<String, List<DomainEvent>> entry :
                     eventsByAggregate.entrySet()) {

                    String aggregateId = entry.getKey();
                    List<DomainEvent> events = entry.getValue();

                    for (DomainEvent event : events) {
                        ProducerRecord<String, DomainEvent> record =
                            new ProducerRecord<>("events", aggregateId, event);
                        producer.send(record);
                    }
                }

                producer.commitTransaction();

            } catch (Exception e) {
                producer.abortTransaction();
                throw e;
            }
        });
    }

    // Snapshot optimization for large aggregates
    @Component
    public class SnapshotManager {

        public void createSnapshot(String aggregateId, Object aggregateState, long version) {
            SnapshotEvent snapshot = SnapshotEvent.builder()
                .aggregateId(aggregateId)
                .aggregateType(aggregateState.getClass().getSimpleName())
                .aggregateState(serialize(aggregateState))
                .snapshotVersion(version)
                .timestamp(Instant.now())
                .build();

            // Store snapshot in separate topic with compaction
            ProducerRecord<String, SnapshotEvent> record =
                new ProducerRecord<>("snapshots", aggregateId, snapshot);
            snapshotProducer.send(record);
        }

        public <T> Optional<AggregateSnapshot<T>> getLatestSnapshot(
                String aggregateId, Class<T> aggregateType) {

            // Read from compacted snapshots topic
            // Latest snapshot automatically available due to log compaction
            return snapshotRepository.findLatestSnapshot(aggregateId, aggregateType);
        }
    }

    // Optimized aggregate reconstruction
    public <T> T rebuildAggregate(String aggregateId, Class<T> aggregateType) {
        // Step 1: Try to load latest snapshot
        Optional<AggregateSnapshot<T>> snapshot =
            snapshotManager.getLatestSnapshot(aggregateId, aggregateType);

        T aggregate;
        long fromVersion;

        if (snapshot.isPresent()) {
            aggregate = snapshot.get().getAggregateState();
            fromVersion = snapshot.get().getVersion();
        } else {
            aggregate = createEmptyAggregate(aggregateType);
            fromVersion = 0;
        }

        // Step 2: Load and apply events since snapshot
        List<DomainEvent> eventsSinceSnapshot =
            eventStore.getEventsSince(aggregateId, fromVersion);

        for (DomainEvent event : eventsSinceSnapshot) {
            aggregate = applyEvent(aggregate, event);
        }

        return aggregate;
    }
}

Projection Patterns and View Materialization

Projection Builder Implementation

@Component
public class UserProjectionBuilder {

    @KafkaListener(topics = "events", groupId = "user-view-builder")
    public void handleUserEvent(DomainEvent event) {
        switch (event.getEventType()) {
            case "UserCreated" -> handleUserCreated((UserCreatedEvent) event);
            case "EmailChanged" -> handleEmailChanged((EmailChangedEvent) event);
            case "StatusUpgraded" -> handleStatusUpgraded((StatusUpgradedEvent) event);
            default -> log.warn("Unknown event type: {}", event.getEventType());
        }
    }

    private void handleUserCreated(UserCreatedEvent event) {
        UserView userView = UserView.builder()
            .id(event.getAggregateId())
            .email(event.getEmail())
            .status(UserStatus.ACTIVE)
            .createdAt(event.getTimestamp())
            .lastModified(event.getTimestamp())
            .version(event.getAggregateVersion())
            .build();

        userViewRepository.save(userView);

        // Update search index
        searchIndexer.indexUser(userView);

        // Update statistics
        statisticsService.incrementUserCount();
    }

    private void handleEmailChanged(EmailChangedEvent event) {
        UserView userView = userViewRepository.findById(event.getAggregateId())
            .orElseThrow(() -> new ProjectionInconsistencyException(
                "User view not found for email change: " + event.getAggregateId()));

        // Idempotency check
        if (userView.getVersion() >= event.getAggregateVersion()) {
            log.info("Event already processed: {}", event);
            return;
        }

        userView.setEmail(event.getNewEmail());
        userView.setLastModified(event.getTimestamp());
        userView.setVersion(event.getAggregateVersion());

        userViewRepository.save(userView);

        // Update search index
        searchIndexer.updateUser(userView);
    }
}

Multiple Projection Strategies

public class MultipleProjectionManager {

    // Strategy 1: Dedicated projection builders per view
    @Component
    public class UserSearchProjectionBuilder {

        @KafkaListener(topics = "events", groupId = "user-search-builder")
        public void buildSearchProjection(DomainEvent event) {
            // Build search-optimized view
            // - Denormalized for fast queries
            // - Indexed for full-text search
            // - Includes computed fields
        }
    }

    @Component
    public class UserAnalyticsProjectionBuilder {

        @KafkaListener(topics = "events", groupId = "user-analytics-builder")
        public void buildAnalyticsProjection(DomainEvent event) {
            // Build analytics-optimized view
            // - Aggregated statistics
            // - Time-series data
            // - Pre-computed metrics
        }
    }

    // Strategy 2: Single builder with multiple outputs
    @Component
    public class MultiViewProjectionBuilder {

        @KafkaListener(topics = "events", groupId = "multi-view-builder")
        @Transactional
        public void buildMultipleViews(DomainEvent event) {
            // Update all projections in single transaction
            updateUserDetailsView(event);
            updateUserSearchView(event);
            updateUserAnalyticsView(event);
            updateUserAuditView(event);
        }
    }

    // Strategy 3: Event-driven projection chaining
    void configureProjectionChain() {
        // Primary projection publishes to derived topic
        // Secondary projections consume from derived topic
        // Enables complex projection dependencies

        // events → user-details-view → user-search-events → search-index
        //       └─→ user-analytics-view → analytics-events → dashboards
    }
}

Consistency and Transaction Patterns

Saga Pattern with Event Sourcing

public class OrderProcessingSaga {

    @SagaOrchestrationStart
    @EventHandler
    public void handle(OrderPlacedEvent event) {
        SagaState sagaState = SagaState.builder()
            .sagaId(UUID.randomUUID().toString())
            .orderId(event.getOrderId())
            .userId(event.getUserId())
            .amount(event.getAmount())
            .status(SagaStatus.STARTED)
            .build();

        // Step 1: Reserve inventory
        ReserveInventoryCommand reserveCommand = ReserveInventoryCommand.builder()
            .sagaId(sagaState.getSagaId())
            .orderId(event.getOrderId())
            .items(event.getItems())
            .build();

        commandGateway.send(reserveCommand);

        // Store saga state as event
        SagaStartedEvent sagaEvent = SagaStartedEvent.builder()
            .sagaId(sagaState.getSagaId())
            .orderId(event.getOrderId())
            .step("INVENTORY_RESERVATION")
            .build();

        eventStore.appendEvent(sagaState.getSagaId(), sagaEvent);
    }

    @EventHandler
    public void handle(InventoryReservedEvent event) {
        // Step 2: Process payment
        ProcessPaymentCommand paymentCommand = ProcessPaymentCommand.builder()
            .sagaId(event.getSagaId())
            .orderId(event.getOrderId())
            .amount(event.getAmount())
            .build();

        commandGateway.send(paymentCommand);
    }

    @EventHandler
    public void handle(PaymentProcessedEvent event) {
        // Step 3: Confirm order
        ConfirmOrderCommand confirmCommand = ConfirmOrderCommand.builder()
            .sagaId(event.getSagaId())
            .orderId(event.getOrderId())
            .build();

        commandGateway.send(confirmCommand);
    }

    @EventHandler
    public void handle(PaymentFailedEvent event) {
        // Compensating transaction: Release inventory
        ReleaseInventoryCommand compensateCommand = ReleaseInventoryCommand.builder()
            .sagaId(event.getSagaId())
            .orderId(event.getOrderId())
            .reason("Payment failed")
            .build();

        commandGateway.send(compensateCommand);

        // Mark saga as failed
        SagaFailedEvent sagaFailedEvent = SagaFailedEvent.builder()
            .sagaId(event.getSagaId())
            .reason("Payment processing failed")
            .build();

        eventStore.appendEvent(event.getSagaId(), sagaFailedEvent);
    }
}

Eventual Consistency Management

@Component
public class ConsistencyManager {

    // Pattern 1: Version-based consistency
    public void handleEventualConsistency(DomainEvent event) {
        try {
            updateProjection(event);
        } catch (ProjectionUpdateException e) {
            // Retry with exponential backoff
            scheduleRetry(event, calculateBackoff(e.getAttemptCount()));
        }
    }

    // Pattern 2: Consistency boundary enforcement
    public void enforceConsistencyBoundary(String aggregateId, DomainEvent event) {
        // Strong consistency within aggregate
        // Eventual consistency across aggregates

        if (isWithinAggregateBoundary(aggregateId, event)) {
            // Synchronous update - strong consistency
            updateAggregateState(aggregateId, event);
        } else {
            // Asynchronous update - eventual consistency
            scheduleAsyncUpdate(event);
        }
    }

    // Pattern 3: Compensation tracking
    public void trackCompensation(String sagaId, CompensatingAction action) {
        CompensationEvent compensationEvent = CompensationEvent.builder()
            .sagaId(sagaId)
            .action(action)
            .status(CompensationStatus.PENDING)
            .timestamp(Instant.now())
            .build();

        eventStore.appendEvent(sagaId, compensationEvent);
    }
}

Temporal Queries and Event Replay

Time-Travel Query Implementation

@Service
public class TemporalQueryService {

    // Query aggregate state at specific point in time
    public <T> T getAggregateStateAt(String aggregateId,
                                    Class<T> aggregateType,
                                    Instant pointInTime) {

        // Get all events up to the specified time
        List<DomainEvent> historicalEvents = eventStore.getEventsUntil(
            aggregateId, pointInTime);

        T aggregate = createEmptyAggregate(aggregateType);

        // Replay events in chronological order
        for (DomainEvent event : historicalEvents) {
            aggregate = applyEvent(aggregate, event);
        }

        return aggregate;
    }

    // Query state changes over time period
    public <T> List<StateSnapshot<T>> getStateEvolution(
            String aggregateId,
            Class<T> aggregateType,
            Instant fromTime,
            Instant toTime,
            Duration snapshotInterval) {

        List<StateSnapshot<T>> evolution = new ArrayList<>();

        Instant currentTime = fromTime;
        while (!currentTime.isAfter(toTime)) {
            T state = getAggregateStateAt(aggregateId, aggregateType, currentTime);

            evolution.add(StateSnapshot.<T>builder()
                .timestamp(currentTime)
                .state(state)
                .build());

            currentTime = currentTime.plus(snapshotInterval);
        }

        return evolution;
    }

    // Audit query - who changed what when
    public List<AuditRecord> getAuditTrail(String aggregateId,
                                          Instant fromTime,
                                          Instant toTime) {

        List<DomainEvent> events = eventStore.getEventsBetween(
            aggregateId, fromTime, toTime);

        return events.stream()
            .map(event -> AuditRecord.builder()
                .aggregateId(aggregateId)
                .eventType(event.getEventType())
                .timestamp(event.getTimestamp())
                .userId(event.getUserId())
                .changes(extractChanges(event))
                .build())
            .collect(toList());
    }
}

Event Replay and System Recovery

@Component
public class EventReplayService {

    // Rebuild all projections from scratch
    public CompletableFuture<Void> rebuildAllProjections() {
        return CompletableFuture.runAsync(() -> {
            log.info("Starting full projection rebuild");

            // Clear all existing projections
            projectionRepository.deleteAll();

            // Create new consumer group for replay
            String replayGroupId = "projection-rebuild-" + System.currentTimeMillis();

            try (KafkaConsumer<String, DomainEvent> replayConsumer =
                 createReplayConsumer(replayGroupId)) {

                // Seek to beginning of all partitions
                List<TopicPartition> partitions = replayConsumer.partitionsFor("events")
                    .stream()
                    .map(info -> new TopicPartition("events", info.partition()))
                    .collect(toList());

                replayConsumer.assign(partitions);
                replayConsumer.seekToBeginning(partitions);

                // Process all events
                long processedEvents = 0;
                boolean reachedEnd = false;

                while (!reachedEnd) {
                    ConsumerRecords<String, DomainEvent> records =
                        replayConsumer.poll(Duration.ofSeconds(1));

                    if (records.isEmpty()) {
                        reachedEnd = true;
                    } else {
                        for (ConsumerRecord<String, DomainEvent> record : records) {
                            projectionBuilder.processEvent(record.value());
                            processedEvents++;

                            if (processedEvents % 10000 == 0) {
                                log.info("Processed {} events", processedEvents);
                            }
                        }
                    }
                }

                log.info("Projection rebuild completed. Processed {} events",
                        processedEvents);
            }
        });
    }

    // Selective replay for specific aggregates
    public CompletableFuture<Void> replayAggregateProjections(
            List<String> aggregateIds) {

        return CompletableFuture.runAsync(() -> {
            for (String aggregateId : aggregateIds) {
                List<DomainEvent> events = eventStore.getAllEvents(aggregateId);

                for (DomainEvent event : events) {
                    projectionBuilder.processEvent(event);
                }
            }
        });
    }
}

Schema Evolution and Versioning

Event Schema Evolution Strategies

public class EventSchemaEvolution {

    // Strategy 1: Additive changes (backward compatible)
    @JsonIgnoreProperties(ignoreUnknown = true)  // Ignore new fields in old consumers
    public class UserCreatedEventV2 extends UserCreatedEvent {
        private String phoneNumber;     // New optional field
        private Instant lastLogin;      // New optional field

        // Old consumers can still process V2 events
        // New consumers can process V1 and V2 events
    }

    // Strategy 2: Event upcasting for breaking changes
    @Component
    public class EventUpcastingService {

        public DomainEvent upcastEvent(DomainEvent event) {
            return switch (event.getEventType()) {
                case "UserCreatedV1" -> upcastUserCreatedV1ToV2((UserCreatedEventV1) event);
                case "EmailChangedV1" -> upcastEmailChangedV1ToV2((EmailChangedEventV1) event);
                default -> event; // No upcasting needed
            };
        }

        private UserCreatedEventV2 upcastUserCreatedV1ToV2(UserCreatedEventV1 oldEvent) {
            return UserCreatedEventV2.builder()
                .aggregateId(oldEvent.getAggregateId())
                .email(oldEvent.getEmail())
                .phoneNumber(null)          // Default value for new field
                .lastLogin(null)            // Default value for new field
                .timestamp(oldEvent.getTimestamp())
                .userId(oldEvent.getUserId())
                .aggregateVersion(oldEvent.getAggregateVersion())
                .build();
        }
    }

    // Strategy 3: Event versioning metadata
    public abstract class VersionedDomainEvent {
        @JsonProperty("eventVersion")
        private final int eventVersion = getEventVersion();

        public abstract int getEventVersion();

        public boolean isCompatibleWith(int consumerVersion) {
            return this.eventVersion <= consumerVersion;
        }
    }
}

Performance Optimization and Scaling

High-Performance Event Processing

@Component
public class PerformanceOptimizedProjectionBuilder {

    private final ExecutorService projectionExecutor =
        Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    @KafkaListener(topics = "events",
                  concurrency = "8")  // Parallel processing
    public void processEventsConcurrently(List<DomainEvent> events) {

        // Group events by aggregate to maintain ordering
        Map<String, List<DomainEvent>> eventsByAggregate = events.stream()
            .collect(groupingBy(DomainEvent::getAggregateId));

        // Process each aggregate's events sequentially, aggregates in parallel
        List<CompletableFuture<Void>> futures = eventsByAggregate.entrySet()
            .stream()
            .map(entry -> CompletableFuture.runAsync(
                () -> processAggregateEvents(entry.getKey(), entry.getValue()),
                projectionExecutor))
            .collect(toList());

        // Wait for all aggregates to complete
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .join();
    }

    // Batch projection updates for performance
    @Transactional
    public void processAggregateEvents(String aggregateId, List<DomainEvent> events) {
        // Load current projection state
        UserView currentView = userViewRepository.findById(aggregateId)
            .orElse(null);

        // Apply all events in batch
        UserView updatedView = currentView;
        for (DomainEvent event : events) {
            updatedView = applyEventToProjection(updatedView, event);
        }

        // Single database update for all events
        if (updatedView != currentView) {
            userViewRepository.save(updatedView);
        }
    }
}

This comprehensive Event Sourcing/CQRS knowledge chunk provides both strategic architectural guidance and detailed implementation patterns needed for L8_PRINCIPAL level discussions and complex system implementations. The content balances theoretical understanding with practical production patterns essential for enterprise-grade event-driven systems.

See Also: [[Kafka_Transactions]], [[Stream_Processing_Frameworks]], [[Microservices_Saga_Patterns]], [[Advanced_Database_Patterns]]

Concepts covered in this article