Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core feature] Add Kafka support for workflow notifications #4323

Open
2 tasks done
devictr opened this issue Oct 30, 2023 · 7 comments
Open
2 tasks done

[Core feature] Add Kafka support for workflow notifications #4323

devictr opened this issue Oct 30, 2023 · 7 comments
Assignees
Labels
azure Used to group issues related to support for Azure backlogged For internal use. Reserved for contributor team workflow. enhancement New feature or request

Comments

@devictr
Copy link

devictr commented Oct 30, 2023

Motivation: Why do you think this is important?

Notifications only support AWS and GCP for the moment. My company is on Azure and would benefit from having notifications as well.
It seems like Flyte uses gizmo for this, which only supports AWS, GCP, Kafka and HTTP.

Adding Kafka support would benefit Azure deployments, but also on-premise or other deployments that already have Kafka setups internally.

Goal: What should the final outcome look like, ideally?

I want to be able to configure Kafka for workflow notifications.

Describe alternatives you've considered

Alternatives: HTTP support from gizmo. This feature doesn't exclude HTTP support from being added in the future.

Propose: Link/Inline OR Additional context

Code:

switch config.Type {
case common.AWS:
sqsConfig := gizmoAWS.SQSConfig{
QueueName: config.NotificationsProcessorConfig.QueueName,
QueueOwnerAccountID: config.NotificationsProcessorConfig.AccountID,
// The AWS configuration type uses SNS to SQS for notifications.
// Gizmo by default will decode the SQS message using Base64 decoding.
// However, the message body of SQS is the SNS message format which isn't Base64 encoded.
ConsumeBase64: &enable64decoding,
}
if config.AWSConfig.Region != "" {
sqsConfig.Region = config.AWSConfig.Region
} else {
sqsConfig.Region = config.Region
}
var err error
err = async.Retry(reconnectAttempts, reconnectDelay, func() error {
sub, err = gizmoAWS.NewSubscriber(sqsConfig)
if err != nil {
logger.Warnf(context.TODO(), "Failed to initialize new gizmo aws subscriber with config [%+v] and err: %v", sqsConfig, err)
}
return err
})
if err != nil {
panic(err)
}
emailer = GetEmailer(config, scope)
return implementations.NewProcessor(sub, emailer, scope)
case common.GCP:
projectID := config.GCPConfig.ProjectID
subscription := config.NotificationsProcessorConfig.QueueName
var err error
err = async.Retry(reconnectAttempts, reconnectDelay, func() error {
sub, err = gizmoGCP.NewSubscriber(context.TODO(), projectID, subscription)
if err != nil {
logger.Warnf(context.TODO(), "Failed to initialize new gizmo gcp subscriber with config [ProjectID: %s, Subscription: %s] and err: %v", projectID, subscription, err)
}
return err
})
if err != nil {
panic(err)
}
emailer = GetEmailer(config, scope)
return implementations.NewGcpProcessor(sub, emailer, scope)
case common.Sandbox:
emailer = GetEmailer(config, scope)
return implementations.NewSandboxProcessor(msgChan, emailer)
case common.Local:
fallthrough
default:
logger.Infof(context.Background(),
"Using default noop notifications processor implementation for config type [%s]", config.Type)

Are you sure this issue hasn't been raised already?

  • Yes

Have you read the Code of Conduct?

  • Yes
@devictr devictr added enhancement New feature or request untriaged This issues has not yet been looked at by the Maintainers labels Oct 30, 2023
@Future-Outlier
Copy link
Member

I added the sandbox email publisher 3 months ago.
If you need help, feel free to collaborate with me.
PR Link: flyteorg/flyteadmin#595

@devictr
Copy link
Author

devictr commented Oct 30, 2023

Thanks @Future-Outlier!

@devictr
Copy link
Author

devictr commented Oct 30, 2023

@pingsutw, any preference as to where to store Kafka offsets? Since this is flyteadmin, maybe I can create a table in postgres for that?

@devictr
Copy link
Author

devictr commented Oct 31, 2023

Actually, seems like we use Sarama behind the scenes, which supports auto commit and is enabled by default: https://github.com/IBM/sarama/blob/main/config.go#L427

@eapolinario eapolinario removed the untriaged This issues has not yet been looked at by the Maintainers label Nov 2, 2023
@eapolinario eapolinario added the backlogged For internal use. Reserved for contributor team workflow. label Nov 2, 2023
@eapolinario eapolinario added the azure Used to group issues related to support for Azure label Aug 19, 2024
@davidmirror-ops
Copy link
Contributor

with gizmo on maintenance mode do we still consider following their http route?

@eapolinario
Copy link
Contributor

We talked offline about this and the conclusion is that we're not married to gizmo. We'd welcome a PR that brings in another dependency to implement Azure workflow notifications.

@Sovietaced
Copy link
Contributor

We have an implementation of this internally that we could potentially upstream.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
azure Used to group issues related to support for Azure backlogged For internal use. Reserved for contributor team workflow. enhancement New feature or request
Projects
Status: Assigned
Development

No branches or pull requests

6 participants