Skip to content

Commit 57a5ddc

Browse files
committed
Patch refactor:
Based on what I've learned doing POC work in the router core and PR request feedback. The tls ordinal is leveraged as a group attribute for the core. This will allow the core to identify which connection in the group takes precedence. Also removed extra copies of the correlator string and various code cleanups.
1 parent 8a6f770 commit 57a5ddc

File tree

13 files changed

+131
-109
lines changed

13 files changed

+131
-109
lines changed

include/qpid/dispatch/amqp.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ extern const char * const QD_CONNECTION_PROPERTY_VERSION_KEY;
155155
extern const char * const QD_CONNECTION_PROPERTY_COST_KEY;
156156
extern const char * const QD_CONNECTION_PROPERTY_ROLE_KEY;
157157
extern const char * const QD_CONNECTION_PROPERTY_GROUP_CORRELATOR_KEY;
158+
extern const char * const QD_CONNECTION_PROPERTY_GROUP_ORDINAL_KEY;
158159
extern const char * const QD_CONNECTION_PROPERTY_CONN_ID;
159160
extern const char * const QD_CONNECTION_PROPERTY_FAILOVER_LIST_KEY;
160161
extern const char * const QD_CONNECTION_PROPERTY_FAILOVER_NETHOST_KEY;
@@ -165,7 +166,6 @@ extern const char * const QD_CONNECTION_PROPERTY_ADAPTOR_KEY;
165166
extern const char * const QD_CONNECTION_PROPERTY_TCP_ADAPTOR_VALUE;
166167
extern const char * const QD_CONNECTION_PROPERTY_ANNOTATIONS_VERSION_KEY;
167168
extern const char * const QD_CONNECTION_PROPERTY_ACCESS_ID;
168-
extern const char * const QD_CONNECTION_PROPERTY_TLS_ORDINAL;
169169
/// @}
170170

171171
/** @name Terminus Addresses */

include/qpid/dispatch/protocol_adaptor.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -928,14 +928,13 @@ qdr_connection_info_t *qdr_connection_info(bool is_encrypted,
928928
const char *user,
929929
const char *container,
930930
pn_data_t *connection_properties,
931-
uint64_t tls_ordinal,
932931
int ssl_ssf,
933932
bool ssl,
934933
const char *version,
935934
bool streaming_links,
936935
bool connection_trunking);
937936

938-
void qdr_connection_info_set_group_correlator(qdr_connection_info_t *info, const char *correlator);
937+
void qdr_connection_info_set_group(qdr_connection_info_t *info, const char *correlator, uint64_t ordinal);
939938
void qdr_connection_info_set_tls(qdr_connection_info_t *info, bool enabled, char *version, char *ciphers, int ssf);
940939

941940
void qd_adaptor_listener_init(void);

src/adaptors/amqp/amqp_adaptor.c

Lines changed: 68 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1376,53 +1376,39 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool
13761376
char rversion[128];
13771377
uint64_t connection_id = qd_connection_connection_id(conn);
13781378
pn_connection_t *pn_conn = qd_connection_pn(conn);
1379-
pn_transport_t *tport = 0;
1380-
pn_sasl_t *sasl = 0;
1381-
const char *mech = 0;
1382-
const char *user = 0;
1383-
const char *container = conn->pn_conn ? pn_connection_remote_container(conn->pn_conn) : 0;
1379+
const char *host = 0;
1380+
uint64_t group_ordinal = 0;
1381+
const char *container = conn->pn_conn ? pn_connection_remote_container(conn->pn_conn) : 0;
1382+
char group_correlator[QD_DISCRIMINATOR_SIZE];
1383+
char host_local[255];
1384+
1385+
rversion[0] = 0;
1386+
group_correlator[0] = 0;
1387+
host_local[0] = 0;
13841388

1385-
rversion[0] = 0;
13861389
conn->strip_annotations_in = false;
13871390
conn->strip_annotations_out = false;
1388-
if (conn->pn_conn) {
1389-
tport = pn_connection_transport(conn->pn_conn);
1390-
}
1391-
if (tport) {
1392-
sasl = pn_sasl(tport);
1393-
if(conn->user_id)
1394-
user = conn->user_id;
1395-
else
1396-
user = pn_transport_get_user(tport);
1397-
}
1398-
1399-
if (sasl)
1400-
mech = pn_sasl_get_mech(sasl);
1391+
qd_router_connection_get_config(conn, &role, &cost, &name,
1392+
&conn->strip_annotations_in, &conn->strip_annotations_out, &link_capacity);
14011393

