Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

U/jrbogart/truth parquet reader #455

Merged
merged 15 commits into from
Jun 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Use ^/ to indicate file path relative to GCR root dir

subclass_name: dc2_truth_parquet.DC2TruthParquetCatalog
base_dir: ^/DC2-prod/Run2.2i/truth/galtruth
filename_pattern: 'truth_summary_hp{healpix}.parquet$'
description: DC2 Run 2.2i Summary truth table for galaxies
creators: ['Joanne Bogart']
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Use ^/ to indicate file path relative to GCR root dir

subclass_name: dc2_truth_parquet.DC2TruthParquetCatalog
base_dir: ^/DC2-prod/Run2.2i/truth/sntruth
filename_pattern: 'sn_truth_summary.parquet$'
description: DC2 Run 2.2i Summary truth table for SNe
creators: ['Joanne Bogart']
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Use ^/ to indicate file path relative to GCR root dir

subclass_name: dc2_truth_parquet.DC2TruthParquetCatalog
base_dir: ^/DC2-prod/Run2.2i/truth/sntruth
filename_pattern: 'sn_variability_truth.parquet$'
description: DC2 Run 2.2i variability truth table for SNe
creators: ['Joanne Bogart']
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Use ^/ to indicate file path relative to GCR root dir

subclass_name: dc2_truth_parquet.DC2TruthParquetCatalog
base_dir: ^/DC2-prod/Run2.2i/truth/startruth
filename_pattern: 'star_lc_stats_trimmed.parquet$'
description: DC2 Run 2.2i Summary truth table for stars
creators: ['Joanne Bogart']
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Use ^/ to indicate file path relative to GCR root dir

subclass_name: dc2_truth_parquet.DC2TruthParquetCatalog
base_dir: ^/DC2-prod/Run2.2i/truth/startruth
filename_pattern: 'star_truth_summary.parquet$'
description: DC2 Run 2.2i Summary truth table for stars
creators: ['Joanne Bogart']
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Use ^/ to indicate file path relative to GCR root dir

subclass_name: dc2_truth_parquet.DC2TruthParquetCatalog
base_dir: ^/DC2-prod/Run2.2i/truth/startruth
filename_pattern: 'star_variability_truth.parquet$'
description: DC2 Run 2.2i variability truth table for stars
creators: ['Joanne Bogart']
49 changes: 3 additions & 46 deletions GCRCatalogs/dc2_dm_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import pyarrow.parquet as pq
import yaml
from GCR import BaseGenericCatalog
from .parquet import ParquetFileWrapper

from .utils import first

Expand Down Expand Up @@ -89,51 +90,6 @@ def create_basic_flag_mask(*flags):
return out


class ParquetFileWrapper():
    """Lazy-opening wrapper around a single parquet file.

    The pyarrow ``ParquetFile`` handle is created on first use and cached;
    ``close()`` drops it so it can be reopened later.  Extra per-file
    metadata supplied via *info* (e.g. native-filter values parsed from the
    file name) is exposed both as a dict and as attributes.
    """

    def __init__(self, file_path, info=None):
        """
        Parameters
        ----------
        file_path : str
            Full path to the underlying parquet file.
        info : dict, optional
            Native-filter names mapped to their values for this file.
        """
        self.path = file_path
        self._handle = None     # cached pq.ParquetFile, opened lazily
        self._columns = None    # cached column-name list
        self._info = info or dict()

    @property
    def handle(self):
        # Open the file on first access and cache the handle.
        if self._handle is None:
            self._handle = pq.ParquetFile(self.path)
        return self._handle

    def close(self):
        """Drop the cached file handle; it is reopened on next use."""
        self._handle = None

    def __len__(self):
        # BUGFIX: ``scan_contents`` is a ParquetFile *method*, so
        # ``int(self.handle.scan_contents)`` raised TypeError.  The row
        # count is available directly from the file metadata.
        return int(self.handle.metadata.num_rows)

    def __contains__(self, item):
        return item in self.columns

    def read_columns(self, columns, as_dict=False):
        """Read all values of *columns*.

        Returns a dict of numpy arrays keyed by column name when
        *as_dict* is true, otherwise a pandas DataFrame.
        """
        d = self.handle.read(columns=columns).to_pandas()
        if as_dict:
            return {c: d[c].values for c in columns}
        return d

    @property
    def info(self):
        # Return a copy so callers cannot mutate internal state.
        return dict(self._info)

    def __getattr__(self, name):
        # Expose info entries (e.g. tract, healpix) as attributes.
        if name not in self._info:
            raise AttributeError('Attribute {} does not exist'.format(name))
        return self._info[name]

    @property
    def columns(self):
        # Exclude internal metadata columns of the form ``__name__``.
        if self._columns is None:
            self._columns = [col for col in self.handle.schema.to_arrow_schema().names
                             if re.match(r'__\w+__$', col) is None]
        return list(self._columns)


