diff --git a/.gitignore b/.gitignore index c37041316e..4d6c65acb2 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ .idea +.vscode openapi/build/generate.json-e openapi/build/generate.json.bak coverage.out diff --git a/components/payments/cmd/root.go b/components/payments/cmd/root.go index acaecedb1c..3cd5b6f52e 100644 --- a/components/payments/cmd/root.go +++ b/components/payments/cmd/root.go @@ -17,6 +17,7 @@ import ( "github.com/formancehq/stack/libs/go-libs/health" "github.com/formancehq/stack/libs/go-libs/licence" "github.com/formancehq/stack/libs/go-libs/otlp/otlptraces" + "github.com/formancehq/stack/libs/go-libs/publish" "github.com/formancehq/stack/libs/go-libs/service" "github.com/formancehq/stack/libs/go-libs/temporal" "github.com/spf13/cobra" @@ -37,6 +38,7 @@ const ( configEncryptionKeyFlag = "config-encryption-key" listenFlag = "listen" stackFlag = "stack" + stackPublicURLFlag = "stack-public-url" ) func NewRootCommand() *cobra.Command { @@ -99,6 +101,7 @@ func commonOptions(cmd *cobra.Command) (fx.Option, error) { listen, _ := cmd.Flags().GetString(listenFlag) stack, _ := cmd.Flags().GetString(stackFlag) + stackPublicURL, _ := cmd.Flags().GetString(stackPublicURLFlag) return fx.Options( fx.Provide(func() *bunconnect.ConnectionOptions { @@ -119,10 +122,11 @@ func commonOptions(cmd *cobra.Command) (fx.Option, error) { ), auth.FXModuleFromFlags(cmd), health.Module(), + publish.FXModuleFromFlags(cmd, service.IsDebug(cmd)), licence.FXModuleFromFlags(cmd, ServiceName), storage.Module(cmd, *connectionOptions, configEncryptionKey), api.NewModule(listen, service.IsDebug(cmd)), - engine.Module(pluginPaths, stack), + engine.Module(pluginPaths, stack, stackPublicURL), v2.NewModule(), v3.NewModule(), ), nil diff --git a/components/payments/cmd/server.go b/components/payments/cmd/server.go index b63bab649d..ab817f47e1 100644 --- a/components/payments/cmd/server.go +++ b/components/payments/cmd/server.go @@ -8,6 +8,7 @@ import ( "github.com/formancehq/stack/libs/go-libs/licence" "github.com/formancehq/stack/libs/go-libs/otlp/otlpmetrics" "github.com/formancehq/stack/libs/go-libs/otlp/otlptraces" + "github.com/formancehq/stack/libs/go-libs/publish" "github.com/formancehq/stack/libs/go-libs/service" "github.com/formancehq/stack/libs/go-libs/temporal" "github.com/sirupsen/logrus" @@ -28,6 +29,7 @@ func newServer() *cobra.Command { otlpmetrics.AddFlags(cmd.Flags()) otlptraces.AddFlags(cmd.Flags()) auth.AddFlags(cmd.Flags()) + publish.AddFlags(ServiceName, cmd.Flags()) bunconnect.AddFlags(cmd.Flags()) iam.AddFlags(cmd.Flags()) temporal.AddFlags(cmd.Flags()) diff --git a/components/payments/go.mod b/components/payments/go.mod index ac4c4f3c61..9c7545703f 100644 --- a/components/payments/go.mod +++ b/components/payments/go.mod @@ -5,6 +5,7 @@ go 1.22 toolchain go1.22.6 require ( + github.com/ThreeDotsLabs/watermill v1.3.5 github.com/bombsimon/logrusr/v3 v3.1.0 github.com/formancehq/stack/libs/go-libs v0.0.0-20230221161632-e6dc6a89a85e github.com/gibson042/canonicaljson-go v1.0.3 @@ -36,7 +37,11 @@ require ( require ( filippo.io/edwards25519 v1.1.0 // indirect - github.com/ThreeDotsLabs/watermill v1.3.5 // indirect + github.com/IBM/sarama v1.43.3 // indirect + github.com/ThreeDotsLabs/watermill-http/v2 v2.3.0 // indirect + github.com/ThreeDotsLabs/watermill-kafka/v3 v3.0.1 // indirect + github.com/ThreeDotsLabs/watermill-nats/v2 v2.0.2 // indirect + github.com/aws/aws-msk-iam-sasl-signer-go v1.0.0 // indirect github.com/aws/aws-sdk-go-v2 v1.30.4 // indirect github.com/aws/aws-sdk-go-v2/config v1.27.28 // indirect github.com/aws/aws-sdk-go-v2/credentials v1.17.28 // indirect @@ -54,9 +59,15 @@ require ( github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 // indirect + github.com/dnwe/otelsarama v0.0.0-20231212173111-631a0a53d5d4 // indirect + github.com/eapache/go-resiliency v1.7.0 // indirect + github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect + github.com/eapache/queue v1.1.0 // indirect github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a // indirect github.com/fatih/color v1.16.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect + github.com/go-chi/chi v4.0.2+incompatible // indirect + github.com/go-chi/render v1.0.1 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-ole/go-ole v1.3.0 // indirect @@ -68,6 +79,7 @@ require ( github.com/gogo/status v1.1.1 // indirect github.com/golang-jwt/jwt/v5 v5.2.1 // indirect github.com/golang/protobuf v1.5.4 // indirect + github.com/golang/snappy v0.0.4 // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/google/pprof v0.0.0-20240827171923-fa2c70bbbfe5 // indirect github.com/gorilla/mux v1.8.0 // indirect @@ -75,13 +87,22 @@ require ( github.com/gorilla/securecookie v1.1.1 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect + github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-retryablehttp v0.7.7 // indirect + github.com/hashicorp/go-uuid v1.0.3 // indirect github.com/hashicorp/yamux v0.1.1 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect + github.com/jcmturner/aescts/v2 v2.0.0 // indirect + github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect + github.com/jcmturner/gofork v1.7.6 // indirect + github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect + github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/jinzhu/inflection v1.0.0 // indirect + github.com/klauspost/compress v1.17.9 // indirect github.com/lestrrat-go/backoff/v2 v2.0.8 // indirect github.com/lestrrat-go/blackmagic v1.0.2 // indirect github.com/lestrrat-go/httpcc v1.0.1 // indirect @@ -95,11 +116,16 @@ require ( github.com/mitchellh/go-testing-interface v0.0.0-20171004221916-a61a99592b77 // indirect github.com/muhlemmer/gu v0.3.1 // indirect github.com/muhlemmer/httpforwarded v0.1.0 // indirect + github.com/nats-io/nats.go v1.37.0 // indirect + github.com/nats-io/nkeys v0.4.7 // indirect + github.com/nats-io/nuid v1.0.1 // indirect github.com/oklog/run v1.0.0 // indirect github.com/oklog/ulid v1.3.1 // indirect github.com/pborman/uuid v1.2.1 // indirect + github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect + github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect github.com/riandyrn/otelchi v0.9.0 // indirect github.com/robfig/cron v1.2.0 // indirect github.com/rs/cors v1.11.1 // indirect @@ -117,6 +143,9 @@ require ( github.com/uptrace/opentelemetry-go-extra/otelutil v0.3.1 // indirect github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect + github.com/xdg-go/pbkdf2 v1.0.0 // indirect + github.com/xdg-go/scram v1.1.2 // indirect + github.com/xdg-go/stringprep v1.0.4 // indirect github.com/xo/dburl v0.23.2 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect github.com/zitadel/oidc/v2 v2.12.0 // indirect diff --git a/components/payments/go.sum b/components/payments/go.sum index bbdc9e18f8..27a0c47902 100644 --- a/components/payments/go.sum +++ b/components/payments/go.sum @@ -393,6 +393,8 @@ github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25 github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/IBM/sarama v1.43.3 h1:Yj6L2IaNvb2mRBop39N7mmJAHBVY3dTPncr3qGVkxPA= +github.com/IBM/sarama v1.43.3/go.mod h1:FVIRaLrhK3Cla/9FfRF5X9Zua2KpS3SYIXxhac1H+FQ= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw= @@ -400,7 +402,15 @@ github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8 github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/ThreeDotsLabs/watermill v1.3.5 h1:50JEPEhMGZQMh08ct0tfO1PsgMOAOhV3zxK2WofkbXg= github.com/ThreeDotsLabs/watermill v1.3.5/go.mod h1:O/u/Ptyrk5MPTxSeWM5vzTtZcZfxXfO9PK9eXTYiFZY= +github.com/ThreeDotsLabs/watermill-http/v2 v2.3.0 h1:EMLYXiuFhza/p5LgoM+R49JHYbkXuNLNC6tlXyG/x0E= +github.com/ThreeDotsLabs/watermill-http/v2 v2.3.0/go.mod h1:Ily2cdTrvlj9dLB8BqAy5OIzWwS8B5WrFh+Ey7wOgQQ= +github.com/ThreeDotsLabs/watermill-kafka/v3 v3.0.1 h1:xqSjxMpZUROIjFTLqmKDJfOn/1zbqagcOcKE4xSxUzc= +github.com/ThreeDotsLabs/watermill-kafka/v3 v3.0.1/go.mod h1:VPGwfsuZOEBcS2DKuq8DYMAMzir/eqCSXbNvMUy5bvs= +github.com/ThreeDotsLabs/watermill-nats/v2 v2.0.2 h1:/87LcdSzUEdCKbJptaLE987hOVOs852b+v5pukegggo= +github.com/ThreeDotsLabs/watermill-nats/v2 v2.0.2/go.mod h1:uslCjpuzANBzawXYlwx2IDyGjpv9M42U2TQH6JMMQis= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= +github.com/aws/aws-msk-iam-sasl-signer-go v1.0.0 h1:UyjtGmO0Uwl/K+zpzPwLoXzMhcN9xmnR2nrqJoBrg3c= +github.com/aws/aws-msk-iam-sasl-signer-go v1.0.0/go.mod h1:TJAXuFs2HcMib3sN5L0gUC+Q01Qvy3DemvA55WuC+iA= github.com/aws/aws-sdk-go-v2 v1.30.4 h1:frhcagrVNrzmT95RJImMHgabt99vkXGslubDaDagTk8= github.com/aws/aws-sdk-go-v2 v1.30.4/go.mod h1:CT+ZPWXbYrci8chcARI3OmI/qgd+f6WtuLOoaIA8PR0= github.com/aws/aws-sdk-go-v2/config v1.27.28 h1:OTxWGW/91C61QlneCtnD62NLb4W616/NM1jA8LhJqbg= @@ -468,6 +478,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 h1:rpfIENRNNilwHwZeG5+P150SMrnNEcHYvcCuK6dPZSg= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0= +github.com/dnwe/otelsarama v0.0.0-20231212173111-631a0a53d5d4 h1:/xc676lCNA8jgPF2PW1FFpvRgDSciRz1z09ShIsVgTo= +github.com/dnwe/otelsarama v0.0.0-20231212173111-631a0a53d5d4/go.mod h1:xLagu9ssYlykwO0rMuogWgQbqKF/96Et0ve0G9xnAHk= github.com/docker/cli v26.1.4+incompatible h1:I8PHdc0MtxEADqYJZvhBrW9bo8gawKwwenxRM7/rLu8= github.com/docker/cli v26.1.4+incompatible/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8= github.com/docker/docker v27.2.1+incompatible h1:fQdiLfW7VLscyoeYEBz7/J8soYFDZV1u6VW6gJEjNMI= @@ -476,6 +488,12 @@ github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= +github.com/eapache/go-resiliency v1.7.0 h1:n3NRTnBn5N0Cbi/IeOHuQn9s2UwVUH7Ga0ZWcP+9JTA= +github.com/eapache/go-resiliency v1.7.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= @@ -496,11 +514,17 @@ github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/gibson042/canonicaljson-go v1.0.3 h1:EAyF8L74AWabkyUmrvEFHEt/AGFQeD6RfwbAuf0j1bI= github.com/gibson042/canonicaljson-go v1.0.3/go.mod h1:DsLpJTThXyGNO+KZlI85C1/KDcImpP67k/RKVjcaEqo= +github.com/go-chi/chi v4.0.2+incompatible h1:maB6vn6FqCxrpz4FqWdh4+lwpyZIQS7YEAUcHlgXVRs= +github.com/go-chi/chi v4.0.2+incompatible/go.mod h1:eB3wogJHnLi3x/kFX2A+IbTBlXxmMeXJVKy9tTv1XzQ= github.com/go-chi/chi/v5 v5.1.0 h1:acVI1TYaD+hhedDJ3r54HyA6sExp3HfXq7QWEEY/xMw= github.com/go-chi/chi/v5 v5.1.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8= +github.com/go-chi/render v1.0.1 h1:4/5tis2cKaNdnv9zFLfXzcquC9HbeZgCnxGnKrltBS8= +github.com/go-chi/render v1.0.1/go.mod h1:pq4Rr7HbnsdaeHagklXub+p6Wd16Af5l9koip1OvJns= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= @@ -569,6 +593,8 @@ github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= @@ -639,6 +665,7 @@ github.com/gorilla/schema v1.4.1 h1:jUg5hUjCSDZpNGLuXQOgIWGdlgrIdYvgQ0wZtdK1M3E= github.com/gorilla/schema v1.4.1/go.mod h1:Dg5SSm5PV60mhF2NFaTV1xuYYj8tV8NOPRo4FggUMnM= github.com/gorilla/securecookie v1.1.1 h1:miw7JPhV+b/lAHSXz4qd/nN9jRiAFV5FwjeKyCS8BvQ= github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= +github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= github.com/grpc-ecosystem/go-grpc-middleware v1.3.0/go.mod h1:z0ButlSOZa5vEBq9m2m2hlwIgKw+rp3sdCBRoJY+30Y= github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 h1:UH//fgunKIs4JdUbpDl1VZCDaL56wXCB/5+wF6uHfaI= github.com/grpc-ecosystem/go-grpc-middleware v1.4.0/go.mod h1:g5qyo/la0ALbONm6Vbp88Yd8NsDy6rZz+RcrMPxvld8= @@ -647,14 +674,22 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0/go.mod h1:hgWBS7lorOAVIJEQMi4Zs github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3/go.mod h1:o//XUCC/F+yRGJoPO/VU0GSB0f8Nhgmxx0VIRUvaC0w= github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 h1:asbCHRVmodnJTuQ3qamDwqVOIjwqUPTYmYuemVOx+Ys= github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0/go.mod h1:ggCgvZ2r7uOoQjOyu2Y1NhHmEPPzzuhWgcza5M1Ji1I= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= +github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ= github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48= github.com/hashicorp/go-hclog v1.6.3 h1:Qr2kF+eVWjTiYmU7Y31tYlP1h0q/X3Nl3tPGdaB11/k= github.com/hashicorp/go-hclog v1.6.3/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/hashicorp/go-plugin v1.6.1 h1:P7MR2UP6gNKGPp+y7EZw2kOiq4IR9WiqLvp0XOsVdwI= github.com/hashicorp/go-plugin v1.6.1/go.mod h1:XPHFku2tFo3o3QKFgSYo+cghcUhw1NA1hZyMK0PWAw0= github.com/hashicorp/go-retryablehttp v0.7.7 h1:C8hUCYzor8PIfXHa4UrZkU4VvK8o9ISHxT2Q8+VepXU= github.com/hashicorp/go-retryablehttp v0.7.7/go.mod h1:pkQpWZeYWskR+D1tR2O5OcBFOxfA7DoAO6xtkuQnHTk= +github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8= +github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru/v2 v2.0.4 h1:7GHuZcgid37q8o5i3QI9KMT4nCWQQ3Kx3Ov6bb9MfK0= @@ -674,6 +709,18 @@ github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY= github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw= github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= +github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= +github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= +github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= +github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg= +github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo= +github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o= +github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= +github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh687T8= +github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= +github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= +github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= github.com/jeremija/gosubmit v0.2.7 h1:At0OhGCFGPXyjPYAsCchoBUhE099pcBXmsb4iZqROIc= github.com/jeremija/gosubmit v0.2.7/go.mod h1:Ui+HS073lCFREXBbdfrJzMB57OI/bdxTiLtrDHHhFPI= github.com/jhump/protoreflect v1.15.1 h1:HUMERORf3I3ZdX05WaQ6MIpd/NJ434hTp5YiKgfCL6c= @@ -684,6 +731,8 @@ github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1 github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= @@ -724,6 +773,8 @@ github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27k github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q= +github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ= github.com/mitchellh/go-testing-interface v0.0.0-20171004221916-a61a99592b77 h1:7GoSOOW2jpsfkntVKaS2rAr1TJqfcxotyaUcuxoZSzg= github.com/mitchellh/go-testing-interface v0.0.0-20171004221916-a61a99592b77/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= @@ -736,6 +787,16 @@ github.com/muhlemmer/gu v0.3.1 h1:7EAqmFrW7n3hETvuAdmFmn4hS8W+z3LgKtrnow+YzNM= github.com/muhlemmer/gu v0.3.1/go.mod h1:YHtHR+gxM+bKEIIs7Hmi9sPT3ZDUvTN/i88wQpZkrdM= github.com/muhlemmer/httpforwarded v0.1.0 h1:x4DLrzXdliq8mprgUMR0olDvHGkou5BJsK/vWUetyzY= github.com/muhlemmer/httpforwarded v0.1.0/go.mod h1:yo9czKedo2pdZhoXe+yDkGVbU0TJ0q9oQ90BVoDEtw0= +github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE= +github.com/nats-io/jwt/v2 v2.5.8/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A= +github.com/nats-io/nats-server/v2 v2.10.18 h1:tRdZmBuWKVAFYtayqlBB2BuCHNGAQPvoQIXOKwU3WSM= +github.com/nats-io/nats-server/v2 v2.10.18/go.mod h1:97Qyg7YydD8blKlR8yBsUlPlWyZKjA7Bp5cl3MUE9K8= +github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= +github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= +github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/oklog/run v1.0.0 h1:Ru7dDtJNOyC66gQ5dQmaCa0qIsAUFY3sFpK1Xk8igrw= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= @@ -756,6 +817,8 @@ github.com/ory/dockertest/v3 v3.11.0 h1:OiHcxKAvSDUwsEVh2BjxQQc/5EHz9n0va9awCtNG github.com/ory/dockertest/v3 v3.11.0/go.mod h1:VIPxS1gwT9NpPOrfD3rACs8Y9Z7yhzO4SB194iUDnUI= github.com/pborman/uuid v1.2.1 h1:+ZZIw58t/ozdjRaXh/3awHfmWRbzYxJoAdNJxe/3pvw= github.com/pborman/uuid v1.2.1/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= +github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= +github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -768,6 +831,8 @@ github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 h1:o4JXh1EVt github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/riandyrn/otelchi v0.9.0 h1:BuQxXR7/JF2yYOQl21Yyz5d52hns/96ecAaPUZiKQzc= github.com/riandyrn/otelchi v0.9.0/go.mod h1:iX30kllzThsf8oEcEbl3GifPJZtN4cnCWUUc+UhE4yM= github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ= @@ -839,6 +904,12 @@ github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IU github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok= github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb h1:zGWFAtiMcyryUHoUjUJX0/lt1H2+i2Ka2n+D3DImSNo= github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHovont7NscjpAxXsDA8S8BMYve8Y5+7cuRE7R0= @@ -946,6 +1017,7 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw= golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -1041,6 +1113,7 @@ golang.org/x/net v0.0.0-20221014081412-f15817d10f9b/go.mod h1:YDH+HFinaLZZlnHAfS golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY= golang.org/x/net v0.4.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE= golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= diff --git a/components/payments/internal/api/services/bank_accounts_forward_to_connector.go b/components/payments/internal/api/services/bank_accounts_forward_to_connector.go index ab9c36a311..4edb93be33 100644 --- a/components/payments/internal/api/services/bank_accounts_forward_to_connector.go +++ b/components/payments/internal/api/services/bank_accounts_forward_to_connector.go @@ -8,5 +8,5 @@ import ( ) func (s *Service) BankAccountsForwardToConnector(ctx context.Context, bankAccountID uuid.UUID, connectorID models.ConnectorID) (*models.BankAccount, error) { - return s.engine.CreateBankAccount(ctx, bankAccountID, connectorID) + return s.engine.ForwartBankAccount(ctx, bankAccountID, connectorID) } diff --git a/components/payments/internal/api/services/connectors_reset.go b/components/payments/internal/api/services/connectors_reset.go index f53ffdcf32..b43a222d93 100644 --- a/components/payments/internal/api/services/connectors_reset.go +++ b/components/payments/internal/api/services/connectors_reset.go @@ -7,15 +7,6 @@ import ( ) func (s *Service) ConnectorsReset(ctx context.Context, connectorID models.ConnectorID) error { - config, err := s.ConnectorsConfig(ctx, connectorID) - if err != nil { - return err - } - - if err := s.engine.UninstallConnector(ctx, connectorID); err != nil { - return err - } - - _, err = s.engine.InstallConnector(ctx, connectorID.Provider, config) - return err + err := s.engine.ResetConnector(ctx, connectorID) + return handleEngineErrors(err) } diff --git a/components/payments/internal/api/services/pools_add_account.go b/components/payments/internal/api/services/pools_add_account.go index 94cf2c0fb0..f7cca80ee5 100644 --- a/components/payments/internal/api/services/pools_add_account.go +++ b/components/payments/internal/api/services/pools_add_account.go @@ -8,5 +8,6 @@ import ( ) func (s *Service) PoolsAddAccount(ctx context.Context, id uuid.UUID, accountID models.AccountID) error { - return s.storage.PoolsAddAccount(ctx, id, accountID) + err := s.engine.AddAccountToPool(ctx, id, accountID) + return handleEngineErrors(err) } diff --git a/components/payments/internal/api/services/pools_create.go b/components/payments/internal/api/services/pools_create.go index 55e62f7b10..929cd942ee 100644 --- a/components/payments/internal/api/services/pools_create.go +++ b/components/payments/internal/api/services/pools_create.go @@ -7,5 +7,6 @@ import ( ) func (s *Service) PoolsCreate(ctx context.Context, pool models.Pool) error { - return s.storage.PoolsUpsert(ctx, pool) + err := s.engine.CreatePool(ctx, pool) + return handleEngineErrors(err) } diff --git a/components/payments/internal/api/services/pools_delete.go b/components/payments/internal/api/services/pools_delete.go index 0c92dc00d7..2e41a23ba0 100644 --- a/components/payments/internal/api/services/pools_delete.go +++ b/components/payments/internal/api/services/pools_delete.go @@ -7,5 +7,6 @@ import ( ) func (s *Service) PoolsDelete(ctx context.Context, id uuid.UUID) error { - return s.storage.PoolsDelete(ctx, id) + err := s.engine.DeletePool(ctx, id) + return handleEngineErrors(err) } diff --git a/components/payments/internal/api/services/pools_remove_account.go b/components/payments/internal/api/services/pools_remove_account.go index 0d0207c22c..84163cb460 100644 --- a/components/payments/internal/api/services/pools_remove_account.go +++ b/components/payments/internal/api/services/pools_remove_account.go @@ -8,5 +8,6 @@ import ( ) func (s *Service) PoolsRemoveAccount(ctx context.Context, id uuid.UUID, accountID models.AccountID) error { - return s.storage.PoolsRemoveAccount(ctx, id, accountID) + err := s.engine.RemoveAccountFromPool(ctx, id, accountID) + return handleEngineErrors(err) } diff --git a/components/payments/internal/connectors/engine/activities/activity.go b/components/payments/internal/connectors/engine/activities/activity.go index ef425e5323..8bce48dab1 100644 --- a/components/payments/internal/connectors/engine/activities/activity.go +++ b/components/payments/internal/connectors/engine/activities/activity.go @@ -4,6 +4,7 @@ import ( "errors" "github.com/formancehq/payments/internal/connectors/engine/plugins" + "github.com/formancehq/payments/internal/events" "github.com/formancehq/payments/internal/storage" temporalworker "github.com/formancehq/stack/libs/go-libs/temporal" "go.temporal.io/sdk/temporal" @@ -12,6 +13,7 @@ import ( type Activities struct { storage storage.Storage + events *events.Events plugins plugins.Plugins } @@ -161,13 +163,42 @@ func (a Activities) DefinitionSet() temporalworker.DefinitionSet { Append(temporalworker.Definition{ Name: "StorageWebhooksDelete", Func: a.StorageWebhooksDelete, + }). + Append(temporalworker.Definition{ + Name: "EventsSendAccount", + Func: a.EventsSendAccount, + }). + Append(temporalworker.Definition{ + Name: "EventsSendBalance", + Func: a.EventsSendBalance, + }). + Append(temporalworker.Definition{ + Name: "EventsSendBankAccount", + Func: a.EventsSendBankAccount, + }). + Append(temporalworker.Definition{ + Name: "EventsSendConnectorReset", + Func: a.EventsSendConnectorReset, + }). + Append(temporalworker.Definition{ + Name: "EventsSendPayment", + Func: a.EventsSendPayment, + }). + Append(temporalworker.Definition{ + Name: "EventsSendPoolCreation", + Func: a.EventsSendPoolCreation, + }). + Append(temporalworker.Definition{ + Name: "EventsSendPoolDeletion", + Func: a.EventsSendPoolDeletion, }) } -func New(storage storage.Storage, plugins plugins.Plugins) Activities { +func New(storage storage.Storage, events *events.Events, plugins plugins.Plugins) Activities { return Activities{ storage: storage, plugins: plugins, + events: events, } } diff --git a/components/payments/internal/connectors/engine/activities/events_send_account.go b/components/payments/internal/connectors/engine/activities/events_send_account.go new file mode 100644 index 0000000000..51aac0797e --- /dev/null +++ b/components/payments/internal/connectors/engine/activities/events_send_account.go @@ -0,0 +1,18 @@ +package activities + +import ( + "context" + + "github.com/formancehq/payments/internal/models" + "go.temporal.io/sdk/workflow" +) + +func (a Activities) EventsSendAccount(ctx context.Context, account models.Account) error { + return a.events.Publish(ctx, a.events.NewEventSavedAccounts(account)) +} + +var EventsSendAccountActivity = Activities{}.EventsSendAccount + +func EventsSendAccount(ctx workflow.Context, account models.Account) error { + return executeActivity(ctx, EventsSendAccountActivity, nil, account) +} diff --git a/components/payments/internal/connectors/engine/activities/events_send_balance.go b/components/payments/internal/connectors/engine/activities/events_send_balance.go new file mode 100644 index 0000000000..d41aad2a8f --- /dev/null +++ b/components/payments/internal/connectors/engine/activities/events_send_balance.go @@ -0,0 +1,18 @@ +package activities + +import ( + "context" + + "github.com/formancehq/payments/internal/models" + "go.temporal.io/sdk/workflow" +) + +func (a Activities) EventsSendBalance(ctx context.Context, balance models.Balance) error { + return a.events.Publish(ctx, a.events.NewEventSavedBalances(balance)) +} + +var EventsSendBalanceActivity = Activities{}.EventsSendBalance + +func EventsSendBalance(ctx workflow.Context, balance models.Balance) error { + return executeActivity(ctx, EventsSendBalanceActivity, nil, balance) +} diff --git a/components/payments/internal/connectors/engine/activities/events_send_bank_account.go b/components/payments/internal/connectors/engine/activities/events_send_bank_account.go new file mode 100644 index 0000000000..b311769155 --- /dev/null +++ b/components/payments/internal/connectors/engine/activities/events_send_bank_account.go @@ -0,0 +1,18 @@ +package activities + +import ( + "context" + + "github.com/formancehq/payments/internal/models" + "go.temporal.io/sdk/workflow" +) + +func (a Activities) EventsSendBankAccount(ctx context.Context, bankAccount models.BankAccount) error { + return a.events.Publish(ctx, a.events.NewEventSavedBankAccounts(bankAccount)) +} + +var EventsSendBankAccountActivity = Activities{}.EventsSendBankAccount + +func EventsSendBankAccount(ctx workflow.Context, bankAccount models.BankAccount) error { + return executeActivity(ctx, EventsSendBankAccountActivity, nil, bankAccount) +} diff --git a/components/payments/internal/connectors/engine/activities/events_send_connector_reset.go b/components/payments/internal/connectors/engine/activities/events_send_connector_reset.go new file mode 100644 index 0000000000..134744f576 --- /dev/null +++ b/components/payments/internal/connectors/engine/activities/events_send_connector_reset.go @@ -0,0 +1,18 @@ +package activities + +import ( + "context" + + "github.com/formancehq/payments/internal/models" + "go.temporal.io/sdk/workflow" +) + +func (a Activities) EventsSendConnectorReset(ctx context.Context, connectorID models.ConnectorID) error { + return a.events.Publish(ctx, a.events.NewEventResetConnector(connectorID)) +} + +var EventsSendConnectorResetActivity = Activities{}.EventsSendConnectorReset + +func EventsSendConnectorReset(ctx workflow.Context, connectorID models.ConnectorID) error { + return executeActivity(ctx, EventsSendConnectorResetActivity, nil, connectorID) +} diff --git a/components/payments/internal/connectors/engine/activities/events_send_payment.go b/components/payments/internal/connectors/engine/activities/events_send_payment.go new file mode 100644 index 0000000000..80ab61d137 --- /dev/null +++ b/components/payments/internal/connectors/engine/activities/events_send_payment.go @@ -0,0 +1,26 @@ +package activities + +import ( + "context" + + "github.com/formancehq/payments/internal/models" + "go.temporal.io/sdk/workflow" +) + +type EventsSendPaymentRequest struct { + Payment models.Payment + Adjustment models.PaymentAdjustment +} + +func (a Activities) EventsSendPayment(ctx context.Context, req EventsSendPaymentRequest) error { + return a.events.Publish(ctx, a.events.NewEventSavedPayments(req.Payment, req.Adjustment)) +} + +var EventsSendPaymentActivity = Activities{}.EventsSendPayment + +func EventsSendPayment(ctx workflow.Context, payment models.Payment, adjustment models.PaymentAdjustment) error { + return executeActivity(ctx, EventsSendPaymentActivity, nil, EventsSendPaymentRequest{ + Payment: payment, + Adjustment: adjustment, + }) +} diff --git a/components/payments/internal/connectors/engine/activities/events_send_pool_creation.go b/components/payments/internal/connectors/engine/activities/events_send_pool_creation.go new file mode 100644 index 0000000000..990244f766 --- /dev/null +++ b/components/payments/internal/connectors/engine/activities/events_send_pool_creation.go @@ -0,0 +1,18 @@ +package activities + +import ( + "context" + + "github.com/formancehq/payments/internal/models" + "go.temporal.io/sdk/workflow" +) + +func (a Activities) EventsSendPoolCreation(ctx context.Context, pool models.Pool) error { + return a.events.Publish(ctx, a.events.NewEventSavedPool(pool)) +} + +var EventsSendPoolCreationActivity = Activities{}.EventsSendPoolCreation + +func EventsSendPoolCreation(ctx workflow.Context, pool models.Pool) error { + return executeActivity(ctx, EventsSendPoolCreationActivity, nil, pool) +} diff --git a/components/payments/internal/connectors/engine/activities/events_send_pool_deletion.go b/components/payments/internal/connectors/engine/activities/events_send_pool_deletion.go new file mode 100644 index 0000000000..62f784a321 --- /dev/null +++ b/components/payments/internal/connectors/engine/activities/events_send_pool_deletion.go @@ -0,0 +1,18 @@ +package activities + +import ( + "context" + + "github.com/google/uuid" + "go.temporal.io/sdk/workflow" +) + +func (a Activities) EventsSendPoolDeletion(ctx context.Context, id uuid.UUID) error { + return a.events.Publish(ctx, a.events.NewEventDeletePool(id)) +} + +var EventsSendPoolDeletionActivity = Activities{}.EventsSendPoolDeletion + +func EventsSendPoolDeletion(ctx workflow.Context, id uuid.UUID) error { + return executeActivity(ctx, EventsSendPoolDeletionActivity, nil, id) +} diff --git a/components/payments/internal/connectors/engine/engine.go b/components/payments/internal/connectors/engine/engine.go index ec45cd18ad..0d524012b2 100644 --- a/components/payments/internal/connectors/engine/engine.go +++ b/components/payments/internal/connectors/engine/engine.go @@ -23,8 +23,13 @@ import ( type Engine interface { InstallConnector(ctx context.Context, provider string, rawConfig json.RawMessage) (models.ConnectorID, error) UninstallConnector(ctx context.Context, connectorID models.ConnectorID) error - CreateBankAccount(ctx context.Context, bankAccountID uuid.UUID, connectorID models.ConnectorID) (*models.BankAccount, error) + ResetConnector(ctx context.Context, connectorID models.ConnectorID) error + ForwartBankAccount(ctx context.Context, bankAccountID uuid.UUID, connectorID models.ConnectorID) (*models.BankAccount, error) HandleWebhook(ctx context.Context, urlPath string, webhook models.Webhook) error + CreatePool(ctx context.Context, pool models.Pool) error + AddAccountToPool(ctx context.Context, id uuid.UUID, accountID models.AccountID) error + RemoveAccountFromPool(ctx context.Context, id uuid.UUID, accountID models.AccountID) error + DeletePool(ctx context.Context, poolID uuid.UUID) error OnStart(ctx context.Context) error } @@ -80,7 +85,7 @@ func (e *engine) InstallConnector(ctx context.Context, provider string, rawConfi return models.ConnectorID{}, handlePluginError(err) } - err = e.workers.AddWorker(connector.ID) + err = e.workers.AddWorker(connector.ID.String()) if err != nil { return models.ConnectorID{}, err } @@ -142,7 +147,7 @@ func (e *engine) UninstallConnector(ctx context.Context, connectorID models.Conn return err } - if err := e.workers.RemoveWorker(connectorID); err != nil { + if err := e.workers.RemoveWorker(connectorID.String()); err != nil { return err } @@ -153,7 +158,51 @@ func (e *engine) UninstallConnector(ctx context.Context, connectorID models.Conn return nil } -func (e *engine) CreateBankAccount(ctx context.Context, bankAccountID uuid.UUID, connectorID models.ConnectorID) (*models.BankAccount, error) { +func (e *engine) ResetConnector(ctx context.Context, connectorID models.ConnectorID) error { + connector, err := e.storage.ConnectorsGet(ctx, connectorID) + if err != nil { + return err + } + + // Detached the context to avoid being in a weird state if request is + // cancelled in the middle of the operation. + ctx, _ = contextutil.Detached(ctx) + if err := e.UninstallConnector(ctx, connectorID); err != nil { + return err + } + + _, err = e.InstallConnector(ctx, connectorID.Provider, connector.Config) + if err != nil { + return err + } + + run, err := e.temporalClient.ExecuteWorkflow( + ctx, + client.StartWorkflowOptions{ + ID: fmt.Sprintf("reset-%s", connectorID.String()), + TaskQueue: connectorID.String(), + WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY, + SearchAttributes: map[string]interface{}{ + workflow.SearchAttributeStack: e.stack, + }, + }, + workflow.RunSendEvents, + workflow.SendEvents{ + ConnectorReset: &connectorID, + }, + ) + if err != nil { + return err + } + + if err := run.Get(ctx, nil); err != nil { + return err + } + + return nil +} + +func (e *engine) ForwartBankAccount(ctx context.Context, bankAccountID uuid.UUID, connectorID models.ConnectorID) (*models.BankAccount, error) { run, err := e.temporalClient.ExecuteWorkflow( ctx, client.StartWorkflowOptions{ @@ -229,6 +278,147 @@ func (e *engine) HandleWebhook(ctx context.Context, urlPath string, webhook mode return nil } +func (e *engine) CreatePool(ctx context.Context, pool models.Pool) error { + if err := e.storage.PoolsUpsert(ctx, pool); err != nil { + return err + } + + ctx, _ = contextutil.Detached(ctx) + run, err := e.temporalClient.ExecuteWorkflow( + ctx, + client.StartWorkflowOptions{ + ID: fmt.Sprintf("pools-creation-%s", pool.IdempotemcyKey()), + TaskQueue: defaultWorkerName, + WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, + WorkflowExecutionErrorWhenAlreadyStarted: false, + SearchAttributes: map[string]interface{}{ + workflow.SearchAttributeStack: e.stack, + }, + }, + workflow.RunSendEvents, + workflow.SendEvents{ + PoolsCreation: &pool, + }, + ) + if err != nil { + return err + } + + if err := run.Get(ctx, nil); err != nil { + return err + } + + return nil +} + +func (e *engine) AddAccountToPool(ctx context.Context, id uuid.UUID, accountID models.AccountID) error { + if err := e.storage.PoolsAddAccount(ctx, id, accountID); err != nil { + return err + } + + ctx, _ = contextutil.Detached(ctx) + pool, err := e.storage.PoolsGet(ctx, id) + if err != nil { + return err + } + + run, err := e.temporalClient.ExecuteWorkflow( + ctx, + client.StartWorkflowOptions{ + ID: fmt.Sprintf("pools-add-account-%s", pool.IdempotemcyKey()), + TaskQueue: defaultWorkerName, + WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, + WorkflowExecutionErrorWhenAlreadyStarted: false, + SearchAttributes: map[string]interface{}{ + workflow.SearchAttributeStack: e.stack, + }, + }, + workflow.RunSendEvents, + workflow.SendEvents{ + PoolsCreation: pool, + }, + ) + if err != nil { + return err + } + + if err := run.Get(ctx, nil); err != nil { + return err + } + + return nil +} + +func (e *engine) RemoveAccountFromPool(ctx context.Context, id uuid.UUID, accountID models.AccountID) error { + if err := e.storage.PoolsRemoveAccount(ctx, id, accountID); err != nil { + return err + } + + ctx, _ = contextutil.Detached(ctx) + pool, err := e.storage.PoolsGet(ctx, id) + if err != nil { + return err + } + + run, err := e.temporalClient.ExecuteWorkflow( + ctx, + client.StartWorkflowOptions{ + ID: fmt.Sprintf("pools-remove-account-%s", pool.IdempotemcyKey()), + TaskQueue: defaultWorkerName, + WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, + WorkflowExecutionErrorWhenAlreadyStarted: false, + SearchAttributes: map[string]interface{}{ + workflow.SearchAttributeStack: e.stack, + }, + }, + workflow.RunSendEvents, + workflow.SendEvents{ + PoolsCreation: pool, + }, + ) + if err != nil { + return err + } + + if err := run.Get(ctx, nil); err != nil { + return err + } + + return nil +} + +func (e *engine) DeletePool(ctx context.Context, poolID uuid.UUID) error { + if err := e.storage.PoolsDelete(ctx, poolID); err != nil { + return err + } + ctx, _ = contextutil.Detached(ctx) + run, err := e.temporalClient.ExecuteWorkflow( + ctx, + client.StartWorkflowOptions{ + ID: fmt.Sprintf("pools-deletion-%s", poolID.String()), + TaskQueue: defaultWorkerName, + WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, + WorkflowExecutionErrorWhenAlreadyStarted: false, + SearchAttributes: map[string]interface{}{ + workflow.SearchAttributeStack: e.stack, + }, + }, + workflow.RunSendEvents, + workflow.SendEvents{ + PoolsDeletion: &poolID, + }, + ) + if err != nil { + return err + } + + if err := run.Get(ctx, nil); err != nil { + return err + } + + return nil +} + func (e *engine) OnStart(ctx context.Context) error { query := storage.NewListConnectorsQuery( bunpaginate.NewPaginatedQueryOptions(storage.ConnectorQuery{}). @@ -265,7 +455,7 @@ func (e *engine) onStartPlugin(ctx context.Context, connector models.Connector) return err } - err = e.workers.AddWorker(connector.ID) + err = e.workers.AddWorker(connector.ID.String()) if err != nil { return err } diff --git a/components/payments/internal/connectors/engine/module.go b/components/payments/internal/connectors/engine/module.go index 5e20540830..0376c0675a 100644 --- a/components/payments/internal/connectors/engine/module.go +++ b/components/payments/internal/connectors/engine/module.go @@ -3,10 +3,12 @@ package engine import ( "context" + "github.com/ThreeDotsLabs/watermill/message" "github.com/formancehq/payments/internal/connectors/engine/activities" "github.com/formancehq/payments/internal/connectors/engine/plugins" "github.com/formancehq/payments/internal/connectors/engine/webhooks" "github.com/formancehq/payments/internal/connectors/engine/workflow" + "github.com/formancehq/payments/internal/events" "github.com/formancehq/payments/internal/storage" "github.com/formancehq/stack/libs/go-libs/logging" "github.com/formancehq/stack/libs/go-libs/temporal" @@ -15,7 +17,7 @@ import ( "go.uber.org/fx" ) -func Module(pluginPath map[string]string, stack string) fx.Option { +func Module(pluginPath map[string]string, stack, stackURL string) fx.Option { ret := []fx.Option{ fx.Supply(worker.Options{}), fx.Provide(func( @@ -27,6 +29,9 @@ func Module(pluginPath map[string]string, stack string) fx.Option { ) Engine { return New(temporalClient, workers, plugins, storage, webhooks, stack) }), + fx.Provide(func(publisher message.Publisher) *events.Events { + return events.New(publisher, stackURL) + }), fx.Provide(func() plugins.Plugins { return plugins.New(pluginPath) }), @@ -36,8 +41,8 @@ func Module(pluginPath map[string]string, stack string) fx.Option { fx.Provide(func(temporalClient client.Client, plugins plugins.Plugins, webhooks webhooks.Webhooks) workflow.Workflow { return workflow.New(temporalClient, plugins, webhooks, stack) }), - fx.Provide(func(storage storage.Storage, plugins plugins.Plugins) activities.Activities { - return activities.New(storage, plugins) + fx.Provide(func(storage storage.Storage, events *events.Events, plugins plugins.Plugins) activities.Activities { + return activities.New(storage, events, plugins) }), fx.Provide( fx.Annotate(func( diff --git a/components/payments/internal/connectors/engine/workers.go b/components/payments/internal/connectors/engine/workers.go index 5e36411d11..bb0e80ab34 100644 --- a/components/payments/internal/connectors/engine/workers.go +++ b/components/payments/internal/connectors/engine/workers.go @@ -3,7 +3,6 @@ package engine import ( "sync" - "github.com/formancehq/payments/internal/models" "github.com/formancehq/stack/libs/go-libs/logging" "github.com/formancehq/stack/libs/go-libs/temporal" "go.temporal.io/sdk/activity" @@ -12,6 +11,10 @@ import ( temporalworkflow "go.temporal.io/sdk/workflow" ) +const ( + defaultWorkerName = "default" +) + type Workers struct { logger logging.Logger @@ -31,7 +34,7 @@ type Worker struct { } func NewWorkers(logger logging.Logger, temporalClient client.Client, workflows, activities []temporal.DefinitionSet, options worker.Options) *Workers { - return &Workers{ + workers := &Workers{ logger: logger, temporalClient: temporalClient, workers: make(map[string]Worker), @@ -39,6 +42,11 @@ func NewWorkers(logger logging.Logger, temporalClient client.Client, workflows, activities: activities, options: options, } + + // For all operation outside of connectors handlers + workers.AddWorker(defaultWorkerName) + + return workers } // Close is called when app is terminated @@ -52,15 +60,16 @@ func (w *Workers) Close() { } // Installing a new connector lauches a new worker -func (w *Workers) AddWorker(connectorID models.ConnectorID) error { +// A default one is instantiated when the workers struct is created +func (w *Workers) AddWorker(name string) error { w.rwMutex.Lock() defer w.rwMutex.Unlock() - if _, ok := w.workers[connectorID.String()]; ok { + if _, ok := w.workers[name]; ok { return nil } - worker := worker.New(w.temporalClient, connectorID.String(), w.options) + worker := worker.New(w.temporalClient, name, w.options) for _, set := range w.workflows { for _, workflow := range set { @@ -85,30 +94,30 @@ func (w *Workers) AddWorker(connectorID models.ConnectorID) error { } }() - w.workers[connectorID.String()] = Worker{ + w.workers[name] = Worker{ worker: worker, } - w.logger.Infof("worker for connector %s started", connectorID.String()) + w.logger.Infof("worker for connector %s started", name) return nil } // Uninstalling a connector stops the worker -func (w *Workers) RemoveWorker(connectorID models.ConnectorID) error { +func (w *Workers) RemoveWorker(name string) error { w.rwMutex.Lock() defer w.rwMutex.Unlock() - worker, ok := w.workers[connectorID.String()] + worker, ok := w.workers[name] if !ok { return nil } worker.worker.Stop() - delete(w.workers, connectorID.String()) + delete(w.workers, name) - w.logger.Infof("worker for connector %s removed", connectorID.String()) + w.logger.Infof("worker for connector %s removed", name) return nil } diff --git a/components/payments/internal/connectors/engine/workflow/create_bank_account.go b/components/payments/internal/connectors/engine/workflow/create_bank_account.go index 47d41b1b68..9ab9cc611e 100644 --- a/components/payments/internal/connectors/engine/workflow/create_bank_account.go +++ b/components/payments/internal/connectors/engine/workflow/create_bank_account.go @@ -4,6 +4,7 @@ import ( "github.com/formancehq/payments/internal/connectors/engine/activities" "github.com/formancehq/payments/internal/models" "github.com/google/uuid" + "go.temporal.io/api/enums/v1" "go.temporal.io/sdk/workflow" ) @@ -67,6 +68,25 @@ func (w Workflow) runCreateBankAccount( bankAccount.RelatedAccounts = append(bankAccount.RelatedAccounts, relatedAccount) + if err := workflow.ExecuteChildWorkflow( + workflow.WithChildOptions( + ctx, + workflow.ChildWorkflowOptions{ + TaskQueue: relatedAccount.ConnectorID.String(), + ParentClosePolicy: enums.PARENT_CLOSE_POLICY_ABANDON, + SearchAttributes: map[string]interface{}{ + SearchAttributeStack: w.stack, + }, + }, + ), + RunSendEvents, + SendEvents{ + BankAccount: bankAccount, + }, + ).Get(ctx, nil); err != nil { + return nil, err + } + return bankAccount, nil } diff --git a/components/payments/internal/connectors/engine/workflow/fetch_accounts.go b/components/payments/internal/connectors/engine/workflow/fetch_accounts.go index 1661b4cf7b..a78c630b70 100644 --- a/components/payments/internal/connectors/engine/workflow/fetch_accounts.go +++ b/components/payments/internal/connectors/engine/workflow/fetch_accounts.go @@ -34,7 +34,7 @@ func (w Workflow) fetchAccounts( fetchNextAccount FetchNextAccounts, nextTasks []models.TaskTree, ) error { - stateReference := fmt.Sprintf("%s", models.CAPABILITY_FETCH_ACCOUNTS.String()) + stateReference := models.CAPABILITY_FETCH_ACCOUNTS.String() if fetchNextAccount.FromPayload != nil { stateReference = fmt.Sprintf("%s-%s", models.CAPABILITY_FETCH_ACCOUNTS.String(), fetchNextAccount.FromPayload.ID) } @@ -61,49 +61,94 @@ func (w Workflow) fetchAccounts( return errors.Wrap(err, "fetching next accounts") } + accounts := models.FromPSPAccounts( + accountsResponse.Accounts, + models.ACCOUNT_TYPE_INTERNAL, + fetchNextAccount.ConnectorID, + ) + if len(accountsResponse.Accounts) > 0 { err = activities.StorageAccountsStore( infiniteRetryContext(ctx), - models.FromPSPAccounts( - accountsResponse.Accounts, - models.ACCOUNT_TYPE_INTERNAL, - fetchNextAccount.ConnectorID, - ), + accounts, ) if err != nil { return errors.Wrap(err, "storing next accounts") } } - // TODO(polo): send event - // TODO(polo): events with IK to avoid duplicates + wg := workflow.NewWaitGroup(ctx) + errChan := make(chan error, len(accountsResponse.Accounts)*2) + for _, account := range accounts { + acc := account + + wg.Add(1) + workflow.Go(ctx, func(ctx workflow.Context) { + defer wg.Done() + + if err := workflow.ExecuteChildWorkflow( + workflow.WithChildOptions( + ctx, + workflow.ChildWorkflowOptions{ + TaskQueue: fetchNextAccount.ConnectorID.String(), + ParentClosePolicy: enums.PARENT_CLOSE_POLICY_ABANDON, + SearchAttributes: map[string]interface{}{ + SearchAttributeStack: w.stack, + }, + }, + ), + RunSendEvents, + SendEvents{ + Account: &acc, + }, + ).Get(ctx, nil); err != nil { + errChan <- errors.Wrap(err, "sending events") + } + }) + } + for _, account := range accountsResponse.Accounts { - payload, err := json.Marshal(account) - if err != nil { - return errors.Wrap(err, "marshalling account") - } + acc := account + + wg.Add(1) + workflow.Go(ctx, func(ctx workflow.Context) { + defer wg.Done() + + payload, err := json.Marshal(acc) + if err != nil { + errChan <- errors.Wrap(err, "marshalling account") + } - if err := workflow.ExecuteChildWorkflow( - workflow.WithChildOptions( - ctx, - workflow.ChildWorkflowOptions{ - TaskQueue: fetchNextAccount.ConnectorID.String(), - ParentClosePolicy: enums.PARENT_CLOSE_POLICY_ABANDON, - SearchAttributes: map[string]interface{}{ - SearchAttributeStack: w.stack, + if err := workflow.ExecuteChildWorkflow( + workflow.WithChildOptions( + ctx, + workflow.ChildWorkflowOptions{ + TaskQueue: fetchNextAccount.ConnectorID.String(), + ParentClosePolicy: enums.PARENT_CLOSE_POLICY_ABANDON, + SearchAttributes: map[string]interface{}{ + SearchAttributeStack: w.stack, + }, }, + ), + Run, + fetchNextAccount.Config, + fetchNextAccount.ConnectorID, + &FromPayload{ + ID: acc.Reference, + Payload: payload, }, - ), - Run, - fetchNextAccount.Config, - fetchNextAccount.ConnectorID, - &FromPayload{ - ID: account.Reference, - Payload: payload, - }, - nextTasks, - ).Get(ctx, nil); err != nil { - return errors.Wrap(err, "running next workflow") + nextTasks, + ).Get(ctx, nil); err != nil { + errChan <- errors.Wrap(err, "running next workflow") + } + }) + } + + wg.Wait(ctx) + close(errChan) + for err := range errChan { + if err != nil { + return err } } diff --git a/components/payments/internal/connectors/engine/workflow/fetch_balances.go b/components/payments/internal/connectors/engine/workflow/fetch_balances.go index 70433e10a5..786e4bac7f 100644 --- a/components/payments/internal/connectors/engine/workflow/fetch_balances.go +++ b/components/payments/internal/connectors/engine/workflow/fetch_balances.go @@ -34,7 +34,7 @@ func (w Workflow) fetchBalances( fetchNextBalances FetchNextBalances, nextTasks []models.TaskTree, ) error { - stateReference := fmt.Sprintf("%s", models.CAPABILITY_FETCH_BALANCES.String()) + stateReference := models.CAPABILITY_FETCH_BALANCES.String() if fetchNextBalances.FromPayload != nil { stateReference = fmt.Sprintf("%s-%s", models.CAPABILITY_FETCH_BALANCES.String(), fetchNextBalances.FromPayload.ID) } @@ -61,48 +61,92 @@ func (w Workflow) fetchBalances( return errors.Wrap(err, "fetching next accounts") } + balances := models.FromPSPBalances( + balancesResponse.Balances, + fetchNextBalances.ConnectorID, + ) if len(balancesResponse.Balances) > 0 { err = activities.StorageBalancesStore( infiniteRetryContext(ctx), - models.FromPSPBalances( - balancesResponse.Balances, - fetchNextBalances.ConnectorID, - ), + balances, ) if err != nil { return errors.Wrap(err, "storing next accounts") } } - // TODO(polo): send event + wg := workflow.NewWaitGroup(ctx) + errChan := make(chan error, len(balancesResponse.Balances)*2) + for _, balance := range balances { + b := balance + + wg.Add(1) + workflow.Go(ctx, func(ctx workflow.Context) { + defer wg.Done() + + if err := workflow.ExecuteChildWorkflow( + workflow.WithChildOptions( + ctx, + workflow.ChildWorkflowOptions{ + TaskQueue: fetchNextBalances.ConnectorID.String(), + ParentClosePolicy: enums.PARENT_CLOSE_POLICY_ABANDON, + SearchAttributes: map[string]interface{}{ + SearchAttributeStack: w.stack, + }, + }, + ), + RunSendEvents, + SendEvents{ + Balance: &b, + }, + ).Get(ctx, nil); err != nil { + errChan <- errors.Wrap(err, "sending events") + } + }) + } for _, balance := range balancesResponse.Balances { - payload, err := json.Marshal(balance) - if err != nil { - return errors.Wrap(err, "marshalling account") - } - - if err := workflow.ExecuteChildWorkflow( - workflow.WithChildOptions( - ctx, - workflow.ChildWorkflowOptions{ - TaskQueue: fetchNextBalances.ConnectorID.String(), - ParentClosePolicy: enums.PARENT_CLOSE_POLICY_ABANDON, - SearchAttributes: map[string]interface{}{ - SearchAttributeStack: w.stack, + b := balance + + wg.Add(1) + workflow.Go(ctx, func(ctx workflow.Context) { + defer wg.Done() + + payload, err := json.Marshal(b) + if err != nil { + errChan <- errors.Wrap(err, "marshalling account") + } + + if err := workflow.ExecuteChildWorkflow( + workflow.WithChildOptions( + ctx, + workflow.ChildWorkflowOptions{ + TaskQueue: fetchNextBalances.ConnectorID.String(), + ParentClosePolicy: enums.PARENT_CLOSE_POLICY_ABANDON, + SearchAttributes: map[string]interface{}{ + SearchAttributeStack: w.stack, + }, }, + ), + Run, + fetchNextBalances.Config, + fetchNextBalances.ConnectorID, + &FromPayload{ + ID: fmt.Sprintf("%s-balances", b.AccountReference), + Payload: payload, }, - ), - Run, - fetchNextBalances.Config, - fetchNextBalances.ConnectorID, - &FromPayload{ - ID: fmt.Sprintf("%s-balances", balance.AccountReference), - Payload: payload, - }, - nextTasks, - ).Get(ctx, nil); err != nil { - return errors.Wrap(err, "running next workflow") + nextTasks, + ).Get(ctx, nil); err != nil { + errChan <- errors.Wrap(err, "running next workflow") + } + }) + } + + wg.Wait(ctx) + close(errChan) + for err := range errChan { + if err != nil { + return err } } diff --git a/components/payments/internal/connectors/engine/workflow/fetch_external_accounts.go b/components/payments/internal/connectors/engine/workflow/fetch_external_accounts.go index 4bc6980500..7758056cc4 100644 --- a/components/payments/internal/connectors/engine/workflow/fetch_external_accounts.go +++ b/components/payments/internal/connectors/engine/workflow/fetch_external_accounts.go @@ -34,7 +34,7 @@ func (w Workflow) fetchExternalAccounts( fetchNextExternalAccount FetchNextExternalAccounts, nextTasks []models.TaskTree, ) error { - stateReference := fmt.Sprintf("%s", models.CAPABILITY_FETCH_EXTERNAL_ACCOUNTS.String()) + stateReference := models.CAPABILITY_FETCH_EXTERNAL_ACCOUNTS.String() if fetchNextExternalAccount.FromPayload != nil { stateReference = fmt.Sprintf("%s-%s", models.CAPABILITY_FETCH_EXTERNAL_ACCOUNTS.String(), fetchNextExternalAccount.FromPayload.ID) } @@ -61,49 +61,93 @@ func (w Workflow) fetchExternalAccounts( return errors.Wrap(err, "fetching next accounts") } + accounts := models.FromPSPAccounts( + externalAccountsResponse.ExternalAccounts, + models.ACCOUNT_TYPE_EXTERNAL, + fetchNextExternalAccount.ConnectorID, + ) + if len(externalAccountsResponse.ExternalAccounts) > 0 { err = activities.StorageAccountsStore( infiniteRetryContext(ctx), - models.FromPSPAccounts( - externalAccountsResponse.ExternalAccounts, - models.ACCOUNT_TYPE_EXTERNAL, - fetchNextExternalAccount.ConnectorID, - ), + accounts, ) if err != nil { return errors.Wrap(err, "storing next accounts") } } - // TODO(polo): send event + wg := workflow.NewWaitGroup(ctx) + errChan := make(chan error, len(externalAccountsResponse.ExternalAccounts)*2) + for _, externalAccount := range accounts { + acc := externalAccount + wg.Add(1) + workflow.Go(ctx, func(ctx workflow.Context) { + defer wg.Done() + + if err := workflow.ExecuteChildWorkflow( + workflow.WithChildOptions( + ctx, + workflow.ChildWorkflowOptions{ + TaskQueue: fetchNextExternalAccount.ConnectorID.String(), + ParentClosePolicy: enums.PARENT_CLOSE_POLICY_ABANDON, + SearchAttributes: map[string]interface{}{ + SearchAttributeStack: w.stack, + }, + }, + ), + RunSendEvents, + SendEvents{ + Account: &acc, + }, + ).Get(ctx, nil); err != nil { + errChan <- errors.Wrap(err, "sending events") + } + }) + } for _, externalAccount := range externalAccountsResponse.ExternalAccounts { - payload, err := json.Marshal(externalAccount) - if err != nil { - return errors.Wrap(err, "marshalling external account") - } - - if err := workflow.ExecuteChildWorkflow( - workflow.WithChildOptions( - ctx, - workflow.ChildWorkflowOptions{ - TaskQueue: fetchNextExternalAccount.ConnectorID.String(), - ParentClosePolicy: enums.PARENT_CLOSE_POLICY_ABANDON, - SearchAttributes: map[string]interface{}{ - SearchAttributeStack: w.stack, + acc := externalAccount + + wg.Add(1) + workflow.Go(ctx, func(ctx workflow.Context) { + defer wg.Done() + + payload, err := json.Marshal(acc) + if err != nil { + errChan <- errors.Wrap(err, "marshalling external account") + } + + if err := workflow.ExecuteChildWorkflow( + workflow.WithChildOptions( + ctx, + workflow.ChildWorkflowOptions{ + TaskQueue: fetchNextExternalAccount.ConnectorID.String(), + ParentClosePolicy: enums.PARENT_CLOSE_POLICY_ABANDON, + SearchAttributes: map[string]interface{}{ + SearchAttributeStack: w.stack, + }, }, + ), + Run, + fetchNextExternalAccount.Config, + fetchNextExternalAccount.ConnectorID, + &FromPayload{ + ID: acc.Reference, + Payload: payload, }, - ), - Run, - fetchNextExternalAccount.Config, - fetchNextExternalAccount.ConnectorID, - &FromPayload{ - ID: externalAccount.Reference, - Payload: payload, - }, - nextTasks, - ).Get(ctx, nil); err != nil { - return errors.Wrap(err, "running next workflow") + nextTasks, + ).Get(ctx, nil); err != nil { + errChan <- errors.Wrap(err, "running next workflow") + } + }) + } + + wg.Wait(ctx) + close(errChan) + for err := range errChan { + if err != nil { + return err } } diff --git a/components/payments/internal/connectors/engine/workflow/fetch_others.go b/components/payments/internal/connectors/engine/workflow/fetch_others.go index 3f498aa8b4..8d520ac2bd 100644 --- a/components/payments/internal/connectors/engine/workflow/fetch_others.go +++ b/components/payments/internal/connectors/engine/workflow/fetch_others.go @@ -34,7 +34,7 @@ func (w Workflow) fetchNextOthers( fetchNextOthers FetchNextOthers, nextTasks []models.TaskTree, ) error { - stateReference := fmt.Sprintf("%s", models.CAPABILITY_FETCH_OTHERS.String()) + stateReference := models.CAPABILITY_FETCH_OTHERS.String() if fetchNextOthers.FromPayload != nil { stateReference = fmt.Sprintf("%s-%s", models.CAPABILITY_FETCH_OTHERS.String(), fetchNextOthers.FromPayload.ID) } @@ -62,30 +62,45 @@ func (w Workflow) fetchNextOthers( return errors.Wrap(err, "fetching next others") } - // TODO(polo): send event for others ? store others ? - + wg := workflow.NewWaitGroup(ctx) + errChan := make(chan error, len(othersResponse.Others)) for _, other := range othersResponse.Others { - if err := workflow.ExecuteChildWorkflow( - workflow.WithChildOptions( - ctx, - workflow.ChildWorkflowOptions{ - TaskQueue: fetchNextOthers.ConnectorID.String(), - ParentClosePolicy: enums.PARENT_CLOSE_POLICY_ABANDON, - SearchAttributes: map[string]interface{}{ - SearchAttributeStack: w.stack, + o := other + + wg.Add(1) + workflow.Go(ctx, func(ctx workflow.Context) { + defer wg.Done() + + if err := workflow.ExecuteChildWorkflow( + workflow.WithChildOptions( + ctx, + workflow.ChildWorkflowOptions{ + TaskQueue: fetchNextOthers.ConnectorID.String(), + ParentClosePolicy: enums.PARENT_CLOSE_POLICY_ABANDON, + SearchAttributes: map[string]interface{}{ + SearchAttributeStack: w.stack, + }, }, + ), + Run, + fetchNextOthers.Config, + fetchNextOthers.ConnectorID, + &FromPayload{ + ID: o.ID, + Payload: o.Other, }, - ), - Run, - fetchNextOthers.Config, - fetchNextOthers.ConnectorID, - &FromPayload{ - ID: other.ID, - Payload: other.Other, - }, - nextTasks, - ).Get(ctx, nil); err != nil { - return errors.Wrap(err, "running next workflow") + nextTasks, + ).Get(ctx, nil); err != nil { + errChan <- errors.Wrap(err, "running next workflow") + } + }) + } + + wg.Wait(ctx) + close(errChan) + for err := range errChan { + if err != nil { + return err } } diff --git a/components/payments/internal/connectors/engine/workflow/fetch_payments.go b/components/payments/internal/connectors/engine/workflow/fetch_payments.go index 1c7987d0d2..1c089dbaad 100644 --- a/components/payments/internal/connectors/engine/workflow/fetch_payments.go +++ b/components/payments/internal/connectors/engine/workflow/fetch_payments.go @@ -34,7 +34,7 @@ func (w Workflow) fetchNextPayments( fetchNextPayments FetchNextPayments, nextTasks []models.TaskTree, ) error { - stateReference := fmt.Sprintf("%s", models.CAPABILITY_FETCH_PAYMENTS.String()) + stateReference := models.CAPABILITY_FETCH_PAYMENTS.String() if fetchNextPayments.FromPayload != nil { stateReference = fmt.Sprintf("%s-%s", models.CAPABILITY_FETCH_PAYMENTS.String(), fetchNextPayments.FromPayload.ID) } @@ -61,48 +61,93 @@ func (w Workflow) fetchNextPayments( return errors.Wrap(err, "fetching next payments") } + payments := models.FromPSPPayments( + paymentsResponse.Payments, + fetchNextPayments.ConnectorID, + ) + if len(paymentsResponse.Payments) > 0 { err = activities.StoragePaymentsStore( infiniteRetryContext(ctx), - models.FromPSPPayments( - paymentsResponse.Payments, - fetchNextPayments.ConnectorID, - ), + payments, ) if err != nil { return errors.Wrap(err, "storing next accounts") } } - // TODO(polo): send events + wg := workflow.NewWaitGroup(ctx) + errChan := make(chan error, len(paymentsResponse.Payments)*2) + for _, payment := range payments { + p := payment + + wg.Add(1) + workflow.Go(ctx, func(ctx workflow.Context) { + defer wg.Done() + + if err := workflow.ExecuteChildWorkflow( + workflow.WithChildOptions( + ctx, + workflow.ChildWorkflowOptions{ + TaskQueue: fetchNextPayments.ConnectorID.String(), + ParentClosePolicy: enums.PARENT_CLOSE_POLICY_ABANDON, + SearchAttributes: map[string]interface{}{ + SearchAttributeStack: w.stack, + }, + }, + ), + RunSendEvents, + SendEvents{ + Payment: &p, + }, + ).Get(ctx, nil); err != nil { + errChan <- errors.Wrap(err, "sending events") + } + }) + } for _, payment := range paymentsResponse.Payments { - payload, err := json.Marshal(payment) - if err != nil { - return errors.Wrap(err, "marshalling payment") - } - - if err := workflow.ExecuteChildWorkflow( - workflow.WithChildOptions( - ctx, - workflow.ChildWorkflowOptions{ - TaskQueue: fetchNextPayments.ConnectorID.String(), - ParentClosePolicy: enums.PARENT_CLOSE_POLICY_ABANDON, - SearchAttributes: map[string]interface{}{ - SearchAttributeStack: w.stack, + p := payment + + wg.Add(1) + workflow.Go(ctx, func(ctx workflow.Context) { + defer wg.Done() + + payload, err := json.Marshal(p) + if err != nil { + errChan <- errors.Wrap(err, "marshalling payment") + } + + if err := workflow.ExecuteChildWorkflow( + workflow.WithChildOptions( + ctx, + workflow.ChildWorkflowOptions{ + TaskQueue: fetchNextPayments.ConnectorID.String(), + ParentClosePolicy: enums.PARENT_CLOSE_POLICY_ABANDON, + SearchAttributes: map[string]interface{}{ + SearchAttributeStack: w.stack, + }, }, + ), + Run, + fetchNextPayments.Config, + fetchNextPayments.ConnectorID, + &FromPayload{ + ID: p.Reference, + Payload: payload, }, - ), - Run, - fetchNextPayments.Config, - fetchNextPayments.ConnectorID, - &FromPayload{ - ID: payment.Reference, - Payload: payload, - }, - nextTasks, - ).Get(ctx, nil); err != nil { - return errors.Wrap(err, "running next workflow") + nextTasks, + ).Get(ctx, nil); err != nil { + errChan <- errors.Wrap(err, "running next workflow") + } + }) + } + + wg.Wait(ctx) + close(errChan) + for err := range errChan { + if err != nil { + return err } } diff --git a/components/payments/internal/connectors/engine/workflow/instances.go b/components/payments/internal/connectors/engine/workflow/instances.go index 00efb12da3..5c7cb61138 100644 --- a/components/payments/internal/connectors/engine/workflow/instances.go +++ b/components/payments/internal/connectors/engine/workflow/instances.go @@ -43,7 +43,7 @@ func (w Workflow) createInstance( func (w Workflow) terminateInstance( ctx workflow.Context, connectorID models.ConnectorID, - err error, + terminateError error, ) error { info := workflow.GetInfo(ctx) @@ -56,8 +56,8 @@ func (w Workflow) terminateInstance( } var errMessage *string - if err != nil { - errMessage = pointer.For(err.Error()) + if terminateError != nil { + errMessage = pointer.For(terminateError.Error()) } now := workflow.Now(ctx).UTC() diff --git a/components/payments/internal/connectors/engine/workflow/send_events.go b/components/payments/internal/connectors/engine/workflow/send_events.go new file mode 100644 index 0000000000..56032a1a8c --- /dev/null +++ b/components/payments/internal/connectors/engine/workflow/send_events.go @@ -0,0 +1,104 @@ +package workflow + +import ( + "github.com/formancehq/payments/internal/connectors/engine/activities" + "github.com/formancehq/payments/internal/models" + "github.com/google/uuid" + "go.temporal.io/sdk/workflow" +) + +type SendEvents struct { + Account *models.Account + Balance *models.Balance + BankAccount *models.BankAccount + Payment *models.Payment + ConnectorReset *models.ConnectorID + PoolsCreation *models.Pool + PoolsDeletion *uuid.UUID +} + +func (w Workflow) runSendEvents( + ctx workflow.Context, + sendEvents SendEvents, +) error { + if sendEvents.Account != nil { + err := activities.EventsSendAccount( + infiniteRetryContext(ctx), + *sendEvents.Account, + ) + if err != nil { + return err + } + } + + if sendEvents.Balance != nil { + err := activities.EventsSendBalance( + infiniteRetryContext(ctx), + *sendEvents.Balance, + ) + if err != nil { + return err + } + } + + if sendEvents.BankAccount != nil { + err := activities.EventsSendBankAccount( + infiniteRetryContext(ctx), + *sendEvents.BankAccount, + ) + if err != nil { + return err + } + } + + if sendEvents.Payment != nil { + for _, adjustment := range sendEvents.Payment.Adjustments { + err := activities.EventsSendPayment( + infiniteRetryContext(ctx), + *sendEvents.Payment, + adjustment, + ) + if err != nil { + return err + } + } + } + + if sendEvents.ConnectorReset != nil { + err := activities.EventsSendConnectorReset( + infiniteRetryContext(ctx), + *sendEvents.ConnectorReset, + ) + if err != nil { + return err + } + } + + if sendEvents.PoolsCreation != nil { + err := activities.EventsSendPoolCreation( + infiniteRetryContext(ctx), + *sendEvents.PoolsCreation, + ) + if err != nil { + return err + } + } + + if sendEvents.PoolsDeletion != nil { + err := activities.EventsSendPoolDeletion( + infiniteRetryContext(ctx), + *sendEvents.PoolsDeletion, + ) + if err != nil { + return err + } + } + + return nil +} + +var RunSendEvents any + +func init() { + RunSendEvents = Workflow{}.runSendEvents +} diff --git a/components/payments/internal/connectors/engine/workflow/workflow.go b/components/payments/internal/connectors/engine/workflow/workflow.go index a7b6426c41..18654f8a2b 100644 --- a/components/payments/internal/connectors/engine/workflow/workflow.go +++ b/components/payments/internal/connectors/engine/workflow/workflow.go @@ -98,5 +98,9 @@ func (w Workflow) DefinitionSet() temporalworker.DefinitionSet { Append(temporalworker.Definition{ Name: "RunStoreWebhookTranslation", Func: w.runStoreWebhookTranslation, + }). + Append(temporalworker.Definition{ + Name: "RunSendEvents", + Func: w.runSendEvents, }) } diff --git a/components/payments/internal/connectors/plugins/public/mangopay/webhooks.go b/components/payments/internal/connectors/plugins/public/mangopay/webhooks.go index 411518c414..490b532792 100644 --- a/components/payments/internal/connectors/plugins/public/mangopay/webhooks.go +++ b/components/payments/internal/connectors/plugins/public/mangopay/webhooks.go @@ -116,12 +116,13 @@ func (p Plugin) createWebhooks(ctx context.Context, req models.CreateWebhooksReq webhookURL := fmt.Sprintf("%s/api/payments/v3/connectors/webhooks/%s", stackPublicURL, req.ConnectorID) for eventType, config := range webhookConfigs { + url := fmt.Sprintf("%s%s", webhookURL, config.urlPath) if v, ok := activeHooks[eventType]; ok { // Already created, continue if v.URL != webhookURL { // If the URL is different, update it - err := p.client.UpdateHook(ctx, v.ID, webhookURL) + err := p.client.UpdateHook(ctx, v.ID, url) if err != nil { return err } @@ -130,8 +131,6 @@ func (p Plugin) createWebhooks(ctx context.Context, req models.CreateWebhooksReq continue } - url := fmt.Sprintf("%s%s", webhookURL, config.urlPath) - // Otherwise, create it err := p.client.CreateHook(ctx, eventType, url) if err != nil { diff --git a/components/payments/internal/events/account.go b/components/payments/internal/events/account.go new file mode 100644 index 0000000000..aded0d04ed --- /dev/null +++ b/components/payments/internal/events/account.go @@ -0,0 +1,53 @@ +package events + +import ( + "encoding/json" + "time" + + "github.com/formancehq/payments/internal/models" + "github.com/formancehq/payments/pkg/events" + "github.com/formancehq/stack/libs/go-libs/publish" +) + +type accountMessagePayload struct { + ID string `json:"id"` + CreatedAt time.Time `json:"createdAt"` + Reference string `json:"reference"` + Provider string `json:"provider"` + ConnectorID string `json:"connectorId"` + DefaultAsset string `json:"defaultAsset"` + AccountName string `json:"accountName"` + Type string `json:"type"` + Metadata map[string]string `json:"metadata"` + RawData json.RawMessage `json:"rawData"` +} + +func (e Events) NewEventSavedAccounts(account models.Account) publish.EventMessage { + payload := accountMessagePayload{ + ID: account.ID.String(), + ConnectorID: account.ConnectorID.String(), + Provider: account.ConnectorID.Provider, + CreatedAt: account.CreatedAt, + Reference: account.Reference, + Type: string(account.Type), + Metadata: account.Metadata, + RawData: account.Raw, + } + + if account.DefaultAsset != nil { + payload.DefaultAsset = *account.DefaultAsset + } + + if account.Name != nil { + payload.AccountName = *account.Name + } + + return publish.EventMessage{ + IdempotemcyKey: account.IdempotemcyKey(), + Date: time.Now().UTC(), + App: events.EventApp, + Version: events.EventVersion, + Type: events.EventTypeSavedAccounts, + Payload: payload, + } +} diff --git a/components/payments/internal/events/balance.go b/components/payments/internal/events/balance.go new file mode 100644 index 0000000000..225af2a1a8 --- /dev/null +++ b/components/payments/internal/events/balance.go @@ -0,0 +1,41 @@ +package events + +import ( + "math/big" + "time" + + "github.com/formancehq/payments/internal/models" + "github.com/formancehq/payments/pkg/events" + "github.com/formancehq/stack/libs/go-libs/publish" +) + +type balanceMessagePayload struct { + AccountID string `json:"accountID"` + ConnectorID string `json:"connectorId"` + Provider string `json:"provider"` + CreatedAt time.Time `json:"createdAt"` + LastUpdatedAt time.Time `json:"lastUpdatedAt"` + Asset string `json:"asset"` + Balance *big.Int `json:"balance"` +} + +func (e Events) NewEventSavedBalances(balance models.Balance) publish.EventMessage { + payload := balanceMessagePayload{ + AccountID: balance.AccountID.String(), + ConnectorID: balance.AccountID.ConnectorID.String(), + Provider: balance.AccountID.ConnectorID.Provider, + CreatedAt: balance.CreatedAt, + LastUpdatedAt: balance.LastUpdatedAt, + Asset: balance.Asset, + Balance: balance.Balance, + } + + return publish.EventMessage{ + IdempotemcyKey: balance.IdempotencyKey(), + Date: time.Now().UTC(), + App: events.EventApp, + Version: events.EventVersion, + Type: events.EventTypeSavedBalances, + Payload: payload, + } +} diff --git a/components/payments/internal/events/bank_account.go b/components/payments/internal/events/bank_account.go new file mode 100644 index 0000000000..ded6f34720 --- /dev/null +++ b/components/payments/internal/events/bank_account.go @@ -0,0 +1,73 @@ +package events + +import ( + "time" + + "github.com/formancehq/payments/internal/models" + "github.com/formancehq/payments/pkg/events" + "github.com/formancehq/stack/libs/go-libs/publish" +) + +type bankAccountMessagePayload struct { + ID string `json:"id"` + CreatedAt time.Time `json:"createdAt"` + Name string `json:"name"` + AccountNumber string `json:"accountNumber"` + IBAN string `json:"iban"` + SwiftBicCode string `json:"swiftBicCode"` + Country string `json:"country"` + RelatedAccounts []bankAccountRelatedAccountsPayload `json:"adjustments"` +} + +type bankAccountRelatedAccountsPayload struct { + CreatedAt time.Time `json:"createdAt"` + AccountID string `json:"accountID"` + ConnectorID string `json:"connectorID"` + Provider string `json:"provider"` +} + +func (e Events) NewEventSavedBankAccounts(bankAccount models.BankAccount) publish.EventMessage { + bankAccount.Offuscate() + + payload := bankAccountMessagePayload{ + ID: bankAccount.ID.String(), + CreatedAt: bankAccount.CreatedAt, + Name: bankAccount.Name, + } + + if bankAccount.AccountNumber != nil { + payload.AccountNumber = *bankAccount.AccountNumber + } + + if bankAccount.IBAN != nil { + payload.IBAN = *bankAccount.IBAN + } + + if bankAccount.SwiftBicCode != nil { + payload.SwiftBicCode = *bankAccount.SwiftBicCode + } + + if bankAccount.Country != nil { + payload.Country = *bankAccount.Country + } + + for _, relatedAccount := range bankAccount.RelatedAccounts { + relatedAccount := bankAccountRelatedAccountsPayload{ + CreatedAt: relatedAccount.CreatedAt, + AccountID: relatedAccount.AccountID.String(), + Provider: relatedAccount.ConnectorID.Provider, + ConnectorID: relatedAccount.ConnectorID.String(), + } + + payload.RelatedAccounts = append(payload.RelatedAccounts, relatedAccount) + } + + return publish.EventMessage{ + IdempotemcyKey: bankAccount.IdempotemcyKey(), + Date: time.Now().UTC(), + App: events.EventApp, + Version: events.EventVersion, + Type: events.EventTypeSavedBankAccount, + Payload: payload, + } +} diff --git a/components/payments/internal/events/connector.go b/components/payments/internal/events/connector.go new file mode 100644 index 0000000000..63c85e8f50 --- /dev/null +++ b/components/payments/internal/events/connector.go @@ -0,0 +1,28 @@ +package events + +import ( + "time" + + "github.com/formancehq/payments/internal/models" + "github.com/formancehq/payments/pkg/events" + "github.com/formancehq/stack/libs/go-libs/publish" +) + +type connectorMessagePayload struct { + CreatedAt time.Time `json:"createdAt"` + ConnectorID string `json:"connectorId"` +} + +func (e Events) NewEventResetConnector(connectorID models.ConnectorID) publish.EventMessage { + return publish.EventMessage{ + IdempotemcyKey: connectorID.String(), + Date: time.Now().UTC(), + App: events.EventApp, + Version: events.EventVersion, + Type: events.EventTypeConnectorReset, + Payload: connectorMessagePayload{ + CreatedAt: time.Now().UTC(), + ConnectorID: connectorID.String(), + }, + } +} diff --git a/components/payments/internal/events/events.go b/components/payments/internal/events/events.go new file mode 100644 index 0000000000..7301dca6fa --- /dev/null +++ b/components/payments/internal/events/events.go @@ -0,0 +1,27 @@ +package events + +import ( + "context" + + "github.com/ThreeDotsLabs/watermill/message" + eventsdef "github.com/formancehq/payments/pkg/events" + "github.com/formancehq/stack/libs/go-libs/publish" +) + +type Events struct { + publisher message.Publisher + + stackURL string +} + +func New(p message.Publisher, stackURL string) *Events { + return &Events{ + publisher: p, + stackURL: stackURL, + } +} + +func (e *Events) Publish(ctx context.Context, em publish.EventMessage) error { + return e.publisher.Publish(eventsdef.TopicPayments, + publish.NewMessage(ctx, em)) +} diff --git a/components/payments/internal/events/payment.go b/components/payments/internal/events/payment.go new file mode 100644 index 0000000000..61d6f936e4 --- /dev/null +++ b/components/payments/internal/events/payment.go @@ -0,0 +1,83 @@ +package events + +import ( + "encoding/json" + "math/big" + "time" + + "github.com/formancehq/payments/internal/models" + "github.com/formancehq/payments/pkg/events" + "github.com/formancehq/stack/libs/go-libs/api" + "github.com/formancehq/stack/libs/go-libs/publish" +) + +type paymentMessagePayload struct { + ID string `json:"id"` + ConnectorID string `json:"connectorId"` + Provider string `json:"provider"` + Reference string `json:"reference"` + CreatedAt time.Time `json:"createdAt"` + Type string `json:"type"` + Status string `json:"status"` + Scheme string `json:"scheme"` + Asset string `json:"asset"` + SourceAccountID string `json:"sourceAccountId,omitempty"` + DestinationAccountID string `json:"destinationAccountId,omitempty"` + Links []api.Link `json:"links"` + RawData json.RawMessage `json:"rawData"` + + Amount *big.Int `json:"amount"` + Metadata map[string]string `json:"metadata"` +} + +func (e Events) NewEventSavedPayments(payment models.Payment, adjustment models.PaymentAdjustment) publish.EventMessage { + payload := paymentMessagePayload{ + ID: payment.ID.String(), + Reference: payment.Reference, + Type: payment.Type.String(), + Status: payment.Status.String(), + Amount: payment.Amount, + Scheme: payment.Scheme.String(), + Asset: payment.Asset, + CreatedAt: payment.CreatedAt, + ConnectorID: payment.ConnectorID.String(), + Provider: payment.ConnectorID.Provider, + SourceAccountID: func() string { + if payment.SourceAccountID == nil { + return "" + } + return payment.SourceAccountID.String() + }(), + DestinationAccountID: func() string { + if payment.DestinationAccountID == nil { + return "" + } + return payment.DestinationAccountID.String() + }(), + RawData: adjustment.Raw, + Metadata: payment.Metadata, + } + + if payment.SourceAccountID != nil { + payload.Links = append(payload.Links, api.Link{ + Name: "source_account", + URI: e.stackURL + "/api/payments/accounts/" + payment.SourceAccountID.String(), + }) + } + + if payment.DestinationAccountID != nil { + payload.Links = append(payload.Links, api.Link{ + Name: "destination_account", + URI: e.stackURL + "/api/payments/accounts/" + payment.DestinationAccountID.String(), + }) + } + + return publish.EventMessage{ + IdempotemcyKey: adjustment.IdempotemcyKey(), + Date: time.Now().UTC(), + App: events.EventApp, + Version: events.EventVersion, + Type: events.EventTypeSavedPayments, + Payload: payload, + } +} diff --git a/components/payments/internal/events/pool.go b/components/payments/internal/events/pool.go new file mode 100644 index 0000000000..800965343f --- /dev/null +++ b/components/payments/internal/events/pool.go @@ -0,0 +1,58 @@ +package events + +import ( + "time" + + "github.com/formancehq/payments/internal/models" + "github.com/formancehq/payments/pkg/events" + "github.com/formancehq/stack/libs/go-libs/publish" + "github.com/google/uuid" +) + +type poolMessagePayload struct { + ID string `json:"id"` + Name string `json:"name"` + CreatedAt time.Time `json:"createdAt"` + AccountIDs []string `json:"accountIDs"` +} + +func (e Events) NewEventSavedPool(pool models.Pool) publish.EventMessage { + payload := poolMessagePayload{ + ID: pool.ID.String(), + Name: pool.Name, + CreatedAt: pool.CreatedAt, + } + + payload.AccountIDs = make([]string, len(pool.PoolAccounts)) + for i, a := range pool.PoolAccounts { + payload.AccountIDs[i] = a.AccountID.String() + } + + return publish.EventMessage{ + IdempotemcyKey: pool.IdempotemcyKey(), + Date: time.Now().UTC(), + App: events.EventApp, + Version: events.EventVersion, + Type: events.EventTypeSavedPool, + Payload: payload, + } +} + +type deletePoolMessagePayload struct { + CreatedAt time.Time `json:"createdAt"` + ID string `json:"id"` +} + +func (e Events) NewEventDeletePool(id uuid.UUID) publish.EventMessage { + return publish.EventMessage{ + IdempotemcyKey: id.String(), + Date: time.Now().UTC(), + App: events.EventApp, + Version: events.EventVersion, + Type: events.EventTypeDeletePool, + Payload: deletePoolMessagePayload{ + CreatedAt: time.Now().UTC(), + ID: id.String(), + }, + } +} diff --git a/components/payments/internal/models/accounts.go b/components/payments/internal/models/accounts.go index 4be33646b6..4967c8367b 100644 --- a/components/payments/internal/models/accounts.go +++ b/components/payments/internal/models/accounts.go @@ -78,6 +78,10 @@ func (a Account) MarshalJSON() ([]byte, error) { }) } +func (a *Account) IdempotemcyKey() string { + return a.ID.String() +} + func (a *Account) UnmarshalJSON(data []byte) error { var aux struct { ID string `json:"id"` diff --git a/components/payments/internal/models/balances.go b/components/payments/internal/models/balances.go index 0abf08c01f..5d5e1ad90a 100644 --- a/components/payments/internal/models/balances.go +++ b/components/payments/internal/models/balances.go @@ -1,9 +1,12 @@ package models import ( + "encoding/base64" "encoding/json" "math/big" "time" + + "github.com/gibson042/canonicaljson-go" ) type PSPBalance struct { @@ -35,6 +38,25 @@ type Balance struct { Balance *big.Int `json:"balance"` } +func (b *Balance) IdempotencyKey() string { + var ik = struct { + AccountID string + CreatedAt int64 + LastUpdatedAt int64 + }{ + AccountID: b.AccountID.String(), + CreatedAt: b.CreatedAt.UnixNano(), + LastUpdatedAt: b.LastUpdatedAt.UnixNano(), + } + + data, err := canonicaljson.Marshal(ik) + if err != nil { + panic(err) + } + + return base64.URLEncoding.WithPadding(base64.NoPadding).EncodeToString(data) +} + func (b Balance) MarshalJSON() ([]byte, error) { return json.Marshal(&struct { AccountID string `json:"accountID"` diff --git a/components/payments/internal/models/bank_accounts.go b/components/payments/internal/models/bank_accounts.go index 9a9d2d589b..cfcad066ba 100644 --- a/components/payments/internal/models/bank_accounts.go +++ b/components/payments/internal/models/bank_accounts.go @@ -33,6 +33,10 @@ type BankAccount struct { RelatedAccounts []BankAccountRelatedAccount `json:"relatedAccounts"` } +func (b *BankAccount) IdempotemcyKey() string { + return b.ID.String() +} + func (a *BankAccount) Offuscate() error { if a.IBAN != nil { length := len(*a.IBAN) diff --git a/components/payments/internal/models/connectors.go b/components/payments/internal/models/connectors.go index 55bde383d8..42fabb9cfa 100644 --- a/components/payments/internal/models/connectors.go +++ b/components/payments/internal/models/connectors.go @@ -19,6 +19,10 @@ type Connector struct { Config json.RawMessage `json:"config"` } +func (c *Connector) IdempotencyKey() string { + return c.ID.String() +} + func (c Connector) MarshalJSON() ([]byte, error) { return json.Marshal(&struct { ID string `json:"id"` diff --git a/components/payments/internal/models/payment_adjustments.go b/components/payments/internal/models/payment_adjustments.go index 2c4b741815..9e71877885 100644 --- a/components/payments/internal/models/payment_adjustments.go +++ b/components/payments/internal/models/payment_adjustments.go @@ -31,6 +31,10 @@ type PaymentAdjustment struct { Raw json.RawMessage `json:"raw"` } +func (p *PaymentAdjustment) IdempotemcyKey() string { + return p.ID.String() +} + func (c PaymentAdjustment) MarshalJSON() ([]byte, error) { return json.Marshal(&struct { ID string `json:"id"` diff --git a/components/payments/internal/models/pools.go b/components/payments/internal/models/pools.go index b5dabd97d1..ddc86b8037 100644 --- a/components/payments/internal/models/pools.go +++ b/components/payments/internal/models/pools.go @@ -1,8 +1,10 @@ package models import ( + "encoding/base64" "time" + "github.com/gibson042/canonicaljson-go" "github.com/google/uuid" ) @@ -13,3 +15,24 @@ type Pool struct { PoolAccounts []PoolAccounts `json:"poolAccounts"` } + +func (p *Pool) IdempotemcyKey() string { + relatedAccounts := make([]string, len(p.PoolAccounts)) + for i := range p.PoolAccounts { + relatedAccounts[i] = p.PoolAccounts[i].AccountID.String() + } + var ik = struct { + ID string + RelatedAccounts []string + }{ + ID: p.ID.String(), + RelatedAccounts: relatedAccounts, + } + + data, err := canonicaljson.Marshal(ik) + if err != nil { + panic(err) + } + + return base64.URLEncoding.WithPadding(base64.NoPadding).EncodeToString(data) +} diff --git a/libs/go-libs/publish/messages.go b/libs/go-libs/publish/messages.go index af1e7a3ff9..29f77ca535 100644 --- a/libs/go-libs/publish/messages.go +++ b/libs/go-libs/publish/messages.go @@ -38,11 +38,12 @@ func NewMessage(ctx context.Context, m EventMessage) *message.Message { } type EventMessage struct { - Date time.Time `json:"date"` - App string `json:"app"` - Version string `json:"version"` - Type string `json:"type"` - Payload any `json:"payload"` + IdempotemcyKey string `json:"idempotency_key"` + Date time.Time `json:"date"` + App string `json:"app"` + Version string `json:"version"` + Type string `json:"type"` + Payload any `json:"payload"` } func UnmarshalMessage(msg *message.Message) (trace.Span, *EventMessage, error) {