Skip to content

Commit 2fb456e

Browse files
authored
Merge pull request #203 from PasinduDissMrYum/main
Add pod override fields affinity and tolerations
2 parents 73a517f + 263498e commit 2fb456e

File tree

7 files changed

+57
-7
lines changed

7 files changed

+57
-7
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
55
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
66

77
## [Unreleased]
8+
- [#203](https://github.com/deviceinsight/kafkactl/pull/203) Add pod override fields affinity and tolerations
89

910
## 5.2.0 - 2024-08-08
1011

README.adoc

+20-1
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,25 @@ contexts:
144144
# optional: nodeSelector to add to the pod
145145
nodeSelector:
146146
key: value
147+
148+
# optional: affinity to add to the pod
149+
affinity:
150+
# note: other types of affinity also supported
151+
nodeAffinity:
152+
requiredDuringSchedulingIgnoredDuringExecution:
153+
nodeSelectorTerms:
154+
- matchExpressions:
155+
- key: "<key>"
156+
operator: "<operator>"
157+
values: [ "<value>" ]
158+
159+
# optional: tolerations to add to the pod
160+
tolerations:
161+
- key: "<key>"
162+
operator: "<operator>"
163+
value: "<value>"
164+
effect: "<effect>"
165+
147166
# optional: clientID config (defaults to kafkactl-{username})
148167
clientID: my-client-id
149168
@@ -624,7 +643,7 @@ Producing protobuf message converted from JSON:
624643
kafkactl produce my-topic --key='{"keyField":123}' --key-proto-type MyKeyMessage --value='{"valueField":"value"}' --value-proto-type MyValueMessage --proto-file kafkamsg.proto
625644
----
626645

627-
A more complex protobuf message converted from a multi-line JSON string can be produced using a file input with custom separators.
646+
A more complex protobuf message converted from a multi-line JSON string can be produced using a file input with custom separators.
628647

629648
For example, if you have the following protobuf definition (`complex.proto`):
630649

internal/common-operation.go

+14
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,13 @@ type TLSConfig struct {
6161
Insecure bool
6262
}
6363

64+
type K8sToleration struct {
65+
Key string `json:"key" yaml:"key"`
66+
Operator string `json:"operator" yaml:"operator"`
67+
Value string `json:"value" yaml:"value"`
68+
Effect string `json:"effect" yaml:"effect"`
69+
}
70+
6471
type K8sConfig struct {
6572
Enabled bool
6673
Binary string
@@ -74,6 +81,8 @@ type K8sConfig struct {
7481
Labels map[string]string
7582
Annotations map[string]string
7683
NodeSelector map[string]string
84+
Affinity map[string]any
85+
Tolerations []K8sToleration
7786
}
7887

7988
type ConsumerConfig struct {
@@ -174,6 +183,11 @@ func CreateClientContext() (ClientContext, error) {
174183
context.Kubernetes.Labels = viper.GetStringMapString("contexts." + context.Name + ".kubernetes.labels")
175184
context.Kubernetes.Annotations = viper.GetStringMapString("contexts." + context.Name + ".kubernetes.annotations")
176185
context.Kubernetes.NodeSelector = viper.GetStringMapString("contexts." + context.Name + ".kubernetes.nodeSelector")
186+
context.Kubernetes.Affinity = viper.GetStringMap("contexts." + context.Name + ".kubernetes.affinity")
187+
188+
if err := viper.UnmarshalKey("contexts."+context.Name+".kubernetes.tolerations", &context.Kubernetes.Tolerations); err != nil {
189+
return context, err
190+
}
177191

178192
return context, nil
179193
}

internal/consumergroupoffsets/consumer-group-offset-operation.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ type ConsumerGroupOffsetOperation struct {
2929

3030
func (operation *ConsumerGroupOffsetOperation) ResetConsumerGroupOffset(flags ResetConsumerGroupOffsetFlags, groupName string) error {
3131

32-
if (flags.Topic == nil || len(flags.Topic) == 0) && (!flags.AllTopics) {
32+
if (len(flags.Topic) == 0) && (!flags.AllTopics) {
3333
return errors.New("no topic specified")
3434
}
3535

internal/k8s/executor.go

+4
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ type executor struct {
3737
labels map[string]string
3838
annotations map[string]string
3939
nodeSelector map[string]string
40+
affinity map[string]any
41+
tolerations []internal.K8sToleration
4042
}
4143

4244
const letterBytes = "abcdefghijklmnpqrstuvwxyz123456789"
@@ -111,6 +113,8 @@ func newExecutor(context internal.ClientContext, runner Runner) *executor {
111113
labels: context.Kubernetes.Labels,
112114
annotations: context.Kubernetes.Annotations,
113115
nodeSelector: context.Kubernetes.NodeSelector,
116+
affinity: context.Kubernetes.Affinity,
117+
tolerations: context.Kubernetes.Tolerations,
114118
runner: runner,
115119
}
116120
}

internal/k8s/pod_overrides.go

+16-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package k8s
22

3+
import "github.com/deviceinsight/kafkactl/v5/internal"
4+
35
type imagePullSecretType struct {
46
Name string `json:"name"`
57
}
@@ -10,9 +12,11 @@ type metadataType struct {
1012
}
1113

1214
type specType struct {
13-
ImagePullSecrets []imagePullSecretType `json:"imagePullSecrets,omitempty"`
14-
ServiceAccountName *string `json:"serviceAccountName,omitempty"`
15-
NodeSelector *map[string]string `json:"nodeSelector,omitempty"`
15+
ImagePullSecrets []imagePullSecretType `json:"imagePullSecrets,omitempty"`
16+
ServiceAccountName *string `json:"serviceAccountName,omitempty"`
17+
NodeSelector *map[string]string `json:"nodeSelector,omitempty"`
18+
Affinity *map[string]any `json:"affinity,omitempty"`
19+
Tolerations *[]internal.K8sToleration `json:"tolerations,omitempty"`
1620
}
1721

1822
type PodOverrideType struct {
@@ -29,7 +33,7 @@ func (kubectl *executor) createPodOverride() PodOverrideType {
2933
var override PodOverrideType
3034
override.APIVersion = "v1"
3135

32-
if kubectl.serviceAccount != "" || kubectl.imagePullSecret != "" || len(kubectl.nodeSelector) > 0 {
36+
if kubectl.serviceAccount != "" || kubectl.imagePullSecret != "" || len(kubectl.nodeSelector) > 0 || len(kubectl.affinity) > 0 || len(kubectl.tolerations) > 0 {
3337
override.Spec = &specType{}
3438

3539
if kubectl.serviceAccount != "" {
@@ -44,6 +48,14 @@ func (kubectl *executor) createPodOverride() PodOverrideType {
4448
if len(kubectl.nodeSelector) > 0 {
4549
override.Spec.NodeSelector = &kubectl.nodeSelector
4650
}
51+
52+
if len(kubectl.affinity) > 0 {
53+
override.Spec.Affinity = &kubectl.affinity
54+
}
55+
56+
if len(kubectl.tolerations) > 0 {
57+
override.Spec.Tolerations = &kubectl.tolerations
58+
}
4759
}
4860

4961
if len(kubectl.labels) > 0 || len(kubectl.annotations) > 0 {

internal/producer/producer-operation.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ func (operation *Operation) Produce(topic string, flags Flags) error {
213213
}
214214

215215
if inputMessage, err = inputParser.ParseLine(line); err != nil {
216-
return failWithMessageCount(messageCount, err.Error())
216+
return failWithMessageCount(messageCount, err.Error()) //nolint:govet
217217
}
218218

219219
messageCount++

0 commit comments

Comments
 (0)