Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Datafeed Keys #4658

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,8 @@ appConfig:
receive:
authenticationRequired: true
certificateAuthenticationEnabled: true
dataFeedKeysDir: "data_feed_keys"
datafeedKeyAuthenticationEnabled: false
metaTypes:
- "Context"
- "Raw Reference"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,53 +18,78 @@

import stroom.docref.DocRef;
import stroom.receive.common.AttributeMapFilter;
import stroom.receive.common.DataFeedKeyService;
import stroom.receive.common.DataReceiptPolicyAttributeMapFilterFactory;
import stroom.receive.common.FeedStatusAttributeMapFilter;
import stroom.receive.common.ReceiveDataConfig;
import stroom.receive.rules.shared.ReceiveDataRules;
import stroom.util.NullSafe;
import stroom.util.concurrent.PeriodicallyUpdatedValue;

import jakarta.inject.Inject;
import jakarta.inject.Provider;
import jakarta.inject.Singleton;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

@Singleton
public class AttributeMapFilterFactory {

private final Provider<ReceiveDataConfig> receiveDataConfigProvider;
private final DataReceiptPolicyAttributeMapFilterFactory dataReceiptPolicyAttributeMapFilterFactory;
private final FeedStatusAttributeMapFilter feedStatusAttributeMapFilter;

private volatile AttributeMapFilter attributeMapFilter;
private final AtomicReference<String> lastPolicyUuid = new AtomicReference<>();
private final DataFeedKeyService dataFeedKeyService;
private final PeriodicallyUpdatedValue<AttributeMapFilter, ConfigState> updatableAttributeMapFilter;

@Inject
public AttributeMapFilterFactory(
final Provider<ReceiveDataConfig> receiveDataConfigProvider,
final DataReceiptPolicyAttributeMapFilterFactory dataReceiptPolicyAttributeMapFilterFactory,
final FeedStatusAttributeMapFilter feedStatusAttributeMapFilter) {
final FeedStatusAttributeMapFilter feedStatusAttributeMapFilter,
final DataFeedKeyService dataFeedKeyService) {

this.receiveDataConfigProvider = receiveDataConfigProvider;
this.dataReceiptPolicyAttributeMapFilterFactory = dataReceiptPolicyAttributeMapFilterFactory;
this.feedStatusAttributeMapFilter = feedStatusAttributeMapFilter;
this.dataFeedKeyService = dataFeedKeyService;

// Every 60s, see if config has changed and if so create a new filter
this.updatableAttributeMapFilter = new PeriodicallyUpdatedValue<>(
Duration.ofSeconds(60),
this::create,
() -> ConfigState.fromConfig(receiveDataConfigProvider.get()));
}

public AttributeMapFilter create() {
final String receiptPolicyUuid = receiveDataConfigProvider.get().getReceiptPolicyUuid();
final String last = lastPolicyUuid.get();
if (attributeMapFilter == null || !Objects.equals(last, receiptPolicyUuid)) {
lastPolicyUuid.compareAndSet(last, receiptPolicyUuid);

if (receiptPolicyUuid != null && receiptPolicyUuid.length() > 0) {
attributeMapFilter = dataReceiptPolicyAttributeMapFilterFactory.create(
new DocRef(ReceiveDataRules.DOCUMENT_TYPE, receiptPolicyUuid));
} else {
attributeMapFilter = feedStatusAttributeMapFilter;
}
private AttributeMapFilter create(final ConfigState configState) {

final List<AttributeMapFilter> filters = new ArrayList<>();
if (configState.isDatafeedKeyAuthenticationEnabled) {
filters.add(dataFeedKeyService);
}
if (NullSafe.isNonEmptyString(configState.policyUuid)) {
filters.add(dataReceiptPolicyAttributeMapFilterFactory.create(
new DocRef(ReceiveDataRules.DOCUMENT_TYPE, configState.policyUuid)));
}
filters.add(feedStatusAttributeMapFilter);
return AttributeMapFilter.wrap(filters);
}

public AttributeMapFilter create() {
return updatableAttributeMapFilter.getValue();
}


// --------------------------------------------------------------------------------

return attributeMapFilter;

private record ConfigState(
String policyUuid,
boolean isDatafeedKeyAuthenticationEnabled) {

public static ConfigState fromConfig(final ReceiveDataConfig receiveDataConfig) {
return new ConfigState(
receiveDataConfig.getReceiptPolicyUuid(),
receiveDataConfig.isDatafeedKeyAuthenticationEnabled());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import stroom.receive.common.StroomStreamProcessor;
import stroom.receive.common.StroomStreamStatus;
import stroom.security.api.SecurityContext;
import stroom.security.api.UserIdentity;
import stroom.task.api.TaskContextFactory;
import stroom.task.api.TaskProgressHandler;
import stroom.util.NullSafe;
Expand Down Expand Up @@ -91,13 +92,13 @@ public void handle(final HttpServletRequest request, final HttpServletResponse r

// Authenticate the request using token or cert depending on configuration
// Adds sender details to the attributeMap
requestAuthenticator.authenticate(request, attributeMap);
final UserIdentity userIdentity = requestAuthenticator.authenticate(request, attributeMap);

// Validate the supplied attributes.
AttributeMapValidator.validate(attributeMap, metaService::getTypes);

final String feedName;
if (attributeMapFilter.filter(attributeMap)) {
if (attributeMapFilter.filter(attributeMap, userIdentity)) {
debug("Receiving data", attributeMap);

feedName = NullSafe.trim(attributeMap.get(StandardHeaderArguments.FEED));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public String process(final HttpServletRequest request,
receiveDataConfig::getMetaTypes);

// Test to see if we are going to accept this stream or drop the data.
if (attributeMapFilter.filter(attributeMap)) {
if (attributeMapFilter.filter(attributeMap, userIdentity)) {
consumeHandler.handle(request, attributeMap, requestUuid);

} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,48 +18,104 @@

import stroom.docref.DocRef;
import stroom.receive.common.AttributeMapFilter;
import stroom.receive.common.DataFeedKeyService;
import stroom.receive.common.DataReceiptPolicyAttributeMapFilterFactory;
import stroom.receive.common.FeedStatusAttributeMapFilter;
import stroom.receive.common.PermissiveAttributeMapFilter;
import stroom.receive.common.ReceiveDataConfig;
import stroom.receive.rules.shared.ReceiveDataRules;
import stroom.util.NullSafe;
import stroom.util.concurrent.PeriodicallyUpdatedValue;

import jakarta.inject.Inject;
import jakarta.inject.Provider;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

@Singleton
public class AttributeMapFilterFactory {

private static final Logger LOGGER = LoggerFactory.getLogger(AttributeMapFilterFactory.class);

private final AttributeMapFilter attributeMapFilter;
private final Provider<ReceiveDataConfig> receiveDataConfigProvider;
private final DataReceiptPolicyAttributeMapFilterFactory dataReceiptPolicyAttributeMapFilterFactory;
private final Provider<FeedStatusConfig> feedStatusConfigProvider;
private final Provider<RemoteFeedStatusService> remoteFeedStatusServiceProvider;
private final DataFeedKeyService dataFeedKeyService;

private final PeriodicallyUpdatedValue<AttributeMapFilter, ConfigState> updatableAttributeMapFilter;

@Inject
public AttributeMapFilterFactory(
final ReceiveDataConfig receiveDataConfig,
final FeedStatusConfig feedStatusConfig,
final Provider<ReceiveDataConfig> receiveDataConfigProvider,
final Provider<FeedStatusConfig> feedStatusConfigProvider,
final DataReceiptPolicyAttributeMapFilterFactory dataReceiptPolicyAttributeMapFilterFactory,
final Provider<RemoteFeedStatusService> remoteFeedStatusServiceProvider) {

if (StringUtils.isNotBlank(receiveDataConfig.getReceiptPolicyUuid())) {
LOGGER.info("Using data receipt policy to filter received data");
attributeMapFilter = dataReceiptPolicyAttributeMapFilterFactory.create(
new DocRef(ReceiveDataRules.DOCUMENT_TYPE, receiveDataConfig.getReceiptPolicyUuid()));
} else if (StringUtils.isNotBlank(feedStatusConfig.getFeedStatusUrl())) {
LOGGER.info("Using remote feed status service to filter received data");
final Provider<RemoteFeedStatusService> remoteFeedStatusServiceProvider,
final DataFeedKeyService dataFeedKeyService) {

this.receiveDataConfigProvider = receiveDataConfigProvider;
this.feedStatusConfigProvider = feedStatusConfigProvider;
this.dataReceiptPolicyAttributeMapFilterFactory = dataReceiptPolicyAttributeMapFilterFactory;
this.remoteFeedStatusServiceProvider = remoteFeedStatusServiceProvider;
this.dataFeedKeyService = dataFeedKeyService;

// Every 60s, see if config has changed and if so create a new filter
this.updatableAttributeMapFilter = new PeriodicallyUpdatedValue<>(
Duration.ofSeconds(60),
this::create,
() -> ConfigState.fromConfig(
receiveDataConfigProvider.get(),
feedStatusConfigProvider.get()));
}

private AttributeMapFilter create(final ConfigState configState) {
final List<AttributeMapFilter> filters = new ArrayList<>();

if (configState.isDatafeedKeyAuthenticationEnabled()) {
LOGGER.debug("Adding data feed key filter");
filters.add(dataFeedKeyService);
}

if (NullSafe.isNonBlankString(configState.receiptPolicyUuid)) {
LOGGER.debug("Adding data receipt policy filter");
filters.add(dataReceiptPolicyAttributeMapFilterFactory.create(
new DocRef(ReceiveDataRules.DOCUMENT_TYPE, configState.receiptPolicyUuid)));
}

if (NullSafe.isNonBlankString(configState.feedStatusUrl)) {
LOGGER.debug("Adding remote feed status service filter");
final RemoteFeedStatusService remoteFeedStatusService = remoteFeedStatusServiceProvider.get();
attributeMapFilter = new FeedStatusAttributeMapFilter(remoteFeedStatusService);
} else {
LOGGER.info("Permitting receipt of all data");
attributeMapFilter = new PermissiveAttributeMapFilter();
filters.add(new FeedStatusAttributeMapFilter(remoteFeedStatusService));
}

return AttributeMapFilter.wrap(filters);
}

public AttributeMapFilter create() {
return attributeMapFilter;
return updatableAttributeMapFilter.getValue();
}


// --------------------------------------------------------------------------------


private record ConfigState(
String receiptPolicyUuid,
boolean isDatafeedKeyAuthenticationEnabled,
String feedStatusUrl) {

public static ConfigState fromConfig(
final ReceiveDataConfig receiveDataConfig,
final FeedStatusConfig feedStatusConfig) {

return new ConfigState(
receiveDataConfig.getReceiptPolicyUuid(),
receiveDataConfig.isDatafeedKeyAuthenticationEnabled(),
feedStatusConfig.getFeedStatusUrl());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ proxyConfig:
receive:
authenticationRequired: true
certificateAuthenticationEnabled: true
dataFeedKeysDir: "data_feed_keys"
datafeedKeyAuthenticationEnabled: false
metaTypes:
- "Context"
- "Raw Reference"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ public enum StroomStatusCode {
FEED_IS_NOT_SET_TO_RECEIVED_DATA(HttpServletResponse.SC_NOT_ACCEPTABLE, 110, "Feed is not set to receive data",
"The feed you have provided has not been setup to receive data"),

INVALID_FEED_NAME(HttpServletResponse.SC_NOT_ACCEPTABLE, 111,
"Feed is not valid",
"The feed you have provided does not match an agreed pattern"),

UNEXPECTED_DATA_TYPE(HttpServletResponse.SC_NOT_ACCEPTABLE, 120, "Unexpected data type",
"The data type supplied is not expected"),

Expand Down Expand Up @@ -57,11 +61,17 @@ public enum StroomStatusCode {
"Client Token or Certificate failed authentication",
"The provided client token or certificate cannot be authorised"),

DATA_FEED_KEY_NOT_AUTHENTICATED(
HttpServletResponse.SC_UNAUTHORIZED,
313,
"Data feed key failed authentication",
"The provided data feed key cannot be authorised"),

COMPRESSED_STREAM_INVALID(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
400,
"Compressed stream invalid",
"The stream of data sent does not form a valid compressed file. Maybe it terminated " +
"unexpectedly or is corrupt."),
"unexpectedly or is corrupt."),

UNKNOWN_ERROR(
HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
Expand Down Expand Up @@ -117,7 +127,7 @@ public static void main(String[] args) {
for (StroomStatusCode stroomStatusCode : StroomStatusCode.values()) {
System.out.println("|-");
System.out.println("|" + stroomStatusCode.getHttpCode() + "||" + stroomStatusCode.getCode() + "||"
+ stroomStatusCode.getMessage() + "||" + stroomStatusCode.getReason());
+ stroomStatusCode.getMessage() + "||" + stroomStatusCode.getReason());

}
System.out.println("|}");
Expand Down
2 changes: 2 additions & 0 deletions stroom-receive/stroom-receive-common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ dependencies {
implementation project(':stroom-util-shared')
implementation project(':stroom-util')

implementation libs.bcrypt
implementation libs.bouncy_castle
implementation libs.commons_compress
implementation libs.dropwizard_metrics_annotation
implementation libs.dropwizard_metrics_healthchecks
Expand Down
Loading
Loading