Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions monarchmoney/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
MonarchMoney,
RequireMFAException,
RequestFailedException,
BalanceHistoryRow,
)

__version__ = "0.1.15"
Expand Down
159 changes: 146 additions & 13 deletions monarchmoney/monarchmoney.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@
import os
import pickle
import time
import csv
from dataclasses import dataclass
from io import StringIO, BytesIO
from datetime import datetime, date, timedelta
from typing import Any, Dict, List, Optional, Union

import oathtool
from aiohttp import ClientSession, FormData
from aiohttp import ClientSession, MultipartWriter
from aiohttp.client import DEFAULT_TIMEOUT
from gql import Client, gql
from gql.transport.aiohttp import AIOHTTPTransport
Expand Down Expand Up @@ -38,6 +41,11 @@ def getGraphQL(cls) -> str:
def getAccountBalanceHistoryUploadEndpoint(cls) -> str:
return cls.BASE_URL + "/account-balance-history/upload/"

@dataclass
class BalanceHistoryRow:
date: datetime
amount: float
account_name: Optional[str] = None

class RequireMFAException(Exception):
pass
Expand Down Expand Up @@ -130,6 +138,14 @@ async def multi_factor_authenticate(
"""Performs multi-factor authentication to access a Monarch Money account."""
await self._multi_factor_authenticate(email, password, code)

async def _upload_form_data(self, url: str, mpwriter: MultipartWriter) -> dict:
headers = {k: v for k, v in self._headers.items() if k.lower() != "content-type"}
async with ClientSession(headers=headers) as session:
async with session.post(url, data=mpwriter) as resp:
if resp.status != 200:
raise RequestFailedException(f"HTTP Code {resp.status}: {resp.reason}")
return await resp.json()

async def get_accounts(self) -> Dict[str, Any]:
"""
Gets the list of accounts configured in the Monarch Money account.
Expand Down Expand Up @@ -2673,29 +2689,146 @@ async def set_budget_amount(
)

async def upload_account_balance_history(
self, account_id: str, csv_content: str
) -> None:
self,
account_id: str,
csv_content: List[BalanceHistoryRow],
timeout: int = 300,
delay: int = 10,
) -> bool:
"""
Uploads the account balance history csv for a given account.
Uploads the account balance history CSV for a specified account.

:param account_id: The account ID to apply the history to.
:param csv_content: CSV representation of the balance history.
Headers: Date, Amount, and Account Name.
:param timeout: The number of seconds to wait before timing out.
:param delay: The number of seconds to wait for each check on whether parsing is completed.
"""
if not account_id or not csv_content:
raise RequestFailedException("account_id and csv_content cannot be empty")

csv_string = self._convert_to_csv_string(csv_content)
csv_bytes = BytesIO(csv_string.encode("utf-8"))

mpwriter = MultipartWriter("form-data")
filename = "upload.csv"
form = FormData()
form.add_field("files", csv_content, filename=filename, content_type="text/csv")
form.add_field("account_files_mapping", json.dumps({filename: account_id}))

async with ClientSession(headers=self._headers) as session:
resp = await session.post(
MonarchMoneyEndpoints.getAccountBalanceHistoryUploadEndpoint(),
json=form,
file_part = mpwriter.append(csv_bytes)
file_part.set_content_disposition("form-data", name="files", filename=filename)
file_part.headers["Content-Type"] = "text/csv"

json_part = mpwriter.append(f'{{"{filename}":"{account_id}"}}')
json_part.set_content_disposition("form-data", name="account_files_mapping")
json_part.headers["Content-Type"] = "application/json"

preview_part = mpwriter.append("true")
preview_part.set_content_disposition("form-data", name="preview")

upload_response = await self._upload_form_data(url=MonarchMoneyEndpoints.getAccountBalanceHistoryUploadEndpoint(), mpwriter=mpwriter)

session_key = upload_response["session_key"]

parse_response = await self._initiate_upload_balance_history_session(
session_key=session_key
)

is_completed = (
parse_response["parseBalanceHistory"]["uploadBalanceHistorySession"][
"status"
]
== "completed"
)

start = time.time()
while not is_completed and (time.time() <= (start + timeout)):
await asyncio.sleep(delay)

is_completed = (
await self._is_upload_balance_history_complete(session_key)
)["uploadBalanceHistorySession"]["status"] == "completed"

return is_completed

def _convert_to_csv_string(self, csv_content: List[BalanceHistoryRow]) -> str:
"""
Converts a list of BalanceHistoryRow to CSV string
:param csv_content: A list of BalanceHistoryRow to upload to the account balance
"""

if not csv_content:
return ""

csv_string = StringIO()
writer = csv.writer(csv_string)
writer.writerow(["Date", "Amount", "Account Name"])

for row in csv_content:
writer.writerow(
[row.date.strftime("%Y-%m-%d"), row.amount, row.account_name]
)
if resp.status != 200:
raise RequestFailedException(f"HTTP Code {resp.status}: {resp.reason}")

return csv_string.getvalue()

async def _initiate_upload_balance_history_session(self, session_key: str) -> dict:
"""
Triggers parsing of the uploaded balance history CSV file.

:param session_key: The session key for the uploaded file.
"""

query = gql(
"""
mutation Web_ParseUploadBalanceHistorySession($input: ParseBalanceHistoryInput!) {
parseBalanceHistory(input: $input) {
uploadBalanceHistorySession {
...UploadBalanceHistorySessionFields
__typename
}
__typename
}
}
fragment UploadBalanceHistorySessionFields on UploadBalanceHistorySession {
sessionKey
status
__typename
}
"""
)

variables = {"input": {"sessionKey": session_key}}

return await self.gql_call(
"Web_ParseUploadBalanceHistorySession", query, variables
)

async def _is_upload_balance_history_complete(self, session_key: str):
"""
Retrieves the status of the upload balance history session.

:param session_key: The session key for the uploaded file.
"""

query = gql(
"""
query Web_GetUploadBalanceHistorySession($sessionKey: String!) {
uploadBalanceHistorySession(sessionKey: $sessionKey) {
...UploadBalanceHistorySessionFields
__typename
}
}
fragment UploadBalanceHistorySessionFields on UploadBalanceHistorySession {
sessionKey
status
__typename
}
"""
)

variables = {"sessionKey": session_key}

return await self.gql_call(
"Web_GetUploadBalanceHistorySession", query, variables
)

async def get_recurring_transactions(
self,
Expand Down