From 6bc1998f922067bd9fbf379a909c5bb584bf7da3 Mon Sep 17 00:00:00 2001 From: shen yushi Date: Wed, 18 Dec 2024 20:42:32 +0800 Subject: [PATCH] Fix addr conflict. (#2390) ### What problem does this PR solve? Remove docker --net=host to fix addr conflict. ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) --- .github/workflows/slow_test.yml | 4 +- .github/workflows/tests.yml | 79 +++-- conf/mock_follower.toml | 58 ++++ conf/mock_leader.toml | 58 ++++ python/test_cluster/clear_docker.py | 37 --- python/test_cluster/conftest.py | 68 +--- .../test_cluster/docker_infinity_cluster.py | 300 ------------------ python/test_cluster/infinity_cluster.py | 41 +-- .../test_cluster/mocked_infinity_cluster.py | 6 +- python/test_cluster/test_basic.py | 53 ++-- python/test_cluster/test_insert.py | 264 ++++++++------- tools/run_cluster_test.py | 40 --- 12 files changed, 315 insertions(+), 693 deletions(-) create mode 100644 conf/mock_follower.toml create mode 100644 conf/mock_leader.toml delete mode 100644 python/test_cluster/clear_docker.py delete mode 100644 python/test_cluster/docker_infinity_cluster.py diff --git a/.github/workflows/slow_test.yml b/.github/workflows/slow_test.yml index 30b062eb9c..9e2d96baed 100644 --- a/.github/workflows/slow_test.yml +++ b/.github/workflows/slow_test.yml @@ -60,8 +60,6 @@ jobs: echo "BUILDER_CONTAINER=${BUILDER_CONTAINER}" >> $GITHUB_ENV echo "CPUS=${CPUS}" >> $GITHUB_ENV TZ=${TZ:-$(readlink -f /etc/localtime | awk -F '/zoneinfo/' '{print $2}')} - INF_DIRECTORY=$PWD - echo "${INF_DIRECTORY}" | sudo tee .tester_env sudo docker pull infiniflow/infinity_builder:centos7_clang18 sudo docker rm -f -v ${BUILDER_CONTAINER} && sudo docker run --net=host --privileged --cap-add=NET_ADMIN -d --name ${BUILDER_CONTAINER} -e TZ=$TZ -e CMAKE_BUILD_PARALLEL_LEVEL=${CPUS} -v $PWD:/infinity -v /boot:/boot -v /var/run/docker.sock:/var/run/docker.sock --cpus ${CPUS} infiniflow/infinity_builder:centos7_clang18 @@ -113,7 +111,7 @@ jobs: # Fix sanitizer: https://github.com/ClickHouse/ClickHouse/issues/64086 old_value=$(sudo sysctl -n vm.mmap_rnd_bits) sudo sysctl -w vm.mmap_rnd_bits=28 - sudo docker exec ${BUILDER_CONTAINER} bash -c "cd /infinity/ && rm -fr /var/infinity && export INF_DIRECTORY=`cat .tester_env` && echo INF_DIRECTORY=${INF_DIRECTORY} && python3 tools/run_cluster_test.py --infinity_path=cmake-build-debug/src/infinity --infinity_dir=${INF_DIRECTORY} --minio_port=9005 --minio_console_port=9006" + sudo docker exec ${BUILDER_CONTAINER} bash -c "cd /infinity/ && rm -fr /var/infinity && python3 tools/run_cluster_test.py --infinity_path=cmake-build-debug/src/infinity" sudo sysctl -w vm.mmap_rnd_bits=$old_value - name: Collect thread sanitizer output in cluster test diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 8dddf82295..b5b5dd268a 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -54,10 +54,21 @@ jobs: echo "BUILDER_CONTAINER=${BUILDER_CONTAINER}" >> $GITHUB_ENV echo "CPUS=${CPUS}" >> $GITHUB_ENV TZ=${TZ:-$(readlink -f /etc/localtime | awk -F '/zoneinfo/' '{print $2}')} - INF_DIRECTORY=$PWD - echo "${INF_DIRECTORY}" | sudo tee .tester_env sudo docker pull infiniflow/infinity_builder:centos7_clang18 - sudo docker rm -f -v ${BUILDER_CONTAINER} && sudo docker run --net=host --privileged --cap-add=NET_ADMIN -d --name ${BUILDER_CONTAINER} -e TZ=$TZ -e CMAKE_BUILD_PARALLEL_LEVEL=${CPUS} -v $PWD:/infinity -v /boot:/boot -v /var/run/docker.sock:/var/run/docker.sock --cpus ${CPUS} infiniflow/infinity_builder:centos7_clang18 + sudo docker rm -f -v ${BUILDER_CONTAINER} && sudo docker run --privileged --cap-add=NET_ADMIN -d --name ${BUILDER_CONTAINER} -e TZ=$TZ -e CMAKE_BUILD_PARALLEL_LEVEL=${CPUS} -v $PWD:/infinity -v /boot:/boot -v /var/run/docker.sock:/var/run/docker.sock --cpus ${CPUS} infiniflow/infinity_builder:centos7_clang18 + + - name: Start minio container + if: ${{ !cancelled() && !failure() }} + run: | + MINIO_CONTAINER=minio_$(od -An -N4 -tx4 /dev/urandom | tr -d ' ') + MINIO_DIR=~/minio_data_$(od -An -N4 -tx4 /dev/urandom | tr -d ' ') + echo "MINIO_CONTAINER=${MINIO_CONTAINER}" >> $GITHUB_ENV + echo "MINIO_DIR=${MINIO_DIR}" >> $GITHUB_ENV + sudo docker rm -f -v ${MINIO_CONTAINER} && sudo rm -fr ${MINIO_DIR} && sudo mkdir ${MINIO_DIR} && sudo docker run -d --net=container:${BUILDER_CONTAINER} --name ${MINIO_CONTAINER} -e "MINIO_ROOT_PASSWORD=minioadmin" -e "MINIO_ROOT_USER=minioadmin" -v ${MINIO_DIR}:/data quay.io/minio/minio server /data --console-address ":9006" --address ":9005" && sleep 5s + if ! sudo docker ps --filter "name=${MINIO_CONTAINER}" --filter "status=running" | grep -q ${MINIO_CONTAINER}; then + echo "Minio container is not running" + exit 1 + fi - name: Build debug version if: ${{ !cancelled() && !failure() }} @@ -71,7 +82,7 @@ jobs: if: ${{ !cancelled() && !failure() }} id: run_cluster_test run: | - sudo docker exec ${BUILDER_CONTAINER} bash -c "cd /infinity/ && rm -fr /var/infinity && export INF_DIRECTORY=`cat .tester_env` && echo INF_DIRECTORY=${INF_DIRECTORY} && python3 tools/run_cluster_test.py --infinity_path=cmake-build-debug/src/infinity --infinity_dir=${INF_DIRECTORY} --minio_port=9005 --minio_console_port=9006" + sudo docker exec ${BUILDER_CONTAINER} bash -c "cd /infinity/ && rm -fr /var/infinity && python3 tools/run_cluster_test.py --infinity_path=cmake-build-debug/src/infinity" - name: Collect cluster test output if: ${{ !cancelled() }} @@ -96,19 +107,6 @@ jobs: failure="${{ steps.run_restart_test.outcome == 'failure'}}" sudo python3 scripts/collect_restart_log.py --executable_path=cmake-build-debug/src/infinity --output_dir=${RUNNER_WORKSPACE_PREFIX}/log --failure=${failure} - - name: Start minio container - if: ${{ !cancelled() && !failure() }} - run: | - MINIO_CONTAINER=minio_$(od -An -N4 -tx4 /dev/urandom | tr -d ' ') - MINIO_DIR=~/minio_data_$(od -An -N4 -tx4 /dev/urandom | tr -d ' ') - echo "MINIO_CONTAINER=${MINIO_CONTAINER}" >> $GITHUB_ENV - echo "MINIO_DIR=${MINIO_DIR}" >> $GITHUB_ENV - sudo docker rm -f -v ${MINIO_CONTAINER} && sudo rm -fr ${MINIO_DIR} && sudo mkdir ${MINIO_DIR} && sudo docker run -d --net=container:${BUILDER_CONTAINER} --name ${MINIO_CONTAINER} -e "MINIO_ROOT_PASSWORD=minioadmin" -e "MINIO_ROOT_USER=minioadmin" -v ${MINIO_DIR}:/data quay.io/minio/minio server /data --console-address ":9006" --address ":9005" && sleep 5s - if ! sudo docker ps --filter "name=${MINIO_CONTAINER}" --filter "status=running" | grep -q ${MINIO_CONTAINER}; then - echo "Minio container is not running" - exit 1 - fi - - name: Start infinity debug version with minio if: ${{ !cancelled() && !failure() }} run: | @@ -235,6 +233,18 @@ jobs: TZ=${TZ:-$(readlink -f /etc/localtime | awk -F '/zoneinfo/' '{print $2}')} sudo docker pull infiniflow/infinity_builder:centos7_clang18 sudo docker rm -f -v ${BUILDER_CONTAINER} && sudo docker run --cap-add=NET_ADMIN -d --name ${BUILDER_CONTAINER} -e TZ=$TZ -e CMAKE_BUILD_PARALLEL_LEVEL=${CPUS} -v $PWD:/infinity -v /boot:/boot --cpus ${CPUS} infiniflow/infinity_builder:centos7_clang18 + - name: Start minio container + if: ${{ !cancelled() && !failure() }} + run: | + MINIO_CONTAINER=minio_$(od -An -N4 -tx4 /dev/urandom | tr -d ' ') + MINIO_DIR=~/minio_data_$(od -An -N4 -tx4 /dev/urandom | tr -d ' ') + echo "MINIO_CONTAINER=${MINIO_CONTAINER}" >> $GITHUB_ENV + echo "MINIO_DIR=${MINIO_DIR}" >> $GITHUB_ENV + sudo docker rm -f -v ${MINIO_CONTAINER} && sudo rm -fr ${MINIO_DIR} && sudo mkdir ${MINIO_DIR} && sudo docker run -d --net=container:${BUILDER_CONTAINER} --name ${MINIO_CONTAINER} -e "MINIO_ROOT_PASSWORD=minioadmin" -e "MINIO_ROOT_USER=minioadmin" -v ${MINIO_DIR}:/data quay.io/minio/minio server /data --console-address ":9006" --address ":9005" && sleep 5s + if ! sudo docker ps --filter "name=${MINIO_CONTAINER}" --filter "status=running" | grep -q ${MINIO_CONTAINER}; then + echo "Minio container is not running" + exit 1 + fi - name: Build release version if: ${{ !cancelled() && !failure() }} @@ -244,17 +254,17 @@ jobs: if: ${{ !cancelled() && !failure() }} run: sudo docker exec ${BUILDER_CONTAINER} bash -c "rm -rf /root/.config/pip/pip.conf && cd /infinity/ && pip3 uninstall -y infinity-sdk infinity-embedded-sdk && pip3 install . -v --config-settings=cmake.build-type='RelWithDebInfo' --config-settings=build-dir='cmake-build-release' -i https://pypi.tuna.tsinghua.edu.cn/simple --trusted-host tuna.tsinghua.edu.cn && cd python/infinity_sdk/ && pip3 install . -i https://pypi.tuna.tsinghua.edu.cn/simple --trusted-host tuna.tsinghua.edu.cn && cd ../.." - # - name: Run cluster test - # if: ${{ !cancelled() && !failure() }} - # id: run_cluster_test - # run: | - # sudo docker exec ${BUILDER_CONTAINER} bash -c "cd /infinity/ && rm -fr /var/infinity && export INF_DIRECTORY=`cat .tester_env` && echo INF_DIRECTORY=${INF_DIRECTORY} && python3 tools/run_cluster_test.py --infinity_path=cmake-build-release/src/infinity --infinity_dir=${INF_DIRECTORY} --minio_port=9005 --minio_console_port=9006" - - # - name: Collect cluster test output - # if: ${{ !cancelled() }} - # run: | - # failure="${{ steps.run_cluster_test.outcome == 'failure'}}" - # sudo python3 scripts/collect_cluster_log.py --executable_path=cmake-build-release/src/infinity --log_dir=/var/infinity/ --output_dir=${RUNNER_WORKSPACE_PREFIX}/log --failure=${failure} + - name: Run cluster test + if: ${{ !cancelled() && !failure() }} + id: run_cluster_test + run: | + sudo docker exec ${BUILDER_CONTAINER} bash -c "cd /infinity/ && rm -fr /var/infinity && python3 tools/run_cluster_test.py --infinity_path=cmake-build-release/src/infinity" + + - name: Collect cluster test output + if: ${{ !cancelled() }} + run: | + failure="${{ steps.run_cluster_test.outcome == 'failure'}}" + sudo python3 scripts/collect_cluster_log.py --executable_path=cmake-build-release/src/infinity --log_dir=/var/infinity/ --output_dir=${RUNNER_WORKSPACE_PREFIX}/log --failure=${failure} - name: Prepare restart test data if: ${{ !cancelled() && !failure() }} @@ -284,19 +294,6 @@ jobs: failure="${{ steps.run_pysdk_local_infinity_test.outcome == 'failure'}}" sudo python3 scripts/collect_embedded_log.py --output_dir=${RUNNER_WORKSPACE_PREFIX}/log --failure=${failure} --log_path=/var/infinity/log/infinity.log - - name: Start minio container - if: ${{ !cancelled() && !failure() }} - run: | - MINIO_CONTAINER=minio_$(od -An -N4 -tx4 /dev/urandom | tr -d ' ') - MINIO_DIR=~/minio_data_$(od -An -N4 -tx4 /dev/urandom | tr -d ' ') - echo "MINIO_CONTAINER=${MINIO_CONTAINER}" >> $GITHUB_ENV - echo "MINIO_DIR=${MINIO_DIR}" >> $GITHUB_ENV - sudo docker rm -f -v ${MINIO_CONTAINER} && sudo rm -fr ${MINIO_DIR} && sudo mkdir ${MINIO_DIR} && sudo docker run -d --net=container:${BUILDER_CONTAINER} --name ${MINIO_CONTAINER} -e "MINIO_ROOT_PASSWORD=minioadmin" -e "MINIO_ROOT_USER=minioadmin" -v ${MINIO_DIR}:/data quay.io/minio/minio server /data --console-address ":9006" --address ":9005" && sleep 5s - if ! sudo docker ps --filter "name=${MINIO_CONTAINER}" --filter "status=running" | grep -q ${MINIO_CONTAINER}; then - echo "Minio container is not running" - exit 1 - fi - - name: Start infinity release version with minio if: ${{ !cancelled() && !failure() }} run: | diff --git a/conf/mock_follower.toml b/conf/mock_follower.toml new file mode 100644 index 0000000000..58e8e7602d --- /dev/null +++ b/conf/mock_follower.toml @@ -0,0 +1,58 @@ +[general] +version = "0.5.0" +time_zone = "utc-8" +server_mode = "admin" + +[network] +server_address = "17.0.0.3" +postgres_port = 5434 +http_port = 23822 +client_port = 23819 +connection_pool_size = 128 +peer_ip = "17.0.0.3" +peer_port = 23852 +peer_retry_delay = 0 +peer_retry_count = 0 +peer_connect_timeout = 2000 +peer_recv_timeout = 0 +peer_send_timeout = 0 + +[log] +log_filename = "infinity.log" +log_dir = "/var/infinity/follower/log" +log_to_stdout = false +log_file_max_size = "10GB" +log_file_rotate_count = 10 +log_level = "trace" + +[storage] +persistence_dir = "/var/infinity/follower/persistence" +data_dir = "/var/infinity/follower/data" +optimize_interval = "10s" +cleanup_interval = "60s" +compact_interval = "120s" +mem_index_capacity = 65536 +storage_type = "minio" + +[storage.object_storage] +url = "17.0.0.1:9005" +bucket_name = "infinity" +access_key = "minioadmin" +secret_key = "minioadmin" +enable_https = false + +[buffer] +buffer_manager_size = "4GB" +lru_num = 7 +temp_dir = "/var/infinity/follower/tmp" +memindex_memory_quota = "1GB" + +[wal] +wal_dir = "/var/infinity/follower/wal" +full_checkpoint_interval = "86400s" +delta_checkpoint_interval = "60s" +wal_compact_threshold = "1GB" +wal_flush = "only_write" + +[resource] +resource_dir = "/var/infinity/follower/resource" diff --git a/conf/mock_leader.toml b/conf/mock_leader.toml new file mode 100644 index 0000000000..821aa41da4 --- /dev/null +++ b/conf/mock_leader.toml @@ -0,0 +1,58 @@ +[general] +version = "0.5.0" +time_zone = "utc-8" +server_mode = "admin" + +[network] +server_address = "17.0.0.2" +postgres_port = 5433 +http_port = 23821 +client_port = 23818 +connection_pool_size = 128 +peer_ip = "17.0.0.2" +peer_port = 23851 +peer_retry_delay = 0 +peer_retry_count = 0 +peer_connect_timeout = 2000 +peer_recv_timeout = 0 +peer_send_timeout = 0 + +[log] +log_filename = "infinity.log" +log_dir = "/var/infinity/leader/log" +log_to_stdout = false +log_file_max_size = "10GB" +log_file_rotate_count = 10 +log_level = "trace" + +[storage] +persistence_dir = "/var/infinity/leader/persistence" +data_dir = "/var/infinity/leader/data" +optimize_interval = "10s" +cleanup_interval = "60s" +compact_interval = "120s" +mem_index_capacity = 65536 +storage_type = "minio" + +[storage.object_storage] +url = "17.0.0.1:9005" +bucket_name = "infinity" +access_key = "minioadmin" +secret_key = "minioadmin" +enable_https = false + +[buffer] +buffer_manager_size = "4GB" +lru_num = 7 +temp_dir = "/var/infinity/leader/tmp" +memindex_memory_quota = "1GB" + +[wal] +wal_dir = "/var/infinity/leader/wal" +full_checkpoint_interval = "86400s" +delta_checkpoint_interval = "60s" +wal_compact_threshold = "1GB" +wal_flush = "only_write" + +[resource] +resource_dir = "/var/infinity/leader/resource" diff --git a/python/test_cluster/clear_docker.py b/python/test_cluster/clear_docker.py deleted file mode 100644 index 84645e83c5..0000000000 --- a/python/test_cluster/clear_docker.py +++ /dev/null @@ -1,37 +0,0 @@ -import docker -import shutil -import os -import argparse - -if __name__ == "__main__": - parser = argparse.ArgumentParser(description="Clear docker containers") - parser.add_argument( - "--docker", - action="store_true", - default=False, - ) - - use_docker = parser.parse_args().docker - - # sudo docker rm -f $(sudo docker ps -a -q -f name=minio); - # sudo docker rm -f $(sudo docker ps -a -q -f name=infinity_build); - # sudo docker network rm $(sudo docker network ls -q -f name=infinity_network) - - client = docker.from_env() - - if use_docker: - for container in client.containers.list(all=True, filters={"name": "minio_docker"}): - container.remove(force=True) - for container in client.containers.list( - all=True, filters={"name": "infinity_build"} - ): - container.remove(force=True) - for network in client.networks.list(filters={"name": "infinity_network"}): - network.remove() - else: - for container in client.containers.list(all=True, filters={"name": "minio_host"}): - container.remove(force=True) - - dir_path = "./minio" - if os.path.exists(dir_path) and os.path.isdir(dir_path): - shutil.rmtree(dir_path) \ No newline at end of file diff --git a/python/test_cluster/conftest.py b/python/test_cluster/conftest.py index e2dd953503..d08acf71e9 100644 --- a/python/test_cluster/conftest.py +++ b/python/test_cluster/conftest.py @@ -1,6 +1,5 @@ import pytest -from infinity_cluster import InfinityCluster, MinioParams -from docker_infinity_cluster import DockerInfinityCluster +from infinity_cluster import InfinityCluster from mocked_infinity_cluster import MockInfinityCluster @@ -10,28 +9,6 @@ def pytest_addoption(parser): action="store", default="cmake-build-debug/src/infinity", ) - parser.addoption( - "--minio_dir", - action="store", - default="minio", - ) - parser.addoption( - "--minio_port", - action="store", - default=9000, - ) - parser.addoption( - "--minio_console_port", - action="store", - default=9001, - ) - parser.addoption( - "--infinity_dir", - action="store", - required=True, - help="Path to infinity directory. For local test, $pwd is ok", - ) - parser.addoption("--docker", action="store_true", default=False) parser.addoption( "--use_sudo", action="store_true", @@ -40,56 +17,19 @@ def pytest_addoption(parser): ) -def pytest_configure(config): - config.addinivalue_line( - "markers", "docker: mark test to run only when --docker option is provided" - ) - - -def pytest_collection_modifyitems(config, items): - if config.getoption("--docker"): - return # do not skip docker test - skip_docker = pytest.mark.skip(reason="need --docker option to run") - for item in items: - if "docker" in item.keywords: - print(f"skip {item.name}") - item.add_marker(skip_docker) - - def pytest_generate_tests(metafunc): infinity_path = metafunc.config.getoption("infinity_path") - minio_dir = metafunc.config.getoption("minio_dir") - minio_port = metafunc.config.getoption("minio_port") - minio_console_port = metafunc.config.getoption("minio_console_port") - minio_params = MinioParams(minio_dir, minio_port, minio_console_port) - infinity_dir = metafunc.config.getoption("infinity_dir") use_sudo = metafunc.config.getoption("use_sudo") - if len(infinity_dir) == 0: - # raise ValueError("Please provide a valid infinity_dir") - pass - print("infinity_dir: ", infinity_dir) - # print(metafunc.fixturenames) - test_name = metafunc.function.__name__ - if "docker_cluster" in metafunc.fixturenames: - # skip if docker is in option and the testcase is marked with docker - if not metafunc.config.getoption("--docker"): - return - docker_infinity_cluster = DockerInfinityCluster( - infinity_path, minio_params=minio_params, infinity_dir=infinity_dir - ) - metafunc.parametrize("docker_cluster", [docker_infinity_cluster]) - elif "cluster" in metafunc.fixturenames: - infinity_cluster = InfinityCluster( - infinity_path, minio_params=minio_params, test_name=test_name - ) + + if "cluster" in metafunc.fixturenames: + infinity_cluster = InfinityCluster(infinity_path, test_name=test_name) metafunc.parametrize("cluster", [infinity_cluster]) elif "mock_cluster" in metafunc.fixturenames: mock_infinity_cluster = MockInfinityCluster( infinity_path, - minio_params=minio_params, test_name=test_name, use_sudo=use_sudo, ) diff --git a/python/test_cluster/docker_infinity_cluster.py b/python/test_cluster/docker_infinity_cluster.py deleted file mode 100644 index 5e34ffe7bc..0000000000 --- a/python/test_cluster/docker_infinity_cluster.py +++ /dev/null @@ -1,300 +0,0 @@ -import json -import logging -import os -import random -import string -import sys -import time -from numpy import dtype -import pandas as pd -import tomli -import tomli_w -import docker -from infinity_cluster import ( - BaseInfinityRunner, - InfinityCluster, - MinioParams, - convert_request_to_curl, -) -from mocked_infinity_cluster import convert_request_to_curl -import shutil - -current_dir = os.path.dirname(os.path.abspath(__file__)) -parent_dir = os.path.dirname(current_dir) -if parent_dir not in sys.path: - sys.path.insert(0, parent_dir) -from infinity_http import http_network_util, infinity_http - - -class docker_http_response: - def __init__(self, json_output): - self.json_output = json_output - - def json(self): - return self.json_output - - -class docker_http_network(http_network_util): - def __init__(self, container, *args, **kwargs): - super().__init__(*args, **kwargs) - self.container = container - - def request(self, url, method, header={}, data={}): - if header is None: - header = {} - url = self.base_url + url - logging.debug("url: " + url) - - cmd = convert_request_to_curl(method, header, data, url) - print(cmd) - exit_code, output = self.container.exec_run(cmd) - print(output) - assert exit_code is None or exit_code == 0 - try: - return docker_http_response(json.loads(output)) - except json.JSONDecodeError as e: - logging.error(f"Failed to decode JSON response: {e}") - raise - - def raise_exception(self, resp, expect={}): - # todo: handle curl exception - pass - - -class DockerInfinityRunner(BaseInfinityRunner): - def __init__( - self, - container: docker.models.containers.Container, - mock_ip: str, - minio_ip: str | None, - *args, - **kwargs, - ): - self.minio_ip = minio_ip - self.container = container - self.mock_ip = mock_ip - super().__init__(*args, **kwargs) - - def init(self, config_path: str | None): - if config_path: - if self.config_path is not None and os.path.exists(self.config_path): - os.remove(self.config_path) - self.config_path = config_path - self.load_config() - run_cmds = " && ".join( - [ - "cd /infinity", - f"{self.executable_path} --config={self.config_path} 2>&1", - ] - ) - print(run_cmds) - exit_code, output = self.container.exec_run( - f"bash -c '{run_cmds}'", detach=True - ) - print(f"init infinity: {output}") - assert exit_code is None or exit_code == 0 - - def uninit(self): - timeout = 60 - run_cmds = " && ".join( - [ - "cd /infinity", - f"pid=$(pgrep -f infinity || true)", - f"echo $pid", - f"bash scripts/timeout_kill.sh {timeout} $pid", - ] - ) - print(run_cmds) - exit_code, output = self.container.exec_run(f"bash -c '{run_cmds}'") - print(f"uninit infinity: {output}") - # assert exit_code is None or exit_code == 0 - # self.container.stop() - # self.container.remove(force=True, v=True) - - if os.path.exists(self.config_path): - os.remove(self.config_path) - - def add_client(self, http_addr: str): - http_addr = http_addr.replace("http://", "") - http_ip, http_port = http_addr.split(":") - mock_addr = f"http://{self.mock_ip}:{http_port}" - print(f"add client: {mock_addr}") - net=docker_http_network(self.container, mock_addr) - net.set_retry() - self.client = infinity_http(net = net) - - def peer_uri(self): - peer_port = self.network_config["peer_port"] - return self.mock_ip, peer_port - - def load_config(self): - if not os.path.basename(self.config_path).startswith("mock_"): - config_dir, config_filename = os.path.split(self.config_path) - mock_config_path = os.path.join(config_dir, f"mock_{config_filename}") - shutil.copyfile(self.config_path, mock_config_path) - self.config_path = mock_config_path - - with open(self.config_path, "rb") as f: - config = tomli.load(f) - self.network_config = config["network"] - - if self.minio_ip is not None: - minio_url = config["storage"]["object_storage"]["url"] - minio_ip, minio_port = minio_url.split(":") - - new_minio_url = f"{self.minio_ip}:{minio_port}" - config["storage"]["object_storage"]["url"] = new_minio_url - - config["network"]["server_address"] = self.mock_ip - config["network"]["peer_ip"] = self.mock_ip - - with open(self.config_path, "wb") as f: - tomli_w.dump(config, f) - - -class DockerInfinityCluster(InfinityCluster): - def __init__( - self, - executable_path: str, - *, - minio_params: MinioParams = None, - infinity_dir: str, - ): - docker_client = docker.from_env() - network_name = "infinity_network" - try: - self.network = docker_client.networks.get(network_name) - except docker.errors.NotFound: - self.network = docker_client.networks.create( - network_name, - driver="bridge", - ) - super().__init__(executable_path, minio_params=minio_params) - self.infinity_dir = infinity_dir - - image_name = "infiniflow/infinity_builder:centos7_clang18" - self.image_name = image_name - - if minio_params is not None: - add = self.add_minio(minio_params) - if add: - self.network.connect(self.minio_container) - info = docker_client.api.inspect_network(self.network.id) - minio_ip = info["Containers"][self.minio_container.id]["IPv4Address"] - minio_ip = minio_ip.split("/")[0] - self.minio_ip = minio_ip - self.minio_params = minio_params - - def add_minio(self, minio_params: MinioParams): - minio_image_name = "quay.io/minio/minio" - - minio_cmd = f'server /data --console-address ":{minio_params.minio_port}"' - docker_client = docker.from_env() - kargs = {} - container_name = "minio_docker" - - try: - self.minio_container = docker_client.containers.get(container_name) - self.minio_container.start() - except docker.errors.NotFound: - self.minio_container = docker_client.containers.run( - image=minio_image_name, - name=container_name, - detach=True, - environment=[ - "MINIO_ROOT_PASSWORD=minioadmin", - "MINIO_ROOT_USER=minioadmin", - ], - volumes=[f"{minio_params.minio_dir}:/data"], - command=minio_cmd, - **kargs, - ) - self.network.connect(self.minio_container) - info = docker_client.api.inspect_network(self.network.id) - minio_ip = info["Containers"][self.minio_container.id]["IPv4Address"] - minio_ip = minio_ip.split("/")[0] - self.minio_ip = minio_ip - self.minio_params = minio_params - - def clear(self): - super().clear() - for runner in self.runners.values(): - runner.uninit() - # self.network.remove() - - def add_node(self, node_name: str, config_path: str): - if node_name in self.runners: - raise ValueError(f"Node {node_name} already exists in the cluster.") - container_name, cpus, tz = self.__init_docker_params() - pwd = self.infinity_dir - print(f"pwd: {pwd}") - docker_client = docker.from_env() - - try: - container = docker_client.containers.get(container_name) - except docker.errors.NotFound: - container = docker_client.containers.run( - image=self.image_name, - name=container_name, - detach=True, - cpuset_cpus=f"0-{cpus - 1}", - volumes=["/boot:/boot"], - mounts=[ - docker.types.Mount(target="/infinity", source=pwd, type="bind") - ], - environment=[f"TZ={tz}"], - ) - - try: - self.network.connect(container) - except docker.errors.APIError as e: - pass - info = docker_client.api.inspect_network(self.network.id) - # print(info) - mock_ip = info["Containers"][container.id]["IPv4Address"] - mock_ip = mock_ip.split("/")[0] - - runner = DockerInfinityRunner( - container, - mock_ip, - self.minio_ip, - node_name, - self.executable_path, - config_path, - ) - self.runners[node_name] = runner - - def remove_node(self, node_name: str): - if node_name not in self.runners: - raise ValueError(f"Node {node_name} not found in the cluster.") - cur_runner: DockerInfinityRunner = self.runners[node_name] - self.network.disconnect(cur_runner.container) - cur_runner.uninit() - del self.runners[node_name] - - def disconnect(self, node_name: str): - if node_name not in self.runners: - raise ValueError(f"Node {node_name} not found in the cluster.") - cur_runner: DockerInfinityRunner = self.runners[node_name] - self.network.disconnect(cur_runner.container) - - def reconnect(self, node_name: str): - if node_name not in self.runners: - raise ValueError(f"Node {node_name} not found in the cluster.") - cur_runner: DockerInfinityRunner = self.runners[node_name] - self.network.connect(cur_runner.container) - docker_client = docker.from_env() - info = docker_client.api.inspect_network(self.network.id) - # print(info) - mock_ip = info["Containers"][cur_runner.container.id]["IPv4Address"] - cur_runner.mock_ip = mock_ip - - def __init_docker_params(self): - container_name = f"infinity_build_{len(self.runners)}" - cpus = os.cpu_count() - tz = os.readlink("/etc/localtime").split("/zoneinfo/")[1] - return container_name, cpus, tz - - -if __name__ == "__main__": - pass diff --git a/python/test_cluster/infinity_cluster.py b/python/test_cluster/infinity_cluster.py index f15257c18d..420214a132 100644 --- a/python/test_cluster/infinity_cluster.py +++ b/python/test_cluster/infinity_cluster.py @@ -33,13 +33,6 @@ def is_port_in_use(port: int) -> bool: return s.connect_ex(("localhost", port)) == 0 -class MinioParams: - def __init__(self, minio_dir: str, minio_port: int, minio_console_port: int): - self.minio_dir = minio_dir - self.minio_port = minio_port - self.minio_console_port = minio_console_port - - class BaseInfinityRunner: def __init__( @@ -161,14 +154,13 @@ def load_config(self): class InfinityCluster: def __init__( - self, executable_path: str, *, minio_params: MinioParams, test_name: str = None + self, executable_path: str, *, test_name: str = None ): self.executable_path = executable_path self.runners: dict[str, InfinityRunner] = {} self.leader_runner: InfinityRunner | None = None self.leader_name = None self.logger_name = test_name - self.minio_params = minio_params def _log_filename(self): if self.logger_name is None: @@ -190,7 +182,6 @@ def init_logger(self, logger_name: str): def __enter__(self): self.init_logger(self.logger_name) - self.add_minio(self.minio_params) return self def __exit__(self, exc_type, exc_val, exc_tb): @@ -218,9 +209,6 @@ def clear_log(self): if os.path.exists(log_file): os.remove(log_file) - # if self.minio_container is not None: - # self.minio_container.remove(force=True, v=True) - def add_node(self, node_name: str, config_path: str, init=True): runner = InfinityRunner( node_name, self.executable_path, config_path, init, self.logger @@ -242,33 +230,6 @@ def add_node(self, node_name: str, config_path: str, init=True): raise ValueError(f"Node {node_name} already exists in the cluster.") self.runners[node_name] = runner - def add_minio(self, minio_params: MinioParams): - minio_image_name = "quay.io/minio/minio" - - minio_cmd = f'server /data --address ":{minio_params.minio_port}" --console-address ":{minio_params.minio_console_port}"' - docker_client = docker.from_env() - container_name = "minio_host" - - try: - self.minio_container = docker_client.containers.get(container_name) - print(self.minio_container) - self.minio_container.start() - self.logger.debug(f"Minio container {container_name} already exists.") - except docker.errors.NotFound: - self.minio_container = docker_client.containers.run( - image=minio_image_name, - name=container_name, - detach=True, - environment=[ - "MINIO_ROOT_PASSWORD=minioadmin", - "MINIO_ROOT_USER=minioadmin", - ], - volumes=[f"{minio_params.minio_dir}:/data"], - command=minio_cmd, - network="host", - ) - self.logger.debug(f"Minio container {container_name} created.") - def set_standalone(self, node_name: str): if self.leader_runner is not None and self.leader_name == node_name: self.leader_name = None diff --git a/python/test_cluster/mocked_infinity_cluster.py b/python/test_cluster/mocked_infinity_cluster.py index 393cc8cd97..9fa45d0b03 100644 --- a/python/test_cluster/mocked_infinity_cluster.py +++ b/python/test_cluster/mocked_infinity_cluster.py @@ -10,7 +10,6 @@ from infinity_cluster import ( InfinityRunner, InfinityCluster, - MinioParams, convert_request_to_curl, ) import os @@ -146,14 +145,11 @@ def __init__( self, executable_path: str, *, - minio_params: MinioParams = None, test_name: str = None, use_sudo: bool = False, ): self.use_sudo = use_sudo - super().__init__( - executable_path, minio_params=minio_params, test_name=test_name - ) + super().__init__(executable_path, test_name=test_name) self.ns_prefix = "ns" self.bridge_name = "br0" self.mock_ip_prefix = "17.0.0." diff --git a/python/test_cluster/test_basic.py b/python/test_cluster/test_basic.py index 3955d9aba2..f3fb89ad1e 100644 --- a/python/test_cluster/test_basic.py +++ b/python/test_cluster/test_basic.py @@ -3,7 +3,6 @@ import pytest from infinity_cluster import InfinityCluster from mocked_infinity_cluster import MockInfinityCluster -from docker_infinity_cluster import DockerInfinityCluster import time @@ -47,53 +46,53 @@ def test_0(cluster: InfinityCluster): # with cluster: # cluster.add_node("node1", "conf/leader.toml") # cluster.add_node("node2", "conf/follower.toml") -# + # cluster.set_leader("node1") # cluster.set_follower("node2") -# + # time.sleep(1) -# + # cluster.disconnect("node2") # time.sleep(0.1) # cluster.reconnect("node2") -# + # cluster.block_peer_net("node2") # time.sleep(0.1) # cluster.restore_peer_net("node2") -# + # time.sleep(1) -# + # cluster.remove_node("node2") # cluster.remove_node("node1") -@pytest.mark.docker -def test_docker(docker_cluster: DockerInfinityCluster): - cluster = docker_cluster +# @pytest.mark.docker +# def test_docker(docker_cluster: DockerInfinityCluster): +# cluster = docker_cluster - cluster.add_node("node1", "conf/leader.toml") - cluster.add_node("node2", "conf/follower.toml") +# cluster.add_node("node1", "conf/leader.toml") +# cluster.add_node("node2", "conf/follower.toml") - print("init nodes") +# print("init nodes") - cluster.set_leader("node1") - cluster.set_follower("node2") +# cluster.set_leader("node1") +# cluster.set_follower("node2") - time.sleep(1) +# time.sleep(1) - cluster.disconnect("node2") - time.sleep(0.1) - cluster.reconnect("node2") +# cluster.disconnect("node2") +# time.sleep(0.1) +# cluster.reconnect("node2") - res = cluster.client("node1").list_databases() - print(res.db_names) +# res = cluster.client("node1").list_databases() +# print(res.db_names) - time.sleep(1) +# time.sleep(1) - print("remove nodes") +# print("remove nodes") - cluster.clear() +# cluster.clear() -# n1 admin, n2 admin, n2 connect n1 as follower-> failed, n2 connect n1 as learner-> failed -# n1 leader, n2 follower, n3 connect n2 as follower-> failed, n3 connect n2 as learner-> failed -# +# # n1 admin, n2 admin, n2 connect n1 as follower-> failed, n2 connect n1 as learner-> failed +# # n1 leader, n2 follower, n3 connect n2 as follower-> failed, n3 connect n2 as learner-> failed +# # diff --git a/python/test_cluster/test_insert.py b/python/test_cluster/test_insert.py index 1ad7559806..ff6ccce8dc 100644 --- a/python/test_cluster/test_insert.py +++ b/python/test_cluster/test_insert.py @@ -1,7 +1,6 @@ from numpy import dtype import pandas as pd from infinity_cluster import InfinityCluster -from docker_infinity_cluster import DockerInfinityCluster import pytest import time from infinity.errors import ErrorCode @@ -63,139 +62,132 @@ def test_insert_11(self, cluster: InfinityCluster): # def test_insert_12(self, mock_cluster: MockInfinityCluster): # self.__test_inner_1(mock_cluster) - @pytest.mark.docker - def test_insert_13(self, docker_cluster: DockerInfinityCluster): - self.__test_inner_1(docker_cluster) - # read/write when leader/follower is disconnected - @pytest.mark.docker - def test_insert_2(self, docker_cluster: DockerInfinityCluster): - with docker_cluster: - docker_cluster.add_node("node1", "conf/leader.toml") - docker_cluster.add_node("node2", "conf/follower.toml") - print("init nodes") - time.sleep(1) - docker_cluster.set_leader("node1") - docker_cluster.set_follower("node2") - - time.sleep(1) - print("insert in node1") - - infinity1 = docker_cluster.client("node1") - - table_name = "table2" - db1 = infinity1.get_database("default_db") - db1.drop_table(table_name, ConflictType.Ignore) - table1 = db1.create_table( - table_name, {"c1": {"type": "int"}, "c2": {"type": "vector,4,float"}} - ) - table1.insert([{"c1": 1, "c2": [1.0, 2.0, 3.0, 4.0]}]) - - res_gt = pd.DataFrame( - { - "c1": (1), - "c2": ([[1.0, 2.0, 3.0, 4.0]]), - } - ).astype({"c1": dtype("int32"), "c2": dtype("object")}) - - time.sleep(1) - print("select in node2") - - infinity2 = docker_cluster.client("node2") - db2 = infinity2.get_database("default_db") - table2 = db2.get_table(table_name) - res, extra_result = table2.output(["*"]).to_df() - pd.testing.assert_frame_equal(res, res_gt) - - docker_cluster.disconnect("node2") - try: - - @timeout_decorator.timeout(1) - def noreturn_request(): - table1.insert([{"c1": 2, "c2": [5.0, 6.0, 7.0, 8.0]}]) - - noreturn_request() - - except Exception as e: - pass - docker_cluster.reconnect("node2") - - docker_cluster.disconnect("node1") - res, extra_result = table2.output(["*"]).to_df() - pd.testing.assert_frame_equal(res, res_gt) - docker_cluster.reconnect("node1") - - db1.drop_table(table_name) - - @pytest.mark.skip("Bug") - @pytest.mark.docker - def test_insert_3(self, docker_cluster: DockerInfinityCluster): - try: - docker_cluster.add_node("node1", "conf/leader.toml") - docker_cluster.add_node("node2", "conf/follower.toml") - print("init nodes") - time.sleep(1) - docker_cluster.set_leader("node1") - docker_cluster.set_follower("node2") - - time.sleep(1) - print("insert in node1") - - infinity1 = docker_cluster.client("node1") - - table_name = "table2" - db1 = infinity1.get_database("default_db") - db1.drop_table(table_name, ConflictType.Ignore) - table1 = db1.create_table( - table_name, {"c1": {"type": "int"}, "c2": {"type": "vector,4,float"}} - ) - table1.insert([{"c1": 1, "c2": [1.0, 2.0, 3.0, 4.0]}]) - time.sleep(1) - - # reconnect follower - docker_cluster.remove_node("node2") - docker_cluster.add_node("node2", "conf/follower.toml") - time.sleep(1) - docker_cluster.set_follower("node2") - - table1.insert([{"c1": 2, "c2": [5.0, 6.0, 7.0, 8.0]}]) - res_gt = pd.DataFrame( - { - "c1": (1, 2), - "c2": ([[1.0, 2.0, 3.0, 4.0]], [[5.0, 6.0, 7.0, 8.0]]), - } - ).astype({"c1": dtype("int32"), "c2": dtype("object")}) - - infinity2 = docker_cluster.client("node2") - db2 = infinity2.get_database("default_db") - table2 = db2.get_table(table_name) - res, extra_result = table2.output(["*"]).to_df() - pd.testing.assert_frame_equal(res, res_gt) - - # reconnect leader - docker_cluster.remove_node("node1") - docker_cluster.add_node("node1", "conf/leader.toml") - time.sleep(1) - docker_cluster.set_leader("node1") - - table1.insert([{"c1": 3, "c2": [9.0, 10.0, 11.0, 12.0]}]) - res_gt = pd.DataFrame( - { - "c1": (1, 2, 3), - "c2": ( - [[1.0, 2.0, 3.0, 4.0]], - [[5.0, 6.0, 7.0, 8.0]], - [[9.0, 10.0, 11.0, 12.0]], - ), - } - ).astype({"c1": dtype("int32"), "c2": dtype("object")}) - res, extra_result = table2.output(["*"]).to_df() - pd.testing.assert_frame_equal(res, res_gt) - - db1.drop_table(table_name) - except Exception as e: - print(e) - docker_cluster.clear() - raise - else: - docker_cluster.clear() + # def test_insert_2(self, mock_cluster: MockInfinityCluster): + # with mock_cluster: + # mock_cluster.add_node("node1", "conf/leader.toml") + # mock_cluster.add_node("node2", "conf/follower.toml") + # print("init nodes") + # time.sleep(1) + # mock_cluster.set_leader("node1") + # mock_cluster.set_follower("node2") + + # time.sleep(1) + # print("insert in node1") + + # infinity1 = mock_cluster.client("node1") + + # table_name = "table2" + # db1 = infinity1.get_database("default_db") + # db1.drop_table(table_name, ConflictType.Ignore) + # table1 = db1.create_table( + # table_name, {"c1": {"type": "int"}, "c2": {"type": "vector,4,float"}} + # ) + # table1.insert([{"c1": 1, "c2": [1.0, 2.0, 3.0, 4.0]}]) + + # res_gt = pd.DataFrame( + # { + # "c1": (1), + # "c2": ([[1.0, 2.0, 3.0, 4.0]]), + # } + # ).astype({"c1": dtype("int32"), "c2": dtype("object")}) + + # time.sleep(1) + # print("select in node2") + + # infinity2 = mock_cluster.client("node2") + # db2 = infinity2.get_database("default_db") + # table2 = db2.get_table(table_name) + # res, extra_result = table2.output(["*"]).to_df() + # pd.testing.assert_frame_equal(res, res_gt) + + # mock_cluster.disconnect("node2") + # try: + + # @timeout_decorator.timeout(1) + # def noreturn_request(): + # table1.insert([{"c1": 2, "c2": [5.0, 6.0, 7.0, 8.0]}]) + + # noreturn_request() + + # except Exception as e: + # pass + # mock_cluster.reconnect("node2") + + # mock_cluster.disconnect("node1") + # res, extra_result = table2.output(["*"]).to_df() + # pd.testing.assert_frame_equal(res, res_gt) + # mock_cluster.reconnect("node1") + + # db1.drop_table(table_name) + + # def test_insert_3(self, mock_cluster: MockInfinityCluster): + # try: + # mock_cluster.add_node("node1", "conf/leader.toml") + # mock_cluster.add_node("node2", "conf/follower.toml") + # print("init nodes") + # time.sleep(1) + # mock_cluster.set_leader("node1") + # mock_cluster.set_follower("node2") + + # time.sleep(1) + # print("insert in node1") + + # infinity1 = mock_cluster.client("node1") + + # table_name = "table2" + # db1 = infinity1.get_database("default_db") + # db1.drop_table(table_name, ConflictType.Ignore) + # table1 = db1.create_table( + # table_name, {"c1": {"type": "int"}, "c2": {"type": "vector,4,float"}} + # ) + # table1.insert([{"c1": 1, "c2": [1.0, 2.0, 3.0, 4.0]}]) + # time.sleep(1) + + # # reconnect follower + # mock_cluster.remove_node("node2") + # mock_cluster.add_node("node2", "conf/follower.toml") + # time.sleep(1) + # mock_cluster.set_follower("node2") + + # table1.insert([{"c1": 2, "c2": [5.0, 6.0, 7.0, 8.0]}]) + # res_gt = pd.DataFrame( + # { + # "c1": (1, 2), + # "c2": ([[1.0, 2.0, 3.0, 4.0]], [[5.0, 6.0, 7.0, 8.0]]), + # } + # ).astype({"c1": dtype("int32"), "c2": dtype("object")}) + + # infinity2 = mock_cluster.client("node2") + # db2 = infinity2.get_database("default_db") + # table2 = db2.get_table(table_name) + # res, extra_result = table2.output(["*"]).to_df() + # pd.testing.assert_frame_equal(res, res_gt) + + # # reconnect leader + # mock_cluster.remove_node("node1") + # mock_cluster.add_node("node1", "conf/leader.toml") + # time.sleep(1) + # mock_cluster.set_leader("node1") + + # table1.insert([{"c1": 3, "c2": [9.0, 10.0, 11.0, 12.0]}]) + # res_gt = pd.DataFrame( + # { + # "c1": (1, 2, 3), + # "c2": ( + # [[1.0, 2.0, 3.0, 4.0]], + # [[5.0, 6.0, 7.0, 8.0]], + # [[9.0, 10.0, 11.0, 12.0]], + # ), + # } + # ).astype({"c1": dtype("int32"), "c2": dtype("object")}) + # res, extra_result = table2.output(["*"]).to_df() + # pd.testing.assert_frame_equal(res, res_gt) + + # db1.drop_table(table_name) + # except Exception as e: + # print(e) + # mock_cluster.clear() + # raise + # else: + # mock_cluster.clear() diff --git a/tools/run_cluster_test.py b/tools/run_cluster_test.py index 64f3cf9dbe..4611315480 100644 --- a/tools/run_cluster_test.py +++ b/tools/run_cluster_test.py @@ -12,16 +12,6 @@ type=str, default="./build/Debug/src/infinity", ) - parser.add_argument( - "--docker", - action="store_true", - default=False, - ) - parser.add_argument( - "--infinity_dir", - type=str, - required=True, - ) parser.add_argument( "--test_case", type=str, @@ -33,24 +23,10 @@ default=False, help="Use sudo to run command", ) - parser.add_argument( - "--minio_port", - type=int, - default=9000, - ) - parser.add_argument( - "--minio_console_port", - type=int, - default=9001, - ) args = parser.parse_args() infinity_path = args.infinity_path - docker = args.docker - infinity_dir = args.infinity_dir use_sudo = args.use_sudo - minio_port = args.minio_port - minio_console_port = args.minio_console_port current_path = os.getcwd() python_test_dir = current_path + "/python" @@ -72,24 +48,8 @@ "-s", "-m", "not slow", - f"--infinity_dir={infinity_dir}", - f"--minio_port={minio_port}", - f"--minio_console_port={minio_console_port}", ] if use_sudo: cmd.append("--use_sudo") - if docker: - cmd.append("--docker") process = subprocess.Popen(cmd) process.wait() - - cmd = [ - python_executable, - f"{python_test_dir}/test_cluster/clear_docker.py", - ] - if docker: - cmd.append("--docker") - process2 = subprocess.run(cmd) - if process.returncode != 0: - print(f"An error occurred: {process.stderr}") - sys.exit(-1)