Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscription server use ConfigMap as persistent storage as well as IPC #96

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion internal/cmd/server/start_alarm_subscription_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/openshift-kni/oran-o2ims/internal/authentication"
"github.com/openshift-kni/oran-o2ims/internal/authorization"
"github.com/openshift-kni/oran-o2ims/internal/exit"
"github.com/openshift-kni/oran-o2ims/internal/k8s"
"github.com/openshift-kni/oran-o2ims/internal/logging"
"github.com/openshift-kni/oran-o2ims/internal/metrics"
"github.com/openshift-kni/oran-o2ims/internal/network"
Expand Down Expand Up @@ -205,13 +206,28 @@ func (c *AlarmSubscriptionServerCommand) run(cmd *cobra.Command, argv []string)
})
router.Use(metricsWrapper, authenticationWrapper, authorizationWrapper)

// create k8s client with kube(from env first)
//var config *rest.Config
kubeClient, err := k8s.NewClient().SetLogger(logger).SetLoggingWrapper(loggingWrapper).Build()

if err != nil {
logger.ErrorContext(
ctx,
"Failed to create kubeClient",
"error", err,
)
return exit.Error(1)
}

// Create the handler:
handler, err := service.NewAlarmSubscriptionHandler().
SetLogger(logger).
SetLoggingWrapper(loggingWrapper).
SetCloudID(cloudID).
SetExtensions(extensions...).
Build()
SetKubeClient(kubeClient).
Build(ctx)

