-
Notifications
You must be signed in to change notification settings - Fork 101
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Feature] add 'force_batch_from_source=true' to batch right from the start to allow handling large source and large target tables #697
Comments
To implement what you suggested, the partitions you are dealing with must be known beforehand, making the implementation hard, hence the solution proposed by the adapter. Using Edit: if we plan to introduce force_batch_from_source (to deal with not partition dates) I suggest to make it as a dict: force_batch_from_source={'enabled': true, 'source_iteration_columns': ['product_id'], batch_size: '30'} |
@nicor88 Thank you for your comment! mmmh I wrote the below code to override one of the default macros (
What do you think? Note: what's not nice of this approach is that it is a for loop. Threads are not used. A threaded materialization where all insert into's run in parallel would be nicer. However, the single-threadedness is also the case in the original implementation of |
The implementation that you suggested was similar to the original implementation. Then we decided to persist the data in a not partitioned table, and then use that table as a source for the inserts statements run afterwards. The problem with the solution that you proposed is that the cost will be really high if the batch filter is executed on not partitioned column, causing the same query to run multiple time over and over, scanning the same bytes multiple times. Of course if the same partition used in the partition_by of the target table is the same used in the source table that's different, the partition pruning will work, and less bytes will be scanned. Said so, we can allow a behavior similar to what you proposed via another flag, but if that's the case I would like to have also control over the name of the source partition column used for filtering - pretty much what was suggested in my message above. NOTE: the current behavior of the materialization MUST be kept for the reason explained above, if a user mistakenly use not partitions columns from the source table that causes high cost. Regarding the multithreading, I discourage from using multiple inserts concurrently due to Athena limitations on amount of queries per seconds, therefore the current behavior/and the one that we have in the macro can be kept. |
@nicor88 thank you for your response. I understand! In our case the source table is huge so copying it first to an unpartitioned table (even if filtered to just a few days of data) is not possible. Further we work with a partition projected table as the source which (in most cases) enforces mentioning the partitioning columns in the where clause (and we do). We use a macro which always fill the base filter on the partition projected partitions. But yeah if one could somehow limit the usage of Othwerise, insert_by_period is an option, I guess. |
What I mean is that the
reduce the byte scanned if the {{batch}} filter condition acts on the source table partitions. Said so, the {{batch}} filter condition to be effective must be aware of the partitions in the source, and everything gets complicated if the compiled code contains join across tables. Overall I'm fine to add force_batch_from_source but with this conditions:
Are you up to an alligment call to discuss the above? Looks like you are in Germany like me, we could find a slot. Feel free to reach me out in dbt slack channel to allign on this. Thanks. |
btw maybe this helps: https://github.com/CommonCrisis/insert-by-period-dbt-athena/tree/main We are using the insert by period macro cause our source tables are really really big and we need to split the sources for each incremental run. This macro adds custom filter values to further reduce the temp. table created for the merge into the target. We have the limitation of 30 min Athena runs and processing one day can take quite a while. Idk if it helps processing your source tables - for us it is working like a charm for months now. |
Nice to see you added Iceberg support @CommonCrisis 👌 I wonder if there's a future in the dbt-athena project for that macro or if we should keep it third-party. It has helped us a lot as well to process big source tables that otherwise timeout on initial run |
I guess third-party and as dbt package would make sense. Like dbt-utils but for athena related macros |
I like the idea of another package that might include the insert-by-period macros/utilities. I'm wondering where shall we put such utility package and how can we call it? |
Why not simply |
We have https://github.com/dbt-athena/athena-utils that is the main reference for athena-utils to be used within dbt. How about that? |
sure why not. We can add add But ideally we also make the adapter more flexible. E.g. I had to copy paste the build iceberg logic into a separate file to make it work with the insert_by_period macro. |
@CommonCrisis absolutely, we can refactor what we need in the adapter to avoid overwriting as much as we can - I will have a look at your https://github.com/CommonCrisis/insert-by-period-dbt-athena/tree/main to check the implementation. |
💡 Last night, dbt made this announcement about a new incremental-strategy for time-series event data. It introduces |
Is this your first time submitting a feature request?
Describe the feature
When one has a very large source table and one has a query on that source table that leads to a large target table then one can run into query timeout problems on Athena.
force_batch=true
is good for handling many partitions and not run into the limit of 100 open partitions. However, inforce_batch=true
the first thing that happens is that the model gets written without partitions into a temp table. If that table is very big, that can time out, because that can be a query that means copying in one go many many GB of data.In that case (or in fact, always) the partitions should be leveraged right from the get go. I.e. each partition should be used right from the start and written into the target (staging) table (using threading).
I believe THIS CODE must be adapted.
I am not sure if
force_batch=true
should be reworked to show the above behavior always or if a new flagforce_batch_from_source=true
should be introduced.Describe alternatives you've considered
No response
Who will this benefit?
This will benefit people that work with very large partitioned Athena tables (100GB plus) that they want to write to another Athena table that can have on top many partitions.
Are you interested in contributing this feature?
Yes.
Anything else?
No response
The text was updated successfully, but these errors were encountered: