-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathS2TMBplus.py
204 lines (170 loc) · 9.05 KB
/
S2TMBplus.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
# with input in the DataFrame format
# update metadata of the dataset
import os, sys
from typing import Union, Dict
import scipy.io
import numpy as np
from sklearn import preprocessing
from common_primitives import utils
from d3m import container
from d3m.metadata import base as metadata_base
from d3m.metadata import hyperparams
from d3m.metadata import params
from d3m.primitive_interfaces.supervised_learning import SupervisedLearnerPrimitiveBase
from d3m.primitive_interfaces import base
from d3m.primitive_interfaces.base import CallResult
import rpi_d3m_primitives
from rpi_d3m_primitives.featSelect.Feature_Selector_model import S2TMB
from rpi_d3m_primitives.featSelect.RelationSet import RelationSet
from sklearn.impute import SimpleImputer
Inputs = container.DataFrame
Outputs = container.DataFrame
__all__ = ('S2TMBplus',)
class Params(params.Params):
    """Serializable parameter container; this primitive persists no state."""
class Hyperparams(hyperparams.Hyperparams):
    """Tunable hyper-parameters of the S2TMBplus feature selector."""

    # Number of bins used when discretizing non-categorical columns.
    nbins = hyperparams.UniformInt(
        lower=2,
        upper=21,
        default=10,
        description='The number of bins for discretization.',
        semantic_types=['https://metadata.datadrivendiscovery.org/types/TuningParameter'],
    )