class DC2DMCatalog(BaseGenericCatalog):
r"""DC2 Catalog reader

Expand Down Expand Up @@ -299,7 +255,8 @@ def _obtain_native_data_dict(native_quantities_needed, native_quantity_getter):
Overloading this so that we can query the database backend
for multiple columns at once
"""
return native_quantity_getter.read_columns(list(native_quantities_needed), as_dict=True)
return native_quantity_getter.read_columns(list(native_quantities_needed),
as_dict=True)

def _iter_native_dataset(self, native_filters=None):
for dataset in self._datasets:
Expand Down
108 changes: 108 additions & 0 deletions GCRCatalogs/dc2_truth_parquet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
"""
Reader for truth catalogs persisted as parquet files. May also be
suitable for other catalogs coming from parquet
"""

import os
import re
import warnings

from GCR import BaseGenericCatalog
from .parquet import ParquetFileWrapper
from .parse_utils import PathInfoParser

from .utils import first

__all__ = ['DC2TruthParquetCatalog']



class DC2TruthParquetCatalog(BaseGenericCatalog):
    r"""
    DC2 Truth (parquet) Catalog reader

    Presents tables exactly as they are defined in the files (no aliases,
    no derived quantities)

    Parameters
    ----------
    base_dir (str): Directory of data files being served. Required.
    filename_pattern (str): Optional "enhanced regex" pattern of served data
                            files.
                            Default is match anything

    If filename_pattern contains substrings like "{some_ident}" where
    some_ident is a legal identifier, this part of the pattern will be
    replaced with a regex expression for a group matching a string of
    digits or word characters,
    e.g.
             (?P<some_ident>\d+)   or  (?P<some_ident>\w+)
    The first form will be used iff the identifier is one of a well-known set
    with integer values, currently ('tract', 'visit', 'healpix')

    Such group names may be used subsequently as native_filter_quantities
    If filename_pattern already includes standard regex syntax for named
    groups, those group names may also be used as native filters
    """

    def _subclass_init(self, **kwargs):
        """Validate ``base_dir``, discover data files, and derive the
        quantity list and native filters from them."""
        self.base_dir = kwargs['base_dir']
        self.path_parser = PathInfoParser(kwargs.get('filename_pattern', '.*'))

        if not os.path.isdir(self.base_dir):
            raise ValueError('`base_dir` {} is not a valid directory'.format(self.base_dir))
        self._datasets = self._generate_datasets()
        if not self._datasets:
            err_msg = 'No catalogs were found in `base_dir` {}'
            raise RuntimeError(err_msg.format(self.base_dir))

        # All served files are assumed to share a schema; take the column
        # list from the first one.
        self._columns = first(self._datasets).columns

        # Quantities are served verbatim: no aliases, no derived values.
        self._quantity_modifiers = {col: None for col in self._columns}

        self._native_filter_quantities = set(self.path_parser.group_names)

    def _generate_datasets(self):
        """Return viable data sets from all files in self.base_dir

        Returns:
            A list of ParquetFileWrapper objects. If any native filters come
            from filepath re, dict of their values will be stored in the object
        """
        datasets = list()
        for fname in sorted(os.listdir(self.base_dir)):
            info_dict = self.path_parser.file_info(fname)
            # BUGFIX: compare to None with ``is``, not ``==`` (PEP 8);
            # None here means the filename did not match the pattern.
            if info_dict is None:
                continue
            datasets.append(ParquetFileWrapper(os.path.join(self.base_dir,
                                                            fname),
                                               info=info_dict))
        return datasets

    def _generate_native_quantity_list(self):
        # Native quantities are exactly the parquet columns.
        return self._columns

    @staticmethod
    def _obtain_native_data_dict(native_quantities_needed,
                                 native_quantity_getter):
        """
        Overloading this so that we can query the database backend
        for multiple columns at once
        """
        return native_quantity_getter.read_columns_row_group(
            list(native_quantities_needed), as_dict=True)

    def _iter_native_dataset(self, native_filters=None):
        """Yield each row group of each file passing *native_filters*.

        The yielded object is the file's ParquetFileWrapper with its
        ``current_row_group`` set to the group to be read next.
        """
        for dataset in self._datasets:
            if (native_filters is not None and
                    not native_filters.check_scalar(dataset.info)):
                continue
            for i in range(dataset.num_row_groups):
                dataset.current_row_group = i
                yield dataset

    def close_all_file_handles(self):
        """Clear all cached file handles"""
        for dataset in self._datasets:
            dataset.close()