1402-
const char *host = 0;
1403-
char host_local[255];
1404-
const qd_server_config_t *config;
14051394
qd_connector_t *connector = qd_connection_connector(conn);
1406-
14071395
if (connector) {
1408-
config = qd_connector_get_config(connector);
1396+
const qd_server_config_t *config = qd_connector_get_server_config(connector);
14091397
snprintf(host_local, 254, "%s", config->host_port);
14101398
host = &host_local[0];
1411-
}
1412-
else
1413-
host = qd_connection_name(conn);
14141399

1415-
1416-
qd_router_connection_get_config(conn, &role, &cost, &name,
1417-
&conn->strip_annotations_in, &conn->strip_annotations_out, &link_capacity);
1418-
1419-
if (connector && !!connector->ctor_config->data_connection_count) {
1420-
memcpy(conn->group_correlator, connector->ctor_config->group_correlator, QD_DISCRIMINATOR_SIZE);
1400+
// Use the connectors tls_ordinal value as the group ordinal because the connection with the highest tls_ordinal
1401+
// value has the most up-to-date security credentials and should take precedence over connections with a lower
1402+
// ordinal value.
1403+
(void) qd_connector_get_tls_ordinal(connector, &group_ordinal);
1404+
memcpy(group_correlator, connector->ctor_config->group_correlator, QD_DISCRIMINATOR_SIZE);
14211405
if (connector->is_data_connector) {
14221406
// override the configured role to identify this as a data connection
14231407
assert(role == QDR_ROLE_INTER_ROUTER);
14241408
role = QDR_ROLE_INTER_ROUTER_DATA;
14251409
}
1410+
} else {
1411+
host = qd_connection_name(conn);
14261412
}
14271413

14281414
// check offered capabilities for streaming link support and connection trunking support
@@ -1457,10 +1443,13 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool
14571443
const bool is_router = (role == QDR_ROLE_INTER_ROUTER || role == QDR_ROLE_EDGE_CONNECTION);
14581444
pn_data_rewind(props);
14591445
if (pn_data_next(props) && pn_data_type(props) == PN_MAP) {
1460-
const size_t num_items = pn_data_get_map(props);
1461-
int props_found = 0; // once all props found exit loop
1446+
1447+
const size_t num_items = pn_data_get_map(props);
1448+
const int max_props = 8; // total possible props
1449+
int props_found = 0; // once all props found exit loop
1450+
14621451
pn_data_enter(props);
1463-
for (int i = 0; i < num_items / 2 && props_found < 7; ++i) {
1452+
for (int i = 0; i < num_items / 2 && props_found < max_props; ++i) {
14641453
if (!pn_data_next(props)) break;
14651454
if (pn_data_type(props) != PN_SYMBOL) break; // invalid properties map
14661455
pn_bytes_t key = pn_data_get_symbol(props);
@@ -1493,11 +1482,26 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool
14931482
} else if (key.size == strlen(QD_CONNECTION_PROPERTY_GROUP_CORRELATOR_KEY) &&
14941483
strncmp(key.start, QD_CONNECTION_PROPERTY_GROUP_CORRELATOR_KEY, key.size) == 0) {
14951484
props_found += 1;
1485+
assert(!connector); // expect: connector sets correlator, listener consumes it
14961486
if (!pn_data_next(props)) break;
14971487
if (role == QDR_ROLE_INTER_ROUTER || role == QDR_ROLE_INTER_ROUTER_DATA) {
14981488
if (pn_data_type(props) == PN_STRING) {
1489+
// pn_bytes is not null terminated
14991490
pn_bytes_t gc = pn_data_get_string(props);
1500-
strncpy(conn->group_correlator, gc.start, MIN(gc.size, QD_DISCRIMINATOR_SIZE));
1491+
size_t len = MIN(gc.size, QD_DISCRIMINATOR_BYTES);
1492+
memcpy(group_correlator, gc.start, len);
1493+
group_correlator[len] = '\0';
1494+
}
1495+
}
1496+
1497+
} else if (key.size == strlen(QD_CONNECTION_PROPERTY_GROUP_ORDINAL_KEY) &&
1498+
strncmp(key.start, QD_CONNECTION_PROPERTY_GROUP_ORDINAL_KEY, key.size) == 0) {
1499+
props_found += 1;
1500+
assert(!connector); // expect: connector sets ordinal, listener consumes it
1501+
if (!pn_data_next(props)) break;
1502+
if (role == QDR_ROLE_INTER_ROUTER || role == QDR_ROLE_INTER_ROUTER_DATA) {
1503+
if (pn_data_type(props) == PN_ULONG) {
1504+
group_ordinal = pn_data_get_ulong(props);
15011505
}
15021506
}
15031507

@@ -1530,6 +1534,7 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool
15301534

15311535
} else if ((key.size == strlen(QD_CONNECTION_PROPERTY_ACCESS_ID)
15321536
&& strncmp(key.start, QD_CONNECTION_PROPERTY_ACCESS_ID, key.size) == 0)) {
1537+
props_found += 1;
15331538
if (!pn_data_next(props)) break;
15341539
if (!!connector && !!connector->vflow_record && pn_data_type(props) == PN_STRING) {
15351540
vflow_set_ref_from_pn(connector->vflow_record, VFLOW_ATTRIBUTE_PEER, props);
@@ -1539,13 +1544,35 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool
15391544
// skip this key
15401545
if (!pn_data_next(props)) break;
15411546
}
1547+
1548+
// NOTE: if adding more keys update max_props value above!
15421549
}
15431550
}
15441551
}
15451552

