Merge update initial implementation #2684
base: vasil.pashov/feature/merge
Conversation
Force-pushed efa409d to 33f3a23
Add prune_previous, metadata, on and match_on_index to the skeleton. Implement checks for features that are not yet implemented. Initial implementation. Set types properly. Fix tests. Fix unit tests. Enable more tests.
Force-pushed 33f3a23 to 2c55289
```python
    metadata: Optional[Any] = None,
    upsert: bool = False,
) -> VersionedItem:
    udm, item, norm_meta = self._nvs._try_normalize(
```
- docs :)
```python
def merge(
    self,
    symbol: str,
    dataframe: NormalizableType,
```
Rename the parameter to `data` to match the update method?
| ) | ||
|
|
||
|
|
||
| class MergeStrategy(NamedTuple): |
What are the expected semantics of UPDATE vs INSERT? Just thinking of how this will be used, isn't upsert always what's wanted i.e. add missing, or update what's there?
I guess upsert will be the most commonly used. Update and insert are more or less orthogonal and can be implemented separately, so we can present the user with an option to do just one of the two operations.
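A toy sketch of the semantics being discussed, using plain dicts keyed by index value (the `MergeAction` flag and `apply` helper are hypothetical illustrations, not the PR's API): UPDATE and INSERT are orthogonal, and upsert is simply their combination.

```python
from enum import Flag


class MergeAction(Flag):
    # Hypothetical flags: UPDATE and INSERT are orthogonal operations,
    # and UPSERT is just both at once.
    UPDATE = 1  # only overwrite rows whose keys already exist in the target
    INSERT = 2  # only add rows whose keys are missing from the target
    UPSERT = UPDATE | INSERT


def apply(target: dict, source: dict, action: MergeAction) -> dict:
    """Toy model of merging `source` into `target` under `action`."""
    result = dict(target)
    for key, value in source.items():
        exists = key in result
        if exists and MergeAction.UPDATE in action:
            result[key] = value  # update an existing row
        elif not exists and MergeAction.INSERT in action:
            result[key] = value  # insert a missing row
    return result
```

With this framing, exposing the two flags separately costs nothing, since upsert is just their union.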
```python
    "mode must be one of StagedDataFinalizeMethod.WRITE, StagedDataFinalizeMethod.APPEND, 'write', 'append'"
)


def merge(
```
Should we add another method for this, versus making it a feature of update? Update already has an upsert parameter, which many would expect to have upsert semantics.
Yeah, I find the semantics of the upsert parameter a bit confusing; I guess the word got overloaded too much. Technically it's possible to add some flags and make this functionality part of update, but the two operations do quite different things and the implementations will be vastly different, so I'm not convinced that mashing them into one API call would be an improvement.
```cpp
};


std::vector<SliceAffectedByMerge> slices_affected_by_merge(
    const InputTensorFrame& source, std::span<const SliceAndKey> slices
```
This will need the `match_on_timeseries_index` parameter as well, as we will always need to read everything when not matching on the index.
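A Python sketch of the affected-slice computation being reviewed, under the assumption (stated in the PR description) that both indexes are sorted and each target slice covers a half-open time range. The names `SliceAffected` and `slice_ranges` are illustrative; the real function works on `InputTensorFrame` and `SliceAndKey`. It produces the pair described later in the PR text: the index of the affected slice and the first source row that falls into it.

```python
import bisect
from typing import NamedTuple


class SliceAffected(NamedTuple):
    slice_index: int       # position of the slice in the index key
    first_source_row: int  # first source row whose index value falls in the slice


def slices_affected_by_merge(source_index, slice_ranges):
    """For each target slice [start, end), binary-search the sorted source
    index for the first row that lands inside it. Slices with no matching
    source rows are skipped entirely (they need not be read when matching
    on the index)."""
    affected = []
    for i, (start, end) in enumerate(slice_ranges):
        row = bisect.bisect_left(source_index, start)  # first row >= start
        if row < len(source_index) and source_index[row] < end:
            affected.append(SliceAffected(i, row))
    return affected
```

Note this pruning only works when matching on the index; with `match_on_timeseries_index=False` every slice is potentially affected, as the comment above points out.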
```cpp
std::vector<folly::Future<folly::Unit>> merge_segments_fut;
merge_segments_fut.reserve(affected_slices.size());
for (const SliceAffectedByMerge& affected : affected_slices) {
    merge_segments_fut.emplace_back(
```
This needs to be windowed to guarantee we don't read everything into memory at once
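A minimal Python analog of the windowing being requested (in the C++ code this would typically be something like `folly::window`): cap the number of in-flight slice merges so that only `window_size` segments are materialised at once. `process_windowed` and its parameters are hypothetical names for illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def process_windowed(items, work, window_size=4):
    """Process `items` in windows of at most `window_size` concurrent tasks.
    Each window is fully drained before the next one starts, bounding peak
    memory to `window_size` in-flight segments."""
    results = []
    with ThreadPoolExecutor(max_workers=window_size) as pool:
        for start in range(0, len(items), window_size):
            batch = items[start:start + window_size]
            futures = [pool.submit(work, item) for item in batch]
            results.extend(f.result() for f in futures)  # drain the window
    return results
```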
```cpp
.thenValue([store,
            &update_info](std::pair<VariantKey, SegmentInMemory>&& key_segment) {
    const AtomKey& key = std::get<AtomKey>(key_segment.first);
    return store->write(
```
What about slicing? We can be inserting lots of rows, so could end up with a massive segment on disk
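The re-slicing the comment asks for can be sketched in a few lines (hypothetical helper, with an assumed per-segment row limit): instead of writing one merged segment however large it grows, split the output rows back into bounded segments before writing.

```python
def split_into_segments(rows, max_rows_per_segment=100_000):
    """Split merged output rows into chunks no larger than the row-slicing
    limit, so no single segment written to disk becomes massive."""
    return [
        rows[i:i + max_rows_per_segment]
        for i in range(0, len(rows), max_rows_per_segment)
    ]
```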
```cpp
);
})
.thenValueInline([index, affected](VariantKey&& key) {
    index->slice_and_keys[affected.slice_index].set_key(
```
The row count will now be wrong in the SliceAndKey?
See the comment on slicing above though; I don't think this design of modifying the existing index will work in that case. Better to just build up a new index key, more like update does.
```cpp
const auto target_index_end = target_index.end<IndexType>();
while (row_in_source < source.num_rows) {
    const timestamp source_index_value = source_index[row_in_source];
    if (slice_to_update.key().end_time() <= source_index_value) {
```
Using atom key end times is error-prone due to various bugs we've had in the past; use the segment in memory as the source of truth whenever it has already been read.
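The suggestion above can be sketched as a tiny helper (hypothetical name; the exclusive-end convention is an assumption about how end times are stored): when the segment is already in memory, derive the end time from its last index value instead of trusting the key.

```python
def slice_end_time(key_end_time, segment_index=None):
    """Prefer the in-memory segment's index over the atom key's stored
    end_time. Assumes end times are exclusive, hence the +1."""
    if segment_index:  # segment already read: use it as the source of truth
        return segment_index[-1] + 1
    return key_end_time  # fall back to the key only when nothing is in memory
```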
```cpp
}


auto target_row_it = std::lower_bound(target_index_search_start, target_index_end, source_index_value);
while (target_row_it != target_index_end && *target_row_it == source_index_value) {
```
Let's discuss this algorithm on a call; there are pros and cons to whatever we choose to do for the index.
We should also design this with match_on_index_column=false and !on.empty() in mind now, as they could both cause large changes to how this looks.
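For reference, the index-matching loop under discussion can be sketched in Python, using `bisect.bisect_left` as the analog of `std::lower_bound`. Because both indexes are sorted, each search resumes from the previous match rather than restarting from the front. The function name and tuple output are illustrative; the output mirrors the (UPDATE/INSERT, row_in_target_segment, row_in_source) pairs mentioned in the PR description's next steps.

```python
import bisect


def classify_source_rows(source_index, target_index):
    """Classify each source row as an UPDATE (its index value exists in the
    sorted target index) or an INSERT (it does not), tracking where in the
    target it belongs. Both inputs must be sorted."""
    result = []
    search_start = 0
    for source_row, value in enumerate(source_index):
        # lower_bound: first target row whose index value is >= value
        target_row = bisect.bisect_left(target_index, value, search_start)
        if target_row < len(target_index) and target_index[target_row] == value:
            result.append(("UPDATE", target_row, source_row))
        else:
            result.append(("INSERT", target_row, source_row))
        search_start = target_row  # sortedness lets us resume the search here
    return result
```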
Reference Issues/PRs
Monday:
What does this implement or fix?
This provides the initial implementation of the merge functionality, supporting only the update part of it. It supports only matching on an ordered DatetimeIndex with static schema.
The algorithm takes advantage of the fact that both the source and the target are ordered.
O(index_row_count * log(source_row_count)). The information is stored as a pair: the index of the affected slice in the index key, and the first index value from the source that falls into that slice.

Next steps:
The iteration in step 3 above is row-wise. This will be slow for DataFrames containing UTF string values, as reading UTF strings requires holding the GIL, and in general row-wise iteration is not cache friendly. This initial implementation uses row-wise iteration because it's easier to implement. Column-wise iteration would need to either perform O(slice_column_count * source_row_count * log(segment_size)) work or use a caching mechanism matching source rows to segment rows. Another difficulty will be related to the on clause: with an on clause we need to check the entire row (across all segments) to know whether an update should be performed. The long-term plan is to add an additional step before update_segment_inplace that will iterate over all slices and generate a list of pairs (UPDATE/INSERT, row_in_target_segment, row_in_source).

Any other comments?
Checklist
Checklist for code changes...