@@ -1376,53 +1376,39 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool
1376
1376
char rversion [128 ];
1377
1377
uint64_t connection_id = qd_connection_connection_id (conn );
1378
1378
pn_connection_t * pn_conn = qd_connection_pn (conn );
1379
- pn_transport_t * tport = 0 ;
1380
- pn_sasl_t * sasl = 0 ;
1381
- const char * mech = 0 ;
1382
- const char * user = 0 ;
1383
- const char * container = conn -> pn_conn ? pn_connection_remote_container (conn -> pn_conn ) : 0 ;
1379
+ const char * host = 0 ;
1380
+ uint64_t group_ordinal = 0 ;
1381
+ const char * container = conn -> pn_conn ? pn_connection_remote_container (conn -> pn_conn ) : 0 ;
1382
+ char group_correlator [QD_DISCRIMINATOR_SIZE ];
1383
+ char host_local [255 ];
1384
+
1385
+ rversion [0 ] = 0 ;
1386
+ group_correlator [0 ] = 0 ;
1387
+ host_local [0 ] = 0 ;
1384
1388
1385
- rversion [0 ] = 0 ;
1386
1389
conn -> strip_annotations_in = false;
1387
1390
conn -> strip_annotations_out = false;
1388
- if (conn -> pn_conn ) {
1389
- tport = pn_connection_transport (conn -> pn_conn );
1390
- }
1391
- if (tport ) {
1392
- sasl = pn_sasl (tport );
1393
- if (conn -> user_id )
1394
- user = conn -> user_id ;
1395
- else
1396
- user = pn_transport_get_user (tport );
1397
- }
1398
-
1399
- if (sasl )
1400
- mech = pn_sasl_get_mech (sasl );
1391
+ qd_router_connection_get_config (conn , & role , & cost , & name ,
1392
+ & conn -> strip_annotations_in , & conn -> strip_annotations_out , & link_capacity );
1401
1393
1402
- const char * host = 0 ;
1403
- char host_local [255 ];
1404
- const qd_server_config_t * config ;
1405
1394
qd_connector_t * connector = qd_connection_connector (conn );
1406
-
1407
1395
if (connector ) {
1408
- config = qd_connector_get_config (connector );
1396
+ const qd_server_config_t * config = qd_connector_get_server_config (connector );
1409
1397
snprintf (host_local , 254 , "%s" , config -> host_port );
1410
1398
host = & host_local [0 ];
1411
- }
1412
- else
1413
- host = qd_connection_name (conn );
1414
1399
1415
-
1416
- qd_router_connection_get_config (conn , & role , & cost , & name ,
1417
- & conn -> strip_annotations_in , & conn -> strip_annotations_out , & link_capacity );
1418
-
1419
- if (connector && !!connector -> ctor_config -> data_connection_count ) {
1420
- memcpy (conn -> group_correlator , connector -> ctor_config -> group_correlator , QD_DISCRIMINATOR_SIZE );
1400
+ // Use the connectors tls_ordinal value as the group ordinal because the connection with the highest tls_ordinal
1401
+ // value has the most up-to-date security credentials and should take precedence over connections with a lower
1402
+ // ordinal value.
1403
+ (void ) qd_connector_get_tls_ordinal (connector , & group_ordinal );
1404
+ memcpy (group_correlator , connector -> ctor_config -> group_correlator , QD_DISCRIMINATOR_SIZE );
1421
1405
if (connector -> is_data_connector ) {
1422
1406
// override the configured role to identify this as a data connection
1423
1407
assert (role == QDR_ROLE_INTER_ROUTER );
1424
1408
role = QDR_ROLE_INTER_ROUTER_DATA ;
1425
1409
}
1410
+ } else {
1411
+ host = qd_connection_name (conn );
1426
1412
}
1427
1413
1428
1414
// check offered capabilities for streaming link support and connection trunking support
@@ -1457,10 +1443,13 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool
1457
1443
const bool is_router = (role == QDR_ROLE_INTER_ROUTER || role == QDR_ROLE_EDGE_CONNECTION );
1458
1444
pn_data_rewind (props );
1459
1445
if (pn_data_next (props ) && pn_data_type (props ) == PN_MAP ) {
1460
- const size_t num_items = pn_data_get_map (props );
1461
- int props_found = 0 ; // once all props found exit loop
1446
+
1447
+ const size_t num_items = pn_data_get_map (props );
1448
+ const int max_props = 8 ; // total possible props
1449
+ int props_found = 0 ; // once all props found exit loop
1450
+
1462
1451
pn_data_enter (props );
1463
- for (int i = 0 ; i < num_items / 2 && props_found < 7 ; ++ i ) {
1452
+ for (int i = 0 ; i < num_items / 2 && props_found < max_props ; ++ i ) {
1464
1453
if (!pn_data_next (props )) break ;
1465
1454
if (pn_data_type (props ) != PN_SYMBOL ) break ; // invalid properties map
1466
1455
pn_bytes_t key = pn_data_get_symbol (props );
@@ -1493,11 +1482,26 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool
1493
1482
} else if (key .size == strlen (QD_CONNECTION_PROPERTY_GROUP_CORRELATOR_KEY ) &&
1494
1483
strncmp (key .start , QD_CONNECTION_PROPERTY_GROUP_CORRELATOR_KEY , key .size ) == 0 ) {
1495
1484
props_found += 1 ;
1485
+ assert (!connector ); // expect: connector sets correlator, listener consumes it
1496
1486
if (!pn_data_next (props )) break ;
1497
1487
if (role == QDR_ROLE_INTER_ROUTER || role == QDR_ROLE_INTER_ROUTER_DATA ) {
1498
1488
if (pn_data_type (props ) == PN_STRING ) {
1489
+ // pn_bytes is not null terminated
1499
1490
pn_bytes_t gc = pn_data_get_string (props );
1500
- strncpy (conn -> group_correlator , gc .start , MIN (gc .size , QD_DISCRIMINATOR_SIZE ));
1491
+ size_t len = MIN (gc .size , QD_DISCRIMINATOR_BYTES );
1492
+ memcpy (group_correlator , gc .start , len );
1493
+ group_correlator [len ] = '\0' ;
1494
+ }
1495
+ }
1496
+
1497
+ } else if (key .size == strlen (QD_CONNECTION_PROPERTY_GROUP_ORDINAL_KEY ) &&
1498
+ strncmp (key .start , QD_CONNECTION_PROPERTY_GROUP_ORDINAL_KEY , key .size ) == 0 ) {
1499
+ props_found += 1 ;
1500
+ assert (!connector ); // expect: connector sets ordinal, listener consumes it
1501
+ if (!pn_data_next (props )) break ;
1502
+ if (role == QDR_ROLE_INTER_ROUTER || role == QDR_ROLE_INTER_ROUTER_DATA ) {
1503
+ if (pn_data_type (props ) == PN_ULONG ) {
1504
+ group_ordinal = pn_data_get_ulong (props );
1501
1505
}
1502
1506
}
1503
1507
@@ -1530,6 +1534,7 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool
1530
1534
1531
1535
} else if ((key .size == strlen (QD_CONNECTION_PROPERTY_ACCESS_ID )
1532
1536
&& strncmp (key .start , QD_CONNECTION_PROPERTY_ACCESS_ID , key .size ) == 0 )) {
1537
+ props_found += 1 ;
1533
1538
if (!pn_data_next (props )) break ;
1534
1539
if (!!connector && !!connector -> vflow_record && pn_data_type (props ) == PN_STRING ) {
1535
1540
vflow_set_ref_from_pn (connector -> vflow_record , VFLOW_ATTRIBUTE_PEER , props );
@@ -1539,13 +1544,35 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool
1539
1544
// skip this key
1540
1545
if (!pn_data_next (props )) break ;
1541
1546
}
1547
+
1548
+ // NOTE: if adding more keys update max_props value above!
1542
1549
}
1543
1550
}
1544
1551
}
1545
1552
1546
- char * proto = 0 ;
1547
- char * cipher = 0 ;
1548
- int ssl_ssf = 0 ;
1553
+ // Gather transport-level information
1554
+
1555
+ pn_transport_t * tport = 0 ;
1556
+ pn_sasl_t * sasl = 0 ;
1557
+ const char * mech = 0 ;
1558
+ const char * user = 0 ;
1559
+ char * proto = 0 ;
1560
+ char * cipher = 0 ;
1561
+ int ssl_ssf = 0 ;
1562
+
1563
+ if (conn -> pn_conn ) {
1564
+ tport = pn_connection_transport (conn -> pn_conn );
1565
+ }
1566
+ if (tport ) {
1567
+ sasl = pn_sasl (tport );
1568
+ if (conn -> user_id )
1569
+ user = conn -> user_id ;
1570
+ else
1571
+ user = pn_transport_get_user (tport );
1572
+ }
1573
+
1574
+ if (sasl )
1575
+ mech = pn_sasl_get_mech (sasl );
1549
1576
1550
1577
if (conn -> ssl ) {
1551
1578
proto = qd_tls_session_get_protocol_version (conn -> ssl );
@@ -1567,13 +1594,14 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool
1567
1594
(char * ) user ,
1568
1595
container ,
1569
1596
props ,
1597
+ qd_tls_session_get_ssl_profile_ordinal (conn -> ssl ),
1570
1598
ssl_ssf ,
1571
1599
!!conn -> ssl ,
1572
1600
rversion ,
1573
1601
streaming_links ,
1574
1602
connection_trunking );
1575
1603
1576
- qdr_connection_info_set_group_correlator (connection_info , conn -> group_correlator );
1604
+ qdr_connection_info_set_group (connection_info , group_correlator , group_ordinal );
1577
1605
1578
1606
qdr_connection_opened (router -> router_core ,
1579
1607
amqp_adaptor .adaptor ,
0 commit comments