1546-
char *proto = 0;
1547-
char *cipher = 0;
1548-
int ssl_ssf = 0;
1553+
// Gather transport-level information
1554+
1555+
pn_transport_t *tport = 0;
1556+
pn_sasl_t *sasl = 0;
1557+
const char *mech = 0;
1558+
const char *user = 0;
1559+
char *proto = 0;
1560+
char *cipher = 0;
1561+
int ssl_ssf = 0;
1562+
1563+
if (conn->pn_conn) {
1564+
tport = pn_connection_transport(conn->pn_conn);
1565+
}
1566+
if (tport) {
1567+
sasl = pn_sasl(tport);
1568+
if(conn->user_id)
1569+
user = conn->user_id;
1570+
else
1571+
user = pn_transport_get_user(tport);
1572+
}
1573+
1574+
if (sasl)
1575+
mech = pn_sasl_get_mech(sasl);
15491576

15501577
if (conn->ssl) {
15511578
proto = qd_tls_session_get_protocol_version(conn->ssl);
@@ -1567,14 +1594,13 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool
15671594
(char*) user,
15681595
container,
15691596
props,
1570-
qd_tls_session_get_profile_ordinal(conn->ssl),
15711597
ssl_ssf,
15721598
!!conn->ssl,
15731599
rversion,
15741600
streaming_links,
15751601
connection_trunking);
15761602

1577-
qdr_connection_info_set_group_correlator(connection_info, conn->group_correlator);
1603+
qdr_connection_info_set_group(connection_info, group_correlator, group_ordinal);
15781604

15791605
qdr_connection_opened(router->router_core,
15801606
amqp_adaptor.adaptor,

src/adaptors/amqp/connection_manager.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ QD_EXPORT qd_listener_t *qd_dispatch_configure_listener(qd_dispatch_t *qd, qd_en
8080
qd_listener_decref(li);
8181
return 0;
8282
}
83-
li->tls_ordinal = qd_tls_config_get_ordinal(li->tls_config);
83+
li->tls_ordinal = qd_tls_config_get_ordinal(li->tls_config);
8484
li->tls_oldest_valid_ordinal = qd_tls_config_get_oldest_valid_ordinal(li->tls_config);
8585
}
8686

src/adaptors/amqp/qd_connection.c

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -131,8 +131,7 @@ static void decorate_connection(qd_connection_t *ctx)
131131

132132
pn_data_put_symbol(pn_connection_properties(conn),
133133
pn_bytes(strlen(QD_CONNECTION_PROPERTY_CONN_ID), QD_CONNECTION_PROPERTY_CONN_ID));
134-
qd_connection_t *qd_conn = pn_connection_get_context(conn);
135-
pn_data_put_int(pn_connection_properties(conn), qd_conn->connection_id);
134+
pn_data_put_int(pn_connection_properties(conn), ctx->connection_id);
136135

