From fa01bc051dfeb9a829af905fa56f398c5638de6b Mon Sep 17 00:00:00 2001 From: Julio Guerra Date: Mon, 3 Jun 2024 09:45:05 +0200 Subject: [PATCH 1/5] grpc/appsec: fix rpc message blocking --- contrib/google.golang.org/grpc/appsec.go | 66 +++-- contrib/google.golang.org/grpc/appsec_test.go | 267 ++++++------------ .../appsec/emitter/grpcsec/types/types.go | 9 +- internal/appsec/emitter/sharedsec/actions.go | 6 +- internal/appsec/listener/grpcsec/grpc.go | 56 ++-- 5 files changed, 165 insertions(+), 239 deletions(-) diff --git a/contrib/google.golang.org/grpc/appsec.go b/contrib/google.golang.org/grpc/appsec.go index 227beb136d..ffea569bad 100644 --- a/contrib/google.golang.org/grpc/appsec.go +++ b/contrib/google.golang.org/grpc/appsec.go @@ -41,7 +41,7 @@ func appsecUnaryHandlerMiddleware(method string, span ddtrace.Span, handler grpc } ctx, op := grpcsec.StartHandlerOperation(ctx, args, nil, func(op *types.HandlerOperation) { dyngo.OnData(op, func(a *sharedsec.GRPCAction) { - code, e := a.GRPCWrapper(md) + code, e := a.GRPCWrapper() blocked = a.Blocking() err = status.Error(codes.Code(code), e.Error()) }) @@ -61,7 +61,12 @@ func appsecUnaryHandlerMiddleware(method string, span ddtrace.Span, handler grpc if err != nil { return nil, err } - defer grpcsec.StartReceiveOperation(types.ReceiveOperationArgs{}, op).Finish(types.ReceiveOperationRes{Message: req}) + + defer grpcsec.StartReceiveOperation(types.ReceiveOperationArgs{Message: req}, op).Finish(types.ReceiveOperationRes{}) + if err != nil { + return nil, err + } + rv, err := handler(ctx, req) if e, ok := err.(*types.MonitoringError); ok { err = status.Error(codes.Code(e.GRPCStatus()), e.Error()) @@ -74,13 +79,16 @@ func appsecUnaryHandlerMiddleware(method string, span ddtrace.Span, handler grpc func appsecStreamHandlerMiddleware(method string, span ddtrace.Span, handler grpc.StreamHandler) grpc.StreamHandler { trace.SetAppSecEnabledTags(span) return func(srv interface{}, stream grpc.ServerStream) error { - var err error - var blocked bool + appsecStream := &appsecServerStream{ + ServerStream: stream, + } + ctx := stream.Context() md, _ := metadata.FromIncomingContext(ctx) clientIP := setClientIP(ctx, span, md) grpctrace.SetRequestMetadataTags(span, md) + // Create the handler operation and listen to blocking gRPC actions to detect a blocking condition args := types.HandlerOperationArgs{ Method: method, Metadata: md, @@ -88,19 +96,21 @@ func appsecStreamHandlerMiddleware(method string, span ddtrace.Span, handler grp } ctx, op := grpcsec.StartHandlerOperation(ctx, args, nil, func(op *types.HandlerOperation) { dyngo.OnData(op, func(a *sharedsec.GRPCAction) { - code, e := a.GRPCWrapper(md) - blocked = a.Blocking() - err = status.Error(codes.Code(code), e.Error()) + if a.Blocking() { + code, e := a.GRPCWrapper() + appsecStream.blockedErr = status.Error(codes.Code(code), e.Error()) + + } }) }) - stream = appsecServerStream{ - ServerStream: stream, - handlerOperation: op, - ctx: ctx, - } + + appsecStream.handlerOperation = op + appsecStream.ctx = ctx + stream = appsecStream + defer func() { events := op.Finish(types.HandlerOperationRes{}) - if blocked { + if appsecStream.blockedErr != nil { op.SetTag(trace.BlockedRequestTag, true) } trace.SetTags(span, op.Tags()) @@ -109,14 +119,16 @@ func appsecStreamHandlerMiddleware(method string, span ddtrace.Span, handler grp } }() - if err != nil { - return err + // Check if a blocking condition was detected so far with the start operation event (ip blocking, metadata blocking, etc.) + if appsecStream.blockedErr != nil { + return appsecStream.blockedErr } - err = handler(srv, stream) - if e, ok := err.(*types.MonitoringError); ok { - err = status.Error(codes.Code(e.GRPCStatus()), e.Error()) - } + // Call the original handler + err := handler(srv, stream) + //if e, ok := err.(*types.MonitoringError); ok { + // err = status.Error(codes.Code(e.GRPCStatus()), e.Error()) + //} return err } } @@ -125,15 +137,21 @@ type appsecServerStream struct { grpc.ServerStream handlerOperation *types.HandlerOperation ctx context.Context + + // blockedErr is used to store the error to return when a blocking sec event is detected. + blockedErr error } // RecvMsg implements grpc.ServerStream interface method to monitor its // execution with AppSec. -func (ss appsecServerStream) RecvMsg(m interface{}) error { - op := grpcsec.StartReceiveOperation(types.ReceiveOperationArgs{}, ss.handlerOperation) - defer func() { - op.Finish(types.ReceiveOperationRes{Message: m}) - }() +func (ss appsecServerStream) RecvMsg(m interface{}) (err error) { + op := grpcsec.StartReceiveOperation(types.ReceiveOperationArgs{Message: m}, ss.handlerOperation) + defer op.Finish(types.ReceiveOperationRes{}) + + if ss.blockedErr != nil { + return ss.blockedErr + } + return ss.ServerStream.RecvMsg(m) } diff --git a/contrib/google.golang.org/grpc/appsec_test.go b/contrib/google.golang.org/grpc/appsec_test.go index a0de9b998e..d6b8280719 100644 --- a/contrib/google.golang.org/grpc/appsec_test.go +++ b/contrib/google.golang.org/grpc/appsec_test.go @@ -32,7 +32,7 @@ func TestAppSec(t *testing.T) { } setup := func() (FixtureClient, mocktracer.Tracer, func()) { - rig, err := newRig(false) + rig, err := newAppsecRig(false) require.NoError(t, err) mt := mocktracer.Start() @@ -140,7 +140,7 @@ func TestBlocking(t *testing.T) { } setup := func() (FixtureClient, mocktracer.Tracer, func()) { - rig, err := newRig(false) + rig, err := newAppsecRig(false) require.NoError(t, err) mt := mocktracer.Start() @@ -152,34 +152,89 @@ func TestBlocking(t *testing.T) { } t.Run("unary-block", func(t *testing.T) { - client, mt, cleanup := setup() - defer cleanup() - - // Send a XSS attack in the payload along with the canary value in the RPC metadata - ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs("dd-canary", "dd-test-scanner-log", "x-client-ip", "1.2.3.4")) - reply, err := client.Ping(ctx, &FixtureRequest{Name: ""}) - - require.Nil(t, reply) - require.Equal(t, codes.Aborted, status.Code(err)) - - finished := mt.FinishedSpans() - require.Len(t, finished, 1) - // The request should have the attack attempts - event, _ := finished[0].Tag("_dd.appsec.json").(string) - require.NotNil(t, event) - require.True(t, strings.Contains(event, "blk-001-001")) - }) - - t.Run("unary-no-block", func(t *testing.T) { - client, _, cleanup := setup() - defer cleanup() - - // Send a XSS attack in the payload along with the canary value in the RPC metadata - ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs("dd-canary", "dd-test-scanner-log", "x-client-ip", "1.2.3.5")) - reply, err := client.Ping(ctx, &FixtureRequest{Name: ""}) - - require.Equal(t, "passed", reply.Message) - require.Equal(t, codes.OK, status.Code(err)) + for _, tc := range []struct { + name string + md metadata.MD + message string + expectedBlocked bool + expectedMatchedRules []string + expectedNotMatchedRules []string + }{ + { + name: "ip blocking", + md: metadata.Pairs("m1", "v1", "x-client-ip", "1.2.3.4", "user-id", "blocked-user-1"), + message: "$globals", + expectedMatchedRules: []string{"blk-001-001"}, // ip blocking alone as it comes first + expectedNotMatchedRules: []string{"crs-933-130-block", "blk-001-002"}, // no user blocking or message blocking + }, + { + name: "message blocking", + md: metadata.Pairs("m1", "v1", "x-client-ip", "1.2.3.5", "user-id", "blocked-user-1"), + message: "$globals", + expectedMatchedRules: []string{"crs-933-130-block"}, // message blocking alone as it comes before user blocking + expectedNotMatchedRules: []string{"blk-001-002"}, // no user blocking + }, + { + name: "user blocking", + md: metadata.Pairs("m1", "v1", "x-client-ip", "1.2.3.5", "user-id", "blocked-user-1"), + message: "", + expectedMatchedRules: []string{"crs-941-110", "blk-001-002"}, // monitoring event + user blocking + }, + } { + t.Run(tc.name, func(t *testing.T) { + // Helper assertion function to run for the unary and stream tests + assert := func(t *testing.T, do func(client FixtureClient)) { + client, mt, cleanup := setup() + defer cleanup() + + do(client) + + finished := mt.FinishedSpans() + require.Len(t, finished, 1) + + // The request should have the security events + events, _ := finished[0].Tag("_dd.appsec.json").(string) + require.NotEmpty(t, events) + for _, rule := range tc.expectedMatchedRules { + require.Contains(t, events, rule) + } + for _, rule := range tc.expectedNotMatchedRules { + require.NotContains(t, events, rule) + } + } + + t.Run("unary", func(t *testing.T) { + assert(t, func(client FixtureClient) { + ctx := metadata.NewOutgoingContext(context.Background(), tc.md) + reply, err := client.Ping(ctx, &FixtureRequest{Name: tc.message}) + require.Nil(t, reply) + require.Equal(t, codes.Aborted, status.Code(err)) + }) + }) + + t.Run("stream", func(t *testing.T) { + assert(t, func(client FixtureClient) { + ctx := metadata.NewOutgoingContext(context.Background(), tc.md) + + // Open the stream + stream, err := client.StreamPing(ctx) + require.NoError(t, err) + defer func() { + require.NoError(t, stream.CloseSend()) + }() + + // Send a message + err = stream.Send(&FixtureRequest{Name: tc.message}) + require.NoError(t, err) + + // Receive a message + reply, err := stream.Recv() + require.Equal(t, codes.Aborted, status.Code(err)) + require.Nil(t, reply) + }) + }) + }) + } }) t.Run("stream-block", func(t *testing.T) { @@ -190,118 +245,8 @@ func TestBlocking(t *testing.T) { stream, err := client.StreamPing(ctx) require.NoError(t, err) reply, err := stream.Recv() - - require.Equal(t, codes.Aborted, status.Code(err)) - require.Nil(t, reply) - - finished := mt.FinishedSpans() - require.Len(t, finished, 1) - // The request should have the attack attempts - event, _ := finished[0].Tag("_dd.appsec.json").(string) - require.NotNil(t, event) - require.True(t, strings.Contains(event, "blk-001-001")) - }) - - t.Run("stream-no-block", func(t *testing.T) { - client, _, cleanup := setup() - defer cleanup() - - ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs("dd-canary", "dd-test-scanner-log", "x-client-ip", "1.2.3.5")) - stream, err := client.StreamPing(ctx) - require.NoError(t, err) - - // Send a XSS attack - err = stream.Send(&FixtureRequest{Name: ""}) - require.NoError(t, err) - reply, err := stream.Recv() - require.Equal(t, codes.OK, status.Code(err)) - require.Equal(t, "passed", reply.Message) - err = stream.CloseSend() require.NoError(t, err) - }) - -} - -// Test that user blocking works by using custom rules/rules data -func TestUserBlocking(t *testing.T) { - t.Setenv("DD_APPSEC_RULES", "../../../internal/appsec/testdata/blocking.json") - appsec.Start() - defer appsec.Stop() - if !appsec.Enabled() { - t.Skip("appsec disabled") - } - - setup := func() (FixtureClient, mocktracer.Tracer, func()) { - rig, err := newAppsecRig(false) - require.NoError(t, err) - - mt := mocktracer.Start() - - return rig.client, mt, func() { - rig.Close() - mt.Stop() - } - } - - t.Run("unary-block", func(t *testing.T) { - client, mt, cleanup := setup() - defer cleanup() - - // Send a XSS attack in the payload along with the canary value in the RPC metadata - ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs("user-id", "blocked-user-1")) - reply, err := client.Ping(ctx, &FixtureRequest{Name: ""}) - - require.Nil(t, reply) - require.Equal(t, codes.Aborted, status.Code(err)) - - finished := mt.FinishedSpans() - require.Len(t, finished, 1) - // The request should have the XSS and user ID attack attempts - event, _ := finished[0].Tag("_dd.appsec.json").(string) - require.NotNil(t, event) - require.True(t, strings.Contains(event, "blk-001-002")) - require.True(t, strings.Contains(event, "crs-941-110")) - }) - - t.Run("unary-no-block", func(t *testing.T) { - client, _, cleanup := setup() - defer cleanup() - - ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs("user-id", "legit user")) - reply, err := client.Ping(ctx, &FixtureRequest{Name: ""}) - - require.Equal(t, "passed", reply.Message) - require.Equal(t, codes.OK, status.Code(err)) - }) - - // This test checks that IP blocking happens BEFORE user blocking, since user blocking needs the request handler - // to be invoked while IP blocking doesn't - t.Run("unary-mixed-block", func(t *testing.T) { - client, mt, cleanup := setup() - defer cleanup() - - ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs("user-id", "blocked-user-1", "x-forwarded-for", "1.2.3.4")) - reply, err := client.Ping(ctx, &FixtureRequest{}) - - require.Nil(t, reply) - require.Equal(t, codes.Aborted, status.Code(err)) - - finished := mt.FinishedSpans() - require.Len(t, finished, 1) - event, _ := finished[0].Tag("_dd.appsec.json").(string) - require.NotNil(t, event) - require.True(t, strings.Contains(event, "blk-001-001")) - }) - - t.Run("stream-block", func(t *testing.T) { - client, mt, cleanup := setup() - defer cleanup() - - ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs("user-id", "blocked-user-1")) - stream, err := client.StreamPing(ctx) - require.NoError(t, err) - reply, err := stream.Recv() require.Equal(t, codes.Aborted, status.Code(err)) require.Nil(t, reply) @@ -311,46 +256,6 @@ func TestUserBlocking(t *testing.T) { // The request should have the attack attempts event, _ := finished[0].Tag("_dd.appsec.json").(string) require.NotNil(t, event) - require.True(t, strings.Contains(event, "blk-001-002")) - }) - - t.Run("stream-no-block", func(t *testing.T) { - client, _, cleanup := setup() - defer cleanup() - - ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs("user-id", "legit user")) - stream, err := client.StreamPing(ctx) - require.NoError(t, err) - - // Send a XSS attack - err = stream.Send(&FixtureRequest{Name: ""}) - require.NoError(t, err) - reply, err := stream.Recv() - require.Equal(t, codes.OK, status.Code(err)) - require.Equal(t, "passed", reply.Message) - - err = stream.CloseSend() - require.NoError(t, err) - }) - // This test checks that IP blocking happens BEFORE user blocking, since user blocking needs the request handler - // to be invoked while IP blocking doesn't - t.Run("stream-mixed-block", func(t *testing.T) { - client, mt, cleanup := setup() - defer cleanup() - - ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs("user-id", "blocked-user-1", "x-forwarded-for", "1.2.3.4")) - stream, err := client.StreamPing(ctx) - require.NoError(t, err) - reply, err := stream.Recv() - - require.Equal(t, codes.Aborted, status.Code(err)) - require.Nil(t, reply) - - finished := mt.FinishedSpans() - require.Len(t, finished, 1) - // The request should have IP related the attack attempts - event, _ := finished[0].Tag("_dd.appsec.json").(string) - require.NotNil(t, event) require.True(t, strings.Contains(event, "blk-001-001")) }) } @@ -367,7 +272,7 @@ func TestPasslist(t *testing.T) { } setup := func() (FixtureClient, mocktracer.Tracer, func()) { - rig, err := newRig(false) + rig, err := newAppsecRig(false) require.NoError(t, err) mt := mocktracer.Start() diff --git a/internal/appsec/emitter/grpcsec/types/types.go b/internal/appsec/emitter/grpcsec/types/types.go index 6e94ef9495..f41a832b3a 100644 --- a/internal/appsec/emitter/grpcsec/types/types.go +++ b/internal/appsec/emitter/grpcsec/types/types.go @@ -63,14 +63,15 @@ type ( // ReceiveOperationArgs is the gRPC handler receive operation arguments // Empty as of today. - ReceiveOperationArgs struct{} + ReceiveOperationArgs struct{ + // Message received by the gRPC handler. + // Corresponds to the address `grpc.server.request.message`. + Message interface{} + } // ReceiveOperationRes is the gRPC handler receive operation results which // contains the message the gRPC handler received. ReceiveOperationRes struct { - // Message received by the gRPC handler. - // Corresponds to the address `grpc.server.request.message`. - Message interface{} } // MonitoringError is used to vehicle a gRPC error that also embeds a request status code diff --git a/internal/appsec/emitter/sharedsec/actions.go b/internal/appsec/emitter/sharedsec/actions.go index abbab10a0f..002a3a4049 100644 --- a/internal/appsec/emitter/sharedsec/actions.go +++ b/internal/appsec/emitter/sharedsec/actions.go @@ -68,12 +68,12 @@ type ( } // GRPCWrapper is an opaque prototype abstraction for a gRPC handler (to avoid importing grpc) - // that takes metadata as input and returns a status code and an error + // that returns a status code and an error // TODO: rely on strongly typed actions (with the actual grpc types) by introducing WAF constructors // living in the contrib packages, along with their dependencies - something like `appsec.RegisterWAFConstructor(newGRPCWAF)` // Such constructors would receive the full appsec config and rules, so that they would be able to build // specific blocking actions. - GRPCWrapper func(map[string][]string) (uint32, error) + GRPCWrapper func() (uint32, error) // blockActionParams are the dynamic parameters to be provided to a "block_request" // action type upon invocation @@ -196,7 +196,7 @@ func newBlockRequestHandler(status int, ct string, payload []byte) http.Handler } func newGRPCBlockHandler(status int) GRPCWrapper { - return func(_ map[string][]string) (uint32, error) { + return func() (uint32, error) { return uint32(status), errors.New("Request blocked") } } diff --git a/internal/appsec/listener/grpcsec/grpc.go b/internal/appsec/listener/grpcsec/grpc.go index a258410766..0429b9adb4 100644 --- a/internal/appsec/listener/grpcsec/grpc.go +++ b/internal/appsec/listener/grpcsec/grpc.go @@ -86,14 +86,21 @@ func newWafEventListener(wafHandle *waf.Handle, cfg *config.Config, limiter limi func (l *wafEventListener) onEvent(op *types.HandlerOperation, handlerArgs types.HandlerOperationArgs) { // Limit the maximum number of security events, as a streaming RPC could // receive unlimited number of messages where we could find security events - const maxWAFEventsPerRequest = 10 var ( nbEvents atomic.Uint32 logOnce sync.Once // per request - - events []any - mu sync.Mutex // events mutex ) + addEvents := func(events []any) { + const maxWAFEventsPerRequest = 10 + if nbEvents.Load() >= maxWAFEventsPerRequest { + logOnce.Do(func() { + log.Debug("appsec: ignoring the rpc message due to the maximum number of security events per grpc call reached") + }) + return + } + nbEvents.Add(uint32(len(events))) + shared.AddSecurityEvents(op, l.limiter, events) + } wafCtx, err := l.wafHandle.NewContextWithBudget(l.config.WAFTimeout) if err != nil { @@ -111,21 +118,23 @@ func (l *wafEventListener) onEvent(op *types.HandlerOperation, handlerArgs types // UserIDOperation happens when appsec.SetUser() is called. We run the WAF and apply actions to // see if the associated user should be blocked. Since we don't control the execution flow in this case // (SetUser is SDK), we delegate the responsibility of interrupting the handler to the user. - dyngo.On(op, func(userIDOp *sharedsec.UserIDOperation, args sharedsec.UserIDOperationArgs) { + dyngo.On(op, func(op *sharedsec.UserIDOperation, args sharedsec.UserIDOperationArgs) { values := map[string]any{ UserIDAddr: args.UserID, } wafResult := shared.RunWAF(wafCtx, waf.RunAddressData{Persistent: values}) - if wafResult.HasActions() || wafResult.HasEvents() { + if wafResult.HasEvents() { + addEvents(wafResult.Events) + } + if wafResult.HasActions() { for aType, params := range wafResult.Actions { for _, action := range shared.ActionsFromEntry(aType, params) { if grpcAction, ok := action.(*sharedsec.GRPCAction); ok { - code, err := grpcAction.GRPCWrapper(map[string][]string{}) - dyngo.EmitData(userIDOp, types.NewMonitoringError(err.Error(), code)) + code, err := grpcAction.GRPCWrapper() + dyngo.EmitData(op, types.NewMonitoringError(err.Error(), code)) } } } - shared.AddSecurityEvents(op, l.limiter, wafResult.Events) log.Debug("appsec: WAF detected an authenticated user attack: %s", args.UserID) } }) @@ -141,9 +150,11 @@ func (l *wafEventListener) onEvent(op *types.HandlerOperation, handlerArgs types } wafResult := shared.RunWAF(wafCtx, waf.RunAddressData{Persistent: values}) - if wafResult.HasActions() || wafResult.HasEvents() { + if wafResult.HasEvents() { + addEvents(wafResult.Events) + } + if wafResult.HasActions() { interrupt := shared.ProcessActions(op, wafResult.Actions, nil) - shared.AddSecurityEvents(op, l.limiter, wafResult.Events) log.Debug("appsec: WAF detected an attack before executing the request") if interrupt { wafCtx.Close() @@ -152,14 +163,7 @@ func (l *wafEventListener) onEvent(op *types.HandlerOperation, handlerArgs types } // When the gRPC handler receives a message - dyngo.OnFinish(op, func(_ types.ReceiveOperation, res types.ReceiveOperationRes) { - if nbEvents.Load() == maxWAFEventsPerRequest { - logOnce.Do(func() { - log.Debug("appsec: ignoring the rpc message due to the maximum number of security events per grpc call reached") - }) - return - } - + dyngo.On(op, func(_ types.ReceiveOperation, res types.ReceiveOperationArgs) { // Run the WAF on the rule addresses available and listened to by the sec rules var values waf.RunAddressData // Add the gRPC message to the values if the WAF rules are using it. @@ -178,28 +182,26 @@ func (l *wafEventListener) onEvent(op *types.HandlerOperation, handlerArgs types // Run the WAF, ignoring the returned actions - if any - since blocking after the request handler's // response is not supported at the moment. wafResult := shared.RunWAF(wafCtx, values) - if wafResult.HasEvents() { log.Debug("appsec: attack detected by the grpc waf") - nbEvents.Inc() - mu.Lock() - defer mu.Unlock() - events = append(events, wafResult.Events...) + + addEvents(wafResult.Events) + } + if wafResult.HasActions() { + shared.ProcessActions(op, wafResult.Actions, nil) } }) // When the gRPC handler finishes dyngo.OnFinish(op, func(op *types.HandlerOperation, _ types.HandlerOperationRes) { defer wafCtx.Close() - shared.AddWAFMonitoringTags(op, l.wafDiags.Version, wafCtx.Stats().Metrics()) + shared.AddWAFMonitoringTags(op, l.wafDiags.Version, wafCtx.Stats().Metrics()) // Log the following metrics once per instantiation of a WAF handle l.once.Do(func() { shared.AddRulesMonitoringTags(op, &l.wafDiags) op.SetTag(ext.ManualKeep, samplernames.AppSec) }) - - shared.AddSecurityEvents(op, l.limiter, events) }) } From 89ecea147fb917f5c80f805cf3bffc2a5581c6e6 Mon Sep 17 00:00:00 2001 From: Julio Guerra Date: Tue, 4 Jun 2024 00:05:51 +0200 Subject: [PATCH 2/5] grpc/appsec: fix rpc message blocking --- contrib/google.golang.org/grpc/appsec.go | 90 ++++++++++--------- contrib/google.golang.org/grpc/appsec_test.go | 51 ++++------- .../appsec/emitter/grpcsec/types/types.go | 9 +- internal/appsec/listener/grpcsec/grpc.go | 12 +-- 4 files changed, 69 insertions(+), 93 deletions(-) diff --git a/contrib/google.golang.org/grpc/appsec.go b/contrib/google.golang.org/grpc/appsec.go index ffea569bad..0b03cdc6fe 100644 --- a/contrib/google.golang.org/grpc/appsec.go +++ b/contrib/google.golang.org/grpc/appsec.go @@ -29,9 +29,8 @@ import ( // UnaryHandler wrapper to use when AppSec is enabled to monitor its execution. func appsecUnaryHandlerMiddleware(method string, span ddtrace.Span, handler grpc.UnaryHandler) grpc.UnaryHandler { trace.SetAppSecEnabledTags(span) - return func(ctx context.Context, req interface{}) (interface{}, error) { - var err error - var blocked bool + return func(ctx context.Context, req interface{}) (res interface{}, rpcErr error) { + var blockedErr error md, _ := metadata.FromIncomingContext(ctx) clientIP := setClientIP(ctx, span, md) args := types.HandlerOperationArgs{ @@ -41,46 +40,50 @@ func appsecUnaryHandlerMiddleware(method string, span ddtrace.Span, handler grpc } ctx, op := grpcsec.StartHandlerOperation(ctx, args, nil, func(op *types.HandlerOperation) { dyngo.OnData(op, func(a *sharedsec.GRPCAction) { - code, e := a.GRPCWrapper() - blocked = a.Blocking() - err = status.Error(codes.Code(code), e.Error()) + if a.Blocking() { + code, err := a.GRPCWrapper() + blockedErr = status.Error(codes.Code(code), err.Error()) + } }) }) defer func() { events := op.Finish(types.HandlerOperationRes{}) - if blocked { + if len(events) > 0 { + grpctrace.SetSecurityEventsTags(span, events) + } + if blockedErr != nil { op.SetTag(trace.BlockedRequestTag, true) + rpcErr = blockedErr } grpctrace.SetRequestMetadataTags(span, md) trace.SetTags(span, op.Tags()) - if len(events) > 0 { - grpctrace.SetSecurityEventsTags(span, events) - } }() - if err != nil { - return nil, err + // Check if a blocking condition was detected so far with the start operation event (ip blocking, metadata blocking, etc.) + if blockedErr != nil { + return nil, blockedErr } - defer grpcsec.StartReceiveOperation(types.ReceiveOperationArgs{Message: req}, op).Finish(types.ReceiveOperationRes{}) - if err != nil { - return nil, err + // As of our gRPC abstract operation definition, we must fake a receive operation for unary RPCs (the same model fits both unary and streaming RPCs) + grpcsec.StartReceiveOperation(types.ReceiveOperationArgs{}, op).Finish(types.ReceiveOperationRes{Message: req}) + // Check if a blocking condition was detected so far with the receive operation events + if blockedErr != nil { + return nil, blockedErr } - rv, err := handler(ctx, req) - if e, ok := err.(*types.MonitoringError); ok { - err = status.Error(codes.Code(e.GRPCStatus()), e.Error()) - } - return rv, err + // Call the original handler - let the deferred function above handle the blocking condition and return error + return handler(ctx, req) } } // StreamHandler wrapper to use when AppSec is enabled to monitor its execution. func appsecStreamHandlerMiddleware(method string, span ddtrace.Span, handler grpc.StreamHandler) grpc.StreamHandler { trace.SetAppSecEnabledTags(span) - return func(srv interface{}, stream grpc.ServerStream) error { - appsecStream := &appsecServerStream{ - ServerStream: stream, + return func(srv interface{}, stream grpc.ServerStream) (rpcErr error) { + // Create a ServerStream wrapper with appsec RPC handler operation and the Go context (to implement the ServerStream interface) + appsecStream := &appsecServerStream{ + ServerStream: stream, + // note: the blockedErr field is captured by the RPC handler's OnData closure below } ctx := stream.Context() @@ -99,24 +102,28 @@ func appsecStreamHandlerMiddleware(method string, span ddtrace.Span, handler grp if a.Blocking() { code, e := a.GRPCWrapper() appsecStream.blockedErr = status.Error(codes.Code(code), e.Error()) - } }) }) + // Finish constructing the appsec stream wrapper and replace the original one appsecStream.handlerOperation = op appsecStream.ctx = ctx - stream = appsecStream defer func() { events := op.Finish(types.HandlerOperationRes{}) + + if len(events) > 0 { + grpctrace.SetSecurityEventsTags(span, events) + } + if appsecStream.blockedErr != nil { op.SetTag(trace.BlockedRequestTag, true) + // Change the RPC return error with appsec's + rpcErr = appsecStream.blockedErr } + trace.SetTags(span, op.Tags()) - if len(events) > 0 { - grpctrace.SetSecurityEventsTags(span, events) - } }() // Check if a blocking condition was detected so far with the start operation event (ip blocking, metadata blocking, etc.) @@ -124,12 +131,8 @@ func appsecStreamHandlerMiddleware(method string, span ddtrace.Span, handler grp return appsecStream.blockedErr } - // Call the original handler - err := handler(srv, stream) - //if e, ok := err.(*types.MonitoringError); ok { - // err = status.Error(codes.Code(e.GRPCStatus()), e.Error()) - //} - return err + // Call the original handler - let the deferred function above handle the blocking condition and return error + return handler(srv, appsecStream) } } @@ -144,18 +147,19 @@ type appsecServerStream struct { // RecvMsg implements grpc.ServerStream interface method to monitor its // execution with AppSec. -func (ss appsecServerStream) RecvMsg(m interface{}) (err error) { - op := grpcsec.StartReceiveOperation(types.ReceiveOperationArgs{Message: m}, ss.handlerOperation) - defer op.Finish(types.ReceiveOperationRes{}) - - if ss.blockedErr != nil { - return ss.blockedErr - } - +func (ss *appsecServerStream) RecvMsg(m interface{}) (err error) { + op := grpcsec.StartReceiveOperation(types.ReceiveOperationArgs{}, ss.handlerOperation) + defer func() { + op.Finish(types.ReceiveOperationRes{Message: m}) + if ss.blockedErr != nil { + // Change the function call return error with appsec's + err = ss.blockedErr + } + }() return ss.ServerStream.RecvMsg(m) } -func (ss appsecServerStream) Context() context.Context { +func (ss *appsecServerStream) Context() context.Context { return ss.ctx } diff --git a/contrib/google.golang.org/grpc/appsec_test.go b/contrib/google.golang.org/grpc/appsec_test.go index d6b8280719..911efa7c5d 100644 --- a/contrib/google.golang.org/grpc/appsec_test.go +++ b/contrib/google.golang.org/grpc/appsec_test.go @@ -10,7 +10,6 @@ import ( "encoding/json" "fmt" "net" - "strings" "testing" pappsec "gopkg.in/DataDog/dd-trace-go.v1/appsec" @@ -169,16 +168,17 @@ func TestBlocking(t *testing.T) { }, { name: "message blocking", - md: metadata.Pairs("m1", "v1", "x-client-ip", "1.2.3.5", "user-id", "blocked-user-1"), + md: metadata.Pairs("m1", "v1", "x-client-ip", "1.2.3.5", "user-id", "legit-user-1"), message: "$globals", expectedMatchedRules: []string{"crs-933-130-block"}, // message blocking alone as it comes before user blocking expectedNotMatchedRules: []string{"blk-001-002"}, // no user blocking }, { - name: "user blocking", - md: metadata.Pairs("m1", "v1", "x-client-ip", "1.2.3.5", "user-id", "blocked-user-1"), - message: "", - expectedMatchedRules: []string{"crs-941-110", "blk-001-002"}, // monitoring event + user blocking + name: "user blocking", + md: metadata.Pairs("m1", "v1", "x-client-ip", "1.2.3.5", "user-id", "blocked-user-1"), + message: "", + expectedMatchedRules: []string{"blk-001-002"}, // user blocking alone as it comes first in our test handler + expectedNotMatchedRules: []string{"crs-933-130-block"}, // message blocking alone as it comes before user blocking }, } { t.Run(tc.name, func(t *testing.T) { @@ -190,10 +190,10 @@ func TestBlocking(t *testing.T) { do(client) finished := mt.FinishedSpans() - require.Len(t, finished, 1) + require.True(t, len(finished) >= 1) // streaming RPCs will have two spans, unary RPCs will have one // The request should have the security events - events, _ := finished[0].Tag("_dd.appsec.json").(string) + events, _ := finished[len(finished)-1 /* root span */].Tag("_dd.appsec.json").(string) require.NotEmpty(t, events) for _, rule := range tc.expectedMatchedRules { require.Contains(t, events, rule) @@ -236,28 +236,6 @@ func TestBlocking(t *testing.T) { }) } }) - - t.Run("stream-block", func(t *testing.T) { - client, mt, cleanup := setup() - defer cleanup() - - ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs("dd-canary", "dd-test-scanner-log", "x-client-ip", "1.2.3.4")) - stream, err := client.StreamPing(ctx) - require.NoError(t, err) - reply, err := stream.Recv() - err = stream.CloseSend() - require.NoError(t, err) - - require.Equal(t, codes.Aborted, status.Code(err)) - require.Nil(t, reply) - - finished := mt.FinishedSpans() - require.Len(t, finished, 1) - // The request should have the attack attempts - event, _ := finished[0].Tag("_dd.appsec.json").(string) - require.NotNil(t, event) - require.True(t, strings.Contains(event, "blk-001-001")) - }) } func TestPasslist(t *testing.T) { @@ -406,17 +384,20 @@ func (s *appsecFixtureServer) StreamPing(stream Fixture_StreamPingServer) (err e ctx := stream.Context() md, _ := metadata.FromIncomingContext(ctx) ids := md.Get("user-id") - if err := pappsec.SetUser(ctx, ids[0]); err != nil { - return err + if len(ids) > 0 { + if err := pappsec.SetUser(ctx, ids[0]); err != nil { + return err + } } return s.s.StreamPing(stream) } func (s *appsecFixtureServer) Ping(ctx context.Context, in *FixtureRequest) (*FixtureReply, error) { md, _ := metadata.FromIncomingContext(ctx) ids := md.Get("user-id") - if err := pappsec.SetUser(ctx, ids[0]); err != nil { - return nil, err + if len(ids) > 0 { + if err := pappsec.SetUser(ctx, ids[0]); err != nil { + return nil, err + } } - return s.s.Ping(ctx, in) } diff --git a/internal/appsec/emitter/grpcsec/types/types.go b/internal/appsec/emitter/grpcsec/types/types.go index f41a832b3a..6e94ef9495 100644 --- a/internal/appsec/emitter/grpcsec/types/types.go +++ b/internal/appsec/emitter/grpcsec/types/types.go @@ -63,15 +63,14 @@ type ( // ReceiveOperationArgs is the gRPC handler receive operation arguments // Empty as of today. - ReceiveOperationArgs struct{ - // Message received by the gRPC handler. - // Corresponds to the address `grpc.server.request.message`. - Message interface{} - } + ReceiveOperationArgs struct{} // ReceiveOperationRes is the gRPC handler receive operation results which // contains the message the gRPC handler received. ReceiveOperationRes struct { + // Message received by the gRPC handler. + // Corresponds to the address `grpc.server.request.message`. + Message interface{} } // MonitoringError is used to vehicle a gRPC error that also embeds a request status code diff --git a/internal/appsec/listener/grpcsec/grpc.go b/internal/appsec/listener/grpcsec/grpc.go index 0429b9adb4..da8d14c0e9 100644 --- a/internal/appsec/listener/grpcsec/grpc.go +++ b/internal/appsec/listener/grpcsec/grpc.go @@ -127,14 +127,7 @@ func (l *wafEventListener) onEvent(op *types.HandlerOperation, handlerArgs types addEvents(wafResult.Events) } if wafResult.HasActions() { - for aType, params := range wafResult.Actions { - for _, action := range shared.ActionsFromEntry(aType, params) { - if grpcAction, ok := action.(*sharedsec.GRPCAction); ok { - code, err := grpcAction.GRPCWrapper() - dyngo.EmitData(op, types.NewMonitoringError(err.Error(), code)) - } - } - } + shared.ProcessActions(op, wafResult.Actions, &types.MonitoringError{}) log.Debug("appsec: WAF detected an authenticated user attack: %s", args.UserID) } }) @@ -163,7 +156,7 @@ func (l *wafEventListener) onEvent(op *types.HandlerOperation, handlerArgs types } // When the gRPC handler receives a message - dyngo.On(op, func(_ types.ReceiveOperation, res types.ReceiveOperationArgs) { + dyngo.OnFinish(op, func(_ types.ReceiveOperation, res types.ReceiveOperationRes) { // Run the WAF on the rule addresses available and listened to by the sec rules var values waf.RunAddressData // Add the gRPC message to the values if the WAF rules are using it. @@ -184,7 +177,6 @@ func (l *wafEventListener) onEvent(op *types.HandlerOperation, handlerArgs types wafResult := shared.RunWAF(wafCtx, values) if wafResult.HasEvents() { log.Debug("appsec: attack detected by the grpc waf") - addEvents(wafResult.Events) } if wafResult.HasActions() { From cb551e0a7616e73abbec04113460a140a507e821 Mon Sep 17 00:00:00 2001 From: Julio Guerra Date: Tue, 4 Jun 2024 13:15:46 +0200 Subject: [PATCH 3/5] Apply suggestions from code review Co-authored-by: Eliott Bouhana <47679741+eliottness@users.noreply.github.com> --- contrib/google.golang.org/grpc/appsec.go | 4 ++-- internal/appsec/listener/grpcsec/grpc.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/contrib/google.golang.org/grpc/appsec.go b/contrib/google.golang.org/grpc/appsec.go index 0b03cdc6fe..24f9a1dac7 100644 --- a/contrib/google.golang.org/grpc/appsec.go +++ b/contrib/google.golang.org/grpc/appsec.go @@ -29,7 +29,7 @@ import ( // UnaryHandler wrapper to use when AppSec is enabled to monitor its execution. func appsecUnaryHandlerMiddleware(method string, span ddtrace.Span, handler grpc.UnaryHandler) grpc.UnaryHandler { trace.SetAppSecEnabledTags(span) - return func(ctx context.Context, req interface{}) (res interface{}, rpcErr error) { + return func(ctx context.Context, req any) (res any, rpcErr error) { var blockedErr error md, _ := metadata.FromIncomingContext(ctx) clientIP := setClientIP(ctx, span, md) @@ -79,7 +79,7 @@ func appsecUnaryHandlerMiddleware(method string, span ddtrace.Span, handler grpc // StreamHandler wrapper to use when AppSec is enabled to monitor its execution. func appsecStreamHandlerMiddleware(method string, span ddtrace.Span, handler grpc.StreamHandler) grpc.StreamHandler { trace.SetAppSecEnabledTags(span) - return func(srv interface{}, stream grpc.ServerStream) (rpcErr error) { + return func(srv any, stream grpc.ServerStream) (rpcErr error) { // Create a ServerStream wrapper with appsec RPC handler operation and the Go context (to implement the ServerStream interface) appsecStream := &appsecServerStream{ ServerStream: stream, diff --git a/internal/appsec/listener/grpcsec/grpc.go b/internal/appsec/listener/grpcsec/grpc.go index d6a655c7a4..e0195a70f4 100644 --- a/internal/appsec/listener/grpcsec/grpc.go +++ b/internal/appsec/listener/grpcsec/grpc.go @@ -93,7 +93,7 @@ func (l *wafEventListener) onEvent(op *types.HandlerOperation, handlerArgs types const maxWAFEventsPerRequest = 10 if nbEvents.Load() >= maxWAFEventsPerRequest { logOnce.Do(func() { - log.Debug("appsec: ignoring the rpc message due to the maximum number of security events per grpc call reached") + log.Debug("appsec: ignoring new WAF event due to the maximum number of security events per grpc call reached") }) return } @@ -128,10 +128,10 @@ func (l *wafEventListener) onEvent(op *types.HandlerOperation, handlerArgs types wafResult := shared.RunWAF(wafCtx, waf.RunAddressData{Persistent: values}) if wafResult.HasEvents() { addEvents(wafResult.Events) + log.Debug("appsec: WAF detected an authenticated user attack: %s", args.UserID) } if wafResult.HasActions() { shared.ProcessActions(op, wafResult.Actions) - log.Debug("appsec: WAF detected an authenticated user attack: %s", args.UserID) } }) } From 8fbec0bd36754b76ec33109189c2c1a2f993babd Mon Sep 17 00:00:00 2001 From: Julio Guerra Date: Tue, 4 Jun 2024 14:01:16 +0200 Subject: [PATCH 4/5] grpc/appsec: fix rpc message blocking --- contrib/google.golang.org/grpc/appsec.go | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/contrib/google.golang.org/grpc/appsec.go b/contrib/google.golang.org/grpc/appsec.go index 24f9a1dac7..6330cd8f4f 100644 --- a/contrib/google.golang.org/grpc/appsec.go +++ b/contrib/google.golang.org/grpc/appsec.go @@ -40,10 +40,8 @@ func appsecUnaryHandlerMiddleware(method string, span ddtrace.Span, handler grpc } ctx, op := grpcsec.StartHandlerOperation(ctx, args, nil, func(op *types.HandlerOperation) { dyngo.OnData(op, func(a *sharedsec.GRPCAction) { - if a.Blocking() { - code, err := a.GRPCWrapper() - blockedErr = status.Error(codes.Code(code), err.Error()) - } + code, err := a.GRPCWrapper() + blockedErr = status.Error(codes.Code(code), err.Error()) }) }) defer func() { @@ -99,10 +97,8 @@ func appsecStreamHandlerMiddleware(method string, span ddtrace.Span, handler grp } ctx, op := grpcsec.StartHandlerOperation(ctx, args, nil, func(op *types.HandlerOperation) { dyngo.OnData(op, func(a *sharedsec.GRPCAction) { - if a.Blocking() { - code, e := a.GRPCWrapper() - appsecStream.blockedErr = status.Error(codes.Code(code), e.Error()) - } + code, e := a.GRPCWrapper() + appsecStream.blockedErr = status.Error(codes.Code(code), e.Error()) }) }) From fc5c6eb1a4448fdceac7204bfe540f86e4274966 Mon Sep 17 00:00:00 2001 From: Julio Guerra Date: Tue, 4 Jun 2024 14:06:36 +0200 Subject: [PATCH 5/5] grpc/appsec: fix rpc message blocking --- internal/appsec/listener/grpcsec/grpc.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/appsec/listener/grpcsec/grpc.go b/internal/appsec/listener/grpcsec/grpc.go index e0195a70f4..5b3326bddd 100644 --- a/internal/appsec/listener/grpcsec/grpc.go +++ b/internal/appsec/listener/grpcsec/grpc.go @@ -148,10 +148,10 @@ func (l *wafEventListener) onEvent(op *types.HandlerOperation, handlerArgs types wafResult := shared.RunWAF(wafCtx, waf.RunAddressData{Persistent: values}) if wafResult.HasEvents() { addEvents(wafResult.Events) + log.Debug("appsec: WAF detected an attack before executing the request") } if wafResult.HasActions() { interrupt := shared.ProcessActions(op, wafResult.Actions) - log.Debug("appsec: WAF detected an attack before executing the request") if interrupt { wafCtx.Close() return