Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow py-spy profiling for geospatial benchmarks #1572

Merged
merged 4 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""Add column for py-spy profiles url

Revision ID: aa1fc9fdc665
Revises: 1095dfdfc4ae
Create Date: 2024-10-23 16:11:24.794416

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = 'aa1fc9fdc665'
down_revision = '1095dfdfc4ae'
branch_labels = None
depends_on = None


def upgrade() -> None:
    """Apply the migration: add ``py_spy_profiles_url`` to ``test_run``.

    The new column is a nullable string, so existing rows need no backfill.
    """
    profiles_url_column = sa.Column(
        "py_spy_profiles_url",
        sa.String(),
        nullable=True,
    )
    op.add_column("test_run", profiles_url_column)


def downgrade() -> None:
    """Revert the migration: drop ``py_spy_profiles_url`` from ``test_run``."""
    table_name, column_name = "test_run", "py_spy_profiles_url"
    op.drop_column(table_name, column_name)
1 change: 1 addition & 0 deletions benchmark_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class TestRun(Base):
performance_report_url = Column(String, nullable=True) # Not yet collected
cluster_dump_url = Column(String, nullable=True)
memray_profiles_url = Column(String, nullable=True)
py_spy_profiles_url = Column(String, nullable=True)


class TPCHRun(Base):
Expand Down
157 changes: 118 additions & 39 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,14 @@ def pytest_addoption(parser):
choices=("scheduler", "none"),
)

parser.addoption(
"--py-spy",
action="store",
default="none",
help="py-spy profiles to collect: scheduler, workers, all, or none",
choices=("scheduler", "workers", "all", "none"),
)


def pytest_sessionfinish(session, exitstatus):
# https://github.com/pytest-dev/pytest/issues/2393
Expand Down Expand Up @@ -670,12 +678,12 @@ def _(**exta_options):

@pytest.fixture(scope="session")
def s3_performance(s3):
    """Return the shared ``performance`` location under ``S3_BUCKET``.

    The directory is created if it is missing, and the resulting path is
    returned as a string.
    """
    performance_url = f"{S3_BUCKET}/performance"
    # Ensure that the performance directory exists,
    # but do NOT remove it as multiple test runs could be
    # accessing it at the same time
    s3.mkdirs(performance_url, exist_ok=True)
    return performance_url


