Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions plugins/wfp/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,23 @@ def account_related_to_entity_type(self):
account = entity_type.account
return account

def retrieve_entities(self):
def get_updated_entity_ids(self, updated_at):
entities = (
Instance.objects.filter(entity__entity_type__code__in=self.types)
.filter(updated_at__gte=updated_at)
.values("entity_id")
.distinct()
)
beneficiary_ids = list(map(lambda entity: entity["entity_id"], entities))
return list(set(beneficiary_ids))

def retrieve_entities(self, entity_ids):
steps_id = ETL().steps_to_exclude()
updated_at = date(2023, 7, 10)
beneficiaries = (
Instance.objects.filter(entity__entity_type__code__in=self.types)
.filter(entity__id__in=entity_ids)
.filter(json__isnull=False)
.filter(form__isnull=False)
.filter(updated_at__gte=updated_at)
.exclude(deleted=True)
.exclude(entity__deleted_at__isnull=False)
.exclude(id__in=steps_id)
Expand Down
4 changes: 2 additions & 2 deletions plugins/wfp/management/commands/ethiopia/Under5.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,10 @@ def group_visit_by_entity(self, entities):
)
)

def run(self, type):
def run(self, type, updated_beneficiaries):
entity_type = ETL([type])
account = entity_type.account_related_to_entity_type()
beneficiaries = entity_type.retrieve_entities()
beneficiaries = entity_type.retrieve_entities(updated_beneficiaries)
pages = beneficiaries.page_range

logger.info(f"Instances linked to Child Under 5 program: {beneficiaries.count} for {account}")
Expand Down
4 changes: 2 additions & 2 deletions plugins/wfp/management/commands/nigeria/Pbwg.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@


class NG_PBWG:
def run(self, type):
def run(self, type, updated_beneficiaries):
entity_type = ETL([type])
account = entity_type.account_related_to_entity_type()
beneficiaries = entity_type.retrieve_entities()
beneficiaries = entity_type.retrieve_entities(updated_beneficiaries)
pages = beneficiaries.page_range
logger.info(f"Instances linked to PBWG program: {beneficiaries.count} for {account}")

Expand Down
4 changes: 2 additions & 2 deletions plugins/wfp/management/commands/nigeria/Under5.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,10 @@ def group_visit_by_entity(self, entities):
)
)

def run(self, type):
def run(self, type, updated_beneficiaries):
entity_type = ETL([type])
account = entity_type.account_related_to_entity_type()
beneficiaries = entity_type.retrieve_entities()
beneficiaries = entity_type.retrieve_entities(updated_beneficiaries)
pages = beneficiaries.page_range

logger.info(f"Instances linked to Child Under 5 program: {beneficiaries.count} for {account}")
Expand Down
4 changes: 2 additions & 2 deletions plugins/wfp/management/commands/south_sudan/Pbwg.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@


class PBWG:
def run(self, type):
def run(self, type, updated_beneficiaries):
entity_type = ETL([type])
account = entity_type.account_related_to_entity_type()
beneficiaries = entity_type.retrieve_entities()
beneficiaries = entity_type.retrieve_entities(updated_beneficiaries)
pages = beneficiaries.page_range

logger.info(f"Instances linked to PBWG program: {beneficiaries.count} for {account}")
Expand Down
4 changes: 2 additions & 2 deletions plugins/wfp/management/commands/south_sudan/Under5.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,10 @@ def save_journey(self, beneficiary, record):

return ETL().save_entity_journey(journey, beneficiary, record, "U5")

def run(self, type):
def run(self, type, updated_beneficiaries):
entity_type = ETL([type])
account = entity_type.account_related_to_entity_type()
beneficiaries = entity_type.retrieve_entities()
beneficiaries = entity_type.retrieve_entities(updated_beneficiaries)
pages = beneficiaries.page_range

logger.info(f"Instances linked to Child Under 5 program: {beneficiaries.count} for {account}")
Expand Down
42 changes: 32 additions & 10 deletions plugins/wfp/tasks.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging

from celery import shared_task
from django_celery_results.models import TaskResult

from iaso.models import *
from plugins.wfp.common import ETL
Expand All @@ -19,19 +20,26 @@
@shared_task()
def etl_ng():
"""Extract beneficiary data from Iaso tables and store them in the format expected by existing tableau dashboards"""
last_success_task = TaskResult.objects.filter(task_name="plugins.wfp.tasks.etl_ng", status="SUCCESS").first()
last_success_task_date = last_success_task.date_created.strftime("%Y-%m-%d")

logger.info("Starting ETL for Nigeria")
entity_type_U5_code = "nigeria_under5"
account = ETL([entity_type_U5_code]).account_related_to_entity_type()
Beneficiary.objects.filter(account=account).delete()
MonthlyStatistics.objects.filter(account=account, programme_type="U5").delete()
NG_Under5().run(entity_type_U5_code)
updated_U5_beneficiaries = ETL([entity_type_U5_code]).get_updated_entity_ids(last_success_task_date)
Beneficiary.objects.filter(account=account, entity_id__in=updated_U5_beneficiaries).delete()

