Skip to content

Commit

Permalink
feat: adds support for TLS configuration options in exotic jetstream …
Browse files Browse the repository at this point in the history
…EventBus settings (#3381)

Signed-off-by: fullykubed <[email protected]>
  • Loading branch information
fullykubed authored Dec 15, 2024
1 parent 393e133 commit 8513b4a
Show file tree
Hide file tree
Showing 21 changed files with 1,267 additions and 836 deletions.
1 change: 1 addition & 0 deletions USERS.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ Organizations below are **officially** using Argo Events. Please send a PR with
1. [MariaDB](https://mariadb.com/)
1. [Mobimeo GmbH](https://mobimeo.com/en/home/)
1. [OneCause](https://www.onecause.com/)
1. [Panfactum](https://panfactum.com/)
1. [Pinnacle Reliability](https://pinnaclereliability.com/)
1. [PDOK](https://pdok.nl)
1. [Phrase](https://www.phrase.com/)
Expand Down
4 changes: 4 additions & 0 deletions api/jsonschema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -2355,6 +2355,10 @@
"streamConfig": {
"type": "string"
},
"tls": {
"$ref": "#/definitions/io.argoproj.events.v1alpha1.TLSConfig",
"description": "SSL/TLS settings for the NATS client"
},
"url": {
"description": "JetStream (Nats) URL",
"type": "string"
Expand Down
4 changes: 4 additions & 0 deletions api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions docs/APIs.md
Original file line number Diff line number Diff line change
Expand Up @@ -11332,6 +11332,26 @@ Secret for auth

</tr>

<tr>

<td>

<code>tls</code></br> <em> <a href="#argoproj.io/v1alpha1.TLSConfig">
TLSConfig </a> </em>
</td>

<td>

<em>(Optional)</em>
<p>

SSL/TLS settings for the NATS client
</p>

</td>

</tr>

</tbody>

</table>
Expand Down Expand Up @@ -19815,6 +19835,7 @@ TLSConfig
<a href="#argoproj.io/v1alpha1.BitbucketServerEventSource">BitbucketServerEventSource</a>,
<a href="#argoproj.io/v1alpha1.EmitterEventSource">EmitterEventSource</a>,
<a href="#argoproj.io/v1alpha1.HTTPTrigger">HTTPTrigger</a>,
<a href="#argoproj.io/v1alpha1.JetStreamConfig">JetStreamConfig</a>,
<a href="#argoproj.io/v1alpha1.KafkaBus">KafkaBus</a>,
<a href="#argoproj.io/v1alpha1.KafkaEventSource">KafkaEventSource</a>,
<a href="#argoproj.io/v1alpha1.KafkaTrigger">KafkaTrigger</a>,
Expand Down
1,676 changes: 865 additions & 811 deletions pkg/apis/events/v1alpha1/generated.pb.go

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions pkg/apis/events/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions pkg/apis/events/v1alpha1/jetstream_eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,7 @@ type JetStreamConfig struct {
AccessSecret *corev1.SecretKeySelector `json:"accessSecret,omitempty" protobuf:"bytes,2,opt,name=accessSecret"`
// +optional
StreamConfig string `json:"streamConfig,omitempty" protobuf:"bytes,3,opt,name=streamConfig"`
// SSL/TLS settings for the NATS client
// +optional
TLS *TLSConfig `json:"tls,omitempty" protobuf:"bytes,4,opt,name=tls"`
}
8 changes: 7 additions & 1 deletion pkg/apis/events/v1alpha1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions pkg/apis/events/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pkg/eventbus/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func GetEventSourceDriver(ctx context.Context, eventBusConfig v1alpha1.BusConfig
}
dvr = stansource.NewSourceSTAN(eventBusConfig.NATS.URL, *eventBusConfig.NATS.ClusterID, eventSourceName, defaultSubject, auth, logger)
case v1alpha1.EventBusJetStream:
dvr, err = jetstreamsource.NewSourceJetstream(eventBusConfig.JetStream.URL, eventSourceName, eventBusConfig.JetStream.StreamConfig, auth, logger) // don't need to pass in subject because subjects will be derived from dependencies
dvr, err = jetstreamsource.NewSourceJetstream(eventBusConfig.JetStream.URL, eventSourceName, eventBusConfig.JetStream.StreamConfig, auth, logger, eventBusConfig.JetStream.TLS) // don't need to pass in subject because subjects will be derived from dependencies
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -96,7 +96,7 @@ func GetSensorDriver(ctx context.Context, eventBusConfig v1alpha1.BusConfig, sen
dvr = stansensor.NewSensorSTAN(eventBusConfig.NATS.URL, *eventBusConfig.NATS.ClusterID, sensorSpec.Name, auth, logger)
return dvr, nil
case v1alpha1.EventBusJetStream:
dvr, err = jetstreamsensor.NewSensorJetstream(eventBusConfig.JetStream.URL, sensorSpec, eventBusConfig.JetStream.StreamConfig, auth, logger) // don't need to pass in subject because subjects will be derived from dependencies
dvr, err = jetstreamsensor.NewSensorJetstream(eventBusConfig.JetStream.URL, sensorSpec, eventBusConfig.JetStream.StreamConfig, auth, logger, eventBusConfig.JetStream.TLS) // don't need to pass in subject because subjects will be derived from dependencies
return dvr, err
case v1alpha1.EventBusKafka:
dvr = kafkasensor.NewKafkaSensor(eventBusConfig.Kafka, sensorSpec, hostname, logger)
Expand Down
18 changes: 15 additions & 3 deletions pkg/eventbus/jetstream/base/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
type Jetstream struct {
url string
auth *eventbuscommon.Auth
tls *v1alpha1.TLSConfig

MgmtConnection JetstreamConnection

Expand All @@ -24,10 +25,11 @@ type Jetstream struct {
Logger *zap.SugaredLogger
}

func NewJetstream(url string, streamSettings string, auth *eventbuscommon.Auth, logger *zap.SugaredLogger) (*Jetstream, error) {
func NewJetstream(url string, streamSettings string, auth *eventbuscommon.Auth, logger *zap.SugaredLogger, tls *v1alpha1.TLSConfig) (*Jetstream, error) {
js := &Jetstream{
url: url,
auth: auth,
tls: tls,
Logger: logger,
streamSettings: streamSettings,
}
Expand Down Expand Up @@ -67,9 +69,19 @@ func (stream *Jetstream) MakeConnection() (*JetstreamConnection, error) {
conn.NATSConnected = true
log.Info("Reconnected to NATS server")
}),
nats.Secure(&tls.Config{
}

if stream.tls != nil {
tlsConfig, err := sharedutil.GetTLSConfig(stream.tls)
if err != nil {
return nil, err
}
opts = append(opts, nats.Secure(tlsConfig))
log.Info("Client-side TLS configuration enabled on the NATS connection")
} else {
opts = append(opts, nats.Secure(&tls.Config{
InsecureSkipVerify: true,
}),
}))
}

switch stream.auth.Strategy {
Expand Down
5 changes: 3 additions & 2 deletions pkg/eventbus/jetstream/eventsource/source_jetstream.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package eventsource

import (
"github.com/argoproj/argo-events/pkg/apis/events/v1alpha1"
eventbuscommon "github.com/argoproj/argo-events/pkg/eventbus/common"
jetstreambase "github.com/argoproj/argo-events/pkg/eventbus/jetstream/base"
"go.uber.org/zap"
Expand All @@ -11,8 +12,8 @@ type SourceJetstream struct {
eventSourceName string
}

func NewSourceJetstream(url, eventSourceName string, streamConfig string, auth *eventbuscommon.Auth, logger *zap.SugaredLogger) (*SourceJetstream, error) {
baseJetstream, err := jetstreambase.NewJetstream(url, streamConfig, auth, logger)
func NewSourceJetstream(url, eventSourceName string, streamConfig string, auth *eventbuscommon.Auth, logger *zap.SugaredLogger, tls *v1alpha1.TLSConfig) (*SourceJetstream, error) {
baseJetstream, err := jetstreambase.NewJetstream(url, streamConfig, auth, logger, tls)
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/eventbus/jetstream/eventsource/source_jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestNewSourceJetstream(t *testing.T) {
logger := zap.NewExample().Sugar()

auth := &common.Auth{}
sourceJetstream, err := NewSourceJetstream(testURL, testEventSource, testStreamConfig, auth, logger)
sourceJetstream, err := NewSourceJetstream(testURL, testEventSource, testStreamConfig, auth, logger, nil)
assert.NotNil(t, sourceJetstream)
assert.Nil(t, err)
}
Expand All @@ -29,7 +29,7 @@ func TestSourceJetstream_Connect(t *testing.T) {
logger := zap.NewExample().Sugar()

auth := &common.Auth{}
sourceJetstream, err := NewSourceJetstream(testURL, testEventSource, testStreamConfig, auth, logger)
sourceJetstream, err := NewSourceJetstream(testURL, testEventSource, testStreamConfig, auth, logger, nil)
assert.NotNil(t, sourceJetstream)
assert.Nil(t, err)

Expand All @@ -44,7 +44,7 @@ func TestSourceJetstream_Initialize_Failure(t *testing.T) {
auth := &common.Auth{
Strategy: v1alpha1.AuthStrategyNone,
}
sourceJetstream, err := NewSourceJetstream(testURL, testEventSource, testStreamConfig, auth, logger)
sourceJetstream, err := NewSourceJetstream(testURL, testEventSource, testStreamConfig, auth, logger, nil)
assert.NotNil(t, sourceJetstream)
assert.Nil(t, err)

Expand Down
4 changes: 2 additions & 2 deletions pkg/eventbus/jetstream/sensor/sensor_jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ type SensorJetstream struct {
keyValueStore nats.KeyValue
}

func NewSensorJetstream(url string, sensorSpec *v1alpha1.Sensor, streamConfig string, auth *eventbuscommon.Auth, logger *zap.SugaredLogger) (*SensorJetstream, error) {
func NewSensorJetstream(url string, sensorSpec *v1alpha1.Sensor, streamConfig string, auth *eventbuscommon.Auth, logger *zap.SugaredLogger, tls *v1alpha1.TLSConfig) (*SensorJetstream, error) {
if sensorSpec == nil {
errStr := SensorNilError
logger.Errorf(errStr)
return nil, fmt.Errorf(errStr)
}

baseJetstream, err := eventbusjetstreambase.NewJetstream(url, streamConfig, auth, logger)
baseJetstream, err := eventbusjetstreambase.NewJetstream(url, streamConfig, auth, logger, tls)
if err != nil {
return nil, err
}
Expand Down
61 changes: 56 additions & 5 deletions pkg/reconciler/eventsource/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ import (
"fmt"
"sort"

"github.com/argoproj/argo-events/pkg/apis/events/v1alpha1"
aev1 "github.com/argoproj/argo-events/pkg/apis/events/v1alpha1"
controllerscommon "github.com/argoproj/argo-events/pkg/reconciler/common"
sharedutil "github.com/argoproj/argo-events/pkg/shared/util"
"github.com/imdario/mergo"
"go.uber.org/zap"
appv1 "k8s.io/api/apps/v1"
Expand All @@ -17,11 +21,6 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/argoproj/argo-events/pkg/apis/events/v1alpha1"
aev1 "github.com/argoproj/argo-events/pkg/apis/events/v1alpha1"
controllerscommon "github.com/argoproj/argo-events/pkg/reconciler/common"
sharedutil "github.com/argoproj/argo-events/pkg/shared/util"
)

// AdaptorArgs are the args needed to create a sensor deployment
Expand Down Expand Up @@ -222,14 +221,29 @@ func buildDeployment(args *AdaptorArgs, eventBus *aev1.EventBus) (*appv1.Deploym

var secretObjs []interface{}
var accessSecret *corev1.SecretKeySelector
var caCertSecret *corev1.SecretKeySelector
var clientCertSecret *corev1.SecretKeySelector
var clientKeySecret *corev1.SecretKeySelector
switch {
case eventBus.Status.Config.NATS != nil:
caCertSecret = nil
clientCertSecret = nil
clientKeySecret = nil
accessSecret = eventBus.Status.Config.NATS.AccessSecret
secretObjs = []interface{}{eventSourceCopy}
case eventBus.Status.Config.JetStream != nil:
tlsOptions := eventBus.Status.Config.JetStream.TLS
if tlsOptions != nil {
caCertSecret = tlsOptions.CACertSecret
clientCertSecret = tlsOptions.ClientCertSecret
clientKeySecret = tlsOptions.ClientKeySecret
}
accessSecret = eventBus.Status.Config.JetStream.AccessSecret
secretObjs = []interface{}{eventSourceCopy}
case eventBus.Status.Config.Kafka != nil:
caCertSecret = nil
clientCertSecret = nil
clientKeySecret = nil
accessSecret = nil
secretObjs = []interface{}{eventSourceCopy, eventBus} // kafka requires secrets for sasl and tls
default:
Expand Down Expand Up @@ -259,6 +273,43 @@ func buildDeployment(args *AdaptorArgs, eventBus *aev1.EventBus) (*appv1.Deploym
})
}

uniqueCertVolumeMap := make(map[string][]corev1.KeyToPath)
for _, secret := range []*corev1.SecretKeySelector{caCertSecret, clientCertSecret, clientKeySecret} {
if secret != nil {
uniqueCertVolumeMap[secret.Name] = append(uniqueCertVolumeMap[secret.Name], corev1.KeyToPath{
Key: secret.Key,
Path: secret.Key,
})
}
}

// We deduplicate the certificate secret mounts to ensure every secret under the TLS config is only mounted once
// because the secrets MUST be mounted at /argo-events/secrets/<secret-name>
// in order for util.GetTLSConfig to work without modification
for secretName, items := range uniqueCertVolumeMap {
// The names of volumes MUST be valid DNS_LABELs; as the secret names are user-supplied,
// we perform some input cleansing to ensure they conform
volumeName := sharedutil.ConvertToDNSLabel(secretName)

optional := false

volumes = append(volumes, corev1.Volume{
Name: volumeName,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: secretName,
Items: items,
Optional: &optional,
},
},
})

volumeMounts = append(volumeMounts, corev1.VolumeMount{
Name: volumeName,
MountPath: fmt.Sprintf("/argo-events/secrets/%s", secretName),
})
}

// secrets
volSecrets, volSecretMounts := sharedutil.VolumesFromSecretsOrConfigMaps(v1alpha1.SecretKeySelectorType, secretObjs...)
volumes = append(volumes, volSecrets...)
Expand Down
Loading

0 comments on commit 8513b4a

Please sign in to comment.