diff --git a/CHANGELOG.md b/CHANGELOG.md index c8bb24e..f136dc5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # CHANGELOG +## 1.1.0 (2024-03-24) + +- Refactor the engine's `run` method for readability. +- Accept dictionary of inputs to `StratgyEngine` init. + ## 1.0.1 (2024-03-18) - Refactor __holidays__.py to a utils function using the `holiday` library diff --git a/examples/black_scholes_calculator.ipynb b/examples/black_scholes_calculator.ipynb index 9d51839..0032750 100644 --- a/examples/black_scholes_calculator.ipynb +++ b/examples/black_scholes_calculator.ipynb @@ -13,26 +13,14 @@ }, { "cell_type": "code", - "execution_count": 6, + "execution_count": 1, "metadata": { "ExecuteTime": { - "end_time": "2024-03-15T17:36:52.330797Z", - "start_time": "2024-03-15T17:36:52.121865Z" + "end_time": "2024-03-15T21:15:37.010803Z", + "start_time": "2024-03-15T21:15:36.450216Z" } }, - "outputs": [ - { - "ename": "ImportError", - "evalue": "cannot import name 'get_bs_info' from 'optionlab' (/Users/mnhmbp/PycharmProjects/optionlab/optionlab/__init__.py)", - "output_type": "error", - "traceback": [ - "\u001B[0;31m---------------------------------------------------------------------------\u001B[0m", - "\u001B[0;31mImportError\u001B[0m Traceback (most recent call last)", - "Cell \u001B[0;32mIn[6], line 3\u001B[0m\n\u001B[1;32m 1\u001B[0m \u001B[38;5;28;01mfrom\u001B[39;00m \u001B[38;5;21;01m__future__\u001B[39;00m \u001B[38;5;28;01mimport\u001B[39;00m print_function\n\u001B[1;32m 2\u001B[0m \u001B[38;5;28;01mfrom\u001B[39;00m \u001B[38;5;21;01m__future__\u001B[39;00m \u001B[38;5;28;01mimport\u001B[39;00m division\n\u001B[0;32m----> 3\u001B[0m \u001B[38;5;28;01mfrom\u001B[39;00m \u001B[38;5;21;01moptionlab\u001B[39;00m \u001B[38;5;28;01mimport\u001B[39;00m VERSION, get_bs_info\n\u001B[1;32m 4\u001B[0m \u001B[38;5;28;01mimport\u001B[39;00m \u001B[38;5;21;01msys\u001B[39;00m\n", - "\u001B[0;31mImportError\u001B[0m: cannot import name 'get_bs_info' from 'optionlab' (/Users/mnhmbp/PycharmProjects/optionlab/optionlab/__init__.py)" - ] - } - ], + "outputs": [], "source": [ "from __future__ import print_function\n", "from __future__ import division\n", diff --git a/examples/naked_call.ipynb b/examples/naked_call.ipynb index ead2912..7b32311 100644 --- a/examples/naked_call.ipynb +++ b/examples/naked_call.ipynb @@ -20,8 +20,8 @@ "execution_count": 1, "metadata": { "ExecuteTime": { - "end_time": "2024-03-15T17:50:56.431755Z", - "start_time": "2024-03-15T17:50:55.507652Z" + "end_time": "2024-03-15T21:16:28.614458Z", + "start_time": "2024-03-15T21:16:27.865860Z" } }, "outputs": [], @@ -31,7 +31,7 @@ "import datetime as dt\n", "import sys\n", "\n", - "from optionlab import VERSION, StrategyEngine, Inputs\n", + "from optionlab import VERSION, StrategyEngine, Inputs, plot_pl\n", "\n", "%matplotlib inline" ] @@ -64,11 +64,11 @@ }, { "cell_type": "code", - "execution_count": 4, + "execution_count": 3, "metadata": { "ExecuteTime": { - "end_time": "2024-03-13T02:27:33.629312Z", - "start_time": "2024-03-13T02:27:33.621905Z" + "end_time": "2024-03-15T21:16:37.874205Z", + "start_time": "2024-03-15T21:16:37.868325Z" } }, "outputs": [], @@ -100,11 +100,11 @@ }, { "cell_type": "code", - "execution_count": 5, + "execution_count": 4, "metadata": { "ExecuteTime": { - "end_time": "2024-03-13T02:27:36.676391Z", - "start_time": "2024-03-13T02:27:36.670314Z" + "end_time": "2024-03-15T21:16:40.959949Z", + "start_time": "2024-03-15T21:16:40.942429Z" } }, "outputs": [ @@ -112,8 +112,8 @@ "name": "stdout", "output_type": "stream", "text": [ - "CPU times: user 1.94 ms, sys: 1.22 ms, total: 3.16 ms\n", - "Wall time: 2.36 ms\n" + "CPU times: user 2.23 ms, sys: 1.28 ms, total: 3.51 ms\n", + "Wall time: 2.57 ms\n" ] } ], @@ -131,11 +131,11 @@ }, { "cell_type": "code", - "execution_count": 6, + "execution_count": 5, "metadata": { "ExecuteTime": { - "end_time": "2024-03-13T02:27:38.761229Z", - "start_time": "2024-03-13T02:27:38.445697Z" + "end_time": "2024-03-15T21:16:42.811428Z", + "start_time": "2024-03-15T21:16:42.639286Z" } }, "outputs": [ @@ -158,7 +158,7 @@ } ], "source": [ - "st.plot_pl()" + "plot_pl(st)" ] }, { diff --git a/optionlab/__init__.py b/optionlab/__init__.py index 84664cf..2a786f2 100644 --- a/optionlab/__init__.py +++ b/optionlab/__init__.py @@ -1,7 +1,7 @@ import typing -VERSION = "1.0.0" +VERSION = "1.1.0" if typing.TYPE_CHECKING: @@ -36,6 +36,7 @@ get_theta, ) from .engine import StrategyEngine + from .plot import plot_pl from .support import ( get_pl_profile, get_pl_profile_stock, @@ -84,6 +85,8 @@ "get_delta", "get_gamma", "get_theta", + # plot + "plot_pl", ) # A mapping of {: (package, )} defining dynamic imports @@ -124,6 +127,8 @@ "get_delta": (__package__, ".black_scholes"), "get_gamma": (__package__, ".black_scholes"), "get_theta": (__package__, ".black_scholes"), + # plot + "plot_pl": (__package__, ".plot"), } diff --git a/optionlab/engine.py b/optionlab/engine.py index e9a4820..3dc204d 100644 --- a/optionlab/engine.py +++ b/optionlab/engine.py @@ -2,12 +2,9 @@ from __future__ import print_function import datetime as dt -from typing import Literal +from typing import Literal, Any -import matplotlib.pyplot as plt -import numpy as np -from matplotlib import rcParams -from numpy import array, ndarray, zeros, full, stack, savetxt +from numpy import array, ndarray, zeros, stack, savetxt from optionlab.black_scholes import get_bs_info, get_implied_vol from optionlab.models import ( @@ -21,6 +18,7 @@ Outputs, ProbabilityOfProfitInputs, ProbabilityOfProfitArrayInputs, + OptionType, ) from optionlab.support import ( get_pl_profile, @@ -35,7 +33,7 @@ class StrategyEngine: - def __init__(self, inputs: Inputs): + def __init__(self, inputs_data: Inputs | dict): """ __init__ -> initializes class variables. @@ -43,45 +41,48 @@ def __init__(self, inputs: Inputs): ------- None. """ - self.__s = array([]) - self.__s_mc: np.ndarray = array(inputs.array_prices or []) - self.__strike: list[float] = [] - self.__premium: list[float] = [] - self.__n: list[int] = [] - self.__action: list[Action | Literal["n/a"]] = [] - self.__type: list[StrategyType] = [] - self.__expiration: list[dt.date | int] = [] - self.__prevpos: list[float] = [] - self.__usebs: list[bool] = [] - self.__profitranges: list[Range] = [] - self.__profittargrange: list[Range] = [] - self.__losslimitranges: list[Range] = [] - self.__days2maturity: list[int] = [] - self.__days2target = 30 - self.__daysinyear = 252 if inputs.discard_nonbusiness_days else 365 - self.implied_volatility: list[float] = [] + inputs = ( + inputs_data + if isinstance(inputs_data, Inputs) + else Inputs.model_validate(inputs_data) + ) + + self.s = create_price_seq(inputs.min_stock, inputs.max_stock) + self.terminal_stock_prices: ndarray = array(inputs.array_prices or []) + self.strike: list[float] = [] + self.premium: list[float] = [] + self.n: list[int] = [] + self.action: list[Action | Literal["n/a"]] = [] + self.type: list[StrategyType] = [] + self._previous_position: list[float] = [] + self._use_bs: list[bool] = [] + self._profit_ranges: list[Range] = [] + self._profit_target_range: list[Range] = [] + self._loss_limit_ranges: list[Range] = [] + self._days_to_maturity: list[int] = [] + self._days_in_year = 252 if inputs.discard_nonbusiness_days else 365 + self.days_to_target = 30 + self.implied_volatility: list[float | ndarray] = [] self.itm_probability: list[float] = [] self.delta: list[float] = [] self.gamma: list[float] = [] self.vega: list[float] = [] self.theta: list[float] = [] self.cost: list[float] = [] - self.project_probability = 0.0 + self.profit_probability = 0.0 self.project_target_probability = 0.0 self.loss_limit_probability = 0.0 - self.__distribution = inputs.distribution - self.__stockprice = inputs.stock_price - self.__volatility = inputs.volatility - self.__r = inputs.interest_rate - self.__y = inputs.dividend_yield - self.__minstock = inputs.min_stock - self.__maxstock = inputs.max_stock - self.__profittarg = inputs.profit_target - self.__losslimit = inputs.loss_limit - self.__optcommission = inputs.opt_commission - self.__stockcommission = inputs.stock_commission - self.__nmcprices = inputs.mc_prices_number - self.__compute_expectation = inputs.compute_expectation + self.distribution = inputs.distribution + self.stock_price = inputs.stock_price + self.volatility = inputs.volatility + self.r = inputs.interest_rate + self.y = inputs.dividend_yield + self.profit_target = inputs.profit_target + self.loss_limit = inputs.loss_limit + self.opt_commission = inputs.opt_commission + self.stock_commission = inputs.stock_commission + self.n_mc_prices = inputs.mc_prices_number + self.compute_expectation = inputs.compute_expectation if inputs.start_date and inputs.target_date: if inputs.discard_nonbusiness_days: @@ -91,30 +92,26 @@ def __init__(self, inputs: Inputs): else: n_discarded_days = 0 - self.__days2target = ( + self.days_to_target = ( inputs.target_date - inputs.start_date ).days - n_discarded_days else: - self.__days2target = inputs.days_to_target_date + self.days_to_target = inputs.days_to_target_date for i, strategy in enumerate(inputs.strategy): - self.__type.append(strategy.type) + self.type.append(strategy.type) if isinstance(strategy, OptionStrategy): - self.__strike.append(strategy.strike) - self.__premium.append(strategy.premium) - self.__n.append(strategy.n) - self.__action.append(strategy.action) - self.__prevpos.append(strategy.prev_pos or 0.0) + self.strike.append(strategy.strike) + self.premium.append(strategy.premium) + self.n.append(strategy.n) + self.action.append(strategy.action) + self._previous_position.append(strategy.prev_pos or 0.0) if not strategy.expiration: - if inputs.target_date: - self.__expiration.append(inputs.target_date) - - self.__days2maturity.append(self.__days2target) - self.__usebs.append(False) + self._days_to_maturity.append(self.days_to_target) + self._use_bs.append(False) elif isinstance(strategy.expiration, dt.date) and inputs.start_date: - self.__expiration.append(strategy.expiration) if inputs.discard_nonbusiness_days: n_discarded_days = get_nonbusiness_days( @@ -123,17 +120,17 @@ def __init__(self, inputs: Inputs): else: n_discarded_days = 0 - self.__days2maturity.append( + self._days_to_maturity.append( (strategy.expiration - inputs.start_date).days - n_discarded_days ) - self.__usebs.append(strategy.expiration != inputs.target_date) + self._use_bs.append(strategy.expiration != inputs.target_date) elif isinstance(strategy.expiration, int): - if strategy.expiration >= self.__days2target: - self.__days2maturity.append(strategy.expiration) + if strategy.expiration >= self.days_to_target: + self._days_to_maturity.append(strategy.expiration) - self.__usebs.append(strategy.expiration != self.__days2target) + self._use_bs.append(strategy.expiration != self.days_to_target) else: raise ValueError( "Days remaining to maturity must be greater than or equal to the number of days remaining to the target date!" @@ -142,32 +139,26 @@ def __init__(self, inputs: Inputs): raise ValueError("Expiration must be a date, an int or None.") elif isinstance(strategy, StockStrategy): - self.__n.append(strategy.n) - self.__action.append(strategy.action) - self.__prevpos.append(strategy.prev_pos or 0.0) - self.__strike.append(0.0) - self.__premium.append(0.0) - self.__usebs.append(False) - self.__days2maturity.append(-1) - self.__expiration.append( - inputs.target_date if isinstance(inputs.target_date, int) else -1 - ) + self.n.append(strategy.n) + self.action.append(strategy.action) + self._previous_position.append(strategy.prev_pos or 0.0) + self.strike.append(0.0) + self.premium.append(0.0) + self._use_bs.append(False) + self._days_to_maturity.append(-1) elif isinstance(strategy, ClosedPosition): - self.__prevpos.append(strategy.prev_pos) - self.__strike.append(0.0) - self.__n.append(0) - self.__premium.append(0.0) - self.__action.append("n/a") - self.__usebs.append(False) - self.__days2maturity.append(-1) - self.__expiration.append( - inputs.target_date if isinstance(inputs.target_date, int) else -1 - ) + self._previous_position.append(strategy.prev_pos) + self.strike.append(0.0) + self.n.append(0) + self.premium.append(0.0) + self.action.append("n/a") + self._use_bs.append(False) + self._days_to_maturity.append(-1) else: raise ValueError("Type must be 'call', 'put', 'stock' or 'closed'!") - def run(self): + def run(self) -> Outputs: """ run -> runs calculations for an options strategy. @@ -177,8 +168,8 @@ def run(self): An Outputs object containing the output of a calculation. """ - time2target = self.__days2target / self.__daysinyear - self.cost = [0.0 for _ in range(len(self.__type))] + time_to_target = self.days_to_target / self._days_in_year + self.cost = [0.0] * len(self.type) self.implied_volatility = [] self.itm_probability = [] self.delta = [] @@ -186,304 +177,79 @@ def run(self): self.vega = [] self.theta = [] - if self.__s.shape[0] == 0: - self.__s = create_price_seq(self.__minstock, self.__maxstock) - - self.profit = zeros((len(self.__type), self.__s.shape[0])) - self.strategyprofit = zeros(self.__s.shape[0]) - - if self.__compute_expectation and self.__s_mc.shape[0] == 0: - self.__s_mc = create_price_samples( - self.__stockprice, - self.__volatility, - time2target, - self.__r, - self.__distribution, - self.__y, - self.__nmcprices, + self.profit = zeros((len(self.type), self.s.shape[0])) + self.strategy_profit = zeros(self.s.shape[0]) + + if self.compute_expectation and self.terminal_stock_prices.shape[0] == 0: + self.terminal_stock_prices = create_price_samples( + self.stock_price, + self.volatility, + time_to_target, + self.r, + self.distribution, + self.y, + self.n_mc_prices, ) - if self.__s_mc.shape[0] > 0: - self.profit_mc = zeros((len(self.__type), self.__s_mc.shape[0])) - self.strategyprofit_mc = zeros(self.__s_mc.shape[0]) + if self.terminal_stock_prices.shape[0] > 0: + self.profit_mc = zeros( + (len(self.type), self.terminal_stock_prices.shape[0]) + ) + self.strategy_profit_mc = zeros(self.terminal_stock_prices.shape[0]) - for i, type in enumerate(self.__type): + for i, type in enumerate(self.type): if type in ("call", "put"): - if self.__prevpos[i] >= 0.0: - time_to_maturity = self.__days2maturity[i] / self.__daysinyear - bs = get_bs_info( - self.__stockprice, - self.__strike[i], - self.__r, - self.__volatility, - time_to_maturity, - self.__y, - ) - - self.gamma.append(bs.gamma) - self.vega.append(bs.vega) - - if type == "call": - self.implied_volatility.append( - get_implied_vol( - "call", - self.__premium[i], - self.__stockprice, - self.__strike[i], - self.__r, - time_to_maturity, - self.__y, - ) - ) - self.itm_probability.append(bs.call_itm_prob) - - if self.__action[i] == "buy": - self.delta.append(bs.call_delta) - self.theta.append(bs.call_theta / self.__daysinyear) - else: - self.delta.append(-bs.call_delta) - self.theta.append(-bs.call_theta / self.__daysinyear) - else: - self.implied_volatility.append( - get_implied_vol( - "put", - self.__premium[i], - self.__stockprice, - self.__strike[i], - self.__r, - time_to_maturity, - self.__y, - ) - ) - self.itm_probability.append(bs.put_itm_prob) - - if self.__action[i] == "buy": - self.delta.append(bs.put_delta) - self.theta.append(bs.put_theta / self.__daysinyear) - else: - self.delta.append(-bs.put_delta) - self.theta.append(-bs.put_theta / self.__daysinyear) - else: - self.implied_volatility.append(0.0) - self.itm_probability.append(0.0) - self.delta.append(0.0) - self.gamma.append(0.0) - self.vega.append(0.0) - self.theta.append(0.0) - - if self.__prevpos[i] < 0.0: # Previous position is closed - costtmp = (self.__premium[i] + self.__prevpos[i]) * self.__n[i] - - if self.__action[i] == "buy": - costtmp *= -1.0 - - self.cost[i] = costtmp - self.profit[i] += costtmp - - if self.__compute_expectation or self.__distribution == "array": - self.profit_mc[i] += costtmp - else: - if self.__prevpos[i] > 0.0: # Premium of the open position - opval = self.__prevpos[i] - else: # Current premium - opval = self.__premium[i] - - if self.__usebs[i]: - self.profit[i], self.cost[i] = get_pl_profile_bs( - type, - self.__action[i], - self.__strike[i], - opval, - self.__r, - (self.__days2maturity[i] - self.__days2target) - / self.__daysinyear, - self.__volatility, - self.__n[i], - self.__s, - self.__y, - self.__optcommission, - ) - - if self.__compute_expectation or self.__distribution == "array": - self.profit_mc[i] = get_pl_profile_bs( - type, - self.__action[i], - self.__strike[i], - opval, - self.__r, - (self.__days2maturity[i] - self.__days2target) - / self.__daysinyear, - self.__volatility, - self.__n[i], - self.__s_mc, - self.__y, - self.__optcommission, - )[0] - else: - self.profit[i], self.cost[i] = get_pl_profile( - type, - self.__action[i], - self.__strike[i], - opval, - self.__n[i], - self.__s, - self.__optcommission, - ) - - if self.__compute_expectation or self.__distribution == "array": - self.profit_mc[i] = get_pl_profile( - type, - self.__action[i], - self.__strike[i], - opval, - self.__n[i], - self.__s_mc, - self.__optcommission, - )[0] + self._run_option_calcs(i) elif type == "stock": - self.implied_volatility.append(0.0) - self.itm_probability.append(1.0) - self.delta.append(1.0) - self.gamma.append(0.0) - self.vega.append(0.0) - self.theta.append(0.0) - - if self.__prevpos[i] < 0.0: # Previous position is closed - costtmp = (self.__stockprice + self.__prevpos[i]) * self.__n[i] - - if self.__action[i] == "buy": - costtmp *= -1.0 - - self.cost[i] = costtmp - self.profit[i] += costtmp - - if self.__compute_expectation or self.__distribution == "array": - self.profit_mc[i] += costtmp - else: - if self.__prevpos[i] > 0.0: # Stock price at previous position - stockpos = self.__prevpos[i] - else: # Spot price of the stock at start date - stockpos = self.__stockprice - - self.profit[i], self.cost[i] = get_pl_profile_stock( - stockpos, - self.__action[i], - self.__n[i], - self.__s, - self.__stockcommission, - ) - - if self.__compute_expectation or self.__distribution == "array": - self.profit_mc[i] = get_pl_profile_stock( - stockpos, - self.__action[i], - self.__n[i], - self.__s_mc, - self.__stockcommission, - )[0] + self._run_stock_calcs(i) elif type == "closed": - self.implied_volatility.append(0.0) - self.itm_probability.append(0.0) - self.delta.append(0.0) - self.gamma.append(0.0) - self.vega.append(0.0) - self.theta.append(0.0) - - self.cost[i] = self.__prevpos[i] - self.profit[i] += self.__prevpos[i] + self._run_closed_position_calcs(i) - if self.__compute_expectation or self.__distribution == "array": - self.profit_mc[i] += self.__prevpos[i] + self.strategy_profit += self.profit[i] - self.strategyprofit += self.profit[i] + if self.compute_expectation or self.distribution == "array": + self.strategy_profit_mc += self.profit_mc[i] - if self.__compute_expectation or self.__distribution == "array": - self.strategyprofit_mc += self.profit_mc[i] + self._profit_ranges = get_profit_range(self.s, self.strategy_profit) - self.__profitranges = get_profit_range(self.__s, self.strategyprofit) - - if self.__distribution in ("normal", "laplace", "black-scholes"): + pop_inputs: ProbabilityOfProfitInputs | ProbabilityOfProfitArrayInputs + if self.distribution in ("normal", "laplace", "black-scholes"): pop_inputs = ProbabilityOfProfitInputs( - source=self.__distribution, - stock_price=self.__stockprice, - volatility=self.__volatility, - years_to_maturity=time2target, - interest_rate=self.__r, - dividend_yield=self.__y, + source=self.distribution, # type: ignore + stock_price=self.stock_price, + volatility=self.volatility, + years_to_maturity=time_to_target, + interest_rate=self.r, + dividend_yield=self.y, + ) + elif self.distribution == "array": + pop_inputs = ProbabilityOfProfitArrayInputs( + array=self.terminal_stock_prices ) - elif self.__distribution == "array": - pop_inputs = ProbabilityOfProfitArrayInputs(array=self.__s_mc) else: raise ValueError("Source not supported yet!") - self.project_probability = get_pop(self.__profitranges, pop_inputs) + self.profit_probability = get_pop(self._profit_ranges, pop_inputs) - if self.__profittarg is not None: - self.__profittargrange = get_profit_range( - self.__s, self.strategyprofit, self.__profittarg + if self.profit_target is not None: + self._profit_target_range = get_profit_range( + self.s, self.strategy_profit, self.profit_target ) self.project_target_probability = get_pop( - self.__profittargrange, pop_inputs + self._profit_target_range, pop_inputs ) - if self.__losslimit is not None: - self.__losslimitranges = get_profit_range( - self.__s, self.strategyprofit, self.__losslimit + 0.01 + if self.loss_limit is not None: + self._loss_limit_rangesm = get_profit_range( + self.s, self.strategy_profit, self.loss_limit + 0.01 ) self.loss_limit_probability = 1.0 - get_pop( - self.__losslimitranges, pop_inputs - ) - - optional_outputs = {} - - if self.__profittarg is not None: - optional_outputs["probability_of_profit_target"] = ( - self.project_target_probability + self._loss_limit_ranges, pop_inputs ) - optional_outputs["project_target_ranges"] = self.__profittargrange - - if self.__losslimit is not None: - optional_outputs["probability_of_loss_limit"] = self.loss_limit_probability - - if ( - self.__compute_expectation or self.__distribution == "array" - ) and self.__s_mc.shape[0] > 0: - tmpprof = self.strategyprofit_mc[self.strategyprofit_mc >= 0.01] - tmploss = self.strategyprofit_mc[self.strategyprofit_mc < 0.0] - optional_outputs["average_profit_from_mc"] = 0.0 - optional_outputs["average_loss_from_mc"] = ( - tmploss.mean() if tmploss.shape[0] > 0 else 0.0 - ) - - if tmpprof.shape[0] > 0: - optional_outputs["average_profit_from_mc"] = tmpprof.mean() - if tmploss.shape[0] > 0: - optional_outputs["average_loss_from_mc"] = tmploss.mean() + return self._generate_outputs() - optional_outputs["probability_of_profit_from_mc"] = ( - self.strategyprofit_mc >= 0.01 - ).sum() / self.strategyprofit_mc.shape[0] - - return Outputs.model_validate( - optional_outputs - | { - "probability_of_profit": self.project_probability, - "strategy_cost": sum(self.cost), - "per_leg_cost": self.cost, - "profit_ranges": self.__profitranges, - "minimum_return_in_the_domain": self.strategyprofit.min(), - "maximum_return_in_the_domain": self.strategyprofit.max(), - "implied_volatility": self.implied_volatility, - "in_the_money_probability": self.itm_probability, - "delta": self.delta, - "gamma": self.gamma, - "theta": self.theta, - "vega": self.vega, - } - ) - - def get_pl(self, leg=-1): + def get_pl(self, leg: int | None = None) -> tuple[ndarray, ndarray]: """ get_pl -> returns the profit/loss profile of either a leg or the whole strategy. @@ -491,7 +257,7 @@ def get_pl(self, leg=-1): Parameters ---------- leg : int, optional - Index of the leg. Default is -1 (whole strategy). + Index of the leg. Default is None (whole strategy). Returns ------- @@ -500,12 +266,12 @@ def get_pl(self, leg=-1): P/L profile : numpy array Profit/loss profile of either a leg or the whole strategy. """ - if self.profit.size > 0 and leg >= 0 and leg < self.profit.shape[0]: - return self.__s, self.profit[leg] - else: - return self.__s, self.strategyprofit + if self.profit.size > 0 and leg and leg < self.profit.shape[0]: + return self.s, self.profit[leg] - def pl_to_csv(self, filename="pl.csv", leg=-1): + return self.s, self.strategy_profit + + def pl_to_csv(self, filename: str = "pl.csv", leg: int | None = None) -> None: """ pl_to_csv -> saves the profit/loss data to a .csv file. @@ -514,227 +280,265 @@ def pl_to_csv(self, filename="pl.csv", leg=-1): filename : string, optional Name of the .csv file. Default is 'pl.csv'. leg : int, optional - Index of the leg. Default is -1 (whole strategy). + Index of the leg. Default is None (whole strategy). Returns ------- None. """ - if self.profit.size > 0 and leg >= 0 and leg < self.profit.shape[0]: - arr = stack((self.__s, self.profit[leg])) + if self.profit.size > 0 and leg and leg < self.profit.shape[0]: + arr = stack((self.s, self.profit[leg])) else: - arr = stack((self.__s, self.strategyprofit)) + arr = stack((self.s, self.strategy_profit)) savetxt( filename, arr.transpose(), delimiter=",", header="StockPrice,Profit/Loss" ) - def plot_pl(self): - """ - plot_pl -> displays the strategy's profit/loss profile diagram. + def _run_option_calcs(self, i: int): + action: Action = self.action[i] # type: ignore + type: OptionType = self.type[i] # type: ignore - Returns - ------- - None. - """ - if len(self.strategyprofit) == 0: - raise RuntimeError( - "Before plotting the profit/loss profile diagram, you must run a calculation!" + if self._previous_position[i] < 0.0: + # Previous position is closed + self.implied_volatility.append(0.0) + self.itm_probability.append(0.0) + self.delta.append(0.0) + self.gamma.append(0.0) + self.vega.append(0.0) + self.theta.append(0.0) + + cost = (self.premium[i] + self._previous_position[i]) * self.n[i] + + if self.action[i] == "buy": + cost *= -1.0 + + self.cost[i] = cost + self.profit[i] += cost + + if self.compute_expectation or self.distribution == "array": + self.profit_mc[i] += cost + + return + + time_to_maturity = self._days_to_maturity[i] / self._days_in_year + bs = get_bs_info( + self.stock_price, + self.strike[i], + self.r, + self.volatility, + time_to_maturity, + self.y, + ) + + self.gamma.append(bs.gamma) + self.vega.append(bs.vega) + + self.implied_volatility.append( + get_implied_vol( + type, + self.premium[i], + self.stock_price, + self.strike[i], + self.r, + time_to_maturity, + self.y, ) + ) - rcParams.update({"figure.autolayout": True}) - - zeroline = zeros(self.__s.shape[0]) - strikecallbuy = [] - strikeputbuy = [] - zerocallbuy = [] - zeroputbuy = [] - strikecallsell = [] - strikeputsell = [] - zerocallsell = [] - zeroputsell = [] - comment = "P/L profile diagram:\n--------------------\n" - comment += "The vertical green dashed line corresponds to the position " - comment += "of the stock's spot price. The right and left arrow " - comment += "markers indicate the strike prices of calls and puts, " - comment += "respectively, with blue representing long and red representing " - comment += "short positions." - - plt.axvline(self.__stockprice, ls="--", color="green") - plt.xlabel("Stock price") - plt.ylabel("Profit/Loss") - plt.xlim(self.__s.min(), self.__s.max()) - - for i in range(len(self.__strike)): - if self.__strike[i] > 0.0: - if self.__type[i] == "call": - if self.__action[i] == "buy": - strikecallbuy.append(self.__strike[i]) - zerocallbuy.append(0.0) - elif self.__action[i] == "sell": - strikecallsell.append(self.__strike[i]) - zerocallsell.append(0.0) - elif self.__type[i] == "put": - if self.__action[i] == "buy": - strikeputbuy.append(self.__strike[i]) - zeroputbuy.append(0.0) - elif self.__action[i] == "sell": - strikeputsell.append(self.__strike[i]) - zeroputsell.append(0.0) - - if self.__profittarg is not None: - comment += " The blue dashed line represents the profit target level." - targetline = full(self.__s.shape[0], self.__profittarg) - - if self.__losslimit is not None: - comment += " The red dashed line represents the loss limit level." - lossline = full(self.__s.shape[0], self.__losslimit) - - print(comment) - - if self.__losslimit is not None and self.__profittarg is not None: - plt.plot( - self.__s, - zeroline, - "m--", - self.__s, - lossline, - "r--", - self.__s, - targetline, - "b--", - self.__s, - self.strategyprofit, - "k-", - strikecallbuy, - zerocallbuy, - "b>", - strikeputbuy, - zeroputbuy, - "b<", - strikecallsell, - zerocallsell, - "r>", - strikeputsell, - zeroputsell, - "r<", - markersize=10, + negative_multiplier = 1 if self.action[i] == "buy" else -1 + + if type == "call": + self.itm_probability.append(bs.call_itm_prob) + self.delta.append(bs.call_delta * negative_multiplier) + self.theta.append(bs.call_theta / self._days_in_year * negative_multiplier) + else: + self.itm_probability.append(bs.put_itm_prob) + self.delta.append(bs.put_delta * negative_multiplier) + self.theta.append(bs.put_theta / self._days_in_year * negative_multiplier) + + if self._previous_position[i] > 0.0: # Premium of the open position + opt_value = self._previous_position[i] + else: # Current premium + opt_value = self.premium[i] + + if self._use_bs[i]: + target_to_maturity = ( + self._days_to_maturity[i] - self.days_to_target + ) / self._days_in_year + + self.profit[i], self.cost[i] = get_pl_profile_bs( + type, + action, + self.strike[i], + opt_value, + self.r, + target_to_maturity, + self.volatility, + self.n[i], + self.s, + self.y, + self.opt_commission, ) - elif self.__losslimit is not None: - plt.plot( - self.__s, - zeroline, - "m--", - self.__s, - lossline, - "r--", - self.__s, - self.strategyprofit, - "k-", - strikecallbuy, - zerocallbuy, - "b>", - strikeputbuy, - zeroputbuy, - "b<", - strikecallsell, - zerocallsell, - "r>", - strikeputsell, - zeroputsell, - "r<", - markersize=10, + + if self.compute_expectation or self.distribution == "array": + self.profit_mc[i] = get_pl_profile_bs( + type, + action, + self.strike[i], + opt_value, + self.r, + target_to_maturity, + self.volatility, + self.n[i], + self.terminal_stock_prices, + self.y, + self.opt_commission, + )[0] + else: + self.profit[i], self.cost[i] = get_pl_profile( + type, + action, + self.strike[i], + opt_value, + self.n[i], + self.s, + self.opt_commission, ) - elif self.__profittarg is not None: - plt.plot( - self.__s, - zeroline, - "m--", - self.__s, - targetline, - "b--", - self.__s, - self.strategyprofit, - "k-", - strikecallbuy, - zerocallbuy, - "b>", - strikeputbuy, - zeroputbuy, - "b<", - strikecallsell, - zerocallsell, - "r>", - strikeputsell, - zeroputsell, - "r<", - markersize=10, + + if self.compute_expectation or self.distribution == "array": + self.profit_mc[i] = get_pl_profile( + type, + action, + self.strike[i], + opt_value, + self.n[i], + self.terminal_stock_prices, + self.opt_commission, + )[0] + + def _run_stock_calcs(self, i: int): + action: Action = self.action[i] # type: ignore + + self.implied_volatility.append(0.0) + self.itm_probability.append(1.0) + self.delta.append(1.0) + self.gamma.append(0.0) + self.vega.append(0.0) + self.theta.append(0.0) + + if self._previous_position[i] < 0.0: # Previous position is closed + costtmp = (self.stock_price + self._previous_position[i]) * self.n[i] + + if self.action[i] == "buy": + costtmp *= -1.0 + + self.cost[i] = costtmp + self.profit[i] += costtmp + + if self.compute_expectation or self.distribution == "array": + self.profit_mc[i] += costtmp + + return + + if self._previous_position[i] > 0.0: # Stock price at previous position + stockpos = self._previous_position[i] + else: # Spot price of the stock at start date + stockpos = self.stock_price + + self.profit[i], self.cost[i] = get_pl_profile_stock( + stockpos, + action, + self.n[i], + self.s, + self.stock_commission, + ) + + if self.compute_expectation or self.distribution == "array": + self.profit_mc[i] = get_pl_profile_stock( + stockpos, + action, + self.n[i], + self.terminal_stock_prices, + self.stock_commission, + )[0] + + def _run_closed_position_calcs(self, i: int): + self.implied_volatility.append(0.0) + self.itm_probability.append(0.0) + self.delta.append(0.0) + self.gamma.append(0.0) + self.vega.append(0.0) + self.theta.append(0.0) + + self.cost[i] = self._previous_position[i] + self.profit[i] += self._previous_position[i] + + if self.compute_expectation or self.distribution == "array": + self.profit_mc[i] += self._previous_position[i] + + def _generate_outputs(self) -> Outputs: + optional_outputs: dict[str, Any] = {} + + if self.profit_target is not None: + optional_outputs["probability_of_profit_target"] = ( + self.project_target_probability ) - else: - plt.plot( - self.__s, - zeroline, - "m--", - self.__s, - self.strategyprofit, - "k-", - strikecallbuy, - zerocallbuy, - "b>", - strikeputbuy, - zeroputbuy, - "b<", - strikecallsell, - zerocallsell, - "r>", - strikeputsell, - zeroputsell, - "r<", - markersize=10, + optional_outputs["project_target_ranges"] = self._profit_target_range + + if self.loss_limit is not None: + optional_outputs["probability_of_loss_limit"] = self.loss_limit_probability + + if ( + self.compute_expectation or self.distribution == "array" + ) and self.terminal_stock_prices.shape[0] > 0: + profit = self.strategy_profit_mc[self.strategy_profit_mc >= 0.01] + loss = self.strategy_profit_mc[self.strategy_profit_mc < 0.0] + optional_outputs["average_profit_from_mc"] = 0.0 + optional_outputs["average_loss_from_mc"] = ( + loss.mean() if loss.shape[0] > 0 else 0.0 ) + if profit.shape[0] > 0: + optional_outputs["average_profit_from_mc"] = profit.mean() + + if loss.shape[0] > 0: + optional_outputs["average_loss_from_mc"] = loss.mean() + + optional_outputs["probability_of_profit_from_mc"] = ( + self.strategy_profit_mc >= 0.01 + ).sum() / self.strategy_profit_mc.shape[0] + + return Outputs.model_validate( + optional_outputs + | { + "probability_of_profit": self.profit_probability, + "strategy_cost": sum(self.cost), + "per_leg_cost": self.cost, + "profit_ranges": self._profit_ranges, + "minimum_return_in_the_domain": self.strategy_profit.min(), + "maximum_return_in_the_domain": self.strategy_profit.max(), + "implied_volatility": self.implied_volatility, + "in_the_money_probability": self.itm_probability, + "delta": self.delta, + "gamma": self.gamma, + "theta": self.theta, + "vega": self.vega, + } + ) + """ Properties ---------- - days2target : int, readonly - Number of days remaining to the target date from the start date. - stockpricearray : array + stock_price_array : array A Numpy array of consecutive stock prices, from the minimum price up to the maximum price in the stock price domain. It is used to compute the strategy's P/L profile. - terminalstockprices : array + terminal_stock_prices : array A Numpy array or terminal stock prices typically generated by Monte Carlo simulations. It is used to compute strategy's expected profit and loss. """ @property - def days2target(self): - return self.__days2target - - @property - def stockpricearray(self): - return self.__s - - @stockpricearray.setter - def stockpricearray(self, s): - if isinstance(s, ndarray): - if s.shape[0] > 0: - self.__s = s - else: - raise ValueError("Empty stock price array is not allowed!") - else: - raise TypeError("A numpy array is expected!") - - @property - def terminalstockprices(self): - return self.__s_mc - - @terminalstockprices.setter - def terminalstockprices(self, s): - if isinstance(s, ndarray): - if s.shape[0] > 0: - self.__s_mc = s - else: - raise ValueError("Empty terminal stock price array is not allowed!") - else: - raise TypeError("A numpy array is expected!") + def stock_price_array(self): + return self.s diff --git a/optionlab/models.py b/optionlab/models.py index 624af26..a85981c 100644 --- a/optionlab/models.py +++ b/optionlab/models.py @@ -215,7 +215,7 @@ class Inputs(BaseModel): target_date: dt.date | None = None days_to_target_date: int = Field(0, ge=0) distribution: Distribution = "black-scholes" - mc_prices_number: float = 100000 + mc_prices_number: int = 100_000 array_prices: list[float] | None = None @field_validator("strategy") diff --git a/optionlab/plot.py b/optionlab/plot.py new file mode 100644 index 0000000..0b46901 --- /dev/null +++ b/optionlab/plot.py @@ -0,0 +1,177 @@ +from __future__ import division +from __future__ import print_function + +import matplotlib.pyplot as plt +from matplotlib import rcParams +from numpy import zeros, full + +from optionlab import StrategyEngine + + +def plot_pl(st: StrategyEngine): + """ + plot_pl -> displays the strategy's profit/loss profile diagram. + + Returns + ------- + None. + """ + if len(st.strategy_profit) == 0: + raise RuntimeError( + "Before plotting the profit/loss profile diagram, you must run a calculation!" + ) + + rcParams.update({"figure.autolayout": True}) + + zero_line = zeros(st.s.shape[0]) + strike_call_buy = [] + strike_put_buy = [] + zero_call_buy = [] + zero_put_buy = [] + strike_call_sell = [] + strike_put_sell = [] + zero_call_sell = [] + zero_put_sell = [] + comment = "P/L profile diagram:\n--------------------\n" + comment += "The vertical green dashed line corresponds to the position " + comment += "of the stock's spot price. The right and left arrow " + comment += "markers indicate the strike prices of calls and puts, " + comment += "respectively, with blue representing long and red representing " + comment += "short positions." + + plt.axvline(st.stock_price, ls="--", color="green") + plt.xlabel("Stock price") + plt.ylabel("Profit/Loss") + plt.xlim(st.s.min(), st.s.max()) + + for i, strike in enumerate(st.strike): + if strike == 0.0: + continue + + if st.type[i] == "call": + if st.action[i] == "buy": + strike_call_buy.append(strike) + zero_call_buy.append(0.0) + elif st.action[i] == "sell": + strike_call_sell.append(strike) + zero_call_sell.append(0.0) + elif st.type[i] == "put": + if st.action[i] == "buy": + strike_put_buy.append(strike) + zero_put_buy.append(0.0) + elif st.action[i] == "sell": + strike_put_sell.append(strike) + zero_put_sell.append(0.0) + + target_line = None + if st.profit_target is not None: + comment += " The blue dashed line represents the profit target level." + target_line = full(st.s.shape[0], st.profit_target) + + loss_line = None + if st.loss_limit is not None: + comment += " The red dashed line represents the loss limit level." + loss_line = full(st.s.shape[0], st.loss_limit) + + print(comment) + + if loss_line is not None and target_line is not None: + plt.plot( + st.s, + zero_line, + "m--", + st.s, + loss_line, + "r--", + st.s, + target_line, + "b--", + st.s, + st.strategy_profit, + "k-", + strike_call_buy, + zero_call_buy, + "b>", + strike_put_buy, + zero_put_buy, + "b<", + strike_call_sell, + zero_call_sell, + "r>", + strike_put_sell, + zero_put_sell, + "r<", + markersize=10, + ) + elif loss_line is not None: + plt.plot( + st.s, + zero_line, + "m--", + st.s, + loss_line, + "r--", + st.s, + st.strategy_profit, + "k-", + strike_call_buy, + zero_call_buy, + "b>", + strike_put_buy, + zero_put_buy, + "b<", + strike_call_sell, + zero_call_sell, + "r>", + strike_put_sell, + zero_put_sell, + "r<", + markersize=10, + ) + elif target_line is not None: + plt.plot( + st.s, + zero_line, + "m--", + st.s, + target_line, + "b--", + st.s, + st.strategy_profit, + "k-", + strike_call_buy, + zero_call_buy, + "b>", + strike_put_buy, + zero_put_buy, + "b<", + strike_call_sell, + zero_call_sell, + "r>", + strike_put_sell, + zero_put_sell, + "r<", + markersize=10, + ) + else: + plt.plot( + st.s, + zero_line, + "m--", + st.s, + st.strategy_profit, + "k-", + strike_call_buy, + zero_call_buy, + "b>", + strike_put_buy, + zero_put_buy, + "b<", + strike_call_sell, + zero_call_sell, + "r>", + strike_put_sell, + zero_put_sell, + "r<", + markersize=10, + ) diff --git a/optionlab/support.py b/optionlab/support.py index 2345b06..ba9bb81 100644 --- a/optionlab/support.py +++ b/optionlab/support.py @@ -1,5 +1,7 @@ from __future__ import division +from functools import lru_cache + import numpy as np from numpy import ndarray, exp, abs, round, diff, flatnonzero, arange, inf from numpy.lib.scimath import log, sqrt @@ -13,6 +15,7 @@ Distribution, ProbabilityOfProfitInputs, ProbabilityOfProfitArrayInputs, + Range, ) @@ -133,6 +136,7 @@ def get_pl_profile_bs( return fac * n * (calcprice - val) - commission, n * cost - commission +@lru_cache def create_price_seq(min_price: float, max_price: float) -> np.ndarray: """ create_price_seq(min_price, max_price) -> generates a sequence of stock prices @@ -151,6 +155,7 @@ def create_price_seq(min_price: float, max_price: float) -> np.ndarray: raise ValueError("Maximum price cannot be less than minimum price!") +@lru_cache def create_price_samples( s0: float, volatility: float, @@ -159,7 +164,7 @@ def create_price_samples( distribution: Distribution = "black-scholes", y: float = 0.0, n: int = 100_000, -) -> float: +) -> np.ndarray: """ create_price_samples(s0, volatility, years_to_maturity, r, distribution, y, n) -> generates random stock prices at maturity according to a statistical distribution. @@ -193,7 +198,7 @@ def create_price_samples( def get_profit_range( s: np.ndarray, profit: np.ndarray, target: float = 0.01 -) -> list[list[float]]: +) -> list[Range]: """ get_profit_range(s, profit, target) -> returns pairs of stock prices, as a list, for which an option trade is expected to get the desired profit in between. @@ -206,41 +211,41 @@ def get_profit_range( target: profit target (0.01 is the default). """ - profitrange: list[list[float]] = [] - t = s[profit >= target] if t.shape[0] == 0: - return profitrange + return [] + + profit_range: list[list[float]] = [] mask1 = diff(t) <= target + 0.001 mask2 = diff(t) > target + 0.001 maxi = flatnonzero(mask1[:-1] & mask2[1:]) + 1 for i in range(maxi.shape[0] + 1): - profitrange.append([]) + profit_range.append([]) if i == 0: if t[0] == s[0]: - profitrange[0].append(0.0) + profit_range[0].append(0.0) else: - profitrange[0].append(t[0]) + profit_range[0].append(t[0]) else: - profitrange[i].append(t[maxi[i - 1] + 1]) + profit_range[i].append(t[maxi[i - 1] + 1]) if i == maxi.shape[0]: if t[t.shape[0] - 1] == s[s.shape[0] - 1]: - profitrange[maxi.shape[0]].append(inf) + profit_range[maxi.shape[0]].append(inf) else: - profitrange[maxi.shape[0]].append(t[t.shape[0] - 1]) + profit_range[maxi.shape[0]].append(t[t.shape[0] - 1]) else: - profitrange[i].append(t[maxi[i]]) + profit_range[i].append(t[maxi[i]]) - return profitrange + return [(r[0], r[1]) for r in profit_range] def get_pop( - profit_ranges: list[list[float]], + profit_ranges: list[Range], inputs: ProbabilityOfProfitInputs | ProbabilityOfProfitArrayInputs, ) -> float: """ diff --git a/pyproject.toml b/pyproject.toml index 766bf33..50cc36f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "optionlab" -version = "1.0.1" +version = "1.1.0" description = "Evaluate option strategies" authors = ["rgaveiga"] readme = "README.md" diff --git a/tests/test_core.py b/tests/test_core.py index 5142c70..9bf4535 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -59,7 +59,6 @@ def test_covered_call(nvidia): st = StrategyEngine(inputs) outputs = st.run() - # Print useful information on screen assert isinstance(outputs, Outputs) assert outputs.model_dump(exclude_none=True) == pytest.approx(COVERED_CALL_RESULT) diff --git a/tests/test_misc.py b/tests/test_misc.py index 9a62eee..d5ac9c7 100644 --- a/tests/test_misc.py +++ b/tests/test_misc.py @@ -3,6 +3,7 @@ import pytest +from optionlab import create_price_samples from optionlab.utils import get_nonbusiness_days @@ -51,3 +52,31 @@ def test_benchmark_holidays(benchmark): benchmark(test_holidays_benchmark) assert time.time() - start_time < 2 # takes avg. ~1.1ms on M1 + + +def test_cache_price_samples(): + create_price_samples.cache_clear() + + sample1 = create_price_samples(168.99, 0.483, 23 / 365, 0.045) + + cache_info1 = create_price_samples.cache_info() + assert cache_info1.misses == 1 + assert cache_info1.hits == 0 + assert cache_info1.currsize == 1 + assert sample1.sum() == pytest.approx(16955828.375046223, rel=0.01) + + sample2 = create_price_samples(168.99, 0.483, 23 / 365, 0.045) + + cache_info2 = create_price_samples.cache_info() + assert cache_info2.misses == 1 + assert cache_info2.hits == 1 + assert cache_info2.currsize == 1 + assert sample2.sum() == pytest.approx(16955828.375046223, rel=0.01) + + sample3 = create_price_samples(167, 0.483, 23 / 365, 0.045) + + cache_info3 = create_price_samples.cache_info() + assert cache_info3.misses == 2 + assert cache_info3.hits == 1 + assert cache_info3.currsize == 2 + assert sample3.sum() == pytest.approx(16741936.007518211, rel=0.01)