Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move r2-pump from GCP function to binary #283

Merged
merged 1 commit into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,16 @@ jobs:
asset_name: git-sync
asset_content_type: application/octet-stream

- uses: actions/upload-release-asset@v1
name: release r2-pump
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
upload_url: ${{ steps.create_release.outputs.upload_url }}
asset_path: ./bin/r2-pump
asset_name: r2-pump
asset_content_type: application/octet-stream

test:
name: Test
runs-on: ubuntu-latest
Expand Down
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ define generate-func-make
endef

.PHONY: all
all: bin/process-version-host bin/git-sync bin/checker \
all: bin/process-version-host bin/git-sync bin/checker bin/r2-pump \
;$(foreach n,${CLOUD_FUNCTIONS},$(call generate-func-make,$n))

bin/checker:
Expand All @@ -22,6 +22,9 @@ bin/process-version-host:
bin/process-version:
go build $(GO_BUILD_ARGS) -o bin/process-version ./cmd/process-version

bin/r2-pump:
go build $(GO_BUILD_ARGS) -o bin/r2-pump ./cmd/r2-pump

.PHONY: schema
schema:
./bin/packages human > schema_human.json
Expand Down
4 changes: 2 additions & 2 deletions audit/audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,14 +187,14 @@ func WroteAlgolia(ctx context.Context, pkgName string, currVersion string, lastV
return nil
}

func WroteR2(ctx context.Context, pkgName string, version string, keys []string, ext string) error {
func WroteR2(ctx context.Context, pkgName string, version string, keys []string) error {
content := bytes.NewBufferString("")
fmt.Fprint(content, "Files:\n")
for _, key := range keys {
fmt.Fprintf(content, "- %s\n", key)
}

if err := create(ctx, pkgName, version, "r2-publish/"+ext, content); err != nil {
if err := create(ctx, pkgName, version, "r2-publish", content); err != nil {
return errors.Wrap(err, "could not create audit log file")
}
return nil
Expand Down
201 changes: 201 additions & 0 deletions cmd/r2-pump/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
package main

import (
"bytes"
"context"
b64 "encoding/base64"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"runtime"
"time"

"github.com/pkg/errors"

"github.com/cdnjs/tools/audit"
"github.com/cdnjs/tools/gcp"
"github.com/cdnjs/tools/metrics"
"github.com/cdnjs/tools/packages"
"github.com/cdnjs/tools/sentry"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/s3"

"cloud.google.com/go/pubsub"
)

var (
PROJECT = os.Getenv("PROJECT")
SUBSCRIPTION = os.Getenv("SUBSCRIPTION")

R2_BUCKET = os.Getenv("R2_BUCKET")
R2_KEY_ID = os.Getenv("R2_KEY_ID")
R2_KEY_SECRET = os.Getenv("R2_KEY_SECRET")
R2_ENDPOINT = os.Getenv("R2_ENDPOINT")
)

func init() {
sentry.Init()
}

func main() {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, PROJECT)
if err != nil {
log.Fatalf("could not create pubsub Client: %v", err)
}
sub := client.Subscription(SUBSCRIPTION)
sub.ReceiveSettings.Synchronous = true
sub.ReceiveSettings.MaxOutstandingMessages = 5
sub.ReceiveSettings.NumGoroutines = runtime.NumCPU()

for {
log.Printf("started consuming messages\n")
if err := consume(ctx, client, sub); err != nil {
log.Fatalf("could not pull messages: %s", err)
}
}
}

type Message struct {
GCSEvent gcp.GCSEvent `json:"gcsEvent"`
}

func consume(ctx context.Context, client *pubsub.Client, sub *pubsub.Subscription) error {
err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
log.Printf("received message: %s\n", msg.Data)

msg.Ack()
if err := processMessage(ctx, msg.Data); err != nil {
log.Printf("failed to process message: %s\n", err)
}
})
if err != nil {
return errors.Wrap(err, "could not receive from subscription")
}
return nil
}

func processMessage(ctx context.Context, data []byte) error {
var message Message
if err := json.Unmarshal(data, &message); err != nil {
return errors.Wrap(err, "failed to parse")
}

return invoke(ctx, message.GCSEvent)
}

func invoke(ctx context.Context, e gcp.GCSEvent) error {
sentry.Init()
defer sentry.PanicHandler()

pkgName := e.Metadata["package"].(string)
version := e.Metadata["version"].(string)
log.Printf("Invoke %s %s\n", pkgName, version)

configStr, err := b64.StdEncoding.DecodeString(e.Metadata["config"].(string))
if err != nil {
return fmt.Errorf("could not decode config: %v", err)
}

archive, err := gcp.ReadObject(ctx, e.Bucket, e.Name)
if err != nil {
return fmt.Errorf("could not read object: %v", err)
}

bucket := aws.String(R2_BUCKET)

r2Resolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
return aws.Endpoint{
URL: R2_ENDPOINT,
}, nil
})

