Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion plugins/polio/locale/fr/LC_MESSAGES/django.po
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ msgid ""
msgstr ""
"Project-Id-Version: \n"
"Report-Msgid-Bugs-To: \n"
"POT-Creation-Date: 2025-10-10 08:12+0000\n"
"POT-Creation-Date: 2025-10-20 08:00+0000\n"
"PO-Revision-Date: 2025-02-05 14:43+0100\n"
"Last-Translator: \n"
"Language-Team: \n"
Expand Down
13 changes: 10 additions & 3 deletions plugins/wfp/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,21 @@ def account_related_to_entity_type(self):
account = entity_type.account
return account

def retrieve_entities(self):
def get_updated_entity_ids(self, updated_at=None):
entities = Instance.objects.filter(entity__entity_type__code__in=self.types)
if updated_at is not None:
entities = entities.filter(updated_at__gte=updated_at)
entities = entities.values("entity_id").distinct()
beneficiary_ids = list(map(lambda entity: entity["entity_id"], entities))
return list(set(beneficiary_ids))

def retrieve_entities(self, entity_ids):
steps_id = ETL().steps_to_exclude()
updated_at = date(2023, 7, 10)
beneficiaries = (
Instance.objects.filter(entity__entity_type__code__in=self.types)
.filter(entity__id__in=entity_ids)
.filter(json__isnull=False)
.filter(form__isnull=False)
.filter(updated_at__gte=updated_at)
.exclude(deleted=True)
.exclude(entity__deleted_at__isnull=False)
.exclude(id__in=steps_id)
Expand Down
4 changes: 2 additions & 2 deletions plugins/wfp/management/commands/ethiopia/Under5.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,10 @@ def group_visit_by_entity(self, entities):
)
)

def run(self, type):
def run(self, type, updated_beneficiaries):
entity_type = ETL([type])
account = entity_type.account_related_to_entity_type()
beneficiaries = entity_type.retrieve_entities()
beneficiaries = entity_type.retrieve_entities(updated_beneficiaries)
pages = beneficiaries.page_range

logger.info(f"Instances linked to Child Under 5 program: {beneficiaries.count} for {account}")
Expand Down
6 changes: 5 additions & 1 deletion plugins/wfp/management/commands/etl_eth.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,9 @@
class Command(BaseCommand):
help = "Transform WFP collected data in a format usable for analytics"

def add_arguments(self, parser):
parser.add_argument("all_data", nargs="?", help="Run ETL on the whole data")

def handle(self, *args, **options):
etl_ethiopia()
all_data = options["all_data"]
etl_ethiopia(all_data)
6 changes: 5 additions & 1 deletion plugins/wfp/management/commands/etl_ng.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,9 @@
class Command(BaseCommand):
help = "Transform WFP collected data in a format usable for analytics"

def add_arguments(self, parser):
parser.add_argument("all_data", nargs="?", help="Run ETL on the whole data")

def handle(self, *args, **options):
etl_ng()
all_data = options["all_data"]
etl_ng(all_data)
6 changes: 5 additions & 1 deletion plugins/wfp/management/commands/etl_ssd.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,9 @@
class Command(BaseCommand):
help = "Transform WFP collected data in a format usable for analytics"

def add_arguments(self, parser):
parser.add_argument("all_data", nargs="?", help="Run ETL on the whole data")

def handle(self, *args, **options):
etl_ssd()
all_data = options["all_data"]
etl_ssd(all_data)
4 changes: 2 additions & 2 deletions plugins/wfp/management/commands/nigeria/Pbwg.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@


class NG_PBWG:
def run(self, type):
def run(self, type, updated_beneficiaries):
entity_type = ETL([type])
account = entity_type.account_related_to_entity_type()
beneficiaries = entity_type.retrieve_entities()
beneficiaries = entity_type.retrieve_entities(updated_beneficiaries)
pages = beneficiaries.page_range
logger.info(f"Instances linked to PBWG program: {beneficiaries.count} for {account}")

Expand Down
4 changes: 2 additions & 2 deletions plugins/wfp/management/commands/nigeria/Under5.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,10 @@ def group_visit_by_entity(self, entities):
)
)

