From 0ff852189ce03341c3c3b1d31d98065fdd6be866 Mon Sep 17 00:00:00 2001 From: "Mats E. Mollestad" Date: Wed, 22 May 2024 08:10:57 +0200 Subject: [PATCH] Added hot-reloading on model promotion --- .dockerignore | 1 + .gitignore | 1 + Makefile | 2 +- README.md | 6 +---- docker-compose.yaml | 12 ++++++--- docker/Dockerfile.catalog | 5 ++-- docker/Dockerfile.serve | 3 ++- serve_model_locally.py | 47 ++++++++++++++++++++++++++++++++++++ src/movie_review/train.py | 11 ++++----- src/pipelines/train.py | 12 ++++----- src/wine/train.py | 4 +-- tests/test_train_pipeline.py | 4 +-- 12 files changed, 79 insertions(+), 29 deletions(-) create mode 100644 serve_model_locally.py diff --git a/.dockerignore b/.dockerignore index b35fa40..795e881 100644 --- a/.dockerignore +++ b/.dockerignore @@ -3,3 +3,4 @@ mlflow docker docker-compose.yaml .git +tests diff --git a/.gitignore b/.gitignore index cf6a17a..ecaeda5 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ data/sentiment/datasets.json data/movie_review_is_negative mlflow +data/**/datasets/**/* # Dont check-in database folder .pg/* diff --git a/Makefile b/Makefile index c975ea0..141b3b3 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ .PHONY: clean -build: +clean: docker system prune -f .PHONY: build diff --git a/README.md b/README.md index 42de973..6254f81 100644 --- a/README.md +++ b/README.md @@ -84,11 +84,7 @@ class WineIsHighQuality: ### 3. Create a training pipeline -Setup your Prefect training pipeline, which is just regular python methods with some decorator around them. - -You can see an example of how you can use aligned to create, and store datasets, which can later be used for debugging, monitoring etc. - -When you have trained and stored a model through ML flow +Create your own pipeline logic, or maybe reuse the generic classification pipeline, `classifier_from_train_test_set` which is located at `src/pipelines/train.py`. Remember to add the pipeline to `src/pipelines/available.py` to make it visible in the Prefect UI. diff --git a/docker-compose.yaml b/docker-compose.yaml index 1e987fb..3072fcd 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -9,7 +9,7 @@ services: mlflow-tracker: image: project-base - command: "mlflow server --backend-store-uri file:///app/mlflow-server --host 0.0.0.0 --port 8000" + command: "mlflow server --backend-store-uri file:///app/mlflow-server/experiments --artifacts-destination file:///app/mlflow-server/artifacts --host 0.0.0.0 --port 8000" ports: - 7999:8000 volumes: @@ -23,13 +23,16 @@ services: build: context: . dockerfile: docker/Dockerfile.serve - command: "mlflow models serve -m models:/movie_review_is_negative@champion --port 8080 --host 0.0.0.0 --no-conda --enable-mlserver" + command: "python -m serve_model_locally serve-mlflow-model movie_review_is_negative" ports: - 8081:8080 environment: - MLFLOW_TRACKING_URI=http://mlflow-tracker:8000 volumes: - ./src:/app/src + - ./serve_model_locally.py:/app/serve_model_locally.py + # Adding mlflow to automatically update on model promption + - ./mlflow/experiments/models:/app/mlflow/experiments/models depends_on: - base-image @@ -38,13 +41,16 @@ services: build: context: . dockerfile: docker/Dockerfile.serve - command: "mlflow models serve -m models:/is_high_quality_wine@champion --port 8080 --host 0.0.0.0 --no-conda --enable-mlserver" + command: "python -m serve_model_locally serve-mlflow-model is_high_quality_wine" ports: - 8080:8080 environment: - MLFLOW_TRACKING_URI=http://mlflow-tracker:8000 volumes: - ./src:/app/src + - ./serve_model_locally.py:/app/serve_model_locally.py + # Adding mlflow to automatically update on model promption + - ./mlflow/experiments/models:/app/mlflow/experiments/models extra_hosts: - host.docker.internal:host-gateway depends_on: diff --git a/docker/Dockerfile.catalog b/docker/Dockerfile.catalog index 06d5af4..cd477ce 100644 --- a/docker/Dockerfile.catalog +++ b/docker/Dockerfile.catalog @@ -1,3 +1,4 @@ -FROM aligned-catalog-free +FROM matsmoll/aligned-catalog-free -RUN pip install mlflow prefect +COPY poetry.lock pyproject.toml /app/ +RUN poetry install --without dev diff --git a/docker/Dockerfile.serve b/docker/Dockerfile.serve index 92b6984..2f39f18 100644 --- a/docker/Dockerfile.serve +++ b/docker/Dockerfile.serve @@ -1,5 +1,6 @@ FROM project-base:latest -RUN pip install mlflow[extras] +RUN pip install mlflow[extras]==2.13.0 +RUN pip install watchfiles COPY . /app diff --git a/serve_model_locally.py b/serve_model_locally.py new file mode 100644 index 0000000..7c35d29 --- /dev/null +++ b/serve_model_locally.py @@ -0,0 +1,47 @@ +from pathlib import Path +import mlflow +import click +import subprocess + +@click.group() +def cli() -> None: + pass + +def start_mlfow_server(model_name: str, alias: str, port: int, host: str): + uri = f"models:/{model_name}@{alias}" + print(uri) + try: + mlflow.models.get_model_info(uri) + except Exception as e: + print("Remember to start the tracking server, or train a model first.") + raise e + + subprocess.run([ + "mlflow", "models", "serve", "-m", uri, "--port", str(port), "--host", host, "--no-conda", "--enable-mlserver" + ]) + +@cli.command() +@click.argument("model_name", type=str) +@click.option("--alias", type=str, default="champion") +@click.option("--mlflow-dir", type=Path, default="mlflow/experiments") +@click.option("--port", type=int, default="8080") +@click.option("--host", type=str, default="0.0.0.0") +def serve_mlflow_model( + model_name: str, + alias: str, + mlflow_dir: Path, + port: int, + host: str +): + from watchfiles import run_process + + model_alias_file = mlflow_dir / "models" / model_name / "aliases" / alias + + run_process( + model_alias_file.resolve(), + target=start_mlfow_server, + args=(model_name, alias, port, host) + ) + +if __name__ == "__main__": + cli() diff --git a/src/movie_review/train.py b/src/movie_review/train.py index 6595122..9f8acf8 100644 --- a/src/movie_review/train.py +++ b/src/movie_review/train.py @@ -2,7 +2,7 @@ from sklearn.ensemble import RandomForestClassifier from aligned import ContractStore, FileSource -from src.pipelines.train import classifier_from_train_test_validation_set, load_store +from src.pipelines.train import classifier_from_train_test_set, load_store import numpy as np import polars as pl @@ -24,9 +24,8 @@ async def equal_distribution_entities(number_of_records: int, store: ContractSto async def train_sentiment( number_of_records: int = 1000, search_params: dict | None = None, - train_size: float = 0.6, - test_size: float = 0.2, - validate_size: float = 0.2, + train_size: float = 0.75, + test_size: float = 0.15, dataset_id: str | None = None, ): store: ContractStore = await load_store() @@ -42,10 +41,10 @@ async def train_sentiment( ) dataset_dir = FileSource.directory(f"data/movie_review_is_negative/datasets") - total_size = train_size + test_size + validate_size + total_size = train_size + test_size - await classifier_from_train_test_validation_set( + await classifier_from_train_test_set( store=store, model_contract="movie_review_is_negative", entities=entities, diff --git a/src/pipelines/train.py b/src/pipelines/train.py index 4eb5d5b..16e83cd 100644 --- a/src/pipelines/train.py +++ b/src/pipelines/train.py @@ -210,7 +210,7 @@ async def evaluate_model( -async def classifier_from_train_test_validation_set( +async def classifier_from_train_test_set( store: ContractStore, model_contract: str, entities: ConvertableToRetrivalJob | RetrivalJob, @@ -236,19 +236,17 @@ async def classifier_from_train_test_validation_set( if model_contract_version: model_store = model_store.using_version(model_contract_version) - train, test, validate = await generate_train_test_validate( + train, test = await generate_train_test( model_store, entities, - dataset_dir, - train_size, - test_size, - dataset_id=dataset_id + dataset_dir=dataset_dir, + dataset_id=dataset_id, + train_size=train_size, ) datasets = [ ("train", train), ("test", test), - ("validate", validate) ] with tracker.start_run(run_name=model_contract): diff --git a/src/wine/train.py b/src/wine/train.py index 8432486..d5ed064 100644 --- a/src/wine/train.py +++ b/src/wine/train.py @@ -3,7 +3,7 @@ from sklearn.ensemble import RandomForestClassifier import numpy as np -from src.pipelines.train import classifier_from_train_test_validation_set, load_store +from src.pipelines.train import classifier_from_train_test_set, load_store @flow async def train_wine_model( @@ -24,7 +24,7 @@ async def train_wine_model( dataset_dir = FileSource.directory("data/wine/datasets") total_size = train_size + test_size + validate_size - await classifier_from_train_test_validation_set( + await classifier_from_train_test_set( store=store, model_contract="is_high_quality_wine", entities=entities, diff --git a/tests/test_train_pipeline.py b/tests/test_train_pipeline.py index a7f583f..d448361 100644 --- a/tests/test_train_pipeline.py +++ b/tests/test_train_pipeline.py @@ -9,7 +9,7 @@ from src.model_registry import InMemoryModelRegristry from src.experiment_tracker import StdoutExperimentTracker -from src.pipelines.train import classifier_from_train_test_validation_set +from src.pipelines.train import classifier_from_train_test_set async def setup_store(): store = await ContractStore.from_dir(".") @@ -39,7 +39,7 @@ async def test_generic_classifier_train_pipeline_using_prefect(): @flow(name="test") async def test_flow(): - await classifier_from_train_test_validation_set( + await classifier_from_train_test_set( store=store, model_contract=MovieReviewIsNegative.metadata.name, entities={