Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/gradle-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ jobs:
- name: Set up gradle
uses: gradle/actions/setup-gradle@v4
- name: Build
run: ./gradlew build
run: ./gradlew -Dorg.gradle.jvmargs=-Xmx1g build

dependency-review:
needs: build
Expand Down Expand Up @@ -66,7 +66,7 @@ jobs:
- name: Set up gradle
uses: gradle/actions/setup-gradle@v4
- name: Staging artifacts
run: ./gradlew publish
run: ./gradlew -Dorg.gradle.jvmargs=-Xmx1g publish
- name: Publish
env:
JRELEASER_GPG_SECRET_KEY: ${{ secrets.MAVEN_CENTRAL_GPG_SECRET_KEY }}
Expand All @@ -77,4 +77,4 @@ jobs:
JRELEASER_NEXUS2_USERNAME: ${{ secrets.MAVEN_CENTRAL_USERNAME }}
JRELEASER_NEXUS2_PASSWORD: ${{ secrets.MAVEN_CENTRAL_PASSWORD }}
JRELEASER_GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: ./gradlew :commons:jreleaserDeploy :outbox-kafka-spring:jreleaserDeploy :outbox-kafka-spring-reactive:jreleaserDeploy
run: ./gradlew -Dorg.gradle.jvmargs=-Xmx1g :commons:jreleaserDeploy :outbox-kafka-spring:jreleaserDeploy :outbox-kafka-spring-reactive:jreleaserDeploy
50 changes: 46 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[![ci](https://github.com/tomorrow-one/transactional-outbox/actions/workflows/gradle-build.yml/badge.svg)](https://github.com/tomorrow-one/transactional-outbox/actions/workflows/gradle-build.yml)
[![maven: outbox-kafka-spring](https://img.shields.io/maven-central/v/one.tomorrow.transactional-outbox/outbox-kafka-spring.svg?label=maven:%20outbox-kafka-spring)](https://central.sonatype.com/search?q=one.tomorrow.transactional-outbox:outbox-kafka-spring)
[![maven: outbox-kafka-spring-reactive](https://img.shields.io/maven-central/v/one.tomorrow.transactional-outbox/outbox-kafka-spring-reactive.svg?label=maven:%20outbox-kafka-spring-reactive)](https://central.sonatype.com/search?q=one.tomorrow.transactional-outbox:outbox-kafka-spring-reactive)
[![maven: outbox-kafka-quarkus](https://img.shields.io/maven-central/v/one.tomorrow.transactional-outbox/outbox-kafka-quarkus.svg?label=maven:%20outbox-kafka-quarkus)](https://central.sonatype.com/search?q=one.tomorrow.transactional-outbox:outbox-kafka-quarkus)

# Transactional Outbox

Expand Down Expand Up @@ -44,9 +45,7 @@ the message / payload), a solution would have to be found or developed. At the t
experience in the team with Debezium or Kafka Connect.

### Current Limitations
* This library assumes and uses Spring (for transaction handling)
* It comes with a module for usage in classic spring and spring boot projects using sync/blocking operations (this
module uses spring-jdbc), and another module for reactive operations (uses [spring R2DBC](https://spring.io/projects/spring-data-r2dbc) for database access)
* It comes with modules for usage in Spring/Spring Boot projects (classic sync/blocking and reactive operations), and Quarkus applications
* It's tested with postgresql only (verified support for other databases could be contributed)

## Installation & Configuration
Expand All @@ -57,6 +56,7 @@ Depending on your application add one of the following libraries as dependency t

* classic (sync/blocking): `one.tomorrow.transactional-outbox:outbox-kafka-spring:$version`
* reactive: `one.tomorrow.transactional-outbox:outbox-kafka-spring-reactive:$version`
* quarkus: `one.tomorrow.transactional-outbox:outbox-kafka-quarkus:$version`

#### Compatibility Matrix

Expand Down Expand Up @@ -202,6 +202,28 @@ public class TransactionalOutboxConfig {
}
```

#### Setup the `OutboxProcessor` from `outbox-kafka-quarkus`

For Quarkus applications, the extension automatically configures the `OutboxProcessor` - no manual setup is required! Simply add the dependency and configure via `application.properties`:

```properties
# Kafka configuration (standard Quarkus Kafka config)
kafka.bootstrap.servers=localhost:9092

# Transactional Outbox configuration
one.tomorrow.transactional-outbox.enabled=true
one.tomorrow.transactional-outbox.processing-interval=PT0.2S
one.tomorrow.transactional-outbox.lock-timeout=PT5S
one.tomorrow.transactional-outbox.lock-owner-id=my-service-instance-1
one.tomorrow.transactional-outbox.event-source=my-service

# Optional: Automatic cleanup configuration
one.tomorrow.transactional-outbox.cleanup.interval=PT1H
one.tomorrow.transactional-outbox.cleanup.retention=P30D
```

For detailed Quarkus-specific configuration and usage, see the [Quarkus module README](outbox-kafka-quarkus/README.md).

## Usage

In a service that changes the database (inside a transaction), create and serialize the message/event that should
Expand Down Expand Up @@ -254,6 +276,26 @@ public Mono<OutboxRecord> doSomething(String name) {
}
```

In a **quarkus application** it would look like this:

```java
@Inject
OutboxService outboxService;

@Transactional
public void doSomething(String id, String name) {

// Here s.th. else would be done within the transaction, e.g. some entity created.

SomeEvent event = SomeEvent.newBuilder()
.setId(id)
.setName(name)
.build();
Map<String, String> headers = Map.of(KafkaHeaders.HEADERS_VALUE_TYPE_NAME, event.getDescriptorForType().getFullName());
outboxService.saveForPublishing("some-topic", id, event.toByteArray(), headers);
}
```

### Tracing

If you have tracing in place you're probably interested in getting the trace context propagated with Kafka messages as well.
Expand Down Expand Up @@ -324,7 +366,7 @@ public class Cleaner {
## How-To Release

To release a new version follow this step
1. In your PR with the functional change, bump the version of `commons`, `outbox-kafka-spring` or `outbox-kafka-spring-reactive` in the root `build.gradle.kts` to a non-`SNAPSHOT` version.
1. In your PR with the functional change, bump the version of `commons`, `outbox-kafka-spring`, `outbox-kafka-spring-reactive` or `outbox-kafka-quarkus` in the root `build.gradle.kts` to a non-`SNAPSHOT` version.
* Try to follow semantic versioning, i.e. bump the major version for binary incompatible changes, the minor version for compatible changes with improvements/new features, and the patch version for bugfixes or non-functional changes like refactorings.
2. Merge your PR - the related pipeline will publish the new version(s) to Sonatype's staging repo (SNAPSHOTs are published to Maven Central Snapshots repository (and are kept for 90 days)).
3. To publish a release, follow https://central.sonatype.com/publishing/deployments
Expand Down
34 changes: 27 additions & 7 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import java.util.*
project(":commons").version = "3.0.1-SNAPSHOT"
project(":outbox-kafka-spring").version = "4.0.1-SNAPSHOT"
project(":outbox-kafka-spring-reactive").version = "4.0.1-SNAPSHOT"
project(":outbox-kafka-quarkus").version = "1.0.0-SNAPSHOT"
project(":outbox-kafka-quarkus-deployment").version = "1.0.0-SNAPSHOT"

plugins {
id("java-library")
Expand All @@ -16,9 +18,14 @@ plugins {
id("org.jreleaser") version "1.20.0"
id("jacoco")
id("com.github.hierynomus.license") version "0.16.1"
id("io.quarkus.extension") version "3.26.0" apply false
id("io.quarkus") version "3.26.0" apply false
}

group = "one.tomorrow.transactional-outbox"

val protobufVersion by extra("3.25.5")
val quarkusVersion by extra("3.26.0")

// disable JReleaser on root level
jreleaser {
Expand All @@ -29,18 +36,23 @@ subprojects {
apply(plugin = "java-library")
apply(plugin = "java-test-fixtures")
apply(plugin = "io.freefair.lombok")
apply(plugin = "com.google.protobuf")
// protobuf plugin does not play nicely with quarkus, see
// https://github.com/google/protobuf-gradle-plugin/issues/659
if (!name.contains("quarkus"))
apply(plugin = "com.google.protobuf")
apply(plugin = "maven-publish")
apply(plugin = "org.jreleaser")
apply(plugin = "jacoco")
apply(plugin = "com.github.hierynomus.license")

group = "one.tomorrow.transactional-outbox"
group = rootProject.group

java {
sourceCompatibility = JavaVersion.VERSION_17

withJavadocJar()
if (name != "outbox-kafka-quarkus-deployment") {
withJavadocJar()
}
withSourcesJar()

registerFeature("protobufSupport") {
Expand All @@ -56,14 +68,16 @@ subprojects {
(options as StandardJavadocDocletOptions).addBooleanOption("Xdoclint:none", true)
}

protobuf {
protoc {
artifact = "com.google.protobuf:protoc:$protobufVersion"
if (!name.contains("quarkus")) {
protobuf {
protoc {
artifact = "com.google.protobuf:protoc:$protobufVersion"
}
}
}

license {
header = file("../LICENSE-header.txt")
header = rootDir.resolve("LICENSE-header.txt")
excludes(
setOf(
"one/tomorrow/kafka/messages/DeserializerMessages.java",
Expand Down Expand Up @@ -116,6 +130,11 @@ subprojects {
}
}

// ignore information that is not contained in the pom file and suppress the warnings:
suppressPomMetadataWarningsFor("protobufSupportApiElements")
suppressPomMetadataWarningsFor("protobufSupportRuntimeElements")
suppressPomMetadataWarningsFor("testFixturesApiElements")
suppressPomMetadataWarningsFor("testFixturesRuntimeElements")
}
}
repositories {
Expand Down Expand Up @@ -160,6 +179,7 @@ subprojects {
allprojects {
repositories {
mavenCentral()
gradlePluginPortal()
}

tasks.withType<Test> {
Expand Down
26 changes: 26 additions & 0 deletions outbox-kafka-quarkus-deployment/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
plugins {
id("java")
}

val quarkusVersion = rootProject.extra["quarkusVersion"]

dependencies {
implementation(platform("io.quarkus:quarkus-bom:$quarkusVersion"))
implementation("io.quarkus:quarkus-core-deployment")
implementation(project(":outbox-kafka-quarkus"))
implementation("io.quarkus:quarkus-arc-deployment")
implementation("io.quarkus:quarkus-hibernate-orm-deployment")
implementation("io.quarkus:quarkus-kafka-client-deployment")
implementation("io.quarkus:quarkus-jackson-deployment")
implementation("io.quarkus:quarkus-datasource-deployment")

// testing
testImplementation("io.quarkus:quarkus-junit5-internal")
testImplementation("io.quarkus:quarkus-hibernate-orm")
testImplementation("io.quarkus:quarkus-jdbc-postgresql")
testImplementation("io.quarkus:quarkus-flyway")
testImplementation("io.quarkus:quarkus-kafka-client")
testImplementation("io.quarkus:quarkus-messaging-kafka")
testImplementation("io.smallrye.reactive:smallrye-reactive-messaging-in-memory")
testImplementation("org.awaitility:awaitility")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/**
* Copyright 2025 Tomorrow GmbH @ https://tomorrow.one
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package one.tomorrow.transactionaloutbox.quarkus.deployment;

import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
import io.quarkus.deployment.Capabilities;
import io.quarkus.deployment.Capability;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.builditem.FeatureBuildItem;
import io.quarkus.hibernate.orm.deployment.spi.AdditionalJpaModelBuildItem;
import one.tomorrow.transactionaloutbox.config.TransactionalOutboxConfig;
import one.tomorrow.transactionaloutbox.health.OutboxProcessorHealthCheck;
import one.tomorrow.transactionaloutbox.model.OutboxLock;
import one.tomorrow.transactionaloutbox.model.OutboxRecord;
import one.tomorrow.transactionaloutbox.publisher.*;
import one.tomorrow.transactionaloutbox.repository.OutboxLockRepository;
import one.tomorrow.transactionaloutbox.repository.OutboxRepository;
import one.tomorrow.transactionaloutbox.service.OutboxLockService;
import one.tomorrow.transactionaloutbox.service.OutboxProcessor;
import one.tomorrow.transactionaloutbox.service.OutboxService;
import one.tomorrow.transactionaloutbox.tracing.NoopTracingServiceProducer;
import one.tomorrow.transactionaloutbox.tracing.OpenTelemetryTracingServiceProducer;

import java.util.List;

class TransactionalOutboxExtensionProcessor {

private static final String FEATURE = "transactional-outbox";

@BuildStep
FeatureBuildItem feature() {
return new FeatureBuildItem(FEATURE);
}

@BuildStep
AdditionalBeanBuildItem outboxBeans() {
return AdditionalBeanBuildItem.builder()
.addBeanClasses(
OutboxLockRepository.class,
OutboxRepository.class,
OutboxLockService.class,
TransactionalOutboxConfig.class,
TransactionalOutboxConfig.CleanupConfig.class,
OutboxService.class,
OutboxProcessor.class,
PublisherConfig.class,
KafkaProducerMessagePublisherFactory.class,
DefaultKafkaProducerFactory.class
)
.setUnremovable()
.build();
}

@BuildStep
void registerTracingBeans(
Capabilities capabilities,
BuildProducer<AdditionalBeanBuildItem> additionalBeans
) {
// Always register the default/no-op TracingService (it is marked @DefaultBean)
additionalBeans.produce(AdditionalBeanBuildItem.builder()
.addBeanClasses(NoopTracingServiceProducer.class)
.setUnremovable()
.build());

// Only register the OTel-based TracingService if the OTel capability is present
if (capabilities.isPresent(Capability.OPENTELEMETRY_TRACER)) {
additionalBeans.produce(AdditionalBeanBuildItem.builder()
.addBeanClass(OpenTelemetryTracingServiceProducer.class)
.setUnremovable()
.build());
}
}

@BuildStep
void registerHealthCheckBean(
Capabilities capabilities,
BuildProducer<AdditionalBeanBuildItem> additionalBeans
) {
// Only register the OutboxProcessorHealthCheck if the smallrye healt capability is present
if (capabilities.isPresent(Capability.SMALLRYE_HEALTH)) {
additionalBeans.produce(AdditionalBeanBuildItem.builder()
.addBeanClass(OutboxProcessorHealthCheck.class)
.setUnremovable()
.build());
}
}

@BuildStep
List<AdditionalJpaModelBuildItem> jpaModels() {
return List.of(
new AdditionalJpaModelBuildItem(OutboxLock.class.getName()),
new AdditionalJpaModelBuildItem(OutboxRecord.class.getName())
);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/**
* Copyright 2025 Tomorrow GmbH @ https://tomorrow.one
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package one.tomorrow.transactionaloutbox.quarkus.deployment;

import jakarta.enterprise.context.ApplicationScoped;

/**
* Test application for the Transactional Outbox extension.
* This serves as a marker class for Quarkus tests.
*/
@ApplicationScoped
public class TestApp {
// This class is intentionally empty - it just serves as a marker for the Quarkus test
}
Loading