Skip to content

Commit

Permalink
[fix #321] add sending sigstop test case (#322)
Browse files Browse the repository at this point in the history
* add sending sigstop it

Signed-off-by: zeminzhou <[email protected]>

* fix check

Signed-off-by: zeminzhou <[email protected]>

* add sleep

Signed-off-by: zeminzhou <[email protected]>

* make check

Signed-off-by: zeminzhou <[email protected]>

* adjust sleep time

Signed-off-by: zeminzhou <[email protected]>

* add workload after kill -19

Signed-off-by: zeminzhou <[email protected]>

* remove UP_ADD

Signed-off-by: zeminzhou <[email protected]>

* reuse check count

Signed-off-by: zeminzhou <[email protected]>

* add comment

Signed-off-by: zeminzhou <[email protected]>

* make check

Signed-off-by: zeminzhou <[email protected]>

* add comment

Signed-off-by: zeminzhou <[email protected]>

* add retry

Signed-off-by: zeminzhou <[email protected]>

* add multi pd addr

Signed-off-by: zeminzhou <[email protected]>

Signed-off-by: zeminzhou <[email protected]>
  • Loading branch information
zeminzhou authored Nov 26, 2022
1 parent d050c77 commit 5662608
Show file tree
Hide file tree
Showing 10 changed files with 198 additions and 38 deletions.
48 changes: 48 additions & 0 deletions cdc/tests/integration_tests/_utils/check_count
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#!/bin/bash
# parameter 1: expected count
# parameter 2: component name
# parameter 3: pd addr
# parameter 4: max retry
set -eu

expected=$1
name=$2
pd_addr=$3

if [ $# -ge 4 ]; then
max_retry=$4
else
max_retry=30
fi

for ((i = 0; i <= $max_retry; i++)); do
case $name in
tikv)
:
count=$(pd-ctl store --pd $pd_addr | grep 'Up' | wc | awk '{print $1}')
;;
pd)
:
count=$(pd-ctl health --pd $pd_addr | grep '\"health\": true' | wc | awk '{print $1}')
;;
tikv-cdc)
:
count=$(tikv-cdc cli capture list --pd $pd_addr | jq '.|length')
;;
*)
exit 1
;;
esac

if [[ "$count" == "$expected" ]]; then
echo "check $name count successfully"
break
fi

