Skip to content

Commit a767eda

Browse files
committed
refactor: flows to pipelines
1 parent 0561845 commit a767eda

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

45 files changed

+99
-142
lines changed

datatorch/__init__.py

+3-2
Original file line number | Diff line number | Diff line change
@@ -7,9 +7,10 @@
77
__all__ = ["ApiClient", "get_inputs"]
88

99

10-
def get_inputs() -> dict:
10+
def get_inputs(key: str = None) -> dict:
1111
try:
12-
return json.loads(sys.argv[-1])
12+
values = json.loads(sys.argv[-1])
13+
return values.get(key) if key else values
1314
except Exception as e:
1415
return {}
1516

datatorch/agent/__init__.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import asyncio
22
import logging
33
import click
4-
import sys
54
import os
65

76
from logging.handlers import RotatingFileHandler
@@ -69,7 +68,7 @@ async def _exit_tasks() -> None:
6968
tasks = [
7069
task
7170
for task in asyncio.Task.all_tasks()
72-
if task is not asyncio.tasks.Task.current_task()
71+
if task is not asyncio.tasks.Task.current_task() # type: ignore
7372
]
7473

7574
for task in tasks:

datatorch/agent/agent.py

+6-6
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,13 @@
55

66
from typing import List
77
from .client import AgentApiClient
8-
from .flows.template import Variables
8+
from .pipelines.template import Variables
99
from .log_handler import AgentAPIHandler
1010
from .monitoring import AgentSystemStats
1111
from .directory import agent_directory
1212

1313
from gql.client import AsyncClientSession # type: ignore
14-
from datatorch.agent.flows import Job
14+
from datatorch.agent.pipelines import Job
1515

1616

1717
logger = logging.getLogger(__name__)
@@ -61,10 +61,10 @@ async def _run_job(self, job):
6161
job_name = job.get("name")
6262
job_steps = job.get("steps")
6363

64-
flow_run = job.get("run")
65-
flow_config = flow_run.get("config")
64+
run = job.get("run")
65+
run_config = run.get("config")
6666

67-
job_config = flow_config.get("jobs").get(job_name)
67+
job_config = run_config.get("jobs").get(job_name)
6868
job_config["id"] = job_id
6969
job_config["name"] = job_name
7070

@@ -87,7 +87,7 @@ async def _run_job(self, job):
8787
if step.get("id") == None:
8888
raise ValueError(f"No ID found for step {step.get('action')}.")
8989

90-
variables = Variables(flow_run)
90+
variables = Variables(run)
9191

9292
try:
9393
logger.info(f"Starting {job_name} {job_id}")

datatorch/agent/client.py

