Skip to content
This repository was archived by the owner on May 12, 2021. It is now read-only.

(TWILL-147) Allow using external Kafka server for log collection #57

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@
<snappy-java.version>1.0.5</snappy-java.version>
<jcl-over-slf4j.version>1.7.2</jcl-over-slf4j.version>
<asm.version>5.0.2</asm.version>
<kafka.version>0.8.0</kafka.version>
<kafka.version>0.10.2.0</kafka.version>
<zookeeper.version>3.4.5</zookeeper.version>
<junit.version>4.11</junit.version>
<jopt-simple.version>3.2</jopt-simple.version>
Expand Down Expand Up @@ -812,7 +812,26 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<artifactId>kafka_2.11</artifactId>
<version>${kafka.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jdmk</groupId>
<artifactId>jmxtools</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jmx</groupId>
<artifactId>jmxri</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
<exclusions>
<exclusion>
Expand Down Expand Up @@ -1027,6 +1046,13 @@
<version>${commons-compress.version}</version>
<scope>test</scope>
</dependency>
<!-- slf4j-log4j12 binding required by the Kafka broker's log4j logging in tests -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
<scope>test</scope>
</dependency>
</dependencies>
</dependencyManagement>
</project>
10 changes: 10 additions & 0 deletions twill-api/src/main/java/org/apache/twill/api/Configs.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ public static final class Keys {
*/
public static final String LOG_COLLECTION_ENABLED = "twill.log.collection.enabled";

/**
* Setting for the Kafka {@code bootstrap.servers} configuration used for log collection.
*/
public static final String LOG_COLLECTION_KAFKA_BOOTSTRAP = "twill.log.collection.kafka.bootstrap";

/**
* The maximum number of FileContext object cached by the FileContextLocationFactory.
*/
Expand Down Expand Up @@ -132,6 +137,11 @@ public static final class Defaults {
*/
public static final boolean LOG_COLLECTION_ENABLED = true;

/**
* Default value for {@link Keys#LOG_COLLECTION_KAFKA_BOOTSTRAP}: an empty string, meaning no external Kafka bootstrap servers are configured.
*/
public static final String LOG_COLLECTION_KAFKA_BOOTSTRAP_EMPTY = "";

/**
* Default size of the file context cache.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ public interface TwillPreparer {
*/
TwillPreparer addLogHandler(LogHandler handler);

/**
 * Configures the Kafka {@code bootstrap.servers} used by the log aggregation client.
 *
 * @param kafkaBootstrapServers the Kafka {@code bootstrap.servers} value for the log
 *                              aggregation client, typically a comma-separated list of
 *                              {@code host:port} entries
 * @return This {@link TwillPreparer}
 */
TwillPreparer withKafkaBootstrapServers(String kafkaBootstrapServers);

/**
* Sets the user name that runs the application. Default value is get from {@code "user.name"} by calling
* {@link System#getProperty(String)}.
Expand Down
10 changes: 9 additions & 1 deletion twill-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,11 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<artifactId>kafka_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/net.sf.jopt-simple/jopt-simple -->
<dependency>
Expand All @@ -98,5 +102,9 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,13 @@
import org.apache.twill.internal.json.LogEntryDecoder;
import org.apache.twill.internal.json.LogThrowableCodec;
import org.apache.twill.internal.json.StackTraceElementCodec;
import org.apache.twill.internal.kafka.client.ZKKafkaClientService;
import org.apache.twill.internal.kafka.client.BootstrapedKafkaClientService;
import org.apache.twill.internal.state.Message;
import org.apache.twill.internal.state.SystemMessages;
import org.apache.twill.kafka.client.FetchedMessage;
import org.apache.twill.kafka.client.KafkaClientService;
import org.apache.twill.kafka.client.KafkaConsumer;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.twill.zookeeper.ZKClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -79,7 +78,7 @@ public abstract class AbstractTwillController extends AbstractZKServiceControlle
private Cancellable logCancellable;

public AbstractTwillController(String appName, RunId runId, ZKClient zkClient, boolean logCollectionEnabled,
Iterable<LogHandler> logHandlers) {
String kafkaBootstrap, Iterable<LogHandler> logHandlers) {
super(runId, zkClient);
this.appName = appName;
this.runId = runId;
Expand All @@ -88,7 +87,7 @@ public AbstractTwillController(String appName, RunId runId, ZKClient zkClient, b
// When addressing TWILL-147, need to check if the given ZKClient is
// actually used by the Kafka used for log collection
if (logCollectionEnabled) {
this.kafkaClient = new ZKKafkaClientService(ZKClients.namespace(zkClient, "/" + runId.getId() + "/kafka"));
this.kafkaClient = new BootstrapedKafkaClientService(kafkaBootstrap);
Iterables.addAll(this.logHandlers, logHandlers);
} else {
this.kafkaClient = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,15 @@ public class TwillRuntimeSpecification {
private final Map<String, Integer> maxRetries;
private final double minHeapRatio;
private final boolean logCollectionEnabled;
private final boolean embeddedKafkaEnabled;
private String kafkaBootstrap;

public TwillRuntimeSpecification(TwillSpecification twillSpecification, String fsUser, URI twillAppDir,
String zkConnectStr, RunId twillRunId, String twillAppName,
int reservedMemory, @Nullable String rmSchedulerAddr,
Map<String, Map<String, String>> logLevels, Map<String, Integer> maxRetries,
double minHeapRatio, boolean logCollectionEnabled) {
double minHeapRatio, boolean logCollectionEnabled, String kafkaBootstrap,
boolean embeddedKafkaEnabled) {
this.twillSpecification = twillSpecification;
this.fsUser = fsUser;
this.twillAppDir = twillAppDir;
Expand All @@ -61,6 +64,8 @@ public TwillRuntimeSpecification(TwillSpecification twillSpecification, String f
this.maxRetries = maxRetries;
this.minHeapRatio = minHeapRatio;
this.logCollectionEnabled = logCollectionEnabled;
this.kafkaBootstrap = kafkaBootstrap;
this.embeddedKafkaEnabled = embeddedKafkaEnabled;
}

public TwillSpecification getTwillSpecification() {
Expand Down Expand Up @@ -116,15 +121,18 @@ public Map<String, Integer> getMaxRetries() {
}

/**
* Returns the ZK connection string for the Kafka used for log collections,
* Returns the bootstrap connection string for the Kafka used for log collections,
* or {@code null} if log collection is disabled.
*/
@Nullable
public String getKafkaZKConnect() {
if (!isLogCollectionEnabled()) {
return null;
}
// When addressing TWILL-147, a field can be introduced to carry this value.
return String.format("%s/%s/%s/kafka", getZkConnectStr(), getTwillAppName(), getTwillAppRunId());
public String getKafkaBootstrap() {
return kafkaBootstrap;
}

public void setKafkaBootstrap(String kafkaBootstrap) {
this.kafkaBootstrap = kafkaBootstrap;
}

public boolean isEmbeddedKafkaEnabled() {
return embeddedKafkaEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ final class TwillRuntimeSpecificationCodec implements JsonSerializer<TwillRuntim
private static final String LOG_LEVELS = "logLevels";
private static final String MAX_RETRIES = "maxRetries";
private static final String LOG_COLLECTION_ENABLED = "logCollectionEnabled";
private static final String KAFKA_BOOTSTRAP = "kafkaBootstrap";
private static final String IS_EMBEDDED_KAFKA_ENABLED = "embeddedKafkaEnabled";

@Override
public JsonElement serialize(TwillRuntimeSpecification src, Type typeOfSrc, JsonSerializationContext context) {
Expand All @@ -62,6 +64,8 @@ public JsonElement serialize(TwillRuntimeSpecification src, Type typeOfSrc, Json
json.addProperty(TWILL_APP_NAME, src.getTwillAppName());
json.addProperty(RESERVED_MEMORY, src.getReservedMemory());
json.addProperty(HEAP_RESERVED_MIN_RATIO, src.getMinHeapRatio());
json.addProperty(KAFKA_BOOTSTRAP, src.getKafkaBootstrap());
json.addProperty(IS_EMBEDDED_KAFKA_ENABLED, src.isEmbeddedKafkaEnabled());
if (src.getRmSchedulerAddr() != null) {
json.addProperty(RM_SCHEDULER_ADDR, src.getRmSchedulerAddr());
}
Expand All @@ -85,9 +89,9 @@ public TwillRuntimeSpecification deserialize(JsonElement json, Type typeOfT,
jsonObj.get(TWILL_SPEC), new TypeToken<TwillSpecification>() { }.getType());
Map<String, Map<String, String>> logLevels =
context.deserialize(jsonObj.get(LOG_LEVELS), new TypeToken<Map<String, Map<String, String>>>() { }.getType());
Map<String, Integer> maxRetries =
Map<String, Integer> maxRetries =
context.deserialize(jsonObj.get(MAX_RETRIES), new TypeToken<Map<String, Integer>>() { }.getType());

return new TwillRuntimeSpecification(twillSpecification,
jsonObj.get(FS_USER).getAsString(),
URI.create(jsonObj.get(TWILL_APP_DIR).getAsString()),
Expand All @@ -100,6 +104,8 @@ public TwillRuntimeSpecification deserialize(JsonElement json, Type typeOfT,
logLevels,
maxRetries,
jsonObj.get(HEAP_RESERVED_MIN_RATIO).getAsDouble(),
jsonObj.get(LOG_COLLECTION_ENABLED).getAsBoolean());
jsonObj.get(LOG_COLLECTION_ENABLED).getAsBoolean(),
jsonObj.get(KAFKA_BOOTSTRAP).getAsString(),
jsonObj.get(IS_EMBEDDED_KAFKA_ENABLED).getAsBoolean());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.AbstractIdleService;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.Time;
import kafka.server.KafkaServerStartable;
import org.I0Itec.zkclient.exception.ZkTimeoutException;
import org.apache.twill.internal.utils.Networks;
import org.slf4j.Logger;
Expand All @@ -45,22 +44,24 @@ public final class EmbeddedKafkaServer extends AbstractIdleService {
private static final String DEFAULT_START_RETRIES = "5";

private final int startTimeoutRetries;
private final Properties properties;
private KafkaServer server;
private final KafkaConfig kafkaConfig;
private KafkaServerStartable server;

public EmbeddedKafkaServer(Properties properties) {
this.startTimeoutRetries = Integer.parseInt(properties.getProperty(START_RETRIES,
DEFAULT_START_RETRIES));
this.properties = new Properties();
this.properties.putAll(properties);
this.kafkaConfig = createKafkaConfig(properties);
}

/**
 * Returns the address of this embedded Kafka broker in {@code host:port} form,
 * suitable for use as a Kafka {@code bootstrap.servers} value.
 */
public String getKafkaBootstrap() {
return String.format("%s:%d", kafkaConfig.hostName(), kafkaConfig.port());
}

@Override
protected void startUp() throws Exception {
int tries = 0;
do {
KafkaConfig kafkaConfig = createKafkaConfig(properties);
KafkaServer kafkaServer = createKafkaServer(kafkaConfig);
KafkaServerStartable kafkaServer = createKafkaServer(kafkaConfig);
try {
kafkaServer.startup();
server = kafkaServer;
Expand Down Expand Up @@ -96,28 +97,10 @@ protected void shutDown() throws Exception {
}
}

private KafkaServer createKafkaServer(KafkaConfig kafkaConfig) {
return new KafkaServer(kafkaConfig, new Time() {

@Override
public long milliseconds() {
return System.currentTimeMillis();
}

@Override
public long nanoseconds() {
return System.nanoTime();
}

@Override
public void sleep(long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
Thread.interrupted();
}
}
});
/**
 * Creates a startable Kafka broker from the given configuration.
 * The configuration properties are copied into a fresh {@link Properties}
 * instance before being handed to {@link KafkaServerStartable#fromProps}.
 */
private KafkaServerStartable createKafkaServer(KafkaConfig kafkaConfig) {
  Properties serverProps = new Properties();
  serverProps.putAll(kafkaConfig.props());
  return KafkaServerStartable.fromProps(serverProps);
}

/**
Expand All @@ -134,6 +117,15 @@ private KafkaConfig createKafkaConfig(Properties properties) {
Preconditions.checkState(randomPort > 0, "Failed to get random port.");
prop.setProperty("port", Integer.toString(randomPort));
}
String brokerId = prop.getProperty("broker.id");
if (brokerId == null) {
Random random = new Random(System.currentTimeMillis());
int i;
do {
i = random.nextInt(1000);
} while (i < 0);
prop.setProperty("broker.id", Integer.toString(i));
}

return new KafkaConfig(prop);
}
Expand Down
Loading