Device max memory watermark tracking #2392

Merged: 11 commits, Sep 24, 2024
55 changes: 46 additions & 9 deletions src/main/cpp/src/SparkResourceAdaptorJni.cpp
@@ -24,6 +24,7 @@
#include <spdlog/sinks/ostream_sink.h>
#include <spdlog/spdlog.h>

#include <algorithm>
#include <chrono>
#include <exception>
#include <map>
@@ -203,6 +204,8 @@ struct task_metrics {
// The amount of time that this thread has lost due to retries (not including blocked time)
long time_lost_nanos = 0;

long gpu_max_memory_allocated = 0;

void take_from(task_metrics& other)
{
add(other);
@@ -215,6 +218,8 @@ struct task_metrics {
this->num_times_split_retry_throw += other.num_times_split_retry_throw;
this->time_blocked_nanos += other.time_blocked_nanos;
this->time_lost_nanos += other.time_lost_nanos;
this->gpu_max_memory_allocated =
std::max(this->gpu_max_memory_allocated, other.gpu_max_memory_allocated);
}

void clear()
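
A note on the merge above: counter metrics roll up by summation, while the watermark rolls up with max, because the peak across two accounting periods is simply the larger of the two peaks. A minimal Java sketch of the distinction (class and field names are illustrative, mirroring the C++ struct):

// Illustrative sketch only: counters combine by summing, watermarks by max.
class TaskMetrics {
  long timeBlockedNanos = 0;      // a counter
  long gpuMaxMemoryAllocated = 0; // a watermark

  void add(TaskMetrics other) {
    this.timeBlockedNanos += other.timeBlockedNanos; // counters sum
    this.gpuMaxMemoryAllocated =
        Math.max(this.gpuMaxMemoryAllocated, other.gpuMaxMemoryAllocated); // watermarks max
  }
}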
@@ -295,6 +300,8 @@ class full_thread_state {
// time)
long time_retry_running_nanos = 0;
std::chrono::time_point<std::chrono::steady_clock> block_start;
long gpu_memory_allocated_bytes = 0;

// metrics for the current thread
task_metrics metrics;

@@ -799,6 +806,11 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource {
return get_and_reset_metric(task_id, &task_metrics::time_lost_nanos);
}

long get_and_reset_gpu_max_memory_allocated(long const task_id)
{
return get_and_reset_metric(task_id, &task_metrics::gpu_max_memory_allocated);
}

void check_and_break_deadlocks()
{
std::unique_lock<std::mutex> lock(state_mutex);
@@ -807,7 +819,6 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource {

bool cpu_prealloc(size_t const amount, bool const blocking)
{
// amount is not used yet, but is here in case we want it in the future.
std::unique_lock<std::mutex> lock(state_mutex);
auto const thread_id = static_cast<long>(pthread_self());
return pre_alloc_core(thread_id, true, blocking, lock);
@@ -820,10 +831,9 @@
{
// addr is not used yet, but is here in case we want it in the future.
// amount is not used yet, but is here in case we want it for debugging/metrics.
// blocking is not used yet. It could be used for some debugging so we are keeping it.
std::unique_lock<std::mutex> lock(state_mutex);
auto const thread_id = static_cast<long>(pthread_self());
post_alloc_success_core(thread_id, true, was_recursive, lock);
post_alloc_success_core(thread_id, true, was_recursive, amount, lock);
}

bool cpu_postalloc_failed(bool const was_oom, bool const blocking, bool const was_recursive)
@@ -838,7 +848,7 @@
// addr is not used yet, but is here in case we want it in the future.
// amount is not used yet, but is here in case we want it for debugging/metrics.
std::unique_lock<std::mutex> lock(state_mutex);
dealloc_core(true, lock);
dealloc_core(true, lock, amount);
}

/**
@@ -1333,15 +1343,18 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource {
* `likely_spill` if this allocation should be treated differently, because
* we detected recursion while handling a prior allocation in this thread.
*/
void post_alloc_success(long const thread_id, bool const likely_spill)
void post_alloc_success(long const thread_id,
bool const likely_spill,
std::size_t const num_bytes)
{
std::unique_lock<std::mutex> lock(state_mutex);
post_alloc_success_core(thread_id, false, likely_spill, lock);
post_alloc_success_core(thread_id, false, likely_spill, num_bytes, lock);
}

void post_alloc_success_core(long const thread_id,
bool const is_for_cpu,
bool const was_recursive,
std::size_t const num_bytes,
std::unique_lock<std::mutex>& lock)
{
// pre allocate checks
@@ -1360,6 +1373,14 @@
}
transition(thread->second, thread_state::THREAD_RUNNING);
thread->second.is_cpu_alloc = false;
// num_bytes is likely not padded, which could cause slight inaccuracies
// but for now it shouldn't matter for watermark purposes
Review thread on the comment above:

Collaborator: Please make sure that we document this in the metrics page when it is fully ready.

Contributor (author): Which page are you referring to exactly?

if (!is_for_cpu) {
thread->second.gpu_memory_allocated_bytes += num_bytes;
thread->second.metrics.gpu_max_memory_allocated =
std::max(thread->second.metrics.gpu_max_memory_allocated,
thread->second.gpu_memory_allocated_bytes);
}
break;
default: break;
}
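
The code comment above notes that num_bytes is the requested size, not the padded size the allocator actually reserves, so the watermark can slightly undercount. A minimal sketch of alignment-aware accounting, assuming a fixed 256-byte alignment (the same value the tests below use for ALIGNMENT); the padTo helper is hypothetical and not part of this change:

// Hypothetical helper: round a requested size up to the allocator's alignment.
static long padTo(long numBytes, long alignment) {
  return ((numBytes + alignment - 1) / alignment) * alignment;
}

// For example, padTo(100, 256) == 256, so a 100-byte request would count
// 256 bytes toward the watermark instead of 100.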
@@ -1735,7 +1756,7 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource {
bool const likely_spill = pre_alloc(tid);
try {
void* ret = resource->allocate(num_bytes, stream);
post_alloc_success(tid, likely_spill);
post_alloc_success(tid, likely_spill, num_bytes);
return ret;
} catch (rmm::out_of_memory const& e) {
// rmm::out_of_memory is what is thrown when an allocation failed
@@ -1751,7 +1772,9 @@
throw rmm::bad_alloc("Internal Error");
}

void dealloc_core(bool const is_for_cpu, std::unique_lock<std::mutex>& lock)
void dealloc_core(bool const is_for_cpu,
std::unique_lock<std::mutex>& lock,
std::size_t const num_bytes)
{
auto const tid = static_cast<long>(pthread_self());
auto const thread = threads.find(tid);
@@ -1779,6 +1802,7 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource {
if (is_for_cpu == t_state.is_cpu_alloc) {
transition(t_state, thread_state::THREAD_ALLOC_FREE);
}
if (!is_for_cpu) { t_state.gpu_memory_allocated_bytes -= num_bytes; }
break;
default: break;
}
@@ -1793,7 +1817,7 @@
// deallocate success
if (size > 0) {
std::unique_lock<std::mutex> lock(state_mutex);
dealloc_core(false, lock);
dealloc_core(false, lock, size);
}
}
};
@@ -2079,6 +2103,19 @@ Java_com_nvidia_spark_rapids_jni_SparkResourceAdaptor_getAndResetComputeTimeLost
CATCH_STD(env, 0)
}

JNIEXPORT jlong JNICALL
Java_com_nvidia_spark_rapids_jni_SparkResourceAdaptor_getAndResetGpuMaxMemoryAllocated(
JNIEnv* env, jclass, jlong ptr, jlong task_id)
{
JNI_NULL_CHECK(env, ptr, "resource_adaptor is null", 0);
try {
cudf::jni::auto_set_device(env);
auto mr = reinterpret_cast<spark_resource_adaptor*>(ptr);
return mr->get_and_reset_gpu_max_memory_allocated(task_id);
}
CATCH_STD(env, 0)
}

JNIEXPORT void JNICALL Java_com_nvidia_spark_rapids_jni_SparkResourceAdaptor_startRetryBlock(
JNIEnv* env, jclass, jlong ptr, jlong thread_id)
{
16 changes: 16 additions & 0 deletions src/main/java/com/nvidia/spark/rapids/jni/RmmSpark.java
@@ -589,6 +589,22 @@ public static long getAndResetComputeTimeLostToRetryNs(long taskId) {
}
}

/**
* Get the max device memory footprint, in bytes, that this task allocated over its lifetime.
* @param taskId the id of the task to get the metric for.
* @return the max device memory footprint in bytes.
*/
public static long getAndResetGpuMaxMemoryAllocated(long taskId) {
synchronized (Rmm.class) {
if (sra != null && sra.isOpen()) {
return sra.getAndResetGpuMaxMemoryAllocated(taskId);
} else {
// sra is not set so the value is by definition 0
return 0;
}
}
}

/**
* Called before doing an allocation on the CPU. This could throw an injected exception to help
* with testing.
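A short usage sketch for the new accessor above: read the task's peak after the task finishes its work, which also resets the stored value for any later reuse of the task id. The task id and the reporting sink are illustrative:

// Hypothetical task-completion hook; task id 42 is illustrative.
long taskId = 42;
long peakBytes = RmmSpark.getAndResetGpuMaxMemoryAllocated(taskId);
System.out.println("task " + taskId + " peak GPU allocation: " + peakBytes + " bytes");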
@@ -251,6 +251,10 @@ public long getAndResetComputeTimeLostToRetry(long taskId) {
return getAndResetComputeTimeLostToRetry(getHandle(), taskId);
}

public long getAndResetGpuMaxMemoryAllocated(long taskId) {
return getAndResetGpuMaxMemoryAllocated(getHandle(), taskId);
}


/**
* Called before doing an allocation on the CPU. This could throw an injected exception to help
@@ -319,6 +323,7 @@ public void cpuDeallocate(long ptr, long amount) {
private static native int getAndResetSplitRetryThrowInternal(long handle, long taskId);
private static native long getAndResetBlockTimeInternal(long handle, long taskId);
private static native long getAndResetComputeTimeLostToRetry(long handle, long taskId);
private static native long getAndResetGpuMaxMemoryAllocated(long handle, long taskId);
private static native void startRetryBlock(long handle, long threadId);
private static native void endRetryBlock(long handle, long threadId);
private static native void checkAndBreakDeadlocks(long handle);
30 changes: 21 additions & 9 deletions src/test/java/com/nvidia/spark/rapids/jni/RmmSparkTest.java
@@ -43,6 +43,8 @@
import static org.junit.jupiter.api.Assertions.fail;

public class RmmSparkTest {
private static final long ALIGNMENT = 256;

@BeforeEach
public void setup() {
if (Rmm.isInitialized()) {
@@ -317,6 +319,7 @@ public void testInsertOOMsGpu() {
assertEquals(0, RmmSpark.getAndResetNumRetryThrow(taskid));
assertEquals(0, RmmSpark.getAndResetNumSplitRetryThrow(taskid));
assertEquals(0, RmmSpark.getAndResetComputeTimeLostToRetryNs(taskid));
assertEquals(0, RmmSpark.getAndResetGpuMaxMemoryAllocated(taskid));
RmmSpark.startDedicatedTaskThread(threadId, taskid, t);
assertEquals(RmmSparkThreadState.THREAD_RUNNING, RmmSpark.getStateOf(threadId));
try {
@@ -343,6 +346,7 @@ public void testInsertOOMsGpu() {
assertEquals(RmmSparkThreadState.THREAD_RUNNING, RmmSpark.getStateOf(threadId));
assertEquals(1, RmmSpark.getAndResetNumRetryThrow(taskid));
assertEquals(0, RmmSpark.getAndResetNumSplitRetryThrow(taskid));
assertEquals(ALIGNMENT, RmmSpark.getAndResetGpuMaxMemoryAllocated(taskid));
RmmSpark.blockThreadUntilReady();

// Allocate something small and verify that it works...
@@ -356,6 +360,7 @@ public void testInsertOOMsGpu() {
assertThrows(GpuSplitAndRetryOOM.class, () -> Rmm.alloc(100).close());
assertEquals(0, RmmSpark.getAndResetNumRetryThrow(taskid));
assertEquals(1, RmmSpark.getAndResetNumSplitRetryThrow(taskid));
assertEquals(ALIGNMENT * 2, RmmSpark.getAndResetGpuMaxMemoryAllocated(taskid));

// Verify that injecting OOM does not cause the block to actually happen
assertEquals(RmmSparkThreadState.THREAD_RUNNING, RmmSpark.getStateOf(threadId));
@@ -635,8 +640,8 @@ void setupRmmForTestingWithLimits(long maxAllocSize, RmmEventHandler eventHandle
boolean succeeded = false;
try {
resource = new RmmCudaMemoryResource();
resource = new RmmLimitingResourceAdaptor<>(resource, maxAllocSize, 256);
resource = new RmmTrackingResourceAdaptor<>(resource, 256);
resource = new RmmLimitingResourceAdaptor<>(resource, maxAllocSize, ALIGNMENT);
resource = new RmmTrackingResourceAdaptor<>(resource, ALIGNMENT);
Rmm.setCurrentDeviceResource(resource, null, false);
succeeded = true;
} finally {
@@ -760,9 +765,9 @@ public void testBasicCpuBlocking() throws ExecutionException, InterruptedExcepti

@Test
public void testBasicMixedBlocking() throws ExecutionException, InterruptedException, TimeoutException {
// 10 MiB
setupRmmForTestingWithLimits(10 * 1024 * 1024);
LimitingOffHeapAllocForTests.setLimit(10 * 1024 * 1024);
final long MB = 1024 * 1024;
setupRmmForTestingWithLimits(10 * MB);
LimitingOffHeapAllocForTests.setLimit(10 * MB);
TaskThread taskOne = new TaskThread("TEST THREAD ONE", 1);
TaskThread taskTwo = new TaskThread("TEST THREAD TWO", 2);
TaskThread taskThree = new TaskThread("TEST THREAD THREE", 3);
@@ -771,6 +776,9 @@ public void testBasicMixedBlocking() throws ExecutionException, InterruptedExcep
taskTwo.initialize();
taskThree.initialize();
taskFour.initialize();

final long FIVE_MB = 5 * MB;
final long SIX_MB = 6 * MB;
try {
long tOneId = taskOne.getThreadId();
assertEquals(RmmSparkThreadState.THREAD_RUNNING, RmmSpark.getStateOf(tOneId));
@@ -784,18 +792,18 @@ public void testBasicMixedBlocking() throws ExecutionException, InterruptedExcep
long tFourId = taskFour.getThreadId();
assertEquals(RmmSparkThreadState.THREAD_RUNNING, RmmSpark.getStateOf(tFourId));

try (AllocOnAnotherThread firstGpuAlloc = new GpuAllocOnAnotherThread(taskOne, 5 * 1024 * 1024)) {
try (AllocOnAnotherThread firstGpuAlloc = new GpuAllocOnAnotherThread(taskOne, FIVE_MB)) {
firstGpuAlloc.waitForAlloc();

try (AllocOnAnotherThread firstCpuAlloc = new CpuAllocOnAnotherThread(taskTwo, 5 * 1024 * 1024)) {
try (AllocOnAnotherThread firstCpuAlloc = new CpuAllocOnAnotherThread(taskTwo, FIVE_MB)) {
firstCpuAlloc.waitForAlloc();

// Blocking GPU Alloc
try (AllocOnAnotherThread secondGpuAlloc = new GpuAllocOnAnotherThread(taskThree, 6 * 1024 * 1024)) {
try (AllocOnAnotherThread secondGpuAlloc = new GpuAllocOnAnotherThread(taskThree, SIX_MB)) {
taskThree.pollForState(RmmSparkThreadState.THREAD_BLOCKED, 1000, TimeUnit.MILLISECONDS);

// Blocking CPU Alloc
try (AllocOnAnotherThread secondCpuAlloc = new CpuAllocOnAnotherThread(taskFour, 6 * 1024 * 1024)) {
try (AllocOnAnotherThread secondCpuAlloc = new CpuAllocOnAnotherThread(taskFour, SIX_MB)) {
taskFour.pollForState(RmmSparkThreadState.THREAD_BLOCKED, 1000, TimeUnit.MILLISECONDS);

// We want to make sure that the order of wakeup corresponds to the location of the data that was released
@@ -814,9 +822,13 @@ public void testBasicMixedBlocking() throws ExecutionException, InterruptedExcep
}
} finally {
taskOne.done();
assertEquals(FIVE_MB, RmmSpark.getAndResetGpuMaxMemoryAllocated(1));
taskTwo.done();
assertEquals(0, RmmSpark.getAndResetGpuMaxMemoryAllocated(2));
taskThree.done();
assertEquals(SIX_MB, RmmSpark.getAndResetGpuMaxMemoryAllocated(3));
taskFour.done();
assertEquals(0, RmmSpark.getAndResetGpuMaxMemoryAllocated(4));
}
}

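The assertions in the finally block above rely on the watermark surviving deallocation: task one's peak stays at FIVE_MB even though its buffer was freed before the task completed. A standalone sketch of that bookkeeping (plain Java, no RMM dependency; sizes mirror the test):

long allocated = 0; // current GPU bytes attributed to the task
long watermark = 0; // peak usage ever observed

allocated += 5 * 1024 * 1024;               // the task allocates 5 MiB
watermark = Math.max(watermark, allocated); // the watermark rises to 5 MiB
allocated -= 5 * 1024 * 1024;               // the task frees the buffer

// Current usage is back to zero, but the watermark keeps the peak.
assert allocated == 0;
assert watermark == 5 * 1024 * 1024;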