+6-6
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ def agent_jobs(self):
2828
text
2929
config
3030
runNumber
31-
flow {
31+
pipeline {
3232
id
3333
projectId
3434
creatorId
@@ -124,7 +124,7 @@ def update_step(self, values: dict):
124124
$startedAt: DateTime
125125
$finishedAt: DateTime
126126
) {
127-
updateFlowStep(
127+
updatePipelineStep(
128128
id: $id
129129
input: {
130130
status: $status
@@ -142,8 +142,8 @@ def update_step(self, values: dict):
142142
async def update_job(self, values: dict):
143143
# fmt: off
144144
mutate = """
145-
mutation UpdateFlowJob($id: ID!, $status: String) {
146-
updateFlowJobRun(
145+
mutation UpdateJob($id: ID!, $status: String) {
146+
updatePipelineJobRun(
147147
id: $id,
148148
input: { status: $status }
149149
)
@@ -155,8 +155,8 @@ async def update_job(self, values: dict):
155155
def upload_step_logs(self, step_id: str, logs: List[Log]):
156156
# fmt: off
157157
mutate = """
158-
mutation UploadStepLogs($id: ID!, $logs: [CreateFlowStepLog!]!) {
159-
createFlowStepLog(stepId: $id, logs: $logs)
158+
mutation UploadStepLogs($id: ID!, $logs: [CreatePipelineStepLog!]!) {
159+
createPipelineStepLog(stepId: $id, logs: $logs)
160160
}
161161
"""
162162
# fmt: on

datatorch/agent/flows/__init__.py

-4
This file was deleted.

datatorch/agent/flows/flow.py

-34
This file was deleted.

datatorch/agent/pipelines/__init__.py

+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
from .pipeline import Pipeline
2+
from .job import Job
3+
4+
__all__ = ["Pipeline", "Job"]

datatorch/agent/flows/action/action.py datatorch/agent/pipelines/action/action.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from datatorch.agent.flows.template import Variables
1+
from datatorch.agent.pipelines.template import Variables
22
import os
33
import yaml
44
import logging

datatorch/agent/flows/job/job.py datatorch/agent/pipelines/job/job.py

+1-3
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,15 @@ def __init__(self, config: dict, agent: "Agent" = None):
2828
yaml.dump(self.config, yaml_config, default_flow_style=False)
2929

3030
async def update(self, status: str) -> None:
31-
if self.agent is None:
31+
if self.agent is None or self.id is None:
3232
return None
3333
variables = {"id": self.id, "status": status}
3434
await self.agent.api.update_job(variables)
3535

3636
async def run(self, variables: Variables):
3737
""" Runs each step of the job. """
3838
steps = Step.from_dict_list(self.config.get("steps", []), job=self)
39-
inputs = {}
4039
await self.update("RUNNING")
41-
4240
variables.set_job(self)
4341

4442
for step in steps:

datatorch/agent/pipelines/pipeline.py

+41
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import os
2+
import yaml
3+
import logging
4+
5+
from typing import Union, TYPE_CHECKING
6+
from .job import Job
7+
8+
9+
if TYPE_CHECKING:
10+
from ..agent import Agent
11+
12+
13+
logger = logging.getLogger("datatorch.agent.pipeline")
14+
15+
16+
class Pipeline(object):
17+
@classmethod
18+
def from_yaml(cls, path, agent: "Agent" = None):
19+
with open(path, "r") as config_file:
20+
config = yaml.load(config_file, Loader=yaml.FullLoader)
21+
config["name"] = config.get("name", os.path.splitext(os.path.basename(path))[0])
22+
return cls.from_config(config, agent)
23+
24+
@classmethod
25+
def from_config(cls, config: Union[str, dict], agent: "Agent" = None):
26+
""" Creates a pipeline from a config file. """
27+
if isinstance(config, str):
28+
cf = yaml.load(config, Loader=yaml.FullLoader)
29+
else:
30+
cf = config
31+
return cls(cf, agent=agent)
32+
33+
def __init__(self, config: dict, agent: "Agent" = None):
34+
self.name = config.get("name")
35+
self.config = config
36+
self.agent = agent
37+
38+
async def run(self, job_config: dict):
39+
""" Runs a job. """
40+
# await Job(job_config, agent=self.agent).run()
41+
pass
File renamed without changes.
File renamed without changes.

datatorch/agent/flows/template.py datatorch/agent/pipelines/template.py

+20-15
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
global_variables = {
1414
"machine": {
15-
"name": platform.machine(),
15+
"name": platform.node(),
1616
"os": platform.system(),
1717
"version": platform.version(),
1818
},
@@ -24,28 +24,28 @@
2424

2525

2626
class Variables(object):
27-
def __init__(self, flow_run: dict):
27+
def __init__(self, run: dict):
2828
self.variables = {"variable": {}, "input": {}}
29-
flow = flow_run.get("flow")
29+
pipeline = run.get("pipeline")
3030
self.set(
31-
"flow",
31+
"pipeline",
3232
{
33-
"id": flow.get("id"),
34-
"name": flow.get("name"),
35-
"creatorId": flow.get("creatorId"),
36-
"projectId": flow.get("projectId"),
37-
"lastRunNumber": flow.get("lastRunNumber"),
33+
"id": pipeline.get("id"),
34+
"name": pipeline.get("name"),
35+
"creatorId": pipeline.get("creatorId"),
36+
"projectId": pipeline.get("projectId"),
37+
"lastRunNumber": pipeline.get("lastRunNumber"),
3838
},
3939
)
4040

4141
self.set(
4242
"run",
4343
{
44-
"id": flow_run.get("id"),
45-
"name": flow_run.get("name"),
46-
"config": flow_run.get("config"),
47-
"createdAt": flow_run.get("createdAt"),
48-
"runNumber": flow_run.get("runNumber"),
44+
"id": run.get("id"),
45+
"name": run.get("name"),
46+
"config": run.get("config"),
47+
"createdAt": run.get("createdAt"),
48+
"runNumber": run.get("runNumber"),
4949
},
5050
)
5151
print(self.variables)
@@ -78,7 +78,12 @@ def merge(self, section: str, variables: dict):
7878
self.variables[section] = {**self.variables[section], **variables}
7979

8080
def render(self, string: str):
81-
tp = Template(string)
81+
tp = Template(
82+
string,
83+
block_start_string="${%",
84+
variable_start_string="${{",
85+
comment_start_string="${#",
86+
)
8287
return tp.render({**global_variables, **self.variables})
8388

8489
@property

datatorch/agent/utils/threads.py

-16
This file was deleted.

datatorch/api/entity/project.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ def datasets(self) -> List[Dataset]:
101101
params={"projectId": self.id},
102102
)
103103

104-
def files(self, where: Where = None, limit=500, page=1):
104+
def files(self, where: Where = None, limit=500, page=1) -> List[File]:
105105
if where is None:
106106
where = Where()
107107
return self.client.query_to_class(

datatorch/api/entity/sources/image/bounding_box.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,11 @@ def xywh(cls, x, y, width, height):
2323
@property
2424
def top_left(self) -> Point2D:
2525
""" Top-left point of the box """
26-
return [self.x, self.y]
26+
return (self.x, self.y)
2727

2828
@property
2929
def bottom_right(self) -> Point2D:
30-
return [self.x + self.width, self.y + self.height]
30+
return (self.x + self.width, self.y + self.height)
3131

3232
@property
3333
def size(self) -> float:

datatorch/cli/action/run.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import json
77

88
from datatorch.agent import logger
9-
from datatorch.agent.flows.action import Action
9+
from datatorch.agent.pipelines.action import Action
1010

1111

1212
input_types = {"string": str, "boolean": bool, "number": float}

datatorch/cli/groups.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from .main.version import version
1010

1111
from .agent import agent
12-
from .flow import flow
12+
from .pipeline import pipeline
1313
from .action import action
1414

1515

@@ -24,6 +24,6 @@ def main():
2424
main.add_command(version)
2525
main.add_command(package_upgrade)
2626

27-
main.add_command(flow)
27+
main.add_command(pipeline)
2828
main.add_command(agent)
2929
main.add_command(action)

datatorch/cli/flow/__init__.py datatorch/cli/pipeline/__init__.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@
66

77

88
@click.group()
9-
def flow():
9+
def pipeline():
1010
pass
1111

1212

13-
flow.add_command(run)
14-
flow.add_command(generate)
15-
flow.add_command(upload)
13+
pipeline.add_command(run)
14+
pipeline.add_command(generate)
15+
pipeline.add_command(upload)
File renamed without changes.

datatorch/cli/flow/run.py datatorch/cli/pipeline/run.py

+4-8
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,16 @@
22
import yaml
33
import asyncio
44

5-
from datatorch.agent.flows import Flow, Job
5+
from datatorch.agent.pipelines import Pipeline, Job
66
from datatorch.agent import setup_logging
77

88

9-
class FlowRun(object):
10-
pass
11-
12-
13-
@click.command(help="Runs a flow yaml file on local machine.")
9+
@click.command(help="Runs a pipeline yaml file on local machine.")
1410
@click.argument("path", type=click.Path(exists=True))
1511
def run(path):
1612
setup_logging()
1713

18-
async def run_jobs(flow: Flow):
14+
async def run_jobs(flow: Pipeline):
1915
""" Run tasks in parallel """
2016

2117
tasks = []
@@ -26,4 +22,4 @@ async def run_jobs(flow: Flow):
2622
await asyncio.wait(tasks)
2723

2824
loop = asyncio.get_event_loop()
29-
loop.run_until_complete(run_jobs(Flow.from_yaml(path)))
25+
loop.run_until_complete(run_jobs(Pipeline.from_yaml(path)))
File renamed without changes.

0 commit comments

Comments
 (0)