137136
if (config && config->inter_router_cost > 1) {
138137
pn_data_put_symbol(pn_connection_properties(conn),
@@ -146,16 +145,22 @@ static void decorate_connection(qd_connection_t *ctx)
146145
pn_data_put_int(pn_connection_properties(conn), QDR_ROLE_INTER_ROUTER_DATA);
147146
}
148147

149-
if (ctx->connector && (ctx->connector->is_data_connector || !!ctx->connector->ctor_config->data_connection_count)) {
148+
// The connector-side assigns the group correlator and ordinal values and passes to the listener side
149+
//
150+
if (ctx->connector && !!ctx->connector->ctor_config->group_correlator[0]) {
150151
uint64_t tls_ordinal;
152+
const qd_connector_config_t *ctor_config = ctx->connector->ctor_config;
151153
pn_data_put_symbol(pn_connection_properties(conn),
152154
pn_bytes(strlen(QD_CONNECTION_PROPERTY_GROUP_CORRELATOR_KEY), QD_CONNECTION_PROPERTY_GROUP_CORRELATOR_KEY));
153155
pn_data_put_string(pn_connection_properties(conn),
154-
pn_bytes(strnlen(ctx->group_correlator, QD_DISCRIMINATOR_SIZE - 1), ctx->group_correlator));
156+
pn_bytes(strnlen(ctor_config->group_correlator, QD_DISCRIMINATOR_SIZE - 1), ctor_config->group_correlator));
155157

156-
if (qd_connection_get_tls_ordinal(qd_conn, &tls_ordinal)) {
158+
// Use the connectors tls_ordinal value as the group ordinal because the connection with the highest tls_ordinal
159+
// value has the most up-to-date security credentials and should take precedence over connections with a lower
160+
// ordinal value.
161+
if (qd_connector_get_tls_ordinal(ctx->connector, &tls_ordinal)) {
157162
pn_data_put_symbol(pn_connection_properties(conn),
158-
pn_bytes(strlen(QD_CONNECTION_PROPERTY_TLS_ORDINAL), QD_CONNECTION_PROPERTY_TLS_ORDINAL));
163+
pn_bytes(strlen(QD_CONNECTION_PROPERTY_GROUP_ORDINAL_KEY), QD_CONNECTION_PROPERTY_GROUP_ORDINAL_KEY));
159164
pn_data_put_ulong(pn_connection_properties(conn), tls_ordinal);
160165
}
161166
}
@@ -852,14 +857,3 @@ void qd_amqp_connection_set_tracing(bool enable_tracing)
852857
sys_mutex_unlock(&amqp_adaptor.lock);
853858
}
854859
}
855-
856-
857-
bool qd_connection_get_tls_ordinal(const qd_connection_t *qd_conn, uint64_t *tls_ordinal)
858-
{
859-
if (qd_conn->ssl) {
860-
*tls_ordinal = qd_tls_session_get_profile_ordinal(qd_conn->ssl);
861-
return true;
862-
}
863-
*tls_ordinal = 0;
864-
return false;
865-
}

