55from crate .client .exceptions import ProgrammingError
66from cr8 .run_crate import CrateNode
77
8- from crate .qa .tests import NodeProvider , insert_data , wait_for_active_shards , UpgradePath
8+ from crate .qa .tests import NodeProvider , insert_data , wait_for_active_shards , UpgradePath , assert_busy
99
1010ROLLING_UPGRADES_V4 = (
1111 # 4.0.0 -> 4.0.1 -> 4.0.2 don't support rolling upgrades due to a bug
@@ -96,9 +96,17 @@ def _test_rolling_upgrade(self, path: UpgradePath, nodes: int):
9696 cluster = self ._new_cluster (path .from_version , nodes , settings = settings )
9797 cluster .start ()
9898 node = cluster .node ()
99+ remote_node = None
99100 with connect (node .http_url , error_trace = True ) as conn :
100101 new_shards = init_data (conn , node .version , shards , replicas )
101102 expected_active_shards += new_shards
103+ if node .version >= (5 , 10 , 0 ):
104+ remote_cluster = self ._new_cluster (path .from_version , 1 , settings = settings , explicit_discovery = False )
105+ remote_cluster .start ()
106+ remote_node = remote_cluster .node ()
107+ with connect (remote_node .http_url , error_trace = True ) as remote_conn :
108+ new_shards = init_logical_replication_data (self , conn , remote_conn , node .addresses .transport .port , remote_node .addresses .transport .port , expected_active_shards )
109+ expected_active_shards += new_shards
102110
103111 for idx , node in enumerate (cluster ):
104112 # Enforce an old version node be a handler to make sure that an upgraded node can serve 'select *' from an old version node.
@@ -129,6 +137,9 @@ def _test_rolling_upgrade(self, path: UpgradePath, nodes: int):
129137 c = conn .cursor ()
130138 new_shards = self ._test_queries_on_new_node (idx , c , node , new_node , nodes , shards , expected_active_shards )
131139 expected_active_shards += new_shards
140+ if node .version >= (5 , 10 , 0 ):
141+ with connect (remote_node .http_url , error_trace = True ) as remote_conn :
142+ test_logical_replication_queries (self , conn , remote_conn )
132143
133144 # Finally validate that all shards (primaries and replicas) of all partitions are started
134145 # and writes into the partitioned table while upgrading were successful
@@ -328,3 +339,55 @@ def init_data(conn: Connection, version: tuple[int, int, int], shards: int, repl
328339 c .execute ("INSERT INTO doc.parted (id, value) VALUES (1, 1)" )
329340 new_shards += shards
330341 return new_shards
342+
343+
344+ def init_logical_replication_data (self , local_conn : Connection , remote_conn : Connection , local_transport_port :int , remote_transport_port : int , local_active_shards : int ) -> int :
345+ assert 4300 <= local_transport_port <= 4310 and 4300 <= remote_transport_port <= 4310
346+
347+ c = local_conn .cursor ()
348+ c .execute ("create table doc.x (a int) clustered into 1 shards with (number_of_replicas=0)" )
349+ c .execute ("create publication p for table doc.x" )
350+
351+ rc = remote_conn .cursor ()
352+ rc .execute ("create table doc.rx (a int) clustered into 1 shards with (number_of_replicas=0)" )
353+ rc .execute ("create publication rp for table doc.rx" )
354+
355+ rc .execute (f"create subscription rs connection 'crate://localhost:{ local_transport_port } ?user=crate&sslmode=sniff' publication p" )
356+ c .execute (f"create subscription s connection 'crate://localhost:{ remote_transport_port } ?user=crate&sslmode=sniff' publication rp" )
357+
358+ new_shards = 2 # 1 shard for doc.x and another 1 shard for doc.rx
359+ wait_for_active_shards (rc , new_shards )
360+ wait_for_active_shards (c , local_active_shards + new_shards )
361+ assert_busy (lambda : self .assertEqual (num_docs_x (rc ), 0 ))
362+ assert_busy (lambda : self .assertEqual (num_docs_rx (c ), 0 ))
363+
364+ return new_shards
365+
366+
367+ def test_logical_replication_queries (self , local_conn : Connection , remote_conn : Connection ):
368+ c = local_conn .cursor ()
369+ rc = remote_conn .cursor ()
370+
371+ # Cannot drop replicated tables
372+ with self .assertRaises (ProgrammingError ):
373+ rc .execute ("drop table doc.x" )
374+ c .execute ("drop table doc.rx" )
375+
376+ count = num_docs_x (rc )
377+ count2 = num_docs_rx (c )
378+
379+ c .execute ("insert into doc.x values (1)" )
380+ rc .execute ("insert into doc.rx values (1)" )
381+
382+ assert_busy (lambda : self .assertEqual (num_docs_x (rc ), count + 1 ))
383+ assert_busy (lambda : self .assertEqual (num_docs_rx (c ), count2 + 1 ))
384+
385+
386+ def num_docs_x (cursor ):
387+ cursor .execute ("select count(*) from doc.x" )
388+ return cursor .fetchall ()[0 ][0 ]
389+
390+
391+ def num_docs_rx (cursor ):
392+ cursor .execute ("select count(*) from doc.rx" )
393+ return cursor .fetchall ()[0 ][0 ]
0 commit comments