Skip to content

Commit

Permalink
fix: add missing send function
Browse files Browse the repository at this point in the history
  • Loading branch information
jean-roland committed Nov 28, 2023
1 parent b7a009e commit a98725e
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 21 deletions.
1 change: 1 addition & 0 deletions include/zenoh-pico/transport/raweth/tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "zenoh-pico/net/session.h"
#include "zenoh-pico/transport/transport.h"

int8_t _z_raweth_link_send_t_msg(const _z_link_t *zl, const _z_transport_message_t *t_msg);
int8_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *z_msg, z_reliability_t reliability,
z_congestion_control_t cong_ctrl);
int8_t _z_raweth_send_t_msg(_z_transport_multicast_t *ztm, const _z_transport_message_t *t_msg);
Expand Down
2 changes: 1 addition & 1 deletion src/transport/raweth/transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ int8_t _z_raweth_open_peer(_z_transport_multicast_establish_param_t *param, cons

// Encode and send the message
_Z_INFO("Sending Z_JOIN message\n");
ret = _z_raweth_send_t_msg(zl, &jsm);
ret = _z_raweth_link_send_t_msg(zl, &jsm);
_z_t_msg_clear(&jsm);

if (ret == _Z_RES_OK) {
Expand Down
58 changes: 38 additions & 20 deletions src/transport/raweth/tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,15 @@ static int8_t __unsafe_z_raweth_write_header(_z_transport_multicast_t *ztm) {
return _Z_RES_OK;
}

static int8_t _z_raweth_link_send_wbuf(const _z_transport_multicast_t *ztm) {
static int8_t _z_raweth_link_send_wbuf(const _z_link_t *zl, const _z_wbuf_t *wbf) {
int8_t ret = _Z_RES_OK;
for (size_t i = 0; (i < _z_wbuf_len_iosli(&ztm->_wbuf)) && (ret == _Z_RES_OK); i++) {
_z_bytes_t bs = _z_iosli_to_bytes(_z_wbuf_get_iosli(&ztm->_wbuf, i));
for (size_t i = 0; (i < _z_wbuf_len_iosli(wbf)) && (ret == _Z_RES_OK); i++) {
_z_bytes_t bs = _z_iosli_to_bytes(_z_wbuf_get_iosli(wbf, i));
size_t n = bs.len;

do {
// Retrieve addr from config + vlan tag above (locator)
size_t wb = _z_send_raweth(&ztm->_link._socket._raweth._sock, bs.start, n); // Unix
size_t wb = _z_send_raweth(&zl->_socket._raweth._sock, bs.start, n); // Unix
if (wb == SIZE_MAX) {
return _Z_ERR_TRANSPORT_TX_FAILED;
}
Expand All @@ -131,19 +131,37 @@ static int8_t _z_raweth_link_send_wbuf(const _z_transport_multicast_t *ztm) {
return ret;
}

// static void _z_raweth_check_config(_z_transport_multicast_t *ztm, const _z_keyexpr_t *key) {
// // Check config
// if (ztm->_link._socket._raweth._config != NULL) {
// // Check key
// if ((key != NULL)) {
// // Get send info from config keyexpr mapping
// } else {
// // Get send info from config default values
// }
// } else {
// // Nothing to do
// }
// }
int8_t _z_raweth_link_send_t_msg(const _z_link_t *zl, const _z_transport_message_t *t_msg) {
int8_t ret = _Z_RES_OK;

// Create and prepare the buffer to serialize the message on
uint16_t mtu = (zl->_mtu < Z_BATCH_UNICAST_SIZE) ? zl->_mtu : Z_BATCH_UNICAST_SIZE;
_z_wbuf_t wbf = _z_wbuf_make(mtu, false);

switch (zl->_cap._flow) {
case Z_LINK_CAP_FLOW_DATAGRAM:
break;
default:
ret = _Z_ERR_GENERIC;
break;
}
// Encode the session message
ret = _z_transport_message_encode(&wbf, t_msg);
if (ret == _Z_RES_OK) {
switch (zl->_cap._flow) {
case Z_LINK_CAP_FLOW_DATAGRAM:
break;
default:
ret = _Z_ERR_GENERIC;
break;
}
// Send the wbuf on the socket
ret = _z_raweth_link_send_wbuf(zl, &wbf);
}
_z_wbuf_clear(&wbf);

return ret;
}

int8_t _z_raweth_send_t_msg(_z_transport_multicast_t *ztm, const _z_transport_message_t *t_msg) {
int8_t ret = _Z_RES_OK;
Expand All @@ -159,7 +177,7 @@ int8_t _z_raweth_send_t_msg(_z_transport_multicast_t *ztm, const _z_transport_me
// Encode the session message
_Z_RETURN_IF_ERR(_z_transport_message_encode(&ztm->_wbuf, t_msg));
// Send the wbuf on the socket
_Z_RETURN_IF_ERR(_z_raweth_link_send_wbuf(ztm));
_Z_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf));
// Mark the session that we have transmitted data
ztm->_transmitted = true;

Expand Down Expand Up @@ -220,7 +238,7 @@ int8_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg,
ret = _z_network_message_encode(&ztm->_wbuf, n_msg);
if (ret == _Z_RES_OK) {
// Send the wbuf on the socket
_Z_RETURN_IF_ERR(_z_raweth_link_send_wbuf(ztm));
_Z_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf));
// Mark the session that we have transmitted data
ztm->_transmitted = true;
} else { // The message does not fit in the current batch, let's fragment it
Expand All @@ -241,7 +259,7 @@ int8_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg,
// Serialize one fragment
_Z_RETURN_IF_ERR(__unsafe_z_serialize_zenoh_fragment(&ztm->_wbuf, &fbf, reliability, sn));
// Send the wbuf on the socket
_Z_RETURN_IF_ERR(_z_raweth_link_send_wbuf(ztm));
_Z_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf));
// Mark the session that we have transmitted data
ztm->_transmitted = true;
}
Expand Down

0 comments on commit a98725e

Please sign in to comment.