From c926fadb51cc16b46a94d7a05a347f11332329b3 Mon Sep 17 00:00:00 2001 From: Alicia Date: Thu, 2 Nov 2023 11:14:35 +0100 Subject: [PATCH] finish the skeleton of pipes --- data/notebooks/preprocessing_pipes.ipynb | 200 ++++++++++++++++++ .../tiles_generation/EEZ_tiles.ipynb | 134 ------------ data/src/pipelines/base_pipe.py | 149 +++++++++---- .../intermediate_pipes/EEZIntermediatePipe.py | 178 ++++++++++++++++ .../pipelines/intermediate_pipes/__init__.py | 16 ++ data/src/pipelines/pipes.py | 57 ++++- data/src/pipelines/precalc_pipes.py | 7 - .../pipelines/precalc_pipes/EEZPrecalcPipe.py | 27 +++ data/src/pipelines/precalc_pipes/__init__.py | 16 ++ data/src/pipelines/settings.py | 15 +- data/src/pipelines/tiles_pipes.py | 134 ------------ .../src/pipelines/tiles_pipes/EEZTilesPipe.py | 50 +++++ data/src/pipelines/tiles_pipes/__init__.py | 16 ++ data/src/pipelines/utils.py | 17 ++ data/src/utils.py | 47 ++-- 15 files changed, 729 insertions(+), 334 deletions(-) create mode 100644 data/notebooks/preprocessing_pipes.ipynb delete mode 100644 data/notebooks/tiles_generation/EEZ_tiles.ipynb create mode 100644 data/src/pipelines/intermediate_pipes/EEZIntermediatePipe.py create mode 100644 data/src/pipelines/intermediate_pipes/__init__.py delete mode 100644 data/src/pipelines/precalc_pipes.py create mode 100644 data/src/pipelines/precalc_pipes/EEZPrecalcPipe.py create mode 100644 data/src/pipelines/precalc_pipes/__init__.py delete mode 100644 data/src/pipelines/tiles_pipes.py create mode 100644 data/src/pipelines/tiles_pipes/EEZTilesPipe.py create mode 100644 data/src/pipelines/tiles_pipes/__init__.py create mode 100644 data/src/pipelines/utils.py diff --git a/data/notebooks/preprocessing_pipes.ipynb b/data/notebooks/preprocessing_pipes.ipynb new file mode 100644 index 00000000..95143877 --- /dev/null +++ b/data/notebooks/preprocessing_pipes.ipynb @@ -0,0 +1,200 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "79966c59", + "metadata": {}, + "source": [ + "This notebook relies on the following cmd line tools, ensure they are installed and in the system\n", + "- tippecanoe\n", + "- mapshaper\n", + "- aws cli" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "def1ef65-1fd6-4367-a620-2dbe415f7104", + "metadata": {}, + "outputs": [], + "source": [ + "from pathlib import Path\n", + "import sys\n", + "import logging\n", + "\n", + "from IPython.lib import backgroundjobs as bg\n", + "\n", + "scripts_dir = Path('..').joinpath('src')\n", + "if scripts_dir not in sys.path:\n", + " sys.path.insert(0, scripts_dir.resolve().as_posix())\n", + "\n", + "from pipelines.pipes import get_pipes, execution_order, filter_pipes\n", + "\n", + "logging.basicConfig(level=logging.DEBUG)\n", + "logging.getLogger(\"requests\").setLevel(logging.WARNING)\n", + "logging.getLogger(\"urllib3\").setLevel(logging.WARNING)\n", + "logging.getLogger(\"fiona\").setLevel(logging.WARNING)\n", + "jobs = bg.BackgroundJobManager()" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "38e82156", + "metadata": {}, + "outputs": [], + "source": [ + "# def execute_pipes(pipes):\n", + "# ordered_pipes = execution_order(pipes)\n", + "# for pipe in ordered_pipes:\n", + "# jobs.new(pipe().execute)\n", + "\n", + "# # this code will execute all pipelines selected in background.\n", + "\n", + "# mypipes_subset = filter_pipes(get_pipes().items(), ['eez_tiles'])\n", + "# execute_pipes(mypipes_subset)" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "47a451d2", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'eez_intermediate': pipelines.intermediate_pipes.EEZIntermediatePipe.EEZIntermediatePipe,\n", + " 'eez_tiles': pipelines.tiles_pipes.EEZTilesPipe.EEZTilesPipe,\n", + " 'eez_precalc': pipelines.precalc_pipes.EEZPrecalcPipe.EEZPrecalcPipe}" + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "get_pipes()" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "d85fee01", + "metadata": {}, + "outputs": [], + "source": [ + "mypipes_subset = filter_pipes(get_pipes(), ['eez_intermediate'])" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "4798cd04", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "DEBUG:pipelines.settings:/home/mambauser/data\n", + "INFO:pipelines.base_pipe:Pipeline eez_intermediate running at 2023-11-02 10:12:39.302930: starting extract...\n", + "INFO:utils:File /home/mambauser/data/eez_intermediate/World_EEZ_v11_20191118.zip already exists.\n", + "INFO:utils:File /home/mambauser/data/eez_intermediate/World_High_Seas_v1_20200826.zip already exists.\n", + "INFO:pipelines.base_pipe:Pipeline eez_intermediate finish at 2023-11-02 10:12:39.304361: Success executing extract\n", + "INFO:pipelines.base_pipe:Pipeline eez_intermediate running at 2023-11-02 10:12:39.304695: starting transform...\n", + "INFO:pipelines.base_pipe:Pipeline eez_intermediate finish at 2023-11-02 10:12:39.305303: Success executing transform\n", + "INFO:pipelines.base_pipe:Pipeline eez_intermediate running at 2023-11-02 10:12:39.305575: starting load...\n", + "DEBUG:google.auth.transport.requests:Making request: POST https://oauth2.googleapis.com/token\n", + "ERROR:pipelines.base_pipe:Pipeline eez_intermediate dead at 2023-11-02 10:12:39.608183: Traceback (most recent call last):\n", + " File \"/opt/conda/lib/python3.10/site-packages/google/cloud/storage/blob.py\", line 2602, in _prep_and_do_upload\n", + " created_json = self._do_upload(\n", + " File \"/opt/conda/lib/python3.10/site-packages/google/cloud/storage/blob.py\", line 2425, in _do_upload\n", + " response = self._do_resumable_upload(\n", + " File \"/opt/conda/lib/python3.10/site-packages/google/cloud/storage/blob.py\", line 2243, in _do_resumable_upload\n", + " upload, transport = self._initiate_resumable_upload(\n", + " File \"/opt/conda/lib/python3.10/site-packages/google/cloud/storage/blob.py\", line 2117, in _initiate_resumable_upload\n", + " upload.initiate(\n", + " File \"/opt/conda/lib/python3.10/site-packages/google/resumable_media/requests/upload.py\", line 420, in initiate\n", + " return _request_helpers.wait_and_retry(\n", + " File \"/opt/conda/lib/python3.10/site-packages/google/resumable_media/requests/_request_helpers.py\", line 155, in wait_and_retry\n", + " response = func()\n", + " File \"/opt/conda/lib/python3.10/site-packages/google/resumable_media/requests/upload.py\", line 416, in retriable_request\n", + " self._process_initiate_response(result)\n", + " File \"/opt/conda/lib/python3.10/site-packages/google/resumable_media/_upload.py\", line 518, in _process_initiate_response\n", + " _helpers.require_status_code(\n", + " File \"/opt/conda/lib/python3.10/site-packages/google/resumable_media/_helpers.py\", line 108, in require_status_code\n", + " raise common.InvalidResponse(\n", + "google.resumable_media.common.InvalidResponse: ('Request failed with status code', 403, 'Expected one of', , )\n", + "\n", + "During handling of the above exception, another exception occurred:\n", + "\n", + "Traceback (most recent call last):\n", + " File \"/home/mambauser/src/pipelines/utils.py\", line 9, in check\n", + " func(self, *args, **kwargs)\n", + " File \"/home/mambauser/src/pipelines/base_pipe.py\", line 139, in load\n", + " writeReadGCP(\n", + " File \"/home/mambauser/src/utils.py\", line 104, in writeReadGCP\n", + " blob.upload_from_file(f)\n", + " File \"/opt/conda/lib/python3.10/site-packages/google/cloud/storage/blob.py\", line 2761, in upload_from_file\n", + " self._prep_and_do_upload(\n", + " File \"/opt/conda/lib/python3.10/site-packages/google/cloud/storage/blob.py\", line 2620, in _prep_and_do_upload\n", + " _raise_from_invalid_response(exc)\n", + " File \"/opt/conda/lib/python3.10/site-packages/google/cloud/storage/blob.py\", line 4774, in _raise_from_invalid_response\n", + " raise exceptions.from_http_status(response.status_code, message, response=response)\n", + "google.api_core.exceptions.Forbidden: 403 POST https://storage.googleapis.com/upload/storage/v1/b/vector-data-raw/o?uploadType=resumable: {\n", + " \"error\": {\n", + " \"code\": 403,\n", + " \"message\": \"data-pipelines@x30-399415.iam.gserviceaccount.com does not have storage.objects.create access to the Google Cloud Storage object. Permission 'storage.objects.create' denied on resource (or it may not exist).\",\n", + " \"errors\": [\n", + " {\n", + " \"message\": \"data-pipelines@x30-399415.iam.gserviceaccount.com does not have storage.objects.create access to the Google Cloud Storage object. Permission 'storage.objects.create' denied on resource (or it may not exist).\",\n", + " \"domain\": \"global\",\n", + " \"reason\": \"forbidden\"\n", + " }\n", + " ]\n", + " }\n", + "}\n", + ": ('Request failed with status code', 403, 'Expected one of', , )\n", + "\n" + ] + } + ], + "source": [ + "for n, pipe in mypipes_subset.items():\n", + " new_pipe = pipe()\n", + " new_pipe.extract().transform().load()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9dacf2db", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.13" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/data/notebooks/tiles_generation/EEZ_tiles.ipynb b/data/notebooks/tiles_generation/EEZ_tiles.ipynb deleted file mode 100644 index 4d073bb5..00000000 --- a/data/notebooks/tiles_generation/EEZ_tiles.ipynb +++ /dev/null @@ -1,134 +0,0 @@ -{ - "cells": [ - { - "cell_type": "markdown", - "id": "79966c59", - "metadata": {}, - "source": [ - "This notebook relies on the following cmd line tools, ensure they are installed and in the system\n", - "- tippecanoe\n", - "- mapshaper\n", - "- aws cli" - ] - }, - { - "cell_type": "code", - "execution_count": 1, - "id": "def1ef65-1fd6-4367-a620-2dbe415f7104", - "metadata": {}, - "outputs": [], - "source": [ - "from pathlib import Path\n", - "import sys\n", - "import logging\n", - "\n", - "from IPython.lib import backgroundjobs as bg\n", - "\n", - "scripts_dir = Path('../..').joinpath('src')\n", - "if scripts_dir not in sys.path:\n", - " sys.path.insert(0, scripts_dir.resolve().as_posix())\n", - " \n", - "from pipelines.pipes import get_pipes\n", - "\n", - "logging.basicConfig(level=logging.DEBUG)\n", - "logging.getLogger(\"requests\").setLevel(logging.WARNING)\n", - "logging.getLogger(\"urllib3\").setLevel(logging.WARNING)\n", - "jobs = bg.BackgroundJobManager()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "2c4b3fc7", - "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "DEBUG:pipelines.settings:/home/mambauser/data\n", - "DEBUG:root:Starting job # 0 in a separate thread.\n", - "INFO:pipelines.base_pipe:Running eez_tiles pipeline\n", - "INFO:utils:File /home/mambauser/data/eez_tiles/World_EEZ_v11_20191118.zip already exists.\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "INFO:utils:Unzip Finish.\n", - "Allocating 8 GB of heap memory\n", - "[o] Wrote /home/mambauser/data/eez_tiles/World_EEZ_v11_20191118/eez_v11.shp\n", - "[o] Wrote /home/mambauser/data/eez_tiles/World_EEZ_v11_20191118/eez_v11.shx\n", - "[o] Wrote /home/mambauser/data/eez_tiles/World_EEZ_v11_20191118/eez_v11.dbf\n", - "[o] Wrote /home/mambauser/data/eez_tiles/World_EEZ_v11_20191118/eez_v11.prj\n", - "INFO:mapbox_uploader:Uploading to Mapbox...\n", - "INFO:mapbox_uploader:Uploading to S3...\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Completed 69.8 MiB/69.8 MiB (7.8 MiB/s) with 1 file(s) remaining \r" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "INFO:mapbox_uploader:CompletedProcess(args='aws s3 cp /home/mambauser/data/eez_tiles/World_EEZ_v11_20191118/eez_v11.mbtiles s3://tilestream-tilesets-production/53/_pending/h75tk8ezapo2qst37s0l04olc/skytruth --region us-east-1', returncode=0)\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "upload: ../../data/eez_tiles/World_EEZ_v11_20191118/eez_v11.mbtiles to s3://tilestream-tilesets-production/53/_pending/h75tk8ezapo2qst37s0l04olc/skytruth\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "100%|██████████| 100/100 [01:48<00:00, 1.08s/it]\n", - "INFO:mapbox_uploader:True\n" - ] - } - ], - "source": [ - "# this code will execute all pipelines selected in background.\n", - "mypipes_subset = {k: v for k, v in get_pipes().items() if k in ['eez_tiles']}\n", - "for name, pipe in mypipes_subset.items():\n", - " jobs.new(pipe().execute)" - ] - }, - { - "cell_type": "markdown", - "id": "0e1a542b", - "metadata": {}, - "source": [] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3 (ipykernel)", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.10.12" - } - }, - "nbformat": 4, - "nbformat_minor": 5 -} diff --git a/data/src/pipelines/base_pipe.py b/data/src/pipelines/base_pipe.py index 078a7788..013e8c79 100644 --- a/data/src/pipelines/base_pipe.py +++ b/data/src/pipelines/base_pipe.py @@ -1,73 +1,92 @@ from abc import ABC, abstractmethod, ABCMeta from dataclasses import dataclass -from pipelines.settings import Settings -from mapbox_uploader import uploadToMapbox -from pathlib import Path -from functools import lru_cache -from typing import Optional, TypeVar, Union, List from logging import getLogger +from pathlib import Path +from typing import Literal, Optional, TypeVar, Union, List +from datetime import datetime # import requests -# import time +from pipelines.settings import Settings, get_settings +from pipelines.utils import watch +from mapbox_uploader import uploadToMapbox +from utils import writeReadGCP -logger = getLogger(__name__) +# from strapi import Strapi -@lru_cache() -def get_settings(): - return Settings() +logger = getLogger(__name__) + +LOGGER_LVLS = { + "pending": logger.info, + "running": logger.info, + "finish": logger.info, + "dead": logger.error, +} @dataclass -class DownloadParams: - url: str - output_name: Optional[Path] = None +class ExtractParams: + source: str | Path output_path: Optional[Path] = None + output_name: Optional[str] = None body: Optional[dict] = None params: Optional[dict] = None - headers: Optional[dict] = None + headers: Optional[dict | List[dict]] = None @dataclass class TransformParams: - input_path: Optional[Path] = None + input_path: Optional[Path | List[Path]] = None files: Optional[Union[str, List[str]]] = None columns: Optional[Union[str, List[str]]] = None + rename: Optional[dict] = None @dataclass -class OutputParams: - input_path: Optional[Path] = None +class LoadParams: + input_path: Optional[Path | List[Path]] = None destination_name: Optional[str] = None +@dataclass +class Status: + status: Union[None, Literal["pending", "running", "finish", "dead"]] = None + step: Union[None, Literal["download", "transform", "load"]] = None + message: Union[None, str] = None + timestamp: Union[None, str] = None + + _Self = TypeVar("_Self", bound="BasePipe") # TODO: add a notification system in the pipeline class BasePipe(ABC): - settings = get_settings() + """Base class for all pipelines""" + + settings: Settings = get_settings() + status: Status = Status() force_clean: bool = False + depends_on: Union[None, List[str]] = None @property @abstractmethod - def pipeline_name(self: _Self) -> str: + def extract_params(self: _Self) -> ExtractParams | List[ExtractParams]: pass @property @abstractmethod - def download_params(self: _Self) -> DownloadParams: + def transform_params(self: _Self) -> TransformParams: pass @property @abstractmethod - def transform_params(self: _Self) -> TransformParams: + def load_params(self: _Self) -> LoadParams: pass @property @abstractmethod - def output_params(self: _Self) -> OutputParams: + def pipeline_name(self: _Self) -> str: pass @abstractmethod @@ -84,34 +103,90 @@ def load(self: _Self) -> _Self: def execute(self) -> None: """Run the pipeline""" - try: - logger.info(f"Running {self.pipeline_name} pipeline") - self.extract().transform().load() - # self.__notify(f"Pipeline {self.pipeline_name} finished") + self.extract().transform().load() + return self - except ValueError as e: - # self.__notify(f"Pipeline {self.pipeline_name} failed at {time.now()}") - raise e + def set_status(self, status: str, message: str) -> None: + self.status.status = status + self.status.message = message + self.status.timestamp = str(datetime.now()) + log = f"Pipeline {self.pipeline_name} {status} at {self.status.timestamp}: {message}" + LOGGER_LVLS[status](log) - def __notify(self, message: str) -> None: + def notify(self, message: str) -> None: # try: - # headers = { "Markdown": "yes", "Priority": "5", "Authorization": "Bearer {}".format(self.settings.notification_token)} - # r = requests.post(self.settings.notification_url, - # data=message.encode("utf-8"), - # headers=headers)) - # r.raise_for_status() + # headers = { "Markdown": "yes", "Priority": "5", "Authorization": "Bearer {}".format(self.settings.notification_token)} + # r = requests.post(self.settings.notification_url, + # data=message.encode("utf-8"), + # headers=headers)) + # r.raise_for_status() # except Exception as e: # logger.error(e) # pass pass +class IntermediateBasePipe(BasePipe, metaclass=ABCMeta): + """ + This is a base class for intermediate pipelines. It is used to + generate intermediate files that are used in the tiles or in the preprocess + pipeline, that require a common source file. + + """ + + @watch + def load(self): + writeReadGCP( + credentials=self.settings.GCS_KEYFILE_JSON, + bucket_name=self.settings.GCS_BUCKET, + blob_name=self.load_params.destination_name, + file=self.load_params.input_path, + operation="w", + ) + return self + + class VTBasePipe(BasePipe, metaclass=ABCMeta): + """Base class for vector tiles pipelines. It gets the data prepared in the intermediate pipelines and generates the vector tiles. finally it uploads the vector tiles to mapbox.""" + + @watch + def extract(self: _Self) -> _Self: + self.__set_status("running", "Download from GCP") + writeReadGCP( + credentials=self.settings.GCS_KEYFILE_JSON, + bucket_name=self.settings.GCS_BUCKET, + blob_name=self.extract_params.source, + file=self.extract_params.output_path, + operation="r", + ) + return self + + @watch def load(self): uploadToMapbox( - self.output_params.input_path, - self.output_params.destination_name, + self.load_params.input_path, + self.load_params.destination_name, self.settings.MAPBOX_USER, self.settings.MAPBOX_TOKEN, ) return self + + +class PreprocessBasePipe(BasePipe, metaclass=ABCMeta): + """This is a base class for preprocess pipelines, those that generates the statistics that uses the platform. It gets the data prepared in the intermediate pipelines and generates the statistics.""" + + @watch + def extract(self): + writeReadGCP( + credentials=self.settings.GCS_KEYFILE_JSON, + bucket_name=self.settings.GCS_BUCKET, + blob_name=self.extract_params.source, + file=self.extract_params.output_path, + operation="r", + ) + return self + + @watch + def load(self): + # Strapi() + return self diff --git a/data/src/pipelines/intermediate_pipes/EEZIntermediatePipe.py b/data/src/pipelines/intermediate_pipes/EEZIntermediatePipe.py new file mode 100644 index 00000000..c9a2f65a --- /dev/null +++ b/data/src/pipelines/intermediate_pipes/EEZIntermediatePipe.py @@ -0,0 +1,178 @@ +from logging import getLogger +import shutil +from pathlib import Path +import geopandas as gpd + +from pipelines.base_pipe import ( + IntermediateBasePipe, + ExtractParams, + TransformParams, + LoadParams, +) +from pipelines.utils import watch +from utils import downloadFile, rm_tree + + +logger = getLogger(__name__) + + +class EEZIntermediatePipe(IntermediateBasePipe): + pipeline_name = "eez_intermediate" + extract_params = [ + ExtractParams( + source="https://www.marineregions.org/download_file.php", + params={"name": "World_EEZ_v11_20191118.zip"}, + headers={ + "content-type": "application/x-www-form-urlencoded", + "cookie": "PHPSESSID=29190501b4503e4b33725cd6bd01e2c6; vliz_webc=vliz_webc2; jwplayer.captionLabel=Off", + "dnt": "1", + "origin": "https://www.marineregions.org", + "sec-fetch-dest": "document", + "sec-fetch-mode": "navigate", + "sec-fetch-site": "same-origin", + "sec-fetch-user": "?1", + "upgrade-insecure-requests": "1", + }, + body={ + "name": "Jason", + "organisation": "skytruth", + "email": "hello@skytruth.com", + "country": "Spain", + "user_category": "academia", + "purpose_category": "Conservation", + "agree": "1", + }, + ), + ExtractParams( + source="https://www.marineregions.org/download_file.php", + params={"name": "World_High_Seas_v1_20200826.zip"}, + headers={ + "content-type": "application/x-www-form-urlencoded", + "cookie": "PHPSESSID=29190501b4503e4b33725cd6bd01e2c6; vliz_webc=vliz_webc2; jwplayer.captionLabel=Off", + "dnt": "1", + "origin": "https://www.marineregions.org", + "sec-fetch-dest": "document", + "sec-fetch-mode": "navigate", + "sec-fetch-site": "same-origin", + "sec-fetch-user": "?1", + "upgrade-insecure-requests": "1", + }, + body={ + "name": "Jason", + "organisation": "skytruth", + "email": "hello@skytruth.com", + "country": "Spain", + "user_category": "academia", + "purpose_category": "Conservation", + "agree": "1", + }, + ), + ] + transform_params = TransformParams( + files=["eez_v11.shp", "High_Seas_v1.shp"], + columns=[ + "MRGID", + "GEONAME", + "POL_TYPE", + "ISO_SOV1", + "ISO_SOV2", + "ISO_SOV3", + "AREA_KM2", + ], + rename={"name": "GEONAME", "area_km2": "AREA_KM2", "mrgid": "MRGID"}, + ) + load_params = LoadParams(destination_name=f"eez/{pipeline_name}.zip") + + def __init__(self, force_clean: bool = False) -> None: + super().__init__() + self.folder_path = self.settings.DATA_DIR.joinpath(self.pipeline_name) + self.force_clean = force_clean + self.settings.validate_config() + + @watch + def extract(self): + instructions = self.extract_params + if not isinstance(instructions, list): + instructions = [instructions] + + output_paths = [] + for instruction in instructions: + output_paths.append( + downloadFile( + instruction.source, + self.folder_path, + instruction.body, + instruction.params, + instruction.headers, + overwrite=self.force_clean, + ) + ) + + self.transform_params.input_path = output_paths + return self + + @watch + def transform(self): + self.load_params.input_path = Path( + f"{self.folder_path}/{self.pipeline_name}.zip" + ) + + if not self.force_clean and self.load_params.input_path.exists(): + return self + + # unzip file if needed & load data + unziped_folders = [] + for idx, path in enumerate(self.transform_params.input_path): + unziped_folder = self.folder_path.joinpath(path.stem) + + if self.force_clean: + rm_tree(unziped_folder) + + shutil.unpack_archive( + path, self.folder_path if idx == 0 else unziped_folder + ) + + unziped_folders.append( + gpd.read_file(unziped_folder.joinpath(self.transform_params.files[idx])) + ) + + # Transform data using geopandas + unziped_folders[1] = ( + unziped_folders[1] + .rename( + columns=self.transform_params.rename, + ) + .assign( + POL_TYPE="High Seas", + ISO_SOV1="ABNJ", + ) + ) + + # merge datasets + df = pd.concat(unziped_folders, ignore_index=True) + + df.drop( + columns=list( + set(df.columns) - set([*self.transform_params.columns, "geometry"]) + ), + inplace=True, + ) + + # save data + input_folder = self.load_params.input_path.parent.joinpath( + self.load_params.input_path.stem + ) + + gpd.GeoDataFrame( + df, + crs=unziped_folders[0].crs, + ).to_file(filename=input_folder, driver="ESRI Shapefile") + + shutil.make_archive(input_folder, "zip") + + # clean unzipped files + rm_tree(input_folder) + for folder in self.transform_params.input_path: + rm_tree(self.folder_path.joinpath(folder.stem)) + + return self diff --git a/data/src/pipelines/intermediate_pipes/__init__.py b/data/src/pipelines/intermediate_pipes/__init__.py new file mode 100644 index 00000000..12ca8da9 --- /dev/null +++ b/data/src/pipelines/intermediate_pipes/__init__.py @@ -0,0 +1,16 @@ +from inspect import isclass +from pkgutil import iter_modules +from pathlib import Path +from importlib import import_module + +# iterate through the modules in the current package +package_dir = Path(__file__).resolve().parent.as_posix() +for _, module_name, _ in iter_modules([package_dir]): + # import the module and iterate through its attributes + module = import_module(f"{__name__}.{module_name}") + for attribute_name in dir(module): + attribute = getattr(module, attribute_name) + + if isclass(attribute): + # Add the class to this package's variables + globals()[attribute_name] = attribute diff --git a/data/src/pipelines/pipes.py b/data/src/pipelines/pipes.py index e6c0be03..044ac810 100644 --- a/data/src/pipelines/pipes.py +++ b/data/src/pipelines/pipes.py @@ -1,12 +1,20 @@ import inspect +from typing import List from functools import lru_cache -from pipelines.base_pipe import BasePipe, VTBasePipe +from pipelines.base_pipe import PreprocessBasePipe, VTBasePipe, IntermediateBasePipe from pipelines import tiles_pipes from pipelines import precalc_pipes +from pipelines import intermediate_pipes @lru_cache def get_pipes() -> dict: + i_p = { + cls.pipeline_name: cls + for _, cls in inspect.getmembers(intermediate_pipes, inspect.isclass) + if issubclass(cls, IntermediateBasePipe) and cls != IntermediateBasePipe + } + t_p = { cls.pipeline_name: cls for _, cls in inspect.getmembers(tiles_pipes, inspect.isclass) @@ -16,10 +24,10 @@ def get_pipes() -> dict: stat_p = { cls.pipeline_name: cls for _, cls in inspect.getmembers(precalc_pipes, inspect.isclass) - if issubclass(cls, BasePipe) and cls != BasePipe + if issubclass(cls, PreprocessBasePipe) and cls != PreprocessBasePipe } - return {**t_p, **stat_p} + return {**i_p, **t_p, **stat_p} def get_pipes_names(): @@ -28,3 +36,46 @@ def get_pipes_names(): def get_pipe_by_name(name): return get_pipes()[name]() + + +def filter_pipes(pipes: dict, filter: List[str]): + return {k: v for k, v in pipes.items() if k in filter} + + +def execution_order(pipes: dict): + """ + Orders a list of basePipe classes or subclasses based on their dependencies. + """ + + # Create a dictionary to hold the dependencies of each pipe + dependencies = {pipe.pipeline_name: set(pipe.depends_on) for pipe in pipes.values()} + + # Create a set to hold the pipes that have no dependencies + no_deps = set(pipes.keys()) - set(dependencies.keys()) + + # Create a list to hold the ordered pipes + ordered_pipes = [] + + # Loop until all pipes have been ordered + while no_deps: + # Pop a pipe with no dependencies from the set + pipe_name = no_deps.pop() + pipe = pipes[pipe_name] + + # Add the pipe to the ordered list + ordered_pipes.append(pipe) + + # Loop through the dependencies of the pipe + for dep in dependencies.get(pipe_name, set()): + # Remove the dependency from the set + dependencies[pipe_name].remove(dep) + + # If the dependency has no more dependencies, add it to the set + if not dependencies.get(dep, set()): + no_deps.add(dep) + + # If there are still dependencies left, raise an error + if any(dependencies.values()): + raise ValueError("Circular dependency detected") + + return ordered_pipes diff --git a/data/src/pipelines/precalc_pipes.py b/data/src/pipelines/precalc_pipes.py deleted file mode 100644 index b8efea86..00000000 --- a/data/src/pipelines/precalc_pipes.py +++ /dev/null @@ -1,7 +0,0 @@ -from pipelines.base_pipe import ( - BasePipe, - DownloadParams, - TransformParams, - OutputParams, - get_settings, -) diff --git a/data/src/pipelines/precalc_pipes/EEZPrecalcPipe.py b/data/src/pipelines/precalc_pipes/EEZPrecalcPipe.py new file mode 100644 index 00000000..009d84f6 --- /dev/null +++ b/data/src/pipelines/precalc_pipes/EEZPrecalcPipe.py @@ -0,0 +1,27 @@ +from logging import getLogger +from pipelines.base_pipe import ( + PreprocessBasePipe, + ExtractParams, + TransformParams, + LoadParams, +) + + +logger = getLogger(__name__) + + +class EEZPrecalcPipe(PreprocessBasePipe): + pipeline_name = "eez_precalc" + depends_on = ["eez_intermediate"] + extract_params = ExtractParams( + source="eez/eez_intermediate.zip", + output_path="data/eez_intermediate", + ) + transform_params = TransformParams( + files="eez_v11.shp", + columns=["GEONAME", "POL_TYPE", "ISO_SOV1", "ISO_SOV2", "ISO_SOV3"], + ) + load_params = LoadParams(destination_name="locations") + + def transform(self): + return self diff --git a/data/src/pipelines/precalc_pipes/__init__.py b/data/src/pipelines/precalc_pipes/__init__.py new file mode 100644 index 00000000..12ca8da9 --- /dev/null +++ b/data/src/pipelines/precalc_pipes/__init__.py @@ -0,0 +1,16 @@ +from inspect import isclass +from pkgutil import iter_modules +from pathlib import Path +from importlib import import_module + +# iterate through the modules in the current package +package_dir = Path(__file__).resolve().parent.as_posix() +for _, module_name, _ in iter_modules([package_dir]): + # import the module and iterate through its attributes + module = import_module(f"{__name__}.{module_name}") + for attribute_name in dir(module): + attribute = getattr(module, attribute_name) + + if isclass(attribute): + # Add the class to this package's variables + globals()[attribute_name] = attribute diff --git a/data/src/pipelines/settings.py b/data/src/pipelines/settings.py index c3960158..65ed29bb 100644 --- a/data/src/pipelines/settings.py +++ b/data/src/pipelines/settings.py @@ -1,14 +1,27 @@ import logging +from functools import lru_cache +from typing import Any, Dict from pydantic_settings import BaseSettings, SettingsConfigDict from pathlib import Path logger = logging.getLogger(__name__) +@lru_cache() +def get_settings(): + return Settings() + + class Settings(BaseSettings): + DATA_DIR: Path MAPBOX_USER: str MAPBOX_TOKEN: str - DATA_DIR: Path + STRAPI_URL: str + STRAPI_JWT: str + NOTIFY_URL: str + NOTIFY_JWT: str + GCS_BUCKET: str + GCS_KEYFILE_JSON: Dict[str, Any] # model_config = SettingsConfigDict(env_file=".env") diff --git a/data/src/pipelines/tiles_pipes.py b/data/src/pipelines/tiles_pipes.py deleted file mode 100644 index 5951c09a..00000000 --- a/data/src/pipelines/tiles_pipes.py +++ /dev/null @@ -1,134 +0,0 @@ -from pipelines.base_pipe import ( - VTBasePipe, - DownloadParams, - TransformParams, - OutputParams, -) - -from utils import downloadFile, unzipFile -from tippcanoe import mbtileGeneration -from mapshaper import Mapshaper -import logging - -logger = logging.getLogger(__name__) - - -class EEZTilesPipe(VTBasePipe): - pipeline_name = "eez_tiles" - download_params = DownloadParams( - url="https://www.marineregions.org/download_file.php", - output_name="eez_tiles.zip", - params={"name": "World_EEZ_v11_20191118.zip"}, - headers={ - "content-type": "application/x-www-form-urlencoded", - "cookie": "PHPSESSID=29190501b4503e4b33725cd6bd01e2c6; vliz_webc=vliz_webc2; jwplayer.captionLabel=Off", - "dnt": "1", - "origin": "https://www.marineregions.org", - "sec-fetch-dest": "document", - "sec-fetch-mode": "navigate", - "sec-fetch-site": "same-origin", - "sec-fetch-user": "?1", - "upgrade-insecure-requests": "1", - }, - body={ - "name": "Jason", - "organisation": "skytruth", - "email": "hello@skytruth.com", - "country": "Spain", - "user_category": "academia", - "purpose_category": "Conservation", - "agree": "1", - }, - ) - transform_params = TransformParams( - files="eez_v11.shp", - columns=["GEONAME", "POL_TYPE", "ISO_SOV1", "ISO_SOV2", "ISO_SOV3"], - ) - output_params = OutputParams(destination_name="eez_v11-4pg8or") - - def __init__(self, force_clean: bool = False) -> None: - super().__init__() - self.folder_path = self.settings.DATA_DIR.joinpath(self.pipeline_name) - self.force_clean = force_clean - self.settings.validate_config() - - def extract(self): - self.transform_params.input_path = downloadFile( - self.download_params.url, - self.folder_path, - self.download_params.body, - self.download_params.params, - overwrite=self.force_clean, - ) - return self - - def transform(self): - unziped_folder = unzipFile(self.transform_params.input_path) - - file = unziped_folder.joinpath(self.transform_params.files) - keep_fields = ( - ",".join(self.transform_params.columns) - if isinstance(self.transform_params.columns, list) - else self.transform_params.columns - ) - - Mapshaper(8).input([file.as_posix()]).filter_fields(fields=keep_fields).output( - file.as_posix(), force=True - ).execute() - - self.output_params.input_path = mbtileGeneration(file) - return self - - -class MPATilesPipe(VTBasePipe): - pipeline_name = "mpa_tiles" - download_params = DownloadParams( - url="https://www.marineregions.org/download_file.php", - output_name="eez_tiles.zip", - params={"name": "World_EEZ_v11_20191118.zip"}, - headers={ - "content-type": "application/x-www-form-urlencoded", - "cookie": "PHPSESSID=29190501b4503e4b33725cd6bd01e2c6; vliz_webc=vliz_webc2; jwplayer.captionLabel=Off", - "dnt": "1", - "origin": "https://www.marineregions.org", - "sec-fetch-dest": "document", - "sec-fetch-mode": "navigate", - "sec-fetch-site": "same-origin", - "sec-fetch-user": "?1", - "upgrade-insecure-requests": "1", - }, - body={ - "name": "Jason", - "organisation": "skytruth", - "email": "hello@skytruth.com", - "country": "Spain", - "user_category": "academia", - "purpose_category": "Conservation", - "agree": "1", - }, - ) - transform_params = TransformParams(files="eez_v11.shp") - output_params = OutputParams(destination_name="eez_tiles.mbtiles") - - def __init__(self) -> None: - super().__init__() - self.folder_path = self.settings.DATA_DIR.joinpath(self.pipeline_name) - self.settings.validate_config() - - def extract(self): - self.transform_params.input_path = downloadFile( - self.download_params.url, - self.folder_path, - self.download_params.body, - self.download_params.params, - ) - return self - - def transform(self): - unziped_folder = unzipFile(self.transform_params.input_path) - - logger.info(unziped_folder.joinpath(self.transform_params.files)) - self.output_params.input_path = mbtileGeneration( - unziped_folder.joinpath(self.transform_params.files) - ) - return self diff --git a/data/src/pipelines/tiles_pipes/EEZTilesPipe.py b/data/src/pipelines/tiles_pipes/EEZTilesPipe.py new file mode 100644 index 00000000..894eff63 --- /dev/null +++ b/data/src/pipelines/tiles_pipes/EEZTilesPipe.py @@ -0,0 +1,50 @@ +from pipelines.base_pipe import ( + VTBasePipe, + ExtractParams, + TransformParams, + LoadParams, +) + +from shutil import unpack_archive +from tippcanoe import mbtileGeneration +from mapshaper import Mapshaper +from logging import getLogger + +logger = getLogger(__name__) + + +class EEZTilesPipe(VTBasePipe): + pipeline_name = "eez_tiles" + depends_on = ["eez_intermediate"] + extract_params = ExtractParams( + source="eez/eez_intermediate.zip", + output_path="data/eez_intermediate", + ) + transform_params = TransformParams( + files="eez_intermediate.shp", + columns=["GEONAME", "POL_TYPE", "ISO_SOV1", "ISO_SOV2", "ISO_SOV3"], + ) + load_params = LoadParams(destination_name="eez_v11-4pg8or") + + def __init__(self, force_clean: bool = False) -> None: + super().__init__() + self.folder_path = self.settings.DATA_DIR.joinpath(self.pipeline_name) + self.force_clean = force_clean + self.settings.validate_config() + + def transform(self): + unziped_folder = unpack_archive(self.transform_params.input_path) + + file = unziped_folder.joinpath(self.transform_params.files) + keep_fields = ( + ",".join(self.transform_params.columns) + if isinstance(self.transform_params.columns, list) + else self.transform_params.columns + ) + + Mapshaper(8).input([file.as_posix()]).filter_fields(fields=keep_fields).output( + file.as_posix(), force=True + ).execute() + + self.output_params.input_path = mbtileGeneration(file) + return self diff --git a/data/src/pipelines/tiles_pipes/__init__.py b/data/src/pipelines/tiles_pipes/__init__.py new file mode 100644 index 00000000..12ca8da9 --- /dev/null +++ b/data/src/pipelines/tiles_pipes/__init__.py @@ -0,0 +1,16 @@ +from inspect import isclass +from pkgutil import iter_modules +from pathlib import Path +from importlib import import_module + +# iterate through the modules in the current package +package_dir = Path(__file__).resolve().parent.as_posix() +for _, module_name, _ in iter_modules([package_dir]): + # import the module and iterate through its attributes + module = import_module(f"{__name__}.{module_name}") + for attribute_name in dir(module): + attribute = getattr(module, attribute_name) + + if isclass(attribute): + # Add the class to this package's variables + globals()[attribute_name] = attribute diff --git a/data/src/pipelines/utils.py b/data/src/pipelines/utils.py new file mode 100644 index 00000000..51c05a4e --- /dev/null +++ b/data/src/pipelines/utils.py @@ -0,0 +1,17 @@ +import traceback + + +def watch(func): + def check(self, *args, **kwargs): + try: + self.status.step = func.__name__ + self.set_status("running", f"starting {func.__name__}...") + func(self, *args, **kwargs) + self.set_status("finish", f"Success executing {func.__name__}") + except Exception as e: + self.set_status("dead", traceback.format_exc()) + raise e + finally: + return self + + return check diff --git a/data/src/utils.py b/data/src/utils.py index 98e8cb89..7a2b098c 100644 --- a/data/src/utils.py +++ b/data/src/utils.py @@ -1,9 +1,10 @@ -import zipfile import os import requests from logging import getLogger from pathlib import Path -from typing import Union +from typing import Literal, Union +from google.cloud import storage +from google.oauth2 import service_account logger = getLogger(__name__) @@ -77,22 +78,32 @@ def downloadFile( raise e -def unzipFile(source_path: Path, output_path: Union[Path, None] = None) -> Path: - """unzip a file +def writeReadGCP( + credentials: str, + bucket_name: str, + blob_name: str, + file: Path, + operation: Literal["w", "r"] = "w", +): + """Write or read a blob from GCS using file-like IO Args: - source_path (Path): The path to the file. - output_path (Union[Path, None], optional): The path to the output file. The default is None. - - Returns: - Path: The path to the output file. + bucket_name (str): The name of the bucket. + blob_name (str): The name of the blob. + file (io.BytesIO): The file-like object to write or read. + operation (Literal["w", "r"], optional): The operation to perform. "w" for write, "r" for read. Defaults to "w". """ - if not output_path: - output_path = source_path.parent - - with zipfile.ZipFile(source_path, "r") as zip_ref: - zip_ref.extractall(output_path) - - logger.info("Unzip Finish.") - - return output_path.joinpath(source_path.stem) + storage_client = storage.Client( + credentials=service_account.Credentials.from_service_account_info(credentials) + ) + bucket = storage_client.bucket(bucket_name) + blob = bucket.blob(blob_name) + + if operation == "w": + with open(file, "rb") as f: + blob.upload_from_file(f) + elif operation == "r": + with open(file, "wb") as f: + blob.download_to_file(f) + else: + raise ValueError("operation must be 'w' or 'r'")