@@ -2,6 +2,7 @@ package serve
2
2
3
3
import (
4
4
"bufio"
5
+ "context"
5
6
"crypto/sha256"
6
7
"crypto/tls"
7
8
"encoding/base64"
@@ -22,8 +23,12 @@ import (
22
23
"github.com/gorilla/websocket"
23
24
"github.com/spf13/cobra"
24
25
"github.com/spf13/viper"
26
+ corev1 "k8s.io/api/core/v1"
27
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28
+ "k8s.io/client-go/tools/clientcmd"
25
29
26
30
"github.com/kosmos.io/kosmos/cmd/kubenest/node-agent/app/logger"
31
+ "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned"
27
32
)
28
33
29
34
var (
36
41
certFile string // SSL certificate file
37
42
keyFile string // SSL key file
38
43
addr string // server listen address
44
+ nodeName string // server nodename
39
45
log = logger .GetLogger ()
40
46
)
41
47
@@ -47,14 +53,28 @@ var upgrader = websocket.Upgrader{
47
53
},
48
54
} // use default options
49
55
56
+ const (
57
+ defaultKubeconfig = "/srv/node-agent/kubeconfigpath"
58
+ ) // kubeconfig for heartbeatCheck
59
+
50
60
func init () {
51
61
// setup flags
52
62
ServeCmd .PersistentFlags ().StringVarP (& addr , "addr" , "a" , ":5678" , "websocket service address" )
53
63
ServeCmd .PersistentFlags ().StringVarP (& certFile , "cert" , "c" , "cert.pem" , "SSL certificate file" )
54
64
ServeCmd .PersistentFlags ().StringVarP (& keyFile , "key" , "k" , "key.pem" , "SSL key file" )
65
+ ServeCmd .PersistentFlags ().StringVarP (& nodeName , "nodename" , "n" , "" , "set nodename" )
55
66
}
56
67
57
68
func serveCmdRun (_ * cobra.Command , _ []string ) error {
69
+ //start heartbeatCheck Goroutine
70
+ ctx , cancel := context .WithCancel (context .Background ())
71
+ defer cancel ()
72
+
73
+ if len (nodeName ) == 0 {
74
+ nodeName = viper .GetString ("NODE_NAME" )
75
+ }
76
+ go heartbeatCheck (ctx , nodeName )
77
+
58
78
user := viper .GetString ("WEB_USER" )
59
79
password := viper .GetString ("WEB_PASS" )
60
80
port := viper .GetString ("WEB_PORT" )
@@ -65,9 +85,57 @@ func serveCmdRun(_ *cobra.Command, _ []string) error {
65
85
if port != "" {
66
86
addr = ":" + port
67
87
}
88
+
68
89
return Start (addr , certFile , keyFile , user , password )
69
90
}
70
91
92
+ func heartbeatCheck (ctx context.Context , nodeName string ) {
93
+ kubeconfigPath := defaultKubeconfig
94
+ config , err := clientcmd .BuildConfigFromFlags ("" , kubeconfigPath )
95
+ if err != nil {
96
+ log .Errorf ("Failed to load kubeconfig from path %s:%v" , kubeconfigPath , err )
97
+ return
98
+ }
99
+ kosmosClient , err := versioned .NewForConfig (config )
100
+ if err != nil {
101
+ log .Errorf ("Failed to get config: %v" , err )
102
+ return
103
+ }
104
+
105
+ ticker := time .NewTicker (10 * time .Second )
106
+ defer ticker .Stop ()
107
+
108
+ for {
109
+ select {
110
+ case <- ctx .Done ():
111
+ log .Infof ("Heartbeat for node %s stopped" , nodeName )
112
+ return
113
+ case <- ticker .C :
114
+ node , err := kosmosClient .KosmosV1alpha1 ().GlobalNodes ().Get (ctx , nodeName , metav1.GetOptions {})
115
+ if err != nil {
116
+ log .Errorf ("Failed to get node: %v" , err )
117
+ }
118
+ heartbeatTime := metav1 .Now ()
119
+
120
+ if len (node .Status .Conditions ) == 0 {
121
+ log .Infof ("GlobalNode %s has no conditions, initializing default condition" , node .Name )
122
+ node .Status .Conditions = []corev1.NodeCondition {
123
+ {
124
+ LastHeartbeatTime : heartbeatTime ,
125
+ },
126
+ }
127
+ } else {
128
+ node .Status .Conditions [0 ].LastHeartbeatTime = heartbeatTime
129
+ }
130
+ if _ , err := kosmosClient .KosmosV1alpha1 ().GlobalNodes ().UpdateStatus (ctx , node , metav1.UpdateOptions {}); err != nil {
131
+ log .Errorf ("update node %s status for globalnode failed, %v" , node .Name , err )
132
+ } else {
133
+ log .Infof ("GlobalnodeHeartbeat: successfully updated global node %s, Status.Conditions: %+v" , node .Name , node .Status .Conditions )
134
+ }
135
+ }
136
+ }
137
+ }
138
+
71
139
// start server
72
140
func Start (addr , certFile , keyFile , user , password string ) error {
73
141
passwordHash := sha256 .Sum256 ([]byte (password ))
@@ -146,6 +214,7 @@ func Start(addr, certFile, keyFile, user, password string) error {
146
214
TLSConfig : tlsConfig ,
147
215
ReadHeaderTimeout : 10 * time .Second ,
148
216
}
217
+
149
218
err := server .ListenAndServeTLS ("" , "" )
150
219
if err != nil {
151
220
log .Errorf ("failed to start server %v" , err )
0 commit comments