Skip to content

Commit

Permalink
feat: add ratelimit plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
yikuaibro committed Sep 29, 2023
1 parent 76d805e commit 7f7c43c
Show file tree
Hide file tree
Showing 22 changed files with 626 additions and 67 deletions.
60 changes: 44 additions & 16 deletions control_plane.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
package opensergo

import (
stream_plugin "github.com/opensergo/opensergo-control-plane/pkg/plugin/pl/builtin/stream"
"github.com/opensergo/opensergo-control-plane/pkg/api/v1alpha1"
"github.com/opensergo/opensergo-control-plane/pkg/plugin/pl/builtin"
ratelimit_plugin "github.com/opensergo/opensergo-control-plane/pkg/plugin/pl/builtin/ratelimit"
"log"
"os"
"sync"
Expand All @@ -39,7 +41,7 @@ type ControlPlane struct {
func NewControlPlane() (*ControlPlane, error) {
cp := &ControlPlane{}

operator, err := controller.NewKubernetesOperator(cp.sendMessage)
operator, err := controller.NewKubernetesOperator(cp.sendMessage, cp.NotifyPluginHandler)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -72,6 +74,32 @@ func (c *ControlPlane) Start() error {
return nil
}

func (c *ControlPlane) NotifyPluginHandler(pluginName string, e any) error {
client, err := c.server.PluginServer.GetPluginClient(pluginName)
if err != nil {
log.Printf("Error:%s\n", err.Error())
}
switch pluginName {
case builtin.RateLimitServicePluginName:
raw, ok := client.(ratelimit_plugin.RateLimit)
if !ok {
return errors.New("can't convert ratelimit plugin to normal wrapper")
}
l, ok := e.(*v1alpha1.RateLimitStrategy)
if !ok {
log.Printf("Error: %s\n", "can't convert event to ratelimit strategy")
}
err = builtin.NotifyPluginRateLimit(raw, l)
if err != nil {
return err
}
default:
log.Printf("unknown plugin name: %s\n", pluginName)
}
return nil

}

func (c *ControlPlane) sendMessage(namespace, app, kind string, dataWithVersion *trpb.DataWithVersion, status *trpb.Status, respId string) error {
connections, exists := c.server.ConnectionManager().Get(namespace, app, kind)
if !exists || connections == nil {
Expand Down Expand Up @@ -101,20 +129,20 @@ func (c *ControlPlane) sendMessageToStream(stream model.OpenSergoTransportStream
if stream == nil {
return nil
}
client, err := c.server.PluginServer.GetPluginClient("stream")
if err != nil {
log.Printf("Error:%s\n", err.Error())
}
raw, ok := client.(stream_plugin.Stream)
if !ok {
log.Printf("Error: %s\n", "can't convert rpc plugin to normal wrapper")
}
sa := &say{}
greet, err := raw.Greeter("这是一个前缀", sa)
if err != nil {
log.Printf("Error: %s\n", err.Error())
}
log.Printf("Greeting: %s\n", greet)
//client, err := c.server.PluginServer.GetPluginClient("stream")
//if err != nil {
// log.Printf("Error:%s\n", err.Error())
//}
//raw, ok := client.(stream_plugin.Stream)
//if !ok {
// log.Printf("Error: %s\n", "can't convert rpc plugin to normal wrapper")
//}
//sa := &say{}
//greet, err := raw.Greeter("这是一个前缀", sa)
//if err != nil {
// log.Printf("Error: %s\n", err.Error())
//}
//log.Printf("Greeting: %s\n", greet)

return stream.SendMsg(&trpb.SubscribeResponse{
Status: status,
Expand Down
13 changes: 10 additions & 3 deletions pkg/controller/crd_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package controller

import (
"context"
"github.com/opensergo/opensergo-control-plane/pkg/plugin/pl/builtin"
"log"
"net/http"
"strconv"
Expand Down Expand Up @@ -53,8 +54,9 @@ type CRDWatcher struct {
subscribedNamespaces map[string]bool
subscribedApps map[model.NamespacedApp]bool

crdGenerator func() client.Object
sendDataHandler model.DataEntirePushHandler
crdGenerator func() client.Object
sendDataHandler model.DataEntirePushHandler
notifyPluginHandler model.NotifyPluginHandler

updateMux sync.RWMutex
}
Expand Down Expand Up @@ -247,6 +249,10 @@ func (r *CRDWatcher) translateCrdToProto(object client.Object) (*anypb.Any, erro

case RateLimitStrategyKind:
rls := object.(*crdv1alpha1.RateLimitStrategy)
err = r.notifyPluginHandler(builtin.RateLimitServicePluginName, rls)
if err != nil {
log.Println("notify plugin error", err)
}
mType, _ := strconv.ParseInt(rls.Spec.MetricType, 10, 32)
limitMode, _ := strconv.ParseInt(rls.Spec.LimitMode, 10, 32)
rule = &pb.RateLimitStrategy{
Expand Down Expand Up @@ -328,7 +334,7 @@ func (r *CRDWatcher) translateCrdToProto(object client.Object) (*anypb.Any, erro

}

func NewCRDWatcher(crdManager ctrl.Manager, kind model.SubscribeKind, crdGenerator func() client.Object, sendDataHandler model.DataEntirePushHandler) *CRDWatcher {
func NewCRDWatcher(crdManager ctrl.Manager, kind model.SubscribeKind, crdGenerator func() client.Object, sendDataHandler model.DataEntirePushHandler, notifyPluginHandler model.NotifyPluginHandler) *CRDWatcher {
return &CRDWatcher{
kind: kind,
Client: crdManager.GetClient(),
Expand All @@ -340,5 +346,6 @@ func NewCRDWatcher(crdManager ctrl.Manager, kind model.SubscribeKind, crdGenerat
crdGenerator: crdGenerator,
crdCache: NewCRDCache(kind),
sendDataHandler: sendDataHandler,
notifyPluginHandler: notifyPluginHandler,
}
}
20 changes: 11 additions & 9 deletions pkg/controller/k8s_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,14 @@ type KubernetesOperator struct {
ctxCancel context.CancelFunc
started atomic.Value

sendDataHandler model.DataEntirePushHandler
sendDataHandler model.DataEntirePushHandler
notifyPluginHandler model.NotifyPluginHandler

controllerMux sync.RWMutex
}

// NewKubernetesOperator creates a OpenSergo Kubernetes operator.
func NewKubernetesOperator(sendDataHandler model.DataEntirePushHandler) (*KubernetesOperator, error) {
func NewKubernetesOperator(sendDataHandler model.DataEntirePushHandler, notifyPluginHandler model.NotifyPluginHandler) (*KubernetesOperator, error) {
ctrl.SetLogger(&k8SLogger{
l: logging.GetGlobalLogger(),
level: logging.GetGlobalLoggerLevel(),
Expand All @@ -102,11 +103,12 @@ func NewKubernetesOperator(sendDataHandler model.DataEntirePushHandler) (*Kubern
}
ctx, cancel := context.WithCancel(context.Background())
k := &KubernetesOperator{
crdManager: mgr,
controllers: make(map[string]*CRDWatcher),
ctx: ctx,
ctxCancel: cancel,
sendDataHandler: sendDataHandler,
crdManager: mgr,
controllers: make(map[string]*CRDWatcher),
ctx: ctx,
ctxCancel: cancel,
sendDataHandler: sendDataHandler,
notifyPluginHandler: notifyPluginHandler,
}
return k, nil
}
Expand Down Expand Up @@ -145,7 +147,7 @@ func (k *KubernetesOperator) RegisterWatcher(target model.SubscribeTarget) (*CRD
return nil, errors.New("CRD not supported: " + target.Kind)
}
// This kind of CRD has never been watched.
crdWatcher := NewCRDWatcher(k.crdManager, target.Kind, crdMetadata.Generator(), k.sendDataHandler)
crdWatcher := NewCRDWatcher(k.crdManager, target.Kind, crdMetadata.Generator(), k.sendDataHandler, k.notifyPluginHandler)
err = crdWatcher.AddSubscribeTarget(target)
if err != nil {
return nil, err
Expand Down Expand Up @@ -178,7 +180,7 @@ func (k *KubernetesOperator) AddWatcher(target model.SubscribeTarget) error {
if !crdSupports {
return errors.New("CRD not supported: " + target.Kind)
}
crdWatcher := NewCRDWatcher(k.crdManager, target.Kind, crdMetadata.Generator(), k.sendDataHandler)
crdWatcher := NewCRDWatcher(k.crdManager, target.Kind, crdMetadata.Generator(), k.sendDataHandler, k.notifyPluginHandler)
err = crdWatcher.AddSubscribeTarget(target)
if err != nil {
return err
Expand Down
2 changes: 2 additions & 0 deletions pkg/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,5 @@ type OpenSergoTransportStream = trpb.OpenSergoUniversalTransportService_Subscrib
type SubscribeRequestHandler func(ClientIdentifier, *trpb.SubscribeRequest, OpenSergoTransportStream) error

type DataEntirePushHandler func(namespace, app, kind string, dataWithVersion *trpb.DataWithVersion, status *trpb.Status, respId string) error

type NotifyPluginHandler func(pluginName string, e any) error
14 changes: 13 additions & 1 deletion pkg/plugin/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (c *PluginClientRegistry) DeletePluginClient(id string) {
c.client.Delete(id)
}

func (c *PluginClientRegistry) RangePluginClient(name string) interface{} {
func (c *PluginClientRegistry) RangePluginClientByName(name string) interface{} {
var client interface{}
c.client.Range(func(key, value interface{}) bool {
parts := strings.SplitN(key.(string), "-", 2)
Expand All @@ -46,3 +46,15 @@ func (c *PluginClientRegistry) RangePluginClient(name string) interface{} {
})
return client
}

func (c *PluginClientRegistry) RangePluginClientByPublicID(publicID string) interface{} {
var client interface{}
c.client.Range(func(key, value interface{}) bool {
if key.(string) == publicID {
client = value
return false
}
return true
})
return client
}
6 changes: 4 additions & 2 deletions pkg/plugin/pl/builtin/const.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package builtin

const (
StreamServicePluginSetName = "stream-plugin"
StreamServicePluginName = "stream"
StreamServicePluginSetName = "stream-plugin"
StreamServicePluginName = "stream"
RateLimitServicePluginSetName = "ratelimit-plugin"
RateLimitServicePluginName = "ratelimit"
)
15 changes: 15 additions & 0 deletions pkg/plugin/pl/builtin/notifyplugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package builtin

import (
"github.com/opensergo/opensergo-control-plane/pkg/api/v1alpha1"
ratelimit_plugin "github.com/opensergo/opensergo-control-plane/pkg/plugin/pl/builtin/ratelimit"
)

func NotifyPluginRateLimit(r ratelimit_plugin.RateLimit, l *v1alpha1.RateLimitStrategy) error {
limit, err := r.RateLimit(l.Spec.Threshold)
if err != nil {
return err
}
l.Spec.Threshold = limit
return nil
}
36 changes: 36 additions & 0 deletions pkg/plugin/pl/builtin/ratelimit/grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package ratelimit_plugin

import (
"context"

v1 "github.com/opensergo/opensergo-control-plane/pkg/plugin/proto/rate_limit/v1"
)

type GRPCClient struct {
client v1.RateLimitServiceClient
}

func (g *GRPCClient) RateLimit(t int64) (int64, error) {
resp, err := g.client.RateLimit(context.Background(), &v1.RateLimitRequest{
Threshold: t,
})
if err != nil {
return 0, err
}
return resp.Threshold, nil
}

type GRPCServer struct {
v1.UnimplementedRateLimitServiceServer
Impl RateLimit
}

func (g *GRPCServer) RateLimit(ctx context.Context, req *v1.RateLimitRequest) (*v1.RateLimitResponse, error) {
resp, err := g.Impl.RateLimit(req.Threshold)
if err != nil {
return nil, err
}
return &v1.RateLimitResponse{
Threshold: resp,
}, nil
}
51 changes: 51 additions & 0 deletions pkg/plugin/pl/builtin/ratelimit/ratelimit_plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package ratelimit_plugin

import (
"context"
"fmt"

"github.com/hashicorp/go-plugin"
pb "github.com/opensergo/opensergo-control-plane/pkg/plugin/proto/rate_limit/v1"
"google.golang.org/grpc"
)

type RateLimitPluginServer struct {
}

var _ RateLimit = (*RateLimitPluginServer)(nil)

func (s RateLimitPluginServer) RateLimit(t int64) (int64, error) {
return t + 1, nil
}

type RateLimit interface {
RateLimit(t int64) (int64, error)
}

type RateLimitPlugin struct {
plugin.Plugin

impl RateLimit
}

func NewRateLimitPluginServiceServer(impl RateLimit) (*RateLimitPlugin, error) {
if impl == nil {
return nil, fmt.Errorf("empty underlying stream plugin passed in")
}
return &RateLimitPlugin{
impl: impl,
}, nil
}

func (h *RateLimitPlugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error {
pb.RegisterRateLimitServiceServer(s, &GRPCServer{
Impl: h.impl,
})
return nil
}

func (h *RateLimitPlugin) GRPCClient(ctx context.Context, broker *plugin.GRPCBroker, c *grpc.ClientConn) (any, error) {
return &GRPCClient{
client: pb.NewRateLimitServiceClient(c),
}, nil
}
Loading

0 comments on commit 7f7c43c

Please sign in to comment.