From 263a15da5a72f84bcb60844380c4183e64989012 Mon Sep 17 00:00:00 2001 From: Jiangnan Jia Date: Fri, 4 Nov 2022 00:50:20 +0800 Subject: [PATCH] support unsubscribe. (#7) Signed-off-by: Jiangnan Jia --- control_plane.go | 139 ++++++++++++++++++++++++++++++- pkg/controller/crd_watcher.go | 36 +++++++- pkg/controller/k8s_operator.go | 20 +++++ pkg/transport/grpc/connection.go | 21 +++++ pkg/transport/grpc/server.go | 4 + 5 files changed, 218 insertions(+), 2 deletions(-) diff --git a/control_plane.go b/control_plane.go index 1cedfec..890a7aa 100644 --- a/control_plane.go +++ b/control_plane.go @@ -16,7 +16,9 @@ package opensergo import ( "os" + "reflect" "sync" + "time" "github.com/opensergo/opensergo-control-plane/pkg/controller" "github.com/opensergo/opensergo-control-plane/pkg/model" @@ -42,7 +44,8 @@ func NewControlPlane() (*ControlPlane, error) { return nil, err } - cp.server = transport.NewServer(uint32(10246), []model.SubscribeRequestHandler{cp.handleSubscribeRequest}) + handlers := []model.SubscribeRequestHandler{cp.handleSubscribeRequest, cp.handleUnSubscribeRequest} + cp.server = transport.NewServer(uint32(10246), handlers) cp.operator = operator hostname, herr := os.Hostname() @@ -61,6 +64,10 @@ func (c *ControlPlane) Start() error { if err != nil { return err } + + // start the delete-connection goroutine + c.delConn() + // Run the transport server err = c.server.Run() if err != nil { @@ -105,7 +112,24 @@ func (c *ControlPlane) sendMessageToStream(stream model.OpenSergoTransportStream }) } +func (c *ControlPlane) sendAckToStream(stream model.OpenSergoTransportStream, ack string, status *trpb.Status, respId string) error { + if stream == nil { + return nil + } + return stream.SendMsg(&trpb.SubscribeResponse{ + Status: status, + Ack: ack, + ControlPlane: c.protoDesc, + ResponseId: respId, + }) +} + func (c *ControlPlane) handleSubscribeRequest(clientIdentifier model.ClientIdentifier, request *trpb.SubscribeRequest, stream model.OpenSergoTransportStream) error { + + if trpb.SubscribeOpType_SUBSCRIBE != request.OpType { + return nil + } + //var labels []model.LabelKV //if request.Target.Labels != nil { // for _, label := range request.Target.Labels { @@ -157,3 +181,116 @@ func (c *ControlPlane) handleSubscribeRequest(clientIdentifier model.ClientIdent } return nil } + +func (c *ControlPlane) handleUnSubscribeRequest(clientIdentifier model.ClientIdentifier, request *trpb.SubscribeRequest, stream model.OpenSergoTransportStream) error { + + if trpb.SubscribeOpType_UNSUBSCRIBE != request.OpType { + return nil + } + + for _, kind := range request.Target.Kinds { + namespacedApp := model.NamespacedApp{ + Namespace: request.Target.Namespace, + App: request.Target.App, + } + err := c.server.ConnectionManager().RemoveWithIdentifier(namespacedApp, kind, clientIdentifier) + + if err != nil { + status := &trpb.Status{ + // TODO: defined a new errorCode + Code: transport.RegisterWatcherError, + Message: "Remove from watcher error", + Details: nil, + } + err = c.sendMessageToStream(stream, request.Target.Namespace, request.Target.App, kind, nil, status, request.RequestId) + if err != nil { + // TODO: log here + } + continue + } + + targetConnections, _ := c.server.ConnectionManager().Get(request.Target.Namespace, request.Target.App, kind) + if len(targetConnections) < 1 { + delSubscribeConnChan <- delSubscribeConnInfo{ + stream: stream, + request: request, + namespaceApp: namespacedApp, + kind: kind, + } + } else { + existConnection := c.server.ConnectionManager().ExistConnection(kind) + if !existConnection { + delSubscribeConnChan <- delSubscribeConnInfo{ + stream: stream, + request: request, + namespaceApp: model.NamespacedApp{}, + kind: kind, + } + } + } + } + + return nil +} + +var delSubscribeConnChan chan delSubscribeConnInfo + +type delSubscribeConnInfo struct { + stream model.OpenSergoTransportStream + request *trpb.SubscribeRequest + namespaceApp model.NamespacedApp + kind string +} + +func (c *ControlPlane) delConn() { + go func() { + for !c.server.IsStarted() { + time.Sleep(time.Duration(1) * time.Second) + } + + currDelConnInfo := <-delSubscribeConnChan + namespaceApp := currDelConnInfo.namespaceApp + kind := currDelConnInfo.kind + request := currDelConnInfo.request + stream := currDelConnInfo.stream + + // TODO make time of sleep is configurable + time.Sleep(time.Duration(5) * time.Second) + var err error + + // RemoveSubscribeTarget from CRDWatcher + if !reflect.DeepEqual(namespaceApp, model.NamespacedApp{}) { + targetConnections, _ := c.server.ConnectionManager().Get(namespaceApp.Namespace, namespaceApp.App, kind) + if len(targetConnections) < 1 { + if crdWatcher, existed := c.operator.GetWatcher(kind); existed { + err = crdWatcher.RemoveSubscribeTarget(model.SubscribeTarget{ + Namespace: namespaceApp.Namespace, + AppName: namespaceApp.App, + Kind: kind, + }) + } + } + } + + // delete Connection and CRDWatch + existConnection := c.server.ConnectionManager().ExistConnection(kind) + if !existConnection { + c.operator.RemoveWatcher(model.SubscribeTarget{ + Namespace: namespaceApp.Namespace, + AppName: namespaceApp.App, + Kind: kind, + }) + } + + // send ackMessage + status := &trpb.Status{ + Code: transport.Success, + Message: "unSubscribe success", + Details: nil, + } + err = c.sendAckToStream(stream, transport.ACKFlag, status, request.RequestId) + if err != nil { + // TODO: log here + } + }() +} diff --git a/pkg/controller/crd_watcher.go b/pkg/controller/crd_watcher.go index 7017420..73a5e05 100644 --- a/pkg/controller/crd_watcher.go +++ b/pkg/controller/crd_watcher.go @@ -16,6 +16,7 @@ package controller import ( "context" + "go.uber.org/atomic" "log" "net/http" "strconv" @@ -55,6 +56,7 @@ type CRDWatcher struct { crdGenerator func() client.Object sendDataHandler model.DataEntirePushHandler + deleted *atomic.Bool updateMux sync.RWMutex } @@ -96,7 +98,22 @@ func (r *CRDWatcher) AddSubscribeTarget(target model.SubscribeTarget) error { } func (r *CRDWatcher) RemoveSubscribeTarget(target model.SubscribeTarget) error { - // TODO: implement me + // TODO: validate the target + if target.Kind != r.kind { + return errors.New("target kind mismatch, expected: " + target.Kind + ", r.kind: " + r.kind) + } + r.updateMux.Lock() + defer r.updateMux.Unlock() + + delete(r.subscribedList, target) + delete(r.subscribedNamespaces, target.Namespace) + delete(r.subscribedApps, target.AppName) + + // TODO the 2nd param need fix to correct. + r.crdCache.DeleteByNamespaceApp(model.NamespacedApp{ + Namespace: target.Namespace, + App: target.AppName, + }, "") return nil } @@ -118,6 +135,14 @@ func (r *CRDWatcher) HasAnySubscribedOfApp(app string) bool { } func (r *CRDWatcher) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + // TODO optimize the logic of destroy the current controller + // r.deleted.Load() is a flag marked the deleted status of controller + // can not destroy the current controller + // + if r.deleted.Load() { + return ctrl.Result{}, nil + } + if r.HasAnySubscribedOfApp(req.Namespace) { // Ignore unmatched namespace return ctrl.Result{Requeue: false, RequeueAfter: 0}, nil @@ -217,9 +242,17 @@ func (r *CRDWatcher) GetRules(n model.NamespacedApp) ([]*anypb.Any, int64) { } func (r *CRDWatcher) SetupWithManager(mgr ctrl.Manager) error { + // TODO optimized delete logic here + r.deleted.Store(false) return ctrl.NewControllerManagedBy(mgr).For(r.crdGenerator()).Complete(r) } +func (r *CRDWatcher) ShutdownWithManager(mgr ctrl.Manager) error { + // TODO optimized delete logic here + r.deleted.Store(true) + return nil +} + func (r *CRDWatcher) translateCrdToProto(object client.Object) (*anypb.Any, error) { var packRule *anypb.Any var err error @@ -336,6 +369,7 @@ func NewCRDWatcher(crdManager ctrl.Manager, kind model.SubscribeKind, crdGenerat subscribedNamespaces: make(map[string]bool), subscribedApps: make(map[string]bool), crdGenerator: crdGenerator, + deleted: atomic.NewBool(false), crdCache: NewCRDCache(kind), sendDataHandler: sendDataHandler, } diff --git a/pkg/controller/k8s_operator.go b/pkg/controller/k8s_operator.go index 96f628e..95cf330 100644 --- a/pkg/controller/k8s_operator.go +++ b/pkg/controller/k8s_operator.go @@ -125,6 +125,8 @@ func (k *KubernetesOperator) RegisterWatcher(target model.SubscribeTarget) (*CRD existingWatcher, exists := k.controllers[target.Kind] if exists { + // TODO optimized delete logic here + existingWatcher.deleted.Store(false) if existingWatcher.HasSubscribed(target) { // Target has been subscribed return existingWatcher, nil @@ -164,6 +166,8 @@ func (k *KubernetesOperator) AddWatcher(target model.SubscribeTarget) error { existingWatcher, exists := k.controllers[target.Kind] if exists && !existingWatcher.HasSubscribed(target) { + // TODO optimized delete logic here + existingWatcher.deleted.Store(false) // TODO: think more about here err = existingWatcher.AddSubscribeTarget(target) if err != nil { @@ -196,6 +200,22 @@ func (k *KubernetesOperator) AddWatcher(target model.SubscribeTarget) error { return nil } +func (k *KubernetesOperator) RemoveWatcher(target model.SubscribeTarget) error { + k.controllerMux.Lock() + defer k.controllerMux.Unlock() + + crdWatch, exists := k.controllers[target.Kind] + if exists { + err := crdWatch.ShutdownWithManager(k.crdManager) + if err != nil { + // TODO add log + } + delete(k.controllers, target.Kind) + } + + return nil +} + // Close exit the K8S KubernetesOperator func (k *KubernetesOperator) Close() error { k.ctxCancel() diff --git a/pkg/transport/grpc/connection.go b/pkg/transport/grpc/connection.go index f8f9d56..c0377d8 100644 --- a/pkg/transport/grpc/connection.go +++ b/pkg/transport/grpc/connection.go @@ -154,6 +154,27 @@ func (c *ConnectionManager) RemoveByIdentifier(identifier model.ClientIdentifier return nil } +func (c *ConnectionManager) RemoveWithIdentifier(namespacedApp model.NamespacedApp, kind string, identifier model.ClientIdentifier) error { + c.updateMux.Lock() + defer c.updateMux.Unlock() + + err := c.removeInternal(namespacedApp, kind, identifier) + if err != nil { + return err + } + + return nil +} + +func (c *ConnectionManager) ExistConnection(kind string) bool { + for _, kindConnections := range c.connectionMap { + connectionMap := kindConnections[kind] + return len(connectionMap) > 0 + } + + return false +} + func NewConnectionManager() *ConnectionManager { return &ConnectionManager{ connectionMap: make(map[model.NamespacedApp]map[string]ConnectionMap), diff --git a/pkg/transport/grpc/server.go b/pkg/transport/grpc/server.go index 8e1f76c..6cbc279 100644 --- a/pkg/transport/grpc/server.go +++ b/pkg/transport/grpc/server.go @@ -60,6 +60,10 @@ func (s *Server) ComponentName() string { return "OpenSergoUniversalTransportServer" } +func (s *Server) IsStarted() bool { + return s.started.Load() +} + func (s *Server) Run() error { if s.started.CAS(false, true) { listener, err := net.Listen("tcp", fmt.Sprintf(":%d", s.port))