Skip to content

Commit 6d12f47

Browse files
authored
Fixes #1753: Expire Listener-side connection on certificate rotation (#1763)
* Fixes #1753: Expire Listener-side connection on certificate rotation When an AMQP listener's sslProfile oldestValidOrdinal attribute is updated scan all connections that are children of that listener and check if the connection's tlsOrdinal value is less than the new oldestValidOrdinal value. If it is then close the connection. Patch includes additional efforts to make the code fluffy: 1) Move qd_listener_t-specific code out of connection_manager.c and move it to the qd_listener.c file. 2) Modify the qd_connection_t deferred callback API to allow an extra context parameter to be passed to the callback. Hide the implementation details by moving the data structure from a header file into qd_connection.c. 3) Remove dead code. * fixup: verify tlsOrdinal of new inter-router connections
1 parent 1939b0e commit 6d12f47

File tree

10 files changed

+616
-208
lines changed

10 files changed

+616
-208
lines changed

include/qpid/dispatch/server.h

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -43,19 +43,6 @@ typedef struct sys_mutex_t sys_mutex_t;
4343
* @{
4444
*/
4545

46-
/**
47-
* Deferred callback
48-
*
49-
* This type is for calls that are deferred until they can be invoked on
50-
* a specific connection's thread.
51-
*
52-
* @param context An opaque context to be passed back with the call.
53-
* @param discard If true, the call should be discarded because the connection it
54-
* was pending on was deleted.
55-
*/
56-
typedef void (*qd_deferred_t)(void *context, bool discard);
57-
58-
5946
/**
6047
* Run the server threads until completion - The blocking version.
6148
*

src/adaptors/amqp/amqp_adaptor.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ static char *edge_role = "edge";
5757
static char *inter_edge_role = "inter-edge";
5858

5959

60-
static void deferred_AMQP_rx_handler(void *context, bool discard);
60+
static void deferred_AMQP_rx_handler(qd_connection_t *qd_conn, void *context, bool discard);
6161
static bool parse_failover_property_list(qd_router_t *router, qd_connection_t *conn, pn_data_t *props);
6262
static void clear_producer_activation(qdr_core_t *core, qdr_delivery_t *delivery);
6363
static void clear_consumer_activation(qdr_core_t *core, qdr_delivery_t *delivery);
@@ -980,7 +980,7 @@ bool AMQP_rx_handler(qd_router_t *router, qd_link_t *link)
980980
/**
981981
* Deferred callback for inbound delivery handler
982982
*/
983-
static void deferred_AMQP_rx_handler(void *context, bool discard)
983+
static void deferred_AMQP_rx_handler(qd_connection_t *qd_conn, void *context, bool discard)
984984
{
985985
qd_link_t_sp *safe_qdl = (qd_link_t_sp*) context;
986986

src/adaptors/amqp/connection_manager.c

Lines changed: 9 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
#include "server_config.h"
2626
#include "dispatch_private.h"
2727
#include "entity.h"
28-
#include "server_private.h"
2928

3029
#include "qpid/dispatch/ctools.h"
3130
#include "qpid/dispatch/failoverlist.h"
@@ -42,7 +41,6 @@
4241

4342

4443
struct qd_connection_manager_t {
45-
qd_server_t *server;
4644
qd_listener_list_t listeners;
4745
qd_connector_config_list_t connector_configs;
4846
};
@@ -60,58 +58,11 @@ static void log_config(qd_server_config_t *c, const char *what, bool create)
6058
QD_EXPORT qd_listener_t *qd_dispatch_configure_listener(qd_dispatch_t *qd, qd_entity_t *entity)
6159
{
6260
qd_connection_manager_t *cm = qd->connection_manager;
63-
qd_listener_t *li = qd_listener(qd->server);
64-
if (!li || qd_server_config_load(&li->config, entity, true) != QD_ERROR_NONE) {
65-
qd_log(LOG_CONN_MGR, QD_LOG_ERROR, "Unable to create listener: %s", qd_error_message());
66-
qd_listener_decref(li);
61+
qd_listener_t *li = qd_listener_create(qd, entity);
62+
if (!li) {
6763
return 0;
6864
}
6965

70-
if (li->config.ssl_profile_name) {
71-
li->tls_config = qd_tls_config(li->config.ssl_profile_name,
72-
QD_TLS_TYPE_PROTON_AMQP,
73-
QD_TLS_CONFIG_SERVER_MODE,
74-
li->config.verify_host_name,
75-
li->config.ssl_require_peer_authentication);
76-
if (!li->tls_config) {
77-
// qd_tls_config() sets qd_error_message():
78-
qd_log(LOG_CONN_MGR, QD_LOG_ERROR, "Failed to configure TLS for Listener %s: %s",
79-
li->config.name, qd_error_message());
80-
qd_listener_decref(li);
81-
return 0;
82-
}
83-
li->tls_ordinal = qd_tls_config_get_ordinal(li->tls_config);
84-
li->tls_oldest_valid_ordinal = qd_tls_config_get_oldest_valid_ordinal(li->tls_config);
85-
}
86-
87-
char *fol = qd_entity_opt_string(entity, "failoverUrls", 0);
88-
if (fol) {
89-
li->config.failover_list = qd_failover_list(fol);
90-
free(fol);
91-
if (li->config.failover_list == 0) {
92-
qd_log(LOG_CONN_MGR, QD_LOG_ERROR, "Unable to create listener, bad failover list: %s",
93-
qd_error_message());
94-
qd_listener_decref(li);
95-
return 0;
96-
}
97-
} else {
98-
li->config.failover_list = 0;
99-
}
100-
101-
//
102-
// Set up the vanflow record for this listener (ROUTER_ACCESS).
103-
// Do this only for router-to-router links: not mgmt/metrics/healthz/websockets listeners
104-
//
105-
if (strcmp(li->config.role, "inter-router") == 0 ||
106-
strcmp(li->config.role, "edge") == 0 ||
107-
strcmp(li->config.role, "inter-edge") == 0) {
108-
li->vflow_record = vflow_start_record(VFLOW_RECORD_ROUTER_ACCESS, 0);
109-
vflow_set_string(li->vflow_record, VFLOW_ATTRIBUTE_NAME, li->config.name);
110-
vflow_set_string(li->vflow_record, VFLOW_ATTRIBUTE_ROLE, li->config.role);
111-
vflow_set_uint64(li->vflow_record, VFLOW_ATTRIBUTE_LINK_COUNT, 0);
112-
}
113-
114-
DEQ_ITEM_INIT(li);
11566
DEQ_INSERT_TAIL(cm->listeners, li);
11667
log_config(&li->config, "Listener", true);
11768
return li;
@@ -299,7 +250,6 @@ qd_connection_manager_t *qd_connection_manager(qd_dispatch_t *qd)
299250
if (!cm)
300251
return 0;
301252

302-
cm->server = qd->server;
303253
DEQ_INIT(cm->listeners);
304254
DEQ_INIT(cm->connector_configs);
305255

@@ -315,17 +265,7 @@ void qd_connection_manager_free(qd_connection_manager_t *cm)
315265
qd_listener_t *li = DEQ_HEAD(cm->listeners);
316266
while (li) {
317267
DEQ_REMOVE_HEAD(cm->listeners);
318-
if (li->pn_listener) {
319-
// DISPATCH-1508: force cleanup of pn_listener context. This is
320-
// usually done in the PN_LISTENER_CLOSE event handler in server.c,
321-
// but since the router is going down those events will no longer
322-
// be generated.
323-
pn_listener_set_context(li->pn_listener, 0);
324-
pn_listener_close(li->pn_listener);
325-
li->pn_listener = 0;
326-
qd_listener_decref(li); // for the pn_listener's context
327-
}
328-
qd_listener_decref(li);
268+
qd_listener_delete(li, true); // true == router is shutting down
329269
li = DEQ_HEAD(cm->listeners);
330270
}
331271

@@ -351,6 +291,10 @@ QD_EXPORT void qd_connection_manager_start(qd_dispatch_t *qd)
351291

352292
while (li) {
353293
if (!li->pn_listener) {
294+
// DISPATCH-55: failure to bind on router initialization can result in a router that cannot be accessed by
295+
// management, preventing diagnosing/fixing the issue. Treat listener failure on initial bring up as a
296+
// critical issue. Failure of listeners added after the router has been successfully started will simply
297+
// result in a logged error.
354298
if (!qd_listener_listen(li) && first_start) {
355299
qd_log(LOG_CONN_MGR, QD_LOG_CRITICAL, "Listen on %s failed during initial config",
356300
li->config.host_port);
@@ -375,17 +319,9 @@ QD_EXPORT void qd_connection_manager_delete_listener(qd_dispatch_t *qd, void *im
375319
{
376320
qd_listener_t *li = (qd_listener_t*) impl;
377321
if (li) {
378-
if (li->pn_listener) {
379-
pn_listener_close(li->pn_listener);
380-
}
381-
else if (li->http) {
382-
qd_lws_listener_close(li->http);
383-
}
384-
385-
log_config(&li->config, "Listener", false);
386-
387322
DEQ_REMOVE(qd->connection_manager->listeners, li);
388-
qd_listener_decref(li);
323+
log_config(&li->config, "Listener", false);
324+
qd_listener_delete(li, false); // false == do a clean listener close
389325
}
390326
}
391327

src/adaptors/amqp/qd_connection.c

Lines changed: 44 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,18 @@
3838
#include <inttypes.h>
3939

4040

41-
ALLOC_DEFINE(qd_deferred_call_t);
4241
ALLOC_DEFINE_SAFE(qd_connection_t);
4342

43+
/**
44+
* Context for the deferred callback
45+
*/
46+
struct qd_deferred_call_t {
47+
DEQ_LINKS(struct qd_deferred_call_t);
48+
qd_deferred_cb_t call;
49+
void *context;
50+
};
51+
ALLOC_DEFINE(qd_deferred_call_t);
52+
4453
const char *MECH_EXTERNAL = "EXTERNAL";
4554

4655

@@ -474,24 +483,16 @@ const qd_server_config_t *qd_connection_config(const qd_connection_t *conn)
474483
}
475484

476485

477-
void qd_connection_invoke_deferred(qd_connection_t *conn, qd_deferred_t call, void *context)
486+
void qd_connection_invoke_deferred(qd_connection_t *conn, qd_deferred_cb_t call, void *context)
478487
{
479-
if (!conn)
480-
return;
481-
482-
qd_connection_invoke_deferred_impl(conn, call, context, new_qd_deferred_call_t());
488+
assert(conn);
489+
qd_connection_invoke_deferred_impl(conn, qd_connection_new_qd_deferred_call_t(call, context));
483490
}
484491

485492

486-
void qd_connection_invoke_deferred_impl(qd_connection_t *conn, qd_deferred_t call, void *context, void *dct)
493+
void qd_connection_invoke_deferred_impl(qd_connection_t *conn, qd_deferred_call_t *dc)
487494
{
488-
if (!conn)
489-
return;
490-
491-
qd_deferred_call_t *dc = (qd_deferred_call_t*)dct;
492-
DEQ_ITEM_INIT(dc);
493-
dc->call = call;
494-
dc->context = context;
495+
assert(!!conn && !!dc);
495496

496497
sys_mutex_lock(&conn->deferred_call_lock);
497498
DEQ_INSERT_TAIL(conn->deferred_calls, dc);
@@ -502,37 +503,46 @@ void qd_connection_invoke_deferred_impl(qd_connection_t *conn, qd_deferred_t cal
502503
sys_mutex_unlock(qd_server_get_activation_lock(conn->server));
503504
}
504505

505-
void *qd_connection_new_qd_deferred_call_t(void)
506+
507+
qd_deferred_call_t *qd_connection_new_qd_deferred_call_t(qd_deferred_cb_t callback, void *context)
506508
{
507-
return new_qd_deferred_call_t();
509+
qd_deferred_call_t *dc = new_qd_deferred_call_t();
510+
DEQ_ITEM_INIT(dc);
511+
dc->call = callback;
512+
dc->context = context;
513+
return dc;
508514
}
509515

510516

511-
void qd_connection_free_qd_deferred_call_t(void *dct)
517+
void qd_connection_free_qd_deferred_call_t(qd_deferred_call_t *dc)
512518
{
513-
free_qd_deferred_call_t((qd_deferred_call_t *)dct);
519+
free_qd_deferred_call_t(dc);
514520
}
515521

522+
516523
void qd_connection_invoke_deferred_calls(qd_connection_t *conn, bool discard)
517524
{
518-
if (!conn)
519-
return;
525+
assert(conn);
520526

521-
// Lock access to deferred_calls, other threads may concurrently add to it. Invoke
527+
// Snapshot the deferred_calls list under lock since other threads may concurrently add to it. Invoke
522528
// the calls outside of the critical section.
523529
//
530+
qd_deferred_call_list_t deferred_calls = DEQ_EMPTY;
524531
sys_mutex_lock(&conn->deferred_call_lock);
525-
qd_deferred_call_t *dc;
526-
while ((dc = DEQ_HEAD(conn->deferred_calls))) {
527-
DEQ_REMOVE_HEAD(conn->deferred_calls);
528-
sys_mutex_unlock(&conn->deferred_call_lock);
529-
dc->call(dc->context, discard);
530-
free_qd_deferred_call_t(dc);
531-
sys_mutex_lock(&conn->deferred_call_lock);
532-
}
532+
DEQ_MOVE(conn->deferred_calls, deferred_calls);
533533
sys_mutex_unlock(&conn->deferred_call_lock);
534-
}
535534

535+
qd_deferred_call_t *dc = DEQ_HEAD(deferred_calls);
536+
while (dc) {
537+
// Note: this destroys the list as it is traversed. This is an optimization: since everything is freed do not
538+
// bother with the overhead of dequeing
539+
qd_deferred_call_t *tmp = dc;
540+
dc = DEQ_NEXT(dc);
541+
542+
tmp->call(conn, tmp->context, discard);
543+
free_qd_deferred_call_t(tmp);
544+
}
545+
}
536546

537547

538548
const char* qd_connection_name(const qd_connection_t *c)
@@ -721,15 +731,14 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event, pn_
721731
void qd_conn_event_batch_complete(qd_container_t *container, qd_connection_t *qd_conn, bool conn_closed);
722732

723733

724-
static void timeout_on_handshake(void *context, bool discard)
734+
static void timeout_on_handshake(qd_connection_t *qd_conn, void *context, bool discard)
725735
{
726736
if (discard)
727737
return;
728738

729-
qd_connection_t *ctx = (qd_connection_t*) context;
730-
pn_transport_t *tport = pn_connection_transport(ctx->pn_conn);
731-
pn_transport_close_head(tport);
732-
connect_fail(ctx, QD_AMQP_COND_NOT_ALLOWED, "Timeout waiting for initial handshake");
739+
pn_transport_t *tport = pn_connection_transport(qd_conn->pn_conn);
740+
pn_transport_close_head(tport); // force close to avoid doing handshake (may hang otherwise)
741+
connect_fail(qd_conn, QD_AMQP_COND_NOT_ALLOWED, "Timeout waiting for initial handshake");
733742
}
734743

735744

src/adaptors/amqp/qd_connection.h

Lines changed: 27 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -47,19 +47,9 @@ typedef struct qd_session_t qd_session_t;
4747
typedef struct qd_timer_t qd_timer_t;
4848
typedef struct qd_tls_session_t qd_tls_session_t;
4949

50-
51-
// defer a function call to be invoked on the connections proactor thread at
52-
// a later time
53-
//
54-
typedef struct qd_deferred_call_t {
55-
DEQ_LINKS(struct qd_deferred_call_t);
56-
qd_deferred_t call;
57-
void *context;
58-
} qd_deferred_call_t;
59-
50+
typedef struct qd_deferred_call_t qd_deferred_call_t;
6051
DEQ_DECLARE(qd_deferred_call_t, qd_deferred_call_list_t);
6152

62-
6353
typedef struct qd_pn_free_link_t qd_pn_free_link_t;
6454
DEQ_DECLARE(qd_pn_free_link_t, qd_pn_free_link_list_t);
6555

@@ -146,39 +136,47 @@ const char* qd_connection_name(const qd_connection_t *c);
146136
* Schedule a call to be invoked on a thread that has ownership of this connection.
147137
* It will be safe for the callback to perform operations related to this connection.
148138
*
149-
* @param conn Connection object
150-
* @param call The function to be invoked on the connection's thread
139+
* @param qd_conn Connection object. Note well: the caller must ensure qd_conn remains valid until after
140+
* this call returns.
141+
* @param call The function to be invoked asynchronously on the connection's thread.
151142
* @param context The context to be passed back in the callback
152143
*/
153-
void qd_connection_invoke_deferred(qd_connection_t *conn, qd_deferred_t call, void *context);
154-
155-
void qd_connection_invoke_deferred_calls(qd_connection_t *conn, bool discard);
144+
typedef void (*qd_deferred_cb_t)(qd_connection_t *qd_conn, void *context, bool discard);
145+
void qd_connection_invoke_deferred(qd_connection_t *conn, qd_deferred_cb_t call, void *context);
156146

157147
/**
158-
* Schedule a call to be invoked on a thread that has ownership of this connection
159-
* when it will be safe for the callback to perform operations related to this connection.
160-
* A qd_deferred_call_t object has been allocated before hand to avoid taking
161-
* the ENTITY_CACHE lock.
148+
* Alternative qd_connection_invoke_deferred() API
162149
*
163-
* @param conn Connection object
164-
* @param call The function to be invoked on the connection's thread
165-
* @param context The context to be passed back in the callback
166-
* @param dct Pointer to preallocated qd_deferred_call_t object
150+
* Allows the caller to pre-allocate the deferred callback context. This allows the context to be allocated outside of a
151+
* critical section where the qd_connection_t has been locked.
152+
*
153+
* @param conn Connection object. Note well: the caller must ensure qd_conn remains valid until after
154+
* this call returns.
155+
* @param call_context The qd_deferred_call_t allocated via qd_connection_new_qd_deferred_call_t(). Ownership is passed
156+
* to the call - do not call qd_connection_free_qd_deferred_call_t()
167157
*/
168-
void qd_connection_invoke_deferred_impl(qd_connection_t *conn, qd_deferred_t call, void *context, void *dct);
169-
158+
void qd_connection_invoke_deferred_impl(qd_connection_t *conn, qd_deferred_call_t *call_context);
170159

171160
/**
172161
* Allocate a qd_deferred_call_t object
173162
*/
174-
void *qd_connection_new_qd_deferred_call_t(void);
163+
qd_deferred_call_t *qd_connection_new_qd_deferred_call_t(qd_deferred_cb_t callback, void *context);
175164

176165
/**
177-
* Deallocate a qd_deferred_call_t object
166+
* Deallocate a qd_deferred_call_t object. Call this only in the case where the call context was not passed to
167+
* qd_connection_invoke_deferred_impl()
178168
*
179169
* @param dct Pointer to preallocated qd_deferred_call_t object
180170
*/
181-
void qd_connection_free_qd_deferred_call_t(void *dct);
171+
void qd_connection_free_qd_deferred_call_t(qd_deferred_call_t *dct);
172+
173+
174+
/**
175+
* Run all deferred callbacks.
176+
*
177+
* Invoked by the PN_CONNECTION_WAKE event handler
178+
*/
179+
void qd_connection_invoke_deferred_calls(qd_connection_t *conn, bool discard);
182180

183181

184182
/**
@@ -304,5 +302,4 @@ bool qd_connection_strip_annotations_in(const qd_connection_t *c);
304302
* ordinal.
305303
*/
306304
bool qd_connection_get_tls_ordinal(const qd_connection_t *qd_conn, uint64_t *tls_ordinal);
307-
308305
#endif

0 commit comments

Comments
 (0)