CHIP: Incremental feature aggregation #979
Draft: kambstreat wants to merge 13 commits into airbnb:main from kambstreat:chip/incremental_compute
Commits (13) by kchakka-roku:
* af1f46e add incremental agg CHIP
* 1965fd0 add images
* 8357238 add images
* 0675fbe add images
* 5c07bb3 add
* 9b5f66f api changes
* 5d9fa7d api changes
* 0017243 remove unused images
* 9c52d2a Merge branch 'main' into chip/incremental_compute
* 934d155 add implementation details
* 31b9954 add daily agg IRs solution
* 87ac103 add image for daily agg
* 40021d2 slight format change
# Problem Statement

Chronon reads data partitions based on window sizes to compute features for event sources. For example:

* sum_clicks_past_30days ← 30 day window
* count_visits_past_90days ← 90 day window

When computing aggregations, one of the most time-consuming tasks is reading data from disk. If a feature needs a 90-day lookback, Chronon reads 90 days of event data to compute it.

<img src="./images/CHIP10_inc_agg_visual.png" alt="Incremental Agg" width="900" />

This CHIP adds support for incremental aggregation, which avoids re-reading the full dataset every time.

# Proposed Solution

Divide the aggregate computation into 2 parts:
* **Daily Aggregation** - Read a single day's partition and store partial aggregates
* **Final Aggregation** - Read all partial aggregates and compute the final aggregates

The two steps are described below.

## Daily Aggregation IRs

Given a data source, the daily aggregation job:
* reads the latest partition and generates aggregated values (IRs)
* stores these values in a new intermediate table
* uses an IR schema for the intermediate table (different from the final GroupBy output)

<img src="./images/CHIP10_events_to_daily.png" alt="Events to Daily Aggregate" width="900" />

The diagram above shows the daily IRs for a `count` aggregation.

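The daily step is essentially a per-partition group-by. Below is a minimal PySpark sketch of the idea, not the actual Chronon implementation; the table names (`user_activity`, `user_activity_daily_irs`), the key column `user_id`, and the partition column `ds` are hypothetical.

```python
# Sketch only: build one day's IRs for a `count` aggregation and append them
# to a hypothetical intermediate table partitioned by `ds`.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
ds = "2024-01-01"  # the single day partition being processed

daily_irs = (
    spark.table("user_activity")                     # hypothetical event source
    .where(F.col("ds") == ds)                        # read only one partition
    .groupBy("user_id")
    .agg(F.count("listing_id").alias("count_ir"))    # partial aggregate (IR)
    .withColumn("ds", F.lit(ds))
)

# The IR schema (user_id, count_ir, ds) differs from the final GroupBy output.
daily_irs.write.mode("append").partitionBy("ds").saveAsTable("user_activity_daily_irs")
```
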
## Final Aggregation (Final feature values)

* Read the intermediate table and compute the final aggregates
* The number of partitions read depends on the window size

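Continuing the hypothetical tables above, a sketch of the final step for `count_visits_past_90days`: only 90 small IR partitions are scanned per key, instead of 90 days of raw events.

```python
# Sketch only: roll daily IRs up into the final 90-day feature value.
from datetime import date, timedelta
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

end = date(2024, 3, 31)
window_days = 90
start = end - timedelta(days=window_days - 1)

final = (
    spark.table("user_activity_daily_irs")           # hypothetical IR table
    .where(F.col("ds").between(start.isoformat(), end.isoformat()))
    .groupBy("user_id")
    .agg(F.sum("count_ir").alias("count_visits_past_90days"))
)
```
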
## Pros
* Daily aggregates reduce the amount of data read from disk
* Final aggregation is done on the intermediate table, which is smaller than the original data source

## Cons
* An intermediate table is created for each GroupBy, which increases storage cost
* Any change in the data source requires re-computation of the intermediate table

# Optimizations

The proposed solution is a good start but can be improved further for invertible operators. Invertible operators are those whose contributions can be subtracted ("deleted") from an aggregate; for example, `count` and `sum` are invertible.

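As a toy illustration of the distinction (plain Python, not Chronon code): an invertible operator such as `sum` can evict an expired day by subtraction, whereas `max` cannot recover the new maximum from the previous value alone.

```python
# Illustration only: invertible vs. non-invertible aggregations.
prev_sum = 120         # sum over the previous window
new_day_sum = 5        # contribution of the newest day entering the window
expired_day_sum = 7    # contribution of the day leaving the window

# `sum` is invertible: the expired day can be "deleted" arithmetically.
new_sum = prev_sum + new_day_sum - expired_day_sum   # 118

# `max` is not: if the expired day held the maximum, prev_max alone tells us
# nothing about the new maximum, so all partitions in the window must be re-read.
```
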
### Requirements

* Previous day's GroupBy output.
  * GroupBy values are basically ML features, so we will refer to this as the `feature store`.
* User activity table with the latest and historical partitions.

### Use cases
#### Case 1: Unwindowed deletable

> Eg: how many listings were purchased by the user since signup.

These features are not windowed, which means the feature is calculated over the user's lifetime.

<img src="./images/CHIP10_unwindowed_agg.png" alt="Incremental Agg" width="600" />

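A minimal PySpark sketch of the unwindowed update, with hypothetical table and column names: yesterday's feature-store value plus the newest activity partition is enough (and, because the operator is invertible, retracted rows could be subtracted the same way).

```python
# Sketch only: lifetime (unwindowed) count, updated from the previous feature
# value and a single new day of activity.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

prev = (
    spark.table("feature_store")                     # hypothetical GroupBy output
    .where(F.col("ds") == "2024-01-01")
    .select("user_id", "listings_purchased_since_signup")
)

day0 = (
    spark.table("user_activity")                     # hypothetical activity table
    .where(F.col("ds") == "2024-01-02")
    .groupBy("user_id")
    .agg(F.count("listing_id").alias("new_purchases"))
)

updated = prev.join(day0, "user_id", "full_outer").select(
    "user_id",
    (F.coalesce(F.col("listings_purchased_since_signup"), F.lit(0))
     + F.coalesce(F.col("new_purchases"), F.lit(0))
     ).alias("listings_purchased_since_signup"),
)
```
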
#### Case 2: Windowed deletable
These features are windowed and have an inverse operator.

> Eg: Number of listings (count aggregation) purchased by the user in the past 90 days

<img src="./images/CHIP10_windowed_deletable_agg.png" alt="Incremental Agg" width="600" />

```python
GroupBy(
    sources=[table_name],
    keys=["key_col1", "key_col2"],
    aggregations=[
        Aggregation(
            input_column="inp_col",
            operation=Operation.COUNT,
            windows=[
                Window(length=3, timeUnit=TimeUnit.DAYS),
                Window(length=10, timeUnit=TimeUnit.DAYS)
            ]
        )],
    accuracy=Accuracy.SNAPSHOT
)
```
To compute the above GroupBy incrementally (see the sketch below):
* Read the GroupBy output table to get the previous day's aggregated values
* Read _day 0_ to add the latest activity
* Read user activity on day 4 and day 11 to delete the user data that has fallen out of the windows

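A PySpark sketch of that update for the 3-day count window (the 10-day window is analogous, subtracting the day-11 partition instead); the table names `groupby_output` and `user_activity` are hypothetical, and the day labels mirror the list above.

```python
# Sketch only: incremental update of the 3-day count using the inverse operator
# (add the newest day, subtract the day that fell out of the window).
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
keys = ["key_col1", "key_col2"]

def daily_count(ds):
    """Per-key count for a single hypothetical user_activity partition."""
    return (
        spark.table("user_activity")
        .where(F.col("ds") == ds)
        .groupBy(*keys)
        .agg(F.count("inp_col").alias("cnt"))
    )

prev = (
    spark.table("groupby_output")                    # previous day's feature values
    .where(F.col("ds") == "day_1")
    .select(*keys, "inp_col_count_3d")
)

updated = (
    prev
    .join(daily_count("day_0").withColumnRenamed("cnt", "add_cnt"), keys, "full_outer")
    .join(daily_count("day_4").withColumnRenamed("cnt", "del_cnt"), keys, "left_outer")
    .select(
        *keys,
        (F.coalesce(F.col("inp_col_count_3d"), F.lit(0))
         + F.coalesce(F.col("add_cnt"), F.lit(0))
         - F.coalesce(F.col("del_cnt"), F.lit(0))
         ).alias("inp_col_count_3d"),
    )
)
```
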
#### Case 3: Windowed non-deletable

These features are windowed and do not have an inverse operator.

> Eg: What is the max purchase price by the user in the past 30 days?

For non-deletable operators, we keep the current behavior of Chronon and load all the data/partitions needed to compute the feature.

### Pros
* For invertible operators, drastically reduces the amount of data to read.
* No need to read daily aggregated IRs.

### Cons
* Applicable only to invertible operators.
  * If even one operator in a GroupBy is non-invertible, we need to read all the partitions.
* This introduces 2 paths to compute a GroupBy:
  * Invertible: just read the latest/specific partitions and the feature store
  * Non-invertible: read all partitions

# Implementation

## API Changes

Add `incremental_agg=True` to the GroupBy API if the feature should be computed incrementally.

```python
GroupBy(
    sources=[table_name],
    keys=["key_col1", "key_col2"],
    aggregations=[....],
    incremental_agg=True,
    accuracy=Accuracy.SNAPSHOT
)
```
This requires Thrift changes to add the flag.
**Review comment:**

An alternative approach is to store an intermediate 'tiled' representation: each day, store the aggregate for just that day, then compute the longer windows from the intermediates. E.g. for the above example, store the count of `inp_col` each day; then your 3- and 10-day windows just need to sum those intermediate counts to get the final values. The benefit here is that it works for almost any kind of aggregation, including max, min, etc.

I'm fairly sure this is how the 'tiled architecture' works for the online flow.
**Author reply:**

@blrnw3 Yes. I am going to change the architecture: compute the daily aggregations and store them in a table. The only change would be the way we store the IRs. For example, for `avg` we need to store both sum and count.
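To illustrate that last point, a minimal sketch (plain Python, not Chronon's actual IR classes) of an `avg` IR that stores both sum and count, so daily IRs can be merged into any window and subtracted when a day expires:

```python
# Illustration only: an average IR keeps (sum, count) so daily partials can be
# merged into a window and, for the invertible path, deleted when a day expires.
from dataclasses import dataclass

@dataclass
class AvgIR:
    total: float = 0.0
    count: int = 0

    def merge(self, other: "AvgIR") -> "AvgIR":
        return AvgIR(self.total + other.total, self.count + other.count)

    def delete(self, other: "AvgIR") -> "AvgIR":
        return AvgIR(self.total - other.total, self.count - other.count)

    def finalize(self) -> float:
        return self.total / self.count if self.count else 0.0

# Merge three daily IRs into a 3-day average, then slide the window forward.
day1, day2, day3 = AvgIR(10.0, 2), AvgIR(30.0, 3), AvgIR(20.0, 5)
window = day1.merge(day2).merge(day3)   # avg = 60 / 10 = 6.0
window = window.delete(day1)            # avg = 50 / 8  = 6.25
```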