Skip to content

Commit

Permalink
Merge branch 'master' into datafeed-keys-master
Browse files Browse the repository at this point in the history
  • Loading branch information
at055612 committed Jan 28, 2025
2 parents ea9c196 + 62b0ce0 commit 3c9b2d2
Show file tree
Hide file tree
Showing 13 changed files with 231 additions and 165 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import stroom.meta.shared.SimpleMetaImpl;
import stroom.security.mock.MockSecurityContextModule;
import stroom.task.mock.MockTaskModule;
import stroom.test.common.MockMetricsModule;
import stroom.test.common.util.db.DbTestModule;
import stroom.test.common.util.guice.AbstractTestModule;
import stroom.util.entityevent.EntityEventBus;
Expand Down Expand Up @@ -72,6 +73,7 @@ protected void configure() {
new MockClusterLockModule(),
new MockTaskModule(),
new MockSecurityContextModule(),
new MockMetricsModule(),
new CacheModule(),
new CacheServiceModule(),
new DbTestModule())
Expand Down
2 changes: 2 additions & 0 deletions stroom-app/src/test/java/stroom/receive/TestBaseModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import stroom.security.api.UserIdentity;
import stroom.security.mock.MockSecurityContextModule;
import stroom.task.impl.TaskContextModule;
import stroom.test.common.MockMetricsModule;
import stroom.util.entityevent.EntityEventBus;
import stroom.util.pipeline.scope.PipelineScopeModule;

