Skip to content

Commit c830f14

Browse files
committed
HYPERFLEET-774 - feat: update aggregation logic
1 parent 77fe53d commit c830f14

12 files changed

Lines changed: 1932 additions & 515 deletions

docs/api-resources.md

Lines changed: 100 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ POST /api/hyperfleet/v1/clusters/{cluster_id}/statuses
5555
"status": "False",
5656
"reason": "AwaitingAdapters",
5757
"message": "Waiting for adapters to report status",
58-
"observed_generation": 0,
58+
"observed_generation": 1,
5959
"created_time": "2025-01-01T00:00:00Z",
6060
"last_updated_time": "2025-01-01T00:00:00Z",
6161
"last_transition_time": "2025-01-01T00:00:00Z"
@@ -65,7 +65,7 @@ POST /api/hyperfleet/v1/clusters/{cluster_id}/statuses
6565
"status": "False",
6666
"reason": "AwaitingAdapters",
6767
"message": "Waiting for adapters to report status",
68-
"observed_generation": 0,
68+
"observed_generation": 1,
6969
"created_time": "2025-01-01T00:00:00Z",
7070
"last_updated_time": "2025-01-01T00:00:00Z",
7171
"last_transition_time": "2025-01-01T00:00:00Z"
@@ -249,6 +249,101 @@ The status uses Kubernetes-style conditions instead of a single phase field:
249249
- One adapter reports `Available=False` for `observed_generation=1` `Available` transitions to `False`
250250
- One adapter reports `Available=False` for `observed_generation=2` `Available` keeps its `True` status
251251

