Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DUX-823 Prices v2 rework #7391

Merged
merged 54 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
2a979c2
initial v2 setup
0xRobin Dec 27, 2024
322068e
rename
0xRobin Dec 27, 2024
a517883
fix compile
0xRobin Dec 27, 2024
9468c58
syntax
0xRobin Dec 27, 2024
37668fb
fixxxeees
0xRobin Dec 27, 2024
4a07aeb
moar fixeez
0xRobin Dec 27, 2024
b992a88
moar fixeez
0xRobin Dec 27, 2024
7d1288c
fixezzz
0xRobin Dec 27, 2024
181c3cd
fixezzz
0xRobin Dec 27, 2024
2dec8a4
fix col ref in unique
0xRobin Dec 27, 2024
5d6c2a5
left anti join
0xRobin Dec 27, 2024
f3a7475
Merge branch 'main' into prices-v2-rework
0xRobin Dec 27, 2024
cb5eb24
Merge branch 'main' into prices-v2-rework
Hosuke Jan 2, 2025
be35560
add dex volume filter
0xRobin Jan 3, 2025
75a9ea2
add missing select
0xRobin Jan 3, 2025
fdf25cc
materialize end model
0xRobin Jan 3, 2025
1ca0312
add uniqueness tests
0xRobin Jan 3, 2025
c64c3a9
fix test
0xRobin Jan 3, 2025
7cb513c
store test failures
0xRobin Jan 3, 2025
d8fa7c2
fix anti join syntax
0xRobin Jan 3, 2025
c62d7f9
Merge branch 'main' into prices-v2-rework
0xRobin Jan 3, 2025
b227334
trigger CI
0xRobin Jan 3, 2025
568c288
test CI again
0xRobin Jan 3, 2025
d86f7e4
partition by day
0xRobin Jan 3, 2025
0570ca2
remove partition
0xRobin Jan 3, 2025
485dc1c
Merge branch 'main' into prices-v2-rework
0xRobin Jan 6, 2025
e023c9c
partition by date
0xRobin Jan 6, 2025
a3f0174
try out minute mean and compare
0xRobin Jan 7, 2025
5a0ccbf
table
0xRobin Jan 7, 2025
498c681
more comparisons
0xRobin Jan 7, 2025
0dbe248
use dex source
0xRobin Jan 7, 2025
8016156
fix syntax
0xRobin Jan 7, 2025
300c06a
syntax
0xRobin Jan 7, 2025
a4b36f6
remove temp testing models
0xRobin Jan 7, 2025
5b65c74
Merge branch 'main' into prices-v2-rework
0xRobin Jan 7, 2025
06296c9
syntax
0xRobin Jan 7, 2025
a4cdb00
filter out USDe
0xRobin Jan 8, 2025
a886e22
fix
0xRobin Jan 8, 2025
c73ebe1
syntax
0xRobin Jan 8, 2025
9385bfa
add day level prices
0xRobin Jan 8, 2025
c6f6cb5
syntax
0xRobin Jan 8, 2025
29cdb61
syntax
0xRobin Jan 8, 2025
8875b74
remove ref
0xRobin Jan 8, 2025
641ae1e
add date
0xRobin Jan 8, 2025
3051193
Merge branch 'main' into prices-v2-rework
0xRobin Jan 23, 2025
cadbc1e
build out daily prices table
0xRobin Jan 23, 2025
0b46a15
syntax
0xRobin Jan 23, 2025
6028d65
limit data to 30 days
0xRobin Jan 23, 2025
d556a8f
grouping
0xRobin Jan 23, 2025
ef8e398
cast sequence as timestamp
0xRobin Jan 23, 2025
06a26de
syntax
0xRobin Jan 23, 2025
cc7b498
do 100 days
0xRobin Jan 23, 2025
e634b0a
all the daysgit push!
0xRobin Jan 23, 2025
64fbb86
leave day for follow up PR
0xRobin Jan 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions dbt_subprojects/tokens/models/prices_v2/_schema.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
version: 2

