Skip to content

Commit 44c2e8e

Browse files
committed
Various fixes and cleanups
1 parent 1326e05 commit 44c2e8e

File tree

9 files changed

+149
-60
lines changed

9 files changed

+149
-60
lines changed

README.md

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ This extension currently contains:
3333
This functionality is currently being actively tested within Netflix but has not yet
3434
been deployed in production.
3535

36+
It is likely to evolve primarily in its implementation as we do further testing. Feedback
37+
on what is working and what is not is most welcome.
38+
3639
### Improvements over the included Conda decorator
3740
This decorator improves several aspects of the included Conda decorator:
3841
- it has significant performance gains:
@@ -93,13 +96,14 @@ needs to satisfy the following requirements:
9396
- `lockfile`
9497
- (optional but recommended) `mamba>=1.1.0`
9598

96-
In addition, and only if you want to support `pypi` packages, you need to apply the
99+
In addition, and only if you want to support `pypi` packages, it is best to apply the
97100
PR `https://github.com/conda-incubator/conda-lock/pull/290` to `conda-lock`. This is
98101
the unfortunate result of a bug in how `conda-lock` handles packages that are both
99102
present in the `conda` environment and `pypi` one.
100103

101-
Due to bugs in `conda`, if your resolved environment contains `.conda` packages and you
102-
do not have `micromamba` installed, the environment creation will fail.
104+
Due to bugs in `conda` and the way we use it, if your resolved environment
105+
contains `.conda` packages and you do not have `micromamba` installed, the
106+
environment creation will fail.
103107

104108
#### Uninstallation
105109
Uninstalling this package will revert the behavior of the conda decorator to the one

metaflow_extensions/netflix_ext/cmd/environment/utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
from metaflow_extensions.netflix_ext.plugins.conda.utils import arch_id
1515

1616
_deps_parse = re.compile(r"([^<>=!~]+)(.*)")
17-
_ext_parse = re.compile(r"(\w+)\(([^)]+)\)")
17+
_ext_parse = re.compile(r"([-_\w]+)\(([^)]+)\)")
1818

1919
name_to_pkg = {"netflix-ext": "metaflow-netflixext"}
2020

metaflow_extensions/netflix_ext/plugins/conda/conda.py

Lines changed: 97 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
get_conda_root,
6767
parse_explicit_url_conda,
6868
parse_explicit_url_pip,
69+
plural_marker,
6970
)
7071

