diff --git a/include/zenoh-pico/protocol/codec/core.h b/include/zenoh-pico/protocol/codec/core.h index 222fb5922..13725f8d9 100644 --- a/include/zenoh-pico/protocol/codec/core.h +++ b/include/zenoh-pico/protocol/codec/core.h @@ -31,6 +31,15 @@ } \ } +#define _Z_CLEAN_RETURN_IF_ERR(base_expr, clean_expr) \ + { \ + int8_t __res = base_expr; \ + if (__res != _Z_RES_OK) { \ + clean_expr; \ + return __res; \ + } \ + } + /*------------------ Internal Zenoh-net Macros ------------------*/ int8_t _z_encoding_prefix_encode(_z_wbuf_t *wbf, z_encoding_prefix_t en); int8_t _z_encoding_prefix_decode(z_encoding_prefix_t *en, _z_zbuf_t *zbf); diff --git a/include/zenoh-pico/protocol/definitions/transport.h b/include/zenoh-pico/protocol/definitions/transport.h index f92bd3ae3..3bcb5ce33 100644 --- a/include/zenoh-pico/protocol/definitions/transport.h +++ b/include/zenoh-pico/protocol/definitions/transport.h @@ -477,6 +477,8 @@ typedef struct { } _z_t_msg_fragment_t; void _z_t_msg_fragment_clear(_z_t_msg_fragment_t *msg); +#define _Z_FRAGMENT_HEADER_SIZE 12 + /*------------------ Transport Message ------------------*/ typedef union { _z_t_msg_join_t _join; diff --git a/include/zenoh-pico/system/link/raweth.h b/include/zenoh-pico/system/link/raweth.h index d2009144e..6ce8ad67e 100644 --- a/include/zenoh-pico/system/link/raweth.h +++ b/include/zenoh-pico/system/link/raweth.h @@ -29,13 +29,14 @@ #define _ZP_MAC_ADDR_LENGTH 6 // Max frame size -#define _ZP_MAX_ETH_FRAME_SIZE 1500 +#define _ZP_MAX_ETH_FRAME_SIZE 1514 // Ethernet header structure type typedef struct { uint8_t dmac[_ZP_MAC_ADDR_LENGTH]; // Destination mac address uint8_t smac[_ZP_MAC_ADDR_LENGTH]; // Source mac address uint16_t ethtype; // Ethertype of frame + uint16_t data_length; // Payload length } _zp_eth_header_t; typedef struct { @@ -44,6 +45,7 @@ typedef struct { uint16_t vlan_type; // Vlan ethtype uint16_t tag; // Vlan tag uint16_t ethtype; // Ethertype of frame + uint16_t data_length; // Payload length } _zp_eth_vlan_header_t; typedef struct { @@ -61,6 +63,8 @@ int8_t _z_open_raweth(_z_sys_net_socket_t *sock, const char *interface); size_t _z_send_raweth(const _z_sys_net_socket_t *sock, const void *buff, size_t buff_len); size_t _z_receive_raweth(const _z_sys_net_socket_t *sock, void *buff, size_t buff_len, _z_bytes_t *addr); int8_t _z_close_raweth(_z_sys_net_socket_t *sock); +size_t _z_raweth_ntohs(size_t val); +size_t _z_raweth_htons(size_t val); #endif diff --git a/src/system/unix/link/raweth.c b/src/system/unix/link/raweth.c index c64376594..78a8aee92 100644 --- a/src/system/unix/link/raweth.c +++ b/src/system/unix/link/raweth.c @@ -116,5 +116,9 @@ size_t _z_receive_raweth(const _z_sys_net_socket_t *sock, void *buff, size_t buf return bytesRead; } +size_t _z_raweth_ntohs(size_t val) { return ntohs(val); } + +size_t _z_raweth_htons(size_t val) { return htons(val); } + #endif // defined(__linux) #endif // Z_FEATURE_RAWETH_TRANSPORT == 1 diff --git a/src/transport/multicast/tx.c b/src/transport/multicast/tx.c index 3d5b18c0d..4b9d034b5 100644 --- a/src/transport/multicast/tx.c +++ b/src/transport/multicast/tx.c @@ -116,7 +116,7 @@ int8_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_m } else { // The message does not fit in the current batch, let's fragment it // Create an expandable wbuf for fragmentation - _z_wbuf_t fbf = _z_wbuf_make(ztm->_wbuf._capacity - 12, true); + _z_wbuf_t fbf = _z_wbuf_make(ztm->_wbuf._capacity - _Z_FRAGMENT_HEADER_SIZE, true); ret = _z_network_message_encode(&fbf, n_msg); // Encode the message on the expandable wbuf if (ret == _Z_RES_OK) { diff --git a/src/transport/raweth/rx.c b/src/transport/raweth/rx.c index 1709ed67f..6deec0839 100644 --- a/src/transport/raweth/rx.c +++ b/src/transport/raweth/rx.c @@ -27,10 +27,6 @@ #if Z_FEATURE_RAWETH_TRANSPORT == 1 -void print_buf(_z_zbuf_t *buf) { - printf("Buff info: %ld, %ld, %ld\n", buf->_ios._r_pos, buf->_ios._w_pos, buf->_ios._capacity); -} - static size_t _z_raweth_link_recv_zbuf(const _z_link_t *link, _z_zbuf_t *zbf, _z_bytes_t *addr) { uint8_t *buff = _z_zbuf_get_wptr(zbf); size_t rb = _z_receive_raweth(&link->_socket._raweth._sock, buff, _z_zbuf_space_left(zbf), addr); @@ -48,14 +44,31 @@ static size_t _z_raweth_link_recv_zbuf(const _z_link_t *link, _z_zbuf_t *zbf, _z if (has_vlan && (rb < sizeof(_zp_eth_vlan_header_t))) { return SIZE_MAX; } - // Update buffer but skip eth header - _z_zbuf_set_wpos(zbf, _z_zbuf_get_wpos(zbf) + rb); + size_t data_length = 0; if (has_vlan) { + _zp_eth_vlan_header_t *header = (_zp_eth_vlan_header_t *)buff; + // Retrieve data length + data_length = _z_raweth_ntohs(header->data_length); + if (rb < (data_length + sizeof(_zp_eth_vlan_header_t))) { + // Invalid data_length + return SIZE_MAX; + } + // Skip header + _z_zbuf_set_wpos(zbf, _z_zbuf_get_wpos(zbf) + sizeof(_zp_eth_vlan_header_t) + data_length); _z_zbuf_set_rpos(zbf, _z_zbuf_get_rpos(zbf) + sizeof(_zp_eth_vlan_header_t)); } else { + _zp_eth_header_t *header = (_zp_eth_header_t *)buff; + // Retrieve data length + data_length = _z_raweth_ntohs(header->data_length); + if (rb < (data_length + sizeof(_zp_eth_header_t))) { + // Invalid data_length + return SIZE_MAX; + } + // Skip header + _z_zbuf_set_wpos(zbf, _z_zbuf_get_wpos(zbf) + sizeof(_zp_eth_header_t) + data_length); _z_zbuf_set_rpos(zbf, _z_zbuf_get_rpos(zbf) + sizeof(_zp_eth_header_t)); } - return rb; + return data_length; } /*------------------ Reception helper ------------------*/ diff --git a/src/transport/raweth/tx.c b/src/transport/raweth/tx.c index e48f7640b..2f9ca8ed5 100644 --- a/src/transport/raweth/tx.c +++ b/src/transport/raweth/tx.c @@ -31,7 +31,13 @@ #if Z_FEATURE_RAWETH_TRANSPORT == 1 -int8_t _zp_raweth_set_socket(const _z_keyexpr_t *keyexpr, _z_raweth_socket_t *sock) { +#if Z_FEATURE_MULTI_THREAD == 1 +static void _zp_raweth_unlock_tx_mutex(_z_transport_multicast_t *ztm) { _z_mutex_unlock(&ztm->_mutex_tx); } +#else +static void _zp_raweth_unlock_tx_mutex(_z_transport_multicast_t *ztm) { _ZP_UNUSED(ztm); } +#endif + +static int8_t _zp_raweth_set_socket(const _z_keyexpr_t *keyexpr, _z_raweth_socket_t *sock) { int8_t ret = _Z_RES_OK; if (_ZP_RAWETH_CFG_SIZE < 1) { @@ -85,6 +91,16 @@ static _z_zint_t __unsafe_z_raweth_get_sn(_z_transport_multicast_t *ztm, z_relia return sn; } +static void __unsafe_z_raweth_prepare_header(_z_link_t *zl, _z_wbuf_t *wbf) { + _z_raweth_socket_t *resocket = &zl->_socket._raweth; + // Reserve eth header in buffer + if (resocket->_has_vlan) { + _z_wbuf_set_wpos(wbf, sizeof(_zp_eth_vlan_header_t)); + } else { + _z_wbuf_set_wpos(wbf, sizeof(_zp_eth_header_t)); + } +} + /** * This function is unsafe because it operates in potentially concurrent data. * Make sure that the following mutexes are locked before calling this function: @@ -92,26 +108,34 @@ static _z_zint_t __unsafe_z_raweth_get_sn(_z_transport_multicast_t *ztm, z_relia */ static int8_t __unsafe_z_raweth_write_header(_z_link_t *zl, _z_wbuf_t *wbf) { _z_raweth_socket_t *resocket = &zl->_socket._raweth; - + // Save and reset buffer position + size_t wpos = _z_wbuf_len(wbf); + _z_wbuf_set_wpos(wbf, 0); // Write eth header in buffer if (resocket->_has_vlan) { _zp_eth_vlan_header_t header; + // Set header memset(&header, 0, sizeof(header)); memcpy(&header.dmac, &resocket->_dmac, _ZP_MAC_ADDR_LENGTH); memcpy(&header.smac, &resocket->_smac, _ZP_MAC_ADDR_LENGTH); header.vlan_type = _ZP_ETH_TYPE_VLAN; header.tag = resocket->_vlan; header.ethtype = _ZP_RAWETH_CFG_ETHTYPE; + header.data_length = _z_raweth_htons(wpos - sizeof(header)); // Write header _Z_RETURN_IF_ERR(_z_wbuf_write_bytes(wbf, (uint8_t *)&header, 0, sizeof(header))); } else { _zp_eth_header_t header; + // Set header memcpy(&header.dmac, &resocket->_dmac, _ZP_MAC_ADDR_LENGTH); memcpy(&header.smac, &resocket->_smac, _ZP_MAC_ADDR_LENGTH); header.ethtype = _ZP_RAWETH_CFG_ETHTYPE; + header.data_length = _z_raweth_htons(wpos - sizeof(header)); // Write header _Z_RETURN_IF_ERR(_z_wbuf_write_bytes(wbf, (uint8_t *)&header, 0, sizeof(header))); } + // Restore wpos + _z_wbuf_set_wpos(wbf, wpos); return _Z_RES_OK; } @@ -141,32 +165,18 @@ int8_t _z_raweth_link_send_t_msg(const _z_link_t *zl, const _z_transport_message uint16_t mtu = (zl->_mtu < Z_BATCH_UNICAST_SIZE) ? zl->_mtu : Z_BATCH_UNICAST_SIZE; _z_wbuf_t wbf = _z_wbuf_make(mtu, false); - switch (zl->_cap._flow) { - case Z_LINK_CAP_FLOW_DATAGRAM: - break; - default: - ret = _Z_ERR_GENERIC; - break; - } // Discard const qualifier _z_link_t *mzl = (_z_link_t *)zl; // Set socket info _Z_RETURN_IF_ERR(_zp_raweth_set_socket(NULL, &mzl->_socket._raweth)); + // Prepare buff + __unsafe_z_raweth_prepare_header(mzl, &wbf); + // Encode the session message + _Z_RETURN_IF_ERR(_z_transport_message_encode(&wbf, t_msg)); // Write the message header _Z_RETURN_IF_ERR(__unsafe_z_raweth_write_header(mzl, &wbf)); - // Encode the session message - ret = _z_transport_message_encode(&wbf, t_msg); - if (ret == _Z_RES_OK) { - switch (zl->_cap._flow) { - case Z_LINK_CAP_FLOW_DATAGRAM: - break; - default: - ret = _Z_ERR_GENERIC; - break; - } - // Send the wbuf on the socket - ret = _z_raweth_link_send_wbuf(zl, &wbf); - } + // Send the wbuf on the socket + ret = _z_raweth_link_send_wbuf(zl, &wbf); _z_wbuf_clear(&wbf); return ret; @@ -182,13 +192,15 @@ int8_t _z_raweth_send_t_msg(_z_transport_multicast_t *ztm, const _z_transport_me // Reset wbuf _z_wbuf_reset(&ztm->_wbuf); // Set socket info - _Z_RETURN_IF_ERR(_zp_raweth_set_socket(NULL, &ztm->_link._socket._raweth)); - // Write the message header - _Z_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf)); + _Z_CLEAN_RETURN_IF_ERR(_zp_raweth_set_socket(NULL, &ztm->_link._socket._raweth), _zp_raweth_unlock_tx_mutex(ztm)); + // Prepare buff + __unsafe_z_raweth_prepare_header(&ztm->_link, &ztm->_wbuf); // Encode the session message - _Z_RETURN_IF_ERR(_z_transport_message_encode(&ztm->_wbuf, t_msg)); + _Z_CLEAN_RETURN_IF_ERR(_z_transport_message_encode(&ztm->_wbuf, t_msg), _zp_raweth_unlock_tx_mutex(ztm)); + // Write the message header + _Z_CLEAN_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf), _zp_raweth_unlock_tx_mutex(ztm)); // Send the wbuf on the socket - _Z_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf)); + _Z_CLEAN_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf), _zp_raweth_unlock_tx_mutex(ztm)); // Mark the session that we have transmitted data ztm->_transmitted = true; @@ -239,26 +251,29 @@ int8_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg, // Reset wbuf _z_wbuf_reset(&ztm->_wbuf); // Set socket info - _Z_RETURN_IF_ERR(_zp_raweth_set_socket(keyexpr, &ztm->_link._socket._raweth)); - // Write the eth header - _Z_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf)); + _Z_CLEAN_RETURN_IF_ERR(_zp_raweth_set_socket(keyexpr, &ztm->_link._socket._raweth), + _zp_raweth_unlock_tx_mutex(ztm)); + // Prepare buff + __unsafe_z_raweth_prepare_header(&ztm->_link, &ztm->_wbuf); // Set the frame header _z_zint_t sn = __unsafe_z_raweth_get_sn(ztm, reliability); _z_transport_message_t t_msg = _z_t_msg_make_frame_header(sn, reliability); // Encode the frame header - _Z_RETURN_IF_ERR(_z_transport_message_encode(&ztm->_wbuf, &t_msg)); + _Z_CLEAN_RETURN_IF_ERR(_z_transport_message_encode(&ztm->_wbuf, &t_msg), _zp_raweth_unlock_tx_mutex(ztm)); // Encode the network message - ret = _z_network_message_encode(&ztm->_wbuf, n_msg); - if (ret == _Z_RES_OK) { + if (_z_network_message_encode(&ztm->_wbuf, n_msg) == _Z_RES_OK) { + // Write the eth header + _Z_CLEAN_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf), + _zp_raweth_unlock_tx_mutex(ztm)); // Send the wbuf on the socket - _Z_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf)); + _Z_CLEAN_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf), _zp_raweth_unlock_tx_mutex(ztm)); // Mark the session that we have transmitted data ztm->_transmitted = true; } else { // The message does not fit in the current batch, let's fragment it // Create an expandable wbuf for fragmentation - _z_wbuf_t fbf = _z_wbuf_make(ztm->_wbuf._capacity - 12, true); + _z_wbuf_t fbf = _z_wbuf_make(ztm->_wbuf._capacity - _Z_FRAGMENT_HEADER_SIZE, true); // Encode the message on the expandable wbuf - _Z_RETURN_IF_ERR(_z_network_message_encode(&fbf, n_msg)); + _Z_CLEAN_RETURN_IF_ERR(_z_network_message_encode(&fbf, n_msg), _zp_raweth_unlock_tx_mutex(ztm)); // Fragment and send the message _Bool is_first = true; while (_z_wbuf_len(&fbf) > 0) { @@ -269,12 +284,16 @@ int8_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg, is_first = false; // Reset wbuf _z_wbuf_reset(&ztm->_wbuf); - // Write the eth header - _Z_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf)); + // Prepare buff + __unsafe_z_raweth_prepare_header(&ztm->_link, &ztm->_wbuf); // Serialize one fragment - _Z_RETURN_IF_ERR(__unsafe_z_serialize_zenoh_fragment(&ztm->_wbuf, &fbf, reliability, sn)); + _Z_CLEAN_RETURN_IF_ERR(__unsafe_z_serialize_zenoh_fragment(&ztm->_wbuf, &fbf, reliability, sn), + _zp_raweth_unlock_tx_mutex(ztm)); + // Write the eth header + _Z_CLEAN_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf), + _zp_raweth_unlock_tx_mutex(ztm)); // Send the wbuf on the socket - _Z_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf)); + _Z_CLEAN_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf), _zp_raweth_unlock_tx_mutex(ztm)); // Mark the session that we have transmitted data ztm->_transmitted = true; } diff --git a/src/transport/unicast/tx.c b/src/transport/unicast/tx.c index f2ac6feee..25caabee6 100644 --- a/src/transport/unicast/tx.c +++ b/src/transport/unicast/tx.c @@ -125,7 +125,7 @@ int8_t _z_unicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg } else { // The message does not fit in the current batch, let's fragment it // Create an expandable wbuf for fragmentation - _z_wbuf_t fbf = _z_wbuf_make(ztu->_wbuf._capacity - 12, true); + _z_wbuf_t fbf = _z_wbuf_make(ztu->_wbuf._capacity - _Z_FRAGMENT_HEADER_SIZE, true); ret = _z_network_message_encode(&fbf, n_msg); // Encode the message on the expandable wbuf if (ret == _Z_RES_OK) {