From cd6f66c376672fc5d07b245b8f880e3d4ebe6e98 Mon Sep 17 00:00:00 2001 From: Vladyslav Simonenko Date: Wed, 26 Nov 2025 15:23:06 -0800 Subject: [PATCH 1/3] Track the size of the header --- common/metrics/metric_defs.go | 1 + service/frontend/workflow_handler.go | 10 ++++-- service/history/api/create_workflow_util.go | 11 ++++--- .../workflow_task_completed_handler.go | 33 ++++++++++++++----- service/history/api/signal_workflow_util.go | 9 +++-- .../signal_with_start_workflow.go | 2 ++ service/history/api/signalworkflow/api.go | 1 + 7 files changed, 49 insertions(+), 18 deletions(-) diff --git a/common/metrics/metric_defs.go b/common/metrics/metric_defs.go index 3a1169dfd33..a0d1d75c996 100644 --- a/common/metrics/metric_defs.go +++ b/common/metrics/metric_defs.go @@ -636,6 +636,7 @@ var ( TlsCertsExpiring = NewGaugeDef("certificates_expiring") ServiceAuthorizationLatency = NewTimerDef("service_authorization_latency") EventBlobSize = NewBytesHistogramDef("event_blob_size") + HeaderSize = NewBytesHistogramDef("header_size", WithDescription("The size of the header in bytes passed to the server by the client.")) LockRequests = NewCounterDef("lock_requests") LockLatency = NewTimerDef("lock_latency") SemaphoreRequests = NewCounterDef("semaphore_requests") diff --git a/service/frontend/workflow_handler.go b/service/frontend/workflow_handler.go index a902093334c..ac8ca518bc8 100644 --- a/service/frontend/workflow_handler.go +++ b/service/frontend/workflow_handler.go @@ -1959,6 +1959,9 @@ func (wh *WorkflowHandler) SignalWorkflowExecution(ctx context.Context, request return nil, err } + metricsHandler := wh.metricsScope(ctx).WithTags(metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String())) + metrics.HeaderSize.With(metricsHandler).Record(int64(request.GetHeader().Size())) + sizeLimitError := wh.config.BlobSizeLimitError(request.GetNamespace()) sizeLimitWarn := wh.config.BlobSizeLimitWarn(request.GetNamespace()) if err := common.CheckEventBlobSizeLimit( @@ -1968,7 +1971,7 @@ func (wh *WorkflowHandler) SignalWorkflowExecution(ctx context.Context, request namespaceID.String(), request.GetWorkflowExecution().GetWorkflowId(), request.GetWorkflowExecution().GetRunId(), - wh.metricsScope(ctx).WithTags(metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String())), + metricsHandler, wh.throttledLogger, tag.BlobSizeViolationOperation("SignalWorkflowExecution"), ); err != nil { @@ -2777,6 +2780,9 @@ func (wh *WorkflowHandler) QueryWorkflow(ctx context.Context, request *workflows return nil, err } + metricsHandler := wh.metricsScope(ctx).WithTags(metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String())) + metrics.HeaderSize.With(metricsHandler).Record(int64(request.GetQuery().GetHeader().Size())) + sizeLimitError := wh.config.BlobSizeLimitError(request.GetNamespace()) sizeLimitWarn := wh.config.BlobSizeLimitWarn(request.GetNamespace()) @@ -2787,7 +2793,7 @@ func (wh *WorkflowHandler) QueryWorkflow(ctx context.Context, request *workflows namespaceID.String(), request.GetExecution().GetWorkflowId(), request.GetExecution().GetRunId(), - wh.metricsScope(ctx).WithTags(metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String())), + metricsHandler, wh.throttledLogger, tag.BlobSizeViolationOperation("QueryWorkflow")); err != nil { return nil, err diff --git a/service/history/api/create_workflow_util.go b/service/history/api/create_workflow_util.go index 800b96b16cc..e78f5c13b4b 100644 --- a/service/history/api/create_workflow_util.go +++ b/service/history/api/create_workflow_util.go @@ -217,6 +217,7 @@ func ValidateStart( workflowID string, workflowInputSize int, workflowMemoSize int, + workflowHeaderSize int, operation string, ) error { config := shard.GetConfig() @@ -224,6 +225,8 @@ func ValidateStart( throttledLogger := shard.GetThrottledLogger() namespaceName := namespaceEntry.Name().String() + metricsHandler := interceptor.GetMetricsHandlerFromContext(ctx, logger).WithTags(metrics.CommandTypeTag(operation)) + metrics.HeaderSize.With(metricsHandler).Record(int64(workflowHeaderSize)) if err := common.CheckEventBlobSizeLimit( workflowInputSize, config.BlobSizeLimitWarn(namespaceName), @@ -231,15 +234,14 @@ func ValidateStart( namespaceName, workflowID, "", - interceptor.GetMetricsHandlerFromContext(ctx, logger).WithTags(metrics.CommandTypeTag(operation)), + metricsHandler, throttledLogger, tag.BlobSizeViolationOperation(operation), ); err != nil { return err } - handler := interceptor.GetMetricsHandlerFromContext(ctx, logger).WithTags(metrics.CommandTypeTag(operation)) - metrics.MemoSize.With(handler).Record(int64(workflowMemoSize)) + metrics.MemoSize.With(metricsHandler).Record(int64(workflowMemoSize)) if err := common.CheckEventBlobSizeLimit( workflowMemoSize, config.MemoSizeLimitWarn(namespaceName), @@ -247,7 +249,7 @@ func ValidateStart( namespaceName, workflowID, "", - handler, + metricsHandler, throttledLogger, tag.BlobSizeViolationOperation(operation), ); err != nil { @@ -311,6 +313,7 @@ func ValidateStartWorkflowExecutionRequest( workflowID, request.GetInput().Size(), request.GetMemo().Size(), + request.GetHeader().Size(), operation, ) } diff --git a/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go b/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go index 80e007c7de1..b727af08829 100644 --- a/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go +++ b/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go @@ -463,8 +463,11 @@ func (handler *workflowTaskCompletedHandler) handleCommandScheduleActivity( } } + tag := metrics.CommandTypeTag(enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK.String()) + metricsHandler := handler.metricsHandler.WithTags(tag) + metrics.HeaderSize.With(metricsHandler).Record(int64(attr.GetHeader().Size())) if err := handler.sizeLimitChecker.checkIfPayloadSizeExceedsLimit( - metrics.CommandTypeTag(enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK.String()), + tag, attr.GetInput().Size(), "ScheduleActivityTaskCommandAttributes.Input exceeds size limit.", ); err != nil { @@ -903,8 +906,11 @@ func (handler *workflowTaskCompletedHandler) handleCommandRecordMarker( return nil, err } + tag := metrics.CommandTypeTag(enumspb.COMMAND_TYPE_RECORD_MARKER.String()) + metricsHandler := handler.metricsHandler.WithTags(tag) + metrics.HeaderSize.With(metricsHandler).Record(int64(attr.GetHeader().Size())) if err := handler.sizeLimitChecker.checkIfPayloadSizeExceedsLimit( - metrics.CommandTypeTag(enumspb.COMMAND_TYPE_RECORD_MARKER.String()), + tag, common.GetPayloadsMapSize(attr.GetDetails()), "RecordMarkerCommandAttributes.Details exceeds size limit.", ); err != nil { @@ -960,8 +966,11 @@ func (handler *workflowTaskCompletedHandler) handleCommandContinueAsNewWorkflow( } } + commandTypeTag := metrics.CommandTypeTag(enumspb.COMMAND_TYPE_CONTINUE_AS_NEW_WORKFLOW_EXECUTION.String()) + metricsHandler := handler.metricsHandler.WithTags(commandTypeTag) + metrics.HeaderSize.With(metricsHandler).Record(int64(attr.GetHeader().Size())) if err := handler.sizeLimitChecker.checkIfPayloadSizeExceedsLimit( - metrics.CommandTypeTag(enumspb.COMMAND_TYPE_CONTINUE_AS_NEW_WORKFLOW_EXECUTION.String()), + commandTypeTag, attr.GetInput().Size(), "ContinueAsNewWorkflowExecutionCommandAttributes. Input exceeds size limit.", ); err != nil { @@ -970,7 +979,7 @@ func (handler *workflowTaskCompletedHandler) handleCommandContinueAsNewWorkflow( if err := handler.sizeLimitChecker.checkIfMemoSizeExceedsLimit( attr.GetMemo(), - metrics.CommandTypeTag(enumspb.COMMAND_TYPE_CONTINUE_AS_NEW_WORKFLOW_EXECUTION.String()), + commandTypeTag, "ContinueAsNewWorkflowExecutionCommandAttributes. Memo exceeds size limit.", ); err != nil { return nil, handler.terminateWorkflow(enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_CONTINUE_AS_NEW_ATTRIBUTES, err) @@ -980,7 +989,7 @@ func (handler *workflowTaskCompletedHandler) handleCommandContinueAsNewWorkflow( if err := handler.sizeLimitChecker.checkIfSearchAttributesSizeExceedsLimit( attr.GetSearchAttributes(), namespaceName, - metrics.CommandTypeTag(enumspb.COMMAND_TYPE_CONTINUE_AS_NEW_WORKFLOW_EXECUTION.String()), + commandTypeTag, ); err != nil { return nil, handler.terminateWorkflow(enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_CONTINUE_AS_NEW_ATTRIBUTES, err) } @@ -1081,8 +1090,11 @@ func (handler *workflowTaskCompletedHandler) handleCommandStartChildWorkflow( } } + tag := metrics.CommandTypeTag(enumspb.COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION.String()) + metricsHandler := handler.metricsHandler.WithTags(tag) + metrics.HeaderSize.With(metricsHandler).Record(int64(attr.GetHeader().Size())) if err := handler.sizeLimitChecker.checkIfPayloadSizeExceedsLimit( - metrics.CommandTypeTag(enumspb.COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION.String()), + tag, attr.GetInput().Size(), "StartChildWorkflowExecutionCommandAttributes. Input exceeds size limit.", ); err != nil { @@ -1091,7 +1103,7 @@ func (handler *workflowTaskCompletedHandler) handleCommandStartChildWorkflow( if err := handler.sizeLimitChecker.checkIfMemoSizeExceedsLimit( attr.GetMemo(), - metrics.CommandTypeTag(enumspb.COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION.String()), + tag, "StartChildWorkflowExecutionCommandAttributes.Memo exceeds size limit.", ); err != nil { return nil, handler.terminateWorkflow(enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_START_CHILD_EXECUTION_ATTRIBUTES, err) @@ -1101,7 +1113,7 @@ func (handler *workflowTaskCompletedHandler) handleCommandStartChildWorkflow( if err := handler.sizeLimitChecker.checkIfSearchAttributesSizeExceedsLimit( attr.GetSearchAttributes(), targetNamespace, - metrics.CommandTypeTag(enumspb.COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION.String()), + tag, ); err != nil { return nil, handler.terminateWorkflow(enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_START_CHILD_EXECUTION_ATTRIBUTES, err) } @@ -1160,8 +1172,11 @@ func (handler *workflowTaskCompletedHandler) handleCommandSignalExternalWorkflow return nil, handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_PENDING_SIGNALS_LIMIT_EXCEEDED, err) } + tag := metrics.CommandTypeTag(enumspb.COMMAND_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION.String()) + metricsHandler := handler.metricsHandler.WithTags(tag) + metrics.HeaderSize.With(metricsHandler).Record(int64(attr.GetHeader().Size())) if err := handler.sizeLimitChecker.checkIfPayloadSizeExceedsLimit( - metrics.CommandTypeTag(enumspb.COMMAND_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION.String()), + tag, attr.GetInput().Size(), "SignalExternalWorkflowExecutionCommandAttributes.Input exceeds size limit.", ); err != nil { diff --git a/service/history/api/signal_workflow_util.go b/service/history/api/signal_workflow_util.go index 09f2f9d0444..a820ddd10ee 100644 --- a/service/history/api/signal_workflow_util.go +++ b/service/history/api/signal_workflow_util.go @@ -17,6 +17,7 @@ func ValidateSignal( shard historyi.ShardContext, mutableState historyi.MutableState, signalPayloadSize int, + signalHeaderSize int, operation string, ) error { config := shard.GetConfig() @@ -31,6 +32,10 @@ func ValidateSignal( blobSizeLimitWarn := config.BlobSizeLimitWarn(namespaceName) blobSizeLimitError := config.BlobSizeLimitError(namespaceName) + metricsHandler := interceptor.GetMetricsHandlerFromContext(ctx, shard.GetLogger()).WithTags( + metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String()), + ) + metrics.HeaderSize.With(metricsHandler).Record(int64(signalHeaderSize)) if err := common.CheckEventBlobSizeLimit( signalPayloadSize, blobSizeLimitWarn, @@ -38,9 +43,7 @@ func ValidateSignal( namespaceName, workflowID, runID, - interceptor.GetMetricsHandlerFromContext(ctx, shard.GetLogger()).WithTags( - metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String()), - ), + metricsHandler, shard.GetThrottledLogger(), tag.BlobSizeViolationOperation(operation), ); err != nil { diff --git a/service/history/api/signalwithstartworkflow/signal_with_start_workflow.go b/service/history/api/signalwithstartworkflow/signal_with_start_workflow.go index 376915bd699..05cc8981e65 100644 --- a/service/history/api/signalwithstartworkflow/signal_with_start_workflow.go +++ b/service/history/api/signalwithstartworkflow/signal_with_start_workflow.go @@ -87,6 +87,7 @@ func startAndSignalWorkflow( shard, newMutableState, signalWithStartRequest.GetSignalInput().Size(), + signalWithStartRequest.GetHeader().Size(), "SignalWithStartWorkflowExecution", ); err != nil { return "", false, err @@ -280,6 +281,7 @@ func signalWorkflow( shardContext, workflowLease.GetMutableState(), request.GetSignalInput().Size(), + request.GetHeader().Size(), "SignalWithStartWorkflowExecution", ); err != nil { // in-memory mutable state is still clean, release the lock with nil error to prevent diff --git a/service/history/api/signalworkflow/api.go b/service/history/api/signalworkflow/api.go index 72ddec22fe8..8c0a2c61c80 100644 --- a/service/history/api/signalworkflow/api.go +++ b/service/history/api/signalworkflow/api.go @@ -57,6 +57,7 @@ func Invoke( shard, mutableState, request.GetInput().Size(), + request.GetHeader().Size(), "SignalWorkflowExecution", ); err != nil { releaseFn(nil) From c0083c860cd04f3fc50faf937dbc827248f89659 Mon Sep 17 00:00:00 2001 From: Vladyslav Simonenko Date: Mon, 1 Dec 2025 12:51:16 -0800 Subject: [PATCH 2/3] Comments --- common/metrics/metric_defs.go | 3 +- common/metrics/tags.go | 4 +++ service/frontend/workflow_handler.go | 12 +++---- service/history/api/create_workflow_util.go | 11 ++++--- .../workflow_task_completed_handler.go | 33 ++++++++----------- service/history/api/signal_workflow_util.go | 10 +++--- 6 files changed, 37 insertions(+), 36 deletions(-) diff --git a/common/metrics/metric_defs.go b/common/metrics/metric_defs.go index a0d1d75c996..b29798b0274 100644 --- a/common/metrics/metric_defs.go +++ b/common/metrics/metric_defs.go @@ -29,6 +29,7 @@ const ( PriorityTagName = "priority" PersistenceDBKindTagName = "db_kind" WorkerPluginNameTagName = "worker_plugin_name" + headerCallsiteTagName = "header_callsite" ) // This package should hold all the metrics and tags for temporal @@ -636,7 +637,7 @@ var ( TlsCertsExpiring = NewGaugeDef("certificates_expiring") ServiceAuthorizationLatency = NewTimerDef("service_authorization_latency") EventBlobSize = NewBytesHistogramDef("event_blob_size") - HeaderSize = NewBytesHistogramDef("header_size", WithDescription("The size of the header in bytes passed to the server by the client.")) + HeaderSize = NewBytesHistogramDef("header_size", WithDescription("The size of the header in bytes passed to the server by the client. This metric is experimental and can be removed in the future.")) LockRequests = NewCounterDef("lock_requests") LockLatency = NewTimerDef("lock_latency") SemaphoreRequests = NewCounterDef("semaphore_requests") diff --git a/common/metrics/tags.go b/common/metrics/tags.go index 56398cf6575..06499041738 100644 --- a/common/metrics/tags.go +++ b/common/metrics/tags.go @@ -471,3 +471,7 @@ var TaskInvalidTag = Tag{Key: taskExpireStage, Value: "invalid"} func PersistenceDBKindTag(kind string) Tag { return Tag{Key: PersistenceDBKindTagName, Value: kind} } + +func HeaderCallsiteTag(kind string) Tag { + return Tag{Key: headerCallsiteTagName, Value: kind} +} diff --git a/service/frontend/workflow_handler.go b/service/frontend/workflow_handler.go index ac8ca518bc8..2e7b45915a4 100644 --- a/service/frontend/workflow_handler.go +++ b/service/frontend/workflow_handler.go @@ -1959,9 +1959,6 @@ func (wh *WorkflowHandler) SignalWorkflowExecution(ctx context.Context, request return nil, err } - metricsHandler := wh.metricsScope(ctx).WithTags(metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String())) - metrics.HeaderSize.With(metricsHandler).Record(int64(request.GetHeader().Size())) - sizeLimitError := wh.config.BlobSizeLimitError(request.GetNamespace()) sizeLimitWarn := wh.config.BlobSizeLimitWarn(request.GetNamespace()) if err := common.CheckEventBlobSizeLimit( @@ -1971,7 +1968,7 @@ func (wh *WorkflowHandler) SignalWorkflowExecution(ctx context.Context, request namespaceID.String(), request.GetWorkflowExecution().GetWorkflowId(), request.GetWorkflowExecution().GetRunId(), - metricsHandler, + wh.metricsScope(ctx).WithTags(metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String())), wh.throttledLogger, tag.BlobSizeViolationOperation("SignalWorkflowExecution"), ); err != nil { @@ -2780,7 +2777,7 @@ func (wh *WorkflowHandler) QueryWorkflow(ctx context.Context, request *workflows return nil, err } - metricsHandler := wh.metricsScope(ctx).WithTags(metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String())) + metricsHandler := wh.metricsScope(ctx).WithTags(metrics.HeaderCallsiteTag("QueryWorkflow")) metrics.HeaderSize.With(metricsHandler).Record(int64(request.GetQuery().GetHeader().Size())) sizeLimitError := wh.config.BlobSizeLimitError(request.GetNamespace()) @@ -2793,7 +2790,7 @@ func (wh *WorkflowHandler) QueryWorkflow(ctx context.Context, request *workflows namespaceID.String(), request.GetExecution().GetWorkflowId(), request.GetExecution().GetRunId(), - metricsHandler, + wh.metricsScope(ctx).WithTags(metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String())), wh.throttledLogger, tag.BlobSizeViolationOperation("QueryWorkflow")); err != nil { return nil, err @@ -4380,6 +4377,9 @@ func (wh *WorkflowHandler) UpdateWorkflowExecution( return nil, err } + metricsHandler := wh.metricsScope(ctx).WithTags(metrics.HeaderCallsiteTag("UpdateWorkflowExecution")) + metrics.HeaderSize.With(metricsHandler).Record(int64(request.GetRequest().GetInput().GetHeader().Size())) + switch request.WaitPolicy.LifecycleStage { // nolint:exhaustive case enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED: metrics.WorkflowExecutionUpdateWaitStageAccepted.With(wh.metricsScope(ctx)).Record(1) diff --git a/service/history/api/create_workflow_util.go b/service/history/api/create_workflow_util.go index e78f5c13b4b..0f1983033cb 100644 --- a/service/history/api/create_workflow_util.go +++ b/service/history/api/create_workflow_util.go @@ -225,8 +225,9 @@ func ValidateStart( throttledLogger := shard.GetThrottledLogger() namespaceName := namespaceEntry.Name().String() - metricsHandler := interceptor.GetMetricsHandlerFromContext(ctx, logger).WithTags(metrics.CommandTypeTag(operation)) - metrics.HeaderSize.With(metricsHandler).Record(int64(workflowHeaderSize)) + metricsHandler := interceptor.GetMetricsHandlerFromContext(ctx, logger) + metrics.HeaderSize.With(metricsHandler.WithTags(metrics.HeaderCallsiteTag(operation))).Record(int64(workflowHeaderSize)) + handlerWithCommandTag := metricsHandler.WithTags(metrics.CommandTypeTag(operation)) if err := common.CheckEventBlobSizeLimit( workflowInputSize, config.BlobSizeLimitWarn(namespaceName), @@ -234,14 +235,14 @@ func ValidateStart( namespaceName, workflowID, "", - metricsHandler, + handlerWithCommandTag, throttledLogger, tag.BlobSizeViolationOperation(operation), ); err != nil { return err } - metrics.MemoSize.With(metricsHandler).Record(int64(workflowMemoSize)) + metrics.MemoSize.With(handlerWithCommandTag).Record(int64(workflowMemoSize)) if err := common.CheckEventBlobSizeLimit( workflowMemoSize, config.MemoSizeLimitWarn(namespaceName), @@ -249,7 +250,7 @@ func ValidateStart( namespaceName, workflowID, "", - metricsHandler, + handlerWithCommandTag, throttledLogger, tag.BlobSizeViolationOperation(operation), ); err != nil { diff --git a/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go b/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go index b727af08829..6eff0e0d016 100644 --- a/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go +++ b/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go @@ -463,11 +463,10 @@ func (handler *workflowTaskCompletedHandler) handleCommandScheduleActivity( } } - tag := metrics.CommandTypeTag(enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK.String()) - metricsHandler := handler.metricsHandler.WithTags(tag) + metricsHandler := handler.metricsHandler.WithTags(metrics.HeaderCallsiteTag("ScheduleActivityTaskCommand")) metrics.HeaderSize.With(metricsHandler).Record(int64(attr.GetHeader().Size())) if err := handler.sizeLimitChecker.checkIfPayloadSizeExceedsLimit( - tag, + metrics.CommandTypeTag(enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK.String()), attr.GetInput().Size(), "ScheduleActivityTaskCommandAttributes.Input exceeds size limit.", ); err != nil { @@ -906,11 +905,10 @@ func (handler *workflowTaskCompletedHandler) handleCommandRecordMarker( return nil, err } - tag := metrics.CommandTypeTag(enumspb.COMMAND_TYPE_RECORD_MARKER.String()) - metricsHandler := handler.metricsHandler.WithTags(tag) + metricsHandler := handler.metricsHandler.WithTags(metrics.HeaderCallsiteTag("RecordMarkerCommand")) metrics.HeaderSize.With(metricsHandler).Record(int64(attr.GetHeader().Size())) if err := handler.sizeLimitChecker.checkIfPayloadSizeExceedsLimit( - tag, + metrics.CommandTypeTag(enumspb.COMMAND_TYPE_RECORD_MARKER.String()), common.GetPayloadsMapSize(attr.GetDetails()), "RecordMarkerCommandAttributes.Details exceeds size limit.", ); err != nil { @@ -966,11 +964,10 @@ func (handler *workflowTaskCompletedHandler) handleCommandContinueAsNewWorkflow( } } - commandTypeTag := metrics.CommandTypeTag(enumspb.COMMAND_TYPE_CONTINUE_AS_NEW_WORKFLOW_EXECUTION.String()) - metricsHandler := handler.metricsHandler.WithTags(commandTypeTag) + metricsHandler := handler.metricsHandler.WithTags(metrics.HeaderCallsiteTag("ContinueAsNewWorkflowExecutionCommand")) metrics.HeaderSize.With(metricsHandler).Record(int64(attr.GetHeader().Size())) if err := handler.sizeLimitChecker.checkIfPayloadSizeExceedsLimit( - commandTypeTag, + metrics.CommandTypeTag(enumspb.COMMAND_TYPE_CONTINUE_AS_NEW_WORKFLOW_EXECUTION.String()), attr.GetInput().Size(), "ContinueAsNewWorkflowExecutionCommandAttributes. Input exceeds size limit.", ); err != nil { @@ -979,7 +976,7 @@ func (handler *workflowTaskCompletedHandler) handleCommandContinueAsNewWorkflow( if err := handler.sizeLimitChecker.checkIfMemoSizeExceedsLimit( attr.GetMemo(), - commandTypeTag, + metrics.CommandTypeTag(enumspb.COMMAND_TYPE_CONTINUE_AS_NEW_WORKFLOW_EXECUTION.String()), "ContinueAsNewWorkflowExecutionCommandAttributes. Memo exceeds size limit.", ); err != nil { return nil, handler.terminateWorkflow(enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_CONTINUE_AS_NEW_ATTRIBUTES, err) @@ -989,7 +986,7 @@ func (handler *workflowTaskCompletedHandler) handleCommandContinueAsNewWorkflow( if err := handler.sizeLimitChecker.checkIfSearchAttributesSizeExceedsLimit( attr.GetSearchAttributes(), namespaceName, - commandTypeTag, + metrics.CommandTypeTag(enumspb.COMMAND_TYPE_CONTINUE_AS_NEW_WORKFLOW_EXECUTION.String()), ); err != nil { return nil, handler.terminateWorkflow(enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_CONTINUE_AS_NEW_ATTRIBUTES, err) } @@ -1090,11 +1087,10 @@ func (handler *workflowTaskCompletedHandler) handleCommandStartChildWorkflow( } } - tag := metrics.CommandTypeTag(enumspb.COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION.String()) - metricsHandler := handler.metricsHandler.WithTags(tag) + metricsHandler := handler.metricsHandler.WithTags(metrics.HeaderCallsiteTag("StartChildWorkflowExecutionCommand")) metrics.HeaderSize.With(metricsHandler).Record(int64(attr.GetHeader().Size())) if err := handler.sizeLimitChecker.checkIfPayloadSizeExceedsLimit( - tag, + metrics.CommandTypeTag(enumspb.COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION.String()), attr.GetInput().Size(), "StartChildWorkflowExecutionCommandAttributes. Input exceeds size limit.", ); err != nil { @@ -1103,7 +1099,7 @@ func (handler *workflowTaskCompletedHandler) handleCommandStartChildWorkflow( if err := handler.sizeLimitChecker.checkIfMemoSizeExceedsLimit( attr.GetMemo(), - tag, + metrics.CommandTypeTag(enumspb.COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION.String()), "StartChildWorkflowExecutionCommandAttributes.Memo exceeds size limit.", ); err != nil { return nil, handler.terminateWorkflow(enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_START_CHILD_EXECUTION_ATTRIBUTES, err) @@ -1113,7 +1109,7 @@ func (handler *workflowTaskCompletedHandler) handleCommandStartChildWorkflow( if err := handler.sizeLimitChecker.checkIfSearchAttributesSizeExceedsLimit( attr.GetSearchAttributes(), targetNamespace, - tag, + metrics.CommandTypeTag(enumspb.COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION.String()), ); err != nil { return nil, handler.terminateWorkflow(enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_START_CHILD_EXECUTION_ATTRIBUTES, err) } @@ -1172,11 +1168,10 @@ func (handler *workflowTaskCompletedHandler) handleCommandSignalExternalWorkflow return nil, handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_PENDING_SIGNALS_LIMIT_EXCEEDED, err) } - tag := metrics.CommandTypeTag(enumspb.COMMAND_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION.String()) - metricsHandler := handler.metricsHandler.WithTags(tag) + metricsHandler := handler.metricsHandler.WithTags(metrics.HeaderCallsiteTag("SignalExternalWorkflowExecutionCommand")) metrics.HeaderSize.With(metricsHandler).Record(int64(attr.GetHeader().Size())) if err := handler.sizeLimitChecker.checkIfPayloadSizeExceedsLimit( - tag, + metrics.CommandTypeTag(enumspb.COMMAND_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION.String()), attr.GetInput().Size(), "SignalExternalWorkflowExecutionCommandAttributes.Input exceeds size limit.", ); err != nil { diff --git a/service/history/api/signal_workflow_util.go b/service/history/api/signal_workflow_util.go index a820ddd10ee..c95a6e2a243 100644 --- a/service/history/api/signal_workflow_util.go +++ b/service/history/api/signal_workflow_util.go @@ -32,10 +32,8 @@ func ValidateSignal( blobSizeLimitWarn := config.BlobSizeLimitWarn(namespaceName) blobSizeLimitError := config.BlobSizeLimitError(namespaceName) - metricsHandler := interceptor.GetMetricsHandlerFromContext(ctx, shard.GetLogger()).WithTags( - metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String()), - ) - metrics.HeaderSize.With(metricsHandler).Record(int64(signalHeaderSize)) + metricsHandler := interceptor.GetMetricsHandlerFromContext(ctx, shard.GetLogger()) + metrics.HeaderSize.With(metricsHandler.WithTags(metrics.HeaderCallsiteTag(operation))).Record(int64(signalHeaderSize)) if err := common.CheckEventBlobSizeLimit( signalPayloadSize, blobSizeLimitWarn, @@ -43,7 +41,9 @@ func ValidateSignal( namespaceName, workflowID, runID, - metricsHandler, + metricsHandler.WithTags( + metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String()), + ), shard.GetThrottledLogger(), tag.BlobSizeViolationOperation(operation), ); err != nil { From 86d7d309e1dece2e3c92e546e31f9fe87748ec26 Mon Sep 17 00:00:00 2001 From: Vladyslav Simonenko Date: Mon, 1 Dec 2025 13:11:53 -0800 Subject: [PATCH 3/3] Create/Update Schedule logging --- service/frontend/workflow_handler.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/service/frontend/workflow_handler.go b/service/frontend/workflow_handler.go index 2e7b45915a4..d9d2c251fc2 100644 --- a/service/frontend/workflow_handler.go +++ b/service/frontend/workflow_handler.go @@ -3122,6 +3122,11 @@ func (wh *WorkflowHandler) createScheduleWorkflow( return nil, err } + if startWorkflow := request.GetSchedule().GetAction().GetStartWorkflow(); startWorkflow != nil { + metricsHandler := wh.metricsScope(ctx).WithTags(metrics.HeaderCallsiteTag("CreateSchedule")) + metrics.HeaderSize.With(metricsHandler).Record(int64(startWorkflow.GetHeader().Size())) + } + // size limits will be validated on history. note that the start workflow request is // embedded in the schedule, which is in the scheduler input. so if the scheduler itself // doesn't exceed the limit, the started workflows should be safe as well. @@ -3977,6 +3982,11 @@ func (wh *WorkflowHandler) updateScheduleWorkflow( return nil, err } + if startWorkflow := request.GetSchedule().GetAction().GetStartWorkflow(); startWorkflow != nil { + metricsHandler := wh.metricsScope(ctx).WithTags(metrics.HeaderCallsiteTag("UpdateSchedule")) + metrics.HeaderSize.With(metricsHandler).Record(int64(startWorkflow.GetHeader().Size())) + } + input := &schedulespb.FullUpdateRequest{ Schedule: request.Schedule, SearchAttributes: request.SearchAttributes,