src/adaptors/amqp/qd_connection.h

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,6 @@ struct qd_connection_t {
9999
sys_mutex_t deferred_call_lock;
100100
bool policy_counted;
101101
char *role; //The specified role of the connection, e.g. "normal", "inter-router", "route-container" etc.
102-
char group_correlator[QD_DISCRIMINATOR_SIZE];
103102
qd_pn_free_link_list_t free_link_list;
104103
bool strip_annotations_in;
105104
bool strip_annotations_out;
@@ -297,13 +296,4 @@ void qd_connection_transport_tracer(pn_transport_t *transport, const char *messa
297296

298297
bool qd_connection_handle_event(qd_server_t *qd_server, pn_event_t *e, void *context);
299298
bool qd_connection_strip_annotations_in(const qd_connection_t *c);
300-
301-
/**
302-
* Get the value of the TLS ordinal that is in use by this connection.
303-
*
304-
* @return True if the TLS ordinal is configured and tls_ordinal has been set, false if the connection has no TLS
305-
* ordinal.
306-
*/
307-
bool qd_connection_get_tls_ordinal(const qd_connection_t *qd_conn, uint64_t *tls_ordinal);
308-
309299
#endif

src/adaptors/amqp/qd_connector.c

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ static void deferred_close(void *context, bool discard)
136136
}
137137

138138

139-
const qd_server_config_t *qd_connector_get_config(const qd_connector_t *c)
139+
const qd_server_config_t *qd_connector_get_server_config(const qd_connector_t *c)
140140
{
141141
return &c->ctor_config->config;
142142
}
@@ -157,11 +157,11 @@ qd_connector_t *qd_connector_create(qd_connector_config_t *ctor_config, bool is_
157157
connector->reconnect_enabled = true;
158158
connector->is_data_connector = is_data_connector;
159159

160-
connector->ctor_config = ctor_config;
161160
sys_atomic_inc(&ctor_config->ref_count);
162-
163-
connector->conn_index = 1;
164-
connector->state = CTOR_STATE_INIT;
161+
connector->ctor_config = ctor_config;
162+
connector->conn_index = 1;
163+
connector->state = CTOR_STATE_INIT;
164+
connector->tls_ordinal = ctor_config->tls_ordinal;
165165

166166
qd_failover_item_t *item = NEW(qd_failover_item_t);
167167
ZERO(item);
@@ -379,8 +379,6 @@ void qd_connector_add_connection(qd_connector_t *connector, qd_connection_t *ctx
379379
sys_atomic_inc(&connector->ref_count);
380380
ctx->connector = connector;
381381
connector->qd_conn = ctx;
382-
383-
strncpy(ctx->group_correlator, connector->ctor_config->group_correlator, QD_DISCRIMINATOR_SIZE);
384382
}
385383

386384

@@ -502,6 +500,8 @@ qd_connector_config_t *qd_connector_config_create(qd_dispatch_t *qd, qd_entity_t
502500
return 0;
503501
}
504502

503+
const bool is_inter_router = strcmp(ctor_config->config.role, "inter-router") == 0;
504+
505505
//
506506
// If an sslProfile is configured allocate a TLS config to be used by all child connector's connections
507507
//
@@ -515,21 +515,20 @@ qd_connector_config_t *qd_connector_config_create(qd_dispatch_t *qd, qd_entity_t
515515
// qd_tls2_config() has set the qd_error_message(), which is logged below
516516
goto error;
517517
}
518-
ctor_config->tls_ordinal = qd_tls_config_get_ordinal(ctor_config->tls_config);
518+
ctor_config->tls_ordinal = qd_tls_config_get_ordinal(ctor_config->tls_config);
519519
ctor_config->tls_oldest_valid_ordinal = qd_tls_config_get_oldest_valid_ordinal(ctor_config->tls_config);
520-
if (strcmp(ctor_config->config.role, "inter-router") == 0) {
520+
if (is_inter_router) {
521521
qd_tls_config_register_update_callback(ctor_config->tls_config, ctor_config,
522522
handle_connector_ssl_profile_mgmt_update);
523523
}
524524
}
525525

526526
// For inter-router connectors create associated inter-router data connectors if configured
527527

528-
if (strcmp(ctor_config->config.role, "inter-router") == 0) {
528+
if (is_inter_router) {
529+
qd_generate_discriminator(ctor_config->group_correlator);
529530
ctor_config->data_connection_count = qd_dispatch_get_data_connection_count(qd);
530531
if (!!ctor_config->data_connection_count) {
531-
qd_generate_discriminator(ctor_config->group_correlator);
532-
533532
// Add any data connectors to the head of the connectors list first. This allows the
534533
// router control connector to be located at the head of the list.
535534

@@ -621,3 +620,13 @@ void qd_connector_config_connect(qd_connector_config_t *ctor_config)
621620
}
622621
}
623622

623+
624+
bool qd_connector_get_tls_ordinal(const qd_connector_t *ctor, uint64_t *tls_ordinal)
625+
{
626+
if (!!ctor->ctor_config->tls_config) {
627+
*tls_ordinal = ctor->tls_ordinal;
628+
return true;
629+
}
630+
*tls_ordinal = 0;
631+
return false;
632+
}

0 commit comments

Comments
 (0)