Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 47 additions & 21 deletions tools/sdmx/dataflow.py
Comment thread
SandeepTuniki marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
"""
dataflow.py

This module provides reusable, generalized functions to interact with SDMX APIs.
Currently, it is tailored for OECD endpoints, with plans to extend support to
other sources in the future.
This module provides reusable, generalized functions to interact with SDMX APIs
by connecting to a specified REST endpoint.
"""

import logging
Expand All @@ -13,10 +12,8 @@
from typing import Dict, Any


def fetch_and_save_metadata(dataflow_id: str,
agency_id: str,
output_path: str,
client_id: str = "OECD"):
def fetch_and_save_metadata(dataflow_id: str, agency_id: str, output_path: str,
endpoint: str):
"""
Fetches the complete metadata for a dataflow and saves the raw
SDMX-ML (XML) response to a file.
Expand All @@ -25,7 +22,7 @@ def fetch_and_save_metadata(dataflow_id: str,
dataflow_id (str): The ID of the dataflow (e.g., 'DF_QNA_EXPENDITURE_GROWTH_OECD').
agency_id (str): The ID of the agency providing the data (e.g., 'OECD.SDD.NAD').
output_path (str): The file path where the raw XML metadata will be saved.
client_id (str, optional): The sdmx1 client ID to use. Defaults to "OECD".
endpoint (str): The base URL of the SDMX REST API endpoint.

Raises:
HTTPError: If a network error occurs during the API request.
Expand All @@ -35,11 +32,23 @@ def fetch_and_save_metadata(dataflow_id: str,
fetch_and_save_metadata(
dataflow_id="DSD_NAMAIN1@DF_QNA_EXPENDITURE_GROWTH_OECD",
agency_id="OECD.SDD.NAD",
output_path="gdp_growth_metadata.xml"
output_path="gdp_growth_metadata.xml",
endpoint="https://sdmx.oecd.org/public/rest/"
)
"""
try:
client = sdmx.Client(client_id)
# Dynamically create a source for the specific agency
source_id = agency_id
Comment thread
SandeepTuniki marked this conversation as resolved.
Outdated
if source_id not in sdmx.list_sources():
custom_source = {
'id': source_id,
'url': endpoint,
'name': f'Custom source for {agency_id}'
}
sdmx.add_source(custom_source)
Comment thread
SandeepTuniki marked this conversation as resolved.
Outdated

client = sdmx.Client(source_id)

logging.info(f"Fetching raw metadata for dataflow: {dataflow_id}...")

