Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support unsubscribe #20

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 69 additions & 1 deletion control_plane.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ func NewControlPlane() (*ControlPlane, error) {
return nil, err
}

cp.server = transport.NewServer(uint32(10246), []model.SubscribeRequestHandler{cp.handleSubscribeRequest})
handlers := []model.SubscribeRequestHandler{
cp.handleSubscribeRequest,
cp.handleUnSubscribeRequest,
}
cp.server = transport.NewServer(uint32(10246), handlers)
cp.operator = operator

hostname, herr := os.Hostname()
Expand Down Expand Up @@ -106,7 +110,24 @@ func (c *ControlPlane) sendMessageToStream(stream model.OpenSergoTransportStream
})
}

func (c *ControlPlane) sendAckToStream(stream model.OpenSergoTransportStream, ack string, status *trpb.Status, respId string) error {
if stream == nil {
return nil
}
return stream.SendMsg(&trpb.SubscribeResponse{
Status: status,
Ack: ack,
ControlPlane: c.protoDesc,
ResponseId: respId,
})
}

func (c *ControlPlane) handleSubscribeRequest(clientIdentifier model.ClientIdentifier, request *trpb.SubscribeRequest, stream model.OpenSergoTransportStream) error {

if trpb.SubscribeOpType_SUBSCRIBE != request.OpType {
return nil
}

//var labels []model.LabelKV
//if request.Target.Labels != nil {
// for _, label := range request.Target.Labels {
Expand Down Expand Up @@ -160,3 +181,50 @@ func (c *ControlPlane) handleSubscribeRequest(clientIdentifier model.ClientIdent
}
return nil
}

// handleUnSubscribeRequest handle the UnSubscribeRequest request from OpenSergo SDK.
//
// 1.use ConnectionManager to remove from connectionMap for SubscribeTarget
// 2.use operator to UnRegisterWatcher for SubscribeTarget which will remove SubscribeTarget, delete crdCache, and remove CrdWatcher
func (c *ControlPlane) handleUnSubscribeRequest(clientIdentifier model.ClientIdentifier, request *trpb.SubscribeRequest, stream model.OpenSergoTransportStream) error {

if trpb.SubscribeOpType_UNSUBSCRIBE != request.OpType {
return nil
}

for _, kind := range request.Target.Kinds {
namespacedApp := model.NamespacedApp{
Namespace: request.Target.Namespace,
App: request.Target.App,
}
// remove the relation of Connection and SubscribeTarget from local cache
err := c.server.ConnectionManager().RemoveWithIdentifier(namespacedApp, kind, clientIdentifier)
if err != nil {
log.Printf("Remove map of Connection-SubscribeTarget failed, err=%s\n", err.Error())
status := &trpb.Status{
// TODO: defined a new errorCode
Code: transport.RegisterWatcherError,
Message: "Remove from watcher error",
Details: nil,
}
err = c.sendMessageToStream(stream, request.Target.Namespace, request.Target.App, kind, nil, status, request.RequestId)
if err != nil {
// TODO: log here
log.Printf("sendMessageToStream failed, err=%s\n", err.Error())
}
continue
}

// UnRegisterWatcher for SubscribeTarget
err = c.operator.UnRegisterWatcher(model.SubscribeTarget{
Namespace: request.Target.Namespace,
AppName: request.Target.App,
Kind: kind,
})
if err != nil {
log.Printf("UnRegisterWatcher failed, err=%s\n", err.Error())
}
}

return nil
}
51 changes: 50 additions & 1 deletion pkg/controller/crd_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"strconv"
"sync"

"go.uber.org/atomic"

"github.com/go-logr/logr"
crdv1alpha1 "github.com/opensergo/opensergo-control-plane/pkg/api/v1alpha1"
crdv1alpha1traffic "github.com/opensergo/opensergo-control-plane/pkg/api/v1alpha1/traffic"
Expand Down Expand Up @@ -56,6 +58,7 @@ type CRDWatcher struct {
crdGenerator func() client.Object
sendDataHandler model.DataEntirePushHandler

deleted *atomic.Bool
updateMux sync.RWMutex
}

