From 8b4c63b242068cdf5833d5590866ae52b528f3f9 Mon Sep 17 00:00:00 2001 From: Stephen Paul Weber Date: Tue, 29 Apr 2025 15:16:16 -0500 Subject: [PATCH 1/2] Callbacks when a stanza is ack'd or fails to send This adds two new callbacks, one of which runs with a stanza id when a stanza sends and gets an sm ack from the server, and one which runs with a stanza id when a stanza fails and will not be retried. Incomplete and presented for discussion of the design. An id element has been added to the send/sm queue in order to facilitate passing this to the callback without re-parsing the stanza. This is also added to the sm serialization, though we could parse the stanza on restore to avoid this I suppose. TODO: fail everything on stream end --- src/common.h | 5 +++++ src/conn.c | 59 ++++++++++++++++++++++++++++++++++++++++++++++++---- strophe.h | 10 +++++++++ 3 files changed, 70 insertions(+), 4 deletions(-) diff --git a/src/common.h b/src/common.h index a367367a..23b30e66 100644 --- a/src/common.h +++ b/src/common.h @@ -163,6 +163,7 @@ struct _xmpp_send_queue_t { xmpp_send_queue_owner_t owner; void *userdata; uint32_t sm_h; + char *id; xmpp_send_queue_t *prev, *next; }; @@ -330,6 +331,10 @@ struct _xmpp_conn_t { xmpp_sockopt_callback sockopt_cb; xmpp_sm_callback sm_callback; void *sm_callback_ctx; + xmpp_sm_ack_callback sm_ack_callback; + void *sm_ack_callback_ctx; + xmpp_sm_ack_callback sm_fail_callback; + void *sm_fail_callback_ctx; }; void conn_disconnect(xmpp_conn_t *conn); diff --git a/src/conn.c b/src/conn.c index f586a59d..3aba72d0 100644 --- a/src/conn.c +++ b/src/conn.c @@ -20,6 +20,7 @@ * A part of those functions is listed under the \ref TLS section. */ +#include #include #include #include @@ -115,6 +116,7 @@ static void _send_valist(xmpp_conn_t *conn, static int _send_raw(xmpp_conn_t *conn, char *data, size_t len, + const char *id, xmpp_send_queue_owner_t owner, void *userdata); @@ -1279,6 +1281,22 @@ void xmpp_conn_set_sm_callback(xmpp_conn_t *conn, conn->sm_callback_ctx = ctx; } +void xmpp_conn_set_sm_ack_callback(xmpp_conn_t *conn, + xmpp_sm_ack_callback cb, + void *ctx) +{ + conn->sm_ack_callback = cb; + conn->sm_ack_callback_ctx = ctx; +} + +void xmpp_conn_set_sm_fail_callback(xmpp_conn_t *conn, + xmpp_sm_ack_callback cb, + void *ctx) +{ + conn->sm_fail_callback = cb; + conn->sm_fail_callback_ctx = ctx; +} + struct sm_restore { xmpp_conn_t *conn; const unsigned char *state; @@ -1324,7 +1342,8 @@ static int sm_load_string(struct sm_restore *sm, char **val, size_t *len) memcpy(*val, sm->state, l); (*val)[l] = '\0'; sm->state += l; - *len = l; + if (len) + *len = l; return 0; } @@ -1454,10 +1473,17 @@ int xmpp_conn_restore_sm_state(xmpp_conn_t *conn, ret = sm_load_string(&sm, &item->data, &item->len); if (ret) goto err_reload; + if (sm.state < sm.state_end) { + ret = sm_load_string(&sm, &item->id, NULL); + if (ret) + goto err_reload; + } item->owner = XMPP_QUEUE_USER; } + assert(sm.state == sm.state_end); + return XMPP_EOK; err_reload: @@ -1496,6 +1522,7 @@ static size_t sm_state_serialize(xmpp_conn_t *conn, unsigned char **buf) while (peek) { sm_queue_len++; sm_queue_size += 10 + peek->len; + if (peek->id) sm_queue_size += 5 + strlen(peek->id); peek = peek->next; } @@ -1505,6 +1532,7 @@ static size_t sm_state_serialize(xmpp_conn_t *conn, unsigned char **buf) while (peek) { send_queue_len++; send_queue_size += 5 + peek->len; + if (peek->id) send_queue_size += 5 + strlen(peek->id); peek = peek->next; } @@ -1563,6 +1591,17 @@ static size_t sm_state_serialize(xmpp_conn_t *conn, unsigned char **buf) goto err_serialize; memcpy(next, peek->data, peek->len); next += peek->len; + + if (peek->id) { + uint32_t len = strlen(peek->id); + if (sm_store_u32(&next, end, 0x7a, len)) + goto err_serialize; + if (next + len > end) + goto err_serialize; + memcpy(next, peek->id, len); + next += len; + } + peek = peek->next; } @@ -1813,6 +1852,10 @@ char *xmpp_conn_send_queue_drop_element(xmpp_conn_t *conn, if (!t) return NULL; + if (conn->sm_ack_callback && t->id) { + conn->sm_ack_callback(conn, conn->sm_ack_callback_ctx, t->id); + } + /* In case there exists a SM stanza that is linked to the * one we're currently dropping, also delete that one. */ @@ -2088,6 +2131,10 @@ static void _conn_sm_handle_stanza(xmpp_conn_t *const conn, e = pop_queue_front(&conn->sm_state->sm_queue); strophe_debug_verbose(2, conn->ctx, "conn", "SM_Q_DROP: %p, h=%lu", e, e->sm_h); + if (conn->sm_ack_callback && e->id) { + conn->sm_ack_callback(conn, conn->sm_ack_callback_ctx, + e->id); + } c = queue_element_free(conn->ctx, e); strophe_free(conn->ctx, c); } @@ -2115,6 +2162,7 @@ char *queue_element_free(xmpp_ctx_t *ctx, xmpp_send_queue_t *e) { char *ret = e->data; strophe_debug_verbose(2, ctx, "conn", "Q_FREE: %p", e); + strophe_free(ctx, e->id); memset(e, 0, sizeof(*e)); strophe_free(ctx, e); strophe_debug_verbose(3, ctx, "conn", "Q_CONTENT: %s", ret); @@ -2231,7 +2279,7 @@ void send_raw(xmpp_conn_t *conn, return; } - _send_raw(conn, d, len, owner, userdata); + _send_raw(conn, d, len, NULL, owner, userdata); } static void _send_valist(xmpp_conn_t *conn, @@ -2266,7 +2314,7 @@ static void _send_valist(xmpp_conn_t *conn, va_end(apdup); /* len - 1 so we don't send trailing \0 */ - _send_raw(conn, bigbuf, len - 1, owner, NULL); + _send_raw(conn, bigbuf, len - 1, NULL, owner, NULL); } else { /* go through send_raw() which does the strdup() for us */ send_raw(conn, buf, len, owner, NULL); @@ -2300,7 +2348,8 @@ void send_stanza(xmpp_conn_t *conn, goto out; } - _send_raw(conn, buf, len, owner, NULL); + _send_raw(conn, buf, len, xmpp_stanza_get_attribute(stanza, "id"), owner, + NULL); out: xmpp_stanza_release(stanza); } @@ -2342,6 +2391,7 @@ xmpp_send_queue_t *pop_queue_front(xmpp_queue_t *queue) static int _send_raw(xmpp_conn_t *conn, char *data, size_t len, + const char *id, xmpp_send_queue_owner_t owner, void *userdata) { @@ -2358,6 +2408,7 @@ static int _send_raw(xmpp_conn_t *conn, item->data = data; item->len = len; + item->id = id ? strophe_strdup(conn->ctx, id) : NULL; item->next = NULL; item->prev = conn->send_queue_tail; item->written = 0; diff --git a/strophe.h b/strophe.h index 18ca3e19..b34697f6 100644 --- a/strophe.h +++ b/strophe.h @@ -448,6 +448,16 @@ int xmpp_conn_restore_sm_state(xmpp_conn_t *conn, const unsigned char *sm_state, size_t sm_state_len); +typedef void (*xmpp_sm_ack_callback)(xmpp_conn_t *conn, + void *ctx, + const char *id); +void xmpp_conn_set_sm_ack_callback(xmpp_conn_t *conn, + xmpp_sm_ack_callback cb, + void *ctx); +void xmpp_conn_set_sm_fail_callback(xmpp_conn_t *conn, + xmpp_sm_ack_callback cb, + void *ctx); + void xmpp_free_sm_state(xmpp_sm_state_t *sm_state); int xmpp_connect_client(xmpp_conn_t *conn, From f60571d8dc6328f801120e4a8411bcbd97f4781f Mon Sep 17 00:00:00 2001 From: Stephen Paul Weber Date: Tue, 29 Apr 2025 18:55:54 -0500 Subject: [PATCH 2/2] Also send ack/fail callbacks on resume/resume fail --- src/auth.c | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/auth.c b/src/auth.c index a1bc3b5f..344711bd 100644 --- a/src/auth.c +++ b/src/auth.c @@ -1359,13 +1359,18 @@ static int _get_h_attribute(xmpp_stanza_t *stanza, unsigned long *ul_h) return 0; } -static void _sm_queue_cleanup(xmpp_conn_t *conn, unsigned long ul_h) +static void _sm_queue_cleanup(xmpp_conn_t *conn, unsigned long ul_h, int failed) { xmpp_send_queue_t *e; while ((e = peek_queue_front(&conn->sm_state->sm_queue))) { if (e->sm_h >= ul_h) break; e = pop_queue_front(&conn->sm_state->sm_queue); + if (failed && conn->sm_fail_callback && e->id) { + conn->sm_fail_callback(conn, conn->sm_fail_callback_ctx, e->id); + } else if (conn->sm_ack_callback && e->id) { + conn->sm_ack_callback(conn, conn->sm_ack_callback_ctx, e->id); + } strophe_free(conn->ctx, queue_element_free(conn->ctx, e)); } } @@ -1441,7 +1446,7 @@ static int _handle_sm(xmpp_conn_t *const conn, conn->sm_state->sm_sent_nr = conn->sm_state->sm_queue.head->sm_h; else conn->sm_state->sm_sent_nr = ul_h; - _sm_queue_cleanup(conn, ul_h); + _sm_queue_cleanup(conn, ul_h, 0); _sm_queue_resend(conn); strophe_debug(conn->ctx, "xmpp", "Session resumed successfully."); _stream_negotiation_success(conn); @@ -1468,7 +1473,7 @@ static int _handle_sm(xmpp_conn_t *const conn, /* In cases there's no `h` included, drop all elements. */ ul_h = (unsigned long)-1; } - _sm_queue_cleanup(conn, ul_h); + _sm_queue_cleanup(conn, ul_h, 1); } } else if (!strcmp(cause, "feature-not-implemented")) { conn->sm_state->resume = 0;