Skip to content

Commit 2980cf3

Browse files
authored
feat(poller): add more logs (#6559)
1 parent ff662ef commit 2980cf3

File tree

10 files changed

+95
-37
lines changed

10 files changed

+95
-37
lines changed

pkg/controllers/pd/handler.go

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"context"
1919
"reflect"
2020

21+
"github.com/google/go-cmp/cmp"
2122
"k8s.io/apimachinery/pkg/types"
2223
"k8s.io/client-go/util/workqueue"
2324
"sigs.k8s.io/controller-runtime/pkg/event"
@@ -97,10 +98,10 @@ func (r *Reconciler) MemberEventHandler() handler.TypedEventHandler[client.Objec
9798
CreateFunc: func(_ context.Context, event event.TypedCreateEvent[client.Object],
9899
queue workqueue.TypedRateLimitingInterface[reconcile.Request],
99100
) {
100-
m := event.Object.(*pdv1.Member)
101-
ns, cluster := timanager.SplitPrimaryKey(m.Namespace)
101+
r.Logger.Info("add pd member", "ns", event.Object.GetNamespace(), "name", event.Object.GetName())
102102

103-
r.Logger.Info("add member", "namespace", ns, "cluster", cluster, "name", m.Name, "health", m.Health, "invalid", m.Invalid)
103+
m := event.Object.(*pdv1.Member)
104+
ns, _ := timanager.SplitPrimaryKey(m.Namespace)
104105

105106
queue.Add(reconcile.Request{
106107
NamespacedName: types.NamespacedName{
@@ -113,10 +114,14 @@ func (r *Reconciler) MemberEventHandler() handler.TypedEventHandler[client.Objec
113114
UpdateFunc: func(_ context.Context, event event.TypedUpdateEvent[client.Object],
114115
queue workqueue.TypedRateLimitingInterface[reconcile.Request],
115116
) {
116-
m := event.ObjectNew.(*pdv1.Member)
117-
ns, cluster := timanager.SplitPrimaryKey(m.Namespace)
117+
r.Logger.Info("update pd member",
118+
"ns", event.ObjectNew.GetNamespace(),
119+
"name", event.ObjectNew.GetName(),
120+
"diff", cmp.Diff(event.ObjectOld, event.ObjectNew),
121+
)
118122

119-
r.Logger.Info("update member", "namespace", ns, "cluster", cluster, "name", m.Name, "health", m.Health, "invalid", m.Invalid)
123+
m := event.ObjectNew.(*pdv1.Member)
124+
ns, _ := timanager.SplitPrimaryKey(m.Namespace)
120125

121126
queue.Add(reconcile.Request{
122127
NamespacedName: types.NamespacedName{
@@ -128,10 +133,10 @@ func (r *Reconciler) MemberEventHandler() handler.TypedEventHandler[client.Objec
128133
DeleteFunc: func(_ context.Context, event event.TypedDeleteEvent[client.Object],
129134
queue workqueue.TypedRateLimitingInterface[reconcile.Request],
130135
) {
131-
m := event.Object.(*pdv1.Member)
132-
ns, cluster := timanager.SplitPrimaryKey(m.Namespace)
136+
r.Logger.Info("delete pd member", "ns", event.Object.GetNamespace(), "name", event.Object.GetName())
133137

134-
r.Logger.Info("delete member", "namespace", ns, "cluster", cluster, "name", m.Name)
138+
m := event.Object.(*pdv1.Member)
139+
ns, _ := timanager.SplitPrimaryKey(m.Namespace)
135140

136141
queue.Add(reconcile.Request{
137142
NamespacedName: types.NamespacedName{

pkg/controllers/tiflash/handler.go

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"fmt"
2020
"reflect"
2121

22+
"github.com/google/go-cmp/cmp"
2223
"k8s.io/apimachinery/pkg/types"
2324
"k8s.io/client-go/util/workqueue"
2425
"sigs.k8s.io/controller-runtime/pkg/event"
@@ -108,6 +109,11 @@ func (r *Reconciler) StoreEventHandler() handler.TypedEventHandler[client.Object
108109
queue workqueue.TypedRateLimitingInterface[reconcile.Request],
109110
) {
110111
s := event.Object.(*pdv1.Store)
112+
if s.Engine() != pdv1.StoreEngineTiFlash && s.Engine() != pdv1.StoreEngineTiFlashCompute {
113+
return
114+
}
115+
r.Logger.Info("add tiflash store", "ns", event.Object.GetNamespace(), "name", event.Object.GetName())
116+
111117
req, err := r.getRequestOfTiFlashStore(ctx, s)
112118
if err != nil {
113119
return
@@ -119,6 +125,15 @@ func (r *Reconciler) StoreEventHandler() handler.TypedEventHandler[client.Object
119125
queue workqueue.TypedRateLimitingInterface[reconcile.Request],
120126
) {
121127
s := event.ObjectNew.(*pdv1.Store)
128+
if s.Engine() != pdv1.StoreEngineTiFlash && s.Engine() != pdv1.StoreEngineTiFlashCompute {
129+
return
130+
}
131+
r.Logger.Info("update tiflash store",
132+
"ns", event.ObjectNew.GetNamespace(),
133+
"name", event.ObjectNew.GetName(),
134+
"diff", cmp.Diff(event.ObjectOld, event.ObjectNew),
135+
)
136+
122137
req, err := r.getRequestOfTiFlashStore(ctx, s)
123138
if err != nil {
124139
return
@@ -130,6 +145,11 @@ func (r *Reconciler) StoreEventHandler() handler.TypedEventHandler[client.Object
130145
queue workqueue.TypedRateLimitingInterface[reconcile.Request],
131146
) {
132147
s := event.Object.(*pdv1.Store)
148+
if s.Engine() != pdv1.StoreEngineTiFlash && s.Engine() != pdv1.StoreEngineTiFlashCompute {
149+
return
150+
}
151+
r.Logger.Info("delete tiflash store", "ns", event.Object.GetNamespace(), "name", event.Object.GetName())
152+
133153
req, err := r.getRequestOfTiFlashStore(ctx, s)
134154
if err != nil {
135155
return
@@ -140,10 +160,6 @@ func (r *Reconciler) StoreEventHandler() handler.TypedEventHandler[client.Object
140160
}
141161

142162
func (r *Reconciler) getRequestOfTiFlashStore(ctx context.Context, s *pdv1.Store) (reconcile.Request, error) {
143-
if s.Engine() != pdv1.StoreEngineTiFlash && s.Engine() != pdv1.StoreEngineTiFlashCompute {
144-
return reconcile.Request{}, fmt.Errorf("store is not tiflash, engine: %s", s.Engine())
145-
}
146-
147163
ns, cluster := timanager.SplitPrimaryKey(s.Namespace)
148164
var tiflashList v1alpha1.TiFlashList
149165
if err := r.Client.List(ctx, &tiflashList, client.MatchingLabels{

pkg/controllers/tikv/handler.go

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"fmt"
2020
"reflect"
2121

22+
"github.com/google/go-cmp/cmp"
2223
"k8s.io/apimachinery/pkg/types"
2324
"k8s.io/client-go/util/workqueue"
2425
"sigs.k8s.io/controller-runtime/pkg/event"
@@ -108,6 +109,10 @@ func (r *Reconciler) StoreEventHandler() handler.TypedEventHandler[client.Object
108109
queue workqueue.TypedRateLimitingInterface[reconcile.Request],
109110
) {
110111
s := event.Object.(*pdv1.Store)
112+
if s.Engine() != pdv1.StoreEngineTiKV {
113+
return
114+
}
115+
r.Logger.Info("add tikv store", "ns", event.Object.GetNamespace(), "name", event.Object.GetName())
111116
req, err := r.getRequestOfTiKVStore(ctx, s)
112117
if err != nil {
113118
return
@@ -119,6 +124,14 @@ func (r *Reconciler) StoreEventHandler() handler.TypedEventHandler[client.Object
119124
queue workqueue.TypedRateLimitingInterface[reconcile.Request],
120125
) {
121126
s := event.ObjectNew.(*pdv1.Store)
127+
if s.Engine() != pdv1.StoreEngineTiKV {
128+
return
129+
}
130+
r.Logger.Info("update tikv store",
131+
"ns", event.ObjectNew.GetNamespace(),
132+
"name", event.ObjectNew.GetName(),
133+
"diff", cmp.Diff(event.ObjectOld, event.ObjectNew),
134+
)
122135
req, err := r.getRequestOfTiKVStore(ctx, s)
123136
if err != nil {
124137
return
@@ -130,6 +143,10 @@ func (r *Reconciler) StoreEventHandler() handler.TypedEventHandler[client.Object
130143
queue workqueue.TypedRateLimitingInterface[reconcile.Request],
131144
) {
132145
s := event.Object.(*pdv1.Store)
146+
if s.Engine() != pdv1.StoreEngineTiKV {
147+
return
148+
}
149+
r.Logger.Info("delete tikv store", "ns", event.Object.GetNamespace(), "name", event.Object.GetName())
133150
req, err := r.getRequestOfTiKVStore(ctx, s)
134151
if err != nil {
135152
return
@@ -140,10 +157,6 @@ func (r *Reconciler) StoreEventHandler() handler.TypedEventHandler[client.Object
140157
}
141158

142159
func (r *Reconciler) getRequestOfTiKVStore(ctx context.Context, s *pdv1.Store) (reconcile.Request, error) {
143-
if s.Engine() != pdv1.StoreEngineTiKV {
144-
return reconcile.Request{}, fmt.Errorf("store is not tikv")
145-
}
146-
147160
ns, cluster := timanager.SplitPrimaryKey(s.Namespace)
148161
var kvl v1alpha1.TiKVList
149162
if err := r.Client.List(ctx, &kvl, client.MatchingLabels{

pkg/controllers/tso/handler.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"context"
1919
"reflect"
2020

21+
"github.com/google/go-cmp/cmp"
2122
"k8s.io/apimachinery/pkg/types"
2223
"k8s.io/client-go/util/workqueue"
2324
"sigs.k8s.io/controller-runtime/pkg/event"
@@ -104,9 +105,9 @@ func (r *Reconciler) TSOMemberEventHandler() handler.TypedEventHandler[client.Ob
104105
queue workqueue.TypedRateLimitingInterface[reconcile.Request],
105106
) {
106107
m := event.Object.(*pdv1.TSOMember)
107-
ns, cluster := timanager.SplitPrimaryKey(m.Namespace)
108+
r.Logger.Info("add tso member", "ns", event.Object.GetNamespace(), "name", event.Object.GetName())
108109

109-
r.Logger.Info("add tso member", "namespace", ns, "cluster", cluster, "name", m.Name, "invalid", m.Invalid)
110+
ns, _ := timanager.SplitPrimaryKey(m.Namespace)
110111

111112
queue.Add(reconcile.Request{
112113
NamespacedName: types.NamespacedName{
@@ -119,10 +120,13 @@ func (r *Reconciler) TSOMemberEventHandler() handler.TypedEventHandler[client.Ob
119120
UpdateFunc: func(ctx context.Context, event event.TypedUpdateEvent[client.Object],
120121
queue workqueue.TypedRateLimitingInterface[reconcile.Request],
121122
) {
123+
r.Logger.Info("update tso member",
124+
"ns", event.ObjectNew.GetNamespace(),
125+
"name", event.ObjectNew.GetName(),
126+
"diff", cmp.Diff(event.ObjectOld, event.ObjectNew),
127+
)
122128
m := event.ObjectNew.(*pdv1.TSOMember)
123-
ns, cluster := timanager.SplitPrimaryKey(m.Namespace)
124-
125-
r.Logger.Info("update tso member", "namespace", ns, "cluster", cluster, "name", m.Name, "invalid", m.Invalid)
129+
ns, _ := timanager.SplitPrimaryKey(m.Namespace)
126130

127131
queue.Add(reconcile.Request{
128132
NamespacedName: types.NamespacedName{
@@ -134,10 +138,9 @@ func (r *Reconciler) TSOMemberEventHandler() handler.TypedEventHandler[client.Ob
134138
DeleteFunc: func(ctx context.Context, event event.TypedDeleteEvent[client.Object],
135139
queue workqueue.TypedRateLimitingInterface[reconcile.Request],
136140
) {
141+
r.Logger.Info("delete tso member", "ns", event.Object.GetNamespace(), "name", event.Object.GetName())
137142
m := event.Object.(*pdv1.TSOMember)
138-
ns, cluster := timanager.SplitPrimaryKey(m.Namespace)
139-
140-
r.Logger.Info("delete tso member", "namespace", ns, "cluster", cluster, "name", m.Name)
143+
ns, _ := timanager.SplitPrimaryKey(m.Namespace)
141144

142145
queue.Add(reconcile.Request{
143146
NamespacedName: types.NamespacedName{

pkg/timanager/manager_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ func TestClientManagerSource(t *testing.T) {
260260
return 0
261261
}).
262262
WithNewPollerFunc(&pdv1.Store{}, func(name string, logger logr.Logger, _ int) Poller {
263-
return NewPoller(name, logger, &lister, NewDeepEquality[pdv1.Store](), time.Millisecond*200)
263+
return NewPoller(name, logger, &lister, NewDeepEquality[pdv1.Store](logger), time.Millisecond*200)
264264
}).
265265
Build()
266266

pkg/timanager/pd/member.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ func NewMemberPoller(name string, logger logr.Logger, c pdapi.PDClient) timanage
5757
lister := NewMemberLister(name, c)
5858

5959
// TODO: change interval
60-
return timanager.NewPoller(name, logger, lister, timanager.NewDeepEquality[pdv1.Member](), defaultPollInterval)
60+
return timanager.NewPoller(name, logger, lister, timanager.NewDeepEquality[pdv1.Member](logger), defaultPollInterval)
6161
}
6262

6363
type memberLister struct {

pkg/timanager/pd/store.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func NewStorePoller(name string, logger logr.Logger, c pdapi.PDClient) timanager
4949
lister := NewStoreLister(name, c)
5050

5151
// TODO: change interval
52-
return timanager.NewPoller(name, logger, lister, timanager.NewDeepEquality[pdv1.Store](), defaultPollInterval)
52+
return timanager.NewPoller(name, logger, lister, timanager.NewDeepEquality[pdv1.Store](logger), defaultPollInterval)
5353
}
5454

5555
type storeLister struct {

pkg/timanager/pd/tso.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func NewTSOMemberPoller(name string, logger logr.Logger, c pdapi.PDClient) timan
5050
lister := NewTSOMemberLister(name, c)
5151

5252
// TODO: change interval
53-
return timanager.NewPoller(name, logger, lister, timanager.NewDeepEquality[pdv1.TSOMember](), defaultPollInterval)
53+
return timanager.NewPoller(name, logger, lister, timanager.NewDeepEquality[pdv1.TSOMember](logger), defaultPollInterval)
5454
}
5555

5656
type tsoMemberLister struct {

pkg/timanager/poller.go

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"time"
2323

2424
"github.com/go-logr/logr"
25+
"github.com/google/go-cmp/cmp"
2526
"k8s.io/apimachinery/pkg/runtime"
2627
"k8s.io/apimachinery/pkg/watch"
2728

@@ -65,6 +66,7 @@ func NewPoller[T any, PT Object[T], L client.ObjectList](
6566
equality: eq,
6667
logger: logger,
6768
refreshCh: make(chan struct{}, bufSize),
69+
typeName: fmt.Sprintf("%T", new(T)),
6870
}
6971
}
7072

@@ -86,6 +88,8 @@ type poller[T any, PT Object[T], L client.ObjectList] struct {
8688

8789
lister Lister[T, PT, L]
8890
equality Equality[T, PT]
91+
92+
typeName string
8993
}
9094

9195
func (p *poller[T, PT, L]) renew(ctx context.Context) context.Context {
@@ -99,6 +103,7 @@ func (p *poller[T, PT, L]) renew(ctx context.Context) context.Context {
99103
}
100104

101105
func (p *poller[T, PT, L]) Sync(ctx context.Context) (runtime.Object, error) {
106+
p.logger.Info("poller sync", "cluster", p.name, "kind", p.typeName)
102107
p.lock.Lock()
103108
defer p.lock.Unlock()
104109

@@ -154,11 +159,13 @@ func (p *poller[T, PT, L]) Run(ctx context.Context, ch chan<- watch.Event) {
154159
for {
155160
select {
156161
case <-nctx.Done():
157-
p.logger.Info("poller is stopped", "cluster", p.name, "type", new(T))
162+
p.logger.Info("poller is stopped", "cluster", p.name, "kind", p.typeName)
158163
return
159164
case <-p.refreshCh:
165+
p.logger.Info("poller refresh", "cluster", p.name, "kind", p.typeName)
160166
p.poll(ctx)
161167
case <-timer.C:
168+
p.logger.Info("poller poll", "cluster", p.name, "kind", p.typeName)
162169
p.poll(ctx)
163170
}
164171
timer.Reset(p.interval)
@@ -241,17 +248,31 @@ func (p *poller[T, PT, L]) generateEvents(ctx context.Context, prevState, curSta
241248
func (p *poller[T, PT, L]) sendEvent(ctx context.Context, e *watch.Event) {
242249
select {
243250
case p.resultCh <- *e:
244-
p.logger.Info("poller send event", "type", e.Type, "object", e.Object)
251+
p.logger.Info("poller sent event", "type", e.Type, "object", e.Object, "kind", p.typeName)
245252
case <-ctx.Done():
246253
}
247254
}
248255

249-
type deepEquality[T any, PT Object[T]] struct{}
256+
type deepEquality[T any, PT Object[T]] struct {
257+
logger logr.Logger
258+
}
250259

251-
func (*deepEquality[T, PT]) Equal(preObj, curObj PT) bool {
252-
return reflect.DeepEqual(preObj, curObj)
260+
func (e *deepEquality[T, PT]) Equal(preObj, curObj PT) bool {
261+
if reflect.DeepEqual(preObj, curObj) {
262+
return true
263+
}
264+
265+
e.logger.Info("poll obj is changed",
266+
"namespace", curObj.GetNamespace(),
267+
"name", curObj.GetName(),
268+
"diff", cmp.Diff(preObj, curObj),
269+
)
270+
271+
return false
253272
}
254273

255-
func NewDeepEquality[T any, PT Object[T]]() Equality[T, PT] {
256-
return &deepEquality[T, PT]{}
274+
func NewDeepEquality[T any, PT Object[T]](logger logr.Logger) Equality[T, PT] {
275+
return &deepEquality[T, PT]{
276+
logger: logger,
277+
}
257278
}

pkg/timanager/poller_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ func TestPoller(t *testing.T) {
180180
},
181181
}
182182

183-
p := NewPoller(c.desc, logr.Discard(), &lister, NewDeepEquality[corev1.Pod](), time.Millisecond*500)
183+
p := NewPoller(c.desc, logr.Discard(), &lister, NewDeepEquality[corev1.Pod](logr.Discard()), time.Millisecond*500)
184184

185185
ctx, cancel := context.WithCancel(context.Background())
186186
list, err := p.Sync(ctx)

0 commit comments

Comments
 (0)