Skip to content

Commit e2a3201

Browse files
committed
Patch refactor:
Based on what I've learned doing POC work in the router core and PR request feedback. The tls ordinal is leveraged as a group attribute for the core. This will allow the core to identify which connection in the group takes precedence. Also removed extra copies of the correlator string and various code cleanups.
1 parent 8a6f770 commit e2a3201

File tree

12 files changed

+132
-83
lines changed

12 files changed

+132
-83
lines changed

include/qpid/dispatch/amqp.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ extern const char * const QD_CONNECTION_PROPERTY_VERSION_KEY;
155155
extern const char * const QD_CONNECTION_PROPERTY_COST_KEY;
156156
extern const char * const QD_CONNECTION_PROPERTY_ROLE_KEY;
157157
extern const char * const QD_CONNECTION_PROPERTY_GROUP_CORRELATOR_KEY;
158+
extern const char * const QD_CONNECTION_PROPERTY_GROUP_ORDINAL_KEY;
158159
extern const char * const QD_CONNECTION_PROPERTY_CONN_ID;
159160
extern const char * const QD_CONNECTION_PROPERTY_FAILOVER_LIST_KEY;
160161
extern const char * const QD_CONNECTION_PROPERTY_FAILOVER_NETHOST_KEY;

include/qpid/dispatch/protocol_adaptor.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -935,7 +935,7 @@ qdr_connection_info_t *qdr_connection_info(bool is_encrypted,
935935
bool streaming_links,
936936
bool connection_trunking);
937937

938-
void qdr_connection_info_set_group_correlator(qdr_connection_info_t *info, const char *correlator);
938+
void qdr_connection_info_set_group(qdr_connection_info_t *info, const char *correlator, uint64_t ordinal);
939939
void qdr_connection_info_set_tls(qdr_connection_info_t *info, bool enabled, char *version, char *ciphers, int ssf);
940940

941941
void qd_adaptor_listener_init(void);

src/adaptors/amqp/amqp_adaptor.c

Lines changed: 68 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1376,53 +1376,39 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool
13761376
char rversion[128];
13771377
uint64_t connection_id = qd_connection_connection_id(conn);
13781378
pn_connection_t *pn_conn = qd_connection_pn(conn);
1379-
pn_transport_t *tport = 0;
1380-
pn_sasl_t *sasl = 0;
1381-
const char *mech = 0;
1382-
const char *user = 0;
1383-
const char *container = conn->pn_conn ? pn_connection_remote_container(conn->pn_conn) : 0;
1379+
const char *host = 0;
1380+
uint64_t group_ordinal = 0;
1381+
const char *container = conn->pn_conn ? pn_connection_remote_container(conn->pn_conn) : 0;
1382+
char group_correlator[QD_DISCRIMINATOR_SIZE];
1383+
char host_local[255];
1384+
1385+
rversion[0] = 0;
1386+
group_correlator[0] = 0;
1387+
host_local[0] = 0;
13841388

1385-
rversion[0] = 0;
13861389
conn->strip_annotations_in = false;
13871390
conn->strip_annotations_out = false;
1388-
if (conn->pn_conn) {
1389-
tport = pn_connection_transport(conn->pn_conn);
1390-
}
1391-
if (tport) {
1392-
sasl = pn_sasl(tport);
1393-
if(conn->user_id)
1394-
user = conn->user_id;
1395-
else
1396-
user = pn_transport_get_user(tport);
1397-
}
1398-
1399-
if (sasl)
1400-
mech = pn_sasl_get_mech(sasl);
1391+
qd_router_connection_get_config(conn, &role, &cost, &name,
1392+
&conn->strip_annotations_in, &conn->strip_annotations_out, &link_capacity);
14011393

