From 2d49e09f5aa31a85efccb950813372f782896b68 Mon Sep 17 00:00:00 2001 From: "alejandro.gonzalez" Date: Thu, 25 Jul 2024 11:32:26 +0200 Subject: [PATCH] first iteration --- .../appsec/gateway/AppSecRequestContext.java | 46 +- .../datadog/appsec/gateway/GatewayBridge.java | 778 +----------------- .../appsec/gateway/SubscribersCache.java | 102 +++ .../gateway/callbacks/CallbackUtils.java | 242 ++++++ .../callbacks/DatabaseConnectionCallback.java | 18 + .../callbacks/DatabaseSqlQueryCallback.java | 60 ++ .../GraphqlServerRequestMessageCallback.java | 56 ++ .../callbacks/GrpcServerMethodCallback.java | 53 ++ .../GrpcServerRequestMessageCallback.java | 57 ++ .../callbacks/MethodAndRawURICallback.java | 63 ++ .../callbacks/NewRequestHeaderCallback.java | 26 + .../callbacks/RequestBodyDoneCallback.java | 61 ++ .../RequestBodyProcessedCallback.java | 67 ++ .../callbacks/RequestBodyStartCallback.java | 21 + .../RequestClientSocketAddressCallback.java | 34 + .../callbacks/RequestEndedCallBack.java | 179 ++++ .../callbacks/RequestHeadersDoneCallback.java | 31 + .../RequestInferredClientAddressCallback.java | 21 + .../callbacks/RequestPathParamsCallback.java | 56 ++ .../callbacks/RequestStartedCallback.java | 42 + .../callbacks/ResponseHeaderCallback.java | 17 + .../callbacks/ResponseHeaderDoneCallback.java | 32 + .../callbacks/ResponseStartedCallback.java | 32 + 23 files changed, 1336 insertions(+), 758 deletions(-) create mode 100644 dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/SubscribersCache.java create mode 100644 dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/CallbackUtils.java create mode 100644 dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/DatabaseConnectionCallback.java create mode 100644 dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/DatabaseSqlQueryCallback.java create mode 100644 dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/GraphqlServerRequestMessageCallback.java create mode 100644 dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/GrpcServerMethodCallback.java create mode 100644 dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/GrpcServerRequestMessageCallback.java create mode 100644 dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/MethodAndRawURICallback.java create mode 100644 dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/NewRequestHeaderCallback.java create mode 100644 dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/RequestBodyDoneCallback.java create mode 100644 dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/RequestBodyProcessedCallback.java create mode 100644 dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/RequestBodyStartCallback.java create mode 100644 dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/RequestClientSocketAddressCallback.java create mode 100644 dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/RequestEndedCallBack.java create mode 100644 dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/RequestHeadersDoneCallback.java create mode 100644 dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/RequestInferredClientAddressCallback.java create mode 100644 dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/RequestPathParamsCallback.java create mode 100644 dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/RequestStartedCallback.java create mode 100644 dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/ResponseHeaderCallback.java create mode 100644 dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/ResponseHeaderDoneCallback.java create mode 100644 dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/ResponseStartedCallback.java diff --git a/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/AppSecRequestContext.java b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/AppSecRequestContext.java index 1d4fda19444..7e4fd67f93d 100644 --- a/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/AppSecRequestContext.java +++ b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/AppSecRequestContext.java @@ -236,27 +236,27 @@ public Iterator, Object>> iterator() { /* Interface for use of GatewayBridge */ - String getScheme() { + public String getScheme() { return scheme; } - void setScheme(String scheme) { + public void setScheme(String scheme) { this.scheme = scheme; } - String getMethod() { + public String getMethod() { return method; } - void setMethod(String method) { + public void setMethod(String method) { this.method = method; } - String getSavedRawURI() { + public String getSavedRawURI() { return savedRawURI; } - void setRawURI(String savedRawURI) { + public void setRawURI(String savedRawURI) { if (this.savedRawURI != null && this.savedRawURI.compareToIgnoreCase(savedRawURI) != 0) { throw new IllegalStateException( "Forbidden attempt to set different raw URI for given request context"); @@ -264,7 +264,7 @@ void setRawURI(String savedRawURI) { this.savedRawURI = savedRawURI; } - void addRequestHeader(String name, String value) { + public void addRequestHeader(String name, String value) { if (finishedRequestHeaders) { throw new IllegalStateException("Request headers were said to be finished before"); } @@ -278,19 +278,19 @@ void addRequestHeader(String name, String value) { strings.add(value); } - void finishRequestHeaders() { + public void finishRequestHeaders() { this.finishedRequestHeaders = true; } - boolean isFinishedRequestHeaders() { + public boolean isFinishedRequestHeaders() { return finishedRequestHeaders; } - Map> getRequestHeaders() { + public Map> getRequestHeaders() { return requestHeaders; } - void addResponseHeader(String name, String value) { + public void addResponseHeader(String name, String value) { if (finishedResponseHeaders) { throw new IllegalStateException("Response headers were said to be finished before"); } @@ -312,11 +312,11 @@ public boolean isFinishedResponseHeaders() { return finishedResponseHeaders; } - Map> getResponseHeaders() { + public Map> getResponseHeaders() { return responseHeaders; } - void addCookies(Map> cookies) { + public void addCookies(Map> cookies) { if (finishedRequestHeaders) { throw new IllegalStateException("Request headers were said to be finished before"); } @@ -327,15 +327,15 @@ void addCookies(Map> cookies) { } } - Map> getCookies() { + public Map> getCookies() { return collectedCookies != null ? collectedCookies : Collections.emptyMap(); } - String getPeerAddress() { + public String getPeerAddress() { return peerAddress; } - void setPeerAddress(String peerAddress) { + public void setPeerAddress(String peerAddress) { this.peerAddress = peerAddress; } @@ -347,15 +347,15 @@ public void setPeerPort(int peerPort) { this.peerPort = peerPort; } - void setInferredClientIp(String ipAddress) { + public void setInferredClientIp(String ipAddress) { this.inferredClientIp = ipAddress; } - String getInferredClientIp() { + public String getInferredClientIp() { return inferredClientIp; } - void setStoredRequestBodySupplier(StoredBodySupplier storedRequestBodySupplier) { + public void setStoredRequestBodySupplier(StoredBodySupplier storedRequestBodySupplier) { this.storedRequestBodySupplier = storedRequestBodySupplier; } @@ -467,7 +467,7 @@ public void reportStackTrace(StackTraceEvent stackTraceEvent) { } } - Collection transferCollectedEvents() { + public Collection transferCollectedEvents() { if (this.appSecEvents == null) { return Collections.emptyList(); } @@ -481,7 +481,7 @@ Collection transferCollectedEvents() { return events; } - StackTraceCollection transferStackTracesCollection() { + public StackTraceCollection transferStackTracesCollection() { if (this.stackTraceEvents == null) { return null; } @@ -492,7 +492,7 @@ StackTraceCollection transferStackTracesCollection() { stackTraces.add(item); } - if (stackTraces.size() != 0) { + if (!stackTraces.isEmpty()) { return new StackTraceCollection(stackTraces); } else { return null; @@ -509,7 +509,7 @@ public void reportApiSchemas(Map schemas) { } } - boolean commitApiSchemas(TraceSegment traceSegment) { + public boolean commitApiSchemas(TraceSegment traceSegment) { if (traceSegment == null || apiSchemas == null) { return false; } diff --git a/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/GatewayBridge.java b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/GatewayBridge.java index e9d3a89e3ee..ffbdf1f2d22 100644 --- a/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/GatewayBridge.java +++ b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/GatewayBridge.java @@ -1,91 +1,49 @@ package com.datadog.appsec.gateway; -import static com.datadog.appsec.event.data.MapDataBundle.Builder.CAPACITY_0_2; -import static com.datadog.appsec.event.data.MapDataBundle.Builder.CAPACITY_6_10; -import static com.datadog.appsec.gateway.AppSecRequestContext.DEFAULT_REQUEST_HEADERS_ALLOW_LIST; -import static com.datadog.appsec.gateway.AppSecRequestContext.REQUEST_HEADERS_ALLOW_LIST; -import static com.datadog.appsec.gateway.AppSecRequestContext.RESPONSE_HEADERS_ALLOW_LIST; - -import com.datadog.appsec.AppSecSystem; import com.datadog.appsec.api.security.ApiSecurityRequestSampler; import com.datadog.appsec.config.TraceSegmentPostProcessor; import com.datadog.appsec.event.EventProducerService; -import com.datadog.appsec.event.EventProducerService.DataSubscriberInfo; -import com.datadog.appsec.event.ExpiredSubscriberInfoException; import com.datadog.appsec.event.data.Address; -import com.datadog.appsec.event.data.DataBundle; import com.datadog.appsec.event.data.KnownAddresses; -import com.datadog.appsec.event.data.MapDataBundle; -import com.datadog.appsec.event.data.ObjectIntrospection; -import com.datadog.appsec.event.data.SingletonDataBundle; -import com.datadog.appsec.report.AppSecEvent; -import com.datadog.appsec.report.AppSecEventWrapper; -import com.datadog.appsec.stack_trace.StackTraceCollection; -import com.datadog.appsec.util.ObjectFlattener; -import datadog.trace.api.Config; -import datadog.trace.api.function.TriConsumer; -import datadog.trace.api.function.TriFunction; +import com.datadog.appsec.gateway.callbacks.DatabaseConnectionCallback; +import com.datadog.appsec.gateway.callbacks.DatabaseSqlQueryCallback; +import com.datadog.appsec.gateway.callbacks.GraphqlServerRequestMessageCallback; +import com.datadog.appsec.gateway.callbacks.GrpcServerMethodCallback; +import com.datadog.appsec.gateway.callbacks.GrpcServerRequestMessageCallback; +import com.datadog.appsec.gateway.callbacks.MethodAndRawURICallback; +import com.datadog.appsec.gateway.callbacks.NewRequestHeaderCallback; +import com.datadog.appsec.gateway.callbacks.RequestBodyDoneCallback; +import com.datadog.appsec.gateway.callbacks.RequestBodyProcessedCallback; +import com.datadog.appsec.gateway.callbacks.RequestBodyStartCallback; +import com.datadog.appsec.gateway.callbacks.RequestClientSocketAddressCallback; +import com.datadog.appsec.gateway.callbacks.RequestEndedCallBack; +import com.datadog.appsec.gateway.callbacks.RequestHeadersDoneCallback; +import com.datadog.appsec.gateway.callbacks.RequestInferredClientAddressCallback; +import com.datadog.appsec.gateway.callbacks.RequestPathParamsCallback; +import com.datadog.appsec.gateway.callbacks.RequestStartedCallback; +import com.datadog.appsec.gateway.callbacks.ResponseHeaderCallback; +import com.datadog.appsec.gateway.callbacks.ResponseHeaderDoneCallback; +import com.datadog.appsec.gateway.callbacks.ResponseStartedCallback; import datadog.trace.api.gateway.Events; -import datadog.trace.api.gateway.Flow; -import datadog.trace.api.gateway.IGSpanInfo; -import datadog.trace.api.gateway.RequestContext; -import datadog.trace.api.gateway.RequestContextSlot; import datadog.trace.api.gateway.SubscriptionService; -import datadog.trace.api.http.StoredBodySupplier; -import datadog.trace.api.internal.TraceSegment; -import datadog.trace.api.telemetry.RuleType; -import datadog.trace.api.telemetry.WafMetricCollector; -import datadog.trace.bootstrap.instrumentation.api.Tags; -import datadog.trace.bootstrap.instrumentation.api.URIDataAdapter; -import java.net.URI; -import java.net.URISyntaxException; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.function.Function; -import java.util.regex.Pattern; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** Bridges the instrumentation gateway and the reactive engine. */ public class GatewayBridge { private static final Events EVENTS = Events.get(); - private static final Logger log = LoggerFactory.getLogger(GatewayBridge.class); - - private static final Pattern QUERY_PARAM_VALUE_SPLITTER = Pattern.compile("="); - private static final Pattern QUERY_PARAM_SPLITTER = Pattern.compile("&"); - private static final Map> EMPTY_QUERY_PARAMS = Collections.emptyMap(); - - /** User tracking tags that will force the collection of request headers */ - private static final String[] USER_TRACKING_TAGS = { - "appsec.events.users.login.success.track", "appsec.events.users.login.failure.track" - }; - private final SubscriptionService subscriptionService; private final EventProducerService producerService; private final ApiSecurityRequestSampler requestSampler; private final List traceSegmentPostProcessors; - // subscriber cache - private volatile DataSubscriberInfo initialReqDataSubInfo; - private volatile DataSubscriberInfo rawRequestBodySubInfo; - private volatile DataSubscriberInfo requestBodySubInfo; - private volatile DataSubscriberInfo pathParamsSubInfo; - private volatile DataSubscriberInfo respDataSubInfo; - private volatile DataSubscriberInfo grpcServerMethodSubInfo; - private volatile DataSubscriberInfo grpcServerRequestMsgSubInfo; - private volatile DataSubscriberInfo graphqlServerRequestMsgSubInfo; - private volatile DataSubscriberInfo requestEndSubInfo; - private volatile DataSubscriberInfo dbSqlQuerySubInfo; + private final SubscribersCache subscribersCache; public GatewayBridge( SubscriptionService subscriptionService, @@ -96,6 +54,7 @@ public GatewayBridge( this.producerService = producerService; this.requestSampler = requestSampler; this.traceSegmentPostProcessors = traceSegmentPostProcessors; + this.subscribersCache = new SubscribersCache(); } public void init() { @@ -104,728 +63,77 @@ public void init() { IGAppSecEventDependencies.additionalIGEventTypes( producerService.allSubscribedDataAddresses()); - subscriptionService.registerCallback( - events.requestStarted(), - () -> { - if (!AppSecSystem.isActive()) { - return RequestContextSupplier.EMPTY; - } - return new RequestContextSupplier(); - }); + subscriptionService.registerCallback(events.requestStarted(), new RequestStartedCallback()); subscriptionService.registerCallback( events.requestEnded(), - (RequestContext ctx_, IGSpanInfo spanInfo) -> { - AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); - if (ctx == null) { - return NoopFlow.INSTANCE; - } - - maybeExtractSchemas(ctx); - - // WAF call - ctx.closeAdditive(); - - TraceSegment traceSeg = ctx_.getTraceSegment(); - - // AppSec report metric and events for web span only - if (traceSeg != null) { - traceSeg.setTagTop("_dd.appsec.enabled", 1); - traceSeg.setTagTop("_dd.runtime_family", "jvm"); - - Collection collectedEvents = ctx.transferCollectedEvents(); - - for (TraceSegmentPostProcessor pp : this.traceSegmentPostProcessors) { - pp.processTraceSegment(traceSeg, ctx, collectedEvents); - } - - // If detected any events - mark span at appsec.event - if (!collectedEvents.isEmpty()) { - // Set asm keep in case that root span was not available when events are detected - traceSeg.setTagTop(Tags.ASM_KEEP, true); - traceSeg.setTagTop(Tags.PROPAGATED_APPSEC, true); - traceSeg.setTagTop("appsec.event", true); - traceSeg.setTagTop("network.client.ip", ctx.getPeerAddress()); - - // Reflect client_ip as actor.ip for backward compatibility - Object clientIp = spanInfo.getTags().get(Tags.HTTP_CLIENT_IP); - if (clientIp != null) { - traceSeg.setTagTop("actor.ip", clientIp); - } - - // Report AppSec events via "_dd.appsec.json" tag - AppSecEventWrapper wrapper = new AppSecEventWrapper(collectedEvents); - traceSeg.setDataTop("appsec", wrapper); - - // Report collected request and response headers based on allow list - writeRequestHeaders(traceSeg, REQUEST_HEADERS_ALLOW_LIST, ctx.getRequestHeaders()); - writeResponseHeaders(traceSeg, RESPONSE_HEADERS_ALLOW_LIST, ctx.getResponseHeaders()); - - // Report collected stack traces - StackTraceCollection stackTraceCollection = ctx.transferStackTracesCollection(); - if (stackTraceCollection != null) { - Object flatStruct = ObjectFlattener.flatten(stackTraceCollection); - if (flatStruct != null) { - traceSeg.setMetaStructTop("_dd.stack", flatStruct); - } - } - } else if (hasUserTrackingEvent(traceSeg)) { - // Report all collected request headers on user tracking event - writeRequestHeaders(traceSeg, REQUEST_HEADERS_ALLOW_LIST, ctx.getRequestHeaders()); - } else { - // Report minimum set of collected request headers - writeRequestHeaders( - traceSeg, DEFAULT_REQUEST_HEADERS_ALLOW_LIST, ctx.getRequestHeaders()); - } - // If extracted any Api Schemas - commit them - if (!ctx.commitApiSchemas(traceSeg)) { - log.debug("Unable to commit, api security schemas and will be skipped"); - } - - if (ctx.isBlocked()) { - WafMetricCollector.get().wafRequestBlocked(); - } else if (!collectedEvents.isEmpty()) { - WafMetricCollector.get().wafRequestTriggered(); - } else { - WafMetricCollector.get().wafRequest(); - } - } - - ctx.close(); - return NoopFlow.INSTANCE; - }); + new RequestEndedCallBack( + requestSampler, subscribersCache, producerService, traceSegmentPostProcessors)); subscriptionService.registerCallback(EVENTS.requestHeader(), new NewRequestHeaderCallback()); subscriptionService.registerCallback( - EVENTS.requestHeaderDone(), new RequestHeadersDoneCallback()); - - subscriptionService.registerCallback( - EVENTS.requestMethodUriRaw(), new MethodAndRawURICallback()); + EVENTS.requestHeaderDone(), + new RequestHeadersDoneCallback(subscribersCache, producerService)); subscriptionService.registerCallback( - EVENTS.requestBodyStart(), - (RequestContext ctx_, StoredBodySupplier supplier) -> { - AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); - if (ctx == null) { - return null; - } + EVENTS.requestMethodUriRaw(), + new MethodAndRawURICallback(subscribersCache, producerService)); - ctx.setStoredRequestBodySupplier(supplier); - return null; - }); + subscriptionService.registerCallback(EVENTS.requestBodyStart(), new RequestBodyStartCallback()); if (additionalIGEvents.contains(EVENTS.requestPathParams())) { subscriptionService.registerCallback( EVENTS.requestPathParams(), - (ctx_, data) -> { - AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); - if (ctx == null || ctx.isPathParamsPublished()) { - return NoopFlow.INSTANCE; - } - ctx.setPathParamsPublished(true); - - while (true) { - DataSubscriberInfo subInfo = pathParamsSubInfo; - if (subInfo == null) { - subInfo = producerService.getDataSubscribers(KnownAddresses.REQUEST_PATH_PARAMS); - pathParamsSubInfo = subInfo; - } - if (subInfo == null || subInfo.isEmpty()) { - return NoopFlow.INSTANCE; - } - DataBundle bundle = - new SingletonDataBundle<>(KnownAddresses.REQUEST_PATH_PARAMS, data); - try { - GatewayContext gwCtx = new GatewayContext(false); - return producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx); - } catch (ExpiredSubscriberInfoException e) { - pathParamsSubInfo = null; - } - } - }); + new RequestPathParamsCallback(subscribersCache, producerService)); } subscriptionService.registerCallback( - EVENTS.requestBodyDone(), - (RequestContext ctx_, StoredBodySupplier supplier) -> { - AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); - if (ctx == null || ctx.isRawReqBodyPublished()) { - return NoopFlow.INSTANCE; - } - ctx.setRawReqBodyPublished(true); - - while (true) { - DataSubscriberInfo subInfo = rawRequestBodySubInfo; - if (subInfo == null) { - subInfo = producerService.getDataSubscribers(KnownAddresses.REQUEST_BODY_RAW); - rawRequestBodySubInfo = subInfo; - } - if (subInfo == null || subInfo.isEmpty()) { - return NoopFlow.INSTANCE; - } - - CharSequence bodyContent = supplier.get(); - if (bodyContent == null || bodyContent.length() == 0) { - return NoopFlow.INSTANCE; - } - DataBundle bundle = - new SingletonDataBundle<>(KnownAddresses.REQUEST_BODY_RAW, bodyContent); - try { - GatewayContext gwCtx = new GatewayContext(false); - return producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx); - } catch (ExpiredSubscriberInfoException e) { - rawRequestBodySubInfo = null; - } - } - }); + EVENTS.requestBodyDone(), new RequestBodyDoneCallback(subscribersCache, producerService)); if (additionalIGEvents.contains(EVENTS.requestBodyProcessed())) { subscriptionService.registerCallback( EVENTS.requestBodyProcessed(), - (RequestContext ctx_, Object obj) -> { - AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); - if (ctx == null) { - return NoopFlow.INSTANCE; - } - - if (ctx.isConvertedReqBodyPublished()) { - log.debug( - "Request body already published; will ignore new value of type {}", - obj.getClass()); - return NoopFlow.INSTANCE; - } - ctx.setConvertedReqBodyPublished(true); - - while (true) { - DataSubscriberInfo subInfo = requestBodySubInfo; - if (subInfo == null) { - subInfo = producerService.getDataSubscribers(KnownAddresses.REQUEST_BODY_OBJECT); - requestBodySubInfo = subInfo; - } - if (subInfo == null || subInfo.isEmpty()) { - return NoopFlow.INSTANCE; - } - DataBundle bundle = - new SingletonDataBundle<>( - KnownAddresses.REQUEST_BODY_OBJECT, ObjectIntrospection.convert(obj)); - try { - GatewayContext gwCtx = new GatewayContext(false); - return producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx); - } catch (ExpiredSubscriberInfoException e) { - requestBodySubInfo = null; - } - } - }); + new RequestBodyProcessedCallback(subscribersCache, producerService)); } subscriptionService.registerCallback( EVENTS.requestClientSocketAddress(), - (ctx_, ip, port) -> { - AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); - if (ctx == null || ctx.isReqDataPublished()) { - return NoopFlow.INSTANCE; - } - ctx.setPeerAddress(ip); - ctx.setPeerPort(port); - return maybePublishRequestData(ctx); - }); + new RequestClientSocketAddressCallback(subscribersCache, producerService)); subscriptionService.registerCallback( - EVENTS.requestInferredClientAddress(), - (ctx_, ip) -> { - AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); - if (ctx != null) { - ctx.setInferredClientIp(ip); - } - return NoopFlow.INSTANCE; // expected to be called before requestClientSocketAddress - }); + EVENTS.requestInferredClientAddress(), new RequestInferredClientAddressCallback()); subscriptionService.registerCallback( - EVENTS.responseStarted(), - (ctx_, status) -> { - AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); - if (ctx == null || ctx.isRespDataPublished()) { - return NoopFlow.INSTANCE; - } - ctx.setResponseStatus(status); - return maybePublishResponseData(ctx); - }); + EVENTS.responseStarted(), new ResponseStartedCallback(subscribersCache, producerService)); + + subscriptionService.registerCallback(EVENTS.responseHeader(), new ResponseHeaderCallback()); - subscriptionService.registerCallback( - EVENTS.responseHeader(), - (ctx_, name, value) -> { - AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); - if (ctx != null) { - ctx.addResponseHeader(name, value); - } - }); subscriptionService.registerCallback( EVENTS.responseHeaderDone(), - ctx_ -> { - AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); - if (ctx == null || ctx.isRespDataPublished()) { - return NoopFlow.INSTANCE; - } - ctx.finishResponseHeaders(); - return maybePublishResponseData(ctx); - }); + new ResponseHeaderDoneCallback(subscribersCache, producerService)); subscriptionService.registerCallback( - EVENTS.grpcServerMethod(), - (ctx_, method) -> { - AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); - if (ctx == null || method == null || method.isEmpty()) { - return NoopFlow.INSTANCE; - } - while (true) { - DataSubscriberInfo subInfo = grpcServerMethodSubInfo; - if (subInfo == null) { - subInfo = producerService.getDataSubscribers(KnownAddresses.GRPC_SERVER_METHOD); - grpcServerMethodSubInfo = subInfo; - } - if (subInfo == null || subInfo.isEmpty()) { - return NoopFlow.INSTANCE; - } - DataBundle bundle = - new SingletonDataBundle<>(KnownAddresses.GRPC_SERVER_METHOD, method); - try { - GatewayContext gwCtx = new GatewayContext(true); - return producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx); - } catch (ExpiredSubscriberInfoException e) { - grpcServerMethodSubInfo = null; - } - } - }); + EVENTS.grpcServerMethod(), new GrpcServerMethodCallback(subscribersCache, producerService)); subscriptionService.registerCallback( EVENTS.grpcServerRequestMessage(), - (ctx_, obj) -> { - AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); - if (ctx == null) { - return NoopFlow.INSTANCE; - } - while (true) { - DataSubscriberInfo subInfo = grpcServerRequestMsgSubInfo; - if (subInfo == null) { - subInfo = - producerService.getDataSubscribers(KnownAddresses.GRPC_SERVER_REQUEST_MESSAGE); - grpcServerRequestMsgSubInfo = subInfo; - } - if (subInfo == null || subInfo.isEmpty()) { - return NoopFlow.INSTANCE; - } - Object convObj = ObjectIntrospection.convert(obj); - DataBundle bundle = - new SingletonDataBundle<>(KnownAddresses.GRPC_SERVER_REQUEST_MESSAGE, convObj); - try { - GatewayContext gwCtx = new GatewayContext(true); - return producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx); - } catch (ExpiredSubscriberInfoException e) { - grpcServerRequestMsgSubInfo = null; - } - } - }); + new GrpcServerRequestMessageCallback(subscribersCache, producerService)); subscriptionService.registerCallback( EVENTS.graphqlServerRequestMessage(), - (RequestContext ctx_, Map data) -> { - AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); - if (ctx == null) { - return NoopFlow.INSTANCE; - } - while (true) { - DataSubscriberInfo subInfo = graphqlServerRequestMsgSubInfo; - if (subInfo == null) { - subInfo = - producerService.getDataSubscribers(KnownAddresses.GRAPHQL_SERVER_ALL_RESOLVERS); - graphqlServerRequestMsgSubInfo = subInfo; - } - if (subInfo == null || subInfo.isEmpty()) { - return NoopFlow.INSTANCE; - } - DataBundle bundle = - new SingletonDataBundle<>(KnownAddresses.GRAPHQL_SERVER_ALL_RESOLVERS, data); - try { - GatewayContext gwCtx = new GatewayContext(true); - return producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx); - } catch (ExpiredSubscriberInfoException e) { - graphqlServerRequestMsgSubInfo = null; - } - } - }); + new GraphqlServerRequestMessageCallback(subscribersCache, producerService)); subscriptionService.registerCallback( - EVENTS.databaseConnection(), - (ctx_, dbType) -> { - AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); - if (ctx == null) { - return; - } - ctx.setDbType(dbType); - }); + EVENTS.databaseConnection(), new DatabaseConnectionCallback()); subscriptionService.registerCallback( - EVENTS.databaseSqlQuery(), - (ctx_, sql) -> { - AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); - if (ctx == null) { - return NoopFlow.INSTANCE; - } - while (true) { - DataSubscriberInfo subInfo = dbSqlQuerySubInfo; - if (subInfo == null) { - subInfo = - producerService.getDataSubscribers( - KnownAddresses.DB_TYPE, KnownAddresses.DB_SQL_QUERY); - dbSqlQuerySubInfo = subInfo; - } - if (subInfo == null || subInfo.isEmpty()) { - return NoopFlow.INSTANCE; - } - DataBundle bundle = - new MapDataBundle.Builder(CAPACITY_0_2) - .add(KnownAddresses.DB_TYPE, ctx.getDbType()) - .add(KnownAddresses.DB_SQL_QUERY, sql) - .build(); - try { - GatewayContext gwCtx = new GatewayContext(false, RuleType.SQL_INJECTION); - return producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx); - } catch (ExpiredSubscriberInfoException e) { - dbSqlQuerySubInfo = null; - } - } - }); + EVENTS.databaseSqlQuery(), new DatabaseSqlQueryCallback(subscribersCache, producerService)); } public void stop() { subscriptionService.reset(); } - private static boolean hasUserTrackingEvent(final TraceSegment traceSeg) { - for (String tagName : USER_TRACKING_TAGS) { - final Object value = traceSeg.getTagTop(tagName); - if (value != null && "true".equalsIgnoreCase(value.toString())) { - return true; - } - } - return false; - } - - private static void writeRequestHeaders( - final TraceSegment traceSeg, - final Set allowed, - final Map> headers) { - writeHeaders(traceSeg, "http.request.headers.", allowed, headers); - } - - private static void writeResponseHeaders( - final TraceSegment traceSeg, - final Set allowed, - final Map> headers) { - writeHeaders(traceSeg, "http.response.headers.", allowed, headers); - } - - private static void writeHeaders( - final TraceSegment traceSeg, - final String prefix, - final Set allowed, - final Map> headers) { - if (headers != null) { - headers.forEach( - (name, value) -> { - if (allowed.contains(name)) { - String v = String.join(",", value); - if (!v.isEmpty()) { - traceSeg.setTagTop(prefix + name, v); - } - } - }); - } - } - - private static class RequestContextSupplier implements Flow { - private static final Flow EMPTY = new RequestContextSupplier(null); - - private final AppSecRequestContext appSecRequestContext; - - public RequestContextSupplier() { - this(new AppSecRequestContext()); - } - - public RequestContextSupplier(AppSecRequestContext ctx) { - appSecRequestContext = ctx; - } - - @Override - public Action getAction() { - return Action.Noop.INSTANCE; - } - - @Override - public AppSecRequestContext getResult() { - return appSecRequestContext; - } - } - - private static class NewRequestHeaderCallback - implements TriConsumer { - @Override - public void accept(RequestContext ctx_, String name, String value) { - AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); - if (ctx == null) { - return; - } - - if (name.equalsIgnoreCase("cookie")) { - Map> cookies = CookieCutter.parseCookieHeader(value); - ctx.addCookies(cookies); - } else { - ctx.addRequestHeader(name, value); - } - } - } - - private class RequestHeadersDoneCallback implements Function> { - public Flow apply(RequestContext ctx_) { - AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); - if (ctx == null || ctx.isReqDataPublished()) { - return NoopFlow.INSTANCE; - } - ctx.finishRequestHeaders(); - return maybePublishRequestData(ctx); - } - } - - private class MethodAndRawURICallback - implements TriFunction> { - @Override - public Flow apply(RequestContext ctx_, String method, URIDataAdapter uri) { - AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); - if (ctx == null) { - return NoopFlow.INSTANCE; - } - - if (ctx.isReqDataPublished()) { - log.debug( - "Request method and URI already published; will ignore new values {}, {}", method, uri); - return NoopFlow.INSTANCE; - } - ctx.setMethod(method); - ctx.setScheme(uri.scheme()); - if (uri.supportsRaw()) { - ctx.setRawURI(uri.raw()); - } else { - try { - URI encodedUri = new URI(null, null, uri.path(), uri.query(), null); - String q = encodedUri.getRawQuery(); - StringBuilder encoded = new StringBuilder(); - encoded.append(encodedUri.getRawPath()); - if (null != q && !q.isEmpty()) { - encoded.append('?').append(q); - } - ctx.setRawURI(encoded.toString()); - } catch (URISyntaxException e) { - log.debug("Failed to encode URI '{}{}'", uri.path(), uri.query()); - } - } - return maybePublishRequestData(ctx); - } - } - - private Flow maybePublishRequestData(AppSecRequestContext ctx) { - String savedRawURI = ctx.getSavedRawURI(); - - if (savedRawURI == null || !ctx.isFinishedRequestHeaders() || ctx.getPeerAddress() == null) { - return NoopFlow.INSTANCE; - } - - Map> queryParams = EMPTY_QUERY_PARAMS; - int i = savedRawURI.indexOf("?"); - if (i != -1) { - String qs = savedRawURI.substring(i + 1); - // ideally we'd have the query string as parsed by the server - // or at the very least the encoding used by the server - queryParams = parseQueryStringParams(qs, StandardCharsets.UTF_8); - } - - String scheme = ctx.getScheme(); - if (scheme == null) { - scheme = "http"; - } - - ctx.setReqDataPublished(true); - - MapDataBundle bundle = - new MapDataBundle.Builder(CAPACITY_6_10) - .add(KnownAddresses.HEADERS_NO_COOKIES, ctx.getRequestHeaders()) - .add(KnownAddresses.REQUEST_COOKIES, ctx.getCookies()) - .add(KnownAddresses.REQUEST_SCHEME, scheme) - .add(KnownAddresses.REQUEST_METHOD, ctx.getMethod()) - .add(KnownAddresses.REQUEST_URI_RAW, savedRawURI) - .add(KnownAddresses.REQUEST_QUERY, queryParams) - .add(KnownAddresses.REQUEST_CLIENT_IP, ctx.getPeerAddress()) - .add(KnownAddresses.REQUEST_CLIENT_PORT, ctx.getPeerPort()) - .add(KnownAddresses.REQUEST_INFERRED_CLIENT_IP, ctx.getInferredClientIp()) - .build(); - - while (true) { - DataSubscriberInfo subInfo = this.initialReqDataSubInfo; - if (subInfo == null) { - subInfo = - producerService.getDataSubscribers( - KnownAddresses.HEADERS_NO_COOKIES, - KnownAddresses.REQUEST_COOKIES, - KnownAddresses.REQUEST_SCHEME, - KnownAddresses.REQUEST_METHOD, - KnownAddresses.REQUEST_URI_RAW, - KnownAddresses.REQUEST_QUERY, - KnownAddresses.REQUEST_CLIENT_IP, - KnownAddresses.REQUEST_CLIENT_PORT, - KnownAddresses.REQUEST_INFERRED_CLIENT_IP); - initialReqDataSubInfo = subInfo; - } - - try { - GatewayContext gwCtx = new GatewayContext(false); - return producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx); - } catch (ExpiredSubscriberInfoException e) { - this.initialReqDataSubInfo = null; - } - } - } - - private Flow maybePublishResponseData(AppSecRequestContext ctx) { - - int status = ctx.getResponseStatus(); - - if (status == 0 || !ctx.isFinishedResponseHeaders()) { - return NoopFlow.INSTANCE; - } - - ctx.setRespDataPublished(true); - - MapDataBundle bundle = - MapDataBundle.of( - KnownAddresses.RESPONSE_STATUS, String.valueOf(ctx.getResponseStatus()), - KnownAddresses.RESPONSE_HEADERS_NO_COOKIES, ctx.getResponseHeaders()); - - while (true) { - DataSubscriberInfo subInfo = respDataSubInfo; - if (subInfo == null) { - subInfo = - producerService.getDataSubscribers( - KnownAddresses.RESPONSE_STATUS, KnownAddresses.RESPONSE_HEADERS_NO_COOKIES); - respDataSubInfo = subInfo; - } - - try { - GatewayContext gwCtx = new GatewayContext(false); - return producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx); - } catch (ExpiredSubscriberInfoException e) { - respDataSubInfo = null; - } - } - } - - private void maybeExtractSchemas(AppSecRequestContext ctx) { - boolean extractSchema = false; - if (Config.get().isApiSecurityEnabled() && requestSampler != null) { - extractSchema = requestSampler.sampleRequest(); - } - - if (!extractSchema) { - return; - } - - while (true) { - DataSubscriberInfo subInfo = requestEndSubInfo; - if (subInfo == null) { - subInfo = producerService.getDataSubscribers(KnownAddresses.WAF_CONTEXT_PROCESSOR); - requestEndSubInfo = subInfo; - } - if (subInfo == null || subInfo.isEmpty()) { - return; - } - - DataBundle bundle = - new SingletonDataBundle<>( - KnownAddresses.WAF_CONTEXT_PROCESSOR, - Collections.singletonMap("extract-schema", true)); - try { - GatewayContext gwCtx = new GatewayContext(false); - producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx); - return; - } catch (ExpiredSubscriberInfoException e) { - requestEndSubInfo = null; - } - } - } - - private static Map> parseQueryStringParams( - String queryString, Charset uriEncoding) { - if (queryString == null) { - return Collections.emptyMap(); - } - - Map> result = new HashMap<>(); - - String[] keyValues = QUERY_PARAM_SPLITTER.split(queryString); - - for (String keyValue : keyValues) { - String[] kv = QUERY_PARAM_VALUE_SPLITTER.split(keyValue, 2); - String value = kv.length > 1 ? urlDecode(kv[1], uriEncoding, true) : ""; - String key = urlDecode(kv[0], uriEncoding, true); - List strings = result.computeIfAbsent(key, k -> new ArrayList<>(1)); - strings.add(value); - } - - return result; - } - - private static String urlDecode(String str, Charset charset, boolean queryString) { - return decodeString(str, charset, queryString, Integer.MAX_VALUE); - } - - private static String decodeString(String str, Charset charset, boolean queryString, int limit) { - byte[] bytes = str.getBytes(charset); - int j = 0; - for (int i = 0; i < bytes.length && j < limit; i++, j++) { - int b = bytes[i]; - if (b == 0x25 /* % */) { - if (i + 2 < bytes.length) { - int val = byteToDigit(bytes[i + 2]); - if (val >= 0) { - val += 16 * byteToDigit(bytes[i + 1]); - if (val >= 0) { - i += 2; - bytes[j] = (byte) val; - continue; - } - } - } - } else if (b == 0x2b /* + */ && queryString) { - bytes[j] = ' '; - continue; - } - bytes[j] = (byte) b; - } - - return new String(bytes, 0, j, charset); - } - - private static int byteToDigit(byte b) { - if (b >= 0x30 /* 0 */ && b <= 0x39 /* 9 */) { - return b - 0x30; - } - if (b >= 0x41 /* A */ && b <= 0x46 /* F */) { - return 10 + (b - 0x41); - } - if (b >= 0x61 /* a */ && b <= 0x66 /* f */) { - return 10 + (b - 0x61); - } - return -1; - } - private static class IGAppSecEventDependencies { private static final Map, Collection>> diff --git a/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/SubscribersCache.java b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/SubscribersCache.java new file mode 100644 index 00000000000..2e8c3109059 --- /dev/null +++ b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/SubscribersCache.java @@ -0,0 +1,102 @@ +package com.datadog.appsec.gateway; + +import com.datadog.appsec.event.EventProducerService; + +public class SubscribersCache { + + private volatile EventProducerService.DataSubscriberInfo initialReqDataSubInfo; + private volatile EventProducerService.DataSubscriberInfo rawRequestBodySubInfo; + private volatile EventProducerService.DataSubscriberInfo requestBodySubInfo; + private volatile EventProducerService.DataSubscriberInfo pathParamsSubInfo; + private volatile EventProducerService.DataSubscriberInfo respDataSubInfo; + private volatile EventProducerService.DataSubscriberInfo grpcServerMethodSubInfo; + private volatile EventProducerService.DataSubscriberInfo grpcServerRequestMsgSubInfo; + private volatile EventProducerService.DataSubscriberInfo graphqlServerRequestMsgSubInfo; + private volatile EventProducerService.DataSubscriberInfo requestEndSubInfo; + private volatile EventProducerService.DataSubscriberInfo dbSqlQuerySubInfo; + + public EventProducerService.DataSubscriberInfo getInitialReqDataSubInfo() { + return initialReqDataSubInfo; + } + + public void setInitialReqDataSubInfo( + EventProducerService.DataSubscriberInfo initialReqDataSubInfo) { + this.initialReqDataSubInfo = initialReqDataSubInfo; + } + + public EventProducerService.DataSubscriberInfo getRawRequestBodySubInfo() { + return rawRequestBodySubInfo; + } + + public void setRawRequestBodySubInfo( + EventProducerService.DataSubscriberInfo rawRequestBodySubInfo) { + this.rawRequestBodySubInfo = rawRequestBodySubInfo; + } + + public EventProducerService.DataSubscriberInfo getRequestBodySubInfo() { + return requestBodySubInfo; + } + + public void setRequestBodySubInfo(EventProducerService.DataSubscriberInfo requestBodySubInfo) { + this.requestBodySubInfo = requestBodySubInfo; + } + + public EventProducerService.DataSubscriberInfo getPathParamsSubInfo() { + return pathParamsSubInfo; + } + + public void setPathParamsSubInfo(EventProducerService.DataSubscriberInfo pathParamsSubInfo) { + this.pathParamsSubInfo = pathParamsSubInfo; + } + + public EventProducerService.DataSubscriberInfo getRespDataSubInfo() { + return respDataSubInfo; + } + + public void setRespDataSubInfo(EventProducerService.DataSubscriberInfo respDataSubInfo) { + this.respDataSubInfo = respDataSubInfo; + } + + public EventProducerService.DataSubscriberInfo getGrpcServerMethodSubInfo() { + return grpcServerMethodSubInfo; + } + + public void setGrpcServerMethodSubInfo( + EventProducerService.DataSubscriberInfo grpcServerMethodSubInfo) { + this.grpcServerMethodSubInfo = grpcServerMethodSubInfo; + } + + public EventProducerService.DataSubscriberInfo getGrpcServerRequestMsgSubInfo() { + return grpcServerRequestMsgSubInfo; + } + + public void setGrpcServerRequestMsgSubInfo( + EventProducerService.DataSubscriberInfo grpcServerRequestMsgSubInfo) { + this.grpcServerRequestMsgSubInfo = grpcServerRequestMsgSubInfo; + } + + public EventProducerService.DataSubscriberInfo getGraphqlServerRequestMsgSubInfo() { + return graphqlServerRequestMsgSubInfo; + } + + public void setGraphqlServerRequestMsgSubInfo( + EventProducerService.DataSubscriberInfo graphqlServerRequestMsgSubInfo) { + this.graphqlServerRequestMsgSubInfo = graphqlServerRequestMsgSubInfo; + } + + public EventProducerService.DataSubscriberInfo getRequestEndSubInfo() { + return requestEndSubInfo; + } + + public void setRequestEndSubInfo(EventProducerService.DataSubscriberInfo requestEndSubInfo) { + this.requestEndSubInfo = requestEndSubInfo; + } + + public EventProducerService.DataSubscriberInfo getDbSqlQuerySubInfo() { + return dbSqlQuerySubInfo; + } + + public void setDbSqlQuerySubInfo(EventProducerService.DataSubscriberInfo dbSqlQuerySubInfo) { + this.dbSqlQuerySubInfo = dbSqlQuerySubInfo; + } +} diff --git a/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/CallbackUtils.java b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/CallbackUtils.java new file mode 100644 index 00000000000..9b209a18e47 --- /dev/null +++ b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/CallbackUtils.java @@ -0,0 +1,242 @@ +package com.datadog.appsec.gateway.callbacks; + +import static com.datadog.appsec.event.data.MapDataBundle.Builder.CAPACITY_6_10; + +import com.datadog.appsec.api.security.ApiSecurityRequestSampler; +import com.datadog.appsec.event.EventProducerService; +import com.datadog.appsec.event.ExpiredSubscriberInfoException; +import com.datadog.appsec.event.data.DataBundle; +import com.datadog.appsec.event.data.KnownAddresses; +import com.datadog.appsec.event.data.MapDataBundle; +import com.datadog.appsec.event.data.SingletonDataBundle; +import com.datadog.appsec.gateway.AppSecRequestContext; +import com.datadog.appsec.gateway.GatewayContext; +import com.datadog.appsec.gateway.NoopFlow; +import com.datadog.appsec.gateway.SubscribersCache; +import datadog.trace.api.Config; +import datadog.trace.api.gateway.Flow; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; + +public class CallbackUtils { + + public static final CallbackUtils INSTANCE = new CallbackUtils(); + + private static final Pattern QUERY_PARAM_VALUE_SPLITTER = Pattern.compile("="); + private static final Pattern QUERY_PARAM_SPLITTER = Pattern.compile("&"); + private static final Map> EMPTY_QUERY_PARAMS = Collections.emptyMap(); + + private CallbackUtils() { + // utility class + } + + public Flow maybePublishRequestData( + final AppSecRequestContext ctx, + final SubscribersCache subscribersCache, + final EventProducerService producerService) { + String savedRawURI = ctx.getSavedRawURI(); + + if (savedRawURI == null || !ctx.isFinishedRequestHeaders() || ctx.getPeerAddress() == null) { + return NoopFlow.INSTANCE; + } + + Map> queryParams = EMPTY_QUERY_PARAMS; + int i = savedRawURI.indexOf("?"); + if (i != -1) { + String qs = savedRawURI.substring(i + 1); + // ideally we'd have the query string as parsed by the server + // or at the very least the encoding used by the server + queryParams = parseQueryStringParams(qs, StandardCharsets.UTF_8); + } + + String scheme = ctx.getScheme(); + if (scheme == null) { + scheme = "http"; + } + + ctx.setReqDataPublished(true); + + MapDataBundle bundle = + new MapDataBundle.Builder(CAPACITY_6_10) + .add(KnownAddresses.HEADERS_NO_COOKIES, ctx.getRequestHeaders()) + .add(KnownAddresses.REQUEST_COOKIES, ctx.getCookies()) + .add(KnownAddresses.REQUEST_SCHEME, scheme) + .add(KnownAddresses.REQUEST_METHOD, ctx.getMethod()) + .add(KnownAddresses.REQUEST_URI_RAW, savedRawURI) + .add(KnownAddresses.REQUEST_QUERY, queryParams) + .add(KnownAddresses.REQUEST_CLIENT_IP, ctx.getPeerAddress()) + .add(KnownAddresses.REQUEST_CLIENT_PORT, ctx.getPeerPort()) + .add(KnownAddresses.REQUEST_INFERRED_CLIENT_IP, ctx.getInferredClientIp()) + .build(); + + while (true) { + EventProducerService.DataSubscriberInfo subInfo = subscribersCache.getInitialReqDataSubInfo(); + if (subInfo == null) { + subInfo = + producerService.getDataSubscribers( + KnownAddresses.HEADERS_NO_COOKIES, + KnownAddresses.REQUEST_COOKIES, + KnownAddresses.REQUEST_SCHEME, + KnownAddresses.REQUEST_METHOD, + KnownAddresses.REQUEST_URI_RAW, + KnownAddresses.REQUEST_QUERY, + KnownAddresses.REQUEST_CLIENT_IP, + KnownAddresses.REQUEST_CLIENT_PORT, + KnownAddresses.REQUEST_INFERRED_CLIENT_IP); + subscribersCache.setInitialReqDataSubInfo(subInfo); + } + + try { + GatewayContext gwCtx = new GatewayContext(false); + return producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx); + } catch (ExpiredSubscriberInfoException e) { + subscribersCache.setInitialReqDataSubInfo(null); + } + } + } + + public Flow maybePublishResponseData( + AppSecRequestContext ctx, + SubscribersCache subscribersCache, + EventProducerService producerService) { + + int status = ctx.getResponseStatus(); + + if (status == 0 || !ctx.isFinishedResponseHeaders()) { + return NoopFlow.INSTANCE; + } + + ctx.setRespDataPublished(true); + + MapDataBundle bundle = + MapDataBundle.of( + KnownAddresses.RESPONSE_STATUS, String.valueOf(ctx.getResponseStatus()), + KnownAddresses.RESPONSE_HEADERS_NO_COOKIES, ctx.getResponseHeaders()); + + while (true) { + EventProducerService.DataSubscriberInfo subInfo = subscribersCache.getRespDataSubInfo(); + if (subInfo == null) { + subInfo = + producerService.getDataSubscribers( + KnownAddresses.RESPONSE_STATUS, KnownAddresses.RESPONSE_HEADERS_NO_COOKIES); + subscribersCache.setRespDataSubInfo(subInfo); + } + + try { + GatewayContext gwCtx = new GatewayContext(false); + return producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx); + } catch (ExpiredSubscriberInfoException e) { + subscribersCache.setRespDataSubInfo(null); + } + } + } + + public void maybeExtractSchemas( + AppSecRequestContext ctx, + ApiSecurityRequestSampler requestSampler, + SubscribersCache subscribersCache, + EventProducerService producerService) { + boolean extractSchema = false; + if (Config.get().isApiSecurityEnabled() && requestSampler != null) { + extractSchema = requestSampler.sampleRequest(); + } + + if (!extractSchema) { + return; + } + + while (true) { + EventProducerService.DataSubscriberInfo subInfo = subscribersCache.getRequestEndSubInfo(); + if (subInfo == null) { + subInfo = producerService.getDataSubscribers(KnownAddresses.WAF_CONTEXT_PROCESSOR); + subscribersCache.setRequestEndSubInfo(subInfo); + } + if (subInfo == null || subInfo.isEmpty()) { + return; + } + + DataBundle bundle = + new SingletonDataBundle<>( + KnownAddresses.WAF_CONTEXT_PROCESSOR, + Collections.singletonMap("extract-schema", true)); + try { + GatewayContext gwCtx = new GatewayContext(false); + producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx); + return; + } catch (ExpiredSubscriberInfoException e) { + subscribersCache.setRequestEndSubInfo(null); + } + } + } + + private Map> parseQueryStringParams( + String queryString, Charset uriEncoding) { + if (queryString == null) { + return Collections.emptyMap(); + } + + Map> result = new HashMap<>(); + + String[] keyValues = QUERY_PARAM_SPLITTER.split(queryString); + + for (String keyValue : keyValues) { + String[] kv = QUERY_PARAM_VALUE_SPLITTER.split(keyValue, 2); + String value = kv.length > 1 ? urlDecode(kv[1], uriEncoding, true) : ""; + String key = urlDecode(kv[0], uriEncoding, true); + List strings = result.computeIfAbsent(key, k -> new ArrayList<>(1)); + strings.add(value); + } + + return result; + } + + private String urlDecode(String str, Charset charset, boolean queryString) { + return decodeString(str, charset, queryString, Integer.MAX_VALUE); + } + + private String decodeString(String str, Charset charset, boolean queryString, int limit) { + byte[] bytes = str.getBytes(charset); + int j = 0; + for (int i = 0; i < bytes.length && j < limit; i++, j++) { + int b = bytes[i]; + if (b == 0x25 /* % */) { + if (i + 2 < bytes.length) { + int val = byteToDigit(bytes[i + 2]); + if (val >= 0) { + val += 16 * byteToDigit(bytes[i + 1]); + if (val >= 0) { + i += 2; + bytes[j] = (byte) val; + continue; + } + } + } + } else if (b == 0x2b /* + */ && queryString) { + bytes[j] = ' '; + continue; + } + bytes[j] = (byte) b; + } + + return new String(bytes, 0, j, charset); + } + + private int byteToDigit(byte b) { + if (b >= 0x30 /* 0 */ && b <= 0x39 /* 9 */) { + return b - 0x30; + } + if (b >= 0x41 /* A */ && b <= 0x46 /* F */) { + return 10 + (b - 0x41); + } + if (b >= 0x61 /* a */ && b <= 0x66 /* f */) { + return 10 + (b - 0x61); + } + return -1; + } +} diff --git a/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/DatabaseConnectionCallback.java b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/DatabaseConnectionCallback.java new file mode 100644 index 00000000000..e7f6ab4683e --- /dev/null +++ b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/DatabaseConnectionCallback.java @@ -0,0 +1,18 @@ +package com.datadog.appsec.gateway.callbacks; + +import com.datadog.appsec.gateway.AppSecRequestContext; +import datadog.trace.api.gateway.RequestContext; +import datadog.trace.api.gateway.RequestContextSlot; +import java.util.function.BiConsumer; + +public class DatabaseConnectionCallback implements BiConsumer { + + @Override + public void accept(RequestContext requestContext, String dbType) { + AppSecRequestContext ctx = requestContext.getData(RequestContextSlot.APPSEC); + if (ctx == null) { + return; + } + ctx.setDbType(dbType); + } +} diff --git a/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/DatabaseSqlQueryCallback.java b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/DatabaseSqlQueryCallback.java new file mode 100644 index 00000000000..81f0b164847 --- /dev/null +++ b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/DatabaseSqlQueryCallback.java @@ -0,0 +1,60 @@ +package com.datadog.appsec.gateway.callbacks; + +import static com.datadog.appsec.event.data.MapDataBundle.Builder.CAPACITY_0_2; + +import com.datadog.appsec.event.EventProducerService; +import com.datadog.appsec.event.ExpiredSubscriberInfoException; +import com.datadog.appsec.event.data.DataBundle; +import com.datadog.appsec.event.data.KnownAddresses; +import com.datadog.appsec.event.data.MapDataBundle; +import com.datadog.appsec.gateway.AppSecRequestContext; +import com.datadog.appsec.gateway.GatewayContext; +import com.datadog.appsec.gateway.NoopFlow; +import com.datadog.appsec.gateway.SubscribersCache; +import datadog.trace.api.gateway.Flow; +import datadog.trace.api.gateway.RequestContext; +import datadog.trace.api.gateway.RequestContextSlot; +import datadog.trace.api.telemetry.RuleType; +import java.util.function.BiFunction; + +public class DatabaseSqlQueryCallback implements BiFunction> { + + private final SubscribersCache subscribersCache; + private final EventProducerService producerService; + + public DatabaseSqlQueryCallback( + SubscribersCache subscribersCache, EventProducerService producerService) { + this.subscribersCache = subscribersCache; + this.producerService = producerService; + } + + @Override + public Flow apply(RequestContext requestContext, String sql) { + AppSecRequestContext ctx = requestContext.getData(RequestContextSlot.APPSEC); + if (ctx == null) { + return NoopFlow.INSTANCE; + } + while (true) { + EventProducerService.DataSubscriberInfo subInfo = subscribersCache.getDbSqlQuerySubInfo(); + if (subInfo == null) { + subInfo = + producerService.getDataSubscribers(KnownAddresses.DB_TYPE, KnownAddresses.DB_SQL_QUERY); + subscribersCache.setDbSqlQuerySubInfo(subInfo); + } + if (subInfo == null || subInfo.isEmpty()) { + return NoopFlow.INSTANCE; + } + DataBundle bundle = + new MapDataBundle.Builder(CAPACITY_0_2) + .add(KnownAddresses.DB_TYPE, ctx.getDbType()) + .add(KnownAddresses.DB_SQL_QUERY, sql) + .build(); + try { + GatewayContext gwCtx = new GatewayContext(false, RuleType.SQL_INJECTION); + return producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx); + } catch (ExpiredSubscriberInfoException e) { + subscribersCache.setDbSqlQuerySubInfo(null); + } + } + } +} diff --git a/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/GraphqlServerRequestMessageCallback.java b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/GraphqlServerRequestMessageCallback.java new file mode 100644 index 00000000000..baf49ef704d --- /dev/null +++ b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/GraphqlServerRequestMessageCallback.java @@ -0,0 +1,56 @@ +package com.datadog.appsec.gateway.callbacks; + +import com.datadog.appsec.event.EventProducerService; +import com.datadog.appsec.event.ExpiredSubscriberInfoException; +import com.datadog.appsec.event.data.DataBundle; +import com.datadog.appsec.event.data.KnownAddresses; +import com.datadog.appsec.event.data.SingletonDataBundle; +import com.datadog.appsec.gateway.AppSecRequestContext; +import com.datadog.appsec.gateway.GatewayContext; +import com.datadog.appsec.gateway.NoopFlow; +import com.datadog.appsec.gateway.SubscribersCache; +import datadog.trace.api.gateway.Flow; +import datadog.trace.api.gateway.RequestContext; +import datadog.trace.api.gateway.RequestContextSlot; +import java.util.Map; +import java.util.function.BiFunction; + +public class GraphqlServerRequestMessageCallback + implements BiFunction, Flow> { + + private final SubscribersCache subscribersCache; + private final EventProducerService producerService; + + public GraphqlServerRequestMessageCallback( + SubscribersCache subscribersCache, EventProducerService producerService) { + this.subscribersCache = subscribersCache; + this.producerService = producerService; + } + + @Override + public Flow apply(RequestContext requestContext, Map data) { + AppSecRequestContext ctx = requestContext.getData(RequestContextSlot.APPSEC); + if (ctx == null) { + return NoopFlow.INSTANCE; + } + while (true) { + EventProducerService.DataSubscriberInfo subInfo = + subscribersCache.getGraphqlServerRequestMsgSubInfo(); + if (subInfo == null) { + subInfo = producerService.getDataSubscribers(KnownAddresses.GRAPHQL_SERVER_ALL_RESOLVERS); + subscribersCache.setGraphqlServerRequestMsgSubInfo(subInfo); + } + if (subInfo == null || subInfo.isEmpty()) { + return NoopFlow.INSTANCE; + } + DataBundle bundle = + new SingletonDataBundle<>(KnownAddresses.GRAPHQL_SERVER_ALL_RESOLVERS, data); + try { + GatewayContext gwCtx = new GatewayContext(true); + return producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx); + } catch (ExpiredSubscriberInfoException e) { + subscribersCache.setGraphqlServerRequestMsgSubInfo(null); + } + } + } +} diff --git a/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/GrpcServerMethodCallback.java b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/GrpcServerMethodCallback.java new file mode 100644 index 00000000000..80990ca68a2 --- /dev/null +++ b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/GrpcServerMethodCallback.java @@ -0,0 +1,53 @@ +package com.datadog.appsec.gateway.callbacks; + +import com.datadog.appsec.event.EventProducerService; +import com.datadog.appsec.event.ExpiredSubscriberInfoException; +import com.datadog.appsec.event.data.DataBundle; +import com.datadog.appsec.event.data.KnownAddresses; +import com.datadog.appsec.event.data.SingletonDataBundle; +import com.datadog.appsec.gateway.AppSecRequestContext; +import com.datadog.appsec.gateway.GatewayContext; +import com.datadog.appsec.gateway.NoopFlow; +import com.datadog.appsec.gateway.SubscribersCache; +import datadog.trace.api.gateway.Flow; +import datadog.trace.api.gateway.RequestContext; +import datadog.trace.api.gateway.RequestContextSlot; +import java.util.function.BiFunction; + +public class GrpcServerMethodCallback implements BiFunction> { + + private final SubscribersCache subscribersCache; + private final EventProducerService producerService; + + public GrpcServerMethodCallback( + SubscribersCache subscribersCache, EventProducerService producerService) { + this.subscribersCache = subscribersCache; + this.producerService = producerService; + } + + @Override + public Flow apply(RequestContext requestContext, String method) { + AppSecRequestContext ctx = requestContext.getData(RequestContextSlot.APPSEC); + if (ctx == null || method == null || method.isEmpty()) { + return NoopFlow.INSTANCE; + } + while (true) { + EventProducerService.DataSubscriberInfo subInfo = + subscribersCache.getGrpcServerMethodSubInfo(); + if (subInfo == null) { + subInfo = producerService.getDataSubscribers(KnownAddresses.GRPC_SERVER_METHOD); + subscribersCache.setGrpcServerMethodSubInfo(subInfo); + } + if (subInfo == null || subInfo.isEmpty()) { + return NoopFlow.INSTANCE; + } + DataBundle bundle = new SingletonDataBundle<>(KnownAddresses.GRPC_SERVER_METHOD, method); + try { + GatewayContext gwCtx = new GatewayContext(true); + return producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx); + } catch (ExpiredSubscriberInfoException e) { + subscribersCache.setGrpcServerMethodSubInfo(null); + } + } + } +} diff --git a/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/GrpcServerRequestMessageCallback.java b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/GrpcServerRequestMessageCallback.java new file mode 100644 index 00000000000..7b3928413d8 --- /dev/null +++ b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/GrpcServerRequestMessageCallback.java @@ -0,0 +1,57 @@ +package com.datadog.appsec.gateway.callbacks; + +import com.datadog.appsec.event.EventProducerService; +import com.datadog.appsec.event.ExpiredSubscriberInfoException; +import com.datadog.appsec.event.data.DataBundle; +import com.datadog.appsec.event.data.KnownAddresses; +import com.datadog.appsec.event.data.ObjectIntrospection; +import com.datadog.appsec.event.data.SingletonDataBundle; +import com.datadog.appsec.gateway.AppSecRequestContext; +import com.datadog.appsec.gateway.GatewayContext; +import com.datadog.appsec.gateway.NoopFlow; +import com.datadog.appsec.gateway.SubscribersCache; +import datadog.trace.api.gateway.Flow; +import datadog.trace.api.gateway.RequestContext; +import datadog.trace.api.gateway.RequestContextSlot; +import java.util.function.BiFunction; + +public class GrpcServerRequestMessageCallback + implements BiFunction> { + + private final SubscribersCache subscribersCache; + private final EventProducerService producerService; + + public GrpcServerRequestMessageCallback( + SubscribersCache subscribersCache, EventProducerService producerService) { + this.subscribersCache = subscribersCache; + this.producerService = producerService; + } + + @Override + public Flow apply(RequestContext requestContext, Object obj) { + AppSecRequestContext ctx = requestContext.getData(RequestContextSlot.APPSEC); + if (ctx == null) { + return NoopFlow.INSTANCE; + } + while (true) { + EventProducerService.DataSubscriberInfo subInfo = + subscribersCache.getGrpcServerRequestMsgSubInfo(); + if (subInfo == null) { + subInfo = producerService.getDataSubscribers(KnownAddresses.GRPC_SERVER_REQUEST_MESSAGE); + subscribersCache.setGrpcServerRequestMsgSubInfo(subInfo); + } + if (subInfo == null || subInfo.isEmpty()) { + return NoopFlow.INSTANCE; + } + Object convObj = ObjectIntrospection.convert(obj); + DataBundle bundle = + new SingletonDataBundle<>(KnownAddresses.GRPC_SERVER_REQUEST_MESSAGE, convObj); + try { + GatewayContext gwCtx = new GatewayContext(true); + return producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx); + } catch (ExpiredSubscriberInfoException e) { + subscribersCache.setGrpcServerRequestMsgSubInfo(null); + } + } + } +} diff --git a/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/MethodAndRawURICallback.java b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/MethodAndRawURICallback.java new file mode 100644 index 00000000000..728bea4a70a --- /dev/null +++ b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/MethodAndRawURICallback.java @@ -0,0 +1,63 @@ +package com.datadog.appsec.gateway.callbacks; + +import com.datadog.appsec.event.EventProducerService; +import com.datadog.appsec.gateway.AppSecRequestContext; +import com.datadog.appsec.gateway.NoopFlow; +import com.datadog.appsec.gateway.SubscribersCache; +import datadog.trace.api.function.TriFunction; +import datadog.trace.api.gateway.Flow; +import datadog.trace.api.gateway.RequestContext; +import datadog.trace.api.gateway.RequestContextSlot; +import datadog.trace.bootstrap.instrumentation.api.URIDataAdapter; +import java.net.URI; +import java.net.URISyntaxException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MethodAndRawURICallback + implements TriFunction> { + + private static final Logger log = LoggerFactory.getLogger(MethodAndRawURICallback.class); + + private final SubscribersCache subscribersCache; + private final EventProducerService producerService; + + public MethodAndRawURICallback( + SubscribersCache subscribersCache, EventProducerService producerService) { + this.subscribersCache = subscribersCache; + this.producerService = producerService; + } + + @Override + public Flow apply(RequestContext ctx_, String method, URIDataAdapter uri) { + AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); + if (ctx == null) { + return NoopFlow.INSTANCE; + } + + if (ctx.isReqDataPublished()) { + log.debug( + "Request method and URI already published; will ignore new values {}, {}", method, uri); + return NoopFlow.INSTANCE; + } + ctx.setMethod(method); + ctx.setScheme(uri.scheme()); + if (uri.supportsRaw()) { + ctx.setRawURI(uri.raw()); + } else { + try { + URI encodedUri = new URI(null, null, uri.path(), uri.query(), null); + String q = encodedUri.getRawQuery(); + StringBuilder encoded = new StringBuilder(); + encoded.append(encodedUri.getRawPath()); + if (null != q && !q.isEmpty()) { + encoded.append('?').append(q); + } + ctx.setRawURI(encoded.toString()); + } catch (URISyntaxException e) { + log.debug("Failed to encode URI '{}{}'", uri.path(), uri.query()); + } + } + return CallbackUtils.INSTANCE.maybePublishRequestData(ctx, subscribersCache, producerService); + } +} diff --git a/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/NewRequestHeaderCallback.java b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/NewRequestHeaderCallback.java new file mode 100644 index 00000000000..c8b31a07f23 --- /dev/null +++ b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/NewRequestHeaderCallback.java @@ -0,0 +1,26 @@ +package com.datadog.appsec.gateway.callbacks; + +import com.datadog.appsec.gateway.AppSecRequestContext; +import com.datadog.appsec.gateway.CookieCutter; +import datadog.trace.api.function.TriConsumer; +import datadog.trace.api.gateway.RequestContext; +import datadog.trace.api.gateway.RequestContextSlot; +import java.util.List; +import java.util.Map; + +public class NewRequestHeaderCallback implements TriConsumer { + + @Override + public void accept(RequestContext ctx_, String name, String value) { + AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); + if (ctx == null) { + return; + } + if (name.equalsIgnoreCase("cookie")) { + Map> cookies = CookieCutter.parseCookieHeader(value); + ctx.addCookies(cookies); + } else { + ctx.addRequestHeader(name, value); + } + } +} diff --git a/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/RequestBodyDoneCallback.java b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/RequestBodyDoneCallback.java new file mode 100644 index 00000000000..27a1f888f47 --- /dev/null +++ b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/RequestBodyDoneCallback.java @@ -0,0 +1,61 @@ +package com.datadog.appsec.gateway.callbacks; + +import com.datadog.appsec.event.EventProducerService; +import com.datadog.appsec.event.ExpiredSubscriberInfoException; +import com.datadog.appsec.event.data.DataBundle; +import com.datadog.appsec.event.data.KnownAddresses; +import com.datadog.appsec.event.data.SingletonDataBundle; +import com.datadog.appsec.gateway.AppSecRequestContext; +import com.datadog.appsec.gateway.GatewayContext; +import com.datadog.appsec.gateway.NoopFlow; +import com.datadog.appsec.gateway.SubscribersCache; +import datadog.trace.api.gateway.Flow; +import datadog.trace.api.gateway.RequestContext; +import datadog.trace.api.gateway.RequestContextSlot; +import datadog.trace.api.http.StoredBodySupplier; +import java.util.function.BiFunction; + +public class RequestBodyDoneCallback + implements BiFunction> { + + private final SubscribersCache subscribersCache; + private final EventProducerService producerService; + + public RequestBodyDoneCallback( + SubscribersCache subscribersCache, EventProducerService producerService) { + this.subscribersCache = subscribersCache; + this.producerService = producerService; + } + + @Override + public Flow apply(RequestContext requestContext, StoredBodySupplier storedBodySupplier) { + AppSecRequestContext ctx = requestContext.getData(RequestContextSlot.APPSEC); + if (ctx == null || ctx.isRawReqBodyPublished()) { + return NoopFlow.INSTANCE; + } + ctx.setRawReqBodyPublished(true); + + while (true) { + EventProducerService.DataSubscriberInfo subInfo = subscribersCache.getRawRequestBodySubInfo(); + if (subInfo == null) { + subInfo = producerService.getDataSubscribers(KnownAddresses.REQUEST_BODY_RAW); + subscribersCache.setRawRequestBodySubInfo(subInfo); + } + if (subInfo == null || subInfo.isEmpty()) { + return NoopFlow.INSTANCE; + } + + CharSequence bodyContent = storedBodySupplier.get(); + if (bodyContent == null || bodyContent.length() == 0) { + return NoopFlow.INSTANCE; + } + DataBundle bundle = new SingletonDataBundle<>(KnownAddresses.REQUEST_BODY_RAW, bodyContent); + try { + GatewayContext gwCtx = new GatewayContext(false); + return producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx); + } catch (ExpiredSubscriberInfoException e) { + subscribersCache.setRawRequestBodySubInfo(null); + } + } + } +} diff --git a/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/RequestBodyProcessedCallback.java b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/RequestBodyProcessedCallback.java new file mode 100644 index 00000000000..0c8a3fe0ac6 --- /dev/null +++ b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/RequestBodyProcessedCallback.java @@ -0,0 +1,67 @@ +package com.datadog.appsec.gateway.callbacks; + +import com.datadog.appsec.event.EventProducerService; +import com.datadog.appsec.event.ExpiredSubscriberInfoException; +import com.datadog.appsec.event.data.DataBundle; +import com.datadog.appsec.event.data.KnownAddresses; +import com.datadog.appsec.event.data.ObjectIntrospection; +import com.datadog.appsec.event.data.SingletonDataBundle; +import com.datadog.appsec.gateway.AppSecRequestContext; +import com.datadog.appsec.gateway.GatewayContext; +import com.datadog.appsec.gateway.NoopFlow; +import com.datadog.appsec.gateway.SubscribersCache; +import datadog.trace.api.gateway.Flow; +import datadog.trace.api.gateway.RequestContext; +import datadog.trace.api.gateway.RequestContextSlot; +import java.util.function.BiFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RequestBodyProcessedCallback + implements BiFunction> { + + private static final Logger log = LoggerFactory.getLogger(RequestBodyProcessedCallback.class); + + private final SubscribersCache subscribersCache; + private final EventProducerService producerService; + + public RequestBodyProcessedCallback( + SubscribersCache subscribersCache, EventProducerService producerService) { + this.subscribersCache = subscribersCache; + this.producerService = producerService; + } + + @Override + public Flow apply(RequestContext requestContext, Object obj) { + AppSecRequestContext ctx = requestContext.getData(RequestContextSlot.APPSEC); + if (ctx == null) { + return NoopFlow.INSTANCE; + } + + if (ctx.isConvertedReqBodyPublished()) { + log.debug("Request body already published; will ignore new value of type {}", obj.getClass()); + return NoopFlow.INSTANCE; + } + ctx.setConvertedReqBodyPublished(true); + + while (true) { + EventProducerService.DataSubscriberInfo subInfo = subscribersCache.getRequestBodySubInfo(); + if (subInfo == null) { + subInfo = producerService.getDataSubscribers(KnownAddresses.REQUEST_BODY_OBJECT); + subscribersCache.setRequestBodySubInfo(subInfo); + } + if (subInfo == null || subInfo.isEmpty()) { + return NoopFlow.INSTANCE; + } + DataBundle bundle = + new SingletonDataBundle<>( + KnownAddresses.REQUEST_BODY_OBJECT, ObjectIntrospection.convert(obj)); + try { + GatewayContext gwCtx = new GatewayContext(false); + return producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx); + } catch (ExpiredSubscriberInfoException e) { + subscribersCache.setRequestBodySubInfo(null); + } + } + } +} diff --git a/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/RequestBodyStartCallback.java b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/RequestBodyStartCallback.java new file mode 100644 index 00000000000..7201311a617 --- /dev/null +++ b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/RequestBodyStartCallback.java @@ -0,0 +1,21 @@ +package com.datadog.appsec.gateway.callbacks; + +import com.datadog.appsec.gateway.AppSecRequestContext; +import datadog.trace.api.gateway.RequestContext; +import datadog.trace.api.gateway.RequestContextSlot; +import datadog.trace.api.http.StoredBodySupplier; +import java.util.function.BiFunction; + +public class RequestBodyStartCallback + implements BiFunction { + + @Override + public Void apply(RequestContext requestContext, StoredBodySupplier storedBodySupplier) { + AppSecRequestContext ctx = requestContext.getData(RequestContextSlot.APPSEC); + if (ctx == null) { + return null; + } + ctx.setStoredRequestBodySupplier(storedBodySupplier); + return null; + } +} diff --git a/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/RequestClientSocketAddressCallback.java b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/RequestClientSocketAddressCallback.java new file mode 100644 index 00000000000..781fbdb477d --- /dev/null +++ b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/RequestClientSocketAddressCallback.java @@ -0,0 +1,34 @@ +package com.datadog.appsec.gateway.callbacks; + +import com.datadog.appsec.event.EventProducerService; +import com.datadog.appsec.gateway.AppSecRequestContext; +import com.datadog.appsec.gateway.NoopFlow; +import com.datadog.appsec.gateway.SubscribersCache; +import datadog.trace.api.function.TriFunction; +import datadog.trace.api.gateway.Flow; +import datadog.trace.api.gateway.RequestContext; +import datadog.trace.api.gateway.RequestContextSlot; + +public class RequestClientSocketAddressCallback + implements TriFunction> { + + private final SubscribersCache subscribersCache; + private final EventProducerService producerService; + + public RequestClientSocketAddressCallback( + SubscribersCache subscribersCache, EventProducerService producerService) { + this.subscribersCache = subscribersCache; + this.producerService = producerService; + } + + @Override + public Flow apply(RequestContext requestContext, String ip, Integer port) { + AppSecRequestContext ctx = requestContext.getData(RequestContextSlot.APPSEC); + if (ctx == null || ctx.isReqDataPublished()) { + return NoopFlow.INSTANCE; + } + ctx.setPeerAddress(ip); + ctx.setPeerPort(port); + return CallbackUtils.INSTANCE.maybePublishRequestData(ctx, subscribersCache, producerService); + } +} diff --git a/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/RequestEndedCallBack.java b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/RequestEndedCallBack.java new file mode 100644 index 00000000000..adafe1650a3 --- /dev/null +++ b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/RequestEndedCallBack.java @@ -0,0 +1,179 @@ +package com.datadog.appsec.gateway.callbacks; + +import static com.datadog.appsec.gateway.AppSecRequestContext.DEFAULT_REQUEST_HEADERS_ALLOW_LIST; +import static com.datadog.appsec.gateway.AppSecRequestContext.REQUEST_HEADERS_ALLOW_LIST; +import static com.datadog.appsec.gateway.AppSecRequestContext.RESPONSE_HEADERS_ALLOW_LIST; + +import com.datadog.appsec.api.security.ApiSecurityRequestSampler; +import com.datadog.appsec.config.TraceSegmentPostProcessor; +import com.datadog.appsec.event.EventProducerService; +import com.datadog.appsec.gateway.AppSecRequestContext; +import com.datadog.appsec.gateway.NoopFlow; +import com.datadog.appsec.gateway.SubscribersCache; +import com.datadog.appsec.report.AppSecEvent; +import com.datadog.appsec.report.AppSecEventWrapper; +import com.datadog.appsec.stack_trace.StackTraceCollection; +import com.datadog.appsec.util.ObjectFlattener; +import datadog.trace.api.gateway.Flow; +import datadog.trace.api.gateway.IGSpanInfo; +import datadog.trace.api.gateway.RequestContext; +import datadog.trace.api.gateway.RequestContextSlot; +import datadog.trace.api.internal.TraceSegment; +import datadog.trace.api.telemetry.WafMetricCollector; +import datadog.trace.bootstrap.instrumentation.api.Tags; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.BiFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RequestEndedCallBack implements BiFunction> { + + /** User tracking tags that will force the collection of request headers */ + private static final String[] USER_TRACKING_TAGS = { + "appsec.events.users.login.success.track", "appsec.events.users.login.failure.track" + }; + + private static final Logger log = LoggerFactory.getLogger(RequestEndedCallBack.class); + + private final ApiSecurityRequestSampler requestSampler; + private final SubscribersCache subscribersCache; + private final EventProducerService producerService; + private final List traceSegmentPostProcessors; + + public RequestEndedCallBack( + ApiSecurityRequestSampler requestSampler, + SubscribersCache subscribersCache, + EventProducerService producerService, + List traceSegmentPostProcessors) { + this.requestSampler = requestSampler; + this.subscribersCache = subscribersCache; + this.producerService = producerService; + this.traceSegmentPostProcessors = traceSegmentPostProcessors; + } + + @Override + public Flow apply(RequestContext requestContext, IGSpanInfo spanInfo) { + AppSecRequestContext ctx = requestContext.getData(RequestContextSlot.APPSEC); + if (ctx == null) { + return NoopFlow.INSTANCE; + } + + CallbackUtils.INSTANCE.maybeExtractSchemas( + ctx, requestSampler, subscribersCache, producerService); + + // WAF call + ctx.closeAdditive(); + + TraceSegment traceSeg = requestContext.getTraceSegment(); + + // AppSec report metric and events for web span only + if (traceSeg != null) { + traceSeg.setTagTop("_dd.appsec.enabled", 1); + traceSeg.setTagTop("_dd.runtime_family", "jvm"); + + Collection collectedEvents = ctx.transferCollectedEvents(); + + for (TraceSegmentPostProcessor pp : traceSegmentPostProcessors) { + pp.processTraceSegment(traceSeg, ctx, collectedEvents); + } + + // If detected any events - mark span at appsec.event + if (!collectedEvents.isEmpty()) { + // Set asm keep in case that root span was not available when events are detected + traceSeg.setTagTop(Tags.ASM_KEEP, true); + traceSeg.setTagTop(Tags.PROPAGATED_APPSEC, true); + traceSeg.setTagTop("appsec.event", true); + traceSeg.setTagTop("network.client.ip", ctx.getPeerAddress()); + + // Reflect client_ip as actor.ip for backward compatibility + Object clientIp = spanInfo.getTags().get(Tags.HTTP_CLIENT_IP); + if (clientIp != null) { + traceSeg.setTagTop("actor.ip", clientIp); + } + + // Report AppSec events via "_dd.appsec.json" tag + AppSecEventWrapper wrapper = new AppSecEventWrapper(collectedEvents); + traceSeg.setDataTop("appsec", wrapper); + + // Report collected request and response headers based on allow list + writeRequestHeaders(traceSeg, REQUEST_HEADERS_ALLOW_LIST, ctx.getRequestHeaders()); + writeResponseHeaders(traceSeg, RESPONSE_HEADERS_ALLOW_LIST, ctx.getResponseHeaders()); + + // Report collected stack traces + StackTraceCollection stackTraceCollection = ctx.transferStackTracesCollection(); + if (stackTraceCollection != null) { + Object flatStruct = ObjectFlattener.flatten(stackTraceCollection); + if (flatStruct != null) { + traceSeg.setMetaStructTop("_dd.stack", flatStruct); + } + } + } else if (hasUserTrackingEvent(traceSeg)) { + // Report all collected request headers on user tracking event + writeRequestHeaders(traceSeg, REQUEST_HEADERS_ALLOW_LIST, ctx.getRequestHeaders()); + } else { + // Report minimum set of collected request headers + writeRequestHeaders(traceSeg, DEFAULT_REQUEST_HEADERS_ALLOW_LIST, ctx.getRequestHeaders()); + } + // If extracted any Api Schemas - commit them + if (!ctx.commitApiSchemas(traceSeg)) { + log.debug("Unable to commit, api security schemas and will be skipped"); + } + + if (ctx.isBlocked()) { + WafMetricCollector.get().wafRequestBlocked(); + } else if (!collectedEvents.isEmpty()) { + WafMetricCollector.get().wafRequestTriggered(); + } else { + WafMetricCollector.get().wafRequest(); + } + } + + ctx.close(); + return NoopFlow.INSTANCE; + } + + private static boolean hasUserTrackingEvent(final TraceSegment traceSeg) { + for (String tagName : USER_TRACKING_TAGS) { + final Object value = traceSeg.getTagTop(tagName); + if (value != null && "true".equalsIgnoreCase(value.toString())) { + return true; + } + } + return false; + } + + private static void writeRequestHeaders( + final TraceSegment traceSeg, + final Set allowed, + final Map> headers) { + writeHeaders(traceSeg, "http.request.headers.", allowed, headers); + } + + private static void writeResponseHeaders( + final TraceSegment traceSeg, + final Set allowed, + final Map> headers) { + writeHeaders(traceSeg, "http.response.headers.", allowed, headers); + } + + private static void writeHeaders( + final TraceSegment traceSeg, + final String prefix, + final Set allowed, + final Map> headers) { + if (headers != null) { + headers.forEach( + (name, value) -> { + if (allowed.contains(name)) { + String v = String.join(",", value); + if (!v.isEmpty()) { + traceSeg.setTagTop(prefix + name, v); + } + } + }); + } + } +} diff --git a/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/RequestHeadersDoneCallback.java b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/RequestHeadersDoneCallback.java new file mode 100644 index 00000000000..3409e082bb8 --- /dev/null +++ b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/RequestHeadersDoneCallback.java @@ -0,0 +1,31 @@ +package com.datadog.appsec.gateway.callbacks; + +import com.datadog.appsec.event.EventProducerService; +import com.datadog.appsec.gateway.AppSecRequestContext; +import com.datadog.appsec.gateway.NoopFlow; +import com.datadog.appsec.gateway.SubscribersCache; +import datadog.trace.api.gateway.Flow; +import datadog.trace.api.gateway.RequestContext; +import datadog.trace.api.gateway.RequestContextSlot; +import java.util.function.Function; + +public class RequestHeadersDoneCallback implements Function> { + + private final SubscribersCache subscribersCache; + private final EventProducerService producerService; + + public RequestHeadersDoneCallback( + SubscribersCache subscribersCache, EventProducerService producerService) { + this.subscribersCache = subscribersCache; + this.producerService = producerService; + } + + public Flow apply(RequestContext ctx_) { + AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); + if (ctx == null || ctx.isReqDataPublished()) { + return NoopFlow.INSTANCE; + } + ctx.finishRequestHeaders(); + return CallbackUtils.INSTANCE.maybePublishRequestData(ctx, subscribersCache, producerService); + } +} diff --git a/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/RequestInferredClientAddressCallback.java b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/RequestInferredClientAddressCallback.java new file mode 100644 index 00000000000..d358324d51f --- /dev/null +++ b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/RequestInferredClientAddressCallback.java @@ -0,0 +1,21 @@ +package com.datadog.appsec.gateway.callbacks; + +import com.datadog.appsec.gateway.AppSecRequestContext; +import com.datadog.appsec.gateway.NoopFlow; +import datadog.trace.api.gateway.Flow; +import datadog.trace.api.gateway.RequestContext; +import datadog.trace.api.gateway.RequestContextSlot; +import java.util.function.BiFunction; + +public class RequestInferredClientAddressCallback + implements BiFunction> { + + @Override + public Flow apply(RequestContext requestContext, String ip) { + AppSecRequestContext ctx = requestContext.getData(RequestContextSlot.APPSEC); + if (ctx != null) { + ctx.setInferredClientIp(ip); + } + return NoopFlow.INSTANCE; // expected to be called before requestClientSocketAddress + } +} diff --git a/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/RequestPathParamsCallback.java b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/RequestPathParamsCallback.java new file mode 100644 index 00000000000..d0475a2a0f4 --- /dev/null +++ b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/RequestPathParamsCallback.java @@ -0,0 +1,56 @@ +package com.datadog.appsec.gateway.callbacks; + +import com.datadog.appsec.event.EventProducerService; +import com.datadog.appsec.event.ExpiredSubscriberInfoException; +import com.datadog.appsec.event.data.DataBundle; +import com.datadog.appsec.event.data.KnownAddresses; +import com.datadog.appsec.event.data.SingletonDataBundle; +import com.datadog.appsec.gateway.AppSecRequestContext; +import com.datadog.appsec.gateway.GatewayContext; +import com.datadog.appsec.gateway.NoopFlow; +import com.datadog.appsec.gateway.SubscribersCache; +import datadog.trace.api.gateway.Flow; +import datadog.trace.api.gateway.RequestContext; +import datadog.trace.api.gateway.RequestContextSlot; +import java.util.Map; +import java.util.function.BiFunction; + +public class RequestPathParamsCallback + implements BiFunction, Flow> { + + private final SubscribersCache subscribersCache; + private final EventProducerService producerService; + + public RequestPathParamsCallback( + SubscribersCache subscribersCache, EventProducerService producerService) { + this.subscribersCache = subscribersCache; + this.producerService = producerService; + } + + @Override + public Flow apply(RequestContext requestContext, Map data) { + AppSecRequestContext ctx = requestContext.getData(RequestContextSlot.APPSEC); + if (ctx == null || ctx.isPathParamsPublished()) { + return NoopFlow.INSTANCE; + } + ctx.setPathParamsPublished(true); + + while (true) { + EventProducerService.DataSubscriberInfo subInfo = subscribersCache.getPathParamsSubInfo(); + if (subInfo == null) { + subInfo = producerService.getDataSubscribers(KnownAddresses.REQUEST_PATH_PARAMS); + subscribersCache.setPathParamsSubInfo(subInfo); + } + if (subInfo == null || subInfo.isEmpty()) { + return NoopFlow.INSTANCE; + } + DataBundle bundle = new SingletonDataBundle<>(KnownAddresses.REQUEST_PATH_PARAMS, data); + try { + GatewayContext gwCtx = new GatewayContext(false); + return producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx); + } catch (ExpiredSubscriberInfoException e) { + subscribersCache.setPathParamsSubInfo(null); + } + } + } +} diff --git a/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/RequestStartedCallback.java b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/RequestStartedCallback.java new file mode 100644 index 00000000000..06ad8ffea6a --- /dev/null +++ b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/RequestStartedCallback.java @@ -0,0 +1,42 @@ +package com.datadog.appsec.gateway.callbacks; + +import com.datadog.appsec.AppSecSystem; +import com.datadog.appsec.gateway.AppSecRequestContext; +import datadog.trace.api.gateway.Flow; +import java.util.function.Supplier; + +public class RequestStartedCallback implements Supplier> { + + @Override + public Flow get() { + if (!AppSecSystem.isActive()) { + return RequestContextSupplier.EMPTY; + } + return new RequestContextSupplier(); + } + + public static class RequestContextSupplier implements Flow { + + public static final Flow EMPTY = new RequestContextSupplier(null); + + private final AppSecRequestContext appSecRequestContext; + + public RequestContextSupplier() { + this(new AppSecRequestContext()); + } + + public RequestContextSupplier(AppSecRequestContext ctx) { + appSecRequestContext = ctx; + } + + @Override + public Action getAction() { + return Action.Noop.INSTANCE; + } + + @Override + public AppSecRequestContext getResult() { + return appSecRequestContext; + } + } +} diff --git a/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/ResponseHeaderCallback.java b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/ResponseHeaderCallback.java new file mode 100644 index 00000000000..952b0b0e8eb --- /dev/null +++ b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/ResponseHeaderCallback.java @@ -0,0 +1,17 @@ +package com.datadog.appsec.gateway.callbacks; + +import com.datadog.appsec.gateway.AppSecRequestContext; +import datadog.trace.api.function.TriConsumer; +import datadog.trace.api.gateway.RequestContext; +import datadog.trace.api.gateway.RequestContextSlot; + +public class ResponseHeaderCallback implements TriConsumer { + + @Override + public void accept(RequestContext requestContext, String name, String value) { + AppSecRequestContext ctx = requestContext.getData(RequestContextSlot.APPSEC); + if (ctx != null) { + ctx.addResponseHeader(name, value); + } + } +} diff --git a/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/ResponseHeaderDoneCallback.java b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/ResponseHeaderDoneCallback.java new file mode 100644 index 00000000000..b77c26865eb --- /dev/null +++ b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/ResponseHeaderDoneCallback.java @@ -0,0 +1,32 @@ +package com.datadog.appsec.gateway.callbacks; + +import com.datadog.appsec.event.EventProducerService; +import com.datadog.appsec.gateway.AppSecRequestContext; +import com.datadog.appsec.gateway.NoopFlow; +import com.datadog.appsec.gateway.SubscribersCache; +import datadog.trace.api.gateway.Flow; +import datadog.trace.api.gateway.RequestContext; +import datadog.trace.api.gateway.RequestContextSlot; +import java.util.function.Function; + +public class ResponseHeaderDoneCallback implements Function> { + + private final SubscribersCache subscribersCache; + private final EventProducerService producerService; + + public ResponseHeaderDoneCallback( + SubscribersCache subscribersCache, EventProducerService producerService) { + this.subscribersCache = subscribersCache; + this.producerService = producerService; + } + + @Override + public Flow apply(RequestContext requestContext) { + AppSecRequestContext ctx = requestContext.getData(RequestContextSlot.APPSEC); + if (ctx == null || ctx.isRespDataPublished()) { + return NoopFlow.INSTANCE; + } + ctx.finishResponseHeaders(); + return CallbackUtils.INSTANCE.maybePublishResponseData(ctx, subscribersCache, producerService); + } +} diff --git a/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/ResponseStartedCallback.java b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/ResponseStartedCallback.java new file mode 100644 index 00000000000..dc8f01349c5 --- /dev/null +++ b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/callbacks/ResponseStartedCallback.java @@ -0,0 +1,32 @@ +package com.datadog.appsec.gateway.callbacks; + +import com.datadog.appsec.event.EventProducerService; +import com.datadog.appsec.gateway.AppSecRequestContext; +import com.datadog.appsec.gateway.NoopFlow; +import com.datadog.appsec.gateway.SubscribersCache; +import datadog.trace.api.gateway.Flow; +import datadog.trace.api.gateway.RequestContext; +import datadog.trace.api.gateway.RequestContextSlot; +import java.util.function.BiFunction; + +public class ResponseStartedCallback implements BiFunction> { + + private final SubscribersCache subscribersCache; + private final EventProducerService producerService; + + public ResponseStartedCallback( + SubscribersCache subscribersCache, EventProducerService producerService) { + this.subscribersCache = subscribersCache; + this.producerService = producerService; + } + + @Override + public Flow apply(RequestContext requestContext, Integer status) { + AppSecRequestContext ctx = requestContext.getData(RequestContextSlot.APPSEC); + if (ctx == null || ctx.isRespDataPublished()) { + return NoopFlow.INSTANCE; + } + ctx.setResponseStatus(status); + return CallbackUtils.INSTANCE.maybePublishResponseData(ctx, subscribersCache, producerService); + } +}