if err != nil {
logger.ErrorContext(
ctx,
Expand Down
14 changes: 14 additions & 0 deletions internal/k8s/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
clnt "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

// Client builder contains the data and logic needed to create a Kubernetes API client that
Expand All @@ -53,6 +55,13 @@ type Client struct {
delegate clnt.WithWatch
}

// set fake client to be used for test.go
func NewFakeClient() *Client {
client := Client{}
client.delegate = fake.NewFakeClient()
return &client
}

// NewClient creates a builder that can then be used to configure and create a Kubernetes API client
// that implements the controller-runtime WithWatch interface.
func NewClient() *ClientBuilder {
Expand Down Expand Up @@ -285,6 +294,11 @@ func (c *Client) IsObjectNamespaced(obj runtime.Object) (bool, error) {
return c.delegate.IsObjectNamespaced(obj)
}

// watch
func (c *Client) Watch(ctx context.Context, obj clnt.ObjectList, opts ...clnt.ListOption) (watch.Interface, error) {
return c.delegate.Watch(ctx, obj, opts...)
Jennifer-chen-rh marked this conversation as resolved.
Show resolved Hide resolved
}

// Close closes the client and releases all the resources it is using. It is specially important to
// call this method when the client is using as SSH tunnel, as otherwise the tunnel will remain
// open.
Expand Down
38 changes: 38 additions & 0 deletions internal/persiststorage/persiststorage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package persiststorage

import (
"context"
"errors"
"sync"

"github.com/openshift-kni/oran-o2ims/internal/data"
)

var ErrNotFound = errors.New("not found")

// interface for persistent storage
type StorageOperations interface {
//notification from db to application about db entry changes
//currently assume the notification is granular to indivial entry
ReadEntry(ctx context.Context, key string) (value string, err error)
AddEntry(ctx context.Context, key string, value string) (err error)
DeleteEntry(ctx context.Context, key string) (err error)
ReadAllEntries(ctx context.Context) (result map[string]data.Object, err error)
ProcessChanges(ctx context.Context, dataMap **map[string]data.Object, lock *sync.Mutex) (err error)
}

func Add(so StorageOperations, ctx context.Context, key string, value string) (err error) {
return so.AddEntry(ctx, key, value)
}
func Get(so StorageOperations, ctx context.Context, key string) (value string, err error) {
return so.ReadEntry(ctx, key)
}
func GetAll(so StorageOperations, ctx context.Context) (result map[string]data.Object, err error) {
return so.ReadAllEntries(ctx)
}
func Delete(so StorageOperations, ctx context.Context, key string) (err error) {
return so.DeleteEntry(ctx, key)
}
func ProcessChanges(so StorageOperations, ctx context.Context, dataMap **map[string]data.Object, lock *sync.Mutex) (err error) {
return so.ProcessChanges(ctx, dataMap, lock)
}
251 changes: 251 additions & 0 deletions internal/persiststorage/persiststorage_configmap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
package persiststorage

import (
"context"
"sync"

jsoniter "github.com/json-iterator/go"
"github.com/openshift-kni/oran-o2ims/internal/data"
"github.com/openshift-kni/oran-o2ims/internal/k8s"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
clnt "sigs.k8s.io/controller-runtime/pkg/client"
)

type KubeConfigMapStore struct {
nameSpace string
name string
fieldOwner string
jsonAPI *jsoniter.API
client *k8s.Client
}

func NewKubeConfigMapStore() *KubeConfigMapStore {
return &KubeConfigMapStore{}
}

func (b *KubeConfigMapStore) SetNameSpace(
ns string) *KubeConfigMapStore {
b.nameSpace = ns
return b
}
func (b *KubeConfigMapStore) SetName(
name string) *KubeConfigMapStore {
b.name = name
return b
}

func (b *KubeConfigMapStore) SetFieldOwnder(
owner string) *KubeConfigMapStore {
b.fieldOwner = owner
return b
}

func (b *KubeConfigMapStore) SetJsonAPI(
jsonAPI *jsoniter.API) *KubeConfigMapStore {
b.jsonAPI = jsonAPI
return b
}

func (b *KubeConfigMapStore) SetClient(
client *k8s.Client) *KubeConfigMapStore {
b.client = client
return b
}

// k8s configmap methods
func (s *KubeConfigMapStore) AddEntry(ctx context.Context, entryKey string, value string) (err error) {
//test to read the configmap
configmap := &corev1.ConfigMap{}

key := clnt.ObjectKey{
Namespace: s.nameSpace,
Name: s.name,
}
err = (*s.client).Get(ctx, key, configmap)

if err != nil && !apierrors.IsNotFound(err) {
return
}

savedData := configmap.Data

//configmap does not exist
if savedData == nil {
savedData = map[string]string{}
}
savedData[entryKey] = value

configmap = &corev1.ConfigMap{
TypeMeta: metav1.TypeMeta{
Kind: "ConfigMap",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: s.nameSpace,
Name: s.name,
},
Data: savedData,
}

err = (*s.client).Patch(ctx, configmap, clnt.Apply, clnt.ForceOwnership, clnt.FieldOwner(s.fieldOwner))

return
}

func (s *KubeConfigMapStore) DeleteEntry(ctx context.Context, entryKey string) (err error) {
//test to read the configmap
configmap := &corev1.ConfigMap{}

key := clnt.ObjectKey{
Namespace: s.nameSpace,
Name: s.name,
}
err = (*s.client).Get(ctx, key, configmap)

if err != nil && !apierrors.IsNotFound(err) {
//there is error and err is not notfound error
//panic("unexpected error")
return
}

if configmap.Data == nil {
return
}

_, ok := configmap.Data[entryKey]

//entry not found
if !ok {
return
}
delete(configmap.Data, entryKey)

configmap = &corev1.ConfigMap{
TypeMeta: metav1.TypeMeta{
Kind: "ConfigMap",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: s.nameSpace,
Name: s.name,
},
Data: configmap.Data,
}
// Note: there is a case configmap is empty shall we remove the configmap
err = (*s.client).Patch(ctx, configmap, clnt.Apply, clnt.ForceOwnership, clnt.FieldOwner(s.fieldOwner))
return
}

func (s *KubeConfigMapStore) ReadEntry(ctx context.Context, entryKey string) (value string, err error) {

//test to read the configmap
configmap := &corev1.ConfigMap{}

key := clnt.ObjectKey{
Namespace: s.nameSpace,
Name: s.name,
}
err = (*s.client).Get(ctx, key, configmap)

if err != nil && !apierrors.IsNotFound(err) {
//there is error and err is not notfound error
//panic("unexpected error")
return
}

err = nil
if configmap.Data == nil {
return
}

value, ok := configmap.Data[entryKey]
if !ok {
err = ErrNotFound
}
return
}

// The handler read to DB and build/recovery datastructures in memory
func (s *KubeConfigMapStore) ReadAllEntries(ctx context.Context) (result map[string]data.Object, err error) {
result = map[string]data.Object{}

//test to read the configmap
configmap := &corev1.ConfigMap{}

key := clnt.ObjectKey{
Namespace: s.nameSpace,
Name: s.name,
}
err = (*s.client).Get(ctx, key, configmap)

if err != nil && !apierrors.IsNotFound(err) {
//there is error and err is not notfound error
//panic("unexpected error")
return
}

err = nil

if configmap.Data == nil {
return
}

for mapKey, value := range configmap.Data {
var object data.Object
err = (*s.jsonAPI).Unmarshal([]byte(value), &object)
if err != nil {
continue
}
result[mapKey] = object
}
return
}

func (s *KubeConfigMapStore) ProcessChanges(ctx context.Context, dataMap **map[string]data.Object, lock *sync.Mutex) (err error) {
raw_opt := metav1.SingleObject(metav1.ObjectMeta{
Namespace: s.nameSpace,
Name: s.name,
})
opt := clnt.ListOptions{}
opt.Raw = &raw_opt

watcher, err := s.client.Watch(ctx, &corev1.ConfigMapList{}, &opt)

go func() {
for {
event, open := <-watcher.ResultChan()
if open {
switch event.Type {
case watch.Added, watch.Modified:
configmap, _ := event.Object.(*corev1.ConfigMap)
newMap := map[string]data.Object{}
for k, value := range configmap.Data {
var object data.Object
err = (*s.jsonAPI).Unmarshal([]byte(value), &object)
if err != nil {
continue
}
newMap[k] = object
}

lock.Lock()
*dataMap = &newMap
lock.Unlock()
case watch.Deleted:
lock.Lock()
*dataMap = &map[string]data.Object{}
lock.Unlock()

default:

}

}
}
}()

return
}
Loading
Loading