diff --git a/control_plane.go b/control_plane.go index 8e0686e..3a5ff9c 100644 --- a/control_plane.go +++ b/control_plane.go @@ -17,7 +17,9 @@ package opensergo import ( "log" "os" + "reflect" "sync" + "time" "github.com/opensergo/opensergo-control-plane/pkg/controller" "github.com/opensergo/opensergo-control-plane/pkg/model" @@ -43,7 +45,11 @@ func NewControlPlane() (*ControlPlane, error) { return nil, err } - cp.server = transport.NewServer(uint32(10246), []model.SubscribeRequestHandler{cp.handleSubscribeRequest}) + handlers := []model.SubscribeRequestHandler{ + cp.handleSubscribeRequest, + cp.handleUnSubscribeRequest, + } + cp.server = transport.NewServer(uint32(10246), handlers) cp.operator = operator hostname, herr := os.Hostname() @@ -62,6 +68,10 @@ func (c *ControlPlane) Start() error { if err != nil { return err } + + // start the delete-connection goroutine + c.delConn() + // Run the transport server err = c.server.Run() if err != nil { @@ -106,7 +116,24 @@ func (c *ControlPlane) sendMessageToStream(stream model.OpenSergoTransportStream }) } +func (c *ControlPlane) sendAckToStream(stream model.OpenSergoTransportStream, ack string, status *trpb.Status, respId string) error { + if stream == nil { + return nil + } + return stream.SendMsg(&trpb.SubscribeResponse{ + Status: status, + Ack: ack, + ControlPlane: c.protoDesc, + ResponseId: respId, + }) +} + func (c *ControlPlane) handleSubscribeRequest(clientIdentifier model.ClientIdentifier, request *trpb.SubscribeRequest, stream model.OpenSergoTransportStream) error { + + if trpb.SubscribeOpType_SUBSCRIBE != request.OpType { + return nil + } + //var labels []model.LabelKV //if request.Target.Labels != nil { // for _, label := range request.Target.Labels { @@ -160,3 +187,140 @@ func (c *ControlPlane) handleSubscribeRequest(clientIdentifier model.ClientIdent } return nil } + +// handleUnSubscribeRequest handle the UnSubscribeRequest request from OpenSergo SDK. +// +// 1.remove cache of SubscribeTarget in Connection +// 2.remove watcher if there is no SubscribeTarget for the same kind in Connection cache. +func (c *ControlPlane) handleUnSubscribeRequest(clientIdentifier model.ClientIdentifier, request *trpb.SubscribeRequest, stream model.OpenSergoTransportStream) error { + + if trpb.SubscribeOpType_UNSUBSCRIBE != request.OpType { + return nil + } + + for _, kind := range request.Target.Kinds { + namespacedApp := model.NamespacedApp{ + Namespace: request.Target.Namespace, + App: request.Target.App, + } + // remove the relation of Connection and SubscribeTarget from local cache + err := c.server.ConnectionManager().RemoveWithIdentifier(namespacedApp, kind, clientIdentifier) + + if err != nil { + status := &trpb.Status{ + // TODO: defined a new errorCode + Code: transport.RegisterWatcherError, + Message: "Remove from watcher error", + Details: nil, + } + err = c.sendMessageToStream(stream, request.Target.Namespace, request.Target.App, kind, nil, status, request.RequestId) + if err != nil { + // TODO: log here + } + continue + } + + // handle the SubscribeTarget-cache and crdCache in CRDWatcher, and remove watcher. only push into a chan named delSubscribeConnChan waiting for delete. + // 1. if there is no kind cached in current Connection, push subscribeConnInfo with an empty NamespaceApp into delSubscribeConnChan, + // then, will delete the watcher for CRD. + // 2. if the number of relation between Connection and SubscribeTarget < 1, then push subscribeConnInfo with current NamespaceApp into delSubscribeConnChan, + // then 1st, will delete the SubscribeTarget which is cached in current Connection + // 2nd, will delte the watcher for CRD if there's no kind cached in current Connection + existConnection := c.server.ConnectionManager().ExistConnection(namespacedApp.Namespace, namespacedApp.App, kind) + if !existConnection { + delSubscribeConnChan <- delSubscribeConnInfo{ + stream: stream, + request: request, + namespaceApp: model.NamespacedApp{}, + kind: kind, + } + } else { + targetConnections, _ := c.server.ConnectionManager().Get(request.Target.Namespace, request.Target.App, kind) + if len(targetConnections) < 1 { + delSubscribeConnChan <- delSubscribeConnInfo{ + stream: stream, + request: request, + namespaceApp: namespacedApp, + kind: kind, + } + } + } + } + + return nil +} + +// delSubscribeConnChan a chan for delete the SubscribeTarget-cache and crdCache in CRDWatcher, and remove watcher to stop watching. +var delSubscribeConnChan chan delSubscribeConnInfo + +type delSubscribeConnInfo struct { + stream model.OpenSergoTransportStream + request *trpb.SubscribeRequest + namespaceApp model.NamespacedApp + kind string +} + +// delConn a goroutine contains the logic of delete the SubscribeTarget-cache and crdCache in CRDWatcher, and remove watcher. +// +// 1. at the beginning of current goroutine, should wait for the status of server is started. +// +// 2. when receive a delSubscribeConnInfo from delSubscribeConnChan, waiting a silence time to prevent inaccurate data statistics caused by network jitter. +// +// after the silence time, is the actually logic of deleting local cache and removing watcher. +func (c *ControlPlane) delConn() { + go func() { + // waiting for server is started. + for !c.server.IsStarted() { + time.Sleep(time.Duration(1) * time.Second) + } + + // receive from delSubscribeConnChan + currDelConnInfo := <-delSubscribeConnChan + namespaceApp := currDelConnInfo.namespaceApp + kind := currDelConnInfo.kind + request := currDelConnInfo.request + stream := currDelConnInfo.stream + + // wait a silence for network jitter + // TODO make time of sleep is configurable + time.Sleep(time.Duration(5) * time.Second) + var err error + + // RemoveSubscribeTarget from CRDWatcher + // if namespaceApp is not an empty struct, means that need to delete SubscribeTarget cache in CRDWatcher + if !reflect.DeepEqual(namespaceApp, model.NamespacedApp{}) { + // re-count the number of SubscribeTarget + targetConnections, _ := c.server.ConnectionManager().Get(namespaceApp.Namespace, namespaceApp.App, kind) + if len(targetConnections) < 1 { + if crdWatcher, existed := c.operator.GetWatcher(kind); existed { + err = crdWatcher.RemoveSubscribeTarget(model.SubscribeTarget{ + Namespace: namespaceApp.Namespace, + AppName: namespaceApp.App, + Kind: kind, + }) + } + } + } + + // remove the CRDWatch from KubernetesOperator, to stop watching the kind. + existConnection := c.server.ConnectionManager().ExistConnection(namespaceApp.Namespace, namespaceApp.App, kind) + if !existConnection { + c.operator.RemoveWatcher(model.SubscribeTarget{ + Namespace: namespaceApp.Namespace, + AppName: namespaceApp.App, + Kind: kind, + }) + } + + // send ackMessage for UnSubscribeConfig request. + status := &trpb.Status{ + Code: transport.Success, + Message: "unSubscribe success", + Details: nil, + } + err = c.sendAckToStream(stream, transport.ACKFlag, status, request.RequestId) + if err != nil { + // TODO: log here + } + }() +} diff --git a/pkg/controller/crd_watcher.go b/pkg/controller/crd_watcher.go index bcc42ad..243e193 100644 --- a/pkg/controller/crd_watcher.go +++ b/pkg/controller/crd_watcher.go @@ -16,6 +16,7 @@ package controller import ( "context" + "go.uber.org/atomic" "log" "net/http" "strconv" @@ -56,6 +57,7 @@ type CRDWatcher struct { crdGenerator func() client.Object sendDataHandler model.DataEntirePushHandler + deleted *atomic.Bool updateMux sync.RWMutex } @@ -97,7 +99,24 @@ func (r *CRDWatcher) AddSubscribeTarget(target model.SubscribeTarget) error { } func (r *CRDWatcher) RemoveSubscribeTarget(target model.SubscribeTarget) error { - // TODO: implement me + // TODO: validate the target + if target.Kind != r.kind { + return errors.New("target kind mismatch, expected: " + target.Kind + ", r.kind: " + r.kind) + } + r.updateMux.Lock() + defer r.updateMux.Unlock() + + // remove the subscribe-cache in this CRDWatcher + delete(r.subscribedList, target) + delete(r.subscribedNamespaces, target.Namespace) + delete(r.subscribedApps, target.NamespacedApp()) + + // delete the matched crdCache which comes from k8s + // TODO the 2nd param need fix to correct. + r.crdCache.DeleteByNamespaceApp(model.NamespacedApp{ + Namespace: target.Namespace, + App: target.AppName, + }, "") return nil } @@ -119,6 +138,14 @@ func (r *CRDWatcher) HasAnySubscribedOfApp(app model.NamespacedApp) bool { } func (r *CRDWatcher) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + // TODO optimize the logic of destroy the current controller + // r.deleted.Load() is a flag marked the deleted status of controller + // can not destroy the current controller + // + if r.deleted.Load() { + return ctrl.Result{}, nil + } + if !r.HasAnySubscribedOfNamespace(req.Namespace) { // Ignore unmatched namespace return ctrl.Result{Requeue: false, RequeueAfter: 0}, nil @@ -216,9 +243,17 @@ func (r *CRDWatcher) GetRules(n model.NamespacedApp) ([]*anypb.Any, int64) { } func (r *CRDWatcher) SetupWithManager(mgr ctrl.Manager) error { + // TODO optimized delete logic here + r.deleted.Store(false) return ctrl.NewControllerManagedBy(mgr).For(r.crdGenerator()).Complete(r) } +func (r *CRDWatcher) ShutdownWithManager(mgr ctrl.Manager) error { + // TODO optimized delete logic here + r.deleted.Store(true) + return nil +} + func (r *CRDWatcher) translateCrdToProto(object client.Object) (*anypb.Any, error) { var packRule *anypb.Any var err error @@ -338,6 +373,7 @@ func NewCRDWatcher(crdManager ctrl.Manager, kind model.SubscribeKind, crdGenerat subscribedNamespaces: make(map[string]bool), subscribedApps: make(map[model.NamespacedApp]bool), crdGenerator: crdGenerator, + deleted: atomic.NewBool(false), crdCache: NewCRDCache(kind), sendDataHandler: sendDataHandler, } diff --git a/pkg/controller/k8s_operator.go b/pkg/controller/k8s_operator.go index 8ae15ad..4e62def 100644 --- a/pkg/controller/k8s_operator.go +++ b/pkg/controller/k8s_operator.go @@ -129,6 +129,8 @@ func (k *KubernetesOperator) RegisterWatcher(target model.SubscribeTarget) (*CRD existingWatcher, exists := k.controllers[target.Kind] if exists { + // TODO optimized delete logic here + existingWatcher.deleted.Store(false) if existingWatcher.HasSubscribed(target) { // Target has been subscribed return existingWatcher, nil @@ -168,6 +170,8 @@ func (k *KubernetesOperator) AddWatcher(target model.SubscribeTarget) error { existingWatcher, exists := k.controllers[target.Kind] if exists && !existingWatcher.HasSubscribed(target) { + // TODO optimized delete logic here + existingWatcher.deleted.Store(false) // TODO: think more about here err = existingWatcher.AddSubscribeTarget(target) if err != nil { @@ -200,6 +204,22 @@ func (k *KubernetesOperator) AddWatcher(target model.SubscribeTarget) error { return nil } +func (k *KubernetesOperator) RemoveWatcher(target model.SubscribeTarget) error { + k.controllerMux.Lock() + defer k.controllerMux.Unlock() + + crdWatch, exists := k.controllers[target.Kind] + if exists { + err := crdWatch.ShutdownWithManager(k.crdManager) + if err != nil { + // TODO add log + } + delete(k.controllers, target.Kind) + } + + return nil +} + // Close exit the K8S KubernetesOperator func (k *KubernetesOperator) Close() error { k.ctxCancel() diff --git a/pkg/transport/grpc/connection.go b/pkg/transport/grpc/connection.go index f8f9d56..c71d541 100644 --- a/pkg/transport/grpc/connection.go +++ b/pkg/transport/grpc/connection.go @@ -154,6 +154,26 @@ func (c *ConnectionManager) RemoveByIdentifier(identifier model.ClientIdentifier return nil } +func (c *ConnectionManager) RemoveWithIdentifier(namespacedApp model.NamespacedApp, kind string, identifier model.ClientIdentifier) error { + c.updateMux.Lock() + defer c.updateMux.Unlock() + + err := c.removeInternal(namespacedApp, kind, identifier) + if err != nil { + return err + } + + return nil +} + +func (c *ConnectionManager) ExistConnection(namespace, app, kind string) bool { + connections, success := c.Get(namespace, app, kind) + if success && len(connections) > 0 { + return true + } + return false +} + func NewConnectionManager() *ConnectionManager { return &ConnectionManager{ connectionMap: make(map[model.NamespacedApp]map[string]ConnectionMap), diff --git a/pkg/transport/grpc/server.go b/pkg/transport/grpc/server.go index fc43adc..b01b836 100644 --- a/pkg/transport/grpc/server.go +++ b/pkg/transport/grpc/server.go @@ -61,6 +61,10 @@ func (s *Server) ComponentName() string { return "OpenSergoUniversalTransportServer" } +func (s *Server) IsStarted() bool { + return s.started.Load() +} + func (s *Server) Run() error { if s.started.CAS(false, true) { listener, err := net.Listen("tcp", fmt.Sprintf(":%d", s.port))