diff --git a/include/zenoh-pico/transport/raweth/tx.h b/include/zenoh-pico/transport/raweth/tx.h index 2595e78eb..8146e38e4 100644 --- a/include/zenoh-pico/transport/raweth/tx.h +++ b/include/zenoh-pico/transport/raweth/tx.h @@ -18,6 +18,7 @@ #include "zenoh-pico/net/session.h" #include "zenoh-pico/transport/transport.h" +int8_t _z_raweth_link_send_t_msg(const _z_link_t *zl, const _z_transport_message_t *t_msg); int8_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *z_msg, z_reliability_t reliability, z_congestion_control_t cong_ctrl); int8_t _z_raweth_send_t_msg(_z_transport_multicast_t *ztm, const _z_transport_message_t *t_msg); diff --git a/src/transport/raweth/transport.c b/src/transport/raweth/transport.c index b8c4bc876..7dced540d 100644 --- a/src/transport/raweth/transport.c +++ b/src/transport/raweth/transport.c @@ -123,7 +123,7 @@ int8_t _z_raweth_open_peer(_z_transport_multicast_establish_param_t *param, cons // Encode and send the message _Z_INFO("Sending Z_JOIN message\n"); - ret = _z_raweth_send_t_msg(zl, &jsm); + ret = _z_raweth_link_send_t_msg(zl, &jsm); _z_t_msg_clear(&jsm); if (ret == _Z_RES_OK) { diff --git a/src/transport/raweth/tx.c b/src/transport/raweth/tx.c index 2834f5ff9..a65e27e53 100644 --- a/src/transport/raweth/tx.c +++ b/src/transport/raweth/tx.c @@ -112,15 +112,15 @@ static int8_t __unsafe_z_raweth_write_header(_z_transport_multicast_t *ztm) { return _Z_RES_OK; } -static int8_t _z_raweth_link_send_wbuf(const _z_transport_multicast_t *ztm) { +static int8_t _z_raweth_link_send_wbuf(const _z_link_t *zl, const _z_wbuf_t *wbf) { int8_t ret = _Z_RES_OK; - for (size_t i = 0; (i < _z_wbuf_len_iosli(&ztm->_wbuf)) && (ret == _Z_RES_OK); i++) { - _z_bytes_t bs = _z_iosli_to_bytes(_z_wbuf_get_iosli(&ztm->_wbuf, i)); + for (size_t i = 0; (i < _z_wbuf_len_iosli(wbf)) && (ret == _Z_RES_OK); i++) { + _z_bytes_t bs = _z_iosli_to_bytes(_z_wbuf_get_iosli(wbf, i)); size_t n = bs.len; do { // Retrieve addr from config + vlan tag above (locator) - size_t wb = _z_send_raweth(&ztm->_link._socket._raweth._sock, bs.start, n); // Unix + size_t wb = _z_send_raweth(&zl->_socket._raweth._sock, bs.start, n); // Unix if (wb == SIZE_MAX) { return _Z_ERR_TRANSPORT_TX_FAILED; } @@ -131,19 +131,37 @@ static int8_t _z_raweth_link_send_wbuf(const _z_transport_multicast_t *ztm) { return ret; } -// static void _z_raweth_check_config(_z_transport_multicast_t *ztm, const _z_keyexpr_t *key) { -// // Check config -// if (ztm->_link._socket._raweth._config != NULL) { -// // Check key -// if ((key != NULL)) { -// // Get send info from config keyexpr mapping -// } else { -// // Get send info from config default values -// } -// } else { -// // Nothing to do -// } -// } +int8_t _z_raweth_link_send_t_msg(const _z_link_t *zl, const _z_transport_message_t *t_msg) { + int8_t ret = _Z_RES_OK; + + // Create and prepare the buffer to serialize the message on + uint16_t mtu = (zl->_mtu < Z_BATCH_UNICAST_SIZE) ? zl->_mtu : Z_BATCH_UNICAST_SIZE; + _z_wbuf_t wbf = _z_wbuf_make(mtu, false); + + switch (zl->_cap._flow) { + case Z_LINK_CAP_FLOW_DATAGRAM: + break; + default: + ret = _Z_ERR_GENERIC; + break; + } + // Encode the session message + ret = _z_transport_message_encode(&wbf, t_msg); + if (ret == _Z_RES_OK) { + switch (zl->_cap._flow) { + case Z_LINK_CAP_FLOW_DATAGRAM: + break; + default: + ret = _Z_ERR_GENERIC; + break; + } + // Send the wbuf on the socket + ret = _z_raweth_link_send_wbuf(zl, &wbf); + } + _z_wbuf_clear(&wbf); + + return ret; +} int8_t _z_raweth_send_t_msg(_z_transport_multicast_t *ztm, const _z_transport_message_t *t_msg) { int8_t ret = _Z_RES_OK; @@ -159,7 +177,7 @@ int8_t _z_raweth_send_t_msg(_z_transport_multicast_t *ztm, const _z_transport_me // Encode the session message _Z_RETURN_IF_ERR(_z_transport_message_encode(&ztm->_wbuf, t_msg)); // Send the wbuf on the socket - _Z_RETURN_IF_ERR(_z_raweth_link_send_wbuf(ztm)); + _Z_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf)); // Mark the session that we have transmitted data ztm->_transmitted = true; @@ -220,7 +238,7 @@ int8_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg, ret = _z_network_message_encode(&ztm->_wbuf, n_msg); if (ret == _Z_RES_OK) { // Send the wbuf on the socket - _Z_RETURN_IF_ERR(_z_raweth_link_send_wbuf(ztm)); + _Z_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf)); // Mark the session that we have transmitted data ztm->_transmitted = true; } else { // The message does not fit in the current batch, let's fragment it @@ -241,7 +259,7 @@ int8_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg, // Serialize one fragment _Z_RETURN_IF_ERR(__unsafe_z_serialize_zenoh_fragment(&ztm->_wbuf, &fbf, reliability, sn)); // Send the wbuf on the socket - _Z_RETURN_IF_ERR(_z_raweth_link_send_wbuf(ztm)); + _Z_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf)); // Mark the session that we have transmitted data ztm->_transmitted = true; }