Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .devcontainer/rclone/install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,6 @@ rm -rf /tmp/rclone
# Fix the $GOPATH folder
chown -R "${USERNAME}:golang" /go
chmod -R g+r+w /go

# Make sure the default folders exist
mkdir -p /run/csi-rclone
122 changes: 25 additions & 97 deletions cmd/csi-rclone-plugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,35 @@ package main

import (
"context"
"errors"
"flag"
"fmt"
"os"
"os/signal"
"syscall"
"time"

"github.com/SwissDataScienceCenter/csi-rclone/pkg/common"
"github.com/SwissDataScienceCenter/csi-rclone/pkg/metrics"
"github.com/SwissDataScienceCenter/csi-rclone/pkg/rclone"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"k8s.io/klog"
mountUtils "k8s.io/mount-utils"
)

var (
endpoint string
nodeID string
cacheDir string
cacheSize string
meters []metrics.Observable
)
// exitOnError logs err through klog and terminates the process with exit
// status 1. It returns without doing anything when err is nil or when err
// matches pflag.ErrHelp.
func exitOnError(err error) {
	// ParseFlags reports some status information (pflag.ErrHelp) through its
	// error return; that is not a failure, so it is ignored here.
	if err == nil || errors.Is(err, pflag.ErrHelp) {
		return
	}

	klog.Error(err.Error())
	os.Exit(1)
}

// init sets the global "logtostderr" flag to "true" before main runs.
//
// The flag is set exactly once and the result is checked: a failure to set it
// is passed to exitOnError, which logs the error and exits with status 1,
// instead of being silently discarded.
func init() {
	exitOnError(flag.Set("logtostderr", "true"))
}

