From 152a0f8804c333c4078b2fe1570260668bba6e36 Mon Sep 17 00:00:00 2001 From: Zach Puller Date: Fri, 30 Aug 2024 11:48:05 -0700 Subject: [PATCH 01/11] max memory allocated impl Signed-off-by: Zach Puller --- src/main/cpp/src/SparkResourceAdaptorJni.cpp | 46 ++++++++++++++++--- .../com/nvidia/spark/rapids/jni/RmmSpark.java | 12 +++++ .../rapids/jni/SparkResourceAdaptor.java | 5 ++ 3 files changed, 56 insertions(+), 7 deletions(-) diff --git a/src/main/cpp/src/SparkResourceAdaptorJni.cpp b/src/main/cpp/src/SparkResourceAdaptorJni.cpp index b68cc16308..78043e8be4 100644 --- a/src/main/cpp/src/SparkResourceAdaptorJni.cpp +++ b/src/main/cpp/src/SparkResourceAdaptorJni.cpp @@ -24,6 +24,7 @@ #include #include +#include #include #include #include @@ -203,6 +204,8 @@ struct task_metrics { // The amount of time that this thread has lost due to retries (not including blocked time) long time_lost_nanos = 0; + long max_memory_allocated = 0; + void take_from(task_metrics& other) { add(other); @@ -215,6 +218,7 @@ struct task_metrics { this->num_times_split_retry_throw += other.num_times_split_retry_throw; this->time_blocked_nanos += other.time_blocked_nanos; this->time_lost_nanos += other.time_lost_nanos; + this->max_memory_allocated = std::max(this->max_memory_allocated, other.max_memory_allocated); } void clear() @@ -295,6 +299,8 @@ class full_thread_state { // time) long time_retry_running_nanos = 0; std::chrono::time_point block_start; + long memory_allocated_bytes = 0; + // metrics for the current thread task_metrics metrics; @@ -799,6 +805,10 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource { return get_and_reset_metric(task_id, &task_metrics::time_lost_nanos); } + long get_and_reset_max_memory_allocated(long const task_id) { + return get_and_reset_metric(task_id, &task_metrics::max_memory_allocated); + } + void check_and_break_deadlocks() { std::unique_lock lock(state_mutex); @@ -823,7 +833,7 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource { // blocking is not used yet. It could be used for some debugging so we are keeping it. std::unique_lock lock(state_mutex); auto const thread_id = static_cast(pthread_self()); - post_alloc_success_core(thread_id, true, was_recursive, lock); + post_alloc_success_core(thread_id, true, was_recursive, amount, lock); } bool cpu_postalloc_failed(bool const was_oom, bool const blocking, bool const was_recursive) @@ -838,7 +848,7 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource { // addr is not used yet, but is here in case we want it in the future. // amount is not used yet, but is here in case we want it for debugging/metrics. std::unique_lock lock(state_mutex); - dealloc_core(true, lock); + dealloc_core(true, lock, amount); } /** @@ -1333,18 +1343,20 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource { * `likely_spill` if this allocation should be treated differently, because * we detected recursion while handling a prior allocation in this thread. */ - void post_alloc_success(long const thread_id, bool const likely_spill) + void post_alloc_success(long const thread_id, bool const likely_spill, std::size_t const num_bytes) { std::unique_lock lock(state_mutex); - post_alloc_success_core(thread_id, false, likely_spill, lock); + post_alloc_success_core(thread_id, false, likely_spill, num_bytes, lock); } void post_alloc_success_core(long const thread_id, bool const is_for_cpu, bool const was_recursive, + std::size_t const num_bytes, std::unique_lock& lock) { // pre allocate checks + std::cerr << "post_alloc_success_core" << std::endl; auto const thread = threads.find(thread_id); if (!was_recursive && thread != threads.end()) { switch (thread->second.state) { @@ -1359,7 +1371,13 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource { throw std::invalid_argument(ss.str()); } transition(thread->second, thread_state::THREAD_RUNNING); + // TODO why do we not set this to is_for_cpu thread->second.is_cpu_alloc = false; + thread->second.memory_allocated_bytes += num_bytes; + thread->second.metrics.max_memory_allocated = std::max( + thread->second.metrics.max_memory_allocated, + thread->second.memory_allocated_bytes + ); break; default: break; } @@ -1735,7 +1753,7 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource { bool const likely_spill = pre_alloc(tid); try { void* ret = resource->allocate(num_bytes, stream); - post_alloc_success(tid, likely_spill); + post_alloc_success(tid, likely_spill, num_bytes); return ret; } catch (rmm::out_of_memory const& e) { // rmm::out_of_memory is what is thrown when an allocation failed @@ -1751,7 +1769,7 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource { throw rmm::bad_alloc("Internal Error"); } - void dealloc_core(bool const is_for_cpu, std::unique_lock& lock) + void dealloc_core(bool const is_for_cpu, std::unique_lock& lock, std::size_t const num_bytes) { auto const tid = static_cast(pthread_self()); auto const thread = threads.find(tid); @@ -1779,6 +1797,7 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource { if (is_for_cpu == t_state.is_cpu_alloc) { transition(t_state, thread_state::THREAD_ALLOC_FREE); } + t_state.memory_allocated_bytes -= num_bytes; break; default: break; } @@ -1793,7 +1812,7 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource { // deallocate success if (size > 0) { std::unique_lock lock(state_mutex); - dealloc_core(false, lock); + dealloc_core(false, lock, size); } } }; @@ -2079,6 +2098,19 @@ Java_com_nvidia_spark_rapids_jni_SparkResourceAdaptor_getAndResetComputeTimeLost CATCH_STD(env, 0) } +JNIEXPORT jlong JNICALL +Java_com_nvidia_spark_rapids_jni_SparkResourceAdaptor_getAndResetMaxMemoryAllocated( + JNIEnv* env, jclass, jlong ptr, jlong task_id) +{ + JNI_NULL_CHECK(env, ptr, "resource_adaptor is null", 0); + try { + cudf::jni::auto_set_device(env); + auto mr = reinterpret_cast(ptr); + return mr->get_and_reset_max_memory_allocated(task_id); + } + CATCH_STD(env, 0) +} + JNIEXPORT void JNICALL Java_com_nvidia_spark_rapids_jni_SparkResourceAdaptor_startRetryBlock( JNIEnv* env, jclass, jlong ptr, jlong thread_id) { diff --git a/src/main/java/com/nvidia/spark/rapids/jni/RmmSpark.java b/src/main/java/com/nvidia/spark/rapids/jni/RmmSpark.java index e171894601..c7e39c7899 100644 --- a/src/main/java/com/nvidia/spark/rapids/jni/RmmSpark.java +++ b/src/main/java/com/nvidia/spark/rapids/jni/RmmSpark.java @@ -589,6 +589,18 @@ public static long getAndResetComputeTimeLostToRetryNs(long taskId) { } } + // TODO add + public static long getAndResetMaxMemoryAllocated(long taskId) { + synchronized (Rmm.class) { + if (sra != null && sra.isOpen()) { + return sra.getAndResetMaxMemoryAllocated(taskId); + } else { + // sra is not set so the value is by definition 0 + return 0; + } + } + } + /** * Called before doing an allocation on the CPU. This could throw an injected exception to help * with testing. diff --git a/src/main/java/com/nvidia/spark/rapids/jni/SparkResourceAdaptor.java b/src/main/java/com/nvidia/spark/rapids/jni/SparkResourceAdaptor.java index d766c34230..f3b5640780 100644 --- a/src/main/java/com/nvidia/spark/rapids/jni/SparkResourceAdaptor.java +++ b/src/main/java/com/nvidia/spark/rapids/jni/SparkResourceAdaptor.java @@ -251,6 +251,10 @@ public long getAndResetComputeTimeLostToRetry(long taskId) { return getAndResetComputeTimeLostToRetry(getHandle(), taskId); } + public long getAndResetMaxMemoryAllocated(long taskId) { + return getAndResetMaxMemoryAllocated(getHandle(), taskId); + } + /** * Called before doing an allocation on the CPU. This could throw an injected exception to help @@ -319,6 +323,7 @@ public void cpuDeallocate(long ptr, long amount) { private static native int getAndResetSplitRetryThrowInternal(long handle, long taskId); private static native long getAndResetBlockTimeInternal(long handle, long taskId); private static native long getAndResetComputeTimeLostToRetry(long handle, long taskId); + private static native long getAndResetMaxMemoryAllocated(long handle, long taskId); private static native void startRetryBlock(long handle, long threadId); private static native void endRetryBlock(long handle, long threadId); private static native void checkAndBreakDeadlocks(long handle); From d90e515693165f52320cc68fd5b4f71d0504db20 Mon Sep 17 00:00:00 2001 From: Zach Puller Date: Tue, 10 Sep 2024 07:29:07 -0700 Subject: [PATCH 02/11] add comment Signed-off-by: Zach Puller --- src/main/cpp/src/SparkResourceAdaptorJni.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/cpp/src/SparkResourceAdaptorJni.cpp b/src/main/cpp/src/SparkResourceAdaptorJni.cpp index 78043e8be4..54cfb1beb7 100644 --- a/src/main/cpp/src/SparkResourceAdaptorJni.cpp +++ b/src/main/cpp/src/SparkResourceAdaptorJni.cpp @@ -1373,6 +1373,8 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource { transition(thread->second, thread_state::THREAD_RUNNING); // TODO why do we not set this to is_for_cpu thread->second.is_cpu_alloc = false; + // num_bytes is likely not padded, which could cause slight inaccuracies + // but for now it shouldn't matter for watermark purposes thread->second.memory_allocated_bytes += num_bytes; thread->second.metrics.max_memory_allocated = std::max( thread->second.metrics.max_memory_allocated, From 4a7fc82648768ad86fa5ba3c67acb65aaf6fb991 Mon Sep 17 00:00:00 2001 From: Zach Puller Date: Tue, 17 Sep 2024 09:55:02 -0700 Subject: [PATCH 03/11] rm TODO comment Signed-off-by: Zach Puller --- src/main/cpp/src/SparkResourceAdaptorJni.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/cpp/src/SparkResourceAdaptorJni.cpp b/src/main/cpp/src/SparkResourceAdaptorJni.cpp index 54cfb1beb7..1f62bab8f2 100644 --- a/src/main/cpp/src/SparkResourceAdaptorJni.cpp +++ b/src/main/cpp/src/SparkResourceAdaptorJni.cpp @@ -1371,7 +1371,6 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource { throw std::invalid_argument(ss.str()); } transition(thread->second, thread_state::THREAD_RUNNING); - // TODO why do we not set this to is_for_cpu thread->second.is_cpu_alloc = false; // num_bytes is likely not padded, which could cause slight inaccuracies // but for now it shouldn't matter for watermark purposes From 03b42099ca2b883ebb2ef6485c4fd0185b607491 Mon Sep 17 00:00:00 2001 From: Zach Puller Date: Tue, 17 Sep 2024 10:00:39 -0700 Subject: [PATCH 04/11] add javadoc comment Signed-off-by: Zach Puller --- src/main/java/com/nvidia/spark/rapids/jni/RmmSpark.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/nvidia/spark/rapids/jni/RmmSpark.java b/src/main/java/com/nvidia/spark/rapids/jni/RmmSpark.java index c7e39c7899..d8993b3904 100644 --- a/src/main/java/com/nvidia/spark/rapids/jni/RmmSpark.java +++ b/src/main/java/com/nvidia/spark/rapids/jni/RmmSpark.java @@ -589,7 +589,11 @@ public static long getAndResetComputeTimeLostToRetryNs(long taskId) { } } - // TODO add + /** + * Get the max device memory footprint, in bytes, that this task had allocated over its lifetime + * @param taskId the id of the task to get the metric for. + * @return the max device memory footprint. + */ public static long getAndResetMaxMemoryAllocated(long taskId) { synchronized (Rmm.class) { if (sra != null && sra.isOpen()) { From df553df7808bd3abd5b8b40a7d86bda3069d525d Mon Sep 17 00:00:00 2001 From: Zach Puller Date: Tue, 17 Sep 2024 10:38:12 -0700 Subject: [PATCH 05/11] clang format Signed-off-by: Zach Puller --- src/main/cpp/src/SparkResourceAdaptorJni.cpp | 21 ++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/src/main/cpp/src/SparkResourceAdaptorJni.cpp b/src/main/cpp/src/SparkResourceAdaptorJni.cpp index 1f62bab8f2..f0916d6a13 100644 --- a/src/main/cpp/src/SparkResourceAdaptorJni.cpp +++ b/src/main/cpp/src/SparkResourceAdaptorJni.cpp @@ -805,7 +805,8 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource { return get_and_reset_metric(task_id, &task_metrics::time_lost_nanos); } - long get_and_reset_max_memory_allocated(long const task_id) { + long get_and_reset_max_memory_allocated(long const task_id) + { return get_and_reset_metric(task_id, &task_metrics::max_memory_allocated); } @@ -1343,7 +1344,9 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource { * `likely_spill` if this allocation should be treated differently, because * we detected recursion while handling a prior allocation in this thread. */ - void post_alloc_success(long const thread_id, bool const likely_spill, std::size_t const num_bytes) + void post_alloc_success(long const thread_id, + bool const likely_spill, + std::size_t const num_bytes) { std::unique_lock lock(state_mutex); post_alloc_success_core(thread_id, false, likely_spill, num_bytes, lock); @@ -1376,9 +1379,7 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource { // but for now it shouldn't matter for watermark purposes thread->second.memory_allocated_bytes += num_bytes; thread->second.metrics.max_memory_allocated = std::max( - thread->second.metrics.max_memory_allocated, - thread->second.memory_allocated_bytes - ); + thread->second.metrics.max_memory_allocated, thread->second.memory_allocated_bytes); break; default: break; } @@ -1770,7 +1771,9 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource { throw rmm::bad_alloc("Internal Error"); } - void dealloc_core(bool const is_for_cpu, std::unique_lock& lock, std::size_t const num_bytes) + void dealloc_core(bool const is_for_cpu, + std::unique_lock& lock, + std::size_t const num_bytes) { auto const tid = static_cast(pthread_self()); auto const thread = threads.find(tid); @@ -2100,8 +2103,10 @@ Java_com_nvidia_spark_rapids_jni_SparkResourceAdaptor_getAndResetComputeTimeLost } JNIEXPORT jlong JNICALL -Java_com_nvidia_spark_rapids_jni_SparkResourceAdaptor_getAndResetMaxMemoryAllocated( - JNIEnv* env, jclass, jlong ptr, jlong task_id) +Java_com_nvidia_spark_rapids_jni_SparkResourceAdaptor_getAndResetMaxMemoryAllocated(JNIEnv* env, + jclass, + jlong ptr, + jlong task_id) { JNI_NULL_CHECK(env, ptr, "resource_adaptor is null", 0); try { From 6c6e2bb7f52e8da583883c140d619aa49d81e87e Mon Sep 17 00:00:00 2001 From: Zach Puller Date: Tue, 17 Sep 2024 11:36:00 -0700 Subject: [PATCH 06/11] unit test memory watermark Signed-off-by: Zach Puller --- .../nvidia/spark/rapids/jni/RmmSparkTest.java | 30 +++++++++++++------ 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/src/test/java/com/nvidia/spark/rapids/jni/RmmSparkTest.java b/src/test/java/com/nvidia/spark/rapids/jni/RmmSparkTest.java index 373deb9ca0..6e0f504a96 100644 --- a/src/test/java/com/nvidia/spark/rapids/jni/RmmSparkTest.java +++ b/src/test/java/com/nvidia/spark/rapids/jni/RmmSparkTest.java @@ -43,6 +43,8 @@ import static org.junit.jupiter.api.Assertions.fail; public class RmmSparkTest { + private final static long ALIGNMENT = 256; + @BeforeEach public void setup() { if (Rmm.isInitialized()) { @@ -317,6 +319,7 @@ public void testInsertOOMsGpu() { assertEquals(0, RmmSpark.getAndResetNumRetryThrow(taskid)); assertEquals(0, RmmSpark.getAndResetNumSplitRetryThrow(taskid)); assertEquals(0, RmmSpark.getAndResetComputeTimeLostToRetryNs(taskid)); + assertEquals(0, RmmSpark.getAndResetMaxMemoryAllocated(taskid)); RmmSpark.startDedicatedTaskThread(threadId, taskid, t); assertEquals(RmmSparkThreadState.THREAD_RUNNING, RmmSpark.getStateOf(threadId)); try { @@ -343,6 +346,7 @@ public void testInsertOOMsGpu() { assertEquals(RmmSparkThreadState.THREAD_RUNNING, RmmSpark.getStateOf(threadId)); assertEquals(1, RmmSpark.getAndResetNumRetryThrow(taskid)); assertEquals(0, RmmSpark.getAndResetNumSplitRetryThrow(taskid)); + assertEquals(ALIGNMENT, RmmSpark.getAndResetMaxMemoryAllocated(taskid)); RmmSpark.blockThreadUntilReady(); // Allocate something small and verify that it works... @@ -356,6 +360,7 @@ public void testInsertOOMsGpu() { assertThrows(GpuSplitAndRetryOOM.class, () -> Rmm.alloc(100).close()); assertEquals(0, RmmSpark.getAndResetNumRetryThrow(taskid)); assertEquals(1, RmmSpark.getAndResetNumSplitRetryThrow(taskid)); + assertEquals(ALIGNMENT * 2, RmmSpark.getAndResetMaxMemoryAllocated(taskid)); // Verify that injecting OOM does not cause the block to actually happen assertEquals(RmmSparkThreadState.THREAD_RUNNING, RmmSpark.getStateOf(threadId)); @@ -635,8 +640,8 @@ void setupRmmForTestingWithLimits(long maxAllocSize, RmmEventHandler eventHandle boolean succeeded = false; try { resource = new RmmCudaMemoryResource(); - resource = new RmmLimitingResourceAdaptor<>(resource, maxAllocSize, 256); - resource = new RmmTrackingResourceAdaptor<>(resource, 256); + resource = new RmmLimitingResourceAdaptor<>(resource, maxAllocSize, ALIGNMENT); + resource = new RmmTrackingResourceAdaptor<>(resource, ALIGNMENT); Rmm.setCurrentDeviceResource(resource, null, false); succeeded = true; } finally { @@ -760,9 +765,9 @@ public void testBasicCpuBlocking() throws ExecutionException, InterruptedExcepti @Test public void testBasicMixedBlocking() throws ExecutionException, InterruptedException, TimeoutException { - // 10 MiB - setupRmmForTestingWithLimits(10 * 1024 * 1024); - LimitingOffHeapAllocForTests.setLimit(10 * 1024 * 1024); + final long MB = 1024 * 1024; + setupRmmForTestingWithLimits(10 * MB); + LimitingOffHeapAllocForTests.setLimit(10 * MB); TaskThread taskOne = new TaskThread("TEST THREAD ONE", 1); TaskThread taskTwo = new TaskThread("TEST THREAD TWO", 2); TaskThread taskThree = new TaskThread("TEST THREAD THREE", 3); @@ -771,6 +776,9 @@ public void testBasicMixedBlocking() throws ExecutionException, InterruptedExcep taskTwo.initialize(); taskThree.initialize(); taskFour.initialize(); + + final long FIVE_MB = 5 * MB; + final long SIX_MB = 6 * MB; try { long tOneId = taskOne.getThreadId(); assertEquals(RmmSparkThreadState.THREAD_RUNNING, RmmSpark.getStateOf(tOneId)); @@ -784,18 +792,18 @@ public void testBasicMixedBlocking() throws ExecutionException, InterruptedExcep long tFourId = taskFour.getThreadId(); assertEquals(RmmSparkThreadState.THREAD_RUNNING, RmmSpark.getStateOf(tFourId)); - try (AllocOnAnotherThread firstGpuAlloc = new GpuAllocOnAnotherThread(taskOne, 5 * 1024 * 1024)) { + try (AllocOnAnotherThread firstGpuAlloc = new GpuAllocOnAnotherThread(taskOne, FIVE_MB)) { firstGpuAlloc.waitForAlloc(); - try (AllocOnAnotherThread firstCpuAlloc = new CpuAllocOnAnotherThread(taskTwo, 5 * 1024 * 1024)) { + try (AllocOnAnotherThread firstCpuAlloc = new CpuAllocOnAnotherThread(taskTwo, FIVE_MB)) { firstCpuAlloc.waitForAlloc(); // Blocking GPU Alloc - try (AllocOnAnotherThread secondGpuAlloc = new GpuAllocOnAnotherThread(taskThree, 6 * 1024 * 1024)) { + try (AllocOnAnotherThread secondGpuAlloc = new GpuAllocOnAnotherThread(taskThree, SIX_MB)) { taskThree.pollForState(RmmSparkThreadState.THREAD_BLOCKED, 1000, TimeUnit.MILLISECONDS); // Blocking CPU Alloc - try (AllocOnAnotherThread secondCpuAlloc = new CpuAllocOnAnotherThread(taskFour, 6 * 1024 * 1024)) { + try (AllocOnAnotherThread secondCpuAlloc = new CpuAllocOnAnotherThread(taskFour, SIX_MB)) { taskFour.pollForState(RmmSparkThreadState.THREAD_BLOCKED, 1000, TimeUnit.MILLISECONDS); // We want to make sure that the order of wakeup corresponds to the location of the data that was released @@ -814,9 +822,13 @@ public void testBasicMixedBlocking() throws ExecutionException, InterruptedExcep } } finally { taskOne.done(); + assertEquals(FIVE_MB, RmmSpark.getAndResetMaxMemoryAllocated(1)); taskTwo.done(); + assertEquals(FIVE_MB, RmmSpark.getAndResetMaxMemoryAllocated(2)); taskThree.done(); + assertEquals(SIX_MB, RmmSpark.getAndResetMaxMemoryAllocated(3)); taskFour.done(); + assertEquals(SIX_MB, RmmSpark.getAndResetMaxMemoryAllocated(4)); } } From a890f2761079c51dcaedcd5994ac58cb06542a28 Mon Sep 17 00:00:00 2001 From: Zach Puller Date: Fri, 20 Sep 2024 14:08:22 -0700 Subject: [PATCH 07/11] pr comments Signed-off-by: Zach Puller --- src/main/cpp/src/SparkResourceAdaptorJni.cpp | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/main/cpp/src/SparkResourceAdaptorJni.cpp b/src/main/cpp/src/SparkResourceAdaptorJni.cpp index f0916d6a13..04d132f72c 100644 --- a/src/main/cpp/src/SparkResourceAdaptorJni.cpp +++ b/src/main/cpp/src/SparkResourceAdaptorJni.cpp @@ -299,7 +299,7 @@ class full_thread_state { // time) long time_retry_running_nanos = 0; std::chrono::time_point block_start; - long memory_allocated_bytes = 0; + long gpu_memory_allocated_bytes = 0; // metrics for the current thread task_metrics metrics; @@ -1359,7 +1359,6 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource { std::unique_lock& lock) { // pre allocate checks - std::cerr << "post_alloc_success_core" << std::endl; auto const thread = threads.find(thread_id); if (!was_recursive && thread != threads.end()) { switch (thread->second.state) { @@ -1377,9 +1376,12 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource { thread->second.is_cpu_alloc = false; // num_bytes is likely not padded, which could cause slight inaccuracies // but for now it shouldn't matter for watermark purposes - thread->second.memory_allocated_bytes += num_bytes; - thread->second.metrics.max_memory_allocated = std::max( - thread->second.metrics.max_memory_allocated, thread->second.memory_allocated_bytes); + if (!is_for_cpu) { + thread->second.gpu_memory_allocated_bytes += num_bytes; + thread->second.metrics.max_memory_allocated = + std::max(thread->second.metrics.max_memory_allocated, + thread->second.gpu_memory_allocated_bytes); + } break; default: break; } @@ -1801,7 +1803,7 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource { if (is_for_cpu == t_state.is_cpu_alloc) { transition(t_state, thread_state::THREAD_ALLOC_FREE); } - t_state.memory_allocated_bytes -= num_bytes; + t_state.gpu_memory_allocated_bytes -= num_bytes; break; default: break; } From 042d37f5f89851eecdd26c7f3343ed8f6011f7a8 Mon Sep 17 00:00:00 2001 From: Zach Puller Date: Fri, 20 Sep 2024 14:15:08 -0700 Subject: [PATCH 08/11] fix tests Signed-off-by: Zach Puller --- src/test/java/com/nvidia/spark/rapids/jni/RmmSparkTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/java/com/nvidia/spark/rapids/jni/RmmSparkTest.java b/src/test/java/com/nvidia/spark/rapids/jni/RmmSparkTest.java index 6e0f504a96..50346dc7f2 100644 --- a/src/test/java/com/nvidia/spark/rapids/jni/RmmSparkTest.java +++ b/src/test/java/com/nvidia/spark/rapids/jni/RmmSparkTest.java @@ -824,11 +824,11 @@ public void testBasicMixedBlocking() throws ExecutionException, InterruptedExcep taskOne.done(); assertEquals(FIVE_MB, RmmSpark.getAndResetMaxMemoryAllocated(1)); taskTwo.done(); - assertEquals(FIVE_MB, RmmSpark.getAndResetMaxMemoryAllocated(2)); + assertEquals(0, RmmSpark.getAndResetMaxMemoryAllocated(2)); taskThree.done(); assertEquals(SIX_MB, RmmSpark.getAndResetMaxMemoryAllocated(3)); taskFour.done(); - assertEquals(SIX_MB, RmmSpark.getAndResetMaxMemoryAllocated(4)); + assertEquals(0, RmmSpark.getAndResetMaxMemoryAllocated(4)); } } From daaab620f4f5b8710fe6ab4af4fd6bc6f9264a51 Mon Sep 17 00:00:00 2001 From: Zach Puller Date: Mon, 23 Sep 2024 09:15:23 -0700 Subject: [PATCH 09/11] pr comments Signed-off-by: Zach Puller --- src/main/cpp/src/SparkResourceAdaptorJni.cpp | 19 +++++++++---------- .../rapids/jni/SparkResourceAdaptor.java | 6 +++--- 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/src/main/cpp/src/SparkResourceAdaptorJni.cpp b/src/main/cpp/src/SparkResourceAdaptorJni.cpp index 04d132f72c..bc14faeb60 100644 --- a/src/main/cpp/src/SparkResourceAdaptorJni.cpp +++ b/src/main/cpp/src/SparkResourceAdaptorJni.cpp @@ -204,7 +204,7 @@ struct task_metrics { // The amount of time that this thread has lost due to retries (not including blocked time) long time_lost_nanos = 0; - long max_memory_allocated = 0; + long gpu_max_memory_allocated = 0; void take_from(task_metrics& other) { @@ -218,7 +218,8 @@ struct task_metrics { this->num_times_split_retry_throw += other.num_times_split_retry_throw; this->time_blocked_nanos += other.time_blocked_nanos; this->time_lost_nanos += other.time_lost_nanos; - this->max_memory_allocated = std::max(this->max_memory_allocated, other.max_memory_allocated); + this->gpu_max_memory_allocated = + std::max(this->gpu_max_memory_allocated, other.gpu_max_memory_allocated); } void clear() @@ -805,9 +806,9 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource { return get_and_reset_metric(task_id, &task_metrics::time_lost_nanos); } - long get_and_reset_max_memory_allocated(long const task_id) + long get_and_reset_gpu_max_memory_allocated(long const task_id) { - return get_and_reset_metric(task_id, &task_metrics::max_memory_allocated); + return get_and_reset_metric(task_id, &task_metrics::gpu_max_memory_allocated); } void check_and_break_deadlocks() @@ -818,7 +819,6 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource { bool cpu_prealloc(size_t const amount, bool const blocking) { - // amount is not used yet, but is here in case we want it in the future. std::unique_lock lock(state_mutex); auto const thread_id = static_cast(pthread_self()); return pre_alloc_core(thread_id, true, blocking, lock); @@ -831,7 +831,6 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource { { // addr is not used yet, but is here in case we want it in the future. // amount is not used yet, but is here in case we want it for debugging/metrics. - // blocking is not used yet. It could be used for some debugging so we are keeping it. std::unique_lock lock(state_mutex); auto const thread_id = static_cast(pthread_self()); post_alloc_success_core(thread_id, true, was_recursive, amount, lock); @@ -1378,8 +1377,8 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource { // but for now it shouldn't matter for watermark purposes if (!is_for_cpu) { thread->second.gpu_memory_allocated_bytes += num_bytes; - thread->second.metrics.max_memory_allocated = - std::max(thread->second.metrics.max_memory_allocated, + thread->second.metrics.gpu_max_memory_allocated = + std::max(thread->second.metrics.gpu_max_memory_allocated, thread->second.gpu_memory_allocated_bytes); } break; @@ -1803,7 +1802,7 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource { if (is_for_cpu == t_state.is_cpu_alloc) { transition(t_state, thread_state::THREAD_ALLOC_FREE); } - t_state.gpu_memory_allocated_bytes -= num_bytes; + if (!is_for_cpu) { t_state.gpu_memory_allocated_bytes -= num_bytes; } break; default: break; } @@ -2114,7 +2113,7 @@ Java_com_nvidia_spark_rapids_jni_SparkResourceAdaptor_getAndResetMaxMemoryAlloca try { cudf::jni::auto_set_device(env); auto mr = reinterpret_cast(ptr); - return mr->get_and_reset_max_memory_allocated(task_id); + return mr->get_and_reset_gpu_max_memory_allocated(task_id); } CATCH_STD(env, 0) } diff --git a/src/main/java/com/nvidia/spark/rapids/jni/SparkResourceAdaptor.java b/src/main/java/com/nvidia/spark/rapids/jni/SparkResourceAdaptor.java index f3b5640780..9e3414f7d3 100644 --- a/src/main/java/com/nvidia/spark/rapids/jni/SparkResourceAdaptor.java +++ b/src/main/java/com/nvidia/spark/rapids/jni/SparkResourceAdaptor.java @@ -251,8 +251,8 @@ public long getAndResetComputeTimeLostToRetry(long taskId) { return getAndResetComputeTimeLostToRetry(getHandle(), taskId); } - public long getAndResetMaxMemoryAllocated(long taskId) { - return getAndResetMaxMemoryAllocated(getHandle(), taskId); + public long getAndResetGpuMaxMemoryAllocated(long taskId) { + return getAndResetGpuMaxMemoryAllocated(getHandle(), taskId); } @@ -323,7 +323,7 @@ public void cpuDeallocate(long ptr, long amount) { private static native int getAndResetSplitRetryThrowInternal(long handle, long taskId); private static native long getAndResetBlockTimeInternal(long handle, long taskId); private static native long getAndResetComputeTimeLostToRetry(long handle, long taskId); - private static native long getAndResetMaxMemoryAllocated(long handle, long taskId); + private static native long getAndResetGpuMaxMemoryAllocated(long handle, long taskId); private static native void startRetryBlock(long handle, long threadId); private static native void endRetryBlock(long handle, long threadId); private static native void checkAndBreakDeadlocks(long handle); From 7fe7c22886e8a64ad36d51cf9c66c42c2b31b8ee Mon Sep 17 00:00:00 2001 From: Zach Puller Date: Mon, 23 Sep 2024 15:32:47 -0700 Subject: [PATCH 10/11] fix test Signed-off-by: Zach Puller --- .../com/nvidia/spark/rapids/jni/RmmSparkTest.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/test/java/com/nvidia/spark/rapids/jni/RmmSparkTest.java b/src/test/java/com/nvidia/spark/rapids/jni/RmmSparkTest.java index 50346dc7f2..987dd58534 100644 --- a/src/test/java/com/nvidia/spark/rapids/jni/RmmSparkTest.java +++ b/src/test/java/com/nvidia/spark/rapids/jni/RmmSparkTest.java @@ -319,7 +319,7 @@ public void testInsertOOMsGpu() { assertEquals(0, RmmSpark.getAndResetNumRetryThrow(taskid)); assertEquals(0, RmmSpark.getAndResetNumSplitRetryThrow(taskid)); assertEquals(0, RmmSpark.getAndResetComputeTimeLostToRetryNs(taskid)); - assertEquals(0, RmmSpark.getAndResetMaxMemoryAllocated(taskid)); + assertEquals(0, RmmSpark.getAndResetGpuMaxMemoryAllocated(taskid)); RmmSpark.startDedicatedTaskThread(threadId, taskid, t); assertEquals(RmmSparkThreadState.THREAD_RUNNING, RmmSpark.getStateOf(threadId)); try { @@ -346,7 +346,7 @@ public void testInsertOOMsGpu() { assertEquals(RmmSparkThreadState.THREAD_RUNNING, RmmSpark.getStateOf(threadId)); assertEquals(1, RmmSpark.getAndResetNumRetryThrow(taskid)); assertEquals(0, RmmSpark.getAndResetNumSplitRetryThrow(taskid)); - assertEquals(ALIGNMENT, RmmSpark.getAndResetMaxMemoryAllocated(taskid)); + assertEquals(ALIGNMENT, RmmSpark.getAndResetGpuMaxMemoryAllocated(taskid)); RmmSpark.blockThreadUntilReady(); // Allocate something small and verify that it works... @@ -360,7 +360,7 @@ public void testInsertOOMsGpu() { assertThrows(GpuSplitAndRetryOOM.class, () -> Rmm.alloc(100).close()); assertEquals(0, RmmSpark.getAndResetNumRetryThrow(taskid)); assertEquals(1, RmmSpark.getAndResetNumSplitRetryThrow(taskid)); - assertEquals(ALIGNMENT * 2, RmmSpark.getAndResetMaxMemoryAllocated(taskid)); + assertEquals(ALIGNMENT * 2, RmmSpark.getAndResetGpuMaxMemoryAllocated(taskid)); // Verify that injecting OOM does not cause the block to actually happen assertEquals(RmmSparkThreadState.THREAD_RUNNING, RmmSpark.getStateOf(threadId)); @@ -822,13 +822,13 @@ public void testBasicMixedBlocking() throws ExecutionException, InterruptedExcep } } finally { taskOne.done(); - assertEquals(FIVE_MB, RmmSpark.getAndResetMaxMemoryAllocated(1)); + assertEquals(FIVE_MB, RmmSpark.getAndResetGpuMaxMemoryAllocated(1)); taskTwo.done(); - assertEquals(0, RmmSpark.getAndResetMaxMemoryAllocated(2)); + assertEquals(0, RmmSpark.getAndResetGpuMaxMemoryAllocated(2)); taskThree.done(); - assertEquals(SIX_MB, RmmSpark.getAndResetMaxMemoryAllocated(3)); + assertEquals(SIX_MB, RmmSpark.getAndResetGpuMaxMemoryAllocated(3)); taskFour.done(); - assertEquals(0, RmmSpark.getAndResetMaxMemoryAllocated(4)); + assertEquals(0, RmmSpark.getAndResetGpuMaxMemoryAllocated(4)); } } From d83193435b9e6cf2aa89cc3b31e6fd9eadfe78f5 Mon Sep 17 00:00:00 2001 From: Zach Puller Date: Mon, 23 Sep 2024 15:35:36 -0700 Subject: [PATCH 11/11] fix Signed-off-by: Zach Puller --- src/main/cpp/src/SparkResourceAdaptorJni.cpp | 6 ++---- src/main/java/com/nvidia/spark/rapids/jni/RmmSpark.java | 4 ++-- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/src/main/cpp/src/SparkResourceAdaptorJni.cpp b/src/main/cpp/src/SparkResourceAdaptorJni.cpp index bc14faeb60..8eeb047ddc 100644 --- a/src/main/cpp/src/SparkResourceAdaptorJni.cpp +++ b/src/main/cpp/src/SparkResourceAdaptorJni.cpp @@ -2104,10 +2104,8 @@ Java_com_nvidia_spark_rapids_jni_SparkResourceAdaptor_getAndResetComputeTimeLost } JNIEXPORT jlong JNICALL -Java_com_nvidia_spark_rapids_jni_SparkResourceAdaptor_getAndResetMaxMemoryAllocated(JNIEnv* env, - jclass, - jlong ptr, - jlong task_id) +Java_com_nvidia_spark_rapids_jni_SparkResourceAdaptor_getAndResetGpuMaxMemoryAllocated( + JNIEnv* env, jclass, jlong ptr, jlong task_id) { JNI_NULL_CHECK(env, ptr, "resource_adaptor is null", 0); try { diff --git a/src/main/java/com/nvidia/spark/rapids/jni/RmmSpark.java b/src/main/java/com/nvidia/spark/rapids/jni/RmmSpark.java index d8993b3904..45a234dcca 100644 --- a/src/main/java/com/nvidia/spark/rapids/jni/RmmSpark.java +++ b/src/main/java/com/nvidia/spark/rapids/jni/RmmSpark.java @@ -594,10 +594,10 @@ public static long getAndResetComputeTimeLostToRetryNs(long taskId) { * @param taskId the id of the task to get the metric for. * @return the max device memory footprint. */ - public static long getAndResetMaxMemoryAllocated(long taskId) { + public static long getAndResetGpuMaxMemoryAllocated(long taskId) { synchronized (Rmm.class) { if (sra != null && sra.isOpen()) { - return sra.getAndResetMaxMemoryAllocated(taskId); + return sra.getAndResetGpuMaxMemoryAllocated(taskId); } else { // sra is not set so the value is by definition 0 return 0;