Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check p2p_agents_list_ before deleting #69

Open
wants to merge 7 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion hipamd/src/hip_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ hip::Stream* getStream(hipStream_t stream, bool wait) {
hip::Stream* hip_stream = reinterpret_cast<hip::Stream*>(stream);
if (wait && !(hip_stream->Flags() & hipStreamNonBlocking)) {
constexpr bool WaitNullStreamOnly = true;
iHipWaitActiveStreams(hip_stream, WaitNullStreamOnly);
hip_stream->GetDevice()->WaitActiveStreams(hip_stream, WaitNullStreamOnly);
}
return hip_stream;
}
Expand Down
143 changes: 141 additions & 2 deletions hipamd/src/hip_device.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ hip::Stream* Device::NullStream(bool wait) {
}
if (wait == true) {
// Wait for all active streams before executing commands on the default
iHipWaitActiveStreams(null_stream_);
WaitActiveStreams(null_stream_);
}
return null_stream_;
}
Expand Down Expand Up @@ -149,11 +149,150 @@ void Device::Reset() {
mem_pools_.clear();
}
flags_ = hipDeviceScheduleSpin;
hip::Stream::destroyAllStreams(deviceId_);
destroyAllStreams();
amd::MemObjMap::Purge(devices()[0]);
Create();
}

// ================================================================================================
void Device::WaitActiveStreams(hip::Stream* blocking_stream, bool wait_null_stream) {
amd::Command::EventWaitList eventWaitList(0);
bool submitMarker = 0;

auto waitForStream = [&submitMarker,
&eventWaitList](hip::Stream* stream) {
if (amd::Command *command = stream->getLastQueuedCommand(true)) {
amd::Event &event = command->event();
// Check HW status of the ROCcrl event.
// Note: not all ROCclr modes support HW status
bool ready = stream->device().IsHwEventReady(event);
if (!ready) {
ready = (command->status() == CL_COMPLETE);
}
submitMarker |= stream->vdev()->isFenceDirty();
// Check the current active status
if (!ready) {
command->notifyCmdQueue();
eventWaitList.push_back(command);
} else {
command->release();
}
}
};

if (wait_null_stream) {
if (null_stream_) {
waitForStream(null_stream_);
}
} else {
amd::ScopedLock lock(streamSetLock);

for (const auto& active_stream : streamSet) {
// If it's the current device
if (// Make sure it's a default stream
((active_stream->Flags() & hipStreamNonBlocking) == 0) &&
// and it's not the current stream
(active_stream != blocking_stream)) {
// Get the last valid command
waitForStream(active_stream);
}
}
}

// Check if we have to wait anything
if (eventWaitList.size() > 0 || submitMarker) {
amd::Command* command = new amd::Marker(*blocking_stream, kMarkerDisableFlush, eventWaitList);
if (command != nullptr) {
command->enqueue();
command->release();
}
}

// Release all active commands. It's safe after the marker was enqueued
for (const auto& it : eventWaitList) {
it->release();
}
}

// ================================================================================================
void Device::AddStream(Stream* stream) {
amd::ScopedLock lock(streamSetLock);
streamSet.insert(stream);
}

// ================================================================================================
void Device::RemoveStream(Stream* stream){
amd::ScopedLock lock(streamSetLock);
streamSet.erase(stream);
}

// ================================================================================================
bool Device::StreamExists(Stream* stream){
amd::ScopedLock lock(streamSetLock);
if (streamSet.find(stream) != streamSet.end()) {
return true;
}
return false;
}

// ================================================================================================
void Device::destroyAllStreams() {
std::vector<Stream*> toBeDeleted;
{
amd::ScopedLock lock(streamSetLock);
for (auto& it : streamSet) {
if (it->Null() == false ) {
toBeDeleted.push_back(it);
}
}
}
for (auto& it : toBeDeleted) {
hip::Stream::Destroy(it);
}
}

