Bugfix/1937/do not write append ref keys when staging incompletes #1985

Merged
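
This PR routes all V1 staging entry points (write with parallel=True, write with incomplete=True, and append with incomplete=True) through write_parallel, so staging creates only APPEND_DATA keys; the APPEND_REF key and its linked-list structure are left to streaming writes. A minimal sketch of the invariant, assuming the lmdb_version_store_v1 pytest fixture used by the new test in this PR:

    # Sketch: staging must not create an APPEND_REF key for the symbol
    # (mirrors test_staging_doesnt_write_append_ref added below).
    import pandas as pd
    from arcticdb_ext.storage import KeyType

    def check_staging_invariant(lmdb_version_store_v1):
        lib = lmdb_version_store_v1
        lib_tool = lib.library_tool()
        sym = "staged_sym"  # hypothetical symbol name
        lib.write(sym, pd.DataFrame({"col": [0]}), parallel=True)
        # Staging produces APPEND_DATA keys...
        assert lib_tool.find_keys_for_symbol(KeyType.APPEND_DATA, sym)
        # ...but must never write an APPEND_REF key.
        assert not lib_tool.find_keys_for_symbol(KeyType.APPEND_REF, sym)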
12 changes: 11 additions & 1 deletion python/arcticdb/toolbox/library_tool.py
@@ -187,4 +187,14 @@ def update_append_data_column_type(self, key : VariantKey, column : str, to_type
         old_df = self.read_to_dataframe(key)
         assert column in old_df.columns
         new_df = old_df.astype({column: to_type})
-        return self.overwrite_append_data_with_dataframe(key, new_df)
+        return self.overwrite_append_data_with_dataframe(key, new_df)
+
+    def append_incomplete(self, symbol: str, df: pd.DataFrame, validate_index: bool = False):
+        """
+        Appends the given dataframe to the APPEND_DATA key linked list. Useful for testing, as staging segments
+        through either the V1 or V2 API only creates APPEND_DATA keys, not the APPEND_REF key or the linked-list
+        structure that streaming data does.
+        """
+        dynamic_strings = self._nvs._resolve_dynamic_strings({})
+        _, item, norm_meta = self._nvs._try_normalize(symbol, df, None, False, dynamic_strings, None)
+        self._nvs.version_store.append_incomplete(symbol, item, norm_meta, None, validate_index)
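
A short usage sketch for the new helper; version_store is assumed to be any NativeVersionStore test fixture, and df_1/df_2 are hypothetical timeseries-indexed dataframes (compaction flag values copied from the unit test further down):

    # Stage two segments onto the APPEND_DATA linked list (with the
    # APPEND_REF key, as streaming does), then compact into a version.
    lib_tool = version_store.library_tool()
    lib_tool.append_incomplete("sym", df_1)
    lib_tool.append_incomplete("sym", df_2)
    version_store.compact_incomplete("sym", True, False, False, True)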
7 changes: 2 additions & 5 deletions python/arcticdb/version_store/_store.py
@@ -609,12 +609,9 @@ def write(
         )
         # TODO: allow_sparse for write_parallel / recursive normalizers as well.
         if isinstance(item, NPDDataFrame):
-            if parallel:
+            if parallel or incomplete:
                 self.version_store.write_parallel(symbol, item, norm_meta, validate_index, False, None)
                 return None
-            elif incomplete:
-                self.version_store.append_incomplete(symbol, item, norm_meta, udm, validate_index)
-                return None
             else:
                 vit = self.version_store.write_versioned_dataframe(
                     symbol, item, norm_meta, udm, prune_previous_version, sparsify_floats, validate_index
@@ -740,7 +737,7 @@ def append(
         if isinstance(item, NPDDataFrame):
             with _diff_long_stream_descriptor_mismatch(self):
                 if incomplete:
-                    self.version_store.append_incomplete(symbol, item, norm_meta, udm, validate_index)
+                    self.version_store.write_parallel(symbol, item, norm_meta, validate_index, False, None)
                 else:
                     vit = self.version_store.append(
                         symbol, item, norm_meta, udm, write_if_missing, prune_previous_version, validate_index
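
Net effect of the change above: write(..., parallel=True), write(..., incomplete=True) and append(..., incomplete=True) are now the same staging operation. A hedged sketch, with lib a NativeVersionStore and sym/df placeholders:

    # After this PR, each of these calls only writes APPEND_DATA keys:
    lib.write(sym, df, parallel=True)
    lib.write(sym, df, incomplete=True)
    lib.append(sym, df, incomplete=True)
    # Staged segments become part of a readable version only on compaction:
    lib.compact_incomplete(sym, True, False, False, True)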
3 changes: 2 additions & 1 deletion python/tests/hypothesis/arcticdb/test_append.py
@@ -77,6 +77,7 @@ def test_append_partial_read(version_store_factory, colnum, periods, rownum, cols, tsbounds, append_point):
 def test_incomplete_append_partial_read(version_store_factory, colnum, periods, rownum, cols, tsbounds, append_point):
     tz = "America/New_York"
     version_store = version_store_factory(col_per_group=colnum, row_per_segment=rownum)
+    lib_tool = version_store.library_tool()
     dtidx = pd.date_range("2019-02-06 11:43", periods=6).tz_localize(tz)
     a = np.arange(dtidx.shape[0])
     tf = TimeFrame(dtidx.values, columns_names=["a", "b", "c"], columns_values=[a, a + a, a * 10])
@@ -86,7 +87,7 @@ def test_incomplete_append_partial_read(version_store_factory, colnum, periods,
     sid = "XXX"
     version_store.write(sid, tf1)
     tf2 = tf.tsloc[c2:]
-    version_store.append(sid, tf2, incomplete=True)
+    lib_tool.append_incomplete(sid, tf2)
 
     dtr = (dtidx[tsbounds[0]], dtidx[tsbounds[1]])
     vit = version_store.read(sid, date_range=dtr, columns=list(cols), incomplete=True)
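
Reading back staged data is unchanged: read(..., incomplete=True) still merges staged segments with the committed version, as the final lines of the test above show. A sketch with hypothetical date bounds:

    # Read committed data plus staged incompletes, restricted to a
    # date range and column subset (start_ts/end_ts are placeholders).
    vit = version_store.read(
        sid, date_range=(start_ts, end_ts), columns=["a", "b"], incomplete=True
    )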
13 changes: 7 additions & 6 deletions python/tests/integration/toolbox/test_library_tool.py
@@ -235,6 +235,7 @@ def iterate_through_version_chain(key):
 
 def test_overwrite_append_data(object_and_mem_and_lmdb_version_store):
     lib = object_and_mem_and_lmdb_version_store
+    lib_tool = lib.library_tool()
     if lib._lib_cfg.lib_desc.version.encoding_version == 1:
         # TODO: Fix the timeseries descriptor packing. Currently the [incomplete_segment_from_frame] function in cpp is
         # not encoding aware so all incomplete writes are broken with v2 encoding.
@@ -252,9 +253,9 @@ def get_df(num_rows, start_index, col_type):
 
     # Deliberately write mismatching incomplete types
     lib.write(sym, get_df(3, 0, np.int64))
-    lib.write(sym, get_df(1, 3, np.int64), incomplete=True)
-    lib.write(sym, get_df(1, 4, str), incomplete=True)
-    lib.write(sym, get_df(1, 5, np.int64), incomplete=True)
+    lib_tool.append_incomplete(sym, get_df(1, 3, np.int64))
+    lib_tool.append_incomplete(sym, get_df(1, 4, str))
+    lib_tool.append_incomplete(sym, get_df(1, 5, np.int64))
 
     def read_append_data_keys_from_ref(symbol):
         append_ref = lib_tool.find_keys_for_symbol(KeyType.APPEND_REF, symbol)[0]
@@ -321,9 +322,9 @@ def read_type(key, column):
     assert_frame_equal(lib.read(sym).data, get_df(15, 0, np.int64))
 
     # Also try adding new incompletes all with wrong type and see that we again can't read or compact
-    lib.write(sym, get_df(1, 15, str), incomplete=True)
-    lib.write(sym, get_df(1, 16, str), incomplete=True)
-    lib.write(sym, get_df(1, 17, str), incomplete=True)
+    lib_tool.append_incomplete(sym, get_df(1, 15, str))
+    lib_tool.append_incomplete(sym, get_df(1, 16, str))
+    lib_tool.append_incomplete(sym, get_df(1, 17, str))
     append_keys = read_append_data_keys_from_ref(sym)
     assert len(append_keys) == 3
     assert [read_type(key, "col") for key in append_keys] == [str_dtype, str_dtype, str_dtype]
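
When staged segments carry the wrong column type, as exercised above, update_append_data_column_type (from the first file in this diff) can rewrite the offending APPEND_DATA keys so that read and compaction succeed again. A hedged sketch reusing the test's sym and lib_tool:

    # Rewrite the "col" column of every staged segment to int64; after
    # this, the test expects lib.read(sym) and compaction to work again.
    import numpy as np
    from arcticdb_ext.storage import KeyType

    for key in lib_tool.find_keys_for_symbol(KeyType.APPEND_DATA, sym):
        lib_tool.update_append_data_column_type(key, "col", np.int64)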
3 changes: 2 additions & 1 deletion python/tests/unit/arcticdb/version_store/test_append.py
@@ -633,10 +633,11 @@ def test_append_docs_example(lmdb_version_store):
 def test_read_incomplete_no_warning(s3_store_factory, sym, get_stderr):
     pytest.skip("This test is flaky due to trying to retrieve the log messages")
     lib = s3_store_factory(dynamic_strings=True, incomplete=True)
+    lib_tool = lib.library_tool()
     symbol = sym
 
     write_df = pd.DataFrame({"a": [1, 2, 3]}, index=pd.DatetimeIndex([1, 2, 3]))
-    lib.append(symbol, write_df, incomplete=True)
+    lib_tool.append_incomplete(symbol, write_df)
     # Need to compact so that the APPEND_REF points to a non-existent APPEND_DATA (intentionally)
     lib.compact_incomplete(symbol, True, False, False, True)
     set_log_level("DEBUG")
21 changes: 18 additions & 3 deletions python/tests/unit/arcticdb/version_store/test_parallel.py
@@ -21,13 +21,29 @@
 )
 from arcticdb.util._versions import IS_PANDAS_TWO
 from arcticdb_ext.storage import KeyType
-import arcticdb.toolbox.library_tool
 
 
 def get_append_keys(lib, sym):
     lib_tool = lib.library_tool()
-    keys = lib_tool.find_keys_for_symbol(arcticdb.toolbox.library_tool.KeyType.APPEND_DATA, sym)
+    keys = lib_tool.find_keys_for_symbol(KeyType.APPEND_DATA, sym)
     return keys
 
 
+def test_staging_doesnt_write_append_ref(lmdb_version_store_v1):
+    lib = lmdb_version_store_v1
+    lib_tool = lib.library_tool()
+    sym = "test_staging_doesnt_write_append_ref"
+    df = pd.DataFrame({"col": [0]})
+    lib.write(sym, df, parallel=True)
+    assert not len(lib_tool.find_keys_for_symbol(KeyType.APPEND_REF, sym))
+    lib.version_store.clear()
+    lib.write(sym, df, incomplete=True)
+    assert not len(lib_tool.find_keys_for_symbol(KeyType.APPEND_REF, sym))
+    lib.version_store.clear()
+    lib.append(sym, df, incomplete=True)
+    assert not len(lib_tool.find_keys_for_symbol(KeyType.APPEND_REF, sym))
+
+
 def test_remove_incomplete(basic_store):
     lib = basic_store
     lib_tool = lib.library_tool()
@@ -565,7 +581,6 @@ def test_parallel_no_column_slicing(lmdb_version_store_tiny_segment):
 def test_parallel_write_static_schema_type_changing(lmdb_version_store_tiny_segment, rows_per_incomplete, delete_staged_data_on_failure):
     lib = lmdb_version_store_tiny_segment
     sym = "test_parallel_write_static_schema_type_changing"
-    lib_tool = lib.library_tool()
     df_0 = pd.DataFrame({"col": np.arange(rows_per_incomplete, dtype=np.uint8)}, index=pd.date_range("2024-01-01", periods=rows_per_incomplete))
     df_1 = pd.DataFrame({"col": np.arange(rows_per_incomplete, 2 * rows_per_incomplete, dtype=np.uint16)}, index=pd.date_range("2024-01-03", periods=rows_per_incomplete))
     lib.write(sym, df_0, parallel=True)