feat(restore): allow for unpinning agent from CPU (#4073)
* chore(go.mod): bump sub pkg versions

* feat(agent): allow for setting multiple CPUs in config

This commit makes it possible to specify multiple
CPUs in the 'scylla-manager-agent.yaml' config to which
the agent will be pinned. The 'cpu' field in the config
now accepts either a single int or an array of ints.
The default behavior remains the same.

* refactor(agent): extract cpu pinning logic to separate function

Besides the extraction, this commit also raises the log
level (from DEBUG to INFO) of the message about a missing cpuset file.
It also makes this function return an error, which will be
useful when the function is called from
the CPU pinning agent endpoints.

* feat(agent): implement cpu pinning endpoints

This commit implements 3 CPU pinning endpoints
that allow querying (GET), pinning (POST), and
unpinning (DELETE) the agent's CPUs.

* feat(scyllaclient): implement cpu pinning methods

* feat(restore): optionally unpin agent from CPUs

This commit adds the UnpinAgentCPU field to Target.
It optionally allows unpinning the agent from CPUs
for the duration of the restore.

Fixes #3951

* feat(restore): always enable auto compaction after restore

This is done purely for safety.
It prevents rare edge cases such as:
- restore runs with --allow-compaction=false
- restore is paused
- restore is modified with --allow-compaction=true
- restore is resumed

* feat(command/restore): add --unpin-agent-cpu flag

This commit allows the user to control whether
the agent should be pinned to CPUs during restore.

* feat(restore_test): extend TestRestoreTablesPreparationIntegration with cpu pinning

This way, the test also checks CPU pinning before and after backup.
It also checks CPU pinning before restore, in the middle of it,
when paused, when resumed, and after restore.
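
The "unpin for the duration of the restore" behavior described above can be sketched as an unpin step, the work itself, and a deferred re-pin. This is only an illustration under stated assumptions: `cpuPinner`, `withUnpinnedAgent`, `recordingPinner`, `runDemo`, and `errRestoreFailed` are hypothetical names, not the scyllaclient or restore service API from this commit, and re-pinning with a fresh context is a design choice of the sketch.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// cpuPinner is a hypothetical abstraction over the agent's pin and unpin endpoints.
type cpuPinner interface {
	Pin(ctx context.Context) error
	Unpin(ctx context.Context) error
}

// withUnpinnedAgent unpins the agent, runs work, and re-pins afterwards,
// also when work returns an error.
func withUnpinnedAgent(ctx context.Context, p cpuPinner, work func(context.Context) error) (err error) {
	if uerr := p.Unpin(ctx); uerr != nil {
		return fmt.Errorf("unpin agent: %w", uerr)
	}
	defer func() {
		// Use a fresh context so that a canceled ctx does not skip re-pinning.
		if perr := p.Pin(context.Background()); perr != nil {
			err = errors.Join(err, fmt.Errorf("pin agent: %w", perr))
		}
	}()
	return work(ctx)
}

// recordingPinner records the order of calls instead of talking to an agent.
type recordingPinner struct{ calls []string }

func (p *recordingPinner) Pin(context.Context) error {
	p.calls = append(p.calls, "pin")
	return nil
}

func (p *recordingPinner) Unpin(context.Context) error {
	p.calls = append(p.calls, "unpin")
	return nil
}

// errRestoreFailed is a hypothetical error used to simulate a failed restore.
var errRestoreFailed = errors.New("restore failed")

// runDemo runs a fake restore that returns workErr and reports the call order.
func runDemo(workErr error) ([]string, error) {
	p := &recordingPinner{}
	err := withUnpinnedAgent(context.Background(), p, func(context.Context) error {
		p.calls = append(p.calls, "restore")
		return workErr
	})
	return p.calls, err
}

func main() {
	calls, err := runDemo(errRestoreFailed)
	fmt.Println(calls, err)
}
```

The deferred re-pin is what keeps the agent from staying unpinned when the work fails part-way through.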
Michal-Leszczynski authored Oct 22, 2024
1 parent d044b17 commit c720770
Showing 33 changed files with 1,379 additions and 252 deletions.
1 change: 1 addition & 0 deletions dist/etc/scylla-manager-agent.yaml
@@ -40,6 +40,7 @@
 # CPU to run Scylla Manager Agent on. By default the agent would read Scylla
 # configuration at /etc/scylla.d/cpuset.conf and try to find a core not used by
 # Scylla. If that's not possible user can specify a core to run agent on.
+# It's possible to specify a list of CPUs instead of a single value.
 #cpu: 0

 # Logging configuration.
3 changes: 3 additions & 0 deletions docs/source/sctool/partials/sctool_restore.yaml
@@ -169,6 +169,9 @@ options:
     Sets the amount of file transfers to run in parallel when downloading files from backup location to Scylla node.
     Set to 0 for the fastest download (results in setting transfers to 2*node_shard_count).
     Set to -1 for using the transfers value defined in node's 'scylla-manager-agent.yaml' config file.
+- name: unpin-agent-cpu
+  default_value: "false"
+  usage: "Defines if ScyllaDB Manager Agent should be unpinned from CPUs during restore.\nThis might significantly improve download speed at the cost of decreasing streaming speed. \n"
 - name: window
   default_value: '[]'
   usage: |
3 changes: 3 additions & 0 deletions docs/source/sctool/partials/sctool_restore_update.yaml
@@ -167,6 +167,9 @@ options:
     Sets the amount of file transfers to run in parallel when downloading files from backup location to Scylla node.
     Set to 0 for the fastest download (results in setting transfers to 2*node_shard_count).
     Set to -1 for using the transfers value defined in node's 'scylla-manager-agent.yaml' config file.
+- name: unpin-agent-cpu
+  default_value: "false"
+  usage: "Defines if ScyllaDB Manager Agent should be unpinned from CPUs during restore.\nThis might significantly improve download speed at the cost of decreasing streaming speed. \n"
 - name: window
   default_value: '[]'
   usage: |
9 changes: 6 additions & 3 deletions go.mod
@@ -29,9 +29,9 @@ require (
 	github.com/scylladb/go-reflectx v1.0.1
 	github.com/scylladb/go-set v1.0.2
 	github.com/scylladb/gocqlx/v2 v2.8.0
-	github.com/scylladb/scylla-manager/v3/pkg/managerclient v0.0.0-20241015081800-ee47f3d10478
-	github.com/scylladb/scylla-manager/v3/pkg/util v0.0.0-20241015081800-ee47f3d10478
-	github.com/scylladb/scylla-manager/v3/swagger v0.0.0-20241015081800-ee47f3d10478
+	github.com/scylladb/scylla-manager/v3/pkg/managerclient v0.0.0-20241017140216-d044b1738488
+	github.com/scylladb/scylla-manager/v3/pkg/util v0.0.0-20241017140216-d044b1738488
+	github.com/scylladb/scylla-manager/v3/swagger v0.0.0-20241017140216-d044b1738488
 	github.com/spf13/cobra v1.8.0
 	github.com/spf13/pflag v1.0.5
 	github.com/stoewer/go-strcase v1.3.0
@@ -128,5 +128,8 @@ require (
 replace (
 	github.com/gocql/gocql => github.com/scylladb/gocql v1.12.0
 	github.com/rclone/rclone => github.com/scylladb/rclone v1.54.1-0.20240312172628-afe1fd2aa65e
+	github.com/scylladb/scylla-manager/v3/pkg/managerclient => ./v3/pkg/managerclient
+	github.com/scylladb/scylla-manager/v3/pkg/util => ./v3/pkg/util
+	github.com/scylladb/scylla-manager/v3/swagger => ./v3/swagger
 	google.golang.org/api v0.114.0 => github.com/scylladb/google-api-go-client v0.34.1-patched
 )
6 changes: 0 additions & 6 deletions go.sum
@@ -1053,12 +1053,6 @@ github.com/scylladb/google-api-go-client v0.34.1-patched h1:DW+T0HA+74o6FDr3TFzV
 github.com/scylladb/google-api-go-client v0.34.1-patched/go.mod h1:RriRmS2wJXH+2yd9PRTEcR380U9AXmurWwznqVhzsSc=
 github.com/scylladb/rclone v1.54.1-0.20240312172628-afe1fd2aa65e h1:lJRphCtu+nKd+mfo8whOTeFkgjMWvk8iCSlqgibKSa8=
 github.com/scylladb/rclone v1.54.1-0.20240312172628-afe1fd2aa65e/go.mod h1:JGZp4EvCUK+6AM1Fe1dye5xvihTc/Bk0WnHHSCJOePM=
-github.com/scylladb/scylla-manager/v3/pkg/managerclient v0.0.0-20241015081800-ee47f3d10478 h1:nI+3aLInCdLRYsR+iEuZkzFblgvAdGhWJT/Wqt7cKnU=
-github.com/scylladb/scylla-manager/v3/pkg/managerclient v0.0.0-20241015081800-ee47f3d10478/go.mod h1:6cKRPGgPPFrPSh+M90k+bp0oIFsB5SZsdI6hWeXnXOY=
-github.com/scylladb/scylla-manager/v3/pkg/util v0.0.0-20241015081800-ee47f3d10478 h1:QEhE1TL21CZglbImBdDkjo6ZzPnc3OkOMcDYmo6QwIk=
-github.com/scylladb/scylla-manager/v3/pkg/util v0.0.0-20241015081800-ee47f3d10478/go.mod h1:+sPCx2oaOXmMpy/ODNNEDGJ7vCghBeKP4S7xEfMI+eA=
-github.com/scylladb/scylla-manager/v3/swagger v0.0.0-20241015081800-ee47f3d10478 h1:cL8e+sW/7MZcq9guSBleR0bU+lEGxw1ZUjAf6vajHdE=
-github.com/scylladb/scylla-manager/v3/swagger v0.0.0-20241015081800-ee47f3d10478/go.mod h1:Oxfuz1XcXi9iV4ggSGfQdn+p6gPz6djPOegRMMe/6/s=
 github.com/scylladb/termtables v0.0.0-20191203121021-c4c0b6d42ff4 h1:8qmTC5ByIXO3GP/IzBkxcZ/99VITvnIETDhdFz/om7A=
 github.com/scylladb/termtables v0.0.0-20191203121021-c4c0b6d42ff4/go.mod h1:C1a7PQSMz9NShzorzCiG2fk9+xuCgLkPeCvMHYR2OWg=
 github.com/sirupsen/logrus v1.7.0 h1:ShrD1U9pZB12TX0cVy0DtePoCH97K8EtX+mg7ZARUtM=
29 changes: 28 additions & 1 deletion pkg/cmd/agent/agent.go
@@ -10,13 +10,40 @@ import (
 	"github.com/go-chi/chi/v5"
 	"github.com/go-chi/render"
 	"github.com/pkg/errors"
+	"github.com/scylladb/go-log"
 	"github.com/scylladb/scylla-manager/v3/pkg/config/agent"
+	"github.com/scylladb/scylla-manager/v3/pkg/util/cpuset"
+	"github.com/scylladb/scylla-manager/v3/swagger/gen/agent/models"
 )

-func newAgentHandler(c agent.Config, rclone http.Handler) *chi.Mux {
+func newAgentHandler(c agent.Config, rclone http.Handler, logger log.Logger) *chi.Mux {
 	m := chi.NewMux()

 	m.Get("/node_info", newNodeInfoHandler(c).getNodeInfo)
+	m.Get("/pin_cpu", func(writer http.ResponseWriter, request *http.Request) {
+		cpus, err := cpuset.SchedGetAffinity()
+		if err != nil {
+			render.Status(request, http.StatusInternalServerError)
+			render.Respond(writer, request, err)
+		}
+		casted := make([]int64, len(cpus))
+		for i := range cpus {
+			casted[i] = int64(cpus[i])
+		}
+		render.Respond(writer, request, &models.Cpus{CPU: casted})
+	})
+	m.Post("/pin_cpu", func(writer http.ResponseWriter, request *http.Request) {
+		if err := findAndPinCPUs(request.Context(), c, logger); err != nil {
+			render.Status(request, http.StatusInternalServerError)
+			render.Respond(writer, request, err)
+		}
+	})
+	m.Delete("/pin_cpu", func(writer http.ResponseWriter, request *http.Request) {
+		if err := unpinFromCPUs(); err != nil {
+			render.Status(request, http.StatusInternalServerError)
+			render.Respond(writer, request, err)
+		}
+	})
 	m.Post("/terminate", selfSigterm())
 	m.Post("/free_os_memory", func(writer http.ResponseWriter, request *http.Request) {
 		debug.FreeOSMemory()
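
As shown in this hunk, the GET `/pin_cpu` handler responds with the error but does not return afterwards, so on failure it would go on to write a second response. Whether that has a visible effect depends on `render.Respond`, which is not part of this diff. A minimal sketch of the early-return shape, using only the standard library: `cpusResponse`, `newGetPinnedCPUsHandler`, `call`, and `errNoAffinity` are hypothetical names, and the `cpu` JSON key is an assumption about the generated `models.Cpus` type.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// cpusResponse is a hypothetical stand-in for the generated models.Cpus type.
type cpusResponse struct {
	CPU []int64 `json:"cpu"`
}

// errNoAffinity is a hypothetical error used to simulate a failing affinity query.
var errNoAffinity = errors.New("no affinity")

// newGetPinnedCPUsHandler reports the CPUs returned by getAffinity.
// getAffinity is injected so that the sketch does not depend on the cpuset package.
func newGetPinnedCPUsHandler(getAffinity func() ([]int, error)) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		cpus, err := getAffinity()
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return // stop here so that no second response body is written
		}
		casted := make([]int64, len(cpus))
		for i := range cpus {
			casted[i] = int64(cpus[i])
		}
		w.Header().Set("Content-Type", "application/json")
		// The status line is already sent at this point, so an encoding
		// error cannot be reported to the client anymore.
		_ = json.NewEncoder(w).Encode(cpusResponse{CPU: casted})
	}
}

// call runs h against an in-memory GET request and returns status code and body.
func call(h http.Handler) (int, string) {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/pin_cpu", nil))
	return rec.Code, rec.Body.String()
}

func main() {
	ok := newGetPinnedCPUsHandler(func() ([]int, error) { return []int{0, 2}, nil })
	failing := newGetPinnedCPUsHandler(func() ([]int, error) { return nil, errNoAffinity })

	code, body := call(ok)
	fmt.Print(code, " ", body)
	code, body = call(failing)
	fmt.Print(code, " ", body)
}
```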
2 changes: 1 addition & 1 deletion pkg/cmd/agent/router.go
@@ -34,7 +34,7 @@ func newRouter(c agent.Config, metrics AgentMetrics, rclone http.Handler, logger
 		auth.ValidateToken(c.AuthToken, time.Second, unauthorizedErrorBody),
 	)
 	// Agent specific endpoints
-	priv.Mount("/agent", newAgentHandler(c, rclone))
+	priv.Mount("/agent", newAgentHandler(c, rclone, logger.Named("agent")))
 	// Scylla prometheus proxy
 	priv.Mount("/metrics", promProxy(c))
 	// Fallback to Scylla API proxy
73 changes: 47 additions & 26 deletions pkg/cmd/agent/server.go
@@ -9,6 +9,7 @@ import (
 	"net/http"
 	"os"
 	"runtime"
+	"slices"
 	"time"

 	"github.com/pkg/errors"
@@ -83,32 +84,7 @@ func (s *server) init(ctx context.Context) error {
 		)
 	}

-	// Try to get CPUs to pin to
-	var cpus []int
-	if s.config.CPU != agent.NoCPU {
-		cpus = []int{s.config.CPU}
-	} else if free, err := findFreeCPUs(); err != nil {
-		if os.IsNotExist(errors.Cause(err)) || errors.Is(err, cpuset.ErrNoCPUSetConfig) {
-			// Ignore if there is no cpuset file
-			s.logger.Debug(ctx, "Failed to find CPUs to pin to", "error", err)
-		} else {
-			s.logger.Error(ctx, "Failed to find CPUs to pin to", "error", err)
-		}
-	} else {
-		cpus = free
-	}
-	// Pin to CPUs if possible
-	if len(cpus) == 0 {
-		s.logger.Info(ctx, "Running on all CPUs")
-		runtime.GOMAXPROCS(1)
-	} else {
-		if err := pinToCPUs(cpus); err != nil {
-			s.logger.Error(ctx, "Failed to pin to CPUs", "cpus", cpus, "error", err)
-		} else {
-			s.logger.Info(ctx, "Running on CPUs", "cpus", cpus)
-		}
-		runtime.GOMAXPROCS(len(cpus))
-	}
+	_ = findAndPinCPUs(ctx, s.config, s.logger) // nolint: errcheck

 	// Log memory limit
 	if l, err := cgroupMemoryLimit(); err != nil {
@@ -134,6 +110,51 @@ func (s *server) init(ctx context.Context) error {
 	)
 }

+func findAndPinCPUs(ctx context.Context, cfg agent.Config, logger log.Logger) error {
+	var aggregatedErr error
+	// Try to get CPUs to pin to
+	var cpus []int
+	if !slices.Contains(cfg.CPU, agent.NoCPU) {
+		cpus = cfg.CPU
+	} else if free, err := findFreeCPUs(); err != nil {
+		if os.IsNotExist(errors.Cause(err)) || errors.Is(err, cpuset.ErrNoCPUSetConfig) {
+			// Ignore if there is no cpuset file
+			logger.Info(ctx, "Failed to find CPUs to pin to", "error", err)
+		} else {
+			logger.Error(ctx, "Failed to find CPUs to pin to", "error", err)
+			aggregatedErr = multierr.Append(aggregatedErr, err)
+		}
+	} else {
+		cpus = free
+	}
+	// Pin to CPUs if possible
+	if len(cpus) == 0 {
+		logger.Info(ctx, "Running on all CPUs")
+		runtime.GOMAXPROCS(1)
+	} else {
+		if err := pinToCPUs(cpus); err != nil {
+			logger.Error(ctx, "Failed to pin to CPUs", "cpus", cpus, "error", err)
+			aggregatedErr = multierr.Append(aggregatedErr, err)
+		} else {
+			logger.Info(ctx, "Running on CPUs", "cpus", cpus)
+		}
+		runtime.GOMAXPROCS(len(cpus))
+	}
+	return aggregatedErr
+}
+
+func unpinFromCPUs() error {
+	cpus := make([]int, runtime.NumCPU())
+	for i := range cpus {
+		cpus[i] = i
+	}
+	if err := pinToCPUs(cpus); err != nil {
+		return errors.Wrapf(err, "pin to cpus %v", cpus)
+	}
+	runtime.GOMAXPROCS(len(cpus))
+	return nil
+}
+
 func (s *server) makeServers(ctx context.Context) error {
 	tlsConfig, err := s.tlsConfig(ctx)
 	if err != nil {
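
The CPU selection in `findAndPinCPUs` can be read as a small decision table: use the configured CPUs unless the list contains the "no CPU" sentinel, otherwise fall back to the free CPUs, and tolerate a missing cpuset file. The following is a simplified sketch of that decision only, with no actual pinning. `chooseCPUs`, `noCPU`, `errNoCPUSet`, and `errOther` are hypothetical names, and the sentinel value `-1` is an assumption rather than something this diff shows.

```go
package main

import (
	"errors"
	"fmt"
	"slices"
)

// noCPU is a hypothetical sentinel mirroring agent.NoCPU; the value is an assumption.
const noCPU = -1

// Hypothetical errors standing in for "no cpuset file" and any other failure.
var (
	errNoCPUSet = errors.New("no cpuset config")
	errOther    = errors.New("read cpuset: permission denied")
)

// chooseCPUs returns the CPUs to pin to and the GOMAXPROCS value to apply.
// An empty result means "run on all CPUs", for which the diff applies GOMAXPROCS(1).
func chooseCPUs(configured []int, findFree func() ([]int, error)) ([]int, int, error) {
	var (
		cpus []int
		err  error
	)
	if !slices.Contains(configured, noCPU) {
		cpus = configured
	} else if free, ferr := findFree(); ferr != nil {
		// A missing cpuset file is not treated as a failure.
		if !errors.Is(ferr, errNoCPUSet) {
			err = ferr
		}
	} else {
		cpus = free
	}
	if len(cpus) == 0 {
		return nil, 1, err
	}
	return cpus, len(cpus), err
}

func main() {
	free := func() ([]int, error) { return []int{7}, nil }
	missing := func() ([]int, error) { return nil, errNoCPUSet }
	broken := func() ([]int, error) { return nil, errOther }

	fmt.Println(chooseCPUs([]int{2, 3}, free))
	fmt.Println(chooseCPUs([]int{noCPU}, free))
	fmt.Println(chooseCPUs([]int{noCPU}, missing))
	fmt.Println(chooseCPUs([]int{noCPU}, broken))
}
```

A separate point worth keeping in mind for `unpinFromCPUs`: Go documents `runtime.NumCPU` as the number of logical CPUs usable by the process as queried at process startup, so an agent started under a restricted affinity mask would see that smaller number.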
6 changes: 6 additions & 0 deletions pkg/command/restore/cmd.go
@@ -32,6 +32,7 @@ type command struct {
 	transfers       int
 	rateLimit       []string
 	allowCompaction bool
+	unpinAgentCPU   bool
 	restoreSchema   bool
 	restoreTables   bool
 	dryRun          bool
@@ -84,6 +85,7 @@ func (cmd *command) init() {
 	w.Unwrap().IntVar(&cmd.transfers, "transfers", 0, "")
 	w.Unwrap().StringSliceVar(&cmd.rateLimit, "rate-limit", nil, "")
 	w.Unwrap().BoolVar(&cmd.allowCompaction, "allow-compaction", false, "")
+	w.Unwrap().BoolVar(&cmd.unpinAgentCPU, "unpin-agent-cpu", false, "")
 	w.Unwrap().BoolVar(&cmd.restoreSchema, "restore-schema", false, "")
 	w.Unwrap().BoolVar(&cmd.restoreTables, "restore-tables", false, "")
 	w.Unwrap().BoolVar(&cmd.dryRun, "dry-run", false, "")
@@ -162,6 +164,10 @@ func (cmd *command) run(args []string) error {
 		props["allow_compaction"] = cmd.allowCompaction
 		ok = true
 	}
+	if cmd.Flag("unpin-agent-cpu").Changed {
+		props["unpin_agent_cpu"] = cmd.unpinAgentCPU
+		ok = true
+	}
 	if cmd.Flag("restore-schema").Changed {
 		if cmd.Update() {
 			return wrapper("restore-schema")
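
The `Changed` checks in this hunk mean that a property is only sent when the user explicitly passed the flag, so an update does not overwrite stored values with flag defaults. The sketch below shows the same idea with the standard library `flag` package instead of the cobra/pflag API used in the diff: `FlagSet.Visit` only visits flags that were set on the command line. `buildProps` is a hypothetical helper, not code from this commit.

```go
package main

import (
	"flag"
	"fmt"
)

// buildProps parses args and returns only the properties whose flags were
// explicitly provided, regardless of whether the value equals the default.
func buildProps(args []string) (map[string]any, error) {
	fs := flag.NewFlagSet("restore", flag.ContinueOnError)
	allowCompaction := fs.Bool("allow-compaction", false, "")
	unpinAgentCPU := fs.Bool("unpin-agent-cpu", false, "")
	if err := fs.Parse(args); err != nil {
		return nil, err
	}

	props := map[string]any{}
	// Visit calls the function only for flags that have been set.
	fs.Visit(func(f *flag.Flag) {
		switch f.Name {
		case "allow-compaction":
			props["allow_compaction"] = *allowCompaction
		case "unpin-agent-cpu":
			props["unpin_agent_cpu"] = *unpinAgentCPU
		}
	})
	return props, nil
}

func main() {
	for _, args := range [][]string{
		nil,
		{"--unpin-agent-cpu"},
		{"--unpin-agent-cpu=false", "--allow-compaction"},
	} {
		props, err := buildProps(args)
		fmt.Println(args, props, err)
	}
}
```

Note that `--unpin-agent-cpu=false` still produces a property: the flag was set, even though its value matches the default.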
4 changes: 4 additions & 0 deletions pkg/command/restore/res.yaml
@@ -48,6 +48,10 @@ allow-compaction: |
   Defines if auto compactions should be running on Scylla nodes during restore.
   Disabling auto compactions decreases restore time duration, but increases compaction workload after the restore is done.

+unpin-agent-cpu: |
+  Defines if ScyllaDB Manager Agent should be unpinned from CPUs during restore.
+  This might significantly improve download speed at the cost of decreasing streaming speed.
+
 restore-schema: |
   Specifies restore type (alternative to '--restore-tables' flag).
   Restore will recreate schema by applying the backed up output of DESCRIBE SCHEMA WITH INTERNALS via CQL.
37 changes: 35 additions & 2 deletions pkg/config/agent/agent.go
@@ -51,7 +51,7 @@ type Config struct {
 	TLSKeyFile string               `yaml:"tls_key_file"`
 	Prometheus string               `yaml:"prometheus"`
 	Debug      string               `yaml:"debug"`
-	CPU        int                  `yaml:"cpu"`
+	CPU        CPUs                 `yaml:"cpu"`
 	Logger     config.LogConfig     `yaml:"logger"`
 	Scylla     ScyllaConfig         `yaml:"scylla"`
 	Rclone     rclone.GlobalOptions `yaml:"rclone"`
@@ -66,7 +66,7 @@ func DefaultConfig() Config {
 		TLSVersion: config.TLSv12,
 		Prometheus: ":5090",
 		Debug:      "127.0.0.1:5112",
-		CPU:        NoCPU,
+		CPU:        []int{NoCPU},
 		Logger:     DefaultLogConfig(),
 		Scylla: ScyllaConfig{
 			APIAddress: "0.0.0.0",
@@ -121,3 +121,36 @@ func Obfuscate(c Config) Config {
 	}
 	return c
 }
+
+// CPUs represents a list of CPUs.
+// It supports custom marshalling methods which allow
+// for specifying 'cpu' in 'scylla-manager-agent.yaml' config
+// as either a single int or an array of ints.
+type CPUs []int
+
+// UnmarshalYAML is a custom unmarshaller accepting both
+// single ints and int arrays.
+func (c *CPUs) UnmarshalYAML(unmarshal func(interface{}) error) error {
+	var v int
+	errV := unmarshal(&v)
+	if errV == nil {
+		*c = []int{v}
+		return nil
+	}
+	var arr []int
+	errArr := unmarshal(&arr)
+	if errArr == nil {
+		*c = arr
+		return nil
+	}
+	return errors.Wrap(multierr.Append(errV, errArr), "parse CPU as an int or int array")
+}
+
+// MarshalYAML is a custom marshaller returning
+// a single int if possible, and an int array otherwise.
+func (c CPUs) MarshalYAML() (interface{}, error) {
+	if len(c) == 1 {
+		return (c)[0], nil
+	}
+	return c, nil
+}
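
The "single int or array of ints" approach of `CPUs` is to try the scalar form first and fall back to the list form. To keep the example runnable with the standard library alone, the sketch below applies the same idea to `encoding/json` rather than to the YAML interface used in the diff. `cpuList`, `config`, `parseConfig`, and `renderConfig` are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// cpuList is a hypothetical JSON counterpart of the CPUs type from the diff.
type cpuList []int

// UnmarshalJSON accepts either a single int or an array of ints.
func (c *cpuList) UnmarshalJSON(data []byte) error {
	var v int
	errV := json.Unmarshal(data, &v)
	if errV == nil {
		*c = []int{v}
		return nil
	}
	var arr []int
	errArr := json.Unmarshal(data, &arr)
	if errArr == nil {
		*c = arr
		return nil
	}
	return fmt.Errorf("parse CPU as an int or int array: %v; %v", errV, errArr)
}

// MarshalJSON emits a single int when there is exactly one CPU, and an array otherwise.
func (c cpuList) MarshalJSON() ([]byte, error) {
	if len(c) == 1 {
		return json.Marshal(c[0])
	}
	// Convert to a plain slice so that this method is not called recursively.
	return json.Marshal([]int(c))
}

// config is a hypothetical, minimal stand-in for the agent config.
type config struct {
	CPU cpuList `json:"cpu"`
}

// parseConfig decodes a JSON document into config.
func parseConfig(s string) (config, error) {
	var cfg config
	err := json.Unmarshal([]byte(s), &cfg)
	return cfg, err
}

// renderConfig encodes config back into a JSON document.
func renderConfig(cfg config) (string, error) {
	b, err := json.Marshal(cfg)
	return string(b), err
}

func main() {
	for _, in := range []string{`{"cpu": 3}`, `{"cpu": [1, 2]}`, `{"cpu": "x"}`} {
		cfg, err := parseConfig(in)
		fmt.Println(cfg.CPU, err)
	}
	out, err := renderConfig(config{CPU: cpuList{3}})
	fmt.Println(out, err)
}
```

Marshalling a one-element list back to a scalar keeps round-trips of existing single-CPU configs unchanged, which matches the intent stated in the `MarshalYAML` comment.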
5 changes: 5 additions & 0 deletions pkg/config/agent/agent_test.go
@@ -66,6 +66,11 @@ func TestParseConfig(t *testing.T) {
 			Input:  []string{"./testdata/structs_empty.input.yaml"},
 			Golden: "./testdata/structs_empty.golden.yaml",
 		},
+		{
+			Name:   "multiple cpus",
+			Input:  []string{"./testdata/multiple_cpus.input.yaml"},
+			Golden: "./testdata/multiple_cpus.golden.yaml",
+		},
 	}

 	s := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {