Skip to content

Commit

Permalink
Added Python files to module
Browse files Browse the repository at this point in the history
  • Loading branch information
MatrixEditor committed Apr 26, 2024
1 parent af7c4e4 commit 0e651f9
Show file tree
Hide file tree
Showing 23 changed files with 5,940 additions and 32 deletions.
60 changes: 28 additions & 32 deletions src/caterpillar/_C.pyi
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import annotations

from _typeshed import Incomplete
from typing import Any
from typing import Any, Optional, Collection, Union

DefaultOption: DefaultOptionType
FIELD_OPTIONS: set
Expand Down Expand Up @@ -152,21 +154,21 @@ class Struct(fieldatom):
def __init__(self, *args, **kwargs) -> None: ...

class UnaryExpr:
expr: Incomplete
value: Incomplete
expr: int
value: Any
def __init__(self, expr, value) -> Any: ...
def __call__(self, *args, **kwargs): ...
def __hash__(self) -> int: ...

class atom:
def __init__(self, *args, **kwargs) -> None: ...
def __init__(self) -> None: ...
def __pack__(self, obj, ctx) -> Any: ...
def __size__(self, ctx) -> Any: ...
def __type__(self) -> Any: ...
def __unpack__(self, ctx) -> Any: ...

class catom(atom):
def __init__(self, *args, **kwargs) -> None: ...
def __init__(self) -> None: ...
def __pack__(self, *args, **kwargs): ...
def __pack_many__(self, *args, **kwargs): ...
def __size__(self, *args, **kwargs): ...
Expand All @@ -175,7 +177,7 @@ class catom(atom):
def __unpack_many__(self, *args, **kwargs): ...

class fieldatom(atom):
def __init__(self, *args, **kwargs) -> None: ...
def __init__(self) -> None: ...
def __add__(self, other): ...
def __floordiv__(self, other): ...
def __getitem__(self, index): ...
Expand All @@ -191,7 +193,7 @@ class fieldatom(atom):
def __xor__(self, other): ...

class fieldcatom(catom):
def __init__(self, *args, **kwargs) -> None: ...
def __init__(self) -> None: ...
def __add__(self, other): ...
def __floordiv__(self, other): ...
def __getitem__(self, index): ...
Expand All @@ -207,33 +209,27 @@ class fieldcatom(catom):
def __xor__(self, other): ...

class fieldinfo:
excluded: Incomplete
field: Incomplete
def __init__(self, *args, **kwargs) -> None: ...

class intatom(fieldcatom):
bytes: Incomplete
little_endian: Incomplete
signed: Incomplete
def __init__(self, *args, **kwargs) -> None: ...
excluded: bool
field: Field
def __init__(self, field: Field, excluded: bool = ...) -> None: ...

class layer:
field: Incomplete
greedy: Incomplete
index: Incomplete
length: Incomplete
obj: Incomplete
parent: Incomplete
path: Incomplete
sequence: Incomplete
sequential: Incomplete
state: Incomplete
value: Incomplete
field: Field
greedy: bool
index: int
length: int
obj: Context
parent: layer
path: str
sequence: Collection
sequential: bool
state: State
value: Any
def __init__(self, *args, **kwargs) -> None: ...
def __context_getattr__(self, *args, **kwargs): ...

def pack(*args, **kwargs): ...
def pack_into(*args, **kwargs): ...
def sizeof(*args, **kwargs): ...
def typeof(*args, **kwargs): ...
def unpack(*args, **kwargs): ...
def pack(obj: Any, struct: atom, **kwargs) -> bytes: ...
def pack_into(obj: Any, struct: atom, io: Any, **kwargs): ...
def sizeof(struct: atom, **kwargs): ...
def typeof(struct: atom, **kwargs): ...
def unpack(io: Any, struct: atom, **kwargs): ...
179 changes: 179 additions & 0 deletions src/caterpillar/_common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
# Copyright (C) MatrixEditor 2023
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import itertools

from typing import List, Any, Union

from caterpillar.abc import _GreedyType, _ContextLike, _StreamType, _PrefixedType
from caterpillar.context import (
Context,
CTX_PATH,
CTX_FIELD,
CTX_POS,
CTX_INDEX,
CTX_OBJECT,
CTX_STREAM,
CTX_SEQ,
)
from caterpillar.exception import Stop, StructException, InvalidValueError
from caterpillar.options import O_ARRAY_FACTORY


