Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Choreo] Changes to consider organization with subscription policies #3504

Merged
merged 18 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions adapter/internal/discovery/xds/marshaller.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,16 +501,15 @@ func MarshalMultipleSubscriptionPolicies(policies *types.SubscriptionPolicyList)
// from message broker. And then it returns the subscriptionPolicyList.
func MarshalSubscriptionPolicyEventAndReturnList(policy *types.SubscriptionPolicy, eventType EventType) *subscription.SubscriptionPolicyList {
if eventType == DeleteEvent {
delete(ApplicationPolicyMap, policy.ID)
logger.LoggerXds.Infof("Application Policy: %s is deleted.", policy.Name)
delete(SubscriptionPolicyMap, policy.ID)
logger.LoggerXds.Infof("Subscription policy: %s is deleted for organization: %s", policy.Name, policy.Organization)
} else {
subPolicy := marshalSubscriptionPolicy(policy)
SubscriptionPolicyMap[policy.ID] = subPolicy
if eventType == UpdateEvent {
logger.LoggerInternalMsg.Infof("Subscription Policy: %s is updated.", subPolicy.Name)
} else {
logger.LoggerInternalMsg.Infof("Subscription Policy: %s is added.", subPolicy.Name)
logger.LoggerInternalMsg.Infof("Subscription policy: %s is updated for organization: %s", subPolicy.Name, subPolicy.Organization)
}
logger.LoggerInternalMsg.Infof("Subscription policy: %s is added for organization: %s", subPolicy.Name, subPolicy.Organization)
}
return marshalSubscriptionPolicyMapToList(SubscriptionPolicyMap)
}
Expand Down Expand Up @@ -671,6 +670,7 @@ func marshalSubscriptionPolicy(policy *types.SubscriptionPolicy) *subscription.S
TenantId: policy.TenantID,
TenantDomain: policy.TenantDomain,
Timestamp: policy.TimeStamp,
Organization: policy.Organization,
}
}

