Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 165 additions & 0 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ const (
MetricsReplication
MetricsProcess
MetricsHealing
MetricsBuckets

// MetricsAll must be last.
// Enables all metrics.
Expand Down Expand Up @@ -101,6 +102,7 @@ func (m MetricType) String() string {
addIf(m.Contains(MetricsReplication), "Replication")
addIf(m.Contains(MetricsProcess), "Process")
addIf(m.Contains(MetricsHealing), "Healing")
addIf(m.Contains(MetricsBuckets), "Buckets")
return b.String()
}

Expand Down Expand Up @@ -159,6 +161,7 @@ type MetricsOptions struct {
DrivePoolIdx []int // Only include metrics for these drive pools. Leave empty for all.
DriveSetIdx []int // Only include metrics for these drive sets (combine with PoolIdx if needed).
Disks []string // Include only specific disks. Leave empty for all.
Buckets []string // Include only specific buckets in bucket metrics. Leave empty for all.
ByJobID string
ByDepID string

Expand Down Expand Up @@ -195,6 +198,9 @@ func (adm *AdminClient) Metrics(ctx context.Context, o MetricsOptions, out func(
}

q.Set("disks", strings.Join(o.Disks, ","))
if len(o.Buckets) > 0 {
q.Set("buckets", strings.Join(o.Buckets, ","))
}
if o.ByDisk {
q.Set("by-disk", "true") // Legacy flag
o.Flags.Add(MetricsByDisk)
Expand Down Expand Up @@ -368,6 +374,7 @@ type Metrics struct {
Replication *ReplicationMetrics `json:"replication,omitempty"`
Process *ProcessMetrics `json:"process,omitempty"`
Healing *HealingMetrics `json:"healing,omitempty"`
Buckets *BucketAPIMetrics `json:"buckets,omitempty"`
}

// Merge other into r.
Expand Down Expand Up @@ -435,6 +442,10 @@ func (r *Metrics) Merge(other *Metrics) {
r.Healing = &HealingMetrics{}
}
r.Healing.Merge(other.Healing)
if r.Buckets == nil && other.Buckets != nil {
r.Buckets = &BucketAPIMetrics{}
}
r.Buckets.Merge(other.Buckets)
}

// ScannerMetrics contains scanner information.
Expand Down Expand Up @@ -2960,3 +2971,157 @@ func (m *HealingMetrics) Merge(other *HealingMetrics) {
maps.Copy(m.ActiveSessions, other.ActiveSessions)
}
}

// BucketOpStat holds per-operation request counters and byte I/O for one
// bucket. Bytes are tracked per-operation so byte traffic can be attributed
// to the specific S3 calls (e.g., GET vs PUT).
//
// All zero-valued counters except Requests are omitted from the JSON
// encoding, so a quiet operation serializes compactly.
type BucketOpStat struct {
	// Requests is the total number of requests seen for this operation.
	Requests int64 `json:"requests"`
	// Errors4xx counts requests for this operation that ended in a
	// client-side (4xx) error.
	Errors4xx int64 `json:"errors4xx,omitempty"`
	// Errors5xx counts requests for this operation that ended in a
	// server-side (5xx) error.
	Errors5xx int64 `json:"errors5xx,omitempty"`
	// BytesIn is the number of bytes received for this operation.
	BytesIn uint64 `json:"bytesIn,omitempty"`
	// BytesOut is the number of bytes sent for this operation.
	BytesOut uint64 `json:"bytesOut,omitempty"`
}

// SegmentedBucketStats holds a time-segmented series for one bucket within a
// single window (LastHour or LastDay). Slots are ordered oldest-first; index
// len-1 is the most recent.
//
// All slice fields share the same slot layout: element i of every populated
// slice describes the same time slot.
type SegmentedBucketStats struct {
	// IntervalSecs is the duration of each slot in seconds.
	IntervalSecs int `json:"intervalSecs"`

	// FirstTime is the timestamp of the oldest slot.
	FirstTime time.Time `json:"firstTime"`

	// Per-category counts; one slot per IntervalSecs.
	Requests []int64 `json:"requests,omitempty"`
	Gets     []int64 `json:"gets,omitempty"`
	Puts     []int64 `json:"puts,omitempty"`
	Lists    []int64 `json:"lists,omitempty"`
	// Errors is the combined error count; Errors4xx / Errors5xx break it
	// down by status class. NOTE(review): presumably Errors == Errors4xx +
	// Errors5xx per slot — confirm against the producer.
	Errors    []int64 `json:"errors,omitempty"`
	Errors4xx []int64 `json:"errors4xx,omitempty"`
	Errors5xx []int64 `json:"errors5xx,omitempty"`

	// BytesIn / BytesOut are per-slot byte counters so callers can both
	// chart byte throughput and sum across slots for a window total.
	BytesIn  []int64 `json:"bytesIn,omitempty"`
	BytesOut []int64 `json:"bytesOut,omitempty"`
}

// Merge folds other into s. Slots are right-aligned and summed so the most
// recent slot always aligns; FirstTime extends to the earliest reported.
// A nil other is a no-op.
func (s *SegmentedBucketStats) Merge(other *SegmentedBucketStats) {
	if other == nil {
		return
	}
	// Adopt the other side's slot width when we don't have one yet.
	if s.IntervalSecs == 0 {
		s.IntervalSecs = other.IntervalSecs
	}
	// Keep the earliest non-zero FirstTime of the two.
	switch {
	case s.FirstTime.IsZero():
		s.FirstTime = other.FirstTime
	case !other.FirstTime.IsZero() && other.FirstTime.Before(s.FirstTime):
		s.FirstTime = other.FirstTime
	}
	// Fold every per-slot series the same way; the table keeps the
	// destination/source pairing explicit.
	series := []struct {
		dst *[]int64
		src []int64
	}{
		{&s.Requests, other.Requests},
		{&s.Gets, other.Gets},
		{&s.Puts, other.Puts},
		{&s.Lists, other.Lists},
		{&s.Errors, other.Errors},
		{&s.Errors4xx, other.Errors4xx},
		{&s.Errors5xx, other.Errors5xx},
		{&s.BytesIn, other.BytesIn},
		{&s.BytesOut, other.BytesOut},
	}
	for _, p := range series {
		*p.dst = addInt64Slices(*p.dst, p.src)
	}
}

// BucketMetrics holds all data for one bucket across the available time
// windows. LastMinute is always populated and aggregated per-op (no
// segments). LastHour and LastDay are populated only when
// MetricsHourStats / MetricsDayStats are requested, and carry segmented
// time-series.
type BucketMetrics struct {
	// LastMinute holds per-S3-operation aggregated stats over the last
	// minute. Always present. Map key is the operation name.
	LastMinute map[string]BucketOpStat `json:"lastMinute,omitempty"`

	// LastHour holds 1-minute segmented stats over the last hour.
	// Populated only when MetricsHourStats is requested.
	LastHour *SegmentedBucketStats `json:"lastHour,omitempty"`

	// LastDay holds 15-minute segmented stats over the last day.
	// Populated only when MetricsDayStats is requested.
	LastDay *SegmentedBucketStats `json:"lastDay,omitempty"`
}

// Merge folds other into m. Per-op LastMinute entries are summed; segmented
// windows are right-aligned and summed. A nil other is a no-op.
func (m *BucketMetrics) Merge(other *BucketMetrics) {
	if other == nil {
		return
	}
	// Allocate the destination map once, up front, instead of re-checking
	// for nil on every loop iteration. No allocation happens when other
	// has nothing to contribute, matching the previous behavior.
	if m.LastMinute == nil && len(other.LastMinute) > 0 {
		m.LastMinute = make(map[string]BucketOpStat, len(other.LastMinute))
	}
	for op, oStat := range other.LastMinute {
		// Map values are not addressable; read-modify-write the entry.
		aStat := m.LastMinute[op]
		aStat.Requests += oStat.Requests
		aStat.Errors4xx += oStat.Errors4xx
		aStat.Errors5xx += oStat.Errors5xx
		aStat.BytesIn += oStat.BytesIn
		aStat.BytesOut += oStat.BytesOut
		m.LastMinute[op] = aStat
	}
	if other.LastHour != nil {
		if m.LastHour == nil {
			m.LastHour = &SegmentedBucketStats{}
		}
		m.LastHour.Merge(other.LastHour)
	}
	if other.LastDay != nil {
		if m.LastDay == nil {
			m.LastDay = &SegmentedBucketStats{}
		}
		m.LastDay.Merge(other.LastDay)
	}
}

// BucketAPIMetrics holds per-bucket API statistics. Each bucket's entry
// carries the populated windows; the windows themselves live on
// BucketMetrics so a bucket can carry both LastHour and LastDay at once.
type BucketAPIMetrics struct {
	// N is the number of nodes that reported data.
	// Merge sums this, so after consolidation it reflects all reporters.
	N int `json:"n"`

	// Buckets maps bucket name to its consolidated metrics.
	Buckets map[string]BucketMetrics `json:"buckets,omitempty"`
}

// Merge folds other into b by merging each per-bucket entry. The reporter
// count N is summed. A nil other is a no-op.
func (b *BucketAPIMetrics) Merge(other *BucketAPIMetrics) {
	if other == nil {
		return
	}
	b.N += other.N
	// Allocate the destination map once, up front, instead of re-checking
	// for nil on every loop iteration. No allocation happens when other
	// has no buckets, matching the previous behavior.
	if b.Buckets == nil && len(other.Buckets) > 0 {
		b.Buckets = make(map[string]BucketMetrics, len(other.Buckets))
	}
	for bucket, ob := range other.Buckets {
		// Map values are not addressable; read-modify-write the entry.
		ab := b.Buckets[bucket]
		ab.Merge(&ob)
		b.Buckets[bucket] = ab
	}
}

// addInt64Slices returns the element-wise sum of a and b, right-aligned so
// that slot index len-1 (most recent) always corresponds between both.
// The longer slice is used as the base; if b is longer, a copy is made so
// the caller's backing array is not mutated.
func addInt64Slices(a, b []int64) []int64 {
	base, extra := a, b
	if len(b) > len(a) {
		// b would become the result; clone it so b's backing array is
		// left untouched, then fold the shorter a into the clone.
		base = make([]int64, len(b))
		copy(base, b)
		extra = a
	}
	// Right-align: the last element of extra lands on the last element
	// of base.
	shift := len(base) - len(extra)
	for i := range extra {
		base[shift+i] += extra[i]
	}
	return base
}
Loading
Loading