// ================================================================================================
void Device::SyncAllStreams( bool cpu_wait) {
// Make a local copy to avoid stalls for GPU finish with multiple threads
std::vector<hip::Stream*> streams;
streams.reserve(streamSet.size());
{
amd::ScopedLock lock(streamSetLock);
for (auto it : streamSet) {
streams.push_back(it);
it->retain();
}
}
for (auto it : streams) {
it->finish(cpu_wait);
it->release();
}
// Release freed memory for all memory pools on the device
ReleaseFreedMemory();
}

// ================================================================================================
bool Device::StreamCaptureBlocking() {
amd::ScopedLock lock(streamSetLock);
for (auto& it : streamSet) {
if (it->GetCaptureStatus() == hipStreamCaptureStatusActive && it->Flags() != hipStreamNonBlocking) {
return true;
}
}
return false;
}

// ================================================================================================
bool Device::existsActiveStreamForDevice() {
amd::ScopedLock lock(streamSetLock);
for (const auto& active_stream : streamSet) {
if (active_stream->GetQueueStatus()) {
return true;
}
}
return false;
}

// ================================================================================================
Device::~Device() {
if (default_mem_pool_ != nullptr) {
Expand Down
2 changes: 1 addition & 1 deletion hipamd/src/hip_device_runtime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ hipError_t hipDeviceSetSharedMemConfig(hipSharedMemConfig config) {
hipError_t hipDeviceSynchronize() {
HIP_INIT_API(hipDeviceSynchronize);
constexpr bool kDoWaitForCpu = true;
hip::Stream::SyncAllStreams(hip::getCurrentDevice()->deviceId(), kDoWaitForCpu);
hip::getCurrentDevice()->SyncAllStreams(kDoWaitForCpu);
HIP_RETURN(hipSuccess);
}

Expand Down
3 changes: 2 additions & 1 deletion hipamd/src/hip_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ hipError_t capturehipMemcpyParam2DAsync(hipStream_t& stream, const hip_Memcpy2D*
}
p.dstArray = pCopy->dstArray;
p.dstPos = {pCopy->dstXInBytes, pCopy->dstY, 0};
p.dstPtr.pitch = pCopy->srcPitch;
p.dstPtr.pitch = pCopy->dstPitch;
if (pCopy->dstDevice != nullptr) {
p.dstPtr.ptr = pCopy->dstDevice;
}
Expand Down Expand Up @@ -719,6 +719,7 @@ hipError_t capturehipMemset2DAsync(hipStream_t& stream, void*& dst, size_t& pitc
memsetParams.width = width;
memsetParams.height = height;
memsetParams.pitch = pitch;
memsetParams.elementSize = 1;
hip::Stream* s = reinterpret_cast<hip::Stream*>(stream);
hip::GraphNode* pGraphNode;
hipError_t status =
Expand Down
31 changes: 19 additions & 12 deletions hipamd/src/hip_internal.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -306,15 +306,9 @@ class stream_per_thread {
/// Returns the CU mask for the current stream
const std::vector<uint32_t> GetCUMask() const { return cuMask_; }

/// Sync all streams
static void SyncAllStreams(int deviceId, bool cpu_wait = true);

/// Check whether any blocking stream running
static bool StreamCaptureBlocking();

/// Destroy all streams on a given device
static void destroyAllStreams(int deviceId);

static void Destroy(hip::Stream* stream);

/// Check Stream Capture status to make sure it is done
Expand Down Expand Up @@ -416,7 +410,6 @@ class stream_per_thread {
parallelCaptureStreams_.erase(it);
}
}
static bool existsActiveStreamForDevice(hip::Device* device);

/// The stream should be destroyed via release() rather than delete
private:
Expand All @@ -426,6 +419,8 @@ class stream_per_thread {
/// HIP Device class
class Device {
amd::Monitor lock_{"Device lock", true};
amd::Monitor streamSetLock{"Guards device stream set"};
std::unordered_set<hip::Stream*> streamSet;
/// ROCclr context
amd::Context* context_;
/// Device's ID
Expand Down Expand Up @@ -499,7 +494,7 @@ class stream_per_thread {
amd::ScopedLock lock(lock_);
/// Either stream is active or device is active
if (isActive_) return true;
if (Stream::existsActiveStreamForDevice(this)) {
if (existsActiveStreamForDevice()) {
isActive_ = true;
return true;
}
Expand Down Expand Up @@ -540,6 +535,22 @@ class stream_per_thread {

/// Returns true if memory pool is valid on this device
bool IsMemoryPoolValid(MemoryPool* pool);
void AddStream(Stream* stream);

void RemoveStream(Stream* stream);

bool StreamExists(Stream* stream);

void destroyAllStreams();

void SyncAllStreams( bool cpu_wait = true);

bool StreamCaptureBlocking();

bool existsActiveStreamForDevice();
/// Wait all active streams on the blocking queue. The method enqueues a wait command and
/// doesn't stall the current thread
void WaitActiveStreams(hip::Stream* blocking_stream, bool wait_null_stream = false);
};

/// Thread Local Storage Variables Aggregator Class
Expand Down Expand Up @@ -589,10 +600,6 @@ class stream_per_thread {

extern void WaitThenDecrementSignal(hipStream_t stream, hipError_t status, void* user_data);

/// Wait all active streams on the blocking queue. The method enqueues a wait command and
/// doesn't stall the current thread
extern void iHipWaitActiveStreams(hip::Stream* blocking_stream, bool wait_null_stream = false);

extern std::vector<hip::Device*> g_devices;
extern hipError_t ihipDeviceGetCount(int* count);
extern int ihipGetDevice();
Expand Down
8 changes: 4 additions & 4 deletions hipamd/src/hip_memory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ hipError_t ihipFree(void *ptr) {
if (memory_object != nullptr) {
// Wait on the device, associated with the current memory object during allocation
auto device_id = memory_object->getUserData().deviceId;
hip::Stream::SyncAllStreams(device_id);
g_devices[device_id]->SyncAllStreams();

// Find out if memory belongs to any memory pool
if (!g_devices[device_id]->FreeMemory(memory_object, nullptr)) {
Expand Down Expand Up @@ -743,7 +743,7 @@ hipError_t ihipArrayDestroy(hipArray_t array) {

auto image = as_amd(memObj);
// Wait on the device, associated with the current memory object during allocation
hip::Stream::SyncAllStreams(image->getUserData().deviceId);
g_devices[image->getUserData().deviceId]->SyncAllStreams();
image->release();

delete array;
Expand Down Expand Up @@ -1252,7 +1252,7 @@ hipError_t ihipHostUnregister(void* hostPtr) {

if (mem != nullptr) {
// Wait on the device, associated with the current memory object during allocation
hip::Stream::SyncAllStreams(mem->getUserData().deviceId);
g_devices[mem->getUserData().deviceId]->SyncAllStreams();

amd::MemObjMap::RemoveMemObj(hostPtr);
for (const auto& device: g_devices) {
Expand Down Expand Up @@ -4304,7 +4304,7 @@ hipError_t ihipMipmappedArrayDestroy(hipMipmappedArray_t mipmapped_array_ptr) {

auto image = as_amd(mem_obj);
// Wait on the device, associated with the current memory object during allocation
hip::Stream::SyncAllStreams(image->getUserData().deviceId);
g_devices[image->getUserData().deviceId]->SyncAllStreams();
image->release();

delete mipmapped_array_ptr;
Expand Down
8 changes: 7 additions & 1 deletion hipamd/src/hip_platform.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,13 @@ void __hipRegisterTexture(
}

void __hipUnregisterFatBinary(hip::FatBinaryInfo** modules) {
hipError_t err = PlatformState::instance().removeFatBinary(modules);
// By calling hipDeviceSynchronize ensure that all HSA signal handlers
// complete before removeFatBinary
hipError_t err = hipDeviceSynchronize();
if (err != hipSuccess) {
LogPrintfError("Error during hipDeviceSynchronize, error: %d", err);
}
err = PlatformState::instance().removeFatBinary(modules);
guarantee((err == hipSuccess), "Cannot Unregister Fat Binary, error:%d", err);
}

Expand Down
Loading