Skip to content

Commit

Permalink
Implement the functionality to backfill for extracting node-pack info…
Browse files Browse the repository at this point in the history
…rmation (#108)

Co-authored-by: James Kwon <[email protected]>
  • Loading branch information
james03160927 and james03160927 authored Jan 1, 2025
1 parent 4d8f856 commit d6aa70b
Show file tree
Hide file tree
Showing 23 changed files with 523 additions and 148 deletions.
Empty file removed .env
Empty file.
21 changes: 11 additions & 10 deletions config/config.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
package config

type Config struct {
ProjectID string
DripEnv string
SlackRegistryChannelWebhook string
JWTSecret string
DiscordSecurityChannelWebhook string
ProjectID string
DripEnv string
SlackRegistryChannelWebhook string
JWTSecret string
DiscordSecurityChannelWebhook string
DiscordSecurityPrivateChannelWebhook string
SecretScannerURL string
IDTokenAudience string
AlgoliaAppID string
AlgoliaAPIKey string
CloudStorageBucketName string
SecretScannerURL string
IDTokenAudience string
AlgoliaAppID string
AlgoliaAPIKey string
CloudStorageBucketName string
PubSubTopic string
}
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ services:
ALGOLIA_API_KEY:
SECRET_SCANNER_URL: ""
CLOUD_STORAGE_BUCKET_NAME: "staging-comfy-registry"
PUBSUB_TOPIC: comfy-registry-event-staging
294 changes: 192 additions & 102 deletions drip/api.gen.go

Large diffs are not rendered by default.

100 changes: 100 additions & 0 deletions gateways/pubsub/pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package pubsub

import (
"context"
"encoding/json"
"fmt"
"net/url"
"registry-backend/config"
"strconv"
"strings"
"time"

"cloud.google.com/go/pubsub"
"github.com/rs/zerolog/log"
)

type PubSubService interface {
PublishNodePack(ctx context.Context, storageURL string) error
}

var _ PubSubService = (*pubsubimpl)(nil)

type pubsubimpl struct {
client *pubsub.Client
config *config.Config
topic *pubsub.Topic
}

func NewPubSubService(c *config.Config) (PubSubService, error) {
if c == nil || c.PubSubTopic == "" {
// Return a noop implementation if config is nil or storage is not enabled
log.Info().Msg("No pub sub configuration found, using noop implementation")
return &pubsubNoop{}, nil
}

// Initialize GCP storage client
client, err := pubsub.NewClient(context.Background(), c.ProjectID)
if err != nil {
return nil, fmt.Errorf("NewPubSubService: %v", err)
}
return &pubsubimpl{
client: client,
config: c,
topic: client.Topic(c.PubSubTopic),
}, nil
}

// PublishNodePack implements PubSubService.
func (p *pubsubimpl) PublishNodePack(ctx context.Context, storageURL string) (err error) {
u, err := url.Parse(storageURL)
if err != nil {
return fmt.Errorf("invalid storage URL: %w", err)
}

segments := strings.Split(u.Path, "/")
if len(segments) < 2 {
return fmt.Errorf("invalid storage URL: %w", err)
}
bucket := segments[1]
object := strings.Join(segments[2:], "/")
now := time.Now()
messagePayload := map[string]interface{}{
"kind": "storage#object",
"id": fmt.Sprintf("%s/%s/%d", bucket, object, now.Unix()),
"selfLink": fmt.Sprintf("https://www.googleapis.com/storage/v1/b/%s/o/%s", object, bucket),
"name": object,
"bucket": bucket,
"generation": strconv.FormatInt(time.Now().Unix(), 10),
"metageneration": "1",
"mediaLink": fmt.Sprintf("https://storage.googleapis.com/%s/%s", bucket, object),
}
// Marshal the payload to JSON
jsonData, err := json.Marshal(messagePayload)
if err != nil {
return fmt.Errorf("Failed to marshal JSON: %v", err)
}

result := p.topic.Publish(ctx, &pubsub.Message{
Data: jsonData,
Attributes: map[string]string{
"eventType": "OBJECT_FINALIZE", // Optional attribute for event type
},
})

_, err = result.Get(ctx)
if err != nil {
return fmt.Errorf("Failed to publish message: %v", err)
}

return
}

var _ PubSubService = (*pubsubNoop)(nil)

type pubsubNoop struct{}

// PublishNodePack implements PubSubService.
func (p *pubsubNoop) PublishNodePack(ctx context.Context, storageURL string) error {
return nil
}
62 changes: 62 additions & 0 deletions gateways/pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package pubsub

import (
"context"
"encoding/json"
"fmt"
"os"
"registry-backend/config"
"testing"
"time"

"cloud.google.com/go/pubsub"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestPublish(t *testing.T) {
projectID, ok := os.LookupEnv("PROJECT_ID")
if !ok {
t.Skip("PROJECT_ID is not set")
}

client, err := pubsub.NewClient(context.Background(), projectID)
require.NoError(t, err)

topic := client.Topic(fmt.Sprintf("pubsub-topic-test-%d", time.Now().Unix()))
t.Cleanup(func() {
t.Logf("Deleting topic %s", topic.ID())
topic.Delete(context.Background())
})
client.CreateTopic(context.Background(), topic.ID())

pubsubsvc, err := NewPubSubService(&config.Config{ProjectID: projectID, PubSubTopic: topic.ID()})
require.NoError(t, err)

subscriptionID := fmt.Sprintf("sub-%d", time.Now().Unix())
sub, err := client.CreateSubscription(context.Background(), subscriptionID, pubsub.SubscriptionConfig{
Topic: topic,
AckDeadline: 10 * time.Second,
RetainAckedMessages: false,
})
require.NoError(t, err)

err = pubsubsvc.PublishNodePack(context.Background(), "https://storage.cloud.google.com/testbucket/path1/path2/file.tar.gz")
require.NoError(t, err)
pubsubsvc.(*pubsubimpl).topic.Flush()

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
data := map[string]string{}
err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
t.Log(msg)
assert.NoError(t, json.Unmarshal(msg.Data, &data))
t.Log(data)
msg.Ack()
cancel()
})
<-ctx.Done()
require.NoError(t, err)
assert.Equal(t, "testbucket", data["bucket"])
assert.Equal(t, "path1/path2/file.tar.gz", data["name"])
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.21.5

require (
cloud.google.com/go/monitoring v1.18.0
cloud.google.com/go/pubsub v1.36.1
cloud.google.com/go/storage v1.38.0
entgo.io/ent v0.13.1
firebase.google.com/go v3.13.0+incompatible
Expand Down
16 changes: 6 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@ cloud.google.com/go/firestore v1.14.0 h1:8aLcKnMPoldYU3YHgu4t2exrKhLQkqaXAGqT0lj
cloud.google.com/go/firestore v1.14.0/go.mod h1:96MVaHLsEhbvkBEdZgfN+AS/GIkco1LRpH9Xp9YZfzQ=
cloud.google.com/go/iam v1.1.6 h1:bEa06k05IO4f4uJonbB5iAgKTPpABy1ayxaIZV/GHVc=
cloud.google.com/go/iam v1.1.6/go.mod h1:O0zxdPeGBoFdWW3HWmBxJsk0pfvNM/p/qa82rWOGTwI=
cloud.google.com/go/kms v1.15.7 h1:7caV9K3yIxvlQPAcaFffhlT7d1qpxjB1wHBtjWa13SM=
cloud.google.com/go/kms v1.15.7/go.mod h1:ub54lbsa6tDkUwnu4W7Yt1aAIFLnspgh0kPGToDukeI=
cloud.google.com/go/longrunning v0.5.5 h1:GOE6pZFdSrTb4KAiKnXsJBtlE6mEyaW44oKyMILWnOg=
cloud.google.com/go/longrunning v0.5.5/go.mod h1:WV2LAxD8/rg5Z1cNW6FJ/ZpX4E4VnDnoTk0yawPBB7s=
cloud.google.com/go/monitoring v1.18.0 h1:NfkDLQDG2UR3WYZVQE8kwSbUIEyIqJUPl+aOQdFH1T4=
cloud.google.com/go/monitoring v1.18.0/go.mod h1:c92vVBCeq/OB4Ioyo+NbN2U7tlg5ZH41PZcdvfc+Lcg=
cloud.google.com/go/pubsub v1.36.1 h1:dfEPuGCHGbWUhaMCTHUFjfroILEkx55iUmKBZTP5f+Y=
cloud.google.com/go/pubsub v1.36.1/go.mod h1:iYjCa9EzWOoBiTdd4ps7QoMtMln5NwaZQpK1hbRfBDE=
cloud.google.com/go/storage v1.38.0 h1:Az68ZRGlnNTpIBbLjSMIV2BDcwwXYlRlQzis0llkpJg=
cloud.google.com/go/storage v1.38.0/go.mod h1:tlUADB0mAb9BgYls9lq+8MGkfzOXuLrnHXlpHmvFJoY=
dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk=
Expand Down Expand Up @@ -151,8 +155,6 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rH
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg=
github.com/hashicorp/hcl/v2 v2.19.1 h1://i05Jqznmb2EXqa39Nsvyan2o5XyMowW5fnCKW5RPI=
github.com/hashicorp/hcl/v2 v2.19.1/go.mod h1:ThLC89FV4p9MPW804KVbe/cEXoQ8NZEh+JtMeeGErHE=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/invopop/yaml v0.2.0 h1:7zky/qH+O0DwAyoobXUqvVBwgBFRxKoQ/3FjcVpjTMY=
github.com/invopop/yaml v0.2.0/go.mod h1:2XuRLgs/ouIrW3XNzuNj7J3Nvu/Dig5MXvbCEdiBN3Q=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
Expand Down Expand Up @@ -197,8 +199,6 @@ github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0=
github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-sqlite3 v1.14.16 h1:yOQRA0RpS5PFz/oikGwBEqvAWhWg5ufRz4ETLjwpU1Y=
github.com/mattn/go-sqlite3 v1.14.16/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
github.com/mitchellh/go-wordwrap v1.0.1 h1:TLuKupo69TCn6TQSyGxwI1EblZZEsQ0vMlAFQflz0v0=
Expand All @@ -219,8 +219,6 @@ github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A=
github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc=
github.com/oapi-codegen/runtime v1.1.1 h1:EXLHh0DXIJnWhdRPN2w4MXAzFyE4CskzhNLUmtpMYro=
github.com/oapi-codegen/runtime v1.1.1/go.mod h1:SK9X900oXmPWilYR5/WKPzt3Kqxn/uS/+lbpREv+eCg=
github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/opencontainers/image-spec v1.1.0-rc5 h1:Ygwkfw9bpDvs+c9E34SdgGOj41dX/cbdlwvlWt0pnFI=
Expand Down Expand Up @@ -249,10 +247,6 @@ github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU=
github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/spf13/cobra v1.7.0 h1:hyqWnYt1ZQShIddO5kBpj3vu05/++x6tJ6dg8EC572I=
github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRMrb0=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
Expand Down Expand Up @@ -290,6 +284,8 @@ github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFi
github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
github.com/zclconf/go-cty v1.14.2 h1:kTG7lqmBou0Zkx35r6HJHUQTvaRPr5bIAf3AoHS0izI=
github.com/zclconf/go-cty v1.14.2/go.mod h1:VvMs5i0vgZdhYawQNq5kePSpLAoz8u1xvZgrPIxfnZE=
go.einride.tech/aip v0.66.0 h1:XfV+NQX6L7EOYK11yoHHFtndeaWh3KbD9/cN/6iWEt8=
go.einride.tech/aip v0.66.0/go.mod h1:qAhMsfT7plxBX+Oy7Huol6YUvZ0ZzdUz26yZsQwfl1M=
go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.48.0 h1:P+/g8GpuJGYbOp2tAdKrIPUX9JO02q8Q0YNlHolpibA=
Expand Down
3 changes: 2 additions & 1 deletion integration-tests/ban_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func TestBan(t *testing.T) {

// Initialize the Service
mockStorageService := new(gateways.MockStorageService)
mockPubsubService := new(gateways.MockPubSubService)
mockSlackService := new(gateways.MockSlackService)
mockDiscordService := new(gateways.MockDiscordService)
mockSlackService.
Expand All @@ -37,7 +38,7 @@ func TestBan(t *testing.T) {
Return(nil)

impl := implementation.NewStrictServerImplementation(
client, &config.Config{}, mockStorageService, mockSlackService, mockDiscordService, mockAlgolia)
client, &config.Config{}, mockStorageService, mockPubsubService, mockSlackService, mockDiscordService, mockAlgolia)

authz := drip_authorization.NewAuthorizationManager(client, impl.RegistryService).AuthorizationMiddleware()

Expand Down
3 changes: 2 additions & 1 deletion integration-tests/ci_cd_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func TestCICD(t *testing.T) {
mockStorageService := new(gateways.MockStorageService)
mockSlackService := new(gateways.MockSlackService)
mockDiscordService := new(gateways.MockDiscordService)
mockPubsubService := new(gateways.MockPubSubService)
mockSlackService.
On("SendRegistryMessageToSlack", mock.Anything).
Return(nil) // Do nothing for all slack messsage calls.
Expand All @@ -35,7 +36,7 @@ func TestCICD(t *testing.T) {
On("IndexNodes", mock.Anything, mock.Anything).
Return(nil)
impl := implementation.NewStrictServerImplementation(
client, &config.Config{}, mockStorageService, mockSlackService, mockDiscordService, mockAlgolia)
client, &config.Config{}, mockStorageService, mockPubsubService, mockSlackService, mockDiscordService, mockAlgolia)

ctx := context.Background()
now := time.Now()
Expand Down
44 changes: 30 additions & 14 deletions integration-tests/registry_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,13 @@ type mockedImpl struct {
mockSlackService *gateways.MockSlackService
mockDiscordService *gateways.MockDiscordService
mockAlgolia *gateways.MockAlgoliaService
mockPubsubService *gateways.MockPubSubService
}

func newMockedImpl(client *ent.Client, cfg *config.Config) (impl mockedImpl, authz strictecho.StrictEchoMiddlewareFunc) {
// Initialize the Service
mockStorageService := new(gateways.MockStorageService)
mockPubsubService := new(gateways.MockPubSubService)

mockDiscordService := new(gateways.MockDiscordService)
mockDiscordService.On("SendSecurityCouncilMessage", mock.Anything, mock.Anything).
Expand All @@ -137,11 +139,12 @@ func newMockedImpl(client *ent.Client, cfg *config.Config) (impl mockedImpl, aut

impl = mockedImpl{
DripStrictServerImplementation: implementation.NewStrictServerImplementation(
client, cfg, mockStorageService, mockSlackService, mockDiscordService, mockAlgolia),
client, cfg, mockStorageService, mockPubsubService, mockSlackService, mockDiscordService, mockAlgolia),
mockStorageService: mockStorageService,
mockSlackService: mockSlackService,
mockDiscordService: mockDiscordService,
mockAlgolia: mockAlgolia,
mockPubsubService: mockPubsubService,
}
authz = drip_authorization.NewAuthorizationManager(client, impl.RegistryService).
AuthorizationMiddleware()
Expand Down Expand Up @@ -862,7 +865,6 @@ func TestRegistryComfyNode(t *testing.T) {
Body: pub,
})
require.NoError(t, err, "should return created publisher")
// createdPublisher := (respub.(drip.CreatePublisher201JSONResponse))

tokenName := "test-token-name"
tokenDescription := "test-token-description"
Expand Down Expand Up @@ -894,24 +896,30 @@ func TestRegistryComfyNode(t *testing.T) {
})
require.NoError(t, err, "should not return error")

// create another node version
_, err = withMiddleware(authz, impl.PublishNodeVersion)(ctx, drip.PublishNodeVersionRequestObject{
PublisherId: *pub.Id,
NodeId: *node.Id,
Body: &drip.PublishNodeVersionJSONRequestBody{
PersonalAccessToken: token,
Node: *node,
NodeVersion: *randomNodeVersion(1),
},
})
require.NoError(t, err, "should not return error")
// create another node versions
nodeVersionToBeBackfill := []*drip.NodeVersion{
randomNodeVersion(1),
randomNodeVersion(2),
}
for _, nv := range nodeVersionToBeBackfill {
_, err = withMiddleware(authz, impl.PublishNodeVersion)(ctx, drip.PublishNodeVersionRequestObject{
PublisherId: *pub.Id,
NodeId: *node.Id,
Body: &drip.PublishNodeVersionJSONRequestBody{
PersonalAccessToken: token,
Node: *node,
NodeVersion: *nv,
},
})
require.NoError(t, err, "should not return error")
}

t.Run("NoComfyNode", func(t *testing.T) {
res, err := withMiddleware(authz, impl.GetNodeVersion)(ctx, drip.GetNodeVersionRequestObject{
NodeId: *node.Id,
VersionId: *nodeVersion.Version,
})
require.NoError(t, err, "should return created node version")
require.NoError(t, err, "should not return error")
require.IsType(t, drip.GetNodeVersion200JSONResponse{}, res)
assert.Empty(t, res.(drip.GetNodeVersion200JSONResponse).ComfyNodes)
})
Expand Down Expand Up @@ -1023,4 +1031,12 @@ func TestRegistryComfyNode(t *testing.T) {
}
assert.True(t, found)
})

t.Run("TriggerBackfill", func(t *testing.T) {
impl.mockPubsubService.On("PublishNodePack", mock.Anything, mock.Anything).Return(nil)
res, err := withMiddleware(authz, impl.ComfyNodesBackfill)(ctx, drip.ComfyNodesBackfillRequestObject{})
require.NoError(t, err, "should return created node version")
require.IsType(t, drip.ComfyNodesBackfill204Response{}, res)
impl.mockPubsubService.AssertNumberOfCalls(t, "PublishNodePack", len(nodeVersionToBeBackfill))
})
}
Loading

0 comments on commit d6aa70b

Please sign in to comment.