Skip to content

Commit c619741

Browse files
authored
Merge pull request kosmos-io#824 from duanmengkk/feature_etcd_watcher
add etcd watcher for blockaffinity
2 parents 0c599cc + e7068a9 commit c619741

File tree

8 files changed

+819
-103
lines changed

8 files changed

+819
-103
lines changed

go.sum

-88
Large diffs are not rendered by default.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package blockwatchsyncer
2+
3+
import (
4+
apiv3 "github.com/projectcalico/api/pkg/apis/projectcalico/v3"
5+
"github.com/projectcalico/calico/libcalico-go/lib/backend/api"
6+
"github.com/projectcalico/calico/libcalico-go/lib/backend/model"
7+
"github.com/projectcalico/calico/libcalico-go/lib/backend/watchersyncer"
8+
)
9+
10+
// NewBlockWatchSyncer creates a new BlockAffinity v1 Syncer.
11+
func NewBlockWatchSyncer(client api.Client, callbacks api.SyncerCallbacks) api.Syncer {
12+
resourceTypes := []watchersyncer.ResourceType{
13+
{
14+
ListInterface: model.ResourceListOptions{Kind: apiv3.KindBlockAffinity},
15+
},
16+
}
17+
18+
return watchersyncer.New(client, resourceTypes, callbacks)
19+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package blockwatchsyncer
2+
3+
import (
4+
"github.com/kosmos.io/kosmos/pkg/utils/lifted"
5+
"github.com/projectcalico/calico/libcalico-go/lib/backend/api"
6+
"k8s.io/apimachinery/pkg/util/wait"
7+
"k8s.io/klog/v2"
8+
"time"
9+
)
10+
11+
// syncedPollPeriod controls how often you look at the status of your sync funcs
12+
var syncedPollPeriod = 100 * time.Millisecond
13+
14+
type BlockEventHandler struct {
15+
// Channel for getting updates and status updates from syncer.
16+
syncerC chan interface{}
17+
18+
processor lifted.AsyncWorker
19+
// Channel to indicate node status reporter routine is not needed anymore.
20+
done chan struct{}
21+
22+
// Flag to show we are in-sync.
23+
inSync bool
24+
}
25+
26+
func NewBlockEventHandler(processor lifted.AsyncWorker) *BlockEventHandler {
27+
return &BlockEventHandler{
28+
processor: processor,
29+
}
30+
}
31+
32+
func (b *BlockEventHandler) Run(stopCh <-chan struct{}) {
33+
for {
34+
select {
35+
case <-stopCh:
36+
return
37+
case <-b.done:
38+
return
39+
case event := <-b.syncerC:
40+
switch event := event.(type) {
41+
case []api.Update:
42+
b.onupdate(event)
43+
case api.SyncStatus:
44+
b.inSync = true
45+
}
46+
}
47+
}
48+
}
49+
50+
func (b *BlockEventHandler) Stop() {
51+
b.done <- struct{}{}
52+
}
53+
54+
func (b *BlockEventHandler) Done() <-chan struct{} {
55+
return b.done
56+
}
57+
58+
func (b *BlockEventHandler) InSync() bool {
59+
return b.inSync
60+
}
61+
62+
func (b *BlockEventHandler) OnStatusUpdated(status api.SyncStatus) {
63+
if status == api.InSync {
64+
b.syncerC <- status
65+
}
66+
}
67+
68+
func (b *BlockEventHandler) OnUpdates(updates []api.Update) {
69+
b.syncerC <- updates
70+
}
71+
72+
// todo put etcd's event info AsyncWorker's queue
73+
func (b *BlockEventHandler) onupdate(event []api.Update) {
74+
75+
}
76+
77+
func (b *BlockEventHandler) WaitForCacheSync(stopCh <-chan struct{}) bool {
78+
err := wait.PollImmediateUntil(syncedPollPeriod, func() (done bool, err error) {
79+
if b.inSync {
80+
return true, nil
81+
}
82+
return false, nil
83+
}, stopCh)
84+
85+
if err != nil {
86+
klog.V(2).Infof("stop requested")
87+
return false
88+
}
89+
90+
klog.V(4).Infof("caches populated")
91+
return true
92+
}

pkg/clusterlink/controllers/nodecidr/adaper/calico_etcd.go

+11-15
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
package adaper
22

33
import (
4+
"github.com/kosmos.io/kosmos/pkg/clusterlink/controllers/nodecidr/adaper/blockwatchsyncer"
45
clusterlister "github.com/kosmos.io/kosmos/pkg/generated/listers/kosmos/v1alpha1"
56
"github.com/kosmos.io/kosmos/pkg/utils/lifted"
67
"github.com/projectcalico/calico/libcalico-go/lib/backend/api"
8+
"k8s.io/klog/v2"
79
)
810

911
type CalicoETCDAdapter struct {
1012
sync bool
13+
watchSyncer api.Syncer
1114
etcdClient api.Client
1215
clusterNodeLister clusterlister.ClusterNodeLister
1316
processor lifted.AsyncWorker
@@ -25,7 +28,14 @@ func NewCalicoETCDAdapter(etcdClient api.Client,
2528
}
2629

2730
func (c *CalicoETCDAdapter) Start(stopCh <-chan struct{}) error {
28-
// todo use c.etcdClient to list and watch blockaffinity in etcd
31+
blockEventHandler := blockwatchsyncer.NewBlockEventHandler(c.processor)
32+
c.watchSyncer = blockwatchsyncer.NewBlockWatchSyncer(c.etcdClient, blockEventHandler)
33+
c.watchSyncer.Start()
34+
blockEventHandler.Run(stopCh)
35+
36+
blockEventHandler.WaitForCacheSync(stopCh)
37+
c.sync = true
38+
klog.Info("calico blockaffinities etcd watchsyncer started!")
2939
return nil
3040
}
3141

@@ -39,17 +49,3 @@ func (c *CalicoETCDAdapter) GetCIDRByNodeName(nodeName string) ([]string, error)
3949
func (c *CalicoETCDAdapter) Synced() bool {
4050
return c.sync
4151
}
42-
43-
func (c *CalicoETCDAdapter) OnAdd(obj interface{}) {
44-
// todo add event info to c.processor
45-
}
46-
47-
// OnUpdate handles object update event and push the object to queue.
48-
func (c *CalicoETCDAdapter) OnUpdate(_, newObj interface{}) {
49-
// todo add event info to c.processor
50-
}
51-
52-
// OnDelete handles object delete event and push the object to queue.
53-
func (c *CalicoETCDAdapter) OnDelete(obj interface{}) {
54-
// todo add event info to c.processor
55-
}

vendor/github.com/projectcalico/calico/libcalico-go/lib/backend/watchersyncer/doc.go

+31
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)