From 09f96947a14ec7436a18b87b5897d5d36243f0cf Mon Sep 17 00:00:00 2001 From: Tiziano Carotti Date: Fri, 29 Oct 2021 10:54:39 +0100 Subject: [PATCH] For schwab we can now use the awards centre report to calculate the stock price of the Stock Activities. (#104) Also fixed a double conversion to gbp bug in the stock activity calculation Co-authored-by: Ruslan Sayfutdinov --- .gitignore | 1 + cgt_calc/args_parser.py | 7 + cgt_calc/exceptions.py | 8 ++ cgt_calc/main.py | 104 +++++++-------- cgt_calc/model.py | 4 + cgt_calc/parsers/__init__.py | 5 +- cgt_calc/parsers/schwab.py | 124 +++++++++++++++++- .../resources/GBP_USD_monthly_history.csv | 6 + .../test_run_with_example_files_output.txt | 11 +- 9 files changed, 199 insertions(+), 71 deletions(-) diff --git a/.gitignore b/.gitignore index 9234532e..05c5387c 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ *.log .DS_Store .mypy_cache/ +.vscode __pycache__ env dist diff --git a/cgt_calc/args_parser.py b/cgt_calc/args_parser.py index 9eb4eac1..55e1fedf 100644 --- a/cgt_calc/args_parser.py +++ b/cgt_calc/args_parser.py @@ -50,6 +50,13 @@ def create_parser() -> argparse.ArgumentParser: nargs="?", help="monthly GBP/USD prices from HMRC", ) + parser.add_argument( + "--schwab-award", + type=str, + default=None, + nargs="?", + help="file containing schwab award data for stock prices", + ) parser.add_argument( "--initial-prices", type=str, diff --git a/cgt_calc/exceptions.py b/cgt_calc/exceptions.py index 85da4914..63f692f1 100644 --- a/cgt_calc/exceptions.py +++ b/cgt_calc/exceptions.py @@ -68,6 +68,14 @@ def __init__(self, row: list[str], count: int, file: str): ) +class UnexpectedRowCountError(ParsingError): + """Unexpected row error.""" + + def __init__(self, count: int, file: str): + """Initialise.""" + super().__init__(file, f"The following file doesn't have {count} rows:") + + class CalculatedAmountDiscrepancyError(InvalidTransactionError): """Calculated amount discrepancy error.""" diff --git a/cgt_calc/main.py b/cgt_calc/main.py index 86d62cc4..bcf6330a 100755 --- a/cgt_calc/main.py +++ b/cgt_calc/main.py @@ -46,6 +46,14 @@ LOGGER = logging.getLogger(__name__) +def get_amount_or_fail(transaction: BrokerTransaction) -> Decimal: + """Return the transaction amount or throw an error.""" + amount = transaction.amount + if amount is None: + raise AmountMissingError(transaction) + return amount + + class CapitalGainsCalculator: """Main calculator class.""" @@ -75,6 +83,7 @@ def add_acquisition( """Add new acquisition to the given list.""" symbol = transaction.symbol quantity = transaction.quantity + price = transaction.price if symbol is None: raise SymbolMissingError(transaction) if quantity is None or quantity <= 0: @@ -84,29 +93,22 @@ def add_acquisition( portfolio[symbol] += quantity else: portfolio[symbol] = quantity + # Add to acquisition_list to apply same day rule if transaction.action in [ActionType.STOCK_ACTIVITY, ActionType.SPIN_OFF]: - stock_price_gbp = None - - if transaction.price is not None and transaction.currency is not None: - stock_price_gbp = self.converter.to_gbp( - transaction.price, transaction.currency, transaction.date - ) - else: - stock_price_gbp = self.initial_prices.get(transaction.date, symbol) - - amount = quantity * stock_price_gbp + if price is None: + price = self.initial_prices.get(transaction.date, symbol) + amount = round_decimal(quantity * price, 2) else: - if transaction.amount is None: - raise AmountMissingError(transaction) - if transaction.price is None: + if price is None: raise PriceMissingError(transaction) - calculated_amount = round_decimal( - quantity * transaction.price + transaction.fees, 2 - ) - if transaction.amount != -calculated_amount: + + amount = get_amount_or_fail(transaction) + calculated_amount = round_decimal(quantity * price + transaction.fees, 2) + if amount != -calculated_amount: raise CalculatedAmountDiscrepancyError(transaction, -calculated_amount) - amount = -transaction.amount + amount = -amount + add_to_list( acquisition_list, transaction.date, @@ -143,14 +145,13 @@ def add_disposal( if portfolio[symbol] == 0: del portfolio[symbol] # Add to disposal_list to apply same day rule - if transaction.amount is None: - raise AmountMissingError(transaction) - if transaction.price is None: + + amount = get_amount_or_fail(transaction) + price = transaction.price + + if price is None: raise PriceMissingError(transaction) - amount = transaction.amount - calculated_amount = round_decimal( - quantity * transaction.price - transaction.fees, 2 - ) + calculated_amount = round_decimal(quantity * price - transaction.fees, 2) if amount != calculated_amount: raise CalculatedAmountDiscrepancyError(transaction, calculated_amount) add_to_list( @@ -180,28 +181,20 @@ def convert_to_hmrc_transactions( for i, transaction in enumerate(transactions): new_balance = balance[(transaction.broker, transaction.currency)] if transaction.action is ActionType.TRANSFER: - if transaction.amount is None: - raise AmountMissingError(transaction) - new_balance += transaction.amount + new_balance += get_amount_or_fail(transaction) elif transaction.action is ActionType.BUY: - if transaction.amount is None: - raise AmountMissingError(transaction) - new_balance += transaction.amount + new_balance += get_amount_or_fail(transaction) self.add_acquisition(portfolio, acquisition_list, transaction) elif transaction.action is ActionType.SELL: - if transaction.amount is None: - raise AmountMissingError(transaction) - new_balance += transaction.amount + amount = get_amount_or_fail(transaction) + new_balance += amount self.add_disposal(portfolio, disposal_list, transaction) if self.date_in_tax_year(transaction.date): - total_sells += self.converter.to_gbp_for( - transaction.amount, transaction - ) + total_sells += self.converter.to_gbp_for(amount, transaction) elif transaction.action is ActionType.FEE: - if transaction.amount is None: - raise AmountMissingError(transaction) - new_balance += transaction.amount - transaction.fees = -transaction.amount + amount = get_amount_or_fail(transaction) + new_balance += amount + transaction.fees = -amount transaction.quantity = Decimal(0) gbp_fees = self.converter.to_gbp_for(transaction.fees, transaction) if transaction.symbol is None: @@ -217,29 +210,20 @@ def convert_to_hmrc_transactions( elif transaction.action in [ActionType.STOCK_ACTIVITY, ActionType.SPIN_OFF]: self.add_acquisition(portfolio, acquisition_list, transaction) elif transaction.action in [ActionType.DIVIDEND, ActionType.CAPITAL_GAIN]: - if transaction.amount is None: - raise AmountMissingError(transaction) - new_balance += transaction.amount + amount = get_amount_or_fail(transaction) + new_balance += amount if self.date_in_tax_year(transaction.date): - dividends += self.converter.to_gbp_for( - transaction.amount, transaction - ) + dividends += self.converter.to_gbp_for(amount, transaction) elif transaction.action in [ActionType.TAX, ActionType.ADJUSTMENT]: - if transaction.amount is None: - raise AmountMissingError(transaction) - new_balance += transaction.amount + amount = get_amount_or_fail(transaction) + new_balance += amount if self.date_in_tax_year(transaction.date): - dividends_tax += self.converter.to_gbp_for( - transaction.amount, transaction - ) + dividends_tax += self.converter.to_gbp_for(amount, transaction) elif transaction.action is ActionType.INTEREST: - if transaction.amount is None: - raise AmountMissingError(transaction) - new_balance += transaction.amount + amount = get_amount_or_fail(transaction) + new_balance += amount if self.date_in_tax_year(transaction.date): - interest += self.converter.to_gbp_for( - transaction.amount, transaction - ) + interest += self.converter.to_gbp_for(amount, transaction) else: raise InvalidTransactionError( transaction, f"Action not processed({transaction.action})" @@ -656,7 +640,7 @@ def main() -> int: # Read data from input files broker_transactions = read_broker_transactions( - args.schwab, args.trading212, args.mssb + args.schwab, args.schwab_award, args.trading212, args.mssb ) converter = CurrencyConverter(read_gbp_prices_history(args.gbp_history)) initial_prices = InitialPrices(read_initial_prices(args.initial_prices)) diff --git a/cgt_calc/model.py b/cgt_calc/model.py index 634c7e63..930fbbcf 100644 --- a/cgt_calc/model.py +++ b/cgt_calc/model.py @@ -125,6 +125,10 @@ def taxable_gain(self) -> Decimal: assert self.capital_gain_allowance is not None return max(Decimal(0), self.total_gain() - self.capital_gain_allowance) + def __repr__(self) -> str: + """Return string representation.""" + return f"" + def __str__(self) -> str: """Return string representation.""" out = f"Portfolio at the end of {self.tax_year}/{self.tax_year + 1} tax year:\n" diff --git a/cgt_calc/parsers/__init__.py b/cgt_calc/parsers/__init__.py index 7973d907..1b6c1f09 100644 --- a/cgt_calc/parsers/__init__.py +++ b/cgt_calc/parsers/__init__.py @@ -42,13 +42,16 @@ def __str__(self) -> str: def read_broker_transactions( schwab_transactions_file: str | None, + schwab_awards_transactions_file: str | None, trading212_transactions_folder: str | None, mssb_transactions_folder: str | None, ) -> list[BrokerTransaction]: """Read transactions for all brokers.""" transactions = [] if schwab_transactions_file is not None: - transactions += read_schwab_transactions(schwab_transactions_file) + transactions += read_schwab_transactions( + schwab_transactions_file, schwab_awards_transactions_file + ) else: print("WARNING: No schwab file provided") diff --git a/cgt_calc/parsers/schwab.py b/cgt_calc/parsers/schwab.py index da9cfb0b..d1f1b7b3 100644 --- a/cgt_calc/parsers/schwab.py +++ b/cgt_calc/parsers/schwab.py @@ -1,15 +1,46 @@ """Charles Schwab parser.""" from __future__ import annotations +from collections import defaultdict import csv +from dataclasses import dataclass import datetime from decimal import Decimal +import itertools from pathlib import Path -from cgt_calc.exceptions import ParsingError, UnexpectedColumnCountError +from cgt_calc.exceptions import ( + ExchangeRateMissingError, + ParsingError, + SymbolMissingError, + UnexpectedColumnCountError, + UnexpectedRowCountError, +) from cgt_calc.model import ActionType, BrokerTransaction +@dataclass +class AwardPrices: + """Class to store initial stock prices.""" + + award_prices: dict[datetime.date, dict[str, Decimal]] + + def get(self, date: datetime.date, symbol: str) -> Decimal: + """Get initial stock price at given date.""" + # Award dates may go back for few days, depending on + # holidays or weekends, so we do a linear search + # in the past to find the award price + for i in range(7): + to_search = date - datetime.timedelta(days=i) + + if ( + to_search in self.award_prices + and symbol in self.award_prices[to_search] + ): + return self.award_prices[to_search][symbol] + raise ExchangeRateMissingError(symbol, date) + + def action_from_str(label: str) -> ActionType: """Convert string label to ActionType.""" if label == "Buy": @@ -59,7 +90,11 @@ def action_from_str(label: str) -> ActionType: class SchwabTransaction(BrokerTransaction): """Represent single Schwab transaction.""" - def __init__(self, row: list[str], file: str): + def __init__( + self, + row: list[str], + file: str, + ): """Create transaction from CSV row.""" if len(row) != 9: raise UnexpectedColumnCountError(row, 9, file) @@ -80,6 +115,7 @@ def __init__(self, row: list[str], file: str): price = Decimal(row[5].replace("$", "")) if row[5] != "" else None fees = Decimal(row[6].replace("$", "")) if row[6] != "" else Decimal(0) amount = Decimal(row[7].replace("$", "")) if row[7] != "" else None + currency = "USD" broker = "Charles Schwab" super().__init__( @@ -95,16 +131,94 @@ def __init__(self, row: list[str], file: str): broker, ) - -def read_schwab_transactions(transactions_file: str) -> list[BrokerTransaction]: + @staticmethod + def create( + row: list[str], file: str, awards_prices: AwardPrices + ) -> SchwabTransaction: + """Create and post process a SchwabTransaction.""" + transaction = SchwabTransaction(row, file) + if ( + transaction.price is None + and transaction.action == ActionType.STOCK_ACTIVITY + ): + symbol = transaction.symbol + if symbol is None: + raise SymbolMissingError(transaction) + transaction.price = awards_prices.get(transaction.date, symbol) + return transaction + + +def read_schwab_transactions( + transactions_file: str, schwab_award_transactions_file: str | None +) -> list[BrokerTransaction]: """Read Schwab transactions from file.""" + awards_prices = _read_schwab_awards(schwab_award_transactions_file) try: with Path(transactions_file).open(encoding="utf-8") as csv_file: lines = list(csv.reader(csv_file)) + # Remove headers and footer lines = lines[2:-1] - transactions = [SchwabTransaction(row, transactions_file) for row in lines] + transactions = [ + SchwabTransaction.create(row, transactions_file, awards_prices) + for row in lines + ] transactions.reverse() return list(transactions) except FileNotFoundError: print(f"WARNING: Couldn't locate Schwab transactions file({transactions_file})") return [] + + +def _read_schwab_awards( + schwab_award_transactions_file: str | None, +) -> AwardPrices: + """Read initial stock prices from CSV file.""" + initial_prices: dict[datetime.date, dict[str, Decimal]] = defaultdict(dict) + + lines = [] + if schwab_award_transactions_file is not None: + try: + with Path(schwab_award_transactions_file).open( + encoding="utf-8" + ) as csv_file: + lines = list(csv.reader(csv_file)) + # Remove headers + lines = lines[2:] + except FileNotFoundError: + print( + "WARNING: Couldn't locate Schwab award " + f"file({schwab_award_transactions_file})" + ) + else: + print("WARNING: No schwab award file provided") + + modulo = len(lines) % 3 + if modulo != 0: + raise UnexpectedRowCountError( + len(lines) - modulo + 3, schwab_award_transactions_file or "" + ) + + for row in zip(lines[::3], lines[1::3], lines[2::3]): + if len(row) != 3: + raise UnexpectedColumnCountError( + list(itertools.chain(*row)), 3, schwab_award_transactions_file or "" + ) + + lapse_main, _, lapse_data = row + + if len(lapse_main) != 8: + raise UnexpectedColumnCountError( + lapse_main, 8, schwab_award_transactions_file or "" + ) + if len(lapse_data) != 8: + raise UnexpectedColumnCountError( + lapse_data, 7, schwab_award_transactions_file or "" + ) + + date_str = lapse_main[0] + date = datetime.datetime.strptime(date_str, "%Y/%m/%d").date() + symbol = lapse_main[2] if lapse_main[2] != "" else None + price = Decimal(lapse_data[3].replace("$", "")) if lapse_data[3] != "" else None + if symbol is not None and price is not None: + initial_prices[date][symbol] = price + return AwardPrices(award_prices=dict(initial_prices)) diff --git a/cgt_calc/resources/GBP_USD_monthly_history.csv b/cgt_calc/resources/GBP_USD_monthly_history.csv index fa5fa86b..ec33a62a 100644 --- a/cgt_calc/resources/GBP_USD_monthly_history.csv +++ b/cgt_calc/resources/GBP_USD_monthly_history.csv @@ -54,3 +54,9 @@ month,price 02/2021,1.3632 03/2021,1.4051 04/2021,1.3721 +05/2021,1.393 +06/2021,1.4146 +07/2021,1.3969 +08/2021,1.3674 +09/2021,1.3757 +10/2021,1.3646 diff --git a/tests/test_data/test_run_with_example_files_output.txt b/tests/test_data/test_run_with_example_files_output.txt index f53b001d..36883215 100644 --- a/tests/test_data/test_run_with_example_files_output.txt +++ b/tests/test_data/test_run_with_example_files_output.txt @@ -1,3 +1,4 @@ +WARNING: No schwab award file provided Parsing tests/test_data/trading212/from_2020-09-11_to_2021-04-02.csv First pass completed Final portfolio: @@ -21,15 +22,15 @@ Portfolio at the end of 2020/2021 tax year: GE: 1.00, £8.07 NVDA: 1.00, £401.01 FOO: 30.50, £543.97 - GOOG: 8.60, £8908.20 + GOOG: 8.60, £12516.92 For tax year 2020/2021: Number of disposals: 4 Disposal proceeds: £42769.56 -Allowable costs: £16816.93 -Capital gain: £25952.63 +Allowable costs: £17656.17 +Capital gain: £25113.39 Capital loss: £0.00 -Total capital gain: £25952.63 -Taxable capital gain: £13652.63 +Total capital gain: £25113.39 +Taxable capital gain: £12813.39 Generate calculations report All done!