diff --git a/.env.example b/.env.example index ef42856a4c..3825ec0a37 100644 --- a/.env.example +++ b/.env.example @@ -2,6 +2,5 @@ export K8S_NAMESPACE=smc export AZURE_SUBSCRIPTION=abc -export AZURE_RESOURCE_GROUP=rg export CTR_REGISTRY=your.azurecr.io/smc export CTR_REGISTRY_CREDS_NAME=acr-creds diff --git a/DEMO.md b/DEMO.md index ac79bbee58..2b2e4a0b25 100644 --- a/DEMO.md +++ b/DEMO.md @@ -30,7 +30,6 @@ In the root directory of the repo create a `.env` file. It is already listed in `.gitignore` so that anything you put in it would not accidentally leak into a public git repo. The `.env` file should contain the following Bash variables: - `K8S_NAMESPACE` - Namespace within your Kubernetes cluster, where SMC will be installed. This cannot be the `default` namespace because it has to be a namespace that can be deleted. - `AZURE_SUBSCRIPTION` - the Azure subscription where your Kubernete cluster resides. The demo will use this to configure the Endpoint Discovery Service's cloud observer. - - `AZURE_RESOURCE_GROUP` - Resource group where your Azure Kubernetes cluster resides. The demo uses this to configure the Endpoint Discovery Service so that it watches a particular subset of Azure resources. - `CTR_REGISTRY` - URL of the container registry. For example: `draychev.azurecr.io/smc` - `CTR_REGISTRY_CREDS_NAME` - name to be used for the Kubernetes secrets resource to be created from the Docker container registry. diff --git a/cmd/eds/clients.go b/cmd/eds/clients.go deleted file mode 100755 index 33cbdde397..0000000000 --- a/cmd/eds/clients.go +++ /dev/null @@ -1,58 +0,0 @@ -package main - -import ( - "time" - - smiClient "github.com/deislabs/smi-sdk-go/pkg/gen/client/split/clientset/versioned" - "github.com/eapache/channels" - "github.com/golang/glog" - "k8s.io/client-go/tools/clientcmd" - - "github.com/deislabs/smc/pkg/catalog" - "github.com/deislabs/smc/pkg/mesh" - "github.com/deislabs/smc/pkg/providers/azure" - "github.com/deislabs/smc/pkg/providers/kube" -) - -// Categories of meshed service/compute providers -// TODO(draychev): further break down by k8s cluster, cloud subscription etc. -var ( - providerAzure = "providerAzure" - providerKubernetes = "providerKubernetes" -) - -func setupClients(announceChan *channels.RingChannel) (map[string]mesh.ComputeProviderI, mesh.SpecI, mesh.ServiceCatalogI) { - kubeConfig, err := clientcmd.BuildConfigFromFlags("", *kubeConfigFile) - if err != nil { - glog.Fatalf("Error gathering Kubernetes config. Ensure correctness of CLI argument 'kubeconfig=%s': %s", *kubeConfigFile, err) - } - - smiResources := smiClient.NewForConfigOrDie(kubeConfig) - kubernetesProvider := kube.NewProvider(kubeConfig, smiResources, getNamespaces(), 1*time.Second, announceChan) - azureProvider := azure.NewProvider(*subscriptionID, *resourceGroup, *namespace, *azureAuthFile, maxAuthRetryCount, retryPause, announceChan) - stopChan := make(chan struct{}) - - // Setup all Compute Providers -- these are Kubernetes, cloud provider virtual machines, etc. - // TODO(draychev): How do we add multiple Kubernetes clusters? Multiple Azure subscriptions? 
- computeProviders := map[string]mesh.ComputeProviderI{ - providerKubernetes: kubernetesProvider, - providerAzure: azureProvider, - } - - // Run each provider -- starting the pub/sub system, which leverages the announceChan channel - for providerType, provider := range computeProviders { - if err := provider.Run(stopChan); err != nil { - glog.Errorf("Could not start %s provider: %s", providerType, err) - continue - } - glog.Infof("Started provider %s", providerType) - } - - // Mesh Spec Provider is something, which we query for SMI spec. Gives us the declaration of the service mesh. - meshSpecProvider := kubernetesProvider - - // ServiceName Catalog is the facility, which we query to get the list of services, weights for traffic split etc. - serviceCatalog := catalog.NewServiceCatalog(computeProviders, meshSpecProvider) - - return computeProviders, meshSpecProvider, serviceCatalog -} diff --git a/cmd/eds/eds.go b/cmd/eds/eds.go index 2f8e907de6..5b681dad1a 100755 --- a/cmd/eds/eds.go +++ b/cmd/eds/eds.go @@ -5,37 +5,45 @@ import ( "flag" "fmt" "os" + "os/signal" "strings" + "syscall" "time" + "github.com/deislabs/smc/pkg/utils" + "github.com/eapache/channels" envoyControlPlane "github.com/envoyproxy/go-control-plane/envoy/api/v2" "github.com/golang/glog" "github.com/spf13/pflag" + "k8s.io/client-go/tools/clientcmd" - "github.com/deislabs/smc/cmd" + "github.com/deislabs/smc/pkg/catalog" edsServer "github.com/deislabs/smc/pkg/envoy/eds" + "github.com/deislabs/smc/pkg/providers/azure" + "github.com/deislabs/smc/pkg/providers/kube" ) const ( serverType = "EDS" - port = 15124 - verbosityFlag = "verbosity" defaultNamespace = "default" maxAuthRetryCount = 10 retryPause = 10 * time.Second + + azureProvidername = "Azure" + kubernetesProviderName = "Kubernetes" ) var ( flags = pflag.NewFlagSet(`diplomat-edsServer`, pflag.ExitOnError) kubeConfigFile = flags.String("kubeconfig", "", "Path to Kubernetes config file.") azureAuthFile = flags.String("azureAuthFile", "", "Path to Azure Auth File") - resourceGroup = flags.String("resource-group", "", "Azure Resource Group") subscriptionID = flags.String("subscriptionID", "", "Azure Subscription") - verbosity = flags.Int(verbosityFlag, 1, "Set logging verbosity level") + verbosity = flags.Int("verbosity", 1, "Set logging verbosity level") namespace = flags.String("namespace", "default", "Kubernetes namespace to watch.") + port = flags.Int("port", 15124, "Endpoint Discovery Service port number. (Default: 15124)") ) func main() { @@ -50,14 +58,34 @@ func main() { // which would trigger Envoy updates. announceChan := channels.NewRingChannel(1024) - computeProviders, meshSpec, serviceCatalog := setupClients(announceChan) - grpcServer, lis := cmd.NewGrpc(serverType, port) - eds := edsServer.NewEDSServer(ctx, computeProviders, serviceCatalog, meshSpec, announceChan) + kubeConfig, err := clientcmd.BuildConfigFromFlags("", *kubeConfigFile) + if err != nil { + glog.Fatalf("Error gathering Kubernetes config. 
Ensure correctness of CLI argument 'kubeconfig=%s': %s", *kubeConfigFile, err) + } + + stopChan := make(chan struct{}) + meshSpec := kube.NewMeshSpecClient(kubeConfig, getNamespaces(), 1*time.Second, announceChan, stopChan) + kubernetesProvider := kube.NewProvider(kubeConfig, getNamespaces(), 1*time.Second, announceChan, kubernetesProviderName) + azureProvider := azure.NewProvider(*subscriptionID, *namespace, *azureAuthFile, maxAuthRetryCount, retryPause, announceChan, meshSpec, azureProvidername) + + // ServiceName Catalog is the facility, which we query to get the list of services, weights for traffic split etc. + serviceCatalog := catalog.NewServiceCatalog(meshSpec, stopChan, kubernetesProvider, azureProvider) + + grpcServer, lis := utils.NewGrpc(serverType, *port) + eds := edsServer.NewEDSServer(ctx, serviceCatalog, meshSpec, announceChan) envoyControlPlane.RegisterEndpointDiscoveryServiceServer(grpcServer, eds) - cmd.GrpcServe(ctx, grpcServer, lis, cancel, serverType) + go utils.GrpcServe(ctx, grpcServer, lis, cancel, serverType) + + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + <-sigChan + + close(stopChan) + glog.Info("Goodbye!") } func parseFlags() { + // TODO(draychev): consolidate parseFlags - shared between sds.go and eds.go if err := flags.Parse(os.Args); err != nil { glog.Error(err) } @@ -71,8 +99,9 @@ func getNamespaces() []string { if namespace == nil { defaultNS := defaultNamespace namespaces = []string{defaultNS} + } else { + namespaces = []string{*namespace} } - namespaces = []string{*namespace} glog.Infof("Observing namespaces: %s", strings.Join(namespaces, ",")) return namespaces } diff --git a/cmd/sds/sds.go b/cmd/sds/sds.go index 51a21e6589..ba0a385e5d 100755 --- a/cmd/sds/sds.go +++ b/cmd/sds/sds.go @@ -5,26 +5,27 @@ import ( "flag" "fmt" "os" + "os/signal" + "syscall" + + "github.com/deislabs/smc/pkg/utils" envoyControlPlane "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" "github.com/golang/glog" "github.com/spf13/pflag" - "github.com/deislabs/smc/cmd" sdsServer "github.com/deislabs/smc/pkg/envoy/sds" ) const ( serverType = "SDS" - port = 15123 - - verbosityFlag = "verbosity" ) var ( flags = pflag.NewFlagSet(`diplomat-sds`, pflag.ExitOnError) keysDirectory = flags.String("keys-directory", "", "Directory where the keys are stored") - verbosity = flags.Int(verbosityFlag, 1, "Set logging verbosity level") + verbosity = flags.Int("verbosity", 1, "Set logging verbosity level") + port = flags.Int("port", 15123, "Service Discovery Service port number. 
(Default: 15123)") ) func main() { @@ -34,13 +35,20 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - grpcServer, lis := cmd.NewGrpc(serverType, port) + grpcServer, lis := utils.NewGrpc(serverType, *port) sds := sdsServer.NewSDSServer(keysDirectory) envoyControlPlane.RegisterSecretDiscoveryServiceServer(grpcServer, sds) - cmd.GrpcServe(ctx, grpcServer, lis, cancel, serverType) + go utils.GrpcServe(ctx, grpcServer, lis, cancel, serverType) + + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + <-sigChan + + glog.Info("Goodbye!") } func parseFlags() { + // TODO(draychev): consolidate parseFlags - shared between sds.go and eds.go if err := flags.Parse(os.Args); err != nil { glog.Error(err) } diff --git a/demo/AzureResource.yaml b/demo/AzureResource.yaml new file mode 100644 index 0000000000..dec6c115d3 --- /dev/null +++ b/demo/AzureResource.yaml @@ -0,0 +1,9 @@ +apiVersion: smc.osm.k8s.io/v1 +kind: AzureResource +metadata: + name: bookstore + namespace: smc + labels: + app: bookstore-1 +spec: + resourceid: /subscriptions/abc/resourcegroups/xyz/providers/Microsoft.Compute/virtualMachines/myVM diff --git a/demo/deploy-eds.sh b/demo/deploy-eds.sh index ddcd4e7313..953a7b9a13 100755 --- a/demo/deploy-eds.sh +++ b/demo/deploy-eds.sh @@ -54,8 +54,6 @@ spec: args: - "--kubeconfig" - "/kube/config" - - "--resource-group" - - "$AZURE_RESOURCE_GROUP" - "--azureAuthFile" - "/azure/azureAuth.json" - "--subscriptionID" @@ -82,4 +80,3 @@ spec: imagePullSecrets: - name: "$CTR_REGISTRY_CREDS_NAME" EOF - diff --git a/dockerfiles/Dockerfile.eds b/dockerfiles/Dockerfile.eds index d5fe1c0570..ef9a920570 100644 --- a/dockerfiles/Dockerfile.eds +++ b/dockerfiles/Dockerfile.eds @@ -1,5 +1,5 @@ FROM alpine:3.10.1 ADD bin/eds /eds -ADD ./run-eds.sh /run-eds.sh +ADD ./scripts/run-eds.sh /run-eds.sh RUN chmod +x /eds RUN chmod +x /run-eds.sh diff --git a/go.mod b/go.mod index b0d1c0c233..982028c4c7 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,8 @@ require ( github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b github.com/golang/protobuf v1.3.2 github.com/json-iterator/go v1.1.8 // indirect + github.com/onsi/ginkgo v1.8.0 + github.com/onsi/gomega v1.5.0 github.com/pkg/errors v0.8.1 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.4.0 // indirect diff --git a/go.sum b/go.sum index f32c191bab..4564b5d9b6 100644 --- a/go.sum +++ b/go.sum @@ -81,6 +81,7 @@ github.com/envoyproxy/protoc-gen-validate v0.0.0-20190405222122-d6164de49109 h1: github.com/envoyproxy/protoc-gen-validate v0.0.0-20190405222122-d6164de49109/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/evanphx/json-patch v0.0.0-20190203023257-5858425f7550/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/evanphx/json-patch v4.2.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= +github.com/evanphx/json-patch v4.5.0+incompatible h1:ouOWdg56aJriqS0huScTkVXPC5IcNrDCXZ6OoTAWu7M= github.com/evanphx/json-patch v4.5.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= @@ -146,6 +147,7 @@ github.com/grpc-ecosystem/grpc-gateway v1.9.1/go.mod h1:vNeuVxBJEsws4ogUvrchl83t github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1 
h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/imdario/mergo v0.3.7 h1:Y+UAYTZ7gDEuOfhxKWy+dvb5dRQ6rJjFSdX2HZY1/gI= @@ -187,10 +189,12 @@ github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+ github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.8.0 h1:VkHVNpR4iVnU8XQR6DBm8BqYjN7CRzw+xKUbVVbbW9w= github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA= github.com/onsi/gomega v0.0.0-20190113212917-5533ce8a0da3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/onsi/gomega v1.5.0 h1:izbySO9zDPmjJ8rDjLvkA2zJHIo+HkYXHnf7eN7SSyo= github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/openzipkin/zipkin-go v0.1.1/go.mod h1:NtoC/o8u3JlF1lSlyPNswIbeQH9bJTmOf0Erfk+hxe8= github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw= @@ -360,11 +364,13 @@ gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLks gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/inf.v0 v0.9.0/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= @@ -400,6 +406,7 @@ k8s.io/klog v1.0.0 h1:Pt+yjF5aB1xDSVbau4VsWe+dQNzA0qv1LlXdC2dF6Q8= k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I= k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30/go.mod h1:BXM9ceUBTj2QnfH2MK1odQs778ajze1RxcmP6S8RVVc= k8s.io/kube-openapi v0.0.0-20190502190224-411b2483e503/go.mod h1:iU+ZGYsNlvU9XKUSso6SQfKTCCw7lFduMZy26Mgr2Fw= +k8s.io/kube-openapi v0.0.0-20190603182131-db7b694dc208 h1:5sW+fEHvlJI3Ngolx30CmubFulwH28DhKjGf70Xmtco= k8s.io/kube-openapi v0.0.0-20190603182131-db7b694dc208/go.mod 
h1:nfDlWeOsu3pUf4yWGL+ERqohP4YsZcBJXWMK+gkzOA4= k8s.io/utils v0.0.0-20190221042446-c2654d5206da/go.mod h1:8k8uAuAQ0rXslZKaEWd0c3oVhZz7sSzSiPnVZayjIX0= k8s.io/utils v0.0.0-20190607212802-c55fbcfc754a h1:2jUDc9gJja832Ftp+QbDV0tVhQHMISFn01els+2ZAcw= diff --git a/pkg/catalog/catalog.go b/pkg/catalog/catalog.go index 0ba50327b3..c9043b0fba 100755 --- a/pkg/catalog/catalog.go +++ b/pkg/catalog/catalog.go @@ -3,6 +3,7 @@ package catalog import ( "fmt" "strings" + "time" "github.com/golang/glog" @@ -10,19 +11,39 @@ import ( ) // NewServiceCatalog creates a new service catalog -func NewServiceCatalog(computeProviders map[string]mesh.ComputeProviderI, meshSpecProvider mesh.SpecI) mesh.ServiceCatalogI { - glog.Info("[catalog] Crete new Service Catalog...") - return ServiceCatalog{ +func NewServiceCatalog(meshSpecProvider mesh.SpecI, stopChan chan struct{}, computeProviders ...mesh.ComputeProviderI) mesh.ServiceCatalogI { + // Run each provider -- starting the pub/sub system, which leverages the announceChan channel + for _, provider := range computeProviders { + if err := provider.Run(stopChan); err != nil { + glog.Errorf("Could not start %s provider: %s", provider.GetID(), err) + continue + } + glog.Infof("Started provider %s", provider.GetID()) + } + glog.Info("[catalog] Create a new Service Catalog.") + serviceCatalog := ServiceCatalog{ servicesCache: make(map[mesh.ServiceName][]mesh.IP), computeProviders: computeProviders, - meshSpecProvider: meshSpecProvider, + meshSpec: meshSpecProvider, } + + // NOTE(draychev): helpful while developing alpha MVP -- remove before releasing beta version. + go func() { + counter := 0 + for { + glog.V(7).Infof("------------------------- Service Catalog Cache Refresh %d -------------------------", counter) + counter++ + serviceCatalog.refreshCache() + time.Sleep(5 * time.Second) + } + }() + return &serviceCatalog } // GetWeightedService gets the backing delegated services for the given target service and their weights. 
-func (sc ServiceCatalog) GetWeightedService(svcName mesh.ServiceName) ([]mesh.WeightedService, error) { +func (sc *ServiceCatalog) GetWeightedService(svcName mesh.ServiceName) ([]mesh.WeightedService, error) { var weightedServices []mesh.WeightedService - for _, split := range sc.meshSpecProvider.ListTrafficSplits() { + for _, split := range sc.meshSpec.ListTrafficSplits() { if mesh.ServiceName(split.Spec.Service) == svcName { for _, backend := range split.Spec.Backends { namespaced := fmt.Sprintf("%s/%s", split.Namespace, backend.Service) @@ -41,12 +62,14 @@ func (sc ServiceCatalog) GetWeightedService(svcName mesh.ServiceName) ([]mesh.We } // GetWeightedServices gets all services and their delegated services and weights -func (sc ServiceCatalog) GetWeightedServices() (map[mesh.ServiceName][]mesh.WeightedService, error) { +func (sc *ServiceCatalog) GetWeightedServices() (map[mesh.ServiceName][]mesh.WeightedService, error) { + sc.Lock() + defer sc.Unlock() glog.Info("[catalog] GetWeightedServices") byTargetService := make(map[mesh.ServiceName][]mesh.WeightedService) // TODO trafficSplit name must match Envoy's cluster name backendWeight := make(map[string]int) - for _, trafficSplit := range sc.meshSpecProvider.ListTrafficSplits() { + for _, trafficSplit := range sc.meshSpec.ListTrafficSplits() { targetServiceName := mesh.ServiceName(trafficSplit.Spec.Service) var services []mesh.WeightedService glog.V(7).Infof("[EDS] Discovered TrafficSplit resource: %s/%s for service %s\n", trafficSplit.Namespace, trafficSplit.Name, targetServiceName) @@ -75,7 +98,9 @@ func (sc ServiceCatalog) GetWeightedServices() (map[mesh.ServiceName][]mesh.Weig } // GetServiceIPs retrieves the IP addresses for the given service -func (sc ServiceCatalog) GetServiceIPs(namespacedServiceName mesh.ServiceName) ([]mesh.IP, error) { +func (sc *ServiceCatalog) GetServiceIPs(namespacedServiceName mesh.ServiceName) ([]mesh.IP, error) { + sc.Lock() + defer sc.Unlock() // TODO(draychev): split namespace from the service name -- for non-K8s services glog.Infof("[catalog] GetServiceIPs %s", namespacedServiceName) if _, found := sc.servicesCache[namespacedServiceName]; !found { @@ -94,11 +119,11 @@ func (sc ServiceCatalog) GetServiceIPs(namespacedServiceName mesh.ServiceName) ( func (sc *ServiceCatalog) refreshCache() { glog.Info("[catalog] Refresh cache...") servicesCache := make(map[mesh.ServiceName][]mesh.IP) - for providerType, provider := range sc.computeProviders { - // TODO(draychev): split the namespace from the service name -- non-K8s services won't have namespace - for _, namespacedServiceName := range sc.meshSpecProvider.ListServices() { + // TODO(draychev): split the namespace from the service name -- non-K8s services won't have namespace + for _, namespacedServiceName := range sc.meshSpec.ListServices() { + for _, provider := range sc.computeProviders { newIps := provider.GetIPs(namespacedServiceName) - glog.Infof("[catalog] Found ips=%+v for service=%s for provider=%s", ipsToString(newIps), namespacedServiceName, providerType) + glog.Infof("[catalog] Found ips=%+v for service=%s for provider=%s", ipsToString(newIps), namespacedServiceName, provider.GetID()) if existingIps, exists := servicesCache[namespacedServiceName]; exists { servicesCache[namespacedServiceName] = append(existingIps, newIps...) 
} else { @@ -107,7 +132,9 @@ func (sc *ServiceCatalog) refreshCache() { } } glog.Infof("[catalog] Services cache: %+v", servicesCache) + sc.Lock() sc.servicesCache = servicesCache + sc.Unlock() } func ipsToString(meshIPs []mesh.IP) []string { diff --git a/pkg/catalog/types.go b/pkg/catalog/types.go index d892f61028..62eb021d5e 100755 --- a/pkg/catalog/types.go +++ b/pkg/catalog/types.go @@ -1,21 +1,15 @@ package catalog import ( + "sync" + "github.com/deislabs/smc/pkg/mesh" - "github.com/deislabs/smc/pkg/providers/kube" ) -// Service is the struct for a service in the service catalog -type Service struct { - name mesh.ServiceName - ips []mesh.IP - provider mesh.ServiceProviderI - kubernetesClient *kube.KubernetesProvider -} - // ServiceCatalog is the struct for the service catalog type ServiceCatalog struct { + sync.Mutex servicesCache map[mesh.ServiceName][]mesh.IP - computeProviders map[string]mesh.ComputeProviderI - meshSpecProvider mesh.SpecI + computeProviders []mesh.ComputeProviderI + meshSpec mesh.SpecI } diff --git a/pkg/envoy/eds/server.go b/pkg/envoy/eds/server.go index d1378728cb..8532ebbce9 100755 --- a/pkg/envoy/eds/server.go +++ b/pkg/envoy/eds/server.go @@ -12,13 +12,12 @@ import ( "github.com/deislabs/smc/pkg/mesh" ) -// EDS implements the Envoy xDS Endpoint Discovery ServiceI +// EDS implements the Envoy xDS Endpoint Discovery Service type EDS struct { - ctx context.Context // root context - catalog mesh.ServiceCatalogI - computeProviders map[string]mesh.ComputeProviderI - meshSpecProvider mesh.SpecI - announceChan *channels.RingChannel + ctx context.Context // root context + catalog mesh.ServiceCatalogI + meshSpec mesh.SpecI + announceChan *channels.RingChannel } // FetchEndpoints is required by the EDS interface @@ -32,14 +31,13 @@ func (e *EDS) DeltaEndpoints(xds.EndpointDiscoveryService_DeltaEndpointsServer) } // NewEDSServer creates a new EDS server -func NewEDSServer(ctx context.Context, computeProviders map[string]mesh.ComputeProviderI, catalog mesh.ServiceCatalogI, meshTopology mesh.SpecI, announceChan *channels.RingChannel) *EDS { +func NewEDSServer(ctx context.Context, catalog mesh.ServiceCatalogI, meshSpec mesh.SpecI, announceChan *channels.RingChannel) *EDS { glog.Info("[EDS] Create NewEDSServer...") return &EDS{ - ctx: ctx, - catalog: catalog, - computeProviders: computeProviders, - meshSpecProvider: meshTopology, - announceChan: announceChan, + ctx: ctx, + catalog: catalog, + meshSpec: meshSpec, + announceChan: announceChan, } } diff --git a/pkg/envoy/eds/stream.go b/pkg/envoy/eds/stream.go index aa76fac48f..ae6ec99cf2 100755 --- a/pkg/envoy/eds/stream.go +++ b/pkg/envoy/eds/stream.go @@ -39,20 +39,22 @@ func (e *edsStreamHandler) run(ctx context.Context, server envoy.EndpointDiscove select { case <-ctx.Done(): return nil - case _ = <-e.announceChan.Out(): + case <-e.announceChan.Out(): glog.V(1).Infof("[EDS] Received a change announcement...") if err := e.updateEnvoyProxies(server, request.TypeUrl); err != nil { return errors.Wrap(err, "error sending") } break Run - } } } } func (e *edsStreamHandler) updateEnvoyProxies(server envoy.EndpointDiscoveryService_StreamEndpointsServer, url string) error { - glog.Info("[stream] Update all envoy proxies...") + + // NOTE(draychev): This is currently focused only on fulfilling whatever is required to run a TrafficSplit demo. 
+ + glog.Info("[EDS][stream] Update all envoy proxies...") allServices, err := e.catalog.GetWeightedServices() if err != nil { glog.Error("Could not refresh weighted services: ", err) diff --git a/pkg/mesh/types.go b/pkg/mesh/types.go index c3c4520281..c283d99680 100644 --- a/pkg/mesh/types.go +++ b/pkg/mesh/types.go @@ -1,6 +1,8 @@ package mesh -import "github.com/deislabs/smi-sdk-go/pkg/apis/split/v1alpha2" +import ( + "github.com/deislabs/smi-sdk-go/pkg/apis/split/v1alpha2" +) // ServiceCatalogI is an interface w/ requirements to implement a service catalog type ServiceCatalogI interface { @@ -12,9 +14,8 @@ type ServiceCatalogI interface { // ServiceName is a type for a service name type ServiceName string -// ServiceProviderI is an interface declaring the required functions for any compute provider -type ServiceProviderI interface { - GetIPs(svcName ServiceName) []IP +func (sn ServiceName) String() string { + return string(sn) } // WeightedService is a struct of a delegated service backing a target service @@ -27,14 +28,33 @@ type WeightedService struct { // IP is an IP address type IP string -// ComputeProviderI is an interface declaring what a compute provider should implement. +// ComputeProviderI is the interface to be implemented by the Kubernetes, Azure, etc. providers. type ComputeProviderI interface { - GetIPs(svc ServiceName) []IP - Run(stopCh <-chan struct{}) error + // Retrieve the IP addresses comprising the ServiceName. + GetIPs(ServiceName) []IP + GetID() string + Run(<-chan struct{}) error } // SpecI is an interface declaring what an SMI spec provider should implement. type SpecI interface { ListTrafficSplits() []*v1alpha2.TrafficSplit ListServices() []ServiceName + GetComputeIDForService(ServiceName) []ComputeID +} + +// AzureID is a string type alias, which is the URI of a unique Azure cloud resource. +type AzureID string + +// KubernetesID is a struct type, which points to a uniquely identifiable Kubernetes cluster. +type KubernetesID struct { + ClusterID string + Namespace string + Service string +} + +// ComputeID is a struct, which contains the unique IDs of the compute clusters where a given service may have Endpoints.
+type ComputeID struct { + AzureID + KubernetesID } diff --git a/pkg/providers/azure/auth.go b/pkg/providers/azure/auth.go index 5ea6a55416..ff32842db3 100755 --- a/pkg/providers/azure/auth.go +++ b/pkg/providers/azure/auth.go @@ -10,10 +10,13 @@ import ( "github.com/golang/glog" ) +/* +// TODO(draychev) func waitForAzureAuth(azClient Client, maxAuthRetryCount int, retryPause time.Duration) error { retryCount := 0 for { - _, err := azClient.getVMSS() // Use any GET to verify auth + // TODO(draychev): this will not work -- come up with a different way to get a 200 OK + _, err := azClient.getVMSS("someResourceGroup", "vmID") // Use any GET to verify auth if err == nil { return nil } @@ -27,6 +30,7 @@ func waitForAzureAuth(azClient Client, maxAuthRetryCount int, retryPause time.Du time.Sleep(retryPause) } } +*/ func getAuthorizerWithRetry(azureAuthFile string, maxAuthRetryCount int, retryPause time.Duration) (autorest.Authorizer, error) { var err error diff --git a/pkg/providers/azure/client.go b/pkg/providers/azure/client.go new file mode 100644 index 0000000000..56265097e4 --- /dev/null +++ b/pkg/providers/azure/client.go @@ -0,0 +1,57 @@ +package azure + +import ( + "context" + "time" + + r "github.com/Azure/azure-sdk-for-go/profiles/latest/resources/mgmt/resources" + c "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-04-01/compute" + n "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2019-06-01/network" + "github.com/Azure/go-autorest/autorest" + "github.com/eapache/channels" + "github.com/golang/glog" + + "github.com/deislabs/smc/pkg/mesh" +) + +// newClient creates an Azure Client +func newClient(subscriptionID string, namespace string, azureAuthFile string, maxAuthRetryCount int, retryPause time.Duration, announceChan *channels.RingChannel, meshSpec mesh.SpecI, providerIdent string) mesh.ComputeProviderI { + var authorizer autorest.Authorizer + var err error + if authorizer, err = getAuthorizerWithRetry(azureAuthFile, maxAuthRetryCount, retryPause); err != nil { + glog.Fatal("Failed obtaining authentication token for Azure Resource Manager") + } + + // TODO(draychev): The subscriptionID should be observed from the AzureResource (SMI) + az := Client{ + namespace: namespace, + publicIPsClient: n.NewPublicIPAddressesClient(subscriptionID), + groupsClient: r.NewGroupsClient(subscriptionID), + deploymentsClient: r.NewDeploymentsClient(subscriptionID), + vmssClient: c.NewVirtualMachineScaleSetsClient(subscriptionID), + vmClient: c.NewVirtualMachinesClient(subscriptionID), + netClient: n.NewInterfacesClient(subscriptionID), + subscriptionID: subscriptionID, + ctx: context.Background(), + authorizer: authorizer, + announceChan: announceChan, + mesh: meshSpec, + providerIdent: providerIdent, + } + + az.publicIPsClient.Authorizer = az.authorizer + az.groupsClient.Authorizer = az.authorizer + az.deploymentsClient.Authorizer = az.authorizer + az.vmssClient.Authorizer = az.authorizer + az.vmClient.Authorizer = az.authorizer + az.netClient.Authorizer = az.authorizer + + /* + // TODO(draychev): enable this when you come up with a way to probe ARM w/ minimal context + if err = waitForAzureAuth(az, maxAuthRetryCount, retryPause); err != nil { + glog.Fatal("Failed authenticating with Azure Resource Manager: ", err) + } + */ + + return az +} diff --git a/pkg/providers/azure/errors.go b/pkg/providers/azure/errors.go index 2944f372a6..248d87ca3f 100755 --- a/pkg/providers/azure/errors.go +++ b/pkg/providers/azure/errors.go @@ -3,3 +3,4 @@ package azure import "errors" var 
errUnableToObtainArmAuth = errors.New("unable to obtain ARM authorizer") +var errIncorrectAzureURI = errors.New("incorrect Azure URI") diff --git a/pkg/providers/azure/provider.go b/pkg/providers/azure/provider.go index ad5549f27e..58e9745997 100755 --- a/pkg/providers/azure/provider.go +++ b/pkg/providers/azure/provider.go @@ -1,54 +1,54 @@ package azure import ( - "context" + "fmt" + "strings" "time" "github.com/eapache/channels" - - r "github.com/Azure/azure-sdk-for-go/profiles/latest/resources/mgmt/resources" - c "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-04-01/compute" - n "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2019-06-01/network" - "github.com/Azure/go-autorest/autorest" "github.com/golang/glog" "github.com/deislabs/smc/pkg/mesh" ) // NewProvider creates an Azure Client -func NewProvider(subscriptionID string, resourceGroup string, namespace string, azureAuthFile string, maxAuthRetryCount int, retryPause time.Duration, announceChan *channels.RingChannel) Client { - var authorizer autorest.Authorizer - var err error - if authorizer, err = getAuthorizerWithRetry(azureAuthFile, maxAuthRetryCount, retryPause); err != nil { - glog.Fatal("Failed obtaining authentication token for Azure Resource Manager") - } - az := Client{ - namespace: namespace, - publicIPsClient: n.NewPublicIPAddressesClient(subscriptionID), - groupsClient: r.NewGroupsClient(subscriptionID), - deploymentsClient: r.NewDeploymentsClient(subscriptionID), - vmssClient: c.NewVirtualMachineScaleSetsClient(subscriptionID), - vmClient: c.NewVirtualMachinesClient(subscriptionID), - netClient: n.NewInterfacesClient(subscriptionID), - subscriptionID: subscriptionID, - resourceGroup: resourceGroup, - ctx: context.Background(), - authorizer: authorizer, - announceChan: announceChan, - } +func NewProvider(subscriptionID string, namespace string, azureAuthFile string, maxAuthRetryCount int, retryPause time.Duration, announceChan *channels.RingChannel, meshSpec mesh.SpecI, providerIdent string) mesh.ComputeProviderI { + return newClient(subscriptionID, namespace, azureAuthFile, maxAuthRetryCount, retryPause, announceChan, meshSpec, providerIdent) +} + +// GetIPs returns the IP addresses for the given ServiceName Name +// This function is required by the ComputeProviderI +func (az Client) GetIPs(svc mesh.ServiceName) []mesh.IP { + var azureIPs []mesh.IP + clusters := az.mesh.GetComputeIDForService(svc) + for _, cluster := range clusters { + if cluster.AzureID == "" { + continue + } - az.publicIPsClient.Authorizer = az.authorizer - az.groupsClient.Authorizer = az.authorizer - az.deploymentsClient.Authorizer = az.authorizer - az.vmssClient.Authorizer = az.authorizer - az.vmClient.Authorizer = az.authorizer - az.netClient.Authorizer = az.authorizer + glog.Infof("[azure] Getting IPs for service %s", svc) + resourceGroup, kind, _, err := parseAzureID(cluster.AzureID) + if err != nil { + glog.Errorf("Unable to parse Azure URI %s: %s", cluster.AzureID, err) + continue + } - if err = waitForAzureAuth(az, maxAuthRetryCount, retryPause); err != nil { - glog.Fatal("Failed authenticating with Azure Resource Manager: ", err) - } + var computeKindObserver = map[computeKind]computeObserver{ + vm: az.getVM, + vmss: az.getVMSS, + } - return az + if observer, ok := computeKindObserver[kind]; ok { + var ips []mesh.IP + var err error + ips, err = observer(resourceGroup, cluster.AzureID) + if err != nil { + glog.Error("Could not fetch VMSS services: ", err) + } + azureIPs = append(azureIPs, ips...) 
+ } + } + return azureIPs } // Run starts the Azure observer @@ -58,28 +58,20 @@ func (az Client) Run(stopCh <-chan struct{}) error { return nil } -// GetIPs returns the IP addresses for the given ServiceName Name; This is required by the ComputeProviderI -func (az Client) GetIPs(svc mesh.ServiceName) []mesh.IP { - glog.Infof("[azure] Getting IPs for service %s", svc) - // TODO(draychev): from the ServiceName determine the AzureResource - - var ips []mesh.IP - - if vmssServices, err := az.getVMSS(); err != nil { - glog.Error("Could not fetch VMSS services: ", err) - } else { - if vmssIPs, exists := vmssServices[svc]; exists { - ips = append(ips, vmssIPs...) - } - } +// GetID returns the unique identifier for the compute provider. +// This string will be used by logging tools to contextualize messages. +func (az Client) GetID() string { + return az.providerIdent +} - if vmServices, err := az.getVM(); err != nil { - glog.Error("Could not fetch VM services: ", err) - } else { - if vmIPs, exists := vmServices[svc]; exists { - ips = append(ips, vmIPs...) - } +func parseAzureID(id mesh.AzureID) (resourceGroup, computeKind, computeName, error) { + // Sample URI: /subscriptions/e3f0/resourceGroups/mesh-rg/providers/Microsoft.Compute/virtualMachineScaleSets/baz + chunks := strings.Split(string(id), "/") + if len(chunks) != 9 { + return "", "", "", errIncorrectAzureURI } - - return ips + resGroup := resourceGroup(chunks[4]) + kind := computeKind(fmt.Sprintf("%s/%s", chunks[6], chunks[7])) + name := computeName(chunks[8]) + return resGroup, kind, name, nil } diff --git a/pkg/providers/azure/provider_test.go b/pkg/providers/azure/provider_test.go new file mode 100644 index 0000000000..f4a54319bd --- /dev/null +++ b/pkg/providers/azure/provider_test.go @@ -0,0 +1,22 @@ +package azure + +import ( + "github.com/deislabs/smc/pkg/mesh" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("Azure Compute Provider", func() { + Describe("Testing Azure Compute Provider", func() { + Context("Testing parseAzureID", func() { + uri := mesh.AzureID("/subscriptions/e3f0/resourceGroups/mesh-rg/providers/Microsoft.Compute/virtualMachineScaleSets/baz") + It("parses an Azure URI into resource group, compute kind, and compute name", func() { + rg, kind, name, err := parseAzureID(uri) + Expect(err).ShouldNot(HaveOccurred()) + Expect(rg).To(Equal(resourceGroup("mesh-rg"))) + Expect(kind).To(Equal(computeKind("Microsoft.Compute/virtualMachineScaleSets"))) + Expect(name).To(Equal(computeName("baz"))) + }) + }) + }) +}) diff --git a/pkg/providers/azure/suite_test.go b/pkg/providers/azure/suite_test.go new file mode 100644 index 0000000000..186a936f8c --- /dev/null +++ b/pkg/providers/azure/suite_test.go @@ -0,0 +1,13 @@ +package azure + +import ( + "testing" + + . "github.com/onsi/ginkgo" + .
"github.com/onsi/gomega" +) + +func TestProvider(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Environment Suite") +} diff --git a/pkg/providers/azure/types.go b/pkg/providers/azure/types.go index e361c1fd9b..6a7c116148 100755 --- a/pkg/providers/azure/types.go +++ b/pkg/providers/azure/types.go @@ -8,9 +8,23 @@ import ( "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2019-06-01/network" "github.com/Azure/go-autorest/autorest" "github.com/eapache/channels" + + "github.com/deislabs/smc/pkg/mesh" +) + +type resourceGroup string +type computeKind string +type computeName string + +const ( + vm computeKind = "Microsoft.Compute/virtualMachines" + vmss computeKind = "Microsoft.Compute/virtualMachineScaleSets" ) +type computeObserver func(resourceGroup, mesh.AzureID) ([]mesh.IP, error) + // Client is an Azure Client +// Implements interfaces: ComputeProviderI type Client struct { namespace string publicIPsClient network.PublicIPAddressesClient @@ -20,8 +34,12 @@ type Client struct { vmClient compute.VirtualMachinesClient authorizer autorest.Authorizer netClient network.InterfacesClient - resourceGroup string subscriptionID string ctx context.Context announceChan *channels.RingChannel + mesh mesh.SpecI + + // Free-form string identifying the compute provider: Azure, Kubernetes etc. + // This is used in logs + providerIdent string } diff --git a/pkg/providers/azure/vm.go b/pkg/providers/azure/vm.go index d63fc8e3cd..c9b446a8ac 100755 --- a/pkg/providers/azure/vm.go +++ b/pkg/providers/azure/vm.go @@ -2,7 +2,6 @@ package azure import ( "context" - "fmt" "time" "github.com/golang/glog" @@ -11,36 +10,37 @@ import ( "github.com/deislabs/smc/pkg/utils" ) -func (az *Client) getVM() (map[mesh.ServiceName][]mesh.IP, error) { - res := make(map[mesh.ServiceName][]mesh.IP) +func (az *Client) getVM(rg resourceGroup, vmID mesh.AzureID) ([]mesh.IP, error) { + glog.V(7).Infof("[azure] Fetching IPS of VM for %s in resource group: %s", vmID, rg) + var ips []mesh.IP ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - list, err := az.vmClient.List(ctx, az.resourceGroup) - glog.Infof("[EDS][ServiceName Discovery] List VMs for resource group: %s", az.resourceGroup) + list, err := az.vmClient.List(ctx, string(rg)) + glog.Infof("[azure] Listing VMs for resource group: %s", rg) if err != nil { - glog.Error("Could not retrieve all VMSS: ", err) + glog.Error("[azure] Could not retrieve all VMs: ", err) } for _, vm := range list.Values() { - netIf := (*vm.NetworkProfile.NetworkInterfaces)[0].ID - glog.Infof("[EDS][ServiceName Discovery] Found VM %s with NIC %s", *vm.Name, *netIf) + if *vm.ID != string(vmID) { + continue + } + networkInterfaceID := (*vm.NetworkProfile.NetworkInterfaces)[0].ID + glog.Infof("[azure] Found VM %s with NIC %s", *vm.Name, *networkInterfaceID) ctxA, cancel := context.WithTimeout(context.Background(), 5*time.Second) - ifName := utils.GetLastChunkOfSlashed(*netIf) - nif, err := az.netClient.Get(ctxA, az.resourceGroup, ifName, "") + interfaceName := utils.GetLastChunkOfSlashed(*networkInterfaceID) + networkInterfaceName, err := az.netClient.Get(ctxA, string(rg), interfaceName, "") if err != nil { - glog.Error("Could got get net interface ", ifName, " for resource group ", az.resourceGroup, err) + glog.Error("[azure] Could got get network interface ", interfaceName, " for resource group ", rg, err) cancel() continue } - for _, ipConf := range *nif.IPConfigurations { + for _, ipConf := range *networkInterfaceName.IPConfigurations { if 
ipConf.PrivateIPAddress != nil { - // TODO: gotta shoehorn the Kubernetes namespace here because we form the unique keys for the services - // from the Namespace and teh ServiceName from the TrafficSplit CRD - svcID := mesh.ServiceName(fmt.Sprintf("%s/%s", az.namespace, *vm.ID)) - res[svcID] = append(res[svcID], mesh.IP(*ipConf.PrivateIPAddress)) + ips = append(ips, mesh.IP(*ipConf.PrivateIPAddress)) } } cancel() } - return res, nil + return ips, nil } diff --git a/pkg/providers/azure/vmss.go b/pkg/providers/azure/vmss.go index 6337f27cd8..cae8c43444 100755 --- a/pkg/providers/azure/vmss.go +++ b/pkg/providers/azure/vmss.go @@ -8,17 +8,21 @@ import ( "github.com/golang/glog" ) -func (az *Client) getVMSS() (map[mesh.ServiceName][]mesh.IP, error) { - ips := make(map[mesh.ServiceName][]mesh.IP) +func (az *Client) getVMSS(rg resourceGroup, vmID mesh.AzureID) ([]mesh.IP, error) { + glog.V(7).Infof("[azure] Fetching IPS of VMSS for %s in resource group: %s", vmID, rg) + var ips []mesh.IP ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - glog.Info("List all VMSS for resource group: ", az.resourceGroup) - list, err := az.vmssClient.List(ctx, az.resourceGroup) + glog.Info("[azure] List all VMSS for resource group: ", rg) + list, err := az.vmssClient.List(ctx, string(rg)) if err != nil { - glog.Error("Could not retrieve all VMSS: ", err) + glog.Error("[azure] Could not retrieve all VMSS: ", err) } for _, vmss := range list.Values() { - glog.Infof("Found VMSS %s", *vmss.Name) + if *vmss.ID != string(vmID) { + continue + } + glog.Infof("[azure] Found VMSS %s", *vmss.Name) // TODO(draychev): get the IP address of each sub-instance and append to the list of IPs } return ips, nil diff --git a/pkg/providers/kube/client.go b/pkg/providers/kube/client.go new file mode 100644 index 0000000000..d07d1a2d8f --- /dev/null +++ b/pkg/providers/kube/client.go @@ -0,0 +1,79 @@ +package kube + +import ( + "time" + + "github.com/deislabs/smi-sdk-go/pkg/gen/client/split/clientset/versioned" + smiExternalVersions "github.com/deislabs/smi-sdk-go/pkg/gen/client/split/informers/externalversions" + "github.com/eapache/channels" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + + smcClient "github.com/deislabs/smc/pkg/smc_client/clientset/versioned" + smcInformers "github.com/deislabs/smc/pkg/smc_client/informers/externalversions" +) + +// NewClient creates a provider based on a Kubernetes client instance. +func NewClient(kubeClient *kubernetes.Clientset, smiClient *versioned.Clientset, azureResourceClient *smcClient.Clientset, namespaces []string, resyncPeriod time.Duration, announceChan *channels.RingChannel, providerIdent string) *Client { + var options []informers.SharedInformerOption + for _, namespace := range namespaces { + options = append(options, informers.WithNamespace(namespace)) + } + informerFactory := informers.NewSharedInformerFactoryWithOptions(kubeClient, resyncPeriod, options...) + + informerCollection := InformerCollection{ + Endpoints: informerFactory.Core().V1().Endpoints().Informer(), + Services: informerFactory.Core().V1().Services().Informer(), + } + + // Service Mesh Interface informers are optional + var smiOptions []smiExternalVersions.SharedInformerOption + var smiInformerFactory smiExternalVersions.SharedInformerFactory + if smiClient != nil { + smiInformerFactory = smiExternalVersions.NewSharedInformerFactoryWithOptions(smiClient, resyncPeriod, smiOptions...) 
+ informerCollection.TrafficSplit = smiInformerFactory.Split().V1alpha2().TrafficSplits().Informer() + } + + // Azure Resource CRD informers are optional + var azureResourceOptions []smcInformers.SharedInformerOption + var azureResourceFactory smcInformers.SharedInformerFactory + if azureResourceClient != nil { + azureResourceFactory = smcInformers.NewSharedInformerFactoryWithOptions(azureResourceClient, resyncPeriod, azureResourceOptions...) + informerCollection.AzureResource = azureResourceFactory.Smc().V1().AzureResources().Informer() + } + + cacheCollection := CacheCollection{ + Endpoints: informerCollection.Endpoints.GetStore(), + Services: informerCollection.Services.GetStore(), + } + + if informerCollection.TrafficSplit != nil { + cacheCollection.TrafficSplit = informerCollection.TrafficSplit.GetStore() + } + + if informerCollection.AzureResource != nil { + cacheCollection.AzureResource = informerCollection.AzureResource.GetStore() + } + + client := Client{ + providerIdent: providerIdent, + kubeClient: kubeClient, + informers: &informerCollection, + caches: &cacheCollection, + announceChan: announceChan, + cacheSynced: make(chan interface{}), + } + + h := handlers{client} + + resourceHandler := cache.ResourceEventHandlerFuncs{ + AddFunc: h.addFunc, + UpdateFunc: h.updateFunc, + DeleteFunc: h.deleteFunc, + } + + informerCollection.Endpoints.AddEventHandler(resourceHandler) + + return &client +} diff --git a/pkg/providers/kube/handlers.go b/pkg/providers/kube/handlers.go index d069aee30d..3942011a68 100755 --- a/pkg/providers/kube/handlers.go +++ b/pkg/providers/kube/handlers.go @@ -9,32 +9,32 @@ import ( ) type handlers struct { - provider *KubernetesProvider + Client } // general resource handlers func (h handlers) addFunc(obj interface{}) { - glog.V(9).Infof("[kubernetes] Add event: %+v", obj) - h.provider.announceChan.In() <- events.Event{ + glog.V(9).Infof("[%s] Add event: %+v", h.providerIdent, obj) + h.announceChan.In() <- events.Event{ Type: events.Create, Value: obj, } } func (h handlers) updateFunc(oldObj, newObj interface{}) { - glog.V(9).Infof("[kubernetes] Update event %+v", oldObj) + glog.V(9).Infof("[%s] Update event %+v", h.providerIdent, oldObj) if reflect.DeepEqual(oldObj, newObj) { return } - h.provider.announceChan.In() <- events.Event{ + h.announceChan.In() <- events.Event{ Type: events.Update, Value: newObj, } } func (h handlers) deleteFunc(obj interface{}) { - glog.V(9).Infof("[kubernetes] Delete event: %+v", obj) - h.provider.announceChan.In() <- events.Event{ + glog.V(9).Infof("[%s] Delete event: %+v", h.providerIdent, obj) + h.announceChan.In() <- events.Event{ Type: events.Delete, Value: obj, } diff --git a/pkg/providers/kube/mesh_spec.go b/pkg/providers/kube/mesh_spec.go index d1c37142ee..811109d5ed 100755 --- a/pkg/providers/kube/mesh_spec.go +++ b/pkg/providers/kube/mesh_spec.go @@ -2,24 +2,46 @@ package kube import ( "fmt" + "time" - "github.com/deislabs/smc/pkg/mesh" "github.com/deislabs/smi-sdk-go/pkg/apis/split/v1alpha2" + smiClient "github.com/deislabs/smi-sdk-go/pkg/gen/client/split/clientset/versioned" + "github.com/eapache/channels" "github.com/golang/glog" + v1 "k8s.io/api/core/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + + smc "github.com/deislabs/smc/pkg/apis/azureresource/v1" + "github.com/deislabs/smc/pkg/mesh" + smcClient "github.com/deislabs/smc/pkg/smc_client/clientset/versioned" ) +// We have a few different k8s clients. This identifies these in logs. 
+const kubernetesClientName = "MeshSpec" + +// NewMeshSpecClient creates the Kubernetes client, which retrieves SMI specific CRDs. +func NewMeshSpecClient(kubeConfig *rest.Config, namespaces []string, resyncPeriod time.Duration, announceChan *channels.RingChannel, stopChan chan struct{}) mesh.SpecI { + kubeClient := kubernetes.NewForConfigOrDie(kubeConfig) + smiClientset := smiClient.NewForConfigOrDie(kubeConfig) + azureResourceClient := smcClient.NewForConfigOrDie(kubeConfig) + k8sClient := NewClient(kubeClient, smiClientset, azureResourceClient, namespaces, resyncPeriod, announceChan, kubernetesClientName) + err := k8sClient.Run(stopChan) + if err != nil { + glog.Fatalf("Could not start %s client: %s", kubernetesClientName, err) + } + return k8sClient +} + // GetTrafficSplitWeight retrieves the weight for the given service -func (kp *KubernetesProvider) GetTrafficSplitWeight(target mesh.ServiceName, delegate mesh.ServiceName) (int, error) { - fmt.Printf("Here is kp: %+v", kp) - fmt.Printf("Here is kp.Caches: %+v", kp.Caches) - fmt.Printf("Here is kp.Caches.TrafficSplit: %+v", kp.Caches.TrafficSplit) - item, exists, err := kp.Caches.TrafficSplit.Get(target) +func (c *Client) GetTrafficSplitWeight(target mesh.ServiceName, delegate mesh.ServiceName) (int, error) { + item, exists, err := c.caches.TrafficSplit.Get(target) if err != nil { - glog.Errorf("[kubernetes] Error retrieving %v from TrafficSplit cache", target) + glog.Errorf("[%s] Error retrieving %v from TrafficSplit cache", kubernetesClientName, target) return 0, errRetrievingFromCache } if !exists { - glog.Errorf("[kubernetes] %v does not exist in TrafficSplit cache", target) + glog.Errorf("[%s] %v does not exist in TrafficSplit cache", kubernetesClientName, target) return 0, errNotInCache } ts := item.(v1alpha2.TrafficSplit) @@ -28,14 +50,14 @@ func (kp *KubernetesProvider) GetTrafficSplitWeight(target mesh.ServiceName, del return be.Weight, nil } } - glog.Errorf("[kubernetes] Was looking for delegate %s for target service %s but did not find it", delegate, target) + glog.Errorf("[MeshSpec] Was looking for delegate %s for target service %s but did not find it", delegate, target) return 0, errBackendNotFound } // ListTrafficSplits returns the list of traffic splits. 
-func (kp *KubernetesProvider) ListTrafficSplits() []*v1alpha2.TrafficSplit { +func (c *Client) ListTrafficSplits() []*v1alpha2.TrafficSplit { var trafficSplits []*v1alpha2.TrafficSplit - for _, splitIface := range kp.Caches.TrafficSplit.List() { + for _, splitIface := range c.caches.TrafficSplit.List() { split := splitIface.(*v1alpha2.TrafficSplit) trafficSplits = append(trafficSplits, split) } @@ -43,10 +65,10 @@ func (kp *KubernetesProvider) ListTrafficSplits() []*v1alpha2.TrafficSplit { } // ListServices lists the services observed from the given compute provider -func (kp *KubernetesProvider) ListServices() []mesh.ServiceName { - // TODO(draychev): split the namespace and the service name -- for non-kubernetes services we won't have namespace +func (c *Client) ListServices() []mesh.ServiceName { + // TODO(draychev): split the namespace from the service name -- for non-kubernetes services we won't have namespace var services []mesh.ServiceName - for _, splitIface := range kp.Caches.TrafficSplit.List() { + for _, splitIface := range c.caches.TrafficSplit.List() { split := splitIface.(*v1alpha2.TrafficSplit) namespacedServiceName := fmt.Sprintf("%s/%s", split.Namespace, split.Spec.Service) services = append(services, mesh.ServiceName(namespacedServiceName)) @@ -57,3 +79,58 @@ func (kp *KubernetesProvider) ListServices() []mesh.ServiceName { } return services } + +// GetComputeIDForService returns the collection of compute platforms / clusters, which form the given Mesh Service. +func (c *Client) GetComputeIDForService(svc mesh.ServiceName) []mesh.ComputeID { + var clusters []mesh.ComputeID + serviceInterface, exist, err := c.caches.Services.GetByKey(string(svc)) + if err != nil { + glog.Error("Error fetching Kubernetes Services from cache: ", err) + return clusters + } + + if !exist { + glog.Errorf("Error fetching Kubernetes Services from cache: ServiceName %s does not exist", svc) + return clusters + } + + if c.caches.AzureResource == nil { + //TODO(draychev): Should this be a Fatal?
+ glog.Error("AzureResource Kubernetes Cache is incorrectly setup") + return clusters + } + + var azureResourcesList []*smc.AzureResource + for _, azureResourceInterface := range c.caches.AzureResource.List() { + azureResourcesList = append(azureResourcesList, azureResourceInterface.(*smc.AzureResource)) + } + + clusters = append(clusters, matchServiceAzureResource(serviceInterface.(*v1.Service), azureResourcesList)) + + // TODO(draychev): populate list w/ !AzureResource + + return clusters +} + +type kv struct { + k string + v string +} + +func matchServiceAzureResource(svc *v1.Service, azureResourcesList []*smc.AzureResource) mesh.ComputeID { + azureResources := make(map[kv]*smc.AzureResource) + for _, azRes := range azureResourcesList { + for k, v := range azRes.ObjectMeta.Labels { + azureResources[kv{k, v}] = azRes + } + } + computeID := mesh.ComputeID{} + if service := svc; service != nil { + for k, v := range service.ObjectMeta.Labels { + if azRes, ok := azureResources[kv{k, v}]; ok && azRes != nil { + computeID.AzureID = mesh.AzureID(azRes.Spec.ResourceID) + } + } + } + return computeID +} diff --git a/pkg/providers/kube/mesh_spec_test.go b/pkg/providers/kube/mesh_spec_test.go new file mode 100644 index 0000000000..902fcc6b44 --- /dev/null +++ b/pkg/providers/kube/mesh_spec_test.go @@ -0,0 +1,44 @@ +package kube + +// func matchServiceAzureResource(svc *v1.Service, azureResourcesList []*smc.AzureResource) mesh.ComputeID { + +import ( + v12 "github.com/deislabs/smc/pkg/apis/azureresource/v1" + "github.com/deislabs/smc/pkg/mesh" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +var _ = Describe("Azure Compute Provider", func() { + Describe("Testing Azure Compute Provider", func() { + Context("Testing parseAzureID", func() { + It("returns default value in absence of an env var", func() { + svc := v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "k": "v", + "a": "b", + }, + }, + } + azureResources := []*v12.AzureResource{{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "k": "v", + "a": "b", + "x": "y", + }, + }, + Spec: v12.AzureResourceSpec{ + ResourceID: "/one/two/three", + }, + }} + computeID := matchServiceAzureResource(&svc, azureResources) + expectedComputeID := mesh.ComputeID{AzureID: "/one/two/three"} + Expect(computeID).To(Equal(expectedComputeID)) + }) + }) + }) +}) diff --git a/pkg/providers/kube/provider.go b/pkg/providers/kube/provider.go index 68db2a7c84..5834f23651 100755 --- a/pkg/providers/kube/provider.go +++ b/pkg/providers/kube/provider.go @@ -4,30 +4,39 @@ import ( "time" "github.com/deislabs/smi-sdk-go/pkg/gen/client/split/clientset/versioned" - smiExternalVersions "github.com/deislabs/smi-sdk-go/pkg/gen/client/split/informers/externalversions" "github.com/eapache/channels" "github.com/golang/glog" v1 "k8s.io/api/core/v1" - "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "github.com/deislabs/smc/pkg/mesh" + smcClient "github.com/deislabs/smc/pkg/smc_client/clientset/versioned" ) +// NewProvider creates a new Kubernetes cluster/compute provider, which will inform SMC of Endpoints for a given service. 
+func NewProvider(kubeConfig *rest.Config, namespaces []string, resyncPeriod time.Duration, announceChan *channels.RingChannel, providerIdent string) mesh.ComputeProviderI { + kubeClient := kubernetes.NewForConfigOrDie(kubeConfig) + // smiClient and azureResourceClient are used for SMI spec observation only + // these are not needed for the ComputeProviderI use-case + var smiClient *versioned.Clientset + var azureResourceClient *smcClient.Clientset + return NewClient(kubeClient, smiClient, azureResourceClient, namespaces, resyncPeriod, announceChan, providerIdent) +} + // GetIPs retrieves the list of IP addresses for the given service -func (kp KubernetesProvider) GetIPs(svc mesh.ServiceName) []mesh.IP { - glog.Infof("[kubernetes] Getting IPs for service %s", svc) +func (c Client) GetIPs(svc mesh.ServiceName) []mesh.IP { + glog.Infof("[%s] Getting IPs for service %s on Kubernetes", c.providerIdent, svc) var ips []mesh.IP - endpointsInterface, exist, err := kp.Caches.Endpoints.GetByKey(string(svc)) + endpointsInterface, exist, err := c.caches.Endpoints.GetByKey(string(svc)) if err != nil { - glog.Error("Error fetching endpoints from store, error occurred ", err) + glog.Errorf("[%s] Error fetching Kubernetes Endpoints from cache: %s", c.providerIdent, err) return ips } if !exist { - glog.Error("Error fetching endpoints from store! ServiceName does not exist: ", svc) + glog.Errorf("[%s] Error fetching Kubernetes Endpoints from cache: ServiceName %s does not exist", c.providerIdent, svc) return ips } @@ -41,81 +50,48 @@ func (kp KubernetesProvider) GetIPs(svc mesh.ServiceName) []mesh.IP { return ips } -// NewProvider creates a provider based on a Kubernetes client instance. -func NewProvider(kubeConfig *rest.Config, smiClient *versioned.Clientset, namespaces []string, resyncPeriod time.Duration, announceChan *channels.RingChannel) *KubernetesProvider { - kubeClient := kubernetes.NewForConfigOrDie(kubeConfig) - - var options []informers.SharedInformerOption - for _, namespace := range namespaces { - options = append(options, informers.WithNamespace(namespace)) - } - - var smiOptions []smiExternalVersions.SharedInformerOption - informerFactory := informers.NewSharedInformerFactoryWithOptions(kubeClient, resyncPeriod, options...) - smiInformerFactory := smiExternalVersions.NewSharedInformerFactoryWithOptions(smiClient, resyncPeriod, smiOptions...) - - informerCollection := InformerCollection{ - Endpoints: informerFactory.Core().V1().Endpoints().Informer(), - Services: informerFactory.Core().V1().Services().Informer(), - TrafficSplit: smiInformerFactory.Split().V1alpha2().TrafficSplits().Informer(), - } - - cacheCollection := CacheCollection{ - Endpoints: informerCollection.Endpoints.GetStore(), - Services: informerCollection.Services.GetStore(), - TrafficSplit: informerCollection.TrafficSplit.GetStore(), - } - - context := &KubernetesProvider{ - kubeClient: kubeClient, - informers: &informerCollection, - Caches: &cacheCollection, - announceChan: announceChan, - CacheSynced: make(chan interface{}), - } - - h := handlers{context} - - resourceHandler := cache.ResourceEventHandlerFuncs{ - AddFunc: h.addFunc, - UpdateFunc: h.updateFunc, - DeleteFunc: h.deleteFunc, - } - - informerCollection.Endpoints.AddEventHandler(resourceHandler) - - return context -} - // Run executes informer collection. 
-func (kp *KubernetesProvider) Run(stopCh <-chan struct{}) error { - glog.V(1).Infoln("k8s provider run started") +func (c *Client) Run(stopCh <-chan struct{}) error { + glog.V(1).Infoln("Kubernetes Compute Provider started") var hasSynced []cache.InformerSynced - if kp.informers == nil { + if c.informers == nil { return errInitInformers } - sharedInformers := []cache.SharedInformer{ - kp.informers.Endpoints, - kp.informers.Services, - kp.informers.TrafficSplit, + sharedInformers := map[friendlyName]cache.SharedInformer{ + "Endpoints": c.informers.Endpoints, + "Services": c.informers.Services, + "TrafficSplit": c.informers.TrafficSplit, + "AzureResource": c.informers.AzureResource, } - for _, informer := range sharedInformers { + var names []friendlyName + for name, informer := range sharedInformers { + // Depending on the use-case, some Informers from the collection may not have been initialized. + if informer == nil { + continue + } + names = append(names, name) + glog.Info("Starting informer: ", name) go informer.Run(stopCh) hasSynced = append(hasSynced, informer.HasSynced) } - glog.V(1).Infoln("Waiting for initial cache sync") + glog.V(1).Infof("Waiting informers cache sync: %+v", names) if !cache.WaitForCacheSync(stopCh, hasSynced...) { return errSyncingCaches } // Closing the cacheSynced channel signals to the rest of the system that... caches have been synced. - close(kp.CacheSynced) + close(c.cacheSynced) - glog.V(1).Infoln("initial cache sync done") - glog.V(1).Infoln("k8s provider run finished") + glog.V(1).Infof("Cache sync finished for %+v", names) return nil } + +// GetID returns a string descriptor / identifier of the compute provider. +// Required by interface: ComputeProviderI +func (c *Client) GetID() string { + return c.providerIdent +} diff --git a/pkg/providers/kube/suite_test.go b/pkg/providers/kube/suite_test.go new file mode 100644 index 0000000000..13c727d232 --- /dev/null +++ b/pkg/providers/kube/suite_test.go @@ -0,0 +1,13 @@ +package kube + +import ( + "testing" + + . "github.com/onsi/ginkgo" + . 
"github.com/onsi/gomega" +) + +func TestProvider(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Environment Suite") +} diff --git a/pkg/providers/kube/types.go b/pkg/providers/kube/types.go index b16b45c37a..91f4de06a9 100755 --- a/pkg/providers/kube/types.go +++ b/pkg/providers/kube/types.go @@ -6,27 +6,33 @@ import ( "k8s.io/client-go/tools/cache" ) +type friendlyName string + // InformerCollection is a struct of the Kubernetes informers used in SMC type InformerCollection struct { - Endpoints cache.SharedIndexInformer - Services cache.SharedIndexInformer - Pods cache.SharedIndexInformer - TrafficSplit cache.SharedIndexInformer + Endpoints cache.SharedIndexInformer + Services cache.SharedIndexInformer + Pods cache.SharedIndexInformer + TrafficSplit cache.SharedIndexInformer + AzureResource cache.SharedIndexInformer } // CacheCollection is a struct of the Kubernetes caches used in SMC type CacheCollection struct { - Endpoints cache.Store - Services cache.Store - Pods cache.Store - TrafficSplit cache.Store + Endpoints cache.Store + Services cache.Store + Pods cache.Store + TrafficSplit cache.Store + AzureResource cache.Store } -// KubernetesProvider is a struct of the Kubernetes config and components used in SMC -type KubernetesProvider struct { - kubeClient kubernetes.Interface - informers *InformerCollection - Caches *CacheCollection - announceChan *channels.RingChannel - CacheSynced chan interface{} +// Client is a struct for all components necessary to connect to and maintain state of a Kubernetes cluster. +// Implements interfaces: ComputeProviderI +type Client struct { + caches *CacheCollection + cacheSynced chan interface{} + providerIdent string + kubeClient kubernetes.Interface + informers *InformerCollection + announceChan *channels.RingChannel } diff --git a/cmd/grpc.go b/pkg/utils/grpc.go old mode 100755 new mode 100644 similarity index 99% rename from cmd/grpc.go rename to pkg/utils/grpc.go index aa2be00bd4..3391bfa1bc --- a/cmd/grpc.go +++ b/pkg/utils/grpc.go @@ -1,14 +1,13 @@ -package cmd +package utils import ( "context" "fmt" - "net" - "time" - "github.com/golang/glog" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" + "net" + "time" ) const ( diff --git a/run-eds.sh b/scripts/run-eds.sh similarity index 90% rename from run-eds.sh rename to scripts/run-eds.sh index fbbbedb373..5e8399ff30 100755 --- a/run-eds.sh +++ b/scripts/run-eds.sh @@ -15,7 +15,6 @@ source .env ./bin/eds \ --kubeconfig="$HOME/.kube/config" \ - --resource-group="$AZURE_RESOURCE_GROUP" \ --azureAuthFile="$HOME/.azure/azureAuth.json" \ --subscriptionID="$AZURE_SUBSCRIPTION" \ --namespace="$K8S_NAMESPACE" \ diff --git a/run-sds.sh b/scripts/run-sds.sh similarity index 100% rename from run-sds.sh rename to scripts/run-sds.sh