From 54e68e0dbdb7d5366f43848fe35b9ea69ee3a1b3 Mon Sep 17 00:00:00 2001
From: Kevin Su
Date: Wed, 11 Oct 2023 16:59:10 -0700
Subject: [PATCH] Airflow agent (#1725)

---------

Signed-off-by: Kevin Su
Co-authored-by: Yee Hing Tong
---
 .github/workflows/pythonbuild.yml | 1 +
 Dockerfile | 4 +-
 Dockerfile.agent | 5 +-
 flytekit/extend/backend/base_agent.py | 30 +-
 flytekit/types/pickle/pickle.py | 50 +-
 plugins/flytekit-airflow/README.md | 33 +
 .../flytekitplugins/airflow/__init__.py | 16 +
 .../flytekitplugins/airflow/agent.py | 109 ++
 .../flytekitplugins/airflow/task.py | 75 ++
 plugins/flytekit-airflow/requirements.in | 1 +
 plugins/flytekit-airflow/requirements.txt | 1057 +++++++++++++++++
 plugins/flytekit-airflow/setup.py | 43 +
 plugins/flytekit-airflow/tests/__init__.py | 0
 plugins/flytekit-airflow/tests/test_agent.py | 25 +
 14 files changed, 1410 insertions(+), 39 deletions(-)
 create mode 100644 plugins/flytekit-airflow/README.md
 create mode 100644 plugins/flytekit-airflow/flytekitplugins/airflow/__init__.py
 create mode 100644 plugins/flytekit-airflow/flytekitplugins/airflow/agent.py
 create mode 100644 plugins/flytekit-airflow/flytekitplugins/airflow/task.py
 create mode 100644 plugins/flytekit-airflow/requirements.in
 create mode 100644 plugins/flytekit-airflow/requirements.txt
 create mode 100644 plugins/flytekit-airflow/setup.py
 create mode 100644 plugins/flytekit-airflow/tests/__init__.py
 create mode 100644 plugins/flytekit-airflow/tests/test_agent.py

diff --git a/.github/workflows/pythonbuild.yml b/.github/workflows/pythonbuild.yml
index 3ea48ee232..4faeebef90 100644
--- a/.github/workflows/pythonbuild.yml
+++ b/.github/workflows/pythonbuild.yml
@@ -94,6 +94,7 @@ jobs:
         python-version: ["3.8", "3.11"]
         plugin-names:
           # Please maintain an alphabetical order in the following list
+          - flytekit-airflow
          - flytekit-aws-athena
          - flytekit-aws-batch
          - flytekit-aws-sagemaker
diff --git a/Dockerfile b/Dockerfile
index d9662e2679..8aa4cb2ae1 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -2,7 +2,7 @@ ARG PYTHON_VERSION
 FROM python:${PYTHON_VERSION}-slim-buster

 MAINTAINER Flyte Team
-LABEL org.opencontainers.image.source https://github.com/flyteorg/flytekit
+LABEL org.opencontainers.image.source=https://github.com/flyteorg/flytekit

 WORKDIR /root
 ENV PYTHONPATH /root
@@ -20,6 +20,8 @@ RUN pip install -U flytekit==$VERSION \

 RUN useradd -u 1000 flytekit
 RUN chown flytekit: /root
+# Some packages will create config files under /home by default, so we need to make sure it's writable
+RUN chown flytekit: /home
 USER flytekit

 ENV FLYTE_INTERNAL_IMAGE "$DOCKER_IMAGE"
diff --git a/Dockerfile.agent b/Dockerfile.agent
index 79dfb5b9d0..65781d3f68 100644
--- a/Dockerfile.agent
+++ b/Dockerfile.agent
@@ -5,6 +5,9 @@ LABEL org.opencontainers.image.source=https://github.com/flyteorg/flytekit

 ARG VERSION
 RUN pip install prometheus-client
-RUN pip install -U flytekit==$VERSION flytekitplugins-bigquery==$VERSION
+
+# Airflow plugin's dependencies
+RUN pip install apache-airflow
+RUN pip install -U flytekit==$VERSION flytekitplugins-bigquery==$VERSION flytekitplugins-airflow==$VERSION

 CMD pyflyte serve --port 8000
diff --git a/flytekit/extend/backend/base_agent.py b/flytekit/extend/backend/base_agent.py
index 84838559ec..e3558058a1 100644
--- a/flytekit/extend/backend/base_agent.py
+++ b/flytekit/extend/backend/base_agent.py
@@ -20,7 +20,6 @@
     State,
 )
 from flyteidl.core.tasks_pb2 import TaskTemplate
-from rich.progress import Progress

 import flytekit
 from flytekit import FlyteContext, logger
@@ -28,6 +27,7 @@
 from flytekit.core.base_task import PythonTask
 from flytekit.core.type_engine import TypeEngine
 from flytekit.exceptions.system import FlyteAgentNotFound
+from flytekit.exceptions.user import FlyteUserException
 from flytekit.models.literals import LiteralMap
@@ -176,7 +176,7 @@ def execute(self, **kwargs) -> typing.Any:
         res = asyncio.run(self._get(resource_meta=res.resource_meta))

         if res.resource.state != SUCCEEDED:
-            raise Exception(f"Failed to run the task {self._entity.name}")
+            raise FlyteUserException(f"Failed to run the task {self._entity.name}")

         return LiteralMap.from_flyte_idl(res.resource.outputs)

@@ -205,21 +205,17 @@ async def _get(self, resource_meta: bytes) -> GetTaskResponse:
         state = RUNNING
         grpc_ctx = _get_grpc_context()

-        progress = Progress(transient=True)
-        task = progress.add_task(f"[cyan]Running Task {self._entity.name}...", total=None)
-        with progress:
-            while not is_terminal_state(state):
-                progress.start_task(task)
-                time.sleep(1)
-                if self._agent.asynchronous:
-                    res = await self._agent.async_get(grpc_ctx, resource_meta)
-                    if self._is_canceled:
-                        await self._is_canceled
-                        sys.exit(1)
-                else:
-                    res = self._agent.get(grpc_ctx, resource_meta)
-                state = res.resource.state
-                logger.info(f"Task state: {state}")
+        while not is_terminal_state(state):
+            time.sleep(1)
+            if self._agent.asynchronous:
+                res = await self._agent.async_get(grpc_ctx, resource_meta)
+                if self._is_canceled:
+                    await self._is_canceled
+                    sys.exit(1)
+            else:
+                res = self._agent.get(grpc_ctx, resource_meta)
+            state = res.resource.state
+            logger.info(f"Task state: {state}")
         return res

     def signal_handler(self, resource_meta: bytes, signum: int, frame: FrameType) -> typing.Any:
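With the rich progress bar removed, `_get` is now a plain one-second poll loop that `execute()` relies on. A rough standalone sketch of that contract (the names here are illustrative stand-ins, not the flytekit API):

```python
import time

# Hypothetical stand-ins: any agent object whose .get(grpc_ctx, resource_meta)
# returns a response with a resource.state field fits this loop.
TERMINAL_STATES = {"SUCCEEDED", "PERMANENT_FAILURE", "RETRYABLE_FAILURE"}


def poll_until_terminal(agent, grpc_ctx, resource_meta):
    """Mirror of the simplified _get() loop: sleep, poll, repeat until terminal."""
    state = "RUNNING"
    res = None
    while state not in TERMINAL_STATES:
        time.sleep(1)  # same one-second cadence as _get()
        res = agent.get(grpc_ctx, resource_meta)
        state = res.resource.state
    return res
```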
diff --git a/flytekit/types/pickle/pickle.py b/flytekit/types/pickle/pickle.py
index b3ce2dff44..214e21924c 100644
--- a/flytekit/types/pickle/pickle.py
+++ b/flytekit/types/pickle/pickle.py
@@ -4,7 +4,7 @@

 import cloudpickle

-from flytekit.core.context_manager import FlyteContext
+from flytekit.core.context_manager import FlyteContext, FlyteContextManager
 from flytekit.core.type_engine import TypeEngine, TypeTransformer
 from flytekit.models.core import types as _core_types
 from flytekit.models.literals import Blob, BlobMetadata, Literal, Scalar
@@ -50,6 +50,33 @@ def python_type(cls) -> typing.Type:

         return _SpecificFormatClass

+    @classmethod
+    def to_pickle(cls, python_val: typing.Any) -> str:
+        ctx = FlyteContextManager.current_context()
+        local_dir = ctx.file_access.get_random_local_directory()
+        os.makedirs(local_dir, exist_ok=True)
+        local_path = ctx.file_access.get_random_local_path()
+        uri = os.path.join(local_dir, local_path)
+        with open(uri, "w+b") as outfile:
+            cloudpickle.dump(python_val, outfile)
+
+        remote_path = ctx.file_access.get_random_remote_path(uri)
+        ctx.file_access.put_data(uri, remote_path, is_multipart=False)
+        return remote_path
+
+    @classmethod
+    def from_pickle(cls, uri: str) -> typing.Any:
+        ctx = FlyteContextManager.current_context()
+        # Deserialize the pickle and return the data it contains, downloading the
+        # pickle file to the local filesystem first if it is not already local.
+        if ctx.file_access.is_remote(uri):
+            local_path = ctx.file_access.get_random_local_path()
+            ctx.file_access.get_data(uri, local_path, False)
+            uri = local_path
+        with open(uri, "rb") as infile:
+            data = cloudpickle.load(infile)
+        return data
+

 class FlytePickleTransformer(TypeTransformer[FlytePickle]):
     PYTHON_PICKLE_FORMAT = "PythonPickle"
@@ -63,15 +90,7 @@ def assert_type(self, t: Type[T], v: T):

     def to_python_value(self, ctx: FlyteContext, lv: Literal, expected_python_type: Type[T]) -> T:
         uri = lv.scalar.blob.uri
-        # Deserialize the pickle, and return data in the pickle,
-        # and download pickle file to local first if file is not in the local file systems.
-        if ctx.file_access.is_remote(uri):
-            local_path = ctx.file_access.get_random_local_path()
-            ctx.file_access.get_data(uri, local_path, False)
-            uri = local_path
-        with open(uri, "rb") as infile:
-            data = cloudpickle.load(infile)
-        return data
+        return FlytePickle.from_pickle(uri)

     def to_literal(self, ctx: FlyteContext, python_val: T, python_type: Type[T], expected: LiteralType) -> Literal:
         if python_val is None:
@@ -81,16 +100,7 @@ def to_literal(self, ctx: FlyteContext, python_val: T, python_type: Type[T], exp
                 format=self.PYTHON_PICKLE_FORMAT, dimensionality=_core_types.BlobType.BlobDimensionality.SINGLE
             )
         )
-        # Dump the task output into pickle
-        local_dir = ctx.file_access.get_random_local_directory()
-        os.makedirs(local_dir, exist_ok=True)
-        local_path = ctx.file_access.get_random_local_path()
-        uri = os.path.join(local_dir, local_path)
-        with open(uri, "w+b") as outfile:
-            cloudpickle.dump(python_val, outfile)
-
-        remote_path = ctx.file_access.get_random_remote_path(uri)
-        ctx.file_access.put_data(uri, remote_path, is_multipart=False)
+        remote_path = FlytePickle.to_pickle(python_val)
         return Literal(scalar=Scalar(blob=Blob(metadata=meta, uri=remote_path)))

     def guess_python_type(self, literal_type: LiteralType) -> typing.Type[FlytePickle[typing.Any]]:
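This refactor promotes the dump/load logic into two reusable classmethods. A minimal round-trip sketch, assuming a Flyte context with working file access (inside a task, or a local execution where the "remote" path resolves to the local sandbox):

```python
from flytekit.types.pickle.pickle import FlytePickle

# to_pickle() dumps the object with cloudpickle, uploads the blob, and returns
# its URI; from_pickle() fetches the blob (if remote) and deserializes it.
payload = {"weights": [0.1, 0.2, 0.3]}
uri = FlytePickle.to_pickle(payload)
restored = FlytePickle.from_pickle(uri)
assert restored == payload
```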
diff --git a/plugins/flytekit-airflow/README.md b/plugins/flytekit-airflow/README.md
new file mode 100644
index 0000000000..52bc8790e0
--- /dev/null
+++ b/plugins/flytekit-airflow/README.md
@@ -0,0 +1,33 @@
+# Flytekit Airflow Plugin
+The Airflow plugin allows you to seamlessly run Airflow tasks in a Flyte workflow without changing any code.
+
+- Compile Airflow tasks to Flyte tasks
+- Use Airflow sensors/operators in Flyte workflows
+- Run Airflow tasks locally without running an Airflow cluster
+
+## Example
+```python
+from airflow.sensors.filesystem import FileSensor
+from flytekit import task, workflow
+
+@task()
+def t1():
+    print("flyte")
+
+
+@workflow
+def wf():
+    sensor = FileSensor(task_id="id", filepath="/tmp/1234")
+    sensor >> t1()
+
+
+if __name__ == '__main__':
+    wf()
+```
+
+
+To install the plugin, run the following command:
+
+```bash
+pip install flytekitplugins-airflow
+```
diff --git a/plugins/flytekit-airflow/flytekitplugins/airflow/__init__.py b/plugins/flytekit-airflow/flytekitplugins/airflow/__init__.py
new file mode 100644
index 0000000000..1015066db1
--- /dev/null
+++ b/plugins/flytekit-airflow/flytekitplugins/airflow/__init__.py
@@ -0,0 +1,16 @@
+"""
+.. currentmodule:: flytekitplugins.airflow
+
+This package contains the Airflow plugin for Flytekit, which lets Airflow tasks run inside Flyte workflows.
+
+.. autosummary::
+   :template: custom.rst
+   :toctree: generated/
+
+   AirflowConfig
+   AirflowTask
+   AirflowAgent
+"""
+
+from .agent import AirflowAgent
+from .task import AirflowConfig, AirflowTask
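Note that `agent.py` (next diff) registers the agent at import time, so exposing `AirflowAgent` from the package root means a serving process only has to import the plugin:

```python
# Importing the package is sufficient to make the "airflow" task type servable:
# agent.py runs AgentRegistry.register(AirflowAgent()) at import time, which is
# what the `pyflyte serve` image built by Dockerfile.agent relies on.
import flytekitplugins.airflow  # noqa: F401
```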
diff --git a/plugins/flytekit-airflow/flytekitplugins/airflow/agent.py b/plugins/flytekit-airflow/flytekitplugins/airflow/agent.py
new file mode 100644
index 0000000000..acd7f9d245
--- /dev/null
+++ b/plugins/flytekit-airflow/flytekitplugins/airflow/agent.py
@@ -0,0 +1,109 @@
+import importlib
+from dataclasses import dataclass
+from typing import Optional
+
+import cloudpickle
+import grpc
+import jsonpickle
+from airflow.providers.google.cloud.operators.dataproc import (
+    DataprocDeleteClusterOperator,
+    DataprocJobBaseOperator,
+    JobStatus,
+)
+from airflow.sensors.base import BaseSensorOperator
+from airflow.utils.context import Context
+from flyteidl.admin.agent_pb2 import (
+    PERMANENT_FAILURE,
+    RUNNING,
+    SUCCEEDED,
+    CreateTaskResponse,
+    DeleteTaskResponse,
+    GetTaskResponse,
+    Resource,
+)
+from flytekitplugins.airflow.task import AirflowConfig
+from google.cloud.exceptions import NotFound
+
+from flytekit import FlyteContext, FlyteContextManager, logger
+from flytekit.extend.backend.base_agent import AgentBase, AgentRegistry
+from flytekit.models.literals import LiteralMap
+from flytekit.models.task import TaskTemplate
+
+
+@dataclass
+class ResourceMetadata:
+    job_id: str
+    airflow_config: AirflowConfig
+
+
+def _get_airflow_task(ctx: FlyteContext, airflow_config: AirflowConfig):
+    task_module = importlib.import_module(name=airflow_config.task_module)
+    task_def = getattr(task_module, airflow_config.task_name)
+    task_config = airflow_config.task_config
+
+    # Set the GET_ORIGINAL_TASK attribute to True so that task_def will return the original
+    # Airflow task instead of the Flyte task.
+    ctx.user_space_params.builder().add_attr("GET_ORIGINAL_TASK", True).build()
+    if issubclass(task_def, DataprocJobBaseOperator):
+        return task_def(**task_config, asynchronous=True)
+    return task_def(**task_config)
+
+
+class AirflowAgent(AgentBase):
+    def __init__(self):
+        super().__init__(task_type="airflow", asynchronous=False)
+
+    def create(
+        self,
+        context: grpc.ServicerContext,
+        output_prefix: str,
+        task_template: TaskTemplate,
+        inputs: Optional[LiteralMap] = None,
+    ) -> CreateTaskResponse:
+        airflow_config = jsonpickle.decode(task_template.custom.get("task_config_pkl"))
+        resource_meta = ResourceMetadata(job_id="", airflow_config=airflow_config)
+
+        ctx = FlyteContextManager.current_context()
+        airflow_task = _get_airflow_task(ctx, airflow_config)
+        if isinstance(airflow_task, DataprocJobBaseOperator):
+            airflow_task.execute(context=Context())
+            resource_meta.job_id = ctx.user_space_params.xcom_data["value"]["resource"]
+
+        return CreateTaskResponse(resource_meta=cloudpickle.dumps(resource_meta))
+
+    def get(self, context: grpc.ServicerContext, resource_meta: bytes) -> GetTaskResponse:
+        meta = cloudpickle.loads(resource_meta)
+        airflow_config = meta.airflow_config
+        job_id = meta.job_id
+        task = _get_airflow_task(FlyteContextManager.current_context(), meta.airflow_config)
+        cur_state = RUNNING
+
+        if issubclass(type(task), BaseSensorOperator):
+            if task.poke(context=Context()):
+                cur_state = SUCCEEDED
+        elif issubclass(type(task), DataprocJobBaseOperator):
+            job = task.hook.get_job(
+                job_id=job_id,
+                region=airflow_config.task_config["region"],
+                project_id=airflow_config.task_config["project_id"],
+            )
+            if job.status.state == JobStatus.State.DONE:
+                cur_state = SUCCEEDED
+            elif job.status.state in (JobStatus.State.ERROR, JobStatus.State.CANCELLED):
+                cur_state = PERMANENT_FAILURE
+        elif isinstance(task, DataprocDeleteClusterOperator):
+            try:
+                task.execute(context=Context())
+            except NotFound:
+                logger.info("Cluster already deleted.")
+            cur_state = SUCCEEDED
+        else:
+            task.execute(context=Context())
+            cur_state = SUCCEEDED
+        return GetTaskResponse(resource=Resource(state=cur_state, outputs=None))
+
+    def delete(self, context: grpc.ServicerContext, resource_meta: bytes) -> DeleteTaskResponse:
+        return DeleteTaskResponse()
+
+
+AgentRegistry.register(AirflowAgent())
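To see the state mapping in action, the agent can be driven directly. A hedged sketch for the sensor branch: the gRPC context argument is unused by this implementation, so `None` stands in here, and the metadata mirrors what `create()` would emit for a `BashSensor`:

```python
import cloudpickle

from flytekitplugins.airflow.agent import AirflowAgent, ResourceMetadata
from flytekitplugins.airflow.task import AirflowConfig

# Same shape of metadata that create() serializes for a sensor task.
config = AirflowConfig(
    task_module="airflow.sensors.bash",
    task_name="BashSensor",
    task_config={"task_id": "check", "bash_command": "exit 0"},
)
meta = cloudpickle.dumps(ResourceMetadata(job_id="", airflow_config=config))

agent = AirflowAgent()
# BashSensor subclasses BaseSensorOperator, so get() calls poke(); "exit 0"
# succeeds immediately, so the returned state should be SUCCEEDED.
res = agent.get(context=None, resource_meta=meta)
print(res.resource.state)
```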
diff --git a/plugins/flytekit-airflow/flytekitplugins/airflow/task.py b/plugins/flytekit-airflow/flytekitplugins/airflow/task.py
new file mode 100644
index 0000000000..93df95b3bf
--- /dev/null
+++ b/plugins/flytekit-airflow/flytekitplugins/airflow/task.py
@@ -0,0 +1,75 @@
+import typing
+from dataclasses import dataclass
+from typing import Any, Dict, Optional, Type
+
+import jsonpickle
+from airflow import DAG
+from airflow.models import BaseOperator
+from airflow.sensors.base import BaseSensorOperator
+
+from flytekit import FlyteContextManager
+from flytekit.configuration import SerializationSettings
+from flytekit.core.base_task import PythonTask
+from flytekit.core.interface import Interface
+from flytekit.extend.backend.base_agent import AsyncAgentExecutorMixin
+
+
+@dataclass
+class AirflowConfig(object):
+    task_module: str
+    task_name: str
+    task_config: typing.Dict[str, Any]
+
+
+class AirflowTask(AsyncAgentExecutorMixin, PythonTask[AirflowConfig]):
+    _TASK_TYPE = "airflow"
+
+    def __init__(
+        self,
+        name: str,
+        query_template: str,
+        task_config: Optional[AirflowConfig],
+        inputs: Optional[Dict[str, Type]] = None,
+        **kwargs,
+    ):
+        super().__init__(
+            name=name,
+            task_config=task_config,
+            query_template=query_template,
+            interface=Interface(inputs=inputs or {}),
+            task_type=self._TASK_TYPE,
+            **kwargs,
+        )
+
+    def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]:
+        return {"task_config_pkl": jsonpickle.encode(self.task_config)}
+
+
+def _flyte_operator(*args, **kwargs):
+    """
+    This function is called whenever an Airflow operator is instantiated (it replaces BaseOperator.__new__).
+    We intercept the call and return a Flyte task instead of the Airflow operator.
+    """
+    cls = args[0]
+    if FlyteContextManager.current_context().user_space_params.get_original_task:
+        # Return the original Airflow task when running in the agent.
+        return object.__new__(cls)
+    config = AirflowConfig(task_module=cls.__module__, task_name=cls.__name__, task_config=kwargs)
+    t = AirflowTask(name=kwargs["task_id"], query_template="", task_config=config, original_new=cls.__new__)
+    return t()
+
+
+def _flyte_xcom_push(*args, **kwargs):
+    """
+    This function is called when an Airflow operator pushes data to XCom. We intercept the call and store
+    the data in the Flyte context instead.
+    """
+    FlyteContextManager.current_context().user_space_params.xcom_data = kwargs
+
+
+params = FlyteContextManager.current_context().user_space_params
+params.builder().add_attr("GET_ORIGINAL_TASK", False).add_attr("XCOM_DATA", {}).build()
+
+BaseOperator.__new__ = _flyte_operator
+BaseOperator.xcom_push = _flyte_xcom_push
+BaseSensorOperator.dag = DAG(dag_id="flyte_dag")
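The round trip between task and agent depends on `get_custom()` and the agent's `create()` agreeing on the jsonpickle encoding. A small sketch (passing `settings=None` because this `get_custom` never reads it; importing the task module also applies the `BaseOperator` patches as a side effect):

```python
import jsonpickle

from flytekitplugins.airflow.task import AirflowConfig, AirflowTask

config = AirflowConfig(
    task_module="airflow.sensors.bash",
    task_name="BashSensor",
    task_config={"task_id": "check", "bash_command": "exit 0"},
)
task = AirflowTask(name="check", query_template="", task_config=config)

# This is the payload the agent later decodes in create() via jsonpickle.decode().
custom = task.get_custom(settings=None)
assert jsonpickle.decode(custom["task_config_pkl"]) == config
```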
diff --git a/plugins/flytekit-airflow/requirements.in b/plugins/flytekit-airflow/requirements.in
new file mode 100644
index 0000000000..29f03e5f4e
--- /dev/null
+++ b/plugins/flytekit-airflow/requirements.in
@@ -0,0 +1 @@
+-e file:.#egg=flytekitplugins-airflow
diff --git a/plugins/flytekit-airflow/requirements.txt b/plugins/flytekit-airflow/requirements.txt
new file mode 100644
index 0000000000..f12aad9814
--- /dev/null
+++ b/plugins/flytekit-airflow/requirements.txt
@@ -0,0 +1,1057 @@
+#
+# This file is autogenerated by pip-compile with Python 3.10
+# by the following command:
+#
+#    pip-compile requirements.in
+#
+-e file:.#egg=flytekitplugins-airflow
+    # via -r requirements.in
+adlfs==2023.9.0
+    # via flytekit
+aiobotocore==2.5.4
+    # via s3fs
+aiofiles==23.2.1
+    # via gcloud-aio-storage
+aiohttp==3.8.5
+    # via
+    #   adlfs
+    #   aiobotocore
+    #   apache-airflow-providers-http
+    #   gcloud-aio-auth
+    #   gcsfs
+    #   s3fs
+aioitertools==0.11.0
+    # via aiobotocore
+aiosignal==1.3.1
+    # via aiohttp
+alembic==1.12.0
+    # via
+    #   apache-airflow
+    #   sqlalchemy-spanner
+annotated-types==0.5.0
+    # via pydantic
+anyio==4.0.0
+    # via httpcore
+apache-airflow==2.7.1
+    # via
+    #   apache-airflow-providers-common-sql
+    #   apache-airflow-providers-ftp
+    #   apache-airflow-providers-google
+    #   apache-airflow-providers-http
+    #   apache-airflow-providers-imap
+    #   apache-airflow-providers-sqlite
+    #   flytekitplugins-airflow
+apache-airflow-providers-common-sql==1.7.2
+    # via
+    #   apache-airflow
+    #   apache-airflow-providers-google
+    #   apache-airflow-providers-sqlite
+apache-airflow-providers-ftp==3.5.2
+    # via apache-airflow
+apache-airflow-providers-google==10.9.0
+    # via flytekitplugins-airflow
+apache-airflow-providers-http==4.5.2
+    # via apache-airflow
+apache-airflow-providers-imap==3.3.2
+    # via apache-airflow
+apache-airflow-providers-sqlite==3.4.3
+    # via apache-airflow
+apispec[yaml]==6.3.0
+    # via flask-appbuilder
+argcomplete==3.1.2
+    # via apache-airflow
+arrow==1.3.0
+    # via cookiecutter
+asgiref==3.7.2
+    # via
+    #   apache-airflow
+    #   apache-airflow-providers-google
+    #   apache-airflow-providers-http
+async-timeout==4.0.3
+    # via aiohttp
+attrs==23.1.0 + # via + # aiohttp + # apache-airflow + # cattrs + # jsonschema + # looker-sdk + # referencing +azure-core==1.29.4 + # via + # adlfs + # azure-identity + # azure-storage-blob +azure-datalake-store==0.0.53 + # via adlfs +azure-identity==1.14.0 + # via adlfs +azure-storage-blob==12.18.2 + # via adlfs +babel==2.13.0 + # via flask-babel +backoff==2.2.1 + # via + # gcloud-aio-auth + # opentelemetry-exporter-otlp-proto-common + # opentelemetry-exporter-otlp-proto-grpc + # opentelemetry-exporter-otlp-proto-http +binaryornot==0.4.4 + # via cookiecutter +blinker==1.6.2 + # via apache-airflow +botocore==1.31.17 + # via aiobotocore +cachelib==0.9.0 + # via + # flask-caching + # flask-session +cachetools==5.3.1 + # via google-auth +cattrs==23.1.2 + # via + # apache-airflow + # looker-sdk +certifi==2023.7.22 + # via + # httpcore + # httpx + # kubernetes + # requests +cffi==1.16.0 + # via + # azure-datalake-store + # cryptography +chardet==5.2.0 + # via + # binaryornot + # gcloud-aio-auth +charset-normalizer==3.3.0 + # via + # aiohttp + # requests +click==8.1.7 + # via + # clickclick + # cookiecutter + # flask + # flask-appbuilder + # flytekit + # rich-click +clickclick==20.10.2 + # via connexion +cloudpickle==2.2.1 + # via flytekit +colorama==0.4.6 + # via flask-appbuilder +colorlog==4.8.0 + # via apache-airflow +configupdater==3.1.1 + # via apache-airflow +connexion[flask]==2.14.2 + # via apache-airflow +cookiecutter==2.4.0 + # via flytekit +cron-descriptor==1.4.0 + # via apache-airflow +croniter==1.4.1 + # via + # apache-airflow + # flytekit +cryptography==41.0.4 + # via + # apache-airflow + # azure-identity + # azure-storage-blob + # gcloud-aio-auth + # msal + # pyjwt + # pyopenssl +dataclasses-json==0.5.9 + # via flytekit +db-dtypes==1.1.1 + # via pandas-gbq +decorator==5.1.1 + # via gcsfs +deprecated==1.2.14 + # via + # apache-airflow + # flytekit + # limits + # opentelemetry-api + # opentelemetry-exporter-otlp-proto-grpc + # opentelemetry-exporter-otlp-proto-http +dill==0.3.7 + # via apache-airflow +diskcache==5.6.3 + # via flytekit +dnspython==2.4.2 + # via email-validator +docker==6.1.3 + # via flytekit +docker-image-py==0.1.12 + # via flytekit +docstring-parser==0.15 + # via flytekit +docutils==0.20.1 + # via python-daemon +email-validator==1.3.1 + # via flask-appbuilder +exceptiongroup==1.1.3 + # via + # anyio + # cattrs +flask==2.2.5 + # via + # apache-airflow + # connexion + # flask-appbuilder + # flask-babel + # flask-caching + # flask-jwt-extended + # flask-limiter + # flask-login + # flask-session + # flask-sqlalchemy + # flask-wtf +flask-appbuilder==4.3.6 + # via apache-airflow +flask-babel==2.0.0 + # via flask-appbuilder +flask-caching==2.0.2 + # via apache-airflow +flask-jwt-extended==4.5.3 + # via flask-appbuilder +flask-limiter==3.5.0 + # via flask-appbuilder +flask-login==0.6.2 + # via + # apache-airflow + # flask-appbuilder +flask-session==0.5.0 + # via apache-airflow +flask-sqlalchemy==2.5.1 + # via flask-appbuilder +flask-wtf==1.2.1 + # via + # apache-airflow + # flask-appbuilder +flyteidl==1.5.21 + # via flytekit +flytekit==1.9.1 + # via flytekitplugins-airflow +frozenlist==1.4.0 + # via + # aiohttp + # aiosignal +fsspec==2023.9.2 + # via + # adlfs + # flytekit + # gcsfs + # s3fs +gcloud-aio-auth==4.2.3 + # via + # apache-airflow-providers-google + # gcloud-aio-bigquery + # gcloud-aio-storage +gcloud-aio-bigquery==7.0.0 + # via apache-airflow-providers-google +gcloud-aio-storage==9.0.0 + # via apache-airflow-providers-google +gcsfs==2023.9.2 + # via flytekit 
+gitdb==4.0.10 + # via gitpython +gitpython==3.1.37 + # via flytekit +google-ads==22.0.0 + # via apache-airflow-providers-google +google-api-core[grpc]==2.12.0 + # via + # apache-airflow-providers-google + # google-ads + # google-api-python-client + # google-cloud-aiplatform + # google-cloud-appengine-logging + # google-cloud-automl + # google-cloud-batch + # google-cloud-bigquery + # google-cloud-bigquery-datatransfer + # google-cloud-bigquery-storage + # google-cloud-bigtable + # google-cloud-build + # google-cloud-compute + # google-cloud-container + # google-cloud-core + # google-cloud-datacatalog + # google-cloud-dataflow-client + # google-cloud-dataform + # google-cloud-dataplex + # google-cloud-dataproc + # google-cloud-dataproc-metastore + # google-cloud-dlp + # google-cloud-kms + # google-cloud-language + # google-cloud-logging + # google-cloud-memcache + # google-cloud-monitoring + # google-cloud-orchestration-airflow + # google-cloud-os-login + # google-cloud-pubsub + # google-cloud-redis + # google-cloud-resource-manager + # google-cloud-run + # google-cloud-secret-manager + # google-cloud-spanner + # google-cloud-speech + # google-cloud-storage + # google-cloud-storage-transfer + # google-cloud-tasks + # google-cloud-texttospeech + # google-cloud-translate + # google-cloud-videointelligence + # google-cloud-vision + # google-cloud-workflows + # pandas-gbq + # sqlalchemy-bigquery +google-api-python-client==2.102.0 + # via apache-airflow-providers-google +google-auth==2.23.2 + # via + # apache-airflow-providers-google + # gcsfs + # google-api-core + # google-api-python-client + # google-auth-httplib2 + # google-auth-oauthlib + # google-cloud-core + # google-cloud-storage + # kubernetes + # pandas-gbq + # pydata-google-auth + # sqlalchemy-bigquery +google-auth-httplib2==0.1.1 + # via + # apache-airflow-providers-google + # google-api-python-client +google-auth-oauthlib==1.1.0 + # via + # gcsfs + # google-ads + # pandas-gbq + # pydata-google-auth +google-cloud-aiplatform==1.34.0 + # via apache-airflow-providers-google +google-cloud-appengine-logging==1.3.2 + # via google-cloud-logging +google-cloud-audit-log==0.2.5 + # via google-cloud-logging +google-cloud-automl==2.11.2 + # via apache-airflow-providers-google +google-cloud-batch==0.17.1 + # via apache-airflow-providers-google +google-cloud-bigquery==3.12.0 + # via + # google-cloud-aiplatform + # pandas-gbq + # sqlalchemy-bigquery +google-cloud-bigquery-datatransfer==3.12.1 + # via apache-airflow-providers-google +google-cloud-bigquery-storage==2.22.0 + # via pandas-gbq +google-cloud-bigtable==2.21.0 + # via apache-airflow-providers-google +google-cloud-build==3.20.0 + # via apache-airflow-providers-google +google-cloud-compute==1.14.1 + # via apache-airflow-providers-google +google-cloud-container==2.32.0 + # via apache-airflow-providers-google +google-cloud-core==2.3.3 + # via + # google-cloud-bigquery + # google-cloud-bigtable + # google-cloud-logging + # google-cloud-spanner + # google-cloud-storage + # google-cloud-translate +google-cloud-datacatalog==3.16.0 + # via apache-airflow-providers-google +google-cloud-dataflow-client==0.8.4 + # via apache-airflow-providers-google +google-cloud-dataform==0.5.3 + # via apache-airflow-providers-google +google-cloud-dataplex==1.6.3 + # via apache-airflow-providers-google +google-cloud-dataproc==5.6.0 + # via apache-airflow-providers-google +google-cloud-dataproc-metastore==1.13.0 + # via apache-airflow-providers-google +google-cloud-dlp==3.12.3 + # via apache-airflow-providers-google 
+google-cloud-kms==2.19.1 + # via apache-airflow-providers-google +google-cloud-language==2.11.1 + # via apache-airflow-providers-google +google-cloud-logging==3.8.0 + # via apache-airflow-providers-google +google-cloud-memcache==1.7.3 + # via apache-airflow-providers-google +google-cloud-monitoring==2.16.0 + # via apache-airflow-providers-google +google-cloud-orchestration-airflow==1.9.2 + # via + # apache-airflow-providers-google + # flytekitplugins-airflow +google-cloud-os-login==2.10.0 + # via apache-airflow-providers-google +google-cloud-pubsub==2.18.4 + # via apache-airflow-providers-google +google-cloud-redis==2.13.2 + # via apache-airflow-providers-google +google-cloud-resource-manager==1.10.4 + # via google-cloud-aiplatform +google-cloud-run==0.9.1 + # via apache-airflow-providers-google +google-cloud-secret-manager==2.16.4 + # via apache-airflow-providers-google +google-cloud-spanner==3.40.1 + # via + # apache-airflow-providers-google + # sqlalchemy-spanner +google-cloud-speech==2.21.0 + # via apache-airflow-providers-google +google-cloud-storage==2.11.0 + # via + # apache-airflow-providers-google + # gcsfs + # google-cloud-aiplatform +google-cloud-storage-transfer==1.9.2 + # via apache-airflow-providers-google +google-cloud-tasks==2.14.2 + # via apache-airflow-providers-google +google-cloud-texttospeech==2.14.2 + # via apache-airflow-providers-google +google-cloud-translate==3.12.0 + # via apache-airflow-providers-google +google-cloud-videointelligence==2.11.4 + # via apache-airflow-providers-google +google-cloud-vision==3.4.4 + # via apache-airflow-providers-google +google-cloud-workflows==1.12.1 + # via apache-airflow-providers-google +google-crc32c==1.5.0 + # via google-resumable-media +google-re2==1.1 + # via apache-airflow +google-resumable-media==2.6.0 + # via + # google-cloud-bigquery + # google-cloud-storage +googleapis-common-protos[grpc]==1.60.0 + # via + # flyteidl + # flytekit + # google-ads + # google-api-core + # google-cloud-audit-log + # grpc-google-iam-v1 + # grpcio-status + # opentelemetry-exporter-otlp-proto-grpc + # opentelemetry-exporter-otlp-proto-http +graphviz==0.20.1 + # via apache-airflow +greenlet==3.0.0 + # via sqlalchemy +grpc-google-iam-v1==0.12.6 + # via + # google-cloud-bigtable + # google-cloud-build + # google-cloud-datacatalog + # google-cloud-dataform + # google-cloud-dataplex + # google-cloud-dataproc + # google-cloud-dataproc-metastore + # google-cloud-kms + # google-cloud-logging + # google-cloud-pubsub + # google-cloud-resource-manager + # google-cloud-run + # google-cloud-secret-manager + # google-cloud-spanner + # google-cloud-tasks +grpcio==1.53.0 + # via + # flytekit + # google-ads + # google-api-core + # google-cloud-bigquery + # google-cloud-pubsub + # googleapis-common-protos + # grpc-google-iam-v1 + # grpcio-gcp + # grpcio-status + # opentelemetry-exporter-otlp-proto-grpc +grpcio-gcp==0.2.2 + # via apache-airflow-providers-google +grpcio-status==1.53.0 + # via + # flytekit + # google-ads + # google-api-core + # google-cloud-pubsub +gunicorn==21.2.0 + # via apache-airflow +h11==0.14.0 + # via httpcore +httpcore==0.18.0 + # via httpx +httplib2==0.22.0 + # via + # google-api-python-client + # google-auth-httplib2 +httpx==0.25.0 + # via + # apache-airflow + # apache-airflow-providers-google +idna==3.4 + # via + # anyio + # email-validator + # httpx + # requests + # yarl +importlib-metadata==6.8.0 + # via + # flytekit + # keyring + # opentelemetry-api +importlib-resources==6.1.0 + # via limits +inflection==0.5.1 + # via connexion 
+isodate==0.6.1 + # via azure-storage-blob +itsdangerous==2.1.2 + # via + # apache-airflow + # connexion + # flask + # flask-wtf +jaraco-classes==3.3.0 + # via keyring +jinja2==3.1.2 + # via + # apache-airflow + # cookiecutter + # flask + # flask-babel + # python-nvd3 +jmespath==1.0.1 + # via botocore +joblib==1.3.2 + # via flytekit +json-merge-patch==0.2 + # via apache-airflow-providers-google +jsonpickle==3.0.2 + # via + # flytekit + # flytekitplugins-airflow +jsonschema==4.19.1 + # via + # apache-airflow + # connexion + # flask-appbuilder +jsonschema-specifications==2023.7.1 + # via jsonschema +keyring==24.2.0 + # via flytekit +kubernetes==28.1.0 + # via flytekit +lazy-object-proxy==1.9.0 + # via apache-airflow +limits==3.6.0 + # via flask-limiter +linkify-it-py==2.0.2 + # via apache-airflow +lockfile==0.12.2 + # via + # apache-airflow + # python-daemon +looker-sdk==23.16.0 + # via apache-airflow-providers-google +mako==1.2.4 + # via alembic +markdown==3.4.4 + # via apache-airflow +markdown-it-py==3.0.0 + # via + # apache-airflow + # mdit-py-plugins + # rich +markupsafe==2.1.3 + # via + # apache-airflow + # jinja2 + # mako + # werkzeug + # wtforms +marshmallow==3.20.1 + # via + # dataclasses-json + # flask-appbuilder + # marshmallow-enum + # marshmallow-jsonschema + # marshmallow-oneofschema + # marshmallow-sqlalchemy +marshmallow-enum==1.5.1 + # via + # dataclasses-json + # flytekit +marshmallow-jsonschema==0.13.0 + # via flytekit +marshmallow-oneofschema==3.0.1 + # via apache-airflow +marshmallow-sqlalchemy==0.26.1 + # via flask-appbuilder +mdit-py-plugins==0.4.0 + # via apache-airflow +mdurl==0.1.2 + # via markdown-it-py +more-itertools==10.1.0 + # via jaraco-classes +msal==1.24.1 + # via + # azure-datalake-store + # azure-identity + # msal-extensions +msal-extensions==1.0.0 + # via azure-identity +multidict==6.0.4 + # via + # aiohttp + # yarl +mypy-extensions==1.0.0 + # via typing-inspect +natsort==8.4.0 + # via flytekit +numpy==1.24.4 + # via + # db-dtypes + # flytekit + # pandas + # pandas-gbq + # pyarrow +oauthlib==3.2.2 + # via + # kubernetes + # requests-oauthlib +opentelemetry-api==1.20.0 + # via + # apache-airflow + # opentelemetry-exporter-otlp-proto-grpc + # opentelemetry-exporter-otlp-proto-http + # opentelemetry-sdk +opentelemetry-exporter-otlp==1.20.0 + # via apache-airflow +opentelemetry-exporter-otlp-proto-common==1.20.0 + # via + # opentelemetry-exporter-otlp-proto-grpc + # opentelemetry-exporter-otlp-proto-http +opentelemetry-exporter-otlp-proto-grpc==1.20.0 + # via opentelemetry-exporter-otlp +opentelemetry-exporter-otlp-proto-http==1.20.0 + # via opentelemetry-exporter-otlp +opentelemetry-proto==1.20.0 + # via + # opentelemetry-exporter-otlp-proto-common + # opentelemetry-exporter-otlp-proto-grpc + # opentelemetry-exporter-otlp-proto-http +opentelemetry-sdk==1.20.0 + # via + # opentelemetry-exporter-otlp-proto-grpc + # opentelemetry-exporter-otlp-proto-http +opentelemetry-semantic-conventions==0.41b0 + # via opentelemetry-sdk +ordered-set==4.1.0 + # via flask-limiter +packaging==23.2 + # via + # apache-airflow + # apispec + # connexion + # db-dtypes + # docker + # google-cloud-aiplatform + # google-cloud-bigquery + # gunicorn + # limits + # marshmallow + # sqlalchemy-bigquery +pandas==1.5.3 + # via + # apache-airflow-providers-google + # db-dtypes + # flytekit + # pandas-gbq +pandas-gbq==0.19.2 + # via apache-airflow-providers-google +pathspec==0.11.2 + # via apache-airflow +pendulum==2.1.2 + # via apache-airflow +pluggy==1.3.0 + # via apache-airflow 
+portalocker==2.8.2 + # via msal-extensions +prison==0.2.1 + # via flask-appbuilder +proto-plus==1.22.3 + # via + # apache-airflow-providers-google + # google-ads + # google-cloud-aiplatform + # google-cloud-appengine-logging + # google-cloud-automl + # google-cloud-batch + # google-cloud-bigquery + # google-cloud-bigquery-datatransfer + # google-cloud-bigquery-storage + # google-cloud-bigtable + # google-cloud-build + # google-cloud-compute + # google-cloud-container + # google-cloud-datacatalog + # google-cloud-dataflow-client + # google-cloud-dataform + # google-cloud-dataplex + # google-cloud-dataproc + # google-cloud-dataproc-metastore + # google-cloud-dlp + # google-cloud-kms + # google-cloud-language + # google-cloud-logging + # google-cloud-memcache + # google-cloud-monitoring + # google-cloud-orchestration-airflow + # google-cloud-os-login + # google-cloud-pubsub + # google-cloud-redis + # google-cloud-resource-manager + # google-cloud-run + # google-cloud-secret-manager + # google-cloud-spanner + # google-cloud-speech + # google-cloud-storage-transfer + # google-cloud-tasks + # google-cloud-texttospeech + # google-cloud-translate + # google-cloud-videointelligence + # google-cloud-vision + # google-cloud-workflows +protobuf==4.24.4 + # via + # flyteidl + # google-ads + # google-api-core + # google-cloud-aiplatform + # google-cloud-appengine-logging + # google-cloud-audit-log + # google-cloud-automl + # google-cloud-batch + # google-cloud-bigquery + # google-cloud-bigquery-datatransfer + # google-cloud-bigquery-storage + # google-cloud-bigtable + # google-cloud-build + # google-cloud-compute + # google-cloud-container + # google-cloud-datacatalog + # google-cloud-dataflow-client + # google-cloud-dataform + # google-cloud-dataplex + # google-cloud-dataproc + # google-cloud-dataproc-metastore + # google-cloud-dlp + # google-cloud-kms + # google-cloud-language + # google-cloud-logging + # google-cloud-memcache + # google-cloud-monitoring + # google-cloud-orchestration-airflow + # google-cloud-os-login + # google-cloud-pubsub + # google-cloud-redis + # google-cloud-resource-manager + # google-cloud-run + # google-cloud-secret-manager + # google-cloud-spanner + # google-cloud-speech + # google-cloud-storage-transfer + # google-cloud-tasks + # google-cloud-texttospeech + # google-cloud-translate + # google-cloud-videointelligence + # google-cloud-vision + # google-cloud-workflows + # googleapis-common-protos + # grpc-google-iam-v1 + # grpcio-status + # opentelemetry-proto + # proto-plus + # protoc-gen-swagger +protoc-gen-swagger==0.1.0 + # via flyteidl +psutil==5.9.5 + # via apache-airflow +pyarrow==10.0.1 + # via + # db-dtypes + # flytekit + # pandas-gbq +pyasn1==0.5.0 + # via + # pyasn1-modules + # rsa +pyasn1-modules==0.3.0 + # via + # gcloud-aio-storage + # google-auth +pycparser==2.21 + # via cffi +pydantic==2.4.2 + # via apache-airflow +pydantic-core==2.10.1 + # via pydantic +pydata-google-auth==1.8.2 + # via pandas-gbq +pygments==2.16.1 + # via + # apache-airflow + # rich +pyjwt[crypto]==2.8.0 + # via + # apache-airflow + # flask-appbuilder + # flask-jwt-extended + # gcloud-aio-auth + # msal +pyopenssl==23.2.0 + # via + # apache-airflow-providers-google + # flytekit +pyparsing==3.1.1 + # via httplib2 +python-daemon==3.0.1 + # via apache-airflow +python-dateutil==2.8.2 + # via + # apache-airflow + # arrow + # botocore + # croniter + # flask-appbuilder + # flytekit + # google-cloud-bigquery + # kubernetes + # pandas + # pendulum +python-json-logger==2.0.7 + # via flytekit 
+python-nvd3==0.15.0 + # via apache-airflow +python-slugify==8.0.1 + # via + # apache-airflow + # cookiecutter + # python-nvd3 +pytimeparse==1.1.8 + # via flytekit +pytz==2023.3.post1 + # via + # flask-babel + # flytekit + # pandas +pytzdata==2020.1 + # via pendulum +pyyaml==6.0.1 + # via + # apispec + # clickclick + # connexion + # cookiecutter + # flytekit + # google-ads + # kubernetes +referencing==0.30.2 + # via + # jsonschema + # jsonschema-specifications +regex==2023.10.3 + # via docker-image-py +requests==2.31.0 + # via + # apache-airflow-providers-http + # azure-core + # azure-datalake-store + # connexion + # cookiecutter + # docker + # flytekit + # gcsfs + # google-api-core + # google-cloud-bigquery + # google-cloud-storage + # kubernetes + # looker-sdk + # msal + # opentelemetry-exporter-otlp-proto-http + # requests-oauthlib + # requests-toolbelt +requests-oauthlib==1.3.1 + # via + # google-auth-oauthlib + # kubernetes +requests-toolbelt==1.0.0 + # via apache-airflow-providers-http +rfc3339-validator==0.1.4 + # via apache-airflow +rich==13.6.0 + # via + # apache-airflow + # cookiecutter + # flask-limiter + # flytekit + # rich-argparse + # rich-click +rich-argparse==1.3.0 + # via apache-airflow +rich-click==1.6.1 + # via flytekit +rpds-py==0.10.3 + # via + # jsonschema + # referencing +rsa==4.9 + # via + # gcloud-aio-storage + # google-auth +s3fs==2023.9.2 + # via flytekit +setproctitle==1.3.3 + # via apache-airflow +shapely==1.8.5.post1 + # via google-cloud-aiplatform +six==1.16.0 + # via + # azure-core + # isodate + # kubernetes + # prison + # python-dateutil + # rfc3339-validator +smmap==5.0.1 + # via gitdb +sniffio==1.3.0 + # via + # anyio + # httpcore + # httpx +sortedcontainers==2.4.0 + # via flytekit +sqlalchemy==1.4.49 + # via + # alembic + # apache-airflow + # flask-appbuilder + # flask-sqlalchemy + # marshmallow-sqlalchemy + # sqlalchemy-bigquery + # sqlalchemy-jsonfield + # sqlalchemy-spanner + # sqlalchemy-utils +sqlalchemy-bigquery==1.8.0 + # via apache-airflow-providers-google +sqlalchemy-jsonfield==1.0.1.post0 + # via apache-airflow +sqlalchemy-spanner==1.6.2 + # via apache-airflow-providers-google +sqlalchemy-utils==0.41.1 + # via flask-appbuilder +sqlparse==0.4.4 + # via + # apache-airflow-providers-common-sql + # google-cloud-spanner +statsd==3.3.0 + # via flytekit +tabulate==0.9.0 + # via apache-airflow +tenacity==8.2.3 + # via apache-airflow +termcolor==2.3.0 + # via apache-airflow +text-unidecode==1.3 + # via python-slugify +types-python-dateutil==2.8.19.14 + # via arrow +typing-extensions==4.8.0 + # via + # alembic + # apache-airflow + # asgiref + # azure-core + # azure-storage-blob + # cattrs + # flask-limiter + # flytekit + # limits + # looker-sdk + # opentelemetry-sdk + # pydantic + # pydantic-core + # typing-inspect +typing-inspect==0.9.0 + # via dataclasses-json +uc-micro-py==1.0.2 + # via linkify-it-py +unicodecsv==0.14.1 + # via apache-airflow +uritemplate==4.1.1 + # via google-api-python-client +urllib3==1.26.17 + # via + # botocore + # docker + # flytekit + # kubernetes + # requests +websocket-client==1.6.3 + # via + # docker + # kubernetes +werkzeug==2.2.3 + # via + # apache-airflow + # connexion + # flask + # flask-jwt-extended + # flask-login +wheel==0.41.2 + # via flytekit +wrapt==1.15.0 + # via + # aiobotocore + # deprecated + # flytekit +wtforms==3.0.1 + # via + # flask-appbuilder + # flask-wtf +yarl==1.9.2 + # via aiohttp +zipp==3.17.0 + # via importlib-metadata + +# The following packages are considered to be unsafe in a requirements file: +# 
setuptools
diff --git a/plugins/flytekit-airflow/setup.py b/plugins/flytekit-airflow/setup.py
new file mode 100644
index 0000000000..f077e174b1
--- /dev/null
+++ b/plugins/flytekit-airflow/setup.py
@@ -0,0 +1,43 @@
+from setuptools import setup
+
+PLUGIN_NAME = "airflow"
+
+microlib_name = f"flytekitplugins-{PLUGIN_NAME}"
+
+plugin_requires = [
+    "apache-airflow",
+    "jsonpickle",
+    "flytekit>=1.9.0",
+    "google-cloud-orchestration-airflow",
+    "apache-airflow-providers-google",
+]
+
+__version__ = "0.0.0+develop"
+
+setup(
+    name=microlib_name,
+    version=__version__,
+    author="flyteorg",
+    author_email="admin@flyte.org",
+    description="This package holds the Airflow plugins for flytekit",
+    namespace_packages=["flytekitplugins"],
+    packages=[f"flytekitplugins.{PLUGIN_NAME}"],
+    install_requires=plugin_requires,
+    license="apache2",
+    python_requires=">=3.8",
+    classifiers=[
+        "Intended Audience :: Science/Research",
+        "Intended Audience :: Developers",
+        "License :: OSI Approved :: Apache Software License",
+        "Programming Language :: Python :: 3.8",
+        "Programming Language :: Python :: 3.9",
+        "Programming Language :: Python :: 3.10",
+        "Programming Language :: Python :: 3.11",
+        "Topic :: Scientific/Engineering",
+        "Topic :: Scientific/Engineering :: Artificial Intelligence",
+        "Topic :: Software Development",
+        "Topic :: Software Development :: Libraries",
+        "Topic :: Software Development :: Libraries :: Python Modules",
+    ],
+    entry_points={"flytekit.plugins": [f"{PLUGIN_NAME}=flytekitplugins.{PLUGIN_NAME}"]},
+)
diff --git a/plugins/flytekit-airflow/tests/__init__.py b/plugins/flytekit-airflow/tests/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/plugins/flytekit-airflow/tests/test_agent.py b/plugins/flytekit-airflow/tests/test_agent.py
new file mode 100644
index 0000000000..937a9adcd5
--- /dev/null
+++ b/plugins/flytekit-airflow/tests/test_agent.py
@@ -0,0 +1,25 @@
+from datetime import datetime, timedelta
+
+from airflow.operators.python import PythonOperator
+from airflow.sensors.bash import BashSensor
+from airflow.sensors.time_sensor import TimeSensor
+from pytz import UTC
+
+from flytekit import workflow
+
+
+def py_func():
+    print("airflow python sensor")
+    return True
+
+
+@workflow
+def wf():
+    sensor = TimeSensor(task_id="fire_immediately", target_time=(datetime.now(tz=UTC) + timedelta(seconds=1)).time())
+    t3 = BashSensor(task_id="Sensor_succeeds", bash_command="exit 0")
+    foo = PythonOperator(task_id="foo", python_callable=py_func)
+    sensor >> t3 >> foo
+
+
+def test_airflow_agent():
+    wf()