Skip to content

Commit

Permalink
Query clone fix (#21)
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc authored Dec 3, 2024
1 parent e5f4008 commit e530209
Show file tree
Hide file tree
Showing 12 changed files with 73 additions and 44 deletions.
2 changes: 1 addition & 1 deletion include/zenoh-pico/api/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ _Z_OWNED_TYPE_VALUE(_z_queryable_t, queryable)
/**
* Represents a Zenoh Query entity, received by Zenoh Queryable entities.
*/
_Z_OWNED_TYPE_VALUE(_z_query_t, query)
_Z_OWNED_TYPE_RC(_z_query_rc_t, query)

/**
* Represents the encoding of a payload, in a MIME-like format.
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/collections/bytes.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ static inline _z_bytes_t _z_bytes_null(void) { return (_z_bytes_t){0}; }
static inline void _z_bytes_alias_arc_slice(_z_bytes_t *dst, _z_arc_slice_t *s) {
dst->_slices = _z_arc_slice_svec_alias_element(s);
}
_z_bytes_t _z_bytes_alias(const _z_bytes_t src);
bool _z_bytes_check(const _z_bytes_t *bytes);
z_result_t _z_bytes_append_bytes(_z_bytes_t *dst, _z_bytes_t *src);
z_result_t _z_bytes_append_slice(_z_bytes_t *dst, _z_arc_slice_t *s);
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/net/encoding.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ _z_encoding_t _z_encoding_wrap(uint16_t id, const char *schema);
z_result_t _z_encoding_make(_z_encoding_t *encoding, uint16_t id, const char *schema, size_t len);
void _z_encoding_clear(_z_encoding_t *encoding);
z_result_t _z_encoding_copy(_z_encoding_t *dst, const _z_encoding_t *src);
_z_encoding_t _z_encoding_alias(_z_encoding_t src);
void _z_encoding_move(_z_encoding_t *dst, _z_encoding_t *src);
_z_encoding_t _z_encoding_steal(_z_encoding_t *val);

Expand Down
14 changes: 8 additions & 6 deletions include/zenoh-pico/net/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "zenoh-pico/collections/bytes.h"
#include "zenoh-pico/net/session.h"
#include "zenoh-pico/protocol/core.h"
#include "zenoh-pico/protocol/keyexpr.h"

#ifdef __cplusplus
extern "C" {
Expand All @@ -32,7 +33,7 @@ typedef struct _z_query_t {
_z_keyexpr_t _key;
_z_value_t _value;
uint32_t _request_id;
_z_session_rc_t _zn;
_z_session_weak_t _zn;
_z_bytes_t _attachment;
_z_string_t _parameters;
bool _anyke;
Expand All @@ -46,9 +47,10 @@ static inline bool _z_query_check(const _z_query_t *query) {
}
z_result_t _z_query_send_reply_final(_z_query_t *q);
void _z_query_clear(_z_query_t *q);
z_result_t _z_query_copy(_z_query_t *dst, const _z_query_t *src);
void _z_query_free(_z_query_t **query);

_Z_REFCOUNT_DEFINE(_z_query, _z_query)

/**
* Return type when declaring a queryable.
*/
Expand All @@ -65,11 +67,11 @@ static inline _z_query_t _z_query_alias(_z_value_t *value, _z_keyexpr_t *key, co
_z_session_rc_t *zn, uint32_t request_id, const _z_bytes_t *attachment,
bool anyke) {
return (_z_query_t){
._key = *key,
._value = *value,
._key = _z_keyexpr_alias(*key),
._value = _z_value_alias(*value),
._request_id = request_id,
._zn = *zn,
._attachment = *attachment,
._zn = _z_session_rc_clone_as_weak(zn),
._attachment = _z_bytes_alias(*attachment),
._parameters = _z_string_alias_slice(parameters),
._anyke = anyke,
};
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/protocol/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ static inline bool _z_value_check(const _z_value_t *value) {
}
_z_value_t _z_value_steal(_z_value_t *value);
z_result_t _z_value_copy(_z_value_t *dst, const _z_value_t *src);
_z_value_t _z_value_alias(_z_value_t src);
void _z_value_move(_z_value_t *dst, _z_value_t *src);
void _z_value_clear(_z_value_t *src);
void _z_value_free(_z_value_t **hello);
Expand Down
4 changes: 2 additions & 2 deletions include/zenoh-pico/session/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,12 @@ typedef struct {
} _z_publication_t;

// Forward type declaration to avoid cyclical include
typedef struct _z_query_t _z_query_t;
typedef struct _z_query_rc_t _z_query_rc_t;

/**
* The callback signature of the functions handling query messages.
*/
typedef void (*_z_closure_query_callback_t)(_z_query_t *query, void *arg);
typedef void (*_z_closure_query_callback_t)(_z_query_rc_t *query, void *arg);

typedef struct {
_z_keyexpr_t _key;
Expand Down
35 changes: 23 additions & 12 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -417,16 +417,18 @@ z_query_consolidation_t z_query_consolidation_none(void) {
z_query_consolidation_t z_query_consolidation_default(void) { return z_query_consolidation_auto(); }

void z_query_parameters(const z_loaned_query_t *query, z_view_string_t *parameters) {
parameters->_val = _z_string_alias(query->_parameters);
parameters->_val = _z_string_alias(_Z_RC_IN_VAL(query)->_parameters);
}

const z_loaned_bytes_t *z_query_attachment(const z_loaned_query_t *query) { return &query->_attachment; }
const z_loaned_bytes_t *z_query_attachment(const z_loaned_query_t *query) { return &_Z_RC_IN_VAL(query)->_attachment; }

const z_loaned_keyexpr_t *z_query_keyexpr(const z_loaned_query_t *query) { return &query->_key; }
const z_loaned_keyexpr_t *z_query_keyexpr(const z_loaned_query_t *query) { return &_Z_RC_IN_VAL(query)->_key; }

const z_loaned_bytes_t *z_query_payload(const z_loaned_query_t *query) { return &query->_value.payload; }
const z_loaned_bytes_t *z_query_payload(const z_loaned_query_t *query) { return &_Z_RC_IN_VAL(query)->_value.payload; }

const z_loaned_encoding_t *z_query_encoding(const z_loaned_query_t *query) { return &query->_value.encoding; }
const z_loaned_encoding_t *z_query_encoding(const z_loaned_query_t *query) {
return &_Z_RC_IN_VAL(query)->_value.encoding;
}

void z_closure_sample_call(const z_loaned_closure_sample_t *closure, z_loaned_sample_t *sample) {
if (closure->call != NULL) {
Expand Down Expand Up @@ -1121,7 +1123,7 @@ bool z_reply_replier_id(const z_loaned_reply_t *reply, z_id_t *out_id) {
#endif // Z_FEATURE_QUERY == 1

#if Z_FEATURE_QUERYABLE == 1
_Z_OWNED_FUNCTIONS_VALUE_IMPL(_z_query_t, query, _z_query_check, _z_query_null, _z_query_copy, _z_query_clear)
_Z_OWNED_FUNCTIONS_RC_IMPL(query)

void _z_queryable_drop(_z_queryable_t *queryable) {
_z_undeclare_queryable(queryable);
Expand Down Expand Up @@ -1191,7 +1193,9 @@ void z_query_reply_options_default(z_query_reply_options_t *options) {

z_result_t z_query_reply(const z_loaned_query_t *query, const z_loaned_keyexpr_t *keyexpr, z_moved_bytes_t *payload,
const z_query_reply_options_t *options) {
if (_Z_RC_IS_NULL(&query->_zn)) {
// Try upgrading session weak to rc
_z_session_rc_t sess_rc = _z_session_weak_upgrade_if_open(&_Z_RC_IN_VAL(query)->_zn);
if (_Z_RC_IS_NULL(&sess_rc)) {
return _Z_ERR_SESSION_CLOSED;
}
// Set options
Expand All @@ -1206,11 +1210,12 @@ z_result_t z_query_reply(const z_loaned_query_t *query, const z_loaned_keyexpr_t
_z_value_t value = {.payload = _z_bytes_from_owned_bytes(&payload->_this),
.encoding = _z_encoding_from_owned(&opts.encoding->_this)};

z_result_t ret = _z_send_reply(query, &query->_zn, keyexpr_aliased, value, Z_SAMPLE_KIND_PUT,
z_result_t ret = _z_send_reply(_Z_RC_IN_VAL(query), &sess_rc, keyexpr_aliased, value, Z_SAMPLE_KIND_PUT,
opts.congestion_control, opts.priority, opts.is_express, opts.timestamp,
_z_bytes_from_owned_bytes(&opts.attachment->_this));
z_bytes_drop(payload);
// Clean-up
_z_session_rc_drop(&sess_rc);
z_encoding_drop(opts.encoding);
z_bytes_drop(opts.attachment);
return ret;
Expand All @@ -1226,7 +1231,9 @@ void z_query_reply_del_options_default(z_query_reply_del_options_t *options) {

z_result_t z_query_reply_del(const z_loaned_query_t *query, const z_loaned_keyexpr_t *keyexpr,
const z_query_reply_del_options_t *options) {
if (_Z_RC_IS_NULL(&query->_zn)) {
// Try upgrading session weak to rc
_z_session_rc_t sess_rc = _z_session_weak_upgrade_if_open(&_Z_RC_IN_VAL(query)->_zn);
if (_Z_RC_IS_NULL(&sess_rc)) {
return _Z_ERR_SESSION_CLOSED;
}
_z_keyexpr_t keyexpr_aliased = _z_keyexpr_alias_from_user_defined(*keyexpr, true);
Expand All @@ -1239,10 +1246,11 @@ z_result_t z_query_reply_del(const z_loaned_query_t *query, const z_loaned_keyex

_z_value_t value = {.payload = _z_bytes_null(), .encoding = _z_encoding_null()};

z_result_t ret = _z_send_reply(query, &query->_zn, keyexpr_aliased, value, Z_SAMPLE_KIND_DELETE,
z_result_t ret = _z_send_reply(_Z_RC_IN_VAL(query), &sess_rc, keyexpr_aliased, value, Z_SAMPLE_KIND_DELETE,
opts.congestion_control, opts.priority, opts.is_express, opts.timestamp,
_z_bytes_from_owned_bytes(&opts.attachment->_this));
// Clean-up
_z_session_rc_drop(&sess_rc);
z_bytes_drop(opts.attachment);
return ret;
}
Expand All @@ -1251,7 +1259,9 @@ void z_query_reply_err_options_default(z_query_reply_err_options_t *options) { o

z_result_t z_query_reply_err(const z_loaned_query_t *query, z_moved_bytes_t *payload,
const z_query_reply_err_options_t *options) {
if (_Z_RC_IS_NULL(&query->_zn)) {
// Try upgrading session weak to rc
_z_session_rc_t sess_rc = _z_session_weak_upgrade_if_open(&_Z_RC_IN_VAL(query)->_zn);
if (_Z_RC_IS_NULL(&sess_rc)) {
return _Z_ERR_SESSION_CLOSED;
}
z_query_reply_err_options_t opts;
Expand All @@ -1263,7 +1273,8 @@ z_result_t z_query_reply_err(const z_loaned_query_t *query, z_moved_bytes_t *pay
// Set value
_z_value_t value = {.payload = _z_bytes_from_owned_bytes(&payload->_this),
.encoding = _z_encoding_from_owned(&opts.encoding->_this)};
z_result_t ret = _z_send_reply_err(query, &query->_zn, value);
z_result_t ret = _z_send_reply_err(_Z_RC_IN_VAL(query), &sess_rc, value);
_z_session_rc_drop(&sess_rc);
z_bytes_drop(payload);
// Clean-up
z_encoding_drop(opts.encoding);
Expand Down
6 changes: 6 additions & 0 deletions src/collections/bytes.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ z_result_t _z_bytes_copy(_z_bytes_t *dst, const _z_bytes_t *src) {
return _z_arc_slice_svec_copy(&dst->_slices, &src->_slices, true);
}

_z_bytes_t _z_bytes_alias(const _z_bytes_t src) {
_z_bytes_t dst;
dst._slices = _z_arc_slice_svec_alias(&src._slices);
return dst;
}

_z_bytes_t _z_bytes_duplicate(const _z_bytes_t *src) {
_z_bytes_t dst = _z_bytes_null();
_z_bytes_copy(&dst, src);
Expand Down
12 changes: 12 additions & 0 deletions src/net/encoding.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <string.h>

#include "zenoh-pico/api/constants.h"
#include "zenoh-pico/collections/string.h"
#include "zenoh-pico/utils/logging.h"
#include "zenoh-pico/utils/result.h"

Expand Down Expand Up @@ -54,6 +55,17 @@ z_result_t _z_encoding_copy(_z_encoding_t *dst, const _z_encoding_t *src) {
return _Z_RES_OK;
}

_z_encoding_t _z_encoding_alias(_z_encoding_t src) {
_z_encoding_t dst;
dst.id = src.id;
if (_z_string_check(&src.schema)) {
_z_string_alias(src.schema);
} else {
dst.schema = _z_string_null();
}
return dst;
}

void _z_encoding_move(_z_encoding_t *dst, _z_encoding_t *src) {
dst->id = src->id;
src->id = _Z_ENCODING_ID_DEFAULT;
Expand Down
27 changes: 7 additions & 20 deletions src/net/query.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

#include "zenoh-pico/net/query.h"

#include "zenoh-pico/net/session.h"
#include "zenoh-pico/session/utils.h"
#include "zenoh-pico/transport/common/tx.h"
#include "zenoh-pico/utils/logging.h"
Expand All @@ -22,16 +23,19 @@ static void _z_query_clear_inner(_z_query_t *q) {
_z_value_clear(&q->_value);
_z_bytes_drop(&q->_attachment);
_z_string_clear(&q->_parameters);
_z_session_rc_drop(&q->_zn);
_z_session_weak_drop(&q->_zn);
}

z_result_t _z_query_send_reply_final(_z_query_t *q) {
if (_Z_RC_IS_NULL(&q->_zn)) {
// Try to upgrade session weak to rc
_z_session_rc_t sess_rc = _z_session_weak_upgrade_if_open(&q->_zn);
if (!_Z_RC_IS_NULL(&sess_rc)) {
return _Z_ERR_TRANSPORT_TX_FAILED;
}
_z_zenoh_message_t z_msg = _z_n_msg_make_response_final(q->_request_id);
z_result_t ret = _z_send_n_msg(_Z_RC_IN_VAL(&q->_zn), &z_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK);
z_result_t ret = _z_send_n_msg(_Z_RC_IN_VAL(&sess_rc), &z_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK);
_z_msg_clear(&z_msg);
_z_session_rc_drop(&sess_rc);
return ret;
}

Expand All @@ -44,23 +48,6 @@ void _z_query_clear(_z_query_t *q) {
_z_query_clear_inner(q);
}

z_result_t _z_query_copy(_z_query_t *dst, const _z_query_t *src) {
*dst = _z_query_null();
_Z_RETURN_IF_ERR(_z_keyexpr_copy(&dst->_key, &src->_key));
_Z_CLEAN_RETURN_IF_ERR(_z_value_copy(&dst->_value, &src->_value), _z_query_clear_inner(dst));
_Z_CLEAN_RETURN_IF_ERR(_z_bytes_copy(&dst->_attachment, &src->_attachment), _z_query_clear_inner(dst));
_Z_CLEAN_RETURN_IF_ERR(_z_string_copy(&dst->_parameters, &src->_parameters), _z_query_clear_inner(dst));
_z_session_rc_copy(&dst->_zn, &src->_zn);
if (_Z_RC_IS_NULL(&dst->_zn)) {
_z_query_clear_inner(dst);
return _Z_ERR_SYSTEM_OUT_OF_MEMORY;
}
dst->_anyke = src->_anyke;
dst->_request_id = src->_request_id;
dst->_zn = src->_zn;
return _Z_RES_OK;
}

void _z_query_free(_z_query_t **query) {
_z_query_t *ptr = *query;
if (ptr != NULL) {
Expand Down
8 changes: 8 additions & 0 deletions src/protocol/core.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

#include "zenoh-pico/api/primitives.h"
#include "zenoh-pico/api/types.h"
#include "zenoh-pico/collections/bytes.h"
#include "zenoh-pico/collections/slice.h"
#include "zenoh-pico/net/encoding.h"
#include "zenoh-pico/protocol/core.h"
#include "zenoh-pico/protocol/iobuf.h"
#include "zenoh-pico/utils/endianness.h"
Expand Down Expand Up @@ -60,6 +62,12 @@ z_result_t _z_value_copy(_z_value_t *dst, const _z_value_t *src) {
_Z_CLEAN_RETURN_IF_ERR(_z_bytes_copy(&dst->payload, &src->payload), _z_encoding_clear(&dst->encoding));
return _Z_RES_OK;
}
_z_value_t _z_value_alias(_z_value_t src) {
_z_value_t dst;
dst.payload = _z_bytes_alias(src.payload);
dst.encoding = _z_encoding_alias(src.encoding);
return dst;
}

z_result_t _z_hello_copy(_z_hello_t *dst, const _z_hello_t *src) {
*dst = _z_hello_null();
Expand Down
6 changes: 3 additions & 3 deletions src/session/queryable.c
Original file line number Diff line number Diff line change
Expand Up @@ -224,15 +224,15 @@ static z_result_t _z_trigger_queryables_inner(_z_session_rc_t *zsrc, _z_msg_quer
}
}
// Build the z_query
_z_query_t query =
_z_query_t q =
_z_query_alias(&msgq->_ext_value, &key, &msgq->_parameters, zsrc, qid, &msgq->_ext_attachment, anyke);
_z_query_rc_t query = _z_query_rc_new_from_val(&q);
// Parse session_queryable svec
for (size_t i = 0; i < qle_nb; i++) {
_z_queryable_infos_t *qle_info = _z_queryable_infos_svec_get(&qles, i);
qle_info->callback(&query, qle_info->arg);
}
// Send reply final message
_z_query_send_reply_final(&query);
_z_query_rc_drop(&query);
// Clean up
_z_keyexpr_clear(&key);
#if Z_FEATURE_RX_CACHE != 1
Expand Down

0 comments on commit e530209

Please sign in to comment.