Skip to content

Commit

Permalink
v3 xds integration test (#140)
Browse files Browse the repository at this point in the history
We have already added the code paths for enabling xds v3. This PR adds an integration test that exercises envoy bootstrap using xds v3 transport.
Signed-off-by: Jyoti Mahapatra [email protected]
  • Loading branch information
jyotimahapatra authored Sep 10, 2020
1 parent 908106f commit 27b018f
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 42 deletions.
131 changes: 90 additions & 41 deletions integration/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@ import (

"github.com/envoyproxy/xds-relay/internal/pkg/log"

core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
gcpcachev2 "github.com/envoyproxy/go-control-plane/pkg/cache/v2"
gcpcachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
gcpserverv2 "github.com/envoyproxy/go-control-plane/pkg/server/v2"
gcpserverv3 "github.com/envoyproxy/go-control-plane/pkg/server/v3"
gcptest "github.com/envoyproxy/go-control-plane/pkg/test"
gcpresourcev2 "github.com/envoyproxy/go-control-plane/pkg/test/resource/v2"
gcpresourcev3 "github.com/envoyproxy/go-control-plane/pkg/test/resource/v3"
gcptestv2 "github.com/envoyproxy/go-control-plane/pkg/test/v2"
gcptestv3 "github.com/envoyproxy/go-control-plane/pkg/test/v3"
"github.com/envoyproxy/xds-relay/internal/app/server"
yamlproto "github.com/envoyproxy/xds-relay/internal/pkg/util/yamlproto"
aggregationv1 "github.com/envoyproxy/xds-relay/pkg/api/aggregation/v1"
Expand All @@ -32,6 +36,18 @@ import (

var testLogger = log.MockLogger.Named("e2e")

// Test parameters
const (
managementServerPort uint = 18000 // gRPC management server port
httpServicePort uint = 18080 // upstream HTTP/1.1 service that Envoy wll call
envoyListenerPort uint = 9000 // initial port for the Envoy listeners generated by the snapshot cache
nClusters = 7
nListeners = 9
nUpdates = 4
keyerConfiguration = "./testdata/keyer_configuration_e2e.yaml"
xdsRelayBootstrap = "./testdata/bootstrap_configuration_e2e.yaml"
)

func TestMain(m *testing.M) {
// We force a 1 second sleep before running a test to let the OS close any lingering socket from previous
// tests.
Expand All @@ -46,85 +62,118 @@ func TestSnapshotCacheSingleEnvoyAndXdsRelayServer(t *testing.T) {
}

g := gomega.NewWithT(t)

ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()

// Test parameters
const (
managementServerPort uint = 18000 // gRPC management server port
httpServicePort uint = 18080 // upstream HTTP/1.1 service that Envoy wll call
envoyListenerPort uint = 9000 // initial port for the Envoy listeners generated by the snapshot cache
nClusters = 7
nListeners = 9
nUpdates = 4
keyerConfiguration = "./testdata/keyer_configuration_e2e.yaml"
xdsRelayBootstrap = "./testdata/bootstrap_configuration_e2e.yaml"
envoyBootstrap = "./testdata/envoy_bootstrap.yaml"
)

// We run a service that returns the string "Hi, there!" locally and expose it through envoy.
// This is the service that Envoy will make requests to.
go gcptest.RunHTTP(ctx, httpServicePort)

// Mimic a management server using go-control-plane's snapshot cache.
managementServer, signal := startSnapshotCache(ctx, managementServerPort)
configv2, configv3, signal := startSnapshotCache(ctx, managementServerPort)

// Start xds-relay server.
startXdsRelayServer(ctx, cancelFunc, xdsRelayBootstrap, keyerConfiguration)

// Start envoy and return a bytes buffer containing the envoy logs.
envoyLogsBuffer := startEnvoy(ctx, envoyBootstrap, signal)
for _, version := range []core.ApiVersion{core.ApiVersion_V2, core.ApiVersion_V3} {
t.Run(version.String(), func(t *testing.T) {
// Start envoy and return a bytes buffer containing the envoy logs.
envoyLogsBuffer := startEnvoy(ctx, getEnvoyBootstrap(version), signal)

for i := 0; i < nUpdates; i++ {
setSnapshot(ctx, i, version, configv2, configv3)

// Initial cached snapshot configuration.
snapshotConfig := gcpresourcev2.TestSnapshot{
Xds: "xds",
UpstreamPort: uint32(httpServicePort),
BasePort: uint32(envoyListenerPort),
NumClusters: nClusters,
NumHTTPListeners: nListeners,
g.Eventually(func() (int, int) {
ok, failed := callLocalService(envoyListenerPort, nListeners)
testLogger.Info(ctx, "asserting envoy listeners configured: ok %v, failed %v", ok, failed)
return ok, failed
}, 1*time.Second, 100*time.Millisecond).Should(gomega.Equal(nListeners))
}

// TODO(https://github.com/envoyproxy/xds-relay/issues/66): figure out a way to only only copy
// envoy logs in case of failures.
testLogger.With("envoy_logs", envoyLogsBuffer.String()).Debug(ctx, "captured envoy logs")
})
}
}

for i := 0; i < nUpdates; i++ {
func setSnapshot(
ctx context.Context,
updateIndex int,
xdsVersion core.ApiVersion,
configv2 gcpcachev2.SnapshotCache,
configv3 gcpcachev3.SnapshotCache) {
switch xdsVersion {
case core.ApiVersion_V2:
snapshotConfig := gcpresourcev2.TestSnapshot{
Xds: "xds",
UpstreamPort: uint32(httpServicePort),
BasePort: uint32(envoyListenerPort),
NumClusters: nClusters,
NumHTTPListeners: nListeners,
}
// Bumping the snapshot version mimics new management server configuration.
snapshotConfig.Version = fmt.Sprintf("v%d", i)
snapshotConfig.Version = fmt.Sprintf("v%d", updateIndex)
testLogger.Info(ctx, "updating snapshots to version: %v", snapshotConfig.Version)

snapshot := snapshotConfig.Generate()
if err := snapshot.Consistent(); err != nil {
testLogger.Fatal(ctx, "snapshot inconsistency: %+v", snapshot)
}
err := managementServer.SetSnapshot("envoy-1", snapshot)
err := configv2.SetSnapshot("envoy-1", snapshot)
if err != nil {
testLogger.Fatal(ctx, "set snapshot error %q for %+v", err, snapshot)
}
case core.ApiVersion_V3:
snapshotConfig := gcpresourcev3.TestSnapshot{
Xds: "xds",
UpstreamPort: uint32(httpServicePort),
BasePort: uint32(envoyListenerPort),
NumClusters: nClusters,
NumHTTPListeners: nListeners,
}
// Bumping the snapshot version mimics new management server configuration.
snapshotConfig.Version = fmt.Sprintf("v%d", updateIndex)
testLogger.Info(ctx, "updating snapshots to version: %v", snapshotConfig.Version)

g.Eventually(func() (int, int) {
ok, failed := callLocalService(envoyListenerPort, nListeners)
testLogger.Info(ctx, "asserting envoy listeners configured: ok %v, failed %v", ok, failed)
return ok, failed
}, 1*time.Second, 100*time.Millisecond).Should(gomega.Equal(nListeners))
snapshot := snapshotConfig.Generate()
if err := snapshot.Consistent(); err != nil {
testLogger.Fatal(ctx, "snapshot inconsistency: %+v", snapshot)
}
err := configv3.SetSnapshot("envoy-1", snapshot)
if err != nil {
testLogger.Fatal(ctx, "set snapshot error %q for %+v", err, snapshot)
}
}
}

// TODO(https://github.com/envoyproxy/xds-relay/issues/66): figure out a way to only only copy
// envoy logs in case of failures.
testLogger.With("envoy_logs", envoyLogsBuffer.String()).Debug(ctx, "captured envoy logs")
func getEnvoyBootstrap(version core.ApiVersion) string {
switch version {
case core.ApiVersion_V2:
return "./testdata/envoy_bootstrap.yaml"
case core.ApiVersion_V3:
return "./testdata/envoy_bootstrap_v3.yaml"
default:
return ""
}
}

func startSnapshotCache(ctx context.Context, port uint) (gcpcachev2.SnapshotCache, chan struct{}) {
func startSnapshotCache(
ctx context.Context,
port uint) (gcpcachev2.SnapshotCache, gcpcachev3.SnapshotCache, chan struct{}) {
// Create a cache
signal := make(chan struct{})
cbv2 := &gcptestv2.Callbacks{Signal: signal}
configv2 := gcpcachev2.NewSnapshotCache(false, gcpcachev2.IDHash{}, gcpLogger{logger: testLogger.Named("snapshot")})
cbv3 := &gcptestv3.Callbacks{Signal: signal}
configv2 := gcpcachev2.NewSnapshotCache(false, gcpcachev2.IDHash{}, gcpLogger{logger: testLogger.Named("snapshotv2")})
configv3 := gcpcachev3.NewSnapshotCache(false, gcpcachev3.IDHash{}, gcpLogger{logger: testLogger.Named("snapshotv3")})
srv2 := gcpserverv2.NewServer(ctx, configv2, cbv2)
// We don't have support for v3 yet, but this is left here in preparation for the eventual
// inclusion of v3 resources.
srv3 := gcpserverv3.NewServer(ctx, nil, nil)
srv3 := gcpserverv3.NewServer(ctx, configv3, cbv3)

// Start up a gRPC-based management server.
go gcptest.RunManagementServer(ctx, srv2, srv3, port)

return configv2, signal
return configv2, configv3, signal
}

func startXdsRelayServer(ctx context.Context, cancel context.CancelFunc, bootstrapConfigFilePath string,
Expand Down
43 changes: 43 additions & 0 deletions integration/testdata/envoy_bootstrap_v3.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Base Envoy config using v3 xds with a management server listening on 9991 (xds-relay) and admin port on 19000.
admin:
access_log_path: /dev/null
address:
socket_address:
address: 127.0.0.1
port_value: 19000
dynamic_resources:
cds_config:
resource_api_version: V3
api_config_source:
transport_api_version: V3
api_type: GRPC
grpc_services:
- envoy_grpc:
cluster_name: xds_cluster
set_node_on_first_message_only: true
lds_config:
resource_api_version: V3
api_config_source:
transport_api_version: V3
api_type: GRPC
grpc_services:
- envoy_grpc:
cluster_name: xds_cluster
set_node_on_first_message_only: true
node:
cluster: dev
id: envoy-1
static_resources:
clusters:
- connect_timeout: 1s
load_assignment:
cluster_name: xds_cluster
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: 127.0.0.1
port_value: 9991
http2_protocol_options: {}
name: xds_cluster
2 changes: 1 addition & 1 deletion internal/app/admin/http/testdata/entire_cachev3_cds.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,4 @@
}
],
"ExpirationTime": ""
}
}

0 comments on commit 27b018f

Please sign in to comment.