Skip to content

Commit

Permalink
fix(operator): fix topics name at creation
Browse files Browse the repository at this point in the history
  • Loading branch information
paul-nicolas committed Dec 5, 2023
1 parent 84ed29c commit efe5d0d
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,8 @@ func (f *StackFinalizer) retrieveModuleTopic() []string {
for _, v := range mod.Versions() {
services := v.Services(*f.reconcileConf)
for _, service := range services {
if service.NeedTopic {
subjectSet.Put(service.Name)
if service.Topics != nil {
subjectSet.Put(service.Topics.Name)
break
}
}
Expand Down
1 change: 1 addition & 0 deletions components/operator/internal/modules/gateway/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func (g module) Versions() map[string]modules.Version {
Path: "/",
},
Liveness: modules.LivenessDisable,
Topics: &modules.Topics{Name: "audit"},
Annotations: ctx.Configuration.Spec.Services.Gateway.Annotations.Service,
Configs: func(resolveContext modules.ServiceInstallConfiguration) modules.Configs {
return modules.Configs{
Expand Down
4 changes: 2 additions & 2 deletions components/operator/internal/modules/ledger/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (l module) Versions() map[string]modules.Version {
InjectPostgresVariables: true,
HasVersionEndpoint: true,
ExposeHTTP: modules.DefaultExposeHTTP,
NeedTopic: true,
Topics: &modules.Topics{Name: l.Name()},
Annotations: ctx.Configuration.Spec.Services.Ledger.Annotations.Service,
Container: func(resolveContext modules.ContainerResolutionConfiguration) modules.Container {
env := modules.NewEnv().Append(
Expand Down Expand Up @@ -117,7 +117,7 @@ func (l module) Versions() map[string]modules.Version {
InjectPostgresVariables: true,
HasVersionEndpoint: true,
ExposeHTTP: modules.DefaultExposeHTTP,
NeedTopic: true,
Topics: &modules.Topics{Name: l.Name()},
Container: createContainer(modules.NewEnv()),
}
ret := modules.Services{&main}
Expand Down
42 changes: 22 additions & 20 deletions components/operator/internal/modules/payments/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (p module) Versions() map[string]modules.Version {
HasVersionEndpoint: true,
ListenEnvVar: "LISTEN",
ExposeHTTP: modules.DefaultExposeHTTP,
NeedTopic: true,
Topics: &modules.Topics{Name: p.Name()},
Liveness: modules.LivenessLegacy,
Annotations: ctx.Configuration.Spec.Services.Payments.Annotations.Service,
Container: func(resolveContext modules.ContainerResolutionConfiguration) modules.Container {
Expand Down Expand Up @@ -81,7 +81,7 @@ func (p module) Versions() map[string]modules.Version {
},
PostUpgrade: PostUpgradePreV1,
Services: func(ctx modules.ReconciliationConfig) modules.Services {
return paymentsServices(paymentsEnvVars)
return paymentsServices(p.Name(), paymentsEnvVars)
},
},
"v0.6.7": {
Expand All @@ -95,7 +95,7 @@ func (p module) Versions() map[string]modules.Version {
},
},
Services: func(ctx modules.ReconciliationConfig) modules.Services {
return paymentsServices(paymentsEnvVars)
return paymentsServices(p.Name(), paymentsEnvVars)
},
},
"v0.6.8": {
Expand All @@ -109,7 +109,7 @@ func (p module) Versions() map[string]modules.Version {
},
},
Services: func(ctx modules.ReconciliationConfig) modules.Services {
return paymentsServices(paymentsEnvVars)
return paymentsServices(p.Name(), paymentsEnvVars)
},
},
"v0.7.0": {
Expand All @@ -123,7 +123,7 @@ func (p module) Versions() map[string]modules.Version {
},
PostUpgrade: PostUpgradePreV1,
Services: func(ctx modules.ReconciliationConfig) modules.Services {
return paymentsServices(paymentsEnvVars)
return paymentsServices(p.Name(), paymentsEnvVars)
},
},
"v0.8.0": {
Expand All @@ -137,7 +137,7 @@ func (p module) Versions() map[string]modules.Version {
},
PostUpgrade: PostUpgradePreV1,
Services: func(ctx modules.ReconciliationConfig) modules.Services {
return paymentsServices(paymentsEnvVars)
return paymentsServices(p.Name(), paymentsEnvVars)
},
},
"v0.8.1": {
Expand All @@ -151,7 +151,7 @@ func (p module) Versions() map[string]modules.Version {
},
PostUpgrade: PostUpgradePreV1,
Services: func(ctx modules.ReconciliationConfig) modules.Services {
return paymentsServices(paymentsEnvVars)
return paymentsServices(p.Name(), paymentsEnvVars)
},
},
"v0.9.0": {
Expand All @@ -164,7 +164,7 @@ func (p module) Versions() map[string]modules.Version {
},
},
Services: func(ctx modules.ReconciliationConfig) modules.Services {
return paymentsServices(paymentsEnvVars)
return paymentsServices(p.Name(), paymentsEnvVars)
},
},
"v0.9.1": {
Expand All @@ -177,7 +177,7 @@ func (p module) Versions() map[string]modules.Version {
},
},
Services: func(ctx modules.ReconciliationConfig) modules.Services {
return paymentsServices(paymentsEnvVars)
return paymentsServices(p.Name(), paymentsEnvVars)
},
},
"v0.9.4": {
Expand All @@ -190,7 +190,7 @@ func (p module) Versions() map[string]modules.Version {
},
},
Services: func(ctx modules.ReconciliationConfig) modules.Services {
return paymentsServices(paymentsEnvVars)
return paymentsServices(p.Name(), paymentsEnvVars)
},
},
"v0.10.0": {
Expand All @@ -203,7 +203,7 @@ func (p module) Versions() map[string]modules.Version {
},
},
Services: func(ctx modules.ReconciliationConfig) modules.Services {
return paymentsServices(paymentsEnvVars)
return paymentsServices(p.Name(), paymentsEnvVars)
},
},
"v1.0.0-alpha.1": {
Expand All @@ -216,7 +216,7 @@ func (p module) Versions() map[string]modules.Version {
},
},
Services: func(ctx modules.ReconciliationConfig) modules.Services {
return paymentsServices(paymentsEnvVars)
return paymentsServices(p.Name(), paymentsEnvVars)
},
},
"v1.0.0-alpha.3": {
Expand All @@ -229,7 +229,7 @@ func (p module) Versions() map[string]modules.Version {
},
},
Services: func(ctx modules.ReconciliationConfig) modules.Services {
return paymentsServices(paymentsEnvVars)
return paymentsServices(p.Name(), paymentsEnvVars)
},
},
"v1.0.0-alpha.6": {
Expand All @@ -242,7 +242,7 @@ func (p module) Versions() map[string]modules.Version {
},
},
Services: func(ctx modules.ReconciliationConfig) modules.Services {
return paymentsServicesSplitted(paymentsEnvVars)
return paymentsServicesSplitted(p.Name(), paymentsEnvVars)
},
},
"v1.0.0-beta.3": {
Expand All @@ -266,7 +266,7 @@ func (p module) Versions() map[string]modules.Version {
},
},
Services: func(ctx modules.ReconciliationConfig) modules.Services {
return paymentsServicesSplitted(paymentsEnvVars)
return paymentsServicesSplitted(p.Name(), paymentsEnvVars)
},
},
}
Expand All @@ -291,14 +291,15 @@ func paymentsEnvVars(resolveContext modules.ContainerResolutionConfiguration) mo
}

