diff --git a/Makefile b/Makefile
index e4ddabc..59d2899 100644
--- a/Makefile
+++ b/Makefile
@@ -29,6 +29,9 @@ build:
run:
python src/main.py
+# Shortcut for development (developer run)
+drun: format lint run
+
run-container:
docker run --name cointaxman -it --rm \
-v `pwd`/account_statements:/CoinTaxman/account_statements:Z \
diff --git a/README.md b/README.md
index c5f3965..4c1029c 100644
--- a/README.md
+++ b/README.md
@@ -12,7 +12,7 @@ CoinTaxman hilft dir dabei deine Einkünfte aus dem Krypto-Handel/-Verleih/... i
Momentan deckt der CoinTaxman nur meinen Anwendungsbereich ab.
Pull Requests und Anfragen über Issues sind gerne gesehen (siehe `Key notes for users` für weitere Informationen).
-**Disclaimer: use at your own risk**
+# **Disclaimer: use at your own risk**
### Currently supported countries
- Germany
@@ -23,6 +23,10 @@ Pull Requests und Anfragen über Issues sind gerne gesehen (siehe `Key notes for
- [coinbase (pro)](https://github.com/provinzio/CoinTaxman/wiki/Exchange:-coinbase)
- [Kraken](https://github.com/provinzio/CoinTaxman/wiki/Exchange:-Kraken)
+It is also possible to import a custom transaction history file.
+See [here](https://github.com/provinzio/CoinTaxman/wiki/Custom-import-format) for more informations.
+The format is based on the awesome `bittytax_conv`-tool from [BittyTax](https://github.com/BittyTax/BittyTax).
+
### Requirements
- Python 3.9
@@ -32,10 +36,12 @@ Quick and easy installation can be done with `pip`.
### Usage
-1. Adjust `src/config.py` to your liking
-2. Add account statements from supported exchanges in `account_statements/`
+1. Adjust `src/config.ini` to your liking
+2. Add all your account statements in `account_statements/`
2. Run `python "src/main.py"`
+If not all your exchanges are supported, you can not (directly) calculate your taxes with this tool.
+
Have a look at our [Wiki](https://github.com/provinzio/CoinTaxman/wiki) for more information on how to obtain the account statement for your exchange.
#### Makefile
@@ -80,7 +86,7 @@ Information I require are for example
Not every aspect has to be implemented directly.
We are free to start by implementing the stuff you need for your tax declaration.
-I am looking forward to your [issue](https://github.com/provinzio/CoinTaxman/issues).
+I am looking forward to your [issue](https://github.com/provinzio/CoinTaxman/issues) or pull request.
Your country was already requested?
Hit the thumbs up button of that issue or participate in the process.
@@ -95,7 +101,7 @@ Please provide an example account statement for the requested exchange or some o
Are you already familiar with the API of that exchange or know some other way to request historical prices for that exchange?
Share your knowledge.
-I am looking forward to your [issue](https://github.com/provinzio/CoinTaxman/issues).
+I am looking forward to your [issue](https://github.com/provinzio/CoinTaxman/issues) or pull request.
Your exchange was already requested?
Hit the thumbs up button of that issue or participate in the process.
@@ -106,7 +112,7 @@ Hit the thumbs up button of that issue or participate in the process.
- Add your country to the Country enum in `src/core.py`
- Extend `src/config.py` to fit your tax regulation
- Add a country specific tax evaluation function in `src/taxman.py` like `Taxman._evaluate_taxation_GERMANY`
-- Depending on your specific tax regulation, you might need to add additional functionality and might want to add or edit the enums in `src/core.py`
+- Depending on your specific tax regulation, you might need to add additional functionality
- Update the README with a documentation about the taxation guidelines in your country
#### Adding a new exchange
@@ -123,88 +129,21 @@ Feel free to commit details about the taxation in your country.
## Taxation in Germany
-Meine Interpretation rund um die Besteuerung von Kryptowährung in Deutschland wird durch die Texte von den [Rechtsanwälten und Steuerberatern WINHELLER](https://www.winheller.com/) sehr gut artikuliert.
-Meine kurzen Zusammenfassungen am Ende von jedem Abschnitt werden durch einen ausführlicheren Text von [WINHELLER](https://www.winheller.com/) ergänzt.
-
-An dieser Stelle sei explizit erwähnt, dass dies meine Interpretation ist. Es ist weder sichergestellt, dass ich aktuell noch nach diesen praktiziere (falls ich das Repo in Zukunft nicht mehr aktiv pflege), noch ob diese Art der Versteuerung gesetzlich zulässig ist.
-Meine Interpretation steht gerne zur Debatte.
-
-### Allgemein
-
-> Kryptowährungen sind kein gesetzliches Zahlungsmittel. Vielmehr werden sie – zumindest im Ertragsteuerrecht – als immaterielle Wirtschaftsgüter betrachtet.
->
-> Wird der An- und Verkauf von Kryptowährungen als Privatperson unternommen, sind § 22 Nr. 2, § 23 Abs. 1 Nr. 2 EStG einschlägig. Es handelt sich hierbei um ein privates Veräußerungsgeschäft von „anderen Wirtschaftsgütern“. Gemäß § 23 Abs. 3 Satz 1 EStG ist der Gewinn oder Verlust der Unterschied zwischen Veräußerungspreis einerseits und den Anschaffungs- und Werbungskosten andererseits. Es muss also nur der Anschaffungspreis vom Veräußerungspreis abgezogen werden. Die Gebühren beim Handel auf den Börsen sind Werbungskosten und damit abzugsfähig.
->
-> In § 23 Abs. 3 Satz 5 EStG ist zudem eine Freigrenze von 600 € vorgesehen, bis zu deren Erreichen alle privaten Veräußerungsgeschäfte des Veranlagungszeitraums steuerfrei bleiben. Wird die Grenze überschritten, muss allerdings der gesamte Betrag ab dem ersten Euro versteuert werden. Die Einkommensteuer fällt dabei nicht erst beim Umtausch von Kryptowährungen in Euro oder eine andere Fremdwährung an, sondern bereits bei einem Tausch in eine beliebige andere Kryptowährung oder auch beim Kauf von Waren oder Dienstleistungen mit einer solchen. Vergeht aber zwischen Anschaffung und Veräußerung mehr als ein Jahr (ggf. zehn Jahre nach § 23 Abs. 1 Nr. 2 Satz 4 EStG), greift die Haltefrist des § 23 Abs. 1 Nr. 2 Satz 1 EStG. In diesen Fällen ist der gesamte Veräußerungsgewinn nicht steuerbar.
->
-> Zur Bestimmung der Anschaffungskosten und des Veräußerungsgewinns sowie zur Bestimmung der Einhaltung der Haltefrist wird in der Regel die sogenannte FIFO-Methode aus § 23 Abs. 1 Nr. 2 Satz 3 EStG herangezogen. Zwar schreibt das Gesetz diese First-In-First-Out-Methode nicht für Kryptowährungen vor, in der Praxis wird sie aber weitgehend angewendet. Es werden allerdings auch andere Meinungen vertreten und eine Berechnung nach der LIFO-Methode oder – zur Bestimmung der Anschaffungskosten – nach Durchschnittswerten vorgeschlagen.
-
-[Quelle](https://www.winheller.com/bankrecht-finanzrecht/bitcointrading/bitcoinundsteuer/besteuerung-kryprowaehrungen.html)
-[Wörtlich zitiert vom 14.02.2021]
-
-Zusammenfassung in meinen Worten:
-- Kryptowährung sind immaterielle Wirtschaftsgüter.
-- Der Verkauf innerhalb eines Jahres gilt als privates Veräußerungsgeschäft und ist als Sonstiges Einkommen zu versteuern (Freigrenze 600 €).
-- Der Tausch von Kryptowährung wird ebenfalls versteuert.
-- Gebühren zum Handel sind steuerlich abzugsfähig.
-- Es kann einmalig entschieden werden, ob nach FIFO oder LIFO versteuert werden soll.
-
-Weitere Grundsätze, die oben nicht angesprochen wurden:
-- Versteuerung erfolgt separat getrennt nach Depots (Wallets, Exchanges, etc.) [cryptotax](https://cryptotax.io/fifo-oder-lifo-bitcoin-gewinnermittlung/).
-
-### Airdrops
-
-> Im Rahmen eines Airdrops erhält der Nutzer Kryptowährungen, ohne diese angeschafft oder eine sonstige Leistung hierfür erbracht zu haben. Die Kryptowährungen werden nicht aus dem Rechtskreis eines Dritten auf den Nutzer übertragen. Vielmehr beginnen sie ihre „Existenz“ überhaupt erst in dessen Vermögen. Die Kryptowährung entsteht direkt in den Wallets der Nutzer, wobei die Wallets bestimmte Kriterien erfüllen müssen. Airdrops ähneln insofern einem Lottogewinn oder einem Zufallsfund (sog. Windfall Profits).
->
-> Mangels Anschaffungsvorgangs kommt bei einer anschließenden Veräußerung eine Besteuerung nach § 23 Abs. 1 Nr. 2 Einkommensteuergesetz (EStG) nicht in Betracht. Mangels Leistungserbringung seitens des Nutzers liegen auch keine sonstigen Einkünfte i.S.d. § 22 Nr. 3 EStG vor. Damit ist der Verkauf von Airdrops steuerfrei.
-
-[Quelle](https://www.winheller.com/bankrecht-finanzrecht/bitcointrading/bitcoinundsteuer/besteuerung-airdrops.html)
-[Wörtlich zitiert vom 14.02.2021]
-
-Zusammenfassung in meinen Worten:
-- Erhalt und Verkauf von Airdrops ist steuerfrei.
-
-### Coin Lending
-
-> Handelt es sich bei den durch Krypto-Lending erhaltenen Zinserträgen um Einkünfte aus sonstigen Leistungen gem. § 22 Nr. 3 EStG, so gilt eine Freigrenze von 256 Euro. Beträge darüber werden mit dem perönlichen Einkommensteuersatz von 18 bis 45 % versteuert. Außerdem wäre die spätere Veräußerung gem. § 23 Abs. 1 Nr. 2 EStG der durch das Lending erlangten Kryptowährung mangels Anschaffungsvorgangs nicht steuerbar.
->
-> In Deutschland ist die Besteuerung der durch das Krypto-Lending erhaltenen Zinsen jedoch nicht abschließend geklärt. Zum einem wird diskutiert, ob es sich dabei um Kapitaleinkünfte gem. § 20 Abs. 1 Nr. 7 EStG handelt, da es sich bei der Hingabe der Kryptowährung um ein klassisches, verzinsliches Darlehen handelt. Anderseits wird von Finanzämtern immer wieder angenommen, dass es sich bei den erzielten Zinserträgen durch Lending um Einkünfte aus sonstigen Leistungen gem. § 22 Nr. 3 EStG handelt.
->
-> Die erhaltene Kryptowährung in Form von Zinsen ist im Zeitpunkt des Zuflusses zu bewerten. Es handele sich deshalb nicht um Kapitaleinkünfte, da die Hingabe der Kryptowährung gerade keine Hingabe von Kapital, sondern vielmehr eine Sachleistung darstelle. Begründet wird dies damit, dass sich eine Kapitalforderung auf eine Geldleistung beziehen muss, nicht aber auf eine Sachleistung, wie es bei Kryptowährungen der Fall ist.
->
-> Kontrovers diskutiert wird auch, ob der Verleih einer Kryptowährung zu einer Verlängerung der Haltefrist nach § 23 Abs. 1 Nr. 2 Satz 4 EStG führt. Eine Verlängerung der Haltefrist tritt danach nur dann ein, wenn ein Wirtschaftsgut
-> - nach dem 31.12.08 angeschafft wurde,
-> - als Einkunftsquelle genutzt wird und
-> - damit Einkünfte erzielt werden.
-> Unter Nutzung als Einkunftsquelle ist zu verstehen, dass die betroffenen Wirtschaftsgüter eine eigenständige Erwerbsgrundlage bilden. Maßgeblich ist also die Frage, ob mit der Kryptowährung Einkünften erzielt werden.
->
-> Beim Lending werden jedoch in der Regel keine Einkünfte aus dem Wirtschaftsgut (der Kryptowährung), sondern aus dem Verleihgeschäft erzielt (als Ertrag aus der Forderung). Weil in diesen Fällen kein Missbrauch vorliegt, kann es bei der Haltefrist von einem Jahr bleiben. Auch das Bayrische Landesamt für Steuern hat bestätigt, dass die erhaltenen Zinsen nicht Ausfluss des „anderen Wirtschaftsgutes Fremdwährungsguthaben“, sondern vielmehr Ausfluss der eigentlichen Kapitalforderungen sind.
-
-[Quelle](https://www.winheller.com/bankrecht-finanzrecht/bitcointrading/bitcoinundsteuer/besteuerung-lending.html)
-[Wörtlich zitiert vom 14.02.2021]
-
-Zusammenfassung in meinen Worten:
-- Erhaltene Kryptowährung durch Coin Lending wird im Zeitpunkt des Zuflusses als Einkunft aus sonstigen Leistungen versteuert (Freigrenze 256 €).
-- Der Verkauf ist nicht steuerbar.
-- Coin Lending beeinflusst nicht die Haltefrist der verliehenen Coins.
-
-### Staking
+Nach langer Unklarheit über die Besteuerung von Kryptowährung wurde am 10.05.2022 durch das Bundesministerium für Finanzen (BMF) [ein Schreiben](https://www.bundesfinanzministerium.de/Content/DE/Downloads/BMF_Schreiben/Steuerarten/Einkommensteuer/2022-05-09-einzelfragen-zur-ertragsteuerrechtlichen-behandlung-von-virtuellen-waehrungen-und-von-sonstigen-token.html) mit rechtsverbindlichen Vorgaben zur Versteuerung veröffentlicht.
-> Ebenso [wie beim Coin Lending] verhält es sich bei Kryptowährungen, die für Staking oder Masternodes genutzt werden. Nutzer müssen bei proof-of-stake-basierten Kryptowährungen oder beim Betreiben von Masternodes einen bestimmten Teil ihrer Kryptowährung der Verfügungsmacht entziehen und dem Netzwerk als Sicherheit bereitstellen. Die Sicherheit des Netzwerkes wird dadurch gewährleistet, dass regelwidriges Verhalten den dem Verlust der Sicherheitsleistung (Kryptowährung) zur Folge hat. Auch in diesen Fällen werden keine Einkünfte aus dem Wirtschaftsgut selbst, sondern für das Blockieren der Verfügungsmacht, also als Ertrag aus der Forderung, erzielt. Auch hier bleibt es bei der Haltefrist von einem Jahr.
+Die ursprünglich hier stehenden Vermutungen und Interpretationen meinerseits habe ich aus der aktuellen `README.md` entfernt.
+Für Interessierte findet sich das in der Versionshistorie.
+Dieses Tool richtet sich nach bestem Wissen nach dem BMF Schreiben.
-[Quelle](https://www.winheller.com/bankrecht-finanzrecht/bitcointrading/bitcoinundsteuer/besteuerung-von-staking.html)
-[Wörtlich zitiert vom 19.02.2021]
+An dieser Stelle sei explizit erwähnt, dass ich trotzdem keine Haftung für die Anwendung dieses Programms übernehme.
+Es ist weder sichergestellt, dass ich es aktuell noch nutze (falls ich das Repo in Zukunft nicht mehr aktiv pflege), noch ob die tatsächliche Umsetzung des Programms gesetzlich zulässig ist.
+Issues und Pull Requests sind gerne gesehen.
-Zusammenfassung:
-- siehe Coin Lending
+Weitere Besonderheiten die sich so nicht im BMF-Schreiben wiederfinden, sind im folgenden aufgelistet.
-### Kommission (Referral System)
-Wenn man denkt, dass man den Steuerdschungel endlich durchquert hat, kommt Binance mit seinem Referral System daher.
-Über das Werbungs-System erhält man einen prozentualen Anteil an den Trading-Gebühren der Geworbenen auf sein Konto gutgeschrieben.
-(lebenslang, logischerweise in BTC.)
-Es ist also keine typische Kunden-werben-Kunden-Prämie sondern eher eine Kommission und damit bin ich mir unsicher, wie das einzuordnen ist.
+### Binance Referral Rewards (Referral System)
-Für das Erste handhabe ich es wie eine Kunden-werben-Kunden-Prämie in Form eines Sachwerts.
-Sprich, die BTC werden zum Zeitpunkt des Erhalts in ihren EUR-Gegenwert umgerechnet und den Einkünften aus sonstigen Leistungen hinzugefügt.
-Aufgrund eines fehlenden steuerlichen Anschaffungsvorgangs ist eine Veräußerung steuerfrei.
+Bei Binance gibt es die Möglichkeit, andere Personen über einen Link zu werben.
+Bei jeder Transaktion der Person erhält man einen kleinen Anteil derer Gebühren als Reward gutgeschrieben.
+Die Einkünfte durch diese Rewards werden durch CoinTaxman als Einkünfte aus sonstigen Leistungen ausgewiesen und damit wie eine übliche Kunden-werben-Kunden-Prämie erfasst.
diff --git a/config.ini b/config.ini
index 7ec53f7..95b5132 100644
--- a/config.ini
+++ b/config.ini
@@ -3,8 +3,8 @@
COUNTRY = GERMANY
TAX_YEAR = 2021
# Missing prices are set as 0 in the database.
-# Always refetching zeroes only slows down the evaluation, but at some time.
-# It might be a good idea, to try to refetch missing prices.
+# Always refetching zeroes only slows down the evaluation, but at some time,
+# it might be a good idea, to try to refetch missing prices.
# If you calculated the mean before, this has no effect.
REFETCH_MISSING_PRICES = False
# If the price for a coin is missing, check if there are known prices before
@@ -12,8 +12,9 @@ REFETCH_MISSING_PRICES = False
# the price inbetween.
# Important: The code must be run twice for this option to take effect.
MEAN_MISSING_PRICES = False
-# Calculate the (taxed) gains, if the left over coins would be sold right now.
-# This will fetch the current prices and therefore slow down repetitive runs.
+# Calculate the (taxed) gains, if the left over coins would be sold at taxation
+# deadline (end of the year). If the evaluated TAX_YEAR is ongoing, this will
+# fetch the current prices at runtime and therefore slow down repetitive runs.
CALCULATE_UNREALIZED_GAINS = True
# Evaluate taxes for each depot/platform separately. This may reduce your
# taxable gains. Make sure, that this method is accepted by your tax
@@ -21,4 +22,10 @@ CALCULATE_UNREALIZED_GAINS = True
MULTI_DEPOT = True
# Set logging level
# DEBUG, INFO, WARNING, ERROR, FATAL
-LOG_LEVEL = DEBUG
\ No newline at end of file
+LOG_LEVEL = DEBUG
+# Taxation of Airdrops is currently only partly implemented (#115).
+# If True, all airdrops will be taxed as `Schenkung`.
+# If False, all airdrops will be taxed as `Einkünfte aus sonstigen Leistungen`.
+# Setting this config falsly will result in a wrong tax calculation.
+# Please inform yourself and help to resolve this issue by working on/with #115.
+ALL_AIRDROPS_ARE_GIFTS = True
diff --git a/requirements.txt b/requirements.txt
index 897756d..26b3292 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -5,3 +5,5 @@ python-dateutil==2.8.1
requests==2.25.1
six==1.15.0
urllib3==1.26.5
+xlsxwriter==3.0.3
+tzdata==2022.1
diff --git a/setup.cfg b/setup.cfg
index 10a1717..9bb5a50 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -9,6 +9,12 @@ warn_return_any = True
show_error_codes = True
warn_unused_configs = True
+[mypy-xlsxwriter]
+ignore_missing_imports = True
+
+[mypy-tzdata]
+ignore_missing_imports = True
+
[flake8]
exclude = *py*env*/
max_line_length = 88
diff --git a/src/archive_evaluation.py b/src/archive_evaluation.py
index 7a6eb0c..ebda9a1 100644
--- a/src/archive_evaluation.py
+++ b/src/archive_evaluation.py
@@ -41,9 +41,9 @@ def append_files(basedir: Path, filenames: list[str]) -> None:
# Evaluation and log file
log.debug("Archive latest evaluation and log file")
-eval_regex = re.compile(str(TAX_YEAR) + r"\_rev\d{3}\.csv")
+eval_regex = re.compile(str(TAX_YEAR) + r"\_rev\d{3}\.xlsx")
evaluation = max((f for f in os.listdir(EXPORT_PATH) if eval_regex.match(f)))
-log_file = evaluation.removesuffix(".csv") + ".log"
+log_file = evaluation.removesuffix(".xlsx") + ".log"
log.debug("Found: %s", ", ".join((evaluation, log_file)))
append_files(EXPORT_PATH, [evaluation, log_file])
diff --git a/src/balance_queue.py b/src/balance_queue.py
index a4b005b..8c0fcff 100644
--- a/src/balance_queue.py
+++ b/src/balance_queue.py
@@ -20,49 +20,43 @@
import decimal
from typing import Union
+import config
import log_config
-import transaction
+import transaction as tr
log = log_config.getLogger(__name__)
@dataclasses.dataclass
class BalancedOperation:
- op: transaction.Operation
+ op: tr.Operation
sold: decimal.Decimal = decimal.Decimal()
+ @property
+ def not_sold(self) -> decimal.Decimal:
+ """Calculate the amount of coins which are not sold yet.
-class BalanceQueue(abc.ABC):
- def __init__(self) -> None:
- self.queue: collections.deque[BalancedOperation] = collections.deque()
- # Buffer fees which could not be directly set off
- # with the current coins in the queue.
- # This can happen if the exchange takes the fees before
- # the buy/sell process.
- self.buffer_fee = decimal.Decimal()
-
- def put(self, item: Union[transaction.Operation, BalancedOperation]) -> None:
- """Put a new item in the queue and set off buffered fees.
-
- Args:
- item (Union[Operation, BalancedOperation])
+ Returns:
+ decimal.Decimal: Amount of coins which are not sold yet.
"""
- if isinstance(item, transaction.Operation):
- item = BalancedOperation(item)
+ not_sold = self.op.change - self.sold
+ # If the left over amount is <= 0, this coin shouldn't be in the queue.
+ assert not_sold > 0, f"{not_sold=} should be > 0"
+ return not_sold
- if not isinstance(item, BalancedOperation):
- raise ValueError
- self._put(item)
-
- # Remove fees which could not be set off before now.
- if self.buffer_fee:
- # Clear the buffer and remove the buffered fee from the queue.
- fee, self.buffer_fee = self.buffer_fee, decimal.Decimal()
- self.remove_fee(fee)
+class BalanceQueue(abc.ABC):
+ def __init__(self, coin: str) -> None:
+ self.coin = coin
+ self.queue: collections.deque[BalancedOperation] = collections.deque()
+ # It might happen, that the exchange takes fees before the buy/sell-
+ # transaction. Keep fees, which couldn't be removed directly from the
+ # queue and remove them as soon as possible.
+ # At the end, all fees should have been paid (removed from the buffer).
+ self.buffer_fee = decimal.Decimal()
@abc.abstractmethod
- def _put(self, bop: BalancedOperation) -> None:
+ def _put_(self, bop: BalancedOperation) -> None:
"""Put a new item in the queue.
Args:
@@ -71,8 +65,8 @@ def _put(self, bop: BalancedOperation) -> None:
raise NotImplementedError
@abc.abstractmethod
- def get(self) -> BalancedOperation:
- """Get an item from the queue.
+ def _pop_(self) -> BalancedOperation:
+ """Pop an item from the queue.
Returns:
BalancedOperation
@@ -80,7 +74,7 @@ def get(self) -> BalancedOperation:
raise NotImplementedError
@abc.abstractmethod
- def peek(self) -> BalancedOperation:
+ def _peek_(self) -> BalancedOperation:
"""Peek at the next item in the queue.
Returns:
@@ -88,71 +82,215 @@ def peek(self) -> BalancedOperation:
"""
raise NotImplementedError
- def sell(
+ def _put(self, item: Union[tr.Operation, BalancedOperation]) -> None:
+ """Put a new item in the queue and remove buffered fees.
+
+ Args:
+ item (Union[Operation, BalancedOperation])
+ """
+ if isinstance(item, tr.Operation):
+ item = BalancedOperation(item)
+ elif not isinstance(item, BalancedOperation):
+ raise TypeError
+
+ self._put_(item)
+
+ # Remove fees which couldn't be removed before.
+ if self.buffer_fee:
+ # Clear the buffer.
+ fee, self.buffer_fee = self.buffer_fee, decimal.Decimal()
+ # Try to remove the fees.
+ self._remove_fee(fee)
+
+ def _pop(self) -> BalancedOperation:
+ """Pop an item from the queue.
+
+ Returns:
+ BalancedOperation
+ """
+ return self._pop_()
+
+ def _peek(self) -> BalancedOperation:
+ """Peek at the next item in the queue.
+
+ Returns:
+ BalancedOperation
+ """
+ return self._peek_()
+
+ def add(self, op: tr.Operation) -> None:
+ """Add an operation with coins to the balance.
+
+ Args:
+ op (tr.Operation)
+ """
+ assert not isinstance(op, tr.Fee)
+ assert op.coin == self.coin
+ self._put(op)
+
+ def _remove(
self,
change: decimal.Decimal,
- ) -> tuple[list[transaction.SoldCoin], decimal.Decimal]:
- """Sell/remove as many coins as possible from the queue.
+ ) -> tuple[list[tr.SoldCoin], decimal.Decimal]:
+ """Remove as many coins as necessary from the queue.
- Depending on the QueueType, the coins will be removed FIFO or LIFO.
+ The removement logic is defined by the BalanceQueue child class.
Args:
- change (decimal.Decimal): Amount of sold coins which will be removed
- from the queue.
+ change (decimal.Decimal): Amount of coins to be removed.
Returns:
- - list[transaction.SoldCoin]: List of specific coins which were
- (depending on the tax regulation) sold in the transaction.
+ - list[tr.SoldCoin]: List of coins which were removed.
- decimal.Decimal: Amount of change which could not be removed
- because the queue ran out of coins to sell.
+ because the queue ran out of coins.
"""
- sold_coins: list[transaction.SoldCoin] = []
+ sold_coins: list[tr.SoldCoin] = []
while self.queue and change > 0:
# Look at the next coin in the queue.
- bop = self.peek()
+ bop = self._peek()
- # Calculate the amount of coins, which are not sold yet.
- not_sold = bop.op.change - bop.sold
- assert not_sold > 0
+ # Get the amount of not sold coins.
+ not_sold = bop.not_sold
if not_sold > change:
# There are more coins left than change.
# Update the sold value,
bop.sold += change
- # keep track of the sold amount and
- sold_coins.append(transaction.SoldCoin(bop.op, change))
- # Set the change to 0.
+ # keep track of the sold amount
+ sold_coins.append(tr.SoldCoin(bop.op, change))
+ # and set the change to 0.
change = decimal.Decimal()
+ # All demanded change was removed.
break
- else: # change >= not_sold
- # The change is higher than or equal to the (left over) coin.
+ else: # not_sold <= change
+ # The change is higher than or equal to the left over coins.
# Update the left over change,
change -= not_sold
- # remove the fully sold coin from the queue and
- self.get()
- # keep track of the sold amount.
- sold_coins.append(transaction.SoldCoin(bop.op, not_sold))
+ # remove the fully sold coin from the queue
+ self._pop()
+ # and keep track of the sold amount.
+ sold_coins.append(tr.SoldCoin(bop.op, not_sold))
- assert change >= 0
+ assert change >= 0, "Removed more than necessary from the queue."
return sold_coins, change
- def remove_fee(self, fee: decimal.Decimal) -> None:
+ def remove(
+ self,
+ op: tr.Operation,
+ ) -> list[tr.SoldCoin]:
+ """Remove as many coins as necessary from the queue.
+
+ The removement logic is defined by the BalanceQueue child class.
+
+ Args:
+ op (tr.Operation): Operation with coins to be removed.
+
+ Raises:
+ RuntimeError: When there are not enough coins in queue to be sold.
+
+ Returns:
+ - list[tr.SoldCoin]: List of coins which were removed.
+ """
+ assert op.coin == self.coin
+ sold_coins, unsold_change = self._remove(op.change)
+
+ if unsold_change:
+ # Queue ran out of items to sell and not all coins could be sold.
+ msg = (
+ f"Not enough {op.coin} in queue to sell: "
+ f"missing {unsold_change} {op.coin} "
+ f"(transaction from {op.utc_time} on {op.platform}, "
+ f"see {op.file_path.name} lines {op.line})\n"
+ f"This can happen when you sold more {op.coin} than you have "
+ "according to your account statements. Have you added every "
+ "account statement including these from last years and the "
+ f"all deposits of {op.coin}?"
+ )
+ if self.coin == config.FIAT:
+ log.warning(
+ f"{msg}\n"
+ "Tracking of your home fiat is not important for tax "
+ f"evaluation but the {op.coin} in your portfolio at "
+ "deadline will be wrong."
+ )
+ else:
+ log.error(
+ f"{msg}\n"
+ "\tThis error may also occur after deposits from unknown "
+ "sources. CoinTaxman requires the full transaction history to "
+ "evaluate taxation (when and where were these deposited coins "
+ "bought?).\n"
+ )
+ raise RuntimeError
+
+ return sold_coins
+
+ def _remove_fee(self, fee: decimal.Decimal) -> None:
"""Remove fee from the last added transaction.
Args:
fee: decimal.Decimal
"""
- _, left_over_fee = self.sell(fee)
- if left_over_fee:
- # Not enough coins in queue to remove fee.
- # Buffer the fee for next time.
+ _, left_over_fee = self._remove(fee)
+ if left_over_fee and self.coin != config.FIAT:
+ log.warning(
+ "Not enough coins in queue to remove fee. Buffer the fee for "
+ "next adding time... "
+ "This should not happen. You might be missing an account "
+ "statement. Please open issue or PR if you need help."
+ )
self.buffer_fee += left_over_fee
+ def remove_fee(self, fee: tr.Fee) -> None:
+ assert fee.coin == self.coin
+ self._remove_fee(fee.change)
+
+ def sanity_check(self) -> None:
+ """Validate that all fees were paid or raise an exception.
+
+ At the end, all fees should have been paid.
+
+ Raises:
+ RuntimeError: Not all fees were paid.
+ """
+ if self.buffer_fee:
+ msg = (
+ f"Not enough {self.coin} in queue to pay left over fees: "
+ f"missing {self.buffer_fee} {self.coin}.\n"
+ "\tThis error occurs when you sold more coins than you have "
+ "according to your account statements. Have you added every "
+ "account statement, including these from the last years?\n"
+ "\tThis error may also occur after deposits from unknown "
+ "sources. "
+ )
+ if self.coin == config.FIAT:
+ log.warning(
+ f"{msg}"
+ "Tracking of your home fiat is not important for tax "
+ f"evaluation but the {self.coin} in your portfolio at "
+ "deadline will be wrong."
+ )
+ else:
+ log.error(
+ "{msg}"
+ "CoinTaxman requires the full transaction history to "
+ "evaluate taxation (when where these deposited coins bought?).\n"
+ )
+ raise RuntimeError
+
+ def remove_all(self) -> list[tr.SoldCoin]:
+ sold_coins = []
+ while self.queue:
+ bop = self._pop()
+ not_sold = bop.not_sold
+ sold_coins.append(tr.SoldCoin(bop.op, not_sold))
+ return sold_coins
+
class BalanceFIFOQueue(BalanceQueue):
- def _put(self, bop: BalancedOperation) -> None:
+ def _put_(self, bop: BalancedOperation) -> None:
"""Put a new item in the queue.
Args:
@@ -160,15 +298,15 @@ def _put(self, bop: BalancedOperation) -> None:
"""
self.queue.append(bop)
- def get(self) -> BalancedOperation:
- """Get an item from the queue.
+ def _pop_(self) -> BalancedOperation:
+ """Pop an item from the queue.
Returns:
BalancedOperation
"""
return self.queue.popleft()
- def peek(self) -> BalancedOperation:
+ def _peek_(self) -> BalancedOperation:
"""Peek at the next item in the queue.
Returns:
@@ -178,7 +316,7 @@ def peek(self) -> BalancedOperation:
class BalanceLIFOQueue(BalanceQueue):
- def _put(self, bop: BalancedOperation) -> None:
+ def _put_(self, bop: BalancedOperation) -> None:
"""Put a new item in the queue.
Args:
@@ -186,15 +324,15 @@ def _put(self, bop: BalancedOperation) -> None:
"""
self.queue.append(bop)
- def get(self) -> BalancedOperation:
- """Get an item from the queue.
+ def _pop_(self) -> BalancedOperation:
+ """Pop an item from the queue.
Returns:
BalancedOperation
"""
return self.queue.pop()
- def peek(self) -> BalancedOperation:
+ def _peek_(self) -> BalancedOperation:
"""Peek at the next item in the queue.
Returns:
diff --git a/src/book.py b/src/book.py
index 7966310..dd3b535 100644
--- a/src/book.py
+++ b/src/book.py
@@ -14,6 +14,7 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .
+import collections
import csv
import datetime
import decimal
@@ -58,31 +59,36 @@ def create_operation(
coin: str,
row: int,
file_path: Path,
+ remark: Optional[str] = None,
) -> tr.Operation:
try:
Op = getattr(tr, operation)
except AttributeError:
log.error(
- "Could not recognize operation `%s` in %s file `%s:%i`.",
- operation,
- platform,
- file_path,
- row,
+ f"Could not recognize {operation=} from {platform=} in "
+ f"{file_path=} {row=}. "
+ "The operation type might have been removed or renamed. "
+ "Please open an issue or PR."
)
raise RuntimeError
- op = Op(utc_time, platform, change, coin, row, file_path)
+ kwargs = {}
+ if remark:
+ kwargs["remarks"] = [remark]
+
+ op = Op(utc_time, platform, change, coin, [row], file_path, **kwargs)
assert isinstance(op, tr.Operation)
return op
def _append_operation(
self,
- operation: tr.Operation,
+ op: tr.Operation,
) -> None:
# Discard operations after the `TAX_YEAR`.
- if operation.utc_time.year <= config.TAX_YEAR:
- self.operations.append(operation)
+ # Ignore operations which make no change.
+ if op.utc_time.year <= config.TAX_YEAR and op.change != 0:
+ self.operations.append(op)
def append_operation(
self,
@@ -93,9 +99,11 @@ def append_operation(
coin: str,
row: int,
file_path: Path,
+ remark: Optional[str] = None,
) -> None:
# Discard operations after the `TAX_YEAR`.
- if utc_time.year <= config.TAX_YEAR:
+ # Ignore operations which make no change.
+ if utc_time.year <= config.TAX_YEAR and change != 0:
op = self.create_operation(
operation,
utc_time,
@@ -104,6 +112,7 @@ def append_operation(
coin,
row,
file_path,
+ remark=remark,
)
self._append_operation(op)
@@ -112,25 +121,29 @@ def _read_binance(self, file_path: Path, version: int = 1) -> None:
platform = "binance"
operation_mapping = {
"Distribution": "Airdrop",
+ "Cash Voucher distribution": "Airdrop",
+ "Rewards Distribution": "Airdrop",
+ #
"Savings Interest": "CoinLendInterest",
"Savings purchase": "CoinLend",
"Savings Principal redemption": "CoinLendEnd",
+ #
"Commission History": "Commission",
"Commission Fee Shared With You": "Commission",
"Referrer rebates": "Commission",
"Referral Kickback": "Commission",
- "Launchpool Interest": "StakingInterest",
- "Cash Voucher distribution": "Airdrop",
- "Super BNB Mining": "StakingInterest",
+ # DeFi yield farming
"Liquid Swap add": "CoinLend",
"Liquid Swap remove": "CoinLendEnd",
+ "Liquid Swap rewards": "CoinLendInterest",
+ "Launchpool Interest": "CoinLendInterest",
+ #
+ "Super BNB Mining": "StakingInterest",
"POS savings interest": "StakingInterest",
"POS savings purchase": "Staking",
"POS savings redemption": "StakingEnd",
- "Withdraw": "Withdrawal",
#
- "Rewards Distribution": "Airdrop",
- "Liquid Swap rewards": "CoinLendInterest",
+ "Withdraw": "Withdrawal",
}
with open(file_path, encoding="utf8") as f:
@@ -192,8 +205,8 @@ def _read_binance(self, file_path: Path, version: int = 1) -> None:
change = abs(change)
# Validate data.
- assert account == "Spot", (
- "Other types than Spot are currently not supported. "
+ assert account in ("Spot", "Savings"), (
+ "Other types than Spot or Savings are currently not supported. "
"Please create an Issue or PR."
)
assert operation
@@ -201,7 +214,7 @@ def _read_binance(self, file_path: Path, version: int = 1) -> None:
assert change
# Check for problems.
- if remark:
+ if remark and remark not in ("Withdraw fee is included",):
log.warning(
"I may have missed a remark in %s:%i: `%s`.",
file_path,
@@ -667,16 +680,29 @@ def _read_kraken_ledgers(self, file_path: Path) -> None:
# of change and same coin.
assert isinstance(
op, type(self.kraken_held_ops[refid]["operation"])
- ), "operation"
+ ), (
+ "operation "
+ f"({op.type_name} != "
+ f'{self.kraken_held_ops[refid]["operation"].type_name})'
+ )
assert (
op.change
== self.kraken_held_ops[refid]["operation"].change
- ), "change"
+ ), (
+ "change "
+ f"({op.change} != "
+ f'{self.kraken_held_ops[refid]["operation"].change})'
+ )
assert (
op.coin == self.kraken_held_ops[refid]["operation"].coin
- ), "coin"
+ ), (
+ "coin "
+ f"({op.coin} != "
+ f'{self.kraken_held_ops[refid]["operation"].coin})'
+ )
except AssertionError as e:
- first_row = self.kraken_held_ops[refid]["operation"].line
+ # Row is internally saved as list[int].
+ first_row = self.kraken_held_ops[refid]["operation"].line[0]
log.error(
"Two internal kraken operations matched by the "
f"same {refid=} don't have the same {e}.\n"
@@ -1013,7 +1039,7 @@ def _read_bitpanda(self, file_path: Path) -> None:
elif operation in ["Buy", "Sell"]:
if asset_price_currency != config.FIAT:
log.error(
- f"Only {config.FIAT.upper()} is supported as "
+ f"Only {config.FIAT} is supported as "
"'Asset market price currency', since price fetching for "
"fiat currencies is not fully implemented yet."
)
@@ -1025,9 +1051,7 @@ def _read_bitpanda(self, file_path: Path) -> None:
# price = misc.force_decimal(asset_price)
# Calculated price
price_calc = change_fiat / change
- set_price_db(
- platform, asset, config.FIAT.upper(), utc_time, price_calc
- )
+ set_price_db(platform, asset, config.FIAT, utc_time, price_calc)
if change < 0:
log.error(
@@ -1047,7 +1071,7 @@ def _read_bitpanda(self, file_path: Path) -> None:
utc_time,
platform,
change_fiat,
- config.FIAT.upper(),
+ config.FIAT,
row,
file_path,
)
@@ -1057,7 +1081,7 @@ def _read_bitpanda(self, file_path: Path) -> None:
utc_time,
platform,
change_fiat,
- config.FIAT.upper(),
+ config.FIAT,
row,
file_path,
)
@@ -1073,6 +1097,113 @@ def _read_bitpanda(self, file_path: Path) -> None:
file_path,
)
+ def _read_custom_eur(self, file_path: Path) -> None:
+ fiat = "EUR"
+
+ with open(file_path, encoding="utf8") as f:
+ reader = csv.reader(f)
+
+ # Skip header.
+ next(reader)
+
+ for line in reader:
+ row = reader.line_num
+
+ # Skip empty lines.
+ if not line:
+ continue
+
+ (
+ operation_type,
+ _buy_quantity,
+ buy_asset,
+ _buy_value_in_fiat,
+ _sell_quantity,
+ sell_asset,
+ _sell_value_in_fiat,
+ _fee_quantity,
+ fee_asset,
+ _fee_value_in_fiat,
+ platform,
+ _timestamp,
+ remark,
+ ) = line
+
+ # Parse data.
+ try:
+ utc_time = datetime.datetime.strptime(
+ _timestamp, "%m/%d/%Y %H:%M:%S"
+ )
+ except ValueError:
+ utc_time = datetime.datetime.strptime(
+ _timestamp, "%m/%d/%Y %H:%M:%S.%f"
+ )
+ utc_time = utc_time.replace(tzinfo=datetime.timezone.utc)
+ buy_quantity = misc.xdecimal(_buy_quantity)
+ buy_value_in_fiat = misc.xdecimal(_buy_value_in_fiat)
+ sell_quantity = misc.xdecimal(_sell_quantity)
+ sell_value_in_fiat = misc.xdecimal(_sell_value_in_fiat)
+ fee_quantity = misc.xdecimal(_fee_quantity)
+ fee_value_in_fiat = misc.xdecimal(_fee_value_in_fiat)
+
+ # ... and define which operation to add.
+ add_operations: list[
+ tuple[str, decimal.Decimal, str, Optional[decimal.Decimal]]
+ ] = []
+ if operation_type != "Withdrawal":
+ assert buy_quantity
+ assert buy_asset
+
+ op = "Buy" if operation_type == "Trade" else operation_type
+ add_operations.append(
+ (op, buy_quantity, buy_asset, buy_value_in_fiat)
+ )
+
+ if operation_type != "Deposit":
+ assert sell_quantity
+ assert sell_asset
+
+ op = "Sell" if operation_type == "Trade" else operation_type
+ add_operations.append(
+ (op, sell_quantity, sell_asset, sell_value_in_fiat)
+ )
+
+ if fee_asset:
+ assert fee_quantity
+ assert fee_value_in_fiat
+
+ add_operations.append(
+ ("Fee", fee_quantity, fee_asset, fee_value_in_fiat)
+ )
+
+ for operation, change, coin, change_in_fiat in add_operations:
+ # Add operation to book.
+ self.append_operation(
+ operation,
+ utc_time,
+ platform,
+ change,
+ coin,
+ row,
+ file_path,
+ remark=remark,
+ )
+ # Add price from csv.
+ if change_in_fiat and coin != fiat:
+ price = change_in_fiat / change
+ log.debug(
+ f"Adding {fiat}/{coin} price from custom CSV: "
+ f"{price} for {platform} at {utc_time}"
+ )
+ set_price_db(
+ platform,
+ coin,
+ fiat,
+ utc_time,
+ price,
+ overwrite=True,
+ )
+
def detect_exchange(self, file_path: Path) -> Optional[str]:
if file_path.suffix == ".csv":
@@ -1087,6 +1218,7 @@ def detect_exchange(self, file_path: Path) -> Optional[str]:
"kraken_trades": 1,
"bitpanda_pro_trades": 4,
"bitpanda": 7,
+ "custom_eur": 1,
}
expected_headers = {
@@ -1203,6 +1335,21 @@ def detect_exchange(self, file_path: Path) -> Optional[str]:
"Spread",
"Spread Currency",
],
+ "custom_eur": [
+ "Type",
+ "Buy Quantity",
+ "Buy Asset",
+ "Buy Value in EUR",
+ "Sell Quantity",
+ "Sell Asset",
+ "Sell Value in EUR",
+ "Fee Quantity",
+ "Fee Asset",
+ "Fee Value in EUR",
+ "Wallet",
+ "Timestamp UTC",
+ "Note",
+ ],
}
with open(file_path, encoding="utf8") as f:
reader = csv.reader(f)
@@ -1219,6 +1366,94 @@ def detect_exchange(self, file_path: Path) -> Optional[str]:
return None
+ def resolve_deposits(self) -> None:
+ """Match withdrawals to deposits.
+
+ A match is found when:
+ A. The coin is the same and
+ B. The deposit amount is between 0.99 and 1 times the withdrawal amount.
+ """
+ transfer_operations = (
+ op for op in self.operations if isinstance(op, (tr.Deposit, tr.Withdrawal))
+ )
+ # Sort deposit and withdrawal operations by time so that deposits
+ # come after withdrawal.
+ sorted_transfer_operations = sorted(
+ transfer_operations,
+ key=lambda op: (isinstance(op, tr.Deposit), op.utc_time),
+ )
+
+ def is_match(withdrawal: tr.Withdrawal, deposit: tr.Deposit) -> bool:
+ return (
+ withdrawal.coin == deposit.coin
+ and withdrawal.change * decimal.Decimal(0.99)
+ <= deposit.change
+ <= withdrawal.change
+ )
+
+ withdrawal_queue: list[tr.Withdrawal] = []
+ unmatched_deposits: list[tr.Deposit] = []
+
+ for op in sorted_transfer_operations:
+ if op.coin == config.FIAT:
+ # Do not match home fiat deposit/withdrawals.
+ continue
+
+ if isinstance(op, tr.Withdrawal):
+ # Add new withdrawal to queue.
+ withdrawal_queue.append(op)
+
+ elif isinstance(op, tr.Deposit):
+ try:
+ # Find a matching withdrawal for this deposit.
+ # If multiple are found, take the first (regarding utc_time).
+ match = next(w for w in withdrawal_queue if is_match(w, op))
+ except StopIteration:
+ unmatched_deposits.append(op)
+ else:
+ # Match the found withdrawal and remove it from queue.
+ op.link = match
+ match.has_link = True
+ withdrawal_queue.remove(match)
+ log.debug(
+ "Linking withdrawal with deposit: "
+ f"{match.change} {match.coin} "
+ f"({match.platform}, {match.utc_time}) "
+ f"-> {op.change} {op.coin} "
+ f"({op.platform}, {op.utc_time})"
+ )
+
+ if unmatched_deposits:
+ log.warning(
+ "Unable to match all deposits with withdrawals. "
+ "Have you added all account statements? "
+ "Following deposits couldn't be matched:\n"
+ + (
+ "\n".join(
+ f" - {op.change} {op.coin} to {op.platform} at {op.utc_time}"
+ for op in unmatched_deposits
+ )
+ )
+ )
+ for op in unmatched_deposits:
+ op.remarks.append("Herkunft der Einzahlung unbekannt")
+ if withdrawal_queue:
+ log.warning(
+ "Unable to match all withdrawals with deposits. "
+ "Have you added all account statements? "
+ "Following withdrawals couldn't be matched:\n"
+ + (
+ "\n".join(
+ f" - {op.change} {op.coin} from {op.platform} at {op.utc_time}"
+ for op in withdrawal_queue
+ )
+ )
+ )
+ for op in withdrawal_queue:
+ op.remarks.append("Ziel der Auszahlung unbekannt")
+
+ log.info("Finished withdrawal/deposit matching")
+
def get_price_from_csv(self) -> None:
"""Calculate coin prices from buy/sell operations in CSV files.
@@ -1279,6 +1514,145 @@ def get_price_from_csv(self) -> None:
overwrite=True,
)
+ def merge_identical_operations(self) -> None:
+ grouped_ops = misc.group_by(self.operations, tr.Operation.identical_columns)
+ self.operations = [tr.Operation.merge(*ops) for ops in grouped_ops.values()]
+
+ def match_fees(self) -> None:
+ # Split operations in fees and other operations.
+ operations = []
+ all_fees: list[tr.Fee] = []
+
+ for op in self.operations:
+ if isinstance(op, tr.Fee):
+ all_fees.append(op)
+ else:
+ operations.append(op)
+
+ # Only keep none fee operations in book.
+ self.operations = operations
+
+ # Match fees to book operations.
+ for platform, _fees in misc.group_by(all_fees, "platform").items():
+ for utc_time, fees in misc.group_by(_fees, "utc_time").items():
+
+ # Find matching operations by platform and time.
+ matching_operations = {
+ idx: op
+ for idx, op in enumerate(self.operations)
+ if op.platform == platform and op.utc_time == utc_time
+ }
+
+ # Group matching operations in dict with
+ # { operation typename: list of indices }
+ t_op = collections.defaultdict(list)
+ for idx, op in matching_operations.items():
+ t_op[op.type_name].append(idx)
+
+ # Check if this is a buy/sell-pair.
+ # Fees might occure by other operation types,
+ # but this is currently not implemented.
+ is_buy_sell_pair = all(
+ (
+ len(matching_operations) == 2,
+ len(t_op[tr.Buy.type_name_c()]) == 1,
+ len(t_op[tr.Sell.type_name_c()]) == 1,
+ )
+ )
+ if is_buy_sell_pair:
+ # Fees have to be added to all buys and sells.
+ # 1. Fees on sells are the transaction cost,
+ # which might be fully tax relevant for this sell
+ # and which gets removed from the account balance
+ # 2. Fees on buys increase the buy-in price of the coins
+ # which is relevant when selling these (not buying)
+ (sell_idx,) = t_op[tr.Sell.type_name_c()]
+ (buy_idx,) = t_op[tr.Buy.type_name_c()]
+ assert self.operations[sell_idx].fees is None
+ assert self.operations[buy_idx].fees is None
+ self.operations[sell_idx].fees = fees
+ self.operations[buy_idx].fees = fees
+ else:
+ log.warning(
+ "Fee matching is not implemented for this case. "
+ "Your fees will be discarded and are not evaluated in "
+ "the tax evaluation.\n"
+ "Please create an Issue or PR.\n\n"
+ f"{matching_operations=}\n{fees=}"
+ )
+
+ def resolve_trades(self) -> None:
+ # Match trades which belong together (traded at same time).
+ for _, _operations in misc.group_by(self.operations, "platform").items():
+ for _, matching_operations in misc.group_by(
+ _operations, "utc_time"
+ ).items():
+ # Count matching operations by type with dict
+ # { operation typename: list of operations }
+ t_op = collections.defaultdict(list)
+ for op in matching_operations:
+ t_op[op.type_name].append(op)
+
+ # Check if this is a buy/sell-pair.
+ # Fees might occure by other operation types,
+ # but this is currently not implemented.
+ is_buy_sell_pair = all(
+ (
+ len(matching_operations) == 2,
+ len(t_op[tr.Buy.type_name_c()]) == 1,
+ len(t_op[tr.Sell.type_name_c()]) == 1,
+ )
+ )
+ if is_buy_sell_pair:
+ # Add link that this is a trade pair.
+ (buy_op,) = t_op[tr.Buy.type_name_c()]
+ assert isinstance(buy_op, tr.Buy)
+ (sell_op,) = t_op[tr.Sell.type_name_c()]
+ assert isinstance(sell_op, tr.Sell)
+ assert buy_op.link is None
+ assert buy_op.buying_cost is None
+ buy_op.link = sell_op
+ assert sell_op.link is None
+ assert sell_op.selling_value is None
+ sell_op.link = buy_op
+ continue
+
+ # Binance allows to convert small assets in one go to BNB.
+ # Our `merge_identical_column` function merges all BNB which
+ # gets bought at that time together.
+ # BUG Trade connection can not be established with our current
+ # method.
+ # Calculate the buying cost of this type of operation by all
+ # small asset sells.
+ is_binance_bnb_small_asset_transfer = all(
+ (
+ all(op.platform == "binance" for op in matching_operations),
+ len(t_op[tr.Buy.type_name_c()]) == 1,
+ len(t_op[tr.Sell.type_name_c()]) >= 1,
+ len(t_op.keys()) == 2,
+ )
+ )
+
+ if is_binance_bnb_small_asset_transfer:
+ (buy_op,) = t_op[tr.Buy.type_name_c()]
+ assert isinstance(buy_op, tr.Buy)
+ sell_ops = t_op[tr.Sell.type_name_c()]
+ assert all(isinstance(op, tr.Sell) for op in sell_ops)
+ assert buy_op.link is None
+ assert buy_op.buying_cost is None
+ buying_costs = [self.price_data.get_cost(op) for op in sell_ops]
+ buy_op.buying_cost = misc.dsum(buying_costs)
+ assert len(sell_ops) == len(buying_costs)
+ for sell_op, buying_cost in zip(sell_ops, buying_costs):
+ assert isinstance(sell_op, tr.Sell)
+ assert sell_op.link is None
+ assert sell_op.selling_value is None
+ percent = buying_cost / buy_op.buying_cost
+ sell_op.selling_value = self.price_data.get_partial_cost(
+ buy_op, percent
+ )
+ continue
+
def read_file(self, file_path: Path) -> None:
"""Import transactions form an account statement.
@@ -1303,7 +1677,10 @@ def read_file(self, file_path: Path) -> None:
log.info("Reading file from exchange %s at %s", exchange, file_path)
read_file(file_path)
- else:
+ elif file_path.suffix not in (
+ ".zip",
+ ".rar",
+ ):
log.warning(
f"Unable to detect the exchange of file `{file_path}`. "
"Skipping file."
diff --git a/src/config.py b/src/config.py
index f3cd15a..39edf3b 100644
--- a/src/config.py
+++ b/src/config.py
@@ -15,11 +15,13 @@
# along with this program. If not, see .
import configparser
-from datetime import datetime
+import locale
+import zoneinfo
from os import environ
from pathlib import Path
-from dateutil.relativedelta import relativedelta
+# Make sure, that module `tzdata` is installed.
+import tzdata # noqa: F401
import core
@@ -34,13 +36,21 @@
# General config
config = configparser.ConfigParser()
config.read(CONFIG_FILE)
-COUNTRY = core.Country[config["BASE"].get("COUNTRY", "GERMANY")]
+
+try:
+ COUNTRY = core.Country[config["BASE"].get("COUNTRY", "GERMANY")]
+except KeyError as e:
+ raise NotImplementedError(
+ f"Your country {e} is currently not supported. Please create an Issue or PR."
+ )
+
TAX_YEAR = int(config["BASE"].get("TAX_YEAR", "2021"))
REFETCH_MISSING_PRICES = config["BASE"].getboolean("REFETCH_MISSING_PRICES")
MEAN_MISSING_PRICES = config["BASE"].getboolean("MEAN_MISSING_PRICES")
CALCULATE_UNREALIZED_GAINS = config["BASE"].getboolean("CALCULATE_UNREALIZED_GAINS")
MULTI_DEPOT = config["BASE"].getboolean("MULTI_DEPOT")
LOG_LEVEL = config["BASE"].get("LOG_LEVEL", "INFO")
+ALL_AIRDROPS_ARE_GIFTS = config["BASE"].getboolean("ALL_AIRDROPS_ARE_GIFTS")
# Read in environmental variables.
if _env_country := environ.get("COUNTRY"):
@@ -57,13 +67,16 @@
if COUNTRY == core.Country.GERMANY:
FIAT_CLASS = core.Fiat.EUR
PRINCIPLE = core.Principle.FIFO
-
- def IS_LONG_TERM(buy: datetime, sell: datetime) -> bool:
- return buy + relativedelta(years=1) < sell
-
+ LOCAL_TIMEZONE = zoneinfo.ZoneInfo("CET")
+ LOCAL_TIMEZONE_KEY = "MEZ"
+ locale_str = "de_DE"
else:
- raise NotImplementedError(f"Your country {COUNTRY} is not supported.")
+ raise NotImplementedError(
+ f"Your country {COUNTRY} is currently not supported. "
+ "Please create an Issue or PR."
+ )
# Program specific constants.
FIAT = FIAT_CLASS.name # Convert to string.
+locale.setlocale(locale.LC_ALL, locale_str)
diff --git a/src/log_config.py b/src/log_config.py
index 5fe53e9..be84157 100644
--- a/src/log_config.py
+++ b/src/log_config.py
@@ -28,13 +28,14 @@
# Handler
ch = logging.StreamHandler()
+ch.setLevel(logging.DEBUG)
fh = logging.FileHandler(TMP_LOG_FILEPATH, "w")
+fh.setLevel(logging.WARNING)
# Formatter
formatter = logging.Formatter("%(asctime)s %(name)-12s %(levelname)-8s %(message)s")
for handler in (ch, fh):
- handler.setLevel(logging.DEBUG)
handler.setFormatter(formatter)
log.addHandler(handler)
diff --git a/src/main.py b/src/main.py
index 77a089c..4de1a62 100644
--- a/src/main.py
+++ b/src/main.py
@@ -39,9 +39,23 @@ def main() -> None:
log.warning("Stopping CoinTaxman.")
return
+ # Merge identical operations together, which makes it easier to match
+ # buy/sell to get prices from csv, match fees and reduces database access
+ # (as long as there are only one buy/sell pair per time,
+ # might be problematic otherwise).
+ book.merge_identical_operations()
+ # Resolve dependencies between withdrawals and deposits, which is
+ # necessary to correctly fetch prices and to calculate p/l.
+ book.resolve_deposits()
book.get_price_from_csv()
+ # Match fees with operations AND
+ # Resolve dependencies between sells and buys, which is
+ # necessary to correctly calculate the buying cost of a sold coin
+ book.match_fees()
+ book.resolve_trades()
+
taxman.evaluate_taxation()
- evaluation_file_path = taxman.export_evaluation_as_csv()
+ evaluation_file_path = taxman.export_evaluation_as_excel()
taxman.print_evaluation()
# Save log
@@ -49,6 +63,9 @@ def main() -> None:
log_config.shutdown()
os.rename(TMP_LOG_FILEPATH, log_file_path)
+ print(f"Detailed export saved at {evaluation_file_path} and {log_file_path}")
+ print("If you want to archive the evaluation, run `make archive`.")
+
if __name__ == "__main__":
main()
diff --git a/src/misc.py b/src/misc.py
index af80261..3d94af1 100644
--- a/src/misc.py
+++ b/src/misc.py
@@ -25,10 +25,12 @@
from typing import (
Any,
Callable,
+ Iterable,
Optional,
SupportsFloat,
SupportsInt,
Tuple,
+ Type,
TypeVar,
Union,
cast,
@@ -68,6 +70,33 @@ def xdecimal(
return None if x is None or x == "" else decimal.Decimal(x)
+def cdecimal(x: Union[None, str, int, float, decimal.Decimal]) -> decimal.Decimal:
+ """Convert to decimal and change None to 0.
+
+ See xdecimal for further informations.
+
+ Args:
+ x (Union[None, str, int, float, decimal.Decimal])
+
+ Returns:
+ decimal.Decimal
+ """
+ dec = xdecimal(x)
+ return decimal.Decimal() if dec is None else dec
+
+
+def dsum(__iterable: Iterable[decimal.Decimal]) -> decimal.Decimal:
+ """Builtin sum function, which always returns a decimal.Decimal.
+
+ Args:
+ __iterable (Iterable[decimal.Decimal])
+
+ Returns:
+ decimal.Decimal
+ """
+ return decimal.Decimal(sum(__iterable))
+
+
def force_decimal(x: Union[None, str, int, float, decimal.Decimal]) -> decimal.Decimal:
"""Convert to decimal, but make sure, that empty values raise an error.
@@ -172,22 +201,61 @@ def parse_iso_timestamp_to_decimal_timestamp(d: str) -> decimal.Decimal:
return to_decimal_timestamp(datetime.datetime.fromisoformat(d))
-def group_by(lst: L, key: str) -> dict[Any, L]:
+def group_by(lst: L, key: Union[str, list[str]]) -> dict[Any, L]:
"""Group a list of objects by `key`.
Args:
- lst (list)
- key (str)
+ lst (L)
+ key (Union[str, list[str]])
Returns:
- dict[Any, list]: Dict with different `key`as keys.
+ dict[Any, L]: Dict with different `key`as keys.
"""
d = collections.defaultdict(list)
- for e in lst:
- d[getattr(e, key)].append(e)
+ if isinstance(key, str):
+ for e in lst:
+ d[getattr(e, key)].append(e)
+ elif isinstance(key, list):
+ assert all(isinstance(k, str) for k in key)
+ for e in lst:
+ d[tuple(getattr(e, k) for k in key)].append(e)
+ else:
+ raise TypeError
return dict(d)
+T = TypeVar("T")
+
+
+def sort_by_order_and_key(
+ order: list[Type[T]],
+ list_: list[T],
+ keys: Optional[list[str]] = None,
+) -> list[T]:
+ """Sort a list by list of existing types and arbitrary keys/members.
+
+ If the type is missing in order list. The entry will be placed first.
+
+ Args:
+ order (list[Type[T]]): List with types in correct order.
+ operations (list[T]): List with entries to be sorted.
+ keys (list[str], optional): List of members which will be considered
+ when sorting. Defaults to None.
+
+ Returns:
+ list[T]: Sorted entries by `order` and specific keys.
+ """
+
+ def key_function(op: T) -> tuple:
+ try:
+ idx = order.index(type(op))
+ except ValueError:
+ idx = 0
+ return tuple(([getattr(op, key) for key in keys] if keys else []) + [idx])
+
+ return sorted(list_, key=key_function)
+
+
__delayed: dict[int, datetime.datetime] = {}
@@ -212,18 +280,20 @@ def wrapper(*args, **kwargs):
def is_fiat(symbol: Union[str, core.Fiat]) -> bool:
- """Check if `symbol` is a fiat.
+ """Check if `symbol` is a fiat currency.
Args:
- fiat (str): Currency Symbol.
+ fiat (Union[str, core.Fiat]): Currency Symbol.
Returns:
- bool: True if `symbol` is fiat. False otherwise.
+ bool: True if `symbol` is a fiat currency. False otherwise.
"""
return isinstance(symbol, core.Fiat) or symbol in core.Fiat.__members__
-def get_next_file_path(path: Path, base_filename: str, extension: str) -> Path:
+def get_next_file_path(
+ path: Path, base_filename: str, extensions: Union[str, list[str]]
+) -> Path:
"""Looking for the next free filename in format {base_filename}_revXXX.
The revision number starts with 001 and will always be +1 from the highest
@@ -232,7 +302,7 @@ def get_next_file_path(path: Path, base_filename: str, extension: str) -> Path:
Args:
path (Path)
base_filename (str)
- extension (str)
+ extension (Union[str, list[str]])
Raises:
AssertitionError: When {base_filename}_rev999.{extension}
@@ -242,7 +312,10 @@ def get_next_file_path(path: Path, base_filename: str, extension: str) -> Path:
Path: Path to next free file.
"""
i = 1
- regex = re.compile(base_filename + r"_rev(\d{3})." + extension)
+ extensions = [extensions] if isinstance(extensions, str) else extensions
+ extension = extensions[0]
+ regex = re.compile(base_filename + r"_rev(\d{3}).(" + "|".join(extensions) + ")")
+
for p in path.iterdir():
if p.is_file():
if m := regex.match(p.name):
@@ -267,3 +340,9 @@ def get_current_commit_hash(default: Optional[str] = None) -> str:
if default is None:
raise RuntimeError("Unable to determine commit hash") from e
return default
+
+
+def not_none(v: Optional[T]) -> T:
+ if v is None:
+ raise ValueError()
+ return v
diff --git a/src/patch_database.py b/src/patch_database.py
index 0b8ed29..c8121de 100644
--- a/src/patch_database.py
+++ b/src/patch_database.py
@@ -146,15 +146,29 @@ def __patch_002(db_path: Path) -> None:
for _utc_time, _price in list(cur.fetchall()):
# Convert the data.
- # Try non-fractional seconds first, then fractional seconds
- try:
- utc_time = datetime.datetime.strptime(
- _utc_time, "%Y-%m-%d %H:%M:%S%z"
- )
- except ValueError:
- utc_time = datetime.datetime.strptime(
- _utc_time, "%Y-%m-%d %H:%M:%S.%f%z"
+ # Try non-fractional seconds first, then fractional seconds,
+ # then the same without timezone
+ for dateformat in (
+ "%Y-%m-%d %H:%M:%S%z",
+ "%Y-%m-%d %H:%M:%S.%f%z",
+ "%Y-%m-%d %H:%M:%S",
+ "%Y-%m-%d %H:%M:%S.%f",
+ ):
+ try:
+ utc_time = datetime.datetime.strptime(_utc_time, dateformat)
+ except ValueError:
+ continue
+ else:
+ if not dateformat.endswith("%z"):
+ # Add the missing time zone information.
+ utc_time = utc_time.replace(tzinfo=None)
+ break
+ else:
+ raise ValueError(
+ f"Could not parse date `{_utc_time}` "
+ "in table `{tablename}`."
)
+
price = decimal.Decimal(_price)
set_price_db("", base_asset, quote_asset, utc_time, price, db_path)
conn.execute(f"DROP TABLE `{tablename}`;")
diff --git a/src/price_data.py b/src/price_data.py
index a38139d..59b239e 100644
--- a/src/price_data.py
+++ b/src/price_data.py
@@ -28,19 +28,13 @@
import config
import log_config
import misc
-import transaction
+import transaction as tr
from core import kraken_pair_map
from database import get_price_db, get_tablenames_from_db, mean_price_db, set_price_db
log = log_config.getLogger(__name__)
-# TODO Keep database connection open?
-# TODO Combine multiple exchanges in one file?
-# - Add a database for each exchange (added with ATTACH DATABASE)
-# - Tables in database stay the same
-
-
class FallbackPriceNotFound(Exception):
pass
@@ -590,7 +584,7 @@ def get_price(
try:
get_price = getattr(self, f"_get_price_{platform}")
except AttributeError:
- raise NotImplementedError("Unable to read data from %s", platform)
+ raise NotImplementedError(f"Unable to read data from {platform=}")
price = get_price(coin, utc_time, reference_coin, **kwargs)
assert isinstance(price, decimal.Decimal)
@@ -606,17 +600,25 @@ def get_price(
def get_cost(
self,
- tr: Union[transaction.Operation, transaction.SoldCoin],
+ op_sc: Union[tr.Operation, tr.SoldCoin],
reference_coin: str = config.FIAT,
) -> decimal.Decimal:
- op = tr if isinstance(tr, transaction.Operation) else tr.op
+ op = op_sc if isinstance(op_sc, tr.Operation) else op_sc.op
price = self.get_price(op.platform, op.coin, op.utc_time, reference_coin)
- if isinstance(tr, transaction.Operation):
- return price * tr.change
- if isinstance(tr, transaction.SoldCoin):
- return price * tr.sold
+ if isinstance(op_sc, tr.Operation):
+ return price * op_sc.change
+ if isinstance(op_sc, tr.SoldCoin):
+ return price * op_sc.sold
raise NotImplementedError
+ def get_partial_cost(
+ self,
+ op_sc: Union[tr.Operation, tr.SoldCoin],
+ percent: decimal.Decimal,
+ reference_coin: str = config.FIAT,
+ ) -> decimal.Decimal:
+ return percent * self.get_cost(op_sc, reference_coin=reference_coin)
+
def check_database(self):
stats = {}
diff --git a/src/taxman.py b/src/taxman.py
index edf26f0..7319f59 100644
--- a/src/taxman.py
+++ b/src/taxman.py
@@ -14,32 +14,53 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .
-import csv
+import collections
+import dataclasses
import datetime
import decimal
from pathlib import Path
-from typing import Optional, Type
+from typing import Any, Optional, Type, Union
+
+import xlsxwriter
+from dateutil.relativedelta import relativedelta
import balance_queue
import config
import core
import log_config
import misc
-import transaction
+import transaction as tr
from book import Book
+from database import get_sorted_tablename
from price_data import PriceData
log = log_config.getLogger(__name__)
+TAX_DEADLINE = min(
+ datetime.datetime.now().replace(tzinfo=config.LOCAL_TIMEZONE), # now
+ datetime.datetime(
+ config.TAX_YEAR, 12, 31, 23, 59, 59, tzinfo=config.LOCAL_TIMEZONE
+ ), # end of year
+)
+
+
+def in_tax_year(op: tr.Operation) -> bool:
+ return op.utc_time.year == config.TAX_YEAR
+
class Taxman:
def __init__(self, book: Book, price_data: PriceData) -> None:
self.book = book
self.price_data = price_data
- self.tax_events: list[transaction.TaxEvent] = []
- # Tax Events which would occur if all left over coins were sold now.
- self.virtual_tax_events: list[transaction.TaxEvent] = []
+ self.tax_report_entries: list[tr.TaxReportEntry] = []
+ self.multi_depot_portfolio: dict[
+ str, dict[str, decimal.Decimal]
+ ] = collections.defaultdict(lambda: collections.defaultdict(decimal.Decimal))
+ self.single_depot_portfolio: dict[
+ str, decimal.Decimal
+ ] = collections.defaultdict(decimal.Decimal)
+ self.unrealized_sells_faulty = False
# Determine used functions/classes depending on the config.
country = config.COUNTRY.name
@@ -48,6 +69,7 @@ def __init__(self, book: Book, price_data: PriceData) -> None:
except AttributeError:
raise NotImplementedError(f"Unable to evaluate taxation for {country=}.")
+ # Determine the BalanceType.
if config.PRINCIPLE == core.Principle.FIFO:
# Explicity define type for BalanceType on first declaration
# to avoid mypy errors.
@@ -61,324 +83,874 @@ def __init__(self, book: Book, price_data: PriceData) -> None:
f"Unable to evaluate taxation for {config.PRINCIPLE=}."
)
- def in_tax_year(self, op: transaction.Operation) -> bool:
- return op.utc_time.year == config.TAX_YEAR
+ self._balances: dict[Any, balance_queue.BalanceQueue] = {}
+
+ ###########################################################################
+ # Helper functions for balances
+ ###########################################################################
+
+ def balance(self, platform: str, coin: str) -> balance_queue.BalanceQueue:
+ key = (platform, coin) if config.MULTI_DEPOT else coin
+ try:
+ return self._balances[key]
+ except KeyError:
+ self._balances[key] = self.BalanceType(coin)
+ return self._balances[key]
+
+ def balance_op(self, op: tr.Operation) -> balance_queue.BalanceQueue:
+ balance = self.balance(op.platform, op.coin)
+ return balance
+
+ def add_to_balance(self, op: tr.Operation) -> None:
+ self.balance_op(op).add(op)
- def tax_deadline(self) -> datetime.datetime:
- return min(
- datetime.datetime(config.TAX_YEAR, 12, 31, 23, 59, 59),
- datetime.datetime.now(),
- ).astimezone()
+ def remove_from_balance(self, op: tr.Operation) -> list[tr.SoldCoin]:
+ return self.balance_op(op).remove(op)
- def _evaluate_taxation_GERMANY(
+ def remove_fees_from_balance(self, fees: Optional[list[tr.Fee]]) -> None:
+ if fees is not None:
+ for fee in fees:
+ self.balance_op(fee).remove_fee(fee)
+
+ ###########################################################################
+ # Country specific evaluation functions.
+ ###########################################################################
+
+ def _evaluate_fee(
self,
- coin: str,
- operations: list[transaction.Operation],
- ) -> None:
- balance = self.BalanceType()
-
- def evaluate_sell(
- op: transaction.Operation, force: bool = False
- ) -> Optional[transaction.TaxEvent]:
- # Remove coins from queue.
- sold_coins, unsold_coins = balance.sell(op.change)
-
- if coin == config.FIAT:
- # Not taxable.
- return None
-
- if unsold_coins:
- # Queue ran out of items to sell and not all coins
- # could be sold.
- log.error(
- f"{op.file_path.name}: Line {op.line}: "
- f"Not enough {coin} in queue to sell: "
- f"missing {unsold_coins} {coin} "
- f"(transaction from {op.utc_time} "
- f"on {op.platform})\n"
- "\tThis error occurs if your account statements "
- "have unmatched buy/sell positions.\n"
- "\tHave you added all your account statements "
- "of the last years?\n"
- "\tThis error may also occur after deposits "
- "from unknown sources.\n"
- )
- raise RuntimeError
-
- if not self.in_tax_year(op) and not force:
- # Sell is only taxable in the respective year.
- return None
-
- taxation_type = "Sonstige Einkünfte"
- # Price of the sell.
- sell_price = self.price_data.get_cost(op)
- taxed_gain = decimal.Decimal()
- real_gain = decimal.Decimal()
- # Coins which are older than (in this case) one year or
- # which come from an Airdrop, CoinLend or Commission (in an
- # foreign currency) will not be taxed.
- for sc in sold_coins:
- is_taxable = not config.IS_LONG_TERM(
- sc.op.utc_time, op.utc_time
- ) and not (
- isinstance(
- sc.op,
- (
- transaction.Airdrop,
- transaction.CoinLendInterest,
- transaction.StakingInterest,
- transaction.Commission,
- ),
- )
- and not sc.op.coin == config.FIAT
+ fee: tr.Fee,
+ percent: decimal.Decimal,
+ ) -> tuple[decimal.Decimal, str, decimal.Decimal]:
+ return (
+ fee.change * percent,
+ fee.coin,
+ self.price_data.get_partial_cost(fee, percent),
+ )
+
+ def get_buy_cost(self, sc: tr.SoldCoin) -> decimal.Decimal:
+ """Calculate the buy cost of a sold coin.
+
+ Args:
+ sc (tr.SoldCoin): The sold coin.
+
+ Raises:
+ NotImplementedError: Calculation is currently not implemented
+ for buy operations.
+
+ Returns:
+ decimal.Decimal: The buy value of the sold coin in fiat
+ """
+ assert sc.sold <= sc.op.change
+ percent = sc.sold / sc.op.change
+
+ # Fees paid when buying the now sold coins.
+ buying_fees = decimal.Decimal()
+ if sc.op.fees:
+ buying_fees = misc.dsum(
+ self.price_data.get_partial_cost(f, percent) for f in sc.op.fees
+ )
+
+ if isinstance(sc.op, tr.Buy):
+ # Buy cost of a bought coin should be the sell value of the
+ # previously sold coin and not the sell value of the bought coin.
+ # Gains of combinations like below are not correctly calculated:
+ # 1 BTC=1€, 1ETH=2€, 1BTC=1ETH
+ # e.g. buy 1 BTC for 1 €, buy 1 ETH for 1 BTC, buy 2 € for 1 ETH.
+ if sc.op.buying_cost:
+ buy_value = sc.op.buying_cost * percent
+ elif sc.op.link:
+ prev_sell_value = self.price_data.get_partial_cost(sc.op.link, percent)
+ buy_value = prev_sell_value
+ else:
+ log.warning(
+ "Unable to correctly determine buy cost of bought coins "
+ "because the link to the corresponding previous sell could "
+ "not be estalished. Buying cost will be set to the buy "
+ "value of the bought coins instead of the sell value of the "
+ "previously sold coins of the trade. "
+ "The calculated buy cost might be wrong. "
+ "This may lead to a false tax evaluation.\n"
+ f"{sc.op}"
)
- # Only calculate the gains if necessary.
- if is_taxable or config.CALCULATE_UNREALIZED_GAINS:
- partial_sell_price = (sc.sold / op.change) * sell_price
- sold_coin_cost = self.price_data.get_cost(sc)
- gain = partial_sell_price - sold_coin_cost
- if is_taxable:
- taxed_gain += gain
- if config.CALCULATE_UNREALIZED_GAINS:
- real_gain += gain
- remark = ", ".join(
- f"{sc.sold} from {sc.op.utc_time} " f"({sc.op.__class__.__name__})"
- for sc in sold_coins
+ buy_value = self.price_data.get_cost(sc)
+ else:
+ # All other operations "begin their existence" as that coin and
+ # weren't traded/exchanged before.
+ # The buy cost of these coins is the value from when yout got them.
+ buy_value = self.price_data.get_cost(sc)
+
+ return buy_value + buying_fees
+
+ def get_sell_value(
+ self,
+ op: tr.Sell,
+ sc: tr.SoldCoin,
+ ) -> decimal.Decimal:
+ """Calculate the sell value by determining the market price for the
+ with that sell bought coins.
+
+ Args:
+ sc (tr.SoldCoin): The sold coin.
+ ReportType (Union[Type[tr.SellReportEntry],
+ Type[tr.UnrealizedSellReportEntry]])
+
+ Returns:
+ decimal.Decimal: The sell value.
+ """
+ assert sc.op.coin == op.coin
+ percent = sc.sold / op.change
+
+ if op.selling_value:
+ sell_value = op.selling_value * percent
+ elif op.link:
+ sell_value = self.price_data.get_partial_cost(op.link, percent)
+ else:
+ sell_value = self.price_data.get_partial_cost(op, percent)
+
+ return sell_value
+
+ def _get_fee_param_dict(self, op: tr.Operation, percent: decimal.Decimal) -> dict:
+
+ # fee amount/coin/in_fiat
+ first_fee_amount = decimal.Decimal(0)
+ first_fee_coin = ""
+ first_fee_in_fiat = decimal.Decimal(0)
+ second_fee_amount = decimal.Decimal(0)
+ second_fee_coin = ""
+ second_fee_in_fiat = decimal.Decimal(0)
+ if op.fees is None or len(op.fees) == 0:
+ pass
+ elif len(op.fees) >= 1:
+ first_fee_amount, first_fee_coin, first_fee_in_fiat = self._evaluate_fee(
+ op.fees[0], percent
)
- return transaction.TaxEvent(
- taxation_type,
- taxed_gain,
- op,
- sell_price,
- real_gain,
- remark,
+ elif len(op.fees) >= 2:
+ second_fee_amount, second_fee_coin, second_fee_in_fiat = self._evaluate_fee(
+ op.fees[1], percent
)
+ else:
+ raise NotImplementedError("More than two fee coins are not supported")
+
+ return dict(
+ first_fee_amount=first_fee_amount,
+ first_fee_coin=first_fee_coin,
+ first_fee_in_fiat=first_fee_in_fiat,
+ second_fee_amount=second_fee_amount,
+ second_fee_coin=second_fee_coin,
+ second_fee_in_fiat=second_fee_in_fiat,
+ )
- for op in operations:
- if isinstance(op, transaction.Fee):
- balance.remove_fee(op.change)
- if self.in_tax_year(op):
- # Fees reduce taxed gain.
- taxation_type = "Sonstige Einkünfte"
- taxed_gain = -self.price_data.get_cost(op)
- tx = transaction.TaxEvent(taxation_type, taxed_gain, op)
- self.tax_events.append(tx)
- elif isinstance(op, transaction.CoinLend):
- pass
- elif isinstance(op, transaction.CoinLendEnd):
- pass
- elif isinstance(op, transaction.Staking):
- pass
- elif isinstance(op, transaction.StakingEnd):
- pass
- elif isinstance(op, transaction.Buy):
- balance.put(op)
- elif isinstance(op, transaction.Sell):
- if tx_ := evaluate_sell(op):
- self.tax_events.append(tx_)
- elif isinstance(
- op, (transaction.CoinLendInterest, transaction.StakingInterest)
- ):
- balance.put(op)
- if self.in_tax_year(op):
- if misc.is_fiat(coin):
- assert not isinstance(
- op, transaction.StakingInterest
- ), "You can not stake fiat currencies."
+ def _evaluate_sell(
+ self,
+ op: tr.Sell,
+ sc: tr.SoldCoin,
+ ReportType: Union[
+ Type[tr.SellReportEntry], Type[tr.UnrealizedSellReportEntry]
+ ] = tr.SellReportEntry,
+ ) -> None:
+ """Evaluate a (partial) sell operation.
+
+ Args:
+ op (tr.Sell): The general sell operation.
+ sc (tr.SoldCoin): The specific sold coins with their origin (sc.op).
+ `sc.sold` can be a partial sell of `op.change`.
+ ReportType (Union[Type[tr.SellReportEntry],
+ Type[tr.UnrealizedSellReportEntry]], optional):
+ The type of the report entry. Defaults to tr.SellReportEntry.
+
+ Raises:
+ NotImplementedError: When there are more than two different fee coins.
+ """
+ assert op.coin == sc.op.coin
+ assert op.change >= sc.sold
+
+ # Share the fees and sell_value proportionally to the coins sold.
+ percent = sc.sold / op.change
+
+ # Ignore fees for UnrealizedSellReportEntry.
+ fee_params = self._get_fee_param_dict(op, percent)
+ if ReportType is tr.UnrealizedSellReportEntry:
+ # Make sure, that the unrealized sell has no fees.
+ assert not any(v for v in fee_params.values())
+ # Do not give fee parameters to ReportEntry object.
+ fee_params = {}
+ buy_cost_in_fiat = self.get_buy_cost(sc)
+
+ # Taxable when sell is not more than one year after buy.
+ is_taxable = sc.op.utc_time + relativedelta(years=1) >= op.utc_time
+
+ try:
+ sell_value_in_fiat = self.get_sell_value(op, sc)
+ except Exception as e:
+ if ReportType is tr.UnrealizedSellReportEntry:
+ log.warning(
+ "Catched the following exception while trying to query an "
+ f"unrealized sell value for {sc.sold} {sc.op.coin} at deadline "
+ f"on platform {sc.op.platform}. "
+ "If you want to see your unrealized sell value "
+ "in the evaluation, please add a price by hand in the "
+ f"table {get_sorted_tablename(op.coin, config.FIAT)[0]} "
+ f"at {op.utc_time}; "
+ "The sell value for this calculation will be set to 0. "
+ "Your unrealized sell summary will be wrong and will not "
+ "be exported.\n"
+ f"Catched exception: {e}"
+ )
+ sell_value_in_fiat = decimal.Decimal()
+ self.unrealized_sells_faulty = True
+ else:
+ raise e
+
+ sell_report_entry = ReportType(
+ sell_platform=op.platform,
+ buy_platform=sc.op.platform,
+ amount=sc.sold,
+ coin=op.coin,
+ sell_utc_time=op.utc_time,
+ buy_utc_time=sc.op.utc_time,
+ **fee_params, # type: ignore[call-arg]
+ sell_value_in_fiat=sell_value_in_fiat,
+ buy_cost_in_fiat=buy_cost_in_fiat,
+ is_taxable=is_taxable,
+ taxation_type="Einkünfte aus privaten Veräußerungsgeschäften",
+ remark=op.remark,
+ )
+
+ self.tax_report_entries.append(sell_report_entry)
+
+ def evaluate_sell(
+ self,
+ op: tr.Sell,
+ sold_coins: list[tr.SoldCoin],
+ ) -> None:
+ assert op.coin != config.FIAT
+ assert in_tax_year(op)
+ assert op.change == misc.dsum(sc.sold for sc in sold_coins)
+
+ for sc in sold_coins:
+
+ if isinstance(sc.op, tr.Deposit) and sc.op.link:
+ assert (
+ sc.op.link.change >= sc.op.change
+ ), "Withdrawal must be equal or greater than the deposited amount."
+ deposit_fee = sc.op.link.change - sc.op.change
+ sold_percent = sc.sold / sc.op.change
+ sold_deposit_fee = deposit_fee * sold_percent
+
+ for wsc in sc.op.link.partial_withdrawn_coins(sold_percent):
+ wsc_percent = wsc.sold / sc.op.link.change
+ wsc_deposit_fee = sold_deposit_fee * wsc_percent
+
+ if wsc_deposit_fee:
+ # TODO Are withdrawal/deposit fees tax relevant?
+ log.warning(
+ "You paid fees for withdrawal and deposit of coins. "
+ "I am currently not sure if you can reduce your taxed "
+ "gain with these. For now, the deposit/withdrawal fees "
+ "are not included in the tax report. "
+ "Please open an issue or PR if you can resolve this."
+ )
+ # # Deposit fees are evaluated on deposited platform.
+ # wsc_fee_in_fiat = (
+ # self.price_data.get_price(
+ # sc.op.platform,
+ # sc.op.coin,
+ # sc.op.utc_time,
+ # config.FIAT,
+ # )
+ # * wsc_deposit_fee
+ # )
+
+ self._evaluate_sell(op, wsc)
+
+ else:
+
+ if isinstance(sc.op, tr.Deposit):
+ # Raise a warning when a deposit link is missing.
+ log.warning(
+ f"You sold {sc.op.change} {sc.op.coin} which were deposited "
+ f"from somewhere unknown onto {sc.op.platform} (see "
+ f"{sc.op.file_path} {sc.op.line}). "
+ "A correct tax evaluation might not be possible! "
+ "For now, we assume that the coins were bought at "
+ "the timestamp of the deposit. "
+ "If these coins get sold one year after this "
+ "the sell is not tax relevant and everything is fine."
+ )
+
+ self._evaluate_sell(op, sc)
+
+ def _evaluate_taxation_GERMANY(self, op: tr.Operation) -> None:
+ report_entry: tr.TaxReportEntry
+
+ if isinstance(op, (tr.CoinLend, tr.Staking)):
+ # TODO determine which coins get lended/etc., use fifo if it's
+ # unclear. it might be worth to optimize the order
+ # of coins given away (is this legal?)
+ # TODO mark them as currently lended/etc., so they don't get sold
+ pass
+
+ elif isinstance(op, (tr.CoinLendEnd, tr.StakingEnd)):
+ # TODO determine which coins come back from lending/etc. use fifo
+ # if it's unclear; it might be nice to match start and
+ # end of these operations like deposit and withdrawal operations.
+ # e.g.
+ # - lending 1 coin for 2 months
+ # - lending 2 coins for 1 month
+ # - getting back 2 coins from lending
+ # --> this should be the second and third coin,
+ # not the first and second
+ # TODO mark them as not lended/etc. anymore, so they could be sold
+ # again
+ # TODO Add Lending/Staking TaxReportEntry (duration of lend)
+ # TODO maybe add total accumulated fees?
+ # might be impossible to match CoinInterest with CoinLend periods
+ pass
+
+ elif isinstance(op, tr.Buy):
+ # Buys and sells always come in a pair. The buying/receiving
+ # part is not tax relevant per se.
+ # The fees of this buy/sell-transaction are saved internally in
+ # both operations. The "buying fees" are only relevant when
+ # detemining the acquisition cost of the bought coins.
+ # For now we'll just add our bought coins to the balance.
+ self.add_to_balance(op)
+
+ # TODO Only adding the Buys don't bring that much. We should add
+ # all trades instead (buy with buy.link)
+ # Add to export for informational purpose.
+ # if in_tax_year(op):
+ # fee_params = self._get_fee_param_dict(op, decimal.Decimal(1))
+ # tax_report_entry = tr.BuyReportEntry(
+ # platform=op.platform,
+ # amount=op.change,
+ # coin=op.coin,
+ # utc_time=op.utc_time,
+ # **fee_params,
+ # buy_value_in_fiat=self.price_data.get_cost(op),
+ # remark=op.remark,
+ # )
+ # self.tax_report_entries.append(tax_report_entry)
+
+ elif isinstance(op, tr.Sell):
+ # Buys and sells always come in a pair. The selling/redeeming
+ # time is tax relevant.
+ # Remove the sold coins and paid fees from the balance.
+ # Evaluate the sell to determine the taxed gain and other relevant
+ # informations for the tax declaration.
+ sold_coins = self.remove_from_balance(op)
+ self.remove_fees_from_balance(op.fees)
+
+ if op.coin != config.FIAT and in_tax_year(op):
+ self.evaluate_sell(op, sold_coins)
+
+ elif isinstance(op, (tr.CoinLendInterest, tr.StakingInterest)):
+ # Received coins from lending or staking. Add the received coins
+ # to the balance.
+ self.add_to_balance(op)
+
+ if in_tax_year(op):
+ # Determine the taxation type depending on the received coin.
+ if isinstance(op, tr.CoinLendInterest):
+ if misc.is_fiat(op.coin):
+ ReportType = tr.InterestReportEntry
taxation_type = "Einkünfte aus Kapitalvermögen"
else:
+ ReportType = tr.LendingInterestReportEntry
taxation_type = "Einkünfte aus sonstigen Leistungen"
- taxed_gain = self.price_data.get_cost(op)
- tx = transaction.TaxEvent(taxation_type, taxed_gain, op)
- self.tax_events.append(tx)
- elif isinstance(op, transaction.Airdrop):
- balance.put(op)
- elif isinstance(op, transaction.Commission):
- balance.put(op)
- if self.in_tax_year(op):
+ elif isinstance(op, tr.StakingInterest):
+ ReportType = tr.StakingInterestReportEntry
taxation_type = "Einkünfte aus sonstigen Leistungen"
- taxed_gain = self.price_data.get_cost(op)
- tx = transaction.TaxEvent(taxation_type, taxed_gain, op)
- self.tax_events.append(tx)
- elif isinstance(op, transaction.Deposit):
- if coin != config.FIAT:
- log.warning(
- f"Unresolved deposit of {op.change} {coin} "
- f"on {op.platform} at {op.utc_time}. "
- "The evaluation might be wrong."
- )
- elif isinstance(op, transaction.Withdrawal):
- if coin != config.FIAT:
- log.warning(
- f"Unresolved withdrawal of {op.change} {coin} "
- f"from {op.platform} at {op.utc_time}. "
- "The evaluation might be wrong."
- )
+ else:
+ raise NotImplementedError
+
+ report_entry = ReportType(
+ platform=op.platform,
+ amount=op.change,
+ coin=op.coin,
+ utc_time=op.utc_time,
+ interest_in_fiat=self.price_data.get_cost(op),
+ taxation_type=taxation_type,
+ remark=op.remark,
+ )
+ self.tax_report_entries.append(report_entry)
+
+ elif isinstance(op, tr.Airdrop):
+ # Depending on how you received the coins, the taxation varies.
+ # If you didn't "do anything" to get the coins, the airdrop counts
+ # as a gift.
+ self.add_to_balance(op)
+
+ if in_tax_year(op):
+ if config.ALL_AIRDROPS_ARE_GIFTS:
+ taxation_type = "Schenkung"
+ else:
+ taxation_type = "Einkünfte aus sonstigen Leistungen"
+ report_entry = tr.AirdropReportEntry(
+ platform=op.platform,
+ amount=op.change,
+ coin=op.coin,
+ utc_time=op.utc_time,
+ in_fiat=self.price_data.get_cost(op),
+ taxation_type=taxation_type,
+ remark=op.remark,
+ )
+ self.tax_report_entries.append(report_entry)
+
+ elif isinstance(op, tr.Commission):
+ # You received a commission. It is assumed that his is a customer-
+ # recruit-customer-bonus which is taxed as `Einkünfte aus sonstigen
+ # Leistungen`.
+ self.add_to_balance(op)
+
+ if in_tax_year(op):
+ report_entry = tr.CommissionReportEntry(
+ platform=op.platform,
+ amount=op.change,
+ coin=op.coin,
+ utc_time=op.utc_time,
+ in_fiat=self.price_data.get_cost(op),
+ taxation_type="Einkünfte aus sonstigen Leistungen",
+ remark=op.remark,
+ )
+ self.tax_report_entries.append(report_entry)
+
+ elif isinstance(op, tr.Deposit):
+ # Coins get deposited onto this platform/balance.
+ self.add_to_balance(op)
+
+ if op.link:
+ assert op.coin == op.link.coin
+ assert op.fees is None
+ first_fee_amount = op.link.change - op.change
+ first_fee_coin = op.coin if first_fee_amount else ""
+ first_fee_in_fiat = (
+ self.price_data.get_price(op.platform, op.coin, op.utc_time)
+ if first_fee_amount
+ else decimal.Decimal()
+ )
+ report_entry = tr.TransferReportEntry(
+ first_platform=op.platform,
+ second_platform=op.link.platform,
+ amount=op.change,
+ coin=op.coin,
+ first_utc_time=op.utc_time,
+ second_utc_time=op.link.utc_time,
+ first_fee_amount=first_fee_amount,
+ first_fee_coin=first_fee_coin,
+ first_fee_in_fiat=first_fee_in_fiat,
+ remark=op.remark,
+ )
else:
- raise NotImplementedError
+ assert op.fees is None
+ report_entry = tr.DepositReportEntry(
+ platform=op.platform,
+ amount=op.change,
+ coin=op.coin,
+ utc_time=op.utc_time,
+ first_fee_amount=decimal.Decimal(),
+ first_fee_coin="",
+ first_fee_in_fiat=decimal.Decimal(),
+ remark=op.remark,
+ )
- # Check that all relevant positions were considered.
- if balance.buffer_fee:
- log.warning(
- "Balance has outstanding fees which were not considered: "
- f"{balance.buffer_fee} {coin}"
- )
+ self.tax_report_entries.append(report_entry)
+
+ elif isinstance(op, tr.Withdrawal):
+ # Coins get moved to somewhere else. At this point, we only have
+ # to remove them from the corresponding balance.
+ op.withdrawn_coins = self.remove_from_balance(op)
+
+ if not op.has_link:
+ assert op.fees is None
+ report_entry = tr.WithdrawalReportEntry(
+ platform=op.platform,
+ amount=op.change,
+ coin=op.coin,
+ utc_time=op.utc_time,
+ first_fee_amount=decimal.Decimal(),
+ first_fee_coin="",
+ first_fee_in_fiat=decimal.Decimal(),
+ remark=op.remark,
+ )
+ self.tax_report_entries.append(report_entry)
- # Calculate the amount of coins which should be left on the platform
- # and evaluate the (taxed) gain, if the coin would be sold right now.
- if config.CALCULATE_UNREALIZED_GAINS and (
- (left_coin := sum(((bop.op.change - bop.sold) for bop in balance.queue)))
- ):
- assert isinstance(left_coin, decimal.Decimal)
- # Calculate unrealized gains for the last time of `TAX_YEAR`.
- # If we are currently in ´TAX_YEAR` take now.
- virtual_sell = transaction.Sell(
- self.tax_deadline(),
- op.platform,
- left_coin,
- coin,
- -1,
- Path(""),
- )
- if tx_ := evaluate_sell(virtual_sell, force=True):
- self.virtual_tax_events.append(tx_)
+ else:
+ raise NotImplementedError
- def _evaluate_taxation_per_coin(
- self,
- operations: list[transaction.Operation],
- ) -> None:
- """Evaluate the taxation for a list of operations per coin using
- country specific functions.
+ def _evaluate_unrealized_sells(self) -> None:
+ """Evaluate the unrealized sells at taxation deadline."""
+ for balance in self._balances.values():
+ # Get all left over coins from the balance.
+ sold_coins = balance.remove_all()
+ for sc in sold_coins:
+ # Sum up the portfolio at deadline.
+ # If the evaluation was done with a virtual single depot,
+ # the values per platform might not match the real values at
+ # platform.
+ self.multi_depot_portfolio[sc.op.platform][sc.op.coin] += sc.sold
+ self.single_depot_portfolio[sc.op.coin] += sc.sold
+
+ if sc.op.coin != config.FIAT:
+ # "Sell" these coins which makes it possible to calculate
+ # the unrealized gain afterwards.
+ unrealized_sell = tr.Sell(
+ utc_time=TAX_DEADLINE,
+ platform=sc.op.platform,
+ change=sc.sold,
+ coin=sc.op.coin,
+ line=[-1],
+ file_path=Path(),
+ fees=None,
+ )
+ self._evaluate_sell(
+ unrealized_sell,
+ sc,
+ ReportType=tr.UnrealizedSellReportEntry,
+ )
- Args:
- operations (list[transaction.Operation])
- """
- for coin, coin_operations in misc.group_by(operations, "coin").items():
- coin_operations = transaction.sort_operations(coin_operations, ["utc_time"])
- self.__evaluate_taxation(coin, coin_operations)
+ ###########################################################################
+ # General tax evaluation functions.
+ ###########################################################################
def evaluate_taxation(self) -> None:
- """Evaluate the taxation using country specific function."""
+ """Evaluate the taxation using country specific functions."""
log.debug("Starting evaluation...")
assert all(
op.utc_time.year <= config.TAX_YEAR for op in self.book.operations
), "For tax evaluation, no operation should happen after the tax year."
- if config.MULTI_DEPOT:
- # Evaluate taxation separated by platforms and coins.
- for _, operations in misc.group_by(
- self.book.operations, "platform"
- ).items():
- self._evaluate_taxation_per_coin(operations)
- else:
- # Evaluate taxation separated by coins in a single virtual depot.
- self._evaluate_taxation_per_coin(self.book.operations)
+ # Sort the operations by time.
+ operations = tr.sort_operations(self.book.operations, ["utc_time"])
+
+ # Evaluate the operations one by one.
+ # Difference between the config.MULTI_DEPOT and "single depot" method
+ # is done by keeping balances per platform and coin or only
+ # per coin (see self.balance).
+ for operation in operations:
+ self.__evaluate_taxation(operation)
+
+ # Make sure, that all fees were paid.
+ for balance in self._balances.values():
+ balance.sanity_check()
+
+ # Evaluate the balance at deadline to calculate unrealized sells.
+ if config.CALCULATE_UNREALIZED_GAINS:
+ self._evaluate_unrealized_sells()
+
+ ###########################################################################
+ # Export / Summary
+ ###########################################################################
def print_evaluation(self) -> None:
"""Print short summary of evaluation to stdout."""
- eval_str = "Evaluation:\n\n"
-
- # Summarize the tax evaluation.
- if self.tax_events:
- eval_str += f"Your tax evaluation for {config.TAX_YEAR}:\n"
- for taxation_type, tax_events in misc.group_by(
- self.tax_events, "taxation_type"
- ).items():
- taxed_gains = sum(tx.taxed_gain for tx in tax_events)
- eval_str += f"{taxation_type}: {taxed_gains:.2f} {config.FIAT}\n"
- else:
- eval_str += (
- "Either the evaluation has not run or there are no tax events "
- f"for {config.TAX_YEAR}.\n"
- )
-
- # Summarize the virtual sell, if all left over coins would be sold right now.
- if self.virtual_tax_events:
- assert config.CALCULATE_UNREALIZED_GAINS
- latest_operation = max(
- self.virtual_tax_events, key=lambda tx: tx.op.utc_time
+ eval_str = (
+ f"Your tax evaluation for {config.TAX_YEAR} "
+ f"(Deadline {TAX_DEADLINE.strftime('%d.%m.%Y')}):\n\n"
+ )
+ for taxation_type, tax_report_entries in misc.group_by(
+ self.tax_report_entries, "taxation_type"
+ ).items():
+ if taxation_type is None:
+ continue
+ taxable_gain = misc.dsum(
+ tre.taxable_gain_in_fiat
+ for tre in tax_report_entries
+ if not isinstance(tre, tr.UnrealizedSellReportEntry)
)
- lo_date = latest_operation.op.utc_time.strftime("%d.%m.%y")
+ eval_str += f"{taxation_type}: {taxable_gain:.2f} {config.FIAT}\n"
+
+ unrealized_report_entries = [
+ tre
+ for tre in self.tax_report_entries
+ if isinstance(tre, tr.UnrealizedSellReportEntry)
+ ]
+ assert all(tre.gain_in_fiat is not None for tre in unrealized_report_entries)
+ unrealized_gain = misc.dsum(
+ misc.not_none(tre.gain_in_fiat) for tre in unrealized_report_entries
+ )
+ unrealized_taxable_gain = misc.dsum(
+ tre.taxable_gain_in_fiat for tre in unrealized_report_entries
+ )
- invsted = sum(tx.sell_price for tx in self.virtual_tax_events)
- real_gains = sum(tx.real_gain for tx in self.virtual_tax_events)
- taxed_gains = sum(tx.taxed_gain for tx in self.virtual_tax_events)
- eval_str += "\n"
+ if config.CALCULATE_UNREALIZED_GAINS:
eval_str += (
- f"Deadline {config.TAX_YEAR}: {lo_date}\n"
- f"You were invested with {invsted:.2f} {config.FIAT}.\n"
- f"If you would have sold everything then, "
- f"you would have realized {real_gains:.2f} {config.FIAT} gains "
- f"({taxed_gains:.2f} {config.FIAT} taxed gain).\n"
+ "----------------------------------------\n"
+ f"Unrealized gain: {unrealized_gain:.2f} {config.FIAT}\n"
+ "Unrealized taxable gain at deadline: "
+ f"{unrealized_taxable_gain:.2f} {config.FIAT}\n"
+ "----------------------------------------\n"
+ f"Your portfolio on {TAX_DEADLINE.strftime('%x')} was:\n"
)
- eval_str += "\n"
- eval_str += f"Your portfolio on {lo_date} was:\n"
- for tx in sorted(
- self.virtual_tax_events,
- key=lambda tx: tx.sell_price,
- reverse=True,
- ):
- eval_str += (
- f"{tx.op.platform}: "
- f"{tx.op.change:.6f} {tx.op.coin} > "
- f"{tx.sell_price:.2f} {config.FIAT} "
- f"({tx.real_gain:.2f} gain, {tx.taxed_gain:.2f} taxed gain)\n"
- )
+ if config.MULTI_DEPOT:
+ for platform, platform_portfolio in self.multi_depot_portfolio.items():
+ for coin, amount in platform_portfolio.items():
+ eval_str += f"{platform} {coin}: {amount:.2f}\n"
+ else:
+ for coin, amount in self.single_depot_portfolio.items():
+ eval_str += f"{coin}: {amount:.2f}\n"
log.info(eval_str)
- def export_evaluation_as_csv(self) -> Path:
- """Export detailed summary of all tax events to CSV.
+ def export_evaluation_as_excel(self) -> Path:
+ """Export detailed summary of all tax events to Excel.
File will be placed in export/ with ascending revision numbers
(in case multiple evaluations will be done).
- When no tax events occured, the CSV will be exported only with
- a header line.
+ When no tax events occured, the Excel will be exported only with
+ a header line and a general sheet.
Returns:
Path: Path to the exported file.
"""
file_path = misc.get_next_file_path(
- config.EXPORT_PATH, str(config.TAX_YEAR), "csv"
+ config.EXPORT_PATH, str(config.TAX_YEAR), ["xlsx", "log"]
+ )
+ wb = xlsxwriter.Workbook(file_path, {"remove_timezone": True})
+ datetime_format = wb.add_format({"num_format": "dd.mm.yyyy hh:mm;@"})
+ date_format = wb.add_format({"num_format": "dd.mm.yyyy;@"})
+ change_format = wb.add_format({"num_format": "#,##0.00000000"})
+ fiat_format = wb.add_format({"num_format": "#,##0.00"})
+ header_format = wb.add_format(
+ {
+ "bold": True,
+ "border": 5,
+ "align": "center",
+ "valign": "vcenter",
+ "text_wrap": True,
+ }
)
- with open(file_path, "w", newline="", encoding="utf8") as f:
- writer = csv.writer(f)
- # Add embedded metadata info
- writer.writerow(
- ["# software", "CoinTaxman "]
+ def get_format(field: dataclasses.Field) -> Optional[xlsxwriter.format.Format]:
+ if field.type in ("datetime.datetime", "Optional[datetime.datetime]"):
+ return datetime_format
+ if field.type in ("decimal.Decimal", "Optional[decimal.Decimal]"):
+ if field.name.endswith("in_fiat"):
+ return fiat_format
+ return change_format
+ return None
+
+ #
+ # General
+ #
+ last_day = TAX_DEADLINE.date()
+ first_day = last_day.replace(month=1, day=1)
+ time_period = f"{first_day.strftime('%x')}–{last_day.strftime('%x')}"
+ ws_general = wb.add_worksheet("Allgemein")
+ row = 0
+ ws_general.merge_range(row, 0, 0, 1, "Allgemeine Daten", header_format)
+ row += 1
+ ws_general.write_row(
+ row, 0, ["Zeitraum des Steuerberichts", time_period], date_format
+ )
+ row += 1
+ ws_general.write_row(
+ row, 0, ["Verbrauchsfolgeverfahren", config.PRINCIPLE.name], date_format
+ )
+ row += 1
+ ws_general.write_row(
+ row,
+ 0,
+ ["Methode", "Multi-Depot" if config.MULTI_DEPOT else "Single-Depot"],
+ date_format,
+ )
+ row += 1
+ ws_general.write_row(row, 0, ["Alle Zeiten in", config.LOCAL_TIMEZONE_KEY])
+ row += 1
+ ws_general.write_row(
+ row,
+ 0,
+ ["Erstellt am", datetime.datetime.now(config.LOCAL_TIMEZONE)],
+ datetime_format,
+ )
+ row += 1
+ ws_general.write_row(
+ row, 0, ["Software", "CoinTaxman "]
+ )
+ row += 1
+ commit_hash = misc.get_current_commit_hash(default="undetermined")
+ ws_general.write_row(row, 0, ["Version (Commit)", commit_hash])
+ row += 1
+ # Set column format and freeze first row.
+ ws_general.set_column(0, 0, 26)
+ ws_general.set_column(1, 1, 21)
+ ws_general.freeze_panes(1, 0)
+
+ #
+ # Add summary of tax relevant amounts.
+ #
+ ws_summary = wb.add_worksheet("Zusammenfassung")
+ ws_summary.write_row(
+ 0,
+ 0,
+ [
+ "Einkunftsart",
+ "steuerbarer Veräußerungserlös in EUR",
+ "steuerbare Anschaffungskosten in EUR",
+ "steuerbare Werbungskosten in EUR",
+ "steuerbarer Gewinn/Verlust in EUR",
+ ],
+ header_format,
+ )
+ row = 1
+ for taxation_type, tax_report_entries in misc.group_by(
+ self.tax_report_entries, "taxation_type"
+ ).items():
+ if taxation_type is None:
+ continue
+ first_value_in_fiat = None
+ second_value_in_fiat = None
+ total_fee_in_fiat = None
+ if taxation_type == "Einkünfte aus privaten Veräußerungsgeschäften":
+ first_value_in_fiat = misc.dsum(
+ misc.cdecimal(tre.first_value_in_fiat)
+ for tre in tax_report_entries
+ if tre.taxable_gain_in_fiat
+ and not isinstance(tre, tr.UnrealizedSellReportEntry)
+ )
+ second_value_in_fiat = misc.dsum(
+ misc.cdecimal(tre.second_value_in_fiat)
+ for tre in tax_report_entries
+ if (
+ not isinstance(tre, tr.UnrealizedSellReportEntry)
+ and tre.taxable_gain_in_fiat
+ )
+ )
+ total_fee_in_fiat = misc.dsum(
+ misc.cdecimal(tre.total_fee_in_fiat)
+ for tre in tax_report_entries
+ if (
+ not isinstance(tre, tr.UnrealizedSellReportEntry)
+ and tre.taxable_gain_in_fiat
+ )
+ )
+ taxable_gain = misc.dsum(
+ tre.taxable_gain_in_fiat
+ for tre in tax_report_entries
+ if not isinstance(tre, tr.UnrealizedSellReportEntry)
+ )
+ ws_summary.write_row(
+ row,
+ 0,
+ [
+ taxation_type,
+ first_value_in_fiat,
+ second_value_in_fiat,
+ total_fee_in_fiat,
+ taxable_gain,
+ ],
)
- commit_hash = misc.get_current_commit_hash(default="undetermined")
- writer.writerow(["# commit", commit_hash])
- writer.writerow(["# updated", datetime.date.today().strftime("%x")])
-
- header = [
- "Date",
- "Taxation Type",
- f"Taxed Gain in {config.FIAT}",
- "Action",
- "Amount",
- "Asset",
- f"Sell Price in {config.FIAT}",
- "Remark",
+ row += 1
+ row += 2
+ if not self.unrealized_sells_faulty:
+ ws_summary.merge_range(
+ row, 0, row, 4, "Unrealisierte Einkünfte", header_format
+ )
+ ws_summary.write_row(
+ row + 1,
+ 0,
+ [
+ "Einkunftsart",
+ "Unrealisierter Veräußerungserlös in EUR",
+ "steuerbare Anschaffungskosten in EUR",
+ "Unrealisierter Gewinn/Verlust in EUR",
+ "davon wären steuerbar in EUR",
+ ],
+ header_format,
+ )
+ taxation_type = "Einkünfte aus privaten Veräußerungsgeschäften"
+ unrealized_report_entries = [
+ tre
+ for tre in self.tax_report_entries
+ if isinstance(tre, tr.UnrealizedSellReportEntry)
]
- writer.writerow(header)
- # Tax events are currently sorted by coin. Sort by time instead.
- for tx in sorted(self.tax_events, key=lambda tx: tx.op.utc_time):
- line = [
- tx.op.utc_time,
- tx.taxation_type,
- tx.taxed_gain,
- tx.op.__class__.__name__,
- tx.op.change,
- tx.op.coin,
- tx.sell_price,
- tx.remark,
- ]
- writer.writerow(line)
+ assert all(
+ taxation_type == tre.taxation_type for tre in unrealized_report_entries
+ )
+ assert all(
+ tre.gain_in_fiat is not None for tre in unrealized_report_entries
+ )
+ first_value_in_fiat = misc.dsum(
+ misc.cdecimal(tre.first_value_in_fiat)
+ for tre in tax_report_entries
+ if isinstance(tre, tr.UnrealizedSellReportEntry)
+ )
+ second_value_in_fiat = misc.dsum(
+ misc.cdecimal(tre.second_value_in_fiat)
+ for tre in tax_report_entries
+ if isinstance(tre, tr.UnrealizedSellReportEntry)
+ )
+ total_gain_fiat = misc.dsum(
+ misc.cdecimal(tre.gain_in_fiat)
+ for tre in tax_report_entries
+ if isinstance(tre, tr.UnrealizedSellReportEntry)
+ )
+ taxable_gain = misc.dsum(
+ tre.taxable_gain_in_fiat
+ for tre in tax_report_entries
+ if isinstance(tre, tr.UnrealizedSellReportEntry)
+ )
+ ws_summary.write_row(
+ row + 2,
+ 0,
+ [
+ taxation_type,
+ first_value_in_fiat,
+ second_value_in_fiat,
+ total_gain_fiat,
+ taxable_gain,
+ ],
+ )
+ # Set column format and freeze first row.
+ ws_summary.set_column(0, 0, 43)
+ ws_summary.set_column(1, 2, 18.29, fiat_format)
+ ws_summary.set_column(3, 4, 15.57, fiat_format)
+ ws_summary.freeze_panes(1, 0)
+
+ #
+ # Sheets per ReportType
+ #
+ for event_type, tax_report_entries in misc.group_by(
+ tr.sort_tax_report_entries(self.tax_report_entries), "event_type"
+ ).items():
+ ReportType = type(tax_report_entries[0])
+
+ if (
+ self.unrealized_sells_faulty
+ and ReportType is tr.UnrealizedSellReportEntry
+ ):
+ continue
+
+ ws = wb.add_worksheet(event_type)
+
+ # Header
+ labels = ReportType.excel_labels()
+ ws.write_row(0, 0, labels, header_format)
+ # Set height
+ ws.set_row(0, 45)
+ ws.autofilter(0, 0, 0, len(labels) - 1)
+
+ # Data
+ for row, entry in enumerate(tax_report_entries, 1):
+ ws.write_row(row, 0, entry.excel_values())
+
+ # Set column format and freeze first row.
+ for col, (field, width, hidden) in enumerate(
+ ReportType.excel_field_and_width()
+ ):
+ cell_format = get_format(field)
+ ws.set_column(
+ col,
+ col,
+ width,
+ cell_format,
+ dict(hidden=hidden),
+ )
+ ws.freeze_panes(1, 0)
+ wb.close()
log.info("Saved evaluation in %s.", file_path)
return file_path
diff --git a/src/transaction.py b/src/transaction.py
index 1c3c4fb..2c92e91 100644
--- a/src/transaction.py
+++ b/src/transaction.py
@@ -19,22 +19,53 @@
import dataclasses
import datetime
import decimal
+import itertools
import typing
+from copy import copy
from pathlib import Path
+from typing import ClassVar, Iterator, Optional
+import config
import log_config
+import misc
log = log_config.getLogger(__name__)
+# TODO Implementation might be cleaner, when we add a class AbstractOperation
+# which gets inherited by Fee and Operation
+# Currently it might be possible for fees to have fees, which is unwanted.
+
+
@dataclasses.dataclass
class Operation:
utc_time: datetime.datetime
platform: str
change: decimal.Decimal
coin: str
- line: int
+ line: list[int]
file_path: Path
+ fees: "Optional[list[Fee]]" = None
+ remarks: list[str] = dataclasses.field(default_factory=list)
+
+ @property
+ def remark(self) -> str:
+ return ", ".join(self.remarks)
+
+ @classmethod
+ def type_name_c(cls) -> str:
+ return cls.__name__
+
+ @property
+ def type_name(self) -> str:
+ return self.type_name_c()
+
+ identical_columns: ClassVar[list[str]] = [
+ "type_name",
+ "utc_time",
+ "platform",
+ "coin",
+ ]
def __post_init__(self):
assert self.validate_types()
@@ -50,14 +81,27 @@ def validate_types(self) -> bool:
# (without parameters)
continue
+ actual_value = getattr(self, field.name)
+
+ if field.name == "fees":
+ # TODO currently kind of ignored, would be nice when
+ # implemented correctly.
+ assert actual_value is None
+ continue
+
actual_type = typing.get_origin(field.type) or field.type
- if isinstance(actual_type, str):
- actual_type = eval(actual_type)
- elif isinstance(actual_type, typing._SpecialForm):
+ if isinstance(actual_type, typing._SpecialForm):
actual_type = field.type.__args__
+ elif isinstance(actual_type, str):
+ while isinstance(actual_type, str):
+ # BUG row:list[int] value gets only checked for list.
+ # not as list[int]
+ if actual_type.startswith("list["):
+ actual_type = list
+ else:
+ actual_type = eval(actual_type)
- actual_value = getattr(self, field.name)
if not isinstance(actual_value, actual_type):
log.warning(
f"\t{field.name}: '{type(actual_value)}' "
@@ -66,6 +110,36 @@ def validate_types(self) -> bool:
ret = False
return ret
+ def identical_to(self, op: Operation) -> bool:
+ identical_to = all(
+ getattr(self, i) == getattr(op, i) for i in self.identical_columns
+ )
+
+ if identical_to:
+ assert (
+ self.file_path == op.file_path
+ ), "Identical operations should also be in the same file."
+
+ return identical_to
+
+ @staticmethod
+ def merge(*operations: Operation) -> Operation:
+ assert len(operations) > 0, "There have to be operations to be merged."
+ assert all(
+ op1.identical_to(op2) for op1, op2 in itertools.combinations(operations, 2)
+ ), "Operations have to be identical to be merged"
+
+ # Select arbitray operation from list.
+ o = copy(operations[0])
+ # Update this operation with merged entries.
+ o.change = misc.dsum(op.change for op in operations)
+ o.line = list(itertools.chain(*(op.line for op in operations)))
+ if not all(op.fees is None for op in operations):
+ raise NotImplementedError(
+ "merging operations with fees is currently not supported"
+ )
+ return o
+
class Fee(Operation):
pass
@@ -80,10 +154,14 @@ class CoinLendEnd(Operation):
class Staking(Operation):
+ """Cold Staking or Proof Of Stake (not for mining)"""
+
pass
class StakingEnd(Operation):
+ """Cold Staking or Proof Of Stake (not for mining)"""
+
pass
@@ -92,11 +170,13 @@ class Transaction(Operation):
class Buy(Transaction):
- pass
+ link: Optional[Sell] = None
+ buying_cost: Optional[decimal.Decimal] = None
class Sell(Transaction):
- pass
+ link: Optional[Buy] = None
+ selling_value: Optional[decimal.Decimal] = None
class CoinLendInterest(Transaction):
@@ -104,6 +184,8 @@ class CoinLendInterest(Transaction):
class StakingInterest(Transaction):
+ """Cold Staking or Proof Of Stake (not for mining)"""
+
pass
@@ -116,11 +198,20 @@ class Commission(Transaction):
class Deposit(Transaction):
- pass
+ link: Optional[Withdrawal] = None
class Withdrawal(Transaction):
- pass
+ withdrawn_coins: Optional[list[SoldCoin]] = None
+ has_link: bool = False
+
+ def partial_withdrawn_coins(self, percent: decimal.Decimal) -> list[SoldCoin]:
+ assert self.withdrawn_coins
+ withdrawn_coins = [wc.partial(percent) for wc in self.withdrawn_coins]
+ assert percent * self.change == misc.dsum(
+ (wsc.sold for wsc in withdrawn_coins)
+ ), "Withdrawn coins total must be equal to the sum if the single coins."
+ return withdrawn_coins
# Helping variables
@@ -131,15 +222,698 @@ class SoldCoin:
op: Operation
sold: decimal.Decimal
+ def __post_init__(self):
+ self.validate()
+
+ def validate(self) -> None:
+ assert self.sold <= self.op.change
+
+ def partial(self, percent: decimal.Decimal) -> SoldCoin:
+ return SoldCoin(self.op, self.sold * percent)
+
@dataclasses.dataclass
-class TaxEvent:
- taxation_type: str
- taxed_gain: decimal.Decimal
- op: Operation
- sell_price: decimal.Decimal = decimal.Decimal()
- real_gain: decimal.Decimal = decimal.Decimal()
- remark: str = ""
+class TaxReportEntry:
+ event_type: ClassVar[str] = "virtual"
+ allowed_missing_fields: ClassVar[list[str]] = []
+ abs_gain_loss: ClassVar[bool] = False
+
+ first_platform: Optional[str] = None
+ second_platform: Optional[str] = None
+
+ amount: Optional[decimal.Decimal] = None
+ coin: Optional[str] = None
+
+ first_utc_time: Optional[datetime.datetime] = None
+ second_utc_time: Optional[datetime.datetime] = None
+
+ # Fee might be paid in multiple coin types (e.g. Binance BNB)
+ first_fee_amount: Optional[decimal.Decimal] = None
+ first_fee_coin: Optional[str] = None
+ first_fee_in_fiat: Optional[decimal.Decimal] = None
+ #
+ second_fee_amount: Optional[decimal.Decimal] = None
+ second_fee_coin: Optional[str] = None
+ second_fee_in_fiat: Optional[decimal.Decimal] = None
+
+ first_value_in_fiat: Optional[decimal.Decimal] = None
+ second_value_in_fiat: Optional[decimal.Decimal] = None
+ total_fee_in_fiat: Optional[decimal.Decimal] = dataclasses.field(init=False)
+
+ @property
+ def _total_fee_in_fiat(self) -> Optional[decimal.Decimal]:
+ if self.first_fee_in_fiat is None and self.second_fee_in_fiat is None:
+ return None
+ return misc.dsum(
+ map(
+ misc.cdecimal,
+ (self.first_fee_in_fiat, self.second_fee_in_fiat),
+ )
+ )
+
+ gain_in_fiat: Optional[decimal.Decimal] = dataclasses.field(init=False)
+
+ @property
+ def _gain_in_fiat(self) -> Optional[decimal.Decimal]:
+ if (
+ self.first_value_in_fiat is None
+ and self.second_value_in_fiat is None
+ and self._total_fee_in_fiat is None
+ ):
+ return None
+ gain_in_fiat = (
+ misc.cdecimal(self.first_value_in_fiat)
+ - misc.cdecimal(self.second_value_in_fiat)
+ - misc.cdecimal(self._total_fee_in_fiat)
+ )
+ if self.abs_gain_loss:
+ gain_in_fiat = abs(gain_in_fiat)
+ return gain_in_fiat
+
+ taxable_gain_in_fiat: decimal.Decimal = dataclasses.field(init=False)
+
+ @property
+ def _taxable_gain_in_fiat(self) -> Optional[decimal.Decimal]:
+ if self.is_taxable and self._gain_in_fiat:
+ return self._gain_in_fiat
+ if self.get_excel_label("taxable_gain_in_fiat") == "-":
+ return None
+ return decimal.Decimal()
+
+ is_taxable: Optional[bool] = None
+ taxation_type: Optional[str] = None
+ remark: Optional[str] = None
+
+ # Copy-paste template for subclasses.
+ # def __init__(
+ # self,
+ # first_platform: str,
+ # second_platform: str,
+ # amount: decimal.Decimal,
+ # coin: str,
+ # first_utc_time: datetime.datetime,
+ # second_utc_time: datetime.datetime,
+ # first_fee_amount: decimal.Decimal,
+ # first_fee_coin: str,
+ # first_fee_in_fiat: decimal.Decimal,
+ # second_fee_amount: decimal.Decimal,
+ # second_fee_coin: str,
+ # second_fee_in_fiat: decimal.Decimal,
+ # first_value_in_fiat: decimal.Decimal,
+ # second_value_in_fiat: decimal.Decimal,
+ # is_taxable: bool,
+ # taxation_type: str,
+ # remark: str,
+ # ) -> None:
+ # super().__init__(
+ # first_platform=first_platform,
+ # second_platform=second_platform,
+ # amount=amount,
+ # coin=coin,
+ # first_utc_time=first_utc_time,
+ # second_utc_time=second_utc_time,
+ # first_fee_amount=first_fee_amount,
+ # first_fee_coin=first_fee_coin,
+ # first_fee_in_fiat=first_fee_in_fiat,
+ # second_fee_amount=second_fee_amount,
+ # second_fee_coin=second_fee_coin,
+ # second_fee_in_fiat=second_fee_in_fiat,
+ # first_value_in_fiat=first_value_in_fiat,
+ # second_value_in_fiat=second_value_in_fiat,
+ # is_taxable=is_taxable,
+ # taxation_type=taxation_type,
+ # remark=remark,
+ # )
+
+ def __post_init__(self) -> None:
+ """Validate that all required fields (label != '-') are given."""
+ missing_field_values = [
+ field.name
+ for label, field in zip(self.excel_labels(), self.excel_fields())
+ if label != "-"
+ and getattr(self, field.name) is None
+ and field.name not in self.allowed_missing_fields
+ ]
+ assert not missing_field_values, (
+ f"{self=} : missing values for fields " f"{', '.join(missing_field_values)}"
+ )
+ assert len(self.excel_labels()) == len(self.excel_fields())
+
+ @classmethod
+ def fields(cls) -> tuple[dataclasses.Field, ...]:
+ return dataclasses.fields(cls)
+
+ @classmethod
+ def field_names(cls) -> Iterator[str]:
+ return (field.name for field in cls.fields())
+
+ @classmethod
+ def _labels(cls) -> list[str]:
+ return list(cls.field_names())
+
+ @classmethod
+ def labels(cls) -> list[str]:
+ labels = cls._labels()
+ assert len(labels) == len(dataclasses.fields(cls)) - 1
+ return labels
+
+ @classmethod
+ def get_excel_label(cls, field_name: str) -> str:
+ assert len(cls.excel_labels()) == len(cls.excel_fields())
+ for label, field in zip(cls.excel_labels(), cls.excel_fields()):
+ if field.name == field_name:
+ return label
+ raise ValueError(f"{field_name} is not a field of {cls=}")
+
+ def values(self) -> Iterator:
+ return (getattr(self, f) for f in self.field_names())
+
+ @staticmethod
+ def is_excel_label(label: str) -> bool:
+ return label != "is_taxable"
+
+ @classmethod
+ def excel_fields(cls) -> tuple[dataclasses.Field, ...]:
+ return tuple(field for field in cls.fields() if cls.is_excel_label(field.name))
+
+ @classmethod
+ def excel_labels(self) -> list[str]:
+ return [label for label in self.labels() if self.is_excel_label(label)]
+
+ @classmethod
+ def excel_field_and_width(cls) -> Iterator[tuple[dataclasses.Field, float, bool]]:
+ for field in cls.fields():
+ if cls.is_excel_label(field.name):
+ label = cls.get_excel_label(field.name)
+ if label == "-":
+ width = 15.0
+ elif field.name == "taxation_type":
+ width = 43.0
+ elif field.name == "taxable_gain_in_fiat":
+ width = 13.0
+ elif (
+ field.name.endswith("_in_fiat")
+ or "coin" in field.name
+ or "platform" in field.name
+ ):
+ width = 15.0
+ elif field.type in ("datetime.datetime", "Optional[datetime.datetime]"):
+ width = 18.43
+ elif field.type in ("decimal.Decimal", "Optional[decimal.Decimal]"):
+ width = 20.0
+ else:
+ width = 18.0
+ hidden = label == "-"
+ yield field, width, hidden
+
+ def excel_values(self) -> Iterator:
+ for field_name in self.field_names():
+ if self.is_excel_label(field_name):
+ value = getattr(self, field_name)
+ label = self.get_excel_label(field_name)
+ if label == "-":
+ yield None
+ else:
+ if isinstance(value, datetime.datetime):
+ value = value.astimezone(config.LOCAL_TIMEZONE)
+ yield value
+
+
+# Bypass dataclass machinery, add a custom property function to a dataclass field.
+TaxReportEntry.total_fee_in_fiat = TaxReportEntry._total_fee_in_fiat # type:ignore
+TaxReportEntry.gain_in_fiat = TaxReportEntry._gain_in_fiat # type:ignore
+TaxReportEntry.taxable_gain_in_fiat = (
+ TaxReportEntry._taxable_gain_in_fiat # type:ignore
+)
+
+
+class LendingReportEntry(TaxReportEntry):
+ event_type = "Coin-Lending Zeitraum"
+
+ @classmethod
+ def _labels(cls) -> list[str]:
+ return [
+ "Börse",
+ "-",
+ #
+ "Anzahl",
+ "Währung",
+ #
+ "Wiedererhalten am",
+ "Verliehen am",
+ #
+ "-",
+ "-",
+ "-",
+ "-",
+ "-",
+ "-",
+ #
+ "-",
+ "-",
+ "-",
+ #
+ "Gewinn/Verlust in EUR",
+ "davon steuerbar in EUR",
+ "Einkunftsart",
+ "Bemerkung",
+ ]
+
+
+class StakingReportEntry(LendingReportEntry):
+ event_type = "Staking Zeitaraum"
+
+
+class SellReportEntry(TaxReportEntry):
+ event_type = "Verkauf"
+
+ def __init__(
+ self,
+ sell_platform: str,
+ buy_platform: str,
+ amount: decimal.Decimal,
+ coin: str,
+ sell_utc_time: datetime.datetime,
+ buy_utc_time: datetime.datetime,
+ first_fee_amount: decimal.Decimal,
+ first_fee_coin: str,
+ first_fee_in_fiat: decimal.Decimal,
+ second_fee_amount: decimal.Decimal,
+ second_fee_coin: str,
+ second_fee_in_fiat: decimal.Decimal,
+ sell_value_in_fiat: decimal.Decimal,
+ buy_cost_in_fiat: decimal.Decimal,
+ is_taxable: bool,
+ taxation_type: str,
+ remark: str,
+ ) -> None:
+ super().__init__(
+ first_platform=sell_platform,
+ second_platform=buy_platform,
+ amount=amount,
+ coin=coin,
+ first_utc_time=sell_utc_time,
+ second_utc_time=buy_utc_time,
+ first_fee_amount=first_fee_amount,
+ first_fee_coin=first_fee_coin,
+ first_fee_in_fiat=first_fee_in_fiat,
+ second_fee_amount=second_fee_amount,
+ second_fee_coin=second_fee_coin,
+ second_fee_in_fiat=second_fee_in_fiat,
+ first_value_in_fiat=sell_value_in_fiat,
+ second_value_in_fiat=buy_cost_in_fiat,
+ is_taxable=is_taxable,
+ taxation_type=taxation_type,
+ remark=remark,
+ )
+
+ @classmethod
+ def _labels(cls) -> list[str]:
+ return [
+ "Verkauf auf Börse",
+ "Erworben auf Börse",
+ #
+ "Anzahl",
+ "Währung",
+ #
+ "Verkaufsdatum",
+ "Erwerbsdatum",
+ #
+ "(1) Anzahl Transaktionsgebühr",
+ "(1) Währung Transaktionsgebühr",
+ "(1) Transaktionsgebühr in EUR",
+ "(2) Anzahl Transaktionsgebühr",
+ "(2) Währung Transaktionsgebühr",
+ "(2) Transaktionsgebühr in EUR",
+ #
+ "Veräußerungserlös in EUR",
+ "Anschaffungskosten in EUR",
+ "Werbungskosten in EUR",
+ #
+ "Gewinn/Verlust in EUR",
+ "davon steuerbar in EUR",
+ "Einkunftsart",
+ "Bemerkung",
+ ]
+
+
+class UnrealizedSellReportEntry(TaxReportEntry):
+ event_type = "Bestände"
+
+ def __init__(
+ self,
+ sell_platform: str,
+ buy_platform: str,
+ amount: decimal.Decimal,
+ coin: str,
+ sell_utc_time: datetime.datetime,
+ buy_utc_time: datetime.datetime,
+ sell_value_in_fiat: decimal.Decimal,
+ buy_cost_in_fiat: decimal.Decimal,
+ is_taxable: bool,
+ taxation_type: str,
+ remark: str,
+ ) -> None:
+ super().__init__(
+ first_platform=sell_platform,
+ second_platform=buy_platform,
+ amount=amount,
+ coin=coin,
+ first_utc_time=sell_utc_time,
+ second_utc_time=buy_utc_time,
+ first_value_in_fiat=sell_value_in_fiat,
+ second_value_in_fiat=buy_cost_in_fiat,
+ is_taxable=is_taxable,
+ taxation_type=taxation_type,
+ remark=remark,
+ )
+
+ @classmethod
+ def _labels(cls) -> list[str]:
+ return [
+ "Bestand auf Börse zum Stichtag",
+ "Erworben auf Börse",
+ #
+ "Anzahl",
+ "Währung",
+ #
+ "Unrealisiertes Verkaufsdatum",
+ "Erwerbsdatum",
+ #
+ "-",
+ "-",
+ "-",
+ "-",
+ "-",
+ "-",
+ #
+ "Unrealisierter Veräußerungserlös in EUR",
+ "Anschaffungskosten in EUR",
+ "-",
+ #
+ "Unrealisierter Gewinn/Verlust in EUR",
+ "davon wären steuerbar in EUR",
+ "Einkunftsart",
+ "Bemerkung",
+ ]
+
+
+class BuyReportEntry(TaxReportEntry):
+ event_type = "Kauf"
+ abs_gain_loss = True
+
+ def __init__(
+ self,
+ platform: str,
+ amount: decimal.Decimal,
+ coin: str,
+ utc_time: datetime.datetime,
+ first_fee_amount: decimal.Decimal,
+ first_fee_coin: str,
+ first_fee_in_fiat: decimal.Decimal,
+ second_fee_amount: decimal.Decimal,
+ second_fee_coin: str,
+ second_fee_in_fiat: decimal.Decimal,
+ buy_value_in_fiat: decimal.Decimal,
+ remark: str,
+ ) -> None:
+ super().__init__(
+ second_platform=platform,
+ amount=amount,
+ coin=coin,
+ second_utc_time=utc_time,
+ first_fee_amount=first_fee_amount,
+ first_fee_coin=first_fee_coin,
+ first_fee_in_fiat=first_fee_in_fiat,
+ second_fee_amount=second_fee_amount,
+ second_fee_coin=second_fee_coin,
+ second_fee_in_fiat=second_fee_in_fiat,
+ second_value_in_fiat=buy_value_in_fiat,
+ remark=remark,
+ )
+
+ @classmethod
+ def _labels(cls) -> list[str]:
+ return [
+ "-",
+ "Erworben auf Börse",
+ #
+ "Anzahl",
+ "Währung",
+ #
+ "-",
+ "Erwerbsdatum",
+ #
+ "(1) Anzahl Transaktionsgebühr",
+ "(1) Währung Transaktionsgebühr",
+ "(1) Transaktionsgebühr in EUR",
+ "(2) Anzahl Transaktionsgebühr",
+ "(2) Währung Transaktionsgebühr",
+ "(2) Transaktionsgebühr in EUR",
+ #
+ "-",
+ "Kaufpreis in EUR",
+ "Werbungskosten in EUR",
+ #
+ "Anschaffungskosten in EUR",
+ "-",
+ "-",
+ "Bemerkung",
+ ]
+
+
+class InterestReportEntry(TaxReportEntry):
+ event_type = "Zinsen"
+
+ def __init__(
+ self,
+ platform: str,
+ amount: decimal.Decimal,
+ utc_time: datetime.datetime,
+ coin: str,
+ interest_in_fiat: decimal.Decimal,
+ taxation_type: str,
+ remark: str,
+ ) -> None:
+ super().__init__(
+ first_platform=platform,
+ amount=amount,
+ first_utc_time=utc_time,
+ coin=coin,
+ first_value_in_fiat=interest_in_fiat,
+ is_taxable=True,
+ taxation_type=taxation_type,
+ remark=remark,
+ )
+
+ @classmethod
+ def _labels(cls) -> list[str]:
+ return [
+ "Börse",
+ "-",
+ #
+ "Anzahl",
+ "Währung",
+ #
+ "Erhalten am",
+ "-",
+ #
+ "-",
+ "-",
+ "-",
+ "-",
+ "-",
+ "-",
+ #
+ "Wert in EUR",
+ "-",
+ "-",
+ #
+ "Gewinn/Verlust in EUR",
+ "davon steuerbar in EUR",
+ "Einkunftsart",
+ "Bemerkung",
+ ]
+
+
+class LendingInterestReportEntry(InterestReportEntry):
+ event_type = "Coin-Lending Einkünfte"
+
+
+class StakingInterestReportEntry(InterestReportEntry):
+ event_type = "Staking Einkünfte"
+
+
+class AirdropReportEntry(TaxReportEntry):
+ event_type = "Airdrops"
+
+ def __init__(
+ self,
+ platform: str,
+ amount: decimal.Decimal,
+ coin: str,
+ utc_time: datetime.datetime,
+ in_fiat: decimal.Decimal,
+ taxation_type: str,
+ remark: str,
+ ) -> None:
+ super().__init__(
+ first_platform=platform,
+ amount=amount,
+ coin=coin,
+ first_utc_time=utc_time,
+ first_value_in_fiat=in_fiat,
+ is_taxable=True,
+ taxation_type=taxation_type,
+ remark=remark,
+ )
+
+ @classmethod
+ def _labels(cls) -> list[str]:
+ return [
+ "Börse",
+ "-",
+ #
+ "Anzahl",
+ "Währung",
+ #
+ "Erhalten am",
+ "-",
+ #
+ "-",
+ "-",
+ "-",
+ "-",
+ "-",
+ "-",
+ #
+ "Wert in EUR",
+ "-",
+ "-",
+ #
+ "Gewinn/Verlust in EUR",
+ "davon steuerbar in EUR",
+ "Einkunftsart",
+ "Bemerkung",
+ ]
+
+
+class CommissionReportEntry(AirdropReportEntry):
+ event_type = "Belohnungen-Bonus"
+
+
+class TransferReportEntry(TaxReportEntry):
+ event_type = "Ein-& Auszahlungen"
+ abs_gain_loss = True
+
+ def __init__(
+ self,
+ first_platform: str,
+ second_platform: str,
+ amount: decimal.Decimal,
+ coin: str,
+ first_utc_time: datetime.datetime,
+ second_utc_time: datetime.datetime,
+ first_fee_amount: decimal.Decimal,
+ first_fee_coin: str,
+ first_fee_in_fiat: decimal.Decimal,
+ remark: str,
+ ) -> None:
+ super().__init__(
+ first_platform=first_platform,
+ second_platform=second_platform,
+ amount=amount,
+ coin=coin,
+ first_utc_time=first_utc_time,
+ second_utc_time=second_utc_time,
+ first_fee_amount=first_fee_amount,
+ first_fee_coin=first_fee_coin,
+ first_fee_in_fiat=first_fee_in_fiat,
+ remark=remark,
+ )
+
+ @classmethod
+ def _labels(cls) -> list[str]:
+ return [
+ "Eingang auf Börse",
+ "Ausgang von Börse",
+ #
+ "Anzahl",
+ "Währung",
+ #
+ "Eingangsdatum",
+ "Ausgangsdatum",
+ #
+ "(1) Anzahl Transaktionsgebühr",
+ "(1) Währung Transaktionsgebühr",
+ "(1) Transaktionsgebühr in EUR",
+ "-",
+ "-",
+ "-",
+ #
+ "-",
+ "-",
+ "-",
+ #
+ "Kosten in EUR",
+ "-",
+ "-",
+ "Bemerkung",
+ ]
+
+
+class DepositReportEntry(TransferReportEntry):
+ allowed_missing_fields = ["second_platform", "second_utc_time"]
+
+ def __init__(
+ self,
+ platform: str,
+ amount: decimal.Decimal,
+ coin: str,
+ utc_time: datetime.datetime,
+ first_fee_amount: decimal.Decimal,
+ first_fee_coin: str,
+ first_fee_in_fiat: decimal.Decimal,
+ remark: str,
+ ) -> None:
+ TaxReportEntry.__init__(
+ self,
+ first_platform=platform,
+ amount=amount,
+ coin=coin,
+ first_utc_time=utc_time,
+ first_fee_amount=first_fee_amount,
+ first_fee_coin=first_fee_coin,
+ first_fee_in_fiat=first_fee_in_fiat,
+ remark=remark,
+ )
+
+
+class WithdrawalReportEntry(TransferReportEntry):
+ allowed_missing_fields = ["first_platform", "first_utc_time"]
+
+ def __init__(
+ self,
+ platform: str,
+ amount: decimal.Decimal,
+ coin: str,
+ utc_time: datetime.datetime,
+ first_fee_amount: decimal.Decimal,
+ first_fee_coin: str,
+ first_fee_in_fiat: decimal.Decimal,
+ remark: str,
+ ) -> None:
+ TaxReportEntry.__init__(
+ self,
+ second_platform=platform,
+ amount=amount,
+ coin=coin,
+ second_utc_time=utc_time,
+ first_fee_amount=first_fee_amount,
+ first_fee_coin=first_fee_coin,
+ first_fee_in_fiat=first_fee_in_fiat,
+ remark=remark,
+ )
gain_operations = [
@@ -161,6 +935,22 @@ class TaxEvent:
]
operations_order = gain_operations + loss_operations
+tax_report_entry_order = [
+ BuyReportEntry,
+ SellReportEntry,
+ LendingInterestReportEntry,
+ StakingInterestReportEntry,
+ InterestReportEntry,
+ AirdropReportEntry,
+ CommissionReportEntry,
+ TransferReportEntry,
+ DepositReportEntry,
+ WithdrawalReportEntry,
+ LendingReportEntry,
+ StakingReportEntry,
+ UnrealizedSellReportEntry,
+]
+
def sort_operations(
operations: list[Operation],
@@ -179,12 +969,10 @@ def sort_operations(
Returns:
list[Operation]: Sorted operations by `operations_order` and specific keys.
"""
+ return misc.sort_by_order_and_key(operations_order, operations, keys=keys)
- def key_function(op: Operation) -> tuple:
- try:
- idx = operations_order.index(type(op))
- except ValueError:
- idx = 0
- return tuple(([getattr(op, key) for key in keys] if keys else []) + [idx])
- return sorted(operations, key=key_function)
+def sort_tax_report_entries(
+ tax_report_entries: list[TaxReportEntry],
+) -> list[TaxReportEntry]:
+ return misc.sort_by_order_and_key(tax_report_entry_order, tax_report_entries)