Skip to content

Commit 75f3202

Browse files
committed
Initial commit for spin steps
1 parent a82d761 commit 75f3202

File tree

7 files changed

+495
-31
lines changed

7 files changed

+495
-31
lines changed

metaflow/cli.py

+44-6
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,8 @@ def config_merge_cb(ctx, param, value):
134134
"step": "metaflow.cli_components.step_cmd.step",
135135
"run": "metaflow.cli_components.run_cmds.run",
136136
"resume": "metaflow.cli_components.run_cmds.resume",
137+
"spin": "metaflow.cli_components.run_cmds.spin",
138+
"spin-internal": "metaflow.cli_components.step_cmd.spin_internal",
137139
},
138140
)
139141
def cli(ctx):
@@ -384,7 +386,6 @@ def start(
384386
# second one processed will return the actual options. The order of processing
385387
# depends on what (and in what order) the user specifies on the command line.
386388
config_options = config_file or config_value
387-
388389
if (
389390
hasattr(ctx, "saved_args")
390391
and ctx.saved_args
@@ -462,14 +463,10 @@ def start(
462463
ctx.obj.event_logger = LOGGING_SIDECARS[event_logger](
463464
flow=ctx.obj.flow, env=ctx.obj.environment
464465
)
465-
ctx.obj.event_logger.start()
466-
_system_logger.init_system_logger(ctx.obj.flow.name, ctx.obj.event_logger)
467466

468467
ctx.obj.monitor = MONITOR_SIDECARS[monitor](
469468
flow=ctx.obj.flow, env=ctx.obj.environment
470469
)
471-
ctx.obj.monitor.start()
472-
_system_monitor.init_system_monitor(ctx.obj.flow.name, ctx.obj.monitor)
473470

474471
ctx.obj.metadata = [m for m in METADATA_PROVIDERS if m.TYPE == metadata][0](
475472
ctx.obj.environment, ctx.obj.flow, ctx.obj.event_logger, ctx.obj.monitor
@@ -485,6 +482,47 @@ def start(
485482

486483
ctx.obj.config_options = config_options
487484

485+
# Override values for spin
486+
if hasattr(ctx, "saved_args") and ctx.saved_args and ctx.saved_args[0] == "spin":
487+
# For spin, we will only use the local metadata provider, datastore, environment
488+
# and null event logger and monitor
489+
ctx.obj.metadata = [m for m in METADATA_PROVIDERS if m.TYPE == "local"][0](
490+
ctx.obj.environment, ctx.obj.flow, ctx.obj.event_logger, ctx.obj.monitor
491+
)
492+
ctx.obj.event_logger = LOGGING_SIDECARS["nullSidecarLogger"](
493+
flow=ctx.obj.flow, env=ctx.obj.environment
494+
)
495+
ctx.obj.monitor = MONITOR_SIDECARS["nullSidecarMonitor"](
496+
flow=ctx.obj.flow, env=ctx.obj.environment
497+
)
498+
ctx.obj.datastore_impl = [d for d in DATASTORES if d.TYPE == "local"][0]
499+
datastore_root = ctx.obj.datastore_impl.get_datastore_root_from_config(
500+
ctx.obj.echo
501+
)
502+
ctx.obj.datastore_impl.datastore_root = datastore_root
503+
504+
FlowDataStore.default_storage_impl = ctx.obj.datastore_impl
505+
ctx.obj.flow_datastore = FlowDataStore(
506+
ctx.obj.flow.name,
507+
ctx.obj.environment,
508+
ctx.obj.metadata,
509+
ctx.obj.event_logger,
510+
ctx.obj.monitor,
511+
)
512+
echo(
513+
"Using local metadata provider, datastore, environment, and null event logger and monitor for spin."
514+
)
515+
print(f"Using metadata provider: {ctx.obj.metadata}")
516+
echo(f"Using Datastore root: {datastore_root}")
517+
echo(f"Using Flow Datastore: {ctx.obj.flow_datastore}")
518+
519+
# Start event logger and monitor
520+
ctx.obj.event_logger.start()
521+
_system_logger.init_system_logger(ctx.obj.flow.name, ctx.obj.event_logger)
522+
523+
ctx.obj.monitor.start()
524+
_system_monitor.init_system_monitor(ctx.obj.flow.name, ctx.obj.monitor)
525+
488526
decorators._init(ctx.obj.flow)
489527

490528
# It is important to initialize flow decorators early as some of the
@@ -528,7 +566,7 @@ def start(
528566
if (
529567
hasattr(ctx, "saved_args")
530568
and ctx.saved_args
531-
and ctx.saved_args[0] not in ("run", "resume")
569+
and ctx.saved_args[0] not in ("run", "resume", "spin")
532570
):
533571
# run/resume/spin are special cases (run/resume can add more decorators with --with),
534572
# so they have to take care of themselves.

metaflow/cli_components/run_cmds.py

+81-16
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,11 @@
99
from ..graph import FlowGraph
1010
from ..metaflow_current import current
1111
from ..package import MetaflowPackage
12-
from ..runtime import NativeRuntime
12+
from ..runtime import NativeRuntime, SpinRuntime
1313
from ..system import _system_logger
1414

1515
from ..tagging_util import validate_tags
16-
from ..util import get_latest_run_id, write_latest_run_id
16+
from ..util import get_latest_run_id, write_latest_run_id, get_latest_task_pathspec
1717

1818

1919
def before_run(obj, tags, decospecs):
@@ -70,6 +70,28 @@ def write_file(file_path, content):
7070
f.write(str(content))
7171

7272

73+
def common_runner_options(func):
    """Attach the Runner-related CLI options to a click command callback.

    Adds the optional ``--run-id-file`` and ``--runner-attribute-file``
    string options (both default to ``None``) and returns a thin
    pass-through wrapper around ``func``.
    """

    @wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)

    with_run_id_file = click.option(
        "--run-id-file",
        default=None,
        show_default=True,
        type=str,
        help="Write the ID of this run to the file specified.",
    )
    with_runner_attribute_file = click.option(
        "--runner-attribute-file",
        default=None,
        show_default=True,
        type=str,
        help="Write the metadata and pathspec of this run to the file specified. Used internally for Metaflow's Runner API.",
    )

    # Apply innermost-first, exactly as the stacked decorator form would:
    # --runner-attribute-file is registered before --run-id-file.
    return with_run_id_file(with_runner_attribute_file(wrapper))
93+
94+
7395
def common_run_options(func):
7496
@click.option(
7597
"--tag",
@@ -110,20 +132,6 @@ def common_run_options(func):
110132
"option multiple times to attach multiple decorators "
111133
"in steps.",
112134
)
113-
@click.option(
114-
"--run-id-file",
115-
default=None,
116-
show_default=True,
117-
type=str,
118-
help="Write the ID of this run to the file specified.",
119-
)
120-
@click.option(
121-
"--runner-attribute-file",
122-
default=None,
123-
show_default=True,
124-
type=str,
125-
help="Write the metadata and pathspec of this run to the file specified. Used internally for Metaflow's Runner API.",
126-
)
127135
@wraps(func)
128136
def wrapper(*args, **kwargs):
129137
return func(*args, **kwargs)
@@ -167,6 +175,7 @@ def wrapper(*args, **kwargs):
167175
@click.argument("step-to-rerun", required=False)
168176
@click.command(help="Resume execution of a previous run of this flow.")
169177
@common_run_options
178+
@common_runner_options
170179
@click.pass_obj
171180
def resume(
172181
obj,
@@ -285,6 +294,7 @@ def resume(
285294
@click.command(help="Run the workflow locally.")
286295
@tracing.cli_entrypoint("cli/run")
287296
@common_run_options
297+
@common_runner_options
288298
@click.option(
289299
"--namespace",
290300
"user_namespace",
@@ -360,3 +370,58 @@ def run(
360370
f,
361371
)
362372
runtime.execute()
373+
374+
375+
@click.command(help="Spins up a step locally")
@click.argument(
    "step-name",
    required=True,
    type=str,
)
@click.option(
    "--task-pathspec",
    default=None,
    show_default=True,
    # NOTE: the trailing space after "artifacts" matters; the two literals are
    # implicitly concatenated and previously rendered as "artifactscorresponding".
    help="Task ID to use when spinning up the step. The spinned up step will use the artifacts "
    "corresponding to this task ID. If not provided, an arbitrary task ID from the latest run will be used.",
)
@common_runner_options
@click.pass_obj
def spin(
    obj,
    step_name,
    task_pathspec=None,
    run_id_file=None,
    runner_attribute_file=None,
    **kwargs
):
    """Execute a single step of the flow locally through ``SpinRuntime``.

    Parameters
    ----------
    obj
        The click context object populated by the top-level ``start`` command
        (flow, graph, datastore, metadata, environment, loggers, ...).
    step_name : str
        Name of the step to spin; it is looked up as an attribute of the flow.
    task_pathspec : str, optional
        Pathspec of the task to pass to ``SpinRuntime``. When omitted it is
        resolved with ``get_latest_task_pathspec(flow name, step name)``.
    run_id_file : str, optional
        Accepted for parity with ``run``/``resume``; currently unused.
    runner_attribute_file : str, optional
        Accepted for parity with ``run``/``resume``; currently unused.
    **kwargs
        Forwarded to ``flow._set_constants`` as the parameter values.
    """
    # No tags and no extra decospecs are attached for a spin.
    before_run(obj, [], [])
    if task_pathspec is None:
        task_pathspec = get_latest_task_pathspec(obj.flow.name, step_name)

    obj.echo(
        f"Spinning up step *{step_name}* locally with task pathspec *{task_pathspec}*"
    )
    obj.flow._set_constants(obj.graph, kwargs, obj.config_options)
    step_func = getattr(obj.flow, step_name)

    spin_runtime = SpinRuntime(
        obj.flow,
        obj.graph,
        obj.flow_datastore,
        obj.metadata,
        obj.environment,
        obj.package,
        obj.logger,
        obj.entrypoint,
        obj.event_logger,
        obj.monitor,
        step_func,
        task_pathspec,
    )

    # TODO: persist the run id (write_latest_run_id / write_file with
    # run_id_file) and honor runner_attribute_file once SpinRuntime exposes
    # the values to write.

    spin_runtime.execute()

metaflow/cli_components/step_cmd.py

+75
Original file line numberDiff line numberDiff line change
@@ -174,3 +174,78 @@ def step(
174174
)
175175

176176
echo("Success", fg="green", bold=True, indent=True)
177+
178+
179+
@click.command(help="Internal command to spin a single task.", hidden=True)
@click.argument("step-name")
@click.option(
    "--run-id",
    default=None,
    required=True,
    help="Run ID for the step that's about to be spun",
)
@click.option(
    "--task-id",
    default=None,
    required=True,
    help="Task ID for the step that's about to be spun",
)
@click.option(
    "--input-paths",
    help="A comma-separated list of pathspecs specifying inputs for this step.",
)
@click.option(
    "--split-index",
    type=int,
    default=None,
    show_default=True,
    help="Index of this foreach split.",
)
@click.option(
    "--retry-count",
    default=0,
    help="How many times we have attempted to run this task.",
)
@click.option(
    "--max-user-code-retries",
    default=0,
    help="How many times we should attempt running the user code.",
)
@click.option(
    "--namespace",
    "namespace",
    default=None,
    help="Change namespace from the default (your username) to the specified tag.",
)
@click.pass_context
def spin_internal(
    ctx,
    step_name,
    run_id=None,
    task_id=None,
    input_paths=None,
    split_index=None,
    retry_count=None,
    max_user_code_retries=None,
    namespace=None,
):
    """Hidden CLI entry point that prepares a single task for spinning.

    Announces the step being spun (unless the CLI is in quiet mode) and
    builds a ``MetaflowTask`` from the objects stored on ``ctx.obj``.

    Parameters
    ----------
    ctx
        The click context; ``ctx.obj`` carries the flow, datastore, metadata
        provider, environment, echo function, event logger and monitor.
    step_name : str
        Name of the step to spin.
    run_id, task_id, input_paths, split_index, retry_count,
    max_user_code_retries, namespace
        Parsed from the command line but not consumed by this function yet.
    """
    # Honor quiet mode the same way for every message this command emits.
    if ctx.obj.is_quiet:
        echo = echo_dev_null
    else:
        echo = echo_always

    # Replaces the leftover "I am here" debug prints with the intended message.
    echo("Spinning a task, *%s*" % step_name, fg="magenta", bold=False)

    # TODO: the task is only constructed here; it is not executed yet and the
    # remaining CLI options (run_id, task_id, input_paths, ...) are unused.
    task = MetaflowTask(
        ctx.obj.flow,
        ctx.obj.flow_datastore,  # local datastore
        ctx.obj.metadata,  # local metadata provider
        ctx.obj.environment,  # local environment
        ctx.obj.echo,
        ctx.obj.event_logger,  # null logger
        ctx.obj.monitor,  # null monitor
        None,  # no unbounded foreach context
    )

metaflow/metaflow_config.py

+8
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,14 @@
4747
"DEFAULT_FROM_DEPLOYMENT_IMPL", "argo-workflows"
4848
)
4949

50+
###
# Spin configuration
###
# List of decorator names, read through from_conf under the key
# "SPIN_ALLOWED_DECORATORS" with the default ["conda", "pypi", "environment"].
# NOTE(review): presumably these are the only step decorators honored when
# spinning a step -- confirm against the SpinRuntime implementation.
SPIN_ALLOWED_DECORATORS = from_conf(
    "SPIN_ALLOWED_DECORATORS", ["conda", "pypi", "environment"]
)
56+
57+
5058
###
5159
# User configuration
5260
###

metaflow/plugins/pypi/conda_decorator.py

+1
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,7 @@ def task_pre_step(
287287
def runtime_step_cli(
288288
self, cli_args, retry_count, max_user_code_retries, ubf_context
289289
):
290+
print("Let's go - I am here")
290291
if self.disabled:
291292
return
292293
# Ensure local installation of Metaflow is visible to user code

0 commit comments

Comments
 (0)