252+
### Aggregation logic
253+
254+
Description of the aggregation logic for the resource status conditions
255+
256+
- An API that stores resources entities (clusters, nodepools)
257+
- A sentinel that polls the API for changes and triggers messages
258+
- Instances of "adapters":
259+
- Read the messages
260+
- Reconcile the state with the world
261+
- Report back to the API, using statuses "conditions"
262+
263+
Resources keep track of its status, which is affected by the reports from adapters
264+
265+
- Each resource keeps a `generation` property that gets increased on every change
266+
- Adapters associated with a resource, report their state as an array of adapter conditions
267+
- Three of these conditions are always mandatory : `Available`, `Applied`, `Health`
268+
- If one of the mandatory conditions is missing, the report is discarded
269+
- A `observed_generation` field indicating the generation associated with the report
270+
- `observed_time` for when the adapter work was done
271+
- If the reported `observed_generation` is lower than the already stored `observed_generation` for that adapter, the report is discarded
272+
- Each resource has a list of associated "adapters" used to compute the aggregated status.conditions
273+
- Each resource "status.conditions" is array property composed of:
274+
- The `Available` condition of each adapter, named as `<adapter-name>Successful`
275+
- 2 aggregated conditions: `Ready` and `Available` computed from the array of `Available` resource statuses conditions
276+
- Only `Available` condition from adapters is used to compute aggregated conditions
277+
278+
The whole API spec is at: <https://raw.githubusercontent.com/openshift-hyperfleet/hyperfleet-api/refs/heads/main/openapi/openapi.yaml>
279+
280+
The aggregation logic for a resource (cluster/nodepool) works as follows.
281+
282+
**Notation:**
283+
284+
- `X` = report's `observed_generation`
285+
- `G` = resource's current `generation`
286+
- `statuses[]` = all stored adapter condition reports
287+
- `lut` = adapter's `last_report_time`
288+
- `ltt` = `last_transition_time`
289+
- `obs_gen` = `observed_generation`
290+
- `obs_time` = report's `observed_time`
291+
- `` = no change
292+
293+
---
294+
295+
#### Discard / Reject Rules
296+
297+
Checked before any aggregation. A discarded or rejected report causes no state change.
298+
299+
| Rule | Condition | Outcome |
300+
|---|---|---|
301+
| `obs_gen` too high | report `observed_generation` > resource `generation` | Discarded |
302+
| Stale adapter report | report `observed_generation` < adapter's stored `observed_generation` | Discarded |
303+
| Missing mandatory conditions | Missing any of `Available`, `Applied`, `Health`, or value not in `{True, False, Unknown}` | Discarded |
304+
| Available=Unknown | Report is valid but `Available=Unknown` | Discarded |
305+
306+
---
307+
308+
#### Lifecycle Events
309+
310+
| Event | Condition | Target | → status | → obs_gen | → lut | → ltt |
311+
|---|---|---|---|---|---|---|
312+
| Creation || `Ready` | `False` | `1` | `now` | `now` |
313+
| Creation || `Available` | `False` | `1` | `now` | `now` |
314+
| Change (→G) | Was `Ready=True` | `Ready` | `False` | `G` | `now` | `now` |
315+
| Change (→G) | Was `Ready=False` | `Ready` | `False` | `G` | `now` | `` |
316+
| Change (→G) || `Available` | unchanged | unchanged | `` | `` |
317+
318+
---
319+
320+
#### Adapter Report Aggregation Matrix
321+
322+
The **Ready** check and **Available** check are independent — both can apply to the same incoming report.
323+
324+
##### Report `Available=True` (obs_gen = X)
325+
326+
| Target | Current State | Required Condition | → status | → lut | → ltt | → obs_gen |
327+
|---|---|---|---|---|---|---|
328+
| `Ready` | `Ready=True` | `X==G` AND all `statuses[].obs_gen==G` AND all `statuses[].status==True` | unchanged | `min(adapter last_report_time)` | `` | `` |
329+
| `Ready` | `Ready=False` | `X==G` AND all `statuses[].obs_gen==G` AND all `statuses[].status==True` | **`True`** | `min(adapter last_report_time)` | `obs_time` | `` |
330+
| `Ready` | `Ready=False` | Any required adapter has no stored status | `` | `now` | `` | `` |
331+
| `Ready` | any | Conditions above not met | `` | `` | `` | `` |
332+
| `Available` | `Available=False` | all `statuses[].obs_gen==X` | **`True`** | `min(adapter last_report_time)` | `obs_time` | `X` |
333+
| `Available` | `Available=True` | all `statuses[].obs_gen==X` | unchanged | `min(adapter last_report_time)` | `` | `X` |
334+
| `Available` | any | Conditions above not met | `` | `` | `` | `` |
335+
336+
##### Report `Available=False` (obs_gen = X)
337+
338+
| Target | Current State | Required Condition | → status | → lut | → ltt | → obs_gen |
339+
|---|---|---|---|---|---|---|
340+
| `Ready` | `Ready=False` | `X==G` | unchanged | `min(adapter last_report_time)` | `` | `` |
341+
| `Ready` | `Ready=True` | `X==G` | **`False`** | `obs_time` | `obs_time` | `` |
342+
| `Ready` | any | Conditions above not met | `` | `` | `` | `` |
343+
| `Available` | `Available=False` | all `statuses[].obs_gen==X` | unchanged | `min(adapter last_report_time)` | `` | `X` |
344+
| `Available` | `Available=True` | all `statuses[].obs_gen==X` | **`False`** | `obs_time` | `obs_time` | `X` |
345+
| `Available` | any | Conditions above not met | `` | `` | `` | `` |
346+
252347
## NodePool Management
253348

254349
### Endpoints
@@ -307,7 +402,7 @@ POST /api/hyperfleet/v1/clusters/{cluster_id}/nodepools/{nodepool_id}/statuses
307402
"status": "False",
308403
"reason": "AwaitingAdapters",
309404
"message": "Waiting for adapters to report status",
310-
"observed_generation": 0,
405+
"observed_generation": 1,
311406
"created_time": "2025-01-01T00:00:00Z",
312407
"last_updated_time": "2025-01-01T00:00:00Z",
313408
"last_transition_time": "2025-01-01T00:00:00Z"
@@ -317,7 +412,7 @@ POST /api/hyperfleet/v1/clusters/{cluster_id}/nodepools/{nodepool_id}/statuses
317412
"status": "False",
318413
"reason": "AwaitingAdapters",
319414
"message": "Waiting for adapters to report status",
320-
"observed_generation": 0,
415+
"observed_generation": 1,
321416
"created_time": "2025-01-01T00:00:00Z",
322417
"last_updated_time": "2025-01-01T00:00:00Z",
323418
"last_transition_time": "2025-01-01T00:00:00Z"
@@ -456,7 +551,7 @@ The status object contains synthesized conditions computed from adapter reports:
456551
- All above fields plus:
457552
- `observed_generation` - Generation this condition reflects
458553
- `created_time` - When condition was first created (API-managed)
459-
- `last_updated_time` - When adapter last reported (API-managed, from AdapterStatus.last_report_time)
554+
- `last_updated_time` - When this condition was last refreshed (API-managed). For **Available**, always the evaluation time. For **Ready**: when Ready=True, the minimum of `last_report_time` across all required adapters that report Available=True at the current generation; when Ready=False, the evaluation time (so consumers can detect staleness).
460555
- `last_transition_time` - When status last changed (API-managed)
461556

