Skip to content

Commit

Permalink
Add a cli tool to perform some account metadata related operations (#…
Browse files Browse the repository at this point in the history
…1268)

Add tool that interacts with RouterStore
  • Loading branch information
justinlin-linkedin authored and cgtz committed Oct 1, 2019
1 parent 523f2cb commit 87638c7
Show file tree
Hide file tree
Showing 6 changed files with 725 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ abstract class AccountMetadataStore {
protected final AccountServiceMetrics accountServiceMetrics;
protected final BackupFileManager backupFileManager;
protected final String znRecordPath;
private final HelixPropertyStore<ZNRecord> helixStore;
protected final HelixPropertyStore<ZNRecord> helixStore;

/** Create a new {@link AccountMetadataStore} instance for the subclasses.
* @param accountServiceMetrics The {@link AccountServiceMetrics}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,14 @@ public void setupRouter(final Router router) throws IllegalStateException {
}
}

/**
* Return the {@link Notifier}.
* @return The {@link Notifier}
*/
Notifier<String> getNotifier() {
return notifier;
}

/**
* A synchronized function to fetch account metadata from {@link AccountMetadataStore} and update the in memory cache.
* @param isCalledFromListener True is this function is invoked in the {@link TopicListener}.
Expand Down Expand Up @@ -273,7 +281,17 @@ public boolean removeAccountUpdateConsumer(Consumer<Collection<Account>> account
*/
@Override
public boolean updateAccounts(Collection<Account> accounts) {
checkOpen();
return updateAccountsWithAccountMetadataStore(accounts, accountMetadataStore);
}

/**
* Helper function to update {@link Account} metadata.
* @param accounts The {@link Account} metadata to update.
* @param accountMetadataStore The {@link AccountMetadataStore}.
* @return True when the update operation succeeds.
*/
boolean updateAccountsWithAccountMetadataStore(Collection<Account> accounts, AccountMetadataStore accountMetadataStore) {
checkOpen();
Objects.requireNonNull(accounts, "accounts cannot be null");
if (accounts.isEmpty()) {
logger.debug("Empty account collection to update.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.helix.AccessOption;
import org.apache.helix.ZNRecord;
import org.apache.helix.store.HelixPropertyStore;
import org.apache.zookeeper.data.Stat;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
Expand Down Expand Up @@ -142,6 +144,69 @@ Map<String, String> readAccountMetadataFromBlobID(String blobID) {
return null;
}

/**
* Get the list of {@link BlobIDAndVersion} from {@link ZNRecord}. This function returns null when there
* is any error.
* @return list of {@link BlobIDAndVersion}.
*/
private List<BlobIDAndVersion> fetchAllBlobIDAndVersions() {
if (router.get() == null) {
logger.error("Router is not yet initialized");
return null;
}
long startTimeMs = System.currentTimeMillis();
logger.trace("Start reading ZNRecord from path={}", znRecordPath);
Stat stat = new Stat();
ZNRecord znRecord = helixStore.get(znRecordPath, stat, AccessOption.PERSISTENT);
logger.trace("Fetched ZNRecord from path={}, took time={} ms", znRecordPath,
System.currentTimeMillis() - startTimeMs);
if (znRecord == null) {
logger.info("The ZNRecord to read does not exist on path={}", znRecordPath);
return null;
}

List<String> blobIDAndVersionsJson = znRecord.getListField(ACCOUNT_METADATA_BLOB_IDS_LIST_KEY);
if (blobIDAndVersionsJson == null || blobIDAndVersionsJson.size() == 0) {
logger.info("ZNRecord={} to read on path={} does not have a simple list with key={}", znRecord,
ACCOUNT_METADATA_BLOB_IDS_PATH, ACCOUNT_METADATA_BLOB_IDS_LIST_KEY);
return null;
} else {
return blobIDAndVersionsJson.stream().map(BlobIDAndVersion::fromJson).collect(Collectors.toList());
}
}

/**
* Returns all the versions of {@link Account} metadata as a list. It will return null when the versions are empty.
* @return All the versions of {@link Account} metadata.
*/
public List<Integer> getAllVersions() {
List<BlobIDAndVersion> blobIDAndVersions = fetchAllBlobIDAndVersions() ;
if (blobIDAndVersions == null) {
return null;
} else {
return blobIDAndVersions.stream().map(BlobIDAndVersion::getVersion).collect(Collectors.toList());
}
}

/**
* Return a map from account id to {@link Account} metadata at given version. It returns null when there is any error.
* @param version The version of {@link Account} metadata to return.
* @return A map from account id to {@link Account} metadata in json format.
* @throws IllegalArgumentException When the version is not valid.
*/
public Map<String, String> fetchAccountMetadataAtVersion(int version) throws IllegalArgumentException {
List<BlobIDAndVersion> blobIDAndVersions = fetchAllBlobIDAndVersions();
if (blobIDAndVersions == null) {
return null;
}
for (BlobIDAndVersion blobIDAndVersion: blobIDAndVersions) {
if (blobIDAndVersion.getVersion() == version) {
return readAccountMetadataFromBlobID(blobIDAndVersion.getBlobID());
}
}
throw new IllegalArgumentException("Version " + version + " doesn't exist");
}

@Override
AccountMetadataStore.ZKUpdater createNewZKUpdater(Collection<Account> accounts) {
Objects.requireNonNull(router.get(), "Router is null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ public void testUpdateAndFetch() throws Exception {
// generate an new account and test update and fetch on this account
AccountTestUtils.generateRefAccounts(idToRefAccountMap, idtoRefContainerMap, accountIDSet, 1, 1);
assertUpdateAndFetch(store, idToRefAccountMap, idToRefAccountMap, 1, 1);
Map<Short, Account> accountMapFirstVersion = new HashMap<>(idToRefAccountMap);

// generate another new account and test update and fetch on this account
Map<Short, Account> anotherIdToRefAccountMap = new HashMap<>();
Expand All @@ -133,6 +134,18 @@ public void testUpdateAndFetch() throws Exception {
}
// the version should be 2 now
assertUpdateAndFetch(store, idToRefAccountMap, anotherIdToRefAccountMap, 2, 2);

// Make sure we can get all the versions out
List<Integer> versions = store.getAllVersions();
assertNotNull(versions);
Collections.sort(versions);
assertEquals(versions.size(), 2);
assertEquals((int) versions.get(0), 1);
assertEquals((int) versions.get(1), 2);

// Make sure we can still get the first version out
Map<String, String> accountMap = store.fetchAccountMetadataAtVersion(1);
assertAccountsEqual(accountMap, accountMapFirstVersion);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,38 +18,40 @@
*/
public class HelixPropertyStoreConfig {
public static final String HELIX_PROPERTY_STORE_PREFIX = "helix.property.store.";
public static final String HELIX_ZK_CLIENT_CONNECTION_TIMEOUT_MS =
HELIX_PROPERTY_STORE_PREFIX + "zk.client.connection.timeout.ms";
public static final String HELIX_ZK_CLIENT_SESSION_TIMEOUT_MS =
HELIX_PROPERTY_STORE_PREFIX + "zk.client.session.timeout.ms";
public static final String HELIX_ROOT_PATH = HELIX_PROPERTY_STORE_PREFIX + "root.path";

/**
* Time in ms to time out a connection to a ZooKeeper server.
*/
@Config(HELIX_PROPERTY_STORE_PREFIX + "zk.client.connection.timeout.ms")
@Config(HELIX_ZK_CLIENT_CONNECTION_TIMEOUT_MS)
@Default("20 * 1000")
public final int zkClientConnectionTimeoutMs;

/**
* Time in ms defines disconnection tolerance by a session. I.e., if reconnected within this time, it will
* be considered as the same session.
*/
@Config(HELIX_PROPERTY_STORE_PREFIX + "zk.client.session.timeout.ms")
@Config(HELIX_ZK_CLIENT_SESSION_TIMEOUT_MS)
@Default("20 * 1000")
public final int zkClientSessionTimeoutMs;

/**
* The root path of helix property store in the ZooKeeper. Must start with {@code /}, and must not end with {@code /}.
* It is recommended to make root path in the form of {@code /ambry/<clustername>/helixPropertyStore}
*/
@Config(HELIX_PROPERTY_STORE_PREFIX + "root.path")
@Config(HELIX_ROOT_PATH)
@Default("/ambry/defaultCluster/helixPropertyStore")
public final String rootPath;

public HelixPropertyStoreConfig(VerifiableProperties verifiableProperties) {
zkClientConnectionTimeoutMs =
verifiableProperties.getIntInRange(HELIX_PROPERTY_STORE_PREFIX + "zk.client.connection.timeout.ms", 20 * 1000,
1, Integer.MAX_VALUE);
verifiableProperties.getIntInRange(HELIX_ZK_CLIENT_CONNECTION_TIMEOUT_MS, 20 * 1000, 1, Integer.MAX_VALUE);
zkClientSessionTimeoutMs =
verifiableProperties.getIntInRange(HELIX_PROPERTY_STORE_PREFIX + "zk.client.session.timeout.ms", 20 * 1000, 1,
Integer.MAX_VALUE);
rootPath = verifiableProperties.getString(HELIX_PROPERTY_STORE_PREFIX + "root.path",
"/ambry/defaultCluster/helixPropertyStore");
verifiableProperties.getIntInRange(HELIX_ZK_CLIENT_SESSION_TIMEOUT_MS, 20 * 1000, 1, Integer.MAX_VALUE);
rootPath = verifiableProperties.getString(HELIX_ROOT_PATH, "/ambry/defaultCluster/helixPropertyStore");
}
}
Loading

0 comments on commit 87638c7

Please sign in to comment.