From 7fc10ce037a459a32dea51298a153d2ef99cd0df Mon Sep 17 00:00:00 2001
From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com>
Date: Wed, 30 Aug 2023 07:40:32 -0700
Subject: [PATCH 01/15] Initial exposure of cuDF logging information
---
distributed/diagnostics/cudf.py | 26 ++++++++++++++++++++++++++
distributed/worker.py | 16 +++++++++++++++-
2 files changed, 41 insertions(+), 1 deletion(-)
create mode 100644 distributed/diagnostics/cudf.py
diff --git a/distributed/diagnostics/cudf.py b/distributed/diagnostics/cudf.py
new file mode 100644
index 0000000000..01118fb0c8
--- /dev/null
+++ b/distributed/diagnostics/cudf.py
@@ -0,0 +1,26 @@
+"""
+Diagnostics for memory spilling managed by cuDF.
+"""
+
+from __future__ import annotations
+
+try:
+ from cudf.core.buffer.spill_manager import get_global_manager
+except ImportError:
+ get_global_manager = None
+
+
+def real_time():
+ if get_global_manager is None:
+ return {}
+ mgr = get_global_manager()
+ if mgr is None:
+ return {}
+ keys = {
+ "gpu-to-cpu": {"nbytes": 0, "time": 0},
+ "cpu-to-gpu": {"nbytes": 0, "time": 0},
+ }
+ for (src, dst), (nbytes, time) in mgr.statistics.spill_totals.items():
+ keys[f"{src}-to-{dst}"]["nbytes"] = nbytes
+ keys[f"{src}-to-{dst}"]["time"] = time
+ return keys
diff --git a/distributed/worker.py b/distributed/worker.py
index a6acace93f..17769f6fa0 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -76,7 +76,7 @@
)
from distributed.core import rpc as RPCType
from distributed.core import send_recv
-from distributed.diagnostics import nvml, rmm
+from distributed.diagnostics import cudf, nvml, rmm
from distributed.diagnostics.plugin import _get_plugin_name
from distributed.diskutils import WorkSpace
from distributed.exceptions import Reschedule
@@ -3217,6 +3217,20 @@ async def rmm_metric(worker):
del _rmm
+try:
+ import cudf as _cudf
+except Exception:
+ pass
+else:
+
+ async def cudf_metric(worker):
+ result = await offload(cudf.real_time)
+ return result
+
+ DEFAULT_METRICS["cudf"] = cudf_metric
+ del _cudf
+
+
def print(
*args,
sep: str | None = " ",
From 04137ef9a34ef161756e85cbb4309a25dbb3158e Mon Sep 17 00:00:00 2001
From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com>
Date: Thu, 31 Aug 2023 09:45:48 -0700
Subject: [PATCH 02/15] Initial plot of GPU to CPU nbytes
---
distributed/dashboard/components/cudf.py | 112 +++++++++++++++++++++++
distributed/dashboard/scheduler.py | 2 +
2 files changed, 114 insertions(+)
create mode 100644 distributed/dashboard/components/cudf.py
diff --git a/distributed/dashboard/components/cudf.py b/distributed/dashboard/components/cudf.py
new file mode 100644
index 0000000000..8f31f17895
--- /dev/null
+++ b/distributed/dashboard/components/cudf.py
@@ -0,0 +1,112 @@
+from __future__ import annotations
+
+import math
+
+from bokeh.core.properties import without_property_validation
+from bokeh.models import BasicTicker, ColumnDataSource, NumeralTickFormatter
+from bokeh.plotting import figure
+
+from distributed.dashboard.components import DashboardComponent, add_periodic_callback
+from distributed.dashboard.components.scheduler import BOKEH_THEME, TICKS_1024
+from distributed.dashboard.utils import update
+from distributed.utils import log_errors
+
+
+class CudfSpillingStatistics(DashboardComponent):
+ """
+ Plot giving an overview of per-worker GPU spilling statistics, including the number
+ of bytes spilled to/from CPU and the time spent spilling.
+ """
+
+ def __init__(self, scheduler, width=600, **kwargs):
+ with log_errors():
+ self.last = 0
+ self.scheduler = scheduler
+ self.source = ColumnDataSource(
+ {
+ "from-gpu": [1, 2],
+ "from-gpu-half": [0.5, 1],
+ "worker": ["a", "b"],
+ "gpu-index": [0, 0],
+ "y": [1, 2],
+ }
+ )
+
+ bytes_spilled = figure(
+ title="Bytes spilled from GPU",
+ tools="",
+ width=int(width / 2),
+ name="bytes_spilled_histogram",
+ **kwargs,
+ )
+
+ rect = bytes_spilled.rect(
+ source=self.source,
+ x="from-gpu-half",
+ y="y",
+ width="from-gpu",
+ height=1,
+ color="#76B900",
+ alpha=1.0,
+ )
+ rect.nonselection_glyph = None
+
+ bytes_spilled.axis[0].ticker = BasicTicker(**TICKS_1024)
+ bytes_spilled.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b")
+ bytes_spilled.xaxis.major_label_orientation = -math.pi / 12
+ bytes_spilled.x_range.start = 0
+
+ for fig in [bytes_spilled]:
+ fig.xaxis.minor_tick_line_alpha = 0
+ fig.yaxis.visible = False
+ fig.ygrid.visible = False
+
+ fig.toolbar_location = None
+ fig.yaxis.visible = False
+
+ self.bytes_spilled_figure = bytes_spilled
+
+ @without_property_validation
+ def update(self):
+ with log_errors():
+ workers = list(self.scheduler.workers.values())
+ from_gpu = []
+ gpu_index = []
+ y = []
+ worker = []
+ memory_max = 0
+
+ for idx, ws in enumerate(workers):
+ try:
+ cudf_metrics = ws.metrics["cudf"]
+ gpu_info = ws.extra["gpu"]
+ except KeyError:
+ continue
+
+ from_gpu.append(cudf_metrics["gpu-to-cpu"]["nbytes"])
+ worker.append(ws.address)
+ gpu_index.append(idx)
+ y.append(idx)
+
+ memory_max = max(memory_max, gpu_info["memory-total"])
+
+ result = {
+ "from-gpu": from_gpu,
+ "from-gpu-half": [m // 2 for m in from_gpu],
+ "worker": worker,
+ "gpu-index": gpu_index,
+ "y": y,
+ }
+
+ self.bytes_spilled_figure.x_range.end = memory_max
+
+ update(self.source, result)
+
+
+def cudf_spilling_doc(scheduler, extra, doc):
+ with log_errors():
+ cudf_spilling = CudfSpillingStatistics(scheduler, sizing_mode="stretch_both")
+ cudf_spilling.update()
+ add_periodic_callback(doc, cudf_spilling, 100)
+ doc.add_root(cudf_spilling.bytes_spilled_figure)
+ doc.theme = BOKEH_THEME
diff --git a/distributed/dashboard/scheduler.py b/distributed/dashboard/scheduler.py
index 4fbe8b24b1..c9e8ed9982 100644
--- a/distributed/dashboard/scheduler.py
+++ b/distributed/dashboard/scheduler.py
@@ -6,6 +6,7 @@
from tornado import web
from tornado.ioloop import IOLoop
+from distributed.dashboard.components.cudf import cudf_spilling_doc
from distributed.dashboard.components.nvml import (
gpu_doc,
gpu_memory_doc,
@@ -119,6 +120,7 @@
"/individual-gpu-memory": gpu_memory_doc,
"/individual-gpu-utilization": gpu_utilization_doc,
"/individual-rmm-memory": rmm_memory_doc,
+ "/individual-cudf-spilling": cudf_spilling_doc,
}
From d38de06c51d36220ec6b1e905d0974d701d90f21 Mon Sep 17 00:00:00 2001
From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com>
Date: Fri, 1 Sep 2023 11:41:40 -0700
Subject: [PATCH 03/15] Refactor RMM plot to include spilled memory
---
distributed/dashboard/components/rmm.py | 346 ++++++++++++------------
distributed/diagnostics/cudf.py | 17 +-
2 files changed, 179 insertions(+), 184 deletions(-)
diff --git a/distributed/dashboard/components/rmm.py b/distributed/dashboard/components/rmm.py
index b0118a5582..b53c1ab0de 100644
--- a/distributed/dashboard/components/rmm.py
+++ b/distributed/dashboard/components/rmm.py
@@ -1,7 +1,7 @@
from __future__ import annotations
-import math
-from textwrap import dedent
+from collections.abc import Iterable
+from typing import TypeVar
from bokeh.core.properties import without_property_validation
from bokeh.models import (
@@ -10,6 +10,7 @@
HoverTool,
NumeralTickFormatter,
OpenURL,
+ Range1d,
TapTool,
)
from bokeh.plotting import figure
@@ -18,191 +19,186 @@
from dask.utils import format_bytes
from distributed.dashboard.components import DashboardComponent, add_periodic_callback
-from distributed.dashboard.components.scheduler import BOKEH_THEME, TICKS_1024
+from distributed.dashboard.components.scheduler import (
+ BOKEH_THEME,
+ TICKS_1024,
+ XLABEL_ORIENTATION,
+ MemoryColor,
+)
from distributed.dashboard.utils import update
from distributed.utils import log_errors
+T = TypeVar("T")
+
-class RMMMemoryUsage(DashboardComponent):
+class RMMMemoryUsage(DashboardComponent, MemoryColor):
"""
GPU memory usage plot that includes information about memory
managed by RMM. If an RMM pool is being used, shows the amount of
pool memory utilized.
"""
+ @log_errors
def __init__(self, scheduler, width=600, **kwargs):
- with log_errors():
- self.last = 0
- self.scheduler = scheduler
- self.source = ColumnDataSource(
- {
- "rmm-used": [1, 2],
- "rmm-used-half": [0.5, 1],
- "rmm-total": [2, 4],
- "rmm-total-half": [1, 2],
- "external-used": [2, 1],
- "external-used-x": [3, 4.5],
- "worker": ["a", "b"],
- "gpu-index": [0, 0],
- "y": [1, 2],
- "escaped_worker": ["a", "b"],
- "rmm_memory_text": [
- "RMM memory used: 1B/1B\nTotal GPU memory used: 1B/2B",
- "RMM memory used: 1B/1B\nTotal GPU memory used: 1B/2B",
- ],
- }
- )
-
- memory = figure(
- title="RMM Memory",
- tools="",
- width=int(width / 2),
- name="rmm_memory_histogram",
- **kwargs,
- )
-
- rect = memory.rect(
- source=self.source,
- x="rmm-used-half",
- y="y",
- width="rmm-used",
- height=1,
- color="#76B900",
- alpha=1.0,
- )
- rect.nonselection_glyph = None
-
- rect = memory.rect(
- source=self.source,
- x="rmm-total-half",
- y="y",
- width="rmm-total",
- height=1,
- color="#76B900",
- alpha=0.75,
- )
- rect.nonselection_glyph = None
-
- rect = memory.rect(
- source=self.source,
- x="external-used-x",
- y="y",
- width="external-used",
- height=1,
- color="#76B900",
- alpha=0.5,
- )
- rect.nonselection_glyph = None
-
- memory.axis[0].ticker = BasicTicker(**TICKS_1024)
- memory.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b")
- memory.xaxis.major_label_orientation = -math.pi / 12
- memory.x_range.start = 0
-
- for fig in [memory]:
- fig.xaxis.minor_tick_line_alpha = 0
- fig.yaxis.visible = False
- fig.ygrid.visible = False
-
- tap = TapTool(
- callback=OpenURL(url="./info/worker/@escaped_worker.html")
- )
- fig.add_tools(tap)
-
- fig.toolbar_location = None
- fig.yaxis.visible = False
-
- hover = HoverTool()
- hover.tooltips = "@worker : @rmm_memory_text"
- hover.point_policy = "follow_mouse"
- memory.add_tools(hover)
-
- self.memory_figure = memory
+ DashboardComponent.__init__(self)
+ MemoryColor.__init__(self)
+
+ self.last = 0
+ self.scheduler = scheduler
+ self.source = ColumnDataSource(
+ {
+ "width": [],
+ "x": [],
+ "y": [],
+ "color": [],
+ "alpha": [],
+ "worker": [],
+ "escaped_worker": [],
+ "rmm_used": [],
+ "rmm_total": [],
+ "gpu_used": [],
+ "gpu_total": [],
+ "spilled": [],
+ }
+ )
+
+ self.root = figure(
+ title="RMM memory used",
+ tools="",
+ width=int(width / 2),
+ name="rmm_memory",
+ **kwargs,
+ )
+ rect = self.root.rect(
+ source=self.source,
+ x="x",
+ y="y",
+ width="width",
+ height=0.9,
+ color="color",
+ fill_alpha="alpha",
+ line_width=0,
+ )
+ rect.nonselection_glyph = None
+
+ self.root.axis[0].ticker = BasicTicker(**TICKS_1024)
+ self.root.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b")
+ self.root.xaxis.major_label_orientation = XLABEL_ORIENTATION
+ self.root.xaxis.minor_tick_line_alpha = 0
+ self.root.x_range = Range1d(start=0)
+ self.root.yaxis.visible = False
+ self.root.ygrid.visible = False
+ self.root.toolbar_location = None
+
+ tap = TapTool(callback=OpenURL(url="./info/worker/@escaped_worker.html"))
+ self.root.add_tools(tap)
+
+ hover = HoverTool(
+ point_policy="follow_mouse",
+ tooltips="""
+
+ Worker:
+ @worker
+
+
+ RMM memory used:
+ @rmm_used{0.00 b} / @rmm_total{0.00 b}
+
+
+ Total GPU memory used:
+ @gpu_used{0.00 b} / @gpu_total{0.00 b}
+
+
+ Spilled to CPU:
+ @spilled{0.00 b}
+
+ """,
+ )
+ self.root.add_tools(hover)
@without_property_validation
+ @log_errors
def update(self):
- with log_errors():
- workers = list(self.scheduler.workers.values())
- rmm_total = []
- rmm_used = []
- external_used = []
- gpu_index = []
- y = []
- worker = []
- external_used_x = []
- memory_max = 0
- gpu_total = []
- rmm_memory_text = []
-
- for idx, ws in enumerate(workers):
- try:
- rmm_metrics = ws.metrics["rmm"]
- gpu_metrics = ws.metrics["gpu"]
- gpu_info = ws.extra["gpu"]
- except KeyError:
- continue
- rmm_total_worker = rmm_metrics["rmm-total"] # RMM memory only
- rmm_used_worker = rmm_metrics["rmm-used"]
- gpu_total_worker = gpu_info["memory-total"] # All GPU memory
- gpu_used_worker = gpu_metrics["memory-used"]
-
- external_used_worker = gpu_used_worker - rmm_total_worker
-
- rmm_total.append(rmm_total_worker)
- rmm_used.append(rmm_used_worker)
- gpu_total.append(gpu_total_worker)
- external_used.append(external_used_worker)
- external_used_x.append(rmm_total_worker + external_used_worker / 2)
- worker.append(ws.address)
- gpu_index.append(idx)
- y.append(idx)
-
- memory_max = max(memory_max, gpu_total_worker)
-
- rmm_memory_text.append(
- "RMM memory used: {}/{}\nTotal GPU memory used: {}/{}".format(
- format_bytes(rmm_used_worker),
- format_bytes(rmm_total_worker),
- format_bytes(gpu_used_worker),
- format_bytes(gpu_total_worker),
- )
- )
-
- self.memory_figure.title.text = dedent(
- """\
- RMM Utilization: {} / {}
- GPU Memory: {} / {}
- """.format(
- format_bytes(sum(rmm_used)),
- format_bytes(sum(rmm_total)),
- format_bytes(sum([*rmm_total, *external_used])),
- format_bytes(sum(gpu_total)),
- )
- )
-
- result = {
- "rmm-total": rmm_total,
- "rmm-used": rmm_used,
- "external-used": external_used,
- "rmm-total-half": [m // 2 for m in rmm_total],
- "rmm-used-half": [m // 2 for m in rmm_used],
- "external-used-x": external_used_x,
- "worker": worker,
- "gpu-index": gpu_index,
- "y": y,
- "escaped_worker": [escape.url_escape(w) for w in worker],
- "rmm_memory_text": rmm_memory_text,
- }
-
- self.memory_figure.x_range.end = memory_max
-
- update(self.source, result)
-
-
+ def quadlist(i: Iterable[T]) -> list[T]:
+ out = []
+ for ii in i:
+ out += [ii, ii, ii, ii]
+ return out
+
+ workers = list(self.scheduler.workers.values())
+
+ width = []
+ x = []
+ color = []
+ max_limit = 0
+ rmm_used = []
+ rmm_total = []
+ gpu_used = []
+ gpu_total = []
+ spilled = []
+
+ for ws in workers:
+ try:
+ rmm_metrics = ws.metrics["rmm"]
+ gpu_metrics = ws.metrics["gpu"]
+ gpu_info = ws.extra["gpu"]
+ cudf_metrics = ws.metrics["cudf"]
+ except KeyError:
+ continue
+
+ rmm_used_worker = rmm_metrics["rmm-used"] # RMM memory only
+ rmm_total_worker = rmm_metrics["rmm-total"]
+ gpu_used_worker = gpu_metrics["memory-used"] # All GPU memory
+ gpu_total_worker = gpu_info["memory-total"]
+ spilled_worker = cudf_metrics["cudf-spilled"] # memory spilled to host
+
+ max_limit = max(max_limit, gpu_total_worker + spilled_worker)
+ color_i = self._memory_color(gpu_used_worker, gpu_total_worker, ws.status)
+
+ width += [
+ rmm_used_worker,
+ rmm_total_worker - rmm_used_worker,
+ gpu_used_worker - rmm_total_worker,
+ spilled_worker,
+ ]
+ x += [sum(width[-4:i]) + width[i] / 2 for i in range(-4, 0)]
+ color += [color_i, color_i, color_i, "grey"]
+
+ # memory info
+ rmm_used.append(rmm_used_worker)
+ rmm_total.append(rmm_total_worker)
+ gpu_used.append(gpu_used_worker)
+ gpu_total.append(gpu_total_worker)
+ spilled.append(spilled_worker)
+
+ title = f"RMM memory used: {format_bytes(sum(rmm_used))} / {format_bytes(sum(rmm_total))}\nTotal GPU memory used: {format_bytes(sum(gpu_used))} / {format_bytes(sum(gpu_total))}"
+ if sum(spilled):
+ title += f" + {format_bytes(sum(spilled))} spilled to CPU"
+ self.root.title.text = title
+
+ result = {
+ "width": width,
+ "x": x,
+ "y": quadlist(range(len(workers))),
+ "color": color,
+ "alpha": [1, 0.7, 0.4, 1] * len(workers),
+ "worker": quadlist(ws.address for ws in workers),
+ "escaped_worker": quadlist(escape.url_escape(ws.address) for ws in workers),
+ "rmm_used": quadlist(rmm_used),
+ "rmm_total": quadlist(rmm_total),
+ "gpu_used": quadlist(gpu_used),
+ "gpu_total": quadlist(gpu_total),
+ "spilled": quadlist(spilled),
+ }
+
+ self.root.x_range.end = max_limit
+ update(self.source, result)
+
+
+@log_errors
def rmm_memory_doc(scheduler, extra, doc):
- with log_errors():
- rmm_load = RMMMemoryUsage(scheduler, sizing_mode="stretch_both")
- rmm_load.update()
- add_periodic_callback(doc, rmm_load, 100)
- doc.add_root(rmm_load.memory_figure)
- doc.theme = BOKEH_THEME
+ rmm_load = RMMMemoryUsage(scheduler, sizing_mode="stretch_both")
+ rmm_load.update()
+ add_periodic_callback(doc, rmm_load, 100)
+ doc.add_root(rmm_load.root)
+ doc.theme = BOKEH_THEME
diff --git a/distributed/diagnostics/cudf.py b/distributed/diagnostics/cudf.py
index 01118fb0c8..a3404ee490 100644
--- a/distributed/diagnostics/cudf.py
+++ b/distributed/diagnostics/cudf.py
@@ -12,15 +12,14 @@
def real_time():
if get_global_manager is None:
- return {}
+ return {"cudf-spilled": 0}
mgr = get_global_manager()
if mgr is None:
- return {}
- keys = {
- "gpu-to-cpu": {"nbytes": 0, "time": 0},
- "cpu-to-gpu": {"nbytes": 0, "time": 0},
+ return {"cudf-spilled": 0}
+
+ totals = mgr.statistics.spill_totals
+
+ return {
+ "cudf-spilled": totals.get(("gpu", "cpu"), (0,))[0]
+ - totals.get(("cpu", "gpu"), (0,))[0]
}
- for (src, dst), (nbytes, time) in mgr.statistics.spill_totals.items():
- keys[f"{src}-to-{dst}"]["nbytes"] = nbytes
- keys[f"{src}-to-{dst}"]["time"] = time
- return keys
From eeddf1e4af209cc0881119d1c34b132877e0dc1a Mon Sep 17 00:00:00 2001
From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com>
Date: Fri, 1 Sep 2023 12:05:16 -0700
Subject: [PATCH 04/15] Fix memory limit on x axis
---
distributed/dashboard/components/cudf.py | 112 -----------------------
distributed/dashboard/components/rmm.py | 8 +-
2 files changed, 5 insertions(+), 115 deletions(-)
delete mode 100644 distributed/dashboard/components/cudf.py
diff --git a/distributed/dashboard/components/cudf.py b/distributed/dashboard/components/cudf.py
deleted file mode 100644
index 8f31f17895..0000000000
--- a/distributed/dashboard/components/cudf.py
+++ /dev/null
@@ -1,112 +0,0 @@
-from __future__ import annotations
-
-import math
-
-from bokeh.core.properties import without_property_validation
-from bokeh.models import BasicTicker, ColumnDataSource, NumeralTickFormatter
-from bokeh.plotting import figure
-
-from distributed.dashboard.components import DashboardComponent, add_periodic_callback
-from distributed.dashboard.components.scheduler import BOKEH_THEME, TICKS_1024
-from distributed.dashboard.utils import update
-from distributed.utils import log_errors
-
-
-class CudfSpillingStatistics(DashboardComponent):
- """
- Plot giving an overview of per-worker GPU spilling statistics, including the number
- of bytes spilled to/from CPU and the time spent spilling.
- """
-
- def __init__(self, scheduler, width=600, **kwargs):
- with log_errors():
- self.last = 0
- self.scheduler = scheduler
- self.source = ColumnDataSource(
- {
- "from-gpu": [1, 2],
- "from-gpu-half": [0.5, 1],
- "worker": ["a", "b"],
- "gpu-index": [0, 0],
- "y": [1, 2],
- }
- )
-
- bytes_spilled = figure(
- title="Bytes spilled from GPU",
- tools="",
- width=int(width / 2),
- name="bytes_spilled_histogram",
- **kwargs,
- )
-
- rect = bytes_spilled.rect(
- source=self.source,
- x="from-gpu-half",
- y="y",
- width="from-gpu",
- height=1,
- color="#76B900",
- alpha=1.0,
- )
- rect.nonselection_glyph = None
-
- bytes_spilled.axis[0].ticker = BasicTicker(**TICKS_1024)
- bytes_spilled.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b")
- bytes_spilled.xaxis.major_label_orientation = -math.pi / 12
- bytes_spilled.x_range.start = 0
-
- for fig in [bytes_spilled]:
- fig.xaxis.minor_tick_line_alpha = 0
- fig.yaxis.visible = False
- fig.ygrid.visible = False
-
- fig.toolbar_location = None
- fig.yaxis.visible = False
-
- self.bytes_spilled_figure = bytes_spilled
-
- @without_property_validation
- def update(self):
- with log_errors():
- workers = list(self.scheduler.workers.values())
- from_gpu = []
- gpu_index = []
- y = []
- worker = []
- memory_max = 0
-
- for idx, ws in enumerate(workers):
- try:
- cudf_metrics = ws.metrics["cudf"]
- gpu_info = ws.extra["gpu"]
- except KeyError:
- continue
-
- from_gpu.append(cudf_metrics["gpu-to-cpu"]["nbytes"])
- worker.append(ws.address)
- gpu_index.append(idx)
- y.append(idx)
-
- memory_max = max(memory_max, gpu_info["memory-total"])
-
- result = {
- "from-gpu": from_gpu,
- "from-gpu-half": [m // 2 for m in from_gpu],
- "worker": worker,
- "gpu-index": gpu_index,
- "y": y,
- }
-
- self.bytes_spilled_figure.x_range.end = memory_max
-
- update(self.source, result)
-
-
-def cudf_spilling_doc(scheduler, extra, doc):
- with log_errors():
- cudf_spilling = CudfSpillingStatistics(scheduler, sizing_mode="stretch_both")
- cudf_spilling.update()
- add_periodic_callback(doc, cudf_spilling, 100)
- doc.add_root(cudf_spilling.bytes_spilled_figure)
- doc.theme = BOKEH_THEME
diff --git a/distributed/dashboard/components/rmm.py b/distributed/dashboard/components/rmm.py
index b53c1ab0de..628885a4a7 100644
--- a/distributed/dashboard/components/rmm.py
+++ b/distributed/dashboard/components/rmm.py
@@ -105,7 +105,7 @@ def __init__(self, scheduler, width=600, **kwargs):
@rmm_used{0.00 b} / @rmm_total{0.00 b}
- Total GPU memory used:
+ GPU memory used:
@gpu_used{0.00 b} / @gpu_total{0.00 b}
@@ -152,7 +152,9 @@ def quadlist(i: Iterable[T]) -> list[T]:
gpu_total_worker = gpu_info["memory-total"]
spilled_worker = cudf_metrics["cudf-spilled"] # memory spilled to host
- max_limit = max(max_limit, gpu_total_worker + spilled_worker)
+ max_limit = max(
+ max_limit, gpu_total_worker, gpu_used_worker + spilled_worker
+ )
color_i = self._memory_color(gpu_used_worker, gpu_total_worker, ws.status)
width += [
@@ -171,7 +173,7 @@ def quadlist(i: Iterable[T]) -> list[T]:
gpu_total.append(gpu_total_worker)
spilled.append(spilled_worker)
- title = f"RMM memory used: {format_bytes(sum(rmm_used))} / {format_bytes(sum(rmm_total))}\nTotal GPU memory used: {format_bytes(sum(gpu_used))} / {format_bytes(sum(gpu_total))}"
+ title = f"RMM memory used: {format_bytes(sum(rmm_used))} / {format_bytes(sum(rmm_total))}\nGPU memory used: {format_bytes(sum(gpu_used))} / {format_bytes(sum(gpu_total))}"
if sum(spilled):
title += f" + {format_bytes(sum(spilled))} spilled to CPU"
self.root.title.text = title
From 043835cea443674fea893664655c7d943d41c3e5 Mon Sep 17 00:00:00 2001
From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com>
Date: Fri, 1 Sep 2023 12:08:28 -0700
Subject: [PATCH 05/15] Remove unused dashboard plot
---
distributed/dashboard/scheduler.py | 2 --
1 file changed, 2 deletions(-)
diff --git a/distributed/dashboard/scheduler.py b/distributed/dashboard/scheduler.py
index c9e8ed9982..4fbe8b24b1 100644
--- a/distributed/dashboard/scheduler.py
+++ b/distributed/dashboard/scheduler.py
@@ -6,7 +6,6 @@
from tornado import web
from tornado.ioloop import IOLoop
-from distributed.dashboard.components.cudf import cudf_spilling_doc
from distributed.dashboard.components.nvml import (
gpu_doc,
gpu_memory_doc,
@@ -120,7 +119,6 @@
"/individual-gpu-memory": gpu_memory_doc,
"/individual-gpu-utilization": gpu_utilization_doc,
"/individual-rmm-memory": rmm_memory_doc,
- "/individual-cudf-spilling": cudf_spilling_doc,
}
From 0ac3344f1bcd4323325380075bed0ab88fc0aca3 Mon Sep 17 00:00:00 2001
From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com>
Date: Wed, 6 Sep 2023 08:25:28 -0700
Subject: [PATCH 06/15] Allow MemoryColor colors to be overridden
---
distributed/dashboard/components/rmm.py | 2 +-
distributed/dashboard/components/scheduler.py | 17 +++++++++++------
2 files changed, 12 insertions(+), 7 deletions(-)
diff --git a/distributed/dashboard/components/rmm.py b/distributed/dashboard/components/rmm.py
index 628885a4a7..9f258b787c 100644
--- a/distributed/dashboard/components/rmm.py
+++ b/distributed/dashboard/components/rmm.py
@@ -41,7 +41,7 @@ class RMMMemoryUsage(DashboardComponent, MemoryColor):
@log_errors
def __init__(self, scheduler, width=600, **kwargs):
DashboardComponent.__init__(self)
- MemoryColor.__init__(self)
+ MemoryColor.__init__(self, neutral_color="#76B900")
self.last = 0
self.scheduler = scheduler
diff --git a/distributed/dashboard/components/scheduler.py b/distributed/dashboard/components/scheduler.py
index d25c56f65e..02d563659d 100644
--- a/distributed/dashboard/components/scheduler.py
+++ b/distributed/dashboard/components/scheduler.py
@@ -276,10 +276,15 @@ class MemoryColor:
orange: float
red: float
- def __init__(self):
+ def __init__(self, neutral_color="blue", target_color="orange", terminated_color="red"):
+ self.neutral_color = neutral_color
+ self.target_color = target_color
+ self.terminated_color = terminated_color
+
target = dask.config.get("distributed.worker.memory.target")
spill = dask.config.get("distributed.worker.memory.spill")
terminate = dask.config.get("distributed.worker.memory.terminate")
+
# These values can be False. It's also common to configure them to impossibly
# high values to achieve the same effect.
self.orange = min(target or math.inf, spill or math.inf)
@@ -287,14 +292,14 @@ def __init__(self):
def _memory_color(self, current: int, limit: int, status: Status) -> str:
if status != Status.running:
- return "red"
+ return self.terminated_color
if not limit:
- return "blue"
+ return self.neutral_color
if current >= limit * self.red:
- return "red"
+ return self.terminated_color
if current >= limit * self.orange:
- return "orange"
- return "blue"
+ return self.target_color
+ return self.neutral_color
class ClusterMemory(DashboardComponent, MemoryColor):
From bb491356a739dbbab85882bf0954309ca0ec68f8 Mon Sep 17 00:00:00 2001
From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com>
Date: Wed, 6 Sep 2023 08:40:39 -0700
Subject: [PATCH 07/15] Linting
---
distributed/dashboard/components/scheduler.py | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
diff --git a/distributed/dashboard/components/scheduler.py b/distributed/dashboard/components/scheduler.py
index 02d563659d..4d4c736ff6 100644
--- a/distributed/dashboard/components/scheduler.py
+++ b/distributed/dashboard/components/scheduler.py
@@ -276,7 +276,9 @@ class MemoryColor:
orange: float
red: float
- def __init__(self, neutral_color="blue", target_color="orange", terminated_color="red"):
+ def __init__(
+ self, neutral_color="blue", target_color="orange", terminated_color="red"
+ ):
self.neutral_color = neutral_color
self.target_color = target_color
self.terminated_color = terminated_color
From 50e626aeb78f8ff90dd1c4d1cd2fcac64c109059 Mon Sep 17 00:00:00 2001
From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com>
Date: Thu, 7 Sep 2023 09:09:33 -0700
Subject: [PATCH 08/15] Add cudf diagnostics test
---
continuous_integration/gpuci/build.sh | 4 ++
.../tests/test_cudf_diagnostics.py | 39 +++++++++++++++++++
2 files changed, 43 insertions(+)
create mode 100644 distributed/diagnostics/tests/test_cudf_diagnostics.py
diff --git a/continuous_integration/gpuci/build.sh b/continuous_integration/gpuci/build.sh
index 37b7ab4370..76048eddd2 100644
--- a/continuous_integration/gpuci/build.sh
+++ b/continuous_integration/gpuci/build.sh
@@ -26,6 +26,10 @@ export CUDA_REL=${CUDA_VERSION%.*}
# FIXME - monitoring GIL contention causes UCX teardown issues
export DASK_DISTRIBUTED__ADMIN__SYSTEM_MONITOR__GIL__ENABLED=False
+# enable cuDF spilling to host
+export CUDF_SPILL=on
+export CUDF_SPILL_STATS=1
+
################################################################################
# SETUP - Check environment
################################################################################
diff --git a/distributed/diagnostics/tests/test_cudf_diagnostics.py b/distributed/diagnostics/tests/test_cudf_diagnostics.py
new file mode 100644
index 0000000000..9c2bbb825a
--- /dev/null
+++ b/distributed/diagnostics/tests/test_cudf_diagnostics.py
@@ -0,0 +1,39 @@
+from __future__ import annotations
+
+import pytest
+
+from distributed.utils_test import gen_cluster
+
+pytestmark = pytest.mark.gpu
+
+cudf = pytest.importorskip("cudf")
+dask_cuda = pytest.importorskip("dask_cuda")
+
+
+def force_spill():
+ from cudf.core.buffer.spill_manager import get_global_manager
+
+ manager = get_global_manager()
+
+ # 24 bytes
+ df = cudf.DataFrame({"a": [1, 2, 3]})
+
+ return manager.spill_to_device_limit(1)
+
+
+@gen_cluster(
+ client=True,
+ nthreads=[("127.0.0.1", 1)],
+ Worker=dask_cuda.CUDAWorker,
+)
+async def test_cudf_metrics(c, s, *workers):
+ w = list(s.workers.values())[0]
+ assert "cudf" in w.metrics
+ assert w.metrics["cudf"]["cudf-spilled"] == 0
+
+ try:
+ await c.run(force_spill)
+ except AttributeError:
+ pytest.xfail("cuDF spilling & spilling statistics must be enabled")
+
+ assert w.metrics["cudf"]["cudf-spilled"] == 24
From 082ddefca3cb6e6c9e80eea15277d2805bca2d93 Mon Sep 17 00:00:00 2001
From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com>
Date: Tue, 12 Sep 2023 09:51:00 -0700
Subject: [PATCH 09/15] Resolve bokeh test failures
---
distributed/dashboard/components/rmm.py | 10 ++++++++--
distributed/diagnostics/cudf.py | 4 ++--
2 files changed, 10 insertions(+), 4 deletions(-)
diff --git a/distributed/dashboard/components/rmm.py b/distributed/dashboard/components/rmm.py
index 9f258b787c..7376476570 100644
--- a/distributed/dashboard/components/rmm.py
+++ b/distributed/dashboard/components/rmm.py
@@ -142,15 +142,21 @@ def quadlist(i: Iterable[T]) -> list[T]:
rmm_metrics = ws.metrics["rmm"]
gpu_metrics = ws.metrics["gpu"]
gpu_info = ws.extra["gpu"]
+ except KeyError:
+ rmm_metrics = {"rmm-used": 0, "rmm-total": 0}
+ gpu_metrics = {"memory-used": 0}
+ gpu_info = {"memory-total": 0}
+
+ try:
cudf_metrics = ws.metrics["cudf"]
except KeyError:
- continue
+ cudf_metrics = {"cudf-spilled": 0}
rmm_used_worker = rmm_metrics["rmm-used"] # RMM memory only
rmm_total_worker = rmm_metrics["rmm-total"]
gpu_used_worker = gpu_metrics["memory-used"] # All GPU memory
gpu_total_worker = gpu_info["memory-total"]
- spilled_worker = cudf_metrics["cudf-spilled"] # memory spilled to host
+ spilled_worker = cudf_metrics["cudf-spilled"] or 0 # memory spilled to host
max_limit = max(
max_limit, gpu_total_worker, gpu_used_worker + spilled_worker
diff --git a/distributed/diagnostics/cudf.py b/distributed/diagnostics/cudf.py
index a3404ee490..c118f7e503 100644
--- a/distributed/diagnostics/cudf.py
+++ b/distributed/diagnostics/cudf.py
@@ -12,10 +12,10 @@
def real_time():
if get_global_manager is None:
- return {"cudf-spilled": 0}
+ return {"cudf-spilled": None}
mgr = get_global_manager()
if mgr is None:
- return {"cudf-spilled": 0}
+ return {"cudf-spilled": None}
totals = mgr.statistics.spill_totals
From b60173d229b92985c1b9ee6e83f2d52c267ab994 Mon Sep 17 00:00:00 2001
From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com>
Date: Thu, 28 Sep 2023 07:47:39 -0700
Subject: [PATCH 10/15] Make cudf spilling monitoring optional and disabled by
default
---
distributed/distributed-schema.yaml | 8 +++++++-
distributed/distributed.yaml | 1 +
distributed/worker.py | 26 ++++++++++++++------------
3 files changed, 22 insertions(+), 13 deletions(-)
diff --git a/distributed/distributed-schema.yaml b/distributed/distributed-schema.yaml
index 510db30123..6985a39574 100644
--- a/distributed/distributed-schema.yaml
+++ b/distributed/distributed-schema.yaml
@@ -1001,6 +1001,12 @@ properties:
not a problem and will be automatically disabled if no GPUs are found in the
system, but in certain cases it may be desirable to completely disable NVML
diagnostics.
+ cudf:
+ type: boolean
+ description: |
+ If ``True``, enables tracking of GPU spilling and unspilling managed by cuDF (if it is enabled).
+ Note that this forces a cuDF import at worker startup, which may be undesirable for performance
+ and memory footprint.
computations:
type: object
properties:
@@ -1008,7 +1014,7 @@ properties:
type: integer
minimum: 0
description: |
- The maximum number of Computations to remember.
+ The maximum number of computations to remember.
nframes:
type: integer
minimum: 0
diff --git a/distributed/distributed.yaml b/distributed/distributed.yaml
index 594ee22532..874735a6f0 100644
--- a/distributed/distributed.yaml
+++ b/distributed/distributed.yaml
@@ -267,6 +267,7 @@ distributed:
diagnostics:
nvml: True
+ cudf: False
computations:
max-history: 100
nframes: 0
diff --git a/distributed/worker.py b/distributed/worker.py
index 5f5ebcdab0..e56de15a86 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -78,7 +78,7 @@
)
from distributed.core import rpc as RPCType
from distributed.core import send_recv
-from distributed.diagnostics import cudf, nvml, rmm
+from distributed.diagnostics import nvml, rmm
from distributed.diagnostics.plugin import WorkerPlugin, _get_plugin_name
from distributed.diskutils import WorkSpace
from distributed.exceptions import Reschedule
@@ -3221,19 +3221,21 @@ async def rmm_metric(worker):
DEFAULT_METRICS["rmm"] = rmm_metric
del _rmm
+# avoid importing cuDF unless explicitly enabled
+if dask.config.get("distributed.diagnostics.cudf"):
+ try:
+ import cudf as _cudf # noqa: F401
+ except Exception:
+ pass
+ else:
+ from distributed.diagnostics import cudf
-try:
- import cudf as _cudf
-except Exception:
- pass
-else:
-
- async def cudf_metric(worker):
- result = await offload(cudf.real_time)
- return result
+ async def cudf_metric(worker):
+ result = await offload(cudf.real_time)
+ return result
- DEFAULT_METRICS["cudf"] = cudf_metric
- del _cudf
+ DEFAULT_METRICS["cudf"] = cudf_metric
+ del _cudf
def print(
From 189013679dac92fbb9c25c0c8415106d34609bd8 Mon Sep 17 00:00:00 2001
From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com>
Date: Thu, 28 Sep 2023 13:10:07 -0700
Subject: [PATCH 11/15] Modify cuDF spilling test
---
continuous_integration/gpuci/build.sh | 4 +++-
distributed/diagnostics/tests/test_cudf_diagnostics.py | 6 +++++-
2 files changed, 8 insertions(+), 2 deletions(-)
diff --git a/continuous_integration/gpuci/build.sh b/continuous_integration/gpuci/build.sh
index 76048eddd2..8b01eeb5e4 100644
--- a/continuous_integration/gpuci/build.sh
+++ b/continuous_integration/gpuci/build.sh
@@ -26,9 +26,11 @@ export CUDA_REL=${CUDA_VERSION%.*}
# FIXME - monitoring GIL contention causes UCX teardown issues
export DASK_DISTRIBUTED__ADMIN__SYSTEM_MONITOR__GIL__ENABLED=False
-# enable cuDF spilling to host
+# enable monitoring of cuDF spilling
export CUDF_SPILL=on
export CUDF_SPILL_STATS=1
+export DASK_DISTRIBUTED__DIAGNOSTICS__CUDF=1
+
################################################################################
# SETUP - Check environment
diff --git a/distributed/diagnostics/tests/test_cudf_diagnostics.py b/distributed/diagnostics/tests/test_cudf_diagnostics.py
index 9c2bbb825a..319281a6c4 100644
--- a/distributed/diagnostics/tests/test_cudf_diagnostics.py
+++ b/distributed/diagnostics/tests/test_cudf_diagnostics.py
@@ -29,7 +29,11 @@ def force_spill():
async def test_cudf_metrics(c, s, *workers):
w = list(s.workers.values())[0]
assert "cudf" in w.metrics
- assert w.metrics["cudf"]["cudf-spilled"] == 0
+
+ if spill_initial := w.metrics["cudf"]["cudf-spilled"] is None:
+ pytest.xfail("cuDF spilling & spilling statistics must be enabled")
+
+ assert spill_initial == 0
try:
await c.run(force_spill)
From 5eafddcdfee1c3448db2125816118f0f64d38d52 Mon Sep 17 00:00:00 2001
From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com>
Date: Fri, 29 Sep 2023 10:19:18 -0700
Subject: [PATCH 12/15] Run cuDF spill tests in a separate process
---
continuous_integration/gpuci/build.sh | 4 ++++
.../tests/test_cudf_diagnostics.py | 23 +++++++++++--------
2 files changed, 17 insertions(+), 10 deletions(-)
diff --git a/continuous_integration/gpuci/build.sh b/continuous_integration/gpuci/build.sh
index 8b01eeb5e4..54e59df902 100644
--- a/continuous_integration/gpuci/build.sh
+++ b/continuous_integration/gpuci/build.sh
@@ -62,3 +62,7 @@ conda list --show-channel-urls
gpuci_logger "Python py.test for distributed"
py.test distributed -v -m gpu --runslow --junitxml="$WORKSPACE/junit-distributed.xml"
+
+# cuDF spill stats monitoring must be enabled for this test
+CUDF_SPILL=on CUDF_SPILL_STATS=1 DASK_DISTRIBUTED__DIAGNOSTICS__CUDF=1 \
+ py.test distributed/diagnostics/tests/test_cudf_diagnostics.py -v -m gpu --runslow --junitxml="$WORKSPACE/junit-distributed.xml"
diff --git a/distributed/diagnostics/tests/test_cudf_diagnostics.py b/distributed/diagnostics/tests/test_cudf_diagnostics.py
index 319281a6c4..ee9ae12f89 100644
--- a/distributed/diagnostics/tests/test_cudf_diagnostics.py
+++ b/distributed/diagnostics/tests/test_cudf_diagnostics.py
@@ -1,10 +1,20 @@
from __future__ import annotations
+import os
+
import pytest
from distributed.utils_test import gen_cluster
-pytestmark = pytest.mark.gpu
+pytestmark = [
+ pytest.mark.gpu,
+ pytest.mark.skipif(
+ os.environ.get("CUDF_SPILL", "off") != "on"
+ or os.environ.get("CUDF_SPILL_STATS", "0") != "1"
+ or os.environ.get("DASK_DISTRIBUTED__DIAGNOSTICS__CUDF", "0") != "1",
+ reason="cuDF spill stats monitoring must be enabled manually",
+ ),
+]
cudf = pytest.importorskip("cudf")
dask_cuda = pytest.importorskip("dask_cuda")
@@ -29,15 +39,8 @@ def force_spill():
async def test_cudf_metrics(c, s, *workers):
w = list(s.workers.values())[0]
assert "cudf" in w.metrics
+ assert w.metrics["cudf"]["cudf-spilled"] == 0
- if spill_initial := w.metrics["cudf"]["cudf-spilled"] is None:
- pytest.xfail("cuDF spilling & spilling statistics must be enabled")
-
- assert spill_initial == 0
-
- try:
- await c.run(force_spill)
- except AttributeError:
- pytest.xfail("cuDF spilling & spilling statistics must be enabled")
+ await c.run(force_spill)
assert w.metrics["cudf"]["cudf-spilled"] == 24
From 21e106b35e1f46697a745263d059c8c29662b092 Mon Sep 17 00:00:00 2001
From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com>
Date: Mon, 2 Oct 2023 06:38:32 -0700
Subject: [PATCH 13/15] Remove global cuDF spilling settings from build.sh
---
continuous_integration/gpuci/build.sh | 6 ------
1 file changed, 6 deletions(-)
diff --git a/continuous_integration/gpuci/build.sh b/continuous_integration/gpuci/build.sh
index 54e59df902..ee018779bb 100644
--- a/continuous_integration/gpuci/build.sh
+++ b/continuous_integration/gpuci/build.sh
@@ -26,12 +26,6 @@ export CUDA_REL=${CUDA_VERSION%.*}
# FIXME - monitoring GIL contention causes UCX teardown issues
export DASK_DISTRIBUTED__ADMIN__SYSTEM_MONITOR__GIL__ENABLED=False
-# enable monitoring of cuDF spilling
-export CUDF_SPILL=on
-export CUDF_SPILL_STATS=1
-export DASK_DISTRIBUTED__DIAGNOSTICS__CUDF=1
-
-
################################################################################
# SETUP - Check environment
################################################################################
From 3cc4b946ab749479b0ff920e0e6a23d87b2ab854 Mon Sep 17 00:00:00 2001
From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com>
Date: Mon, 2 Oct 2023 18:21:31 -0700
Subject: [PATCH 14/15] Mark flaky cuDF metrics test for reruns
---
distributed/diagnostics/tests/test_cudf_diagnostics.py | 1 +
1 file changed, 1 insertion(+)
diff --git a/distributed/diagnostics/tests/test_cudf_diagnostics.py b/distributed/diagnostics/tests/test_cudf_diagnostics.py
index ee9ae12f89..395eb901de 100644
--- a/distributed/diagnostics/tests/test_cudf_diagnostics.py
+++ b/distributed/diagnostics/tests/test_cudf_diagnostics.py
@@ -36,6 +36,7 @@ def force_spill():
nthreads=[("127.0.0.1", 1)],
Worker=dask_cuda.CUDAWorker,
)
+@pytest.mark.flaky(reruns=10, reruns_delay=5)
async def test_cudf_metrics(c, s, *workers):
w = list(s.workers.values())[0]
assert "cudf" in w.metrics
From b2fdfc67f634bf5e1063c22b2643e60baed2d04a Mon Sep 17 00:00:00 2001
From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com>
Date: Wed, 25 Oct 2023 08:30:11 -0700
Subject: [PATCH 15/15] Shouldn't need dask-cuda worker for test
---
distributed/diagnostics/tests/test_cudf_diagnostics.py | 2 --
1 file changed, 2 deletions(-)
diff --git a/distributed/diagnostics/tests/test_cudf_diagnostics.py b/distributed/diagnostics/tests/test_cudf_diagnostics.py
index 395eb901de..feb5681855 100644
--- a/distributed/diagnostics/tests/test_cudf_diagnostics.py
+++ b/distributed/diagnostics/tests/test_cudf_diagnostics.py
@@ -17,7 +17,6 @@
]
cudf = pytest.importorskip("cudf")
-dask_cuda = pytest.importorskip("dask_cuda")
def force_spill():
@@ -34,7 +33,6 @@ def force_spill():
@gen_cluster(
client=True,
nthreads=[("127.0.0.1", 1)],
- Worker=dask_cuda.CUDAWorker,
)
@pytest.mark.flaky(reruns=10, reruns_delay=5)
async def test_cudf_metrics(c, s, *workers):