1402-
const char *host = 0;
1403-
char host_local[255];
1404-
const qd_server_config_t *config;
14051394
qd_connector_t *connector = qd_connection_connector(conn);
1406-
14071395
if (connector) {
1408-
config = qd_connector_get_config(connector);
1396+
const qd_server_config_t *config = qd_connector_get_server_config(connector);
14091397
snprintf(host_local, 254, "%s", config->host_port);
14101398
host = &host_local[0];
1411-
}
1412-
else
1413-
host = qd_connection_name(conn);
14141399

1415-
1416-
qd_router_connection_get_config(conn, &role, &cost, &name,
1417-
&conn->strip_annotations_in, &conn->strip_annotations_out, &link_capacity);
1418-
1419-
if (connector && !!connector->ctor_config->data_connection_count) {
1420-
memcpy(conn->group_correlator, connector->ctor_config->group_correlator, QD_DISCRIMINATOR_SIZE);
1400+
// Use the connectors tls_ordinal value as the group ordinal because the connection with the highest tls_ordinal
1401+
// value has the most up-to-date security credentials and should take precedence over connections with a lower
1402+
// ordinal value.
1403+
(void) qd_connector_get_tls_ordinal(connector, &group_ordinal);
1404+
memcpy(group_correlator, connector->ctor_config->group_correlator, QD_DISCRIMINATOR_SIZE);
14211405
if (connector->is_data_connector) {
14221406
// override the configured role to identify this as a data connection
14231407
assert(role == QDR_ROLE_INTER_ROUTER);
14241408
role = QDR_ROLE_INTER_ROUTER_DATA;
14251409
}
1410+
} else {
1411+
host = qd_connection_name(conn);
14261412
}
14271413

14281414
// check offered capabilities for streaming link support and connection trunking support
@@ -1457,10 +1443,13 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool
14571443
const bool is_router = (role == QDR_ROLE_INTER_ROUTER || role == QDR_ROLE_EDGE_CONNECTION);
14581444
pn_data_rewind(props);
14591445
if (pn_data_next(props) && pn_data_type(props) == PN_MAP) {
1460-
const size_t num_items = pn_data_get_map(props);
1461-
int props_found = 0; // once all props found exit loop
1446+
1447+
const size_t num_items = pn_data_get_map(props);
1448+
const int max_props = 8; // total possible props
1449+
int props_found = 0; // once all props found exit loop
1450+
14621451
pn_data_enter(props);
1463-
for (int i = 0; i < num_items / 2 && props_found < 7; ++i) {
1452+
for (int i = 0; i < num_items / 2 && props_found < max_props; ++i) {
14641453
if (!pn_data_next(props)) break;
14651454
if (pn_data_type(props) != PN_SYMBOL) break; // invalid properties map
14661455
pn_bytes_t key = pn_data_get_symbol(props);
@@ -1493,11 +1482,26 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool
14931482
} else if (key.size == strlen(QD_CONNECTION_PROPERTY_GROUP_CORRELATOR_KEY) &&
14941483
strncmp(key.start, QD_CONNECTION_PROPERTY_GROUP_CORRELATOR_KEY, key.size) == 0) {
14951484
props_found += 1;
1485+
assert(!connector); // expect: connector sets correlator, listener consumes it
14961486
if (!pn_data_next(props)) break;
14971487
if (role == QDR_ROLE_INTER_ROUTER || role == QDR_ROLE_INTER_ROUTER_DATA) {
14981488
if (pn_data_type(props) == PN_STRING) {
1489+
// pn_bytes is not null terminated
14991490
pn_bytes_t gc = pn_data_get_string(props);
1500-
strncpy(conn->group_correlator, gc.start, MIN(gc.size, QD_DISCRIMINATOR_SIZE));
1491+
size_t len = MIN(gc.size, QD_DISCRIMINATOR_BYTES);
1492+
memcpy(group_correlator, gc.start, len);
1493+
group_correlator[len] = '\0';
1494+
}
1495+
}
1496+
1497+
} else if (key.size == strlen(QD_CONNECTION_PROPERTY_GROUP_ORDINAL_KEY) &&
1498+
strncmp(key.start, QD_CONNECTION_PROPERTY_GROUP_ORDINAL_KEY, key.size) == 0) {
1499+
props_found += 1;
1500+
assert(!connector); // expect: connector sets ordinal, listener consumes it
1501+
if (!pn_data_next(props)) break;
1502+
if (role == QDR_ROLE_INTER_ROUTER || role == QDR_ROLE_INTER_ROUTER_DATA) {
1503+
if (pn_data_type(props) == PN_ULONG) {
1504+
group_ordinal = pn_data_get_ulong(props);
15011505
}
15021506
}
15031507

@@ -1530,6 +1534,7 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool
15301534

15311535
} else if ((key.size == strlen(QD_CONNECTION_PROPERTY_ACCESS_ID)
15321536
&& strncmp(key.start, QD_CONNECTION_PROPERTY_ACCESS_ID, key.size) == 0)) {
1537+
props_found += 1;
15331538
if (!pn_data_next(props)) break;
15341539
if (!!connector && !!connector->vflow_record && pn_data_type(props) == PN_STRING) {
15351540
vflow_set_ref_from_pn(connector->vflow_record, VFLOW_ATTRIBUTE_PEER, props);
@@ -1539,13 +1544,35 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool
15391544
// skip this key
15401545
if (!pn_data_next(props)) break;
15411546
}
1547+
1548+
// NOTE: if adding more keys update max_props value above!
15421549
}
15431550
}
15441551
}
15451552

