@@ -33,8 +33,10 @@
 
 MIGRATION_LOG_ALLOW_LIST = [
     'Error during log recovery: cloud_storage::missing_partition_exception',
-    'Requested data migration does not exist',
-    'Invalid data migration state transition requested',
+    'Requested data migration does not exist',  # failed to get migration by id
+    'Invalid data migration state transition requested',  # failed migration deletes
+    'Data migration contains resources that does not exists or are already being migrated',  # failed create
+    r'/v1/migrations.*Requested feature is disabled'  # cloud storage disabled
 ]
 
 
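Note that the first three entries are literal fragments of expected log messages, while the new last entry is a raw-string regex. A minimal sketch of what that pattern is intended to match, assuming allow-list entries are applied as regular expressions to captured log lines (the sample line below is illustrative, not taken from the PR):

```python
import re

# Hypothetical admin-API error line logged while cloud storage is disabled.
sample = "admin_api - request to /v1/migrations rejected: Requested feature is disabled"
assert re.search(r'/v1/migrations.*Requested feature is disabled', sample)
```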
@@ -193,8 +195,11 @@ def migration_is_absent(id: int):
     def wait_partitions_appear(self, topics: list[TopicSpec]):
         # we may be unlucky to query a slow node
         def topic_has_all_partitions(t: TopicSpec):
-            return t.partition_count == \
-                len(self.client().describe_topic(t.name).partitions)
+            actual_part_cnt = len(self.client().describe_topic(t.name).partitions)
+            self.logger.debug(
+                f"topic {t.name} has {actual_part_cnt} partitions out of {t.partition_count} expected"
+            )
+            return t.partition_count == actual_part_cnt
 
         wait_until(lambda: all(topic_has_all_partitions(t) for t in topics),
                    timeout_sec=90,
@@ -234,6 +239,40 @@ def migration_id_if_exists():
         self.wait_migration_appear(migration_id)
         return migration_id
 
+    def assure_not_migratable(self, topic: TopicSpec):
+        out_migration = OutboundDataMigration(
+            [make_namespaced_topic(topic.name)], consumer_groups=[])
+        try:
+            self.create_and_wait(out_migration)
+            assert False
+        except requests.exceptions.HTTPError as e:
+            pass
+
+    @cluster(num_nodes=3, log_allow_list=MIGRATION_LOG_ALLOW_LIST)
+    def test_creating_with_topic_no_remote_writes(self):
+        self.redpanda.set_cluster_config(
+            {"cloud_storage_enable_remote_write": False}, expect_restart=True)
+        topic = TopicSpec(partition_count=3)
+        self.client().create_topic(topic)
+        self.wait_partitions_appear([topic])
+        self.redpanda.set_cluster_config(
+            {"cloud_storage_enable_remote_write": True}, expect_restart=True)
+        self.assure_not_migratable(topic)
+
+    @cluster(num_nodes=3, log_allow_list=MIGRATION_LOG_ALLOW_LIST)
+    @matrix(param_to_disable=[
+        "cloud_storage_enabled", "cloud_storage_disable_archiver_manager"
+    ])
+    def test_creating_when_cluster_misconfigured(self, param_to_disable):
+        self.redpanda.set_cluster_config({param_to_disable: False},
+                                         expect_restart=True)
+        topic = TopicSpec(partition_count=3)
+        self.client().create_topic(topic)
+        self.assure_not_migratable(topic)
+        # restore the setting so that scrubbing can complete
+        self.redpanda.set_cluster_config({param_to_disable: True},
+                                         expect_restart=True)
+
     @cluster(num_nodes=3, log_allow_list=MIGRATION_LOG_ALLOW_LIST)
     def test_creating_and_listing_migrations(self):
         topics = [TopicSpec(partition_count=3) for i in range(5)]
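A possible follow-up for `assure_not_migratable` (a sketch against the helpers visible in this diff, not part of the PR): rather than swallowing any `HTTPError`, the helper could assert that creation was rejected with a 4xx client error, so an unexpected server-side failure would still fail the test:

```python
    def assure_not_migratable(self, topic: TopicSpec):
        out_migration = OutboundDataMigration(
            [make_namespaced_topic(topic.name)], consumer_groups=[])
        try:
            self.create_and_wait(out_migration)
            assert False, f"migration of {topic.name} unexpectedly accepted"
        except requests.exceptions.HTTPError as e:
            # Expect the admin API to reject the request with a client error;
            # e.response may be None if the error was raised without a response.
            assert e.response is not None and 400 <= e.response.status_code < 500
```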