From e4200299f06875edcc6ceff1b4bb815961ccb958 Mon Sep 17 00:00:00 2001 From: Jean-Roland Gosse Date: Wed, 15 Nov 2023 11:11:14 +0100 Subject: [PATCH] Refactor transport layer (#279) --- include/zenoh-pico/link/config/udp.h | 3 +- include/zenoh-pico/transport/common/join.h | 22 + include/zenoh-pico/transport/common/lease.h | 23 + include/zenoh-pico/transport/common/read.h | 23 + include/zenoh-pico/transport/common/rx.h | 24 + .../transport/{link => common}/tx.h | 19 +- include/zenoh-pico/transport/multicast.h | 23 + .../transport/{link/task => multicast}/join.h | 7 +- .../{link/task => multicast}/lease.h | 13 +- .../transport/{link/task => multicast}/read.h | 13 +- .../transport/{link => multicast}/rx.h | 15 +- .../transport/multicast/transport.h | 29 + include/zenoh-pico/transport/multicast/tx.h | 27 + include/zenoh-pico/transport/transport.h | 20 - include/zenoh-pico/transport/unicast.h | 23 + include/zenoh-pico/transport/unicast/lease.h | 25 + include/zenoh-pico/transport/unicast/read.h | 25 + include/zenoh-pico/transport/unicast/rx.h | 24 + .../zenoh-pico/transport/unicast/transport.h | 28 + include/zenoh-pico/transport/unicast/tx.h | 27 + include/zenoh-pico/utils/result.h | 2 + src/api/api.c | 59 +-- src/net/session.c | 214 +++----- src/session/scout.c | 7 +- src/session/tx.c | 28 +- src/session/utils.c | 30 +- src/transport/common/join.c | 22 +- src/transport/common/lease.c | 52 +- src/transport/common/read.c | 53 +- src/transport/common/rx.c | 3 +- src/transport/common/tx.c | 30 +- src/transport/manager.c | 98 ++-- src/transport/multicast.c | 65 +++ .../multicast/{link/task => }/join.c | 10 +- .../multicast/{link/task => }/lease.c | 52 +- .../multicast/{link/task => }/read.c | 45 +- src/transport/multicast/{link => }/rx.c | 16 +- src/transport/multicast/transport.c | 230 ++++++++ src/transport/multicast/{link => }/tx.c | 18 +- src/transport/transport.c | 495 ++---------------- src/transport/unicast.c | 56 ++ src/transport/unicast/{link/task => }/lease.c | 47 +- src/transport/unicast/{link/task => }/read.c | 44 +- src/transport/unicast/{link => }/rx.c | 13 +- src/transport/unicast/transport.c | 317 +++++++++++ src/transport/unicast/{link => }/tx.c | 17 +- 46 files changed, 1540 insertions(+), 896 deletions(-) create mode 100644 include/zenoh-pico/transport/common/join.h create mode 100644 include/zenoh-pico/transport/common/lease.h create mode 100644 include/zenoh-pico/transport/common/read.h create mode 100644 include/zenoh-pico/transport/common/rx.h rename include/zenoh-pico/transport/{link => common}/tx.h (55%) create mode 100644 include/zenoh-pico/transport/multicast.h rename include/zenoh-pico/transport/{link/task => multicast}/join.h (75%) rename include/zenoh-pico/transport/{link/task => multicast}/lease.h (58%) rename include/zenoh-pico/transport/{link/task => multicast}/read.h (59%) rename include/zenoh-pico/transport/{link => multicast}/rx.h (60%) create mode 100644 include/zenoh-pico/transport/multicast/transport.h create mode 100644 include/zenoh-pico/transport/multicast/tx.h create mode 100644 include/zenoh-pico/transport/unicast.h create mode 100644 include/zenoh-pico/transport/unicast/lease.h create mode 100644 include/zenoh-pico/transport/unicast/read.h create mode 100644 include/zenoh-pico/transport/unicast/rx.h create mode 100644 include/zenoh-pico/transport/unicast/transport.h create mode 100644 include/zenoh-pico/transport/unicast/tx.h create mode 100644 src/transport/multicast.c rename src/transport/multicast/{link/task => }/join.c (82%) rename src/transport/multicast/{link/task => }/lease.c (75%) rename src/transport/multicast/{link/task => }/read.c (73%) rename src/transport/multicast/{link => }/rx.c (95%) create mode 100644 src/transport/multicast/transport.c rename src/transport/multicast/{link => }/tx.c (91%) create mode 100644 src/transport/unicast.c rename src/transport/unicast/{link/task => }/lease.c (65%) rename src/transport/unicast/{link/task => }/read.c (73%) rename src/transport/unicast/{link => }/rx.c (94%) create mode 100644 src/transport/unicast/transport.c rename src/transport/unicast/{link => }/tx.c (91%) diff --git a/include/zenoh-pico/link/config/udp.h b/include/zenoh-pico/link/config/udp.h index ab8875d5e..dd374b5b6 100644 --- a/include/zenoh-pico/link/config/udp.h +++ b/include/zenoh-pico/link/config/udp.h @@ -18,8 +18,6 @@ #include "zenoh-pico/collections/intmap.h" #include "zenoh-pico/collections/string.h" -#if Z_FEATURE_LINK_UDP_UNICAST == 1 || Z_FEATURE_LINK_UDP_MULTICAST == 1 - #define UDP_CONFIG_ARGC 3 #define UDP_CONFIG_IFACE_KEY 0x01 @@ -31,6 +29,7 @@ #define UDP_CONFIG_JOIN_KEY 0x03 #define UDP_CONFIG_JOIN_STR "join" +#if Z_FEATURE_LINK_UDP_UNICAST == 1 || Z_FEATURE_LINK_UDP_MULTICAST == 1 #define UDP_CONFIG_MAPPING_BUILD \ _z_str_intmapping_t args[UDP_CONFIG_ARGC]; \ args[0]._key = UDP_CONFIG_IFACE_KEY; \ diff --git a/include/zenoh-pico/transport/common/join.h b/include/zenoh-pico/transport/common/join.h new file mode 100644 index 000000000..8b8d5fc8b --- /dev/null +++ b/include/zenoh-pico/transport/common/join.h @@ -0,0 +1,22 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#ifndef ZENOH_PICO_TRANSPORT_JOIN_H +#define ZENOH_PICO_TRANSPORT_JOIN_H + +#include "zenoh-pico/transport/transport.h" + +int8_t _z_send_join(_z_transport_t *zt); + +#endif /* ZENOH_PICO_TRANSPORT_JOIN_H */ diff --git a/include/zenoh-pico/transport/common/lease.h b/include/zenoh-pico/transport/common/lease.h new file mode 100644 index 000000000..f35ab2f09 --- /dev/null +++ b/include/zenoh-pico/transport/common/lease.h @@ -0,0 +1,23 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#ifndef ZENOH_PICO_TRANSPORT_LEASE_H +#define ZENOH_PICO_TRANSPORT_LEASE_H + +#include "zenoh-pico/transport/transport.h" + +int8_t _z_send_keep_alive(_z_transport_t *zt); +void *_zp_lease_task(void *zt_arg); // The argument is void* to avoid incompatible pointer types in tasks + +#endif /* ZENOH_PICO_TRANSPORT_LEASE_H */ diff --git a/include/zenoh-pico/transport/common/read.h b/include/zenoh-pico/transport/common/read.h new file mode 100644 index 000000000..fe87896ab --- /dev/null +++ b/include/zenoh-pico/transport/common/read.h @@ -0,0 +1,23 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#ifndef ZENOH_PICO_TRANSPORT_READ_H +#define ZENOH_PICO_TRANSPORT_READ_H + +#include "zenoh-pico/transport/transport.h" + +int8_t _z_read(_z_transport_t *zt); +void *_zp_read_task(void *zt_arg); // The argument is void* to avoid incompatible pointer types in tasks + +#endif /* ZENOH_PICO_TRANSPORT_READ_H */ diff --git a/include/zenoh-pico/transport/common/rx.h b/include/zenoh-pico/transport/common/rx.h new file mode 100644 index 000000000..950f9dcb1 --- /dev/null +++ b/include/zenoh-pico/transport/common/rx.h @@ -0,0 +1,24 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#ifndef ZENOH_PICO_TRANSPORT_RX_H +#define ZENOH_PICO_TRANSPORT_RX_H + +#include "zenoh-pico/link/link.h" +#include "zenoh-pico/transport/transport.h" + +/*------------------ Transmission and Reception helpers ------------------*/ +int8_t _z_link_recv_t_msg(_z_transport_message_t *t_msg, const _z_link_t *zl); + +#endif /* ZENOH_PICO_TRANSPORT_RX_H */ diff --git a/include/zenoh-pico/transport/link/tx.h b/include/zenoh-pico/transport/common/tx.h similarity index 55% rename from include/zenoh-pico/transport/link/tx.h rename to include/zenoh-pico/transport/common/tx.h index 51aa3412d..33ba5593f 100644 --- a/include/zenoh-pico/transport/link/tx.h +++ b/include/zenoh-pico/transport/common/tx.h @@ -12,8 +12,8 @@ // ZettaScale Zenoh Team, // -#ifndef ZENOH_PICO_TRANSPORT_LINK_TX_H -#define ZENOH_PICO_TRANSPORT_LINK_TX_H +#ifndef ZENOH_PICO_TRANSPORT_TX_H +#define ZENOH_PICO_TRANSPORT_TX_H #include "zenoh-pico/link/link.h" #include "zenoh-pico/net/session.h" @@ -26,20 +26,7 @@ void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, _Bool is_streamed); int8_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn); /*------------------ Transmission and Reception helpers ------------------*/ -int8_t _z_unicast_send_z_msg(_z_session_t *zn, _z_zenoh_message_t *z_msg, z_reliability_t reliability, - z_congestion_control_t cong_ctrl); -int8_t _z_multicast_send_z_msg(_z_session_t *zn, _z_zenoh_message_t *z_msg, z_reliability_t reliability, - z_congestion_control_t cong_ctrl); - -int8_t _z_unicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *z_msg, z_reliability_t reliability, - z_congestion_control_t cong_ctrl); -int8_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *z_msg, z_reliability_t reliability, - z_congestion_control_t cong_ctrl); - int8_t _z_send_t_msg(_z_transport_t *zt, const _z_transport_message_t *t_msg); -int8_t _z_unicast_send_t_msg(_z_transport_unicast_t *ztu, const _z_transport_message_t *t_msg); -int8_t _z_multicast_send_t_msg(_z_transport_multicast_t *ztm, const _z_transport_message_t *t_msg); - int8_t _z_link_send_t_msg(const _z_link_t *zl, const _z_transport_message_t *t_msg); -#endif /* ZENOH_PICO_TRANSPORT_LINK_TX_H */ +#endif /* ZENOH_PICO_TRANSPORT_TX_H */ diff --git a/include/zenoh-pico/transport/multicast.h b/include/zenoh-pico/transport/multicast.h new file mode 100644 index 000000000..fa1c873b6 --- /dev/null +++ b/include/zenoh-pico/transport/multicast.h @@ -0,0 +1,23 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#ifndef ZENOH_PICO_MULTICAST_H +#define ZENOH_PICO_MULTICAST_H + +#include "zenoh-pico/api/types.h" + +void _zp_multicast_fetch_zid(const _z_transport_t *zt, z_owned_closure_zid_t *callback); +void _zp_multicast_info_session(const _z_transport_t *zt, _z_config_t *ps); + +#endif /* ZENOH_PICO_MULTICAST_H */ diff --git a/include/zenoh-pico/transport/link/task/join.h b/include/zenoh-pico/transport/multicast/join.h similarity index 75% rename from include/zenoh-pico/transport/link/task/join.h rename to include/zenoh-pico/transport/multicast/join.h index fe245681f..b7caaadd3 100644 --- a/include/zenoh-pico/transport/link/task/join.h +++ b/include/zenoh-pico/transport/multicast/join.h @@ -12,12 +12,11 @@ // ZettaScale Zenoh Team, // -#ifndef ZENOH_PICO_TRANSPORT_LINK_TASK_JOIN_H -#define ZENOH_PICO_TRANSPORT_LINK_TASK_JOIN_H +#ifndef ZENOH_MULTICAST_JOIN_H +#define ZENOH_MULTICAST_JOIN_H #include "zenoh-pico/transport/transport.h" -int8_t _z_send_join(_z_transport_t *zt); int8_t _zp_multicast_send_join(_z_transport_multicast_t *ztm); -#endif /* ZENOH_PICO_TRANSPORT_LINK_TASK_JOIN_H */ +#endif /* ZENOH_MULTICAST_JOIN_H */ diff --git a/include/zenoh-pico/transport/link/task/lease.h b/include/zenoh-pico/transport/multicast/lease.h similarity index 58% rename from include/zenoh-pico/transport/link/task/lease.h rename to include/zenoh-pico/transport/multicast/lease.h index 1b2f49f21..aef7701ed 100644 --- a/include/zenoh-pico/transport/link/task/lease.h +++ b/include/zenoh-pico/transport/multicast/lease.h @@ -12,17 +12,14 @@ // ZettaScale Zenoh Team, // -#ifndef ZENOH_PICO_TRANSPORT_LINK_TASK_LEASE_H -#define ZENOH_PICO_TRANSPORT_LINK_TASK_LEASE_H +#ifndef ZENOH_PICO_MULTICAST_LEASE_H +#define ZENOH_PICO_MULTICAST_LEASE_H #include "zenoh-pico/transport/transport.h" -int8_t _z_send_keep_alive(_z_transport_t *zt); -int8_t _zp_unicast_send_keep_alive(_z_transport_unicast_t *ztu); int8_t _zp_multicast_send_keep_alive(_z_transport_multicast_t *ztm); - -void *_zp_lease_task(void *zt_arg); // The argument is void* to avoid incompatible pointer types in tasks -void *_zp_unicast_lease_task(void *ztu_arg); // The argument is void* to avoid incompatible pointer types in tasks +int8_t _zp_multicast_start_lease_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task); +int8_t _zp_multicast_stop_lease_task(_z_transport_t *zt); void *_zp_multicast_lease_task(void *ztm_arg); // The argument is void* to avoid incompatible pointer types in tasks -#endif /* ZENOH_PICO_TRANSPORT_LINK_TASK_LEASE_H */ +#endif /* ZENOH_PICO_MULTICAST_LEASE_H */ diff --git a/include/zenoh-pico/transport/link/task/read.h b/include/zenoh-pico/transport/multicast/read.h similarity index 59% rename from include/zenoh-pico/transport/link/task/read.h rename to include/zenoh-pico/transport/multicast/read.h index 17b75bb5e..b2d53f770 100644 --- a/include/zenoh-pico/transport/link/task/read.h +++ b/include/zenoh-pico/transport/multicast/read.h @@ -12,17 +12,14 @@ // ZettaScale Zenoh Team, // -#ifndef ZENOH_PICO_TRANSPORT_LINK_TASK_READ_H -#define ZENOH_PICO_TRANSPORT_LINK_TASK_READ_H +#ifndef ZENOH_PICO_MULTICAST_READ_H +#define ZENOH_PICO_MULTICAST_READ_H #include "zenoh-pico/transport/transport.h" -int8_t _z_read(_z_transport_t *zt); -int8_t _zp_unicast_read(_z_transport_unicast_t *ztu); int8_t _zp_multicast_read(_z_transport_multicast_t *ztm); - -void *_zp_read_task(void *zt_arg); // The argument is void* to avoid incompatible pointer types in tasks -void *_zp_unicast_read_task(void *ztu_arg); // The argument is void* to avoid incompatible pointer types in tasks +int8_t _zp_multicast_start_read_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task); +int8_t _zp_multicast_stop_read_task(_z_transport_t *zt); void *_zp_multicast_read_task(void *ztm_arg); // The argument is void* to avoid incompatible pointer types in tasks -#endif /* ZENOH_PICO_TRANSPORT_LINK_TASK_READ_H */ \ No newline at end of file +#endif /* ZENOH_PICO_TRANSPORT_LINK_TASK_READ_H */ diff --git a/include/zenoh-pico/transport/link/rx.h b/include/zenoh-pico/transport/multicast/rx.h similarity index 60% rename from include/zenoh-pico/transport/link/rx.h rename to include/zenoh-pico/transport/multicast/rx.h index f4c428a64..1ffba60f0 100644 --- a/include/zenoh-pico/transport/link/rx.h +++ b/include/zenoh-pico/transport/multicast/rx.h @@ -12,23 +12,14 @@ // ZettaScale Zenoh Team, // -#ifndef ZENOH_PICO_TRANSPORT_LINK_RX_H -#define ZENOH_PICO_TRANSPORT_LINK_RX_H +#ifndef ZENOH_PICO_MULTICAST_RX_H +#define ZENOH_PICO_MULTICAST_RX_H -#include "zenoh-pico/link/link.h" #include "zenoh-pico/transport/transport.h" -/*------------------ Transmission and Reception helpers ------------------*/ -int8_t _z_unicast_recv_t_msg(_z_transport_unicast_t *ztu, _z_transport_message_t *t_msg); int8_t _z_multicast_recv_t_msg(_z_transport_multicast_t *ztm, _z_transport_message_t *t_msg, _z_bytes_t *addr); - -int8_t _z_link_recv_t_msg(_z_transport_message_t *t_msg, const _z_link_t *zl); - -int8_t _z_unicast_recv_t_msg_na(_z_transport_unicast_t *ztu, _z_transport_message_t *t_msg); int8_t _z_multicast_recv_t_msg_na(_z_transport_multicast_t *ztm, _z_transport_message_t *t_msg, _z_bytes_t *addr); - -int8_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_transport_message_t *t_msg); int8_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, _z_transport_message_t *t_msg, _z_bytes_t *addr); -#endif /* ZENOH_PICO_TRANSPORT_LINK_RX_H */ \ No newline at end of file +#endif /* ZENOH_PICO_TRANSPORT_LINK_RX_H */ diff --git a/include/zenoh-pico/transport/multicast/transport.h b/include/zenoh-pico/transport/multicast/transport.h new file mode 100644 index 000000000..fa1f0bedd --- /dev/null +++ b/include/zenoh-pico/transport/multicast/transport.h @@ -0,0 +1,29 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#ifndef ZENOH_PICO_MULTICAST_TRANSPORT_H +#define ZENOH_PICO_MULTICAST_TRANSPORT_H + +#include "zenoh-pico/api/types.h" + +int8_t _z_multicast_transport_create(_z_transport_t *zt, _z_link_t *zl, + _z_transport_multicast_establish_param_t *param); +int8_t _z_multicast_open_peer(_z_transport_multicast_establish_param_t *param, const _z_link_t *zl, + const _z_id_t *local_zid); +int8_t _z_multicast_open_client(_z_transport_multicast_establish_param_t *param, const _z_link_t *zl, + const _z_id_t *local_zid); +int8_t _z_multicast_send_close(_z_transport_multicast_t *ztm, uint8_t reason, _Bool link_only); +int8_t _z_multicast_transport_close(_z_transport_multicast_t *ztm, uint8_t reason); +void _z_multicast_transport_clear(_z_transport_t *zt); +#endif /* ZENOH_PICO_MULTICAST_TRANSPORT_H */ diff --git a/include/zenoh-pico/transport/multicast/tx.h b/include/zenoh-pico/transport/multicast/tx.h new file mode 100644 index 000000000..d8171b719 --- /dev/null +++ b/include/zenoh-pico/transport/multicast/tx.h @@ -0,0 +1,27 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#ifndef ZENOH_PICO_MULTICAST_TX_H +#define ZENOH_PICO_MULTICAST_TX_H + +#include "zenoh-pico/net/session.h" +#include "zenoh-pico/transport/transport.h" + +int8_t _z_multicast_send_z_msg(_z_session_t *zn, _z_zenoh_message_t *z_msg, z_reliability_t reliability, + z_congestion_control_t cong_ctrl); +int8_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *z_msg, z_reliability_t reliability, + z_congestion_control_t cong_ctrl); +int8_t _z_multicast_send_t_msg(_z_transport_multicast_t *ztm, const _z_transport_message_t *t_msg); + +#endif /* ZENOH_PICO_MULTICAST_TX_H */ diff --git a/include/zenoh-pico/transport/transport.h b/include/zenoh-pico/transport/transport.h index 6996f4b5a..00356c20b 100644 --- a/include/zenoh-pico/transport/transport.h +++ b/include/zenoh-pico/transport/transport.h @@ -164,28 +164,8 @@ typedef struct { uint8_t _seq_num_res; } _z_transport_multicast_establish_param_t; -int8_t _z_transport_unicast(_z_transport_t *zt, _z_link_t *zl, _z_transport_unicast_establish_param_t *param); -int8_t _z_transport_multicast(_z_transport_t *zt, _z_link_t *zl, _z_transport_multicast_establish_param_t *param); - -int8_t _z_transport_unicast_open_client(_z_transport_unicast_establish_param_t *param, const _z_link_t *zl, - const _z_id_t *local_zid); -int8_t _z_transport_multicast_open_client(_z_transport_multicast_establish_param_t *param, const _z_link_t *zl, - const _z_id_t *local_zid); -int8_t _z_transport_unicast_open_peer(_z_transport_unicast_establish_param_t *param, const _z_link_t *zl, - const _z_id_t *local_zid); -int8_t _z_transport_multicast_open_peer(_z_transport_multicast_establish_param_t *param, const _z_link_t *zl, - const _z_id_t *local_zid); - int8_t _z_transport_close(_z_transport_t *zt, uint8_t reason); -int8_t _z_transport_unicast_close(_z_transport_unicast_t *ztu, uint8_t reason); -int8_t _z_transport_multicast_close(_z_transport_multicast_t *ztm, uint8_t reason); - -void _z_transport_unicast_clear(_z_transport_unicast_t *ztu); -void _z_transport_multicast_clear(_z_transport_multicast_t *ztm); - void _z_transport_clear(_z_transport_t *zt); void _z_transport_free(_z_transport_t **zt); -void _z_transport_unicast_free(_z_transport_unicast_t **ztu); -void _z_transport_multicast_free(_z_transport_multicast_t **ztm); #endif /* INCLUDE_ZENOH_PICO_TRANSPORT_TRANSPORT_H */ diff --git a/include/zenoh-pico/transport/unicast.h b/include/zenoh-pico/transport/unicast.h new file mode 100644 index 000000000..bcfbcc1ef --- /dev/null +++ b/include/zenoh-pico/transport/unicast.h @@ -0,0 +1,23 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#ifndef ZENOH_PICO_UNICAST_H +#define ZENOH_PICO_UNICAST_H + +#include "zenoh-pico/api/types.h" + +void _zp_unicast_fetch_zid(const _z_transport_t *zt, z_owned_closure_zid_t *callback); +void _zp_unicast_info_session(const _z_transport_t *zt, _z_config_t *ps); + +#endif /* ZENOH_PICO_UNICAST_H */ diff --git a/include/zenoh-pico/transport/unicast/lease.h b/include/zenoh-pico/transport/unicast/lease.h new file mode 100644 index 000000000..fbe97e5f7 --- /dev/null +++ b/include/zenoh-pico/transport/unicast/lease.h @@ -0,0 +1,25 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#ifndef ZENOH_PICO_UNICAST_LEASE_H +#define ZENOH_PICO_UNICAST_LEASE_H + +#include "zenoh-pico/transport/transport.h" + +int8_t _zp_unicast_send_keep_alive(_z_transport_unicast_t *ztu); +int8_t _zp_unicast_start_lease_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task); +int8_t _zp_unicast_stop_lease_task(_z_transport_t *zt); +void *_zp_unicast_lease_task(void *ztu_arg); // The argument is void* to avoid incompatible pointer types in tasks + +#endif /* ZENOH_PICO_TRANSPORT_LINK_TASK_LEASE_H */ diff --git a/include/zenoh-pico/transport/unicast/read.h b/include/zenoh-pico/transport/unicast/read.h new file mode 100644 index 000000000..e5d4db852 --- /dev/null +++ b/include/zenoh-pico/transport/unicast/read.h @@ -0,0 +1,25 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#ifndef ZENOH_PICO_UNICAST_READ_H +#define ZENOH_PICO_UNICAST_READ_H + +#include "zenoh-pico/transport/transport.h" + +int8_t _zp_unicast_read(_z_transport_unicast_t *ztu); +int8_t _zp_unicast_start_read_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task); +int8_t _zp_unicast_stop_read_task(_z_transport_t *zt); +void *_zp_unicast_read_task(void *ztu_arg); // The argument is void* to avoid incompatible pointer types in tasks + +#endif /* ZENOH_PICO_UNICAST_READ_H */ diff --git a/include/zenoh-pico/transport/unicast/rx.h b/include/zenoh-pico/transport/unicast/rx.h new file mode 100644 index 000000000..61dd89507 --- /dev/null +++ b/include/zenoh-pico/transport/unicast/rx.h @@ -0,0 +1,24 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#ifndef ZENOH_PICO_UNICAST_RX_H +#define ZENOH_PICO_UNICAST_RX_H + +#include "zenoh-pico/transport/transport.h" + +int8_t _z_unicast_recv_t_msg(_z_transport_unicast_t *ztu, _z_transport_message_t *t_msg); +int8_t _z_unicast_recv_t_msg_na(_z_transport_unicast_t *ztu, _z_transport_message_t *t_msg); +int8_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_transport_message_t *t_msg); + +#endif /* ZENOH_PICO_UNICAST_RX_H */ diff --git a/include/zenoh-pico/transport/unicast/transport.h b/include/zenoh-pico/transport/unicast/transport.h new file mode 100644 index 000000000..9ac7b885b --- /dev/null +++ b/include/zenoh-pico/transport/unicast/transport.h @@ -0,0 +1,28 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#ifndef ZENOH_PICO_UNICAST_TRANSPORT_H +#define ZENOH_PICO_UNICAST_TRANSPORT_H + +#include "zenoh-pico/api/types.h" + +int8_t _z_unicast_transport_create(_z_transport_t *zt, _z_link_t *zl, _z_transport_unicast_establish_param_t *param); +int8_t _z_unicast_open_client(_z_transport_unicast_establish_param_t *param, const _z_link_t *zl, + const _z_id_t *local_zid); +int8_t _z_unicast_open_peer(_z_transport_unicast_establish_param_t *param, const _z_link_t *zl, + const _z_id_t *local_zid); +int8_t _z_unicast_send_close(_z_transport_unicast_t *ztu, uint8_t reason, _Bool link_only); +int8_t _z_unicast_transport_close(_z_transport_unicast_t *ztu, uint8_t reason); +void _z_unicast_transport_clear(_z_transport_t *zt); +#endif /* ZENOH_PICO_UNICAST_TRANSPORT_H */ diff --git a/include/zenoh-pico/transport/unicast/tx.h b/include/zenoh-pico/transport/unicast/tx.h new file mode 100644 index 000000000..ed42e0c73 --- /dev/null +++ b/include/zenoh-pico/transport/unicast/tx.h @@ -0,0 +1,27 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#ifndef ZENOH_PICO_UNICAST_TX_H +#define ZENOH_PICO_UNICAST_TX_H + +#include "zenoh-pico/net/session.h" +#include "zenoh-pico/transport/transport.h" + +int8_t _z_unicast_send_z_msg(_z_session_t *zn, _z_zenoh_message_t *z_msg, z_reliability_t reliability, + z_congestion_control_t cong_ctrl); +int8_t _z_unicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *z_msg, z_reliability_t reliability, + z_congestion_control_t cong_ctrl); +int8_t _z_unicast_send_t_msg(_z_transport_unicast_t *ztu, const _z_transport_message_t *t_msg); + +#endif /* ZENOH_PICO_TRANSPORT_LINK_TX_H */ diff --git a/include/zenoh-pico/utils/result.h b/include/zenoh-pico/utils/result.h index 7b4afe42e..bae8d7b17 100644 --- a/include/zenoh-pico/utils/result.h +++ b/include/zenoh-pico/utils/result.h @@ -15,6 +15,8 @@ #ifndef ZENOH_PICO_UTILS_RESULT_H #define ZENOH_PICO_UTILS_RESULT_H +#define _ZP_UNUSED(x) (void)(x) + #define _Z_ERR_MESSAGE_MASK 0x88 #define _Z_ERR_ENTITY_MASK 0x90 #define _Z_ERR_TRANSPORT_MASK 0x98 diff --git a/src/api/api.c b/src/api/api.c index 0e2e59749..8e228fc45 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -33,6 +33,8 @@ #include "zenoh-pico/session/resource.h" #include "zenoh-pico/session/utils.h" #include "zenoh-pico/system/platform.h" +#include "zenoh-pico/transport/multicast.h" +#include "zenoh-pico/transport/unicast.h" #include "zenoh-pico/utils/logging.h" #include "zenoh-pico/utils/result.h" #include "zenoh-pico/utils/uuid.h" @@ -542,45 +544,40 @@ int8_t z_close(z_owned_session_t *zs) { } int8_t z_info_peers_zid(const z_session_t zs, z_owned_closure_zid_t *callback) { + // Call transport function + switch (zs._val->_tp._type) { + case _Z_TRANSPORT_MULTICAST_TYPE: + _zp_multicast_fetch_zid(&zs._val->_tp, callback); + break; + default: + break; + } + // Note and clear context void *ctx = callback->context; callback->context = NULL; - -#if Z_FEATURE_MULTICAST_TRANSPORT == 1 - if (zs._val->_tp._type == _Z_TRANSPORT_MULTICAST_TYPE) { - _z_transport_peer_entry_list_t *l = zs._val->_tp._transport._multicast._peers; - for (; l != NULL; l = _z_transport_peer_entry_list_tail(l)) { - _z_transport_peer_entry_t *val = _z_transport_peer_entry_list_head(l); - z_id_t id = val->_remote_zid; - - callback->call(&id, ctx); - } - } -#else - (void)zs; -#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 - + // Drop if needed if (callback->drop != NULL) { callback->drop(ctx); } - return 0; } int8_t z_info_routers_zid(const z_session_t zs, z_owned_closure_zid_t *callback) { + // Call transport function + switch (zs._val->_tp._type) { + case _Z_TRANSPORT_UNICAST_TYPE: + _zp_unicast_fetch_zid(&zs._val->_tp, callback); + break; + default: + break; + } + // Note and clear context void *ctx = callback->context; callback->context = NULL; - -#if Z_FEATURE_UNICAST_TRANSPORT == 1 - if (zs._val->_tp._type == _Z_TRANSPORT_UNICAST_TYPE) { - z_id_t id = zs._val->_tp._transport._unicast._remote_zid; - callback->call(&id, ctx); - } -#endif // Z_FEATURE_UNICAST_TRANSPORT == 1 - + // Drop if needed if (callback->drop != NULL) { callback->drop(ctx); } - return 0; } @@ -641,17 +638,13 @@ z_owned_publisher_t z_declare_publisher(z_session_t zs, z_keyexpr_t keyexpr, con // TODO: Currently, if resource declarations are done over multicast transports, the current protocol definition // lacks a way to convey them to later-joining nodes. Thus, in the current version automatic // resource declarations are only performed on unicast transports. -#if Z_FEATURE_MULTICAST_TRANSPORT == 1 if (zs._val->_tp._type != _Z_TRANSPORT_MULTICAST_TYPE) { -#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 _z_resource_t *r = _z_get_resource_by_key(zs._val, &keyexpr); if (r == NULL) { uint16_t id = _z_declare_resource(zs._val, keyexpr); key = _z_rid_with_suffix(id, NULL); } -#if Z_FEATURE_MULTICAST_TRANSPORT == 1 } -#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 z_publisher_options_t opt = z_publisher_options_default(); if (options != NULL) { @@ -801,17 +794,13 @@ z_owned_queryable_t z_declare_queryable(z_session_t zs, z_keyexpr_t keyexpr, z_o // TODO: Currently, if resource declarations are done over multicast transports, the current protocol definition // lacks a way to convey them to later-joining nodes. Thus, in the current version automatic // resource declarations are only performed on unicast transports. -#if Z_FEATURE_MULTICAST_TRANSPORT == 1 if (zs._val->_tp._type != _Z_TRANSPORT_MULTICAST_TYPE) { -#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 _z_resource_t *r = _z_get_resource_by_key(zs._val, &keyexpr); if (r == NULL) { uint16_t id = _z_declare_resource(zs._val, keyexpr); key = _z_rid_with_suffix(id, NULL); } -#if Z_FEATURE_MULTICAST_TRANSPORT == 1 } -#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 z_queryable_options_t opt = z_queryable_options_default(); if (options != NULL) { @@ -908,9 +897,7 @@ z_owned_subscriber_t z_declare_subscriber(z_session_t zs, z_keyexpr_t keyexpr, z // TODO: Currently, if resource declarations are done over multicast transports, the current protocol definition // lacks a way to convey them to later-joining nodes. Thus, in the current version automatic // resource declarations are only performed on unicast transports. -#if Z_FEATURE_MULTICAST_TRANSPORT == 1 if (zs._val->_tp._type != _Z_TRANSPORT_MULTICAST_TYPE) { -#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 _z_resource_t *r = _z_get_resource_by_key(zs._val, &keyexpr); if (r == NULL) { char *wild = strpbrk(keyexpr._suffix, "*$"); @@ -933,9 +920,7 @@ z_owned_subscriber_t z_declare_subscriber(z_session_t zs, z_keyexpr_t keyexpr, z key = _z_rid_with_suffix(id, wild); } } -#if Z_FEATURE_MULTICAST_TRANSPORT == 1 } -#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 _z_subinfo_t subinfo = _z_subinfo_push_default(); if (options != NULL) { diff --git a/src/net/session.c b/src/net/session.c index c89f83f35..f0e2f3b30 100644 --- a/src/net/session.c +++ b/src/net/session.c @@ -24,9 +24,15 @@ #include "zenoh-pico/net/memory.h" #include "zenoh-pico/protocol/core.h" #include "zenoh-pico/session/utils.h" -#include "zenoh-pico/transport/link/task/join.h" -#include "zenoh-pico/transport/link/task/lease.h" -#include "zenoh-pico/transport/link/task/read.h" +#include "zenoh-pico/transport/common/join.h" +#include "zenoh-pico/transport/common/lease.h" +#include "zenoh-pico/transport/common/read.h" +#include "zenoh-pico/transport/multicast.h" +#include "zenoh-pico/transport/multicast/lease.h" +#include "zenoh-pico/transport/multicast/read.h" +#include "zenoh-pico/transport/unicast.h" +#include "zenoh-pico/transport/unicast/lease.h" +#include "zenoh-pico/transport/unicast/read.h" #include "zenoh-pico/utils/logging.h" #include "zenoh-pico/utils/uuid.h" @@ -34,23 +40,17 @@ int8_t __z_open_inner(_z_session_t *zn, char *locator, z_whatami_t mode) { int8_t ret = _Z_RES_OK; _z_id_t local_zid = _z_id_empty(); -#if Z_FEATURE_UNICAST_TRANSPORT == 1 || Z_FEATURE_MULTICAST_TRANSPORT == 1 ret = _z_session_generate_zid(&local_zid, Z_ZID_LENGTH); - if (ret == _Z_RES_OK) { - ret = _z_new_transport(&zn->_tp, &local_zid, locator, mode); - if (ret != _Z_RES_OK) { - local_zid = _z_id_empty(); - } - } else { + if (ret != _Z_RES_OK) { local_zid = _z_id_empty(); + return ret; } -#else - ret = _Z_ERR_TRANSPORT_NOT_AVAILABLE; -#endif - if (ret == _Z_RES_OK) { - ret = _z_session_init(zn, &local_zid); + ret = _z_new_transport(&zn->_tp, &local_zid, locator, mode); + if (ret != _Z_RES_OK) { + local_zid = _z_id_empty(); + return ret; } - + ret = _z_session_init(zn, &local_zid); return ret; } @@ -155,27 +155,15 @@ _z_config_t *_z_info(const _z_session_t *zn) { _z_bytes_t local_zid = _z_bytes_wrap(zn->_local_zid.id, _z_id_len(zn->_local_zid)); _zp_config_insert(ps, Z_INFO_PID_KEY, _z_string_from_bytes(&local_zid)); -#if Z_FEATURE_UNICAST_TRANSPORT == 1 - if (zn->_tp._type == _Z_TRANSPORT_UNICAST_TYPE) { - _z_id_t remote_zid = zn->_tp._transport._unicast._remote_zid; - _z_bytes_t remote_zidbytes = _z_bytes_wrap(remote_zid.id, _z_id_len(remote_zid)); - _zp_config_insert(ps, Z_INFO_ROUTER_PID_KEY, _z_string_from_bytes(&remote_zidbytes)); - } else -#endif // Z_FEATURE_UNICAST_TRANSPORT == 1 -#if Z_FEATURE_MULTICAST_TRANSPORT == 1 - if (zn->_tp._type == _Z_TRANSPORT_MULTICAST_TYPE) { - _z_transport_peer_entry_list_t *xs = zn->_tp._transport._multicast._peers; - while (xs != NULL) { - _z_transport_peer_entry_t *peer = _z_transport_peer_entry_list_head(xs); - _z_bytes_t remote_zid = _z_bytes_wrap(peer->_remote_zid.id, _z_id_len(peer->_remote_zid)); - _zp_config_insert(ps, Z_INFO_PEER_PID_KEY, _z_string_from_bytes(&remote_zid)); - - xs = _z_transport_peer_entry_list_tail(xs); - } - } else -#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 - { - __asm__("nop"); + switch (zn->_tp._type) { + case _Z_TRANSPORT_UNICAST_TYPE: + _zp_unicast_info_session(&zn->_tp, ps); + break; + case _Z_TRANSPORT_MULTICAST_TYPE: + _zp_multicast_info_session(&zn->_tp, ps); + break; + default: + break; } } @@ -191,117 +179,87 @@ int8_t _zp_send_join(_z_session_t *zn) { return _z_send_join(&zn->_tp); } #if Z_FEATURE_MULTI_THREAD == 1 int8_t _zp_start_read_task(_z_session_t *zn, _z_task_attr_t *attr) { int8_t ret = _Z_RES_OK; - + // Allocate task _z_task_t *task = (_z_task_t *)z_malloc(sizeof(_z_task_t)); - if (task != NULL) { - (void)memset(task, 0, sizeof(_z_task_t)); - -#if Z_FEATURE_UNICAST_TRANSPORT == 1 - if (zn->_tp._type == _Z_TRANSPORT_UNICAST_TYPE) { - zn->_tp._transport._unicast._read_task = task; - zn->_tp._transport._unicast._read_task_running = true; - if (_z_task_init(task, attr, _zp_unicast_read_task, &zn->_tp._transport._unicast) != _Z_RES_OK) { - zn->_tp._transport._unicast._read_task_running = false; - ret = _Z_ERR_SYSTEM_TASK_FAILED; - z_free(task); - } - } else -#endif // Z_FEATURE_UNICAST_TRANSPORT == 1 -#if Z_FEATURE_MULTICAST_TRANSPORT == 1 - if (zn->_tp._type == _Z_TRANSPORT_MULTICAST_TYPE) { - zn->_tp._transport._multicast._read_task = task; - zn->_tp._transport._multicast._read_task_running = true; - if (_z_task_init(task, attr, _zp_multicast_read_task, &zn->_tp._transport._multicast) != _Z_RES_OK) { - zn->_tp._transport._multicast._read_task_running = false; - ret = _Z_ERR_SYSTEM_TASK_FAILED; - z_free(task); - } - } else -#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 - { + if (task == NULL) { + ret = _Z_ERR_SYSTEM_OUT_OF_MEMORY; + } + // Call transport function + switch (zn->_tp._type) { + case _Z_TRANSPORT_UNICAST_TYPE: + ret = _zp_unicast_start_read_task(&zn->_tp, attr, task); + break; + case _Z_TRANSPORT_MULTICAST_TYPE: + ret = _zp_multicast_start_read_task(&zn->_tp, attr, task); + break; + default: ret = _Z_ERR_TRANSPORT_NOT_AVAILABLE; - z_free(task); - } + break; + } + // Free task if operation failed + if (ret != _Z_RES_OK) { + z_free(task); } - return ret; } -int8_t _zp_stop_read_task(_z_session_t *zn) { +int8_t _zp_start_lease_task(_z_session_t *zn, _z_task_attr_t *attr) { int8_t ret = _Z_RES_OK; - -#if Z_FEATURE_UNICAST_TRANSPORT == 1 - if (zn->_tp._type == _Z_TRANSPORT_UNICAST_TYPE) { - zn->_tp._transport._unicast._read_task_running = false; - } else -#endif // Z_FEATURE_UNICAST_TRANSPORT == 1 -#if Z_FEATURE_MULTICAST_TRANSPORT == 1 - if (zn->_tp._type == _Z_TRANSPORT_MULTICAST_TYPE) { - zn->_tp._transport._multicast._read_task_running = false; - } else -#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 - { - ret = _Z_ERR_TRANSPORT_NOT_AVAILABLE; + // Allocate task + _z_task_t *task = (_z_task_t *)z_malloc(sizeof(_z_task_t)); + if (task == NULL) { + ret = _Z_ERR_SYSTEM_OUT_OF_MEMORY; + } + // Call transport function + switch (zn->_tp._type) { + case _Z_TRANSPORT_UNICAST_TYPE: + ret = _zp_unicast_start_lease_task(&zn->_tp, attr, task); + break; + case _Z_TRANSPORT_MULTICAST_TYPE: + ret = _zp_multicast_start_lease_task(&zn->_tp, attr, task); + break; + default: + ret = _Z_ERR_TRANSPORT_NOT_AVAILABLE; + break; + } + // Free task if operation failed + if (ret != _Z_RES_OK) { + z_free(task); } - return ret; } -int8_t _zp_start_lease_task(_z_session_t *zn, _z_task_attr_t *attr) { +int8_t _zp_stop_read_task(_z_session_t *zn) { int8_t ret = _Z_RES_OK; - - _z_task_t *task = (_z_task_t *)z_malloc(sizeof(_z_task_t)); - if (task != NULL) { - (void)memset(task, 0, sizeof(_z_task_t)); - -#if Z_FEATURE_UNICAST_TRANSPORT == 1 - if (zn->_tp._type == _Z_TRANSPORT_UNICAST_TYPE) { - zn->_tp._transport._unicast._lease_task = task; - zn->_tp._transport._unicast._lease_task_running = true; - if (_z_task_init(task, attr, _zp_unicast_lease_task, &zn->_tp._transport._unicast) != _Z_RES_OK) { - zn->_tp._transport._unicast._lease_task_running = false; - ret = _Z_ERR_SYSTEM_TASK_FAILED; - z_free(task); - } - } else -#endif // Z_FEATURE_UNICAST_TRANSPORT == 1 -#if Z_FEATURE_MULTICAST_TRANSPORT == 1 - if (zn->_tp._type == _Z_TRANSPORT_MULTICAST_TYPE) { - zn->_tp._transport._multicast._lease_task = task; - zn->_tp._transport._multicast._lease_task_running = true; - if (_z_task_init(task, attr, _zp_multicast_lease_task, &zn->_tp._transport._multicast) != _Z_RES_OK) { - zn->_tp._transport._multicast._lease_task_running = false; - ret = _Z_ERR_SYSTEM_TASK_FAILED; - z_free(task); - } - } else -#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 - { + // Call transport function + switch (zn->_tp._type) { + case _Z_TRANSPORT_UNICAST_TYPE: + ret = _zp_unicast_stop_read_task(&zn->_tp); + break; + case _Z_TRANSPORT_MULTICAST_TYPE: + ret = _zp_multicast_stop_read_task(&zn->_tp); + break; + default: ret = _Z_ERR_TRANSPORT_NOT_AVAILABLE; - z_free(task); - } + break; } - return ret; } int8_t _zp_stop_lease_task(_z_session_t *zn) { int8_t ret = _Z_RES_OK; - -#if Z_FEATURE_UNICAST_TRANSPORT == 1 - if (zn->_tp._type == _Z_TRANSPORT_UNICAST_TYPE) { - zn->_tp._transport._unicast._lease_task_running = false; - } else -#endif // Z_FEATURE_UNICAST_TRANSPORT == 1 -#if Z_FEATURE_MULTICAST_TRANSPORT == 1 - if (zn->_tp._type == _Z_TRANSPORT_MULTICAST_TYPE) { - zn->_tp._transport._multicast._lease_task_running = false; - } else -#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 - { - ret = _Z_ERR_TRANSPORT_NOT_AVAILABLE; + // Call transport function + switch (zn->_tp._type) { + case _Z_TRANSPORT_UNICAST_TYPE: + ret = _zp_unicast_stop_lease_task(&zn->_tp); + break; + case _Z_TRANSPORT_MULTICAST_TYPE: + ret = _zp_multicast_stop_lease_task(&zn->_tp); + break; + default: + ret = _Z_ERR_TRANSPORT_NOT_AVAILABLE; + break; } - return ret; } #endif // Z_FEATURE_MULTI_THREAD == 1 diff --git a/src/session/scout.c b/src/session/scout.c index e9191da07..689dd8a53 100644 --- a/src/session/scout.c +++ b/src/session/scout.c @@ -18,6 +18,7 @@ #include "zenoh-pico/link/manager.h" #include "zenoh-pico/protocol/codec/transport.h" #include "zenoh-pico/protocol/core.h" +#include "zenoh-pico/transport/multicast.h" #include "zenoh-pico/utils/logging.h" #if Z_FEATURE_SCOUTING_UDP == 1 && Z_FEATURE_LINK_UDP_UNICAST == 0 @@ -140,13 +141,7 @@ _z_hello_list_t *_z_scout_inner(const z_what_t what, _z_id_t zid, const char *lo _z_scouting_message_encode(&wbf, &scout); // Scout on multicast -#if Z_FEATURE_MULTICAST_TRANSPORT == 1 ret = __z_scout_loop(&wbf, locator, timeout, exit_on_first); -#else - (void)(locator); - (void)(timeout); - (void)(exit_on_first); -#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 _z_wbuf_clear(&wbf); diff --git a/src/session/tx.c b/src/session/tx.c index 409f5d402..b00c072f2 100644 --- a/src/session/tx.c +++ b/src/session/tx.c @@ -12,28 +12,26 @@ // ZettaScale Zenoh Team, // -#include "zenoh-pico/transport/link/tx.h" +#include "zenoh-pico/transport/multicast/tx.h" +#include "zenoh-pico/transport/unicast/tx.h" #include "zenoh-pico/utils/logging.h" int8_t _z_send_n_msg(_z_session_t *zn, const _z_network_message_t *z_msg, z_reliability_t reliability, z_congestion_control_t cong_ctrl) { int8_t ret = _Z_RES_OK; _Z_DEBUG(">> send network message\n"); - -#if Z_FEATURE_UNICAST_TRANSPORT == 1 - if (zn->_tp._type == _Z_TRANSPORT_UNICAST_TYPE) { - ret = _z_unicast_send_n_msg(zn, z_msg, reliability, cong_ctrl); - } else -#endif // Z_FEATURE_UNICAST_TRANSPORT == 1 -#if Z_FEATURE_MULTICAST_TRANSPORT == 1 - if (zn->_tp._type == _Z_TRANSPORT_MULTICAST_TYPE) { - ret = _z_multicast_send_n_msg(zn, z_msg, reliability, cong_ctrl); - } else -#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 - { - ret = _Z_ERR_TRANSPORT_NOT_AVAILABLE; + // Call transport function + switch (zn->_tp._type) { + case _Z_TRANSPORT_UNICAST_TYPE: + ret = _z_unicast_send_n_msg(zn, z_msg, reliability, cong_ctrl); + break; + case _Z_TRANSPORT_MULTICAST_TYPE: + ret = _z_multicast_send_n_msg(zn, z_msg, reliability, cong_ctrl); + break; + default: + ret = _Z_ERR_TRANSPORT_NOT_AVAILABLE; + break; } - return ret; } diff --git a/src/session/utils.c b/src/session/utils.c index b43104f25..1985c8f8a 100644 --- a/src/session/utils.c +++ b/src/session/utils.c @@ -74,26 +74,24 @@ int8_t _z_session_init(_z_session_t *zn, _z_id_t *zid) { #if Z_FEATURE_MULTI_THREAD == 1 ret = _z_mutex_init(&zn->_mutex_inner); + if (ret != _Z_RES_OK) { + _z_transport_clear(&zn->_tp); + return ret; + } #endif // Z_FEATURE_MULTI_THREAD == 1 - if (ret == _Z_RES_OK) { - zn->_local_zid = *zid; -#if Z_FEATURE_UNICAST_TRANSPORT == 1 - if (zn->_tp._type == _Z_TRANSPORT_UNICAST_TYPE) { + + zn->_local_zid = *zid; + // Note session in transport + switch (zn->_tp._type) { + case _Z_TRANSPORT_UNICAST_TYPE: zn->_tp._transport._unicast._session = zn; - } else -#endif // Z_FEATURE_UNICAST_TRANSPORT == 1 -#if Z_FEATURE_MULTICAST_TRANSPORT == 1 - if (zn->_tp._type == _Z_TRANSPORT_MULTICAST_TYPE) { + break; + case _Z_TRANSPORT_MULTICAST_TYPE: zn->_tp._transport._multicast._session = zn; - } else -#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 - { - // Do nothing. Required to be here because of the #if directive - } - } else { - _z_transport_clear(&zn->_tp); + break; + default: + break; } - return ret; } diff --git a/src/transport/common/join.c b/src/transport/common/join.c index 5c9d6819b..4d3c8498a 100644 --- a/src/transport/common/join.c +++ b/src/transport/common/join.c @@ -12,21 +12,21 @@ // ZettaScale Zenoh Team, // -#include "zenoh-pico/transport/link/task/join.h" +#include "zenoh-pico/transport/common/join.h" + +#include "zenoh-pico/transport/multicast/join.h" int8_t _z_send_join(_z_transport_t *zt) { int8_t ret = _Z_RES_OK; - -#if Z_FEATURE_MULTICAST_TRANSPORT == 1 // Join task only applies to multicast transports - if (zt->_type == _Z_TRANSPORT_MULTICAST_TYPE) { - ret = _zp_multicast_send_join(&zt->_transport._multicast); - } else -#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 - { - (void)zt; - ret = _Z_ERR_TRANSPORT_NOT_AVAILABLE; + switch (zt->_type) { + case _Z_TRANSPORT_MULTICAST_TYPE: + ret = _zp_multicast_send_join(&zt->_transport._multicast); + break; + default: + (void)zt; + ret = _Z_ERR_TRANSPORT_NOT_AVAILABLE; + break; } - return ret; } diff --git a/src/transport/common/lease.c b/src/transport/common/lease.c index bf32a5884..8e337f497 100644 --- a/src/transport/common/lease.c +++ b/src/transport/common/lease.c @@ -12,46 +12,42 @@ // ZettaScale Zenoh Team, // -#include "zenoh-pico/transport/link/task/lease.h" +#include "zenoh-pico/transport/common/lease.h" #include +#include "zenoh-pico/transport/multicast/lease.h" +#include "zenoh-pico/transport/unicast/lease.h" + int8_t _z_send_keep_alive(_z_transport_t *zt) { int8_t ret = _Z_RES_OK; -#if Z_FEATURE_UNICAST_TRANSPORT == 1 - if (zt->_type == _Z_TRANSPORT_UNICAST_TYPE) { - ret = _zp_unicast_send_keep_alive(&zt->_transport._unicast); - } else -#endif // Z_FEATURE_UNICAST_TRANSPORT == 1 -#if Z_FEATURE_MULTICAST_TRANSPORT == 1 - if (zt->_type == _Z_TRANSPORT_MULTICAST_TYPE) { - ret = _zp_multicast_send_keep_alive(&zt->_transport._multicast); - } else -#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 - { - ret = _Z_ERR_TRANSPORT_NOT_AVAILABLE; + switch (zt->_type) { + case _Z_TRANSPORT_UNICAST_TYPE: + ret = _zp_unicast_send_keep_alive(&zt->_transport._unicast); + break; + case _Z_TRANSPORT_MULTICAST_TYPE: + ret = _zp_multicast_send_keep_alive(&zt->_transport._multicast); + break; + default: + ret = _Z_ERR_TRANSPORT_NOT_AVAILABLE; + break; } - return ret; } void *_zp_lease_task(void *zt_arg) { void *ret = NULL; _z_transport_t *zt = (_z_transport_t *)zt_arg; - -#if Z_FEATURE_UNICAST_TRANSPORT == 1 - if (zt->_type == _Z_TRANSPORT_UNICAST_TYPE) { - ret = _zp_unicast_lease_task(&zt->_transport._unicast); - } else -#endif // Z_FEATURE_UNICAST_TRANSPORT == 1 -#if Z_FEATURE_MULTICAST_TRANSPORT == 1 - if (zt->_type == _Z_TRANSPORT_MULTICAST_TYPE) { - ret = _zp_multicast_lease_task(&zt->_transport._multicast); - } else -#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 - { - ret = NULL; + switch (zt->_type) { + case _Z_TRANSPORT_UNICAST_TYPE: + ret = _zp_unicast_lease_task(&zt->_transport._unicast); + break; + case _Z_TRANSPORT_MULTICAST_TYPE: + ret = _zp_multicast_lease_task(&zt->_transport._multicast); + break; + default: + ret = NULL; + break; } - return ret; } diff --git a/src/transport/common/read.c b/src/transport/common/read.c index e1bcdcee8..a9e2485a4 100644 --- a/src/transport/common/read.c +++ b/src/transport/common/read.c @@ -12,47 +12,42 @@ // ZettaScale Zenoh Team, // -#include "zenoh-pico/transport/link/task/read.h" +#include "zenoh-pico/transport/common/read.h" #include +#include "zenoh-pico/transport/multicast/read.h" +#include "zenoh-pico/transport/unicast/read.h" + int8_t _z_read(_z_transport_t *zt) { int8_t ret = _Z_RES_OK; - -#if Z_FEATURE_UNICAST_TRANSPORT == 1 - if (zt->_type == _Z_TRANSPORT_UNICAST_TYPE) { - ret = _zp_unicast_read(&zt->_transport._unicast); - } else -#endif // Z_FEATURE_UNICAST_TRANSPORT == 1 -#if Z_FEATURE_MULTICAST_TRANSPORT == 1 - if (zt->_type == _Z_TRANSPORT_MULTICAST_TYPE) { - ret = _zp_multicast_read(&zt->_transport._multicast); - } else -#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 - { - ret = _Z_ERR_TRANSPORT_NOT_AVAILABLE; + switch (zt->_type) { + case _Z_TRANSPORT_UNICAST_TYPE: + ret = _zp_unicast_read(&zt->_transport._unicast); + break; + case _Z_TRANSPORT_MULTICAST_TYPE: + ret = _zp_multicast_read(&zt->_transport._multicast); + break; + default: + ret = _Z_ERR_TRANSPORT_NOT_AVAILABLE; + break; } - return ret; } void *_zp_read_task(void *zt_arg) { void *ret = NULL; _z_transport_t *zt = (_z_transport_t *)zt_arg; - -#if Z_FEATURE_UNICAST_TRANSPORT == 1 - if (zt->_type == _Z_TRANSPORT_UNICAST_TYPE) { - ret = _zp_unicast_read_task(&zt->_transport._unicast); - } else -#endif // Z_FEATURE_UNICAST_TRANSPORT == 1 -#if Z_FEATURE_MULTICAST_TRANSPORT == 1 - if (zt->_type == _Z_TRANSPORT_MULTICAST_TYPE) { - ret = _zp_multicast_read_task(&zt->_transport._multicast); - } else -#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 - { - ret = NULL; + switch (zt->_type) { + case _Z_TRANSPORT_UNICAST_TYPE: + ret = _zp_unicast_read_task(&zt->_transport._unicast); + break; + case _Z_TRANSPORT_MULTICAST_TYPE: + ret = _zp_multicast_read_task(&zt->_transport._multicast); + break; + default: + ret = NULL; + break; } - return ret; } diff --git a/src/transport/common/rx.c b/src/transport/common/rx.c index 5a7d2e45a..67692eb3c 100644 --- a/src/transport/common/rx.c +++ b/src/transport/common/rx.c @@ -12,11 +12,12 @@ // ZettaScale Zenoh Team, // -#include "zenoh-pico/transport/link/rx.h" +#include "zenoh-pico/transport/multicast/rx.h" #include #include "zenoh-pico/protocol/codec/transport.h" +#include "zenoh-pico/transport/unicast/rx.h" #include "zenoh-pico/utils/logging.h" /*------------------ Reception helper ------------------*/ diff --git a/src/transport/common/tx.c b/src/transport/common/tx.c index 811cf7a2d..f0d68eb6c 100644 --- a/src/transport/common/tx.c +++ b/src/transport/common/tx.c @@ -12,12 +12,13 @@ // ZettaScale Zenoh Team, // -#include "zenoh-pico/transport/link/tx.h" +#include "zenoh-pico/transport/multicast/tx.h" #include "zenoh-pico/api/constants.h" #include "zenoh-pico/protocol/codec/core.h" #include "zenoh-pico/protocol/codec/transport.h" #include "zenoh-pico/protocol/definitions/transport.h" +#include "zenoh-pico/transport/unicast/tx.h" #include "zenoh-pico/utils/logging.h" /*------------------ Transmission helper ------------------*/ @@ -53,25 +54,20 @@ void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, _Bool is_streamed) { int8_t _z_send_t_msg(_z_transport_t *zt, const _z_transport_message_t *t_msg) { int8_t ret = _Z_RES_OK; - -#if Z_FEATURE_UNICAST_TRANSPORT == 1 - if (zt->_type == _Z_TRANSPORT_UNICAST_TYPE) { - ret = _z_unicast_send_t_msg(&zt->_transport._unicast, t_msg); - } else -#endif // Z_FEATURE_UNICAST_TRANSPORT == 1 -#if Z_FEATURE_MULTICAST_TRANSPORT == 1 - if (zt->_type == _Z_TRANSPORT_MULTICAST_TYPE) { - ret = _z_multicast_send_t_msg(&zt->_transport._multicast, t_msg); - } else -#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 - { - ret = _Z_ERR_TRANSPORT_NOT_AVAILABLE; + switch (zt->_type) { + case _Z_TRANSPORT_UNICAST_TYPE: + ret = _z_unicast_send_t_msg(&zt->_transport._unicast, t_msg); + break; + case _Z_TRANSPORT_MULTICAST_TYPE: + ret = _z_multicast_send_t_msg(&zt->_transport._multicast, t_msg); + break; + default: + ret = _Z_ERR_TRANSPORT_NOT_AVAILABLE; + break; } - return ret; } -#if Z_FEATURE_UNICAST_TRANSPORT == 1 || Z_FEATURE_MULTICAST_TRANSPORT == 1 int8_t _z_link_send_t_msg(const _z_link_t *zl, const _z_transport_message_t *t_msg) { int8_t ret = _Z_RES_OK; @@ -103,7 +99,7 @@ int8_t _z_link_send_t_msg(const _z_link_t *zl, const _z_transport_message_t *t_m return ret; } -#endif // Z_FEATURE_UNICAST_TRANSPORT == 1 || Z_FEATURE_MULTICAST_TRANSPORT == 1 + int8_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn) { int8_t ret = _Z_RES_OK; diff --git a/src/transport/manager.c b/src/transport/manager.c index c7e868ed1..06662e513 100644 --- a/src/transport/manager.c +++ b/src/transport/manager.c @@ -17,81 +17,67 @@ #include #include +#include "zenoh-pico/transport/multicast/transport.h" +#include "zenoh-pico/transport/unicast/transport.h" + int8_t _z_new_transport_client(_z_transport_t *zt, char *locator, _z_id_t *local_zid) { int8_t ret = _Z_RES_OK; - + // Init link _z_link_t zl; memset(&zl, 0, sizeof(_z_link_t)); - + // Open link ret = _z_open_link(&zl, locator); - if (ret == _Z_RES_OK) { -#if Z_FEATURE_UNICAST_TRANSPORT == 1 - if (_Z_LINK_IS_MULTICAST(zl._capabilities) == false) { - _z_transport_unicast_establish_param_t tp_param; - ret = _z_transport_unicast_open_client(&tp_param, &zl, local_zid); - if (ret == _Z_RES_OK) { - ret = _z_transport_unicast(zt, &zl, &tp_param); - } else { - _z_link_clear(&zl); - } - } else -#endif // Z_FEATURE_UNICAST_TRANSPORT == 1 -#if Z_FEATURE_MULTICAST_TRANSPORT == 1 - if (_Z_LINK_IS_MULTICAST(zl._capabilities) == true) { - _z_transport_multicast_establish_param_t tp_param; - ret = _z_transport_multicast_open_client(&tp_param, &zl, local_zid); - if (ret == _Z_RES_OK) { - ret = _z_transport_multicast(zt, &zl, &tp_param); - } else { - _z_link_clear(&zl); - } - } else -#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 - { + if (ret != _Z_RES_OK) { + return ret; + } + // Open transport + if (_Z_LINK_IS_MULTICAST(zl._capabilities)) { + _z_transport_multicast_establish_param_t tp_param; + ret = _z_multicast_open_client(&tp_param, &zl, local_zid); + if (ret != _Z_RES_OK) { _z_link_clear(&zl); - ret = _Z_ERR_TRANSPORT_NOT_AVAILABLE; + return ret; } + ret = _z_multicast_transport_create(zt, &zl, &tp_param); + } else { + _z_transport_unicast_establish_param_t tp_param; + ret = _z_unicast_open_client(&tp_param, &zl, local_zid); + if (ret != _Z_RES_OK) { + _z_link_clear(&zl); + return ret; + } + ret = _z_unicast_transport_create(zt, &zl, &tp_param); } - return ret; } int8_t _z_new_transport_peer(_z_transport_t *zt, char *locator, _z_id_t *local_zid) { int8_t ret = _Z_RES_OK; - + // Init link _z_link_t zl; memset(&zl, 0, sizeof(_z_link_t)); - + // Listen link ret = _z_listen_link(&zl, locator); - if (ret == _Z_RES_OK) { -#if Z_FEATURE_UNICAST_TRANSPORT == 1 - if (_Z_LINK_IS_MULTICAST(zl._capabilities) == false) { - _z_transport_unicast_establish_param_t tp_param; - ret = _z_transport_unicast_open_peer(&tp_param, &zl, local_zid); - if (ret == _Z_RES_OK) { - ret = _z_transport_unicast(zt, &zl, &tp_param); - } else { - _z_link_clear(&zl); - } - } else -#endif // Z_FEATURE_UNICAST_TRANSPORT == 1 -#if Z_FEATURE_MULTICAST_TRANSPORT == 1 - if (_Z_LINK_IS_MULTICAST(zl._capabilities) == true) { - _z_transport_multicast_establish_param_t tp_param; - ret = _z_transport_multicast_open_peer(&tp_param, &zl, local_zid); - if (ret == _Z_RES_OK) { - ret = _z_transport_multicast(zt, &zl, &tp_param); - } else { - _z_link_clear(&zl); - } - } else -#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 - { + if (ret != _Z_RES_OK) { + return ret; + } + if (_Z_LINK_IS_MULTICAST(zl._capabilities)) { + _z_transport_multicast_establish_param_t tp_param; + ret = _z_multicast_open_peer(&tp_param, &zl, local_zid); + if (ret != _Z_RES_OK) { _z_link_clear(&zl); - ret = _Z_ERR_TRANSPORT_NOT_AVAILABLE; + return ret; } + ret = _z_multicast_transport_create(zt, &zl, &tp_param); + } else { + _z_transport_unicast_establish_param_t tp_param; + ret = _z_unicast_open_peer(&tp_param, &zl, local_zid); + if (ret != _Z_RES_OK) { + _z_link_clear(&zl); + return ret; + } + ret = _z_unicast_transport_create(zt, &zl, &tp_param); } - return ret; } diff --git a/src/transport/multicast.c b/src/transport/multicast.c new file mode 100644 index 000000000..d8613dc89 --- /dev/null +++ b/src/transport/multicast.c @@ -0,0 +1,65 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#include "zenoh-pico/transport/multicast.h" + +#include +#include +#include +#include +#include + +#include "zenoh-pico/link/link.h" +#include "zenoh-pico/transport/common/lease.h" +#include "zenoh-pico/transport/common/read.h" +#include "zenoh-pico/transport/common/tx.h" +#include "zenoh-pico/transport/multicast/rx.h" +#include "zenoh-pico/transport/multicast/tx.h" +#include "zenoh-pico/transport/unicast/rx.h" +#include "zenoh-pico/transport/utils.h" +#include "zenoh-pico/utils/logging.h" + +#if Z_FEATURE_MULTICAST_TRANSPORT == 1 +void _zp_multicast_fetch_zid(const _z_transport_t *zt, z_owned_closure_zid_t *callback) { + void *ctx = callback->context; + _z_transport_peer_entry_list_t *l = zt->_transport._multicast._peers; + for (; l != NULL; l = _z_transport_peer_entry_list_tail(l)) { + _z_transport_peer_entry_t *val = _z_transport_peer_entry_list_head(l); + z_id_t id = val->_remote_zid; + + callback->call(&id, ctx); + } +} + +void _zp_multicast_info_session(const _z_transport_t *zt, _z_config_t *ps) { + _z_transport_peer_entry_list_t *xs = zt->_transport._multicast._peers; + while (xs != NULL) { + _z_transport_peer_entry_t *peer = _z_transport_peer_entry_list_head(xs); + _z_bytes_t remote_zid = _z_bytes_wrap(peer->_remote_zid.id, _z_id_len(peer->_remote_zid)); + _zp_config_insert(ps, Z_INFO_PEER_PID_KEY, _z_string_from_bytes(&remote_zid)); + + xs = _z_transport_peer_entry_list_tail(xs); + } +} + +#else +void _zp_multicast_fetch_zid(const _z_transport_t *zt, z_owned_closure_zid_t *callback) { + _ZP_UNUSED(zt); + _ZP_UNUSED(callback); +} + +void _zp_multicast_info_session(const _z_transport_t *zt, _z_config_t *ps) { + _ZP_UNUSED(zt); + _ZP_UNUSED(ps); +} +#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 diff --git a/src/transport/multicast/link/task/join.c b/src/transport/multicast/join.c similarity index 82% rename from src/transport/multicast/link/task/join.c rename to src/transport/multicast/join.c index 45ce08aaa..d577db1da 100644 --- a/src/transport/multicast/link/task/join.c +++ b/src/transport/multicast/join.c @@ -12,10 +12,10 @@ // ZettaScale Zenoh Team, // -#include "zenoh-pico/transport/link/task/join.h" +#include "zenoh-pico/transport/multicast/join.h" #include "zenoh-pico/session/utils.h" -#include "zenoh-pico/transport/link/tx.h" +#include "zenoh-pico/transport/multicast/tx.h" #if Z_FEATURE_MULTICAST_TRANSPORT == 1 @@ -30,5 +30,9 @@ int8_t _zp_multicast_send_join(_z_transport_multicast_t *ztm) { return _z_multicast_send_t_msg(ztm, &jsm); } - +#else +int8_t _zp_multicast_send_join(_z_transport_multicast_t *ztm) { + _ZP_UNUSED(ztm); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} #endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 diff --git a/src/transport/multicast/link/task/lease.c b/src/transport/multicast/lease.c similarity index 75% rename from src/transport/multicast/link/task/lease.c rename to src/transport/multicast/lease.c index 91a594c4b..c30fe4c7a 100644 --- a/src/transport/multicast/link/task/lease.c +++ b/src/transport/multicast/lease.c @@ -12,19 +12,21 @@ // ZettaScale Zenoh Team, // -#include "zenoh-pico/transport/link/task/lease.h" +#include "zenoh-pico/transport/multicast/lease.h" #include #include "zenoh-pico/config.h" #include "zenoh-pico/session/utils.h" -#include "zenoh-pico/transport/link/task/join.h" -#include "zenoh-pico/transport/link/tx.h" +#include "zenoh-pico/transport/common/join.h" +#include "zenoh-pico/transport/multicast/join.h" +#include "zenoh-pico/transport/multicast/transport.h" +#include "zenoh-pico/transport/multicast/tx.h" #include "zenoh-pico/utils/logging.h" #if Z_FEATURE_MULTICAST_TRANSPORT == 1 -_z_zint_t _z_get_minimum_lease(_z_transport_peer_entry_list_t *peers, _z_zint_t local_lease) { +static _z_zint_t _z_get_minimum_lease(_z_transport_peer_entry_list_t *peers, _z_zint_t local_lease) { _z_zint_t ret = local_lease; _z_transport_peer_entry_list_t *it = peers; @@ -41,7 +43,7 @@ _z_zint_t _z_get_minimum_lease(_z_transport_peer_entry_list_t *peers, _z_zint_t return ret; } -_z_zint_t _z_get_next_lease(_z_transport_peer_entry_list_t *peers) { +static _z_zint_t _z_get_next_lease(_z_transport_peer_entry_list_t *peers) { _z_zint_t ret = SIZE_MAX; _z_transport_peer_entry_list_t *it = peers; @@ -67,6 +69,25 @@ int8_t _zp_multicast_send_keep_alive(_z_transport_multicast_t *ztm) { return ret; } +int8_t _zp_multicast_start_lease_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task) { + // Init memory + (void)memset(task, 0, sizeof(_z_task_t)); + // Attach task + zt->_transport._multicast._lease_task = task; + zt->_transport._multicast._lease_task_running = true; + // Init task + if (_z_task_init(task, attr, _zp_multicast_lease_task, &zt->_transport._multicast) != _Z_RES_OK) { + zt->_transport._multicast._lease_task_running = false; + return _Z_ERR_SYSTEM_TASK_FAILED; + } + return _Z_RES_OK; +} + +int8_t _zp_multicast_stop_lease_task(_z_transport_t *zt) { + zt->_transport._multicast._lease_task_running = false; + return _Z_RES_OK; +} + void *_zp_multicast_lease_task(void *ztm_arg) { #if Z_FEATURE_MULTI_THREAD == 1 _z_transport_multicast_t *ztm = (_z_transport_multicast_t *)ztm_arg; @@ -162,5 +183,26 @@ void *_zp_multicast_lease_task(void *ztm_arg) { return 0; } +#else +int8_t _zp_multicast_send_keep_alive(_z_transport_multicast_t *ztm) { + _ZP_UNUSED(ztm); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} +int8_t _zp_multicast_start_lease_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task) { + _ZP_UNUSED(zt); + _ZP_UNUSED(attr); + _ZP_UNUSED(task); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} + +int8_t _zp_multicast_stop_lease_task(_z_transport_t *zt) { + _ZP_UNUSED(zt); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} + +void *_zp_multicast_lease_task(void *ztm_arg) { + _ZP_UNUSED(ztm_arg); + return NULL; +} #endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 diff --git a/src/transport/multicast/link/task/read.c b/src/transport/multicast/read.c similarity index 73% rename from src/transport/multicast/link/task/read.c rename to src/transport/multicast/read.c index 606f2a8c1..a0c483a6b 100644 --- a/src/transport/multicast/link/task/read.c +++ b/src/transport/multicast/read.c @@ -12,14 +12,15 @@ // ZettaScale Zenoh Team, // -#include "zenoh-pico/transport/link/task/read.h" +#include "zenoh-pico/transport/multicast/read.h" #include #include "zenoh-pico/config.h" #include "zenoh-pico/protocol/codec/transport.h" #include "zenoh-pico/protocol/iobuf.h" -#include "zenoh-pico/transport/link/rx.h" +#include "zenoh-pico/transport/multicast/rx.h" +#include "zenoh-pico/transport/unicast/rx.h" #include "zenoh-pico/utils/logging.h" #if Z_FEATURE_MULTICAST_TRANSPORT == 1 @@ -38,6 +39,25 @@ int8_t _zp_multicast_read(_z_transport_multicast_t *ztm) { return ret; } +int8_t _zp_multicast_start_read_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task) { + // Init memory + (void)memset(task, 0, sizeof(_z_task_t)); + // Attach task + zt->_transport._multicast._read_task = task; + zt->_transport._multicast._read_task_running = true; + // Init task + if (_z_task_init(task, attr, _zp_multicast_read_task, &zt->_transport._multicast) != _Z_RES_OK) { + zt->_transport._multicast._read_task_running = false; + return _Z_ERR_SYSTEM_TASK_FAILED; + } + return _Z_RES_OK; +} + +int8_t _zp_multicast_stop_read_task(_z_transport_t *zt) { + zt->_transport._multicast._read_task_running = false; + return _Z_RES_OK; +} + void *_zp_multicast_read_task(void *ztm_arg) { #if Z_FEATURE_MULTI_THREAD == 1 _z_transport_multicast_t *ztm = (_z_transport_multicast_t *)ztm_arg; @@ -117,5 +137,26 @@ void *_zp_multicast_read_task(void *ztm_arg) { return NULL; } +#else +int8_t _zp_multicast_read(_z_transport_multicast_t *ztm) { + _ZP_UNUSED(ztm); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} +int8_t _zp_multicast_start_read_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task) { + _ZP_UNUSED(zt); + _ZP_UNUSED(attr); + _ZP_UNUSED(task); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} + +int8_t _zp_multicast_stop_read_task(_z_transport_t *zt) { + _ZP_UNUSED(zt); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} + +void *_zp_multicast_read_task(void *ztm_arg) { + _ZP_UNUSED(ztm_arg); + return NULL; +} #endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 diff --git a/src/transport/multicast/link/rx.c b/src/transport/multicast/rx.c similarity index 95% rename from src/transport/multicast/link/rx.c rename to src/transport/multicast/rx.c index cb585503c..6d73c10f8 100644 --- a/src/transport/multicast/link/rx.c +++ b/src/transport/multicast/rx.c @@ -12,7 +12,7 @@ // ZettaScale Zenoh Team, // -#include "zenoh-pico/transport/link/rx.h" +#include "zenoh-pico/transport/multicast/rx.h" #include #include @@ -319,5 +319,19 @@ int8_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, _z_t return ret; } +#else +int8_t _z_multicast_recv_t_msg(_z_transport_multicast_t *ztm, _z_transport_message_t *t_msg, _z_bytes_t *addr) { + _ZP_UNUSED(ztm); + _ZP_UNUSED(t_msg); + _ZP_UNUSED(addr); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} +int8_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, _z_transport_message_t *t_msg, + _z_bytes_t *addr) { + _ZP_UNUSED(ztm); + _ZP_UNUSED(t_msg); + _ZP_UNUSED(addr); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} #endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 diff --git a/src/transport/multicast/transport.c b/src/transport/multicast/transport.c new file mode 100644 index 000000000..51286de6b --- /dev/null +++ b/src/transport/multicast/transport.c @@ -0,0 +1,230 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#include +#include +#include +#include +#include + +#include "zenoh-pico/link/link.h" +#include "zenoh-pico/transport/common/lease.h" +#include "zenoh-pico/transport/common/read.h" +#include "zenoh-pico/transport/common/tx.h" +#include "zenoh-pico/transport/multicast.h" +#include "zenoh-pico/transport/multicast/rx.h" +#include "zenoh-pico/transport/multicast/tx.h" +#include "zenoh-pico/transport/unicast/rx.h" +#include "zenoh-pico/transport/utils.h" +#include "zenoh-pico/utils/logging.h" + +#if Z_FEATURE_MULTICAST_TRANSPORT == 1 + +int8_t _z_multicast_transport_create(_z_transport_t *zt, _z_link_t *zl, + _z_transport_multicast_establish_param_t *param) { + int8_t ret = _Z_RES_OK; + + zt->_type = _Z_TRANSPORT_MULTICAST_TYPE; + +#if Z_FEATURE_MULTI_THREAD == 1 + // Initialize the mutexes + ret = _z_mutex_init(&zt->_transport._multicast._mutex_tx); + if (ret == _Z_RES_OK) { + ret = _z_mutex_init(&zt->_transport._multicast._mutex_rx); + if (ret == _Z_RES_OK) { + ret = _z_mutex_init(&zt->_transport._multicast._mutex_peer); + if (ret != _Z_RES_OK) { + _z_mutex_free(&zt->_transport._multicast._mutex_tx); + _z_mutex_free(&zt->_transport._multicast._mutex_rx); + } + } else { + _z_mutex_free(&zt->_transport._multicast._mutex_tx); + } + } +#endif // Z_FEATURE_MULTI_THREAD == 1 + + // Initialize the read and write buffers + if (ret == _Z_RES_OK) { + uint16_t mtu = (zl->_mtu < Z_BATCH_MULTICAST_SIZE) ? zl->_mtu : Z_BATCH_MULTICAST_SIZE; + zt->_transport._multicast._wbuf = _z_wbuf_make(mtu, false); + zt->_transport._multicast._zbuf = _z_zbuf_make(Z_BATCH_MULTICAST_SIZE); + + // Clean up the buffers if one of them failed to be allocated + if ((_z_wbuf_capacity(&zt->_transport._multicast._wbuf) != mtu) || + (_z_zbuf_capacity(&zt->_transport._multicast._zbuf) != Z_BATCH_MULTICAST_SIZE)) { + ret = _Z_ERR_SYSTEM_OUT_OF_MEMORY; + +#if Z_FEATURE_MULTI_THREAD == 1 + _z_mutex_free(&zt->_transport._multicast._mutex_tx); + _z_mutex_free(&zt->_transport._multicast._mutex_rx); + _z_mutex_free(&zt->_transport._multicast._mutex_peer); +#endif // Z_FEATURE_MULTI_THREAD == 1 + + _z_wbuf_clear(&zt->_transport._multicast._wbuf); + _z_zbuf_clear(&zt->_transport._multicast._zbuf); + } + } + + if (ret == _Z_RES_OK) { + // Set default SN resolution + zt->_transport._multicast._sn_res = _z_sn_max(param->_seq_num_res); + + // The initial SN at TX side + zt->_transport._multicast._sn_tx_reliable = param->_initial_sn_tx._val._plain._reliable; + zt->_transport._multicast._sn_tx_best_effort = param->_initial_sn_tx._val._plain._best_effort; + + // Initialize peer list + zt->_transport._multicast._peers = _z_transport_peer_entry_list_new(); + +#if Z_FEATURE_MULTI_THREAD == 1 + // Tasks + zt->_transport._multicast._read_task_running = false; + zt->_transport._multicast._read_task = NULL; + zt->_transport._multicast._lease_task_running = false; + zt->_transport._multicast._lease_task = NULL; +#endif // Z_FEATURE_MULTI_THREAD == 1 + + zt->_transport._multicast._lease = Z_TRANSPORT_LEASE; + + // Notifiers + zt->_transport._multicast._transmitted = false; + + // Transport link for multicast + zt->_transport._multicast._link = *zl; + } + + return ret; +} + +int8_t _z_multicast_open_peer(_z_transport_multicast_establish_param_t *param, const _z_link_t *zl, + const _z_id_t *local_zid) { + int8_t ret = _Z_RES_OK; + + _z_zint_t initial_sn_tx = 0; + z_random_fill(&initial_sn_tx, sizeof(initial_sn_tx)); + initial_sn_tx = initial_sn_tx & !_z_sn_modulo_mask(Z_SN_RESOLUTION); + + _z_conduit_sn_list_t next_sn; + next_sn._is_qos = false; + next_sn._val._plain._best_effort = initial_sn_tx; + next_sn._val._plain._reliable = initial_sn_tx; + + _z_id_t zid = *local_zid; + _z_transport_message_t jsm = _z_t_msg_make_join(Z_WHATAMI_PEER, Z_TRANSPORT_LEASE, zid, next_sn); + + // Encode and send the message + _Z_INFO("Sending Z_JOIN message\n"); + ret = _z_link_send_t_msg(zl, &jsm); + _z_t_msg_clear(&jsm); + + if (ret == _Z_RES_OK) { + param->_seq_num_res = jsm._body._join._seq_num_res; + param->_initial_sn_tx = next_sn; + } + + return ret; +} + +int8_t _z_multicast_open_client(_z_transport_multicast_establish_param_t *param, const _z_link_t *zl, + const _z_id_t *local_zid) { + _ZP_UNUSED(param); + _ZP_UNUSED(zl); + _ZP_UNUSED(local_zid); + int8_t ret = _Z_ERR_CONFIG_UNSUPPORTED_CLIENT_MULTICAST; + // @TODO: not implemented + return ret; +} + +int8_t _z_multicast_send_close(_z_transport_multicast_t *ztm, uint8_t reason, _Bool link_only) { + int8_t ret = _Z_RES_OK; + // Send and clear message + _z_transport_message_t cm = _z_t_msg_make_close(reason, link_only); + ret = _z_multicast_send_t_msg(ztm, &cm); + _z_t_msg_clear(&cm); + return ret; +} + +int8_t _z_multicast_transport_close(_z_transport_multicast_t *ztm, uint8_t reason) { + return _z_multicast_send_close(ztm, reason, false); +} + +void _z_multicast_transport_clear(_z_transport_t *zt) { + _z_transport_multicast_t *ztm = &zt->_transport._multicast; +#if Z_FEATURE_MULTI_THREAD == 1 + // Clean up tasks + if (ztm->_read_task != NULL) { + _z_task_join(ztm->_read_task); + _z_task_free(&ztm->_read_task); + } + if (ztm->_lease_task != NULL) { + _z_task_join(ztm->_lease_task); + _z_task_free(&ztm->_lease_task); + } + + // Clean up the mutexes + _z_mutex_free(&ztm->_mutex_tx); + _z_mutex_free(&ztm->_mutex_rx); + _z_mutex_free(&ztm->_mutex_peer); +#endif // Z_FEATURE_MULTI_THREAD == 1 + + // Clean up the buffers + _z_wbuf_clear(&ztm->_wbuf); + _z_zbuf_clear(&ztm->_zbuf); + + // Clean up peer list + _z_transport_peer_entry_list_free(&ztm->_peers); + _z_link_clear(&ztm->_link); +} + +#else + +int8_t _z_multicast_transport_create(_z_transport_t *zt, _z_link_t *zl, + _z_transport_multicast_establish_param_t *param) { + _ZP_UNUSED(zt); + _ZP_UNUSED(zl); + _ZP_UNUSED(param); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} + +int8_t _z_multicast_open_peer(_z_transport_multicast_establish_param_t *param, const _z_link_t *zl, + const _z_id_t *local_zid) { + _ZP_UNUSED(param); + _ZP_UNUSED(zl); + _ZP_UNUSED(local_zid); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} + +int8_t _z_multicast_open_client(_z_transport_multicast_establish_param_t *param, const _z_link_t *zl, + const _z_id_t *local_zid) { + _ZP_UNUSED(param); + _ZP_UNUSED(zl); + _ZP_UNUSED(local_zid); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} + +int8_t _z_multicast_send_close(_z_transport_multicast_t *ztm, uint8_t reason, _Bool link_only) { + _ZP_UNUSED(ztm); + _ZP_UNUSED(reason); + _ZP_UNUSED(link_only); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} + +int8_t _z_multicast_transport_close(_z_transport_multicast_t *ztm, uint8_t reason) { + _ZP_UNUSED(ztm); + _ZP_UNUSED(reason); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; + ; +} + +void _z_multicast_transport_clear(_z_transport_t *zt) { _ZP_UNUSED(zt); } +#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 diff --git a/src/transport/multicast/link/tx.c b/src/transport/multicast/tx.c similarity index 91% rename from src/transport/multicast/link/tx.c rename to src/transport/multicast/tx.c index 81197bf12..c2f665a19 100644 --- a/src/transport/multicast/link/tx.c +++ b/src/transport/multicast/tx.c @@ -12,11 +12,12 @@ // ZettaScale Zenoh Team, // -#include "zenoh-pico/transport/link/tx.h" +#include "zenoh-pico/transport/multicast/tx.h" #include "zenoh-pico/config.h" #include "zenoh-pico/protocol/codec/network.h" #include "zenoh-pico/protocol/codec/transport.h" +#include "zenoh-pico/transport/common/tx.h" #include "zenoh-pico/transport/utils.h" #include "zenoh-pico/utils/logging.h" @@ -153,4 +154,19 @@ int8_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_m return ret; } +#else +int8_t _z_multicast_send_t_msg(_z_transport_multicast_t *ztu, const _z_transport_message_t *t_msg) { + _ZP_UNUSED(ztu); + _ZP_UNUSED(t_msg); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} + +int8_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg, z_reliability_t reliability, + z_congestion_control_t cong_ctrl) { + _ZP_UNUSED(zn); + _ZP_UNUSED(n_msg); + _ZP_UNUSED(reliability); + _ZP_UNUSED(cong_ctrl); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} #endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 diff --git a/src/transport/transport.c b/src/transport/transport.c index 0019be34e..27170233c 100644 --- a/src/transport/transport.c +++ b/src/transport/transport.c @@ -21,487 +21,56 @@ #include "zenoh-pico/config.h" #include "zenoh-pico/link/link.h" #include "zenoh-pico/protocol/core.h" -#include "zenoh-pico/transport/link/rx.h" -#include "zenoh-pico/transport/link/tx.h" +#include "zenoh-pico/transport/multicast/rx.h" +#include "zenoh-pico/transport/multicast/transport.h" +#include "zenoh-pico/transport/multicast/tx.h" +#include "zenoh-pico/transport/unicast/rx.h" +#include "zenoh-pico/transport/unicast/transport.h" +#include "zenoh-pico/transport/unicast/tx.h" #include "zenoh-pico/transport/utils.h" #include "zenoh-pico/utils/logging.h" -#if Z_FEATURE_UNICAST_TRANSPORT == 1 -int8_t _z_unicast_send_close(_z_transport_unicast_t *ztu, uint8_t reason, _Bool link_only) { - int8_t ret = _Z_RES_OK; - - _z_transport_message_t cm = _z_t_msg_make_close(reason, link_only); - - ret = _z_unicast_send_t_msg(ztu, &cm); - _z_t_msg_clear(&cm); - - return ret; -} -#endif // Z_FEATURE_UNICAST_TRANSPORT == 1 - -#if Z_FEATURE_MULTICAST_TRANSPORT == 1 -int8_t _z_multicast_send_close(_z_transport_multicast_t *ztm, uint8_t reason, _Bool link_only) { - int8_t ret = _Z_RES_OK; - - _z_transport_message_t cm = _z_t_msg_make_close(reason, link_only); - - ret = _z_multicast_send_t_msg(ztm, &cm); - _z_t_msg_clear(&cm); - - return ret; -} -#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 - int8_t _z_send_close(_z_transport_t *zt, uint8_t reason, _Bool link_only) { int8_t ret = _Z_RES_OK; - -#if Z_FEATURE_UNICAST_TRANSPORT == 1 - if (zt->_type == _Z_TRANSPORT_UNICAST_TYPE) { - ret = _z_unicast_send_close(&zt->_transport._unicast, reason, link_only); - } else -#endif // Z_FEATURE_UNICAST_TRANSPORT == 1 -#if Z_FEATURE_MULTICAST_TRANSPORT == 1 - if (zt->_type == _Z_TRANSPORT_MULTICAST_TYPE) { - ret = _z_multicast_send_close(&zt->_transport._multicast, reason, link_only); - } else -#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 - { - ret = _Z_ERR_TRANSPORT_NOT_AVAILABLE; - } - - return ret; -} - -#if Z_FEATURE_UNICAST_TRANSPORT == 1 -int8_t _z_transport_unicast(_z_transport_t *zt, _z_link_t *zl, _z_transport_unicast_establish_param_t *param) { - int8_t ret = _Z_RES_OK; - - zt->_type = _Z_TRANSPORT_UNICAST_TYPE; - -#if Z_FEATURE_MULTI_THREAD == 1 - // Initialize the mutexes - ret = _z_mutex_init(&zt->_transport._unicast._mutex_tx); - if (ret == _Z_RES_OK) { - ret = _z_mutex_init(&zt->_transport._unicast._mutex_rx); - if (ret != _Z_RES_OK) { - _z_mutex_free(&zt->_transport._unicast._mutex_tx); - } - } -#endif // Z_FEATURE_MULTI_THREAD == 1 - - // Initialize the read and write buffers - if (ret == _Z_RES_OK) { - uint16_t mtu = (zl->_mtu < Z_BATCH_UNICAST_SIZE) ? zl->_mtu : Z_BATCH_UNICAST_SIZE; - _Bool expandable = _Z_LINK_IS_STREAMED(zl->_capabilities); - size_t dbuf_size = 0; - -#if Z_FEATURE_DYNAMIC_MEMORY_ALLOCATION == 0 - expandable = false; - dbuf_size = Z_FRAG_MAX_SIZE; -#endif - zt->_transport._unicast._wbuf = _z_wbuf_make(mtu, false); - zt->_transport._unicast._zbuf = _z_zbuf_make(Z_BATCH_UNICAST_SIZE); - - // Initialize the defragmentation buffers - zt->_transport._unicast._dbuf_reliable = _z_wbuf_make(dbuf_size, expandable); - zt->_transport._unicast._dbuf_best_effort = _z_wbuf_make(dbuf_size, expandable); - - // Clean up the buffers if one of them failed to be allocated - if ((_z_wbuf_capacity(&zt->_transport._unicast._wbuf) != mtu) || - (_z_zbuf_capacity(&zt->_transport._unicast._zbuf) != Z_BATCH_UNICAST_SIZE) || -#if Z_FEATURE_DYNAMIC_MEMORY_ALLOCATION == 0 - (_z_wbuf_capacity(&zt->_transport._unicast._dbuf_reliable) != dbuf_size) || - (_z_wbuf_capacity(&zt->_transport._unicast._dbuf_best_effort) != dbuf_size)) { -#else - (_z_wbuf_capacity(&zt->_transport._unicast._dbuf_reliable) != Z_IOSLICE_SIZE) || - (_z_wbuf_capacity(&zt->_transport._unicast._dbuf_best_effort) != Z_IOSLICE_SIZE)) { -#endif - ret = _Z_ERR_SYSTEM_OUT_OF_MEMORY; - -#if Z_FEATURE_MULTI_THREAD == 1 - _z_mutex_free(&zt->_transport._unicast._mutex_tx); - _z_mutex_free(&zt->_transport._unicast._mutex_rx); -#endif // Z_FEATURE_MULTI_THREAD == 1 - - _z_wbuf_clear(&zt->_transport._unicast._wbuf); - _z_zbuf_clear(&zt->_transport._unicast._zbuf); - _z_wbuf_clear(&zt->_transport._unicast._dbuf_reliable); - _z_wbuf_clear(&zt->_transport._unicast._dbuf_best_effort); - } - } - - if (ret == _Z_RES_OK) { - // Set default SN resolution - zt->_transport._unicast._sn_res = _z_sn_max(param->_seq_num_res); - - // The initial SN at TX side - zt->_transport._unicast._sn_tx_reliable = param->_initial_sn_tx; - zt->_transport._unicast._sn_tx_best_effort = param->_initial_sn_tx; - - // The initial SN at RX side - _z_zint_t initial_sn_rx = _z_sn_decrement(zt->_transport._unicast._sn_res, param->_initial_sn_rx); - zt->_transport._unicast._sn_rx_reliable = initial_sn_rx; - zt->_transport._unicast._sn_rx_best_effort = initial_sn_rx; - -#if Z_FEATURE_MULTI_THREAD == 1 - // Tasks - zt->_transport._unicast._read_task_running = false; - zt->_transport._unicast._read_task = NULL; - zt->_transport._unicast._lease_task_running = false; - zt->_transport._unicast._lease_task = NULL; -#endif // Z_FEATURE_MULTI_THREAD == 1 - - // Notifiers - zt->_transport._unicast._received = 0; - zt->_transport._unicast._transmitted = 0; - - // Transport lease - zt->_transport._unicast._lease = param->_lease; - - // Transport link for unicast - zt->_transport._unicast._link = *zl; - - // Remote peer PID - zt->_transport._unicast._remote_zid = param->_remote_zid; - } else { - param->_remote_zid = _z_id_empty(); - } - - return ret; -} -#endif // Z_FEATURE_UNICAST_TRANSPORT == 1 - -#if Z_FEATURE_MULTICAST_TRANSPORT == 1 -int8_t _z_transport_multicast(_z_transport_t *zt, _z_link_t *zl, _z_transport_multicast_establish_param_t *param) { - int8_t ret = _Z_RES_OK; - - zt->_type = _Z_TRANSPORT_MULTICAST_TYPE; - -#if Z_FEATURE_MULTI_THREAD == 1 - // Initialize the mutexes - ret = _z_mutex_init(&zt->_transport._multicast._mutex_tx); - if (ret == _Z_RES_OK) { - ret = _z_mutex_init(&zt->_transport._multicast._mutex_rx); - if (ret == _Z_RES_OK) { - ret = _z_mutex_init(&zt->_transport._multicast._mutex_peer); - if (ret != _Z_RES_OK) { - _z_mutex_free(&zt->_transport._multicast._mutex_tx); - _z_mutex_free(&zt->_transport._multicast._mutex_rx); - } - } else { - _z_mutex_free(&zt->_transport._multicast._mutex_tx); - } - } -#endif // Z_FEATURE_MULTI_THREAD == 1 - - // Initialize the read and write buffers - if (ret == _Z_RES_OK) { - uint16_t mtu = (zl->_mtu < Z_BATCH_MULTICAST_SIZE) ? zl->_mtu : Z_BATCH_MULTICAST_SIZE; - zt->_transport._multicast._wbuf = _z_wbuf_make(mtu, false); - zt->_transport._multicast._zbuf = _z_zbuf_make(Z_BATCH_MULTICAST_SIZE); - - // Clean up the buffers if one of them failed to be allocated - if ((_z_wbuf_capacity(&zt->_transport._multicast._wbuf) != mtu) || - (_z_zbuf_capacity(&zt->_transport._multicast._zbuf) != Z_BATCH_MULTICAST_SIZE)) { - ret = _Z_ERR_SYSTEM_OUT_OF_MEMORY; - -#if Z_FEATURE_MULTI_THREAD == 1 - _z_mutex_free(&zt->_transport._multicast._mutex_tx); - _z_mutex_free(&zt->_transport._multicast._mutex_rx); - _z_mutex_free(&zt->_transport._multicast._mutex_peer); -#endif // Z_FEATURE_MULTI_THREAD == 1 - - _z_wbuf_clear(&zt->_transport._multicast._wbuf); - _z_zbuf_clear(&zt->_transport._multicast._zbuf); - } - } - - if (ret == _Z_RES_OK) { - // Set default SN resolution - zt->_transport._multicast._sn_res = _z_sn_max(param->_seq_num_res); - - // The initial SN at TX side - zt->_transport._multicast._sn_tx_reliable = param->_initial_sn_tx._val._plain._reliable; - zt->_transport._multicast._sn_tx_best_effort = param->_initial_sn_tx._val._plain._best_effort; - - // Initialize peer list - zt->_transport._multicast._peers = _z_transport_peer_entry_list_new(); - -#if Z_FEATURE_MULTI_THREAD == 1 - // Tasks - zt->_transport._multicast._read_task_running = false; - zt->_transport._multicast._read_task = NULL; - zt->_transport._multicast._lease_task_running = false; - zt->_transport._multicast._lease_task = NULL; -#endif // Z_FEATURE_MULTI_THREAD == 1 - - zt->_transport._multicast._lease = Z_TRANSPORT_LEASE; - - // Notifiers - zt->_transport._multicast._transmitted = false; - - // Transport link for multicast - zt->_transport._multicast._link = *zl; - } - - return ret; -} -#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 - -#if Z_FEATURE_UNICAST_TRANSPORT == 1 -int8_t _z_transport_unicast_open_client(_z_transport_unicast_establish_param_t *param, const _z_link_t *zl, - const _z_id_t *local_zid) { - int8_t ret = _Z_RES_OK; - - _z_id_t zid = *local_zid; - _z_transport_message_t ism = _z_t_msg_make_init_syn(Z_WHATAMI_CLIENT, zid); - param->_seq_num_res = ism._body._init._seq_num_res; // The announced sn resolution - param->_req_id_res = ism._body._init._req_id_res; // The announced req id resolution - param->_batch_size = ism._body._init._batch_size; // The announced batch size - - // Encode and send the message - _Z_INFO("Sending Z_INIT(Syn)\n"); - ret = _z_link_send_t_msg(zl, &ism); - _z_t_msg_clear(&ism); - if (ret == _Z_RES_OK) { - _z_transport_message_t iam; - ret = _z_link_recv_t_msg(&iam, zl); - if (ret == _Z_RES_OK) { - if ((_Z_MID(iam._header) == _Z_MID_T_INIT) && (_Z_HAS_FLAG(iam._header, _Z_FLAG_T_INIT_A) == true)) { - _Z_INFO("Received Z_INIT(Ack)\n"); - - // Any of the size parameters in the InitAck must be less or equal than the one in the InitSyn, - // otherwise the InitAck message is considered invalid and it should be treated as a - // CLOSE message with L==0 by the Initiating Peer -- the recipient of the InitAck message. - if (iam._body._init._seq_num_res <= param->_seq_num_res) { - param->_seq_num_res = iam._body._init._seq_num_res; - } else { - ret = _Z_ERR_TRANSPORT_OPEN_SN_RESOLUTION; - } - - if (iam._body._init._req_id_res <= param->_req_id_res) { - param->_req_id_res = iam._body._init._req_id_res; - } else { - ret = _Z_ERR_TRANSPORT_OPEN_SN_RESOLUTION; - } - - if (iam._body._init._batch_size <= param->_batch_size) { - param->_batch_size = iam._body._init._batch_size; - } else { - ret = _Z_ERR_TRANSPORT_OPEN_SN_RESOLUTION; - } - - if (ret == _Z_RES_OK) { - param->_key_id_res = 0x08 << param->_key_id_res; - param->_req_id_res = 0x08 << param->_req_id_res; - - // The initial SN at TX side - z_random_fill(¶m->_initial_sn_tx, sizeof(param->_initial_sn_tx)); - param->_initial_sn_tx = param->_initial_sn_tx & !_z_sn_modulo_mask(param->_seq_num_res); - - // Initialize the Local and Remote Peer IDs - param->_remote_zid = iam._body._init._zid; - - // Create the OpenSyn message - _z_zint_t lease = Z_TRANSPORT_LEASE; - _z_zint_t initial_sn = param->_initial_sn_tx; - _z_bytes_t cookie; - _z_bytes_copy(&cookie, &iam._body._init._cookie); - - _z_transport_message_t osm = _z_t_msg_make_open_syn(lease, initial_sn, cookie); - - // Encode and send the message - _Z_INFO("Sending Z_OPEN(Syn)\n"); - ret = _z_link_send_t_msg(zl, &osm); - if (ret == _Z_RES_OK) { - _z_transport_message_t oam; - ret = _z_link_recv_t_msg(&oam, zl); - if (ret == _Z_RES_OK) { - if ((_Z_MID(oam._header) == _Z_MID_T_OPEN) && - (_Z_HAS_FLAG(oam._header, _Z_FLAG_T_OPEN_A) == true)) { - _Z_INFO("Received Z_OPEN(Ack)\n"); - param->_lease = oam._body._open._lease; // The session lease - - // The initial SN at RX side. Initialize the session as we had already received - // a message with a SN equal to initial_sn - 1. - param->_initial_sn_rx = oam._body._open._initial_sn; - } else { - ret = _Z_ERR_MESSAGE_UNEXPECTED; - } - _z_t_msg_clear(&oam); - } - } - _z_t_msg_clear(&osm); - } - } else { - ret = _Z_ERR_MESSAGE_UNEXPECTED; - } - _z_t_msg_clear(&iam); - } + // Call transport function + switch (zt->_type) { + case _Z_TRANSPORT_UNICAST_TYPE: + ret = _z_unicast_send_close(&zt->_transport._unicast, reason, link_only); + break; + case _Z_TRANSPORT_MULTICAST_TYPE: + ret = _z_multicast_send_close(&zt->_transport._multicast, reason, link_only); + break; + default: + ret = _Z_ERR_TRANSPORT_NOT_AVAILABLE; + break; } - return ret; } -#endif // Z_FEATURE_UNICAST_TRANSPORT == 1 - -#if Z_FEATURE_MULTICAST_TRANSPORT == 1 -int8_t _z_transport_multicast_open_client(_z_transport_multicast_establish_param_t *param, const _z_link_t *zl, - const _z_id_t *local_zid) { - (void)(param); - (void)(zl); - (void)(local_zid); - int8_t ret = _Z_ERR_CONFIG_UNSUPPORTED_CLIENT_MULTICAST; - - // @TODO: not implemented - - return ret; -} -#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 - -#if Z_FEATURE_UNICAST_TRANSPORT == 1 -int8_t _z_transport_unicast_open_peer(_z_transport_unicast_establish_param_t *param, const _z_link_t *zl, - const _z_id_t *local_zid) { - (void)(param); - (void)(zl); - (void)(local_zid); - int8_t ret = _Z_ERR_CONFIG_UNSUPPORTED_PEER_UNICAST; - - // @TODO: not implemented - - return ret; -} -#endif // Z_FEATURE_UNICAST_TRANSPORT == 1 - -#if Z_FEATURE_MULTICAST_TRANSPORT == 1 -int8_t _z_transport_multicast_open_peer(_z_transport_multicast_establish_param_t *param, const _z_link_t *zl, - const _z_id_t *local_zid) { - int8_t ret = _Z_RES_OK; - - _z_zint_t initial_sn_tx = 0; - z_random_fill(&initial_sn_tx, sizeof(initial_sn_tx)); - initial_sn_tx = initial_sn_tx & !_z_sn_modulo_mask(Z_SN_RESOLUTION); - - _z_conduit_sn_list_t next_sn; - next_sn._is_qos = false; - next_sn._val._plain._best_effort = initial_sn_tx; - next_sn._val._plain._reliable = initial_sn_tx; - - _z_id_t zid = *local_zid; - _z_transport_message_t jsm = _z_t_msg_make_join(Z_WHATAMI_PEER, Z_TRANSPORT_LEASE, zid, next_sn); - - // Encode and send the message - _Z_INFO("Sending Z_JOIN message\n"); - ret = _z_link_send_t_msg(zl, &jsm); - _z_t_msg_clear(&jsm); - - if (ret == _Z_RES_OK) { - param->_seq_num_res = jsm._body._join._seq_num_res; - param->_initial_sn_tx = next_sn; - } - - return ret; -} -#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 - -#if Z_FEATURE_UNICAST_TRANSPORT == 1 -int8_t _z_transport_unicast_close(_z_transport_unicast_t *ztu, uint8_t reason) { - return _z_unicast_send_close(ztu, reason, false); -} -#endif // Z_FEATURE_UNICAST_TRANSPORT == 1 - -#if Z_FEATURE_MULTICAST_TRANSPORT == 1 -int8_t _z_transport_multicast_close(_z_transport_multicast_t *ztm, uint8_t reason) { - return _z_multicast_send_close(ztm, reason, false); -} -#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 - int8_t _z_transport_close(_z_transport_t *zt, uint8_t reason) { return _z_send_close(zt, reason, false); } -#if Z_FEATURE_UNICAST_TRANSPORT == 1 -void _z_transport_unicast_clear(_z_transport_unicast_t *ztu) { -#if Z_FEATURE_MULTI_THREAD == 1 - // Clean up tasks - if (ztu->_read_task != NULL) { - _z_task_join(ztu->_read_task); - _z_task_free(&ztu->_read_task); - } - if (ztu->_lease_task != NULL) { - _z_task_join(ztu->_lease_task); - _z_task_free(&ztu->_lease_task); - } - - // Clean up the mutexes - _z_mutex_free(&ztu->_mutex_tx); - _z_mutex_free(&ztu->_mutex_rx); -#endif // Z_FEATURE_MULTI_THREAD == 1 - - // Clean up the buffers - _z_wbuf_clear(&ztu->_wbuf); - _z_zbuf_clear(&ztu->_zbuf); - _z_wbuf_clear(&ztu->_dbuf_reliable); - _z_wbuf_clear(&ztu->_dbuf_best_effort); - - // Clean up PIDs - ztu->_remote_zid = _z_id_empty(); - _z_link_clear(&ztu->_link); -} -#endif // Z_FEATURE_UNICAST_TRANSPORT == 1 - -#if Z_FEATURE_MULTICAST_TRANSPORT == 1 -void _z_transport_multicast_clear(_z_transport_multicast_t *ztm) { -#if Z_FEATURE_MULTI_THREAD == 1 - // Clean up tasks - if (ztm->_read_task != NULL) { - _z_task_join(ztm->_read_task); - _z_task_free(&ztm->_read_task); - } - if (ztm->_lease_task != NULL) { - _z_task_join(ztm->_lease_task); - _z_task_free(&ztm->_lease_task); - } - - // Clean up the mutexes - _z_mutex_free(&ztm->_mutex_tx); - _z_mutex_free(&ztm->_mutex_rx); - _z_mutex_free(&ztm->_mutex_peer); -#endif // Z_FEATURE_MULTI_THREAD == 1 - - // Clean up the buffers - _z_wbuf_clear(&ztm->_wbuf); - _z_zbuf_clear(&ztm->_zbuf); - - // Clean up peer list - _z_transport_peer_entry_list_free(&ztm->_peers); - _z_link_clear(&ztm->_link); -} -#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 - void _z_transport_clear(_z_transport_t *zt) { -#if Z_FEATURE_UNICAST_TRANSPORT == 1 - if (zt->_type == _Z_TRANSPORT_UNICAST_TYPE) { - _z_transport_unicast_clear(&zt->_transport._unicast); - } else -#endif // Z_FEATURE_UNICAST_TRANSPORT == 1 -#if Z_FEATURE_MULTICAST_TRANSPORT == 1 - if (zt->_type == _Z_TRANSPORT_MULTICAST_TYPE) { - _z_transport_multicast_clear(&zt->_transport._multicast); - } else -#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 - { - __asm__("nop"); + switch (zt->_type) { + case _Z_TRANSPORT_UNICAST_TYPE: + _z_unicast_transport_clear(zt); + break; + case _Z_TRANSPORT_MULTICAST_TYPE: + _z_multicast_transport_clear(zt); + break; + default: + break; } } void _z_transport_free(_z_transport_t **zt) { _z_transport_t *ptr = *zt; - - if (ptr != NULL) { - _z_transport_clear(ptr); - - z_free(ptr); - *zt = NULL; + if (ptr == NULL) { + return; } + // Clear and free transport + _z_transport_clear(ptr); + z_free(ptr); + *zt = NULL; } /** diff --git a/src/transport/unicast.c b/src/transport/unicast.c new file mode 100644 index 000000000..0681bc945 --- /dev/null +++ b/src/transport/unicast.c @@ -0,0 +1,56 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#include "zenoh-pico/transport/unicast.h" + +#include +#include +#include +#include +#include + +#include "zenoh-pico/link/link.h" +#include "zenoh-pico/transport/common/rx.h" +#include "zenoh-pico/transport/common/tx.h" +#include "zenoh-pico/transport/multicast/rx.h" +#include "zenoh-pico/transport/unicast/lease.h" +#include "zenoh-pico/transport/unicast/read.h" +#include "zenoh-pico/transport/unicast/rx.h" +#include "zenoh-pico/transport/unicast/tx.h" +#include "zenoh-pico/transport/utils.h" +#include "zenoh-pico/utils/logging.h" + +#if Z_FEATURE_UNICAST_TRANSPORT == 1 +void _zp_unicast_fetch_zid(const _z_transport_t *zt, z_owned_closure_zid_t *callback) { + void *ctx = callback->context; + z_id_t id = zt->_transport._unicast._remote_zid; + callback->call(&id, ctx); +} + +void _zp_unicast_info_session(const _z_transport_t *zt, _z_config_t *ps) { + _z_id_t remote_zid = zt->_transport._unicast._remote_zid; + _z_bytes_t remote_zidbytes = _z_bytes_wrap(remote_zid.id, _z_id_len(remote_zid)); + _zp_config_insert(ps, Z_INFO_ROUTER_PID_KEY, _z_string_from_bytes(&remote_zidbytes)); +} + +#else +void _zp_unicast_fetch_zid(const _z_transport_t *zt, z_owned_closure_zid_t *callback) { + _ZP_UNUSED(zt); + _ZP_UNUSED(callback); +} + +void _zp_unicast_info_session(const _z_transport_t *zt, _z_config_t *ps) { + _ZP_UNUSED(zt); + _ZP_UNUSED(ps); +} +#endif // Z_FEATURE_UNICAST_TRANSPORT == 1 \ No newline at end of file diff --git a/src/transport/unicast/link/task/lease.c b/src/transport/unicast/lease.c similarity index 65% rename from src/transport/unicast/link/task/lease.c rename to src/transport/unicast/lease.c index 02ff178db..8fde5868e 100644 --- a/src/transport/unicast/link/task/lease.c +++ b/src/transport/unicast/lease.c @@ -12,9 +12,10 @@ // ZettaScale Zenoh Team, // -#include "zenoh-pico/transport/link/task/lease.h" +#include "zenoh-pico/transport/unicast/lease.h" -#include "zenoh-pico/transport/link/tx.h" +#include "zenoh-pico/transport/unicast/transport.h" +#include "zenoh-pico/transport/unicast/tx.h" #include "zenoh-pico/utils/logging.h" #if Z_FEATURE_UNICAST_TRANSPORT == 1 @@ -28,6 +29,25 @@ int8_t _zp_unicast_send_keep_alive(_z_transport_unicast_t *ztu) { return ret; } +int8_t _zp_unicast_start_lease_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task) { + // Init memory + (void)memset(task, 0, sizeof(_z_task_t)); + // Attach task + zt->_transport._unicast._lease_task = task; + zt->_transport._unicast._lease_task_running = true; + // Init task + if (_z_task_init(task, attr, _zp_unicast_lease_task, &zt->_transport._unicast) != _Z_RES_OK) { + zt->_transport._unicast._lease_task_running = false; + return _Z_ERR_SYSTEM_TASK_FAILED; + } + return _Z_RES_OK; +} + +int8_t _zp_unicast_stop_lease_task(_z_transport_t *zt) { + zt->_transport._unicast._lease_task_running = false; + return _Z_RES_OK; +} + void *_zp_unicast_lease_task(void *ztu_arg) { #if Z_FEATURE_MULTI_THREAD == 1 _z_transport_unicast_t *ztu = (_z_transport_unicast_t *)ztu_arg; @@ -46,7 +66,7 @@ void *_zp_unicast_lease_task(void *ztu_arg) { } else { _Z_INFO("Closing session because it has expired after %zums\n", ztu->_lease); ztu->_lease_task_running = false; - _z_transport_unicast_close(ztu, _Z_CLOSE_EXPIRED); + _z_unicast_transport_close(ztu, _Z_CLOSE_EXPIRED); break; } @@ -87,5 +107,26 @@ void *_zp_unicast_lease_task(void *ztu_arg) { return 0; } +#else +int8_t _zp_unicast_send_keep_alive(_z_transport_unicast_t *ztu) { + _ZP_UNUSED(ztu); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} +int8_t _zp_unicast_start_lease_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task) { + _ZP_UNUSED(zt); + _ZP_UNUSED(attr); + _ZP_UNUSED(task); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} + +int8_t _zp_unicast_stop_lease_task(_z_transport_t *zt) { + _ZP_UNUSED(zt); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} + +void *_zp_unicast_lease_task(void *ztu_arg) { + _ZP_UNUSED(ztu_arg); + return NULL; +} #endif // Z_FEATURE_UNICAST_TRANSPORT == 1 diff --git a/src/transport/unicast/link/task/read.c b/src/transport/unicast/read.c similarity index 73% rename from src/transport/unicast/link/task/read.c rename to src/transport/unicast/read.c index bf27b1c70..4d0728eee 100644 --- a/src/transport/unicast/link/task/read.c +++ b/src/transport/unicast/read.c @@ -12,13 +12,13 @@ // ZettaScale Zenoh Team, // -#include "zenoh-pico/transport/link/task/read.h" +#include "zenoh-pico/transport/unicast/read.h" #include #include "zenoh-pico/config.h" #include "zenoh-pico/protocol/codec/transport.h" -#include "zenoh-pico/transport/link/rx.h" +#include "zenoh-pico/transport/unicast/rx.h" #include "zenoh-pico/utils/logging.h" #if Z_FEATURE_UNICAST_TRANSPORT == 1 @@ -36,6 +36,25 @@ int8_t _zp_unicast_read(_z_transport_unicast_t *ztu) { return ret; } +int8_t _zp_unicast_start_read_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task) { + // Init memory + (void)memset(task, 0, sizeof(_z_task_t)); + // Attach task + zt->_transport._unicast._read_task = task; + zt->_transport._unicast._read_task_running = true; + // Init task + if (_z_task_init(task, attr, _zp_unicast_read_task, &zt->_transport._unicast) != _Z_RES_OK) { + zt->_transport._unicast._read_task_running = false; + return _Z_ERR_SYSTEM_TASK_FAILED; + } + return _Z_RES_OK; +} + +int8_t _zp_unicast_stop_read_task(_z_transport_t *zt) { + zt->_transport._unicast._read_task_running = false; + return _Z_RES_OK; +} + void *_zp_unicast_read_task(void *ztu_arg) { #if Z_FEATURE_MULTI_THREAD == 1 _z_transport_unicast_t *ztu = (_z_transport_unicast_t *)ztu_arg; @@ -111,5 +130,26 @@ void *_zp_unicast_read_task(void *ztu_arg) { return NULL; } +#else +int8_t _zp_unicast_read(_z_transport_unicast_t *ztu) { + _ZP_UNUSED(ztu); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} +int8_t _zp_unicast_start_read_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task) { + _ZP_UNUSED(zt); + _ZP_UNUSED(attr); + _ZP_UNUSED(task); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} + +int8_t _zp_unicast_stop_read_task(_z_transport_t *zt) { + _ZP_UNUSED(zt); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} + +void *_zp_unicast_read_task(void *ztu_arg) { + _ZP_UNUSED(ztu_arg); + return NULL; +} #endif // Z_FEATURE_UNICAST_TRANSPORT == 1 diff --git a/src/transport/unicast/link/rx.c b/src/transport/unicast/rx.c similarity index 94% rename from src/transport/unicast/link/rx.c rename to src/transport/unicast/rx.c index dfc580e4d..965ec58c1 100644 --- a/src/transport/unicast/link/rx.c +++ b/src/transport/unicast/rx.c @@ -12,7 +12,7 @@ // ZettaScale Zenoh Team, // -#include "zenoh-pico/transport/link/rx.h" +#include "zenoh-pico/transport/unicast/rx.h" #include @@ -196,5 +196,16 @@ int8_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_trans return _Z_RES_OK; } +#else +int8_t _z_unicast_recv_t_msg(_z_transport_unicast_t *ztu, _z_transport_message_t *t_msg) { + _ZP_UNUSED(ztu); + _ZP_UNUSED(t_msg); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} +int8_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_transport_message_t *t_msg) { + _ZP_UNUSED(ztu); + _ZP_UNUSED(t_msg); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} #endif // Z_FEATURE_UNICAST_TRANSPORT == 1 diff --git a/src/transport/unicast/transport.c b/src/transport/unicast/transport.c new file mode 100644 index 000000000..0aa951b5b --- /dev/null +++ b/src/transport/unicast/transport.c @@ -0,0 +1,317 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#include +#include +#include +#include +#include + +#include "zenoh-pico/link/link.h" +#include "zenoh-pico/transport/common/rx.h" +#include "zenoh-pico/transport/common/tx.h" +#include "zenoh-pico/transport/multicast/rx.h" +#include "zenoh-pico/transport/unicast.h" +#include "zenoh-pico/transport/unicast/lease.h" +#include "zenoh-pico/transport/unicast/read.h" +#include "zenoh-pico/transport/unicast/rx.h" +#include "zenoh-pico/transport/unicast/tx.h" +#include "zenoh-pico/transport/utils.h" +#include "zenoh-pico/utils/logging.h" + +#if Z_FEATURE_UNICAST_TRANSPORT == 1 + +int8_t _z_unicast_transport_create(_z_transport_t *zt, _z_link_t *zl, _z_transport_unicast_establish_param_t *param) { + int8_t ret = _Z_RES_OK; + + zt->_type = _Z_TRANSPORT_UNICAST_TYPE; + +#if Z_FEATURE_MULTI_THREAD == 1 + // Initialize the mutexes + ret = _z_mutex_init(&zt->_transport._unicast._mutex_tx); + if (ret == _Z_RES_OK) { + ret = _z_mutex_init(&zt->_transport._unicast._mutex_rx); + if (ret != _Z_RES_OK) { + _z_mutex_free(&zt->_transport._unicast._mutex_tx); + } + } +#endif // Z_FEATURE_MULTI_THREAD == 1 + + // Initialize the read and write buffers + if (ret == _Z_RES_OK) { + uint16_t mtu = (zl->_mtu < Z_BATCH_UNICAST_SIZE) ? zl->_mtu : Z_BATCH_UNICAST_SIZE; + _Bool expandable = _Z_LINK_IS_STREAMED(zl->_capabilities); + size_t dbuf_size = 0; + +#if Z_FEATURE_DYNAMIC_MEMORY_ALLOCATION == 0 + expandable = false; + dbuf_size = Z_FRAG_MAX_SIZE; +#endif + zt->_transport._unicast._wbuf = _z_wbuf_make(mtu, false); + zt->_transport._unicast._zbuf = _z_zbuf_make(Z_BATCH_UNICAST_SIZE); + + // Initialize the defragmentation buffers + zt->_transport._unicast._dbuf_reliable = _z_wbuf_make(dbuf_size, expandable); + zt->_transport._unicast._dbuf_best_effort = _z_wbuf_make(dbuf_size, expandable); + + // Clean up the buffers if one of them failed to be allocated + if ((_z_wbuf_capacity(&zt->_transport._unicast._wbuf) != mtu) || + (_z_zbuf_capacity(&zt->_transport._unicast._zbuf) != Z_BATCH_UNICAST_SIZE) || +#if Z_FEATURE_DYNAMIC_MEMORY_ALLOCATION == 0 + (_z_wbuf_capacity(&zt->_transport._unicast._dbuf_reliable) != dbuf_size) || + (_z_wbuf_capacity(&zt->_transport._unicast._dbuf_best_effort) != dbuf_size)) { +#else + (_z_wbuf_capacity(&zt->_transport._unicast._dbuf_reliable) != Z_IOSLICE_SIZE) || + (_z_wbuf_capacity(&zt->_transport._unicast._dbuf_best_effort) != Z_IOSLICE_SIZE)) { +#endif + ret = _Z_ERR_SYSTEM_OUT_OF_MEMORY; + +#if Z_FEATURE_MULTI_THREAD == 1 + _z_mutex_free(&zt->_transport._unicast._mutex_tx); + _z_mutex_free(&zt->_transport._unicast._mutex_rx); +#endif // Z_FEATURE_MULTI_THREAD == 1 + + _z_wbuf_clear(&zt->_transport._unicast._wbuf); + _z_zbuf_clear(&zt->_transport._unicast._zbuf); + _z_wbuf_clear(&zt->_transport._unicast._dbuf_reliable); + _z_wbuf_clear(&zt->_transport._unicast._dbuf_best_effort); + } + } + + if (ret == _Z_RES_OK) { + // Set default SN resolution + zt->_transport._unicast._sn_res = _z_sn_max(param->_seq_num_res); + + // The initial SN at TX side + zt->_transport._unicast._sn_tx_reliable = param->_initial_sn_tx; + zt->_transport._unicast._sn_tx_best_effort = param->_initial_sn_tx; + + // The initial SN at RX side + _z_zint_t initial_sn_rx = _z_sn_decrement(zt->_transport._unicast._sn_res, param->_initial_sn_rx); + zt->_transport._unicast._sn_rx_reliable = initial_sn_rx; + zt->_transport._unicast._sn_rx_best_effort = initial_sn_rx; + +#if Z_FEATURE_MULTI_THREAD == 1 + // Tasks + zt->_transport._unicast._read_task_running = false; + zt->_transport._unicast._read_task = NULL; + zt->_transport._unicast._lease_task_running = false; + zt->_transport._unicast._lease_task = NULL; +#endif // Z_FEATURE_MULTI_THREAD == 1 + + // Notifiers + zt->_transport._unicast._received = 0; + zt->_transport._unicast._transmitted = 0; + + // Transport lease + zt->_transport._unicast._lease = param->_lease; + + // Transport link for unicast + zt->_transport._unicast._link = *zl; + + // Remote peer PID + zt->_transport._unicast._remote_zid = param->_remote_zid; + } else { + param->_remote_zid = _z_id_empty(); + } + + return ret; +} + +int8_t _z_unicast_open_client(_z_transport_unicast_establish_param_t *param, const _z_link_t *zl, + const _z_id_t *local_zid) { + int8_t ret = _Z_RES_OK; + + _z_id_t zid = *local_zid; + _z_transport_message_t ism = _z_t_msg_make_init_syn(Z_WHATAMI_CLIENT, zid); + param->_seq_num_res = ism._body._init._seq_num_res; // The announced sn resolution + param->_req_id_res = ism._body._init._req_id_res; // The announced req id resolution + param->_batch_size = ism._body._init._batch_size; // The announced batch size + + // Encode and send the message + _Z_INFO("Sending Z_INIT(Syn)\n"); + ret = _z_link_send_t_msg(zl, &ism); + _z_t_msg_clear(&ism); + if (ret == _Z_RES_OK) { + _z_transport_message_t iam; + ret = _z_link_recv_t_msg(&iam, zl); + if (ret == _Z_RES_OK) { + if ((_Z_MID(iam._header) == _Z_MID_T_INIT) && (_Z_HAS_FLAG(iam._header, _Z_FLAG_T_INIT_A) == true)) { + _Z_INFO("Received Z_INIT(Ack)\n"); + + // Any of the size parameters in the InitAck must be less or equal than the one in the InitSyn, + // otherwise the InitAck message is considered invalid and it should be treated as a + // CLOSE message with L==0 by the Initiating Peer -- the recipient of the InitAck message. + if (iam._body._init._seq_num_res <= param->_seq_num_res) { + param->_seq_num_res = iam._body._init._seq_num_res; + } else { + ret = _Z_ERR_TRANSPORT_OPEN_SN_RESOLUTION; + } + + if (iam._body._init._req_id_res <= param->_req_id_res) { + param->_req_id_res = iam._body._init._req_id_res; + } else { + ret = _Z_ERR_TRANSPORT_OPEN_SN_RESOLUTION; + } + + if (iam._body._init._batch_size <= param->_batch_size) { + param->_batch_size = iam._body._init._batch_size; + } else { + ret = _Z_ERR_TRANSPORT_OPEN_SN_RESOLUTION; + } + + if (ret == _Z_RES_OK) { + param->_key_id_res = 0x08 << param->_key_id_res; + param->_req_id_res = 0x08 << param->_req_id_res; + + // The initial SN at TX side + z_random_fill(¶m->_initial_sn_tx, sizeof(param->_initial_sn_tx)); + param->_initial_sn_tx = param->_initial_sn_tx & !_z_sn_modulo_mask(param->_seq_num_res); + + // Initialize the Local and Remote Peer IDs + param->_remote_zid = iam._body._init._zid; + + // Create the OpenSyn message + _z_zint_t lease = Z_TRANSPORT_LEASE; + _z_zint_t initial_sn = param->_initial_sn_tx; + _z_bytes_t cookie; + _z_bytes_copy(&cookie, &iam._body._init._cookie); + + _z_transport_message_t osm = _z_t_msg_make_open_syn(lease, initial_sn, cookie); + + // Encode and send the message + _Z_INFO("Sending Z_OPEN(Syn)\n"); + ret = _z_link_send_t_msg(zl, &osm); + if (ret == _Z_RES_OK) { + _z_transport_message_t oam; + ret = _z_link_recv_t_msg(&oam, zl); + if (ret == _Z_RES_OK) { + if ((_Z_MID(oam._header) == _Z_MID_T_OPEN) && + (_Z_HAS_FLAG(oam._header, _Z_FLAG_T_OPEN_A) == true)) { + _Z_INFO("Received Z_OPEN(Ack)\n"); + param->_lease = oam._body._open._lease; // The session lease + + // The initial SN at RX side. Initialize the session as we had already received + // a message with a SN equal to initial_sn - 1. + param->_initial_sn_rx = oam._body._open._initial_sn; + } else { + ret = _Z_ERR_MESSAGE_UNEXPECTED; + } + _z_t_msg_clear(&oam); + } + } + _z_t_msg_clear(&osm); + } + } else { + ret = _Z_ERR_MESSAGE_UNEXPECTED; + } + _z_t_msg_clear(&iam); + } + } + + return ret; +} + +int8_t _z_unicast_open_peer(_z_transport_unicast_establish_param_t *param, const _z_link_t *zl, + const _z_id_t *local_zid) { + _ZP_UNUSED(param); + _ZP_UNUSED(zl); + _ZP_UNUSED(local_zid); + int8_t ret = _Z_ERR_CONFIG_UNSUPPORTED_PEER_UNICAST; + // @TODO: not implemented + return ret; +} + +int8_t _z_unicast_send_close(_z_transport_unicast_t *ztu, uint8_t reason, _Bool link_only) { + int8_t ret = _Z_RES_OK; + // Send and clear message + _z_transport_message_t cm = _z_t_msg_make_close(reason, link_only); + ret = _z_unicast_send_t_msg(ztu, &cm); + _z_t_msg_clear(&cm); + return ret; +} + +int8_t _z_unicast_transport_close(_z_transport_unicast_t *ztu, uint8_t reason) { + return _z_unicast_send_close(ztu, reason, false); +} + +void _z_unicast_transport_clear(_z_transport_t *zt) { + _z_transport_unicast_t *ztu = &zt->_transport._unicast; +#if Z_FEATURE_MULTI_THREAD == 1 + // Clean up tasks + if (ztu->_read_task != NULL) { + _z_task_join(ztu->_read_task); + _z_task_free(&ztu->_read_task); + } + if (ztu->_lease_task != NULL) { + _z_task_join(ztu->_lease_task); + _z_task_free(&ztu->_lease_task); + } + + // Clean up the mutexes + _z_mutex_free(&ztu->_mutex_tx); + _z_mutex_free(&ztu->_mutex_rx); +#endif // Z_FEATURE_MULTI_THREAD == 1 + + // Clean up the buffers + _z_wbuf_clear(&ztu->_wbuf); + _z_zbuf_clear(&ztu->_zbuf); + _z_wbuf_clear(&ztu->_dbuf_reliable); + _z_wbuf_clear(&ztu->_dbuf_best_effort); + + // Clean up PIDs + ztu->_remote_zid = _z_id_empty(); + _z_link_clear(&ztu->_link); +} + +#else + +int8_t _z_unicast_transport_create(_z_transport_t *zt, _z_link_t *zl, _z_transport_unicast_establish_param_t *param) { + _ZP_UNUSED(zt); + _ZP_UNUSED(zl); + _ZP_UNUSED(param); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} + +int8_t _z_unicast_open_client(_z_transport_unicast_establish_param_t *param, const _z_link_t *zl, + const _z_id_t *local_zid) { + _ZP_UNUSED(param); + _ZP_UNUSED(zl); + _ZP_UNUSED(local_zid); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} + +int8_t _z_unicast_open_peer(_z_transport_unicast_establish_param_t *param, const _z_link_t *zl, + const _z_id_t *local_zid) { + _ZP_UNUSED(param); + _ZP_UNUSED(zl); + _ZP_UNUSED(local_zid); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} + +int8_t _z_unicast_send_close(_z_transport_unicast_t *ztu, uint8_t reason, _Bool link_only) { + _ZP_UNUSED(ztu); + _ZP_UNUSED(reason); + _ZP_UNUSED(link_only); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} + +int8_t _z_unicast_transport_close(_z_transport_unicast_t *ztu, uint8_t reason) { + _ZP_UNUSED(ztu); + _ZP_UNUSED(reason); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} + +void _z_unicast_transport_clear(_z_transport_t *zt) { _ZP_UNUSED(zt); } + +#endif // Z_FEATURE_UNICAST_TRANSPORT == 1 diff --git a/src/transport/unicast/link/tx.c b/src/transport/unicast/tx.c similarity index 91% rename from src/transport/unicast/link/tx.c rename to src/transport/unicast/tx.c index eb43c769d..6e7ea4f85 100644 --- a/src/transport/unicast/link/tx.c +++ b/src/transport/unicast/tx.c @@ -12,7 +12,7 @@ // ZettaScale Zenoh Team, // -#include "zenoh-pico/transport/link/tx.h" +#include "zenoh-pico/transport/unicast/tx.h" #include @@ -20,6 +20,7 @@ #include "zenoh-pico/protocol/codec/network.h" #include "zenoh-pico/protocol/codec/transport.h" #include "zenoh-pico/protocol/iobuf.h" +#include "zenoh-pico/transport/common/tx.h" #include "zenoh-pico/transport/utils.h" #include "zenoh-pico/utils/logging.h" @@ -161,5 +162,19 @@ int8_t _z_unicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg return ret; } +#else +int8_t _z_unicast_send_t_msg(_z_transport_unicast_t *ztu, const _z_transport_message_t *t_msg) { + _ZP_UNUSED(ztu); + _ZP_UNUSED(t_msg); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} +int8_t _z_unicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg, z_reliability_t reliability, + z_congestion_control_t cong_ctrl) { + _ZP_UNUSED(zn); + _ZP_UNUSED(n_msg); + _ZP_UNUSED(reliability); + _ZP_UNUSED(cong_ctrl); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} #endif // Z_FEATURE_UNICAST_TRANSPORT == 1