Skip to content

Commit

Permalink
Testcontainers: make Schema Registry configurable (#1489)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuvalshi0 authored Apr 4, 2022
1 parent 3d5cafb commit 9419b5d
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 14 deletions.
2 changes: 1 addition & 1 deletion docs/src/main/paradox/testing-testcontainers.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Testcontainers also allow you to create a complete Kafka cluster (using Docker c

## Settings

You can override testcontainers settings to create multi-broker Kafka clusters, or to finetune Kafka Broker and ZooKeeper configuration, by updating @apidoc[KafkaTestkitTestcontainersSettings] in code or configuration.
You can override testcontainers settings to create multi-broker Kafka clusters, or to finetune Kafka Broker, ZooKeeper and Schema Registry configuration, by updating @apidoc[KafkaTestkitTestcontainersSettings] in code or configuration.
The @apidoc[KafkaTestkitTestcontainersSettings] type can be used to perform actions such as:

* Set the docker image and tag of Kafka, ZooKeeper, and Schema Registry version to use (a recent Confluent Platform version is used by default)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,16 @@ public KafkaContainerCluster(
"KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", internalTopicsRf + "")
.withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", internalTopicsRf + ""))
.collect(Collectors.toList());

if (useSchemaRegistry) {
this.schemaRegistry =
Optional.of(
new SchemaRegistryContainer(this.schemaRegistryImage)
.withNetworkAliases("schema-registry")
.withCluster(this));
} else {
this.schemaRegistry = Optional.empty();
}
}

public Network getNetwork() {
Expand Down Expand Up @@ -186,16 +196,6 @@ public void start() {

waitForClusterFormation();

if (useSchemaRegistry) {
this.schemaRegistry =
Optional.of(
new SchemaRegistryContainer(this.schemaRegistryImage)
.withNetworkAliases("schema-registry")
.withCluster(this));
} else {
this.schemaRegistry = Optional.empty();
}

// start schema registry if the container is initialized
Startables.deepStart(optionalStream(this.schemaRegistry))
.get(clusterStartTimeout.getSeconds(), SECONDS);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.testkit.KafkaTestkitTestcontainersSettings.this")
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ final class KafkaTestkitTestcontainersSettings private (
val configureZooKeeperConsumer: java.util.function.Consumer[GenericContainer[_]] =
new Consumer[GenericContainer[_]]() {
override def accept(arg: GenericContainer[_]): Unit = ()
}
},
val configureSchemaRegistry: GenericContainer[_] => Unit = _ => ()
) {

/**
Expand Down Expand Up @@ -180,6 +181,14 @@ final class KafkaTestkitTestcontainersSettings private (
): KafkaTestkitTestcontainersSettings =
copy(configureZooKeeperConsumer = configureZooKeeperConsumer)

/**
* Java Api
* Replaces the default schema registry testcontainers configuration logic
*/
def withConfigureSchemaRegistry(
configureSchemaRegistry: GenericContainer[_] => Unit
): KafkaTestkitTestcontainersSettings = copy(configureSchemaRegistry = configureSchemaRegistry)

/**
* Use Schema Registry container.
*/
Expand Down Expand Up @@ -237,7 +246,8 @@ final class KafkaTestkitTestcontainersSettings private (
configureKafkaConsumer: java.util.function.Consumer[java.util.Collection[AlpakkaKafkaContainer]] =
configureKafkaConsumer,
configureZooKeeper: GenericContainer[_] => Unit = configureZooKeeper,
configureZooKeeperConsumer: java.util.function.Consumer[GenericContainer[_]] = configureZooKeeperConsumer
configureZooKeeperConsumer: java.util.function.Consumer[GenericContainer[_]] = configureZooKeeperConsumer,
configureSchemaRegistry: GenericContainer[_] => Unit = configureSchemaRegistry
): KafkaTestkitTestcontainersSettings =
new KafkaTestkitTestcontainersSettings(zooKeeperImage,
zooKeeperImageTag,
Expand All @@ -254,7 +264,8 @@ final class KafkaTestkitTestcontainersSettings private (
configureKafka,
configureKafkaConsumer,
configureZooKeeper,
configureZooKeeperConsumer)
configureZooKeeperConsumer,
configureSchemaRegistry)

override def toString: String =
"KafkaTestkitTestcontainersSettings(" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ object TestcontainersKafka {
configureKafkaConsumer.accept(brokerContainers.asJavaCollection)
configureZooKeeper(zookeeperContainer)
configureZooKeeperConsumer.accept(zookeeperContainer)
schemaRegistryContainer match {
case Some(container) => configureSchemaRegistry(container)
case _ =>
}
log.info("Starting Kafka cluster with settings: {}", settings)
cluster.start()
kafkaBootstrapServersInternal = cluster.getBootstrapServers
Expand Down

0 comments on commit 9419b5d

Please sign in to comment.