Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for RELATIVE-OID #196

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ Revision 0.5.0, released XX-03-2020
`StreamingDecoder` class. Previously published API is implemented
as a thin wrapper on top of that ensuring backward compatibility.

- Added support for previously missing `RELATIVE-OID` construct.

Revision 0.4.9, released XX-03-2020
-----------------------------------

Expand Down
1 change: 1 addition & 0 deletions THANKS.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ Alex Gaynor
Geoffrey Thomas
Daniel Bratell
Kim Gräsman
Russ Housley
51 changes: 51 additions & 0 deletions pyasn1/codec/ber/decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,56 @@ def valueDecoder(self, substrate, asn1Spec,
yield self._createComponent(asn1Spec, tagSet, oid, **options)


class RelativeOIDPayloadDecoder(AbstractSimplePayloadDecoder):
protoComponent = univ.RelativeOID(())

def valueDecoder(self, substrate, asn1Spec,
tagSet=None, length=None, state=None,
decodeFun=None, substrateFun=None,
**options):
if tagSet[0].tagFormat != tag.tagFormatSimple:
raise error.PyAsn1Error('Simple tag format expected')

for chunk in readFromStream(substrate, length, options):
if isinstance(chunk, SubstrateUnderrunError):
yield chunk

if not chunk:
raise error.PyAsn1Error('Empty substrate')

chunk = octs2ints(chunk)

reloid = ()
index = 0
substrateLen = len(chunk)
while index < substrateLen:
subId = chunk[index]
index += 1
if subId < 128:
reloid += (subId,)
elif subId > 128:
# Construct subid from a number of octets
nextSubId = subId
subId = 0
while nextSubId >= 128:
subId = (subId << 7) + (nextSubId & 0x7F)
if index >= substrateLen:
raise error.SubstrateUnderrunError(
'Short substrate for sub-OID past %s' % (reloid,)
)
nextSubId = chunk[index]
index += 1
reloid += ((subId << 7) + nextSubId,)
elif subId == 128:
# ASN.1 spec forbids leading zeros (0x80) in OID
# encoding, tolerating it opens a vulnerability. See
# https://www.esat.kuleuven.be/cosic/publications/article-1432.pdf
# page 7
raise error.PyAsn1Error('Invalid octet 0x80 in RELATIVE-OID encoding')

yield self._createComponent(asn1Spec, tagSet, reloid, **options)


russhousley marked this conversation as resolved.
Show resolved Hide resolved
class RealPayloadDecoder(AbstractSimplePayloadDecoder):
protoComponent = univ.Real()

Expand Down Expand Up @@ -1417,6 +1467,7 @@ class UTCTimePayloadDecoder(OctetStringPayloadDecoder):
univ.OctetString.tagSet: OctetStringPayloadDecoder(),
univ.Null.tagSet: NullPayloadDecoder(),
univ.ObjectIdentifier.tagSet: ObjectIdentifierPayloadDecoder(),
univ.RelativeOID.tagSet: RelativeOIDPayloadDecoder(),
univ.Enumerated.tagSet: IntegerPayloadDecoder(),
univ.Real.tagSet: RealPayloadDecoder(),
univ.Sequence.tagSet: SequenceOrSequenceOfPayloadDecoder(), # conflicts with SequenceOf
Expand Down
35 changes: 35 additions & 0 deletions pyasn1/codec/ber/encoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,39 @@ def encodeValue(self, value, asn1Spec, encodeFun, **options):
return octets, False, False


class RelativeOIDEncoder(AbstractItemEncoder):
supportIndefLenMode = False

def encodeValue(self, value, asn1Spec, encodeFun, **options):
if asn1Spec is not None:
value = asn1Spec.clone(value)

octets = ()

# Cycle through subIds
for subOid in value.asTuple():
if 0 <= subOid <= 127:
# Optimize for the common case
octets += (subOid,)

elif subOid > 127:
# Pack large Sub-Object IDs
res = (subOid & 0x7f,)
subOid >>= 7

while subOid:
res = (0x80 | (subOid & 0x7f),) + res
subOid >>= 7

# Add packed Sub-Object ID to resulted RELATIVE-OID
octets += res

else:
raise error.PyAsn1Error('Negative RELATIVE-OID arc %s at %s' % (subOid, value))

return octets, False, False


class RealEncoder(AbstractItemEncoder):
supportIndefLenMode = 0
binEncBase = 2 # set to None to choose encoding base automatically
Expand Down Expand Up @@ -714,6 +747,7 @@ def encodeValue(self, value, asn1Spec, encodeFun, **options):
univ.OctetString.tagSet: OctetStringEncoder(),
univ.Null.tagSet: NullEncoder(),
univ.ObjectIdentifier.tagSet: ObjectIdentifierEncoder(),
univ.RelativeOID.tagSet: RelativeOIDEncoder(),
univ.Enumerated.tagSet: IntegerEncoder(),
univ.Real.tagSet: RealEncoder(),
# Sequence & Set have same tags as SequenceOf & SetOf
Expand Down Expand Up @@ -746,6 +780,7 @@ def encodeValue(self, value, asn1Spec, encodeFun, **options):
univ.OctetString.typeId: OctetStringEncoder(),
univ.Null.typeId: NullEncoder(),
univ.ObjectIdentifier.typeId: ObjectIdentifierEncoder(),
univ.RelativeOID.typeId: RelativeOIDEncoder(),
univ.Enumerated.typeId: IntegerEncoder(),
univ.Real.typeId: RealEncoder(),
# Sequence & Set have same tags as SequenceOf & SetOf
Expand Down
2 changes: 2 additions & 0 deletions pyasn1/codec/native/decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ def __call__(self, pyObject, asn1Spec, decodeFun=None, **options):
univ.OctetString.tagSet: AbstractScalarPayloadDecoder(),
univ.Null.tagSet: AbstractScalarPayloadDecoder(),
univ.ObjectIdentifier.tagSet: AbstractScalarPayloadDecoder(),
univ.RelativeOID.tagSet: AbstractScalarPayloadDecoder(),
univ.Enumerated.tagSet: AbstractScalarPayloadDecoder(),
univ.Real.tagSet: AbstractScalarPayloadDecoder(),
univ.Sequence.tagSet: SequenceOrSetPayloadDecoder(), # conflicts with SequenceOf
Expand Down Expand Up @@ -102,6 +103,7 @@ def __call__(self, pyObject, asn1Spec, decodeFun=None, **options):
univ.OctetString.typeId: AbstractScalarPayloadDecoder(),
univ.Null.typeId: AbstractScalarPayloadDecoder(),
univ.ObjectIdentifier.typeId: AbstractScalarPayloadDecoder(),
univ.RelativeOID.typeId: AbstractScalarPayloadDecoder(),
univ.Enumerated.typeId: AbstractScalarPayloadDecoder(),
univ.Real.typeId: AbstractScalarPayloadDecoder(),
# ambiguous base types
Expand Down
7 changes: 7 additions & 0 deletions pyasn1/codec/native/encoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ def encode(self, value, encodeFun, **options):
return str(value)


class RelativeOIDEncoder(AbstractItemEncoder):
def encode(self, value, encodeFun, **options):
return str(value)


class RealEncoder(AbstractItemEncoder):
def encode(self, value, encodeFun, **options):
return float(value)
Expand Down Expand Up @@ -110,6 +115,7 @@ def encode(self, value, encodeFun, **options):
univ.OctetString.tagSet: OctetStringEncoder(),
univ.Null.tagSet: NullEncoder(),
univ.ObjectIdentifier.tagSet: ObjectIdentifierEncoder(),
univ.RelativeOID.tagSet: RelativeOIDEncoder(),
univ.Enumerated.tagSet: IntegerEncoder(),
univ.Real.tagSet: RealEncoder(),
# Sequence & Set have same tags as SequenceOf & SetOf
Expand Down Expand Up @@ -143,6 +149,7 @@ def encode(self, value, encodeFun, **options):
univ.OctetString.typeId: OctetStringEncoder(),
univ.Null.typeId: NullEncoder(),
univ.ObjectIdentifier.typeId: ObjectIdentifierEncoder(),
univ.RelativeOID.typeId: RelativeOIDEncoder(),
univ.Enumerated.typeId: IntegerEncoder(),
univ.Real.typeId: RealEncoder(),
# Sequence & Set have same tags as SequenceOf & SetOf
Expand Down
138 changes: 138 additions & 0 deletions pyasn1/type/univ.py
Original file line number Diff line number Diff line change
Expand Up @@ -1244,6 +1244,144 @@ def prettyOut(self, value):
return '.'.join([str(x) for x in value])


class RelativeOID(base.SimpleAsn1Type):
"""Create |ASN.1| schema or value object.

|ASN.1| class is based on :class:`~pyasn1.type.base.SimpleAsn1Type`, its
objects are immutable and duck-type Python :class:`tuple` objects
(tuple of non-negative integers).

Keyword Args
------------
value: :class:`tuple`, :class:`str` or |ASN.1| object
Python sequence of :class:`int` or :class:`str` literal or |ASN.1| object.
If `value` is not given, schema object will be created.

tagSet: :py:class:`~pyasn1.type.tag.TagSet`
Object representing non-default ASN.1 tag(s)

subtypeSpec: :py:class:`~pyasn1.type.constraint.ConstraintsIntersection`
Object representing non-default ASN.1 subtype constraint(s). Constraints
verification for |ASN.1| type occurs automatically on object
instantiation.

Raises
------
~pyasn1.error.ValueConstraintError, ~pyasn1.error.PyAsn1Error
On constraint violation or bad initializer.

Examples
--------
.. code-block:: python

class RelOID(RelativeOID):
'''
ASN.1 specification:

id-pad-null RELATIVE-OID ::= { 0 }
id-pad-once RELATIVE-OID ::= { 5 6 }
id-pad-twice RELATIVE-OID ::= { 5 6 7 }
'''
id_pad_null = RelOID('0')
id_pad_once = RelOID('5.6')
id_pad_twice = id_pad_once + (7,)
"""
#: Set (on class, not on instance) or return a
#: :py:class:`~pyasn1.type.tag.TagSet` object representing ASN.1 tag(s)
#: associated with |ASN.1| type.
tagSet = tag.initTagSet(
tag.Tag(tag.tagClassUniversal, tag.tagFormatSimple, 0x0d)
)

#: Set (on class, not on instance) or return a
#: :py:class:`~pyasn1.type.constraint.ConstraintsIntersection` object
#: imposing constraints on |ASN.1| type initialization values.
subtypeSpec = constraint.ConstraintsIntersection()

# Optimization for faster codec lookup
typeId = base.SimpleAsn1Type.getTypeId()

def __add__(self, other):
return self.clone(self._value + other)

def __radd__(self, other):
return self.clone(other + self._value)

def asTuple(self):
return self._value

# Sequence object protocol

def __len__(self):
return len(self._value)

def __getitem__(self, i):
if i.__class__ is slice:
return self.clone(self._value[i])
else:
return self._value[i]

def __iter__(self):
return iter(self._value)

def __contains__(self, value):
return value in self._value

def index(self, suboid):
return self._value.index(suboid)

def isPrefixOf(self, other):
"""Indicate if this |ASN.1| object is a prefix of other |ASN.1| object.

Parameters
----------
other: |ASN.1| object
|ASN.1| object

Returns
-------
: :class:`bool`
:obj:`True` if this |ASN.1| object is a parent (e.g. prefix) of the other |ASN.1| object
or :obj:`False` otherwise.
"""
l = len(self)
if l <= len(other):
if self._value[:l] == other[:l]:
return True
return False

def prettyIn(self, value):
if isinstance(value, RelativeOID):
return tuple(value)
elif octets.isStringType(value):
if '-' in value:
raise error.PyAsn1Error(
'Malformed RELATIVE-OID %s at %s: %s' % (value, self.__class__.__name__, sys.exc_info()[1])
)
try:
return tuple([int(subOid) for subOid in value.split('.') if subOid])
except ValueError:
raise error.PyAsn1Error(
'Malformed RELATIVE-OID %s at %s: %s' % (value, self.__class__.__name__, sys.exc_info()[1])
)

try:
tupleOfInts = tuple([int(subOid) for subOid in value if subOid >= 0])

except (ValueError, TypeError):
raise error.PyAsn1Error(
'Malformed RELATIVE-OID %s at %s: %s' % (value, self.__class__.__name__, sys.exc_info()[1])
)

if len(tupleOfInts) == len(value):
return tupleOfInts

raise error.PyAsn1Error('Malformed RELATIVE-OID %s at %s' % (value, self.__class__.__name__))

def prettyOut(self, value):
return '.'.join([str(x) for x in value])


class Real(base.SimpleAsn1Type):
"""Create |ASN.1| schema or value object.

Expand Down
69 changes: 69 additions & 0 deletions tests/codec/ber/test_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,75 @@ def testLarge2(self):
) == ((2, 999, 18446744073709551535184467440737095), null)


class RelativeOIDDecoderTestCase(BaseTestCase):
def testThree(self):
assert decoder.decode(
ints2octs((13, 3, 5, 6, 7))
) == ((5, 6, 7), null)

def testTwo(self):
assert decoder.decode(
ints2octs((13, 2, 5, 6))
) == ((5, 6), null)

def testOne(self):
obj, rest = decoder.decode(ints2octs((13, 1, 39)))
assert str(obj) == '39'
assert rest == null

def testNonLeading0x80(self):
assert decoder.decode(
ints2octs((13, 5, 85, 4, 129, 128, 0)),
) == ((85, 4, 16384), null)

def testLeading0x80(self):
try:
decoder.decode(
ints2octs((13, 5, 85, 4, 128, 129, 0))
)
except error.PyAsn1Error:
pass
else:
assert 0, 'Leading 0x80 tolerated'

def testTagFormat(self):
try:
decoder.decode(ints2octs((38, 1, 239)))
except error.PyAsn1Error:
pass
else:
assert 0, 'wrong tagFormat worked out'

def testZeroLength(self):
try:
decoder.decode(ints2octs((13, 0, 0)))
except error.PyAsn1Error:
pass
else:
assert 0, 'zero length tolerated'

def testIndefiniteLength(self):
try:
decoder.decode(ints2octs((13, 128, 0)))
except error.PyAsn1Error:
pass
else:
assert 0, 'indefinite length tolerated'

def testReservedLength(self):
try:
decoder.decode(ints2octs((13, 255, 0)))
except error.PyAsn1Error:
pass
else:
assert 0, 'reserved length tolerated'

def testLarge(self):
assert decoder.decode(
ints2octs((0x0D, 0x13, 0x88, 0x37, 0x83, 0xC6, 0xDF, 0xD4, 0xCC, 0xB3, 0xFF, 0xFF, 0xFE, 0xF0, 0xB8, 0xD6, 0xB8, 0xCB, 0xE2, 0xB6, 0x47))
) == ((1079, 18446744073709551535184467440737095), null)


class RealDecoderTestCase(BaseTestCase):
def testChar(self):
assert decoder.decode(
Expand Down
Loading