diff --git a/pom.xml b/pom.xml index 3fbbcdfa..08d75d2e 100644 --- a/pom.xml +++ b/pom.xml @@ -166,7 +166,7 @@ 1.0.5 1.7.2 5.0.2 - 0.8.0 + 0.10.2.0 3.4.5 4.11 3.2 @@ -812,7 +812,26 @@ org.apache.kafka - kafka_2.10 + kafka_2.11 + ${kafka.version} + + + org.slf4j + slf4j-simple + + + com.sun.jdmk + jmxtools + + + com.sun.jmx + jmxri + + + + + org.apache.kafka + kafka-clients ${kafka.version} @@ -1027,6 +1046,13 @@ ${commons-compress.version} test + + + org.slf4j + slf4j-log4j12 + 1.7.25 + test + diff --git a/twill-api/src/main/java/org/apache/twill/api/Configs.java b/twill-api/src/main/java/org/apache/twill/api/Configs.java index 9a214892..909ac627 100644 --- a/twill-api/src/main/java/org/apache/twill/api/Configs.java +++ b/twill-api/src/main/java/org/apache/twill/api/Configs.java @@ -83,6 +83,11 @@ public static final class Keys { */ public static final String LOG_COLLECTION_ENABLED = "twill.log.collection.enabled"; + /** + * Setting for kafka 'bootstrap.servers' for log collection + */ + public static final String LOG_COLLECTION_KAFKA_BOOTSTRAP = "twill.log.collection.kafka.bootstrap"; + /** * The maximum number of FileContext object cached by the FileContextLocationFactory. */ @@ -132,6 +137,11 @@ public static final class Defaults { */ public static final boolean LOG_COLLECTION_ENABLED = true; + /** + * Default is empty + */ + public static final String LOG_COLLECTION_KAFKA_BOOTSTRAP_EMPTY = ""; + /** * Default size of the file context cache. */ diff --git a/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java b/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java index 1f509724..dad242c1 100644 --- a/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java +++ b/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java @@ -45,6 +45,13 @@ public interface TwillPreparer { */ TwillPreparer addLogHandler(LogHandler handler); + /** + * Configures bootstrap servers for kafka log aggregation client + * @param kafkaBootstrapServers kafka bootstrap.servers config for log aggregation client + * @return This {@link TwillPreparer} + */ + TwillPreparer withKafkaBootstrapServers(String kafkaBootstrapServers); + /** * Sets the user name that runs the application. Default value is get from {@code "user.name"} by calling * {@link System#getProperty(String)}. diff --git a/twill-core/pom.xml b/twill-core/pom.xml index 7e24154f..b4d8dd20 100644 --- a/twill-core/pom.xml +++ b/twill-core/pom.xml @@ -79,7 +79,11 @@ org.apache.kafka - kafka_2.10 + kafka_2.11 + + + org.apache.kafka + kafka-clients @@ -98,5 +102,9 @@ org.apache.commons commons-compress + + org.slf4j + slf4j-log4j12 + diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java index e49a2ad0..c0a2c70c 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java +++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java @@ -43,14 +43,13 @@ import org.apache.twill.internal.json.LogEntryDecoder; import org.apache.twill.internal.json.LogThrowableCodec; import org.apache.twill.internal.json.StackTraceElementCodec; -import org.apache.twill.internal.kafka.client.ZKKafkaClientService; +import org.apache.twill.internal.kafka.client.BootstrapedKafkaClientService; import org.apache.twill.internal.state.Message; import org.apache.twill.internal.state.SystemMessages; import org.apache.twill.kafka.client.FetchedMessage; import org.apache.twill.kafka.client.KafkaClientService; import org.apache.twill.kafka.client.KafkaConsumer; import org.apache.twill.zookeeper.ZKClient; -import org.apache.twill.zookeeper.ZKClients; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,7 +78,7 @@ public abstract class AbstractTwillController extends AbstractZKServiceControlle private Cancellable logCancellable; public AbstractTwillController(String appName, RunId runId, ZKClient zkClient, boolean logCollectionEnabled, - Iterable logHandlers) { + String kafkaBootstrap, Iterable logHandlers) { super(runId, zkClient); this.appName = appName; this.runId = runId; @@ -88,7 +87,7 @@ public AbstractTwillController(String appName, RunId runId, ZKClient zkClient, b // When addressing TWILL-147, need to check if the given ZKClient is // actually used by the Kafka used for log collection if (logCollectionEnabled) { - this.kafkaClient = new ZKKafkaClientService(ZKClients.namespace(zkClient, "/" + runId.getId() + "/kafka")); + this.kafkaClient = new BootstrapedKafkaClientService(kafkaBootstrap); Iterables.addAll(this.logHandlers, logHandlers); } else { this.kafkaClient = null; diff --git a/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java b/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java index 831c8318..9e27c317 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java +++ b/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java @@ -43,12 +43,15 @@ public class TwillRuntimeSpecification { private final Map maxRetries; private final double minHeapRatio; private final boolean logCollectionEnabled; + private final boolean embeddedKafkaEnabled; + private String kafkaBootstrap; public TwillRuntimeSpecification(TwillSpecification twillSpecification, String fsUser, URI twillAppDir, String zkConnectStr, RunId twillRunId, String twillAppName, int reservedMemory, @Nullable String rmSchedulerAddr, Map> logLevels, Map maxRetries, - double minHeapRatio, boolean logCollectionEnabled) { + double minHeapRatio, boolean logCollectionEnabled, String kafkaBootstrap, + boolean embeddedKafkaEnabled) { this.twillSpecification = twillSpecification; this.fsUser = fsUser; this.twillAppDir = twillAppDir; @@ -61,6 +64,8 @@ public TwillRuntimeSpecification(TwillSpecification twillSpecification, String f this.maxRetries = maxRetries; this.minHeapRatio = minHeapRatio; this.logCollectionEnabled = logCollectionEnabled; + this.kafkaBootstrap = kafkaBootstrap; + this.embeddedKafkaEnabled = embeddedKafkaEnabled; } public TwillSpecification getTwillSpecification() { @@ -116,15 +121,18 @@ public Map getMaxRetries() { } /** - * Returns the ZK connection string for the Kafka used for log collections, + * Returns the bootstrap connection string for the Kafka used for log collections, * or {@code null} if log collection is disabled. */ - @Nullable - public String getKafkaZKConnect() { - if (!isLogCollectionEnabled()) { - return null; - } - // When addressing TWILL-147, a field can be introduced to carry this value. - return String.format("%s/%s/%s/kafka", getZkConnectStr(), getTwillAppName(), getTwillAppRunId()); + public String getKafkaBootstrap() { + return kafkaBootstrap; + } + + public void setKafkaBootstrap(String kafkaBootstrap) { + this.kafkaBootstrap = kafkaBootstrap; + } + + public boolean isEmbeddedKafkaEnabled() { + return embeddedKafkaEnabled; } } diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java index 5ff05e8a..93866a2e 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java +++ b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java @@ -51,6 +51,8 @@ final class TwillRuntimeSpecificationCodec implements JsonSerializer() { }.getType()); Map> logLevels = context.deserialize(jsonObj.get(LOG_LEVELS), new TypeToken>>() { }.getType()); - Map maxRetries = + Map maxRetries = context.deserialize(jsonObj.get(MAX_RETRIES), new TypeToken>() { }.getType()); - + return new TwillRuntimeSpecification(twillSpecification, jsonObj.get(FS_USER).getAsString(), URI.create(jsonObj.get(TWILL_APP_DIR).getAsString()), @@ -100,6 +104,8 @@ public TwillRuntimeSpecification deserialize(JsonElement json, Type typeOfT, logLevels, maxRetries, jsonObj.get(HEAP_RESERVED_MIN_RATIO).getAsDouble(), - jsonObj.get(LOG_COLLECTION_ENABLED).getAsBoolean()); + jsonObj.get(LOG_COLLECTION_ENABLED).getAsBoolean(), + jsonObj.get(KAFKA_BOOTSTRAP).getAsString(), + jsonObj.get(IS_EMBEDDED_KAFKA_ENABLED).getAsBoolean()); } } diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java index be6121d0..170f8a05 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java +++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java @@ -21,8 +21,7 @@ import com.google.common.base.Throwables; import com.google.common.util.concurrent.AbstractIdleService; import kafka.server.KafkaConfig; -import kafka.server.KafkaServer; -import kafka.utils.Time; +import kafka.server.KafkaServerStartable; import org.I0Itec.zkclient.exception.ZkTimeoutException; import org.apache.twill.internal.utils.Networks; import org.slf4j.Logger; @@ -45,22 +44,24 @@ public final class EmbeddedKafkaServer extends AbstractIdleService { private static final String DEFAULT_START_RETRIES = "5"; private final int startTimeoutRetries; - private final Properties properties; - private KafkaServer server; + private final KafkaConfig kafkaConfig; + private KafkaServerStartable server; public EmbeddedKafkaServer(Properties properties) { this.startTimeoutRetries = Integer.parseInt(properties.getProperty(START_RETRIES, DEFAULT_START_RETRIES)); - this.properties = new Properties(); - this.properties.putAll(properties); + this.kafkaConfig = createKafkaConfig(properties); + } + + public String getKafkaBootstrap() { + return String.format("%s:%d", kafkaConfig.hostName(), kafkaConfig.port()); } @Override protected void startUp() throws Exception { int tries = 0; do { - KafkaConfig kafkaConfig = createKafkaConfig(properties); - KafkaServer kafkaServer = createKafkaServer(kafkaConfig); + KafkaServerStartable kafkaServer = createKafkaServer(kafkaConfig); try { kafkaServer.startup(); server = kafkaServer; @@ -96,28 +97,10 @@ protected void shutDown() throws Exception { } } - private KafkaServer createKafkaServer(KafkaConfig kafkaConfig) { - return new KafkaServer(kafkaConfig, new Time() { - - @Override - public long milliseconds() { - return System.currentTimeMillis(); - } - - @Override - public long nanoseconds() { - return System.nanoTime(); - } - - @Override - public void sleep(long ms) { - try { - Thread.sleep(ms); - } catch (InterruptedException e) { - Thread.interrupted(); - } - } - }); + private KafkaServerStartable createKafkaServer(KafkaConfig kafkaConfig) { + Properties properties = new Properties(); + properties.putAll(kafkaConfig.props()); + return KafkaServerStartable.fromProps(properties); } /** @@ -134,6 +117,15 @@ private KafkaConfig createKafkaConfig(Properties properties) { Preconditions.checkState(randomPort > 0, "Failed to get random port."); prop.setProperty("port", Integer.toString(randomPort)); } + String brokerId = prop.getProperty("broker.id"); + if (brokerId == null) { + Random random = new Random(System.currentTimeMillis()); + int i; + do { + i = random.nextInt(1000); + } while (i < 0); + prop.setProperty("broker.id", Integer.toString(i)); + } return new KafkaConfig(prop); } diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/BetterKafkaConsumer.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/BetterKafkaConsumer.java new file mode 100644 index 00000000..c982c4fd --- /dev/null +++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/BetterKafkaConsumer.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.twill.internal.kafka.client; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.ByteBufferDeserializer; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.twill.common.Cancellable; +import org.apache.twill.kafka.client.FetchedMessage; +import org.apache.twill.kafka.client.KafkaConsumer; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; + +/** + * {@link KafkaConsumer} implementation with new kafka api and bootstrap servers + * Created by SFilippov on 18.07.2017. + */ +public class BetterKafkaConsumer implements KafkaConsumer { + + private static final int POOL_DELAY = 100; + private static final int POOL_TIMEOUT = 100; + + private final ScheduledExecutorService executorService; + private final org.apache.kafka.clients.consumer.KafkaConsumer kafkaConsumer; + + public BetterKafkaConsumer(String bootstrapServers) { + this.executorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("kafka-consumer-%d") + .build()); + Properties properties = new Properties(); + properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); + properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "twill-log-consumer-group"); + properties.setProperty(CommonClientConfigs.CLIENT_ID_CONFIG, "twill-log-consumer-client-" + UUID.randomUUID()); + kafkaConsumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(properties, + new IntegerDeserializer(), + new ByteBufferDeserializer()); + } + + public void stop() throws InterruptedException { + executorService.shutdown(); + if (!executorService.awaitTermination(1, TimeUnit.SECONDS)) { + executorService.shutdownNow(); + } + } + + @Override + public Preparer prepare() { + return new BetterPreparer(); + } + + private final class BetterPreparer implements Preparer { + + private TopicPartition topicPartition; + + /** + * Should be invoked only on {@link BetterKafkaConsumer#executorService}. + * + * @param topic topic to check if consumer subscribed + */ + private void checkSubscription(String topic) { + if (!kafkaConsumer.subscription().contains(topic)) { + kafkaConsumer.subscribe(Collections.singleton(topic)); + kafkaConsumer.poll(0); + } + } + + @Override + public Preparer add(final String topic, final int partition, final long offset) { + executorService.submit(new Runnable() { + @Override + public void run() { + checkSubscription(topic); + topicPartition = new TopicPartition(topic, partition); + kafkaConsumer.seek(topicPartition, offset); + } + }); + return this; + } + + @Override + public Preparer addFromBeginning(final String topic, final int partition) { + executorService.submit(new Runnable() { + @Override + public void run() { + checkSubscription(topic); + topicPartition = new TopicPartition(topic, partition); + kafkaConsumer.seekToBeginning(Collections.singleton(topicPartition)); + } + }); + return this; + } + + @Override + public Preparer addLatest(final String topic, final int partition) { + executorService.submit(new Runnable() { + @Override + public void run() { + checkSubscription(topic); + topicPartition = new TopicPartition(topic, partition); + kafkaConsumer.seekToEnd(Collections.singleton(topicPartition)); + } + }); + return this; + } + + @Override + public Cancellable consume(final MessageCallback callback) { + final ScheduledFuture future = executorService.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + ConsumerRecords records = kafkaConsumer.poll(POOL_TIMEOUT); + Iterable fetchedMessages = + Iterables.transform(records, new Function, FetchedMessage>() { + @Nullable + @Override + public FetchedMessage apply(@Nullable ConsumerRecord record) { + BasicFetchedMessage fetchedMessage = + new BasicFetchedMessage(new org.apache.twill.kafka.client.TopicPartition(record.topic(), + record.partition())); + fetchedMessage.setPayload(record.value()); + fetchedMessage.setOffset(record.offset()); + fetchedMessage.setNextOffset(record.offset() + 1); + return fetchedMessage; + } + }); + long offset = callback.onReceived(fetchedMessages.iterator()); + if (offset > 0) { + kafkaConsumer.seek(topicPartition, offset); + } + } + }, 0, POOL_DELAY, TimeUnit.MILLISECONDS); + return new Cancellable() { + @Override + public void cancel() { + future.cancel(true); + callback.finished(); + } + }; + } + } +} diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/BetterKafkaPublisher.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/BetterKafkaPublisher.java new file mode 100644 index 00000000..90849e87 --- /dev/null +++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/BetterKafkaPublisher.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.twill.internal.kafka.client; + +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.AsyncFunction; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.JdkFutureAdapters; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.serialization.ByteBufferSerializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.twill.kafka.client.Compression; +import org.apache.twill.kafka.client.KafkaPublisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.Executors; + +/** + * {@link KafkaPublisher} implementation with new kafka api and bootstrap servers + * Created by SFilippov on 18.07.2017. + */ +public class BetterKafkaPublisher implements KafkaPublisher { + + private static final Logger LOG = LoggerFactory.getLogger(BetterKafkaPublisher.class); + + private final KafkaProducer kafkaProducer; + + public BetterKafkaPublisher(String bootstrapServers, + KafkaPublisher.Ack ack, + Compression compression) { + final Properties props = new Properties(); + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ProducerConfig.ACKS_CONFIG, Integer.toString(ack.getAck())); + props.put(ProducerConfig.RETRIES_CONFIG, 3); + props.put(ProducerConfig.LINGER_MS_CONFIG, 1); + props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression.getCodec()); + + kafkaProducer = new KafkaProducer<>(props, + new IntegerSerializer(), + new ByteBufferSerializer()); + LOG.debug("Starting kafkaProducer {}", props); + } + + @Override + public Preparer prepare(String topic) { + return new BetterPreparer(topic); + } + + public void stop() { + kafkaProducer.close(); + } + + + private final class BetterPreparer implements Preparer { + private final String topic; + private final List> messages; + + private BetterPreparer(String topic) { + this.topic = topic; + this.messages = Lists.newLinkedList(); + } + + @Override + public Preparer add(ByteBuffer message, Object partitionKey) { + messages.add(new ProducerRecord(topic, Math.abs(partitionKey.hashCode()), message)); + return BetterPreparer.this; + } + + @Override + public ListenableFuture send() { + try { + if (kafkaProducer == null) { + return Futures.immediateFailedFuture(new IllegalStateException("No kafka producer available.")); + } + + List> futures = Lists.newArrayList(); + for (ProducerRecord pr : messages) { + futures.add(JdkFutureAdapters.listenInPoolThread(kafkaProducer.send(pr), + Executors.newSingleThreadExecutor())); + } + ListenableFuture> listListenableFuture = Futures.allAsList(futures); + return Futures.transform(listListenableFuture, new AsyncFunction, Integer>() { + @Override + public ListenableFuture apply(List input) throws Exception { + return Futures.immediateFuture(input.size()); + } + }); + } catch (Exception e) { + return Futures.immediateFailedFuture(e); + } finally { + messages.clear(); + } + } + } +} diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/BootstrapedKafkaClientService.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/BootstrapedKafkaClientService.java new file mode 100644 index 00000000..0cb1cf42 --- /dev/null +++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/BootstrapedKafkaClientService.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.twill.internal.kafka.client; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.AbstractIdleService; +import org.apache.twill.kafka.client.Compression; +import org.apache.twill.kafka.client.KafkaClientService; +import org.apache.twill.kafka.client.KafkaPublisher; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * Implementation of {@link KafkaClientService} with bootstrap servers string + * Created by SFilippov on 11.07.2017. + */ +public class BootstrapedKafkaClientService extends AbstractIdleService implements KafkaClientService { + private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(BootstrapedKafkaClientService.class); + + private final String bootstrapServers; + private final List publishers; + private BetterKafkaConsumer consumer; + + public BootstrapedKafkaClientService(String bootstrapServers) { + Preconditions.checkNotNull(bootstrapServers, "Bootstrap server's list cannot be null"); + Preconditions.checkState(!bootstrapServers.isEmpty(), "Bootstrap server's list cannot be empty"); + this.bootstrapServers = bootstrapServers; + this.publishers = Lists.newArrayList(); + } + + @Override + protected void startUp() throws Exception { + + } + + @Override + protected void shutDown() throws Exception { + LOG.info("Stopping Kafka consumer"); + consumer.stop(); + for (BetterKafkaPublisher publisher : publishers) { + publisher.stop(); + } + LOG.info("Kafka Consumer stopped"); + } + + @Override + public KafkaPublisher getPublisher(KafkaPublisher.Ack ack, Compression compression) { + BetterKafkaPublisher publisher = new BetterKafkaPublisher(bootstrapServers, ack, compression); + publishers.add(publisher); + return publisher; + } + + @Override + public org.apache.twill.kafka.client.KafkaConsumer getConsumer() { + if (consumer == null) { + consumer = new BetterKafkaConsumer(bootstrapServers); + } + return consumer; + } +} diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ByteBufferEncoder.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ByteBufferEncoder.java deleted file mode 100644 index 9211d92e..00000000 --- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ByteBufferEncoder.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.twill.internal.kafka.client; - -import kafka.serializer.Encoder; -import kafka.utils.VerifiableProperties; - -import java.nio.ByteBuffer; - -/** - * A kafka {@link kafka.serializer.Encoder} for encoding byte buffer into byte array. - */ -public final class ByteBufferEncoder implements Encoder { - - public ByteBufferEncoder(VerifiableProperties properties) { - } - - public byte[] toBytes(ByteBuffer buffer) { - byte[] bytes = new byte[buffer.remaining()]; - buffer.get(bytes); - return bytes; - } -} diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IntegerEncoder.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IntegerEncoder.java deleted file mode 100644 index cbe7eaa4..00000000 --- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IntegerEncoder.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.twill.internal.kafka.client; - -import com.google.common.primitives.Ints; -import kafka.serializer.Encoder; -import kafka.utils.VerifiableProperties; - -/** - * A kafka {@link kafka.serializer.Encoder} for encoding integer into bytes. - */ -public final class IntegerEncoder implements Encoder { - - public IntegerEncoder(VerifiableProperties properties) { - } - - public byte[] toBytes(Integer buffer) { - return Ints.toByteArray(buffer.intValue()); - } -} diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IntegerPartitioner.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IntegerPartitioner.java deleted file mode 100644 index 4aa79403..00000000 --- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IntegerPartitioner.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.twill.internal.kafka.client; - -import kafka.producer.Partitioner; -import kafka.utils.VerifiableProperties; - -/** - * A kafka {@link kafka.producer.Partitioner} using integer key to compute partition id. - */ -public final class IntegerPartitioner implements Partitioner { - - public IntegerPartitioner(VerifiableProperties properties) { - } - - public int partition(Integer key, int numPartitions) { - return key % numPartitions; - } -} diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java deleted file mode 100644 index 73235b7a..00000000 --- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java +++ /dev/null @@ -1,521 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.twill.internal.kafka.client; - -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; -import com.google.common.collect.AbstractIterator; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.util.concurrent.Futures; -import kafka.api.FetchRequest; -import kafka.api.FetchRequestBuilder; -import kafka.api.PartitionOffsetRequestInfo; -import kafka.common.ErrorMapping; -import kafka.common.TopicAndPartition; -import kafka.javaapi.FetchResponse; -import kafka.javaapi.OffsetRequest; -import kafka.javaapi.OffsetResponse; -import kafka.javaapi.consumer.SimpleConsumer; -import kafka.javaapi.message.ByteBufferMessageSet; -import kafka.message.MessageAndOffset; -import org.apache.twill.common.Cancellable; -import org.apache.twill.common.Threads; -import org.apache.twill.kafka.client.BrokerInfo; -import org.apache.twill.kafka.client.BrokerService; -import org.apache.twill.kafka.client.FetchedMessage; -import org.apache.twill.kafka.client.KafkaConsumer; -import org.apache.twill.kafka.client.TopicPartition; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.ConnectException; -import java.nio.channels.ClosedByInterruptException; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - -/** - * A {@link KafkaConsumer} implementation using the scala kafka api. - */ -final class SimpleKafkaConsumer implements KafkaConsumer { - - private static final Logger LOG = LoggerFactory.getLogger(SimpleKafkaConsumer.class); - private static final int FETCH_SIZE = 1024 * 1024; // Use a default fetch size. - private static final int SO_TIMEOUT = 5 * 1000; // 5 seconds. - private static final int MAX_WAIT = 1000; // 1 second. - private static final long CONSUMER_EXPIRE_MINUTES = 1L; // close consumer if not used for 1 minute. - private static final long INIT_CONSUMER_FAILURE_BACKOFF = 100L; // Initial backoff for 100ms if failure in consumer. - private static final long MAX_CONSUMER_FAILURE_BACKOFF = 10000L; // Backoff max for 10 seconds if failure in consumer. - private static final long EMPTY_FETCH_WAIT = 500L; // Sleep for 500 ms if no message is fetched. - - private final BrokerService brokerService; - private final LoadingCache consumers; - private final BlockingQueue consumerCancels; - - SimpleKafkaConsumer(BrokerService brokerService) { - this.brokerService = brokerService; - this.consumers = CacheBuilder.newBuilder() - .expireAfterAccess(CONSUMER_EXPIRE_MINUTES, TimeUnit.MINUTES) - .removalListener(createRemovalListener()) - .build(createConsumerLoader()); - this.consumerCancels = new LinkedBlockingQueue(); - } - - @Override - public Preparer prepare() { - return new SimplePreparer(); - } - - /** - * Called to stop all consumers created. This method should only be - * called by KafkaClientService who own this consumer. - */ - void stop() { - LOG.info("Stopping Kafka consumer"); - List cancels = Lists.newLinkedList(); - consumerCancels.drainTo(cancels); - for (Cancellable cancel : cancels) { - cancel.cancel(); - } - consumers.invalidateAll(); - LOG.info("Kafka Consumer stopped"); - } - - /** - * Creates a CacheLoader for creating SimpleConsumer. - */ - private CacheLoader createConsumerLoader() { - return new CacheLoader() { - @Override - public SimpleConsumer load(BrokerInfo key) throws Exception { - return new SimpleConsumer(key.getHost(), key.getPort(), SO_TIMEOUT, FETCH_SIZE, "simple-kafka-client"); - } - }; - } - - /** - * Creates a RemovalListener that will close SimpleConsumer on cache removal. - */ - private RemovalListener createRemovalListener() { - return new RemovalListener() { - @Override - public void onRemoval(RemovalNotification notification) { - SimpleConsumer consumer = notification.getValue(); - if (consumer != null) { - consumer.close(); - } - } - }; - } - - /** - * Retrieves the last offset before the given timestamp for a given topic partition. - * - * @return The last offset before the given timestamp or {@code 0} if failed to do so. - */ - private long getLastOffset(TopicPartition topicPart, long timestamp) { - BrokerInfo brokerInfo = brokerService.getLeader(topicPart.getTopic(), topicPart.getPartition()); - SimpleConsumer consumer = brokerInfo == null ? null : consumers.getUnchecked(brokerInfo); - - // If no broker, treat it as failure attempt. - if (consumer == null) { - LOG.warn("Failed to talk to any broker. Default offset to 0 for {}", topicPart); - return 0L; - } - - // Fire offset request - OffsetRequest request = new OffsetRequest(ImmutableMap.of( - new TopicAndPartition(topicPart.getTopic(), topicPart.getPartition()), - new PartitionOffsetRequestInfo(timestamp, 1) - ), kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId()); - - OffsetResponse response = consumer.getOffsetsBefore(request); - - // Retrieve offsets from response - long[] offsets = response.hasError() ? null : response.offsets(topicPart.getTopic(), topicPart.getPartition()); - if (offsets == null || offsets.length <= 0) { - short errorCode = response.errorCode(topicPart.getTopic(), topicPart.getPartition()); - - // If the topic partition doesn't exists, use offset 0 without logging error. - if (errorCode != ErrorMapping.UnknownTopicOrPartitionCode()) { - consumers.refresh(brokerInfo); - LOG.warn("Failed to fetch offset for {} with timestamp {}. Error: {}. Default offset to 0.", - topicPart, timestamp, errorCode); - } - return 0L; - } - - LOG.debug("Offset {} fetched for {} with timestamp {}.", offsets[0], topicPart, timestamp); - return offsets[0]; - } - - /** - * A preparer that uses kafak scala api for consuming messages. - */ - private final class SimplePreparer implements Preparer { - - // Map from TopicPartition to offset - private final Map requests; - private final ThreadFactory threadFactory; - - private SimplePreparer() { - this.requests = Maps.newHashMap(); - this.threadFactory = Threads.createDaemonThreadFactory("message-callback-%d"); - } - - @Override - public Preparer add(String topic, int partition, long offset) { - requests.put(new TopicPartition(topic, partition), offset); - return this; - } - - @Override - public Preparer addFromBeginning(String topic, int partition) { - TopicPartition topicPartition = new TopicPartition(topic, partition); - requests.put(topicPartition, kafka.api.OffsetRequest.EarliestTime()); - return this; - } - - @Override - public Preparer addLatest(String topic, int partition) { - TopicPartition topicPartition = new TopicPartition(topic, partition); - requests.put(topicPartition, kafka.api.OffsetRequest.LatestTime()); - return this; - } - - @Override - public Cancellable consume(MessageCallback callback) { - final ExecutorService executor = Executors.newSingleThreadExecutor(threadFactory); - final List pollers = Lists.newArrayList(); - - // When cancelling the consumption, first terminates all polling threads and then stop the executor service. - final AtomicBoolean cancelled = new AtomicBoolean(); - Cancellable cancellable = new Cancellable() { - @Override - public void cancel() { - if (!cancelled.compareAndSet(false, true)) { - return; - } - consumerCancels.remove(this); - - LOG.info("Requesting stop of all consumer threads."); - for (ConsumerThread consumerThread : pollers) { - consumerThread.terminate(); - } - LOG.info("Wait for all consumer threads to stop."); - for (ConsumerThread consumerThread : pollers) { - try { - consumerThread.join(); - } catch (InterruptedException e) { - LOG.warn("Interrupted exception while waiting for thread to complete.", e); - } - } - LOG.info("All consumer threads stopped."); - // Use shutdown so that submitted task still has chance to execute, which is important for finished to get - // called. - executor.shutdown(); - } - }; - - // Wrap the callback with a single thread executor. - MessageCallback messageCallback = wrapCallback(callback, executor, cancellable); - - // Starts threads for polling new messages. - for (Map.Entry entry : requests.entrySet()) { - ConsumerThread consumerThread = new ConsumerThread(entry.getKey(), entry.getValue(), messageCallback); - consumerThread.setDaemon(true); - consumerThread.start(); - pollers.add(consumerThread); - } - - consumerCancels.add(cancellable); - return cancellable; - } - - /** - * Wrap a given MessageCallback by a executor so that calls are executed in the given executor. - * By running the calls through the executor, it also block and wait for the task being completed so that - * it can block the poller thread depending on the rate of processing that the callback can handle. - */ - private MessageCallback wrapCallback(final MessageCallback callback, - final ExecutorService executor, final Cancellable cancellable) { - final AtomicBoolean stopped = new AtomicBoolean(); - return new MessageCallback() { - @Override - public long onReceived(final Iterator messages) { - if (stopped.get()) { - return -1L; - } - return Futures.getUnchecked(executor.submit(new Callable() { - @Override - public Long call() { - if (stopped.get()) { - return -1L; - } - return callback.onReceived(messages); - } - })); - } - - @Override - public void finished() { - // Make sure finished only get called once. - if (!stopped.compareAndSet(false, true)) { - return; - } - Futures.getUnchecked(executor.submit(new Runnable() { - @Override - public void run() { - // When finished is called, also cancel the consumption from all polling thread. - callback.finished(); - cancellable.cancel(); - } - })); - } - }; - } - } - - /** - * The thread for polling kafka. - */ - private final class ConsumerThread extends Thread { - - private final TopicPartition topicPart; - private final long startOffset; - private final MessageCallback callback; - private final BasicFetchedMessage fetchedMessage; - private volatile boolean running; - - private ConsumerThread(TopicPartition topicPart, long startOffset, MessageCallback callback) { - super(String.format("Kafka-Consumer-%s-%d", topicPart.getTopic(), topicPart.getPartition())); - this.topicPart = topicPart; - this.startOffset = startOffset; - this.callback = callback; - this.running = true; - this.fetchedMessage = new BasicFetchedMessage(topicPart); - } - - @Override - public void run() { - final AtomicLong offset = new AtomicLong(startOffset); - - Map.Entry consumerEntry = null; - ExponentialBackoff backoff = new ExponentialBackoff(INIT_CONSUMER_FAILURE_BACKOFF, - MAX_CONSUMER_FAILURE_BACKOFF, TimeUnit.MILLISECONDS); - while (running) { - if (consumerEntry == null && (consumerEntry = getConsumerEntry()) == null) { - LOG.debug("No leader for topic partition {}.", topicPart); - backoff.backoff(); - continue; - } - - // If offset < 0, meaning it's special offset value that needs to fetch either the earliest or latest offset - // from kafak server. - long off = offset.get(); - if (off < 0) { - offset.set(getLastOffset(topicPart, off)); - } - - SimpleConsumer consumer = consumerEntry.getValue(); - - // Fire a fetch message request - try { - FetchResponse response = fetchMessages(consumer, offset.get()); - - // Failure response, set consumer entry to null and let next round of loop to handle it. - if (response.hasError()) { - short errorCode = response.errorCode(topicPart.getTopic(), topicPart.getPartition()); - LOG.info("Failed to fetch message on {}. Error: {}", topicPart, errorCode); - // If it is out of range error, reset to earliest offset - if (errorCode == ErrorMapping.OffsetOutOfRangeCode()) { - offset.set(kafka.api.OffsetRequest.EarliestTime()); - } - - consumers.refresh(consumerEntry.getKey()); - consumerEntry = null; - continue; - } - - ByteBufferMessageSet messages = response.messageSet(topicPart.getTopic(), topicPart.getPartition()); - if (sleepIfEmpty(messages)) { - continue; - } - - // Call the callback - invokeCallback(messages, offset); - backoff.reset(); - } catch (Throwable t) { - // Only log if it is still running, otherwise, it just the interrupt caused by the stop. - if (!running) { - LOG.debug("Unable to fetch messages on {}, kafka consumer service shutdown is in progress.", topicPart); - } else { - if (t instanceof ClosedByInterruptException || t instanceof ConnectException) { - LOG.debug("Unable to fetch messages on {}, kafka server shutdown is in progress.", topicPart); - } else { - LOG.info("Exception when fetching message on {}.", topicPart, t); - } - backoff.backoff(); - } - consumers.refresh(consumerEntry.getKey()); - consumerEntry = null; - } - } - - // When the thread is done, call the callback finished method. - try { - callback.finished(); - } catch (Throwable t) { - LOG.error("Exception thrown from MessageCallback.finished({})", running, t); - } - } - - public void terminate() { - LOG.info("Terminate requested {}", getName()); - running = false; - interrupt(); - } - - /** - * Gets the leader broker and the associated SimpleConsumer for the current topic and partition. - */ - private Map.Entry getConsumerEntry() { - BrokerInfo leader = brokerService.getLeader(topicPart.getTopic(), topicPart.getPartition()); - return leader == null ? null : Maps.immutableEntry(leader, consumers.getUnchecked(leader)); - } - - /** - * Makes a call to kafka to fetch messages. - */ - private FetchResponse fetchMessages(SimpleConsumer consumer, long offset) { - FetchRequest request = new FetchRequestBuilder() - .clientId(consumer.clientId()) - .addFetch(topicPart.getTopic(), topicPart.getPartition(), offset, FETCH_SIZE) - .maxWait(MAX_WAIT) - .build(); - return consumer.fetch(request); - } - - /** - * Sleeps if the message set is empty. - * @return {@code true} if it is empty, {@code false} otherwise. - */ - private boolean sleepIfEmpty(ByteBufferMessageSet messages) { - if (Iterables.isEmpty(messages)) { - LOG.trace("No message fetched. Sleep for {} ms before next fetch.", EMPTY_FETCH_WAIT); - try { - TimeUnit.MILLISECONDS.sleep(EMPTY_FETCH_WAIT); - } catch (InterruptedException e) { - // It's interrupted from stop, ok to ignore. - } - return true; - } - return false; - } - - /** - * Calls the message callback with the given message set. - */ - private void invokeCallback(ByteBufferMessageSet messages, AtomicLong offset) { - long savedOffset = offset.get(); - try { - offset.set(callback.onReceived(createFetchedMessages(messages, offset))); - } catch (Throwable t) { - LOG.error("Callback throws exception. Retry from offset {} for {}", startOffset, topicPart, t); - offset.set(savedOffset); - } - } - - /** - * Creates an Iterator of FetchedMessage based on the given message set. The iterator would also updates - * the offset while iterating. - */ - private Iterator createFetchedMessages(ByteBufferMessageSet messageSet, final AtomicLong offset) { - final Iterator messages = messageSet.iterator(); - return new AbstractIterator() { - @Override - protected FetchedMessage computeNext() { - while (messages.hasNext()) { - MessageAndOffset message = messages.next(); - long msgOffset = message.offset(); - if (msgOffset < offset.get()) { - LOG.trace("Received old offset {}, expecting {} on {}. Message Ignored.", - msgOffset, offset.get(), topicPart); - continue; - } - - fetchedMessage.setPayload(message.message().payload()); - fetchedMessage.setOffset(message.offset()); - fetchedMessage.setNextOffset(message.nextOffset()); - - return fetchedMessage; - } - return endOfData(); - } - }; - } - - /** - * Helper class for performance exponential backoff on message fetching failure. - */ - private final class ExponentialBackoff { - private final long initialBackoff; - private final long maxBackoff; - private final TimeUnit backoffUnit; - private int failureCount = 0; - - private ExponentialBackoff(long initialBackoff, long maxBackoff, TimeUnit backoffUnit) { - this.initialBackoff = initialBackoff; - this.maxBackoff = maxBackoff; - this.backoffUnit = backoffUnit; - } - - void backoff() { - failureCount++; - long multiplier = failureCount > Long.SIZE ? Long.MAX_VALUE : (1L << (failureCount - 1)); - long backoff = Math.min(initialBackoff * multiplier, maxBackoff); - backoff = backoff < 0 ? maxBackoff : backoff; - try { - backoffUnit.sleep(backoff); - } catch (InterruptedException e) { - // OK to ignore since this method is called from the consumer thread only, which on thread shutdown, - // the thread will be interrupted - } - } - - void reset() { - failureCount = 0; - } - } - } -} diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java deleted file mode 100644 index f147d24d..00000000 --- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java +++ /dev/null @@ -1,225 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.twill.internal.kafka.client; - -import com.google.common.base.Objects; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import kafka.javaapi.producer.Producer; -import kafka.producer.KeyedMessage; -import kafka.producer.ProducerConfig; -import org.apache.twill.common.Cancellable; -import org.apache.twill.common.Threads; -import org.apache.twill.kafka.client.BrokerService; -import org.apache.twill.kafka.client.Compression; -import org.apache.twill.kafka.client.KafkaPublisher; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - -/** - * Implementation of {@link KafkaPublisher} using the kafka scala-java api. - */ -final class SimpleKafkaPublisher implements KafkaPublisher { - - private static final Logger LOG = LoggerFactory.getLogger(SimpleKafkaPublisher.class); - - private final BrokerService brokerService; - private final Ack ack; - private final Compression compression; - private final AtomicReference> producer; - private final AtomicBoolean listenerCancelled; - - public SimpleKafkaPublisher(BrokerService brokerService, Ack ack, Compression compression) { - this.brokerService = brokerService; - this.ack = ack; - this.compression = compression; - this.producer = new AtomicReference>(); - this.listenerCancelled = new AtomicBoolean(false); - } - - /** - * Start the publisher. This method must be called before other methods. This method is only to be called - * by KafkaClientService who own this object. - * @return A Cancellable for closing this publish. - */ - Cancellable start() { - ExecutorService listenerExecutor - = Executors.newSingleThreadExecutor(Threads.createDaemonThreadFactory("kafka-publisher")); - - // Listen to changes in broker list - final BrokerListChangeListener listener = new BrokerListChangeListener(listenerCancelled, producer, - ack, compression); - Cancellable cancelChangeListener = brokerService.addChangeListener(listener, listenerExecutor); - - // Invoke the change listener at least once. Since every call to the listener is through the single thread - // executor, there is no race and for sure the listener always see the latest change, either through this call - // or from the BrokerService callback. - Future completion = listenerExecutor.submit(new Runnable() { - @Override - public void run() { - listener.changed(brokerService); - } - }); - - Futures.getUnchecked(completion); - return new ProducerCancellable(listenerExecutor, listenerCancelled, cancelChangeListener, producer); - } - - @Override - public Preparer prepare(String topic) { - return new SimplePreparer(topic); - } - - private final class SimplePreparer implements Preparer { - - private final String topic; - private final List> messages; - - private SimplePreparer(String topic) { - this.topic = topic; - this.messages = Lists.newLinkedList(); - } - - @Override - public Preparer add(ByteBuffer message, Object partitionKey) { - messages.add(new KeyedMessage(topic, Math.abs(partitionKey.hashCode()), message)); - return this; - } - - @Override - public ListenableFuture send() { - try { - int size = messages.size(); - Producer kafkaProducer = producer.get(); - if (kafkaProducer == null) { - return Futures.immediateFailedFuture(new IllegalStateException("No kafka producer available.")); - } - kafkaProducer.send(messages); - return Futures.immediateFuture(size); - } catch (Exception e) { - return Futures.immediateFailedFuture(e); - } finally { - messages.clear(); - } - } - } - - /** - * Listener for watching for changes in broker list. - * This needs to be a static class so that no reference to the publisher instance is held in order for - * weak reference inside ZKBrokerService to publish works and be able to GC the Publisher instance and hence - * closing the underlying kafka producer. - */ - private static final class BrokerListChangeListener extends BrokerService.BrokerChangeListener { - - private final AtomicBoolean listenerCancelled; - private final AtomicReference> producer; - private final Ack ack; - private final Compression compression; - private String brokerList; - - private BrokerListChangeListener(AtomicBoolean listenerCancelled, - AtomicReference> producer, - Ack ack, Compression compression) { - this.listenerCancelled = listenerCancelled; - this.producer = producer; - this.ack = ack; - this.compression = compression; - } - - @Override - public void changed(BrokerService brokerService) { - if (listenerCancelled.get()) { - return; - } - - String newBrokerList = brokerService.getBrokerList(); - if (newBrokerList.isEmpty()) { - LOG.warn("Broker list is empty. No Kafka producer is created."); - return; - } - - if (Objects.equal(brokerList, newBrokerList)) { - return; - } - - Properties props = new Properties(); - props.put("metadata.broker.list", newBrokerList); - props.put("serializer.class", ByteBufferEncoder.class.getName()); - props.put("key.serializer.class", IntegerEncoder.class.getName()); - props.put("partitioner.class", IntegerPartitioner.class.getName()); - props.put("request.required.acks", Integer.toString(ack.getAck())); - props.put("compression.codec", compression.getCodec()); - - ProducerConfig config = new ProducerConfig(props); - Producer oldProducer = producer.getAndSet(new Producer(config)); - if (oldProducer != null) { - oldProducer.close(); - } - - LOG.info("Update Kafka producer broker list: {}", newBrokerList); - brokerList = newBrokerList; - } - } - - /** - * For stopping and releasing resource for the publisher. This class shouldn't hold any references to the - * Publisher class. - */ - private static final class ProducerCancellable implements Cancellable, Runnable { - private final ExecutorService executor; - private final AtomicBoolean listenerCancelled; - private final Cancellable cancelChangeListener; - private final AtomicReference> producer; - - private ProducerCancellable(ExecutorService executor, AtomicBoolean listenerCancelled, - Cancellable cancelChangeListener, - AtomicReference> producer) { - this.executor = executor; - this.listenerCancelled = listenerCancelled; - this.cancelChangeListener = cancelChangeListener; - this.producer = producer; - } - - @Override - public void cancel() { - if (listenerCancelled.compareAndSet(false, true)) { - executor.execute(this); - } - } - - @Override - public void run() { - // Call from cancel() through executor only. - cancelChangeListener.cancel(); - Producer kafkaProducer = producer.get(); - kafkaProducer.close(); - executor.shutdownNow(); - } - } -} diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKBrokerService.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKBrokerService.java deleted file mode 100644 index 2ffc604f..00000000 --- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKBrokerService.java +++ /dev/null @@ -1,460 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.twill.internal.kafka.client; - -import com.google.common.base.Charsets; -import com.google.common.base.Function; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.base.Supplier; -import com.google.common.base.Suppliers; -import com.google.common.base.Throwables; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import com.google.common.collect.Sets; -import com.google.common.primitives.Ints; -import com.google.common.util.concurrent.AbstractIdleService; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.SettableFuture; -import com.google.gson.Gson; -import org.apache.twill.common.Cancellable; -import org.apache.twill.common.Threads; -import org.apache.twill.kafka.client.BrokerInfo; -import org.apache.twill.kafka.client.BrokerService; -import org.apache.twill.kafka.client.TopicPartition; -import org.apache.twill.zookeeper.NodeChildren; -import org.apache.twill.zookeeper.NodeData; -import org.apache.twill.zookeeper.ZKClient; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.data.Stat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Set; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -/** - * A {@link BrokerService} that watches kafka zk nodes for updates of broker lists and leader for - * each topic partition. - */ -public final class ZKBrokerService extends AbstractIdleService implements BrokerService { - - private static final Logger LOG = LoggerFactory.getLogger(ZKBrokerService.class); - private static final String BROKER_IDS_PATH = "/brokers/ids"; - private static final String BROKER_TOPICS_PATH = "/brokers/topics"; - private static final long FAILURE_RETRY_SECONDS = 5; - private static final Gson GSON = new Gson(); - private static final Function BROKER_ID_TRANSFORMER = new Function() { - @Override - public BrokerId apply(String input) { - return new BrokerId(Integer.parseInt(input)); - } - }; - private static final Function BROKER_INFO_TO_ADDRESS = new Function() { - @Override - public String apply(BrokerInfo input) { - return String.format("%s:%d", input.getHost(), input.getPort()); - } - }; - - private final ZKClient zkClient; - private final LoadingCache> brokerInfos; - private final LoadingCache> partitionInfos; - private final Set listeners; - - private ExecutorService executorService; - private Supplier> brokerList; - - public ZKBrokerService(ZKClient zkClient) { - this.zkClient = zkClient; - this.brokerInfos = CacheBuilder.newBuilder().build(createCacheLoader(new CacheInvalidater() { - @Override - public void invalidate(BrokerId key) { - brokerInfos.invalidate(key); - } - }, BrokerInfo.class)); - this.partitionInfos = CacheBuilder.newBuilder().build(createCacheLoader( - new CacheInvalidater() { - @Override - public void invalidate(KeyPathTopicPartition key) { - partitionInfos.invalidate(key); - } - }, PartitionInfo.class)); - - // Use CopyOnWriteArraySet so that it's thread safe and order of listener is maintain as the insertion order. - this.listeners = Sets.newCopyOnWriteArraySet(); - } - - @Override - protected void startUp() throws Exception { - executorService = Executors.newCachedThreadPool(Threads.createDaemonThreadFactory("zk-kafka-broker")); - } - - @Override - protected void shutDown() throws Exception { - executorService.shutdownNow(); - } - - @Override - public BrokerInfo getLeader(String topic, int partition) { - Preconditions.checkState(isRunning(), "BrokerService is not running."); - PartitionInfo partitionInfo = partitionInfos.getUnchecked(new KeyPathTopicPartition(topic, partition)).get(); - return partitionInfo == null ? null : brokerInfos.getUnchecked(new BrokerId(partitionInfo.getLeader())).get(); - } - - @Override - public synchronized Iterable getBrokers() { - Preconditions.checkState(isRunning(), "BrokerService is not running."); - - if (brokerList != null) { - return brokerList.get(); - } - - final SettableFuture readerFuture = SettableFuture.create(); - final AtomicReference> brokers = - new AtomicReference>(ImmutableList.of()); - - actOnExists(BROKER_IDS_PATH, new Runnable() { - @Override - public void run() { - // Callback for fetching children list. This callback should be executed in the executorService. - final FutureCallback childrenCallback = new FutureCallback() { - @Override - public void onSuccess(NodeChildren result) { - try { - // For each children node, get the BrokerInfo from the brokerInfo cache. - brokers.set( - ImmutableList.copyOf( - Iterables.transform( - brokerInfos.getAll(Iterables.transform(result.getChildren(), BROKER_ID_TRANSFORMER)).values(), - Suppliers.supplierFunction()))); - readerFuture.set(null); - - for (ListenerExecutor listener : listeners) { - listener.changed(ZKBrokerService.this); - } - } catch (ExecutionException e) { - readerFuture.setException(e.getCause()); - } - } - - @Override - public void onFailure(Throwable t) { - readerFuture.setException(t); - } - }; - - // Fetch list of broker ids - Futures.addCallback(zkClient.getChildren(BROKER_IDS_PATH, new Watcher() { - @Override - public void process(WatchedEvent event) { - if (!isRunning()) { - return; - } - if (event.getType() == Event.EventType.NodeChildrenChanged) { - Futures.addCallback(zkClient.getChildren(BROKER_IDS_PATH, this), childrenCallback, executorService); - } - } - }), childrenCallback, executorService); - } - }, readerFuture, FAILURE_RETRY_SECONDS, TimeUnit.SECONDS); - - brokerList = createSupplier(brokers); - try { - readerFuture.get(); - } catch (Exception e) { - throw Throwables.propagate(e); - } - return brokerList.get(); - } - - @Override - public String getBrokerList() { - return Joiner.on(',').join(Iterables.transform(getBrokers(), BROKER_INFO_TO_ADDRESS)); - } - - @Override - public Cancellable addChangeListener(BrokerChangeListener listener, Executor executor) { - final ListenerExecutor listenerExecutor = new ListenerExecutor(listener, executor); - listeners.add(listenerExecutor); - - return new Cancellable() { - @Override - public void cancel() { - listeners.remove(listenerExecutor); - } - }; - } - - /** - * Creates a cache loader for the given path to supply data with the data node. - */ - private CacheLoader> createCacheLoader(final CacheInvalidater invalidater, - final Class resultType) { - return new CacheLoader>() { - - @Override - public Supplier load(final K key) throws Exception { - // A future to tell if the result is ready, even it is failure. - final SettableFuture readyFuture = SettableFuture.create(); - final AtomicReference resultValue = new AtomicReference(); - - // Fetch for node data when it exists. - final String path = key.getPath(); - actOnExists(path, new Runnable() { - @Override - public void run() { - // Callback for getData call - final FutureCallback dataCallback = new FutureCallback() { - @Override - public void onSuccess(NodeData result) { - // Update with latest data - T value = decodeNodeData(result, resultType); - resultValue.set(value); - readyFuture.set(value); - } - - @Override - public void onFailure(Throwable t) { - LOG.error("Failed to fetch node data on {}", path, t); - if (t instanceof KeeperException.NoNodeException) { - resultValue.set(null); - readyFuture.set(null); - return; - } - - // On error, simply invalidate the key so that it'll be fetched next time. - invalidater.invalidate(key); - readyFuture.setException(t); - } - }; - - // Fetch node data - Futures.addCallback(zkClient.getData(path, new Watcher() { - @Override - public void process(WatchedEvent event) { - if (!isRunning()) { - return; - } - if (event.getType() == Event.EventType.NodeDataChanged) { - // If node data changed, fetch it again. - Futures.addCallback(zkClient.getData(path, this), dataCallback, executorService); - } else if (event.getType() == Event.EventType.NodeDeleted) { - // If node removed, invalidate the cached value. - brokerInfos.invalidate(key); - } - } - }), dataCallback, executorService); - } - }, readyFuture, FAILURE_RETRY_SECONDS, TimeUnit.SECONDS); - - readyFuture.get(); - return createSupplier(resultValue); - } - }; - } - - /** - * Gson decode the NodeData into object. - * @param nodeData The data to decode - * @param type Object class to decode into. - * @param Type of the object. - * @return The decoded object or {@code null} if node data is null. - */ - private T decodeNodeData(NodeData nodeData, Class type) { - byte[] data = nodeData == null ? null : nodeData.getData(); - if (data == null) { - return null; - } - return GSON.fromJson(new String(data, Charsets.UTF_8), type); - } - - /** - * Checks exists of a given ZK path and execute the action when it exists. - */ - private void actOnExists(final String path, final Runnable action, - final SettableFuture readyFuture, final long retryTime, final TimeUnit retryUnit) { - Futures.addCallback(zkClient.exists(path, new Watcher() { - @Override - public void process(WatchedEvent event) { - if (!isRunning()) { - return; - } - if (event.getType() == Event.EventType.NodeCreated) { - action.run(); - } - } - }), new FutureCallback() { - @Override - public void onSuccess(Stat result) { - if (result != null) { - action.run(); - } else { - // If the node doesn't exists, treat it as ready. When the node becomes available later, data will be - // fetched by the watcher. - readyFuture.set(null); - } - } - - @Override - public void onFailure(Throwable t) { - // Retry the operation based on the retry time. - Thread retryThread = new Thread("zk-broker-service-retry") { - @Override - public void run() { - try { - retryUnit.sleep(retryTime); - actOnExists(path, action, readyFuture, retryTime, retryUnit); - } catch (InterruptedException e) { - LOG.warn("ZK retry thread interrupted. Action not retried."); - } - } - }; - retryThread.setDaemon(true); - retryThread.start(); - } - }, executorService); - } - - /** - * Creates a supplier that always return latest copy from an {@link java.util.concurrent.atomic.AtomicReference}. - */ - private Supplier createSupplier(final AtomicReference ref) { - return new Supplier() { - @Override - public T get() { - return ref.get(); - } - }; - } - - - /** - * Interface for invalidating an entry in a cache. - * @param Key type. - */ - private interface CacheInvalidater { - void invalidate(T key); - } - - /** - * Represents a path in zookeeper for cache key. - */ - private interface KeyPath { - String getPath(); - } - - private static final class BrokerId implements KeyPath { - private final int id; - - private BrokerId(int id) { - this.id = id; - } - - @Override - public boolean equals(Object o) { - return this == o || !(o == null || getClass() != o.getClass()) && id == ((BrokerId) o).id; - } - - @Override - public int hashCode() { - return Ints.hashCode(id); - } - - @Override - public String getPath() { - return BROKER_IDS_PATH + "/" + id; - } - } - - /** - * Represents a topic + partition combination. Used for loading cache key. - */ - private static final class KeyPathTopicPartition extends TopicPartition implements KeyPath { - - private KeyPathTopicPartition(String topic, int partition) { - super(topic, partition); - } - - @Override - public String getPath() { - return String.format("%s/%s/partitions/%d/state", BROKER_TOPICS_PATH, getTopic(), getPartition()); - } - } - - /** - * Class for holding information about a partition. Only used by gson to decode partition state node in zookeeper. - */ - private static final class PartitionInfo { - private int[] isr; - private int leader; - - private int[] getIsr() { - return isr; - } - - private int getLeader() { - return leader; - } - } - - - /** - * Helper class to invoke {@link BrokerChangeListener} from an {@link Executor}. - */ - private static final class ListenerExecutor extends BrokerChangeListener { - - private final BrokerChangeListener listener; - private final Executor executor; - - private ListenerExecutor(BrokerChangeListener listener, Executor executor) { - this.listener = listener; - this.executor = executor; - } - - @Override - public void changed(final BrokerService brokerService) { - try { - executor.execute(new Runnable() { - - @Override - public void run() { - try { - listener.changed(brokerService); - } catch (Throwable t) { - LOG.error("Failure when calling BrokerChangeListener.", t); - } - } - }); - } catch (Throwable t) { - LOG.error("Failure when calling BrokerChangeListener.", t); - } - } - } -} diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKKafkaClientService.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKKafkaClientService.java deleted file mode 100644 index a0d93ec0..00000000 --- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKKafkaClientService.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.twill.internal.kafka.client; - -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.AbstractIdleService; -import org.apache.twill.common.Cancellable; -import org.apache.twill.common.Threads; -import org.apache.twill.kafka.client.BrokerService; -import org.apache.twill.kafka.client.Compression; -import org.apache.twill.kafka.client.KafkaClientService; -import org.apache.twill.kafka.client.KafkaConsumer; -import org.apache.twill.kafka.client.KafkaPublisher; -import org.apache.twill.zookeeper.ZKClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.lang.ref.Reference; -import java.lang.ref.ReferenceQueue; -import java.lang.ref.WeakReference; -import java.util.Collections; -import java.util.IdentityHashMap; -import java.util.Map; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -/** - * A KafkaClientService that uses ZooKeeper for broker discovery. - */ -public class ZKKafkaClientService extends AbstractIdleService implements KafkaClientService, Runnable { - - private static final Logger LOG = LoggerFactory.getLogger(ZKKafkaClientService.class); - private static final long PUBLISHER_CLEANUP_SECONDS = 1; - - private final BrokerService brokerService; - - // Maintains a weak reference key map for calling publisher.shutdown when garbage collected. - private final Map, Cancellable> publishers; - private final ReferenceQueue referenceQueue; - - private final SimpleKafkaConsumer consumer; - - // For running cleanup job - private ScheduledExecutorService scheduler; - - public ZKKafkaClientService(ZKClient zkClient) { - this.brokerService = new ZKBrokerService(zkClient); - this.publishers = Collections.synchronizedMap(new IdentityHashMap, Cancellable>()); - this.referenceQueue = new ReferenceQueue(); - this.consumer = new SimpleKafkaConsumer(brokerService); - } - - @Override - public KafkaPublisher getPublisher(KafkaPublisher.Ack ack, Compression compression) { - Preconditions.checkState(isRunning(), "Service is not running."); - - // Wrap the publisher with a weak reference and save the cancellable for closing the publisher. - SimpleKafkaPublisher publisher = new SimpleKafkaPublisher(brokerService, ack, compression); - publishers.put(new WeakReference(publisher, referenceQueue), publisher.start()); - return publisher; - } - - @Override - public KafkaConsumer getConsumer() { - Preconditions.checkState(isRunning(), "Service is not running."); - return consumer; - } - - @Override - public void run() { - // For calling publisher.producer.close() on garbage collected - Reference ref = referenceQueue.poll(); - while (ref != null && isRunning()) { - publishers.remove(ref).cancel(); - ref = referenceQueue.poll(); - } - } - - @Override - protected void startUp() throws Exception { - scheduler = Executors.newSingleThreadScheduledExecutor(Threads.createDaemonThreadFactory("kafka-client-cleanup")); - scheduler.scheduleAtFixedRate(this, PUBLISHER_CLEANUP_SECONDS, PUBLISHER_CLEANUP_SECONDS, TimeUnit.SECONDS); - - // Start broker service to get auto-updated brokers information. - brokerService.startAndWait(); - } - - @Override - protected void shutDown() throws Exception { - LOG.info("Stopping KafkaClientService"); - scheduler.shutdownNow(); - for (Cancellable cancellable : publishers.values()) { - cancellable.cancel(); - } - consumer.stop(); - - brokerService.stopAndWait(); - LOG.info("KafkaClientService stopped"); - } -} diff --git a/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java b/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java index 493c4cae..257a799c 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java +++ b/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java @@ -33,18 +33,13 @@ import com.google.gson.GsonBuilder; import org.apache.twill.api.logging.LogThrowable; import org.apache.twill.common.Threads; -import org.apache.twill.internal.Services; import org.apache.twill.internal.json.ILoggingEventSerializer; import org.apache.twill.internal.json.LogThrowableCodec; import org.apache.twill.internal.json.StackTraceElementCodec; -import org.apache.twill.internal.kafka.client.ZKKafkaClientService; +import org.apache.twill.internal.kafka.client.BootstrapedKafkaClientService; import org.apache.twill.kafka.client.Compression; import org.apache.twill.kafka.client.KafkaClientService; import org.apache.twill.kafka.client.KafkaPublisher; -import org.apache.twill.zookeeper.RetryStrategies; -import org.apache.twill.zookeeper.ZKClientService; -import org.apache.twill.zookeeper.ZKClientServices; -import org.apache.twill.zookeeper.ZKClients; import java.nio.ByteBuffer; import java.util.Collection; @@ -74,9 +69,8 @@ public final class KafkaAppender extends UnsynchronizedAppenderBase>>() { + kafkaClient = new BootstrapedKafkaClientService(kafkaBootstrap); + Futures.addCallback(kafkaClient.start(), + new FutureCallback() { @Override - public void onSuccess(List> result) { - for (ListenableFuture future : result) { - Preconditions.checkState(Futures.getUnchecked(future) == Service.State.RUNNING, + public void onSuccess(Service.State result) { + Preconditions.checkState(result == Service.State.RUNNING, "Service is not running."); - } - addInfo("Kafka client started: " + zkConnectStr); + addInfo("Kafka client started: " + kafkaBootstrap); scheduler.scheduleWithFixedDelay(flushTask, 0, flushPeriod, TimeUnit.MILLISECONDS); } @@ -179,7 +166,7 @@ public void onFailure(Throwable t) { public void stop() { super.stop(); scheduler.shutdownNow(); - Futures.getUnchecked(Services.chainStop(kafkaClient, zkClientService)); + Futures.getUnchecked(kafkaClient.stop()); } public void forceFlush() { diff --git a/twill-core/src/test/java/org/apache/twill/internal/ControllerTest.java b/twill-core/src/test/java/org/apache/twill/internal/ControllerTest.java index c37cd004..9d9643ab 100644 --- a/twill-core/src/test/java/org/apache/twill/internal/ControllerTest.java +++ b/twill-core/src/test/java/org/apache/twill/internal/ControllerTest.java @@ -187,7 +187,9 @@ protected void triggerShutdown() { private TwillController getController(ZKClient zkClient, String appName, RunId runId) { AbstractTwillController controller = new AbstractTwillController(appName, runId, - zkClient, false, ImmutableList.of()) { + zkClient, false, + "", + ImmutableList.of()) { @Override public void kill() { diff --git a/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java b/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java index 958925c5..ddbb7704 100644 --- a/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java +++ b/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java @@ -24,7 +24,7 @@ import org.apache.twill.common.Cancellable; import org.apache.twill.internal.Services; import org.apache.twill.internal.kafka.EmbeddedKafkaServer; -import org.apache.twill.internal.kafka.client.ZKKafkaClientService; +import org.apache.twill.internal.kafka.client.BootstrapedKafkaClientService; import org.apache.twill.internal.utils.Networks; import org.apache.twill.internal.zookeeper.InMemoryZKServer; import org.apache.twill.zookeeper.ZKClientService; @@ -67,13 +67,12 @@ public static void init() throws Exception { zkServer = InMemoryZKServer.builder().setDataDir(TMP_FOLDER.newFolder()).build(); zkServer.startAndWait(); - // Extract the kafka.tgz and start the kafka server kafkaServer = new EmbeddedKafkaServer(generateKafkaConfig(zkServer.getConnectionStr())); kafkaServer.startAndWait(); zkClientService = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); - kafkaClient = new ZKKafkaClientService(zkClientService); + kafkaClient = new BootstrapedKafkaClientService(kafkaServer.getKafkaBootstrap()); Services.chainStart(zkClientService, kafkaClient).get(); } @@ -95,13 +94,13 @@ public void testKafkaClientReconnect() throws Exception { try { zkClient.create("/", null, CreateMode.PERSISTENT).get(); - ZKKafkaClientService kafkaClient = new ZKKafkaClientService(zkClient); + BootstrapedKafkaClientService kafkaClient = new BootstrapedKafkaClientService(server.getKafkaBootstrap()); kafkaClient.startAndWait(); try { server.startAndWait(); try { - // Publish a messages + // Publish a message createPublishThread(kafkaClient, topic, Compression.NONE, "First message", 1).start(); // Create a consumer @@ -194,10 +193,53 @@ public void finished() { Assert.assertTrue(stopLatch.await(1, TimeUnit.SECONDS)); } + @Test + public void testKafkaNewClient() throws Exception { + String topic = "testClient"; + kafkaClient = new BootstrapedKafkaClientService(kafkaServer.getKafkaBootstrap()); + kafkaClient.startAndWait(); + + Thread t1 = createPublishThread(kafkaClient, topic, Compression.GZIP, "GZIP Testing message", 10); + Thread t2 = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing message", 10); + + t1.start(); + t2.start(); + + Thread t3 = createPublishThread(kafkaClient, topic, Compression.SNAPPY, "Snappy Testing message", 10); + t2.join(); + t3.start(); + + final CountDownLatch latch = new CountDownLatch(30); + final CountDownLatch stopLatch = new CountDownLatch(1); + Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0).consume(new KafkaConsumer + .MessageCallback() { + @Override + public long onReceived(Iterator messages) { + long nextOffset = -1; + while (messages.hasNext()) { + FetchedMessage message = messages.next(); + nextOffset = message.getNextOffset(); + LOG.info(Charsets.UTF_8.decode(message.getPayload()).toString()); + latch.countDown(); + } + return nextOffset; + } + + @Override + public void finished() { + stopLatch.countDown(); + } + }); + + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + cancel.cancel(); + Assert.assertTrue(stopLatch.await(1, TimeUnit.SECONDS)); + } + @Test public void testKafkaClientSkipNext() throws Exception { String topic = "testClientSkipNext"; - // Publish 30 messages with indecies the same as offsets within the range 0 - 29 + // Publish 30 messages with indexes the same as offsets within the range 0 - 29 Thread t1 = createPublishThread(kafkaClient, topic, Compression.GZIP, "GZIP Testing message", 10); t1.start(); t1.join(); @@ -248,14 +290,22 @@ public void testBrokerChange() throws Exception { zkClient.create("/", null, CreateMode.PERSISTENT).get(); // Start a new kafka server - File logDir = TMP_FOLDER.newFolder(); - EmbeddedKafkaServer server = new EmbeddedKafkaServer(generateKafkaConfig(connectionStr, logDir)); - server.startAndWait(); + File logDir1 = TMP_FOLDER.newFolder(); + File logDir2 = TMP_FOLDER.newFolder(); + EmbeddedKafkaServer server1 = new EmbeddedKafkaServer(generateKafkaConfig(connectionStr, logDir1)); + server1.startAndWait(); + Properties properties = generateKafkaConfig(connectionStr, logDir2); + properties.setProperty("broker.id", "2"); + EmbeddedKafkaServer server2 = new EmbeddedKafkaServer(properties); + server2.startAndWait(); + // Start a Kafka client - KafkaClientService kafkaClient = new ZKKafkaClientService(zkClient); + KafkaClientService kafkaClient = new BootstrapedKafkaClientService(server1.getKafkaBootstrap()); kafkaClient.startAndWait(); + // wait a little to mark first broker dead + TimeUnit.SECONDS.sleep(10); // Attach a consumer final BlockingQueue consumedMessages = Queues.newLinkedBlockingQueue(); kafkaClient.getConsumer() @@ -278,18 +328,15 @@ public void finished() { }); // Get a publisher and publish a message - KafkaPublisher publisher = kafkaClient.getPublisher(KafkaPublisher.Ack.FIRE_AND_FORGET, Compression.NONE); + KafkaPublisher publisher = kafkaClient.getPublisher(KafkaPublisher.Ack.ALL_RECEIVED, Compression.NONE); publisher.prepare("test").add(Charsets.UTF_8.encode("Message 0"), 0).send().get(); // Should receive one message Assert.assertEquals("Message 0", consumedMessages.poll(5, TimeUnit.SECONDS)); // Now shutdown and restart the server on different port - server.stopAndWait(); - server = new EmbeddedKafkaServer(generateKafkaConfig(connectionStr, logDir)); - server.startAndWait(); + server1.stopAndWait(); - // Wait a little while to make sure changes is reflected in broker service TimeUnit.SECONDS.sleep(3); // Now publish again with the same publisher. It should succeed and the consumer should receive the message. @@ -298,7 +345,7 @@ public void finished() { kafkaClient.stopAndWait(); zkClient.stopAndWait(); - server.stopAndWait(); + server2.stopAndWait(); } private Thread createPublishThread(final KafkaClient kafkaClient, final String topic, @@ -329,10 +376,12 @@ private static Properties generateKafkaConfig(String zkConnectStr, File logDir) int port = Networks.getRandomPort(); Preconditions.checkState(port > 0, "Failed to get random port."); + String portString = Integer.toString(port); Properties prop = new Properties(); prop.setProperty("log.dir", logDir.getAbsolutePath()); - prop.setProperty("port", Integer.toString(port)); + prop.setProperty("port", portString); prop.setProperty("broker.id", "1"); + prop.setProperty("auto.create.topics.enable", "true"); prop.setProperty("socket.send.buffer.bytes", "1048576"); prop.setProperty("socket.receive.buffer.bytes", "1048576"); prop.setProperty("socket.request.max.bytes", "104857600"); diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java index b27dcd81..bcd60f43 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java @@ -68,7 +68,7 @@ public abstract class ServiceMain { } protected final void doMain(final Service mainService, - Service...prerequisites) throws Exception { + Service... prerequisites) throws Exception { // Only configure the log collection if it is enabled. if (getTwillRuntimeSpecification().isLogCollectionEnabled()) { configureLogger(); @@ -216,8 +216,8 @@ private void configureLogger() throws MalformedURLException, JoranException { kafkaAppender.setName("KAFKA"); kafkaAppender.setTopic(Constants.LOG_TOPIC); kafkaAppender.setHostname(getHostname()); - // The Kafka ZK Connection shouldn't be null as this method only get called if log collection is enabled - kafkaAppender.setZookeeper(getTwillRuntimeSpecification().getKafkaZKConnect()); + // Bootstrap servers string should not be null + kafkaAppender.setKafkaBootstrap(getTwillRuntimeSpecification().getKafkaBootstrap()); String runnableName = getRunnableName(); if (runnableName != null) { kafkaAppender.setRunnableName(runnableName); diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterLiveNodeData.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterLiveNodeData.java index a38a163a..df680f75 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterLiveNodeData.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterLiveNodeData.java @@ -31,16 +31,16 @@ public final class ApplicationMasterLiveNodeData { private final long appIdClusterTime; private final String containerId; private final List localFiles; - private final String kafkaZKConnect; + private final String kafkaBootstrap; public ApplicationMasterLiveNodeData(int appId, long appIdClusterTime, String containerId, List localFiles, - @Nullable String kafkaZKConnect) { + @Nullable String kafkaBootstrap) { this.appId = appId; this.appIdClusterTime = appIdClusterTime; this.containerId = containerId; this.localFiles = localFiles; - this.kafkaZKConnect = kafkaZKConnect; + this.kafkaBootstrap = kafkaBootstrap; } public int getAppId() { @@ -60,12 +60,12 @@ public List getLocalFiles() { } /** - * @return the Kafka ZK connection string for the Kafka used for log collection; + * @return the Kafka bootstrap connection string for the Kafka used for log collection; * if log collection is turned off, a {@code null} value will be returned. */ @Nullable - public String getKafkaZKConnect() { - return kafkaZKConnect; + public String getKafkaBootstrap() { + return kafkaBootstrap; } @Override diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java index 445656d8..b38108cb 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java @@ -96,7 +96,15 @@ private void doMain() throws Exception { ); if (twillRuntimeSpec.isLogCollectionEnabled()) { - prerequisites.add(new ApplicationKafkaService(zkClientService, twillRuntimeSpec.getKafkaZKConnect())); + if (twillRuntimeSpec.isEmbeddedKafkaEnabled()) { + ApplicationKafkaService kafkaService = + new ApplicationKafkaService(zkClientService, twillRuntimeSpec.getZkConnectStr()); + + prerequisites.add(kafkaService); + twillRuntimeSpec.setKafkaBootstrap(twillRuntimeSpec.getKafkaBootstrap() + .concat(";") + .concat(kafkaService.getKafkaBootstrap())); + } } else { LOG.info("Log collection through kafka disabled"); } @@ -163,6 +171,10 @@ private ApplicationKafkaService(ZKClient zkClient, String kafkaZKConnect) { this.kafkaZKPath = kafkaZKConnect.substring(zkClient.getConnectString().length()); } + public String getKafkaBootstrap() { + return kafkaServer.getKafkaBootstrap(); + } + @Override protected void startUp() throws Exception { ZKOperations.ignoreError( diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java index 523ffce1..a8745371 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java @@ -160,7 +160,7 @@ public ApplicationMasterService(RunId runId, ZKClient zkClient, this.amLiveNode = new ApplicationMasterLiveNodeData(Integer.parseInt(System.getenv(EnvKeys.YARN_APP_ID)), Long.parseLong(System.getenv(EnvKeys.YARN_APP_ID_CLUSTER_TIME)), amClient.getContainerId().toString(), getLocalizeFiles(), - twillRuntimeSpec.getKafkaZKConnect()); + twillRuntimeSpec.getKafkaBootstrap()); this.expectedContainers = new ExpectedContainers(twillSpec); this.runningContainers = createRunningContainers(amClient.getContainerId(), amClient.getHost()); diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java index 335d7ec4..2f0908d1 100644 --- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java +++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java @@ -78,7 +78,9 @@ final class YarnTwillController extends AbstractTwillController implements Twill */ YarnTwillController(String appName, RunId runId, ZKClient zkClient, final ApplicationMasterLiveNodeData amLiveNodeData, final YarnAppClient yarnAppClient) { - super(appName, runId, zkClient, amLiveNodeData.getKafkaZKConnect() != null, Collections.emptyList()); + super(appName, runId, zkClient, !amLiveNodeData.getKafkaBootstrap().isEmpty(), + amLiveNodeData.getKafkaBootstrap(), + Collections.emptyList()); this.appName = appName; this.amLiveNodeData = amLiveNodeData; this.startUp = new Callable>() { @@ -93,9 +95,10 @@ public ProcessController call() throws Exception { } YarnTwillController(String appName, RunId runId, ZKClient zkClient, boolean logCollectionEnabled, - Iterable logHandlers, Callable> startUp, + String kafkaBootstrap, Iterable logHandlers, + Callable> startUp, long startTimeout, TimeUnit startTimeoutUnit) { - super(appName, runId, zkClient, logCollectionEnabled, logHandlers); + super(appName, runId, zkClient, logCollectionEnabled, kafkaBootstrap, logHandlers); this.appName = appName; this.startUp = startUp; this.startTimeout = startTimeout; @@ -176,7 +179,7 @@ protected synchronized void doShutDown() { finalStatus = report.getFinalApplicationStatus(); ApplicationId appId = report.getApplicationId(); while (finalStatus == FinalApplicationStatus.UNDEFINED && - stopWatch.elapsedTime(TimeUnit.MILLISECONDS) < maxTime) { + stopWatch.elapsedTime(TimeUnit.MILLISECONDS) < maxTime) { LOG.debug("Yarn application final status for {} {}: {}", appName, appId, finalStatus); TimeUnit.SECONDS.sleep(1); finalStatus = processController.getReport().getFinalApplicationStatus(); diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillControllerFactory.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillControllerFactory.java index 40de6a68..8545ecb7 100644 --- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillControllerFactory.java +++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillControllerFactory.java @@ -30,7 +30,8 @@ */ interface YarnTwillControllerFactory { - YarnTwillController create(RunId runId, boolean logCollectionEnabled, Iterable logHandlers, + YarnTwillController create(RunId runId, boolean logCollectionEnabled, + String kafkaBootstrap, Iterable logHandlers, Callable> startUp, long startTimeout, TimeUnit startTimeoutUnit); } diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java index 52e18ebf..97328b32 100644 --- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java +++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java @@ -188,6 +188,12 @@ public TwillPreparer addLogHandler(LogHandler handler) { return this; } + @Override + public TwillPreparer withKafkaBootstrapServers(String kafkaBootstrapServers) { + config.set(Configs.Keys.LOG_COLLECTION_KAFKA_BOOTSTRAP, kafkaBootstrapServers); + return this; + } + @Override public TwillPreparer setUser(String user) { return this; @@ -428,7 +434,9 @@ public ProcessController call() throws Exception { boolean logCollectionEnabled = config.getBoolean(Configs.Keys.LOG_COLLECTION_ENABLED, Configs.Defaults.LOG_COLLECTION_ENABLED); - YarnTwillController controller = controllerFactory.create(runId, logCollectionEnabled, + String kafkaBootstrap = config.get(Configs.Keys.LOG_COLLECTION_KAFKA_BOOTSTRAP, + Configs.Defaults.LOG_COLLECTION_KAFKA_BOOTSTRAP_EMPTY); + YarnTwillController controller = controllerFactory.create(runId, logCollectionEnabled, kafkaBootstrap, logHandlers, submitTask, timeout, timeoutUnit); controller.start(); return controller; @@ -644,7 +652,7 @@ private Multimap populateRunnableLocalFiles(TwillSpecificatio Multimap localFiles = HashMultimap.create(); LOG.debug("Populating Runnable LocalFiles"); - for (Map.Entry entry: twillSpec.getRunnables().entrySet()) { + for (Map.Entry entry : twillSpec.getRunnables().entrySet()) { String runnableName = entry.getKey(); for (LocalFile localFile : entry.getValue().getLocalFiles()) { Location location; @@ -694,11 +702,17 @@ public RuntimeSpecification transformEntry(String key, RuntimeSpecification valu spec.getPlacementPolicies(), eventHandler); boolean logCollectionEnabled = config.getBoolean(Configs.Keys.LOG_COLLECTION_ENABLED, Configs.Defaults.LOG_COLLECTION_ENABLED); + String kafkaBootstrap = config.get(Configs.Keys.LOG_COLLECTION_KAFKA_BOOTSTRAP, + Configs.Defaults.LOG_COLLECTION_KAFKA_BOOTSTRAP_EMPTY); + // We need embedded kafka server only if log collection enabled and no bootstrap servers provided + boolean embeddedKafkaEnabled = logCollectionEnabled && kafkaBootstrap.isEmpty(); TwillRuntimeSpecificationAdapter.create().toJson( new TwillRuntimeSpecification(newTwillSpec, appLocation.getLocationFactory().getHomeLocation().getName(), appLocation.toURI(), zkConnectString, runId, twillSpec.getName(), getReservedMemory(), config.get(YarnConfiguration.RM_SCHEDULER_ADDRESS), - logLevels, maxRetries, getMinHeapRatio(), logCollectionEnabled), writer); + logLevels, maxRetries, getMinHeapRatio(), logCollectionEnabled, kafkaBootstrap, + embeddedKafkaEnabled) + , writer); } LOG.debug("Done {}", targetFile); } diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java index 20627e27..739f28f3 100644 --- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java +++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java @@ -305,12 +305,14 @@ public TwillPreparer prepare(TwillApplication application) { return new YarnTwillPreparer(config, twillSpec, runId, zkClientService.getConnectString(), appLocation, jvmOptions, locationCache, new YarnTwillControllerFactory() { @Override - public YarnTwillController create(RunId runId, boolean logCollectionEnabled, Iterable logHandlers, + public YarnTwillController create(RunId runId, boolean logCollectionEnabled, String kafkaBootstrap, + Iterable logHandlers, Callable> startUp, long startTimeout, TimeUnit startTimeoutUnit) { ZKClient zkClient = ZKClients.namespace(zkClientService, "/" + appName); YarnTwillController controller = listenController(new YarnTwillController(appName, runId, zkClient, logCollectionEnabled, + kafkaBootstrap, logHandlers, startUp, startTimeout, startTimeoutUnit)); synchronized (YarnTwillRunnerService.this) { diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/CustomClassLoaderRunnable.java b/twill-yarn/src/test/java/org/apache/twill/yarn/CustomClassLoaderRunnable.java index 66bcd420..62a5b3ab 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/CustomClassLoaderRunnable.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/CustomClassLoaderRunnable.java @@ -41,7 +41,8 @@ public final class CustomClassLoaderRunnable extends AbstractTwillRunnable { public void run() { try { Class cls = Class.forName(GENERATED_CLASS_NAME); - java.lang.reflect.Method announce = cls.getMethod("announce", ServiceAnnouncer.class, String.class, int.class); + java.lang.reflect.Method announce = cls.getMethod("announce", ServiceAnnouncer.class, + String.class, int.class); announce.invoke(cls.newInstance(), getContext(), SERVICE_NAME, 54321); Uninterruptibles.awaitUninterruptibly(stopLatch); } catch (Exception e) {