From b924dea045a3d72cb3d805a39f5ed5521ec97a3f Mon Sep 17 00:00:00 2001 From: Ravi Khadiwala Date: Tue, 30 Jan 2024 13:49:12 -0600 Subject: [PATCH] Remove VirtualThreadPinEventMonitor --- .../WhisperServerConfiguration.java | 10 -- .../textsecuregcm/WhisperServerService.java | 8 -- .../VirtualThreadConfiguration.java | 9 -- .../dynamic/DynamicConfiguration.java | 10 +- .../DynamicVirtualThreadConfiguration.java | 10 -- .../util/VirtualThreadPinEventMonitor.java | 97 ---------------- .../VirtualThreadPinEventMonitorTest.java | 107 ------------------ 7 files changed, 1 insertion(+), 250 deletions(-) delete mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/configuration/VirtualThreadConfiguration.java delete mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicVirtualThreadConfiguration.java delete mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/util/VirtualThreadPinEventMonitor.java delete mode 100644 service/src/test/java/org/whispersystems/textsecuregcm/util/VirtualThreadPinEventMonitorTest.java diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java index dbca46e6a..7b32ce23b 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java @@ -56,7 +56,6 @@ import org.whispersystems.textsecuregcm.configuration.TlsKeyStoreConfiguration; import org.whispersystems.textsecuregcm.configuration.TurnSecretConfiguration; import org.whispersystems.textsecuregcm.configuration.UnidentifiedDeliveryConfiguration; -import org.whispersystems.textsecuregcm.configuration.VirtualThreadConfiguration; import org.whispersystems.textsecuregcm.configuration.ZkConfig; import org.whispersystems.textsecuregcm.limits.RateLimiterConfig; import org.whispersystems.websocket.configuration.WebSocketConfiguration; @@ -317,11 +316,6 @@ public class WhisperServerConfiguration extends Configuration { @JsonProperty private LinkDeviceSecretConfiguration linkDevice; - @Valid - @NotNull - @JsonProperty - private VirtualThreadConfiguration virtualThreadConfiguration = new VirtualThreadConfiguration(Duration.ofMillis(1)); - public TlsKeyStoreConfiguration getTlsKeyStoreConfiguration() { return tlsKeyStore; } @@ -533,8 +527,4 @@ public MessageByteLimitCardinalityEstimatorConfiguration getMessageByteLimitCard public LinkDeviceSecretConfiguration getLinkDeviceSecretConfiguration() { return linkDevice; } - - public VirtualThreadConfiguration getVirtualThreadConfiguration() { - return virtualThreadConfiguration; - } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index f6aa1a83a..f7287b7d7 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -212,7 +212,6 @@ import org.whispersystems.textsecuregcm.util.ManagedAwsCrt; import org.whispersystems.textsecuregcm.util.SystemMapper; import org.whispersystems.textsecuregcm.util.UsernameHashZkProofVerifier; -import org.whispersystems.textsecuregcm.util.VirtualThreadPinEventMonitor; import org.whispersystems.textsecuregcm.util.logging.LoggingUnhandledExceptionMapper; import org.whispersystems.textsecuregcm.util.logging.UncaughtExceptionHandler; import org.whispersystems.textsecuregcm.websocket.AuthenticatedConnectListener; @@ -435,8 +434,6 @@ public void run(WhisperServerConfiguration config, Environment environment) thro .executorService(name(getClass(), "secureValueRecoveryService-%d")).maxThreads(1).minThreads(1).build(); ExecutorService storageServiceExecutor = environment.lifecycle() .executorService(name(getClass(), "storageService-%d")).maxThreads(1).minThreads(1).build(); - ExecutorService virtualThreadEventLoggerExecutor = environment.lifecycle() - .executorService(name(getClass(), "virtualThreadEventLogger-%d")).minThreads(1).maxThreads(1).build(); ScheduledExecutorService secureValueRecoveryServiceRetryExecutor = environment.lifecycle() .scheduledExecutorService(name(getClass(), "secureValueRecoveryServiceRetry-%d")).threads(1).build(); ScheduledExecutorService storageServiceRetryExecutor = environment.lifecycle() @@ -632,10 +629,6 @@ public void run(WhisperServerConfiguration config, Environment environment) thro CoinMarketCapClient coinMarketCapClient = new CoinMarketCapClient(currencyClient, config.getPaymentsServiceConfiguration().coinMarketCapApiKey().value(), config.getPaymentsServiceConfiguration().coinMarketCapCurrencyIds()); CurrencyConversionManager currencyManager = new CurrencyConversionManager(fixerClient, coinMarketCapClient, cacheCluster, config.getPaymentsServiceConfiguration().paymentCurrencies(), recurringJobExecutor, Clock.systemUTC()); - VirtualThreadPinEventMonitor virtualThreadPinEventMonitor = new VirtualThreadPinEventMonitor( - virtualThreadEventLoggerExecutor, - () -> dynamicConfigurationManager.getConfiguration().getVirtualThreads().allowedPinEvents(), - config.getVirtualThreadConfiguration().pinEventThreshold()); environment.lifecycle().manage(apnSender); environment.lifecycle().manage(apnPushNotificationScheduler); @@ -645,7 +638,6 @@ public void run(WhisperServerConfiguration config, Environment environment) thro environment.lifecycle().manage(currencyManager); environment.lifecycle().manage(registrationServiceClient); environment.lifecycle().manage(clientReleaseManager); - environment.lifecycle().manage(virtualThreadPinEventMonitor); final RegistrationCaptchaManager registrationCaptchaManager = new RegistrationCaptchaManager(captchaChecker, rateLimiters, config.getTestDevices(), dynamicConfigurationManager); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/VirtualThreadConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/VirtualThreadConfiguration.java deleted file mode 100644 index 2e81aaf31..000000000 --- a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/VirtualThreadConfiguration.java +++ /dev/null @@ -1,9 +0,0 @@ -/* - * Copyright 2024 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ -package org.whispersystems.textsecuregcm.configuration; - -import java.time.Duration; - -public record VirtualThreadConfiguration(Duration pinEventThreshold) {} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicConfiguration.java index 658051454..d4f601591 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicConfiguration.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicConfiguration.java @@ -55,13 +55,10 @@ public class DynamicConfiguration { @Valid DynamicInboundMessageByteLimitConfiguration inboundMessageByteLimit = new DynamicInboundMessageByteLimitConfiguration(true); - @JsonProperty - @Valid - DynamicRegistrationConfiguration registrationConfiguration = new DynamicRegistrationConfiguration(false); @JsonProperty @Valid - DynamicVirtualThreadConfiguration virtualThreads = new DynamicVirtualThreadConfiguration(Collections.emptySet()); + DynamicRegistrationConfiguration registrationConfiguration = new DynamicRegistrationConfiguration(false); public Optional getExperimentEnrollmentConfiguration( final String experimentName) { @@ -108,9 +105,4 @@ public DynamicInboundMessageByteLimitConfiguration getInboundMessageByteLimitCon public DynamicRegistrationConfiguration getRegistrationConfiguration() { return registrationConfiguration; } - - public DynamicVirtualThreadConfiguration getVirtualThreads() { - return virtualThreads; - } - } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicVirtualThreadConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicVirtualThreadConfiguration.java deleted file mode 100644 index 045efa5e8..000000000 --- a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicVirtualThreadConfiguration.java +++ /dev/null @@ -1,10 +0,0 @@ -/* - * Copyright 2024 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ - -package org.whispersystems.textsecuregcm.configuration.dynamic; - -import java.util.Set; - -public record DynamicVirtualThreadConfiguration(Set allowedPinEvents) {} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/util/VirtualThreadPinEventMonitor.java b/service/src/main/java/org/whispersystems/textsecuregcm/util/VirtualThreadPinEventMonitor.java deleted file mode 100644 index 2f161a22b..000000000 --- a/service/src/main/java/org/whispersystems/textsecuregcm/util/VirtualThreadPinEventMonitor.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Copyright 2024 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ - -package org.whispersystems.textsecuregcm.util; - -import com.google.common.annotations.VisibleForTesting; -import io.dropwizard.lifecycle.Managed; -import io.micrometer.core.instrument.Metrics; -import java.time.Duration; -import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.function.BiConsumer; -import java.util.function.Supplier; -import jdk.jfr.consumer.RecordedEvent; -import jdk.jfr.consumer.RecordedFrame; -import jdk.jfr.consumer.RecordingStream; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.whispersystems.textsecuregcm.metrics.MetricsUtil; - -/** - * Watches for JFR events indicating that a virtual thread was pinned - */ -public class VirtualThreadPinEventMonitor implements Managed { - - private static final Logger logger = LoggerFactory.getLogger(VirtualThreadPinEventMonitor.class); - private static final String PIN_COUNTER_NAME = MetricsUtil.name(VirtualThreadPinEventMonitor.class, "virtualThreadPinned"); - private static final String JFR_THREAD_PINNED_EVENT_NAME = "jdk.VirtualThreadPinned"; - private static final long MAX_JFR_REPOSITORY_SIZE = 1024 * 1024 * 4L; // 4MiB - - private final ExecutorService executorService; - private final Supplier> allowList; - private final Duration pinEventThreshold; - private final RecordingStream recordingStream; - - private BiConsumer pinEventConsumer; - - @VisibleForTesting - VirtualThreadPinEventMonitor( - final ExecutorService executorService, - final Supplier> allowList, - final Duration pinEventThreshold, - final BiConsumer pinEventConsumer) { - this.executorService = executorService; - this.allowList = allowList; - this.pinEventThreshold = pinEventThreshold; - this.pinEventConsumer = pinEventConsumer; - this.recordingStream = new RecordingStream(); - } - public VirtualThreadPinEventMonitor( - final ExecutorService executorService, - final Supplier> allowList, - final Duration pinEventThreshold) { - this(executorService, allowList, pinEventThreshold, VirtualThreadPinEventMonitor::processPinEvent); - } - - @Override - public void start() { - recordingStream.setMaxSize(MAX_JFR_REPOSITORY_SIZE); - recordingStream.enable(JFR_THREAD_PINNED_EVENT_NAME).withThreshold(pinEventThreshold).withStackTrace(); - recordingStream.onEvent(event -> pinEventConsumer.accept(event, allowed(event))); - executorService.submit(() -> recordingStream.start()); - } - - @Override - public void stop() throws InterruptedException { - // flushes events and waits for callbacks to finish - recordingStream.stop(); - // immediately frees all resources - recordingStream.close(); - } - - private static void processPinEvent(final RecordedEvent event, final boolean allowedPinEvent) { - if (allowedPinEvent) { - logger.info("Long allowed virtual thread pin event detected", event); - } else { - logger.error("Long forbidden virtual thread pin event detected", event); - } - Metrics.counter(PIN_COUNTER_NAME, "allowed", String.valueOf(allowedPinEvent)).increment(); - } - - private boolean allowed(final RecordedEvent event) { - final Set allowedMethodFrames = allowList.get(); - for (RecordedFrame st : event.getStackTrace().getFrames()) { - if (!st.isJavaFrame()) { - continue; - } - final String qualifiedName = "%s.%s".formatted(st.getMethod().getType().getName(), st.getMethod().getName()); - if (allowedMethodFrames.stream().anyMatch(qualifiedName::contains)) { - return true; - } - } - return false; - } -} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/util/VirtualThreadPinEventMonitorTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/util/VirtualThreadPinEventMonitorTest.java deleted file mode 100644 index 137d50723..000000000 --- a/service/src/test/java/org/whispersystems/textsecuregcm/util/VirtualThreadPinEventMonitorTest.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Copyright 2024 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ -package org.whispersystems.textsecuregcm.util; - -import static org.assertj.core.api.Assertions.assertThat; - -import java.time.Duration; -import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import jdk.jfr.consumer.RecordedEvent; -import org.apache.commons.lang3.tuple.Pair; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; - - -public class VirtualThreadPinEventMonitorTest { - private void synchronizedSleep1() { - synchronized (this) { - try { - Thread.sleep(2); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - } - - private void synchronizedSleep2() { - synchronized (this) { - try { - Thread.sleep(2); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - } - - @Test - @Disabled("flaky: no way to ensure the sequencing between the start of the recording stream and emitting the event") - public void testPinEventProduced() throws InterruptedException, ExecutionException { - final BlockingQueue> bq = new LinkedBlockingQueue<>(); - final ExecutorService exec = Executors.newVirtualThreadPerTaskExecutor(); - VirtualThreadPinEventMonitor eventMonitor = queueingLogger(exec, Set.of(), bq); - eventMonitor.start(); - // give start a moment to begin the event stream thread - Thread.sleep(100); - exec.submit(() -> synchronizedSleep1()).get(); - eventMonitor.stop(); - - final Pair event = bq.poll(1, TimeUnit.SECONDS); - assertThat(event).isNotNull(); - assertThat(event.getRight()).isFalse(); - assertThat(bq.isEmpty()); - exec.shutdown(); - exec.awaitTermination(1, TimeUnit.MILLISECONDS); - } - - @ParameterizedTest - @ValueSource(strings = {"VirtualThreadPinEventMonitorTest.synchronizedSleep1", "synchronizedSleep1"}) - @Disabled("flaky: no way to ensure the sequencing between the start of the recording stream and emitting the event") - public void testPinEventFiltered(final String allowString) throws InterruptedException, ExecutionException { - final BlockingQueue> bq = new LinkedBlockingQueue<>(); - final ExecutorService exec = Executors.newVirtualThreadPerTaskExecutor(); - final VirtualThreadPinEventMonitor eventMonitor = queueingLogger(exec, Set.of(allowString), bq); - eventMonitor.start(); - // give start a moment to begin the event stream thread - Thread.sleep(100); - exec.submit(() -> synchronizedSleep1()).get(); - exec.submit(() -> synchronizedSleep2()).get(); - eventMonitor.stop(); - - final Pair sleep1Event = bq.poll(1, TimeUnit.SECONDS); - final Pair sleep2Event = bq.poll(1, TimeUnit.SECONDS); - assertThat(sleep1Event).isNotNull(); - assertThat(sleep2Event).isNotNull(); - assertThat(sleep1Event.getRight()).isTrue(); - assertThat(sleep2Event.getRight()).isFalse(); - assertThat(bq.isEmpty()); - exec.shutdown(); - exec.awaitTermination(1, TimeUnit.MILLISECONDS); - } - - private static VirtualThreadPinEventMonitor queueingLogger( - final ExecutorService exec, - final Set allowedMethods, - final BlockingQueue> bq) { - return new VirtualThreadPinEventMonitor(exec, - () -> allowedMethods, - Duration.ofMillis(1), - (event, allowed) -> { - try { - bq.put(Pair.of(event, allowed)); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - }); - - } -}