Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DRAFT add Data Health Checker #1574

Merged
merged 11 commits into from
Jan 9, 2025
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,16 @@ We recommend users to prepare their own data if they have a high-quality dataset
* *trading_date*: start of trading day
* *end_date*: end of trading day(not included)

### Checking the health of the data
* We provide a script to check the health of the data, you can run the following commands to check whether the data is healthy or not.
```
python scripts/check_data_health.py check_data --qlib_dir ~/.qlib/qlib_data/cn_data
```
* Of course, you can also add some parameters to adjust the test results, such as this.
```
python scripts/check_data_health.py check_data --qlib_dir ~/.qlib/qlib_data/cn_data --missing_data_num 30055 --large_step_threshold_volume 94485 --large_step_threshold_price 20
```
* If you want more information about `check_data_health`, please refer to the [documentation](https://qlib.readthedocs.io/en/latest/component/data.html#checking-the-health-of-the-data).

<!--
- Run the initialization code and get stock data:
Expand Down
51 changes: 51 additions & 0 deletions docs/component/data.rst
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,57 @@ After conversion, users can find their Qlib format data in the directory `~/.qli
In the convention of `Qlib` data processing, `open, close, high, low, volume, money and factor` will be set to NaN if the stock is suspended.
If you want to use your own alpha-factor which can't be calculate by OCHLV, like PE, EPS and so on, you could add it to the CSV files with OHCLV together and then dump it to the Qlib format data.

Checking the health of the data
-------------------------------

``Qlib`` provides a script to check the health of the data.

- The main points to check are as follows

- Check if any data is missing in the DataFrame.

- Check if there are any large step changes above the threshold in the OHLCV columns.

- Check if any of the required columns (OLHCV) are missing in the DataFrame.

- Check if the 'factor' column is missing in the DataFrame.

- You can run the following commands to check whether the data is healthy or not.

for daily data:
.. code-block:: bash

python scripts/check_data_health.py check_data --qlib_dir ~/.qlib/qlib_data/cn_data

for 1min data:
.. code-block:: bash

python scripts/check_data_health.py check_data --qlib_dir ~/.qlib/qlib_data/cn_data_1min --freq 1min

- Of course, you can also add some parameters to adjust the test results.

- The available parameters are these.

- freq: Frequency of data.

- large_step_threshold_price: Maximum permitted price change

- large_step_threshold_volume: Maximum permitted volume change.

- missing_data_num: Maximum value for which data is allowed to be null.

- You can run the following commands to check whether the data is healthy or not.

for daily data:
.. code-block:: bash

python scripts/check_data_health.py check_data --qlib_dir ~/.qlib/qlib_data/cn_data --missing_data_num 30055 --large_step_threshold_volume 94485 --large_step_threshold_price 20

for 1min data:
.. code-block:: bash

python scripts/check_data_health.py check_data --qlib_dir ~/.qlib/qlib_data/cn_data --freq 1min --missing_data_num 35806 --large_step_threshold_volume 3205452000000 --large_step_threshold_price 0.91

Stock Pool (Market)
-------------------

Expand Down
203 changes: 203 additions & 0 deletions scripts/check_data_health.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
from loguru import logger
import os
from typing import Optional

import fire
import pandas as pd
import qlib
from tqdm import tqdm

from qlib.data import D


class DataHealthChecker:
"""Checks a dataset for data completeness and correctness. The data will be converted to a pd.DataFrame and checked for the following problems:
- any of the columns ["open", "high", "low", "close", "volume"] are missing
- any data is missing
- any step change in the OHLCV columns is above a threshold (default: 0.5 for price, 3 for volume)
- any factor is missing
"""

def __init__(
self,
csv_path=None,
qlib_dir=None,
freq="day",
large_step_threshold_price=0.5,
large_step_threshold_volume=3,
missing_data_num=0,
):
assert csv_path or qlib_dir, "One of csv_path or qlib_dir should be provided."
assert not (csv_path and qlib_dir), "Only one of csv_path or qlib_dir should be provided."

self.data = {}
self.problems = {}
self.freq = freq
self.large_step_threshold_price = large_step_threshold_price
self.large_step_threshold_volume = large_step_threshold_volume
self.missing_data_num = missing_data_num

if csv_path:
assert os.path.isdir(csv_path), f"{csv_path} should be a directory."
files = [f for f in os.listdir(csv_path) if f.endswith(".csv")]
for filename in tqdm(files, desc="Loading data"):
df = pd.read_csv(os.path.join(csv_path, filename))
self.data[filename] = df

elif qlib_dir:
qlib.init(provider_uri=qlib_dir)
self.load_qlib_data()

def load_qlib_data(self):
instruments = D.instruments(market="all")
instrument_list = D.list_instruments(instruments=instruments, as_list=True, freq=self.freq)
required_fields = ["$open", "$close", "$low", "$high", "$volume", "$factor"]
for instrument in instrument_list:
df = D.features([instrument], required_fields, freq=self.freq)
df.rename(
columns={
"$open": "open",
"$close": "close",
"$low": "low",
"$high": "high",
"$volume": "volume",
"$factor": "factor",
},
inplace=True,
)
self.data[instrument] = df
print(df)

def check_missing_data(self) -> Optional[pd.DataFrame]:
"""Check if any data is missing in the DataFrame."""
result_dict = {
"instruments": [],
"open": [],
"high": [],
"low": [],
"close": [],
"volume": [],
}
for filename, df in self.data.items():
missing_data_columns = df.isnull().sum()[df.isnull().sum() > self.missing_data_num].index.tolist()
if len(missing_data_columns) > 0:
result_dict["instruments"].append(filename)
result_dict["open"].append(df.isnull().sum()["open"])
result_dict["high"].append(df.isnull().sum()["high"])
result_dict["low"].append(df.isnull().sum()["low"])
result_dict["close"].append(df.isnull().sum()["close"])
result_dict["volume"].append(df.isnull().sum()["volume"])

result_df = pd.DataFrame(result_dict).set_index("instruments")
if not result_df.empty:
return result_df
else:
logger.info(f"✅ There are no missing data.")
return None

def check_large_step_changes(self) -> Optional[pd.DataFrame]:
"""Check if there are any large step changes above the threshold in the OHLCV columns."""
result_dict = {
"instruments": [],
"col_name": [],
"date": [],
"pct_change": [],
}
for filename, df in self.data.items():
affected_columns = []
for col in ["open", "high", "low", "close", "volume"]:
if col in df.columns:
pct_change = df[col].pct_change(fill_method=None).abs()
threshold = self.large_step_threshold_volume if col == "volume" else self.large_step_threshold_price
if pct_change.max() > threshold:
large_steps = pct_change[pct_change > threshold]
result_dict["instruments"].append(filename)
result_dict["col_name"].append(col)
result_dict["date"].append(large_steps.index.to_list()[0][1].strftime("%Y-%m-%d"))
result_dict["pct_change"].append(pct_change.max())
affected_columns.append(col)

result_df = pd.DataFrame(result_dict).set_index("instruments")
if not result_df.empty:
return result_df
else:
logger.info(f"✅ There are no large step changes in the OHLCV column above the threshold.")
return None

def check_required_columns(self) -> Optional[pd.DataFrame]:
"""Check if any of the required columns (OLHCV) are missing in the DataFrame."""
required_columns = ["open", "high", "low", "close", "volume"]
result_dict = {
"instruments": [],
"missing_col": [],
}
for filename, df in self.data.items():
if not all(column in df.columns for column in required_columns):
missing_required_columns = [column for column in required_columns if column not in df.columns]
result_dict["instruments"].append(filename)
result_dict["missing_col"] += missing_required_columns

result_df = pd.DataFrame(result_dict).set_index("instruments")
if not result_df.empty:
return result_df
else:
logger.info(f"✅ The columns (OLHCV) are complete and not missing.")
return None

def check_missing_factor(self) -> Optional[pd.DataFrame]:
"""Check if the 'factor' column is missing in the DataFrame."""
result_dict = {
"instruments": [],
"missing_factor_col": [],
"missing_factor_data": [],
}
for filename, df in self.data.items():
if "000300" in filename or "000903" in filename or "000905" in filename:
continue
if "factor" not in df.columns:
result_dict["instruments"].append(filename)
result_dict["missing_factor_col"].append(True)
if df["factor"].isnull().all():
if filename in result_dict["instruments"]:
result_dict["missing_factor_data"].append(True)
else:
result_dict["instruments"].append(filename)
result_dict["missing_factor_col"].append(False)
result_dict["missing_factor_data"].append(True)

result_df = pd.DataFrame(result_dict).set_index("instruments")
if not result_df.empty:
return result_df
else:
logger.info(f"✅ The `factor` column already exists and is not empty.")
return None

def check_data(self):
check_missing_data_result = self.check_missing_data()
check_large_step_changes_result = self.check_large_step_changes()
check_required_columns_result = self.check_required_columns()
check_missing_factor_result = self.check_missing_factor()
if (
check_large_step_changes_result is not None
or check_large_step_changes_result is not None
or check_required_columns_result is not None
or check_missing_factor_result is not None
):
print(f"\nSummary of data health check ({len(self.data)} files checked):")
print("-------------------------------------------------")
if isinstance(check_missing_data_result, pd.DataFrame):
logger.warning(f"There is missing data.")
print(check_missing_data_result)
if isinstance(check_large_step_changes_result, pd.DataFrame):
logger.warning(f"The OHLCV column has large step changes.")
print(check_large_step_changes_result)
if isinstance(check_required_columns_result, pd.DataFrame):
logger.warning(f"Columns (OLHCV) are missing.")
print(check_required_columns_result)
if isinstance(check_missing_factor_result, pd.DataFrame):
logger.warning(f"The factor column does not exist or is empty")
print(check_missing_factor_result)


if __name__ == "__main__":
fire.Fire(DataHealthChecker)
Loading