diff --git a/hathor/exception.py b/hathor/exception.py index 1d3d42547..eafc98c1f 100644 --- a/hathor/exception.py +++ b/hathor/exception.py @@ -12,10 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. - -class HathorError(Exception): - """Base class for exceptions in Hathor.""" - pass +from hathorlib.exceptions import HathorError # noqa: F401 class BuilderError(Exception): diff --git a/hathor/nanocontracts/blueprint_syntax_validation.py b/hathor/nanocontracts/blueprint_syntax_validation.py index eb1307c41..c57524757 100644 --- a/hathor/nanocontracts/blueprint_syntax_validation.py +++ b/hathor/nanocontracts/blueprint_syntax_validation.py @@ -12,88 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -from __future__ import annotations - -import inspect -from typing import Callable - -from hathor.nanocontracts.exception import BlueprintSyntaxError - - -def validate_has_self_arg(fn: Callable, annotation_name: str) -> None: - """Validate the `self` arg of a callable.""" - arg_spec = inspect.getfullargspec(fn) - if len(arg_spec.args) == 0: - raise BlueprintSyntaxError(f'@{annotation_name} method must have `self` argument: `{fn.__name__}()`') - - if arg_spec.args[0] != 'self': - raise BlueprintSyntaxError( - f'@{annotation_name} method first argument must be called `self`: `{fn.__name__}()`' - ) - - if 'self' in arg_spec.annotations.keys(): - raise BlueprintSyntaxError(f'@{annotation_name} method `self` argument must not be typed: `{fn.__name__}()`') - - -def validate_method_types(fn: Callable) -> None: - """Validate the arg and return types of a callable.""" - special_args = ['self'] - arg_spec = inspect.getfullargspec(fn) - - if 'return' not in arg_spec.annotations: - raise BlueprintSyntaxError(f'missing return type on method `{fn.__name__}`') - - # TODO: This currently fails for types such as unions, probably because this is the wrong - # parsing function to use. Fix this. - # from hathor.nanocontracts.fields import get_field_class_for_attr - # return_type = arg_spec.annotations['return'] - # if return_type is not None: - # try: - # get_field_class_for_attr(return_type) - # except UnknownFieldType: - # raise BlueprintSyntaxError( - # f'unsupported return type `{return_type}` on method `{fn.__name__}`' - # ) - - for arg_name in arg_spec.args: - if arg_name in special_args: - continue - - if arg_name not in arg_spec.annotations: - raise BlueprintSyntaxError(f'argument `{arg_name}` on method `{fn.__name__}` must be typed') - - # TODO: This currently fails for @view methods with NamedTuple as args for example, - # because API calls use a different parsing function. Fix this. - # arg_type = arg_spec.annotations[arg_name] - # try: - # get_field_class_for_attr(arg_type) - # except UnknownFieldType: - # raise BlueprintSyntaxError( - # f'unsupported type `{arg_type.__name__}` on argument `{arg_name}` of method `{fn.__name__}`' - # ) - - -def validate_has_ctx_arg(fn: Callable, annotation_name: str) -> None: - """Validate the context arg of a callable.""" - arg_spec = inspect.getfullargspec(fn) - - if len(arg_spec.args) < 2: - raise BlueprintSyntaxError( - f'@{annotation_name} method must have `Context` argument: `{fn.__name__}()`' - ) - - from hathor.nanocontracts import Context - second_arg = arg_spec.args[1] - if arg_spec.annotations[second_arg] is not Context: - raise BlueprintSyntaxError( - f'@{annotation_name} method second arg `{second_arg}` argument must be of type `Context`: ' - f'`{fn.__name__}()`' - ) - - -def validate_has_not_ctx_arg(fn: Callable, annotation_name: str) -> None: - """Validate that a callable doesn't have a `Context` arg.""" - from hathor.nanocontracts import Context - arg_spec = inspect.getfullargspec(fn) - if Context in arg_spec.annotations.values(): - raise BlueprintSyntaxError(f'@{annotation_name} method cannot have arg with type `Context`: `{fn.__name__}()`') +# Re-export from hathorlib for backward compatibility +from hathorlib.nanocontracts.blueprint_syntax_validation import ( # noqa: F401 + validate_has_ctx_arg, + validate_has_not_ctx_arg, + validate_has_self_arg, + validate_method_types, +) diff --git a/hathor/nanocontracts/exception.py b/hathor/nanocontracts/exception.py index 3c95bab5f..45a9bbba7 100644 --- a/hathor/nanocontracts/exception.py +++ b/hathor/nanocontracts/exception.py @@ -12,22 +12,49 @@ # See the License for the specific language governing permissions and # limitations under the License. -from hathor.exception import HathorError from hathor.transaction.exceptions import TxValidationError - -""" -All exceptions in this module MUST inherit from NCFail so they're -correctly caught by the block consensus to fail NC transactions. -""" - - -class NCFail(HathorError): - """Raised by Blueprint's methods to fail execution.""" - - -class BlueprintSyntaxError(SyntaxError, NCFail): - """Raised when a blueprint contains invalid syntax.""" - pass +# Re-export all exceptions from hathorlib for backward compatibility +from hathorlib.nanocontracts.exception import ( # noqa: F401 + BlueprintDoesNotExist, + BlueprintSyntaxError, + NanoContractDoesNotExist, + NCAlreadyInitializedContractError, + NCAttributeError, + NCContractCreationAtMempool, + NCContractCreationNotFound, + NCContractCreationVoided, + NCDisabledBuiltinError, + NCFail, + NCForbiddenAction, + NCForbiddenReentrancy, + NCInsufficientFunds, + NCInvalidAction, + NCInvalidContext, + NCInvalidContractId, + NCInvalidFee, + NCInvalidFeePaymentToken, + NCInvalidInitializeMethodCall, + NCInvalidPublicMethodCallFromView, + NCInvalidSyscall, + NCNumberOfCallsExceeded, + NCRecursionError, + NCSerializationArgTooLong, + NCSerializationError, + NCSerializationTypeError, + NCTokenAlreadyExists, + NCTypeError, + NCUninitializedContractError, + NCViewMethodError, + OCBBlueprintNotConfirmed, + OCBInvalidBlueprintVertexType, + OCBInvalidScript, + OCBOutOfFuelDuringLoading, + OCBOutOfMemoryDuringLoading, + OCBPubKeyNotAllowed, + UnknownFieldType, +) + +# hathor-specific exceptions that depend on TxValidationError class NCTxValidationError(TxValidationError, NCFail): @@ -46,178 +73,10 @@ class NCInvalidSeqnum(NCTxValidationError, NCFail): pass -class NanoContractDoesNotExist(NCFail): - pass - - -class BlueprintDoesNotExist(NCFail): - pass - - -class NCSerializationError(NCFail): - pass - - -class NCSerializationArgTooLong(NCSerializationError): - pass - - -class NCSerializationTypeError(NCSerializationError): - pass - - -class NCViewMethodError(NCFail): - """Raised when a view method changes the state of the contract.""" - pass - - class NCMethodNotFound(NCTxValidationError): """Raised when a method is not found in a nano contract.""" pass -class NCInsufficientFunds(NCFail): - """Raised when there is not enough funds to withdrawal from a nano contract.""" - pass - - -class NCAttributeError(NCFail): - pass - - -class NCInvalidContext(NCFail): - """Raised when trying to run a method with an invalid context.""" - pass - - -class NCRecursionError(NCFail): - """Raised when recursion gets too deep.""" - - -class NCNumberOfCallsExceeded(NCFail): - """Raised when the total number of calls have been exceeded.""" - - -class NCInvalidContractId(NCFail): - """Raised when a contract call is invalid.""" - - class NCInvalidMethodCall(NCTxValidationError): """Raised when a contract calls another contract's invalid method.""" - - -class NCInvalidFeePaymentToken(NCFail): - """Raised when a payment token is invalid.""" - - -class NCInvalidInitializeMethodCall(NCFail): - """Raised when a contract calls another contract's initialize method.""" - - -class NCInvalidPublicMethodCallFromView(NCFail): - """Raised when a contract calls another contract's initialize method.""" - - -class NCAlreadyInitializedContractError(NCFail): - """Raised when one tries to initialize a contract that has already been initialized.""" - - -class NCUninitializedContractError(NCFail): - """Raised when a contract calls a method from an uninitialized contract.""" - - -class NCInvalidAction(NCFail): - """Raised when an action is invalid.""" - pass - - -class NCInvalidFee(NCFail): - """Raised when a fee is invalid.""" - pass - - -class NCInvalidSyscall(NCFail): - """Raised when a syscall is invalid.""" - pass - - -class NCTokenAlreadyExists(NCFail): - """Raised when one tries to create a duplicated token.""" - - -class NCForbiddenAction(NCFail): - """Raised when an action is forbidden on a method.""" - pass - - -class NCForbiddenReentrancy(NCFail): - """Raised when a reentrancy is forbidden on a method.""" - pass - - -class NCTypeError(NCFail): - """Raised when a wrong type is used.""" - pass - - -class UnknownFieldType(NCFail): - """Raised when there is no field available for a given type.""" - pass - - -class NCContractCreationNotFound(NCFail): - """Raised when a nano contract creation transaction is not found. - - This error might also happen when the transaction is at the mempool or when it fails execution.""" - pass - - -class NCContractCreationAtMempool(NCContractCreationNotFound): - """Raised when a nano contract creation transaction is at the mempool, so it has not been - executed yet.""" - pass - - -class NCContractCreationVoided(NCContractCreationNotFound): - """Raised when a nano contract creation transaction is voided. - - The two most common reasons to have a voided transaction is because it was voided by - another transaction (e.g., double spending) or it has failed execution.""" - pass - - -class OCBInvalidScript(NCFail): - """Raised when an On-Chain Blueprint script does not pass our script restrictions check. - """ - pass - - -class OCBInvalidBlueprintVertexType(NCFail): - """Raised when a vertex that is not an OnChainBlueprint is used as a blueprint-id. - """ - pass - - -class OCBBlueprintNotConfirmed(NCFail): - """Raised when trying to use an OnChainBlueprint that is not confirmed by a block in the current best chain. - """ - - -class OCBPubKeyNotAllowed(NCFail): - """Raised when an OnChainBlueprint transaction uses a pubkey that is not explicitly allowed in the settings. - """ - - -class OCBOutOfFuelDuringLoading(NCFail): - """Raised when loading an On-chain Blueprint and the execution exceeds the fuel limit. - """ - - -class OCBOutOfMemoryDuringLoading(NCFail): - """Raised when loading an On-chain Blueprint and the execution exceeds the memory limit. - """ - - -class NCDisabledBuiltinError(NCFail): - """Raised when a disabled builtin is used during creation or execution of a nanocontract. - """ diff --git a/hathor/nanocontracts/types.py b/hathor/nanocontracts/types.py index 3d1835988..1ba911b56 100644 --- a/hathor/nanocontracts/types.py +++ b/hathor/nanocontracts/types.py @@ -12,524 +12,66 @@ # See the License for the specific language governing permissions and # limitations under the License. -from __future__ import annotations - -import inspect -from dataclasses import dataclass -from enum import Enum, unique -from typing import Any, Callable, Generic, Self, TypeAlias, TypeVar - -from typing_extensions import override - -from hathor.conf.settings import HATHOR_TOKEN_UID, HathorSettings -from hathor.crypto.util import decode_address, get_address_b58_from_bytes -from hathor.nanocontracts.blueprint_syntax_validation import ( - validate_has_ctx_arg, - validate_has_not_ctx_arg, - validate_has_self_arg, - validate_method_types, +# Re-export all types from hathorlib for backward compatibility +from hathorlib.nanocontracts.types import ( # noqa: F401 + BLUEPRINT_EXPORT_NAME, + HATHOR_TOKEN_UID, + NC_ALLOW_REENTRANCY, + NC_ALLOWED_ACTIONS_ATTR, + NC_FALLBACK_METHOD, + NC_INITIALIZE_METHOD, + NC_METHOD_TYPE_ATTR, + Address, + Amount, + BaseAction, + BaseAuthorityAction, + BaseTokenAction, + BlueprintId, + CallerId, + ContractId, + NCAcquireAuthorityAction, + NCAction, + NCActionType, + NCArgs, + NCDepositAction, + NCFee, + NCGrantAuthorityAction, + NCMethodType, + NCParsedArgs, + NCRawArgs, + NCWithdrawalAction, + RawSignedData, + SignedData, + Timestamp, + TokenUid, + TxOutputScript, + VertexId, + blueprint_id_from_bytes, + export, + fallback, + public, + set_checksig_backend, + view, ) -from hathor.nanocontracts.exception import BlueprintSyntaxError, NCSerializationError -from hathor.nanocontracts.faux_immutable import FauxImmutableMeta -from hathor.serialization import SerializationError -from hathor.transaction.scripts.opcode import OpcodesVersion -from hathor.transaction.util import bytes_to_int, get_deposit_token_withdraw_amount, int_to_bytes -from hathor.utils.typing import InnerTypeMixin - -# XXX: mypy gives the following errors on all subclasses of `bytes` that use FauxImmutableMeta: -# -# Metaclass conflict: the metaclass of a derived class must be a (non-strict) subclass of the metaclasses of all its -# bases -# -# However `bytes` metaclass appears to be `type` (just like `int`'s) and `FauxImmutableMeta` correctly inherits from -# `type`, so it seems like it's a mypy error. - - -# Types to be used by blueprints. -class Address(bytes, metaclass=FauxImmutableMeta): - __allow_faux_inheritance__ = True - __allow_faux_dunder__ = ('__str__', '__repr__') - __slots__ = () - - @classmethod - def from_str(cls, /, encoded_address: str) -> Self: - if not isinstance(encoded_address, str): - raise TypeError(f'expected `str` instance, got `{type(encoded_address)}` instance') - return cls(decode_address(encoded_address)) - - def __str__(self) -> str: - encoded_address = get_address_b58_from_bytes(self) - return encoded_address - - def __repr__(self) -> str: - encoded_address = str(self) # uses __str__ - # XXX: should we support `Address(encoded_address)` constructor? - return f"Address.from_str({encoded_address!r})" - - -class VertexId(bytes, metaclass=FauxImmutableMeta): - __slots__ = () - __allow_faux_inheritance__ = True - - -class BlueprintId(VertexId): - __slots__ = () - __allow_faux_inheritance__ = True - - -class ContractId(VertexId): - __slots__ = () - __allow_faux_inheritance__ = True - - -class TokenUid(bytes, metaclass=FauxImmutableMeta): - __slots__ = () - __allow_faux_inheritance__ = True - - -class TxOutputScript(bytes, metaclass=FauxImmutableMeta): - __slots__ = () - __allow_faux_inheritance__ = True - - -class Amount(int, metaclass=FauxImmutableMeta): - __slots__ = () - __allow_faux_inheritance__ = True - - -class Timestamp(int, metaclass=FauxImmutableMeta): - __slots__ = () - __allow_faux_inheritance__ = True - - -CallerId: TypeAlias = Address | ContractId - -T = TypeVar('T') -B = TypeVar('B', bound=type) - -NC_INITIALIZE_METHOD: str = 'initialize' -NC_FALLBACK_METHOD: str = 'fallback' -NC_ALLOWED_ACTIONS_ATTR = '__nc_allowed_actions' -NC_ALLOW_REENTRANCY = '__nc_allow_reentrancy' -NC_METHOD_TYPE_ATTR: str = '__nc_method_type' -# this is the name we use internally to store the blueprint that is exported by a module -BLUEPRINT_EXPORT_NAME: str = '__blueprint__' +def _checksig_impl(sighash_all_data: bytes, script_input: bytes, script: bytes) -> bool: + from hathor.transaction.exceptions import ScriptError + from hathor.transaction.scripts import ScriptExtras + from hathor.transaction.scripts.execute import raw_script_eval + from hathor.transaction.scripts.opcode import OpcodesVersion + class _FakeTx: + def get_sighash_all_data(self) -> bytes: + return sighash_all_data -class NCMethodType(Enum): - PUBLIC = 'public' - VIEW = 'view' - FALLBACK = 'fallback' - - -def blueprint_id_from_bytes(data: bytes) -> BlueprintId: - """Create a BlueprintId from a bytes object.""" - return BlueprintId(VertexId(data)) - - -class RawSignedData(InnerTypeMixin[T], Generic[T]): - """A wrapper class to sign data. - - T must be serializable. - """ - - def __init__(self, data: T, script_input: bytes) -> None: - from hathor.nanocontracts.nc_types import make_nc_type_for_return_type as make_nc_type - self.data = data - self.script_input = script_input - self.__nc_type = make_nc_type(self.__inner_type__) - - def __eq__(self, other): - if not isinstance(other, RawSignedData): - return False - if self.data != other.data: - return False - if self.script_input != other.script_input: - return False + extras = ScriptExtras(tx=_FakeTx(), version=OpcodesVersion.V2) # type: ignore[arg-type] + try: + raw_script_eval(input_data=script_input, output_script=script, extras=extras) + except ScriptError: + return False + else: return True - def get_data_bytes(self) -> bytes: - """Return the serialized data.""" - return self.__nc_type.to_bytes(self.data) - - def get_sighash_all_data(self) -> bytes: - """Workaround to be able to pass `self` for ScriptExtras. See the method `checksig`.""" - return self.get_data_bytes() - - def checksig(self, script: bytes) -> bool: - """Check if `self.script_input` satisfies the provided script.""" - from hathor.transaction.exceptions import ScriptError - from hathor.transaction.scripts import ScriptExtras - from hathor.transaction.scripts.execute import raw_script_eval - extras = ScriptExtras(tx=self, version=OpcodesVersion.V2) # type: ignore[arg-type] - try: - raw_script_eval(input_data=self.script_input, output_script=script, extras=extras) - except ScriptError: - return False - else: - return True - - -class SignedData(InnerTypeMixin[T], Generic[T]): - def __init__(self, data: T, script_input: bytes) -> None: - self.data = data - self.script_input = script_input - - def __eq__(self, other): - if not isinstance(other, SignedData): - return False - if self.data != other.data: - return False - if self.script_input != other.script_input: - return False - return True - - def _get_raw_signed_data(self, contract_id: ContractId) -> RawSignedData: - # XXX: for some reason mypy doesn't recognize that self.__inner_type__ is defined even though it should - raw_type: type = tuple[ContractId, self.__inner_type__] # type: ignore[name-defined] - raw_data = (contract_id, self.data) - return RawSignedData[raw_type](raw_data, self.script_input) # type: ignore[valid-type] - - def get_data_bytes(self, contract_id: ContractId) -> bytes: - """Return the serialized data.""" - raw_signed_data = self._get_raw_signed_data(contract_id) - return raw_signed_data.get_data_bytes() - - def checksig(self, contract_id: ContractId, script: bytes) -> bool: - """Check if script_input satisfies the provided script.""" - raw_signed_data = self._get_raw_signed_data(contract_id) - return raw_signed_data.checksig(script) - - -def _set_method_type(fn: Callable, method_type: NCMethodType) -> None: - if hasattr(fn, NC_METHOD_TYPE_ATTR): - raise BlueprintSyntaxError(f'method must be annotated with at most one method type: `{fn.__name__}()`') - setattr(fn, NC_METHOD_TYPE_ATTR, method_type) - - -def _create_decorator_with_allowed_actions( - *, - decorator_body: Callable[[Callable], None], - maybe_fn: Callable | None, - allow_deposit: bool | None, - allow_withdrawal: bool | None, - allow_grant_authority: bool | None, - allow_acquire_authority: bool | None, - allow_actions: list[NCActionType] | None, - allow_reentrancy: bool, -) -> Callable: - """Internal utility to create a decorator that sets allowed actions.""" - flags = { - NCActionType.DEPOSIT: allow_deposit, - NCActionType.WITHDRAWAL: allow_withdrawal, - NCActionType.GRANT_AUTHORITY: allow_grant_authority, - NCActionType.ACQUIRE_AUTHORITY: allow_acquire_authority, - } - - def decorator(fn: Callable) -> Callable: - if allow_actions is not None and any(flag is not None for flag in flags.values()): - raise BlueprintSyntaxError(f'use only one of `allow_actions` or per-action flags: `{fn.__name__}()`') - - allowed_actions = set(allow_actions) if allow_actions else set() - allowed_actions.update(action for action, flag in flags.items() if flag) - setattr(fn, NC_ALLOWED_ACTIONS_ATTR, allowed_actions) - setattr(fn, NC_ALLOW_REENTRANCY, allow_reentrancy) - - decorator_body(fn) - return fn - - if maybe_fn is not None: - return decorator(maybe_fn) - return decorator - - -def public( - maybe_fn: Callable | None = None, - /, - *, - allow_deposit: bool | None = None, - allow_withdrawal: bool | None = None, - allow_grant_authority: bool | None = None, - allow_acquire_authority: bool | None = None, - allow_actions: list[NCActionType] | None = None, - allow_reentrancy: bool = False, -) -> Callable: - """Decorator to mark a blueprint method as public.""" - def decorator(fn: Callable) -> None: - annotation_name = 'public' - forbidden_methods = {NC_FALLBACK_METHOD} - _set_method_type(fn, NCMethodType.PUBLIC) - - if fn.__name__ in forbidden_methods: - raise BlueprintSyntaxError(f'`{fn.__name__}` method cannot be annotated with @{annotation_name}') - - validate_has_self_arg(fn, annotation_name) - validate_method_types(fn) - validate_has_ctx_arg(fn, annotation_name) - - return _create_decorator_with_allowed_actions( - decorator_body=decorator, - maybe_fn=maybe_fn, - allow_deposit=allow_deposit, - allow_withdrawal=allow_withdrawal, - allow_grant_authority=allow_grant_authority, - allow_acquire_authority=allow_acquire_authority, - allow_actions=allow_actions, - allow_reentrancy=allow_reentrancy, - ) - - -def view(fn: Callable) -> Callable: - """Decorator to mark a blueprint method as view (read-only).""" - annotation_name = 'view' - forbidden_methods = {NC_INITIALIZE_METHOD, NC_FALLBACK_METHOD} - _set_method_type(fn, NCMethodType.VIEW) - - if fn.__name__ in forbidden_methods: - raise BlueprintSyntaxError(f'`{fn.__name__}` method cannot be annotated with @{annotation_name}') - - validate_has_self_arg(fn, annotation_name) - validate_has_not_ctx_arg(fn, annotation_name) - validate_method_types(fn) - return fn - - -def export(cls: B) -> B: - """Decorator to export the main Blueprint of a Python module.""" - current_frame = inspect.currentframe() - assert current_frame is not None - module_frame = current_frame.f_back - assert module_frame is not None - module_globals = module_frame.f_globals - if BLUEPRINT_EXPORT_NAME in module_globals: - raise TypeError('A Blueprint has already been registered') - module_globals[BLUEPRINT_EXPORT_NAME] = cls - return cls - - -def fallback( - maybe_fn: Callable | None = None, - /, - *, - allow_deposit: bool | None = None, - allow_withdrawal: bool | None = None, - allow_grant_authority: bool | None = None, - allow_acquire_authority: bool | None = None, - allow_actions: list[NCActionType] | None = None, - allow_reentrancy: bool = False, -) -> Callable: - """Decorator to mark a blueprint method as fallback. The method must also be called `fallback`.""" - def decorator(fn: Callable) -> None: - annotation_name = 'fallback' - _set_method_type(fn, NCMethodType.FALLBACK) - - if fn.__name__ != NC_FALLBACK_METHOD: - raise BlueprintSyntaxError(f'@{annotation_name} method must be called `fallback`: `{fn.__name__}()`') - - validate_has_self_arg(fn, annotation_name) - validate_method_types(fn) - validate_has_ctx_arg(fn, annotation_name) - - arg_spec = inspect.getfullargspec(fn) - msg = f'@{annotation_name} method must have these args: `ctx: Context, method_name: str, nc_args: NCArgs`' - - if len(arg_spec.args) < 4: - raise BlueprintSyntaxError(msg) - - third_arg = arg_spec.args[2] - fourth_arg = arg_spec.args[3] - - if arg_spec.annotations[third_arg] is not str or arg_spec.annotations[fourth_arg] is not NCArgs: - raise BlueprintSyntaxError(msg) - - return _create_decorator_with_allowed_actions( - decorator_body=decorator, - maybe_fn=maybe_fn, - allow_deposit=allow_deposit, - allow_withdrawal=allow_withdrawal, - allow_grant_authority=allow_grant_authority, - allow_acquire_authority=allow_acquire_authority, - allow_actions=allow_actions, - allow_reentrancy=allow_reentrancy, - ) - - -@unique -class NCActionType(Enum): - """ - Types of interactions a transaction might have with a contract. - Check the respective dataclasses below for more info. - """ - DEPOSIT = 1 - WITHDRAWAL = 2 - GRANT_AUTHORITY = 3 - ACQUIRE_AUTHORITY = 4 - - def __str__(self) -> str: - return self.name - - def to_bytes(self) -> bytes: - return int_to_bytes(number=self.value, size=1) - - @staticmethod - def from_bytes(data: bytes) -> NCActionType: - return NCActionType(bytes_to_int(data)) - - -@dataclass(slots=True, frozen=True, kw_only=True) -class BaseAction: - """The base dataclass for all NC actions. Shouldn't be instantiated directly.""" - token_uid: TokenUid - - @property - def type(self) -> NCActionType: - """The respective NCActionType for each NCAction.""" - action_types: dict[type[BaseAction], NCActionType] = { - NCDepositAction: NCActionType.DEPOSIT, - NCWithdrawalAction: NCActionType.WITHDRAWAL, - NCGrantAuthorityAction: NCActionType.GRANT_AUTHORITY, - NCAcquireAuthorityAction: NCActionType.ACQUIRE_AUTHORITY, - } - - if action_type := action_types.get(type(self)): - return action_type - - raise NotImplementedError(f'unknown action type {type(self)}') - - @property - def name(self) -> str: - """The action name.""" - return str(self.type) - - def to_json(self) -> dict[str, Any]: - """ - Convert this action to a json dict. - - >>> NCDepositAction(token_uid=TokenUid(b'\x01'), amount=123).to_json() - {'type': 'deposit', 'token_uid': '01', 'amount': 123} - >>> NCWithdrawalAction(token_uid=TokenUid(b'\x01'), amount=123).to_json() - {'type': 'withdrawal', 'token_uid': '01', 'amount': 123} - >>> NCGrantAuthorityAction(token_uid=TokenUid(b'\x01'), mint=True, melt=False).to_json() - {'type': 'grant_authority', 'token_uid': '01', 'mint': True, 'melt': False} - >>> NCAcquireAuthorityAction(token_uid=TokenUid(b'\x01'), mint=False, melt=True).to_json() - {'type': 'acquire_authority', 'token_uid': '01', 'mint': False, 'melt': True} - """ - return dict( - type=self.name.lower(), - token_uid=self.token_uid.hex(), - ) - - -@dataclass(slots=True, frozen=True, kw_only=True) -class BaseTokenAction(BaseAction): - """The base dataclass for all token-related NC actions. Shouldn't be instantiated directly.""" - amount: int - - @override - def to_json(self) -> dict[str, Any]: - json_dict = super(BaseTokenAction, self).to_json() - return dict( - **json_dict, - amount=self.amount, - ) - - -@dataclass(slots=True, frozen=True, kw_only=True) -class BaseAuthorityAction(BaseAction): - """The base dataclass for all authority-related NC actions. Shouldn't be instantiated directly.""" - mint: bool - melt: bool - - def __post_init__(self) -> None: - """Validate the token uid.""" - from hathor.conf.settings import HATHOR_TOKEN_UID - from hathor.nanocontracts.exception import NCInvalidAction - if self.token_uid == HATHOR_TOKEN_UID: - raise NCInvalidAction(f'{self.name} action cannot be executed on HTR token') - - @override - def to_json(self) -> dict[str, Any]: - json_dict = super(BaseAuthorityAction, self).to_json() - return dict( - **json_dict, - mint=self.mint, - melt=self.melt, - ) - - -@dataclass(slots=True, frozen=True, kw_only=True) -class NCDepositAction(BaseTokenAction): - """Deposit tokens into the contract.""" - - -@dataclass(slots=True, frozen=True, kw_only=True) -class NCWithdrawalAction(BaseTokenAction): - """Withdraw tokens from the contract.""" - - -@dataclass(slots=True, frozen=True, kw_only=True) -class NCGrantAuthorityAction(BaseAuthorityAction): - """Grant an authority to the contract.""" - - -@dataclass(slots=True, frozen=True, kw_only=True) -class NCAcquireAuthorityAction(BaseAuthorityAction): - """ - Acquire an authority stored in a contract to create authority outputs or mint/melt tokens in the tx, - or to store and use in a caller contract. - """ - - -"""A sum type representing all possible nano contract actions.""" -NCAction: TypeAlias = ( - NCDepositAction - | NCWithdrawalAction - | NCGrantAuthorityAction - | NCAcquireAuthorityAction -) - - -@dataclass(slots=True, frozen=True) -class NCRawArgs: - args_bytes: bytes - - def __str__(self) -> str: - return self.args_bytes.hex() - - def __repr__(self) -> str: - return f"NCRawArgs('{str(self)}')" - - def try_parse_as(self, arg_types: tuple[type, ...]) -> tuple[Any, ...] | None: - from hathor.nanocontracts.method import ArgsOnly - try: - args_parser = ArgsOnly.from_arg_types(arg_types) - return args_parser.deserialize_args_bytes(self.args_bytes) - except (NCSerializationError, SerializationError, TypeError, ValueError): - return None - - -@dataclass(slots=True, frozen=True) -class NCParsedArgs: - args: tuple[Any, ...] - kwargs: dict[str, Any] - - -NCArgs: TypeAlias = NCRawArgs | NCParsedArgs - - -@dataclass(slots=True, frozen=True, kw_only=True) -class NCFee: - """The dataclass for all NC actions fee""" - token_uid: TokenUid - amount: int - def get_htr_value(self, settings: HathorSettings) -> int: - """ - Get the amount converted to HTR - """ - if self.token_uid == HATHOR_TOKEN_UID: - return self.amount - else: - return get_deposit_token_withdraw_amount(settings, self.amount) +set_checksig_backend(_checksig_impl) diff --git a/hathor/transaction/util.py b/hathor/transaction/util.py index bbeabd967..74d21f999 100644 --- a/hathor/transaction/util.py +++ b/hathor/transaction/util.py @@ -16,11 +16,11 @@ import re import struct -from math import ceil, floor from struct import error as StructError from typing import TYPE_CHECKING, Any, Callable, Optional from hathor.transaction.exceptions import InvalidFeeAmount, InvalidOutputValue, TransactionDataError +from hathorlib.utils import get_deposit_token_deposit_amount, get_deposit_token_withdraw_amount # noqa: F401 if TYPE_CHECKING: from hathor import TokenUid @@ -56,14 +56,6 @@ def unpack_len(n: int, buf: bytes | memoryview) -> tuple[bytes, bytes | memoryvi return ret, buf[n:] -def get_deposit_token_deposit_amount(settings: HathorSettings, mint_amount: int) -> int: - return ceil(abs(settings.TOKEN_DEPOSIT_PERCENTAGE * mint_amount)) - - -def get_deposit_token_withdraw_amount(settings: HathorSettings, melt_amount: int) -> int: - return floor(abs(settings.TOKEN_DEPOSIT_PERCENTAGE * melt_amount)) - - def clean_token_string(string: str) -> str: """ Receives the token name/symbol and returns it after some cleanups. It sets to uppercase, removes double spaces and spaces at the beginning and end. diff --git a/hathor_tests/nanocontracts/test_exceptions.py b/hathor_tests/nanocontracts/test_exceptions.py index 2b0535fde..53a21edca 100644 --- a/hathor_tests/nanocontracts/test_exceptions.py +++ b/hathor_tests/nanocontracts/test_exceptions.py @@ -17,10 +17,11 @@ class TestExceptions(unittest.TestCase): def test_inherit_from_nc_fail(self) -> None: + from hathor.exception import HathorError from hathor.nanocontracts import exception as nano_exceptions skip = { - nano_exceptions.HathorError, + HathorError, nano_exceptions.NCFail, nano_exceptions.TxValidationError, } diff --git a/hathor_tests/nanocontracts/test_nc_exec_logs.py b/hathor_tests/nanocontracts/test_nc_exec_logs.py index 7b6dfb386..87d06854b 100644 --- a/hathor_tests/nanocontracts/test_nc_exec_logs.py +++ b/hathor_tests/nanocontracts/test_nc_exec_logs.py @@ -382,7 +382,7 @@ def test_nc_fail(self) -> None: error_tb = result.entries[b2.hash][0].error_traceback assert error_tb is not None assert error_tb.startswith('Traceback (most recent call last):') - assert error_tb.endswith('hathor.nanocontracts.exception.NCFail: some fail\n') + assert error_tb.endswith('hathorlib.nanocontracts.exception.NCFail: some fail\n') def test_value_error(self) -> None: self._prepare() @@ -436,7 +436,7 @@ def test_value_error(self) -> None: The above exception was the direct cause of the following exception:\n Traceback (most recent call last): """) in error_tb - assert error_tb.endswith('hathor.nanocontracts.exception.NCFail\n') + assert error_tb.endswith('hathorlib.nanocontracts.exception.NCFail\n') def test_reexecution_on_reorgs(self) -> None: self._prepare() diff --git a/hathorlib/hathorlib/exceptions.py b/hathorlib/hathorlib/exceptions.py index 7c525de10..933888f20 100644 --- a/hathorlib/hathorlib/exceptions.py +++ b/hathorlib/hathorlib/exceptions.py @@ -16,7 +16,8 @@ class HathorError(Exception): - """General error class""" + """Base class for exceptions in Hathor.""" + pass class InvalidAddress(HathorError): diff --git a/hathorlib/hathorlib/nanocontracts/blueprint_syntax_validation.py b/hathorlib/hathorlib/nanocontracts/blueprint_syntax_validation.py new file mode 100644 index 000000000..cf77ad236 --- /dev/null +++ b/hathorlib/hathorlib/nanocontracts/blueprint_syntax_validation.py @@ -0,0 +1,100 @@ +# Copyright 2025 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import inspect +from typing import Callable + +from hathorlib.nanocontracts.exception import BlueprintSyntaxError + + +def validate_has_self_arg(fn: Callable, annotation_name: str) -> None: + """Validate the `self` arg of a callable.""" + arg_spec = inspect.getfullargspec(fn) + if len(arg_spec.args) == 0: + raise BlueprintSyntaxError(f'@{annotation_name} method must have `self` argument: `{fn.__name__}()`') + + if arg_spec.args[0] != 'self': + raise BlueprintSyntaxError( + f'@{annotation_name} method first argument must be called `self`: `{fn.__name__}()`' + ) + + if 'self' in arg_spec.annotations.keys(): + raise BlueprintSyntaxError(f'@{annotation_name} method `self` argument must not be typed: `{fn.__name__}()`') + + +def validate_method_types(fn: Callable) -> None: + """Validate the arg and return types of a callable.""" + special_args = ['self'] + arg_spec = inspect.getfullargspec(fn) + + if 'return' not in arg_spec.annotations: + raise BlueprintSyntaxError(f'missing return type on method `{fn.__name__}`') + + # TODO: This currently fails for types such as unions, probably because this is the wrong + # parsing function to use. Fix this. + # from hathorlib.nanocontracts.fields import get_field_class_for_attr + # return_type = arg_spec.annotations['return'] + # if return_type is not None: + # try: + # get_field_class_for_attr(return_type) + # except UnknownFieldType: + # raise BlueprintSyntaxError( + # f'unsupported return type `{return_type}` on method `{fn.__name__}`' + # ) + + for arg_name in arg_spec.args: + if arg_name in special_args: + continue + + if arg_name not in arg_spec.annotations: + raise BlueprintSyntaxError(f'argument `{arg_name}` on method `{fn.__name__}` must be typed') + + # TODO: This currently fails for @view methods with NamedTuple as args for example, + # because API calls use a different parsing function. Fix this. + # arg_type = arg_spec.annotations[arg_name] + # try: + # from hathorlib.nanocontracts.fields import get_field_class_for_attr + # get_field_class_for_attr(arg_type) + # except UnknownFieldType: + # raise BlueprintSyntaxError( + # f'unsupported type `{arg_type.__name__}` on argument `{arg_name}` of method `{fn.__name__}`' + # ) + + +def validate_has_ctx_arg(fn: Callable, annotation_name: str) -> None: + """Validate the context arg of a callable.""" + arg_spec = inspect.getfullargspec(fn) + + if len(arg_spec.args) < 2: + raise BlueprintSyntaxError( + f'@{annotation_name} method must have `Context` argument: `{fn.__name__}()`' + ) + + from hathor.nanocontracts.context import Context # type: ignore + second_arg = arg_spec.args[1] + if arg_spec.annotations[second_arg] is not Context: + raise BlueprintSyntaxError( + f'@{annotation_name} method second arg `{second_arg}` argument must be of type `Context`: ' + f'`{fn.__name__}()`' + ) + + +def validate_has_not_ctx_arg(fn: Callable, annotation_name: str) -> None: + """Validate that a callable doesn't have a `Context` arg.""" + from hathor.nanocontracts.context import Context # type: ignore[import-not-found] + arg_spec = inspect.getfullargspec(fn) + if Context in arg_spec.annotations.values(): + raise BlueprintSyntaxError(f'@{annotation_name} method cannot have arg with type `Context`: `{fn.__name__}()`') diff --git a/hathorlib/hathorlib/nanocontracts/exception.py b/hathorlib/hathorlib/nanocontracts/exception.py new file mode 100644 index 000000000..d99216777 --- /dev/null +++ b/hathorlib/hathorlib/nanocontracts/exception.py @@ -0,0 +1,197 @@ +# Copyright 2023 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +All exceptions in this module MUST inherit from NCFail so they're +correctly caught by the block consensus to fail NC transactions. +""" + +from hathorlib.exceptions import HathorError + + +class NCFail(HathorError): + """Raised by Blueprint's methods to fail execution.""" + + +class BlueprintSyntaxError(SyntaxError, NCFail): + """Raised when a blueprint contains invalid syntax.""" + pass + + +class NanoContractDoesNotExist(NCFail): + pass + + +class BlueprintDoesNotExist(NCFail): + pass + + +class NCSerializationError(NCFail): + pass + + +class NCSerializationArgTooLong(NCSerializationError): + pass + + +class NCSerializationTypeError(NCSerializationError): + pass + + +class NCViewMethodError(NCFail): + """Raised when a view method changes the state of the contract.""" + pass + + +class NCInsufficientFunds(NCFail): + """Raised when there is not enough funds to withdrawal from a nano contract.""" + pass + + +class NCAttributeError(NCFail): + pass + + +class NCInvalidContext(NCFail): + """Raised when trying to run a method with an invalid context.""" + pass + + +class NCRecursionError(NCFail): + """Raised when recursion gets too deep.""" + + +class NCNumberOfCallsExceeded(NCFail): + """Raised when the total number of calls have been exceeded.""" + + +class NCInvalidContractId(NCFail): + """Raised when a contract call is invalid.""" + + +class NCInvalidFeePaymentToken(NCFail): + """Raised when a payment token is invalid.""" + + +class NCInvalidInitializeMethodCall(NCFail): + """Raised when a contract calls another contract's initialize method.""" + + +class NCInvalidPublicMethodCallFromView(NCFail): + """Raised when a contract calls another contract's initialize method.""" + + +class NCAlreadyInitializedContractError(NCFail): + """Raised when one tries to initialize a contract that has already been initialized.""" + + +class NCUninitializedContractError(NCFail): + """Raised when a contract calls a method from an uninitialized contract.""" + + +class NCInvalidAction(NCFail): + """Raised when an action is invalid.""" + pass + + +class NCInvalidFee(NCFail): + """Raised when a fee is invalid.""" + pass + + +class NCInvalidSyscall(NCFail): + """Raised when a syscall is invalid.""" + pass + + +class NCTokenAlreadyExists(NCFail): + """Raised when one tries to create a duplicated token.""" + + +class NCForbiddenAction(NCFail): + """Raised when an action is forbidden on a method.""" + pass + + +class NCForbiddenReentrancy(NCFail): + """Raised when a reentrancy is forbidden on a method.""" + pass + + +class NCTypeError(NCFail): + """Raised when a wrong type is used.""" + pass + + +class UnknownFieldType(NCFail): + """Raised when there is no field available for a given type.""" + pass + + +class NCContractCreationNotFound(NCFail): + """Raised when a nano contract creation transaction is not found. + + This error might also happen when the transaction is at the mempool or when it fails execution.""" + pass + + +class NCContractCreationAtMempool(NCContractCreationNotFound): + """Raised when a nano contract creation transaction is at the mempool, so it has not been + executed yet.""" + pass + + +class NCContractCreationVoided(NCContractCreationNotFound): + """Raised when a nano contract creation transaction is voided. + + The two most common reasons to have a voided transaction is because it was voided by + another transaction (e.g., double spending) or it has failed execution.""" + pass + + +class OCBInvalidScript(NCFail): + """Raised when an On-Chain Blueprint script does not pass our script restrictions check. + """ + pass + + +class OCBInvalidBlueprintVertexType(NCFail): + """Raised when a vertex that is not an OnChainBlueprint is used as a blueprint-id. + """ + pass + + +class OCBBlueprintNotConfirmed(NCFail): + """Raised when trying to use an OnChainBlueprint that is not confirmed by a block in the current best chain. + """ + + +class OCBPubKeyNotAllowed(NCFail): + """Raised when an OnChainBlueprint transaction uses a pubkey that is not explicitly allowed in the settings. + """ + + +class OCBOutOfFuelDuringLoading(NCFail): + """Raised when loading an On-chain Blueprint and the execution exceeds the fuel limit. + """ + + +class OCBOutOfMemoryDuringLoading(NCFail): + """Raised when loading an On-chain Blueprint and the execution exceeds the memory limit. + """ + + +class NCDisabledBuiltinError(NCFail): + """Raised when a disabled builtin is used during creation or execution of a nanocontract. + """ diff --git a/hathorlib/hathorlib/nanocontracts/types.py b/hathorlib/hathorlib/nanocontracts/types.py index 15341ba35..9d69c53cb 100644 --- a/hathorlib/hathorlib/nanocontracts/types.py +++ b/hathorlib/hathorlib/nanocontracts/types.py @@ -14,25 +14,535 @@ from __future__ import annotations +import inspect +from dataclasses import dataclass from enum import Enum, unique +from typing import Any, Callable, Generic, Self, TypeAlias, TypeVar -from hathorlib.utils import bytes_to_int, int_to_bytes +from typing_extensions import override + +from hathorlib.conf.settings import HathorSettings +from hathorlib.nanocontracts.blueprint_syntax_validation import ( + validate_has_ctx_arg, + validate_has_not_ctx_arg, + validate_has_self_arg, + validate_method_types, +) +from hathorlib.nanocontracts.exception import BlueprintSyntaxError, NCSerializationError +from hathorlib.nanocontracts.faux_immutable import FauxImmutableMeta +from hathorlib.serialization import SerializationError +from hathorlib.utils import get_deposit_token_withdraw_amount +from hathorlib.utils.address import decode_address, get_address_b58_from_bytes +from hathorlib.utils.typing import InnerTypeMixin + +# Well-known constant: HTR token UID is always 0x00 +HATHOR_TOKEN_UID: bytes = b'\x00' + +# XXX: mypy gives the following errors on all subclasses of `bytes` that use FauxImmutableMeta: +# +# Metaclass conflict: the metaclass of a derived class must be a (non-strict) subclass of the metaclasses of all its +# bases +# +# However `bytes` metaclass appears to be `type` (just like `int`'s) and `FauxImmutableMeta` correctly inherits from +# `type`, so it seems like it's a mypy error. + + +# Types to be used by blueprints. +class Address(bytes, metaclass=FauxImmutableMeta): + __allow_faux_inheritance__ = True + __allow_faux_dunder__ = ('__str__', '__repr__') + __slots__ = () + + @classmethod + def from_str(cls, /, encoded_address: str) -> Self: + if not isinstance(encoded_address, str): + raise TypeError(f'expected `str` instance, got `{type(encoded_address)}` instance') + return cls(decode_address(encoded_address)) + + def __str__(self) -> str: + encoded_address = get_address_b58_from_bytes(self) + return encoded_address + + def __repr__(self) -> str: + encoded_address = str(self) # uses __str__ + # XXX: should we support `Address(encoded_address)` constructor? + return f"Address.from_str({encoded_address!r})" + + +class VertexId(bytes, metaclass=FauxImmutableMeta): + __slots__ = () + __allow_faux_inheritance__ = True + + +class BlueprintId(VertexId): + __slots__ = () + __allow_faux_inheritance__ = True + + +class ContractId(VertexId): + __slots__ = () + __allow_faux_inheritance__ = True + + +class TokenUid(bytes, metaclass=FauxImmutableMeta): + __slots__ = () + __allow_faux_inheritance__ = True + + +class TxOutputScript(bytes, metaclass=FauxImmutableMeta): + __slots__ = () + __allow_faux_inheritance__ = True + + +class Amount(int, metaclass=FauxImmutableMeta): + __slots__ = () + __allow_faux_inheritance__ = True + + +class Timestamp(int, metaclass=FauxImmutableMeta): + __slots__ = () + __allow_faux_inheritance__ = True + + +CallerId: TypeAlias = Address | ContractId + +T = TypeVar('T') +B = TypeVar('B', bound=type) + +NC_INITIALIZE_METHOD: str = 'initialize' +NC_FALLBACK_METHOD: str = 'fallback' + +NC_ALLOWED_ACTIONS_ATTR = '__nc_allowed_actions' +NC_ALLOW_REENTRANCY = '__nc_allow_reentrancy' +NC_METHOD_TYPE_ATTR: str = '__nc_method_type' + +# this is the name we use internally to store the blueprint that is exported by a module +BLUEPRINT_EXPORT_NAME: str = '__blueprint__' + + +class NCMethodType(Enum): + PUBLIC = 'public' + VIEW = 'view' + FALLBACK = 'fallback' + + +def blueprint_id_from_bytes(data: bytes) -> BlueprintId: + """Create a BlueprintId from a bytes object.""" + return BlueprintId(VertexId(data)) + + +# Injectable backend for RawSignedData.checksig. +# hathorlib raises NotImplementedError by default; hathor-core registers the real implementation. +_checksig_backend: Callable[[bytes, bytes, bytes], bool] | None = None + + +def set_checksig_backend(fn: Callable[[bytes, bytes, bytes], bool]) -> None: + """Register the checksig implementation. Must be called by hathor-core during initialization. + + The callable receives (sighash_all_data: bytes, script_input: bytes, script: bytes) -> bool. + """ + global _checksig_backend + _checksig_backend = fn + + +class RawSignedData(InnerTypeMixin[T], Generic[T]): + """A wrapper class to sign data. + + T must be serializable. + """ + + def __init__(self, data: T, script_input: bytes) -> None: + # mypy: disable-error-code="import-not-found" + from hathor.nanocontracts.nc_types import ( # type: ignore[import-not-found] + make_nc_type_for_return_type as make_nc_type, + ) + + self.data = data + self.script_input = script_input + self.__nc_type = make_nc_type(self.__inner_type__) + + def __eq__(self, other): + if not isinstance(other, RawSignedData): + return False + if self.data != other.data: + return False + if self.script_input != other.script_input: + return False + return True + + def get_data_bytes(self) -> bytes: + """Return the serialized data.""" + return self.__nc_type.to_bytes(self.data) + + def get_sighash_all_data(self) -> bytes: + """Workaround to be able to pass `self` for ScriptExtras. See the method `checksig`.""" + return self.get_data_bytes() + + def checksig(self, script: bytes) -> bool: + """Check if `self.script_input` satisfies the provided script.""" + if _checksig_backend is None: + raise NotImplementedError('checksig requires hathor-core to be installed') + return _checksig_backend(self.get_sighash_all_data(), self.script_input, script) + + +class SignedData(InnerTypeMixin[T], Generic[T]): + def __init__(self, data: T, script_input: bytes) -> None: + self.data = data + self.script_input = script_input + + def __eq__(self, other): + if not isinstance(other, SignedData): + return False + if self.data != other.data: + return False + if self.script_input != other.script_input: + return False + return True + + def _get_raw_signed_data(self, contract_id: ContractId) -> RawSignedData: + # XXX: for some reason mypy doesn't recognize that self.__inner_type__ is defined even though it should + raw_type: type = tuple[ContractId, self.__inner_type__] # type: ignore[name-defined] + raw_data = (contract_id, self.data) + return RawSignedData[raw_type](raw_data, self.script_input) # type: ignore[valid-type] + + def get_data_bytes(self, contract_id: ContractId) -> bytes: + """Return the serialized data.""" + raw_signed_data = self._get_raw_signed_data(contract_id) + return raw_signed_data.get_data_bytes() + + def checksig(self, contract_id: ContractId, script: bytes) -> bool: + """Check if script_input satisfies the provided script.""" + raw_signed_data = self._get_raw_signed_data(contract_id) + return raw_signed_data.checksig(script) + + +def _set_method_type(fn: Callable, method_type: NCMethodType) -> None: + if hasattr(fn, NC_METHOD_TYPE_ATTR): + raise BlueprintSyntaxError(f'method must be annotated with at most one method type: `{fn.__name__}()`') + setattr(fn, NC_METHOD_TYPE_ATTR, method_type) + + +def _create_decorator_with_allowed_actions( + *, + decorator_body: Callable[[Callable], None], + maybe_fn: Callable | None, + allow_deposit: bool | None, + allow_withdrawal: bool | None, + allow_grant_authority: bool | None, + allow_acquire_authority: bool | None, + allow_actions: list[NCActionType] | None, + allow_reentrancy: bool, +) -> Callable: + """Internal utility to create a decorator that sets allowed actions.""" + flags = { + NCActionType.DEPOSIT: allow_deposit, + NCActionType.WITHDRAWAL: allow_withdrawal, + NCActionType.GRANT_AUTHORITY: allow_grant_authority, + NCActionType.ACQUIRE_AUTHORITY: allow_acquire_authority, + } + + def decorator(fn: Callable) -> Callable: + if allow_actions is not None and any(flag is not None for flag in flags.values()): + raise BlueprintSyntaxError(f'use only one of `allow_actions` or per-action flags: `{fn.__name__}()`') + + allowed_actions = set(allow_actions) if allow_actions else set() + allowed_actions.update(action for action, flag in flags.items() if flag) + setattr(fn, NC_ALLOWED_ACTIONS_ATTR, allowed_actions) + setattr(fn, NC_ALLOW_REENTRANCY, allow_reentrancy) + + decorator_body(fn) + return fn + + if maybe_fn is not None: + return decorator(maybe_fn) + return decorator + + +def public( + maybe_fn: Callable | None = None, + /, + *, + allow_deposit: bool | None = None, + allow_withdrawal: bool | None = None, + allow_grant_authority: bool | None = None, + allow_acquire_authority: bool | None = None, + allow_actions: list[NCActionType] | None = None, + allow_reentrancy: bool = False, +) -> Callable: + """Decorator to mark a blueprint method as public.""" + def decorator(fn: Callable) -> None: + annotation_name = 'public' + forbidden_methods = {NC_FALLBACK_METHOD} + _set_method_type(fn, NCMethodType.PUBLIC) + + if fn.__name__ in forbidden_methods: + raise BlueprintSyntaxError(f'`{fn.__name__}` method cannot be annotated with @{annotation_name}') + + validate_has_self_arg(fn, annotation_name) + validate_method_types(fn) + validate_has_ctx_arg(fn, annotation_name) + + return _create_decorator_with_allowed_actions( + decorator_body=decorator, + maybe_fn=maybe_fn, + allow_deposit=allow_deposit, + allow_withdrawal=allow_withdrawal, + allow_grant_authority=allow_grant_authority, + allow_acquire_authority=allow_acquire_authority, + allow_actions=allow_actions, + allow_reentrancy=allow_reentrancy, + ) + + +def view(fn: Callable) -> Callable: + """Decorator to mark a blueprint method as view (read-only).""" + annotation_name = 'view' + forbidden_methods = {NC_INITIALIZE_METHOD, NC_FALLBACK_METHOD} + _set_method_type(fn, NCMethodType.VIEW) + + if fn.__name__ in forbidden_methods: + raise BlueprintSyntaxError(f'`{fn.__name__}` method cannot be annotated with @{annotation_name}') + + validate_has_self_arg(fn, annotation_name) + validate_has_not_ctx_arg(fn, annotation_name) + validate_method_types(fn) + return fn + + +def export(cls: B) -> B: + """Decorator to export the main Blueprint of a Python module.""" + current_frame = inspect.currentframe() + assert current_frame is not None + module_frame = current_frame.f_back + assert module_frame is not None + module_globals = module_frame.f_globals + if BLUEPRINT_EXPORT_NAME in module_globals: + raise TypeError('A Blueprint has already been registered') + module_globals[BLUEPRINT_EXPORT_NAME] = cls + return cls + + +def fallback( + maybe_fn: Callable | None = None, + /, + *, + allow_deposit: bool | None = None, + allow_withdrawal: bool | None = None, + allow_grant_authority: bool | None = None, + allow_acquire_authority: bool | None = None, + allow_actions: list[NCActionType] | None = None, + allow_reentrancy: bool = False, +) -> Callable: + """Decorator to mark a blueprint method as fallback. The method must also be called `fallback`.""" + def decorator(fn: Callable) -> None: + annotation_name = 'fallback' + _set_method_type(fn, NCMethodType.FALLBACK) + + if fn.__name__ != NC_FALLBACK_METHOD: + raise BlueprintSyntaxError(f'@{annotation_name} method must be called `fallback`: `{fn.__name__}()`') + + validate_has_self_arg(fn, annotation_name) + validate_method_types(fn) + validate_has_ctx_arg(fn, annotation_name) + + arg_spec = inspect.getfullargspec(fn) + msg = f'@{annotation_name} method must have these args: `ctx: Context, method_name: str, nc_args: NCArgs`' + + if len(arg_spec.args) < 4: + raise BlueprintSyntaxError(msg) + + third_arg = arg_spec.args[2] + fourth_arg = arg_spec.args[3] + + if arg_spec.annotations[third_arg] is not str or arg_spec.annotations[fourth_arg] is not NCArgs: + raise BlueprintSyntaxError(msg) + + return _create_decorator_with_allowed_actions( + decorator_body=decorator, + maybe_fn=maybe_fn, + allow_deposit=allow_deposit, + allow_withdrawal=allow_withdrawal, + allow_grant_authority=allow_grant_authority, + allow_acquire_authority=allow_acquire_authority, + allow_actions=allow_actions, + allow_reentrancy=allow_reentrancy, + ) @unique class NCActionType(Enum): - """Types of interactions a transaction might have with a contract.""" + """ + Types of interactions a transaction might have with a contract. + Check the respective dataclasses below for more info. + """ DEPOSIT = 1 WITHDRAWAL = 2 GRANT_AUTHORITY = 3 ACQUIRE_AUTHORITY = 4 def __str__(self) -> str: - return self.name.lower() + return self.name def to_bytes(self) -> bytes: - return int_to_bytes(number=self.value, size=1) + return self.value.to_bytes(1, byteorder='big') @staticmethod def from_bytes(data: bytes) -> NCActionType: - return NCActionType(bytes_to_int(data)) + return NCActionType(int.from_bytes(data, byteorder='big')) + + +@dataclass(slots=True, frozen=True, kw_only=True) +class BaseAction: + """The base dataclass for all NC actions. Shouldn't be instantiated directly.""" + token_uid: TokenUid + + @property + def type(self) -> NCActionType: + """The respective NCActionType for each NCAction.""" + action_types: dict[type[BaseAction], NCActionType] = { + NCDepositAction: NCActionType.DEPOSIT, + NCWithdrawalAction: NCActionType.WITHDRAWAL, + NCGrantAuthorityAction: NCActionType.GRANT_AUTHORITY, + NCAcquireAuthorityAction: NCActionType.ACQUIRE_AUTHORITY, + } + + if action_type := action_types.get(type(self)): + return action_type + + raise NotImplementedError(f'unknown action type {type(self)}') + + @property + def name(self) -> str: + """The action name.""" + return str(self.type) + + def to_json(self) -> dict[str, Any]: + """ + Convert this action to a json dict. + + >>> NCDepositAction(token_uid=TokenUid(b'\x01'), amount=123).to_json() + {'type': 'deposit', 'token_uid': '01', 'amount': 123} + >>> NCWithdrawalAction(token_uid=TokenUid(b'\x01'), amount=123).to_json() + {'type': 'withdrawal', 'token_uid': '01', 'amount': 123} + >>> NCGrantAuthorityAction(token_uid=TokenUid(b'\x01'), mint=True, melt=False).to_json() + {'type': 'grant_authority', 'token_uid': '01', 'mint': True, 'melt': False} + >>> NCAcquireAuthorityAction(token_uid=TokenUid(b'\x01'), mint=False, melt=True).to_json() + {'type': 'acquire_authority', 'token_uid': '01', 'mint': False, 'melt': True} + """ + return dict( + type=self.name.lower(), + token_uid=self.token_uid.hex(), + ) + + +@dataclass(slots=True, frozen=True, kw_only=True) +class BaseTokenAction(BaseAction): + """The base dataclass for all token-related NC actions. Shouldn't be instantiated directly.""" + amount: int + + @override + def to_json(self) -> dict[str, Any]: + json_dict = super(BaseTokenAction, self).to_json() + return dict( + **json_dict, + amount=self.amount, + ) + + +@dataclass(slots=True, frozen=True, kw_only=True) +class BaseAuthorityAction(BaseAction): + """The base dataclass for all authority-related NC actions. Shouldn't be instantiated directly.""" + mint: bool + melt: bool + + def __post_init__(self) -> None: + """Validate the token uid.""" + from hathorlib.nanocontracts.exception import NCInvalidAction + if self.token_uid == HATHOR_TOKEN_UID: + raise NCInvalidAction(f'{self.name} action cannot be executed on HTR token') + + @override + def to_json(self) -> dict[str, Any]: + json_dict = super(BaseAuthorityAction, self).to_json() + return dict( + **json_dict, + mint=self.mint, + melt=self.melt, + ) + + +@dataclass(slots=True, frozen=True, kw_only=True) +class NCDepositAction(BaseTokenAction): + """Deposit tokens into the contract.""" + + +@dataclass(slots=True, frozen=True, kw_only=True) +class NCWithdrawalAction(BaseTokenAction): + """Withdraw tokens from the contract.""" + + +@dataclass(slots=True, frozen=True, kw_only=True) +class NCGrantAuthorityAction(BaseAuthorityAction): + """Grant an authority to the contract.""" + + +@dataclass(slots=True, frozen=True, kw_only=True) +class NCAcquireAuthorityAction(BaseAuthorityAction): + """ + Acquire an authority stored in a contract to create authority outputs or mint/melt tokens in the tx, + or to store and use in a caller contract. + """ + + +"""A sum type representing all possible nano contract actions.""" +NCAction: TypeAlias = ( + NCDepositAction + | NCWithdrawalAction + | NCGrantAuthorityAction + | NCAcquireAuthorityAction +) + + +@dataclass(slots=True, frozen=True) +class NCRawArgs: + args_bytes: bytes + + def __str__(self) -> str: + return self.args_bytes.hex() + + def __repr__(self) -> str: + return f"NCRawArgs('{str(self)}')" + + def try_parse_as(self, arg_types: tuple[type, ...]) -> tuple[Any, ...] | None: + # mypy: disable-error-code="import-not-found" + from hathor.nanocontracts.method import ArgsOnly # type: ignore[import-not-found] + try: + args_parser = ArgsOnly.from_arg_types(arg_types) + return args_parser.deserialize_args_bytes(self.args_bytes) + except (NCSerializationError, SerializationError, TypeError, ValueError): + return None + + +@dataclass(slots=True, frozen=True) +class NCParsedArgs: + args: tuple[Any, ...] + kwargs: dict[str, Any] + + +NCArgs: TypeAlias = NCRawArgs | NCParsedArgs + + +@dataclass(slots=True, frozen=True, kw_only=True) +class NCFee: + """The dataclass for all NC actions fee""" + token_uid: TokenUid + amount: int + + def get_htr_value(self, settings: HathorSettings) -> int: + """ + Get the amount converted to HTR + """ + if self.token_uid == HATHOR_TOKEN_UID: + return self.amount + else: + return get_deposit_token_withdraw_amount(settings, self.amount) diff --git a/hathorlib/hathorlib/utils/__init__.py b/hathorlib/hathorlib/utils/__init__.py index c41e75e85..6e0ee70e0 100644 --- a/hathorlib/hathorlib/utils/__init__.py +++ b/hathorlib/hathorlib/utils/__init__.py @@ -4,22 +4,32 @@ This source code is licensed under the MIT license found in the LICENSE file in the root directory of this source tree. """ -import hashlib import re import struct -from typing import Any, Tuple, Union, cast +from math import ceil, floor +from typing import TYPE_CHECKING, Any, Tuple, Union -import base58 -from cryptography.hazmat.primitives.asymmetric import ec -from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat +if TYPE_CHECKING: + from hathorlib.conf.settings import HathorSettings -from hathorlib.conf import HathorSettings -from hathorlib.exceptions import InvalidAddress from hathorlib.serialization import Deserializer, SerializationError, Serializer from hathorlib.serialization.adapters import MaxBytesExceededError from hathorlib.serialization.encoding.leb128 import decode_leb128, encode_leb128 - -settings = HathorSettings() +# Re-export address utilities from the dedicated module for backward compatibility +from hathorlib.utils.address import ( # noqa: F401 + decode_address, + get_address_b58_from_bytes, + get_address_b58_from_public_key, + get_address_b58_from_public_key_bytes, + get_address_b58_from_public_key_hash, + get_address_b58_from_redeem_script_hash, + get_address_from_public_key_hash, + get_address_from_redeem_script_hash, + get_checksum, + get_hash160, + get_public_key_bytes_compressed, + get_public_key_from_bytes_compressed, +) def int_to_bytes(number: int, size: int, signed: bool = False) -> bytes: @@ -48,119 +58,6 @@ def unpack_len(n: int, buf: bytes) -> Tuple[bytes, bytes]: return buf[:n], buf[n:] -def get_checksum(address_bytes: bytes) -> bytes: - """ Calculate double sha256 of address and gets first 4 bytes - - :param address_bytes: address before checksum - :param address_bytes: bytes - - :return: checksum of the address - :rtype: bytes - """ - return hashlib.sha256(hashlib.sha256(address_bytes).digest()).digest()[:4] - - -def decode_address(address58: str) -> bytes: - """ Decode address in base58 to bytes - - :param address58: Wallet address in base58 - :type address58: string - - :raises InvalidAddress: if address58 is not a valid base58 string or - not a valid address or has invalid checksum - - :return: Address in bytes - :rtype: bytes - """ - try: - decoded_address = base58.b58decode(address58) - except ValueError: - # Invalid base58 string - raise InvalidAddress('Invalid base58 address') - # Validate address size [25 bytes] - if len(decoded_address) != 25: - raise InvalidAddress('Address size must have 25 bytes') - # Validate the checksum - address_checksum = decoded_address[-4:] - valid_checksum = get_checksum(decoded_address[:-4]) - if address_checksum != valid_checksum: - raise InvalidAddress('Invalid checksum of address') - return decoded_address - - -def get_address_b58_from_public_key_hash(public_key_hash: bytes) -> str: - """Gets the b58 address from the hash of a public key. - - :param public_key_hash: hash of public key (sha256 and ripemd160) - :param public_key_hash: bytes - - :return: address in base 58 - :rtype: string - """ - address = get_address_from_public_key_hash(public_key_hash) - return base58.b58encode(address).decode('utf-8') - - -def get_address_from_public_key_hash(public_key_hash: bytes, - version_byte: bytes = settings.P2PKH_VERSION_BYTE) -> bytes: - """Gets the address in bytes from the public key hash - - :param public_key_hash: hash of public key (sha256 and ripemd160) - :param public_key_hash: bytes - - :param version_byte: first byte of address to define the version of this address - :param version_byte: bytes - - :return: address in bytes - :rtype: bytes - """ - address = b'' - # Version byte - address += version_byte - # Pubkey hash - address += public_key_hash - checksum = get_checksum(address) - address += checksum - return address - - -def get_address_b58_from_redeem_script_hash(redeem_script_hash: bytes, - version_byte: bytes = settings.MULTISIG_VERSION_BYTE) -> str: - """Gets the b58 address from the hash of the redeem script in multisig. - - :param redeem_script_hash: hash of the redeem script (sha256 and ripemd160) - :param redeem_script_hash: bytes - - :return: address in base 58 - :rtype: string - """ - address = get_address_from_redeem_script_hash(redeem_script_hash, version_byte) - return base58.b58encode(address).decode('utf-8') - - -def get_address_from_redeem_script_hash(redeem_script_hash: bytes, - version_byte: bytes = settings.MULTISIG_VERSION_BYTE) -> bytes: - """Gets the address in bytes from the redeem script hash - - :param redeem_script_hash: hash of redeem script (sha256 and ripemd160) - :param redeem_script_hash: bytes - - :param version_byte: first byte of address to define the version of this address - :param version_byte: bytes - - :return: address in bytes - :rtype: bytes - """ - address = b'' - # Version byte - address += version_byte - # redeem script hash - address += redeem_script_hash - checksum = get_checksum(address) - address += checksum - return address - - def clean_token_string(string: str) -> str: """ Receives the token name/symbol and returns it after some cleanups. It sets to uppercase, removes double spaces and spaces at the beginning and end. @@ -168,48 +65,6 @@ def clean_token_string(string: str) -> str: return re.sub(r'\s\s+', ' ', string).strip().upper() -def get_public_key_from_bytes_compressed(public_key_bytes: bytes) -> ec.EllipticCurvePublicKey: - """Return the cryptography public key from the compressed bytes format.""" - return ec.EllipticCurvePublicKey.from_encoded_point(ec.SECP256K1(), public_key_bytes) - - -def get_address_b58_from_public_key(public_key: ec.EllipticCurvePublicKey) -> str: - """Get the b58 address from a public key.""" - public_key_bytes = get_public_key_bytes_compressed(public_key) - return get_address_b58_from_public_key_bytes(public_key_bytes) - - -def get_address_b58_from_public_key_bytes(public_key_bytes: bytes) -> str: - """Get the b58 address from a public key bytes.""" - public_key_hash = get_hash160(public_key_bytes) - return get_address_b58_from_public_key_hash(public_key_hash) - - -def get_public_key_bytes_compressed(public_key: ec.EllipticCurvePublicKey) -> bytes: - """Return the bytes of a pubkey in the compressed format.""" - return public_key.public_bytes(Encoding.X962, PublicFormat.CompressedPoint) - - -try: - hashlib.new('ripemd160', b'') -except Exception: - # XXX: the source says "Test-only pure Python RIPEMD160 implementation", however for our case this is acceptable - # for more details see: https://github.com/bitcoin/bitcoin/pull/23716/files which has a copy of the same code - import pycoin.contrib.ripemd160 # type: ignore[import-untyped] - - def get_hash160(public_key_bytes: bytes) -> bytes: - """The input is hashed twice: first with SHA-256 and then with RIPEMD-160""" - key_hash = hashlib.sha256(public_key_bytes) - return cast(bytes, pycoin.contrib.ripemd160.ripemd160(key_hash.digest())) -else: - def get_hash160(public_key_bytes: bytes) -> bytes: - """The input is hashed twice: first with SHA-256 and then with RIPEMD-160""" - key_hash = hashlib.sha256(public_key_bytes) - h = hashlib.new('ripemd160') - h.update(key_hash.digest()) - return h.digest() - - def encode_signed(value: int, *, max_bytes: Union[int, None] = None) -> bytes: """ Receive a signed integer and return its LEB128-encoded bytes. @@ -308,3 +163,11 @@ def decode_unsigned(data: bytes, *, max_bytes: Union[int, None] = None) -> tuple remaining_data = bytes(deserializer.read_all()) deserializer.finalize() return (value, remaining_data) + + +def get_deposit_token_deposit_amount(settings: 'HathorSettings', mint_amount: int) -> int: + return ceil(abs(settings.TOKEN_DEPOSIT_PERCENTAGE * mint_amount)) + + +def get_deposit_token_withdraw_amount(settings: 'HathorSettings', melt_amount: int) -> int: + return floor(abs(settings.TOKEN_DEPOSIT_PERCENTAGE * melt_amount)) diff --git a/hathorlib/hathorlib/utils/address.py b/hathorlib/hathorlib/utils/address.py new file mode 100644 index 000000000..efd42bab1 --- /dev/null +++ b/hathorlib/hathorlib/utils/address.py @@ -0,0 +1,184 @@ +""" +Copyright (c) Hathor Labs and its affiliates. + +This source code is licensed under the MIT license found in the +LICENSE file in the root directory of this source tree. +""" +import hashlib +from typing import cast + +import base58 +from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat + +from hathorlib.conf import HathorSettings +from hathorlib.exceptions import InvalidAddress + +settings = HathorSettings() + + +def get_checksum(address_bytes: bytes) -> bytes: + """ Calculate double sha256 of address and gets first 4 bytes + + :param address_bytes: address before checksum + :param address_bytes: bytes + + :return: checksum of the address + :rtype: bytes + """ + return hashlib.sha256(hashlib.sha256(address_bytes).digest()).digest()[:4] + + +def decode_address(address58: str) -> bytes: + """ Decode address in base58 to bytes + + :param address58: Wallet address in base58 + :type address58: string + + :raises InvalidAddress: if address58 is not a valid base58 string or + not a valid address or has invalid checksum + + :return: Address in bytes + :rtype: bytes + """ + try: + decoded_address = base58.b58decode(address58) + except ValueError: + # Invalid base58 string + raise InvalidAddress('Invalid base58 address') + # Validate address size [25 bytes] + if len(decoded_address) != 25: + raise InvalidAddress('Address size must have 25 bytes') + # Validate the checksum + address_checksum = decoded_address[-4:] + valid_checksum = get_checksum(decoded_address[:-4]) + if address_checksum != valid_checksum: + raise InvalidAddress('Invalid checksum of address') + return decoded_address + + +def get_address_b58_from_bytes(address: bytes) -> str: + """Encode address bytes to base58 string. + + :param address: address in bytes + :type address: bytes + + :return: address in base 58 + :rtype: string + """ + return base58.b58encode(address).decode('utf-8') + + +def get_address_b58_from_public_key_hash(public_key_hash: bytes) -> str: + """Gets the b58 address from the hash of a public key. + + :param public_key_hash: hash of public key (sha256 and ripemd160) + :param public_key_hash: bytes + + :return: address in base 58 + :rtype: string + """ + address = get_address_from_public_key_hash(public_key_hash) + return base58.b58encode(address).decode('utf-8') + + +def get_address_from_public_key_hash(public_key_hash: bytes, + version_byte: bytes = settings.P2PKH_VERSION_BYTE) -> bytes: + """Gets the address in bytes from the public key hash + + :param public_key_hash: hash of public key (sha256 and ripemd160) + :param public_key_hash: bytes + + :param version_byte: first byte of address to define the version of this address + :param version_byte: bytes + + :return: address in bytes + :rtype: bytes + """ + address = b'' + # Version byte + address += version_byte + # Pubkey hash + address += public_key_hash + checksum = get_checksum(address) + address += checksum + return address + + +def get_address_b58_from_redeem_script_hash(redeem_script_hash: bytes, + version_byte: bytes = settings.MULTISIG_VERSION_BYTE) -> str: + """Gets the b58 address from the hash of the redeem script in multisig. + + :param redeem_script_hash: hash of the redeem script (sha256 and ripemd160) + :param redeem_script_hash: bytes + + :return: address in base 58 + :rtype: string + """ + address = get_address_from_redeem_script_hash(redeem_script_hash, version_byte) + return base58.b58encode(address).decode('utf-8') + + +def get_address_from_redeem_script_hash(redeem_script_hash: bytes, + version_byte: bytes = settings.MULTISIG_VERSION_BYTE) -> bytes: + """Gets the address in bytes from the redeem script hash + + :param redeem_script_hash: hash of redeem script (sha256 and ripemd160) + :param redeem_script_hash: bytes + + :param version_byte: first byte of address to define the version of this address + :param version_byte: bytes + + :return: address in bytes + :rtype: bytes + """ + address = b'' + # Version byte + address += version_byte + # redeem script hash + address += redeem_script_hash + checksum = get_checksum(address) + address += checksum + return address + + +def get_public_key_from_bytes_compressed(public_key_bytes: bytes) -> ec.EllipticCurvePublicKey: + """Return the cryptography public key from the compressed bytes format.""" + return ec.EllipticCurvePublicKey.from_encoded_point(ec.SECP256K1(), public_key_bytes) + + +def get_address_b58_from_public_key(public_key: ec.EllipticCurvePublicKey) -> str: + """Get the b58 address from a public key.""" + public_key_bytes = get_public_key_bytes_compressed(public_key) + return get_address_b58_from_public_key_bytes(public_key_bytes) + + +def get_address_b58_from_public_key_bytes(public_key_bytes: bytes) -> str: + """Get the b58 address from a public key bytes.""" + public_key_hash = get_hash160(public_key_bytes) + return get_address_b58_from_public_key_hash(public_key_hash) + + +def get_public_key_bytes_compressed(public_key: ec.EllipticCurvePublicKey) -> bytes: + """Return the bytes of a pubkey in the compressed format.""" + return public_key.public_bytes(Encoding.X962, PublicFormat.CompressedPoint) + + +try: + hashlib.new('ripemd160', b'') +except Exception: + # XXX: the source says "Test-only pure Python RIPEMD160 implementation", however for our case this is acceptable + # for more details see: https://github.com/bitcoin/bitcoin/pull/23716/files which has a copy of the same code + import pycoin.contrib.ripemd160 # type: ignore[import-untyped] + + def get_hash160(public_key_bytes: bytes) -> bytes: + """The input is hashed twice: first with SHA-256 and then with RIPEMD-160""" + key_hash = hashlib.sha256(public_key_bytes) + return cast(bytes, pycoin.contrib.ripemd160.ripemd160(key_hash.digest())) +else: + def get_hash160(public_key_bytes: bytes) -> bytes: + """The input is hashed twice: first with SHA-256 and then with RIPEMD-160""" + key_hash = hashlib.sha256(public_key_bytes) + h = hashlib.new('ripemd160') + h.update(key_hash.digest()) + return h.digest() diff --git a/hathorlib/pyproject.toml b/hathorlib/pyproject.toml index 4f2b639e1..65fbab5b5 100644 --- a/hathorlib/pyproject.toml +++ b/hathorlib/pyproject.toml @@ -80,11 +80,14 @@ plugins = [ [[tool.mypy.overrides]] module = [ + "hathorlib.nanocontracts.method", "hathorlib.nanocontracts.*", "hathorlib.utils.*", ] check_untyped_defs = false disallow_untyped_defs = false +warn_return_any = false +warn_unused_ignores = false [tool.pytest.ini_options] minversion = "6.0"