-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathload_transactions.py
executable file
·141 lines (111 loc) · 4.1 KB
/
load_transactions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
#!/bin/sh
"exec" "poetry" "run" "$0" "$@"
# coding: utf8
# Lines 1-2 form an sh/Python polyglot launcher: /bin/sh runs the quoted words
# as `exec poetry run "$0" "$@"`, while Python parses the same line as adjacent
# string literals, i.e. an expression statement with no effect.
# NOTE(review): `poetry run "$0"` executes this very file again; confirm that
# this ends up in the Python interpreter and not back at the /bin/sh shebang.
# NOTE(review): per PEP 263 an encoding declaration is only recognised on the
# first or second line, so the `coding` comment above appears to be inert.
import logging
import sys
from datetime import date, timedelta, datetime
from fints.client import NeedTANResponse
from fints.hhd.flicker import terminal_flicker_unix
from sqlalchemy import desc
from sqlalchemy.orm.session import Session
import config
import db
import hbci_client
from drinks import load_interval_max
from schema.status import Status, LAST_LOAD
from schema.transaction import Transaction
# Largest date span, in days, requested from the bank in a single chunk.
MAX_LOAD_DAYS = 6 * 30
# Days subtracted from the newest stored entry date when resuming, so that a
# few already-seen days are fetched again as a grace period.
LOAD_BACK_DAYS = 4

# Log at DEBUG when the config asks for it, INFO otherwise.
logging.basicConfig(
    format="%(levelname) 7s %(module)s - %(message)s",
    level=logging.DEBUG if config.debug else logging.INFO,
)
def main(args):
    """Command-line entry point: set up debugging and the DB, then force a load.

    :param args: the raw argument list; only the presence of ``--debug`` is
        checked.
    """
    debug_requested = '--debug' in args
    if debug_requested:
        config.debug = True

    if config.debug:
        # raise verbosity on the root logger
        root_logger = logging.getLogger('')
        root_logger.setLevel(logging.DEBUG)

    db.init(config.debug)
    # a manual run always loads, regardless of how recent the last load was
    get_transactions(force=True)
def get_transactions(force=False, tan_callback=hbci_client.terminal_tan_callback):
    """Load new bank transactions unless the previous load is still recent.

    Looks up the ``LAST_LOAD`` status row. If it exists, ``force`` is false and
    its timestamp plus ``load_interval_max`` is still in the future, nothing is
    loaded. Otherwise ``load_transactions`` runs, using a fresh ``Status``
    object when no row exists yet.

    :param force: skip the recency check and always load.
    :param tan_callback: handed on to ``load_transactions``.
    """
    with db.tx() as session:
        status_query = session.query(Status).filter_by(key=LAST_LOAD)
        last_load: Status = status_query.first()

        if not last_load:
            # no status row yet: start with a new one
            last_load = Status(key=LAST_LOAD)
        else:
            logging.info("Last load was at %s", last_load.value_dt)
            if not force:
                next_due = last_load.value_dt + load_interval_max
                if next_due > datetime.utcnow():
                    logging.info("Quite recent, not loading new txs")
                    return

        logging.info("Loading fresh transactions")
        load_transactions(session, last_load, tan_callback)
def _query_transactions(session):
    """Return an unfiltered ``Transaction`` query bound to ``session``."""
    tx_query = session.query(Transaction)
    return tx_query
