Skip to content

Commit

Permalink
Migrate DynamicConfigurationManager to use java.util.concurrent
Browse files Browse the repository at this point in the history
  • Loading branch information
eager-signal committed Mar 8, 2024
1 parent 9e510a6 commit 3dadaf9
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -315,11 +315,14 @@ public void run(WhisperServerConfiguration config, Environment environment) thro

UncaughtExceptionHandler.register();

ScheduledExecutorService dynamicConfigurationExecutor = environment.lifecycle()
.scheduledExecutorService(name(getClass(), "dynamicConfiguration-%d")).threads(1).build();

DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager =
new DynamicConfigurationManager<>(config.getAppConfig().getApplication(),
config.getAppConfig().getEnvironment(),
config.getAppConfig().getConfigurationName(),
DynamicConfiguration.class);
DynamicConfiguration.class, dynamicConfigurationExecutor);
dynamicConfigurationManager.start();

MetricsUtil.configureRegistries(config, environment, dynamicConfigurationManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
import java.time.Duration;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.validation.ConstraintViolation;
import javax.validation.Validation;
Expand All @@ -38,8 +41,9 @@ public class DynamicConfigurationManager<T> {

// Set on initial config fetch
private final AtomicReference<T> configuration = new AtomicReference<>();
private final CountDownLatch initialized = new CountDownLatch(1);
private final ScheduledExecutorService scheduledExecutorService;
private String configurationToken = null;
private boolean initialized = false;

private static final Validator VALIDATOR = Validation.buildDefaultValidatorFactory().getValidator();

Expand All @@ -50,61 +54,48 @@ public class DynamicConfigurationManager<T> {
private static final Logger logger = LoggerFactory.getLogger(DynamicConfigurationManager.class);

public DynamicConfigurationManager(String application, String environment, String configurationName,
Class<T> configurationClass) {
Class<T> configurationClass, ScheduledExecutorService scheduledExecutorService) {
this(AppConfigDataClient
.builder()
.overrideConfiguration(ClientOverrideConfiguration.builder()
.apiCallTimeout(Duration.ofSeconds(10))
.apiCallAttemptTimeout(Duration.ofSeconds(10)).build())
.build(),
application, environment, configurationName, configurationClass);
application, environment, configurationName, configurationClass, scheduledExecutorService);
}

@VisibleForTesting
DynamicConfigurationManager(AppConfigDataClient appConfigClient, String application, String environment,
String configurationName, Class<T> configurationClass) {
String configurationName, Class<T> configurationClass, ScheduledExecutorService scheduledExecutorService) {
this.appConfigClient = appConfigClient;
this.application = application;
this.environment = environment;
this.configurationName = configurationName;
this.configurationClass = configurationClass;
this.scheduledExecutorService = scheduledExecutorService;
}

public T getConfiguration() {
synchronized (this) {
while (!initialized) {
try {
this.wait();
} catch (final InterruptedException e) {
logger.warn("Interrupted while waiting for initial configuration", e);
throw new RuntimeException(e);
}
}
try {
initialized.await();
} catch (InterruptedException e) {
logger.warn("Interrupted while waiting for initial configuration", e);
throw new RuntimeException(e);
}
return configuration.get();
}

public void start() {
configuration.set(retrieveInitialDynamicConfiguration());
synchronized (this) {
this.initialized = true;
this.notifyAll();
}
initialized.countDown();

final Thread workerThread = new Thread(() -> {
while (true) {
try {
retrieveDynamicConfiguration().ifPresent(configuration::set);
} catch (Exception e) {
logger.warn("Error retrieving dynamic configuration", e);
}

Util.sleep(5000);
scheduledExecutorService.scheduleWithFixedDelay(() -> {
try {
retrieveDynamicConfiguration().ifPresent(configuration::set);
} catch (Exception e) {
logger.warn("Error retrieving dynamic configuration", e);
}
}, "DynamicConfigurationManagerWorker");

workerThread.setDaemon(true);
workerThread.start();
}, 0, 5, TimeUnit.SECONDS);
}

private Optional<T> retrieveDynamicConfiguration() throws JsonProcessingException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,14 @@ protected void run(Environment environment, Namespace namespace,
throws Exception {
environment.getObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

ScheduledExecutorService dynamicConfigurationExecutor = environment.lifecycle()
.scheduledExecutorService(name(getClass(), "dynamicConfiguration-%d")).threads(1).build();

DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager = new DynamicConfigurationManager<>(
configuration.getAppConfig().getApplication(), configuration.getAppConfig().getEnvironment(),
configuration.getAppConfig().getConfigurationName(), DynamicConfiguration.class, dynamicConfigurationExecutor);
dynamicConfigurationManager.start();

ClientResources redisClusterClientResources = ClientResources.builder().build();

FaultTolerantRedisCluster cacheCluster = new FaultTolerantRedisCluster("main_cache_cluster",
Expand Down Expand Up @@ -128,11 +136,6 @@ protected void run(Environment environment, Namespace namespace,
ExternalServiceCredentialsGenerator secureValueRecoveryCredentialsGenerator = SecureValueRecovery2Controller.credentialsGenerator(
configuration.getSvr2Configuration());

DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager = new DynamicConfigurationManager<>(
configuration.getAppConfig().getApplication(), configuration.getAppConfig().getEnvironment(),
configuration.getAppConfig().getConfigurationName(), DynamicConfiguration.class);
dynamicConfigurationManager.start();

ExperimentEnrollmentManager experimentEnrollmentManager = new ExperimentEnrollmentManager(
dynamicConfigurationManager);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,12 @@ static CommandDependencies build(

environment.getObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

ScheduledExecutorService dynamicConfigurationExecutor = environment.lifecycle()
.scheduledExecutorService(name(name, "dynamicConfiguration-%d")).threads(1).build();

DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager = new DynamicConfigurationManager<>(
configuration.getAppConfig().getApplication(), configuration.getAppConfig().getEnvironment(),
configuration.getAppConfig().getConfigurationName(), DynamicConfiguration.class);
configuration.getAppConfig().getConfigurationName(), DynamicConfiguration.class, dynamicConfigurationExecutor);
dynamicConfigurationManager.start();

MetricsUtil.configureRegistries(configuration, environment, dynamicConfigurationManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
import static org.mockito.Mockito.when;

import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
Expand All @@ -27,19 +30,25 @@ class DynamicConfigurationManagerTest {
private DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager;
private AppConfigDataClient appConfig;
private StartConfigurationSessionRequest startConfigurationSession;
private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);

@BeforeEach
void setup() {
this.appConfig = mock(AppConfigDataClient.class);
this.dynamicConfigurationManager = new DynamicConfigurationManager<>(
appConfig, "foo", "bar", "baz", DynamicConfiguration.class);
appConfig, "foo", "bar", "baz", DynamicConfiguration.class, scheduledExecutorService);
this.startConfigurationSession = StartConfigurationSessionRequest.builder()
.applicationIdentifier("foo")
.environmentIdentifier("bar")
.configurationProfileIdentifier("baz")
.build();
}

@AfterEach
void teardown() {
scheduledExecutorService.shutdown();
}

@Test
void testGetInitialConfig() {
when(appConfig.startConfigurationSession(startConfigurationSession))
Expand Down

0 comments on commit 3dadaf9

Please sign in to comment.