Skip to content

Commit 3f8e03e

Browse files
committed
Add Workload DnsController
Signed-off-by: Tom <[email protected]>
1 parent d878050 commit 3f8e03e

File tree

4 files changed

+297
-21
lines changed

4 files changed

+297
-21
lines changed

pkg/controller/controller.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ func (c *Controller) Start(stopCh <-chan struct{}) error {
148148
c.client = NewXdsClient(c.mode, c.bpfAdsObj, c.bpfWorkloadObj, c.bpfConfig.EnableMonitoring, c.bpfConfig.EnableProfiling)
149149

150150
if c.client.WorkloadController != nil {
151+
c.client.WorkloadController.StartDnsController(stopCh)
151152
c.client.WorkloadController.Run(ctx)
152153
} else {
153154
c.client.AdsController.StartDnsController(stopCh)

pkg/controller/workload/dns.go

Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
package workload
2+
3+
import (
4+
"net"
5+
"net/netip"
6+
"sync"
7+
"time"
8+
9+
"google.golang.org/protobuf/proto"
10+
"kmesh.net/kmesh/api/v2/workloadapi"
11+
"kmesh.net/kmesh/pkg/controller/workload/cache"
12+
"kmesh.net/kmesh/pkg/dns"
13+
)
14+
15+
type dnsController struct {
16+
workloadsChan chan []*workloadapi.Workload
17+
cache cache.WorkloadCache
18+
dnsResolver *dns.DNSResolver
19+
// store the copy of pendingResolveWorkload.
20+
workloadCache map[string]*pendingResolveDomain
21+
// store all pending hostnames in the workloads
22+
pendingHostnames map[string][]string
23+
sync.RWMutex
24+
}
25+
26+
// pending resolve domain info of Dual-Engine Mode,
27+
// workload is used for create the apiworkload
28+
type pendingResolveDomain struct {
29+
Workloads []*workloadapi.Workload
30+
RefreshRate time.Duration
31+
}
32+
33+
func NewDnsController(cache cache.WorkloadCache) (*dnsController, error) {
34+
resolver, err := dns.NewDNSResolver()
35+
if err != nil {
36+
return nil, err
37+
}
38+
return &dnsController{
39+
workloadsChan: make(chan []*workloadapi.Workload),
40+
cache: cache,
41+
dnsResolver: resolver,
42+
workloadCache: make(map[string]*pendingResolveDomain),
43+
pendingHostnames: make(map[string][]string),
44+
}, nil
45+
}
46+
47+
func (r *dnsController) Run(stopCh <-chan struct{}) {
48+
go r.dnsResolver.StartDnsResolver(stopCh)
49+
go r.refreshWorker(stopCh)
50+
go r.processWorkloads()
51+
go func() {
52+
<-stopCh
53+
close(r.workloadsChan)
54+
}()
55+
}
56+
57+
func (r *dnsController) processWorkloads() {
58+
for workloads := range r.workloadsChan {
59+
r.processDomains(workloads)
60+
}
61+
}
62+
63+
func (r *dnsController) processDomains(workload []*workloadapi.Workload) {
64+
domains := getPendingResolveDomain(workload)
65+
66+
// store all pending hostnames of clusters in pendingHostnames
67+
for _, workload := range workload {
68+
workloadName := workload.GetName()
69+
hostname := workload.GetHostname()
70+
r.pendingHostnames[workloadName] = []string{hostname}
71+
r.workloadCache[hostname] = &pendingResolveDomain{
72+
Workloads: []*workloadapi.Workload{workload},
73+
RefreshRate: 15 * time.Second,
74+
}
75+
}
76+
77+
// delete any scheduled re-resolve for domains we no longer care about
78+
r.dnsResolver.RemoveUnwatchDomain(domains)
79+
80+
// update workloadCache with pendingResolveWorkload
81+
for k, v := range domains {
82+
addresses := r.dnsResolver.GetDNSAddresses(k)
83+
if addresses != nil {
84+
go r.updateWorkloads(v.(*pendingResolveDomain), k, addresses)
85+
} else {
86+
// Initialize the newly added hostname
87+
// and add it to the dns queue to be resolved.
88+
domainInfo := &dns.DomainInfo{
89+
Domain: k,
90+
RefreshRate: v.(*pendingResolveDomain).RefreshRate,
91+
}
92+
r.dnsResolver.AddDomainInQueue(domainInfo, 0)
93+
}
94+
}
95+
}
96+
97+
func (r *dnsController) refreshWorker(stop <-chan struct{}) {
98+
for {
99+
select {
100+
case <-stop:
101+
return
102+
case domain := <-r.dnsResolver.DnsChan:
103+
pendingDomain := r.getWorkloadsByDomain(domain)
104+
// log.Infof("dnsController refreshWorker: domain %s, pendingDomain %v", domain, pendingDomain)
105+
addrs := r.dnsResolver.GetDNSAddresses(domain)
106+
maxRetry := 3
107+
for range maxRetry {
108+
if len(addrs) > 0 {
109+
r.updateWorkloads(pendingDomain, domain, addrs)
110+
}
111+
time.Sleep(1 * time.Second)
112+
}
113+
}
114+
}
115+
}
116+
117+
func (r *dnsController) updateWorkloads(pendingDomain *pendingResolveDomain, domain string, addrs []string) {
118+
isWorkerUpdate := false
119+
if pendingDomain == nil || addrs == nil{
120+
return
121+
}
122+
log.Infof("dnsController updateWorkloads: pendingDomain %v, domain %s, addrs %v", pendingDomain, domain, addrs)
123+
124+
for _, workload := range pendingDomain.Workloads {
125+
if ready, newWorkload := r.overwriteDnsWorkload(workload, domain, addrs); ready {
126+
// log.Infof("dnsController update cache for workload %s with addresses %v", workload.ResourceName(), addrs)
127+
if r.cache.GetWorkloadByUid(workload.GetUid()) != nil {
128+
r.cache.AddOrUpdateWorkload(newWorkload)
129+
delete(r.workloadCache, domain)
130+
isWorkerUpdate = true
131+
}
132+
}
133+
}
134+
135+
if isWorkerUpdate {
136+
// w.cache.Flush()
137+
}
138+
}
139+
140+
func (r *dnsController) overwriteDnsWorkload(workload *workloadapi.Workload, domain string, addrs []string) (bool, *workloadapi.Workload) {
141+
ready := true
142+
hostNames := r.pendingHostnames[workload.GetName()]
143+
addressesOfHostname := make(map[string][]string)
144+
145+
for _, hostName := range hostNames {
146+
// log.Infof("overwriteDnsWorkload: checking hostname %s for workload %s with domain %s", hostName, workload.ResourceName(), domain)
147+
addresses := r.dnsResolver.GetDNSAddresses(hostName)
148+
// There are hostnames in this Cluster that are not resolved.
149+
if addresses != nil {
150+
addressesOfHostname[hostName] = addresses
151+
} else {
152+
ready = false
153+
}
154+
}
155+
156+
if ready {
157+
// log.Infof("overwriteDnsWorkload ready for workload %s with domain %s", workload.ResourceName(), domain)
158+
newWorkload := cloneWorkload(workload)
159+
for _, addr := range addrs {
160+
if ip := net.ParseIP(addr); ip != nil && ip.To4() != nil {
161+
newWorkload.Addresses = append(newWorkload.Addresses, netip.MustParseAddr(addr).AsSlice())
162+
}
163+
}
164+
165+
return ready, newWorkload
166+
}
167+
168+
return ready, nil
169+
}
170+
171+
func getPendingResolveDomain(workloads []*workloadapi.Workload) map[string]any {
172+
domains := make(map[string]any)
173+
174+
for _, workload := range workloads {
175+
hostname := workload.GetHostname()
176+
if hostname == "" {
177+
continue
178+
}
179+
180+
if _, err := netip.ParseAddr(hostname); err == nil {
181+
// This is an ip address
182+
continue
183+
}
184+
185+
// log.Infof("getPendingResolveDomain: processing workload %s with hostname %s", workload.ResourceName(), hostname)
186+
if v, ok := domains[hostname]; ok {
187+
v.(*pendingResolveDomain).Workloads = append(v.(*pendingResolveDomain).Workloads, workload)
188+
} else {
189+
190+
domainWithRefreshRate := &pendingResolveDomain{
191+
Workloads: []*workloadapi.Workload{workload},
192+
RefreshRate: 15 * time.Second,
193+
}
194+
domains[hostname] = domainWithRefreshRate
195+
}
196+
}
197+
198+
return domains
199+
}
200+
201+
func (r *dnsController) newWorkloadCache() {
202+
r.Lock()
203+
defer r.Unlock()
204+
205+
if r.workloadCache != nil {
206+
log.Debug("clean up dns workloads")
207+
r.workloadCache = map[string]*pendingResolveDomain{}
208+
return
209+
}
210+
}
211+
212+
func (r *dnsController) getWorkloadsByDomain(domain string) *pendingResolveDomain {
213+
r.RLock()
214+
defer r.RUnlock()
215+
216+
if r.workloadCache != nil {
217+
if v, ok := r.workloadCache[domain]; ok {
218+
return v
219+
}
220+
}
221+
return nil
222+
}
223+
224+
func cloneWorkload(workload *workloadapi.Workload) *workloadapi.Workload {
225+
if workload == nil {
226+
return nil
227+
}
228+
workloadCopy := proto.Clone(workload).(*workloadapi.Workload)
229+
return workloadCopy
230+
}

pkg/controller/workload/workload_controller.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ var log = logger.NewLoggerScope("workload_controller")
4040
type Controller struct {
4141
Stream discoveryv3.AggregatedDiscoveryService_DeltaAggregatedResourcesClient
4242
Processor *Processor
43+
dnsResolverController *dnsController
4344
Rbac *auth.Rbac
4445
MetricController *telemetry.MetricController
4546
MapMetricController *telemetry.MapMetricController
@@ -48,9 +49,17 @@ type Controller struct {
4849
}
4950

5051
func NewController(bpfWorkload *bpfwl.BpfWorkload, enableMonitoring, enablePerfMonitor bool) *Controller {
52+
processor := NewProcessor(bpfWorkload.SockConn.KmeshCgroupSockWorkloadObjects.KmeshCgroupSockWorkloadMaps)
53+
dnsResolverController, err := NewDnsController(processor.WorkloadCache)
54+
if err != nil {
55+
log.Errorf("dns resolver of Dual-Engine mode create failed: %v", err)
56+
return nil
57+
}
58+
processor.DnsResolverChan = dnsResolverController.workloadsChan
5159
c := &Controller{
52-
Processor: NewProcessor(bpfWorkload.SockConn.KmeshCgroupSockWorkloadObjects.KmeshCgroupSockWorkloadMaps),
53-
bpfWorkloadObj: bpfWorkload,
60+
dnsResolverController: dnsResolverController,
61+
Processor: processor,
62+
bpfWorkloadObj: bpfWorkload,
5463
}
5564
// do some initialization when restart
5665
// restore endpoint index, otherwise endpoint number can double
@@ -142,6 +151,7 @@ func (c *Controller) HandleWorkloadStream() error {
142151
return fmt.Errorf("stream recv failed, %s", err)
143152
}
144153

154+
// c.dnsResolverController.newWorkloadCache()
145155
c.Processor.processWorkloadResponse(rspDelta, c.Rbac)
146156

147157
if err = c.Stream.Send(c.Processor.ack); err != nil {
@@ -188,3 +198,9 @@ func (c *Controller) SetConnectionMetricTrigger(enable bool) {
188198
// GetConnectionMetricTrigger returns the current value of the metric
// controller's EnableConnectionMetric flag.
func (c *Controller) GetConnectionMetricTrigger() bool {
	enabled := c.MetricController.EnableConnectionMetric.Load()
	return enabled
}
201+
202+
func (c *Controller) StartDnsController(stopCh <-chan struct{}) {
203+
if c.dnsResolverController != nil {
204+
c.dnsResolverController.Run(stopCh)
205+
}
206+
}

0 commit comments

Comments
 (0)