Apply host DNS settings based on peer count
hurricanehrndz committed Aug 15, 2024
1 parent 6016d2f commit 09778a6
Showing 5 changed files with 156 additions and 107 deletions.
client/internal/dns/host.go: 5 changes (3 additions & 2 deletions)
@@ -78,7 +78,7 @@ func newNoopHostMocker() hostManager {
}
}

func dnsConfigToHostDNSConfig(dnsConfig nbdns.Config, ip string, port int) HostDNSConfig {
func dnsConfigToHostDNSConfig(dnsConfig nbdns.Config, ip string, port int, connectedPeers int) HostDNSConfig {
config := HostDNSConfig{
RouteAll: false,
ServerIP: ip,
@@ -88,13 +88,14 @@ func dnsConfigToHostDNSConfig(dnsConfig nbdns.Config, ip string, port int) HostD
if len(nsConfig.NameServers) == 0 {
continue
}
if nsConfig.Primary {
if nsConfig.Primary && connectedPeers != 0 {
config.RouteAll = true
}

for _, domain := range nsConfig.Domains {
config.Domains = append(config.Domains, DomainConfig{
Domain: strings.TrimSuffix(domain, "."),
Disabled: connectedPeers == 0,
MatchOnly: !nsConfig.SearchDomainsEnabled,
})
}
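The net effect of the new connectedPeers parameter is easiest to see in isolation: with zero connected peers, the primary nameserver group no longer flips RouteAll, and every match domain comes out Disabled, so the host keeps its original resolvers until a peer connects. A minimal standalone sketch of that gating logic (pared-down stand-in types, not the real nbdns package):

```go
package main

import "fmt"

// domainConfig is a pared-down stand-in for the real DomainConfig type.
type domainConfig struct {
	Domain   string
	Disabled bool
}

// applyPeerGate mirrors the gating idea: a primary group only takes over
// all DNS traffic (routeAll) when at least one peer is connected, and
// match domains stay disabled while no peers are connected.
func applyPeerGate(primary bool, domains []string, connectedPeers int) (bool, []domainConfig) {
	routeAll := primary && connectedPeers != 0
	cfg := make([]domainConfig, 0, len(domains))
	for _, d := range domains {
		cfg = append(cfg, domainConfig{Domain: d, Disabled: connectedPeers == 0})
	}
	return routeAll, cfg
}

func main() {
	for _, peers := range []int{0, 3} {
		routeAll, cfg := applyPeerGate(true, []string{"internal.example"}, peers)
		fmt.Printf("peers=%d routeAll=%v domains=%+v\n", peers, routeAll, cfg)
	}
}
```

Running it prints routeAll=false with the domain disabled for peers=0, and routeAll=true with the domain enabled for peers=3.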
client/internal/dns/server.go: 35 changes (17 additions & 18 deletions)
@@ -5,6 +5,7 @@ import (
"fmt"
"net/netip"
"runtime"
"slices"
"strings"
"sync"

@@ -116,7 +117,7 @@ func NewDefaultServerPermanentUpstream(
ds.hostsDNSHolder.set(hostsDnsList)
ds.permanent = true
ds.addHostRootZone()
ds.currentConfig = dnsConfigToHostDNSConfig(config, ds.service.RuntimeIP(), ds.service.RuntimePort())
ds.currentConfig = dnsConfigToHostDNSConfig(config, ds.service.RuntimeIP(), ds.service.RuntimePort(), 1)
ds.searchDomainNotifier = newNotifier(ds.SearchDomains())
ds.searchDomainNotifier.setListener(listener)
setServerDns(ds)
@@ -305,11 +306,18 @@ func (s *DefaultServer) applyConfiguration(update nbdns.Config) error {
if err != nil {
return fmt.Errorf("not applying dns update, error: %v", err)
}
muxUpdates := append(localMuxUpdates, upstreamMuxUpdates...) //nolint:gocritic

var muxUpdates []muxUpdate
if s.statusRecorder.GetConnectedPeersCount() == 0 {
log.Infof("O connected peers, not registering upstream handlers")
muxUpdates = localMuxUpdates
} else {
muxUpdates = append(localMuxUpdates, upstreamMuxUpdates...) //nolint:gocritic
}

s.updateMux(muxUpdates)
s.updateLocalResolver(localRecords)
s.currentConfig = dnsConfigToHostDNSConfig(update, s.service.RuntimeIP(), s.service.RuntimePort())
s.currentConfig = dnsConfigToHostDNSConfig(update, s.service.RuntimeIP(), s.service.RuntimePort(), s.statusRecorder.GetConnectedPeersCount())

hostUpdate := s.currentConfig
if s.service.RuntimePort() != defaultPort && !s.hostManager.supportCustomPort() {
@@ -359,8 +367,8 @@ func (s *DefaultServer) buildLocalHandlerUpdate(customZones []nbdns.CustomZone)
}

func (s *DefaultServer) buildUpstreamHandlerUpdate(nameServerGroups []*nbdns.NameServerGroup) ([]muxUpdate, error) {

var muxUpdates []muxUpdate
log.Infof("length of nameServerGroups %d", len(nameServerGroups))
for _, nsGroup := range nameServerGroups {
if len(nsGroup.NameServers) == 0 {
log.Warn("received a nameserver group with empty nameserver list")
@@ -495,29 +503,22 @@ func (s *DefaultServer) upstreamCallbacks(
nsGroup *nbdns.NameServerGroup,
handler dns.Handler,
) (deactivate func(error), reactivate func()) {
var removeIndex map[string]int
deactivate = func(err error) {
s.mux.Lock()
defer s.mux.Unlock()

l := log.WithField("nameservers", nsGroup.NameServers)
l.Info("Temporarily deactivating nameservers group due to timeout")

removeIndex = make(map[string]int)
for _, domain := range nsGroup.Domains {
removeIndex[domain] = -1
}
if nsGroup.Primary {
removeIndex[nbdns.RootZone] = -1
s.currentConfig.RouteAll = false
s.service.DeregisterMux(nbdns.RootZone)
}

for i, item := range s.currentConfig.Domains {
if _, found := removeIndex[item.Domain]; found {
if slices.Contains(nsGroup.Domains, item.Domain) {
s.currentConfig.Domains[i].Disabled = true
s.service.DeregisterMux(item.Domain)
removeIndex[item.Domain] = i
}
}

@@ -530,18 +531,16 @@ }
}

s.updateNSState(nsGroup, err, false)

}
reactivate = func() {
s.mux.Lock()
defer s.mux.Unlock()

for domain, i := range removeIndex {
if i == -1 || i >= len(s.currentConfig.Domains) || s.currentConfig.Domains[i].Domain != domain {
continue
for i, item := range s.currentConfig.Domains {
if slices.Contains(nsGroup.Domains, item.Domain) {
s.currentConfig.Domains[i].Disabled = false
s.service.RegisterMux(item.Domain, handler)
}
s.currentConfig.Domains[i].Disabled = false
s.service.RegisterMux(domain, handler)
}

l := log.WithField("nameservers", nsGroup.NameServers)
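With the removeIndex bookkeeping gone, deactivate and reactivate are symmetric: each walks s.currentConfig.Domains once and toggles Disabled wherever slices.Contains finds the domain in the group's list. A reduced sketch of that toggle, using simplified local types:

```go
package main

import (
	"fmt"
	"slices"
)

type domainConfig struct {
	Domain   string
	Disabled bool
}

// setGroupDisabled toggles every domain belonging to the nameserver
// group, mirroring the symmetric deactivate/reactivate callbacks.
func setGroupDisabled(domains []domainConfig, groupDomains []string, disabled bool) {
	for i, item := range domains {
		if slices.Contains(groupDomains, item.Domain) {
			domains[i].Disabled = disabled
		}
	}
}

func main() {
	domains := []domainConfig{{Domain: "internal.example"}, {Domain: "other.example"}}
	setGroupDisabled(domains, []string{"internal.example"}, true) // deactivate on timeout
	fmt.Println(domains)
	setGroupDisabled(domains, []string{"internal.example"}, false) // reactivate on recovery
	fmt.Println(domains)
}
```

The linear scan trades the O(1) index lookup for simpler state: there is no map to keep in sync when the domain slice changes between deactivation and reactivation.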
client/internal/dns/upstream.go: 147 changes (83 additions & 64 deletions)
@@ -47,6 +47,8 @@ type upstreamResolverBase struct {
mutex sync.Mutex
reactivatePeriod time.Duration
upstreamTimeout time.Duration
probeRunning atomic.Bool
cancelProbe context.CancelFunc

deactivate func(error)
reactivate func()
@@ -56,18 +58,34 @@
func newUpstreamResolverBase(ctx context.Context, statusRecorder *peer.Status) *upstreamResolverBase {
ctx, cancel := context.WithCancel(ctx)

return &upstreamResolverBase{
resolverBase := &upstreamResolverBase{
ctx: ctx,
cancel: cancel,
upstreamTimeout: upstreamTimeout,
reactivatePeriod: reactivatePeriod,
failsTillDeact: failsTillDeact,
statusRecorder: statusRecorder,
}

go resolverBase.watchPeersConnStatusChanges()

return resolverBase
}

func (u *upstreamResolverBase) watchPeersConnStatusChanges() {
for {
select {
case <-u.ctx.Done():
return
case <-u.statusRecorder.GetPeersConnStatusChangeNotifier():
log.Debugf("probing DNS availability")
go u.probeAvailability()
}
}
}

func (u *upstreamResolverBase) stop() {
log.Debugf("stopping serving DNS for upstreams %s", u.upstreamServers)
log.Warnf("stopping serving DNS for upstreams %s", u.upstreamServers)
u.cancel()
}

@@ -104,7 +122,7 @@ func (u *upstreamResolverBase) ServeDNS(w dns.ResponseWriter, r *dns.Msg) {
if err != nil {
if errors.Is(err, context.DeadlineExceeded) || isTimeout(err) {
log.WithError(err).WithField("upstream", upstream).
Warn("got an error while connecting to upstream")
Debug("got an error while connecting to upstream")
continue
}
u.failsCount.Add(1)
@@ -137,7 +155,7 @@ func (u *upstreamResolverBase) ServeDNS(w dns.ResponseWriter, r *dns.Msg) {
return
}
u.failsCount.Add(1)
log.Error("all queries to the upstream nameservers failed with timeout")
log.Debug("all queries to the upstream nameservers failed with timeout")
}

// checkUpstreamFails counts fails and disables or enables upstream resolving
Expand All @@ -163,93 +181,95 @@ func (u *upstreamResolverBase) checkUpstreamFails(err error) {
}

// probeAvailability tests all upstream servers simultaneously and
// disables the resolver if none work
// disables/enables the resolver based on the probe's outcome
func (u *upstreamResolverBase) probeAvailability() {
u.mutex.Lock()
defer u.mutex.Unlock()

select {
case <-u.ctx.Done():
if u.statusRecorder.GetConnectedPeersCount() == 0 {
u.disable(fmt.Errorf("no peers connected"))
// cancel backoff operation
if u.cancelProbe == nil {
return
}
log.Warn("canceling DNS probing because 0 peers connected")
u.cancelProbe()
return
default:
}

// avoid probe if upstreams could resolve at least one query and fails count is less than failsTillDeact
if u.successCount.Load() > 0 && u.failsCount.Load() < u.failsTillDeact {
if u.probeRunning.Load() {
log.Info("DNS probe already running")
return
}
defer func() {
u.probeRunning.Store(false)
}()
u.probeRunning.Store(true)

var success bool
var mu sync.Mutex
var wg sync.WaitGroup

var errors *multierror.Error
for _, upstream := range u.upstreamServers {
upstream := upstream

wg.Add(1)
go func() {
defer wg.Done()
err := u.testNameserver(upstream, 500*time.Millisecond)
if err != nil {
errors = multierror.Append(errors, err)
log.Warnf("probing upstream nameserver %s: %s", upstream, err)
return
}

mu.Lock()
defer mu.Unlock()
success = true
}()
}

wg.Wait()

// didn't find a working upstream server, let's disable and try later
if !success {
u.disable(errors.ErrorOrNil())
}
}

// waitUntilResponse retries, in an exponential interval, querying the upstream servers until it gets a positive response
func (u *upstreamResolverBase) waitUntilResponse() {
exponentialBackOff := &backoff.ExponentialBackOff{
InitialInterval: 500 * time.Millisecond,
InitialInterval: 200 * time.Millisecond,
RandomizationFactor: 0.5,
Multiplier: 1.1,
MaxInterval: u.reactivatePeriod,
MaxElapsedTime: 0,
MaxInterval: 5 * time.Second,
MaxElapsedTime: 90 * time.Second,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}

operation := func() error {
select {
case <-u.ctx.Done():
return backoff.Permanent(fmt.Errorf("exiting upstream retry loop for upstreams %s: parent context has been canceled", u.upstreamServers))
return backoff.Permanent(fmt.Errorf("exiting upstream retry loop for upstreams %s: parent context: %s", u.upstreamServers, u.ctx.Err()))
default:
}

var success bool
var mu sync.Mutex
var wg sync.WaitGroup

var errors *multierror.Error
u.mutex.Lock()
defer u.mutex.Unlock()
for _, upstream := range u.upstreamServers {
if err := u.testNameserver(upstream, probeTimeout); err != nil {
log.Tracef("upstream check for %s: %s", upstream, err)
} else {
// at least one upstream server is available, stop probing
return nil
}
upstream := upstream

wg.Add(1)
go func() {
defer wg.Done()
err := u.testNameserver(upstream, 500*time.Millisecond)
if err != nil {
errors = multierror.Append(errors, err)
log.Debugf("probing upstream nameserver %s: %s", upstream, err)
return
}
mu.Lock()
defer mu.Unlock()
success = true
}()
}

log.Tracef("checking connectivity with upstreams %s failed. Retrying in %s", u.upstreamServers, exponentialBackOff.NextBackOff())
return fmt.Errorf("upstream check call error")
wg.Wait()

if !success {
return errors.ErrorOrNil()
}
return nil
}

err := backoff.Retry(operation, exponentialBackOff)
ctx, cancel := context.WithCancel(context.Background())
u.cancelProbe = cancel
err := backoff.Retry(func() error {
select {
case <-ctx.Done():
log.Warn("DNS probing cancelled")
return backoff.Permanent(ctx.Err())
default:
return operation()
}
}, backoff.WithContext(exponentialBackOff, ctx))
if err != nil {
log.Warn(err)
log.Warn("DNS probing failed")
u.disable(err)
return
}

log.Infof("upstreams %s are responsive again. Adding them back to system", u.upstreamServers)
u.failsCount.Store(0)
u.successCount.Add(1)
u.reactivate()
@@ -272,11 +292,10 @@ func (u *upstreamResolverBase) disable(err error) {
return
}

log.Warnf("Upstream resolving is Disabled for %v", reactivatePeriod)
log.Warn("Upstream resolving is disabled til a peer connects")
u.successCount.Store(0)
u.deactivate(err)
u.disabled = true
go u.waitUntilResponse()
}

func (u *upstreamResolverBase) testNameserver(server string, timeout time.Duration) error {
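Two concurrency patterns carry the upstream.go changes: probeRunning makes probeAvailability single-flight, and cancelProbe lets a later zero-peer event abort the backoff loop inside waitUntilResponse. A compact sketch of both ideas; it uses CompareAndSwap as a race-free variant of the Load-then-Store guard, and a timer stands in for the real backoff retries (illustrative names, not the NetBird API):

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type prober struct {
	running atomic.Bool
	cancel  context.CancelFunc
}

// probe is single-flight: concurrent callers turn into no-ops.
func (p *prober) probe() {
	if !p.running.CompareAndSwap(false, true) {
		fmt.Println("probe already running, skipping")
		return
	}
	defer p.running.Store(false)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	p.cancel = cancel // a later "0 peers connected" event could call this

	select {
	case <-ctx.Done():
		fmt.Println("probe canceled:", ctx.Err())
	case <-time.After(50 * time.Millisecond): // stands in for the backoff retry loop
		fmt.Println("probe finished")
	}
}

func main() {
	var p prober
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			p.probe()
		}()
	}
	wg.Wait()
}
```

In the diff itself the Load-then-Store pair is safe because probeAvailability holds u.mutex for its whole body; the CompareAndSwap form shown here is only needed when no outer lock serializes callers.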
client/internal/engine.go: 4 changes (0 additions & 4 deletions)
@@ -824,10 +824,6 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error {

e.networkSerial = serial

// Test received (upstream) servers for availability right away instead of upon usage.
// If no server of a server group responds this will disable the respective handler and retry later.
e.dnsServer.ProbeAvailability()

return nil
}

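The engine-side call is removed because probing is now event-driven: newUpstreamResolverBase spawns watchPeersConnStatusChanges, which probes whenever the peer connection status changes rather than on every network-map update. The shape of that watcher loop, with a plain channel standing in for the status recorder's change notifier:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// watchPeerStatus mirrors the watcher loop: block until either the
// resolver shuts down or a peer connects/disconnects, then probe.
func watchPeerStatus(ctx context.Context, changes <-chan struct{}) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-changes:
			fmt.Println("peer status changed: probing DNS availability")
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	changes := make(chan struct{}, 1)
	changes <- struct{}{} // simulate a peer coming online
	watchPeerStatus(ctx, changes)
}
```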