Skip to content

Commit

Permalink
WIP: python/go grpc interface for async uploads
Browse files Browse the repository at this point in the history
Signed-off-by: Andreas Jansson <[email protected]>
  • Loading branch information
andreasjansson committed Jan 1, 2021
1 parent d977c74 commit fceb76b
Show file tree
Hide file tree
Showing 64 changed files with 7,078 additions and 1,641 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ ENVIRONMENT := development
OS := $(shell uname -s)

.PHONY: build
build: verify-dev-env
build:
cd proto && $(MAKE) build
cd go && $(MAKE) build-all ENVIRONMENT=$(ENVIRONMENT)
cd python && $(MAKE) build

Expand Down
4 changes: 2 additions & 2 deletions go/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ lint:
go run github.com/golangci/golangci-lint/cmd/golangci-lint run ./...

.PHONY: fmt
fmt: install-goimports
goimports --local replicate.ai -w -d .
fmt:
go run golang.org/x/tools/cmd/goimports --local replicate.ai -w -d .

.PHONY: mod-tidy
mod-tidy:
Expand Down
8 changes: 6 additions & 2 deletions go/cmd/replicate-shared/main.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package main

import (
"github.com/replicate/replicate/go/pkg/shared"
"github.com/replicate/replicate/go/pkg/cli"
"github.com/replicate/replicate/go/pkg/console"
)

