From d6aa70b871ce8fe4862113f732eb47f9265f6592 Mon Sep 17 00:00:00 2001 From: James Kwon <96548424+james03160927@users.noreply.github.com> Date: Wed, 1 Jan 2025 12:03:41 -0500 Subject: [PATCH] Implement the functionality to backfill for extracting node-pack information (#108) Co-authored-by: James Kwon <96548424+hongil0316@users.noreply.github.com> --- .env | 0 config/config.go | 21 +- docker-compose.yml | 1 + drip/api.gen.go | 294 ++++++++++++------ gateways/pubsub/pubsub.go | 100 ++++++ gateways/pubsub/pubsub_test.go | 62 ++++ go.mod | 1 + go.sum | 16 +- integration-tests/ban_test.go | 3 +- integration-tests/ci_cd_integration_test.go | 3 +- .../registry_integration_test.go | 44 ++- main.go | 2 + mock/gateways/mock_pubsub_service.go | 20 ++ node-pack-extract/cloudbuild.yaml | 2 +- openapi.yml | 30 +- run-service-prod.yaml | 2 + run-service-staging.yaml | 2 + server/implementation/api.implementation.go | 4 +- server/implementation/registry.go | 12 + .../authentication/service_account_auth.go | 1 + .../service_account_auth_test.go | 1 + server/server.go | 12 +- services/registry/registry_svc.go | 38 ++- 23 files changed, 523 insertions(+), 148 deletions(-) delete mode 100644 .env create mode 100644 gateways/pubsub/pubsub.go create mode 100644 gateways/pubsub/pubsub_test.go create mode 100644 mock/gateways/mock_pubsub_service.go diff --git a/.env b/.env deleted file mode 100644 index e69de29..0000000 diff --git a/config/config.go b/config/config.go index 4ae457b..0927c0b 100644 --- a/config/config.go +++ b/config/config.go @@ -1,15 +1,16 @@ package config type Config struct { - ProjectID string - DripEnv string - SlackRegistryChannelWebhook string - JWTSecret string - DiscordSecurityChannelWebhook string + ProjectID string + DripEnv string + SlackRegistryChannelWebhook string + JWTSecret string + DiscordSecurityChannelWebhook string DiscordSecurityPrivateChannelWebhook string - SecretScannerURL string - IDTokenAudience string - AlgoliaAppID string - AlgoliaAPIKey string - CloudStorageBucketName string + SecretScannerURL string + IDTokenAudience string + AlgoliaAppID string + AlgoliaAPIKey string + CloudStorageBucketName string + PubSubTopic string } diff --git a/docker-compose.yml b/docker-compose.yml index 741c9e3..7f95539 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -22,3 +22,4 @@ services: ALGOLIA_API_KEY: SECRET_SCANNER_URL: "" CLOUD_STORAGE_BUCKET_NAME: "staging-comfy-registry" + PUBSUB_TOPIC: comfy-registry-event-staging diff --git a/drip/api.gen.go b/drip/api.gen.go index 0a748b4..a6af079 100644 --- a/drip/api.gen.go +++ b/drip/api.gen.go @@ -658,6 +658,9 @@ type ServerInterface interface { // Retrieve all distinct branches for a given repo // (GET /branch) GetBranch(ctx echo.Context, params GetBranchParams) error + // trigger comfy nodes backfill + // (POST /comfy-nodes/backfill) + ComfyNodesBackfill(ctx echo.Context) error // Retrieve CI data for a given commit // (GET /gitcommit) GetGitcommit(ctx echo.Context, params GetGitcommitParams) error @@ -823,6 +826,15 @@ func (w *ServerInterfaceWrapper) GetBranch(ctx echo.Context) error { return err } +// ComfyNodesBackfill converts echo context to params. +func (w *ServerInterfaceWrapper) ComfyNodesBackfill(ctx echo.Context) error { + var err error + + // Invoke the callback with all the unmarshaled arguments + err = w.Handler.ComfyNodesBackfill(ctx) + return err +} + // GetGitcommit converts echo context to params. func (w *ServerInterfaceWrapper) GetGitcommit(ctx echo.Context) error { var err error @@ -1749,6 +1761,7 @@ func RegisterHandlersWithBaseURL(router EchoRouter, si ServerInterface, baseURL router.PUT(baseURL+"/admin/nodes/:nodeId/versions/:versionNumber", wrapper.AdminUpdateNodeVersion) router.GET(baseURL+"/branch", wrapper.GetBranch) + router.POST(baseURL+"/comfy-nodes/backfill", wrapper.ComfyNodesBackfill) router.GET(baseURL+"/gitcommit", wrapper.GetGitcommit) router.GET(baseURL+"/gitcommitsummary", wrapper.GetGitcommitsummary) router.GET(baseURL+"/nodes", wrapper.ListAllNodes) @@ -1888,6 +1901,56 @@ func (response GetBranch500Response) VisitGetBranchResponse(w http.ResponseWrite return nil } +type ComfyNodesBackfillRequestObject struct { +} + +type ComfyNodesBackfillResponseObject interface { + VisitComfyNodesBackfillResponse(w http.ResponseWriter) error +} + +type ComfyNodesBackfill204Response struct { +} + +func (response ComfyNodesBackfill204Response) VisitComfyNodesBackfillResponse(w http.ResponseWriter) error { + w.WriteHeader(204) + return nil +} + +type ComfyNodesBackfill400JSONResponse ErrorResponse + +func (response ComfyNodesBackfill400JSONResponse) VisitComfyNodesBackfillResponse(w http.ResponseWriter) error { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(400) + + return json.NewEncoder(w).Encode(response) +} + +type ComfyNodesBackfill401Response struct { +} + +func (response ComfyNodesBackfill401Response) VisitComfyNodesBackfillResponse(w http.ResponseWriter) error { + w.WriteHeader(401) + return nil +} + +type ComfyNodesBackfill403JSONResponse ErrorResponse + +func (response ComfyNodesBackfill403JSONResponse) VisitComfyNodesBackfillResponse(w http.ResponseWriter) error { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(403) + + return json.NewEncoder(w).Encode(response) +} + +type ComfyNodesBackfill500JSONResponse ErrorResponse + +func (response ComfyNodesBackfill500JSONResponse) VisitComfyNodesBackfillResponse(w http.ResponseWriter) error { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(500) + + return json.NewEncoder(w).Encode(response) +} + type GetGitcommitRequestObject struct { Params GetGitcommitParams } @@ -3578,6 +3641,9 @@ type StrictServerInterface interface { // Retrieve all distinct branches for a given repo // (GET /branch) GetBranch(ctx context.Context, request GetBranchRequestObject) (GetBranchResponseObject, error) + // trigger comfy nodes backfill + // (POST /comfy-nodes/backfill) + ComfyNodesBackfill(ctx context.Context, request ComfyNodesBackfillRequestObject) (ComfyNodesBackfillResponseObject, error) // Retrieve CI data for a given commit // (GET /gitcommit) GetGitcommit(ctx context.Context, request GetGitcommitRequestObject) (GetGitcommitResponseObject, error) @@ -3763,6 +3829,29 @@ func (sh *strictHandler) GetBranch(ctx echo.Context, params GetBranchParams) err return nil } +// ComfyNodesBackfill operation middleware +func (sh *strictHandler) ComfyNodesBackfill(ctx echo.Context) error { + var request ComfyNodesBackfillRequestObject + + handler := func(ctx echo.Context, request interface{}) (interface{}, error) { + return sh.ssi.ComfyNodesBackfill(ctx.Request().Context(), request.(ComfyNodesBackfillRequestObject)) + } + for _, middleware := range sh.middlewares { + handler = middleware(handler, "ComfyNodesBackfill") + } + + response, err := handler(ctx, request) + + if err != nil { + return err + } else if validResponse, ok := response.(ComfyNodesBackfillResponseObject); ok { + return validResponse.VisitComfyNodesBackfillResponse(ctx.Response()) + } else if response != nil { + return fmt.Errorf("unexpected response type: %T", response) + } + return nil +} + // GetGitcommit operation middleware func (sh *strictHandler) GetGitcommit(ctx echo.Context, params GetGitcommitParams) error { var request GetGitcommitRequestObject @@ -4776,108 +4865,109 @@ func (sh *strictHandler) GetWorkflowResult(ctx echo.Context, workflowResultId st // Base64 encoded, gzipped, json marshaled Swagger object var swaggerSpec = []string{ - "H4sIAAAAAAAC/+x9fW/bttb4VyH8u8BW/Bw73e29eJD/0mTtsi1pkDQddtc+vrREy2wlUiOppF6Q7/6A", - "bxIlkbLsvNmdgWFtTerwkDzvPOS5HUQ0yylBRPDBwe2AR3OUQfXXw0hgSn6m0wvEi1TIn3JGc8QERqoD", - "VB0mn+l0gmP5Q4x4xHAufx0cDE5iRASeYcQAnQExR+AznQIxxxwwBRFMUUpJwoGgg+FALHI0OBhwwTBJ", - "BndDC54VpCd4VpBVwBdiTlkb7vs5ArrNwo1olmHhhXGdTK4ZzAJQrhGDCQIfLg5PQcFRDKYLuw4VNEwE", - "ShCT4KYMkmg+ITBDbYhnMEPlTFGKriERIMEC6K986EU0my3UAs5SmHA/lqqTWjvVaQR+HCUj8N+9vZTe", - "7MnJ/TcAOsNiMod87gcrW5YvoAHj218J5OS4N4gMcQ4T5IdjGnsDEzgLQLoi+CuQzVzALAc3c0QckOAG", - "cpDBGA2GgxllGRR6g//9yrvfURHDyTViXMFvDnd0dXwITKsiHx/CiMQd2CISK2RdBoQcwMY0Rv3QTbCY", - "MJRT/1iyhWNB2QIoAvYg69vmK4L/LBDAFTfPKCuR1ZzsolcU2LsQUgoJhpMEsUnBUYCxZQu4mVNguqLY", - "DjXyAc1gNMcETbiAWjz+g6HZ4GDw/8aV1BwbkTk+1Z0vVd+74UCKSigwSSZ8wQXyCIl3tgfQPYK7nCP4", - "pUPQyOb+UiZnE1Jk09AS5UWaAob+LBAXwHT0obQQc0rC1Hu+eE9ZNF9KwPlCyH73B8QFZF2cq9ofkBsk", - "TRRLieI3yr7MUnpzUZBL/YH6lErFMJnhFC0DcKn7vpFd74aDGwOvh5KwXduLdVf+QqefUSQk4COpBs5o", - "jNp6PoICJZQtPMx7AmyjFIUMqYEJjRHAHKSYCxQPNVFKrk4YLXJJ7rIHH4VVlmyfrCYt5Cc+gDHKGZJI", - "+kwIEmPZxAGe1TCvPhqB4/LvGm0AGQJzHMeIAKyF/9WJM5cppSmCRI/tjNYc/DXDaAac3+y+yVG+42BW", - "EGUAwRSLBaAM5AXLKUfeZUNfc8RwhoiA6QrzdD8bAl4oagCCgmgOSYK4HJahjF7D1D9Di2Q3JSIi2GIv", - "p5iIclpyFPQVRYWoiMY7NUzyQkzkzx4D5hjNMJHzkp1ADhnMkECM+wDRQkhImE8kYXo2RE8LXMO0UCDV", - "oklyxTOAYDQHGoJcOKiIW+KLBcoUYu3FMb9AxuBC/pshUTCiOJf7F4zbFdMjcUXdUQqZJAFMSpbmvpGr", - "mQYGDqzhZY4iyUtcjaw6STQsCjmjcRFVaiWwUT6Z8iNj2sKuy5MYCYhT7tOHmuCB7oFigImWxJJg4JQW", - "QtOThCtpc46JWSSGOE2v1WaJ1RYnaDgegihFkAFIYhBREmGOfOyqkFllQS4QzynhHkGL7Hq1DZEKyfYw", - "UlljJgXcHwZE9cEnDxZvsThS5uplkWVQy/WGZ/cAvlGnMyMB6Q6O3mjb0I/sdoRxI470CoPRJsCEV8sI", - "4xhrGj6vLahHLdVJLYO5YrqmRSgo0KOAHGJXrlXbWRou/qms5qzEUKA9ZUT1Iuiazds2HPJiEsEcRlh4", - "jIf3VMAUHJ1fAarRMta2VwvEmH9ZCkt2ArZTH6hJXkz0j76Ve3t+pcThCKBkBM4+nByfHIL3iKcQ/PI/", - "+35dhQWG6STKC58WVo1qxvAa4hROUwSmaEYN/Ut7VFmpfNQFXM4yDF2twX3Ae70MC136GOsAt47Ucqu1", - "a7sylFG2WEoGuluf/ac87HdICmixo+mtKeJqWhBRgF8xKb6CH/ZH+6+8Dg7OJzOG0F8BKstxDnS7Ubk+", - "INL3U77NhCOGfVr8vXRsdKNcSuMNwsSduOVbHyf7Tf9KEbRlaNArUFEl6xY4Rq2fses2crud3pCUwjgQ", - "vdIOqhyl7BgY0nHdcOTb8KuLX6W4dUxw2W/UN5Chwgst92TZ9FNplguXCrucQblJH0xX+TGOkDEkPHQF", - "xdzO59eTox/PLn8E0uG0TouZYxW28eIXVpEx5nkKFzVVGZxmXkxTzOc65tA1w/OyozRsFO91B1cNf/oR", - "MOELZSPZafbZ+CWL0s/1l5vl+vzKVtCWbTBCIRW97rJ0SYWJ6Pa1dENMf1lOBpEikzZk9ethJPC1NAWq", - "n45RioSKwFS/vYaEoNixNSscXIpt2wfKy0ypZ4uNaSpXwfqi0lTRxIu5FcTdIQTeZZJ1bV0VDPGtWsQQ", - "FCg+FAHGgAIpp0HFmuQe2tCVNLjMx6OeRpcKYCASIxJ5xf6hckTlMkk94vYF1i1oOm79XaMVYifVljTi", - "J/7AiJHVV8zDCn+80272O5IuPgHDnPaL+lAOj4TF9D3Y11Bui4sZgjxkM+i2Mi5lmFoTsZeROy2QclFL", - "lTIEM5qm9EZZJSiDRODIdsMkGYHTggswLZWRGyDr6aa2p96QDrXGmpCotdRlRa3JiAxPyzkiscTM0/Qm", - "hUmyXNBc5ZKhLnQIeyWpoywX22yc/alcaJeLjTwarRpt/G2OxByxGqylvOLbnnPEuJRnh1GEOH9PvyCf", - "bA0LqRqD+SWWkEDXlVcdQU8b4RmBQ5BJ36GM9HgiKwoJaYYRIaVaDIpA9HO1SHEZ/uhzrOS3fy6MaB2B", - "ZrBA4TwCR5BIFoSA4yxPa3EjvzK3m9ixU3qw3Gw+gGr37YCyB9BxPrmYhQSs984/pJeuXCOtNzX5Cag0", - "+B6EiO5hgNtdLxEagRPxHQfwC1YxaKjOBOUWjsDlnBZpLDcupTeIRTBAbylNaKcZWQ72HQeyc8ibnSLW", - "qdNNF2uyV3Nw1Xgvc/oUWUu4qeQthbfVJC1YhCYRjVF57LumLi3RcDRpkeeUCS/QGzTlWASinWGyNXNs", - "Ee89iMXsgHf/GE0DrpFssQJBnTj79q8Fz55a91rIK9m5eznaqrvRVCruxu8d5nx9/HYAOws6N6oJwDhm", - "UmrphcZcLc96LnYPGEviu8sA+FbXPY5tTV+61xPpd3tOsR1v3Hrh5hj4/hrNAFKA+yg1RYfRpPBZ32qH", - "I2l291uQrSAEzA/jDJMeJ6OKXeeQAyg/ADnD1zhFdXPPcWQwP8xzRq97nS5rUcABNJ/4QT4C0baTEByh", - "0Gq8FJBpq73V9EbZar6WIyrtHOEVG1Lao6hgWCwupRzTJPIaQYbYYaG5Zar+9cZS7s+/vR8MdTKiWh7V", - "Wk12LkQ+uLtTUeuZVkxYSI7UaQzg8Pxk4DhWg5ejfZORQ2COBweDf472Rz8MhgPJrAqbsdrvsYoajG/l", - "Hyfx3dhA4ONb87czFVC6UwRfeIwhZYYpUBxEkNitBlAfudvw8aDMDqLkJJYqX36i/RY3YiIRLI+0D/64", - "HUgaVkgPLKEMNK4D9xhQsAKZ1YNeJeoFVJviSvA+6c6Ii9c0VgG2iEqbXS0QzPNUHaRTMv5svOYKVF1s", - "bJRn3uak+mGrXBX1gz7OVWj/sL+/0vR7B3vvWmeGH2w+lKKaGPBCOQSzIk2VXffqAVGpn1t7kHkNY5sz", - "NgSYXMMUxyYpI4YCjjRCL31KTR8x4L9QrDv98+mwfkPZVCXy6JFfPd3IdvMIFWBGC6Lm/q+n3LETIqTb", - "kwKO2DViOo2hJquVwHGl9B+fJKeXp91aaAEttYCkVmCnZdjSBqn/sPPlg09yhLFJWT64HSRI+Nxr6cdy", - "ANPUZAkgbSlAkOBrRFR8vi1E3yLx2uZCN+SmP11VGmQznArEwFRF+pVU/LNAbFGJRdlxYjJawyIxRjOo", - "kuR1+BkSShYZLUwo+erEI14+3VN21EWnXaf7HQm06eSQANVb+jPlGBW7NDcup16a7k17JXFdIMEwkqoz", - "VYfaApNIhKlBfT1OsDDZGn1IixWED8FnOtX/N8nGfKgiGa5NzQHknEZYydkbLObl0Ho0Lym+LXHpQY3N", - "RHdFmUhEcyU+5WxD5Kn7KwOgU+H3ONyuuEEicnSixw5zRgngUuc3r4wB8WSrroyF/fBMs+iKKJgkJINE", - "ST8rY1Fewlhx/FzSmDnEFhQwQ/WhYXLtL3rkzsv2QXdg0csTcyUm5KiYRGkRq9iiQqhr9Ev8VwiD/b4o", - "rCR7W/v67KL2s70SVRe2Xeq4eZfKE4cTVMBUG9/vZucwqSWOOQvaS2TrRiAnCDGRPN5f2AWlu46aP7R8", - "t7zlSnQjOOsy3Umz6xbtgFeHxwkWBhofGjpXEk8ZKEPnUoBeCHthpluiW0x6mhnOrZiGXHGwewIu6BKA", - "ayOnv19P+H7rwu9hxU7k5Oxi1F/2tNJ9H1X4XDpuaLmjsUNMhjuxFjT/uteSdKZFL0V1NTM0IFW0kCqz", - "TZZIphwmmCgz0h7xmDslEaNcW6XlIQFvC6FfMReHaXqmRlsifc4d5nLyJPS1nMfhsLNyND2S5umCkZK5", - "AuOmWNvJa1sVa49suH8ytfkILRFWHcU/LDvrObdo5RR+xVmROTunZ+TMo53QWJJfL4HQSGwqhUDuvQZx", - "VDCGiHCFtRcHJUJCKbnN2ZQ5xGFQpRTqhifR6obXy2QKcaYTSWsKDyfINbQGlwl9BU2os02M97SEnb3f", - "VK1CFcnRsudTJfXGDGESo6+KwqlOuKlLrQvdwUotHw815aX6QIpYfahQC26OnjO6OdrQjdMLJvWHZjBp", - "TXMEWTTHJBl17p/u9vjK61KNs9NdzZF/QYsbymJ11UctUTXfwJBmx1ayt3cacqchdxryeTSkPU9eKmMl", - "45vbsRIyBFzfzY3K7N1WUOJM5z4/yjnxp0c+3fTtgjpHsjeE/z4HgmraZCtov0mWYLoAJ8e9GGCMCRcw", - "TbsYwfBX+UxBGRw036r5DwHCZTK1vkMFVPywxCyYanGiwfj5Zo0rXQ+WkuG9lR/57jrUgmXgZAZohoV6", - "asNZDifF3OYFh0Jb1WUaRxVDIUlicDD4348f4///8ePI+eMfjxBjv0cqxFlJJ2UG9PM6DA3Vx4torl+a", - "mcK43Bidpjf62wq5TXWlrLvjZmxJjpsiK4JM6t5yccfQNUY3POwan1OuNPiF6viI+V71haiMQy4gM4cu", - "zhVI53qd1/8QsFeC2MNFwtexJI7Dj4g0NFjQhJWOv71H9BjMEmSS603OD3ISgGKgSVzfpOAenQXL23nL", - "2cWmWzrmQTv+7egC/tgJkm26FwVXxn71bW+HspWy2EzGue30v+sJjt54wwymHA0f3i1fdYqeybXo61fj", - "PdmQFSg3/6n1oWI4E914UibfbOZWGyR3x26Ml5sbiX3L8qfrTrDXmX383OfwLVvXtAffn1GpKa6uTo5f", - "BAz9clYb5VZ3WMr9dWJpAuz0Xi9PuEvlrcgkd2OV0rFXxiz9tuORulRZPlXAn+W2wPPcE3ioxx3WzPH3", - "50YtVA50ednVk5C/y3/fivz3ks/1TgKHG/WDjIgJiEmTyx1OXIPPx7eR/X6Jkqzod+MZ3g/JmejG6M2G", - "UNgx9zfP3AkSRnsvHA4HU8hRDCgBWHCgr8+G+Ls67O50WM+rbk/hh9Ve0FrmhTnPB1VYbtjlsQ03AWtZ", - "Dw65ONv+6W7YacNVW7a+gdSTJPqYNy8fa2DPTXP1yEJYqG4WFW64kN+2+4Oa+KW7hG4qHgqxUF3ijtX+", - "QP1IiPco8WiOoi/l3fvqRQ/73Iu6iZ8yBOMFEPAL8hwVfjBjuPy59MDQM5KgwKIbOoKzfZ/UIGo8k8IP", - "yxyRdlIJK5D7jkG5gvaTIVDRR0DFHLEbXHs5p+M1qzZhXVnoZsmkJcHKWzObcIyXYc4xSZx10M+7qC3d", - "zFMtS8lyC2GIGdwMoR4ceFv+3TgssXrura3k9DNwHUzk8REc2PdkiVeBN07U/DXKPs3zhPZyhc5WXwfX", - "2+ySV9gaCvm2z0Uk+09t8lQ3Y3akt1rEtRJezbyjpsVdeGhMv1bwhGT27Nb8k5P2NjxIsoI1v+PGVRWB", - "eREErmzT1yyK8RSS8OHHa0g20qDQb+eBy114cOt1zmtIajTcTa3NG6D+xBX+hrKnodvtvAByj9sZy1NM", - "+EY+i7Vt0r0W+QzdfeiOd/Y+vNowy6s6H3q6EGrnPYltCJz2f/ttq+OXUcEFzbqSHbtVR+2SUHck5dHZ", - "Z/h4F4i8N77UjZ9QIGZ3/We7oj/1zGqvagg75ltG2s+pYp7oKt435s3vpMl2hRCWSZMVFGv/gMK3qGB3", - "gYkVAhOUgS3I0G8EKL7jwEjyFXgiR0wdZ3bdunmLxHnZ7V2dTfi28sm9XmKD5McYC28EpNdTCmXSlbP6", - "u6yrFY6BnIWrl21wX5G8X/i5425a4C6n/nrVOyzbbMm2s/L7xu5k3zWrntoKWBNdAWtS1s7qroxuDIja", - "wCFgn9a6FvDynmu35npwnBDkL6WoqoqqZltHschNFUXUvN9sMqDWeEfxDN2UcCw1b0VMapeo188aN5LN", - "RLpWuOm0qnBt3hNcHgfbKknbebHiMSz/skYIqfjyexNqe/G8sTbvJYEnuWj4/NcTPBis5Bzb3bR7+WKN", - "i4hDfwEj43hTkuqiwVU1VEhip1aprZ5Tf6WqNXY7p3a9IkffHus+TujQWwF3V7doV7doV7eoFVz0izY5", - "O1e2NZ7DWdXQuU98ZaNzYHcRkb9dRET5iEvuOrZLk/ONJuN+Vyo9BddXzD/ylu3+ez1waea8vXqjfufT", - "u6GG23x8psgGnEICE5TJ+S69Feqhum1MV/cxz2OH0wL19FUJGHSTLsrMKX85/fXiYHqHtygnaxf/Wuei", - "qo9iuvm8h1Yd36o/+92ke2LB4Pd9Db4PH7TSC/g3TwjTi/CNZYStyTp2xD0e6QwO733v8yJNjWYmqsZb", - "7VVB5eNFlMRFJIAFCCRAb6UKW60chjirkSOfYXKYdFVAGw6+7iV0z/yoSswdF3rI8NOPGSyLpqz+tmrj", - "OekIkkA9lV3QZXs1WMlslmSBolnFNvqobw8ygWcwEt2PEF+pzoe270MdCev98htiuq1e97ZteA0H8DqZ", - "XDOYBaBcI1W6Ema0IMrd+nBxeCrd39jey2eFa9E5VTR07UJd3NlXLXlaRF+QKNu7C9jqzuBmjhhSP9BC", - "SJI2BYQZUlU21QWa1kDq/aFJShM+SSI+UYr2wF8xUcyBoMAUeQTykxFAyQgk/GA81jjsSbTGsik8FivI", - "ZJYquesbRzXVFlF9JpcyADPDYjKHfO5dSNPulOtrD2mqA9o+4VGk8PSDkC2NOsoGe/3YpWz7OPj9999/", - "3zs93Ts+fv/TTwenpweXl//5OADf/7D/8t97L/f3Xu6/398/UP/954UXjyKG7ll9sxpMDN3qA63PEYk7", - "5mArkNp5fKZT/XT8FcFfVQsXMMslZD0pTdL/fuWl8M90OsGxT5I2qxpIx9WM50NaAhIMJwlik4KjAEur", - "oNPNnALTFcVdMDMYzTHRLxsvjcGc6s6Xqu/dcEA9lPuuWdG6IQV8SGgunSguLZnPA/tQURQEHElzQJ+A", - "SRhAzGFZaJeDt0eXikm/5y8kn7oyIMCnussw1PCDD+kcwS8dMlE2ryUQczYx9Yv8cKWJZRRDq9CRg91C", - "zCkJc4gCpfqUB4Y9NipfCMqi+VKwstMqcFX1eG8le6dur/fDgqzIWwEMVBHiDpFQFSl+GKGgjzeWMdxv", - "pq76ReE8WW6LrffUiba7N57iJmepTSjFVbm2is3r6qWuBlqqpa7UmwjX1tqRxC7lt+jXI/yG1rApF3O9", - "jLHnq33bKNMbIXxthLW1F+tFmDrqe7GqQMI9qoErDMrBOfjeFZ0vwIzRTJfiN5bPWyx+KqZAF1Y3Bi/v", - "eorxLRJX3Dyx90gn3wq+Z7Hf/bLqexPtmmhvVs8TeYuE551vZRzBNLUvWI2cxXOjVD3ftHxD2UMs6+5p", - "y01/2rJ2kGikYPAMsVclD1PJekkxj0ZMpMzd6YxXoq95qrKRdUTyyYt5SL2wSOUPkv0Gj1jbY1dY/96a", - "9H5VPUv0exYqrUKS5cIA2xNz8HJ/f3Tf6qFO1PMJq4gOBy7XP0DNmB5mzMZWkdncci51AtHinNMMSUtL", - "mMLNgSwna0zr5zjHt/bfF+rfS56t/63WudeBVRP+xjzyp63On+nUTMb/ZnyGG0b0E54fmeG3r46Kidvp", - "t/3utL0rAWgiKVg6OBjMhcj5wXgMczxScckRZcng7tPd/wUAAP//bY4vE86+AAA=", + "H4sIAAAAAAAC/+x9fW/bttb4VyH8u8BW/Bw73e29eJD/0mTtsi1pkDQddtc+vrRE22wlUiOppF6Q7/6A", + "bxIlkbLkvNmdgWFtTerwkDzvPOS5HUQ0zShBRPDBwe2ARwuUQvXXw0hgSn6m0wvE80TInzJGM8QERqoD", + "VB0mn+l0gmP5Q4x4xHAmfx0cDE5iRASeYcQAnQGxQOAznQKxwBwwBRFMUULJnANBB8OBWGZocDDggmEy", + "H9wNLXiWk47gWU76gM/FgrIm3PcLBHSbhRvRNMXCC+N6PrlmMA1AuUYMzhH4cHF4CnKOYjBd2nUooWEi", + "0BwxCW7KIIkWEwJT1IR4BlNUzBQl6BoSAeZYAP2VD72IprOlWsBZAufcj6XqpNZOdRqBH0fzEfjv3l5C", + "b/bk5P4bAJ1iMVlAvvCDlS2rF9CA8e2vBHJy3BlEijiHc+SHYxo7AxM4DUC6IvgrkM1cwDQDNwtEHJDg", + "BnKQwhgNhoMZZSkUeoP//cq731Eew8k1YlzBrw93dHV8CEyrIh8fwojELdgiEitkXQaEHMDaNEbd0J1j", + "MWEoo/6xZAvHgrIlUATsQda3zVcE/5kjgEtunlFWIKs52UUvz7F3IaQUEgzP54hNco4CjC1bwM2CAtMV", + "xXaokQ9oCqMFJmjCBdTi8R8MzQYHg/83LqXm2IjM8anufKn63g0HUlRCgcl8wpdcII+QeGd7AN0juMsZ", + "gl9aBI1s7i5lMjYheToNLVGWJwlg6M8ccQFMRx9KS7GgJEy958v3lEWLlQScLYXsd39AXEDWxrmq/QG5", + "QdJEvpIofqPsyyyhNxc5udQfqE+pVAyTGU7QKgCXuu8b2fVuOLgx8DooCdu1uVh3xS90+hlFQgI+kmrg", + "jMaoqecjKNCcsqWHeU+AbZSikCE1MKExApiDBHOB4qEmSsnVc0bzTJK77MFHYZUl2yf9pIX8xAcwRhlD", + "EkmfCUFiLJs4wLMK5uVHI3Bc/F2jDSBDYIHjGBGAtfC/OnHmMqU0QZDosZ3R6oO/ZhjNgPOb3Tc5yncc", + "zHKiDCCYYLEElIEsZxnlyLts6GuGGE4RETDpMU/3syHguaIGICiIFpDMEZfDMpTSa5j4Z2iRbKdERARb", + "7mUUE1FMS46CvqIoFyXReKeGSZaLifzZY8Acoxkmcl6yE8gggykSiHEfIJoLCQnziSRMz4boaYFrmOQK", + "pFo0Sa54BhCMFkBDkAsHFXFLfLFAqUKsuTjmF8gYXMp/MyRyRhTncv+CcbtieiSuqDtKIJMkgEnB0tw3", + "cjnTwMCBNbzMUCR5iauRVSeJhkUhYzTOo1KtBDbKJ1N+ZExb2FV5EiMBccJ9+lATPNA9UAww0ZJYEgyc", + "0lxoepJwJW0uMDGLxBCnybXaLNFvcYKG4yGIEgQZgCQGESUR5sjHrgqZPgtygXhGCfcIWmTXq2mIlEg2", + "h5HKGjMp4P4wIMoPPnmweIvFkTJXL/M0hVqu1zy7B/CNWp0ZCUh3cPRG04Z+ZLcjjBtxpFcYjDYBJrxc", + "RhjHWNPweWVBPWqpSmopzBTT1S1CQYEeBWQQu3Kt3M7CcPFPpZ+zEkOB9pQR1YmgKzZv03DI8kkEMxhh", + "4TEe3lMBE3B0fgWoRstY214tEGP+ZSUs2QnYTl2gzrN8on/0rdzb8yslDkcAzUfg7MPJ8ckheI94AsEv", + "/7Pv11VYYJhMoiz3aWHVqGYMryFO4DRBYIpm1NC/tEeVlcpHbcDlLMPQ1RrcB7zXy7DQpY+xDnDrSK22", + "Wtu2K0UpZcuVZKC7ddl/ysN+h6SABjua3poirqY5ETn4FZP8K/hhf7T/yuvg4GwyYwj9FaCyDGdAtxuV", + "6wMifT/l20w4Ytinxd9Lx0Y3yqU03iCcuxO3fOvjZL/pXyqCpgwNegUqqmTdAseo9TN21UZuttMbklAY", + "B6JX2kGVoxQdA0M6rhuOfBt+dfGrFLeOCS77jboGMlR4oeGerJp+Is1y4VJhmzMoN+mD6So/xhEyhoSH", + "rqBY2Pn8enL049nlj0A6nNZpMXMswzZe/MIqMsY8S+CyoiqD08zyaYL5Qscc2mZ4XnSUho3ivfbgquFP", + "PwImfKFsJDvNLhu/YlG6uf5ys1yfX9kK2rINRiikotddVi6pMBHdrpZuiOkvi8kgkqfShix/PYwEvpam", + "QPnTMUqQUBGY8rfXkBAUO7ZmiYNLsU37QHmZCfVssTFN5SpYX1SaKpp4MbeCuD2EwNtMsratK4MhvlWL", + "GIICxYciwBhQIOU0qFiT3EMbupIGl/l41NHoUgEMRGJEIq/YP1SOqFwmqUfcvsC6BXXHrbtr1CN2Um5J", + "LX7iD4wYWX3FPKzwxzvtZr8jyfITMMxpv6gO5fBIWEzfg30N5Ta4mCHIQzaDbiviUoapNRF7GbnVAikW", + "tVApQzCjSUJvlFWCUkgEjmw3TOYjcJpzAaaFMnIDZB3d1ObUa9Kh0lgREpWWqqyoNBmR4Wk5RySWmHma", + "3iRwPl8taK4yyVAXOoTdS+ooy8U2G2d/Khfa5WIjj0Z9o42/LZBYIFaBtZJXfNtzjhiX8uwwihDn7+kX", + "5JOtYSFVYTC/xBIS6LryqiXoaSM8I3AIUuk7FJEeT2RFISHNMCKkVItBHoh+9osUF+GPLsdKfvvnwojW", + "EagHCxTOI3AEiWRBCDhOs6QSN/Irc7uJLTulB8vM5gOodt8OKHsAHeeTi5lLwHrv/EN66co10jpTk5+A", + "CoPvQYjoHga43fUCoRE4Ed9xAL9gFYOG6kxQbuEIXC5onsRy4xJ6g1gEA/SW0DltNSOLwb7jQHYOebNT", + "xFp1uuliTfZyDq4a72ROnyJrCdeVvKXwppqkOYvQJKIxKo5919SlBRqOJs2zjDLhBXqDphyLQLQzTLZm", + "jg3ivQexmB3w7h+jScA1ki1WIKgTZ9/+NeDZU+tOC3klO7cvR1N115oKxV37vcWcr47fDGCnQedGNQEY", + "x0xKLb3QmKvlWc/F7gBjRXx3FQDf6rrHsY3pS/d6Iv1uzym2441bL9wcA99foxlACnAXpaboMJrkPutb", + "7XAkze5uC7IVhID5YZxi0uFkVLHrAnIA5QcgY/gaJ6hq7jmODOaHWcbodafTZS0KOIDmEz/IRyDaZhKC", + "IxQajZcCMm21N5reKFvN13JEpZ0jvGJDSnsU5QyL5aWUY5pEXiPIEDvMNbdM1b/eWMr9+bf3g6FORlTL", + "o1rLyS6EyAZ3dypqPdOKCQvJkTqNARyenwwcx2rwcrRvMnIIzPDgYPDP0f7oh8FwIJlVYTNW+z1WUYPx", + "rfzjJL4bGwh8fGv+dqYCSneK4HOPMaTMMAWKgwgSu9UA6iN3Gz4eFNlBlJzEUuXLT7Tf4kZMJILFkfbB", + "H7cDScMK6YEllIHGdeAeAwqWI7N60KtEvYAqU+wF75PujLh4TWMVYIuotNnVAsEsS9RBOiXjz8ZrLkFV", + "xcZGeeZNTqoetspVUT/o41yF9g/7+72m3znYe9c4M/xg86EU1cSA58ohmOVJouy6Vw+ISvXc2oPMaxjb", + "nLEhwOQaJjg2SRkxFHCkEXrpU2r6iAH/hWLd6Z9Ph/UbyqYqkUeP/OrpRrabR6gAM5oTNfd/PeWOnRAh", + "3Z4EcMSuEdNpDBVZrQSOK6X/+CQ5vTjt1kILaKkFJLUCOy3DljZI/YedLx98kiOMTcrywe1gjoTPvZZ+", + "LAcwSUyWANKWAgRzfI2Iis83hehbJF7bXOia3PSnq0qDbIYTgRiYqki/kop/5ogtS7EoO05MRmtYJMZo", + "BlWSvA4/Q0LJMqW5CSVfnXjEy6d7yo6q6LTrdL8jgSadHBKgekt/phijZJf6xmXUS9Odaa8grgskGEZS", + "dSbqUFtgEokwNaivx2rt97QSn8LoywwnygrNqA4EVumliPLz17ZvY0s8k7SdyxzinbjtL243RdgVBGd2", + "09yH0Kme05IurCQricbIsjkWJkOoizhjOeFD8JlO9f9NgjsfquiZ68dxADmnEVa6/QaLRUHuejSv+Htb", + "4NJBAtYvVyhpiES0UDQkOSwkEnV/ZXS2GpkdEipKCSwROTrRY4elcQHgUufU98aAeDKke2NhPzzTaqEn", + "CibxzSBR0E9vLIqLPz3HzySNmcQJQQEzkjY0TKZjFB5d97KZXBFY9CJLQ6kmOSomUZLHKp6tEGob/RL/", + "FcJgvysKvfR9Y1+fXb1/ttfwqgq+TSrW7+95Yr+CCphoh+/d7BzOK8mKzoJ2MhN0I5AThJhIHu8u7IIW", + "hT6peWibwvKWa0UYwVmV6U5qZ7toB7xMWJhjYaDxoaFzJfGUUTx0LqLohbCXtNolusWko2nr3MSqyRUH", + "uyfggjYBuDZy+vv1hO+3LvweVuxETp44Rt1lTyPF/FGFz6UT+ih2NHaIyXAn1oLmX/daktZU/JWo9nN9", + "AlJFC6kiw2mFZMrgHBNlRtpjRXOPKWKUa6u0OJjiTSH0K+biMEm0ybtC+pw7zOXk5uirYI/DYWfFaHok", + "zdM5IwVzBcZNsLaT17Yq1h7ZcP9kanNgGiKsTP94WHbWc27Qyin8itM8dXZOz8iZRzOJtiC/TgKhlkxX", + "CIHMe/XmKGcMEeEKay8OSoSE0sDrsyny1sOgCinUDk+i1Q6vk8kU4kwnnFAXHo6nP7QGl/H/gybU2SbG", + "GBvCzt6pK1eh9Lldd1sHdxjCJEZfw7GdC93BSi0fD9XlpfpAilh9kFUJqI+eM8Qz2tCN0wsm9YdmMGlN", + "cwRZtMBkPmrdP93t8ZXXpRpnp7vqI/+CljeUxep6mVqicr6BIc2O9bK3dxpypyF3GvJ5NKTNYVgpYyXj", + "mxvZEjIEXN8Hj4qM8UZQ4kzn2z9KbsKnRz5R9+2COru0t9L/PofQatpkK2i/TpZgugQnx50YYIwJF1Cf", + "A4YYwfBX8TRGERw036r5DwHCRQK/vrcHVPywwCyY3nOiwfj5Zo1rhA+WBuR9CSLy3a+pBMvAyQzQFAv1", + "vIuzHM61BpuLHgptlRe4HFUMhSSJwcHgfz9+jP//x48j549/PEKM/R7pN2cFnRRZ98/rMNRUH8+jhX7d", + "aArjYmN0aujobyvkNtWVsu6OmyUoOW6KrAgy6aKrxR1D1xjd8LBrfE650uAXquMj5hhWF6I0DrmAzBy6", + "ONdunSudXv9DwE5JiQ8XCV/HkjgOP1xT02BBE1Y6/vbu2mMwS5BJrjc5J81JOouBJnF9e4d7dBYsboSu", + "Zheb4uuYB834t6ML+GMn5TbpXuRcGfvlt50dykaabD0B7LbV/64m1XrjDTOYcDR8eLe87xQ9k2vQ16/G", + "e7IhK1Bs/lPrQ8VwJrrxpEy+2cytNkjujt0YLzfXkklX5exXnWCvM/v4+fbhm92uaQ++P6NSU1xdnRy/", + "CBj6xaw2yq1usZS768TCBNjpvU6ecJvK68kkd27ubEvKrLrI6+RAPscNlee5m/JQD4qsea/Enxu1VHn3", + "xQVrzyWQ3Z2LrbhzUfC53kngcKN+BBQxATGpc3kjG7kXn49vI/v9CiVZ0u/GM7wfkjPRjdGbNaGwY+5v", + "nrnnSBjtvXQ4HEwhRzGgBGDBgb6yHeLv8rC71WE9L7s9hR9WebVtlRfmPFlVYrlhN2g23ASsZD045OJs", + "+6e7YasNV27Z+gZSR5LoYt68fKyBPa8bqIc9wkJ1s6hwd43rQe+sauKX7hK6KXkoxEJViTtW+wP1wzTe", + "o8SjBYq+FO89lK/I2CeG1OsPCUMwXgIBvyDPUeEHM4bLnysPDD0jCQosuqEjONv3SQ2i2tM8/LDIEWkm", + "lbAcuW9nFCtoPxkCFX0EVCwQu8GV15paXlBrEtaVhW6WTFoSrLg1swnHeCnmHJO5sw76SSG1pZt5qmUp", + "WW4hDDGDmyHUgQNvi78bhyVWTww2lZx+erCFiTw+ggP7nizxKvCujpq/RtmneZ7QXi7R2eonCPQ2u+QV", + "toZCvu1zEcn+U5s85c2YHen1i7iWwqued1S3uHMPjekXMp6QzJ7dmn9y0t6GR3B6WPM7buyrCMwrNLC3", + "TV+xKMZTSMKHH68h2UiDQr/XCC534cGt1zmvIanQcDu11m+A+hNX+BvKnoZut/MCyD1uZ6xOMeEb+TbQ", + "tkn3SuQzdPehPd7Z+fBqwyyv8nzo6UKorfcktiFw2v0BrK2OX0Y5FzRtS3ZsVx2VS0LtkZRHZ5/h410g", + "8t74Ujd+QoGY3fWf7Yr+VDOrvaoh7JhvGWk/p4p5oqt435g3v5Mm2xVCWCVNeijW7gGFb1HB7gITPQIT", + "lIEtyNCvBSi+48BI8h48kSGmjjPbbt28ReK86PauyiZ8W/nkXi+xQfJjjIU3AtLpKYUi6cpZ/V3WVY9j", + "IGfhqqVC3Fck7xd+brmbFrjLqb/ue4dlmy3ZZlZ+19id7LtmpV1bdW2iq65Ninpt7dX4jQFRGTgE7NNa", + "1wJe3nPt1lwPjucE+ct3qkq2qtnW7swzU7kT1e83mwyoNd5RPEM3BRxLzVsRk9ol6nWzxo1kM5GuHjed", + "+grX+j3B1XGwrZK0rRcrHsPyL+rSkJIvvzehthfPG2vzXhJ4kouGz389wYNBL+fY7qbdyxdrXEQc+otm", + "GcebkkQXqi4r8EISO/VxbcWm6itVjbGbObXrFdb69lj3cUKH3qrLu1pZu1pZu1pZjeCiX7TJ2bmyrfYc", + "Tl9D5z7xlY3Ogd1FRP52ERHlI66469gsh883moy7Xan0FPnvmX/kLRX/93rg0sx5e/VG9c6nd0MNt/n4", + "TJENOIUEzlEq57vyVqiH6rYxXd3HPI8dTitigp4SMOgmWRaZU95tXDMOpnd4i3KydvGvdS6q+iimnc87", + "aNXxrfqz2026JxYMft/X4PvwQSu9gH/zhDC9CN9YRtiarGNH3OORzuDw3vc+z5PEaGaiarxVXhVUPl5E", + "SZxHAliAQAL0VqqwFfJhiLNqOfIpJofztgpow8HXvTndMz+qEnPHuR4y/PRjCouiKf3fVq09Jx1BEqin", + "sgu6fAMVcy3JAkWzim30Ud8eZALPYCTaHyG+Up0Pbd+HOhLW++U3xHRbte5t0/AaDuD1fHLNYBqAco1U", + "6UqY0pwod+vDxeGpdH9jey+f5a5F51TR0LULdUFxX4XuaR59QaJoby9gqzuDmwViSP1AcyFJ2hQQZkhV", + "2VQXaBoDqfeHJgmd88k84hOlaA/8FRPFAggKTJFHID8ZATQfgTk/GI81DnsSrbFsCo/FcjKZJUru+sZR", + "TZVF1KWYWU4CMFMsJgvIF96FNO1Oub7mkKY6oO0THkUKTz8I2VKro2yw149dyraPg99///33vdPTvePj", + "9z/9dHB6enB5+Z+PA/D9D/sv/733cn/v5f77/f0D9d9/XnjxyGPontXXq8HE0K0+0PgckbhlDrYCqZ3H", + "ZzrVT8dfEfxVtXAB00xC1pPSJP3vV14K/0ynExz7JGm9qoF0XM14PqQlIFOVe5JzFGBpFXS6WdCyHHsb", + "zBRGC0z0y8YrYzCnuvOl6ns3HFAP5b6rV7SuSQEfEppLJ4pLC+bzwD5UFAUBR9Ic0CdgEgYQC1gU2uXg", + "7dGlYtLv+QvJp64MCPCp7jIMNfzgQzpD8EuLTJTNawnEjE1M/SI/XGliGcXQKHTkYLcUC0rCHKJAqT7F", + "gWGHjcqWgrJosRKs7NQHLkMZ9RUiqdTt9X6Yk568FcBAFSFuEQllkeKHEQr6eGMVw/1m6qpf5M6T5bbY", + "ekedaLt74ylucpbahEJcFWur2LyqXqpqoKFaqkq9jnBlrR1J7FJ+g349wm9oDZtiMdfLGHu+2re1Mr0R", + "wtdGWFt7sVqEqaW+FysLJNyjGrjCoBicg+9d0fkCzBhNdSl+Y/m8xeKnfAp0YXVj8PK2pxjfInHFzRN7", + "j3TyreB7FvvdL33fm2jWRHvTP0/kLRKed76VcQSTxL5gNXIWz41SdXzT8g1lD7Gsu6ctN/1py8pBopGC", + "wTPETpU8TCXrFcU8ajGRInenNV6JvmaJykbWEcknL+Yh9cIykT9I9hs8Ym2PXWH9e2vS+1X1LNDvWKi0", + "DEkWCwNsT8zBy/390X2rhzpRzyesIjocuFz/ADVjOpgxG1tFZnPLuVQJRItzTlMkLS1hCjcHspysMa2f", + "4xzf2n9fqH+veLb+t0rnTgdWdfgb88iftjp/plMzGf+b8SmuGdFPeH5kht++Oiombqff9rvT9q4EoIkk", + "Z8ngYLAQIuMH4zHM8EjFJUeUzQd3n+7+LwAA///ub3TnQsEAAA==", } // GetSwagger returns the content of the embedded swagger specification file diff --git a/gateways/pubsub/pubsub.go b/gateways/pubsub/pubsub.go new file mode 100644 index 0000000..4c78128 --- /dev/null +++ b/gateways/pubsub/pubsub.go @@ -0,0 +1,100 @@ +package pubsub + +import ( + "context" + "encoding/json" + "fmt" + "net/url" + "registry-backend/config" + "strconv" + "strings" + "time" + + "cloud.google.com/go/pubsub" + "github.com/rs/zerolog/log" +) + +type PubSubService interface { + PublishNodePack(ctx context.Context, storageURL string) error +} + +var _ PubSubService = (*pubsubimpl)(nil) + +type pubsubimpl struct { + client *pubsub.Client + config *config.Config + topic *pubsub.Topic +} + +func NewPubSubService(c *config.Config) (PubSubService, error) { + if c == nil || c.PubSubTopic == "" { + // Return a noop implementation if config is nil or storage is not enabled + log.Info().Msg("No pub sub configuration found, using noop implementation") + return &pubsubNoop{}, nil + } + + // Initialize GCP storage client + client, err := pubsub.NewClient(context.Background(), c.ProjectID) + if err != nil { + return nil, fmt.Errorf("NewPubSubService: %v", err) + } + return &pubsubimpl{ + client: client, + config: c, + topic: client.Topic(c.PubSubTopic), + }, nil +} + +// PublishNodePack implements PubSubService. +func (p *pubsubimpl) PublishNodePack(ctx context.Context, storageURL string) (err error) { + u, err := url.Parse(storageURL) + if err != nil { + return fmt.Errorf("invalid storage URL: %w", err) + } + + segments := strings.Split(u.Path, "/") + if len(segments) < 2 { + return fmt.Errorf("invalid storage URL: %w", err) + } + bucket := segments[1] + object := strings.Join(segments[2:], "/") + now := time.Now() + messagePayload := map[string]interface{}{ + "kind": "storage#object", + "id": fmt.Sprintf("%s/%s/%d", bucket, object, now.Unix()), + "selfLink": fmt.Sprintf("https://www.googleapis.com/storage/v1/b/%s/o/%s", object, bucket), + "name": object, + "bucket": bucket, + "generation": strconv.FormatInt(time.Now().Unix(), 10), + "metageneration": "1", + "mediaLink": fmt.Sprintf("https://storage.googleapis.com/%s/%s", bucket, object), + } + // Marshal the payload to JSON + jsonData, err := json.Marshal(messagePayload) + if err != nil { + return fmt.Errorf("Failed to marshal JSON: %v", err) + } + + result := p.topic.Publish(ctx, &pubsub.Message{ + Data: jsonData, + Attributes: map[string]string{ + "eventType": "OBJECT_FINALIZE", // Optional attribute for event type + }, + }) + + _, err = result.Get(ctx) + if err != nil { + return fmt.Errorf("Failed to publish message: %v", err) + } + + return +} + +var _ PubSubService = (*pubsubNoop)(nil) + +type pubsubNoop struct{} + +// PublishNodePack implements PubSubService. +func (p *pubsubNoop) PublishNodePack(ctx context.Context, storageURL string) error { + return nil +} diff --git a/gateways/pubsub/pubsub_test.go b/gateways/pubsub/pubsub_test.go new file mode 100644 index 0000000..75bef01 --- /dev/null +++ b/gateways/pubsub/pubsub_test.go @@ -0,0 +1,62 @@ +package pubsub + +import ( + "context" + "encoding/json" + "fmt" + "os" + "registry-backend/config" + "testing" + "time" + + "cloud.google.com/go/pubsub" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestPublish(t *testing.T) { + projectID, ok := os.LookupEnv("PROJECT_ID") + if !ok { + t.Skip("PROJECT_ID is not set") + } + + client, err := pubsub.NewClient(context.Background(), projectID) + require.NoError(t, err) + + topic := client.Topic(fmt.Sprintf("pubsub-topic-test-%d", time.Now().Unix())) + t.Cleanup(func() { + t.Logf("Deleting topic %s", topic.ID()) + topic.Delete(context.Background()) + }) + client.CreateTopic(context.Background(), topic.ID()) + + pubsubsvc, err := NewPubSubService(&config.Config{ProjectID: projectID, PubSubTopic: topic.ID()}) + require.NoError(t, err) + + subscriptionID := fmt.Sprintf("sub-%d", time.Now().Unix()) + sub, err := client.CreateSubscription(context.Background(), subscriptionID, pubsub.SubscriptionConfig{ + Topic: topic, + AckDeadline: 10 * time.Second, + RetainAckedMessages: false, + }) + require.NoError(t, err) + + err = pubsubsvc.PublishNodePack(context.Background(), "https://storage.cloud.google.com/testbucket/path1/path2/file.tar.gz") + require.NoError(t, err) + pubsubsvc.(*pubsubimpl).topic.Flush() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + data := map[string]string{} + err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) { + t.Log(msg) + assert.NoError(t, json.Unmarshal(msg.Data, &data)) + t.Log(data) + msg.Ack() + cancel() + }) + <-ctx.Done() + require.NoError(t, err) + assert.Equal(t, "testbucket", data["bucket"]) + assert.Equal(t, "path1/path2/file.tar.gz", data["name"]) +} diff --git a/go.mod b/go.mod index 8f665b5..cfe9197 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.21.5 require ( cloud.google.com/go/monitoring v1.18.0 + cloud.google.com/go/pubsub v1.36.1 cloud.google.com/go/storage v1.38.0 entgo.io/ent v0.13.1 firebase.google.com/go v3.13.0+incompatible diff --git a/go.sum b/go.sum index d9af16c..c08c66d 100644 --- a/go.sum +++ b/go.sum @@ -11,10 +11,14 @@ cloud.google.com/go/firestore v1.14.0 h1:8aLcKnMPoldYU3YHgu4t2exrKhLQkqaXAGqT0lj cloud.google.com/go/firestore v1.14.0/go.mod h1:96MVaHLsEhbvkBEdZgfN+AS/GIkco1LRpH9Xp9YZfzQ= cloud.google.com/go/iam v1.1.6 h1:bEa06k05IO4f4uJonbB5iAgKTPpABy1ayxaIZV/GHVc= cloud.google.com/go/iam v1.1.6/go.mod h1:O0zxdPeGBoFdWW3HWmBxJsk0pfvNM/p/qa82rWOGTwI= +cloud.google.com/go/kms v1.15.7 h1:7caV9K3yIxvlQPAcaFffhlT7d1qpxjB1wHBtjWa13SM= +cloud.google.com/go/kms v1.15.7/go.mod h1:ub54lbsa6tDkUwnu4W7Yt1aAIFLnspgh0kPGToDukeI= cloud.google.com/go/longrunning v0.5.5 h1:GOE6pZFdSrTb4KAiKnXsJBtlE6mEyaW44oKyMILWnOg= cloud.google.com/go/longrunning v0.5.5/go.mod h1:WV2LAxD8/rg5Z1cNW6FJ/ZpX4E4VnDnoTk0yawPBB7s= cloud.google.com/go/monitoring v1.18.0 h1:NfkDLQDG2UR3WYZVQE8kwSbUIEyIqJUPl+aOQdFH1T4= cloud.google.com/go/monitoring v1.18.0/go.mod h1:c92vVBCeq/OB4Ioyo+NbN2U7tlg5ZH41PZcdvfc+Lcg= +cloud.google.com/go/pubsub v1.36.1 h1:dfEPuGCHGbWUhaMCTHUFjfroILEkx55iUmKBZTP5f+Y= +cloud.google.com/go/pubsub v1.36.1/go.mod h1:iYjCa9EzWOoBiTdd4ps7QoMtMln5NwaZQpK1hbRfBDE= cloud.google.com/go/storage v1.38.0 h1:Az68ZRGlnNTpIBbLjSMIV2BDcwwXYlRlQzis0llkpJg= cloud.google.com/go/storage v1.38.0/go.mod h1:tlUADB0mAb9BgYls9lq+8MGkfzOXuLrnHXlpHmvFJoY= dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk= @@ -151,8 +155,6 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rH github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg= github.com/hashicorp/hcl/v2 v2.19.1 h1://i05Jqznmb2EXqa39Nsvyan2o5XyMowW5fnCKW5RPI= github.com/hashicorp/hcl/v2 v2.19.1/go.mod h1:ThLC89FV4p9MPW804KVbe/cEXoQ8NZEh+JtMeeGErHE= -github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= -github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/invopop/yaml v0.2.0 h1:7zky/qH+O0DwAyoobXUqvVBwgBFRxKoQ/3FjcVpjTMY= github.com/invopop/yaml v0.2.0/go.mod h1:2XuRLgs/ouIrW3XNzuNj7J3Nvu/Dig5MXvbCEdiBN3Q= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= @@ -197,8 +199,6 @@ github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/ github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= -github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0= -github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/mattn/go-sqlite3 v1.14.16 h1:yOQRA0RpS5PFz/oikGwBEqvAWhWg5ufRz4ETLjwpU1Y= github.com/mattn/go-sqlite3 v1.14.16/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= github.com/mitchellh/go-wordwrap v1.0.1 h1:TLuKupo69TCn6TQSyGxwI1EblZZEsQ0vMlAFQflz0v0= @@ -219,8 +219,6 @@ github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= github.com/oapi-codegen/runtime v1.1.1 h1:EXLHh0DXIJnWhdRPN2w4MXAzFyE4CskzhNLUmtpMYro= github.com/oapi-codegen/runtime v1.1.1/go.mod h1:SK9X900oXmPWilYR5/WKPzt3Kqxn/uS/+lbpREv+eCg= -github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= -github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.1.0-rc5 h1:Ygwkfw9bpDvs+c9E34SdgGOj41dX/cbdlwvlWt0pnFI= @@ -249,10 +247,6 @@ github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU= github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= -github.com/spf13/cobra v1.7.0 h1:hyqWnYt1ZQShIddO5kBpj3vu05/++x6tJ6dg8EC572I= -github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRMrb0= -github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= -github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= @@ -290,6 +284,8 @@ github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFi github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= github.com/zclconf/go-cty v1.14.2 h1:kTG7lqmBou0Zkx35r6HJHUQTvaRPr5bIAf3AoHS0izI= github.com/zclconf/go-cty v1.14.2/go.mod h1:VvMs5i0vgZdhYawQNq5kePSpLAoz8u1xvZgrPIxfnZE= +go.einride.tech/aip v0.66.0 h1:XfV+NQX6L7EOYK11yoHHFtndeaWh3KbD9/cN/6iWEt8= +go.einride.tech/aip v0.66.0/go.mod h1:qAhMsfT7plxBX+Oy7Huol6YUvZ0ZzdUz26yZsQwfl1M= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.48.0 h1:P+/g8GpuJGYbOp2tAdKrIPUX9JO02q8Q0YNlHolpibA= diff --git a/integration-tests/ban_test.go b/integration-tests/ban_test.go index 7a0b21d..ccca358 100644 --- a/integration-tests/ban_test.go +++ b/integration-tests/ban_test.go @@ -26,6 +26,7 @@ func TestBan(t *testing.T) { // Initialize the Service mockStorageService := new(gateways.MockStorageService) + mockPubsubService := new(gateways.MockPubSubService) mockSlackService := new(gateways.MockSlackService) mockDiscordService := new(gateways.MockDiscordService) mockSlackService. @@ -37,7 +38,7 @@ func TestBan(t *testing.T) { Return(nil) impl := implementation.NewStrictServerImplementation( - client, &config.Config{}, mockStorageService, mockSlackService, mockDiscordService, mockAlgolia) + client, &config.Config{}, mockStorageService, mockPubsubService, mockSlackService, mockDiscordService, mockAlgolia) authz := drip_authorization.NewAuthorizationManager(client, impl.RegistryService).AuthorizationMiddleware() diff --git a/integration-tests/ci_cd_integration_test.go b/integration-tests/ci_cd_integration_test.go index d5aeff5..b4d0024 100644 --- a/integration-tests/ci_cd_integration_test.go +++ b/integration-tests/ci_cd_integration_test.go @@ -27,6 +27,7 @@ func TestCICD(t *testing.T) { mockStorageService := new(gateways.MockStorageService) mockSlackService := new(gateways.MockSlackService) mockDiscordService := new(gateways.MockDiscordService) + mockPubsubService := new(gateways.MockPubSubService) mockSlackService. On("SendRegistryMessageToSlack", mock.Anything). Return(nil) // Do nothing for all slack messsage calls. @@ -35,7 +36,7 @@ func TestCICD(t *testing.T) { On("IndexNodes", mock.Anything, mock.Anything). Return(nil) impl := implementation.NewStrictServerImplementation( - client, &config.Config{}, mockStorageService, mockSlackService, mockDiscordService, mockAlgolia) + client, &config.Config{}, mockStorageService, mockPubsubService, mockSlackService, mockDiscordService, mockAlgolia) ctx := context.Background() now := time.Now() diff --git a/integration-tests/registry_integration_test.go b/integration-tests/registry_integration_test.go index 84987ef..9cb0172 100644 --- a/integration-tests/registry_integration_test.go +++ b/integration-tests/registry_integration_test.go @@ -109,11 +109,13 @@ type mockedImpl struct { mockSlackService *gateways.MockSlackService mockDiscordService *gateways.MockDiscordService mockAlgolia *gateways.MockAlgoliaService + mockPubsubService *gateways.MockPubSubService } func newMockedImpl(client *ent.Client, cfg *config.Config) (impl mockedImpl, authz strictecho.StrictEchoMiddlewareFunc) { // Initialize the Service mockStorageService := new(gateways.MockStorageService) + mockPubsubService := new(gateways.MockPubSubService) mockDiscordService := new(gateways.MockDiscordService) mockDiscordService.On("SendSecurityCouncilMessage", mock.Anything, mock.Anything). @@ -137,11 +139,12 @@ func newMockedImpl(client *ent.Client, cfg *config.Config) (impl mockedImpl, aut impl = mockedImpl{ DripStrictServerImplementation: implementation.NewStrictServerImplementation( - client, cfg, mockStorageService, mockSlackService, mockDiscordService, mockAlgolia), + client, cfg, mockStorageService, mockPubsubService, mockSlackService, mockDiscordService, mockAlgolia), mockStorageService: mockStorageService, mockSlackService: mockSlackService, mockDiscordService: mockDiscordService, mockAlgolia: mockAlgolia, + mockPubsubService: mockPubsubService, } authz = drip_authorization.NewAuthorizationManager(client, impl.RegistryService). AuthorizationMiddleware() @@ -862,7 +865,6 @@ func TestRegistryComfyNode(t *testing.T) { Body: pub, }) require.NoError(t, err, "should return created publisher") - // createdPublisher := (respub.(drip.CreatePublisher201JSONResponse)) tokenName := "test-token-name" tokenDescription := "test-token-description" @@ -894,24 +896,30 @@ func TestRegistryComfyNode(t *testing.T) { }) require.NoError(t, err, "should not return error") - // create another node version - _, err = withMiddleware(authz, impl.PublishNodeVersion)(ctx, drip.PublishNodeVersionRequestObject{ - PublisherId: *pub.Id, - NodeId: *node.Id, - Body: &drip.PublishNodeVersionJSONRequestBody{ - PersonalAccessToken: token, - Node: *node, - NodeVersion: *randomNodeVersion(1), - }, - }) - require.NoError(t, err, "should not return error") + // create another node versions + nodeVersionToBeBackfill := []*drip.NodeVersion{ + randomNodeVersion(1), + randomNodeVersion(2), + } + for _, nv := range nodeVersionToBeBackfill { + _, err = withMiddleware(authz, impl.PublishNodeVersion)(ctx, drip.PublishNodeVersionRequestObject{ + PublisherId: *pub.Id, + NodeId: *node.Id, + Body: &drip.PublishNodeVersionJSONRequestBody{ + PersonalAccessToken: token, + Node: *node, + NodeVersion: *nv, + }, + }) + require.NoError(t, err, "should not return error") + } t.Run("NoComfyNode", func(t *testing.T) { res, err := withMiddleware(authz, impl.GetNodeVersion)(ctx, drip.GetNodeVersionRequestObject{ NodeId: *node.Id, VersionId: *nodeVersion.Version, }) - require.NoError(t, err, "should return created node version") + require.NoError(t, err, "should not return error") require.IsType(t, drip.GetNodeVersion200JSONResponse{}, res) assert.Empty(t, res.(drip.GetNodeVersion200JSONResponse).ComfyNodes) }) @@ -1023,4 +1031,12 @@ func TestRegistryComfyNode(t *testing.T) { } assert.True(t, found) }) + + t.Run("TriggerBackfill", func(t *testing.T) { + impl.mockPubsubService.On("PublishNodePack", mock.Anything, mock.Anything).Return(nil) + res, err := withMiddleware(authz, impl.ComfyNodesBackfill)(ctx, drip.ComfyNodesBackfillRequestObject{}) + require.NoError(t, err, "should return created node version") + require.IsType(t, drip.ComfyNodesBackfill204Response{}, res) + impl.mockPubsubService.AssertNumberOfCalls(t, "PublishNodePack", len(nodeVersionToBeBackfill)) + }) } diff --git a/main.go b/main.go index 0eb9e7e..3cd842e 100644 --- a/main.go +++ b/main.go @@ -38,6 +38,7 @@ func validateEnvVars(env string) { "ALGOLIA_APP_ID", "ALGOLIA_API_KEY", "ID_TOKEN_AUDIENCE", + "PUBSUB_TOPIC", } // Add production specific variables @@ -88,6 +89,7 @@ func main() { AlgoliaAPIKey: os.Getenv("ALGOLIA_API_KEY"), IDTokenAudience: os.Getenv("ID_TOKEN_AUDIENCE"), CloudStorageBucketName: os.Getenv("CLOUD_STORAGE_BUCKET_NAME"), + PubSubTopic: os.Getenv("PUBSUB_TOPIC"), } // Construct the database connection string diff --git a/mock/gateways/mock_pubsub_service.go b/mock/gateways/mock_pubsub_service.go new file mode 100644 index 0000000..74c90d7 --- /dev/null +++ b/mock/gateways/mock_pubsub_service.go @@ -0,0 +1,20 @@ +package gateways + +import ( + "context" + "registry-backend/gateways/pubsub" + + "github.com/stretchr/testify/mock" +) + +var _ pubsub.PubSubService = &MockPubSubService{} + +type MockPubSubService struct { + mock.Mock +} + +// PublishNodePack implements pubsub.PubSubService. +func (m *MockPubSubService) PublishNodePack(ctx context.Context, storageURL string) error { + args := m.Called(ctx, storageURL) + return args.Error(0) +} diff --git a/node-pack-extract/cloudbuild.yaml b/node-pack-extract/cloudbuild.yaml index c8a4337..4df866f 100644 --- a/node-pack-extract/cloudbuild.yaml +++ b/node-pack-extract/cloudbuild.yaml @@ -34,7 +34,7 @@ steps: - | NODE="$(basename $(dirname $(dirname '$_CUSTOM_NODE_URL')))" VERSION="$(basename $(dirname '$_CUSTOM_NODE_URL'))" - curl -vsSf \ + curl -vsS --fail-with-body \ -H "Authorization: Bearer $(cat /workspace/token)" \ -H "Content-Type: application/json" \ -X "POST" "-d" "@/workspace/$_CUSTOM_NODE_NAME.json" \ diff --git a/openapi.yml b/openapi.yml index c7fa25c..36bbc3f 100644 --- a/openapi.yml +++ b/openapi.yml @@ -1858,7 +1858,35 @@ paths: application/json: schema: $ref: '#/components/schemas/ErrorResponse' - + /comfy-nodes/backfill: + post: + summary: trigger comfy nodes backfill + operationId: ComfyNodesBackfill + tags: + - ComfyNodes + responses: + '204': + description: Backfill triggered + '400': + description: Bad request, invalid input data. + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + '401': + description: Unauthorized + '403': + description: Forbidden + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + '500': + description: Internal server error + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' components: schemas: PersonalAccessToken: diff --git a/run-service-prod.yaml b/run-service-prod.yaml index 1a8f9c7..50dca7b 100644 --- a/run-service-prod.yaml +++ b/run-service-prod.yaml @@ -67,6 +67,8 @@ spec: value: https://api.comfy.org - name: CLOUD_STORAGE_BUCKET_NAME value: comfy-registry + - name: PUBSUB_TOPIC + value: comfy-registry-event resources: limits: cpu: 4000m diff --git a/run-service-staging.yaml b/run-service-staging.yaml index 96a1a4d..08a54f3 100644 --- a/run-service-staging.yaml +++ b/run-service-staging.yaml @@ -58,3 +58,5 @@ spec: value: https://stagingapi.comfy.org - name: CLOUD_STORAGE_BUCKET_NAME value: staging-comfy-registry + - name: PUBSUB_TOPIC + value: comfy-registry-event-staging diff --git a/server/implementation/api.implementation.go b/server/implementation/api.implementation.go index 4728233..e77df5f 100644 --- a/server/implementation/api.implementation.go +++ b/server/implementation/api.implementation.go @@ -5,6 +5,7 @@ import ( "registry-backend/ent" "registry-backend/gateways/algolia" "registry-backend/gateways/discord" + "registry-backend/gateways/pubsub" gateway "registry-backend/gateways/slack" "registry-backend/gateways/storage" dripservices_comfyci "registry-backend/services/comfy_ci" @@ -24,13 +25,14 @@ func NewStrictServerImplementation( client *ent.Client, config *config.Config, storageService storage.StorageService, + pubsubService pubsub.PubSubService, slackService gateway.SlackService, discordService discord.DiscordService, algolia algolia.AlgoliaService) *DripStrictServerImplementation { return &DripStrictServerImplementation{ Client: client, ComfyCIService: dripservices_comfyci.NewComfyCIService(config), - RegistryService: dripservices.NewRegistryService(storageService, slackService, discordService, algolia, config), + RegistryService: dripservices.NewRegistryService(storageService, pubsubService, slackService, discordService, algolia, config), MixpanelService: mixpanel.NewApiClient("f919d1b9da9a57482453c72ef7b16d88"), } } diff --git a/server/implementation/registry.go b/server/implementation/registry.go index 28985a4..3ffc37b 100644 --- a/server/implementation/registry.go +++ b/server/implementation/registry.go @@ -1052,3 +1052,15 @@ func (impl *DripStrictServerImplementation) GetComfyNode(ctx context.Context, re res = drip.GetComfyNode200JSONResponse(*cn) return } + +func (impl *DripStrictServerImplementation) ComfyNodesBackfill(ctx context.Context, request drip.ComfyNodesBackfillRequestObject) (drip.ComfyNodesBackfillResponseObject, error) { + log.Ctx(ctx).Info().Msg("ComfyNodesBackfill request received") + err := impl.RegistryService.TriggerComfyNodesBackfill(ctx, impl.Client) + if err != nil { + log.Ctx(ctx).Error().Msgf("Failed to trigger comfy nodes backfill w/ err: %v", err) + return drip.ComfyNodesBackfill500JSONResponse{Message: "Failed to trigger comfy nodes backfill", Error: err.Error()}, nil + } + + log.Ctx(ctx).Info().Msgf("ComfyNodesBackfill successful") + return drip.ComfyNodesBackfill204Response{}, nil +} diff --git a/server/middleware/authentication/service_account_auth.go b/server/middleware/authentication/service_account_auth.go index 62c099c..cb2e87d 100644 --- a/server/middleware/authentication/service_account_auth.go +++ b/server/middleware/authentication/service_account_auth.go @@ -17,6 +17,7 @@ func ServiceAccountAuthMiddleware() echo.MiddlewareFunc { regexp.MustCompile(`^/security-scan$`): {"GET"}, regexp.MustCompile(`^/nodes/reindex$`): {"POST"}, regexp.MustCompile(`^/nodes/[^/]+/versions/[^/]+/comfy-nodes$`): {"POST"}, + regexp.MustCompile(`^/comfy-nodes/backfill$`): {"POST"}, } return func(next echo.HandlerFunc) echo.HandlerFunc { diff --git a/server/middleware/authentication/service_account_auth_test.go b/server/middleware/authentication/service_account_auth_test.go index 082ceeb..3642e66 100644 --- a/server/middleware/authentication/service_account_auth_test.go +++ b/server/middleware/authentication/service_account_auth_test.go @@ -42,6 +42,7 @@ func TestServiceAccountAllowList(t *testing.T) { {"Reindex Nodes", "/nodes/reindex", "POST", false}, {"Reindex Nodes", "/security-scan", "GET", false}, {"Create Comfy-Nodes", "/nodes/test/versions/1.0.0/comfy-nodes", "POST", false}, + {"Backfill Comfy-Nodes", "/comfy-nodes/backfill", "POST", false}, } for _, tt := range tests { diff --git a/server/server.go b/server/server.go index 83d7789..04ceaa8 100644 --- a/server/server.go +++ b/server/server.go @@ -7,6 +7,7 @@ import ( "registry-backend/ent" "registry-backend/gateways/algolia" "registry-backend/gateways/discord" + "registry-backend/gateways/pubsub" "registry-backend/gateways/slack" "registry-backend/gateways/storage" handler "registry-backend/server/handlers" @@ -27,6 +28,7 @@ import ( type ServerDependencies struct { StorageService storage.StorageService + PubSubService pubsub.PubSubService SlackService slack.SlackService AlgoliaService algolia.AlgoliaService DiscordService discord.DiscordService @@ -58,6 +60,12 @@ func initializeDependencies(config *config.Config) (*ServerDependencies, error) return nil, err } + pubsubService, err := pubsub.NewPubSubService(config) + if err != nil { + log.Error().Err(err).Msg("Failed to initialize pub/sub service") + return nil, err + } + slackService := slack.NewSlackService(config) algoliaService, err := algolia.NewAlgoliaService(config) @@ -77,6 +85,7 @@ func initializeDependencies(config *config.Config) (*ServerDependencies, error) return &ServerDependencies{ StorageService: storageService, SlackService: slackService, + PubSubService: pubsubService, AlgoliaService: algoliaService, DiscordService: discordService, MonitoringClient: *mon, @@ -115,7 +124,8 @@ func (s *Server) Start() error { // Attach implementation of the generated OAPI strict server impl := implementation.NewStrictServerImplementation( - s.Client, s.Config, s.Dependencies.StorageService, s.Dependencies.SlackService, + s.Client, s.Config, s.Dependencies.StorageService, s.Dependencies.PubSubService, + s.Dependencies.SlackService, s.Dependencies.DiscordService, s.Dependencies.AlgoliaService) // Define middleware for authorization diff --git a/services/registry/registry_svc.go b/services/registry/registry_svc.go index a433400..d0bd607 100644 --- a/services/registry/registry_svc.go +++ b/services/registry/registry_svc.go @@ -23,6 +23,7 @@ import ( "registry-backend/ent/user" "registry-backend/gateways/algolia" "registry-backend/gateways/discord" + "registry-backend/gateways/pubsub" gateway "registry-backend/gateways/slack" "registry-backend/gateways/storage" "registry-backend/mapper" @@ -40,15 +41,17 @@ import ( type RegistryService struct { storageService storage.StorageService + pubsubService pubsub.PubSubService slackService gateway.SlackService algolia algolia.AlgoliaService discordService discord.DiscordService config *config.Config } -func NewRegistryService(storageSvc storage.StorageService, slackSvc gateway.SlackService, discordSvc discord.DiscordService, algoliaSvc algolia.AlgoliaService, config *config.Config) *RegistryService { +func NewRegistryService(storageSvc storage.StorageService, pubsubService pubsub.PubSubService, slackSvc gateway.SlackService, discordSvc discord.DiscordService, algoliaSvc algolia.AlgoliaService, config *config.Config) *RegistryService { return &RegistryService{ storageService: storageSvc, + pubsubService: pubsubService, slackService: slackSvc, discordService: discordSvc, algolia: algoliaSvc, @@ -104,15 +107,15 @@ func PrettifyJSON(input string) (string, error) { var temp interface{} err := json.Unmarshal([]byte(input), &temp) if err != nil { - return "", fmt.Errorf("invalid JSON input: %v", err) + return "", fmt.Errorf("invalid JSON input: %v", err) } - + // Marshal back to JSON with indentation pretty, err := json.MarshalIndent(temp, "", " ") if err != nil { - return "", fmt.Errorf("failed to marshal JSON: %v", err) + return "", fmt.Errorf("failed to marshal JSON: %v", err) } - + return string(pretty), nil } @@ -685,6 +688,29 @@ func (s *RegistryService) GetComfyNode(ctx context.Context, client *ent.Client, return nv.Edges.ComfyNodes[0], nil } +func (s *RegistryService) TriggerComfyNodesBackfill(ctx context.Context, client *ent.Client) error { + nvs, err := client.NodeVersion. + Query(). + WithStorageFile(). + Where(nodeversion.Not(nodeversion.HasComfyNodes())). + All(ctx) + if err != nil { + return fmt.Errorf("failed to query node versions: %w", err) + } + for i, nv := range nvs { + if nv.Edges.StorageFile.FileURL == "" { + continue + } + + log.Ctx(ctx).Info().Msgf("backfilling comfy node: %s", nv.Edges.StorageFile.FileURL) + err := s.pubsubService.PublishNodePack(ctx, nv.Edges.StorageFile.FileURL) + if err != nil { + return fmt.Errorf("fail to trigger node pack backfil for node %s-%s at index %d", nv.NodeID, nv.Version, i) + } + } + return nil +} + func (s *RegistryService) AssertPublisherPermissions(ctx context.Context, client *ent.Client, publisherID string, @@ -1036,7 +1062,7 @@ func (s *RegistryService) PerformSecurityCheck(ctx context.Context, client *ent. err = s.discordService.SendSecurityCouncilMessage( fmt.Sprintf("Security issues were found in node %s@%s. Status is flagged. "+ "Please check it here: https://registry.comfy.org/nodes/%s/versions/%s. \n "+ - "Issues are: \n%s", nodeVersion.NodeID, nodeVersion.Version, nodeVersion.NodeID, nodeVersion.Version, + "Issues are: \n%s", nodeVersion.NodeID, nodeVersion.Version, nodeVersion.NodeID, nodeVersion.Version, prettyIssues), false) if err != nil { log.Ctx(ctx).Error().Err(err).Msgf("failed to send message to discord")