Skip to content

Commit 9c01acd

Browse files
authored
Fixes #1793: Move the LINK vflow record from the connector object to connector_config object. This ensures (#1794)
that only one LINK record exists per logical link Signed-off-by: Ganesh Murthy <[email protected]>
1 parent e00826a commit 9c01acd

File tree

5 files changed

+131
-49
lines changed

5 files changed

+131
-49
lines changed

src/adaptors/amqp/amqp_adaptor.c

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -569,8 +569,8 @@ bool AMQP_rx_handler(qd_router_t *router, qd_link_t *link)
569569
//
570570
// Bump LINK metrics if appropriate
571571
//
572-
if (!!conn->connector && !!conn->connector->vflow_record) {
573-
vflow_inc_counter(conn->connector->vflow_record, VFLOW_ATTRIBUTE_OCTETS_REVERSE, (uint64_t) octets_received);
572+
if (!!conn->connector && !!conn->connector->ctor_config && !!conn->connector->ctor_config->vflow_record) {
573+
vflow_inc_counter(conn->connector->ctor_config->vflow_record, VFLOW_ATTRIBUTE_OCTETS_REVERSE, (uint64_t) octets_received);
574574
}
575575

576576
// check if cut-through can be enabled or disabled
@@ -1535,8 +1535,8 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool
15351535
&& strncmp(key.start, QD_CONNECTION_PROPERTY_ACCESS_ID, key.size) == 0)) {
15361536
props_found += 1;
15371537
if (!pn_data_next(props)) break;
1538-
if (!!connector && !!connector->vflow_record && pn_data_type(props) == PN_STRING) {
1539-
vflow_set_ref_from_pn(connector->vflow_record, VFLOW_ATTRIBUTE_PEER, props);
1538+
if (!!connector && !!connector->ctor_config && !!connector->ctor_config->vflow_record && pn_data_type(props) == PN_STRING) {
1539+
vflow_set_ref_from_pn(connector->ctor_config->vflow_record, VFLOW_ATTRIBUTE_PEER, props);
15401540
}
15411541

15421542
} else {
@@ -2143,8 +2143,8 @@ static uint64_t CORE_link_deliver(void *context, qdr_link_t *link, qdr_delivery_
21432143
//
21442144
// Bump LINK metrics if appropriate
21452145
//
2146-
if (!!qconn->connector && !!qconn->connector->vflow_record) {
2147-
vflow_inc_counter(qconn->connector->vflow_record, VFLOW_ATTRIBUTE_OCTETS, (uint64_t) octets_sent);
2146+
if (!!qconn->connector && !!qconn->connector->ctor_config && !!qconn->connector->ctor_config->vflow_record) {
2147+
vflow_inc_counter(qconn->connector->ctor_config->vflow_record, VFLOW_ATTRIBUTE_OCTETS, (uint64_t) octets_sent);
21482148
}
21492149

21502150
//

src/adaptors/amqp/qd_connector.c

Lines changed: 61 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,6 @@ static void qd_connector_config_cleanup_conns(void *context)
108108
static qd_error_t qd_connector_config_create_connectors(qd_connector_config_t *ctor_config)
109109
{
110110
ASSERT_MGMT_THREAD; // only the mgmt thread can modify the connector list!
111-
112111
qd_error_clear();
113112

114113
// The connector configuration for inter-router connections may define a data connection count. This is the number
@@ -282,25 +281,6 @@ qd_connector_t *qd_connector_create(qd_connector_config_t *ctor_config, bool is_
282281
snprintf(item->host_port, hplen, "%s:%s", item->host , item->port);
283282
DEQ_INSERT_TAIL(connector->conn_info_list, item);
284283

285-
//
286-
// Set up the vanflow record for this connector (LINK)
287-
// Do this only for router-to-router connectors since the record represents an inter-router link
288-
//
289-
if ((strcmp(ctor_config->config.role, "inter-router") == 0 && !is_data_connector) ||
290-
strcmp(ctor_config->config.role, "edge") == 0 ||
291-
strcmp(ctor_config->config.role, "inter-edge") == 0) {
292-
connector->vflow_record = vflow_start_record(VFLOW_RECORD_LINK, 0);
293-
vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_NAME, ctor_config->config.name);
294-
vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_ROLE, ctor_config->config.role);
295-
vflow_set_uint64(connector->vflow_record, VFLOW_ATTRIBUTE_LINK_COST, ctor_config->config.inter_router_cost);
296-
vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_OPER_STATUS, "down");
297-
vflow_set_uint64(connector->vflow_record, VFLOW_ATTRIBUTE_DOWN_COUNT, 0);
298-
vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_PROTOCOL, item->scheme);
299-
vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_DESTINATION_HOST, item->host);
300-
vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_DESTINATION_PORT, item->port);
301-
vflow_set_uint64(connector->vflow_record, VFLOW_ATTRIBUTE_OCTETS, 0);
302-
vflow_set_uint64(connector->vflow_record, VFLOW_ATTRIBUTE_OCTETS_REVERSE, 0);
303-
}
304284
return connector;
305285
}
306286

