@@ -17,6 +17,8 @@ package logrepl_test
17
17
import (
18
18
"context"
19
19
"fmt"
20
+ "github.com/apecloud/myduckserver/adapter"
21
+ "github.com/jackc/pglogrepl"
20
22
"log"
21
23
"os"
22
24
"os/exec"
@@ -660,6 +662,7 @@ func RunReplicationScripts(t *testing.T, scripts []ReplicationTest) {
660
662
}
661
663
662
664
const slotName = "myduck_slot"
665
+ const subscriptionName = "my_sub_test"
663
666
664
667
// RunReplicationScript runs the given ReplicationTest.
665
668
func RunReplicationScript (t * testing.T , dsn string , script ReplicationTest ) {
@@ -686,8 +689,18 @@ func RunReplicationScript(t *testing.T, dsn string, script ReplicationTest) {
686
689
})
687
690
}
688
691
689
- func newReplicator (t * testing.T , primaryDns string ) * logrepl.LogicalReplicator {
690
- r , err := logrepl .NewLogicalReplicator ("mySubTest" , primaryDns )
692
+ func newReplicator (sqlCtx * sql.Context , t * testing.T , primaryDns string ) * logrepl.LogicalReplicator {
693
+ err := logrepl .CreateSubscription (sqlCtx , subscriptionName , primaryDns , slotName , pglogrepl .LSN (0 ).String (), true )
694
+ require .NoError (t , err )
695
+
696
+ tx := adapter .TryGetTxn (sqlCtx )
697
+ if tx != nil {
698
+ err := tx .Commit ()
699
+ require .NoError (t , err )
700
+ adapter .CloseTxn (sqlCtx )
701
+ }
702
+
703
+ r , err := logrepl .NewLogicalReplicator (subscriptionName , primaryDns )
691
704
require .NoError (t , err )
692
705
return r
693
706
}
@@ -701,7 +714,7 @@ func runReplicationScript(
701
714
replicaConn * pgx.Conn ,
702
715
primaryDns string ,
703
716
) {
704
- r := newReplicator (t , primaryDns )
717
+ r := newReplicator (server . NewInternalCtx (), t , primaryDns )
705
718
defer r .Stop ()
706
719
707
720
if script .Skip {
0 commit comments