Skip to content

Commit

Permalink
Merge pull request #361 from systemaccounting/360-insert-transaction
Browse files Browse the repository at this point in the history
360 insert transaction postgres function
  • Loading branch information
mxfactorial committed May 4, 2024
2 parents e8152f1 + 91f66c7 commit b4a0013
Show file tree
Hide file tree
Showing 32 changed files with 631 additions and 422 deletions.
130 changes: 76 additions & 54 deletions crates/pg/src/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ pub trait ModelTrait {
&self,
transaction_id: i32,
) -> Result<Transaction, Box<dyn Error>>;
async fn insert_transaction_tx_query(
&mut self,
async fn insert_transaction_query(
&self,
transaction: Transaction,
) -> Result<String, Box<dyn Error>>;
async fn select_last_n_transactions_query(
Expand Down Expand Up @@ -441,106 +441,128 @@ impl ModelTrait for DatabaseConnection {
}
}

async fn insert_transaction_tx_query(
&mut self,
async fn insert_transaction_query(
&self,
transaction: Transaction,
) -> Result<String, Box<dyn Error>> {
let table = crate::sqls::transaction::TransactionTable::new();
let sql = table
.insert_transaction_cte_sql(transaction.clone())
.unwrap();
let approval_lengths = transaction.transaction_items.list_lengths_of_approvals();
let sql = table.fn_select_insert_transaction_sql(approval_lengths);

// convert rust transaction values to postgres values
let transaction_rule_instance_id = parse_pg_int4(None::<String>).unwrap();
let transaction_author_device_latlng = parse_pg_point(None::<String>).unwrap();
let transaction_rule_instance_id = parse_pg_int4(transaction.rule_instance_id).unwrap();
let transaction_author_device_latlng =
parse_pg_point(transaction.author_device_latlng).unwrap();
let transaction_equilibrium_time =
parse_pg_timestamp(transaction.equilibrium_time).unwrap();
let transaction_sum_value = parse_pg_numeric(Some(transaction.sum_value)).unwrap();
let transaction_created_at = parse_pg_timestamp(None::<TZTime>).unwrap();

let mut values: ToSqlVec = vec![
Box::new(transaction_rule_instance_id),
Box::new(transaction.author),
Box::new(None::<String>), // author_device_id
Box::new(transaction_author_device_latlng),
Box::new(transaction.author_role),
Box::new(transaction_equilibrium_time),
Box::new(transaction_sum_value),
Box::new(None::<i32>), // id
Box::new(transaction_rule_instance_id), // rule_instance_id
Box::new(transaction.author), // author
Box::new(transaction.author_device_id), // author_device_id
Box::new(transaction_author_device_latlng), // author_device_latlng
Box::new(transaction.author_role), // author_role
Box::new(transaction_equilibrium_time), // equilibrium_time
Box::new(transaction_sum_value), // sum_value
Box::new(transaction_created_at), // created_at
];

for tr_item in transaction.transaction_items.into_iter() {
// transaction_id inserted by CTE auxiliary statement
values.push(Box::new(tr_item.item_id));
// add values expected by postgres row constructor

values.push(Box::new(None::<i32>)); // id

values.push(Box::new(None::<i32>)); // transaction_id

values.push(Box::new(tr_item.item_id)); // item_id

let tr_item_price = parse_pg_numeric(Some(tr_item.price)).unwrap();
values.push(Box::new(tr_item_price));
values.push(Box::new(tr_item_price)); // price

let tr_item_quantity = parse_pg_numeric(Some(tr_item.quantity)).unwrap();
values.push(Box::new(tr_item_quantity));
values.push(Box::new(tr_item_quantity)); // quantity

values.push(Box::new(tr_item.debitor_first));
values.push(Box::new(tr_item.debitor_first)); // debitor_first

let tr_item_rule_instance_id = parse_pg_int4(tr_item.rule_instance_id).unwrap();
values.push(Box::new(tr_item_rule_instance_id));
values.push(Box::new(tr_item_rule_instance_id)); // rule_instance_id

values.push(Box::new(tr_item.rule_exec_ids)); // rule_exec_ids

values.push(Box::new(tr_item.rule_exec_ids));
values.push(Box::new(tr_item.unit_of_measurement));
values.push(Box::new(tr_item.unit_of_measurement)); // unit_of_measurement

let tr_item_units_measured = parse_pg_numeric(tr_item.units_measured).unwrap();
values.push(Box::new(tr_item_units_measured));
values.push(Box::new(tr_item_units_measured)); // units_measured

values.push(Box::new(tr_item.debitor));
values.push(Box::new(tr_item.creditor));
values.push(Box::new(tr_item.debitor)); // debitor
values.push(Box::new(tr_item.creditor)); // creditor

let debitor_profile_id = parse_pg_int4(tr_item.debitor_profile_id).unwrap();
values.push(Box::new(debitor_profile_id));
values.push(Box::new(debitor_profile_id)); // debitor_profile_id

let creditor_profile_id = parse_pg_int4(tr_item.creditor_profile_id).unwrap();
values.push(Box::new(creditor_profile_id));
values.push(Box::new(creditor_profile_id)); // creditor_profile_id

let tr_item_debitor_approval_time =
parse_pg_timestamp(tr_item.debitor_approval_time).unwrap();
values.push(Box::new(tr_item_debitor_approval_time));
values.push(Box::new(tr_item_debitor_approval_time)); // debitor_approval_time

let tr_item_creditor_approval_time =
parse_pg_timestamp(tr_item.creditor_approval_time).unwrap();
values.push(Box::new(tr_item_creditor_approval_time));
values.push(Box::new(tr_item_creditor_approval_time)); // creditor_approval_time

let tr_item_debitor_expiration_time =
parse_pg_timestamp(tr_item.debitor_expiration_time).unwrap();
values.push(Box::new(tr_item_debitor_expiration_time));
values.push(Box::new(tr_item_debitor_expiration_time)); // debitor_expiration_time

let tr_item_creditor_expiration_time =
parse_pg_timestamp(tr_item.creditor_expiration_time).unwrap();
values.push(Box::new(tr_item_creditor_expiration_time));
values.push(Box::new(tr_item_creditor_expiration_time)); // creditor_expiration_time

let tr_item_creditor_rejection_time =
parse_pg_timestamp(tr_item.creditor_rejection_time).unwrap();
values.push(Box::new(tr_item_creditor_rejection_time)); // creditor_rejection_time

let tr_item_debitor_rejection_time =
parse_pg_timestamp(tr_item.debitor_rejection_time).unwrap();
values.push(Box::new(tr_item_debitor_rejection_time)); // debitor_rejection_time

for approval in tr_item.approvals.unwrap().into_iter() {
values.push(Box::new(None::<i32>)); // id

let approval_rule_instance_id = parse_pg_int4(approval.rule_instance_id).unwrap();
values.push(Box::new(approval_rule_instance_id));
values.push(Box::new(approval_rule_instance_id)); // rule_instance_id

// transaction_id inserted by CTE auxiliary statement
// transaction_item_id inserted by CTE auxiliary statement
values.push(Box::new(approval.account_name));
values.push(Box::new(approval.account_role));
values.push(Box::new(approval.device_id));
values.push(Box::new(None::<i32>)); // transaction_id
values.push(Box::new(None::<i32>)); // transaction_item.id
values.push(Box::new(approval.account_name)); // account_name
values.push(Box::new(approval.account_role)); // account_role
values.push(Box::new(approval.device_id)); // device_id

let approval_device_latlng = parse_pg_point(approval.device_latlng).unwrap();
values.push(Box::new(approval_device_latlng));
values.push(Box::new(approval_device_latlng)); // device_latlng

let approval_approval_time = parse_pg_timestamp(approval.approval_time).unwrap();
values.push(Box::new(approval_approval_time));
values.push(Box::new(approval_approval_time)); // approval_time

let approval_rejection_time = parse_pg_timestamp(approval.rejection_time).unwrap();
values.push(Box::new(approval_rejection_time)); // rejection_time

let approval_expiration_time =
parse_pg_timestamp(approval.expiration_time).unwrap();
values.push(Box::new(approval_expiration_time));
values.push(Box::new(approval_expiration_time)); // expiration_time
}
}

let rows = self.tx(sql.to_string(), values).await;
let rows = self.query(sql.to_string(), values).await;
match rows {
Err(e) => Err(Box::new(e)),
Ok(rows) => {
let row = &rows[0];
let transaction_id = row.get(0);
let id: i32 = row.get(0);
let transaction_id = id.to_string();
Ok(transaction_id)
}
}
Expand Down Expand Up @@ -1314,11 +1336,11 @@ mod integration_tests {

#[cfg_attr(not(feature = "db_tests"), ignore)]
#[tokio::test]
async fn it_creates_an_insert_transaction_tx_query() {
async fn it_creates_an_insert_transaction_query() {
_before_each();

let test_conn = _get_conn().await;
let mut api_conn = DatabaseConnection(test_conn);
let api_conn = DatabaseConnection(test_conn);

let input_file = File::open("../../tests/testdata/requests.json").unwrap();
let input_reader = BufReader::new(input_file);
Expand All @@ -1327,7 +1349,7 @@ mod integration_tests {
let test_transaction = test_transactions[0].transaction.clone();

let transaction_id = api_conn
.insert_transaction_tx_query(test_transaction.clone())
.insert_transaction_query(test_transaction.clone())
.await
.unwrap();

Expand All @@ -1340,7 +1362,7 @@ mod integration_tests {
_before_each();

let test_conn = _get_conn().await;
let mut api_conn = DatabaseConnection(test_conn);
let api_conn = DatabaseConnection(test_conn);

let input_file = File::open("../../tests/testdata/requests.json").unwrap();
let input_reader = BufReader::new(input_file);
Expand All @@ -1352,7 +1374,7 @@ mod integration_tests {
// start from 3 to avoid the first two transactions inserted by db migration
for i in 3..=6 {
api_conn
.insert_transaction_tx_query(test_transaction.clone())
.insert_transaction_query(test_transaction.clone())
.await
.unwrap();

Expand Down Expand Up @@ -1381,7 +1403,7 @@ mod integration_tests {
_before_each();

let test_conn = _get_conn().await;
let mut api_conn = DatabaseConnection(test_conn);
let api_conn = DatabaseConnection(test_conn);

let input_file = File::open("../../tests/testdata/requests.json").unwrap();
let input_reader = BufReader::new(input_file);
Expand All @@ -1392,7 +1414,7 @@ mod integration_tests {

for _ in 0..=2 {
api_conn
.insert_transaction_tx_query(test_transaction.clone())
.insert_transaction_query(test_transaction.clone())
.await
.unwrap();
}
Expand All @@ -1412,7 +1434,7 @@ mod integration_tests {
_before_each();

let test_conn = _get_conn().await;
let mut api_conn = DatabaseConnection(test_conn);
let api_conn = DatabaseConnection(test_conn);

let input_file = File::open("../../tests/testdata/requests.json").unwrap();
let input_reader = BufReader::new(input_file);
Expand All @@ -1422,7 +1444,7 @@ mod integration_tests {

for _ in 0..=2 {
api_conn
.insert_transaction_tx_query(test_transaction.clone())
.insert_transaction_query(test_transaction.clone())
.await
.unwrap();
}
Expand Down
12 changes: 0 additions & 12 deletions crates/pg/src/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,6 @@ impl DatabaseConnection {
self.0.execute(sql_stmt.as_str(), &unboxed_values).await
}

pub async fn tx(
&mut self,
sql_stmt: String,
values: ToSqlVec,
) -> Result<Vec<Row>, tokio_postgres::Error> {
let unboxed_values = DatabaseConnection::unbox_values(&values);
let tx = self.0.transaction().await.unwrap();
let rows = tx.query(sql_stmt.as_str(), &unboxed_values).await.unwrap();
tx.commit().await.unwrap();
Ok(rows)
}

fn unbox_values(values: &[Box<(dyn ToSql + Sync + Send)>]) -> Vec<&(dyn ToSql + Sync)> {
values
.iter()
Expand Down
1 change: 0 additions & 1 deletion crates/pg/src/sqls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ pub mod account;
pub mod approval;
pub mod balance;
pub mod common;
pub mod cte;
pub mod profile;
pub mod rule_instance;
pub mod transaction;
Expand Down
4 changes: 4 additions & 0 deletions crates/pg/src/sqls/account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ impl TableTrait for AccountOwnerTable {
fn name(&self) -> &str {
self.inner.name
}

fn column_count(&self) -> usize {
self.inner.len()
}
}

impl AccountOwnerTable {
Expand Down
23 changes: 22 additions & 1 deletion crates/pg/src/sqls/approval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ impl TableTrait for ApprovalTable {
fn name(&self) -> &str {
self.inner.name
}

fn column_count(&self) -> usize {
self.inner.len()
}
}

impl ApprovalTable {
Expand All @@ -44,6 +48,23 @@ impl ApprovalTable {
])
}

// fn_ postgres function
pub fn fn_insert_columns_with_casting(&self) -> Columns {
Columns(vec![
self.get_column("id"),
self.get_column("rule_instance_id"),
self.get_column("transaction_id"),
self.get_column("transaction_item_id"),
self.get_column("account_name"),
self.get_column("account_role"),
self.get_column("device_id"),
self.get_column("device_latlng").cast_value_as(Type::POINT),
self.get_column("approval_time"),
self.get_column("rejection_time"),
self.get_column("expiration_time"),
])
}

fn select_all_with_casting(&self) -> Columns {
Columns(vec![
self.get_column("id").cast_column_as(Type::TEXT),
Expand Down Expand Up @@ -152,7 +173,7 @@ impl ApprovalTable {
}

pub fn select_approvals_by_transaction_ids_sql(&self, row_count: usize) -> String {
let values = create_params(row_count);
let values = create_params(row_count, &mut 1);
let columns = self.select_all_with_casting().join_with_casting();
format!(
"{} {} {} {} {} {} {} ({})",
Expand Down
6 changes: 5 additions & 1 deletion crates/pg/src/sqls/balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ impl TableTrait for AccountBalanceTable {
fn name(&self) -> &str {
self.inner.name
}

fn column_count(&self) -> usize {
self.inner.len()
}
}

impl AccountBalanceTable {
Expand All @@ -49,7 +53,7 @@ impl AccountBalanceTable {

pub fn select_account_balances_sql(&self, accounts_count: usize) -> String {
let columns = self.select_columns_with_casting().join_with_casting();
let params = create_params(accounts_count);
let params = create_params(accounts_count, &mut 1);
format!(
"{} {} {} {} {} {} {} ({})",
SELECT,
Expand Down
Loading

0 comments on commit b4a0013

Please sign in to comment.