Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor:move kubeclient to Driver struct #2302

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 2 additions & 23 deletions pkg/azurefile/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/Azure/azure-sdk-for-go/services/network/mgmt/2022-07-01/network"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
Expand Down Expand Up @@ -65,37 +66,15 @@ func getRuntimeClassForPod(ctx context.Context, kubeClient clientset.Interface,
}

// getCloudProvider get Azure Cloud Provider
func getCloudProvider(ctx context.Context, kubeconfig, nodeID, secretName, secretNamespace, userAgent string, allowEmptyCloudConfig, enableWindowsHostProcess bool, kubeAPIQPS float64, kubeAPIBurst int) (*azure.Cloud, error) {
func getCloudProvider(ctx context.Context, kubeClient kubernetes.Interface, nodeID, secretName, secretNamespace, userAgent string, allowEmptyCloudConfig bool) (*azure.Cloud, error) {
var (
config *azureconfig.Config
kubeClient *clientset.Clientset
fromSecret bool
)

az := &azure.Cloud{}
var err error

// for sanity test: if kubeconfig is set as "no-need-kubeconfig", kubeClient will be nil
if kubeconfig == "no-need-kubeconfig" {
klog.V(2).Infof("kubeconfig is set as no-need-kubeconfig, kubeClient will be nil")
} else {
kubeCfg, err := getKubeConfig(kubeconfig, enableWindowsHostProcess)
if err == nil && kubeCfg != nil {
klog.V(2).Infof("set QPS(%f) and QPS Burst(%d) for driver kubeClient", float32(kubeAPIQPS), kubeAPIBurst)
kubeCfg.QPS = float32(kubeAPIQPS)
kubeCfg.Burst = kubeAPIBurst
kubeClient, err = clientset.NewForConfig(kubeCfg)
if err != nil {
klog.Warningf("NewForConfig failed with error: %v", err)
}
} else {
klog.Warningf("get kubeconfig(%s) failed with error: %v", kubeconfig, err)
if !os.IsNotExist(err) && !errors.Is(err, rest.ErrNotInCluster) {
return az, fmt.Errorf("failed to get KubeClient: %v", err)
}
}
}

if kubeClient != nil {
klog.V(2).Infof("reading cloud config from secret %s/%s", secretNamespace, secretName)
config, err = configloader.Load[azureconfig.Config](ctx, &configloader.K8sSecretLoaderConfig{
Expand Down
160 changes: 53 additions & 107 deletions pkg/azurefile/azure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
fake "k8s.io/client-go/kubernetes/fake"
azureprovider "sigs.k8s.io/cloud-provider-azure/pkg/provider"
azureconfig "sigs.k8s.io/cloud-provider-azure/pkg/provider/config"
Expand Down Expand Up @@ -116,169 +117,114 @@ func TestGetRuntimeClassForPod(t *testing.T) {
// To run this unit test successfully, need to ensure /etc/kubernetes/azure.json nonexistent.
func TestGetCloudProvider(t *testing.T) {
var (
fakeCredFile = testutil.GetWorkDirPath("fake-cred-file.json", t)
fakeKubeConfig = testutil.GetWorkDirPath("fake-kube-config", t)
emptyKubeConfig = testutil.GetWorkDirPath("empty-kube-config", t)
notExistKubeConfig = testutil.GetWorkDirPath("non-exist.json", t)
fakeCredFile = testutil.GetWorkDirPath("fake-cred-file.json", t)
)

fakeContent := `apiVersion: v1
clusters:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keep this test case?

- cluster:
server: https://localhost:8080
name: foo-cluster
contexts:
- context:
cluster: foo-cluster
user: foo-user
namespace: bar
name: foo-context
current-context: foo-context
kind: Config
users:
- name: foo-user
user:
exec:
apiVersion: client.authentication.k8s.io/v1beta1
args:
- arg-1
- arg-2
command: foo-command
`

if err := createTestFile(emptyKubeConfig); err != nil {
t.Error(err)
}
defer func() {
if err := os.Remove(emptyKubeConfig); err != nil {
t.Error(err)
}
}()

tests := []struct {
desc string
createFakeCredFile bool
createFakeKubeConfig bool
setFederatedWorkloadIdentityEnv bool
kubeconfig string
kubeclient kubernetes.Interface
userAgent string
allowEmptyCloudConfig bool
aadFederatedTokenFile string
useFederatedWorkloadIdentityExtension bool
aadClientID string
tenantID string
expectedErr testutil.TestError
expectedErr *testutil.TestError
}{
{
desc: "out of cluster, no kubeconfig, no credential file",
kubeconfig: "",
kubeclient: nil,
allowEmptyCloudConfig: true,
expectedErr: testutil.TestError{},
expectedErr: nil,
},
{
desc: "[failure][disallowEmptyCloudConfig] out of cluster, no kubeconfig, no credential file",
kubeconfig: "",
kubeclient: nil,
allowEmptyCloudConfig: false,
expectedErr: testutil.TestError{
expectedErr: &testutil.TestError{
DefaultError: fmt.Errorf("no cloud config provided, error"),
},
},
{
desc: "[failure] out of cluster & in cluster, specify a non-exist kubeconfig, no credential file",
kubeconfig: notExistKubeConfig,
kubeclient: nil,
allowEmptyCloudConfig: true,
expectedErr: testutil.TestError{},
},
{
desc: "[failure] out of cluster & in cluster, specify a empty kubeconfig, no credential file",
kubeconfig: emptyKubeConfig,
allowEmptyCloudConfig: true,
expectedErr: testutil.TestError{
DefaultError: fmt.Errorf("failed to get KubeClient: invalid configuration: no configuration has been provided, try setting KUBERNETES_MASTER environment variable"),
},
expectedErr: nil,
},
{
desc: "[failure] out of cluster & in cluster, specify a fake kubeconfig, no credential file",
createFakeKubeConfig: true,
kubeconfig: fakeKubeConfig,
kubeclient: fake.NewSimpleClientset(),
allowEmptyCloudConfig: true,
expectedErr: testutil.TestError{},
expectedErr: nil,
},
{
desc: "[success] out of cluster & in cluster, no kubeconfig, a fake credential file",
createFakeCredFile: true,
kubeconfig: "",
kubeclient: nil,
userAgent: "useragent",
allowEmptyCloudConfig: true,
expectedErr: testutil.TestError{},
expectedErr: nil,
},
{
desc: "[success] get azure client with workload identity",
createFakeKubeConfig: true,
createFakeCredFile: true,
setFederatedWorkloadIdentityEnv: true,
kubeconfig: fakeKubeConfig,
kubeclient: fake.NewSimpleClientset(),
userAgent: "useragent",
useFederatedWorkloadIdentityExtension: true,
aadFederatedTokenFile: "fake-token-file",
aadClientID: "fake-client-id",
tenantID: "fake-tenant-id",
expectedErr: testutil.TestError{},
expectedErr: nil,
},
}

for _, test := range tests {
if test.createFakeKubeConfig {
if err := createTestFile(fakeKubeConfig); err != nil {
t.Error(err)
}
defer func() {
if err := os.Remove(fakeKubeConfig); err != nil && !os.IsNotExist(err) {
t.Run(test.desc, func(t *testing.T) {
if test.createFakeCredFile {
if err := createTestFile(fakeCredFile); err != nil {
t.Error(err)
}
}()

if err := os.WriteFile(fakeKubeConfig, []byte(fakeContent), 0666); err != nil {
t.Error(err)
defer func() {
if err := os.Remove(fakeCredFile); err != nil && !os.IsNotExist(err) {
t.Error(err)
}
}()

originalCredFile, ok := os.LookupEnv(DefaultAzureCredentialFileEnv)
if ok {
defer os.Setenv(DefaultAzureCredentialFileEnv, originalCredFile)
} else {
defer os.Unsetenv(DefaultAzureCredentialFileEnv)
}
os.Setenv(DefaultAzureCredentialFileEnv, fakeCredFile)
}
}
if test.createFakeCredFile {
if err := createTestFile(fakeCredFile); err != nil {
t.Error(err)
if test.setFederatedWorkloadIdentityEnv {
t.Setenv("AZURE_TENANT_ID", test.tenantID)
t.Setenv("AZURE_CLIENT_ID", test.aadClientID)
t.Setenv("AZURE_FEDERATED_TOKEN_FILE", test.aadFederatedTokenFile)
}
defer func() {
if err := os.Remove(fakeCredFile); err != nil && !os.IsNotExist(err) {
t.Error(err)
}
}()

originalCredFile, ok := os.LookupEnv(DefaultAzureCredentialFileEnv)
if ok {
defer os.Setenv(DefaultAzureCredentialFileEnv, originalCredFile)
cloud, err := getCloudProvider(context.Background(), test.kubeclient, "", "", "", test.userAgent, test.allowEmptyCloudConfig)
if test.expectedErr != nil {
if err == nil {
t.Errorf("desc: %s,\n input: %q, getCloudProvider err: %v, expectedErr: %v", test.desc, test.kubeclient, err, test.expectedErr)
}
if !testutil.AssertError(err, test.expectedErr) && !strings.Contains(err.Error(), test.expectedErr.DefaultError.Error()) {
t.Errorf("desc: %s,\n input: %q, getCloudProvider err: %v, expectedErr: %v", test.desc, test.kubeclient, err, test.expectedErr)
}
}
if cloud == nil {
t.Errorf("return value of getCloudProvider should not be nil even there is error")
} else {
defer os.Unsetenv(DefaultAzureCredentialFileEnv)
assert.Equal(t, test.userAgent, cloud.UserAgent)
assert.Equal(t, cloud.AADFederatedTokenFile, test.aadFederatedTokenFile)
assert.Equal(t, cloud.UseFederatedWorkloadIdentityExtension, test.useFederatedWorkloadIdentityExtension)
assert.Equal(t, cloud.AADClientID, test.aadClientID)
assert.Equal(t, cloud.TenantID, test.tenantID)
}
os.Setenv(DefaultAzureCredentialFileEnv, fakeCredFile)
}
if test.setFederatedWorkloadIdentityEnv {
t.Setenv("AZURE_TENANT_ID", test.tenantID)
t.Setenv("AZURE_CLIENT_ID", test.aadClientID)
t.Setenv("AZURE_FEDERATED_TOKEN_FILE", test.aadFederatedTokenFile)
}

cloud, err := getCloudProvider(context.Background(), test.kubeconfig, "", "", "", test.userAgent, test.allowEmptyCloudConfig, false, 5, 10)
if !testutil.AssertError(err, &test.expectedErr) && !strings.Contains(err.Error(), test.expectedErr.DefaultError.Error()) {
t.Errorf("desc: %s,\n input: %q, getCloudProvider err: %v, expectedErr: %v", test.desc, test.kubeconfig, err, test.expectedErr)
}
if cloud == nil {
t.Errorf("return value of getCloudProvider should not be nil even there is error")
} else {
assert.Equal(t, test.userAgent, cloud.UserAgent)
assert.Equal(t, cloud.AADFederatedTokenFile, test.aadFederatedTokenFile)
assert.Equal(t, cloud.UseFederatedWorkloadIdentityExtension, test.useFederatedWorkloadIdentityExtension)
assert.Equal(t, cloud.AADClientID, test.aadClientID)
assert.Equal(t, cloud.TenantID, test.tenantID)
}
})
}
}

Expand Down
41 changes: 30 additions & 11 deletions pkg/azurefile/azurefile.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"net/http"
"net/url"
"os"
"strconv"
"strings"
"sync"
Expand All @@ -45,6 +46,9 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/volume/util"
mount "k8s.io/mount-utils"
Expand Down Expand Up @@ -231,8 +235,6 @@ type Driver struct {
enableVolumeMountGroup bool
appendMountErrorHelpLink bool
mountPermissions uint64
kubeAPIQPS float64
kubeAPIBurst int
enableWindowsHostProcess bool
removeSMBMountOnWindows bool
appendClosetimeoOption bool
Expand Down Expand Up @@ -281,7 +283,7 @@ type Driver struct {
// azcopy for provide exec mock for ut
azcopy *fileutil.Azcopy

kubeconfig string
kubeClient kubernetes.Interface
endpoint string
resolver Resolver
directVolume DirectVolume
Expand All @@ -307,8 +309,6 @@ func NewDriver(options *DriverOptions) *Driver {
driver.appendMountErrorHelpLink = options.AppendMountErrorHelpLink
driver.mountPermissions = options.MountPermissions
driver.fsGroupChangePolicy = options.FSGroupChangePolicy
driver.kubeAPIQPS = options.KubeAPIQPS
driver.kubeAPIBurst = options.KubeAPIBurst
driver.enableWindowsHostProcess = options.EnableWindowsHostProcess
driver.removeSMBMountOnWindows = options.RemoveSMBMountOnWindows
driver.appendClosetimeoOption = options.AppendClosetimeoOption
Expand All @@ -322,7 +322,6 @@ func NewDriver(options *DriverOptions) *Driver {
driver.subnetLockMap = newLockMap()
driver.volumeLocks = newVolumeLocks()
driver.azcopy = &fileutil.Azcopy{}
driver.kubeconfig = options.KubeConfig
driver.endpoint = options.Endpoint
driver.resolver = new(NetResolver)
driver.directVolume = new(directVolume)
Expand Down Expand Up @@ -376,6 +375,26 @@ func NewDriver(options *DriverOptions) *Driver {
klog.Fatalf("%v", err)
}

// for sanity test: if kubeconfig is set as "no-need-kubeconfig", kubeClient will be nil
if options.KubeConfig == "no-need-kubeconfig" {
klog.V(2).Infof("kubeconfig is set as no-need-kubeconfig, kubeClient will be nil")
} else {
kubeCfg, err := getKubeConfig(options.KubeConfig, options.EnableWindowsHostProcess)
if err == nil && kubeCfg != nil {
klog.V(2).Infof("set QPS(%f) and QPS Burst(%d) for driver kubeClient", float32(options.KubeAPIQPS), options.KubeAPIBurst)
kubeCfg.QPS = float32(options.KubeAPIQPS)
kubeCfg.Burst = options.KubeAPIBurst
driver.kubeClient, err = clientset.NewForConfig(kubeCfg)
if err != nil {
klog.Warningf("NewForConfig failed with error: %v", err)
}
} else {
klog.Warningf("get kubeconfig(%s) failed with error: %v", options.KubeConfig, err)
if !os.IsNotExist(err) && !errors.Is(err, rest.ErrNotInCluster) {
klog.Fatalf("failed to get KubeClient: %v", err)
}
}
}
return &driver
}

Expand All @@ -394,7 +413,7 @@ func (d *Driver) Run(ctx context.Context) error {

userAgent := GetUserAgent(d.Name, d.customUserAgent, d.userAgentSuffix)
klog.V(2).Infof("driver userAgent: %s", userAgent)
d.cloud, err = getCloudProvider(context.Background(), d.kubeconfig, d.NodeID, d.cloudConfigSecretName, d.cloudConfigSecretNamespace, userAgent, d.allowEmptyCloudConfig, d.enableWindowsHostProcess, d.kubeAPIQPS, d.kubeAPIBurst)
d.cloud, err = getCloudProvider(context.Background(), d.kubeClient, d.NodeID, d.cloudConfigSecretName, d.cloudConfigSecretNamespace, userAgent, d.allowEmptyCloudConfig)
if err != nil {
klog.Fatalf("failed to get Azure Cloud Provider, error: %v", err)
}
Expand Down Expand Up @@ -1150,11 +1169,11 @@ func (d *Driver) GetStorageAccesskey(ctx context.Context, accountOptions *azure.
// GetStorageAccountFromSecret get storage account key from k8s secret
// return <accountName, accountKey, error>
func (d *Driver) GetStorageAccountFromSecret(ctx context.Context, secretName, secretNamespace string) (string, string, error) {
if d.cloud.KubeClient == nil {
if d.kubeClient == nil {
return "", "", fmt.Errorf("could not get account key from secret(%s): KubeClient is nil", secretName)
}

secret, err := d.cloud.KubeClient.CoreV1().Secrets(secretNamespace).Get(ctx, secretName, metav1.GetOptions{})
secret, err := d.kubeClient.CoreV1().Secrets(secretNamespace).Get(ctx, secretName, metav1.GetOptions{})
if err != nil {
return "", "", fmt.Errorf("could not get secret(%v): %v", secretName, err)
}
Expand Down Expand Up @@ -1205,7 +1224,7 @@ func (d *Driver) useDataPlaneAPI(ctx context.Context, volumeID, accountName stri
}

func (d *Driver) SetAzureCredentials(ctx context.Context, accountName, accountKey, secretName, secretNamespace string) (string, error) {
if d.cloud.KubeClient == nil {
if d.kubeClient == nil {
klog.Warningf("could not create secret: kubeClient is nil")
return "", nil
}
Expand All @@ -1226,7 +1245,7 @@ func (d *Driver) SetAzureCredentials(ctx context.Context, accountName, accountKe
},
Type: "Opaque",
}
_, err := d.cloud.KubeClient.CoreV1().Secrets(secretNamespace).Create(ctx, secret, metav1.CreateOptions{})
_, err := d.kubeClient.CoreV1().Secrets(secretNamespace).Create(ctx, secret, metav1.CreateOptions{})
if apierrors.IsAlreadyExists(err) {
err = nil
}
Expand Down
Loading
Loading