Skip to content

Commit 63b5f20

Browse files
committed
Fixes #1793: Move the LINK vflow record from the connector object to connector_config object. This ensures
that only one LINK record exists per logical link
1 parent f92ae18 commit 63b5f20

File tree

3 files changed

+44
-39
lines changed

3 files changed

+44
-39
lines changed

src/adaptors/amqp/amqp_adaptor.c

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -569,8 +569,8 @@ bool AMQP_rx_handler(qd_router_t *router, qd_link_t *link)
569569
//
570570
// Bump LINK metrics if appropriate
571571
//
572-
if (!!conn->connector && !!conn->connector->vflow_record) {
573-
vflow_inc_counter(conn->connector->vflow_record, VFLOW_ATTRIBUTE_OCTETS_REVERSE, (uint64_t) octets_received);
572+
if (!!conn->connector && !!conn->connector->ctor_config && !!conn->connector->ctor_config->vflow_record) {
573+
vflow_inc_counter(conn->connector->ctor_config->vflow_record, VFLOW_ATTRIBUTE_OCTETS_REVERSE, (uint64_t) octets_received);
574574
}
575575

576576
// check if cut-through can be enabled or disabled
@@ -1535,8 +1535,8 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool
15351535
&& strncmp(key.start, QD_CONNECTION_PROPERTY_ACCESS_ID, key.size) == 0)) {
15361536
props_found += 1;
15371537
if (!pn_data_next(props)) break;
1538-
if (!!connector && !!connector->vflow_record && pn_data_type(props) == PN_STRING) {
1539-
vflow_set_ref_from_pn(connector->vflow_record, VFLOW_ATTRIBUTE_PEER, props);
1538+
if (!!connector && !!connector->ctor_config && !!connector->ctor_config->vflow_record && pn_data_type(props) == PN_STRING) {
1539+
vflow_set_ref_from_pn(connector->ctor_config->vflow_record, VFLOW_ATTRIBUTE_PEER, props);
15401540
}
15411541

15421542
} else {
@@ -2143,8 +2143,8 @@ static uint64_t CORE_link_deliver(void *context, qdr_link_t *link, qdr_delivery_
21432143
//
21442144
// Bump LINK metrics if appropriate
21452145
//
2146-
if (!!qconn->connector && !!qconn->connector->vflow_record) {
2147-
vflow_inc_counter(qconn->connector->vflow_record, VFLOW_ATTRIBUTE_OCTETS, (uint64_t) octets_sent);
2146+
if (!!qconn->connector && !!qconn->connector->ctor_config && !!qconn->connector->ctor_config->vflow_record) {
2147+
vflow_inc_counter(qconn->connector->ctor_config->vflow_record, VFLOW_ATTRIBUTE_OCTETS, (uint64_t) octets_sent);
21482148
}
21492149

21502150
//

src/adaptors/amqp/qd_connector.c

Lines changed: 37 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,6 @@ static void qd_connector_config_cleanup_conns(void *context)
108108
static qd_error_t qd_connector_config_create_connectors(qd_connector_config_t *ctor_config)
109109
{
110110
ASSERT_MGMT_THREAD; // only the mgmt thread can modify the connector list!
111-
112111
qd_error_clear();
113112

114113
// The connector configuration for inter-router connections may define a data connection count. This is the number
@@ -282,25 +281,6 @@ qd_connector_t *qd_connector_create(qd_connector_config_t *ctor_config, bool is_
282281
snprintf(item->host_port, hplen, "%s:%s", item->host , item->port);
283282
DEQ_INSERT_TAIL(connector->conn_info_list, item);
284283

285-
//
286-
// Set up the vanflow record for this connector (LINK)
287-
// Do this only for router-to-router connectors since the record represents an inter-router link
288-
//
289-
if ((strcmp(ctor_config->config.role, "inter-router") == 0 && !is_data_connector) ||
290-
strcmp(ctor_config->config.role, "edge") == 0 ||
291-
strcmp(ctor_config->config.role, "inter-edge") == 0) {
292-
connector->vflow_record = vflow_start_record(VFLOW_RECORD_LINK, 0);
293-
vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_NAME, ctor_config->config.name);
294-
vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_ROLE, ctor_config->config.role);
295-
vflow_set_uint64(connector->vflow_record, VFLOW_ATTRIBUTE_LINK_COST, ctor_config->config.inter_router_cost);
296-
vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_OPER_STATUS, "down");
297-
vflow_set_uint64(connector->vflow_record, VFLOW_ATTRIBUTE_DOWN_COUNT, 0);
298-
vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_PROTOCOL, item->scheme);
299-
vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_DESTINATION_HOST, item->host);
300-
vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_DESTINATION_PORT, item->port);
301-
vflow_set_uint64(connector->vflow_record, VFLOW_ATTRIBUTE_OCTETS, 0);
302-
vflow_set_uint64(connector->vflow_record, VFLOW_ATTRIBUTE_OCTETS_REVERSE, 0);
303-
}
304284
return connector;
305285
}
306286

