From 5a1e59fc5799491ede7b4e52e29ae8a6a6d0a8c9 Mon Sep 17 00:00:00 2001 From: Jean-Roland Gosse Date: Mon, 23 Oct 2023 15:12:26 +0200 Subject: [PATCH] Add subscription token config (#264) --- examples/arduino/z_pull.ino | 7 +++ examples/arduino/z_sub.ino | 8 +++ examples/espidf/z_pull.c | 6 +++ examples/espidf/z_sub.c | 6 +++ examples/mbed/z_pull.cpp | 7 +++ examples/mbed/z_sub.cpp | 7 +++ examples/unix/c11/z_ping.c | 7 +++ examples/unix/c11/z_pong.c | 7 +++ examples/unix/c11/z_pull.c | 7 +++ examples/unix/c11/z_sub.c | 7 +++ examples/unix/c11/z_sub_st.c | 7 +++ examples/unix/c99/z_ping.c | 9 +++- examples/unix/c99/z_pong.c | 9 +++- examples/unix/c99/z_pull.c | 7 +++ examples/unix/c99/z_sub.c | 7 +++ examples/unix/c99/z_sub_st.c | 7 +++ examples/windows/z_ping.c | 9 +++- examples/windows/z_pong.c | 9 +++- examples/windows/z_pull.c | 7 +++ examples/windows/z_sub.c | 7 +++ examples/windows/z_sub_st.c | 7 +++ examples/zephyr/z_pull.c | 7 +++ examples/zephyr/z_sub.c | 7 +++ include/zenoh-pico/api/primitives.h | 2 + include/zenoh-pico/config.h | 7 +++ include/zenoh-pico/net/primitives.h | 24 +++++---- include/zenoh-pico/net/session.h | 2 + include/zenoh-pico/net/subscribe.h | 2 + include/zenoh-pico/session/subscription.h | 2 + src/api/api.c | 64 ++++++++++++----------- src/net/primitives.c | 40 +++++++------- src/net/subscribe.c | 2 + src/session/push.c | 4 +- src/session/rx.c | 8 +++ src/session/subscription.c | 2 + src/session/utils.c | 5 +- tests/z_api_null_drop_test.c | 32 +++++++----- 37 files changed, 282 insertions(+), 82 deletions(-) diff --git a/examples/arduino/z_pull.ino b/examples/arduino/z_pull.ino index 5e2952d6c..c1ffd7865 100644 --- a/examples/arduino/z_pull.ino +++ b/examples/arduino/z_pull.ino @@ -16,6 +16,7 @@ #include #include +#if Z_FEATURE_SUBSCRIPTION == 1 // WiFi-specific parameters #define SSID "SSID" #define PASS "PASS" @@ -108,3 +109,9 @@ void loop() { delay(5000); z_subscriber_pull(z_pull_subscriber_loan(&sub)); } +#else +void setup() { + Serial.println("ERROR: Zenoh pico was compiled without Z_FEATURE_SUBSCRIPTION but this example requires it."); +} +void loop() {} +#endif diff --git a/examples/arduino/z_sub.ino b/examples/arduino/z_sub.ino index e9b829245..b07ca70c0 100644 --- a/examples/arduino/z_sub.ino +++ b/examples/arduino/z_sub.ino @@ -16,6 +16,7 @@ #include #include +#if Z_FEATURE_SUBSCRIPTION == 1 // WiFi-specific parameters #define SSID "SSID" #define PASS "PASS" @@ -104,3 +105,10 @@ void setup() { } void loop() { delay(5000); } + +#else +void setup() { + Serial.println("ERROR: Zenoh pico was compiled without Z_FEATURE_SUBSCRIPTION but this example requires it."); +} +void loop() {} +#endif \ No newline at end of file diff --git a/examples/espidf/z_pull.c b/examples/espidf/z_pull.c index 976880a79..03175c898 100644 --- a/examples/espidf/z_pull.c +++ b/examples/espidf/z_pull.c @@ -25,6 +25,7 @@ #include #include +#if Z_FEATURE_SUBSCRIPTION == 1 #define ESP_WIFI_SSID "SSID" #define ESP_WIFI_PASS "PASS" #define ESP_MAXIMUM_RETRY 5 @@ -168,3 +169,8 @@ void app_main() { z_close(z_move(s)); printf("OK!\n"); } +#else +void app_main() { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_SUBSCRIPTION but this example requires it.\n"); +} +#endif \ No newline at end of file diff --git a/examples/espidf/z_sub.c b/examples/espidf/z_sub.c index e09bad00d..01272c46e 100644 --- a/examples/espidf/z_sub.c +++ b/examples/espidf/z_sub.c @@ -25,6 +25,7 @@ #include #include +#if Z_FEATURE_SUBSCRIPTION == 1 #define ESP_WIFI_SSID "SSID" #define ESP_WIFI_PASS "PASS" #define ESP_MAXIMUM_RETRY 5 @@ -166,3 +167,8 @@ void app_main() { z_close(z_move(s)); printf("OK!\n"); } +#else +void app_main() { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_SUBSCRIPTION but this example requires it.\n"); +} +#endif \ No newline at end of file diff --git a/examples/mbed/z_pull.cpp b/examples/mbed/z_pull.cpp index 3ca962ed7..8a5e0522c 100644 --- a/examples/mbed/z_pull.cpp +++ b/examples/mbed/z_pull.cpp @@ -16,6 +16,7 @@ #include #include +#if Z_FEATURE_SUBSCRIPTION == 1 #define CLIENT_OR_PEER 0 // 0: Client mode; 1: Peer mode #if CLIENT_OR_PEER == 0 #define MODE "client" @@ -91,3 +92,9 @@ int main(int argc, char **argv) { return 0; } +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_SUBSCRIPTION but this example requires it.\n"); + return -1; +} +#endif \ No newline at end of file diff --git a/examples/mbed/z_sub.cpp b/examples/mbed/z_sub.cpp index d8f9329ee..27297b1c8 100644 --- a/examples/mbed/z_sub.cpp +++ b/examples/mbed/z_sub.cpp @@ -16,6 +16,7 @@ #include #include +#if Z_FEATURE_SUBSCRIPTION == 1 #define CLIENT_OR_PEER 0 // 0: Client mode; 1: Peer mode #if CLIENT_OR_PEER == 0 #define MODE "client" @@ -89,3 +90,9 @@ int main(int argc, char **argv) { return 0; } +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_SUBSCRIPTION but this example requires it.\n"); + return -1; +} +#endif \ No newline at end of file diff --git a/examples/unix/c11/z_ping.c b/examples/unix/c11/z_ping.c index dc9cbf4f7..775420bd7 100644 --- a/examples/unix/c11/z_ping.c +++ b/examples/unix/c11/z_ping.c @@ -21,6 +21,7 @@ #include "zenoh-pico.h" #include "zenoh-pico/system/platform.h" +#if Z_FEATURE_SUBSCRIPTION == 1 // WARNING: for the sake of this example we are using "internal" structs and functions (starting with "_"). // Synchronisation primitives are planned to be added to the API in the future. _z_condvar_t cond; @@ -165,3 +166,9 @@ struct args_t parse_args(int argc, char** argv) { .warmup_ms = warmup_ms, }; } +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_SUBSCRIPTION but this example requires it.\n"); + return -1; +} +#endif \ No newline at end of file diff --git a/examples/unix/c11/z_pong.c b/examples/unix/c11/z_pong.c index 8acf09564..a79fb7285 100644 --- a/examples/unix/c11/z_pong.c +++ b/examples/unix/c11/z_pong.c @@ -15,6 +15,7 @@ #include "stdio.h" #include "zenoh-pico.h" +#if Z_FEATURE_SUBSCRIPTION == 1 void callback(const z_sample_t* sample, void* context) { z_publisher_t pub = z_loan(*(z_owned_publisher_t*)context); z_publisher_put(pub, sample->payload.start, sample->payload.len, NULL); @@ -68,3 +69,9 @@ int main(int argc, char** argv) { z_close(z_move(session)); } +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_SUBSCRIPTION but this example requires it.\n"); + return -1; +} +#endif \ No newline at end of file diff --git a/examples/unix/c11/z_pull.c b/examples/unix/c11/z_pull.c index 678641aed..cc8297a29 100644 --- a/examples/unix/c11/z_pull.c +++ b/examples/unix/c11/z_pull.c @@ -18,6 +18,7 @@ #include #include +#if Z_FEATURE_SUBSCRIPTION == 1 void data_handler(const z_sample_t *sample, void *ctx) { (void)(ctx); z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); @@ -98,3 +99,9 @@ int main(int argc, char **argv) { return 0; } +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_SUBSCRIPTION but this example requires it.\n"); + return -1; +} +#endif \ No newline at end of file diff --git a/examples/unix/c11/z_sub.c b/examples/unix/c11/z_sub.c index 7b097fe72..77fdb29a8 100644 --- a/examples/unix/c11/z_sub.c +++ b/examples/unix/c11/z_sub.c @@ -19,6 +19,7 @@ #include #include +#if Z_FEATURE_SUBSCRIPTION == 1 void data_handler(const z_sample_t *sample, void *ctx) { (void)(ctx); z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); @@ -107,3 +108,9 @@ int main(int argc, char **argv) { return 0; } +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_SUBSCRIPTION but this example requires it.\n"); + return -1; +} +#endif \ No newline at end of file diff --git a/examples/unix/c11/z_sub_st.c b/examples/unix/c11/z_sub_st.c index 567290484..d6ee0966f 100644 --- a/examples/unix/c11/z_sub_st.c +++ b/examples/unix/c11/z_sub_st.c @@ -19,6 +19,7 @@ #include #include +#if Z_FEATURE_SUBSCRIPTION == 1 void data_handler(const z_sample_t *sample, void *ctx) { (void)(ctx); z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); @@ -96,3 +97,9 @@ int main(int argc, char **argv) { return 0; } +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_SUBSCRIPTION but this example requires it.\n"); + return -1; +} +#endif \ No newline at end of file diff --git a/examples/unix/c99/z_ping.c b/examples/unix/c99/z_ping.c index cbae19ccb..18c296a7d 100644 --- a/examples/unix/c99/z_ping.c +++ b/examples/unix/c99/z_ping.c @@ -22,6 +22,7 @@ #include "zenoh-pico/api/primitives.h" #include "zenoh-pico/system/platform.h" +#if Z_FEATURE_SUBSCRIPTION == 1 _z_condvar_t cond; _z_mutex_t mutex; @@ -164,4 +165,10 @@ struct args_t parse_args(int argc, char** argv) { .number_of_pings = number_of_pings, .warmup_ms = warmup_ms, }; -} \ No newline at end of file +} +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_SUBSCRIPTION but this example requires it.\n"); + return -1; +} +#endif \ No newline at end of file diff --git a/examples/unix/c99/z_pong.c b/examples/unix/c99/z_pong.c index 53f6d14ee..d7671b38d 100644 --- a/examples/unix/c99/z_pong.c +++ b/examples/unix/c99/z_pong.c @@ -16,6 +16,7 @@ #include "zenoh-pico.h" #include "zenoh-pico/api/primitives.h" +#if Z_FEATURE_SUBSCRIPTION == 1 void callback(const z_sample_t* sample, void* context) { z_publisher_t pub = z_publisher_loan((z_owned_publisher_t*)context); z_publisher_put(pub, sample->payload.start, sample->payload.len, NULL); @@ -70,4 +71,10 @@ int main(int argc, char** argv) { zp_stop_lease_task(z_session_loan(&session)); z_close(z_session_move(&session)); -} \ No newline at end of file +} +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_SUBSCRIPTION but this example requires it.\n"); + return -1; +} +#endif \ No newline at end of file diff --git a/examples/unix/c99/z_pull.c b/examples/unix/c99/z_pull.c index e01f43155..88361ff27 100644 --- a/examples/unix/c99/z_pull.c +++ b/examples/unix/c99/z_pull.c @@ -18,6 +18,7 @@ #include #include +#if Z_FEATURE_SUBSCRIPTION == 1 void data_handler(const z_sample_t *sample, void *ctx) { (void)(ctx); z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); @@ -99,3 +100,9 @@ int main(int argc, char **argv) { return 0; } +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_SUBSCRIPTION but this example requires it.\n"); + return -1; +} +#endif \ No newline at end of file diff --git a/examples/unix/c99/z_sub.c b/examples/unix/c99/z_sub.c index 0f616525f..7cb1d2354 100644 --- a/examples/unix/c99/z_sub.c +++ b/examples/unix/c99/z_sub.c @@ -19,6 +19,7 @@ #include #include +#if Z_FEATURE_SUBSCRIPTION == 1 void data_handler(const z_sample_t *sample, void *arg) { (void)(arg); z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); @@ -108,3 +109,9 @@ int main(int argc, char **argv) { return 0; } +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_SUBSCRIPTION but this example requires it.\n"); + return -1; +} +#endif \ No newline at end of file diff --git a/examples/unix/c99/z_sub_st.c b/examples/unix/c99/z_sub_st.c index fac31bd46..37c7d48cc 100644 --- a/examples/unix/c99/z_sub_st.c +++ b/examples/unix/c99/z_sub_st.c @@ -19,6 +19,7 @@ #include #include +#if Z_FEATURE_SUBSCRIPTION == 1 void data_handler(const z_sample_t *sample, void *arg) { (void)(arg); z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); @@ -97,3 +98,9 @@ int main(int argc, char **argv) { return 0; } +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_SUBSCRIPTION but this example requires it.\n"); + return -1; +} +#endif \ No newline at end of file diff --git a/examples/windows/z_ping.c b/examples/windows/z_ping.c index dcfb1fc8e..247b18eb7 100644 --- a/examples/windows/z_ping.c +++ b/examples/windows/z_ping.c @@ -21,6 +21,7 @@ #include "zenoh-pico.h" #include "zenoh-pico/system/platform.h" +#if Z_FEATURE_SUBSCRIPTION == 1 _z_condvar_t cond; _z_mutex_t mutex; @@ -161,4 +162,10 @@ struct args_t parse_args(int argc, char** argv) { .number_of_pings = number_of_pings, .warmup_ms = warmup_ms, }; -} \ No newline at end of file +} +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_SUBSCRIPTION but this example requires it.\n"); + return -1; +} +#endif \ No newline at end of file diff --git a/examples/windows/z_pong.c b/examples/windows/z_pong.c index 698c086f7..a79fb7285 100644 --- a/examples/windows/z_pong.c +++ b/examples/windows/z_pong.c @@ -15,6 +15,7 @@ #include "stdio.h" #include "zenoh-pico.h" +#if Z_FEATURE_SUBSCRIPTION == 1 void callback(const z_sample_t* sample, void* context) { z_publisher_t pub = z_loan(*(z_owned_publisher_t*)context); z_publisher_put(pub, sample->payload.start, sample->payload.len, NULL); @@ -67,4 +68,10 @@ int main(int argc, char** argv) { zp_stop_lease_task(z_loan(session)); z_close(z_move(session)); -} \ No newline at end of file +} +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_SUBSCRIPTION but this example requires it.\n"); + return -1; +} +#endif \ No newline at end of file diff --git a/examples/windows/z_pull.c b/examples/windows/z_pull.c index 52941550e..efaa7676e 100644 --- a/examples/windows/z_pull.c +++ b/examples/windows/z_pull.c @@ -17,6 +17,7 @@ #include #include +#if Z_FEATURE_SUBSCRIPTION == 1 void data_handler(const z_sample_t *sample, void *ctx) { (void)(ctx); z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); @@ -78,3 +79,9 @@ int main(int argc, char **argv) { return 0; } +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_SUBSCRIPTION but this example requires it.\n"); + return -1; +} +#endif \ No newline at end of file diff --git a/examples/windows/z_sub.c b/examples/windows/z_sub.c index d48328500..e46e1fbcb 100644 --- a/examples/windows/z_sub.c +++ b/examples/windows/z_sub.c @@ -18,6 +18,7 @@ #include #include +#if Z_FEATURE_SUBSCRIPTION == 1 void data_handler(const z_sample_t *sample, void *ctx) { (void)(ctx); z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); @@ -77,3 +78,9 @@ int main(int argc, char **argv) { return 0; } +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_SUBSCRIPTION but this example requires it.\n"); + return -1; +} +#endif \ No newline at end of file diff --git a/examples/windows/z_sub_st.c b/examples/windows/z_sub_st.c index d9b66fa80..d34a2d17f 100644 --- a/examples/windows/z_sub_st.c +++ b/examples/windows/z_sub_st.c @@ -18,6 +18,7 @@ #include #include +#if Z_FEATURE_SUBSCRIPTION == 1 void data_handler(const z_sample_t *sample, void *ctx) { (void)(ctx); z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); @@ -66,3 +67,9 @@ int main(int argc, char **argv) { return 0; } +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_SUBSCRIPTION but this example requires it.\n"); + return -1; +} +#endif \ No newline at end of file diff --git a/examples/zephyr/z_pull.c b/examples/zephyr/z_pull.c index 2d4d7c856..7b3522529 100644 --- a/examples/zephyr/z_pull.c +++ b/examples/zephyr/z_pull.c @@ -15,6 +15,7 @@ #include #include +#if Z_FEATURE_SUBSCRIPTION == 1 #define CLIENT_OR_PEER 0 // 0: Client mode; 1: Peer mode #if CLIENT_OR_PEER == 0 #define MODE "client" @@ -85,3 +86,9 @@ int main(int argc, char **argv) { return 0; } +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_SUBSCRIPTION but this example requires it.\n"); + return -1; +} +#endif \ No newline at end of file diff --git a/examples/zephyr/z_sub.c b/examples/zephyr/z_sub.c index 3860f9e8a..4f1831154 100644 --- a/examples/zephyr/z_sub.c +++ b/examples/zephyr/z_sub.c @@ -15,6 +15,7 @@ #include #include +#if Z_FEATURE_SUBSCRIPTION == 1 #define CLIENT_OR_PEER 0 // 0: Client mode; 1: Peer mode #if CLIENT_OR_PEER == 0 #define MODE "client" @@ -83,3 +84,9 @@ int main(int argc, char **argv) { return 0; } +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_SUBSCRIPTION but this example requires it.\n"); + return -1; +} +#endif \ No newline at end of file diff --git a/include/zenoh-pico/api/primitives.h b/include/zenoh-pico/api/primitives.h index 14189c48d..36d4cfb05 100644 --- a/include/zenoh-pico/api/primitives.h +++ b/include/zenoh-pico/api/primitives.h @@ -1116,6 +1116,7 @@ int8_t z_publisher_put(const z_publisher_t pub, const uint8_t *payload, size_t l */ int8_t z_publisher_delete(const z_publisher_t pub, const z_publisher_delete_options_t *options); +#if Z_FEATURE_SUBSCRIPTION == 1 /** * Constructs the default values for the subscriber entity. * @@ -1237,6 +1238,7 @@ int8_t z_undeclare_pull_subscriber(z_owned_pull_subscriber_t *sub); * Returns ``0`` if the pull operation is successful, or a ``negative value`` otherwise. */ int8_t z_subscriber_pull(const z_pull_subscriber_t sub); +#endif /** * Checks if a given value is valid. diff --git a/include/zenoh-pico/config.h b/include/zenoh-pico/config.h index f19a94b08..1b8993694 100644 --- a/include/zenoh-pico/config.h +++ b/include/zenoh-pico/config.h @@ -132,6 +132,13 @@ #define Z_FEATURE_QUERY 1 #endif +/** + * Enable subscription to this node + */ +#ifndef Z_FEATURE_SUBSCRIPTION +#define Z_FEATURE_SUBSCRIPTION 1 +#endif + /** * Enable TCP links. */ diff --git a/include/zenoh-pico/net/primitives.h b/include/zenoh-pico/net/primitives.h index bfb66ea66..7a8d0b428 100644 --- a/include/zenoh-pico/net/primitives.h +++ b/include/zenoh-pico/net/primitives.h @@ -99,6 +99,7 @@ _z_publisher_t *_z_declare_publisher(_z_session_t *zn, _z_keyexpr_t keyexpr, z_c */ int8_t _z_undeclare_publisher(_z_publisher_t *pub); +#if Z_FEATURE_SUBSCRIPTION == 1 /** * Declare a :c:type:`_z_subscriber_t` for the given resource key. * @@ -128,6 +129,18 @@ _z_subscriber_t *_z_declare_subscriber(_z_session_t *zn, _z_keyexpr_t keyexpr, _ */ int8_t _z_undeclare_subscriber(_z_subscriber_t *sub); +/** + * Pull data for a pull mode :c:type:`_z_subscriber_t`. The pulled data will be provided + * by calling the **callback** function provided to the :c:func:`_z_declare_subscriber` function. + * + * Parameters: + * sub: The :c:type:`_z_subscriber_t` to pull from. + * Returns: + * ``0`` in case of success, ``-1`` in case of failure. + */ +int8_t _z_subscriber_pull(const _z_subscriber_t *sub); +#endif + #if Z_FEATURE_QUERYABLE == 1 /** * Declare a :c:type:`_z_queryable_t` for the given resource key. @@ -218,15 +231,4 @@ int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, const uint8_t *pay const _z_encoding_t encoding, const z_sample_kind_t kind, const z_congestion_control_t cong_ctrl, z_priority_t priority); -/** - * Pull data for a pull mode :c:type:`_z_subscriber_t`. The pulled data will be provided - * by calling the **callback** function provided to the :c:func:`_z_declare_subscriber` function. - * - * Parameters: - * sub: The :c:type:`_z_subscriber_t` to pull from. - * Returns: - * ``0`` in case of success, ``-1`` in case of failure. - */ -int8_t _z_subscriber_pull(const _z_subscriber_t *sub); - #endif /* ZENOH_PICO_PRIMITIVES_NETAPI_H */ diff --git a/include/zenoh-pico/net/session.h b/include/zenoh-pico/net/session.h index 8985eba02..a93e387ce 100644 --- a/include/zenoh-pico/net/session.h +++ b/include/zenoh-pico/net/session.h @@ -50,8 +50,10 @@ typedef struct { _z_resource_list_t *_remote_resources; // Session subscriptions +#if Z_FEATURE_SUBSCRIPTION == 1 _z_subscription_sptr_list_t *_local_subscriptions; _z_subscription_sptr_list_t *_remote_subscriptions; +#endif // Session queryables #if Z_FEATURE_QUERYABLE == 1 diff --git a/include/zenoh-pico/net/subscribe.h b/include/zenoh-pico/net/subscribe.h index 2a8a53c1b..d6c3ba67a 100644 --- a/include/zenoh-pico/net/subscribe.h +++ b/include/zenoh-pico/net/subscribe.h @@ -30,6 +30,7 @@ typedef struct { typedef _z_subscriber_t _z_pull_subscriber_t; +#if Z_FEATURE_SUBSCRIPTION == 1 /** * Create a default subscription info for a push subscriber. * @@ -48,5 +49,6 @@ _z_subinfo_t _z_subinfo_pull_default(void); void _z_subscriber_clear(_z_subscriber_t *sub); void _z_subscriber_free(_z_subscriber_t **sub); +#endif #endif /* ZENOH_PICO_SUBSCRIBE_NETAPI_H */ diff --git a/include/zenoh-pico/session/subscription.h b/include/zenoh-pico/session/subscription.h index d352f3a6a..a05de96d9 100644 --- a/include/zenoh-pico/session/subscription.h +++ b/include/zenoh-pico/session/subscription.h @@ -17,6 +17,7 @@ #include "zenoh-pico/net/session.h" +#if Z_FEATURE_SUBSCRIPTION == 1 /*------------------ Subscription ------------------*/ _z_subscription_sptr_t *_z_get_subscription_by_id(_z_session_t *zn, uint8_t is_local, const _z_zint_t id); _z_subscription_sptr_list_t *_z_get_subscriptions_by_key(_z_session_t *zn, uint8_t is_local, @@ -30,5 +31,6 @@ void _z_flush_subscriptions(_z_session_t *zn); /*------------------ Pull ------------------*/ _z_zint_t _z_get_pull_id(_z_session_t *zn); +#endif #endif /* ZENOH_PICO_SESSION_SUBSCRIPTION_H */ diff --git a/src/api/api.c b/src/api/api.c index 6ba0705d3..d02b0a036 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -409,14 +409,6 @@ OWNED_FUNCTIONS_PTR_COMMON(z_session_t, z_owned_session_t, session) OWNED_FUNCTIONS_PTR_CLONE(z_session_t, z_owned_session_t, session, _z_owner_noop_copy) void z_session_drop(z_owned_session_t *val) { z_close(val); } -OWNED_FUNCTIONS_PTR_COMMON(z_subscriber_t, z_owned_subscriber_t, subscriber) -OWNED_FUNCTIONS_PTR_CLONE(z_subscriber_t, z_owned_subscriber_t, subscriber, _z_owner_noop_copy) -void z_subscriber_drop(z_owned_subscriber_t *val) { z_undeclare_subscriber(val); } - -OWNED_FUNCTIONS_PTR_COMMON(z_pull_subscriber_t, z_owned_pull_subscriber_t, pull_subscriber) -OWNED_FUNCTIONS_PTR_CLONE(z_pull_subscriber_t, z_owned_pull_subscriber_t, pull_subscriber, _z_owner_noop_copy) -void z_pull_subscriber_drop(z_owned_pull_subscriber_t *val) { z_undeclare_pull_subscriber(val); } - OWNED_FUNCTIONS_PTR_COMMON(z_publisher_t, z_owned_publisher_t, publisher) OWNED_FUNCTIONS_PTR_CLONE(z_publisher_t, z_owned_publisher_t, publisher, _z_owner_noop_copy) void z_publisher_drop(z_owned_publisher_t *val) { z_undeclare_publisher(val); } @@ -878,6 +870,15 @@ int8_t z_publisher_delete(const z_publisher_t pub, const z_publisher_delete_opti pub._val->_congestion_control, pub._val->_priority); } +#if Z_FEATURE_SUBSCRIPTION == 1 +OWNED_FUNCTIONS_PTR_COMMON(z_subscriber_t, z_owned_subscriber_t, subscriber) +OWNED_FUNCTIONS_PTR_CLONE(z_subscriber_t, z_owned_subscriber_t, subscriber, _z_owner_noop_copy) +void z_subscriber_drop(z_owned_subscriber_t *val) { z_undeclare_subscriber(val); } + +OWNED_FUNCTIONS_PTR_COMMON(z_pull_subscriber_t, z_owned_pull_subscriber_t, pull_subscriber) +OWNED_FUNCTIONS_PTR_CLONE(z_pull_subscriber_t, z_owned_pull_subscriber_t, pull_subscriber, _z_owner_noop_copy) +void z_pull_subscriber_drop(z_owned_pull_subscriber_t *val) { z_undeclare_pull_subscriber(val); } + z_subscriber_options_t z_subscriber_options_default(void) { return (z_subscriber_options_t){.reliability = Z_RELIABILITY_DEFAULT}; } @@ -981,6 +982,29 @@ int8_t z_undeclare_pull_subscriber(z_owned_pull_subscriber_t *sub) { int8_t z_subscriber_pull(const z_pull_subscriber_t sub) { return _z_subscriber_pull(sub._val); } +z_owned_keyexpr_t z_subscriber_keyexpr(z_subscriber_t sub) { + z_owned_keyexpr_t ret = z_keyexpr_null(); + uint32_t lookup = sub._val->_entity_id; + if (sub._val != NULL) { + _z_subscription_sptr_list_t *tail = sub._val->_zn->_local_subscriptions; + while (tail != NULL && !z_keyexpr_check(&ret)) { + _z_subscription_sptr_t *head = _z_subscription_sptr_list_head(tail); + if (head->ptr->_id == lookup) { + _z_keyexpr_t key = _z_keyexpr_duplicate(head->ptr->_key); + ret = (z_owned_keyexpr_t){._value = z_malloc(sizeof(_z_keyexpr_t))}; + if (ret._value != NULL) { + *ret._value = key; + } else { + _z_keyexpr_clear(&key); + } + } + tail = _z_subscription_sptr_list_tail(tail); + } + } + return ret; +} +#endif + /**************** Tasks ****************/ zp_task_read_options_t zp_task_read_options_default(void) { return (zp_task_read_options_t){.__dummy = 0}; } @@ -1053,26 +1077,4 @@ z_owned_keyexpr_t z_publisher_keyexpr(z_publisher_t publisher) { *ret._value = _z_keyexpr_duplicate(publisher._val->_key); } return ret; -} - -z_owned_keyexpr_t z_subscriber_keyexpr(z_subscriber_t sub) { - z_owned_keyexpr_t ret = z_keyexpr_null(); - uint32_t lookup = sub._val->_entity_id; - if (sub._val != NULL) { - _z_subscription_sptr_list_t *tail = sub._val->_zn->_local_subscriptions; - while (tail != NULL && !z_keyexpr_check(&ret)) { - _z_subscription_sptr_t *head = _z_subscription_sptr_list_head(tail); - if (head->ptr->_id == lookup) { - _z_keyexpr_t key = _z_keyexpr_duplicate(head->ptr->_key); - ret = (z_owned_keyexpr_t){._value = z_malloc(sizeof(_z_keyexpr_t))}; - if (ret._value != NULL) { - *ret._value = key; - } else { - _z_keyexpr_clear(&key); - } - } - tail = _z_subscription_sptr_list_tail(tail); - } - } - return ret; -} +} \ No newline at end of file diff --git a/src/net/primitives.c b/src/net/primitives.c index 6263d7499..3a4087719 100644 --- a/src/net/primitives.c +++ b/src/net/primitives.c @@ -124,6 +124,7 @@ int8_t _z_undeclare_publisher(_z_publisher_t *pub) { return ret; } +#if Z_FEATURE_SUBSCRIPTION == 1 /*------------------ Subscriber Declaration ------------------*/ _z_subscriber_t *_z_declare_subscriber(_z_session_t *zn, _z_keyexpr_t keyexpr, _z_subinfo_t sub_info, _z_data_handler_t callback, _z_drop_handler_t dropper, void *arg) { @@ -192,6 +193,25 @@ int8_t _z_undeclare_subscriber(_z_subscriber_t *sub) { return ret; } +/*------------------ Pull ------------------*/ +int8_t _z_subscriber_pull(const _z_subscriber_t *sub) { + int8_t ret = _Z_RES_OK; + + _z_subscription_sptr_t *s = _z_get_subscription_by_id(sub->_zn, _Z_RESOURCE_IS_LOCAL, sub->_entity_id); + if (s != NULL) { + _z_zint_t pull_id = _z_get_pull_id(sub->_zn); + _z_zenoh_message_t z_msg = _z_msg_make_pull(_z_keyexpr_alias(s->ptr->_key), pull_id); + if (_z_send_n_msg(sub->_zn, &z_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { + ret = _Z_ERR_TRANSPORT_TX_FAILED; + } + } else { + ret = _Z_ERR_ENTITY_UNKNOWN; + } + + return ret; +} +#endif + #if Z_FEATURE_QUERYABLE == 1 /*------------------ Queryable Declaration ------------------*/ _z_queryable_t *_z_declare_queryable(_z_session_t *zn, _z_keyexpr_t keyexpr, _Bool complete, @@ -395,22 +415,4 @@ int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, const uint8_t *pay // Freeing z_msg is unnecessary, as all of its components are aliased return ret; -} - -/*------------------ Pull ------------------*/ -int8_t _z_subscriber_pull(const _z_subscriber_t *sub) { - int8_t ret = _Z_RES_OK; - - _z_subscription_sptr_t *s = _z_get_subscription_by_id(sub->_zn, _Z_RESOURCE_IS_LOCAL, sub->_entity_id); - if (s != NULL) { - _z_zint_t pull_id = _z_get_pull_id(sub->_zn); - _z_zenoh_message_t z_msg = _z_msg_make_pull(_z_keyexpr_alias(s->ptr->_key), pull_id); - if (_z_send_n_msg(sub->_zn, &z_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { - ret = _Z_ERR_TRANSPORT_TX_FAILED; - } - } else { - ret = _Z_ERR_ENTITY_UNKNOWN; - } - - return ret; -} +} \ No newline at end of file diff --git a/src/net/subscribe.c b/src/net/subscribe.c index 97d0cf3ef..27e2ffe7c 100644 --- a/src/net/subscribe.c +++ b/src/net/subscribe.c @@ -13,6 +13,7 @@ #include "zenoh-pico/net/subscribe.h" +#if Z_FEATURE_SUBSCRIPTION == 1 _z_subinfo_t _z_subinfo_push_default(void) { _z_subinfo_t si; si.reliability = Z_RELIABILITY_RELIABLE; @@ -44,3 +45,4 @@ void _z_subscriber_free(_z_subscriber_t **sub) { *sub = NULL; } } +#endif \ No newline at end of file diff --git a/src/session/push.c b/src/session/push.c index 5e0649983..0028d4a83 100644 --- a/src/session/push.c +++ b/src/session/push.c @@ -28,8 +28,8 @@ int8_t _z_trigger_push(_z_session_t *zn, _z_n_msg_push_t *push) { _z_bytes_t payload = push->_body._is_put ? push->_body._body._put._payload : _z_bytes_empty(); _z_encoding_t encoding = push->_body._is_put ? push->_body._body._put._encoding : z_encoding_default(); int kind = push->_body._is_put ? Z_SAMPLE_KIND_PUT : Z_SAMPLE_KIND_DELETE; - +#if Z_FEATURE_SUBSCRIPTION == 1 ret = _z_trigger_subscriptions(zn, push->_key, payload, encoding, kind, push->_timestamp); - +#endif return ret; } \ No newline at end of file diff --git a/src/session/rx.c b/src/session/rx.c index d9e17587b..28d225827 100644 --- a/src/session/rx.c +++ b/src/session/rx.c @@ -99,8 +99,10 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint } break; case _Z_REQUEST_PUT: { _z_msg_put_t put = req._body._put; +#if Z_FEATURE_SUBSCRIPTION == 1 ret = _z_trigger_subscriptions(zn, req._key, put._payload, put._encoding, Z_SAMPLE_KIND_PUT, put._commons._timestamp); +#endif if (ret == _Z_RES_OK) { _z_network_message_t ack = _z_n_msg_make_ack(req._rid, &req._key); ret = _z_send_n_msg(zn, &ack, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK); @@ -110,8 +112,10 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint } break; case _Z_REQUEST_DEL: { _z_msg_del_t del = req._body._del; +#if Z_FEATURE_SUBSCRIPTION == 1 ret = _z_trigger_subscriptions(zn, req._key, _z_bytes_empty(), z_encoding_default(), Z_SAMPLE_KIND_DELETE, del._commons._timestamp); +#endif if (ret == _Z_RES_OK) { _z_network_message_t ack = _z_n_msg_make_ack(req._rid, &req._key); ret = _z_send_n_msg(zn, &ack, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK); @@ -144,13 +148,17 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint } break; case _Z_RESPONSE_BODY_PUT: { _z_msg_put_t put = response._body._put; +#if Z_FEATURE_SUBSCRIPTION == 1 ret = _z_trigger_subscriptions(zn, response._key, put._payload, put._encoding, Z_SAMPLE_KIND_PUT, put._commons._timestamp); +#endif } break; case _Z_RESPONSE_BODY_DEL: { _z_msg_del_t del = response._body._del; +#if Z_FEATURE_SUBSCRIPTION == 1 ret = _z_trigger_subscriptions(zn, response._key, _z_bytes_empty(), z_encoding_default(), Z_SAMPLE_KIND_DELETE, del._commons._timestamp); +#endif } break; } } break; diff --git a/src/session/subscription.c b/src/session/subscription.c index e3e0833f3..fbb52e6b0 100644 --- a/src/session/subscription.c +++ b/src/session/subscription.c @@ -24,6 +24,7 @@ #include "zenoh-pico/session/session.h" #include "zenoh-pico/utils/logging.h" +#if Z_FEATURE_SUBSCRIPTION == 1 _Bool _z_subscription_eq(const _z_subscription_t *other, const _z_subscription_t *this) { return this->_id == other->_id; } @@ -226,3 +227,4 @@ void _z_flush_subscriptions(_z_session_t *zn) { _z_mutex_unlock(&zn->_mutex_inner); #endif // Z_FEATURE_MULTI_THREAD == 1 } +#endif \ No newline at end of file diff --git a/src/session/utils.c b/src/session/utils.c index 8f80c260c..b43104f25 100644 --- a/src/session/utils.c +++ b/src/session/utils.c @@ -61,8 +61,10 @@ int8_t _z_session_init(_z_session_t *zn, _z_id_t *zid) { // Initialize the data structs zn->_local_resources = NULL; zn->_remote_resources = NULL; +#if Z_FEATURE_SUBSCRIPTION == 1 zn->_local_subscriptions = NULL; zn->_remote_subscriptions = NULL; +#endif #if Z_FEATURE_QUERYABLE == 1 zn->_local_questionable = NULL; #endif @@ -103,8 +105,9 @@ void _z_session_clear(_z_session_t *zn) { // Clean up the entities _z_flush_resources(zn); +#if Z_FEATURE_SUBSCRIPTION == 1 _z_flush_subscriptions(zn); - +#endif #if Z_FEATURE_QUERYABLE == 1 _z_flush_questionables(zn); #endif diff --git a/tests/z_api_null_drop_test.c b/tests/z_api_null_drop_test.c index 6d54fea94..979280900 100644 --- a/tests/z_api_null_drop_test.c +++ b/tests/z_api_null_drop_test.c @@ -29,8 +29,6 @@ int main(void) { z_owned_keyexpr_t keyexpr_null_1 = z_keyexpr_null(); z_owned_config_t config_null_1 = z_config_null(); z_owned_scouting_config_t scouting_config_null_1 = z_scouting_config_null(); - z_owned_pull_subscriber_t pull_subscriber_null_1 = z_pull_subscriber_null(); - z_owned_subscriber_t subscriber_null_1 = z_subscriber_null(); z_owned_hello_t hello_null_1 = z_hello_null(); z_owned_closure_sample_t closure_sample_null_1 = z_closure_sample_null(); z_owned_closure_query_t closure_query_null_1 = z_closure_query_null(); @@ -47,8 +45,6 @@ int main(void) { assert(!z_check(keyexpr_null_1)); assert(!z_check(config_null_1)); assert(!z_check(scouting_config_null_1)); - assert(!z_check(pull_subscriber_null_1)); - assert(!z_check(subscriber_null_1)); assert(!z_check(hello_null_1)); assert(!z_check(str_null_1)); @@ -60,8 +56,6 @@ int main(void) { z_owned_keyexpr_t keyexpr_null_2; z_owned_config_t config_null_2; z_owned_scouting_config_t scouting_config_null_2; - z_owned_pull_subscriber_t pull_subscriber_null_2; - z_owned_subscriber_t subscriber_null_2; z_owned_hello_t hello_null_2; z_owned_closure_sample_t closure_sample_null_2; z_owned_closure_query_t closure_query_null_2; @@ -75,8 +69,6 @@ int main(void) { z_null(&keyexpr_null_2); z_null(&config_null_2); z_null(&scouting_config_null_2); - z_null(&pull_subscriber_null_2); - z_null(&subscriber_null_2); z_null(&hello_null_2); z_null(&closure_sample_null_2); z_null(&closure_query_null_2); @@ -85,6 +77,18 @@ int main(void) { z_null(&closure_zid_null_2); z_null(&str_null_2); +#if Z_FEATURE_SUBSCRIPTION == 1 + z_owned_pull_subscriber_t pull_subscriber_null_1 = z_pull_subscriber_null(); + z_owned_subscriber_t subscriber_null_1 = z_subscriber_null(); + assert(!z_check(pull_subscriber_null_1)); + assert(!z_check(subscriber_null_1)); + z_owned_pull_subscriber_t pull_subscriber_null_2; + z_owned_subscriber_t subscriber_null_2; + z_null(&pull_subscriber_null_2); + z_null(&subscriber_null_2); + assert(!z_check(pull_subscriber_null_2)); + assert(!z_check(subscriber_null_2)); +#endif #if Z_FEATURE_QUERYABLE == 1 z_owned_queryable_t queryable_null_1 = z_queryable_null(); assert(!z_check(queryable_null_1)); @@ -108,8 +112,6 @@ int main(void) { assert(!z_check(keyexpr_null_2)); assert(!z_check(config_null_2)); assert(!z_check(scouting_config_null_2)); - assert(!z_check(pull_subscriber_null_2)); - assert(!z_check(subscriber_null_2)); assert(!z_check(hello_null_2)); assert(!z_check(str_null_2)); @@ -122,8 +124,6 @@ int main(void) { z_drop(z_move(keyexpr_null_1)); z_drop(z_move(config_null_1)); z_drop(z_move(scouting_config_null_1)); - z_drop(z_move(pull_subscriber_null_1)); - z_drop(z_move(subscriber_null_1)); z_drop(z_move(hello_null_1)); z_drop(z_move(closure_sample_null_1)); z_drop(z_move(closure_query_null_1)); @@ -137,8 +137,6 @@ int main(void) { z_drop(z_move(keyexpr_null_2)); z_drop(z_move(config_null_2)); z_drop(z_move(scouting_config_null_2)); - z_drop(z_move(pull_subscriber_null_2)); - z_drop(z_move(subscriber_null_2)); z_drop(z_move(hello_null_2)); z_drop(z_move(closure_sample_null_2)); z_drop(z_move(closure_query_null_2)); @@ -147,6 +145,12 @@ int main(void) { z_drop(z_move(closure_zid_null_2)); z_drop(z_move(str_null_2)); +#if Z_FEATURE_SUBSCRIPTION == 1 + z_drop(z_move(pull_subscriber_null_1)); + z_drop(z_move(subscriber_null_1)); + z_drop(z_move(pull_subscriber_null_2)); + z_drop(z_move(subscriber_null_2)); +#endif #if Z_FEATURE_QUERYABLE == 1 z_drop(z_move(queryable_null_1)); z_drop(z_move(queryable_null_2));