Skip to content

Commit

Permalink
[NET-4799] [OSS] xdsv2: listeners L4 support for connect proxies (#18436
Browse files Browse the repository at this point in the history
)

* refactor to avoid future import cycles
  • Loading branch information
ndhanushkodi authored Aug 15, 2023
1 parent 0e94f48 commit 6b7ccd0
Show file tree
Hide file tree
Showing 25 changed files with 3,104 additions and 89 deletions.
22 changes: 12 additions & 10 deletions agent/xds/clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
envoy_upstreams_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/upstreams/http/v3"
envoy_matcher_v3 "github.com/envoyproxy/go-control-plane/envoy/type/matcher/v3"
envoy_type_v3 "github.com/envoyproxy/go-control-plane/envoy/type/v3"
"github.com/hashicorp/consul/agent/xds/config"
"github.com/hashicorp/consul/agent/xds/naming"

"github.com/hashicorp/go-hclog"
"google.golang.org/protobuf/encoding/protojson"
Expand Down Expand Up @@ -366,7 +368,7 @@ func makePassthroughClusters(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message,
!meshConf.TransparentProxy.MeshDestinationsOnly {

clusters = append(clusters, &envoy_cluster_v3.Cluster{
Name: OriginalDestinationClusterName,
Name: naming.OriginalDestinationClusterName,
ClusterDiscoveryType: &envoy_cluster_v3.Cluster_Type{
Type: envoy_cluster_v3.Cluster_ORIGINAL_DST,
},
Expand Down Expand Up @@ -1041,7 +1043,7 @@ func (s *ResourceGenerator) configIngressUpstreamCluster(c *envoy_cluster_v3.Clu
if svc != nil {
override = svc.PassiveHealthCheck
}
outlierDetection := ToOutlierDetection(cfgSnap.IngressGateway.Defaults.PassiveHealthCheck, override, false)
outlierDetection := config.ToOutlierDetection(cfgSnap.IngressGateway.Defaults.PassiveHealthCheck, override, false)

c.OutlierDetection = outlierDetection
}
Expand All @@ -1050,7 +1052,7 @@ func (s *ResourceGenerator) makeAppCluster(cfgSnap *proxycfg.ConfigSnapshot, nam
var c *envoy_cluster_v3.Cluster
var err error

cfg, err := ParseProxyConfig(cfgSnap.Proxy.Config)
cfg, err := config.ParseProxyConfig(cfgSnap.Proxy.Config)
if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns
// default config if there is an error so it's safe to continue.
Expand Down Expand Up @@ -1144,7 +1146,7 @@ func (s *ResourceGenerator) makeUpstreamClusterForPeerService(

clusterName := generatePeeredClusterName(uid, tbs)

outlierDetection := ToOutlierDetection(upstreamConfig.PassiveHealthCheck, nil, true)
outlierDetection := config.ToOutlierDetection(upstreamConfig.PassiveHealthCheck, nil, true)
// We can't rely on health checks for services on cluster peers because they
// don't take into account service resolvers, splitters and routers. Setting
// MaxEjectionPercent too 100% gives outlier detection the power to eject the
Expand Down Expand Up @@ -1279,7 +1281,7 @@ func (s *ResourceGenerator) makeUpstreamClusterForPreparedQuery(upstream structs
CircuitBreakers: &envoy_cluster_v3.CircuitBreakers{
Thresholds: makeThresholdsIfNeeded(cfg.Limits),
},
OutlierDetection: ToOutlierDetection(cfg.PassiveHealthCheck, nil, true),
OutlierDetection: config.ToOutlierDetection(cfg.PassiveHealthCheck, nil, true),
}
if cfg.Protocol == "http2" || cfg.Protocol == "grpc" {
if err := s.setHttp2ProtocolOptions(c); err != nil {
Expand Down Expand Up @@ -1499,7 +1501,7 @@ func (s *ResourceGenerator) makeUpstreamClustersForDiscoveryChain(
CircuitBreakers: &envoy_cluster_v3.CircuitBreakers{
Thresholds: makeThresholdsIfNeeded(upstreamConfig.Limits),
},
OutlierDetection: ToOutlierDetection(upstreamConfig.PassiveHealthCheck, nil, true),
OutlierDetection: config.ToOutlierDetection(upstreamConfig.PassiveHealthCheck, nil, true),
}

var lb *structs.LoadBalancer
Expand Down Expand Up @@ -1676,7 +1678,7 @@ type clusterOpts struct {

// makeGatewayCluster creates an Envoy cluster for a mesh or terminating gateway
func (s *ResourceGenerator) makeGatewayCluster(snap *proxycfg.ConfigSnapshot, opts clusterOpts) *envoy_cluster_v3.Cluster {
cfg, err := ParseGatewayConfig(snap.Proxy.Config)
cfg, err := config.ParseGatewayConfig(snap.Proxy.Config)
if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns
// default config if there is an error so it's safe to continue.
Expand Down Expand Up @@ -1819,7 +1821,7 @@ func configureClusterWithHostnames(
// makeExternalIPCluster creates an Envoy cluster for routing to IP addresses outside of Consul
// This is used by terminating gateways for Destinations
func (s *ResourceGenerator) makeExternalIPCluster(snap *proxycfg.ConfigSnapshot, opts clusterOpts) *envoy_cluster_v3.Cluster {
cfg, err := ParseGatewayConfig(snap.Proxy.Config)
cfg, err := config.ParseGatewayConfig(snap.Proxy.Config)
if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns
// default config if there is an error so it's safe to continue.
Expand Down Expand Up @@ -1858,7 +1860,7 @@ func (s *ResourceGenerator) makeExternalIPCluster(snap *proxycfg.ConfigSnapshot,
// makeExternalHostnameCluster creates an Envoy cluster for hostname endpoints that will be resolved with DNS
// This is used by both terminating gateways for Destinations, and Mesh Gateways for peering control plane traffice
func (s *ResourceGenerator) makeExternalHostnameCluster(snap *proxycfg.ConfigSnapshot, opts clusterOpts) *envoy_cluster_v3.Cluster {
cfg, err := ParseGatewayConfig(snap.Proxy.Config)
cfg, err := config.ParseGatewayConfig(snap.Proxy.Config)
if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns
// default config if there is an error so it's safe to continue.
Expand Down Expand Up @@ -2044,7 +2046,7 @@ func (s *ResourceGenerator) getTargetClusterName(upstreamsSnapshot *proxycfg.Con

clusterName = generatePeeredClusterName(targetUID, tbs)
}
clusterName = CustomizeClusterName(clusterName, chain)
clusterName = naming.CustomizeClusterName(clusterName, chain)
if forMeshGateway {
clusterName = meshGatewayExportedClusterNamePrefix + clusterName
}
Expand Down
2 changes: 1 addition & 1 deletion agent/xds/config.go → agent/xds/config/config.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package xds
package config

import (
"strings"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package xds
package config

import (
"testing"
Expand Down
7 changes: 7 additions & 0 deletions agent/xds/configfetcher/config_fetcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package configfetcher

// ConfigFetcher is the interface the agent needs to expose
// for the xDS server to fetch agent config, currently only one field is fetched
type ConfigFetcher interface {
AdvertiseAddrLAN() string
}
33 changes: 17 additions & 16 deletions agent/xds/listeners.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ import (
envoy_tcp_proxy_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/tcp_proxy/v3"
envoy_tls_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"
envoy_type_v3 "github.com/envoyproxy/go-control-plane/envoy/type/v3"
"github.com/hashicorp/consul/agent/xds/config"
"github.com/hashicorp/consul/agent/xds/naming"
"github.com/hashicorp/consul/agent/xds/platform"

"github.com/hashicorp/go-hclog"
"google.golang.org/protobuf/encoding/protojson"
Expand All @@ -50,8 +53,6 @@ import (
"github.com/hashicorp/consul/types"
)

const virtualIPTag = "virtual"

// listenersFromSnapshot returns the xDS API representation of the "listeners" in the snapshot.
func (s *ResourceGenerator) listenersFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
if cfgSnap == nil {
Expand Down Expand Up @@ -118,7 +119,7 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg.
}
}

proxyCfg, err := ParseProxyConfig(cfgSnap.Proxy.Config)
proxyCfg, err := config.ParseProxyConfig(cfgSnap.Proxy.Config)
if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns
// default config if there is an error so it's safe to continue.
Expand Down Expand Up @@ -258,7 +259,7 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg.
// We only match on this virtual IP if the upstream is in the proxy's partition.
// This is because the IP is not guaranteed to be unique across k8s clusters.
if acl.EqualPartitions(e.Node.PartitionOrDefault(), cfgSnap.ProxyID.PartitionOrDefault()) {
if vip := e.Service.TaggedAddresses[virtualIPTag]; vip.Address != "" {
if vip := e.Service.TaggedAddresses[naming.VirtualIPTag]; vip.Address != "" {
uniqueAddrs[vip.Address] = struct{}{}
}
}
Expand Down Expand Up @@ -462,7 +463,7 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg.
// The virtualIPTag is used by consul-k8s to store the ClusterIP for a service.
// For services imported from a peer,the partition will be equal in all cases.
if acl.EqualPartitions(e.Node.PartitionOrDefault(), cfgSnap.ProxyID.PartitionOrDefault()) {
if vip := e.Service.TaggedAddresses[virtualIPTag]; vip.Address != "" {
if vip := e.Service.TaggedAddresses[naming.VirtualIPTag]; vip.Address != "" {
uniqueAddrs[vip.Address] = struct{}{}
}
}
Expand Down Expand Up @@ -552,8 +553,8 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg.

filterChain, err := s.makeUpstreamFilterChain(filterChainOpts{
accessLogs: &cfgSnap.Proxy.AccessLogs,
clusterName: OriginalDestinationClusterName,
filterName: OriginalDestinationClusterName,
clusterName: naming.OriginalDestinationClusterName,
filterName: naming.OriginalDestinationClusterName,
protocol: "tcp",
})
if err != nil {
Expand Down Expand Up @@ -787,7 +788,7 @@ func parseCheckPath(check structs.CheckType) (structs.ExposePath, error) {

// listenersFromSnapshotGateway returns the "listener" for a terminating-gateway or mesh-gateway service
func (s *ResourceGenerator) listenersFromSnapshotGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
cfg, err := ParseGatewayConfig(cfgSnap.Proxy.Config)
cfg, err := config.ParseGatewayConfig(cfgSnap.Proxy.Config)
if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns
// default config if there is an error so it's safe to continue.
Expand Down Expand Up @@ -1171,7 +1172,7 @@ func createDownstreamTransportSocketForConnectTLS(cfgSnap *proxycfg.ConfigSnapsh

// Determine listener protocol type from configured service protocol. Don't hard fail on a config typo,
//The parse func returns default config if there is an error, so it's safe to continue.
cfg, _ := ParseProxyConfig(cfgSnap.Proxy.Config)
cfg, _ := config.ParseProxyConfig(cfgSnap.Proxy.Config)

// Create TLS validation context for mTLS with leaf certificate and root certs.
tlsContext := makeCommonTLSContext(
Expand Down Expand Up @@ -1263,7 +1264,7 @@ func (s *ResourceGenerator) makeInboundListener(cfgSnap *proxycfg.ConfigSnapshot
var l *envoy_listener_v3.Listener
var err error

cfg, err := ParseProxyConfig(cfgSnap.Proxy.Config)
cfg, err := config.ParseProxyConfig(cfgSnap.Proxy.Config)
if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns
// default config if there is an error so it's safe to continue.
Expand Down Expand Up @@ -1513,7 +1514,7 @@ func (s *ResourceGenerator) finalizePublicListenerFromConfig(l *envoy_listener_v
}

func (s *ResourceGenerator) makeExposedCheckListener(cfgSnap *proxycfg.ConfigSnapshot, cluster string, path structs.ExposePath) (proto.Message, error) {
cfg, err := ParseProxyConfig(cfgSnap.Proxy.Config)
cfg, err := config.ParseProxyConfig(cfgSnap.Proxy.Config)
if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns
// default config if there is an error so it's safe to continue.
Expand Down Expand Up @@ -1588,7 +1589,7 @@ func (s *ResourceGenerator) makeExposedCheckListener(cfgSnap *proxycfg.ConfigSna
&envoy_core_v3.CidrRange{AddressPrefix: advertise, PrefixLen: &wrapperspb.UInt32Value{Value: uint32(advertiseLen)}},
)

if ok, err := kernelSupportsIPv6(); err != nil {
if ok, err := platform.SupportsIPv6(); err != nil {
return nil, err
} else if ok {
ranges = append(ranges,
Expand Down Expand Up @@ -1639,7 +1640,7 @@ func (s *ResourceGenerator) makeTerminatingGatewayListener(
intentions := cfgSnap.TerminatingGateway.Intentions[svc]
svcConfig := cfgSnap.TerminatingGateway.ServiceConfigs[svc]

cfg, err := ParseProxyConfig(svcConfig.ProxyConfig)
cfg, err := config.ParseProxyConfig(svcConfig.ProxyConfig)
if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns
// default config if there is an error so it's safe to continue.
Expand Down Expand Up @@ -1683,7 +1684,7 @@ func (s *ResourceGenerator) makeTerminatingGatewayListener(
intentions := cfgSnap.TerminatingGateway.Intentions[svc]
svcConfig := cfgSnap.TerminatingGateway.ServiceConfigs[svc]

cfg, err := ParseProxyConfig(svcConfig.ProxyConfig)
cfg, err := config.ParseProxyConfig(svcConfig.ProxyConfig)
if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns
// default config if there is an error so it's safe to continue.
Expand Down Expand Up @@ -1807,7 +1808,7 @@ func (s *ResourceGenerator) makeFilterChainTerminatingGateway(cfgSnap *proxycfg.
filterChain.Filters = append(filterChain.Filters, authFilter)
}

proxyCfg, err := ParseProxyConfig(cfgSnap.Proxy.Config)
proxyCfg, err := config.ParseProxyConfig(cfgSnap.Proxy.Config)
if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns
// default config if there is an error so it's safe to continue.
Expand Down Expand Up @@ -2128,7 +2129,7 @@ func (s *ResourceGenerator) makeMeshGatewayPeerFilterChain(
if err != nil {
return nil, err
}
clusterName = meshGatewayExportedClusterNamePrefix + CustomizeClusterName(target.Name, chain)
clusterName = meshGatewayExportedClusterNamePrefix + naming.CustomizeClusterName(target.Name, chain)
}

uid := proxycfg.NewUpstreamIDFromServiceName(svc)
Expand Down
3 changes: 2 additions & 1 deletion agent/xds/listeners_apigateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
envoy_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
envoy_listener_v3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
envoy_tls_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"
"github.com/hashicorp/consul/agent/xds/naming"

"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/wrapperspb"
Expand Down Expand Up @@ -70,7 +71,7 @@ func (s *ResourceGenerator) makeAPIGatewayListeners(address string, cfgSnap *pro
if err != nil {
return nil, err
}
clusterName = CustomizeClusterName(target.Name, chain)
clusterName = naming.CustomizeClusterName(target.Name, chain)
}

filterName := fmt.Sprintf("%s.%s.%s.%s", chain.ServiceName, chain.Namespace, chain.Partition, chain.Datacenter)
Expand Down
3 changes: 2 additions & 1 deletion agent/xds/listeners_ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
envoy_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
envoy_listener_v3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
envoy_tls_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"
"github.com/hashicorp/consul/agent/xds/naming"

"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/durationpb"
Expand Down Expand Up @@ -62,7 +63,7 @@ func (s *ResourceGenerator) makeIngressGatewayListeners(address string, cfgSnap
if err != nil {
return nil, err
}
clusterName = CustomizeClusterName(target.Name, chain)
clusterName = naming.CustomizeClusterName(target.Name, chain)
}

filterName := fmt.Sprintf("%s.%s.%s.%s", chain.ServiceName, chain.Namespace, chain.Partition, chain.Datacenter)
Expand Down
Loading

0 comments on commit 6b7ccd0

Please sign in to comment.