tests/python_client/chaos/checker.py (20 additions, 29 deletions)
@@ -390,7 +390,7 @@ def __init__(self, collection_name=None, partition_name=None, shards_num=2, dim=
         enable_dynamic_field = kwargs.get("enable_dynamic_field", True)
         schema = cf.gen_all_datatype_collection_schema(dim=dim, enable_struct_array_field=enable_struct_array_field, enable_dynamic_field=enable_dynamic_field) if schema is None else schema

-        log.info(f"schema: {schema}")
+        log.debug(f"schema: {schema}")
         self.schema = schema
         self.dim = cf.get_dim_by_schema(schema=schema)
         self.int64_field_name = cf.get_int64_field_name(schema=schema)
@@ -601,8 +601,7 @@ def get_schema(self):

     def insert_data(self, nb=constants.DELTA_PER_INS, partition_name=None):
         partition_name = self.p_name if partition_name is None else partition_name
-        client_schema = self.milvus_client.describe_collection(collection_name=self.c_name)
-        data = cf.gen_row_data_by_schema(nb=nb, schema=client_schema)
+        data = cf.gen_row_data_by_schema(nb=nb, schema=self.get_schema())
         ts_data = []
         for i in range(nb):
             time.sleep(0.001)
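
> The change above is the pattern repeated throughout this diff: ad-hoc `self.milvus_client.describe_collection(...)` calls and reads of the possibly stale `self.schema` attribute are funneled through the existing `get_schema()` helper (the enclosing method in this hunk's header). The helper's body is outside the diff, so the following is only a minimal sketch of what it plausibly does, assuming it centralizes the server-side describe so details such as struct_fields survive for every caller; the `_schema_cache` attribute is a hypothetical name, not code from this PR:

```python
def get_schema(self):
    # Hypothetical sketch -- the real helper is not shown in this diff.
    # Centralize the server-side describe_collection call so every caller
    # sees the same schema (including struct_fields / dynamic-field info).
    if getattr(self, "_schema_cache", None) is None:
        self._schema_cache = self.milvus_client.describe_collection(
            collection_name=self.c_name
        )
    return self._schema_cache
```

> Whether or not the real helper caches, routing every caller through one method is what removes the duplicated describe calls seen in the deleted lines.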
Expand Down Expand Up @@ -807,7 +806,7 @@ def run_task(self):
result = self.milvus_client.has_collection(collection_name=new_collection_name)
if result:
self.c_name = new_collection_name
data = cf.gen_row_data_by_schema(nb=1, schema=self.schema)
data = cf.gen_row_data_by_schema(nb=1, schema=self.get_schema())
self.milvus_client.insert(collection_name=new_collection_name, data=data)
return res, result

Expand Down Expand Up @@ -1104,7 +1103,7 @@ def keep_running(self):
try:
self.milvus_client.insert(
collection_name=self.c_name,
data=cf.gen_row_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=self.schema),
data=cf.gen_row_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=self.get_schema()),
timeout=timeout
)
insert_result = True
Expand Down Expand Up @@ -1162,14 +1161,14 @@ def run_task(self):
try:
self.milvus_client.insert(
collection_name=self.c_name,
data=cf.gen_row_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=self.schema),
data=cf.gen_row_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=self.get_schema()),
timeout=timeout
)
result = True
except Exception:
result = False
res, result = self.flush()
return res, result
res, result = self.flush()
return res, result
except Exception as e:
log.error(f"run task error: {e}")
return str(e), False

def keep_running(self):
while self._keep_running:
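
> Besides the schema lookup, this hunk dedents the `flush()` call out of the inner `except` block: previously the flush only ran when the insert raised, so a successful insert skipped it entirely. A simplified, self-contained sketch of the control-flow fix; `do_insert` and `do_flush` are hypothetical stand-ins for the Milvus insert and flush calls:

```python
def run_once_old(do_insert, do_flush):
    try:
        do_insert()
        result = True
    except Exception:
        result = False
        res, result = do_flush()   # bug: only reached when insert fails
        return res, result         # success path returns None

def run_once_new(do_insert, do_flush):
    try:
        do_insert()
        result = True
    except Exception:
        result = False
    res, result = do_flush()       # dedented: always reached
    return res, result
```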
Expand Down Expand Up @@ -1239,9 +1238,7 @@ def __init__(self, collection_name=None, flush=False, shards_num=2, schema=None)

@trace()
def insert_entities(self):
# Use describe_collection directly to preserve struct_fields information
schema = self.milvus_client.describe_collection(self.c_name)
data = cf.gen_row_data_by_schema(nb=constants.DELTA_PER_INS, schema=schema)
data = cf.gen_row_data_by_schema(nb=constants.DELTA_PER_INS, schema=self.get_schema())
rows = len(data)
ts_data = []
for i in range(constants.DELTA_PER_INS):
Expand Down Expand Up @@ -1327,8 +1324,7 @@ def __init__(self, collection_name=None, flush=False, shards_num=2, schema=None)
self.file_name = f"/tmp/ci_logs/insert_data_{uuid.uuid4()}.parquet"

def insert_entities(self):
schema = self.get_schema()
data = cf.gen_row_data_by_schema(nb=constants.DELTA_PER_INS, schema=schema)
data = cf.gen_row_data_by_schema(nb=constants.DELTA_PER_INS, schema=self.get_schema())
ts_data = []
for i in range(constants.DELTA_PER_INS):
time.sleep(0.001)
Expand Down Expand Up @@ -1386,8 +1382,7 @@ def __init__(self, collection_name=None, shards_num=2, schema=None):
if collection_name is None:
collection_name = cf.gen_unique_str("UpsertChecker_")
super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
schema = self.milvus_client.describe_collection(collection_name=self.c_name)
self.data = cf.gen_row_data_by_schema(nb=constants.DELTA_PER_INS, schema=schema)
self.data = cf.gen_row_data_by_schema(nb=constants.DELTA_PER_INS, schema=self.get_schema())

@trace()
def upsert_entities(self):
@@ -1405,8 +1400,7 @@ def run_task(self):
         # half of the data is upsert, the other half is insert
         rows = len(self.data)
         pk_old = [d[self.int64_field_name] for d in self.data[:rows // 2]]
-        schema = self.milvus_client.describe_collection(collection_name=self.c_name)
-        self.data = cf.gen_row_data_by_schema(nb=constants.DELTA_PER_INS, schema=schema)
+        self.data = cf.gen_row_data_by_schema(nb=constants.DELTA_PER_INS, schema=self.get_schema())
         pk_new = [d[self.int64_field_name] for d in self.data[rows // 2:]]
         pk_update = pk_old + pk_new
         for i in range(rows):
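
> For context, the unchanged lines around this hunk implement the checker's half-update/half-insert strategy: the first half of `pk_update` reuses primary keys from the previous batch (true upserts), while the second half keeps the fresh batch's keys (inserts through the upsert API). A toy illustration of the bookkeeping, with made-up numbers standing in for `constants.DELTA_PER_INS`:

```python
# Toy numbers for illustration only; the real batch size is DELTA_PER_INS.
rows = 4
prev_batch_pks = [1, 2, 3, 4]   # pks of self.data before regeneration
new_batch_pks = [5, 6, 7, 8]    # pks of the freshly generated self.data
pk_update = prev_batch_pks[:rows // 2] + new_batch_pks[rows // 2:]
print(pk_update)                # [1, 2, 7, 8]: two updates, two inserts
```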
@@ -1429,8 +1423,7 @@ def __init__(self, collection_name=None, shards_num=2, schema=None):
         if collection_name is None:
             collection_name = cf.gen_unique_str("UpsertChecker_")
         super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
-        schema = self.get_schema()
-        self.data = cf.gen_row_data_by_schema(nb=constants.DELTA_PER_INS, schema=schema)
+        self.data = cf.gen_row_data_by_schema(nb=constants.DELTA_PER_INS, schema=self.get_schema())

     def upsert_entities(self):
         try:
@@ -1463,8 +1456,7 @@ def run_task(self):
         # half of the data is upsert, the other half is insert
         rows = len(self.data[0])
         pk_old = self.data[0][:rows // 2]
-        schema = self.get_schema()
-        self.data = cf.gen_row_data_by_schema(nb=constants.DELTA_PER_INS, schema=schema)
+        self.data = cf.gen_row_data_by_schema(nb=constants.DELTA_PER_INS, schema=self.get_schema())
         pk_new = self.data[0][rows // 2:]
         pk_update = pk_old + pk_new
         self.data[0] = pk_update
@@ -1486,8 +1478,7 @@ def __init__(self, collection_name=None, shards_num=2, schema=None):
         if collection_name is None:
             collection_name = cf.gen_unique_str("PartialUpdateChecker_")
         super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema, enable_struct_array_field=False)
-        schema = self.get_schema()
-        self.data = cf.gen_row_data_by_schema(nb=constants.DELTA_PER_INS, schema=schema)
+        self.data = cf.gen_row_data_by_schema(nb=constants.DELTA_PER_INS, schema=self.get_schema())

     @trace()
     def partial_update_entities(self):
@@ -1772,7 +1763,7 @@ def __init__(self, collection_name=None, schema=None):
         super().__init__(collection_name=collection_name, schema=schema)
         for i in range(5):
             self.milvus_client.insert(collection_name=self.c_name,
-                                      data=cf.gen_row_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=self.schema),
+                                      data=cf.gen_row_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=self.get_schema()),
                                       timeout=timeout)
         # do as a flush before indexing
         stats = self.milvus_client.get_collection_stats(collection_name=self.c_name)
Expand Down Expand Up @@ -1812,7 +1803,7 @@ def __init__(self, collection_name=None, schema=None):
collection_name = cf.gen_unique_str("IndexChecker_")
super().__init__(collection_name=collection_name, schema=schema)
for i in range(5):
self.milvus_client.insert(collection_name=self.c_name, data=cf.gen_row_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=self.schema),
self.milvus_client.insert(collection_name=self.c_name, data=cf.gen_row_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=self.get_schema()),
timeout=timeout)
# do as a flush before indexing
stats = self.milvus_client.get_collection_stats(collection_name=self.c_name)
Expand Down Expand Up @@ -2272,7 +2263,7 @@ def prepare(self, data_size=100000):
) as remote_writer:

for _ in range(data_size):
row = cf.gen_row_data_by_schema(nb=1, schema=self.schema)[0]
row = cf.gen_row_data_by_schema(nb=1, schema=self.get_schema())[0]
remote_writer.append_row(row)
remote_writer.commit()
batch_files = remote_writer.batch_files
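
> Finally, the bulk-import path drives the same generator one row at a time; generating `nb=1` per loop iteration keeps memory flat even at `data_size=100000`. Judging from how the result is used across this diff (the `[0]` indexing here, the `d[self.int64_field_name]` lookups in the upsert checkers), `cf.gen_row_data_by_schema(nb, schema)` returns a list of `nb` row dicts keyed by field name. A usage sketch inferred from those call sites, not from documentation:

```python
# Inferred usage; the return shape (list of dicts keyed by field name) is
# deduced from how this diff indexes the result.
rows = cf.gen_row_data_by_schema(nb=2, schema=self.get_schema())
assert isinstance(rows, list) and isinstance(rows[0], dict)
pk = rows[0][self.int64_field_name]   # same lookup the upsert checkers use
self.milvus_client.insert(collection_name=self.c_name, data=rows)
```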