diff --git a/stroom-config/stroom-config-app/src/test/resources/stroom/config/app/expected.yaml b/stroom-config/stroom-config-app/src/test/resources/stroom/config/app/expected.yaml index b05b20352ce..2e3b82bf171 100644 --- a/stroom-config/stroom-config-app/src/test/resources/stroom/config/app/expected.yaml +++ b/stroom-config/stroom-config-app/src/test/resources/stroom/config/app/expected.yaml @@ -701,6 +701,8 @@ appConfig: receive: authenticationRequired: true certificateAuthenticationEnabled: true + dataFeedKeysDir: "data_feed_keys" + datafeedKeyAuthenticationEnabled: false metaTypes: - "Context" - "Raw Reference" diff --git a/stroom-core/src/main/java/stroom/core/receive/AttributeMapFilterFactory.java b/stroom-core/src/main/java/stroom/core/receive/AttributeMapFilterFactory.java index 838733c1e5f..3abbf5239a7 100644 --- a/stroom-core/src/main/java/stroom/core/receive/AttributeMapFilterFactory.java +++ b/stroom-core/src/main/java/stroom/core/receive/AttributeMapFilterFactory.java @@ -18,53 +18,78 @@ import stroom.docref.DocRef; import stroom.receive.common.AttributeMapFilter; +import stroom.receive.common.DataFeedKeyService; import stroom.receive.common.DataReceiptPolicyAttributeMapFilterFactory; import stroom.receive.common.FeedStatusAttributeMapFilter; import stroom.receive.common.ReceiveDataConfig; import stroom.receive.rules.shared.ReceiveDataRules; +import stroom.util.NullSafe; +import stroom.util.concurrent.PeriodicallyUpdatedValue; import jakarta.inject.Inject; import jakarta.inject.Provider; import jakarta.inject.Singleton; -import java.util.Objects; -import java.util.concurrent.atomic.AtomicReference; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; @Singleton public class AttributeMapFilterFactory { - private final Provider receiveDataConfigProvider; private final DataReceiptPolicyAttributeMapFilterFactory dataReceiptPolicyAttributeMapFilterFactory; private final FeedStatusAttributeMapFilter feedStatusAttributeMapFilter; - - private volatile AttributeMapFilter attributeMapFilter; - private final AtomicReference lastPolicyUuid = new AtomicReference<>(); + private final DataFeedKeyService dataFeedKeyService; + private final PeriodicallyUpdatedValue updatableAttributeMapFilter; @Inject public AttributeMapFilterFactory( final Provider receiveDataConfigProvider, final DataReceiptPolicyAttributeMapFilterFactory dataReceiptPolicyAttributeMapFilterFactory, - final FeedStatusAttributeMapFilter feedStatusAttributeMapFilter) { + final FeedStatusAttributeMapFilter feedStatusAttributeMapFilter, + final DataFeedKeyService dataFeedKeyService) { - this.receiveDataConfigProvider = receiveDataConfigProvider; this.dataReceiptPolicyAttributeMapFilterFactory = dataReceiptPolicyAttributeMapFilterFactory; this.feedStatusAttributeMapFilter = feedStatusAttributeMapFilter; + this.dataFeedKeyService = dataFeedKeyService; + + // Every 60s, see if config has changed and if so create a new filter + this.updatableAttributeMapFilter = new PeriodicallyUpdatedValue<>( + Duration.ofSeconds(60), + this::create, + () -> ConfigState.fromConfig(receiveDataConfigProvider.get())); } - public AttributeMapFilter create() { - final String receiptPolicyUuid = receiveDataConfigProvider.get().getReceiptPolicyUuid(); - final String last = lastPolicyUuid.get(); - if (attributeMapFilter == null || !Objects.equals(last, receiptPolicyUuid)) { - lastPolicyUuid.compareAndSet(last, receiptPolicyUuid); - - if (receiptPolicyUuid != null && receiptPolicyUuid.length() > 0) { - attributeMapFilter = dataReceiptPolicyAttributeMapFilterFactory.create( - new DocRef(ReceiveDataRules.DOCUMENT_TYPE, receiptPolicyUuid)); - } else { - attributeMapFilter = feedStatusAttributeMapFilter; - } + private AttributeMapFilter create(final ConfigState configState) { + + final List filters = new ArrayList<>(); + if (configState.isDatafeedKeyAuthenticationEnabled) { + filters.add(dataFeedKeyService); } + if (NullSafe.isNonEmptyString(configState.policyUuid)) { + filters.add(dataReceiptPolicyAttributeMapFilterFactory.create( + new DocRef(ReceiveDataRules.DOCUMENT_TYPE, configState.policyUuid))); + } + filters.add(feedStatusAttributeMapFilter); + return AttributeMapFilter.wrap(filters); + } + + public AttributeMapFilter create() { + return updatableAttributeMapFilter.getValue(); + } + + + // -------------------------------------------------------------------------------- - return attributeMapFilter; + + private record ConfigState( + String policyUuid, + boolean isDatafeedKeyAuthenticationEnabled) { + + public static ConfigState fromConfig(final ReceiveDataConfig receiveDataConfig) { + return new ConfigState( + receiveDataConfig.getReceiptPolicyUuid(), + receiveDataConfig.isDatafeedKeyAuthenticationEnabled()); + } } } diff --git a/stroom-core/src/main/java/stroom/core/receive/ReceiveDataRequestHandler.java b/stroom-core/src/main/java/stroom/core/receive/ReceiveDataRequestHandler.java index ccbbc177f1f..48f5a0d6796 100755 --- a/stroom-core/src/main/java/stroom/core/receive/ReceiveDataRequestHandler.java +++ b/stroom-core/src/main/java/stroom/core/receive/ReceiveDataRequestHandler.java @@ -31,6 +31,7 @@ import stroom.receive.common.StroomStreamProcessor; import stroom.receive.common.StroomStreamStatus; import stroom.security.api.SecurityContext; +import stroom.security.api.UserIdentity; import stroom.task.api.TaskContextFactory; import stroom.task.api.TaskProgressHandler; import stroom.util.NullSafe; @@ -91,13 +92,13 @@ public void handle(final HttpServletRequest request, final HttpServletResponse r // Authenticate the request using token or cert depending on configuration // Adds sender details to the attributeMap - requestAuthenticator.authenticate(request, attributeMap); + final UserIdentity userIdentity = requestAuthenticator.authenticate(request, attributeMap); // Validate the supplied attributes. AttributeMapValidator.validate(attributeMap, metaService::getTypes); final String feedName; - if (attributeMapFilter.filter(attributeMap)) { + if (attributeMapFilter.filter(attributeMap, userIdentity)) { debug("Receiving data", attributeMap); feedName = NullSafe.trim(attributeMap.get(StandardHeaderArguments.FEED)); diff --git a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/event/ReceiveDataHelper.java b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/event/ReceiveDataHelper.java index ee7f8297f5d..00bcd4e24d0 100644 --- a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/event/ReceiveDataHelper.java +++ b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/event/ReceiveDataHelper.java @@ -82,7 +82,7 @@ public String process(final HttpServletRequest request, receiveDataConfig::getMetaTypes); // Test to see if we are going to accept this stream or drop the data. - if (attributeMapFilter.filter(attributeMap)) { + if (attributeMapFilter.filter(attributeMap, userIdentity)) { consumeHandler.handle(request, attributeMap, requestUuid); } else { diff --git a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/AttributeMapFilterFactory.java b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/AttributeMapFilterFactory.java index f6174630a0e..d37bb2f6c60 100644 --- a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/AttributeMapFilterFactory.java +++ b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/AttributeMapFilterFactory.java @@ -18,48 +18,104 @@ import stroom.docref.DocRef; import stroom.receive.common.AttributeMapFilter; +import stroom.receive.common.DataFeedKeyService; import stroom.receive.common.DataReceiptPolicyAttributeMapFilterFactory; import stroom.receive.common.FeedStatusAttributeMapFilter; -import stroom.receive.common.PermissiveAttributeMapFilter; import stroom.receive.common.ReceiveDataConfig; import stroom.receive.rules.shared.ReceiveDataRules; +import stroom.util.NullSafe; +import stroom.util.concurrent.PeriodicallyUpdatedValue; import jakarta.inject.Inject; import jakarta.inject.Provider; import jakarta.inject.Singleton; -import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; + @Singleton public class AttributeMapFilterFactory { private static final Logger LOGGER = LoggerFactory.getLogger(AttributeMapFilterFactory.class); - private final AttributeMapFilter attributeMapFilter; + private final Provider receiveDataConfigProvider; + private final DataReceiptPolicyAttributeMapFilterFactory dataReceiptPolicyAttributeMapFilterFactory; + private final Provider feedStatusConfigProvider; + private final Provider remoteFeedStatusServiceProvider; + private final DataFeedKeyService dataFeedKeyService; + + private final PeriodicallyUpdatedValue updatableAttributeMapFilter; @Inject public AttributeMapFilterFactory( - final ReceiveDataConfig receiveDataConfig, - final FeedStatusConfig feedStatusConfig, + final Provider receiveDataConfigProvider, + final Provider feedStatusConfigProvider, final DataReceiptPolicyAttributeMapFilterFactory dataReceiptPolicyAttributeMapFilterFactory, - final Provider remoteFeedStatusServiceProvider) { - - if (StringUtils.isNotBlank(receiveDataConfig.getReceiptPolicyUuid())) { - LOGGER.info("Using data receipt policy to filter received data"); - attributeMapFilter = dataReceiptPolicyAttributeMapFilterFactory.create( - new DocRef(ReceiveDataRules.DOCUMENT_TYPE, receiveDataConfig.getReceiptPolicyUuid())); - } else if (StringUtils.isNotBlank(feedStatusConfig.getFeedStatusUrl())) { - LOGGER.info("Using remote feed status service to filter received data"); + final Provider remoteFeedStatusServiceProvider, + final DataFeedKeyService dataFeedKeyService) { + + this.receiveDataConfigProvider = receiveDataConfigProvider; + this.feedStatusConfigProvider = feedStatusConfigProvider; + this.dataReceiptPolicyAttributeMapFilterFactory = dataReceiptPolicyAttributeMapFilterFactory; + this.remoteFeedStatusServiceProvider = remoteFeedStatusServiceProvider; + this.dataFeedKeyService = dataFeedKeyService; + + // Every 60s, see if config has changed and if so create a new filter + this.updatableAttributeMapFilter = new PeriodicallyUpdatedValue<>( + Duration.ofSeconds(60), + this::create, + () -> ConfigState.fromConfig( + receiveDataConfigProvider.get(), + feedStatusConfigProvider.get())); + } + + private AttributeMapFilter create(final ConfigState configState) { + final List filters = new ArrayList<>(); + + if (configState.isDatafeedKeyAuthenticationEnabled()) { + LOGGER.debug("Adding data feed key filter"); + filters.add(dataFeedKeyService); + } + + if (NullSafe.isNonBlankString(configState.receiptPolicyUuid)) { + LOGGER.debug("Adding data receipt policy filter"); + filters.add(dataReceiptPolicyAttributeMapFilterFactory.create( + new DocRef(ReceiveDataRules.DOCUMENT_TYPE, configState.receiptPolicyUuid))); + } + + if (NullSafe.isNonBlankString(configState.feedStatusUrl)) { + LOGGER.debug("Adding remote feed status service filter"); final RemoteFeedStatusService remoteFeedStatusService = remoteFeedStatusServiceProvider.get(); - attributeMapFilter = new FeedStatusAttributeMapFilter(remoteFeedStatusService); - } else { - LOGGER.info("Permitting receipt of all data"); - attributeMapFilter = new PermissiveAttributeMapFilter(); + filters.add(new FeedStatusAttributeMapFilter(remoteFeedStatusService)); } + + return AttributeMapFilter.wrap(filters); } public AttributeMapFilter create() { - return attributeMapFilter; + return updatableAttributeMapFilter.getValue(); + } + + + // -------------------------------------------------------------------------------- + + + private record ConfigState( + String receiptPolicyUuid, + boolean isDatafeedKeyAuthenticationEnabled, + String feedStatusUrl) { + + public static ConfigState fromConfig( + final ReceiveDataConfig receiveDataConfig, + final FeedStatusConfig feedStatusConfig) { + + return new ConfigState( + receiveDataConfig.getReceiptPolicyUuid(), + receiveDataConfig.isDatafeedKeyAuthenticationEnabled(), + feedStatusConfig.getFeedStatusUrl()); + } } } diff --git a/stroom-proxy/stroom-proxy-app/src/test/resources/stroom/dist/proxy-expected.yaml b/stroom-proxy/stroom-proxy-app/src/test/resources/stroom/dist/proxy-expected.yaml index c119ad37ddd..9e300a14eb7 100644 --- a/stroom-proxy/stroom-proxy-app/src/test/resources/stroom/dist/proxy-expected.yaml +++ b/stroom-proxy/stroom-proxy-app/src/test/resources/stroom/dist/proxy-expected.yaml @@ -86,6 +86,8 @@ proxyConfig: receive: authenticationRequired: true certificateAuthenticationEnabled: true + dataFeedKeysDir: "data_feed_keys" + datafeedKeyAuthenticationEnabled: false metaTypes: - "Context" - "Raw Reference" diff --git a/stroom-proxy/stroom-proxy-remote-api/src/main/java/stroom/proxy/StroomStatusCode.java b/stroom-proxy/stroom-proxy-remote-api/src/main/java/stroom/proxy/StroomStatusCode.java index 8a570dd181e..1fbcf4e84ec 100644 --- a/stroom-proxy/stroom-proxy-remote-api/src/main/java/stroom/proxy/StroomStatusCode.java +++ b/stroom-proxy/stroom-proxy-remote-api/src/main/java/stroom/proxy/StroomStatusCode.java @@ -21,6 +21,10 @@ public enum StroomStatusCode { FEED_IS_NOT_SET_TO_RECEIVED_DATA(HttpServletResponse.SC_NOT_ACCEPTABLE, 110, "Feed is not set to receive data", "The feed you have provided has not been setup to receive data"), + INVALID_FEED_NAME(HttpServletResponse.SC_NOT_ACCEPTABLE, 111, + "Feed is not valid", + "The feed you have provided does not match an agreed pattern"), + UNEXPECTED_DATA_TYPE(HttpServletResponse.SC_NOT_ACCEPTABLE, 120, "Unexpected data type", "The data type supplied is not expected"), @@ -57,11 +61,17 @@ public enum StroomStatusCode { "Client Token or Certificate failed authentication", "The provided client token or certificate cannot be authorised"), + DATA_FEED_KEY_NOT_AUTHENTICATED( + HttpServletResponse.SC_UNAUTHORIZED, + 313, + "Data feed key failed authentication", + "The provided data feed key cannot be authorised"), + COMPRESSED_STREAM_INVALID(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, 400, "Compressed stream invalid", "The stream of data sent does not form a valid compressed file. Maybe it terminated " + - "unexpectedly or is corrupt."), + "unexpectedly or is corrupt."), UNKNOWN_ERROR( HttpServletResponse.SC_INTERNAL_SERVER_ERROR, @@ -117,7 +127,7 @@ public static void main(String[] args) { for (StroomStatusCode stroomStatusCode : StroomStatusCode.values()) { System.out.println("|-"); System.out.println("|" + stroomStatusCode.getHttpCode() + "||" + stroomStatusCode.getCode() + "||" - + stroomStatusCode.getMessage() + "||" + stroomStatusCode.getReason()); + + stroomStatusCode.getMessage() + "||" + stroomStatusCode.getReason()); } System.out.println("|}"); diff --git a/stroom-receive/stroom-receive-common/build.gradle b/stroom-receive/stroom-receive-common/build.gradle index d29548f8ffc..ff295a275f6 100644 --- a/stroom-receive/stroom-receive-common/build.gradle +++ b/stroom-receive/stroom-receive-common/build.gradle @@ -14,6 +14,8 @@ dependencies { implementation project(':stroom-util-shared') implementation project(':stroom-util') + implementation libs.bcrypt + implementation libs.bouncy_castle implementation libs.commons_compress implementation libs.dropwizard_metrics_annotation implementation libs.dropwizard_metrics_healthchecks diff --git a/stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/AttributeMapFilter.java b/stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/AttributeMapFilter.java index 866351faa17..668f0bd3587 100644 --- a/stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/AttributeMapFilter.java +++ b/stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/AttributeMapFilter.java @@ -18,8 +18,79 @@ package stroom.receive.common; import stroom.meta.api.AttributeMap; +import stroom.security.api.UserIdentity; +import stroom.util.NullSafe; + +import java.util.List; public interface AttributeMapFilter { - boolean filter(AttributeMap attributeMap); + boolean filter(AttributeMap attributeMap, final UserIdentity userIdentity); + + /** + * Combine multiple filters into a single filter. Each one will be called in turn + * until one returns false. + * If null or empty, a permissive filter will be returned. + */ + static AttributeMapFilter wrap(final AttributeMapFilter... attributeMapFilters) { + return wrap(NullSafe.asList(attributeMapFilters)); + } + + /** + * Combine multiple filters into a single filter. Each one will be called in turn + * until one returns false. + * If null or empty, a permissive filter will be returned. + */ + static AttributeMapFilter wrap(final List attributeMapFilters) { + if (NullSafe.isEmptyCollection(attributeMapFilters)) { + return PermissiveAttributeMapFilter.getInstance(); + } else if (attributeMapFilters.size() == 1 && attributeMapFilters.getFirst() != null) { + return attributeMapFilters.getFirst(); + } else { + return new MultiAttributeMapFilter(attributeMapFilters); + } + } + + + // -------------------------------------------------------------------------------- + + + class MultiAttributeMapFilter implements AttributeMapFilter { + + private final List attributeMapFilters; + + private MultiAttributeMapFilter(final List attributeMapFilters) { + if (NullSafe.isEmptyCollection(attributeMapFilters)) { + throw new IllegalArgumentException("Null or empty attributeMapFilters"); + } + this.attributeMapFilters = attributeMapFilters; + } + +// public static AttributeMapFilter wrap(final AttributeMapFilter... attributeMapFilters) { +// return wrap(NullSafe.asList(attributeMapFilters)); +// } +// +// public static AttributeMapFilter wrap(final List attributeMapFilters) { +// if (NullSafe.isEmptyCollection(attributeMapFilters)) { +// return PermissiveAttributeMapFilter.getInstance(); +// } else if (attributeMapFilters.size() == 1 && attributeMapFilters.getFirst() != null) { +// return attributeMapFilters.getFirst(); +// } else { +// return new MultiAttributeMapFilter(attributeMapFilters); +// } +// } + + @Override + public boolean filter(final AttributeMap attributeMap, final UserIdentity userIdentity) { + for (final AttributeMapFilter attributeMapFilter : attributeMapFilters) { + if (attributeMapFilter != null) { + final boolean filterResult = attributeMapFilter.filter(attributeMap, userIdentity); + if (!filterResult) { + return false; + } + } + } + return true; + } + } } diff --git a/stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/AuthenticatorFilter.java b/stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/AuthenticatorFilter.java new file mode 100644 index 00000000000..f8d8810392b --- /dev/null +++ b/stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/AuthenticatorFilter.java @@ -0,0 +1,67 @@ +package stroom.receive.common; + +import stroom.meta.api.AttributeMap; +import stroom.security.api.UserIdentity; +import stroom.util.NullSafe; + +import jakarta.servlet.http.HttpServletRequest; + +import java.util.List; +import java.util.Optional; + +public interface AuthenticatorFilter { + + /** + * Filter that returns no identity, i.e. the request fails authentication. + */ + AuthenticatorFilter UNAUTHENTICATED_FILTER = (request, attributeMap) -> + Optional.empty(); + + Optional authenticate(final HttpServletRequest request, + final AttributeMap attributeMap); + + static AuthenticatorFilter wrap(final AuthenticatorFilter... attributeMapFilters) { + return wrap(NullSafe.asList(attributeMapFilters)); + } + + static AuthenticatorFilter wrap(final List attributeMapFilters) { + if (NullSafe.isEmptyCollection(attributeMapFilters)) { + return UNAUTHENTICATED_FILTER; + } else if (attributeMapFilters.size() == 1 && attributeMapFilters.getFirst() != null) { + return attributeMapFilters.getFirst(); + } else { + return new MultiAuthenticatorFilter(attributeMapFilters); + } + } + + + // -------------------------------------------------------------------------------- + + + class MultiAuthenticatorFilter implements AuthenticatorFilter { + + private final List authenticatorFilters; + + private MultiAuthenticatorFilter(final List authenticatorFilters) { + if (NullSafe.isEmptyCollection(authenticatorFilters)) { + throw new IllegalArgumentException("Null or empty authenticatorFilters"); + } + this.authenticatorFilters = authenticatorFilters; + } + + @Override + public Optional authenticate(final HttpServletRequest request, + final AttributeMap attributeMap) { + for (final AuthenticatorFilter authenticatorFilter : authenticatorFilters) { + if (authenticatorFilter != null) { + final Optional optUserIdentity = authenticatorFilter.authenticate( + request, attributeMap); + if (optUserIdentity.isPresent()) { + return optUserIdentity; + } + } + } + return Optional.empty(); + } + } +} diff --git a/stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/DataFeedKey.java b/stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/DataFeedKey.java new file mode 100644 index 00000000000..3db09f236ec --- /dev/null +++ b/stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/DataFeedKey.java @@ -0,0 +1,148 @@ +package stroom.receive.common; + +import stroom.util.NullSafe; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import jakarta.validation.constraints.Min; +import jakarta.validation.constraints.NotBlank; + +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +@JsonPropertyOrder(alphabetic = true) +public class DataFeedKey { + + @JsonProperty + @JsonPropertyDescription("The hash of the datafeed key. Hashed using hashAlgorithm.") + private final String hash; + + @JsonProperty + @JsonPropertyDescription("The hash algorithm used to hash the datafeed key.") + private final String hashAlgorithm; + + @JsonProperty + @JsonPropertyDescription("The unique subject ID of the user associated with the datafeed key.") + private final String subjectId; + + @JsonProperty + @JsonPropertyDescription("A more human friendly display form of the user identity. May be null.") + private final String displayName; + + @JsonProperty + @JsonPropertyDescription( + "A list of case sensitive regular expression patterns that will be used to verify the " + + "'Feed' header on data receipt. Only feeds matching one of these patterns will be accepted.") + private final List feedRegexPatterns; + + @JsonProperty + @JsonPropertyDescription("A map of stream attribute key/value pairs.") + private final Map streamMetaData; + + @JsonProperty + @JsonPropertyDescription("The date the key expires, expressed as milliseconds since the unix epoch.") + private final long expiryDateEpochMs; + + @JsonCreator + public DataFeedKey(@JsonProperty("hash") final String hash, + @JsonProperty("hashAlgorithm") final String hashAlgorithm, + @JsonProperty("subjectId") final String subjectId, + @JsonProperty("displayName") final String displayName, + @JsonProperty("feedRegexPatterns") final List feedRegexPatterns, + @JsonProperty("streamMetaData") final Map streamMetaData, + @JsonProperty("expiryDateEpochMs") final long expiryDateEpochMs) { + this.hash = hash; + this.hashAlgorithm = hashAlgorithm; + this.subjectId = subjectId; + this.displayName = displayName; + this.feedRegexPatterns = feedRegexPatterns; + this.streamMetaData = streamMetaData; + this.expiryDateEpochMs = expiryDateEpochMs; + } + + @NotBlank + public String getHash() { + return hash; + } + + @NotBlank + public String getHashAlgorithm() { + return hashAlgorithm; + } + + @NotBlank + public String getSubjectId() { + return subjectId; + } + + /** + * May be null. + */ + public String getDisplayName() { + return displayName; + } + + public List getFeedRegexPatterns() { + return NullSafe.list(feedRegexPatterns); + } + + public Map getStreamMetaData() { + return NullSafe.map(streamMetaData); + } + + @Min(0) + public long getExpiryDateEpochMs() { + return expiryDateEpochMs; + } + + @JsonIgnore + public Instant getExpiryDate() { + return Instant.ofEpochMilli(expiryDateEpochMs); + } + + @JsonIgnore + public boolean isExpired() { + return Instant.now().isAfter(getExpiryDate()); + } + + @Override + public boolean equals(final Object object) { + if (this == object) { + return true; + } + if (object == null || getClass() != object.getClass()) { + return false; + } + final DataFeedKey that = (DataFeedKey) object; + return Objects.equals(hash, that.hash) + && Objects.equals(hashAlgorithm, that.hashAlgorithm) + && Objects.equals(subjectId, that.subjectId) + && Objects.equals(displayName, that.displayName) + && Objects.equals(feedRegexPatterns, that.feedRegexPatterns) + && Objects.equals(streamMetaData, that.streamMetaData) + && expiryDateEpochMs == that.expiryDateEpochMs; + } + + @Override + public int hashCode() { + return Objects.hash(hash, hashAlgorithm, subjectId, feedRegexPatterns, streamMetaData, expiryDateEpochMs); + } + + @Override + public String toString() { + return "DatafeedKey{" + + "hash='" + hash + '\'' + + ", hashAlgorithm='" + hashAlgorithm + '\'' + + ", subjectId='" + subjectId + '\'' + + ", displayName='" + displayName + '\'' + + ", feedRegexPatterns=" + feedRegexPatterns + + ", streamMetaData=" + streamMetaData + + ", expiryDateEpochMs=" + expiryDateEpochMs + + '}'; + } +} diff --git a/stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/DataFeedKeyService.java b/stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/DataFeedKeyService.java new file mode 100644 index 00000000000..2f03ac1395f --- /dev/null +++ b/stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/DataFeedKeyService.java @@ -0,0 +1,13 @@ +package stroom.receive.common; + +import stroom.meta.api.AttributeMap; + +import jakarta.servlet.http.HttpServletRequest; + +import java.util.Optional; + +public interface DataFeedKeyService extends AttributeMapFilter, AuthenticatorFilter { + + Optional getDataFeedKey(final HttpServletRequest request, + final AttributeMap attributeMap); +} diff --git a/stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/DataFeedKeyServiceImpl.java b/stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/DataFeedKeyServiceImpl.java new file mode 100644 index 00000000000..d421bcf021b --- /dev/null +++ b/stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/DataFeedKeyServiceImpl.java @@ -0,0 +1,425 @@ +package stroom.receive.common; + +import stroom.docref.HasDisplayValue; +import stroom.meta.api.AttributeMap; +import stroom.meta.api.StandardHeaderArguments; +import stroom.proxy.StroomStatusCode; +import stroom.security.api.UserIdentity; +import stroom.util.NullSafe; +import stroom.util.logging.LambdaLogger; +import stroom.util.logging.LambdaLoggerFactory; +import stroom.util.logging.LogUtil; +import stroom.util.string.Base58; + +import com.google.common.base.Strings; +import jakarta.inject.Inject; +import jakarta.inject.Provider; +import jakarta.inject.Singleton; +import jakarta.servlet.http.HttpServletRequest; +import org.bouncycastle.crypto.generators.Argon2BytesGenerator; +import org.bouncycastle.crypto.params.Argon2Parameters; +import org.bouncycastle.crypto.params.Argon2Parameters.Builder; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.EnumMap; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +@Singleton +public class DataFeedKeyServiceImpl + implements DataFeedKeyService { + + private static final LambdaLogger LOGGER = LambdaLoggerFactory.getLogger(DataFeedKeyServiceImpl.class); + + private static final String AUTHORIZATION_HEADER = "Authorization"; + public static final String BEARER_PREFIX = "Bearer "; + private static final Pattern DATA_FEED_KEY_PATTERN = Pattern.compile( + "^sdk_[0-9]{3}_[A-HJ-NP-Za-km-z1-9]{128}$"); + + private final Provider receiveDataConfigProvider; + + // Holds all the keys read from the data feed key files, entries are evicted when + // the DataFeedKey has passed its expiry date. + private final Map cacheKeyTodataFeedKeyMap = new ConcurrentHashMap<>(); + private final Map subjectIdToDataFeedKeyMap = new ConcurrentHashMap<>(); + + // TODO replace with cache + private final Map feedPatternCache = new HashMap<>(); + + // TODO replace with cache + // Cache of the un-hashed key to validated DataFeedKey, to save us the hashing cost + private final Map> keyToDataFeedKeyMap = new HashMap<>(); + + + private final Map hashFunctionMap = new EnumMap<>( + DataFeedKeyHashAlgorithm.class); + + @Inject + public DataFeedKeyServiceImpl(final Provider receiveDataConfigProvider) { + this.receiveDataConfigProvider = receiveDataConfigProvider; + + hashFunctionMap.put(DataFeedKeyHashAlgorithm.ARGON2, new Argon2DataFeedKeyHasher()); +// hashFunctionMap.put(DataFeedKeyHashAlgorithm.BCRYPT, new BCryptApiKeyHasher()); + } + + @Override + public Optional getDataFeedKey(final HttpServletRequest request, + final AttributeMap attributeMap) { + Objects.requireNonNull(request); + final Optional optDataFeedKey = getAttribute(attributeMap, AUTHORIZATION_HEADER) + .map(str -> { + final String key; + if (str.startsWith(BEARER_PREFIX)) { + // This chops out 'Bearer' so we get just the token. + key = str.substring(BEARER_PREFIX.length()); + } else { + key = str; + } + LOGGER.debug(() -> + "Found Authorization in request:\n" + str); + return key; + }) + .flatMap((String key2) -> lookupKey(key2, attributeMap)); + + optDataFeedKey.ifPresent(dataFeedKey -> { + + validateDataFeedKeyExpiry(dataFeedKey, attributeMap); + + // Add the user identity to the attributeMap so that we can use it for request filtering + // later + NullSafe.consume(dataFeedKey.getSubjectId(), id -> + attributeMap.put(StandardHeaderArguments.UPLOAD_USER_ID, id)); + NullSafe.consume(dataFeedKey.getDisplayName(), username -> + attributeMap.put(StandardHeaderArguments.UPLOAD_USERNAME, username)); + }); + + // Remove authorization header from attributes as it should not be stored or forwarded on. + attributeMap.remove(AUTHORIZATION_HEADER); + + return optDataFeedKey; + } + + private void addDataFeedKeys(final DataFeedKeys dataFeedKeys) { + if (dataFeedKeys != null) { + dataFeedKeys.getDataFeedKeys() + .forEach(this::addDataFeedKey); + } + } + + private void addDataFeedKey(final DataFeedKey dataFeedKey) { + if (dataFeedKey != null) { + final String hash = dataFeedKey.getHash(); + final String hashAlgorithmName = dataFeedKey.getHashAlgorithm(); + final DataFeedKeyHashAlgorithm hashAlgorithm = DataFeedKeyHashAlgorithm.fromDisplayValue( + hashAlgorithmName); + CacheKey cacheKey = new CacheKey(hashAlgorithm, hash); + cacheKeyTodataFeedKeyMap.put(cacheKey, dataFeedKey); + subjectIdToDataFeedKeyMap.put(dataFeedKey.getSubjectId(), dataFeedKey); + } + } + + private void evictExpiredKeys() { + cacheKeyTodataFeedKeyMap.entrySet().removeIf(entry -> + entry.getValue().isExpired()); + subjectIdToDataFeedKeyMap.entrySet().removeIf(entry -> + entry.getValue().isExpired()); + } + + private void validateDataFeedKeyExpiry(final DataFeedKey dataFeedKey, + final AttributeMap attributeMap) { + if (dataFeedKey.isExpired()) { + throw new StroomStreamException( + StroomStatusCode.DATA_FEED_KEY_NOT_AUTHENTICATED, attributeMap); + } + } + + private String extractUniqueIdFromKey(final String key) { + // sdk_123_...... + return key.substring(4, 7); + } + + private Optional getCacheKey(final String key) { + Objects.requireNonNull(key); + if (DATA_FEED_KEY_PATTERN.matcher(key).matches()) { + final String uniqueId = extractUniqueIdFromKey(key); + final DataFeedKeyHashAlgorithm hashAlgorithm = DataFeedKeyHashAlgorithm.fromUniqueId(uniqueId); + + Objects.requireNonNull(hashAlgorithm, () -> + LogUtil.message("Hash algorithm not found for uniqueId '{}'", uniqueId)); + + final DataFeedKeyHasher hasher = hashFunctionMap.get(hashAlgorithm); + Objects.requireNonNull(hasher, () -> LogUtil.message("No hasher found for {}", hashAlgorithm)); + final String hash = hasher.hash(key); + return Optional.of(new CacheKey(hashAlgorithm, hash)); + } else { + return Optional.empty(); + } + } + + private Optional lookupKey(final String key, + final AttributeMap attributeMap) { + + // Try the cache first to save on the hashing cost. + Optional optDataFeedKey = keyToDataFeedKeyMap.get(key); + if (optDataFeedKey.isPresent()) { + final DataFeedKey dataFeedKey = optDataFeedKey.get(); + validateDataFeedKeyExpiry(dataFeedKey, attributeMap); + return Optional.of(dataFeedKey); + } else { + optDataFeedKey = getCacheKey(key) + .map(cacheKey -> { + Objects.requireNonNull(cacheKey); + final DataFeedKey dataFeedKey = cacheKeyTodataFeedKeyMap.get(cacheKey); + LOGGER.debug("Lookup of cacheKey {}, found {}", cacheKey, dataFeedKey); + return dataFeedKey; + }); + // Cache it to save hashing next time + keyToDataFeedKeyMap.put(key, optDataFeedKey); + return optDataFeedKey; + } + } + + private Optional getAttribute(final AttributeMap attributeMap, final String header) { + return Optional.ofNullable(attributeMap.get(header)) + .filter(str -> !NullSafe.isNonBlankString(str)); + } + + @Override + public boolean filter(final AttributeMap attributeMap, final UserIdentity userIdentity) { + final String feedName = getAttribute(attributeMap, StandardHeaderArguments.FEED) + .orElseThrow(() -> + new StroomStreamException(StroomStatusCode.FEED_MUST_BE_SPECIFIED, attributeMap)); + + final DataFeedKey dataFeedKey = subjectIdToDataFeedKeyMap.get(userIdentity.getSubjectId()); + Objects.requireNonNull(dataFeedKey, "dataFeedKey should not be null at this point"); + + final List feedRegexPatterns = dataFeedKey.getFeedRegexPatterns(); + if (NullSafe.hasItems(feedRegexPatterns)) { + for (final String feedRegexPattern : feedRegexPatterns) { + final Pattern pattern = feedPatternCache.get(feedRegexPattern); + if (pattern.matcher(feedName).matches()) { + return true; + } + } + LOGGER.debug(() -> LogUtil.message("No match on feedName '{}' with patterns [{}]", + feedName, + feedRegexPatterns.stream() + .map(pattern -> "'" + pattern + "'") + .collect(Collectors.joining(", ")))); + throw new StroomStreamException(StroomStatusCode.INVALID_FEED_NAME, attributeMap); + } else { + LOGGER.debug("No feed patterns to match on, allowing it to continue"); + return true; + } + } + + @Override + public Optional authenticate(final HttpServletRequest request, + final AttributeMap attributeMap) { + return getDataFeedKey(request, attributeMap) + .map(DataFeedKeyUserIdentity::new); + } + + // -------------------------------------------------------------------------------- + + + private record CacheKey(DataFeedKeyHashAlgorithm dataFeedKeyHashAlgorithm, + String hash) { + + } + + + // -------------------------------------------------------------------------------- + + + public enum DataFeedKeyHashAlgorithm implements HasDisplayValue { + BCRYPT("BCrypt", 0), + ARGON2("Argon2", 1), + ; + + private static final DataFeedKeyHashAlgorithm[] sparseArray; + private static final Map nameToValueMap = Arrays.stream(values()) + .collect(Collectors.toMap(DataFeedKeyHashAlgorithm::getDisplayValue, Function.identity())); + + + static { + final DataFeedKeyHashAlgorithm[] values = DataFeedKeyHashAlgorithm.values(); + final int maxPrimitive = Arrays.stream(values) + .mapToInt(dataFeedKeyHashAlgorithm -> dataFeedKeyHashAlgorithm.uniqueId) + .max() + .orElseThrow(() -> new RuntimeException("Empty values array supplied")); + sparseArray = new DataFeedKeyHashAlgorithm[maxPrimitive + 1]; + for (final DataFeedKeyHashAlgorithm value : values) { + sparseArray[value.uniqueId] = value; + } + } + + private final String displayValue; + private final int uniqueId; + + DataFeedKeyHashAlgorithm(final String displayValue, final int uniqueId) { + if (uniqueId < 0) { + throw new IllegalArgumentException("Min uniqueId is 0"); + } + if (uniqueId > 999) { + throw new IllegalArgumentException("Max uniqueId is 999"); + } + this.displayValue = displayValue; + this.uniqueId = uniqueId; + } + + @Override + public String getDisplayValue() { + return null; + } + + /** + * @return A 3 digit, zero padded number. + */ + public String getUniqueId() { + return Strings.padStart(String.valueOf(uniqueId), 3, '0'); + } + + public static DataFeedKeyHashAlgorithm fromDisplayValue(final String displayValue) { + if (displayValue == null) { + return null; + } else if (NullSafe.isBlankString(displayValue)) { + throw new IllegalArgumentException("Blank displayValue"); + } else { + final DataFeedKeyHashAlgorithm hashAlgorithm = nameToValueMap.get(displayValue); + if (hashAlgorithm == null) { + throw new IllegalArgumentException("Unknown displayValue " + displayValue); + } + return hashAlgorithm; + } + } + + public static DataFeedKeyHashAlgorithm fromUniqueId(final String uniqueId) { + if (uniqueId == null) { + return null; + } else if (uniqueId.isBlank()) { + throw new IllegalArgumentException("Blank uniqueId"); + } else { + final int intVal = Integer.parseInt(uniqueId); + DataFeedKeyHashAlgorithm dataFeedKeyHashAlgorithm; + try { + dataFeedKeyHashAlgorithm = sparseArray[intVal]; + if (dataFeedKeyHashAlgorithm == null) { + throw new IllegalArgumentException("Unknown uniqueId " + uniqueId); + } + return dataFeedKeyHashAlgorithm; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + @Override + public String toString() { + return "DataFeedKeyHashAlgorithm{" + + "displayValue='" + displayValue + '\'' + + ", uniqueId=" + uniqueId + + '}'; + } + } + + + // -------------------------------------------------------------------------------- + + + interface DataFeedKeyHasher { + + String hash(String dataFeedKey); + +// default boolean verify(String apiKeyStr, String hash) { +// final String computedHash = hash(Objects.requireNonNull(apiKeyStr)); +// return Objects.equals(Objects.requireNonNull(hash), computedHash); +// } + + DataFeedKeyHashAlgorithm getAlgorithm(); + } + + + // -------------------------------------------------------------------------------- + + + private static class Argon2DataFeedKeyHasher implements DataFeedKeyHasher { + + // WARNING!!! + // Do not change any of these otherwise it will break hash verification of existing + // keys. If you want to tune it, make a new ApiKeyHasher impl with a new getType() + // 48, 2, 65_536, 1 => ~90ms per hash + private static final int HASH_LENGTH = 48; + private static final int ITERATIONS = 2; + private static final int MEMORY_KB = 65_536; + private static final int PARALLELISM = 1; + + private final Argon2Parameters argon2Parameters; + + public Argon2DataFeedKeyHasher() { + // No salt given the length of api keys being hashed + this.argon2Parameters = new Builder(Argon2Parameters.ARGON2_id) + .withVersion(Argon2Parameters.ARGON2_VERSION_13) + .withIterations(ITERATIONS) + .withMemoryAsKB(MEMORY_KB) + .withParallelism(PARALLELISM) + .build(); + } + + @Override + public String hash(final String dataFeedKey) { + Objects.requireNonNull(dataFeedKey); + Argon2BytesGenerator generate = new Argon2BytesGenerator(); + generate.init(argon2Parameters); + byte[] result = new byte[HASH_LENGTH]; + generate.generateBytes( + dataFeedKey.trim().getBytes(StandardCharsets.UTF_8), + result, + 0, + result.length); + + // Base58 is a bit less nasty than base64 and widely supported in other languages + // due to use in bitcoin. + return Base58.encode(result); + } + + @Override + public DataFeedKeyHashAlgorithm getAlgorithm() { + return DataFeedKeyHashAlgorithm.ARGON2; + } + } + + + // -------------------------------------------------------------------------------- + + +// private static class BCryptApiKeyHasher implements DataFeedKeyHasher { +// +// @Override +// public String hash(final String apiKeyStr) { +// return BCrypt.hashpw(Objects.requireNonNull(apiKeyStr), BCrypt.gensalt()); +// } +// +// @Override +// public boolean verify(final String apiKeyStr, final String hash) { +// if (apiKeyStr == null) { +// return false; +// } else { +// return BCrypt.checkpw(apiKeyStr, hash); +// } +// } +// +// @Override +// public DataFeedKeyHashAlgorithm getAlgorithm() { +// return DataFeedKeyHashAlgorithm.BCRYPT; +// } +// } +} diff --git a/stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/DataFeedKeyUserIdentity.java b/stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/DataFeedKeyUserIdentity.java new file mode 100644 index 00000000000..a434cfac67d --- /dev/null +++ b/stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/DataFeedKeyUserIdentity.java @@ -0,0 +1,56 @@ +package stroom.receive.common; + +import stroom.security.api.UserIdentity; + +import java.util.Objects; + +/** + * {@link UserIdentity} obtained when authenticated by {@link DataFeedKey} + */ +public class DataFeedKeyUserIdentity implements UserIdentity { + + private final String subjectId; + private final String displayName; + + public DataFeedKeyUserIdentity(final DataFeedKey dataFeedKey) { + Objects.requireNonNull(dataFeedKey); + this.subjectId = dataFeedKey.getSubjectId(); + this.displayName = dataFeedKey.getDisplayName(); + } + + @Override + public String getSubjectId() { + return subjectId; + } + + @Override + public String getDisplayName() { + return displayName; + } + + @Override + public String toString() { + return "DataFeedKeyUserIdentity{" + + "subjectId='" + subjectId + '\'' + + ", displayName='" + displayName + '\'' + + '}'; + } + + @Override + public boolean equals(final Object object) { + if (this == object) { + return true; + } + if (object == null || getClass() != object.getClass()) { + return false; + } + final DataFeedKeyUserIdentity that = (DataFeedKeyUserIdentity) object; + return Objects.equals(subjectId, that.subjectId) && Objects.equals(displayName, + that.displayName); + } + + @Override + public int hashCode() { + return Objects.hash(subjectId, displayName); + } +} diff --git a/stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/DataFeedKeys.java b/stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/DataFeedKeys.java new file mode 100644 index 00000000000..7ae65628e80 --- /dev/null +++ b/stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/DataFeedKeys.java @@ -0,0 +1,49 @@ +package stroom.receive.common; + +import stroom.util.NullSafe; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; + +import java.util.List; +import java.util.Objects; + +@JsonPropertyOrder(alphabetic = true) +public class DataFeedKeys { + + private final List dataFeedKeys; + + @JsonCreator + public DataFeedKeys(@JsonProperty("dataFeedKeys") final List dataFeedKeys) { + this.dataFeedKeys = dataFeedKeys; + } + + public List getDataFeedKeys() { + return NullSafe.list(dataFeedKeys); + } + + @Override + public boolean equals(final Object object) { + if (this == object) { + return true; + } + if (object == null || getClass() != object.getClass()) { + return false; + } + final DataFeedKeys that = (DataFeedKeys) object; + return Objects.equals(dataFeedKeys, that.dataFeedKeys); + } + + @Override + public int hashCode() { + return Objects.hash(dataFeedKeys); + } + + @Override + public String toString() { + return "DataFeedKeys{" + + "dataFeedKeys=" + dataFeedKeys + + '}'; + } +} diff --git a/stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/FeedStatusAttributeMapFilter.java b/stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/FeedStatusAttributeMapFilter.java index c6be28153a3..4378fd893bd 100644 --- a/stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/FeedStatusAttributeMapFilter.java +++ b/stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/FeedStatusAttributeMapFilter.java @@ -6,6 +6,7 @@ import stroom.proxy.feed.remote.FeedStatus; import stroom.proxy.feed.remote.GetFeedStatusRequest; import stroom.proxy.feed.remote.GetFeedStatusResponse; +import stroom.security.api.UserIdentity; import jakarta.inject.Inject; import org.slf4j.Logger; @@ -23,7 +24,7 @@ public FeedStatusAttributeMapFilter(final FeedStatusService feedStatusService) { } @Override - public boolean filter(final AttributeMap attributeMap) { + public boolean filter(final AttributeMap attributeMap, final UserIdentity userIdentity) { final String feedName = attributeMap.get(StandardHeaderArguments.FEED); final String senderDn = attributeMap.get(StandardHeaderArguments.REMOTE_DN); diff --git a/stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/PermissiveAttributeMapFilter.java b/stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/PermissiveAttributeMapFilter.java index 34d095aef0b..6967327b0f0 100644 --- a/stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/PermissiveAttributeMapFilter.java +++ b/stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/PermissiveAttributeMapFilter.java @@ -18,11 +18,21 @@ package stroom.receive.common; import stroom.meta.api.AttributeMap; +import stroom.security.api.UserIdentity; public class PermissiveAttributeMapFilter implements AttributeMapFilter { + public static final PermissiveAttributeMapFilter INSTANCE = new PermissiveAttributeMapFilter(); + + private PermissiveAttributeMapFilter() { + } + + public static PermissiveAttributeMapFilter getInstance() { + return INSTANCE; + } + @Override - public boolean filter(AttributeMap attributeMap) { + public boolean filter(final AttributeMap attributeMap, final UserIdentity userIdentity) { return true; } } diff --git a/stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/ReceiveDataConfig.java b/stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/ReceiveDataConfig.java index 6807ed45a56..4174a46b57a 100644 --- a/stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/ReceiveDataConfig.java +++ b/stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/ReceiveDataConfig.java @@ -24,18 +24,29 @@ public class ReceiveDataConfig extends AbstractConfig implements IsStroomConfig, IsProxyConfig { + @JsonProperty private final String receiptPolicyUuid; + @JsonProperty private final Set metaTypes; + @JsonProperty private final boolean tokenAuthenticationEnabled; + @JsonProperty private final boolean certificateAuthenticationEnabled; + @JsonProperty + private final boolean datafeedKeyAuthenticationEnabled; + @JsonProperty private final boolean authenticationRequired; + @JsonProperty + private final String dataFeedKeysDir; public ReceiveDataConfig() { receiptPolicyUuid = null; metaTypes = new HashSet<>(StreamTypeNames.ALL_HARD_CODED_STREAM_TYPE_NAMES); tokenAuthenticationEnabled = false; certificateAuthenticationEnabled = true; + datafeedKeyAuthenticationEnabled = false; authenticationRequired = true; + dataFeedKeysDir = "data_feed_keys"; } @SuppressWarnings("unused") @@ -45,13 +56,17 @@ public ReceiveDataConfig( @JsonProperty("metaTypes") final Set metaTypes, @JsonProperty("tokenAuthenticationEnabled") final boolean tokenAuthenticationEnabled, @JsonProperty("certificateAuthenticationEnabled") final boolean certificateAuthenticationEnabled, - @JsonProperty("authenticationRequired") final boolean authenticationRequired) { + @JsonProperty("datafeedKeyAuthenticationEnabled") final boolean datafeedKeyAuthenticationEnabled, + @JsonProperty("authenticationRequired") final boolean authenticationRequired, + @JsonProperty("dataFeedKeysDir") final String dataFeedKeysDir) { this.receiptPolicyUuid = receiptPolicyUuid; this.metaTypes = metaTypes; this.tokenAuthenticationEnabled = tokenAuthenticationEnabled; this.certificateAuthenticationEnabled = certificateAuthenticationEnabled; + this.datafeedKeyAuthenticationEnabled = datafeedKeyAuthenticationEnabled; this.authenticationRequired = authenticationRequired; + this.dataFeedKeysDir = dataFeedKeysDir; } private ReceiveDataConfig(final Builder builder) { @@ -59,7 +74,9 @@ private ReceiveDataConfig(final Builder builder) { metaTypes = builder.metaTypes; tokenAuthenticationEnabled = builder.tokenAuthenticationEnabled; certificateAuthenticationEnabled = builder.certificateAuthenticationEnabled; + datafeedKeyAuthenticationEnabled = builder.datafeedKeyAuthenticationEnabled; authenticationRequired = builder.authenticationRequired; + dataFeedKeysDir = builder.dataFeedKeysDir; } @@ -71,7 +88,7 @@ public String getReceiptPolicyUuid() { @NotNull @NotEmpty @JsonPropertyDescription("Set of supported meta type names. This set must contain all of the names " + - "in the default value for this property but can contain additional names.") + "in the default value for this property but can contain additional names.") @IsSupersetOf(requiredValues = { StreamTypeNames.RAW_EVENTS, StreamTypeNames.RAW_REFERENCE, @@ -86,13 +103,13 @@ public Set getMetaTypes() { } @JsonPropertyDescription("If true, the data receipt request headers will be checked for the presence of an " + - "Open ID access token. This token will be used to authenticate the sender.") + "Open ID access token. This token will be used to authenticate the sender.") public boolean isTokenAuthenticationEnabled() { return tokenAuthenticationEnabled; } @JsonPropertyDescription("If true, the data receipt request will be checked for the presence of a " + - "certificate. The certificate will be used to authenticate the sender.") + "certificate. The certificate will be used to authenticate the sender.") public boolean isCertificateAuthenticationEnabled() { return certificateAuthenticationEnabled; } @@ -106,13 +123,29 @@ public boolean isAuthenticationRequired() { return authenticationRequired; } + @JsonPropertyDescription("If true, the data receipt request will be checked for the presence of a datafeed key. " + + "If present this key will be checked against the configured list of valid datafeed keys.") + public boolean isDatafeedKeyAuthenticationEnabled() { + return datafeedKeyAuthenticationEnabled; + } + + @JsonPropertyDescription("The directory where Stroom will look for datafeed key files. " + + "Only used if datafeedKeyAuthenticationEnabled is true." + + "If the value is a relative path then it will be treated as being " + + "relative to stroom.path.home.") + public String getDataFeedKeysDir() { + return dataFeedKeysDir; + } + @SuppressWarnings("unused") @JsonIgnore @ValidationMethod(message = "If authenticationRequired is true, then one of tokenAuthenticationEnabled " + - "or certificateAuthenticationEnabled must also be set to true.") + "or certificateAuthenticationEnabled must also be set to true.") public boolean isAuthenticationRequiredValid() { return !authenticationRequired - || (tokenAuthenticationEnabled || certificateAuthenticationEnabled); + || (tokenAuthenticationEnabled + || certificateAuthenticationEnabled + || datafeedKeyAuthenticationEnabled); } public ReceiveDataConfig withTokenAuthenticationEnabled(final boolean isTokenAuthenticationEnabled) { @@ -121,7 +154,8 @@ public ReceiveDataConfig withTokenAuthenticationEnabled(final boolean isTokenAut metaTypes, isTokenAuthenticationEnabled, certificateAuthenticationEnabled, - authenticationRequired); + datafeedKeyAuthenticationEnabled, authenticationRequired, + dataFeedKeysDir); } public ReceiveDataConfig withCertificateAuthenticationEnabled(final boolean isCertificateAuthenticationEnabled) { @@ -130,7 +164,18 @@ public ReceiveDataConfig withCertificateAuthenticationEnabled(final boolean isCe metaTypes, tokenAuthenticationEnabled, isCertificateAuthenticationEnabled, - authenticationRequired); + datafeedKeyAuthenticationEnabled, authenticationRequired, + dataFeedKeysDir); + } + + public ReceiveDataConfig withDatafeedKeyAuthenticationEnabled(final boolean isDatafeedKeyAuthenticationEnabled) { + return new ReceiveDataConfig( + receiptPolicyUuid, + metaTypes, + datafeedKeyAuthenticationEnabled, + certificateAuthenticationEnabled, + isDatafeedKeyAuthenticationEnabled, authenticationRequired, + dataFeedKeysDir); } public ReceiveDataConfig withAuthenticationRequired(final boolean isAuthenticationRequired) { @@ -139,17 +184,20 @@ public ReceiveDataConfig withAuthenticationRequired(final boolean isAuthenticati metaTypes, tokenAuthenticationEnabled, certificateAuthenticationEnabled, - isAuthenticationRequired); + datafeedKeyAuthenticationEnabled, isAuthenticationRequired, + dataFeedKeysDir); } @Override public String toString() { return "ReceiveDataConfig{" + - "receiptPolicyUuid='" + receiptPolicyUuid + '\'' + - ", tokenAuthenticationEnabled=" + tokenAuthenticationEnabled + - ", certificateAuthenticationEnabled=" + certificateAuthenticationEnabled + - ", authenticationRequired=" + authenticationRequired + - '}'; + "receiptPolicyUuid='" + receiptPolicyUuid + '\'' + + ", tokenAuthenticationEnabled=" + tokenAuthenticationEnabled + + ", certificateAuthenticationEnabled=" + certificateAuthenticationEnabled + + ", datafeedKeyAuthenticationEnabled=" + datafeedKeyAuthenticationEnabled + + ", authenticationRequired=" + authenticationRequired + + ", dataFeedKeysDir=" + dataFeedKeysDir + + '}'; } public static Builder copy(final ReceiveDataConfig receiveDataConfig) { @@ -158,7 +206,9 @@ public static Builder copy(final ReceiveDataConfig receiveDataConfig) { builder.metaTypes = receiveDataConfig.getMetaTypes(); builder.tokenAuthenticationEnabled = receiveDataConfig.isTokenAuthenticationEnabled(); builder.certificateAuthenticationEnabled = receiveDataConfig.isCertificateAuthenticationEnabled(); + builder.datafeedKeyAuthenticationEnabled = receiveDataConfig.isDatafeedKeyAuthenticationEnabled(); builder.authenticationRequired = receiveDataConfig.isAuthenticationRequired(); + builder.dataFeedKeysDir = receiveDataConfig.getDataFeedKeysDir(); return builder; } @@ -177,6 +227,8 @@ public static final class Builder { private boolean tokenAuthenticationEnabled; private boolean certificateAuthenticationEnabled; private boolean authenticationRequired; + private boolean datafeedKeyAuthenticationEnabled; + private String dataFeedKeysDir; private Builder() { } @@ -210,6 +262,16 @@ public Builder withAuthenticationRequired(final boolean val) { return this; } + public Builder withDatafeedKeyAuthenticationEnabled(final boolean val) { + datafeedKeyAuthenticationEnabled = val; + return this; + } + + public Builder withDataFeedKeysDir(final String val) { + dataFeedKeysDir = val; + return this; + } + public ReceiveDataConfig build() { return new ReceiveDataConfig(this); } diff --git a/stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/RemoteFeedModule.java b/stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/RemoteFeedModule.java index 60d57dfd35a..621d99b0fd3 100644 --- a/stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/RemoteFeedModule.java +++ b/stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/RemoteFeedModule.java @@ -20,11 +20,13 @@ import com.google.inject.AbstractModule; +// TODO maybe rename to ReceiveModule public class RemoteFeedModule extends AbstractModule { @Override protected void configure() { bind(RequestAuthenticator.class).to(RequestAuthenticatorImpl.class); + bind(DataFeedKeyService.class).to(DataFeedKeyServiceImpl.class); RestResourcesBinder.create(binder()) .bind(FeedStatusResourceImpl.class); diff --git a/stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/RequestAuthenticatorImpl.java b/stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/RequestAuthenticatorImpl.java index 079b369da07..7068ea5f563 100644 --- a/stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/RequestAuthenticatorImpl.java +++ b/stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/RequestAuthenticatorImpl.java @@ -45,6 +45,7 @@ public UserIdentity authenticate(final HttpServletRequest request, final boolean isAuthRequired = receiveDataConfig.isAuthenticationRequired(); final boolean isTokenAuthEnabled = receiveDataConfig.isTokenAuthenticationEnabled(); final boolean isCertAuthEnabled = receiveDataConfig.isCertificateAuthenticationEnabled(); + final boolean isDataFeedKeyAuthEnabled = receiveDataConfig.isDatafeedKeyAuthenticationEnabled(); // Try tokens first in preference final boolean foundToken = userIdentityFactory.hasAuthenticationToken(request); diff --git a/stroom-receive/stroom-receive-rules-impl/build.gradle b/stroom-receive/stroom-receive-rules-impl/build.gradle index 3c991da2ed9..2f3f08db52f 100644 --- a/stroom-receive/stroom-receive-rules-impl/build.gradle +++ b/stroom-receive/stroom-receive-rules-impl/build.gradle @@ -12,6 +12,7 @@ dependencies { implementation project(':stroom-query:stroom-query-api') implementation project(':stroom-receive:stroom-receive-common') implementation project(':stroom-search:stroom-expression-matcher') + implementation project(':stroom-security:stroom-security-api') implementation project(':stroom-util-shared') implementation project(':stroom-util') diff --git a/stroom-receive/stroom-receive-rules-impl/src/main/java/stroom/receive/rules/impl/DataReceiptPolicyAttributeMapFilter.java b/stroom-receive/stroom-receive-rules-impl/src/main/java/stroom/receive/rules/impl/DataReceiptPolicyAttributeMapFilter.java index 5a4d0f1babe..f79f27f0fce 100644 --- a/stroom-receive/stroom-receive-rules-impl/src/main/java/stroom/receive/rules/impl/DataReceiptPolicyAttributeMapFilter.java +++ b/stroom-receive/stroom-receive-rules-impl/src/main/java/stroom/receive/rules/impl/DataReceiptPolicyAttributeMapFilter.java @@ -22,6 +22,7 @@ import stroom.receive.common.AttributeMapFilter; import stroom.receive.common.StroomStreamException; import stroom.receive.rules.shared.RuleAction; +import stroom.security.api.UserIdentity; import java.util.Objects; @@ -35,7 +36,7 @@ class DataReceiptPolicyAttributeMapFilter implements AttributeMapFilter { } @Override - public boolean filter(final AttributeMap attributeMap) { + public boolean filter(final AttributeMap attributeMap, final UserIdentity userIdentity) { // We need to examine the meta map and ensure we aren't dropping or rejecting this data. final RuleAction action = dataReceiptPolicyChecker.check(attributeMap); diff --git a/stroom-util/src/main/java/stroom/util/concurrent/PeriodicallyUpdatedValue.java b/stroom-util/src/main/java/stroom/util/concurrent/PeriodicallyUpdatedValue.java new file mode 100644 index 00000000000..20567f37cc1 --- /dev/null +++ b/stroom-util/src/main/java/stroom/util/concurrent/PeriodicallyUpdatedValue.java @@ -0,0 +1,82 @@ +package stroom.util.concurrent; + +import stroom.util.logging.LambdaLogger; +import stroom.util.logging.LambdaLoggerFactory; + +import java.time.Duration; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.function.Supplier; + +/** + *

+ * Useful when you need a value supplier to update the value supplied based on some condition, + * but you don't want to incur the cost of checking that condition on each call. + *

+ *

+ * This will only call hasStateChangedCheck every checkInterval. If hasStateChangedCheck + * returns true then it will call valueSupplier to obtain a new value. + *

+ * + * @param The type of the value. + * @param The type of the state used to determine if the value needs to be updated. + */ +public class PeriodicallyUpdatedValue { + + private static final LambdaLogger LOGGER = LambdaLoggerFactory.getLogger(PeriodicallyUpdatedValue.class); + private static final int UNINITIALISED = 0; + + private final long checkIntervalMs; + private final Function valueSupplier; + private final Supplier stateSupplier; + + private final AtomicLong nextCheckEpochMs = new AtomicLong(UNINITIALISED); + private S state = null; + private V value = null; + + /** + * @param checkInterval The interval to call stateSupplier to determine if the valueSupplier needs + * to be called. + * @param valueSupplier Will be called on first call of {@link PeriodicallyUpdatedValue#getValue()} + * then any time the value supplied by stateSupplier changes. If multiple + * threads call {@link PeriodicallyUpdatedValue#getValue()} at once valueSupplier + * may be called by each thread. Should be side effect free. + * @param stateSupplier Will generally be called once per checkInterval unless multiple threads + * call {@link PeriodicallyUpdatedValue#getValue()} at once, in which case + * it may be called by multiple threads at once. Should be side effect free. + * Value returned must implement equals method to determine if it has changed. + */ + public PeriodicallyUpdatedValue(final Duration checkInterval, + final Function valueSupplier, + final Supplier stateSupplier) { + this.checkIntervalMs = Objects.requireNonNull(checkInterval).toMillis(); + this.valueSupplier = Objects.requireNonNull(valueSupplier); + this.stateSupplier = Objects.requireNonNull(stateSupplier); + } + + public V getValue() { + final long oldNextCheck = nextCheckEpochMs.get(); + LOGGER.debug("getValue(), value: {}, nextCheck: {}", value, oldNextCheck); + + final long x = nextCheckEpochMs.accumulateAndGet(checkIntervalMs, (checkEpochMs, interval) -> { + final long nowMs = System.currentTimeMillis(); + if (nowMs > checkEpochMs) { + final S newState = stateSupplier.get(); + if (checkEpochMs == UNINITIALISED || !Objects.equals(state, newState)) { + state = newState; + final V newValue = valueSupplier.apply(newState); + LOGGER.debug("nextCheck: {}, state: {}, newState: {}, value: {}, newValue: {}", + oldNextCheck, state, newState, value, newValue); + value = newValue; + } + return nowMs + interval; + } else { + LOGGER.debug("No change"); + return checkEpochMs; + } + }); + LOGGER.debug("Returning {}", value); + return value; + } +} diff --git a/stroom-util/src/test/java/stroom/util/concurrent/TestPeriodicallyUpdatedValue.java b/stroom-util/src/test/java/stroom/util/concurrent/TestPeriodicallyUpdatedValue.java new file mode 100644 index 00000000000..4c724c2d01b --- /dev/null +++ b/stroom-util/src/test/java/stroom/util/concurrent/TestPeriodicallyUpdatedValue.java @@ -0,0 +1,116 @@ +package stroom.util.concurrent; + +import stroom.util.logging.LambdaLogger; +import stroom.util.logging.LambdaLoggerFactory; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.Assertions.assertThat; + +class TestPeriodicallyUpdatedValue { + + private static final LambdaLogger LOGGER = LambdaLoggerFactory.getLogger(TestPeriodicallyUpdatedValue.class); + + @Test + void singleThreadTest() { + + final AtomicInteger version = new AtomicInteger(1); + + final PeriodicallyUpdatedValue periodicallyUpdatedValue = new PeriodicallyUpdatedValue<>( + Duration.ofMillis(100), + state -> { + LOGGER.debug("Supplying value"); + return "version " + state; + }, + () -> { + LOGGER.debug("Supplying state"); + return version.get(); + }); + + assertThat(periodicallyUpdatedValue.getValue()) + .isEqualTo("version 1"); + assertThat(periodicallyUpdatedValue.getValue()) + .isEqualTo("version 1"); + + version.incrementAndGet(); + + LOGGER.debug("Sleeping"); + ThreadUtil.sleepIgnoringInterrupts(100); + + assertThat(periodicallyUpdatedValue.getValue()) + .isEqualTo("version 2"); + assertThat(periodicallyUpdatedValue.getValue()) + .isEqualTo("version 2"); + + LOGGER.debug("Sleeping"); + ThreadUtil.sleepIgnoringInterrupts(100); + + assertThat(periodicallyUpdatedValue.getValue()) + .isEqualTo("version 2"); + } + + @Test + void multiThreadTest() throws InterruptedException { + +// final AtomicInteger version = new AtomicInteger(1); + final int version = 1; + + final PeriodicallyUpdatedValue periodicallyUpdatedValue = new PeriodicallyUpdatedValue<>( + Duration.ofMillis(60_000), + state -> { + LOGGER.debug("Supplying value"); + return "version " + state; + }, + () -> { + LOGGER.debug("Supplying state"); +// return version.get(); + return version; + }); + + final int threadCount = 3; + final CountDownLatch startLatch = new CountDownLatch(threadCount); + final CountDownLatch finishLatch = new CountDownLatch(threadCount); + final ExecutorService executorService = Executors.newFixedThreadPool(threadCount); + + for (int i = 0; i < threadCount; i++) { + CompletableFuture.runAsync(() -> { + try { + try { + startLatch.countDown(); + LOGGER.debug("Thread waiting, startLatch {}", startLatch.getCount()); + startLatch.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOGGER.debug("interrupted", e); + throw new RuntimeException(e); + } + LOGGER.debug("Thread starting"); + final String value = periodicallyUpdatedValue.getValue(); + final boolean areEqual = Objects.equals(value, "version 1"); + if (!areEqual) { + LOGGER.debug("Not equal, value: {}", value); + } + assertThat(areEqual) + .isTrue(); + finishLatch.countDown(); + LOGGER.debug("Thread finished, finishLatch {}", finishLatch.getCount()); + } catch (RuntimeException e) { + LOGGER.debug("error", e); + Assertions.fail(e.getMessage()); + } + }, executorService); + } + + finishLatch.await(10, TimeUnit.SECONDS); + LOGGER.debug("Finished"); + } +}