Skip to content

Commit

Permalink
init from investments
Browse files Browse the repository at this point in the history
  • Loading branch information
Sergey Kutsenko committed Oct 19, 2021
1 parent 6eafbf3 commit ae4668f
Show file tree
Hide file tree
Showing 23 changed files with 3,350 additions and 0 deletions.
2,245 changes: 2,245 additions & 0 deletions Cargo.lock

Large diffs are not rendered by default.

55 changes: 55 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
[package]
name = "balances-hystory"
version = "0.0.1"
authors = ["Sergey Kutsenko <[email protected]>"]
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
serde = { version = "1.0.125", features = ["derive"] }
serde_json = "1.0.64"
serde_qs = { version = "0.8", features = ["warp"] }
serde_with = "1.9.3"
chrono = { version = "0.4.19", features = ["serde"] }
envy = "0.4"
tonic = "0.5"
prost = { version = "0.8", features = ["no-recursion-limit"] }
tokio = { version = "1.5.0", features = ["full"] }
tokio-postgres = { version = "0.7.2", features = ["with-chrono-0_4", "with-serde_json-1"]}
postgres-derive = { version = "0.4.0"}
postgres-types = { version = "0.2.1"}
async-trait = { version = "0.1.50"}
waves-protobuf-schemas = { git = "https://github.com/wavesplatform/protobuf-schemas", rev="dc1597077cbf55aac4cd514f5510ba346c45b361"}
diesel = { version= "1.4.6", features =["postgres"] }
diesel_migrations = "1.4.0"
#bigdecimal = { version = "0.2.0", features = ["serde"] }
rust_decimal = {version ="1.14.3", features = ["serde", "db-tokio-postgres", "db-postgres"]}
wavesexchange_log = { git = "https://github.com/waves-exchange/wavesexchange-rs", tag = "wavesexchange_log/0.2.0" }
wavesexchange_warp = { git = "https://github.com/waves-exchange/wavesexchange-rs", tag = "wavesexchange_warp/0.10.0" }
futures = "0.3"
anyhow = "1.0.40"
warp = "0.3"
bs58 = "0.4.0"
base64 = "0.13.0"
once_cell = "1.7.2"
blake2 = "0.8"
sha3 = "0.8"
bytes = "1.0.1"
lazy_static = "1.4.0"
thiserror = "1.0.25"
fxhash = "0.2"
itertools = "0.10.0"
bb8-postgres="0.7.0"

[lib]
name = "lib"
path = "src/lib/lib.rs"

[[bin]]
name = "consumer"
path = "src/bin/consumer.rs"

[[bin]]
name = "migration"
path = "src/bin/migration.rs"
5 changes: 5 additions & 0 deletions diesel.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# For documentation on how to configure this file,
# see diesel.rs/guides/configuring-diesel-cli

[print_schema]
file = "src/schema.rs"
6 changes: 6 additions & 0 deletions migrations/00000000000000_diesel_initial_setup/down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- This file was automatically created by Diesel to setup helper functions
-- and other internal bookkeeping. This file is safe to edit, any future
-- changes will be added to existing projects as new migrations.

-- Remove the `updated_at` helper functions installed by the initial setup migration.
DROP FUNCTION IF EXISTS diesel_manage_updated_at(_tbl regclass);
DROP FUNCTION IF EXISTS diesel_set_updated_at();
36 changes: 36 additions & 0 deletions migrations/00000000000000_diesel_initial_setup/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
-- This file was automatically created by Diesel to setup helper functions
-- and other internal bookkeeping. This file is safe to edit, any future
-- changes will be added to existing projects as new migrations.




