-
-
Notifications
You must be signed in to change notification settings - Fork 94
Improve upload account balance history #86
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 22 commits
9b8c69b
813adca
1593b90
9bf0010
cbff400
1d2eb40
a4c614c
76ebcde
00c6760
7ad7961
967447b
ad421a8
d6f5180
14beaf3
1d07ae0
8b37717
76d25b2
e7acd66
1fe7987
ee959a3
7f68c9c
44711ad
4e2d624
53b6abc
86a0311
ee41dc6
9394a97
508f9bf
2d4f1b4
89a9ce9
b6bbf50
d97de7f
34e440d
b6732d6
a1703a3
d317b9a
3989f3e
147b1aa
88e3dc7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,9 +18,11 @@ | |
| AUTH_HEADER_KEY = "authorization" | ||
| CSRF_KEY = "csrftoken" | ||
| DEFAULT_RECORD_LIMIT = 100 | ||
| DELAY = 10 | ||
hammem marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| ERRORS_KEY = "error_code" | ||
| SESSION_DIR = ".mm" | ||
| SESSION_FILE = f"{SESSION_DIR}/mm_session.pickle" | ||
| TIMEOUT = 300 | ||
hammem marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
|
|
||
| class MonarchMoneyEndpoints(object): | ||
|
|
@@ -127,6 +129,17 @@ async def multi_factor_authenticate( | |
| """Performs multi-factor authentication to access a Monarch Money account.""" | ||
| await self._multi_factor_authenticate(email, password, code) | ||
|
|
||
| async def _upload_form_data(self, url: str, data: FormData) -> dict: | ||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: please move methods like this down to the bottom for folks that happen to read the code as an end-user, as |
||
| """ | ||
| Retrieves the response from the server for a given URL and form data. | ||
| """ | ||
| async with ClientSession(headers=self._headers) as session: | ||
| resp = await session.post(url, data=data) | ||
| if resp.status != 200: | ||
| raise RequestFailedException(f"HTTP Code {resp.status}: {resp.reason}") | ||
|
|
||
| return await resp.json() | ||
|
|
||
| async def get_accounts(self) -> Dict[str, Any]: | ||
| """ | ||
| Gets the list of accounts configured in the Monarch Money account. | ||
|
|
@@ -434,8 +447,8 @@ async def is_accounts_refresh_complete(self) -> bool: | |
| async def request_accounts_refresh_and_wait( | ||
| self, | ||
| account_ids: Optional[List[str]] = None, | ||
| timeout: int = 300, | ||
| delay: int = 10, | ||
| timeout: int = TIMEOUT, | ||
| delay: int = DELAY, | ||
| ) -> bool: | ||
| """ | ||
| Convenience method for forcing an accounts refresh on Monarch, as well | ||
|
|
@@ -2394,13 +2407,20 @@ async def set_budget_amount( | |
| ) | ||
|
|
||
| async def upload_account_balance_history( | ||
| self, account_id: str, csv_content: str | ||
| ) -> None: | ||
| self, | ||
| account_id: str, | ||
| csv_content: str, | ||
| timeout: int = TIMEOUT, | ||
| delay: int = DELAY, | ||
| ) -> str: | ||
|
||
| """ | ||
| Uploads the account balance history csv for a given account. | ||
| Uploads the account balance history CSV for a specified account. | ||
|
|
||
| :param account_id: The account ID to apply the history to. | ||
| :param csv_content: CSV representation of the balance history. | ||
| Headers: Date, Amount, and Account Name. | ||
| :param timeout: The number of seconds to wait before timing out | ||
| :param delay: The number of seconds to wait for each check on whether parsing is completed | ||
| """ | ||
| if not account_id or not csv_content: | ||
| raise RequestFailedException("account_id and csv_content cannot be empty") | ||
|
|
@@ -2410,13 +2430,94 @@ async def upload_account_balance_history( | |
| form.add_field("files", csv_content, filename=filename, content_type="text/csv") | ||
| form.add_field("account_files_mapping", json.dumps({filename: account_id})) | ||
|
|
||
| async with ClientSession(headers=self._headers) as session: | ||
| resp = await session.post( | ||
| MonarchMoneyEndpoints.getAccountBalanceHistoryUploadEndpoint(), | ||
| data=form, | ||
| ) | ||
| if resp.status != 200: | ||
| raise RequestFailedException(f"HTTP Code {resp.status}: {resp.reason}") | ||
| upload_response = await self._upload_form_data( | ||
| url=MonarchMoneyEndpoints.getAccountBalanceHistoryUploadEndpoint(), | ||
| data=form, | ||
| ) | ||
|
|
||
| session_key = upload_response["session_key"] | ||
|
|
||
| parse_response = await self._initiate_upload_balance_history_session( | ||
| session_key=session_key | ||
| ) | ||
|
|
||
| is_completed = ( | ||
| parse_response["parseBalanceHistory"]["uploadBalanceHistorySession"][ | ||
| "status" | ||
| ] | ||
| == "completed" | ||
| ) | ||
|
|
||
| start = time.time() | ||
| while not is_completed and (time.time() <= (start + timeout)): | ||
| await asyncio.sleep(delay) | ||
|
|
||
| is_completed = ( | ||
| await self._is_upload_balance_history_complete(session_key) | ||
| )["uploadBalanceHistorySession"]["status"] == "completed" | ||
|
|
||
| return is_completed | ||
|
|
||
| async def _initiate_upload_balance_history_session(self, session_key: str) -> dict: | ||
| """ | ||
| Triggers parsing of the uploaded balance history CSV file. | ||
|
|
||
| :param session_key: The session key for the uploaded file. | ||
| """ | ||
|
|
||
| query = gql( | ||
| """ | ||
| mutation Web_ParseUploadBalanceHistorySession($input: ParseBalanceHistoryInput!) { | ||
| parseBalanceHistory(input: $input) { | ||
| uploadBalanceHistorySession { | ||
| ...UploadBalanceHistorySessionFields | ||
| __typename | ||
| } | ||
| __typename | ||
| } | ||
| } | ||
| fragment UploadBalanceHistorySessionFields on UploadBalanceHistorySession { | ||
| sessionKey | ||
| status | ||
| __typename | ||
| } | ||
| """ | ||
| ) | ||
|
|
||
| variables = {"input": {"sessionKey": session_key}} | ||
|
|
||
| return await self.gql_call( | ||
| "Web_ParseUploadBalanceHistorySession", query, variables | ||
| ) | ||
|
|
||
| async def _is_upload_balance_history_complete(self, session_key: str): | ||
| """ | ||
| Retrieves the status of the upload balance history session. | ||
|
|
||
| :param session_key: The session key for the uploaded file. | ||
| """ | ||
|
|
||
| query = gql( | ||
| """ | ||
| query Web_GetUploadBalanceHistorySession($sessionKey: String!) { | ||
| uploadBalanceHistorySession(sessionKey: $sessionKey) { | ||
| ...UploadBalanceHistorySessionFields | ||
| __typename | ||
| } | ||
| } | ||
| fragment UploadBalanceHistorySessionFields on UploadBalanceHistorySession { | ||
| sessionKey | ||
| status | ||
| __typename | ||
| } | ||
| """ | ||
| ) | ||
|
|
||
| variables = {"sessionKey": session_key} | ||
|
|
||
| return await self.gql_call( | ||
| "Web_GetUploadBalanceHistorySession", query, variables | ||
| ) | ||
|
|
||
| async def get_recurring_transactions( | ||
| self, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please pull this branch off of your other commits in the other PR, so it doesn't accidentally introduce those changes simultaneously.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done!