Skip to content

Commit

Permalink
internal/scan,cmd/ejobs: support GCS module files
Browse files Browse the repository at this point in the history
The user can specify a list of modules by uploading a file to GCS
and providing that file to the analysis/enqueue route.
The ejobs command supports this with the -file flag to start, which
takes a local file and uploads it, as it does with binaries.

There has always been a way to provide a file of modules instead of
reading them from pkgsite, but the filename had to refer to a local
file, making it useful only for local testing or files included in the
deployment. This feature makes it more generally useful.

Change-Id: Iea6b7949e2314b656e36584067e04a6e55b27b90
Reviewed-on: https://go-review.googlesource.com/c/pkgsite-metrics/+/643535
Reviewed-by: Zvonimir Pavlinovic <[email protected]>
LUCI-TryBot-Result: Go LUCI <[email protected]>
  • Loading branch information
jba committed Jan 21, 2025
1 parent 76b4e21 commit 5f32e1b
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 35 deletions.
103 changes: 72 additions & 31 deletions cmd/ejobs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ var (
var (
minImporters int // for start
noDeps bool // for start
moduleFile string // for start
waitInterval time.Duration // for wait
force bool // for results
errs bool // for results
Expand All @@ -64,12 +65,14 @@ var commands = []command{
{"cancel", "JOBID...",
"cancel the jobs",
doCancel, nil},
{"start", "[-min MIN_IMPORTERS] BINARY ARGS...",
{"start", "[-min MIN_IMPORTERS] [-file MODULE_FILE] [-nodeps] BINARY ARGS...",
"start a job",
doStart,
func(fs *flag.FlagSet) {
fs.IntVar(&minImporters, "min", -1,
"run on modules with at least this many importers (<0: use server default of 10)")
fs.StringVar(&moduleFile, "file", "",
"file with modules to use: each line is MODULE_PATH VERSION NUM_IMPORTERS")
fs.BoolVar(&noDeps, "nodeps", false, "do not download dependencies for modules")
},
},
Expand Down Expand Up @@ -268,10 +271,18 @@ func doWait(ctx context.Context, args []string) error {
return nil
}

// GCS folders for types of files.
const (
binaryFolder = "analysis-binaries"
moduleFileFolder = "module-files"
)

func doStart(ctx context.Context, args []string) error {
user := os.Getenv("USER")

// Validate arguments.
if len(args) == 0 {
return errors.New("wrong number of args: want [-min N] [-nodeps] BINARY [ARG1 ARG2 ...]")
return errors.New("wrong number of args: want [-min N] [-file MODULE_FILE] [-nodeps] BINARY [ARG1 ARG2 ...]")
}
binaryFile := args[0]
if fi, err := os.Stat(binaryFile); err != nil {
Expand All @@ -291,25 +302,50 @@ func doStart(ctx context.Context, args []string) error {
return fmt.Errorf("arg %q contains whitespace: not supported", arg)
}
}

// Copy binary to GCS if it's not already there.
if canceled, err := uploadAnalysisBinary(ctx, binaryFile); err != nil {
if _, canceled, err := uploadFile(ctx, binaryFile, binaryFolder); err != nil {
return err
} else if canceled {
return nil
}

// Copy file to GCS if one is given.
modFileFolder := moduleFileFolder
if user != "" {
modFileFolder = path.Join(user, modFileFolder)
}

var gcsPath string
if moduleFile != "" {
var canceled bool
var err error
gcsPath, canceled, err = uploadFile(ctx, moduleFile, modFileFolder)
if err != nil {
return err
}
if canceled {
return nil
}
}

// Ask the server to enqueue scan tasks.
its, err := identityTokenSource(ctx)
if err != nil {
return err
}
u := fmt.Sprintf("%s/analysis/enqueue?binary=%s&user=%s&nodeps=%t",
workerURL, filepath.Base(binaryFile), os.Getenv("USER"), noDeps)
workerURL, filepath.Base(binaryFile), user, noDeps)
if len(binaryArgs) > 0 {
u += fmt.Sprintf("&args=%s", url.QueryEscape(strings.Join(binaryArgs, " ")))
}
if minImporters >= 0 {
u += fmt.Sprintf("&min=%d", minImporters)
}
if gcsPath != "" {
gurl := "gs://" + gcsPath
u += fmt.Sprintf("&file=%s", url.QueryEscape(gurl))
}
if *dryRun {
fmt.Printf("dryrun: GET %s\n", u)
return nil
Expand Down Expand Up @@ -352,54 +388,59 @@ func checkIsLinuxAmd64(binaryFile string) error {
return nil
}

// uploadAnalysisBinary copies binaryFile to the GCS location used for
// analysis binaries. The user can cancel the upload if the file with
// the same name is already on GCS, upon which true is returned. Otherwise,
// false is returned.
// uploadFile copies localFile to the GCS location used for files.
// The GCS bucket is the projectID, defined above.
// The name of the destination object is the join of the remoteFolder and the basename
// of the localFile. For example, file ~/things/data.txt uploaded to folder "stuff" will be written
// to the object "stuff/data.txt".
//
// As an optimization, it skips the upload if the file on GCS has the
// same checksum as the local file.
func uploadAnalysisBinary(ctx context.Context, binaryFile string) (canceled bool, err error) {
// The user can cancel the upload if the file with the same name is already on GCS,
// upon which true is returned. Otherwise, false is returned.
//
// As an optimization, the upload is skipped if the file on GCS has the same checksum as the local file.
func uploadFile(ctx context.Context, localFile, remoteFolder string) (gcsPath string, canceled bool, err error) {
const bucketName = projectID
baseName := filepath.Base(localFile)
objectName := path.Join(remoteFolder, baseName)
gcsPath = path.Join(bucketName, objectName)

if *dryRun {
fmt.Printf("dryrun: upload analysis binary %s\n", binaryFile)
return false, nil
fmt.Printf("dryrun: upload file %s\n", localFile)
return gcsPath, false, nil
}
const bucketName = projectID
binaryName := filepath.Base(binaryFile)
objectName := path.Join("analysis-binaries", binaryName)

ts, err := accessTokenSource(ctx)
if err != nil {
return false, err
return "", false, err
}
c, err := storage.NewClient(ctx, option.WithTokenSource(ts))
if err != nil {
return false, err
return "", false, err
}
defer c.Close()
bucket := c.Bucket(bucketName)
object := bucket.Object(objectName)
attrs, err := object.Attrs(ctx)
if errors.Is(err, storage.ErrObjectNotExist) {
fmt.Printf("%s binary does not exist on GCS: uploading\n", binaryName)
fmt.Printf("%s file does not exist on GCS: uploading\n", baseName)
} else if err != nil {
return false, err
return "", false, err
} else if g, w := len(attrs.MD5), md5.Size; g != w {
return false, fmt.Errorf("len(attrs.MD5) = %d, wanted %d", g, w)
return "", false, fmt.Errorf("len(attrs.MD5) = %d, wanted %d", g, w)

} else {
localMD5, err := fileMD5(binaryFile)
localMD5, err := fileMD5(localFile)
if err != nil {
return false, err
return "", false, err
}
if bytes.Equal(localMD5, attrs.MD5) {
fmt.Printf("Binary %q on GCS has the same checksum: not uploading.\n", binaryName)
return false, nil
fmt.Printf("File %q on GCS has the same checksum: not uploading.\n", baseName)
return gcsPath, false, nil
}
// Ask the users if they want to overwrite the existing binary
// Ask the users if they want to overwrite the existing file
// while providing more info to help them with their decision.
updated := attrs.Updated.In(time.Local).Format(time.RFC1123) // use local time zone
fmt.Printf("The binary %q already exists on GCS.\n", binaryName)
fmt.Printf("The file %q already exists on GCS.\n", baseName)
fmt.Printf("It was last uploaded on %s", updated)
// Communicate uploader info if available.
if uploader := attrs.Metadata[uploaderMetadataKey]; uploader != "" {
Expand All @@ -412,12 +453,12 @@ func uploadAnalysisBinary(ctx context.Context, binaryFile string) (canceled bool
if r := strings.TrimSpace(response); r != "y" && r != "Y" {
// Accept "Y" and "y" as confirmation.
fmt.Println("Cancelling.")
return true, nil
return "", true, nil
}
}
fmt.Printf("Uploading.\n")
if err := copyToGCS(ctx, object, binaryFile); err != nil {
return false, err
if err := copyToGCS(ctx, object, localFile); err != nil {
return "", false, err
}

// Add the uploader information for better messaging in the future.
Expand All @@ -427,7 +468,7 @@ func uploadAnalysisBinary(ctx context.Context, binaryFile string) (canceled bool
// Refetch the object, otherwise attribute uploading won't have effect.
object = bucket.Object(objectName)
object.Update(ctx, toUpdate) // disregard errors
return false, nil
return gcsPath, false, nil
}

// fileMD5 computes the MD5 checksum of the given file.
Expand Down
29 changes: 25 additions & 4 deletions internal/scan/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,17 @@ package scan

import (
"bufio"
"context"
"fmt"
"io"
"net/http"
"net/url"
"os"
"reflect"
"strconv"
"strings"

"cloud.google.com/go/storage"
"golang.org/x/pkgsite-metrics/internal/derrors"
"golang.org/x/pkgsite-metrics/internal/version"
)
Expand All @@ -33,7 +36,7 @@ type ModuleSpec struct {
}

func ParseCorpusFile(filename string, minImportedByCount int) (ms []ModuleSpec, err error) {
defer derrors.Wrap(&err, "parseCorpusFile(%q)", filename)
defer derrors.Wrap(&err, "ParseCorpusFile(%q)", filename)
lines, err := ReadFileLines(filename)
if err != nil {
return nil, err
Expand Down Expand Up @@ -67,11 +70,13 @@ func ParseCorpusFile(filename string, minImportedByCount int) (ms []ModuleSpec,
// ReadFileLines reads and returns the lines from a file.
// Whitespace on each line is trimmed.
// Blank lines and lines beginning with '#' are ignored.
//
// If filename begins with "gs://", it is interpreted as a GCS object.
func ReadFileLines(filename string) (lines []string, err error) {
defer derrors.Wrap(&err, "readFileLines(%q)", filename)
f, err := os.Open(filename)
defer derrors.Wrap(&err, "ReadFileLines(%q)", filename)
f, err := openFile(context.TODO(), filename)
if err != nil {
return nil, err
return nil, fmt.Errorf("openFile(%q): %w", filename, err)
}
defer f.Close()

Expand All @@ -89,6 +94,22 @@ func ReadFileLines(filename string) (lines []string, err error) {
return lines, nil
}

// openFile opens filename for reading and returns an io.ReadCloser for it.
// If filename begins with "gs://", the remainder is interpreted as
// BUCKET/OBJECT and the object is opened from GCS; otherwise filename is
// opened as a local file.
//
// For the GCS case, the returned ReadCloser also closes the underlying
// storage client, so the client is not leaked.
func openFile(ctx context.Context, filename string) (io.ReadCloser, error) {
	if !strings.HasPrefix(filename, "gs://") {
		return os.Open(filename)
	}
	url := strings.TrimPrefix(filename, "gs://")
	bucket, object, found := strings.Cut(url, "/")
	if !found {
		return nil, fmt.Errorf("bad GCS url (no slash): %q", filename)
	}
	c, err := storage.NewClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("storage.NewClient: %w", err)
	}
	r, err := c.Bucket(bucket).Object(object).NewReader(ctx)
	if err != nil {
		// Don't leak the client if we can't create the reader.
		c.Close()
		return nil, fmt.Errorf("NewReader(%q): %w", filename, err)
	}
	return &clientReadCloser{ReadCloser: r, client: c}, nil
}

// clientReadCloser is an io.ReadCloser whose Close also closes the
// storage client that produced the reader.
type clientReadCloser struct {
	io.ReadCloser
	client *storage.Client
}

// Close closes the reader, then the client, returning the first error seen.
func (c *clientReadCloser) Close() error {
	err := c.ReadCloser.Close()
	if cerr := c.client.Close(); err == nil {
		err = cerr
	}
	return err
}

// A ModuleURLPath holds the components of a URL path parsed
// as module, version and suffix.
type ModuleURLPath struct {
Expand Down
25 changes: 25 additions & 0 deletions internal/scan/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,19 @@
package scan

import (
"flag"
"net/http"
"reflect"
"slices"
"strings"
"testing"

"github.com/google/go-cmp/cmp"
"golang.org/x/pkgsite-metrics/internal/version"
)

var useGCS = flag.Bool("gcs", false, "use GCS in tests")

func TestParseModuleURLPath(t *testing.T) {
for _, test := range []struct {
path string
Expand Down Expand Up @@ -120,6 +124,27 @@ func TestParseCorpusFile(t *testing.T) {
}
}

// TestReadFileLines exercises the GCS ("gs://") path of ReadFileLines
// against a fixture object in the go-ecosystem bucket.
func TestReadFileLines(t *testing.T) {
	// Effectively tested for local files by ParseCorpusFile.
	// So just test for GCS.
	// This doesn't work in CI (no GCS credentials), so protect it with a flag.
	if !*useGCS {
		t.Skip("need -gcs")
	}
	got, err := ReadFileLines("gs://go-ecosystem/test-modfile")
	if err != nil {
		t.Fatal(err)
	}
	// Expected contents of the fixture: MODULE_PATH VERSION NUM_IMPORTERS lines.
	want := []string{
		"mod1 v1.2.3 5",
		"mod2 v1.0.0 10",
		"mod3/v2 v2.1.2 0",
	}
	if !slices.Equal(got, want) {
		t.Errorf("\ngot %v\nwant %v", got, want)
	}
}

type params struct {
Str string
Int int
Expand Down

0 comments on commit 5f32e1b

Please sign in to comment.