Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ certs
.DS_Store

.claude
.config
34 changes: 25 additions & 9 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -158,20 +158,31 @@ manifests: controller-gen ## Generate WebhookConfiguration, ClusterRole and Cust
generate: controller-gen ## Generate code containing DeepCopy, DeepCopyInto, and DeepCopyObject method implementations.
GOWORK=off GO111MODULE=on $(CONTROLLER_GEN) object:headerFile="hack/boilerplate.go.txt" paths=./api/... paths=./internal/... paths=./cmd/...

# source secret.env && make start-sample-workflow TEMPORAL_CLOUD_API_KEY=$TEMPORAL_CLOUD_API_KEY
.PHONY: start-sample-workflow
.SILENT: start-sample-workflow
start-sample-workflow: ## Start a sample workflow.
@$(TEMPORAL) workflow start --type "HelloWorld" --task-queue "default/helloworld" \
--tls-cert-path certs/client.pem \
--tls-key-path certs/client.key \
--address "worker-controller-test.a2dd6.tmprl.cloud:7233" \
-n "worker-controller-test.a2dd6"
# --address replay-2025.ktasd.tmprl.cloud:7233 \
# --api-key $(TEMPORAL_CLOUD_API_KEY)
@set -e; \
# Load env vars from skaffold.env if present so address/namespace aren't hardcoded
if [ -f skaffold.env ]; then set -a; . skaffold.env; set +a; fi; \
if [ -n "$$TEMPORAL_API_KEY" ]; then \
$(TEMPORAL) workflow start --type "HelloWorld" --task-queue "default/helloworld" \
--address "$$TEMPORAL_ADDRESS" \
-n "$$TEMPORAL_NAMESPACE"; \
else \
$(TEMPORAL) workflow start --type "HelloWorld" --task-queue "default/helloworld" \
--tls-cert-path certs/client.pem \
--tls-key-path certs/client.key \
--address "$$TEMPORAL_ADDRESS" \
-n "$$TEMPORAL_NAMESPACE"; \
fi

.PHONY: apply-load-sample-workflow
.SILENT: apply-load-sample-workflow
apply-load-sample-workflow: ## Start a sample workflow every 15 seconds
watch --interval 0.1 -- $(TEMPORAL) workflow start --type "HelloWorld" --task-queue "default/helloworld"
@while true; do \
$(MAKE) -s start-sample-workflow; \
sleep 15; \
done

.PHONY: list-workflow-build-ids
list-workflow-build-ids: ## List workflow executions and their build IDs.
Expand Down Expand Up @@ -300,6 +311,11 @@ create-cloud-mtls-secret:
--cert=certs/client.pem \
--key=certs/client.key

.PHONY: create-api-key-secret
create-api-key-secret:
kubectl create secret generic temporal-api-key --namespace default \
--from-file=api-key=certs/api-key.txt

##### Checks #####
goimports: fmt-imports $(GOIMPORTS)
@printf $(COLOR) "Run goimports for all files..."
Expand Down
5 changes: 5 additions & 0 deletions api/v1alpha1/temporalconnection_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ type TemporalConnectionSpec struct {
// https://kubernetes.io/docs/concepts/configuration/secret/#tls-secrets
// +optional
MutualTLSSecretRef *SecretReference `json:"mutualTLSSecretRef,omitempty"`

// APIKeyRef is the name of the Secret that contains the API key. The secret must be `type: kubernetes.io/opaque` and exist
// in the same Kubernetes namespace as the TemporalConnection resource.
// +optional
APIKeyRef *SecretReference `json:"apiKeyRef,omitempty"`
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there anything else we need to communicate about the expected format of the secret? Like that the API Key should be stored under the key "api-key".

	clientOpts.Credentials = sdkclient.NewAPIKeyDynamicCredentials(func(ctx context.Context) (string, error) {
		return string(secret.Data["api-key"]), nil
	})

}

// TemporalConnectionStatus defines the observed state of TemporalConnection
Expand Down
5 changes: 5 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ spec:
type: object
spec:
properties:
apiKeyRef:
properties:
name:
pattern: ^[a-z0-9]([-a-z0-9]*[a-z0-9])?$
type: string
required:
- name
type: object
hostPort:
pattern: ^[a-zA-Z0-9.-]+:[0-9]+$
type: string
Expand Down
221 changes: 165 additions & 56 deletions internal/controller/clientpool/clientpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,35 @@
runtimeclient "sigs.k8s.io/controller-runtime/pkg/client"
)

type AuthMode string

const (
AuthModeTLS AuthMode = "TLS"
AuthModeAPIKey AuthMode = "API_KEY"
AuthModeInMemory AuthMode = "IN_MEMORY"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does IN_MEMORY mean?

// Add more auth modes here as they are supported
)

