Skip to content

Commit

Permalink
No blocking in database onConnection flow
Browse files Browse the repository at this point in the history
  • Loading branch information
ValentinZakharov committed Jun 27, 2024
1 parent 7df3aa8 commit 65247ca
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import datadog.trace.bootstrap.instrumentation.api.Tags;
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;

public abstract class DatabaseClientDecorator<CONNECTION> extends ClientDecorator {
Expand Down Expand Up @@ -151,27 +152,14 @@ protected void processDatabaseType(AgentSpan span, String dbType) {
postProcessServiceAndOperationName(span, namingEntry);

if (Config.get().isAppSecRaspEnabled() && dbType != null) {
BiFunction<RequestContext, String, Flow<Void>> connectDbCallback =
BiConsumer<RequestContext, String> connectDbCallback =
AgentTracer.get()
.getCallbackProvider(RequestContextSlot.APPSEC)
.getCallback(EVENTS.databaseConnection());
if (connectDbCallback != null) {
RequestContext ctx = span.getRequestContext();
if (ctx != null) {
Flow<Void> flow = connectDbCallback.apply(ctx, dbType);
Flow.Action action = flow.getAction();
if (action instanceof Flow.Action.RequestBlockingAction) {
BlockResponseFunction brf = ctx.getBlockResponseFunction();
if (brf != null) {
Flow.Action.RequestBlockingAction rba = (Flow.Action.RequestBlockingAction) action;
brf.tryCommitBlockingResponse(
ctx.getTraceSegment(),
rba.getStatusCode(),
rba.getBlockingContentType(),
rba.getExtraHeaders());
}
throw new BlockingException("Blocked request (for DB connection)");
}
connectDbCallback.accept(ctx, dbType);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ public void init() {
(ctx_, dbType) -> {
AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC);
if (ctx == null) {
return NoopFlow.INSTANCE;
return;
}
while (true) {
DataSubscriberInfo subInfo = dbConnectionSubInfo;
Expand All @@ -460,11 +460,11 @@ public void init() {
dbConnectionSubInfo = subInfo;
}
if (subInfo == null || subInfo.isEmpty()) {
return NoopFlow.INSTANCE;
return;
}
DataBundle bundle = new SingletonDataBundle<>(KnownAddresses.DB_TYPE, dbType);
try {
return producerService.publishDataEvent(subInfo, ctx, bundle, false);
producerService.publishDataEvent(subInfo, ctx, bundle, false);
} catch (ExpiredSubscriberInfoException e) {
dbConnectionSubInfo = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import datadog.trace.bootstrap.instrumentation.api.URIDataAdapter
import datadog.trace.bootstrap.instrumentation.api.URIDataAdapterBase
import datadog.trace.test.util.DDSpecification

import java.util.function.BiConsumer
import java.util.function.BiFunction
import java.util.function.Function
import java.util.function.Supplier
Expand Down Expand Up @@ -77,7 +78,7 @@ class GatewayBridgeSpecification extends DDSpecification {
BiFunction<RequestContext, String, Flow<Void>> grpcServerMethodCB
BiFunction<RequestContext, Object, Flow<Void>> grpcServerRequestMessageCB
BiFunction<RequestContext, Map<String, Object>, Flow<Void>> graphqlServerRequestMessageCB
BiFunction<RequestContext, String, Flow<Void>> databaseConnectionCB
BiConsumer<RequestContext, String> databaseConnectionCB
BiFunction<RequestContext, String, Flow<Void>> databaseSqlQueryCB

void setup() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import datadog.trace.bootstrap.instrumentation.api.URIDataAdapter;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
Expand Down Expand Up @@ -209,8 +210,8 @@ public EventType<BiFunction<RequestContext, Object, Flow<Void>>> grpcServerReque
new ET<>("database.connection", DATABASE_CONNECTION_ID);
/** A database connection */
@SuppressWarnings("unchecked")
public EventType<BiFunction<RequestContext, String, Flow<Void>>> databaseConnection() {
return (EventType<BiFunction<RequestContext, String, Flow<Void>>>) DATABASE_CONNECTION;
public EventType<BiConsumer<RequestContext, String>> databaseConnection() {
return (EventType<BiConsumer<RequestContext, String>>) DATABASE_CONNECTION;
}

static final int DATABASE_SQL_QUERY_ID = 17;
Expand Down

0 comments on commit 65247ca

Please sign in to comment.