func main() {
shared.Serve()
cmd := cli.NewDaemonCommand()
if err := cmd.Execute(); err != nil {
console.Fatal("%s", err)
}
}
14 changes: 10 additions & 4 deletions go/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ require (
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
github.com/ghodss/yaml v1.0.0
github.com/go-bindata/go-bindata v3.1.2+incompatible
github.com/golangci/golangci-lint v1.34.1
github.com/golang/protobuf v1.4.3
github.com/golangci/golangci-lint v1.32.2
github.com/google/martian v2.1.1-0.20190517191504-25dcb96d9e51+incompatible // indirect
github.com/hashicorp/go-uuid v1.0.2
github.com/kami-zh/go-capturer v0.0.0-20171211120116-e492ea43421d
github.com/logrusorgru/aurora v2.0.3+incompatible
Expand All @@ -29,7 +31,11 @@ require (
github.com/xtgo/uuid v0.0.0-20140804021211-a0b114877d4c // indirect
golang.org/x/oauth2 v0.0.0-20201109201403-9fd604954f58
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9
golang.org/x/tools v0.0.0-20201201161351-ac6f37ff4c2a
google.golang.org/api v0.36.0
gotest.tools/gotestsum v0.6.0
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68 // indirect
golang.org/x/tools v0.0.0-20201121010211-780cb80bd7fb
google.golang.org/api v0.29.0
google.golang.org/genproto v0.0.0-20200527145253-8367513e4ece
google.golang.org/grpc v1.33.2
google.golang.org/protobuf v1.25.0
gotest.tools/gotestsum v0.5.2
)
5 changes: 0 additions & 5 deletions go/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -957,11 +957,6 @@ google.golang.org/grpc v1.28.0 h1:bO/TA4OxCOummhSf10siHuG7vJOiwh7SpRpFZDkOgl4=
google.golang.org/grpc v1.28.0/go.mod h1:rpkK4SK4GF4Ach/+MFLZUBavHOvF2JJB5uozKKal+60=
google.golang.org/grpc v1.29.1 h1:EC2SB8S04d2r73uptxphDSUG+kTKVgjRPF+N3xpxRB4=
google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk=
google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc v1.31.1 h1:SfXqXS5hkufcdZ/mHtYCh53P2b+92WQq/DZcKLgsFRs=
google.golang.org/grpc v1.31.1/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc v1.32.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc v1.33.2 h1:EQyQC3sa8M+p6Ulc8yy9SWSS2GVwyRc83gAbG8lrl4o=
google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
Expand Down
68 changes: 2 additions & 66 deletions go/pkg/cli/checkout.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package cli
import (
"fmt"
"os"
"path"
"path/filepath"

"github.com/logrusorgru/aurora"
Expand All @@ -12,7 +11,6 @@ import (
"github.com/replicate/replicate/go/pkg/console"
"github.com/replicate/replicate/go/pkg/files"
"github.com/replicate/replicate/go/pkg/project"
"github.com/replicate/replicate/go/pkg/repository"
)

type checkoutOpts struct {
Expand Down Expand Up @@ -79,7 +77,7 @@ func checkoutCheckpoint(opts checkoutOpts, args []string) error {
}
}

proj := project.NewProject(repo)
proj := project.NewProject(repo, projectDir)
result, err := proj.CheckpointOrExperimentFromPrefix(prefix)
if err != nil {
return err
Expand All @@ -88,23 +86,6 @@ func checkoutCheckpoint(opts checkoutOpts, args []string) error {
experiment := result.Experiment
checkpoint := result.Checkpoint

if checkpoint != nil {
console.Info("Checking out files from checkpoint %s and its experiment %s", checkpoint.ShortID(), experiment.ShortID())
} else {
// When checking out experiment, also check out best/latest checkpoint
checkpoint = experiment.BestCheckpoint()
if checkpoint != nil {
console.Info("Checking out files from experiment %s and its best checkpoint %s", experiment.ShortID(), checkpoint.ShortID())
} else {
checkpoint = experiment.LatestCheckpoint()
if checkpoint != nil {
console.Info("Checking out files from experiment %s and its latest checkpoint %s", experiment.ShortID(), checkpoint.ShortID())
} else {
console.Info("Checking out files from experiment %s", experiment.ShortID())
}
}
}

displayPath := filepath.Join(outputDir, experiment.Path)

// FIXME(bfirsh): this is a bodge and isn't always quite right -- if no experiment path set, and we're checking out checkpoint, display the checkpoint path
Expand Down Expand Up @@ -142,50 +123,5 @@ func checkoutCheckpoint(opts checkoutOpts, args []string) error {

fmt.Fprintln(os.Stderr)

experimentFilesExist := true
checkpointFilesExist := true

if err := repo.GetPathTar(path.Join("experiments", experiment.ID+".tar.gz"), outputDir); err != nil {
// Ignore does not exist errors
if _, ok := err.(*repository.DoesNotExistError); ok {
console.Debug("No experiment data found")
experimentFilesExist = false
} else {
return err
}
} else {
console.Info("Copied the files from experiment %s to %q", experiment.ShortID(), filepath.Join(outputDir, experiment.Path))
}

// Overlay checkpoint on top of experiment
if checkpoint != nil {

if err := repo.GetPathTar(path.Join("checkpoints", checkpoint.ID+".tar.gz"), outputDir); err != nil {
if _, ok := err.(*repository.DoesNotExistError); ok {
console.Debug("No checkpoint data found")
checkpointFilesExist = false
} else {
return err

}
} else {
console.Info("Copied the files from checkpoint %s to %q", checkpoint.ShortID(), filepath.Join(outputDir, checkpoint.Path))
}

}

if !experimentFilesExist && !checkpointFilesExist {
// Just an experiment, no checkpoints
if checkpoint == nil {
return fmt.Errorf("The experiment %s does not have any files associated with it. You need to pass the 'path' argument to 'init()' to check out files.", experiment.ShortID())
}
return fmt.Errorf("Neither the experiment %s nor the checkpoint %s has any files associated with it. You need to pass the 'path' argument to 'init()' or 'checkpoint()' to check out files.", experiment.ShortID(), checkpoint.ShortID())
}

console.Info(`If you want to run this experiment again, this is how it was run:
` + experiment.Command + `
`)

return nil
return proj.CheckoutCheckpoint(checkpoint, experiment, outputDir)
}
29 changes: 20 additions & 9 deletions go/pkg/cli/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,31 @@ func addRepositoryURLFlagVar(cmd *cobra.Command, opt *string) {
}

// getRepositoryURLFromStringOrConfig attempts to get it from passed string from --repository,
// otherwise finds replicate.yaml recursively
// otherwise finds replicate.yaml recursively.
// The project directory is determined by the following logic:
// * If an explicit directory is passed with -D, that is used
// * Else, if repository URL isn't manually passed with -R, the directory of replicate.yaml is used
// * Otherwise, the current working directory is used
// Returns (repositoryURL, projectDir, error)
func getRepositoryURLFromStringOrConfig(repositoryURL string) (string, string, error) {
projectDir := global.ProjectDirectory
if repositoryURL == "" {
conf, projectDir, err := config.FindConfigInWorkingDir(global.ProjectDirectory)
conf, confProjectDir, err := config.FindConfigInWorkingDir(global.ProjectDirectory)
if err != nil {
return "", "", err
}
return conf.Repository, projectDir, nil
if repositoryURL == "" {
repositoryURL = conf.Repository
}
if global.ProjectDirectory == "" {
projectDir = confProjectDir
} else {
projectDir = global.ProjectDirectory
}
}

// if global.ProjectDirectory == "", abs of that is cwd
// FIXME (bfirsh): this does not look up directories for replicate.yaml, so might be the wrong
// projectDir. It should probably use return value of FindConfigInWorkingDir.
projectDir, err := filepath.Abs(global.ProjectDirectory)
// abs of "" if cwd
projectDir, err := filepath.Abs(projectDir)
if err != nil {
return "", "", fmt.Errorf("Failed to determine absolute directory of '%s': %w", global.ProjectDirectory, err)
}
Expand Down Expand Up @@ -71,14 +82,14 @@ func getProjectDir() (string, error) {
// getRepository returns the project's repository, with caching if needed
// This is not in repository package so we can do user interface stuff around syncing
func getRepository(repositoryURL, projectDir string) (repository.Repository, error) {
repo, err := repository.ForURL(repositoryURL)
repo, err := repository.ForURL(repositoryURL, projectDir)
if err != nil {
return nil, err
}
// projectDir might be "" if you use --repository option
if repository.NeedsCaching(repo) && projectDir != "" {
console.Info("Fetching new data from %q...", repo.RootURL())
repo, err = repository.NewCachedMetadataRepository(repo, projectDir)
repo, err = repository.NewCachedMetadataRepository(projectDir, repo)
if err != nil {
return nil, err
}
Expand Down
40 changes: 40 additions & 0 deletions go/pkg/cli/daemon.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package cli

import (
"github.com/spf13/cobra"

"github.com/replicate/replicate/go/pkg/project"
"github.com/replicate/replicate/go/pkg/shared"
)

func NewDaemonCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "replicate-daemon <socket-path>",
RunE: runDaemon,
}
setPersistentFlags(cmd)
addRepositoryURLFlag(cmd)
return cmd
}

func runDaemon(cmd *cobra.Command, args []string) error {
socketPath := args[0]

projectGetter := func() (proj *project.Project, err error) {
repositoryURL, projectDir, err := getRepositoryURLFromFlagOrConfig(cmd)
if err != nil {
return nil, err
}
repo, err := getRepository(repositoryURL, projectDir)
if err != nil {
return nil, err
}
proj = project.NewProject(repo, projectDir)
return proj, err
}

if err := shared.Serve(projectGetter, socketPath); err != nil {
return err
}
return nil
}
4 changes: 2 additions & 2 deletions go/pkg/cli/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func diffCheckpoints(cmd *cobra.Command, args []string) error {
if err != nil {
return err
}
proj := project.NewProject(repo)
proj := project.NewProject(repo, projectDir)
au := getAurora()
return printDiff(os.Stdout, au, proj, prefix1, prefix2)
}
Expand Down Expand Up @@ -145,7 +145,7 @@ func printMapDiff(w *tabwriter.Writer, au aurora.Aurora, map1, map2 map[string]s
// Returns a map of checkpoint things we want to show in diff
func checkpointToMap(checkpoint *project.Checkpoint) map[string]string {
return map[string]string{
"Step": strconv.Itoa(checkpoint.Step),
"Step": strconv.FormatInt(checkpoint.Step, 10),
"Created": checkpoint.Created.In(timezone).Format(time.RFC1123),
"Path": checkpoint.Path,
}
Expand Down
4 changes: 2 additions & 2 deletions go/pkg/cli/diff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestDiffSameExperiment(t *testing.T) {

conf := &config.Config{}
repo := createShowTestData(t, workingDir, conf)
proj := project.NewProject(repo)
proj := project.NewProject(repo, workingDir)

au := aurora.NewAurora(false)
out := new(bytes.Buffer)
Expand Down Expand Up @@ -59,7 +59,7 @@ func TestDiffDifferentExperiment(t *testing.T) {

conf := &config.Config{}
repo := createShowTestData(t, workingDir, conf)
proj := project.NewProject(repo)
proj := project.NewProject(repo, workingDir)

au := aurora.NewAurora(false)
out := new(bytes.Buffer)
Expand Down
8 changes: 4 additions & 4 deletions go/pkg/cli/list/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (exp *ListExperiment) GetValue(name string) param.Value {
}
if name == "step" {
if exp.LatestCheckpoint != nil {
return param.Int(exp.LatestCheckpoint.Step)
return param.Int(int64(exp.LatestCheckpoint.Step))
}
return param.Int(0)
}
Expand Down Expand Up @@ -84,7 +84,7 @@ func (exp *ListExperiment) GetValue(name string) param.Value {
}

func Experiments(repo repository.Repository, format Format, all bool, filters *param.Filters, sorter *param.Sorter) error {
proj := project.NewProject(repo)
proj := project.NewProject(repo, "")
listExperiments, err := createListExperiments(proj, filters)
if err != nil {
return err
Expand Down Expand Up @@ -209,7 +209,7 @@ func outputTable(experiments []*ListExperiment, all bool) error {

latestCheckpoint := ""
if exp.LatestCheckpoint != nil {
latestCheckpoint = fmt.Sprintf("%s (step %s)", exp.LatestCheckpoint.ShortID(), strconv.Itoa(exp.LatestCheckpoint.Step))
latestCheckpoint = fmt.Sprintf("%s (step %s)", exp.LatestCheckpoint.ShortID(), strconv.FormatInt(exp.LatestCheckpoint.Step, 10))
}
fmt.Fprintf(tw, "%s\t", latestCheckpoint)

Expand All @@ -227,7 +227,7 @@ func outputTable(experiments []*ListExperiment, all bool) error {
bestCheckpoint := ""

if exp.BestCheckpoint != nil {
bestCheckpoint = fmt.Sprintf("%s (step %s)", exp.BestCheckpoint.ShortID(), strconv.Itoa(exp.BestCheckpoint.Step))
bestCheckpoint = fmt.Sprintf("%s (step %s)", exp.BestCheckpoint.ShortID(), strconv.FormatInt(exp.BestCheckpoint.Step, 10))
}
fmt.Fprintf(tw, "%s\t", bestCheckpoint)

Expand Down
2 changes: 1 addition & 1 deletion go/pkg/cli/rm.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func removeExperimentOrCheckpoint(cmd *cobra.Command, prefixes []string) error {
if err != nil {
return err
}
proj := project.NewProject(repo)
proj := project.NewProject(repo, projectDir)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions go/pkg/cli/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func show(opts showOpts, args []string, out io.Writer) error {
if err != nil {
return err
}
proj := project.NewProject(repo)
proj := project.NewProject(repo, projectDir)
result, err := proj.CheckpointOrExperimentFromPrefix(prefix)
if err != nil {
return err
Expand Down Expand Up @@ -139,7 +139,7 @@ func showExperiment(au aurora.Aurora, out io.Writer, proj *project.Project, exp
fmt.Fprintf(cw, "%s\n", strings.Join(headings, "\t"))

for _, checkpoint := range exp.Checkpoints {
columns := []string{checkpoint.ShortID(), strconv.Itoa(checkpoint.Step), console.FormatTime(checkpoint.Created)}
columns := []string{checkpoint.ShortID(), strconv.FormatInt(checkpoint.Step, 10), console.FormatTime(checkpoint.Created)}
for _, label := range labelNames {
val := checkpoint.Metrics[label]
s := val.ShortString(10, 5)
Expand Down
4 changes: 2 additions & 2 deletions go/pkg/cli/show_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func TestShowCheckpoint(t *testing.T) {

conf := &config.Config{}
repo := createShowTestData(t, workingDir, conf)
proj := project.NewProject(repo)
proj := project.NewProject(repo, workingDir)
result, err := proj.CheckpointOrExperimentFromPrefix("3cc")
require.NoError(t, err)
require.NotNil(t, result.Checkpoint)
Expand Down Expand Up @@ -187,7 +187,7 @@ func TestShowExperiment(t *testing.T) {

conf := &config.Config{}
repo := createShowTestData(t, workingDir, conf)
proj := project.NewProject(repo)
proj := project.NewProject(repo, workingDir)
result, err := proj.CheckpointOrExperimentFromPrefix("1eee")
require.NoError(t, err)
require.NotNil(t, result.Experiment)
Expand Down
Loading

0 comments on commit fceb76b

Please sign in to comment.