Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions charts/nginx-gateway-fabric/templates/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ rules:
{{- if .Values.nginxGateway.gwAPIExperimentalFeatures.enable }}
- backendtlspolicies
- tlsroutes
- tcproutes
- udproutes
{{- end }}
verbs:
- list
Expand All @@ -111,6 +113,8 @@ rules:
{{- if .Values.nginxGateway.gwAPIExperimentalFeatures.enable }}
- backendtlspolicies/status
- tlsroutes/status
- tcproutes/status
- udproutes/status
{{- end }}
verbs:
- update
Expand Down
14 changes: 14 additions & 0 deletions internal/controller/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,18 @@ func registerControllers(
controller.WithK8sPredicate(k8spredicate.GenerationChangedPredicate{}),
},
},
{
objectType: &gatewayv1alpha2.TCPRoute{},
options: []controller.Option{
controller.WithK8sPredicate(k8spredicate.GenerationChangedPredicate{}),
},
},
{
objectType: &gatewayv1alpha2.UDPRoute{},
options: []controller.Option{
controller.WithK8sPredicate(k8spredicate.GenerationChangedPredicate{}),
},
},
}
controllerRegCfgs = append(controllerRegCfgs, gwExpFeatures...)
}
Expand Down Expand Up @@ -774,6 +786,8 @@ func prepareFirstEventBatchPreparerArgs(cfg config.Config) ([]client.Object, []c
&gatewayv1alpha3.BackendTLSPolicyList{},
&apiv1.ConfigMapList{},
&gatewayv1alpha2.TLSRouteList{},
&gatewayv1alpha2.TCPRouteList{},
&gatewayv1alpha2.UDPRouteList{},
)
}

