Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
229 changes: 98 additions & 131 deletions tools/sdmx/dataflow.py
Comment thread
SandeepTuniki marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
"""
dataflow.py

This module provides reusable, generalized functions to interact with SDMX APIs.
Currently, it is tailored for OECD endpoints, with plans to extend support to
other sources in the future.
This module provides a client class for interacting with SDMX APIs.
"""

import logging
Expand All @@ -13,132 +11,101 @@
from typing import Dict, Any


def fetch_and_save_metadata(dataflow_id: str,
                            agency_id: str,
                            output_path: str,
                            client_id: str = "OECD"):
    """
    Fetches the complete metadata for a dataflow and saves the raw
    SDMX-ML (XML) response to a file.

    Args:
        dataflow_id (str): The ID of the dataflow (e.g., 'DF_QNA_EXPENDITURE_GROWTH_OECD').
        agency_id (str): The ID of the agency providing the data (e.g., 'OECD.SDD.NAD').
        output_path (str): The file path where the raw XML metadata will be saved.
        client_id (str, optional): The sdmx1 client ID to use. Defaults to "OECD".

    Raises:
        HTTPError: If a network error occurs during the API request. The
            error (and, when available, the URL and the first 500
            characters of the response body) is logged before re-raising.
        Exception: For other unexpected errors (logged, then re-raised).

    Usage:
        fetch_and_save_metadata(
            dataflow_id="DSD_NAMAIN1@DF_QNA_EXPENDITURE_GROWTH_OECD",
            agency_id="OECD.SDD.NAD",
            output_path="gdp_growth_metadata.xml"
        )
    """
    try:
        client = sdmx.Client(client_id)
        logging.info(f"Fetching raw metadata for dataflow: {dataflow_id}...")

        # 'references=all' asks the server to include every structure the
        # dataflow refers to, not just the dataflow definition itself.
        flow_msg = client.dataflow(dataflow_id,
                                   agency_id=agency_id,
                                   params={'references': 'all'})
        logging.info(
            f"Successfully received response from the server: {flow_msg.response.url}"
        )

        raw_metadata_content = flow_msg.response.text
        with open(output_path, "w", encoding="utf-8") as f:
            f.write(raw_metadata_content)
        logging.info(f"Successfully saved raw metadata to '{output_path}'")

    except HTTPError as e:
        logging.error(
            f"Network error while downloading dataflow metadata for {agency_id}/{dataflow_id}: {e}"
        )
        # Compare with None explicitly. Assuming HTTPError is the requests
        # exception (the import is not visible here), a Response with a
        # 4xx/5xx status is falsy, so a plain truthiness test would skip
        # these diagnostics for exactly the errors they are meant for.
        if e.response is not None:
            logging.error(f"URL: {e.response.url}")
            logging.error(f"Response content: {e.response.text[:500]}...")
        raise
    except Exception as e:
        logging.error(
            f"An error occurred while processing dataflow metadata for {agency_id}/{dataflow_id}: {e}"
        )
        raise


def fetch_and_save_data_as_csv(dataflow_id: str,
agency_id: str,
key: Dict[str, Any],
params: Dict[str, Any],
output_path: str,
client_id: str = "OECD"):
"""
Fetches data from an SDMX API, converts it to a tidy pandas DataFrame,
and saves it as a CSV file.

Args:
dataflow_id (str): The ID of the dataflow.
agency_id (str): The ID of the agency.
key (dict): A dictionary defining the slice of data to query.
params (dict): A dictionary of query parameters (e.g., startPeriod).
output_path (str): The file path where the final CSV data will be saved.
client_id (str, optional): The sdmx1 client ID to use. Defaults to "OECD".

Raises:
HTTPError: If a network error occurs during the API request.
Exception: For other unexpected errors.

Usage:
DATA_KEY = {
'FREQ': 'Q',
'REF_AREA': 'USA+CAN+MEX',
'TRANSACTION': 'B1GQ',
'TRANSFORMATION': 'G1'
}
DATA_PARAMS = {
'startPeriod': '2022',
'endPeriod': '2023'
class SdmxClient:
"""A client for fetching data and metadata from an SDMX REST API."""

def __init__(self, endpoint: str, agency_id: str):
    """
    Stores the connection settings and builds the underlying sdmx client.

    Args:
        endpoint (str): The base URL of the SDMX REST API endpoint.
        agency_id (str): The ID of the agency providing the data.
    """
    # Both attributes must be set before the client is built, because
    # _get_sdmx_client() reads them from the instance.
    self.endpoint = endpoint
    self.agency_id = agency_id
    self.client = self._get_sdmx_client()

def _get_sdmx_client(self) -> sdmx.Client:
Comment thread
SandeepTuniki marked this conversation as resolved.
Outdated
"""
Creates and configures an sdmx.Client for the specified endpoint and agency.
"""
source_id = self.agency_id
custom_source = {
'id': source_id,
'url': self.endpoint,
'name': f'Custom source for {self.agency_id}'
}
fetch_and_save_data_as_csv(
dataflow_id="DSD_NAMAIN1@DF_QNA_EXPENDITURE_GROWTH_OECD",
agency_id="OECD.SDD.NAD",
key=DATA_KEY,
params=DATA_PARAMS,
output_path="gdp_growth_data.csv"
)
"""
try:
client = sdmx.Client(client_id)
logging.info(f"Fetching data for key: {key}")

data_msg = client.data(dataflow_id,
key=key,
params=params,
agency_id=agency_id)
logging.info(
f"Successfully received response from the server: {data_msg.response.url}"
)

data_series = sdmx.to_pandas(data_msg)
df_tidy = data_series.reset_index()

df_tidy.to_csv(output_path, index=False)
logging.info(
f"Successfully converted to CSV data and saved to '{output_path}'")

except HTTPError as e:
logging.error(
f"Network error while downloading data for {agency_id}/{dataflow_id}: {e}"
)
if e.response:
logging.error(f"URL: {e.response.url}")
logging.error(f"Response content: {e.response.text[:500]}...")
raise
except Exception as e:
logging.error(
f"An error occurred while processing data for {agency_id}/{dataflow_id}: {e}"
)
raise
sdmx.add_source(custom_source, override=True)
return sdmx.Client(source_id)

def fetch_and_save_metadata(self, dataflow_id: str, output_path: str):
    """
    Fetches the complete metadata for a dataflow and saves the raw
    SDMX-ML (XML) response to a file.

    Args:
        dataflow_id (str): The ID of the dataflow whose metadata is requested.
        output_path (str): The file path where the raw response text is written.

    Raises:
        HTTPError: If the request fails. When the error carries a response,
            its body is first written to 'metadata_error_<dataflow_id>.html'
            (with '@' replaced by '_') in the current working directory.
        Exception: Any other error is logged and re-raised unchanged.
    """
    try:
        logging.info(
            f"Fetching raw metadata for dataflow: {dataflow_id}...")
        # 'references=all' asks the server to include every structure the
        # dataflow refers to, not just the dataflow definition itself.
        flow_msg = self.client.dataflow(dataflow_id,
                                        agency_id=self.agency_id,
                                        params={'references': 'all'})
        logging.info(
            f"Successfully received response: {flow_msg.response.url}")

        raw_content = flow_msg.response.text
        with open(output_path, "w", encoding="utf-8") as f:
            f.write(raw_content)
        logging.info(f"Successfully saved metadata to '{output_path}'")

    except HTTPError as e:
        logging.error(
            f"Network error for {self.agency_id}/{dataflow_id}: {e}")
        # Compare with None explicitly. Assuming HTTPError is the requests
        # exception (the import is not visible here), a Response with a
        # 4xx/5xx status is falsy, so a plain truthiness test would never
        # save the error body for a real HTTP failure.
        if e.response is not None:
            safe_df_id = dataflow_id.replace('@', '_')
            error_filename = f"metadata_error_{safe_df_id}.html"
            logging.error(f"URL: {e.response.url}")
            try:
                with open(error_filename, "w", encoding="utf-8") as f:
                    f.write(e.response.text)
            except OSError as dump_error:
                # The dump is best-effort: a failed write must not replace
                # the HTTPError that callers are expecting.
                logging.error(
                    f"Could not save response to '{error_filename}': {dump_error}"
                )
            else:
                logging.error(f"Response saved to '{error_filename}'")
        raise
    except Exception as e:
        logging.error(
            f"Error processing metadata for {self.agency_id}/{dataflow_id}: {e}"
        )
        raise

def fetch_and_save_data_as_csv(self, dataflow_id: str, key: Dict[str, Any],
                               params: Dict[str, Any], output_path: str):
    """
    Fetches data, converts it to a pandas DataFrame, and saves as CSV.

    Args:
        dataflow_id (str): The ID of the dataflow to query.
        key (dict): A dictionary defining the slice of data to query.
        params (dict): A dictionary of query parameters (e.g., startPeriod).
        output_path (str): The file path where the CSV is written.

    Raises:
        HTTPError: If the request fails. When the error carries a response,
            its body is first written to 'data_error_<dataflow_id>.html'
            (with '@' replaced by '_') in the current working directory.
        Exception: Any other error is logged and re-raised unchanged.
    """
    try:
        logging.info(f"Fetching data for key: {key}")
        data_msg = self.client.data(dataflow_id,
                                    key=key,
                                    params=params,
                                    agency_id=self.agency_id)
        logging.info(
            f"Successfully received response: {data_msg.response.url}")

        # reset_index() turns the index of the converted result into
        # ordinary columns, so the CSV is written without an index column.
        df = sdmx.to_pandas(data_msg).reset_index()
        df.to_csv(output_path, index=False)
        logging.info(f"Successfully saved data to '{output_path}'")

    except HTTPError as e:
        logging.error(
            f"Network error for {self.agency_id}/{dataflow_id}: {e}")
        # Compare with None explicitly. Assuming HTTPError is the requests
        # exception (the import is not visible here), a Response with a
        # 4xx/5xx status is falsy, so a plain truthiness test would never
        # save the error body for a real HTTP failure.
        if e.response is not None:
            safe_df_id = dataflow_id.replace('@', '_')
            error_filename = f"data_error_{safe_df_id}.html"
            logging.error(f"URL: {e.response.url}")
            try:
                with open(error_filename, "w", encoding="utf-8") as f:
                    f.write(e.response.text)
            except OSError as dump_error:
                # The dump is best-effort: a failed write must not replace
                # the HTTPError that callers are expecting.
                logging.error(
                    f"Could not save response to '{error_filename}': {dump_error}"
                )
            else:
                logging.error(f"Response saved to '{error_filename}'")
        raise
    except Exception as e:
        logging.error(
            f"Error processing data for {self.agency_id}/{dataflow_id}: {e}"
        )
        raise
26 changes: 26 additions & 0 deletions tools/sdmx/samples/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# SDMX Utility Sample Scripts

This directory contains sample scripts demonstrating how to use the `SdmxClient` class in the `tools.sdmx.dataflow` module to download data and metadata from different SDMX APIs.

## Scripts

### OECD

* `fetch_oecd_gdp_metadata.py`: Downloads the complete metadata for the OECD's Quarterly GDP Growth dataset.
* `fetch_oecd_gdp_data.py`: Fetches a specific slice of data from the same GDP dataset and saves it as a CSV.
* `fetch_oecd_full_gdp_dataset.py`: A more complete example that combines both operations, downloading the metadata and then the full dataset.

### Eurostat

* `fetch_eurostat_gdp_metadata.py`: Downloads the metadata for the annual GDP dataset from Eurostat.
* `fetch_eurostat_gdp_data.py`: Downloads a slice of the annual GDP data for Germany, France, and Italy from Eurostat.

## Running the Samples

You can execute each script from the root of the repository, for example:

```bash
python3 tools/sdmx/samples/fetch_oecd_gdp_metadata.py
```

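The scripts download the requested data/metadata and save it as `.xml` or `.csv` files. The Eurostat samples write to an `output/` directory next to the scripts (`tools/sdmx/samples/output/`), creating it if it does not exist.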
57 changes: 57 additions & 0 deletions tools/sdmx/samples/fetch_eurostat_gdp_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""
fetch_eurostat_gdp_data.py

This script provides a complete example of fetching a specific dataset
from Eurostat using the SdmxClient class from the dataflow module.
"""

import logging
import sys
import os

# Add the project root to the Python path
sys.path.append(
os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..')))

from tools.sdmx.dataflow import SdmxClient

# Configure logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')


def main():
    """Fetch a slice of the Eurostat TEC00001 dataflow and save it as CSV.

    The CSV is written to an 'output' directory next to this script,
    which is created if it does not already exist.
    """
    # Connection settings for the Eurostat SDMX 2.1 endpoint.
    source_agency = "ESTAT"
    flow = "TEC00001"
    base_url = "https://ec.europa.eu/eurostat/api/dissemination/sdmx/2.1/"

    # Results live beside the script rather than in the caller's cwd.
    target_dir = os.path.join(os.path.dirname(__file__), "output")
    os.makedirs(target_dir, exist_ok=True)
    csv_path = os.path.join(target_dir, "eurostat_gdp_data.csv")

    # Dimension values selecting the slice, plus the query parameters.
    selection = dict(freq='A', na_item='B1GQ', unit='CP_MEUR', geo='DE+FR+IT')
    query_params = dict(startPeriod='2020')

    logging.info(f"--- Fetching Eurostat Data: {flow} ---")

    sdmx_client = SdmxClient(base_url, source_agency)
    sdmx_client.fetch_and_save_data_as_csv(
        dataflow_id=flow,
        key=selection,
        params=query_params,
        output_path=csv_path,
    )
    logging.info(f"--- Successfully downloaded data to {csv_path} ---")


if __name__ == "__main__":
main()
45 changes: 45 additions & 0 deletions tools/sdmx/samples/fetch_eurostat_gdp_metadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
"""
fetch_eurostat_gdp_metadata.py

This script provides a complete example of fetching metadata for a specific
dataset from Eurostat using the SdmxClient class from the dataflow module.
"""

import logging
import sys
import os

# Add the project root to the Python path
sys.path.append(
os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..')))

from tools.sdmx.dataflow import SdmxClient

# Configure logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')


def main():
    """Fetch the metadata of the Eurostat TEC00001 dataflow and save it as XML.

    The file is written to an 'output' directory next to this script,
    which is created if it does not already exist.
    """
    # Connection settings for the Eurostat SDMX 2.1 endpoint.
    source_agency = "ESTAT"
    flow = "TEC00001"
    base_url = "https://ec.europa.eu/eurostat/api/dissemination/sdmx/2.1/"

    # Results live beside the script rather than in the caller's cwd.
    target_dir = os.path.join(os.path.dirname(__file__), "output")
    os.makedirs(target_dir, exist_ok=True)
    xml_path = os.path.join(target_dir, "eurostat_gdp_metadata.xml")

    logging.info(f"--- Fetching Eurostat Metadata: {flow} ---")

    sdmx_client = SdmxClient(base_url, source_agency)
    sdmx_client.fetch_and_save_metadata(dataflow_id=flow, output_path=xml_path)
    logging.info(f"--- Successfully downloaded metadata to {xml_path} ---")


if __name__ == "__main__":
main()
Loading