diff --git a/include/zenoh-pico/api/types.h b/include/zenoh-pico/api/types.h index fd89c4972..c5a76dbd8 100644 --- a/include/zenoh-pico/api/types.h +++ b/include/zenoh-pico/api/types.h @@ -118,7 +118,7 @@ _Z_OWNED_TYPE_VALUE(_z_queryable_t, queryable) /** * Represents a Zenoh Query entity, received by Zenoh Queryable entities. */ -_Z_OWNED_TYPE_VALUE(_z_query_t, query) +_Z_OWNED_TYPE_RC(_z_query_rc_t, query) /** * Represents the encoding of a payload, in a MIME-like format. diff --git a/include/zenoh-pico/collections/bytes.h b/include/zenoh-pico/collections/bytes.h index feef1006d..19e0d3136 100644 --- a/include/zenoh-pico/collections/bytes.h +++ b/include/zenoh-pico/collections/bytes.h @@ -51,6 +51,7 @@ static inline _z_bytes_t _z_bytes_null(void) { return (_z_bytes_t){0}; } static inline void _z_bytes_alias_arc_slice(_z_bytes_t *dst, _z_arc_slice_t *s) { dst->_slices = _z_arc_slice_svec_alias_element(s); } +_z_bytes_t _z_bytes_alias(const _z_bytes_t src); bool _z_bytes_check(const _z_bytes_t *bytes); z_result_t _z_bytes_append_bytes(_z_bytes_t *dst, _z_bytes_t *src); z_result_t _z_bytes_append_slice(_z_bytes_t *dst, _z_arc_slice_t *s); diff --git a/include/zenoh-pico/net/encoding.h b/include/zenoh-pico/net/encoding.h index 79a678084..a100071b4 100644 --- a/include/zenoh-pico/net/encoding.h +++ b/include/zenoh-pico/net/encoding.h @@ -40,6 +40,7 @@ _z_encoding_t _z_encoding_wrap(uint16_t id, const char *schema); z_result_t _z_encoding_make(_z_encoding_t *encoding, uint16_t id, const char *schema, size_t len); void _z_encoding_clear(_z_encoding_t *encoding); z_result_t _z_encoding_copy(_z_encoding_t *dst, const _z_encoding_t *src); +_z_encoding_t _z_encoding_alias(_z_encoding_t src); void _z_encoding_move(_z_encoding_t *dst, _z_encoding_t *src); _z_encoding_t _z_encoding_steal(_z_encoding_t *val); diff --git a/include/zenoh-pico/net/query.h b/include/zenoh-pico/net/query.h index 433fc1217..eb1665f3f 100644 --- a/include/zenoh-pico/net/query.h +++ b/include/zenoh-pico/net/query.h @@ -20,6 +20,7 @@ #include "zenoh-pico/collections/bytes.h" #include "zenoh-pico/net/session.h" #include "zenoh-pico/protocol/core.h" +#include "zenoh-pico/protocol/keyexpr.h" #ifdef __cplusplus extern "C" { @@ -32,7 +33,7 @@ typedef struct _z_query_t { _z_keyexpr_t _key; _z_value_t _value; uint32_t _request_id; - _z_session_rc_t _zn; + _z_session_weak_t _zn; _z_bytes_t _attachment; _z_string_t _parameters; bool _anyke; @@ -46,9 +47,10 @@ static inline bool _z_query_check(const _z_query_t *query) { } z_result_t _z_query_send_reply_final(_z_query_t *q); void _z_query_clear(_z_query_t *q); -z_result_t _z_query_copy(_z_query_t *dst, const _z_query_t *src); void _z_query_free(_z_query_t **query); +_Z_REFCOUNT_DEFINE(_z_query, _z_query) + /** * Return type when declaring a queryable. */ @@ -65,11 +67,11 @@ static inline _z_query_t _z_query_alias(_z_value_t *value, _z_keyexpr_t *key, co _z_session_rc_t *zn, uint32_t request_id, const _z_bytes_t *attachment, bool anyke) { return (_z_query_t){ - ._key = *key, - ._value = *value, + ._key = _z_keyexpr_alias(*key), + ._value = _z_value_alias(*value), ._request_id = request_id, - ._zn = *zn, - ._attachment = *attachment, + ._zn = _z_session_rc_clone_as_weak(zn), + ._attachment = _z_bytes_alias(*attachment), ._parameters = _z_string_alias_slice(parameters), ._anyke = anyke, }; diff --git a/include/zenoh-pico/protocol/core.h b/include/zenoh-pico/protocol/core.h index 7a9a79efb..0d98b5b4f 100644 --- a/include/zenoh-pico/protocol/core.h +++ b/include/zenoh-pico/protocol/core.h @@ -181,6 +181,7 @@ static inline bool _z_value_check(const _z_value_t *value) { } _z_value_t _z_value_steal(_z_value_t *value); z_result_t _z_value_copy(_z_value_t *dst, const _z_value_t *src); +_z_value_t _z_value_alias(_z_value_t src); void _z_value_move(_z_value_t *dst, _z_value_t *src); void _z_value_clear(_z_value_t *src); void _z_value_free(_z_value_t **hello); diff --git a/include/zenoh-pico/session/session.h b/include/zenoh-pico/session/session.h index d1fefc934..9f1c02475 100644 --- a/include/zenoh-pico/session/session.h +++ b/include/zenoh-pico/session/session.h @@ -90,12 +90,12 @@ typedef struct { } _z_publication_t; // Forward type declaration to avoid cyclical include -typedef struct _z_query_t _z_query_t; +typedef struct _z_query_rc_t _z_query_rc_t; /** * The callback signature of the functions handling query messages. */ -typedef void (*_z_closure_query_callback_t)(_z_query_t *query, void *arg); +typedef void (*_z_closure_query_callback_t)(_z_query_rc_t *query, void *arg); typedef struct { _z_keyexpr_t _key; diff --git a/src/api/api.c b/src/api/api.c index c4b827209..1e6d13634 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -417,16 +417,18 @@ z_query_consolidation_t z_query_consolidation_none(void) { z_query_consolidation_t z_query_consolidation_default(void) { return z_query_consolidation_auto(); } void z_query_parameters(const z_loaned_query_t *query, z_view_string_t *parameters) { - parameters->_val = _z_string_alias(query->_parameters); + parameters->_val = _z_string_alias(_Z_RC_IN_VAL(query)->_parameters); } -const z_loaned_bytes_t *z_query_attachment(const z_loaned_query_t *query) { return &query->_attachment; } +const z_loaned_bytes_t *z_query_attachment(const z_loaned_query_t *query) { return &_Z_RC_IN_VAL(query)->_attachment; } -const z_loaned_keyexpr_t *z_query_keyexpr(const z_loaned_query_t *query) { return &query->_key; } +const z_loaned_keyexpr_t *z_query_keyexpr(const z_loaned_query_t *query) { return &_Z_RC_IN_VAL(query)->_key; } -const z_loaned_bytes_t *z_query_payload(const z_loaned_query_t *query) { return &query->_value.payload; } +const z_loaned_bytes_t *z_query_payload(const z_loaned_query_t *query) { return &_Z_RC_IN_VAL(query)->_value.payload; } -const z_loaned_encoding_t *z_query_encoding(const z_loaned_query_t *query) { return &query->_value.encoding; } +const z_loaned_encoding_t *z_query_encoding(const z_loaned_query_t *query) { + return &_Z_RC_IN_VAL(query)->_value.encoding; +} void z_closure_sample_call(const z_loaned_closure_sample_t *closure, z_loaned_sample_t *sample) { if (closure->call != NULL) { @@ -1121,7 +1123,7 @@ bool z_reply_replier_id(const z_loaned_reply_t *reply, z_id_t *out_id) { #endif // Z_FEATURE_QUERY == 1 #if Z_FEATURE_QUERYABLE == 1 -_Z_OWNED_FUNCTIONS_VALUE_IMPL(_z_query_t, query, _z_query_check, _z_query_null, _z_query_copy, _z_query_clear) +_Z_OWNED_FUNCTIONS_RC_IMPL(query) void _z_queryable_drop(_z_queryable_t *queryable) { _z_undeclare_queryable(queryable); @@ -1191,7 +1193,9 @@ void z_query_reply_options_default(z_query_reply_options_t *options) { z_result_t z_query_reply(const z_loaned_query_t *query, const z_loaned_keyexpr_t *keyexpr, z_moved_bytes_t *payload, const z_query_reply_options_t *options) { - if (_Z_RC_IS_NULL(&query->_zn)) { + // Try upgrading session weak to rc + _z_session_rc_t sess_rc = _z_session_weak_upgrade_if_open(&_Z_RC_IN_VAL(query)->_zn); + if (_Z_RC_IS_NULL(&sess_rc)) { return _Z_ERR_SESSION_CLOSED; } // Set options @@ -1206,11 +1210,12 @@ z_result_t z_query_reply(const z_loaned_query_t *query, const z_loaned_keyexpr_t _z_value_t value = {.payload = _z_bytes_from_owned_bytes(&payload->_this), .encoding = _z_encoding_from_owned(&opts.encoding->_this)}; - z_result_t ret = _z_send_reply(query, &query->_zn, keyexpr_aliased, value, Z_SAMPLE_KIND_PUT, + z_result_t ret = _z_send_reply(_Z_RC_IN_VAL(query), &sess_rc, keyexpr_aliased, value, Z_SAMPLE_KIND_PUT, opts.congestion_control, opts.priority, opts.is_express, opts.timestamp, _z_bytes_from_owned_bytes(&opts.attachment->_this)); z_bytes_drop(payload); // Clean-up + _z_session_rc_drop(&sess_rc); z_encoding_drop(opts.encoding); z_bytes_drop(opts.attachment); return ret; @@ -1226,7 +1231,9 @@ void z_query_reply_del_options_default(z_query_reply_del_options_t *options) { z_result_t z_query_reply_del(const z_loaned_query_t *query, const z_loaned_keyexpr_t *keyexpr, const z_query_reply_del_options_t *options) { - if (_Z_RC_IS_NULL(&query->_zn)) { + // Try upgrading session weak to rc + _z_session_rc_t sess_rc = _z_session_weak_upgrade_if_open(&_Z_RC_IN_VAL(query)->_zn); + if (_Z_RC_IS_NULL(&sess_rc)) { return _Z_ERR_SESSION_CLOSED; } _z_keyexpr_t keyexpr_aliased = _z_keyexpr_alias_from_user_defined(*keyexpr, true); @@ -1239,10 +1246,11 @@ z_result_t z_query_reply_del(const z_loaned_query_t *query, const z_loaned_keyex _z_value_t value = {.payload = _z_bytes_null(), .encoding = _z_encoding_null()}; - z_result_t ret = _z_send_reply(query, &query->_zn, keyexpr_aliased, value, Z_SAMPLE_KIND_DELETE, + z_result_t ret = _z_send_reply(_Z_RC_IN_VAL(query), &sess_rc, keyexpr_aliased, value, Z_SAMPLE_KIND_DELETE, opts.congestion_control, opts.priority, opts.is_express, opts.timestamp, _z_bytes_from_owned_bytes(&opts.attachment->_this)); // Clean-up + _z_session_rc_drop(&sess_rc); z_bytes_drop(opts.attachment); return ret; } @@ -1251,7 +1259,9 @@ void z_query_reply_err_options_default(z_query_reply_err_options_t *options) { o z_result_t z_query_reply_err(const z_loaned_query_t *query, z_moved_bytes_t *payload, const z_query_reply_err_options_t *options) { - if (_Z_RC_IS_NULL(&query->_zn)) { + // Try upgrading session weak to rc + _z_session_rc_t sess_rc = _z_session_weak_upgrade_if_open(&_Z_RC_IN_VAL(query)->_zn); + if (_Z_RC_IS_NULL(&sess_rc)) { return _Z_ERR_SESSION_CLOSED; } z_query_reply_err_options_t opts; @@ -1263,7 +1273,8 @@ z_result_t z_query_reply_err(const z_loaned_query_t *query, z_moved_bytes_t *pay // Set value _z_value_t value = {.payload = _z_bytes_from_owned_bytes(&payload->_this), .encoding = _z_encoding_from_owned(&opts.encoding->_this)}; - z_result_t ret = _z_send_reply_err(query, &query->_zn, value); + z_result_t ret = _z_send_reply_err(_Z_RC_IN_VAL(query), &sess_rc, value); + _z_session_rc_drop(&sess_rc); z_bytes_drop(payload); // Clean-up z_encoding_drop(opts.encoding); diff --git a/src/collections/bytes.c b/src/collections/bytes.c index 63273ff6d..aa81f07f0 100644 --- a/src/collections/bytes.c +++ b/src/collections/bytes.c @@ -31,6 +31,12 @@ z_result_t _z_bytes_copy(_z_bytes_t *dst, const _z_bytes_t *src) { return _z_arc_slice_svec_copy(&dst->_slices, &src->_slices, true); } +_z_bytes_t _z_bytes_alias(const _z_bytes_t src) { + _z_bytes_t dst; + dst._slices = _z_arc_slice_svec_alias(&src._slices); + return dst; +} + _z_bytes_t _z_bytes_duplicate(const _z_bytes_t *src) { _z_bytes_t dst = _z_bytes_null(); _z_bytes_copy(&dst, src); diff --git a/src/net/encoding.c b/src/net/encoding.c index 45b13e4d3..5349d3155 100644 --- a/src/net/encoding.c +++ b/src/net/encoding.c @@ -16,6 +16,7 @@ #include #include "zenoh-pico/api/constants.h" +#include "zenoh-pico/collections/string.h" #include "zenoh-pico/utils/logging.h" #include "zenoh-pico/utils/result.h" @@ -54,6 +55,17 @@ z_result_t _z_encoding_copy(_z_encoding_t *dst, const _z_encoding_t *src) { return _Z_RES_OK; } +_z_encoding_t _z_encoding_alias(_z_encoding_t src) { + _z_encoding_t dst; + dst.id = src.id; + if (_z_string_check(&src.schema)) { + _z_string_alias(src.schema); + } else { + dst.schema = _z_string_null(); + } + return dst; +} + void _z_encoding_move(_z_encoding_t *dst, _z_encoding_t *src) { dst->id = src->id; src->id = _Z_ENCODING_ID_DEFAULT; diff --git a/src/net/query.c b/src/net/query.c index e1748e8c8..219e04d12 100644 --- a/src/net/query.c +++ b/src/net/query.c @@ -13,6 +13,7 @@ #include "zenoh-pico/net/query.h" +#include "zenoh-pico/net/session.h" #include "zenoh-pico/session/utils.h" #include "zenoh-pico/transport/common/tx.h" #include "zenoh-pico/utils/logging.h" @@ -22,16 +23,19 @@ static void _z_query_clear_inner(_z_query_t *q) { _z_value_clear(&q->_value); _z_bytes_drop(&q->_attachment); _z_string_clear(&q->_parameters); - _z_session_rc_drop(&q->_zn); + _z_session_weak_drop(&q->_zn); } z_result_t _z_query_send_reply_final(_z_query_t *q) { - if (_Z_RC_IS_NULL(&q->_zn)) { + // Try to upgrade session weak to rc + _z_session_rc_t sess_rc = _z_session_weak_upgrade_if_open(&q->_zn); + if (!_Z_RC_IS_NULL(&sess_rc)) { return _Z_ERR_TRANSPORT_TX_FAILED; } _z_zenoh_message_t z_msg = _z_n_msg_make_response_final(q->_request_id); - z_result_t ret = _z_send_n_msg(_Z_RC_IN_VAL(&q->_zn), &z_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK); + z_result_t ret = _z_send_n_msg(_Z_RC_IN_VAL(&sess_rc), &z_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK); _z_msg_clear(&z_msg); + _z_session_rc_drop(&sess_rc); return ret; } @@ -44,23 +48,6 @@ void _z_query_clear(_z_query_t *q) { _z_query_clear_inner(q); } -z_result_t _z_query_copy(_z_query_t *dst, const _z_query_t *src) { - *dst = _z_query_null(); - _Z_RETURN_IF_ERR(_z_keyexpr_copy(&dst->_key, &src->_key)); - _Z_CLEAN_RETURN_IF_ERR(_z_value_copy(&dst->_value, &src->_value), _z_query_clear_inner(dst)); - _Z_CLEAN_RETURN_IF_ERR(_z_bytes_copy(&dst->_attachment, &src->_attachment), _z_query_clear_inner(dst)); - _Z_CLEAN_RETURN_IF_ERR(_z_string_copy(&dst->_parameters, &src->_parameters), _z_query_clear_inner(dst)); - _z_session_rc_copy(&dst->_zn, &src->_zn); - if (_Z_RC_IS_NULL(&dst->_zn)) { - _z_query_clear_inner(dst); - return _Z_ERR_SYSTEM_OUT_OF_MEMORY; - } - dst->_anyke = src->_anyke; - dst->_request_id = src->_request_id; - dst->_zn = src->_zn; - return _Z_RES_OK; -} - void _z_query_free(_z_query_t **query) { _z_query_t *ptr = *query; if (ptr != NULL) { diff --git a/src/protocol/core.c b/src/protocol/core.c index da1ef9ef8..37c2a7bcc 100644 --- a/src/protocol/core.c +++ b/src/protocol/core.c @@ -19,7 +19,9 @@ #include "zenoh-pico/api/primitives.h" #include "zenoh-pico/api/types.h" +#include "zenoh-pico/collections/bytes.h" #include "zenoh-pico/collections/slice.h" +#include "zenoh-pico/net/encoding.h" #include "zenoh-pico/protocol/core.h" #include "zenoh-pico/protocol/iobuf.h" #include "zenoh-pico/utils/endianness.h" @@ -60,6 +62,12 @@ z_result_t _z_value_copy(_z_value_t *dst, const _z_value_t *src) { _Z_CLEAN_RETURN_IF_ERR(_z_bytes_copy(&dst->payload, &src->payload), _z_encoding_clear(&dst->encoding)); return _Z_RES_OK; } +_z_value_t _z_value_alias(_z_value_t src) { + _z_value_t dst; + dst.payload = _z_bytes_alias(src.payload); + dst.encoding = _z_encoding_alias(src.encoding); + return dst; +} z_result_t _z_hello_copy(_z_hello_t *dst, const _z_hello_t *src) { *dst = _z_hello_null(); diff --git a/src/session/queryable.c b/src/session/queryable.c index 06d81c647..8c9e56895 100644 --- a/src/session/queryable.c +++ b/src/session/queryable.c @@ -224,15 +224,15 @@ static z_result_t _z_trigger_queryables_inner(_z_session_rc_t *zsrc, _z_msg_quer } } // Build the z_query - _z_query_t query = + _z_query_t q = _z_query_alias(&msgq->_ext_value, &key, &msgq->_parameters, zsrc, qid, &msgq->_ext_attachment, anyke); + _z_query_rc_t query = _z_query_rc_new_from_val(&q); // Parse session_queryable svec for (size_t i = 0; i < qle_nb; i++) { _z_queryable_infos_t *qle_info = _z_queryable_infos_svec_get(&qles, i); qle_info->callback(&query, qle_info->arg); } - // Send reply final message - _z_query_send_reply_final(&query); + _z_query_rc_drop(&query); // Clean up _z_keyexpr_clear(&key); #if Z_FEATURE_RX_CACHE != 1