Expand Down Expand Up @@ -97,7 +100,36 @@ func (r *CRDWatcher) AddSubscribeTarget(target model.SubscribeTarget) error {
}

func (r *CRDWatcher) RemoveSubscribeTarget(target model.SubscribeTarget) error {
// TODO: implement me
// TODO: validate the target
if target.Kind != r.kind {
return errors.New("target kind mismatch, expected: " + target.Kind + ", r.kind: " + r.kind)
}
r.updateMux.Lock()
defer r.updateMux.Unlock()

// remove from subscribedList
delete(r.subscribedList, target)

// if len(r.subscribedList) < 1 means there's no matched subscribeTarget in NamespacedApp,
// then delete subscribeTarget and crdCache which matched NamespacedApp
if len(r.subscribedList) < 1 {
delete(r.subscribedApps, target.NamespacedApp())
// TODO delete crdCache
//r.crdCache.DeleteByNamespaceApp(model.NamespacedApp{
// Namespace: target.Namespace,
// App: target.AppName,
//}, "")

// if len(r.subscribedApps) < 1 means there's no matched subscribeTarget in Namespace,
// then delete subscribeTarget and crdCache which matched Namespace
if len(r.subscribedApps) < 1 {
delete(r.subscribedNamespaces, target.Namespace)
// r.crdCache.DeleteByNamespacedName(model.NamespacedApp{
// Namespace: target.Namespace,
// App: target.AppName,
// }, "")
}
}

return nil
}
Expand All @@ -119,6 +151,14 @@ func (r *CRDWatcher) HasAnySubscribedOfApp(app model.NamespacedApp) bool {
}

func (r *CRDWatcher) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// TODO optimize the logic of destroy the current controller
// r.deleted.Load() is a flag marked the deleted status of controller
// can not destroy the current controller
//
if r.deleted.Load() {
return ctrl.Result{}, nil
}