115 changes: 115 additions & 0 deletions GCRCatalogs/parquet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import pyarrow.parquet as pq
import re

__all__ = ['ParquetFileWrapper']

class ParquetFileWrapper():
    '''
    Provide services commonly needed when catalog consists of one or more parquet files.
    Typical usage by a GCR reader might include
       creating a ParquetFileWrapper object for each parquet file in _generate_datasets
       ParquetFileWrapper object will serve as native_quantity_getter in reader's
          implementation of _obtain_native_data_dict
       Yield instance of ParquetFileObject in implementation of _iter_native_dataset

    There are two ways to read data using a ParquetFileWrapper object: either read data
    a file at a time (use read_columns) or a row group at a time (use read_columns_row_group).
    In the latter case _iter_native_dataset will have to iterate over row groups as well
    as files.  See reader dc2_truth_parquet.py for an example.
    The two methods are equivalent for files having only a single row group.
    '''

    def __init__(self, file_path, info=None):
        '''
        Parameters
        ----------
        file_path   string   Full path to underlying parquet file (required)
        info        dict     Associate native filter names with values for this file (optional)
        '''
        self.path = file_path
        self._handle = None     # cached pq.ParquetFile, opened lazily
        self._columns = None    # cached column-name list
        self._info = info or dict()
        self._row_group = 0     # store the current row group index

    @property
    def handle(self):
        # Open the file on first access and cache the handle.
        if self._handle is None:
            self._handle = pq.ParquetFile(self.path)
        return self._handle

    @property
    def num_row_groups(self):
        return self.handle.metadata.num_row_groups

    @property
    def current_row_group(self):
        return self._row_group

    @current_row_group.setter
    def current_row_group(self, grp):
        # Coerce to int so string-valued indices also work.
        self._row_group = int(grp)

    def close(self):
        '''Drop the cached file handle; it is reopened on next use.'''
        self._handle = None

    def __len__(self):
        # BUGFIX: ``scan_contents`` is a ParquetFile *method*, so
        # ``int(self.handle.scan_contents)`` raised TypeError.  The row
        # count is available directly from the file metadata.
        return int(self.handle.metadata.num_rows)

    def __contains__(self, item):
        return item in self.columns

    def read_columns(self, columns, as_dict=False):
        '''
        Read all values for specified columns

        Parameters
        ----------
        columns     list of columns to be read
        as_dict     boolean.  If true, return data as dict where keys are column names
                    Else return pandas dataframe
        returns
        -------
        dict or dataframe    See as_dict parameter above

        '''
        d = self.handle.read(columns=columns).to_pandas()
        if as_dict:
            return {c: d[c].values for c in columns}
        return d

    def read_columns_row_group(self, columns, as_dict=False):
        '''
        Read specified columns for a single row group, the one stored in the property
        current_row_group

        Parameters
        ----------
        columns     list of columns to be read
        as_dict     boolean.  If true, return data as dict where keys are column names
                    Else return pandas dataframe
        returns
        -------
        dict or dataframe    See as_dict parameter above
        '''
        d = self.handle.read_row_group(self.current_row_group, columns=columns).to_pandas()
        if as_dict:
            return {c: d[c].values for c in columns}
        return d

    @property
    def info(self):
        # Return a copy so callers cannot mutate internal state.
        return dict(self._info)

    def __getattr__(self, name):
        # Expose info entries (e.g. tract, healpix) as attributes.
        if name not in self._info:
            raise AttributeError('Attribute {} does not exist'.format(name))
        return self._info[name]

    @property
    def columns(self):
        # Exclude internal metadata columns of the form ``__name__``.
        if self._columns is None:
            self._columns = [col for col in self.handle.schema.to_arrow_schema().names
                             if re.match(r'__\w+__$', col) is None]
        return list(self._columns)
Loading