Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ public static AttributeMap create(final HttpServletRequest httpServletRequest,
return attributeMap;
}

public static void addReceiptInfo(final AttributeMap attributeMap,
final UniqueId receiptId) {
addReceiptInfo(attributeMap, Instant.now(), receiptId);
}

public static void addReceiptInfo(final AttributeMap attributeMap,
final Instant receiveTime,
final UniqueId receiptId) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package stroom.proxy.app;

import stroom.util.config.annotations.RequiresProxyRestart;
import stroom.util.shared.AbstractConfig;
import stroom.util.shared.IsProxyConfig;
import stroom.util.shared.NullSafe;
import stroom.util.time.StroomDuration;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;

import java.util.List;
import java.util.Objects;

/**
* Config to control the scanning of one or more directories to ingest data in proxy zip format
*/
@JsonPropertyOrder(alphabetic = true)
public class DirScannerConfig extends AbstractConfig implements IsProxyConfig {

public static final StroomDuration DEFAULT_SCAN_FREQUENCY = StroomDuration.ofMinutes(1);
public static final boolean DEFAULT_ENABLED_STATE = true;
public static final List<String> DEFAULT_DIRS = List.of("zip_file_ingest");
public static final String DEFAULT_FAILURE_DIR = "zip_file_ingest_failed";

private final List<String> dirs;
private final String failureDir;
private final boolean enabled;
private final StroomDuration scanFrequency;

public DirScannerConfig() {
this.enabled = DEFAULT_ENABLED_STATE;
this.dirs = DEFAULT_DIRS;
this.failureDir = DEFAULT_FAILURE_DIR;
this.scanFrequency = DEFAULT_SCAN_FREQUENCY;
}

@JsonCreator
public DirScannerConfig(@JsonProperty("dirs") final List<String> dirs,
@JsonProperty("failureDir") final String failureDir,
@JsonProperty("enabled") final Boolean enabled,
@JsonProperty("scanFrequency") final StroomDuration scanFrequency) {
this.dirs = Objects.requireNonNullElse(dirs, DEFAULT_DIRS);
this.failureDir = Objects.requireNonNullElse(failureDir, DEFAULT_FAILURE_DIR);
this.enabled = Objects.requireNonNullElse(enabled, DEFAULT_ENABLED_STATE);
this.scanFrequency = Objects.requireNonNullElse(scanFrequency, DEFAULT_SCAN_FREQUENCY);
}

@JsonProperty
@JsonPropertyDescription("The list of directories to scan for proxy format ZIP files. " +
"The dirs will be scanned in the order they appear in this list. No guarantee is " +
"made about the order in which ZIP files are scanned within each dir.")
public List<String> getDirs() {
return NullSafe.list(dirs);
}

@RequiresProxyRestart
@JsonProperty
@JsonPropertyDescription("The directory where ZIPs will be moved to if they could not be ingested.")
public String getFailureDir() {
return failureDir;
}

@JsonProperty
@JsonPropertyDescription("Whether scanning of the directories for proxy format ZIP files is enabled or not.")
public boolean isEnabled() {
return enabled;
}

@RequiresProxyRestart
@JsonProperty
@JsonPropertyDescription("The frequency at which scans of the directories will occur. All directories will " +
"be scanned on each run.")
public StroomDuration getScanFrequency() {
return scanFrequency;
}

@Override
public String toString() {
return "DirScannerConfig{" +
"dirs=" + dirs +
", enabled=" + enabled +
", scanFrequency=" + scanFrequency +
'}';
}

@Override
public boolean equals(final Object object) {
if (this == object) {
return true;
}
if (object == null || getClass() != object.getClass()) {
return false;
}
final DirScannerConfig that = (DirScannerConfig) object;
return enabled == that.enabled && Objects.equals(dirs, that.dirs) && Objects.equals(
scanFrequency,
that.scanFrequency);
}

@Override
public int hashCode() {
return Objects.hash(dirs, enabled, scanFrequency);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class ProxyConfig extends AbstractConfig implements IsProxyConfig {
public static final String PROP_NAME_RECEIVE = "receive";
public static final String PROP_NAME_EVENT_STORE = "eventStore";
public static final String PROP_NAME_AGGREGATOR = "aggregator";
public static final String PROP_NAME_DIR_SCANNER = "dirScanner";
public static final String PROP_NAME_FORWARD_FILE_DESTINATIONS = "forwardFileDestinations";
public static final String PROP_NAME_FORWARD_HTTP_DESTINATIONS = "forwardHttpDestinations";
public static final String PROP_NAME_LOG_STREAM = "logStream";
Expand All @@ -72,6 +73,7 @@ public class ProxyConfig extends AbstractConfig implements IsProxyConfig {
private final ReceiveDataConfig receiveDataConfig;
private final EventStoreConfig eventStoreConfig;
private final AggregatorConfig aggregatorConfig;
private final DirScannerConfig dirScannerConfig;
private final List<ForwardFileConfig> forwardFileDestinations;
private final List<ForwardHttpPostConfig> forwardHttpDestinations;
private final LogStreamConfig logStreamConfig;
Expand All @@ -89,6 +91,7 @@ public ProxyConfig() {
new ReceiveDataConfig(),
new EventStoreConfig(),
new AggregatorConfig(),
new DirScannerConfig(),
new ArrayList<>(),
new ArrayList<>(),
new LogStreamConfig(),
Expand All @@ -109,6 +112,7 @@ public ProxyConfig(
@JsonProperty(PROP_NAME_RECEIVE) final ReceiveDataConfig receiveDataConfig,
@JsonProperty(PROP_NAME_EVENT_STORE) final EventStoreConfig eventStoreConfig,
@JsonProperty(PROP_NAME_AGGREGATOR) final AggregatorConfig aggregatorConfig,
@JsonProperty(PROP_NAME_DIR_SCANNER) final DirScannerConfig dirScannerConfig,
@JsonProperty(PROP_NAME_FORWARD_FILE_DESTINATIONS) final List<ForwardFileConfig> forwardFileDestinations,
@JsonProperty(PROP_NAME_FORWARD_HTTP_DESTINATIONS) final List<ForwardHttpPostConfig> forwardHttpDestinations,
@JsonProperty(PROP_NAME_LOG_STREAM) final LogStreamConfig logStreamConfig,
Expand All @@ -125,6 +129,7 @@ public ProxyConfig(
this.receiveDataConfig = receiveDataConfig;
this.eventStoreConfig = eventStoreConfig;
this.aggregatorConfig = aggregatorConfig;
this.dirScannerConfig = dirScannerConfig;
this.forwardFileDestinations = forwardFileDestinations;
this.forwardHttpDestinations = forwardHttpDestinations;
this.logStreamConfig = logStreamConfig;
Expand Down Expand Up @@ -184,6 +189,11 @@ public AggregatorConfig getAggregatorConfig() {
return aggregatorConfig;
}

@JsonProperty(PROP_NAME_DIR_SCANNER)
public DirScannerConfig getDirScannerConfig() {
return dirScannerConfig;
}

@RequiresProxyRestart
@JsonProperty(PROP_NAME_FORWARD_FILE_DESTINATIONS)
public List<ForwardFileConfig> getForwardFileDestinations() {
Expand Down Expand Up @@ -352,14 +362,15 @@ public static class Builder {
private ReceiveDataConfig receiveDataConfig = new ReceiveDataConfig();
private EventStoreConfig eventStoreConfig = new EventStoreConfig();
private AggregatorConfig aggregatorConfig = new AggregatorConfig();
private DirScannerConfig dirScannerConfig = new DirScannerConfig();
private final List<ForwardFileConfig> forwardFileDestinations = new ArrayList<>();
private final List<ForwardHttpPostConfig> forwardHttpDestinations = new ArrayList<>();
private LogStreamConfig logStreamConfig = new LogStreamConfig();
private ContentSyncConfig contentSyncConfig = new ContentSyncConfig();
private FeedStatusConfig feedStatusConfig = new FeedStatusConfig();
private ThreadConfig threadConfig = new ThreadConfig();
private ProxySecurityConfig proxySecurityConfig = new ProxySecurityConfig();
private List<SqsConnectorConfig> sqsConnectors = new ArrayList<>();
private final List<SqsConnectorConfig> sqsConnectors = new ArrayList<>();

private Builder() {

Expand Down Expand Up @@ -400,6 +411,11 @@ public Builder aggregatorConfig(final AggregatorConfig aggregatorConfig) {
return this;
}

public Builder dirScannerConfig(final DirScannerConfig dirScannerConfig) {
this.dirScannerConfig = dirScannerConfig;
return this;
}

public Builder addForwardFileDestination(final ForwardFileConfig forwarderFileConfig) {
this.forwardFileDestinations.add(forwarderFileConfig);
return this;
Expand Down Expand Up @@ -465,6 +481,7 @@ public ProxyConfig build() {
receiveDataConfig,
eventStoreConfig,
aggregatorConfig,
dirScannerConfig,
forwardFileDestinations,
forwardHttpDestinations,
logStreamConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import stroom.proxy.app.event.EventStore;
import stroom.proxy.app.event.EventStoreConfig;
import stroom.proxy.app.handler.ZipDirScanner;
import stroom.proxy.repo.ProxyServices;
import stroom.receive.common.ReceiptIdGenerator;

Expand All @@ -15,22 +16,30 @@ public class ProxyLifecycle implements Managed {

@Inject
public ProxyLifecycle(final ProxyConfig proxyConfig,
final EventStoreConfig eventStoreConfig,
final Provider<EventStore> eventStoreProvider,
final ZipDirScanner zipDirScanner,
final ProxyServices proxyServices,
final ReceiptIdGenerator receiptIdGenerator) {
this.proxyServices = proxyServices;
final EventStoreConfig eventStoreConfig = proxyConfig.getEventStoreConfig();
final DirScannerConfig dirScannerConfig = proxyConfig.getDirScannerConfig();

// Add executor to roll event store.
final EventStore eventStore = eventStoreProvider.get();
proxyServices.addFrequencyExecutor("Event Store - roll",
() -> eventStore::tryRoll,
eventStoreConfig.getRollFrequency().toMillis());

// Add executor to forward event store.
proxyServices.addFrequencyExecutor("Event Store - forward",
() -> eventStore::forwardAll,
eventStoreConfig.getRollFrequency().toMillis());

// Add executor to scan dirs for proxy zips.
proxyServices.addFrequencyExecutor("ZIP Dir Scanner",
() -> zipDirScanner::scan,
dirScannerConfig.getScanFrequency().toMillis());

if (proxyConfig.getSqsConnectors() != null) {
for (final SqsConnectorConfig sqsConnectorConfig : proxyConfig.getSqsConnectors()) {
final SqsConnector sqsConnector = new SqsConnector(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,15 @@ stroom.proxy.app.ContentSyncConfig getContentSyncConfig(
stroom.proxy.app.ContentSyncConfig.class);
}

@Generated("stroom.proxy.app.guice.GenerateProxyConfigProvidersModule")
@Provides
@SuppressWarnings("unused")
stroom.proxy.app.DirScannerConfig getDirScannerConfig(
final ProxyConfigProvider proxyConfigProvider) {
return proxyConfigProvider.getConfigObject(
stroom.proxy.app.DirScannerConfig.class);
}

@Generated("stroom.proxy.app.guice.GenerateProxyConfigProvidersModule")
@Provides
@SuppressWarnings("unused")
Expand Down Expand Up @@ -175,6 +184,16 @@ stroom.proxy.app.handler.ForwardQueueConfig getForwardQueueConfigButThrow(
+ "Inject a config class that uses it or one of its sub-class instead.");
}

@Generated("stroom.proxy.app.guice.GenerateProxyConfigProvidersModule")
@Provides
@SuppressWarnings("unused")
stroom.proxy.app.handler.PathTemplateConfig getPathTemplateConfigButThrow(
final ProxyConfigProvider proxyConfigProvider) {
throw new UnsupportedOperationException(
"stroom.proxy.app.handler.PathTemplateConfig cannot be injected directly. "
+ "Inject a config class that uses it or one of its sub-class instead.");
}

@Generated("stroom.proxy.app.guice.GenerateProxyConfigProvidersModule")
@Provides
@SuppressWarnings("unused")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,19 @@ public static String makeSafeName(final FeedKey feedKey) {
return sb.toString();
}

/**
* Creates a provider of unique paths such that each directory never contains more than 999 items.
* <p>
* See also {@link NumberedDirProvider} for a non-nested directory structure.
* </p>
* <p>
* e.g. {@code root_path/2/333/555/333555777}
* </p>
*/
public static NestedNumberedDirProvider createNestedNumberedDirProvider(final Path root) {
return new NestedNumberedDirProvider(root);
}


// --------------------------------------------------------------------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@

public class FileGroup {

static final String META_FILE = "proxy.meta";
static final String ZIP_FILE = "proxy.zip";
static final String BASE_FILENAME = "proxy";
static final String META_EXTENSION = "meta";
static final String ZIP_EXTENSION = "zip";
static final String ENTRIES_EXTENSION = "entries";
static final String ENTRIES_FILE = "proxy." + ENTRIES_EXTENSION;
static final String META_FILE = BASE_FILENAME + "." + META_EXTENSION;
static final String ZIP_FILE = BASE_FILENAME + "." + ZIP_EXTENSION;
static final String ENTRIES_FILE = BASE_FILENAME + "." + ENTRIES_EXTENSION;

private final Path parentDir;
private final Path zip;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ private void move(final Path source, final Path target) throws IOException {
if (!Files.exists(source)) {
throw e;
}
DirUtil.ensureDirExists(target.getParent());
Files.createDirectories(target.getParent());
}
}
if (!success) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package stroom.proxy.app.handler;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;

/**
* A provider of unique paths such that each directory never contains more than 999 items.
*/
public class NestedNumberedDirProvider {

private final AtomicLong dirId;
private final Path root;

NestedNumberedDirProvider(final Path root) {
this.root = Objects.requireNonNull(root);
this.dirId = new AtomicLong(DirUtil.getMaxDirId(root));
}

public static NestedNumberedDirProvider create(final Path root) {
return new NestedNumberedDirProvider(root);
}

/**
* Each call to this creates a unique subdirectory of the root path.
* <p>
* e.g. {@code root_path/2/333/555/333555777}
* </p>
*
* @throws UncheckedIOException If the new path cannot be created.
*/
public Path createNumberedPath() {
final Path path = DirUtil.createPath(root, dirId.incrementAndGet());
try {
Files.createDirectories(path);
} catch (final IOException e) {
throw new UncheckedIOException(e);
}
return path;
}

@Override
public String toString() {
return "NumberedDirProvider{" +
"dirId=" + dirId +
", root=" + root +
'}';
}
}
Loading