-
Notifications
You must be signed in to change notification settings - Fork 407
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
handle storage formatting and scram users in combined and separate modes
- Loading branch information
Showing
3 changed files
with
154 additions
and
4 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,12 +1,103 @@ | ||
--- | ||
# with kraft combined mode, first install have to define clusterid, instead of getting it from dedicateed controllers | ||
- name: Check meta.properties | ||
when: kraft_combined | ||
ansible.builtin.stat: | ||
path: "{{ kafka_controller_final_properties['log.dirs'] }}/meta.properties" | ||
register: meta_properties | ||
|
||
- name: Initialize ClusterId | ||
when: | ||
- kraft_combined | ||
- not meta_properties.stat.exists | ||
run_once: true | ||
shell: "{{ binary_base_path }}/bin/kafka-storage random-uuid" | ||
environment: | ||
KAFKA_OPTS: "-Xlog:all=error -XX:+IgnoreUnrecognizedVMOptions" | ||
register: random_uuid | ||
|
||
- name: Set ClusterId | ||
when: | ||
- kraft_combined | ||
- not meta_properties.stat.exists | ||
run_once: true | ||
set_fact: | ||
clusterid: "{{ random_uuid.stdout }}" | ||
delegate_to: "{{ item }}" | ||
delegate_facts: true | ||
loop: "{{ groups.kafka_broker }}" | ||
|
||
# after first install in combined mode, get clusterid from one broker node | ||
- name: Extract ClusterId from meta.properties on KRaft Controller | ||
when: | ||
- kraft_combined | ||
- meta_properties.stat.exists | ||
run_once: true | ||
slurp: | ||
src: "{{ kafka_controller_final_properties['log.dirs'] }}/meta.properties" | ||
register: uuid_broker | ||
|
||
- name: Set ClusterId | ||
when: | ||
- kraft_combined | ||
- meta_properties.stat.exists | ||
run_once: true | ||
set_fact: | ||
clusterid: "{{ (uuid_broker['content'] | b64decode).partition('cluster.id=')[2].partition('\n')[0] }}" | ||
delegate_to: "{{ item }}" | ||
delegate_facts: true | ||
loop: "{{ groups.kafka_broker }}" | ||
|
||
# when combined mode, broker must declare scram users | ||
- name: Prepare SCRAM Users | ||
when: kraft_combined | ||
set_fact: | ||
scram_users_to_create: [] | ||
|
||
- name: Prepare SCRAM 512 Users | ||
when: | ||
- "'SCRAM-SHA-512' in kafka_broker_sasl_enabled_mechanisms" | ||
- kraft_combined | ||
set_fact: | ||
scram_users_to_create: "{{ scram_users_to_create + [ '--add-scram SCRAM-SHA-512=[name=\"'+ item.value['principal'] + '\",password=\"' + item.value['password'] + '\"]' ] }}" | ||
loop: "{{ sasl_scram_users_final|dict2items }}" | ||
loop_control: | ||
label: "{{ item.value['principal'] }}" | ||
|
||
- name: Create SCRAM 256 Users | ||
when: | ||
- "'SCRAM-SHA-256' in kafka_broker_sasl_enabled_mechanisms" | ||
- kraft_combined | ||
set_fact: | ||
scram_users_to_create: "{{ scram_users_to_create + [ '--add-scram SCRAM-SHA-256=[name=\"'+ item.value['principal'] + '\",password=\"' + item.value['password'] + '\"]' ] }}" | ||
loop: "{{ sasl_scram_users_final|dict2items }}" | ||
loop_control: | ||
label: "{{ item.value['principal'] }}" | ||
|
||
- name: Format Storage Directory | ||
when: kraft_combined | ||
shell: "{{ binary_base_path }}/bin/kafka-storage format -t {{ clusterid }} -c {{ kafka_broker.config_file }} --ignore-formatted {{ scram_users_to_create|join(' ') }}" | ||
register: format_meta | ||
|
||
# with dedicated controller nodes, clusterid is already defined onto controller nodes | ||
- name: Extract ClusterId from meta.properties on KRaft Controller | ||
when: not kraft_combined | ||
run_once: true | ||
slurp: | ||
src: "{{ kafka_controller_final_properties['log.dirs'] }}/meta.properties" | ||
delegate_to: "{{ groups.kafka_controller[0] }}" | ||
register: uuid_broker | ||
|
||
- name: Set ClusterId | ||
when: not kraft_combined | ||
run_once: true | ||
set_fact: | ||
clusterid: "{{ (uuid_broker['content'] | b64decode).partition('cluster.id=')[2].partition('\n')[0] }}" | ||
delegate_to: "{{ item }}" | ||
delegate_facts: true | ||
loop: "{{ groups.kafka_controller }}" | ||
|
||
- name: Format Storage Directory | ||
when: not kraft_combined | ||
shell: "{{ binary_base_path }}/bin/kafka-storage format -t {{ clusterid }} -c {{ kafka_broker.config_file }} --ignore-formatted" | ||
register: format_meta | ||
vars: | ||
clusterid: "{{ (uuid_broker['content'] | b64decode).partition('cluster.id=')[2].partition('\n')[0] }}" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,13 +1,70 @@ | ||
--- | ||
- name: Get ClusterId | ||
|
||
- name: Prepare SCRAM Users | ||
set_fact: | ||
scram_users_to_create: [] | ||
|
||
- name: Prepare SCRAM 512 Users | ||
when: | ||
- "'SCRAM-SHA-512' in kafka_broker_sasl_enabled_mechanisms" | ||
set_fact: | ||
scram_users_to_create: "{{ scram_users_to_create + [ '--add-scram SCRAM-SHA-512=[name=\"'+ item.value['principal'] + '\",password=\"' + item.value['password'] + '\"]' ] }}" | ||
loop: "{{ sasl_scram_users_final|dict2items }}" | ||
loop_control: | ||
label: "{{ item.value['principal'] }}" | ||
|
||
- name: Prepare SCRAM 256 Users | ||
when: | ||
- "'SCRAM-SHA-256' in kafka_broker_sasl_enabled_mechanisms" | ||
set_fact: | ||
scram_users_to_create: "{{ scram_users_to_create + [ '--add-scram SCRAM-SHA-256=[name=\"'+ item.value['principal'] + '\",password=\"' + item.value['password'] + '\"]' ] }}" | ||
loop: "{{ sasl_scram_users_final|dict2items }}" | ||
loop_control: | ||
label: "{{ item.value['principal'] }}" | ||
|
||
- name: Check meta.properties | ||
run_once: true | ||
ansible.builtin.stat: | ||
path: "{{ kafka_controller_final_properties['log.dirs'] }}/meta.properties" | ||
register: meta_properties | ||
|
||
# if meta.properties does not exists , create uuid | ||
- name: Initialize ClusterId | ||
when: not meta_properties.stat.exists | ||
run_once: true | ||
shell: "{{ binary_base_path }}/bin/kafka-storage random-uuid" | ||
environment: | ||
KAFKA_OPTS: "-Xlog:all=error -XX:+IgnoreUnrecognizedVMOptions" | ||
register: uuid_key | ||
|
||
- name: Set ClusterId | ||
when: not meta_properties.stat.exists | ||
run_once: true | ||
set_fact: | ||
clusterid: "{{ random_uuid.stdout }}" | ||
delegate_to: "{{ item }}" | ||
delegate_facts: true | ||
loop: groups['kafka_controller'] | ||
|
||
# else, extract it from meta.properties | ||
- name: Extract ClusterId from meta.properties | ||
when: meta_properties.stat.exists | ||
run_once: true | ||
slurp: | ||
src: "{{ kafka_controller_final_properties['log.dirs'] }}/meta.properties" | ||
register: uuid_broker | ||
|
||
- name: Set ClusterId | ||
when: meta_properties.stat.exists | ||
run_once: true | ||
set_fact: | ||
clusterid: "{{ (uuid_broker['content'] | b64decode).partition('cluster.id=')[2].partition('\n')[0] }}" | ||
delegate_to: "{{ item }}" | ||
delegate_facts: true | ||
loop: "{{ groups['kafka_controller'] }}" | ||
|
||
- name: Format Data Directory | ||
shell: "{{ binary_base_path }}/bin/kafka-storage format -t {{ clusterid }} -c {{ kafka_controller.config_file }} --ignore-formatted" | ||
shell: "{{ binary_base_path }}/bin/kafka-storage format -t {{ clusterid }} -c {{ kafka_controller.config_file }} --ignore-formatted {{ scram_users_to_create|join(' ') }}" | ||
register: format_meta | ||
vars: | ||
clusterid: "{{ uuid_key.stdout }}" |