Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions common/metrics/metric_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const (
PriorityTagName = "priority"
PersistenceDBKindTagName = "db_kind"
WorkerPluginNameTagName = "worker_plugin_name"
headerCallsiteTagName = "header_callsite"
)

// This package should hold all the metrics and tags for temporal
Expand Down Expand Up @@ -636,6 +637,7 @@ var (
TlsCertsExpiring = NewGaugeDef("certificates_expiring")
ServiceAuthorizationLatency = NewTimerDef("service_authorization_latency")
EventBlobSize = NewBytesHistogramDef("event_blob_size")
HeaderSize = NewBytesHistogramDef("header_size", WithDescription("The size of the header in bytes passed to the server by the client. This metric is experimental and can be removed in the future."))
LockRequests = NewCounterDef("lock_requests")
LockLatency = NewTimerDef("lock_latency")
SemaphoreRequests = NewCounterDef("semaphore_requests")
Expand Down
4 changes: 4 additions & 0 deletions common/metrics/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,3 +471,7 @@ var TaskInvalidTag = Tag{Key: taskExpireStage, Value: "invalid"}
func PersistenceDBKindTag(kind string) Tag {
return Tag{Key: PersistenceDBKindTagName, Value: kind}
}

func HeaderCallsiteTag(kind string) Tag {
return Tag{Key: headerCallsiteTagName, Value: kind}
}
16 changes: 16 additions & 0 deletions service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2777,6 +2777,9 @@ func (wh *WorkflowHandler) QueryWorkflow(ctx context.Context, request *workflows
return nil, err
}

metricsHandler := wh.metricsScope(ctx).WithTags(metrics.HeaderCallsiteTag("QueryWorkflow"))
metrics.HeaderSize.With(metricsHandler).Record(int64(request.GetQuery().GetHeader().Size()))

sizeLimitError := wh.config.BlobSizeLimitError(request.GetNamespace())
sizeLimitWarn := wh.config.BlobSizeLimitWarn(request.GetNamespace())

Expand Down Expand Up @@ -3119,6 +3122,11 @@ func (wh *WorkflowHandler) createScheduleWorkflow(
return nil, err
}

if startWorkflow := request.GetSchedule().GetAction().GetStartWorkflow(); startWorkflow != nil {
metricsHandler := wh.metricsScope(ctx).WithTags(metrics.HeaderCallsiteTag("CreateSchedule"))
metrics.HeaderSize.With(metricsHandler).Record(int64(startWorkflow.GetHeader().Size()))
}

// size limits will be validated on history. note that the start workflow request is
// embedded in the schedule, which is in the scheduler input. so if the scheduler itself
// doesn't exceed the limit, the started workflows should be safe as well.
Expand Down Expand Up @@ -3974,6 +3982,11 @@ func (wh *WorkflowHandler) updateScheduleWorkflow(
return nil, err
}

if startWorkflow := request.GetSchedule().GetAction().GetStartWorkflow(); startWorkflow != nil {
metricsHandler := wh.metricsScope(ctx).WithTags(metrics.HeaderCallsiteTag("UpdateSchedule"))
metrics.HeaderSize.With(metricsHandler).Record(int64(startWorkflow.GetHeader().Size()))
}

input := &schedulespb.FullUpdateRequest{
Schedule: request.Schedule,
SearchAttributes: request.SearchAttributes,
Expand Down Expand Up @@ -4374,6 +4387,9 @@ func (wh *WorkflowHandler) UpdateWorkflowExecution(
return nil, err
}

metricsHandler := wh.metricsScope(ctx).WithTags(metrics.HeaderCallsiteTag("UpdateWorkflowExecution"))
metrics.HeaderSize.With(metricsHandler).Record(int64(request.GetRequest().GetInput().GetHeader().Size()))

switch request.WaitPolicy.LifecycleStage { // nolint:exhaustive
case enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED:
metrics.WorkflowExecutionUpdateWaitStageAccepted.With(wh.metricsScope(ctx)).Record(1)
Expand Down
12 changes: 8 additions & 4 deletions service/history/api/create_workflow_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,37 +217,40 @@ func ValidateStart(
workflowID string,
workflowInputSize int,
workflowMemoSize int,
workflowHeaderSize int,
operation string,
) error {
config := shard.GetConfig()
logger := shard.GetLogger()
throttledLogger := shard.GetThrottledLogger()
namespaceName := namespaceEntry.Name().String()

metricsHandler := interceptor.GetMetricsHandlerFromContext(ctx, logger)
metrics.HeaderSize.With(metricsHandler.WithTags(metrics.HeaderCallsiteTag(operation))).Record(int64(workflowHeaderSize))
handlerWithCommandTag := metricsHandler.WithTags(metrics.CommandTypeTag(operation))
if err := common.CheckEventBlobSizeLimit(
workflowInputSize,
config.BlobSizeLimitWarn(namespaceName),
config.BlobSizeLimitError(namespaceName),
namespaceName,
workflowID,
"",
interceptor.GetMetricsHandlerFromContext(ctx, logger).WithTags(metrics.CommandTypeTag(operation)),
handlerWithCommandTag,
throttledLogger,
tag.BlobSizeViolationOperation(operation),
); err != nil {
return err
}

handler := interceptor.GetMetricsHandlerFromContext(ctx, logger).WithTags(metrics.CommandTypeTag(operation))
metrics.MemoSize.With(handler).Record(int64(workflowMemoSize))
metrics.MemoSize.With(handlerWithCommandTag).Record(int64(workflowMemoSize))
if err := common.CheckEventBlobSizeLimit(
workflowMemoSize,
config.MemoSizeLimitWarn(namespaceName),
config.MemoSizeLimitError(namespaceName),
namespaceName,
workflowID,
"",
handler,
handlerWithCommandTag,
throttledLogger,
tag.BlobSizeViolationOperation(operation),
); err != nil {
Expand Down Expand Up @@ -311,6 +314,7 @@ func ValidateStartWorkflowExecutionRequest(
workflowID,
request.GetInput().Size(),
request.GetMemo().Size(),
request.GetHeader().Size(),
operation,
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,8 @@ func (handler *workflowTaskCompletedHandler) handleCommandScheduleActivity(
}
}

metricsHandler := handler.metricsHandler.WithTags(metrics.HeaderCallsiteTag("ScheduleActivityTaskCommand"))
metrics.HeaderSize.With(metricsHandler).Record(int64(attr.GetHeader().Size()))
if err := handler.sizeLimitChecker.checkIfPayloadSizeExceedsLimit(
metrics.CommandTypeTag(enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK.String()),
attr.GetInput().Size(),
Expand Down Expand Up @@ -903,6 +905,8 @@ func (handler *workflowTaskCompletedHandler) handleCommandRecordMarker(
return nil, err
}

metricsHandler := handler.metricsHandler.WithTags(metrics.HeaderCallsiteTag("RecordMarkerCommand"))
metrics.HeaderSize.With(metricsHandler).Record(int64(attr.GetHeader().Size()))
if err := handler.sizeLimitChecker.checkIfPayloadSizeExceedsLimit(
metrics.CommandTypeTag(enumspb.COMMAND_TYPE_RECORD_MARKER.String()),
common.GetPayloadsMapSize(attr.GetDetails()),
Expand Down Expand Up @@ -960,6 +964,8 @@ func (handler *workflowTaskCompletedHandler) handleCommandContinueAsNewWorkflow(
}
}

metricsHandler := handler.metricsHandler.WithTags(metrics.HeaderCallsiteTag("ContinueAsNewWorkflowExecutionCommand"))
metrics.HeaderSize.With(metricsHandler).Record(int64(attr.GetHeader().Size()))
if err := handler.sizeLimitChecker.checkIfPayloadSizeExceedsLimit(
metrics.CommandTypeTag(enumspb.COMMAND_TYPE_CONTINUE_AS_NEW_WORKFLOW_EXECUTION.String()),
attr.GetInput().Size(),
Expand Down Expand Up @@ -1081,6 +1087,8 @@ func (handler *workflowTaskCompletedHandler) handleCommandStartChildWorkflow(
}
}

metricsHandler := handler.metricsHandler.WithTags(metrics.HeaderCallsiteTag("StartChildWorkflowExecutionCommand"))
metrics.HeaderSize.With(metricsHandler).Record(int64(attr.GetHeader().Size()))
if err := handler.sizeLimitChecker.checkIfPayloadSizeExceedsLimit(
metrics.CommandTypeTag(enumspb.COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION.String()),
attr.GetInput().Size(),
Expand Down Expand Up @@ -1160,6 +1168,8 @@ func (handler *workflowTaskCompletedHandler) handleCommandSignalExternalWorkflow
return nil, handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_PENDING_SIGNALS_LIMIT_EXCEEDED, err)
}

metricsHandler := handler.metricsHandler.WithTags(metrics.HeaderCallsiteTag("SignalExternalWorkflowExecutionCommand"))
metrics.HeaderSize.With(metricsHandler).Record(int64(attr.GetHeader().Size()))
if err := handler.sizeLimitChecker.checkIfPayloadSizeExceedsLimit(
metrics.CommandTypeTag(enumspb.COMMAND_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION.String()),
attr.GetInput().Size(),
Expand Down
5 changes: 4 additions & 1 deletion service/history/api/signal_workflow_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ func ValidateSignal(
shard historyi.ShardContext,
mutableState historyi.MutableState,
signalPayloadSize int,
signalHeaderSize int,
operation string,
) error {
config := shard.GetConfig()
Expand All @@ -31,14 +32,16 @@ func ValidateSignal(
blobSizeLimitWarn := config.BlobSizeLimitWarn(namespaceName)
blobSizeLimitError := config.BlobSizeLimitError(namespaceName)

metricsHandler := interceptor.GetMetricsHandlerFromContext(ctx, shard.GetLogger())
metrics.HeaderSize.With(metricsHandler.WithTags(metrics.HeaderCallsiteTag(operation))).Record(int64(signalHeaderSize))
if err := common.CheckEventBlobSizeLimit(
signalPayloadSize,
blobSizeLimitWarn,
blobSizeLimitError,
namespaceName,
workflowID,
runID,
interceptor.GetMetricsHandlerFromContext(ctx, shard.GetLogger()).WithTags(
metricsHandler.WithTags(
metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String()),
),
shard.GetThrottledLogger(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func startAndSignalWorkflow(
shard,
newMutableState,
signalWithStartRequest.GetSignalInput().Size(),
signalWithStartRequest.GetHeader().Size(),
"SignalWithStartWorkflowExecution",
); err != nil {
return "", false, err
Expand Down Expand Up @@ -280,6 +281,7 @@ func signalWorkflow(
shardContext,
workflowLease.GetMutableState(),
request.GetSignalInput().Size(),
request.GetHeader().Size(),
"SignalWithStartWorkflowExecution",
); err != nil {
// in-memory mutable state is still clean, release the lock with nil error to prevent
Expand Down
1 change: 1 addition & 0 deletions service/history/api/signalworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func Invoke(
shard,
mutableState,
request.GetInput().Size(),
request.GetHeader().Size(),
"SignalWorkflowExecution",
); err != nil {
releaseFn(nil)
Expand Down
Loading