diff --git a/include/blosc2.h b/include/blosc2.h index 13b600be8..128755955 100644 --- a/include/blosc2.h +++ b/include/blosc2.h @@ -219,7 +219,7 @@ enum { BLOSC2_GLOBAL_REGISTERED_FILTERS_START = 32, BLOSC2_GLOBAL_REGISTERED_FILTERS_STOP = 159, //!< Blosc-registered filters must be between 32 - 159. - BLOSC2_GLOBAL_REGISTERED_FILTERS = 3, + BLOSC2_GLOBAL_REGISTERED_FILTERS = 4, //!< Number of Blosc-registered filters at the moment. BLOSC2_USER_REGISTERED_FILTERS_START = 160, BLOSC2_USER_REGISTERED_FILTERS_STOP = 255, diff --git a/include/blosc2/filters-registry.h b/include/blosc2/filters-registry.h index 49a98970c..b39e2aa7e 100644 --- a/include/blosc2/filters-registry.h +++ b/include/blosc2/filters-registry.h @@ -18,7 +18,8 @@ extern "C" { enum { BLOSC_FILTER_NDCELL = 32, BLOSC_FILTER_NDMEAN = 33, - BLOSC_FILTER_BYTEDELTA = 34, + BLOSC_FILTER_BYTEDELTA_BUGGY = 34, // buggy version. See #524 + BLOSC_FILTER_BYTEDELTA = 35, // fixed version }; void register_filters(void); diff --git a/plugins/filters/bytedelta/bytedelta.c b/plugins/filters/bytedelta/bytedelta.c index 70168b653..32e81f7f3 100644 --- a/plugins/filters/bytedelta/bytedelta.c +++ b/plugins/filters/bytedelta/bytedelta.c @@ -46,6 +46,8 @@ bytes16 simd_prefix_sum(bytes16 x) return x; } +uint8_t simd_get_last(bytes16 x) { return (_mm_extract_epi16(x, 7) >> 8) & 0xFF; } + #elif defined(__aarch64__) || defined(_M_ARM64) // ARM v8 NEON code path #define CPU_HAS_SIMD 1 @@ -72,6 +74,8 @@ bytes16 simd_prefix_sum(bytes16 x) return x; } +uint8_t simd_get_last(bytes16 x) { return vgetq_lane_u8(x, 15); } + #endif @@ -93,6 +97,7 @@ int bytedelta_forward(const uint8_t *input, uint8_t *output, int32_t length, uin const int stream_len = length / typesize; for (int ich = 0; ich < typesize; ++ich) { int ip = 0; + uint8_t _v2 = 0; // SIMD delta within each channel, store #if defined(CPU_HAS_SIMD) bytes16 v2 = {0}; @@ -104,9 +109,11 @@ int bytedelta_forward(const uint8_t *input, uint8_t *output, int32_t length, uin output += 16; v2 = v; } + if (stream_len > 15) { + _v2 = simd_get_last(v2); + } #endif // #if defined(CPU_HAS_SIMD) // scalar leftover - uint8_t _v2 = 0; for (; ip < stream_len ; ip++) { uint8_t v = *input; input++; @@ -121,7 +128,100 @@ int bytedelta_forward(const uint8_t *input, uint8_t *output, int32_t length, uin // Fetch 16b from N streams, sum SIMD undelta int bytedelta_backward(const uint8_t *input, uint8_t *output, int32_t length, uint8_t meta, - blosc2_dparams *dparams, uint8_t id) { + blosc2_dparams *dparams, uint8_t id) { + BLOSC_UNUSED_PARAM(id); + + int typesize = meta; + if (typesize == 0) { + if (dparams->schunk == NULL) { + BLOSC_TRACE_ERROR("When meta is 0, you need to be on a schunk!"); + BLOSC_ERROR(BLOSC2_ERROR_FAILURE); + } + blosc2_schunk* schunk = (blosc2_schunk*)(dparams->schunk); + typesize = schunk->typesize; + } + + const int stream_len = length / typesize; + for (int ich = 0; ich < typesize; ++ich) { + int ip = 0; + uint8_t _v2 = 0; + // SIMD fetch 16 bytes from each channel, prefix-sum un-delta +#if defined(CPU_HAS_SIMD) + bytes16 v2 = {0}; + for (; ip < stream_len - 15; ip += 16) { + bytes16 v = simd_load(input); + input += 16; + // un-delta via prefix sum + v2 = simd_add(simd_prefix_sum(v), simd_duplane15(v2)); + simd_store(output, v2); + output += 16; + } + if (stream_len > 15) { + _v2 = simd_get_last(v2); + } +#endif // #if defined(CPU_HAS_SIMD) + // scalar leftover + for (; ip < stream_len; ip++) { + uint8_t v = *input + _v2; + input++; + *output = v; + output++; + _v2 = v; + } + } + + return BLOSC2_ERROR_SUCCESS; +} + +// This is the original (and buggy) version of bytedelta. It is kept here for backwards compatibility. +// See #524 for details. +// Fetch 16b from N streams, compute SIMD delta +int bytedelta_forward_buggy(const uint8_t *input, uint8_t *output, int32_t length, + uint8_t meta, blosc2_cparams *cparams, uint8_t id) { + BLOSC_UNUSED_PARAM(id); + + int typesize = meta; + if (typesize == 0) { + if (cparams->schunk == NULL) { + BLOSC_TRACE_ERROR("When meta is 0, you need to be on a schunk!"); + BLOSC_ERROR(BLOSC2_ERROR_FAILURE); + } + blosc2_schunk* schunk = (blosc2_schunk*)(cparams->schunk); + typesize = schunk->typesize; + } + + const int stream_len = length / typesize; + for (int ich = 0; ich < typesize; ++ich) { + int ip = 0; + // SIMD delta within each channel, store +#if defined(CPU_HAS_SIMD) + bytes16 v2 = {0}; + for (; ip < stream_len - 15; ip += 16) { + bytes16 v = simd_load(input); + input += 16; + bytes16 delta = simd_sub(v, simd_concat(v, v2)); + simd_store(output, delta); + output += 16; + v2 = v; + } +#endif // #if defined(CPU_HAS_SIMD) + // scalar leftover + uint8_t _v2 = 0; + for (; ip < stream_len ; ip++) { + uint8_t v = *input; + input++; + *output = v - _v2; + output++; + _v2 = v; + } + } + + return BLOSC2_ERROR_SUCCESS; +} + +// Fetch 16b from N streams, sum SIMD undelta +int bytedelta_backward_buggy(const uint8_t *input, uint8_t *output, int32_t length, + uint8_t meta, blosc2_dparams *dparams, uint8_t id) { BLOSC_UNUSED_PARAM(id); int typesize = meta; diff --git a/plugins/filters/bytedelta/bytedelta.h b/plugins/filters/bytedelta/bytedelta.h index 66e82203d..48c1f3936 100644 --- a/plugins/filters/bytedelta/bytedelta.h +++ b/plugins/filters/bytedelta/bytedelta.h @@ -19,6 +19,12 @@ int bytedelta_forward(const uint8_t* input, uint8_t* output, int32_t length, uin blosc2_cparams* cparams, uint8_t id); int bytedelta_backward(const uint8_t* input, uint8_t* output, int32_t length, uint8_t meta, - blosc2_dparams* dparams, uint8_t id); + blosc2_dparams* dparams, uint8_t id); -#endif /* BLOSC_PLUGINS_FILTERS_BYTEDELTA_BYTEDELTA_H*/ +int bytedelta_forward_buggy(const uint8_t* input, uint8_t* output, int32_t length, uint8_t meta, + blosc2_cparams* cparams, uint8_t id); + +int bytedelta_backward_buggy(const uint8_t* input, uint8_t* output, int32_t length, uint8_t meta, + blosc2_dparams* dparams, uint8_t id); + +#endif /* BLOSC_PLUGINS_FILTERS_BYTEDELTA_BYTEDELTA_H */ diff --git a/plugins/filters/bytedelta/test_bytedelta.c b/plugins/filters/bytedelta/test_bytedelta.c index a30ecede5..e7142fa35 100644 --- a/plugins/filters/bytedelta/test_bytedelta.c +++ b/plugins/filters/bytedelta/test_bytedelta.c @@ -15,17 +15,29 @@ To run: $ ./test_bytedelta + Testing bytedelta with write_correct=1, read_correct=1 + Testing bytedelta with write_correct=0, read_correct=1 + Testing bytedelta with write_correct=1, read_correct=0 + Testing bytedelta with write_correct=0, read_correct=0 Successful roundtrip! - Compression: 41472 -> 4937 (8.4x) - rand: 36535 obtained + Compression: 41472 -> 5599 (7.4x) + rand: saved 35873 bytes + Testing bytedelta with write_correct=1, read_correct=1 + Testing bytedelta with write_correct=0, read_correct=1 + Testing bytedelta with write_correct=1, read_correct=0 + Testing bytedelta with write_correct=0, read_correct=0 Successful roundtrip! - Compression: 1792 -> 1005 (1.8x) - mixed_values: 787 obtained + Compression: 1792 -> 499 (3.6x) + mixed_values: saved 1293 bytes + Testing bytedelta with write_correct=1, read_correct=1 + Testing bytedelta with write_correct=0, read_correct=1 + Testing bytedelta with write_correct=1, read_correct=0 + Testing bytedelta with write_correct=0, read_correct=0 Successful roundtrip! - Compression: 16128 -> 1599 (10.1x) - arange_like: 14529 obtained + Compression: 16128 -> 1473 (10.9x) + arange_like: saved 14655 bytes **********************************************************************/ @@ -37,6 +49,67 @@ #include #include +/* The original implmentation of the bytedelta filter had incorrect + * roundtrip behavior between SIMD and non-SIMD binaries. This filter provides + * the correct behavior regardless of SIMD availability. + * See https://github.com/Blosc/c-blosc2/issues/524 */ +int correct_bytedelta_forward(const uint8_t *input, uint8_t *output, int32_t length, uint8_t meta, + blosc2_cparams *cparams, uint8_t id) +{ + BLOSC_UNUSED_PARAM(id); + int typesize = meta; + if (typesize == 0) { + if (cparams->schunk == NULL) { + BLOSC_TRACE_ERROR("When meta is 0, you need to be on a schunk!"); + BLOSC_ERROR(BLOSC2_ERROR_FAILURE); + } + blosc2_schunk* schunk = (blosc2_schunk*)(cparams->schunk); + typesize = schunk->typesize; + } + int stream_len = length / typesize; + for (int ich = 0; ich < typesize; ++ich) { + int ip = 0; + uint8_t _v2 = 0; + for (; ip < stream_len ; ip++) { + uint8_t v = *input; + input++; + *output = v - _v2; + output++; + _v2 = v; + } + } + return BLOSC2_ERROR_SUCCESS; +} + +int correct_bytedelta_backward(const uint8_t *input, uint8_t *output, int32_t length, uint8_t meta, + blosc2_dparams *dparams, uint8_t id) { + BLOSC_UNUSED_PARAM(id); + int typesize = meta; + if (typesize == 0) { + if (dparams->schunk == NULL) { + BLOSC_TRACE_ERROR("When meta is 0, you need to be on a schunk!"); + BLOSC_ERROR(BLOSC2_ERROR_FAILURE); + } + blosc2_schunk* schunk = (blosc2_schunk*)(dparams->schunk); + typesize = schunk->typesize; + } + + const int stream_len = length / typesize; + for (int ich = 0; ich < typesize; ++ich) { + int ip = 0; + uint8_t _v2 = 0; + for (; ip < stream_len; ip++) { + uint8_t v = *input + _v2; + input++; + *output = v; + output++; + _v2 = v; + } + } + + return BLOSC2_ERROR_SUCCESS; +} + static int test_bytedelta(blosc2_schunk *schunk) { int64_t nchunks = schunk->nchunks; @@ -49,75 +122,103 @@ static int test_bytedelta(blosc2_schunk *schunk) { uint8_t *data_out = malloc(chunksize + BLOSC2_MAX_OVERHEAD); uint8_t *data_dest = malloc(chunksize); - /* Create a context for compression */ - blosc2_cparams cparams = BLOSC2_CPARAMS_DEFAULTS; - cparams.typesize = schunk->typesize; - cparams.compcode = BLOSC_LZ4; - cparams.filters[BLOSC2_MAX_FILTERS - 2] = BLOSC_SHUFFLE; - cparams.filters[BLOSC2_MAX_FILTERS - 1] = BLOSC_FILTER_BYTEDELTA; - cparams.filters_meta[BLOSC2_MAX_FILTERS - 1] = 0; // 0 means typesize when using schunks - cparams.clevel = 9; - cparams.nthreads = 1; - cparams.blocksize = schunk->blocksize; - cparams.schunk = schunk; - blosc2_context *cctx; - cctx = blosc2_create_cctx(cparams); - if (cctx == NULL) { - printf("Cannot create compression context!\n"); + blosc2_filter correct_bytedelta = {.id = 250, .name = "bytedelta_correct", .version = 1, + .forward = correct_bytedelta_forward, .backward = correct_bytedelta_backward}; + if (blosc2_register_filter(&correct_bytedelta)) { + printf("Cannot register bytedelta filter!"); return -1; } - blosc2_dparams dparams = BLOSC2_DPARAMS_DEFAULTS; - dparams.nthreads = 1; - dparams.schunk = schunk; - blosc2_context *dctx; - dctx = blosc2_create_dctx(dparams); - if (cctx == NULL) { - printf("Cannot create decompression context!\n"); - return -1; - } - for (int ci = 0; ci < nchunks; ci++) { - - decompressed = blosc2_schunk_decompress_chunk(schunk, ci, data_in, chunksize); - if (decompressed < 0) { - printf("Error decompressing chunk \n"); + // Test each pair of possible forward/backward implementations: + for (int direction = 0; direction < 4; direction++) { + bool write_correct = (direction % 2 == 0); + bool read_correct = (direction / 2 == 0); + printf("Testing bytedelta with write_correct=%d, read_correct=%d\n", write_correct, read_correct); + + /* Create a context for compression */ + blosc2_cparams cparams = BLOSC2_CPARAMS_DEFAULTS; + cparams.typesize = schunk->typesize; + cparams.compcode = BLOSC_LZ4; + cparams.filters[BLOSC2_MAX_FILTERS - 2] = BLOSC_SHUFFLE; + if (write_correct) { + cparams.filters[BLOSC2_MAX_FILTERS - 1] = 250; + } else { + cparams.filters[BLOSC2_MAX_FILTERS - 1] = BLOSC_FILTER_BYTEDELTA; + } + cparams.filters_meta[BLOSC2_MAX_FILTERS - 1] = 0; // 0 means typesize when using schunks + cparams.clevel = 9; + cparams.nthreads = 1; + cparams.blocksize = schunk->blocksize; + cparams.schunk = schunk; + blosc2_context *cctx; + cctx = blosc2_create_cctx(cparams); + if (cctx == NULL) { + printf("Cannot create compression context!\n"); return -1; } - /* Compress with clevel=5 and shuffle active */ - csize = blosc2_compress_ctx(cctx, data_in, chunksize, data_out, chunksize + BLOSC2_MAX_OVERHEAD); - if (csize == 0) { - printf("Buffer is incompressible. Giving up.\n"); + blosc2_dparams dparams = BLOSC2_DPARAMS_DEFAULTS; + dparams.nthreads = 1; + dparams.schunk = schunk; + blosc2_context *dctx; + dctx = blosc2_create_dctx(dparams); + if (cctx == NULL) { + printf("Cannot create decompression context!\n"); return -1; - } else if (csize < 0) { - printf("Compression error. Error code: %" PRId64 "\n", csize); - return (int) csize; } - csize_f += csize; - /* Decompress */ - dsize = blosc2_decompress_ctx(dctx, data_out, chunksize + BLOSC2_MAX_OVERHEAD, data_dest, chunksize); - if (dsize <= 0) { - printf("Decompression error. Error code: %" PRId64 "\n", dsize); - return (int) dsize; - } + for (int ci = 0; ci < nchunks; ci++) { + + decompressed = blosc2_schunk_decompress_chunk(schunk, ci, data_in, chunksize); + if (decompressed < 0) { + printf("Error decompressing chunk \n"); + return -1; + } - for (int i = 0; i < chunksize; i++) { - if (data_in[i] != data_dest[i]) { - printf("i: %d, data %u, dest %u", i, data_in[i], data_dest[i]); - printf("\n Decompressed data differs from original!\n"); + /* Compress with clevel=5 and shuffle active */ + csize = blosc2_compress_ctx(cctx, data_in, chunksize, data_out, + chunksize + BLOSC2_MAX_OVERHEAD); + if (csize == 0) { + printf("Buffer is incompressible. Giving up.\n"); return -1; + } else if (csize < 0) { + printf("Compression error. Error code: %" PRId64 "\n", csize); + return (int) csize; + } + csize_f += csize; + + // Force usage of alternative decoder + if (read_correct) { + data_out[BLOSC2_CHUNK_FILTER_CODES + BLOSC2_MAX_FILTERS - 1] = 250; + } else { + data_out[BLOSC2_CHUNK_FILTER_CODES + BLOSC2_MAX_FILTERS - 1] = BLOSC_FILTER_BYTEDELTA; + } + + /* Decompress */ + dsize = blosc2_decompress_ctx(dctx, data_out, chunksize + BLOSC2_MAX_OVERHEAD, + data_dest, chunksize); + if (dsize <= 0) { + printf("Decompression error. Error code: %" PRId64 "\n", dsize); + return (int) dsize; + } + + for (int i = 0; i < chunksize; i++) { + if (data_in[i] != data_dest[i]) { + printf("i: %d, data %u, dest %u", i, data_in[i], data_dest[i]); + printf("\n Decompressed data differs from original!\n"); + return -1; + } } } - } - csize_f = csize_f / nchunks; + csize_f = csize_f / nchunks; + blosc2_free_ctx(cctx); + blosc2_free_ctx(dctx); + } free(data_in); free(data_out); free(data_dest); - blosc2_free_ctx(cctx); - blosc2_free_ctx(dctx); printf("Successful roundtrip!\n"); printf("Compression: %d -> %" PRId64 " (%.1fx)\n", chunksize, csize_f, (1. * chunksize) / (double) csize_f); @@ -146,8 +247,8 @@ int rand_() { blosc2_storage b2_storage = {.cparams=&cparams}; b2_storage.contiguous = true; - b2nd_context_t *ctx = b2nd_create_ctx(&b2_storage, ndim, shape, chunkshape, blockshape, NULL, 0, - NULL, 0); + b2nd_context_t *ctx = b2nd_create_ctx(&b2_storage, ndim, shape, chunkshape, blockshape, NULL, + 0,NULL, 0); b2nd_array_t *arr; BLOSC_ERROR(b2nd_from_cbuffer(ctx, &arr, data, size)); @@ -183,8 +284,8 @@ int mixed_values() { blosc2_storage b2_storage = {.cparams=&cparams}; b2_storage.contiguous = true; - b2nd_context_t *ctx = b2nd_create_ctx(&b2_storage, ndim, shape, chunkshape, blockshape, NULL, 0, - NULL, 0); + b2nd_context_t *ctx = b2nd_create_ctx(&b2_storage, ndim, shape, chunkshape, blockshape, + NULL, 0, NULL, 0); b2nd_array_t *arr; BLOSC_ERROR(b2nd_from_cbuffer(ctx, &arr, data, size)); @@ -220,8 +321,8 @@ int arange_like() { blosc2_storage b2_storage = {.cparams=&cparams}; b2_storage.contiguous = true; - b2nd_context_t *ctx = b2nd_create_ctx(&b2_storage, ndim, shape, chunkshape, blockshape, NULL, 0, - NULL, 0); + b2nd_context_t *ctx = b2nd_create_ctx(&b2_storage, ndim, shape, chunkshape, blockshape, + NULL, 0,NULL, 0); b2nd_array_t *arr; BLOSC_ERROR(b2nd_from_cbuffer(ctx, &arr, data, size)); diff --git a/plugins/filters/filters-registry.c b/plugins/filters/filters-registry.c index 58ee65dc2..04b69ac93 100644 --- a/plugins/filters/filters-registry.c +++ b/plugins/filters/filters-registry.c @@ -1,5 +1,5 @@ /* - Copyright (c) 2021 The Blosc Development Team + Copyright (c) 2021 The Blosc Development Team https://blosc.org License: BSD 3-Clause (see LICENSE.txt) */ @@ -29,11 +29,21 @@ void register_filters(void) { ndmean.backward = &ndmean_backward; register_filter_private(&ndmean); + // Buggy version. See #524 + blosc2_filter bytedelta_buggy; + bytedelta_buggy.id = BLOSC_FILTER_BYTEDELTA_BUGGY; + bytedelta_buggy.name = "bytedelta_buggy"; + bytedelta_buggy.version = 1; + bytedelta_buggy.forward = &bytedelta_forward_buggy; + bytedelta_buggy.backward = &bytedelta_backward_buggy; + register_filter_private(&bytedelta_buggy); + + // Fixed version. See #524 blosc2_filter bytedelta; bytedelta.id = BLOSC_FILTER_BYTEDELTA; bytedelta.name = "bytedelta"; + bytedelta.version = 1; bytedelta.forward = &bytedelta_forward; bytedelta.backward = &bytedelta_backward; register_filter_private(&bytedelta); - }