Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for Helix Auto Registration #2982

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ public class ClusterMapUtils {
static final int CURRENT_SCHEMA_VERSION = 0;
static final String WRITABLE_LOG_STR = "writable";
static final String FULLY_WRITABLE_LOG_STR = "fully writable";
private static final String INSTANCE_NAME_DELIMITER = "_";
private static final Logger logger = LoggerFactory.getLogger(ClusterMapUtils.class);

/**
Expand Down Expand Up @@ -160,7 +161,7 @@ public ReplicaType getReplicaType() {
* @return the constructed instance name.
*/
public static String getInstanceName(String host, Integer port) {
return port == null ? host : host + "_" + port;
return port == null ? host : host + INSTANCE_NAME_DELIMITER + port;
}

/**
Expand All @@ -172,6 +173,10 @@ public static String getInstanceName(DataNodeId dataNodeId) {
return getInstanceName(dataNodeId.getHostname(), dataNodeId.getPort());
}

public static String getPortFromInstanceName(String instanceName) {
mudit-saxena marked this conversation as resolved.
Show resolved Hide resolved
return instanceName.contains(INSTANCE_NAME_DELIMITER) ? instanceName.split(INSTANCE_NAME_DELIMITER)[1] : null;
}

/**
* Parses DC information JSON string and returns a map of datacenter name to {@link DcZkInfo}.
* @param dcInfoJsonString the string containing the DC info.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,16 @@
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.HelixManagerProperty;
import org.apache.helix.InstanceType;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.model.InstanceConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.github.ambry.clustermap.ClusterMapUtils.*;


/**
* A factory class to construct and get a reference to a {@link HelixManager}
Expand Down Expand Up @@ -96,7 +101,21 @@ public DataNodeConfigSource getDataNodeConfigSource(ClusterMapConfig clusterMapC
* @return a new instance of {@link HelixManager}.
*/
HelixManager buildZKHelixManager(String clusterName, String instanceName, InstanceType instanceType, String zkAddr) {
mudit-saxena marked this conversation as resolved.
Show resolved Hide resolved
return HelixManagerFactory.getZKHelixManager(clusterName, instanceName, instanceType, zkAddr);

String port = getPortFromInstanceName(instanceName);

InstanceConfig.Builder instanceConfigBuilder = new InstanceConfig.Builder().setInstanceOperation(InstanceConstants.InstanceOperation.UNKNOWN);
if (port != null && !port.isEmpty()) {
instanceConfigBuilder.setPort(port);
}

HelixManagerProperty participantHelixProperty = new HelixManagerProperty.Builder().setDefaultInstanceConfigBuilder(instanceConfigBuilder).build();
HelixManagerProperty defaultHelixManagerProperty = new HelixManagerProperty.Builder().setDefaultInstanceConfigBuilder(new InstanceConfig.Builder()).build();

HelixManager helixManager = new ZKHelixManager(clusterName, instanceName, instanceType, zkAddr, null,
mudit-saxena marked this conversation as resolved.
Show resolved Hide resolved
instanceType == InstanceType.PARTICIPANT ? participantHelixProperty : defaultHelixManagerProperty);
LOGGER.info("Created HelixManager for cluster {} with instanceName {} and instanceType {}", clusterName, instanceName, instanceType);
return helixManager;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.lock.LockScope;
import org.apache.helix.lock.helix.HelixLockScope;
import org.apache.helix.lock.helix.ZKDistributedNonblockingLock;
Expand Down Expand Up @@ -822,6 +823,29 @@ public void testStateModelParticipation() throws Exception {
helixParticipant.close();
}


@Test
public void testEnableStatePostParticipate() throws IOException {
assumeTrue(stateModelDef.equals(ClusterMapConfig.AMBRY_STATE_MODEL_DEF));
ClusterMapConfig clusterMapConfig = new ClusterMapConfig(new VerifiableProperties(props));
MetricRegistry metricRegistry = new MetricRegistry();
HelixParticipant helixParticipant =
new HelixParticipant(mock(HelixClusterManager.class), clusterMapConfig, new HelixFactory(), metricRegistry,
getDefaultZkConnectStr(clusterMapConfig), true);
HelixManager manager = helixParticipant.getHelixManager();

assertEquals(manager.getInstanceType(), InstanceType.PARTICIPANT);
assertNotNull("HelixManager cannot be null", manager);

helixParticipant.participate(Collections.emptyList(), null, null);

// Without cloud config, instances should still be in ENABLE state
assertEquals(InstanceConstants.InstanceOperation.ENABLE,
manager.getConfigAccessor().getInstanceConfig(clusterName, manager.getInstanceName()).getInstanceOperation().getOperation());

helixParticipant.close();
}

/**
* Test participate method with property store helix task enabled
* @throws Exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ public ClusterManagementMode getClusterManagementMode(String clusterName) {
return null;
}

@Override
public void setPartitionsToError(String clusterName, String instanceName, String resourceName,
List<String> partitionNames) {
throw new IllegalStateException("Not implemented");
}

@Override
public void enableInstance(String clusterName, String instanceName, boolean enabled,
InstanceConstants.InstanceDisabledType disabledType, String reason) {
Expand Down Expand Up @@ -632,6 +638,24 @@ public void enableInstance(String s, List<String> list, boolean b) {
throw new IllegalStateException("Not implemented");
}

@Override
public void setInstanceOperation(String clusterName, String instanceName,
InstanceConstants.InstanceOperation instanceOperation) {
setInstanceOperation(clusterName, instanceName, instanceOperation, "Not Implemented");
}

@Override
public void setInstanceOperation(String clusterName, String instanceName,
InstanceConstants.InstanceOperation instanceOperation, String reason) {
setInstanceOperation(clusterName, instanceName, instanceOperation, reason, false);
}

@Override
public void setInstanceOperation(String clusterName, String instanceName,
InstanceConstants.InstanceOperation instanceOperation, String reason, boolean overrideAll) {
throw new IllegalStateException("Not implemented");
}

@Override
public void enableResource(String clusterName, String resourceName, boolean enabled) {
throw new IllegalStateException("Not implemented");
Expand Down Expand Up @@ -762,6 +786,11 @@ public void rebalance(String clusterName, String resourceName, int replica) {
throw new IllegalStateException("Not implemented");
}

@Override
public void onDemandRebalance(String s) {
throw new IllegalStateException("Not implemented");
}

@Override
public void addIdealState(String clusterName, String resourceName, String idealStateFile) throws IOException {
throw new IllegalStateException("Not implemented");
Expand Down Expand Up @@ -870,6 +899,26 @@ public Map<String, Boolean> validateInstancesForWagedRebalance(String clusterNam
return null;
}

@Override
public boolean isEvacuateFinished(String clusterName, String instancesNames) {
throw new IllegalStateException("Not implemented");
}

@Override
public boolean canCompleteSwap(String clusterName, String instanceName) {
throw new IllegalStateException("Not implemented");
}

@Override
public boolean completeSwapIfPossible(String clusterName, String instanceName, boolean forceComplete) {
throw new IllegalStateException("Not implemented");
}

@Override
public boolean isReadyForPreparingJoiningCluster(String clusterName, String instancesNames) {
throw new IllegalStateException("Not implemented");
}

/**
* Private class that holds partition state infos from one data node.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -29,6 +30,7 @@
import org.apache.helix.ClusterMessagingService;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerProperties;
Expand Down Expand Up @@ -62,6 +64,7 @@
import org.apache.helix.store.HelixPropertyStore;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.zookeeper.Watcher;

import static org.mockito.Mockito.*;

Expand Down Expand Up @@ -182,6 +185,12 @@ public void disconnect() {
helixPropertyStore.close();
}

@Override
public void addListener(Object o, PropertyKey propertyKey, HelixConstants.ChangeType changeType,
Watcher.Event.EventType[] eventTypes) {
throw new IllegalStateException("Not implemented");
}

@Override
public void addLiveInstanceChangeListener(org.apache.helix.LiveInstanceChangeListener listener) throws Exception {
// deprecated LiveInstanceChangeListener is no longer supported in testing
Expand Down Expand Up @@ -370,6 +379,12 @@ public void addCurrentStateChangeListener(org.apache.helix.CurrentStateChangeLis
throw new IllegalStateException("Not implemented");
}

@Override
public void addTaskCurrentStateChangeListener(CurrentStateChangeListener listener, String instanceName,
String sessionId) throws Exception {
throw new IllegalStateException("Not implemented");
}

@Override
public void addCustomizedStateRootChangeListener(CustomizedStateRootChangeListener listener, String instanceName)
throws Exception {
Expand Down Expand Up @@ -489,6 +504,11 @@ public Long getSessionStartTime() {
throw new IllegalStateException("Not implemented");
}

@Override
public Optional<String> getSessionIdIfLead() {
throw new IllegalStateException("Not implemented");
}

@Override
public boolean isLeader() {
throw new IllegalStateException("Not implemented");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
import com.github.ambry.config.ClusterMapConfig;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import org.apache.helix.ClusterMessagingService;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.ControllerChangeListener;
import org.apache.helix.CurrentStateChangeListener;
import org.apache.helix.ExternalViewChangeListener;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerProperties;
Expand Down Expand Up @@ -54,6 +56,7 @@
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.zookeeper.Watcher;


public class MockHelixManagerFactory extends HelixFactory {
Expand Down Expand Up @@ -188,6 +191,11 @@ public Long getSessionStartTime() {
throw new IllegalStateException("Not implemented");
}

@Override
public Optional<String> getSessionIdIfLead() {
throw new IllegalStateException("Not implemented");
}

@Override
public void connect() throws Exception {
if (beBad) {
Expand All @@ -207,6 +215,12 @@ public void disconnect() {
isConnected = false;
}

@Override
public void addListener(Object o, PropertyKey propertyKey, HelixConstants.ChangeType changeType,
Watcher.Event.EventType[] eventTypes) {
throw new IllegalStateException("Not implemented");
}

String getStateModelDef() {
return stateModelDef;
}
Expand Down Expand Up @@ -310,6 +324,12 @@ public void addCurrentStateChangeListener(CurrentStateChangeListener listener, S
throw new IllegalStateException("Not implemented");
}

@Override
public void addTaskCurrentStateChangeListener(org.apache.helix.api.listeners.CurrentStateChangeListener listener,
String instanceName, String sessionId) throws Exception {
throw new IllegalStateException("Not implemented");
}

@Override
public void addCustomizedStateRootChangeListener(CustomizedStateRootChangeListener listener, String instanceName)
throws Exception {
Expand Down
36 changes: 28 additions & 8 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,11 @@ project(':ambry-commons') {
compile "io.netty:netty-all:$nettyVersion"
compile "io.netty:netty-tcnative-boringssl-static:$nettyTcNativeBoringSSLStatic_linux_x86_64"
compile "io.netty:netty-tcnative-boringssl-static:$nettyTcNativeBoringSSLStatic_osx_aarch_64"
compile "org.apache.helix:helix-core:$helixVersion"
compile "org.apache.helix:helix-core:$helixVersion" exclude group: 'org.apache.helix'
compile "org.apache.helix:helix-common:$helixVersion" exclude group: 'org.apache.helix'
compile "org.apache.helix:zookeeper-api:$helixVersion" exclude group: 'org.apache.helix'
compile "org.apache.helix:metadata-store-directory-common:$helixVersion" exclude group: 'org.apache.helix'
compile "org.apache.helix:metrics-common:$helixVersion" exclude group: 'org.apache.helix'
compile "com.github.ben-manes.caffeine:caffeine:$caffeineVersion"
testCompile project(':ambry-test-utils')
testCompile project(path: ':ambry-clustermap', configuration: 'testArchives')
Expand All @@ -276,7 +280,11 @@ project(':ambry-account') {
project(':ambry-utils'),
project(':ambry-commons'),
project(':ambry-mysql')
compile "org.apache.helix:helix-core:$helixVersion"
compile "org.apache.helix:helix-core:$helixVersion" exclude group: 'org.apache.helix'
compile "org.apache.helix:helix-common:$helixVersion" exclude group: 'org.apache.helix'
compile "org.apache.helix:zookeeper-api:$helixVersion" exclude group: 'org.apache.helix'
compile "org.apache.helix:metadata-store-directory-common:$helixVersion" exclude group: 'org.apache.helix'
compile "org.apache.helix:metrics-common:$helixVersion" exclude group: 'org.apache.helix'
compile "io.dropwizard.metrics:metrics-core:$metricsVersion"
compile "org.json:json:$jsonVersion"
testCompile project(':ambry-test-utils')
Expand All @@ -290,8 +298,12 @@ project(':ambry-clustermap') {
compile project(':ambry-api'),
project(':ambry-commons'),
project(':ambry-utils')
compile "org.apache.helix:helix-core:$helixVersion"
compile "org.apache.helix:helix-lock:$helixVersion"
compile "org.apache.helix:helix-core:$helixVersion" exclude group: 'org.apache.helix'
compile "org.apache.helix:helix-common:$helixVersion" exclude group: 'org.apache.helix'
compile "org.apache.helix:helix-lock:$helixLockVersion" exclude group: 'org.apache.helix'
compile "org.apache.helix:zookeeper-api:$helixVersion" exclude group: 'org.apache.helix'
compile "org.apache.helix:metadata-store-directory-common:$helixVersion" exclude group: 'org.apache.helix'
compile "org.apache.helix:metrics-common:$helixVersion" exclude group: 'org.apache.helix'
compile "com.fasterxml.jackson.core:jackson-core:$jacksonVersion"
compile "com.fasterxml.jackson.core:jackson-annotations:$jacksonVersion"
compile "com.fasterxml.jackson.core:jackson-databind:$jacksonVersion"
Expand All @@ -301,7 +313,6 @@ project(':ambry-clustermap') {
testCompile project(':ambry-test-utils')
testCompile project(':ambry-mysql')
testCompile project(path: ':ambry-api', configuration: 'testArchives')

}
}

Expand Down Expand Up @@ -382,6 +393,7 @@ project(':ambry-replication') {
project(':ambry-network'),
project(':ambry-clustermap')
compile "io.dropwizard.metrics:metrics-core:$metricsVersion"
compile "org.apache.commons:commons-lang3:$commonsLangVersion"
testCompile project(':ambry-store')
testCompile project(':ambry-clustermap')
testCompile project(':ambry-test-utils')
Expand Down Expand Up @@ -508,7 +520,7 @@ project(':ambry-cloud') {
compile "io.dropwizard.metrics:metrics-core:$metricsVersion"
compile "io.dropwizard.metrics:metrics-jmx:$metricsVersion"
compile "com.microsoft.azure:msal4j:$azureMsal4jVersion"
compile "org.apache.helix:helix-lock:$helixVersion"
compile "org.apache.helix:helix-lock:$helixLockVersion" exclude group: "org.apache.helix"
testCompile project(':ambry-router')
testCompile project(':ambry-store')
testCompile project(':ambry-test-utils')
Expand Down Expand Up @@ -549,7 +561,11 @@ project(':ambry-quota') {
project(':ambry-commons'),
project(':ambry-mysql'),
project(':ambry-clustermap')
compile "org.apache.helix:helix-core:$helixVersion"
compile "org.apache.helix:helix-core:$helixVersion" exclude group: 'org.apache.helix'
compile "org.apache.helix:helix-common:$helixVersion" exclude group: 'org.apache.helix'
compile "org.apache.helix:zookeeper-api:$helixVersion" exclude group: 'org.apache.helix'
compile "org.apache.helix:metadata-store-directory-common:$helixVersion" exclude group: 'org.apache.helix'
compile "org.apache.helix:metrics-common:$helixVersion" exclude group: 'org.apache.helix'
testCompile project(':ambry-test-utils')
testCompile project(':ambry-account')
testCompile project(path: ':ambry-clustermap', configuration: 'testArchives')
Expand All @@ -562,7 +578,11 @@ project(':ambry-mysql') {
compile project(':ambry-utils')
compile project(':ambry-commons')
compile "mysql:mysql-connector-java:$mysqlConnectorVersion"
compile "org.apache.helix:helix-core:$helixVersion"
compile "org.apache.helix:helix-core:$helixVersion" exclude group: 'org.apache.helix'
compile "org.apache.helix:helix-common:$helixVersion" exclude group: 'org.apache.helix'
compile "org.apache.helix:zookeeper-api:$helixVersion" exclude group: 'org.apache.helix'
compile "org.apache.helix:metadata-store-directory-common:$helixVersion" exclude group: 'org.apache.helix'
compile "org.apache.helix:metrics-common:$helixVersion" exclude group: 'org.apache.helix'
compile "com.zaxxer:HikariCP:$hikariVersion"
testCompile project(':ambry-test-utils')
testCompile project(path: ':ambry-clustermap', configuration: 'testArchives')
Expand Down
Loading
Loading