@@ -360,8 +340,6 @@ void qd_connector_decref(qd_connector_t* connector)
360340
assert(connector->qd_conn == 0);
361341

362342
qd_connector_config_decref(connector->ctor_config);
363-
vflow_end_record(connector->vflow_record);
364-
connector->vflow_record = 0;
365343
qd_timer_free(connector->reconnect_timer);
366344
sys_mutex_free(&connector->lock);
367345
sys_atomic_destroy(&connector->ref_count);
@@ -486,8 +464,11 @@ void qd_connector_add_connection(qd_connector_t *connector, qd_connection_t *ctx
486464
void qd_connector_add_link(qd_connector_t *connector)
487465
{
488466
if (!connector->is_data_connector) {
489-
vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_OPER_STATUS, "up");
490-
vflow_set_timestamp_now(connector->vflow_record, VFLOW_ATTRIBUTE_UP_TIMESTAMP);
467+
qd_connector_config_t *ctor_config = connector->ctor_config;
468+
if (ctor_config && ctor_config->vflow_record) {
469+
vflow_set_string(ctor_config->vflow_record, VFLOW_ATTRIBUTE_OPER_STATUS, "up");
470+
vflow_set_timestamp_now(ctor_config->vflow_record, VFLOW_ATTRIBUTE_UP_TIMESTAMP);
471+
}
491472
connector->oper_status_down = false;
492473
}
493474
}
@@ -504,11 +485,14 @@ void qd_connector_remove_connection(qd_connector_t *connector, bool final, const
504485
qd_connection_t *ctx = connector->qd_conn;
505486
if (!connector->is_data_connector && !connector->oper_status_down && !final) {
506487
connector->oper_status_down = true;
507-
vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_OPER_STATUS, "down");
508-
vflow_inc_counter(connector->vflow_record, VFLOW_ATTRIBUTE_DOWN_COUNT, 1);
509-
vflow_set_timestamp_now(connector->vflow_record, VFLOW_ATTRIBUTE_DOWN_TIMESTAMP);
510-
vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_RESULT, condition_name ? condition_name : "unknown");
511-
vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_REASON, condition_description ? condition_description : "");
488+
qd_connector_config_t *ctor_config = connector->ctor_config;
489+
if (ctor_config && ctor_config->vflow_record) {
490+
vflow_set_string(ctor_config->vflow_record, VFLOW_ATTRIBUTE_OPER_STATUS, "down");
491+
vflow_inc_counter(ctor_config->vflow_record, VFLOW_ATTRIBUTE_DOWN_COUNT, 1);
492+
vflow_set_timestamp_now(ctor_config->vflow_record, VFLOW_ATTRIBUTE_DOWN_TIMESTAMP);
493+
vflow_set_string(ctor_config->vflow_record, VFLOW_ATTRIBUTE_RESULT, condition_name ? condition_name : "unknown");
494+
vflow_set_string(ctor_config->vflow_record, VFLOW_ATTRIBUTE_REASON, condition_description ? condition_description : "");
495+
}
512496
}
513497
connector->qd_conn = 0;
514498
ctx->connector = 0;
@@ -660,6 +644,7 @@ qd_connector_config_t *qd_connector_config_create(qd_dispatch_t *qd, qd_entity_t
660644

661645
const bool is_inter_router = strcmp(ctor_config->config.role, "inter-router") == 0;
662646
const bool is_edge = strcmp(ctor_config->config.role, "edge") == 0;
647+
const bool is_inter_edge = strcmp(ctor_config->config.role, "inter-edge") == 0;
663648

664649
//
665650
// If an sslProfile is configured allocate a TLS config to be used by all child connector's connections
@@ -681,9 +666,25 @@ qd_connector_config_t *qd_connector_config_create(qd_dispatch_t *qd, qd_entity_t
681666
handle_connector_ssl_profile_mgmt_update);
682667
}
683668
}
684-
669+
if (is_inter_router || is_edge || is_inter_edge) {
670+
ctor_config->vflow_record = vflow_start_record(VFLOW_RECORD_LINK, 0);
671+
vflow_set_string(ctor_config->vflow_record, VFLOW_ATTRIBUTE_NAME, ctor_config->config.name);
672+
vflow_set_string(ctor_config->vflow_record, VFLOW_ATTRIBUTE_ROLE, ctor_config->config.role);
673+
vflow_set_uint64(ctor_config->vflow_record, VFLOW_ATTRIBUTE_LINK_COST, ctor_config->config.inter_router_cost);
674+
vflow_set_string(ctor_config->vflow_record, VFLOW_ATTRIBUTE_OPER_STATUS, "down");
675+
vflow_set_uint64(ctor_config->vflow_record, VFLOW_ATTRIBUTE_DOWN_COUNT, 0);
676+
if (ctor_config->config.ssl_required) {
677+
vflow_set_string(ctor_config->vflow_record, VFLOW_ATTRIBUTE_PROTOCOL, "amqps");
678+
} else {
679+
vflow_set_string(ctor_config->vflow_record, VFLOW_ATTRIBUTE_PROTOCOL, "amqp");
680+
}
681+
vflow_set_string(ctor_config->vflow_record, VFLOW_ATTRIBUTE_DESTINATION_PORT, ctor_config->config.port);
682+
vflow_set_uint64(ctor_config->vflow_record, VFLOW_ATTRIBUTE_OCTETS, 0);
683+
vflow_set_uint64(ctor_config->vflow_record, VFLOW_ATTRIBUTE_OCTETS_REVERSE, 0);
684+
}
685+
//
685686
// For inter-router connectors generate a group correlator and configure the data connection count
686-
687+
//
687688
if (is_inter_router) {
688689
qd_generate_discriminator(ctor_config->group_correlator);
689690
ctor_config->data_connection_count = qd_dispatch_get_data_connection_count(qd);
@@ -739,6 +740,11 @@ void qd_connector_config_decref(qd_connector_config_t *ctor_config)
739740
assert(rc > 0); // else underflow
740741

741742
if (rc == 1) {
743+
if (ctor_config->vflow_record) {
744+
vflow_end_record(ctor_config->vflow_record);
745+
ctor_config->vflow_record = 0;
746+
}
747+
742748
// Expect: all connectors hold the ref_count so this must be empty
743749
assert(DEQ_IS_EMPTY(ctor_config->connectors));
744750

src/adaptors/amqp/qd_connector.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ typedef struct qd_connector_t {
6464
sys_mutex_t lock;
6565
connector_state_t state;
6666
qd_connection_t *qd_conn;
67-
vflow_record_t *vflow_record;
6867
bool oper_status_down; // set when oper-status transitions to 'down' to avoid repeated error indications.
6968
bool is_data_connector; // inter-router conn for streaming messages
7069

@@ -96,7 +95,7 @@ struct qd_connector_config_t {
9695
qd_server_t *server;
9796
char *policy_vhost; /* Optional policy vhost name */
9897
qd_timer_t *cleanup_timer; /* remove quiesced connectors */
99-
98+
vflow_record_t *vflow_record; /* vflow record for VFLOW_RECORD_LINK */
10099
// TLS Configuration. Keep a local copy of the TLS ordinals to monitor changes by management
101100
qd_tls_config_t *tls_config;
102101
uint64_t tls_ordinal;

0 commit comments

Comments
 (0)