-- Sets up a trigger for the given table to automatically set a column called
-- `updated_at` whenever the row is modified (unless `updated_at` was included
-- in the modified columns)
--
-- # Example
--
-- ```sql
-- CREATE TABLE users (id SERIAL PRIMARY KEY, updated_at TIMESTAMP NOT NULL DEFAULT NOW());
--
-- SELECT diesel_manage_updated_at('users');
-- ```
CREATE OR REPLACE FUNCTION diesel_manage_updated_at(_tbl regclass) RETURNS VOID AS $$
BEGIN
EXECUTE format('CREATE TRIGGER set_updated_at BEFORE UPDATE ON %s
FOR EACH ROW EXECUTE PROCEDURE diesel_set_updated_at()', _tbl);
END;
$$ LANGUAGE plpgsql;

-- Trigger function: bumps `updated_at` to the current timestamp on any row
-- change, unless the statement itself already changed `updated_at` (so manual
-- timestamps are preserved).
CREATE OR REPLACE FUNCTION diesel_set_updated_at() RETURNS trigger AS $$
BEGIN
IF (
NEW IS DISTINCT FROM OLD AND
NEW.updated_at IS NOT DISTINCT FROM OLD.updated_at
) THEN
NEW.updated_at := current_timestamp;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
4 changes: 4 additions & 0 deletions migrations/2021-10-19-121919_init_tables/down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- Revert the 2021-10-19 init migration: drop the history tables and the two
-- helper functions created by up.sql.
drop table balance_history;
drop table blocks_microblocks;
drop function insert_or_prolong_balance;
drop function rollback_balances_to_block_uid;
126 changes: 126 additions & 0 deletions migrations/2021-10-19-121919_init_tables/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
-- btree_gist is required by the exclusion constraint below, which mixes text
-- equality with range overlap in one GiST index.
create extension IF NOT EXISTS btree_gist;

-- One row per block or microblock in arrival order. `uid` is a monotonically
-- increasing surrogate key; balance_history periods are ranges over this uid.
CREATE TABLE IF NOT EXISTS blocks_microblocks (
uid BIGINT GENERATED BY DEFAULT AS IDENTITY CONSTRAINT blocks_microblocks_uid_pkey PRIMARY KEY,
id TEXT NOT NULL,
height INTEGER NOT NULL,
time_stamp BIGINT NOT NULL,
is_solidified bool not null default false
);

CREATE INDEX IF NOT EXISTS blocks_microblocks_id_idx ON blocks_microblocks (id);
CREATE INDEX IF NOT EXISTS blocks_microblocks_time_stamp_uid_idx ON blocks_microblocks (time_stamp DESC, uid DESC);
-- Partial index: the consumer only ever scans for not-yet-solidified rows.
CREATE INDEX IF NOT EXISTS blocks_microblocks_is_solidified_idx ON blocks_microblocks(is_solidified) where is_solidified = false;

-- Interval-encoded balance history: each row holds the balance of
-- (recipient, asset_id) over a half-open `period` of blocks_microblocks.uid
-- values. The currently-active balance has upper(period) = null.
-- "C" collation gives byte-wise comparison for the text keys.
create table balance_history (
balance_history_uid BIGINT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY CONSTRAINT balance_history_uid_key UNIQUE,
recipient TEXT COLLATE "C" NOT NULL,
asset_id TEXT COLLATE "C" NOT NULL DEFAULT '',
balance bigint NOT NULL,
period int8range -- must be as range(blocks_microblocks.uid, blocks_microblocks.uid)
-- EXCLUDE USING gist (recipient with =, asset_id with =, period WITH &&)
-- , PRIMARY KEY (recipient, asset_id, period) INCLUDE (balance)
);

create index on balance_history(asset_id) include(recipient) where upper(period) is null;
create index on balance_history (recipient, asset_id, lower(period));
create index on balance_history (lower(period));
create index on balance_history (upper(period), balance_history_uid);

-- Guarantees no two intervals for the same (recipient, asset) overlap.
-- NOTE(review): "buggy on big data" — presumably the GiST index gets slow or
-- bloats at volume; confirm before relying on it in production.
-- buggy on big data
ALTER TABLE ONLY balance_history ADD CONSTRAINT balance_history_recipient_asset_id_period_excl
EXCLUDE
USING gist (recipient WITH =, asset_id WITH =, period WITH &&);

-- Records a balance change for (in_recipient, in_asset_id) at block `block_uid`
-- and returns the affected balance_history_uid:
--   * if an interval covering [block_uid, inf) exists, it is closed at
--     block_uid and its balance finalized, then a new open interval is added;
--   * if that interval already starts at block_uid (second change inside the
--     same block), its balance is overwritten in place;
--   * otherwise a fresh open interval [block_uid, inf) is inserted.
-- NOTE(review): `in_balance_prev` is only used by the commented-out sanity
-- check below — confirm whether the check should be re-enabled or the
-- parameter dropped.
create or replace function insert_or_prolong_balance(in_recipient text, in_asset_id text, block_uid bigint, in_balance bigint, in_balance_prev bigint) returns bigint
language plpgsql
as $$
DECLARE
v_bh_uid bigint := NULL;
v_period int8range;
v_balance_prev bigint;
BEGIN
-- search for interval with upper(period) = infinity
-- (FOR UPDATE locks the row so concurrent consumers serialize per key)

select balance_history_uid, period, balance into v_bh_uid, v_period, v_balance_prev
from balance_history
where asset_id = in_asset_id
and recipient = in_recipient
and period && int8range(block_uid, null, '[)')
for update;

RAISE NOTICE 'balance_history_uid:%; period:%;', v_bh_uid, v_period;

IF v_bh_uid IS NOT NULL THEN
BEGIN
-- IF v_balance_prev != in_balance_prev THEN
-- RAISE EXCEPTION 'prev balance do not match previous balance_history_uid:%', v_bh_uid;
-- END IF;

-- close interval with upper(period) = block_uid only if it was open earlier (and lower(period) < block_uid)
update balance_history set
period = int8range(lower(period), block_uid, '[)'),
balance = in_balance
where
balance_history_uid = v_bh_uid
and lower(period) < block_uid;

IF NOT FOUND THEN
BEGIN
-- possibly the balance was already updated once within this same block:
-- try a plain balance update on the row whose lower(period) equals this
-- block_uid, and verify that exactly that row was updated

update balance_history set
balance = in_balance
where balance_history_uid = v_bh_uid
and lower(period) = block_uid;

IF NOT FOUND THEN
RAISE EXCEPTION 'balance_history_uid:%; period:%; found later period for block:%;', v_bh_uid, v_period, block_uid;
END IF;

return v_bh_uid;
END;
END IF;

END;
END IF;

-- no overlapping interval existed: open a brand-new one from this block onward
insert into balance_history(recipient, asset_id, balance, period)
values(in_recipient, in_asset_id, in_balance, int8range(block_uid, null, '[)'))
returning balance_history_uid into v_bh_uid;

return v_bh_uid;
END;
$$;

-- Rolls balance history back to block uid `in_block_uid`: deletes every
-- interval that started after that block, then for each affected
-- (recipient, asset_id) re-opens the newest surviving closed interval so it
-- becomes the active balance again. Always returns true.
create or replace function rollback_balances_to_block_uid(in_block_uid bigint) returns bool
LANGUAGE plpgsql VOLATILE
AS $$
DECLARE
del_vals RECORD;
v_balance_uid BIGINT;
BEGIN
FOR del_vals IN
with to_del as (
delete from balance_history where lower(period) > in_block_uid returning recipient, asset_id
) select distinct recipient, asset_id from to_del
LOOP
-- NOTE(review): `balance_history_uid > in_block_uid` compares a history-row
-- id against a block uid — these come from different identity sequences, so
-- this filter looks wrong; it was probably meant to be a condition on the
-- period bounds (e.g. upper(period) > in_block_uid). Confirm with the consumer.
select max(balance_history_uid)
into v_balance_uid
from balance_history
where recipient = del_vals.recipient
and asset_id = del_vals.asset_id
and balance_history_uid > in_block_uid
and not upper(period) is null;

IF v_balance_uid is NOT NULL THEN
-- drop the upper bound: the interval becomes the open/active balance again
update balance_history set period = int8range(lower(period), null) where balance_history_uid = v_balance_uid;
END IF;

END LOOP;

return true;

END;
$$;
31 changes: 31 additions & 0 deletions src/bin/consumer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use anyhow::Result;
use lib::consumer;
use lib::consumer::mappers::blocks_microblocks;
use lib::db::*;
use consumer::SETTINGS;

/// Entry point of the blockchain-updates consumer binary.
///
/// Connects to Postgres, clears any non-solidified (microblock) rows left
/// over from a previous run, then starts consuming updates from the greater
/// of (last persisted height + 1) and the configured start height.
#[tokio::main]
async fn main() -> Result<()> {
    // Connecting is a hard prerequisite; fail loudly with context instead of
    // a bare unwrap panic.
    let mut db = Db::new(&SETTINGS.config.postgres)
        .await
        .expect("failed to connect to postgres");

    init_db_data(&db).await;

    // Resume right after the last fully processed height, but never start
    // below the configured blockchain_start_height.
    let start_height = match blocks_microblocks::get_last_height(&db).await {
        None => SETTINGS.config.blockchain_start_height,
        Some(last_h) => std::cmp::max(last_h + 1, SETTINGS.config.blockchain_start_height),
    };

    consumer::run(
        &mut db,
        SETTINGS.config.blockchain_updates_url.clone(),
        start_height,
    )
    .await
}

/// Deletes blocks/microblocks that were never solidified, so the consumer
/// restarts from a clean key-block state instead of a stale micro-fork.
// NOTE(review): panics on query failure — tolerable at startup, but consider
// returning a Result and letting `main` decide.
async fn init_db_data(db: &Db) {
db.client.query("delete from blocks_microblocks where is_solidified = false", &[]).await.unwrap();
}
58 changes: 58 additions & 0 deletions src/bin/migration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#[macro_use]
extern crate diesel;

use diesel::{pg::PgConnection, Connection};
use diesel_migrations::{
find_migrations_directory, revert_latest_migration_in_directory,
run_pending_migrations_in_directory,
};
use lib::config::migration as migration_config;
use std::{convert::TryInto, env};

/// Migration direction, parsed from the first command-line argument.
#[derive(Debug)]
enum Action {
    Up,
    Down,
}

/// Simple string error used for CLI-argument parsing failures.
#[derive(Debug)]
struct Error(String);

// Implementing `TryFrom` (rather than `TryInto` directly) is the idiomatic
// direction: the standard library's blanket impl then provides
// `String: TryInto<Action>` for free, so the existing `try_into()` call site
// in `main` keeps working unchanged.
impl std::convert::TryFrom<String> for Action {
    type Error = Error;

    /// Accepts exactly "up" or "down"; anything else is a parse error.
    fn try_from(value: String) -> Result<Action, Self::Error> {
        match value.as_str() {
            "up" => Ok(Action::Up),
            "down" => Ok(Action::Down),
            _ => Err(Error("cannot parse command line arg".into())),
        }
    }
}

/// CLI entry point: `migration up` applies all pending migrations,
/// `migration down` reverts the most recent one.
fn main() {
    // First positional argument selects the direction; fail with a usage
    // message instead of a bare unwrap panic.
    let action: Action = env::args()
        .nth(1)
        .expect("usage: migration <up|down>")
        .try_into()
        .expect("unknown action, expected `up` or `down`");

    let config = migration_config::load().expect("failed to load migration config from env");

    // NOTE(review): special characters in the password are not percent-encoded,
    // so a password containing e.g. `@` or `/` will produce a broken URL.
    let db_url = format!(
        "postgres://{}:{}@{}:{}/{}",
        config.postgres.user,
        config.postgres.password,
        config.postgres.host,
        config.postgres.port,
        config.postgres.database
    );

    let conn = PgConnection::establish(&db_url).expect("failed to connect to postgres");
    let dir = find_migrations_directory().expect("migrations directory not found");
    let path = dir.as_path();

    match action {
        Action::Up => {
            run_pending_migrations_in_directory(&conn, path, &mut std::io::stdout())
                .expect("failed to run pending migrations");
        }
        Action::Down => {
            revert_latest_migration_in_directory(&conn, path)
                .expect("failed to revert latest migration");
        }
    };
}
41 changes: 41 additions & 0 deletions src/lib/config/consumer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use crate::config::postgres::PostgresConfig;
use anyhow::Result;
use serde::Deserialize;


/// Flat environment-variable view of the consumer configuration, deserialized
/// by `envy` (each field name matches the lowercased env var, e.g. PGHOST).
#[derive(Deserialize, Debug, Clone)]
struct ConfigFlat {
pub pghost: String,
pub pgport: u16,
pub pgdatabase: String,
pub pguser: String,
pub pgpassword: String,
// #[serde(default = "default_pgpool")]
// pub pgpoolsize: u32,
// gRPC endpoint of the blockchain-updates service.
pub blockchain_updates_url: String,
// Height to start consuming from when the database is empty.
pub blockchain_start_height: i32,
}

/// Structured consumer configuration, assembled from `ConfigFlat` by `load()`.
#[derive(Debug, Clone)]
pub struct Config {
pub blockchain_updates_url: String,
pub blockchain_start_height: i32,
pub postgres: PostgresConfig,
}

/// Reads the consumer configuration from environment variables and assembles
/// the structured [`Config`]. Fails if any required variable is missing or
/// cannot be parsed.
pub fn load() -> Result<Config> {
    let flat = envy::from_env::<ConfigFlat>()?;

    // Gather the PG* variables into the nested postgres section first.
    let postgres = PostgresConfig {
        host: flat.pghost,
        port: flat.pgport,
        database: flat.pgdatabase,
        user: flat.pguser,
        password: flat.pgpassword,
        pool_size: 1,
    };

    Ok(Config {
        blockchain_updates_url: flat.blockchain_updates_url,
        blockchain_start_height: flat.blockchain_start_height,
        postgres,
    })
}
Loading

0 comments on commit ae4668f

Please sign in to comment.