@@ -360,8 +340,6 @@ void qd_connector_decref(qd_connector_t* connector)
360340
assert(connector->qd_conn == 0);
361341

362342
qd_connector_config_decref(connector->ctor_config);
363-
vflow_end_record(connector->vflow_record);
364-
connector->vflow_record = 0;
365343
qd_timer_free(connector->reconnect_timer);
366344
sys_mutex_free(&connector->lock);
367345
sys_atomic_destroy(&connector->ref_count);
@@ -473,25 +451,34 @@ void qd_connector_remote_opened(qd_connector_t *connector)
473451
/**
474452
* Set the child connection of the connector
475453
*/
476-
void qd_connector_add_connection(qd_connector_t *connector, qd_connection_t *ctx)
454+
void qd_connector_add_connection(qd_connector_t *connector, qd_connection_t *qd_conn)
477455
{
478-
assert(ctx->connector == 0);
479-
456+
assert(qd_conn->connector == 0);
480457
sys_atomic_inc(&connector->ref_count);
481-
ctx->connector = connector;
482-
connector->qd_conn = ctx;
458+
qd_conn->connector = connector;
459+
connector->qd_conn = qd_conn;
460+
if (!connector->is_data_connector) {
461+
qd_connector_config_t *ctor_config = connector->ctor_config;
462+
sys_atomic_inc(&ctor_config->active_control_conn_count);
463+
}
483464
}
484465

485466