Expand All @@ -45,6 +46,7 @@ protected void configure() {
install(new MemoryPersistenceModule());
install(new MockMetaModule());
install(new MockMetaStatisticsModule());
install(new MockMetricsModule());
install(new MockNodeServiceModule());
install(new MockSecurityContextModule());
install(new MockStreamStoreModule());
Expand Down
1 change: 1 addition & 0 deletions stroom-headless/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ dependencies {
implementation project(':stroom-util')
implementation project(':stroom-util-shared')

implementation libs.dropwizard_metrics_core
implementation libs.guice
implementation libs.jackson_annotations
implementation libs.jakarta_el
Expand Down
5 changes: 5 additions & 0 deletions stroom-headless/src/main/java/stroom/headless/CliModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,13 @@
import stroom.util.io.StreamCloser;
import stroom.util.io.StroomPathConfig;
import stroom.util.jersey.MockJerseyModule;
import stroom.util.metrics.Metrics;
import stroom.util.metrics.MetricsImpl;
import stroom.util.pipeline.scope.PipelineScopeModule;
import stroom.util.pipeline.scope.PipelineScoped;
import stroom.util.servlet.MockServletModule;

import com.codahale.metrics.MetricRegistry;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;

Expand Down Expand Up @@ -120,6 +123,8 @@ protected void configure() {

// Only needed for feed import so not an issue for Cli
bind(FsVolumeGroupService.class).to(MockFsVolumeGroupService.class);

bind(Metrics.class).toInstance(new MetricsImpl(new MetricRegistry()));
}

@Provides
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,16 @@ class TestMetaValueDaoImpl {

@BeforeEach
void setup() {
final AbstractModule module = new AbstractModule() {
@Override
protected void configure() {
bind(MetaServiceConfig.class).toProvider(() ->
getMetaServiceConfig());
bind(MetaValueConfig.class).toProvider(() ->
getMetaValueConfig());
}
};

Guice.createInjector(
new MetaModule(),
new MetaDbModule(),
Expand All @@ -77,15 +87,7 @@ void setup() {
new MetaTestModule(),
new MockTaskModule(),
new MockStroomEventLoggingModule(),
new AbstractModule() {
@Override
protected void configure() {
bind(MetaServiceConfig.class).toProvider(() ->
getMetaServiceConfig());
bind(MetaValueConfig.class).toProvider(() ->
getMetaValueConfig());
}
})
module)
.injectMembers(this);
setAddAsync(false);
// Delete everything
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,23 @@
import stroom.util.io.FileUtil;
import stroom.util.logging.LambdaLogger;
import stroom.util.logging.LambdaLoggerFactory;
import stroom.util.string.StringIdUtil;

import jakarta.inject.Inject;
import jakarta.inject.Singleton;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.concurrent.atomic.AtomicLong;

@Singleton
public class CleanupDirQueue {

private static final LambdaLogger LOGGER = LambdaLoggerFactory.getLogger(CleanupDirQueue.class);

private final Path dir;
private final AtomicLong count = new AtomicLong();

@Inject
CleanupDirQueue(final DataDirProvider dataDirProvider) {
Expand All @@ -31,12 +32,12 @@ public class CleanupDirQueue {
public void add(final Path sourceDir) {
try {
// We will move before delete to help ensure we don't end up partially deleting dir contents in place.
final Path deleteDir = dir.resolve(sourceDir.getFileName());
// Make sure we get a unique dir name.
final Path deleteDir = dir.resolve(StringIdUtil.idToString(count.incrementAndGet()));
Files.move(sourceDir, deleteDir, StandardCopyOption.ATOMIC_MOVE);
FileUtil.deleteDir(deleteDir);
} catch (final IOException e) {
} catch (final Exception e) {
LOGGER.error(() -> "Failed to cleanup dir: " + FileUtil.getCanonicalPath(sourceDir), e);
throw new UncheckedIOException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public ForwardHttpPostConfig() {
retryDelay = DEFAULT_RETRY_DELAY;
maxRetries = DEFAULT_MAX_RETRIES;
addOpenIdAccessToken = false;
httpClient = new HttpClientConfiguration();
httpClient = createDefaultHttpClientConfiguration();
}

@SuppressWarnings("unused")
Expand Down Expand Up @@ -74,13 +74,17 @@ public ForwardHttpPostConfig(@JsonProperty("enabled") final boolean enabled,
? DEFAULT_MAX_RETRIES
: maxRetries;
this.addOpenIdAccessToken = addOpenIdAccessToken;
this.httpClient = Objects.requireNonNullElse(httpClient, HttpClientConfiguration
this.httpClient = Objects.requireNonNullElse(httpClient, createDefaultHttpClientConfiguration());
}

private HttpClientConfiguration createDefaultHttpClientConfiguration() {
return HttpClientConfiguration
.builder()
.timeout(DEFAULT_FORWARD_TIMEOUT)
.connectionTimeout(DEFAULT_FORWARD_TIMEOUT)
.connectionRequestTimeout(DEFAULT_FORWARD_TIMEOUT)
.timeToLive(DEFAULT_FORWARD_TIMEOUT)
.build());
.build();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,20 +156,19 @@ private int post(final HttpPost httpPost,
final AttributeMap attributeMap) {
// Execute and get the response.
try {
if (!forwardDelay.isZero()) {
LOGGER.trace("'{}' - adding delay {}", forwarderName, forwardDelay);
ThreadUtil.sleep(forwardDelay);
}

return httpClient.execute(httpPost, response -> {
try {
if (!forwardDelay.isZero()) {
LOGGER.trace("'{}' - adding delay {}", forwarderName, forwardDelay);
ThreadUtil.sleep(forwardDelay);
}

LOGGER.debug(() -> LogUtil.message("'{}' - Closing stream, response header fields:\n{}",
forwarderName,
formatHeaderEntryListForLogging(response.getHeaders())));
} finally {
logResponse(startTime, response, attributeMap);
}

return response.getCode();
});
} catch (final RuntimeException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@


@ResourceLock(TestResourceLocks.STROOM_APP_PORT_8080)
public class AbstractEndToEndTest extends AbstractApplicationTest {
public abstract class AbstractEndToEndTest extends AbstractApplicationTest {

private static final Logger LOGGER = LoggerFactory.getLogger(AbstractEndToEndTest.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import stroom.util.json.JsonUtil;
import stroom.util.logging.AsciiTable;
import stroom.util.logging.AsciiTable.Column;
import stroom.util.logging.LambdaLogger;
import stroom.util.logging.LambdaLoggerFactory;
import stroom.util.logging.LogUtil;
import stroom.util.shared.ResourcePaths;
import stroom.util.time.StroomDuration;
Expand All @@ -40,8 +42,6 @@
import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream;
import org.assertj.core.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.IOException;
Expand All @@ -57,12 +57,13 @@
import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;

public class MockHttpDestination {

private static final Logger LOGGER = LoggerFactory.getLogger(MockHttpDestination.class);
private static final LambdaLogger LOGGER = LambdaLoggerFactory.getLogger(MockHttpDestination.class);

private static final int DEFAULT_STROOM_PORT = 8080;

Expand All @@ -74,6 +75,9 @@ public class MockHttpDestination {
// Hold all requests send to the wiremock stroom datafeed endpoint
private final List<DataFeedRequest> dataFeedRequests = new ArrayList<>();

private final ThreadLocal<Long> responseTimes = new ThreadLocal<>();
private final AtomicInteger count = new AtomicInteger();

WireMockExtension createExtension() {
return WireMockExtension.newInstance()
.options(WireMockConfiguration.wireMockConfig().port(DEFAULT_STROOM_PORT))
Expand All @@ -83,17 +87,34 @@ public String getName() {
return "Request logging action";
}

@Override
public void beforeResponseSent(final ServeEvent serveEvent, final Parameters parameters) {
responseTimes.set(System.currentTimeMillis());
}

@Override
public void afterComplete(final ServeEvent serveEvent, final Parameters parameters) {
if (serveEvent.getResponse().getStatus() == 200) {
if (isRequestLoggingEnabled) {
dumpWireMockEvent(serveEvent);
}
if (serveEvent.getRequest().getUrl().equals(getDataFeedPath())) {
captureDataFeedRequest(serveEvent);
try {
if (serveEvent.getResponse().getStatus() == 200) {
if (isRequestLoggingEnabled) {
dumpWireMockEvent(serveEvent);
}
if (serveEvent.getRequest().getUrl().equals(getDataFeedPath())) {
captureDataFeedRequest(serveEvent);
}
} else {
LOGGER.error(serveEvent.toString());
}
} else {
LOGGER.error(serveEvent.toString());

} finally {
final long startTime = responseTimes.get();
responseTimes.remove();
LOGGER.info(() -> "Responding " +
serveEvent.getResponse().getStatus() +
" after " +
Duration.ofMillis(System.currentTimeMillis() - startTime).toString() +
" count = " +
count.incrementAndGet());
}
}
}))
Expand Down
Loading

0 comments on commit 3c9b2d2

Please sign in to comment.