def run(self, type):
def run(self, type, updated_beneficiaries):
entity_type = ETL([type])
account = entity_type.account_related_to_entity_type()
beneficiaries = entity_type.retrieve_entities()
beneficiaries = entity_type.retrieve_entities(updated_beneficiaries)
pages = beneficiaries.page_range

logger.info(f"Instances linked to Child Under 5 program: {beneficiaries.count} for {account}")
Expand Down
4 changes: 2 additions & 2 deletions plugins/wfp/management/commands/south_sudan/Pbwg.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@


class PBWG:
def run(self, type):
def run(self, type, updated_beneficiaries):
entity_type = ETL([type])
account = entity_type.account_related_to_entity_type()
beneficiaries = entity_type.retrieve_entities()
beneficiaries = entity_type.retrieve_entities(updated_beneficiaries)
pages = beneficiaries.page_range

logger.info(f"Instances linked to PBWG program: {beneficiaries.count} for {account}")
Expand Down
4 changes: 2 additions & 2 deletions plugins/wfp/management/commands/south_sudan/Under5.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,10 @@ def save_journey(self, beneficiary, record):

return ETL().save_entity_journey(journey, beneficiary, record, "U5")

def run(self, type):
def run(self, type, updated_beneficiaries):
entity_type = ETL([type])
account = entity_type.account_related_to_entity_type()
beneficiaries = entity_type.retrieve_entities()
beneficiaries = entity_type.retrieve_entities(updated_beneficiaries)
pages = beneficiaries.page_range

logger.info(f"Instances linked to Child Under 5 program: {beneficiaries.count} for {account}")
Expand Down
81 changes: 67 additions & 14 deletions plugins/wfp/tasks.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging

from celery import shared_task
from django_celery_results.models import TaskResult

from iaso.models import *
from plugins.wfp.common import ETL
Expand All @@ -17,21 +18,39 @@


@shared_task()
def etl_ng():
def etl_ng(all_data):
"""Extract beneficiary data from Iaso tables and store them in the format expected by existing tableau dashboards"""
last_success_task = TaskResult.objects.filter(task_name="plugins.wfp.tasks.etl_ng", status="SUCCESS").first()

if last_success_task:
# A task was found, use its creation date
last_success_task_date = last_success_task.date_created.strftime("%Y-%m-%d")
else:
# No successful task was found (first run case)
# Define a safe default date
last_success_task_date = None # Example default: Unix Epoch start date

# Allow to re-run on the whole data
if all_data is not None:
last_success_task_date = None

logger.info("Starting ETL for Nigeria")
entity_type_U5_code = "nigeria_under5"
account = ETL([entity_type_U5_code]).account_related_to_entity_type()
Beneficiary.objects.filter(account=account).delete()
MonthlyStatistics.objects.filter(account=account, programme_type="U5").delete()
NG_Under5().run(entity_type_U5_code)
updated_U5_beneficiaries = ETL([entity_type_U5_code]).get_updated_entity_ids(last_success_task_date)
Beneficiary.objects.filter(account=account, entity_id__in=updated_U5_beneficiaries).delete()

NG_Under5().run(entity_type_U5_code, updated_U5_beneficiaries)
logger.info(
f"----------------------------- Aggregating journey for {account} per org unit, admission and period(month and year) -----------------------------"
)
MonthlyStatistics.objects.filter(account=account, programme_type="U5").delete()
ETL().journey_with_visit_and_steps_per_visit(account, "U5")
entity_type_pbwg_code = "nigeria_pbwg"
pbwg_account = ETL([entity_type_pbwg_code]).account_related_to_entity_type()
NG_PBWG().run(entity_type_pbwg_code)
updated_pbwg_beneficiaries = ETL([entity_type_pbwg_code]).get_updated_entity_ids(last_success_task_date)
Beneficiary.objects.filter(entity_id__in=updated_pbwg_beneficiaries).delete()
NG_PBWG().run(entity_type_pbwg_code, updated_pbwg_beneficiaries)
logger.info(
f"----------------------------- Aggregating PBWG journey for {pbwg_account} per org unit, admission and period(month and year) -----------------------------"
)
Expand All @@ -40,14 +59,27 @@ def etl_ng():


@shared_task()
def etl_ssd():
# from django.db import connection # For debugging purposes only, to see the SQL queries executed
def etl_ssd(all_data):
"""Extract beneficiary data from Iaso tables and store them in the format expected by existing tableau dashboards"""
last_success_task = TaskResult.objects.filter(task_name="plugins.wfp.tasks.etl_ssd", status="SUCCESS").first()

