Skip to content

Commit

Permalink
feat: update transport files
Browse files Browse the repository at this point in the history
  • Loading branch information
jean-roland committed Nov 17, 2023
1 parent b29b95f commit 6e7a3c9
Show file tree
Hide file tree
Showing 11 changed files with 52 additions and 89 deletions.
4 changes: 2 additions & 2 deletions include/zenoh-pico/transport/common/tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
#include "zenoh-pico/net/session.h"
#include "zenoh-pico/transport/transport.h"

void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, uint8_t link_capabilities);
void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, uint8_t link_capabilities);
void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, _z_link_cap_flow_t flow);
void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, _z_link_cap_flow_t flow);
/*This function is unsafe because it operates in potentially concurrent
data.*Make sure that the following mutexes are locked before calling this function : *-ztu->mutex_tx */
int8_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn);
Expand Down
10 changes: 3 additions & 7 deletions src/transport/common/rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,8 @@ int8_t _z_link_recv_t_msg(_z_transport_message_t *t_msg, const _z_link_t *zl) {
_z_zbuf_t zbf = _z_zbuf_make(Z_BATCH_UNICAST_SIZE);
_z_zbuf_reset(&zbf);

switch (zl->_capabilities) {
// Stream capable links
case Z_LINK_CAP_UNICAST_STREAM:
case Z_LINK_CAP_MULTICAST_STREAM:
switch (zl->_cap._flow) {
case Z_LINK_CAP_FLOW_STREAM:
// Read the message length
if (_z_link_recv_exact_zbuf(zl, &zbf, _Z_MSG_LEN_ENC_SIZE, NULL) == _Z_MSG_LEN_ENC_SIZE) {
size_t len = 0;
Expand All @@ -52,9 +50,7 @@ int8_t _z_link_recv_t_msg(_z_transport_message_t *t_msg, const _z_link_t *zl) {
ret = _Z_ERR_TRANSPORT_RX_FAILED;
}
break;
// Datagram capable links
case Z_LINK_CAP_UNICAST_DATAGRAM:
case Z_LINK_CAP_MULTICAST_DATAGRAM:
case Z_LINK_CAP_FLOW_DATAGRAM:
if (_z_link_recv_zbuf(zl, &zbf, NULL) == SIZE_MAX) {
ret = _Z_ERR_TRANSPORT_RX_FAILED;
}
Expand Down
40 changes: 14 additions & 26 deletions src/transport/common/tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,19 @@
* Make sure that the following mutexes are locked before calling this function:
* - ztu->mutex_tx
*/
void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, uint8_t link_capabilities) {
void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, _z_link_cap_flow_t flow) {
_z_wbuf_reset(buf);

switch (link_capabilities) {
switch (flow) {
// Stream capable links
case Z_LINK_CAP_UNICAST_STREAM:
case Z_LINK_CAP_MULTICAST_STREAM:
case Z_LINK_CAP_FLOW_STREAM:
for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) {
_z_wbuf_put(buf, 0, i);
}
_z_wbuf_set_wpos(buf, _Z_MSG_LEN_ENC_SIZE);
break;
// Datagram capable links
case Z_LINK_CAP_UNICAST_DATAGRAM:
case Z_LINK_CAP_MULTICAST_DATAGRAM:
case Z_LINK_CAP_FLOW_DATAGRAM:
default:
break;
}
Expand All @@ -52,20 +50,18 @@ void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, uint8_t link_capabilities) {
* Make sure that the following mutexes are locked before calling this function:
* - ztu->mutex_tx
*/
void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, uint8_t link_capabilities) {
switch (link_capabilities) {
void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, _z_link_cap_flow_t flow) {
switch (flow) {
// Stream capable links
case Z_LINK_CAP_UNICAST_STREAM:
case Z_LINK_CAP_MULTICAST_STREAM: {
case Z_LINK_CAP_FLOW_STREAM: {
size_t len = _z_wbuf_len(buf) - _Z_MSG_LEN_ENC_SIZE;
for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) {
_z_wbuf_put(buf, (uint8_t)((len >> (uint8_t)8 * i) & (uint8_t)0xFF), i);
}
break;
}
// Datagram capable links
case Z_LINK_CAP_UNICAST_DATAGRAM:
case Z_LINK_CAP_MULTICAST_DATAGRAM:
case Z_LINK_CAP_FLOW_DATAGRAM:
default:
break;
}
Expand Down Expand Up @@ -94,18 +90,14 @@ int8_t _z_link_send_t_msg(const _z_link_t *zl, const _z_transport_message_t *t_m
uint16_t mtu = (zl->_mtu < Z_BATCH_UNICAST_SIZE) ? zl->_mtu : Z_BATCH_UNICAST_SIZE;
_z_wbuf_t wbf = _z_wbuf_make(mtu, false);

switch (zl->_capabilities) {
// Stream capable links
case Z_LINK_CAP_UNICAST_STREAM:
case Z_LINK_CAP_MULTICAST_STREAM:
switch (zl->_cap._flow) {
case Z_LINK_CAP_FLOW_STREAM:
for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) {
_z_wbuf_put(&wbf, 0, i);
}
_z_wbuf_set_wpos(&wbf, _Z_MSG_LEN_ENC_SIZE);
break;
// Datagram capable links
case Z_LINK_CAP_UNICAST_DATAGRAM:
case Z_LINK_CAP_MULTICAST_DATAGRAM:
case Z_LINK_CAP_FLOW_DATAGRAM:
break;
default:
ret = _Z_ERR_GENERIC;
Expand All @@ -114,20 +106,16 @@ int8_t _z_link_send_t_msg(const _z_link_t *zl, const _z_transport_message_t *t_m
// Encode the session message
ret = _z_transport_message_encode(&wbf, t_msg);
if (ret == _Z_RES_OK) {
switch (zl->_capabilities) {
// Stream capable links
case Z_LINK_CAP_UNICAST_STREAM:
case Z_LINK_CAP_MULTICAST_STREAM: {
switch (zl->_cap._flow) {
case Z_LINK_CAP_FLOW_STREAM: {
// Write the message length in the reserved space if needed
size_t len = _z_wbuf_len(&wbf) - _Z_MSG_LEN_ENC_SIZE;
for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) {
_z_wbuf_put(&wbf, (uint8_t)((len >> (uint8_t)8 * i) & (uint8_t)0xFF), i);
}
break;
}
// Datagram capable links
case Z_LINK_CAP_UNICAST_DATAGRAM:
case Z_LINK_CAP_MULTICAST_DATAGRAM:
case Z_LINK_CAP_FLOW_DATAGRAM:
break;
default:
ret = _Z_ERR_GENERIC;
Expand Down
18 changes: 6 additions & 12 deletions src/transport/manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,9 @@ int8_t _z_new_transport_client(_z_transport_t *zt, char *locator, _z_id_t *local
return ret;
}
// Open transport
switch (zl._capabilities) {
switch (zl._cap._transport) {
// Unicast transport
case Z_LINK_CAP_UNICAST_STREAM:
case Z_LINK_CAP_UNICAST_DATAGRAM: {
case Z_LINK_CAP_TRANSPORT_UNICAST: {
_z_transport_unicast_establish_param_t tp_param;
ret = _z_unicast_open_client(&tp_param, &zl, local_zid);
if (ret != _Z_RES_OK) {
Expand All @@ -45,8 +44,7 @@ int8_t _z_new_transport_client(_z_transport_t *zt, char *locator, _z_id_t *local
break;
}
// Multicast transport
case Z_LINK_CAP_MULTICAST_STREAM:
case Z_LINK_CAP_MULTICAST_DATAGRAM: {
case Z_LINK_CAP_TRANSPORT_MULTICAST: {
_z_transport_multicast_establish_param_t tp_param;
ret = _z_multicast_open_client(&tp_param, &zl, local_zid);
if (ret != _Z_RES_OK) {
Expand All @@ -73,10 +71,8 @@ int8_t _z_new_transport_peer(_z_transport_t *zt, char *locator, _z_id_t *local_z
if (ret != _Z_RES_OK) {
return ret;
}
switch (zl._capabilities) {
// Unicast capable links
case Z_LINK_CAP_UNICAST_STREAM:
case Z_LINK_CAP_UNICAST_DATAGRAM: {
switch (zl._cap._transport) {
case Z_LINK_CAP_TRANSPORT_UNICAST: {
_z_transport_unicast_establish_param_t tp_param;
ret = _z_unicast_open_peer(&tp_param, &zl, local_zid);
if (ret != _Z_RES_OK) {
Expand All @@ -86,9 +82,7 @@ int8_t _z_new_transport_peer(_z_transport_t *zt, char *locator, _z_id_t *local_z
ret = _z_unicast_transport_create(zt, &zl, &tp_param);
break;
}
// Multicast capable links
case Z_LINK_CAP_MULTICAST_STREAM:
case Z_LINK_CAP_MULTICAST_DATAGRAM: {
case Z_LINK_CAP_TRANSPORT_MULTICAST: {
_z_transport_multicast_establish_param_t tp_param;
ret = _z_multicast_open_peer(&tp_param, &zl, local_zid);
if (ret != _Z_RES_OK) {
Expand Down
10 changes: 3 additions & 7 deletions src/transport/multicast/read.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,8 @@ void *_zp_multicast_read_task(void *ztm_arg) {
// Read bytes from socket to the main buffer
size_t to_read = 0;

switch (ztm->_link._capabilities) {
// Stream capable links
case Z_LINK_CAP_UNICAST_STREAM:
case Z_LINK_CAP_MULTICAST_STREAM:
switch (ztm->_link._cap._flow) {
case Z_LINK_CAP_FLOW_STREAM:
if (_z_zbuf_len(&ztm->_zbuf) < _Z_MSG_LEN_ENC_SIZE) {
_z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, &addr);
if (_z_zbuf_len(&ztm->_zbuf) < _Z_MSG_LEN_ENC_SIZE) {
Expand All @@ -99,9 +97,7 @@ void *_zp_multicast_read_task(void *ztm_arg) {
}
}
break;
// Datagram capable links
case Z_LINK_CAP_UNICAST_DATAGRAM:
case Z_LINK_CAP_MULTICAST_DATAGRAM:
case Z_LINK_CAP_FLOW_DATAGRAM:
_z_zbuf_compact(&ztm->_zbuf);
to_read = _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, &addr);
if (to_read == SIZE_MAX) {
Expand Down
9 changes: 3 additions & 6 deletions src/transport/multicast/rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,8 @@ int8_t _z_multicast_recv_t_msg_na(_z_transport_multicast_t *ztm, _z_transport_me

size_t to_read = 0;
do {
switch (ztm->_link._capabilities) {
// Stream capable links
case Z_LINK_CAP_UNICAST_STREAM:
case Z_LINK_CAP_MULTICAST_STREAM:
switch (ztm->_link._cap._flow) {
case Z_LINK_CAP_FLOW_STREAM:
if (_z_zbuf_len(&ztm->_zbuf) < _Z_MSG_LEN_ENC_SIZE) {
_z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, addr);
if (_z_zbuf_len(&ztm->_zbuf) < _Z_MSG_LEN_ENC_SIZE) {
Expand All @@ -85,8 +83,7 @@ int8_t _z_multicast_recv_t_msg_na(_z_transport_multicast_t *ztm, _z_transport_me
}
break;
// Datagram capable links
case Z_LINK_CAP_UNICAST_DATAGRAM:
case Z_LINK_CAP_MULTICAST_DATAGRAM:
case Z_LINK_CAP_FLOW_DATAGRAM:
_z_zbuf_compact(&ztm->_zbuf);
to_read = _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, addr);
if (to_read == SIZE_MAX) {
Expand Down
12 changes: 6 additions & 6 deletions src/transport/multicast/tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@ int8_t _z_multicast_send_t_msg(_z_transport_multicast_t *ztm, const _z_transport
#endif // Z_FEATURE_MULTI_THREAD == 1

// Prepare the buffer eventually reserving space for the message length
__unsafe_z_prepare_wbuf(&ztm->_wbuf, ztm->_link._capabilities);
__unsafe_z_prepare_wbuf(&ztm->_wbuf, ztm->_link._cap._flow);

// Encode the session message
ret = _z_transport_message_encode(&ztm->_wbuf, t_msg);
if (ret == _Z_RES_OK) {
// Write the message length in the reserved space if needed
__unsafe_z_finalize_wbuf(&ztm->_wbuf, ztm->_link._capabilities);
__unsafe_z_finalize_wbuf(&ztm->_wbuf, ztm->_link._cap._flow);
// Send the wbuf on the socket
ret = _z_link_send_wbuf(&ztm->_link, &ztm->_wbuf);
if (ret == _Z_RES_OK) {
Expand Down Expand Up @@ -97,7 +97,7 @@ int8_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_m

if (drop == false) {
// Prepare the buffer eventually reserving space for the message length
__unsafe_z_prepare_wbuf(&ztm->_wbuf, ztm->_link._capabilities);
__unsafe_z_prepare_wbuf(&ztm->_wbuf, ztm->_link._cap._flow);

_z_zint_t sn = __unsafe_z_multicast_get_sn(ztm, reliability); // Get the next sequence number

Expand All @@ -107,7 +107,7 @@ int8_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_m
ret = _z_network_message_encode(&ztm->_wbuf, n_msg); // Encode the network message
if (ret == _Z_RES_OK) {
// Write the message length in the reserved space if needed
__unsafe_z_finalize_wbuf(&ztm->_wbuf, ztm->_link._capabilities);
__unsafe_z_finalize_wbuf(&ztm->_wbuf, ztm->_link._cap._flow);

ret = _z_link_send_wbuf(&ztm->_link, &ztm->_wbuf); // Send the wbuf on the socket
if (ret == _Z_RES_OK) {
Expand All @@ -128,13 +128,13 @@ int8_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_m
is_first = false;

// Clear the buffer for serialization
__unsafe_z_prepare_wbuf(&ztm->_wbuf, ztm->_link._capabilities);
__unsafe_z_prepare_wbuf(&ztm->_wbuf, ztm->_link._cap._flow);

// Serialize one fragment
ret = __unsafe_z_serialize_zenoh_fragment(&ztm->_wbuf, &fbf, reliability, sn);
if (ret == _Z_RES_OK) {
// Write the message length in the reserved space if needed
__unsafe_z_finalize_wbuf(&ztm->_wbuf, ztm->_link._capabilities);
__unsafe_z_finalize_wbuf(&ztm->_wbuf, ztm->_link._cap._flow);

ret = _z_link_send_wbuf(&ztm->_link, &ztm->_wbuf); // Send the wbuf on the socket
if (ret == _Z_RES_OK) {
Expand Down
10 changes: 3 additions & 7 deletions src/transport/unicast/read.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,8 @@ void *_zp_unicast_read_task(void *ztu_arg) {
while (ztu->_read_task_running == true) {
// Read bytes from socket to the main buffer
size_t to_read = 0;
switch (ztu->_link._capabilities) {
// Stream capable links
case Z_LINK_CAP_UNICAST_STREAM:
case Z_LINK_CAP_MULTICAST_STREAM:
switch (ztu->_link._cap._flow) {
case Z_LINK_CAP_FLOW_STREAM:
if (_z_zbuf_len(&ztu->_zbuf) < _Z_MSG_LEN_ENC_SIZE) {
_z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL);
if (_z_zbuf_len(&ztu->_zbuf) < _Z_MSG_LEN_ENC_SIZE) {
Expand All @@ -93,9 +91,7 @@ void *_zp_unicast_read_task(void *ztu_arg) {
}
}
break;
// Datagram capable links
case Z_LINK_CAP_UNICAST_DATAGRAM:
case Z_LINK_CAP_MULTICAST_DATAGRAM:
case Z_LINK_CAP_FLOW_DATAGRAM:
_z_zbuf_compact(&ztu->_zbuf);
to_read = _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL);
if (to_read == SIZE_MAX) {
Expand Down
8 changes: 3 additions & 5 deletions src/transport/unicast/rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,9 @@ int8_t _z_unicast_recv_t_msg_na(_z_transport_unicast_t *ztu, _z_transport_messag

size_t to_read = 0;
do {
switch (ztu->_link._capabilities) {
switch (ztu->_link._cap._flow) {
// Stream capable links
case Z_LINK_CAP_UNICAST_STREAM:
case Z_LINK_CAP_MULTICAST_STREAM:
case Z_LINK_CAP_FLOW_STREAM:
if (_z_zbuf_len(&ztu->_zbuf) < _Z_MSG_LEN_ENC_SIZE) {
_z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL);
if (_z_zbuf_len(&ztu->_zbuf) < _Z_MSG_LEN_ENC_SIZE) {
Expand All @@ -63,8 +62,7 @@ int8_t _z_unicast_recv_t_msg_na(_z_transport_unicast_t *ztu, _z_transport_messag
}
break;
// Datagram capable links
case Z_LINK_CAP_UNICAST_DATAGRAM:
case Z_LINK_CAP_MULTICAST_DATAGRAM:
case Z_LINK_CAP_FLOW_DATAGRAM:
_z_zbuf_compact(&ztu->_zbuf);
to_read = _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL);
if (to_read == SIZE_MAX) {
Expand Down
8 changes: 3 additions & 5 deletions src/transport/unicast/transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,11 @@ int8_t _z_unicast_transport_create(_z_transport_t *zt, _z_link_t *zl, _z_transpo
size_t dbuf_size = 0;
_Bool expandable = false;

switch (zl->_capabilities) {
case Z_LINK_CAP_UNICAST_STREAM:
case Z_LINK_CAP_MULTICAST_STREAM:
switch (zl->_cap._flow) {
case Z_LINK_CAP_FLOW_STREAM:
expandable = true;
break;
case Z_LINK_CAP_UNICAST_DATAGRAM:
case Z_LINK_CAP_MULTICAST_DATAGRAM:
case Z_LINK_CAP_FLOW_DATAGRAM:
default:
expandable = false;
break;
Expand Down
12 changes: 6 additions & 6 deletions src/transport/unicast/tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,13 @@ int8_t _z_unicast_send_t_msg(_z_transport_unicast_t *ztu, const _z_transport_mes
#endif // Z_FEATURE_MULTI_THREAD == 1

// Prepare the buffer eventually reserving space for the message length
__unsafe_z_prepare_wbuf(&ztu->_wbuf, ztu->_link._capabilities);
__unsafe_z_prepare_wbuf(&ztu->_wbuf, ztu->_link._cap._flow);

// Encode the session message
ret = _z_transport_message_encode(&ztu->_wbuf, t_msg);
if (ret == _Z_RES_OK) {
// Write the message length in the reserved space if needed
__unsafe_z_finalize_wbuf(&ztu->_wbuf, ztu->_link._capabilities);
__unsafe_z_finalize_wbuf(&ztu->_wbuf, ztu->_link._cap._flow);
// Send the wbuf on the socket
ret = _z_link_send_wbuf(&ztu->_link, &ztu->_wbuf);
if (ret == _Z_RES_OK) {
Expand Down Expand Up @@ -100,7 +100,7 @@ int8_t _z_unicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg

if (drop == false) {
// Prepare the buffer eventually reserving space for the message length
__unsafe_z_prepare_wbuf(&ztu->_wbuf, ztu->_link._capabilities);
__unsafe_z_prepare_wbuf(&ztu->_wbuf, ztu->_link._cap._flow);

_z_zint_t sn = __unsafe_z_unicast_get_sn(ztu, reliability); // Get the next sequence number

Expand All @@ -110,7 +110,7 @@ int8_t _z_unicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg
ret = _z_network_message_encode(&ztu->_wbuf, n_msg); // Encode the network message
if (ret == _Z_RES_OK) {
// Write the message length in the reserved space if needed
__unsafe_z_finalize_wbuf(&ztu->_wbuf, ztu->_link._capabilities);
__unsafe_z_finalize_wbuf(&ztu->_wbuf, ztu->_link._cap._flow);

if (ztu->_wbuf._ioss._len == 1) {
ret = _z_link_send_wbuf(&ztu->_link, &ztu->_wbuf); // Send the wbuf on the socket
Expand All @@ -137,13 +137,13 @@ int8_t _z_unicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg
is_first = false;

// Clear the buffer for serialization
__unsafe_z_prepare_wbuf(&ztu->_wbuf, ztu->_link._capabilities);
__unsafe_z_prepare_wbuf(&ztu->_wbuf, ztu->_link._cap._flow);

// Serialize one fragment
ret = __unsafe_z_serialize_zenoh_fragment(&ztu->_wbuf, &fbf, reliability, sn);
if (ret == _Z_RES_OK) {
// Write the message length in the reserved space if needed
__unsafe_z_finalize_wbuf(&ztu->_wbuf, ztu->_link._capabilities);
__unsafe_z_finalize_wbuf(&ztu->_wbuf, ztu->_link._cap._flow);

ret = _z_link_send_wbuf(&ztu->_link, &ztu->_wbuf); // Send the wbuf on the socket
if (ret == _Z_RES_OK) {
Expand Down

0 comments on commit 6e7a3c9

Please sign in to comment.