type ClientPoolKey struct {
HostPort string
Namespace string
MutualTLSSecret string // Include secret name in key to invalidate cache when the secret name changes
HostPort string
Namespace string
SecretName string // Include secret name in key to invalidate cache when the secret name changes
AuthMode AuthMode // Include auth mode in key to invalidate cache when the auth mode changes for the secret
}

type MTLSAuth struct {
tlsConfig *tls.Config
expiryTime time.Time // cert NotAfter - buffer
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is unclear to me -- would you mind elaborating?

}

type ClientAuth struct {
mode AuthMode
mTLS *MTLSAuth // non-nil when mode == AuthMTLS, nil when mode == AuthAPIKey
}

type ClientInfo struct {
client sdkclient.Client
tls *tls.Config // Storing the TLS config associated with the client to check certificate expiration. If the certificate is expired, a new client will be created.
expiryTime time.Time // Effective expiration time (cert.NotAfter - buffer) for efficient expiration checking
client sdkclient.Client
auth ClientAuth
}

type ClientPool struct {
Expand All @@ -48,7 +67,7 @@
}
}

func (cp *ClientPool) GetSDKClient(key ClientPoolKey, withMTLS bool) (sdkclient.Client, bool) {
func (cp *ClientPool) GetSDKClient(key ClientPoolKey) (sdkclient.Client, bool) {
cp.mux.RLock()
defer cp.mux.RUnlock()

Expand All @@ -57,9 +76,9 @@
return nil, false
}

if withMTLS {
if key.AuthMode == AuthModeTLS {
// Check if any certificate is expired
expired, err := isCertificateExpired(info.expiryTime)
expired, err := isCertificateExpired(info.auth.mTLS.expiryTime)
if err != nil {
cp.logger.Error("Error checking certificate expiration", "error", err)
return nil, false
Expand All @@ -79,56 +98,41 @@
Spec v1alpha1.TemporalConnectionSpec
}

func (cp *ClientPool) UpsertClient(ctx context.Context, opts NewClientOptions) (sdkclient.Client, error) {
func (cp *ClientPool) fetchClientUsingMTLSSecret(secret corev1.Secret, opts NewClientOptions) (sdkclient.Client, error) {

clientOpts := sdkclient.Options{
Logger: cp.logger,
HostPort: opts.Spec.HostPort,
Namespace: opts.TemporalNamespace,
// TODO(jlegrone): Make API Keys work
}

var pemCert []byte
var expiryTime time.Time

// Get the connection secret if it exists
if opts.Spec.MutualTLSSecretRef != nil {
var secret corev1.Secret
if err := cp.k8sClient.Get(ctx, types.NamespacedName{
Name: opts.Spec.MutualTLSSecretRef.Name,
Namespace: opts.K8sNamespace,
}, &secret); err != nil {
return nil, err
}
if secret.Type != corev1.SecretTypeTLS {
err := fmt.Errorf("secret %s must be of type kubernetes.io/tls", secret.Name)
return nil, err
}

// Extract the certificate to calculate the effective expiration time
pemCert = secret.Data["tls.crt"]
// Extract the certificate to calculate the effective expiration time
pemCert = secret.Data["tls.crt"]

// Check if certificate is expired before creating the client
exp, err := calculateCertificateExpirationTime(pemCert, 5*time.Minute)
if err != nil {
return nil, fmt.Errorf("failed to check certificate expiration: %v", err)
}
expired, err := isCertificateExpired(exp)
if err != nil {
return nil, fmt.Errorf("failed to check certificate expiration: %v", err)
}
if expired {
return nil, fmt.Errorf("certificate is expired or is going to expire soon")
}
// Check if certificate is expired before creating the client
exp, err := calculateCertificateExpirationTime(pemCert, 5*time.Minute)
if err != nil {
return nil, fmt.Errorf("failed to check certificate expiration: %v", err)
}
expired, err := isCertificateExpired(exp)
if err != nil {
return nil, fmt.Errorf("failed to check certificate expiration: %v", err)
}
if expired {
return nil, fmt.Errorf("certificate is expired or is going to expire soon")

Check failure on line 125 in internal/controller/clientpool/clientpool.go

View workflow job for this annotation

GitHub Actions / golangci

use-errors-new: replace fmt.Errorf by errors.New (revive)
}

cert, err := tls.X509KeyPair(secret.Data["tls.crt"], secret.Data["tls.key"])
if err != nil {
return nil, err
}
clientOpts.ConnectionOptions.TLS = &tls.Config{
Certificates: []tls.Certificate{cert},
}
expiryTime = exp
cert, err := tls.X509KeyPair(secret.Data["tls.crt"], secret.Data["tls.key"])
if err != nil {
return nil, err
}
clientOpts.ConnectionOptions.TLS = &tls.Config{
Certificates: []tls.Certificate{cert},
}
expiryTime = exp

c, err := sdkclient.Dial(clientOpts)
if err != nil {
Expand All @@ -142,24 +146,129 @@
cp.mux.Lock()
defer cp.mux.Unlock()

var mutualTLSSecret string
if opts.Spec.MutualTLSSecretRef != nil {
mutualTLSSecret = opts.Spec.MutualTLSSecretRef.Name
key := ClientPoolKey{
HostPort: opts.Spec.HostPort,
Namespace: opts.TemporalNamespace,
SecretName: opts.Spec.MutualTLSSecretRef.Name,
AuthMode: AuthModeTLS,
}
cp.clients[key] = ClientInfo{
client: c,
auth: ClientAuth{
mode: AuthModeTLS,
mTLS: &MTLSAuth{tlsConfig: clientOpts.ConnectionOptions.TLS, expiryTime: expiryTime},
},
}

return c, nil
}

func (cp *ClientPool) fetchClientUsingAPIKeySecret(secret corev1.Secret, opts NewClientOptions) (sdkclient.Client, error) {
clientOpts := sdkclient.Options{
Logger: cp.logger,
HostPort: opts.Spec.HostPort,
Namespace: opts.TemporalNamespace,
ConnectionOptions: sdkclient.ConnectionOptions{
TLS: &tls.Config{},
},
}

clientOpts.Credentials = sdkclient.NewAPIKeyDynamicCredentials(func(ctx context.Context) (string, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how does the secret object stay in memory? what is the benefit of using DynamicCredentials instead of just a string

return string(secret.Data["api-key"]), nil
})

c, err := sdkclient.Dial(clientOpts)
if err != nil {
return nil, err
}

cp.mux.Lock()
defer cp.mux.Unlock()

key := ClientPoolKey{
HostPort: opts.Spec.HostPort,
Namespace: opts.TemporalNamespace,
SecretName: opts.Spec.APIKeyRef.Name,
AuthMode: AuthModeAPIKey,
}
cp.clients[key] = ClientInfo{
client: c,
auth: ClientAuth{
mode: AuthModeAPIKey,
mTLS: nil,
},
}

return c, nil
}

func (cp *ClientPool) fetchClientUsingInMemoryConnection(opts NewClientOptions) (sdkclient.Client, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thoughts on calling this "Unsafe" or "NoCredentials" instead of "InMemory"?
"InMemory" feels like the wrong term to me because all of these clients and connections live "in memory"

And using no mTLS or API Key auth is not exclusive to the in-memory server, it works for any server that is running without credential-based auth

clientOpts := sdkclient.Options{
Logger: cp.logger,
HostPort: opts.Spec.HostPort,
Namespace: opts.TemporalNamespace,
}

c, err := sdkclient.Dial(clientOpts)
if err != nil {
return nil, err
}

key := ClientPoolKey{
HostPort: opts.Spec.HostPort,
Namespace: opts.TemporalNamespace,
MutualTLSSecret: mutualTLSSecret,
HostPort: opts.Spec.HostPort,
Namespace: opts.TemporalNamespace,
SecretName: "", // no secret name for in-memory connection
AuthMode: AuthModeInMemory,
}
cp.clients[key] = ClientInfo{
client: c,
tls: clientOpts.ConnectionOptions.TLS,
expiryTime: expiryTime,
client: c,
auth: ClientAuth{
mode: AuthModeInMemory,
mTLS: nil,
},
}

return c, nil
}

func (cp *ClientPool) UpsertClient(ctx context.Context, secretName string, authMode AuthMode, opts NewClientOptions) (sdkclient.Client, error) {

// Fetch the secret from k8s cluster, if it exists. Otherwise, use an in-memory connection to the server.
var secret corev1.Secret
if secretName != "" {
if err := cp.k8sClient.Get(ctx, types.NamespacedName{
Name: secretName,
Namespace: opts.K8sNamespace,
}, &secret); err != nil {
return nil, err
}
}

// Check the secret type
switch authMode {
case AuthModeTLS:
if secret.Type != corev1.SecretTypeTLS {
err := fmt.Errorf("secret %s must be of type kubernetes.io/tls", secret.Name)
return nil, err
}
return cp.fetchClientUsingMTLSSecret(secret, opts)

case AuthModeAPIKey:
if secret.Type != corev1.SecretTypeOpaque {
err := fmt.Errorf("secret %s must be of type kubernetes.io/opaque", secret.Name)
return nil, err
}
return cp.fetchClientUsingAPIKeySecret(secret, opts)

case AuthModeInMemory:
return cp.fetchClientUsingInMemoryConnection(opts)

default:
return nil, fmt.Errorf("invalid auth mode: %s", authMode)
}

}

func (cp *ClientPool) Close() {
cp.mux.Lock()
defer cp.mux.Unlock()
Expand Down
Loading
Loading