486467
void qd_connector_add_link(qd_connector_t *connector)
487468
{
488469
if (!connector->is_data_connector) {
489-
if (connector->vflow_record && connector->ctor_config->tls_config) {
490-
// connector->ctor_config->tls_ordinal is set in the handle_connector_ssl_profile_mgmt_update() callback
491-
vflow_set_uint64(connector->vflow_record, VFLOW_ATTRIBUTE_ACTIVE_TLS_ORDINAL, connector->ctor_config->tls_ordinal);
470+
qd_connector_config_t *ctor_config = connector->ctor_config;
471+
if (ctor_config && ctor_config->vflow_record) {
472+
if (ctor_config->tls_config) {
473+
// connector->ctor_config->tls_ordinal is set in the handle_connector_ssl_profile_mgmt_update() callback
474+
vflow_set_uint64(ctor_config->vflow_record, VFLOW_ATTRIBUTE_ACTIVE_TLS_ORDINAL, connector->ctor_config->tls_ordinal);
475+
}
476+
if (sys_atomic_get(&ctor_config->active_control_conn_count) == 1) {
477+
vflow_set_string(ctor_config->vflow_record, VFLOW_ATTRIBUTE_OPER_STATUS, "up");
478+
vflow_set_timestamp_now(ctor_config->vflow_record, VFLOW_ATTRIBUTE_UP_TIMESTAMP);
479+
}
480+
492481
}
493-
vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_OPER_STATUS, "up");
494-
vflow_set_timestamp_now(connector->vflow_record, VFLOW_ATTRIBUTE_UP_TIMESTAMP);
495482
connector->oper_status_down = false;
496483
}
497484
}
@@ -505,14 +492,25 @@ void qd_connector_remove_connection(qd_connector_t *connector, bool final, const
505492
{
506493
sys_mutex_lock(&connector->lock);
507494

495+
if (!connector->is_data_connector) {
496+
qd_connector_config_t *ctor_config = connector->ctor_config;
497+
sys_atomic_dec(&ctor_config->active_control_conn_count);
498+
}
499+
500+
508501
qd_connection_t *ctx = connector->qd_conn;
509502
if (!connector->is_data_connector && !connector->oper_status_down && !final) {
510503
connector->oper_status_down = true;
511-
vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_OPER_STATUS, "down");
512-
vflow_inc_counter(connector->vflow_record, VFLOW_ATTRIBUTE_DOWN_COUNT, 1);
513-
vflow_set_timestamp_now(connector->vflow_record, VFLOW_ATTRIBUTE_DOWN_TIMESTAMP);
514-
vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_RESULT, condition_name ? condition_name : "unknown");
515-
vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_REASON, condition_description ? condition_description : "");
504+
qd_connector_config_t *ctor_config = connector->ctor_config;
505+
// If there are no active control connections, we can safely assume that
506+
// the operation status of the LINK record is "down"
507+
if (ctor_config && ctor_config->vflow_record && sys_atomic_get(&ctor_config->active_control_conn_count) == 0) {
508+
vflow_set_string(ctor_config->vflow_record, VFLOW_ATTRIBUTE_OPER_STATUS, "down");
509+
vflow_inc_counter(ctor_config->vflow_record, VFLOW_ATTRIBUTE_DOWN_COUNT, 1);
510+
vflow_set_timestamp_now(ctor_config->vflow_record, VFLOW_ATTRIBUTE_DOWN_TIMESTAMP);
511+
vflow_set_string(ctor_config->vflow_record, VFLOW_ATTRIBUTE_RESULT, condition_name ? condition_name : "unknown");
512+
vflow_set_string(ctor_config->vflow_record, VFLOW_ATTRIBUTE_REASON, condition_description ? condition_description : "");
513+
}
516514
}
517515
connector->qd_conn = 0;
518516
ctx->connector = 0;
@@ -644,6 +642,7 @@ qd_connector_config_t *qd_connector_config_create(qd_dispatch_t *qd, qd_entity_t
644642
ZERO(ctor_config);
645643
DEQ_ITEM_INIT(ctor_config);
646644
sys_atomic_init(&ctor_config->ref_count, 1); // for caller
645+
sys_atomic_init(&ctor_config->active_control_conn_count, 0);
647646
ctor_config->server = qd_dispatch_get_server(qd);
648647
DEQ_INIT(ctor_config->connectors);
649648
ctor_config->cleanup_timer = qd_timer(amqp_adaptor.dispatch, qd_connector_config_cleanup_conns, ctor_config);
@@ -663,6 +662,7 @@ qd_connector_config_t *qd_connector_config_create(qd_dispatch_t *qd, qd_entity_t
663662

664663
const bool is_inter_router = strcmp(ctor_config->config.role, "inter-router") == 0;
665664
const bool is_edge = strcmp(ctor_config->config.role, "edge") == 0;
665+
const bool is_inter_edge = strcmp(ctor_config->config.role, "inter-edge") == 0;
666666

667667
//
668668
// If an sslProfile is configured allocate a TLS config to be used by all child connector's connections
@@ -684,9 +684,25 @@ qd_connector_config_t *qd_connector_config_create(qd_dispatch_t *qd, qd_entity_t
684684
handle_connector_ssl_profile_mgmt_update);
685685
}
686686
}
687-
687+
if (is_inter_router || is_edge || is_inter_edge) {
688+
ctor_config->vflow_record = vflow_start_record(VFLOW_RECORD_LINK, 0);
689+
vflow_set_string(ctor_config->vflow_record, VFLOW_ATTRIBUTE_NAME, ctor_config->config.name);
690+
vflow_set_string(ctor_config->vflow_record, VFLOW_ATTRIBUTE_ROLE, ctor_config->config.role);
691+
vflow_set_uint64(ctor_config->vflow_record, VFLOW_ATTRIBUTE_LINK_COST, ctor_config->config.inter_router_cost);
692+
vflow_set_string(ctor_config->vflow_record, VFLOW_ATTRIBUTE_OPER_STATUS, "down");
693+
vflow_set_uint64(ctor_config->vflow_record, VFLOW_ATTRIBUTE_DOWN_COUNT, 0);
694+
if (ctor_config->config.ssl_required) {
695+
vflow_set_string(ctor_config->vflow_record, VFLOW_ATTRIBUTE_PROTOCOL, "amqps");
696+
} else {
697+
vflow_set_string(ctor_config->vflow_record, VFLOW_ATTRIBUTE_PROTOCOL, "amqp");
698+
}
699+
vflow_set_string(ctor_config->vflow_record, VFLOW_ATTRIBUTE_DESTINATION_PORT, ctor_config->config.port);
700+
vflow_set_uint64(ctor_config->vflow_record, VFLOW_ATTRIBUTE_OCTETS, 0);
701+
vflow_set_uint64(ctor_config->vflow_record, VFLOW_ATTRIBUTE_OCTETS_REVERSE, 0);
702+
}
703+
//
688704
// For inter-router connectors generate a group correlator and configure the data connection count
689-
705+
//
690706
if (is_inter_router) {
691707
qd_generate_discriminator(ctor_config->group_correlator);
692708
ctor_config->data_connection_count = qd_dispatch_get_data_connection_count(qd);
@@ -742,13 +758,19 @@ void qd_connector_config_decref(qd_connector_config_t *ctor_config)
742758
assert(rc > 0); // else underflow
743759

744760
if (rc == 1) {
761+
if (ctor_config->vflow_record) {
762+
vflow_end_record(ctor_config->vflow_record);
763+
ctor_config->vflow_record = 0;
764+
}
765+
745766
// Expect: all connectors hold the ref_count so this must be empty
746767
assert(DEQ_IS_EMPTY(ctor_config->connectors));
747768

748769
// free the timer first otherwise the callback can run and attempt to access ctor_config while it is being torn
749770
// down:
750771
qd_timer_free(ctor_config->cleanup_timer);
751772
sys_atomic_destroy(&ctor_config->ref_count);
773+
sys_atomic_destroy(&ctor_config->active_control_conn_count);
752774
free(ctor_config->policy_vhost);
753775
qd_tls_config_decref(ctor_config->tls_config);
754776
qd_server_config_free(&ctor_config->config);

src/adaptors/amqp/qd_connector.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ typedef struct qd_connector_t {
6464
sys_mutex_t lock;
6565
connector_state_t state;
6666
qd_connection_t *qd_conn;
67-
vflow_record_t *vflow_record;
6867
bool oper_status_down; // set when oper-status transitions to 'down' to avoid repeated error indications.
6968
bool is_data_connector; // inter-router conn for streaming messages
7069

@@ -96,13 +95,13 @@ struct qd_connector_config_t {
9695
qd_server_t *server;
9796
char *policy_vhost; /* Optional policy vhost name */
9897
qd_timer_t *cleanup_timer; /* remove quiesced connectors */
99-
98+
vflow_record_t *vflow_record; /* vflow record for VFLOW_RECORD_LINK */
10099
// TLS Configuration. Keep a local copy of the TLS ordinals to monitor changes by management
101100
qd_tls_config_t *tls_config;
102101
uint64_t tls_ordinal;
103102
uint64_t tls_oldest_valid_ordinal;
104103
uint32_t data_connection_count; // # of child inter-router data connections
105-
104+
sys_atomic_t active_control_conn_count;
106105
// The group correlation id for all child connectors/connections
107106
char group_correlator[QD_DISCRIMINATOR_SIZE];
108107

tests/system_tests_cert_rotation.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
from system_test import CLIENT2_CERTIFICATE, CLIENT2_PRIVATE_KEY, CLIENT2_PRIVATE_KEY_PASSWORD
3232
from system_test import SERVER2_CERTIFICATE, SERVER2_PRIVATE_KEY, SERVER2_PRIVATE_KEY_PASSWORD
3333
from tcp_streamer import TcpStreamerThread
34+
from vanflow_snooper import VFlowSnooperThread, ANY_VALUE
3435

3536

3637
class InterRouterCertRotationTest(TestCase):
@@ -122,13 +123,55 @@ def test_01_ordinal_updates(self):
122123
zero_ordinals = [c for c in irc if c['tlsOrdinal'] == 0]
123124
self.assertEqual(data_conn_count + 1, len(zero_ordinals), f"Missing conns: {zero_ordinals}")
124125

126+
snooper_thread = VFlowSnooperThread(router_C.addresses[0])
127+
128+
expected = {
129+
'RouterL': [('ROUTER_ACCESS', {'LINK_COUNT': 1,
130+
'ROLE': 'inter-router',
131+
'IDENTITY': ANY_VALUE})],
132+
'RouterC': [('LINK', {'PEER': ANY_VALUE,
133+
'OPER_STATUS': 'up',
134+
'ACTIVE_TLS_ORDINAL': 0,
135+
'ROLE': 'inter-router'})]
136+
}
137+
success = retry(lambda: snooper_thread.match_records(expected), delay=2)
138+
self.assertTrue(success, f"Failed to match records {snooper_thread.get_results()}")
139+
140+
# Before incrementing the ordinal on the sslProfile, get the vflow LINK record
141+
# and capture its identity and start time. After we update the ordinal, we will
142+
# make sure that the LINK record we obtain then has the same ordinal and start time
143+
# as the current LINK record. This will prove that the LINK records were not recreated
144+
# across ordinal updates.
145+
link_vflow_recs = snooper_thread.get_router_records("RouterC", record_type='LINK')
146+
link_vflow_dict = link_vflow_recs[0]
147+
link_identity = link_vflow_dict['IDENTITY']
148+
link_start_time = link_vflow_dict['START_TIME']
149+
125150
# update tlsOrdinal to 3 and wait for new conns to appear
126151
router_C.management.update(type=SSL_PROFILE_TYPE,
127152
attributes={'ordinal': 3},
128153
name='ConnectorSslProfile')
129154
self.wait_inter_router_conns(router_C, 2 * (data_conn_count + 1))
130155
self.wait_inter_router_conns(router_L, 2 * (data_conn_count + 1))
131156

157+
# The ordinal has been updated to 3, check to see if the LINK has the correct
158+
# ordinal value of 3
159+
expected = {
160+
'RouterC': [('LINK', {'PEER': ANY_VALUE,
161+
'OPER_STATUS': 'up',
162+
'ACTIVE_TLS_ORDINAL': 3,
163+
'ROLE': 'inter-router'})]
164+
}
165+
success = retry(lambda: snooper_thread.match_records(expected), delay=2)
166+
self.assertTrue(success, f"Failed to match records {snooper_thread.get_results()}")
167+
168+
# Check to see if there is still the same link
169+
# record as from before the ordinal was updated.
170+
link_vflow_recs = snooper_thread.get_router_records("RouterC", record_type='LINK')
171+
link_vflow_dict = link_vflow_recs[0]
172+
self.assertEqual(link_identity, link_vflow_dict['IDENTITY'])
173+
self.assertEqual(link_start_time, link_vflow_dict['START_TIME'])
174+
132175
# Update oldestValidOrdinal to 3. Expect the older connections with an
133176
# ordinal value of 0 to be deleted
134177
router_C.management.update(type=SSL_PROFILE_TYPE,
@@ -137,13 +180,30 @@ def test_01_ordinal_updates(self):
137180
self.wait_inter_router_conns(router_L, data_conn_count + 1)
138181
self.wait_inter_router_conns(router_C, data_conn_count + 1)
139182

183+
# The oldestValidOrdinal has been updated to 3, we will check to see if there is
184+
# still the same link record as from before the ordinal was updated.
185+
link_vflow_recs = snooper_thread.get_router_records("RouterC", record_type='LINK')
186+
link_vflow_dict = link_vflow_recs[0]
187+
self.assertEqual(link_identity, link_vflow_dict['IDENTITY'])
188+
self.assertEqual(link_start_time, link_vflow_dict['START_TIME'])
189+
140190
# Verify all group Ordinals are 3 (same as connector tlsOrdinal)
141191
irc = router_C.get_inter_router_conns()
142192
irc.extend(router_L.get_inter_router_conns())
143193
self.assertEqual(2 * (data_conn_count + 1),
144194
len([c for c in irc if c['groupOrdinal'] == 3]),
145195
f"Unexpected conns: {irc}")
146196
router_L.teardown()
197+
198+
# Router L has now been torn down, check to see if the RouterC's OPER_STATUS on the LINK record is "down"
199+
expected = {
200+
'RouterC': [('LINK', {'PEER': ANY_VALUE,
201+
'OPER_STATUS': 'down',
202+
"PROTOCOL": "amqp",
203+
'ROLE': 'inter-router'})]
204+
}
205+
206+
success = retry(lambda: snooper_thread.match_records(expected), delay=2)
147207
router_C.teardown()
148208

149209
def test_02_drop_old(self):

tests/vanflow_snooper.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,8 @@
127127
62: "PROXY_HOST",
128128
63: "PROXY_PORT",
129129
64: "ERROR_LISTENER_SIDE",
130-
65: "ERROR_CONNECTOR_SIDE"
130+
65: "ERROR_CONNECTOR_SIDE",
131+
66: "ACTIVE_TLS_ORDINAL"
131132
}
132133

133134

0 commit comments

Comments
 (0)