Skip to content

Commit

Permalink
Move command boilerplate into a base class
Browse files Browse the repository at this point in the history
  • Loading branch information
ravi-signal committed May 17, 2024
1 parent 7d95926 commit 1182d15
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 175 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.workers;

import io.dropwizard.core.Application;
import io.dropwizard.core.cli.Cli;
import io.dropwizard.core.cli.EnvironmentCommand;
import io.dropwizard.core.setup.Environment;
import net.sourceforge.argparse4j.inf.Namespace;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.util.logging.UncaughtExceptionHandler;

/**
* Base class for one-shot commands that use {@link CommandDependencies}.
* <p>
* Override {@link #run(Environment, Namespace, WhisperServerConfiguration, CommandDependencies)} in a child class to
* let the parent class handle common initialization of dependencies, metrics, and logging.
*/
public abstract class AbstractCommandWithDependencies extends EnvironmentCommand<WhisperServerConfiguration> {

private final Logger logger = LoggerFactory.getLogger(getClass());

protected AbstractCommandWithDependencies(final Application<WhisperServerConfiguration> application,
final String name, final String description) {
super(application, name, description);
}

/**
* Run the command with the given initialized {@link CommandDependencies}
*/
protected abstract void run(final Environment environment, final Namespace namespace,
final WhisperServerConfiguration configuration, final CommandDependencies commandDependencies) throws Exception;

@Override
protected void run(final Environment environment, final Namespace namespace,
final WhisperServerConfiguration configuration) throws Exception {
UncaughtExceptionHandler.register();
final CommandDependencies commandDependencies = CommandDependencies.build(getName(), environment, configuration);
MetricsUtil.configureRegistries(configuration, environment, commandDependencies.dynamicConfigurationManager());

try {
logger.info("Starting command dependencies");
environment.lifecycle().getManagedObjects().forEach(managedObject -> {
try {
managedObject.start();
} catch (final Exception e) {
logger.error("Failed to start managed object", e);
throw new RuntimeException(e);
}
});

run(environment, namespace, configuration, commandDependencies);

} finally {
logger.info("Stopping command dependencies");
environment.lifecycle().getManagedObjects().forEach(managedObject -> {
try {
managedObject.stop();
} catch (final Exception e) {
logger.error("Failed to stop managed object", e);
}
});
}
}

@Override
public void onError(final Cli cli, final Namespace namespace, final Throwable throwable) {
logger.error("Unhandled error", throwable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public abstract class AbstractSinglePassCrawlAccountsCommand extends EnvironmentCommand<WhisperServerConfiguration> {
public abstract class AbstractSinglePassCrawlAccountsCommand extends AbstractCommandWithDependencies {

private CommandDependencies commandDependencies;
private Namespace namespace;
Expand Down Expand Up @@ -59,44 +59,17 @@ protected Namespace getNamespace() {

@Override
protected void run(final Environment environment, final Namespace namespace,
final WhisperServerConfiguration configuration) throws Exception {

UncaughtExceptionHandler.register();

final WhisperServerConfiguration configuration, final CommandDependencies commandDependencies) throws Exception {
this.namespace = namespace;
this.commandDependencies = CommandDependencies.build(getName(), environment, configuration);
this.commandDependencies = commandDependencies;

final int segments = Objects.requireNonNull(namespace.getInt(SEGMENT_COUNT));

logger.info("Crawling accounts with {} segments and {} processors",
segments,
Runtime.getRuntime().availableProcessors());

try {
environment.lifecycle().getManagedObjects().forEach(managedObject -> {
try {
managedObject.start();
} catch (final Exception e) {
logger.error("Failed to start managed object", e);
throw new RuntimeException(e);
}
});

crawlAccounts(commandDependencies.accountsManager().streamAllFromDynamo(segments, Schedulers.parallel()));
} finally {
environment.lifecycle().getManagedObjects().forEach(managedObject -> {
try {
managedObject.stop();
} catch (final Exception e) {
logger.error("Failed to stop managed object", e);
}
});
}
}

@Override
public void onError(final Cli cli, final Namespace namespace, final Throwable throwable) {
logger.error("Unhandled error", throwable);
crawlAccounts(commandDependencies.accountsManager().streamAllFromDynamo(segments, Schedulers.parallel()));
}

protected abstract void crawlAccounts(final Flux<Account> accounts);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;

public class BackupMetricsCommand extends EnvironmentCommand<WhisperServerConfiguration> {
public class BackupMetricsCommand extends AbstractCommandWithDependencies {

private final Logger logger = LoggerFactory.getLogger(getClass());

Expand Down Expand Up @@ -61,70 +61,47 @@ public void configure(final Subparser subparser) {

@Override
protected void run(final Environment environment, final Namespace namespace,
final WhisperServerConfiguration configuration) throws Exception {

UncaughtExceptionHandler.register();
final CommandDependencies commandDependencies = CommandDependencies.build(getName(), environment, configuration);
MetricsUtil.configureRegistries(configuration, environment, commandDependencies.dynamicConfigurationManager());
final WhisperServerConfiguration configuration, final CommandDependencies commandDependencies) throws Exception {

final int segments = Objects.requireNonNull(namespace.getInt(SEGMENT_COUNT_ARGUMENT));
logger.info("Crawling backups for metrics with {} segments and {} processors",
segments,
Runtime.getRuntime().availableProcessors());

try {
environment.lifecycle().getManagedObjects().forEach(managedObject -> {
try {
managedObject.start();
} catch (final Exception e) {
logger.error("Failed to start managed object", e);
throw new RuntimeException(e);
}
});

final DistributionSummary numObjectsMediaTier = Metrics.summary(name(getClass(), "numObjects"),
"tier", BackupLevel.MEDIA.name());
final DistributionSummary bytesUsedMediaTier = Metrics.summary(name(getClass(), "bytesUsed"),
"tier", BackupLevel.MEDIA.name());
final DistributionSummary numObjectsMessagesTier = Metrics.summary(name(getClass(), "numObjects"),
"tier", BackupLevel.MESSAGES.name());
final DistributionSummary bytesUsedMessagesTier = Metrics.summary(name(getClass(), "bytesUsed"),
"tier", BackupLevel.MESSAGES.name());

final DistributionSummary timeSinceLastRefresh = Metrics.summary(name(getClass(),
"timeSinceLastRefresh"));
final DistributionSummary timeSinceLastMediaRefresh = Metrics.summary(name(getClass(),
"timeSinceLastMediaRefresh"));
final String backupsCounterName = name(getClass(), "backups");

final BackupManager backupManager = commandDependencies.backupManager();
final Long backupsExpired = backupManager
.listBackupAttributes(segments, Schedulers.parallel())
.doOnNext(backupMetadata -> {
final boolean subscribed = backupMetadata.lastMediaRefresh().equals(backupMetadata.lastRefresh());
if (subscribed) {
numObjectsMediaTier.record(backupMetadata.numObjects());
bytesUsedMediaTier.record(backupMetadata.bytesUsed());
} else {
numObjectsMessagesTier.record(backupMetadata.numObjects());
bytesUsedMessagesTier.record(backupMetadata.bytesUsed());
}
timeSinceLastRefresh.record(timeSince(backupMetadata.lastRefresh()).getSeconds());
timeSinceLastMediaRefresh.record(timeSince(backupMetadata.lastMediaRefresh()).getSeconds());
Metrics.counter(backupsCounterName, "subscribed", String.valueOf(subscribed)).increment();
})
.count()
.block();
logger.info("Crawled {} backups", backupsExpired);
} finally {
environment.lifecycle().getManagedObjects().forEach(managedObject -> {
try {
managedObject.stop();
} catch (final Exception e) {
logger.error("Failed to stop managed object", e);
}
});
}
final DistributionSummary numObjectsMediaTier = Metrics.summary(name(getClass(), "numObjects"),
"tier", BackupLevel.MEDIA.name());
final DistributionSummary bytesUsedMediaTier = Metrics.summary(name(getClass(), "bytesUsed"),
"tier", BackupLevel.MEDIA.name());
final DistributionSummary numObjectsMessagesTier = Metrics.summary(name(getClass(), "numObjects"),
"tier", BackupLevel.MESSAGES.name());
final DistributionSummary bytesUsedMessagesTier = Metrics.summary(name(getClass(), "bytesUsed"),
"tier", BackupLevel.MESSAGES.name());

final DistributionSummary timeSinceLastRefresh = Metrics.summary(name(getClass(),
"timeSinceLastRefresh"));
final DistributionSummary timeSinceLastMediaRefresh = Metrics.summary(name(getClass(),
"timeSinceLastMediaRefresh"));
final String backupsCounterName = name(getClass(), "backups");

final BackupManager backupManager = commandDependencies.backupManager();
final Long backupsExpired = backupManager
.listBackupAttributes(segments, Schedulers.parallel())
.doOnNext(backupMetadata -> {
final boolean subscribed = backupMetadata.lastMediaRefresh().equals(backupMetadata.lastRefresh());
if (subscribed) {
numObjectsMediaTier.record(backupMetadata.numObjects());
bytesUsedMediaTier.record(backupMetadata.bytesUsed());
} else {
numObjectsMessagesTier.record(backupMetadata.numObjects());
bytesUsedMessagesTier.record(backupMetadata.bytesUsed());
}
timeSinceLastRefresh.record(timeSince(backupMetadata.lastRefresh()).getSeconds());
timeSinceLastMediaRefresh.record(timeSince(backupMetadata.lastMediaRefresh()).getSeconds());
Metrics.counter(backupsCounterName, "subscribed", String.valueOf(subscribed)).increment();
})
.count()
.block();
logger.info("Crawled {} backups", backupsExpired);
}

private Duration timeSince(Instant t) {
Expand All @@ -134,9 +111,4 @@ private Duration timeSince(Instant t) {
}
return between;
}

@Override
public void onError(final Cli cli, final Namespace namespace, final Throwable throwable) {
logger.error("Unhandled error", throwable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@

package org.whispersystems.textsecuregcm.workers;

import com.fasterxml.jackson.databind.DeserializationFeature;
import io.dropwizard.core.Application;
import io.dropwizard.core.cli.EnvironmentCommand;
import io.dropwizard.core.setup.Environment;
import java.util.Optional;
import net.sourceforge.argparse4j.inf.Namespace;
Expand All @@ -19,7 +17,7 @@
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.AccountsManager.DeletionReason;

public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfiguration> {
public class DeleteUserCommand extends AbstractCommandWithDependencies {

private final Logger logger = LoggerFactory.getLogger(DeleteUserCommand.class);

Expand All @@ -36,22 +34,17 @@ public void run(WhisperServerConfiguration configuration, Environment environmen
public void configure(Subparser subparser) {
super.configure(subparser);
subparser.addArgument("-u", "--user")
.dest("user")
.type(String.class)
.required(true)
.help("The user to remove");
.dest("user")
.type(String.class)
.required(true)
.help("The user to remove");
}

@Override
protected void run(Environment environment, Namespace namespace, WhisperServerConfiguration configuration)
throws Exception
{
protected void run(Environment environment, Namespace namespace, WhisperServerConfiguration configuration,
CommandDependencies deps) throws Exception {
try {
String[] users = namespace.getString("user").split(",");
environment.getObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

final CommandDependencies deps = CommandDependencies.build("rmuser", environment, configuration);

AccountsManager accountsManager = deps.accountsManager();

for (String user : users) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
package org.whispersystems.textsecuregcm.workers;

import io.dropwizard.core.Application;
import io.dropwizard.core.cli.Cli;
import io.dropwizard.core.cli.EnvironmentCommand;
import io.dropwizard.core.setup.Environment;
import io.micrometer.core.instrument.Metrics;
import java.time.Clock;
Expand All @@ -22,11 +20,10 @@
import org.whispersystems.textsecuregcm.backup.BackupManager;
import org.whispersystems.textsecuregcm.backup.ExpiredBackup;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.util.logging.UncaughtExceptionHandler;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class RemoveExpiredBackupsCommand extends EnvironmentCommand<WhisperServerConfiguration> {
public class RemoveExpiredBackupsCommand extends AbstractCommandWithDependencies {

private final Logger logger = LoggerFactory.getLogger(getClass());

Expand Down Expand Up @@ -89,12 +86,7 @@ public void configure(final Subparser subparser) {

@Override
protected void run(final Environment environment, final Namespace namespace,
final WhisperServerConfiguration configuration) throws Exception {

UncaughtExceptionHandler.register();
final CommandDependencies commandDependencies = CommandDependencies.build(getName(), environment, configuration);
MetricsUtil.configureRegistries(configuration, environment, commandDependencies.dynamicConfigurationManager());

final WhisperServerConfiguration configuration, final CommandDependencies commandDependencies) throws Exception {
final int segments = Objects.requireNonNull(namespace.getInt(SEGMENT_COUNT_ARGUMENT));
final int concurrency = Objects.requireNonNull(namespace.getInt(MAX_CONCURRENCY_ARGUMENT));
final boolean dryRun = namespace.getBoolean(DRY_RUN_ARGUMENT);
Expand All @@ -105,32 +97,14 @@ protected void run(final Environment environment, final Namespace namespace,
Runtime.getRuntime().availableProcessors(),
gracePeriod);

try {
environment.lifecycle().getManagedObjects().forEach(managedObject -> {
try {
managedObject.start();
} catch (final Exception e) {
logger.error("Failed to start managed object", e);
throw new RuntimeException(e);
}
});
final BackupManager backupManager = commandDependencies.backupManager();
final long backupsExpired = backupManager
.getExpiredBackups(segments, Schedulers.parallel(), clock.instant().minus(gracePeriod))
.flatMap(expiredBackup -> removeExpiredBackup(backupManager, expiredBackup, dryRun), concurrency)
.filter(Boolean.TRUE::equals)
.count()
.block();
logger.info("Expired {} backups", backupsExpired);
} finally {
environment.lifecycle().getManagedObjects().forEach(managedObject -> {
try {
managedObject.stop();
} catch (final Exception e) {
logger.error("Failed to stop managed object", e);
}
});
}
final BackupManager backupManager = commandDependencies.backupManager();
final long backupsExpired = backupManager
.getExpiredBackups(segments, Schedulers.parallel(), clock.instant().minus(gracePeriod))
.flatMap(expiredBackup -> removeExpiredBackup(backupManager, expiredBackup, dryRun), concurrency)
.filter(Boolean.TRUE::equals)
.count()
.block();
logger.info("Expired {} backups", backupsExpired);
}

private Mono<Boolean> removeExpiredBackup(
Expand Down Expand Up @@ -162,9 +136,4 @@ private Mono<Boolean> removeExpiredBackup(
return Mono.just(false);
});
}

@Override
public void onError(final Cli cli, final Namespace namespace, final Throwable throwable) {
logger.error("Unhandled error", throwable);
}
}
Loading

0 comments on commit 1182d15

Please sign in to comment.