Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/gradle-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ jobs:
- name: Set up gradle
uses: gradle/actions/setup-gradle@v4
- name: Build
run: ./gradlew build
run: ./gradlew -Dorg.gradle.jvmargs=-Xmx1g build

dependency-review:
needs: build
Expand Down Expand Up @@ -66,7 +66,7 @@ jobs:
- name: Set up gradle
uses: gradle/actions/setup-gradle@v4
- name: Staging artifacts
run: ./gradlew publish
run: ./gradlew -Dorg.gradle.jvmargs=-Xmx1g publish
- name: Publish
env:
JRELEASER_GPG_SECRET_KEY: ${{ secrets.MAVEN_CENTRAL_GPG_SECRET_KEY }}
Expand All @@ -77,4 +77,4 @@ jobs:
JRELEASER_NEXUS2_USERNAME: ${{ secrets.MAVEN_CENTRAL_USERNAME }}
JRELEASER_NEXUS2_PASSWORD: ${{ secrets.MAVEN_CENTRAL_PASSWORD }}
JRELEASER_GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: ./gradlew :commons:jreleaserDeploy :outbox-kafka-spring:jreleaserDeploy :outbox-kafka-spring-reactive:jreleaserDeploy
run: ./gradlew -Dorg.gradle.jvmargs=-Xmx1g :commons:jreleaserDeploy :outbox-kafka-spring:jreleaserDeploy :outbox-kafka-spring-reactive:jreleaserDeploy
50 changes: 46 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[![ci](https://github.com/tomorrow-one/transactional-outbox/actions/workflows/gradle-build.yml/badge.svg)](https://github.com/tomorrow-one/transactional-outbox/actions/workflows/gradle-build.yml)
[![maven: outbox-kafka-spring](https://img.shields.io/maven-central/v/one.tomorrow.transactional-outbox/outbox-kafka-spring.svg?label=maven:%20outbox-kafka-spring)](https://central.sonatype.com/search?q=one.tomorrow.transactional-outbox:outbox-kafka-spring)
[![maven: outbox-kafka-spring-reactive](https://img.shields.io/maven-central/v/one.tomorrow.transactional-outbox/outbox-kafka-spring-reactive.svg?label=maven:%20outbox-kafka-spring-reactive)](https://central.sonatype.com/search?q=one.tomorrow.transactional-outbox:outbox-kafka-spring-reactive)
[![maven: outbox-kafka-quarkus](https://img.shields.io/maven-central/v/one.tomorrow.transactional-outbox/outbox-kafka-quarkus.svg?label=maven:%20outbox-kafka-quarkus)](https://central.sonatype.com/search?q=one.tomorrow.transactional-outbox:outbox-kafka-quarkus)

# Transactional Outbox

Expand Down Expand Up @@ -44,9 +45,7 @@ the message / payload), a solution would have to be found or developed. At the t
experience in the team with Debezium or Kafka Connect.

### Current Limitations
* This library assumes and uses Spring (for transaction handling)
* It comes with a module for usage in classic spring and spring boot projects using sync/blocking operations (this
module uses spring-jdbc), and another module for reactive operations (uses [spring R2DBC](https://spring.io/projects/spring-data-r2dbc) for database access)
* It comes with modules for usage in Spring/Spring Boot projects (classic sync/blocking and reactive operations), and Quarkus applications
* It's tested with postgresql only (verified support for other databases could be contributed)

## Installation & Configuration
Expand All @@ -57,6 +56,7 @@ Depending on your application add one of the following libraries as dependency t

* classic (sync/blocking): `one.tomorrow.transactional-outbox:outbox-kafka-spring:$version`
* reactive: `one.tomorrow.transactional-outbox:outbox-kafka-spring-reactive:$version`
* quarkus: `one.tomorrow.transactional-outbox:outbox-kafka-quarkus:$version`

#### Compatibility Matrix

Expand Down Expand Up @@ -201,6 +201,28 @@ public class TransactionalOutboxConfig {
}
```

#### Setup the `OutboxProcessor` from `outbox-kafka-quarkus`

For Quarkus applications, the extension automatically configures the `OutboxProcessor` - no manual setup is required! Simply add the dependency and configure via `application.properties`:

```properties
# Kafka configuration (standard Quarkus Kafka config)
kafka.bootstrap.servers=localhost:9092

# Transactional Outbox configuration
one.tomorrow.transactional-outbox.enabled=true
one.tomorrow.transactional-outbox.processing-interval=PT0.2S
one.tomorrow.transactional-outbox.lock-timeout=PT5S
one.tomorrow.transactional-outbox.lock-owner-id=my-service-instance-1
one.tomorrow.transactional-outbox.event-source=my-service

# Optional: Automatic cleanup configuration
one.tomorrow.transactional-outbox.cleanup.interval=PT1H
one.tomorrow.transactional-outbox.cleanup.retention=P30D
```

For detailed Quarkus-specific configuration and usage, see the [Quarkus module README](outbox-kafka-quarkus/README.md).

## Usage

In a service that changes the database (inside a transaction), create and serialize the message/event that should
Expand Down Expand Up @@ -253,6 +275,26 @@ public Mono<OutboxRecord> doSomething(String name) {
}
```

In a **quarkus application** it would look like this:

```java
@Inject
OutboxService outboxService;

@Transactional
public void doSomething(String id, String name) {

// Here s.th. else would be done within the transaction, e.g. some entity created.

SomeEvent event = SomeEvent.newBuilder()
.setId(id)
.setName(name)
.build();
Map<String, String> headers = Map.of(KafkaHeaders.HEADERS_VALUE_TYPE_NAME, event.getDescriptorForType().getFullName());
outboxService.saveForPublishing("some-topic", id, event.toByteArray(), headers);
}
```

### Tracing

If you have tracing in place you're probably interested in getting the trace context propagated with Kafka messages as well.
Expand Down Expand Up @@ -323,7 +365,7 @@ public class Cleaner {
## How-To Release

To release a new version follow this step
1. In your PR with the functional change, bump the version of `commons`, `outbox-kafka-spring` or `outbox-kafka-spring-reactive` in the root `build.gradle.kts` to a non-`SNAPSHOT` version.
1. In your PR with the functional change, bump the version of `commons`, `outbox-kafka-spring`, `outbox-kafka-spring-reactive` or `outbox-kafka-quarkus` in the root `build.gradle.kts` to a non-`SNAPSHOT` version.
* Try to follow semantic versioning, i.e. bump the major version for binary incompatible changes, the minor version for compatible changes with improvements/new features, and the patch version for bugfixes or non-functional changes like refactorings.
2. Merge your PR - the related pipeline will publish the new version(s) to Sonatype's staging repo (SNAPSHOTs are published to Maven Central Snapshots repository (and are kept for 90 days)).
3. To publish a release, follow https://central.sonatype.com/publishing/deployments
Expand Down
34 changes: 27 additions & 7 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import java.util.*
project(":commons").version = "2.4.6-SNAPSHOT"
project(":outbox-kafka-spring").version = "3.5.6-SNAPSHOT"
project(":outbox-kafka-spring-reactive").version = "3.4.6-SNAPSHOT"
project(":outbox-kafka-quarkus").version = "1.0.0-SNAPSHOT"
project(":outbox-kafka-quarkus-deployment").version = "1.0.0-SNAPSHOT"

plugins {
id("java-library")
Expand All @@ -16,9 +18,14 @@ plugins {
id("org.jreleaser") version "1.19.0"
id("jacoco")
id("com.github.hierynomus.license") version "0.16.1"
id("io.quarkus.extension") version "3.26.0" apply false
id("io.quarkus") version "3.26.0" apply false
}

group = "one.tomorrow.transactional-outbox"

val protobufVersion by extra("3.25.5")
val quarkusVersion by extra("3.26.0")

// disable JReleaser on root level
jreleaser {
Expand All @@ -29,18 +36,23 @@ subprojects {
apply(plugin = "java-library")
apply(plugin = "java-test-fixtures")
apply(plugin = "io.freefair.lombok")
apply(plugin = "com.google.protobuf")
// protobuf plugin does not play nicely with quarkus, see
// https://github.com/google/protobuf-gradle-plugin/issues/659
if (!name.contains("quarkus"))
apply(plugin = "com.google.protobuf")
apply(plugin = "maven-publish")
apply(plugin = "org.jreleaser")
apply(plugin = "jacoco")
apply(plugin = "com.github.hierynomus.license")

group = "one.tomorrow.transactional-outbox"
group = rootProject.group

java {
sourceCompatibility = JavaVersion.VERSION_17

withJavadocJar()
if (name != "outbox-kafka-quarkus-deployment") {
withJavadocJar()
}
withSourcesJar()

registerFeature("protobufSupport") {
Expand All @@ -56,14 +68,16 @@ subprojects {
(options as StandardJavadocDocletOptions).addBooleanOption("Xdoclint:none", true)
}

protobuf {
protoc {
artifact = "com.google.protobuf:protoc:$protobufVersion"
if (!name.contains("quarkus")) {
protobuf {
protoc {
artifact = "com.google.protobuf:protoc:$protobufVersion"
}
}
}

license {
header = file("../LICENSE-header.txt")
header = rootDir.resolve("LICENSE-header.txt")
excludes(
setOf(
"one/tomorrow/kafka/messages/DeserializerMessages.java",
Expand Down Expand Up @@ -116,6 +130,11 @@ subprojects {
}
}

// ignore information that is not contained in the pom file and suppress the warnings:
suppressPomMetadataWarningsFor("protobufSupportApiElements")
suppressPomMetadataWarningsFor("protobufSupportRuntimeElements")
suppressPomMetadataWarningsFor("testFixturesApiElements")
suppressPomMetadataWarningsFor("testFixturesRuntimeElements")
}
}
repositories {
Expand Down Expand Up @@ -160,6 +179,7 @@ subprojects {
allprojects {
repositories {
mavenCentral()
gradlePluginPortal()
}

tasks.withType<Test> {
Expand Down
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.5-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-8.14.3-bin.zip
networkTimeout=10000
validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME
Expand Down
26 changes: 26 additions & 0 deletions outbox-kafka-quarkus-deployment/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
plugins {
id("java")
}

val quarkusVersion = rootProject.extra["quarkusVersion"]

dependencies {
implementation(platform("io.quarkus:quarkus-bom:$quarkusVersion"))
implementation("io.quarkus:quarkus-core-deployment")
implementation(project(":outbox-kafka-quarkus"))
implementation("io.quarkus:quarkus-arc-deployment")
implementation("io.quarkus:quarkus-hibernate-orm-deployment")
implementation("io.quarkus:quarkus-kafka-client-deployment")
implementation("io.quarkus:quarkus-jackson-deployment")
implementation("io.quarkus:quarkus-datasource-deployment")

// testing
testImplementation("io.quarkus:quarkus-junit5-internal")
testImplementation("io.quarkus:quarkus-hibernate-orm")
testImplementation("io.quarkus:quarkus-jdbc-postgresql")
testImplementation("io.quarkus:quarkus-flyway")
testImplementation("io.quarkus:quarkus-kafka-client")
testImplementation("io.quarkus:quarkus-messaging-kafka")
testImplementation("io.smallrye.reactive:smallrye-reactive-messaging-in-memory")
testImplementation("org.awaitility:awaitility")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/**
* Copyright 2025 Tomorrow GmbH @ https://tomorrow.one
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package one.tomorrow.transactionaloutbox.quarkus.deployment;

import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
import io.quarkus.deployment.Capabilities;
import io.quarkus.deployment.Capability;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.builditem.FeatureBuildItem;
import io.quarkus.hibernate.orm.deployment.spi.AdditionalJpaModelBuildItem;
import one.tomorrow.transactionaloutbox.config.TransactionalOutboxConfig;
import one.tomorrow.transactionaloutbox.health.OutboxProcessorHealthCheck;
import one.tomorrow.transactionaloutbox.model.OutboxLock;
import one.tomorrow.transactionaloutbox.model.OutboxRecord;
import one.tomorrow.transactionaloutbox.publisher.*;
import one.tomorrow.transactionaloutbox.repository.OutboxLockRepository;
import one.tomorrow.transactionaloutbox.repository.OutboxRepository;
import one.tomorrow.transactionaloutbox.service.OutboxLockService;
import one.tomorrow.transactionaloutbox.service.OutboxProcessor;
import one.tomorrow.transactionaloutbox.service.OutboxService;
import one.tomorrow.transactionaloutbox.tracing.NoopTracingServiceProducer;
import one.tomorrow.transactionaloutbox.tracing.OpenTelemetryTracingServiceProducer;

import java.util.List;

class TransactionalOutboxExtensionProcessor {

private static final String FEATURE = "transactional-outbox";

@BuildStep
FeatureBuildItem feature() {
return new FeatureBuildItem(FEATURE);
}

@BuildStep
AdditionalBeanBuildItem outboxBeans() {
return AdditionalBeanBuildItem.builder()
.addBeanClasses(
OutboxLockRepository.class,
OutboxRepository.class,
OutboxLockService.class,
TransactionalOutboxConfig.class,
TransactionalOutboxConfig.CleanupConfig.class,
OutboxService.class,
OutboxProcessor.class,
PublisherConfig.class,
KafkaProducerMessagePublisherFactory.class,
DefaultKafkaProducerFactory.class
)
.setUnremovable()
.build();
}

@BuildStep
void registerTracingBeans(
Capabilities capabilities,
BuildProducer<AdditionalBeanBuildItem> additionalBeans
) {
// Always register the default/no-op TracingService (it is marked @DefaultBean)
additionalBeans.produce(AdditionalBeanBuildItem.builder()
.addBeanClasses(NoopTracingServiceProducer.class)
.setUnremovable()
.build());

// Only register the OTel-based TracingService if the OTel capability is present
if (capabilities.isPresent(Capability.OPENTELEMETRY_TRACER)) {
additionalBeans.produce(AdditionalBeanBuildItem.builder()
.addBeanClass(OpenTelemetryTracingServiceProducer.class)
.setUnremovable()
.build());
}
}

@BuildStep
void registerHealthCheckBean(
Capabilities capabilities,
BuildProducer<AdditionalBeanBuildItem> additionalBeans
) {
// Only register the OutboxProcessorHealthCheck if the smallrye healt capability is present
if (capabilities.isPresent(Capability.SMALLRYE_HEALTH)) {
additionalBeans.produce(AdditionalBeanBuildItem.builder()
.addBeanClass(OutboxProcessorHealthCheck.class)
.setUnremovable()
.build());
}
}

@BuildStep
List<AdditionalJpaModelBuildItem> jpaModels() {
return List.of(
new AdditionalJpaModelBuildItem(OutboxLock.class.getName()),
new AdditionalJpaModelBuildItem(OutboxRecord.class.getName())
);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/**
* Copyright 2025 Tomorrow GmbH @ https://tomorrow.one
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package one.tomorrow.transactionaloutbox.quarkus.deployment;

import jakarta.enterprise.context.ApplicationScoped;

/**
* Test application for the Transactional Outbox extension.
* This serves as a marker class for Quarkus tests.
*/
@ApplicationScoped
public class TestApp {
// This class is intentionally empty - it just serves as a marker for the Quarkus test
}
Loading