Skip to content

Commit

Permalink
Make NSG Monitor run every 10 minutes (#3343)
Browse files Browse the repository at this point in the history
* Make NSG Monitor run every 10 minutes

* Fix rebase unit tests

* Address code review comments

* Add defer the NSG ticker's stop
  • Loading branch information
nwnt authored Feb 20, 2024
1 parent a861f41 commit 6f2b31b
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 40 deletions.
45 changes: 38 additions & 7 deletions pkg/monitor/azure/nsg/nsg.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ import (
"net/netip"
"strings"
"sync"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/arm"
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/network/armnetwork/v2"
"github.com/sirupsen/logrus"

"github.com/Azure/ARO-RP/pkg/api"
"github.com/Azure/ARO-RP/pkg/env"
"github.com/Azure/ARO-RP/pkg/metrics"
"github.com/Azure/ARO-RP/pkg/monitor/dimension"
"github.com/Azure/ARO-RP/pkg/monitor/emitter"
Expand Down Expand Up @@ -49,20 +51,49 @@ type NSGMonitor struct {
dims map[string]string
}

func NewNSGMonitor(log *logrus.Entry, oc *api.OpenShiftCluster, subscriptionID string, subnetClient sdknetwork.SubnetsClient, emitter metrics.Emitter, wg *sync.WaitGroup) *NSGMonitor {
func NewMonitor(log *logrus.Entry, oc *api.OpenShiftCluster, e env.Interface, subscriptionID string, tenantID string, emitter metrics.Emitter, dims map[string]string, wg *sync.WaitGroup, trigger <-chan time.Time) monitoring.Monitor {
if oc == nil {
return &monitoring.NoOpMonitor{Wg: wg}
}

if oc.Properties.NetworkProfile.PreconfiguredNSG != api.PreconfiguredNSGEnabled {
return &monitoring.NoOpMonitor{Wg: wg}
}

emitter.EmitGauge(MetricPreconfiguredNSGEnabled, int64(1), dims)

select {
case <-trigger:
default:
return &monitoring.NoOpMonitor{Wg: wg}
}

token, err := e.FPNewClientCertificateCredential(tenantID)
if err != nil {
log.Error("Unable to create FP Authorizer for NSG monitoring.", err)
emitter.EmitGauge(MetricFailedNSGMonitorCreation, int64(1), dims)
return &monitoring.NoOpMonitor{Wg: wg}
}

options := arm.ClientOptions{
ClientOptions: e.Environment().ClientCertificateCredentialOptions().ClientOptions,
}
client, err := armnetwork.NewSubnetsClient(subscriptionID, token, &options)
if err != nil {
log.Error("Unable to create the subnet client for NSG monitoring", err)
emitter.EmitGauge(MetricFailedNSGMonitorCreation, int64(1), dims)
return &monitoring.NoOpMonitor{Wg: wg}
}

return &NSGMonitor{
log: log,
emitter: emitter,
oc: oc,

subnetClient: subnetClient,
subnetClient: client,
wg: wg,

dims: map[string]string{
dimension.ResourceID: oc.ID,
dimension.SubscriptionID: subscriptionID,
dimension.Location: oc.Location,
},
dims: dims,
}
}

Expand Down
117 changes: 116 additions & 1 deletion pkg/monitor/azure/nsg/nsg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package nsg

import (
"context"
"errors"
"fmt"
"io"
"net/http"
Expand All @@ -15,13 +16,17 @@ import (
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/network/armnetwork/v2"
"github.com/golang/mock/gomock"
"github.com/sirupsen/logrus"

"github.com/Azure/ARO-RP/pkg/api"
"github.com/Azure/ARO-RP/pkg/monitor/dimension"
"github.com/Azure/ARO-RP/pkg/monitor/monitoring"
"github.com/Azure/ARO-RP/pkg/util/azureclient"
mock_armnetwork "github.com/Azure/ARO-RP/pkg/util/mocks/azureclient/azuresdk/armnetwork"
mock_env "github.com/Azure/ARO-RP/pkg/util/mocks/env"
mock_metrics "github.com/Azure/ARO-RP/pkg/util/mocks/metrics"
utilerror "github.com/Azure/ARO-RP/test/util/error"
)
Expand Down Expand Up @@ -52,6 +57,7 @@ var (
overlappingWorkerPrefix2 = "10.0.1.2"
overlappingWorkerPrefixes = []*string{&overlappingWorkerPrefix1, &overlappingWorkerPrefix2}

tenantID = "1111-1111-1111-1111"
subscriptionID = "0000-0000-0000-0000"
resourcegroupName = "myRG"
vNetName = "myVnet"
Expand Down Expand Up @@ -523,7 +529,20 @@ func TestMonitor(t *testing.T) {
}

var wg sync.WaitGroup
n := NewNSGMonitor(logrus.NewEntry(logrus.New()), &oc, subscriptionID, subnetClient, emitter, &wg)
n := &NSGMonitor{
log: logrus.NewEntry(logrus.New()),
emitter: emitter,
oc: &oc,

subnetClient: subnetClient,
wg: &wg,

dims: map[string]string{
dimension.ResourceID: oc.ID,
dimension.SubscriptionID: subscriptionID,
dimension.Location: oc.Location,
},
}

wg.Add(1)
err := n.Monitor(ctx)
Expand All @@ -547,3 +566,99 @@ func wait(wg *sync.WaitGroup, done chan<- any) {
wg.Wait()
done <- nil
}

func isOfType[T any](mon monitoring.Monitor) bool {
_, ok := mon.(T)
return ok
}

func TestNewMonitor(t *testing.T) {
dims := map[string]string{
dimension.ResourceID: ocID,
dimension.SubscriptionID: subscriptionID,
dimension.Location: ocLocation,
}
var wg sync.WaitGroup
log := logrus.NewEntry(logrus.New())

for _, tt := range []struct {
name string
modOC func(*api.OpenShiftCluster)
mockInterface func(*mock_env.MockInterface)
mockEmitter func(*mock_metrics.MockEmitter)
tick bool
valid func(monitoring.Monitor) bool
}{
{
name: "Not a preconfiguredNSG cluster: returning NoOpMonitor",
valid: isOfType[*monitoring.NoOpMonitor],
},
{
name: "A preconfiguredNSG cluster but not ticked: returning NoOpMonitor",
modOC: func(oc *api.OpenShiftCluster) {
oc.Properties.NetworkProfile.PreconfiguredNSG = api.PreconfiguredNSGEnabled
},
mockEmitter: func(emitter *mock_metrics.MockEmitter) {
emitter.EXPECT().EmitGauge(MetricPreconfiguredNSGEnabled, int64(1), dims)
},
valid: isOfType[*monitoring.NoOpMonitor],
},
{
name: "A preconfiguredNSG cluster, ticked with an error while creating FP client: returning NoOpMonitor",
modOC: func(oc *api.OpenShiftCluster) {
oc.Properties.NetworkProfile.PreconfiguredNSG = api.PreconfiguredNSGEnabled
},
mockEmitter: func(emitter *mock_metrics.MockEmitter) {
emitter.EXPECT().EmitGauge(MetricPreconfiguredNSGEnabled, int64(1), dims)
emitter.EXPECT().EmitGauge(MetricFailedNSGMonitorCreation, int64(1), dims)
},
mockInterface: func(mi *mock_env.MockInterface) {
mi.EXPECT().FPNewClientCertificateCredential(gomock.Any()).Return(nil, errors.New("Unknown Error"))
},
tick: true,
valid: isOfType[*monitoring.NoOpMonitor],
},
{
name: "A preconfiguredNSG cluster, ticked with an error while creating a subnet client: returning NoOpMonitor",
modOC: func(oc *api.OpenShiftCluster) {
oc.Properties.NetworkProfile.PreconfiguredNSG = api.PreconfiguredNSGEnabled
},
mockEmitter: func(emitter *mock_metrics.MockEmitter) {
emitter.EXPECT().EmitGauge(MetricPreconfiguredNSGEnabled, int64(1), dims)
},
mockInterface: func(mi *mock_env.MockInterface) {
mi.EXPECT().FPNewClientCertificateCredential(gomock.Any()).Return(&azidentity.ClientCertificateCredential{}, nil)
mi.EXPECT().Environment().Return(&azureclient.AROEnvironment{})
},
tick: true,
valid: isOfType[*NSGMonitor],
},
} {
t.Run(tt.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
e := mock_env.NewMockInterface(ctrl)
emitter := mock_metrics.NewMockEmitter(ctrl)

oc := ocFactory()
if tt.modOC != nil {
tt.modOC(&oc)
}
if tt.mockInterface != nil {
tt.mockInterface(e)
}
if tt.mockEmitter != nil {
tt.mockEmitter(emitter)
}
ticking := make(chan time.Time, 1) // buffered
if tt.tick {
ticking <- time.Now()
}

mon := NewMonitor(log, &oc, e, subscriptionID, tenantID, emitter, dims, &wg, ticking)
if !tt.valid(mon) {
t.Error("Invalid monitoring object returned")
}
})
}
}
41 changes: 9 additions & 32 deletions pkg/monitor/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,11 @@ import (
"sync"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/arm"
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/network/armnetwork/v2"
"github.com/Azure/go-autorest/autorest/azure"
"github.com/sirupsen/logrus"
"k8s.io/client-go/rest"

"github.com/Azure/ARO-RP/pkg/api"
"github.com/Azure/ARO-RP/pkg/metrics"
"github.com/Azure/ARO-RP/pkg/monitor/azure/nsg"
"github.com/Azure/ARO-RP/pkg/monitor/cluster"
"github.com/Azure/ARO-RP/pkg/monitor/dimension"
Expand All @@ -28,6 +24,9 @@ import (
"github.com/Azure/ARO-RP/pkg/util/restconfig"
)

// nsgMonitoringFrequency is used for initializing NSG monitoring ticker
var nsgMonitoringFrequency = 10 * time.Minute

// This function will continue to run until such time as it has a config to add to the global Hive shard map
// Note that because the mon.hiveShardConfigs[shard] is set to `nil` when its created, the cluster
// monitors will simply ignore Hive stats until this function populates the config
Expand Down Expand Up @@ -198,6 +197,8 @@ func (mon *monitor) worker(stop <-chan struct{}, delay time.Duration, id string)

log.Debug("starting monitoring")

nsgMonitoringTicker := time.NewTicker(nsgMonitoringFrequency)
defer nsgMonitoringTicker.Stop()
t := time.NewTicker(time.Minute)
defer t.Stop()

Expand All @@ -220,7 +221,7 @@ out:
// cached metrics in the remaining minutes

if sub != nil && sub.Subscription != nil && sub.Subscription.State != api.SubscriptionStateSuspended && sub.Subscription.State != api.SubscriptionStateWarned {
mon.workOne(context.Background(), log, v.doc, sub, newh != h)
mon.workOne(context.Background(), log, v.doc, sub, newh != h, nsgMonitoringTicker)
}

select {
Expand All @@ -235,28 +236,8 @@ out:
log.Debug("stopping monitoring")
}

func (mon *monitor) newNSGMonitor(log *logrus.Entry, oc *api.OpenShiftCluster, subscriptionID, tenantID string, e metrics.Emitter, dims map[string]string, wg *sync.WaitGroup) monitoring.Monitor {
token, err := mon.env.FPNewClientCertificateCredential(tenantID)
if err != nil {
log.Error("Unable to create FP Authorizer for NSG monitoring.", err)
mon.clusterm.EmitGauge(nsg.MetricFailedNSGMonitorCreation, int64(1), dims)
return &monitoring.NoOpMonitor{Wg: wg}
}
options := arm.ClientOptions{
ClientOptions: azcore.ClientOptions{Cloud: mon.env.Environment().Cloud},
}
client, err := armnetwork.NewSubnetsClient(subscriptionID, token, &options)
if err != nil {
log.Error("Unable to create the subnet client for NSG monitoring", err)
mon.clusterm.EmitGauge(nsg.MetricFailedNSGMonitorCreation, int64(1), dims)
return &monitoring.NoOpMonitor{Wg: wg}
}

return nsg.NewNSGMonitor(log, oc, subscriptionID, client, e, wg)
}

// workOne checks the API server health of a cluster
func (mon *monitor) workOne(ctx context.Context, log *logrus.Entry, doc *api.OpenShiftClusterDocument, sub *api.SubscriptionDocument, hourlyRun bool) {
func (mon *monitor) workOne(ctx context.Context, log *logrus.Entry, doc *api.OpenShiftClusterDocument, sub *api.SubscriptionDocument, hourlyRun bool, nsgMonTicker *time.Ticker) {
ctx, cancel := context.WithTimeout(ctx, 50*time.Second)
defer cancel()

Expand All @@ -283,11 +264,7 @@ func (mon *monitor) workOne(ctx context.Context, log *logrus.Entry, doc *api.Ope
var monitors []monitoring.Monitor
var wg sync.WaitGroup

if doc.OpenShiftCluster.Properties.NetworkProfile.PreconfiguredNSG == api.PreconfiguredNSGEnabled && hourlyRun {
mon.clusterm.EmitGauge(nsg.MetricPreconfiguredNSGEnabled, int64(1), dims)
nsgMon := mon.newNSGMonitor(log, doc.OpenShiftCluster, sub.ID, sub.Subscription.Properties.TenantID, mon.clusterm, dims, &wg)
monitors = append(monitors, nsgMon)
}
nsgMon := nsg.NewMonitor(log, doc.OpenShiftCluster, mon.env, sub.ID, sub.Subscription.Properties.TenantID, mon.clusterm, dims, &wg, nsgMonTicker.C)

c, err := cluster.NewMonitor(log, restConfig, doc.OpenShiftCluster, mon.clusterm, hiveRestConfig, hourlyRun, &wg)
if err != nil {
Expand All @@ -298,7 +275,7 @@ func (mon *monitor) workOne(ctx context.Context, log *logrus.Entry, doc *api.Ope
return
}

monitors = append(monitors, c)
monitors = append(monitors, c, nsgMon)
allJobsDone := make(chan bool)
go execute(ctx, allJobsDone, &wg, monitors)

Expand Down

0 comments on commit 6f2b31b

Please sign in to comment.