From 411cd7a6540cdc8df1e400943a9b307f52a753ee Mon Sep 17 00:00:00 2001 From: ocadaruma Date: Thu, 10 Sep 2020 17:20:25 +0900 Subject: [PATCH 1/2] Publish testing module --- testing/build.gradle | 3 --- 1 file changed, 3 deletions(-) diff --git a/testing/build.gradle b/testing/build.gradle index 7a23908a..b52a223f 100644 --- a/testing/build.gradle +++ b/testing/build.gradle @@ -14,9 +14,6 @@ * under the License. */ -ext { - noPublish = true -} dependencies { implementation project(':processor') implementation project(':protobuf') From 359466ffe742aa8df5b0148db9aad9ddf154b31d Mon Sep 17 00:00:00 2001 From: ocadaruma Date: Thu, 10 Sep 2020 19:29:27 +0900 Subject: [PATCH 2/2] Change configuration to include libs in compile-scope deps --- testing/build.gradle | 16 +++--- .../decaton/testing/EmbeddedKafkaCluster.java | 54 ++++++++++++------- 2 files changed, 42 insertions(+), 28 deletions(-) diff --git a/testing/build.gradle b/testing/build.gradle index b52a223f..fe0c4d71 100644 --- a/testing/build.gradle +++ b/testing/build.gradle @@ -15,20 +15,18 @@ */ dependencies { - implementation project(':processor') - implementation project(':protobuf') + compile project(':processor') + compile project(':protobuf') - implementation "org.apache.kafka:kafka-clients:$kafkaVersion" - implementation "org.apache.kafka:kafka_2.12:$kafkaVersion" - implementation "org.apache.kafka:kafka-clients:$kafkaVersion:test" - implementation "org.apache.kafka:kafka_2.12:$kafkaVersion:test" + compile "org.apache.kafka:kafka-clients:$kafkaVersion" + compile "org.apache.kafka:kafka_2.12:$kafkaVersion" - implementation('org.apache.zookeeper:zookeeper:3.5.6') { + compile('org.apache.zookeeper:zookeeper:3.5.6') { exclude group: 'org.slf4j', module: 'slf4j-log4j12' exclude group: 'log4j', module: 'log4j' } - implementation "junit:junit:$junitVersion" - implementation "org.hamcrest:hamcrest-all:$hamcrestVersion" + compile "junit:junit:$junitVersion" + compile "org.hamcrest:hamcrest-all:$hamcrestVersion" runtimeOnly "ch.qos.logback:logback-classic:1.2.3" } diff --git a/testing/src/main/java/com/linecorp/decaton/testing/EmbeddedKafkaCluster.java b/testing/src/main/java/com/linecorp/decaton/testing/EmbeddedKafkaCluster.java index a68625ed..5f2d9f71 100644 --- a/testing/src/main/java/com/linecorp/decaton/testing/EmbeddedKafkaCluster.java +++ b/testing/src/main/java/com/linecorp/decaton/testing/EmbeddedKafkaCluster.java @@ -16,17 +16,20 @@ package com.linecorp.decaton.testing; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Files; import java.util.ArrayList; import java.util.List; import java.util.Properties; +import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.utils.Time; import kafka.server.KafkaConfig; import kafka.server.KafkaServer; import kafka.utils.CoreUtils; -import kafka.utils.TestUtils; import lombok.Getter; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; @@ -48,8 +51,13 @@ public EmbeddedKafkaCluster(int numBrokers, String zkConnect) { for (int i = 0; i < numBrokers; i++) { Properties prop = createBrokerConfig(i, zkConnect); - KafkaServer server = TestUtils.createServer(KafkaConfig.fromProps(prop), Time.SYSTEM); - int port = TestUtils.boundPort(server, SecurityProtocol.PLAINTEXT); + + KafkaServer server = new KafkaServer(KafkaConfig.fromProps(prop), + Time.SYSTEM, + Option.empty(), + scala.collection.immutable.List.empty()); + server.startup(); + int port = server.boundPort(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)); String listener = "127.0.0.1:" + port; listeners.add(listener); servers.add(server); @@ -61,24 +69,32 @@ public EmbeddedKafkaCluster(int numBrokers, String zkConnect) { } private static Properties createBrokerConfig(int brokerId, String zkConnect) { - return TestUtils.createBrokerConfig(brokerId, zkConnect, - false, // disable controlled shutdown - true, // enable delete topic - 0, // use random port + Properties prop = new Properties(); - // << enable only PLAINTEXT - Option.empty(), - Option.empty(), Option.empty(), - true, false, 0, - false, 0, false, 0, - // enable only PLAINTEXT >> + prop.setProperty("broker.id", String.valueOf(brokerId)); + prop.setProperty("zookeeper.connect", zkConnect); + prop.setProperty("controlled.shutdown.enable", "false"); + prop.setProperty("delete.topic.enable", "true"); + prop.setProperty("listeners", "PLAINTEXT://localhost:0"); + try { + prop.setProperty("log.dir", + Files.createTempDirectory("zookeeper-logs").toFile().getAbsolutePath()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + prop.setProperty("num.partitions", "1"); + prop.setProperty("default.replication.factor", "3"); + prop.setProperty("zookeeper.connection.timeout.ms", "10000"); + prop.setProperty("replica.socket.timeout.ms", "1500"); + prop.setProperty("controller.socket.timeout.ms", "1500"); + prop.setProperty("log.segment.delete.delay.ms", "1000"); + prop.setProperty("log.cleaner.dedupe.buffer.size", "2097152"); + prop.setProperty("message.timestamp.difference.max.ms", String.valueOf(Long.MAX_VALUE)); + prop.setProperty("offsets.topic.replication.factor", "1"); + prop.setProperty("offsets.topic.num.partitions", "5"); + prop.setProperty("group.initial.rebalance.delay.ms", "0"); - Option.empty(), // omit rack information - 1, // logDir count - false, // disable delegation token - 1, // num partitions - (short) 1 // default replication factor - ); + return prop; } @Override