It's about time: “event_time” + “microbatch” + “sample” #10672
Replies: 12 comments 16 replies
-
This is certainly an important step in the right direction :-). I'd like to add some thoughts to this discussion:
In my daily work, these considerations are ubiquitous, as all our sources are async and typically at-least-once delivered. So, any generic solution that does not account for such conditions would require us to stick with our custom incremental logic.
-
Thanks to @dataders for calling this thread to my attention. I wanted to weigh in on the full refresh topic. I don't see the options as 3 separate options; I think options 1 and 2 go together: we should allow full refresh (option 1), but the default should be to not full refresh (option 2). As for option 3, cold storage, I think this is a necessary addition, but consider disassociating it from the "full refresh" concept; maybe a new flag is warranted, something like… In summary: I'd like to see very standard full-refresh behavior, but dbt can take the liberty to make…
-
We have a common scenario where an incremental model combines data from multiple sources, each with its own latency: e.g. we get event data throughout the day, but data from a service only every (for example) 6 hours. In these cases we generally want our max time to be calculated as follows:
We would love this to be included as built-in functionality; otherwise incremental tables with left joins will have missing data and need to be rebuilt. Is this something you would consider?
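The calculation referenced in this comment didn't survive the export; the general idea, sketched here with made-up model and column names, is to take the least of each source's high-water mark so the join never outruns the slowest source:

select least(
    (select max(event_loaded_at) from {{ ref('stg_events') }}),
    (select max(service_loaded_at) from {{ ref('stg_service_data') }})
) as max_safe_event_time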
-
Small question about the microbatching: will a single dbt run process several microbatches when needed? In other words, will a single dbt build of the model update the table several times? That's not quite clear to me from the description.
-
How does this plan work with merge-style tables, where merge is a separate incremental_strategy in its own right?
-
Hey @graciegoheen, I love this idea! With this new microbatch strategy, will we be able to rerun a model for one or multiple batches within a specific period of event time, for backfilling purposes or for fixing data after a bug fix? Thanks
-
Hi @graciegoheen, this is great, and thanks @dataders for linking me to this from my discussion on incrementality here. Thanks so much for taking on this big topic!
Regarding the full refresh: from the experience of running with ibtp for a large number of our incremental models, I'd say it's really important to maintain a full refresh that looks and feels like an incremental full refresh. That is, one which you run, and after it completes you have a replacement table. Until the final transaction switch, your prod table shouldn't change (e.g. this option).
Why is this important? If you change some model in a fundamental way, you want to be able to rewrite it and all downstreams in one go. You shouldn't have to specify event time, which might vary between models. For us, we set the lookback period quite short for dev so that you can dev fast, which I think is the same as the 'sample' mode that you describe.
Consider supporting blue-green deployments: our main flaw at the moment is that the table starts by truncating, then filling row by row. Ideally this solution won't edit the prod table until the replacement has been fully built, just like incremental models. Given the size of those full refreshes, it would be nice to have a flag option to archive the old version of a full-refreshed table, rather than just delete it.
p.s. apologies this message is a bit rushed!
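A minimal illustration of the blue-green idea on Snowflake (table names are made up): build the replacement in full, then cut over atomically:

-- build the replacement table in full, without touching the prod table
create or replace table analytics.my_model__new as
select * from analytics.stg_events;  -- stand-in for the model's real transformation

-- only then swap it into place as a single metadata operation (Snowflake syntax)
alter table analytics.my_model swap with analytics.my_model__new;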
-
Will this incrementalize more complex dbt models that involve things like multiple joins and aggregations?
select
    c.id,
    c.name,
    count(se.customer_id) as num_events
from {{ ref('stg_events') }} se
join {{ ref('stg_customers') }} c on se.customer_id = c.id
group by c.id, c.name;
-
Great initiative!
-
Curious to hear more about your vision for how this plays out when running a DAG with at least one microbatch model dependent on another microbatch model. For example, let's take a simple daily pipeline with microbatch model A and microbatch model B downstream of it. A batch for each model is a day. If we were to backfill, say, a month, would dbt wait for all batches of A to complete before running B? Or would there be some smarter logic under the hood to, say, recognize that you can run a given day for model B once that same day has been processed for A? And if B is not incrementally built, dbt would know to wait for A to complete all its batches before starting B?
-
Nice to see some movement in terms of incremental refreshing - this is a very potent area for cost-savings. I'm wondering what would be a good candidate for an…
-
Hi, thanks for the great news—we'll test it on BigQuery. Regarding the "lookback window" for late-arriving events, have you considered leveraging CDC for microbatching, or adopting a new "microstreaming" strategy? With CDC-based processing, there's no need to hardcode the "lookback window": CDC inherently identifies late events, their event_time windows, and the specific partitions or clusters that need refreshing. Since CDC is widely available in platforms like Snowflake (Streams), BigQuery (CHANGES), and Databricks (CDF), and is a common industry pattern, adding support for such processing could be a valuable enhancement.
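As a rough sketch of that CDC idea using a Snowflake stream (object names are made up), the stream reports exactly which rows changed, so the affected event_time partitions can be derived instead of guessed via a lookback window:

create stream raw.events_changes on table raw.events;

-- on each run, read only what changed since the stream was last consumed,
-- and derive which daily partitions need to be reprocessed
select distinct date_trunc('day', event_occurred_at) as changed_day
from raw.events_changes;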
-
🏺Background
Does anyone really know what time it is?
Over the years, the number of time-related configs in dbt has grown.
There’s updated_at (for snapshots) and loaded_at_field (for sources) and agg_time_dimension (for semantic models). You can configure the physical layout of time-series data with partition_by on BigQuery + Databricks/Spark, cluster_by on Snowflake, or sortkey on Redshift. You can also fine-tune how dbt incrementally processes the arrival of new data by configuring incremental_strategy, which may require you to define a unique_key (which is often unique, but not always! and subtly different from unique_key for snapshots).
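To make the sprawl concrete, here is an illustrative (entirely made-up) incremental model on BigQuery that touches several of these configs at once:

{{ config(
    materialized = 'incremental',
    incremental_strategy = 'merge',
    unique_key = 'order_id',
    partition_by = {'field': 'ordered_at', 'data_type': 'timestamp', 'granularity': 'day'}
) }}

select order_id, ordered_at, amount
from {{ ref('stg_orders') }}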
😵💫 If that makes your head hurt… you’re not alone! @dbeatty10 keeps a collection of temporal terminology here.
We think a gap remains: dbt lacks awareness of the “event time” of a given model, the field that indicates at what time a record of data occurred. This gap is one we're keen to close — because we also believe that "event time" awareness will unlock a whole slew of features in dbt that can save you time & money.
🗺️ We’ve got big plans for big datasets
Imagine this: As a dbt user, I configure an event_time column for my large models that represent time-series datasets. This is different from metadata fields about when the data was loaded, transformed, or otherwise processed — it’s the semantically meaningful column that says, “At what time did this thing actually happen?” That thing could be an order, a survey response, a subway ride, anything. If it happened, it happened at a time, and you tell dbt which column represents that time.
In exchange for configuring event_time, I will unlock two new capabilities (the “microbatch” incremental strategy and “sample” mode, described below) …and more! Once this configuration exists, it can be leveraged in all sorts of places, including metadata integrations and quality extensions. For example, a few months ago, we announced “Advanced CI” that’s coming to dbt Cloud. If you’re only building 3 days of data in your CI environment, then dbt should know to apply the same filter on the other table too — ensuring an “apples-to-apples” comparison, and a much faster query.
❓Why only “time”?
We believe time-series datasets are by far the most common type of very large dataset for dbt users in the wild. We thought about abstracting this into something more general-purpose (such as shard identifier or segmentation_key), but we’d rather start with time because it’s semantically meaningful — it enables dbt to reason about “last 3 days of data,” and to know that a given time-slice (e.g. 2024-08-30) is referring to the same slice of data in every model that configures event_time.
⭐ [Now] “Microbatch” incremental models - coming in dbt Core v1.9
Incremental modeling in dbt today
Incremental models are, and have always been, a performance optimization — they seek to approximate the same behavior as the table materialization, just faster and more cheaply.
You start by materializing as a table. When that query gets too slow, you can optimize it:
- Switch the materialization to incremental.
- Add a filter (within an is_incremental() block) that uses the already-existing table (this) as a rough bookmark, so that only new data gets processed.
- Pick how the new data should be written into the existing table (append, delete+insert, or merge).
- If using delete+insert or merge, you can sort of control which “batch” is overridden by setting a unique_key (which isn’t actually unique - it’s the “upsert/merge key”).

As an example:
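The example model isn’t reproduced in this thread, but a minimal sketch of the kind of model being described might look like this (model and column names are illustrative, not the original example):

{{ config(
    materialized = 'incremental',
    incremental_strategy = 'delete+insert',
    unique_key = 'date_day'
) }}

select
    date_trunc('day', event_occurred_at) as date_day,
    count(*) as num_events
from {{ ref('stg_events') }}
{% if is_incremental() %}
-- only process days at or after the latest day already in the target table
where date_trunc('day', event_occurred_at) >= (select max(date_day) from {{ this }})
{% endif %}
group by 1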
For this incremental model:
- On incremental runs, dbt selects only rows with a date_day greater than the maximum date_day that has previously been loaded.
- For any reprocessed date_day, the existing data for that date_day is deleted and the new data is inserted in.

This pattern has gotten us pretty darn far! Every day, there are tens of millions of dbt incremental model refreshes. But there’s also a clear limitation with this approach: Some datasets are just too big to fit into one query. It’s impossible to imagine rebuilding the whole table in one fell swoop. In these cases, “incremental” isn’t a performance optimization — it’s the only way to roll.
This is something the community has known about for a long time. Way back in June 2018, Claire wrote an experimental “insert_by_period” materialization that is still used to this day. (For a long time, it only ran on Redshift; in recent months, our colleagues have also implemented it on Snowflake, BigQuery, and Databricks.) The big idea: Rather than processing all data for all time in a single query, the data should be batched up into “periods” (day, week, month) that are each processed in a separate query.
How to define those batches? How to template the appropriate filters into model SQL? As an experimental materialization, insert_by_period offered some creative answers. But what if the answer just felt like dbt? This isn’t a new idea; its time has finally come.
The future of incremental modeling in dbt
Epic: #10624
There are limitations of the current incremental model approach:
- The initial build (and any rebuild in full-refresh mode) is done in “one big” SQL statement. In theory, the data platform can optimize processing of distributed datasets; in practice, that query can time out, and when you retry that query you end up reprocessing the same batches.
- To reprocess a specific slice of history (e.g. “2024-09-01”), you need to add your own home-rolled logic, likely using vars.
In view of these problems, we are planning to implement a brand-new incremental strategy to handle this type of batched incremental processing out-of-the-box. Introducing: incremental_strategy='microbatch'.
Let’s take our same example from before, and instead use the new microbatch incremental strategy. This is the proposed spec, liable to change slightly as we implement:
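The spec snippet didn’t survive this export; as a hedged sketch only, using the config names discussed below (event_time, batch_size, lookback_window), all of which are liable to change:

{{ config(
    materialized = 'incremental',
    incremental_strategy = 'microbatch',
    event_time = 'date_day',
    batch_size = 'day',
    lookback_window = 3
) }}

select
    date_trunc('day', event_occurred_at) as date_day,
    count(*) as num_events
-- no is_incremental() block needed: dbt filters this ref to one batch at a time
from {{ ref('stg_events') }}
group by 1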
Where you’ve also set an event_time for the upstream model(s) - in this case stg_events:
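Again as a sketch rather than the final spec, the upstream model could declare its event_time in its own config block (the source name here is made up):

-- models/staging/stg_events.sql
{{ config(
    event_time = 'event_occurred_at'
) }}

select * from {{ source('app', 'raw_events') }}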
And that’s it! You write your model query for exactly one day of data. You don’t need to think about is_incremental filtering or DML (upserting/merging/replacing) - we take care of that for you:
- dbt templates the where clause(s) for you, using the event_time and batch_size configs from the incremental model and upstream models.
- During standard incremental runs, dbt will process new batches according to the configured lookback_window (with one query per batch).
- If you have configured an event_time for upstream models, we automatically apply a filter. (If there’s an upstream model that configures event_time, but you don’t want the reference to it to be filtered, you can specify ref('upstream_model').render() to opt out of auto-filtering.)
- To load a specific batch, you can pass your event time upper & lower bounds at run time using start_event_time and end_event_time - dbt run --event-time-start "2024-09-01 00:00:00" --event-time-end "2024-09-04 00:00:00".
- Should a run of a model using this materialization be interrupted, a subsequent run will continue building the target table from the batch where it was interrupted.
Question: What does “full refresh” mean for microbatch?
If a model is big enough to be microbatched, you probably don’t want to accidentally replace every batch just by typing -f! We are considering three options:
- Option 1: dbt run -s my_microbatch_model --full-refresh will rebuild all batches for all time — similar to classic incremental models, although each batch will have its own query, and those queries will run independently.
- Option 2: Put guardrails around the --full-refresh flag. Setting full_refresh = false (docs) should be best practice, or the built-in default.
- Option 3: Honor --full-refresh up to a point. If standard incremental runs reprocess 3 days of data, then full-refresh runs should reprocess 30 days (or a period you configure) — everything beyond that is in “cold storage.” It’s still possible to restate historical data beyond that point, but you need to enter “full control mode” by passing explicit start_event_time + end_event_time.
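For what it’s worth, the full_refresh model config already exists in dbt today, so a sketch of option 2’s guardrail might look like:

{{ config(
    materialized = 'incremental',
    incremental_strategy = 'microbatch',
    full_refresh = false
) }}
-- with full_refresh = false, `dbt run --full-refresh` will not rebuild this model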
🌀 [Next] “Sample” mode for dev & CI runs
In v1.8, we introduced dbt run --empty, which enables “dry runs” in most dbt projects without any code changes. Whenever dbt finds a {{ ref(...) }} in an “empty” run, it will template select * from {referenced_model} where false limit 0 so that the query completes without processing any actual data. This is also useful for unit tests, which introspect the structure of upstream objects in the data warehouse so that test fixtures don’t need to specify every single column.
We want to support a similar pattern, for faster development and CI testing against a time-limited sample of real data (first described in #8378). Instead of empty inputs, dbt would automatically filter down large tables to the last X days of data. How would it know which tables are large, and which columns to filter on? Why, event_time, of course!
Let’s say you have a query like:
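The example query itself was lost in this export; an illustrative stand-in, reusing the stg_events model and a made-up event_occurred_at column:

select
    date_trunc('day', event_occurred_at) as date_day,
    count(*) as num_events
from {{ ref('stg_events') }}
group by 1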
During standard runs, this will compile to:
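Continuing the illustrative example above, the compiled SQL would look roughly like this (the analytics schema name is made up):

select
    date_trunc('day', event_occurred_at) as date_day,
    count(*) as num_events
from analytics.stg_events
group by 1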
During sample runs, it will be possible to configure a single environment or run-level variable that instructs dbt to check each upstream model and see if they define
event_time
. Let’s say you’re working in development, and you want to build your models using 3 days of historical data by default. The exact syntax here is TBD, but imagine it looks like something likedbt run --sample 3
, producing a query like:You will be able to set the
sample
flag in CLI commands (to modify specific invocations), as an environment variable (to modify dev/CI as opposed to prod), and in your project file (to set a default behavior for everyone).A “sample” here means a consistent time-based “slice” across all models. Meaning, for a given run or environment, dbt is filtering down to the same number of days everywhere. We’re sure there are reasons to want more-configurable samples — a tenant/country ID instead of time-based, or different slices for different input models — but we also think this simplification can be powerful for user intuition sooner, and allow us to do nicer things later on. How far can we get with this simplifying assumption?
What is the overlap between “microbatch” and “sample”? It’s simple: Both depend on you configuring
event_time
so that dbt knows how to filter down your inputs. Once done, they can form a two-part harmony: If youdbt run -s my_microbatch_model --sample 3
(exact syntax TBD!!), then dbt will build that model for 3 days of data — and spin up a separate query for each of those day-batches.So what?
Some of you might be reading this and thinking, “I already do this with dbt, in [highly customized way].” That’s great! If it’s working for you, you don’t need to change a thing — we are committed to ongoing compatibility with all the code that’s written in existing dbt projects, including “classic” incremental models.
dbt is wonderfully flexible and extensible. At the same time, whenever we see dozens or hundreds of people modifying or extending dbt to accomplish the same basic goal — that’s a clue that there’s something missing from the dbt standard. If dbt Core can support time-aware filtering and batched incremental processing out of the box, we can spend less time solving the same problem in the same way (or slightly different ways) — not to mention, maintaining internal documentation for hand-rolled homegrown approaches — and more time solving the problems that are unique to our data. That’s what we mean by moving up the stack.