462557
## Parameter Restrictions

pkg/dao/adapter_status.go

Lines changed: 39 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@ type AdapterStatusDao interface {
1919
Get(ctx context.Context, id string) (*api.AdapterStatus, error)
2020
Create(ctx context.Context, adapterStatus *api.AdapterStatus) (*api.AdapterStatus, error)
2121
Replace(ctx context.Context, adapterStatus *api.AdapterStatus) (*api.AdapterStatus, error)
22-
Upsert(ctx context.Context, adapterStatus *api.AdapterStatus) (*api.AdapterStatus, error)
22+
// Upsert creates or replaces an adapter status. The second return value reports whether the
23+
// write was actually applied: false means the incoming observed_generation was stale (either
24+
// detected immediately or lost a race to a concurrent write with a higher generation).
25+
Upsert(ctx context.Context, adapterStatus *api.AdapterStatus) (*api.AdapterStatus, bool, error)
2326
Delete(ctx context.Context, id string) error
2427
FindByResource(ctx context.Context, resourceType, resourceID string) (api.AdapterStatusList, error)
2528
FindByResourcePaginated(
@@ -72,49 +75,66 @@ func (d *sqlAdapterStatusDao) Replace(
7275
return adapterStatus, nil
7376
}
7477

75-
// Upsert creates or updates an adapter status based on resource_type, resource_id, and adapter
76-
// This implements the upsert semantic required by the new API spec
78+
// Upsert creates or updates an adapter status based on resource_type, resource_id, and adapter.
79+
// The UPDATE path includes a WHERE predicate on observed_generation so that a stale write can
80+
// never overwrite a newer generation, even under concurrent requests.
81+
// Returns (status, true, nil) when the write was applied, or (existing, false, nil) when the
82+
// incoming observed_generation was stale (fast-path check) or lost a race to a concurrent write.
7783
func (d *sqlAdapterStatusDao) Upsert(
7884
ctx context.Context, adapterStatus *api.AdapterStatus,
79-
) (*api.AdapterStatus, error) {
85+
) (*api.AdapterStatus, bool, error) {
8086
g2 := (*d.sessionFactory).New(ctx)
8187

82-
// Try to find existing adapter status
88+
// Try to find existing adapter status.
8389
existing, err := d.FindByResourceAndAdapter(
8490
ctx, adapterStatus.ResourceType, adapterStatus.ResourceID, adapterStatus.Adapter,
8591
)
86-
8792
if err != nil {
88-
// If not found, create new
8993
if errors.Is(err, gorm.ErrRecordNotFound) {
90-
return d.Create(ctx, adapterStatus)
94+
created, createErr := d.Create(ctx, adapterStatus)
95+
if createErr != nil {
96+
return nil, false, createErr
97+
}
98+
return created, true, nil
9199
}
92-
// Other errors
93100
db.MarkForRollback(ctx, err)
94-
return nil, err
101+
return nil, false, err
102+
}
103+
104+
// Fast-path stale check: if the stored generation is already strictly newer, skip the write.
105+
if existing.ObservedGeneration > adapterStatus.ObservedGeneration {
106+
return existing, false, nil
95107
}
96108

97-
// Update existing record
98-
// Keep the original ID and CreatedTime
109+
// Prepare the update: keep original ID and CreatedTime.
99110
adapterStatus.ID = existing.ID
100111
if existing.CreatedTime != nil {
101112
adapterStatus.CreatedTime = existing.CreatedTime
102113
}
103114

104-
// Update LastReportTime to now
115+
// Update LastReportTime to now.
105116
now := time.Now()
106117
adapterStatus.LastReportTime = &now
107118

108-
// Preserve LastTransitionTime for conditions whose status hasn't changed
119+
// Preserve LastTransitionTime for conditions whose status hasn't changed.
109120
adapterStatus.Conditions = preserveLastTransitionTime(existing.Conditions, adapterStatus.Conditions)
110121

111-
// Save (update) the record
112-
if err := g2.Omit(clause.Associations).Save(adapterStatus).Error; err != nil {
113-
db.MarkForRollback(ctx, err)
114-
return nil, err
122+
// Atomic conditional UPDATE: only applies when the stored observed_generation is still <=
123+
// the incoming one. A concurrent request that wrote a higher generation will cause
124+
// RowsAffected to be 0, signalling a no-op.
125+
result := g2.Omit(clause.Associations).
126+
Where("observed_generation <= ?", adapterStatus.ObservedGeneration).
127+
Save(adapterStatus)
128+
if result.Error != nil {
129+
db.MarkForRollback(ctx, result.Error)
130+
return nil, false, result.Error
131+
}
132+
if result.RowsAffected == 0 {
133+
// A concurrent write with a higher generation won the race.
134+
return existing, false, nil
115135
}
116136

117-
return adapterStatus, nil
137+
return adapterStatus, true, nil
118138
}
119139

120140
func (d *sqlAdapterStatusDao) Delete(ctx context.Context, id string) error {

pkg/services/CLAUDE.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,18 @@ func NewClusterService(dao, adapterStatusDao, config) ClusterService
2222
## Status Aggregation
2323

2424
`UpdateClusterStatusFromAdapters()` in `cluster.go` synthesizes two top-level conditions:
25-
- **Available**: True if all required adapters report `Available=True` (any generation)
25+
26+
- **Available**: True when all required adapters report `Available=True` at the same `observed_generation` (not necessarily the current resource generation). When adapters are at different generations, Available preserves its previous value (last-known-good semantics). `ObservedGeneration` = the common adapter generation when consistent; preserved from existing state otherwise.
2627
- **Ready**: True if all adapters report `Available=True` AND `observed_generation` matches current generation
2728

29+
Ready's `LastUpdatedTime` is computed in `status_aggregation.computeReadyLastUpdated`: when Ready=False it is the minimum of `LastReportTime` across all required adapters (falls back to `now` if any required adapter has no stored status yet); when Ready=True it is the minimum of `LastReportTime` across required adapters that have Available=True at the current generation. True→False transitions override this with the triggering adapter's `observedTime`.
30+
2831
`ProcessAdapterStatus()` validates mandatory conditions (`Available`, `Applied`, `Health`) before persisting. Rejects `Available=Unknown` on subsequent reports (only allowed on first report).
2932

3033
## GenericService
3134

3235
`generic.go` provides `List()` with pagination, search, and ordering.
36+
3337
- `ListArguments` has Page, Size, Search, OrderBy, Fields, Preloads
3438
- Search validation: `SearchDisallowedFields` map blocks searching certain fields per resource type
3539
- Default ordering: `created_time desc`

pkg/services/adapter_status.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,11 +71,11 @@ func (s *sqlAdapterStatusService) Replace(
7171
func (s *sqlAdapterStatusService) Upsert(
7272
ctx context.Context, adapterStatus *api.AdapterStatus,
7373
) (*api.AdapterStatus, *errors.ServiceError) {
74-
adapterStatus, err := s.adapterStatusDao.Upsert(ctx, adapterStatus)
74+
result, _, err := s.adapterStatusDao.Upsert(ctx, adapterStatus)
7575
if err != nil {
7676
return nil, handleCreateError("AdapterStatus", err)
7777
}
78-
return adapterStatus, nil
78+
return result, nil
7979
}
8080

8181
func (s *sqlAdapterStatusService) Delete(ctx context.Context, id string) *errors.ServiceError {

0 commit comments

Comments
 (0)