diff --git a/stroom-meta/stroom-meta-api/src/main/java/stroom/meta/api/AttributeMapUtil.java b/stroom-meta/stroom-meta-api/src/main/java/stroom/meta/api/AttributeMapUtil.java index 03ffe3c525..4c6fbcbc25 100644 --- a/stroom-meta/stroom-meta-api/src/main/java/stroom/meta/api/AttributeMapUtil.java +++ b/stroom-meta/stroom-meta-api/src/main/java/stroom/meta/api/AttributeMapUtil.java @@ -143,6 +143,11 @@ public static AttributeMap create(final HttpServletRequest httpServletRequest, return attributeMap; } + public static void addReceiptInfo(final AttributeMap attributeMap, + final UniqueId receiptId) { + addReceiptInfo(attributeMap, Instant.now(), receiptId); + } + public static void addReceiptInfo(final AttributeMap attributeMap, final Instant receiveTime, final UniqueId receiptId) { diff --git a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/DirScannerConfig.java b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/DirScannerConfig.java new file mode 100644 index 0000000000..10510b8bd2 --- /dev/null +++ b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/DirScannerConfig.java @@ -0,0 +1,107 @@ +package stroom.proxy.app; + +import stroom.util.config.annotations.RequiresProxyRestart; +import stroom.util.shared.AbstractConfig; +import stroom.util.shared.IsProxyConfig; +import stroom.util.shared.NullSafe; +import stroom.util.time.StroomDuration; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; + +import java.util.List; +import java.util.Objects; + +/** + * Config to control the scanning of one or more directories to ingest data in proxy zip format + */ +@JsonPropertyOrder(alphabetic = true) +public class DirScannerConfig extends AbstractConfig implements IsProxyConfig { + + public static final StroomDuration DEFAULT_SCAN_FREQUENCY = 
StroomDuration.ofMinutes(1); + public static final boolean DEFAULT_ENABLED_STATE = true; + public static final List DEFAULT_DIRS = List.of("zip_file_ingest"); + public static final String DEFAULT_FAILURE_DIR = "zip_file_ingest_failed"; + + private final List dirs; + private final String failureDir; + private final boolean enabled; + private final StroomDuration scanFrequency; + + public DirScannerConfig() { + this.enabled = DEFAULT_ENABLED_STATE; + this.dirs = DEFAULT_DIRS; + this.failureDir = DEFAULT_FAILURE_DIR; + this.scanFrequency = DEFAULT_SCAN_FREQUENCY; + } + + @JsonCreator + public DirScannerConfig(@JsonProperty("dirs") final List dirs, + @JsonProperty("failureDir") final String failureDir, + @JsonProperty("enabled") final Boolean enabled, + @JsonProperty("scanFrequency") final StroomDuration scanFrequency) { + this.dirs = Objects.requireNonNullElse(dirs, DEFAULT_DIRS); + this.failureDir = Objects.requireNonNullElse(failureDir, DEFAULT_FAILURE_DIR); + this.enabled = Objects.requireNonNullElse(enabled, DEFAULT_ENABLED_STATE); + this.scanFrequency = Objects.requireNonNullElse(scanFrequency, DEFAULT_SCAN_FREQUENCY); + } + + @JsonProperty + @JsonPropertyDescription("The list of directories to scan for proxy format ZIP files. " + + "The dirs will be scanned in the order they appear in this list. No guarantee is " + + "made about the order in which ZIP files are scanned within each dir.") + public List getDirs() { + return NullSafe.list(dirs); + } + + @RequiresProxyRestart + @JsonProperty + @JsonPropertyDescription("The directory where ZIPs will be moved to if they could not be ingested.") + public String getFailureDir() { + return failureDir; + } + + @JsonProperty + @JsonPropertyDescription("Whether scanning of the directories for proxy format ZIP files is enabled or not.") + public boolean isEnabled() { + return enabled; + } + + @RequiresProxyRestart + @JsonProperty + @JsonPropertyDescription("The frequency at which scans of the directories will occur. 
All directories will " + + "be scanned on each run.") + public StroomDuration getScanFrequency() { + return scanFrequency; + } + + @Override + public String toString() { + return "DirScannerConfig{" + + "dirs=" + dirs + ", failureDir='" + failureDir + '\'' + + ", enabled=" + enabled + + ", scanFrequency=" + scanFrequency + + '}'; + } + + @Override + public boolean equals(final Object object) { + if (this == object) { + return true; + } + if (object == null || getClass() != object.getClass()) { + return false; + } + final DirScannerConfig that = (DirScannerConfig) object; + return enabled == that.enabled && Objects.equals(dirs, that.dirs) && Objects.equals(failureDir, that.failureDir) && Objects.equals( + scanFrequency, + that.scanFrequency); + } + + @Override + public int hashCode() { + return Objects.hash(dirs, failureDir, enabled, scanFrequency); + } +} diff --git a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/ProxyConfig.java b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/ProxyConfig.java index a97bf10109..39e25e98e3 100644 --- a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/ProxyConfig.java +++ b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/ProxyConfig.java @@ -51,6 +51,7 @@ public class ProxyConfig extends AbstractConfig implements IsProxyConfig { public static final String PROP_NAME_RECEIVE = "receive"; public static final String PROP_NAME_EVENT_STORE = "eventStore"; public static final String PROP_NAME_AGGREGATOR = "aggregator"; + public static final String PROP_NAME_DIR_SCANNER = "dirScanner"; public static final String PROP_NAME_FORWARD_FILE_DESTINATIONS = "forwardFileDestinations"; public static final String PROP_NAME_FORWARD_HTTP_DESTINATIONS = "forwardHttpDestinations"; public static final String PROP_NAME_LOG_STREAM = "logStream"; @@ -72,6 +73,7 @@ public class ProxyConfig extends AbstractConfig implements IsProxyConfig { private final ReceiveDataConfig receiveDataConfig; private final EventStoreConfig eventStoreConfig; private final AggregatorConfig aggregatorConfig; + private final 
DirScannerConfig dirScannerConfig; private final List forwardFileDestinations; private final List forwardHttpDestinations; private final LogStreamConfig logStreamConfig; @@ -89,6 +91,7 @@ public ProxyConfig() { new ReceiveDataConfig(), new EventStoreConfig(), new AggregatorConfig(), + new DirScannerConfig(), new ArrayList<>(), new ArrayList<>(), new LogStreamConfig(), @@ -109,6 +112,7 @@ public ProxyConfig( @JsonProperty(PROP_NAME_RECEIVE) final ReceiveDataConfig receiveDataConfig, @JsonProperty(PROP_NAME_EVENT_STORE) final EventStoreConfig eventStoreConfig, @JsonProperty(PROP_NAME_AGGREGATOR) final AggregatorConfig aggregatorConfig, + @JsonProperty(PROP_NAME_DIR_SCANNER) final DirScannerConfig dirScannerConfig, @JsonProperty(PROP_NAME_FORWARD_FILE_DESTINATIONS) final List forwardFileDestinations, @JsonProperty(PROP_NAME_FORWARD_HTTP_DESTINATIONS) final List forwardHttpDestinations, @JsonProperty(PROP_NAME_LOG_STREAM) final LogStreamConfig logStreamConfig, @@ -125,6 +129,7 @@ public ProxyConfig( this.receiveDataConfig = receiveDataConfig; this.eventStoreConfig = eventStoreConfig; this.aggregatorConfig = aggregatorConfig; + this.dirScannerConfig = dirScannerConfig; this.forwardFileDestinations = forwardFileDestinations; this.forwardHttpDestinations = forwardHttpDestinations; this.logStreamConfig = logStreamConfig; @@ -184,6 +189,11 @@ public AggregatorConfig getAggregatorConfig() { return aggregatorConfig; } + @JsonProperty(PROP_NAME_DIR_SCANNER) + public DirScannerConfig getDirScannerConfig() { + return dirScannerConfig; + } + @RequiresProxyRestart @JsonProperty(PROP_NAME_FORWARD_FILE_DESTINATIONS) public List getForwardFileDestinations() { @@ -352,6 +362,7 @@ public static class Builder { private ReceiveDataConfig receiveDataConfig = new ReceiveDataConfig(); private EventStoreConfig eventStoreConfig = new EventStoreConfig(); private AggregatorConfig aggregatorConfig = new AggregatorConfig(); + private DirScannerConfig dirScannerConfig = new DirScannerConfig(); 
private final List forwardFileDestinations = new ArrayList<>(); private final List forwardHttpDestinations = new ArrayList<>(); private LogStreamConfig logStreamConfig = new LogStreamConfig(); @@ -359,7 +370,7 @@ public static class Builder { private FeedStatusConfig feedStatusConfig = new FeedStatusConfig(); private ThreadConfig threadConfig = new ThreadConfig(); private ProxySecurityConfig proxySecurityConfig = new ProxySecurityConfig(); - private List sqsConnectors = new ArrayList<>(); + private final List sqsConnectors = new ArrayList<>(); private Builder() { @@ -400,6 +411,11 @@ public Builder aggregatorConfig(final AggregatorConfig aggregatorConfig) { return this; } + public Builder dirScannerConfig(final DirScannerConfig dirScannerConfig) { + this.dirScannerConfig = dirScannerConfig; + return this; + } + public Builder addForwardFileDestination(final ForwardFileConfig forwarderFileConfig) { this.forwardFileDestinations.add(forwarderFileConfig); return this; @@ -465,6 +481,7 @@ public ProxyConfig build() { receiveDataConfig, eventStoreConfig, aggregatorConfig, + dirScannerConfig, forwardFileDestinations, forwardHttpDestinations, logStreamConfig, diff --git a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/ProxyLifecycle.java b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/ProxyLifecycle.java index 1bc22f79ee..ca82e29000 100644 --- a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/ProxyLifecycle.java +++ b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/ProxyLifecycle.java @@ -2,6 +2,7 @@ import stroom.proxy.app.event.EventStore; import stroom.proxy.app.event.EventStoreConfig; +import stroom.proxy.app.handler.ZipDirScanner; import stroom.proxy.repo.ProxyServices; import stroom.receive.common.ReceiptIdGenerator; @@ -15,22 +16,30 @@ public class ProxyLifecycle implements Managed { @Inject public ProxyLifecycle(final ProxyConfig proxyConfig, - final EventStoreConfig eventStoreConfig, final Provider 
eventStoreProvider, + final ZipDirScanner zipDirScanner, final ProxyServices proxyServices, final ReceiptIdGenerator receiptIdGenerator) { this.proxyServices = proxyServices; + final EventStoreConfig eventStoreConfig = proxyConfig.getEventStoreConfig(); + final DirScannerConfig dirScannerConfig = proxyConfig.getDirScannerConfig(); // Add executor to roll event store. final EventStore eventStore = eventStoreProvider.get(); proxyServices.addFrequencyExecutor("Event Store - roll", () -> eventStore::tryRoll, eventStoreConfig.getRollFrequency().toMillis()); + // Add executor to forward event store. proxyServices.addFrequencyExecutor("Event Store - forward", () -> eventStore::forwardAll, eventStoreConfig.getRollFrequency().toMillis()); + // Add executor to scan dirs for proxy zips. + proxyServices.addFrequencyExecutor("ZIP Dir Scanner", + () -> zipDirScanner::scan, + dirScannerConfig.getScanFrequency().toMillis()); + if (proxyConfig.getSqsConnectors() != null) { for (final SqsConnectorConfig sqsConnectorConfig : proxyConfig.getSqsConnectors()) { final SqsConnector sqsConnector = new SqsConnector( diff --git a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/guice/ProxyConfigProvidersModule.java b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/guice/ProxyConfigProvidersModule.java index c5c261f2a8..63701a72d3 100644 --- a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/guice/ProxyConfigProvidersModule.java +++ b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/guice/ProxyConfigProvidersModule.java @@ -24,6 +24,15 @@ stroom.proxy.app.ContentSyncConfig getContentSyncConfig( stroom.proxy.app.ContentSyncConfig.class); } + @Generated("stroom.proxy.app.guice.GenerateProxyConfigProvidersModule") + @Provides + @SuppressWarnings("unused") + stroom.proxy.app.DirScannerConfig getDirScannerConfig( + final ProxyConfigProvider proxyConfigProvider) { + return proxyConfigProvider.getConfigObject( + 
stroom.proxy.app.DirScannerConfig.class); + } + @Generated("stroom.proxy.app.guice.GenerateProxyConfigProvidersModule") @Provides @SuppressWarnings("unused") @@ -175,6 +184,16 @@ stroom.proxy.app.handler.ForwardQueueConfig getForwardQueueConfigButThrow( + "Inject a config class that uses it or one of its sub-class instead."); } + @Generated("stroom.proxy.app.guice.GenerateProxyConfigProvidersModule") + @Provides + @SuppressWarnings("unused") + stroom.proxy.app.handler.PathTemplateConfig getPathTemplateConfigButThrow( + final ProxyConfigProvider proxyConfigProvider) { + throw new UnsupportedOperationException( + "stroom.proxy.app.handler.PathTemplateConfig cannot be injected directly. " + + "Inject a config class that uses it or one of its sub-class instead."); + } + @Generated("stroom.proxy.app.guice.GenerateProxyConfigProvidersModule") @Provides @SuppressWarnings("unused") diff --git a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/DirUtil.java b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/DirUtil.java index 469e7cc7ba..bcffabbec9 100644 --- a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/DirUtil.java +++ b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/DirUtil.java @@ -666,6 +666,19 @@ public static String makeSafeName(final FeedKey feedKey) { return sb.toString(); } + /** + * Creates a provider of unique paths such that each directory never contains more than 999 items. + *

+ * See also {@link NumberedDirProvider} for a non-nested directory structure. + *

+ *

+ * e.g. {@code root_path/2/333/555/333555777} + *

+ */ + public static NestedNumberedDirProvider createNestedNumberedDirProvider(final Path root) { + return new NestedNumberedDirProvider(root); + } + // -------------------------------------------------------------------------------- diff --git a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/FileGroup.java b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/FileGroup.java index 6300836001..34e46f9448 100644 --- a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/FileGroup.java +++ b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/FileGroup.java @@ -5,10 +5,13 @@ public class FileGroup { - static final String META_FILE = "proxy.meta"; - static final String ZIP_FILE = "proxy.zip"; + static final String BASE_FILENAME = "proxy"; + static final String META_EXTENSION = "meta"; + static final String ZIP_EXTENSION = "zip"; static final String ENTRIES_EXTENSION = "entries"; - static final String ENTRIES_FILE = "proxy." + ENTRIES_EXTENSION; + static final String META_FILE = BASE_FILENAME + "." + META_EXTENSION; + static final String ZIP_FILE = BASE_FILENAME + "." + ZIP_EXTENSION; + static final String ENTRIES_FILE = BASE_FILENAME + "." 
+ ENTRIES_EXTENSION; private final Path parentDir; private final Path zip; diff --git a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/ForwardFileDestinationImpl.java b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/ForwardFileDestinationImpl.java index 5156bf4e49..aada9a9448 100644 --- a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/ForwardFileDestinationImpl.java +++ b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/ForwardFileDestinationImpl.java @@ -353,7 +353,7 @@ private void move(final Path source, final Path target) throws IOException { if (!Files.exists(source)) { throw e; } - DirUtil.ensureDirExists(target.getParent()); + Files.createDirectories(target.getParent()); } } if (!success) { diff --git a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/NestedNumberedDirProvider.java b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/NestedNumberedDirProvider.java new file mode 100644 index 0000000000..7028bdac1f --- /dev/null +++ b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/NestedNumberedDirProvider.java @@ -0,0 +1,52 @@ +package stroom.proxy.app.handler; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicLong; + +/** + * A provider of unique paths such that each directory never contains more than 999 items. + */ +public class NestedNumberedDirProvider { + + private final AtomicLong dirId; + private final Path root; + + NestedNumberedDirProvider(final Path root) { + this.root = Objects.requireNonNull(root); + this.dirId = new AtomicLong(DirUtil.getMaxDirId(root)); + } + + public static NestedNumberedDirProvider create(final Path root) { + return new NestedNumberedDirProvider(root); + } + + /** + * Each call to this creates a unique subdirectory of the root path. + *

+ * e.g. {@code root_path/2/333/555/333555777} + *

+ * + * @throws UncheckedIOException If the new path cannot be created. + */ + public Path createNumberedPath() { + final Path path = DirUtil.createPath(root, dirId.incrementAndGet()); + try { + Files.createDirectories(path); + } catch (final IOException e) { + throw new UncheckedIOException(e); + } + return path; + } + + @Override + public String toString() { + return "NumberedDirProvider{" + + "dirId=" + dirId + + ", root=" + root + + '}'; + } +} diff --git a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/NumberedDirProvider.java b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/NumberedDirProvider.java index 19679a288a..ce09b695bc 100644 --- a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/NumberedDirProvider.java +++ b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/NumberedDirProvider.java @@ -16,6 +16,17 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Stream; +/** + * Creates unique non-nested directories as direct children of parentDir, where each child + * is a number padded to 10 digits. + *

+ * See also {@link DirUtil#createNestedNumberedDirProvider(Path)} for a nested directory + * structure with <= 999 items per directory. + *

+ *

+ * e.g. {@code parent_dir/0000000123} + *

+ */ public class NumberedDirProvider { private static final LambdaLogger LOGGER = LambdaLoggerFactory.getLogger(NumberedDirProvider.class); @@ -83,6 +94,7 @@ static String create(final long num) { * Find all direct child directories in path. */ private static Stream findDirectories(final Path path) throws IOException { + //noinspection resource // See javadoc. return Files.find(path, 1, (aPath, basicFileAttributes) -> { LOGGER.trace(() -> LogUtil.message("aPath: {}, isDirectory {}", diff --git a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/ProxyReceiptIdGenerator.java b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/ProxyReceiptIdGenerator.java index 2d2a75f726..f8dfbf05c2 100644 --- a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/ProxyReceiptIdGenerator.java +++ b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/ProxyReceiptIdGenerator.java @@ -31,6 +31,9 @@ public ProxyReceiptIdGenerator(final ProxyId proxyId) { receiptIdGenerator = new UniqueIdGenerator(NODE_TYPE, proxyId.getId()); } + /** + * For testing + */ public ProxyReceiptIdGenerator(final Supplier nodeIdSupplier) { final String nodeId = Objects.requireNonNull(nodeIdSupplier).get(); LOGGER.info("Creating receiptIdGenerator for proxyId '{}'", nodeId); diff --git a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/RetryingForwardDestination.java b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/RetryingForwardDestination.java index 66679a4f9a..4d01d7302c 100644 --- a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/RetryingForwardDestination.java +++ b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/RetryingForwardDestination.java @@ -44,7 +44,7 @@ public class RetryingForwardDestination implements ForwardDestination { /** * File to hold the log of all forwarding errors from all forward attempts for this {@link FileGroup} */ - private 
static final String ERROR_LOG_FILENAME = "error.log"; + static final String ERROR_LOG_FILENAME = "error.log"; /** * Holds the state relating to retries. Held in binary form. */ diff --git a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/ZipDirScanner.java b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/ZipDirScanner.java new file mode 100644 index 0000000000..0ddfb24933 --- /dev/null +++ b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/ZipDirScanner.java @@ -0,0 +1,446 @@ +package stroom.proxy.app.handler; + +import stroom.data.zip.StroomZipFileType; +import stroom.meta.api.AttributeMap; +import stroom.meta.api.AttributeMapUtil; +import stroom.meta.api.StandardHeaderArguments; +import stroom.proxy.app.DirScannerConfig; +import stroom.receive.common.ReceiptIdGenerator; +import stroom.util.concurrent.UniqueId; +import stroom.util.io.FileUtil; +import stroom.util.io.PathCreator; +import stroom.util.logging.DurationTimer; +import stroom.util.logging.LambdaLogger; +import stroom.util.logging.LambdaLoggerFactory; +import stroom.util.logging.LogUtil; +import stroom.util.shared.NullSafe; + +import jakarta.inject.Inject; +import jakarta.inject.Provider; +import jakarta.inject.Singleton; +import org.apache.commons.compress.utils.FileNameUtils; + +import java.io.IOException; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Stream; + +/** + * Scans one or more configured directories in sequence looking for ZIP files to ingest as if they had + * been forwarded from another proxy. + * Will recurse into subdirectories. 
Once the scan is complete any empty directories inside the configured + * root directories will be deleted. + */ +@Singleton // So we can synchronise +public class ZipDirScanner { + + private static final LambdaLogger LOGGER = LambdaLoggerFactory.getLogger(ZipDirScanner.class); + private static final Set SIDECAR_EXTENSIONS = Set.of( + FileGroup.META_EXTENSION, + FileGroup.ENTRIES_EXTENSION); + private static final Set SIDECAR_FILENAMES = Set.of( + RetryingForwardDestination.ERROR_LOG_FILENAME); + + private final Provider dirScannerConfigProvider; + private final PathCreator pathCreator; + private final ZipReceiver zipReceiver; + private final ReceiptIdGenerator receiptIdGenerator; + private final NestedNumberedDirProvider failureDirProvider; + private final Path failureDir; + + @Inject + public ZipDirScanner(final Provider dirScannerConfigProvider, + final PathCreator pathCreator, + final ZipReceiver zipReceiver, + final ReceiptIdGenerator receiptIdGenerator) { + this.dirScannerConfigProvider = dirScannerConfigProvider; + this.pathCreator = pathCreator; + this.zipReceiver = zipReceiver; + this.receiptIdGenerator = receiptIdGenerator; + + this.failureDir = pathCreator.toAppPath(dirScannerConfigProvider.get().getFailureDir()); + FileUtil.ensureDirExists(failureDir); + this.failureDirProvider = NestedNumberedDirProvider.create(failureDir); + } + + public synchronized void scan() { + try { + final DirScannerConfig dirScannerConfig = dirScannerConfigProvider.get(); + if (dirScannerConfig.isEnabled()) { + final List dirs = getPathsToScan(dirScannerConfig); + if (NullSafe.hasItems(dirs)) { + final ScanResult scanResult = new ScanResult(); + final DurationTimer timer = DurationTimer.start(); + for (final Path dir : dirs) { + // This will log and swallow any exceptions to ensure we can keep processing + scanDir(dir, scanResult); + } + if (!scanResult.isEmpty()) { + LOGGER.info("Completed scan for ZIP files to ingest, success: {}, failed: {}, " + + "unknown files: {}, duration: {}", + scanResult.successCount, 
scanResult.failCount, scanResult.unknownCount, timer); + } else { + LOGGER.debug("Completed scan for ZIP files to ingest, success: {}, failed: {}, " + + "unknown files: {}, duration: {}", + scanResult.successCount, scanResult.failCount, scanResult.unknownCount, timer); + } + } else { + LOGGER.debug("scan() - No dirs to scan"); + } + } else { + LOGGER.debug("scan() - disabled"); + } + } catch (final Exception e) { + logError(e); + // We need to swallow the error so the scheduled executor can try again next time + } + } + + private void logError(final Exception e) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("scan() - Error during scan - {}", + LogUtil.exceptionMessage(e), e); + } else { + LOGGER.error("scan() - Error during scan - {} (enable DEBUG for stack trace)", + LogUtil.exceptionMessage(e)); + } + } + + private void processZipFile(final Path zipFile, final ScanResult scanResult) { + LOGGER.debug("processZipFile() - {}", zipFile); + final ZipGroup zipGroup = createZipGroup(zipFile); + // Need to swallow all exceptions, so we don't halt the dir walking + try { + final AttributeMap attributeMap = createAttributeMap(zipGroup); + + zipReceiver.receive(zipFile, attributeMap); + // receive will have cloned our zip, so as there were no problems, we can now delete it + // and the other files in the group + deleteZipGroup(zipGroup); + scanResult.incrementSuccessCount(); + LOGGER.info("Ingested ZIP file {}", zipFile); + } catch (final Exception e) { + scanResult.incrementFailCount(); + final Path destDir = failureDirProvider.createNumberedPath(); + if (Files.exists(zipFile)) { + LOGGER.error("Error processing zipFile {}, moving it (and any associated sidecar files) into {} - {}", + zipFile, destDir, LogUtil.exceptionMessage(e), e); + // Move the zip and its associated sidecar files to a failure dir + zipGroup.streamPaths() + .forEach(sourceFile -> { + final Path destFile = destDir.resolve(sourceFile.getFileName()); + try { + Files.move(sourceFile, destFile); + } 
catch (final IOException ex) { + LOGGER.error("Error moving failed file from {} to {}", sourceFile, destFile, ex); + } + }); + } else { + LOGGER.error("Error processing zipFile {} - {}", zipFile, LogUtil.exceptionMessage(e), e); + LOGGER.debug("processZipFile() - zipFile {} doesn't exist", zipFile); + } + } + } + + private void deleteZipGroup(final ZipGroup zipGroup) { + zipGroup.streamPaths() + .forEach(path -> { + try { + LOGGER.debug("deleteZipGroup() - Deleting file {}", path); + Files.delete(path); + } catch (final Exception e) { + LOGGER.error("Error deleting file {}. This file needs to be manually deleted " + + "or it risks being re-ingested. - {}", path, LogUtil.exceptionMessage(e), e); + } + }); + } + + private ZipGroup createZipGroup(final Path zipFile) { + // We may be dealing with just a zip file or a set of files moved in from the forward failure + // dir, e.g. + // ./03_failure/20251014/BAD_FEED/0/001/proxy.zip + // ./03_failure/20251014/BAD_FEED/0/001/proxy.meta + // ./03_failure/20251014/BAD_FEED/0/001/proxy.entries + // ./03_failure/20251014/BAD_FEED/0/001/error.log + // We assume that apart from the error.log file (which has a specific name), any sidecar files + // will have the same base name as our zip + + final Path parentDir = zipFile.getParent(); + final String baseName = FileNameUtils.getBaseName(zipFile); + Path metaFile = parentDir.resolve(baseName + "." + StroomZipFileType.META.getExtension()); + if (!Files.isRegularFile(metaFile)) { + metaFile = null; + } + // This one has a specific name + Path errorFile = parentDir.resolve(RetryingForwardDestination.ERROR_LOG_FILENAME); + if (!Files.isRegularFile(errorFile)) { + errorFile = null; + } + + Path entriesFile = parentDir.resolve(baseName + "." 
+ FileGroup.ENTRIES_EXTENSION); + if (!Files.isRegularFile(entriesFile)) { + entriesFile = null; + } + final ZipGroup zipGroup = new ZipGroup(zipFile, metaFile, errorFile, entriesFile); + LOGGER.debug("createZipGroup() - zipGroup: {}", zipGroup); + return zipGroup; + } + + private AttributeMap createAttributeMap(final ZipGroup zipGroup) throws IOException { + final AttributeMap attributeMap = new AttributeMap(); + if (zipGroup.hasMetaFile() && Files.isRegularFile(zipGroup.metaFile)) { + AttributeMapUtil.read(zipGroup.metaFile, attributeMap); + LOGGER.debug("createAttributeMap() - Read attributes from {}, attributeMap: {}", + zipGroup.metaFile, attributeMap); + } else { + LOGGER.debug("createAttributeMap() - File {} not found, creating minimal attributeMap", + zipGroup.metaFile); + // We may already have a receiptId, but this will just set a new one and append + // to ReceiptIdPath + final UniqueId receiptId = receiptIdGenerator.generateId(); + AttributeMapUtil.addReceiptInfo(attributeMap, receiptId); + } + // Make sure we have a GUID + if (NullSafe.isEmptyString(attributeMap.get(StandardHeaderArguments.GUID))) { + attributeMap.put(StandardHeaderArguments.GUID, UUID.randomUUID().toString()); + } + return attributeMap; + } + + private void scanDir(final Path rootDir, final ScanResult scanResult) { + LOGGER.debug("scanDir() - {}", rootDir); + final DurationTimer timer = DurationTimer.start(); + final Deque unknownFiles = new ArrayDeque<>(); + + try { + Files.walkFileTree(rootDir, new SimpleFileVisitor<>() { + @Override + public FileVisitResult visitFile(final Path file, final BasicFileAttributes attrs) throws IOException { + if (isZipFile(file)) { + LOGGER.debug("scanDir() - Found ZIP file {}", file); + // This will log and swallow, so we can carry on walking + processZipFile(file, scanResult); + } else if (isSidecarFile(file)) { + LOGGER.debug("scanDir() - Found sidecar file {}", file); + // This will get handled by processZipFile() + } else if 
(Files.isRegularFile(file)) { + LOGGER.debug("scanDir() - Unknown file {}", file); + unknownFiles.push(file); + scanResult.incrementUnknownCount(); +// LOGGER.warn("Found file that is not a ZIP file ({}), it will be ignored. " + +// "You should remove this file to stop seeing this message.", file); + } + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult visitFileFailed(final Path path, final IOException exc) throws IOException { + if (isZipFile(path)) { + scanResult.incrementFailCount(); + LOGGER.error("scanDir() - unable to read zip file {}. Unable to move it to {}. " + + "Might be a permissions issue.: {}", + path, failureDir, LogUtil.exceptionMessage(exc)); + } else { + LOGGER.debug(() -> LogUtil.message( + "scanDir() - unable to read file/dir {}. This may be because the file has been " + + "deleted after successful processing of the associated zip file.: {}", + path, LogUtil.exceptionMessage(exc), exc)); + } + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult postVisitDirectory(final Path dir, final IOException exc) throws IOException { + // By this point all the sidecar files should have been dealt with + if (!unknownFiles.isEmpty()) { + Path unknownFile; + while (true) { + unknownFile = unknownFiles.poll(); + if (unknownFile != null) { + final Path destPath = moveToFailureDir(unknownFile); + LOGGER.info("Found unknown file {}, moved it to {}", unknownFile, destPath); + } else { + break; + } + } + } + return super.postVisitDirectory(dir, exc); + } + }); + LOGGER.debug("scanDir() - Scanned {} for ZIP files in {}", rootDir, timer); + } catch (final Exception e) { + LOGGER.error("scanDir() - unable to read ZIP file {}: {}", + rootDir, LogUtil.exceptionMessage(e)); + // Just log and swallow, so we can move on to the next dir to scan + } finally { + // This will not throw + FileUtil.deleteEmptyDirs(rootDir); + } + } + + /** + * @return The path after the move, or null if it could not be moved. 
+ */ + private Path moveToFailureDir(final Path sourceFile) { + final Path destDir = failureDirProvider.createNumberedPath(); + final Path destFile = destDir.resolve(sourceFile.getFileName()); + try { + Files.move(sourceFile, destFile); + return destFile; + } catch (final IOException ex) { + LOGGER.error("Error moving failed file from {} to {}", sourceFile, destFile, ex); + } + return null; + } + + private void deleteDirectoryIfEmpty(final Path dir) { + try { + if (Files.isDirectory(dir) && Files.isWritable(dir)) { + final boolean isEmpty; + try (final Stream entries = Files.list(dir)) { + isEmpty = entries.findAny().isEmpty(); + } + if (isEmpty) { + // May fail if something has been dropped in since we checked, but that is OK + // as we will check it again on next run. + Files.delete(dir); + } + } + } catch (final IOException e) { + // Just swallow it + LOGGER.debug(() -> LogUtil.message("Unable to delete directory {} - ", + dir, LogUtil.exceptionMessage(e), e)); + } + } + + private boolean isZipFile(final Path path) { + Objects.requireNonNull(path); + return FileGroup.ZIP_EXTENSION.equalsIgnoreCase(FileNameUtils.getExtension(path)) + && Files.isRegularFile(path); + } + + private boolean isSidecarFile(final Path path) throws IOException { + final Path parentDir = path.getParent(); + final String baseName = FileNameUtils.getBaseName(path); + final String extension = FileNameUtils.getExtension(path); + + if (SIDECAR_FILENAMES.contains(path.getFileName().toString())) { + // e.g. error.log + try (final Stream pathStream = Files.list(parentDir)) { + final boolean foundMatchingZip = pathStream.filter(Files::isRegularFile) + .anyMatch(aPath -> { + final String anExtension = FileNameUtils.getExtension(aPath); + return FileGroup.ZIP_EXTENSION.equalsIgnoreCase(anExtension); + }); + LOGGER.debug("isSidecarFile() - path: {}, foundMatchingZip: {}", path, foundMatchingZip); + return foundMatchingZip; + } + } else if (SIDECAR_EXTENSIONS.contains(extension)) { + // e.g. 
proxy.meta, proxy.entries + try (final Stream pathStream = Files.list(parentDir)) { + final boolean foundMatchingZip = pathStream.filter(Files::isRegularFile) + .filter(aPath -> { + final String aBaseName = FileNameUtils.getBaseName(aPath); + return Objects.equals(aBaseName, baseName); + }) + .anyMatch(aPath -> { + final String anExtension = FileNameUtils.getExtension(aPath); + return FileGroup.ZIP_EXTENSION.equalsIgnoreCase(anExtension); + }); + LOGGER.debug("isSidecarFile() - path: {}, foundMatchingZip: {}", path, foundMatchingZip); + return foundMatchingZip; + } + } else { + return false; + } + } + + private List getPathsToScan(final DirScannerConfig dirScannerConfig) { + + return NullSafe.stream(dirScannerConfig.getDirs()) + .map(pathCreator::toAppPath) + .filter(path -> { + try { + FileUtil.ensureDirExists(path); + } catch (final Exception e) { + LOGGER.error("Error ensuring directory {} exists - {}", + path, LogUtil.exceptionMessage(e), e); + return false; + } + return true; + }) + .toList(); + } + + + // -------------------------------------------------------------------------------- + + + private static class ScanResult { + + private int successCount; + private int failCount; + private int unknownCount; + + int getTotalCount() { + return successCount + failCount; + } + + void incrementSuccessCount() { + successCount += 1; + } + + void incrementFailCount() { + failCount += 1; + } + + void incrementUnknownCount() { + unknownCount += 1; + } + + boolean isEmpty() { + return successCount == 0 + && failCount == 0 + && unknownCount == 0; + } + + @Override + public String toString() { + return "ScanResult{" + + "successCount=" + successCount + + ", failCount=" + failCount + + ", unknownCount=" + unknownCount + + '}'; + } + } + + + // -------------------------------------------------------------------------------- + + + private record ZipGroup(Path zipFile, Path metaFile, Path errorFile, Path entriesFile) { + + private ZipGroup { + Objects.requireNonNull(zipFile); + 
} + + boolean hasMetaFile() { + return metaFile != null; + } + + Stream streamPaths() { + return Stream.of(zipFile, metaFile, errorFile, entriesFile) + .filter(Objects::nonNull) + .filter(Files::exists); + } + } +} diff --git a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/ZipReceiver.java b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/ZipReceiver.java index c2d04c658a..8573dd1150 100644 --- a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/ZipReceiver.java +++ b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/ZipReceiver.java @@ -23,6 +23,7 @@ import stroom.util.logging.LambdaLogger; import stroom.util.logging.LambdaLoggerFactory; import stroom.util.logging.LogUtil; +import stroom.util.net.HostNameUtil; import stroom.util.zip.ZipUtil; import jakarta.inject.Inject; @@ -55,11 +56,14 @@ /** *

* This class deals with the reception of zip files. It will perform the following tasks: - * 1. Streams the zip file to disk, recording zip entries and reading meta data as it goes. - * 2. It tries to match all data entries to associated meta data. - * 3. It finds all unique feeds and checks the status for each feed. An entries file is created containing + * 1. Writes the inputStream to a temporary zip file on local disk. + * 2. It then clones this temporary zip to a zip file in a managed directory, updating the .meta + * files with the headers. All other entries are unchanged. In the process it records what + * entries are in the zip and what feed/type they belong to. + * 3. It tries to match all data entries to associated meta data. + * 4. It finds all unique feeds and checks the status for each feed. An entries file is created containing * a line for all allowed entries in the zip, which will be used by {@link ZipSplitter} to split the zips. - * 4. If the zip contains multiple feedKeys or is not in proper proxy zip format it passes + * 5. If the zip contains multiple feedKeys or is not in proper proxy zip format it passes * it to the ZipSplitter, else if passed it to the destination. *

* Along with the final zip files there will be associated meta files written to use for forwarding. @@ -113,6 +117,70 @@ private NumberedDirProvider createDirProvider(final DataDirProvider dataDirProvi return new NumberedDirProvider(dir); } + /** + * Receive a proxy zip file that is located on disk. + * Caller is responsible for deciding what to do with sourceZipFile after this method + * returns successfully, e.g. deleting it. + * + * @param sourceZipFile The zip file. + * @param attributeMap Additional attributes that are global to all entries in the zip. + */ + public void receive(final Path sourceZipFile, + final AttributeMap attributeMap) { + Objects.requireNonNull(sourceZipFile); + Objects.requireNonNull(attributeMap); + if (!Files.isRegularFile(sourceZipFile)) { + throw new RuntimeException(LogUtil.message( + "Zip file '{}' is not a regular file or does not exist", sourceZipFile)); + } + + final Instant startTime = Instant.now(); + final Path receivingDir; + final ReceiveResult receiveResult; + try { + receivingDir = receivingDirProvider.get(); + final FileGroup destFileGroup = new FileGroup(receivingDir); + final Path destZipFile = destFileGroup.getZip(); + final long receivedBytes = Files.size(sourceZipFile); + try { + receiveResult = receiveZipStream( + attributeMap, + sourceZipFile, + destZipFile, + receivedBytes); + } catch (final Exception e) { + LOGGER.debug(() -> LogUtil.exceptionMessage(e), e); + // Cleanup. 
+ Files.deleteIfExists(destZipFile); + deleteDir(receivingDir); + throw StroomStreamException.create(e, attributeMap); + } + + handleReceiveResult(attributeMap, receiveResult, destFileGroup, receivingDir, destZipFile); + } catch (final IOException e) { + throw StroomStreamException.create(e, attributeMap); + } + + final Duration duration = Duration.between(startTime, Instant.now()); + logStream.log( + RECEIVE_LOG, + attributeMap, + EventType.RECEIVE, + pathToUri(sourceZipFile), + StroomStatusCode.OK, + attributeMap.get(StandardHeaderArguments.RECEIPT_ID), + receiveResult.receivedBytes, + duration.toMillis()); + } + + final String pathToUri(final Path path) { + return String.join( + "/", + "file:/", + HostNameUtil.determineHostName(), + path.toAbsolutePath().toString()); + } + @Override public void receive(final Instant startTime, final AttributeMap attributeMap, @@ -123,65 +191,22 @@ public void receive(final Instant startTime, final ReceiveResult receiveResult; try { receivingDir = receivingDirProvider.get(); - final FileGroup fileGroup = new FileGroup(receivingDir); - final Path sourceZip = fileGroup.getZip(); + final FileGroup destFileGroup = new FileGroup(receivingDir); + final Path destZipFile = destFileGroup.getZip(); try { receiveResult = receiveZipStream( inputStreamSupplier.get(), attributeMap, - sourceZip); + destZipFile); } catch (final Exception e) { LOGGER.debug(() -> LogUtil.exceptionMessage(e), e); // Cleanup. - Files.deleteIfExists(sourceZip); + Files.deleteIfExists(destZipFile); deleteDir(receivingDir); throw StroomStreamException.create(e, attributeMap); } - if (LOGGER.isDebugEnabled() && receiveResult.feedGroups.size() > 1) { - // Log if we received a multi feed zip. - logFeedGroupsToDebug(receiveResult); - } - - // Check all the feeds are OK. - final Map> allowedEntries = filterAllowedEntries( - attributeMap, receiveResult); - - // Only keep data for allowed feeds. 
- if (!allowedEntries.isEmpty()) { - // Write out the allowed entries so the destination knows which entries are in the zip - // that are allowed to be used, i.e. so zipSplitter can drop zip entries that have no - // corresponding entry in the entries file - writeZipEntryGroups(fileGroup.getEntries(), allowedEntries); - - // If the data we received was for a perfectly formed zip file with data for a single feed then don't - // bother to rewrite it in the zipSplitter. - final int feedGroupCount = receiveResult.feedGroups.size(); - if (receiveResult.valid && feedGroupCount == 1) { - final FeedKey feedKey = allowedEntries.keySet().iterator().next(); - - // Write meta. Single feed/type so add them to the attr map - AttributeMapUtil.addFeedAndType(attributeMap, feedKey.feed(), feedKey.type()); - AttributeMapUtil.write(attributeMap, fileGroup.getMeta()); - - // Move receiving dir to destination. - LOGGER.debug("Pass {} with feedKey: {} to destination {}", receivingDir, feedKey, destination); - destination.accept(receivingDir); - } else { - // We have more than one feed in the source zip so split the source into a zip file for each feed. - // Before we can queue the zip for splitting we need to serialise the attr map, so it is - // available for the split process. - AttributeMapUtil.write(attributeMap, fileGroup.getMeta()); - LOGGER.debug(() -> LogUtil.message("Pass {} to zipSplitter, isValid: {}, feedGroupCount: {}", - receivingDir, receiveResult.valid, feedGroupCount)); - zipSplitter.add(receivingDir); - } - } else { - LOGGER.debug("No allowed feedKeys, all are dropped"); - // Delete the source zip. 
- Files.delete(sourceZip); - deleteDir(receivingDir); - } + handleReceiveResult(attributeMap, receiveResult, destFileGroup, receivingDir, destZipFile); } catch (final IOException e) { throw StroomStreamException.create(e, attributeMap); } @@ -198,6 +223,57 @@ public void receive(final Instant startTime, duration.toMillis()); } + private void handleReceiveResult(final AttributeMap attributeMap, + final ReceiveResult receiveResult, + final FileGroup fileGroup, + final Path receivingDir, + final Path sourceZip) throws IOException { + if (LOGGER.isDebugEnabled() && receiveResult.feedGroups.size() > 1) { + // Log if we received a multi feed zip. + logFeedGroupsToDebug(receiveResult); + } + + // Check all the feeds are OK. + final Map> allowedEntries = filterAllowedEntries( + attributeMap, receiveResult); + + // Only keep data for allowed feeds. + if (!allowedEntries.isEmpty()) { + // Write out the allowed entries so the destination knows which entries are in the zip + // that are allowed to be used, i.e. so zipSplitter can drop zip entries that have no + // corresponding entry in the entries file + writeZipEntryGroups(fileGroup.getEntries(), allowedEntries); + + // If the data we received was for a perfectly formed zip file with data for a single feed then don't + // bother to rewrite it in the zipSplitter. + final int feedGroupCount = receiveResult.feedGroups.size(); + if (receiveResult.valid && feedGroupCount == 1) { + final FeedKey feedKey = allowedEntries.keySet().iterator().next(); + + // Write meta. Single feed/type so add them to the attr map + AttributeMapUtil.addFeedAndType(attributeMap, feedKey.feed(), feedKey.type()); + AttributeMapUtil.write(attributeMap, fileGroup.getMeta()); + + // Move receiving dir to destination. 
+ LOGGER.debug("Pass {} with feedKey: {} to destination {}", receivingDir, feedKey, destination); + destination.accept(receivingDir); + } else { + // We have more than one feed in the source zip so split the source into a zip file for each feed. + // Before we can queue the zip for splitting we need to serialise the attr map, so it is + // available for the split process. + AttributeMapUtil.write(attributeMap, fileGroup.getMeta()); + LOGGER.debug(() -> LogUtil.message("Pass {} to zipSplitter, isValid: {}, feedGroupCount: {}", + receivingDir, receiveResult.valid, feedGroupCount)); + zipSplitter.add(receivingDir); + } + } else { + LOGGER.debug("No allowed feedKeys, all are dropped"); + // Delete the source zip. + Files.delete(sourceZip); + deleteDir(receivingDir); + } + } + private Map> filterAllowedEntries(final AttributeMap attributeMap, final ReceiveResult receiveResult) { final Map> allowed = new HashMap<>(); @@ -262,42 +338,56 @@ private void deleteDir(final Path path) { */ static ReceiveResult receiveZipStream(final InputStream inputStream, final AttributeMap attributeMap, - final Path zipFilePath) throws IOException { - LOGGER.debug("receiveZipStream() - START zipFilePath: {}", zipFilePath); - final DurationTimer timer = LogUtil.startTimerIfDebugEnabled(LOGGER); - final String defaultFeedName = attributeMap.get(StandardHeaderArguments.FEED); - final String defaultTypeName = attributeMap.get(StandardHeaderArguments.TYPE); - final FeedKeyInterner feedKeyInterner = FeedKey.createInterner(); - final FeedKey defaultFeedKey = feedKeyInterner.intern(defaultFeedName, defaultTypeName); - - final Map baseNameToGroupMap = new HashMap<>(); - final ProxyZipValidator validator = new ProxyZipValidator(); - final List dataEntries = new ArrayList<>(); + final Path destZipFile) throws IOException { + LOGGER.debug("receiveZipStream() - destZipFile: {}, attributeMap: {}", destZipFile, attributeMap); // Create a .zip.staging file for the inputStream to be written to. 
We can then // copy what we want out of that zip into a new zip at zipFilePath. // Don't use a temp dir as these files may be very big, so just make it a sibling. - final Path stagingZipFile = zipFilePath.resolveSibling(zipFilePath.getFileName() + ".staging"); + final Path stagingZipFile = destZipFile.resolveSibling(destZipFile.getFileName() + ".staging"); + final long receivedBytes; try { // Write the stream to disk, because reading the stream as a ZipArchiveInputStream is risky // as it can't read the central directory at the end of the stream, so it doesn't know which // entries are actually valid and doesn't know the uncompressed sizes. receivedBytes = writeStreamToFile(inputStream, stagingZipFile); - - // Clone the zip with added/updated meta entries - cloneZipFileWithUpdatedMeta( - attributeMap, - defaultFeedKey, - feedKeyInterner, - baseNameToGroupMap, - validator, - dataEntries, - stagingZipFile, - zipFilePath); + return receiveZipStream(attributeMap, stagingZipFile, destZipFile, receivedBytes); } finally { Files.deleteIfExists(stagingZipFile); } + } + + /** + * Static and pkg private to aid testing + */ + static ReceiveResult receiveZipStream(final AttributeMap attributeMap, + final Path sourceZipFile, + final Path destZipFile, + final long receivedBytes) throws IOException { + LOGGER.debug("receiveZipStream() - sourceZipFile: {}, destZipFile: {}, attributeMap: {}", + sourceZipFile, destZipFile, attributeMap); + final DurationTimer timer = LogUtil.startTimerIfDebugEnabled(LOGGER); + final String defaultFeedName = attributeMap.get(StandardHeaderArguments.FEED); + final String defaultTypeName = attributeMap.get(StandardHeaderArguments.TYPE); + // This is to reduce the memory used by all the FeedKey objects in the ZipEntryGroups + final FeedKeyInterner feedKeyInterner = FeedKey.createInterner(); + final FeedKey defaultFeedKey = feedKeyInterner.intern(defaultFeedName, defaultTypeName); + + final Map baseNameToGroupMap = new HashMap<>(); + final 
ProxyZipValidator validator = new ProxyZipValidator(); + final List dataEntries = new ArrayList<>(); + + // Clone the zip with added/updated meta entries + cloneZipFileWithUpdatedMeta( + attributeMap, + defaultFeedKey, + feedKeyInterner, + baseNameToGroupMap, + validator, + dataEntries, + sourceZipFile, + destZipFile); // TODO : Worry about memory usage here storing potentially 1000's of data entries and groups. // Now look at the entries and see if we can match them to meta. @@ -318,7 +408,6 @@ static ReceiveResult receiveZipStream(final InputStream inputStream, zipEntryGroup = new ZipEntryGroup(defaultFeedKey); zipEntryGroup.setDataEntry(dataEntry); entryList.add(zipEntryGroup); - } else { if (zipEntryGroup.getDataEntry() != null) { // This shouldn't really happen as it means we found meta that could be for more than @@ -370,7 +459,7 @@ static ReceiveResult receiveZipStream(final InputStream inputStream, "feedKey count: {}, total entry count: {}, duration: {}", defaultFeedName, defaultTypeName, - zipFilePath, + destZipFile, feedGroups.size(), LogUtil.swallowExceptions(() -> feedGroups.values().stream().mapToInt(List::size).sum()) .orElse(-1), @@ -560,7 +649,8 @@ static long writeStreamToFile(final InputStream inputStream, "Error writing inputStream to file {}: {}", zipFilePath, LogUtil.exceptionMessage(e)), e); } - }, () -> LogUtil.message("writeStreamToFile() - zipFilePath: {}", zipFilePath)); + }, receivedBytes -> LogUtil.message("writeStreamToFile() - zipFilePath: {}, receivedBytes: {}", + zipFilePath, receivedBytes)); } /** diff --git a/stroom-proxy/stroom-proxy-app/src/test/java/stroom/proxy/app/handler/TestNestedNumberedDirProvider.java b/stroom-proxy/stroom-proxy-app/src/test/java/stroom/proxy/app/handler/TestNestedNumberedDirProvider.java new file mode 100644 index 0000000000..fbf48911bb --- /dev/null +++ b/stroom-proxy/stroom-proxy-app/src/test/java/stroom/proxy/app/handler/TestNestedNumberedDirProvider.java @@ -0,0 +1,52 @@ +package 
stroom.proxy.app.handler; + +import stroom.util.io.FileUtil; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.nio.file.Path; + +import static org.assertj.core.api.Assertions.assertThat; + +class TestNestedNumberedDirProvider { + + @Test + void test1(@TempDir final Path rootDir) { + NestedNumberedDirProvider dirProvider = NestedNumberedDirProvider.create(rootDir); + + Path numberedPath = rootDir.relativize(dirProvider.createNumberedPath()); + assertThat(numberedPath.toString()) + .isEqualTo("0/001"); + + numberedPath = rootDir.relativize(dirProvider.createNumberedPath()); + assertThat(numberedPath.toString()) + .isEqualTo("0/002"); + + // Create a new one, should continue the numbering + dirProvider = NestedNumberedDirProvider.create(rootDir); + + numberedPath = rootDir.relativize(dirProvider.createNumberedPath()); + assertThat(numberedPath.toString()) + .isEqualTo("0/003"); + } + + @Test + void test2(@TempDir final Path rootDir) { + FileUtil.ensureDirExists(rootDir.resolve("0/998")); + + final NestedNumberedDirProvider dirProvider = NestedNumberedDirProvider.create(rootDir); + + Path numberedPath = rootDir.relativize(dirProvider.createNumberedPath()); + assertThat(numberedPath.toString()) + .isEqualTo("0/999"); + + numberedPath = rootDir.relativize(dirProvider.createNumberedPath()); + assertThat(numberedPath.toString()) + .isEqualTo("1/001/001000"); + + numberedPath = rootDir.relativize(dirProvider.createNumberedPath()); + assertThat(numberedPath.toString()) + .isEqualTo("1/001/001001"); + } +} diff --git a/stroom-proxy/stroom-proxy-app/src/test/java/stroom/proxy/app/handler/TestZipDirScanner.java b/stroom-proxy/stroom-proxy-app/src/test/java/stroom/proxy/app/handler/TestZipDirScanner.java new file mode 100644 index 0000000000..0f8bab9817 --- /dev/null +++ b/stroom-proxy/stroom-proxy-app/src/test/java/stroom/proxy/app/handler/TestZipDirScanner.java @@ -0,0 +1,291 @@ +package stroom.proxy.app.handler; + +import 
stroom.meta.api.AttributeMap; +import stroom.proxy.app.DirScannerConfig; +import stroom.test.common.DirectorySnapshot; +import stroom.test.common.DirectorySnapshot.PathSnapshot; +import stroom.test.common.DirectorySnapshot.Snapshot; +import stroom.test.common.TestUtil; +import stroom.util.io.SimplePathCreator; +import stroom.util.logging.LambdaLogger; +import stroom.util.logging.LambdaLoggerFactory; +import stroom.util.time.StroomDuration; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; + +@ExtendWith(MockitoExtension.class) +class TestZipDirScanner { + + private static final LambdaLogger LOGGER = LambdaLoggerFactory.getLogger(TestZipDirScanner.class); + + @Mock + private ZipReceiver mockZipReceiver; + + @TempDir + Path testDir; + + @Captor + ArgumentCaptor zipFileCaptor; + @Captor + ArgumentCaptor attributeMapCaptor; + + @Test + void testScan() { + final Path ingestDir1 = testDir.resolve("ingest1"); + final Path failureDir = testDir.resolve("failure"); + final DirScannerConfig config = new DirScannerConfig( + List.of(ingestDir1.toString()), + failureDir.toString(), + true, + StroomDuration.ofSeconds(1)); + + final ZipDirScanner zipDirScanner = createZipDirScanner(config); + + final Path file1 = ingestDir1.resolve("file1.txt"); + final Path file2 = ingestDir1.resolve("file2.zip"); + final Path file3 = ingestDir1.resolve("file3.ZIP"); + final Path file4 = ingestDir1.resolve("file4"); + final Path file5 = ingestDir1.resolve("dir1/subDir1/file5"); + final Path file6 = ingestDir1.resolve("dir2/subDir2/file6.Zip"); + 
+ TestUtil.createFiles(file1, file2, file3, file4, file5, file6); + + Mockito.doNothing() + .when(mockZipReceiver) + .receive(zipFileCaptor.capture(), attributeMapCaptor.capture()); + + zipDirScanner.scan(); + + assertThat(ingestDir1) + .exists() + .isDirectory(); + + final List zipFilesProcessed = zipFileCaptor.getAllValues(); + assertThat(zipFilesProcessed) + .containsExactlyInAnyOrder( + file2, + file3, + file6); + + assertThat(DirectorySnapshot.of(ingestDir1).pathSnapshots()) + .isEmpty(); + + final Snapshot snapshot = DirectorySnapshot.of(failureDir); + LOGGER.debug("Snapshot of {}\n{}", failureDir, snapshot); + + assertThat(snapshot.stream() + .map(PathSnapshot::path) + .map(failureDir::resolve) + .filter(Files::isRegularFile) + .map(Path::getFileName) + .toList()) + .containsExactlyInAnyOrderElementsOf(Stream.of( + file1, + file4, + file5) + .map(Path::getFileName) + .toList()); + } + + @Test + void testScan_multipleDirs() { + final Path ingestDir1 = testDir.resolve("ingest1"); + final Path ingestDir2 = testDir.resolve("ingest2"); + final Path failureDir = testDir.resolve("failure"); + final DirScannerConfig config = new DirScannerConfig( + List.of(ingestDir1.toString(), + ingestDir2.toString()), + failureDir.toString(), + true, + StroomDuration.ofSeconds(1)); + + final ZipDirScanner zipDirScanner = createZipDirScanner(config); + + final Path file11 = ingestDir1.resolve("file1.txt"); + final Path file12 = ingestDir1.resolve("file2.zip"); + final Path file13 = ingestDir1.resolve("file3.ZIP"); + final Path file14 = ingestDir1.resolve("file4"); + final Path file15 = ingestDir1.resolve("dir1/subDir1/file5"); + final Path file16 = ingestDir1.resolve("dir2/subDir2/file6.Zip"); + + final Path file21 = ingestDir2.resolve("file1.txt"); + final Path file22 = ingestDir2.resolve("file2.zip"); + final Path file23 = ingestDir2.resolve("file3.ZIP"); + final Path file24 = ingestDir2.resolve("file4"); + final Path file25 = ingestDir2.resolve("dir1/subDir1/file5"); + final 
Path file26 = ingestDir2.resolve("dir2/subDir2/file6.Zip"); + + TestUtil.createFiles( + file11, file12, file13, file14, file15, file16, + file21, file22, file23, file24, file25, file26); + + Mockito.doNothing() + .when(mockZipReceiver) + .receive(zipFileCaptor.capture(), attributeMapCaptor.capture()); + + zipDirScanner.scan(); + + assertThat(ingestDir1) + .exists() + .isDirectory(); + assertThat(ingestDir2) + .exists() + .isDirectory(); + + final List zipFilesProcessed = zipFileCaptor.getAllValues(); + assertThat(zipFilesProcessed) + .containsExactlyInAnyOrder( + file12, + file13, + file16, + file22, + file23, + file26); + + Snapshot snapshot; + snapshot = DirectorySnapshot.of(ingestDir1); + assertThat(snapshot.pathSnapshots()) + .isEmpty(); + + snapshot = DirectorySnapshot.of(ingestDir2); + assertThat(snapshot.pathSnapshots()) + .isEmpty(); + + snapshot = DirectorySnapshot.of(failureDir); + assertThat(snapshot.stream() + .map(PathSnapshot::path) + .map(failureDir::resolve) + .filter(Files::isRegularFile) + .map(Path::getFileName) + .toList()) + .containsExactlyInAnyOrderElementsOf(Stream.of( + file11, + file14, + file15, + file21, + file24, + file25) + .map(Path::getFileName) + .toList()); + } + + @Test + void testScan_badZip() { + final Path ingestDir1 = testDir.resolve("ingest1"); + final Path ingestDir2 = testDir.resolve("ingest2"); + final Path failureDir = testDir.resolve("failure"); + final DirScannerConfig config = new DirScannerConfig( + List.of(ingestDir1.toString(), + ingestDir2.toString()), + failureDir.toString(), + true, + StroomDuration.ofSeconds(1)); + + final ZipDirScanner zipDirScanner = createZipDirScanner(config); + + final Path badFile = ingestDir1.resolve("bad.zip"); + final Path file11 = ingestDir1.resolve("file1.txt"); + final Path file12 = ingestDir1.resolve("file2.zip"); + final Path file13 = ingestDir1.resolve("file3.ZIP"); + final Path file14 = ingestDir1.resolve("file4"); + final Path file15 = ingestDir1.resolve("dir1/subDir1/file5"); + 
final Path file16 = ingestDir1.resolve("dir2/subDir2/file6.Zip"); + + final Path file21 = ingestDir2.resolve("file1.txt"); + final Path file22 = ingestDir2.resolve("file2.zip"); + final Path file23 = ingestDir2.resolve("file3.ZIP"); + final Path file24 = ingestDir2.resolve("file4"); + final Path file25 = ingestDir2.resolve("dir1/subDir1/file5"); + final Path file26 = ingestDir2.resolve("dir2/subDir2/file6.Zip"); + + TestUtil.createFiles( + badFile, + file11, file12, file13, file14, file15, file16, + file21, file22, file23, file24, file25, file26); + + final List zipFiles = new ArrayList<>(); + Mockito.doAnswer( + invocation -> { + final Path zipFile = invocation.getArgument(0, Path.class); + zipFiles.add(zipFile); + if (zipFile.getFileName().toString().contains("bad")) { + throw new RuntimeException("bad zip"); + } + return null; + }) + .when(mockZipReceiver) + .receive(Mockito.any(), Mockito.any()); + + zipDirScanner.scan(); + + assertThat(ingestDir1) + .exists() + .isDirectory(); + assertThat(ingestDir2) + .exists() + .isDirectory(); + + assertThat(zipFiles) + .containsExactlyInAnyOrder( + badFile, + file12, + file13, + file16, + file22, + file23, + file26); + + Snapshot snapshot; + snapshot = DirectorySnapshot.of(ingestDir1); + assertThat(snapshot.pathSnapshots()) + .isEmpty(); + + snapshot = DirectorySnapshot.of(ingestDir2); + assertThat(snapshot.pathSnapshots()) + .isEmpty(); + + snapshot = DirectorySnapshot.of(failureDir); + assertThat(snapshot.stream() + .map(PathSnapshot::path) + .map(failureDir::resolve) + .filter(Files::isRegularFile) + .map(Path::getFileName) + .toList()) + .containsExactlyInAnyOrderElementsOf(Stream.of( + file11, + file14, + file15, + file21, + file24, + file25, + badFile) + .map(Path::getFileName) + .toList()); + } + + private ZipDirScanner createZipDirScanner(final DirScannerConfig config) { + final Path homeDir = testDir.resolve("home"); + final Path stroomTempDir = testDir.resolve("temp"); + final SimplePathCreator pathCreator = new 
SimplePathCreator(() -> homeDir, () -> stroomTempDir); + return new ZipDirScanner( + () -> config, + pathCreator, + mockZipReceiver, + new ProxyReceiptIdGenerator(() -> "test-node")); + } +} diff --git a/stroom-proxy/stroom-proxy-app/src/test/resources/logback-test.xml b/stroom-proxy/stroom-proxy-app/src/test/resources/logback-test.xml index e877ee32d0..ad0e57d116 100644 --- a/stroom-proxy/stroom-proxy-app/src/test/resources/logback-test.xml +++ b/stroom-proxy/stroom-proxy-app/src/test/resources/logback-test.xml @@ -11,6 +11,8 @@ + + diff --git a/stroom-proxy/stroom-proxy-app/src/test/resources/stroom/dist/proxy-expected.yaml b/stroom-proxy/stroom-proxy-app/src/test/resources/stroom/dist/proxy-expected.yaml index 40184820f0..6c21941c50 100644 --- a/stroom-proxy/stroom-proxy-app/src/test/resources/stroom/dist/proxy-expected.yaml +++ b/stroom-proxy/stroom-proxy-app/src/test/resources/stroom/dist/proxy-expected.yaml @@ -12,6 +12,12 @@ proxyConfig: contentSyncEnabled: false syncFrequency: "PT1M" upstreamUrl: null + dirScanner: + dirs: + - "zip_file_ingest" + enabled: true + failureDir: "zip_file_ingest_failed" + scanFrequency: "PT1M" eventStore: forwardQueueSize: 1000 maxAge: "PT1M" diff --git a/stroom-proxy/stroom-proxy-repo/src/main/java/stroom/proxy/repo/FeedKey.java b/stroom-proxy/stroom-proxy-repo/src/main/java/stroom/proxy/repo/FeedKey.java index 4ba4692de7..08f9b3b92c 100644 --- a/stroom-proxy/stroom-proxy-repo/src/main/java/stroom/proxy/repo/FeedKey.java +++ b/stroom-proxy/stroom-proxy-repo/src/main/java/stroom/proxy/repo/FeedKey.java @@ -82,18 +82,23 @@ public static FeedKeyInterner createInterner() { */ public static class FeedKeyInterner { - private final Map map = new HashMap<>(); + // Fewer types than feeds so key on that first to reduce number of child maps + private final Map> typeToFeedToFeedKeyMap = new HashMap<>(); private FeedKeyInterner() { } public FeedKey intern(final String feed, final String type) { - return intern(FeedKey.of(feed, type)); + 
final Map feedToFeedKeyMap = typeToFeedToFeedKeyMap.computeIfAbsent( + type, k -> new HashMap<>()); + return feedToFeedKeyMap.computeIfAbsent(feed, aFeed -> FeedKey.of(aFeed, type)); } public FeedKey intern(final FeedKey feedKey) { if (feedKey != null) { - final FeedKey prevVal = map.putIfAbsent(feedKey, feedKey); + final FeedKey prevVal = typeToFeedToFeedKeyMap.computeIfAbsent( + feedKey.type, k -> new HashMap<>()) + .putIfAbsent(feedKey.feed, feedKey); return prevVal != null ? prevVal : feedKey; diff --git a/stroom-proxy/stroom-proxy-repo/src/test/java/stroom/proxy/repo/TestFeedKey.java b/stroom-proxy/stroom-proxy-repo/src/test/java/stroom/proxy/repo/TestFeedKey.java index c7360b4cc7..c2fb214423 100644 --- a/stroom-proxy/stroom-proxy-repo/src/test/java/stroom/proxy/repo/TestFeedKey.java +++ b/stroom-proxy/stroom-proxy-repo/src/test/java/stroom/proxy/repo/TestFeedKey.java @@ -1,13 +1,23 @@ package stroom.proxy.repo; import stroom.proxy.repo.FeedKey.FeedKeyInterner; +import stroom.test.common.TestUtil; +import stroom.test.common.TestUtil.TimedCase; +import stroom.util.logging.LambdaLogger; +import stroom.util.logging.LambdaLoggerFactory; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import java.util.ArrayList; +import java.util.List; + import static org.assertj.core.api.Assertions.assertThat; class TestFeedKey { + private static final LambdaLogger LOGGER = LambdaLoggerFactory.getLogger(TestFeedKey.class); + @Test void testIntern_null() { final FeedKeyInterner interner = FeedKey.createInterner(); @@ -59,4 +69,86 @@ void testIntern_1() { .isEqualTo(feedKey1b) .isSameAs(feedKey1a); } + + @Test + void testIntern3() { + final FeedKeyInterner interner = FeedKey.createInterner(); + final FeedKey feedKeyNulla = interner.intern(null, null); + final FeedKey feedKeyNullb = interner.intern(null, null); + final FeedKey feedKey11a = interner.intern("feed1", "type1"); + final FeedKey feedKey11b = interner.intern("feed1", "type1"); + final FeedKey 
feedKey12a = interner.intern("feed1", "type2"); + final FeedKey feedKey12b = interner.intern("feed1", "type2"); + final FeedKey feedKey21a = interner.intern("feed2", "type1"); + final FeedKey feedKey21b = interner.intern("feed2", "type1"); + final FeedKey feedKey22a = interner.intern("feed2", "type2"); + final FeedKey feedKey22b = interner.intern("feed2", "type2"); + + assertThat(feedKeyNulla) + .isSameAs(feedKeyNullb) + .isNotEqualTo(feedKey11a) + .isNotEqualTo(feedKey12a) + .isNotEqualTo(feedKey21a) + .isNotEqualTo(feedKey22a); + assertThat(feedKey11a) + .isSameAs(feedKey11b) + .isNotEqualTo(feedKey12a) + .isNotEqualTo(feedKey21a) + .isNotEqualTo(feedKey22a); + assertThat(feedKey12a) + .isSameAs(feedKey12b) + .isNotEqualTo(feedKey11a) + .isNotEqualTo(feedKey21a) + .isNotEqualTo(feedKey22a); + assertThat(feedKey21a) + .isSameAs(feedKey21b) + .isNotEqualTo(feedKey11a) + .isNotEqualTo(feedKey12a) + .isNotEqualTo(feedKey22a); + assertThat(feedKey22a) + .isSameAs(feedKey22b) + .isNotEqualTo(feedKey11a) + .isNotEqualTo(feedKey12a) + .isNotEqualTo(feedKey21a); + } + + @Disabled // Manual perf only + @Test + void testInternPerf() { + + final FeedKeyInterner feedKeyInterner = FeedKey.createInterner(); + for (int i = 1; i <= 2; i++) { + for (int j = 1; j <= 2; j++) { + feedKeyInterner.intern("feed" + i, "type" + j); + } + } + final int iter = 1_000_000; + final List feedKeys = new ArrayList<>(iter * 4); + + TestUtil.comparePerformance( + 5, + iter, + (rounds, iterations1) -> { + feedKeys.clear(); + }, + LOGGER::info, + TimedCase.of("intern", (round, iterations) -> { + for (long k = 0; k < iterations; k++) { + for (int i = 1; i <= 2; i++) { + for (int j = 1; j <= 2; j++) { + feedKeys.add(feedKeyInterner.intern("feed" + i, "type" + j)); + } + } + } + }), + TimedCase.of("Obj Creation", (round, iterations) -> { + for (long k = 0; k < iterations; k++) { + for (int i = 1; i <= 2; i++) { + for (int j = 1; j <= 2; j++) { + feedKeys.add(new FeedKey("feed" + i, "type" + j)); + } + } 
+ } + })); + } } diff --git a/stroom-query/stroom-query-language/src/main/java/stroom/query/language/functions/FormatIECByteSize.java b/stroom-query/stroom-query-language/src/main/java/stroom/query/language/functions/FormatIECByteSize.java index adf83c7d51..1f787e5a86 100644 --- a/stroom-query/stroom-query-language/src/main/java/stroom/query/language/functions/FormatIECByteSize.java +++ b/stroom-query/stroom-query-language/src/main/java/stroom/query/language/functions/FormatIECByteSize.java @@ -47,6 +47,7 @@ @FunctionArg( name = "omitTrailingZeros", argType = ValBoolean.class, + defaultValue = "false", description = "Whether to omit trailing zeros (default false)")}, returnDescription = "A more human readable IEC representation of byte size.", description = "Convert a number of bytes into a more human readable form."), @@ -59,6 +60,7 @@ @FunctionArg( name = "omitTrailingZeros", argType = ValBoolean.class, + defaultValue = "false", description = "Whether to omit trailing zeros (default false)"), @FunctionArg( name = "significantFigures", diff --git a/stroom-query/stroom-query-language/src/main/java/stroom/query/language/functions/FormatMetricByteSize.java b/stroom-query/stroom-query-language/src/main/java/stroom/query/language/functions/FormatMetricByteSize.java index 5439d2c7b0..ce078b6d5a 100644 --- a/stroom-query/stroom-query-language/src/main/java/stroom/query/language/functions/FormatMetricByteSize.java +++ b/stroom-query/stroom-query-language/src/main/java/stroom/query/language/functions/FormatMetricByteSize.java @@ -46,6 +46,7 @@ description = "The number of bytes"), @FunctionArg( name = "omitTrailingZeros", + defaultValue = "false", argType = ValBoolean.class, description = "Whether to omit trailing zeros (default false)")}, returnDescription = "A more human readable metric representation of byte size.", @@ -59,6 +60,7 @@ @FunctionArg( name = "omitTrailingZeros", argType = ValBoolean.class, + defaultValue = "false", description = "Whether to omit trailing 
zeros (default false)"), @FunctionArg( name = "significantFigures", diff --git a/stroom-test-common/src/main/java/stroom/test/common/TestUtil.java b/stroom-test-common/src/main/java/stroom/test/common/TestUtil.java index c7fd26691d..ec2a119016 100644 --- a/stroom-test-common/src/main/java/stroom/test/common/TestUtil.java +++ b/stroom-test-common/src/main/java/stroom/test/common/TestUtil.java @@ -21,6 +21,9 @@ import org.junit.jupiter.api.DynamicTest; import org.mockito.Mockito; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Files; import java.nio.file.Path; import java.time.Duration; import java.time.Instant; @@ -389,6 +392,27 @@ public static void comparePerformance(final int rounds, tableStr)); } + /** + * Will create the passed files as empty files, ensuring their parent directories exist first. + * Will throw if the file already exists. + */ + public static void createFiles(final Path... files) { + NullSafe.stream(files) + .forEach(file -> { + try { + final Path parent = Objects.requireNonNull( + file.getParent(), + file + " has no parent"); + Files.createDirectories(parent); + Files.createFile(file); + } catch (final IOException e) { + throw new UncheckedIOException(e); + } catch (final Exception e) { + throw new RuntimeException(e); + } + }); + } + // -------------------------------------------------------------------------------- diff --git a/stroom-util/src/main/java/stroom/util/io/FileUtil.java b/stroom-util/src/main/java/stroom/util/io/FileUtil.java index a8046da8be..5b19dad147 100644 --- a/stroom-util/src/main/java/stroom/util/io/FileUtil.java +++ b/stroom-util/src/main/java/stroom/util/io/FileUtil.java @@ -20,10 +20,13 @@ import stroom.util.logging.LambdaLoggerFactory; import stroom.util.logging.LogUtil; +import org.apache.commons.lang3.mutable.MutableLong; + import java.io.FileOutputStream; import java.io.IOException; import java.io.UncheckedIOException; import java.nio.channels.FileChannel; +import 
java.nio.file.DirectoryNotEmptyException; import java.nio.file.DirectoryStream; import java.nio.file.FileVisitOption; import java.nio.file.FileVisitResult; @@ -122,6 +125,68 @@ public static boolean isEmptyDirectory(final Path path) throws IOException { } } + /** + * Attempts to delete any empty directories found while walking the tree starting from rootDir. + * Will not delete rootDir. + * No exceptions will be thrown. It will only log errors. + */ + public static int deleteEmptyDirs(final Path rootDir) { + final MutableLong deleteCount = new MutableLong(); + try { + Files.walkFileTree(rootDir, new SimpleFileVisitor<>() { + final Set dirsWithFiles = new HashSet<>(); + + // + @Override + public FileVisitResult visitFile(final Path file, final BasicFileAttributes attrs) throws IOException { + // Mark all parent directories as having files + Path parent = file.getParent(); + while (parent != null && parent.startsWith(rootDir)) { + dirsWithFiles.add(parent); + parent = parent.getParent(); + } + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult visitFileFailed(final Path file, final IOException exc) throws IOException { + // Sill an item in the dir even if we can't visit it + // Mark all parent directories as having files + Path parent = file.getParent(); + while (parent != null && parent.startsWith(rootDir)) { + dirsWithFiles.add(parent); + parent = parent.getParent(); + } + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult postVisitDirectory(final Path dir, final IOException exc) throws IOException { + if (!Files.isSameFile(rootDir, dir)) { + if (!dirsWithFiles.contains(dir)) { + try { + Files.delete(dir); + deleteCount.increment(); + } catch (final DirectoryNotEmptyException e) { + LOGGER.debug("deleteEmptyDirs() - Directory {} is not empty so cannot be deleted.", + dir); + } catch (final IOException e) { + LOGGER.error("Error while trying to delete directory {} - {}", + dir, LogUtil.exceptionMessage(e)); + } + } + 
} + return FileVisitResult.CONTINUE; + } + }); + } catch (final IOException e) { + // Swallow + LOGGER.error("Error walking directory {} - {}", rootDir, LogUtil.exceptionMessage(e), e); + } + LOGGER.debug("deleteEmptyDirs() - Deleted {} empty directories", deleteCount); + return deleteCount.intValue(); + } + private static void recursiveDelete(final Path path, final AtomicBoolean success) { try { Files.walkFileTree( diff --git a/stroom-util/src/test/java/stroom/util/io/TestFileUtil.java b/stroom-util/src/test/java/stroom/util/io/TestFileUtil.java index 261b0108e9..b7bd91b761 100644 --- a/stroom-util/src/test/java/stroom/util/io/TestFileUtil.java +++ b/stroom-util/src/test/java/stroom/util/io/TestFileUtil.java @@ -16,12 +16,14 @@ package stroom.util.io; +import stroom.test.common.DirectorySnapshot; +import stroom.test.common.DirectorySnapshot.Snapshot; +import stroom.test.common.TestUtil; import stroom.util.concurrent.SimpleExecutor; import stroom.util.logging.LambdaLogger; import stroom.util.logging.LambdaLoggerFactory; import org.apache.commons.lang3.RandomUtils; -import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -39,6 +41,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.fail; +import static stroom.test.common.DirectorySnapshot.PathFlag.DIRECTORY; +import static stroom.test.common.DirectorySnapshot.PathFlag.REGULAR_FILE; class TestFileUtil { @@ -162,9 +166,85 @@ void testDeepListContents(@TempDir final Path tempDir) throws IOException { final long fileCount = FileUtil.deepListContents(tempDir, false, isFilePredicate) .size(); - Assertions.assertThat(fileCount) + assertThat(fileCount) .isEqualTo(6); - Assertions.assertThat(totalSize) + assertThat(totalSize) .hasValue(6L * "XFileX".getBytes(StandardCharsets.UTF_8).length); } + + @Test + void testDeleteEmptyDirs_alreadyEmpty(@TempDir final Path tempDir) throws IOException { + final int cnt 
= FileUtil.deleteEmptyDirs(tempDir); + // Not deleted + assertThat(tempDir) + .isDirectory() + .exists(); + + assertThat(cnt) + .isZero(); + } + + @Test + void testDeleteEmptyDirs_justFiles(@TempDir final Path tempDir) throws IOException { + TestUtil.createFiles( + tempDir.resolve("file1"), + tempDir.resolve("file2"), + tempDir.resolve("file3"), + tempDir.resolve("file4"), + tempDir.resolve("file5")); + final int cnt = FileUtil.deleteEmptyDirs(tempDir); + // Not deleted + assertThat(tempDir) + .isDirectory() + .exists(); + + assertThat(cnt) + .isZero(); + } + + @Test + void testDeleteEmptyDirs(@TempDir final Path tempDir) throws IOException { + Files.createDirectories(tempDir.resolve("emptyDir1/emptySubDir1/emptySubSubDir1")); + Files.createDirectories(tempDir.resolve("emptyDir1/emptySubDir1/emptySubSubDir2")); + Files.createDirectories(tempDir.resolve("emptyDir1/emptySubDir2")); + Files.createDirectories(tempDir.resolve("nonEmptyDir1/emptySubDir3")); + Files.createDirectories(tempDir.resolve("nonEmptyDir1/nonEmptySubDir2/emptySubSubDir3")); + TestUtil.createFiles( + tempDir.resolve("nonEmptyDir1/nonEmptySubDir2/nonEmptySubSubDir2/file1"), + tempDir.resolve("nonEmptyDir1/nonEmptySubDir2/nonEmptySubSubDir2/file2"), + tempDir.resolve("nonEmptyDir1/nonEmptySubDir2/file3"), + tempDir.resolve("nonEmptyDir1/file4"), + tempDir.resolve("nonEmptyDir2/file5"), + tempDir.resolve("nonEmptyDir3/nonEmptySubDir3/file6")); + Files.createDirectories(tempDir.resolve("emptyDir2")); + Files.createDirectories(tempDir.resolve("emptyDir3")); + + Snapshot snapshot = DirectorySnapshot.of(tempDir); + LOGGER.debug("Snapshot of {}\n{}", tempDir, snapshot); + + final int cnt = FileUtil.deleteEmptyDirs(tempDir); + + snapshot = DirectorySnapshot.of(tempDir); + LOGGER.debug("Snapshot of {}\n{}", tempDir, snapshot); + + // Not deleted + assertThat(tempDir) + .isDirectory() + .exists(); + + assertThat(cnt) + .isEqualTo(9); + + assertThat(snapshot.stream() + .filter(pathSnapshot -> + 
pathSnapshot.flags().contains(REGULAR_FILE)) + .count()) + .isEqualTo(6); + + assertThat(snapshot.stream() + .filter(pathSnapshot -> + pathSnapshot.flags().contains(DIRECTORY)) + .count()) + .isEqualTo(6); + } } diff --git a/stroom-util/src/test/java/stroom/util/testshared/TestModelStringUtil.java b/stroom-util/src/test/java/stroom/util/testshared/TestModelStringUtil.java index 6bf368e44e..54a97052e2 100644 --- a/stroom-util/src/test/java/stroom/util/testshared/TestModelStringUtil.java +++ b/stroom-util/src/test/java/stroom/util/testshared/TestModelStringUtil.java @@ -206,7 +206,49 @@ Stream testFormatIECByteSizeString() { .addCase(10_240L, "10K") .addCase(20_508_468_838L, "19G") .addCase(9_878_424_780L, "9.2G") + .build(); + } + + @TestFactory + Stream testFormatIECByteSizeString2() { + return TestUtil.buildDynamicTestStream() + .withInputType(Long.class) + .withOutputType(String.class) + .withTestFunction(testCase -> + ModelStringUtil.formatIECByteSizeString(testCase.getInput(), true)) + .withSimpleEqualityAssertion() + .addCase(1L, "1B") + .addCase(999L, "999B") + .addCase(1_024L, "1K") + .addCase(1_126L, "1.1K") // 1.099K + .addCase(1_127L, "1.1K") + .addCase(1_946L, "1.9K") + .addCase(10_240L, "10K") + .addCase(1024 * 1024L, "1M") + .addCase(20_508_468_838L, "19G") + .addCase(9_878_424_780L, "9.2G") + .build(); + } + @TestFactory + Stream testFormatIECByteSizeString3() { + return TestUtil.buildDynamicTestStream() + .withInputType(Long.class) + .withOutputType(String.class) + .withTestFunction(testCase -> + ModelStringUtil.formatIECByteSizeString( + testCase.getInput(), false, 6)) + .withSimpleEqualityAssertion() + .addCase(1L, "1.0B") + .addCase(999L, "999.0B") + .addCase(1_024L, "1.0K") + .addCase(1_126L, "1.09961K") + .addCase(1_127L, "1.10059K") + .addCase(1_946L, "1.90039K") + .addCase(10_240L, "10.0K") + .addCase(1024 * 1024L, "1.0M") + .addCase(20_508_468_838L, "19.1000G") + .addCase(9_878_424_780L, "9.20000G") .build(); } diff --git 
a/unreleased_changes/20251001_163422_854__0.md b/unreleased_changes/20251001_163422_854__0.md new file mode 100644 index 0000000000..791728bbd2 --- /dev/null +++ b/unreleased_changes/20251001_163422_854__0.md @@ -0,0 +1,55 @@ +* Add a proxy zip file ingest mechanism to proxy. Add property branch `proxyConfig.dirScanner` to the config. + + +```sh +# ONLY the top line will be included as a change entry in the CHANGELOG. +# The entry should be in GitHub flavour markdown and should be written on a SINGLE +# line with no hard breaks. You can have multiple change files for a single GitHub issue. +# The entry should be written in the imperative mood, i.e. 'Fix nasty bug' rather than +# 'Fixed nasty bug'. +# +# Examples of acceptable entries are: +# +# +# * Issue **123** : Fix bug with an associated GitHub issue in this repository +# +# * Issue **namespace/other-repo#456** : Fix bug with an associated GitHub issue in another repository +# +# * Fix bug with no associated GitHub issue. + + +# -------------------------------------------------------------------------------- +# The following is random text to make this file unique for git's change detection +# spAPR8oqCE6U1zDi6pdaCxb6E3z3rlsD5YjVlsEh6HeSYMV3Up61piPDp8ZxIOsBFtVSglXJLB916dMS +# WcKUeG8kiDDKnCKvizrw6g9TzaKRKb75WYYE1c7Wi8ga01IZug2cWtCltooxpLnSPn5Zpau05JxvfXim +# MebnEPrwUUMWxLMfS2vvAqvKujk0dNFRRaOKeXfvNOerotsdEOym6yxfhcVKThUoZTLhNiUcdBRThNOr +# 48gSbf2SlaFEQhr0ovX54T5U4qoUpUZ3Y0cDUyRxn2sDUAgA0C9cGveBdeBuKXtVGp2NxyYx7scL9yot +# YnkoUopgBKVLwmITGKWNgouvc13l1zwtN1RnkEMTwfZW8n73XYOEV4rOwhmzcKfGer41dyZH8mr2F4HS +# CpHmYuYJYRtqkX58yTv1wLyPlNWwWCGM8Vx6hrFfozDFwNPYRCqE1DKdN9RoqVyStJH4564DdoP4BXEQ +# T1rLccpV8AfqVjNz5XnZWnXFNFxlraF9CUmSYvHGmr0qzSw4qK1Q6JfruuVOBXaSuNQoPdRMSKbem7c3 +# PbTaSerX4ptvmt3ge146gVfPqp7BCHFoFOVTb8z0vRjx6uQEu3ASY3b7EYxthqJQBP6vKFcOIobe0RUJ +# S0nnLN8516Tsdp2ZOI3ID3hDJMz13KaR7tWIqZnI8MsQqqvnl8wvJHIrimfZuaIa4dBMK2bHYukvgcsU +# 
RlnR41QgkKgCX7fliD9nIs9PEU0ClkE5lFyYBl1xIHLe1Hxm4ICAjQpYH8aqKaAUQZXBuDJ9E77LJFa6 +# Pi1nZ4KfE0u6mZ2sUMeLTeIKRyrJhK2HYXsk63sJuH4pE3gHU1myY7NL9pEAs1VnIaSJOFW4N12PY16K +# WSaapOpPNqA3MdfSCtB6abt6h4IjQzZSgBG7Qs6AXBnOCBSvGv3QOOpkxYDfCBVqaxvceWMe0aECZ5v4 +# 24SPhPH5NBOlcR6qSU6qCrzMmufkNBPuC5KOcDNPZCnDAq3UUdV84QdTU3HXCEB0GQ6Pup6eZ8JIQltz +# K47YQBJtUo2fpgDTTjV50C8Yp34TfJA8dRqHV2AV4KbdVBMbF7zTc9CnQwaCLFhLNsujO29G6cmkCetJ +# 2dRmxroLTrb90CVOW5Xup8Dz4LjTQUtOjbTNL3Sxhmn99GUWF6er3fzdyvF0NBf6U2rJg1rVhcLbNscK +# drZknND30zJoI84tZOxrBElEo3CbC5WQF9t2UpaRz26bYbUDuuptzHy7Axy7XpoB0PHYcXAEcLwQe9mG +# oYB79metXbjaJqwhR5dHt7Jncf2RW2Lqwb9B8WmT8RZNTzQHa4SlEFtFjCHAKNeBIeUvuakFVBSLcacU +# 6gEuczLZrEQM9JNdCaQWvJ3kmWOT6W91iRFGwSv71fhV6lsMcNE7PTdlfCZpOPqqocWNR881rgKoGCfV +# j7nlHT2Uo5EmIQ9YgSC2YGBF6nqclZks6vNSjXUGbSnhUhWQOCCUCNdsGKE52V84tCrHLWAFLHxp8vdG +# LO4HdiJ0LngljSO3M1uMdbzf0IAJIIFq8xIBq80ZF4922LkBhQveqFbef4u4XGgtiTnWlyiJrCyi3Mpu +# iDp0SxLwyFFUxNrcp7tfkb77OfcRGRMT6JEOamaRXtkM5E7sAKSTD12dAez9VmwHHEH0QAPTo5J46TdB +# dZJIUqP6qWoZmUYFjgOMLJe3pV9hCVwTEE16akv9J4PFyLYwxntZRCkSRz7OnSsjLeFz5PbToFBPeyTd +# I8pwnuORWtDZGaXDc0P4mwsvIOcpBPrGFBxTHIMzZqjAspGRRUILYmmzVqHHiiu4NI7gY5wasJ9LJsts +# NBOoSqT4NpRW0YKIj8uVP7GvsnuNt3dehDn6h6WNbTQ6oa7N8GU7qYoUJvRelCViXOY3klvIFXPSyXOe +# CIBICoMk65JQIi91r6H1SUD5hYeNRqkrdHwFVC77px1YtBiufWIgasruqvrNSUaIYukHnQIifKtuP03Y +# uhPd7GXGZggTOgvGqKcurKmJyrz6r1Fo5DP1SfBC5gza3rwRayHqiRveO1zfM4vPlO1U8Rwj0IGX61eY +# fkgk9bbtpJRR7dwvHNLJ9ggMT8vjHrdm654htNZA2iyfT3pb0tparHke0dKqjtFOd3MGjkV09xU1fkRl +# JxFx9ZnUvjwG8xlO9nbkjfvpVVDmOIbqLnLSnv32Slyu1FzvLPnmxEIYqe6pZFm70wj9Cppz3PKICYYl +# b4JW1Q6bI2n6OVGrWj3Cw40K3PxlwEvbSDZrrjQ6PW1R6MF7iQhENpNyVjpZLwiz7Q6D1FhUPqLAvzV6 +# KEbBlvrVncJzupUWw1aPmmp8mopiqStefyrMPaAG7V6OkUM2r5dDuB5ksvssydO3VHTEbACvIdJ4vZ3W +# -------------------------------------------------------------------------------- + +```