From 1613e008ded80a0ed35d1a40c2016a476f0cf278 Mon Sep 17 00:00:00 2001 From: Pete Peterson Date: Thu, 26 Dec 2024 12:53:21 -0500 Subject: [PATCH] Refactor code to multithread reading from disk --- .../src/AlignAndFocusPowderSlim.cpp | 284 +++++++++++------- 1 file changed, 176 insertions(+), 108 deletions(-) diff --git a/Framework/DataHandling/src/AlignAndFocusPowderSlim.cpp b/Framework/DataHandling/src/AlignAndFocusPowderSlim.cpp index ee4932168caa..8b608e05236f 100644 --- a/Framework/DataHandling/src/AlignAndFocusPowderSlim.cpp +++ b/Framework/DataHandling/src/AlignAndFocusPowderSlim.cpp @@ -9,9 +9,7 @@ #include "MantidAPI/Axis.h" #include "MantidAPI/FileProperty.h" #include "MantidAPI/MatrixWorkspace.h" -#include "MantidDataHandling/LoadBankFromDiskTask.h" #include "MantidDataHandling/LoadEventNexus.h" -#include "MantidDataHandling/LoadEventNexusIndexSetup.h" #include "MantidDataObjects/EventList.h" #include "MantidDataObjects/MaskWorkspace.h" #include "MantidDataObjects/TableWorkspace.h" @@ -22,10 +20,11 @@ #include "MantidKernel/ArrayProperty.h" #include "MantidKernel/Timer.h" #include "MantidKernel/VectorHelper.h" -#include "MantidNexus/NexusIOHelper.h" +#include "MantidNexus/H5Util.h" #include "tbb/parallel_for.h" #include "tbb/parallel_reduce.h" +#include #include #include @@ -89,35 +88,32 @@ class NexusLoader { : m_is_time_filtered(is_time_filtered), m_pulse_start_index(pulse_start_index), m_pulse_stop_index(pulse_stop_index) {} - static void loadPulseTimes(std::unique_ptr> &data, ::NeXus::File &h5file) { + static void loadPulseTimes(H5::Group &entry, std::unique_ptr> &data) { // /entry/DASlogs/frequency/time - h5file.openGroup("DASlogs", "NXcollection"); - h5file.openGroup("frequency", "NXlog"); - h5file.openData("time"); + auto logs = entry.openGroup("DASlogs"); // type=NXcollection + auto frequency = entry.openGroup("frequency"); // type=NXlog" - // This is the data size - ::NeXus::Info id_info = h5file.getInfo(); - const auto dim0 = static_cast(LoadBankFromDiskTask::recalculateDataSize(id_info.dims[0])); - data->resize(dim0); + auto dataset = entry.openDataSet("time"); + NeXus::H5Util::readArray1DCoerce(dataset, *data); - Mantid::NeXus::NeXusIOHelper::readNexusVector(*data, h5file, "time"); - - // close the sds - h5file.closeData(); - h5file.closeGroup(); - h5file.closeGroup(); + // groups close themselves } - void loadTOF(std::unique_ptr> &data, ::NeXus::File &h5file, + void loadTOF(H5::Group &event_group, std::unique_ptr> &data, const std::pair &eventRange) { // g_log.information(NxsFieldNames::TIME_OF_FLIGHT); - h5file.openData(NxsFieldNames::TIME_OF_FLIGHT); - + auto tof_SDS = event_group.openDataSet(NxsFieldNames::TIME_OF_FLIGHT); + // TODO probably should resize data array + /* // This is the data size - ::NeXus::Info id_info = h5file.getInfo(); + ::NeXus::Info id_info = m_h5file.getInfo(); const auto dim0 = static_cast(LoadBankFromDiskTask::recalculateDataSize(id_info.dims[0])); + */ if (m_is_time_filtered) { + throw std::runtime_error("filtering not implemented"); + // TODO sort this out H5Util doesn't have slab read yet + /* // These are the arguments to getSlab() std::vector loadStart(1, eventRange.first); int64_t slabsize = (eventRange.second == std::numeric_limits::max()) @@ -127,34 +123,37 @@ class NexusLoader { data->resize(loadSize[0]); Mantid::NeXus::NeXusIOHelper::readNexusSlab( - *data, h5file, NxsFieldNames::TIME_OF_FLIGHT, loadStart, loadSize); + *data, m_h5file, NxsFieldNames::TIME_OF_FLIGHT, loadStart, loadSize); + */ } else { - data->resize(dim0); - Mantid::NeXus::NeXusIOHelper::readNexusVector(*data, h5file, NxsFieldNames::TIME_OF_FLIGHT); + // TODO probably should resize data array + NeXus::H5Util::readArray1DCoerce(tof_SDS, *data); } // get the units std::string tof_unit; - h5file.getAttr("units", tof_unit); - - // close the sds - h5file.closeData(); + NeXus::H5Util::readStringAttribute(tof_SDS, "units", tof_unit); // Convert Tof to microseconds if (tof_unit != MICROSEC) Kernel::Units::timeConversionVector(*data, tof_unit, MICROSEC); } - void loadDetid(std::unique_ptr> &data, ::NeXus::File &h5file, + void loadDetid(H5::Group &event_group, std::unique_ptr> &data, const std::pair &eventRange) { // g_log.information(NxsFieldNames::DETID); - h5file.openData(NxsFieldNames::DETID); - + auto detID_SDS = event_group.openDataSet(NxsFieldNames::DETID); + // TODO probably should resize data array + /* // This is the data size - ::NeXus::Info id_info = h5file.getInfo(); + ::NeXus::Info id_info = m_h5file.getInfo(); const auto dim0 = static_cast(LoadBankFromDiskTask::recalculateDataSize(id_info.dims[0])); + */ if (m_is_time_filtered) { + throw std::runtime_error("filtering not implemented"); + // TODO sort this out H5Util doesn't have slab read yet + /* // These are the arguments to getSlab() std::vector loadStart(1, eventRange.first); int64_t slabsize = (eventRange.second == std::numeric_limits::max()) @@ -164,45 +163,38 @@ class NexusLoader { data->resize(loadSize[0]); Mantid::NeXus::NeXusIOHelper::readNexusSlab( - *data, h5file, NxsFieldNames::DETID, loadStart, loadSize); + *data, m_h5file, NxsFieldNames::DETID, loadStart, loadSize); + */ } else { - data->resize(dim0); - Mantid::NeXus::NeXusIOHelper::readNexusVector(*data, h5file, NxsFieldNames::DETID); + // TODO probably should resize data array + NeXus::H5Util::readArray1DCoerce(detID_SDS, *data); } - - // close the sds - h5file.closeData(); } private: - void loadEventIndex(std::unique_ptr> &data, ::NeXus::File &h5file) { + void loadEventIndex(H5::Group &event_group, std::unique_ptr> &data) { // g_log.information(NxsFieldNames::INDEX_ID); - h5file.openData(NxsFieldNames::INDEX_ID); - - // This is the data size - ::NeXus::Info id_info = h5file.getInfo(); - const auto dim0 = static_cast(LoadBankFromDiskTask::recalculateDataSize(id_info.dims[0])); - data->resize(dim0); - - Mantid::NeXus::NeXusIOHelper::readNexusVector(*data, h5file, NxsFieldNames::INDEX_ID); - - // close the sds - h5file.closeData(); + auto index_SDS = event_group.openDataSet(NxsFieldNames::INDEX_ID); + NeXus::H5Util::readArray1DCoerce(index_SDS, *data); } public: - std::pair getEventIndexRange(::NeXus::File &h5file) { - uint64_t start_event = 0; - uint64_t stop_event = std::numeric_limits::max(); + std::pair getEventIndexRange(H5::Group &event_group) { + constexpr uint64_t START_DEFAULT = 0; + constexpr uint64_t STOP_DEFAULT = std::numeric_limits::max(); + if (m_is_time_filtered) { std::unique_ptr> event_index = std::make_unique>(); - this->loadEventIndex(event_index, h5file); + this->loadEventIndex(event_group, event_index); - start_event = event_index->at(m_pulse_stop_index); + uint64_t start_event = event_index->at(m_pulse_stop_index); + uint64_t stop_event = STOP_DEFAULT; if (m_pulse_stop_index != std::numeric_limits::max()) stop_event = event_index->at(m_pulse_stop_index); + return {start_event, stop_event}; + } else { + return {START_DEFAULT, STOP_DEFAULT}; } - return {start_event, stop_event}; } private: @@ -246,36 +238,6 @@ class Histogrammer { std::optional (*m_findBin)(const MantidVec &, const double, const double, const double, const bool); }; -template class ProcessEventsTask { -public: - ProcessEventsTask(const Histogrammer *histogrammer, const std::vector *detids, - const std::vector *tofs, const AlignAndFocusPowderSlim::BankCalibration *calibration, - std::vector *y_temp, const std::set *masked) - : m_histogrammer(histogrammer), m_detids(detids), m_tofs(tofs), m_calibration(calibration), y_temp(y_temp), - masked(masked) {} - - void operator()(const tbb::blocked_range &range) const { - for (size_t i = range.begin(); i < range.end(); ++i) { - const auto detid = static_cast(m_detids->at(i)); - if (masked->contains(detid)) - continue; - const auto tof = static_cast(m_tofs->at(i)) * m_calibration->value(detid); - - const auto binnum = m_histogrammer->findBin(tof); - if (binnum) - y_temp->at(binnum.value())++; - } - } - -private: - const Histogrammer *m_histogrammer; - const std::vector *m_detids; - const std::vector *m_tofs; - const AlignAndFocusPowderSlim::BankCalibration *m_calibration; - std::vector *y_temp; - const std::set *masked; -}; - template class MinMax { const std::vector *vec; @@ -316,6 +278,118 @@ template std::pair parallel_minmax(const std::vector return std::make_pair(finder.minval, finder.maxval); } } + +template class ProcessEventsTask { +public: + ProcessEventsTask(const Histogrammer *histogrammer, const std::vector *detids, + const std::vector *tofs, const AlignAndFocusPowderSlim::BankCalibration *calibration, + std::vector *y_temp, const std::set *masked) + : m_histogrammer(histogrammer), m_detids(detids), m_tofs(tofs), m_calibration(calibration), y_temp(y_temp), + masked(masked) {} + + void operator()(const tbb::blocked_range &range) const { + for (size_t i = range.begin(); i < range.end(); ++i) { + const auto detid = static_cast(m_detids->at(i)); + if (masked->contains(detid)) + continue; + const auto tof = static_cast(m_tofs->at(i)) * m_calibration->value(detid); + + const auto binnum = m_histogrammer->findBin(tof); + if (binnum) + y_temp->at(binnum.value())++; + } + } + +private: + const Histogrammer *m_histogrammer; + const std::vector *m_detids; + const std::vector *m_tofs; + const AlignAndFocusPowderSlim::BankCalibration *m_calibration; + std::vector *y_temp; + const std::set *masked; +}; + +class ProcessBankTask { +public: + ProcessBankTask(std::vector &bankEntryNames, H5::H5File &h5file, const bool is_time_filtered, + const size_t pulse_start_index, const size_t pulse_stop_index, MatrixWorkspace_sptr &wksp, + const std::map &calibration, const std::set &masked, const double binWidth, + const bool linearBins) + : m_h5file(h5file), m_bankEntries(bankEntryNames), + m_loader(is_time_filtered, pulse_start_index, pulse_stop_index), m_wksp(wksp), m_calibration(calibration), + m_masked(masked), m_binWidth(binWidth), m_linearBins(linearBins) { + if (false) { // H5Freopen_async(h5file.getId(), m_h5file.getId()) < 0) { + throw std::runtime_error("failed to reopen async"); + } + } + + void operator()(const tbb::blocked_range &range) const { + // re-use vectors to save malloc/free calls + std::unique_ptr> event_detid = std::make_unique>(); + std::unique_ptr> event_time_of_flight = std::make_unique>(); + + auto entry = m_h5file.openGroup("entry"); // type=NXentry + for (size_t wksp_index = range.begin(); wksp_index < range.end(); ++wksp_index) { + const auto &bankName = m_bankEntries[wksp_index]; + Kernel::Timer timer; + std::cout << bankName << " start" << std::endl; + + event_detid->clear(); + event_time_of_flight->clear(); + + { // shrink variable scope + // open the bank + auto event_group = entry.openGroup(bankName); // type=NXevent_data + + // get filtering range + const auto eventRange = m_loader.getEventIndexRange(event_group); + // load data + m_loader.loadTOF(event_group, event_time_of_flight, eventRange); + m_loader.loadDetid(event_group, event_detid, eventRange); + } + std::cout << bankName << " done reading " << timer << std::endl; + timer.reset(); + + if (event_time_of_flight->empty() || event_detid->empty()) { + // g_log.warning() << "No data for bank " << entry_name << '\n'; + continue; + } + + // process the events that were loaded + const auto [minval, maxval] = parallel_minmax(event_detid.get()); + AlignAndFocusPowderSlim::BankCalibration calibration(static_cast(minval), static_cast(maxval), + m_calibration); + + auto &spectrum = m_wksp->getSpectrum(wksp_index); + Histogrammer histogrammer(&spectrum.readX(), m_binWidth, m_linearBins); + const auto numEvent = event_time_of_flight->size(); + + // std::atomic allows for multi-threaded accumulation and who cares about floats when you are just + // counting things + std::vector y_temp(spectrum.dataY().size()); + + // threaded processing of the events + constexpr size_t GRAINSIZE_EVENT{2000}; + ProcessEventsTask task(&histogrammer, event_detid.get(), event_time_of_flight.get(), &calibration, &y_temp, + &m_masked); + tbb::parallel_for(tbb::blocked_range(0, numEvent, GRAINSIZE_EVENT), task); + auto &y_values = spectrum.dataY(); + std::copy(y_temp.cbegin(), y_temp.cend(), y_values.begin()); + std::cout << bankName << " stop " << timer << std::endl; + } + } + +private: + H5::H5File m_h5file; + const std::vector m_bankEntries; + mutable NexusLoader m_loader; + MatrixWorkspace_sptr m_wksp; + const std::map m_calibration; // detid: 1/difc + const std::set m_masked; + const double m_binWidth; + const bool m_linearBins; +}; + } // anonymous namespace //---------------------------------------------------------------------------------------------- @@ -411,10 +485,7 @@ void AlignAndFocusPowderSlim::exec() { */ // load the events - ::NeXus::File h5file(filename); - - h5file.openPath("/"); - h5file.openGroup(ENTRY_TOP_LEVEL, "NXentry"); // TODO should this allow other entries? + H5::H5File h5file(filename, H5F_ACC_RDONLY); // filter by time double filter_time_start_sec = getProperty(PropertyNames::FILTER_TIMESTART); @@ -424,7 +495,8 @@ void AlignAndFocusPowderSlim::exec() { is_time_filtered = true; g_log.information() << "Filtering pulses from " << filter_time_start_sec << " to " << filter_time_stop_sec << "s\n"; std::unique_ptr> pulse_times = std::make_unique>(); - NexusLoader::loadPulseTimes(pulse_times, h5file); + auto entry = h5file.openGroup(ENTRY_TOP_LEVEL); + NexusLoader::loadPulseTimes(entry, pulse_times); g_log.information() << "Pulse times from " << pulse_times->front() << " to " << pulse_times->back() << " with length " << pulse_times->size() << '\n'; if (!std::is_sorted(pulse_times->cbegin(), pulse_times->cend())) { @@ -465,7 +537,7 @@ void AlignAndFocusPowderSlim::exec() { const std::set &classEntries = itClassEntries->second; // filter out the diagnostic entries - std::set bankEntryNames; + std::vector bankEntryNames; { const std::regex classRegex("(/entry/)([^/]*)"); std::smatch groups; @@ -478,13 +550,20 @@ void AlignAndFocusPowderSlim::exec() { } else if (classEntry.ends_with("bank_unmapped_events")) { // do nothing } else { - bankEntryNames.insert(entry_name); + bankEntryNames.push_back(entry_name); } } } } - NexusLoader loader(is_time_filtered, pulse_start_index, pulse_stop_index); + // threaded processing of the banks + ProcessBankTask task(bankEntryNames, h5file, is_time_filtered, pulse_start_index, pulse_stop_index, wksp, + m_calibration, m_masked, binWidth, linearBins); + constexpr size_t GRAINSIZE_BANK{2}; + tbb::parallel_for(tbb::blocked_range(0, bankEntryNames.size(), GRAINSIZE_BANK), task); + + /* + NexusLoader loader(h5file, is_time_filtered, pulse_start_index, pulse_stop_index); size_t specnum = 0; for (const std::string &entry_name : bankEntryNames) { @@ -501,16 +580,8 @@ void AlignAndFocusPowderSlim::exec() { // get filtering range const auto eventRange = loader.getEventIndexRange(h5file); - { - const auto startTime = std::chrono::high_resolution_clock::now(); - loader.loadTOF(event_time_of_flight, h5file, eventRange); - addTimer("readTOF" + entry_name, startTime, std::chrono::high_resolution_clock::now()); - } - { - const auto startTime = std::chrono::high_resolution_clock::now(); - loader.loadDetid(event_detid, h5file, eventRange); - addTimer("readDetID" + entry_name, startTime, std::chrono::high_resolution_clock::now()); - } + loader.loadTOF(event_time_of_flight, eventRange); + loader.loadDetid(event_detid, eventRange); if (event_time_of_flight->empty() || event_detid->empty()) { g_log.warning() << "No data for bank " << entry_name << '\n'; @@ -543,12 +614,9 @@ void AlignAndFocusPowderSlim::exec() { h5file.closeGroup(); specnum++; } + */ } - // go back to where we started - h5file.closeGroup(); - h5file.close(); - // TODO load logs wksp->setYUnit("Counts"); wksp->getAxis(0)->setUnit("DSpacing");