Skip to content

Commit 89ab8ca

Browse files
authored
Add Variant::Array() type (#51)
1 parent 9ad4528 commit 89ab8ca

File tree

9 files changed

+135
-26
lines changed

9 files changed

+135
-26
lines changed

CHANGES.md

+4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Changes
22

3+
## [codec-0.9.4] - 2024-04-30
4+
5+
* Add Variant::Array() type
6+
37
## [codec-0.9.3] - 2024-04-29
48

59
* Fix `Variant::List` encoding

codec/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "ntex-amqp-codec"
3-
version = "0.9.3"
3+
version = "0.9.4"
44
description = "AMQP 1.0 Protocol Codec"
55
authors = ["Nikolay Kim <[email protected]>", "Max Gortman <[email protected]>", "Mike Yagley <[email protected]>"]
66
license = "MIT/Apache-2.0"

codec/src/codec/decode.rs

+18-21
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use crate::error::AmqpParseError;
1111
use crate::framing::{self, AmqpFrame, SaslFrame, HEADER_LEN};
1212
use crate::protocol::{self, CompoundHeader};
1313
use crate::types::{
14-
Descriptor, List, Multiple, Str, Symbol, Variant, VariantMap, VecStringMap, VecSymbolMap,
14+
Array, Descriptor, List, Multiple, Str, Symbol, Variant, VariantMap, VecStringMap, VecSymbolMap,
1515
};
1616
use crate::HashMap;
1717

@@ -376,37 +376,31 @@ impl DecodeFormatted for Variant {
376376
DateTime::<Utc>::decode_with_format(input, fmt).map(Variant::Timestamp)
377377
}
378378
codec::FORMATCODE_UUID => Uuid::decode_with_format(input, fmt).map(Variant::Uuid),
379-
codec::FORMATCODE_BINARY8 => Bytes::decode_with_format(input, fmt).map(Variant::Binary),
380-
codec::FORMATCODE_BINARY32 => {
379+
codec::FORMATCODE_BINARY8 | codec::FORMATCODE_BINARY32 => {
381380
Bytes::decode_with_format(input, fmt).map(Variant::Binary)
382381
}
383-
codec::FORMATCODE_STRING8 => {
382+
codec::FORMATCODE_STRING8 | codec::FORMATCODE_STRING32 => {
384383
ByteString::decode_with_format(input, fmt).map(|o| Variant::String(o.into()))
385384
}
386-
codec::FORMATCODE_STRING32 => {
387-
ByteString::decode_with_format(input, fmt).map(|o| Variant::String(o.into()))
388-
}
389-
codec::FORMATCODE_SYMBOL8 => {
390-
Symbol::decode_with_format(input, fmt).map(Variant::Symbol)
391-
}
392-
codec::FORMATCODE_SYMBOL32 => {
385+
codec::FORMATCODE_SYMBOL8 | codec::FORMATCODE_SYMBOL32 => {
393386
Symbol::decode_with_format(input, fmt).map(Variant::Symbol)
394387
}
395388
codec::FORMATCODE_LIST0 => Ok(Variant::List(List(vec![]))),
396-
codec::FORMATCODE_LIST8 => List::decode_with_format(input, fmt).map(Variant::List),
397-
codec::FORMATCODE_LIST32 => List::decode_with_format(input, fmt).map(Variant::List),
398-
codec::FORMATCODE_MAP8 => HashMap::<Variant, Variant>::decode_with_format(input, fmt)
399-
.map(|o| Variant::Map(VariantMap::new(o))),
400-
codec::FORMATCODE_MAP32 => HashMap::<Variant, Variant>::decode_with_format(input, fmt)
401-
.map(|o| Variant::Map(VariantMap::new(o))),
389+
codec::FORMATCODE_LIST8 | codec::FORMATCODE_LIST32 => {
390+
List::decode_with_format(input, fmt).map(Variant::List)
391+
}
392+
codec::FORMATCODE_ARRAY8 | codec::FORMATCODE_ARRAY32 => {
393+
Array::decode_with_format(input, fmt).map(Variant::Array)
394+
}
395+
codec::FORMATCODE_MAP8 | codec::FORMATCODE_MAP32 => {
396+
HashMap::<Variant, Variant>::decode_with_format(input, fmt)
397+
.map(|o| Variant::Map(VariantMap::new(o)))
398+
}
402399
codec::FORMATCODE_DESCRIBED => {
403400
let descriptor = Descriptor::decode(input)?;
404401
let value = Variant::decode(input)?;
405402
Ok(Variant::Described((descriptor, Box::new(value))))
406403
}
407-
codec::FORMATCODE_ARRAY8 | codec::FORMATCODE_ARRAY32 => {
408-
Err(AmqpParseError::ArrayTypeIsNotSupported)
409-
}
410404
_ => Err(AmqpParseError::InvalidFormatCode(fmt)),
411405
}
412406
}
@@ -475,7 +469,10 @@ fn decode_frame_header(input: &mut Bytes, expected_frame_type: u8) -> Result<u16
475469
Ok(channel_id)
476470
}
477471

478-
fn decode_array_header(input: &mut Bytes, fmt: u8) -> Result<CompoundHeader, AmqpParseError> {
472+
pub(crate) fn decode_array_header(
473+
input: &mut Bytes,
474+
fmt: u8,
475+
) -> Result<CompoundHeader, AmqpParseError> {
479476
match fmt {
480477
codec::FORMATCODE_ARRAY8 => decode_compound8(input),
481478
codec::FORMATCODE_ARRAY32 => decode_compound32(input),

codec/src/codec/encode.rs

+3
Original file line numberDiff line numberDiff line change
@@ -454,6 +454,7 @@ fn map_encoded_size<K: Hash + Eq + Encode, V: Encode, S: BuildHasher>(
454454
map.iter()
455455
.fold(0, |r, (k, v)| r + k.encoded_size() + v.encoded_size())
456456
}
457+
457458
impl<K: Eq + Hash + Encode, V: Encode, S: BuildHasher> Encode for HashMap<K, V, S> {
458459
fn encoded_size(&self) -> usize {
459460
let size = map_encoded_size(self);
@@ -683,6 +684,7 @@ impl Encode for Variant {
683684
Variant::Symbol(ref s) => s.encoded_size(),
684685
Variant::StaticSymbol(ref s) => s.encoded_size(),
685686
Variant::List(ref l) => l.encoded_size(),
687+
Variant::Array(ref a) => a.encoded_size(),
686688
Variant::Map(ref m) => m.map.encoded_size(),
687689
Variant::Described(ref dv) => dv.0.encoded_size() + dv.1.encoded_size(),
688690
}
@@ -712,6 +714,7 @@ impl Encode for Variant {
712714
Variant::StaticSymbol(ref s) => s.encode(buf),
713715
Variant::List(ref l) => l.encode(buf),
714716
Variant::Map(ref m) => m.map.encode(buf),
717+
Variant::Array(ref a) => a.encode(buf),
715718
Variant::Described(ref dv) => {
716719
dv.0.encode(buf);
717720
dv.1.encode(buf);

codec/src/codec/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ macro_rules! decode_check_len {
1111
}
1212

1313
#[macro_use]
14-
mod decode;
14+
pub(crate) mod decode;
1515
mod encode;
1616

1717
pub(crate) use self::decode::decode_list_header;

codec/src/error.rs

-2
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@ pub enum AmqpParseError {
1111
#[from(ignore)]
1212
#[display(fmt = "Unexpected format code: '{}'", "_0")]
1313
InvalidFormatCode(u8),
14-
#[display(fmt = "Array type is not supported in Amqp Value")]
15-
ArrayTypeIsNotSupported,
1614
#[display(fmt = "Invalid value converting to char: {}", "_0")]
1715
InvalidChar(u32),
1816
#[display(fmt = "Unexpected descriptor: '{:?}'", "_0")]

codec/src/types/array.rs

+95
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
use ntex_bytes::{BufMut, Bytes, BytesMut};
2+
3+
use crate::codec::{self, decode, ArrayEncode, DecodeFormatted, Encode};
4+
use crate::error::AmqpParseError;
5+
6+
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
7+
pub struct Array {
8+
len: u32,
9+
format: u8,
10+
payload: Bytes,
11+
}
12+
13+
impl Array {
14+
pub fn new<'a, I, T>(iter: I) -> Array
15+
where
16+
I: Iterator<Item = &'a T>,
17+
T: ArrayEncode + 'a,
18+
{
19+
let mut len = 0;
20+
let mut buf = BytesMut::new();
21+
for item in iter {
22+
len += 1;
23+
item.array_encode(&mut buf);
24+
}
25+
26+
Array {
27+
len,
28+
payload: buf.freeze(),
29+
format: T::ARRAY_FORMAT_CODE,
30+
}
31+
}
32+
33+
pub fn decode<T: DecodeFormatted>(&self) -> Result<Vec<T>, AmqpParseError> {
34+
let mut buf = self.payload.clone();
35+
let mut result: Vec<T> = Vec::with_capacity(self.len as usize);
36+
for _ in 0..self.len {
37+
let decoded = T::decode_with_format(&mut buf, self.format)?;
38+
result.push(decoded);
39+
}
40+
Ok(result)
41+
}
42+
}
43+
44+
impl<T> From<Vec<T>> for Array
45+
where
46+
T: ArrayEncode,
47+
{
48+
fn from(data: Vec<T>) -> Array {
49+
Array::new(data.iter())
50+
}
51+
}
52+
53+
impl Encode for Array {
54+
fn encoded_size(&self) -> usize {
55+
// format_code + size + count + item constructor -- todo: support described ctor?
56+
(if self.payload.len() + 1 > u8::MAX as usize {
57+
10
58+
} else {
59+
4
60+
}) // +1 for 1 byte count and 1 byte format code
61+
+ self.payload.len()
62+
}
63+
64+
fn encode(&self, buf: &mut BytesMut) {
65+
if self.payload.len() + 1 > u8::MAX as usize {
66+
buf.put_u8(codec::FORMATCODE_ARRAY32);
67+
buf.put_u32((self.payload.len() + 5) as u32); // +4 for 4 byte count and 1 byte item ctor that follow
68+
buf.put_u32(self.len);
69+
} else {
70+
buf.put_u8(codec::FORMATCODE_ARRAY8);
71+
buf.put_u8((self.payload.len() + 2) as u8); // +1 for 1 byte count and 1 byte item ctor that follow
72+
buf.put_u8(self.len as u8);
73+
}
74+
buf.put_u8(self.format);
75+
buf.extend_from_slice(&self.payload[..]);
76+
}
77+
}
78+
79+
impl DecodeFormatted for Array {
80+
fn decode_with_format(input: &mut Bytes, fmt: u8) -> Result<Self, AmqpParseError> {
81+
let header = decode::decode_array_header(input, fmt)?;
82+
decode_check_len!(input, 1);
83+
let size = header.size as usize - 1;
84+
let format = input[0]; // todo: support descriptor
85+
input.split_to(1);
86+
decode_check_len!(input, size);
87+
let payload = input.split_to(size);
88+
89+
Ok(Array {
90+
format,
91+
payload,
92+
len: header.count,
93+
})
94+
}
95+
}

codec/src/types/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@ use std::{borrow, fmt, hash, ops, str};
22

33
use ntex_bytes::ByteString;
44

5+
mod array;
56
mod symbol;
67
mod variant;
78

9+
pub use self::array::Array;
810
pub use self::symbol::{StaticSymbol, Symbol};
911
pub use self::variant::{Variant, VariantMap, VecStringMap, VecSymbolMap};
1012

codec/src/types/variant.rs

+11-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use ntex_bytes::{ByteString, Bytes};
55
use ordered_float::OrderedFloat;
66
use uuid::Uuid;
77

8-
use crate::types::{Descriptor, List, StaticSymbol, Str, Symbol};
8+
use crate::types::{Array, Descriptor, List, StaticSymbol, Str, Symbol};
99
use crate::{protocol::Annotations, HashMap};
1010

1111
/// Represents an AMQP type for use in polymorphic collections
@@ -82,6 +82,10 @@ pub enum Variant {
8282
/// Map
8383
Map(VariantMap),
8484

85+
/// Array
86+
#[display(fmt = "Array({:?})", _0)]
87+
Array(Array),
88+
8589
/// Described value
8690
#[display(fmt = "Described{:?}", _0)]
8791
Described((Descriptor, Box<Variant>)),
@@ -93,6 +97,12 @@ impl From<Vec<Variant>> for Variant {
9397
}
9498
}
9599

100+
impl From<HashMap<Variant, Variant>> for Variant {
101+
fn from(data: HashMap<Variant, Variant>) -> Self {
102+
Variant::Map(VariantMap { map: data })
103+
}
104+
}
105+
96106
impl From<ByteString> for Variant {
97107
fn from(s: ByteString) -> Self {
98108
Str::from(s).into()

0 commit comments

Comments
 (0)