Skip to content

Commit f825425

Browse files
committed
HYPERFLEET-774 - feat: update aggregation logic
1 parent 77fe53d commit f825425

38 files changed

Lines changed: 3282 additions & 1201 deletions

File tree

cmd/hyperfleet-api/migrate/cmd.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,11 @@ func runMigrateWithError(ctx context.Context, dbConfig *config.DatabaseConfig) e
5858
}
5959
}()
6060

61-
// Use MigrateWithLock to prevent concurrent migrations from multiple pods
62-
if err := db.MigrateWithLock(ctx, connection); err != nil {
61+
if err := db.Migrate(connection.New(ctx)); err != nil {
6362
logger.WithError(ctx, err).Error("Migration failed")
6463
return err
6564
}
6665

66+
logger.Info(ctx, "Migration completed successfully")
6767
return nil
6868
}

docs/api-resources.md

Lines changed: 95 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,100 @@ The status uses Kubernetes-style conditions instead of a single phase field:
249249
- One adapter reports `Available=False` for `observed_generation=1` `Available` transitions to `False`
250250
- One adapter reports `Available=False` for `observed_generation=2` `Available` keeps its `True` status
251251

252+
### Aggregation logic
253+
254+
Description of the aggregation logic for the resource status conditions
255+
256+
- An API that stores resources entities (clusters, nodepools)
257+
- A sentinel that polls the API for changes and triggers messages
258+
- Instances of "adapters":
259+
- Read the messages
260+
- Reconcile the state with the world
261+
- Report back to the API, using statuses "conditions"
262+
263+
Resources keep track of its status, which is affected by the reports from adapters
264+
265+
- Each resource keeps a `generation` property that gets increased on every change
266+
- Adapters associated with a resource, report their state as an array of adapter conditions
267+
- Three of these conditions are always mandatory : `Available`, `Applied`, `Health`
268+
- If one of the mandatory conditions is missing, the report is discarded
269+
- A `observed_generation` field indicating the generation associated with the report
270+
- `observed_time` for when the adapter work was done
271+
- If the reported `observed_generation` is lower than the already stored `observed_generation` for that adapter, the report is discarded
272+
- Each resource has a list of associated "adapters" used to compute the aggregated status.conditions
273+
- Each resource "status.conditions" is array property composed of:
274+
- The `Available` condition of each adapter, named as `<adapter-name>Successful`
275+
- 2 aggregated conditions: `Ready` and `Available` computed from the array of `Available` resource statuses conditions
276+
- Only `Available` condition from adapters is used to compute aggregated conditions
277+
278+
The whole API spec is at: <https://raw.githubusercontent.com/openshift-hyperfleet/hyperfleet-api/refs/heads/main/openapi/openapi.yaml>
279+
280+
The aggregation logic for a resource (cluster/nodepool) works as follows.
281+
282+
**Notation:**
283+
284+
- `X` = report's `observed_generation`
285+
- `G` = resource's current `generation`
286+
- `statuses[]` = all stored adapter condition reports
287+
- `lut` = `last_update_time`
288+
- `ltt` = `last_transition_time`
289+
- `obs_gen` = `observed_generation`
290+
- `obs_time` = report's `observed_time`
291+
- `` = no change
292+
293+
---
294+
295+
#### Discard / Reject Rules
296+
297+
Checked before any aggregation. A discarded or rejected report causes no state change.
298+
299+
| Rule | Condition | Outcome |
300+
|---|---|---|
301+
| `obs_gen` too high | report `observed_generation` > resource `generation` | Discarded |
302+
| Stale adapter report | report `observed_generation` < adapter's stored `observed_generation` | Discarded |
303+
| Missing mandatory conditions | Missing any of `Available`, `Applied`, `Health`, or value not in `{True, False, Unknown}` | Discarded |
304+
| Available=Unknown | Report is valid but `Available=Unknown` | Discarded |
305+
306+
---
307+
308+
#### Lifecycle Events
309+
310+
| Event | Condition | Target | → status | → obs_gen | → lut | → ltt |
311+
|---|---|---|---|---|---|---|
312+
| Creation || `Ready` | `False` | `1` | `now` | `now` |
313+
| Creation || `Available` | `False` | `1` | `now` | `now` |
314+
| Change (→G) | Was `Ready=True` | `Ready` | `False` | `G` | `now` | `now` |
315+
| Change (→G) | Was `Ready=False` | `Ready` | `False` | `G` | `now` | `` |
316+
| Change (→G) || `Available` | unchanged | unchanged | `` | `` |
317+
318+
---
319+
320+
#### Adapter Report Aggregation Matrix
321+
322+
The **Ready** check and **Available** check are independent — both can apply to the same incoming report.
323+
324+
##### Report `Available=True` (obs_gen = X)
325+
326+
| Target | Current State | Required Condition | → status | → lut | → ltt | → obs_gen |
327+
|---|---|---|---|---|---|---|
328+
| `Ready` | `Ready=True` | `X==G` AND all `statuses[].obs_gen==G` AND all `statuses[].status==True` | unchanged | `min(statuses[].lut)` | `` | `` |
329+
| `Ready` | `Ready=False` | `X==G` AND all `statuses[].obs_gen==G` AND all `statuses[].status==True` | **`True`** | `min(statuses[].lut)` | `obs_time` | `` |
330+
| `Ready` | any | Conditions above not met | `` | `` | `` | `` |
331+
| `Available` | `Available=False` | all `statuses[].obs_gen==X` | **`True`** | `min(statuses[].lut)` | `obs_time` | `X` |
332+
| `Available` | `Available=True` | all `statuses[].obs_gen==X` | unchanged | `min(statuses[].lut)` | `` | `X` |
333+
| `Available` | any | Conditions above not met | `` | `` | `` | `` |
334+
335+
##### Report `Available=False` (obs_gen = X)
336+
337+
| Target | Current State | Required Condition | → status | → lut | → ltt | → obs_gen |
338+
|---|---|---|---|---|---|---|
339+
| `Ready` | `Ready=False` | `X==G` | unchanged | `min(statuses[].lut)` | `` | `` |
340+
| `Ready` | `Ready=True` | `X==G` | **`False`** | `obs_time` | `obs_time` | `` |
341+
| `Ready` | any | Conditions above not met | `` | `` | `` | `` |
342+
| `Available` | `Available=False` | all `statuses[].obs_gen==X` | unchanged | `min(statuses[].lut)` | `` | `X` |
343+
| `Available` | `Available=True` | all `statuses[].obs_gen==X` | **`False`** | `obs_time` | `obs_time` | `X` |
344+
| `Available` | any | Conditions above not met | `` | `` | `` | `` |
345+
252346
## NodePool Management
253347

