diff --git a/control_plane.go b/control_plane.go index fb9ce86..8cab1a0 100644 --- a/control_plane.go +++ b/control_plane.go @@ -15,7 +15,9 @@ package opensergo import ( - stream_plugin "github.com/opensergo/opensergo-control-plane/pkg/plugin/pl/builtin/stream" + "github.com/opensergo/opensergo-control-plane/pkg/api/v1alpha1" + "github.com/opensergo/opensergo-control-plane/pkg/plugin/pl/builtin" + ratelimit_plugin "github.com/opensergo/opensergo-control-plane/pkg/plugin/pl/builtin/ratelimit" "log" "os" "sync" @@ -39,7 +41,7 @@ type ControlPlane struct { func NewControlPlane() (*ControlPlane, error) { cp := &ControlPlane{} - operator, err := controller.NewKubernetesOperator(cp.sendMessage) + operator, err := controller.NewKubernetesOperator(cp.sendMessage, cp.NotifyPluginHandler) if err != nil { return nil, err } @@ -72,6 +74,32 @@ func (c *ControlPlane) Start() error { return nil } +func (c *ControlPlane) NotifyPluginHandler(pluginName string, e any) error { + client, err := c.server.PluginServer.GetPluginClient(pluginName) + if err != nil { + log.Printf("Error:%s\n", err.Error()) + } + switch pluginName { + case builtin.RateLimitServicePluginName: + raw, ok := client.(ratelimit_plugin.RateLimit) + if !ok { + return errors.New("can't convert ratelimit plugin to normal wrapper") + } + l, ok := e.(*v1alpha1.RateLimitStrategy) + if !ok { + log.Printf("Error: %s\n", "can't convert event to ratelimit strategy") + } + err = builtin.NotifyPluginRateLimit(raw, l) + if err != nil { + return err + } + default: + log.Printf("unknown plugin name: %s\n", pluginName) + } + return nil + +} + func (c *ControlPlane) sendMessage(namespace, app, kind string, dataWithVersion *trpb.DataWithVersion, status *trpb.Status, respId string) error { connections, exists := c.server.ConnectionManager().Get(namespace, app, kind) if !exists || connections == nil { @@ -101,20 +129,20 @@ func (c *ControlPlane) sendMessageToStream(stream model.OpenSergoTransportStream if stream == nil { return nil } - client, err := c.server.PluginServer.GetPluginClient("stream") - if err != nil { - log.Printf("Error:%s\n", err.Error()) - } - raw, ok := client.(stream_plugin.Stream) - if !ok { - log.Printf("Error: %s\n", "can't convert rpc plugin to normal wrapper") - } - sa := &say{} - greet, err := raw.Greeter("这是一个前缀", sa) - if err != nil { - log.Printf("Error: %s\n", err.Error()) - } - log.Printf("Greeting: %s\n", greet) + //client, err := c.server.PluginServer.GetPluginClient("stream") + //if err != nil { + // log.Printf("Error:%s\n", err.Error()) + //} + //raw, ok := client.(stream_plugin.Stream) + //if !ok { + // log.Printf("Error: %s\n", "can't convert rpc plugin to normal wrapper") + //} + //sa := &say{} + //greet, err := raw.Greeter("这是一个前缀", sa) + //if err != nil { + // log.Printf("Error: %s\n", err.Error()) + //} + //log.Printf("Greeting: %s\n", greet) return stream.SendMsg(&trpb.SubscribeResponse{ Status: status, diff --git a/pkg/controller/crd_watcher.go b/pkg/controller/crd_watcher.go index bcc42ad..ae3a330 100644 --- a/pkg/controller/crd_watcher.go +++ b/pkg/controller/crd_watcher.go @@ -16,6 +16,7 @@ package controller import ( "context" + "github.com/opensergo/opensergo-control-plane/pkg/plugin/pl/builtin" "log" "net/http" "strconv" @@ -53,8 +54,9 @@ type CRDWatcher struct { subscribedNamespaces map[string]bool subscribedApps map[model.NamespacedApp]bool - crdGenerator func() client.Object - sendDataHandler model.DataEntirePushHandler + crdGenerator func() client.Object + sendDataHandler model.DataEntirePushHandler + notifyPluginHandler model.NotifyPluginHandler updateMux sync.RWMutex } @@ -247,6 +249,10 @@ func (r *CRDWatcher) translateCrdToProto(object client.Object) (*anypb.Any, erro case RateLimitStrategyKind: rls := object.(*crdv1alpha1.RateLimitStrategy) + err = r.notifyPluginHandler(builtin.RateLimitServicePluginName, rls) + if err != nil { + log.Println("notify plugin error", err) + } mType, _ := strconv.ParseInt(rls.Spec.MetricType, 10, 32) limitMode, _ := strconv.ParseInt(rls.Spec.LimitMode, 10, 32) rule = &pb.RateLimitStrategy{ @@ -328,7 +334,7 @@ func (r *CRDWatcher) translateCrdToProto(object client.Object) (*anypb.Any, erro } -func NewCRDWatcher(crdManager ctrl.Manager, kind model.SubscribeKind, crdGenerator func() client.Object, sendDataHandler model.DataEntirePushHandler) *CRDWatcher { +func NewCRDWatcher(crdManager ctrl.Manager, kind model.SubscribeKind, crdGenerator func() client.Object, sendDataHandler model.DataEntirePushHandler, notifyPluginHandler model.NotifyPluginHandler) *CRDWatcher { return &CRDWatcher{ kind: kind, Client: crdManager.GetClient(), @@ -340,5 +346,6 @@ func NewCRDWatcher(crdManager ctrl.Manager, kind model.SubscribeKind, crdGenerat crdGenerator: crdGenerator, crdCache: NewCRDCache(kind), sendDataHandler: sendDataHandler, + notifyPluginHandler: notifyPluginHandler, } } diff --git a/pkg/controller/k8s_operator.go b/pkg/controller/k8s_operator.go index 6c72356..538d77c 100644 --- a/pkg/controller/k8s_operator.go +++ b/pkg/controller/k8s_operator.go @@ -72,13 +72,14 @@ type KubernetesOperator struct { ctxCancel context.CancelFunc started atomic.Value - sendDataHandler model.DataEntirePushHandler + sendDataHandler model.DataEntirePushHandler + notifyPluginHandler model.NotifyPluginHandler controllerMux sync.RWMutex } // NewKubernetesOperator creates a OpenSergo Kubernetes operator. -func NewKubernetesOperator(sendDataHandler model.DataEntirePushHandler) (*KubernetesOperator, error) { +func NewKubernetesOperator(sendDataHandler model.DataEntirePushHandler, notifyPluginHandler model.NotifyPluginHandler) (*KubernetesOperator, error) { ctrl.SetLogger(&k8SLogger{ l: logging.GetGlobalLogger(), level: logging.GetGlobalLoggerLevel(), @@ -102,11 +103,12 @@ func NewKubernetesOperator(sendDataHandler model.DataEntirePushHandler) (*Kubern } ctx, cancel := context.WithCancel(context.Background()) k := &KubernetesOperator{ - crdManager: mgr, - controllers: make(map[string]*CRDWatcher), - ctx: ctx, - ctxCancel: cancel, - sendDataHandler: sendDataHandler, + crdManager: mgr, + controllers: make(map[string]*CRDWatcher), + ctx: ctx, + ctxCancel: cancel, + sendDataHandler: sendDataHandler, + notifyPluginHandler: notifyPluginHandler, } return k, nil } @@ -145,7 +147,7 @@ func (k *KubernetesOperator) RegisterWatcher(target model.SubscribeTarget) (*CRD return nil, errors.New("CRD not supported: " + target.Kind) } // This kind of CRD has never been watched. - crdWatcher := NewCRDWatcher(k.crdManager, target.Kind, crdMetadata.Generator(), k.sendDataHandler) + crdWatcher := NewCRDWatcher(k.crdManager, target.Kind, crdMetadata.Generator(), k.sendDataHandler, k.notifyPluginHandler) err = crdWatcher.AddSubscribeTarget(target) if err != nil { return nil, err @@ -178,7 +180,7 @@ func (k *KubernetesOperator) AddWatcher(target model.SubscribeTarget) error { if !crdSupports { return errors.New("CRD not supported: " + target.Kind) } - crdWatcher := NewCRDWatcher(k.crdManager, target.Kind, crdMetadata.Generator(), k.sendDataHandler) + crdWatcher := NewCRDWatcher(k.crdManager, target.Kind, crdMetadata.Generator(), k.sendDataHandler, k.notifyPluginHandler) err = crdWatcher.AddSubscribeTarget(target) if err != nil { return err diff --git a/pkg/model/model.go b/pkg/model/model.go index 29140e4..19a3332 100644 --- a/pkg/model/model.go +++ b/pkg/model/model.go @@ -31,3 +31,5 @@ type OpenSergoTransportStream = trpb.OpenSergoUniversalTransportService_Subscrib type SubscribeRequestHandler func(ClientIdentifier, *trpb.SubscribeRequest, OpenSergoTransportStream) error type DataEntirePushHandler func(namespace, app, kind string, dataWithVersion *trpb.DataWithVersion, status *trpb.Status, respId string) error + +type NotifyPluginHandler func(pluginName string, e any) error diff --git a/pkg/plugin/client/client.go b/pkg/plugin/client/client.go index 057c748..cc2817f 100644 --- a/pkg/plugin/client/client.go +++ b/pkg/plugin/client/client.go @@ -33,7 +33,7 @@ func (c *PluginClientRegistry) DeletePluginClient(id string) { c.client.Delete(id) } -func (c *PluginClientRegistry) RangePluginClient(name string) interface{} { +func (c *PluginClientRegistry) RangePluginClientByName(name string) interface{} { var client interface{} c.client.Range(func(key, value interface{}) bool { parts := strings.SplitN(key.(string), "-", 2) @@ -46,3 +46,15 @@ func (c *PluginClientRegistry) RangePluginClient(name string) interface{} { }) return client } + +func (c *PluginClientRegistry) RangePluginClientByPublicID(publicID string) interface{} { + var client interface{} + c.client.Range(func(key, value interface{}) bool { + if key.(string) == publicID { + client = value + return false + } + return true + }) + return client +} diff --git a/pkg/plugin/pl/builtin/const.go b/pkg/plugin/pl/builtin/const.go index dbccd80..f4399f5 100644 --- a/pkg/plugin/pl/builtin/const.go +++ b/pkg/plugin/pl/builtin/const.go @@ -1,6 +1,8 @@ package builtin const ( - StreamServicePluginSetName = "stream-plugin" - StreamServicePluginName = "stream" + StreamServicePluginSetName = "stream-plugin" + StreamServicePluginName = "stream" + RateLimitServicePluginSetName = "ratelimit-plugin" + RateLimitServicePluginName = "ratelimit" ) diff --git a/pkg/plugin/pl/builtin/notifyplugin.go b/pkg/plugin/pl/builtin/notifyplugin.go new file mode 100644 index 0000000..a5b7e36 --- /dev/null +++ b/pkg/plugin/pl/builtin/notifyplugin.go @@ -0,0 +1,15 @@ +package builtin + +import ( + "github.com/opensergo/opensergo-control-plane/pkg/api/v1alpha1" + ratelimit_plugin "github.com/opensergo/opensergo-control-plane/pkg/plugin/pl/builtin/ratelimit" +) + +func NotifyPluginRateLimit(r ratelimit_plugin.RateLimit, l *v1alpha1.RateLimitStrategy) error { + limit, err := r.RateLimit(l.Spec.Threshold) + if err != nil { + return err + } + l.Spec.Threshold = limit + return nil +} diff --git a/pkg/plugin/pl/builtin/ratelimit/grpc.go b/pkg/plugin/pl/builtin/ratelimit/grpc.go new file mode 100644 index 0000000..3b40ae2 --- /dev/null +++ b/pkg/plugin/pl/builtin/ratelimit/grpc.go @@ -0,0 +1,36 @@ +package ratelimit_plugin + +import ( + "context" + + v1 "github.com/opensergo/opensergo-control-plane/pkg/plugin/proto/rate_limit/v1" +) + +type GRPCClient struct { + client v1.RateLimitServiceClient +} + +func (g *GRPCClient) RateLimit(t int64) (int64, error) { + resp, err := g.client.RateLimit(context.Background(), &v1.RateLimitRequest{ + Threshold: t, + }) + if err != nil { + return 0, err + } + return resp.Threshold, nil +} + +type GRPCServer struct { + v1.UnimplementedRateLimitServiceServer + Impl RateLimit +} + +func (g *GRPCServer) RateLimit(ctx context.Context, req *v1.RateLimitRequest) (*v1.RateLimitResponse, error) { + resp, err := g.Impl.RateLimit(req.Threshold) + if err != nil { + return nil, err + } + return &v1.RateLimitResponse{ + Threshold: resp, + }, nil +} diff --git a/pkg/plugin/pl/builtin/ratelimit/ratelimit_plugin.go b/pkg/plugin/pl/builtin/ratelimit/ratelimit_plugin.go new file mode 100644 index 0000000..5b0ec76 --- /dev/null +++ b/pkg/plugin/pl/builtin/ratelimit/ratelimit_plugin.go @@ -0,0 +1,51 @@ +package ratelimit_plugin + +import ( + "context" + "fmt" + + "github.com/hashicorp/go-plugin" + pb "github.com/opensergo/opensergo-control-plane/pkg/plugin/proto/rate_limit/v1" + "google.golang.org/grpc" +) + +type RateLimitPluginServer struct { +} + +var _ RateLimit = (*RateLimitPluginServer)(nil) + +func (s RateLimitPluginServer) RateLimit(t int64) (int64, error) { + return t + 1, nil +} + +type RateLimit interface { + RateLimit(t int64) (int64, error) +} + +type RateLimitPlugin struct { + plugin.Plugin + + impl RateLimit +} + +func NewRateLimitPluginServiceServer(impl RateLimit) (*RateLimitPlugin, error) { + if impl == nil { + return nil, fmt.Errorf("empty underlying stream plugin passed in") + } + return &RateLimitPlugin{ + impl: impl, + }, nil +} + +func (h *RateLimitPlugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error { + pb.RegisterRateLimitServiceServer(s, &GRPCServer{ + Impl: h.impl, + }) + return nil +} + +func (h *RateLimitPlugin) GRPCClient(ctx context.Context, broker *plugin.GRPCBroker, c *grpc.ClientConn) (any, error) { + return &GRPCClient{ + client: pb.NewRateLimitServiceClient(c), + }, nil +} diff --git a/pkg/plugin/pl/builtin/stream/grpc.go b/pkg/plugin/pl/builtin/stream/grpc.go index 4bfdcae..018b7cd 100644 --- a/pkg/plugin/pl/builtin/stream/grpc.go +++ b/pkg/plugin/pl/builtin/stream/grpc.go @@ -4,13 +4,14 @@ import ( "context" "fmt" + v1 "github.com/opensergo/opensergo-control-plane/pkg/plugin/proto/stream/v1" + "github.com/hashicorp/go-plugin" - pb "github.com/opensergo/opensergo-control-plane/pkg/plugin/proto/stream" "google.golang.org/grpc" ) type GRPCClient struct { - client pb.StreamGreeterClient + client v1.RateLimitServiceClient broker *plugin.GRPCBroker } @@ -20,7 +21,7 @@ func (g *GRPCClient) Greeter(name string, h Hello) (string, error) { var s *grpc.Server serverFunc := func(opts []grpc.ServerOption) *grpc.Server { s = grpc.NewServer(opts...) - pb.RegisterHelloServer(s, addHelperServer) + v1.RegisterHelloServer(s, addHelperServer) return s } @@ -28,7 +29,7 @@ func (g *GRPCClient) Greeter(name string, h Hello) (string, error) { brokerID := g.broker.NextId() go g.broker.AcceptAndServe(brokerID, serverFunc) - resp, err := g.client.Greet(context.Background(), &pb.StreamReq{ + resp, err := g.client.Greet(context.Background(), &v1.StreamReq{ Id: brokerID, Name: name, }) @@ -41,34 +42,34 @@ func (g *GRPCClient) Greeter(name string, h Hello) (string, error) { } type GRPCServer struct { - pb.UnimplementedStreamGreeterServer + v1.UnimplementedStreamGreeterServer Impl Stream broker *plugin.GRPCBroker } -func (g *GRPCServer) Greet(ctx context.Context, req *pb.StreamReq) (*pb.StreamResp, error) { +func (g *GRPCServer) Greet(ctx context.Context, req *v1.StreamReq) (*v1.StreamResp, error) { conn, err := g.broker.Dial(req.Id) if err != nil { return nil, err } defer conn.Close() - a := &GRPCHelloClient{pb.NewHelloClient(conn)} + a := &GRPCHelloClient{v1.NewHelloClient(conn)} resp, err := g.Impl.Greeter(req.Name, a) if err != nil { return nil, err } - return &pb.StreamResp{ + return &v1.StreamResp{ Greet: resp, }, nil } type GRPCHelloClient struct { - client pb.HelloClient + client v1.HelloClient } func (g *GRPCHelloClient) Say(s string) string { - resp, err := g.client.Say(context.Background(), &pb.HelloReq{ + resp, err := g.client.Say(context.Background(), &v1.HelloReq{ Pre: s, }) if err != nil { @@ -78,13 +79,13 @@ func (g *GRPCHelloClient) Say(s string) string { } type GRPCHelloServer struct { - pb.UnimplementedHelloServer + v1.UnimplementedHelloServer Impl Hello } -func (g *GRPCHelloServer) Say(ctx context.Context, req *pb.HelloReq) (*pb.HelloResp, error) { +func (g *GRPCHelloServer) Say(ctx context.Context, req *v1.HelloReq) (*v1.HelloResp, error) { resp := g.Impl.Say(fmt.Sprint(req.Pre, " GRPCHelloServer")) - return &pb.HelloResp{ + return &v1.HelloResp{ Resp: resp, }, nil } diff --git a/pkg/plugin/pl/builtin/stream/stream_plugin.go b/pkg/plugin/pl/builtin/stream/stream_plugin.go index 0289526..ab00f0a 100644 --- a/pkg/plugin/pl/builtin/stream/stream_plugin.go +++ b/pkg/plugin/pl/builtin/stream/stream_plugin.go @@ -4,8 +4,9 @@ import ( "context" "fmt" + pb "github.com/opensergo/opensergo-control-plane/pkg/plugin/proto/stream/v1" + "github.com/hashicorp/go-plugin" - pb "github.com/opensergo/opensergo-control-plane/pkg/plugin/proto/stream" "google.golang.org/grpc" ) diff --git a/pkg/plugin/pl/plugin.go b/pkg/plugin/pl/plugin.go index 7f04567..e24a37f 100644 --- a/pkg/plugin/pl/plugin.go +++ b/pkg/plugin/pl/plugin.go @@ -98,6 +98,8 @@ func (p *PluginServer) InitPlugin() error { // enadle builtin plugin p.EnabledPlugin = append(p.EnabledPlugin, &EnabledPlugin{ PluginName: builtin.StreamServicePluginName, + }, &EnabledPlugin{ + PluginName: builtin.RateLimitServicePluginName, }) err = p.CreatePlugin() if err != nil { @@ -126,6 +128,16 @@ func (p *PluginServer) CreatePlugin() error { if err != nil { return fmt.Errorf("error CreatePlugin: %w", err) } + case builtin.RateLimitServicePluginName: + co := &creatOption{ + pluginSetName: builtin.RateLimitServicePluginSetName, + pluginType: store.PluginTypeCompute, + executionDir: "", + } + err := p.createplugin(enabledPlugin, co) + if err != nil { + return fmt.Errorf("error CreatePlugin: %w", err) + } default: fmt.Printf("unknow plugin: %s\n", enabledPlugin.PluginName) } @@ -175,7 +187,7 @@ func (p *PluginServer) registerPlugin(ctx context.Context, name string, client i func (p *PluginServer) GetPluginClient(name string) (interface{}, error) { //keys := make([]string, 0, len(p.Client)) - client := p.Client.RangePluginClient(name) + client := p.Client.RangePluginClientByName(name) if client == nil { return nil, fmt.Errorf("plugin %s not found", name) } diff --git a/pkg/plugin/pl/plugin/plugin.go b/pkg/plugin/pl/plugin/plugin.go index c273612..01b84bc 100644 --- a/pkg/plugin/pl/plugin/plugin.go +++ b/pkg/plugin/pl/plugin/plugin.go @@ -5,6 +5,7 @@ import ( "os/exec" "github.com/opensergo/opensergo-control-plane/pkg/plugin/pl/builtin" + ratelimit_plugin "github.com/opensergo/opensergo-control-plane/pkg/plugin/pl/builtin/ratelimit" stream_plugin "github.com/opensergo/opensergo-control-plane/pkg/plugin/pl/builtin/stream" "github.com/hashicorp/go-plugin" @@ -32,12 +33,18 @@ func ServePlugin(svc any, opt ...Option) error { } plugins[builtin.StreamServicePluginSetName] = streamServiceServer } + if ratelimitSvc, ok := svc.(ratelimit_plugin.RateLimit); ok { + ratelimitServiceServer, err := ratelimit_plugin.NewRateLimitPluginServiceServer(ratelimitSvc) + if err != nil { + return err + } + plugins[builtin.RateLimitServicePluginSetName] = ratelimitServiceServer + } if len(plugins) == 0 { return errors.New("no valid plugin server provided") } - //opts.withLogger.Info("Info NewPluginServer") plugin.Serve(&plugin.ServeConfig{ HandshakeConfig: HandshakeConfig, VersionedPlugins: map[int]plugin.PluginSet{ @@ -60,12 +67,10 @@ func NewPluginClient(pluginPath string, setName string, opt ...Option) (*plugin. switch setName { case builtin.StreamServicePluginSetName: set = plugin.PluginSet{builtin.StreamServicePluginSetName: &stream_plugin.StreamPlugin{}} + case builtin.RateLimitServicePluginSetName: + set = plugin.PluginSet{builtin.RateLimitServicePluginSetName: &ratelimit_plugin.RateLimitPlugin{}} } - //fmt.Println("NewPluginClient") - //opts.withLogger.Info("Info NewPluginClient") - //opts.withLogger.Debug("Debug NewPluginClient") - return plugin.NewClient(&plugin.ClientConfig{ HandshakeConfig: HandshakeConfig, VersionedPlugins: map[int]plugin.PluginSet{ diff --git a/pkg/plugin/proto/rate_limit/v1/rate_limit.pb.go b/pkg/plugin/proto/rate_limit/v1/rate_limit.pb.go new file mode 100644 index 0000000..bd14489 --- /dev/null +++ b/pkg/plugin/proto/rate_limit/v1/rate_limit.pb.go @@ -0,0 +1,222 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.0 +// protoc v3.19.4 +// source: rate_limit.proto + +package __ + +import ( + reflect "reflect" + sync "sync" + + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + _ "google.golang.org/protobuf/types/known/anypb" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type RateLimitResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Threshold int64 `protobuf:"varint,1,opt,name=threshold,proto3" json:"threshold,omitempty"` +} + +func (x *RateLimitResponse) Reset() { + *x = RateLimitResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_rate_limit_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *RateLimitResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RateLimitResponse) ProtoMessage() {} + +func (x *RateLimitResponse) ProtoReflect() protoreflect.Message { + mi := &file_rate_limit_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RateLimitResponse.ProtoReflect.Descriptor instead. +func (*RateLimitResponse) Descriptor() ([]byte, []int) { + return file_rate_limit_proto_rawDescGZIP(), []int{0} +} + +func (x *RateLimitResponse) GetThreshold() int64 { + if x != nil { + return x.Threshold + } + return 0 +} + +type RateLimitRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Threshold int64 `protobuf:"varint,1,opt,name=threshold,proto3" json:"threshold,omitempty"` +} + +func (x *RateLimitRequest) Reset() { + *x = RateLimitRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_rate_limit_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *RateLimitRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RateLimitRequest) ProtoMessage() {} + +func (x *RateLimitRequest) ProtoReflect() protoreflect.Message { + mi := &file_rate_limit_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RateLimitRequest.ProtoReflect.Descriptor instead. +func (*RateLimitRequest) Descriptor() ([]byte, []int) { + return file_rate_limit_proto_rawDescGZIP(), []int{1} +} + +func (x *RateLimitRequest) GetThreshold() int64 { + if x != nil { + return x.Threshold + } + return 0 +} + +var File_rate_limit_proto protoreflect.FileDescriptor + +var file_rate_limit_proto_rawDesc = []byte{ + 0x0a, 0x10, 0x72, 0x61, 0x74, 0x65, 0x5f, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x12, 0x24, 0x69, 0x6f, 0x2e, 0x6f, 0x70, 0x65, 0x6e, 0x73, 0x65, 0x72, 0x67, 0x6f, + 0x2e, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x72, 0x61, + 0x74, 0x65, 0x5f, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x1a, 0x19, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, + 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x61, 0x6e, 0x79, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x22, 0x31, 0x0a, 0x11, 0x52, 0x61, 0x74, 0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x74, 0x68, 0x72, 0x65, + 0x73, 0x68, 0x6f, 0x6c, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x74, 0x68, 0x72, + 0x65, 0x73, 0x68, 0x6f, 0x6c, 0x64, 0x22, 0x30, 0x0a, 0x10, 0x52, 0x61, 0x74, 0x65, 0x4c, 0x69, + 0x6d, 0x69, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x74, 0x68, + 0x72, 0x65, 0x73, 0x68, 0x6f, 0x6c, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x74, + 0x68, 0x72, 0x65, 0x73, 0x68, 0x6f, 0x6c, 0x64, 0x32, 0x90, 0x01, 0x0a, 0x10, 0x52, 0x61, 0x74, + 0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x7c, 0x0a, + 0x09, 0x52, 0x61, 0x74, 0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x12, 0x36, 0x2e, 0x69, 0x6f, 0x2e, + 0x6f, 0x70, 0x65, 0x6e, 0x73, 0x65, 0x72, 0x67, 0x6f, 0x2e, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x72, 0x61, 0x74, 0x65, 0x5f, 0x6c, 0x69, 0x6d, 0x69, + 0x74, 0x2e, 0x52, 0x61, 0x74, 0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x37, 0x2e, 0x69, 0x6f, 0x2e, 0x6f, 0x70, 0x65, 0x6e, 0x73, 0x65, 0x72, 0x67, + 0x6f, 0x2e, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x72, + 0x61, 0x74, 0x65, 0x5f, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x2e, 0x52, 0x61, 0x74, 0x65, 0x4c, 0x69, + 0x6d, 0x69, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x04, 0x5a, 0x02, 0x2e, + 0x2f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_rate_limit_proto_rawDescOnce sync.Once + file_rate_limit_proto_rawDescData = file_rate_limit_proto_rawDesc +) + +func file_rate_limit_proto_rawDescGZIP() []byte { + file_rate_limit_proto_rawDescOnce.Do(func() { + file_rate_limit_proto_rawDescData = protoimpl.X.CompressGZIP(file_rate_limit_proto_rawDescData) + }) + return file_rate_limit_proto_rawDescData +} + +var file_rate_limit_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_rate_limit_proto_goTypes = []interface{}{ + (*RateLimitResponse)(nil), // 0: io.opensergo.plugin.proto.rate_limit.RateLimitResponse + (*RateLimitRequest)(nil), // 1: io.opensergo.plugin.proto.rate_limit.RateLimitRequest +} +var file_rate_limit_proto_depIdxs = []int32{ + 1, // 0: io.opensergo.plugin.proto.rate_limit.RateLimitService.RateLimit:input_type -> io.opensergo.plugin.proto.rate_limit.RateLimitRequest + 0, // 1: io.opensergo.plugin.proto.rate_limit.RateLimitService.RateLimit:output_type -> io.opensergo.plugin.proto.rate_limit.RateLimitResponse + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_rate_limit_proto_init() } +func file_rate_limit_proto_init() { + if File_rate_limit_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_rate_limit_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*RateLimitResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_rate_limit_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*RateLimitRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_rate_limit_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_rate_limit_proto_goTypes, + DependencyIndexes: file_rate_limit_proto_depIdxs, + MessageInfos: file_rate_limit_proto_msgTypes, + }.Build() + File_rate_limit_proto = out.File + file_rate_limit_proto_rawDesc = nil + file_rate_limit_proto_goTypes = nil + file_rate_limit_proto_depIdxs = nil +} diff --git a/pkg/plugin/proto/rate_limit/v1/rate_limit.proto b/pkg/plugin/proto/rate_limit/v1/rate_limit.proto new file mode 100644 index 0000000..ae3f908 --- /dev/null +++ b/pkg/plugin/proto/rate_limit/v1/rate_limit.proto @@ -0,0 +1,17 @@ +syntax = "proto3"; + +package io.opensergo.plugin.proto.rate_limit; + +option go_package = "./"; +import "google/protobuf/any.proto"; + +message RateLimitResponse{ + int64 threshold = 1; +} + +message RateLimitRequest { + int64 threshold = 1; +} +service RateLimitService { + rpc RateLimit(RateLimitRequest) returns (RateLimitResponse); +} diff --git a/pkg/plugin/proto/rate_limit/v1/rate_limit_grpc.pb.go b/pkg/plugin/proto/rate_limit/v1/rate_limit_grpc.pb.go new file mode 100644 index 0000000..94cc33e --- /dev/null +++ b/pkg/plugin/proto/rate_limit/v1/rate_limit_grpc.pb.go @@ -0,0 +1,106 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc v3.19.4 +// source: rate_limit.proto + +package __ + +import ( + context "context" + + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// RateLimitServiceClient is the client API for RateLimitService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type RateLimitServiceClient interface { + RateLimit(ctx context.Context, in *RateLimitRequest, opts ...grpc.CallOption) (*RateLimitResponse, error) +} + +type rateLimitServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewRateLimitServiceClient(cc grpc.ClientConnInterface) RateLimitServiceClient { + return &rateLimitServiceClient{cc} +} + +func (c *rateLimitServiceClient) RateLimit(ctx context.Context, in *RateLimitRequest, opts ...grpc.CallOption) (*RateLimitResponse, error) { + out := new(RateLimitResponse) + err := c.cc.Invoke(ctx, "/io.opensergo.plugin.proto.rate_limit.RateLimitService/RateLimit", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// RateLimitServiceServer is the server API for RateLimitService service. +// All implementations must embed UnimplementedRateLimitServiceServer +// for forward compatibility +type RateLimitServiceServer interface { + RateLimit(context.Context, *RateLimitRequest) (*RateLimitResponse, error) + mustEmbedUnimplementedRateLimitServiceServer() +} + +// UnimplementedRateLimitServiceServer must be embedded to have forward compatible implementations. +type UnimplementedRateLimitServiceServer struct { +} + +func (UnimplementedRateLimitServiceServer) RateLimit(context.Context, *RateLimitRequest) (*RateLimitResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method RateLimit not implemented") +} +func (UnimplementedRateLimitServiceServer) mustEmbedUnimplementedRateLimitServiceServer() {} + +// UnsafeRateLimitServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to RateLimitServiceServer will +// result in compilation errors. +type UnsafeRateLimitServiceServer interface { + mustEmbedUnimplementedRateLimitServiceServer() +} + +func RegisterRateLimitServiceServer(s grpc.ServiceRegistrar, srv RateLimitServiceServer) { + s.RegisterService(&RateLimitService_ServiceDesc, srv) +} + +func _RateLimitService_RateLimit_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(RateLimitRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(RateLimitServiceServer).RateLimit(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/io.opensergo.plugin.proto.rate_limit.RateLimitService/RateLimit", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(RateLimitServiceServer).RateLimit(ctx, req.(*RateLimitRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// RateLimitService_ServiceDesc is the grpc.ServiceDesc for RateLimitService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var RateLimitService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "io.opensergo.plugin.proto.rate_limit.RateLimitService", + HandlerType: (*RateLimitServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "RateLimit", + Handler: _RateLimitService_RateLimit_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "rate_limit.proto", +} diff --git a/pkg/plugin/proto/stream/stream.pb.go b/pkg/plugin/proto/stream/v1/stream.pb.go similarity index 99% rename from pkg/plugin/proto/stream/stream.pb.go rename to pkg/plugin/proto/stream/v1/stream.pb.go index 58a57cb..d5db1af 100644 --- a/pkg/plugin/proto/stream/stream.pb.go +++ b/pkg/plugin/proto/stream/v1/stream.pb.go @@ -4,7 +4,7 @@ // protoc v3.19.4 // source: stream.proto -package stream +package v1 import ( reflect "reflect" diff --git a/pkg/plugin/proto/stream.proto b/pkg/plugin/proto/stream/v1/stream.proto similarity index 92% rename from pkg/plugin/proto/stream.proto rename to pkg/plugin/proto/stream/v1/stream.proto index 40a58f0..ad9045c 100644 --- a/pkg/plugin/proto/stream.proto +++ b/pkg/plugin/proto/stream/v1/stream.proto @@ -1,7 +1,7 @@ syntax = "proto3"; package stream; -option go_package = "./stream"; +option go_package = "./"; //protoc --go_out=. --go-grpc_out=. stream.proto diff --git a/pkg/plugin/proto/stream/stream_grpc.pb.go b/pkg/plugin/proto/stream/v1/stream_grpc.pb.go similarity index 95% rename from pkg/plugin/proto/stream/stream_grpc.pb.go rename to pkg/plugin/proto/stream/v1/stream_grpc.pb.go index 8ead58b..71e435f 100644 --- a/pkg/plugin/proto/stream/stream_grpc.pb.go +++ b/pkg/plugin/proto/stream/v1/stream_grpc.pb.go @@ -4,7 +4,7 @@ // - protoc v3.19.4 // source: stream.proto -package stream +package v1 import ( context "context" @@ -19,10 +19,10 @@ import ( // Requires gRPC-Go v1.32.0 or later. const _ = grpc.SupportPackageIsVersion7 -// StreamGreeterClient is the client API for StreamGreeter service. +// RateLimitServiceClient is the client API for StreamGreeter service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. -type StreamGreeterClient interface { +type RateLimitServiceClient interface { Greet(ctx context.Context, in *StreamReq, opts ...grpc.CallOption) (*StreamResp, error) } @@ -30,7 +30,7 @@ type streamGreeterClient struct { cc grpc.ClientConnInterface } -func NewStreamGreeterClient(cc grpc.ClientConnInterface) StreamGreeterClient { +func NewStreamGreeterClient(cc grpc.ClientConnInterface) RateLimitServiceClient { return &streamGreeterClient{cc} } @@ -56,7 +56,7 @@ type UnimplementedStreamGreeterServer struct { } func (UnimplementedStreamGreeterServer) Greet(context.Context, *StreamReq) (*StreamResp, error) { - return nil, status.Errorf(codes.Unimplemented, "method Greet not implemented") + return nil, status.Errorf(codes.Unimplemented, "method RateLimit not implemented") } func (UnimplementedStreamGreeterServer) mustEmbedUnimplementedStreamGreeterServer() {} diff --git a/pkg/plugin/scripts/plugin.sh b/pkg/plugin/scripts/plugin.sh index 7c5babe..0adfee0 100644 --- a/pkg/plugin/scripts/plugin.sh +++ b/pkg/plugin/scripts/plugin.sh @@ -18,7 +18,7 @@ SOURCE="${BASH_SOURCE[0]}" while [ -h "$SOURCE" ] ; do SOURCE="$(readlink "$SOURCE")"; done export DIR="$( cd -P "$( dirname "$SOURCE" )/.." && pwd )" echo "DIR: $DIR" -# Create boundary plugins +# Create plugins echo "==> Building opensergo plugins..." rm -f $DIR/assets/opensergo-plugin-* for CURR_PLUGIN in $(ls $DIR/server); do diff --git a/pkg/plugin/server/ratelimit/main.go b/pkg/plugin/server/ratelimit/main.go new file mode 100644 index 0000000..9d66e99 --- /dev/null +++ b/pkg/plugin/server/ratelimit/main.go @@ -0,0 +1,38 @@ +package main + +import ( + "fmt" + "os" + + ratelimit_plugin "github.com/opensergo/opensergo-control-plane/pkg/plugin/pl/builtin/ratelimit" + + "github.com/opensergo/opensergo-control-plane/pkg/plugin/pl/plugin" +) + +func main() { + //log := hclog.New(&hclog.LoggerOptions{ + // Output: os.Stderr, + // Level: hclog.Trace, + // JSONFormat: true, + //}), plugin.WithLogger(log) + b := NewBuiltinPlugin() + if err := plugin.ServePlugin(b); err != nil { + fmt.Println("Error serving plugin", err) + os.Exit(1) + } + os.Exit(0) +} + +var ( + _ ratelimit_plugin.RateLimit = (*ratelimit_plugin.RateLimitPluginServer)(nil) +) + +type BuiltinPlugin struct { + *ratelimit_plugin.RateLimitPluginServer +} + +func NewBuiltinPlugin() *BuiltinPlugin { + return &BuiltinPlugin{ + RateLimitPluginServer: &ratelimit_plugin.RateLimitPluginServer{}, + } +} diff --git a/pkg/plugin/server/stream/main.go b/pkg/plugin/server/stream/main.go index fad44ca..be33d31 100644 --- a/pkg/plugin/server/stream/main.go +++ b/pkg/plugin/server/stream/main.go @@ -14,22 +14,24 @@ func main() { // Level: hclog.Trace, // JSONFormat: true, //}), plugin.WithLogger(log) - streamplugin := &stream_plugin.StreamPluginServer{} - if err := plugin.ServePlugin(streamplugin); err != nil { + b := NewStreamPlugin() + if err := plugin.ServePlugin(b); err != nil { fmt.Println("Error serving plugin", err) os.Exit(1) } os.Exit(0) } -// 为了加载多个plugin,现在不用这个,后面plugin多了 -// 就加在StreamPlugin中,然后使用StreamPlugin传入ServePlugin里 -type StreamPlugin struct { +var ( + _ stream_plugin.Stream = (*stream_plugin.StreamPluginServer)(nil) +) + +type BuiltinPlugin struct { *stream_plugin.StreamPluginServer } -func NewStreamPlugin() *StreamPlugin { - return &StreamPlugin{ - &stream_plugin.StreamPluginServer{}, +func NewStreamPlugin() *BuiltinPlugin { + return &BuiltinPlugin{ + StreamPluginServer: &stream_plugin.StreamPluginServer{}, } }