Skip to content

Commit

Permalink
feat: add subscription config token
Browse files Browse the repository at this point in the history
  • Loading branch information
jean-roland committed Oct 23, 2023
1 parent 95f94fd commit 3fc4481
Show file tree
Hide file tree
Showing 14 changed files with 118 additions and 78 deletions.
2 changes: 2 additions & 0 deletions include/zenoh-pico/api/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -1116,6 +1116,7 @@ int8_t z_publisher_put(const z_publisher_t pub, const uint8_t *payload, size_t l
*/
int8_t z_publisher_delete(const z_publisher_t pub, const z_publisher_delete_options_t *options);

#if Z_FEATURE_SUBSCRIPTION == 1
/**
* Constructs the default values for the subscriber entity.
*
Expand Down Expand Up @@ -1237,6 +1238,7 @@ int8_t z_undeclare_pull_subscriber(z_owned_pull_subscriber_t *sub);
* Returns ``0`` if the pull operation is successful, or a ``negative value`` otherwise.
*/
int8_t z_subscriber_pull(const z_pull_subscriber_t sub);
#endif

/**
* Checks if a given value is valid.
Expand Down
7 changes: 7 additions & 0 deletions include/zenoh-pico/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,13 @@
#define Z_FEATURE_QUERY 1
#endif

/**
* Enable subscription to this node
*/
#ifndef Z_FEATURE_SUBSCRIPTION
#define Z_FEATURE_SUBSCRIPTION 1
#endif

/**
* Enable TCP links.
*/
Expand Down
24 changes: 13 additions & 11 deletions include/zenoh-pico/net/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ _z_publisher_t *_z_declare_publisher(_z_session_t *zn, _z_keyexpr_t keyexpr, z_c
*/
int8_t _z_undeclare_publisher(_z_publisher_t *pub);

#if Z_FEATURE_SUBSCRIPTION == 1
/**
* Declare a :c:type:`_z_subscriber_t` for the given resource key.
*
Expand Down Expand Up @@ -128,6 +129,18 @@ _z_subscriber_t *_z_declare_subscriber(_z_session_t *zn, _z_keyexpr_t keyexpr, _
*/
int8_t _z_undeclare_subscriber(_z_subscriber_t *sub);

/**
* Pull data for a pull mode :c:type:`_z_subscriber_t`. The pulled data will be provided
* by calling the **callback** function provided to the :c:func:`_z_declare_subscriber` function.
*
* Parameters:
* sub: The :c:type:`_z_subscriber_t` to pull from.
* Returns:
* ``0`` in case of success, ``-1`` in case of failure.
*/
int8_t _z_subscriber_pull(const _z_subscriber_t *sub);
#endif

#if Z_FEATURE_QUERYABLE == 1
/**
* Declare a :c:type:`_z_queryable_t` for the given resource key.
Expand Down Expand Up @@ -218,15 +231,4 @@ int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, const uint8_t *pay
const _z_encoding_t encoding, const z_sample_kind_t kind, const z_congestion_control_t cong_ctrl,
z_priority_t priority);

/**
* Pull data for a pull mode :c:type:`_z_subscriber_t`. The pulled data will be provided
* by calling the **callback** function provided to the :c:func:`_z_declare_subscriber` function.
*
* Parameters:
* sub: The :c:type:`_z_subscriber_t` to pull from.
* Returns:
* ``0`` in case of success, ``-1`` in case of failure.
*/
int8_t _z_subscriber_pull(const _z_subscriber_t *sub);

#endif /* ZENOH_PICO_PRIMITIVES_NETAPI_H */
2 changes: 2 additions & 0 deletions include/zenoh-pico/net/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@ typedef struct {
_z_resource_list_t *_remote_resources;

// Session subscriptions
#if Z_FEATURE_SUBSCRIPTION == 1
_z_subscription_sptr_list_t *_local_subscriptions;
_z_subscription_sptr_list_t *_remote_subscriptions;
#endif

// Session queryables
#if Z_FEATURE_QUERYABLE == 1
Expand Down
2 changes: 2 additions & 0 deletions include/zenoh-pico/net/subscribe.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ typedef struct {

typedef _z_subscriber_t _z_pull_subscriber_t;

#if Z_FEATURE_SUBSCRIPTION == 1
/**
* Create a default subscription info for a push subscriber.
*
Expand All @@ -48,5 +49,6 @@ _z_subinfo_t _z_subinfo_pull_default(void);

void _z_subscriber_clear(_z_subscriber_t *sub);
void _z_subscriber_free(_z_subscriber_t **sub);
#endif

#endif /* ZENOH_PICO_SUBSCRIBE_NETAPI_H */
2 changes: 2 additions & 0 deletions include/zenoh-pico/session/subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "zenoh-pico/net/session.h"

#if Z_FEATURE_SUBSCRIPTION == 1
/*------------------ Subscription ------------------*/
_z_subscription_sptr_t *_z_get_subscription_by_id(_z_session_t *zn, uint8_t is_local, const _z_zint_t id);
_z_subscription_sptr_list_t *_z_get_subscriptions_by_key(_z_session_t *zn, uint8_t is_local,
Expand All @@ -30,5 +31,6 @@ void _z_flush_subscriptions(_z_session_t *zn);

/*------------------ Pull ------------------*/
_z_zint_t _z_get_pull_id(_z_session_t *zn);
#endif

#endif /* ZENOH_PICO_SESSION_SUBSCRIPTION_H */
64 changes: 33 additions & 31 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -409,14 +409,6 @@ OWNED_FUNCTIONS_PTR_COMMON(z_session_t, z_owned_session_t, session)
OWNED_FUNCTIONS_PTR_CLONE(z_session_t, z_owned_session_t, session, _z_owner_noop_copy)
void z_session_drop(z_owned_session_t *val) { z_close(val); }

OWNED_FUNCTIONS_PTR_COMMON(z_subscriber_t, z_owned_subscriber_t, subscriber)
OWNED_FUNCTIONS_PTR_CLONE(z_subscriber_t, z_owned_subscriber_t, subscriber, _z_owner_noop_copy)
void z_subscriber_drop(z_owned_subscriber_t *val) { z_undeclare_subscriber(val); }

OWNED_FUNCTIONS_PTR_COMMON(z_pull_subscriber_t, z_owned_pull_subscriber_t, pull_subscriber)
OWNED_FUNCTIONS_PTR_CLONE(z_pull_subscriber_t, z_owned_pull_subscriber_t, pull_subscriber, _z_owner_noop_copy)
void z_pull_subscriber_drop(z_owned_pull_subscriber_t *val) { z_undeclare_pull_subscriber(val); }

OWNED_FUNCTIONS_PTR_COMMON(z_publisher_t, z_owned_publisher_t, publisher)
OWNED_FUNCTIONS_PTR_CLONE(z_publisher_t, z_owned_publisher_t, publisher, _z_owner_noop_copy)
void z_publisher_drop(z_owned_publisher_t *val) { z_undeclare_publisher(val); }
Expand Down Expand Up @@ -878,6 +870,15 @@ int8_t z_publisher_delete(const z_publisher_t pub, const z_publisher_delete_opti
pub._val->_congestion_control, pub._val->_priority);
}

#if Z_FEATURE_SUBSCRIPTION == 1
OWNED_FUNCTIONS_PTR_COMMON(z_subscriber_t, z_owned_subscriber_t, subscriber)
OWNED_FUNCTIONS_PTR_CLONE(z_subscriber_t, z_owned_subscriber_t, subscriber, _z_owner_noop_copy)
void z_subscriber_drop(z_owned_subscriber_t *val) { z_undeclare_subscriber(val); }

OWNED_FUNCTIONS_PTR_COMMON(z_pull_subscriber_t, z_owned_pull_subscriber_t, pull_subscriber)
OWNED_FUNCTIONS_PTR_CLONE(z_pull_subscriber_t, z_owned_pull_subscriber_t, pull_subscriber, _z_owner_noop_copy)
void z_pull_subscriber_drop(z_owned_pull_subscriber_t *val) { z_undeclare_pull_subscriber(val); }

z_subscriber_options_t z_subscriber_options_default(void) {
return (z_subscriber_options_t){.reliability = Z_RELIABILITY_DEFAULT};
}
Expand Down Expand Up @@ -981,6 +982,29 @@ int8_t z_undeclare_pull_subscriber(z_owned_pull_subscriber_t *sub) {

int8_t z_subscriber_pull(const z_pull_subscriber_t sub) { return _z_subscriber_pull(sub._val); }

z_owned_keyexpr_t z_subscriber_keyexpr(z_subscriber_t sub) {
z_owned_keyexpr_t ret = z_keyexpr_null();
uint32_t lookup = sub._val->_entity_id;
if (sub._val != NULL) {
_z_subscription_sptr_list_t *tail = sub._val->_zn->_local_subscriptions;
while (tail != NULL && !z_keyexpr_check(&ret)) {
_z_subscription_sptr_t *head = _z_subscription_sptr_list_head(tail);
if (head->ptr->_id == lookup) {
_z_keyexpr_t key = _z_keyexpr_duplicate(head->ptr->_key);
ret = (z_owned_keyexpr_t){._value = z_malloc(sizeof(_z_keyexpr_t))};
if (ret._value != NULL) {
*ret._value = key;
} else {
_z_keyexpr_clear(&key);
}
}
tail = _z_subscription_sptr_list_tail(tail);
}
}
return ret;
}
#endif

/**************** Tasks ****************/
zp_task_read_options_t zp_task_read_options_default(void) { return (zp_task_read_options_t){.__dummy = 0}; }

Expand Down Expand Up @@ -1053,26 +1077,4 @@ z_owned_keyexpr_t z_publisher_keyexpr(z_publisher_t publisher) {
*ret._value = _z_keyexpr_duplicate(publisher._val->_key);
}
return ret;
}

z_owned_keyexpr_t z_subscriber_keyexpr(z_subscriber_t sub) {
z_owned_keyexpr_t ret = z_keyexpr_null();
uint32_t lookup = sub._val->_entity_id;
if (sub._val != NULL) {
_z_subscription_sptr_list_t *tail = sub._val->_zn->_local_subscriptions;
while (tail != NULL && !z_keyexpr_check(&ret)) {
_z_subscription_sptr_t *head = _z_subscription_sptr_list_head(tail);
if (head->ptr->_id == lookup) {
_z_keyexpr_t key = _z_keyexpr_duplicate(head->ptr->_key);
ret = (z_owned_keyexpr_t){._value = z_malloc(sizeof(_z_keyexpr_t))};
if (ret._value != NULL) {
*ret._value = key;
} else {
_z_keyexpr_clear(&key);
}
}
tail = _z_subscription_sptr_list_tail(tail);
}
}
return ret;
}
}
40 changes: 21 additions & 19 deletions src/net/primitives.c
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ int8_t _z_undeclare_publisher(_z_publisher_t *pub) {
return ret;
}

#if Z_FEATURE_SUBSCRIPTION == 1
/*------------------ Subscriber Declaration ------------------*/
_z_subscriber_t *_z_declare_subscriber(_z_session_t *zn, _z_keyexpr_t keyexpr, _z_subinfo_t sub_info,
_z_data_handler_t callback, _z_drop_handler_t dropper, void *arg) {
Expand Down Expand Up @@ -192,6 +193,25 @@ int8_t _z_undeclare_subscriber(_z_subscriber_t *sub) {
return ret;
}

/*------------------ Pull ------------------*/
int8_t _z_subscriber_pull(const _z_subscriber_t *sub) {
int8_t ret = _Z_RES_OK;

_z_subscription_sptr_t *s = _z_get_subscription_by_id(sub->_zn, _Z_RESOURCE_IS_LOCAL, sub->_entity_id);
if (s != NULL) {
_z_zint_t pull_id = _z_get_pull_id(sub->_zn);
_z_zenoh_message_t z_msg = _z_msg_make_pull(_z_keyexpr_alias(s->ptr->_key), pull_id);
if (_z_send_n_msg(sub->_zn, &z_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) {
ret = _Z_ERR_TRANSPORT_TX_FAILED;
}
} else {
ret = _Z_ERR_ENTITY_UNKNOWN;
}

return ret;
}
#endif

#if Z_FEATURE_QUERYABLE == 1
/*------------------ Queryable Declaration ------------------*/
_z_queryable_t *_z_declare_queryable(_z_session_t *zn, _z_keyexpr_t keyexpr, _Bool complete,
Expand Down Expand Up @@ -395,22 +415,4 @@ int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, const uint8_t *pay
// Freeing z_msg is unnecessary, as all of its components are aliased

return ret;
}

/*------------------ Pull ------------------*/
int8_t _z_subscriber_pull(const _z_subscriber_t *sub) {
int8_t ret = _Z_RES_OK;

_z_subscription_sptr_t *s = _z_get_subscription_by_id(sub->_zn, _Z_RESOURCE_IS_LOCAL, sub->_entity_id);
if (s != NULL) {
_z_zint_t pull_id = _z_get_pull_id(sub->_zn);
_z_zenoh_message_t z_msg = _z_msg_make_pull(_z_keyexpr_alias(s->ptr->_key), pull_id);
if (_z_send_n_msg(sub->_zn, &z_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) {
ret = _Z_ERR_TRANSPORT_TX_FAILED;
}
} else {
ret = _Z_ERR_ENTITY_UNKNOWN;
}

return ret;
}
}
2 changes: 2 additions & 0 deletions src/net/subscribe.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

#include "zenoh-pico/net/subscribe.h"

#if Z_FEATURE_SUBSCRIPTION == 1
_z_subinfo_t _z_subinfo_push_default(void) {
_z_subinfo_t si;
si.reliability = Z_RELIABILITY_RELIABLE;
Expand Down Expand Up @@ -44,3 +45,4 @@ void _z_subscriber_free(_z_subscriber_t **sub) {
*sub = NULL;
}
}
#endif
4 changes: 2 additions & 2 deletions src/session/push.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ int8_t _z_trigger_push(_z_session_t *zn, _z_n_msg_push_t *push) {
_z_bytes_t payload = push->_body._is_put ? push->_body._body._put._payload : _z_bytes_empty();
_z_encoding_t encoding = push->_body._is_put ? push->_body._body._put._encoding : z_encoding_default();
int kind = push->_body._is_put ? Z_SAMPLE_KIND_PUT : Z_SAMPLE_KIND_DELETE;

#if Z_FEATURE_SUBSCRIPTION == 1
ret = _z_trigger_subscriptions(zn, push->_key, payload, encoding, kind, push->_timestamp);

#endif
return ret;
}
8 changes: 8 additions & 0 deletions src/session/rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,10 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint
} break;
case _Z_REQUEST_PUT: {
_z_msg_put_t put = req._body._put;
#if Z_FEATURE_SUBSCRIPTION == 1
ret = _z_trigger_subscriptions(zn, req._key, put._payload, put._encoding, Z_SAMPLE_KIND_PUT,
put._commons._timestamp);
#endif
if (ret == _Z_RES_OK) {
_z_network_message_t ack = _z_n_msg_make_ack(req._rid, &req._key);
ret = _z_send_n_msg(zn, &ack, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK);
Expand All @@ -110,8 +112,10 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint
} break;
case _Z_REQUEST_DEL: {
_z_msg_del_t del = req._body._del;
#if Z_FEATURE_SUBSCRIPTION == 1
ret = _z_trigger_subscriptions(zn, req._key, _z_bytes_empty(), z_encoding_default(),
Z_SAMPLE_KIND_DELETE, del._commons._timestamp);
#endif
if (ret == _Z_RES_OK) {
_z_network_message_t ack = _z_n_msg_make_ack(req._rid, &req._key);
ret = _z_send_n_msg(zn, &ack, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK);
Expand Down Expand Up @@ -144,13 +148,17 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint
} break;
case _Z_RESPONSE_BODY_PUT: {
_z_msg_put_t put = response._body._put;
#if Z_FEATURE_SUBSCRIPTION == 1
ret = _z_trigger_subscriptions(zn, response._key, put._payload, put._encoding, Z_SAMPLE_KIND_PUT,
put._commons._timestamp);
#endif
} break;
case _Z_RESPONSE_BODY_DEL: {
_z_msg_del_t del = response._body._del;
#if Z_FEATURE_SUBSCRIPTION == 1
ret = _z_trigger_subscriptions(zn, response._key, _z_bytes_empty(), z_encoding_default(),
Z_SAMPLE_KIND_DELETE, del._commons._timestamp);
#endif
} break;
}
} break;
Expand Down
2 changes: 2 additions & 0 deletions src/session/subscription.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "zenoh-pico/session/session.h"
#include "zenoh-pico/utils/logging.h"

#if Z_FEATURE_SUBSCRIPTION == 1
_Bool _z_subscription_eq(const _z_subscription_t *other, const _z_subscription_t *this) {
return this->_id == other->_id;
}
Expand Down Expand Up @@ -226,3 +227,4 @@ void _z_flush_subscriptions(_z_session_t *zn) {
_z_mutex_unlock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
}
#endif
5 changes: 4 additions & 1 deletion src/session/utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,10 @@ int8_t _z_session_init(_z_session_t *zn, _z_id_t *zid) {
// Initialize the data structs
zn->_local_resources = NULL;
zn->_remote_resources = NULL;
#if Z_FEATURE_SUBSCRIPTION == 1
zn->_local_subscriptions = NULL;
zn->_remote_subscriptions = NULL;
#endif
#if Z_FEATURE_QUERYABLE == 1
zn->_local_questionable = NULL;
#endif
Expand Down Expand Up @@ -103,8 +105,9 @@ void _z_session_clear(_z_session_t *zn) {

// Clean up the entities
_z_flush_resources(zn);
#if Z_FEATURE_SUBSCRIPTION == 1
_z_flush_subscriptions(zn);

#endif
#if Z_FEATURE_QUERYABLE == 1
_z_flush_questionables(zn);
#endif
Expand Down
Loading

0 comments on commit 3fc4481

Please sign in to comment.