From 46a2d17651b51e9273912de91d151b213fdd7a4c Mon Sep 17 00:00:00 2001 From: Hossein Moein Date: Wed, 22 May 2024 12:18:28 -0400 Subject: [PATCH] Implemented reading binary data in chunks --- docs/HTML/read.html | 23 +-- docs/HTML/write.html | 20 +- .../DataFrame/Internals/DataFrame_misc.tcc | 6 +- .../Internals/DataFrame_private_decl.h | 5 +- .../DataFrame/Internals/DataFrame_read.tcc | 175 ++++++++++-------- .../Internals/DataFrame_standalone.tcc | 172 +++++++++++++---- .../DataFrame/Internals/DataFrame_write.tcc | 18 +- test/dataframe_tester_3.cc | 43 +++++ 8 files changed, 318 insertions(+), 144 deletions(-) diff --git a/docs/HTML/read.html b/docs/HTML/read.html index b56e1534..a0783e99 100644 --- a/docs/HTML/read.html +++ b/docs/HTML/read.html @@ -41,7 +41,7 @@ - +

 bool
 read(const char *file_name,
@@ -76,7 +76,7 @@
       .
.
All empty lines or lines starting with # will be skipped.
- NOTE: Only in CSV2 format you can specify starting_row and num_rows. This way you can read very large files (that don't fit into memory) in chunks and process them. In this case the reading starts at starting_row and continues until either num_rows rows is read or EOF is reached.

+ NOTE: Only in CSV2 and binary formats you can specify starting_row and num_rows. This way you can read very large files (that don't fit into memory) in chunks and process them. In this case the reading starts at starting_row and continues until either num_rows rows is read or EOF is reached.

-----------------------------------------------
JSON file format looks like this:
@@ -96,7 +96,8 @@
  • Fields in column dictionaries must be in N (name), T (type), D (data) order
  • -----------------------------------------------
    - Binary format is a proprietary format, that is optimized for compressing algorithms. It also takes care of different endianness. The file is always written with the same endianness as the writing host. But it will be adjusted accordingly when reading it from a different host with a different endianness.

    + Binary format is a proprietary format, that is optimized for compressing algorithms. It also takes care of different endianness. The file is always written with the same endianness as the writing host. But it will be adjusted accordingly when reading it from a different host with a different endianness.
    + NOTE: Only in CSV2 and binary formats you can specify starting_row and num_rows. This way you can read very large files (that don't fit into memory) in chunks and process them. In this case the reading starts at starting_row and continues until either num_rows rows is read or end-of-column is reached.

    -----------------------------------------------
    In all formats the following data types are supported: @@ -161,7 +162,7 @@ NOTE:: This version of read() can be substantially faster, especially for larger files, than if you open the file yourself and use the read() version below. - + file_name: Complete path to the file
    iof: Specifies the I/O format. The default is CSV
    columns_only: If true, the index column is not read. You may want to do that to read multiple files into the same DataFrame. If columns_only is false the index column must exist in the stream. If columns_only is true the index column may or may not exist
    @@ -171,7 +172,7 @@ - +
    
     template<typename S>
     bool
    @@ -183,12 +184,12 @@
           
             Same as read() above, but takes a reference to a stream
           
    -      
    +      
           
         
     
         
    -       
    +       
             
    
     std::future<bool>
     read_async(const char *file_name,
    @@ -199,12 +200,12 @@
           
             Same as read() above, but executed asynchronously
           
    -      
    +      
           
         
     
         
    -       
    +       
             
    
     template<typename S>
     std::future<bool>
    @@ -216,7 +217,7 @@
           
             Same as read_async() above, but takes a reference to a stream
           
    -      
    +      
           
         
     
    @@ -239,7 +240,7 @@
             
     
           
    -      
    +      
             data_frame: A null terminated string that was generated by calling to_string(). It must contain a complete DataFrame
    diff --git a/docs/HTML/write.html b/docs/HTML/write.html index b0b00c4e..1cf59da6 100644 --- a/docs/HTML/write.html +++ b/docs/HTML/write.html @@ -41,7 +41,7 @@ - +
    
     template<typename S, typename ... Ts>
     bool
    @@ -52,7 +52,7 @@
           long max_recs = std::numeric_limits::max()) const; 
             
    - + It outputs the content of DataFrame into the stream o. Currently 4 formats (i.e. csv, csv2, json, binary) are supported specified by the iof parameter.


    The CSV file format is written:
    @@ -151,7 +151,7 @@
             
    - + S: Output stream type
    Ts: The list of types for all columns. A type should be specified only once
    o: Reference to an streamable object (e.g. cout, file, ...)
    @@ -163,7 +163,7 @@ - +
    
     template<typename ... Ts>
     std::future<bool>
    @@ -174,7 +174,7 @@
           long max_recs = std::numeric_limits::max()) const; 
             
    - + Same as write() above, but it takes a file name

    NOTE:: This version of write() can be substantially faster, especially for larger files, than if you open the file yourself and use the write() version above. @@ -183,7 +183,7 @@ - +
    
     template<typename S, typename ... Ts>
     std::future<bool>
    @@ -202,7 +202,7 @@
         
     
         
    -       
    +       
             
    
     template<typename ... Ts>
     std::future<bool>
    @@ -221,14 +221,14 @@
         
     
         
    -       
    +       
             
    
     template<typename ... Ts>
     std::string
     to_string(std::streamsize precision = 12) const; 
             
    - + This is a convenient function (simple implementation) to convert a DataFrame into a string that could be restored later by calling from_string(). It utilizes the write() member function of DataFrame.
    These functions could be used to transmit a DataFrame from one place to another or store a DataFrame in databases, caches, ...

    @@ -240,7 +240,7 @@ - + Ts: The list of types for all columns. A type should be specified only once
    precision: Specifies the precision for floating point numbers
    diff --git a/include/DataFrame/Internals/DataFrame_misc.tcc b/include/DataFrame/Internals/DataFrame_misc.tcc index e326ff8c..17dcd3fa 100644 --- a/include/DataFrame/Internals/DataFrame_misc.tcc +++ b/include/DataFrame/Internals/DataFrame_misc.tcc @@ -227,11 +227,11 @@ DataFrame::print_binary_functor_::operator() (const T &vec) { std::strncpy(col_name, name, sizeof(col_name)); os.write(col_name, sizeof(col_name)); if constexpr (std::is_same_v) - _write_binary_string_(os, vec); + _write_binary_string_(os, vec, start_row, end_row); else if constexpr (std::is_same_v) - _write_binary_datetime_(os, vec); + _write_binary_datetime_(os, vec, start_row, end_row); else - _write_binary_data_(os, vec); + _write_binary_data_(os, vec, start_row, end_row); return; } diff --git a/include/DataFrame/Internals/DataFrame_private_decl.h b/include/DataFrame/Internals/DataFrame_private_decl.h index 637770c0..195956b3 100644 --- a/include/DataFrame/Internals/DataFrame_private_decl.h +++ b/include/DataFrame/Internals/DataFrame_private_decl.h @@ -61,7 +61,10 @@ using JoinSortingPair = std::pair; // ---------------------------------------------------------------------------- void read_json_(std::istream &file, bool columns_only); -void read_binary_(std::istream &file); +void read_binary_(std::istream &file, + bool columns_only, + size_type starting_row, + size_type num_rows); void read_csv_(std::istream &file, bool columns_only); void read_csv2_(std::istream &file, bool columns_only, diff --git a/include/DataFrame/Internals/DataFrame_read.tcc b/include/DataFrame/Internals/DataFrame_read.tcc index 2a0d02b0..1220432e 100644 --- a/include/DataFrame/Internals/DataFrame_read.tcc +++ b/include/DataFrame/Internals/DataFrame_read.tcc @@ -1274,7 +1274,11 @@ read_csv2_(std::istream &stream, // ---------------------------------------------------------------------------- template -void DataFrame::read_binary_(std::istream &stream) { +void DataFrame:: +read_binary_(std::istream &stream, + bool columns_only, + size_type starting_row, + size_type num_rows) { endians ed; @@ -1288,63 +1292,79 @@ void DataFrame::read_binary_(std::istream &stream) { col_num = SwapBytes { }(col_num); char col_name[64]; + char col_type[32]; std::memset(col_name, 0, sizeof(col_name)); - stream.read(col_name, sizeof(col_name)); - if (std::strcmp(col_name, DF_INDEX_COL_NAME)) { - String1K err; + if (! columns_only) [[likely]] { + stream.read(col_name, sizeof(col_name)); + if (std::strcmp(col_name, DF_INDEX_COL_NAME)) { + String1K err; - err.printf("read_binary_(): ERROR: Expecting name '%s'", - DF_INDEX_COL_NAME); - throw DataFrameError(err.c_str()); - } + err.printf("read_binary_(): ERROR: Expecting name '%s'", + DF_INDEX_COL_NAME); + throw DataFrameError(err.c_str()); + } - char col_type[32]; + std::memset(col_type, 0, sizeof(col_type)); + stream.read(col_type, sizeof(col_type)); - std::memset(col_type, 0, sizeof(col_type)); - stream.read(col_type, sizeof(col_type)); - - IndexVecType idx_vec; - - if constexpr (std::is_same_v) - _read_binary_string_(stream, idx_vec, needs_flipping); - else if constexpr (std::is_same_v) - _read_binary_datetime_(stream, idx_vec, needs_flipping); - else if constexpr (std::is_same_v) - _read_binary_data_(stream, idx_vec, needs_flipping); - else if constexpr (std::is_same_v) - _read_binary_data_(stream, idx_vec, needs_flipping); - else if constexpr (std::is_same_v) - _read_binary_data_(stream, idx_vec, needs_flipping); - else if constexpr (std::is_same_v) - _read_binary_data_(stream, idx_vec, needs_flipping); - else if constexpr (std::is_same_v) - _read_binary_data_(stream, idx_vec, needs_flipping); - else if constexpr (std::is_same_v) - _read_binary_data_(stream, idx_vec, needs_flipping); - else if constexpr (std::is_same_v) - _read_binary_data_(stream, idx_vec, needs_flipping); - else if constexpr (std::is_same_v) - _read_binary_data_(stream, idx_vec, needs_flipping); - else if constexpr (std::is_same_v) - _read_binary_data_(stream, idx_vec, needs_flipping); - else if constexpr (std::is_same_v) - _read_binary_data_(stream, idx_vec, needs_flipping); - else if constexpr (std::is_same_v) - _read_binary_data_(stream, idx_vec, needs_flipping); - else if constexpr (std::is_same_v) - _read_binary_data_(stream, idx_vec, needs_flipping); - else if constexpr (std::is_same_v) - _read_binary_data_(stream, idx_vec, needs_flipping); - else { - String1K err; + IndexVecType idx_vec; + + if constexpr (std::is_same_v) + _read_binary_string_(stream, idx_vec, needs_flipping, + starting_row, num_rows); + else if constexpr (std::is_same_v) + _read_binary_datetime_(stream, idx_vec, needs_flipping, + starting_row, num_rows); + else if constexpr (std::is_same_v) + _read_binary_data_(stream, idx_vec, needs_flipping, + starting_row, num_rows); + else if constexpr (std::is_same_v) + _read_binary_data_(stream, idx_vec, needs_flipping, + starting_row, num_rows); + else if constexpr (std::is_same_v) + _read_binary_data_(stream, idx_vec, needs_flipping, + starting_row, num_rows); + else if constexpr (std::is_same_v) + _read_binary_data_(stream, idx_vec, needs_flipping, + starting_row, num_rows); + else if constexpr (std::is_same_v) + _read_binary_data_(stream, idx_vec, needs_flipping, + starting_row, num_rows); + else if constexpr (std::is_same_v) + _read_binary_data_(stream, idx_vec, needs_flipping, + starting_row, num_rows); + else if constexpr (std::is_same_v) + _read_binary_data_(stream, idx_vec, needs_flipping, + starting_row, num_rows); + else if constexpr (std::is_same_v) + _read_binary_data_(stream, idx_vec, needs_flipping, + starting_row, num_rows); + else if constexpr (std::is_same_v) + _read_binary_data_(stream, idx_vec, needs_flipping, + starting_row, num_rows); + else if constexpr (std::is_same_v) + _read_binary_data_(stream, idx_vec, needs_flipping, + starting_row, num_rows); + else if constexpr (std::is_same_v) + _read_binary_data_(stream, idx_vec, needs_flipping, + starting_row, num_rows); + else if constexpr (std::is_same_v) + _read_binary_data_(stream, idx_vec, needs_flipping, + starting_row, num_rows); + else if constexpr (std::is_same_v) + _read_binary_data_(stream, idx_vec, needs_flipping, + starting_row, num_rows); + else { + String1K err; - err.printf( - "read_binary_(): ERROR: Type '%s' is not supported for index", - col_type); - throw DataFrameError(err.c_str()); + err.printf( + "read_binary_(): ERROR: Type '%s' is not supported for index", + col_type); + throw DataFrameError(err.c_str()); + } + load_index(std::move(idx_vec)); } - load_index(std::move(idx_vec)); for (uint16_t i = 0; i < col_num; ++i) { stream.read(col_name, sizeof(col_name)); @@ -1353,105 +1373,120 @@ void DataFrame::read_binary_(std::istream &stream) { if (! std::strcmp(col_type, "string")) { ColumnVecType vec; - _read_binary_string_(stream, vec, needs_flipping); + _read_binary_string_(stream, vec, needs_flipping, + starting_row, num_rows); load_column(col_name, std::move(vec), nan_policy::dont_pad_with_nans); } else if (! std::strcmp(col_type, "DateTime")) { ColumnVecType vec; - _read_binary_datetime_(stream, vec, needs_flipping); + _read_binary_datetime_(stream, vec, needs_flipping, + starting_row, num_rows); load_column(col_name, std::move(vec), nan_policy::dont_pad_with_nans); } else if (! std::strcmp(col_type, "float")) { ColumnVecType vec; - _read_binary_data_(stream, vec, needs_flipping); + _read_binary_data_(stream, vec, needs_flipping, + starting_row, num_rows); load_column(col_name, std::move(vec), nan_policy::dont_pad_with_nans); } else if ( ! std::strcmp(col_type, "double")) { ColumnVecType vec; - _read_binary_data_(stream, vec, needs_flipping); + _read_binary_data_(stream, vec, needs_flipping, + starting_row, num_rows); load_column(col_name, std::move(vec), nan_policy::dont_pad_with_nans); } else if ( ! std::strcmp(col_type, "short")) { ColumnVecType vec; - _read_binary_data_(stream, vec, needs_flipping); + _read_binary_data_(stream, vec, needs_flipping, + starting_row, num_rows); load_column(col_name, std::move(vec), nan_policy::dont_pad_with_nans); } else if ( ! std::strcmp(col_type, "ushort")) { ColumnVecType vec; - _read_binary_data_(stream, vec, needs_flipping); + _read_binary_data_(stream, vec, needs_flipping, + starting_row, num_rows); load_column(col_name, std::move(vec), nan_policy::dont_pad_with_nans); } else if ( ! std::strcmp(col_type, "int")) { ColumnVecType vec; - _read_binary_data_(stream, vec, needs_flipping); + _read_binary_data_(stream, vec, needs_flipping, + starting_row, num_rows); load_column(col_name, std::move(vec), nan_policy::dont_pad_with_nans); } else if ( ! std::strcmp(col_type, "uint")) { ColumnVecType vec; - _read_binary_data_(stream, vec, needs_flipping); + _read_binary_data_(stream, vec, needs_flipping, + starting_row, num_rows); load_column(col_name, std::move(vec), nan_policy::dont_pad_with_nans); } else if ( ! std::strcmp(col_type, "long")) { ColumnVecType vec; - _read_binary_data_(stream, vec, needs_flipping); + _read_binary_data_(stream, vec, needs_flipping, + starting_row, num_rows); load_column(col_name, std::move(vec), nan_policy::dont_pad_with_nans); } else if ( ! std::strcmp(col_type, "ulong")) { ColumnVecType vec; - _read_binary_data_(stream, vec, needs_flipping); + _read_binary_data_(stream, vec, needs_flipping, + starting_row, num_rows); load_column(col_name, std::move(vec), nan_policy::dont_pad_with_nans); } else if ( ! std::strcmp(col_type, "longlong")) { ColumnVecType vec; - _read_binary_data_(stream, vec, needs_flipping); + _read_binary_data_(stream, vec, needs_flipping, + starting_row, num_rows); load_column(col_name, std::move(vec), nan_policy::dont_pad_with_nans); } else if ( ! std::strcmp(col_type, "ulonglong")) { ColumnVecType vec; - _read_binary_data_(stream, vec, needs_flipping); + _read_binary_data_(stream, vec, needs_flipping, + starting_row, num_rows); load_column(col_name, std::move(vec), nan_policy::dont_pad_with_nans); } else if ( ! std::strcmp(col_type, "char")) { ColumnVecType vec; - _read_binary_data_(stream, vec, needs_flipping); + _read_binary_data_(stream, vec, needs_flipping, + starting_row, num_rows); load_column(col_name, std::move(vec), nan_policy::dont_pad_with_nans); } else if ( ! std::strcmp(col_type, "uchar")) { ColumnVecType vec; - _read_binary_data_(stream, vec, needs_flipping); + _read_binary_data_(stream, vec, needs_flipping, + starting_row, num_rows); load_column(col_name, std::move(vec), nan_policy::dont_pad_with_nans); } else if ( ! std::strcmp(col_type, "bool")) { ColumnVecType vec; - _read_binary_data_(stream, vec, needs_flipping); + _read_binary_data_(stream, vec, needs_flipping, + starting_row, num_rows); load_column(col_name, std::move(vec), nan_policy::dont_pad_with_nans); } @@ -1523,13 +1558,7 @@ read (S &in_s, read_json_ (in_s, columns_only); } else if (iof == io_format::binary) { - if (columns_only || starting_row != 0 || - num_rows != std::numeric_limits::max()) [[unlikely]] - throw NotImplemented("read(): Reading columns only or in chunks " - "currently not implemented for " - "io_format::binary"); - - read_binary_ (in_s); + read_binary_ (in_s, columns_only, starting_row, num_rows); } else throw NotImplemented("read(): This io_format is not implemented"); diff --git a/include/DataFrame/Internals/DataFrame_standalone.tcc b/include/DataFrame/Internals/DataFrame_standalone.tcc index 97090aa3..d16452b0 100644 --- a/include/DataFrame/Internals/DataFrame_standalone.tcc +++ b/include/DataFrame/Internals/DataFrame_standalone.tcc @@ -884,26 +884,41 @@ inline static S &_write_csv_df_index_(S &o, unsigned char value) { // ---------------------------------------------------------------------------- template -inline static STRM &_write_binary_string_(STRM &strm, const V &str_vec) { +inline static STRM & +_write_binary_string_(STRM &strm, const V &str_vec, + std::size_t start_row, std::size_t end_row) { char buffer[32]; std::strncpy(buffer, "string", sizeof(buffer)); strm.write(buffer, sizeof(buffer)); - const uint64_t vec_size = str_vec.size(); + if (end_row > str_vec.size() || start_row > end_row) { + const uint64_t vec_size = 0; - strm.write(reinterpret_cast(&vec_size), sizeof(vec_size)); + strm.write(reinterpret_cast(&vec_size), + sizeof(vec_size)); + return(strm); + } + else { + const uint64_t vec_size = end_row - start_row; + + strm.write(reinterpret_cast(&vec_size), + sizeof(vec_size)); + } // It is better for compression, if you write the alike data together // - for (const auto &str : str_vec) { - const uint16_t str_sz = static_cast(str.size()); + for (uint64_t i = start_row; i < end_row; ++i) { + const uint16_t str_sz = static_cast(str_vec[i].size()); strm.write(reinterpret_cast(&str_sz), sizeof(str_sz)); } - for (const auto &str : str_vec) + for (uint64_t i = start_row; i < end_row; ++i) { + const auto &str = str_vec[i]; + strm.write(str.data(), str.size() * sizeof(char)); + } return (strm); } @@ -911,7 +926,9 @@ inline static STRM &_write_binary_string_(STRM &strm, const V &str_vec) { // ---------------------------------------------------------------------------- template -inline static STRM &_write_binary_data_(STRM &strm, const V &vec) { +inline static STRM & +_write_binary_data_(STRM &strm, const V &vec, + std::size_t start_row, std::size_t end_row) { using VecType = typename std::remove_reference::type; using ValueType = typename VecType::value_type; @@ -925,12 +942,23 @@ inline static STRM &_write_binary_data_(STRM &strm, const V &vec) { std::strncpy(buffer, "N/A", sizeof(buffer)); strm.write(buffer, sizeof(buffer)); - const uint64_t vec_size = vec.size(); + if (end_row > vec.size() || start_row > end_row) { + const uint64_t vec_size = 0; + + strm.write(reinterpret_cast(&vec_size), + sizeof(vec_size)); + return(strm); + } + else { + const uint64_t vec_size = end_row - start_row; + + strm.write(reinterpret_cast(&vec_size), + sizeof(vec_size)); + } - strm.write(reinterpret_cast(&vec_size), sizeof(vec_size)); if constexpr (std::is_same_v) { - for (const auto &b : vec) { - const bool bval = b; + for (uint64_t i = start_row; i < end_row; ++i) { + const bool bval = vec[i]; strm.write(reinterpret_cast(&bval), sizeof(bool)); } @@ -942,11 +970,11 @@ inline static STRM &_write_binary_data_(STRM &strm, const V &vec) { requires(const VecType &v) { v.data(); }; if constexpr (has_data_method) { - strm.write(reinterpret_cast(vec.data()), - vec_size * sizeof(ValueType)); + strm.write(reinterpret_cast(vec.data() + start_row), + (end_row - start_row) * sizeof(ValueType)); } else { - for (std::size_t i = 0; i < vec_size; ++i) + for (uint64_t i = start_row; i < end_row; ++i) strm.write(reinterpret_cast(&(vec[i])), sizeof(ValueType)); } @@ -958,18 +986,31 @@ inline static STRM &_write_binary_data_(STRM &strm, const V &vec) { // ---------------------------------------------------------------------------- template -inline static STRM &_write_binary_datetime_(STRM &strm, const V &dt_vec) { +inline static STRM & +_write_binary_datetime_(STRM &strm, const V &dt_vec, + std::size_t start_row, std::size_t end_row) { char buffer[32]; std::strncpy(buffer, "DateTime", sizeof(buffer)); strm.write(buffer, sizeof(buffer)); - const uint64_t vec_size = dt_vec.size(); + if (end_row > dt_vec.size() || start_row > end_row) { + const uint64_t vec_size = 0; + + strm.write(reinterpret_cast(&vec_size), + sizeof(vec_size)); + return(strm); + } + else { + const uint64_t vec_size = end_row - start_row; + + strm.write(reinterpret_cast(&vec_size), + sizeof(vec_size)); + } - strm.write(reinterpret_cast(&vec_size), sizeof(vec_size)); - for (const auto &dt : dt_vec) { - const double val = static_cast(dt); + for (uint64_t i = start_row; i < end_row; ++i) { + const double val = static_cast(dt_vec[i]); strm.write(reinterpret_cast(&val), sizeof(val)); } @@ -981,7 +1022,8 @@ inline static STRM &_write_binary_datetime_(STRM &strm, const V &dt_vec) { template inline static STRM & -_read_binary_string_(STRM &strm, V &str_vec, bool needs_flipping) { +_read_binary_string_(STRM &strm, V &str_vec, bool needs_flipping, + std::size_t start_row, std::size_t num_rows) { uint64_t vec_size { 0 }; @@ -990,6 +1032,15 @@ _read_binary_string_(STRM &strm, V &str_vec, bool needs_flipping) { vec_size = SwapBytes { }(vec_size); + if (start_row > vec_size) { + String1K err; + + err.printf("_read_binary_string_(): ERROR: start_row %lu > " + "vec_size %lu", + start_row, vec_size); + throw DataFrameError(err.c_str()); + } + std::vector sizes (vec_size, 0); strm.read(reinterpret_cast(sizes.data()), @@ -1001,14 +1052,24 @@ _read_binary_string_(STRM &strm, V &str_vec, bool needs_flipping) { s = swaper(s); } - // Now read the strings + const uint64_t read_end = + num_rows == std::numeric_limits::max() + ? vec_size : uint64_t(start_row + num_rows); + + // Now read the strings. We read all data regardless of num_rows + // to advance the file pointer // - str_vec.reserve(vec_size); - for (const auto s : sizes) { - std::string str (std::size_t(s), 0); + str_vec.reserve(read_end > vec_size + ? vec_size - start_row : read_end - start_row); + for (uint64_t i = 0; i < vec_size; ++i) { + if (i >= start_row && i < read_end) [[likely]] { + std::string str (std::size_t(sizes[i]), 0); - strm.read(str.data(), s * sizeof(char)); - str_vec.emplace_back(std::move(str)); + strm.read(str.data(), sizes[i] * sizeof(char)); + str_vec.emplace_back(std::move(str)); + } + else + strm.seekg(sizes[i], std::ios_base::cur); } return (strm); @@ -1018,7 +1079,8 @@ _read_binary_string_(STRM &strm, V &str_vec, bool needs_flipping) { template inline static STRM & -_read_binary_data_(STRM &strm, V &vec, bool needs_flipping) { +_read_binary_data_(STRM &strm, V &vec, bool needs_flipping, + std::size_t start_row, std::size_t num_rows) { using VecType = typename std::remove_reference::type; using ValueType = typename VecType::value_type; @@ -1030,9 +1092,24 @@ _read_binary_data_(STRM &strm, V &vec, bool needs_flipping) { vec_size = SwapBytes { }(vec_size); + if (start_row > vec_size) { + String1K err; + + err.printf("_read_binary_data_(): ERROR: start_row %lu > " + "vec_size %lu", + start_row, vec_size); + throw DataFrameError(err.c_str()); + } + + const uint64_t read_end = + (num_rows == std::numeric_limits::max() || + (start_row + num_rows) > vec_size) + ? vec_size : uint64_t(start_row + num_rows); + + strm.seekg(start_row * sizeof(ValueType), std::ios_base::cur); if constexpr (std::is_same_v) { - vec.reserve(vec_size); - for (uint64_t i = 0; i < vec_size; ++i) { + vec.reserve(read_end - start_row); + for (uint64_t i = start_row; i < read_end; ++i) { bool val; strm.read(reinterpret_cast(&val), sizeof(val)); @@ -1040,12 +1117,13 @@ _read_binary_data_(STRM &strm, V &vec, bool needs_flipping) { } } else { - vec.resize(vec_size); + vec.resize(read_end - start_row); strm.read(reinterpret_cast(vec.data()), - vec_size * sizeof(ValueType)); + (read_end - start_row) * sizeof(ValueType)); if (needs_flipping) flip_endianness(vec); } + strm.seekg((vec_size - read_end) * sizeof(ValueType), std::ios_base::cur); return (strm); } @@ -1053,7 +1131,10 @@ _read_binary_data_(STRM &strm, V &vec, bool needs_flipping) { template inline static STRM & -_read_binary_datetime_(STRM &strm, V &dt_vec, bool needs_flipping) { +_read_binary_datetime_(STRM &strm, V &dt_vec, bool needs_flipping, + std::size_t start_row, std::size_t num_rows) { + + using ValueType = double; uint64_t vec_size { 0 }; @@ -1062,11 +1143,27 @@ _read_binary_datetime_(STRM &strm, V &dt_vec, bool needs_flipping) { vec_size = SwapBytes { }(vec_size); - SwapBytes swaper { }; + if (start_row > vec_size) { + String1K err; - dt_vec.reserve(vec_size); - for (uint64_t i = 0; i < vec_size; ++i) { - double val { 0 }; + err.printf("_read_binary_datetime_(): ERROR: start_row %lu > " + "vec_size %lu", + start_row, vec_size); + throw DataFrameError(err.c_str()); + } + + const uint64_t read_end = + (num_rows == std::numeric_limits::max() || + (start_row + num_rows) > vec_size) + ? vec_size : uint64_t(start_row + num_rows); + + strm.seekg(start_row * sizeof(ValueType), std::ios_base::cur); + + SwapBytes swaper { }; + + dt_vec.reserve(read_end - start_row); + for (uint64_t i = start_row; i < read_end; ++i) { + ValueType val { 0 }; strm.read(reinterpret_cast(&val), sizeof(val)); if (needs_flipping) val = swaper(val); @@ -1076,12 +1173,13 @@ _read_binary_datetime_(STRM &strm, V &dt_vec, bool needs_flipping) { static_cast(val); const DateTime::NanosecondType nano = static_cast( - (val - static_cast(tm)) * 1'000'000'000.0); + (val - static_cast(tm)) * 1'000'000'000.0); dt.set_time(tm, nano); dt_vec.emplace_back(dt); } + strm.seekg((vec_size - read_end) * sizeof(ValueType), std::ios_base::cur); return (strm); } diff --git a/include/DataFrame/Internals/DataFrame_write.tcc b/include/DataFrame/Internals/DataFrame_write.tcc index da429500..d337f311 100644 --- a/include/DataFrame/Internals/DataFrame_write.tcc +++ b/include/DataFrame/Internals/DataFrame_write.tcc @@ -101,7 +101,7 @@ write(S &o, o.precision(precision); if (iof == io_format::json) { o << "{\n"; - if (! columns_only) { + if (! columns_only) [[likely]] { _write_json_df_header_(o, DF_INDEX_COL_NAME, end_row - start_row); @@ -132,7 +132,7 @@ write(S &o, } } else if (iof == io_format::csv) { - if (! columns_only) { + if (! columns_only) [[likely]] { _write_csv_df_header_(o, DF_INDEX_COL_NAME, end_row - start_row) << ':'; @@ -154,7 +154,7 @@ write(S &o, } } else if (iof == io_format::csv2) { - if (! columns_only) { + if (! columns_only) [[likely]] { _write_csv_df_header_(o, DF_INDEX_COL_NAME, end_row - start_row); @@ -177,7 +177,7 @@ write(S &o, for (long i = start_row; i < end_row; ++i) { size_type count = 0; - if (! columns_only) { + if (! columns_only) [[likely]] { o << indices_[i]; need_pre_comma = true; count += 1; @@ -205,12 +205,12 @@ write(S &o, o.write(reinterpret_cast(&col_num), sizeof(col_num)); - print_binary_functor_ idx_functor (DF_INDEX_COL_NAME, - o, - start_row, - end_row); + if (! columns_only) [[likely]] { + print_binary_functor_ idx_functor ( + DF_INDEX_COL_NAME, o, start_row, end_row); - idx_functor(indices_); + idx_functor(indices_); + } const SpinGuard guard(lock_); diff --git a/test/dataframe_tester_3.cc b/test/dataframe_tester_3.cc index f793228a..ff3f4719 100644 --- a/test/dataframe_tester_3.cc +++ b/test/dataframe_tester_3.cc @@ -4015,6 +4015,48 @@ static void test_writing_binary_2() { // ---------------------------------------------------------------------------- +static void test_reading_in_binary_chunks() { + + std::cout << "\nTesting reading_in_binary_chunks( ) ..." << std::endl; + + try { + StrDataFrame df1; + + df1.read("SHORT_IBM.dat", io_format::binary, false, 0, 10); + assert(df1.get_index().size() == 10); + assert(df1.get_column("IBM_Close").size() == 10); + assert(df1.get_index()[0] == "2014-01-02"); + assert(df1.get_index()[9] == "2014-01-15"); + assert(fabs(df1.get_column("IBM_Close")[0] - 185.53) < 0.0001); + assert(fabs(df1.get_column("IBM_Close")[9] - 187.74) < 0.0001); + + StrDataFrame df2; + + df2.read("SHORT_IBM.dat", io_format::binary, false, 800, 10); + assert(df2.get_index().size() == 10); + assert(df2.get_column("IBM_Close").size() == 10); + assert(df2.get_index()[0] == "2017-03-08"); + assert(df2.get_index()[9] == "2017-03-21"); + assert(fabs(df2.get_column("IBM_Close")[0] - 179.45) < 0.0001); + assert(fabs(df2.get_column("IBM_Close")[9] - 173.88) < 0.0001); + + StrDataFrame df3; + + df3.read("SHORT_IBM.dat", io_format::binary, false, 1716, 10); + assert(df3.get_index().size() == 5); + assert(df3.get_column("IBM_Close").size() == 5); + assert(df3.get_index()[0] == "2020-10-26"); + assert(df3.get_index()[4] == "2020-10-30"); + assert(fabs(df3.get_column("IBM_Close")[0] - 112.22) < 0.0001); + assert(fabs(df3.get_column("IBM_Close")[4] - 111.66) < 0.0001); + } + catch (const DataFrameError &ex) { + std::cout << ex.what() << std::endl; + } +} + +// ---------------------------------------------------------------------------- + int main(int, char *[]) { MyDataFrame::set_optimum_thread_level(); @@ -4096,6 +4138,7 @@ int main(int, char *[]) { test_EhlersBandPassFilterVisitor(); test_writing_binary(); test_writing_binary_2(); + test_reading_in_binary_chunks(); return (0); }