Expand Down
78 changes: 56 additions & 22 deletions adapter/internal/discovery/xds/rate_limiter_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ import (

var rlsPolicyCache *rateLimitPolicyCache

const subscriptionPolicyType = "subscription"
const (
subscriptionPolicyType = "subscription"
organization = "organization"
)

func getRateLimitUnit(name string) (rls_config.RateLimitUnit, error) {
switch strings.ToUpper(name) {
Expand Down Expand Up @@ -76,8 +79,8 @@ type rateLimitPolicyCache struct {
// org -> vhost -> API-Identifier (i.e. Vhost:API-UUID) -> Rate Limit Configs
apiLevelRateLimitPolicies map[string]map[string]map[string][]*rls_config.RateLimitDescriptor
// metadataBasedPolicies is used to store the rate limit policies which are based on dynamic metadata.
// rate limit type (eg: subscription) -> policy name (eg: Gold, Silver) -> Rate Limit Config
metadataBasedPolicies map[string]map[string]*rls_config.RateLimitDescriptor
// metadata related to the subscription rate-limiting: organization -> subscriptionID -> rate-limit-policy
metadataBasedPolicies map[string]map[string]map[string]*rls_config.RateLimitDescriptor
// mutex for API level
apiLevelMu sync.RWMutex
}
Expand Down Expand Up @@ -247,16 +250,41 @@ func (r *rateLimitPolicyCache) generateRateLimitConfig(label string) *rls_config
}

//Iterate through the subscription policies and append it to the orgDescriptors
for metadataType, metadataPolicyMap := range r.metadataBasedPolicies {
var subscriptionDescriptors []*rls_config.RateLimitDescriptor
for _, subscriptionPolicyDescriptor := range metadataPolicyMap {
subscriptionDescriptors = append(subscriptionDescriptors, subscriptionPolicyDescriptor)
}
metadataDescriptor := &rls_config.RateLimitDescriptor{
Key: metadataType,
Descriptors: subscriptionDescriptors,
// domain: Default
// descriptors:
// - key: organisation
// value: org001
// - key: subscription
// descriptors:
// - key: policy
// value: gold
// rate_limit:
// requests_per_unit: 1000
// unit: minute
// - key: policy
// value: silver
// rate_limit:
// requests_per_unit: 200
// unit: minute
if subscriptionPoliciesList, ok := r.metadataBasedPolicies[subscriptionPolicyType]; ok {
for orgUUID := range subscriptionPoliciesList {
var metadataDescriptor *rls_config.RateLimitDescriptor
var policyDescriptors []*rls_config.RateLimitDescriptor
metadataDescriptor = &rls_config.RateLimitDescriptor{
Key: organization,
Value: orgUUID,
}
subscriptionIDDescriptor := &rls_config.RateLimitDescriptor{
Key: subscriptionPolicyType,
}
for policyName := range subscriptionPoliciesList[orgUUID] {
policyDescriptors = append(policyDescriptors, subscriptionPoliciesList[orgUUID][policyName])
}
subscriptionIDDescriptor.Descriptors = policyDescriptors
metadataDescriptor.Descriptors = append(metadataDescriptor.Descriptors, subscriptionIDDescriptor)

metadataDescriptors = append(metadataDescriptors, metadataDescriptor)
}
metadataDescriptors = append(metadataDescriptors, metadataDescriptor)
}

allDescriptors := append(orgDescriptors, metadataDescriptors...)
Expand Down Expand Up @@ -294,21 +322,19 @@ func (r *rateLimitPolicyCache) updateXdsCache(label string) bool {
return true
}

// AddSubscriptionLevelRateLimitPolicy adds a subscription level rate limit policy to the cache. This method is called
// only during the startup as there is no option available to add subscription level rate limit policies via the choreo console
// AddSubscriptionLevelRateLimitPolicy adds a subscription level rate limit policy to the cache.
func AddSubscriptionLevelRateLimitPolicy(policyList *types.SubscriptionPolicyList) error {
// Check if rlsPolicyCache.metadataBasedPolicies[Subscription] exists and create a new map if not
if _, ok := rlsPolicyCache.metadataBasedPolicies[subscriptionPolicyType]; !ok {
rlsPolicyCache.metadataBasedPolicies[subscriptionPolicyType] = make(map[string]*rls_config.RateLimitDescriptor)
}
for _, policy := range policyList.List {
// Needs to skip on async policies.
if policy.DefaultLimit == nil || policy.DefaultLimit.QuotaType != "requestCount" || policy.DefaultLimit.RequestCount == nil {
continue
}

// Need not to add the Unauthenticated and Unlimited policies to the rate limiter service
if policy.Name == "Unauthenticated" || policy.Name == "Unlimited" {
if policy.Organization == "carbon.super" && policy.Name == "Unauthenticated" {
continue
}
if policy.Name == "Unlimited" {
continue
}
rateLimitUnit, err := parseRateLimitUnitFromSubscriptionPolicy(policy.DefaultLimit.RequestCount.TimeUnit)
Expand All @@ -320,12 +346,20 @@ func AddSubscriptionLevelRateLimitPolicy(policyList *types.SubscriptionPolicyLis
Unit: rateLimitUnit,
RequestsPerUnit: uint32(policy.DefaultLimit.RequestCount.RequestCount),
}
rlsPolicyCache.metadataBasedPolicies[subscriptionPolicyType][policy.Name] = &rls_config.RateLimitDescriptor{
descriptor := &rls_config.RateLimitDescriptor{
Key: "policy",
Value: policy.Name,
RateLimit: &rlPolicyConfig,
}
loggers.LoggerXds.Infof("Subscription level rate limit policy is added to the cache map: %v", policy)
// Check if rlsPolicyCache.metadataBasedPolicies[Subscription] exists and create a new map if not
if _, ok := rlsPolicyCache.metadataBasedPolicies[subscriptionPolicyType]; !ok {
rlsPolicyCache.metadataBasedPolicies[subscriptionPolicyType] = make(map[string]map[string]*rls_config.RateLimitDescriptor)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't want to do this inside this for loop. We can keep it in the original location.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let move this to original place

if _, ok := rlsPolicyCache.metadataBasedPolicies[subscriptionPolicyType][policy.Organization]; !ok {
rlsPolicyCache.metadataBasedPolicies[subscriptionPolicyType][policy.Organization] = make(map[string]*rls_config.RateLimitDescriptor)
}
rlsPolicyCache.metadataBasedPolicies[subscriptionPolicyType][policy.Organization][policy.Name] = descriptor
loggers.LoggerXds.Infof("Custom subscription policy: %s is added to the cache map for organization: %s", policy.Name, policy.Organization)
}
return nil
}
Expand All @@ -334,6 +368,6 @@ func init() {
rlsPolicyCache = &rateLimitPolicyCache{
xdsCache: gcp_cache.NewSnapshotCache(false, IDHash{}, nil),
apiLevelRateLimitPolicies: make(map[string]map[string]map[string][]*rls_config.RateLimitDescriptor),
metadataBasedPolicies: make(map[string]map[string]*rls_config.RateLimitDescriptor),
metadataBasedPolicies: make(map[string]map[string]map[string]*rls_config.RateLimitDescriptor),
}
}
133 changes: 133 additions & 0 deletions adapter/internal/discovery/xds/rate_limiter_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/wso2/product-microgateway/adapter/internal/oasparser/envoyconf"
mgw "github.com/wso2/product-microgateway/adapter/internal/oasparser/model"
"github.com/wso2/product-microgateway/adapter/pkg/eventhub/types"
)

func TestGetRateLimitUnit(t *testing.T) {
Expand Down Expand Up @@ -577,3 +578,135 @@ func getDummyAPISwagger(apiID, level, apiPolicy, res1GetPolicy, res1PostPolicy,
mgwSwagger.VHost = "vhost1"
return mgwSwagger
}
func TestAddSubscriptionLevelRateLimitPolicy(t *testing.T) {
policyList := &types.SubscriptionPolicyList{
List: []types.SubscriptionPolicy{
{
Name: "Policy1",
DefaultLimit: &types.SubscriptionDefaultLimit{
QuotaType: "requestCount",
RequestCount: &types.SubscriptionRequestCount{
RequestCount: 100,
TimeUnit: "sec",
},
},
Organization: "org1",
},
{
Name: "Policy2",
DefaultLimit: &types.SubscriptionDefaultLimit{
QuotaType: "requestCount",
RequestCount: &types.SubscriptionRequestCount{
RequestCount: 200,
TimeUnit: "min",
},
},
Organization: "org1",
},
{
Name: "Unauthenticated",
DefaultLimit: &types.SubscriptionDefaultLimit{
QuotaType: "requestCount",
RequestCount: &types.SubscriptionRequestCount{
RequestCount: 300,
TimeUnit: "hours",
},
},
Organization: "org1",
},
{
Name: "AsyncPolicy1",
DefaultLimit: &types.SubscriptionDefaultLimit{
QuotaType: "eventCount",
RequestCount: &types.SubscriptionRequestCount{
RequestCount: 300,
TimeUnit: "hours",
},
},
Organization: "org1",
},
{
Name: "Org2Policy1",
DefaultLimit: &types.SubscriptionDefaultLimit{
QuotaType: "requestCount",
RequestCount: &types.SubscriptionRequestCount{
RequestCount: 124,
TimeUnit: "sec",
},
},
Organization: "org2",
},
{
Name: "Unlimited",
DefaultLimit: &types.SubscriptionDefaultLimit{
QuotaType: "requestCount",
RequestCount: &types.SubscriptionRequestCount{
RequestCount: 2147483647,
TimeUnit: "min",
},
},
Organization: "carbon.super",
},
{
Name: "Unlimited",
DefaultLimit: &types.SubscriptionDefaultLimit{
QuotaType: "requestCount",
RequestCount: &types.SubscriptionRequestCount{
RequestCount: 2147483647,
TimeUnit: "min",
},
},
Organization: "org2",
},
},
}

// Initialize rlsPolicyCache.metadataBasedPolicies
rlsPolicyCache.metadataBasedPolicies = make(map[string]map[string]map[string]*rls_config.RateLimitDescriptor)

err := AddSubscriptionLevelRateLimitPolicy(policyList)
assert.NoError(t, err)

expectedPolicies := map[string]map[string]map[string]*rls_config.RateLimitDescriptor{
subscriptionPolicyType: {
"org1": {
"Policy1": {
Key: "policy",
Value: "Policy1",
RateLimit: &rls_config.RateLimitPolicy{
Unit: rls_config.RateLimitUnit_SECOND,
RequestsPerUnit: 100,
},
},
"Policy2": {
Key: "policy",
Value: "Policy2",
RateLimit: &rls_config.RateLimitPolicy{
Unit: rls_config.RateLimitUnit_MINUTE,
RequestsPerUnit: 200,
},
},
"Unauthenticated": {
Key: "policy",
Value: "Unauthenticated",
RateLimit: &rls_config.RateLimitPolicy{
Unit: rls_config.RateLimitUnit_HOUR,
RequestsPerUnit: 300,
},
},
},
"org2": {
"Org2Policy1": {
Key: "policy",
Value: "Org2Policy1",
RateLimit: &rls_config.RateLimitPolicy{
Unit: rls_config.RateLimitUnit_SECOND,
RequestsPerUnit: 124,
},
},
},
},
}

assert.Equal(t, expectedPolicies, rlsPolicyCache.metadataBasedPolicies)
}
2 changes: 1 addition & 1 deletion adapter/internal/messaging/notification_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ func handlePolicyEvents(data []byte, eventType string) {
GraphQLMaxComplexity: subscriptionPolicyEvent.GraphQLMaxComplexity,
GraphQLMaxDepth: subscriptionPolicyEvent.GraphQLMaxDepth, RateLimitCount: subscriptionPolicyEvent.RateLimitCount,
RateLimitTimeUnit: subscriptionPolicyEvent.RateLimitTimeUnit, StopOnQuotaReach: subscriptionPolicyEvent.StopOnQuotaReach,
TenantDomain: subscriptionPolicyEvent.TenantDomain, TimeStamp: subscriptionPolicyEvent.TimeStamp}
TenantDomain: subscriptionPolicyEvent.TenantDomain, TimeStamp: subscriptionPolicyEvent.TimeStamp, Organization: subscriptionPolicyEvent.Organization}

var subscriptionPolicyList *subscription.SubscriptionPolicyList
if subscriptionPolicyEvent.Event.Type == policyCreate {
Expand Down
19 changes: 19 additions & 0 deletions adapter/internal/oasparser/envoyconf/envoyconf_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,25 @@ func TestCreateRoute(t *testing.T) {
RateLimits: []*routev3.RateLimit{
{
Actions: []*routev3.RateLimit_Action{
{
ActionSpecifier: &routev3.RateLimit_Action_Metadata{
Metadata: &routev3.RateLimit_Action_MetaData{
DescriptorKey: "organization",
MetadataKey: &metadatav3.MetadataKey{
Key: extAuthzFilterName,
Path: []*metadatav3.MetadataKey_PathSegment{
{
Segment: &metadatav3.MetadataKey_PathSegment_Key{
Key: "ratelimit:organization",
},
},
},
},
Source: routev3.RateLimit_Action_MetaData_DYNAMIC,
SkipIfAbsent: true,
},
},
},
{
ActionSpecifier: &routev3.RateLimit_Action_Metadata{
Metadata: &routev3.RateLimit_Action_MetaData{
Expand Down
21 changes: 21 additions & 0 deletions adapter/internal/oasparser/envoyconf/routes_with_clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,11 @@ const (
DescriptorValueForOperationMethod = ":method"
DescriptorKeyForSubscription = "subscription"
DescriptorKeyForPolicy = "policy"
DescriptorKeyForOrganization = "organization"

descriptorMetadataKeyForSubscription = "ratelimit:subscription"
descriptorMetadataKeyForUsagePolicy = "ratelimit:usage-policy"
descriptorMetadataKeyForOrganization = "ratelimit:organization"
)

// CreateRoutesWithClusters creates envoy routes along with clusters and endpoint instances.
Expand Down Expand Up @@ -971,6 +973,25 @@ func createRoute(params *routeCreateParams) *routev3.Route {
if config.Envoy.RateLimit.Enabled {
action.Route.RateLimits = append(action.Route.RateLimits, &routev3.RateLimit{
Actions: []*routev3.RateLimit_Action{
{
ActionSpecifier: &routev3.RateLimit_Action_Metadata{
Metadata: &routev3.RateLimit_Action_MetaData{
DescriptorKey: DescriptorKeyForOrganization,
MetadataKey: &metadatav3.MetadataKey{
Key: extAuthzFilterName,
Path: []*metadatav3.MetadataKey_PathSegment{
{
Segment: &metadatav3.MetadataKey_PathSegment_Key{
Key: descriptorMetadataKeyForOrganization,
},
},
},
},
Source: routev3.RateLimit_Action_MetaData_DYNAMIC,
SkipIfAbsent: true,
},
},
},
{
ActionSpecifier: &routev3.RateLimit_Action_Metadata{
Metadata: &routev3.RateLimit_Action_MetaData{
Expand Down
Loading
Loading