From 962e79eff3bf1977e3e302b34a1a32070d87e7bb Mon Sep 17 00:00:00 2001
From: Razz4780
Date: Mon, 25 Aug 2025 12:40:02 +0200
Subject: [PATCH 01/20] Let's see it

---
 using-mirrord/queue-splitting.md | 97 ++++++++++++++++++++++----------
 1 file changed, 66 insertions(+), 31 deletions(-)

diff --git a/using-mirrord/queue-splitting.md b/using-mirrord/queue-splitting.md
index ab8f90c..1bbc16b 100644
--- a/using-mirrord/queue-splitting.md
+++ b/using-mirrord/queue-splitting.md
@@ -16,67 +16,92 @@ description: Sharing queues by splitting messages between multiple clients and t

 # Queue Splitting

-If your application consumes messages from a queue service, you should choose a configuration that matches your intention:
+If your application consumes messages from a message broker (e.g. a Kafka cluster), you should choose a configuration that matches your intention:

-1. Running your application with mirrord without any special configuration will result in your local application competing with the remote target (and potentially other mirrord runs by teammates) for queue messages.
-2. Running your application with [`copy_target` + `scale_down`](copy-target.md#replacing-a-whole-deployment-using-scale_down) will result in the deployed application not consuming any messages, and your local application being the exclusive consumer of queue messages.
-3. If you want to control which messages will be consumed by the deployed application, and which ones will reach your local application, set up queue splitting for the relevant target, and define a messages filter in the mirrord configuration. Messages that match the filter will reach your local application, and messages that do not, will reach either the deployed application, or another teammate's local application, if they match their filter.
+1. If you're OK with your local application competing for queue messages with the remote target, and with your teammates' mirrord sessions — run the application with mirrord without any special configuration.
+2. If you want your local application to be an exclusive consumer of queue messages — run the application with the [`copy_target` + `scale_down`](copy-target.md#replacing-a-whole-deployment-using-scale_down) features.
+3. If you want to precisely control which messages will be consumed by your local application — run the application with the queue splitting feature. This allows you to define a message filter in your mirrord configuration. All messages matching that filter will be redirected by the mirrord operator to your local application. Other messages will **not** reach your local application.

 {% hint style="info" %}
 This feature is only relevant for users on the Team and Enterprise pricing plans.
 {% endhint %}

-**NOTE:** So far queue splitting is available for [Amazon SQS](https://aws.amazon.com/sqs/) and [Kafka](https://kafka.apache.org/). Pretty soon we'll support RabbitMQ as well.
+{% hint style="info" %}
+So far queue splitting is available for [Amazon SQS](https://aws.amazon.com/sqs/) and [Kafka](https://kafka.apache.org/). Pretty soon we'll support RabbitMQ as well.
+{% endhint %}
+
+## How It Works

-### How It Works
+When a queue splitting session starts, the mirrord operator patches the target workload (e.g. deployment or rollout) to consume messages from a different, temporary queue or topic.
+That temporary queue/topic is *exclusive* to the target workload, and its name is randomized.
+Similarly, the local application is redirected to consume messages from its own *exclusive* temporary queue or topic. 
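+
+As a rough illustration of that redirection, here is what the relevant part of a consumer's pod template could look like — the variable name, the original queue name, and the temporary name below are invented for this sketch, and the actual names generated by the operator will differ:
+
+```yaml
+# Before any splitting session, the consumer reads its queue/topic name from an env var:
+env:
+  - name: INCOMING_QUEUE_NAME   # hypothetical variable name
+    value: meme-queue           # original queue or topic
+# While splitting is active, the operator points the same variable at the target's
+# exclusive temporary queue/topic instead, e.g.:
+#   value: mirrord-tmp-1234567890-meme-queue
+```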
+ +{% hint style="warning" %} +In both cases, the redirections are done by manipulating environment variables. +For this reason, queue splitting always requires that the application reads queue or topic name from environment variables. +This is a prerequisite. +{% endhint %} -#### SQS Splitting +Once all temporary topics or queues are prepared, the mirrord operator starts consuming messages from the original queue or topic, and publishing them to the correct temporary one. +This routing is based on message filters provided by the users in their mirrord configs. -When an SQS splitting session starts, the operator changes the target workload to consume messages from a different, temporary queue created by the operator. The operator also creates a temporary queue that the local application reads from. +{% tabs %} -So if we have a consumer app reading messages from a queue: +{% tab title="Amazon SQS" %} + +First, we have a consumer app reading messages from an SQS queue: ![A K8s application that consumes messages from an SQS queue](queue-splitting/before-splitting-sqs.svg) -After a mirrord SQS splitting session starts, the setup will change to this: +Then, the first mirrord SQS splitting session starts. Two temporary queues are created (one for the target deployed in the cluster, one for the user's local application), +and the mirrord operator routes messages according to the user's filter (read more in the [last section](queue-splitting.md#setting-a-filter-for-a-mirrord-run)): ![One SQS splitting session](queue-splitting/1-user-sqs.svg) -The operator will consume messages from the original queue, and try to match their attributes with filter defined by the user in the mirrord configuration file (read more in the [last section](queue-splitting.md#setting-a-filter-for-a-mirrord-run)). A message that matches the filter will be sent to the queue consumed by the local application. Other messages will be sent to the queue consumed by the remote application. - -And as soon as a second mirrord SQS splitting session starts, the operator will create another temporary queue for the new local app: +Then, another mirrord SQS splitting session starts. The third temporary queue is created (for the second user's local application). +The mirrord operator includes the new queue and the second user's filter in the routing logic. ![Two SQS splitting sessions](queue-splitting/2-users-sqs.svg) -The users' filters will be matched in the order of the start of their sessions. If filters defined by two users both match a message, the message will go to whichever user started their session first. +If the filters defined by the two users both match some message, it is not defined which one of the users will receive that message. -After a mirrord session ends, the operator will delete the temporary queue that was created for that session. When all sessions that split a certain queue end, the mirrord Operator will wait for the deployed application to consume the remaining messages in its temporary queue, and then delete that temporary queue as well, and change the deployed application to consume messages back from the original queue. +{% endtab %} -#### Kafka Splitting +{% tab title="Kafka" %} -When a Kafka splitting session starts, the operator changes the target workload to consume messages from a different, temporary topic created by the operator in the same Kafka cluster. The operator also creates a temporary topic that the local application reads from. 
-
-So if we have a consumer app reading messages from a topic:
+First, we have a consumer app reading messages from a Kafka topic:

 ![A K8s application that consumes messages from a Kafka topic](queue-splitting/before-splitting-kafka.svg)

-After a mirrord Kafka splitting session starts, the setup will change to this:
+Then, the first mirrord Kafka splitting session starts. Two temporary topics are created (one for the target deployed in the cluster, one for the user's local application),
+and the mirrord operator routes messages according to the user's filter (read more in the [last section](queue-splitting.md#setting-a-filter-for-a-mirrord-run)):

 ![One Kafka splitting session](queue-splitting/1-user-kafka.svg)

-The operator will consume messages from the original topic (using the same consumer group id as the target workload), and try to match their headers with filter defined by the user in the mirrord configuration file (read more in the [last section](queue-splitting.md#setting-a-filter-for-a-mirrord-run)). A message that matches the filter will be sent to the topic consumed by the local application. Other messages will be sent to the topic consumed by the remote application.
-
-And as soon as a second mirrord Kafka splitting session starts, the operator will create another temporary queue for the new local app:
+Then, another mirrord Kafka splitting session starts. A third temporary topic is created (for the second user's local application).
+The mirrord operator includes the new topic and the second user's filter in the routing logic.

 ![Two Kafka splitting sessions](queue-splitting/2-users-kafka.svg)

-The users' filters will be matched in the order of the start of their sessions. If filters defined by two users both match a message, the message will go to whichever user started their session first.
+If the filters defined by the two users both match some message, it is not defined which one of the users will receive that message.

-After a mirrord session ends, the operator will delete the temporary topic that was created for that session. When all sessions that split a certain topic end, the mirrord Operator will change the deployed application to consume messages back from the original topic and delete the temporary topic as well.
+{% endtab %}

-### Getting Started with SQS Splitting
+{% endtabs %}

-#### Enabling SQS Splitting in Your Cluster
+Temporary queues and topics are managed by the mirrord operator and garbage collected in the background. After all queue splitting sessions end, the operator promptly deletes the allocated resources.
+
+Please note that:
+1. Temporary queues and topics created for the deployed targets will not be deleted as long as any of the target's pods still use them.
+2. In case of SQS splitting, deployed targets will remain redirected as long as their temporary queues have unconsumed messages.
+
+
+## Enabling SQS Splitting in Your Cluster
+
+{% stepper %}
+{% step %}
+
+### Preparing mirrord Operator's IAM Role

 In order to use the SQS splitting feature, some extra values need be provided during the installation of the mirrord Operator. 
@@ -175,7 +200,16 @@ Any temporary queues created by mirrord are created with the same policy as the However, if the workload gets its access to the queue by an IAM policy (and not an SQS policy, see [SQS docs](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-using-identity-based-policies.html#sqs-using-sqs-and-iam-policies)) that grants access to that specific queue by its exact name, you would have to add a policy that would allow that workload to also read from new temporary queues created by mirrord on the run. -#### Creating a Queue Registry +{% endstep %} +{% step %} + +### Configuring SQS Splitting in the Helm Chart +Step 2 text + +{% endstep %} +{% step %} + +### Creating the Queue Registry On operator installation, a new [`CustomResources`](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/) type was created on your cluster: `MirrordWorkloadQueueRegistry`. Users with permissions to get CRDs, can verify its existence with `kubectl get crd mirrordworkloadqueueregistries.queues.mirrord.metalbear.co`. After an SQS-enabled operator is installed, and before you can start splitting queues, a resource of that type must be created for the target you want to run against, in the target's namespace. @@ -218,9 +252,10 @@ spec: * `spec.consumer` is the workload that consumes these queues. The queues specified above will be split whenever that workload is targeted. * `container` is optional, when set - this queue registry only applies to runs that target that container. -### Getting Started with Kafka Splitting +{% endstep %} +{% endstepper %} -#### Enabling Kafka Splitting in Your Cluster +## Enabling Kafka Splitting in Your Cluster In order to use the Kafka splitting feature, some extra values need be provided during the installation of the mirrord Operator. @@ -414,7 +449,7 @@ The provided format must contain the three variables: `{{RANDOM}}`, `{{FALLBACK} * `{{FALLBACK}}` will resolve either to `-fallback-` or `-` literal. * `{{ORIGINAL_TOPIC}}` will resolve to the name of the original topic that is being split. -### Setting a Filter for a mirrord Run +## Setting a Filter for a mirrord Run Once everything else is set, you can start using message filters in your mirrord configuration file. Below is an example for what such a configuration might look like: From 556af6aef200cc503a335bb601b9aa28515f5edb Mon Sep 17 00:00:00 2001 From: Razz4780 Date: Mon, 25 Aug 2025 14:09:50 +0200 Subject: [PATCH 02/20] Moar --- using-mirrord/queue-splitting.md | 57 +++++++++++++++----------------- 1 file changed, 26 insertions(+), 31 deletions(-) diff --git a/using-mirrord/queue-splitting.md b/using-mirrord/queue-splitting.md index 1bbc16b..cb5f948 100644 --- a/using-mirrord/queue-splitting.md +++ b/using-mirrord/queue-splitting.md @@ -101,20 +101,23 @@ Plese note that: {% stepper %} {% step %} -### Preparing mirrord Operator's IAM Role +### Enable SQS splitting in the Helm chart -In order to use the SQS splitting feature, some extra values need be provided during the installation of the mirrord Operator. +Enable the `operator.sqsSplitting` setting in the [mirrord-operator Helm chart](https://github.com/metalbear-co/charts/blob/main/mirrord-operator/values.yaml). 
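+
+For example, with a Helm-based installation the setting can be supplied through a values file (a minimal sketch — all other chart values are omitted here):
+
+```yaml
+# values.yaml passed to `helm upgrade --install` for the mirrord-operator chart
+operator:
+  # enables the SQS splitting component of the operator
+  sqsSplitting: true
+```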
-First of all, the SQS splitting feature needs to be enabled:
+{% endstep %}
+{% step %}

-* When installing with the [mirrord-operator Helm chart](https://github.com/metalbear-co/charts/tree/main/mirrord-operator) it is enabled by setting the [`operator.sqsSplitting`](https://github.com/metalbear-co/charts/blob/06efc8666bd26ff7f3a0863333ea4a109aaa6b62/mirrord-operator/values.yaml#L22) [value](https://helm.sh/docs/chart_template_guide/values_files/) to `true`.
-* When installing via the `mirrord operator setup` command, set the `--sqs-splitting` flag.
+### Authenticate and authorize the mirrord operator with SQS

-When SQS splitting is enabled during installation, some additional resources are created, and the SQS component of the mirrord Operator is started.
+The mirrord operator will need to be able to do some operations on the SQS queues on your behalf.
+To do this, it will build an SQS client, using the default credentials provider chain.

-Additionally, the operator needs to be able to do some operations on SQS queues in your account. For that, an IAM role with an appropriate policy has to be assigned to the operator's service account. Please follow [AWS's documentation on how to do that](https://docs.aws.amazon.com/eks/latest/userguide/associate-service-account-role.html).
+The easiest way to provide the credentials for the operator is with IAM role assumption.
+For that, an IAM role with an appropriate policy has to be assigned to the operator's service account. Please follow [AWS's documentation on how to do that](https://docs.aws.amazon.com/eks/latest/userguide/associate-service-account-role.html). Note that the operator's service account can be annotated with the IAM role's ARN with the `sa.roleArn` setting in the [mirrord-operator Helm chart](https://github.com/metalbear-co/charts/blob/main/mirrord-operator/values.yaml).

-Some of the permissions are needed for your actual queues that you would like to split, and some permissions are only needed for the temporary queues the mirrord Operator creates and later deletes. Here is an overview:
+Some of the SQS permissions are needed for your actual queues that you would like to split, and some permissions are only needed for the temporary queues managed by the operator.
+Here is an overview:

 | SQS Permission | needed for your queues | needed for temporary queues |
 | ------------------ | :--------------------: | :-------------------------: |
@@ -128,17 +131,17 @@ Some of the permissions are needed for your actual queues that you would like to
 | SendMessage | | ✓ |
 | DeleteQueue | | ✓ |

-Here we provide a short explanation for each required permission.
+And here we provide a short explanation for each required permission:

-* `sqs:GetQueueUrl`: the operator finds queue names to split in the provided source, and then it fetches the URL from SQS in order to make all other API calls.
-* `sqs:GetQueueAttributes`: the operator gives all temporary queues the same attributes as their corresponding original queue, so it needs permission to get the original queue's attributes. It also reads the attributes of temporary queues it created, in order to check how many messages they have approximately.
-* `sqs:ListQueueTags`: the operator queries your queue's tags, in order to give all temporary queues that are created for that queue the same tags.
-* `sqs:ReceiveMessage`: the mirrord Operator will read messages from queues you want to split.
-* `sqs:DeleteMessage`: after reading a message and forwarding it to a temporary queue, the operator deletes it. 
-* `sqs:CreateQueue`: the mirrord Operator will create temporary queues in your SQS account. -* `sqs:TagQueue`: all the queues mirrord creates will be tagged with all the tags of their respective original queues, plus any tags that are configured for them in the `MirrordWorkloadQueueRegistry` in which they are declared. -* `sqs:SendMessage`: mirrord will send the messages it reads from an original queue to the temporary queue of the client whose filter matches it, or to the temporary queue the deployed application reads from. -* `sqs:DeleteQueue`: when a user session is done, mirrord will delete the temporary queue it created for that session. After all sessions that split a certain queue end, also the temporary queue that is for the deployed application is deleted. +* `sqs:GetQueueUrl`: the operator finds queue names to split in the provided source, and then it fetches the URL from SQS in order to make other API calls. +* `sqs:GetQueueAttributes`: the operator queries your queue's attributes, in order to clone these attributes to all derived temporary queues. It also reads the attributes of the temporary queues, in order to check the number of remaining messages. +* `sqs:ListQueueTags`: the operator queries your queue's tags, in order to clone these tags to all derived temporary queues. +* `sqs:ReceiveMessage`: the operator reads messages from the queues you split. +* `sqs:DeleteMessage`: after reading a message and forwarding it to a temporary queue, the operator deletes the message from the split queue. +* `sqs:CreateQueue`: the operator creates temporary queues in your SQS account. +* `sqs:TagQueue`: the operator sets tags on the temporary queues. +* `sqs:SendMessage`: the operator sends messages to the temporary queues. +* `sqs:DeleteQueue`: the operator deletes stale temporary queues in the background. This is an example for a policy that gives the operator's roles the minimal permissions it needs to split a queue called `ClientUploads`: @@ -179,7 +182,7 @@ This is an example for a policy that gives the operator's roles the minimal perm Instead of specifying the queues you would like to be able to split in the first statement, you could alternatively make that statement apply for all resources in the account, and limit the queues it applies to using conditions instead of resource names. For example, you could add a condition that makes the statement only apply to queues with the tag `splittable=true` or `env=dev` etc. and set those tags for all queues you would like to allow the operator to split. * The second statement in the example gives the role the permissions it needs for the temporary queues. Since all the temporary queues created by mirrord are created with the name prefix `mirrord-`, that statement in the example is limited to resources with that prefix in their name. - If you would like to limit the second statement with conditions instead of (only) with the resource name, you can [set a condition that requires a tag](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-abac-tagging-resource-control.html), and in the `MirrordWorkloadQueueRegistry` resource you can specify for each queue tags that mirrord will set for temporary queues that it creates for that original queue. 
+ If you would like to limit the second statement with conditions instead of (only) with the resource name, you can [set a condition that requires a tag](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-abac-tagging-resource-control.html), and in the `MirrordWorkloadQueueRegistry` resource you can specify for each queue tags that mirrord will set for temporary queues that it creates for that original queue (see [relevant section](queue-splitting.md#create-the-queue-registry)). If the queue messages are encrypted, the operator's IAM role should also have the following permissions: @@ -187,12 +190,10 @@ If the queue messages are encrypted, the operator's IAM role should also have th * `kms:Decrypt` * `kms:GenerateDataKey` -The ARN of the IAM role has to be passed when installing the operator. - -* When installing with Helm, the ARN is passed via the `sa.roleArn` value -* When installing via the `mirrord operator setup` command, use the `--aws-role-arn` flag. +{% endstep %} +{% step %} -#### Permissions for Target Workloads +### Permissions for Target Workloads In order to be targeted with SQS queue splitting, a workload has to be able to read from queues that are created by mirrord. @@ -203,13 +204,7 @@ However, if the workload gets its access to the queue by an IAM policy (and not {% endstep %} {% step %} -### Configuring SQS Splitting in the Helm Chart -Step 2 text - -{% endstep %} -{% step %} - -### Creating the Queue Registry +### Create the queue registry On operator installation, a new [`CustomResources`](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/) type was created on your cluster: `MirrordWorkloadQueueRegistry`. Users with permissions to get CRDs, can verify its existence with `kubectl get crd mirrordworkloadqueueregistries.queues.mirrord.metalbear.co`. After an SQS-enabled operator is installed, and before you can start splitting queues, a resource of that type must be created for the target you want to run against, in the target's namespace. From 4c41a0956094da46ed19a65b67f534437432f176 Mon Sep 17 00:00:00 2001 From: Razz4780 Date: Mon, 25 Aug 2025 14:21:45 +0200 Subject: [PATCH 03/20] Authorize deployed consumers --- using-mirrord/queue-splitting.md | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/using-mirrord/queue-splitting.md b/using-mirrord/queue-splitting.md index cb5f948..d9f1e0a 100644 --- a/using-mirrord/queue-splitting.md +++ b/using-mirrord/queue-splitting.md @@ -193,13 +193,15 @@ If the queue messages are encrypted, the operator's IAM role should also have th {% endstep %} {% step %} -### Permissions for Target Workloads +### Authorize deployed consumers -In order to be targeted with SQS queue splitting, a workload has to be able to read from queues that are created by mirrord. +In order to be targeted with SQS splitting, a deployed consumer must be able to use the temporary queues created by mirrord. +E.g. if the consumer application retrieves the queue's URL based on its name, lists queue's tags, consumes and deletes messages from the queue — it must be able to do the same on a temporary queue. -Any temporary queues created by mirrord are created with the same policy as the original queues they are splitting (with the single change of the queue name in the policy), so if a queue has a policy that allows the target workload to call `ReceiveMessage` on it, that is enough. 
+Any temporary queues managed by mirrord are created with the same policy as the original queues they are splitting (with the single change of updating the queue name in the policy). +Therefore, access control based on SQS policies should automatically be taken care of. -However, if the workload gets its access to the queue by an IAM policy (and not an SQS policy, see [SQS docs](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-using-identity-based-policies.html#sqs-using-sqs-and-iam-policies)) that grants access to that specific queue by its exact name, you would have to add a policy that would allow that workload to also read from new temporary queues created by mirrord on the run. +However, if the consumer's access to the queue is controlled by an IAM policy (and not an SQS policy, see [SQS docs](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-using-identity-based-policies.html#sqs-using-sqs-and-iam-policies)), you will need to adjust it. {% endstep %} {% step %} From 7e75108ca7ca74a39ff569694c9f7c8c95b816ab Mon Sep 17 00:00:00 2001 From: Razz4780 Date: Mon, 25 Aug 2025 16:02:16 +0200 Subject: [PATCH 04/20] Queue registry finished --- using-mirrord/queue-splitting.md | 72 ++++++++++++++++++++++++-------- 1 file changed, 55 insertions(+), 17 deletions(-) diff --git a/using-mirrord/queue-splitting.md b/using-mirrord/queue-splitting.md index d9f1e0a..d731555 100644 --- a/using-mirrord/queue-splitting.md +++ b/using-mirrord/queue-splitting.md @@ -208,16 +208,24 @@ However, if the consumer's access to the queue is controlled by an IAM policy (a ### Create the queue registry -On operator installation, a new [`CustomResources`](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/) type was created on your cluster: `MirrordWorkloadQueueRegistry`. Users with permissions to get CRDs, can verify its existence with `kubectl get crd mirrordworkloadqueueregistries.queues.mirrord.metalbear.co`. After an SQS-enabled operator is installed, and before you can start splitting queues, a resource of that type must be created for the target you want to run against, in the target's namespace. +On operator installation with `operator.sqsSplitting` enabled, a new [`CustomResource`](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/) type is defined in your cluster — `MirrordWorkloadQueueRegistry`. Users with permissions to get CRDs can verify its existence with `kubectl get crd mirrordworkloadqueueregistries.queues.mirrord.metalbear.co`. +Before you can run sessions with SQS splitting, you must create a queue registry for the desired target. +This is because the queue registry contains additional application context required by the mirrord operator. +For example, the operator needs to know which environment variables contain the names of the SQS queues to split. 
-Below we have an example for such a resource, for a meme app that consumes messages from two queues:
+See an example queue registry defined for a deployment `meme-app` living in namespace `meme`:

 ```yaml
 apiVersion: queues.mirrord.metalbear.co/v1alpha
 kind: MirrordWorkloadQueueRegistry
 metadata:
   name: meme-app-q-registry
+  namespace: meme
 spec:
+  consumer:
+    name: meme-app
+    workloadType: Deployment
+    container: main
   queues:
     meme-queue:
       queueType: SQS
@@ -231,23 +239,53 @@ spec:
         envVar: AD_QUEUE_NAME
       tags:
         tool: mirrord
-  consumer:
-    name: meme-app
-    container: main
-    workloadType: Deployment
 ```

-* `spec.queues` holds queues that should be split when running mirrord with this target. It is a mapping from a queue ID to the details of the queue.
-  * The queue ID is chosen by you, and will be used by every teammate who wishes to filter messages from this queue. You can choose any string for that, it does not have to be the same as the name of the queue. In the example above the first queue has the queue id `meme-queue` and the second one `ad-queue`.
-  * `nameSource` tells mirrord where the app finds the name of this queue.
-    * Currently `envVar` is the only supported source for the queue name. The value of `envVar` is the name of the
-      environment variable the app reads the queue name from. That environment variable could be one that has a value
-      directly in the spec, or it could get its value from a ConfigMap via `valueFrom` or `envFrom`. It is crucial that
-      both the local and the deployed app use the queue name they find in that environment variable. mirrord changes the
-      value of that environment variable in order to make the application read from a temporary queue it creates.
-  * `tags` is an optional field where you can specify queue tags that should be added to all temporary queues mirrord creates for splitting this queue.
-* `spec.consumer` is the workload that consumes these queues. The queues specified above will be split whenever that workload is targeted.
-  * `container` is optional, when set - this queue registry only applies to runs that target that container.
+The registry above says that:
+1. It provides context for container `main` running in deployment `meme-app` in namespace `meme`.
+2. The container consumes two SQS queues. Their names are read from environment variables `INCOMING_MEME_QUEUE_NAME` and `AD_QUEUE_NAME`.
+3. The SQS queues can be referenced in a mirrord config under IDs `meme-queue` and `ad-queue`, respectively.
+4. When creating a temporary queue derived from either of the two queues, the mirrord operator should add the tag `tool=mirrord`.
+
+#### Link the registry to the deployed consumer
+
+The queue registry is a namespaced resource, so it can only reference a consumer deployed in the same namespace.
+The reference is specified with `spec.consumer`:
+* `name` — name of the Kubernetes workload of the deployed consumer.
+* `workloadType` — type of the Kubernetes workload of the deployed consumer. Right now only consumers deployed in deployments and rollouts are supported.
+* `container` — name of the exact container running in the workload. This field is optional. If you omit it, the registry will reference all of the workload's containers.
+
+#### Describe consumed queues in the registry
+
+The queue registry describes SQS queues consumed by the referenced consumer.
+The queues are described in entries of the `spec.queues` object. 
+
+The entry's key can be arbitrary, as it will only be referenced from the user's mirrord config
+(compare with the [last section](queue-splitting.md#setting-a-filter-for-a-mirrord-run)).
+
+The entry's value is an object describing single or multiple SQS queues consumed by the workload:
+
+* `nameSource` describes which environment variables contain names/URLs of the consumed queues. Either the `envVar` or the `regexPattern` field is required.
+  * `envVar` stores the name of a single environment variable.
+  * `regexPattern` selects multiple environment variables based on a regular expression.
+* `fallbackName` stores an optional fallback name/URL, in case `nameSource` is not found in the workload spec.
+  `nameSource` will still be used to inject the name/URL of the temporary queue.
+* `namesFromJsonMap` specifies how to process the values of environment variables that contain queue names/URLs.
+  If set to `true`, values of all variables will be parsed as JSON objects with string values. All values in these objects will be treated as queue names/URLs.
+  If set to `false`, values of all variables will be treated directly as queue names/URLs.
+  Defaults to `false`.
+* `tags` specifies additional tags to be set on all created temporary queues.
+* `sns` specifies whether the queues contain SQS messages created from SNS notifications.
+  If set to `true`, message bodies will be parsed and matched against users' filters,
+  as SNS notification attributes are found in the SQS message body.
+  If set to `false`, message attributes will be matched against users' filters.
+  Defaults to `false`.
+
+{% hint style="warning" %}
+The mirrord operator can only read consumer's environment variables if they are either:
+1. defined directly in the workload's pod template, with value defined in `value` or in `valueFrom` with a config map reference; or
+2. loaded from config maps using `envFrom`. 
{% endhint %} From d5d039ba23c236b580ff39510a21d0ac3ed9ece0 Mon Sep 17 00:00:00 2001 From: Razz4780 Date: Mon, 25 Aug 2025 17:10:51 +0200 Subject: [PATCH 06/20] mirrord.json configuration --- using-mirrord/queue-splitting.md | 157 +++++++++++++++++++------------ 1 file changed, 98 insertions(+), 59 deletions(-) diff --git a/using-mirrord/queue-splitting.md b/using-mirrord/queue-splitting.md index 721c6e6..c2e10af 100644 --- a/using-mirrord/queue-splitting.md +++ b/using-mirrord/queue-splitting.md @@ -147,33 +147,33 @@ This is an example for a policy that gives the operator's roles the minimal perm ```json { - "Version": "2012-10-17", - "Statement": [ - { - "Effect": "Allow", - "Action": [ - "sqs:GetQueueUrl", - "sqs:GetQueueAttributes", - "sqs:ListQueueTags", - "sqs:ReceiveMessage", - "sqs:DeleteMessage" - ], - "Resource": [ - "arn:aws:sqs:eu-north-1:314159265359:ClientUploads" - ] - }, - { - "Effect": "Allow", - "Action": [ - "sqs:CreateQueue", - "sqs:TagQueue", - "sqs:SendMessage", - "sqs:GetQueueAttributes", - "sqs:DeleteQueue" - ], - "Resource": "arn:aws:sqs:eu-north-1:314159265359:mirrord-*" - } - ] + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "sqs:GetQueueUrl", + "sqs:GetQueueAttributes", + "sqs:ListQueueTags", + "sqs:ReceiveMessage", + "sqs:DeleteMessage" + ], + "Resource": [ + "arn:aws:sqs:eu-north-1:314159265359:ClientUploads" + ] + }, + { + "Effect": "Allow", + "Action": [ + "sqs:CreateQueue", + "sqs:TagQueue", + "sqs:SendMessage", + "sqs:GetQueueAttributes", + "sqs:DeleteQueue" + ], + "Resource": "arn:aws:sqs:eu-north-1:314159265359:mirrord-*" + } + ] } ``` @@ -486,47 +486,86 @@ The provided format must contain the three variables: `{{RANDOM}}`, `{{FALLBACK} ## Setting a Filter for a mirrord Run -Once everything else is set, you can start using message filters in your mirrord configuration file. Below is an example for what such a configuration might look like: +Once cluster setup is done, mirrord users can start running sessions with queue message filters in their mirrord configuration files. +[`feature.split_queues`](https://app.gitbook.com/s/Z7vBpFMZTH8vUGJBGRZ4/options#feature.split_queues) is the configuration field they need to specify in order to filter queue messages. +Directly under it, mirrord expects a mapping from a queue or topic ID to a queue filter definition. + +Filter definition contains two fields: +* `queue_type` — `SQS` or `Kafka` +* `message_filter` — mapping from message attribute (SQS) or header (Kafka) name to a regex for its value. + The local application will only see queue messages that have **all** of the specified message attributes/headers. + +{% hint style="info" %} +Empty `message_filter` is treated as a match-none directive. 
+{% endhint %}
+
+See example configurations below:
+
+{% tabs %}
+
+{% tab title="SQS and Kafka" %}

 ```json
 {
-  "operator": true,
-  "target": "deployment/meme-app/main",
-  "feature": {
-    "split_queues": {
-      "meme-queue": {
-        "queue_type": "SQS",
-        "message_filter": {
-          "author": "^me$",
-          "level": "^(beginner|intermediate)$"
-        }
-      },
-      "ad-queue": {
-        "queue_type": "SQS",
-        "message_filter": {}
-      },
-      "views-topic": {
-        "queue_type": "Kafka",
-        "message_filter": {
-          "author": "^me$",
-          "source": "^my-session-"
-        }
-      }
+  "operator": true,
+  "target": "deployment/meme-app/container/main",
+  "feature": {
+    "split_queues": {
+      "meme-queue": {
+        "queue_type": "SQS",
+        "message_filter": {
+          "author": "^me$",
+          "level": "^(beginner|intermediate)$"
+        }
+      },
+      "ad-queue": {
+        "queue_type": "SQS",
+        "message_filter": {}
+      },
+      "views-topic": {
+        "queue_type": "Kafka",
+        "message_filter": {
+          "author": "^me$",
+          "source": "^my-session-"
+        }
+      }
     }
   }
 }
 ```

-* [`feature.split_queues`](https://app.gitbook.com/s/Z7vBpFMZTH8vUGJBGRZ4/options#feature.split_queues) is the configuration field you need to specify in order to filter queue messages. Directly under it, we have a mapping from a queue or topic ID to a queue filter definition.
-  * Queue or topic ID is the ID that was set in the [SQS queue registry resource](queue-splitting.md#creating-a-queue-registry) or [Kafka topics consumer resource](queue-splitting.md#creating-a-topics-registry).
-  * `message_filter` is a mapping from message attribute (SQS) or header (Kafka) names to message attribute or header value regexes. Your local application will only see queue messages that have **all** of the specified message attributes or headers.
-  Empty `message_filter` is treated as a match-none directive.
+In the example above, the local application:
+
+* Will receive a subset of messages from SQS queues described in the registry under ID `meme-queue`.
+  All received messages will have an attribute `author` with the value `me`, AND an attribute `level` with value either `beginner` or `intermediate`.
+* Will receive no messages from SQS queues described in the registry under ID `ad-queue`.
+* Will receive a subset of messages from Kafka topic with ID `views-topic`.
+  All received messages will have an attribute `author` with the value `me`, AND an attribute `source` with value starting with `my-session-` (e.g. `my-session-844cb78789-2fmsw`).

-In the example above, the local application:
+{% endtab %}
+{% tab title="SQS with wildcard" %}

-* Will receive a subset of messages from SQS queue with ID `meme-queue`. All received messages will have an attribute `author` with the value `me`, AND an attribute `level` with value either `beginner` or `intermediate`.
-* Will receive a subset of messages from Kafka topic with ID `views-topic`. All received messages will have an attribute `author` with the value `me`, AND an attribute `source` with value starting with `my-session-` (e.g `my-session-844cb78789-2fmsw`).
-* Will receive no messages from SQS queue with id `ad-queue`.
+```json
+{
+  "operator": true,
+  "target": "deployment/meme-app/container/main",
+  "feature": {
+    "split_queues": {
+      "*": {
+        "queue_type": "SQS",
+        "message_filter": {
+          "author": "^me$"
+        }
+      }
+    }
+  }
+}
+```

-Once all users stop filtering a queue (i.e. end their mirrord sessions), the temporary queues (SQS) and topics (Kafka) that mirrord operator created will be deleted.
+In the example above, the local application will receive a subset of messages from **all** SQS queues described in the registry.
+All received messages will have an attribute `author` with the value `me`. 
+`*` is a special queue ID for SQS queues, and resolves to all queues described in the registry. -Once all users stop filtering a queue (i.e. end their mirrord sessions), the temporary queues (SQS) and topics (Kafka) that mirrord operator created will be deleted. +{% endtab %} +{% endtabs %} From 2aa9de0a149158493b10ac2888065d8d3526bc67 Mon Sep 17 00:00:00 2001 From: Razz4780 Date: Mon, 25 Aug 2025 17:20:38 +0200 Subject: [PATCH 07/20] .. --- using-mirrord/queue-splitting.md | 76 ++++++++++++++++++++------------ 1 file changed, 49 insertions(+), 27 deletions(-) diff --git a/using-mirrord/queue-splitting.md b/using-mirrord/queue-splitting.md index c2e10af..dc7dd50 100644 --- a/using-mirrord/queue-splitting.md +++ b/using-mirrord/queue-splitting.md @@ -108,7 +108,7 @@ Enable the `operator.sqsSplitting` setting in the [mirrord-operator Helm chart]( {% endstep %} {% step %} -### Authenticate and authorize the mirrord operator with SQS +### Authenticate and authorize the mirrord operator The mirrord operator will need to be able to do some operations on the SQS queues on your behalf. To do this, it will build an SQS client, using the default credentials provider chain. @@ -292,14 +292,57 @@ The mirrord operator can only read consumer's environment variables if they are ## Enabling Kafka Splitting in Your Cluster -In order to use the Kafka splitting feature, some extra values need be provided during the installation of the mirrord Operator. +{% stepper %} +{% step %} + +### Enable Kafka splitting in the Helm chart + +Enable the `operator.kafkaSplitting` setting in the [mirrord-operator Helm chart](https://github.com/metalbear-co/charts/blob/main/mirrord-operator/values.yaml). + +{% endstep %} +{% step %} + +### Create the Kafka client configuration + +{% endstep %} +{% step %} + +### Authorize deployed consumers + +{% endstep %} +{% step %} + +### Create the topics consumer resource + +{% endstep %} +{% endstepper %} + +### Additional Options + +#### Customizing Temporary Kafka Topic Names + +{% hint style="info" %} +Available since chart version `1.27` and operator version `3.114.0`. +{% endhint %} -First of all, the Kafka splitting feature needs to be enabled: +To serve Kafka splitting sessions, mirrord operator creates temporary topics in the Kafka cluster. The default format for their names is as follows: -* When installing with the [mirrord-operator Helm chart](https://github.com/metalbear-co/charts/tree/main/mirrord-operator) it is enabled by setting the [`operator.kafkaSplitting`](https://github.com/metalbear-co/charts/blob/06efc8666bd26ff7f3a0863333ea4a109aaa6b62/mirrord-operator/values.yaml#L24) [value](https://helm.sh/docs/chart_template_guide/values_files/) to `true`. -* When installing via the `mirrord operator setup` command, set the `--kafka-splitting` flag. +* `mirrord-tmp-1234567890-fallback-topic-original-topic` - for the fallback topic (unfiltered messages, consumed by the deployed workload). +* `mirrord-tmp-0987654321-original-topic` - for the user topics (filtered messages, consumed by local applications running with mirrord). + +Note that the random digits will be unique for each temporary topic created by the operator. -When Kafka splitting is enabled during installation, some additional resources are created, and the Kafka component of the mirrord Operator is started. 
+You can adjust the format of the created topic names to suit your needs (RBAC, Security, Policies, etc.), +using the `OPERATOR_KAFKA_SPLITTING_TOPIC_FORMAT` environment variable of the mirrord operator, +or `operator.kafkaSplittingTopicFormat` helm chart value. The default value is: + +`mirrord-tmp-{{RANDOM}}{{FALLBACK}}{{ORIGINAL_TOPIC}}` + +The provided format must contain the three variables: `{{RANDOM}}`, `{{FALLBACK}}` and `{{ORIGINAL_TOPIC}}`. + +* `{{RANDOM}}` will resolve to random digits. +* `{{FALLBACK}}` will resolve either to `-fallback-` or `-` literal. +* `{{ORIGINAL_TOPIC}}` will resolve to the name of the original topic that is being split. #### Configuring Kafka Splitting with Custom Resources @@ -463,27 +506,6 @@ When this kind is specified, additional properties are automatically merged into > _**NOTE:**_ By default, the operator will only have access to secrets in its own namespace (`mirrord` by default). -#### Customizing mirrord created Kafka Topic Names - -> _**NOTE:**_ Available since chart version `1.27` and operator version 3.114.0 - -To serve Kafka splitting sessions, mirrord Operator creates temporary topics in the Kafka cluster. The default format for their names is as follows: - -* `mirrord-tmp-1234567890-fallback-topic-original-topic` - for the fallback topic (unfiltered messages, consumed by the deployed workload). -* `mirrord-tmp-9183298231-original-topic` - for the user topics (filtered messages, consumed by local applications running with mirrord). - -Note that the random digits will be unique for each temporary topic created by the mirrord Operator. - -You can adjust the format of the created topic names to suit your needs (RBAC, Security, Policies, etc.), using the `OPERATOR_KAFKA_SPLITTING_TOPIC_FORMAT` environment variable of the mirrord Operator, or `operator.kafkaSplittingTopicFormat` helm chart value. The default value is: - -`mirrord-tmp-{{RANDOM}}{{FALLBACK}}{{ORIGINAL_TOPIC}}` - -The provided format must contain the three variables: `{{RANDOM}}`, `{{FALLBACK}}` and `{{ORIGINAL_TOPIC}}`. - -* `{{RANDOM}}` will resolve to random digits. -* `{{FALLBACK}}` will resolve either to `-fallback-` or `-` literal. -* `{{ORIGINAL_TOPIC}}` will resolve to the name of the original topic that is being split. - ## Setting a Filter for a mirrord Run Once cluster setup is done, mirrord users can start running sessions with queue message filters in their mirrord configuration files. From 609b384a273c412625ea09ec1b0b6e80759863ce Mon Sep 17 00:00:00 2001 From: Razz4780 Date: Mon, 25 Aug 2025 17:23:54 +0200 Subject: [PATCH 08/20] . --- using-mirrord/queue-splitting.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/using-mirrord/queue-splitting.md b/using-mirrord/queue-splitting.md index dc7dd50..1ed378e 100644 --- a/using-mirrord/queue-splitting.md +++ b/using-mirrord/queue-splitting.md @@ -206,7 +206,7 @@ However, if the consumer's access to the queue is controlled by an IAM policy (a {% endstep %} {% step %} -### Create the queue registry +### Provide application context On operator installation with `operator.sqsSplitting` enabled, a new [`CustomResource`](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/) type is defined in your cluster — `MirrordWorkloadQueueRegistry`. Users with permissions to get CRDs can verify its existence with `kubectl get crd mirrordworkloadqueueregistries.queues.mirrord.metalbear.co`. 
Before you can run sessions with SQS splitting, you must create a queue registry for the desired target. @@ -312,7 +312,7 @@ Enable the `operator.kafkaSplitting` setting in the [mirrord-operator Helm chart {% endstep %} {% step %} -### Create the topics consumer resource +### Provide application context {% endstep %} {% endstepper %} @@ -518,7 +518,7 @@ Filter definition contains two fields: The local application will only see queue messages that have **all** of the specified message attributes/headers. {% hint style="info" %} -Empty `message_filter` is treated as a match-none directive. +An empty `message_filter` is treated as a match-none directive. {% endhint %} See example configurations below: From ffc392891d963b2484922c4ceabed9008ad85fdb Mon Sep 17 00:00:00 2001 From: Razz4780 Date: Mon, 25 Aug 2025 17:31:17 +0200 Subject: [PATCH 09/20] Change title --- using-mirrord/queue-splitting.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/using-mirrord/queue-splitting.md b/using-mirrord/queue-splitting.md index 1ed378e..60427fa 100644 --- a/using-mirrord/queue-splitting.md +++ b/using-mirrord/queue-splitting.md @@ -302,7 +302,7 @@ Enable the `operator.kafkaSplitting` setting in the [mirrord-operator Helm chart {% endstep %} {% step %} -### Create the Kafka client configuration +### Configure the operator's Kafka client {% endstep %} {% step %} From dc15a1099ec0a95a9f08ef330b7c64d23f8ad595 Mon Sep 17 00:00:00 2001 From: Razz4780 Date: Tue, 26 Aug 2025 16:02:40 +0200 Subject: [PATCH 10/20] Finished --- using-mirrord/queue-splitting.md | 326 +++++++++++++++++++++++-------- 1 file changed, 248 insertions(+), 78 deletions(-) diff --git a/using-mirrord/queue-splitting.md b/using-mirrord/queue-splitting.md index 60427fa..f371e67 100644 --- a/using-mirrord/queue-splitting.md +++ b/using-mirrord/queue-splitting.md @@ -208,10 +208,12 @@ However, if the consumer's access to the queue is controlled by an IAM policy (a ### Provide application context -On operator installation with `operator.sqsSplitting` enabled, a new [`CustomResource`](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/) type is defined in your cluster — `MirrordWorkloadQueueRegistry`. Users with permissions to get CRDs can verify its existence with `kubectl get crd mirrordworkloadqueueregistries.queues.mirrord.metalbear.co`. +On operator installation with `operator.sqsSplitting` enabled, a new [`CustomResource`](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/) +type is defined in your cluster — `MirrordWorkloadQueueRegistry`. Users with permissions to get CRDs can verify its existence +with `kubectl get crd mirrordworkloadqueueregistries.queues.mirrord.metalbear.co`. Before you can run sessions with SQS splitting, you must create a queue registry for the desired target. This is because the queue registry contains additional application context required by the mirrord operator. -For example, the operator needs to know which environment variables contain the names of the SQS queues to split. +For example, the operator needs to know which environment variables contain the names of the SQS queues to split. 
See an example queue registry defined for a deployment `meme-app` living in namespace `meme`: @@ -304,64 +306,75 @@ Enable the `operator.kafkaSplitting` setting in the [mirrord-operator Helm chart ### Configure the operator's Kafka client -{% endstep %} -{% step %} +The mirrord operator will need to be able to do some operations on the Kafka cluster on your behalf. +To allow for properly configuring the operator's Kafka client, on operator installation with `operator.kafkaSplitting` enabled, +a new [`CustomResource`](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/) type is defined in your cluster +— `MirrordKafkaClientConfig`. Users with permissions to get CRDs can verify its existence with `kubectl get crd mirrordkafkaclientconfigs.queues.mirrord.metalbear.co`. -### Authorize deployed consumers - -{% endstep %} -{% step %} +The resource allows for specifying a list of properties for the Kafka client, like this: -### Provide application context +```yaml +apiVersion: queues.mirrord.metalbear.co/v1alpha +kind: MirrordKafkaClientConfig +metadata: + name: base-config + namespace: mirrord +spec: + properties: + - name: bootstrap.servers + value: kafka.default.svc.cluster.local:9092 + - name: client.id + value: mirrord-operator +``` -{% endstep %} -{% endstepper %} +When used by the mirrord Operator for Kafka splitting, the example below will be resolved to following `.properties` file: -### Additional Options +```properties +bootstrap.servers=kafka.default.svc.cluster.local:9092 +client.id=mirrord-operator +``` -#### Customizing Temporary Kafka Topic Names +This file will be used when creating a Kafka client for managing temporary topics, consuming messages from the original topic and producing messages to the temporary topics. Full list of available properties can be found [here](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md). {% hint style="info" %} -Available since chart version `1.27` and operator version `3.114.0`. +`group.id` property will always be overwritten by mirrord Operator when resolving the `.properties` file. {% endhint %} -To serve Kafka splitting sessions, mirrord operator creates temporary topics in the Kafka cluster. The default format for their names is as follows: - -* `mirrord-tmp-1234567890-fallback-topic-original-topic` - for the fallback topic (unfiltered messages, consumed by the deployed workload). -* `mirrord-tmp-0987654321-original-topic` - for the user topics (filtered messages, consumed by local applications running with mirrord). - -Note that the random digits will be unique for each temporary topic created by the operator. - -You can adjust the format of the created topic names to suit your needs (RBAC, Security, Policies, etc.), -using the `OPERATOR_KAFKA_SPLITTING_TOPIC_FORMAT` environment variable of the mirrord operator, -or `operator.kafkaSplittingTopicFormat` helm chart value. The default value is: - -`mirrord-tmp-{{RANDOM}}{{FALLBACK}}{{ORIGINAL_TOPIC}}` +{% hint style="warning" %} +`MirrordKafkaClientConfig` resources must always be created in the operator's namespace. +{% endhint %} -The provided format must contain the three variables: `{{RANDOM}}`, `{{FALLBACK}}` and `{{ORIGINAL_TOPIC}}`. +See [additional options](queue-splitting.md#additional-options) section for more Kafka configuration info. -* `{{RANDOM}}` will resolve to random digits. -* `{{FALLBACK}}` will resolve either to `-fallback-` or `-` literal. 
-* `{{ORIGINAL_TOPIC}}` will resolve to the name of the original topic that is being split. +{% endstep %} +{% step %} -#### Configuring Kafka Splitting with Custom Resources +### Authorize deployed consumers -On operator installation, new [`CustomResources`](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/) types were created on your cluster: `MirrordKafkaTopicsConsumer` and `MirrordKafkaClientConfig`. Users with permissions to get CRDs, can verify their existence with `kubectl get crd mirrordkafkatopicsconsumers.queues.mirrord.metalbear.co` and `kubectl get crd mirrordkafkaclientconfigs.queues.mirrord.metalbear.co`. +In order to be targeted with Kafka splitting, a deployed consumer must be able to use the temporary topics created by mirrord. +E.g. if the consumer application describes the topic or reads messages from it — it must be able to do the same on a temporary topic. +This might require extra actions on your side to adjust the authorization, for example based on topic name prefix. See [topic names](queue-splitting.md#customizing-temporary-kafka-topic-names) section for more info. -After a Kafka-enabled operator is installed, and before you can start splitting queues, resources of these types must be created. +{% endstep %} +{% step %} -1. `MirrordKafkaTopicsConsumer` is a resource that must be created in the same namespace as the target workload. It describes Kafka topics that this workload consumes and contains instructions for the mirrord Operator on how to execture splitting. Each `MirrordKafkaTopicsConsumer` is linked to a single workload that can be targeted with a Kafka splitting session. -2. `MirrordKafkaClientConfig` is a resource that must be created in the namespace where mirrord operator is installed. It contains properties that the operator will use when creating a Kafka client used for all Kafka operations during the split. This resource is referenced by `MirrordKafkaTopicsConsumer`. +### Provide application context -**`MirrordKafkaTopicsConsumer`** +On operator installation with `operator.kafkaSplitting` enabled, +a new [`CustomResource`](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/) type is defined in your cluster +— `MirrordKafkaTopicsConsumer`. Users with permissions to get CRDs can verify its existence with `kubectl get crd mirrordkafkatopicsconsumers.queues.mirrord.metalbear.co`. +Before you can run sessions with Kafka splitting, you must create a topics consumer resource for the desired target. +This is because the topics consumer resource contains additional application context required by the mirrord operator. +For example, the operator needs to know which environment variables contain the names of the Kafka topics to split. -Below we have an example for `MirrordKafkaTopicsConsumer` resource, for a meme app that consumes messages from a Kafka topic: +See an example topics consumer resource, for a meme app that consumes messages from a Kafka topic: ```yaml apiVersion: queues.mirrord.metalbear.co/v1alpha kind: MirrordKafkaTopicsConsumer metadata: name: meme-app-topics-consumer + namespace: meme spec: consumerApiVersion: apps/v1 consumerKind: Deployment @@ -379,50 +392,83 @@ spec: variable: KAFKA_TOPIC_NAME ``` -* `spec.topics` is a list of topics that can be split when running mirrord with this target. - * The topic ID is chosen by you, and will be used by every teammate who wishes to filter messages from this topic. 
You can choose any string for that, it does not have to be the same as the name of the queue. In the example above the topic has id `views-topic`. - * `clientConfig` is the name of the `MirrordKafkaClientConfig` resource living in the mirrord Operator's namespace that will be used when interacting with the Kafka cluster. - * `groupIdSources` holds a list of all occurences of Kafka consumer group id in the workload's pod spec. mirrord Operator will use this group id when consuming messages from the topic. - - Currently the only supported source type is an environment variable with value defined directly in the pod spec. - * `nameSources` holds a list of all occurences of topic name in the workload's pod spec. mirrord Operator will use this name when consuming messages. It is crucial that both the local and deployed app take topic name from these sources, as mirrord Operator will use them to inject the names of temporary topics. - - Currently the only supported source type is an environment variable with value defined directly in the pod spec. +The topics consumer resource above says that: +1. It provides context for deployment `meme-app` in namespace `meme`. +2. The deployment consumes one topic. Its name is read from environment variable `KAFKA_TOPIC_NAME` in container `consumer`. +The Kafka consumer group id is read from environment variable `KAFKA_GROUP_ID` in container `consumer`. +3. The Kafka topic can be referenced in a mirrord config under ID `views-topic`. -**`MirrordKafkaClientConfig`** +#### Link the topics consumer resource to the deployed consumer -Below we have an example for `MirrordKafkaClientConfig` resource: +The topics consumer resource is namespaced, so it can only reference a Kafka consumer deployed in the same namespace. +The reference is specified with `spec.consumer*` fields, which cover api version, kind, and name of the Kubernetes workload. +For instance to configure Kafka splitting of a consumer deployed in a stateful set `kafka-notifications-worker`, you would set: ```yaml -apiVersion: queues.mirrord.metalbear.co/v1alpha -kind: MirrordKafkaClientConfig -metadata: - name: base-config - namespace: mirrord -spec: - properties: - - name: bootstrap.servers - value: kafka.default.svc.cluster.local:9092 +consumerApiVersion: apps/v1 +consumerKind: StatefulSet +consumerName: kafka-notifications-worker ``` -When used by the mirrord Operator for Kafka splitting, the example below will be resolved to following `.properties` file: +The operator supports Kafka splitting on deployments, stateful sets, and Argo rollouts. -```properties -bootstrap.servers=kafka.default.svc.cluster.local:9092 -``` +#### Desribe consumed topics in the topics consumer resource -This file will be used when creating a Kafka client for managing temporary topics, consuming messages from the original topic and producing messages to the temporary topics. Full list of available properties can be found [here](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md). +The topics consumer resource describes Kafka topics consumed by the referenced consumer. +The topics are described in entries of the `spec.topics` list: +* `id` can be arbitrary, as it will only be referenced from the user's mirrord config +(compare with the [last section](queue-splitting.md#setting-a-filter-for-a-mirrord-run)). +* `clientConfig` stores the name of the `MirrordKafkaClientConfig` to use when making connections to the Kafka cluster. 
+* `nameSources` stores a list of all occurences of the topic name in the consumer workload's pod template. +* `groupIdSources` stores a list of all occurences of the consumer Kafka group ID in the consumer workload's pod template. +The operator will use the same group ID when consuming messages from the topic. + +{% hint style="warning" %} +The mirrord operator can only read consumer's environment variables if they are either: +1. defined directly in the workload's pod template, with the value defined in `value` or in `valueFrom` via config map reference; or +2. loaded from config maps using `envFrom`. +{% endhint %} -> _**NOTE:**_ `group.id` property will always be overwritten by mirrord Operator when resolving the `.properties` file. +{% endstep %} +{% endstepper %} + +### Additional Options + +#### Customizing Temporary Kafka Topic Names + +{% hint style="info" %} +Available since chart version `1.27` and operator version `3.114.0`. +{% endhint %} + +To serve Kafka splitting sessions, mirrord operator creates temporary topics in the Kafka cluster. The default format for their names is as follows: + +* `mirrord-tmp-1234567890-fallback-topic-original-topic` - for the fallback topic (unfiltered messages, consumed by the deployed workload). +* `mirrord-tmp-0987654321-original-topic` - for the user topics (filtered messages, consumed by local applications running with mirrord). + +Note that the random digits will be unique for each temporary topic created by the operator. + +You can adjust the format of the created topic names to suit your needs (RBAC, Security, Policies, etc.), +using the `OPERATOR_KAFKA_SPLITTING_TOPIC_FORMAT` environment variable of the mirrord operator, +or `operator.kafkaSplittingTopicFormat` helm chart value. The default value is: + +`mirrord-tmp-{{RANDOM}}{{FALLBACK}}{{ORIGINAL_TOPIC}}` + +The provided format must contain the three variables: `{{RANDOM}}`, `{{FALLBACK}}` and `{{ORIGINAL_TOPIC}}`. + +* `{{RANDOM}}` will resolve to random digits. +* `{{FALLBACK}}` will resolve either to `-fallback-` or `-` literal. +* `{{ORIGINAL_TOPIC}}` will resolve to the name of the original topic that is being split. + +#### Reusing Kafka Client Configs -`MirrordKafkaClientConfig` resource supports property inheritance via `spec.parent` field. When resolving a resource `X` that has parent `Y`: +`MirrordKafkaClientConfig` resource supports property inheritance via `spec.parent` field. When resolving a resource `config-A` that has a parent `config-B`: -1. `Y` is resolved into a `.properties` file. -2. For each property defined in `X`: +1. `config-B` is resolved into a `.properties` file. +2. 
For each property defined in `config-A`: * If `value` is provided, it overrides any previous value of that property * If `value` is not provided (`null`), that property is removed -Below we have an example of two `MirrordKafkaClientConfig`s with inheritance relation: +Below we have an example of two `MirrordKafkaClientConfig`s with an inheritance relation: ```yaml apiVersion: queues.mirrord.metalbear.co/v1alpha @@ -453,23 +499,27 @@ spec: value: null ``` -When used by the mirrord Operator for Kafka splitting, the `with-client-id` below will be resolved to following `.properties` file: +When used by the mirrord operator for Kafka splitting, the `with-client-id` below will be resolved to following `.properties` file: ```properties bootstrap.servers=kafka.default.svc.cluster.local:9092 client.id=mirrord-operator ``` -`MirrordKafkaClientConfig` also supports setting properties from a Kubernetes [`Secret`](https://kubernetes.io/docs/concepts/configuration/secret/) with the `spec.loadFromSecret` field. The value for `loadFromSecret` is given in the form: `/`. +#### Configuring Kafka Clients with Secrets -Each key-value entry defined in secret's data will be included in the resulting `.properties` file. Property inheritance from the parent still occurs, and within each `MirrordKafkaClientConfig` properties loaded from the secret are overwritten by those in `properties`. +`MirrordKafkaClientConfig` also supports loading properties from a Kubernetes [`Secret`](https://kubernetes.io/docs/concepts/configuration/secret/), with the `spec.loadFromSecret` field. +The value for `spec.loadFromSecret` is given in the form: `/`. + +Each key-value entry defined in the secret's data will be included in the resulting `.properties` file. +Property inheritance from the parent still occurs, and within each `MirrordKafkaClientConfig` properties loaded from the secret are overwritten by those in `properties`. This means the priority of setting properties (from highest to lowest) is like so: -* `childProperty` -* `childSecret` -* `parentProperty` -* `parentSecret` +* child `spec.properties` +* child `spec.loadFromSecret` +* parent `spec.properties` +* parent `spec.loadFromSecret` Below is an example for a `MirrordKafkaClientConfig` resource that references a secret: @@ -484,7 +534,17 @@ spec: properties: [] ``` -For additional authentication configuration, here is an example of a `MirrordKafkaClientConfig` resource that supports IAM/OAUTHBEARER authentication with Amazon Managed Streaming for Apache Kafka: +{% hint style="info" %} +Note that by default, mirrord operator has read access only to the secrets in the operator's namespace. +{% endhint %} + +#### Configuring Custom Kafka Authentication + +For authentication methods that cannot be handled just by setting [client properties](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md), +we provide a separate field `spec.authenticationExtra`. The field allows for specifying custom authentication methods: + +{% tabs %} +{% tab title="MSK IAM/OAUTHBEARER" %} ```yaml apiVersion: queues.mirrord.metalbear.co/v1alpha @@ -499,12 +559,35 @@ spec: properties: [] ``` -Currently, `MSK_IAM` is the only supported value for `spec.authenticationExtra.kind`. -When this kind is specified, additional properties are automatically merged into the configuration: +The example above configures IAM/OAUTHBEARER authentication with Amazon Managed Streaming for Apache Kafka. 
+When the `MSK_IAM` kind is used, two additional properties are automatically merged into the configuration: 1. `sasl.mechanism=OAUTHBEARER` 2. `security.protocol=SASL_SSL` -> _**NOTE:**_ By default, the operator will only have access to secrets in its own namespace (`mirrord` by default). +To produce the authentication tokens, the operator will use the default credentials provider chain. +The easiest way to provide the crendentials for the operator is with IAM role assumption. +For that, an IAM role with an appropriate policy has to be assigned to the operator's service account. +Please follow [AWS's documentation on how to do that](https://docs.aws.amazon.com/eks/latest/userguide/associate-service-account-role.html). +Note that operator's service account can be annotated with the IAM role's ARN with the `sa.roleArn` setting in the [mirrord-operator Helm chart](https://github.com/metalbear-co/charts/blob/main/mirrord-operator/values.yaml). + +{% endtab %} +{% endtabs %} + +#### Configuring Workload Restart + +To inject the names of the temporary topics into the consumer workload, +the operator always requires the workload to be restarted. +Depending on cluster conditions, and the workload itself, this might take some time. + +`MirrordKafkaTopicsConsumer` allows for specifying two more options for this: +1. `spec.consumerRestartTimeout` — specifies how long the operator should wait, +before a new pod becomes ready, and after the workload restart is triggered. +This allows for silencing timeout errors when the workload pods take a long time to start. +Specified in seconds, defaults to 60s. +2. `spec.splitTtl` — specifies how long the consumer workload should remain patched, +after the last Kafka splitting session against it have finished. +This allows for skipping the subsequent restart in case the next Kafka splitting session +is started before the TTL elapses. Specified in seconds. ## Setting a Filter for a mirrord Run @@ -591,3 +674,90 @@ All received messages will have an attribute `author` with the value `me`. {% endtab %} {% endtabs %} + +## FAQ + +#### How do I authenticate operator's Kafka client with an SSL certificate? + +An example `MirrordKafkaClientConfig` would look as follows: + +```yaml +apiVersion: queues.mirrord.metalbear.co/v1alpha +kind: MirrordKafkaClientConfig +metadata: + name: ssl-auth + namespace: mirrord +spec: + properties: + # Contents of the PEM file with client certificate. + - name: ssl.certificate.pem + value: "..." + # Contents of the PEM file with client private key. + - name: ssl.key.pem + value: "..." + # Contents of the PEM file with CA. + - name: ssl.ca.pem + value: "..." + # Password for the client private key (if password protected). + - name: ssl.key.password + value: "..." +``` + +Alternatively, you can store the credentials in a secret, and have them loaded to the config automatically: + +```yaml +apiVersion: v1 +kind: Secret +metadata: + name: mirrord-kafka-ssl + namespace: mirrord +type: Opaque +data: + ssl.certificate.pem: "..." + ssl.key.pem: "..." + ssl.ca.pem: "..." + ssl.key.password: "..." + +--- +apiVersion: queues.mirrord.metalbear.co/v1alpha +kind: MirrordKafkaClientConfig +metadata: + name: ssl-auth + namespace: mirrord +spec: + loadFromSecret: mirrord/mirrord-kafka-ssl + properties: [] +``` + +#### How do I authenticate operator's Kafka client with a Java KeyStore? + +The mirrord operator does not support direct use of JKS files. 
In order to use JKS files with Kafka splitting, first extract all the necessary certificates and keys to PEM files.
You can do it like this:

```sh
# Convert keystore.jks to PKCS12 format.
keytool -importkeystore \
  -srckeystore keystore.jks \
  -srcstoretype JKS \
  -destkeystore keystore.p12 \
  -deststoretype PKCS12

# Extract client certificate PEM from the converted keystore.
openssl pkcs12 -in keystore.p12 -clcerts -nokeys -out client-cert.pem

# Extract client private key PEM from the converted keystore.
openssl pkcs12 -in keystore.p12 -nocerts -nodes -out client-key.pem

# Convert truststore.jks to PKCS12 format.
keytool -importkeystore \
  -srckeystore truststore.jks \
  -srcstoretype JKS \
  -destkeystore truststore.p12 \
  -deststoretype PKCS12

# Extract CA PEM from the converted truststore.
openssl pkcs12 -in truststore.p12 -nokeys -out ca-cert.pem
```

Then, follow the guide for [authenticating with an SSL certificate](queue-splitting.md#how-do-i-authenticate-operators-kafka-client-with-an-ssl-certificate).

From acae6ee7284121d1eb00fe6c03181d167c6422a8 Mon Sep 17 00:00:00 2001
From: Razz4780
Date: Mon, 1 Sep 2025 13:54:22 +0200
Subject: [PATCH 11/20] Some fixes

---
 using-mirrord/queue-splitting.md | 19 +++++++++---------
 1 file changed, 9 insertions(+), 10 deletions(-)

diff --git a/using-mirrord/queue-splitting.md b/using-mirrord/queue-splitting.md
index f371e67..947a2b7 100644
--- a/using-mirrord/queue-splitting.md
+++ b/using-mirrord/queue-splitting.md
@@ -16,30 +16,29 @@ description: Sharing queues by splitting messages between multiple clients and t
 
 # Queue Splitting
 
-If your application consumes messages from a message broker (e.g. Kafka cluster), you should choose a configuration that matches your intention:
+If your application consumes messages from a queue service, you should choose a configuration that matches your intention:
 
-1. If you're ok with your local application competing for queue messages with the remote target, and with your teammates' mirrord sessions — run the application with mirrord without any special configuration.
-2. If you want your local application to be an exclusive consumer of queue messages — run the application with [`copy_target` + `scale_down`](copy-target.md#replacing-a-whole-deployment-using-scale_down) features.
-3. If you want to precisely control which messages will be consumed by your local application — run the application with the queue splitting feature. The allows you define a message filter in your mirrord configuration. All messages matching that filter will be redirected by the mirrord operator to your local application. Other messages will **not** reach your local application.
+1. Running your application with mirrord without any special configuration will result in your local application competing with the deployed application (and potentially other mirrord runs by teammates) for queue messages.
+2. Running your application with [`copy_target` + `scale_down`](copy-target.md#replacing-a-whole-deployment-using-scale_down) will result in the deployed application not consuming any messages, and your local application being the exclusive consumer of queue messages.
+3. If you want to control which messages will be consumed by the deployed application, and which ones will reach your local application, set up queue splitting for the relevant target, and define a message filter in the mirrord configuration. 
Messages that match the filter will reach your local application, and messages that do not, will reach either the deployed application, or another teammate's local application, if they match their filter. {% hint style="info" %} -This feature is only relevant for users on the Team and Enterprise pricing plans. +This feature is only available for users on the Team and Enterprise pricing plans. {% endhint %} {% hint style="info" %} -So far queue splitting is available for [Amazon SQS](https://aws.amazon.com/sqs/) and [Kafka](https://kafka.apache.org/). Pretty soon we'll support RabbitMQ as well. +Queue splitting is currently available for [Amazon SQS](https://aws.amazon.com/sqs/) and [Kafka](https://kafka.apache.org/). Pretty soon we'll support RabbitMQ as well. {% endhint %} ## How It Works -When a Kafka splitting session starts, the mirrord operator patches the target workload (e.g. deployment or rollout) to consume messages from a different, temporary queue or topic. +When a queue splitting session starts, the mirrord operator patches the target workload (e.g. deployment or rollout) to consume messages from a different, temporary queue or topic. That temporary queue/topic is *exclusive* to the target workload, and its name is randomized. -Similarly, the local application is redirected to consume messages from its own *exclusive* temporary queue or topic. +Similarly, the local application is reconfigured to consume messages from its own *exclusive* temporary queue or topic. {% hint style="warning" %} In both cases, the redirections are done by manipulating environment variables. -For this reason, queue splitting always requires that the application reads queue or topic name from environment variables. -This is a prerequisite. +For this reason, queue splitting always requires that the application reads the queue or topic name from environment variables. {% endhint %} Once all temporary topics or queues are prepared, the mirrord operator starts consuming messages from the original queue or topic, and publishing them to the correct temporary one. From c0a756782f21e04a6c6839bd871b0ff112572956 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Smolarek?= <34063647+Razz4780@users.noreply.github.com> Date: Mon, 1 Sep 2025 14:05:36 +0200 Subject: [PATCH 12/20] Update using-mirrord/queue-splitting.md Co-authored-by: Eyal Bukchin --- using-mirrord/queue-splitting.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/using-mirrord/queue-splitting.md b/using-mirrord/queue-splitting.md index 947a2b7..4897da9 100644 --- a/using-mirrord/queue-splitting.md +++ b/using-mirrord/queue-splitting.md @@ -41,7 +41,7 @@ In both cases, the redirections are done by manipulating environment variables. For this reason, queue splitting always requires that the application reads the queue or topic name from environment variables. {% endhint %} -Once all temporary topics or queues are prepared, the mirrord operator starts consuming messages from the original queue or topic, and publishing them to the correct temporary one. +Once all temporary topics or queues are prepared, the mirrord operator starts consuming messages from the original queue or topic, and publishing them to one of the temporary queues, based on message filters provided by the users in their mirrord configs. This routing is based on message filters provided by the users in their mirrord configs. 
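Because the whole mechanism rests on overriding environment variables, it helps to see what a compatible consumer spec looks like. The sketch below is illustrative only (the image, values, and labels are made up); the variable names `KAFKA_TOPIC_NAME` and `KAFKA_GROUP_ID` match the example used later in this guide, and the queue/topic name is supplied through a plain `value` so that the mirrord operator has something to rewrite:

```yaml
# Illustrative consumer deployment: the topic name and consumer group id are
# exposed as environment variables, which is what queue splitting requires.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: meme-app
  namespace: meme
spec:
  replicas: 1
  selector:
    matchLabels:
      app: meme-app
  template:
    metadata:
      labels:
        app: meme-app
    spec:
      containers:
        - name: consumer
          image: ghcr.io/example/meme-app:latest  # placeholder image
          env:
            # During a splitting session, the operator swaps this value for the
            # name of a temporary queue/topic.
            - name: KAFKA_TOPIC_NAME
              value: views
            - name: KAFKA_GROUP_ID
              value: meme-app-consumers
```

If the consumer instead resolved the name from a configuration file baked into the image, the operator would have nothing to override, and the workload could not be targeted by queue splitting.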
{% tabs %} From 6d7b275d00f16dcd081714471e5f4e898f564f9c Mon Sep 17 00:00:00 2001 From: Razz4780 Date: Mon, 1 Sep 2025 14:09:28 +0200 Subject: [PATCH 13/20] ... --- using-mirrord/queue-splitting.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/using-mirrord/queue-splitting.md b/using-mirrord/queue-splitting.md index 4897da9..a50be8b 100644 --- a/using-mirrord/queue-splitting.md +++ b/using-mirrord/queue-splitting.md @@ -33,7 +33,7 @@ Queue splitting is currently available for [Amazon SQS](https://aws.amazon.com/s ## How It Works When a queue splitting session starts, the mirrord operator patches the target workload (e.g. deployment or rollout) to consume messages from a different, temporary queue or topic. -That temporary queue/topic is *exclusive* to the target workload, and its name is randomized. +That temporary queue/topic is *exclusive* to the target workload. Similarly, the local application is reconfigured to consume messages from its own *exclusive* temporary queue or topic. {% hint style="warning" %} From 90ae4b58e1e7a79b00d2227eb0c0ceed8f87112b Mon Sep 17 00:00:00 2001 From: Razz4780 Date: Mon, 1 Sep 2025 14:10:28 +0200 Subject: [PATCH 14/20] Rephrase --- using-mirrord/queue-splitting.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/using-mirrord/queue-splitting.md b/using-mirrord/queue-splitting.md index a50be8b..ae843c7 100644 --- a/using-mirrord/queue-splitting.md +++ b/using-mirrord/queue-splitting.md @@ -52,17 +52,17 @@ First, we have a consumer app reading messages from an SQS queue: ![A K8s application that consumes messages from an SQS queue](queue-splitting/before-splitting-sqs.svg) -Then, the first mirrord SQS splitting session starts. Two temporary queues are created (one for the target deployed in the cluster, one for the user's local application), +When the first mirrord SQS splitting session starts, two temporary queues are created (one for the target deployed in the cluster, one for the user's local application), and the mirrord operator routes messages according to the user's filter (read more in the [last section](queue-splitting.md#setting-a-filter-for-a-mirrord-run)): ![One SQS splitting session](queue-splitting/1-user-sqs.svg) -Then, another mirrord SQS splitting session starts. The third temporary queue is created (for the second user's local application). +If a second user then starts a mirrord SQS splitting session on the same queue, a the third temporary queue is created (for the second user's local application). The mirrord operator includes the new queue and the second user's filter in the routing logic. ![Two SQS splitting sessions](queue-splitting/2-users-sqs.svg) -If the filters defined by the two users both match some message, it is not defined which one of the users will receive that message. +If the filters defined by the two users both match some message, one of the users will receive the message at random. 
{% endtab %} From bd44905b2583aec7131e7fcf1229076bc219268a Mon Sep 17 00:00:00 2001 From: Razz4780 Date: Mon, 1 Sep 2025 14:12:23 +0200 Subject: [PATCH 15/20] Rephrase --- using-mirrord/queue-splitting.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/using-mirrord/queue-splitting.md b/using-mirrord/queue-splitting.md index ae843c7..1c88b05 100644 --- a/using-mirrord/queue-splitting.md +++ b/using-mirrord/queue-splitting.md @@ -109,7 +109,7 @@ Enable the `operator.sqsSplitting` setting in the [mirrord-operator Helm chart]( ### Authenticate and authorize the mirrord operator -The mirrord operator will need to be able to do some operations on the SQS queues on your behalf. +The mirrord operator will need to be able to perform operations on the SQS queues. To do this, it will build an SQS client, using the default credentials provider chain. The easiest way to provide the crendentials for the operator is with IAM role assumption. From 986c6bf87ab256a799c512e16850ad43462bccbe Mon Sep 17 00:00:00 2001 From: Razz4780 Date: Mon, 1 Sep 2025 14:16:29 +0200 Subject: [PATCH 16/20] misleading link name --- using-mirrord/queue-splitting.md | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/using-mirrord/queue-splitting.md b/using-mirrord/queue-splitting.md index 1c88b05..f004ac6 100644 --- a/using-mirrord/queue-splitting.md +++ b/using-mirrord/queue-splitting.md @@ -53,7 +53,7 @@ First, we have a consumer app reading messages from an SQS queue: ![A K8s application that consumes messages from an SQS queue](queue-splitting/before-splitting-sqs.svg) When the first mirrord SQS splitting session starts, two temporary queues are created (one for the target deployed in the cluster, one for the user's local application), -and the mirrord operator routes messages according to the user's filter (read more in the [last section](queue-splitting.md#setting-a-filter-for-a-mirrord-run)): +and the mirrord operator routes messages according to the [user's filter](queue-splitting.md#setting-a-filter-for-a-mirrord-run): ![One SQS splitting session](queue-splitting/1-user-sqs.svg) @@ -73,7 +73,7 @@ First, we have a consumer app reading messages from a Kafka topic: ![A K8s application that consumes messages from a Kafka topic](queue-splitting/before-splitting-kafka.svg) Then, the first mirrord Kafka splitting session starts. Two temporary topics are created (one for the target deployed in the cluster, one for the user's local application), -and the mirrord operator routes messages according to the user's filter (read more in the [last section](queue-splitting.md#setting-a-filter-for-a-mirrord-run)): +and the mirrord operator routes messages according to the [user's filter](queue-splitting.md#setting-a-filter-for-a-mirrord-run)): ![One Kafka splitting session](queue-splitting/1-user-kafka.svg) @@ -261,8 +261,7 @@ The reference is specified with `spec.consumer`: The queue registry describes SQS queues consumed by the referenced consumer. The queues are described in entries of the `spec.queues` object. -The entry's key can be arbitrary, as it will only be referenced from the user's mirrord config -(compare with the [last section](queue-splitting.md#setting-a-filter-for-a-mirrord-run)). +The entry's key can be arbitrary, as it will only be [referenced](queue-splitting.md#setting-a-filter-for-a-mirrord-run) from the user's mirrord config. 
The entry's value is an object describing single or multiple SQS queues consumed by the workload: @@ -415,8 +414,7 @@ The operator supports Kafka splitting on deployments, stateful sets, and Argo ro The topics consumer resource describes Kafka topics consumed by the referenced consumer. The topics are described in entries of the `spec.topics` list: -* `id` can be arbitrary, as it will only be referenced from the user's mirrord config -(compare with the [last section](queue-splitting.md#setting-a-filter-for-a-mirrord-run)). +* `id` can be arbitrary, as it will only be [referenced](queue-splitting.md#setting-a-filter-for-a-mirrord-run) from the user's mirrord config. * `clientConfig` stores the name of the `MirrordKafkaClientConfig` to use when making connections to the Kafka cluster. * `nameSources` stores a list of all occurences of the topic name in the consumer workload's pod template. * `groupIdSources` stores a list of all occurences of the consumer Kafka group ID in the consumer workload's pod template. From 5e0d5c8e97875995c4612b39fe54ab9d83e16761 Mon Sep 17 00:00:00 2001 From: Razz4780 Date: Mon, 1 Sep 2025 14:20:41 +0200 Subject: [PATCH 17/20] All in tabs --- using-mirrord/queue-splitting.md | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/using-mirrord/queue-splitting.md b/using-mirrord/queue-splitting.md index f004ac6..c3e4d96 100644 --- a/using-mirrord/queue-splitting.md +++ b/using-mirrord/queue-splitting.md @@ -95,7 +95,11 @@ Plese note that: 2. In case of SQS splitting, deployed targets will remain redirected as long as their temporary queues have unconsumed messages. -## Enabling SQS Splitting in Your Cluster +## Enabling Queue Splitting in Your Cluster + +{% tabs %} + +{% tab title="Amazon SQS" %} {% stepper %} {% step %} @@ -290,7 +294,8 @@ The mirrord operator can only read consumer's environment variables if they are {% endstep %} {% endstepper %} -## Enabling Kafka Splitting in Your Cluster +{% endtab %} +{% tab title="Kafka" %} {% stepper %} {% step %} @@ -586,6 +591,9 @@ after the last Kafka splitting session against it have finished. This allows for skipping the subsequent restart in case the next Kafka splitting session is started before the TTL elapses. Specified in seconds. +{% endtab %} +{% endtabs %} + ## Setting a Filter for a mirrord Run Once cluster setup is done, mirrord users can start running sessions with queue message filters in their mirrord configuration files. From e2408ca555fd48e3084d7796462274e17290cd41 Mon Sep 17 00:00:00 2001 From: Razz4780 Date: Mon, 1 Sep 2025 14:31:00 +0200 Subject: [PATCH 18/20] Rephrase --- using-mirrord/queue-splitting.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/using-mirrord/queue-splitting.md b/using-mirrord/queue-splitting.md index c3e4d96..529235c 100644 --- a/using-mirrord/queue-splitting.md +++ b/using-mirrord/queue-splitting.md @@ -72,17 +72,17 @@ First, we have a consumer app reading messages from a Kafka topic: ![A K8s application that consumes messages from a Kafka topic](queue-splitting/before-splitting-kafka.svg) -Then, the first mirrord Kafka splitting session starts. 
Two temporary topics are created (one for the target deployed in the cluster, one for the user's local application), -and the mirrord operator routes messages according to the [user's filter](queue-splitting.md#setting-a-filter-for-a-mirrord-run)): +When the first mirrord Kafka splitting session starts, two temporary topics are created (one for the target deployed in the cluster, one for the user's local application), +and the mirrord operator routes messages according to the [user's filter](queue-splitting.md#setting-a-filter-for-a-mirrord-run): ![One Kafka splitting session](queue-splitting/1-user-kafka.svg) -Then, another mirrord Kafka splitting session starts. The third temporary topic is created (for the second user's local application). +If a second user then starts a mirrord Kafka splitting session on the same topic, a third temporary topic is created (for the second user's local application). The mirrord operator includes the new topic and the second user's filter in the routing logic. ![Two Kafka splitting sessions](queue-splitting/2-users-kafka.svg) -If the filters defined by the two users both match some message, it is not defined which one of the users will receive that message. +If the filters defined by the two users both match some message, one of the users will receive the message at random. {% endtab %} @@ -309,7 +309,7 @@ Enable the `operator.kafkaSplitting` setting in the [mirrord-operator Helm chart ### Configure the operator's Kafka client -The mirrord operator will need to be able to do some operations on the Kafka cluster on your behalf. +The mirrord operator will need to be able to perform some operations on the Kafka cluster. To allow for properly configuring the operator's Kafka client, on operator installation with `operator.kafkaSplitting` enabled, a new [`CustomResource`](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/) type is defined in your cluster — `MirrordKafkaClientConfig`. Users with permissions to get CRDs can verify its existence with `kubectl get crd mirrordkafkaclientconfigs.queues.mirrord.metalbear.co`. From 6926b09062dbc05cea8cc22103cd72af53c484c8 Mon Sep 17 00:00:00 2001 From: Razz4780 Date: Wed, 15 Oct 2025 12:21:38 +0200 Subject: [PATCH 19/20] Rename --- using-mirrord/queue-splitting.md | 73 ++++++++++++++++---------------- 1 file changed, 37 insertions(+), 36 deletions(-) diff --git a/using-mirrord/queue-splitting.md b/using-mirrord/queue-splitting.md index 529235c..44c7fba 100644 --- a/using-mirrord/queue-splitting.md +++ b/using-mirrord/queue-splitting.md @@ -28,20 +28,21 @@ This feature is only available for users on the Team and Enterprise pricing plan {% hint style="info" %} Queue splitting is currently available for [Amazon SQS](https://aws.amazon.com/sqs/) and [Kafka](https://kafka.apache.org/). Pretty soon we'll support RabbitMQ as well. +The word "queue" in this doc is used to also refer to "topic" in the context of Kafka. {% endhint %} ## How It Works -When a queue splitting session starts, the mirrord operator patches the target workload (e.g. deployment or rollout) to consume messages from a different, temporary queue or topic. -That temporary queue/topic is *exclusive* to the target workload. -Similarly, the local application is reconfigured to consume messages from its own *exclusive* temporary queue or topic. +When a queue splitting session starts, the mirrord operator patches the target workload (e.g. deployment or rollout) to consume messages from a different, temporary queue. 
+That temporary queue is *exclusive* to the target workload. +Similarly, the local application is reconfigured to consume messages from its own *exclusive* temporary queue. {% hint style="warning" %} -In both cases, the redirections are done by manipulating environment variables. -For this reason, queue splitting always requires that the application reads the queue or topic name from environment variables. +Queue splitting requires that the application read the queue name from an environment variable. +This lets the operator override the environment variable to change the queue that the application reads from. {% endhint %} -Once all temporary topics or queues are prepared, the mirrord operator starts consuming messages from the original queue or topic, and publishing them to one of the temporary queues, based on message filters provided by the users in their mirrord configs. +Once all temporary queues are prepared, the mirrord operator starts consuming messages from the original queue, and publishing them to one of the temporary queues, based on message filters provided by the users in their mirrord configs. This routing is based on message filters provided by the users in their mirrord configs. {% tabs %} @@ -68,17 +69,17 @@ If the filters defined by the two users both match some message, one of the user {% tab title="Kafka" %} -First, we have a consumer app reading messages from a Kafka topic: +First, we have a consumer app reading messages from a Kafka queue: -![A K8s application that consumes messages from a Kafka topic](queue-splitting/before-splitting-kafka.svg) +![A K8s application that consumes messages from a Kafka queue](queue-splitting/before-splitting-kafka.svg) -When the first mirrord Kafka splitting session starts, two temporary topics are created (one for the target deployed in the cluster, one for the user's local application), +When the first mirrord Kafka splitting session starts, two temporary quques are created (one for the target deployed in the cluster, one for the user's local application), and the mirrord operator routes messages according to the [user's filter](queue-splitting.md#setting-a-filter-for-a-mirrord-run): ![One Kafka splitting session](queue-splitting/1-user-kafka.svg) -If a second user then starts a mirrord Kafka splitting session on the same topic, a third temporary topic is created (for the second user's local application). -The mirrord operator includes the new topic and the second user's filter in the routing logic. +If a second user then starts a mirrord Kafka splitting session on the same queue, a third temporary queue is created (for the second user's local application). +The mirrord operator includes the new queue and the second user's filter in the routing logic. ![Two Kafka splitting sessions](queue-splitting/2-users-kafka.svg) @@ -88,11 +89,11 @@ If the filters defined by the two users both match some message, one of the user {% endtabs %} -Temporary queues and topics are managed by the mirrord operator and garbage collected in the background. After all queue splitting sessions end, the operator promptly deletes the allocated resources. +Temporary queues are managed by the mirrord operator and garbage collected in the background. After all queue splitting sessions end, the operator promptly deletes the allocated resources. Plese note that: -1. Temporary queues and topics created for the deployed targets will not be deleted as long as there are any targets' pods that use them. -2. 
In case of SQS splitting, deployed targets will remain redirected as long as their temporary queues have unconsumed messages. +1. Temporary queues created for the deployed targets will not be deleted as long as there are any targets' pods that use them. +2. In case of SQS splitting, deployed targets will keep reading from the temporary queues as long as their temporary queues have unconsumed messages. ## Enabling Queue Splitting in Your Cluster @@ -337,7 +338,7 @@ bootstrap.servers=kafka.default.svc.cluster.local:9092 client.id=mirrord-operator ``` -This file will be used when creating a Kafka client for managing temporary topics, consuming messages from the original topic and producing messages to the temporary topics. Full list of available properties can be found [here](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md). +This file will be used when creating a Kafka client for managing temporary queues, consuming messages from the original queue and producing messages to the temporary queues. Full list of available properties can be found [here](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md). {% hint style="info" %} `group.id` property will always be overwritten by mirrord Operator when resolving the `.properties` file. @@ -354,9 +355,9 @@ See [additional options](queue-splitting.md#additional-options) section for more ### Authorize deployed consumers -In order to be targeted with Kafka splitting, a deployed consumer must be able to use the temporary topics created by mirrord. -E.g. if the consumer application describes the topic or reads messages from it — it must be able to do the same on a temporary topic. -This might require extra actions on your side to adjust the authorization, for example based on topic name prefix. See [topic names](queue-splitting.md#customizing-temporary-kafka-topic-names) section for more info. +In order to be targeted with Kafka splitting, a deployed consumer must be able to use the temporary queues created by mirrord. +E.g. if the consumer application describes the queue or reads messages from it — it must be able to do the same on a temporary queue. +This might require extra actions on your side to adjust the authorization, for example based on queue name prefix. See [queue names](queue-splitting.md#customizing-temporary-kafka-queue-names) section for more info. {% endstep %} {% step %} @@ -368,9 +369,9 @@ a new [`CustomResource`](https://kubernetes.io/docs/concepts/extend-kubernetes/a — `MirrordKafkaTopicsConsumer`. Users with permissions to get CRDs can verify its existence with `kubectl get crd mirrordkafkatopicsconsumers.queues.mirrord.metalbear.co`. Before you can run sessions with Kafka splitting, you must create a topics consumer resource for the desired target. This is because the topics consumer resource contains additional application context required by the mirrord operator. -For example, the operator needs to know which environment variables contain the names of the Kafka topics to split. +For example, the operator needs to know which environment variables contain the names of the Kafka queues to split. -See an example topics consumer resource, for a meme app that consumes messages from a Kafka topic: +See an example topics consumer resource, for a meme app that consumes messages from a Kafka queue: ```yaml apiVersion: queues.mirrord.metalbear.co/v1alpha @@ -397,9 +398,9 @@ spec: The topics consumer resource above says that: 1. It provides context for deployment `meme-app` in namespace `meme`. -2. 
The deployment consumes one topic. Its name is read from environment variable `KAFKA_TOPIC_NAME` in container `consumer`.
+2. The deployment consumes one queue. Its name is read from environment variable `KAFKA_TOPIC_NAME` in container `consumer`.
 The Kafka consumer group id is read from environment variable `KAFKA_GROUP_ID` in container `consumer`.
-3. The Kafka topic can be referenced in a mirrord config under ID `views-topic`.
+3. The Kafka queue can be referenced in a mirrord config under ID `views-topic`.
 
 #### Link the topics consumer resource to the deployed consumer
 
@@ -415,15 +416,15 @@ consumerName: kafka-notifications-worker
 
 The operator supports Kafka splitting on deployments, stateful sets, and Argo rollouts.
 
-#### Desribe consumed topics in the topics consumer resource
+#### Describe consumed queues in the topics consumer resource
 
-The topics consumer resource describes Kafka topics consumed by the referenced consumer.
-The topics are described in entries of the `spec.topics` list:
+The topics consumer resource describes Kafka queues consumed by the referenced consumer.
+The queues are described in entries of the `spec.topics` list:
 * `id` can be arbitrary, as it will only be [referenced](queue-splitting.md#setting-a-filter-for-a-mirrord-run) from the user's mirrord config.
 * `clientConfig` stores the name of the `MirrordKafkaClientConfig` to use when making connections to the Kafka cluster.
-* `nameSources` stores a list of all occurences of the topic name in the consumer workload's pod template.
+* `nameSources` stores a list of all occurrences of the queue name in the consumer workload's pod template.
 * `groupIdSources` stores a list of all occurences of the consumer Kafka group ID in the consumer workload's pod template.
-The operator will use the same group ID when consuming messages from the topic.
+The operator will use the same group ID when consuming messages from the queue.
 
 {% hint style="warning" %}
 The mirrord operator can only read consumer's environment variables if they are either:
@@ -436,20 +437,20 @@ The mirrord operator can only read consumer's environment variables if they are
 
 ### Additional Options
 
-#### Customizing Temporary Kafka Topic Names
+#### Customizing Temporary Kafka Queue Names
 
 {% hint style="info" %}
 Available since chart version `1.27` and operator version `3.114.0`.
 {% endhint %}
 
-To serve Kafka splitting sessions, mirrord operator creates temporary topics in the Kafka cluster. The default format for their names is as follows:
+To serve Kafka splitting sessions, mirrord operator creates temporary queues in the Kafka cluster. The default format for their names is as follows:
 
-* `mirrord-tmp-1234567890-fallback-topic-original-topic` - for the fallback topic (unfiltered messages, consumed by the deployed workload).
-* `mirrord-tmp-0987654321-original-topic` - for the user topics (filtered messages, consumed by local applications running with mirrord).
+* `mirrord-tmp-1234567890-fallback-topic-original-topic` - for the fallback queue (unfiltered messages, consumed by the deployed workload).
+* `mirrord-tmp-0987654321-original-topic` - for the user queues (filtered messages, consumed by local applications running with mirrord).
 
-Note that the random digits will be unique for each temporary topic created by the operator.
+Note that the random digits will be unique for each temporary queue created by the operator. 
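Since both default formats share the `mirrord-tmp-` prefix, prefix-based rules are usually the simplest way to authorize the deployed consumer for these temporary queues (see the "Authorize deployed consumers" step above). As a purely illustrative sketch, assuming the Kafka cluster happens to be managed by Strimzi and authorization is handled through its `KafkaUser` resources (the user and cluster names below are made up):

```yaml
# Hypothetical Strimzi KafkaUser granting the deployed consumer access to every
# temporary topic that mirrord creates under the default "mirrord-tmp-" prefix.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: meme-app-consumer
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: tls
  authorization:
    type: simple
    acls:
      - resource:
          type: topic
          name: mirrord-tmp-
          patternType: prefix
        operation: Read
      - resource:
          type: topic
          name: mirrord-tmp-
          patternType: prefix
        operation: Describe
```

With a different authorizer (for example plain Kafka ACLs or a cloud provider's IAM policies), the same idea applies: grant the consumer's principal read and describe rights on topic names matching whatever prefix you configure here.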
-You can adjust the format of the created topic names to suit your needs (RBAC, Security, Policies, etc.), +You can adjust the format of the created queues names to suit your needs (RBAC, Security, Policies, etc.), using the `OPERATOR_KAFKA_SPLITTING_TOPIC_FORMAT` environment variable of the mirrord operator, or `operator.kafkaSplittingTopicFormat` helm chart value. The default value is: @@ -577,7 +578,7 @@ Note that operator's service account can be annotated with the IAM role's ARN wi #### Configuring Workload Restart -To inject the names of the temporary topics into the consumer workload, +To inject the names of the temporary queues into the consumer workload, the operator always requires the workload to be restarted. Depending on cluster conditions, and the workload itself, this might take some time. @@ -598,7 +599,7 @@ is started before the TTL elapses. Specified in seconds. Once cluster setup is done, mirrord users can start running sessions with queue message filters in their mirrord configuration files. [`feature.split_queues`](https://app.gitbook.com/s/Z7vBpFMZTH8vUGJBGRZ4/options#feature.split_queues) is the configuration field they need to specify in order to filter queue messages. -Directly under it, mirrord expects a mapping from a queue or topic ID to a queue filter definition. +Directly under it, mirrord expects a mapping from a queue or queue ID to a queue filter definition. Filter definition contains two fields: * `queue_type` — `SQS` or `Kafka` @@ -649,7 +650,7 @@ In the example above, the local application: * Will receive a subset of messages from SQS queues desribed in the registry under ID `meme-queue`. All received messages will have an attribute `author` with the value `me`, AND an attribute `level` with value either `beginner` or `intermediate`. * Will receive no messages from SQS queues described in the registry under ID `ad-queue`. -* Will receive a subset of messages from Kafka topic with ID `views-topic`. +* Will receive a subset of messages from Kafka queue with ID `views-topic`. All received messages will have an attribute `author` with the value `me`, AND an attribute `source` with value starting with `my-session-` (e.g `my-session-844cb78789-2fmsw`). From 566e1d024898561d99cc596a9db3fa9e1cc6c606 Mon Sep 17 00:00:00 2001 From: Razz4780 Date: Wed, 15 Oct 2025 12:28:19 +0200 Subject: [PATCH 20/20] ... --- docs/using-mirrord/queue-splitting.md | 1 - 1 file changed, 1 deletion(-) diff --git a/docs/using-mirrord/queue-splitting.md b/docs/using-mirrord/queue-splitting.md index 5a4f921..a430b98 100644 --- a/docs/using-mirrord/queue-splitting.md +++ b/docs/using-mirrord/queue-splitting.md @@ -835,4 +835,3 @@ value in the operator's helm chart, to set a timeout for the draining of the tem If that service is trying to consume messages correctly, and the temporary queue is already empty, but the target application still doesn't get restored to its original state, please try restarting the application, deleting any lingering `MirrordSqsSession` objects, and if possible, restart the mirrord operator. ->>>>>>> main:docs/using-mirrord/queue-splitting.md