diff --git a/include/zenoh-pico/protocol/definitions/transport.h b/include/zenoh-pico/protocol/definitions/transport.h index 046cd815b..53a6cb737 100644 --- a/include/zenoh-pico/protocol/definitions/transport.h +++ b/include/zenoh-pico/protocol/definitions/transport.h @@ -91,6 +91,16 @@ extern "C" { // Z Extensions if Z==1 then Zenoh extensions are present #define _Z_FLAG_T_CLOSE_S 0x20 // 1 << 5 +/*=============================*/ +/* Patch */ +/*=============================*/ +/// Used to negotiate the patch version of the protocol +/// if not present (or 0), then protocol as released with 1.0.0 +/// if >= 1, then fragmentation start/stop marker +#define _Z_NO_PATCH 0x00 +#define _Z_CURRENT_PATCH 0x01 +#define _Z_PATCH_HAS_FRAGMENT_START_STOP(patch) (patch >= 1) + /*=============================*/ /* Transport Messages */ /*=============================*/ @@ -235,6 +245,9 @@ typedef struct { uint8_t _req_id_res; uint8_t _seq_num_res; uint8_t _version; +#if Z_FEATURE_FRAGMENTATION == 1 + uint8_t _patch; +#endif } _z_t_msg_join_t; void _z_t_msg_join_clear(_z_t_msg_join_t *msg); @@ -315,6 +328,9 @@ typedef struct { uint8_t _req_id_res; uint8_t _seq_num_res; uint8_t _version; +#if Z_FEATURE_FRAGMENTATION == 1 + uint8_t _patch; +#endif } _z_t_msg_init_t; void _z_t_msg_init_clear(_z_t_msg_init_t *msg); @@ -478,11 +494,11 @@ void _z_t_msg_frame_clear(_z_t_msg_frame_t *msg); typedef struct { _z_slice_t _payload; _z_zint_t _sn; + bool start; + bool stop; } _z_t_msg_fragment_t; void _z_t_msg_fragment_clear(_z_t_msg_fragment_t *msg); -#define _Z_FRAGMENT_HEADER_SIZE 12 - /*------------------ Transport Message ------------------*/ typedef union { _z_t_msg_join_t _join; @@ -514,9 +530,9 @@ _z_transport_message_t _z_t_msg_make_keep_alive(void); _z_transport_message_t _z_t_msg_make_frame(_z_zint_t sn, _z_network_message_svec_t messages, z_reliability_t reliability); _z_transport_message_t _z_t_msg_make_frame_header(_z_zint_t sn, z_reliability_t reliability); -_z_transport_message_t _z_t_msg_make_fragment_header(_z_zint_t sn, z_reliability_t reliability, bool is_last); +_z_transport_message_t _z_t_msg_make_fragment_header(_z_zint_t sn, z_reliability_t reliability, bool is_last, bool start, bool stop); _z_transport_message_t _z_t_msg_make_fragment(_z_zint_t sn, _z_slice_t messages, z_reliability_t reliability, - bool is_last); + bool is_last, bool start, bool stop); /*------------------ Copy ------------------*/ void _z_t_msg_copy(_z_transport_message_t *clone, _z_transport_message_t *msg); diff --git a/include/zenoh-pico/protocol/ext.h b/include/zenoh-pico/protocol/ext.h index 989d7491d..990b85ac4 100644 --- a/include/zenoh-pico/protocol/ext.h +++ b/include/zenoh-pico/protocol/ext.h @@ -43,7 +43,11 @@ extern "C" { /*=============================*/ /* Extension IDs */ /*=============================*/ -// #define _Z_MSG_EXT_ID_FOO 0x00 // Hex(ENC|M|ID) +#define _Z_MSG_EXT_ID_JOIN_QOS (0x01 | _Z_MSG_EXT_FLAG_M | _Z_MSG_EXT_ENC_ZBUF) +#define _Z_MSG_EXT_ID_JOIN_PATCH (0x07 | _Z_MSG_EXT_ENC_ZINT) +#define _Z_MSG_EXT_ID_INIT_PATCH (0x07 | _Z_MSG_EXT_ENC_ZINT) +#define _Z_MSG_EXT_ID_FRAGMENT_START (0x02 | _Z_MSG_EXT_ENC_UNIT) +#define _Z_MSG_EXT_ID_FRAGMENT_STOP (0x03 | _Z_MSG_EXT_ENC_UNIT) /*=============================*/ /* Extension Encodings */ @@ -58,6 +62,7 @@ extern "C" { #define _Z_MSG_EXT_FLAG_M 0x10 #define _Z_MSG_EXT_IS_MANDATORY(h) ((h & _Z_MSG_EXT_FLAG_M) != 0) #define _Z_MSG_EXT_FLAG_Z 0x80 +#define _Z_MSG_EXT_MORE(more) (more ? _Z_MSG_EXT_FLAG_Z : 0) typedef struct { uint8_t __dummy; // Just to avoid empty structures that might cause undefined behavior diff --git a/include/zenoh-pico/transport/common/tx.h b/include/zenoh-pico/transport/common/tx.h index 6809f20f3..59b9231c6 100644 --- a/include/zenoh-pico/transport/common/tx.h +++ b/include/zenoh-pico/transport/common/tx.h @@ -27,7 +27,7 @@ void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, uint8_t link_flow_capability); void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, uint8_t link_flow_capability); /*This function is unsafe because it operates in potentially concurrent data.*Make sure that the following mutexes are locked before calling this function : *-ztu->mutex_tx */ -z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn); +z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn, bool start); /*------------------ Transmission and Reception helpers ------------------*/ z_result_t _z_transport_tx_send_t_msg(_z_transport_common_t *ztc, const _z_transport_message_t *t_msg); diff --git a/include/zenoh-pico/transport/transport.h b/include/zenoh-pico/transport/transport.h index d4fcf7ea7..c2f87b6ee 100644 --- a/include/zenoh-pico/transport/transport.h +++ b/include/zenoh-pico/transport/transport.h @@ -57,6 +57,8 @@ typedef struct { uint8_t _state_best_effort; _z_wbuf_t _dbuf_reliable; _z_wbuf_t _dbuf_best_effort; + // Patch + uint8_t _patch; #endif } _z_transport_peer_entry_t; @@ -122,6 +124,8 @@ typedef struct { uint8_t _state_best_effort; _z_wbuf_t _dbuf_reliable; _z_wbuf_t _dbuf_best_effort; + // Patch + uint8_t _patch; #endif } _z_transport_unicast_t; @@ -161,6 +165,9 @@ typedef struct { uint8_t _req_id_res; uint8_t _seq_num_res; bool _is_qos; +#if Z_FEATURE_FRAGMENTATION == 1 + uint8_t _patch; +#endif } _z_transport_unicast_establish_param_t; typedef struct { diff --git a/include/zenoh-pico/transport/utils.h b/include/zenoh-pico/transport/utils.h index 62fa319b4..25862370b 100644 --- a/include/zenoh-pico/transport/utils.h +++ b/include/zenoh-pico/transport/utils.h @@ -29,6 +29,7 @@ _z_zint_t _z_sn_max(uint8_t bits); _z_zint_t _z_sn_half(_z_zint_t sn); _z_zint_t _z_sn_modulo_mask(uint8_t bits); bool _z_sn_precedes(const _z_zint_t sn_resolution, const _z_zint_t sn_left, const _z_zint_t sn_right); +bool _z_sn_consecutive(const _z_zint_t sn_resolution, const _z_zint_t sn_left, const _z_zint_t sn_right); _z_zint_t _z_sn_increment(const _z_zint_t sn_resolution, const _z_zint_t sn); _z_zint_t _z_sn_decrement(const _z_zint_t sn_resolution, const _z_zint_t sn); diff --git a/src/protocol/codec/transport.c b/src/protocol/codec/transport.c index 5e07d5fea..095077c8a 100644 --- a/src/protocol/codec/transport.c +++ b/src/protocol/codec/transport.c @@ -64,9 +64,14 @@ z_result_t _z_join_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_join_t } _Z_RETURN_IF_ERR(_z_zsize_encode(wbf, msg->_next_sn._val._plain._reliable)); _Z_RETURN_IF_ERR(_z_zsize_encode(wbf, msg->_next_sn._val._plain._best_effort)); +#if Z_FEATURE_FRAGMENTATION == 1 + bool has_patch = msg->_patch != _Z_NO_PATCH; +#else + bool has_patch = false; +#endif if (msg->_next_sn._is_qos) { - if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z)) { - _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ENC_ZBUF | _Z_MSG_EXT_FLAG_M | 1)); + if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) { + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_JOIN_QOS | _Z_MSG_EXT_MORE(has_patch))); size_t len = 0; for (uint8_t i = 0; (i < Z_PRIORITIES_NUM) && (ret == _Z_RES_OK); i++) { len += _z_zint_len(msg->_next_sn._val._qos[i]._reliable) + @@ -82,6 +87,17 @@ z_result_t _z_join_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_join_t ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED; } } +#if Z_FEATURE_FRAGMENTATION == 1 + if (has_patch) { + if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) { + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_JOIN_PATCH)); + _Z_RETURN_IF_ERR(_z_zint64_encode(wbf, msg->_patch)); + } else { + _Z_DEBUG("Attempted to serialize Patch extension, but the header extension flag was unset"); + ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED; + } + } +#endif return ret; } @@ -89,14 +105,17 @@ z_result_t _z_join_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_join_t z_result_t _z_join_decode_ext(_z_msg_ext_t *extension, void *ctx) { z_result_t ret = _Z_RES_OK; _z_t_msg_join_t *msg = (_z_t_msg_join_t *)ctx; - if (_Z_EXT_FULL_ID(extension->_header) == - (_Z_MSG_EXT_ENC_ZBUF | _Z_MSG_EXT_FLAG_M | 1)) { // QOS: (enc=zbuf)(mandatory=true)(id=1) + if (_Z_EXT_FULL_ID(extension->_header) == _Z_MSG_EXT_ID_JOIN_QOS) { msg->_next_sn._is_qos = true; _z_zbuf_t zbf = _z_slice_as_zbuf(extension->_body._zbuf._val); for (int i = 0; (ret == _Z_RES_OK) && (i < Z_PRIORITIES_NUM); ++i) { ret |= _z_zsize_decode(&msg->_next_sn._val._qos[i]._reliable, &zbf); ret |= _z_zsize_decode(&msg->_next_sn._val._qos[i]._best_effort, &zbf); } +#if Z_FEATURE_FRAGMENTATION == 1 + } else if (_Z_EXT_FULL_ID(extension->_header) == _Z_MSG_EXT_ID_JOIN_PATCH) { + msg->_patch = (uint8_t)extension->_body._zint._val; +#endif } else if (_Z_MSG_EXT_IS_MANDATORY(extension->_header)) { ret = _Z_ERR_MESSAGE_EXTENSION_MANDATORY_AND_UNKNOWN; } @@ -148,7 +167,7 @@ z_result_t _z_join_decode(_z_t_msg_join_t *msg, _z_zbuf_t *zbf, uint8_t header) ret |= _z_zsize_decode(&msg->_next_sn._val._plain._reliable, zbf); ret |= _z_zsize_decode(&msg->_next_sn._val._plain._best_effort, zbf); } - if ((ret == _Z_RES_OK) && _Z_HAS_FLAG(header, _Z_FLAG_T_Z)) { + if ((ret == _Z_RES_OK) && _Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) { ret |= _z_msg_ext_decode_iter(zbf, _z_join_decode_ext, msg); } @@ -181,6 +200,32 @@ z_result_t _z_init_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_init_t _Z_RETURN_IF_ERR(_z_slice_encode(wbf, &msg->_cookie)) } + #if Z_FEATURE_FRAGMENTATION == 1 + if (msg->_patch != _Z_CURRENT_PATCH) { + if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) { + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_JOIN_PATCH)); + _Z_RETURN_IF_ERR(_z_zint64_encode(wbf, msg->_patch)); + } else { + _Z_DEBUG("Attempted to serialize Patch extension, but the header extension flag was unset"); + ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED; + } + } + #endif + + return ret; +} + +z_result_t _z_init_decode_ext(_z_msg_ext_t *extension, void *ctx) { + z_result_t ret = _Z_RES_OK; + _z_t_msg_init_t *msg = (_z_t_msg_init_t *)ctx; + if (false) { +#if Z_FEATURE_FRAGMENTATION == 1 + } else if (_Z_EXT_FULL_ID(extension->_header) == _Z_MSG_EXT_ID_INIT_PATCH) { + msg->_patch = (uint8_t)extension->_body._zint._val; +#endif + } else if (_Z_MSG_EXT_IS_MANDATORY(extension->_header)) { + ret = _Z_ERR_MESSAGE_EXTENSION_MANDATORY_AND_UNKNOWN; + } return ret; } @@ -224,8 +269,8 @@ z_result_t _z_init_decode(_z_t_msg_init_t *msg, _z_zbuf_t *zbf, uint8_t header) msg->_cookie = _z_slice_null(); } - if ((ret == _Z_RES_OK) && (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true)) { - ret |= _z_msg_ext_skip_non_mandatories(zbf, 0x01); + if ((ret == _Z_RES_OK) && _Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) { + ret |= _z_msg_ext_decode_iter(zbf, _z_init_decode_ext, msg); } return ret; diff --git a/src/protocol/definitions/transport.c b/src/protocol/definitions/transport.c index fa910b771..9a8be3c90 100644 --- a/src/protocol/definitions/transport.c +++ b/src/protocol/definitions/transport.c @@ -102,6 +102,9 @@ _z_transport_message_t _z_t_msg_make_join(z_whatami_t whatami, _z_zint_t lease, msg._body._join._batch_size = Z_BATCH_MULTICAST_SIZE; msg._body._join._next_sn = next_sn; msg._body._join._zid = zid; +#if Z_FEATURE_FRAGMENTATION == 1 + msg._body._join._patch = _Z_CURRENT_PATCH; +#endif if ((lease % 1000) == 0) { _Z_SET_FLAG(msg._header, _Z_FLAG_T_JOIN_T); @@ -112,7 +115,12 @@ _z_transport_message_t _z_t_msg_make_join(z_whatami_t whatami, _z_zint_t lease, _Z_SET_FLAG(msg._header, _Z_FLAG_T_JOIN_S); } - if (next_sn._is_qos) { +#if Z_FEATURE_FRAGMENTATION == 1 + bool has_patch = msg._body._join._patch != _Z_NO_PATCH; +#else + bool has_patch = false; +#endif + if (next_sn._is_qos == true || has_patch == true) { _Z_SET_FLAG(msg._header, _Z_FLAG_T_Z); } @@ -131,6 +139,9 @@ _z_transport_message_t _z_t_msg_make_init_syn(z_whatami_t whatami, _z_id_t zid) msg._body._init._req_id_res = Z_REQ_RESOLUTION; msg._body._init._batch_size = Z_BATCH_UNICAST_SIZE; _z_slice_reset(&msg._body._init._cookie); +#if Z_FEATURE_FRAGMENTATION == 1 + msg._body._init._patch = _Z_CURRENT_PATCH; +#endif if ((msg._body._init._batch_size != _Z_DEFAULT_UNICAST_BATCH_SIZE) || (msg._body._init._seq_num_res != _Z_DEFAULT_RESOLUTION_SIZE) || @@ -138,6 +149,15 @@ _z_transport_message_t _z_t_msg_make_init_syn(z_whatami_t whatami, _z_id_t zid) _Z_SET_FLAG(msg._header, _Z_FLAG_T_INIT_S); } +#if Z_FEATURE_FRAGMENTATION == 1 + bool has_patch = msg._body._join._patch != _Z_NO_PATCH; +#else + bool has_patch = false; +#endif + if (has_patch == true) { + _Z_SET_FLAG(msg._header, _Z_FLAG_T_Z); + } + return msg; } @@ -153,6 +173,9 @@ _z_transport_message_t _z_t_msg_make_init_ack(z_whatami_t whatami, _z_id_t zid, msg._body._init._req_id_res = Z_REQ_RESOLUTION; msg._body._init._batch_size = Z_BATCH_UNICAST_SIZE; msg._body._init._cookie = cookie; +#if Z_FEATURE_FRAGMENTATION == 1 + msg._body._init._patch = _Z_CURRENT_PATCH; +#endif if ((msg._body._init._batch_size != _Z_DEFAULT_UNICAST_BATCH_SIZE) || (msg._body._init._seq_num_res != _Z_DEFAULT_RESOLUTION_SIZE) || @@ -160,6 +183,15 @@ _z_transport_message_t _z_t_msg_make_init_ack(z_whatami_t whatami, _z_id_t zid, _Z_SET_FLAG(msg._header, _Z_FLAG_T_INIT_S); } +#if Z_FEATURE_FRAGMENTATION == 1 + bool has_patch = msg._body._join._patch != _Z_NO_PATCH; +#else + bool has_patch = false; +#endif + if (has_patch == true) { + _Z_SET_FLAG(msg._header, _Z_FLAG_T_Z); + } + return msg; } @@ -247,11 +279,11 @@ _z_transport_message_t _z_t_msg_make_frame_header(_z_zint_t sn, z_reliability_t } /*------------------ Fragment Message ------------------*/ -_z_transport_message_t _z_t_msg_make_fragment_header(_z_zint_t sn, z_reliability_t reliability, bool is_last) { - return _z_t_msg_make_fragment(sn, _z_slice_null(), reliability, is_last); +_z_transport_message_t _z_t_msg_make_fragment_header(_z_zint_t sn, z_reliability_t reliability, bool is_last, bool start, bool stop) { + return _z_t_msg_make_fragment(sn, _z_slice_null(), reliability, is_last, start, stop); } _z_transport_message_t _z_t_msg_make_fragment(_z_zint_t sn, _z_slice_t payload, z_reliability_t reliability, - bool is_last) { + bool is_last, bool start, bool stop) { _z_transport_message_t msg; msg._header = _Z_MID_T_FRAGMENT; if (is_last == false) { @@ -263,12 +295,20 @@ _z_transport_message_t _z_t_msg_make_fragment(_z_zint_t sn, _z_slice_t payload, msg._body._fragment._sn = sn; msg._body._fragment._payload = payload; + if (start == true || stop == true) { + _Z_SET_FLAG(msg._header, _Z_FLAG_T_Z); + } + msg._body._fragment.start = start; + msg._body._fragment.stop = stop; return msg; } void _z_t_msg_copy_fragment(_z_t_msg_fragment_t *clone, _z_t_msg_fragment_t *msg) { + clone->_payload = msg->_payload; _z_slice_copy(&clone->_payload, &msg->_payload); + clone->start = msg->start; + clone->stop = msg->stop; } void _z_t_msg_copy_join(_z_t_msg_join_t *clone, _z_t_msg_join_t *msg) { @@ -279,6 +319,7 @@ void _z_t_msg_copy_join(_z_t_msg_join_t *clone, _z_t_msg_join_t *msg) { clone->_req_id_res = msg->_req_id_res; clone->_batch_size = msg->_batch_size; clone->_next_sn = msg->_next_sn; + clone->_patch = msg->_patch; memcpy(clone->_zid.id, msg->_zid.id, 16); } diff --git a/src/transport/common/tx.c b/src/transport/common/tx.c index 4b6d9bf32..021fc4db1 100644 --- a/src/transport/common/tx.c +++ b/src/transport/common/tx.c @@ -52,10 +52,9 @@ static z_result_t _z_transport_tx_send_fragment_inner(_z_transport_common_t *ztc if (!is_first) { sn = _z_transport_tx_get_sn(ztc, reliability); } - is_first = false; // Serialize fragment __unsafe_z_prepare_wbuf(&ztc->_wbuf, ztc->_link._cap._flow); - z_result_t ret = __unsafe_z_serialize_zenoh_fragment(&ztc->_wbuf, frag_buff, reliability, sn); + z_result_t ret = __unsafe_z_serialize_zenoh_fragment(&ztc->_wbuf, frag_buff, reliability, sn, is_first); if (ret != _Z_RES_OK) { _Z_ERROR("Fragment serialization failed with err %d", ret); return ret; @@ -64,6 +63,7 @@ static z_result_t _z_transport_tx_send_fragment_inner(_z_transport_common_t *ztc __unsafe_z_finalize_wbuf(&ztc->_wbuf, ztc->_link._cap._flow); _Z_RETURN_IF_ERR(_z_link_send_wbuf(&ztc->_link, &ztc->_wbuf)); ztc->_transmitted = true; // Tell session we transmitted data + is_first = false; } return _Z_RES_OK; } @@ -364,7 +364,7 @@ z_result_t _z_link_send_t_msg(const _z_link_t *zl, const _z_transport_message_t return ret; } -z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn) { +z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn, bool start) { z_result_t ret = _Z_RES_OK; // Assume first that this is not the final fragment @@ -373,7 +373,7 @@ z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z size_t w_pos = _z_wbuf_get_wpos(dst); // Mark the buffer for the writing operation _z_transport_message_t f_hdr = - _z_t_msg_make_fragment_header(sn, reliability == Z_RELIABILITY_RELIABLE, is_final); + _z_t_msg_make_fragment_header(sn, reliability == Z_RELIABILITY_RELIABLE, is_final, start, false); ret = _z_transport_message_encode(dst, &f_hdr); // Encode the frame header if (ret == _Z_RES_OK) { size_t space_left = _z_wbuf_space_left(dst); diff --git a/src/transport/multicast/rx.c b/src/transport/multicast/rx.c index dc64460e4..08e0ac304 100644 --- a/src/transport/multicast/rx.c +++ b/src/transport/multicast/rx.c @@ -189,16 +189,56 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, _z_wbuf_t *dbuf; uint8_t *dbuf_state; z_reliability_t tmsg_reliability; + bool consecutive; // Select the right defragmentation buffer if (_Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAGMENT_R)) { tmsg_reliability = Z_RELIABILITY_RELIABLE; + if (_z_sn_precedes(entry->_sn_res, entry->_sn_rx_sns._val._plain._reliable, t_msg->_body._fragment._sn) == + true) { + consecutive = _z_sn_consecutive(entry->_sn_res, entry->_sn_rx_sns._val._plain._reliable, t_msg->_body._fragment._sn); + entry->_sn_rx_sns._val._plain._reliable = t_msg->_body._fragment._sn; + } else { +#if Z_FEATURE_FRAGMENTATION == 1 + entry->_state_reliable = _Z_DBUF_STATE_NULL; + _z_wbuf_clear(&entry->_dbuf_reliable); +#endif + _Z_INFO("Reliable message dropped because it is out of order"); + break; + } dbuf = &entry->_dbuf_reliable; dbuf_state = &entry->_state_reliable; } else { tmsg_reliability = Z_RELIABILITY_BEST_EFFORT; + if (_z_sn_precedes(entry->_sn_res, entry->_sn_rx_sns._val._plain._best_effort, + t_msg->_body._fragment._sn) == true) { + consecutive = _z_sn_consecutive(entry->_sn_res, entry->_sn_rx_sns._val._plain._best_effort, t_msg->_body._fragment._sn); + entry->_sn_rx_sns._val._plain._best_effort = t_msg->_body._fragment._sn; + } else { +#if Z_FEATURE_FRAGMENTATION == 1 + entry->_state_best_effort = _Z_DBUF_STATE_NULL; + _z_wbuf_clear(&entry->_dbuf_best_effort); +#endif + _Z_INFO("Best effort message dropped because it is out of order"); + break; + } dbuf = &entry->_dbuf_best_effort; dbuf_state = &entry->_state_best_effort; } + // Handle fragment markers + if (_Z_PATCH_HAS_FRAGMENT_START_STOP(entry->_patch)) { + if (t_msg->_body._fragment.start) { + _z_wbuf_clear(dbuf); + *dbuf_state = _Z_DBUF_STATE_NULL; + } else if (*dbuf_state == _Z_DBUF_STATE_NULL) { + _Z_DEBUG("First fragment received without the start marker"); + break; + } + if (t_msg->_body._fragment.stop) { + _z_wbuf_clear(dbuf); + *dbuf_state = _Z_DBUF_STATE_NULL; + break; + } + } // Allocate buffer if needed if (*dbuf_state == _Z_DBUF_STATE_NULL) { *dbuf = _z_wbuf_make(Z_FRAG_MAX_SIZE, false); @@ -208,6 +248,11 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, break; } *dbuf_state = _Z_DBUF_STATE_INIT; + } else if (consecutive == false) { + _Z_DEBUG("Non-consecutive fragments received"); + _z_wbuf_clear(dbuf); + *dbuf_state = _Z_DBUF_STATE_NULL; + break; } // Process fragment data if (*dbuf_state == _Z_DBUF_STATE_INIT) { @@ -313,6 +358,8 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, entry->_state_best_effort = _Z_DBUF_STATE_NULL; entry->_dbuf_reliable = _z_wbuf_null(); entry->_dbuf_best_effort = _z_wbuf_null(); + + entry->_patch = t_msg->_body._join._patch; #endif // Update lease time (set as ms during) entry->_lease = t_msg->_body._join._lease; diff --git a/src/transport/peer_entry.c b/src/transport/peer_entry.c index b5feffcdb..4dc1ceeaa 100644 --- a/src/transport/peer_entry.c +++ b/src/transport/peer_entry.c @@ -32,6 +32,8 @@ void _z_transport_peer_entry_copy(_z_transport_peer_entry_t *dst, const _z_trans dst->_state_best_effort = src->_state_best_effort; _z_wbuf_copy(&dst->_dbuf_reliable, &src->_dbuf_reliable); _z_wbuf_copy(&dst->_dbuf_best_effort, &src->_dbuf_best_effort); + + dst->_patch = src->_patch; #endif dst->_sn_res = src->_sn_res; diff --git a/src/transport/raweth/tx.c b/src/transport/raweth/tx.c index e44d5bdab..1299b43e9 100644 --- a/src/transport/raweth/tx.c +++ b/src/transport/raweth/tx.c @@ -274,13 +274,12 @@ z_result_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_ // Get the fragment sequence number sn = __unsafe_z_raweth_get_sn(ztm, reliability); } - is_first = false; // Reset wbuf _z_wbuf_reset(&ztm->_common._wbuf); // Prepare buff __unsafe_z_raweth_prepare_header(&ztm->_common._link, &ztm->_common._wbuf); // Serialize one fragment - _Z_CLEAN_RETURN_IF_ERR(__unsafe_z_serialize_zenoh_fragment(&ztm->_common._wbuf, &fbf, reliability, sn), + _Z_CLEAN_RETURN_IF_ERR(__unsafe_z_serialize_zenoh_fragment(&ztm->_common._wbuf, &fbf, reliability, sn, is_first), _z_transport_tx_mutex_unlock(&ztm->_common)); // Write the eth header _Z_CLEAN_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_common._link, &ztm->_common._wbuf), @@ -290,6 +289,7 @@ z_result_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_ _z_transport_tx_mutex_unlock(&ztm->_common)); // Mark the session that we have transmitted data ztm->_common._transmitted = true; + is_first = false; } // Clear the expandable buffer _z_wbuf_clear(&fbf); diff --git a/src/transport/unicast/rx.c b/src/transport/unicast/rx.c index d90b59f38..550126712 100644 --- a/src/transport/unicast/rx.c +++ b/src/transport/unicast/rx.c @@ -143,16 +143,54 @@ z_result_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_t _z_wbuf_t *dbuf; uint8_t *dbuf_state; z_reliability_t tmsg_reliability; + bool consecutive; // Select the right defragmentation buffer if (_Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAGMENT_R)) { tmsg_reliability = Z_RELIABILITY_RELIABLE; + if (_z_sn_precedes(ztu->_common._sn_res, ztu->_sn_rx_reliable, t_msg->_body._fragment._sn) == true) { + consecutive = _z_sn_consecutive(ztu->_common._sn_res, ztu->_sn_rx_reliable, t_msg->_body._fragment._sn); + ztu->_sn_rx_reliable = t_msg->_body._fragment._sn; + } else { +#if Z_FEATURE_FRAGMENTATION == 1 + _z_wbuf_clear(&ztu->_dbuf_reliable); + ztu->_state_reliable = _Z_DBUF_STATE_NULL; +#endif + _Z_INFO("Reliable message dropped because it is out of order"); + break; + } dbuf = &ztu->_dbuf_reliable; dbuf_state = &ztu->_state_reliable; } else { tmsg_reliability = Z_RELIABILITY_BEST_EFFORT; + if (_z_sn_precedes(ztu->_common._sn_res, ztu->_sn_rx_best_effort, t_msg->_body._fragment._sn) == true) { + consecutive = _z_sn_consecutive(ztu->_common._sn_res, ztu->_sn_rx_best_effort, t_msg->_body._fragment._sn); + ztu->_sn_rx_best_effort = t_msg->_body._fragment._sn; + } else { +#if Z_FEATURE_FRAGMENTATION == 1 + _z_wbuf_clear(&ztu->_dbuf_best_effort); + ztu->_state_best_effort = _Z_DBUF_STATE_NULL; +#endif + _Z_INFO("Best effort message dropped because it is out of order"); + break; + } dbuf = &ztu->_dbuf_best_effort; dbuf_state = &ztu->_state_best_effort; } + // Handle fragment markers + if (_Z_PATCH_HAS_FRAGMENT_START_STOP(ztu->_patch)) { + if (t_msg->_body._fragment.start) { + _z_wbuf_clear(dbuf); + *dbuf_state = _Z_DBUF_STATE_NULL; + } else if (*dbuf_state == _Z_DBUF_STATE_NULL) { + _Z_DEBUG("First fragment received without the start marker"); + break; + } + if (t_msg->_body._fragment.stop) { + _z_wbuf_clear(dbuf); + *dbuf_state = _Z_DBUF_STATE_NULL; + break; + } + } // Allocate buffer if needed if (*dbuf_state == _Z_DBUF_STATE_NULL) { *dbuf = _z_wbuf_make(Z_FRAG_MAX_SIZE, false); @@ -162,6 +200,11 @@ z_result_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_t break; } *dbuf_state = _Z_DBUF_STATE_INIT; + } else if (consecutive == false) { + _Z_DEBUG("Non-consecutive fragments received"); + _z_wbuf_clear(dbuf); + *dbuf_state = _Z_DBUF_STATE_NULL; + break; } // Process fragment data if (*dbuf_state == _Z_DBUF_STATE_INIT) { diff --git a/src/transport/unicast/transport.c b/src/transport/unicast/transport.c index 64a1d6540..d0b3666d8 100644 --- a/src/transport/unicast/transport.c +++ b/src/transport/unicast/transport.c @@ -90,6 +90,8 @@ z_result_t _z_unicast_transport_create(_z_transport_t *zt, _z_link_t *zl, ztu->_state_best_effort = _Z_DBUF_STATE_NULL; ztu->_dbuf_reliable = _z_wbuf_null(); ztu->_dbuf_best_effort = _z_wbuf_null(); + // Patch + ztu->_patch = param->_patch; #endif // Z_FEATURE_FRAGMENTATION == 1 } @@ -173,6 +175,9 @@ static z_result_t _z_unicast_handshake_client(_z_transport_unicast_establish_par } else { ret = _Z_ERR_TRANSPORT_OPEN_SN_RESOLUTION; } +#if Z_FEATURE_FRAGMENTATION == 1 + param->_patch = iam._body._init._patch; +#endif if (ret != _Z_RES_OK) { _z_t_msg_clear(&iam); return ret; @@ -233,6 +238,9 @@ static z_result_t _z_unicast_handshake_listener(_z_transport_unicast_establish_p return _Z_ERR_MESSAGE_UNEXPECTED; } _Z_DEBUG("Received Z_INIT(Syn)"); +#if Z_FEATURE_FRAGMENTATION == 1 + param->_patch = tmsg._body._init._patch; +#endif // Encode InitAck _z_slice_t cookie = _z_slice_null(); _z_transport_message_t iam = _z_t_msg_make_init_ack(whatami, *local_zid, cookie); diff --git a/src/transport/utils.c b/src/transport/utils.c index 510a0f19c..dc0697043 100644 --- a/src/transport/utils.c +++ b/src/transport/utils.c @@ -82,6 +82,11 @@ bool _z_sn_precedes(const _z_zint_t sn_resolution, const _z_zint_t sn_left, cons return ((distance <= _z_sn_half(sn_resolution)) && (distance != 0)); } +bool _z_sn_consecutive(const _z_zint_t sn_resolution, const _z_zint_t sn_left, const _z_zint_t sn_right) { + _z_zint_t distance = (sn_right - sn_left) & sn_resolution; + return distance == 1; +} + _z_zint_t _z_sn_increment(const _z_zint_t sn_resolution, const _z_zint_t sn) { _z_zint_t ret = sn + 1; return (ret &= sn_resolution);