func paymentsServices(
serviceName string,
env func(resolveContext modules.ContainerResolutionConfiguration) modules.ContainerEnv,
) modules.Services {
return modules.Services{{
InjectPostgresVariables: true,
HasVersionEndpoint: true,
ListenEnvVar: "LISTEN",
ExposeHTTP: modules.DefaultExposeHTTP,
NeedTopic: true,
Topics: &modules.Topics{Name: serviceName},
Liveness: modules.LivenessLegacy,
Container: func(resolveContext modules.ContainerResolutionConfiguration) modules.Container {
return modules.Container{
Expand All @@ -314,6 +315,7 @@ func paymentsServices(
}

func paymentsServicesSplitted(
serviceName string,
env func(resolveContext modules.ContainerResolutionConfiguration) modules.ContainerEnv,
) modules.Services {
return modules.Services{
Expand Down Expand Up @@ -349,8 +351,8 @@ func paymentsServicesSplitted(
Name: "transfer-initiations-read",
},
},
NeedTopic: true,
Liveness: modules.LivenessLegacy,
Topics: &modules.Topics{Name: serviceName},
Liveness: modules.LivenessLegacy,
Container: func(resolveContext modules.ContainerResolutionConfiguration) modules.Container {
return modules.Container{
Args: []string{"api", "serve"},
Expand Down Expand Up @@ -389,8 +391,8 @@ func paymentsServicesSplitted(
Name: "bank-accounts-write",
},
},
NeedTopic: true,
Liveness: modules.LivenessLegacy,
Topics: &modules.Topics{Name: serviceName},
Liveness: modules.LivenessLegacy,
Container: func(resolveContext modules.ContainerResolutionConfiguration) modules.Container {
return modules.Container{
Args: []string{"connectors", "serve"},
Expand Down
8 changes: 6 additions & 2 deletions components/operator/internal/modules/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,10 @@ type ExposeHTTP struct {
Name string
}

type Topics struct {
Name string
}

type Path struct {
Path string
Methods []string
Expand Down Expand Up @@ -327,7 +331,7 @@ type Service struct {
Secrets func(resolveContext ServiceInstallConfiguration) Secrets
Container func(resolveContext ContainerResolutionConfiguration) Container
InitContainer func(resolveContext ContainerResolutionConfiguration) []Container
NeedTopic bool
Topics *Topics
Replicas *int32

EnvPrefix string
Expand Down Expand Up @@ -366,7 +370,7 @@ func (r *serviceReconciler) prepare() {
func (r *serviceReconciler) reconcile(ctx context.Context, config ServiceInstallConfiguration) error {

// TODO: Use a job
if r.Configuration.Spec.Broker.Nats != nil && r.service.NeedTopic {
if r.Configuration.Spec.Broker.Nats != nil && r.service.Topics != nil {
r.configureNats()
}

Expand Down

0 comments on commit efe5d0d

Please sign in to comment.