Event sourcing with Spring Boot, Kafka and jOOQ

Events around

One way of building resilient, scalable, and maintainable applications is to use event sourcing, which has become a powerful architectural pattern. In the classical approach we keep only the current state of the application. With event sourcing, by contrast, we persist every state change as an event, which gives us a complete history of changes and makes debugging, auditing, and even reconstructing application state much easier.

In this article we’ll walk through an implementation of the event sourcing pattern. We’ll develop a Spring Boot application for a food delivery domain, using Kafka as our event streaming platform, Apache Avro with Schema Registry for robust data serialization, and jOOQ for database operations. We’ll explore both the theoretical aspects and the practical implementation, and by the end of the article you will understand how to apply event sourcing in your own projects.

Understanding event sourcing

First, let’s understand what event sourcing is and why it’s so powerful.

In a traditional CRUD (Create, Read, Update, and Delete) application, we store the current state directly in the database. When we need to update an object, we overwrite its previous state. This direct approach has a limitation: once the history of changes is lost, it is hard to understand how an object reached its current state.

Event sourcing takes a different approach:

  1. Every change to the application state is captured as an event.
  2. These events are stored in an append-only event store.
  3. The current state is reconstructed by replaying these events.

When we add Kafka, Avro, Schema Registry, and jOOQ to this architecture, we gain several benefits:

  1. Kafka provides a distributed, fault-tolerant event streaming platform.
  2. Avro offers a compact, binary serialization format with robust schema support.
  3. Schema Registry ensures schema compatibility across producers and consumers.
  4. jOOQ provides type-safe SQL generation and execution, giving us more control over database operations than JPA.

For example, in a food delivery application, events like OrderCreated, OrderAccepted, or OrderDelivered are published to Kafka topics using Avro-defined schemas. These events are also stored in a database.

Key benefits of this approach

  • Complete audit trail: You have a record of every change made to your system.
  • Schema evolution: Avro and Schema Registry enable safe evolution of your event schemas.
  • Type-safe SQL: jOOQ provides compile-time checking of SQL queries.
  • Compact serialization: Avro provides efficient binary serialization compared to JSON.
  • Strong typing: Avro schemas and jOOQ provide type safety for your events.
  • Real-time processing: Services can react to events immediately as they occur.
  • Scalability: Kafka’s distributed nature allows for high-throughput event processing.
  • SQL control: jOOQ gives you precise control over your SQL without sacrificing type safety.

Challenges to consider

  • Operational complexity: Managing a Kafka cluster and Schema Registry adds operational overhead.
  • Learning curve: Understanding Avro schemas, jOOQ, and event sourcing patterns takes time.
  • Schema design: Careful planning is required for schema evolution strategies.
  • Eventual consistency: The current state might not be immediately available after events are stored.

Implementing event sourcing

Now, let’s build a Spring Boot application that implements event sourcing for a food delivery service using all of the tools mentioned above.

Setting up the project

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.4.4</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>dev.vrnsky</groupId>
    <artifactId>event-sourcing</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>event-sourcing</name>
    <description>event-sourcing</description>
    <properties>
        <java.version>21</java.version>
        <avro.version>1.11.1</avro.version>
        <confluent.version>7.4.0</confluent.version>
        <jooq.version>3.20.2</jooq.version>
        <spring.datasource.url>jdbc:postgresql://localhost:51432/food_delivery</spring.datasource.url>
        <spring.datasource.username>postgres</spring.datasource.username>
        <spring.datasource.password>postgres</spring.datasource.password>
        <liquibase.version>4.31.1</liquibase.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jooq</artifactId>
        </dependency>

        <!-- Kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <!-- Avro and Schema Registry -->
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>${avro.version}</version>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>${confluent.version}</version>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-schema-registry-client</artifactId>
            <version>${confluent.version}</version>
        </dependency>

        <!-- Database -->
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
        </dependency>
        <dependency>
            <groupId>org.liquibase</groupId>
            <artifactId>liquibase-core</artifactId>
            <version>${liquibase.version}</version>
        </dependency>

        <!-- Utilities -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.jetbrains</groupId>
            <artifactId>annotations</artifactId>
            <version>26.0.2</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                    <annotationProcessorPaths>
                        <path>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                            <version>1.18.36</version>
                        </path>
                    </annotationProcessorPaths>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
            <!-- Avro Schema Generation -->
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>${avro.version}</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/avro</sourceDirectory>
                            <outputDirectory>${project.build.directory}/generated-sources/avro</outputDirectory>
                            <stringType>String</stringType>
                            <imports>
                                <import>${project.basedir}/src/main/avro/BaseEvent.avsc</import>
                            </imports>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <!-- Liquibase Plugin -->
            <plugin>
                <groupId>org.liquibase</groupId>
                <artifactId>liquibase-maven-plugin</artifactId>
                <version>${liquibase.version}</version>
                <configuration>
                    <changeLogFile>src/main/resources/db/changelog.yml</changeLogFile>
                    <url>${spring.datasource.url}</url>
                    <username>${spring.datasource.username}</username>
                    <password>${spring.datasource.password}</password>
                </configuration>
                <executions>
                    <execution>
                        <phase>initialize</phase>
                        <goals>
                            <goal>update</goal>
                        </goals>
                    </execution>
                </executions>
                <dependencies>
                    <dependency>
                        <groupId>org.postgresql</groupId>
                        <artifactId>postgresql</artifactId>
                        <version>${postgresql.version}</version>
                    </dependency>
                </dependencies>
            </plugin>

            <!-- jOOQ Code Generation -->
            <plugin>
                <groupId>org.jooq</groupId>
                <artifactId>jooq-codegen-maven</artifactId>
                <version>${jooq.version}</version>
                <executions>
                    <execution>
                        <id>jooq-codegen</id>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>generate</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <jdbc>
                        <driver>org.postgresql.Driver</driver>
                        <url>${spring.datasource.url}</url>
                        <user>${spring.datasource.username}</user>
                        <password>${spring.datasource.password}</password>
                    </jdbc>
                    <generator>
                        <generate>
                            <tables>true</tables>
                        </generate>
                        <database>
                            <name>org.jooq.meta.postgres.PostgresDatabase</name>
                            <includes>.*</includes>
                            <excludes>databasechangelog|databasechangeloglock</excludes>
                            <inputSchema>public</inputSchema>
                        </database>
                        <target>
                            <packageName>dev.vrnsky.eventsourcing.db.generated</packageName>
                            <directory>${project.build.directory}/generated-sources/jooq</directory>
                        </target>
                    </generator>
                </configuration>
                <dependencies>
                    <dependency>
                        <groupId>org.postgresql</groupId>
                        <artifactId>postgresql</artifactId>
                        <version>${postgresql.version}</version>
                    </dependency>
                </dependencies>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>confluent</id>
            <url>https://packages.confluent.io/maven/</url>
        </repository>
    </repositories>

</project>

Defining Avro Schemas

Let’s define our Avro schemas for the food delivery events. We’ll place them in the src/main/avro directory. First, let’s create the base event schema (src/main/avro/BaseEvent.avsc):

{
  "namespace": "dev.vrnsky.eventsourcing.event",
  "type": "record",
  "name": "BaseEvent",
  "fields": [
    {
      "name": "eventId",
      "type": "string"
    },
    {
      "name": "aggregateId",
      "type": "string"
    },
    {
      "name": "timestamp",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    },
    {
      "name": "eventType",
      "type": "string"
    }
  ]
}

Now let’s define the specific event schemas, starting with src/main/avro/OrderCreatedEvent.avsc:

{
  "namespace": "dev.vrnsky.eventsourcing.event",
  "type": "record",
  "name": "OrderCreatedEvent",
  "fields": [
    {
      "name": "base",
      "type": "dev.vrnsky.eventsourcing.event.BaseEvent"
    },
    {
      "name": "customerId",
      "type": "string"
    },
    {
      "name": "restaurantId",
      "type": "string"
    },
    {
      "name": "deliveryAddress",
      "type": "string"
    },
    {
      "name": "deliveryNotes",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ]
}

src/main/avro/OrderItemAddedEvent.avsc

{
  "namespace": "dev.vrnsky.eventsourcing.event",
  "type": "record",
  "name": "OrderItemAddedEvent",
  "fields": [
    {
      "name": "base",
      "type": "dev.vrnsky.eventsourcing.event.BaseEvent"
    },
    {
      "name": "itemId",
      "type": "string"
    },
    {
      "name": "itemName",
      "type": "string"
    },
    {
      "name": "quantity",
      "type": "int"
    },
    {
      "name": "price",
      "type": {
        "type": "bytes",
        "logicalType": "decimal",
        "precision": 10,
        "scale": 2
      }
    }
  ]
}

src/main/avro/OrderAcceptedEvent.avsc

{
  "namespace": "dev.vrnsky.eventsourcing.event",
  "type": "record",
  "name": "OrderAcceptedEvent",
  "fields": [
    {
      "name": "base",
      "type": "dev.vrnsky.eventsourcing.event.BaseEvent"
    }
  ]
}

src/main/avro/FoodPreparationStartedEvent.avsc

{
  "namespace": "dev.vrnsky.eventsourcing.event",
  "type": "record",
  "name": "FoodPreparationStartedEvent",
  "fields": [
    {
      "name": "base",
      "type": "dev.vrnsky.eventsourcing.event.BaseEvent"
    }
  ]
}

src/main/avro/FoodReadyEvent.avsc

{
  "namespace": "dev.vrnsky.eventsourcing.event",
  "type": "record",
  "name": "FoodReadyEvent",
  "fields": [
    {
      "name": "base",
      "type": "dev.vrnsky.eventsourcing.event.BaseEvent"
    }
  ]
}

src/main/avro/OrderPickedUpEvent.avsc

{
  "namespace": "dev.vrnsky.eventsourcing.event",
  "type": "record",
  "name": "OrderPickedUpEvent",
  "fields": [
    {
      "name": "base",
      "type": "dev.vrnsky.eventsourcing.event.BaseEvent"
    },
    {
      "name": "driverId",
      "type": "string"
    }
  ]
}

src/main/avro/OrderDeliveredEvent.avsc

{
  "namespace": "dev.vrnsky.eventsourcing.event",
  "type": "record",
  "name": "OrderDeliveredEvent",
  "fields": [
    {
      "name": "base",
      "type": "dev.vrnsky.eventsourcing.event.BaseEvent"
    },
    {
      "name": "deliveryTime",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    }
  ]
}

src/main/avro/OrderCancelledEvent.avsc

{
  "namespace": "dev.vrnsky.eventsourcing.event",
  "type": "record",
  "name": "OrderCancelledEvent",
  "fields": [
    {
      "name": "base",
      "type": "dev.vrnsky.eventsourcing.event.BaseEvent"
    },
    {
      "name": "reason",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ]
}

Database schema setup with Liquibase

Let’s set up our database using Liquibase migrations. Create the file src/main/resources/db/migrations/create-event-store.yml:

databaseChangeLog:
  - changeSet:
      id: create-event-store-table
      author: Egor Voronianskii
      preConditions:
        - onFail: MARK_RAN
        - not:
            tableExists:
              tableName: event_store
      changes:
        - createTable:
            tableName: event_store
            columns:
              - column:
                  name: id
                  type: UUID
                  constraints:
                    primaryKey: true
                    nullable: false
              - column:
                  name: aggregate_id
                  type: UUID
                  constraints:
                    nullable: false
              - column:
                  name: event_type
                  type: VARCHAR(255)
                  constraints:
                    nullable: false
              - column:
                  name: event_data
                  type: BYTEA
                  constraints:
                    nullable: false
              - column:
                  name: timestamp
                  type: TIMESTAMP
                  constraints:
                    nullable: false
              - column:
                  name: sequence_number
                  type: BIGINT
                  autoIncrement: true
                  constraints:
                    nullable: false
        - createIndex:
            indexName: idx_event_store_aggregate_id
            tableName: event_store
            columns:
              - column:
                  name: aggregate_id
        - createIndex:
            indexName: idx_event_store_timestamp
            tableName: event_store
            columns:
              - column:
                  name: timestamp
        - createIndex:
            indexName: idx_event_store_event_type
            tableName: event_store
            columns:
              - column:
                  name: event_type

Create the changelog file at src/main/resources/db/changelog.yml:

databaseChangeLog:
  - includeAll:
      path: migrations/
      relativeToChangelogFile: true
      errorIfMissingOrEmpty: false

Defining the domain model

Next, let’s define our domain model for the food delivery service:

package dev.vrnsky.eventsourcing.domain;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.math.BigDecimal;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class OrderItem {
    private String itemId;
    private String itemName;
    private int quantity;
    private BigDecimal price;
}

package dev.vrnsky.eventsourcing.domain;

public enum OrderStatus {
    CREATED,
    ACCEPTED,
    PREPARING,
    READY,
    IN_DELIVERY,
    DELIVERED,
    CANCELLED
}
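
The service and controller later in the article also use a FoodOrder aggregate from the same package. Its implementation isn’t shown here, so the following is a minimal sketch whose fields are assumptions derived from the events defined above:

package dev.vrnsky.eventsourcing.domain;

import lombok.Data;

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// A minimal sketch of the FoodOrder aggregate; the exact fields are assumptions
// derived from the events above rather than the original implementation.
@Data
public class FoodOrder {
    private UUID id;
    private String customerId;
    private String restaurantId;
    private String deliveryAddress;
    private String deliveryNotes;
    private String driverId;
    private OrderStatus status;
    private Instant deliveryTime;
    private List<OrderItem> items = new ArrayList<>();
}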

Kafka and Schema Registry configuration

Now, let’s configure Kafka and Schema Registry

package dev.vrnsky.eventsourcing.config;

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Value(value = "${spring.kafka.schema-registry-url}")
    private String schemaRegistryUrl;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }

    @Bean
    public NewTopic orderEventsTopic() {
        return new NewTopic("order-events", 3, (short) 1);
    }

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        configProps.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
        configProps.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, true);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

And the consumer configuration

package dev.vrnsky.eventsourcing.config;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConsumerConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Value(value = "${spring.kafka.schema-registry-url}")
    private String schemaRegistryUrl;

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "food-delivery-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

jOOQ Configuration

Let’s configure jOOQ for our application

package dev.vrnsky.eventsourcing.config;

import org.jooq.SQLDialect;
import org.jooq.impl.DataSourceConnectionProvider;
import org.jooq.impl.DefaultConfiguration;
import org.jooq.impl.DefaultDSLContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.TransactionAwareDataSourceProxy;

import javax.sql.DataSource;

@Configuration
public class JooqConfig {

    @Bean
    public DataSourceConnectionProvider connectionProvider(DataSource dataSource) {
        return new DataSourceConnectionProvider(new TransactionAwareDataSourceProxy(dataSource));
    }

    @Bean
    public DefaultDSLContext dsl(DataSourceConnectionProvider connectionProvider) {
        DefaultConfiguration jooqConfiguration = new DefaultConfiguration();
        jooqConfiguration.set(connectionProvider);
        jooqConfiguration.set(SQLDialect.POSTGRES);
        return new DefaultDSLContext(jooqConfiguration);
    }
}

Event repository with jOOQ

package dev.vrnsky.eventsourcing.repository;

import static dev.vrnsky.eventsourcing.db.generated.Tables.EVENT_STORE;

import dev.vrnsky.eventsourcing.db.generated.tables.records.EventStoreRecord;
import lombok.RequiredArgsConstructor;
import org.jooq.DSLContext;
import org.springframework.stereotype.Repository;

import java.time.Instant;
import java.util.List;
import java.util.UUID;

@Repository
@RequiredArgsConstructor
public class EventRepository {
    private final DSLContext dsl;
    
    public void saveEvent(UUID id, UUID aggregateId, String eventType, byte[] eventData, Instant timestamp) {
        dsl.insertInto(EVENT_STORE)
            .set(EVENT_STORE.ID, id)
            .set(EVENT_STORE.AGGREGATE_ID, aggregateId)
            .set(EVENT_STORE.EVENT_TYPE, eventType)
            .set(EVENT_STORE.EVENT_DATA, eventData)
            .set(EVENT_STORE.TIMESTAMP, timestamp)
            .execute();
    }
    
    public List<EventStoreRecord> findByAggregateId(UUID aggregateId) {
        return dsl.selectFrom(EVENT_STORE)
                 .where(EVENT_STORE.AGGREGATE_ID.eq(aggregateId))
                 .orderBy(EVENT_STORE.SEQUENCE_NUMBER.asc())
                 .fetch();
    }
    
    public List<EventStoreRecord> findByEventType(String eventType) {
        return dsl.selectFrom(EVENT_STORE)
                 .where(EVENT_STORE.EVENT_TYPE.eq(eventType))
                 .orderBy(EVENT_STORE.TIMESTAMP.asc())
                 .fetch();
    }
    
    public List<EventStoreRecord> findByAggregateIdAndEventType(UUID aggregateId, String eventType) {
        return dsl.selectFrom(EVENT_STORE)
                 .where(EVENT_STORE.AGGREGATE_ID.eq(aggregateId))
                 .and(EVENT_STORE.EVENT_TYPE.eq(eventType))
                 .orderBy(EVENT_STORE.SEQUENCE_NUMBER.asc())
                 .fetch();
    }
    
    public List<EventStoreRecord> findAllOrderByTimestamp() {
        return dsl.selectFrom(EVENT_STORE)
                 .orderBy(EVENT_STORE.TIMESTAMP.asc())
                 .fetch();
    }
}

Event service

Now, let’s create an Event Service that will handle the common event operations:

package dev.vrnsky.eventsourcing.service;

import dev.vrnsky.eventsourcing.db.generated.tables.records.EventStoreRecord;
import dev.vrnsky.eventsourcing.event.BaseEvent;
import dev.vrnsky.eventsourcing.repository.EventRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.io.ByteArrayOutputStream;
import java.time.Instant;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;

@Slf4j
@Service
@RequiredArgsConstructor
public class EventService {
    private final EventRepository eventRepository;
    private final KafkaTemplate<String, Object> kafkaTemplate;

    @Transactional
    public void saveAndPublishEvent(SpecificRecord event) {
        try {
            // Extract base event information
            BaseEvent baseEvent = extractBaseEvent(event);
            UUID eventId = UUID.fromString(baseEvent.getEventId());
            UUID aggregateId = UUID.fromString(baseEvent.getAggregateId());
            String eventType = event.getClass().getSimpleName();
            Instant timestamp = Instant.ofEpochMilli(baseEvent.getTimestamp().toEpochMilli());

            // Serialize event for storage
            byte[] serializedEvent = serializeEvent(event);
            
            // Save to event store
            eventRepository.saveEvent(eventId, aggregateId, eventType, serializedEvent, timestamp);
            
            // Publish to Kafka
            kafkaTemplate.send("order-events", aggregateId.toString(), event);
            
            log.info("Event {} saved and published for aggregate {}", eventType, aggregateId);
        } catch (Exception e) {
            log.error("Error saving and publishing event", e);
            throw new RuntimeException("Error processing event", e);
        }
    }

    @Transactional(readOnly = true)
    public List<SpecificRecord> getEvents(UUID aggregateId) {
        return eventRepository.findByAggregateId(aggregateId)
                .stream()
                .map(this::deserializeEvent)
                .collect(Collectors.toList());
    }
    
    private BaseEvent extractBaseEvent(SpecificRecord event) {
        try {
            // This assumes each event has a 'base' field that is a BaseEvent;
            // SpecificRecord only exposes get(int), so look the field position up in the schema
            int basePosition = event.getSchema().getField("base").pos();
            return (BaseEvent) event.get(basePosition);
        } catch (Exception e) {
            throw new RuntimeException("Error extracting base event", e);
        }
    }
    
    private byte[] serializeEvent(SpecificRecord event) {
        try {
            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);
            DatumWriter<SpecificRecord> writer = new SpecificDatumWriter<>(event.getSchema());
            writer.write(event, encoder);
            encoder.flush();
            return outputStream.toByteArray();
        } catch (Exception e) {
            throw new RuntimeException("Error serializing event", e);
        }
    }
    
    private SpecificRecord deserializeEvent(EventStoreRecord record) {
        try {
            // Determine the event class based on the stored event type
            Class<? extends SpecificRecord> eventClass = getEventClass(record.getEventType());
            
            // Create a reader for this specific event type
            SpecificDatumReader<? extends SpecificRecord> reader = new SpecificDatumReader<>(eventClass);
            
            // Deserialize the event data
            return reader.read(null, DecoderFactory.get().binaryDecoder(record.getEventData(), null));
        } catch (Exception e) {
            throw new RuntimeException("Error deserializing event", e);
        }
    }
    
    @SuppressWarnings("unchecked")
    private Class<? extends SpecificRecord> getEventClass(String eventType) {
        try {
            return (Class<? extends SpecificRecord>) Class.forName("dev.vrnsky.eventsourcing.event." + eventType);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("Event type not found: " + eventType, e);
        }
    }
}

Event factory

Let’s create a factory class to help create our Avro events.

package dev.vrnsky.eventsourcing.service;

import dev.vrnsky.eventsourcing.event.*;
import org.springframework.stereotype.Component;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.UUID;

@Component
public class EventFactory {

    public OrderCreatedEvent createOrderCreatedEvent(UUID orderId, String customerId, String restaurantId,
                                                     String deliveryAddress, String deliveryNotes) {
        BaseEvent baseEvent = createBaseEvent(orderId, OrderCreatedEvent.class.getSimpleName());

        return OrderCreatedEvent.newBuilder()
                .setBase(baseEvent)
                .setCustomerId(customerId).setRestaurantId(restaurantId)
                .setDeliveryAddress(deliveryAddress)
                .setDeliveryNotes(deliveryNotes)
                .build();
    }

    public OrderItemAddedEvent createOrderItemAddedEvent(UUID orderId, String itemId, String itemName,
                                                         int quantity, BigDecimal price) {
        BaseEvent baseEvent = createBaseEvent(orderId, OrderItemAddedEvent.class.getSimpleName());

        // Convert BigDecimal to Avro's decimal representation (ByteBuffer)
        ByteBuffer priceBytes = ByteBuffer.wrap(price.unscaledValue().toByteArray());

        return OrderItemAddedEvent.newBuilder()
                .setBase(baseEvent)
                .setItemId(itemId)
                .setItemName(itemName)
                .setQuantity(quantity)
                .setPrice(priceBytes)
                .build();
    }

    public OrderAcceptedEvent createOrderAcceptedEvent(UUID orderId) {
        BaseEvent baseEvent = createBaseEvent(orderId, OrderAcceptedEvent.class.getSimpleName());

        return OrderAcceptedEvent.newBuilder()
                .setBase(baseEvent)
                .build();
    }

    public FoodPreparationStartedEvent createFoodPreparationStartedEvent(UUID orderId) {
        BaseEvent baseEvent = createBaseEvent(orderId, FoodPreparationStartedEvent.class.getSimpleName());

        return FoodPreparationStartedEvent.newBuilder()
                .setBase(baseEvent)
                .build();
    }

    public FoodReadyEvent createFoodReadyEvent(UUID orderId) {
        BaseEvent baseEvent = createBaseEvent(orderId, FoodReadyEvent.class.getSimpleName());

        return FoodReadyEvent.newBuilder()
                .setBase(baseEvent)
                .build();
    }

    public OrderPickedUpEvent createOrderPickedUpEvent(UUID orderId, String driverId) {
        BaseEvent baseEvent = createBaseEvent(orderId, OrderPickedUpEvent.class.getSimpleName());

        return OrderPickedUpEvent.newBuilder()
                .setBase(baseEvent)
                .setDriverId(driverId)
                .build();
    }

    public OrderDeliveredEvent createOrderDeliveredEvent(UUID orderId) {
        BaseEvent baseEvent = createBaseEvent(orderId, OrderDeliveredEvent.class.getSimpleName());
        Instant now = Instant.now();

        return OrderDeliveredEvent.newBuilder()
                .setBase(baseEvent)
                .setDeliveryTime(now)
                .build();
    }

    public OrderCancelledEvent createOrderCancelledEvent(UUID orderId, String reason) {
        BaseEvent baseEvent = createBaseEvent(orderId, OrderCancelledEvent.class.getSimpleName());

        return OrderCancelledEvent.newBuilder()
                .setBase(baseEvent)
                .setReason(reason)
                .build();
    }

    private BaseEvent createBaseEvent(UUID aggregateId, String eventType) {
        return BaseEvent.newBuilder()
                .setEventId(UUID.randomUUID().toString())
                .setAggregateId(aggregateId.toString())
                .setTimestamp(Instant.now())
                .setEventType(eventType)
                .build();
    }
}

Food Order Service

Now, let’s create a food order service that uses our event sourcing architecture.
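
The service sits between the REST layer and the event infrastructure: command methods create events through the EventFactory and hand them to the EventService, while queries rebuild the current order state by replaying the stored events. Below is a minimal sketch of such a service, assuming the FoodOrder aggregate sketched earlier; treat it as an illustration rather than the definitive implementation.

package dev.vrnsky.eventsourcing.service;

import dev.vrnsky.eventsourcing.domain.FoodOrder;
import dev.vrnsky.eventsourcing.domain.OrderStatus;
import dev.vrnsky.eventsourcing.event.*;
import lombok.RequiredArgsConstructor;
import org.apache.avro.specific.SpecificRecord;
import org.springframework.stereotype.Service;

import java.math.BigDecimal;
import java.util.UUID;

// Commands append new events; queries rebuild the order state by replaying them
@Service
@RequiredArgsConstructor
public class FoodOrderService {
    private final EventService eventService;
    private final EventFactory eventFactory;

    public UUID createOrder(String customerId, String restaurantId,
                            String deliveryAddress, String deliveryNotes) {
        UUID orderId = UUID.randomUUID();
        eventService.saveAndPublishEvent(eventFactory.createOrderCreatedEvent(
                orderId, customerId, restaurantId, deliveryAddress, deliveryNotes));
        return orderId;
    }

    public void addOrderItem(UUID orderId, String itemId, String itemName, int quantity, BigDecimal price) {
        eventService.saveAndPublishEvent(
                eventFactory.createOrderItemAddedEvent(orderId, itemId, itemName, quantity, price));
    }

    public void acceptOrder(UUID orderId) {
        eventService.saveAndPublishEvent(eventFactory.createOrderAcceptedEvent(orderId));
    }

    public void startFoodPreparation(UUID orderId) {
        eventService.saveAndPublishEvent(eventFactory.createFoodPreparationStartedEvent(orderId));
    }

    public void markFoodReady(UUID orderId) {
        eventService.saveAndPublishEvent(eventFactory.createFoodReadyEvent(orderId));
    }

    public void pickUpOrder(UUID orderId, String driverId) {
        eventService.saveAndPublishEvent(eventFactory.createOrderPickedUpEvent(orderId, driverId));
    }

    public void deliverOrder(UUID orderId) {
        eventService.saveAndPublishEvent(eventFactory.createOrderDeliveredEvent(orderId));
    }

    public void cancelOrder(UUID orderId, String reason) {
        eventService.saveAndPublishEvent(eventFactory.createOrderCancelledEvent(orderId, reason));
    }

    // Reconstruct the current state of an order by replaying its stored events
    public FoodOrder getOrder(UUID orderId) {
        FoodOrder order = new FoodOrder();
        order.setId(orderId);
        for (SpecificRecord event : eventService.getEvents(orderId)) {
            apply(order, event);
        }
        return order;
    }

    private void apply(FoodOrder order, SpecificRecord event) {
        if (event instanceof OrderCreatedEvent created) {
            order.setCustomerId(created.getCustomerId());
            order.setRestaurantId(created.getRestaurantId());
            order.setDeliveryAddress(created.getDeliveryAddress());
            order.setDeliveryNotes(created.getDeliveryNotes());
            order.setStatus(OrderStatus.CREATED);
        } else if (event instanceof OrderAcceptedEvent) {
            order.setStatus(OrderStatus.ACCEPTED);
        } else if (event instanceof FoodPreparationStartedEvent) {
            order.setStatus(OrderStatus.PREPARING);
        } else if (event instanceof FoodReadyEvent) {
            order.setStatus(OrderStatus.READY);
        } else if (event instanceof OrderPickedUpEvent pickedUp) {
            order.setDriverId(pickedUp.getDriverId());
            order.setStatus(OrderStatus.IN_DELIVERY);
        } else if (event instanceof OrderDeliveredEvent delivered) {
            order.setDeliveryTime(delivered.getDeliveryTime());
            order.setStatus(OrderStatus.DELIVERED);
        } else if (event instanceof OrderCancelledEvent) {
            order.setStatus(OrderStatus.CANCELLED);
        }
        // OrderItemAddedEvent handling is omitted for brevity; it would decode the Avro
        // decimal price back into a BigDecimal and append an OrderItem to the order
    }
}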


Creating event consumers

Now, let’s create some Kafka consumers to react to our events. These consumers would normally live in separate microservices, but for simplicity we’ll keep them in the same application.

package dev.vrnsky.eventsourcing.listener;

import dev.vrnsky.eventsourcing.event.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
@Component
public class AnalyticsConsumer {
    
    private final Map<String, AtomicInteger> restaurantOrderCounts = new ConcurrentHashMap<>();
    private final Map<String, AtomicInteger> menuItemPopularity = new ConcurrentHashMap<>();
    private final Map<String, AtomicInteger> orderCancellationReasons = new ConcurrentHashMap<>();
    
    @KafkaListener(topics = "order-events", groupId = "analytics-service")
    public void consume(Object eventObject) {
        if (eventObject instanceof OrderCreatedEvent) {
            OrderCreatedEvent event = (OrderCreatedEvent) eventObject;
            String restaurantId = event.getRestaurantId();
            
            // Update restaurant order count
            restaurantOrderCounts.computeIfAbsent(restaurantId, k -> new AtomicInteger(0))
                    .incrementAndGet();
            
            log.info("Analytics: New order for restaurant {}. Total orders: {}", 
                    restaurantId, 
                    restaurantOrderCounts.get(restaurantId).get());
        } else if (eventObject instanceof OrderItemAddedEvent) {
            OrderItemAddedEvent event = (OrderItemAddedEvent) eventObject;
            String itemId = event.getItemId();
            
            // Update menu item popularity
            menuItemPopularity.computeIfAbsent(itemId, k -> new AtomicInteger(0))
                    .addAndGet(event.getQuantity());
            
            log.info("Analytics: Menu item {} ordered. Total ordered: {}", 
                    itemId, 
                    menuItemPopularity.get(itemId).get());
        } else if (eventObject instanceof OrderCancelledEvent) {
            OrderCancelledEvent event = (OrderCancelledEvent) eventObject;
            String reason = event.getReason() != null ? event.getReason() : "Unknown";
            
            // Track cancellation reasons
            orderCancellationReasons.computeIfAbsent(reason, k -> new AtomicInteger(0))
                    .incrementAndGet();
            
            log.info("Analytics: Order cancelled. Reason: {}. Frequency: {}", 
                    reason,
                    orderCancellationReasons.get(reason).get());
        }
    }
    
    // Methods to expose analytics data
    public Map<String, Integer> getRestaurantOrderCounts() {
        Map<String, Integer> result = new ConcurrentHashMap<>();
        restaurantOrderCounts.forEach((key, value) -> result.put(key, value.get()));
        return result;
    }
    
    public Map<String, Integer> getMenuItemPopularity() {
        Map<String, Integer> result = new ConcurrentHashMap<>();
        menuItemPopularity.forEach((key, value) -> result.put(key, value.get()));
        return result;
    }
    
    public Map<String, Integer> getOrderCancellationReasons() {
        Map<String, Integer> result = new ConcurrentHashMap<>();
        orderCancellationReasons.forEach((key, value) -> result.put(key, value.get()));
        return result;
    }
}

Creating custom jOOQ queries for advanced analytics

One advantage of using jOOQ over JPA is the ability to write custom, type-safe SQL queries. Let’s add some advanced analytics capabilities to our application:

package dev.vrnsky.eventsourcing.repository;

import dev.vrnsky.eventsourcing.db.generated.tables.EventStore;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import org.jooq.DSLContext;
import org.jooq.Record2;
import org.jooq.Record4;
import org.jooq.Result;
import org.jooq.impl.DSL;
import org.springframework.stereotype.Repository;

import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@Repository
@RequiredArgsConstructor
public class AnalyticsRepository {
    private final DSLContext dsl;
    
    public Map<String, Long> getEventCountByType() {
        EventStore e = EventStore.EVENT_STORE;
        
        Result<Record2<String, Integer>> result = dsl
            .select(e.EVENT_TYPE, DSL.count())
            .from(e)
            .groupBy(e.EVENT_TYPE)
            .fetch();
        
        return result.stream()
            .collect(Collectors.toMap(
                r -> r.get(e.EVENT_TYPE),
                r -> r.value2().longValue()
            ));
    }
    
    public List<DailyOrderMetrics> getDailyOrderMetrics(LocalDate startDate, LocalDate endDate) {
        EventStore e = EventStore.EVENT_STORE;
        
        // Convert LocalDate to Instant
        Instant startInstant = startDate.atStartOfDay(ZoneId.systemDefault()).toInstant();
        Instant endInstant = endDate.plusDays(1).atStartOfDay(ZoneId.systemDefault()).toInstant();
        
        Result<Record4<LocalDate, Integer, Integer, Integer>> result = dsl
            .select(
                DSL.cast(e.TIMESTAMP, LocalDate.class).as("day"),
                DSL.countDistinct(e.AGGREGATE_ID).as("total_orders"),
                DSL.countDistinct(
                    DSL.when(e.EVENT_TYPE.eq("OrderDeliveredEvent"), e.AGGREGATE_ID)
                ).as("delivered_orders"),
                DSL.countDistinct(
                    DSL.when(e.EVENT_TYPE.eq("OrderCancelledEvent"), e.AGGREGATE_ID)
                ).as("cancelled_orders")
            )
            .from(e)
            .where(e.TIMESTAMP.between(startInstant, endInstant))
            .and(DSL.or(
                e.EVENT_TYPE.eq("OrderCreatedEvent"),
                e.EVENT_TYPE.eq("OrderDeliveredEvent"),
                e.EVENT_TYPE.eq("OrderCancelledEvent")
            ))
            .groupBy(DSL.cast(e.TIMESTAMP, LocalDate.class))
            .orderBy(DSL.cast(e.TIMESTAMP, LocalDate.class))
            .fetch();
        
        return result.stream()
            .map(r -> new DailyOrderMetrics(
                r.get("day", LocalDate.class),
                r.get("total_orders", Integer.class),
                r.get("delivered_orders", Integer.class),
                r.get("cancelled_orders", Integer.class)
            ))
            .collect(Collectors.toList());
    }
    
    public List<RestaurantPerformance> getRestaurantPerformance() {
        // This would require more complex query joining with a restaurants table
        // and parsing event data to extract restaurant IDs and delivery times
        // For now, we'll return a placeholder
        return List.of();
    }
    
    @Data
    public static class RestaurantPerformance {
        private final String restaurantId;
        private final String restaurantName;
        private final int totalOrders;
        private final BigDecimal averageDeliveryTimeMinutes;
        private final BigDecimal cancellationRatePercent;
    }

    @Data
    public static class DailyOrderMetrics {
        private final LocalDate date;
        private final int totalOrders;
        private final int deliveredOrders;
        private final int cancelledOrders;
    }
}

REST Controller

Finally, let’s create a REST controller to expose our functionality

package dev.vrnsky.eventsourcing.controller;

import dev.vrnsky.eventsourcing.domain.FoodOrder;
import dev.vrnsky.eventsourcing.listener.AnalyticsConsumer;
import dev.vrnsky.eventsourcing.repository.AnalyticsRepository;
import dev.vrnsky.eventsourcing.service.FoodOrderService;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import org.springframework.format.annotation.DateTimeFormat;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import java.math.BigDecimal;
import java.time.LocalDate;
import java.util.List;
import java.util.Map;
import java.util.UUID;

@RestController
@RequestMapping("/orders")
@RequiredArgsConstructor
public class FoodOrderController {
    private final FoodOrderService foodOrderService;
    private final AnalyticsConsumer analyticsConsumer;
    private final AnalyticsRepository analyticsRepository;

    @PostMapping
    public ResponseEntity<UUID> createOrder(@RequestBody CreateOrderRequest request) {
        UUID orderId = foodOrderService.createOrder(
            request.getCustomerId(), 
            request.getRestaurantId(), 
            request.getDeliveryAddress(), 
            request.getDeliveryNotes()
        );
        return ResponseEntity.ok(orderId);
    }

    @GetMapping("/{orderId}")
    public ResponseEntity<FoodOrder> getOrder(@PathVariable UUID orderId) {
        FoodOrder order = foodOrderService.getOrder(orderId);
        return ResponseEntity.ok(order);
    }

    @PostMapping("/{orderId}/items")
    public ResponseEntity<Void> addOrderItem(@PathVariable UUID orderId, @RequestBody AddItemRequest request) {
        foodOrderService.addOrderItem(
            orderId, 
            request.getItemId(), 
            request.getItemName(), 
            request.getQuantity(), 
            request.getPrice()
        );
        return ResponseEntity.ok().build();
    }

    @PostMapping("/{orderId}/accept")
    public ResponseEntity<Void> acceptOrder(@PathVariable UUID orderId) {
        foodOrderService.acceptOrder(orderId);
        return ResponseEntity.ok().build();
    }

    @PostMapping("/{orderId}/prepare")
    public ResponseEntity<Void> startFoodPreparation(@PathVariable UUID orderId) {
        foodOrderService.startFoodPreparation(orderId);
        return ResponseEntity.ok().build();
    }

    @PostMapping("/{orderId}/ready")
    public ResponseEntity<Void> markFoodReady(@PathVariable UUID orderId) {
        foodOrderService.markFoodReady(orderId);
        return ResponseEntity.ok().build();
    }

    @PostMapping("/{orderId}/pickup")
    public ResponseEntity<Void> pickUpOrder(@PathVariable UUID orderId, @RequestBody PickupRequest request) {
        foodOrderService.pickUpOrder(orderId, request.getDriverId());
        return ResponseEntity.ok().build();
    }

    @PostMapping("/{orderId}/deliver")
    public ResponseEntity<Void> deliverOrder(@PathVariable UUID orderId) {
        foodOrderService.deliverOrder(orderId);
        return ResponseEntity.ok().build();
    }

    @PostMapping("/{orderId}/cancel")
    public ResponseEntity<Void> cancelOrder(@PathVariable UUID orderId, @RequestBody CancelRequest request) {
        foodOrderService.cancelOrder(orderId, request.getReason());
        return ResponseEntity.ok().build();
    }
    
    @GetMapping("/analytics/restaurant-orders")
    public ResponseEntity<Map<String, Integer>> getRestaurantOrderCounts() {
        return ResponseEntity.ok(analyticsConsumer.getRestaurantOrderCounts());
    }
    
    @GetMapping("/analytics/popular-items")
    public ResponseEntity<Map<String, Integer>> getPopularItems() {
        return ResponseEntity.ok(analyticsConsumer.getMenuItemPopularity());
    }
    
    @GetMapping("/analytics/cancellation-reasons")
    public ResponseEntity<Map<String, Integer>> getCancellationReasons() {
        return ResponseEntity.ok(analyticsConsumer.getOrderCancellationReasons());
    }
    
    @GetMapping("/analytics/event-counts")
    public ResponseEntity<Map<String, Long>> getEventCounts() {
        return ResponseEntity.ok(analyticsRepository.getEventCountByType());
    }
    
    @GetMapping("/analytics/daily-metrics")
    public ResponseEntity<List<AnalyticsRepository.DailyOrderMetrics>> getDailyMetrics(
            @RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE) LocalDate startDate,
            @RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE) LocalDate endDate) {
        return ResponseEntity.ok(analyticsRepository.getDailyOrderMetrics(startDate, endDate));
    }

    @Data
    public static class CreateOrderRequest {
        private String customerId;
        private String restaurantId;
        private String deliveryAddress;
        private String deliveryNotes;
    }

    @Data
    public static class AddItemRequest {
        private String itemId;
        private String itemName;
        private int quantity;
        private BigDecimal price;
    }

    @Data
    public static class PickupRequest {
        private String driverId;
    }

    @Data
    public static class CancelRequest {
        private String reason;
    }
}

Application properties

Let’s set up our application properties

server:
  port: 8080
spring:
  datasource:
    url: jdbc:postgresql://localhost:51432/food_delivery
    username: postgres
    password: postgres
    driver-class-name: org.postgresql.Driver
  kafka:
    bootstrap-servers: localhost:9092
    schema-registry-url: http://localhost:8081
  liquibase:
    change-log: classpath:db/changelog.yml

Running the application

Before running the application, you’ll need to have:

  1. A PostgreSQL database running.
  2. A Kafka cluster and Schema Registry.
  3. Kafka UI.

You can use Docker Compose to set this up. Create a docker-compose.yml file:

version: '3'
services:
  postgres:
    image: postgres:17
    container_name: postgres
    ports:
      - "51432:5432"
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: food_delivery
    volumes:
      - postgres-container-data:/var/lib/postgresql/data

  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.4.0
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  schema-registry:
    image: confluentinc/cp-schema-registry:7.4.0
    container_name: schema-registry
    depends_on:
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  kafka-ui:
    image: kafbat/kafka-ui:latest
    container_name: kafka-ui
    depends_on:
      - kafka
      - schema-registry
    ports:
      - "8082:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry:8081

volumes:
  postgres-container-data:

Start the Docker Compose environment

docker compose up -d 

Testing the application

Let’s test our application using cURL commands

Create a food order
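
The create request follows the CreateOrderRequest DTO from the controller above (the field values here are only examples):

curl -X POST http://localhost:8080/orders -H "Content-Type: application/json" -d '{
  "customerId": "customer01",
  "restaurantId": "restaurant01",
  "deliveryAddress": "123 Main Street",
  "deliveryNotes": "Leave at the door"
}'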

This should return a UUID like 123e4567-e89b-12d3-a456-426614174000

Add item to the order

curl -X POST http://localhost:8080/orders/123e4567-e89b-12d3-a456-426614174000/items -H "Content-Type: application/json" -d '{
  "itemId": "pizza01",
  "itemName": "Large Pepperoni Pizza",
  "quantity": 1,
  "price": 12.99
}'

View the order

curl http://localhost:8080/orders/123e4567-e89b-12d3-a456-426614174000

Accept the order

curl -X POST http://localhost:8080/orders/123e4567-e89b-12d3-a456-426614174000/accept

Start food preparation

curl -X POST http://localhost:8080/orders/123e4567-e89b-12d3-a456-426614174000/prepare

Mark food as ready

curl -X POST http://localhost:8080/orders/123e4567-e89b-12d3-a456-426614174000/ready

Mark order as picked up by driver

curl -X POST http://localhost:8080/orders/123e4567-e89b-12d3-a456-426614174000/pickup -H "Content-Type: application/json" -d '{
  "driverId": "driver01"
}'

Mark order as delivered

curl -X POST http://localhost:8080/orders/123e4567-e89b-12d3-a456-426614174000/deliver

Check analytics

curl http://localhost:8080/orders/analytics/event-counts
curl "http://localhost:8080/orders/analytics/daily-metrics?startDate=2023-01-01&endDate=2023-12-31"

Benefits of using jOOQ over JPA

In our implementation, we chose jOOQ over JPA, which offers several benefits:

  1. Type-safe SQL: jOOQ generates Java classes based on your database schema, enabling type-safe queries that are checked at compile time. This reduces the risk of runtime SQL errors.
  2. Control over SQL: Unlike JPA, which abstracts away SQL, jOOQ gives you precise control over your SQL queries while still providing a fluent Java API.
  3. Performance optimization: jOOQ allows you to write highly optimized SQL queries, leveraging database-specific features.
  4. SQL transparency: With jOOQ, you always know exactly what SQL is being executed, which can be crucial for debugging and performance tuning.
  5. Complex queries: jOOQ excels at handling complex queries, joins, and subqueries that might be awkward to express in JPA.

Advanced event sourcing concepts

Event versioning

As your application evolves, your event schemas might change. With Avro and Schema Registry, you can handle event versioning in a controlled way. Here’s an example of evolving an event schema.
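
For instance, adding an optional field with a default value to OrderCreatedEvent is a backward-compatible change that Schema Registry’s default BACKWARD compatibility check accepts. The couponCode field below is purely illustrative and not part of the project:

{
  "namespace": "dev.vrnsky.eventsourcing.event",
  "type": "record",
  "name": "OrderCreatedEvent",
  "fields": [
    { "name": "base", "type": "dev.vrnsky.eventsourcing.event.BaseEvent" },
    { "name": "customerId", "type": "string" },
    { "name": "restaurantId", "type": "string" },
    { "name": "deliveryAddress", "type": "string" },
    { "name": "deliveryNotes", "type": ["null", "string"], "default": null },
    { "name": "couponCode", "type": ["null", "string"], "default": null }
  ]
}

Readers compiled against the new schema can still decode old records, because the missing field falls back to its default, while existing consumers that know nothing about couponCode simply ignore it.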

Snapshotting

For performance optimization, we can set up snapshotting to avoid replaying all events when reconstructing an aggregate.
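
The following is an illustration of the idea rather than production code: a snapshot keeps the serialized aggregate state together with the sequence number of the last event it covers, so reconstruction only replays newer events. The Snapshot record, SnapshotRepository, and the deserializeState/applyEvent helpers are hypothetical and not part of the project above.

package dev.vrnsky.eventsourcing.service;

import dev.vrnsky.eventsourcing.db.generated.tables.records.EventStoreRecord;
import dev.vrnsky.eventsourcing.domain.FoodOrder;
import dev.vrnsky.eventsourcing.repository.EventRepository;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;

import java.util.Optional;
import java.util.UUID;

// Illustration of snapshot-based loading; not part of the original implementation.
@Service
@RequiredArgsConstructor
public class SnapshotOrderLoader {

    // Hypothetical snapshot: serialized state plus the last sequence number it covers
    public record Snapshot(UUID aggregateId, long lastSequenceNumber, byte[] state) {}

    // Hypothetical snapshot store (could be a snapshot table managed with jOOQ)
    public interface SnapshotRepository {
        Optional<Snapshot> findLatest(UUID aggregateId);
        void save(Snapshot snapshot);
    }

    private final EventRepository eventRepository;
    private final SnapshotRepository snapshotRepository;

    public FoodOrder loadOrder(UUID orderId) {
        Optional<Snapshot> latest = snapshotRepository.findLatest(orderId);

        // Start from the snapshotted state if one exists, otherwise from an empty aggregate
        FoodOrder order = latest.map(s -> deserializeState(s.state()))
                                .orElseGet(FoodOrder::new);
        long from = latest.map(Snapshot::lastSequenceNumber).orElse(0L);

        // Replay only the events recorded after the snapshot
        for (EventStoreRecord eventRecord : eventRepository.findByAggregateId(orderId)) {
            if (eventRecord.getSequenceNumber() > from) {
                applyEvent(order, eventRecord);
            }
        }
        // A real implementation would also write a fresh snapshot periodically (e.g. every 100 events)
        return order;
    }

    private FoodOrder deserializeState(byte[] state) {
        // Hypothetical: restore the aggregate from its serialized form (JSON, Avro, ...)
        return new FoodOrder();
    }

    private void applyEvent(FoodOrder order, EventStoreRecord eventRecord) {
        // Hypothetical: deserialize the Avro payload and apply it, as in FoodOrderService
    }
}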

Event sourcing with CQRS

Event sourcing is often used with the Command Query Responsibility Segregation (CQRS) pattern. CQRS separates the command (write) operations from the query (read) operations, allowing each to be optimized independently.
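
As a small illustration of the query side, a projection can consume the same event stream and maintain its own denormalized read model. This class is a sketch and not part of the project above; a real projection would typically persist its view in a dedicated table instead of an in-memory map.

package dev.vrnsky.eventsourcing.projection;

import dev.vrnsky.eventsourcing.event.OrderCreatedEvent;
import dev.vrnsky.eventsourcing.event.OrderDeliveredEvent;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Query-side read model: built from the event stream, queried independently of the event store
@Component
public class OrderStatusProjection {

    private final Map<String, String> statusByOrderId = new ConcurrentHashMap<>();

    @KafkaListener(topics = "order-events", groupId = "order-status-projection")
    public void on(Object event) {
        if (event instanceof OrderCreatedEvent created) {
            statusByOrderId.put(created.getBase().getAggregateId(), "CREATED");
        } else if (event instanceof OrderDeliveredEvent delivered) {
            statusByOrderId.put(delivered.getBase().getAggregateId(), "DELIVERED");
        }
        // ...remaining event types would update the read model in the same way
    }

    // Reads are served from the projection, never by replaying the event store
    public String currentStatus(String orderId) {
        return statusByOrderId.getOrDefault(orderId, "UNKNOWN");
    }
}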

Conclusion

In this article, we’ve explored the implementation of event sourcing in a Spring Boot application using Kafka for event streaming, Avro with Schema Registry for robust data serialization, and jOOQ for type-safe database operations. We’ve built a food delivery service that tracks orders through a series of events, allowing us to reconstruct the state of an order at any point.

The combination of event sourcing, Kafka, Avro, and jOOQ provides a powerful architecture for building scalable, resilient, and maintainable applications. This approach offers the following benefits:

  • A complete audit trail of all state changes.
  • Type-safe SQL queries with jOOQ.
  • Schema evolution capabilities with Avro and Schema Registry.
  • Real-time event processing with Kafka.
  • The ability to create specialized read models for different use cases.

However, it also introduces complexity, including operational overhead for managing Kafka and Schema Registry, the need for careful schema evolution strategies, and a learning curve for the various technologies involved.

As with any architectural pattern, consider your specific requirements carefully before adopting it. For applications that need strong audit capabilities, historical state reconstruction, or complex event processing, event sourcing with Kafka, Avro, and jOOQ can be a powerful approach.

References

  1. Apache Kafka
  2. Apache Avro
  3. Confluent Schema Registry
  4. Kafbat Kafka UI
  5. jOOQ
  6. Spring Boot
  7. Spring Boot for Apache Kafka
  8. Event Sourcing Pattern
  9. CQRS Pattern
