Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing bytedelta #532

Merged
merged 13 commits into from
Jul 4, 2023
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ endif()

# Propagate CMAKE_OSX_ARCHITECTURES env variable into CMAKE_SYSTEM_PROCESSOR
if(DEFINED ENV{CMAKE_OSX_ARCHITECTURES})
if($ENV{CMAKE_OSX_ARCHITECTURES} STREQUAL "arm64")
if("$ENV{CMAKE_OSX_ARCHITECTURES}" STREQUAL "arm64")
set(CMAKE_SYSTEM_PROCESSOR arm64)
endif()
endif()
Expand Down
4 changes: 2 additions & 2 deletions blosc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ if(NOT DEACTIVATE_ZLIB)
if(ZLIB_NG_FOUND)
set(BLOSC_INCLUDE_DIRS ${BLOSC_INCLUDE_DIRS} ${ZLIB_NG_INCLUDE_DIR})
elseif(ZLIB_FOUND)
set(BLOSC_INCLUDE_DIRS ${BLOSC_INCLUDE_DIRS} ${ZLIB_INCLUDE_DIR})
set(BLOSC_INCLUDE_DIRS ${BLOSC_INCLUDE_DIRS} ${ZLIB_INCLUDE_DIRS})
else()
set(ZLIB_LOCAL_DIR ${INTERNAL_LIBS}/${ZLIB_NG_DIR})
set(BLOSC_INCLUDE_DIRS ${BLOSC_INCLUDE_DIRS} ${ZLIB_LOCAL_DIR})
Expand Down Expand Up @@ -112,7 +112,7 @@ if(NOT DEACTIVATE_ZLIB)
if(ZLIB_NG_FOUND)
set(LIBS ${LIBS} ${ZLIB_NG_LIBRARY})
elseif(ZLIB_FOUND)
set(LIBS ${LIBS} ${ZLIB_LIBRARY})
set(LIBS ${LIBS} ${ZLIB_LIBRARIES})
else()
set(ZLIB_LOCAL_DIR ${INTERNAL_LIBS}/${ZLIB_NG_DIR})
file(GLOB ZLIB_FILES ${ZLIB_LOCAL_DIR}/*.c)
Expand Down
27 changes: 11 additions & 16 deletions blosc/blosc2.c
Original file line number Diff line number Diff line change
Expand Up @@ -907,6 +907,12 @@ static int blosc2_intialize_header_from_context(blosc2_context* context, blosc_h
return 0;
}

void _cycle_buffers(uint8_t **src, uint8_t **dest, uint8_t **tmp) {
uint8_t *tmp2 = *src;
*src = *dest;
*dest = *tmp;
*tmp = tmp2;
}

uint8_t* pipeline_forward(struct thread_context* thread_context, const int32_t bsize,
const uint8_t* src, const int32_t offset,
Expand Down Expand Up @@ -948,10 +954,7 @@ uint8_t* pipeline_forward(struct thread_context* thread_context, const int32_t b
// No more filters are required
return _dest;
}
// Cycle buffers
_src = _dest;
_dest = _tmp;
_tmp = _src;
_cycle_buffers(&_src, &_dest, &_tmp);
}

/* Process the filter pipeline */
Expand All @@ -964,9 +967,7 @@ uint8_t* pipeline_forward(struct thread_context* thread_context, const int32_t b
shuffle(typesize, bsize, _src, _dest);
// Cycle filters when required
if (j < filters_meta[i]) {
_src = _dest;
_dest = _tmp;
_tmp = _src;
_cycle_buffers(&_src, &_dest, &_tmp);
}
}
break;
Expand Down Expand Up @@ -1025,9 +1026,7 @@ uint8_t* pipeline_forward(struct thread_context* thread_context, const int32_t b

// Cycle buffers when required
if (filters[i] != BLOSC_NOFILTER) {
_src = _dest;
_dest = _tmp;
_tmp = _src;
_cycle_buffers(&_src, &_dest, &_tmp);
}
}
return _src;
Expand Down Expand Up @@ -1344,9 +1343,7 @@ int pipeline_backward(struct thread_context* thread_context, const int32_t bsize
unshuffle(typesize, bsize, _src, _dest);
// Cycle filters when required
if (j < filters_meta[i]) {
_src = _dest;
_dest = _tmp;
_tmp = _src;
_cycle_buffers(&_src, &_dest, &_tmp);
}
// Check whether we have to copy the intermediate _dest buffer to final destination
if (last_copy_filter && (filters_meta[i] % 2) == 1 && j == filters_meta[i]) {
Expand Down Expand Up @@ -1424,9 +1421,7 @@ int pipeline_backward(struct thread_context* thread_context, const int32_t bsize

// Cycle buffers when required
if ((filters[i] != BLOSC_NOFILTER) && (filters[i] != BLOSC_TRUNC_PREC)) {
_src = _dest;
_dest = _tmp;
_tmp = _src;
_cycle_buffers(&_src, &_dest, &_tmp);
}
if (last_filter_index == i) {
break;
Expand Down
8 changes: 4 additions & 4 deletions include/blosc2.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ enum {
BLOSC2_GLOBAL_REGISTERED_FILTERS_START = 32,
BLOSC2_GLOBAL_REGISTERED_FILTERS_STOP = 159,
//!< Blosc-registered filters must be between 32 - 159.
BLOSC2_GLOBAL_REGISTERED_FILTERS = 3,
BLOSC2_GLOBAL_REGISTERED_FILTERS = 4,
//!< Number of Blosc-registered filters at the moment.
BLOSC2_USER_REGISTERED_FILTERS_START = 160,
BLOSC2_USER_REGISTERED_FILTERS_STOP = 255,
Expand Down Expand Up @@ -1053,9 +1053,9 @@ typedef struct {
} blosc2_io;

static const blosc2_io BLOSC2_IO_DEFAULTS = {
.id = BLOSC2_IO_FILESYSTEM,
.name = "filesystem",
.params = NULL,
/* .id = */ BLOSC2_IO_FILESYSTEM,
/* .name = */ "filesystem",
/* .params = */ NULL,
};


Expand Down
3 changes: 2 additions & 1 deletion include/blosc2/filters-registry.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ extern "C" {
enum {
BLOSC_FILTER_NDCELL = 32,
BLOSC_FILTER_NDMEAN = 33,
BLOSC_FILTER_BYTEDELTA = 34,
BLOSC_FILTER_BYTEDELTA_BUGGY = 34, // buggy version. See #524
BLOSC_FILTER_BYTEDELTA = 35, // fixed version
};

void register_filters(void);
Expand Down
104 changes: 102 additions & 2 deletions plugins/filters/bytedelta/bytedelta.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ bytes16 simd_prefix_sum(bytes16 x)
return x;
}

uint8_t simd_get_last(bytes16 x) { return (_mm_extract_epi16(x, 7) >> 8) & 0xFF; }

#elif defined(__aarch64__) || defined(_M_ARM64)
// ARM v8 NEON code path
#define CPU_HAS_SIMD 1
Expand All @@ -72,6 +74,8 @@ bytes16 simd_prefix_sum(bytes16 x)
return x;
}

uint8_t simd_get_last(bytes16 x) { return vgetq_lane_u8(x, 15); }

#endif


Expand All @@ -93,6 +97,7 @@ int bytedelta_forward(const uint8_t *input, uint8_t *output, int32_t length, uin
const int stream_len = length / typesize;
for (int ich = 0; ich < typesize; ++ich) {
int ip = 0;
uint8_t _v2 = 0;
// SIMD delta within each channel, store
#if defined(CPU_HAS_SIMD)
bytes16 v2 = {0};
Expand All @@ -104,9 +109,11 @@ int bytedelta_forward(const uint8_t *input, uint8_t *output, int32_t length, uin
output += 16;
v2 = v;
}
if (stream_len > 15) {
_v2 = simd_get_last(v2);
}
#endif // #if defined(CPU_HAS_SIMD)
// scalar leftover
uint8_t _v2 = 0;
for (; ip < stream_len ; ip++) {
uint8_t v = *input;
input++;
Expand All @@ -121,7 +128,100 @@ int bytedelta_forward(const uint8_t *input, uint8_t *output, int32_t length, uin

// Fetch 16b from N streams, sum SIMD undelta
int bytedelta_backward(const uint8_t *input, uint8_t *output, int32_t length, uint8_t meta,
blosc2_dparams *dparams, uint8_t id) {
blosc2_dparams *dparams, uint8_t id) {
BLOSC_UNUSED_PARAM(id);

int typesize = meta;
if (typesize == 0) {
if (dparams->schunk == NULL) {
BLOSC_TRACE_ERROR("When meta is 0, you need to be on a schunk!");
BLOSC_ERROR(BLOSC2_ERROR_FAILURE);
}
blosc2_schunk* schunk = (blosc2_schunk*)(dparams->schunk);
typesize = schunk->typesize;
}

const int stream_len = length / typesize;
for (int ich = 0; ich < typesize; ++ich) {
int ip = 0;
uint8_t _v2 = 0;
// SIMD fetch 16 bytes from each channel, prefix-sum un-delta
#if defined(CPU_HAS_SIMD)
bytes16 v2 = {0};
for (; ip < stream_len - 15; ip += 16) {
bytes16 v = simd_load(input);
input += 16;
// un-delta via prefix sum
v2 = simd_add(simd_prefix_sum(v), simd_duplane15(v2));
simd_store(output, v2);
output += 16;
}
if (stream_len > 15) {
_v2 = simd_get_last(v2);
}
#endif // #if defined(CPU_HAS_SIMD)
// scalar leftover
for (; ip < stream_len; ip++) {
uint8_t v = *input + _v2;
input++;
*output = v;
output++;
_v2 = v;
}
}

return BLOSC2_ERROR_SUCCESS;
}

// This is the original (and buggy) version of bytedelta. It is kept here for backwards compatibility.
// See #524 for details.
// Fetch 16b from N streams, compute SIMD delta
int bytedelta_forward_buggy(const uint8_t *input, uint8_t *output, int32_t length,
uint8_t meta, blosc2_cparams *cparams, uint8_t id) {
BLOSC_UNUSED_PARAM(id);

int typesize = meta;
if (typesize == 0) {
if (cparams->schunk == NULL) {
BLOSC_TRACE_ERROR("When meta is 0, you need to be on a schunk!");
BLOSC_ERROR(BLOSC2_ERROR_FAILURE);
}
blosc2_schunk* schunk = (blosc2_schunk*)(cparams->schunk);
typesize = schunk->typesize;
}

const int stream_len = length / typesize;
for (int ich = 0; ich < typesize; ++ich) {
int ip = 0;
// SIMD delta within each channel, store
#if defined(CPU_HAS_SIMD)
bytes16 v2 = {0};
for (; ip < stream_len - 15; ip += 16) {
bytes16 v = simd_load(input);
input += 16;
bytes16 delta = simd_sub(v, simd_concat(v, v2));
simd_store(output, delta);
output += 16;
v2 = v;
}
#endif // #if defined(CPU_HAS_SIMD)
// scalar leftover
uint8_t _v2 = 0;
for (; ip < stream_len ; ip++) {
uint8_t v = *input;
input++;
*output = v - _v2;
output++;
_v2 = v;
}
}

return BLOSC2_ERROR_SUCCESS;
}

// Fetch 16b from N streams, sum SIMD undelta
int bytedelta_backward_buggy(const uint8_t *input, uint8_t *output, int32_t length,
uint8_t meta, blosc2_dparams *dparams, uint8_t id) {
BLOSC_UNUSED_PARAM(id);

int typesize = meta;
Expand Down
10 changes: 8 additions & 2 deletions plugins/filters/bytedelta/bytedelta.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ int bytedelta_forward(const uint8_t* input, uint8_t* output, int32_t length, uin
blosc2_cparams* cparams, uint8_t id);

int bytedelta_backward(const uint8_t* input, uint8_t* output, int32_t length, uint8_t meta,
blosc2_dparams* dparams, uint8_t id);
blosc2_dparams* dparams, uint8_t id);

#endif /* BLOSC_PLUGINS_FILTERS_BYTEDELTA_BYTEDELTA_H*/
int bytedelta_forward_buggy(const uint8_t* input, uint8_t* output, int32_t length, uint8_t meta,
blosc2_cparams* cparams, uint8_t id);

int bytedelta_backward_buggy(const uint8_t* input, uint8_t* output, int32_t length, uint8_t meta,
blosc2_dparams* dparams, uint8_t id);

#endif /* BLOSC_PLUGINS_FILTERS_BYTEDELTA_BYTEDELTA_H */
Loading
Loading