7172
from .env_descr import (
@@ -114,7 +115,9 @@ def _modified_logger(*args: Any, **kwargs: Any):
114115
self._use_conda_lock_to_resolve = (
115116
CONDA_PREFERRED_RESOLVER == "conda-lock"
116117
) # type: bool
117-
self._find_conda_binary()
118+
self._found_binaries = False # We delay resolving binaries until we need them
119+
# because in the remote case, in conda_environment.py
120+
# we create this object but don't use it.
118121

119122
# Figure out what environments we know about locally
120123
self._local_root = LocalStorage.get_datastore_root_from_config(
@@ -136,6 +139,8 @@ def _modified_logger(*args: Any, **kwargs: Any):
136139
self._storage = None
137140

138141
def binary(self, binary: str) -> Optional[str]:
142+
if not self._found_binaries:
143+
self._find_conda_binary()
139144
if self._bins:
140145
return self._bins.get(binary)
141146
return None
@@ -153,6 +158,9 @@ def resolve(
153158
# is a "lighter" conda.
154159
raise CondaException("Cannot resolve environments in a remote environment")
155160

161+
if not self._found_binaries:
162+
self._find_conda_binary()
163+
156164
have_pip_deps = any([d.category == "pip" for d in deps])
157165
if have_pip_deps and not self._use_conda_lock_to_resolve:
158166
raise CondaException(
@@ -189,6 +197,9 @@ def add_to_resolved_env(
189197
# is a "lighter" conda.
190198
raise CondaException("Cannot resolve environments in a remote environment")
191199

200+
if not self._found_binaries:
201+
self._find_conda_binary()
202+
192203
have_pip_deps = any(
193204
chain(
194205
[p.TYPE == "pip" for p in cur_env.packages],
@@ -252,6 +263,10 @@ def create_for_step(
252263
env: ResolvedEnvironment,
253264
do_symlink: bool = False,
254265
):
266+
267+
if not self._found_binaries:
268+
self._find_conda_binary()
269+
255270
try:
256271
# I am not 100% sure the lock is required but since the environments share
257272
# a common package cache, we will keep it for now
@@ -263,6 +278,10 @@ def create_for_step(
263278
def create_for_name(
264279
self, name: str, env: ResolvedEnvironment, do_symlink: bool = False
265280
):
281+
282+
if not self._found_binaries:
283+
self._find_conda_binary()
284+
266285
with CondaLock(self._env_lock_file(name)):
267286
self._create(env, name)
268287
if do_symlink:
@@ -272,6 +291,9 @@ def create_for_name(
272291

273292
def remove_for_step(self, step_name: str, env_id: EnvID):
274293
# Remove the conda environment
294+
if not self._found_binaries:
295+
self._find_conda_binary()
296+
275297
try:
276298
env_name = self._env_directory_from_envid(env_id)
277299
return self.remove_for_name(env_name)
@@ -280,11 +302,15 @@ def remove_for_step(self, step_name: str, env_id: EnvID):
280302
raise CondaStepException(e, [step_name])
281303

282304
def remove_for_name(self, name: str):
305+
if not self._found_binaries:
306+
self._find_conda_binary()
283307
with CondaLock(self._env_lock_file(name)):
284308
self._remove(name)
285309

286310
def python(self, env_desc: Union[EnvID, str]) -> Optional[str]:
287311
# Get Python interpreter for the conda environment
312+
if not self._found_binaries:
313+
self._find_conda_binary()
288314
env_path = None
289315
if isinstance(env_desc, EnvID):
290316
env_path = self.created_environment(env_desc)
@@ -305,6 +331,9 @@ def python(self, env_desc: Union[EnvID, str]) -> Optional[str]:
305331
def created_environment(
306332
self, env_desc: Union[EnvID, str]
307333
) -> Optional[Tuple[EnvID, str]]:
334+
if not self._found_binaries:
335+
self._find_conda_binary()
336+
308337
if isinstance(env_desc, EnvID):
309338
prefix = "metaflow_%s_%s" % (env_desc.req_id, env_desc.full_id)
310339
else:
@@ -320,6 +349,8 @@ def created_environments(
320349
# List all existing metaflow environments; this can include environments that
321350
# were created with the `environment` command and therefore have a different
322351
# name
352+
if not self._found_binaries:
353+
self._find_conda_binary()
323354
prefix = "metaflow_%s_" % req_id if req_id else ""
324355
return {
325356
k: v
@@ -400,6 +431,8 @@ def cache_environments(
400431
raise CondaException(
401432
"Cannot cache environments since no datastore configured"
402433
)
434+
if not self._found_binaries:
435+
self._find_conda_binary()
403436

404437
cache_paths_to_check = [] # type: List[Tuple[str, str, str]]
405438
my_arch_id = arch_id()
@@ -633,12 +666,19 @@ def _cache_pkg(pkg: PackageSpecification, pkg_fmt: str, local_path: str) -> str:
633666
if upload_files:
634667
start = time.time()
635668
self._echo(
636-
" Caching %d items to %s ..."
637-
% (len(upload_files), self._datastore_type),
669+
" Caching %d item%s to %s ..."
670+
% (
671+
len(upload_files),
672+
plural_marker(len(upload_files)),
673+
self._datastore_type,
674+
),
638675
nl=False,
639676
)
640677
self._upload_to_ds(upload_files)
641-
self._echo(" done in %d seconds." % int(time.time() - start))
678+
delta_time = int(time.time() - start)
679+
self._echo(
680+
" done in %d second%s." % (delta_time, plural_marker(delta_time))
681+
)
642682

643683
# If this is successful, we cache the environments. We do this *after*
644684
# in case some packages fail to upload so we don't write corrupt
@@ -656,12 +696,19 @@ def _cache_pkg(pkg: PackageSpecification, pkg_fmt: str, local_path: str) -> str:
656696
)
657697
start = time.time()
658698
self._echo(
659-
" Caching %d environments to %s ..."
660-
% (len(upload_files), self._datastore_type),
699+
" Caching %d environment%s to %s ..."
700+
% (
701+
len(upload_files),
702+
plural_marker(len(upload_files)),
703+
self._datastore_type,
704+
),
661705
nl=False,
662706
)
663707
self._upload_to_ds(upload_files)
664-
self._echo(" done in %d seconds." % int(time.time() - start))
708+
delta_time = int(time.time() - start)
709+
self._echo(
710+
" done in %d second%s." % (delta_time, plural_marker(delta_time))
711+
)
665712
else:
666713
self._echo(" All items already cached in %s." % self._datastore_type)
667714

@@ -691,6 +738,9 @@ def lazy_fetch_packages(
691738
# transmutation also happens if the requested format is not found (only for
692739
# conda packages))
693740

741+
if not self._found_binaries:
742+
self._find_conda_binary()
743+
694744
if require_conda_format is None:
695745
require_conda_format = []
696746
use_package_dirs = True
@@ -961,8 +1011,12 @@ def _micromamba_transmute(src_file: str, dst_file: str, dst_format: str):
9611011
if do_download:
9621012
start = time.time()
9631013
self._echo(
964-
" Downloading %d(web) + %d(cache) packages ..."
965-
% (len(web_downloads), len(cache_downloads)),
1014+
" Downloading %d(web) + %d(cache) package%s ..."
1015+
% (
1016+
len(web_downloads),
1017+
len(cache_downloads),
1018+
plural_marker(len(web_downloads) + len(cache_downloads)),
1019+
),
9661020
nl=False,
9671021
)
9681022

@@ -1041,12 +1095,18 @@ def _micromamba_transmute(src_file: str, dst_file: str, dst_format: str):
10411095
)
10421096

10431097
if do_download:
1098+
delta_time = int(time.time() - start)
10441099
self._echo(
1045-
" done in %d seconds." % int(time.time() - start), timestamp=False
1100+
" done in %d second%s." % (delta_time, plural_marker(delta_time)),
1101+
timestamp=False,
10461102
)
10471103
if not pending_errors and transmutes:
10481104
start = time.time()
1049-
self._echo(" Transmuting %d packages ..." % len(transmutes), nl=False)
1105+
self._echo(
1106+
" Transmuting %d package%s ..."
1107+
% (len(transmutes), plural_marker(len(transmutes))),
1108+
nl=False,
1109+
)
10501110
with ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
10511111
transmut_results = [
10521112
executor.submit(_transmute, entry) for entry in transmutes
@@ -1064,8 +1124,10 @@ def _micromamba_transmute(src_file: str, dst_file: str, dst_format: str):
10641124

10651125
if new_url not in known_urls:
10661126
url_adds.append(new_url)
1127+
delta_time = int(time.time() - start)
10671128
self._echo(
1068-
" done in %d seconds." % int(time.time() - start), timestamp=False
1129+
" done in %d second%s." % (delta_time, plural_marker(delta_time)),
1130+
timestamp=False,
10691131
)
10701132
if url_adds:
10711133
# Update the urls file in the packages directory so that Conda knows that the
@@ -1387,6 +1449,7 @@ def _find_conda_binary(self):
13871449
err = self._validate_conda_installation()
13881450
if err:
13891451
raise err
1452+
self._found_binaries = True
13901453

13911454
def _ensure_local_conda(self):
13921455
if CONDA_LOCAL_PATH is not None:
@@ -1458,7 +1521,11 @@ def _install_local_conda(self):
14581521
raise InvalidEnvironmentException(
14591522
msg="Could not extract environment: %s" % str(e)
14601523
)
1461-
self._echo(" done in %d seconds." % int(time.time() - start), timestamp=False)
1524+
delta_time = int(time.time() - start)
1525+
self._echo(
1526+
" done in %d second%s." % (delta_time, plural_marker(delta_time)),
1527+
timestamp=False,
1528+
)
14621529

14631530
def _ensure_remote_conda(self):
14641531
if CONDA_REMOTE_INSTALLER is not None:
@@ -1467,14 +1534,17 @@ def _ensure_remote_conda(self):
14671534
# If we don't have a REMOTE_INSTALLER, we check if we need to install one
14681535
args = [
14691536
"/bin/bash",
1470-
"-c"
1471-
"if ! type %s >/dev/null 2>&1; "
1472-
"then wget --no-check-certificate "
1473-
"https://micro.mamba.pm/install.sh -O micromamba.sh >/dev/null 2>&1; "
1474-
"./micromamba.sh >/dev/null 2>&1; echo $HOME/.local/bin/micromamba; "
1475-
"else which %s; fi" % ("micromamba", "micromamba"),
1537+
"-c",
1538+
"if ! type micromamba >/dev/null 2>&1; then "
1539+
"mkdir -p ~/.local/bin >/dev/null 2>&1; "
1540+
"curl -Ls https://micro.mamba.pm/api/micromamba/%s/latest | "
1541+
"tar -xvj -C ~/.local/bin/ --strip-components=1 bin/micromamba >/dev/null 2>&1; "
1542+
"echo $HOME/.local/bin/micromamba; "
1543+
"else which micromamba; fi" % arch_id(),
14761544
]
1477-
self._bins = {"micromamba": subprocess.check_output(args).decode("utf-8")}
1545+
self._bins = {
1546+
"conda": subprocess.check_output(args).decode("utf-8").strip()
1547+
}
14781548

14791549
def _install_remote_conda(self):
14801550
from metaflow.plugins import DATASTORES
@@ -1541,7 +1611,8 @@ def _validate_conda_installation(self) -> Optional[Exception]:
15411611
to_remove.append(k)
15421612
else:
15431613
return InvalidEnvironmentException(
1544-
"Required binary '%s' found. Install using `%s install -n base %s`"
1614+
"Required binary '%s' not found. "
1615+
"Install using `%s install -n base %s`"
15451616
% (k, self._dependency_solver, k)
15461617
)
15471618
if to_remove:
@@ -1847,7 +1918,11 @@ def _create(self, env: ResolvedEnvironment, env_name: str) -> None:
18471918
) as f:
18481919
json.dump(env.env_id, f)
18491920

1850-
self._echo(" done in %s seconds." % int(time.time() - start), timestamp=False)
1921+
delta_time = int(time.time() - start)
1922+
self._echo(
1923+
" done in %s second%s." % (delta_time, plural_marker(delta_time)),
1924+
timestamp=False,
1925+
)
18511926

18521927
def _remove(self, env_name: str):
18531928
# TODO: Verify that this is a proper metaflow environment to remove

metaflow_extensions/netflix_ext/plugins/conda/conda_environment.py

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131

3232
from metaflow.metaflow_environment import MetaflowEnvironment
3333

34-
from .utils import get_conda_manifest_path
34+
from .utils import get_conda_manifest_path, plural_marker
3535

3636
from .env_descr import CachedEnvironmentInfo, EnvID, ResolvedEnvironment
3737
from .conda import Conda
@@ -84,9 +84,20 @@ def init_environment(self, echo: Callable[..., None]):
8484
# Figure out the environments that we need to resolve for all steps
8585
# We will resolve all unique environments in parallel
8686
step_conda_dec = get_conda_decorator(self._flow, step.__name__)
87-
env_ids = step_conda_dec.env_ids
88-
for env_id in env_ids:
89-
resolved_env = self._conda.environment(env_id)
87+
my_arch_env_id = step_conda_dec.env_id
88+
requested_archs = step_conda_dec.requested_architectures
89+
# First look up the env_id (it's the default one)
90+
resolved_env = self._conda.environment(my_arch_env_id)
91+
if resolved_env is not None:
92+
# Check to make sure it supports all requested architectures
93+
if not set(requested_archs).issubset(resolved_env.co_resolved_archs):
94+
resolved_env = None
95+
for arch in requested_archs:
96+
env_id = EnvID(
97+
req_id=my_arch_env_id.req_id,
98+
full_id=my_arch_env_id.full_id,
99+
arch=arch,
100+
)
90101
if env_id not in self._requested_envs:
91102
self._requested_envs[env_id] = {
92103
"id": env_id,
@@ -177,11 +188,19 @@ def _resolve_environments(
177188
):
178189
start = time.time()
179190
if len(env_ids) == len(self._requested_envs):
180-
echo(" Resolving %d environments in flow ..." % len(env_ids), nl=False)
191+
echo(
192+
" Resolving %d environment%s in flow ..."
193+
% (len(env_ids), plural_marker(len(env_ids))),
194+
nl=False,
195+
)
181196
else:
182197
echo(
183-
" Resolving %d of %d environments in flows (others are cached) ..."
184-
% (len(env_ids), len(self._requested_envs)),
198+
" Resolving %d of %d environment%s in flow (others are cached) ..."
199+
% (
200+
len(env_ids),
201+
len(self._requested_envs),
202+
plural_marker(len(self._requested_envs)),
203+
),
185204
nl=False,
186205
)
187206

@@ -262,7 +281,7 @@ def _resolve(env_desc: Mapping[str, Any]) -> Tuple[EnvID, ResolvedEnvironment]:
262281
)
263282

264283
duration = int(time.time() - start)
265-
echo(" done in %d seconds." % duration)
284+
echo(" done in %d second%s." % (duration, plural_marker(duration)))
266285

267286
def _get_env_id(self, step_name: str) -> Optional[EnvID]:
268287
conda_decorator = get_conda_decorator(self._flow, step_name)

0 commit comments

Comments
 (0)