Skip to content

Commit 2463215

Browse files
Merge pull request #545 from dylanmei/mesos_task_labels
Mesos labels can be added to executor tasks
2 parents 60a8f59 + 8092e8c commit 2463215

File tree

4 files changed

+59
-1
lines changed

4 files changed

+59
-1
lines changed

docs/index.md

+3
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,9 @@ Usage: (Options preceded by an asterisk are required) [options]
158158
--executorName
159159
The name given to the executor task.
160160
Default: elasticsearch-executor
161+
--executorLabels
162+
One or more labels given to the executor task. Example: 'environment=prod bananas=apples'
163+
Default: <empty string>
161164
--externalVolumeDriver
162165
Use external volume storage driver. By default, nodes will use volumes on
163166
host.

scheduler/src/main/java/org/apache/mesos/elasticsearch/scheduler/Configuration.java

+18
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import java.util.ArrayList;
2020
import java.util.Collections;
2121
import java.util.List;
22+
import java.util.Map;
23+
import java.util.HashMap;
2224
import java.util.stream.Collectors;
2325

2426
import static java.util.Arrays.asList;
@@ -36,6 +38,7 @@ public class Configuration {
3638
public static final String WEB_UI_PORT = "--webUiPort";
3739
public static final String FRAMEWORK_NAME = "--frameworkName";
3840
public static final String EXECUTOR_NAME = "--executorName";
41+
public static final String EXECUTOR_LABELS = "--executorLabels";
3942
public static final String DATA_DIR = "--dataDir";
4043
public static final String DEFAULT_HOST_DATA_DIR = "/var/lib/mesos/slave/elasticsearch";
4144
// DCOS Certification requirement 01
@@ -85,6 +88,9 @@ public class Configuration {
8588
private String frameworkName = "elasticsearch";
8689
@Parameter(names = {EXECUTOR_NAME}, description = "The name given to the executor task.", validateWith = CLIValidators.NotEmptyString.class)
8790
private String executorName = "elasticsearch-executor";
91+
@Parameter(names = {EXECUTOR_LABELS}, description = "One or more labels given to the executor task." +
92+
"E.g. 'environment=prod bananas=apples'", variableArity = true)
93+
private List<String> executorLabels = new ArrayList<>();
8894
@Parameter(names = {DATA_DIR}, description = "The host data directory used by Docker volumes in the executors. [DOCKER MODE ONLY]")
8995
private String dataDir = DEFAULT_HOST_DATA_DIR;
9096
@Parameter(names = {FRAMEWORK_FAILOVER_TIMEOUT}, description = "The time before Mesos kills a scheduler and tasks if it has not recovered (ms).", validateValueWith = CLIValidators.PositiveDouble.class)
@@ -176,6 +182,18 @@ public String getTaskName() {
176182
return executorName;
177183
}
178184

185+
public Map<String, String> getTaskLabels() {
186+
HashMap<String, String> map = new HashMap<>();
187+
for (String keyValue : executorLabels) {
188+
String[] kvp = keyValue.split("=", 2);
189+
if (kvp.length == 2) {
190+
map.put(kvp[0], kvp[1]);
191+
}
192+
}
193+
194+
return map;
195+
}
196+
179197
public String getDataDir() {
180198
return dataDir;
181199
}

scheduler/src/main/java/org/apache/mesos/elasticsearch/scheduler/TaskInfoFactory.java

+20
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.time.ZoneOffset;
2020
import java.time.ZonedDateTime;
2121
import java.util.List;
22+
import java.util.Map;
2223
import java.util.Optional;
2324
import java.util.Properties;
2425
import java.util.stream.Collectors;
@@ -69,6 +70,7 @@ private Protos.TaskInfo buildNativeTask(Protos.Offer offer, Configuration config
6970
final List<Integer> ports = getPorts(offer, configuration);
7071
final List<Protos.Resource> resources = getResources(configuration, ports);
7172
final Protos.DiscoveryInfo discovery = getDiscovery(ports, configuration);
73+
final Protos.Labels labels = getLabels(configuration);
7274

7375
final String hostAddress = resolveHostAddress(offer, ports);
7476

@@ -83,6 +85,7 @@ private Protos.TaskInfo buildNativeTask(Protos.Offer offer, Configuration config
8385
.setSlaveId(offer.getSlaveId())
8486
.addAllResources(resources)
8587
.setDiscovery(discovery)
88+
.setLabels(labels)
8689
.setCommand(nativeCommand(configuration, args, elasticSearchNodeId))
8790
.build();
8891
}
@@ -91,6 +94,7 @@ private Protos.TaskInfo buildDockerTask(Protos.Offer offer, Configuration config
9194
final List<Integer> ports = getPorts(offer, configuration);
9295
final List<Protos.Resource> resources = getResources(configuration, ports);
9396
final Protos.DiscoveryInfo discovery = getDiscovery(ports, configuration);
97+
final Protos.Labels labels = getLabels(configuration);
9498

9599
final String hostAddress = resolveHostAddress(offer, ports);
96100

@@ -107,6 +111,7 @@ private Protos.TaskInfo buildDockerTask(Protos.Offer offer, Configuration config
107111
.setSlaveId(offer.getSlaveId())
108112
.addAllResources(resources)
109113
.setDiscovery(discovery)
114+
.setLabels(labels)
110115
.setCommand(dockerCommand(configuration, args, elasticSearchNodeId))
111116
.setContainer(containerInfo)
112117
.build();
@@ -151,6 +156,21 @@ private Protos.DiscoveryInfo getDiscovery(List<Integer> ports, Configuration con
151156
return discovery.build();
152157
}
153158

159+
private Protos.Labels getLabels(Configuration configuration) {
160+
Protos.Labels.Builder builder = Protos.Labels.newBuilder();
161+
Map<String, String> labels = configuration.getTaskLabels();
162+
for (Map.Entry<String, String> kvp : labels.entrySet()) {
163+
Protos.Label label = Protos.Label.newBuilder()
164+
.setKey(kvp.getKey())
165+
.setValue(kvp.getValue())
166+
.build();
167+
168+
builder.addLabels(label);
169+
}
170+
171+
return builder.build();
172+
}
173+
154174
private Protos.ContainerInfo getContainer(Configuration configuration, Protos.TaskID taskID, Long elasticSearchNodeId, Protos.SlaveID slaveID) {
155175
final Protos.Environment environment = Protos.Environment.newBuilder().addAllVariables(new ExecutorEnvironmentalVariables(configuration, elasticSearchNodeId).getList()).build();
156176
final Protos.ContainerInfo.DockerInfo.Builder dockerInfo = Protos.ContainerInfo.DockerInfo.newBuilder()

scheduler/src/test/java/org/apache/mesos/elasticsearch/scheduler/ConfigurationTest.java

+18-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import java.net.UnknownHostException;
1212
import java.util.Arrays;
1313
import java.util.List;
14+
import java.util.Map;
1415

1516
import static org.junit.Assert.*;
1617

@@ -78,4 +79,20 @@ public void shouldCreateVolumeName() {
7879
Configuration configuration = new Configuration(ZookeeperCLIParameter.ZOOKEEPER_MESOS_URL, "aa", Configuration.FRAMEWORK_NAME, "test");
7980
assertEquals("test0data", configuration.dataVolumeName(0L));
8081
}
81-
}
82+
83+
@Test
84+
public void shouldCreateTaskLabels() {
85+
Configuration configuration = new Configuration(
86+
ZookeeperCLIParameter.ZOOKEEPER_MESOS_URL, "aa", Configuration.EXECUTOR_LABELS,
87+
"foo=bar",
88+
"incomplete",
89+
"empty=",
90+
"separator=values=are=joined");
91+
Map<String, String> labels = configuration.getTaskLabels();
92+
93+
assertEquals("bar", labels.get("foo"));
94+
assertFalse(labels.containsKey("incomplete"));
95+
assertEquals("", labels.get("empty"));
96+
assertEquals("values=are=joined", labels.get("separator"));
97+
}
98+
}

0 commit comments

Comments
 (0)