def load_transactions(session, last_load: Status, tan_callback):
    """Fetch transactions chunk by chunk up to today, then record the load time.

    Fetching starts ``LOAD_BACK_DAYS`` before the newest stored entry date, or
    two years back when no transaction is stored. Each chunk spans at most
    ``MAX_LOAD_DAYS`` and is loaded through ``load_chunk``. When a chunk
    reports failure the function returns immediately and ``last_load`` is left
    untouched.

    :param session: database session used for querying, committing and merging.
    :param last_load: status object whose ``value_dt`` receives the start time
        of this run once every chunk has succeeded.
    :param tan_callback: handed on to ``load_chunk``.
    """
    newest_first = _query_transactions(session).order_by(
        desc(Transaction.entry_date))
    last_transaction: Transaction = newest_first.first()
    started_at = datetime.utcnow()
    today = started_at.date()

    if not last_transaction:
        window_start = today - timedelta(days=365*2)
    else:
        window_start = last_transaction.entry_date - \
            timedelta(days=LOAD_BACK_DAYS)

    reached_today = False
    while not reached_today:
        # never request beyond today
        window_end = min(window_start + timedelta(days=MAX_LOAD_DAYS), today)
        if not load_chunk(session, window_start, window_end, today, tan_callback):
            return
        reached_today = window_end >= today
        if not reached_today:
            # the next chunk begins where this one ended
            window_start = window_end
            session.commit()

    last_load.value_dt = started_at
    logging.info("last load set to %s", last_load)
    session.merge(last_load)
def load_chunk(session: Session, fetch_from: date, fetch_to: date, now: date, tan_callback) -> bool:
    """Fetch one date range of transactions from the bank and merge them.

    Transactions whose ``entry_date`` lies after ``now`` are skipped, as are
    those whose ``date`` falls more than ``config.import_fuzz_grace_days``
    outside the requested range.

    :param session: database session; rolled back when the bank reported an
        error response, otherwise the accepted transactions are merged into it.
    :param fetch_from: first day of the requested range.
    :param fetch_to: last day of the requested range.
    :param now: current date, used to reject future-dated transactions.
    :param tan_callback: passed to ``hbci_client.get_account`` and called with
        each ``NeedTANResponse``; it must return the TAN, and a falsy result
        aborts the chunk.
    :return: ``True`` when the chunk was merged, ``False`` when no TAN was
        supplied or an error response was seen.
    """
    # The response callback cannot hand a result back, so the error flag is
    # kept in a module-level global that both it and this function access.
    # NOTE(review): this makes load_chunk non-reentrant.
    global error
    logging.info("Fetching TXs from %s to %s", fetch_from, fetch_to)
    error = False
    with hbci_client.get_account(tan_callback) as (conn, acc):
        def log_callback(_, response):
            global error
            # 0&1 info, 3 warning, rest err
            if response.code[0] not in ('0', '1', '3'):
                error = True

        conn.add_response_callback(log_callback)
        txs = conn.get_transactions(acc, fetch_from, fetch_to)
        # The bank may ask for a TAN (possibly repeatedly) before answering.
        while isinstance(txs, NeedTANResponse):
            logging.info("Calling tan callback for TXs %s", txs)
            tan = tan_callback(txs)
            if not tan:
                logging.error("No TAN got, aborting")
                # explicit False so every failure path returns the same type
                return False
            txs = conn.send_tan(txs, tan)
        logging.info("got txs: len: %s, type: %s, %s", len(txs), type(txs), txs)

        # Constant for the whole chunk, so built once rather than per tx.
        fuzz = timedelta(days=config.import_fuzz_grace_days)
        new_txs = []
        for raw_tx in txs:
            tx = Transaction(raw_tx)
            if tx.entry_date > now:
                logging.warning("Ignoring future tx: %s", tx)
                continue
            if tx.date + fuzz < fetch_from or tx.date - fuzz > fetch_to:
                logging.warning(
                    "Ignoring tx which is not in requested range: %s", tx)
                continue
            new_txs.append(tx)
        logging.info("Fetched %d txs", len(new_txs))

        if error:
            logging.error("Errors occurred")
            session.rollback()
            return False

        for tx in new_txs:
            session.merge(tx)
        return True
# Run the loader when executed as a script; main()'s return value becomes the
# process exit status.
if __name__ == "__main__":
    exit_status = main(sys.argv)
    sys.exit(exit_status)