Remove global variables from job (kube-burner#740)
## Type of change

- [x] Refactor
- [x] Optimization

## Description

Global variables are a common source of problems: they can be set unexpectedly (or not set at all), they can be mutated from anywhere in the package, and they make code harder to test and reason about. Local variables, method parameters, and instance fields should be used instead.

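For context, a condensed sketch of the pattern this change applies (field and constructor names follow the diff, but this is illustrative only — the real `newExecutor` also receives the config spec and builds a REST mapper, and `Executor` has more fields):

```go
package burner

import (
	"github.com/kube-burner/kube-burner/pkg/config"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

// Before: package-level globals (ClientSet, DynamicClient, restConfig in
// pkg/burner/job.go) that any function could read or overwrite as a side effect.
//
// After: the same handles become unexported fields of Executor, wired up once
// in the constructor and passed around explicitly.
type Executor struct {
	config.Job
	clientSet     kubernetes.Interface
	restConfig    *rest.Config
	dynamicClient *dynamic.DynamicClient
}

func newExecutor(kubeClientProvider *config.KubeClientProvider, job config.Job) Executor {
	ex := Executor{Job: job}
	// The per-job QPS/Burst settings produce a dedicated client set and REST config.
	ex.clientSet, ex.restConfig = kubeClientProvider.ClientSet(job.QPS, job.Burst)
	ex.dynamicClient = dynamic.NewForConfigOrDie(ex.restConfig)
	return ex
}
```
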
---------

Signed-off-by: Ygal Blum <[email protected]>
ygalblum authored Dec 2, 2024
1 parent c272de5 commit d3364f0
Showing 11 changed files with 101 additions and 98 deletions.
5 changes: 2 additions & 3 deletions cmd/kube-burner/kube-burner.go
@@ -168,13 +168,12 @@ func destroyCmd() *cobra.Command {
util.SetupFileLogging(uuid)
kubeClientProvider := config.NewKubeClientProvider(kubeConfig, kubeContext)
clientSet, restConfig := kubeClientProvider.ClientSet(0, 0)
burner.ClientSet = clientSet
burner.DynamicClient = dynamic.NewForConfigOrDie(restConfig)
dynamicClient := dynamic.NewForConfigOrDie(restConfig)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
labelSelector := fmt.Sprintf("kube-burner-uuid=%s", uuid)
util.CleanupNamespaces(ctx, clientSet, labelSelector)
util.CleanupNonNamespacedResources(ctx, clientSet, burner.DynamicClient, labelSelector)
util.CleanupNonNamespacedResources(ctx, clientSet, dynamicClient, labelSelector)
},
}
cmd.Flags().StringVar(&uuid, "uuid", "", "UUID")
27 changes: 13 additions & 14 deletions pkg/burner/create.go
@@ -38,10 +38,9 @@ import (
"k8s.io/apimachinery/pkg/types"
)

func (ex *Executor) setupCreateJob() {
func (ex *Executor) setupCreateJob(configSpec config.Spec, mapper meta.RESTMapper) {
var err error
var f io.Reader
mapper := newRESTMapper()
log.Debugf("Preparing create job: %s", ex.Name)
for _, o := range ex.Objects {
if o.Replicas < 1 {
@@ -50,11 +49,11 @@ func (ex *Executor) setupCreateJob() {
}
log.Debugf("Rendering template: %s", o.ObjectTemplate)
e := embed.FS{}
if embedFS == e {
if configSpec.EmbedFS == e {
f, err = util.ReadConfig(o.ObjectTemplate)
} else {
objectTemplate := path.Join(embedFSDir, o.ObjectTemplate)
f, err = util.ReadEmbedConfig(embedFS, objectTemplate)
objectTemplate := path.Join(configSpec.EmbedFSDir, o.ObjectTemplate)
f, err = util.ReadEmbedConfig(configSpec.EmbedFS, objectTemplate)
}
if err != nil {
log.Fatalf("Error reading template %s: %s", o.ObjectTemplate, err)
@@ -110,7 +109,7 @@ func (ex *Executor) RunCreateJob(ctx context.Context, iterationStart, iterationE
}
if ex.nsRequired && !ex.NamespacedIterations {
ns = ex.Namespace
if err = util.CreateNamespace(ClientSet, ns, nsLabels, nsAnnotations); err != nil {
if err = util.CreateNamespace(ex.clientSet, ns, nsLabels, nsAnnotations); err != nil {
log.Fatal(err.Error())
}
*waitListNamespaces = append(*waitListNamespaces, ns)
@@ -132,7 +131,7 @@ func (ex *Executor) RunCreateJob(ctx context.Context, iterationStart, iterationE
if ex.nsRequired && ex.NamespacedIterations {
ns = ex.generateNamespace(i)
if !namespacesCreated[ns] {
if err = util.CreateNamespace(ClientSet, ns, nsLabels, nsAnnotations); err != nil {
if err = util.CreateNamespace(ex.clientSet, ns, nsLabels, nsAnnotations); err != nil {
log.Error(err.Error())
continue
}
@@ -177,7 +176,7 @@ func (ex *Executor) RunCreateJob(ctx context.Context, iterationStart, iterationE
if ex.WaitWhenFinished {
log.Infof("Waiting up to %s for actions to be completed", ex.MaxWaitTimeout)
// This semaphore is used to limit the maximum number of concurrent goroutines
sem := make(chan int, int(restConfig.QPS))
sem := make(chan int, int(ex.restConfig.QPS))
for i := iterationStart; i < iterationEnd; i++ {
if ex.nsRequired && ex.NamespacedIterations {
ns = ex.generateNamespace(i)
@@ -266,15 +265,15 @@ func (ex *Executor) replicaHandler(ctx context.Context, labels map[string]string
if !obj.Namespaced {
n = ""
}
createRequest(ctx, obj.gvr, n, newObject, ex.MaxWaitTimeout)
ex.createRequest(ctx, obj.gvr, n, newObject, ex.MaxWaitTimeout)
replicaWg.Done()
}(ns)
}(r)
}
wg.Wait()
}

func createRequest(ctx context.Context, gvr schema.GroupVersionResource, ns string, obj *unstructured.Unstructured, timeout time.Duration) {
func (ex *Executor) createRequest(ctx context.Context, gvr schema.GroupVersionResource, ns string, obj *unstructured.Unstructured, timeout time.Duration) {
var uns *unstructured.Unstructured
var err error
util.RetryWithExponentialBackOff(func() (bool, error) {
@@ -286,9 +285,9 @@ func createRequest(ctx context.Context, gvr schema.GroupVersionResource, ns stri
ns = objNs
}
if ns != "" {
uns, err = DynamicClient.Resource(gvr).Namespace(ns).Create(context.TODO(), obj, metav1.CreateOptions{})
uns, err = ex.dynamicClient.Resource(gvr).Namespace(ns).Create(context.TODO(), obj, metav1.CreateOptions{})
} else {
uns, err = DynamicClient.Resource(gvr).Create(context.TODO(), obj, metav1.CreateOptions{})
uns, err = ex.dynamicClient.Resource(gvr).Create(context.TODO(), obj, metav1.CreateOptions{})
}
if err != nil {
if kerrors.IsUnauthorized(err) {
@@ -370,7 +369,7 @@ func (ex *Executor) RunCreateJobWithChurn(ctx context.Context) {
continue
}
// Label namespaces to be deleted
_, err = ClientSet.CoreV1().Namespaces().Patch(context.TODO(), ns, types.JSONPatchType, delPatch, metav1.PatchOptions{})
_, err = ex.clientSet.CoreV1().Namespaces().Patch(context.TODO(), ns, types.JSONPatchType, delPatch, metav1.PatchOptions{})
if err != nil {
log.Errorf("Error patching namespace %s. Error: %v", ns, err)
}
@@ -384,7 +383,7 @@ func (ex *Executor) RunCreateJobWithChurn(ctx context.Context) {
if ex.ChurnDeletionStrategy == "gvr" {
CleanupNamespacesUsingGVR(ctx, *ex, namespacesToDelete)
}
util.CleanupNamespaces(ctx, ClientSet, "churndelete=delete")
util.CleanupNamespaces(ctx, ex.clientSet, "churndelete=delete")
log.Info("Re-creating deleted objects")
// Re-create objects that were deleted
ex.RunCreateJob(ctx, randStart, numToChurn+randStart, &[]string{})
12 changes: 6 additions & 6 deletions pkg/burner/delete.go
@@ -21,13 +21,14 @@ import (

"github.com/kube-burner/kube-burner/pkg/config"
log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
)

func (ex *Executor) setupDeleteJob() {
func (ex *Executor) setupDeleteJob(mapper meta.RESTMapper) {
log.Debugf("Preparing delete job: %s", ex.Name)
ex.itemHandler = deleteHandler
if ex.WaitForDeletion {
@@ -37,7 +38,6 @@ func (ex *Executor) setupDeleteJob() {
ex.JobIterations = 1
// Use the sequential mode
ex.ExecutionMode = config.ExecutionModeSequential
mapper := newRESTMapper()
for _, o := range ex.Objects {
log.Debugf("Job %s: %s %s with selector %s", ex.Name, ex.JobType, o.Kind, labels.Set(o.LabelSelector))
ex.objects = append(ex.objects, newObject(o, mapper))
@@ -50,23 +50,23 @@ func deleteHandler(ex *Executor, obj object, item unstructured.Unstructured, ite
var err error
if obj.Namespaced {
log.Debugf("Removing %s/%s from namespace %s", item.GetKind(), item.GetName(), item.GetNamespace())
err = DynamicClient.Resource(obj.gvr).Namespace(item.GetNamespace()).Delete(context.TODO(), item.GetName(), metav1.DeleteOptions{})
err = ex.dynamicClient.Resource(obj.gvr).Namespace(item.GetNamespace()).Delete(context.TODO(), item.GetName(), metav1.DeleteOptions{})
} else {
log.Debugf("Removing %s/%s", item.GetKind(), item.GetName())
err = DynamicClient.Resource(obj.gvr).Delete(context.TODO(), item.GetName(), metav1.DeleteOptions{})
err = ex.dynamicClient.Resource(obj.gvr).Delete(context.TODO(), item.GetName(), metav1.DeleteOptions{})
}
if err != nil {
log.Errorf("Error found removing %s/%s: %s", item.GetKind(), item.GetName(), err)
}
}

func verifyDelete(obj object) {
func verifyDelete(ex *Executor, obj object) {
labelSelector := labels.Set(obj.LabelSelector).String()
listOptions := metav1.ListOptions{
LabelSelector: labelSelector,
}
wait.PollUntilContextCancel(context.TODO(), 2*time.Second, true, func(ctx context.Context) (done bool, err error) {
itemList, err := DynamicClient.Resource(obj.gvr).List(context.TODO(), listOptions)
itemList, err := ex.dynamicClient.Resource(obj.gvr).List(context.TODO(), listOptions)
if err != nil {
log.Error(err.Error())
return false, nil
32 changes: 24 additions & 8 deletions pkg/burner/executor.go
@@ -21,11 +21,15 @@ import (
log "github.com/sirupsen/logrus"
"golang.org/x/time/rate"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)

// Executor contains the information required to execute a job
type ItemHandler func(ex *Executor, obj object, originalItem unstructured.Unstructured, iteration int, wg *sync.WaitGroup)
type ObjectFinalizer func(obj object)
type ObjectFinalizer func(ex *Executor, obj object)

type Executor struct {
config.Job
@@ -37,25 +41,37 @@ type Executor struct {
nsRequired bool
itemHandler ItemHandler
objectFinalizer ObjectFinalizer
clientSet kubernetes.Interface
restConfig *rest.Config
dynamicClient *dynamic.DynamicClient
}

func newExecutor(job config.Job, uuid, runid string) Executor {
func newExecutor(configSpec config.Spec, kubeClientProvider *config.KubeClientProvider, job config.Job) Executor {
ex := Executor{
Job: job,
limiter: rate.NewLimiter(rate.Limit(job.QPS), job.Burst),
uuid: uuid,
runid: runid,
uuid: configSpec.GlobalConfig.UUID,
runid: configSpec.GlobalConfig.RUNID,
waitLimiter: rate.NewLimiter(rate.Limit(job.QPS), job.Burst),
}

clientSet, runtimeRestConfig := kubeClientProvider.ClientSet(job.QPS, job.Burst)
ex.clientSet = clientSet
ex.restConfig = runtimeRestConfig
ex.dynamicClient = dynamic.NewForConfigOrDie(ex.restConfig)

_, setupRestConfig := kubeClientProvider.ClientSet(100, 100) // Hardcoded QPS/Burst
mapper := newRESTMapper(discovery.NewDiscoveryClientForConfigOrDie(setupRestConfig))

switch job.JobType {
case config.CreationJob:
ex.setupCreateJob()
ex.setupCreateJob(configSpec, mapper)
case config.DeletionJob:
ex.setupDeleteJob()
ex.setupDeleteJob(mapper)
case config.PatchJob:
ex.setupPatchJob()
ex.setupPatchJob(mapper)
case config.ReadJob:
ex.setupReadJob()
ex.setupReadJob(mapper)
default:
log.Fatalf("Unknown jobType: %s", job.JobType)
}
49 changes: 18 additions & 31 deletions pkg/burner/job.go
@@ -17,7 +17,6 @@ package burner
import (
"bytes"
"context"
"embed"
"errors"
"fmt"
"os/exec"
@@ -34,9 +33,6 @@ import (
"github.com/kube-burner/kube-burner/pkg/util/metrics"
log "github.com/sirupsen/logrus"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)

@@ -58,13 +54,6 @@ const (
)

var (
ClientSet kubernetes.Interface
DynamicClient dynamic.Interface
discoveryClient *discovery.DiscoveryClient
restConfig *rest.Config
embedFS embed.FS
embedFSDir string

supportedExecutionMode = map[config.ExecutionMode]struct{}{
config.ExecutionModeParallel: {},
config.ExecutionModeSequential: {},
@@ -83,8 +72,6 @@ func Run(configSpec config.Spec, kubeClientProvider *config.KubeClientProvider,
var executedJobs []prometheus.Job
var jobList []Executor
var msWg, gcWg sync.WaitGroup
embedFS = configSpec.EmbedFS
embedFSDir = configSpec.EmbedFSDir
errs := []error{}
res := make(chan int, 1)
uuid := configSpec.GlobalConfig.UUID
@@ -99,24 +86,14 @@ func Run(configSpec config.Spec, kubeClientProvider *config.KubeClientProvider,
var innerRC int
measurements.NewMeasurementFactory(configSpec, metricsScraper.MetricsMetadata)
jobList = newExecutorList(configSpec, kubeClientProvider)
ClientSet, restConfig = kubeClientProvider.DefaultClientSet()
for _, job := range jobList {
if job.PreLoadImages && job.JobType == config.CreationJob {
if err = preLoadImages(job); err != nil {
log.Fatal(err.Error())
}
}
}
handlePreloadImages(jobList, kubeClientProvider)
// Iterate job list
for jobPosition, job := range jobList {
executedJobs = append(executedJobs, prometheus.Job{
Start: time.Now().UTC(),
JobConfig: job.Job,
})
var waitListNamespaces []string
ClientSet, restConfig = kubeClientProvider.ClientSet(job.QPS, job.Burst)
discoveryClient = discovery.NewDiscoveryClientForConfigOrDie(restConfig)
DynamicClient = dynamic.NewForConfigOrDie(restConfig)
measurements.SetJobConfig(&job.Job, kubeClientProvider)
log.Infof("Triggering job: %s", job.Name)
measurements.Start()
@@ -271,6 +248,18 @@ func Run(configSpec config.Spec, kubeClientProvider *config.KubeClientProvider,
return rc, utilerrors.NewAggregate(errs)
}

// If requested, preload the images used in the test onto the nodes
func handlePreloadImages(executorList []Executor, kubeClientProvider *config.KubeClientProvider) {
clientSet, _ := kubeClientProvider.DefaultClientSet()
for _, executor := range executorList {
if executor.PreLoadImages && executor.JobType == config.CreationJob {
if err := preLoadImages(executor, clientSet); err != nil {
log.Fatal(err.Error())
}
}
}
}

// indexMetrics indexes metrics for the executed jobs
func indexMetrics(uuid string, executedJobs []prometheus.Job, returnMap map[string]returnPair, metricsScraper metrics.Scraper, configSpec config.Spec, innerRC bool, executionErrors string, isTimeout bool) {
var jobSummaries []JobSummary
@@ -335,11 +324,9 @@ func verifyJobDefaults(job *config.Job, defaultTimeout time.Duration) {
// newExecutorList Returns a list of executors
func newExecutorList(configSpec config.Spec, kubeClientProvider *config.KubeClientProvider) []Executor {
var executorList []Executor
_, restConfig = kubeClientProvider.ClientSet(100, 100) // Hardcoded QPS/Burst
discoveryClient = discovery.NewDiscoveryClientForConfigOrDie(restConfig)
for _, job := range configSpec.Jobs {
verifyJobDefaults(&job, configSpec.GlobalConfig.Timeout)
executorList = append(executorList, newExecutor(job, configSpec.GlobalConfig.UUID, configSpec.GlobalConfig.RUNID))
executorList = append(executorList, newExecutor(configSpec, kubeClientProvider, job))
}
return executorList
}
@@ -351,7 +338,7 @@ func runWaitList(globalWaitMap map[string][]string, executorMap map[string]Execu
executor := executorMap[executorUUID]
log.Infof("Waiting up to %s for actions to be completed", executor.MaxWaitTimeout)
// This semaphore is used to limit the maximum number of concurrent goroutines
sem := make(chan int, int(restConfig.QPS))
sem := make(chan int, int(executor.restConfig.QPS))
for _, ns := range namespaces {
sem <- 1
wg.Add(1)
Expand All @@ -369,13 +356,13 @@ func garbageCollectJob(ctx context.Context, jobExecutor Executor, labelSelector
if wg != nil {
defer wg.Done()
}
util.CleanupNamespaces(ctx, ClientSet, labelSelector)
util.CleanupNamespaces(ctx, jobExecutor.clientSet, labelSelector)
for _, obj := range jobExecutor.objects {
jobExecutor.limiter.Wait(ctx)
if !obj.Namespaced {
CleanupNonNamespacedResourcesUsingGVR(ctx, obj, labelSelector)
CleanupNonNamespacedResourcesUsingGVR(ctx, jobExecutor, obj, labelSelector)
} else if obj.namespace != "" { // When the object has a fixed namespace not generated by kube-burner
CleanupNamespaceResourcesUsingGVR(ctx, obj, obj.namespace, labelSelector)
CleanupNamespaceResourcesUsingGVR(ctx, jobExecutor, obj, obj.namespace, labelSelector)
}
}
}
