diff --git a/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/GatewayBridge.java b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/GatewayBridge.java index e9d3a89e3ee..a5eb1c9ba90 100644 --- a/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/GatewayBridge.java +++ b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/GatewayBridge.java @@ -23,8 +23,6 @@ import com.datadog.appsec.stack_trace.StackTraceCollection; import com.datadog.appsec.util.ObjectFlattener; import datadog.trace.api.Config; -import datadog.trace.api.function.TriConsumer; -import datadog.trace.api.function.TriFunction; import datadog.trace.api.gateway.Events; import datadog.trace.api.gateway.Flow; import datadog.trace.api.gateway.IGSpanInfo; @@ -50,7 +48,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.function.Function; import java.util.regex.Pattern; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -99,402 +96,435 @@ public GatewayBridge( } public void init() { - Events events = Events.get(); Collection> additionalIGEvents = IGAppSecEventDependencies.additionalIGEventTypes( producerService.allSubscribedDataAddresses()); + subscriptionService.registerCallback(EVENTS.requestStarted(), this::onRequestStarted); + subscriptionService.registerCallback(EVENTS.requestEnded(), this::onRequestEnded); + subscriptionService.registerCallback(EVENTS.requestHeader(), this::onRequestHeader); + subscriptionService.registerCallback(EVENTS.requestHeaderDone(), this::onRequestHeadersDone); + subscriptionService.registerCallback(EVENTS.requestMethodUriRaw(), this::onRequestMethodUriRaw); + subscriptionService.registerCallback(EVENTS.requestBodyStart(), this::onRequestBodyStart); + subscriptionService.registerCallback(EVENTS.requestBodyDone(), this::onRequestBodyDone); subscriptionService.registerCallback( - events.requestStarted(), - () -> { - if (!AppSecSystem.isActive()) { - return RequestContextSupplier.EMPTY; - } - return new RequestContextSupplier(); - }); - + EVENTS.requestClientSocketAddress(), this::onRequestClientSocketAddress); subscriptionService.registerCallback( - events.requestEnded(), - (RequestContext ctx_, IGSpanInfo spanInfo) -> { - AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); - if (ctx == null) { - return NoopFlow.INSTANCE; - } + EVENTS.requestInferredClientAddress(), this::onRequestInferredClientAddress); + subscriptionService.registerCallback(EVENTS.responseStarted(), this::onResponseStarted); + subscriptionService.registerCallback(EVENTS.responseHeader(), this::onResponseHeader); + subscriptionService.registerCallback(EVENTS.responseHeaderDone(), this::onResponseHeaderDone); + subscriptionService.registerCallback(EVENTS.grpcServerMethod(), this::onGrpcServerMethod); + subscriptionService.registerCallback( + EVENTS.grpcServerRequestMessage(), this::onGrpcServerRequestMessage); + subscriptionService.registerCallback( + EVENTS.graphqlServerRequestMessage(), this::onGraphqlServerRequestMessage); + subscriptionService.registerCallback(EVENTS.databaseConnection(), this::onDatabaseConnection); + subscriptionService.registerCallback(EVENTS.databaseSqlQuery(), this::onDatabaseSqlQuery); - maybeExtractSchemas(ctx); + if (additionalIGEvents.contains(EVENTS.requestPathParams())) { + subscriptionService.registerCallback(EVENTS.requestPathParams(), this::onRequestPathParams); + } + if (additionalIGEvents.contains(EVENTS.requestBodyProcessed())) { + subscriptionService.registerCallback( + EVENTS.requestBodyProcessed(), this::onRequestBodyProcessed); + } + } - // WAF call - ctx.closeAdditive(); + private Flow onDatabaseSqlQuery(RequestContext ctx_, String sql) { + AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); + if (ctx == null) { + return NoopFlow.INSTANCE; + } + while (true) { + DataSubscriberInfo subInfo = dbSqlQuerySubInfo; + if (subInfo == null) { + subInfo = + producerService.getDataSubscribers(KnownAddresses.DB_TYPE, KnownAddresses.DB_SQL_QUERY); + dbSqlQuerySubInfo = subInfo; + } + if (subInfo == null || subInfo.isEmpty()) { + return NoopFlow.INSTANCE; + } + DataBundle bundle = + new MapDataBundle.Builder(CAPACITY_0_2) + .add(KnownAddresses.DB_TYPE, ctx.getDbType()) + .add(KnownAddresses.DB_SQL_QUERY, sql) + .build(); + try { + GatewayContext gwCtx = new GatewayContext(false, RuleType.SQL_INJECTION); + return producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx); + } catch (ExpiredSubscriberInfoException e) { + dbSqlQuerySubInfo = null; + } + } + } - TraceSegment traceSeg = ctx_.getTraceSegment(); + private void onDatabaseConnection(RequestContext ctx_, String dbType) { + AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); + if (ctx == null) { + return; + } + ctx.setDbType(dbType); + } - // AppSec report metric and events for web span only - if (traceSeg != null) { - traceSeg.setTagTop("_dd.appsec.enabled", 1); - traceSeg.setTagTop("_dd.runtime_family", "jvm"); + private Flow onGraphqlServerRequestMessage(RequestContext ctx_, Map data) { + AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); + if (ctx == null) { + return NoopFlow.INSTANCE; + } + while (true) { + DataSubscriberInfo subInfo = graphqlServerRequestMsgSubInfo; + if (subInfo == null) { + subInfo = producerService.getDataSubscribers(KnownAddresses.GRAPHQL_SERVER_ALL_RESOLVERS); + graphqlServerRequestMsgSubInfo = subInfo; + } + if (subInfo == null || subInfo.isEmpty()) { + return NoopFlow.INSTANCE; + } + DataBundle bundle = + new SingletonDataBundle<>(KnownAddresses.GRAPHQL_SERVER_ALL_RESOLVERS, data); + try { + GatewayContext gwCtx = new GatewayContext(true); + return producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx); + } catch (ExpiredSubscriberInfoException e) { + graphqlServerRequestMsgSubInfo = null; + } + } + } - Collection collectedEvents = ctx.transferCollectedEvents(); + private Flow onGrpcServerRequestMessage(RequestContext ctx_, Object obj) { + AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); + if (ctx == null) { + return NoopFlow.INSTANCE; + } + while (true) { + DataSubscriberInfo subInfo = grpcServerRequestMsgSubInfo; + if (subInfo == null) { + subInfo = producerService.getDataSubscribers(KnownAddresses.GRPC_SERVER_REQUEST_MESSAGE); + grpcServerRequestMsgSubInfo = subInfo; + } + if (subInfo == null || subInfo.isEmpty()) { + return NoopFlow.INSTANCE; + } + Object convObj = ObjectIntrospection.convert(obj); + DataBundle bundle = + new SingletonDataBundle<>(KnownAddresses.GRPC_SERVER_REQUEST_MESSAGE, convObj); + try { + GatewayContext gwCtx = new GatewayContext(true); + return producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx); + } catch (ExpiredSubscriberInfoException e) { + grpcServerRequestMsgSubInfo = null; + } + } + } - for (TraceSegmentPostProcessor pp : this.traceSegmentPostProcessors) { - pp.processTraceSegment(traceSeg, ctx, collectedEvents); - } + private Flow onGrpcServerMethod(RequestContext ctx_, String method) { + AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); + if (ctx == null || method == null || method.isEmpty()) { + return NoopFlow.INSTANCE; + } + while (true) { + DataSubscriberInfo subInfo = grpcServerMethodSubInfo; + if (subInfo == null) { + subInfo = producerService.getDataSubscribers(KnownAddresses.GRPC_SERVER_METHOD); + grpcServerMethodSubInfo = subInfo; + } + if (subInfo == null || subInfo.isEmpty()) { + return NoopFlow.INSTANCE; + } + DataBundle bundle = new SingletonDataBundle<>(KnownAddresses.GRPC_SERVER_METHOD, method); + try { + GatewayContext gwCtx = new GatewayContext(true); + return producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx); + } catch (ExpiredSubscriberInfoException e) { + grpcServerMethodSubInfo = null; + } + } + } - // If detected any events - mark span at appsec.event - if (!collectedEvents.isEmpty()) { - // Set asm keep in case that root span was not available when events are detected - traceSeg.setTagTop(Tags.ASM_KEEP, true); - traceSeg.setTagTop(Tags.PROPAGATED_APPSEC, true); - traceSeg.setTagTop("appsec.event", true); - traceSeg.setTagTop("network.client.ip", ctx.getPeerAddress()); - - // Reflect client_ip as actor.ip for backward compatibility - Object clientIp = spanInfo.getTags().get(Tags.HTTP_CLIENT_IP); - if (clientIp != null) { - traceSeg.setTagTop("actor.ip", clientIp); - } + private Flow onResponseHeaderDone(RequestContext ctx_) { + AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); + if (ctx == null || ctx.isRespDataPublished()) { + return NoopFlow.INSTANCE; + } + ctx.finishResponseHeaders(); + return maybePublishResponseData(ctx); + } - // Report AppSec events via "_dd.appsec.json" tag - AppSecEventWrapper wrapper = new AppSecEventWrapper(collectedEvents); - traceSeg.setDataTop("appsec", wrapper); - - // Report collected request and response headers based on allow list - writeRequestHeaders(traceSeg, REQUEST_HEADERS_ALLOW_LIST, ctx.getRequestHeaders()); - writeResponseHeaders(traceSeg, RESPONSE_HEADERS_ALLOW_LIST, ctx.getResponseHeaders()); - - // Report collected stack traces - StackTraceCollection stackTraceCollection = ctx.transferStackTracesCollection(); - if (stackTraceCollection != null) { - Object flatStruct = ObjectFlattener.flatten(stackTraceCollection); - if (flatStruct != null) { - traceSeg.setMetaStructTop("_dd.stack", flatStruct); - } - } - } else if (hasUserTrackingEvent(traceSeg)) { - // Report all collected request headers on user tracking event - writeRequestHeaders(traceSeg, REQUEST_HEADERS_ALLOW_LIST, ctx.getRequestHeaders()); - } else { - // Report minimum set of collected request headers - writeRequestHeaders( - traceSeg, DEFAULT_REQUEST_HEADERS_ALLOW_LIST, ctx.getRequestHeaders()); - } - // If extracted any Api Schemas - commit them - if (!ctx.commitApiSchemas(traceSeg)) { - log.debug("Unable to commit, api security schemas and will be skipped"); - } + private void onResponseHeader(RequestContext ctx_, String name, String value) { + AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); + if (ctx != null) { + ctx.addResponseHeader(name, value); + } + } - if (ctx.isBlocked()) { - WafMetricCollector.get().wafRequestBlocked(); - } else if (!collectedEvents.isEmpty()) { - WafMetricCollector.get().wafRequestTriggered(); - } else { - WafMetricCollector.get().wafRequest(); - } - } + private Flow onResponseStarted(RequestContext ctx_, Integer status) { + AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); + if (ctx == null || ctx.isRespDataPublished()) { + return NoopFlow.INSTANCE; + } + ctx.setResponseStatus(status); + return maybePublishResponseData(ctx); + } - ctx.close(); - return NoopFlow.INSTANCE; - }); + private NoopFlow onRequestInferredClientAddress(RequestContext ctx_, String ip) { + AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); + if (ctx != null) { + ctx.setInferredClientIp(ip); + } + return NoopFlow.INSTANCE; + } - subscriptionService.registerCallback(EVENTS.requestHeader(), new NewRequestHeaderCallback()); - subscriptionService.registerCallback( - EVENTS.requestHeaderDone(), new RequestHeadersDoneCallback()); + private Flow onRequestClientSocketAddress(RequestContext ctx_, String ip, Integer port) { + AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); + if (ctx == null || ctx.isReqDataPublished()) { + return NoopFlow.INSTANCE; + } + ctx.setPeerAddress(ip); + ctx.setPeerPort(port); + return maybePublishRequestData(ctx); + } - subscriptionService.registerCallback( - EVENTS.requestMethodUriRaw(), new MethodAndRawURICallback()); + private Flow onRequestBodyProcessed(RequestContext ctx_, Object obj) { + AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); + if (ctx == null) { + return NoopFlow.INSTANCE; + } - subscriptionService.registerCallback( - EVENTS.requestBodyStart(), - (RequestContext ctx_, StoredBodySupplier supplier) -> { - AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); - if (ctx == null) { - return null; - } + if (ctx.isConvertedReqBodyPublished()) { + log.debug("Request body already published; will ignore new value of type {}", obj.getClass()); + return NoopFlow.INSTANCE; + } + ctx.setConvertedReqBodyPublished(true); + + while (true) { + DataSubscriberInfo subInfo = requestBodySubInfo; + if (subInfo == null) { + subInfo = producerService.getDataSubscribers(KnownAddresses.REQUEST_BODY_OBJECT); + requestBodySubInfo = subInfo; + } + if (subInfo == null || subInfo.isEmpty()) { + return NoopFlow.INSTANCE; + } + DataBundle bundle = + new SingletonDataBundle<>( + KnownAddresses.REQUEST_BODY_OBJECT, ObjectIntrospection.convert(obj)); + try { + GatewayContext gwCtx = new GatewayContext(false); + return producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx); + } catch (ExpiredSubscriberInfoException e) { + requestBodySubInfo = null; + } + } + } - ctx.setStoredRequestBodySupplier(supplier); - return null; - }); + private Flow onRequestBodyDone(RequestContext ctx_, StoredBodySupplier supplier) { + AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); + if (ctx == null || ctx.isRawReqBodyPublished()) { + return NoopFlow.INSTANCE; + } + ctx.setRawReqBodyPublished(true); - if (additionalIGEvents.contains(EVENTS.requestPathParams())) { - subscriptionService.registerCallback( - EVENTS.requestPathParams(), - (ctx_, data) -> { - AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); - if (ctx == null || ctx.isPathParamsPublished()) { - return NoopFlow.INSTANCE; - } - ctx.setPathParamsPublished(true); + while (true) { + DataSubscriberInfo subInfo = rawRequestBodySubInfo; + if (subInfo == null) { + subInfo = producerService.getDataSubscribers(KnownAddresses.REQUEST_BODY_RAW); + rawRequestBodySubInfo = subInfo; + } + if (subInfo == null || subInfo.isEmpty()) { + return NoopFlow.INSTANCE; + } - while (true) { - DataSubscriberInfo subInfo = pathParamsSubInfo; - if (subInfo == null) { - subInfo = producerService.getDataSubscribers(KnownAddresses.REQUEST_PATH_PARAMS); - pathParamsSubInfo = subInfo; - } - if (subInfo == null || subInfo.isEmpty()) { - return NoopFlow.INSTANCE; - } - DataBundle bundle = - new SingletonDataBundle<>(KnownAddresses.REQUEST_PATH_PARAMS, data); - try { - GatewayContext gwCtx = new GatewayContext(false); - return producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx); - } catch (ExpiredSubscriberInfoException e) { - pathParamsSubInfo = null; - } - } - }); + CharSequence bodyContent = supplier.get(); + if (bodyContent == null || bodyContent.length() == 0) { + return NoopFlow.INSTANCE; + } + DataBundle bundle = new SingletonDataBundle<>(KnownAddresses.REQUEST_BODY_RAW, bodyContent); + try { + GatewayContext gwCtx = new GatewayContext(false); + return producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx); + } catch (ExpiredSubscriberInfoException e) { + rawRequestBodySubInfo = null; + } } + } - subscriptionService.registerCallback( - EVENTS.requestBodyDone(), - (RequestContext ctx_, StoredBodySupplier supplier) -> { - AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); - if (ctx == null || ctx.isRawReqBodyPublished()) { - return NoopFlow.INSTANCE; - } - ctx.setRawReqBodyPublished(true); + private Flow onRequestPathParams(RequestContext ctx_, Map data) { + AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); + if (ctx == null || ctx.isPathParamsPublished()) { + return NoopFlow.INSTANCE; + } + ctx.setPathParamsPublished(true); - while (true) { - DataSubscriberInfo subInfo = rawRequestBodySubInfo; - if (subInfo == null) { - subInfo = producerService.getDataSubscribers(KnownAddresses.REQUEST_BODY_RAW); - rawRequestBodySubInfo = subInfo; - } - if (subInfo == null || subInfo.isEmpty()) { - return NoopFlow.INSTANCE; - } + while (true) { + DataSubscriberInfo subInfo = pathParamsSubInfo; + if (subInfo == null) { + subInfo = producerService.getDataSubscribers(KnownAddresses.REQUEST_PATH_PARAMS); + pathParamsSubInfo = subInfo; + } + if (subInfo == null || subInfo.isEmpty()) { + return NoopFlow.INSTANCE; + } + DataBundle bundle = new SingletonDataBundle<>(KnownAddresses.REQUEST_PATH_PARAMS, data); + try { + GatewayContext gwCtx = new GatewayContext(false); + return producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx); + } catch (ExpiredSubscriberInfoException e) { + pathParamsSubInfo = null; + } + } + } - CharSequence bodyContent = supplier.get(); - if (bodyContent == null || bodyContent.length() == 0) { - return NoopFlow.INSTANCE; - } - DataBundle bundle = - new SingletonDataBundle<>(KnownAddresses.REQUEST_BODY_RAW, bodyContent); - try { - GatewayContext gwCtx = new GatewayContext(false); - return producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx); - } catch (ExpiredSubscriberInfoException e) { - rawRequestBodySubInfo = null; - } - } - }); + private Void onRequestBodyStart(RequestContext ctx_, StoredBodySupplier supplier) { + AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); + if (ctx == null) { + return null; + } - if (additionalIGEvents.contains(EVENTS.requestBodyProcessed())) { - subscriptionService.registerCallback( - EVENTS.requestBodyProcessed(), - (RequestContext ctx_, Object obj) -> { - AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); - if (ctx == null) { - return NoopFlow.INSTANCE; - } + ctx.setStoredRequestBodySupplier(supplier); + return null; + } - if (ctx.isConvertedReqBodyPublished()) { - log.debug( - "Request body already published; will ignore new value of type {}", - obj.getClass()); - return NoopFlow.INSTANCE; - } - ctx.setConvertedReqBodyPublished(true); + private Flow onRequestStarted() { + if (!AppSecSystem.isActive()) { + return RequestContextSupplier.EMPTY; + } + return new RequestContextSupplier(); + } - while (true) { - DataSubscriberInfo subInfo = requestBodySubInfo; - if (subInfo == null) { - subInfo = producerService.getDataSubscribers(KnownAddresses.REQUEST_BODY_OBJECT); - requestBodySubInfo = subInfo; - } - if (subInfo == null || subInfo.isEmpty()) { - return NoopFlow.INSTANCE; - } - DataBundle bundle = - new SingletonDataBundle<>( - KnownAddresses.REQUEST_BODY_OBJECT, ObjectIntrospection.convert(obj)); - try { - GatewayContext gwCtx = new GatewayContext(false); - return producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx); - } catch (ExpiredSubscriberInfoException e) { - requestBodySubInfo = null; - } - } - }); + private NoopFlow onRequestEnded(RequestContext ctx_, IGSpanInfo spanInfo) { + AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); + if (ctx == null) { + return NoopFlow.INSTANCE; } - subscriptionService.registerCallback( - EVENTS.requestClientSocketAddress(), - (ctx_, ip, port) -> { - AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); - if (ctx == null || ctx.isReqDataPublished()) { - return NoopFlow.INSTANCE; - } - ctx.setPeerAddress(ip); - ctx.setPeerPort(port); - return maybePublishRequestData(ctx); - }); + maybeExtractSchemas(ctx); - subscriptionService.registerCallback( - EVENTS.requestInferredClientAddress(), - (ctx_, ip) -> { - AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); - if (ctx != null) { - ctx.setInferredClientIp(ip); - } - return NoopFlow.INSTANCE; // expected to be called before requestClientSocketAddress - }); + // WAF call + ctx.closeAdditive(); - subscriptionService.registerCallback( - EVENTS.responseStarted(), - (ctx_, status) -> { - AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); - if (ctx == null || ctx.isRespDataPublished()) { - return NoopFlow.INSTANCE; - } - ctx.setResponseStatus(status); - return maybePublishResponseData(ctx); - }); + TraceSegment traceSeg = ctx_.getTraceSegment(); - subscriptionService.registerCallback( - EVENTS.responseHeader(), - (ctx_, name, value) -> { - AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); - if (ctx != null) { - ctx.addResponseHeader(name, value); - } - }); - subscriptionService.registerCallback( - EVENTS.responseHeaderDone(), - ctx_ -> { - AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); - if (ctx == null || ctx.isRespDataPublished()) { - return NoopFlow.INSTANCE; - } - ctx.finishResponseHeaders(); - return maybePublishResponseData(ctx); - }); + // AppSec report metric and events for web span only + if (traceSeg != null) { + traceSeg.setTagTop("_dd.appsec.enabled", 1); + traceSeg.setTagTop("_dd.runtime_family", "jvm"); - subscriptionService.registerCallback( - EVENTS.grpcServerMethod(), - (ctx_, method) -> { - AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); - if (ctx == null || method == null || method.isEmpty()) { - return NoopFlow.INSTANCE; - } - while (true) { - DataSubscriberInfo subInfo = grpcServerMethodSubInfo; - if (subInfo == null) { - subInfo = producerService.getDataSubscribers(KnownAddresses.GRPC_SERVER_METHOD); - grpcServerMethodSubInfo = subInfo; - } - if (subInfo == null || subInfo.isEmpty()) { - return NoopFlow.INSTANCE; - } - DataBundle bundle = - new SingletonDataBundle<>(KnownAddresses.GRPC_SERVER_METHOD, method); - try { - GatewayContext gwCtx = new GatewayContext(true); - return producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx); - } catch (ExpiredSubscriberInfoException e) { - grpcServerMethodSubInfo = null; - } - } - }); + Collection collectedEvents = ctx.transferCollectedEvents(); - subscriptionService.registerCallback( - EVENTS.grpcServerRequestMessage(), - (ctx_, obj) -> { - AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); - if (ctx == null) { - return NoopFlow.INSTANCE; - } - while (true) { - DataSubscriberInfo subInfo = grpcServerRequestMsgSubInfo; - if (subInfo == null) { - subInfo = - producerService.getDataSubscribers(KnownAddresses.GRPC_SERVER_REQUEST_MESSAGE); - grpcServerRequestMsgSubInfo = subInfo; - } - if (subInfo == null || subInfo.isEmpty()) { - return NoopFlow.INSTANCE; - } - Object convObj = ObjectIntrospection.convert(obj); - DataBundle bundle = - new SingletonDataBundle<>(KnownAddresses.GRPC_SERVER_REQUEST_MESSAGE, convObj); - try { - GatewayContext gwCtx = new GatewayContext(true); - return producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx); - } catch (ExpiredSubscriberInfoException e) { - grpcServerRequestMsgSubInfo = null; - } - } - }); + for (TraceSegmentPostProcessor pp : this.traceSegmentPostProcessors) { + pp.processTraceSegment(traceSeg, ctx, collectedEvents); + } - subscriptionService.registerCallback( - EVENTS.graphqlServerRequestMessage(), - (RequestContext ctx_, Map data) -> { - AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); - if (ctx == null) { - return NoopFlow.INSTANCE; - } - while (true) { - DataSubscriberInfo subInfo = graphqlServerRequestMsgSubInfo; - if (subInfo == null) { - subInfo = - producerService.getDataSubscribers(KnownAddresses.GRAPHQL_SERVER_ALL_RESOLVERS); - graphqlServerRequestMsgSubInfo = subInfo; - } - if (subInfo == null || subInfo.isEmpty()) { - return NoopFlow.INSTANCE; - } - DataBundle bundle = - new SingletonDataBundle<>(KnownAddresses.GRAPHQL_SERVER_ALL_RESOLVERS, data); - try { - GatewayContext gwCtx = new GatewayContext(true); - return producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx); - } catch (ExpiredSubscriberInfoException e) { - graphqlServerRequestMsgSubInfo = null; - } - } - }); + // If detected any events - mark span at appsec.event + if (!collectedEvents.isEmpty()) { + // Set asm keep in case that root span was not available when events are detected + traceSeg.setTagTop(Tags.ASM_KEEP, true); + traceSeg.setTagTop(Tags.PROPAGATED_APPSEC, true); + traceSeg.setTagTop("appsec.event", true); + traceSeg.setTagTop("network.client.ip", ctx.getPeerAddress()); + + // Reflect client_ip as actor.ip for backward compatibility + Object clientIp = spanInfo.getTags().get(Tags.HTTP_CLIENT_IP); + if (clientIp != null) { + traceSeg.setTagTop("actor.ip", clientIp); + } - subscriptionService.registerCallback( - EVENTS.databaseConnection(), - (ctx_, dbType) -> { - AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); - if (ctx == null) { - return; - } - ctx.setDbType(dbType); - }); + // Report AppSec events via "_dd.appsec.json" tag + AppSecEventWrapper wrapper = new AppSecEventWrapper(collectedEvents); + traceSeg.setDataTop("appsec", wrapper); - subscriptionService.registerCallback( - EVENTS.databaseSqlQuery(), - (ctx_, sql) -> { - AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); - if (ctx == null) { - return NoopFlow.INSTANCE; - } - while (true) { - DataSubscriberInfo subInfo = dbSqlQuerySubInfo; - if (subInfo == null) { - subInfo = - producerService.getDataSubscribers( - KnownAddresses.DB_TYPE, KnownAddresses.DB_SQL_QUERY); - dbSqlQuerySubInfo = subInfo; - } - if (subInfo == null || subInfo.isEmpty()) { - return NoopFlow.INSTANCE; - } - DataBundle bundle = - new MapDataBundle.Builder(CAPACITY_0_2) - .add(KnownAddresses.DB_TYPE, ctx.getDbType()) - .add(KnownAddresses.DB_SQL_QUERY, sql) - .build(); - try { - GatewayContext gwCtx = new GatewayContext(false, RuleType.SQL_INJECTION); - return producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx); - } catch (ExpiredSubscriberInfoException e) { - dbSqlQuerySubInfo = null; - } + // Report collected request and response headers based on allow list + writeRequestHeaders(traceSeg, REQUEST_HEADERS_ALLOW_LIST, ctx.getRequestHeaders()); + writeResponseHeaders(traceSeg, RESPONSE_HEADERS_ALLOW_LIST, ctx.getResponseHeaders()); + + // Report collected stack traces + StackTraceCollection stackTraceCollection = ctx.transferStackTracesCollection(); + if (stackTraceCollection != null) { + Object flatStruct = ObjectFlattener.flatten(stackTraceCollection); + if (flatStruct != null) { + traceSeg.setMetaStructTop("_dd.stack", flatStruct); } - }); + } + } else if (hasUserTrackingEvent(traceSeg)) { + // Report all collected request headers on user tracking event + writeRequestHeaders(traceSeg, REQUEST_HEADERS_ALLOW_LIST, ctx.getRequestHeaders()); + } else { + // Report minimum set of collected request headers + writeRequestHeaders(traceSeg, DEFAULT_REQUEST_HEADERS_ALLOW_LIST, ctx.getRequestHeaders()); + } + // If extracted any Api Schemas - commit them + if (!ctx.commitApiSchemas(traceSeg)) { + log.debug("Unable to commit, api security schemas and will be skipped"); + } + + if (ctx.isBlocked()) { + WafMetricCollector.get().wafRequestBlocked(); + } else if (!collectedEvents.isEmpty()) { + WafMetricCollector.get().wafRequestTriggered(); + } else { + WafMetricCollector.get().wafRequest(); + } + } + + ctx.close(); + return NoopFlow.INSTANCE; + } + + private Flow onRequestHeadersDone(RequestContext ctx_) { + AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); + if (ctx == null || ctx.isReqDataPublished()) { + return NoopFlow.INSTANCE; + } + ctx.finishRequestHeaders(); + return maybePublishRequestData(ctx); + } + + private Flow onRequestMethodUriRaw(RequestContext ctx_, String method, URIDataAdapter uri) { + AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); + if (ctx == null) { + return NoopFlow.INSTANCE; + } + + if (ctx.isReqDataPublished()) { + log.debug( + "Request method and URI already published; will ignore new values {}, {}", method, uri); + return NoopFlow.INSTANCE; + } + ctx.setMethod(method); + ctx.setScheme(uri.scheme()); + if (uri.supportsRaw()) { + ctx.setRawURI(uri.raw()); + } else { + try { + URI encodedUri = new URI(null, null, uri.path(), uri.query(), null); + String q = encodedUri.getRawQuery(); + StringBuilder encoded = new StringBuilder(); + encoded.append(encodedUri.getRawPath()); + if (null != q && !q.isEmpty()) { + encoded.append('?').append(q); + } + ctx.setRawURI(encoded.toString()); + } catch (URISyntaxException e) { + log.debug("Failed to encode URI '{}{}'", uri.path(), uri.query()); + } + } + return maybePublishRequestData(ctx); + } + + private void onRequestHeader(RequestContext ctx_, String name, String value) { + AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); + if (ctx == null) { + return; + } + + if (name.equalsIgnoreCase("cookie")) { + Map> cookies = CookieCutter.parseCookieHeader(value); + ctx.addCookies(cookies); + } else { + ctx.addRequestHeader(name, value); + } } public void stop() { @@ -567,71 +597,6 @@ public AppSecRequestContext getResult() { } } - private static class NewRequestHeaderCallback - implements TriConsumer { - @Override - public void accept(RequestContext ctx_, String name, String value) { - AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); - if (ctx == null) { - return; - } - - if (name.equalsIgnoreCase("cookie")) { - Map> cookies = CookieCutter.parseCookieHeader(value); - ctx.addCookies(cookies); - } else { - ctx.addRequestHeader(name, value); - } - } - } - - private class RequestHeadersDoneCallback implements Function> { - public Flow apply(RequestContext ctx_) { - AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); - if (ctx == null || ctx.isReqDataPublished()) { - return NoopFlow.INSTANCE; - } - ctx.finishRequestHeaders(); - return maybePublishRequestData(ctx); - } - } - - private class MethodAndRawURICallback - implements TriFunction> { - @Override - public Flow apply(RequestContext ctx_, String method, URIDataAdapter uri) { - AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); - if (ctx == null) { - return NoopFlow.INSTANCE; - } - - if (ctx.isReqDataPublished()) { - log.debug( - "Request method and URI already published; will ignore new values {}, {}", method, uri); - return NoopFlow.INSTANCE; - } - ctx.setMethod(method); - ctx.setScheme(uri.scheme()); - if (uri.supportsRaw()) { - ctx.setRawURI(uri.raw()); - } else { - try { - URI encodedUri = new URI(null, null, uri.path(), uri.query(), null); - String q = encodedUri.getRawQuery(); - StringBuilder encoded = new StringBuilder(); - encoded.append(encodedUri.getRawPath()); - if (null != q && !q.isEmpty()) { - encoded.append('?').append(q); - } - ctx.setRawURI(encoded.toString()); - } catch (URISyntaxException e) { - log.debug("Failed to encode URI '{}{}'", uri.path(), uri.query()); - } - } - return maybePublishRequestData(ctx); - } - } - private Flow maybePublishRequestData(AppSecRequestContext ctx) { String savedRawURI = ctx.getSavedRawURI();