class S2TMBplus(SupervisedLearnerPrimitiveBase[Inputs, Outputs, Params, Hyperparams]):
    """
    A primitive that performs supervised structured feature selection to reduce input feature dimension. Input to this primitive should be a matrix of tabular numerical/categorical data, consisting of columns of features, and an array of labels. Output will be a reduced data matrix with metadata updated.
    """

    metadata = metadata_base.PrimitiveMetadata({
        'id': '215f554e-15f4-4174-9743-03cddad91dc4',
        'version': rpi_d3m_primitives.__coreversion__,
        'name': 'S2TMBplus feature selector',
        'keywords': ['Feature Selection'],
        'description': 'This primitive is a structured feature selection function based on a scoring function.',
        'source': {
            'name': rpi_d3m_primitives.__author__,
            'contact': 'mailto:[email protected]',
            'uris': [
                'https://github.com/zijun-rpi/d3m-primitives/blob/master/S2TMBplus.py',
                'https://github.com/zijun-rpi/d3m-primitives.git'
            ]
        },
        'installation': [
            {
                'type': metadata_base.PrimitiveInstallationType.PIP,
                'package': 'rpi_d3m_primitives',
                'version': rpi_d3m_primitives.__version__
            }
        ],
        'python_path': 'd3m.primitives.feature_selection.score_based_markov_blanket.RPI',
        'algorithm_types': [
            metadata_base.PrimitiveAlgorithmType.MINIMUM_REDUNDANCY_FEATURE_SELECTION
        ],
        'primitive_family': metadata_base.PrimitiveFamily.FEATURE_SELECTION
    })

    def __init__(self, *, hyperparams: Hyperparams, random_seed: int = 0,
                 docker_containers: Dict[str, base.DockerContainer] = None) -> None:
        """Initialize the encoders, imputer and discretizer used for preprocessing."""
        super().__init__(hyperparams=hyperparams, random_seed=random_seed, docker_containers=docker_containers)
        self._index = None                      # selected column indices, set by fit()
        self._problem_type = 'classification'   # refined in set_training_data()
        self._training_inputs = None
        self._training_outputs = None
        self._fitted = False
        self._LEoutput = preprocessing.LabelEncoder()  # encodes a categorical target
        # Mean imputation for missing numerical cells.
        self._Imputer = SimpleImputer(missing_values=np.nan, strategy='mean')
        self._nbins = self.hyperparams['nbins']
        # Ordinal, uniform-width binning of continuous columns.
        self._Kbins = preprocessing.KBinsDiscretizer(n_bins=self._nbins, encode='ordinal', strategy='uniform')

    # TODO: select columns via semantic types and move preprocessing out of this primitive.
    def set_training_data(self, *, inputs: Inputs, outputs: Outputs) -> None:
        """Store training data as a numeric matrix plus a categorical-column flag.

        The problem type is read from the target column's semantic types; a
        categorical target is label-encoded. Input columns are converted to
        floats: categorical columns are label-encoded (and flagged in
        ``self._cate_flag``), text columns are skipped (left all-zero), and
        empty/falsy cells in numeric columns become NaN for later imputation.
        """
        # Set the problem type from the target column's semantic types.
        metadata = outputs.metadata
        column_metadata = metadata.query((metadata_base.ALL_ELEMENTS, 0))
        semantic_types = column_metadata.get('semantic_types', [])
        if 'https://metadata.datadrivendiscovery.org/types/CategoricalData' in semantic_types:
            self._problem_type = 'classification'
            # Set training labels.
            self._LEoutput.fit(outputs)
            self._training_outputs = self._LEoutput.transform(outputs)
        else:
            self._problem_type = 'regression'

        # Convert categorical values to numerical values in the training data.
        metadata = inputs.metadata
        [m, n] = inputs.shape
        self._training_inputs = np.zeros((m, n))
        self._cate_flag = np.zeros((n,))  # 1 marks a categorical column
        for column_index in metadata.get_elements((metadata_base.ALL_ELEMENTS,)):
            if column_index is metadata_base.ALL_ELEMENTS:
                continue
            column_metadata = metadata.query((metadata_base.ALL_ELEMENTS, column_index))
            semantic_types = column_metadata.get('semantic_types', [])
            if 'https://metadata.datadrivendiscovery.org/types/CategoricalData' in semantic_types:
                LE = preprocessing.LabelEncoder()
                LE = LE.fit(inputs.iloc[:, column_index])
                self._training_inputs[:, column_index] = LE.transform(inputs.iloc[:, column_index])
                self._cate_flag[column_index] = 1
            elif 'http://schema.org/Text' in semantic_types:
                # Text columns are left as all-zero placeholders.
                pass
            else:
                temp = list(inputs.iloc[:, column_index].values)
                for i in np.arange(len(temp)):
                    if bool(temp[i]):
                        self._training_inputs[i, column_index] = float(temp[i])
                    else:
                        # Empty/falsy cell: mark as missing for the imputer.
                        self._training_inputs[i, column_index] = float('nan')
                # If every value in the column is missing, fall back to zeros
                # so the imputer does not drop the column.
                if np.count_nonzero(np.isnan(self._training_inputs[:, column_index])) == m:
                    self._training_inputs[:, column_index] = np.zeros(m,)
        self._fitted = False

    def fit(self, *, timeout: float = None, iterations: int = None) -> CallResult[None]:
        """Run S2TMB feature selection on the stored training data.

        Imputes missing values, discretizes continuous columns, fits the
        S2TMB model on a train/validation split, and stores the selected
        column indices in ``self._index``.

        Raises:
            ValueError: if training data has not been set.
        """
        if self._fitted:
            return CallResult(None)

        # BUGFIX: the original compared ``.any()`` against None with ``==``,
        # which never detects the missing-data case and raises
        # AttributeError when the arrays actually are None.
        if self._training_inputs is None or self._training_outputs is None:
            raise ValueError('Missing training data, or missing values exist.')

        # Impute missing values.
        self._Imputer.fit(self._training_inputs)
        self._training_inputs = self._Imputer.transform(self._training_inputs)

        # Discretize non-categorical columns.
        # BUGFIX: copy instead of aliasing, so discretization does not
        # overwrite the raw matrix that Trainset below is built from.
        disc_training_inputs = self._training_inputs.copy()
        cont_columns = np.where(self._cate_flag == 0)[0]
        if len(cont_columns) != 0:
            self._Kbins.fit(self._training_inputs[:, cont_columns])
            disc_training_inputs[:, cont_columns] = self._Kbins.transform(self._training_inputs[:, cont_columns])

        Trainset = RelationSet(self._training_inputs, self._training_outputs.reshape(-1, 1))
        discTrainset = RelationSet(disc_training_inputs, self._training_outputs.reshape(-1, 1))
        validSet, smallTrainSet = Trainset.split(self._training_inputs.shape[0] // 4)
        smallDiscTrainSet = discTrainset.split(self._training_inputs.shape[0] // 4)[1]
        model = S2TMB(smallTrainSet, smallDiscTrainSet, self._problem_type, test_set=validSet)
        index = model.select_features()

        # Features with only one value should not be selected.
        # BUGFIX: examine the SELECTED columns (index values) rather than
        # the first len(index) columns of the training matrix, and remove
        # by column id so setdiff1d drops the right features.
        remove = [col for col in index
                  if len(np.unique(self._training_inputs[:, col])) == 1]
        index = np.setdiff1d(index, remove)
        if len(index) == 0:
            # Nothing survived selection: keep all columns.
            index = np.arange(self._training_inputs.shape[1])

        # Store plain Python ints so metadata selection accepts them.
        self._index = [index[ii].item() for ii in np.arange(index.shape[0])]
        self._fitted = True
        return CallResult(None)

    def produce(self, *, inputs: Inputs, timeout: float = None, iterations: int = None) -> base.CallResult[Outputs]:
        """Return ``inputs`` reduced to the selected columns, metadata updated.

        Raises:
            ValueError: if :meth:`fit` has not been called yet.
        """
        if not self._fitted:
            raise ValueError('Model should be fitted first.')
        output = inputs.iloc[:, self._index]
        output.metadata = utils.select_columns_metadata(inputs.metadata, columns=self._index)
        return CallResult(output)

    def get_params(self) -> None:
        """No fitted parameters are persisted by this primitive."""
        pass

    def set_params(self) -> None:
        """No fitted parameters are persisted by this primitive."""
        pass