Skip to content

Commit 7bad692

Browse files
committed
feature: etcd backup
Signed-off-by: Maxim Vasilenko <[email protected]>
1 parent 9eaaca2 commit 7bad692

File tree

4 files changed

+442
-0
lines changed

4 files changed

+442
-0
lines changed

cmd/backup.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
Copyright 2024 Flant JSC
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package cmd
18+
19+
import (
20+
backup "github.com/deckhouse/deckhouse-cli/internal/backup/cmd"
21+
)
22+
23+
func init() {
24+
rootCmd.AddCommand(backup.NewCommand())
25+
}

internal/backup/cmd/backup.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
Copyright 2024 Flant JSC
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package backup
18+
19+
import (
20+
"github.com/spf13/cobra"
21+
"k8s.io/kubectl/pkg/util/templates"
22+
23+
"github.com/deckhouse/deckhouse-cli/internal/backup/cmd/etcd"
24+
)
25+
26+
var backupLong = templates.LongDesc(`
27+
Backup various parts of Deckhouse Kubernetes Platform
28+
29+
© Flant JSC 2024`)
30+
31+
func NewCommand() *cobra.Command {
32+
backupCmd := &cobra.Command{
33+
Use: "backup",
34+
Short: "Backup various parts of Deckhouse Kubernetes Platform",
35+
Long: backupLong,
36+
}
37+
38+
backupCmd.AddCommand(
39+
etcd.NewCommand(),
40+
)
41+
42+
return backupCmd
43+
}

internal/backup/cmd/etcd/etcd.go