# dbt schema file for the prices_v2 models.
# Every model listed here carries the same data test: the combination
# (blockchain, contract_address, timestamp) must be unique.
models:
  # Incremental model built from dex.trades, before the volume/manual filter is applied.
  - name: prices_v2_dex_minute_raw
    description: "sparse minute-level prices sourced from dex.trades, not filtered"
    meta:
      sector: prices
      contributors: 0xRob
    data_tests:
      - dbt_utils.unique_combination_of_columns:
          combination_of_columns:
            - blockchain
            - contract_address
            - timestamp

  # Coinpaprika prices joined to the trusted-token list.
  - name: prices_v2_coinpaprika_minute
    description: "sparse minute-level prices from coinpaprika (only trusted tokens)"
    meta:
      sector: prices
      contributors: 0xRob
    data_tests:
      - dbt_utils.unique_combination_of_columns:
          combination_of_columns:
            - blockchain
            - contract_address
            - timestamp

  # Minute-level prices combined across sources.
  - name: prices_v2_minute_sparse
    description: "sparse minute-level prices from all sources"
    meta:
      sector: prices
      contributors: 0xRob
    data_tests:
      - dbt_utils.unique_combination_of_columns:
          combination_of_columns:
            - blockchain
            - contract_address
            - timestamp

  # Day-level roll-up of the minute-level sparse prices.
  - name: prices_v2_day_sparse
    description: "sparse day-level prices from all sources"
    meta:
      sector: prices
      contributors: 0xRob
    data_tests:
      - dbt_utils.unique_combination_of_columns:
          combination_of_columns:
            - blockchain
            - contract_address
            - timestamp
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{{
    config(
        schema = 'prices_v2',
        alias = 'coinpaprika_minute',
        materialized = 'incremental',
        file_format = 'delta',
        partition_by = ['date'],
        incremental_strategy = 'merge',
        unique_key = ['blockchain', 'contract_address', 'timestamp'],
        incremental_predicates = [incremental_predicate('DBT_INTERNAL_DEST.timestamp')]
    )
}}

-- Minute-level coinpaprika prices, restricted to trusted tokens.
-- The inner join on token_id both filters to trusted tokens and supplies
-- the (blockchain, contract_address) pair each price is reported under.
select
    trusted.blockchain,
    trusted.contract_address,
    cp.minute as timestamp,
    cp.price,
    cast(null as double) as volume, -- no volume is taken from this source
    'coinpaprika' as source,
    date_trunc('day', cp.minute) as date -- partition
