Skip to content

Commit

Permalink
No blocking in database onConnection flow
Browse files Browse the repository at this point in the history
  • Loading branch information
ValentinZakharov committed Jun 27, 2024
1 parent 7df3aa8 commit 005fb47
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import datadog.trace.bootstrap.instrumentation.api.Tags;
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;

public abstract class DatabaseClientDecorator<CONNECTION> extends ClientDecorator {
Expand Down Expand Up @@ -151,27 +152,14 @@ protected void processDatabaseType(AgentSpan span, String dbType) {
postProcessServiceAndOperationName(span, namingEntry);

if (Config.get().isAppSecRaspEnabled() && dbType != null) {
BiFunction<RequestContext, String, Flow<Void>> connectDbCallback =
BiConsumer<RequestContext, String> connectDbCallback =
AgentTracer.get()
.getCallbackProvider(RequestContextSlot.APPSEC)
.getCallback(EVENTS.databaseConnection());
if (connectDbCallback != null) {
RequestContext ctx = span.getRequestContext();
if (ctx != null) {
Flow<Void> flow = connectDbCallback.apply(ctx, dbType);
Flow.Action action = flow.getAction();
if (action instanceof Flow.Action.RequestBlockingAction) {
BlockResponseFunction brf = ctx.getBlockResponseFunction();
if (brf != null) {
Flow.Action.RequestBlockingAction rba = (Flow.Action.RequestBlockingAction) action;
brf.tryCommitBlockingResponse(
ctx.getTraceSegment(),
rba.getStatusCode(),
rba.getBlockingContentType(),
rba.getExtraHeaders());
}
throw new BlockingException("Blocked request (for DB connection)");
}
connectDbCallback.accept(ctx, dbType);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ public void init() {
(ctx_, dbType) -> {
AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC);
if (ctx == null) {
return NoopFlow.INSTANCE;
return;
}
while (true) {
DataSubscriberInfo subInfo = dbConnectionSubInfo;
Expand All @@ -460,11 +460,11 @@ public void init() {
dbConnectionSubInfo = subInfo;
}
if (subInfo == null || subInfo.isEmpty()) {
return NoopFlow.INSTANCE;
return;
}
DataBundle bundle = new SingletonDataBundle<>(KnownAddresses.DB_TYPE, dbType);
try {
return producerService.publishDataEvent(subInfo, ctx, bundle, false);
producerService.publishDataEvent(subInfo, ctx, bundle, false);
} catch (ExpiredSubscriberInfoException e) {
dbConnectionSubInfo = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import datadog.trace.bootstrap.instrumentation.api.URIDataAdapter
import datadog.trace.bootstrap.instrumentation.api.URIDataAdapterBase
import datadog.trace.test.util.DDSpecification

import java.util.function.BiConsumer
import java.util.function.BiFunction
import java.util.function.Function
import java.util.function.Supplier
Expand Down Expand Up @@ -77,7 +78,7 @@ class GatewayBridgeSpecification extends DDSpecification {
BiFunction<RequestContext, String, Flow<Void>> grpcServerMethodCB
BiFunction<RequestContext, Object, Flow<Void>> grpcServerRequestMessageCB
BiFunction<RequestContext, Map<String, Object>, Flow<Void>> graphqlServerRequestMessageCB
BiFunction<RequestContext, String, Flow<Void>> databaseConnectionCB
BiConsumer<RequestContext, String> databaseConnectionCB
BiFunction<RequestContext, String, Flow<Void>> databaseSqlQueryCB

void setup() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import datadog.trace.bootstrap.instrumentation.api.URIDataAdapter;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
Expand Down Expand Up @@ -209,8 +210,8 @@ public EventType<BiFunction<RequestContext, Object, Flow<Void>>> grpcServerReque
new ET<>("database.connection", DATABASE_CONNECTION_ID);
/** A database connection */
@SuppressWarnings("unchecked")
public EventType<BiFunction<RequestContext, String, Flow<Void>>> databaseConnection() {
return (EventType<BiFunction<RequestContext, String, Flow<Void>>>) DATABASE_CONNECTION;
public EventType<BiConsumer<RequestContext, String>> databaseConnection() {
return (EventType<BiConsumer<RequestContext, String>>) DATABASE_CONNECTION;
}

static final int DATABASE_SQL_QUERY_ID = 17;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import datadog.trace.bootstrap.instrumentation.api.URIDataAdapter;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
Expand Down Expand Up @@ -365,6 +366,17 @@ public Flow<Void> apply(RequestContext ctx, Integer status) {
}
};
case DATABASE_CONNECTION_ID:
return (C)
new BiConsumer<RequestContext, String>() {
@Override
public void accept(RequestContext ctx, String arg) {
try {
((BiConsumer<RequestContext, String>) callback).accept(ctx, arg);
} catch (Throwable t) {
log.warn("Callback for {} threw.", eventType, t);
}
}
};
case DATABASE_SQL_QUERY_ID:
return (C)
new BiFunction<RequestContext, String, Flow<Void>>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ public void testNormalCalls() {
assertThat(cbp.getCallback(events.graphqlServerRequestMessage()).apply(null, null).getAction())
.isEqualTo(Flow.Action.Noop.INSTANCE);
ss.registerCallback(events.databaseConnection(), callback);
cbp.getCallback(events.databaseConnection()).apply(null, null);
cbp.getCallback(events.databaseConnection()).accept(null, null);
ss.registerCallback(events.databaseSqlQuery(), callback);
cbp.getCallback(events.databaseSqlQuery()).apply(null, null);
assertThat(callback.count).isEqualTo(Events.MAX_EVENTS);
Expand Down Expand Up @@ -257,7 +257,7 @@ public void testThrowableBlocking() {
assertThat(cbp.getCallback(events.graphqlServerRequestMessage()).apply(null, null).getAction())
.isEqualTo(Flow.Action.Noop.INSTANCE);
ss.registerCallback(events.databaseConnection(), throwback);
cbp.getCallback(events.databaseConnection()).apply(null, null);
cbp.getCallback(events.databaseConnection()).accept(null, null);
ss.registerCallback(events.databaseSqlQuery(), throwback);
cbp.getCallback(events.databaseSqlQuery()).apply(null, null);
assertThat(throwback.count).isEqualTo(Events.MAX_EVENTS);
Expand Down

0 comments on commit 005fb47

Please sign in to comment.