if last_success_task:
# A task was found, use its creation date
last_success_task_date = last_success_task.date_created.strftime("%Y-%m-%d")
else:
# No successful task was found (first run case)
# Define a safe default date
last_success_task_date = None # Example default: Unix Epoch start date

# Allow to re-run on the whole data
if all_data is not None:
last_success_task_date = None
logger.info("Starting ETL for South Sudan")
entity_type_U5_code = "ssd_under5"
child_account = ETL([entity_type_U5_code]).account_related_to_entity_type()
Beneficiary.objects.filter(account=child_account).delete()
Under5().run(entity_type_U5_code)
updated_U5_beneficiaries = ETL([entity_type_U5_code]).get_updated_entity_ids(last_success_task_date)
Beneficiary.objects.filter(account=child_account, entity_id__in=updated_U5_beneficiaries).delete()
Under5().run(entity_type_U5_code, updated_U5_beneficiaries)

logger.info(
f"----------------------------- Aggregating Children under 5 journey for {child_account} per org unit, admission and period(month and year) -----------------------------"
Expand All @@ -56,7 +88,9 @@ def etl_ssd():
ETL().journey_with_visit_and_steps_per_visit(child_account, "U5")
entity_type_pbwg_code = "ssd_pbwg"
pbwg_account = ETL([entity_type_pbwg_code]).account_related_to_entity_type()
PBWG().run(entity_type_pbwg_code)
updated_pbwg_beneficiaries = ETL([entity_type_pbwg_code]).get_updated_entity_ids(last_success_task_date)
Beneficiary.objects.filter(entity_id__in=updated_pbwg_beneficiaries).delete()
PBWG().run(entity_type_pbwg_code, updated_pbwg_beneficiaries)
logger.info(
f"----------------------------- Aggregating PBWG journey for {pbwg_account} per org unit, admission and period(month and year) -----------------------------"
)
Expand All @@ -67,12 +101,28 @@ def etl_ssd():


@shared_task()
def etl_ethiopia():
def etl_ethiopia(all_data):
"""Extract beneficiary data from Iaso tables and store them in the format expected by existing tableau dashboards"""
last_success_task = TaskResult.objects.filter(task_name="plugins.wfp.tasks.etl_ethiopia", status="SUCCESS").first()

if last_success_task:
# A task was found, use its creation date
last_success_task_date = last_success_task.date_created.strftime("%Y-%m-%d")
else:
# No successful task was found (first run case)
# Define a safe default date
last_success_task_date = None # Example default: Unix Epoch start date

# Allow to re-run on the whole data
if all_data is not None:
last_success_task_date = None

logger.info("Starting ETL for Ethiopia")
entity_type_U5_code = "ethiopia_under5"
child_account = ETL([entity_type_U5_code]).account_related_to_entity_type()
Beneficiary.objects.filter(account=child_account).delete()
ET_Under5().run(entity_type_U5_code)
updated_U5_beneficiaries = ETL([entity_type_U5_code]).get_updated_entity_ids(last_success_task_date)
Beneficiary.objects.filter(account=child_account, entity_id__in=updated_U5_beneficiaries).delete()
ET_Under5().run(entity_type_U5_code, updated_U5_beneficiaries)

logger.info(
f"----------------------------- Aggregating Children under 5 journey for {child_account} per org unit, admission and period(month and year) -----------------------------"
Expand All @@ -82,7 +132,10 @@ def etl_ethiopia():

entity_type_pbwg_code = "ethiopia_pbwg"
pbwg_account = ETL([entity_type_pbwg_code]).account_related_to_entity_type()
PBWG().run(entity_type_pbwg_code)
updated_pbwg_beneficiaries = ETL([entity_type_pbwg_code]).get_updated_entity_ids(last_success_task_date)
Beneficiary.objects.filter(entity_id__in=updated_pbwg_beneficiaries).delete()
PBWG().run(entity_type_pbwg_code, updated_pbwg_beneficiaries)

logger.info(
f"----------------------------- Aggregating PBWG journey for {pbwg_account} per org unit, admission and period(month and year) -----------------------------"
)
Expand Down
Loading