-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathworkbench_example_pipeline.py
62 lines (50 loc) · 1.54 KB
/
workbench_example_pipeline.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# %%
from flotta.core.queries import Query
from flotta.core.transformers import (
FederatedPipeline,
FederatedMinMaxScaler,
FederatedSimpleImputer,
FederatedDrop,
FederatedKBinsDiscretizer,
FederatedLabelBinarizer,
FederatedBinarizer,
FederatedOneHotEncoder,
FederatedClamp,
)
from flotta.schemas.datasources import DataSource
from flotta.schemas.project import Project
from flotta.workbench.context import Context
# %% create the context
ctx = Context("http://localhost:1456")
p: Project = ctx.project("")
# %% ask the context for available metadata
data_sources: list[DataSource] = ctx.datasources(p)
ds1: DataSource = data_sources[0]
ds2: DataSource = data_sources[1]
# %% develop a filter query
q: Query = ds1.extract()
all_features = q.features()
f1, f2, f3, f4, f5, f6, f7, f8, f9 = all_features
mms = FederatedMinMaxScaler(features_in=all_features, features_out=all_features)
im = FederatedSimpleImputer(features_in=[f1, f2], features_out=[f1, f2])
cl = FederatedClamp(features_in=[f1], features_out=[f1])
bi = FederatedBinarizer(features_in=[f2], features_out=[f2])
ohe1 = FederatedOneHotEncoder(features_in=[f3])
ohe2 = FederatedOneHotEncoder(features_in=[f4])
rm = FederatedDrop(features_in=[ohe1.features_out[2]])
dsc = FederatedKBinsDiscretizer(features_in=[f5], features_out=[f5])
le = FederatedLabelBinarizer(features_in=[f9], features_out=[f9])
pipe = FederatedPipeline(
stages=[
mms,
im,
cl,
bi,
ohe1,
ohe2,
rm,
dsc,
le,
]
)
q.add(pipe)