254348
### Endpoints
@@ -456,7 +550,7 @@ The status object contains synthesized conditions computed from adapter reports:
456550
- All above fields plus:
457551
- `observed_generation` - Generation this condition reflects
458552
- `created_time` - When condition was first created (API-managed)
459-
- `last_updated_time` - When adapter last reported (API-managed, from AdapterStatus.last_report_time)
553+
- `last_updated_time` - When this condition was last refreshed (API-managed). For **Available**, always the evaluation time. For **Ready**: when Ready=True, the minimum of `last_report_time` across all required adapters that report Available=True at the current generation; when Ready=False, the evaluation time (so consumers can detect staleness).
460554
- `last_transition_time` - When status last changed (API-managed)
461555

462556
## Parameter Restrictions

pkg/config/db.go

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -44,14 +44,12 @@ type SSLConfig struct {
4444
// Includes fields from HYPERFLEET-694 for connection lifecycle management
4545
type PoolConfig struct {
4646
MaxConnections int `mapstructure:"max_connections" json:"max_connections" validate:"required,min=1,max=200"`
47-
MaxIdleConnections int `mapstructure:"max_idle_connections" json:"max_idle_connections" validate:"min=0"`
48-
ConnMaxLifetime time.Duration `mapstructure:"conn_max_lifetime" json:"conn_max_lifetime"`
49-
ConnMaxIdleTime time.Duration `mapstructure:"conn_max_idle_time" json:"conn_max_idle_time"`
50-
RequestTimeout time.Duration `mapstructure:"request_timeout" json:"request_timeout"`
51-
ConnRetryAttempts int `mapstructure:"conn_retry_attempts" json:"conn_retry_attempts" validate:"min=1"`
52-
ConnRetryInterval time.Duration `mapstructure:"conn_retry_interval" json:"conn_retry_interval"`
53-
// HYPERFLEET-618: prevents indefinite blocking during migrations
54-
AdvisoryLockTimeout time.Duration `mapstructure:"advisory_lock_timeout" json:"advisory_lock_timeout"`
47+
MaxIdleConnections int `mapstructure:"max_idle_connections" json:"max_idle_connections" validate:"min=0"`
48+
ConnMaxLifetime time.Duration `mapstructure:"conn_max_lifetime" json:"conn_max_lifetime"`
49+
ConnMaxIdleTime time.Duration `mapstructure:"conn_max_idle_time" json:"conn_max_idle_time"`
50+
RequestTimeout time.Duration `mapstructure:"request_timeout" json:"request_timeout"`
51+
ConnRetryAttempts int `mapstructure:"conn_retry_attempts" json:"conn_retry_attempts" validate:"min=1"`
52+
ConnRetryInterval time.Duration `mapstructure:"conn_retry_interval" json:"conn_retry_interval"`
5553
}
5654

5755
// MarshalJSON implements custom JSON marshaling to redact sensitive fields
@@ -93,14 +91,13 @@ func NewDatabaseConfig() *DatabaseConfig {
9391
RootCertFile: "",
9492
},
9593
Pool: PoolConfig{
96-
MaxConnections: 50,
97-
MaxIdleConnections: 10,
98-
ConnMaxLifetime: 5 * time.Minute,
99-
ConnMaxIdleTime: 1 * time.Minute,
100-
RequestTimeout: 30 * time.Second,
101-
ConnRetryAttempts: 10,
102-
ConnRetryInterval: 3 * time.Second,
103-
AdvisoryLockTimeout: 5 * time.Minute, // HYPERFLEET-618: prevents indefinite blocking during migrations
94+
MaxConnections: 50,
95+
MaxIdleConnections: 10,
96+
ConnMaxLifetime: 5 * time.Minute,
97+
ConnMaxIdleTime: 1 * time.Minute,
98+
RequestTimeout: 30 * time.Second,
99+
ConnRetryAttempts: 10,
100+
ConnRetryInterval: 3 * time.Second,
104101
},
105102
}
106103
}

