Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Weighted Id Generator #34

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"name" : "io.appform.ranger.discovery.bundle.id.DistributedIdGeneratorPerfTest.testGenerate",
"mode" : "Throughput",
"iterations" : 4,
"threads" : 1,
"forks" : 3,
"mean_ops" : 406378.3277020471
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"name" : "io.appform.ranger.discovery.bundle.id.DistributedIdGeneratorTest.testGenerateWithBenchmark",
"iterations" : 100000,
"threads" : 5,
"totalMillis" : 3823,
"avgTime" : 764.6
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"name" : "io.appform.ranger.discovery.bundle.id.DistributedIdGeneratorTest.testGenerateWithConstraints",
"iterations" : 100000,
"threads" : 5,
"totalMillis" : 5386,
"avgTime" : 1077.2
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@
"iterations" : 4,
"threads" : 1,
"forks" : 3,
"mean_ops" : 644166.1778513143
"mean_ops" : 800941.387237486
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@
"iterations" : 4,
"threads" : 1,
"forks" : 3,
"mean_ops" : 502644.4941310657
"mean_ops" : 643473.2832640476
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"name" : "io.appform.ranger.discovery.bundle.id.WeightedIdGeneratorPerfTest.testGenerate",
"mode" : "Throughput",
"iterations" : 4,
"threads" : 1,
"forks" : 3,
"mean_ops" : 410789.0590215035
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"name" : "io.appform.ranger.discovery.bundle.id.WeightedIdGeneratorTest.testGenerateWithBenchmark",
"iterations" : 100000,
"threads" : 5,
"totalMillis" : 3929,
"avgTime" : 785.8
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"name" : "io.appform.ranger.discovery.bundle.id.WeightedIdGeneratorTest.testGenerateWithConstraints",
"iterations" : 100000,
"threads" : 5,
"totalMillis" : 2867,
"avgTime" : 573.4
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@
public class Constants {
public static final int MAX_ID_PER_MS = 1000;
public static final int MAX_NUM_NODES = 10000;
public static final int ID_DELETION_DELAY_IN_SECONDS = 60;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,288 @@
package io.appform.ranger.discovery.bundle.id;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import dev.failsafe.Failsafe;
import dev.failsafe.FailsafeExecutor;
import dev.failsafe.RetryPolicy;
import io.appform.ranger.discovery.bundle.id.constraints.KeyValidationConstraint;
import io.appform.ranger.discovery.bundle.id.formatter.IdFormatter;
import io.appform.ranger.discovery.bundle.id.formatter.IdFormatters;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.regex.Pattern;

/**
* Distributed Id Generation
*/
@SuppressWarnings("unused")
@Slf4j
public class DistributedIdGenerator {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe rename it to partitionAwareIdGenerator.


private static final int MINIMUM_ID_LENGTH = 22;
protected static final SecureRandom SECURE_RANDOM = new SecureRandom(Long.toBinaryString(System.currentTimeMillis()).getBytes());
private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormat.forPattern("yyMMddHHmmss");
private final RetryPolicy<Integer> RETRY_POLICY = RetryPolicy.<Integer>builder()
.withMaxAttempts(readRetryCount())
.handleIf(throwable -> true)
.handleResultIf(Objects::isNull)
.build();
protected final FailsafeExecutor<Integer> RETRIER = Failsafe.with(Collections.singletonList(RETRY_POLICY));
private static final Pattern PATTERN = Pattern.compile("(.*)([0-9]{12})([0-9]{4})([0-9]{6})");
private static final List<KeyValidationConstraint> GLOBAL_CONSTRAINTS = new ArrayList<>();
private static final Map<String, List<KeyValidationConstraint>> DOMAIN_SPECIFIC_CONSTRAINTS = new HashMap<>();
protected static final int NODE_ID = IdGenerator.getNodeId();
@Getter
private final Map<String, Map<Long, PartitionIdTracker>> dataStore = new ConcurrentHashMap<>();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe rename it to IdPartitionsMapping or IdPartitionStore, datastore doesn't capture what this is storing.

protected final IdFormatter idFormatter;
protected final Function<String, Integer> partitionResolver;
protected final int partitionCount;

// dataStore Structure

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commented code, can be removed

// {
// prefix: {
// timestamp: {
// partitions: [
// {
// ids: [],
// pointer: <int>
// },
// {
// ids: [],
// pointer: <int>
// } ...
// ],
// counter: <int>
// }
// }
// }

public DistributedIdGenerator(final int partitionSize,
final Function<String, Integer> partitionResolverSupplier) {
partitionCount = partitionSize;
partitionResolver = partitionResolverSupplier;
idFormatter = IdFormatters.distributed();
val executorService = Executors.newScheduledThreadPool(1);
executorService.scheduleWithFixedDelay(
this::deleteExpiredKeys,
Constants.ID_DELETION_DELAY_IN_SECONDS,
Constants.ID_DELETION_DELAY_IN_SECONDS,
TimeUnit.SECONDS);
}

public DistributedIdGenerator(final int partitionSize,
final Function<String, Integer> partitionResolverSupplier,
final IdFormatter idFormatterInstance) {
partitionCount = partitionSize;
partitionResolver = partitionResolverSupplier;
idFormatter = idFormatterInstance;
}

public synchronized void registerGlobalConstraints(final KeyValidationConstraint... constraints) {
registerGlobalConstraints(ImmutableList.copyOf(constraints));
}

public synchronized void registerGlobalConstraints(final List<KeyValidationConstraint> constraints) {
Preconditions.checkArgument(null != constraints && !constraints.isEmpty());
GLOBAL_CONSTRAINTS.addAll(constraints);
}

public synchronized void registerDomainSpecificConstraints(
final String domain,
final KeyValidationConstraint... validationConstraints) {
registerDomainSpecificConstraints(domain, ImmutableList.copyOf(validationConstraints));
}

public synchronized void registerDomainSpecificConstraints(
final String domain,
final List<KeyValidationConstraint> validationConstraints) {
Preconditions.checkArgument(null != validationConstraints && !validationConstraints.isEmpty());
DOMAIN_SPECIFIC_CONSTRAINTS.computeIfAbsent(domain, key -> new ArrayList<>())
.addAll(validationConstraints);
}

/**
* Generate id with given prefix
*
* @param prefix String prefix for ID to be generated
* @return Generated Id
*/
public Id generate(final String prefix) {
val targetPartitionId = getTargetPartitionId();
return generateForPartition(prefix, targetPartitionId);
}

public Id generateForPartition(final String prefix, final int targetPartitionId) {
val prefixIdMap = dataStore.computeIfAbsent(prefix, k -> new ConcurrentHashMap<>());
val currentTimestamp = new DateTime();
val timeKey = currentTimestamp.getMillis() / 1000;
val idCounter = generateForAllPartitions(
prefixIdMap.computeIfAbsent(timeKey, key -> new PartitionIdTracker(partitionCount)),
prefix,
currentTimestamp,
targetPartitionId);
val id = String.format("%s%s", prefix, idFormatter.format(currentTimestamp, NODE_ID, idCounter));
return Id.builder()
.id(id)
.exponent(idCounter)
.generatedDate(currentTimestamp.toDate())
.node(NODE_ID)
.build();
}

private int generateForAllPartitions(final PartitionIdTracker partitionIdTracker,
final String prefix,
final DateTime timestamp,
final int targetPartitionId) {
val idPool = partitionIdTracker.getPartition(targetPartitionId);
int idIdx = idPool.getPointer().getAndIncrement();
// ToDo: Add Retry Limit
while (idPool.getIds().size() <= idIdx) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add a retry limit which is explicitly passed by client, since partitionResolver is passed by client.

val counterValue = partitionIdTracker.getCounter().getAndIncrement();
val txnId = String.format("%s%s", prefix, idFormatter.format(timestamp, NODE_ID, counterValue));
val mappedPartitionId = partitionResolver.apply(txnId);
partitionIdTracker.getPartition(mappedPartitionId).getIds().add(counterValue);
}
return idPool.getId(idIdx);
}

/**
* Generate id that matches all passed constraints.
* NOTE: There are performance implications for this.
* The evaluation of constraints will take it's toll on ID generation rates.
*
* @param prefix String prefix
* @param domain Domain for constraint selection
* @return Return generated id or empty if it was impossible to satisfy constraints and generate
*/
public Optional<Id> generateWithConstraints(final String prefix, final String domain) {
return generateWithConstraints(prefix, domain, true);
}

/**
* Generate id that matches all passed constraints.
* NOTE: There are performance implications for this.
* The evaluation of constraints will take it's toll on id generation rates.
*
* @param prefix String prefix
* @param domain Domain for constraint selection
* @param skipGlobal Skip global constrains and use only passed ones
* @return Id if it could be generated
*/
public Optional<Id> generateWithConstraints(final String prefix, final String domain, final boolean skipGlobal) {
val targetPartitionId = getTargetPartitionId(DOMAIN_SPECIFIC_CONSTRAINTS.getOrDefault(domain, Collections.emptyList()), skipGlobal);
return targetPartitionId.map(id -> generateForPartition(prefix, id));
}

public Optional<Id> generateWithConstraints(final String prefix,
final List<KeyValidationConstraint> inConstraints,
final boolean skipGlobal) {
val targetPartitionId = getTargetPartitionId(inConstraints, skipGlobal);
return targetPartitionId.map(id -> generateForPartition(prefix, id));
}

/**
* Generate id by parsing given string
*
* @param idString String idString
* @return Id if it could be generated
*/
public Optional<Id> parse(final String idString) {
if (idString == null
|| idString.length() < MINIMUM_ID_LENGTH) {
return Optional.empty();
}
try {
val matcher = PATTERN.matcher(idString);
if (matcher.find()) {
return Optional.of(Id.builder()
.id(idString)
.node(Integer.parseInt(matcher.group(3)))
.exponent(Integer.parseInt(matcher.group(4)))
.generatedDate(DATE_TIME_FORMATTER.parseDateTime(matcher.group(2)).toDate())
.build());
}
return Optional.empty();
}
catch (Exception e) {
log.warn("Could not parse idString {}", e.getMessage());
return Optional.empty();
}
}

private int getTargetPartitionId() {
return SECURE_RANDOM.nextInt(partitionCount);
}

private Optional<Integer> getTargetPartitionId(final List<KeyValidationConstraint> inConstraints, final boolean skipGlobal) {
// ToDo: Check if we need Collision Checker here

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we required, since we are fetching from list.

return Optional.ofNullable(
RETRIER.get(() -> SECURE_RANDOM.nextInt(partitionCount)))
.filter(key -> validateId(inConstraints, key, skipGlobal));
}

protected boolean validateId(final List<KeyValidationConstraint> inConstraints,
final int partitionId,
final boolean skipGlobal) {
//First evaluate global constraints
val failedGlobalConstraint
= skipGlobal
? null
: GLOBAL_CONSTRAINTS.stream()
.filter(constraint -> !constraint.isValid(partitionId))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we are assuming that constraints will always be on partitionId, Should we have it on ID level also, else client have to implement this method, and can't moveover seamlessly. Let's discuss it once.

.findFirst()
.orElse(null);
if (null != failedGlobalConstraint) {
return false;
}
//Evaluate local + domain constraints
val failedLocalConstraint
= null == inConstraints
? null
: inConstraints.stream()
.filter(constraint -> !constraint.isValid(partitionId))
.findFirst()
.orElse(null);
return null == failedLocalConstraint;
}

protected int readRetryCount() {
try {
val count = Integer.parseInt(System.getenv().getOrDefault("NUM_ID_GENERATION_RETRIES", "512"));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we keep default as the number of partition?

if (count <= 0) {
throw new IllegalArgumentException(
"Negative number of retries does not make sense. Please set a proper value for " +
"NUM_ID_GENERATION_RETRIES");
}
return count;
}
catch (NumberFormatException e) {
throw new IllegalArgumentException("Please provide a valid positive integer for NUM_ID_GENERATION_RETRIES");
}
}

private void deleteExpiredKeys() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should make this method synchronised.

val timeThreshold = DateTime.now().getMillis() / 1000;
for (val entry : dataStore.entrySet()) {
entry.getValue().entrySet().removeIf(partitionIdTrackerEntry -> partitionIdTrackerEntry.getKey() < timeThreshold);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to think and handle cases where time changes on a machine, probably safer to remove entries which are timeThreshold - SOME_VALUE

}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package io.appform.ranger.discovery.bundle.id;

import lombok.Value;

@Value
class GenerationResult {
Id id;
IdValidationState state;
}
Loading