from {{ source('prices','usd_0003') }} as cp -- todo: fix this source
inner join {{ ref('prices_trusted_tokens') }} as trusted
    on cp.token_id = trusted.token_id
{% if is_incremental() %}
-- incremental runs only read the window selected by the incremental_predicate macro
where {{ incremental_predicate('cp.minute') }}
{% endif %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
{{ config(
    schema = 'prices_v2'
    , alias = 'dex_filter'
    , materialized = 'view'
    )
}}

-- Allow-list of (blockchain, contract_address) pairs whose dex-derived prices are kept.
-- A token qualifies when:
--   1. its total USD volume in dex.trades, counted on both the bought and the
--      sold side, is at least 10,000, and
--   2. it is not listed in the manual exclusion list below.
-- Output columns: blockchain, contract_address.

with dex_volume_over_10k as (
    select
        blockchain
        , contract_address
    from (
        -- volume where the token is on the bought side of the trade
        select
            d.blockchain
            , d.token_bought_address as contract_address
            , sum(d.amount_usd) as volume -- in USD
        from {{ source('dex','trades') }} as d
        group by d.blockchain, d.token_bought_address

        union all

        -- volume where the token is on the sold side of the trade
        select
            d.blockchain
            , d.token_sold_address as contract_address
            , sum(d.amount_usd) as volume -- in USD
        from {{ source('dex','trades') }} as d
        group by d.blockchain, d.token_sold_address
    )
    group by blockchain, contract_address
    having sum(volume) >= 10000
)

-- tokens excluded by hand, regardless of volume
, manual_filter as (
    select
        blockchain
        , contract_address
    from (
        values
        ('ethereum', 0x4c9EDD5852cd905f086C759E8383e09bff1E68B3) -- USDe has bad events (ex https://etherscan.io/tx/0x0c9464ff4fea893667a43e96e830073031f5587d8f3b33fb27a8464979f12897#eventlog#151)
    ) as t(blockchain, contract_address)
)

-- NOT EXISTS instead of a row-valued NOT IN: NOT IN evaluates to UNKNOWN when a
-- compared value is NULL, which silently drops rows; NOT EXISTS only removes rows
-- that actually match an entry of manual_filter.
select
    v.blockchain
    , v.contract_address
from dex_volume_over_10k as v
where not exists (
    select 1
    from manual_filter as mf
    where mf.blockchain = v.blockchain
        and mf.contract_address = v.contract_address
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{{ config(
    schema = 'prices_v2'
    , alias = 'dex_minute'
    , materialized = 'view'
    )
}}

-- Minute-level dex prices restricted to the tokens that pass prices_v2_dex_filter.
-- Columns are listed explicitly (instead of SELECT *) so the view's shape does not
-- silently change when an upstream model gains or reorders columns. The list and
-- its order match what SELECT * over the USING join returned: the two join keys
-- first, followed by the remaining columns of prices_v2_dex_minute_raw.
select
    r.blockchain
    , r.contract_address
    , r.timestamp
    , r.price
    , r.volume
    , r.source
    , r.date
from {{ ref('prices_v2_dex_minute_raw') }} as r
inner join {{ ref('prices_v2_dex_filter') }} as f
    on f.blockchain = r.blockchain
    and f.contract_address = r.contract_address
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
{{
    config(
        schema = 'prices_v2',
        alias = 'dex_minute_raw',
        materialized = 'incremental',
        file_format = 'delta',
        partition_by = ['date'],
        incremental_strategy = 'merge',
        unique_key = ['blockchain', 'contract_address', 'timestamp'],
        incremental_predicates = [incremental_predicate('DBT_INTERNAL_DEST.timestamp')]
    )
}}

-- Unfiltered minute-level prices derived from dex.trades.
-- A trade contributes a price for one of its tokens when the token on the other
-- side of the trade is a trusted token and the priced token itself is not.
-- The per-trade price is amount_usd divided by the token amount; trades are then
-- collapsed to one row per (blockchain, contract_address, minute).

with priced_trades as (
    -- price the bought token; the sold token must be trusted
    select
        trade.blockchain
        , trade.token_bought_address as contract_address
        , trade.block_time
        , trade.amount_usd / trade.token_bought_amount as price
        , trade.amount_usd as volume -- in USD
    from {{ source('dex','trades') }} as trade
    inner join {{ ref('prices_trusted_tokens') }} as counterpart
        on counterpart.blockchain = trade.blockchain
        and counterpart.contract_address = trade.token_sold_address
    -- anti join: skip tokens that are already in the trusted list
    left join {{ ref('prices_trusted_tokens') }} as own_entry
        on own_entry.blockchain = trade.blockchain
        and own_entry.contract_address = trade.token_bought_address
    where own_entry.contract_address is null
        and trade.amount_usd > 0
        and trade.token_bought_amount > 0 -- also guards the division above
        and trade.token_bought_address is not null
        {% if is_incremental() %}
        and {{ incremental_predicate('trade.block_time') }}
        {% endif %}

    union all

    -- price the sold token; the bought token must be trusted
    select
        trade.blockchain
        , trade.token_sold_address as contract_address
        , trade.block_time
        , trade.amount_usd / trade.token_sold_amount as price
        , trade.amount_usd as volume -- in USD
    from {{ source('dex','trades') }} as trade
    inner join {{ ref('prices_trusted_tokens') }} as counterpart
        on counterpart.blockchain = trade.blockchain
        and counterpart.contract_address = trade.token_bought_address
    -- anti join: skip tokens that are already in the trusted list
    left join {{ ref('prices_trusted_tokens') }} as own_entry
        on own_entry.blockchain = trade.blockchain
        and own_entry.contract_address = trade.token_sold_address
    where own_entry.contract_address is null
        and trade.amount_usd > 0
        and trade.token_sold_amount > 0 -- also guards the division above
        and trade.token_sold_address is not null
        {% if is_incremental() %}
        and {{ incremental_predicate('trade.block_time') }}
        {% endif %}
)

select
    blockchain
    , contract_address
    , date_trunc('minute', block_time) as timestamp
    , approx_percentile(price, 0.5) as price -- median
    , sum(volume) as volume
    , 'dex.trades' as source
    , date_trunc('day', block_time) as date -- partition
from priced_trades
group by
    blockchain
    , contract_address
    , date_trunc('minute', block_time)
    , date_trunc('day', block_time)
81 changes: 81 additions & 0 deletions dbt_subprojects/tokens/models/prices_v2/prices_v2_day.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
{{ config(
schema='prices_v2',
alias = 'day',
tags = ['prod_exclude'],
materialized = 'incremental',
file_format = 'delta',
partition_by = ['date'],
incremental_strategy = 'merge',
unique_key = ['blockchain', 'contract_address', 'timestamp'],
incremental_predicates = [incremental_predicate('DBT_INTERNAL_DEST.timestamp')]
)
}}

-- Day-level prices model. Only the config block above is active: the whole query
-- below is commented out, and the model is tagged 'prod_exclude'.
-- NOTE(review): presumably parked as a draft for a follow-up change; confirm before enabling.
-- NOTE(review): dbt renders Jinja before the SQL is parsed, so the ref() and
-- is_incremental() expressions inside the commented lines are still evaluated.
--
-- What the draft does, per its own comments:
--   1. sparse_prices: reads prices_v2_day_sparse (limited to the incremental window
--      when incremental) and, when incremental, also adds each token's last known
--      price from before the window so it can be forward filled; lead() gives the
--      timestamp of each row's next update.
--   2. timeseries: one row per day from the earliest sparse price up to today.
--   3. final select: joins every day to the sparse price that is valid on that day
--      (price timestamp <= day, and next update missing or later than the day).

--WITH sparse_prices as (
-- select
-- *
-- , lead(timestamp) over (partition by blockchain, contract_address order by timestamp asc) as next_update
-- from (
-- select
-- blockchain
-- , contract_address
-- , timestamp
-- , price
-- , volume
-- , source
-- , date -- this is redundant here as date = timestamp, but we keep it in to be consistent across intervals
-- , source_timestamp
-- from {{ ref('prices_v2_day_sparse') }}
-- {% if is_incremental() %}
-- where {{ incremental_predicate('timestamp') }}
-- {% endif %}
-- -- If we're running incremental, we also need to add the last known prices from before the incremental window, to forward fill them
-- {% if is_incremental() %}
-- UNION ALL
-- SELECT * FROM (
-- select
-- blockchain
-- , contract_address
-- , max(timestamp) -- we get the last updated price
-- , max_by(price,timestamp) as price
-- , max_by(volume,timestamp) as volume
-- , max_by(source,timestamp) as source
-- , max(date) as date
-- , max_by(source_timestamp,timestamp) as source_timestamp
-- from {{ ref('prices_v2_day_sparse') }}
-- where not {{ incremental_predicate('timestamp') }} -- not in the current incremental window (so before that)
-- group by blockchain, contract_address
-- )
-- {% endif %}
-- )
--)
--
---- construct the time spline we want to fill
--, timeseries as (
-- select timestamp
-- from unnest(
-- sequence(cast((select date_trunc('day', min(timestamp)) from sparse_prices) as timestamp)
-- , cast(date_trunc('day', now()) as timestamp)
-- , interval '1' day
-- )
-- ) as foo(timestamp)
-- {% if is_incremental() %}
-- where {{ incremental_predicate('timestamp') }} -- reduce to the incremental window if running incrementally
-- {% endif %}
--)
--
--SELECT
-- p.blockchain
-- , p.contract_address
-- , t.timestamp
-- , p.price
-- , p.volume
-- , p.source
-- , t.timestamp as date
-- , p.source_timestamp
--FROM timeseries t
--INNER JOIN sparse_prices p
-- on p.timestamp <= t.timestamp
-- and (p.next_update is null or p.next_update > t.timestamp)

23 changes: 23 additions & 0 deletions dbt_subprojects/tokens/models/prices_v2/prices_v2_latest.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{{
    config(
        schema = 'prices_v2'
        , alias = 'latest'
        , materialized = 'incremental'
        , file_format = 'delta'
        , incremental_strategy = 'merge'
        , unique_key = ['blockchain', 'contract_address']
    )
}}

-- Most recent known price per token: one row per (blockchain, contract_address).
-- max_by(x, timestamp) picks the value of x from the row with the greatest timestamp.
-- On incremental runs only the incremental window is read, and the merge on the
-- unique key overwrites the stored row for every token seen in that window.
select
    sparse.blockchain
    , sparse.contract_address
    , max(sparse.timestamp) as timestamp
    , max_by(sparse.price, sparse.timestamp) as price
    , max_by(sparse.volume, sparse.timestamp) as volume
    , max_by(sparse.source, sparse.timestamp) as source
from {{ ref('prices_v2_minute_sparse') }} as sparse
{% if is_incremental() %}
where {{ incremental_predicate('sparse.timestamp') }}
{% endif %}
group by
    sparse.blockchain
    , sparse.contract_address
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
{{
    config(
        schema = 'prices_v2'
        , alias = 'day_sparse'
        , materialized = 'incremental'
        , file_format = 'delta'
        , partition_by = ['date']
        , incremental_strategy = 'merge'
        , unique_key = ['blockchain', 'contract_address', 'timestamp']
        , incremental_predicates = [incremental_predicate('DBT_INTERNAL_DEST.timestamp')]
    )
}}

-- Day-level roll-up of the sparse minute prices: one row per token per day that
-- has at least one minute-level price.
--   price / source   : taken from the latest minute row of the day
--   volume           : summed over the day
--   source_timestamp : timestamp of the latest minute row of the day
select
    minute_prices.blockchain
    , minute_prices.contract_address
    , minute_prices.date as timestamp
    , max_by(minute_prices.price, minute_prices.timestamp) as price
    , sum(minute_prices.volume) as volume
    , max_by(minute_prices.source, minute_prices.timestamp) as source
    , minute_prices.date
    , max(minute_prices.timestamp) as source_timestamp
from {{ ref('prices_v2_minute_sparse') }} as minute_prices
{% if is_incremental() %}
-- filtering on date (not timestamp) makes sure whole days are always reprocessed
where {{ incremental_predicate('minute_prices.date') }}
{% endif %}
group by
    minute_prices.blockchain
    , minute_prices.contract_address
    , minute_prices.date
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
{{
    config(
        schema = 'prices_v2'
        , alias = 'hour_raw'
        , materialized = 'incremental'
        , file_format = 'delta'
        , partition_by = ['date']
        , incremental_strategy = 'merge'
        , unique_key = ['blockchain', 'contract_address', 'timestamp']
        , incremental_predicates = [incremental_predicate('DBT_INTERNAL_DEST.timestamp')]
    )
}}

-- Hour-level roll-up of the sparse minute prices: one row per token per hour that
-- has at least one minute-level price.
--   price / source   : taken from the latest minute row of the hour
--   volume           : summed over the hour
--   source_timestamp : timestamp of the latest minute row of the hour
select
    minute_prices.blockchain
    , minute_prices.contract_address
    , date_trunc('hour', minute_prices.timestamp) as timestamp
    , max_by(minute_prices.price, minute_prices.timestamp) as price
    , sum(minute_prices.volume) as volume
    , max_by(minute_prices.source, minute_prices.timestamp) as source
    , minute_prices.date
    , max(minute_prices.timestamp) as source_timestamp
from {{ ref('prices_v2_minute_sparse') }} as minute_prices
{% if is_incremental() %}
-- filtering on the truncated hour makes sure we always process full hours
where {{ incremental_predicate("date_trunc('hour',minute_prices.timestamp)") }}
{% endif %}
group by
    minute_prices.blockchain
    , minute_prices.contract_address
    , date_trunc('hour', minute_prices.timestamp)
    , minute_prices.date
Loading
Loading