flow_msg = client.dataflow(dataflow_id,
Expand All @@ -59,8 +68,12 @@ def fetch_and_save_metadata(dataflow_id: str,
f"Network error while downloading dataflow metadata for {agency_id}/{dataflow_id}: {e}"
)
if e.response:
error_filename = f"metadata_error_{dataflow_id}.html"
Comment thread
SandeepTuniki marked this conversation as resolved.
Outdated
with open(error_filename, "w", encoding="utf-8") as f:
f.write(e.response.text)
logging.error(f"URL: {e.response.url}")
logging.error(f"Response content: {e.response.text[:500]}...")
logging.error(
f"Response content saved to '{error_filename}' for debugging.")
raise
except Exception as e:
logging.error(
Expand All @@ -69,12 +82,9 @@ def fetch_and_save_metadata(dataflow_id: str,
raise


def fetch_and_save_data_as_csv(dataflow_id: str,
agency_id: str,
key: Dict[str, Any],
params: Dict[str, Any],
output_path: str,
client_id: str = "OECD"):
def fetch_and_save_data_as_csv(dataflow_id: str, agency_id: str,
key: Dict[str, Any], params: Dict[str, Any],
output_path: str, endpoint: str):
"""
Fetches data from an SDMX API, converts it to a tidy pandas DataFrame,
and saves it as a CSV file.
Expand All @@ -85,7 +95,7 @@ def fetch_and_save_data_as_csv(dataflow_id: str,
key (dict): A dictionary defining the slice of data to query.
params (dict): A dictionary of query parameters (e.g., startPeriod).
output_path (str): The file path where the final CSV data will be saved.
client_id (str, optional): The sdmx1 client ID to use. Defaults to "OECD".
endpoint (str): The base URL of the SDMX REST API endpoint.

Raises:
HTTPError: If a network error occurs during the API request.
Expand All @@ -107,11 +117,23 @@ def fetch_and_save_data_as_csv(dataflow_id: str,
agency_id="OECD.SDD.NAD",
key=DATA_KEY,
params=DATA_PARAMS,
output_path="gdp_growth_data.csv"
output_path="gdp_growth_data.csv",
endpoint="https://sdmx.oecd.org/public/rest/"
)
"""
try:
client = sdmx.Client(client_id)
# Dynamically create a source for the specific agency
source_id = agency_id
Comment thread
SandeepTuniki marked this conversation as resolved.
Outdated
if source_id not in sdmx.list_sources():
custom_source = {
'id': source_id,
'url': endpoint,
'name': f'Custom source for {agency_id}'
}
sdmx.add_source(custom_source)

client = sdmx.Client(source_id)

logging.info(f"Fetching data for key: {key}")

data_msg = client.data(dataflow_id,
Expand All @@ -134,8 +156,12 @@ def fetch_and_save_data_as_csv(dataflow_id: str,
f"Network error while downloading data for {agency_id}/{dataflow_id}: {e}"
)
if e.response:
error_filename = f"data_error_{dataflow_id}.html"
Comment thread
SandeepTuniki marked this conversation as resolved.
Outdated
with open(error_filename, "w", encoding="utf-8") as f:
f.write(e.response.text)
logging.error(f"URL: {e.response.url}")
logging.error(f"Response content: {e.response.text[:500]}...")
logging.error(
f"Response content saved to '{error_filename}' for debugging.")
raise
except Exception as e:
logging.error(
Expand Down
26 changes: 26 additions & 0 deletions tools/sdmx/samples/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# SDMX Utility Sample Scripts

This directory contains sample scripts demonstrating how to use the functions in the `tools.sdmx.dataflow` module to download data and metadata from different SDMX APIs.

## Scripts

### OECD

* `fetch_oecd_gdp_metadata.py`: Downloads the complete metadata for the OECD's Quarterly GDP Growth dataset.
* `fetch_oecd_gdp_data.py`: Fetches a specific slice of data from the same GDP dataset and saves it as a CSV.
* `fetch_oecd_full_gdp_dataset.py`: A more complete example that combines both functions to download the metadata and then the full dataset.

### Eurostat

* `fetch_eurostat_gdp_metadata.py`: Downloads the metadata for the annual GDP dataset from Eurostat.
* `fetch_eurostat_gdp_data.py`: Downloads a slice of the annual GDP data for Germany, France, and Italy from Eurostat.

## Running the Samples

You can execute each script from the root of the repository, for example:

```bash
python3 tools/sdmx/samples/fetch_oecd_gdp_metadata.py
```

Each script downloads the requested data or metadata and saves it as an `.xml` or `.csv` file in the current working directory (the repository root when run as shown above).
57 changes: 57 additions & 0 deletions tools/sdmx/samples/fetch_eurostat_gdp_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""
fetch_eurostat_gdp_data.py

This script provides a complete example of fetching a specific dataset
from Eurostat using the reusable functions in the dataflow module.
"""

import logging
import sys
import os

# Add the project root to the Python path
sys.path.append(
os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..')))

from tools.sdmx import dataflow

# Configure logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')


def main():
    """Download a slice of the Eurostat GDP dataset and save it as CSV.

    Requests dataflow ``TEC00001`` from agency ``ESTAT`` through the Eurostat
    SDMX 2.1 REST endpoint, limited to the key and query parameters defined
    below, and writes the result to ``eurostat_gdp_data.csv`` (a relative
    path, so the file lands in the current working directory).

    Raises:
        SystemExit: With exit status 1 if the download fails, so that shells
            and CI jobs running this script can detect the failure.
    """
    # --- 1. Define Parameters for the Eurostat GDP Dataset ---
    agency_id = "ESTAT"
    dataflow_id = "TEC00001"
    output_path = "eurostat_gdp_data.csv"
    endpoint = "https://ec.europa.eu/eurostat/api/dissemination/sdmx/2.1/"

    # Key to select a slice of data; '+' joins several values of a dimension.
    data_key = {
        'freq': 'A',
        'na_item': 'B1GQ',
        'unit': 'CP_MEUR',
        'geo': 'DE+FR+IT',
    }
    # Parameters for the query
    data_params = {'startPeriod': '2020'}

    logging.info(f"--- Fetching Eurostat Data: {dataflow_id} ---")

    # --- 2. Use the Reusable Function ---
    # Only the download call sits inside the try block, so the handler cannot
    # mask unrelated errors.
    try:
        dataflow.fetch_and_save_data_as_csv(dataflow_id=dataflow_id,
                                            agency_id=agency_id,
                                            key=data_key,
                                            params=data_params,
                                            output_path=output_path,
                                            endpoint=endpoint)
    except Exception as e:
        # Top-level boundary of the script: keep the traceback for debugging
        # and report the failure through a non-zero exit status instead of
        # returning as if the run had succeeded.
        logging.exception(f"Failed to download data. Error: {e}")
        sys.exit(1)

    logging.info(f"--- Successfully downloaded data to {output_path} ---")
Comment thread
SandeepTuniki marked this conversation as resolved.
Outdated


if __name__ == "__main__":
main()
46 changes: 46 additions & 0 deletions tools/sdmx/samples/fetch_eurostat_gdp_metadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
"""
fetch_eurostat_gdp_metadata.py

This script provides a complete example of fetching metadata for a specific
dataset from Eurostat using the reusable functions in the dataflow module.
"""

import logging
import sys
import os

# Add the project root to the Python path
sys.path.append(
os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..')))

from tools.sdmx import dataflow

# Configure logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')


def main():
    """Download the metadata for the Eurostat GDP dataset.

    Requests the metadata of dataflow ``TEC00001`` from agency ``ESTAT``
    through the Eurostat SDMX 2.1 REST endpoint and writes it to
    ``eurostat_gdp_metadata.xml`` (a relative path, so the file lands in the
    current working directory).

    Raises:
        SystemExit: With exit status 1 if the download fails, so that shells
            and CI jobs running this script can detect the failure.
    """
    # --- 1. Define Parameters for the Eurostat GDP Dataset ---
    agency_id = "ESTAT"
    dataflow_id = "TEC00001"
    output_path = "eurostat_gdp_metadata.xml"
    endpoint = "https://ec.europa.eu/eurostat/api/dissemination/sdmx/2.1/"

    logging.info(f"--- Fetching Eurostat Metadata: {dataflow_id} ---")

    # --- 2. Use the Reusable Function ---
    # Only the download call sits inside the try block, so the handler cannot
    # mask unrelated errors.
    try:
        dataflow.fetch_and_save_metadata(dataflow_id=dataflow_id,
                                         agency_id=agency_id,
                                         output_path=output_path,
                                         endpoint=endpoint)
    except Exception as e:
        # Top-level boundary of the script: keep the traceback for debugging
        # and report the failure through a non-zero exit status instead of
        # returning as if the run had succeeded.
        logging.exception(f"Failed to download metadata. Error: {e}")
        sys.exit(1)

    logging.info(f"--- Successfully downloaded metadata to {output_path} ---")


if __name__ == "__main__":
main()
67 changes: 67 additions & 0 deletions tools/sdmx/samples/fetch_oecd_full_gdp_dataset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
"""
fetch_oecd_full_gdp_dataset.py

This script provides a complete example of fetching both the metadata and the
full data series for the OECD's Quarterly GDP Growth dataset.
"""

import logging
import sys
import os

# Add the project root to the Python path
sys.path.append(
os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..')))

from tools.sdmx import dataflow

# Configure logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')


def main():
    """Download the full OECD Quarterly GDP Growth dataset and its metadata.

    Step 1 fetches the metadata of dataflow
    ``DSD_NAMAIN1@DF_QNA_EXPENDITURE_GROWTH_OECD`` (agency ``OECD.SDD.NAD``)
    into ``oecd_gdp_full_metadata.xml``. Step 2 fetches the data with an empty
    key and no query parameters into ``oecd_gdp_full_data.csv``. Both output
    paths are relative, so the files land in the current working directory.

    Raises:
        SystemExit: With exit status 1 if either download fails. The data
            step is not attempted when the metadata step fails.
    """
    # --- 1. Define Common Parameters ---
    agency_id = "OECD.SDD.NAD"
    dataflow_id = "DSD_NAMAIN1@DF_QNA_EXPENDITURE_GROWTH_OECD"
    metadata_output_path = "oecd_gdp_full_metadata.xml"
    data_output_path = "oecd_gdp_full_data.csv"
    endpoint = "https://sdmx.oecd.org/public/rest/"

    # --- 2. Fetch Metadata ---
    logging.info("--- Step 1: Starting Metadata Download ---")
    try:
        dataflow.fetch_and_save_metadata(dataflow_id=dataflow_id,
                                         agency_id=agency_id,
                                         output_path=metadata_output_path,
                                         endpoint=endpoint)
    except Exception as e:
        # Abort if metadata fails, as it's needed for context. Exit non-zero
        # (with traceback logged) so the failure is visible to the caller.
        logging.exception(f"Failed to download metadata. Error: {e}")
        sys.exit(1)
    logging.info(
        f"--- Successfully downloaded metadata to {metadata_output_path} ---")

    # --- 3. Fetch Full Data Series ---
    # No leading newline in the message: it would split a single log record
    # across two lines under the timestamped formatter configured above.
    logging.info("--- Step 2: Starting Full Data Download ---")
    # For the full dataset, we use an empty key and no time parameters
    data_key = {}
    data_params = {}

    try:
        dataflow.fetch_and_save_data_as_csv(dataflow_id=dataflow_id,
                                            agency_id=agency_id,
                                            key=data_key,
                                            params=data_params,
                                            output_path=data_output_path,
                                            endpoint=endpoint)
    except Exception as e:
        # Top-level boundary of the script: keep the traceback and report the
        # failure through a non-zero exit status.
        logging.exception(f"Failed to download data. Error: {e}")
        sys.exit(1)
    logging.info(
        f"--- Successfully downloaded data to {data_output_path} ---")
Comment thread
SandeepTuniki marked this conversation as resolved.


if __name__ == "__main__":
main()
Loading