cfg, err := config.LoadDefaultConfig(ctx,
config.WithEndpointResolverWithOptions(r2Resolver),
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(R2_KEY_ID, R2_KEY_SECRET, "")),
)
if err != nil {
return fmt.Errorf("could not load config: %s", err)
}

s3Client := s3.NewFromConfig(cfg)

keys := make([]string, 0)

onFile := func(name string, r io.Reader) error {
// remove leading slash
name = name[1:]
key := fmt.Sprintf("%s/%s/%s", pkgName, version, name)

content, err := ioutil.ReadAll(r)
if err != nil {
return errors.Wrap(err, "could not read file")
}

keys = append(keys, key)

meta := newMetadata(len(content))

s3Object := s3.PutObjectInput{
Body: bytes.NewReader(content),
Bucket: bucket,
Key: aws.String(key),
Metadata: meta,
}
if err := uploadFile(ctx, s3Client, &s3Object); err != nil {
return errors.Wrap(err, "failed to upload file")
}
return nil
}
if err := gcp.Inflate(bytes.NewReader(archive), onFile); err != nil {
return fmt.Errorf("could not inflate archive: %s", err)
}

if len(keys) == 0 {
log.Printf("%s: no files to publish\n", pkgName)
}

pkg := new(packages.Package)
if err := json.Unmarshal([]byte(configStr), &pkg); err != nil {
return fmt.Errorf("failed to parse config: %s", err)
}

if err := audit.WroteR2(ctx, pkgName, version, keys); err != nil {
log.Printf("failed to audit: %s\n", err)
}
if err := metrics.NewUpdatePublishedR2(); err != nil {
return errors.Wrap(err, "could not report metrics")
}

return nil
}

func newMetadata(size int) map[string]string {
lastModifiedTime := time.Now()
lastModifiedSeconds := lastModifiedTime.UnixNano() / int64(time.Second)
lastModifiedStr := lastModifiedTime.Format(http.TimeFormat)
etag := fmt.Sprintf("%x-%x", lastModifiedSeconds, size)

meta := make(map[string]string)

// https://github.com/cdnjs/origin-worker/blob/ff91d30586c9e924ff919407401dff6f52826b4d/src/index.js#L212-L213
meta["etag"] = etag
meta["last_modified"] = lastModifiedStr

return meta
}

func uploadFile(ctx context.Context, s3Client *s3.Client, obj *s3.PutObjectInput) error {
if _, err := s3Client.PutObject(ctx, obj); err != nil {
return errors.Wrapf(err, "failed to put Object %s", *obj.Key)
}

return nil
}
30 changes: 3 additions & 27 deletions functions/r2-pump/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,46 +5,22 @@ go 1.20
replace github.com/cdnjs/tools => ../../

require (
github.com/aws/aws-sdk-go v1.44.282
github.com/aws/aws-sdk-go-v2 v1.18.1
github.com/aws/aws-sdk-go-v2/config v1.18.27
github.com/aws/aws-sdk-go-v2/credentials v1.13.26
github.com/aws/aws-sdk-go-v2/service/s3 v1.35.0
cloud.google.com/go/pubsub v1.10.3
github.com/cdnjs/tools v0.0.0-00010101000000-000000000000
github.com/cloudflare/cloudflare-go v0.69.0
github.com/pkg/errors v0.9.1
)

require (
cloud.google.com/go v0.81.0 // indirect
cloud.google.com/go/storage v1.15.0 // indirect
github.com/agnivade/levenshtein v1.1.1 // indirect
github.com/algolia/algoliasearch-client-go/v3 v3.4.0 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.4 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.34 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.28 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.35 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.26 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.11 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.29 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.28 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.14.3 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.12.12 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.12 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.19.2 // indirect
github.com/aws/smithy-go v1.13.5 // indirect
github.com/blang/semver v3.5.1+incompatible // indirect
github.com/getsentry/sentry-go v0.6.1 // indirect
github.com/gobwas/glob v0.2.3 // indirect
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/go-github v17.0.0+incompatible // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/google/go-cmp v0.5.8 // indirect
github.com/googleapis/gax-go/v2 v2.0.5 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-retryablehttp v0.7.3 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/jstemmer/go-junit-report v0.9.1 // indirect
github.com/karrick/godirwalk v1.15.6 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f // indirect
Expand All @@ -55,9 +31,9 @@ require (
golang.org/x/mod v0.14.0 // indirect
golang.org/x/net v0.20.0 // indirect
golang.org/x/oauth2 v0.0.0-20210413134643-5e61552d6c78 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.17.0 // indirect
google.golang.org/api v0.45.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
Expand Down
Loading
Loading