Skip to content

Commit

Permalink
Make sure on boot we exclude the current workload from cap
Browse files Browse the repository at this point in the history
Fixes #1960

On boot we have to reprocess all active workloads, but during this time
we also need to exclude the "used capacity" of that workload from the
theoritical total used capacity on the node otherwise the node might
think that there are no enough capcity available
  • Loading branch information
muhamadazmy committed May 2, 2023
1 parent fc6b894 commit adddf44
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 16 deletions.
33 changes: 20 additions & 13 deletions pkg/primitives/statistics.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ type activeCounters struct {
}

// Get all used capacity from storage + reserved / deployments count and workloads count
func (s *Statistics) active() (activeCounters, error) {
storageCap, err := s.storage.Capacity()
func (s *Statistics) active(exclude ...provision.Exclude) (activeCounters, error) {
storageCap, err := s.storage.Capacity(exclude...)
storageCap.Cap.Add(&s.reserved)
return activeCounters{
storageCap.Cap,
Expand All @@ -87,12 +87,13 @@ func (s *Statistics) Total() gridtypes.Capacity {

// getUsableMemoryBytes returns the used capacity by *reservations* and usable free memory. for the memory
// it takes into account reserved memory for the system
func (s *Statistics) getUsableMemoryBytes() (gridtypes.Capacity, gridtypes.Unit, error) {
// excluding (not including it as 'used' any workload or deployment that matches the exclusion list)
func (s *Statistics) getUsableMemoryBytes(exclude ...provision.Exclude) (gridtypes.Capacity, gridtypes.Unit, error) {
// [ ]
// [[R][ WL ] ]
// [[ actual ] ]

activeCounters, err := s.active()
activeCounters, err := s.active(exclude...)
cap := activeCounters.cap
if err != nil {
return cap, 0, err
Expand All @@ -111,12 +112,23 @@ func (s *Statistics) getUsableMemoryBytes() (gridtypes.Capacity, gridtypes.Unit,
return cap, usable, nil
}

func (s *Statistics) hasEnoughCapacity(required *gridtypes.Capacity) (gridtypes.Capacity, error) {
// checks memory
used, usable, err := s.getUsableMemoryBytes()
func (s *Statistics) hasEnoughCapacity(wl *gridtypes.WorkloadWithID) (gridtypes.Capacity, error) {
required, err := wl.Capacity()
if err != nil {
return gridtypes.Capacity{}, errors.Wrap(err, "failed to calculate workload needed capacity")
}

// get used capacity by ALL workloads excluding this workload
// we do that by providing an exclusion list
used, usable, err := s.getUsableMemoryBytes(func(dl_ *gridtypes.Deployment, wl_ *gridtypes.Workload) bool {
id, _ := gridtypes.NewWorkloadID(dl_.TwinID, dl_.ContractID, wl_.Name)
return id == wl.ID
})

if err != nil {
return used, errors.Wrap(err, "failed to get available memory")
}

if required.MRU > usable {
return used, fmt.Errorf("cannot fulfil required memory size %d bytes out of usable %d bytes", required.MRU, usable)
}
Expand All @@ -132,12 +144,7 @@ func (s *Statistics) Initialize(ctx context.Context) error {

// Provision implements the provisioner interface
func (s *Statistics) Provision(ctx context.Context, wl *gridtypes.WorkloadWithID) (result gridtypes.Result, err error) {
needed, err := wl.Capacity()
if err != nil {
return result, errors.Wrap(err, "failed to calculate workload needed capacity")
}

current, err := s.hasEnoughCapacity(&needed)
current, err := s.hasEnoughCapacity(wl)
if err != nil {
return result, errors.Wrap(err, "failed to satisfy required capacity")
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/provision/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ type StorageCapacity struct {
Workloads int
}

// Used with Storage interface to compute capacity, exclude any deployment
// and or workload that returns true from the capacity calculation.
type Exclude = func(dl *gridtypes.Deployment, wl *gridtypes.Workload) bool

// Storage interface
type Storage interface {
// Create a new deployment in storage, it sets the initial transactions
Expand Down Expand Up @@ -132,7 +136,7 @@ type Storage interface {
// ByTwin return list of deployments for a twin
ByTwin(twin uint32) ([]uint64, error)
// return total capacity and active deployments
Capacity() (StorageCapacity, error)
Capacity(exclude ...Exclude) (StorageCapacity, error)
}

// Janitor interface
Expand Down
9 changes: 7 additions & 2 deletions pkg/provision/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ func (b *BoltStorage) ByTwin(twin uint32) ([]uint64, error) {
return deployments, err
}

func (b *BoltStorage) Capacity() (storageCap provision.StorageCapacity, err error) {
func (b *BoltStorage) Capacity(exclude ...provision.Exclude) (storageCap provision.StorageCapacity, err error) {
twins, err := b.Twins()
if err != nil {
return provision.StorageCapacity{}, err
Expand All @@ -700,11 +700,16 @@ func (b *BoltStorage) Capacity() (storageCap provision.StorageCapacity, err erro
}

isActive := false
next:
for _, wl := range deployment.Workloads {
if !wl.Result.State.IsOkay() {
continue
}

for _, exc := range exclude {
if exc(&deployment, &wl) {
continue next
}
}
c, err := wl.Capacity()
if err != nil {
return provision.StorageCapacity{}, err
Expand Down

0 comments on commit adddf44

Please sign in to comment.