Event sourcing with Spring Boot, Kafka and jOOQ

One way to build resilient, scalable, and maintainable applications is to use event sourcing, which has become a powerful architectural pattern. In the classical approach, we store only the current state of the application. Event sourcing, on the other hand, persists every state change as an event. This gives us a complete history of changes, which makes debugging, auditing, and even reconstructing application state easier.
In this article, we’ll walk through an implementation of the event sourcing pattern. We’ll develop a Spring Boot application for a food delivery domain, using Kafka as our event streaming platform, Apache Avro and Schema Registry for robust data serialization, and jOOQ for database operations. We’ll explore both the theoretical aspects and the practical implementation. By the end of the article, you will understand how to apply event sourcing in your own projects.
Understanding event sourcing
First, let’s understand what event sourcing is and why it’s especially powerful.
In a traditional CRUD (Create, Read, Update, and Delete) application, we store the current state directly in the database. When we need to update an object, we overwrite its previous state. This direct approach has a limitation: once the history of changes is lost, it is hard to understand how an object reached its current state.
Event sourcing takes a different approach:
- Every change to the application state is captured as an event.
- These events are stored in an append-only event store.
- The current state is reconstructed by replaying these events.
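The three points above can be sketched in plain Java. This is a minimal in-memory illustration, not the implementation built later in the article: the record types and the `InMemoryEventStore` class are hypothetical stand-ins for the Avro events and the database-backed store.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified event types; the article's real events are Avro records.
interface OrderEvent { String orderId(); }
record OrderCreated(String orderId) implements OrderEvent {}
record OrderAccepted(String orderId) implements OrderEvent {}
record OrderDelivered(String orderId) implements OrderEvent {}

final class InMemoryEventStore {
    private final List<OrderEvent> log = new ArrayList<>();

    // Append-only: events are added to the end and never updated or deleted.
    void append(OrderEvent event) {
        log.add(event);
    }

    // Rebuild the current status of one order by replaying its events in order.
    String replayStatus(String orderId) {
        String status = "UNKNOWN";
        for (OrderEvent event : log) {
            if (!event.orderId().equals(orderId)) {
                continue;
            }
            if (event instanceof OrderCreated) {
                status = "CREATED";
            } else if (event instanceof OrderAccepted) {
                status = "ACCEPTED";
            } else if (event instanceof OrderDelivered) {
                status = "DELIVERED";
            }
        }
        return status;
    }
}
```

Note that no status is stored anywhere: it is derived from the log each time, so the history stays intact.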
When we add Kafka, Avro, Schema Registry, and jOOQ to this architecture, we gain further benefits:
- Kafka provides a distributed, fault-tolerant event streaming platform.
- Avro offers a compact, binary serialization format with robust schema support.
- Schema Registry ensures schema compatibility across producers and consumers.
- jOOQ provides type-safe SQL generation and execution, giving us more control over database operations than JPA.
For example, in a food delivery application, events like OrderCreated, OrderAccepted, or OrderDelivered are published to Kafka topics using Avro-defined schemas. These events are also stored in a database.
Key benefits of this approach
- Complete audit trail: You have a record of every change made to your system.
- Schema evolution: Avro and Schema Registry enable safe evolution of your event schemas.
- Type-safe SQL: jOOQ provides compile-time checking of SQL queries.
- Compact serialization: Avro provides efficient binary serialization compared to JSON.
- Strong typing: Avro schemas and jOOQ provide type safety for your events.
- Real-time processing: Services can react to events immediately as they occur.
- Scalability: Kafka’s distributed nature allows for high-throughput event processing.
- SQL control: jOOQ gives you precise control over your SQL without sacrificing type safety.
Challenges to consider
- Operational complexity: Managing a Kafka cluster and Schema Registry adds operational overhead.
- Learning curve: Understanding Avro schemas, jOOQ, and event sourcing patterns takes time.
- Schema design: Careful planning is required for schema evolution strategies.
- Eventual consistency: The current state might not be immediately available after events are stored.
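To make the eventual consistency point concrete, here is a small sketch in plain Java. It assumes a hypothetical `LaggingProjection` class in which the write side only queues events and a separate step applies them to a read model. In the real application, Kafka and a consumer play these roles.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical sketch: the write side records events, a projection applies them later.
final class LaggingProjection {
    record StatusChanged(String orderId, String status) {}

    private final Queue<StatusChanged> pending = new ArrayDeque<>();
    private final Map<String, String> readModel = new HashMap<>();

    // Write side: the event is recorded, but the read model is not touched yet.
    void publish(String orderId, String status) {
        pending.add(new StatusChanged(orderId, status));
    }

    // Consumer side: apply everything that has been published so far.
    void drain() {
        StatusChanged event;
        while ((event = pending.poll()) != null) {
            readModel.put(event.orderId(), event.status());
        }
    }

    // Queries see only what the consumer has already applied.
    String statusOf(String orderId) {
        return readModel.getOrDefault(orderId, "UNKNOWN");
    }
}
```

Between `publish` and `drain`, a query returns the old value. That window is what callers of an event-sourced system have to tolerate.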
Implementing event sourcing
Now, let’s build a Spring Boot application that implements event sourcing for a food delivery service using all of the tools mentioned above.
Setting up the project
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.4.4</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>dev.vrnsky</groupId>
<artifactId>event-sourcing</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>event-sourcing</name>
<description>event-sourcing</description>
<properties>
<java.version>21</java.version>
<avro.version>1.11.1</avro.version>
<confluent.version>7.4.0</confluent.version>
<jooq.version>3.20.2</jooq.version>
<spring.datasource.url>jdbc:postgresql://localhost:51432/food_delivery</spring.datasource.url>
<spring.datasource.username>postgres</spring.datasource.username>
<spring.datasource.password>postgres</spring.datasource.password>
<liquibase.version>4.31.1</liquibase.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jooq</artifactId>
</dependency>
<!-- Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- Avro and Schema Registry -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>${confluent.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-client</artifactId>
<version>${confluent.version}</version>
</dependency>
<!-- Database -->
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
</dependency>
<dependency>
<groupId>org.liquibase</groupId>
<artifactId>liquibase-core</artifactId>
<version>${liquibase.version}</version>
</dependency>
<!-- Utilities -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.jetbrains</groupId>
<artifactId>annotations</artifactId>
<version>26.0.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
<annotationProcessorPaths>
<path>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.36</version>
</path>
</annotationProcessorPaths>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<!-- Avro Schema Generation -->
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>${avro.version}</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/avro</sourceDirectory>
<outputDirectory>${project.build.directory}/generated-sources/avro</outputDirectory>
<stringType>String</stringType>
<imports>
<import>${project.basedir}/src/main/avro/BaseEvent.avsc</import>
</imports>
</configuration>
</execution>
</executions>
</plugin>
<!-- Liquibase Plugin -->
<plugin>
<groupId>org.liquibase</groupId>
<artifactId>liquibase-maven-plugin</artifactId>
<version>${liquibase.version}</version>
<configuration>
<changeLogFile>src/main/resources/db/changelog.yml</changeLogFile>
<url>${spring.datasource.url}</url>
<username>${spring.datasource.username}</username>
<password>${spring.datasource.password}</password>
</configuration>
<executions>
<execution>
<phase>initialize</phase>
<goals>
<goal>update</goal>
</goals>
</execution>
</executions>
<dependencies>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>${postgresql.version}</version>
</dependency>
</dependencies>
</plugin>
<!-- jOOQ Code Generation -->
<plugin>
<groupId>org.jooq</groupId>
<artifactId>jooq-codegen-maven</artifactId>
<version>${jooq.version}</version>
<executions>
<execution>
<id>jooq-codegen</id>
<phase>generate-sources</phase>
<goals>
<goal>generate</goal>
</goals>
</execution>
</executions>
<configuration>
<jdbc>
<driver>org.postgresql.Driver</driver>
<url>${spring.datasource.url}</url>
<user>${spring.datasource.username}</user>
<password>${spring.datasource.password}</password>
</jdbc>
<generator>
<generate>
<tables>true</tables>
</generate>
<database>
<name>org.jooq.meta.postgres.PostgresDatabase</name>
<includes>.*</includes>
<excludes>databasechangelog|databasechangeloglock</excludes>
<inputSchema>public</inputSchema>
</database>
<target>
<packageName>dev.vrnsky.eventsourcing.db.generated</packageName>
<directory>${project.build.directory}/generated-sources/jooq</directory>
</target>
</generator>
</configuration>
<dependencies>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>${postgresql.version}</version>
</dependency>
</dependencies>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>
</project>
Defining Avro Schemas
Let’s define our Avro schemas for the food delivery events. We’ll place these in the src/main/avro directory. First, let’s create the base event schema (src/main/avro/BaseEvent.avsc):
{
"namespace": "dev.vrnsky.eventsourcing.event",
"type": "record",
"name": "BaseEvent",
"fields": [
{
"name": "eventId",
"type": "string"
},
{
"name": "aggregateId",
"type": "string"
},
{
"name": "timestamp",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
}
},
{
"name": "eventType",
"type": "string"
}
]
}
Now let’s define the specific event schemas, starting with src/main/avro/OrderCreatedEvent.avsc:
{
"namespace": "dev.vrnsky.eventsourcing.event",
"type": "record",
"name": "OrderCreatedEvent",
"fields": [
{
"name": "base",
"type": "dev.vrnsky.eventsourcing.event.BaseEvent"
},
{
"name": "customerId",
"type": "string"
},
{
"name": "restaurantId",
"type": "string"
},
{
"name": "deliveryAddress",
"type": "string"
},
{
"name": "deliveryNotes",
"type": [
"null",
"string"
],
"default": null
}
]
}
src/main/avro/OrderItemAddedEvent.avsc
{
"namespace": "dev.vrnsky.eventsourcing.event",
"type": "record",
"name": "OrderItemAddedEvent",
"fields": [
{
"name": "base",
"type": "dev.vrnsky.eventsourcing.event.BaseEvent"
},
{
"name": "itemId",
"type": "string"
},
{
"name": "itemName",
"type": "string"
},
{
"name": "quantity",
"type": "int"
},
{
"name": "price",
"type": {
"type": "bytes",
"logicalType": "decimal",
"precision": 10,
"scale": 2
}
}
]
}
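The decimal logical type in the schema above stores a number as the big-endian two's-complement bytes of its unscaled value, with the scale fixed by the schema. The following plain-Java sketch shows that round trip under the schema's scale of 2. The `DecimalBytes` helper is a hypothetical name used for illustration. Avro also ships `Conversions.DecimalConversion` for the same purpose.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.RoundingMode;
import java.nio.ByteBuffer;

final class DecimalBytes {
    // Scale declared in the schema above ("scale": 2).
    static final int SCALE = 2;

    // Encode: normalize to the schema scale, then take the unscaled value's
    // big-endian two's-complement bytes.
    static ByteBuffer encode(BigDecimal value) {
        BigDecimal scaled = value.setScale(SCALE, RoundingMode.HALF_UP);
        return ByteBuffer.wrap(scaled.unscaledValue().toByteArray());
    }

    // Decode: read the bytes back as an unscaled integer and re-apply the scale.
    static BigDecimal decode(ByteBuffer buffer) {
        byte[] bytes = new byte[buffer.remaining()];
        buffer.duplicate().get(bytes);
        return new BigDecimal(new BigInteger(bytes), SCALE);
    }
}
```

Normalizing the scale before encoding matters: a value such as 12.5 has an unscaled value of 125 at scale 1, which a reader applying scale 2 would interpret as 1.25.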
src/main/avro/OrderAcceptedEvent.avsc
{
"namespace": "dev.vrnsky.eventsourcing.event",
"type": "record",
"name": "OrderAcceptedEvent",
"fields": [
{
"name": "base",
"type": "dev.vrnsky.eventsourcing.event.BaseEvent"
}
]
}
src/main/avro/FoodPreparationStartedEvent.avsc
{
"namespace": "dev.vrnsky.eventsourcing.event",
"type": "record",
"name": "FoodPreparationStartedEvent",
"fields": [
{
"name": "base",
"type": "dev.vrnsky.eventsourcing.event.BaseEvent"
}
]
}
src/main/avro/FoodReadyEvent.avsc
{
"namespace": "dev.vrnsky.eventsourcing.event",
"type": "record",
"name": "FoodReadyEvent",
"fields": [
{
"name": "base",
"type": "dev.vrnsky.eventsourcing.event.BaseEvent"
}
]
}
src/main/avro/OrderPickedUpEvent.avsc
{
"namespace": "dev.vrnsky.eventsourcing.event",
"type": "record",
"name": "OrderPickedUpEvent",
"fields": [
{
"name": "base",
"type": "dev.vrnsky.eventsourcing.event.BaseEvent"
},
{
"name": "driverId",
"type": "string"
}
]
}
src/main/avro/OrderDeliveredEvent.avsc
{
"namespace": "dev.vrnsky.eventsourcing.event",
"type": "record",
"name": "OrderDeliveredEvent",
"fields": [
{
"name": "base",
"type": "dev.vrnsky.eventsourcing.event.BaseEvent"
},
{
"name": "deliveryTime",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
}
}
]
}
src/main/avro/OrderCancelledEvent.avsc
{
"namespace": "dev.vrnsky.eventsourcing.event",
"type": "record",
"name": "OrderCancelledEvent",
"fields": [
{
"name": "base",
"type": "dev.vrnsky.eventsourcing.event.BaseEvent"
},
{
"name": "reason",
"type": [
"null",
"string"
],
"default": null
}
]
}
Database schema setup with Liquibase
Let’s set up our database using Liquibase migrations. Create a file at src/main/resources/db/migrations/create-event-store.yml:
databaseChangeLog:
  - changeSet:
      id: create-event-store-table
      author: Egor Voronianskii
      preConditions:
        - onFail: MARK_RAN
        - not:
            tableExists:
              tableName: event_store
      changes:
        - createTable:
            tableName: event_store
            columns:
              - column:
                  name: id
                  type: UUID
                  constraints:
                    primaryKey: true
                    nullable: false
              - column:
                  name: aggregate_id
                  type: UUID
                  constraints:
                    nullable: false
              - column:
                  name: event_type
                  type: VARCHAR(255)
                  constraints:
                    nullable: false
              - column:
                  name: event_data
                  type: BYTEA
                  constraints:
                    nullable: false
              - column:
                  name: timestamp
                  type: TIMESTAMP
                  constraints:
                    nullable: false
              - column:
                  name: sequence_number
                  type: BIGINT
                  autoIncrement: true
                  constraints:
                    nullable: false
        - createIndex:
            indexName: idx_event_store_aggregate_id
            tableName: event_store
            columns:
              - column:
                  name: aggregate_id
        - createIndex:
            indexName: idx_event_store_timestamp
            tableName: event_store
            columns:
              - column:
                  name: timestamp
        - createIndex:
            indexName: idx_event_store_event_type
            tableName: event_store
            columns:
              - column:
                  name: event_type
Create the changelog file at src/main/resources/db/changelog.yml:
databaseChangeLog:
  - includeAll:
      path: migrations/
      relativeToChangelogFile: true
      errorIfMissingOrEmpty: false
Defining the domain model
Next, let’s define our domain model for the food delivery service:
package dev.vrnsky.eventsourcing.domain;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.math.BigDecimal;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class OrderItem {
private String itemId;
private String itemName;
private int quantity;
private BigDecimal price;
}
package dev.vrnsky.eventsourcing.domain;
public enum OrderStatus {
CREATED,
ACCEPTED,
PREPARING,
READY,
IN_DELIVERY,
DELIVERED,
CANCELLED
}
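The status enum lists the states an order can be in, but not which moves between them are legal. One way to make that explicit is a transition table, sketched below. The table itself is an assumption made for illustration (the article does not define one), and `OrderLifecycle` is a hypothetical helper with its own copy of the enum so that the example is self-contained.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

final class OrderLifecycle {
    enum Status { CREATED, ACCEPTED, PREPARING, READY, IN_DELIVERY, DELIVERED, CANCELLED }

    // Assumed transition table; adjust it to the real business rules.
    private static final Map<Status, Set<Status>> ALLOWED = new EnumMap<>(Status.class);
    static {
        ALLOWED.put(Status.CREATED, EnumSet.of(Status.ACCEPTED, Status.CANCELLED));
        ALLOWED.put(Status.ACCEPTED, EnumSet.of(Status.PREPARING, Status.CANCELLED));
        ALLOWED.put(Status.PREPARING, EnumSet.of(Status.READY, Status.CANCELLED));
        ALLOWED.put(Status.READY, EnumSet.of(Status.IN_DELIVERY));
        ALLOWED.put(Status.IN_DELIVERY, EnumSet.of(Status.DELIVERED));
        ALLOWED.put(Status.DELIVERED, EnumSet.noneOf(Status.class));
        ALLOWED.put(Status.CANCELLED, EnumSet.noneOf(Status.class));
    }

    // True if an order in state 'from' may move to state 'to'.
    static boolean canMove(Status from, Status to) {
        return ALLOWED.get(from).contains(to);
    }

    // Returns the new state, or fails loudly on an illegal transition.
    static Status move(Status from, Status to) {
        if (!canMove(from, to)) {
            throw new IllegalStateException("Illegal transition " + from + " -> " + to);
        }
        return to;
    }
}
```

A check like this would typically run before an event is appended, so that the event store never records a transition the domain forbids.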
Kafka and Schema Registry configuration
Now, let’s configure Kafka and Schema Registry
package dev.vrnsky.eventsourcing.config;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Value(value = "${spring.kafka.schema-registry-url}")
private String schemaRegistryUrl;
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
@Bean
public NewTopic orderEventsTopic() {
return new NewTopic("order-events", 3, (short) 1);
}
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
configProps.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
configProps.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, true);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
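The topic above is created with three partitions, and the event service later in the article sends each event with the aggregate id as the record key. Kafka guarantees ordering only within a partition, and its default partitioner sends records with the same key to the same partition, so all events of one order stay in order. The sketch below illustrates the idea with a stand-in hash. Kafka itself uses murmur2 over the serialized key, so the actual partition numbers will differ. `KeyPartitioning` is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class KeyPartitioning {
    // Stand-in hash for illustration only; Kafka's default partitioner uses murmur2
    // over the serialized key bytes, so real partition numbers will differ.
    static int partitionFor(String key, int numPartitions) {
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        // floorMod keeps the result in [0, numPartitions) even for negative hashes.
        return Math.floorMod(hash, numPartitions);
    }
}
```

Whatever the hash function, the property that matters is determinism: the same key always maps to the same partition as long as the partition count does not change.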
And here is the consumer configuration:
package dev.vrnsky.eventsourcing.config;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConsumerConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Value(value = "${spring.kafka.schema-registry-url}")
private String schemaRegistryUrl;
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "food-delivery-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
jOOQ Configuration
Let’s configure jOOQ for our application
package dev.vrnsky.eventsourcing.config;
import org.jooq.SQLDialect;
import org.jooq.impl.DataSourceConnectionProvider;
import org.jooq.impl.DefaultConfiguration;
import org.jooq.impl.DefaultDSLContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.TransactionAwareDataSourceProxy;
import javax.sql.DataSource;
@Configuration
public class JooqConfig {
@Bean
public DataSourceConnectionProvider connectionProvider(DataSource dataSource) {
return new DataSourceConnectionProvider(new TransactionAwareDataSourceProxy(dataSource));
}
@Bean
public DefaultDSLContext dsl(DataSourceConnectionProvider connectionProvider) {
DefaultConfiguration jooqConfiguration = new DefaultConfiguration();
jooqConfiguration.set(connectionProvider);
jooqConfiguration.set(SQLDialect.POSTGRES);
return new DefaultDSLContext(jooqConfiguration);
}
}
Event repository with jOOQ
package dev.vrnsky.eventsourcing.repository;
import static dev.vrnsky.eventsourcing.db.generated.Tables.EVENT_STORE;
import dev.vrnsky.eventsourcing.db.generated.tables.records.EventStoreRecord;
import lombok.RequiredArgsConstructor;
import org.jooq.DSLContext;
import org.springframework.stereotype.Repository;
import java.time.Instant;
import java.util.List;
import java.util.UUID;
@Repository
@RequiredArgsConstructor
public class EventRepository {
private final DSLContext dsl;
public void saveEvent(UUID id, UUID aggregateId, String eventType, byte[] eventData, Instant timestamp) {
dsl.insertInto(EVENT_STORE)
.set(EVENT_STORE.ID, id)
.set(EVENT_STORE.AGGREGATE_ID, aggregateId)
.set(EVENT_STORE.EVENT_TYPE, eventType)
.set(EVENT_STORE.EVENT_DATA, eventData)
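// jOOQ typically generates TIMESTAMP columns as LocalDateTime, so convert the Instant (UTC assumed)
.set(EVENT_STORE.TIMESTAMP, java.time.LocalDateTime.ofInstant(timestamp, java.time.ZoneOffset.UTC))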
.execute();
}
public List<EventStoreRecord> findByAggregateId(UUID aggregateId) {
return dsl.selectFrom(EVENT_STORE)
.where(EVENT_STORE.AGGREGATE_ID.eq(aggregateId))
.orderBy(EVENT_STORE.SEQUENCE_NUMBER.asc())
.fetch();
}
public List<EventStoreRecord> findByEventType(String eventType) {
return dsl.selectFrom(EVENT_STORE)
.where(EVENT_STORE.EVENT_TYPE.eq(eventType))
.orderBy(EVENT_STORE.TIMESTAMP.asc())
.fetch();
}
public List<EventStoreRecord> findByAggregateIdAndEventType(UUID aggregateId, String eventType) {
return dsl.selectFrom(EVENT_STORE)
.where(EVENT_STORE.AGGREGATE_ID.eq(aggregateId))
.and(EVENT_STORE.EVENT_TYPE.eq(eventType))
.orderBy(EVENT_STORE.SEQUENCE_NUMBER.asc())
.fetch();
}
public List<EventStoreRecord> findAllOrderByTimestamp() {
return dsl.selectFrom(EVENT_STORE)
.orderBy(EVENT_STORE.TIMESTAMP.asc())
.fetch();
}
}
Event service
Now, let’s create an Event Service that will handle the common event operations:
package dev.vrnsky.eventsourcing.service;
import dev.vrnsky.eventsourcing.db.generated.tables.records.EventStoreRecord;
import dev.vrnsky.eventsourcing.event.BaseEvent;
import dev.vrnsky.eventsourcing.repository.EventRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.io.ByteArrayOutputStream;
import java.time.Instant;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
@Slf4j
@Service
@RequiredArgsConstructor
public class EventService {
private final EventRepository eventRepository;
private final KafkaTemplate<String, Object> kafkaTemplate;
@Transactional
public void saveAndPublishEvent(SpecificRecord event) {
try {
// Extract base event information
BaseEvent baseEvent = extractBaseEvent(event);
UUID eventId = UUID.fromString(baseEvent.getEventId());
UUID aggregateId = UUID.fromString(baseEvent.getAggregateId());
String eventType = event.getClass().getSimpleName();
Instant timestamp = Instant.ofEpochMilli(baseEvent.getTimestamp().toEpochMilli());
// Serialize event for storage
byte[] serializedEvent = serializeEvent(event);
// Save to event store
eventRepository.saveEvent(eventId, aggregateId, eventType, serializedEvent, timestamp);
// Publish to Kafka
kafkaTemplate.send("order-events", aggregateId.toString(), event);
log.info("Event {} saved and published for aggregate {}", eventType, aggregateId);
} catch (Exception e) {
log.error("Error saving and publishing event", e);
throw new RuntimeException("Error processing event", e);
}
}
@Transactional(readOnly = true)
public List<SpecificRecord> getEvents(UUID aggregateId) {
return eventRepository.findByAggregateId(aggregateId)
.stream()
.map(this::deserializeEvent)
.collect(Collectors.toList());
}
private BaseEvent extractBaseEvent(SpecificRecord event) {
try {
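// This assumes each event has a 'base' field that is a BaseEvent.
// SpecificRecord only exposes get(int), so look up the field position by name first.
return (BaseEvent) event.get(event.getSchema().getField("base").pos());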
} catch (Exception e) {
throw new RuntimeException("Error extracting base event", e);
}
}
private byte[] serializeEvent(SpecificRecord event) {
try {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);
DatumWriter<SpecificRecord> writer = new SpecificDatumWriter<>(event.getSchema());
writer.write(event, encoder);
encoder.flush();
return outputStream.toByteArray();
} catch (Exception e) {
throw new RuntimeException("Error serializing event", e);
}
}
private SpecificRecord deserializeEvent(EventStoreRecord record) {
try {
// Determine the event class based on the stored event type
Class<? extends SpecificRecord> eventClass = getEventClass(record.getEventType());
// Create a reader for this specific event type
SpecificDatumReader<? extends SpecificRecord> reader = new SpecificDatumReader<>(eventClass);
// Deserialize the event data
return reader.read(null, DecoderFactory.get().binaryDecoder(record.getEventData(), null));
} catch (Exception e) {
throw new RuntimeException("Error deserializing event", e);
}
}
@SuppressWarnings("unchecked")
private Class<? extends SpecificRecord> getEventClass(String eventType) {
try {
return (Class<? extends SpecificRecord>) Class.forName("dev.vrnsky.eventsourcing.event." + eventType);
} catch (ClassNotFoundException e) {
throw new RuntimeException("Event type not found: " + eventType, e);
}
}
}
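The `getEventClass` method above rebuilds a class name from the stored event type string, which ties the stored data to the Java package layout. An alternative, offered here as a design option and not as the article's approach, is an explicit registry from type name to class. `EventTypeRegistry` is a hypothetical name, and the sketch uses plain `Class<?>` values so that it runs without Avro on the classpath.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class EventTypeRegistry {
    private final Map<String, Class<?>> typesByName = new ConcurrentHashMap<>();

    // Register under the simple class name, matching what the service stores in event_type.
    void register(Class<?> eventClass) {
        typesByName.put(eventClass.getSimpleName(), eventClass);
    }

    // Resolve a stored event type back to its class, failing on unknown names.
    Class<?> resolve(String eventType) {
        Class<?> eventClass = typesByName.get(eventType);
        if (eventClass == null) {
            throw new IllegalArgumentException("Unknown event type: " + eventType);
        }
        return eventClass;
    }
}
```

With a registry, moving an event class to another package does not break deserialization of old rows, and only registered types can ever be instantiated from stored data.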
Event factory
Let’s create a factory class to help create our Avro events.
package dev.vrnsky.eventsourcing.service;
import dev.vrnsky.eventsourcing.event.*;
import org.springframework.stereotype.Component;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.UUID;
@Component
public class EventFactory {
public OrderCreatedEvent createOrderCreatedEvent(UUID orderId, String customerId, String restaurantId,
String deliveryAddress, String deliveryNotes) {
BaseEvent baseEvent = createBaseEvent(orderId, OrderCreatedEvent.class.getSimpleName());
return OrderCreatedEvent.newBuilder()
.setBase(baseEvent)
.setCustomerId(customerId).setRestaurantId(restaurantId)
.setDeliveryAddress(deliveryAddress)
.setDeliveryNotes(deliveryNotes)
.build();
}
public OrderItemAddedEvent createOrderItemAddedEvent(UUID orderId, String itemId, String itemName,
int quantity, BigDecimal price) {
BaseEvent baseEvent = createBaseEvent(orderId, OrderItemAddedEvent.class.getSimpleName());
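// Convert BigDecimal to Avro's decimal representation (ByteBuffer).
// The scale is normalized to 2 first so the bytes match the schema's "scale": 2.
ByteBuffer priceBytes = ByteBuffer.wrap(price.setScale(2, java.math.RoundingMode.HALF_UP).unscaledValue().toByteArray());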
return OrderItemAddedEvent.newBuilder()
.setBase(baseEvent)
.setItemId(itemId)
.setItemName(itemName)
.setQuantity(quantity)
.setPrice(priceBytes)
.build();
}
public OrderAcceptedEvent createOrderAcceptedEvent(UUID orderId) {
BaseEvent baseEvent = createBaseEvent(orderId, OrderAcceptedEvent.class.getSimpleName());
return OrderAcceptedEvent.newBuilder()
.setBase(baseEvent)
.build();
}
public FoodPreparationStartedEvent createFoodPreparationStartedEvent(UUID orderId) {
BaseEvent baseEvent = createBaseEvent(orderId, FoodPreparationStartedEvent.class.getSimpleName());
return FoodPreparationStartedEvent.newBuilder()
.setBase(baseEvent)
.build();
}
public FoodReadyEvent createFoodReadyEvent(UUID orderId) {
BaseEvent baseEvent = createBaseEvent(orderId, FoodReadyEvent.class.getSimpleName());
return FoodReadyEvent.newBuilder()
.setBase(baseEvent)
.build();
}
public OrderPickedUpEvent createOrderPickedUpEvent(UUID orderId, String driverId) {
BaseEvent baseEvent = createBaseEvent(orderId, OrderPickedUpEvent.class.getSimpleName());
return OrderPickedUpEvent.newBuilder()
.setBase(baseEvent)
.setDriverId(driverId)
.build();
}
public OrderDeliveredEvent createOrderDeliveredEvent(UUID orderId) {
BaseEvent baseEvent = createBaseEvent(orderId, OrderDeliveredEvent.class.getSimpleName());
Instant now = Instant.now();
return OrderDeliveredEvent.newBuilder()
.setBase(baseEvent)
.setDeliveryTime(now)
.build();
}
public OrderCancelledEvent createOrderCancelledEvent(UUID orderId, String reason) {
BaseEvent baseEvent = createBaseEvent(orderId, OrderCancelledEvent.class.getSimpleName());
return OrderCancelledEvent.newBuilder()
.setBase(baseEvent)
.setReason(reason)
.build();
}
private BaseEvent createBaseEvent(UUID aggregateId, String eventType) {
return BaseEvent.newBuilder()
.setEventId(UUID.randomUUID().toString())
.setAggregateId(aggregateId.toString())
.setTimestamp(Instant.now())
.setEventType(eventType)
.build();
}
}
Food Order Service
Now, let’s create a food order service that uses our event sourcing architecture.
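As a minimal sketch of the shape such a service could take: each command builds an event and hands it to something that saves and publishes it. The code below is an assumption-laden illustration, not the article's actual class. `EventPublisher` stands in for `EventService.saveAndPublishEvent`, the two records stand in for the Avro events built by `EventFactory`, and `FoodOrderServiceSketch` is a hypothetical name.

```java
import java.util.UUID;

// Hypothetical stand-in for the article's EventService.saveAndPublishEvent(...).
interface EventPublisher {
    void saveAndPublish(Object event);
}

// Hypothetical, simplified events; the real ones are the Avro classes built by EventFactory.
record OrderCreated(UUID orderId, String customerId, String restaurantId) {}
record OrderCancelled(UUID orderId, String reason) {}

final class FoodOrderServiceSketch {
    private final EventPublisher publisher;

    FoodOrderServiceSketch(EventPublisher publisher) {
        this.publisher = publisher;
    }

    // A command creates a new aggregate id and records the fact as an event.
    UUID createOrder(String customerId, String restaurantId) {
        UUID orderId = UUID.randomUUID();
        publisher.saveAndPublish(new OrderCreated(orderId, customerId, restaurantId));
        return orderId;
    }

    // Later commands only append further events for the same aggregate id.
    void cancelOrder(UUID orderId, String reason) {
        publisher.saveAndPublish(new OrderCancelled(orderId, reason));
    }
}
```

The service never updates an order row directly. Every command results in exactly one appended event, and the current state is whatever replaying those events yields.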
Creating event consumers
Now, let’s create some Kafka consumers to react to our events. These consumers should live in separate microservices, but for simplicity, we’ll keep them in the same application.
package dev.vrnsky.eventsourcing.listener;
import dev.vrnsky.eventsourcing.event.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
@Component
public class AnalyticsConsumer {
private final Map<String, AtomicInteger> restaurantOrderCounts = new ConcurrentHashMap<>();
private final Map<String, AtomicInteger> menuItemPopularity = new ConcurrentHashMap<>();
private final Map<String, AtomicInteger> orderCancellationReasons = new ConcurrentHashMap<>();
@KafkaListener(topics = "order-events", groupId = "analytics-service")
public void consume(Object eventObject) {
if (eventObject instanceof OrderCreatedEvent) {
OrderCreatedEvent event = (OrderCreatedEvent) eventObject;
String restaurantId = event.getRestaurantId();
// Update restaurant order count
restaurantOrderCounts.computeIfAbsent(restaurantId, k -> new AtomicInteger(0))
.incrementAndGet();
log.info("Analytics: New order for restaurant {}. Total orders: {}",
restaurantId,
restaurantOrderCounts.get(restaurantId).get());
} else if (eventObject instanceof OrderItemAddedEvent) {
OrderItemAddedEvent event = (OrderItemAddedEvent) eventObject;
String itemId = event.getItemId();
// Update menu item popularity
menuItemPopularity.computeIfAbsent(itemId, k -> new AtomicInteger(0))
.addAndGet(event.getQuantity());
log.info("Analytics: Menu item {} ordered. Total ordered: {}",
itemId,
menuItemPopularity.get(itemId).get());
} else if (eventObject instanceof OrderCancelledEvent) {
OrderCancelledEvent event = (OrderCancelledEvent) eventObject;
String reason = event.getReason() != null ? event.getReason() : "Unknown";
// Track cancellation reasons
orderCancellationReasons.computeIfAbsent(reason, k -> new AtomicInteger(0))
.incrementAndGet();
log.info("Analytics: Order cancelled. Reason: {}. Frequency: {}",
reason,
orderCancellationReasons.get(reason).get());
}
}
// Methods to expose analytics data
public Map<String, Integer> getRestaurantOrderCounts() {
Map<String, Integer> result = new ConcurrentHashMap<>();
restaurantOrderCounts.forEach((key, value) -> result.put(key, value.get()));
return result;
}
public Map<String, Integer> getMenuItemPopularity() {
Map<String, Integer> result = new ConcurrentHashMap<>();
menuItemPopularity.forEach((key, value) -> result.put(key, value.get()));
return result;
}
public Map<String, Integer> getOrderCancellationReasons() {
Map<String, Integer> result = new ConcurrentHashMap<>();
orderCancellationReasons.forEach((key, value) -> result.put(key, value.get()));
return result;
}
}
Create custom jOOQ queries for advanced analytics
One advantage of using jOOQ over JPA is the ability to write custom, type-safe SQL queries. Let’s add some advanced analytics capabilities to our application:
package dev.vrnsky.eventsourcing.repository;
import dev.vrnsky.eventsourcing.db.generated.tables.EventStore;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.Record2;
import org.jooq.Record4;
import org.jooq.Result;
import org.jooq.impl.DSL;
import org.jooq.impl.SQLDataType;
import org.springframework.stereotype.Repository;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@Repository
@RequiredArgsConstructor
public class AnalyticsRepository {
private final DSLContext dsl;
// Counts stored events per event type
public Map<String, Long> getEventCountByType() {
EventStore e = EventStore.EVENT_STORE;
// count(), cast(), when() and friends are static methods of org.jooq.impl.DSL, not of DSLContext
Field<Integer> eventCount = DSL.count();
Result<Record2<String, Integer>> result = dsl
.select(e.EVENT_TYPE, eventCount)
.from(e)
.groupBy(e.EVENT_TYPE)
.fetch();
return result.stream()
.collect(Collectors.toMap(
r -> r.get(e.EVENT_TYPE),
r -> r.get(eventCount).longValue()
));
}
// Aggregates created, delivered, and cancelled orders per calendar day
public List<DailyOrderMetrics> getDailyOrderMetrics(LocalDate startDate, LocalDate endDate) {
EventStore e = EventStore.EVENT_STORE;
// Convert LocalDate to Instant; the end bound is the start of the day after endDate
Instant startInstant = startDate.atStartOfDay(ZoneId.systemDefault()).toInstant();
Instant endInstant = endDate.plusDays(1).atStartOfDay(ZoneId.systemDefault()).toInstant();
// Truncate the event timestamp to a date so events can be grouped by day
Field<LocalDate> day = DSL.cast(e.TIMESTAMP, SQLDataType.LOCALDATE);
Result<Record4<LocalDate, Integer, Integer, Integer>> result = dsl
.select(
day.as("day"),
DSL.countDistinct(e.AGGREGATE_ID).as("total_orders"),
DSL.countDistinct(
DSL.when(e.EVENT_TYPE.eq("OrderDeliveredEvent"), e.AGGREGATE_ID)
).as("delivered_orders"),
DSL.countDistinct(
DSL.when(e.EVENT_TYPE.eq("OrderCancelledEvent"), e.AGGREGATE_ID)
).as("cancelled_orders")
)
.from(e)
// Half-open range [start, end): between() would also match midnight of the following day
.where(e.TIMESTAMP.ge(startInstant))
.and(e.TIMESTAMP.lt(endInstant))
.and(DSL.or(
e.EVENT_TYPE.eq("OrderCreatedEvent"),
e.EVENT_TYPE.eq("OrderDeliveredEvent"),
e.EVENT_TYPE.eq("OrderCancelledEvent")
))
.groupBy(day)
.orderBy(day)
.fetch();
return result.stream()
.map(r -> new DailyOrderMetrics(
r.get("day", LocalDate.class),
r.get("total_orders", Integer.class),
r.get("delivered_orders", Integer.class),
r.get("cancelled_orders", Integer.class)
))
.collect(Collectors.toList());
}
public List<RestaurantPerformance> getRestaurantPerformance() {
// This would require more complex query joining with a restaurants table
// and parsing event data to extract restaurant IDs and delivery times
// For now, we'll return a placeholder
return List.of();
}
// Result types are nested in AnalyticsRepository; the controller refers to AnalyticsRepository.DailyOrderMetrics
@Data
public static class RestaurantPerformance {
private final String restaurantId;
private final String restaurantName;
private final int totalOrders;
private final BigDecimal averageDeliveryTimeMinutes;
private final BigDecimal cancellationRatePercent;
}
@Data
public static class DailyOrderMetrics {
private final LocalDate date;
private final int totalOrders;
private final int deliveredOrders;
private final int cancelledOrders;
}
}
REST Controller
Finally, let’s create a REST controller to expose our functionality:
package dev.vrnsky.eventsourcing.controller;
import dev.vrnsky.eventsourcing.domain.FoodOrder;
import dev.vrnsky.eventsourcing.listener.AnalyticsConsumer;
import dev.vrnsky.eventsourcing.repository.AnalyticsRepository;
import dev.vrnsky.eventsourcing.service.FoodOrderService;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import org.springframework.format.annotation.DateTimeFormat;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@RestController
@RequestMapping("/orders")
@RequiredArgsConstructor
public class FoodOrderController {
private final FoodOrderService foodOrderService;
private final AnalyticsConsumer analyticsConsumer;
private final AnalyticsRepository analyticsRepository;
@PostMapping
public ResponseEntity<UUID> createOrder(@RequestBody CreateOrderRequest request) {
UUID orderId = foodOrderService.createOrder(
request.getCustomerId(),
request.getRestaurantId(),
request.getDeliveryAddress(),
request.getDeliveryNotes()
);
return ResponseEntity.ok(orderId);
}
@GetMapping("/{orderId}")
public ResponseEntity<FoodOrder> getOrder(@PathVariable UUID orderId) {
FoodOrder order = foodOrderService.getOrder(orderId);
return ResponseEntity.ok(order);
}
@PostMapping("/{orderId}/items")
public ResponseEntity<Void> addOrderItem(@PathVariable UUID orderId, @RequestBody AddItemRequest request) {
foodOrderService.addOrderItem(
orderId,
request.getItemId(),
request.getItemName(),
request.getQuantity(),
request.getPrice()
);
return ResponseEntity.ok().build();
}
@PostMapping("/{orderId}/accept")
public ResponseEntity<Void> acceptOrder(@PathVariable UUID orderId) {
foodOrderService.acceptOrder(orderId);
return ResponseEntity.ok().build();
}
@PostMapping("/{orderId}/prepare")
public ResponseEntity<Void> startFoodPreparation(@PathVariable UUID orderId) {
foodOrderService.startFoodPreparation(orderId);
return ResponseEntity.ok().build();
}
@PostMapping("/{orderId}/ready")
public ResponseEntity<Void> markFoodReady(@PathVariable UUID orderId) {
foodOrderService.markFoodReady(orderId);
return ResponseEntity.ok().build();
}
@PostMapping("/{orderId}/pickup")
public ResponseEntity<Void> pickUpOrder(@PathVariable UUID orderId, @RequestBody PickupRequest request) {
foodOrderService.pickUpOrder(orderId, request.getDriverId());
return ResponseEntity.ok().build();
}
@PostMapping("/{orderId}/deliver")
public ResponseEntity<Void> deliverOrder(@PathVariable UUID orderId) {
foodOrderService.deliverOrder(orderId);
return ResponseEntity.ok().build();
}
@PostMapping("/{orderId}/cancel")
public ResponseEntity<Void> cancelOrder(@PathVariable UUID orderId, @RequestBody CancelRequest request) {
foodOrderService.cancelOrder(orderId, request.getReason());
return ResponseEntity.ok().build();
}
@GetMapping("/analytics/restaurant-orders")
public ResponseEntity<Map<String, Integer>> getRestaurantOrderCounts() {
return ResponseEntity.ok(analyticsConsumer.getRestaurantOrderCounts());
}
@GetMapping("/analytics/popular-items")
public ResponseEntity<Map<String, Integer>> getPopularItems() {
return ResponseEntity.ok(analyticsConsumer.getMenuItemPopularity());
}
@GetMapping("/analytics/cancellation-reasons")
public ResponseEntity<Map<String, Integer>> getCancellationReasons() {
return ResponseEntity.ok(analyticsConsumer.getOrderCancellationReasons());
}
@GetMapping("/analytics/event-counts")
public ResponseEntity<Map<String, Long>> getEventCounts() {
return ResponseEntity.ok(analyticsRepository.getEventCountByType());
}
@GetMapping("/analytics/daily-metrics")
public ResponseEntity<List<AnalyticsRepository.DailyOrderMetrics>> getDailyMetrics(
@RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE) LocalDate startDate,
@RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE) LocalDate endDate) {
return ResponseEntity.ok(analyticsRepository.getDailyOrderMetrics(startDate, endDate));
}
@Data
public static class CreateOrderRequest {
private String customerId;
private String restaurantId;
private String deliveryAddress;
private String deliveryNotes;
}
@Data
public static class AddItemRequest {
private String itemId;
private String itemName;
private int quantity;
private BigDecimal price;
}
@Data
public static class PickupRequest {
private String driverId;
}
@Data
public static class CancelRequest {
private String reason;
}
}
Application properties
Let’s set up our application properties
server:
  port: 8080
spring:
  datasource:
    url: jdbc:postgresql://localhost:51432/food_delivery
    username: postgres
    password: postgres
    driver-class-name: org.postgresql.Driver
  kafka:
    bootstrap-servers: localhost:9092
    schema-registry-url: http://localhost:8081
  liquibase:
    change-log: classpath:db/changelog.yml
Running the application
Before running the application, you’ll need:
- A running PostgreSQL database.
- A Kafka cluster and Schema Registry.
- Kafka UI.
You can use Docker Compose to set these up. Create a docker-compose.yml file:
version: '3'
services:
  postgres:
    image: postgres:17
    container_name: postgres
    ports:
      - "51432:5432"
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: food_delivery
    volumes:
      - postgres-container-data:/var/lib/postgresql/data
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka:7.4.0
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  schema-registry:
    image: confluentinc/cp-schema-registry:7.4.0
    container_name: schema-registry
    depends_on:
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
  kafka-ui:
    image: kafbat/kafka-ui:latest
    container_name: kafka-ui
    depends_on:
      - kafka
      - schema-registry
    ports:
      - "8082:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry:8081
volumes:
  postgres-container-data:
Start the Docker Compose environment
docker compose up -d
Testing the application
Let’s test our application using cURL commands
Create a food order
curl -X POST http://localhost:8080/orders -H "Content-Type: application/json" -d '{
"customerId": "cust123",
"restaurantId": "rest456",
"deliveryAddress": "123 Main St, Anytown",
"deliveryNotes": "Ring doorbell twice"
}'
This should return a UUID like 123e4567-e89b-12d3-a456-426614174000
Add item to the order
curl -X POST http://localhost:8080/orders/123e4567-e89b-12d3-a456-426614174000/items -H "Content-Type: application/json" -d '{
"itemId": "pizza01",
"itemName": "Large Pepperoni Pizza",
"quantity": 1,
"price": 12.99
}'
View the order
curl http://localhost:8080/orders/123e4567-e89b-12d3-a456-426614174000
Accept the order
curl -X POST http://localhost:8080/orders/123e4567-e89b-12d3-a456-426614174000/accept
Start food preparation
curl -X POST http://localhost:8080/orders/123e4567-e89b-12d3-a456-426614174000/prepare
Mark food as ready
curl -X POST http://localhost:8080/orders/123e4567-e89b-12d3-a456-426614174000/ready
Mark order as picked up by driver
curl -X POST http://localhost:8080/orders/123e4567-e89b-12d3-a456-426614174000/pickup -H "Content-Type: application/json" -d '{
"driverId": "driver789"
}'
Mark order as delivered
curl -X POST http://localhost:8080/orders/123e4567-e89b-12d3-a456-426614174000/deliver
Check analytics
curl http://localhost:8080/orders/analytics/event-counts
curl "http://localhost:8080/orders/analytics/daily-metrics?startDate=2023-01-01&endDate=2023-12-31"
Benefits of using jOOQ over JPA
In our implementation, we’ve chosen jOOQ over JPA, which offers several benefits:
- Type-safe SQL: jOOQ generates Java classes based on your database schema, enabling type-safe queries that are checked at compile time. This reduces the risk of runtime SQL errors.
- Control over SQL: Unlike JPA, which abstracts away SQL, jOOQ gives you precise control over your SQL queries while still providing a fluent Java API.
- Performance optimization: jOOQ allows you to write highly optimized SQL queries, leveraging database-specific features.
- SQL transparency: With jOOQ, you always know exactly what SQL is being executed, which can be crucial for debugging and performance tuning.
- Complex queries: jOOQ excels at handling complex queries, joins, and subqueries that might be awkward to express in JPA.
Advanced event sourcing concepts
Event versioning
As your application evolves, your event schemas might change. When using Avro and Schema Registry, you can handle event versioning in a controlled way, for example by adding new fields with default values so that consumers on the new schema can still read events written with the old one.
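The effect of adding a field with a default value can be illustrated without any Avro machinery. The following is only a plain Java sketch of the idea: `OrderCancelledV2`, the `refundRequested` field, and `EventUpcaster` are hypothetical names that do not appear in the article's schemas, and with Avro this resolution is performed by the deserializer based on the `default` declared in the reader schema.

```java
import java.util.Map;

// Hypothetical version 2 of a cancellation event: it adds refundRequested,
// a field that events written with version 1 do not carry.
record OrderCancelledV2(String orderId, String reason, boolean refundRequested) {}

final class EventUpcaster {
    // Default applied when an old (v1) payload lacks the new field.
    static final boolean REFUND_REQUESTED_DEFAULT = false;

    // Accepts a decoded payload written with either version and returns the v2 shape.
    static OrderCancelledV2 upcast(Map<String, Object> payload) {
        String orderId = (String) payload.get("orderId");
        String reason = (String) payload.get("reason");
        Object refund = payload.getOrDefault("refundRequested", REFUND_REQUESTED_DEFAULT);
        return new OrderCancelledV2(orderId, reason, (Boolean) refund);
    }
}
```

Calling `EventUpcaster.upcast` with a map that only contains `orderId` and `reason` yields an event whose `refundRequested` is `false`, which is the behavior a backward-compatible schema change is meant to guarantee for old events.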
Snapshotting
For performance optimization, we can set up snapshotting to avoid replaying all events when reconstructing an aggregate.
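A minimal sketch of the idea, under stated assumptions: events of one aggregate carry an increasing sequence number, and a snapshot records the state together with the last sequence number it covers. `OrderEvent`, `OrderSnapshot`, `SnapshotReplay`, and the status strings are hypothetical and simplified; they are not the classes built earlier in this article.

```java
import java.util.List;

// Hypothetical, simplified event: sequence orders the events of one aggregate.
record OrderEvent(long sequence, String type) {}

// Hypothetical snapshot: the status after applying all events up to lastSequence.
record OrderSnapshot(String status, long lastSequence) {}

final class SnapshotReplay {
    // Applies a single event to the current status.
    static String apply(String status, OrderEvent event) {
        return switch (event.type()) {
            case "OrderCreatedEvent" -> "CREATED";
            case "OrderDeliveredEvent" -> "DELIVERED";
            case "OrderCancelledEvent" -> "CANCELLED";
            default -> status; // other event types leave the status unchanged here
        };
    }

    // Rebuilds the state from a snapshot, replaying only the events recorded after it.
    // Assumes the list is ordered by sequence number.
    static OrderSnapshot restore(OrderSnapshot snapshot, List<OrderEvent> events) {
        String status = snapshot.status();
        long last = snapshot.lastSequence();
        for (OrderEvent event : events) {
            if (event.sequence() <= snapshot.lastSequence()) {
                continue; // already reflected in the snapshot
            }
            status = apply(status, event);
            last = event.sequence();
        }
        return new OrderSnapshot(status, last);
    }
}
```

Starting from an empty snapshot such as `new OrderSnapshot("NEW", 0)` replays the full history, while starting from a later snapshot skips the events it already covers and should arrive at the same final state. In practice the event query itself would be restricted to sequence numbers above the snapshot, so the skipped events are never loaded.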
Event sourcing with CQRS
Event sourcing is often used with the Command Query Responsibility Segregation (CQRS) pattern. CQRS separates the command (write) operations from the query (read) operations, allowing each to be optimized independently.
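The separation can be sketched in a few lines of plain Java. This is an illustration under simplifying assumptions, not the article's implementation: `StatusChanged`, `OrderCommandHandler`, and `OrderStatusProjection` are hypothetical names, the event log is an in-memory list, and the projection is updated by a direct method call where the article's setup would deliver the event through Kafka.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified event for this sketch.
record StatusChanged(String orderId, String newStatus) {}

// Command side: validates input and appends events; it never answers queries.
final class OrderCommandHandler {
    private final List<StatusChanged> eventLog = new ArrayList<>();
    private final OrderStatusProjection projection;

    OrderCommandHandler(OrderStatusProjection projection) {
        this.projection = projection;
    }

    void changeStatus(String orderId, String newStatus) {
        if (newStatus == null || newStatus.isBlank()) {
            throw new IllegalArgumentException("newStatus must not be blank");
        }
        StatusChanged event = new StatusChanged(orderId, newStatus);
        eventLog.add(event);  // append-only write model
        projection.on(event); // stand-in for publishing the event to consumers
    }

    int eventCount() {
        return eventLog.size();
    }
}

// Query side: a read model that is updated only from events.
final class OrderStatusProjection {
    private final Map<String, String> statusByOrder = new HashMap<>();

    void on(StatusChanged event) {
        statusByOrder.put(event.orderId(), event.newStatus());
    }

    String statusOf(String orderId) {
        return statusByOrder.getOrDefault(orderId, "UNKNOWN");
    }

    long countWithStatus(String status) {
        return statusByOrder.values().stream().filter(status::equals).count();
    }
}
```

The write side keeps the full history, while the read side stores only what its queries need, so each can use a different storage layout or be scaled separately.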
Conclusion
In this article, we’ve explored the implementation of event sourcing in a Spring Boot application using Kafka for event streaming, Avro with Schema Registry for robust data serialization, and jOOQ for type-safe database operations. We’ve built a food delivery service that tracks orders through a series of events, allowing us to reconstruct the state of an order at any point.
The combination of event sourcing, Kafka, Avro, and jOOQ provides a powerful architecture for building scalable, resilient, and maintainable applications. This approach offers the following benefits:
- A complete audit trail of all state changes.
- Type-safe SQL queries with jOOQ.
- Schema evolution capabilities with Avro and Schema Registry.
- Real-time event processing with Kafka.
- The ability to create specialized read models for different use cases.
However, it also introduces complexity, including operational overhead for managing Kafka and Schema Registry, the need for careful schema evolution strategies, and a learning curve for the various technologies involved.
As with any architectural pattern, consider your specific requirements carefully before adopting it. For applications that need strong audit capabilities, historical state reconstruction, or complex event processing, event sourcing with Kafka, Avro, and jOOQ can be a powerful approach.