if !r.HasAnySubscribedOfNamespace(req.Namespace) {
// Ignore unmatched namespace
return ctrl.Result{Requeue: false, RequeueAfter: 0}, nil
Expand Down Expand Up @@ -216,9 +256,17 @@ func (r *CRDWatcher) GetRules(n model.NamespacedApp) ([]*anypb.Any, int64) {
}

func (r *CRDWatcher) SetupWithManager(mgr ctrl.Manager) error {
// TODO optimized delete logic here
r.deleted.Store(false)
return ctrl.NewControllerManagedBy(mgr).For(r.crdGenerator()).Complete(r)
}

func (r *CRDWatcher) ShutdownWithManager(mgr ctrl.Manager) error {
// TODO optimized delete logic here
r.deleted.Store(true)
return nil
}

func (r *CRDWatcher) translateCrdToProto(object client.Object) (*anypb.Any, error) {
var packRule *anypb.Any
var err error
Expand Down Expand Up @@ -338,6 +386,7 @@ func NewCRDWatcher(crdManager ctrl.Manager, kind model.SubscribeKind, crdGenerat
subscribedNamespaces: make(map[string]bool),
subscribedApps: make(map[model.NamespacedApp]bool),
crdGenerator: crdGenerator,
deleted: atomic.NewBool(false),
crdCache: NewCRDCache(kind),
sendDataHandler: sendDataHandler,
}
Expand Down
42 changes: 42 additions & 0 deletions pkg/controller/k8s_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ func (k *KubernetesOperator) RegisterWatcher(target model.SubscribeTarget) (*CRD

existingWatcher, exists := k.controllers[target.Kind]
if exists {
// TODO optimized delete logic here
existingWatcher.deleted.Store(false)
if existingWatcher.HasSubscribed(target) {
// Target has been subscribed
return existingWatcher, nil
Expand Down Expand Up @@ -168,6 +170,8 @@ func (k *KubernetesOperator) AddWatcher(target model.SubscribeTarget) error {

existingWatcher, exists := k.controllers[target.Kind]
if exists && !existingWatcher.HasSubscribed(target) {
// TODO optimized delete logic here
existingWatcher.deleted.Store(false)
// TODO: think more about here
err = existingWatcher.AddSubscribeTarget(target)
if err != nil {
Expand Down Expand Up @@ -200,6 +204,44 @@ func (k *KubernetesOperator) AddWatcher(target model.SubscribeTarget) error {
return nil
}

// UnRegisterWatcher unRegisters given SubscribeTarget.
func (k *KubernetesOperator) UnRegisterWatcher(target model.SubscribeTarget) error {
k.controllerMux.Lock()
defer k.controllerMux.Unlock()

existingWatcher, exists := k.controllers[target.Kind]
if !exists {
return nil
}

if existingWatcher.HasAnySubscribedOfNamespace(target.Namespace) {
err := existingWatcher.RemoveSubscribeTarget(target)
if err != nil {
return err
}
if !existingWatcher.HasAnySubscribedOfNamespace(target.Namespace) {
if err = k.removeWatcher(existingWatcher, target); err != nil {
return err
}
}
} else {
if err := k.removeWatcher(existingWatcher, target); err != nil {
return err
}
}

return nil
}

func (k *KubernetesOperator) removeWatcher(crdWatcher *CRDWatcher, target model.SubscribeTarget) error {
delete(k.controllers, target.Kind)
// TODO add Shutdown logic
if err := crdWatcher.ShutdownWithManager(k.crdManager); err != nil {
return err
}
return nil
}

// Close exit the K8S KubernetesOperator
func (k *KubernetesOperator) Close() error {
k.ctxCancel()
Expand Down
30 changes: 30 additions & 0 deletions pkg/transport/grpc/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,26 @@ func (c *ConnectionManager) removeInternal(n model.NamespacedApp, kind string, i
return nil
}
delete(streams, identifier)

streams, exists = kindMap[kind]
// !exists || streams == nil || len(streams) < 1
// means after delete, there is no elements in kindMap[kind]
// then delete kind from kindMap
if !exists || streams == nil || len(streams) < 1 {
delete(kindMap, kind)
}
kindMap, exists = c.connectionMap[n]
// !exists || kindMap == nil || len(kindMap) < 1
// means after delete, there is no elements in c.connectionMap[n]
// then delete n from c.connectionMap[n]
if !exists || kindMap == nil || len(kindMap) < 1 {
delete(c.connectionMap, n)
}

return nil
}

// RemoveByIdentifier with a sync.RWMutex
func (c *ConnectionManager) RemoveByIdentifier(identifier model.ClientIdentifier) error {
c.updateMux.Lock()
defer c.updateMux.Unlock()
Expand All @@ -143,6 +160,7 @@ func (c *ConnectionManager) RemoveByIdentifier(identifier model.ClientIdentifier
if !exists {
return nil
}
// remove from connectionMap
for n, kinds := range NamespaceAppKinds {
for _, kind := range kinds {
err := c.removeInternal(n, kind, identifier)
Expand All @@ -154,6 +172,18 @@ func (c *ConnectionManager) RemoveByIdentifier(identifier model.ClientIdentifier
return nil
}

// RemoveWithIdentifier with a sync.RWMutex
func (c *ConnectionManager) RemoveWithIdentifier(namespacedApp model.NamespacedApp, kind string, identifier model.ClientIdentifier) error {
c.updateMux.Lock()
defer c.updateMux.Unlock()
// remove from connectionMap
err := c.removeInternal(namespacedApp, kind, identifier)
if err != nil {
return err
}
return nil
}

func NewConnectionManager() *ConnectionManager {
return &ConnectionManager{
connectionMap: make(map[model.NamespacedApp]map[string]ConnectionMap),
Expand Down
4 changes: 4 additions & 0 deletions pkg/transport/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ func (s *Server) ComponentName() string {
return "OpenSergoUniversalTransportServer"
}

func (s *Server) IsStarted() bool {
return s.started.Load()
}

func (s *Server) Run() error {
if s.started.CAS(false, true) {
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", s.port))
Expand Down