From adddf4455eda045f7a1f965dbb5fc541fdfe617e Mon Sep 17 00:00:00 2001 From: Muhamad Azamy Date: Tue, 2 May 2023 17:53:28 +0200 Subject: [PATCH] Make sure on boot we exclude the current workload from cap Fixes #1960 On boot we have to reprocess all active workloads, but during this time we also need to exclude the "used capacity" of that workload from the theoritical total used capacity on the node otherwise the node might think that there are no enough capcity available --- pkg/primitives/statistics.go | 33 +++++++++++++++++++------------- pkg/provision/interface.go | 6 +++++- pkg/provision/storage/storage.go | 9 +++++++-- 3 files changed, 32 insertions(+), 16 deletions(-) diff --git a/pkg/primitives/statistics.go b/pkg/primitives/statistics.go index c082fdf82..3d4b5e353 100644 --- a/pkg/primitives/statistics.go +++ b/pkg/primitives/statistics.go @@ -70,8 +70,8 @@ type activeCounters struct { } // Get all used capacity from storage + reserved / deployments count and workloads count -func (s *Statistics) active() (activeCounters, error) { - storageCap, err := s.storage.Capacity() +func (s *Statistics) active(exclude ...provision.Exclude) (activeCounters, error) { + storageCap, err := s.storage.Capacity(exclude...) storageCap.Cap.Add(&s.reserved) return activeCounters{ storageCap.Cap, @@ -87,12 +87,13 @@ func (s *Statistics) Total() gridtypes.Capacity { // getUsableMemoryBytes returns the used capacity by *reservations* and usable free memory. for the memory // it takes into account reserved memory for the system -func (s *Statistics) getUsableMemoryBytes() (gridtypes.Capacity, gridtypes.Unit, error) { +// excluding (not including it as 'used' any workload or deployment that matches the exclusion list) +func (s *Statistics) getUsableMemoryBytes(exclude ...provision.Exclude) (gridtypes.Capacity, gridtypes.Unit, error) { // [ ] // [[R][ WL ] ] // [[ actual ] ] - activeCounters, err := s.active() + activeCounters, err := s.active(exclude...) cap := activeCounters.cap if err != nil { return cap, 0, err @@ -111,12 +112,23 @@ func (s *Statistics) getUsableMemoryBytes() (gridtypes.Capacity, gridtypes.Unit, return cap, usable, nil } -func (s *Statistics) hasEnoughCapacity(required *gridtypes.Capacity) (gridtypes.Capacity, error) { - // checks memory - used, usable, err := s.getUsableMemoryBytes() +func (s *Statistics) hasEnoughCapacity(wl *gridtypes.WorkloadWithID) (gridtypes.Capacity, error) { + required, err := wl.Capacity() + if err != nil { + return gridtypes.Capacity{}, errors.Wrap(err, "failed to calculate workload needed capacity") + } + + // get used capacity by ALL workloads excluding this workload + // we do that by providing an exclusion list + used, usable, err := s.getUsableMemoryBytes(func(dl_ *gridtypes.Deployment, wl_ *gridtypes.Workload) bool { + id, _ := gridtypes.NewWorkloadID(dl_.TwinID, dl_.ContractID, wl_.Name) + return id == wl.ID + }) + if err != nil { return used, errors.Wrap(err, "failed to get available memory") } + if required.MRU > usable { return used, fmt.Errorf("cannot fulfil required memory size %d bytes out of usable %d bytes", required.MRU, usable) } @@ -132,12 +144,7 @@ func (s *Statistics) Initialize(ctx context.Context) error { // Provision implements the provisioner interface func (s *Statistics) Provision(ctx context.Context, wl *gridtypes.WorkloadWithID) (result gridtypes.Result, err error) { - needed, err := wl.Capacity() - if err != nil { - return result, errors.Wrap(err, "failed to calculate workload needed capacity") - } - - current, err := s.hasEnoughCapacity(&needed) + current, err := s.hasEnoughCapacity(wl) if err != nil { return result, errors.Wrap(err, "failed to satisfy required capacity") } diff --git a/pkg/provision/interface.go b/pkg/provision/interface.go index 85a273fca..eb6dbcfa8 100644 --- a/pkg/provision/interface.go +++ b/pkg/provision/interface.go @@ -104,6 +104,10 @@ type StorageCapacity struct { Workloads int } +// Used with Storage interface to compute capacity, exclude any deployment +// and or workload that returns true from the capacity calculation. +type Exclude = func(dl *gridtypes.Deployment, wl *gridtypes.Workload) bool + // Storage interface type Storage interface { // Create a new deployment in storage, it sets the initial transactions @@ -132,7 +136,7 @@ type Storage interface { // ByTwin return list of deployments for a twin ByTwin(twin uint32) ([]uint64, error) // return total capacity and active deployments - Capacity() (StorageCapacity, error) + Capacity(exclude ...Exclude) (StorageCapacity, error) } // Janitor interface diff --git a/pkg/provision/storage/storage.go b/pkg/provision/storage/storage.go index a230933e2..5949c7a49 100644 --- a/pkg/provision/storage/storage.go +++ b/pkg/provision/storage/storage.go @@ -680,7 +680,7 @@ func (b *BoltStorage) ByTwin(twin uint32) ([]uint64, error) { return deployments, err } -func (b *BoltStorage) Capacity() (storageCap provision.StorageCapacity, err error) { +func (b *BoltStorage) Capacity(exclude ...provision.Exclude) (storageCap provision.StorageCapacity, err error) { twins, err := b.Twins() if err != nil { return provision.StorageCapacity{}, err @@ -700,11 +700,16 @@ func (b *BoltStorage) Capacity() (storageCap provision.StorageCapacity, err erro } isActive := false + next: for _, wl := range deployment.Workloads { if !wl.Result.State.IsOkay() { continue } - + for _, exc := range exclude { + if exc(&deployment, &wl) { + continue next + } + } c, err := wl.Capacity() if err != nil { return provision.StorageCapacity{}, err