From afe214b1921dbcfce0a3c5ff851a23dd0677e681 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Antonio=20Manuel=20Burgue=C3=B1o=20Romero?=
Date: Wed, 3 Jul 2024 11:39:19 +0000
Subject: [PATCH 1/8] Seasons are now defined in a file referenced by an environment variable. Any number of seasons can now be used

---
 .env.template                               |  9 ++---
 example_inputs/seasons_file/one_season.json |  6 ++++
 example_inputs/seasons_file/seasons.json    | 14 ++++++++
 src/landcoverpy/config.py                   |  9 ++---
 src/landcoverpy/model_training.py           | 28 +++------------
 src/landcoverpy/utilities/utils.py          | 21 +++++------
 src/landcoverpy/workflow.py                 | 40 +++++++--------------
 7 files changed, 51 insertions(+), 76 deletions(-)
 create mode 100644 example_inputs/seasons_file/one_season.json
 create mode 100644 example_inputs/seasons_file/seasons.json

diff --git a/.env.template b/.env.template
index 81ebb2e..22ce6e9 100644
--- a/.env.template
+++ b/.env.template
@@ -49,13 +49,8 @@ SL_LABELS_FILE=
 # For running in a distributed environment
 DASK_CLUSTER_IP=
 
-# Dates used for each season
-SPRING_START=
-SPRING_END=
-SUMMER_START=
-SUMMER_END=
-AUTUMN_START=
-AUTUMN_END=
+# File containing the definition of the seasons
+SEASONS_FILE=
 
 # Product filtering parameters
 MAX_CLOUD=
diff --git a/example_inputs/seasons_file/one_season.json b/example_inputs/seasons_file/one_season.json
new file mode 100644
index 0000000..4787ffa
--- /dev/null
+++ b/example_inputs/seasons_file/one_season.json
@@ -0,0 +1,6 @@
+{
+    "april": {
+        "start": "2021-04-01",
+        "end": "2021-04-30"
+    }
+}
\ No newline at end of file
diff --git a/example_inputs/seasons_file/seasons.json b/example_inputs/seasons_file/seasons.json
new file mode 100644
index 0000000..11b8579
--- /dev/null
+++ b/example_inputs/seasons_file/seasons.json
@@ -0,0 +1,14 @@
+{
+    "spring": {
+        "start": "2021-03-01",
+        "end": "2021-04-30"
+    },
+    "summer": {
+        "start": "2021-06-01",
+        "end": "2021-07-31"
+    },
+    "autumn": {
+        "start": "2021-10-01",
+        "end": "2021-11-30"
+    }
+}
\ No newline at end of file
diff --git a/src/landcoverpy/config.py b/src/landcoverpy/config.py
index 909be9b..832e704 100644
--- a/src/landcoverpy/config.py
+++ b/src/landcoverpy/config.py
@@ -52,13 +52,8 @@ class _Settings(BaseSettings):
     # For running in a distributed environment
     DASK_CLUSTER_IP: str = "0.0.0.0.0:0000"
 
-    # Dates used for each season
-    SPRING_START: str = "2001-12-31"
-    SPRING_END: str = "2001-12-31"
-    SUMMER_START: str = "2001-12-31"
-    SUMMER_END: str = "2001-12-31"
-    AUTUMN_START: str = "2001-12-31"
-    AUTUMN_END: str = "2001-12-31"
+    # File containing the definition of the seasons
+    SEASONS_FILE: str = "/seasons.json"
 
     # Product filtering parameters
     MAX_CLOUD: float = 1.0
diff --git a/src/landcoverpy/model_training.py b/src/landcoverpy/model_training.py
index 3e0c1d1..ed09113 100644
--- a/src/landcoverpy/model_training.py
+++ b/src/landcoverpy/model_training.py
@@ -58,17 +58,8 @@ def train_model_land_cover(n_jobs: int = 2):
     print(f"Training land-cover model using {len(df)} samples with unique {df[settings.LC_PROPERTY].unique()} targets")
 
     y_train_data = df[settings.LC_PROPERTY]
-    x_train_data = df.drop(
-        [
-            settings.LC_PROPERTY,
-            "latitude",
-            "longitude",
-            "spring_product_name",
-            "autumn_product_name",
-            "summer_product_name",
-        ],
-        axis=1,
-    )
+    not_training_data_columns = [col for col in df.columns if "product_name" in col] + [settings.LC_PROPERTY, "latitude", "longitude"]
+    x_train_data = df.drop(not_training_data_columns, axis=1)
 
    used_columns = _feature_reduction(x_train_data, y_train_data)

@@ -174,18 
+165,9 @@ def train_second_level_models(lc_classes: List[str], n_jobs: int = 2, n_trees: i print(f"Training {lc_class} model using {len(df_class)} samples with unique {df_class[settings.SL_PROPERTY].unique()} targets") y_train_data = df_class[settings.SL_PROPERTY] - x_train_data = df_class.drop( - [ - settings.LC_PROPERTY, - settings.SL_PROPERTY, - "latitude", - "longitude", - "spring_product_name", - "autumn_product_name", - "summer_product_name", - ], - axis=1, - ) + + not_training_data_columns = [col for col in df_class.columns if "product_name" in col] + [settings.LC_PROPERTY, settings.SL_PROPERTY, "latitude", "longitude"] + x_train_data = df_class.drop(not_training_data_columns, axis=1) used_columns = _feature_reduction(x_train_data, y_train_data) diff --git a/src/landcoverpy/utilities/utils.py b/src/landcoverpy/utilities/utils.py index 9cd7281..f20a880 100644 --- a/src/landcoverpy/utilities/utils.py +++ b/src/landcoverpy/utilities/utils.py @@ -1,3 +1,4 @@ +import json from datetime import datetime from itertools import compress from os.path import join @@ -24,20 +25,16 @@ def get_season_dict(): - spring_start = datetime.strptime(settings.SPRING_START, '%Y-%m-%d') - spring_end = datetime.strptime(settings.SPRING_END, '%Y-%m-%d') - summer_start = datetime.strptime(settings.SUMMER_START, '%Y-%m-%d') - summer_end = datetime.strptime(settings.SUMMER_END, '%Y-%m-%d') - autumn_start = datetime.strptime(settings.AUTUMN_START, '%Y-%m-%d') - autumn_end = datetime.strptime(settings.AUTUMN_END, '%Y-%m-%d') + + with open(settings.SEASONS_FILE) as f: + seasons = json.load(f) + + seasons_out = {} - seasons = { - "spring" : (spring_start, spring_end), - "summer" : (summer_start, summer_end), - "autumn" : (autumn_start, autumn_end) - } + for season in seasons: + seasons_out[season] = (datetime.strptime(seasons[season]["start"], '%Y-%m-%d'), datetime.strptime(seasons[season]["end"], '%Y-%m-%d')) - return seasons + return seasons_out def get_products_by_tile_and_date( tile: str, diff --git a/src/landcoverpy/workflow.py b/src/landcoverpy/workflow.py index 6188aa6..4bb728a 100644 --- a/src/landcoverpy/workflow.py +++ b/src/landcoverpy/workflow.py @@ -206,33 +206,19 @@ def _process_tile(tile, execution_mode, polygons_in_tile, used_columns=None): # Mongo query for obtaining valid products max_cloud_percentage = settings.MAX_CLOUD - spring_start, spring_end = seasons["spring"] - product_metadata_cursor_spring = get_products_by_tile_and_date( - tile, mongo_products_collection, spring_start, spring_end, max_cloud_percentage - ) - - summer_start, summer_end = seasons["summer"] - product_metadata_cursor_summer = get_products_by_tile_and_date( - tile, mongo_products_collection, summer_start, summer_end, max_cloud_percentage - ) - - autumn_start, autumn_end = seasons["autumn"] - product_metadata_cursor_autumn = get_products_by_tile_and_date( - tile, mongo_products_collection, autumn_start, autumn_end, max_cloud_percentage - ) - - product_per_season = { - "spring": list(product_metadata_cursor_spring)[-settings.MAX_PRODUCTS_COMPOSITE:], - "autumn": list(product_metadata_cursor_autumn)[:settings.MAX_PRODUCTS_COMPOSITE], - "summer": list(product_metadata_cursor_summer)[-settings.MAX_PRODUCTS_COMPOSITE:], - } - - if ( - len(product_per_season["spring"]) == 0 - or len(product_per_season["autumn"]) == 0 - or len(product_per_season["summer"]) == 0 - ): - raise NoSentinelException(f"There is no valid Sentinel products for tile {tile}. 
Skipping it...") + product_per_season = {} + + for season in seasons: + season_start, season_end = seasons[season] + product_metadata_cursor = get_products_by_tile_and_date( + tile, mongo_products_collection, season_start, season_end, max_cloud_percentage + ) + + # If there are more products than the maximum specified for creating a composite, take the last ones + product_per_season[season] = list(product_metadata_cursor)[-settings.MAX_PRODUCTS_COMPOSITE:] + + if len(product_per_season[season]) == 0: + raise NoSentinelException(f"There is no valid Sentinel products for tile {tile} in season {season}. Skipping it...") # Dataframe for storing data of a tile tile_df = None From 5f6c2a616d9497b08484c76cd968f0abb93c028c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Antonio=20Manuel=20Burgue=C3=B1o=20Romero?= Date: Wed, 3 Jul 2024 13:12:49 +0000 Subject: [PATCH 2/8] Deleted deprecated files --- notebooks/explain_model.ipynb | 516 ------------------------ notebooks/mean_shap_values_matrix.ipynb | 120 ------ scripts/execution_times.py | 4 +- scripts/jeffreys_matusita.py | 138 ------- 4 files changed, 3 insertions(+), 775 deletions(-) delete mode 100644 notebooks/explain_model.ipynb delete mode 100644 notebooks/mean_shap_values_matrix.ipynb delete mode 100644 scripts/jeffreys_matusita.py diff --git a/notebooks/explain_model.ipynb b/notebooks/explain_model.ipynb deleted file mode 100644 index 06e07d7..0000000 --- a/notebooks/explain_model.ipynb +++ /dev/null @@ -1,516 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import numpy as np\n", - "import pandas as pd\n", - "import shap\n", - "import os\n", - "from sklearn.model_selection import train_test_split\n", - "from sklearn.ensemble import RandomForestClassifier\n", - "from sklearn import metrics\n", - "from joblib import load\n", - "import matplotlib.pyplot as pl\n", - "import pickle\n", - "from landcoverpy.utilities.confusion_matrix import compute_confusion_matrix\n", - "\n" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Data preprocessing" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "data = pd.read_csv('dataset_postprocessed.csv')\n", - "data.dropna()\n", - "data = data[~data.isin([np.nan, np.inf, -np.inf]).any(1)]" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "y_train_data = data[\"class\"] \n", - "x_train_data = data.drop([\"class\", \"latitude\", \"longitude\", \"spring_product_name\", \"autumn_product_name\", \"summer_product_name\"], axis=1)\n", - "pc_columns = x_train_data.columns\n", - "\n", - "reduced_x_train_data = data[pc_columns]\n", - "reduced_x_train_data.columns" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "X_train, X_test, y_train, y_test = train_test_split(reduced_x_train_data, y_train_data, test_size=0.15, random_state=0,)\n", - "labels=y_train_data.unique()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "model = load('model.joblib')\n", - "y_true = model.predict(X_test)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "pred = pd.DataFrame(y_true).reset_index(drop=True, inplace=False)\n", - "real = y_test.reset_index(drop=True, inplace=False)\n", - "test = 
pd.DataFrame(X_test).reset_index(drop=True, inplace=False)\n", - "train = pd.DataFrame(X_train).reset_index(drop=True, inplace=False)" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Explainability with SHAP\n", - "\n", - "All dataset is found in *reduced_x_train_data*. The name has not been changed to not modify the previus code." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "explainer = shap.TreeExplainer(model)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "choosen_instance = reduced_x_train_data\n", - "shap_values = explainer.shap_values(choosen_instance)\n", - "shap.initjs()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "with open(\"matriz.pckl\",\"wb\") as f:\n", - " pickle.dump(shap_values, f)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "#to import the matrix again\n", - "#shap_values = pickle.load(open('matriz.pckl', 'rb'))" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Using all the features\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "labels" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "pretty_labels= ['Built Up', 'Water', 'Open Forest', 'Closed Forest', 'Bare Soil',\n", - " 'Cropland', 'Herbaceous Vegetation', 'Wetland', 'Shrubland']" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "num_features = len(list(reduced_x_train_data.columns.values))" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "def calculateColors(i):\n", - " # Water ClosedForest cropland openForest bareSoil HerbaceousVeg BuiltUp wetland shrubland\n", - " colors = [\"#fa0000\", \"#0032c8\", \"#648c00\", \"#007800\", \"#b4b4b4\", \"#f096ff\", \"#ffff4c\", \"#0096a0\", \"#ffbb22\"]\n", - " return colors[i]" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "shap.summary_plot(shap_values, list(reduced_x_train_data.columns.values), max_display =num_features, class_names=pretty_labels, color= calculateColors, class_inds=\"original\", show=False)\n", - "pl.savefig(\"all_features_DEF.png\")\n" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Dividing the features in 3 \n", - "\n", - "In this part we're going to divive the dataset in 3 halves. 
For this porpose the *train dataset* and the *shap_values* (the dimennsion corresponding to features) will be reduced to the halved " - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "half = int(np.round(num_features/3))\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "feature_order = np.argsort(np.sum(np.mean(np.abs(shap_values), axis=1), axis=0))\n", - "feature_order[half*2:]" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "#### Crop the shap_values" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "shap_values_array = np.array(shap_values)\n", - "cropped_first_shap_values = list(shap_values_array[:,:,feature_order[:half]])\n", - "cropped_second_shap_values = list(shap_values_array[:,:,feature_order[half: half*2]])\n", - "cropped_thrid_shap_values = list(shap_values_array[:,:,feature_order[half*2:]])\n", - "\n", - "# alternatively, you can do directly -- > list_first_shap_values = [shap_values[i][:,:36] for i in range(len(shap_values))] " - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "#### Crop the train dataset" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "first_half_train = reduced_x_train_data.iloc[:,feature_order[:half]]\n", - "second_half_train = reduced_x_train_data.iloc[:,feature_order[half: half*2]]\n", - "thrid_half_train = reduced_x_train_data.iloc[:,feature_order[half*2:]]" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Plot for the first half\n", - "\n", - "For both plots, it's important to specicy **class_inds=\"original\"** to set the labels in the correct order" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "shap.summary_plot(cropped_first_shap_values, first_half_train.columns.values, max_display = half, class_names=pretty_labels, color= calculateColors, class_inds=\"original\", show=False )\n", - "pl.xlim([0, 0.12])\n", - "pl.legend(loc='lower right')\n", - "pl.savefig(\"first_half_DEF.png\")\n" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Plot for the second half" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "shap.summary_plot(cropped_second_shap_values, second_half_train.columns.values, max_display = half, class_names=pretty_labels, color= calculateColors, show=False )\n", - "pl.xlim([0, 0.12])\n", - "pl.legend(loc='lower right')\n", - "pl.savefig(\"second_half_DEF.png\")\n" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Plot for the second half" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "shap.summary_plot(cropped_thrid_shap_values, thrid_half_train.columns.values, max_display = half, class_names=pretty_labels, color= calculateColors, show=False )\n", - "pl.xlim([0, 0.12])\n", - "pl.legend(loc='lower right')\n", - "pl.savefig(\"third_half_DEF.png\")" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Doing transpose - Discarded\n", - "Initially the form of **shap_values** is (classes, samples, 
features). You can do a transpose to change the dimensions of the matrix to get (features, samples, classes), however this does not make any sense because you cannot change the input to the model. (The model cannot have 9 classes as input and predict a feature)." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "np.shape(shap_values)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "shap_values_transpose = np.transpose(shap_values, (2, 1, 0))" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "np.shape(shap_values_transpose)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "np.shape(reduced_x_train_data.index.values)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "reduced_x_train_data.columns.values" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "np.array(pretty_labels)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "\n", - "#shap.summary_plot(cropped_first_shap_values, first_half_train.columns.values, max_display = half, class_names=pretty_labels, color= calculateColors, class_inds=\"original\", show=False )\n", - "shap.summary_plot(list(shap_values_transpose), np.array(pretty_labels), class_names=reduced_x_train_data.columns.values, max_display = 9,class_inds=\"original\", show=False )\n", - "pl.legend(loc=(1.04, 0))\n", - "pl.savefig(\"transpose_plot_DEF1.png\", bbox_inches = 'tight')\n", - "pl.tight_layout()\n" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Split by class" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "def data_train(data, pc_columns):\n", - " y_train_data = data[\"class\"] \n", - " x_train_data = data.drop([\"class\", \"latitude\", \"longitude\", \"spring_product_name\", \"autumn_product_name\", \"summer_product_name\"], axis=1)\n", - "\n", - " reduced_x_train_data = data[pc_columns]\n", - " #reduced_x_train_data.to_csv(f'{label}.csv')\n", - "\n", - " X_train, X_test, y_train, y_test = train_test_split(reduced_x_train_data, y_train_data, test_size=0.50, random_state=0,)\n", - "\n", - " y_true = model.predict(X_test)\n", - "\n", - " X_train = pd.DataFrame(X_train).reset_index(drop=True, inplace=False)\n", - " X_test = pd.DataFrame(X_test).reset_index(drop=True, inplace=False)\n", - " y_test = pd.DataFrame(y_test).reset_index(drop=True, inplace=False)\n", - " y_train = pd.DataFrame(y_train).reset_index(drop=True, inplace=False)\n", - " y_true = pd.DataFrame(y_true).reset_index(drop=True, inplace=False)\n", - " \n", - "\n", - " return X_train, X_test, y_train, y_test, y_true\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "labels = ['closedForest']\n", - "print(labels)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "for label in labels:\n", - " b_aux = data['class'] == label\n", - " b = data[b_aux]\n", - " print(b)\n", - " X_train, X_test, y_train, y_test, y_true = data_train(b, pc_columns)\n", - " print(\"sssssssssssssssss\",X_train)\n", - " explainer = 
shap.TreeExplainer(model)\n", - " choosen_instance = X_test.loc[0:10]\n", - " shap_values = explainer.shap_values(choosen_instance)\n", - " shap.initjs()\n", - " shap.force_plot(explainer.expected_value[1], shap_values[1], choosen_instance)\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "explainer = shap.TreeExplainer(model)\n", - "choosen_instance = X_test.loc[0:3]\n", - "shap_values = explainer.shap_values(choosen_instance)\n", - "shap.initjs()\n", - "shap.force_plot(explainer.expected_value[1], shap_values[1], choosen_instance)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3 (ipykernel)", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.10.12" - }, - "vscode": { - "interpreter": { - "hash": "e7370f93d1d0cde622a1f8e1c04877d8463912d04d973331ad4851f04de6915a" - } - } - }, - "nbformat": 4, - "nbformat_minor": 2 -} diff --git a/notebooks/mean_shap_values_matrix.ipynb b/notebooks/mean_shap_values_matrix.ipynb deleted file mode 100644 index cbef870..0000000 --- a/notebooks/mean_shap_values_matrix.ipynb +++ /dev/null @@ -1,120 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": null, - "id": "6affbe60", - "metadata": {}, - "outputs": [], - "source": [ - "import pickle\n", - "import numpy as np\n", - "import pandas as pd" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "b39a1718", - "metadata": {}, - "outputs": [], - "source": [ - "#to import the matrix again\n", - "shap_values = pickle.load(open('matriz.pckl', 'rb'))" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "6546d0d3", - "metadata": {}, - "outputs": [], - "source": [ - "shap_values[0][:][0]" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "90552328", - "metadata": {}, - "outputs": [], - "source": [ - "np.shape(shap_values)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "34b8285d", - "metadata": {}, - "outputs": [], - "source": [ - "mean_shap_values= []\n", - "for classes in range(len(shap_values)):\n", - " mean_shap = [np.mean(shap_values[classes][:][i]) for i in range(len(shap_values[0][0]))]\n", - " mean_shap_values.append(mean_shap)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "89aa691a", - "metadata": {}, - "outputs": [], - "source": [ - "data = pd.read_csv('dataset_postprocessed.csv')\n", - "data.dropna()\n", - "data = data[~data.isin([np.nan, np.inf, -np.inf]).any(1)]\n", - "y = data[\"class\"] \n", - "X = data.drop([\"class\", \"latitude\", \"longitude\", \"spring_product_name\", \"autumn_product_name\", \"summer_product_name\"], axis=1)\n", - "pc_columns = X.columns\n", - "\n", - "reduced_x = data[pc_columns]\n", - "reduced_x.columns" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "e56b9a59", - "metadata": {}, - "outputs": [], - "source": [ - "np.shape(mean_shap_values)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "a63b493e", - "metadata": {}, - "outputs": [], - "source": [ - "df = pd.DataFrame (mean_shap_values, columns = reduced_x.columns, index=y.unique())\n", - 
"df.to_csv(\"shap_values_mean.csv\", sep=',')" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3 (ipykernel)", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.8.9" - } - }, - "nbformat": 4, - "nbformat_minor": 5 -} diff --git a/scripts/execution_times.py b/scripts/execution_times.py index ab2cc44..7ef3d8d 100644 --- a/scripts/execution_times.py +++ b/scripts/execution_times.py @@ -41,7 +41,9 @@ def _time_composite(): tiles = get_list_of_tiles_in_iberian_peninsula() tile = random.choice(tiles) - products_available = get_products_by_tile_and_date(tile, mongo_products, seasons["spring"][0], seasons["autumn"][1], 100) + start_date = seasons[seasons.keys()[0]][0] + end_date = seasons[seasons.keys()[-1]][1] + products_available = get_products_by_tile_and_date(tile, mongo_products, start_date, end_date, 100) products_available = list(products_available) size_sample = random.randint(2, 5) diff --git a/scripts/jeffreys_matusita.py b/scripts/jeffreys_matusita.py deleted file mode 100644 index d347b42..0000000 --- a/scripts/jeffreys_matusita.py +++ /dev/null @@ -1,138 +0,0 @@ -from os.path import join - -from itertools import combinations -from matplotlib import pyplot as plt -import numpy as np -from numpy.lib.function_base import cov -import pandas as pd -import seaborn as sns - -from landcoverpy.config import settings -from landcoverpy.minio import MinioConnection - -def _jeffreys_matusita_distance(mu1, sigma1, mu2, sigma2): - """ - Computes the Jeffreys-Matusita distance between two distributions `P` and `Q`. - Parameters: - mu1 (np.ndarray) : Mean vector of `P`. - sigma1 (np.ndarray) : Covariance matrix of `P`. - mu2 (np.ndarray) : Mean vector of `Q`. - sigma2 (np.ndarray) : Covariance matrix of `Q`. - - Returns: - jmd (int) : Jeffreys-Matusita distanc. - - """ - jmd = np.sqrt(2*(1-np.exp(-_bhattacharyya_distance(mu1, sigma1, mu2, sigma2)))) - return jmd - -def _bhattacharyya_distance(mu1, sigma1, mu2, sigma2): - """ - Computes the Bhattarcharyya distance between two distributions `P` and `Q`. - Parameters: - mu1 (np.ndarray) : Mean vector of `P`. - sigma1 (np.ndarray) : Covariance matrix of `P`. - mu2 (np.ndarray) : Mean vector of `Q`. - sigma2 (np.ndarray) : Covariance matrix of `Q`. - - Returns: - db (int) : Bhattarcharyya distance. - - """ - sigma = np.mean([sigma1,sigma2], axis=0) - - ds = np.linalg.slogdet(sigma)[1] - ds1 = np.linalg.slogdet(sigma1)[1] - ds2 = np.linalg.slogdet(sigma2)[1] - bd = (_squared_mahalanobis_distance(mu1, mu2, sigma)/8) + (0.5) * ((ds - ds1/2 - ds2/2)/2) - return bd - - -def _squared_mahalanobis_distance(u, v, sigma): - """ - Computes the squared Mahalanobis distance between a point and a distribution `Q`. - Parameters: - u (np.ndarray) : A point located in the same dimension space than `Q` - v (np.ndarray) : Mean vector of `Q`. - value2 (np.ndarray) : Covariance matrix of `Q`. - - Returns: - m (int) : Squared Mahalanobis distance between a point `u` to a distribution defined by mean vector `v` and covariance matrix `sigma`. 
- - """ - delta = u - v - try: - inv_sigma = np.linalg.inv(sigma) - except: - inv_sigma = np.linalg.pinv(sigma) - m = (delta @ inv_sigma) @ delta - return m - -def jeffreys_matusita_analysis( - input_dataset: str, - out_path: str, - is_forest: bool = False, -): - """ - Computes the Jeffreys-Matusita distance for all class combinations in `input_Dataset`. The distance is multiplied by sqrt(2) in order to map the distance to 0-2. - If J-M distance is lower than 1.9 between two different classes, it is considered that those classes are not separable with the current attributes. - Parameters: - input_dataset (str) : Name of the dataset stored in MinIO. - out_path (str) : Local folder where a csv file and a plot will be stored. - is_forest (bool) : Indicate if classes used are those in land cover or in forest classification. - """ - - dataset_path = join(settings.TMP_DIR, input_dataset) - - minio_client = MinioConnection() - - minio_client.fget_object( - bucket_name=settings.MINIO_BUCKET_DATASETS, - object_name=join(settings.MINIO_DATA_FOLDER_NAME, input_dataset), - file_path=dataset_path, - ) - - df = pd.read_csv(dataset_path) - df = df.drop( - [ - "latitude", - "longitude", - "spring_product_name", - "autumn_product_name", - "summer_product_name", - "summer_cri1", - "autumn_cri1", - "spring_cri1", - ], - axis=1, - ) - - if is_forest: - class_column = settings.SL_PROPERTY - df = df.drop("class",axis=1) - else: - class_column = "class" - - df_byclass = df.groupby(by=class_column) - means = df_byclass.mean() - covs = df_byclass.cov() - classes = df[class_column].unique() - data = [] - data_complete = [] - for (class1, class2) in combinations(classes, r=2): - mu1, mu2 = means.loc[class1], means.loc[class2] - sigma1, sigma2 = covs.loc[class1], covs.loc[class2] - m = _jeffreys_matusita_distance(mu1, sigma1, mu2, sigma2) * np.sqrt(2) - data.append([class1, class2, m]) - data_complete.append([class1, class2, m]) - data_complete.append([class2, class1, m]) - for class_ in classes: - data_complete.append([class_,class_,2.0]) - matusita_df = pd.DataFrame(data, columns=['CLASS1', 'CLASS2', 'DISTANCE']) - matusita_df.sort_values(by=["DISTANCE"], ascending=True) - matusita_df.to_csv(join(out_path,"matusita.csv"), index=False) - matusita_df_complete = pd.DataFrame(data_complete, columns=['CLASS1', 'CLASS2', 'DISTANCE']) - matusita_df_complete = matusita_df_complete.pivot('CLASS1', 'CLASS2', 'DISTANCE') - sns.heatmap(matusita_df_complete, vmin=1.8, vmax=2, center=1.95, cmap="RdYlGn", square=True, cbar=False) - plt.savefig(join(out_path,"matusita.png"), bbox_inches="tight") - From f9ba225686651c5b0a70bf6e7c25a5fcffca4fee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Antonio=20Manuel=20Burgue=C3=B1o=20Romero?= Date: Mon, 8 Jul 2024 09:59:00 +0000 Subject: [PATCH 3/8] Preparation for window read/write to reduce RAM usage --- README.md | 8 +-- src/landcoverpy/utilities/raster.py | 31 ++++++++++ src/landcoverpy/workflow.py | 89 ++++++++++++++++++----------- 3 files changed, 87 insertions(+), 41 deletions(-) diff --git a/README.md b/README.md index 8bb83a9..9fd92e0 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ A research article describing the methodology followed on this workflow can be f ## Installation -Currently, the package is not available on PyPI, so you need to install it from the source code. To do so, you can clone the repository and install it using pip: +The package is not available on PyPI, so you need to install it from the source code. 
To do so, you can clone the repository and install it using pip: ```bash git clone https://github.com/KhaosResearch/landcoverpy.git @@ -30,12 +30,6 @@ For development purposes, you can install the package in editable mode: pip install -e . ``` -In the future, the package will be available on PyPI, so you will be able to install it using pip: - -```bash -pip install landcoverpy -``` - ## Usage An usage example can be found at the [main usage notebook](notebooks/main_usage.ipynb). diff --git a/src/landcoverpy/utilities/raster.py b/src/landcoverpy/utilities/raster.py index 2423ace..aa4e577 100644 --- a/src/landcoverpy/utilities/raster.py +++ b/src/landcoverpy/utilities/raster.py @@ -9,6 +9,7 @@ from pymongo.collection import Collection from rasterio import mask as msk from rasterio.warp import Resampling, reproject +from rasterio.windows import Window from shapely.geometry import Point, Polygon from shapely.ops import transform @@ -183,6 +184,36 @@ def _download_sample_band_by_tile(tile: str, minio_client: MinioConnection, mong sample_band_path = _download_sample_band_by_title(product_title, minio_client, mongo_collection) return sample_band_path +def _get_block_windows_by_tile(tile: str, minio_client: MinioConnection, mongo_collection: Collection): + """ + Having a tile, get the list of block windows of a 10m sample sentinel band of any related product. + """ + sample_band_path = _download_sample_band_by_tile(tile, minio_client, mongo_collection) + src = rasterio.open(sample_band_path) + block_windows_enum = src.block_windows() + block_windows = [block_window[1] for block_window in block_windows_enum] + src.close() + return block_windows + +def _generate_windows_from_slices_number(tile: str, slices: Tuple[int, int], minio_client: MinioConnection, mongo_collection: Collection): + """ + Having a tile, generate a list of windows based on the number of slices. 
+ """ + sample_kwargs = _get_kwargs_raster(_download_sample_band_by_tile(tile, minio_client, mongo_collection)) + tile_width = sample_kwargs["width"] + tile_height = sample_kwargs["height"] + + slice_width = int(tile_width / slices[0]) + slice_height = int(tile_height / slices[1]) + + windows = [] + for i in range(slices[0]): + for j in range(slices[1]): + window = Window(i * slice_width, j * slice_height, slice_width, slice_height) + windows.append(window) + + return windows + def _download_sample_band_by_title( title: str, minio_client: MinioConnection, mongo_collection: Collection ): diff --git a/src/landcoverpy/workflow.py b/src/landcoverpy/workflow.py index 4bb728a..ffd0aa4 100644 --- a/src/landcoverpy/workflow.py +++ b/src/landcoverpy/workflow.py @@ -11,6 +11,7 @@ import pandas as pd import rasterio from distributed import Client +from rasterio.windows import Window from landcoverpy.aster import get_dem_from_tile from landcoverpy.composite import _create_composite, _get_composite @@ -28,6 +29,8 @@ _get_raster_filename_from_path, _get_raster_name_from_path, _read_raster, + _get_block_windows_by_tile, + _generate_windows_from_slices_number, ) from landcoverpy.utilities.utils import ( _check_tiles_not_predicted, @@ -42,7 +45,14 @@ def workflow( execution_mode: ExecutionMode, client: Client = None, - tiles_to_predict: List[str] = None): + tiles_to_predict: List[str] = None, + use_block_windows: bool = False, + window_slices: tuple = None): + + if execution_mode == ExecutionMode.TRAINING and (use_block_windows or window_slices is not None): + print("Warning: Windows are not much useful in training mode, since it is already optimized for low memory usage. Consider disabling it.") + if window_slices is not None and use_block_windows: + print("Warning: if use_block_windows is enabled, window_slices will be ignored.") predict = execution_mode != ExecutionMode.TRAINING @@ -95,15 +105,15 @@ def workflow( futures = [] if execution_mode != ExecutionMode.SECOND_LEVEL_PREDICTION: for tile in tiles: - future = client.submit(_process_tile, tile, execution_mode, polygons_per_tile[tile], used_columns, resources={"Memory": 100}) + future = client.submit(_process_tile, tile, execution_mode, polygons_per_tile[tile], used_columns, use_block_windows, window_slices, resources={"Memory": 100}) futures.append(future) else: for tile in tiles: - future = client.submit(_process_tile, tile, ExecutionMode.LAND_COVER_PREDICTION, polygons_per_tile[tile], used_columns, resources={"Memory": 100}) + future = client.submit(_process_tile, tile, ExecutionMode.LAND_COVER_PREDICTION, polygons_per_tile[tile], used_columns, use_block_windows, window_slices, resources={"Memory": 100}) futures.append(future) client.gather(futures, errors="skip") for tile in sl_tiles: - future = client.submit(_process_tile, tile, ExecutionMode.SECOND_LEVEL_PREDICTION, polygons_per_tile[tile], used_columns, resources={"Memory": 100}) + future = client.submit(_process_tile, tile, ExecutionMode.SECOND_LEVEL_PREDICTION, polygons_per_tile[tile], used_columns, use_block_windows, window_slices, resources={"Memory": 100}) futures.append(future) client.gather(futures, errors="skip") @@ -111,18 +121,18 @@ def workflow( if execution_mode != ExecutionMode.SECOND_LEVEL_PREDICTION: for tile in tiles: try: - _process_tile(tile, execution_mode, polygons_per_tile[tile], used_columns) + _process_tile(tile, execution_mode, polygons_per_tile[tile], used_columns, use_block_windows, window_slices) except WorkflowExecutionException as e: print(e) else: for tile in 
tiles:
                 try:
-                    _process_tile(tile, execution_mode, polygons_per_tile[tile], used_columns)
+                    _process_tile(tile, execution_mode, polygons_per_tile[tile], used_columns, use_block_windows, window_slices)
                 except WorkflowExecutionException as e:
                     print(e)
         else:
             for tile in tiles:
                 try:
-                    _process_tile(tile, ExecutionMode.LAND_COVER_PREDICTION, polygons_per_tile[tile], used_columns)
+                    _process_tile(tile, ExecutionMode.LAND_COVER_PREDICTION, polygons_per_tile[tile], used_columns, use_block_windows, window_slices)
                 except WorkflowExecutionException as e:
                     print(e)
             for tile in sl_tiles:
                 try:
-                    _process_tile(tile, ExecutionMode.SECOND_LEVEL_PREDICTION, polygons_per_tile[tile], used_columns)
+                    _process_tile(tile, ExecutionMode.SECOND_LEVEL_PREDICTION, polygons_per_tile[tile], used_columns, use_block_windows, window_slices)
                 except WorkflowExecutionException as e:
                     print(e)
@@ -172,7 +182,7 @@ def workflow(
     )
 
 
-def _process_tile(tile, execution_mode, polygons_in_tile, used_columns=None):
+def _process_tile(tile, execution_mode, polygons_in_tile, used_columns=None, use_block_windows=False, window_slices=None):
 
     predict = execution_mode != ExecutionMode.TRAINING
 
@@ -185,6 +195,31 @@ def _process_tile(tile, execution_mode, polygons_in_tile, used_columns=None):
     mongo_client = MongoConnection()
     mongo_products_collection = mongo_client.get_collection_object()
 
+    if use_block_windows:
+        windows = _get_block_windows_by_tile(tile, minio_client, mongo_products_collection)
+    elif window_slices is not None:
+        windows = _generate_windows_from_slices_number(tile, window_slices, minio_client, mongo_products_collection)
+    else:
+        windows = [None]
+
+    # Initialize empty output raster
+    if predict:
+        band_path = _download_sample_band_by_tile(tile, minio_client, mongo_products_collection)
+        output_kwargs = _get_kwargs_raster(band_path)
+        output_kwargs["nodata"] = 0
+        output_kwargs["driver"] = "GTiff"
+        output_kwargs["dtype"] = np.uint8
+        if execution_mode == ExecutionMode.LAND_COVER_PREDICTION:
+            classification_name = f"classification_{tile}.tif"
+        elif execution_mode == ExecutionMode.SECOND_LEVEL_PREDICTION:
+            classification_name = f"sl_classification_{tile}.tif"
+        classification_path = str(Path(settings.TMP_DIR, classification_name))
+        with rasterio.open(
+            classification_path, "w", **output_kwargs
+        ) as classification_file:
+            print(f"Initialized output raster at {classification_path}")
+
+
     # Names of the indexes that are taken into account
     indexes_used = [
         "cri1",
@@ -238,15 +273,15 @@ def _process_tile(tile, execution_mode, polygons_in_tile, used_columns=None):
             )
 
             # Predict doesnt work yet in some specific tiles where tile is not fully contained in aster rasters
-            kwargs = _get_kwargs_raster(dem_path)
+            dem_kwargs = _get_kwargs_raster(dem_path)
             if execution_mode==ExecutionMode.TRAINING:
-                crop_mask, label_lon_lat = _mask_polygons_by_tile(polygons_in_tile, kwargs)
+                crop_mask, label_lon_lat = _mask_polygons_by_tile(polygons_in_tile, dem_kwargs)
 
             if execution_mode==ExecutionMode.LAND_COVER_PREDICTION:
-                crop_mask = np.zeros(shape=(int(kwargs["height"]), int(kwargs["width"])),dtype=np.uint8)
+                crop_mask = np.zeros(shape=(int(dem_kwargs["height"]), int(dem_kwargs["width"])),dtype=np.uint8)
 
             if execution_mode==ExecutionMode.SECOND_LEVEL_PREDICTION:
-                crop_mask = np.zeros(shape=(int(kwargs["height"]), int(kwargs["width"])),dtype=np.uint8)
+                crop_mask = np.zeros(shape=(int(dem_kwargs["height"]), int(dem_kwargs["width"])),dtype=np.uint8)
 
             band_normalize_range = normalize_range.get(dem_name, None)
             raster = _read_raster(
@@ -261,16 +296,16 @@ def _process_tile(tile, execution_mode, polygons_in_tile, used_columns=None):
 
     # Get crop mask for sentinel rasters and dataset labeled with database points in tile
     band_path = _download_sample_band_by_tile(tile, minio_client, mongo_products_collection)
-    kwargs = 
_get_kwargs_raster(band_path)
+    s2_band_kwargs = _get_kwargs_raster(band_path)
 
     if execution_mode==ExecutionMode.TRAINING:
-        crop_mask, label_lon_lat = _mask_polygons_by_tile(polygons_in_tile, kwargs)
+        crop_mask, label_lon_lat = _mask_polygons_by_tile(polygons_in_tile, s2_band_kwargs)
 
     elif execution_mode==ExecutionMode.LAND_COVER_PREDICTION:
-        crop_mask = np.zeros(shape=(int(kwargs["height"]), int(kwargs["width"])), dtype=np.uint8)
+        crop_mask = np.zeros(shape=(int(s2_band_kwargs["height"]), int(s2_band_kwargs["width"])), dtype=np.uint8)
 
     elif execution_mode==ExecutionMode.SECOND_LEVEL_PREDICTION:
-        crop_mask = np.zeros(shape=(int(kwargs["height"]), int(kwargs["width"])), dtype=np.uint8)
+        crop_mask = np.zeros(shape=(int(s2_band_kwargs["height"]), int(s2_band_kwargs["width"])), dtype=np.uint8)
 
     for season, products_metadata in product_per_season.items():
         print(season)
@@ -364,10 +399,6 @@ def _process_tile(tile, execution_mode, polygons_in_tile, used_columns=None):
             object_name=raster_path,
             file_path=str(temp_path),
         )
-        kwargs = _get_kwargs_raster(str(temp_path))
-        spatial_resolution = kwargs["transform"][0]
-        if spatial_resolution == 10:
-            kwargs_10m = kwargs
 
         band_normalize_range = normalize_range.get(raster_name, None)
         if is_band[i] and (band_normalize_range is None):
@@ -435,7 +466,7 @@ def _process_tile(tile, execution_mode, polygons_in_tile, used_columns=None):
         predictions[nodata_rows] = "nodata"
 
         predictions = np.reshape(
-            predictions, (1, kwargs_10m["height"], kwargs_10m["width"])
+            predictions, (1, output_kwargs["height"], output_kwargs["width"])
         )
 
         encoded_predictions = np.zeros_like(predictions, dtype=np.uint8)
@@ -451,13 +482,8 @@ def _process_tile(tile, execution_mode, polygons_in_tile, used_columns=None):
             predictions == class_, value, encoded_predictions
         )
 
-        kwargs_10m["nodata"] = 0
-        kwargs_10m["driver"] = "GTiff"
-        kwargs_10m["dtype"] = np.uint8
-        classification_name = f"classification_{tile}.tif"
-        classification_path = str(Path(settings.TMP_DIR, classification_name))
         with rasterio.open(
-            classification_path, "w", **kwargs_10m
+            classification_path, "r+", **output_kwargs
         ) as classification_file:
             classification_file.write(encoded_predictions)
             print(f"{classification_name} saved")
@@ -520,7 +546,7 @@ def _process_tile(tile, execution_mode, polygons_in_tile, used_columns=None):
         sl_predictions[nodata_rows] = "nodata"
 
         sl_predictions = np.reshape(
-            sl_predictions, (1, kwargs_10m["height"], kwargs_10m["width"])
+            sl_predictions, (1, output_kwargs["height"], output_kwargs["width"])
         )
 
         encoded_sl_predictions = np.zeros_like(sl_predictions, dtype=np.uint8)
@@ -539,13 +565,8 @@ def _process_tile(tile, execution_mode, polygons_in_tile, used_columns=None):
             sl_predictions == class_, value, encoded_sl_predictions
         )
 
-        kwargs_10m["nodata"] = 0
-        kwargs_10m["driver"] = "GTiff"
-        kwargs_10m["dtype"] = np.uint8
-        classification_name = f"sl_classification_{tile}.tif"
-        classification_path = str(Path(settings.TMP_DIR, classification_name))
         with rasterio.open(
-            classification_path, "w", **kwargs_10m
+            classification_path, "r+", **output_kwargs
         ) as classification_file:
             classification_file.write(encoded_sl_predictions)
             print(f"{classification_name} saved")

From a0f74f09e89fd86d12512d4609466bfc9e4123a8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Antonio=20Manuel=20Burgue=C3=B1o=20Romero?=
Date: Tue, 9 Jul 2024 06:48:30 +0000
Subject: [PATCH 4/8] Workflow split into train and predict. Train works as before. 
Predict is being prepared to add windows --- src/landcoverpy/utilities/raster.py | 11 +- src/landcoverpy/workflow.py | 463 ++-------------------------- src/landcoverpy/workflow_predict.py | 393 +++++++++++++++++++++++ src/landcoverpy/workflow_train.py | 242 +++++++++++++++ 4 files changed, 665 insertions(+), 444 deletions(-) create mode 100644 src/landcoverpy/workflow_predict.py create mode 100644 src/landcoverpy/workflow_train.py diff --git a/src/landcoverpy/utilities/raster.py b/src/landcoverpy/utilities/raster.py index aa4e577..cdeae43 100644 --- a/src/landcoverpy/utilities/raster.py +++ b/src/landcoverpy/utilities/raster.py @@ -31,6 +31,7 @@ def _read_raster( path_to_disk: str = None, normalize_range: Tuple[float, float] = None, to_tif: bool = True, + window: Window = None ): """ Reads a raster as a numpy array. @@ -52,7 +53,15 @@ def _read_raster( # Read file kwargs = band_file.meta destination_crs = band_file.crs - band = band_file.read() + band = band_file.read(window=window) + if window is not None: + kwargs.update( + { + "height": window.height, + "width": window.width, + "transform": band_file.window_transform(window) + } + ) # Just in case... if len(band.shape) == 2: diff --git a/src/landcoverpy/workflow.py b/src/landcoverpy/workflow.py index ffd0aa4..b02f04d 100644 --- a/src/landcoverpy/workflow.py +++ b/src/landcoverpy/workflow.py @@ -1,46 +1,22 @@ -import collections import json -from glob import glob from os.path import join -from pathlib import Path -from shutil import rmtree from typing import List -import joblib -import numpy as np import pandas as pd -import rasterio from distributed import Client -from rasterio.windows import Window -from landcoverpy.aster import get_dem_from_tile -from landcoverpy.composite import _create_composite, _get_composite from landcoverpy.config import settings -from landcoverpy.exceptions import WorkflowExecutionException, NoSentinelException +from landcoverpy.exceptions import WorkflowExecutionException from landcoverpy.execution_mode import ExecutionMode from landcoverpy.minio import MinioConnection -from landcoverpy.mongo import MongoConnection from landcoverpy.utilities.geometries import _group_validated_data_points_by_tile, _kmz_to_geojson, _csv_to_geojson -from landcoverpy.utilities.raster import ( - _download_sample_band_by_tile, - _filter_rasters_paths_by_features_used, - _get_kwargs_raster, - _get_product_rasters_paths, - _get_raster_filename_from_path, - _get_raster_name_from_path, - _read_raster, - _get_block_windows_by_tile, - _generate_windows_from_slices_number, -) from landcoverpy.utilities.utils import ( _check_tiles_not_predicted, - _get_lc_classification, - _mask_polygons_by_tile, _remove_tiles_already_processed_in_training, - get_products_by_tile_and_date, - get_season_dict, ) +from landcoverpy.workflow_predict import _process_tile_predict +from landcoverpy.workflow_train import _process_tile_train def workflow( execution_mode: ExecutionMode, @@ -49,9 +25,11 @@ def workflow( use_block_windows: bool = False, window_slices: tuple = None): + if execution_mode == ExecutionMode.TRAINING and tiles_to_predict is not None: + print("Warning: tiles_to_predict are ignored in training mode.") if execution_mode == ExecutionMode.TRAINING and (use_block_windows or window_slices is not None): - print("Warning: Windows are not much useful in training mode, since it is already optimized for low memory usage. 
Consider disabling it.")
-    if window_slices is not None and use_block_windows:
+        print("Warning: Windows are ignored in training mode, since it is already optimized for low memory usage.")
+    elif window_slices is not None and use_block_windows:
         print("Warning: if use_block_windows is enabled, window_slices will be ignored.")
 
     predict = execution_mode != ExecutionMode.TRAINING
 
@@ -103,36 +81,38 @@ def workflow(
 
     if client is not None:
         futures = []
-        if execution_mode != ExecutionMode.SECOND_LEVEL_PREDICTION:
+        if execution_mode == ExecutionMode.TRAINING:
            for tile in tiles:
-                future = client.submit(_process_tile, tile, execution_mode, polygons_per_tile[tile], used_columns, use_block_windows, window_slices, resources={"Memory": 100})
+                future = client.submit(_process_tile_train, tile, polygons_per_tile[tile], resources={"Memory": 100})
                 futures.append(future)
-        else:
+        elif execution_mode == ExecutionMode.LAND_COVER_PREDICTION or execution_mode == ExecutionMode.SECOND_LEVEL_PREDICTION:
             for tile in tiles:
-                future = client.submit(_process_tile, tile, ExecutionMode.LAND_COVER_PREDICTION, polygons_per_tile[tile], used_columns, use_block_windows, window_slices, resources={"Memory": 100})
+                future = client.submit(_process_tile_predict, tile, ExecutionMode.LAND_COVER_PREDICTION, used_columns, use_block_windows, window_slices, resources={"Memory": 100})
                 futures.append(future)
             client.gather(futures, errors="skip")
+
+        if execution_mode == ExecutionMode.SECOND_LEVEL_PREDICTION:
             for tile in sl_tiles:
-                future = client.submit(_process_tile, tile, ExecutionMode.SECOND_LEVEL_PREDICTION, polygons_per_tile[tile], used_columns, use_block_windows, window_slices, resources={"Memory": 100})
+                future = client.submit(_process_tile_predict, tile, ExecutionMode.SECOND_LEVEL_PREDICTION, used_columns, use_block_windows, window_slices, resources={"Memory": 100})
                 futures.append(future)
             client.gather(futures, errors="skip")
 
     else:
-        if execution_mode != ExecutionMode.SECOND_LEVEL_PREDICTION:
+        if execution_mode == ExecutionMode.TRAINING:
             for tile in tiles:
                 try:
-                    _process_tile(tile, execution_mode, polygons_per_tile[tile], used_columns, use_block_windows, window_slices)
+                    _process_tile_train(tile, polygons_per_tile[tile])
                 except WorkflowExecutionException as e:
                     print(e)
-        else:
+        if execution_mode == ExecutionMode.LAND_COVER_PREDICTION or execution_mode == ExecutionMode.SECOND_LEVEL_PREDICTION:
             for tile in tiles:
                 try:
-                    _process_tile(tile, ExecutionMode.LAND_COVER_PREDICTION, polygons_per_tile[tile], used_columns, use_block_windows, window_slices)
+                    _process_tile_predict(tile, ExecutionMode.LAND_COVER_PREDICTION, used_columns, use_block_windows, window_slices)
                 except WorkflowExecutionException as e:
                     print(e)
+        if execution_mode == ExecutionMode.SECOND_LEVEL_PREDICTION:
             for tile in sl_tiles:
                 try:
-                    _process_tile(tile, ExecutionMode.SECOND_LEVEL_PREDICTION, polygons_per_tile[tile], used_columns, use_block_windows, window_slices)
+                    _process_tile_predict(tile, ExecutionMode.SECOND_LEVEL_PREDICTION, used_columns, use_block_windows, window_slices)
                 except WorkflowExecutionException as e:
                     print(e)
@@ -179,407 +159,4 @@ def workflow(
         object_name=join(settings.MINIO_DATA_FOLDER_NAME, "dataset.csv"),
         file_path=file_path,
         content_type="text/csv",
-    )
-
-
-def _process_tile(tile, execution_mode, polygons_in_tile, used_columns=None, use_block_windows=False, window_slices=None):
-
-    predict = execution_mode != ExecutionMode.TRAINING
-
-    if not Path(settings.TMP_DIR).exists():
-        Path.mkdir(Path(settings.TMP_DIR))
-
-    seasons = 
get_season_dict()
-
-    minio_client = MinioConnection()
-    mongo_client = MongoConnection()
-    mongo_products_collection = mongo_client.get_collection_object()
-
-    if use_block_windows:
-        windows = _get_block_windows_by_tile(tile, minio_client, mongo_products_collection)
-    elif window_slices is not None:
-        windows = _generate_windows_from_slices_number(tile, window_slices, minio_client, mongo_products_collection)
-    else:
-        windows = [None]
-
-    # Initialize empty output raster
-    if predict:
-        band_path = _download_sample_band_by_tile(tile, minio_client, mongo_products_collection)
-        output_kwargs = _get_kwargs_raster(band_path)
-        output_kwargs["nodata"] = 0
-        output_kwargs["driver"] = "GTiff"
-        output_kwargs["dtype"] = np.uint8
-        if execution_mode == ExecutionMode.LAND_COVER_PREDICTION:
-            classification_name = f"classification_{tile}.tif"
-        elif execution_mode == ExecutionMode.SECOND_LEVEL_PREDICTION:
-            classification_name = f"sl_classification_{tile}.tif"
-        classification_path = str(Path(settings.TMP_DIR, classification_name))
-        with rasterio.open(
-            classification_path, "w", **output_kwargs
-        ) as classification_file:
-            print(f"Initialized output raster at {classification_path}")
-
-
-    # Names of the indexes that are taken into account
-    indexes_used = [
-        "cri1",
-        "ri",
-        "evi2",
-        "mndwi",
-        "moisture",
-        "ndyi",
-        "ndre",
-        "ndvi",
-        "osavi",
-    ]
-    # Name of the sentinel bands that are ignored
-    skip_bands = ["tci", "scl"]
-    # Ranges for normalization of each raster
-    normalize_range = {"slope": (0, 70), "aspect": (0, 360), "dem": (0, 2000)}
-
-    print(f"Working in tile {tile}")
-    # Mongo query for obtaining valid products
-    max_cloud_percentage = settings.MAX_CLOUD
-
-    product_per_season = {}
-
-    for season in seasons:
-        season_start, season_end = seasons[season]
-        product_metadata_cursor = get_products_by_tile_and_date(
-            tile, mongo_products_collection, season_start, season_end, max_cloud_percentage
-        )
-
-        # If there are more products than the maximum specified for creating a composite, take the last ones
-        product_per_season[season] = list(product_metadata_cursor)[-settings.MAX_PRODUCTS_COMPOSITE:]
-
-        if len(product_per_season[season]) == 0:
-            raise NoSentinelException(f"There is no valid Sentinel products for tile {tile} in season {season}. 
Skipping it...") - - # Dataframe for storing data of a tile - tile_df = None - - dems_raster_names = [ - "slope", - "aspect", - "dem", - ] - - for dem_name in dems_raster_names: - # Add dem and aspect data - if (not predict) or (predict and dem_name in used_columns): - - dem_path = get_dem_from_tile( - execution_mode, tile, mongo_products_collection, minio_client, dem_name - ) - - # Predict doesnt work yet in some specific tiles where tile is not fully contained in aster rasters - dem_kwargs = _get_kwargs_raster(dem_path) - if execution_mode==ExecutionMode.TRAINING: - crop_mask, label_lon_lat = _mask_polygons_by_tile(polygons_in_tile, dem_kwargs) - - if execution_mode==ExecutionMode.LAND_COVER_PREDICTION: - crop_mask = np.zeros(shape=(int(dem_kwargs["height"]), int(dem_kwargs["width"])),dtype=np.uint8) - - if execution_mode==ExecutionMode.SECOND_LEVEL_PREDICTION: - crop_mask = np.zeros(shape=(int(dem_kwargs["height"]), int(dem_kwargs["width"])),dtype=np.uint8) - - band_normalize_range = normalize_range.get(dem_name, None) - raster = _read_raster( - band_path=dem_path, - rescale=True, - normalize_range=band_normalize_range, - ) - raster_masked = np.ma.masked_array(raster, mask=crop_mask) - raster_masked = np.ma.compressed(raster_masked).flatten() - raster_df = pd.DataFrame({dem_name: raster_masked}) - tile_df = pd.concat([tile_df, raster_df], axis=1) - - # Get crop mask for sentinel rasters and dataset labeled with database points in tile - band_path = _download_sample_band_by_tile(tile, minio_client, mongo_products_collection) - s2_band_kwargs = _get_kwargs_raster(band_path) - - if execution_mode==ExecutionMode.TRAINING: - crop_mask, label_lon_lat = _mask_polygons_by_tile(polygons_in_tile, s2_band_kwargs) - - elif execution_mode==ExecutionMode.LAND_COVER_PREDICTION: - crop_mask = np.zeros(shape=(int(s2_band_kwargs["height"]), int(s2_band_kwargs["width"])), dtype=np.uint8) - - elif execution_mode==ExecutionMode.SECOND_LEVEL_PREDICTION: - crop_mask = np.zeros(shape=(int(s2_band_kwargs["height"]), int(s2_band_kwargs["width"])), dtype=np.uint8) - - for season, products_metadata in product_per_season.items(): - print(season) - bucket_products = settings.MINIO_BUCKET_NAME_PRODUCTS - bucket_composites = settings.MINIO_BUCKET_NAME_COMPOSITES - current_bucket = None - - if len(products_metadata) == 0: - raise NoSentinelException(f"There is no valid Sentinel products for tile {tile}. Skipping it...") - - elif len(products_metadata) == 1: - product_metadata = products_metadata[0] - current_bucket = bucket_products - else: - # If there are multiple products for one season, use a composite. - mongo_client.set_collection(settings.MONGO_COMPOSITES_COLLECTION) - mongo_composites_collection = mongo_client.get_collection_object() - products_metadata_list = list(products_metadata) - product_metadata = _get_composite( - products_metadata_list, mongo_composites_collection, execution_mode - ) - if product_metadata is None: - _create_composite( - products_metadata_list, - minio_client, - bucket_products, - bucket_composites, - mongo_composites_collection, - execution_mode - ) - product_metadata = _get_composite( - products_metadata_list, mongo_composites_collection, execution_mode - ) - current_bucket = bucket_composites - - product_name = product_metadata["title"] - - # For validate dataset geometries, the product name is added. 
- if not predict: - raster_product_name = np.full_like( - raster_masked, product_name, dtype=object - ) - raster_df = pd.DataFrame({f"{season}_product_name": raster_product_name}) - tile_df = pd.concat([tile_df, raster_df], axis=1) - - (rasters_paths, is_band) = _get_product_rasters_paths( - product_metadata, minio_client, current_bucket - ) - - if predict: - (rasters_paths, is_band) = _filter_rasters_paths_by_features_used( - rasters_paths, is_band, used_columns, season - ) - temp_product_folder = Path(settings.TMP_DIR, product_name + ".SAFE") - if not temp_product_folder.exists(): - Path.mkdir(temp_product_folder) - print(f"Processing product {product_name}") - - # Read bands and indexes. - already_read = [] - for i, raster_path in enumerate(rasters_paths): - raster_filename = _get_raster_filename_from_path(raster_path) - raster_name = _get_raster_name_from_path(raster_path) - temp_path = Path(temp_product_folder, raster_filename) - - # Only keep bands and indexes in indexes_used - if (not is_band[i]) and ( - not any( - raster_name.upper() == index_used.upper() - for index_used in indexes_used - ) - ): - continue - # Skip bands in skip_bands - if is_band[i] and any( - raster_name.upper() == band_skipped.upper() - for band_skipped in skip_bands - ): - continue - # Read only the first band to avoid duplication of different spatial resolution - if any( - raster_name.upper() == read_raster.upper() - for read_raster in already_read - ): - continue - already_read.append(raster_name) - - print(f"Downloading raster {raster_name} from minio into {temp_path}") - minio_client.fget_object( - bucket_name=current_bucket, - object_name=raster_path, - file_path=str(temp_path), - ) - - band_normalize_range = normalize_range.get(raster_name, None) - if is_band[i] and (band_normalize_range is None): - band_normalize_range = (0, 7000) - - raster = _read_raster( - band_path=temp_path, - rescale=True, - normalize_range=band_normalize_range, - ) - raster_masked = np.ma.masked_array(raster[0], mask=crop_mask) - raster_masked = np.ma.compressed(raster_masked) - - raster_df = pd.DataFrame({f"{season}_{raster_name}": raster_masked}) - - tile_df = pd.concat([tile_df, raster_df], axis=1) - - - if not predict: - - for index, label in enumerate([settings.LC_PROPERTY, "longitude", "latitude", settings.SL_PROPERTY]): - - raster_masked = np.ma.masked_array(label_lon_lat[:, :, index], mask=crop_mask) - raster_masked = np.ma.compressed(raster_masked).flatten() - raster_df = pd.DataFrame({label: raster_masked}) - tile_df = pd.concat([tile_df, raster_df], axis=1) - - tile_df_name = f"dataset_{tile}.csv" - tile_df_path = Path(settings.TMP_DIR, tile_df_name) - tile_df.to_csv(str(tile_df_path), index=False) - - minio_client.fput_object( - bucket_name=settings.MINIO_BUCKET_DATASETS, - object_name=f"{settings.MINIO_DATA_FOLDER_NAME}/tiles_datasets/{tile_df_name}", - file_path=tile_df_path, - content_type="image/tif", - ) - - print("Dataframe information:") - print(tile_df.info()) - - model_name = "model.joblib" - - if execution_mode == ExecutionMode.LAND_COVER_PREDICTION: - - lc_model_folder = "land-cover" - model_path = join(settings.TMP_DIR, lc_model_folder, model_name) - - minio_client.fget_object( - bucket_name=settings.MINIO_BUCKET_MODELS, - object_name=join(settings.MINIO_DATA_FOLDER_NAME, lc_model_folder, model_name), - file_path=model_path, - ) - - nodata_rows = (~np.isfinite(tile_df)).any(axis=1) - - # Low memory column reindex without copy taken from 
https://stackoverflow.com/questions/25878198/change-pandas-dataframe-column-order-in-place - for column in used_columns: - tile_df[column] = tile_df.pop(column).replace([np.inf, -np.inf, -np.nan], 0) - - clf = joblib.load(model_path) - - print(f"Predicting land cover for tile {tile}") - predictions = clf.predict(tile_df) - - predictions[nodata_rows] = "nodata" - predictions = np.reshape( - predictions, (1, output_kwargs["height"], output_kwargs["width"]) - ) - encoded_predictions = np.zeros_like(predictions, dtype=np.uint8) - - with open(settings.LC_LABELS_FILE, "r") as f: - lc_mapping = json.load(f) - - if 0 in lc_mapping.values(): - print("Warning: 0 is already a value in the mapping, which is reserved for NODATA. It will be overwritten.") - lc_mapping["nodata"] = 0 - - for class_, value in lc_mapping.items(): - encoded_predictions = np.where( - predictions == class_, value, encoded_predictions - ) - - with rasterio.open( - classification_path, "r+", **output_kwargs - ) as classification_file: - classification_file.write(encoded_predictions) - print(f"{classification_name} saved") - - minio_client.fput_object( - bucket_name=settings.MINIO_BUCKET_CLASSIFICATIONS, - object_name=f"{settings.MINIO_DATA_FOLDER_NAME}/{classification_name}", - file_path=classification_path, - content_type="image/tif", - ) - - elif execution_mode == ExecutionMode.SECOND_LEVEL_PREDICTION: - - model_folders = minio_client.list_objects(settings.MINIO_BUCKET_MODELS, prefix=join(settings.MINIO_DATA_FOLDER_NAME, ''), recursive=False) - sl_model_folder = [] - for model_folder in model_folders: - if model_folder.object_name.endswith('/') and "land-cover" not in model_folder.object_name: - sl_model_folder.append(model_folder.object_name.split('/')[-2]) - - local_sl_model_locations = {} - for minio_model_folder in sl_model_folder: - - local_sl_model_path = join(settings.TMP_DIR, minio_model_folder, model_name) - - minio_client.fget_object( - bucket_name=settings.MINIO_BUCKET_MODELS, - object_name=join(settings.MINIO_DATA_FOLDER_NAME, minio_model_folder, model_name), - file_path=local_sl_model_path, - ) - local_sl_model_locations[minio_model_folder] = local_sl_model_path - - - nodata_rows = (~np.isfinite(tile_df)).any(axis=1) - - for column in used_columns: - tile_df[column] = tile_df.pop(column).replace([np.inf, -np.inf, -np.nan], 0) - - classifiers = {} - - with open(settings.LC_LABELS_FILE, "r") as f: - lc_mapping = json.load(f) - - for sl_model in local_sl_model_locations.keys(): - classifiers[lc_mapping[sl_model]] = joblib.load(local_sl_model_locations[sl_model]) - - lc_raster = _get_lc_classification(tile) - lc_raster_flattened = lc_raster.flatten() - - sl_predictions = np.empty(len(tile_df), dtype=object) - - for lc_class_number , class_model in classifiers.items(): - print(f"Second level prediction of pixels with LC class: {lc_class_number}") - mask_lc_class = (lc_raster_flattened == lc_class_number) - - rows_to_predict = tile_df[mask_lc_class] - - sl_predictions[mask_lc_class] = class_model.predict(rows_to_predict) - - sl_predictions[sl_predictions == None] = "noclassified" - sl_predictions[nodata_rows] = "nodata" - - sl_predictions = np.reshape( - sl_predictions, (1, output_kwargs["height"], output_kwargs["width"]) - ) - encoded_sl_predictions = np.zeros_like(sl_predictions, dtype=np.uint8) - - with open(settings.SL_LABELS_FILE, "r") as f: - sl_mapping = json.load(f) - - if 0 in sl_mapping.values(): - print("Warning: 0 is already a value in the mapping, which is reserved for NODATA. 
It will be overwritten.") - sl_mapping["nodata"] = 0 - if 1 in sl_mapping.values(): - print("Warning: 1 is already a value in the mapping, which is reserved for NOCLASSIFIED. It will be overwritten.") - sl_mapping["noclassified"] = 1 - - for class_, value in sl_mapping.items(): - encoded_sl_predictions = np.where( - sl_predictions == class_, value, encoded_sl_predictions - ) - - with rasterio.open( - classification_path, "r+", **output_kwargs - ) as classification_file: - classification_file.write(encoded_sl_predictions) - print(f"{classification_name} saved") - - minio_client.fput_object( - bucket_name=settings.MINIO_BUCKET_CLASSIFICATIONS, - object_name=f"{settings.MINIO_DATA_FOLDER_NAME}/{classification_name}", - file_path=classification_path, - content_type="image/tif", - ) - - for path in Path(settings.TMP_DIR).glob("**/*"): - if path.is_file(): - path.unlink() - elif path.is_dir(): - rmtree(path) + ) \ No newline at end of file diff --git a/src/landcoverpy/workflow_predict.py b/src/landcoverpy/workflow_predict.py new file mode 100644 index 0000000..23330ea --- /dev/null +++ b/src/landcoverpy/workflow_predict.py @@ -0,0 +1,393 @@ +import json +from os.path import join +from pathlib import Path +from shutil import rmtree + +import joblib +import numpy as np +import pandas as pd +import rasterio +from rasterio.windows import Window + +from landcoverpy.aster import get_dem_from_tile +from landcoverpy.composite import _create_composite, _get_composite +from landcoverpy.config import settings +from landcoverpy.exceptions import WorkflowExecutionException, NoSentinelException +from landcoverpy.execution_mode import ExecutionMode +from landcoverpy.minio import MinioConnection +from landcoverpy.mongo import MongoConnection +from landcoverpy.utilities.raster import ( + _download_sample_band_by_tile, + _filter_rasters_paths_by_features_used, + _get_kwargs_raster, + _get_product_rasters_paths, + _get_raster_filename_from_path, + _get_raster_name_from_path, + _read_raster, + _get_block_windows_by_tile, + _generate_windows_from_slices_number, +) +from landcoverpy.utilities.utils import ( + _get_lc_classification, + get_products_by_tile_and_date, + get_season_dict, +) + +def _process_tile_predict(tile, execution_mode, used_columns=None, use_block_windows=False, window_slices=None): + + if execution_mode == ExecutionMode.TRAINING: + raise WorkflowExecutionException("This function is only for prediction") + + if not Path(settings.TMP_DIR).exists(): + Path.mkdir(Path(settings.TMP_DIR)) + + seasons = get_season_dict() + + minio_client = MinioConnection() + mongo_client = MongoConnection() + mongo_products_collection = mongo_client.get_collection_object() + + band_path = _download_sample_band_by_tile(tile, minio_client, mongo_products_collection) + kwargs_s2 = _get_kwargs_raster(band_path) + + if use_block_windows: + windows = _get_block_windows_by_tile(tile, minio_client, mongo_products_collection) + elif window_slices is not None: + windows = _generate_windows_from_slices_number(tile, window_slices, minio_client, mongo_products_collection) + else: + windows = [Window(0, 0, kwargs_s2.width, kwargs_s2.height)] + + # Initialize empty output raster + output_kwargs = kwargs_s2.copy() + output_kwargs["nodata"] = 0 + output_kwargs["driver"] = "GTiff" + output_kwargs["dtype"] = np.uint8 + if execution_mode == ExecutionMode.LAND_COVER_PREDICTION: + classification_name = f"classification_{tile}.tif" + elif execution_mode == ExecutionMode.SECOND_LEVEL_PREDICTION: + classification_name = 
f"sl_classification_{tile}.tif" + classification_path = str(Path(settings.TMP_DIR, classification_name)) + with rasterio.open( + classification_path, "w", **output_kwargs + ) as classification_file: + print(f"Initilized output raster at {classification_path}") + + + # Names of the indexes that are taken into account + indexes_used = [ + "cri1", + "ri", + "evi2", + "mndwi", + "moisture", + "ndyi", + "ndre", + "ndvi", + "osavi", + ] + # Name of the sentinel bands that are ignored + skip_bands = ["tci", "scl"] + # Ranges for normalization of each raster + normalize_range = {"slope": (0, 70), "aspect": (0, 360), "dem": (0, 2000)} + + print(f"Working in tile {tile}") + # Mongo query for obtaining valid products + max_cloud_percentage = settings.MAX_CLOUD + + product_per_season = {} + + for season in seasons: + season_start, season_end = seasons[season] + product_metadata_cursor = get_products_by_tile_and_date( + tile, mongo_products_collection, season_start, season_end, max_cloud_percentage + ) + + # If there are more products than the maximum specified for creating a composite, take the last ones + product_per_season[season] = list(product_metadata_cursor)[-settings.MAX_PRODUCTS_COMPOSITE:] + + if len(product_per_season[season]) == 0: + raise NoSentinelException(f"There is no valid Sentinel products for tile {tile} in season {season}. Skipping it...") + + # Dataframe for storing data of a tile + tile_df = None + + dems_raster_names = [ + "slope", + "aspect", + "dem", + ] + + + # Get crop mask for sentinel rasters and dataset labeled with database points in tile + band_path = _download_sample_band_by_tile(tile, minio_client, mongo_products_collection) + s2_band_kwargs = _get_kwargs_raster(band_path) + + if execution_mode==ExecutionMode.LAND_COVER_PREDICTION: + crop_mask = np.zeros(shape=(int(s2_band_kwargs["height"]), int(s2_band_kwargs["width"])), dtype=np.uint8) + + elif execution_mode==ExecutionMode.SECOND_LEVEL_PREDICTION: + crop_mask = np.zeros(shape=(int(s2_band_kwargs["height"]), int(s2_band_kwargs["width"])), dtype=np.uint8) + + for season, products_metadata in product_per_season.items(): + print(season) + bucket_products = settings.MINIO_BUCKET_NAME_PRODUCTS + bucket_composites = settings.MINIO_BUCKET_NAME_COMPOSITES + current_bucket = None + + if len(products_metadata) == 0: + raise NoSentinelException(f"There is no valid Sentinel products for tile {tile}. Skipping it...") + + elif len(products_metadata) == 1: + product_metadata = products_metadata[0] + current_bucket = bucket_products + else: + # If there are multiple products for one season, use a composite. 
+ mongo_client.set_collection(settings.MONGO_COMPOSITES_COLLECTION) + mongo_composites_collection = mongo_client.get_collection_object() + products_metadata_list = list(products_metadata) + product_metadata = _get_composite( + products_metadata_list, mongo_composites_collection, execution_mode + ) + if product_metadata is None: + _create_composite( + products_metadata_list, + minio_client, + bucket_products, + bucket_composites, + mongo_composites_collection, + execution_mode + ) + product_metadata = _get_composite( + products_metadata_list, mongo_composites_collection, execution_mode + ) + current_bucket = bucket_composites + + product_name = product_metadata["title"] + + (rasters_paths, is_band) = _get_product_rasters_paths( + product_metadata, minio_client, current_bucket + ) + + (rasters_paths, is_band) = _filter_rasters_paths_by_features_used( + rasters_paths, is_band, used_columns, season + ) + temp_product_folder = Path(settings.TMP_DIR, product_name + ".SAFE") + if not temp_product_folder.exists(): + Path.mkdir(temp_product_folder) + print(f"Processing product {product_name}") + + # Read bands and indexes. + already_read = [] + for i, raster_path in enumerate(rasters_paths): + raster_filename = _get_raster_filename_from_path(raster_path) + raster_name = _get_raster_name_from_path(raster_path) + temp_path = Path(temp_product_folder, raster_filename) + + # Only keep bands and indexes in indexes_used + if (not is_band[i]) and ( + not any( + raster_name.upper() == index_used.upper() + for index_used in indexes_used + ) + ): + continue + # Skip bands in skip_bands + if is_band[i] and any( + raster_name.upper() == band_skipped.upper() + for band_skipped in skip_bands + ): + continue + # Read only the first band to avoid duplication of different spatial resolution + if any( + raster_name.upper() == read_raster.upper() + for read_raster in already_read + ): + continue + already_read.append(raster_name) + + print(f"Downloading raster {raster_name} from minio into {temp_path}") + minio_client.fget_object( + bucket_name=current_bucket, + object_name=raster_path, + file_path=str(temp_path), + ) + + band_normalize_range = normalize_range.get(raster_name, None) + if is_band[i] and (band_normalize_range is None): + band_normalize_range = (0, 7000) + + raster = _read_raster( + band_path=temp_path, + rescale=True, + normalize_range=band_normalize_range, + ) + raster_masked = np.ma.masked_array(raster[0], mask=crop_mask) + raster_masked = np.ma.compressed(raster_masked) + + raster_df = pd.DataFrame({f"{season}_{raster_name}": raster_masked}) + + tile_df = pd.concat([tile_df, raster_df], axis=1) + + for dem_name in dems_raster_names: + # Add dem and aspect data + if dem_name in used_columns: + + dem_path = get_dem_from_tile( + execution_mode, tile, mongo_products_collection, minio_client, dem_name + ) + + # Predict doesnt work yet in some specific tiles where tile is not fully contained in aster rasters + dem_kwargs = _get_kwargs_raster(dem_path) + + crop_mask = np.zeros(shape=(int(dem_kwargs["height"]), int(dem_kwargs["width"])),dtype=np.uint8) + + band_normalize_range = normalize_range.get(dem_name, None) + raster = _read_raster( + band_path=dem_path, + rescale=True, + normalize_range=band_normalize_range + ) + raster_masked = np.ma.masked_array(raster, mask=crop_mask) + raster_masked = np.ma.compressed(raster_masked).flatten() + raster_df = pd.DataFrame({dem_name: raster_masked}) + tile_df = pd.concat([tile_df, raster_df], axis=1) + + print("Dataframe information:") + print(tile_df.info()) + + 
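[Note: both prediction branches below turn the classifier's string labels into uint8 pixel values by looping over a JSON mapping, reserving 0 for NODATA (and, in the second-level branch, 1 for NOCLASSIFIED). A self-contained sketch of that encoding, assuming a toy mapping:

    import numpy as np

    # Hypothetical mapping in the style of LC_LABELS_FILE; 0 is reserved.
    lc_mapping = {"builtUp": 2, "water": 3, "cropland": 4}
    lc_mapping["nodata"] = 0

    # predict() yields string labels; nodata rows were overwritten earlier.
    predictions = np.array([["water", "nodata"],
                            ["cropland", "builtUp"]], dtype=object)

    encoded = np.zeros_like(predictions, dtype=np.uint8)
    for class_, value in lc_mapping.items():
        encoded = np.where(predictions == class_, value, encoded)

    # encoded is now [[3, 0], [4, 2]] and can be written as a GeoTIFF band.

This is also why the code warns when 0 or 1 already appear in the mapping: reassigning them to nodata/noclassified would silently alias an existing class.]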
model_name = "model.joblib" + + if execution_mode == ExecutionMode.LAND_COVER_PREDICTION: + + lc_model_folder = "land-cover" + model_path = join(settings.TMP_DIR, lc_model_folder, model_name) + + minio_client.fget_object( + bucket_name=settings.MINIO_BUCKET_MODELS, + object_name=join(settings.MINIO_DATA_FOLDER_NAME, lc_model_folder, model_name), + file_path=model_path, + ) + + nodata_rows = (~np.isfinite(tile_df)).any(axis=1) + + # Low memory column reindex without copy taken from https://stackoverflow.com/questions/25878198/change-pandas-dataframe-column-order-in-place + for column in used_columns: + tile_df[column] = tile_df.pop(column).replace([np.inf, -np.inf, -np.nan], 0) + + clf = joblib.load(model_path) + + print(f"Predicting land cover for tile {tile}") + predictions = clf.predict(tile_df) + + predictions[nodata_rows] = "nodata" + predictions = np.reshape( + predictions, (1, output_kwargs["height"], output_kwargs["width"]) + ) + encoded_predictions = np.zeros_like(predictions, dtype=np.uint8) + + with open(settings.LC_LABELS_FILE, "r") as f: + lc_mapping = json.load(f) + + if 0 in lc_mapping.values(): + print("Warning: 0 is already a value in the mapping, which is reserved for NODATA. It will be overwritten.") + lc_mapping["nodata"] = 0 + + for class_, value in lc_mapping.items(): + encoded_predictions = np.where( + predictions == class_, value, encoded_predictions + ) + + with rasterio.open( + classification_path, "r+", **output_kwargs + ) as classification_file: + classification_file.write(encoded_predictions) + print(f"{classification_name} saved") + + elif execution_mode == ExecutionMode.SECOND_LEVEL_PREDICTION: + + model_folders = minio_client.list_objects(settings.MINIO_BUCKET_MODELS, prefix=join(settings.MINIO_DATA_FOLDER_NAME, ''), recursive=False) + sl_model_folder = [] + for model_folder in model_folders: + if model_folder.object_name.endswith('/') and "land-cover" not in model_folder.object_name: + sl_model_folder.append(model_folder.object_name.split('/')[-2]) + + local_sl_model_locations = {} + for minio_model_folder in sl_model_folder: + + local_sl_model_path = join(settings.TMP_DIR, minio_model_folder, model_name) + + minio_client.fget_object( + bucket_name=settings.MINIO_BUCKET_MODELS, + object_name=join(settings.MINIO_DATA_FOLDER_NAME, minio_model_folder, model_name), + file_path=local_sl_model_path, + ) + local_sl_model_locations[minio_model_folder] = local_sl_model_path + + + nodata_rows = (~np.isfinite(tile_df)).any(axis=1) + + for column in used_columns: + tile_df[column] = tile_df.pop(column).replace([np.inf, -np.inf, -np.nan], 0) + + classifiers = {} + + with open(settings.LC_LABELS_FILE, "r") as f: + lc_mapping = json.load(f) + + for sl_model in local_sl_model_locations.keys(): + classifiers[lc_mapping[sl_model]] = joblib.load(local_sl_model_locations[sl_model]) + + lc_raster = _get_lc_classification(tile) + lc_raster_flattened = lc_raster.flatten() + + sl_predictions = np.empty(len(tile_df), dtype=object) + + for lc_class_number , class_model in classifiers.items(): + print(f"Second level prediction of pixels with LC class: {lc_class_number}") + mask_lc_class = (lc_raster_flattened == lc_class_number) + + rows_to_predict = tile_df[mask_lc_class] + + sl_predictions[mask_lc_class] = class_model.predict(rows_to_predict) + + sl_predictions[sl_predictions == None] = "noclassified" + sl_predictions[nodata_rows] = "nodata" + + sl_predictions = np.reshape( + sl_predictions, (1, output_kwargs["height"], output_kwargs["width"]) + ) + encoded_sl_predictions = 
np.zeros_like(sl_predictions, dtype=np.uint8) + + with open(settings.SL_LABELS_FILE, "r") as f: + sl_mapping = json.load(f) + + if 0 in sl_mapping.values(): + print("Warning: 0 is already a value in the mapping, which is reserved for NODATA. It will be overwritten.") + sl_mapping["nodata"] = 0 + if 1 in sl_mapping.values(): + print("Warning: 1 is already a value in the mapping, which is reserved for NOCLASSIFIED. It will be overwritten.") + sl_mapping["noclassified"] = 1 + + for class_, value in sl_mapping.items(): + encoded_sl_predictions = np.where( + sl_predictions == class_, value, encoded_sl_predictions + ) + + with rasterio.open( + classification_path, "r+", **output_kwargs + ) as classification_file: + classification_file.write(encoded_sl_predictions) + print(f"{classification_name} saved") + + minio_client.fput_object( + bucket_name=settings.MINIO_BUCKET_CLASSIFICATIONS, + object_name=f"{settings.MINIO_DATA_FOLDER_NAME}/{classification_name}", + file_path=classification_path, + content_type="image/tif", + ) + + for path in Path(settings.TMP_DIR).glob("**/*"): + if path.is_file(): + path.unlink() + elif path.is_dir(): + rmtree(path) diff --git a/src/landcoverpy/workflow_train.py b/src/landcoverpy/workflow_train.py new file mode 100644 index 0000000..fbb7cf2 --- /dev/null +++ b/src/landcoverpy/workflow_train.py @@ -0,0 +1,242 @@ +from pathlib import Path +from shutil import rmtree + +import numpy as np +import pandas as pd + +from landcoverpy.aster import get_dem_from_tile +from landcoverpy.composite import _create_composite, _get_composite +from landcoverpy.config import settings +from landcoverpy.exceptions import NoSentinelException +from landcoverpy.execution_mode import ExecutionMode +from landcoverpy.minio import MinioConnection +from landcoverpy.mongo import MongoConnection +from landcoverpy.utilities.raster import ( + _download_sample_band_by_tile, + _get_kwargs_raster, + _get_product_rasters_paths, + _get_raster_filename_from_path, + _get_raster_name_from_path, + _read_raster, +) +from landcoverpy.utilities.utils import ( + _mask_polygons_by_tile, + get_products_by_tile_and_date, + get_season_dict, +) + +def _process_tile_train(tile, polygons_in_tile): + + execution_mode = ExecutionMode.TRAINING + + if not Path(settings.TMP_DIR).exists(): + Path.mkdir(Path(settings.TMP_DIR)) + + seasons = get_season_dict() + + minio_client = MinioConnection() + mongo_client = MongoConnection() + mongo_products_collection = mongo_client.get_collection_object() + + # Names of the indexes that are taken into account + indexes_used = [ + "cri1", + "ri", + "evi2", + "mndwi", + "moisture", + "ndyi", + "ndre", + "ndvi", + "osavi", + ] + # Name of the sentinel bands that are ignored + skip_bands = ["tci", "scl"] + # Ranges for normalization of each raster + normalize_range = {"slope": (0, 70), "aspect": (0, 360), "dem": (0, 2000)} + + print(f"Working in tile {tile}") + # Mongo query for obtaining valid products + max_cloud_percentage = settings.MAX_CLOUD + + product_per_season = {} + + for season in seasons: + season_start, season_end = seasons[season] + product_metadata_cursor = get_products_by_tile_and_date( + tile, mongo_products_collection, season_start, season_end, max_cloud_percentage + ) + + # If there are more products than the maximum specified for creating a composite, take the last ones + product_per_season[season] = list(product_metadata_cursor)[-settings.MAX_PRODUCTS_COMPOSITE:] + + if len(product_per_season[season]) == 0: + raise NoSentinelException(f"There is no valid Sentinel 
products for tile {tile} in season {season}. Skipping it...") + + # Dataframe for storing data of a tile + tile_df = None + + dems_raster_names = [ + "slope", + "aspect", + "dem", + ] + + for dem_name in dems_raster_names: + # Add dem and aspect data + dem_path = get_dem_from_tile( + execution_mode, tile, mongo_products_collection, minio_client, dem_name + ) + + # Predict doesnt work yet in some specific tiles where tile is not fully contained in aster rasters + dem_kwargs = _get_kwargs_raster(dem_path) + + crop_mask, label_lon_lat = _mask_polygons_by_tile(polygons_in_tile, dem_kwargs) + + band_normalize_range = normalize_range.get(dem_name, None) + raster = _read_raster( + band_path=dem_path, + rescale=True, + normalize_range=band_normalize_range + ) + raster_masked = np.ma.masked_array(raster, mask=crop_mask) + raster_masked = np.ma.compressed(raster_masked).flatten() + raster_df = pd.DataFrame({dem_name: raster_masked}) + tile_df = pd.concat([tile_df, raster_df], axis=1) + + # Get crop mask for sentinel rasters and dataset labeled with database points in tile + band_path = _download_sample_band_by_tile(tile, minio_client, mongo_products_collection) + s2_band_kwargs = _get_kwargs_raster(band_path) + + crop_mask, label_lon_lat = _mask_polygons_by_tile(polygons_in_tile, s2_band_kwargs) + + for season, products_metadata in product_per_season.items(): + print(season) + bucket_products = settings.MINIO_BUCKET_NAME_PRODUCTS + bucket_composites = settings.MINIO_BUCKET_NAME_COMPOSITES + current_bucket = None + + if len(products_metadata) == 0: + raise NoSentinelException(f"There is no valid Sentinel products for tile {tile}. Skipping it...") + + elif len(products_metadata) == 1: + product_metadata = products_metadata[0] + current_bucket = bucket_products + else: + # If there are multiple products for one season, use a composite. + mongo_client.set_collection(settings.MONGO_COMPOSITES_COLLECTION) + mongo_composites_collection = mongo_client.get_collection_object() + products_metadata_list = list(products_metadata) + product_metadata = _get_composite( + products_metadata_list, mongo_composites_collection, execution_mode + ) + if product_metadata is None: + _create_composite( + products_metadata_list, + minio_client, + bucket_products, + bucket_composites, + mongo_composites_collection, + execution_mode + ) + product_metadata = _get_composite( + products_metadata_list, mongo_composites_collection, execution_mode + ) + current_bucket = bucket_composites + + product_name = product_metadata["title"] + + # For validate dataset geometries, the product name is added. + raster_product_name = np.full_like( + raster_masked, product_name, dtype=object + ) + raster_df = pd.DataFrame({f"{season}_product_name": raster_product_name}) + tile_df = pd.concat([tile_df, raster_df], axis=1) + + (rasters_paths, is_band) = _get_product_rasters_paths( + product_metadata, minio_client, current_bucket + ) + + temp_product_folder = Path(settings.TMP_DIR, product_name + ".SAFE") + if not temp_product_folder.exists(): + Path.mkdir(temp_product_folder) + print(f"Processing product {product_name}") + + # Read bands and indexes. 
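[Note: unlike the prediction workflow, training derives crop_mask and the label array from validated polygons via _mask_polygons_by_tile, whose implementation is outside this patch. A hypothetical sketch of the masking half of such a helper, built on rasterio.features.geometry_mask; the function and argument names below are assumptions, not the repository's code:

    import numpy as np
    import rasterio.features
    from rasterio.transform import from_origin
    from shapely.geometry import box

    def polygons_to_crop_mask(geometries, raster_kwargs):
        # True (1) where no polygon covers the pixel, i.e. pixels to drop.
        mask = rasterio.features.geometry_mask(
            geometries,
            out_shape=(int(raster_kwargs["height"]), int(raster_kwargs["width"])),
            transform=raster_kwargs["transform"],
            invert=False,
        )
        return mask.astype(np.uint8)

    # Toy usage: one 300 m x 300 m polygon on a 10 m grid.
    kwargs = {"height": 60, "width": 60,
              "transform": from_origin(500000, 4090600, 10, 10)}
    crop_mask = polygons_to_crop_mask([box(500000, 4090300, 500300, 4090600)], kwargs)
]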
+ already_read = [] + for i, raster_path in enumerate(rasters_paths): + raster_filename = _get_raster_filename_from_path(raster_path) + raster_name = _get_raster_name_from_path(raster_path) + temp_path = Path(temp_product_folder, raster_filename) + + # Only keep bands and indexes in indexes_used + if (not is_band[i]) and ( + not any( + raster_name.upper() == index_used.upper() + for index_used in indexes_used + ) + ): + continue + # Skip bands in skip_bands + if is_band[i] and any( + raster_name.upper() == band_skipped.upper() + for band_skipped in skip_bands + ): + continue + # Read only the first band to avoid duplication of different spatial resolution + if any( + raster_name.upper() == read_raster.upper() + for read_raster in already_read + ): + continue + already_read.append(raster_name) + + print(f"Downloading raster {raster_name} from minio into {temp_path}") + minio_client.fget_object( + bucket_name=current_bucket, + object_name=raster_path, + file_path=str(temp_path), + ) + + band_normalize_range = normalize_range.get(raster_name, None) + if is_band[i] and (band_normalize_range is None): + band_normalize_range = (0, 7000) + + raster = _read_raster( + band_path=temp_path, + rescale=True, + normalize_range=band_normalize_range, + ) + raster_masked = np.ma.masked_array(raster[0], mask=crop_mask) + raster_masked = np.ma.compressed(raster_masked) + + raster_df = pd.DataFrame({f"{season}_{raster_name}": raster_masked}) + + tile_df = pd.concat([tile_df, raster_df], axis=1) + + for index, label in enumerate([settings.LC_PROPERTY, "longitude", "latitude", settings.SL_PROPERTY]): + + raster_masked = np.ma.masked_array(label_lon_lat[:, :, index], mask=crop_mask) + raster_masked = np.ma.compressed(raster_masked).flatten() + raster_df = pd.DataFrame({label: raster_masked}) + tile_df = pd.concat([tile_df, raster_df], axis=1) + + tile_df_name = f"dataset_{tile}.csv" + tile_df_path = Path(settings.TMP_DIR, tile_df_name) + tile_df.to_csv(str(tile_df_path), index=False) + + minio_client.fput_object( + bucket_name=settings.MINIO_BUCKET_DATASETS, + object_name=f"{settings.MINIO_DATA_FOLDER_NAME}/tiles_datasets/{tile_df_name}", + file_path=tile_df_path, + content_type="image/tif", + ) + + print("Dataframe information:") + print(tile_df.info()) + + for path in Path(settings.TMP_DIR).glob("**/*"): + if path.is_file(): + path.unlink() + elif path.is_dir(): + rmtree(path) From 18a69f6b38abd748aecf409e0e8c92d1282cc8d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Antonio=20Manuel=20Burgue=C3=B1o=20Romero?= Date: Tue, 9 Jul 2024 13:42:51 +0000 Subject: [PATCH 5/8] Predict workflow reorganized to allow windowing --- src/landcoverpy/workflow_predict.py | 384 ++++++++++++++-------------- 1 file changed, 196 insertions(+), 188 deletions(-) diff --git a/src/landcoverpy/workflow_predict.py b/src/landcoverpy/workflow_predict.py index 23330ea..c53e46d 100644 --- a/src/landcoverpy/workflow_predict.py +++ b/src/landcoverpy/workflow_predict.py @@ -1,4 +1,5 @@ import json +from collections import defaultdict from os.path import join from pathlib import Path from shutil import rmtree @@ -47,6 +48,65 @@ def _process_tile_predict(tile, execution_mode, used_columns=None, use_block_win mongo_client = MongoConnection() mongo_products_collection = mongo_client.get_collection_object() + model_name = "model.joblib" + lc_model_folder = "land-cover" + + with open(settings.LC_LABELS_FILE, "r") as f: + lc_mapping = json.load(f) + + if 0 in lc_mapping.values(): + print("Warning: 0 is already a value in the mapping, which is reserved for 
NODATA. It will be overwritten.") + lc_mapping["nodata"] = 0 + + if execution_mode == ExecutionMode.LAND_COVER_PREDICTION: + model_path = join(settings.TMP_DIR, lc_model_folder, model_name) + + print("Downloading land cover model") + minio_client.fget_object( + bucket_name=settings.MINIO_BUCKET_MODELS, + object_name=join(settings.MINIO_DATA_FOLDER_NAME, lc_model_folder, model_name), + file_path=model_path, + ) + + lc_clf = joblib.load(model_path) + + elif execution_mode == ExecutionMode.SECOND_LEVEL_PREDICTION: + + model_folders = minio_client.list_objects(settings.MINIO_BUCKET_MODELS, prefix=join(settings.MINIO_DATA_FOLDER_NAME, ''), recursive=False) + sl_model_folder = [] + for model_folder in model_folders: + if model_folder.object_name.endswith('/') and lc_model_folder not in model_folder.object_name: + sl_model_folder.append(model_folder.object_name.split('/')[-2]) + + local_sl_model_locations = {} + for minio_model_folder in sl_model_folder: + + local_sl_model_path = join(settings.TMP_DIR, minio_model_folder, model_name) + + print(f"Downloading {model_name} model") + minio_client.fget_object( + bucket_name=settings.MINIO_BUCKET_MODELS, + object_name=join(settings.MINIO_DATA_FOLDER_NAME, minio_model_folder, model_name), + file_path=local_sl_model_path, + ) + local_sl_model_locations[minio_model_folder] = local_sl_model_path + + with open(settings.SL_LABELS_FILE, "r") as f: + sl_mapping = json.load(f) + + if 0 in sl_mapping.values(): + print("Warning: 0 is already a value in the mapping, which is reserved for NODATA. It will be overwritten.") + sl_mapping["nodata"] = 0 + if 1 in sl_mapping.values(): + print("Warning: 1 is already a value in the mapping, which is reserved for NOCLASSIFIED. It will be overwritten.") + sl_mapping["noclassified"] = 1 + + sl_classifiers = {} + + for sl_model in local_sl_model_locations.keys(): + sl_classifiers[lc_mapping[sl_model]] = joblib.load(local_sl_model_locations[sl_model]) + + band_path = _download_sample_band_by_tile(tile, minio_client, mongo_products_collection) kwargs_s2 = _get_kwargs_raster(band_path) @@ -108,25 +168,8 @@ def _process_tile_predict(tile, execution_mode, used_columns=None, use_block_win if len(product_per_season[season]) == 0: raise NoSentinelException(f"There is no valid Sentinel products for tile {tile} in season {season}. Skipping it...") - # Dataframe for storing data of a tile - tile_df = None - dems_raster_names = [ - "slope", - "aspect", - "dem", - ] - - - # Get crop mask for sentinel rasters and dataset labeled with database points in tile - band_path = _download_sample_band_by_tile(tile, minio_client, mongo_products_collection) - s2_band_kwargs = _get_kwargs_raster(band_path) - - if execution_mode==ExecutionMode.LAND_COVER_PREDICTION: - crop_mask = np.zeros(shape=(int(s2_band_kwargs["height"]), int(s2_band_kwargs["width"])), dtype=np.uint8) - - elif execution_mode==ExecutionMode.SECOND_LEVEL_PREDICTION: - crop_mask = np.zeros(shape=(int(s2_band_kwargs["height"]), int(s2_band_kwargs["width"])), dtype=np.uint8) + rasters_by_season = defaultdict(dict) for season, products_metadata in product_per_season.items(): print(season) @@ -176,208 +219,173 @@ def _process_tile_predict(tile, execution_mode, used_columns=None, use_block_win Path.mkdir(temp_product_folder) print(f"Processing product {product_name}") - # Read bands and indexes. 
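[Note: with the second-level classifiers preloaded above, keyed by their encoded land-cover class, the prediction loop later routes each pixel to the model matching its LC class and labels the rest "noclassified". A condensed, self-contained sketch of that dispatch using stand-in models:

    import numpy as np
    import pandas as pd

    class ConstantModel:
        # Stand-in for a classifier loaded with joblib.
        def __init__(self, label):
            self.label = label
        def predict(self, X):
            return np.full(len(X), self.label, dtype=object)

    # Keyed by encoded land-cover class, like sl_classifiers above.
    classifiers = {2: ConstantModel("denseForest"), 3: ConstantModel("openWater")}

    features = pd.DataFrame({"ndvi": [0.8, 0.1, 0.5, 0.7]})
    lc_flat = np.array([2, 3, 9, 2])  # class 9 has no second-level model

    sl_pred = np.empty(len(features), dtype=object)
    for lc_class, model in classifiers.items():
        mask = lc_flat == lc_class
        sl_pred[mask] = model.predict(features[mask])

    sl_pred[sl_pred == None] = "noclassified"  # pixels whose LC class has no model
    # sl_pred: ['denseForest', 'openWater', 'noclassified', 'denseForest']
]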
- already_read = [] - for i, raster_path in enumerate(rasters_paths): - raster_filename = _get_raster_filename_from_path(raster_path) - raster_name = _get_raster_name_from_path(raster_path) - temp_path = Path(temp_product_folder, raster_filename) - - # Only keep bands and indexes in indexes_used - if (not is_band[i]) and ( - not any( - raster_name.upper() == index_used.upper() - for index_used in indexes_used - ) - ): - continue - # Skip bands in skip_bands - if is_band[i] and any( - raster_name.upper() == band_skipped.upper() - for band_skipped in skip_bands - ): - continue - # Read only the first band to avoid duplication of different spatial resolution - if any( - raster_name.upper() == read_raster.upper() - for read_raster in already_read - ): - continue - already_read.append(raster_name) - - print(f"Downloading raster {raster_name} from minio into {temp_path}") - minio_client.fget_object( - bucket_name=current_bucket, - object_name=raster_path, - file_path=str(temp_path), - ) - - band_normalize_range = normalize_range.get(raster_name, None) - if is_band[i] and (band_normalize_range is None): - band_normalize_range = (0, 7000) + rasters_by_season[season]["raster_paths"] = rasters_paths + rasters_by_season[season]["is_band"] = is_band + rasters_by_season[season]["temp_product_folder"] = temp_product_folder - raster = _read_raster( - band_path=temp_path, - rescale=True, - normalize_range=band_normalize_range, - ) - raster_masked = np.ma.masked_array(raster[0], mask=crop_mask) - raster_masked = np.ma.compressed(raster_masked) - raster_df = pd.DataFrame({f"{season}_{raster_name}": raster_masked}) - - tile_df = pd.concat([tile_df, raster_df], axis=1) - - for dem_name in dems_raster_names: - # Add dem and aspect data - if dem_name in used_columns: - - dem_path = get_dem_from_tile( - execution_mode, tile, mongo_products_collection, minio_client, dem_name - ) + dems_raster_names = [ + "slope", + "aspect", + "dem", + ] - # Predict doesnt work yet in some specific tiles where tile is not fully contained in aster rasters - dem_kwargs = _get_kwargs_raster(dem_path) + for window in windows: + + window_kwargs = kwargs_s2.copy() + window_kwargs["width"] = window.width + window_kwargs["height"] = window.height + window_kwargs["transform"] = rasterio.windows.transform(window, kwargs_s2["transform"]) + + # Dataframe for storing data of a window + window_tile_df = None + + crop_mask = np.zeros(shape=(int(window_kwargs["height"]), int(window_kwargs["width"])), dtype=np.uint8) + + for season in seasons: + # Read bands and indexes. 
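[Note: for every window, window_kwargs rebuilds the georeferencing by shifting the tile's affine transform to the window origin with rasterio.windows.transform. A quick check of that arithmetic on an assumed 10 m grid:

    import rasterio.windows
    from rasterio.transform import from_origin
    from rasterio.windows import Window

    # Tile transform: upper-left corner at (500000, 4100000), 10 m pixels.
    tile_transform = from_origin(500000, 4100000, 10, 10)

    window = Window(col_off=256, row_off=128, width=512, height=512)
    win_transform = rasterio.windows.transform(window, tile_transform)

    # The window origin moves 256 pixels east and 128 pixels south:
    print(win_transform.c, win_transform.f)  # 502560.0 4098720.0
]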
+ already_read = [] + for i, raster_path in enumerate(rasters_by_season[season]["raster_paths"]): + raster_filename = _get_raster_filename_from_path(raster_path) + raster_name = _get_raster_name_from_path(raster_path) + temp_path = Path(rasters_by_season[season]["temp_product_folder"], raster_filename) + + # Only keep bands and indexes in indexes_used + if (not rasters_by_season[season]["is_band"][i]) and ( + not any( + raster_name.upper() == index_used.upper() + for index_used in indexes_used + ) + ): + continue + # Skip bands in skip_bands + if rasters_by_season[season]["is_band"][i] and any( + raster_name.upper() == band_skipped.upper() + for band_skipped in skip_bands + ): + continue + # Read only the first band to avoid duplication of different spatial resolution + if any( + raster_name.upper() == read_raster.upper() + for read_raster in already_read + ): + continue + already_read.append(raster_name) + + print(f"Downloading raster {raster_name} from minio into {temp_path}") + minio_client.fget_object( + bucket_name=current_bucket, + object_name=raster_path, + file_path=str(temp_path), + ) - crop_mask = np.zeros(shape=(int(dem_kwargs["height"]), int(dem_kwargs["width"])),dtype=np.uint8) + band_normalize_range = normalize_range.get(raster_name, None) + if rasters_by_season[season]["is_band"][i] and (band_normalize_range is None): + band_normalize_range = (0, 7000) - band_normalize_range = normalize_range.get(dem_name, None) - raster = _read_raster( - band_path=dem_path, - rescale=True, - normalize_range=band_normalize_range - ) - raster_masked = np.ma.masked_array(raster, mask=crop_mask) - raster_masked = np.ma.compressed(raster_masked).flatten() - raster_df = pd.DataFrame({dem_name: raster_masked}) - tile_df = pd.concat([tile_df, raster_df], axis=1) + raster = _read_raster( + band_path=temp_path, + rescale=True, + normalize_range=band_normalize_range, + ) + raster_masked = np.ma.masked_array(raster[0], mask=crop_mask) + raster_masked = np.ma.compressed(raster_masked) - print("Dataframe information:") - print(tile_df.info()) + window_raster_df = pd.DataFrame({f"{season}_{raster_name}": raster_masked}) - model_name = "model.joblib" + window_tile_df = pd.concat([window_tile_df, window_raster_df], axis=1) - if execution_mode == ExecutionMode.LAND_COVER_PREDICTION: + for dem_name in dems_raster_names: + # Add dem and aspect data + if dem_name in used_columns: - lc_model_folder = "land-cover" - model_path = join(settings.TMP_DIR, lc_model_folder, model_name) + dem_path = get_dem_from_tile( + execution_mode, tile, mongo_products_collection, minio_client, dem_name + ) - minio_client.fget_object( - bucket_name=settings.MINIO_BUCKET_MODELS, - object_name=join(settings.MINIO_DATA_FOLDER_NAME, lc_model_folder, model_name), - file_path=model_path, - ) + # Predict doesnt work yet in some specific tiles where tile is not fully contained in aster rasters + dem_kwargs = _get_kwargs_raster(dem_path) - nodata_rows = (~np.isfinite(tile_df)).any(axis=1) + crop_mask = np.zeros(shape=(int(dem_kwargs["height"]), int(dem_kwargs["width"])),dtype=np.uint8) - # Low memory column reindex without copy taken from https://stackoverflow.com/questions/25878198/change-pandas-dataframe-column-order-in-place - for column in used_columns: - tile_df[column] = tile_df.pop(column).replace([np.inf, -np.inf, -np.nan], 0) + band_normalize_range = normalize_range.get(dem_name, None) + raster = _read_raster( + band_path=dem_path, + rescale=True, + normalize_range=band_normalize_range + ) + raster_masked = 
np.ma.masked_array(raster, mask=crop_mask) + raster_masked = np.ma.compressed(raster_masked).flatten() + window_raster_df = pd.DataFrame({dem_name: raster_masked}) + window_tile_df = pd.concat([window_tile_df, window_raster_df], axis=1) - clf = joblib.load(model_path) - print(f"Predicting land cover for tile {tile}") - predictions = clf.predict(tile_df) - predictions[nodata_rows] = "nodata" - predictions = np.reshape( - predictions, (1, output_kwargs["height"], output_kwargs["width"]) - ) - encoded_predictions = np.zeros_like(predictions, dtype=np.uint8) - - with open(settings.LC_LABELS_FILE, "r") as f: - lc_mapping = json.load(f) - - if 0 in lc_mapping.values(): - print("Warning: 0 is already a value in the mapping, which is reserved for NODATA. It will be overwritten.") - lc_mapping["nodata"] = 0 + print("Dataframe information:") + print(window_tile_df.info()) - for class_, value in lc_mapping.items(): - encoded_predictions = np.where( - predictions == class_, value, encoded_predictions - ) + if execution_mode == ExecutionMode.LAND_COVER_PREDICTION: - with rasterio.open( - classification_path, "r+", **output_kwargs - ) as classification_file: - classification_file.write(encoded_predictions) - print(f"{classification_name} saved") + nodata_rows = (~np.isfinite(window_tile_df)).any(axis=1) - elif execution_mode == ExecutionMode.SECOND_LEVEL_PREDICTION: + # Low memory column reindex without copy taken from https://stackoverflow.com/questions/25878198/change-pandas-dataframe-column-order-in-place + for column in used_columns: + window_tile_df[column] = window_tile_df.pop(column).replace([np.inf, -np.inf, -np.nan], 0) - model_folders = minio_client.list_objects(settings.MINIO_BUCKET_MODELS, prefix=join(settings.MINIO_DATA_FOLDER_NAME, ''), recursive=False) - sl_model_folder = [] - for model_folder in model_folders: - if model_folder.object_name.endswith('/') and "land-cover" not in model_folder.object_name: - sl_model_folder.append(model_folder.object_name.split('/')[-2]) + predictions = lc_clf.predict(window_tile_df) - local_sl_model_locations = {} - for minio_model_folder in sl_model_folder: - - local_sl_model_path = join(settings.TMP_DIR, minio_model_folder, model_name) - - minio_client.fget_object( - bucket_name=settings.MINIO_BUCKET_MODELS, - object_name=join(settings.MINIO_DATA_FOLDER_NAME, minio_model_folder, model_name), - file_path=local_sl_model_path, + predictions[nodata_rows] = "nodata" + predictions = np.reshape( + predictions, (1, output_kwargs["height"], output_kwargs["width"]) ) - local_sl_model_locations[minio_model_folder] = local_sl_model_path - - - nodata_rows = (~np.isfinite(tile_df)).any(axis=1) + encoded_predictions = np.zeros_like(predictions, dtype=np.uint8) - for column in used_columns: - tile_df[column] = tile_df.pop(column).replace([np.inf, -np.inf, -np.nan], 0) + for class_, value in lc_mapping.items(): + encoded_predictions = np.where( + predictions == class_, value, encoded_predictions + ) - classifiers = {} + with rasterio.open( + classification_path, "r+", **output_kwargs + ) as classification_file: + classification_file.write(encoded_predictions) - with open(settings.LC_LABELS_FILE, "r") as f: - lc_mapping = json.load(f) + elif execution_mode == ExecutionMode.SECOND_LEVEL_PREDICTION: - for sl_model in local_sl_model_locations.keys(): - classifiers[lc_mapping[sl_model]] = joblib.load(local_sl_model_locations[sl_model]) + nodata_rows = (~np.isfinite(window_tile_df)).any(axis=1) - lc_raster = _get_lc_classification(tile) - lc_raster_flattened = 
lc_raster.flatten() - - sl_predictions = np.empty(len(tile_df), dtype=object) + for column in used_columns: + window_tile_df[column] = window_tile_df.pop(column).replace([np.inf, -np.inf, -np.nan], 0) - for lc_class_number , class_model in classifiers.items(): - print(f"Second level prediction of pixels with LC class: {lc_class_number}") - mask_lc_class = (lc_raster_flattened == lc_class_number) - - rows_to_predict = tile_df[mask_lc_class] + lc_raster = _get_lc_classification(tile) + lc_raster_flattened = lc_raster.flatten() - sl_predictions[mask_lc_class] = class_model.predict(rows_to_predict) + sl_predictions = np.empty(len(window_tile_df), dtype=object) - sl_predictions[sl_predictions == None] = "noclassified" - sl_predictions[nodata_rows] = "nodata" - - sl_predictions = np.reshape( - sl_predictions, (1, output_kwargs["height"], output_kwargs["width"]) - ) - encoded_sl_predictions = np.zeros_like(sl_predictions, dtype=np.uint8) - - with open(settings.SL_LABELS_FILE, "r") as f: - sl_mapping = json.load(f) - - if 0 in sl_mapping.values(): - print("Warning: 0 is already a value in the mapping, which is reserved for NODATA. It will be overwritten.") - sl_mapping["nodata"] = 0 - if 1 in sl_mapping.values(): - print("Warning: 1 is already a value in the mapping, which is reserved for NOCLASSIFIED. It will be overwritten.") - sl_mapping["noclassified"] = 1 + for lc_class_number , class_model in sl_classifiers.items(): + mask_lc_class = (lc_raster_flattened == lc_class_number) + + rows_to_predict = window_tile_df[mask_lc_class] + + sl_predictions[mask_lc_class] = class_model.predict(rows_to_predict) - for class_, value in sl_mapping.items(): - encoded_sl_predictions = np.where( - sl_predictions == class_, value, encoded_sl_predictions + sl_predictions[sl_predictions == None] = "noclassified" + sl_predictions[nodata_rows] = "nodata" + + sl_predictions = np.reshape( + sl_predictions, (1, output_kwargs["height"], output_kwargs["width"]) ) + encoded_sl_predictions = np.zeros_like(sl_predictions, dtype=np.uint8) + + for class_, value in sl_mapping.items(): + encoded_sl_predictions = np.where( + sl_predictions == class_, value, encoded_sl_predictions + ) - with rasterio.open( - classification_path, "r+", **output_kwargs - ) as classification_file: - classification_file.write(encoded_sl_predictions) - print(f"{classification_name} saved") + with rasterio.open( + classification_path, "r+", **output_kwargs + ) as classification_file: + classification_file.write(encoded_sl_predictions) minio_client.fput_object( bucket_name=settings.MINIO_BUCKET_CLASSIFICATIONS, From 4050c907058a929f65fdb04834295fc1404694db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Antonio=20Manuel=20Burgue=C3=B1o=20Romero?= Date: Wed, 10 Jul 2024 11:01:57 +0000 Subject: [PATCH 6/8] Now predict workflow is ready to use slice windows, yet not ready for using block windows --- src/landcoverpy/utilities/raster.py | 8 ++++- src/landcoverpy/workflow_predict.py | 49 +++++++++++++++++++---------- 2 files changed, 40 insertions(+), 17 deletions(-) diff --git a/src/landcoverpy/utilities/raster.py b/src/landcoverpy/utilities/raster.py index cdeae43..5d076ab 100644 --- a/src/landcoverpy/utilities/raster.py +++ b/src/landcoverpy/utilities/raster.py @@ -6,6 +6,7 @@ import numpy as np import pyproj import rasterio +import rasterio.windows from pymongo.collection import Collection from rasterio import mask as msk from rasterio.warp import Resampling, reproject @@ -42,11 +43,16 @@ def _read_raster( path_to_disk (str) : If the postprocessed (e.g. 
rescaled, cropped, etc.) raster wants to be saved locally, a path has to be provided normalize_range (Tuple[float, float]) : Values mapped to -1 and +1 in normalization. None if the raster doesn't need to be normalized to_tif (bool) : If the raster wants to be transformed to a GeoTiff raster (usefull when reading JP2 rasters that can only store natural numbers) + window (Window) : If the raster wants to be read in a window, a window object has to be provided Returns: band (np.ndarray) : The read raster as numpy array """ + + if window is not None and (mask_geometry is not None or path_to_disk is not None): + print("If a window is provided, the raster can't be saved to disk or cropped") + band_name = _get_raster_name_from_path(str(band_path)) print(f"Reading raster {band_name}") with rasterio.open(band_path) as band_file: @@ -59,7 +65,7 @@ def _read_raster( { "height": window.height, "width": window.width, - "transform": band_file.window_transform(window) + "transform": rasterio.windows.transform(window, kwargs["transform"]) } ) diff --git a/src/landcoverpy/workflow_predict.py b/src/landcoverpy/workflow_predict.py index c53e46d..d474877 100644 --- a/src/landcoverpy/workflow_predict.py +++ b/src/landcoverpy/workflow_predict.py @@ -8,6 +8,7 @@ import numpy as np import pandas as pd import rasterio +import rasterio.windows from rasterio.windows import Window from landcoverpy.aster import get_dem_from_tile @@ -230,6 +231,17 @@ def _process_tile_predict(tile, execution_mode, used_columns=None, use_block_win "dem", ] + + dem_paths = {} + for dem_name in dems_raster_names: + # Add dem and aspect data + if dem_name in used_columns: + dem_path = get_dem_from_tile( + execution_mode, tile, mongo_products_collection, minio_client, dem_name + ) + dem_paths[dem_name] = dem_path + + for window in windows: window_kwargs = kwargs_s2.copy() @@ -272,12 +284,14 @@ def _process_tile_predict(tile, execution_mode, used_columns=None, use_block_win continue already_read.append(raster_name) - print(f"Downloading raster {raster_name} from minio into {temp_path}") - minio_client.fget_object( - bucket_name=current_bucket, - object_name=raster_path, - file_path=str(temp_path), - ) + # download if file_path does not exist + if not temp_path.exists(): + print(f"Downloading raster {raster_name} from minio into {temp_path}") + minio_client.fget_object( + bucket_name=current_bucket, + object_name=raster_path, + file_path=str(temp_path), + ) band_normalize_range = normalize_range.get(raster_name, None) if rasters_by_season[season]["is_band"][i] and (band_normalize_range is None): @@ -287,6 +301,7 @@ def _process_tile_predict(tile, execution_mode, used_columns=None, use_block_win band_path=temp_path, rescale=True, normalize_range=band_normalize_range, + window=window ) raster_masked = np.ma.masked_array(raster[0], mask=crop_mask) raster_masked = np.ma.compressed(raster_masked) @@ -299,20 +314,22 @@ def _process_tile_predict(tile, execution_mode, used_columns=None, use_block_win # Add dem and aspect data if dem_name in used_columns: - dem_path = get_dem_from_tile( - execution_mode, tile, mongo_products_collection, minio_client, dem_name - ) - # Predict doesnt work yet in some specific tiles where tile is not fully contained in aster rasters - dem_kwargs = _get_kwargs_raster(dem_path) + dem_kwargs = _get_kwargs_raster(dem_paths[dem_name]) - crop_mask = np.zeros(shape=(int(dem_kwargs["height"]), int(dem_kwargs["width"])),dtype=np.uint8) + dem_kwargs_window = dem_kwargs.copy() + dem_kwargs_window["width"] = window.width + 
dem_kwargs_window["height"] = window.height + dem_kwargs_window["transform"] = rasterio.windows.transform(window, dem_kwargs["transform"]) + + crop_mask = np.zeros(shape=(int(dem_kwargs_window["height"]), int(dem_kwargs_window["width"])),dtype=np.uint8) band_normalize_range = normalize_range.get(dem_name, None) raster = _read_raster( - band_path=dem_path, + band_path=dem_paths[dem_name], rescale=True, - normalize_range=band_normalize_range + normalize_range=band_normalize_range, + window=window ) raster_masked = np.ma.masked_array(raster, mask=crop_mask) raster_masked = np.ma.compressed(raster_masked).flatten() @@ -348,7 +365,7 @@ def _process_tile_predict(tile, execution_mode, used_columns=None, use_block_win with rasterio.open( classification_path, "r+", **output_kwargs ) as classification_file: - classification_file.write(encoded_predictions) + classification_file.write(encoded_predictions, window=window) elif execution_mode == ExecutionMode.SECOND_LEVEL_PREDICTION: @@ -385,7 +402,7 @@ def _process_tile_predict(tile, execution_mode, used_columns=None, use_block_win with rasterio.open( classification_path, "r+", **output_kwargs ) as classification_file: - classification_file.write(encoded_sl_predictions) + classification_file.write(encoded_sl_predictions, window=window) minio_client.fput_object( bucket_name=settings.MINIO_BUCKET_CLASSIFICATIONS, From 09024e315283cdae63d49d3e1680bfb58d626ec4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Antonio=20Manuel=20Burgue=C3=B1o=20Romero?= Date: Mon, 15 Jul 2024 06:59:50 +0000 Subject: [PATCH 7/8] Modified read_raster function to work with windows from different spatial resolution --- notebooks/main_usage.ipynb | 15 ++++++++++++++- src/landcoverpy/utilities/raster.py | 14 ++++++++++++++ src/landcoverpy/workflow_predict.py | 6 +++--- 3 files changed, 31 insertions(+), 4 deletions(-) diff --git a/notebooks/main_usage.ipynb b/notebooks/main_usage.ipynb index 5bb3eef..0a9f2e2 100644 --- a/notebooks/main_usage.ipynb +++ b/notebooks/main_usage.ipynb @@ -9,6 +9,16 @@ "### Generate training dataset using validated data located at `DB_DIR` environment variable" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from landcoverpy.config import settings\n", + "settings.MINIO_DATA_FOLDER_NAME" + ] + }, { "cell_type": "code", "execution_count": null, @@ -52,7 +62,10 @@ "metadata": {}, "outputs": [], "source": [ - "workflow(execution_mode=ExecutionMode.LAND_COVER_PREDICTION, tiles_to_predict=[\"36SYC\"])" + "from landcoverpy.workflow import workflow\n", + "from landcoverpy.execution_mode import ExecutionMode\n", + "slices = (5,5)\n", + "workflow(execution_mode=ExecutionMode.LAND_COVER_PREDICTION, tiles_to_predict=[\"36SYC\"], window_slices=slices)" ] }, { diff --git a/src/landcoverpy/utilities/raster.py b/src/landcoverpy/utilities/raster.py index 5d076ab..d9c2dd7 100644 --- a/src/landcoverpy/utilities/raster.py +++ b/src/landcoverpy/utilities/raster.py @@ -1,4 +1,6 @@ import json +import math + from itertools import compress from pathlib import Path from typing import Iterable, List, Tuple @@ -56,6 +58,18 @@ def _read_raster( band_name = _get_raster_name_from_path(str(band_path)) print(f"Reading raster {band_name}") with rasterio.open(band_path) as band_file: + + spatial_resolution = _get_spatial_resolution_raster(band_path) + if window is not None and spatial_resolution != 10: + # Transform the window to the actual resolution of the raster + scale_factor = spatial_resolution / 10 + window = Window( + col_off 
= window.col_off / scale_factor, + row_off = window.row_off / scale_factor, + width = window.width / scale_factor, + height = window.height / scale_factor + ) + # Read file kwargs = band_file.meta destination_crs = band_file.crs diff --git a/src/landcoverpy/workflow_predict.py b/src/landcoverpy/workflow_predict.py index d474877..e377ed9 100644 --- a/src/landcoverpy/workflow_predict.py +++ b/src/landcoverpy/workflow_predict.py @@ -116,7 +116,7 @@ def _process_tile_predict(tile, execution_mode, used_columns=None, use_block_win elif window_slices is not None: windows = _generate_windows_from_slices_number(tile, window_slices, minio_client, mongo_products_collection) else: - windows = [Window(0, 0, kwargs_s2.width, kwargs_s2.height)] + windows = [Window(0, 0, kwargs_s2['width'], kwargs_s2['height'])] # Initialize empty output raster output_kwargs = kwargs_s2.copy() @@ -243,7 +243,7 @@ def _process_tile_predict(tile, execution_mode, used_columns=None, use_block_win for window in windows: - + print(f"Processing window {window}") window_kwargs = kwargs_s2.copy() window_kwargs["width"] = window.width window_kwargs["height"] = window.height @@ -353,7 +353,7 @@ def _process_tile_predict(tile, execution_mode, used_columns=None, use_block_win predictions[nodata_rows] = "nodata" predictions = np.reshape( - predictions, (1, output_kwargs["height"], output_kwargs["width"]) + predictions, (1, window_kwargs["height"], window_kwargs["width"]) ) encoded_predictions = np.zeros_like(predictions, dtype=np.uint8) From 3a8c821064914a12757796c4c4d58535fd498c08 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Antonio=20Manuel=20Burgue=C3=B1o=20Romero?= Date: Mon, 15 Jul 2024 08:00:32 +0000 Subject: [PATCH 8/8] Now tile dataframe is created using a dict not pd.concat, reducing RAM usage --- src/landcoverpy/utilities/raster.py | 9 +++------ src/landcoverpy/workflow_predict.py | 24 +++++++++++------------- 2 files changed, 14 insertions(+), 19 deletions(-) diff --git a/src/landcoverpy/utilities/raster.py b/src/landcoverpy/utilities/raster.py index d9c2dd7..3072ad1 100644 --- a/src/landcoverpy/utilities/raster.py +++ b/src/landcoverpy/utilities/raster.py @@ -1,5 +1,4 @@ import json -import math from itertools import compress from pathlib import Path @@ -53,12 +52,13 @@ def _read_raster( """ if window is not None and (mask_geometry is not None or path_to_disk is not None): - print("If a window is provided, the raster can't be saved to disk or cropped") + raise ValueError("If a window is provided, mask_geometry and path_to_disk must be None") band_name = _get_raster_name_from_path(str(band_path)) - print(f"Reading raster {band_name}") + with rasterio.open(band_path) as band_file: + spatial_resolution = _get_spatial_resolution_raster(band_path) if window is not None and spatial_resolution != 10: # Transform the window to the actual resolution of the raster @@ -100,7 +100,6 @@ def _read_raster( path_to_disk = path_to_disk[:-3] + "tif" if normalize_range is not None: - print(f"Normalizing band {band_name}") value1, value2 = normalize_range band = _normalize(band, value1, value2) @@ -111,7 +110,6 @@ def _read_raster( # Create a temporal memory file to mask the band # This is necessary because the band is previously read to scale its resolution if mask_geometry: - print(f"Cropping raster {band_name}") projected_geometry = _project_shape(mask_geometry, dcs=destination_crs) with rasterio.io.MemoryFile() as memfile: with memfile.open(**kwargs) as memfile_band: @@ -430,7 +428,6 @@ def _rescale_band( rescaled_raster = np.ndarray( 
shape=(kwargs["count"], new_kwargs["height"], new_kwargs["width"]), dtype=np.float32) - print(f"Rescaling raster {band_name}, from: {img_resolution}m to {str(spatial_resol)}.0m") reproject( source=band, destination=rescaled_raster, diff --git a/src/landcoverpy/workflow_predict.py b/src/landcoverpy/workflow_predict.py index e377ed9..b181e83 100644 --- a/src/landcoverpy/workflow_predict.py +++ b/src/landcoverpy/workflow_predict.py @@ -173,7 +173,6 @@ def _process_tile_predict(tile, execution_mode, used_columns=None, use_block_win rasters_by_season = defaultdict(dict) for season, products_metadata in product_per_season.items(): - print(season) bucket_products = settings.MINIO_BUCKET_NAME_PRODUCTS bucket_composites = settings.MINIO_BUCKET_NAME_COMPOSITES current_bucket = None @@ -218,7 +217,6 @@ def _process_tile_predict(tile, execution_mode, used_columns=None, use_block_win temp_product_folder = Path(settings.TMP_DIR, product_name + ".SAFE") if not temp_product_folder.exists(): Path.mkdir(temp_product_folder) - print(f"Processing product {product_name}") rasters_by_season[season]["raster_paths"] = rasters_paths rasters_by_season[season]["is_band"] = is_band @@ -249,8 +247,8 @@ def _process_tile_predict(tile, execution_mode, used_columns=None, use_block_win window_kwargs["height"] = window.height window_kwargs["transform"] = rasterio.windows.transform(window, kwargs_s2["transform"]) - # Dataframe for storing data of a window - window_tile_df = None + # Dict for storing data of a window (all rasters and indexes) + window_tile_dict = {} crop_mask = np.zeros(shape=(int(window_kwargs["height"]), int(window_kwargs["width"])), dtype=np.uint8) @@ -306,9 +304,7 @@ def _process_tile_predict(tile, execution_mode, used_columns=None, use_block_win raster_masked = np.ma.masked_array(raster[0], mask=crop_mask) raster_masked = np.ma.compressed(raster_masked) - window_raster_df = pd.DataFrame({f"{season}_{raster_name}": raster_masked}) - - window_tile_df = pd.concat([window_tile_df, window_raster_df], axis=1) + window_tile_dict.update({f"{season}_{raster_name}": raster_masked}) for dem_name in dems_raster_names: # Add dem and aspect data @@ -333,13 +329,9 @@ def _process_tile_predict(tile, execution_mode, used_columns=None, use_block_win ) raster_masked = np.ma.masked_array(raster, mask=crop_mask) raster_masked = np.ma.compressed(raster_masked).flatten() - window_raster_df = pd.DataFrame({dem_name: raster_masked}) - window_tile_df = pd.concat([window_tile_df, window_raster_df], axis=1) - - + window_tile_dict.update({dem_name: raster_masked}) - print("Dataframe information:") - print(window_tile_df.info()) + window_tile_df = pd.DataFrame(window_tile_dict) if execution_mode == ExecutionMode.LAND_COVER_PREDICTION: @@ -404,6 +396,8 @@ def _process_tile_predict(tile, execution_mode, used_columns=None, use_block_win ) as classification_file: classification_file.write(encoded_sl_predictions, window=window) + print(f"Window {window} processed, output raster {classification_path} updated") + minio_client.fput_object( bucket_name=settings.MINIO_BUCKET_CLASSIFICATIONS, object_name=f"{settings.MINIO_DATA_FOLDER_NAME}/{classification_name}", @@ -411,6 +405,10 @@ def _process_tile_predict(tile, execution_mode, used_columns=None, use_block_win content_type="image/tif", ) + print(f"Classification raster {classification_name} uploaded to Minio") + print(f"Tile {tile} processed") + print("Cleaning up temporary files") + for path in Path(settings.TMP_DIR).glob("**/*"): if path.is_file(): path.unlink()