pkg/config/loader.go

Lines changed: 31 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -227,41 +227,43 @@ func (l *ConfigLoader) validateConfig(config *ApplicationConfig) error {
227227
}
228228

229229
// handleJSONArrayEnvVars processes environment variables containing JSON arrays
230-
// Viper doesn't automatically parse JSON from env vars, so we handle this explicitly
231-
// Used for: HYPERFLEET_ADAPTERS_CLUSTER_ADAPTERS and HYPERFLEET_ADAPTERS_NODEPOOL_ADAPTERS
230+
// Viper doesn't automatically parse JSON from env vars, so we handle this explicitly.
231+
// Each viper key is filled from the first non-empty env var in the list (canonical name first, then aliases).
232232
func (l *ConfigLoader) handleJSONArrayEnvVars(ctx context.Context) error {
233-
// Map of env var name -> viper key
234-
jsonArrayMappings := map[string]string{
235-
EnvPrefix + "_ADAPTERS_REQUIRED_CLUSTER": "adapters.required.cluster",
236-
EnvPrefix + "_ADAPTERS_REQUIRED_NODEPOOL": "adapters.required.nodepool",
233+
// viper key -> ordered list of env var names (first one set wins)
234+
clusterEnvVars := []string{
235+
EnvPrefix + "_ADAPTERS_REQUIRED_CLUSTER",
236+
EnvPrefix + "_CLUSTER_ADAPTERS", // alias for user convenience
237+
}
238+
nodepoolEnvVars := []string{
239+
EnvPrefix + "_ADAPTERS_REQUIRED_NODEPOOL",
240+
EnvPrefix + "_NODEPOOL_ADAPTERS", // alias for user convenience
237241
}
238242

239-
for envVar, viperKey := range jsonArrayMappings {
240-
jsonValue := os.Getenv(envVar)
241-
if jsonValue == "" {
242-
continue
243-
}
244-
245-
// Parse JSON array
246-
var arrayValue []string
247-
if err := json.Unmarshal([]byte(jsonValue), &arrayValue); err != nil {
248-
return fmt.Errorf("failed to parse %s as JSON array: %w (value: %s)", envVar, err, jsonValue)
243+
setFromEnvVars := func(viperKey string, envVars []string) error {
244+
for _, envVar := range envVars {
245+
jsonValue := os.Getenv(envVar)
246+
if jsonValue == "" {
247+
continue
248+
}
249+
var arrayValue []string
250+
if err := json.Unmarshal([]byte(jsonValue), &arrayValue); err != nil {
251+
return fmt.Errorf("failed to parse %s as JSON array: %w (value: %s)", envVar, err, jsonValue)
252+
}
253+
// Set() overrides Viper's auto-env CSV parsing so JSON arrays are correct.
254+
l.viper.Set(viperKey, arrayValue)
255+
logger.With(ctx, "env_var", envVar, "count", len(arrayValue)).Debug("Parsed JSON array from environment")
256+
return nil
249257
}
250-
251-
// Always set the parsed JSON array value to override Viper's auto-env CSV parsing.
252-
// Viper's AutomaticEnv treats comma-separated strings as arrays, incorrectly parsing
253-
// JSON arrays like '["a","b"]' as ["[\"a\"", "\"b\"]"] instead of ["a", "b"].
254-
//
255-
// We use Set() to ensure proper JSON parsing overrides Viper's CSV parsing.
256-
// This maintains ENV > Config > Default precedence for adapters.
257-
//
258-
// NOTE: Adapters currently have no CLI flags (see bindFlags line 494).
259-
// If CLI flags are added in the future, this code needs updating to check
260-
// if the value came from a flag before calling Set().
261-
l.viper.Set(viperKey, arrayValue)
262-
logger.With(ctx, "env_var", envVar, "count", len(arrayValue)).Debug("Parsed JSON array from environment")
258+
return nil
263259
}
264260

261+
if err := setFromEnvVars("adapters.required.cluster", clusterEnvVars); err != nil {
262+
return err
263+
}
264+
if err := setFromEnvVars("adapters.required.nodepool", nodepoolEnvVars); err != nil {
265+
return err
266+
}
265267
return nil
266268
}
267269

pkg/db/advisory_locks.go

Lines changed: 0 additions & 124 deletions
This file was deleted.

0 commit comments

Comments
 (0)