Expand Down
4 changes: 4 additions & 0 deletions internal/controller/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ func TestPrepareFirstEventBatchPreparerArgs(t *testing.T) {
partialObjectMetadataList,
&gatewayv1alpha3.BackendTLSPolicyList{},
&gatewayv1alpha2.TLSRouteList{},
&gatewayv1alpha2.TCPRouteList{},
&gatewayv1alpha2.UDPRouteList{},
&gatewayv1.GRPCRouteList{},
&ngfAPIv1alpha1.ClientSettingsPolicyList{},
&ngfAPIv1alpha2.ObservabilityPolicyList{},
Expand Down Expand Up @@ -174,6 +176,8 @@ func TestPrepareFirstEventBatchPreparerArgs(t *testing.T) {
&inference.InferencePoolList{},
&gatewayv1alpha3.BackendTLSPolicyList{},
&gatewayv1alpha2.TLSRouteList{},
&gatewayv1alpha2.TCPRouteList{},
&gatewayv1alpha2.UDPRouteList{},
&gatewayv1.GRPCRouteList{},
&ngfAPIv1alpha1.ClientSettingsPolicyList{},
&ngfAPIv1alpha2.ObservabilityPolicyList{},
Expand Down
34 changes: 33 additions & 1 deletion internal/controller/nginx/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@

// UpdateUpstreamServers sends an APIRequest to the agent to update upstream servers using the NGINX Plus API.
// Only applicable when using NGINX Plus.
func (n *NginxUpdaterImpl) UpdateUpstreamServers(

Check failure on line 107 in internal/controller/nginx/agent/agent.go

View workflow job for this annotation

GitHub Actions / Go Lint (.)

cyclomatic complexity 16 of func `(*NginxUpdaterImpl).UpdateUpstreamServers` is high (> 15) (gocyclo)
deployment *Deployment,
conf dataplane.Configuration,
) {
Expand All @@ -119,7 +119,10 @@

var errs []error
var applied bool
actions := make([]*pb.NGINXPlusAction, 0, len(conf.Upstreams)+len(conf.StreamUpstreams))
actions := make([]*pb.NGINXPlusAction, 0,
len(conf.Upstreams)+len(conf.StreamUpstreams)+len(conf.TCPUpstreams)+len(conf.UDPUpstreams))

// HTTP/GRPC Upstreams
for _, upstream := range conf.Upstreams {
// Skip upstreams that have resolve servers to avoid "UpstreamServerImmutable" errors
if upstreamHasResolveServers(upstream) {
Expand All @@ -133,6 +136,7 @@
actions = append(actions, action)
}

// TLS Passthrough Upstreams
for _, upstream := range conf.StreamUpstreams {
// Skip upstreams that have resolve servers to avoid "UpstreamServerImmutable" errors
if upstreamHasResolveServers(upstream) {
Expand All @@ -146,6 +150,34 @@
actions = append(actions, action)
}

// TCP Upstreams
for _, upstream := range conf.TCPUpstreams {
// Skip upstreams that have resolve servers to avoid "UpstreamServerImmutable" errors
if upstreamHasResolveServers(upstream) {
continue
}
action := &pb.NGINXPlusAction{
Action: &pb.NGINXPlusAction_UpdateStreamServers{
UpdateStreamServers: buildStreamUpstreamServers(upstream),
},
}
actions = append(actions, action)
}

// UDP Upstreams
for _, upstream := range conf.UDPUpstreams {
// Skip upstreams that have resolve servers to avoid "UpstreamServerImmutable" errors
if upstreamHasResolveServers(upstream) {
continue
}
action := &pb.NGINXPlusAction{
Action: &pb.NGINXPlusAction_UpdateStreamServers{
UpdateStreamServers: buildStreamUpstreamServers(upstream),
},
}
actions = append(actions, action)
}

if actionsEqual(deployment.GetNGINXPlusActions(), actions) {
return
}
Expand Down
7 changes: 7 additions & 0 deletions internal/controller/nginx/config/stream/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
)

// Server holds all configuration for a stream server.
type Server struct {

Check failure on line 9 in internal/controller/nginx/config/stream/config.go

View workflow job for this annotation

GitHub Actions / Go Lint (.)

fieldalignment: struct with 160 pointer bytes could be 128 (govet)
Listen string
StatusZone string
ProxyPass string
Expand All @@ -14,6 +14,12 @@
RewriteClientIP shared.RewriteClientIPSettings
SSLPreread bool
IsSocket bool
Protocol string
UDPConfig *UDPConfig
}

type UDPConfig struct {
ProxyTimeout string
}

// Upstream holds all configuration for a stream upstream.
Expand All @@ -28,6 +34,7 @@
type UpstreamServer struct {
Address string
Resolve bool
Weight int32 // Weight for load balancing, default 1
}

// ServerConfig holds configuration for a stream server and IP family to be used by NGINX.
Expand Down
74 changes: 70 additions & 4 deletions internal/controller/nginx/config/stream_servers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"fmt"
gotemplate "text/template"

"github.com/go-logr/logr"

Check failure on line 7 in internal/controller/nginx/config/stream_servers.go

View workflow job for this annotation

GitHub Actions / Go Lint (.)

File is not properly formatted (goimports)
"github.com/nginx/nginx-gateway-fabric/v2/internal/controller/nginx/config/shared"
"github.com/nginx/nginx-gateway-fabric/v2/internal/controller/nginx/config/stream"
"github.com/nginx/nginx-gateway-fabric/v2/internal/controller/state/dataplane"
Expand All @@ -13,7 +14,7 @@
var streamServersTemplate = gotemplate.Must(gotemplate.New("streamServers").Parse(streamServersTemplateText))

func (g GeneratorImpl) executeStreamServers(conf dataplane.Configuration) []executeResult {
streamServers := createStreamServers(conf)
streamServers := createStreamServers(g.logger, conf)

streamServerConfig := stream.ServerConfig{
Servers: streamServers,
Expand All @@ -32,18 +33,25 @@
}
}

func createStreamServers(conf dataplane.Configuration) []stream.Server {
if len(conf.TLSPassthroughServers) == 0 {
func createStreamServers(logger logr.Logger, conf dataplane.Configuration) []stream.Server {
totalServers := len(conf.TLSPassthroughServers) + len(conf.TCPServers) + len(conf.UDPServers)
if totalServers == 0 {
return nil
}

streamServers := make([]stream.Server, 0, len(conf.TLSPassthroughServers)*2)
streamServers := make([]stream.Server, 0, totalServers*2)
portSet := make(map[int32]struct{})
upstreams := make(map[string]dataplane.Upstream)

for _, u := range conf.StreamUpstreams {
upstreams[u.Name] = u
}
for _, u := range conf.TCPUpstreams {
upstreams[u.Name] = u
}
for _, u := range conf.UDPUpstreams {
upstreams[u.Name] = u
}

for _, server := range conf.TLSPassthroughServers {
if u, ok := upstreams[server.UpstreamName]; ok && server.UpstreamName != "" {
Expand Down Expand Up @@ -77,9 +85,67 @@
}
streamServers = append(streamServers, streamServer)
}

// Process Layer4 servers (TCP and UDP)
processLayer4Servers(logger, conf.TCPServers, conf.UDPServers, upstreams, portSet, &streamServers)

return streamServers
}

// processLayer4Servers processes TCP and UDP servers to create stream servers.
func processLayer4Servers(
logger logr.Logger,
tcpServers []dataplane.Layer4VirtualServer,
udpServers []dataplane.Layer4VirtualServer,
upstreams map[string]dataplane.Upstream,
portSet map[int32]struct{},
streamServers *[]stream.Server,
) {
// Process TCP servers
for i, server := range tcpServers {
if _, inPortSet := portSet[server.Port]; inPortSet {
continue // Skip if port already in use
}

if u, ok := upstreams[server.UpstreamName]; ok && server.UpstreamName != "" && len(u.Endpoints) > 0 {
streamServer := stream.Server{
Listen: fmt.Sprint(server.Port),
StatusZone: fmt.Sprintf("tcp_%d", server.Port),
ProxyPass: server.UpstreamName,
}
*streamServers = append(*streamServers, streamServer)
portSet[server.Port] = struct{}{}
} else {
logger.V(1).Info("TCP Server skipped - upstream not found or no endpoints",
"serverIndex", i,
"port", server.Port,
"upstreamName", server.UpstreamName,
)
}
}

// Process UDP servers
for _, server := range udpServers {
if _, inPortSet := portSet[server.Port]; inPortSet {
continue // Skip if port already in use
}

if u, ok := upstreams[server.UpstreamName]; ok && server.UpstreamName != "" && len(u.Endpoints) > 0 {
streamServer := stream.Server{
Listen: fmt.Sprintf("%d udp", server.Port),
StatusZone: fmt.Sprintf("udp_%d", server.Port),
ProxyPass: server.UpstreamName,
Protocol: "udp",
UDPConfig: &stream.UDPConfig{
ProxyTimeout: "1s",
},
}
*streamServers = append(*streamServers, streamServer)
portSet[server.Port] = struct{}{}
}
}
}

func getRewriteClientIPSettingsForStream(
rewriteConfig dataplane.RewriteClientIPSettings,
) shared.RewriteClientIPSettings {
Expand Down
4 changes: 4 additions & 0 deletions internal/controller/nginx/config/stream_servers_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ server {
{{- if $s.SSLPreread }}
ssl_preread on;
{{- end }}

{{- if and (eq $s.Protocol "udp") $s.UDPConfig }}
proxy_timeout {{ $s.UDPConfig.ProxyTimeout }};
{{- end }}
}
{{- end }}

Expand Down
7 changes: 5 additions & 2 deletions internal/controller/nginx/config/stream_servers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"strings"
"testing"

"github.com/go-logr/logr"
. "github.com/onsi/gomega"

"github.com/nginx/nginx-gateway-fabric/v2/internal/controller/nginx/config/stream"
Expand Down Expand Up @@ -174,7 +175,8 @@ func TestCreateStreamServers(t *testing.T) {
},
}

streamServers := createStreamServers(conf)
logger := logr.Discard()
streamServers := createStreamServers(logger, conf)

g := NewWithT(t)

Expand Down Expand Up @@ -405,7 +407,8 @@ func TestCreateStreamServersWithNone(t *testing.T) {
TLSPassthroughServers: nil,
}

streamServers := createStreamServers(conf)
logger := logr.Discard()
streamServers := createStreamServers(logger, conf)

g := NewWithT(t)

Expand Down
12 changes: 11 additions & 1 deletion internal/controller/nginx/config/upstreams.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,13 @@ func executeUpstreams(upstreams []http.Upstream) []executeResult {
}

func (g GeneratorImpl) executeStreamUpstreams(conf dataplane.Configuration) []executeResult {
upstreams := g.createStreamUpstreams(conf.StreamUpstreams)
// Combine all stream upstreams: TLS, TCP, and UDP
allUpstreams := make([]dataplane.Upstream, 0, len(conf.StreamUpstreams)+len(conf.TCPUpstreams)+len(conf.UDPUpstreams))
allUpstreams = append(allUpstreams, conf.StreamUpstreams...)
allUpstreams = append(allUpstreams, conf.TCPUpstreams...)
allUpstreams = append(allUpstreams, conf.UDPUpstreams...)

upstreams := g.createStreamUpstreams(allUpstreams)

result := executeResult{
dest: streamConfigFile,
Expand Down Expand Up @@ -109,9 +115,13 @@ func (g GeneratorImpl) createStreamUpstream(up dataplane.Upstream) stream.Upstre
if ep.IPv6 {
format = "[%s]:%d"
}
// Keep the original weight from endpoint
// For single backend: Weight is 0 (template won't output weight directive)
// For multi-backend: Weight is set from BackendRef.Weight (template outputs weight=X if > 1)
upstreamServers[idx] = stream.UpstreamServer{
Address: fmt.Sprintf(format, ep.Address, ep.Port),
Resolve: ep.Resolve,
Weight: ep.Weight,
}
}

Expand Down
2 changes: 1 addition & 1 deletion internal/controller/nginx/config/upstreams_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ upstream {{ $u.Name }} {
state {{ $u.StateFile }};
{{- else }}
{{ range $server := $u.Servers }}
server {{ $server.Address }}{{ if $server.Resolve }} resolve{{ end }};
server {{ $server.Address }}{{ if ne $server.Weight 0 }}{{ if ne $server.Weight 1 }} weight={{ $server.Weight }}{{ end }}{{ end }}{{ if $server.Resolve }} resolve{{ end }};
{{- end }}
{{- end }}
}
Expand Down
2 changes: 0 additions & 2 deletions internal/controller/nginx/modules/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading