Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for Helix Auto Registration #2982

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@
import java.util.concurrent.ConcurrentHashMap;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.HelixManagerProperty;
import org.apache.helix.InstanceType;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.model.InstanceConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -33,6 +37,7 @@ public class HelixFactory {
// exposed for use in testing
private final Map<ManagerKey, HelixManager> helixManagers = new ConcurrentHashMap<>();
private final Map<String, DataNodeConfigSource> dataNodeConfigSources = new ConcurrentHashMap<>();
private static final String INSTANCE_CONFIG_HELIX_PORT = "15088";
mudit-saxena marked this conversation as resolved.
Show resolved Hide resolved

/**
* Get a reference to a {@link HelixManager}
Expand Down Expand Up @@ -96,7 +101,14 @@ public DataNodeConfigSource getDataNodeConfigSource(ClusterMapConfig clusterMapC
* @return a new instance of {@link HelixManager}.
*/
HelixManager buildZKHelixManager(String clusterName, String instanceName, InstanceType instanceType, String zkAddr) {
mudit-saxena marked this conversation as resolved.
Show resolved Hide resolved
return HelixManagerFactory.getZKHelixManager(clusterName, instanceName, instanceType, zkAddr);
HelixManagerProperty participantHelixProperty = new HelixManagerProperty.Builder().setDefaultInstanceConfigBuilder(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to create a flag in config to switch b/w old logic and new logic.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also you can add a comment explaining what is autoregistration.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no need for flag here as we are not changing the way we connect to helix. In this we are providing an InstanceConfig as extra configuration while connection

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change will be tested in EI for sometime. We will need prod deployments to go through while wait for validation. And if we encounter some issue, we can just turn off the the flag.

Copy link
Contributor Author

@mudit-saxena mudit-saxena Jan 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no auto registration at this point. Auto registration will have two requisites after this enabling cloud config and taking dependency on helix-cloud in closed.

Now for this specific change, it is change to the factory method on how we instantiate the HelixManager. I have added an additional test as safety check.

I don't think there should be any issue due to this but I can hold off from merging until I test on perf

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, check if this is not getting used in VCR too.

new InstanceConfig.Builder().setInstanceOperation(InstanceConstants.InstanceOperation.UNKNOWN).setPort(INSTANCE_CONFIG_HELIX_PORT)).build();
mudit-saxena marked this conversation as resolved.
Show resolved Hide resolved

HelixManager helixManager = new ZKHelixManager(clusterName, instanceName, instanceType, zkAddr, null,
mudit-saxena marked this conversation as resolved.
Show resolved Hide resolved
instanceType == InstanceType.PARTICIPANT ? participantHelixProperty :
mudit-saxena marked this conversation as resolved.
Show resolved Hide resolved
mudit-saxena marked this conversation as resolved.
Show resolved Hide resolved
new HelixManagerProperty.Builder().setDefaultInstanceConfigBuilder(new InstanceConfig.Builder()).build());
mudit-saxena marked this conversation as resolved.
Show resolved Hide resolved
LOGGER.info("Created HelixManager for cluster {} with instanceName {} and instanceType {}", clusterName, instanceName, instanceType);
return helixManager;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ public ClusterManagementMode getClusterManagementMode(String clusterName) {
return null;
}

@Override
public void setPartitionsToError(String clusterName, String instanceName, String resourceName,
List<String> partitionNames) {
throw new IllegalStateException("Not implemented");
}

@Override
public void enableInstance(String clusterName, String instanceName, boolean enabled,
InstanceConstants.InstanceDisabledType disabledType, String reason) {
Expand Down Expand Up @@ -632,6 +638,24 @@ public void enableInstance(String s, List<String> list, boolean b) {
throw new IllegalStateException("Not implemented");
}

@Override
public void setInstanceOperation(String clusterName, String instanceName,
InstanceConstants.InstanceOperation instanceOperation) {
setInstanceOperation(clusterName, instanceName, instanceOperation, "Not Implemented");
}

@Override
public void setInstanceOperation(String clusterName, String instanceName,
InstanceConstants.InstanceOperation instanceOperation, String reason) {
setInstanceOperation(clusterName, instanceName, instanceOperation, reason, false);
}

@Override
public void setInstanceOperation(String clusterName, String instanceName,
InstanceConstants.InstanceOperation instanceOperation, String reason, boolean overrideAll) {
throw new IllegalStateException("Not implemented");
}

@Override
public void enableResource(String clusterName, String resourceName, boolean enabled) {
throw new IllegalStateException("Not implemented");
Expand Down Expand Up @@ -762,6 +786,11 @@ public void rebalance(String clusterName, String resourceName, int replica) {
throw new IllegalStateException("Not implemented");
}

@Override
public void onDemandRebalance(String s) {
throw new IllegalStateException("Not implemented");
}

@Override
public void addIdealState(String clusterName, String resourceName, String idealStateFile) throws IOException {
throw new IllegalStateException("Not implemented");
Expand Down Expand Up @@ -870,6 +899,26 @@ public Map<String, Boolean> validateInstancesForWagedRebalance(String clusterNam
return null;
}

@Override
public boolean isEvacuateFinished(String clusterName, String instancesNames) {
throw new IllegalStateException("Not implemented");
}

@Override
public boolean canCompleteSwap(String clusterName, String instanceName) {
throw new IllegalStateException("Not implemented");
}

@Override
public boolean completeSwapIfPossible(String clusterName, String instanceName, boolean forceComplete) {
throw new IllegalStateException("Not implemented");
}

@Override
public boolean isReadyForPreparingJoiningCluster(String clusterName, String instancesNames) {
throw new IllegalStateException("Not implemented");
}

/**
* Private class that holds partition state infos from one data node.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -29,6 +30,7 @@
import org.apache.helix.ClusterMessagingService;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerProperties;
Expand Down Expand Up @@ -62,6 +64,7 @@
import org.apache.helix.store.HelixPropertyStore;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.zookeeper.Watcher;

import static org.mockito.Mockito.*;

Expand Down Expand Up @@ -182,6 +185,12 @@ public void disconnect() {
helixPropertyStore.close();
}

@Override
public void addListener(Object o, PropertyKey propertyKey, HelixConstants.ChangeType changeType,
Watcher.Event.EventType[] eventTypes) {
throw new IllegalStateException("Not implemented");
}

@Override
public void addLiveInstanceChangeListener(org.apache.helix.LiveInstanceChangeListener listener) throws Exception {
// deprecated LiveInstanceChangeListener is no longer supported in testing
Expand Down Expand Up @@ -370,6 +379,12 @@ public void addCurrentStateChangeListener(org.apache.helix.CurrentStateChangeLis
throw new IllegalStateException("Not implemented");
}

@Override
public void addTaskCurrentStateChangeListener(CurrentStateChangeListener listener, String instanceName,
String sessionId) throws Exception {
throw new IllegalStateException("Not implemented");
}

@Override
public void addCustomizedStateRootChangeListener(CustomizedStateRootChangeListener listener, String instanceName)
throws Exception {
Expand Down Expand Up @@ -489,6 +504,11 @@ public Long getSessionStartTime() {
throw new IllegalStateException("Not implemented");
}

@Override
public Optional<String> getSessionIdIfLead() {
throw new IllegalStateException("Not implemented");
}

@Override
public boolean isLeader() {
throw new IllegalStateException("Not implemented");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
import com.github.ambry.config.ClusterMapConfig;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import org.apache.helix.ClusterMessagingService;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.ControllerChangeListener;
import org.apache.helix.CurrentStateChangeListener;
import org.apache.helix.ExternalViewChangeListener;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerProperties;
Expand Down Expand Up @@ -54,6 +56,7 @@
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.zookeeper.Watcher;


public class MockHelixManagerFactory extends HelixFactory {
Expand Down Expand Up @@ -188,6 +191,11 @@ public Long getSessionStartTime() {
throw new IllegalStateException("Not implemented");
}

@Override
public Optional<String> getSessionIdIfLead() {
throw new IllegalStateException("Not implemented");
}

@Override
public void connect() throws Exception {
if (beBad) {
Expand All @@ -207,6 +215,12 @@ public void disconnect() {
isConnected = false;
}

@Override
public void addListener(Object o, PropertyKey propertyKey, HelixConstants.ChangeType changeType,
Watcher.Event.EventType[] eventTypes) {
throw new IllegalStateException("Not implemented");
}

String getStateModelDef() {
return stateModelDef;
}
Expand Down Expand Up @@ -310,6 +324,12 @@ public void addCurrentStateChangeListener(CurrentStateChangeListener listener, S
throw new IllegalStateException("Not implemented");
}

@Override
public void addTaskCurrentStateChangeListener(org.apache.helix.api.listeners.CurrentStateChangeListener listener,
String instanceName, String sessionId) throws Exception {
throw new IllegalStateException("Not implemented");
}

@Override
public void addCustomizedStateRootChangeListener(CustomizedStateRootChangeListener listener, String instanceName)
throws Exception {
Expand Down
36 changes: 28 additions & 8 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,11 @@ project(':ambry-commons') {
compile "io.netty:netty-all:$nettyVersion"
compile "io.netty:netty-tcnative-boringssl-static:$nettyTcNativeBoringSSLStatic_linux_x86_64"
compile "io.netty:netty-tcnative-boringssl-static:$nettyTcNativeBoringSSLStatic_osx_aarch_64"
compile "org.apache.helix:helix-core:$helixVersion"
compile "org.apache.helix:helix-core:$helixVersion" exclude group: 'org.apache.helix'
compile "org.apache.helix:helix-common:$helixVersion" exclude group: 'org.apache.helix'
compile "org.apache.helix:zookeeper-api:$helixVersion" exclude group: 'org.apache.helix'
compile "org.apache.helix:metadata-store-directory-common:$helixVersion" exclude group: 'org.apache.helix'
compile "org.apache.helix:metrics-common:$helixVersion" exclude group: 'org.apache.helix'
compile "com.github.ben-manes.caffeine:caffeine:$caffeineVersion"
testCompile project(':ambry-test-utils')
testCompile project(path: ':ambry-clustermap', configuration: 'testArchives')
Expand All @@ -276,7 +280,11 @@ project(':ambry-account') {
project(':ambry-utils'),
project(':ambry-commons'),
project(':ambry-mysql')
compile "org.apache.helix:helix-core:$helixVersion"
compile "org.apache.helix:helix-core:$helixVersion" exclude group: 'org.apache.helix'
compile "org.apache.helix:helix-common:$helixVersion" exclude group: 'org.apache.helix'
compile "org.apache.helix:zookeeper-api:$helixVersion" exclude group: 'org.apache.helix'
compile "org.apache.helix:metadata-store-directory-common:$helixVersion" exclude group: 'org.apache.helix'
compile "org.apache.helix:metrics-common:$helixVersion" exclude group: 'org.apache.helix'
compile "io.dropwizard.metrics:metrics-core:$metricsVersion"
compile "org.json:json:$jsonVersion"
testCompile project(':ambry-test-utils')
Expand All @@ -290,8 +298,12 @@ project(':ambry-clustermap') {
compile project(':ambry-api'),
project(':ambry-commons'),
project(':ambry-utils')
compile "org.apache.helix:helix-core:$helixVersion"
compile "org.apache.helix:helix-lock:$helixVersion"
compile "org.apache.helix:helix-core:$helixVersion" exclude group: 'org.apache.helix'
compile "org.apache.helix:helix-common:$helixVersion" exclude group: 'org.apache.helix'
compile "org.apache.helix:helix-lock:$helixLockVersion" exclude group: 'org.apache.helix'
compile "org.apache.helix:zookeeper-api:$helixVersion" exclude group: 'org.apache.helix'
compile "org.apache.helix:metadata-store-directory-common:$helixVersion" exclude group: 'org.apache.helix'
compile "org.apache.helix:metrics-common:$helixVersion" exclude group: 'org.apache.helix'
compile "com.fasterxml.jackson.core:jackson-core:$jacksonVersion"
compile "com.fasterxml.jackson.core:jackson-annotations:$jacksonVersion"
compile "com.fasterxml.jackson.core:jackson-databind:$jacksonVersion"
Expand All @@ -301,7 +313,6 @@ project(':ambry-clustermap') {
testCompile project(':ambry-test-utils')
testCompile project(':ambry-mysql')
testCompile project(path: ':ambry-api', configuration: 'testArchives')

}
}

Expand Down Expand Up @@ -382,6 +393,7 @@ project(':ambry-replication') {
project(':ambry-network'),
project(':ambry-clustermap')
compile "io.dropwizard.metrics:metrics-core:$metricsVersion"
compile "org.apache.commons:commons-lang3:$commonsLangVersion"
testCompile project(':ambry-store')
testCompile project(':ambry-clustermap')
testCompile project(':ambry-test-utils')
Expand Down Expand Up @@ -508,7 +520,7 @@ project(':ambry-cloud') {
compile "io.dropwizard.metrics:metrics-core:$metricsVersion"
compile "io.dropwizard.metrics:metrics-jmx:$metricsVersion"
compile "com.microsoft.azure:msal4j:$azureMsal4jVersion"
compile "org.apache.helix:helix-lock:$helixVersion"
compile "org.apache.helix:helix-lock:$helixLockVersion" exclude group: "org.apache.helix"
testCompile project(':ambry-router')
testCompile project(':ambry-store')
testCompile project(':ambry-test-utils')
Expand Down Expand Up @@ -549,7 +561,11 @@ project(':ambry-quota') {
project(':ambry-commons'),
project(':ambry-mysql'),
project(':ambry-clustermap')
compile "org.apache.helix:helix-core:$helixVersion"
compile "org.apache.helix:helix-core:$helixVersion" exclude group: 'org.apache.helix'
compile "org.apache.helix:helix-common:$helixVersion" exclude group: 'org.apache.helix'
compile "org.apache.helix:zookeeper-api:$helixVersion" exclude group: 'org.apache.helix'
compile "org.apache.helix:metadata-store-directory-common:$helixVersion" exclude group: 'org.apache.helix'
compile "org.apache.helix:metrics-common:$helixVersion" exclude group: 'org.apache.helix'
testCompile project(':ambry-test-utils')
testCompile project(':ambry-account')
testCompile project(path: ':ambry-clustermap', configuration: 'testArchives')
Expand All @@ -562,7 +578,11 @@ project(':ambry-mysql') {
compile project(':ambry-utils')
compile project(':ambry-commons')
compile "mysql:mysql-connector-java:$mysqlConnectorVersion"
compile "org.apache.helix:helix-core:$helixVersion"
compile "org.apache.helix:helix-core:$helixVersion" exclude group: 'org.apache.helix'
compile "org.apache.helix:helix-common:$helixVersion" exclude group: 'org.apache.helix'
compile "org.apache.helix:zookeeper-api:$helixVersion" exclude group: 'org.apache.helix'
compile "org.apache.helix:metadata-store-directory-common:$helixVersion" exclude group: 'org.apache.helix'
compile "org.apache.helix:metrics-common:$helixVersion" exclude group: 'org.apache.helix'
compile "com.zaxxer:HikariCP:$hikariVersion"
testCompile project(':ambry-test-utils')
testCompile project(path: ':ambry-clustermap', configuration: 'testArchives')
Expand Down
4 changes: 3 additions & 1 deletion gradle/dependency-versions.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ ext {
nettyVersion = "4.1.79.Final"
nettyTcNativeBoringSSLStatic_linux_x86_64 = "2.0.61.Final:linux-x86_64"
nettyTcNativeBoringSSLStatic_osx_aarch_64 = "2.0.61.Final:osx-aarch_64"
helixVersion = "1.1.0"
helixLockVersion = "1.1.0"
helixVersion = "1.4.1:jdk8"
jacksonVersion = "2.13.5"
jaydioVersion = "0.1"
azureMsal4jVersion="1.7.1"
Expand All @@ -36,4 +37,5 @@ ext {
powermockVersion = "2.+"
caffeineVersion = "2.9.3"
hadoopCommonVersion = "3.3.6"
commonsLangVersion = "3.12.0"
}
Loading