diff --git a/internal/cmd/server/start_alarm_subscription_server.go b/internal/cmd/server/start_alarm_subscription_server.go index f667f6a8..efa4d527 100644 --- a/internal/cmd/server/start_alarm_subscription_server.go +++ b/internal/cmd/server/start_alarm_subscription_server.go @@ -27,6 +27,7 @@ import ( "github.com/openshift-kni/oran-o2ims/internal/authentication" "github.com/openshift-kni/oran-o2ims/internal/authorization" "github.com/openshift-kni/oran-o2ims/internal/exit" + "github.com/openshift-kni/oran-o2ims/internal/k8s" "github.com/openshift-kni/oran-o2ims/internal/logging" "github.com/openshift-kni/oran-o2ims/internal/metrics" "github.com/openshift-kni/oran-o2ims/internal/network" @@ -205,13 +206,28 @@ func (c *AlarmSubscriptionServerCommand) run(cmd *cobra.Command, argv []string) }) router.Use(metricsWrapper, authenticationWrapper, authorizationWrapper) + // create k8s client with kube(from env first) + //var config *rest.Config + kubeClient, err := k8s.NewClient().SetLogger(logger).SetLoggingWrapper(loggingWrapper).Build() + + if err != nil { + logger.ErrorContext( + ctx, + "Failed to create kubeClient", + "error", err, + ) + return exit.Error(1) + } + // Create the handler: handler, err := service.NewAlarmSubscriptionHandler(). SetLogger(logger). SetLoggingWrapper(loggingWrapper). SetCloudID(cloudID). SetExtensions(extensions...). - Build() + SetKubeClient(kubeClient). + Build(ctx) + if err != nil { logger.ErrorContext( ctx, diff --git a/internal/k8s/client.go b/internal/k8s/client.go index 631acdff..40b1b35e 100644 --- a/internal/k8s/client.go +++ b/internal/k8s/client.go @@ -29,9 +29,11 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" clnt "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" ) // Client builder contains the data and logic needed to create a Kubernetes API client that @@ -53,6 +55,13 @@ type Client struct { delegate clnt.WithWatch } +// set fake client to be used for test.go +func NewFakeClient() *Client { + client := Client{} + client.delegate = fake.NewFakeClient() + return &client +} + // NewClient creates a builder that can then be used to configure and create a Kubernetes API client // that implements the controller-runtime WithWatch interface. func NewClient() *ClientBuilder { @@ -285,6 +294,11 @@ func (c *Client) IsObjectNamespaced(obj runtime.Object) (bool, error) { return c.delegate.IsObjectNamespaced(obj) } +// watch +func (c *Client) Watch(ctx context.Context, obj clnt.ObjectList, opts ...clnt.ListOption) (watch.Interface, error) { + return c.delegate.Watch(ctx, obj, opts...) +} + // Close closes the client and releases all the resources it is using. It is specially important to // call this method when the client is using as SSH tunnel, as otherwise the tunnel will remain // open. diff --git a/internal/persiststorage/persiststorage.go b/internal/persiststorage/persiststorage.go new file mode 100644 index 00000000..b29504e7 --- /dev/null +++ b/internal/persiststorage/persiststorage.go @@ -0,0 +1,38 @@ +package persiststorage + +import ( + "context" + "errors" + "sync" + + "github.com/openshift-kni/oran-o2ims/internal/data" +) + +var ErrNotFound = errors.New("not found") + +// interface for persistent storage +type StorageOperations interface { + //notification from db to application about db entry changes + //currently assume the notification is granular to indivial entry + ReadEntry(ctx context.Context, key string) (value string, err error) + AddEntry(ctx context.Context, key string, value string) (err error) + DeleteEntry(ctx context.Context, key string) (err error) + ReadAllEntries(ctx context.Context) (result map[string]data.Object, err error) + ProcessChanges(ctx context.Context, dataMap **map[string]data.Object, lock *sync.Mutex) (err error) +} + +func Add(so StorageOperations, ctx context.Context, key string, value string) (err error) { + return so.AddEntry(ctx, key, value) +} +func Get(so StorageOperations, ctx context.Context, key string) (value string, err error) { + return so.ReadEntry(ctx, key) +} +func GetAll(so StorageOperations, ctx context.Context) (result map[string]data.Object, err error) { + return so.ReadAllEntries(ctx) +} +func Delete(so StorageOperations, ctx context.Context, key string) (err error) { + return so.DeleteEntry(ctx, key) +} +func ProcessChanges(so StorageOperations, ctx context.Context, dataMap **map[string]data.Object, lock *sync.Mutex) (err error) { + return so.ProcessChanges(ctx, dataMap, lock) +} diff --git a/internal/persiststorage/persiststorage_configmap.go b/internal/persiststorage/persiststorage_configmap.go new file mode 100644 index 00000000..cd4b395b --- /dev/null +++ b/internal/persiststorage/persiststorage_configmap.go @@ -0,0 +1,251 @@ +package persiststorage + +import ( + "context" + "sync" + + jsoniter "github.com/json-iterator/go" + "github.com/openshift-kni/oran-o2ims/internal/data" + "github.com/openshift-kni/oran-o2ims/internal/k8s" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/watch" + clnt "sigs.k8s.io/controller-runtime/pkg/client" +) + +type KubeConfigMapStore struct { + nameSpace string + name string + fieldOwner string + jsonAPI *jsoniter.API + client *k8s.Client +} + +func NewKubeConfigMapStore() *KubeConfigMapStore { + return &KubeConfigMapStore{} +} + +func (b *KubeConfigMapStore) SetNameSpace( + ns string) *KubeConfigMapStore { + b.nameSpace = ns + return b +} +func (b *KubeConfigMapStore) SetName( + name string) *KubeConfigMapStore { + b.name = name + return b +} + +func (b *KubeConfigMapStore) SetFieldOwnder( + owner string) *KubeConfigMapStore { + b.fieldOwner = owner + return b +} + +func (b *KubeConfigMapStore) SetJsonAPI( + jsonAPI *jsoniter.API) *KubeConfigMapStore { + b.jsonAPI = jsonAPI + return b +} + +func (b *KubeConfigMapStore) SetClient( + client *k8s.Client) *KubeConfigMapStore { + b.client = client + return b +} + +// k8s configmap methods +func (s *KubeConfigMapStore) AddEntry(ctx context.Context, entryKey string, value string) (err error) { + //test to read the configmap + configmap := &corev1.ConfigMap{} + + key := clnt.ObjectKey{ + Namespace: s.nameSpace, + Name: s.name, + } + err = (*s.client).Get(ctx, key, configmap) + + if err != nil && !apierrors.IsNotFound(err) { + return + } + + savedData := configmap.Data + + //configmap does not exist + if savedData == nil { + savedData = map[string]string{} + } + savedData[entryKey] = value + + configmap = &corev1.ConfigMap{ + TypeMeta: metav1.TypeMeta{ + Kind: "ConfigMap", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: s.nameSpace, + Name: s.name, + }, + Data: savedData, + } + + err = (*s.client).Patch(ctx, configmap, clnt.Apply, clnt.ForceOwnership, clnt.FieldOwner(s.fieldOwner)) + + return +} + +func (s *KubeConfigMapStore) DeleteEntry(ctx context.Context, entryKey string) (err error) { + //test to read the configmap + configmap := &corev1.ConfigMap{} + + key := clnt.ObjectKey{ + Namespace: s.nameSpace, + Name: s.name, + } + err = (*s.client).Get(ctx, key, configmap) + + if err != nil && !apierrors.IsNotFound(err) { + //there is error and err is not notfound error + //panic("unexpected error") + return + } + + if configmap.Data == nil { + return + } + + _, ok := configmap.Data[entryKey] + + //entry not found + if !ok { + return + } + delete(configmap.Data, entryKey) + + configmap = &corev1.ConfigMap{ + TypeMeta: metav1.TypeMeta{ + Kind: "ConfigMap", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: s.nameSpace, + Name: s.name, + }, + Data: configmap.Data, + } + // Note: there is a case configmap is empty shall we remove the configmap + err = (*s.client).Patch(ctx, configmap, clnt.Apply, clnt.ForceOwnership, clnt.FieldOwner(s.fieldOwner)) + return +} + +func (s *KubeConfigMapStore) ReadEntry(ctx context.Context, entryKey string) (value string, err error) { + + //test to read the configmap + configmap := &corev1.ConfigMap{} + + key := clnt.ObjectKey{ + Namespace: s.nameSpace, + Name: s.name, + } + err = (*s.client).Get(ctx, key, configmap) + + if err != nil && !apierrors.IsNotFound(err) { + //there is error and err is not notfound error + //panic("unexpected error") + return + } + + err = nil + if configmap.Data == nil { + return + } + + value, ok := configmap.Data[entryKey] + if !ok { + err = ErrNotFound + } + return +} + +// The handler read to DB and build/recovery datastructures in memory +func (s *KubeConfigMapStore) ReadAllEntries(ctx context.Context) (result map[string]data.Object, err error) { + result = map[string]data.Object{} + + //test to read the configmap + configmap := &corev1.ConfigMap{} + + key := clnt.ObjectKey{ + Namespace: s.nameSpace, + Name: s.name, + } + err = (*s.client).Get(ctx, key, configmap) + + if err != nil && !apierrors.IsNotFound(err) { + //there is error and err is not notfound error + //panic("unexpected error") + return + } + + err = nil + + if configmap.Data == nil { + return + } + + for mapKey, value := range configmap.Data { + var object data.Object + err = (*s.jsonAPI).Unmarshal([]byte(value), &object) + if err != nil { + continue + } + result[mapKey] = object + } + return +} + +func (s *KubeConfigMapStore) ProcessChanges(ctx context.Context, dataMap **map[string]data.Object, lock *sync.Mutex) (err error) { + raw_opt := metav1.SingleObject(metav1.ObjectMeta{ + Namespace: s.nameSpace, + Name: s.name, + }) + opt := clnt.ListOptions{} + opt.Raw = &raw_opt + + watcher, err := s.client.Watch(ctx, &corev1.ConfigMapList{}, &opt) + + go func() { + for { + event, open := <-watcher.ResultChan() + if open { + switch event.Type { + case watch.Added, watch.Modified: + configmap, _ := event.Object.(*corev1.ConfigMap) + newMap := map[string]data.Object{} + for k, value := range configmap.Data { + var object data.Object + err = (*s.jsonAPI).Unmarshal([]byte(value), &object) + if err != nil { + continue + } + newMap[k] = object + } + + lock.Lock() + *dataMap = &newMap + lock.Unlock() + case watch.Deleted: + lock.Lock() + *dataMap = &map[string]data.Object{} + lock.Unlock() + + default: + + } + + } + } + }() + + return +} diff --git a/internal/service/alarm_subscription_handler.go b/internal/service/alarm_subscription_handler.go index 39f727ee..2cf6ead2 100644 --- a/internal/service/alarm_subscription_handler.go +++ b/internal/service/alarm_subscription_handler.go @@ -24,20 +24,30 @@ import ( "github.com/google/uuid" jsoniter "github.com/json-iterator/go" + "github.com/openshift-kni/oran-o2ims/internal/data" "github.com/openshift-kni/oran-o2ims/internal/jq" + "github.com/openshift-kni/oran-o2ims/internal/k8s" "github.com/openshift-kni/oran-o2ims/internal/search" - "golang.org/x/exp/maps" + + "github.com/openshift-kni/oran-o2ims/internal/persiststorage" +) + +const ( + TestNamespace = "orantest" + TestConfigmapName = "orantestconfigmapalarmsub" + FieldOwner = "oran-o2ims" ) -// DeploymentManagerHandlerBuilder contains the data and logic needed to create a new deployment +// alarmSubscriptionHandlerBuilder contains the data and logic needed to create a new deployment // manager collection handler. Don't create instances of this type directly, use the -// NewDeploymentManagerHandler function instead. +// NewAlarmSubscriptionHandler function instead. type alarmSubscriptionHandlerBuilder struct { logger *slog.Logger loggingWrapper func(http.RoundTripper) http.RoundTripper cloudID string extensions []string + kubeClient *k8s.Client } // alarmSubscriptionHander knows how to respond to requests to list deployment managers. @@ -48,11 +58,13 @@ type alarmSubscriptionHandler struct { loggingWrapper func(http.RoundTripper) http.RoundTripper cloudID string extensions []string + kubeClient *k8s.Client jsonAPI jsoniter.API selectorEvaluator *search.SelectorEvaluator jqTool *jq.Tool subscritionMapMemoryLock *sync.Mutex - subscriptionMap map[string]data.Object + subscriptionMap *map[string]data.Object + persistStore *persiststorage.KubeConfigMapStore } // NewAlarmSubscriptionHandler creates a builder that can then be used to configure and create a @@ -90,8 +102,15 @@ func (b *alarmSubscriptionHandlerBuilder) SetExtensions( return b } +// SetExtensions sets the fields that will be added to the extensions. +func (b *alarmSubscriptionHandlerBuilder) SetKubeClient( + kubeClient *k8s.Client) *alarmSubscriptionHandlerBuilder { + b.kubeClient = kubeClient + return b +} + // Build uses the data stored in the builder to create anad configure a new handler. -func (b *alarmSubscriptionHandlerBuilder) Build() ( +func (b *alarmSubscriptionHandlerBuilder) Build(ctx context.Context) ( result *alarmSubscriptionHandler, err error) { // Check parameters: if b.logger == nil { @@ -103,6 +122,11 @@ func (b *alarmSubscriptionHandlerBuilder) Build() ( return } + if b.kubeClient == nil { + err = errors.New("kubeClient is mandatory") + return + } + // Prepare the JSON iterator API: jsonConfig := jsoniter.Config{ IndentionStep: 2, @@ -140,17 +164,27 @@ func (b *alarmSubscriptionHandlerBuilder) Build() ( } } + // create persist storeage option + persistStore := persiststorage.NewKubeConfigMapStore(). + SetNameSpace(TestNamespace). + SetName(TestConfigmapName). + SetFieldOwnder(FieldOwner). + SetJsonAPI(&jsonAPI). + SetClient(b.kubeClient) + // Create and populate the object: result = &alarmSubscriptionHandler{ logger: b.logger, loggingWrapper: b.loggingWrapper, cloudID: b.cloudID, + kubeClient: b.kubeClient, extensions: slices.Clone(b.extensions), selectorEvaluator: selectorEvaluator, jsonAPI: jsonAPI, jqTool: jqTool, subscritionMapMemoryLock: &sync.Mutex{}, - subscriptionMap: map[string]data.Object{}, + subscriptionMap: &map[string]data.Object{}, + persistStore: persistStore, } b.logger.Debug( @@ -158,6 +192,20 @@ func (b *alarmSubscriptionHandlerBuilder) Build() ( "CloudID", b.cloudID, ) + err = result.recoveryFromPersistStore(ctx) + if err != nil { + b.logger.Error( + "alarmSubscriptionHandler failed to recovery from persistStore ", err, + ) + } + + err = result.watchPersistStore(ctx) + if err != nil { + b.logger.Error( + "alarmSubscriptionHandler failed to watch persist store changes ", err, + ) + } + return } @@ -222,6 +270,10 @@ func (h *alarmSubscriptionHandler) Add(ctx context.Context, id, err := h.addItem(ctx, *request) if err != nil { + h.logger.Debug( + "alarmSubscriptionHandler Add:", + "err", err.Error(), + ) return } @@ -249,6 +301,7 @@ func (h *alarmSubscriptionHandler) Delete(ctx context.Context, ctx, "alarmSubscriptionHandler delete:", ) + err = h.deleteItem(ctx, *request) // Return the result: @@ -260,18 +313,26 @@ func (h *alarmSubscriptionHandler) fetchItem(ctx context.Context, id string) (result data.Object, err error) { h.subscritionMapMemoryLock.Lock() defer h.subscritionMapMemoryLock.Unlock() - result, ok := h.subscriptionMap[id] + obj, ok := (*h.subscriptionMap)[id] if !ok { err = ErrNotFound return } + + result, _ = h.encodeSubId(ctx, id, obj) return } func (h *alarmSubscriptionHandler) fetchItems(ctx context.Context) (result data.Stream, err error) { h.subscritionMapMemoryLock.Lock() defer h.subscritionMapMemoryLock.Unlock() - ar := maps.Values(h.subscriptionMap) + + ar := make([]data.Object, 0, len(*h.subscriptionMap)) + + for key, value := range *h.subscriptionMap { + obj, _ := h.encodeSubId(ctx, key, value) + ar = append(ar, obj) + } h.logger.DebugContext( ctx, "alarmSubscriptionHandler fetchItems:", @@ -280,13 +341,79 @@ func (h *alarmSubscriptionHandler) fetchItems(ctx context.Context) (result data. return } +func (h *alarmSubscriptionHandler) addItem( + ctx context.Context, input_data AddRequest) (subId string, err error) { + + subId = h.getSubcriptionId() + + //save the subscription in configuration map + //value, err := jsoniter.MarshalIndent(&input_data.Object, "", " ") + value, err := h.jsonAPI.MarshalIndent(&input_data.Object, "", " ") + if err != nil { + return + } + err = h.persistStoreAddEntry(ctx, subId, string(value)) + if err != nil { + h.logger.Debug( + "alarmSubscriptionHandler addItem:", + "err", err.Error(), + ) + return + } + + h.addToSubscriptionMap(subId, input_data.Object) + + return +} + +func (h *alarmSubscriptionHandler) deleteItem( + ctx context.Context, delete_req DeleteRequest) (err error) { + + err = h.persistStoreDeleteEntry(ctx, delete_req.Variables[0]) + if err != nil { + return + } + + h.deleteToSubscriptionMap(delete_req.Variables[0]) + + return +} + +func (h *alarmSubscriptionHandler) mapItem(ctx context.Context, + input data.Object) (output data.Object, err error) { + + //TBD only save related attributes in the future + return input, nil +} +func (h *alarmSubscriptionHandler) addToSubscriptionMap(key string, value data.Object) { + h.subscritionMapMemoryLock.Lock() + defer h.subscritionMapMemoryLock.Unlock() + (*h.subscriptionMap)[key] = value +} +func (h *alarmSubscriptionHandler) deleteToSubscriptionMap(key string) { + h.subscritionMapMemoryLock.Lock() + defer h.subscritionMapMemoryLock.Unlock() + //test if the key in the map + _, ok := (*h.subscriptionMap)[key] + + if !ok { + return + } + + delete(*h.subscriptionMap, key) +} + +func (h *alarmSubscriptionHandler) assignSubscriptionMap(newMap map[string]data.Object) { + h.subscritionMapMemoryLock.Lock() + defer h.subscritionMapMemoryLock.Unlock() + h.subscriptionMap = &newMap +} + func (h *alarmSubscriptionHandler) getSubcriptionId() (subId string) { subId = uuid.New().String() return } -// Not sure if we need this in the future -// add it for now for the test purpose func (h *alarmSubscriptionHandler) encodeSubId(ctx context.Context, subId string, input data.Object) (output data.Object, err error) { //get consumer name, subscriptions @@ -316,45 +443,32 @@ func (h *alarmSubscriptionHandler) decodeSubId(ctx context.Context, } return } -func (h *alarmSubscriptionHandler) addItem( - ctx context.Context, input_data AddRequest) (subId string, err error) { - subId = h.getSubcriptionId() - object, err := h.encodeSubId(ctx, subId, input_data.Object) - if err != nil { - return - } - - object, err = h.mapItem(ctx, object) - h.subscritionMapMemoryLock.Lock() - defer h.subscritionMapMemoryLock.Unlock() - h.subscriptionMap[subId] = object +func (h *alarmSubscriptionHandler) persistStoreAddEntry( + ctx context.Context, entryKey string, value string) (err error) { + return persiststorage.Add(h.persistStore, ctx, entryKey, value) +} +func (h *alarmSubscriptionHandler) persistStoreDeleteEntry( + ctx context.Context, entryKey string) (err error) { + err = persiststorage.Delete(h.persistStore, ctx, entryKey) return } -func (h *alarmSubscriptionHandler) deleteItem( - ctx context.Context, delete_req DeleteRequest) (err error) { - - h.subscritionMapMemoryLock.Lock() - defer h.subscritionMapMemoryLock.Unlock() - - //test if the key in the map - _, ok := h.subscriptionMap[delete_req.Variables[0]] - - if !ok { - err = ErrNotFound +func (h *alarmSubscriptionHandler) recoveryFromPersistStore(ctx context.Context) (err error) { + newMap, err := persiststorage.GetAll(h.persistStore, ctx) + if err != nil { return } - - delete(h.subscriptionMap, delete_req.Variables[0]) - + h.assignSubscriptionMap(newMap) return } -func (h *alarmSubscriptionHandler) mapItem(ctx context.Context, - input data.Object) (output data.Object, err error) { +func (h *alarmSubscriptionHandler) watchPersistStore(ctx context.Context) (err error) { + err = persiststorage.ProcessChanges(h.persistStore, ctx, &h.subscriptionMap, h.subscritionMapMemoryLock) - //TBD only save related attributes in the future - return input, nil + if err != nil { + panic("failed to launch watcher") + } + return } diff --git a/internal/service/alarm_subscription_handler_test.go b/internal/service/alarm_subscription_handler_test.go index 1f7078d8..1c017893 100644 --- a/internal/service/alarm_subscription_handler_test.go +++ b/internal/service/alarm_subscription_handler_test.go @@ -19,15 +19,32 @@ import ( . "github.com/onsi/ginkgo/v2/dsl/core" . "github.com/onsi/gomega" + "sigs.k8s.io/controller-runtime/pkg/client" + "github.com/openshift-kni/oran-o2ims/internal/data" + "github.com/openshift-kni/oran-o2ims/internal/k8s" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) var _ = Describe("alarm Subscription handler", func() { Describe("Creation", func() { + var ( + ctx context.Context + fakeClient *k8s.Client + ) + + BeforeEach(func() { + // Create a context: + ctx = context.TODO() + fakeClient = k8s.NewFakeClient() + }) + It("Can't be created without a logger", func() { handler, err := NewAlarmSubscriptionHandler(). SetCloudID("123"). - Build() + SetKubeClient(fakeClient). + Build(ctx) Expect(err).To(HaveOccurred()) Expect(handler).To(BeNil()) msg := err.Error() @@ -38,7 +55,8 @@ var _ = Describe("alarm Subscription handler", func() { It("Can't be created without a cloud identifier", func() { handler, err := NewAlarmSubscriptionHandler(). SetLogger(logger). - Build() + SetKubeClient(fakeClient). + Build(ctx) Expect(err).To(HaveOccurred()) Expect(handler).To(BeNil()) msg := err.Error() @@ -49,13 +67,35 @@ var _ = Describe("alarm Subscription handler", func() { Describe("Behaviour", func() { var ( - ctx context.Context + ctx context.Context + fakeClient *k8s.Client ) BeforeEach(func() { // Create a context: - ctx = context.Background() - + ctx = context.TODO() + fakeClient = k8s.NewFakeClient() + //create fake namespace + namespace := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: TestNamespace, + }, + } + err := fakeClient.Create(ctx, namespace, &client.CreateOptions{}, client.FieldOwner(FieldOwner)) + Expect(err).ToNot(HaveOccurred()) + configmap := &corev1.ConfigMap{ + TypeMeta: metav1.TypeMeta{ + Kind: "ConfigMap", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: TestNamespace, + Name: TestConfigmapName, + }, + Data: nil, + } + err = fakeClient.Create(ctx, configmap, &client.CreateOptions{}, client.FieldOwner(FieldOwner)) + Expect(err).ToNot(HaveOccurred()) }) Describe("List", func() { @@ -66,7 +106,8 @@ var _ = Describe("alarm Subscription handler", func() { handler, err := NewAlarmSubscriptionHandler(). SetLogger(logger). SetCloudID("123"). - Build() + SetKubeClient(fakeClient). + Build(ctx) Expect(err).ToNot(HaveOccurred()) Expect(handler).ToNot(BeNil()) @@ -84,7 +125,8 @@ var _ = Describe("alarm Subscription handler", func() { handler, err := NewAlarmSubscriptionHandler(). SetLogger(logger). SetCloudID("123"). - Build() + SetKubeClient(fakeClient). + Build(ctx) Expect(err).ToNot(HaveOccurred()) Expect(handler).ToNot(BeNil()) @@ -146,7 +188,8 @@ var _ = Describe("alarm Subscription handler", func() { handler, err := NewAlarmSubscriptionHandler(). SetLogger(logger). SetCloudID("123"). - Build() + SetKubeClient(fakeClient). + Build(ctx) Expect(err).ToNot(HaveOccurred()) // Send the request. Note that we ignore the error here because @@ -167,7 +210,8 @@ var _ = Describe("alarm Subscription handler", func() { handler, err := NewAlarmSubscriptionHandler(). SetLogger(logger). SetCloudID("123"). - Build() + SetKubeClient(fakeClient). + Build(ctx) Expect(err).ToNot(HaveOccurred()) obj_1 := data.Object{ "customerId": "test_custer_id", @@ -202,7 +246,8 @@ var _ = Describe("alarm Subscription handler", func() { handler, err := NewAlarmSubscriptionHandler(). SetLogger(logger). SetCloudID("123"). - Build() + SetKubeClient(fakeClient). + Build(ctx) Expect(err).ToNot(HaveOccurred()) obj := data.Object{ "customerId": "test_custer_id", diff --git a/proxy.sh b/proxy.sh index 2c8f2c34..9fd5d4d2 100755 --- a/proxy.sh +++ b/proxy.sh @@ -66,7 +66,7 @@ pids="${pids} $!" --log-level="debug" \ --log-field="server=alarm-subscription" \ --log-field="pid=%p" \ ---api-listener-address="127.0.0.1:8000" \ +--api-listener-address="127.0.0.1:8006" \ --cloud-id="123" \ & pids="${pids} $!"