class WithoutContextVar:
def __init__(self, context: _ContextLike, name, value) -> None:
self.context = context
self.old_value = context[name]
self.value = value
self.name = name
self.field = context[CTX_FIELD]

def __enter__(self) -> None:
self.context[self.name] = self.value

def __exit__(self, exc_type, exc_value, traceback) -> None:
self.context[self.name] = self.old_value
# We have to apply the right field as instance of the Field class
# might set their own value into the context.
self.context[CTX_FIELD] = self.field


def unpack_seq(context: _ContextLike, unpack_one) -> List[Any]:
"""Generic function to unpack sequenced elements.
:param stream: the input stream
:type stream: _StreamType
:param context: the current context
:type context: _ContextLike
:return: a list of unpacked elements
:rtype: List[Any]
"""
stream = context[CTX_STREAM]
field = context[CTX_FIELD]
assert field and context[CTX_SEQ]

length: Union[int, _GreedyType] = field.length(context)
base_path = context[CTX_PATH]
# Special elements '_index' and '_length' can be referenced within
# the new context. The '_pos' attribute will be adjusted automatically.
values = [] # always list (maybe add factory)
seq_context = Context(
_parent=context,
_io=stream,
_length=length,
_lst=values,
_field=field,
_obj=context.get(CTX_OBJECT),
_is_seq=False,
)
greedy = length is Ellipsis
# pylint: disable-next=unidiomatic-typecheck
prefixed = type(length) is _PrefixedType
if prefixed:
# We have to temporarily remove the array status from the parsing field
with WithoutContextVar(context, CTX_SEQ, False):
field.amount = 1
new_length = length.start.__unpack__(context)
field.amount, length = length, new_length

if not isinstance(length, int):
raise InvalidValueError(
f"Prefix struct returned non-integer: {length!r}", context
)

for i in range(length) if not greedy else itertools.count():
try:
seq_context[CTX_PATH] = f"{base_path}.{i}"
seq_context[CTX_INDEX] = i
values.append(unpack_one(seq_context))

# NOTE: we introduce this check to reduce time when unpacking
# a greedy range of elements.
if greedy and iseof(stream):
break
except Stop:
break
except Exception as exc:
if greedy:
break
raise StructException(str(exc), context) from exc

if O_ARRAY_FACTORY.value:
return O_ARRAY_FACTORY.value(values)
return values


def pack_seq(seq: List[Any], context: _ContextLike, pack_one) -> None:
"""Generic function to pack sequenced elements.
:param seq: the iterable of elements
:type seq: Iterable
:param stream: the output stream
:type stream: _StreamType
:param context: the current operation context
:type context: _ContextLike
:raises InvalidValueError: if the input object is not iterable
"""
stream = context[CTX_STREAM]
field = context[CTX_FIELD]
base_path = context[CTX_PATH]
# REVISIT: when to use field.length(context)
count = len(seq)
length = field.amount
# pylint: disable-next=unidiomatic-typecheck
if type(length) is _PrefixedType:
struct = length.start
# We have to temporatily alter the field's values,
with WithoutContextVar(context, CTX_SEQ, False):
field.amount = 1
struct.__pack__(count, context)
field.amount = length

# Special elements '_index' and '_length' can be referenced within
# the new context. The '_pos' attribute will be adjusted automatically.
seq_context = Context(
_parent=context,
_io=stream,
_length=count,
_field=field,
_obj=context.get(CTX_OBJECT),
# We have to unset the sequence status as we are going to call 'unpack_one'
_is_seq=False,
)
for i, elem in enumerate(seq):
# The path will contain an additional hint on what element is processed
# at the moment.
try:
seq_context[CTX_INDEX] = i
seq_context[CTX_PATH] = f"{base_path}.{i}"
seq_context[CTX_OBJECT] = elem
pack_one(elem, seq_context)
except Stop:
break
except Exception as exc:
raise StructException(str(exc), seq_context) from exc


def iseof(stream: _StreamType) -> bool:
"""
Check if the stream is at the end of the file.
:param _StreamType stream: The input stream.
:return: True if the stream is at the end of the file, False otherwise.
:rtype: bool
"""
pos = stream.tell()
eof = not stream.read(1)
stream.seek(pos)
return eof
Loading

0 comments on commit 0e651f9

Please sign in to comment.