fix: improve reconnection reliability after process reloads (#32707)
This commit includes a 7-character fix in lib/service/connect.go to call
connector.Close() instead of connector.Client.Close() when a new client
fails to ping the auth server.
connector.Close() correctly avoids closing the client if it is a shared
copy of the Instance client.
The call to connector.Client.Close() was causing intermittent problems
where reconnectToAuthService could get stuck repeatedly trying to use
the same client that was just closed.
This appears to be fixed now that the Instance client is not being
improperly closed by other components.
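
The safe-close pattern the fix relies on can be illustrated with a minimal sketch. This is a simplified stand-in, not the actual Connector type from lib/service/connect.go; the field names and the io.Closer type here are assumptions made for illustration:

```go
package sketch

import "io"

// connector is a simplified stand-in for the Connector used in
// lib/service/connect.go; the field names here are assumptions.
type connector struct {
	// client may be a dedicated client for this connector or a shared
	// copy of the process-wide Instance client.
	client         io.Closer
	instanceClient io.Closer
}

// Close releases the client only if this connector owns it. Closing
// connector.client directly would also tear down the shared Instance
// client, leaving later reconnect attempts stuck on a dead client,
// which is the failure mode this commit fixes.
func (c *connector) Close() error {
	if c.client == nil || c.client == c.instanceClient {
		return nil
	}
	return c.client.Close()
}
```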

I discovered this issue because it manifested as flaky failures of
TestHSMMigrate, where logs indicated that the Instance client was
being repeatedly reused but the connection never succeeded:

```
{"caller":"service/connect.go:1057","component":"proc:18","level":"info","message":"Reusing Instance client for Proxy. additionalSystemRoles=[Proxy]","pid":"34558.18","timestamp":"2023-09-27T21:30:05Z"}
{"caller":"service/connect.go:166","component":"proc:18","level":"debug","message":"Connected client: Identity(Proxy, cert(c90e905c-76e7-4c68-803b-ba364167ec6f.testcluster issued by testcluster:173887050308815087166604899475019267945),trust root(testcluster:322819974523436048061473591931335284057),trust root(testcluster:173887050308815087166604899475019267945),trust root(testcluster:135083743987735629230336583041497316143))","pid":"34558.18","timestamp":"2023-09-27T21:30:05Z"}
{"caller":"service/connect.go:98","component":"proc:18","level":"debug","message":"Connected client Proxy failed to execute test call: rpc error: code = Canceled desc = grpc: the client connection is closing. Node or proxy credentials are out of sync.","pid":"34558.18","timestamp":"2023-09-27T21:30:05Z"}
time="2023-09-27T21:30:13Z" level=warning msg="connection problem: readfrom tcp 172.18.0.2:38114->172.18.0.2:41065: use of closed network connection *net.OpError" dest="172.18.0.2:41065" source="172.18.0.2:37496" trace.component=loadbalancer trace.fields="map[listen:8ce2fe8a89f0:0]"
time="2023-09-27T21:30:13Z" level=warning msg="Failed to forward connection: readfrom tcp 172.18.0.2:38114->172.18.0.2:41065: use of closed network connection." trace.component=loadbalancer trace.fields="map[listen:8ce2fe8a89f0:0]"
time="2023-09-27T21:30:17Z" level=warning msg="Failed to create inventory control stream: rpc error: code = Canceled desc = grpc: the client connection is closing."
{"caller":"service/connect.go:124","component":"proc:18","level":"debug","message":"Retrying connection to auth server after waiting 41.323026451s.","pid":"34558.18","timestamp":"2023-09-27T21:30:46Z"}
{"caller":"service/connect.go:189","component":"proc:18","level":"debug","message":"Connected state: rotating servers (mode: manual, started: Sep 27 2023 21:29:24 UTC, ending: Sep 29 2023 03:29:24 UTC).","pid":"34558.18","timestamp":"2023-09-27T21:30:46Z"}
{"caller":"service/connect.go:1057","component":"proc:18","level":"info","message":"Reusing Instance client for Proxy. additionalSystemRoles=[Proxy]","pid":"34558.18","timestamp":"2023-09-27T21:30:46Z"}
...repeating...
```

The HSM tests have become flaky in the past when reload/reconnect bugs like
this have been introduced, but they are long tests that are a bit tricky
to run locally, and issues like this one can be difficult to diagnose.
To try to improve our chances of catching these issues in the future,
I've written a new test that starts up an Auth and Proxy process and
repeatedly reloads both of them, asserting that the reload is always
successful in a reasonable amount of time.

The new test catches the bug every time I have run it locally, usually
in ~4 out of the 8 parallel invocations.
I have not seen any failures with the fix applied.
The entire test completes in ~12 seconds on my local machine.
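
Roughly, the reload loop looks like the sketch below, built on the helpers added in integration/hsm/helpers.go (shown further down). The test name, timeouts, logger constructor, and the reload trigger via BroadcastEvent(service.TeleportReloadEvent) are assumptions for illustration, not a copy of the committed test:

```go
package hsm

import (
	"context"
	"testing"
	"time"

	"github.com/stretchr/testify/require"

	"github.com/gravitational/teleport/lib/service"
	"github.com/gravitational/teleport/lib/utils"
)

// TestRepeatedReloads is an illustrative sketch: it starts an Auth and a
// Proxy process and repeatedly reloads both, asserting that each reload
// completes and the new processes become ready within a bounded time.
func TestRepeatedReloads(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()
	log := utils.NewLoggerForTests()

	// Start Auth first, then a Proxy pointed at its listen address.
	auth := newTeleportService(t, newAuthConfig(t, log), "auth")
	require.NoError(t, auth.start(ctx))
	proxy := newTeleportService(t, newProxyConfig(t, auth.authAddr(t), log), "proxy")
	require.NoError(t, proxy.start(ctx))

	services := teleportServices{auth, proxy}
	for i := 0; i < 8; i++ {
		// Ask both processes to reload, then require that the new
		// processes come up and report ready before the deadline.
		reloadCtx, cancelReload := context.WithTimeout(ctx, 30*time.Second)
		for _, svc := range services {
			svc.process.BroadcastEvent(service.Event{Name: service.TeleportReloadEvent})
		}
		require.NoError(t, services.waitForRestart(reloadCtx))
		cancelReload()
	}
}
```
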
nklaassen authored Sep 29, 2023
1 parent 15d190b commit ca27ce9
Showing 4 changed files with 428 additions and 310 deletions.
280 changes: 280 additions & 0 deletions integration/hsm/helpers.go
```go
// Copyright 2023 Gravitational, Inc
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package hsm

import (
	"context"
	"net"
	"os"
	"path/filepath"
	"testing"
	"time"

	"github.com/gravitational/trace"
	"github.com/stretchr/testify/require"

	"github.com/gravitational/teleport/api/types"
	"github.com/gravitational/teleport/lib/cloud"
	"github.com/gravitational/teleport/lib/defaults"
	"github.com/gravitational/teleport/lib/service"
	"github.com/gravitational/teleport/lib/service/servicecfg"
	"github.com/gravitational/teleport/lib/services"
	"github.com/gravitational/teleport/lib/utils"
)

// teleportService wraps a *service.TeleportProcess and sets up a goroutine to
// handle process reloads. You must always call waitForNewProcess or
// waitForRestart for the new process after an expected reload to be picked
// up. Methods are not meant to be called concurrently on the same receiver and
// are not generally thread safe.
type teleportService struct {
	name           string
	log            utils.Logger
	config         *servicecfg.Config
	process        *service.TeleportProcess
	serviceChannel chan *service.TeleportProcess
	errorChannel   chan error
}

func newTeleportService(t *testing.T, config *servicecfg.Config, name string) *teleportService {
	s := &teleportService{
		config:         config,
		name:           name,
		log:            config.Log,
		serviceChannel: make(chan *service.TeleportProcess, 1),
		errorChannel:   make(chan error, 1),
	}
	t.Cleanup(func() {
		require.NoError(t, s.close(), "error while closing %s during test cleanup", name)
	})
	return s
}

func (t *teleportService) close() error {
	if t.process == nil {
		return nil
	}
	if err := t.process.Close(); err != nil {
		return trace.Wrap(err)
	}
	return trace.Wrap(t.process.Wait())
}

func (t *teleportService) start(ctx context.Context) error {
	// Run the service in a background goroutine and hook into service.Run to
	// receive all new processes after restarts and write them to a channel.
	go func() {
		t.errorChannel <- service.Run(ctx, *t.config, func(cfg *servicecfg.Config) (service.Process, error) {
			t.log.Debugf("(Re)starting %s", t.name)
			svc, err := service.NewTeleport(cfg)
			if err == nil {
				t.log.Debugf("Started %s, writing to serviceChannel", t.name)
				t.serviceChannel <- svc
			}
			return svc, trace.Wrap(err)
		})
	}()
	t.log.Debugf("Waiting for %s to start", t.name)
	if err := t.waitForNewProcess(ctx); err != nil {
		return trace.Wrap(err)
	}
	t.log.Debugf("%s started, waiting for it to be ready", t.name)
	return t.waitForReady(ctx)
}

func (t *teleportService) waitForNewProcess(ctx context.Context) error {
	select {
	case t.process = <-t.serviceChannel:
		t.log.Debugf("received new process for %s from serviceChannel", t.name)
	case err := <-t.errorChannel:
		return trace.Wrap(err)
	case <-ctx.Done():
		return trace.Wrap(ctx.Err(), "timed out waiting for %s to restart", t.name)
	}
	return nil
}

func (t *teleportService) waitForReady(ctx context.Context) error {
	t.log.Debugf("Waiting for %s to be ready", t.name)
	if _, err := t.process.WaitForEvent(ctx, service.TeleportReadyEvent); err != nil {
		return trace.Wrap(err, "timed out waiting for %s to be ready", t.name)
	}
	// If this is an Auth server, also wait for AuthIdentityEvent so that we
	// can safely read the admin credentials and create a test client.
	if t.process.GetAuthServer() != nil {
		if _, err := t.process.WaitForEvent(ctx, service.AuthIdentityEvent); err != nil {
			return trace.Wrap(err, "timed out waiting for %s auth identity event", t.name)
		}
		t.log.Debugf("%s is ready", t.name)
	}
	return nil
}

func (t *teleportService) waitForRestart(ctx context.Context) error {
	t.log.Debugf("Waiting for %s to restart", t.name)
	if err := t.waitForNewProcess(ctx); err != nil {
		return trace.Wrap(err)
	}
	t.log.Debugf("%s restarted, waiting for new process to be ready", t.name)
	return trace.Wrap(t.waitForReady(ctx))
}

func (t *teleportService) waitForShutdown(ctx context.Context) error {
	t.log.Debugf("Waiting for %s to shut down", t.name)
	select {
	case err := <-t.errorChannel:
		t.process = nil
		return trace.Wrap(err)
	case <-ctx.Done():
		return trace.Wrap(ctx.Err(), "timed out waiting for %s to shut down", t.name)
	}
}

func (t *teleportService) waitForLocalAdditionalKeys(ctx context.Context) error {
	t.log.Debugf("Waiting for %s to have local additional keys", t.name)
	clusterName, err := t.process.GetAuthServer().GetClusterName()
	if err != nil {
		return trace.Wrap(err)
	}
	hostCAID := types.CertAuthID{DomainName: clusterName.GetClusterName(), Type: types.HostCA}
	for {
		select {
		case <-ctx.Done():
			return trace.Wrap(ctx.Err(), "timed out waiting for %s to have local additional keys", t.name)
		case <-time.After(250 * time.Millisecond):
		}
		ca, err := t.process.GetAuthServer().GetCertAuthority(ctx, hostCAID, true)
		if err != nil {
			return trace.Wrap(err)
		}
		hasUsableKeys, err := t.process.GetAuthServer().GetKeyStore().HasUsableAdditionalKeys(ctx, ca)
		if err != nil {
			return trace.Wrap(err)
		}
		if hasUsableKeys {
			break
		}
	}
	t.log.Debugf("%s has local additional keys", t.name)
	return nil
}

func (t *teleportService) waitForPhaseChange(ctx context.Context) error {
	t.log.Debugf("Waiting for %s to change phase", t.name)
	if _, err := t.process.WaitForEvent(ctx, service.TeleportPhaseChangeEvent); err != nil {
		return trace.Wrap(err, "timed out waiting for %s to change phase", t.name)
	}
	t.log.Debugf("%s changed phase", t.name)
	return nil
}

func (t *teleportService) authAddr(testingT *testing.T) utils.NetAddr {
	addr, err := t.process.AuthAddr()
	require.NoError(testingT, err)

	return *addr
}

type teleportServices []*teleportService

func (s teleportServices) forEach(f func(t *teleportService) error) error {
	for i := range s {
		if err := f(s[i]); err != nil {
			return trace.Wrap(err)
		}
	}
	return nil
}

func (s teleportServices) start(ctx context.Context) error {
	return s.forEach(func(t *teleportService) error { return t.start(ctx) })
}

func (s teleportServices) waitForRestart(ctx context.Context) error {
	return s.forEach(func(t *teleportService) error { return t.waitForRestart(ctx) })
}

func (s teleportServices) waitForLocalAdditionalKeys(ctx context.Context) error {
	return s.forEach(func(t *teleportService) error { return t.waitForLocalAdditionalKeys(ctx) })
}

func (s teleportServices) waitForPhaseChange(ctx context.Context) error {
	return s.forEach(func(t *teleportService) error { return t.waitForPhaseChange(ctx) })
}

func newAuthConfig(t *testing.T, log utils.Logger) *servicecfg.Config {
	hostName, err := os.Hostname()
	require.NoError(t, err)

	config := servicecfg.MakeDefaultConfig()
	config.DataDir = t.TempDir()
	config.Auth.StorageConfig.Params["path"] = filepath.Join(config.DataDir, defaults.BackendDir)
	config.SSH.Enabled = false
	config.Proxy.Enabled = false
	config.Log = log
	config.InstanceMetadataClient = cloud.NewDisabledIMDSClient()
	config.MaxRetryPeriod = 25 * time.Millisecond

	config.Auth.Enabled = true
	config.Auth.NoAudit = true
	config.Auth.ListenAddr.Addr = net.JoinHostPort(hostName, "0")
	config.Auth.PublicAddrs = []utils.NetAddr{
		{
			AddrNetwork: "tcp",
			Addr:        hostName,
		},
	}
	config.Auth.ClusterName, err = services.NewClusterNameWithRandomID(types.ClusterNameSpecV2{
		ClusterName: "testcluster",
	})
	require.NoError(t, err)
	config.SetAuthServerAddress(config.Auth.ListenAddr)
	config.Auth.StaticTokens, err = types.NewStaticTokens(types.StaticTokensSpecV2{
		StaticTokens: []types.ProvisionTokenV1{
			{
				Roles: []types.SystemRole{"Proxy", "Node"},
				Token: "foo",
			},
		},
	})
	require.NoError(t, err)

	return config
}

func newProxyConfig(t *testing.T, authAddr utils.NetAddr, log utils.Logger) *servicecfg.Config {
	hostName, err := os.Hostname()
	require.NoError(t, err)

	config := servicecfg.MakeDefaultConfig()
	config.DataDir = t.TempDir()
	config.CachePolicy.Enabled = true
	config.Auth.Enabled = false
	config.SSH.Enabled = false
	config.SetToken("foo")
	config.SetAuthServerAddress(authAddr)
	config.Log = log
	config.InstanceMetadataClient = cloud.NewDisabledIMDSClient()
	config.MaxRetryPeriod = 25 * time.Millisecond

	config.Proxy.Enabled = true
	config.Proxy.DisableWebInterface = true
	config.Proxy.DisableWebService = true
	config.Proxy.DisableReverseTunnel = true
	config.Proxy.SSHAddr.Addr = net.JoinHostPort(hostName, "0")
	config.Proxy.WebAddr.Addr = net.JoinHostPort(hostName, "0")

	return config
}
```