diff --git a/client/internal/dns/host.go b/client/internal/dns/host.go
index e55a0705556..9ba517eadd9 100644
--- a/client/internal/dns/host.go
+++ b/client/internal/dns/host.go
@@ -78,7 +78,7 @@ func newNoopHostMocker() hostManager {
 	}
 }
 
-func dnsConfigToHostDNSConfig(dnsConfig nbdns.Config, ip string, port int) HostDNSConfig {
+func dnsConfigToHostDNSConfig(dnsConfig nbdns.Config, ip string, port int, connectedPeers int) HostDNSConfig {
 	config := HostDNSConfig{
 		RouteAll:   false,
 		ServerIP:   ip,
@@ -88,13 +88,14 @@ func dnsConfigToHostDNSConfig(dnsConfig nbdns.Config, ip string, port int) HostD
 		if len(nsConfig.NameServers) == 0 {
 			continue
 		}
-		if nsConfig.Primary {
+		if nsConfig.Primary && connectedPeers != 0 {
 			config.RouteAll = true
 		}
 
 		for _, domain := range nsConfig.Domains {
 			config.Domains = append(config.Domains, DomainConfig{
 				Domain:    strings.TrimSuffix(domain, "."),
+				Disabled:  connectedPeers == 0,
 				MatchOnly: !nsConfig.SearchDomainsEnabled,
 			})
 		}
diff --git a/client/internal/dns/server.go b/client/internal/dns/server.go
index a4651ebb5b0..7a08c8f5d40 100644
--- a/client/internal/dns/server.go
+++ b/client/internal/dns/server.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"net/netip"
 	"runtime"
+	"slices"
 	"strings"
 	"sync"
 
@@ -116,7 +117,7 @@ func NewDefaultServerPermanentUpstream(
 	ds.hostsDNSHolder.set(hostsDnsList)
 	ds.permanent = true
 	ds.addHostRootZone()
-	ds.currentConfig = dnsConfigToHostDNSConfig(config, ds.service.RuntimeIP(), ds.service.RuntimePort())
+	ds.currentConfig = dnsConfigToHostDNSConfig(config, ds.service.RuntimeIP(), ds.service.RuntimePort(), 1)
 	ds.searchDomainNotifier = newNotifier(ds.SearchDomains())
 	ds.searchDomainNotifier.setListener(listener)
 	setServerDns(ds)
@@ -305,11 +306,18 @@ func (s *DefaultServer) applyConfiguration(update nbdns.Config) error {
 	if err != nil {
 		return fmt.Errorf("not applying dns update, error: %v", err)
 	}
-	muxUpdates := append(localMuxUpdates, upstreamMuxUpdates...) //nolint:gocritic
+
+	var muxUpdates []muxUpdate
+	if s.statusRecorder.GetConnectedPeersCount() == 0 {
+		log.Infof("0 connected peers, not registering upstream handlers")
+		muxUpdates = localMuxUpdates
+	} else {
+		muxUpdates = append(localMuxUpdates, upstreamMuxUpdates...) //nolint:gocritic
+	}
 
 	s.updateMux(muxUpdates)
 	s.updateLocalResolver(localRecords)
-	s.currentConfig = dnsConfigToHostDNSConfig(update, s.service.RuntimeIP(), s.service.RuntimePort())
+	s.currentConfig = dnsConfigToHostDNSConfig(update, s.service.RuntimeIP(), s.service.RuntimePort(), s.statusRecorder.GetConnectedPeersCount())
 
 	hostUpdate := s.currentConfig
 	if s.service.RuntimePort() != defaultPort && !s.hostManager.supportCustomPort() {
@@ -359,8 +367,8 @@
 }
 
 func (s *DefaultServer) buildUpstreamHandlerUpdate(nameServerGroups []*nbdns.NameServerGroup) ([]muxUpdate, error) {
-
 	var muxUpdates []muxUpdate
+	log.Infof("length of nameServerGroups %d", len(nameServerGroups))
 	for _, nsGroup := range nameServerGroups {
 		if len(nsGroup.NameServers) == 0 {
 			log.Warn("received a nameserver group with empty nameserver list")
@@ -495,7 +503,6 @@ func (s *DefaultServer) upstreamCallbacks(
 	nsGroup *nbdns.NameServerGroup,
 	handler dns.Handler,
 ) (deactivate func(error), reactivate func()) {
-	var removeIndex map[string]int
 	deactivate = func(err error) {
 		s.mux.Lock()
 		defer s.mux.Unlock()
@@ -503,21 +510,15 @@ func (s *DefaultServer) upstreamCallbacks(
 		l := log.WithField("nameservers", nsGroup.NameServers)
 		l.Info("Temporarily deactivating nameservers group due to timeout")
 
-		removeIndex = make(map[string]int)
-		for _, domain := range nsGroup.Domains {
-			removeIndex[domain] = -1
-		}
 		if nsGroup.Primary {
-			removeIndex[nbdns.RootZone] = -1
 			s.currentConfig.RouteAll = false
 			s.service.DeregisterMux(nbdns.RootZone)
 		}
 
 		for i, item := range s.currentConfig.Domains {
-			if _, found := removeIndex[item.Domain]; found {
+			if slices.Contains(nsGroup.Domains, item.Domain) {
 				s.currentConfig.Domains[i].Disabled = true
 				s.service.DeregisterMux(item.Domain)
-				removeIndex[item.Domain] = i
 			}
 		}
 
@@ -530,18 +531,16 @@ func (s *DefaultServer) upstreamCallbacks(
 		}
 		s.updateNSState(nsGroup, err, false)
-
 	}
 
 	reactivate = func() {
 		s.mux.Lock()
 		defer s.mux.Unlock()
 
-		for domain, i := range removeIndex {
-			if i == -1 || i >= len(s.currentConfig.Domains) || s.currentConfig.Domains[i].Domain != domain {
-				continue
+		for i, item := range s.currentConfig.Domains {
+			if slices.Contains(nsGroup.Domains, item.Domain) {
+				s.currentConfig.Domains[i].Disabled = false
+				s.service.RegisterMux(item.Domain, handler)
 			}
-			s.currentConfig.Domains[i].Disabled = false
-			s.service.RegisterMux(domain, handler)
 		}
 
 		l := log.WithField("nameservers", nsGroup.NameServers)
diff --git a/client/internal/dns/upstream.go b/client/internal/dns/upstream.go
index b3baf2fa8fd..3ec21e22de1 100644
--- a/client/internal/dns/upstream.go
+++ b/client/internal/dns/upstream.go
@@ -47,6 +47,8 @@ type upstreamResolverBase struct {
 	mutex            sync.Mutex
 	reactivatePeriod time.Duration
 	upstreamTimeout  time.Duration
+	probeRunning     atomic.Bool
+	cancelProbe      context.CancelFunc
 
 	deactivate     func(error)
 	reactivate     func()
@@ -56,7 +58,7 @@ type upstreamResolverBase struct {
 func newUpstreamResolverBase(ctx context.Context, statusRecorder *peer.Status) *upstreamResolverBase {
 	ctx, cancel := context.WithCancel(ctx)
 
-	return &upstreamResolverBase{
+	resolverBase := &upstreamResolverBase{
 		ctx:             ctx,
 		cancel:          cancel,
 		upstreamTimeout: upstreamTimeout,
@@ -64,10 +66,26 @@ func newUpstreamResolverBase(ctx context.Context, statusRecorder *peer.Status) *
 		failsTillDeact:  failsTillDeact,
 		statusRecorder:  statusRecorder,
 	}
+
+	go resolverBase.watchPeersConnStatusChanges()
+
+	return resolverBase
+}
+
+func (u *upstreamResolverBase) watchPeersConnStatusChanges() {
+	for {
+		select {
+		case <-u.ctx.Done():
+			return
+		case <-u.statusRecorder.GetPeersConnStatusChangeNotifier():
+			log.Debugf("probing DNS availability")
+			go u.probeAvailability()
+		}
+	}
 }
 
 func (u *upstreamResolverBase) stop() {
-	log.Debugf("stopping serving DNS for upstreams %s", u.upstreamServers)
+	log.Warnf("stopping serving DNS for upstreams %s", u.upstreamServers)
 	u.cancel()
 }
 
@@ -104,7 +122,7 @@ func (u *upstreamResolverBase) ServeDNS(w dns.ResponseWriter, r *dns.Msg) {
 		if err != nil {
 			if errors.Is(err, context.DeadlineExceeded) || isTimeout(err) {
 				log.WithError(err).WithField("upstream", upstream).
-					Warn("got an error while connecting to upstream")
+					Debug("got an error while connecting to upstream")
 				continue
 			}
 			u.failsCount.Add(1)
@@ -137,7 +155,7 @@ func (u *upstreamResolverBase) ServeDNS(w dns.ResponseWriter, r *dns.Msg) {
 		return
 	}
 	u.failsCount.Add(1)
-	log.Error("all queries to the upstream nameservers failed with timeout")
+	log.Debug("all queries to the upstream nameservers failed with timeout")
 }
 
 // checkUpstreamFails counts fails and disables or enables upstream resolving
@@ -163,62 +181,34 @@ func (u *upstreamResolverBase) checkUpstreamFails(err error) {
 }
 
 // probeAvailability tests all upstream servers simultaneously and
-// disables the resolver if none work
+// disables/enables the resolver based on the probe's outcome
 func (u *upstreamResolverBase) probeAvailability() {
-	u.mutex.Lock()
-	defer u.mutex.Unlock()
-
-	select {
-	case <-u.ctx.Done():
+	if u.statusRecorder.GetConnectedPeersCount() == 0 {
+		u.disable(fmt.Errorf("no peers connected"))
+		// cancel backoff operation
+		if u.cancelProbe == nil {
+			return
+		}
+		log.Warn("canceling DNS probing because 0 peers connected")
+		u.cancelProbe()
 		return
-	default:
 	}
 
-	// avoid probe if upstreams could resolve at least one query and fails count is less than failsTillDeact
-	if u.successCount.Load() > 0 && u.failsCount.Load() < u.failsTillDeact {
+	if u.probeRunning.Load() {
+		log.Info("DNS probe already running")
 		return
 	}
+	defer func() {
+		u.probeRunning.Store(false)
+	}()
+	u.probeRunning.Store(true)
 
-	var success bool
-	var mu sync.Mutex
-	var wg sync.WaitGroup
-
-	var errors *multierror.Error
-	for _, upstream := range u.upstreamServers {
-		upstream := upstream
-
-		wg.Add(1)
-		go func() {
-			defer wg.Done()
-			err := u.testNameserver(upstream, 500*time.Millisecond)
-			if err != nil {
-				errors = multierror.Append(errors, err)
-				log.Warnf("probing upstream nameserver %s: %s", upstream, err)
-				return
-			}
-
-			mu.Lock()
-			defer mu.Unlock()
-			success = true
-		}()
-	}
-
-	wg.Wait()
-
-	// didn't find a working upstream server, let's disable and try later
-	if !success {
-		u.disable(errors.ErrorOrNil())
-	}
-}
-
-// waitUntilResponse retries, in an exponential interval, querying the upstream servers until it gets a positive response
-func (u *upstreamResolverBase) waitUntilResponse() {
 	exponentialBackOff := &backoff.ExponentialBackOff{
-		InitialInterval:     500 * time.Millisecond,
+		InitialInterval:     200 * time.Millisecond,
 		RandomizationFactor: 0.5,
 		Multiplier:          1.1,
-		MaxInterval:         u.reactivatePeriod,
-		MaxElapsedTime:      0,
+		MaxInterval:         5 * time.Second,
+		MaxElapsedTime:      90 * time.Second,
 		Stop:                backoff.Stop,
 		Clock:               backoff.SystemClock,
 	}
@@ -226,30 +216,60 @@ func (u *upstreamResolverBase) waitUntilResponse() {
 	operation := func() error {
 		select {
 		case <-u.ctx.Done():
-			return backoff.Permanent(fmt.Errorf("exiting upstream retry loop for upstreams %s: parent context has been canceled", u.upstreamServers))
+			return backoff.Permanent(fmt.Errorf("exiting upstream retry loop for upstreams %s: parent context has been canceled: %s", u.upstreamServers, u.ctx.Err()))
 		default:
 		}
 
+		var success bool
+		var mu sync.Mutex
+		var wg sync.WaitGroup
+
+		var errors *multierror.Error
+		u.mutex.Lock()
+		defer u.mutex.Unlock()
 		for _, upstream := range u.upstreamServers {
-			if err := u.testNameserver(upstream, probeTimeout); err != nil {
-				log.Tracef("upstream check for %s: %s", upstream, err)
-			} else {
-				// at least one upstream server is available, stop probing
-				return nil
-			}
+			upstream := upstream
+
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				err := u.testNameserver(upstream, 500*time.Millisecond)
+				if err != nil {
+					errors = multierror.Append(errors, err)
+					log.Debugf("probing upstream nameserver %s: %s", upstream, err)
+					return
+				}
+				mu.Lock()
+				defer mu.Unlock()
+				success = true
+			}()
 		}
 
-		log.Tracef("checking connectivity with upstreams %s failed. Retrying in %s", u.upstreamServers, exponentialBackOff.NextBackOff())
-		return fmt.Errorf("upstream check call error")
+		wg.Wait()
+
+		if !success {
+			return errors.ErrorOrNil()
+		}
+		return nil
 	}
 
-	err := backoff.Retry(operation, exponentialBackOff)
+	ctx, cancel := context.WithCancel(context.Background())
+	u.cancelProbe = cancel
+	err := backoff.Retry(func() error {
+		select {
+		case <-ctx.Done():
+			log.Warn("DNS probing cancelled")
+			return backoff.Permanent(ctx.Err())
+		default:
+			return operation()
+		}
+	}, backoff.WithContext(exponentialBackOff, ctx))
 	if err != nil {
-		log.Warn(err)
+		log.Warn("DNS probing failed")
+		u.disable(err)
 		return
 	}
 
-	log.Infof("upstreams %s are responsive again. Adding them back to system", u.upstreamServers)
 	u.failsCount.Store(0)
 	u.successCount.Add(1)
 	u.reactivate()
@@ -272,11 +292,10 @@ func (u *upstreamResolverBase) disable(err error) {
 		return
 	}
 
-	log.Warnf("Upstream resolving is Disabled for %v", reactivatePeriod)
+	log.Warn("Upstream resolving is disabled until a peer connects")
 	u.successCount.Store(0)
 	u.deactivate(err)
 	u.disabled = true
-	go u.waitUntilResponse()
 }
 
 func (u *upstreamResolverBase) testNameserver(server string, timeout time.Duration) error {
diff --git a/client/internal/engine.go b/client/internal/engine.go
index d65322d6a53..a917b3bd246 100644
--- a/client/internal/engine.go
+++ b/client/internal/engine.go
@@ -824,10 +824,6 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error {
 
 	e.networkSerial = serial
 
-	// Test received (upstream) servers for availability right away instead of upon usage.
-	// If no server of a server group responds this will disable the respective handler and retry later.
-	e.dnsServer.ProbeAvailability()
-
 	return nil
 }
 
diff --git a/client/internal/peer/status.go b/client/internal/peer/status.go
index a7cfb95c4c7..56848d5f1c4 100644
--- a/client/internal/peer/status.go
+++ b/client/internal/peer/status.go
@@ -120,23 +120,24 @@ type FullStatus struct {
 
 // Status holds a state of peers, signal, management connections and relays
 type Status struct {
-	mux                   sync.Mutex
-	peers                 map[string]State
-	changeNotify          map[string]chan struct{}
-	signalState           bool
-	signalError           error
-	managementState       bool
-	managementError       error
-	relayStates           []relay.ProbeResult
-	localPeer             LocalPeerState
-	offlinePeers          []State
-	mgmAddress            string
-	signalAddress         string
-	notifier              *notifier
-	rosenpassEnabled      bool
-	rosenpassPermissive   bool
-	nsGroupStates         []NSGroupState
-	resolvedDomainsStates map[domain.Domain][]netip.Prefix
+	mux                    sync.Mutex
+	peers                  map[string]State
+	changeNotify           map[string]chan struct{}
+	signalState            bool
+	signalError            error
+	managementState        bool
+	managementError        error
+	relayStates            []relay.ProbeResult
+	localPeer              LocalPeerState
+	offlinePeers           []State
+	mgmAddress             string
+	signalAddress          string
+	notifier               *notifier
+	rosenpassEnabled       bool
+	rosenpassPermissive    bool
+	nsGroupStates          []NSGroupState
+	resolvedDomainsStates  map[domain.Domain][]netip.Prefix
+	aPeerConnStatusChanged chan struct{}
 
 	// To reduce the number of notification invocation this bool will be true when need to call the notification
 	// Some Peer actions mostly used by in a batch when the network map has been synchronized. In these type of events
@@ -215,6 +216,7 @@ func (d *Status) RemovePeer(peerPubKey string) error {
 
 // UpdatePeerState updates peer status
 func (d *Status) UpdatePeerState(receivedState State) error {
+	var connStatusChanged bool
 	d.mux.Lock()
 	defer d.mux.Unlock()
 
@@ -227,8 +229,9 @@ func (d *Status) UpdatePeerState(receivedState State) error {
 		peerState.IP = receivedState.IP
 	}
 
-	if receivedState.GetRoutes() != nil {
-		peerState.SetRoutes(receivedState.GetRoutes())
+	routes := receivedState.GetRoutes()
+	if routes != nil {
+		peerState.SetRoutes(routes)
 	}
 
 	skipNotification := shouldSkipNotify(receivedState, peerState)
@@ -243,10 +246,16 @@ func (d *Status) UpdatePeerState(receivedState State) error {
 		peerState.LocalIceCandidateEndpoint = receivedState.LocalIceCandidateEndpoint
 		peerState.RemoteIceCandidateEndpoint = receivedState.RemoteIceCandidateEndpoint
 		peerState.RosenpassEnabled = receivedState.RosenpassEnabled
+		connStatusChanged = true
 	}
 
 	d.peers[receivedState.PubKey] = peerState
 
+	if connStatusChanged && d.aPeerConnStatusChanged != nil && (peerState.ConnStatus == StatusConnected || peerState.ConnStatus == StatusDisconnected) {
+		close(d.aPeerConnStatusChanged)
+		d.aPeerConnStatusChanged = nil
+	}
+
 	if skipNotification {
 		return nil
 	}
@@ -323,6 +332,18 @@ func (d *Status) FinishPeerListModifications() {
 	d.notifyPeerListChanged()
 }
 
+// GetPeersConnStatusChangeNotifier returns a channel that is closed when a peer's connection status changes
+func (d *Status) GetPeersConnStatusChangeNotifier() <-chan struct{} {
+	d.mux.Lock()
+	defer d.mux.Unlock()
+	if d.aPeerConnStatusChanged == nil {
+		ch := make(chan struct{})
+		d.aPeerConnStatusChanged = ch
+		return ch
+	}
+	return d.aPeerConnStatusChanged
+}
+
 // GetPeerStateChangeNotifier returns a change notifier channel for a peer
 func (d *Status) GetPeerStateChangeNotifier(peer string) <-chan struct{} {
 	d.mux.Lock()
@@ -342,6 +363,19 @@ func (d *Status) GetLocalPeerState() LocalPeerState {
 	return d.localPeer
 }
 
+// GetConnectedPeersCount returns the number of connected peers
+func (d *Status) GetConnectedPeersCount() int {
+	d.mux.Lock()
+	defer d.mux.Unlock()
+	var connectedCount int
+	for _, peer := range d.peers {
+		if peer.ConnStatus == StatusConnected {
+			connectedCount++
+		}
+	}
+	return connectedCount
+}
+
 // UpdateLocalPeerState updates local peer status
 func (d *Status) UpdateLocalPeerState(localPeerState LocalPeerState) {
 	d.mux.Lock()