Skip to content

Commit

Permalink
feat(flows): add capability to add metadata on send stage (#1072)
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag authored Dec 21, 2023
1 parent 25bdee7 commit 7ef70e1
Show file tree
Hide file tree
Showing 15 changed files with 269 additions and 97 deletions.
15 changes: 15 additions & 0 deletions components/ledger/libs/collectionutils/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,21 @@ func ConvertMap[K comparable, FROM any, TO any](m map[K]FROM, mapper func(v FROM
return ret
}

func MergeMaps[K comparable, V any](m1, m2 map[K]V) map[K]V {
ret := make(map[K]V)
if m1 != nil {
for k, v := range m1 {
ret[k] = v
}
}
if m2 != nil {
for k, v := range m2 {
ret[k] = v
}
}
return ret
}

func ToAny[V any](v V) any {
return v
}
Expand Down
50 changes: 34 additions & 16 deletions ee/orchestration/internal/schema/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,26 @@ func interpolate(ctx Context, v string) string {
})
}

func mapObjectField(ctx Context, raw any, spec reflect.Value, tag tag) error {
func mapObjectField(ctx Context, raw any, spec reflect.Value, fieldTag tag) error {
switch spec.Kind() {
case reflect.Map:
if raw == nil {
return nil
}
vRaw := reflect.ValueOf(raw)
if vRaw.Kind() != reflect.Map {
return fmt.Errorf("expecting map, got type: %T", raw)
}

spec.Set(reflect.MakeMap(spec.Type()))
for _, key := range vRaw.MapKeys() {
value := vRaw.MapIndex(key)
targetValue := reflect.New(spec.Type().Elem()).Elem()
if err := mapObjectField(ctx, value.Interface(), targetValue, tag{}); err != nil {
return err
}
spec.SetMapIndex(key, targetValue)
}
case reflect.Pointer:
if raw == nil {
return nil
Expand All @@ -46,7 +64,7 @@ func mapObjectField(ctx Context, raw any, spec reflect.Value, tag tag) error {
case string:
interpolated := interpolate(ctx, json)
if interpolated == "" {
interpolated = tag.defaultValue
interpolated = fieldTag.defaultValue
if interpolated == "" {
return nil
}
Expand All @@ -59,7 +77,7 @@ func mapObjectField(ctx Context, raw any, spec reflect.Value, tag tag) error {
case float64:
spec.Set(reflect.ValueOf(big.NewInt(int64(json))))
case nil:
defaultValue := tag.defaultValue
defaultValue := fieldTag.defaultValue
if defaultValue == "" {
return nil
}
Expand All @@ -74,17 +92,17 @@ func mapObjectField(ctx Context, raw any, spec reflect.Value, tag tag) error {
return nil
}
spec.Set(reflect.New(spec.Type().Elem()))
return mapObjectField(ctx, raw, spec.Elem(), tag)
return mapObjectField(ctx, raw, spec.Elem(), fieldTag)
case reflect.String:
switch json := raw.(type) {
case string:
interpolated := interpolate(ctx, json)
if interpolated == "" {
interpolated = tag.defaultValue
interpolated = fieldTag.defaultValue
}
spec.SetString(interpolated)
case nil:
spec.SetString(tag.defaultValue)
spec.SetString(fieldTag.defaultValue)
default:
return fmt.Errorf("expected string but was %T", json)
}
Expand All @@ -94,7 +112,7 @@ func mapObjectField(ctx Context, raw any, spec reflect.Value, tag tag) error {
case string:
interpolated := interpolate(ctx, json)
if interpolated == "" {
interpolated = tag.defaultValue
interpolated = fieldTag.defaultValue
if interpolated == "" {
return nil
}
Expand All @@ -107,7 +125,7 @@ func mapObjectField(ctx Context, raw any, spec reflect.Value, tag tag) error {
case float64:
spec.SetUint(uint64(json))
case nil:
defaultValue := tag.defaultValue
defaultValue := fieldTag.defaultValue
if defaultValue == "" {
return nil
}
Expand All @@ -125,7 +143,7 @@ func mapObjectField(ctx Context, raw any, spec reflect.Value, tag tag) error {
case string:
interpolated := interpolate(ctx, json)
if interpolated == "" {
interpolated = tag.defaultValue
interpolated = fieldTag.defaultValue
if interpolated == "" {
return nil
}
Expand All @@ -136,7 +154,7 @@ func mapObjectField(ctx Context, raw any, spec reflect.Value, tag tag) error {
}
spec.SetInt(int64(duration))
case nil:
defaultValue := tag.defaultValue
defaultValue := fieldTag.defaultValue
if defaultValue == "" {
return nil
}
Expand All @@ -154,7 +172,7 @@ func mapObjectField(ctx Context, raw any, spec reflect.Value, tag tag) error {
case string:
interpolated := interpolate(ctx, json)
if interpolated == "" {
interpolated = tag.defaultValue
interpolated = fieldTag.defaultValue
if interpolated == "" {
return nil
}
Expand All @@ -167,7 +185,7 @@ func mapObjectField(ctx Context, raw any, spec reflect.Value, tag tag) error {
case float64:
spec.SetInt(int64(json))
case nil:
defaultValue := tag.defaultValue
defaultValue := fieldTag.defaultValue
if defaultValue == "" {
return nil
}
Expand All @@ -190,7 +208,7 @@ func mapObjectField(ctx Context, raw any, spec reflect.Value, tag tag) error {
case bool:
spec.SetBool(json)
case nil:
defaultValue := strings.ToLower(tag.defaultValue)
defaultValue := strings.ToLower(fieldTag.defaultValue)
if defaultValue != "true" && defaultValue != "false" {
return fmt.Errorf("unable to resolve field '%s' to bool value", spec.Type().Name())
}
Expand All @@ -210,7 +228,7 @@ func mapObjectField(ctx Context, raw any, spec reflect.Value, tag tag) error {
case float64:
spec.SetFloat(json)
case nil:
defaultValue := tag.defaultValue
defaultValue := fieldTag.defaultValue
if defaultValue == "" {
return nil
}
Expand All @@ -228,15 +246,15 @@ func mapObjectField(ctx Context, raw any, spec reflect.Value, tag tag) error {
case string:
interpolated := interpolate(ctx, json)
if interpolated == "" {
interpolated = tag.defaultValue
interpolated = fieldTag.defaultValue
}
date, err := time.Parse(time.RFC3339, interpolated)
if err != nil {
return fmt.Errorf("expected date as rfc3339 format")
}
spec.Set(reflect.ValueOf(date))
case nil:
date, err := time.Parse(time.RFC3339, tag.defaultValue)
date, err := time.Parse(time.RFC3339, fieldTag.defaultValue)
if err != nil {
return fmt.Errorf("expected date as rfc3339 format")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,27 @@ package activities
import (
"context"
"fmt"
"math/big"
"net/http"

"github.com/formancehq/formance-sdk-go/pkg/models/shared"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/workflow"
)

func (a Activities) StripeTransfer(ctx context.Context, request shared.ActivityStripeTransfer) error {
type StripeTransferRequest struct {
Amount *big.Int `json:"amount,omitempty"`
Asset *string `json:"asset,omitempty"`
ConnectorID *string `json:"connectorID,omitempty"`
Destination *string `json:"destination,omitempty"`
// A set of key/value pairs that you can attach to a transfer object.
// It can be useful for storing additional information about the transfer in a structured format.
//
Metadata map[string]string `json:"metadata"`
WaitingValidation *bool `default:"false" json:"waitingValidation"`
}

func (a Activities) StripeTransfer(ctx context.Context, request StripeTransferRequest) error {
validated := true
if request.WaitingValidation != nil && *request.WaitingValidation {
validated = false
Expand Down Expand Up @@ -49,6 +62,6 @@ func (a Activities) StripeTransfer(ctx context.Context, request shared.ActivityS

var StripeTransferActivity = Activities{}.StripeTransfer

func StripeTransfer(ctx workflow.Context, request shared.ActivityStripeTransfer) error {
func StripeTransfer(ctx workflow.Context, request StripeTransferRequest) error {
return executeActivity(ctx, StripeTransferActivity, nil, request)
}
Loading

0 comments on commit 7ef70e1

Please sign in to comment.