Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ e2e: kind-clusters ## Runs end-to-end tests against KinD clusters
$(foreach suite, $(TEST_SUITES), \
PATH=$(LOCALBIN):$$PATH \
TAG=$$local_tag \
go test -tags=integ -run TestTraffic $(PROJECT_DIR)/test/e2e/scenarios/$(suite) \
go test -tags=integ -timeout 30m -run TestTraffic $(PROJECT_DIR)/test/e2e/scenarios/$(suite) \
--istio.test.hub=docker.io/istio\
--istio.test.tag=$(ISTIO_VERSION)\
--istio.test.kube.config=$(PROJECT_DIR)/test/east.kubeconfig,$(PROJECT_DIR)/test/west.kubeconfig,$(PROJECT_DIR)/test/central.kubeconfig\
Expand Down
13 changes: 1 addition & 12 deletions api/v1alpha1/meshfederation_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 +48,9 @@ type MeshFederationList struct {

// MeshFederationSpec defines the desired state of MeshFederation.
type MeshFederationSpec struct {
// Network name used by Istio for load balancing
// +kubebuilder:validation:Required
Network string `json:"network"`

// +kubebuilder:default:=cluster.local
TrustDomain string `json:"trustDomain"`

// Namespace used to create mesh-wide resources
// +kubebuilder:default:=istio-system
ControlPlaneNamespace string `json:"controlPlaneNamespace"`

// TODO: CRD proposal states "If no ingress is specified, it means the controller supports only single network topology". However, some config, such as gateway/port config, seems to be required.
// Config specifying ingress type and ingress gateway config
// +kubebuilder:validation:Required
// +kubebuilder:validation:Optional
IngressConfig IngressConfig `json:"ingress"`

// Selects the K8s Services to export to all remote meshes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,6 @@ spec:
spec:
description: MeshFederationSpec defines the desired state of MeshFederation.
properties:
controlPlaneNamespace:
default: istio-system
description: Namespace used to create mesh-wide resources
type: string
export:
description: |-
Selects the K8s Services to export to all remote meshes.
Expand Down Expand Up @@ -154,17 +150,6 @@ spec:
- gateway
- type
type: object
network:
description: Network name used by Istio for load balancing
type: string
trustDomain:
default: cluster.local
type: string
required:
- controlPlaneNamespace
- ingress
- network
- trustDomain
type: object
status:
description: MeshFederationStatus defines the observed state of MeshFederation.
Expand Down
15 changes: 8 additions & 7 deletions chart/templates/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,27 @@ rules:
verbs: ["get", "watch", "list"]
- apiGroups: ["networking.istio.io"]
resources: ["gateways", "serviceentries", "workloadentries"]
verbs: ["get", "list", "create", "update", "patch", "delete"]
verbs: ["get", "list", "create", "update", "patch", "delete", "watch"]
- apiGroups: ["security.istio.io"]
resources: ["peerauthentications"]
verbs: ["get", "list", "create", "update", "patch", "delete"]
verbs: ["get", "list", "create", "update", "patch", "delete", "watch"]
{{- if (include "remotes.hasOpenshiftRouterPeer" .) }}
- apiGroups: ["networking.istio.io"]
resources: ["destinationrules"]
verbs: ["get", "list", "create", "update", "patch", "delete"]
{{- end }}
{{- if eq .Values.federation.meshPeers.local.ingressType "openshift-router" }}
- apiGroups: ["networking.istio.io"]
resources: ["envoyfilters"]
verbs: ["get", "list", "create", "update", "patch", "delete"]
verbs: ["get", "list", "create", "update", "patch", "delete", "watch"]
- apiGroups: ["route.openshift.io"]
resources: ["routes", "routes/custom-host"]
verbs: ["get", "list", "create", "update", "patch", "delete"]
{{- end }}
verbs: ["get", "list", "create", "update", "patch", "delete", "watch"]
- apiGroups: ["federation.openshift-service-mesh.io"]
resources: ["meshfederations", "federatedservices"]
verbs: ["create", "delete", "get", "list", "patch", "update", "watch"]
- apiGroups: ["federation.openshift-service-mesh.io"]
resources: ["meshfederations/status", "federatedservices/status"]
verbs: ["get"]
verbs: ["get", "update", "patch"]
- apiGroups: ["federation.openshift-service-mesh.io"]
resources: ["meshfederations/finalizers"]
verbs: ["get", "update", "patch"]
149 changes: 27 additions & 122 deletions cmd/federation-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,20 @@ import (
"fmt"
"os"
"os/signal"
"sort"
"syscall"
"time"

routev1client "github.com/openshift/client-go/route/clientset/versioned"
// +kubebuilder:scaffold:imports
routev1 "github.com/openshift/api/route/v1"
networkingv1alpha3 "istio.io/client-go/pkg/apis/networking/v1alpha3"
securityv1beta1 "istio.io/client-go/pkg/apis/security/v1beta1"
istiokube "istio.io/istio/pkg/kube"
istiolog "istio.io/istio/pkg/log"
"istio.io/istio/pkg/slices"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/informers"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
v1 "k8s.io/client-go/listers/core/v1"
// +kubebuilder:scaffold:imports
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
Expand All @@ -47,13 +46,10 @@ import (
"github.com/openshift-service-mesh/federation/internal/pkg/config"
"github.com/openshift-service-mesh/federation/internal/pkg/istio"
"github.com/openshift-service-mesh/federation/internal/pkg/legacy/fds"
"github.com/openshift-service-mesh/federation/internal/pkg/legacy/informer"
"github.com/openshift-service-mesh/federation/internal/pkg/legacy/kube"
"github.com/openshift-service-mesh/federation/internal/pkg/legacy/xds"
"github.com/openshift-service-mesh/federation/internal/pkg/legacy/xds/adsc"
"github.com/openshift-service-mesh/federation/internal/pkg/legacy/xds/adss"
"github.com/openshift-service-mesh/federation/internal/pkg/networking"
"github.com/openshift-service-mesh/federation/internal/pkg/openshift"
"github.com/openshift-service-mesh/federation/internal/pkg/xds"
"github.com/openshift-service-mesh/federation/internal/pkg/xds/adsc"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
Expand All @@ -62,14 +58,9 @@ import (

var (
// Global variables to store the parsed commandline arguments
meshPeers,
exportedServiceSet,
importedServiceSet,
metricsAddr,
probeAddr string

enableLeaderElection,
useCtrls bool
meshPeers, exportedServiceSet, importedServiceSet,
metricsAddr, probeAddr string
enableLeaderElection bool

loggingOptions = istiolog.DefaultOptions()
log = istiolog.RegisterScope("default", "default logging scope")
Expand All @@ -80,7 +71,9 @@ var (
func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(v1alpha1.AddToScheme(scheme))
utilruntime.Must(v1alpha1.AddToScheme(scheme))
utilruntime.Must(networkingv1alpha3.AddToScheme(scheme))
utilruntime.Must(securityv1beta1.AddToScheme(scheme))
utilruntime.Must(routev1.AddToScheme(scheme))
// +kubebuilder:scaffold:scheme
}

Expand All @@ -100,9 +93,6 @@ func parseFlags() {
"Enable leader election for controller manager. "+
"Enabling this will ensure there is only one active controller manager.")

flag.BoolVar(&useCtrls, "use-ctrls", false,
"feature-flag: enables controller-runtime reconcilers instead of legacy mode.")

// Attach Istio logging options to the flag set
loggingOptions.AttachFlags(func(_ *[]string, _ string, _ []string, _ string) {
// unused and not available out-of-the box in flag package
Expand All @@ -115,6 +105,8 @@ func parseFlags() {
}

func main() {
opts := zap.Options{Development: true}
opts.BindFlags(flag.CommandLine)
parseFlags()

if err := istiolog.Configure(loggingOptions); err != nil {
Expand All @@ -126,21 +118,6 @@ func main() {
log.Fatalf("failed to parse configuration passed to the program arguments: %v", err)
}

ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer cancel()

if useCtrls {
runCtrls(ctx, cancel)
}

runLegacyMode(ctx, cfg)

<-ctx.Done()
}

func runCtrls(ctx context.Context, cancel context.CancelFunc) {
opts := zap.Options{Development: true}
opts.BindFlags(flag.CommandLine)
ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Expand All @@ -157,7 +134,9 @@ func runCtrls(ctx context.Context, cancel context.CancelFunc) {
os.Exit(1)
}

if err = meshfederation.NewReconciler(mgr.GetClient()).SetupWithManager(mgr); err != nil {
meshConfigPushRequests := make(chan xds.PushRequest)

if err = meshfederation.NewReconciler(mgr.GetClient(), cfg.MeshPeers.Remotes, meshConfigPushRequests).SetupWithManager(mgr); err != nil {
log.Errorf("unable to create controller for MeshFederation custom resource: %s", err)
os.Exit(1)
}
Expand All @@ -166,6 +145,7 @@ func runCtrls(ctx context.Context, cancel context.CancelFunc) {
os.Exit(1)
}
// +kubebuilder:scaffold:builder

if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
log.Errorf("unable to set up health check: %s", err)
os.Exit(1)
Expand All @@ -174,16 +154,18 @@ func runCtrls(ctx context.Context, cancel context.CancelFunc) {
log.Errorf("unable to set up ready check: %s", err)
os.Exit(1)
}

ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer cancel()

go func() {
log.Info("starting manager")
if err := mgr.Start(ctx); err != nil {
log.Errorf("failed to start manager: %s", err)
cancel()
}
}()
}

func runLegacyMode(ctx context.Context, cfg *config.Federation) {
kubeConfig, err := rest.InClusterConfig()
if err != nil {
log.Fatalf("failed to create in-cluster config: %v", err)
Expand All @@ -194,33 +176,17 @@ func runLegacyMode(ctx context.Context, cfg *config.Federation) {
log.Fatalf("failed to create Istio client: %v", err)
}

fdsPushRequests := make(chan xds.PushRequest)
meshConfigPushRequests := make(chan xds.PushRequest)

informerFactory := informers.NewSharedInformerFactory(istioClient.Kube(), 0)
serviceInformer := informerFactory.Core().V1().Services().Informer()
serviceLister := informerFactory.Core().V1().Services().Lister()
informerFactory.Start(ctx.Done())

serviceController, err := informer.NewResourceController(serviceInformer, corev1.Service{},
informer.NewServiceExportEventHandler(*cfg, fdsPushRequests, meshConfigPushRequests))
if err != nil {
log.Fatalf("failed to create service informer: %v", err)
}
serviceController.RunAndWait(ctx.Done())

startFederationServer(ctx, cfg, serviceLister, fdsPushRequests)

if cfg.MeshPeers.Local.IngressType == config.OpenShiftRouter {
go resolveRemoteIP(ctx, cfg.MeshPeers.Remotes, meshConfigPushRequests)
}

importedServiceStore := fds.NewImportedServiceStore()
for _, remote := range cfg.MeshPeers.Remotes {
startFDSClient(ctx, remote, meshConfigPushRequests, importedServiceStore)
}

informerFactory := informers.NewSharedInformerFactory(istioClient.Kube(), 0)
serviceLister := informerFactory.Core().V1().Services().Lister()
informerFactory.Start(ctx.Done())
startReconciler(ctx, cfg, serviceLister, meshConfigPushRequests, importedServiceStore)

<-ctx.Done()
}

func startReconciler(ctx context.Context, cfg *config.Federation, serviceLister v1.ServiceLister, meshConfigPushRequests chan xds.PushRequest, importedServiceStore *fds.ImportedServiceStore) {
Expand All @@ -239,26 +205,14 @@ func startReconciler(ctx context.Context, cfg *config.Federation, serviceLister

istioConfigFactory := istio.NewConfigFactory(*cfg, serviceLister, importedServiceStore, namespace)
reconcilers := []kube.Reconciler{
kube.NewGatewayResourceReconciler(istioClient, istioConfigFactory),
kube.NewServiceEntryReconciler(istioClient, istioConfigFactory),
kube.NewWorkloadEntryReconciler(istioClient, istioConfigFactory),
kube.NewPeerAuthResourceReconciler(istioClient, namespace),
}

if cfg.MeshPeers.AnyRemotePeerWithOpenshiftRouterIngress() {
reconcilers = append(reconcilers, kube.NewDestinationRuleReconciler(istioClient, istioConfigFactory))
}

if cfg.MeshPeers.Local.IngressType == config.OpenShiftRouter {
routeClient, err := routev1client.NewForConfig(kubeConfig)
if err != nil {
log.Fatalf("failed to create Route client: %v", err)
}

reconcilers = append(reconcilers, kube.NewEnvoyFilterReconciler(istioClient, istioConfigFactory))
reconcilers = append(reconcilers, kube.NewRouteReconciler(routeClient, openshift.NewConfigFactory(*cfg, serviceLister)))
}

rm := kube.NewReconcilerManager(meshConfigPushRequests, reconcilers...)
if err := rm.ReconcileAll(ctx); err != nil {
log.Fatalf("initial Istio resource reconciliation failed: %v", err)
Expand All @@ -267,55 +221,6 @@ func startReconciler(ctx context.Context, cfg *config.Federation, serviceLister
go rm.Start(ctx)
}

func startFederationServer(ctx context.Context, cfg *config.Federation, serviceLister v1.ServiceLister, fdsPushRequests chan xds.PushRequest) {
federationServer := adss.NewServer(
fdsPushRequests,
fds.NewExportedServicesGenerator(*cfg, serviceLister),
)

go func() {
if err := federationServer.Run(ctx); err != nil {
log.Fatalf("failed to start FDS server: %v", err)
}
}()
}

func resolveRemoteIP(ctx context.Context, remotes []config.Remote, meshConfigPushRequests chan xds.PushRequest) {
var prevIPs []string
for _, remote := range remotes {
prevIPs = append(prevIPs, networking.Resolve(remote.Addresses[0])...)
}

resolveIPs := func() {
var currIPs []string
for _, remote := range remotes {
log.Debugf("Resolving %s", remote.Name)
currIPs = append(currIPs, networking.Resolve(remote.Addresses[0])...)
}

sort.Strings(currIPs)
if !slices.Equal(prevIPs, currIPs) {
log.Infof("IP addresses have changed")
prevIPs = currIPs
meshConfigPushRequests <- xds.PushRequest{TypeUrl: xds.WorkloadEntryTypeUrl}
}
}

ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()

resolveLoop:
for {
select {
case <-ctx.Done():
break resolveLoop
case <-ticker.C:
resolveIPs()
}
}

}

func startFDSClient(ctx context.Context, remote config.Remote, meshConfigPushRequests chan xds.PushRequest, importedServiceStore *fds.ImportedServiceStore) {
var discoveryAddr string
if networking.IsIP(remote.Addresses[0]) {
Expand Down
Loading
Loading