echo "failed to check $name count, expected: $expected, got: $count, retry: $i"
if [ "$i" -eq "$max_retry" ]; then
echo "failed to check $name count, max retires exceed"
exit 1
fi
sleep 2
done
2 changes: 1 addition & 1 deletion cdc/tests/integration_tests/_utils/check_sync_diff
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ DOWN_PD=$3
if [ $# -ge 4 ]; then
check_time=$4
else
check_time=30
check_time=50
fi
PWD=$(pwd)

Expand Down
1 change: 1 addition & 0 deletions cdc/tests/integration_tests/cdc_hang_on/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ SINK_TYPE=$1
UP_PD=http://$UP_PD_HOST_1:$UP_PD_PORT_1,http://$UP_PD_HOST_2:$UP_PD_PORT_2,http://$UP_PD_HOST_3:$UP_PD_PORT_3
DOWN_PD=http://$DOWN_PD_HOST:$DOWN_PD_PORT
RETRY_TIME=10

function restart_cdc() {
id=$1
local count=$(ps -aux | grep "tikv-cdc.test" | grep "cdc$id.log" | wc | awk '{print $1}')
Expand Down
11 changes: 1 addition & 10 deletions cdc/tests/integration_tests/changefeed_error/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -91,20 +91,11 @@ function check_no_changefeed() {
fi
}

function check_no_capture() {
pd=$1
count=$(tikv-cdc cli capture list --pd=$pd 2>&1 | jq '.|length')
if [[ ! "$count" -eq "0" ]]; then
exit 1
fi
}

export -f check_changefeed_mark_error
export -f check_changefeed_mark_failed_regex
export -f check_changefeed_mark_stopped_regex
export -f check_changefeed_mark_stopped
export -f check_no_changefeed
export -f check_no_capture

function run() {
rm -rf $WORK_DIR && mkdir -p $WORK_DIR
Expand Down Expand Up @@ -138,7 +129,7 @@ function run() {

export GO_FAILPOINTS='github.com/tikv/migration/cdc/cdc/owner/NewChangefeedRetryError=return(true)'
kill $capture_pid
ensure $MAX_RETRIES check_no_capture $UP_PD
check_count 0 "tikv-cdc" $UP_PD $MAX_RETRIES
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
ensure $MAX_RETRIES check_changefeed_mark_error $UP_PD ${changefeedid} "failpoint injected retriable error"

Expand Down
15 changes: 2 additions & 13 deletions cdc/tests/integration_tests/kill_owner/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,6 @@ MAX_RETRIES=10
UP_PD=http://$UP_PD_HOST_1:$UP_PD_PORT_1
DOWN_PD=http://$DOWN_PD_HOST:$DOWN_PD_PORT

function check_capture_count() {
pd=$1
expected=$2
count=$(tikv-cdc cli capture list --pd=$pd 2>&1 | jq '.|length')
if [[ ! "$count" -eq "$expected" ]]; then
echo "count: $count expected: $expected"
exit 1
fi
}

function kill_cdc_and_restart() {
pd_addr=$1
work_dir=$2
Expand All @@ -31,12 +21,11 @@ function kill_cdc_and_restart() {
cdc_pid=$(echo "$status" | jq '.pid')

kill $cdc_pid
ensure $MAX_RETRIES check_capture_count $pd_addr 0
check_count 0 "tikv-cdc" $pd_addr $MAX_RETRIES
run_cdc_server --workdir $work_dir --binary $cdc_binary --addr "127.0.0.1:8600" --pd $pd_addr
ensure $MAX_RETRIES check_capture_count $pd_addr 1
check_count 1 "tikv-cdc" $pd_addr $MAX_RETRIES
}

export -f check_capture_count
export -f kill_cdc_and_restart

function run() {
Expand Down
2 changes: 1 addition & 1 deletion cdc/tests/integration_tests/run_group.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ groups=(
["G07"]='kv_client_stream_reconnect multi_capture'
["G08"]='processor_err_chan processor_panic'
["G09"]='processor_resolved_ts_fallback processor_stop_delay'
["G10"]='sink_hang'
["G10"]='sink_hang sigstop'
["G11"]='sorter stop_downstream'
["G12"]='availability' # heavy test case
)
Expand Down
115 changes: 115 additions & 0 deletions cdc/tests/integration_tests/sigstop/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
#!/bin/bash

set -eu

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=tikv-cdc.test
SINK_TYPE=$1
UP_PD=http://$UP_PD_HOST_1:$UP_PD_PORT_1,http://$UP_PD_HOST_2:$UP_PD_PORT_2,http://$UP_PD_HOST_3:$UP_PD_PORT_3
DOWN_PD=http://$DOWN_PD_HOST:$DOWN_PD_PORT

function run_kill_upstream() {
rm -rf $WORK_DIR && mkdir -p $WORK_DIR
start_tidb_cluster --workdir $WORK_DIR --multiple-upstream-pd "true"
cd $WORK_DIR

start_ts=$(get_start_ts $UP_PD)
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "1" --addr "127.0.0.1:8600" --pd "$UP_PD"
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "2" --addr "127.0.0.1:8601" --pd "$UP_PD"
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "3" --addr "127.0.0.1:8602" --pd "$UP_PD"

case $SINK_TYPE in
tikv) SINK_URI="tikv://${DOWN_PD_HOST}:${DOWN_PD_PORT}" ;;
*) SINK_URI="" ;;
esac

tikv-cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"

rawkv_op $UP_PD put 10000 &
sleep 1
# send sigstop to tikv
n=$(echo $(($RANDOM % 3 + 1)))
tikv_pid=$(pgrep -f "tikv$n" | head -n1)
kill -19 $tikv_pid
sleep 10
check_count 2 "tikv" $UP_PD

kill -18 $tikv_pid
check_count 3 "tikv" $UP_PD
check_sync_diff $WORK_DIR $UP_PD $DOWN_PD

# Ignore the test on PD, because sending SIGSTOP to PD may cause CDC to exit,
# `cdc_hang_on` has tested sending SIGSTOP to PD leader.

rawkv_op $UP_PD delete 10000 &
sleep 1
# send sigstop to tikv-cdc
n=$(echo $(($RANDOM % 2 + 1)))
cdc_pid=$(pgrep -f "tikv-cdc" | sed -n "$n"p)
kill -19 $cdc_pid
sleep 10
check_count 2 "tikv-cdc" $UP_PD

kill -18 $cdc_pid
check_count 3 "tikv-cdc" $UP_PD
check_sync_diff $WORK_DIR $UP_PD $DOWN_PD

cleanup_process $CDC_BINARY
}

function run_kill_downstream() {
rm -rf $WORK_DIR && mkdir -p $WORK_DIR
start_tidb_cluster --workdir $WORK_DIR --multiple-upstream-pd "true"
cd $WORK_DIR

# We start 3 tikv and 3 pd in cluster1(usually as upstream),
# 1 tikv and 1 pd in cluster2(usually as downstream).
# Now we treat cluster1 as the downstream cluster and cluster2 as upstream,
# so we can ensure high availability of downstream clusters while sending SIGSTOP.

start_ts=$(get_start_ts $DOWN_PD)
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "1" --addr "127.0.0.1:8600" --pd "$DOWN_PD"

case $SINK_TYPE in
tikv) SINK_URI="tikv://${UP_PD_HOST_1}:${UP_PD_PORT_1}" ;;
*) SINK_URI="" ;;
esac

tikv-cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --pd $DOWN_PD

rawkv_op $DOWN_PD put 10000 &
sleep 1
# send sigstop to tikv
n=$(echo $(($RANDOM % 3 + 1)))
tikv_pid=$(pgrep -f "tikv$n" | head -n1)
kill -19 $tikv_pid
sleep 10
check_count 2 "tikv" $UP_PD

kill -18 $tikv_pid
check_count 3 "tikv" $UP_PD
check_sync_diff $WORK_DIR $DOWN_PD $UP_PD

rawkv_op $DOWN_PD delete 10000 &
sleep 1
# send sigstop to pd
n=$(echo $(($RANDOM % 3 + 1)))
pd_pid=$(pgrep -f "pd-server" | sed -n "$n"p)
kill -19 $pd_pid
sleep 10
check_count 2 "pd" $UP_PD

kill -18 $pd_pid
check_count 3 "pd" $UP_PD
check_sync_diff $WORK_DIR $DOWN_PD $UP_PD

cleanup_process $CDC_BINARY
}

trap stop_tidb_cluster EXIT
run_kill_upstream $*
run_kill_downstream $*
check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
8 changes: 6 additions & 2 deletions cdc/tests/utils/rawkv_data/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,17 @@ func runChecksum(cmd *cobra.Command) error {
}
ctx := context.Background()

srcCli, err := rawkv.NewClientWithOpts(ctx, []string{cfg.SrcPD}, rawkv.WithAPIVersion(kvrpcpb.APIVersion_V2), rawkv.WithSecurity(cfg.SrcSec))
srcCli, err := rawkv.NewClientWithOpts(ctx, cfg.SrcPD,
rawkv.WithAPIVersion(kvrpcpb.APIVersion_V2),
rawkv.WithSecurity(cfg.SrcSec))
if err != nil {
return err
}
defer srcCli.Close()

dstCli, err := rawkv.NewClientWithOpts(ctx, []string{cfg.DstPD}, rawkv.WithAPIVersion(kvrpcpb.APIVersion_V2), rawkv.WithSecurity(cfg.DstSec))
dstCli, err := rawkv.NewClientWithOpts(ctx, cfg.DstPD,
rawkv.WithAPIVersion(kvrpcpb.APIVersion_V2),
rawkv.WithSecurity(cfg.DstSec))
if err != nil {
return err
}
Expand Down
12 changes: 9 additions & 3 deletions cdc/tests/utils/rawkv_data/gen_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ func runDeleteCmd(cmd *cobra.Command) error {
}

ctx := context.Background()
cli, err := rawkv.NewClientWithOpts(ctx, []string{cfg.SrcPD}, rawkv.WithAPIVersion(kvrpcpb.APIVersion_V2), rawkv.WithSecurity(cfg.SrcSec))
cli, err := rawkv.NewClientWithOpts(ctx, cfg.SrcPD,
rawkv.WithAPIVersion(kvrpcpb.APIVersion_V2),
rawkv.WithSecurity(cfg.SrcSec))
if err != nil {
return err
}
Expand Down Expand Up @@ -122,12 +124,16 @@ func runPutCmd(cmd *cobra.Command) error {
}
ctx := context.Background()

cli, err := rawkv.NewClientWithOpts(ctx, []string{cfg.SrcPD}, rawkv.WithAPIVersion(kvrpcpb.APIVersion_V2), rawkv.WithSecurity(cfg.SrcSec))
cli, err := rawkv.NewClientWithOpts(ctx, cfg.SrcPD,
rawkv.WithAPIVersion(kvrpcpb.APIVersion_V2),
rawkv.WithSecurity(cfg.SrcSec))
if err != nil {
return err
}
defer cli.Close()
atomicCli, err := rawkv.NewClientWithOpts(ctx, []string{cfg.SrcPD}, rawkv.WithAPIVersion(kvrpcpb.APIVersion_V2), rawkv.WithSecurity(cfg.SrcSec))
atomicCli, err := rawkv.NewClientWithOpts(ctx, cfg.SrcPD,
rawkv.WithAPIVersion(kvrpcpb.APIVersion_V2),
rawkv.WithSecurity(cfg.SrcSec))
if err != nil {
return err
}
Expand Down
22 changes: 14 additions & 8 deletions cdc/tests/utils/rawkv_data/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ const (
)

type Config struct {
SrcPD string `json:"src-pd"`
DstPD string `json:"dst-pd"`
SrcPD []string `json:"src-pd"`
DstPD []string `json:"dst-pd"`
StartIndex int `json:"start-index"`
Count int `json:"count"`
CAPath string `json:"ca-path"`
Expand All @@ -61,12 +61,18 @@ func AddFlags(cmd *cobra.Command) {

func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet, requireDstPD bool) error {
var err error
if cfg.SrcPD, err = flags.GetString(flagSrcPD); err != nil {
srcPD, err := flags.GetString(flagSrcPD)
if err != nil {
return err
}
if cfg.DstPD, err = flags.GetString(flagDstPD); err != nil {
cfg.SrcPD = strings.Split(srcPD, ",")

dstPD, err := flags.GetString(flagDstPD)
if err != nil {
return err
}
cfg.DstPD = strings.Split(dstPD, ",")

if cfg.StartIndex, err = flags.GetInt(flagStartIndex); err != nil {
return err
}
Expand All @@ -83,10 +89,10 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet, requireDstPD bool) error
return err
}

if cfg.SrcPD == "" {
if len(cfg.SrcPD) == 0 {
return fmt.Errorf("Upstream cluster PD is not set")
}
if strings.HasPrefix(cfg.SrcPD, "https://") {
if strings.HasPrefix(cfg.SrcPD[0], "https://") {
if cfg.CAPath == "" || cfg.CertPath == "" || cfg.KeyPath == "" {
return fmt.Errorf("CAPath/CertPath/KeyPath is not set")
}
Expand All @@ -96,10 +102,10 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet, requireDstPD bool) error
}

if requireDstPD {
if cfg.DstPD == "" {
if len(cfg.DstPD) == 0 {
return fmt.Errorf("Downstream cluster PD is not set")
}
if strings.HasPrefix(cfg.DstPD, "https://") {
if strings.HasPrefix(cfg.DstPD[0], "https://") {
if cfg.CAPath == "" || cfg.CertPath == "" || cfg.KeyPath == "" {
return fmt.Errorf("CAPath/CertPath/KeyPath is not set")
}
Expand Down

0 comments on commit 5662608

Please sign in to comment.