Lines changed: 312 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,312 @@
1+
/*
2+
Copyright 2024 Flant JSC
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package etcd
18+
19+
import (
20+
"bufio"
21+
"bytes"
22+
"context"
23+
"fmt"
24+
"io"
25+
"log"
26+
"os"
27+
"time"
28+
29+
"github.com/samber/lo"
30+
"github.com/spf13/cobra"
31+
v1 "k8s.io/api/core/v1"
32+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33+
"k8s.io/apimachinery/pkg/runtime"
34+
"k8s.io/client-go/kubernetes"
35+
_ "k8s.io/client-go/plugin/pkg/client/auth"
36+
"k8s.io/client-go/rest"
37+
"k8s.io/client-go/tools/clientcmd"
38+
"k8s.io/client-go/tools/remotecommand"
39+
"k8s.io/kubectl/pkg/util/templates"
40+
)
41+
42+
var etcdLong = templates.LongDesc(`
43+
Take a snapshot of ETCD state.
44+
45+
This command creates a snapshot of the Kubernetes underlying key-value database ETCD.
46+
47+
© Flant JSC 2024`)
48+
49+
func NewCommand() *cobra.Command {
50+
etcdCmd := &cobra.Command{
51+
Use: "etcd <snapshot-path>",
52+
Short: "Take a snapshot of ETCD state",
53+
Long: etcdLong,
54+
ValidArgs: []string{"snapshot-path"},
55+
SilenceErrors: true,
56+
SilenceUsage: true,
57+
PreRunE: func(cmd *cobra.Command, args []string) error {
58+
return validateFlags()
59+
},
60+
RunE: etcd,
61+
}
62+
63+
addFlags(etcdCmd.Flags())
64+
return etcdCmd
65+
}
66+
67+
const (
68+
etcdPodNamespace = "kube-system"
69+
etcdPodsLabelSelector = "component=etcd"
70+
71+
bufferSize16MB = 16 * 1024 * 1024
72+
)
73+
74+
var (
75+
kubeconfigPath string
76+
requestedEtcdPodName string
77+
78+
verboseLog bool
79+
)
80+
81+
func etcd(_ *cobra.Command, args []string) error {
82+
log.SetFlags(log.LstdFlags)
83+
if len(args) != 1 {
84+
return fmt.Errorf("This command requires exactly 1 argument")
85+
}
86+
87+
config, kubeCl, err := setupK8sClientset(kubeconfigPath)
88+
if err != nil {
89+
return fmt.Errorf("Failed to setup Kubernetes client: %w", err)
90+
}
91+
92+
etcdPods, err := findETCDPods(kubeCl)
93+
if err != nil {
94+
return fmt.Errorf("Looking up etcd pods failed: %w", err)
95+
}
96+
97+
pipeExecOpts := &v1.PodExecOptions{
98+
Stdout: true,
99+
Stderr: true,
100+
Container: "etcd",
101+
Command: []string{
102+
"/usr/bin/etcdctl",
103+
"--endpoints", "https://127.0.0.1:2379/",
104+
"--key", "/etc/kubernetes/pki/etcd/ca.key",
105+
"--cert", "/etc/kubernetes/pki/etcd/ca.crt",
106+
"--cacert", "/etc/kubernetes/pki/etcd/ca.crt",
107+
"snapshot", "pipe",
108+
},
109+
}
110+
111+
if len(etcdPods) > 1 {
112+
log.Println(
113+
"Will try to snapshot these instances sequentially until one of them succeeds or all of them fail",
114+
etcdPods)
115+
}
116+
117+
for _, etcdPodName := range etcdPods {
118+
log.Println("Trying to snapshot", etcdPodName)
119+
120+
snapshotFile, err := os.CreateTemp(".", ".*.snapshotPart")
121+
if err != nil {
122+
return fmt.Errorf("Failed to prepare temporary etcd snapshot file: %w", err)
123+
}
124+
defer func(fileName string) {
125+
_ = os.Remove(fileName)
126+
}(snapshotFile.Name())
127+
128+
stdout := bufio.NewWriterSize(snapshotFile, bufferSize16MB)
129+
stderr := &bytes.Buffer{}
130+
131+
if err = checkEtcdPodExistsAndReady(kubeCl, etcdPodName); err != nil {
132+
log.Printf("%s: Fail, %v\n", etcdPodName, err)
133+
continue
134+
}
135+
136+
snapshotStreamingSupported, err := checkEtcdInstanceSupportsSnapshotStreaming(kubeCl, config, etcdPodName)
137+
if err != nil {
138+
log.Printf("%s: Fail, %v\n", etcdPodName, err)
139+
continue
140+
}
141+
if !snapshotStreamingSupported {
142+
log.Printf("%s: etcd instance does not support snapshot streaming\n", etcdPodName)
143+
continue
144+
}
145+
146+
if err = streamCommand(kubeCl, config, pipeExecOpts, etcdPodName, etcdPodNamespace, stdout, stderr); err != nil {
147+
log.Printf("%s: Fail, %v\n", etcdPodName, err)
148+
if verboseLog {
149+
log.Println("STDERR:", stderr.String())
150+
}
151+
continue
152+
}
153+
154+
if err = stdout.Flush(); err != nil {
155+
return fmt.Errorf("Flushing snapshot data to disk: %w", err)
156+
}
157+
158+
if err = os.Rename(snapshotFile.Name(), args[0]); err != nil {
159+
return fmt.Errorf("Failed to move snapshot file: %w", err)
160+
}
161+
162+
log.Println("Snapshot successfully taken from", etcdPodName)
163+
return nil
164+
}
165+
166+
return fmt.Errorf("All known etcd replicas are unavailable to snapshot")
167+
}
168+
169+
func checkEtcdInstanceSupportsSnapshotStreaming(
170+
kubeCl *kubernetes.Clientset,
171+
config *rest.Config,
172+
etcdPodName string,
173+
) (bool, error) {
174+
helpExecOpts := &v1.PodExecOptions{
175+
Stdout: true,
176+
Stderr: true,
177+
Container: "etcd",
178+
Command: []string{
179+
"/usr/bin/etcdctl", "help",
180+
},
181+
}
182+
183+
stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
184+
if err := streamCommand(kubeCl, config, helpExecOpts, etcdPodName, etcdPodNamespace, stdout, stderr); err != nil {
185+
if verboseLog {
186+
log.Println("HELP STDERR:", stderr.String())
187+
}
188+
return false, fmt.Errorf("streamCommand: %w", err)
189+
}
190+
191+
if bytes.Contains(stdout.Bytes(), []byte("snapshot pipe")) {
192+
return true, nil
193+
}
194+
195+
return false, nil
196+
}
197+
198+
func streamCommand(
199+
kubeCl kubernetes.Interface,
200+
restConfig *rest.Config,
201+
execOpts *v1.PodExecOptions,
202+
podName, podNamespace string,
203+
stdout, stderr io.Writer,
204+
) error {
205+
scheme := runtime.NewScheme()
206+
parameterCodec := runtime.NewParameterCodec(scheme)
207+
if err := v1.AddToScheme(scheme); err != nil {
208+
return fmt.Errorf("Failed to create parameter codec: %w", err)
209+
}
210+
211+
request := kubeCl.CoreV1().
212+
RESTClient().
213+
Post().
214+
Resource("pods").
215+
SubResource("exec").
216+
VersionedParams(execOpts, parameterCodec).
217+
Namespace(podNamespace).
218+
Name(podName)
219+
220+
executor, err := remotecommand.NewSPDYExecutor(restConfig, "POST", request.URL())
221+
if err != nil {
222+
log.Printf("Creating SPDY executor for Pod %s: %v", podName, err)
223+
}
224+
225+
if err = executor.StreamWithContext(
226+
context.Background(),
227+
remotecommand.StreamOptions{
228+
Stdout: stdout,
229+
Stderr: stderr,
230+
}); err != nil {
231+
return err
232+
}
233+
234+
return nil
235+
}
236+
237+
func setupK8sClientset(kubeconfigPath string) (*rest.Config, *kubernetes.Clientset, error) {
238+
config, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
239+
&clientcmd.ClientConfigLoadingRules{ExplicitPath: kubeconfigPath}, nil).ClientConfig()
240+
if err != nil {
241+
return nil, nil, fmt.Errorf("Reading kubeconfig file: %w", err)
242+
}
243+
244+
kubeCl, err := kubernetes.NewForConfig(config)
245+
if err != nil {
246+
return nil, nil, fmt.Errorf("Constructing Kubernetes clientset: %w", err)
247+
}
248+
249+
return config, kubeCl, nil
250+
}
251+
252+
func findETCDPods(kubeCl kubernetes.Interface) ([]string, error) {
253+
if requestedEtcdPodName != "" {
254+
if err := checkEtcdPodExistsAndReady(kubeCl, requestedEtcdPodName); err != nil {
255+
return nil, err
256+
}
257+
258+
return []string{requestedEtcdPodName}, nil
259+
}
260+
261+
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
262+
defer cancel()
263+
264+
pods, err := kubeCl.CoreV1().Pods(etcdPodNamespace).List(ctx, metav1.ListOptions{
265+
LabelSelector: etcdPodsLabelSelector,
266+
})
267+
if err != nil {
268+
return nil, fmt.Errorf("listing etcd Pods: %w", err)
269+
}
270+
271+
pods.Items = lo.Filter(pods.Items, func(pod v1.Pod, _ int) bool {
272+
podIsReady := lo.FindOrElse(
273+
pod.Status.Conditions, v1.PodCondition{},
274+
func(condition v1.PodCondition) bool {
275+
return condition.Type == v1.PodReady && condition.Status == v1.ConditionTrue
276+
}).Status == v1.ConditionTrue
277+
278+
_, foundEtcdContainer := lo.Find(pod.Spec.Containers, func(container v1.Container) bool {
279+
return container.Name == "etcd"
280+
})
281+
282+
return podIsReady && foundEtcdContainer
283+
})
284+
285+
if len(pods.Items) == 0 {
286+
return nil, fmt.Errorf("no valid etcd Pods found")
287+
}
288+
289+
return lo.Map(pods.Items, func(pod v1.Pod, _ int) string {
290+
return pod.Name
291+
}), nil
292+
}
293+
294+
func checkEtcdPodExistsAndReady(kubeCl kubernetes.Interface, podName string) error {
295+
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
296+
defer cancel()
297+
298+
pod, err := kubeCl.CoreV1().Pods(etcdPodNamespace).Get(ctx, podName, metav1.GetOptions{})
299+
if err != nil {
300+
return fmt.Errorf("Query Pod %s: %w", podName, err)
301+
}
302+
303+
podReady := lo.FindOrElse(pod.Status.Conditions, v1.PodCondition{}, func(condition v1.PodCondition) bool {
304+
return condition.Type == v1.PodReady
305+
}).Status == v1.ConditionTrue
306+
307+
if !podReady {
308+
return fmt.Errorf("Pod %s is not yet ready, cannot snapshot it now", podName)
309+
}
310+
311+
return nil
312+
}

0 commit comments

Comments
 (0)