diff --git a/cmd/ejobs/main.go b/cmd/ejobs/main.go index 41de5b3..931b7fd 100644 --- a/cmd/ejobs/main.go +++ b/cmd/ejobs/main.go @@ -48,6 +48,7 @@ var ( var ( minImporters int // for start noDeps bool // for start + moduleFile string // for start waitInterval time.Duration // for wait force bool // for results errs bool // for results @@ -64,12 +65,14 @@ var commands = []command{ {"cancel", "JOBID...", "cancel the jobs", doCancel, nil}, - {"start", "[-min MIN_IMPORTERS] BINARY ARGS...", + {"start", "[-min MIN_IMPORTERS] [-file MODULE_FILE] [-nodeps] BINARY ARGS...", "start a job", doStart, func(fs *flag.FlagSet) { fs.IntVar(&minImporters, "min", -1, "run on modules with at least this many importers (<0: use server default of 10)") + fs.StringVar(&moduleFile, "file", "", + "file with modules to use: each line is MODULE_PATH VERSION NUM_IMPORTERS") fs.BoolVar(&noDeps, "nodeps", false, "do not download dependencies for modules") }, }, @@ -268,10 +271,18 @@ func doWait(ctx context.Context, args []string) error { return nil } +// GCS folders for types of files. +const ( + binaryFolder = "analysis-binaries" + moduleFileFolder = "module-files" +) + func doStart(ctx context.Context, args []string) error { + user := os.Getenv("USER") + // Validate arguments. if len(args) == 0 { - return errors.New("wrong number of args: want [-min N] [-nodeps] BINARY [ARG1 ARG2 ...]") + return errors.New("wrong number of args: want [-min N] [-file MODULE_FILE] [-nodeps] BINARY [ARG1 ARG2 ...]") } binaryFile := args[0] if fi, err := os.Stat(binaryFile); err != nil { @@ -291,25 +302,50 @@ func doStart(ctx context.Context, args []string) error { return fmt.Errorf("arg %q contains whitespace: not supported", arg) } } + // Copy binary to GCS if it's not already there. 
- if canceled, err := uploadAnalysisBinary(ctx, binaryFile); err != nil { + if _, canceled, err := uploadFile(ctx, binaryFile, binaryFolder); err != nil { return err } else if canceled { return nil } + + // Copy file to GCS if one is given. + modFileFolder := moduleFileFolder + if user != "" { + modFileFolder = path.Join(user, modFileFolder) + } + + var gcsPath string + if moduleFile != "" { + var canceled bool + var err error + gcsPath, canceled, err = uploadFile(ctx, moduleFile, modFileFolder) + if err != nil { + return err + } + if canceled { + return nil + } + } + // Ask the server to enqueue scan tasks. its, err := identityTokenSource(ctx) if err != nil { return err } u := fmt.Sprintf("%s/analysis/enqueue?binary=%s&user=%s&nodeps=%t", - workerURL, filepath.Base(binaryFile), os.Getenv("USER"), noDeps) + workerURL, filepath.Base(binaryFile), user, noDeps) if len(binaryArgs) > 0 { u += fmt.Sprintf("&args=%s", url.QueryEscape(strings.Join(binaryArgs, " "))) } if minImporters >= 0 { u += fmt.Sprintf("&min=%d", minImporters) } + if gcsPath != "" { + gurl := "gs://" + gcsPath + u += fmt.Sprintf("&file=%s", url.QueryEscape(gurl)) + } if *dryRun { fmt.Printf("dryrun: GET %s\n", u) return nil @@ -352,54 +388,59 @@ func checkIsLinuxAmd64(binaryFile string) error { return nil } -// uploadAnalysisBinary copies binaryFile to the GCS location used for -// analysis binaries. The user can cancel the upload if the file with -// the same name is already on GCS, upon which true is returned. Otherwise, -// false is returned. +// uploadFile copies localFile to the GCS location used for files. +// The GCS bucket is the projectID, defined above. +// The name of the destination object is the join of the remoteFolder and the basename +// of the localFile. For example, file ~/things/data.txt uploaded to folder "stuff" will be written +// to the object "stuff/data.txt". // -// As an optimization, it skips the upload if the file on GCS has the -// same checksum as the local file. 
-func uploadAnalysisBinary(ctx context.Context, binaryFile string) (canceled bool, err error) { +// The user can cancel the upload if the file with the same name is already on GCS, +// upon which true is returned. Otherwise, false is returned. +// +// As an optimization, the upload is skipped if the file on GCS has the same checksum as the local file. +func uploadFile(ctx context.Context, localFile, remoteFolder string) (gcsPath string, canceled bool, err error) { + const bucketName = projectID + baseName := filepath.Base(localFile) + objectName := path.Join(remoteFolder, baseName) + gcsPath = path.Join(bucketName, objectName) + if *dryRun { - fmt.Printf("dryrun: upload analysis binary %s\n", binaryFile) - return false, nil + fmt.Printf("dryrun: upload file %s\n", localFile) + return gcsPath, false, nil } - const bucketName = projectID - binaryName := filepath.Base(binaryFile) - objectName := path.Join("analysis-binaries", binaryName) ts, err := accessTokenSource(ctx) if err != nil { - return false, err + return "", false, err } c, err := storage.NewClient(ctx, option.WithTokenSource(ts)) if err != nil { - return false, err + return "", false, err } defer c.Close() bucket := c.Bucket(bucketName) object := bucket.Object(objectName) attrs, err := object.Attrs(ctx) if errors.Is(err, storage.ErrObjectNotExist) { - fmt.Printf("%s binary does not exist on GCS: uploading\n", binaryName) + fmt.Printf("%s file does not exist on GCS: uploading\n", baseName) } else if err != nil { - return false, err + return "", false, err } else if g, w := len(attrs.MD5), md5.Size; g != w { - return false, fmt.Errorf("len(attrs.MD5) = %d, wanted %d", g, w) + return "", false, fmt.Errorf("len(attrs.MD5) = %d, wanted %d", g, w) } else { - localMD5, err := fileMD5(binaryFile) + localMD5, err := fileMD5(localFile) if err != nil { - return false, err + return "", false, err } if bytes.Equal(localMD5, attrs.MD5) { - fmt.Printf("Binary %q on GCS has the same checksum: not uploading.\n", 
binaryName) - return false, nil + fmt.Printf("File %q on GCS has the same checksum: not uploading.\n", baseName) + return gcsPath, false, nil } - // Ask the users if they want to overwrite the existing binary + // Ask the users if they want to overwrite the existing file // while providing more info to help them with their decision. updated := attrs.Updated.In(time.Local).Format(time.RFC1123) // use local time zone - fmt.Printf("The binary %q already exists on GCS.\n", binaryName) + fmt.Printf("The file %q already exists on GCS.\n", baseName) fmt.Printf("It was last uploaded on %s", updated) // Communicate uploader info if available. if uploader := attrs.Metadata[uploaderMetadataKey]; uploader != "" { @@ -412,12 +453,12 @@ func uploadAnalysisBinary(ctx context.Context, binaryFile string) (canceled bool if r := strings.TrimSpace(response); r != "y" && r != "Y" { // Accept "Y" and "y" as confirmation. fmt.Println("Cancelling.") - return true, nil + return "", true, nil } } fmt.Printf("Uploading.\n") - if err := copyToGCS(ctx, object, binaryFile); err != nil { - return false, err + if err := copyToGCS(ctx, object, localFile); err != nil { + return "", false, err } // Add the uploader information for better messaging in the future. @@ -427,7 +468,7 @@ func uploadAnalysisBinary(ctx context.Context, binaryFile string) (canceled bool // Refetch the object, otherwise attribute uploading won't have effect. object = bucket.Object(objectName) object.Update(ctx, toUpdate) // disregard errors - return false, nil + return gcsPath, false, nil } // fileMD5 computes the MD5 checksum of the given file. 
diff --git a/internal/scan/parse.go b/internal/scan/parse.go index cfdfec4..f50f4bb 100644 --- a/internal/scan/parse.go +++ b/internal/scan/parse.go @@ -7,7 +7,9 @@ package scan import ( "bufio" + "context" "fmt" + "io" "net/http" "net/url" "os" @@ -15,6 +17,7 @@ import ( "strconv" "strings" + "cloud.google.com/go/storage" "golang.org/x/pkgsite-metrics/internal/derrors" "golang.org/x/pkgsite-metrics/internal/version" ) @@ -33,7 +36,7 @@ type ModuleSpec struct { } func ParseCorpusFile(filename string, minImportedByCount int) (ms []ModuleSpec, err error) { - defer derrors.Wrap(&err, "parseCorpusFile(%q)", filename) + defer derrors.Wrap(&err, "ParseCorpusFile(%q)", filename) lines, err := ReadFileLines(filename) if err != nil { return nil, err @@ -67,11 +70,13 @@ func ParseCorpusFile(filename string, minImportedByCount int) (ms []ModuleSpec, // ReadFileLines reads and returns the lines from a file. // Whitespace on each line is trimmed. // Blank lines and lines beginning with '#' are ignored. +// +// If filename begins "gs://", it is interpreted as a GCS object. 
func ReadFileLines(filename string) (lines []string, err error) { - defer derrors.Wrap(&err, "readFileLines(%q)", filename) - f, err := os.Open(filename) + defer derrors.Wrap(&err, "ReadFileLines(%q)", filename) + f, err := openFile(context.TODO(), filename) if err != nil { - return nil, err + return nil, fmt.Errorf("openFile(%q): %w", filename, err) } defer f.Close() @@ -89,6 +94,22 @@ func ReadFileLines(filename string) (lines []string, err error) { return lines, nil } +func openFile(ctx context.Context, filename string) (io.ReadCloser, error) { + if !strings.HasPrefix(filename, "gs://") { + return os.Open(filename) + } + url := strings.TrimPrefix(filename, "gs://") + bucket, object, found := strings.Cut(url, "/") + if !found { + return nil, fmt.Errorf("bad GCS url (no slash): %q", filename) + } + c, err := storage.NewClient(ctx) + if err != nil { + return nil, fmt.Errorf("storage.NewClient: %w", err) + } + return c.Bucket(bucket).Object(object).NewReader(ctx) +} + // A ModuleURLPath holds the components of a URL path parsed // as module, version and suffix. type ModuleURLPath struct { diff --git a/internal/scan/parse_test.go b/internal/scan/parse_test.go index 0960cc3..201dff0 100644 --- a/internal/scan/parse_test.go +++ b/internal/scan/parse_test.go @@ -5,8 +5,10 @@ package scan import ( + "flag" "net/http" "reflect" + "slices" "strings" "testing" @@ -14,6 +16,8 @@ import ( "golang.org/x/pkgsite-metrics/internal/version" ) +var useGCS = flag.Bool("gcs", false, "use GCS in tests") + func TestParseModuleURLPath(t *testing.T) { for _, test := range []struct { path string @@ -120,6 +124,27 @@ func TestParseCorpusFile(t *testing.T) { } } +func TestReadFileLines(t *testing.T) { + // Effectively tested for local files by ParseCorpusFile. + // So just test for GCS. + // This doesn't work in CI so protect it with a flag. 
+ if !*useGCS { + t.Skip("need -gcs") + } + got, err := ReadFileLines("gs://go-ecosystem/test-modfile") + if err != nil { + t.Fatal(err) + } + want := []string{ + "mod1 v1.2.3 5", + "mod2 v1.0.0 10", + "mod3/v2 v2.1.2 0", + } + if !slices.Equal(got, want) { + t.Errorf("\ngot %v\nwant %v", got, want) + } +} + type params struct { Str string Int int