Add central synchronize configuration parameter to Alpaka modules #47028

Merged: 2 commits, Dec 30, 2024
28 changes: 25 additions & 3 deletions HeterogeneousCore/AlpakaCore/README.md
@@ -146,7 +146,9 @@ The `...` can in principle be any of the module abilities listed in the linked T

New base classes (or other functionality) can be added based on new use cases that come up.

The Alpaka-based ESProducers should use the `ESProducer` base class (`#include "HeterogeneousCore/AlpakaCore/interface/alpaka/ESProducer.h"`). Note that the Alpaka-based ESProducer constructor must pass the argument `edm::ParameterSet` object to the constructor of the `ESProducer` base class.
The Alpaka-based ESProducers should use the `ESProducer` base class (`#include "HeterogeneousCore/AlpakaCore/interface/alpaka/ESProducer.h"`).

Note that the constructors of both Alpaka-based EDProducers and ESProducers must pass their `edm::ParameterSet` argument to the constructor of their base class.

Note that currently Alpaka-based ESSources are not supported. If you need to produce EventSetup data products into a Record for which there is no ESSource yet, use [`EmptyESSource`](https://twiki.cern.ch/twiki/bin/view/CMSPublic/SWGuideEDMParametersForModules#EmptyESSource).

@@ -237,8 +239,10 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
class ExampleAlpakaProducer : public global::EDProducer<> {
public:
ExampleAlpakaProducer(edm::ParameterSet const& iConfig)
// produces() must not specify the product type, it is deduced from deviceToken_
: deviceToken_{produces()}, size_{iConfig.getParameter<int32_t>("size")} {}
: EDProducer<>(iConfig),
// produces() must not specify the product type, it is deduced from deviceToken_
deviceToken_{produces()},
size_{iConfig.getParameter<int32_t>("size")} {}

// device::Event and device::EventSetup are defined in ALPAKA_ACCELERATOR_NAMESPACE as well
void produce(edm::StreamID sid, device::Event& iEvent, device::EventSetup const& iSetup) const override {
@@ -479,6 +483,24 @@ process.ProcessAcceleratorAlpaka.setBackend("serial_sync") # or "cuda_async" or
process.options.accelerators = ["cpu"] # or "gpu-nvidia" or "gpu-amd"
```

### Blocking synchronization (for testing)

While the general approach is to favor asynchronous operations with non-blocking synchronization, for testing purposes it can be useful to synchronize an EDModule's `acquire()` / `produce()` or an ESProducer's production functions in a blocking way. Blocking synchronization can be enabled for individual modules via the `alpaka` `PSet`:
```python
process.producer = cms.EDProducer("ExampleAlpakaProducer@alpaka",
...
alpaka = cms.untracked.PSet(
synchronize = cms.untracked.bool(True)
)
)
```

Blocking synchronization can be enabled for all Alpaka modules via `ProcessAcceleratorAlpaka`:
```python
process.ProcessAcceleratorAlpaka.setSynchronize(True)
```
Note that a per-module `synchronize` parameter, if set, overrides this global setting.
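
The precedence between the global and the per-module setting can be sketched in plain Python, independent of CMSSW. This is only an illustration under stated assumptions: the helper `resolve_synchronize` and the use of `None` to mean "not set in the module configuration" are hypothetical, and the real logic lives in `ModuleTypeResolverAlpaka.setModuleVariant()`.

```python
from typing import Optional


def resolve_synchronize(module_value: Optional[bool], global_value: bool = False) -> bool:
    """Return the effective 'synchronize' value for one module.

    module_value: the per-module alpaka.synchronize setting, or None when the
                  module does not set it (hypothetical stand-in for a missing
                  or default-valued parameter).
    global_value: the value passed to ProcessAcceleratorAlpaka.setSynchronize().
    """
    if module_value is None:
        # Nothing set on the module: fall back to the global setting.
        return global_value
    # An explicit per-module value always wins over the global one.
    return module_value


# Print the effective value for every combination of the two settings.
for module_value in (None, True, False):
    for global_value in (False, True):
        effective = resolve_synchronize(module_value, global_value)
        print(f"module={module_value!s:5} global={global_value!s:5} -> {effective}")
```

In particular, a module with `synchronize = cms.untracked.bool(False)` stays asynchronous even when `setSynchronize(True)` is called on the process.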


## Unit tests

@@ -16,10 +16,10 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
public:
// TODO: WaitingTaskWithArenaHolder not really needed for host synchronous case
// Constructor overload to be called from acquire()
EDMetadataAcquireSentry(edm::StreamID stream, edm::WaitingTaskWithArenaHolder holder);
EDMetadataAcquireSentry(edm::StreamID stream, edm::WaitingTaskWithArenaHolder holder, bool synchronize);

// Constructor overload to be called from registerTransformAsync()
EDMetadataAcquireSentry(Device const& device, edm::WaitingTaskWithArenaHolder holder);
EDMetadataAcquireSentry(Device const& device, edm::WaitingTaskWithArenaHolder holder, bool synchronize = false);

EDMetadataAcquireSentry(EDMetadataAcquireSentry const&) = delete;
EDMetadataAcquireSentry& operator=(EDMetadataAcquireSentry const&) = delete;
@@ -40,6 +40,7 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
std::shared_ptr<EDMetadata> metadata_;

edm::WaitingTaskWithArenaHolder waitingTaskHolder_;
bool const synchronize_;
};
} // namespace detail
} // namespace ALPAKA_ACCELERATOR_NAMESPACE
@@ -14,10 +14,11 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
class EDMetadataSentry {
public:
// For normal module
EDMetadataSentry(edm::StreamID stream);
EDMetadataSentry(edm::StreamID stream, bool synchronize);

// For ExternalWork-module's produce()
EDMetadataSentry(std::shared_ptr<EDMetadata> metadata) : metadata_(std::move(metadata)) {}
EDMetadataSentry(std::shared_ptr<EDMetadata> metadata, bool synchronize)
: metadata_(std::move(metadata)), synchronize_(synchronize) {}

EDMetadataSentry(EDMetadataSentry const&) = delete;
EDMetadataSentry& operator=(EDMetadataSentry const&) = delete;
@@ -31,6 +32,7 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {

private:
std::shared_ptr<EDMetadata> metadata_;
bool const synchronize_;
};
} // namespace detail
} // namespace ALPAKA_ACCELERATOR_NAMESPACE
4 changes: 2 additions & 2 deletions HeterogeneousCore/AlpakaCore/interface/alpaka/ESProducer.h
@@ -4,7 +4,7 @@
#include "FWCore/Framework/interface/ESProducer.h"
#include "FWCore/Framework/interface/MakeDataException.h"
#include "FWCore/Framework/interface/produce_helpers.h"
#include "HeterogeneousCore/AlpakaCore/interface/module_backend_config.h"
#include "HeterogeneousCore/AlpakaCore/interface/modulePrevalidate.h"
#include "HeterogeneousCore/AlpakaCore/interface/alpaka/ESDeviceProduct.h"
#include "HeterogeneousCore/AlpakaCore/interface/alpaka/ESDeviceProductType.h"
#include "HeterogeneousCore/AlpakaCore/interface/alpaka/Record.h"
@@ -30,7 +30,7 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
public:
static void prevalidate(edm::ConfigurationDescriptions& descriptions) {
Base::prevalidate(descriptions);
cms::alpakatools::module_backend_config(descriptions);
cms::alpakatools::modulePrevalidate(descriptions);
}

protected:
16 changes: 14 additions & 2 deletions HeterogeneousCore/AlpakaCore/interface/alpaka/ProducerBase.h
@@ -5,13 +5,14 @@
#include "FWCore/Framework/interface/FrameworkfwdMostUsed.h"
#include "FWCore/Framework/interface/moduleAbilities.h"
#include "FWCore/Framework/interface/Event.h"
#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/Utilities/interface/EDPutToken.h"
#include "FWCore/Utilities/interface/Transition.h"
#include "HeterogeneousCore/AlpakaCore/interface/alpaka/DeviceProductType.h"
#include "HeterogeneousCore/AlpakaCore/interface/alpaka/EDMetadataAcquireSentry.h"
#include "HeterogeneousCore/AlpakaCore/interface/EventCache.h"
#include "HeterogeneousCore/AlpakaCore/interface/QueueCache.h"
#include "HeterogeneousCore/AlpakaCore/interface/module_backend_config.h"
#include "HeterogeneousCore/AlpakaCore/interface/modulePrevalidate.h"
#include "HeterogeneousCore/AlpakaInterface/interface/Backend.h"
#include "HeterogeneousCore/AlpakaInterface/interface/CopyToHost.h"

@@ -46,7 +47,15 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
using Base = BaseT<Args..., edm::Transformer>;

public:
// TODO: default constructor to be removed after all derived classes have been migrated
ProducerBase() : backendToken_(Base::produces("backend")) {}
ProducerBase(edm::ParameterSet const& iConfig)
: backendToken_(Base::produces("backend")),
// The 'synchronize' parameter can be unset in Alpaka
// modules specified with the namespace prefix instead of
// the '@alpaka' suffix
synchronize_(iConfig.getUntrackedParameter<edm::ParameterSet>("alpaka").getUntrackedParameter<bool>(
"synchronize", false)) {}

template <edm::Transition Tr = edm::Transition::Event>
[[nodiscard]] auto produces() noexcept {
@@ -60,16 +69,19 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {

static void prevalidate(edm::ConfigurationDescriptions& descriptions) {
Base::prevalidate(descriptions);
cms::alpakatools::module_backend_config(descriptions);
cms::alpakatools::modulePrevalidate(descriptions);
}

protected:
void putBackend(edm::Event& iEvent) const {
iEvent.emplace(this->backendToken_, static_cast<unsigned short>(kBackend));
}

bool synchronize() const { return synchronize_; }

private:
edm::EDPutTokenT<unsigned short> const backendToken_;
bool const synchronize_ = false;

template <typename TProducer, edm::Transition Tr>
friend class ProducerBaseAdaptor;
@@ -15,10 +15,15 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
static_assert(not edm::CheckAbility<edm::module::Abilities::kExternalWork, Args...>::kHasIt,
"ALPAKA_ACCELERATOR_NAMESPACE::global::EDProducer may not be used with ExternalWork ability. "
"Please use ALPAKA_ACCELERATOR_NAMESPACE::stream::SynchronizingEDProducer instead.");
using Base = ProducerBase<edm::global::EDProducer, Args...>;

protected:
EDProducer() = default; // to be removed in the near future
EDProducer(edm::ParameterSet const iConfig) : Base(iConfig) {}

public:
void produce(edm::StreamID sid, edm::Event& iEvent, edm::EventSetup const& iSetup) const final {
detail::EDMetadataSentry sentry(sid);
detail::EDMetadataSentry sentry(sid, this->synchronize());
device::Event ev(iEvent, sentry.metadata());
device::EventSetup const es(iSetup, ev.device());
produce(sid, ev, es);
@@ -15,10 +15,15 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
static_assert(not edm::CheckAbility<edm::module::Abilities::kExternalWork, Args...>::kHasIt,
"ALPAKA_ACCELERATOR_NAMESPACE::stream::EDProducer may not be used with ExternalWork ability. "
"Please use ALPAKA_ACCELERATOR_NAMESPACE::stream::SynchronizingEDProducer instead.");
using Base = ProducerBase<edm::stream::EDProducer, Args...>;

protected:
EDProducer() = default; // to be removed in the near future
EDProducer(edm::ParameterSet const iConfig) : Base(iConfig) {}

public:
void produce(edm::Event& iEvent, edm::EventSetup const& iSetup) final {
detail::EDMetadataSentry sentry(iEvent.streamID());
detail::EDMetadataSentry sentry(iEvent.streamID(), this->synchronize());
device::Event ev(iEvent, sentry.metadata());
device::EventSetup const es(iSetup, ev.device());
produce(ev, es);
@@ -18,20 +18,25 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
not edm::CheckAbility<edm::module::Abilities::kExternalWork, Args...>::kHasIt,
"ExternalWork ability is redundant with ALPAKA_ACCELERATOR_NAMESPACE::stream::SynchronizingEDProducer. "
"Please remove it.");
using Base = ProducerBase<edm::stream::EDProducer, edm::ExternalWork, Args...>;

protected:
SynchronizingEDProducer() = default; // to be removed in the near future
SynchronizingEDProducer(edm::ParameterSet const iConfig) : Base(iConfig) {}

public:
void acquire(edm::Event const& iEvent,
edm::EventSetup const& iSetup,
edm::WaitingTaskWithArenaHolder holder) final {
detail::EDMetadataAcquireSentry sentry(iEvent.streamID(), std::move(holder));
detail::EDMetadataAcquireSentry sentry(iEvent.streamID(), std::move(holder), this->synchronize());
device::Event const ev(iEvent, sentry.metadata());
device::EventSetup const es(iSetup, ev.device());
acquire(ev, es);
metadata_ = sentry.finish();
}

void produce(edm::Event& iEvent, edm::EventSetup const& iSetup) final {
detail::EDMetadataSentry sentry(std::move(metadata_));
detail::EDMetadataSentry sentry(std::move(metadata_), this->synchronize());
device::Event ev(iEvent, sentry.metadata());
device::EventSetup const es(iSetup, ev.device());
produce(ev, es);
10 changes: 10 additions & 0 deletions HeterogeneousCore/AlpakaCore/interface/modulePrevalidate.h
@@ -0,0 +1,10 @@
#ifndef HeterogeneousCore_AlpakaCore_interface_modulePrevalidate_h
#define HeterogeneousCore_AlpakaCore_interface_modulePrevalidate_h

#include "FWCore/Framework/interface/FrameworkfwdMostUsed.h"

namespace cms::alpakatools {
void modulePrevalidate(edm::ConfigurationDescriptions& iDesc);
} // namespace cms::alpakatools

#endif
10 changes: 0 additions & 10 deletions HeterogeneousCore/AlpakaCore/interface/module_backend_config.h

This file was deleted.

20 changes: 18 additions & 2 deletions HeterogeneousCore/AlpakaCore/python/ProcessAcceleratorAlpaka.py
@@ -5,7 +5,7 @@
from HeterogeneousCore.Common.PlatformStatus import PlatformStatus

class ModuleTypeResolverAlpaka:
def __init__(self, accelerators, backend):
def __init__(self, accelerators, backend, synchronize):
# first element is used as the default if nothing is set
self._valid_backends = []
if "gpu-nvidia" in accelerators:
@@ -23,6 +23,7 @@ def __init__(self, accelerators, backend):
if backend != self._valid_backends[0]:
self._valid_backends.remove(backend)
self._valid_backends.insert(0, backend)
self._synchronize = synchronize

def plugin(self):
return "ModuleTypeResolverAlpaka"
@@ -31,6 +32,11 @@ def setModuleVariant(self, module):
if module.type_().endswith("@alpaka"):
defaultBackend = self._valid_backends[0]
if hasattr(module, "alpaka"):
# Ensure the untrackedness already here, because the
# C++ ModuleTypeResolverAlpaka relies on the
# untrackedness (before the configuration validation)
if module.alpaka.isTracked():
raise cms.EDMException(cms.edm.errors.Configuration, "The 'alpaka' PSet in module '{}' is tracked, but it should be untracked".format(module.label()))
if hasattr(module.alpaka, "backend"):
if module.alpaka.backend == "":
module.alpaka.backend = defaultBackend
@@ -42,6 +48,12 @@ def setModuleVariant(self, module):
module.alpaka = cms.untracked.PSet(
backend = cms.untracked.string(defaultBackend)
)
isDefaultValue = lambda v: \
isinstance(v, type(cms.optional.untracked.bool)) \
and not v.isTracked() \
and v.isCompatibleCMSType(cms.bool)
if not hasattr(module.alpaka, "synchronize") or isDefaultValue(module.alpaka.synchronize):
module.alpaka.synchronize = cms.untracked.bool(self._synchronize)

class ProcessAcceleratorAlpaka(cms.ProcessAccelerator):
"""ProcessAcceleratorAlpaka itself does not define or inspect
@@ -53,14 +65,18 @@ class ProcessAcceleratorAlpaka(cms.ProcessAccelerator):
def __init__(self):
super(ProcessAcceleratorAlpaka, self).__init__()
self._backend = None
self._synchronize = False

# User-facing interface
def setBackend(self, backend):
self._backend = backend

def setSynchronize(self, synchronize):
self._synchronize = synchronize

# Framework-facing interface
def moduleTypeResolver(self, accelerators):
return ModuleTypeResolverAlpaka(accelerators, self._backend)
return ModuleTypeResolverAlpaka(accelerators, self._backend, self._synchronize)

def apply(self, process, accelerators):
# Propagate the AlpakaService messages through the MessageLogger
18 changes: 13 additions & 5 deletions HeterogeneousCore/AlpakaCore/src/alpaka/EDMetadataAcquireSentry.cc
@@ -5,11 +5,15 @@

namespace ALPAKA_ACCELERATOR_NAMESPACE {
namespace detail {
EDMetadataAcquireSentry::EDMetadataAcquireSentry(edm::StreamID streamID, edm::WaitingTaskWithArenaHolder holder)
: EDMetadataAcquireSentry(detail::chooseDevice(streamID), std::move(holder)) {}
EDMetadataAcquireSentry::EDMetadataAcquireSentry(edm::StreamID streamID,
edm::WaitingTaskWithArenaHolder holder,
bool synchronize)
: EDMetadataAcquireSentry(detail::chooseDevice(streamID), std::move(holder), synchronize) {}

EDMetadataAcquireSentry::EDMetadataAcquireSentry(Device const& device, edm::WaitingTaskWithArenaHolder holder)
: waitingTaskHolder_(std::move(holder)) {
EDMetadataAcquireSentry::EDMetadataAcquireSentry(Device const& device,
edm::WaitingTaskWithArenaHolder holder,
bool synchronize)
: waitingTaskHolder_(std::move(holder)), synchronize_(synchronize) {
#ifdef ALPAKA_ACC_CPU_B_SEQ_T_SEQ_ENABLED
// all synchronous backends
metadata_ = std::make_shared<EDMetadata>(cms::alpakatools::getQueueCache<Queue>().get(device));
@@ -23,7 +27,11 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
#ifndef ALPAKA_ACC_CPU_B_SEQ_T_SEQ_ENABLED
// all asynchronous backends
std::shared_ptr<EDMetadata> EDMetadataAcquireSentry::finish() {
metadata_->enqueueCallback(std::move(waitingTaskHolder_));
if (synchronize_) {
alpaka::wait(metadata_->queue());
} else {
metadata_->enqueueCallback(std::move(waitingTaskHolder_));
}
return std::move(metadata_);
}
#endif
15 changes: 11 additions & 4 deletions HeterogeneousCore/AlpakaCore/src/alpaka/EDMetadataSentry.cc
@@ -5,7 +5,7 @@

namespace ALPAKA_ACCELERATOR_NAMESPACE {
namespace detail {
EDMetadataSentry::EDMetadataSentry(edm::StreamID streamID) {
EDMetadataSentry::EDMetadataSentry(edm::StreamID streamID, bool synchronize) : synchronize_(synchronize) {
auto const& device = detail::chooseDevice(streamID);
#ifdef ALPAKA_ACC_CPU_B_SEQ_T_SEQ_ENABLED
metadata_ = std::make_shared<EDMetadata>(cms::alpakatools::getQueueCache<Queue>().get(device));
@@ -16,12 +16,19 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
}

void EDMetadataSentry::finish(bool launchedAsyncWork) {
if (launchedAsyncWork) {
if constexpr (not std::is_same_v<Queue, alpaka::Queue<Device, alpaka::Blocking>>) {
if (launchedAsyncWork and synchronize_) {
alpaka::wait(metadata_->queue());
}
}

if (launchedAsyncWork and not synchronize_) {
metadata_->recordEvent();
} else {
// If we are certain no asynchronous work was launched (i.e.
// the Queue was not used in any way), there is no need to
// synchronize, and the Event can be discarded.
// the Queue was not used in any way), or a blocking
// synchronization was explicitly requested, there is no need
// to synchronize later, and the Event can be discarded.
metadata_->discardEvent();
}
}
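
The control flow of `EDMetadataSentry::finish()` in the hunk above can be summarized as a small decision function. The following plain-Python sketch is for illustration only: the name `finish_actions`, its string return values, and the `queue_is_blocking` flag (standing in for the `if constexpr` check on the `Queue` type) are assumptions and not part of the PR.

```python
from typing import List


def finish_actions(launched_async_work: bool,
                   synchronize: bool,
                   queue_is_blocking: bool = False) -> List[str]:
    """List the operations finish() would perform, in order.

    "wait"         stands for alpaka::wait(metadata_->queue())
    "recordEvent"  stands for metadata_->recordEvent()
    "discardEvent" stands for metadata_->discardEvent()
    """
    actions = []
    # A blocking wait is only needed on a non-blocking queue, and only if
    # asynchronous work was launched and blocking synchronization was requested.
    if not queue_is_blocking and launched_async_work and synchronize:
        actions.append("wait")
    # The event is recorded only when a later, non-blocking synchronization
    # is still needed; otherwise it can be discarded.
    if launched_async_work and not synchronize:
        actions.append("recordEvent")
    else:
        actions.append("discardEvent")
    return actions


for launched in (False, True):
    for sync in (False, True):
        print(launched, sync, finish_actions(launched, sync))
```

The sketch makes the key property visible: with `synchronize` enabled the event is never recorded, because the blocking wait already guarantees that the work has completed.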