Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add execsync for cri , Used for testing high concurrency exec. #102

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 25 additions & 37 deletions benches/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,60 +154,46 @@ func (cb *CustomBench) runThread(ctx context.Context, runner driver.Driver, thre
return
}

log_error := func(cmd string, name string, err error, out string, elapsed time.Duration) {
if err != nil {
errors[cmd]++
log.Warnf("Error during container command %q on %q: %v\n Output: %s", cmd, name, err, out)
}
durations[cmd] = elapsed
log.Debug(out)
}
// Stats calls must be stopped at the end of current iteration if streaming
statsCtx, statsCancel := context.WithCancel(ctx)

for _, cmd := range commands {
// add binary expression
parts := strings.SplitN(cmd, " ", 2)
var args []string
if len(parts) == 2 {
args = strings.Split(parts[1], " ")
cmd = parts[0]
log.Debugf("running command: %s args: %s", cmd, args)
}
log.Debugf("running command: %s", cmd)
switch strings.ToLower(cmd) {
case "run", "start":
out, runElapsed, err := runner.Run(ctx, ctr)
if err != nil {
errors["run"]++
log.Warnf("Error during container command %q on %q: %v\n Output: %s", cmd, name, err, out)
}
durations["run"] = runElapsed
log.Debug(out)
log_error("run", name, err, out, runElapsed)
case "stop", "kill":
out, stopElapsed, err := runner.Stop(ctx, ctr)
if err != nil {
errors["stop"]++
log.Warnf("Error during container command %q on %q: %v\n Output: %s", cmd, name, err, out)
}
durations["stop"] = stopElapsed
log.Debug(out)
log_error("stop", name, err, out, stopElapsed)
case "remove", "erase", "delete":
out, rmElapsed, err := runner.Remove(ctx, ctr)
if err != nil {
errors["delete"]++
log.Warnf("Error during container command %q on %q: %v\n Output: %s", cmd, name, err, out)
}
durations["delete"] = rmElapsed
log.Debug(out)
log_error("remove", name, err, out, rmElapsed)
case "pause":
out, pauseElapsed, err := runner.Pause(ctx, ctr)
if err != nil {
errors["pause"]++
log.Warnf("Error during container command %q on %q: %v\n Output: %s", cmd, name, err, out)
}
durations["pause"] = pauseElapsed
log.Debug(out)
log_error(cmd, name, err, out, pauseElapsed)
case "unpause", "resume":
out, unpauseElapsed, err := runner.Unpause(ctx, ctr)
if err != nil {
errors["resume"]++
log.Warnf("Error during container command %q on %q: %v\n Output: %s", cmd, name, err, out)
}
durations["resume"] = unpauseElapsed
log.Debug(out)
log_error("resume", name, err, out, unpauseElapsed)
case "wait":
out, waitElapsed, err := runner.Wait(ctx, ctr)
if err != nil {
errors["wait"]++
log.Warnf("Error during container command %q on %q: %v\n Output: %s", cmd, name, err, out)
}
durations["wait"] = waitElapsed
log.Debug(out)
log_error(cmd, name, err, out, waitElapsed)
case "metrics", "stats":
if reader, err := runner.Stats(statsCtx, ctr); err != nil {
errors["metrics"]++
Expand All @@ -220,7 +206,9 @@ func (cb *CustomBench) runThread(ctx context.Context, runner driver.Driver, thre
reader.Close()
}()
}

case "execsync":
out, execElapsed, err := runner.Execsync(ctx, ctr, args)
log_error(cmd, name, err, out, execElapsed)
default:
log.Errorf("Command %q unrecognized from YAML commands list; skipping", cmd)
}
Expand Down
3 changes: 3 additions & 0 deletions driver/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ type Driver interface {
// Wait blocks thread until container stop
Wait(ctx context.Context, ctr Container) (string, time.Duration, error)

// Exec will exec cmd in a container
Execsync(ctx context.Context, ctr Container, cmd []string) (string, time.Duration, error)

// Close allows the driver to free any resources/close any
// connections
Close() error
Expand Down
4 changes: 4 additions & 0 deletions driver/containerd.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,10 @@ func (r *ContainerdDriver) Unpause(ctx context.Context, ctr Container) (string,
return "", elapsed, nil
}

func (r *ContainerdDriver) Execsync(ctx context.Context, ctr Container, cmd []string) (string, time.Duration, error) {
panic("unimplemented")
}

// much of this code is copied from docker/docker/reference.go
const (
// DefaultTag defines the default tag used when performing images related actions and no tag or digest is specified
Expand Down
5 changes: 5 additions & 0 deletions driver/containerd_ctr.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,11 @@ func (r *CtrDriver) Run(ctx context.Context, ctr Container) (string, time.Durati
return utils.ExecTimedCmdNoOut(ctx, r.ctrBinary, args)
}

// Exec implements Driver.
func (r *CtrDriver) Execsync(ctx context.Context, ctr Container, cmd []string) (string, time.Duration, error) {
panic("unimplemented")
}

// Stop will stop/kill a container
func (r *CtrDriver) Stop(ctx context.Context, ctr Container) (string, time.Duration, error) {
return utils.ExecTimedCmd(ctx, r.ctrBinary, "containers kill "+ctr.Name())
Expand Down
27 changes: 26 additions & 1 deletion driver/cri.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,10 +235,15 @@ func (c *CRIDriver) Run(ctx context.Context, ctr Container) (string, time.Durati
pconfig.Metadata.Name = defaultPodNamePrefix + cconfig.Metadata.Name
start := time.Now()

_, err = (*c.runtimeClient).CreateContainer(ctx, &pb.CreateContainerRequest{PodSandboxId: ctr.GetPodID(), Config: &cconfig, SandboxConfig: &pconfig})
cRes, err := (*c.runtimeClient).CreateContainer(ctx, &pb.CreateContainerRequest{PodSandboxId: ctr.GetPodID(), Config: &cconfig, SandboxConfig: &pconfig})
if err != nil {
return "", 0, err
}
_, err = (*c.runtimeClient).StartContainer(ctx, &pb.StartContainerRequest{ContainerId: cRes.ContainerId})
if err != nil {
return "", 0, err
}

elapsed := time.Since(start)
return "", elapsed, nil
}
Expand Down Expand Up @@ -296,6 +301,26 @@ func (c *CRIDriver) Remove(ctx context.Context, ctr Container) (string, time.Dur
return "", elapsed, nil
}

// Exec implements Driver.
func (c *CRIDriver) Execsync(ctx context.Context, ctr Container, cmd []string) (string, time.Duration, error) {
start := time.Now()
resp, err := (*c.runtimeClient).ListContainers(ctx, &pb.ListContainersRequest{Filter: &pb.ContainerFilter{PodSandboxId: ctr.GetPodID()}})
if err != nil {
return "", 0, nil
}

containers := resp.GetContainers()
for _, ctr := range containers {
_, err := (*c.runtimeClient).ExecSync(ctx, &pb.ExecSyncRequest{ContainerId: ctr.GetId(), Cmd: cmd})
if err != nil {
log.Errorf("Container(%s) Exec error:%v", ctr.GetId(), err)
return "", 0, nil
}
}
elapsed := time.Since(start)
return "", elapsed, nil
}

// Pause will pause a container
// not supported in CRI API
func (c *CRIDriver) Pause(_ context.Context, _ Container) (string, time.Duration, error) {
Expand Down
5 changes: 5 additions & 0 deletions driver/crun.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,11 @@ func (r *CRunDriver) Unpause(ctx context.Context, ctr Container) (string, time.D
return utils.ExecTimedCmd(ctx, r.crunBinary, "resume "+ctr.Name())
}

// Exec implements Driver.
func (r *CRunDriver) Execsync(ctx context.Context, ctr Container, cmd []string) (string, time.Duration, error) {
panic("unimplemented")
}

// take the output of "crun list" and parse into container instances
func parseCRunList(listOutput string) []*CRunContainer {
var results []*CRunContainer
Expand Down
5 changes: 5 additions & 0 deletions driver/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,11 @@ func (d *DockerDriver) Stats(ctx context.Context, ctr Container) (io.ReadCloser,
return reader, nil
}

// Exec implements Driver.
func (d *DockerDriver) Execsync(ctx context.Context, ctr Container, cmd []string) (string, time.Duration, error) {
panic("unimplemented")
}

func getDockerPID(path string) (int, error) {
if path == "" {
path = dockerDefaultPIDPath
Expand Down
5 changes: 5 additions & 0 deletions driver/docker_cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,11 @@ func (d *DockerCLIDriver) Stats(ctx context.Context, ctr Container) (io.ReadClos
return utils.ExecCmdStream(ctx, d.dockerBinary, args)
}

// Exec implements Driver.
func (d *DockerCLIDriver) Execsync(ctx context.Context, ctr Container, cmd []string) (string, time.Duration, error) {
panic("unimplemented")
}

// ProcNames returns the list of process names contributing to mem/cpu usage during overhead benchmark
func (d *DockerCLIDriver) ProcNames() []string {
return dockerProcNames
Expand Down
5 changes: 5 additions & 0 deletions driver/runc.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,11 @@ func (r *RuncDriver) Unpause(ctx context.Context, ctr Container) (string, time.D
return utils.ExecTimedCmd(ctx, r.runcBinary, "resume "+ctr.Name())
}

// Exec implements Driver.
func (r *RuncDriver) Execsync(ctx context.Context, ctr Container, cmd []string) (string, time.Duration, error) {
panic("unimplemented")
}

// take the output of "runc list" and parse into container instances
func parseRuncList(listOutput string) []*RuncContainer {
var results []*RuncContainer
Expand Down
5 changes: 5 additions & 0 deletions driver/youki.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,11 @@ func (r *YoukiDriver) Unpause(ctx context.Context, ctr Container) (string, time.
return utils.ExecTimedCmd(ctx, r.youkiBinary, "resume "+ctr.Name())
}

// Exec implements Driver.
func (r *YoukiDriver) Execsync(ctx context.Context, ctr Container, cmd []string) (string, time.Duration, error) {
panic("unimplemented")
}

// take the output of "youki list" and parse into container instances
func parseYoukiList(listOutput string) []*YoukiContainer {
var results []*YoukiContainer
Expand Down
Loading