Skip to content

Commit

Permalink
Merge pull request lavanet#1055 from lavanet/CNS-add-adjustment-to-pr…
Browse files Browse the repository at this point in the history
…oviders

CNS-add-adjustment-to-providers
  • Loading branch information
Yaroms authored Dec 24, 2023
2 parents d11373d + 324aa10 commit 41bb701
Show file tree
Hide file tree
Showing 26 changed files with 1,209 additions and 70 deletions.
11 changes: 11 additions & 0 deletions proto/lavanet/lava/subscription/adjustment.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
syntax = "proto3";
package lavanet.lava.subscription;

option go_package = "github.com/lavanet/lava/x/subscription/types";

message Adjustment {
string index = 1;
uint64 adjustedUsage = 2;
uint64 totalUsage = 3;
}

2 changes: 2 additions & 0 deletions proto/lavanet/lava/subscription/genesis.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package lavanet.lava.subscription;

import "gogoproto/gogo.proto";
import "lavanet/lava/subscription/params.proto";
import "lavanet/lava/subscription/adjustment.proto";
import "lavanet/lava/fixationstore/fixation.proto";
import "lavanet/lava/timerstore/timer.proto";
// this line is used by starport scaffolding # genesis/proto/import
Expand All @@ -16,5 +17,6 @@ message GenesisState {
lavanet.lava.timerstore.GenesisState subsTS = 3 [(gogoproto.nullable) = false];
lavanet.lava.fixationstore.GenesisState cuTrackerFS = 4 [(gogoproto.nullable) = false];
lavanet.lava.timerstore.GenesisState cuTrackerTS = 5 [(gogoproto.nullable) = false];
repeated Adjustment adjustments = 6 [(gogoproto.nullable) = false];
// this line is used by starport scaffolding # genesis/proto/state
}
2 changes: 1 addition & 1 deletion protocol/chainlib/common_test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func SetupForTests(t *testing.T, numOfProviders int, specID string, getToTopMost
var stake int64 = 50000000000

// subscribe consumer
testcommon.BuySubscription(t, ts.Ctx, *ts.Keepers, *ts.Servers, ts.Consumer, ts.Plan.Index)
testcommon.BuySubscription(ts.Ctx, *ts.Keepers, *ts.Servers, ts.Consumer, ts.Plan.Index)

// stake providers
for _, provider := range ts.Providers {
Expand Down
2 changes: 1 addition & 1 deletion testutil/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func StakeAccount(t *testing.T, ctx context.Context, keepers testkeeper.Keepers,
require.Nil(t, err)
}

func BuySubscription(t *testing.T, ctx context.Context, keepers testkeeper.Keepers, servers testkeeper.Servers, acc sigs.Account, plan string) {
func BuySubscription(ctx context.Context, keepers testkeeper.Keepers, servers testkeeper.Servers, acc sigs.Account, plan string) {
servers.SubscriptionServer.Buy(ctx, &subscriptiontypes.MsgBuy{Creator: acc.Addr.String(), Consumer: acc.Addr.String(), Index: plan, Duration: 1})
}

Expand Down
80 changes: 74 additions & 6 deletions testutil/common/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,10 @@ type Tester struct {
}

const (
PROVIDER string = "provider"
CONSUMER string = "consumer"
VALIDATOR string = "validator"
PROVIDER string = "provider_"
CONSUMER string = "consumer_"
VALIDATOR string = "validator_"
DEVELOPER string = "developer_"
)

func NewTester(t *testing.T) *Tester {
Expand Down Expand Up @@ -160,21 +161,24 @@ func (ts *Tester) StakeProviderExtra(

// if necessary, generate mock endpoints
if endpoints == nil {
apiInterface := spec.ApiCollections[0].CollectionData.ApiInterface
apiInterfaces := []string{}
for _, apiCollection := range spec.ApiCollections {
apiInterfaces = append(apiInterfaces, apiCollection.CollectionData.ApiInterface)
}
geolocations := planstypes.GetGeolocationsFromUint(geoloc)

for _, geo := range geolocations {
endpoint := epochstoragetypes.Endpoint{
IPPORT: "123",
ApiInterfaces: []string{apiInterface},
ApiInterfaces: apiInterfaces,
Geolocation: int32(geo),
}
endpoints = append(endpoints, endpoint)
}
}

stake := sdk.NewCoin(ts.TokenDenom(), sdk.NewInt(amount))
_, err := ts.TxPairingStakeProvider(addr, spec.Name, stake, endpoints, geoloc, moniker)
_, err := ts.TxPairingStakeProvider(addr, spec.Index, stake, endpoints, geoloc, moniker)

return err
}
Expand Down Expand Up @@ -960,12 +964,76 @@ func (ts *Tester) AdvanceMonthsFrom(from time.Time, months int) *Tester {
return ts
}

func (ts *Tester) BondDenom() string {
return ts.Keepers.StakingKeeper.BondDenom(sdk.UnwrapSDKContext(ts.Ctx))
}

// AdvanceMonth advanced blocks by given months, like AdvanceMonthsFrom,
// starting from the current block's timestamp
func (ts *Tester) AdvanceMonths(months int) *Tester {
return ts.AdvanceMonthsFrom(ts.BlockTime(), months)
}

func (ts *Tester) SetupForTests(getToTopMostPath string, specId string, validators int, subscriptions int, projectsInSubscription int, providers int) error {
var balance int64 = 100000000000

start := len(ts.Accounts(VALIDATOR))
for i := 0; i < validators; i++ {
acc, _ := ts.AddAccount(VALIDATOR, start+i, balance)
ts.TxCreateValidator(acc, math.NewInt(balance))
}

sdkContext := sdk.UnwrapSDKContext(ts.Ctx)
spec, err := testkeeper.GetASpec(specId, getToTopMostPath, &sdkContext, &ts.Keepers.Spec)
if err != nil {
return err
}
ts.AddSpec(spec.Index, spec)
ts.Keepers.Spec.SetSpec(sdk.UnwrapSDKContext(ts.Ctx), spec)
start = len(ts.Accounts(CONSUMER))
for i := 0; i < subscriptions; i++ {
// setup consumer
consumerAcc, consumerAddress := ts.AddAccount(CONSUMER, start+i, balance)
ts.AddPlan("free", CreateMockPlan())
ts.AddPolicy("mock", CreateMockPolicy())
plan := ts.plans["free"]
// subscribe consumer
BuySubscription(ts.Ctx, *ts.Keepers, *ts.Servers, consumerAcc, plan.Index)
// create projects:
_, pd2both := ts.AddAccount(DEVELOPER, start+i, 10000)
keys_1_admin_dev := []projectstypes.ProjectKey{
projectstypes.NewProjectKey(pd2both).
AddType(projectstypes.ProjectKey_ADMIN).
AddType(projectstypes.ProjectKey_DEVELOPER),
}
policy := ts.Policy("mock")
pd := projectstypes.ProjectData{
Name: "proj",
Enabled: true,
ProjectKeys: keys_1_admin_dev,
Policy: &policy,
}
ts.AddProjectData("projdata", pd)
err = ts.Keepers.Projects.CreateProject(ts.Ctx, consumerAddress, pd, plan)
if err != nil {
return err
}
}
// setup providers
start = len(ts.Accounts(PROVIDER))
for i := 0; i < providers; i++ {
_, addr := ts.AddAccount(PROVIDER, start+i, balance)
err := ts.StakeProviderExtra(addr, spec, spec.MinStakeProvider.Amount.Int64(), nil, 1, "prov"+strconv.Itoa(start+i))
if err != nil {
return err
}
}

// advance for the staking to be valid
ts.AdvanceEpoch()
return nil
}

var sessionID uint64

func (ts *Tester) SendRelay(provider string, clientAcc sigs.Account, chainIDs []string, cuSum uint64) pairingtypes.MsgRelayPayment {
Expand Down
2 changes: 1 addition & 1 deletion x/dualstaking/keeper/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ func (k Keeper) VerifyDelegatorBalance(ctx sdk.Context, delAddr sdk.AccAddress)
for _, d := range delegations {
v, found := k.stakingKeeper.GetValidator(ctx, d.GetValidatorAddr())
if found {
sumValidatorDelegations = sumValidatorDelegations.Add(v.TokensFromSharesRoundUp(d.Shares).TruncateInt())
sumValidatorDelegations = sumValidatorDelegations.Add(v.TokensFromSharesRoundUp(d.Shares).Ceil().TruncateInt())
}
}

Expand Down
2 changes: 1 addition & 1 deletion x/dualstaking/keeper/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (h Hooks) BeforeDelegationRemoved(ctx sdk.Context, delAddr sdk.AccAddress,
if err != nil {
return nil
}
amount := validator.TokensFromSharesRoundUp(delegation.Shares).TruncateInt()
amount := validator.TokensFromSharesRoundUp(delegation.Shares).Ceil().TruncateInt()
err = h.k.UnbondUniformProviders(ctx, delAddr.String(), sdk.NewCoin(h.k.stakingKeeper.BondDenom(ctx), amount))
if err != nil {
return utils.LavaFormatError("delegation removed hook failed", err,
Expand Down
9 changes: 1 addition & 8 deletions x/dualstaking/keeper/hooks_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package keeper_test

import (
"fmt"
"math/rand"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -415,7 +413,7 @@ func TestValidatorAndProvidersSlash(t *testing.T) {
// verify once again that the delegator's delegations balance is preserved
diff, err = ts.Keepers.Dualstaking.VerifyDelegatorBalance(ts.Ctx, delegatorAcc.Addr)
require.Nil(t, err)
require.True(t, diff.IsZero())
require.Equal(t, sdk.OneInt(), diff)
}

// TestCancelUnbond checks that the providers-validators delegations balance is preserved when
Expand Down Expand Up @@ -489,11 +487,6 @@ func TestHooksRandomDelegations(t *testing.T) {
delegator = prevDelegator
}
_, err := ts.TxDualstakingDelegate(delegator, provider, ts.spec.Index, sdk.NewCoin(ts.TokenDenom(), sdk.NewInt(int64(d))))
if err == nil {
fmt.Printf("%v: delegated %v\n", strconv.Itoa(i), strconv.Itoa(d))
} else {
fmt.Printf("%v: failed delegating %v. err: %s\n", strconv.Itoa(i), strconv.Itoa(d), err.Error())
}
require.Nil(t, err)

_, found := ts.Keepers.StakingKeeper.GetDelegation(ts.Ctx, delegatorAcc.Addr, sdk.ValAddress(validatorAcc.Addr))
Expand Down
37 changes: 30 additions & 7 deletions x/pairing/keeper/epoch_payments.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (k Keeper) GetAllEpochPayments(ctx sdk.Context) (list []types.EpochPayments
// Function to remove epochPayments objects from deleted epochs (older than the chain's memory)
func (k Keeper) RemoveOldEpochPayment(ctx sdk.Context) {
for _, epoch := range k.epochStorageKeeper.GetDeletedEpochs(ctx) {
k.RemoveAllEpochPaymentsForBlock(ctx, epoch)
k.RemoveAllEpochPaymentsForBlockAppendAdjustments(ctx, epoch)
}
}

Expand Down Expand Up @@ -116,26 +116,33 @@ func (k Keeper) AddEpochPayment(ctx sdk.Context, chainID string, epoch uint64, p
}

// Function to remove all epochPayments objects from a specific epoch
func (k Keeper) RemoveAllEpochPaymentsForBlock(ctx sdk.Context, blockForDelete uint64) {
func (k Keeper) RemoveAllEpochPaymentsForBlockAppendAdjustments(ctx sdk.Context, blockForDelete uint64) {
// get the epochPayments object of blockForDelete
epochPayments, found, key := k.GetEpochPaymentsFromBlock(ctx, blockForDelete)
if !found {
return
}

// TODO: update Qos in providerQosFS. new consumers (cluster.subUsage = 0) get default QoS (what is default?)

consumerUsage := map[string]uint64{}
type couplingConsumerProvider struct {
consumer string
provider string
}
// we are keeping the iteration keys to keep determinism when going over the map
iterationOrder := []couplingConsumerProvider{}
couplingUsage := map[couplingConsumerProvider]uint64{}
// go over the epochPayments object's providerPaymentStorageKeys
userPaymentsStorageKeys := epochPayments.GetProviderPaymentStorageKeys()
for _, userPaymentStorageKey := range userPaymentsStorageKeys {
// get the providerPaymentStorage object
userPaymentStorage, found := k.GetProviderPaymentStorage(ctx, userPaymentStorageKey)
providerPaymentStorage, found := k.GetProviderPaymentStorage(ctx, userPaymentStorageKey)
if !found {
continue
}

// go over the providerPaymentStorage object's uniquePaymentStorageClientProviderKeys
uniquePaymentStoragesCliProKeys := userPaymentStorage.GetUniquePaymentStorageClientProviderKeys()
uniquePaymentStoragesCliProKeys := providerPaymentStorage.GetUniquePaymentStorageClientProviderKeys()
for _, uniquePaymentStorageKey := range uniquePaymentStoragesCliProKeys {
// get the uniquePaymentStorageClientProvider object
uniquePaymentStorage, found := k.GetUniquePaymentStorageClientProvider(ctx, uniquePaymentStorageKey)
Expand All @@ -157,12 +164,28 @@ func (k Keeper) RemoveAllEpochPaymentsForBlock(ctx sdk.Context, blockForDelete u

// delete the uniquePaymentStorageClientProvider object
k.RemoveUniquePaymentStorageClientProvider(ctx, uniquePaymentStorage.Index)
consumer := k.GetConsumerFromUniquePayment(&uniquePaymentStorage)

provider, err := k.GetProviderFromProviderPaymentStorage(&providerPaymentStorage)
if err != nil {
utils.LavaFormatError("failed getting provider from payment storage", err)
continue
}
coupling := couplingConsumerProvider{consumer: consumer, provider: provider}
if _, ok := couplingUsage[coupling]; !ok {
// only add it if it doesn't exist
iterationOrder = append(iterationOrder, coupling)
}
consumerUsage[consumer] += uniquePaymentStorage.UsedCU
couplingUsage[coupling] += uniquePaymentStorage.UsedCU
}

// after we're done deleting the uniquePaymentStorageClientProvider objects, delete the providerPaymentStorage object
k.RemoveProviderPaymentStorage(ctx, userPaymentStorage.Index)
k.RemoveProviderPaymentStorage(ctx, providerPaymentStorage.Index)
}
for _, coupling := range iterationOrder {
k.subscriptionKeeper.AppendAdjustment(ctx, coupling.consumer, coupling.provider, consumerUsage[coupling.consumer], couplingUsage[coupling])
}

// after we're done deleting the providerPaymentStorage objects, delete the epochPayments object
k.RemoveEpochPayments(ctx, key)
}
2 changes: 1 addition & 1 deletion x/pairing/keeper/msg_server_relay_payment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,7 @@ func TestBadgeValidation(t *testing.T) {
ts.AdvanceBlock()

// remove past payments to avoid double spending (first error payment succeeded)
ts.Keepers.Pairing.RemoveAllEpochPaymentsForBlock(ts.Ctx, tt.epoch)
ts.Keepers.Pairing.RemoveAllEpochPaymentsForBlockAppendAdjustments(ts.Ctx, tt.epoch)
}

badge := types.CreateBadge(badgeCuAllocation, tt.epoch, tt.badgeAddress, tt.lavaChainID, []byte{})
Expand Down
11 changes: 11 additions & 0 deletions x/pairing/keeper/provider_payment_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package keeper
import (
"fmt"
"strconv"
"strings"

"github.com/cosmos/cosmos-sdk/store/prefix"
sdk "github.com/cosmos/cosmos-sdk/types"
Expand Down Expand Up @@ -64,6 +65,16 @@ func (k Keeper) GetAllProviderPaymentStorage(ctx sdk.Context) (list []types.Prov
return
}

func (k Keeper) GetProviderFromProviderPaymentStorage(providerPaymentStorage *types.ProviderPaymentStorage) (string, error) {
index := providerPaymentStorage.Index
// index consists of chain_epoch_providerAddress
lastIndex := strings.LastIndex(index, "_")
if lastIndex != -1 {
return index[lastIndex+1:], nil
}
return "", fmt.Errorf("invalid provider payment storage key %s", index)
}

// Function to get a providerPaymentStorage object's key (key is chainID_epoch_providerAddress, epoch in hex representation)
func (k Keeper) GetProviderPaymentStorageKey(ctx sdk.Context, chainID string, epoch uint64, providerAddress sdk.AccAddress) string {
return chainID + "_" + strconv.FormatUint(epoch, 16) + "_" + providerAddress.String()
Expand Down
1 change: 1 addition & 0 deletions x/pairing/types/expected_keepers.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ type SubscriptionKeeper interface {
CalcTotalMonthlyReward(ctx sdk.Context, totalAmount math.Int, trackedCu uint64, totalCuUsedBySub uint64) math.Int
AddTrackedCu(ctx sdk.Context, sub string, provider string, chainID string, cu uint64, block uint64) error
GetAllSubscriptionsIndices(ctx sdk.Context) []string
AppendAdjustment(ctx sdk.Context, consumer string, provider string, totalConsumerUsage uint64, usageWithThisProvider uint64)
}

type PlanKeeper interface {
Expand Down
6 changes: 3 additions & 3 deletions x/projects/keeper/project_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1139,9 +1139,9 @@ func TestSetPolicyByGeolocation(t *testing.T) {
basicUser := common.CreateNewAccount(_ctx, *keepers, 10000)
premiumUser := common.CreateNewAccount(_ctx, *keepers, 10000)

common.BuySubscription(t, _ctx, *keepers, *servers, freeUser, freePlan.Index)
common.BuySubscription(t, _ctx, *keepers, *servers, basicUser, basicPlan.Index)
common.BuySubscription(t, _ctx, *keepers, *servers, premiumUser, premiumPlan.Index)
common.BuySubscription(_ctx, *keepers, *servers, freeUser, freePlan.Index)
common.BuySubscription(_ctx, *keepers, *servers, basicUser, basicPlan.Index)
common.BuySubscription(_ctx, *keepers, *servers, premiumUser, premiumPlan.Index)

templates := []struct {
name string
Expand Down
1 change: 1 addition & 0 deletions x/rewards/keeper/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func newTester(t *testing.T, addValidator bool) *tester {
ts.plan.Price.Amount = monthlyProvidersPool.QuoRaw(5).AddRaw(5)
ts.plan.PlanPolicy.EpochCuLimit = monthlyProvidersPool.Uint64() * 5
ts.plan.PlanPolicy.TotalCuLimit = monthlyProvidersPool.Uint64() * 5
ts.plan.PlanPolicy.MaxProvidersToPair = 5
ts.AddPlan(ts.plan.Index, ts.plan)
ts.spec = ts.AddSpec("mock", common.CreateMockSpec()).Spec("mock")

Expand Down
4 changes: 2 additions & 2 deletions x/rewards/keeper/providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import (
"github.com/lavanet/lava/x/rewards/types"
)

func (k Keeper) AggregateRewards(ctx sdk.Context, provider, chainid string, adjustmentDenom uint64, rewards math.Int) {
func (k Keeper) AggregateRewards(ctx sdk.Context, provider, chainid string, adjustment sdk.Dec, rewards math.Int) {
index := types.BasePayIndex{Provider: provider, ChainID: chainid}
basepay, found := k.getBasePay(ctx, index)
adjustedPay := sdk.NewDecFromInt(rewards).QuoInt64(int64(adjustmentDenom))
adjustedPay := adjustment.MulInt(rewards)
adjustedPay = sdk.MinDec(adjustedPay, sdk.NewDecFromInt(rewards))
if !found {
basepay = types.BasePay{Total: rewards, TotalAdjusted: adjustedPay}
Expand Down
Loading

0 comments on commit 41bb701

Please sign in to comment.