9
9
import pickle
10
10
import subprocess
11
11
import sys
12
+ import tarfile
12
13
import threading
13
14
import time
14
15
import uuid
28
29
import yaml
29
30
from coiled import Cluster
30
31
from dask .distributed import Client , WorkerPlugin
32
+ from dask .distributed .diagnostics import memray
31
33
from dask .distributed .diagnostics .memory_sampler import MemorySampler
32
34
from dask .distributed .scheduler import logger as scheduler_logger
33
35
from dotenv import load_dotenv
@@ -68,6 +70,14 @@ def pytest_addoption(parser):
68
70
"--benchmark" , action = "store_true" , help = "Collect benchmarking data for tests"
69
71
)
70
72
73
+ parser .addoption (
74
+ "--memray" ,
75
+ action = "store" ,
76
+ default = "scheduler" ,
77
+ help = "Memray profiles to collect: scheduler or none" ,
78
+ choices = ("scheduler" , "none" ),
79
+ )
80
+
71
81
72
82
def pytest_sessionfinish (session , exitstatus ):
73
83
# https://github.com/pytest-dev/pytest/issues/2393
@@ -652,6 +662,16 @@ def _(**exta_options):
652
662
return _
653
663
654
664
665
@pytest.fixture(scope="session")
def s3_performance(s3):
    """Session-wide S3 prefix under which performance artifacts are stored."""
    url = f"{S3_BUCKET}/performance"
    # Create the directory if it is missing, but never remove it:
    # concurrent test runs may be reading/writing it at the same time.
    s3.mkdirs(url, exist_ok=True)
    return url
674
+
655
675
@pytest .fixture (scope = "session" )
656
676
def s3_scratch (s3 ):
657
677
# Ensure that the test-scratch directory exists,
@@ -675,6 +695,13 @@ def s3_url(s3, s3_scratch, test_name_uuid):
675
695
pass
676
696
677
697
698
@pytest.fixture(scope="function")
def s3_performance_url(s3, s3_performance, test_name_uuid):
    """Unique per-test S3 prefix for this test's performance artifacts."""
    target = f"{s3_performance}/{test_name_uuid}"
    # exist_ok=False: the per-test UUID must be fresh, so failing loudly
    # here surfaces accidental collisions between runs.
    s3.mkdirs(target, exist_ok=False)
    return target
704
+
678
705
GCS_REGION = "us-central1"
679
706
GCS_BUCKET = "gs://coiled-oss-scratch/benchmarks-bot"
680
707
@@ -843,3 +870,43 @@ def _(*args, **kwargs):
843
870
# Parametrized fixture: each consuming test runs once per multiplier value.
# NOTE(review): the multiplier's exact meaning (fraction of worker memory?)
# is defined by the consuming tests — confirm there before changing values.
@pytest.fixture(params=[0.1, 1])
def memory_multiplier(request):
    return request.param
873
@pytest.fixture
def memray_profile(
    pytestconfig,
    s3,
    s3_performance_url,
    s3_storage_options,
    test_run_benchmark,
    tmp_path,
):
    """Yield a context-manager factory used as ``with memray_profile(client): ...``.

    When benchmarking is enabled and ``--memray=scheduler``, the factory wraps
    the ``with`` body in a memray run on the scheduler, then archives the
    resulting profiles into a ``.tar.gz`` and uploads it to the per-test S3
    prefix, recording the upload URL on the benchmark record.

    When profiling is disabled (benchmarking off, or ``--memray=none``), a
    no-op factory (``contextlib.nullcontext``) is yielded so callers can use
    the fixture unconditionally.
    """
    if not test_run_benchmark:
        # Benchmarking disabled: still yield a *callable* context-manager
        # factory so `with memray_profile(client):` works unconditionally.
        # (Yielding bare None here would make that call raise TypeError.)
        yield contextlib.nullcontext
    else:
        memray_option = pytestconfig.getoption("--memray")

        if memray_option == "none":
            yield contextlib.nullcontext
        elif memray_option != "scheduler":
            # Defensive: pytest's `choices` on --memray should already
            # have rejected anything else.
            raise ValueError(f"Unhandled value for --memray: {memray_option}")
        else:

            @contextlib.contextmanager
            def _memray_profile(client):
                profiles_path = tmp_path / "profiles"
                profiles_path.mkdir()
                try:
                    with memray.memray_scheduler(directory=profiles_path):
                        yield
                finally:
                    # Even if the profiled block raised, archive whatever
                    # profiles were produced and upload them to S3.
                    archive = tmp_path / "memray.tar.gz"
                    with tarfile.open(archive, mode="w:gz") as tar:
                        for item in profiles_path.iterdir():
                            tar.add(item, arcname=item.name)
                    test_run_benchmark.memray_profiles_url = (
                        f"{s3_performance_url}/{archive.name}"
                    )
                    s3.put(archive, s3_performance_url)

            yield _memray_profile