Skip to content

Commit 32ac42c

Browse files
committed
Initial commit for spin steps
1 parent 17a4489 commit 32ac42c

File tree

7 files changed

+513
-39
lines changed

7 files changed

+513
-39
lines changed

metaflow/cli.py

+44-6
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,8 @@ def config_merge_cb(ctx, param, value):
134134
"step": "metaflow.cli_components.step_cmd.step",
135135
"run": "metaflow.cli_components.run_cmds.run",
136136
"resume": "metaflow.cli_components.run_cmds.resume",
137+
"spin": "metaflow.cli_components.run_cmds.spin",
138+
"spin-internal": "metaflow.cli_components.step_cmd.spin_internal",
137139
},
138140
)
139141
def cli(ctx):
@@ -384,7 +386,6 @@ def start(
384386
# second one processed will return the actual options. The order of processing
385387
# depends on what (and in what order) the user specifies on the command line.
386388
config_options = config_file or config_value
387-
388389
if (
389390
hasattr(ctx, "saved_args")
390391
and ctx.saved_args
@@ -462,14 +463,10 @@ def start(
462463
ctx.obj.event_logger = LOGGING_SIDECARS[event_logger](
463464
flow=ctx.obj.flow, env=ctx.obj.environment
464465
)
465-
ctx.obj.event_logger.start()
466-
_system_logger.init_system_logger(ctx.obj.flow.name, ctx.obj.event_logger)
467466

468467
ctx.obj.monitor = MONITOR_SIDECARS[monitor](
469468
flow=ctx.obj.flow, env=ctx.obj.environment
470469
)
471-
ctx.obj.monitor.start()
472-
_system_monitor.init_system_monitor(ctx.obj.flow.name, ctx.obj.monitor)
473470

474471
ctx.obj.metadata = [m for m in METADATA_PROVIDERS if m.TYPE == metadata][0](
475472
ctx.obj.environment, ctx.obj.flow, ctx.obj.event_logger, ctx.obj.monitor
@@ -485,6 +482,47 @@ def start(
485482

486483
ctx.obj.config_options = config_options
487484

485+
# Override values for spin
486+
if hasattr(ctx, "saved_args") and ctx.saved_args and ctx.saved_args[0] == "spin":
487+
# For spin, we will only use the local metadata provider, datastore, environment
488+
# and null event logger and monitor
489+
ctx.obj.metadata = [m for m in METADATA_PROVIDERS if m.TYPE == "local"][0](
490+
ctx.obj.environment, ctx.obj.flow, ctx.obj.event_logger, ctx.obj.monitor
491+
)
492+
ctx.obj.event_logger = LOGGING_SIDECARS["nullSidecarLogger"](
493+
flow=ctx.obj.flow, env=ctx.obj.environment
494+
)
495+
ctx.obj.monitor = MONITOR_SIDECARS["nullSidecarMonitor"](
496+
flow=ctx.obj.flow, env=ctx.obj.environment
497+
)
498+
ctx.obj.datastore_impl = [d for d in DATASTORES if d.TYPE == "local"][0]
499+
datastore_root = ctx.obj.datastore_impl.get_datastore_root_from_config(
500+
ctx.obj.echo
501+
)
502+
ctx.obj.datastore_impl.datastore_root = datastore_root
503+
504+
FlowDataStore.default_storage_impl = ctx.obj.datastore_impl
505+
ctx.obj.flow_datastore = FlowDataStore(
506+
ctx.obj.flow.name,
507+
ctx.obj.environment,
508+
ctx.obj.metadata,
509+
ctx.obj.event_logger,
510+
ctx.obj.monitor,
511+
)
512+
echo(
513+
"Using local metadata provider, datastore, environment, and null event logger and monitor for spin."
514+
)
515+
print(f"Using metadata provider: {ctx.obj.metadata}")
516+
echo(f"Using Datastore root: {datastore_root}")
517+
echo(f"Using Flow Datastore: {ctx.obj.flow_datastore}")
518+
519+
# Start event logger and monitor
520+
ctx.obj.event_logger.start()
521+
_system_logger.init_system_logger(ctx.obj.flow.name, ctx.obj.event_logger)
522+
523+
ctx.obj.monitor.start()
524+
_system_monitor.init_system_monitor(ctx.obj.flow.name, ctx.obj.monitor)
525+
488526
decorators._init(ctx.obj.flow)
489527

490528
# It is important to initialize flow decorators early as some of the
@@ -523,7 +561,7 @@ def start(
523561
if (
524562
hasattr(ctx, "saved_args")
525563
and ctx.saved_args
526-
and ctx.saved_args[0] not in ("run", "resume")
564+
and ctx.saved_args[0] not in ("run", "resume", "spin")
527565
):
528566
# run/resume are special cases because they can add more decorators with --with,
529567
# so they have to take care of themselves.

metaflow/cli_components/run_cmds.py

+81-16
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,11 @@
99
from ..graph import FlowGraph
1010
from ..metaflow_current import current
1111
from ..package import MetaflowPackage
12-
from ..runtime import NativeRuntime
12+
from ..runtime import NativeRuntime, SpinRuntime
1313
from ..system import _system_logger
1414

1515
from ..tagging_util import validate_tags
16-
from ..util import get_latest_run_id, write_latest_run_id
16+
from ..util import get_latest_run_id, write_latest_run_id, get_latest_task_pathspec
1717

1818

1919
def before_run(obj, tags, decospecs):
@@ -68,6 +68,28 @@ def write_file(file_path, content):
6868
f.write(str(content))
6969

7070

71+
def common_runner_options(func):
72+
@click.option(
73+
"--run-id-file",
74+
default=None,
75+
show_default=True,
76+
type=str,
77+
help="Write the ID of this run to the file specified.",
78+
)
79+
@click.option(
80+
"--runner-attribute-file",
81+
default=None,
82+
show_default=True,
83+
type=str,
84+
help="Write the metadata and pathspec of this run to the file specified. Used internally for Metaflow's Runner API.",
85+
)
86+
@wraps(func)
87+
def wrapper(*args, **kwargs):
88+
return func(*args, **kwargs)
89+
90+
return wrapper
91+
92+
7193
def common_run_options(func):
7294
@click.option(
7395
"--tag",
@@ -108,20 +130,6 @@ def common_run_options(func):
108130
"option multiple times to attach multiple decorators "
109131
"in steps.",
110132
)
111-
@click.option(
112-
"--run-id-file",
113-
default=None,
114-
show_default=True,
115-
type=str,
116-
help="Write the ID of this run to the file specified.",
117-
)
118-
@click.option(
119-
"--runner-attribute-file",
120-
default=None,
121-
show_default=True,
122-
type=str,
123-
help="Write the metadata and pathspec of this run to the file specified. Used internally for Metaflow's Runner API.",
124-
)
125133
@wraps(func)
126134
def wrapper(*args, **kwargs):
127135
return func(*args, **kwargs)
@@ -165,6 +173,7 @@ def wrapper(*args, **kwargs):
165173
@click.argument("step-to-rerun", required=False)
166174
@click.command(help="Resume execution of a previous run of this flow.")
167175
@common_run_options
176+
@common_runner_options
168177
@click.pass_obj
169178
def resume(
170179
obj,
@@ -283,6 +292,7 @@ def resume(
283292
@click.command(help="Run the workflow locally.")
284293
@tracing.cli_entrypoint("cli/run")
285294
@common_run_options
295+
@common_runner_options
286296
@click.option(
287297
"--namespace",
288298
"user_namespace",
@@ -358,3 +368,58 @@ def run(
358368
f,
359369
)
360370
runtime.execute()
371+
372+
373+
@click.command(help="Spins up a step locally")
374+
@click.argument(
375+
"step-name",
376+
required=True,
377+
type=str,
378+
)
379+
@click.option(
380+
"--task-pathspec",
381+
default=None,
382+
show_default=True,
383+
help="Task ID to use when spinning up the step. The spinned up step will use the artifacts"
384+
"corresponding to this task ID. If not provided, an arbitrary task ID from the latest run will be used.",
385+
)
386+
@common_runner_options
387+
@click.pass_obj
388+
def spin(
389+
obj,
390+
step_name,
391+
task_pathspec=None,
392+
run_id_file=None,
393+
runner_attribute_file=None,
394+
**kwargs
395+
):
396+
before_run(obj, [], [])
397+
if task_pathspec is None:
398+
task_pathspec = get_latest_task_pathspec(obj.flow.name, step_name)
399+
400+
obj.echo(
401+
f"Spinning up step *{step_name}* locally with task pathspec *{task_pathspec}*"
402+
)
403+
obj.flow._set_constants(obj.graph, kwargs, obj.config_options)
404+
step_func = getattr(obj.flow, step_name)
405+
406+
spin_runtime = SpinRuntime(
407+
obj.flow,
408+
obj.graph,
409+
obj.flow_datastore,
410+
obj.metadata,
411+
obj.environment,
412+
obj.package,
413+
obj.logger,
414+
obj.entrypoint,
415+
obj.event_logger,
416+
obj.monitor,
417+
step_func,
418+
task_pathspec,
419+
)
420+
421+
# write_latest_run_id(obj, runtime.run_id)
422+
# write_file(run_id_file, runtime.run_id)
423+
424+
spin_runtime.execute()
425+
pass

metaflow/cli_components/step_cmd.py

+75
Original file line numberDiff line numberDiff line change
@@ -187,3 +187,78 @@ def step(
187187
)
188188

189189
echo("Success", fg="green", bold=True, indent=True)
190+
191+
192+
@click.command(help="Internal command to spin a single task.", hidden=True)
193+
@click.argument("step-name")
194+
@click.option(
195+
"--run-id",
196+
default=None,
197+
required=True,
198+
help="Run ID for the step that's about to be spun",
199+
)
200+
@click.option(
201+
"--task-id",
202+
default=None,
203+
required=True,
204+
help="Task ID for the step that's about to be spun",
205+
)
206+
@click.option(
207+
"--input-paths",
208+
help="A comma-separated list of pathspecs specifying inputs for this step.",
209+
)
210+
@click.option(
211+
"--split-index",
212+
type=int,
213+
default=None,
214+
show_default=True,
215+
help="Index of this foreach split.",
216+
)
217+
@click.option(
218+
"--retry-count",
219+
default=0,
220+
help="How many times we have attempted to run this task.",
221+
)
222+
@click.option(
223+
"--max-user-code-retries",
224+
default=0,
225+
help="How many times we should attempt running the user code.",
226+
)
227+
@click.option(
228+
"--namespace",
229+
"namespace",
230+
default=None,
231+
help="Change namespace from the default (your username) to the specified tag.",
232+
)
233+
@click.pass_context
234+
def spin_internal(
235+
ctx,
236+
step_name,
237+
run_id=None,
238+
task_id=None,
239+
input_paths=None,
240+
split_index=None,
241+
retry_count=None,
242+
max_user_code_retries=None,
243+
namespace=None,
244+
):
245+
if ctx.obj.is_quiet:
246+
echo = echo_dev_null
247+
else:
248+
echo = echo_always
249+
print("I am here 1")
250+
print("I am here 2")
251+
# echo("Spinning a task, *%s*" % step_name, fg="magenta", bold=False)
252+
253+
task = MetaflowTask(
254+
ctx.obj.flow,
255+
ctx.obj.flow_datastore, # local datastore
256+
ctx.obj.metadata, # local metadata provider
257+
ctx.obj.environment, # local environment
258+
ctx.obj.echo,
259+
ctx.obj.event_logger, # null logger
260+
ctx.obj.monitor, # null monitor
261+
None, # no unbounded foreach context
262+
)
263+
# echo("Task is: ", task)
264+
# pass

metaflow/metaflow_config.py

+8
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,14 @@
4747
"DEFAULT_FROM_DEPLOYMENT_IMPL", "argo-workflows"
4848
)
4949

50+
###
51+
# Spin configuration
52+
###
53+
SPIN_ALLOWED_DECORATORS = from_conf(
54+
"SPIN_ALLOWED_DECORATORS", ["conda", "pypi", "environment"]
55+
)
56+
57+
5058
###
5159
# User configuration
5260
###

metaflow/plugins/pypi/conda_decorator.py

+1
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,7 @@ def task_pre_step(
287287
def runtime_step_cli(
288288
self, cli_args, retry_count, max_user_code_retries, ubf_context
289289
):
290+
print("Let's go - I am here")
290291
if self.disabled:
291292
return
292293
# Ensure local installation of Metaflow is visible to user code

0 commit comments

Comments
 (0)