diff --git a/ebmlite/core.py b/ebmlite/core.py index 453c4a3..c9ce7a0 100644 --- a/ebmlite/core.py +++ b/ebmlite/core.py @@ -2,33 +2,33 @@ EBMLite: A lightweight EBML parsing library. It is designed to crawl through EBML files quickly and efficiently, and that's about it. -@todo: Complete EBML encoding. Specifically, make 'master' elements write +:todo: Complete EBML encoding. Specifically, make 'master' elements write directly to the stream, rather than build bytearrays, so huge 'master' elements can be handled. It appears that the official spec may prohibit (or at least counter-indicate) multiple root elements. Possible compromise until proper fix: handle root 'master' elements differently than deeper ones, more like the current `Document`. -@todo: Validation. Enforce the hierarchy defined in each schema. -@todo: Optimize 'infinite' master elements (i.e `size` is `None`). See notes +:todo: Validation. Enforce the hierarchy defined in each schema. +:todo: Optimize 'infinite' master elements (i.e `size` is `None`). See notes in `MasterElement` class' method definitions. -@todo: Improved `MasterElement.__eq__()` method, possibly doing a recursive +:todo: Improved `MasterElement.__eq__()` method, possibly doing a recursive crawl of both elements and comparing the actual contents, or iterating over chunks of the raw binary data. Current implementation doesn't check element contents, just ID and payload size (for speed). -@todo: Document-wide caching, for future handling of streamed data. Affects +:todo: Document-wide caching, for future handling of streamed data. Affects the longer-term streaming to-do (listed below) and optimization of 'infinite' elements (listed above). -@todo: Clean up and standardize usage of the term 'size' versus 'length.' -@todo: General documentation (more detailed than the README) and examples. -@todo: Document the best way to load schemata in a PyInstaller executable. +:todo: Clean up and standardize usage of the term 'size' versus 'length.' 
+:todo: General documentation (more detailed than the README) and examples. +:todo: Document the best way to load schemata in a PyInstaller executable. -@todo: (longer term) Consider making schema loading automatic based on the EBML +:todo: (longer term) Consider making schema loading automatic based on the EBML DocType, DocTypeVersion, and DocTypeReadVersion. Would mean a refactoring of how schemata are loaded. -@todo: (longer term) Refactor to support streaming data. This will require +:todo: (longer term) Refactor to support streaming data. This will require modifying the indexing and iterating methods of `Document`. Also affects the document-wide caching to-do item, listed above. -@todo: (longer term) Support the official Schema definition format. Start by +:todo: (longer term) Support the official Schema definition format. Start by adopting some of the attributes, specifically ``minOccurs`` and ``maxOccurs`` (they serve the function provided by the current ``mandatory`` and ``multiple`` attributes). Add ``range`` later. @@ -54,6 +54,7 @@ import re import sys import types +from typing import Any, BinaryIO, Dict, List, Optional, TextIO, Tuple, Union from xml.etree import ElementTree as ET from .decoding import readElementID, readElementSize @@ -65,9 +66,9 @@ # Dictionaries in Python 3.7+ are explicitly insert-ordered in all # implementations. If older, continue to use `collections.OrderedDict`. if sys.hexversion < 0x03070000: - from collections import OrderedDict as Dict + from collections import OrderedDict as _Dict else: - Dict = dict + _Dict = dict # Additionally, `importlib.resources.files` is new to 3.9 as well; this is # part of a work-around. @@ -102,21 +103,22 @@ class Element(object): """ Base class for all EBML elements. Each data type has its own subclass, and these subclasses get subclassed when a Schema is read. - @cvar id: The element's EBML ID. - @cvar name: The element's name. - @cvar schema: The `Schema` to which this element belongs. 
- @cvar multiple: Can this element be appear multiple times? Note: + :var id: The element's EBML ID. + :var name: The element's name. + :var schema: The `Schema` to which this element belongs. + :var multiple: Can this element appear multiple times? Note: Currently only enforced for encoding. - @cvar mandatory: Must this element appear in all EBML files using + :var mandatory: Must this element appear in all EBML files using this element's schema? Note: Not currently enforced. - @cvar children: A list of valid child element types. Only applicable to - `Document` and `Master` subclasses. Note: Not currently enforced. - @cvar dtype: The element's native Python data type. - @cvar precache: If `True`, the Element's value is read when the Element + :var children: A list of valid child element types. Only applicable to + `Document` and `Master` subclasses. Note: Not currently enforced; + only used when decoding 'infinite' length elements. + :var dtype: The element's native Python data type. + :var precache: If `True`, the Element's value is read when the Element is parsed. if `False`, the value is lazy-loaded when needed. Numeric element types default to `True`. Can be used to reduce the number of file seeks, potentially speeding things up. - @cvar length: An explicit length (in bytes) of the element when + :var length: An explicit length (in bytes) of the element when encoding. `None` will use standard EBML variable-length encoding. """ __slots__ = ("stream", "offset", "size", "sizeLength", "payloadOffset", "_value") @@ -124,6 +126,12 @@ class Element(object): # Parent `Schema` schema = None + # Element name + name = None + + # Element EBML ID + id = None + # Python native data type. dtype = bytearray @@ -142,14 +150,17 @@ class Element(object): # For python-ebml compatibility; not currently used. children = None - def parse(self, stream, size): + def parse(self, stream: BinaryIO, size: int): """ Type-specific helper function for parsing the element's payload. 
It is assumed the file pointer is at the start of the payload. """ # Document-wide caching could be implemented here. return bytearray(stream.read(size)) - def __init__(self, stream=None, offset=0, size=0, payloadOffset=0): + def __init__(self, stream: BinaryIO = None, + offset: int = 0, + size: int = 0, + payloadOffset: int = 0): """ Constructor. Instantiate a new Element from a file. In most cases, elements should be created when a `Document` is loaded, rather than instantiated explicitly. @@ -166,11 +177,11 @@ def __init__(self, stream=None, offset=0, size=0, payloadOffset=0): self.payloadOffset = payloadOffset self._value = None - def __repr__(self): + def __repr__(self) -> str: return "<%s (ID:0x%02X), offset %s, size %s>" % \ (self.__class__.__name__, self.id, self.offset, self.size) - def __eq__(self, other): + def __eq__(self, other) -> bool: """ Equality check. Elements are considered equal if they are the same type and have the same ID, size, offset, and schema. Note: element value is not considered! Check for value equality explicitly @@ -196,13 +207,13 @@ def value(self): self._value = self.parse(self.stream, self.size) return self._value - def getRaw(self): + def getRaw(self) -> bytes: """ Get the element's raw binary data, including EBML headers. """ self.stream.seek(self.offset) return self.stream.read(self.size + (self.payloadOffset - self.offset)) - def getRawValue(self): + def getRawValue(self) -> bytes: """ Get the raw binary of the element's value. """ self.stream.seek(self.payloadOffset) @@ -212,7 +223,7 @@ def getRawValue(self): # Caching (experimental) # ========================================================================== - def gc(self, recurse=False): + def gc(self, recurse=False) -> int: """ Clear any cached values. To save memory and/or force values to be re-read from the file. Returns the number of cached values cleared. 
""" @@ -227,12 +238,17 @@ def gc(self, recurse=False): # ========================================================================== @classmethod - def encodePayload(cls, data, length=None): + def encodePayload(cls, data: Any, length: Optional[int] = None) -> bytes: """ Type-specific payload encoder. """ return encoding.encodeBinary(data, length) + @classmethod - def encode(cls, value, length=None, lengthSize=None, infinite=False): + def encode(cls, + value: Any, + length: Optional[int] = None, + lengthSize: Optional[int] = None, + infinite: bool = False) -> bytes: """ Encode an EBML element. :param value: The value to encode, or a list of values to encode. @@ -243,7 +259,11 @@ def encode(cls, value, length=None, lengthSize=None, infinite=False): byte-aligned structures. :param lengthSize: An explicit length for the encoded element size, overriding the variable length encoding. - @return: A bytearray containing the encoded EBML data. + :param infinite: If `True`, the element will be marked as being + 'infinite'. Infinite elements are read until an element is + encountered that is not defined as a valid child in the + schema. + :return: A bytearray containing the encoded EBML data. """ if infinite and not issubclass(cls, MasterElement): raise ValueError("Only Master elements can have 'infinite' lengths") @@ -285,14 +305,14 @@ def __eq__(self, other): return False return self.value == other.value - def parse(self, stream, size): + def parse(self, stream: BinaryIO, size: int) -> int: """ Type-specific helper function for parsing the element's payload. It is assumed the file pointer is at the start of the payload. """ return readInt(stream, size) @classmethod - def encodePayload(cls, data, length=None): + def encodePayload(cls, data: int, length: int = None) -> bytes: """ Type-specific payload encoder for signed integer elements. 
""" return encoding.encodeInt(data, length) @@ -308,14 +328,14 @@ class UIntegerElement(IntegerElement): dtype = int precache = True - def parse(self, stream, size): + def parse(self, stream: BinaryIO, size: int) -> int: """ Type-specific helper function for parsing the element's payload. It is assumed the file pointer is at the start of the payload. """ return readUInt(stream, size) @classmethod - def encodePayload(cls, data, length=None): + def encodePayload(cls, data: int, length: int = None) -> bytes: """ Type-specific payload encoder for unsigned integer elements. """ return encoding.encodeUInt(data, length) @@ -336,14 +356,14 @@ def __eq__(self, other): return False return self.value == other.value - def parse(self, stream, size): + def parse(self, stream: BinaryIO, size: int) -> float: """ Type-specific helper function for parsing the element's payload. It is assumed the file pointer is at the start of the payload. """ return readFloat(stream, size) @classmethod - def encodePayload(cls, data, length=None): + def encodePayload(cls, data: float, length: int = None) -> bytes: """ Type-specific payload encoder for floating point elements. """ return encoding.encodeFloat(data, length) @@ -366,14 +386,14 @@ def __eq__(self, other): def __len__(self): return self.size - def parse(self, stream, size): + def parse(self, stream: BinaryIO, size: int) -> str: """ Type-specific helper function for parsing the element's payload. It is assumed the file pointer is at the start of the payload. """ return readString(stream, size) @classmethod - def encodePayload(cls, data, length=None): + def encodePayload(cls, data: str, length: int = None) -> bytes: """ Type-specific payload encoder for ASCII string elements. 
""" return encoding.encodeString(data, length) @@ -388,18 +408,18 @@ class UnicodeElement(StringElement): __slots__ = ("stream", "offset", "size", "sizeLength", "payloadOffset", "_value") dtype = str - def __len__(self): + def __len__(self) -> int: # Value may be multiple bytes per character return len(self.value) - def parse(self, stream, size): + def parse(self, stream: BinaryIO, size: int) -> str: """ Type-specific helper function for parsing the element's payload. It is assumed the file pointer is at the start of the payload. """ return readUnicode(stream, size) @classmethod - def encodePayload(cls, data, length=None): + def encodePayload(cls, data: str, length: int = None) -> bytes: """ Type-specific payload encoder for Unicode string elements. """ return encoding.encodeUnicode(data, length) @@ -414,14 +434,14 @@ class DateElement(IntegerElement): __slots__ = ("stream", "offset", "size", "sizeLength", "payloadOffset", "_value") dtype = datetime - def parse(self, stream, size): + def parse(self, stream: BinaryIO, size: int) -> datetime: """ Type-specific helper function for parsing the element's payload. It is assumed the file pointer is at the start of the payload. """ return readDate(stream, size) @classmethod - def encodePayload(cls, data, length=None): + def encodePayload(cls, data: datetime, length: Optional[int] = None) -> bytes: """ Type-specific payload encoder for date elements. """ return encoding.encodeDate(data, length) @@ -450,11 +470,13 @@ class VoidElement(BinaryElement): """ __slots__ = ("stream", "offset", "size", "sizeLength", "payloadOffset", "_value") - def parse(self, stream, size): + def parse(self, + stream: BinaryIO, + size: Optional[int]) -> bytearray: return bytearray() @classmethod - def encodePayload(cls, data, length=0): + def encodePayload(cls, data: Any, length: int = 0) -> bytearray: """ Type-specific payload encoder for Void elements. 
""" length = 0 if length is None else length return bytearray(b'\xff' * length) @@ -463,6 +485,7 @@ def encodePayload(cls, data, length=0): # ============================================================================== +# noinspection PyDunderSlots class UnknownElement(BinaryElement): """ Special case ``Unknown`` element, used for elements with IDs not present in a schema. Unlike other elements, each instance has its own @@ -473,8 +496,13 @@ class UnknownElement(BinaryElement): name = "UnknownElement" precache = False - def __init__(self, stream=None, offset=0, size=0, payloadOffset=0, eid=None, - schema=None): + def __init__(self, + stream: Optional[BinaryIO] = None, + offset: int = 0, + size: int = 0, + payloadOffset: int = 0, + eid: Optional[int] = None, + schema: Optional["Schema"] = None): """ Constructor. Instantiate a new `UnknownElement` from a file. In most cases, elements should be created when a `Document` is loaded, rather than instantiated explicitly. @@ -484,7 +512,7 @@ def __init__(self, stream=None, offset=0, size=0, payloadOffset=0, eid=None, :param size: The size of the whole element. :param payloadOffset: The starting location of the element's payload (i.e. immediately after the element's header). - :param id: The unknown element's ID. Unlike 'normal' elements, + :param eid: The unknown element's ID. Unlike 'normal' elements, in which ID is a class attribute, each UnknownElement instance explicitly defines this. :param schema: The schema used to load the element. Specified @@ -496,7 +524,7 @@ def __init__(self, stream=None, offset=0, size=0, payloadOffset=0, eid=None, self.id = eid self.schema = schema - def __eq__(self, other): + def __eq__(self, other) -> bool: """ Equality check. Unknown elements are considered equal if they have the same ID and value. Note that this differs from the criteria used for other element classes! 
@@ -522,14 +550,20 @@ class MasterElement(Element): "_size", "_length") dtype = list - def parse(self): + _childIds = None + + def parse(self, *args) -> List[Element]: """ Type-specific helper function for parsing the element's payload. + This is a special case; parameters `stream` and `size` are not + used. """ # Special case; unlike other elements, value() property doesn't call # parse(). Used only when pre-caching. return self.value - def parseElement(self, stream, nocache=False): + def parseElement(self, + stream: BinaryIO, + nocache: bool = False) -> Tuple[Element, int]: """ Read the next element from a stream, instantiate a `MasterElement` object, and then return it and the offset of the next element (this element's position + size). @@ -561,7 +595,7 @@ def parseElement(self, stream, nocache=False): return el, payloadOffset + el.size @classmethod - def _isValidChild(cls, elId): + def _isValidChild(cls, elId: int) -> bool: """ Is the given element ID represent a valid sub-element, i.e. explicitly specified as a child element or a 'global' in the schema? @@ -573,7 +607,7 @@ def _isValidChild(cls, elId): @property - def size(self): + def size(self) -> int: """ The element's size. Master elements can be instantiated with this as `None`; this denotes an 'infinite' EBML element, and its size will be determined by iterating over its contents until an invalid @@ -583,7 +617,7 @@ def size(self): return self._size except AttributeError: # An "infinite" element (size specified in file is all 0xFF) - pos = end = self.payloadOffset + pos = self.payloadOffset numChildren = 0 while True: self.stream.seek(pos) @@ -607,13 +641,13 @@ def size(self): return self._size @size.setter - def size(self, esize): + def size(self, esize: Optional[int]): if esize is not None: # Only create the `_size` attribute for a real value. Don't # define it if it's `None`, so `size` will get calculated. 
self._size = esize - def __iter__(self, nocache=False): + def __iter__(self, nocache: bool = False): """ x.__iter__() <==> iter(x) """ # TODO: Better support for 'infinite' elements (getting the size of @@ -631,7 +665,7 @@ def __iter__(self, nocache=False): break raise - def __len__(self): + def __len__(self) -> int: """ x.__len__() <==> len(x) """ try: @@ -647,7 +681,7 @@ def __len__(self): return self._length @property - def value(self): + def value(self) -> List[Element]: """ Parse and cache the element's value. """ if self._value is not None: @@ -655,7 +689,7 @@ def value(self): self._value = list(self) return self._value - def __getitem__(self, *args): + def __getitem__(self, *args) -> Element: # TODO: Parse only the requested item(s), like `Document` return self.value.__getitem__(*args) @@ -663,7 +697,7 @@ def __getitem__(self, *args): # Caching (experimental!) # ========================================================================== - def gc(self, recurse=False): + def gc(self, recurse: bool = False) -> int: """ Clear any cached values. To save memory and/or force values to be re-read from the file. """ @@ -679,7 +713,9 @@ def gc(self, recurse=False): # ========================================================================== @classmethod - def encodePayload(cls, data, length=None): + def encodePayload(cls, + data: Union[Dict[str, Any], List[Tuple[str, Any]], None], + length: Optional[int] = None): """ Type-specific payload encoder for 'master' elements. """ result = bytearray() @@ -699,13 +735,22 @@ def encodePayload(cls, data, length=None): return result @classmethod - def encode(cls, data, length=None, lengthSize=None, infinite=False): + def encode(cls, + data: Union[Dict[str, Any], List[Tuple[str, Any]]], + length: Optional[int] = None, + lengthSize: Optional[int] = None, + infinite: bool = False) -> bytes: """ Encode an EBML master element. 
:param data: The data to encode, provided as a dictionary keyed by element name, a list of two-item name/value tuples, or a list of either. Note: individual items in a list of name/value pairs *must* be tuples! + :param length: An explicit length for the encoded data, + overriding the variable length encoding. For producing + byte-aligned structures. + :param lengthSize: An explicit length for the encoded element + size, overriding the variable length encoding. :param infinite: If `True`, the element will be written with an undefined size. When parsed, its end will be determined by the occurrence of an invalid child element (or end-of-file). @@ -728,18 +773,18 @@ def encode(cls, data, length=None, lengthSize=None, infinite=False): lengthSize=lengthSize, infinite=infinite) - def dump(self): + def dump(self) -> Dict[str, Any]: """ Dump this element's value as nested dictionaries, keyed by element name. The values of 'multiple' elements return as lists. Note: The order of 'multiple' elements relative to other elements will be lost; a file containing elements ``A1 B1 A2 B2 A3 B3`` will result in``[A1 A2 A3][B1 B2 B3]``. - @todo: Decide if this should be in the `util` submodule. It is + :todo: Decide if this should be in the `util` submodule. It is very specific, and it isn't totally necessary for the core library. """ - result = Dict() + result = _Dict() for el in self: if el.multiple: result.setdefault(el.name, []).append(el.dump()) @@ -758,7 +803,11 @@ class Document(MasterElement): Loading a `Schema` generates a subclass. """ - def __init__(self, stream, name=None, size=None, headers=True): + def __init__(self, + stream: BinaryIO, + name: Optional[str] = None, + size: Optional[int] = None, + headers: bool = True): """ Constructor. Instantiate a `Document` from a file-like stream. In most cases, `Schema.load()` should be used instead of explicitly instantiating a `Document`. 
@@ -768,7 +817,7 @@ def __init__(self, stream, name=None, size=None, headers=True): :param name: The name of the document. Defaults to the filename (if applicable). :param size: The size of the document, in bytes. Use if the - stream is neither a file or a `BytesIO` object. + stream is neither a file nor a `BytesIO` object. :param headers: If `False`, the file's ``EBML`` header element (if present) will not appear as a root element in the document. The contents of the ``EBML`` element will always be read, @@ -819,12 +868,12 @@ def __init__(self, stream, name=None, size=None, headers=True): self.info = el.dump() if not headers: self.payloadOffset = pos - except: + except Exception: # Failed to read the first element. Don't raise here; do that when # the Document is actually used. pass - def __repr__(self): + def __repr__(self) -> str: """ "x.__repr__() <==> repr(x) """ if self.name == self.__class__.__name__: return object.__repr__(self) @@ -849,7 +898,7 @@ def close(self): if self._ownsStream: self.stream.close() - def __len__(self): + def __len__(self) -> int: """ x.__len__() <==> len(x) Not recommended for huge documents. """ @@ -862,7 +911,7 @@ def __len__(self): self._length = n return self._length - def __iter__(self, nocache=False): + def __iter__(self, nocache: bool = False): """ Iterate root elements. """ # TODO: Cache root elements, prevent unnecessary duplicates. Maybe a @@ -888,7 +937,7 @@ def value(self): # 'value' not really applicable to a document; return an iterator. return iter(self) - def __getitem__(self, idx): + def __getitem__(self, idx: int) -> Element: """ Get one of the document's root elements by index. """ # TODO: Cache parsed root elements, handle indexing dynamically. @@ -902,19 +951,19 @@ def __getitem__(self, idx): if n is None: # If object being enumerated is empty, `n` is never set. 
raise IndexError("Document contained no readable data") - raise IndexError("list index out of range (0-%d)" % n) + raise IndexError("list index out of range (0-{})".format(n)) elif isinstance(idx, slice): raise IndexError("Document root slicing not (yet) supported") else: raise TypeError("list indices must be integers, not %s" % type(idx)) @property - def version(self): + def version(self) -> int: """ The document's type version (i.e. the EBML ``DocTypeVersion``). """ return self.info.get('DocTypeVersion') @property - def type(self): + def type(self) -> str: """ The document's type name (i.e. the EBML ``DocType``). """ return self.info.get('DocType') @@ -922,7 +971,7 @@ def type(self): # Caching (experimental!) # ========================================================================== - def gc(self, recurse=False): + def gc(self, recurse: bool = False) -> int: # TODO: Implement this if/when caching of root elements is implemented. return 0 @@ -931,7 +980,7 @@ def gc(self, recurse=False): # ========================================================================== @classmethod - def _createHeaders(cls): + def _createHeaders(cls) -> Dict[str, Any]: """ Create the default EBML 'header' elements for a Document, using the default values in the schema. @@ -942,7 +991,7 @@ def _createHeaders(cls): if 'EBML' not in cls.schema: return {} - headers = Dict() + headers = _Dict() for elName, elType in (('EBMLVersion', int), ('EBMLReadVersion', int), ('DocType', str), @@ -953,16 +1002,22 @@ def _createHeaders(cls): if v is not None: headers[elName] = v - return Dict(EBML=headers) + return _Dict(EBML=headers) @classmethod - def encode(cls, stream, data, headers=False, **kwargs): + def encode(cls, + stream: BinaryIO, + data: Union[Dict[str, Any], List[Tuple[str, Any]]], + headers: bool = False, **kwargs): """ Encode an EBML document. + :param stream: :param data: The data to encode, provided as a dictionary keyed by element name, or a list of two-item name/value tuples. 
Note: individual items in a list of name/value pairs *must* be tuples! + :param headers: If `True`, include the standard ``EBML`` header + element. :return: A bytearray containing the encoded EBML binary. """ if headers is True: @@ -990,23 +1045,23 @@ class Schema(object): the document and element types, this is not a base class; all schemata are actual instances of this class. - @ivar document: The schema's Document subclass. - @ivar elements: A dictionary mapping element IDs to the schema's + :ivar document: The schema's Document subclass. + :ivar elements: A dictionary mapping element IDs to the schema's corresponding `Element` subclasses. - @ivar elementsByName: A dictionary mapping element names to the + :ivar elementsByName: A dictionary mapping element names to the schema's corresponding `Element` subclasses. - @ivar elementInfo: A dictionary mapping IDs to the raw schema + :ivar elementInfo: A dictionary mapping IDs to the raw schema attribute data. It may have additional items not present in the created element class' attributes. - @ivar UNKNOWN: A class/function that handles unknown element IDs. By + :ivar UNKNOWN: A class/function that handles unknown element IDs. By default, this is the `UnknownElement` class. Special-case handling can be done by substituting a different class, or an element-producing factory function. - @ivar source: The source from which the Schema was loaded; either a + :ivar source: The source from which the Schema was loaded; either a filename or a file-like stream. - @ivar filename: The absolute path of the source file, if the source + :ivar filename: The absolute path of the source file, if the source was a file or a filename. """ @@ -1040,7 +1095,9 @@ class Schema(object): # factory function. UNKNOWN = UnknownElement - def __init__(self, source, name=None): + def __init__(self, + source: Union[str, Path, TextIO], + name: Optional[str] = None): """ Constructor. Creates a new Schema from a schema description XML. 
:param source: The Schema's source, either a string with the full @@ -1152,8 +1209,13 @@ def _parseSchema(self, el, parent=None): for chEl in el: self._parseSchema(chEl, cls) - def addElement(self, eid, ename, baseClass, attribs=None, parent=None, - docs=None): + def addElement(self, + eid: int, + ename: str, + baseClass, + attribs: Optional[Dict[str, Any]] = None, + parent=None, + docs: Optional[str] = None): """ Create a new `Element` subclass and add it to the schema. Duplicate elements are permitted (e.g. if one kind of element can @@ -1164,7 +1226,7 @@ def addElement(self, eid, ename, baseClass, attribs=None, parent=None, :param eid: The element's EBML ID. :param ename: The element's name. - :param baseClass: + :param baseClass: The base `Element` class. :param attribs: A dictionary of raw element attributes, as read from the schema file. :param parent: The new element's parent element class. @@ -1280,7 +1342,7 @@ def __repr__(self): except AttributeError: return object.__repr__(self) - def __eq__(self, other): + def __eq__(self, other) -> bool: """ Equality check. Schemata are considered equal if the attributes of their elements match. """ @@ -1289,23 +1351,27 @@ def __eq__(self, other): except AttributeError: return False - def __contains__(self, key): + def __contains__(self, key: Union[str, int]): """ Does the Schema contain a given element name or ID? """ return (key in self.elementsByName) or (key in self.elements) - def __getitem__(self, key): + def __getitem__(self, key: Union[str, int]): """ Get an Element class from the schema, by name or by ID. 
""" try: return self.elements[key] except KeyError: return self.elementsByName[key] - def get(self, key, default=None): + def get(self, key: Union[str, int, None], default=None): if key in self: return self[key] return default - def load(self, fp, name=None, headers=False, **kwargs): + def load(self, + fp: BinaryIO, + name: Optional[str] = None, + headers: bool = False, + **kwargs) -> Document: """ Load an EBML file using this Schema. :param fp: A file-like object containing the EBML to load, or the @@ -1318,7 +1384,7 @@ def load(self, fp, name=None, headers=False, **kwargs): """ return self.document(fp, name=name, headers=headers, **kwargs) - def loads(self, data, name=None): + def loads(self, data: bytes, name: Optional[str] = None) -> Document: """ Load EBML from a string using this Schema. :param data: A string or bytearray containing raw EBML data. @@ -1327,10 +1393,10 @@ def loads(self, data, name=None): """ return self.load(BytesIO(data), name=name) - def __call__(self, fp, name=None): + def __call__(self, fp: BinaryIO, name: Optional[str] = None): """ Load an EBML file using this Schema. Same as `Schema.load()`. - @todo: Decide if this is worth keeping. It exists for historical + :todo: Decide if this is worth keeping. It exists for historical reasons that may have been refactored out. :param fp: A file-like object containing the EBML to load, or the @@ -1351,12 +1417,12 @@ def _getInfo(self, eid, dtype): return None @property - def version(self): + def version(self) -> int: """ Schema version, extracted from EBML ``DocTypeVersion`` default. """ return self._getInfo(0x4287, int) # ID of EBML 'DocTypeVersion' @property - def type(self): + def type(self) -> str: """ Schema type name, extracted from EBML ``DocType`` default. 
""" return self._getInfo(0x4282, str) # ID of EBML 'DocType' @@ -1364,7 +1430,10 @@ def type(self): # Encoding # ========================================================================== - def encode(self, stream, data, headers=False): + def encode(self, + stream: BinaryIO, + data: Union[Dict[str, Any], List[Tuple[str, Any]]], + headers: bool = False): """ Write an EBML document using this Schema to a file or file-like stream. @@ -1373,23 +1442,30 @@ def encode(self, stream, data, headers=False): :param data: The data to encode, provided as a dictionary keyed by element name, or a list of two-item name/value tuples. Note: individual items in a list of name/value pairs *must* be tuples! + :param headers: If `True`, include the standard ``EBML`` header + element. """ self.document.encode(stream, data, headers=headers) return stream - def encodes(self, data, headers=False): + def encodes(self, + data: Union[Dict[str, Any], List[Tuple[str, Any]]], + headers: bool = False) -> bytes: """ Create an EBML document using this Schema, returned as a string. - :param data: The data to encode, provided as a dictionary keyed by - element name, or a list of two-item name/value tuples. Note: - individual items in a list of name/value pairs *must* be tuples! + :param data: The data to encode, provided as a dictionary keyed + by element name, or a list of two-item name/value tuples. + Note: individual items in a list of name/value pairs *must* + be tuples! + :param headers: If `True`, include the standard ``EBML`` header + element. :return: A string containing the encoded EBML binary. """ stream = BytesIO() self.encode(stream, data, headers=headers) return stream.getvalue() - def verify(self, data): + def verify(self, data: bytes) -> bool: """ Perform basic tests on EBML binary data, ensuring it can be parsed using this `Schema`. Failure will raise an expression. 
""" @@ -1413,11 +1489,12 @@ def _crawl(el): # # ============================================================================== -def _expandSchemaPath(path, name=''): +def _expandSchemaPath(path: Union[str, Path, types.ModuleType], + name: Union[str, Path] = '') -> Path: """ Helper function to process a schema path or name, converting module references to Paths. - :param path: The schema path. May be a directory name, a module + :param path: The schema path. It may be a directory name, a module name in braces (e.g., `{idelib.schemata}`), or a module instance. Directory and module names may contain schema filenames. @@ -1434,7 +1511,7 @@ def _expandSchemaPath(path, name=''): if '}' not in strpath: raise IOError(errno.ENOENT, 'Malformed module path', strpath) - m = re.match(r'(\{.+\})[/\\](.+)', strpath) + m = re.match(r'(\{.+})[/\\](.+)', strpath) if m: path, subdir = m.groups() strpath = path @@ -1456,7 +1533,7 @@ def _expandSchemaPath(path, name=''): return Path(path) / subdir / name -def listSchemata(*paths, absolute=True): +def listSchemata(*paths, absolute: bool = True) -> Dict[str, List[Schema]]: """ Gather all EBML schemata. `ebmlite.SCHEMA_PATH` is used by default; alternatively, one or more paths or modules can be supplied as arguments. @@ -1493,7 +1570,10 @@ def listSchemata(*paths, absolute=True): return schemata -def loadSchema(filename, reload=False, paths=None, **kwargs): +def loadSchema(filename: str, + reload: bool = False, + paths: Optional[str] = None, + **kwargs) -> Schema: """ Import a Schema XML file. Loading the same file more than once will return the initial instantiation, unless `reload` is `True`. @@ -1510,7 +1590,7 @@ def loadSchema(filename, reload=False, paths=None, **kwargs): Additional keyword arguments are sent verbatim to the `Schema` constructor. 
- @raises: IOError, ModuleNotFoundError + :raises: IOError, ModuleNotFoundError """ global SCHEMATA @@ -1551,7 +1631,10 @@ def loadSchema(filename, reload=False, paths=None, **kwargs): return schema -def parseSchema(src, name=None, reload=False, **kwargs): +def parseSchema(src: str, + name: Optional[str] = None, + reload: bool = False, + **kwargs) -> Schema: """ Read Schema XML data from a string or stream. Loading one with the same `name` will return the initial instantiation, unless `reload` is `True`. Calls to `loadSchema()` using a name previously used with diff --git a/ebmlite/decoding.py b/ebmlite/decoding.py index 2322997..bb2dbb0 100644 --- a/ebmlite/decoding.py +++ b/ebmlite/decoding.py @@ -15,6 +15,7 @@ from datetime import datetime, timedelta import struct +from typing import BinaryIO, Optional, Tuple import warnings # ============================================================================== @@ -42,10 +43,10 @@ # --- Reading and Decoding # ============================================================================== -def decodeIntLength(byte): +def decodeIntLength(byte: int) -> Tuple[int, int]: """ Extract the encoded size from an initial byte. - @return: The size, and the byte with the size removed (it is the first + :return: The size, and the byte with the size removed (it is the first byte of the value). """ # An inelegant implementation, but it's fast. @@ -67,11 +68,11 @@ def decodeIntLength(byte): return 8, 0 -def decodeIDLength(byte): +def decodeIDLength(byte: int) -> Tuple[int, int]: """ Extract the encoded ID size from an initial byte. - @return: The size and the original byte (it is part of the ID). - @raise IOError: raise if the length of an ID is invalid. + :return: The size and the original byte (it is part of the ID). + :raise IOError: raise if the length of an ID is invalid. 
""" if byte >= 128: return 1, byte @@ -86,12 +87,12 @@ def decodeIDLength(byte): raise IOError('Invalid length for ID: %d' % length) -def readElementID(stream): +def readElementID(stream: BinaryIO) -> Tuple[int, int]: """ Read an element ID from a file (or file-like stream). - @param stream: The source file-like object. - @return: The decoded element ID and its length in bytes. - @raise IOError: raised if the length of the ID of an element is greater than 4 bytes. + :param stream: The source file-like object. + :return: The decoded element ID and its length in bytes. + :raise IOError: raised if the length of the ID of an element is greater than 4 bytes. """ ch = stream.read(1) length, eid = decodeIDLength(ord(ch)) @@ -104,11 +105,11 @@ def readElementID(stream): return eid, length -def readElementSize(stream): +def readElementSize(stream: BinaryIO) -> Tuple[Optional[int], int]: """ Read an element size from a file (or file-like stream). - @param stream: The source file-like object. - @return: The decoded size (or `None`) and the length of the + :param stream: The source file-like object. + :return: The decoded size (or `None`) and the length of the descriptor in bytes. """ ch = stream.read(1) @@ -126,12 +127,12 @@ def readElementSize(stream): return size, length -def readUInt(stream, size): +def readUInt(stream: BinaryIO, size: int) -> int: """ Read an unsigned integer from a file (or file-like stream). - @param stream: The source file-like object. - @param size: The number of bytes to read from the stream. - @return: The decoded value. + :param stream: The source file-like object. + :param size: The number of bytes to read from the stream. + :return: The decoded value. """ if size == 0: @@ -141,12 +142,12 @@ def readUInt(stream, size): return _struct_uint64_unpack_from(data.rjust(8, b'\x00'))[0] -def readInt(stream, size): +def readInt(stream: BinaryIO, size: int) -> int: """ Read a signed integer from a file (or file-like stream). 
- @param stream: The source file-like object. - @param size: The number of bytes to read from the stream. - @return: The decoded value. + :param stream: The source file-like object. + :param size: The number of bytes to read from the stream. + :return: The decoded value. """ if size == 0: @@ -160,13 +161,13 @@ def readInt(stream, size): return _struct_int64_unpack_from(data.rjust(8, pad))[0] -def readFloat(stream, size): - """ Read an floating point value from a file (or file-like stream). +def readFloat(stream: BinaryIO, size: int) -> float: + """ Read a floating point value from a file (or file-like stream). - @param stream: The source file-like object. - @param size: The number of bytes to read from the stream. - @return: The decoded value. - @raise IOError: raised if the length of this floating point number is not + :param stream: The source file-like object. + :param size: The number of bytes to read from the stream. + :return: The decoded value. + :raise IOError: raised if the length of this floating point number is not valid (0, 4, 8 bytes) """ if size == 4: @@ -180,12 +181,12 @@ def readFloat(stream, size): "only lengths of 0, 4, or 8 bytes supported." % size) -def readString(stream, size): +def readString(stream: BinaryIO, size: int) -> str: """ Read an ASCII string from a file (or file-like stream). - @param stream: The source file-like object. - @param size: The number of bytes to read from the stream. - @return: The decoded value. + :param stream: The source file-like object. + :param size: The number of bytes to read from the stream. + :return: The decoded value. """ if size == 0: return u'' @@ -200,12 +201,12 @@ def readString(stream, size): return str(value, 'ascii', 'replace') -def readUnicode(stream, size): - """ Read an UTF-8 encoded string from a file (or file-like stream). +def readUnicode(stream: BinaryIO, size: int) -> str: + """ Read a UTF-8 encoded string from a file (or file-like stream). - @param stream: The source file-like object. 
- @param size: The number of bytes to read from the stream. - @return: The decoded value. + :param stream: The source file-like object. + :param size: The number of bytes to read from the stream. + :return: The decoded value. """ if size == 0: @@ -216,14 +217,14 @@ def readUnicode(stream, size): return str(data, 'utf_8') -def readDate(stream, size=8): +def readDate(stream: BinaryIO, size: int = 8) -> datetime: """ Read an EBML encoded date (nanoseconds since UTC 2001-01-01T00:00:00) from a file (or file-like stream). - @param stream: The source file-like object. - @param size: The number of bytes to read from the stream. - @return: The decoded value (as `datetime.datetime`). - @raise IOError: raised if the length of the date is not 8 bytes. + :param stream: The source file-like object. + :param size: The number of bytes to read from the stream. + :return: The decoded value (as `datetime.datetime`). + :raise IOError: raised if the length of the date is not 8 bytes. """ if size != 8: raise IOError("Cannot read date value of length %d, only 8." % size) diff --git a/ebmlite/encoding.py b/ebmlite/encoding.py index ebe367a..f7b9e53 100644 --- a/ebmlite/encoding.py +++ b/ebmlite/encoding.py @@ -14,6 +14,7 @@ import datetime import struct import sys +from typing import AnyStr, Optional import warnings from .decoding import _struct_uint64, _struct_int64 @@ -45,11 +46,11 @@ # ============================================================================== -def getLength(val): +def getLength(val: int) -> int: """ Calculate the encoded length of a value. - @param val: A value to be encoded, generally either an ID or a size for + :param val: A value to be encoded, generally either an ID or a size for an EBML element - @return The minimum length, in bytes, that can be used to represent val + :return: The minimum length, in bytes, that can be used to represent val """ # Brute force it. Ugly but faster than calculating it. 
if val <= 126: @@ -70,15 +71,15 @@ def getLength(val): return 8 -def encodeSize(val, length=None): +def encodeSize(val: Optional[int], length: Optional[int] = None) -> bytes: """ Encode an element size. - @param val: The size to encode. If `None`, the EBML 'unknown' size + :param val: The size to encode. If `None`, the EBML 'unknown' size will be returned (1 or `length` bytes, all bits 1). - @keyword length: An explicit length for the encoded size. If `None`, + :param length: An explicit length for the encoded size. If `None`, the size will be encoded at the minimum length required. - @return: an encoded size for an EBML element. - @raise ValueError: raised if the length is invalid, or the length cannot + :return: an encoded size for an EBML element. + :raise ValueError: raised if the length is invalid, or the length cannot be encoded. """ if val is None: @@ -98,16 +99,16 @@ def encodeSize(val, length=None): # --- Encoding # ============================================================================== -def encodeId(eid, length=None): +def encodeId(eid: int, length: Optional[int] = None) -> bytes: """ Encode an element ID. - @param eid: The EBML ID to encode. - @keyword length: An explicit length for the encoded data. A `ValueError` + :param eid: The EBML ID to encode. + :param length: An explicit length for the encoded data. A `ValueError` will be raised if the length is too short to encode the value. - @return: The binary representation of ID, left-padded with ``0x00`` if + :return: The binary representation of ID, left-padded with ``0x00`` if `length` is not `None`. - @return: The encoded version of the ID. - @raise ValueError: raised if length is less than one or more than 4. + :return: The encoded version of the ID. + :raise ValueError: raised if length is less than one or more than 4. 
""" if length is not None: if length < 1 or length > 4: @@ -119,15 +120,15 @@ def encodeId(eid, length=None): raise TypeError('Cannot encode {} {!r} as ID'.format(type(eid).__name__, eid)) -def encodeUInt(val, length=None): +def encodeUInt(val: int, length: Optional[int] = None) -> bytes: """ Encode an unsigned integer. - @param val: The unsigned integer value to encode. - @keyword length: An explicit length for the encoded data. A `ValueError` + :param val: The unsigned integer value to encode. + :param length: An explicit length for the encoded data. A `ValueError` will be raised if the length is too short to encode the value. - @return: The binary representation of val as an unsigned integer, + :return: The binary representation of val as an unsigned integer, left-padded with ``0x00`` if `length` is not `None`. - @raise ValueError: raised if val is longer than length. + :raise ValueError: raised if val is longer than length. """ if isinstance(val, float): fval, val = val, int(val) @@ -155,16 +156,16 @@ def encodeUInt(val, length=None): return packed.rjust(length, pad) -def encodeInt(val, length=None): +def encodeInt(val: int, length: Optional[int] = None) -> bytes: """ Encode a signed integer. - @param val: The signed integer value to encode. - @keyword length: An explicit length for the encoded data. A `ValueError` + :param val: The signed integer value to encode. + :param length: An explicit length for the encoded data. A `ValueError` will be raised if the length is too short to encode the value. - @return: The binary representation of val as a signed integer, + :return: The binary representation of val as a signed integer, left-padded with either ```0x00`` (for positive values) or ``0xFF`` (for negative) if `length` is not `None`. - @raise ValueError: raised if val is longer than length. + :raise ValueError: raised if val is longer than length. 
""" if isinstance(val, float): fval, val = val, int(val) @@ -194,15 +195,15 @@ def encodeInt(val, length=None): raise TypeError('Cannot encode {} {!r} as integer'.format(type(val).__name__, val)) -def encodeFloat(val, length=None): +def encodeFloat(val: float, length: Optional[int] = None) -> bytes: """ Encode a floating point value. - @param val: The floating point value to encode. - @keyword length: An explicit length for the encoded data. Must be + :param val: The floating point value to encode. + :param length: An explicit length for the encoded data. Must be `None`, 0, 4, or 8; otherwise, a `ValueError` will be raised. - @return: The binary representation of val as a float, left-padded with + :return: The binary representation of val as a float, left-padded with ``0x00`` if `length` is not `None`. - @raise ValueError: raised if val not length 0, 4, or 8 + :raise ValueError: raised if val not length 0, 4, or 8 """ if length is None: if val is None or val == 0.0: @@ -224,16 +225,16 @@ def encodeFloat(val, length=None): raise TypeError('Cannot encode {} {!r} as float'.format(type(val).__name__, val)) -def encodeBinary(val, length=None): +def encodeBinary(val: AnyStr, length: Optional[int] = None) -> bytes: """ Encode binary data. - @param val: A string, bytes, or bytearray containing the data to encode. - @keyword length: An explicit length for the encoded data. A + :param val: A string, bytes, or bytearray containing the data to encode. + :param length: An explicit length for the encoded data. A `ValueError` will be raised if `length` is shorter than the actual length of the binary data. - @return: The binary representation of value as binary data, left-padded + :return: The binary representation of value as binary data, left-padded with ``0x00`` if `length` is not `None`. - @raise ValueError: raised if val is longer than length. + :raise ValueError: raised if val is longer than length. 
""" if val is None: val = b'' @@ -251,13 +252,13 @@ def encodeBinary(val, length=None): (len(val), length)) -def encodeString(val, length=None): +def encodeString(val: AnyStr, length: Optional[int] = None) -> bytes: """ Encode an ASCII string. - @param val: The string (or bytearray) to encode. - @keyword length: An explicit length for the encoded data. The result + :param val: The string (or bytearray) to encode. + :param length: An explicit length for the encoded data. The result will be truncated if the original string is longer. - @return: The binary representation of val as a string, truncated or + :return: The binary representation of val as a string, truncated or left-padded with ``0x00`` if `length` is not `None`. """ if isinstance(val, str): @@ -271,13 +272,13 @@ def encodeString(val, length=None): return encodeBinary(val.translate(STRING_CHARACTERS), length) -def encodeUnicode(val, length=None): +def encodeUnicode(val: str, length: Optional[int] = None) -> bytes: """ Encode a Unicode string. - @param val: The Unicode string to encode. - @keyword length: An explicit length for the encoded data. The result + :param val: The Unicode string to encode. + :param length: An explicit length for the encoded data. The result will be truncated if the original string is longer. - @return: The binary representation of val as a string, truncated or + :return: The binary representation of val as a string, truncated or left-padded with ``0x00`` if `length` is not `None`. """ if not isinstance(val, (bytearray, bytes, str)): @@ -291,15 +292,15 @@ def encodeUnicode(val, length=None): return encodeBinary(val, length) -def encodeDate(val, length=None): +def encodeDate(val: datetime.datetime, length: Optional[int] = None) -> bytes: """ Encode a `datetime` object as an EBML date (i.e. nanoseconds since 2001-01-01T00:00:00). - @param val: The `datetime.datetime` object value to encode. - @keyword length: An explicit length for the encoded data. 
Must be + :param val: The `datetime.datetime` object value to encode. + :param length: An explicit length for the encoded data. Must be `None` or 8; otherwise, a `ValueError` will be raised. - @return: The binary representation of val as an 8-byte dateTime. - @raise ValueError: raised if the length of the input is not 8 bytes. + :return: The binary representation of val as an 8-byte dateTime. + :raise ValueError: raised if the length of the input is not 8 bytes. """ if length is None: length = 8 diff --git a/ebmlite/threaded_file.py b/ebmlite/threaded_file.py index 0fae09d..3fa0fe6 100644 --- a/ebmlite/threaded_file.py +++ b/ebmlite/threaded_file.py @@ -1,4 +1,4 @@ -''' +""" A special-case, drop-in 'replacement' for a standard read-only file stream that supports simultaneous access by multiple threads without (explicit) blocking. Each thread actually gets its own stream, so it can perform its @@ -6,7 +6,7 @@ functionality is transparent. @author: dstokes -''' +""" __author__ = "David Randall Stokes, Connor Flanigan" __copyright__ = "Copyright 2021, Mide Technology Corporation" __credits__ = "David Randall Stokes, Connor Flanigan, Becker Awqatty, Derek Witt" @@ -16,6 +16,8 @@ import io import platform from threading import currentThread, Event +from typing import BinaryIO, TextIO, Union + class ThreadAwareFile(io.FileIO): """ A 'replacement' for a standard read-only file stream that supports @@ -28,7 +30,7 @@ class ThreadAwareFile(io.FileIO): the standard attributes and properties. Most of these affect only the current thread. - @var timeout: A value (in seconds) for blocking operations to wait. + :var timeout: A value (in seconds) for blocking operations to wait. Very few operations block; specifically, only those that do (or depend upon) internal housekeeping. Timeout should only occur in certain extreme conditions (e.g. 
filesystem-related file @@ -71,7 +73,7 @@ def __init__(self, *args, **kwargs): self._mode = mode - def __repr__(self): + def __repr__(self) -> str: # Format the object's ID appropriately for the architecture (32b/64b) if '32' in platform.architecture()[0]: fmt = "<%s %s %r, mode %r at 0x%08X>" @@ -86,7 +88,7 @@ def __repr__(self): @classmethod - def makeThreadAware(cls, fileStream): + def makeThreadAware(cls, fileStream: Union[TextIO, BinaryIO]) -> "ThreadAwareFile": """ Create a new `ThreadAwareFile` from an already-open file. If the object is a `ThreadAwareFile`, it is returned verbatim. """ @@ -100,7 +102,7 @@ def makeThreadAware(cls, fileStream): return f - def getThreadStream(self): + def getThreadStream(self) -> Union[TextIO, BinaryIO]: """ Get (or create) the file stream for the current thread. """ self._ready.wait(self.timeout) @@ -143,7 +145,7 @@ def cleanup(self): @property - def closed(self): + def closed(self) -> bool: """ Is the file not open? Note: A thread that never accessed the file will get `True`. """ @@ -153,56 +155,50 @@ def closed(self): return True - def close(self, *args, **kwargs): + def close(self): """ Close the file for the current thread. The file will remain open for other threads. 
""" - result = self.getThreadStream().close(*args, **kwargs) + result = self.getThreadStream().close() self.cleanup() return result # Standard file methods, overridden - def __format__(self, *args, **kwargs): - return self.getThreadStream().__format__(*args, **kwargs) - - def __hash__(self, *args, **kwargs): - return self.getThreadStream().__hash__(*args, **kwargs) + def __format__(self, *args): + return self.getThreadStream().__format__(*args) - def __iter__(self, *args, **kwargs): - return self.getThreadStream().__iter__(*args, **kwargs) + def __hash__(self): + return self.getThreadStream().__hash__() - def __reduce__(self, *args, **kwargs): - return self.getThreadStream().__reduce__(*args, **kwargs) + def __iter__(self): + return self.getThreadStream().__iter__() - def __reduce_ex__(self, *args, **kwargs): - return self.getThreadStream().__reduce_ex__(*args, **kwargs) + def __reduce__(self): + return self.getThreadStream().__reduce__() - def __sizeof__(self, *args, **kwargs): - return self.getThreadStream().__sizeof__(*args, **kwargs) + def __reduce_ex__(self, *args): + return self.getThreadStream().__reduce_ex__(*args) - def __str__(self, *args, **kwargs): - return self.getThreadStream().__str__(*args, **kwargs) + def __sizeof__(self): + return self.getThreadStream().__sizeof__() - def fileno(self, *args, **kwargs): - return self.getThreadStream().fileno(*args, **kwargs) + def __str__(self): + return self.getThreadStream().__str__() - def flush(self, *args, **kwargs): - return self.getThreadStream().flush(*args, **kwargs) + def fileno(self): + return self.getThreadStream().fileno() - def isatty(self, *args, **kwargs): - return self.getThreadStream().isatty(*args, **kwargs) + def flush(self): + return self.getThreadStream().flush() - def next(self, *args, **kwargs): - return self.getThreadStream().next(*args, **kwargs) + def isatty(self): + return self.getThreadStream().isatty() def read(self, *args, **kwargs): return self.getThreadStream().read(*args, **kwargs) 
- def readinto(self, *args, **kwargs): - return self.getThreadStream().readinto(*args, **kwargs) - def readline(self, *args, **kwargs): return self.getThreadStream().readline(*args, **kwargs) @@ -212,8 +208,8 @@ def readlines(self, *args, **kwargs): def seek(self, *args, **kwargs): return self.getThreadStream().seek(*args, **kwargs) - def tell(self, *args, **kwargs): - return self.getThreadStream().tell(*args, **kwargs) + def tell(self): + return self.getThreadStream().tell() def truncate(self, *args, **kwargs): raise IOError("Can't truncate(); %s is read-only" % @@ -227,11 +223,8 @@ def writelines(self, *args, **kwargs): raise IOError("Can't writelines(); %s is read-only" % self.__class__.__name__) - def xreadlines(self, *args, **kwargs): - return self.getThreadStream().xreadlines(*args, **kwargs) - def __enter__(self, *args, **kwargs): - return self.getThreadStream().__enter__(*args, **kwargs) + return self.getThreadStream().__enter__() def __exit__(self, *args, **kwargs): return self.getThreadStream().__exit__(*args, **kwargs) @@ -259,11 +252,3 @@ def name(self): @property def newlines(self): return self.getThreadStream().newlines - - @property - def softspace(self): - return self.getThreadStream().softspace - - @softspace.setter - def softspace(self, val): - self.getThreadStream().softspace = val diff --git a/ebmlite/tools/list_schemata.py b/ebmlite/tools/list_schemata.py index 3aef585..be39c5b 100644 --- a/ebmlite/tools/list_schemata.py +++ b/ebmlite/tools/list_schemata.py @@ -34,4 +34,3 @@ def main(): if __name__ == "__main__": main() - diff --git a/ebmlite/tools/utils.py b/ebmlite/tools/utils.py index 136412a..7169429 100644 --- a/ebmlite/tools/utils.py +++ b/ebmlite/tools/utils.py @@ -17,6 +17,7 @@ def load_files(args, binary_output=False): sys.stderr.write("Input file does not exist: %s\n" % args.input) exit(1) + schema = None try: schema_file = args.schema if os.path.splitext(schema_file.strip())[1] == '': @@ -26,11 +27,11 @@ def load_files(args, 
binary_output=False): errPrint("Error loading schema: %s\n" % err) if not args.output: - yield (schema, sys.stdout) + yield schema, sys.stdout return output = os.path.realpath(os.path.expanduser(args.output)) if os.path.exists(output) and not args.clobber: errPrint("Error: Output file already exists: %s" % args.output) with open(output, ('wb' if binary_output else 'w')) as out: - yield (schema, out) + yield schema, out diff --git a/ebmlite/util.py b/ebmlite/util.py index b8fecd5..f568653 100644 --- a/ebmlite/util.py +++ b/ebmlite/util.py @@ -4,10 +4,10 @@ Created on Aug 11, 2017 -@todo: Clean up and standardize usage of the term 'size' versus 'length.' -@todo: Modify (or create an alternate version of) `toXml()` that writes +:todo: Clean up and standardize usage of the term 'size' versus 'length.' +:todo: Modify (or create an alternate version of) `toXml()` that writes directly to a file, allowing the conversion of huge EBML files. -@todo: Add other options to command-line utility for the other arguments of +:todo: Add other options to command-line utility for the other arguments of `toXml()` and `xml2ebml()`. """ __author__ = "David Randall Stokes, Connor Flanigan" @@ -18,12 +18,12 @@ 'printSchemata', 'flatiter'] import ast -from base64 import b64encode, b64decode -from io import StringIO +from io import BytesIO import pathlib import struct import sys import tempfile +from typing import BinaryIO, Callable, IO, List, Optional, Tuple, Union from xml.etree import ElementTree as ET from . import core, encoding, decoding @@ -34,22 +34,27 @@ # ============================================================================== -def createID(schema, idClass, exclude=(), minId=0x81, maxId=0x1FFFFFFE, count=1): +def createID(schema: core.Schema, + idClass: str, + exclude: Tuple[int] = (), + minId: int = 0x81, + maxId: int = 0x1FFFFFFE, + count: int = 1) -> List[int]: """ Generate unique EBML IDs. Primarily intended for use 'offline' by humans creating EBML schemata. 
- @param schema: The `Schema` in which the new IDs must coexist. - @param idClass: The EBML class of ID, one of (case-insensitive): + :param schema: The `Schema` in which the new IDs must coexist. + :param idClass: The EBML class of ID, one of (case-insensitive): * `'a'`: Class A (1 octet, base 0x8X) * `'b'`: Class B (2 octets, base 0x4000) * `'c'`: Class C (3 octets, base 0x200000) * `'d'`: Class D (4 octets, base 0x10000000) - @param exclude: A list of additional IDs to avoid. - @param minId: The minimum ID value, within the ID class' range. - @param maxId: The maximum ID value, within the ID class' range. - @param count: The maximum number of IDs to generate. The result may be + :param exclude: A list of additional IDs to avoid. + :param minId: The minimum ID value, within the ID class' range. + :param maxId: The maximum ID value, within the ID class' range. + :param count: The maximum number of IDs to generate. The result may be fewer than specified if too few meet the given criteria. - @return: A list of EBML IDs that match the given criteria. + :return: A list of EBML IDs that match the given criteria. """ ranges = dict(A=(0x81, 0xFE), B=(0x407F, 0x7FFE), @@ -75,7 +80,7 @@ def createID(schema, idClass, exclude=(), minId=0x81, maxId=0x1FFFFFFE, count=1) return result -def validateID(elementId): +def validateID(elementId: int) -> bool: """ Verify that a number is a valid EBML element ID. A `ValueError` will be raised if the element ID is invalid. @@ -85,8 +90,8 @@ def validateID(elementId): * C: 0x203FFF to 0x3FFFFE * D: 0x101FFFFF to 0x1FFFFFFE - @param elementId: The element ID to validate - @raises: `ValueError`, although certain edge cases may raise + :param elementId: The element ID to validate + :raises: `ValueError`, although certain edge cases may raise another type. 
""" ranges = ((0x81, 0xFE), (0x407F, 0x7FFE), (0x203FFF, 0x3FFFFE), (0x101FFFFF, 0x1FFFFFFE)) @@ -123,30 +128,36 @@ def validateID(elementId): # ============================================================================== -def toXml(el, parent=None, offsets=True, sizes=True, types=True, ids=True, - binary_codec='base64', void_codec='ignore'): +def toXml(el: core.Element, + parent=None, + offsets: bool = True, + sizes: bool = True, + types: bool = True, + ids: bool = True, + binary_codec: Union[Callable, str] = 'base64', + void_codec: Union[Callable, str] = 'ignore'): """ Convert an EBML Document to XML. Binary elements will contain base64-encoded data in their body. Other non-master elements will contain their value in a ``value`` attribute. - @param el: An instance of an EBML Element or Document subclass. - @keyword parent: The resulting XML element's parent element, if any. - @keyword offsets: If `True`, create a ``offset`` attributes for each + :param el: An instance of an EBML Element or Document subclass. + :param parent: The resulting XML element's parent element, if any. + :param offsets: If `True`, create a ``offset`` attributes for each generated XML element, containing the corresponding EBML element's offset. - @keyword sizes: If `True`, create ``size`` attributes containing the + :param sizes: If `True`, create ``size`` attributes containing the corresponding EBML element's size. - @keyword types: If `True`, create ``type`` attributes containing the + :param types: If `True`, create ``type`` attributes containing the name of the corresponding EBML element type. - @keyword ids: If `True`, create ``id`` attributes containing the + :param ids: If `True`, create ``id`` attributes containing the corresponding EBML element's EBML ID. - @keyword binary_codec: The name of an XML codec class from + :param binary_codec: The name of an XML codec class from `ebmlite.xml_codecs`, or an instance of a codec, for rendering binary elements as text. 
- @keyword void_codec: The name of an XML codec class from + :param void_codec: The name of an XML codec class from `ebmlite.xml_codecs`, or an instance of a codec, for rendering the contents of Void elements as text. - @return The root XML element of the file. + :return: The root XML element of the file. """ if isinstance(binary_codec, str): binary_codec = xml_codecs.BINARY_CODECS[binary_codec]() @@ -194,26 +205,30 @@ def toXml(el, parent=None, offsets=True, sizes=True, types=True, ids=True, return xmlEl -# ============================================================================== +# =========================================================================== # -# ============================================================================== +# =========================================================================== -def xmlElement2ebml(xmlEl, ebmlFile, schema, sizeLength=None, unknown=True): +def xmlElement2ebml(xmlEl, + ebmlFile: BinaryIO, + schema: core.Schema, + sizeLength: Optional[int] = None, + unknown: bool = True): """ Convert an XML element to EBML, recursing if necessary. For converting an entire XML document, use `xml2ebml()`. - @param xmlEl: The XML element. Its tag must match an element defined + :param xmlEl: The XML element. Its tag must match an element defined in the `schema`. - @param ebmlFile: An open file-like stream, to which the EBML data will + :param ebmlFile: An open file-like stream, to which the EBML data will be written. - @param schema: An `ebmlite.core.Schema` instance to use when + :param schema: An `ebmlite.core.Schema` instance to use when writing the EBML document. - @keyword sizeLength: - @param unknown: If `True`, unknown element names will be allowed, + :param sizeLength: + :param unknown: If `True`, unknown element names will be allowed, provided their XML elements include an ``id`` attribute with the EBML ID (in hexadecimal). - @return The length of the encoded element, including header and children. 
- @raise NameError: raised if an xml element is not present in the schema and unknown is False, OR if the xml + :return: The length of the encoded element, including header and children. + :raise NameError: raised if an XML element is not present in the schema and unknown is False, OR if the xml element does not have an ID. """ if not isinstance(xmlEl.tag, (str, bytes, bytearray)): @@ -284,30 +299,34 @@ def xmlElement2ebml(xmlEl, ebmlFile, schema, sizeLength=None, unknown=True): return len(encoded) -def xml2ebml(xmlFile, ebmlFile, schema, sizeLength=None, headers=True, - unknown=True): +def xml2ebml(xmlFile, + ebmlFile: BinaryIO, + schema: Union[str, core.Schema], + sizeLength: Optional[int] = None, + headers: bool = True, + unknown: bool = True): """ Convert an XML file to EBML. - @todo: Convert XML on the fly, rather than parsing it first, allowing + :todo: Convert XML on the fly, rather than parsing it first, allowing for the conversion of arbitrarily huge files. - @param xmlFile: The XML source. Can be a filename, an open file-like + :param xmlFile: The XML source. Can be a filename, an open file-like stream, or a parsed XML document. - @param ebmlFile: The EBML file to write. Can be a filename or an open + :param ebmlFile: The EBML file to write. Can be a filename or an open file-like stream. - @param schema: The EBML schema to use. Can be a filename or an + :param schema: The EBML schema to use. Can be a filename or an instance of a `Schema`. - @keyword sizeLength: The default length of each element's size + :param sizeLength: The default length of each element's size descriptor. Must be large enough to store the largest 'master' element. If an XML element has a ``sizeLength`` attribute, it will override this. - @keyword headers: If `True`, generate the standard ``EBML`` EBML + :param headers: If `True`, generate the standard ``EBML`` EBML element if the XML document does not contain one. 
- @param unknown: If `True`, unknown element names will be allowed, + :param unknown: If `True`, unknown element names will be allowed, provided their XML elements include an ``id`` attribute with the EBML ID (in hexadecimal). - @return: the size of the ebml file in bytes. - @raise NameError: raises if an xml element is not present in the schema. + :return: the size of the ebml file in bytes. + :raise NameError: raises if an xml element is not present in the schema. """ if isinstance(ebmlFile, (str, bytes, bytearray)): ebmlFile = open(ebmlFile, 'wb') @@ -354,25 +373,27 @@ def xml2ebml(xmlFile, ebmlFile, schema, sizeLength=None, headers=True, return numBytes -#=============================================================================== +# =========================================================================== # -#=============================================================================== +# =========================================================================== -def loadXml(xmlFile, schema, ebmlFile=None): +def loadXml(xmlFile, + schema: core.Schema, + ebmlFile: Union[BinaryIO, str, None] = None): """ Helpful utility to load an EBML document from an XML file. - @param xmlFile: The XML source. Can be a filename, an open file-like + :param xmlFile: The XML source. Can be a filename, an open file-like stream, or a parsed XML document. - @param schema: The EBML schema to use. Can be a filename or an + :param schema: The EBML schema to use. Can be a filename or an instance of a `Schema`. - @keyword ebmlFile: The name of the temporary EBML file to write, or + :param ebmlFile: The name of the temporary EBML file to write, or ``:memory:`` to use RAM (like `sqlite3`). Defaults to an automatically-generated temporary file. - @return The root node of the specified EBML file. + :return The root node of the specified EBML file. 
""" if ebmlFile == ":memory:": - ebmlFile = StringIO() + ebmlFile = BytesIO() xml2ebml(xmlFile, ebmlFile, schema) ebmlFile.seek(0) else: @@ -382,23 +403,28 @@ def loadXml(xmlFile, schema, ebmlFile=None): return schema.load(ebmlFile) -#=============================================================================== +# =========================================================================== # -#=============================================================================== - -def pprint(el, values=True, out=sys.stdout, indent=" ", binary_codec="ignore", - void_codec="ignore", _depth=0): +# =========================================================================== + +def pprint(el: core.Element, + values: bool = True, + out: IO = sys.stdout, + indent: str = " ", + binary_codec: Union[Callable, str] = "ignore", + void_codec: Union[Callable, str] = "ignore", + _depth: int = 0): """ Test function to recursively crawl an EBML document or element and print its structure, with child elements shown indented. - @param el: An instance of a `Document` or `Element` subclass. - @keyword values: If `True`, show elements' values. - @keyword out: A file-like stream to which to write. - @keyword indent: The string containing the character(s) used for each + :param el: An instance of a `Document` or `Element` subclass. + :param values: If `True`, show elements' values. + :param out: A file-like stream to which to write. + :param indent: The string containing the character(s) used for each indentation. - @keyword binary_codec: The name of a class from `ebmlite.xml_codecs`, + :param binary_codec: The name of a class from `ebmlite.xml_codecs`, or an instance of a codec, for rendering binary elements as text. - @keyword void_codec: The name of a class from `ebmlite.xml_codecs`, + :param void_codec: The name of a class from `ebmlite.xml_codecs`, or an instance of a codec, for rendering the contents of Void elements as text. 
""" @@ -444,15 +470,21 @@ def pprint(el, values=True, out=sys.stdout, indent=" ", binary_codec="ignore", out.flush() -#=============================================================================== +# =========================================================================== # -#=============================================================================== +# =========================================================================== -def printSchemata(paths=None, out=sys.stdout, absolute=True): +def printSchemata(paths: Optional[List[str]] = None, + out: Union[str, IO] = sys.stdout, + absolute: bool = True): """ Display a list of schemata in `SCHEMA_PATH`. A thin wrapper for the core `listSchemata()` function. - @param out: A file-like stream to which to write. + :param paths: A list of paths to search for schemata, in addition to + those in `SCHEMA_PATH`. + :param out: A file-like stream or filename to which to write. + :param absolute: If `True`, use absolute paths in the schema + filenames. """ out = out or sys.stdout newfile = isinstance(out, (str, pathlib.Path)) diff --git a/ebmlite/xml_codecs.py b/ebmlite/xml_codecs.py index 225bc28..62407ca 100644 --- a/ebmlite/xml_codecs.py +++ b/ebmlite/xml_codecs.py @@ -6,6 +6,7 @@ import base64 from io import BytesIO, StringIO +from typing import BinaryIO, Optional, Union # ============================================================================== @@ -30,7 +31,12 @@ def __init__(self, **kwargs): """ pass - def encode(self, data, stream=None, indent='', offset=0, **kwargs): + def encode(self, + data: bytes, + stream: Optional[BinaryIO] = None, + indent: Union[str, bytes] = '', + offset: int = 0, + **kwargs): """ Convert binary data to text. Typical arguments: :param data: The binary data from an EBML `BinaryElement`. @@ -69,7 +75,7 @@ class Base64Codec(BinaryCodec): """ NAME = "base64" - def __init__(self, cols=76, **kwargs): + def __init__(self, cols=76, **_kwargs): """ Constructor. 
:param cols: The length of each line of base64 data, excluding @@ -80,10 +86,15 @@ def __init__(self, cols=76, **kwargs): Additional keyword arguments will be accepted (to maintain compatibility with other codecs) but ignored. """ + super().__init__() self.cols = cols - def encode(self, data, stream=None, indent='', **kwargs): + def encode(self, + data: bytes, + stream: Optional[BinaryIO] = None, + indent: Union[str, bytes] = '', + **kwargs) -> Union[str, int]: """ Convert binary data to base64 text. :param data: The binary data from an EBML `BinaryElement`. @@ -176,7 +187,11 @@ class HexCodec(BinaryCodec): # The name shown in the encoded XML element's `encoding` attribute NAME = "hex" - def __init__(self, width=2, cols=32, offsets=True, **kwargs): + def __init__(self, + width: int = 2, + cols: int = 32, + offsets: bool = True, + **_kwargs): """ Constructor. :param width: The number of bytes displayed per column when @@ -187,12 +202,18 @@ def __init__(self, width=2, cols=32, offsets=True, **kwargs): :param offsets: If `True`, each line will start with its offset (in decimal). Applicable if `cols` is a non-zero number. """ + super().__init__() self.width = width self.cols = cols self.offsets = bool(offsets and cols) - def encode(self, data, stream=None, offset=0, indent='', **kwargs): + def encode(self, + data: bytes, + stream: Optional[BinaryIO] = None, + offset: int = 0, + indent='', + **kwargs) -> Union[str, int]: """ Convert binary data to hexadecimal text. :param data: The binary data from an EBML `BinaryElement`. @@ -233,7 +254,9 @@ def encode(self, data, stream=None, offset=0, indent='', **kwargs): @classmethod - def decode(cls, data, stream=None): + def decode(cls, + data: bytes, + stream: Optional[BinaryIO] = None) -> Union[bytes, int]: """ Decode binary data in hexadecimal (e.g., from an XML file). 
Note: this is a `classmethod`, and works regardles of how the encoded data was formatted (e.g., number of columns, with or without @@ -281,13 +304,17 @@ class IgnoreCodec(BinaryCodec): NAME = "ignore" @staticmethod - def encode(data, stream=None, **kwargs): + def encode(data: bytes, + stream: Optional[BinaryIO] = None, + **kwargs) -> Union[str, int]: if stream: return 0 return '' @staticmethod - def decode(data, stream=None, **kwargs): + def decode(data: bytes, + stream: Optional[BinaryIO] = None, + **kwargs) -> Union[bytes, int]: if stream: return 0 return b''