From 4085e006de6d06a2f262714b9bebb393ad03d77d Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Mon, 11 Dec 2023 16:23:00 +0100 Subject: [PATCH 1/8] feat: add data length header field --- include/zenoh-pico/system/link/raweth.h | 4 ++ src/system/unix/link/raweth.c | 4 ++ src/transport/raweth/rx.c | 27 ++++++++--- src/transport/raweth/tx.c | 62 ++++++++++++++++--------- 4 files changed, 67 insertions(+), 30 deletions(-) diff --git a/include/zenoh-pico/system/link/raweth.h b/include/zenoh-pico/system/link/raweth.h index d2009144e..e4360a5df 100644 --- a/include/zenoh-pico/system/link/raweth.h +++ b/include/zenoh-pico/system/link/raweth.h @@ -36,6 +36,7 @@ typedef struct { uint8_t dmac[_ZP_MAC_ADDR_LENGTH]; // Destination mac address uint8_t smac[_ZP_MAC_ADDR_LENGTH]; // Source mac address uint16_t ethtype; // Ethertype of frame + uint16_t data_length; // Payload length } _zp_eth_header_t; typedef struct { @@ -44,6 +45,7 @@ typedef struct { uint16_t vlan_type; // Vlan ethtype uint16_t tag; // Vlan tag uint16_t ethtype; // Ethertype of frame + uint16_t data_length; // Payload length } _zp_eth_vlan_header_t; typedef struct { @@ -61,6 +63,8 @@ int8_t _z_open_raweth(_z_sys_net_socket_t *sock, const char *interface); size_t _z_send_raweth(const _z_sys_net_socket_t *sock, const void *buff, size_t buff_len); size_t _z_receive_raweth(const _z_sys_net_socket_t *sock, void *buff, size_t buff_len, _z_bytes_t *addr); int8_t _z_close_raweth(_z_sys_net_socket_t *sock); +size_t _z_raweth_ntohs(size_t val); +size_t _z_raweth_htons(size_t val); #endif diff --git a/src/system/unix/link/raweth.c b/src/system/unix/link/raweth.c index c64376594..78a8aee92 100644 --- a/src/system/unix/link/raweth.c +++ b/src/system/unix/link/raweth.c @@ -116,5 +116,9 @@ size_t _z_receive_raweth(const _z_sys_net_socket_t *sock, void *buff, size_t buf return bytesRead; } +size_t _z_raweth_ntohs(size_t val) { return ntohs(val); } + +size_t _z_raweth_htons(size_t val) { return htons(val); } + #endif // defined(__linux) #endif // Z_FEATURE_RAWETH_TRANSPORT == 1 diff --git a/src/transport/raweth/rx.c b/src/transport/raweth/rx.c index 1709ed67f..6deec0839 100644 --- a/src/transport/raweth/rx.c +++ b/src/transport/raweth/rx.c @@ -27,10 +27,6 @@ #if Z_FEATURE_RAWETH_TRANSPORT == 1 -void print_buf(_z_zbuf_t *buf) { - printf("Buff info: %ld, %ld, %ld\n", buf->_ios._r_pos, buf->_ios._w_pos, buf->_ios._capacity); -} - static size_t _z_raweth_link_recv_zbuf(const _z_link_t *link, _z_zbuf_t *zbf, _z_bytes_t *addr) { uint8_t *buff = _z_zbuf_get_wptr(zbf); size_t rb = _z_receive_raweth(&link->_socket._raweth._sock, buff, _z_zbuf_space_left(zbf), addr); @@ -48,14 +44,31 @@ static size_t _z_raweth_link_recv_zbuf(const _z_link_t *link, _z_zbuf_t *zbf, _z if (has_vlan && (rb < sizeof(_zp_eth_vlan_header_t))) { return SIZE_MAX; } - // Update buffer but skip eth header - _z_zbuf_set_wpos(zbf, _z_zbuf_get_wpos(zbf) + rb); + size_t data_length = 0; if (has_vlan) { + _zp_eth_vlan_header_t *header = (_zp_eth_vlan_header_t *)buff; + // Retrieve data length + data_length = _z_raweth_ntohs(header->data_length); + if (rb < (data_length + sizeof(_zp_eth_vlan_header_t))) { + // Invalid data_length + return SIZE_MAX; + } + // Skip header + _z_zbuf_set_wpos(zbf, _z_zbuf_get_wpos(zbf) + sizeof(_zp_eth_vlan_header_t) + data_length); _z_zbuf_set_rpos(zbf, _z_zbuf_get_rpos(zbf) + sizeof(_zp_eth_vlan_header_t)); } else { + _zp_eth_header_t *header = (_zp_eth_header_t *)buff; + // Retrieve data length + data_length = _z_raweth_ntohs(header->data_length); + if (rb < (data_length + sizeof(_zp_eth_header_t))) { + // Invalid data_length + return SIZE_MAX; + } + // Skip header + _z_zbuf_set_wpos(zbf, _z_zbuf_get_wpos(zbf) + sizeof(_zp_eth_header_t) + data_length); _z_zbuf_set_rpos(zbf, _z_zbuf_get_rpos(zbf) + sizeof(_zp_eth_header_t)); } - return rb; + return data_length; } /*------------------ Reception helper ------------------*/ diff --git a/src/transport/raweth/tx.c b/src/transport/raweth/tx.c index e48f7640b..4fb0b643d 100644 --- a/src/transport/raweth/tx.c +++ b/src/transport/raweth/tx.c @@ -85,6 +85,16 @@ static _z_zint_t __unsafe_z_raweth_get_sn(_z_transport_multicast_t *ztm, z_relia return sn; } +static void __unsafe_z_raweth_prepare_header(_z_link_t *zl, _z_wbuf_t *wbf) { + _z_raweth_socket_t *resocket = &zl->_socket._raweth; + // Reserve eth header in buffer + if (resocket->_has_vlan) { + _z_wbuf_set_wpos(wbf, sizeof(_zp_eth_vlan_header_t)); + } else { + _z_wbuf_set_wpos(wbf, sizeof(_zp_eth_header_t)); + } +} + /** * This function is unsafe because it operates in potentially concurrent data. * Make sure that the following mutexes are locked before calling this function: @@ -92,26 +102,38 @@ static _z_zint_t __unsafe_z_raweth_get_sn(_z_transport_multicast_t *ztm, z_relia */ static int8_t __unsafe_z_raweth_write_header(_z_link_t *zl, _z_wbuf_t *wbf) { _z_raweth_socket_t *resocket = &zl->_socket._raweth; - + size_t wpos = 0; // Write eth header in buffer if (resocket->_has_vlan) { _zp_eth_vlan_header_t header; + // Save buf position + wpos = _z_wbuf_len(wbf); + _z_wbuf_set_wpos(wbf, 0); + // Set header memset(&header, 0, sizeof(header)); memcpy(&header.dmac, &resocket->_dmac, _ZP_MAC_ADDR_LENGTH); memcpy(&header.smac, &resocket->_smac, _ZP_MAC_ADDR_LENGTH); header.vlan_type = _ZP_ETH_TYPE_VLAN; header.tag = resocket->_vlan; header.ethtype = _ZP_RAWETH_CFG_ETHTYPE; + header.data_length = _z_raweth_htons(wpos - sizeof(header)); // Write header _Z_RETURN_IF_ERR(_z_wbuf_write_bytes(wbf, (uint8_t *)&header, 0, sizeof(header))); } else { _zp_eth_header_t header; + // Save buf position + wpos = _z_wbuf_len(wbf); + _z_wbuf_set_wpos(wbf, 0); + // Set header memcpy(&header.dmac, &resocket->_dmac, _ZP_MAC_ADDR_LENGTH); memcpy(&header.smac, &resocket->_smac, _ZP_MAC_ADDR_LENGTH); header.ethtype = _ZP_RAWETH_CFG_ETHTYPE; + header.data_length = _z_raweth_htons(wpos - sizeof(header)); // Write header _Z_RETURN_IF_ERR(_z_wbuf_write_bytes(wbf, (uint8_t *)&header, 0, sizeof(header))); } + // Restore wpos + _z_wbuf_set_wpos(wbf, wpos); return _Z_RES_OK; } @@ -141,29 +163,17 @@ int8_t _z_raweth_link_send_t_msg(const _z_link_t *zl, const _z_transport_message uint16_t mtu = (zl->_mtu < Z_BATCH_UNICAST_SIZE) ? zl->_mtu : Z_BATCH_UNICAST_SIZE; _z_wbuf_t wbf = _z_wbuf_make(mtu, false); - switch (zl->_cap._flow) { - case Z_LINK_CAP_FLOW_DATAGRAM: - break; - default: - ret = _Z_ERR_GENERIC; - break; - } // Discard const qualifier _z_link_t *mzl = (_z_link_t *)zl; // Set socket info _Z_RETURN_IF_ERR(_zp_raweth_set_socket(NULL, &mzl->_socket._raweth)); - // Write the message header - _Z_RETURN_IF_ERR(__unsafe_z_raweth_write_header(mzl, &wbf)); + // Prepare buff + __unsafe_z_raweth_prepare_header(mzl, &wbf); // Encode the session message ret = _z_transport_message_encode(&wbf, t_msg); if (ret == _Z_RES_OK) { - switch (zl->_cap._flow) { - case Z_LINK_CAP_FLOW_DATAGRAM: - break; - default: - ret = _Z_ERR_GENERIC; - break; - } + // Write the message header + _Z_RETURN_IF_ERR(__unsafe_z_raweth_write_header(mzl, &wbf)); // Send the wbuf on the socket ret = _z_raweth_link_send_wbuf(zl, &wbf); } @@ -183,10 +193,12 @@ int8_t _z_raweth_send_t_msg(_z_transport_multicast_t *ztm, const _z_transport_me _z_wbuf_reset(&ztm->_wbuf); // Set socket info _Z_RETURN_IF_ERR(_zp_raweth_set_socket(NULL, &ztm->_link._socket._raweth)); - // Write the message header - _Z_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf)); + // Prepare buff + __unsafe_z_raweth_prepare_header(&ztm->_link, &ztm->_wbuf); // Encode the session message _Z_RETURN_IF_ERR(_z_transport_message_encode(&ztm->_wbuf, t_msg)); + // Write the message header + _Z_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf)); // Send the wbuf on the socket _Z_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf)); // Mark the session that we have transmitted data @@ -240,8 +252,8 @@ int8_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg, _z_wbuf_reset(&ztm->_wbuf); // Set socket info _Z_RETURN_IF_ERR(_zp_raweth_set_socket(keyexpr, &ztm->_link._socket._raweth)); - // Write the eth header - _Z_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf)); + // Prepare buff + __unsafe_z_raweth_prepare_header(&ztm->_link, &ztm->_wbuf); // Set the frame header _z_zint_t sn = __unsafe_z_raweth_get_sn(ztm, reliability); _z_transport_message_t t_msg = _z_t_msg_make_frame_header(sn, reliability); @@ -250,6 +262,8 @@ int8_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg, // Encode the network message ret = _z_network_message_encode(&ztm->_wbuf, n_msg); if (ret == _Z_RES_OK) { + // Write the eth header + _Z_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf)); // Send the wbuf on the socket _Z_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf)); // Mark the session that we have transmitted data @@ -269,10 +283,12 @@ int8_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg, is_first = false; // Reset wbuf _z_wbuf_reset(&ztm->_wbuf); - // Write the eth header - _Z_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf)); + // Prepare buff + __unsafe_z_raweth_prepare_header(&ztm->_link, &ztm->_wbuf); // Serialize one fragment _Z_RETURN_IF_ERR(__unsafe_z_serialize_zenoh_fragment(&ztm->_wbuf, &fbf, reliability, sn)); + // Write the eth header + _Z_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf)); // Send the wbuf on the socket _Z_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf)); // Mark the session that we have transmitted data From 2df5186cbd834c31ad5281246d48668329180099 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Mon, 11 Dec 2023 16:42:36 +0100 Subject: [PATCH 2/8] fix: replace magic number --- include/zenoh-pico/protocol/definitions/transport.h | 2 ++ src/transport/multicast/tx.c | 2 +- src/transport/raweth/tx.c | 2 +- src/transport/unicast/tx.c | 2 +- 4 files changed, 5 insertions(+), 3 deletions(-) diff --git a/include/zenoh-pico/protocol/definitions/transport.h b/include/zenoh-pico/protocol/definitions/transport.h index f92bd3ae3..3bcb5ce33 100644 --- a/include/zenoh-pico/protocol/definitions/transport.h +++ b/include/zenoh-pico/protocol/definitions/transport.h @@ -477,6 +477,8 @@ typedef struct { } _z_t_msg_fragment_t; void _z_t_msg_fragment_clear(_z_t_msg_fragment_t *msg); +#define _Z_FRAGMENT_HEADER_SIZE 12 + /*------------------ Transport Message ------------------*/ typedef union { _z_t_msg_join_t _join; diff --git a/src/transport/multicast/tx.c b/src/transport/multicast/tx.c index 3d5b18c0d..4b9d034b5 100644 --- a/src/transport/multicast/tx.c +++ b/src/transport/multicast/tx.c @@ -116,7 +116,7 @@ int8_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_m } else { // The message does not fit in the current batch, let's fragment it // Create an expandable wbuf for fragmentation - _z_wbuf_t fbf = _z_wbuf_make(ztm->_wbuf._capacity - 12, true); + _z_wbuf_t fbf = _z_wbuf_make(ztm->_wbuf._capacity - _Z_FRAGMENT_HEADER_SIZE, true); ret = _z_network_message_encode(&fbf, n_msg); // Encode the message on the expandable wbuf if (ret == _Z_RES_OK) { diff --git a/src/transport/raweth/tx.c b/src/transport/raweth/tx.c index 4fb0b643d..65f1231ce 100644 --- a/src/transport/raweth/tx.c +++ b/src/transport/raweth/tx.c @@ -270,7 +270,7 @@ int8_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg, ztm->_transmitted = true; } else { // The message does not fit in the current batch, let's fragment it // Create an expandable wbuf for fragmentation - _z_wbuf_t fbf = _z_wbuf_make(ztm->_wbuf._capacity - 12, true); + _z_wbuf_t fbf = _z_wbuf_make(ztm->_wbuf._capacity - _Z_FRAGMENT_HEADER_SIZE, true); // Encode the message on the expandable wbuf _Z_RETURN_IF_ERR(_z_network_message_encode(&fbf, n_msg)); // Fragment and send the message diff --git a/src/transport/unicast/tx.c b/src/transport/unicast/tx.c index f2ac6feee..25caabee6 100644 --- a/src/transport/unicast/tx.c +++ b/src/transport/unicast/tx.c @@ -125,7 +125,7 @@ int8_t _z_unicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg } else { // The message does not fit in the current batch, let's fragment it // Create an expandable wbuf for fragmentation - _z_wbuf_t fbf = _z_wbuf_make(ztu->_wbuf._capacity - 12, true); + _z_wbuf_t fbf = _z_wbuf_make(ztu->_wbuf._capacity - _Z_FRAGMENT_HEADER_SIZE, true); ret = _z_network_message_encode(&fbf, n_msg); // Encode the message on the expandable wbuf if (ret == _Z_RES_OK) { From 2df748df7c2570674b35b3a5b928c17b5803891e Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Tue, 12 Dec 2023 11:38:53 +0100 Subject: [PATCH 3/8] fix: reset buffer out of if --- src/transport/raweth/tx.c | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/src/transport/raweth/tx.c b/src/transport/raweth/tx.c index 65f1231ce..68f254c2f 100644 --- a/src/transport/raweth/tx.c +++ b/src/transport/raweth/tx.c @@ -102,13 +102,12 @@ static void __unsafe_z_raweth_prepare_header(_z_link_t *zl, _z_wbuf_t *wbf) { */ static int8_t __unsafe_z_raweth_write_header(_z_link_t *zl, _z_wbuf_t *wbf) { _z_raweth_socket_t *resocket = &zl->_socket._raweth; - size_t wpos = 0; + // Save and reset buffer position + size_t wpos = _z_wbuf_len(wbf); + _z_wbuf_set_wpos(wbf, 0); // Write eth header in buffer if (resocket->_has_vlan) { _zp_eth_vlan_header_t header; - // Save buf position - wpos = _z_wbuf_len(wbf); - _z_wbuf_set_wpos(wbf, 0); // Set header memset(&header, 0, sizeof(header)); memcpy(&header.dmac, &resocket->_dmac, _ZP_MAC_ADDR_LENGTH); @@ -121,9 +120,6 @@ static int8_t __unsafe_z_raweth_write_header(_z_link_t *zl, _z_wbuf_t *wbf) { _Z_RETURN_IF_ERR(_z_wbuf_write_bytes(wbf, (uint8_t *)&header, 0, sizeof(header))); } else { _zp_eth_header_t header; - // Save buf position - wpos = _z_wbuf_len(wbf); - _z_wbuf_set_wpos(wbf, 0); // Set header memcpy(&header.dmac, &resocket->_dmac, _ZP_MAC_ADDR_LENGTH); memcpy(&header.smac, &resocket->_smac, _ZP_MAC_ADDR_LENGTH); From ae0196cb0b6eb2cfce13f55f326d5a900802809e Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Wed, 13 Dec 2023 15:11:49 +0100 Subject: [PATCH 4/8] fix: don't memorize first encode return --- src/transport/raweth/tx.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/transport/raweth/tx.c b/src/transport/raweth/tx.c index 68f254c2f..079a5e0b6 100644 --- a/src/transport/raweth/tx.c +++ b/src/transport/raweth/tx.c @@ -256,8 +256,7 @@ int8_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg, // Encode the frame header _Z_RETURN_IF_ERR(_z_transport_message_encode(&ztm->_wbuf, &t_msg)); // Encode the network message - ret = _z_network_message_encode(&ztm->_wbuf, n_msg); - if (ret == _Z_RES_OK) { + if (_z_network_message_encode(&ztm->_wbuf, n_msg) == _Z_RES_OK) { // Write the eth header _Z_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf)); // Send the wbuf on the socket From 1093791576ca6ae946f161097585a9ae878b0a11 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Wed, 13 Dec 2023 15:25:28 +0100 Subject: [PATCH 5/8] feat: increase mtu for eth header --- include/zenoh-pico/system/link/raweth.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/zenoh-pico/system/link/raweth.h b/include/zenoh-pico/system/link/raweth.h index e4360a5df..6ce8ad67e 100644 --- a/include/zenoh-pico/system/link/raweth.h +++ b/include/zenoh-pico/system/link/raweth.h @@ -29,7 +29,7 @@ #define _ZP_MAC_ADDR_LENGTH 6 // Max frame size -#define _ZP_MAX_ETH_FRAME_SIZE 1500 +#define _ZP_MAX_ETH_FRAME_SIZE 1514 // Ethernet header structure type typedef struct { From e4f47535ff2f62dc4d83e7fd9612a4a0ec06b070 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Wed, 13 Dec 2023 15:28:01 +0100 Subject: [PATCH 6/8] fix: flatten tx function --- src/transport/raweth/tx.c | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/transport/raweth/tx.c b/src/transport/raweth/tx.c index 079a5e0b6..6dd030485 100644 --- a/src/transport/raweth/tx.c +++ b/src/transport/raweth/tx.c @@ -166,13 +166,11 @@ int8_t _z_raweth_link_send_t_msg(const _z_link_t *zl, const _z_transport_message // Prepare buff __unsafe_z_raweth_prepare_header(mzl, &wbf); // Encode the session message - ret = _z_transport_message_encode(&wbf, t_msg); - if (ret == _Z_RES_OK) { - // Write the message header - _Z_RETURN_IF_ERR(__unsafe_z_raweth_write_header(mzl, &wbf)); - // Send the wbuf on the socket - ret = _z_raweth_link_send_wbuf(zl, &wbf); - } + _Z_RETURN_IF_ERR(_z_transport_message_encode(&wbf, t_msg)); + // Write the message header + _Z_RETURN_IF_ERR(__unsafe_z_raweth_write_header(mzl, &wbf)); + // Send the wbuf on the socket + ret = _z_raweth_link_send_wbuf(zl, &wbf); _z_wbuf_clear(&wbf); return ret; From f4d680750a22a89a6d91e1b962ab4ff92957209b Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Fri, 15 Dec 2023 11:10:17 +0100 Subject: [PATCH 7/8] feat: add macro for early return with cleaning ops --- include/zenoh-pico/protocol/codec/core.h | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/include/zenoh-pico/protocol/codec/core.h b/include/zenoh-pico/protocol/codec/core.h index 222fb5922..13725f8d9 100644 --- a/include/zenoh-pico/protocol/codec/core.h +++ b/include/zenoh-pico/protocol/codec/core.h @@ -31,6 +31,15 @@ } \ } +#define _Z_CLEAN_RETURN_IF_ERR(base_expr, clean_expr) \ + { \ + int8_t __res = base_expr; \ + if (__res != _Z_RES_OK) { \ + clean_expr; \ + return __res; \ + } \ + } + /*------------------ Internal Zenoh-net Macros ------------------*/ int8_t _z_encoding_prefix_encode(_z_wbuf_t *wbf, z_encoding_prefix_t en); int8_t _z_encoding_prefix_decode(z_encoding_prefix_t *en, _z_zbuf_t *zbf); From dae9f8748bdd9f08c3ade7c2c0231c3f4098416a Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Fri, 15 Dec 2023 11:10:41 +0100 Subject: [PATCH 8/8] fix: free mutex when early return --- src/transport/raweth/tx.c | 36 +++++++++++++++++++++++------------- 1 file changed, 23 insertions(+), 13 deletions(-) diff --git a/src/transport/raweth/tx.c b/src/transport/raweth/tx.c index 6dd030485..2f9ca8ed5 100644 --- a/src/transport/raweth/tx.c +++ b/src/transport/raweth/tx.c @@ -31,7 +31,13 @@ #if Z_FEATURE_RAWETH_TRANSPORT == 1 -int8_t _zp_raweth_set_socket(const _z_keyexpr_t *keyexpr, _z_raweth_socket_t *sock) { +#if Z_FEATURE_MULTI_THREAD == 1 +static void _zp_raweth_unlock_tx_mutex(_z_transport_multicast_t *ztm) { _z_mutex_unlock(&ztm->_mutex_tx); } +#else +static void _zp_raweth_unlock_tx_mutex(_z_transport_multicast_t *ztm) { _ZP_UNUSED(ztm); } +#endif + +static int8_t _zp_raweth_set_socket(const _z_keyexpr_t *keyexpr, _z_raweth_socket_t *sock) { int8_t ret = _Z_RES_OK; if (_ZP_RAWETH_CFG_SIZE < 1) { @@ -186,15 +192,15 @@ int8_t _z_raweth_send_t_msg(_z_transport_multicast_t *ztm, const _z_transport_me // Reset wbuf _z_wbuf_reset(&ztm->_wbuf); // Set socket info - _Z_RETURN_IF_ERR(_zp_raweth_set_socket(NULL, &ztm->_link._socket._raweth)); + _Z_CLEAN_RETURN_IF_ERR(_zp_raweth_set_socket(NULL, &ztm->_link._socket._raweth), _zp_raweth_unlock_tx_mutex(ztm)); // Prepare buff __unsafe_z_raweth_prepare_header(&ztm->_link, &ztm->_wbuf); // Encode the session message - _Z_RETURN_IF_ERR(_z_transport_message_encode(&ztm->_wbuf, t_msg)); + _Z_CLEAN_RETURN_IF_ERR(_z_transport_message_encode(&ztm->_wbuf, t_msg), _zp_raweth_unlock_tx_mutex(ztm)); // Write the message header - _Z_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf)); + _Z_CLEAN_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf), _zp_raweth_unlock_tx_mutex(ztm)); // Send the wbuf on the socket - _Z_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf)); + _Z_CLEAN_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf), _zp_raweth_unlock_tx_mutex(ztm)); // Mark the session that we have transmitted data ztm->_transmitted = true; @@ -245,27 +251,29 @@ int8_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg, // Reset wbuf _z_wbuf_reset(&ztm->_wbuf); // Set socket info - _Z_RETURN_IF_ERR(_zp_raweth_set_socket(keyexpr, &ztm->_link._socket._raweth)); + _Z_CLEAN_RETURN_IF_ERR(_zp_raweth_set_socket(keyexpr, &ztm->_link._socket._raweth), + _zp_raweth_unlock_tx_mutex(ztm)); // Prepare buff __unsafe_z_raweth_prepare_header(&ztm->_link, &ztm->_wbuf); // Set the frame header _z_zint_t sn = __unsafe_z_raweth_get_sn(ztm, reliability); _z_transport_message_t t_msg = _z_t_msg_make_frame_header(sn, reliability); // Encode the frame header - _Z_RETURN_IF_ERR(_z_transport_message_encode(&ztm->_wbuf, &t_msg)); + _Z_CLEAN_RETURN_IF_ERR(_z_transport_message_encode(&ztm->_wbuf, &t_msg), _zp_raweth_unlock_tx_mutex(ztm)); // Encode the network message if (_z_network_message_encode(&ztm->_wbuf, n_msg) == _Z_RES_OK) { // Write the eth header - _Z_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf)); + _Z_CLEAN_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf), + _zp_raweth_unlock_tx_mutex(ztm)); // Send the wbuf on the socket - _Z_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf)); + _Z_CLEAN_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf), _zp_raweth_unlock_tx_mutex(ztm)); // Mark the session that we have transmitted data ztm->_transmitted = true; } else { // The message does not fit in the current batch, let's fragment it // Create an expandable wbuf for fragmentation _z_wbuf_t fbf = _z_wbuf_make(ztm->_wbuf._capacity - _Z_FRAGMENT_HEADER_SIZE, true); // Encode the message on the expandable wbuf - _Z_RETURN_IF_ERR(_z_network_message_encode(&fbf, n_msg)); + _Z_CLEAN_RETURN_IF_ERR(_z_network_message_encode(&fbf, n_msg), _zp_raweth_unlock_tx_mutex(ztm)); // Fragment and send the message _Bool is_first = true; while (_z_wbuf_len(&fbf) > 0) { @@ -279,11 +287,13 @@ int8_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg, // Prepare buff __unsafe_z_raweth_prepare_header(&ztm->_link, &ztm->_wbuf); // Serialize one fragment - _Z_RETURN_IF_ERR(__unsafe_z_serialize_zenoh_fragment(&ztm->_wbuf, &fbf, reliability, sn)); + _Z_CLEAN_RETURN_IF_ERR(__unsafe_z_serialize_zenoh_fragment(&ztm->_wbuf, &fbf, reliability, sn), + _zp_raweth_unlock_tx_mutex(ztm)); // Write the eth header - _Z_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf)); + _Z_CLEAN_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf), + _zp_raweth_unlock_tx_mutex(ztm)); // Send the wbuf on the socket - _Z_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf)); + _Z_CLEAN_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf), _zp_raweth_unlock_tx_mutex(ztm)); // Mark the session that we have transmitted data ztm->_transmitted = true; }