@pytest.fixture(scope="session")
Expand Down Expand Up @@ -894,32 +902,33 @@ def memray_profile(

if memray_option == "none":
yield contextlib.nullcontext
elif memray_option != "scheduler":
return

if memray_option != "scheduler":
raise ValueError(f"Unhandled value for --memray: {memray_option}")
else:

@contextlib.contextmanager
def _memray_profile(client):
profiles_path = tmp_path / "profiles"
profiles_path.mkdir()
try:
with memray.memray_scheduler(directory=profiles_path):
yield
finally:
archive = tmp_path / "memray.tar.gz"
with tarfile.open(archive, mode="w:gz") as tar:
for item in profiles_path.iterdir():
tar.add(item, arcname=item.name)
test_run_benchmark.memray_profiles_url = (
f"{s3_performance_url}/{archive.name}"
)
s3.put(archive, s3_performance_url)

yield _memray_profile
@contextlib.contextmanager
def _memray_profile(client):
local_directory = tmp_path / "profiles" / "memray"
local_directory.mkdir(parents=True)
try:
with memray.memray_scheduler(directory=local_directory):
yield
finally:
archive_name = "memray.tar.gz"
archive = tmp_path / archive_name
with tarfile.open(archive, mode="w:gz") as tar:
for item in local_directory.iterdir():
tar.add(item, arcname=item.name)
destination = f"{s3_performance_url}/{archive_name}"
test_run_benchmark.memray_profiles_url = destination
s3.put_file(archive, destination)

yield _memray_profile


@pytest.fixture
def performance_report(
def py_spy_profile(
pytestconfig,
s3,
s3_performance_url,
Expand All @@ -929,20 +938,90 @@ def performance_report(
):
if not test_run_benchmark:
yield contextlib.nullcontext
return

py_spy_option = pytestconfig.getoption("--py-spy")
if py_spy_option == "none":
yield contextlib.nullcontext
return

profile_scheduler = False
profile_workers = False

if py_spy_option == "scheduler":
profile_scheduler = True
elif py_spy_option == "workers":
profile_workers = True
elif py_spy_option == "all":
profile_scheduler = True
profile_workers = True
else:
if not pytestconfig.getoption("--performance-report"):
yield contextlib.nullcontext
else:
raise ValueError(f"Unhandled value for --py-spy: {py_spy_option}")

try:
from dask_pyspy import pyspy, pyspy_on_scheduler
except ModuleNotFoundError as e:
raise ModuleNotFoundError(
"py-spy profiling benchmarks requires dask-pyspy to be installed."
) from e

@contextlib.contextmanager
def _py_spy_profile(client):
local_directory = tmp_path / "profiles" / "py-spy"
local_directory.mkdir(parents=True)

worker_ctx = contextlib.nullcontext()
if profile_workers:
worker_ctx = pyspy(local_directory, client=client)

scheduler_ctx = contextlib.nullcontext()
if profile_scheduler:
scheduler_ctx = pyspy_on_scheduler(
local_directory / "scheduler.json", client=client
)

try:
with worker_ctx, scheduler_ctx:
yield
finally:
archive_name = "py-spy.tar.gz"
archive = tmp_path / archive_name
with tarfile.open(archive, mode="w:gz") as tar:
for item in local_directory.iterdir():
tar.add(item, arcname=item.name)
destination = f"{s3_performance_url}/{archive_name}"
test_run_benchmark.py_spy_profiles_url = destination
s3.put_file(archive, destination)

yield _py_spy_profile


@pytest.fixture
def performance_report(
    pytestconfig,
    s3,
    s3_performance_url,
    s3_storage_options,
    test_run_benchmark,
    tmp_path,
):
    """Yield a context-manager factory that records a performance report.

    Yields ``contextlib.nullcontext`` (a no-op factory) when there is no
    ``test_run_benchmark`` or when ``--performance-report`` is not set.
    Otherwise yields a factory whose context wraps
    ``distributed.performance_report`` writing to
    ``{s3_performance_url}/performance_report.html.gz`` and, on exit, stores
    that location on ``test_run_benchmark.performance_report_url``.
    """
    if not test_run_benchmark:
        yield contextlib.nullcontext
        return

    if not pytestconfig.getoption("--performance-report"):
        yield contextlib.nullcontext
        return

    @contextlib.contextmanager
    def _performance_report():
        # Bind the destination before entering ``try`` so the ``finally``
        # clause can never reference an unbound name.
        filename = f"{s3_performance_url}/performance_report.html.gz"
        try:
            with distributed.performance_report(
                filename=filename, storage_options=s3_storage_options
            ):
                yield
        finally:
            # Record the URL even if the profiled body raised.
            test_run_benchmark.performance_report_url = filename

    yield _performance_report
11 changes: 8 additions & 3 deletions tests/geospatial/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,12 @@ def cluster_name(request, scale):

@pytest.fixture()
def client_factory(
cluster_name, github_cluster_tags, benchmark_all, memray_profile, performance_report
cluster_name,
github_cluster_tags,
benchmark_all,
py_spy_profile,
memray_profile,
performance_report,
):
import contextlib

Expand All @@ -45,9 +50,9 @@ def _(n_workers, env=None, **cluster_kwargs):
with cluster.get_client() as client:
# FIXME https://github.com/coiled/platform/issues/103
client.wait_for_workers(n_workers)
with performance_report(), memray_profile(client), benchmark_all(
with performance_report(), py_spy_profile(client), memray_profile(
client
):
), benchmark_all(client):
yield client

return _
Loading