Skip to content

Commit

Permalink
Merge pull request #12 from gammadia/speed-up-transfers
Browse files Browse the repository at this point in the history
  • Loading branch information
gnutix authored Nov 7, 2023
2 parents 65b5f39 + 50b783f commit 3859d3a
Show file tree
Hide file tree
Showing 9 changed files with 22 additions and 13 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,4 @@ Creating the Alfred node image is done using [Packer](https://www.packer.io/) in

- docker
- tar
- zstd
4 changes: 2 additions & 2 deletions client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ import (
"github.com/fatih/color"
"github.com/gammadia/alfred/client/sossh"
"github.com/gammadia/alfred/proto"
"github.com/gammadia/alfred/server/config"
"github.com/samber/lo"
"github.com/spf13/cobra"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/encoding/gzip"
)

// Versioning information set at build time
Expand Down Expand Up @@ -53,7 +53,7 @@ var alfredCmd = &cobra.Command{
clientConn, err = grpc.Dial(
fmt.Sprintf("%s:%s", host, port),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(grpc.UseCompressor(gzip.Name)),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(config.MaxPacketSize)),
grpc.WithContextDialer(func(ctx context.Context, remote string) (net.Conn, error) {
if !sshTunneling {
return net.Dial("tcp", remote)
Expand Down
2 changes: 1 addition & 1 deletion client/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func sendImageToServer(cmd *cobra.Command, image string) error {
return nil
case proto.LoadImageResponse_CONTINUE:
// The image does not exist on the server, send it
cmd := exec.Command("docker", "save", image)
cmd := exec.Command("/bin/sh", "-c", fmt.Sprintf("docker save '%s' | zstd --compress --adapt --min=5 --max=15", image))
reader := lo.Must(cmd.StdoutPipe())
if err := cmd.Start(); err != nil {
return fmt.Errorf("docker save: %w", err)
Expand Down
6 changes: 4 additions & 2 deletions client/version.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package main

import (
"math"

"github.com/gammadia/alfred/proto"
"github.com/spf13/cobra"
)
Expand All @@ -10,12 +12,12 @@ var versionCmd = &cobra.Command{
Short: "Show the version number of Alfred",

RunE: func(cmd *cobra.Command, args []string) error {
cmd.Printf("alfred version %s (%s)\n", version, commit[:7])
cmd.Printf("alfred version %s (%s)\n", version, commit[:int(math.Min(float64(len(commit)), 7))])

if response, err := client.Ping(cmd.Context(), &proto.PingRequest{}); err != nil {
return err
} else {
cmd.Printf("server version %s (%s)\n", response.Version, response.Commit[:7])
cmd.Printf("server version %s (%s)\n", response.Version, response.Commit[:int(math.Min(float64(len(commit)), 7))])
return nil
}
},
Expand Down
4 changes: 2 additions & 2 deletions provisioner/openstack/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,15 +173,15 @@ func (n *Node) ensureNodeHasImage(image string) error {
}
defer session.Close()

saveCmd := exec.Command("docker", "save", image)
saveCmd := exec.Command("/bin/sh", "-c", fmt.Sprintf("docker save '%s' | zstd --compress --adapt --min=5 --max=15", image))
saveOut := lo.Must(saveCmd.StdoutPipe())
session.Stdin = saveOut

if err := saveCmd.Start(); err != nil {
return fmt.Errorf("failed to start docker save: %w", err)
}

if err := session.Run("docker load"); err != nil {
if err := session.Run("zstd --decompress | docker load"); err != nil {
return fmt.Errorf("failed docker load: %w", err)
}

Expand Down
5 changes: 5 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package config

const (
MaxPacketSize = 50 * 1024 * 1024 // 50 MB
)
5 changes: 2 additions & 3 deletions server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,14 @@ import (
"sync"

"github.com/gammadia/alfred/proto"
"github.com/gammadia/alfred/server/config"
"github.com/gammadia/alfred/server/flags"
"github.com/gammadia/alfred/server/log"

"github.com/samber/lo"
"github.com/spf13/viper"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/timestamppb"

_ "google.golang.org/grpc/encoding/gzip"
)

// Versioning information set at build time
Expand Down Expand Up @@ -60,7 +59,7 @@ func main() {
setupInterrupts()

// Setup gRPC server
s := grpc.NewServer()
s := grpc.NewServer(grpc.MaxRecvMsgSize(config.MaxPacketSize))
proto.RegisterAlfredServer(s, &server{})

// Setup scheduler
Expand Down
3 changes: 2 additions & 1 deletion server/server_artifacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"errors"
"fmt"
"github.com/gammadia/alfred/server/config"
"io"
"os"
"path"
Expand All @@ -24,7 +25,7 @@ func (s *server) DownloadArtifact(req *proto.DownloadArtifactRequest, srv proto.
return fmt.Errorf("failed to open artifact file: %w", err)
}

chunk := make([]byte, 2*1024*1024) // 2MB
chunk := make([]byte, config.MaxPacketSize-1024*1024 /* leave 1MB margin */)
for {
n, err := io.ReadFull(file, chunk)
if err != nil && !errors.Is(err, io.ErrUnexpectedEOF) {
Expand Down
5 changes: 3 additions & 2 deletions server/server_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/docker/docker/client"
"github.com/gammadia/alfred/proto"
"github.com/gammadia/alfred/server/config"
"github.com/gammadia/alfred/server/log"
"github.com/samber/lo"
"google.golang.org/grpc/codes"
Expand All @@ -31,7 +32,7 @@ func (s *server) LoadImage(srv proto.Alfred_LoadImageServer) (err error) {
log.Debug("Image not found, sending continue")
if err = srv.Send(&proto.LoadImageResponse{
Status: proto.LoadImageResponse_CONTINUE,
ChunkSize: lo.ToPtr(uint32(2 * 1024 * 1024)), // 2MB chunks
ChunkSize: lo.ToPtr(uint32(config.MaxPacketSize - 1024*1024 /* leave 1MB of margin */)),
}); err != nil {
return
}
Expand All @@ -53,7 +54,7 @@ func (s *server) LoadImage(srv proto.Alfred_LoadImageServer) (err error) {
}

// Main load loop
cmd := exec.Command("docker", "load")
cmd := exec.Command("/bin/sh", "-c", "zstd --decompress | docker load")
cmd.WaitDelay = 10 * time.Second

writer := lo.Must(cmd.StdinPipe())
Expand Down

0 comments on commit 3859d3a

Please sign in to comment.