1546-
char *proto = 0;
1547-
char *cipher = 0;
1548-
int ssl_ssf = 0;
1553+
// Gather transport-level information
1554+
1555+
pn_transport_t *tport = 0;
1556+
pn_sasl_t *sasl = 0;
1557+
const char *mech = 0;
1558+
const char *user = 0;
1559+
char *proto = 0;
1560+
char *cipher = 0;
1561+
int ssl_ssf = 0;
1562+
1563+
if (conn->pn_conn) {
1564+
tport = pn_connection_transport(conn->pn_conn);
1565+
}
1566+
if (tport) {
1567+
sasl = pn_sasl(tport);
1568+
if(conn->user_id)
1569+
user = conn->user_id;
1570+
else
1571+
user = pn_transport_get_user(tport);
1572+
}
1573+
1574+
if (sasl)
1575+
mech = pn_sasl_get_mech(sasl);
15491576

15501577
if (conn->ssl) {
15511578
proto = qd_tls_session_get_protocol_version(conn->ssl);
@@ -1574,7 +1601,7 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool
15741601
streaming_links,
15751602
connection_trunking);
15761603

1577-
qdr_connection_info_set_group_correlator(connection_info, conn->group_correlator);
1604+
qdr_connection_info_set_group(connection_info, group_correlator, group_ordinal);
15781605

15791606
qdr_connection_opened(router->router_core,
15801607
amqp_adaptor.adaptor,

src/adaptors/amqp/connection_manager.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ QD_EXPORT qd_listener_t *qd_dispatch_configure_listener(qd_dispatch_t *qd, qd_en
8080
qd_listener_decref(li);
8181
return 0;
8282
}
83-
li->tls_ordinal = qd_tls_config_get_ordinal(li->tls_config);
83+
li->tls_ordinal = qd_tls_config_get_ordinal(li->tls_config);
8484
li->tls_oldest_valid_ordinal = qd_tls_config_get_oldest_valid_ordinal(li->tls_config);
8585
}
8686

src/adaptors/amqp/qd_connection.c

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -131,8 +131,7 @@ static void decorate_connection(qd_connection_t *ctx)
131131

132132
pn_data_put_symbol(pn_connection_properties(conn),
133133
pn_bytes(strlen(QD_CONNECTION_PROPERTY_CONN_ID), QD_CONNECTION_PROPERTY_CONN_ID));
134-
qd_connection_t *qd_conn = pn_connection_get_context(conn);
135-
pn_data_put_int(pn_connection_properties(conn), qd_conn->connection_id);
134+
pn_data_put_int(pn_connection_properties(conn), ctx->connection_id);
136135

