Skip to content

Commit

Permalink
support unsubscribe. (opensergo#7)
Browse files Browse the repository at this point in the history
Signed-off-by: Jiangnan Jia <[email protected]>
  • Loading branch information
jnan806 committed Nov 7, 2022
1 parent 91ef7e9 commit 07d9a80
Show file tree
Hide file tree
Showing 5 changed files with 218 additions and 2 deletions.
139 changes: 138 additions & 1 deletion control_plane.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ package opensergo

import (
"os"
"reflect"
"sync"
"time"

"github.com/opensergo/opensergo-control-plane/pkg/controller"
"github.com/opensergo/opensergo-control-plane/pkg/model"
Expand All @@ -42,7 +44,8 @@ func NewControlPlane() (*ControlPlane, error) {
return nil, err
}

cp.server = transport.NewServer(uint32(10246), []model.SubscribeRequestHandler{cp.handleSubscribeRequest})
handlers := []model.SubscribeRequestHandler{cp.handleSubscribeRequest, cp.handleUnSubscribeRequest}
cp.server = transport.NewServer(uint32(10246), handlers)
cp.operator = operator

hostname, herr := os.Hostname()
Expand All @@ -61,6 +64,10 @@ func (c *ControlPlane) Start() error {
if err != nil {
return err
}

// start the delete-connection goroutine
c.delConn()

// Run the transport server
err = c.server.Run()
if err != nil {
Expand Down Expand Up @@ -105,7 +112,24 @@ func (c *ControlPlane) sendMessageToStream(stream model.OpenSergoTransportStream
})
}

func (c *ControlPlane) sendAckToStream(stream model.OpenSergoTransportStream, ack string, status *trpb.Status, respId string) error {
if stream == nil {
return nil
}
return stream.SendMsg(&trpb.SubscribeResponse{
Status: status,
Ack: ack,
ControlPlane: c.protoDesc,
ResponseId: respId,
})
}

func (c *ControlPlane) handleSubscribeRequest(clientIdentifier model.ClientIdentifier, request *trpb.SubscribeRequest, stream model.OpenSergoTransportStream) error {

if trpb.SubscribeOpType_SUBSCRIBE != request.OpType {
return nil
}

//var labels []model.LabelKV
//if request.Target.Labels != nil {
// for _, label := range request.Target.Labels {
Expand Down Expand Up @@ -157,3 +181,116 @@ func (c *ControlPlane) handleSubscribeRequest(clientIdentifier model.ClientIdent
}
return nil
}

func (c *ControlPlane) handleUnSubscribeRequest(clientIdentifier model.ClientIdentifier, request *trpb.SubscribeRequest, stream model.OpenSergoTransportStream) error {

if trpb.SubscribeOpType_UNSUBSCRIBE != request.OpType {
return nil
}

for _, kind := range request.Target.Kinds {
namespacedApp := model.NamespacedApp{
Namespace: request.Target.Namespace,
App: request.Target.App,
}
err := c.server.ConnectionManager().RemoveWithIdentifier(namespacedApp, kind, clientIdentifier)

if err != nil {
status := &trpb.Status{
// TODO: defined a new errorCode
Code: transport.RegisterWatcherError,
Message: "Remove from watcher error",
Details: nil,
}
err = c.sendMessageToStream(stream, request.Target.Namespace, request.Target.App, kind, nil, status, request.RequestId)
if err != nil {
// TODO: log here
}
continue
}

targetConnections, _ := c.server.ConnectionManager().Get(request.Target.Namespace, request.Target.App, kind)
if len(targetConnections) < 1 {
delSubscribeConnChan <- delSubscribeConnInfo{
stream: stream,
request: request,
namespaceApp: namespacedApp,
kind: kind,
}
} else {
existConnection := c.server.ConnectionManager().ExistConnection(kind)
if !existConnection {
delSubscribeConnChan <- delSubscribeConnInfo{
stream: stream,
request: request,
namespaceApp: model.NamespacedApp{},
kind: kind,
}
}
}
}

return nil
}

var delSubscribeConnChan chan delSubscribeConnInfo

type delSubscribeConnInfo struct {
stream model.OpenSergoTransportStream
request *trpb.SubscribeRequest
namespaceApp model.NamespacedApp
kind string
}

func (c *ControlPlane) delConn() {
go func() {
for !c.server.IsStarted() {
time.Sleep(time.Duration(1) * time.Second)
}

currDelConnInfo := <-delSubscribeConnChan
namespaceApp := currDelConnInfo.namespaceApp
kind := currDelConnInfo.kind
request := currDelConnInfo.request
stream := currDelConnInfo.stream

// TODO make time of sleep is configurable
time.Sleep(time.Duration(5) * time.Second)
var err error

// RemoveSubscribeTarget from CRDWatcher
if !reflect.DeepEqual(namespaceApp, model.NamespacedApp{}) {
targetConnections, _ := c.server.ConnectionManager().Get(namespaceApp.Namespace, namespaceApp.App, kind)
if len(targetConnections) < 1 {
if crdWatcher, existed := c.operator.GetWatcher(kind); existed {
err = crdWatcher.RemoveSubscribeTarget(model.SubscribeTarget{
Namespace: namespaceApp.Namespace,
AppName: namespaceApp.App,
Kind: kind,
})
}
}
}

// delete Connection and CRDWatch
existConnection := c.server.ConnectionManager().ExistConnection(kind)
if !existConnection {
c.operator.RemoveWatcher(model.SubscribeTarget{
Namespace: namespaceApp.Namespace,
AppName: namespaceApp.App,
Kind: kind,
})
}

// send ackMessage
status := &trpb.Status{
Code: transport.Success,
Message: "unSubscribe success",
Details: nil,
}
err = c.sendAckToStream(stream, transport.ACKFlag, status, request.RequestId)
if err != nil {
// TODO: log here
}
}()
}
36 changes: 35 additions & 1 deletion pkg/controller/crd_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package controller

import (
"context"
"go.uber.org/atomic"
"log"
"net/http"
"strconv"
Expand Down Expand Up @@ -51,6 +52,7 @@ type CRDWatcher struct {
crdGenerator func() client.Object
sendDataHandler model.DataEntirePushHandler

deleted *atomic.Bool
crdCache *CRDCache
updateMux sync.RWMutex
}
Expand Down Expand Up @@ -93,7 +95,22 @@ func (r *CRDWatcher) AddSubscribeTarget(target model.SubscribeTarget) error {
}

func (r *CRDWatcher) RemoveSubscribeTarget(target model.SubscribeTarget) error {
// TODO: implement me
// TODO: validate the target
if target.Kind != r.kind {
return errors.New("target kind mismatch, expected: " + target.Kind + ", r.kind: " + r.kind)
}
r.updateMux.Lock()
defer r.updateMux.Unlock()

delete(r.subscribedList, target)
delete(r.subscribedNamespaces, target.Namespace)
delete(r.subscribedApps, target.AppName)

// TODO the 2nd param need fix to correct.
r.crdCache.DeleteByNamespaceApp(model.NamespacedApp{
Namespace: target.Namespace,
App: target.AppName,
}, "")

return nil
}
Expand All @@ -115,6 +132,14 @@ func (r *CRDWatcher) HasAnySubscribedOfApp(app string) bool {
}

func (r *CRDWatcher) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// TODO optimize the logic of destroy the current controller
// r.deleted.Load() is a flag marked the deleted status of controller
// can not destroy the current controller
//
if r.deleted.Load() {
return ctrl.Result{}, nil
}

if r.HasAnySubscribedOfApp(req.Namespace) {
// Ignore unmatched namespace
return ctrl.Result{Requeue: false, RequeueAfter: 0}, nil
Expand Down Expand Up @@ -214,9 +239,17 @@ func (r *CRDWatcher) GetRules(n model.NamespacedApp) ([]*anypb.Any, int64) {
}

func (r *CRDWatcher) SetupWithManager(mgr ctrl.Manager) error {
// TODO optimized delete logic here
r.deleted.Store(false)
return ctrl.NewControllerManagedBy(mgr).For(r.crdGenerator()).Complete(r)
}

func (r *CRDWatcher) ShutdownWithManager(mgr ctrl.Manager) error {
// TODO optimized delete logic here
r.deleted.Store(true)
return nil
}

func (r *CRDWatcher) translateCrdToProto(object client.Object) (*anypb.Any, error) {
var packRule *anypb.Any
var err error
Expand Down Expand Up @@ -333,6 +366,7 @@ func NewCRDWatcher(crdManager ctrl.Manager, kind model.SubscribeKind, crdGenerat
subscribedNamespaces: make(map[string]bool),
subscribedApps: make(map[string]bool),
crdGenerator: crdGenerator,
deleted: atomic.NewBool(false),
crdCache: NewCRDCache(kind),
sendDataHandler: sendDataHandler,
}
Expand Down
20 changes: 20 additions & 0 deletions pkg/controller/k8s_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ func (k *KubernetesOperator) RegisterWatcher(target model.SubscribeTarget) (*CRD

existingWatcher, exists := k.controllers[target.Kind]
if exists {
// TODO optimized delete logic here
existingWatcher.deleted.Store(false)
if existingWatcher.HasSubscribed(target) {
// Target has been subscribed
return existingWatcher, nil
Expand Down Expand Up @@ -164,6 +166,8 @@ func (k *KubernetesOperator) AddWatcher(target model.SubscribeTarget) error {

existingWatcher, exists := k.controllers[target.Kind]
if exists && !existingWatcher.HasSubscribed(target) {
// TODO optimized delete logic here
existingWatcher.deleted.Store(false)
// TODO: think more about here
err = existingWatcher.AddSubscribeTarget(target)
if err != nil {
Expand Down Expand Up @@ -196,6 +200,22 @@ func (k *KubernetesOperator) AddWatcher(target model.SubscribeTarget) error {
return nil
}

func (k *KubernetesOperator) RemoveWatcher(target model.SubscribeTarget) error {
k.controllerMux.Lock()
defer k.controllerMux.Unlock()

crdWatch, exists := k.controllers[target.Kind]
if exists {
err := crdWatch.ShutdownWithManager(k.crdManager)
if err != nil {
// TODO add log
}
delete(k.controllers, target.Kind)
}

return nil
}

// Close exit the K8S KubernetesOperator
func (k *KubernetesOperator) Close() error {
k.ctxCancel()
Expand Down
21 changes: 21 additions & 0 deletions pkg/transport/grpc/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,27 @@ func (c *ConnectionManager) RemoveByIdentifier(identifier model.ClientIdentifier
return nil
}

func (c *ConnectionManager) RemoveWithIdentifier(namespacedApp model.NamespacedApp, kind string, identifier model.ClientIdentifier) error {
c.updateMux.Lock()
defer c.updateMux.Unlock()

err := c.removeInternal(namespacedApp, kind, identifier)
if err != nil {
return err
}

return nil
}

func (c *ConnectionManager) ExistConnection(kind string) bool {
for _, kindConnections := range c.connectionMap {
connectionMap := kindConnections[kind]
return len(connectionMap) > 0
}

return false
}

func NewConnectionManager() *ConnectionManager {
return &ConnectionManager{
connectionMap: make(map[model.NamespacedApp]map[string]ConnectionMap),
Expand Down
4 changes: 4 additions & 0 deletions pkg/transport/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ func (s *Server) ComponentName() string {
return "OpenSergoUniversalTransportServer"
}

func (s *Server) IsStarted() bool {
return s.started.Load()
}

func (s *Server) Run() error {
if s.started.CAS(false, true) {
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", s.port))
Expand Down

0 comments on commit 07d9a80

Please sign in to comment.