func main() {
var meters []metrics.Observable
metricsServerConfig := metrics.ServerConfig{
Host: "localhost",
Port: 9090,
Expand All @@ -37,123 +39,49 @@ func main() {
ShutdownTimeout: 5 * time.Second,
Enabled: false,
}
nodeServerConfig := rclone.NodeServerConfig{}
controllerServerConfig := rclone.ControllerServerConfig{}

root := &cobra.Command{
Use: "rclone",
Short: "CSI based rclone driver",
}
// Allow flags to be defined in subcommands, they will be reported at the Execute() step, with the help printed
// before exiting.
root.FParseErrWhitelist.UnknownFlags = true

metricsServerConfig.CommandLineParameters(root)

runCmd := &cobra.Command{
Use: "run",
Short: "Start the CSI driver.",
}
root.AddCommand(runCmd)
exitOnError(nodeServerConfig.CommandLineParameters(runCmd, &meters))
exitOnError(controllerServerConfig.CommandLineParameters(runCmd, &meters))

runNode := &cobra.Command{
Use: "node",
Short: "Start the CSI driver node service - expected to run in a daemonset on every node.",
Run: func(cmd *cobra.Command, args []string) {
handleNode()
},
}
runNode.PersistentFlags().StringVar(&nodeID, "nodeid", "", "node id")
runNode.MarkPersistentFlagRequired("nodeid")
runNode.PersistentFlags().StringVar(&endpoint, "endpoint", "", "CSI endpoint")
runNode.MarkPersistentFlagRequired("endpoint")
runNode.PersistentFlags().StringVar(&cacheDir, "cachedir", "", "cache dir")
runNode.PersistentFlags().StringVar(&cacheSize, "cachesize", "", "cache size")
runCmd.AddCommand(runNode)
runController := &cobra.Command{
Use: "controller",
Short: "Start the CSI driver controller.",
Run: func(cmd *cobra.Command, args []string) {
handleController()
},
}
runController.PersistentFlags().StringVar(&nodeID, "nodeid", "", "node id")
runController.MarkPersistentFlagRequired("nodeid")
runController.PersistentFlags().StringVar(&endpoint, "endpoint", "", "CSI endpoint")
runController.MarkPersistentFlagRequired("endpoint")
runCmd.AddCommand(runController)
root.AddCommand(runCmd)

versionCmd := &cobra.Command{
Use: "version",
Short: "Prints information about this version of csi rclone plugin",
Run: func(cmd *cobra.Command, args []string) {
fmt.Printf("csi-rclone plugin Version: %s", rclone.DriverVersion)
fmt.Printf("csi-rclone plugin Version: %s\n", rclone.DriverVersion)
},
}
root.AddCommand(versionCmd)

root.ParseFlags(os.Args[1:])
exitOnError(root.ParseFlags(os.Args[1:]))

if metricsServerConfig.Enabled {
// Gracefully exit the metrics background servers
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
ctx, stop := signal.NotifyContext(context.Background(), common.InterruptSignals...)
defer stop()

metricsServer := metricsServerConfig.NewServer(ctx, &meters)
go metricsServer.ListenAndServe()
}

if err := root.Execute(); err != nil {
fmt.Fprintf(os.Stderr, "%s", err.Error())
os.Exit(1)
}
exitOnError(root.Execute())

os.Exit(0)
}

// handleNode starts the driver with a node server attached.
//
// It first tries to unmount old volumes; a failure there is only logged as a
// warning. It then builds the driver from the package-level nodeID and
// endpoint, creates the node server with cacheDir and cacheSize, appends the
// node server's metrics to meters, and runs the driver. It panics when the
// node server cannot be created or when running the driver returns an error.
func handleNode() {
	// Best effort: a failed cleanup must not prevent the driver from starting.
	if unmountErr := unmountOldVols(); unmountErr != nil {
		klog.Warningf("There was an error when trying to unmount old volumes: %v", unmountErr)
	}

	driver := rclone.NewDriver(nodeID, endpoint)

	nodeServer, err := rclone.NewNodeServer(driver.CSIDriver, cacheDir, cacheSize)
	if err != nil {
		panic(err)
	}
	meters = append(meters, nodeServer.Metrics()...)
	driver.WithNodeServer(nodeServer)

	if err := driver.Run(); err != nil {
		panic(err)
	}
}

// handleController starts the driver with a controller server attached.
//
// It builds the driver from the package-level nodeID and endpoint, creates the
// controller server, appends the controller server's metrics to meters, and
// runs the driver. It panics when running the driver returns an error.
func handleController() {
	driver := rclone.NewDriver(nodeID, endpoint)

	controllerServer := rclone.NewControllerServer(driver.CSIDriver)
	meters = append(meters, controllerServer.Metrics()...)
	driver.WithControllerServer(controllerServer)

	if err := driver.Run(); err != nil {
		panic(err)
	}
}

// unmountOldVols is used to unmount volumes after a restart on a node.
//
// It lists the current mounts and force-unmounts every mount whose type is
// "fuse.rclone", allowing five seconds per unmount. An unmount failure is
// logged as a warning and does not stop the loop. The only error returned is
// the one produced while listing the mounts.
func unmountOldVols() error {
	const (
		mountType      = "fuse.rclone"
		unmountTimeout = time.Second * 5
	)

	klog.Info("Checking for existing mounts")

	mounter := mountUtils.Mounter{}
	mountPoints, err := mounter.List()
	if err != nil {
		return err
	}

	for _, mp := range mountPoints {
		if mp.Type == mountType {
			// Keep going on failure so one stuck mount does not block the others.
			if unmountErr := mounter.UnmountWithForce(mp.Path, unmountTimeout); unmountErr != nil {
				klog.Warningf("Failed to unmount %s because of %v.", mp.Path, unmountErr)
			} else {
				klog.Infof("Sucessfully unmounted %s", mp.Path)
			}
		}
	}
	return nil
}
8 changes: 4 additions & 4 deletions deploy/csi-rclone/templates/csi-controller-rclone.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ spec:
image: {{ .Values.csiControllerRclone.csiProvisioner.image.repository }}:{{ .Values.csiControllerRclone.csiProvisioner.image.tag | default .Chart.AppVersion }}
imagePullPolicy: {{ .Values.csiControllerRclone.csiProvisioner.imagePullPolicy }}
volumeMounts:
- name: socket-dir
mountPath: /csi
- mountPath: /csi
name: socket-dir
- name: rclone
args:
- run
Expand Down Expand Up @@ -85,7 +85,7 @@ spec:
fieldRef:
fieldPath: spec.nodeName
- name: CSI_ENDPOINT
value: "unix://plugin/csi.sock"
value: "unix://csi/csi.sock"
- name: KUBERNETES_CLUSTER_DOMAIN
value: {{ quote .Values.kubernetesClusterDomain }}
{{- if .Values.csiControllerRclone.rclone.goMemLimit }}
Expand Down Expand Up @@ -114,7 +114,7 @@ spec:
timeoutSeconds: 3
periodSeconds: 2
volumeMounts:
- mountPath: /plugin
- mountPath: /csi
name: socket-dir
- name: liveness-probe
imagePullPolicy: Always
Expand Down
21 changes: 14 additions & 7 deletions deploy/csi-rclone/templates/csi-nodeplugin-rclone.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ spec:
- name: node-driver-registrar
args:
- --v=5
- --csi-address=/plugin/csi.sock
- --csi-address=/csi/csi.sock
- --kubelet-registration-path=/var/lib/kubelet/plugins/{{ .Values.storageClassName }}/csi.sock
env:
- name: KUBE_NODE_NAME
Expand All @@ -45,17 +45,17 @@ spec:
resources:
{{- toYaml .Values.csiNodepluginRclone.rclone.resources | nindent 12 }}
volumeMounts:
- mountPath: /plugin
- mountPath: /csi
name: plugin-dir
- mountPath: /registration
name: registration-dir
- name: liveness-probe
imagePullPolicy: Always
image: registry.k8s.io/sig-storage/livenessprobe:v2.15.0
args:
- --csi-address=/plugin/csi.sock
- --csi-address=/csi/csi.sock
volumeMounts:
- mountPath: /plugin
- mountPath: /csi
name: plugin-dir
- name: rclone
args:
Expand Down Expand Up @@ -86,7 +86,7 @@ spec:
fieldRef:
fieldPath: spec.nodeName
- name: CSI_ENDPOINT
value: "unix://plugin/csi.sock"
value: "unix://csi/csi.sock"
- name: KUBERNETES_CLUSTER_DOMAIN
value: {{ quote .Values.kubernetesClusterDomain }}
- name: DRIVER_NAME
Expand Down Expand Up @@ -134,8 +134,10 @@ spec:
timeoutSeconds: 10
periodSeconds: 30
volumeMounts:
- mountPath: /plugin
- mountPath: /csi
name: plugin-dir
- mountPath: /run/csi-rclone
name: node-temp-dir
- mountPath: /var/lib/kubelet/pods
mountPropagation: Bidirectional
name: pods-mount-dir
Expand All @@ -154,6 +156,11 @@ spec:
{{ toYaml . | nindent 8 }}
{{- end }}
volumes:
- hostPath:
# NOTE: We mount on /tmp because we want the saved configuration to not survive a whole node restart.
path: /tmp/{{.Release.Namespace}}-{{.Release.Name}}-{{.Release.Revision}}
type: DirectoryOrCreate
name: node-temp-dir
- hostPath:
path: {{ .Values.kubeletDir }}/plugins/{{ .Values.storageClassName }}
type: DirectoryOrCreate
Expand All @@ -167,4 +174,4 @@ spec:
type: DirectoryOrCreate
name: registration-dir
- name: cache-dir
emptyDir:
emptyDir: {}
6 changes: 3 additions & 3 deletions deploy/csi-rclone/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ csiControllerRclone:
# memory: 128Mi
# requests:
# cpu: 100m
# memory: 128M
# memory: 128Mi
# If set, used to set GOMEMLIMIT, it should be strictly lower than
# limits.memory to prevent OOMkills
goMemLimit: # 115Mi
goMemLimit: # 115MiB
# Prometheus metrics
metrics:
enabled: true
Expand Down Expand Up @@ -68,7 +68,7 @@ csiNodepluginRclone:
# memory: 128Mi
# If set, used to set GOMEMLIMIT, it should be strictly lower than
# limits.memory to prevent OOMkills
goMemLimit: # 115Mi
goMemLimit: # 115MiB
# Prometheus metrics
metrics:
enabled: true
Expand Down
11 changes: 11 additions & 0 deletions pkg/common/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package common

import (
"os"
"syscall"
)

// InterruptSignals lists the signals to listen to:
//  1. os.Interrupt -> allows devs to easily run a server locally
//  2. syscall.SIGTERM -> sent by kubernetes when stopping a server gracefully
var InterruptSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}
Loading