
Add camstat docs (#606)
mpolidori authored Nov 28, 2023
1 parent ea81d04 commit 30bc789
Showing 1 changed file with 165 additions and 0 deletions.
165 changes: 165 additions & 0 deletions ckanext/querytool/commands/camstat.py
@@ -1,5 +1,23 @@
# -*- coding: utf-8 -*-

'''
USAGE: paster --plugin=ckanext-querytool update_camstat --config=/path/to/production.ini
`paster` is a command-line utility provided by [The Pylons Project](https://docs.pylonsproject.org/en/latest/),
a collection of tools and utilities for Python that is used extensively in CKAN. It lets you run
scripts that live in various locations, all through a single, unified utility.
For example, CKAN and any extension installed alongside it can ship scripts for miscellaneous
purposes. `paster` makes it easy to run any of these scripts. You only need to provide the plugin
name (`ckan` for CKAN core scripts or `ckanext-EXTENSION_NAME` for extensions), the command, and
the path to your `.ini` CKAN config file.
**Note**: On the first run, the script creates the organization (if it doesn't exist) and the datasets,
and uploads all data into resources. On every run after that, it first checks whether the source data
has changed. If nothing has changed, the script does nothing; if any of the source data has changed,
only the affected datasets are updated.
'''

from __future__ import print_function
import requests
import csv
@@ -32,6 +50,8 @@ class UpdateCamstat(CkanCommand):
cleans it, and creates datasets and resources in CKAN if they don't exist.
If they do exist, it compares the new checksum with a stored checksum and
updates them if there are changes.
This class contains the main command entry point (`command`) and the table setup function (`setup_tables`).
'''

summary = __doc__.split('\n')[0]
@@ -40,6 +60,19 @@ class UpdateCamstat(CkanCommand):
min_args = 0

def command(self):
'''
Adds the command to the CKAN `paster` ecosystem.
This is how CKAN knows what to do when you use:
paster --plugin=ckanext-querytool update_camstat --config=/path/to/production.ini
`command` does two things:
- Creates the hash table if it doesn't exist. Hashes are generated and
stored so the script knows when the data needs to be updated.
- Calls `update_camstat` (which calls all other functions as needed).
'''
self._load_config()
self.owner_org = 'camstat'
self.languages = ['en', 'km']
@@ -66,6 +99,16 @@ def command(self):
update_camstat(self.owner_org, self.languages)

def setup_tables(self):
'''
When called, this function creates a table in the DB, `camstat_hashes`,
to store a hash of each dataflow's data.
When the script is run again later:
- New hashes are generated and compared to the old hashes before updating
each dataset.
- If the hash has changed for a given dataset, that dataset is updated with
the latest data and the new hash is saved to the DB.
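A minimal sketch of the table definition (the column names here are
assumptions inferred from `update_hash`'s signature, not the exact schema):
```python
import ckan.model as model
from sqlalchemy import Table, Column, UnicodeText, MetaData

metadata = MetaData()
camstat_hashes = Table(
    'camstat_hashes', metadata,
    Column('dataflow_id', UnicodeText, primary_key=True),  # assumed
    Column('dataflow_name_munged', UnicodeText),           # assumed
    Column('resource_id', UnicodeText),                    # assumed
    Column('hash', UnicodeText),                           # assumed
    Column('last_updated', UnicodeText),                   # assumed
)
metadata.create_all(model.meta.engine)  # no-op if the table already exists
```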
'''
model.Session.remove()
model.Session.configure(bind=model.meta.engine)

@@ -104,6 +147,27 @@ def setup_tables(self):
metadata.create_all(model.meta.engine)


'''
The following functions are only used for testing/debugging. They run with the command
_only when uncommented_ in the `command` function:
```
# The following functions can be
# used for testing and debugging
#
# purge_datasets(self.owner_org)
# purge_organization(self.owner_org)
# drop_table()
```
- `purge_datasets` - Deletes all current Camstat datasets
- `drop_table` - Removes the hash table from the DB
- `purge_organization` - Deletes the Camstat Organization from CKAN
Start of testing/debugging functions.
'''


def purge_datasets(owner_org):
print(
'> PURGING ALL DATASETS FOR ORGANIZATION: {}\n'
@@ -153,13 +217,38 @@ def purge_organization(owner_org):
print(' ======================================\n')


'''
End of testing/debugging functions.
'''


def utf_8_encoder(unicode_csv_data):
'''
A small helper that re-encodes improperly encoded data as UTF-8.
Currently **deprecated**.
'''
for line in unicode_csv_data:
yield line.encode('utf-8')


def clean_csv(data, id_removal, dataflow_agency,
dataflow_id, dataflow_version):
'''
Cleans the data.
It handles the following cases:
- Reformats inconsistent values
- Removes keyword prefixes from values (e.g. turns `SOME_VALUE: Some Value` into `Some Value`)
- Removes `NA` values
- Wraps strings containing `,` in quotes so they aren't split into new columns
(e.g. quotes values like "One, two, and three", which would otherwise be read as three columns)
- Removes unused IDs
- Converts all headers from fully uppercase to title case (e.g. `HEADER 1` -> `Header 1`)
- Removes empty columns
Once cleaning is done, the function calls `pivot_data` before returning the final data
(see `pivot_data` below for more information).
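A minimal sketch of the per-value cleanup steps (names are illustrative,
not the exact implementation below):
```python
def clean_value(value):
    # Strip keyword prefixes, e.g. 'SOME_VALUE: Some Value' -> 'Some Value'
    if ': ' in value:
        value = value.split(': ', 1)[1]
    # Drop NA placeholders entirely
    if value == 'NA':
        return ''
    # Quote values containing commas so they stay in a single CSV column
    if ',' in value and not value.startswith('"'):
        value = '"{}"'.format(value)
    return value

# Headers go from fully uppercase to title case
headers = [h.title() for h in ('HEADER 1', 'HEADER 2')]  # ['Header 1', 'Header 2']
```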
'''
print(' + Cleaning CSV data for: {}'.format(dataflow_id))

cleaned = 0
@@ -242,6 +331,11 @@ def clean_csv(data, id_removal, dataflow_agency,
return data

def pivot_data(data):
'''
Pivots the data from a wide, less usable format to a cleaner, vertical CSV with one observation per row.
The main issue with the original format is that the column headers we need for visualizations sit in a
single column themselves, instead of across the top as headers.
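A hedged sketch of the idea with pandas (the column names here are
hypothetical; the real data uses SDMX-style columns):
```python
import pandas as pd

long_df = pd.DataFrame({
    'Area': ['A', 'A', 'B', 'B'],
    'Indicator': ['Stunting', 'Wasting', 'Stunting', 'Wasting'],
    'Value': [30, 10, 25, 8],
})
# Values from the 'Indicator' column become proper column headers
wide_df = long_df.pivot(index='Area', columns='Indicator', values='Value').reset_index()
```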
'''
print(' + Pivoting data...')

df = pd.DataFrame(data)
@@ -263,6 +357,9 @@ def pivot_data(data):


def compare_hashes(existing_hash, new_hash):
'''
Compares a new dataset/resource hash with an existing hash (if one exists) and returns `True` or `False`.
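The core check amounts to something like this (illustrative helper; the real
function's return convention isn't shown in this diff):
```python
def hashes_differ(existing_hash, new_hash):  # hypothetical helper
    return existing_hash is None or existing_hash != new_hash
```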
'''
print(' + Comparing hashes...')
print(' + Existing hash: {}'.format(existing_hash))
print(' + New hash: {}'.format(new_hash))
@@ -279,6 +376,9 @@ def compare_hashes(existing_hash, new_hash):


def upload_resource(dataflow_name_munged, dataflow_title, resource):
'''
Uploads a new resource to a given dataset.
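A hypothetical sketch of the call shape using CKAN's action API (the context,
fields, and upload mechanics are simplified and may differ from the real code):
```python
import ckan.plugins.toolkit as toolkit

def upload_csv(package_id, title, file_path):  # hypothetical helper
    with open(file_path, 'rb') as f:
        return toolkit.get_action('resource_create')(
            {'ignore_auth': True},
            {
                'package_id': package_id,
                'name': title,
                'format': 'CSV',
                'upload': f,  # simplified; CKAN may expect a wrapped file object
            },
        )
```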
'''
print(' + Uploading resource to dataset: {}'.format(dataflow_title))

try:
@@ -300,6 +400,9 @@ def upload_resource(dataflow_name_munged, dataflow_title, resource):


def patch_resource(dataflow_name_munged, dataflow_title, resource, resource_id):
'''
Updates the data in a resource for a given dataset.
'''
print(' + Updating resource: {}'.format(dataflow_title))

try:
@@ -323,6 +426,9 @@ def patch_resource(dataflow_name_munged, dataflow_title, resource, resource_id):

def create_dataset(dataflow_name_munged, owner_org,
dataflow_title, dataflow_description):
'''
Creates a new dataset for a given dataflow.
'''
print(' + Creating dataset: {}'.format(dataflow_name_munged))

try:
@@ -354,6 +460,9 @@ def create_dataset(dataflow_name_munged, owner_org,

def patch_dataset(dataflow_name_munged, owner_org,
dataflow_title, dataflow_description):
'''
Updates a dataset for a given dataflow.
'''
print(' + Updating dataset: {}'.format(dataflow_name_munged))

try:
@@ -376,6 +485,9 @@ def patch_dataset(dataflow_name_munged, owner_org,


def verify_organization_exists(owner_org):
'''
Checks if the Camstat organization exists and creates it if it doesn't; otherwise, creation is skipped.
'''
print('> VERIFYING ORGANIZATION {} EXISTS...'.format(owner_org))

try:
@@ -400,6 +512,12 @@ def verify_organization_exists(owner_org):

def prepare_dataflow_description(dataflow_description, dataflow_id,
dataflow_last_extracted):
'''
Builds the dataset description with the dataflow ID and the current time. For example:
>**Extracted from**: _DF_NUTRITION_
>
>**Last extracted**: _2022-09-28 07:56 PM (UTC)_
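Illustrative formatting (the real template may differ slightly):
```python
description = (
    '{}\n\n'
    '**Extracted from**: _{}_\n\n'
    '**Last extracted**: _{}_'
).format(dataflow_description or '', dataflow_id, dataflow_last_extracted)
```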
'''
print('\n + Preparing description...')

if dataflow_description:
@@ -424,6 +542,9 @@ def prepare_dataflow_description(dataflow_description, dataflow_id,

def get_data(dataflow_agency, dataflow_id,
dataflow_version, data_type):
'''
Retrieves the raw data from the [Camstat data API](https://nsiws-stable-camstat-live.officialstatistics.org).
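A hypothetical sketch of the request, assuming the endpoint follows the
standard SDMX REST pattern (the real URL construction is in the function
body, partially elided in this diff; the agency/version values are made up):
```python
import requests

dataflow_agency, dataflow_id, dataflow_version = 'KH_NIS', 'DF_NUTRITION', '1.0'
base = 'https://nsiws-stable-camstat-live.officialstatistics.org/rest/data'
url = '{}/{},{},{}/all'.format(base, dataflow_agency, dataflow_id, dataflow_version)
response = requests.get(url, headers={'Accept': 'application/vnd.sdmx.data+csv'})
```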
'''
if data_type == 'both':
print(' + Retrieving raw data...')

@@ -468,6 +589,10 @@ def get_data(dataflow_agency, dataflow_id,


def write_csv(data, csv_filename):
'''
Creates a temporary CSV file from the cleaned data. This file is then
uploaded as a resource, as mentioned in the other steps.
'''
print(' + Writing CSV data to temporary file...')

tmp_file = tempfile.NamedTemporaryFile('w+b')
@@ -495,6 +620,10 @@ def write_csv(data, csv_filename):


def get_dataflows():
'''
Retrieves the list of dataflow IDs/names from the Health and Nutrition topic of the Camstat API.
These are used to retrieve the raw data for each dataflow.
'''
print(
'\n ======================================\n\n'
'> RETRIEVING DATAFLOWS...'
@@ -567,6 +696,9 @@ def get_dataflows():


def get_new_hash(data):
'''
Generates a new data hash. This is compared to the existing data hash later.
'''
print(' + Generating new hash...')

new_hash = hashlib.sha256(str(data).encode('utf-8')).hexdigest()
@@ -577,6 +709,9 @@ def get_existing_hash(dataflow_id):


def get_existing_hash(dataflow_id):
'''
Retrieves the existing data hash for a given resource (if one already exists).
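An illustrative query, using the function's `dataflow_id` argument
(table/column names as assumed in the `setup_tables` sketch above; the
real SQL may differ):
```python
import ckan.model as model
from sqlalchemy import text

connection = model.Session.connection()
row = connection.execute(
    text('SELECT hash FROM camstat_hashes WHERE dataflow_id = :id'),
    id=dataflow_id,
).fetchone()
existing_hash = row[0] if row else None
```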
'''
print(' + Retrieving existing hash for: {}'.format(dataflow_id))

connection = model.Session.connection()
@@ -606,6 +741,10 @@ def update_hash(dataflow_id, dataflow_name_munged, resource_id,

def update_hash(dataflow_id, dataflow_name_munged, resource_id,
new_hash, dataflow_last_updated, existing_hash, was_deleted):
'''
Updates the data hash in the DB if there was a change. This function will also
remove a hash if the resource/dataset is deleted.
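Illustrative SQL for the two paths (same assumed schema as in the
`setup_tables` sketch; the real statements may differ):
```python
import ckan.model as model
from sqlalchemy import text

connection = model.Session.connection()
if was_deleted:
    connection.execute(
        text('DELETE FROM camstat_hashes WHERE dataflow_id = :id'),
        id=dataflow_id,
    )
else:
    connection.execute(
        text('UPDATE camstat_hashes SET hash = :h WHERE dataflow_id = :id'),
        h=new_hash, id=dataflow_id,
    )
```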
'''
print(' + Updating hashes...')

connection = model.Session.connection()
@@ -648,6 +787,32 @@ def update_camstat(owner_org, languages):


def update_camstat(owner_org, languages):
'''
This function drives the overall process. It's where most of the functions
above get called as needed.
Here's the flow of the process:
- Retrieves all dataflows using `get_dataflows`
- Iterates over the dataflows:
    - Extracts the metadata from the dataflow
      (title, ID, name, agency, description)
    - Retrieves the current time
      (used for "Last extracted")
    - Calls `prepare_dataflow_description`
      (this will be added to the dataset object later in the process)
    - Retrieves the raw data using `get_data`
    - Cleans the retrieved raw data using `clean_csv`
    - Writes the temporary CSV using `write_csv`
    - Generates a new data hash using `get_new_hash`
    - Retrieves the existing data hash (if one exists)
    - Checks whether the new and existing data hashes differ:
        - If no existing data hash exists:
            - Creates a new dataset and resource
        - If the data hashes match:
            - The data is up to date; moves on to the next dataflow
        - If they differ:
            - Updates the resource in the dataset with the latest data
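A condensed, hedged sketch of the loop (unpacking, arguments, and return
shapes are assumptions; the real loop also handles languages, metadata
fields, and errors):
```python
for dataflow in get_dataflows():
    agency, flow_id, version, title, description = dataflow  # shape assumed
    data = get_data(agency, flow_id, version, 'both')
    data = clean_csv(data, True, agency, flow_id, version)   # id_removal assumed
    new_hash = get_new_hash(data)
    existing_hash = get_existing_hash(flow_id)               # None on first run (assumed)
    if existing_hash is None:
        pass      # first run: create the dataset, upload the resource, store the hash
    elif existing_hash == new_hash:
        continue  # data unchanged, nothing to do
    else:
        pass      # data changed: patch the resource and update the stored hash
```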
'''
verify_organization_exists(owner_org)

# languages = ['km', 'en']
