diff --git a/src/main/cpp/src/SparkResourceAdaptorJni.cpp b/src/main/cpp/src/SparkResourceAdaptorJni.cpp index b68cc1630..8eeb047dd 100644 --- a/src/main/cpp/src/SparkResourceAdaptorJni.cpp +++ b/src/main/cpp/src/SparkResourceAdaptorJni.cpp @@ -24,6 +24,7 @@ #include #include +#include #include #include #include @@ -203,6 +204,8 @@ struct task_metrics { // The amount of time that this thread has lost due to retries (not including blocked time) long time_lost_nanos = 0; + long gpu_max_memory_allocated = 0; + void take_from(task_metrics& other) { add(other); @@ -215,6 +218,8 @@ struct task_metrics { this->num_times_split_retry_throw += other.num_times_split_retry_throw; this->time_blocked_nanos += other.time_blocked_nanos; this->time_lost_nanos += other.time_lost_nanos; + this->gpu_max_memory_allocated = + std::max(this->gpu_max_memory_allocated, other.gpu_max_memory_allocated); } void clear() @@ -295,6 +300,8 @@ class full_thread_state { // time) long time_retry_running_nanos = 0; std::chrono::time_point block_start; + long gpu_memory_allocated_bytes = 0; + // metrics for the current thread task_metrics metrics; @@ -799,6 +806,11 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource { return get_and_reset_metric(task_id, &task_metrics::time_lost_nanos); } + long get_and_reset_gpu_max_memory_allocated(long const task_id) + { + return get_and_reset_metric(task_id, &task_metrics::gpu_max_memory_allocated); + } + void check_and_break_deadlocks() { std::unique_lock lock(state_mutex); @@ -807,7 +819,6 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource { bool cpu_prealloc(size_t const amount, bool const blocking) { - // amount is not used yet, but is here in case we want it in the future. std::unique_lock lock(state_mutex); auto const thread_id = static_cast(pthread_self()); return pre_alloc_core(thread_id, true, blocking, lock); @@ -820,10 +831,9 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource { { // addr is not used yet, but is here in case we want it in the future. // amount is not used yet, but is here in case we want it for debugging/metrics. - // blocking is not used yet. It could be used for some debugging so we are keeping it. std::unique_lock lock(state_mutex); auto const thread_id = static_cast(pthread_self()); - post_alloc_success_core(thread_id, true, was_recursive, lock); + post_alloc_success_core(thread_id, true, was_recursive, amount, lock); } bool cpu_postalloc_failed(bool const was_oom, bool const blocking, bool const was_recursive) @@ -838,7 +848,7 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource { // addr is not used yet, but is here in case we want it in the future. // amount is not used yet, but is here in case we want it for debugging/metrics. std::unique_lock lock(state_mutex); - dealloc_core(true, lock); + dealloc_core(true, lock, amount); } /** @@ -1333,15 +1343,18 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource { * `likely_spill` if this allocation should be treated differently, because * we detected recursion while handling a prior allocation in this thread. */ - void post_alloc_success(long const thread_id, bool const likely_spill) + void post_alloc_success(long const thread_id, + bool const likely_spill, + std::size_t const num_bytes) { std::unique_lock lock(state_mutex); - post_alloc_success_core(thread_id, false, likely_spill, lock); + post_alloc_success_core(thread_id, false, likely_spill, num_bytes, lock); } void post_alloc_success_core(long const thread_id, bool const is_for_cpu, bool const was_recursive, + std::size_t const num_bytes, std::unique_lock& lock) { // pre allocate checks @@ -1360,6 +1373,14 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource { } transition(thread->second, thread_state::THREAD_RUNNING); thread->second.is_cpu_alloc = false; + // num_bytes is likely not padded, which could cause slight inaccuracies + // but for now it shouldn't matter for watermark purposes + if (!is_for_cpu) { + thread->second.gpu_memory_allocated_bytes += num_bytes; + thread->second.metrics.gpu_max_memory_allocated = + std::max(thread->second.metrics.gpu_max_memory_allocated, + thread->second.gpu_memory_allocated_bytes); + } break; default: break; } @@ -1735,7 +1756,7 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource { bool const likely_spill = pre_alloc(tid); try { void* ret = resource->allocate(num_bytes, stream); - post_alloc_success(tid, likely_spill); + post_alloc_success(tid, likely_spill, num_bytes); return ret; } catch (rmm::out_of_memory const& e) { // rmm::out_of_memory is what is thrown when an allocation failed @@ -1751,7 +1772,9 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource { throw rmm::bad_alloc("Internal Error"); } - void dealloc_core(bool const is_for_cpu, std::unique_lock& lock) + void dealloc_core(bool const is_for_cpu, + std::unique_lock& lock, + std::size_t const num_bytes) { auto const tid = static_cast(pthread_self()); auto const thread = threads.find(tid); @@ -1779,6 +1802,7 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource { if (is_for_cpu == t_state.is_cpu_alloc) { transition(t_state, thread_state::THREAD_ALLOC_FREE); } + if (!is_for_cpu) { t_state.gpu_memory_allocated_bytes -= num_bytes; } break; default: break; } @@ -1793,7 +1817,7 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource { // deallocate success if (size > 0) { std::unique_lock lock(state_mutex); - dealloc_core(false, lock); + dealloc_core(false, lock, size); } } }; @@ -2079,6 +2103,19 @@ Java_com_nvidia_spark_rapids_jni_SparkResourceAdaptor_getAndResetComputeTimeLost CATCH_STD(env, 0) } +JNIEXPORT jlong JNICALL +Java_com_nvidia_spark_rapids_jni_SparkResourceAdaptor_getAndResetGpuMaxMemoryAllocated( + JNIEnv* env, jclass, jlong ptr, jlong task_id) +{ + JNI_NULL_CHECK(env, ptr, "resource_adaptor is null", 0); + try { + cudf::jni::auto_set_device(env); + auto mr = reinterpret_cast(ptr); + return mr->get_and_reset_gpu_max_memory_allocated(task_id); + } + CATCH_STD(env, 0) +} + JNIEXPORT void JNICALL Java_com_nvidia_spark_rapids_jni_SparkResourceAdaptor_startRetryBlock( JNIEnv* env, jclass, jlong ptr, jlong thread_id) { diff --git a/src/main/java/com/nvidia/spark/rapids/jni/RmmSpark.java b/src/main/java/com/nvidia/spark/rapids/jni/RmmSpark.java index e17189460..45a234dcc 100644 --- a/src/main/java/com/nvidia/spark/rapids/jni/RmmSpark.java +++ b/src/main/java/com/nvidia/spark/rapids/jni/RmmSpark.java @@ -589,6 +589,22 @@ public static long getAndResetComputeTimeLostToRetryNs(long taskId) { } } + /** + * Get the max device memory footprint, in bytes, that this task had allocated over its lifetime + * @param taskId the id of the task to get the metric for. + * @return the max device memory footprint. + */ + public static long getAndResetGpuMaxMemoryAllocated(long taskId) { + synchronized (Rmm.class) { + if (sra != null && sra.isOpen()) { + return sra.getAndResetGpuMaxMemoryAllocated(taskId); + } else { + // sra is not set so the value is by definition 0 + return 0; + } + } + } + /** * Called before doing an allocation on the CPU. This could throw an injected exception to help * with testing. diff --git a/src/main/java/com/nvidia/spark/rapids/jni/SparkResourceAdaptor.java b/src/main/java/com/nvidia/spark/rapids/jni/SparkResourceAdaptor.java index d766c3423..9e3414f7d 100644 --- a/src/main/java/com/nvidia/spark/rapids/jni/SparkResourceAdaptor.java +++ b/src/main/java/com/nvidia/spark/rapids/jni/SparkResourceAdaptor.java @@ -251,6 +251,10 @@ public long getAndResetComputeTimeLostToRetry(long taskId) { return getAndResetComputeTimeLostToRetry(getHandle(), taskId); } + public long getAndResetGpuMaxMemoryAllocated(long taskId) { + return getAndResetGpuMaxMemoryAllocated(getHandle(), taskId); + } + /** * Called before doing an allocation on the CPU. This could throw an injected exception to help @@ -319,6 +323,7 @@ public void cpuDeallocate(long ptr, long amount) { private static native int getAndResetSplitRetryThrowInternal(long handle, long taskId); private static native long getAndResetBlockTimeInternal(long handle, long taskId); private static native long getAndResetComputeTimeLostToRetry(long handle, long taskId); + private static native long getAndResetGpuMaxMemoryAllocated(long handle, long taskId); private static native void startRetryBlock(long handle, long threadId); private static native void endRetryBlock(long handle, long threadId); private static native void checkAndBreakDeadlocks(long handle); diff --git a/src/test/java/com/nvidia/spark/rapids/jni/RmmSparkTest.java b/src/test/java/com/nvidia/spark/rapids/jni/RmmSparkTest.java index 373deb9ca..987dd5853 100644 --- a/src/test/java/com/nvidia/spark/rapids/jni/RmmSparkTest.java +++ b/src/test/java/com/nvidia/spark/rapids/jni/RmmSparkTest.java @@ -43,6 +43,8 @@ import static org.junit.jupiter.api.Assertions.fail; public class RmmSparkTest { + private final static long ALIGNMENT = 256; + @BeforeEach public void setup() { if (Rmm.isInitialized()) { @@ -317,6 +319,7 @@ public void testInsertOOMsGpu() { assertEquals(0, RmmSpark.getAndResetNumRetryThrow(taskid)); assertEquals(0, RmmSpark.getAndResetNumSplitRetryThrow(taskid)); assertEquals(0, RmmSpark.getAndResetComputeTimeLostToRetryNs(taskid)); + assertEquals(0, RmmSpark.getAndResetGpuMaxMemoryAllocated(taskid)); RmmSpark.startDedicatedTaskThread(threadId, taskid, t); assertEquals(RmmSparkThreadState.THREAD_RUNNING, RmmSpark.getStateOf(threadId)); try { @@ -343,6 +346,7 @@ public void testInsertOOMsGpu() { assertEquals(RmmSparkThreadState.THREAD_RUNNING, RmmSpark.getStateOf(threadId)); assertEquals(1, RmmSpark.getAndResetNumRetryThrow(taskid)); assertEquals(0, RmmSpark.getAndResetNumSplitRetryThrow(taskid)); + assertEquals(ALIGNMENT, RmmSpark.getAndResetGpuMaxMemoryAllocated(taskid)); RmmSpark.blockThreadUntilReady(); // Allocate something small and verify that it works... @@ -356,6 +360,7 @@ public void testInsertOOMsGpu() { assertThrows(GpuSplitAndRetryOOM.class, () -> Rmm.alloc(100).close()); assertEquals(0, RmmSpark.getAndResetNumRetryThrow(taskid)); assertEquals(1, RmmSpark.getAndResetNumSplitRetryThrow(taskid)); + assertEquals(ALIGNMENT * 2, RmmSpark.getAndResetGpuMaxMemoryAllocated(taskid)); // Verify that injecting OOM does not cause the block to actually happen assertEquals(RmmSparkThreadState.THREAD_RUNNING, RmmSpark.getStateOf(threadId)); @@ -635,8 +640,8 @@ void setupRmmForTestingWithLimits(long maxAllocSize, RmmEventHandler eventHandle boolean succeeded = false; try { resource = new RmmCudaMemoryResource(); - resource = new RmmLimitingResourceAdaptor<>(resource, maxAllocSize, 256); - resource = new RmmTrackingResourceAdaptor<>(resource, 256); + resource = new RmmLimitingResourceAdaptor<>(resource, maxAllocSize, ALIGNMENT); + resource = new RmmTrackingResourceAdaptor<>(resource, ALIGNMENT); Rmm.setCurrentDeviceResource(resource, null, false); succeeded = true; } finally { @@ -760,9 +765,9 @@ public void testBasicCpuBlocking() throws ExecutionException, InterruptedExcepti @Test public void testBasicMixedBlocking() throws ExecutionException, InterruptedException, TimeoutException { - // 10 MiB - setupRmmForTestingWithLimits(10 * 1024 * 1024); - LimitingOffHeapAllocForTests.setLimit(10 * 1024 * 1024); + final long MB = 1024 * 1024; + setupRmmForTestingWithLimits(10 * MB); + LimitingOffHeapAllocForTests.setLimit(10 * MB); TaskThread taskOne = new TaskThread("TEST THREAD ONE", 1); TaskThread taskTwo = new TaskThread("TEST THREAD TWO", 2); TaskThread taskThree = new TaskThread("TEST THREAD THREE", 3); @@ -771,6 +776,9 @@ public void testBasicMixedBlocking() throws ExecutionException, InterruptedExcep taskTwo.initialize(); taskThree.initialize(); taskFour.initialize(); + + final long FIVE_MB = 5 * MB; + final long SIX_MB = 6 * MB; try { long tOneId = taskOne.getThreadId(); assertEquals(RmmSparkThreadState.THREAD_RUNNING, RmmSpark.getStateOf(tOneId)); @@ -784,18 +792,18 @@ public void testBasicMixedBlocking() throws ExecutionException, InterruptedExcep long tFourId = taskFour.getThreadId(); assertEquals(RmmSparkThreadState.THREAD_RUNNING, RmmSpark.getStateOf(tFourId)); - try (AllocOnAnotherThread firstGpuAlloc = new GpuAllocOnAnotherThread(taskOne, 5 * 1024 * 1024)) { + try (AllocOnAnotherThread firstGpuAlloc = new GpuAllocOnAnotherThread(taskOne, FIVE_MB)) { firstGpuAlloc.waitForAlloc(); - try (AllocOnAnotherThread firstCpuAlloc = new CpuAllocOnAnotherThread(taskTwo, 5 * 1024 * 1024)) { + try (AllocOnAnotherThread firstCpuAlloc = new CpuAllocOnAnotherThread(taskTwo, FIVE_MB)) { firstCpuAlloc.waitForAlloc(); // Blocking GPU Alloc - try (AllocOnAnotherThread secondGpuAlloc = new GpuAllocOnAnotherThread(taskThree, 6 * 1024 * 1024)) { + try (AllocOnAnotherThread secondGpuAlloc = new GpuAllocOnAnotherThread(taskThree, SIX_MB)) { taskThree.pollForState(RmmSparkThreadState.THREAD_BLOCKED, 1000, TimeUnit.MILLISECONDS); // Blocking CPU Alloc - try (AllocOnAnotherThread secondCpuAlloc = new CpuAllocOnAnotherThread(taskFour, 6 * 1024 * 1024)) { + try (AllocOnAnotherThread secondCpuAlloc = new CpuAllocOnAnotherThread(taskFour, SIX_MB)) { taskFour.pollForState(RmmSparkThreadState.THREAD_BLOCKED, 1000, TimeUnit.MILLISECONDS); // We want to make sure that the order of wakeup corresponds to the location of the data that was released @@ -814,9 +822,13 @@ public void testBasicMixedBlocking() throws ExecutionException, InterruptedExcep } } finally { taskOne.done(); + assertEquals(FIVE_MB, RmmSpark.getAndResetGpuMaxMemoryAllocated(1)); taskTwo.done(); + assertEquals(0, RmmSpark.getAndResetGpuMaxMemoryAllocated(2)); taskThree.done(); + assertEquals(SIX_MB, RmmSpark.getAndResetGpuMaxMemoryAllocated(3)); taskFour.done(); + assertEquals(0, RmmSpark.getAndResetGpuMaxMemoryAllocated(4)); } }