137136
if (config && config->inter_router_cost > 1) {
138137
pn_data_put_symbol(pn_connection_properties(conn),
@@ -146,16 +145,22 @@ static void decorate_connection(qd_connection_t *ctx)
146145
pn_data_put_int(pn_connection_properties(conn), QDR_ROLE_INTER_ROUTER_DATA);
147146
}
148147

149-
if (ctx->connector && (ctx->connector->is_data_connector || !!ctx->connector->ctor_config->data_connection_count)) {
148+
// The connector-side assigns the group correlator and ordinal values and passes it to the listener side
149+
//
150+
if (ctx->connector && !!ctx->connector->ctor_config->group_correlator[0]) {
150151
uint64_t tls_ordinal;
152+
const qd_connector_config_t *ctor_config = ctx->connector->ctor_config;
151153
pn_data_put_symbol(pn_connection_properties(conn),
152154
pn_bytes(strlen(QD_CONNECTION_PROPERTY_GROUP_CORRELATOR_KEY), QD_CONNECTION_PROPERTY_GROUP_CORRELATOR_KEY));
153155
pn_data_put_string(pn_connection_properties(conn),
154-
pn_bytes(strnlen(ctx->group_correlator, QD_DISCRIMINATOR_SIZE - 1), ctx->group_correlator));
156+
pn_bytes(strnlen(ctor_config->group_correlator, QD_DISCRIMINATOR_SIZE - 1), ctor_config->group_correlator));
155157

156-
if (qd_connection_get_tls_ordinal(qd_conn, &tls_ordinal)) {
158+
// Use the connectors tls_ordinal value as the group ordinal because the connection with the highest tls_ordinal
159+
// value has the most up-to-date security credentials and should take precedence over connections with a lower
160+
// ordinal value.
161+
if (qd_connector_get_tls_ordinal(ctx->connector, &tls_ordinal)) {
157162
pn_data_put_symbol(pn_connection_properties(conn),
158-
pn_bytes(strlen(QD_CONNECTION_PROPERTY_TLS_ORDINAL), QD_CONNECTION_PROPERTY_TLS_ORDINAL));
163+
pn_bytes(strlen(QD_CONNECTION_PROPERTY_GROUP_ORDINAL_KEY), QD_CONNECTION_PROPERTY_GROUP_ORDINAL_KEY));
159164
pn_data_put_ulong(pn_connection_properties(conn), tls_ordinal);
160165
}
161166
}

src/adaptors/amqp/qd_connection.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,6 @@ struct qd_connection_t {
9999
sys_mutex_t deferred_call_lock;
100100
bool policy_counted;
101101
char *role; //The specified role of the connection, e.g. "normal", "inter-router", "route-container" etc.
102-
char group_correlator[QD_DISCRIMINATOR_SIZE];
103102
qd_pn_free_link_list_t free_link_list;
104103
bool strip_annotations_in;
105104
bool strip_annotations_out;

src/adaptors/amqp/qd_connector.c

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ static void deferred_close(void *context, bool discard)
136136
}
137137

138138

139-
const qd_server_config_t *qd_connector_get_config(const qd_connector_t *c)
139+
const qd_server_config_t *qd_connector_get_server_config(const qd_connector_t *c)
140140
{
141141
return &c->ctor_config->config;
142142
}
@@ -157,11 +157,11 @@ qd_connector_t *qd_connector_create(qd_connector_config_t *ctor_config, bool is_
157157
connector->reconnect_enabled = true;
158158
connector->is_data_connector = is_data_connector;
159159

160-
connector->ctor_config = ctor_config;
161160
sys_atomic_inc(&ctor_config->ref_count);
162-
163-
connector->conn_index = 1;
164-
connector->state = CTOR_STATE_INIT;
161+
connector->ctor_config = ctor_config;
162+
connector->conn_index = 1;
163+
connector->state = CTOR_STATE_INIT;
164+
connector->tls_ordinal = ctor_config->tls_ordinal;
165165

166166
qd_failover_item_t *item = NEW(qd_failover_item_t);
167167
ZERO(item);
@@ -379,8 +379,6 @@ void qd_connector_add_connection(qd_connector_t *connector, qd_connection_t *ctx
379379
sys_atomic_inc(&connector->ref_count);
380380
ctx->connector = connector;
381381
connector->qd_conn = ctx;
382-
383-
strncpy(ctx->group_correlator, connector->ctor_config->group_correlator, QD_DISCRIMINATOR_SIZE);
384382
}
385383

386384

@@ -502,6 +500,8 @@ qd_connector_config_t *qd_connector_config_create(qd_dispatch_t *qd, qd_entity_t
502500
return 0;
503501
}
504502

503+
const bool is_inter_router = strcmp(ctor_config->config.role, "inter-router") == 0;
504+
505505
//
506506
// If an sslProfile is configured allocate a TLS config to be used by all child connector's connections
507507
//
@@ -515,21 +515,20 @@ qd_connector_config_t *qd_connector_config_create(qd_dispatch_t *qd, qd_entity_t
515515
// qd_tls2_config() has set the qd_error_message(), which is logged below
516516
goto error;
517517
}
518-
ctor_config->tls_ordinal = qd_tls_config_get_ordinal(ctor_config->tls_config);
518+
ctor_config->tls_ordinal = qd_tls_config_get_ordinal(ctor_config->tls_config);
519519
ctor_config->tls_oldest_valid_ordinal = qd_tls_config_get_oldest_valid_ordinal(ctor_config->tls_config);
520-
if (strcmp(ctor_config->config.role, "inter-router") == 0) {
520+
if (is_inter_router) {
521521
qd_tls_config_register_update_callback(ctor_config->tls_config, ctor_config,
522522
handle_connector_ssl_profile_mgmt_update);
523523
}
524524
}
525525

526526
// For inter-router connectors create associated inter-router data connectors if configured
527527

528-
if (strcmp(ctor_config->config.role, "inter-router") == 0) {
528+
if (is_inter_router) {
529+
qd_generate_discriminator(ctor_config->group_correlator);
529530
ctor_config->data_connection_count = qd_dispatch_get_data_connection_count(qd);
530531
if (!!ctor_config->data_connection_count) {
531-
qd_generate_discriminator(ctor_config->group_correlator);
532-
533532
// Add any data connectors to the head of the connectors list first. This allows the
534533
// router control connector to be located at the head of the list.
535534

@@ -621,3 +620,13 @@ void qd_connector_config_connect(qd_connector_config_t *ctor_config)
621620
}
622621
}
623622

623+
624+
bool qd_connector_get_tls_ordinal(const qd_connector_t *ctor, uint64_t *tls_ordinal)
625+
{
626+
if (!!ctor->ctor_config->tls_config) {
627+
*tls_ordinal = ctor->tls_ordinal;
628+
return true;
629+
}
630+
*tls_ordinal = 0;
631+
return false;
632+
}

src/adaptors/amqp/qd_connector.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ typedef struct qd_connector_t {
5858
sys_atomic_t ref_count;
5959
qd_timer_t *timer;
6060
long delay;
61+
uint64_t tls_ordinal; // ordinal that was in effect when created
6162

6263
/* Connector state and qd_conn can be modified by I/O or management threads. */
6364
sys_mutex_t lock;
@@ -154,7 +155,7 @@ void qd_connector_close(qd_connector_t *ctor);
154155

155156
void qd_connector_decref(qd_connector_t *ctor);
156157

157-
const qd_server_config_t *qd_connector_get_config(const qd_connector_t *ctor);
158+
const qd_server_config_t *qd_connector_get_server_config(const qd_connector_t *ctor);
158159
const char *qd_connector_get_group_correlator(const qd_connector_t *ctor);
159160
bool qd_connector_has_failover_info(const qd_connector_t* ctor);
160161
const char *qd_connector_policy_vhost(const qd_connector_t* ctor);
@@ -165,6 +166,9 @@ void qd_connector_remote_opened(qd_connector_t *ctor);
165166
void qd_connector_add_connection(qd_connector_t *ctor, qd_connection_t *qd_conn);
166167
void qd_connector_add_link(qd_connector_t *ctor);
167168

169+
// return True if ordinal is used, return false if no ordinal configured
170+
bool qd_connector_get_tls_ordinal(const qd_connector_t *ctor, uint64_t *ordinal);
171+
168172
// remove the child connection
169173
// NOTE WELL: this may free the connector if the connection is holding the last
170174
// reference to it

src/amqp.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ const char * const QD_CONNECTION_PROPERTY_VERSION_KEY = "version";
4747
const char * const QD_CONNECTION_PROPERTY_COST_KEY = "qd.inter-router-cost";
4848
const char * const QD_CONNECTION_PROPERTY_ROLE_KEY = "qd.inter-router-role";
4949
const char * const QD_CONNECTION_PROPERTY_GROUP_CORRELATOR_KEY = "qd.group-correlator";
50+
const char * const QD_CONNECTION_PROPERTY_GROUP_ORDINAL_KEY = "qd.group-ordinal";
5051
const char * const QD_CONNECTION_PROPERTY_CONN_ID = "qd.conn-id";
5152
const char * const QD_CONNECTION_PROPERTY_FAILOVER_LIST_KEY = "failover-server-list";
5253
const char * const QD_CONNECTION_PROPERTY_FAILOVER_NETHOST_KEY = "network-host";
@@ -57,7 +58,6 @@ const char * const QD_CONNECTION_PROPERTY_ADAPTOR_KEY = "qd.adaptor";
5758
const char * const QD_CONNECTION_PROPERTY_TCP_ADAPTOR_VALUE = "tcp";
5859
const char * const QD_CONNECTION_PROPERTY_ANNOTATIONS_VERSION_KEY = "qd.annotations-version";
5960
const char * const QD_CONNECTION_PROPERTY_ACCESS_ID = "qd.access-id";
60-
const char * const QD_CONNECTION_PROPERTY_TLS_ORDINAL = "qd.tls-ordinal";
6161

6262
const char * const QD_TERMINUS_EDGE_ADDRESS_TRACKING = "_$qd.edge_addr_tracking";
6363
const char * const QD_TERMINUS_HEARTBEAT = "_$qd.edge_heartbeat";

src/router_core/connections.c

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -224,18 +224,19 @@ qdr_connection_info_t *qdr_connection_info(bool is_encrypted,
224224
pn_data_copy(qdr_conn_properties, connection_properties);
225225

226226
connection_info->connection_properties = qdr_conn_properties;
227-
connection_info->tls_ssf = tls_ssf;
228-
connection_info->tls = tls;
229-
connection_info->tls_ordinal = tls_ordinal;
230-
connection_info->streaming_links = streaming_links;
231-
connection_info->connection_trunking = connection_trunking;
227+
connection_info->tls_ssf = tls_ssf;
228+
connection_info->tls = tls;
229+
connection_info->tls_ordinal = tls_ordinal;
230+
connection_info->streaming_links = streaming_links;
231+
connection_info->connection_trunking = connection_trunking;
232232
sys_mutex_init(&connection_info->connection_info_lock);
233233
return connection_info;
234234
}
235235

236236

237-
void qdr_connection_info_set_group_correlator(qdr_connection_info_t *info, const char *correlator)
237+
void qdr_connection_info_set_group(qdr_connection_info_t *info, const char *correlator, uint64_t ordinal)
238238
{
239+
info->group_ordinal = ordinal;
239240
memcpy(info->group_correlator, correlator, QD_DISCRIMINATOR_SIZE);
240241
}
241242

0 commit comments

Comments
 (0)