NG_Under5().run(entity_type_U5_code, updated_U5_beneficiaries)
logger.info(
f"----------------------------- Aggregating journey for {account} per org unit, admission and period(month and year) -----------------------------"
)
MonthlyStatistics.objects.filter(account=account, programme_type="U5").delete()
ETL().journey_with_visit_and_steps_per_visit(account, "U5")
entity_type_pbwg_code = "nigeria_pbwg"
pbwg_account = ETL([entity_type_pbwg_code]).account_related_to_entity_type()
NG_PBWG().run(entity_type_pbwg_code)
updated_pbwg_beneficiaries = ETL([entity_type_pbwg_code]).get_updated_entity_ids(last_success_task_date)
Beneficiary.objects.filter(entity_id__in=updated_pbwg_beneficiaries).delete()
NG_PBWG().run(entity_type_pbwg_code, updated_pbwg_beneficiaries)
logger.info(
f"----------------------------- Aggregating PBWG journey for {pbwg_account} per org unit, admission and period(month and year) -----------------------------"
)
Expand All @@ -41,11 +49,16 @@ def etl_ng():

@shared_task()
def etl_ssd():
last_success_task = TaskResult.objects.filter(task_name="plugins.wfp.tasks.etl_ssd", status="SUCCESS").first()
print("last success ", last_success_task)
last_success_task_date = last_success_task.date_created.strftime("%Y-%m-%d")

logger.info("Starting ETL for South Sudan")
entity_type_U5_code = "ssd_under5"
child_account = ETL([entity_type_U5_code]).account_related_to_entity_type()
Beneficiary.objects.filter(account=child_account).delete()
Under5().run(entity_type_U5_code)
updated_U5_beneficiaries = ETL([entity_type_U5_code]).get_updated_entity_ids(last_success_task_date)
Beneficiary.objects.filter(account=child_account, entity_id__in=updated_U5_beneficiaries).delete()
Under5().run(entity_type_U5_code, updated_U5_beneficiaries)

logger.info(
f"----------------------------- Aggregating Children under 5 journey for {child_account} per org unit, admission and period(month and year) -----------------------------"
Expand All @@ -54,7 +67,9 @@ def etl_ssd():
ETL().journey_with_visit_and_steps_per_visit(child_account, "U5")
entity_type_pbwg_code = "ssd_pbwg"
pbwg_account = ETL([entity_type_pbwg_code]).account_related_to_entity_type()
PBWG().run(entity_type_pbwg_code)
updated_pbwg_beneficiaries = ETL([entity_type_pbwg_code]).get_updated_entity_ids(last_success_task_date)
Beneficiary.objects.filter(entity_id__in=updated_pbwg_beneficiaries).delete()
PBWG().run(entity_type_pbwg_code, updated_pbwg_beneficiaries)
logger.info(
f"----------------------------- Aggregating PBWG journey for {pbwg_account} per org unit, admission and period(month and year) -----------------------------"
)
Expand All @@ -64,11 +79,15 @@ def etl_ssd():

@shared_task()
def etl_ethiopia():
last_success_task = TaskResult.objects.filter(task_name="plugins.wfp.tasks.etl_ethiopia", status="SUCCESS").first()
last_success_task_date = last_success_task.date_created.strftime("%Y-%m-%d")

logger.info("Starting ETL for Ethiopia")
entity_type_U5_code = "ethiopia_under5"
child_account = ETL([entity_type_U5_code]).account_related_to_entity_type()
Beneficiary.objects.filter(account=child_account).delete()
ET_Under5().run(entity_type_U5_code)
updated_U5_beneficiaries = ETL([entity_type_U5_code]).get_updated_entity_ids(last_success_task_date)
Beneficiary.objects.filter(account=child_account, entity_id__in=updated_U5_beneficiaries).delete()
ET_Under5().run(entity_type_U5_code, updated_U5_beneficiaries)

logger.info(
f"----------------------------- Aggregating Children under 5 journey for {child_account} per org unit, admission and period(month and year) -----------------------------"
Expand All @@ -78,7 +97,10 @@ def etl_ethiopia():

entity_type_pbwg_code = "ethiopia_pbwg"
pbwg_account = ETL([entity_type_pbwg_code]).account_related_to_entity_type()
PBWG().run(entity_type_pbwg_code)
updated_pbwg_beneficiaries = ETL([entity_type_pbwg_code]).get_updated_entity_ids(last_success_task_date)
Beneficiary.objects.filter(entity_id__in=updated_pbwg_beneficiaries).delete()
PBWG().run(entity_type_pbwg_code, updated_pbwg_beneficiaries)

logger.info(
f"----------------------------- Aggregating PBWG journey for {pbwg_account} per org unit, admission and period(month and year) -----------------------------"
)
Expand Down
Loading