From df78e96c303a3f44d0a24e6fb94dbd80c6d03969 Mon Sep 17 00:00:00 2001 From: Kenneth Giusti Date: Wed, 5 Mar 2025 09:23:14 -0500 Subject: [PATCH 1/3] Issue #1748: Implement inter-router connection group upgrade This introduces the concept of a connection group ordinal. It annotates inter-router connections. Its purpose is to assign a precedence to connections within and inter-router connection group. Those inter router connections with the highest ordinal become the active routing path to a peer router. Connections within the group that have a lower ordinal remain active but no new flows are sent over them. This is used to implement connection upgrade for certificate rotation. --- .../skupper_router/management/skrouter.json | 4 + src/router_core/agent_connection.c | 1 + src/router_core/connections.c | 555 +++++++++++++----- src/router_core/forwarder.c | 11 +- src/router_core/route_tables.c | 31 +- src/router_core/router_core.c | 16 +- src/router_core/router_core_private.h | 44 +- tests/system_test.py | 60 ++ tests/system_tests_cert_rotation.py | 352 ++++++++++- tests/tcp_streamer.py | 188 ++++++ 10 files changed, 1057 insertions(+), 205 deletions(-) create mode 100755 tests/tcp_streamer.py diff --git a/python/skupper_router/management/skrouter.json b/python/skupper_router/management/skrouter.json index 947ee3aa5..92d0e22be 100644 --- a/python/skupper_router/management/skrouter.json +++ b/python/skupper_router/management/skrouter.json @@ -1437,6 +1437,10 @@ "graph": true, "description": "The total number of deliveries that have traversed this link." }, + "connectionId": { + "type": "integer", + "description": "The identity of the parent connection." + }, "presettledCount": { "type": "integer", "graph": true, diff --git a/src/router_core/agent_connection.c b/src/router_core/agent_connection.c index 6b0a57927..8b4c6b074 100644 --- a/src/router_core/agent_connection.c +++ b/src/router_core/agent_connection.c @@ -101,6 +101,7 @@ const char *qdr_connection_columns[] = "meshId", "tlsOrdinal", "groupCorrelationId", + "groupOrdinal", 0}; const char *CONNECTION_TYPE = "io.skupper.router.connection"; diff --git a/src/router_core/connections.c b/src/router_core/connections.c index f890c3db2..58eae7125 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -41,6 +41,8 @@ static void qdr_link_processing_complete_CT(qdr_core_t *core, qdr_action_t *acti static void qdr_link_processing_complete(qdr_core_t *core, qdr_link_t *link); static void qdr_connection_group_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn); static void qdr_connection_set_tracing_CT(qdr_core_t *core, qdr_action_t *action, bool discard); +static void setup_inter_router_control_conn_CT(qdr_core_t *core, qdr_connection_t *conn); + ALLOC_DEFINE_SAFE(qdr_connection_t); ALLOC_DEFINE(qdr_connection_work_t); @@ -110,6 +112,7 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t *core, conn->policy_spec = policy_spec; conn->link_capacity = link_capacity; conn->mask_bit = -1; + conn->group_parent_mask_bit = -1; conn->admin_status = QD_CONN_ADMIN_ENABLED; conn->oper_status = QD_CONN_OPER_UP; DEQ_INIT(conn->links); @@ -334,7 +337,10 @@ void qdr_close_connection_CT(qdr_core_t *core, qdr_connection_t *conn) conn_already_closed = true; } else { conn->closed = true; - conn->error = qdr_error(QD_AMQP_COND_CONNECTION_FORCED, "Connection forced-closed by management request"); + if (!conn->error) { + // Allow caller to override the default error condition + conn->error = qdr_error(QD_AMQP_COND_CONNECTION_FORCED, "Connection forced-closed by management request"); + } } conn->admin_status = QD_CONN_ADMIN_DELETED; @@ -1082,10 +1088,10 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li // if (qd_bitmask_valid_bit_value(conn->mask_bit)) { if (link->link_type == QD_LINK_CONTROL) - core->control_links_by_mask_bit[conn->mask_bit] = 0; + conn->control_links[link->link_direction] = 0; if (link->link_type == QD_LINK_ROUTER) { - if (link == core->data_links_by_mask_bit[conn->mask_bit].links[link->priority]) - core->data_links_by_mask_bit[conn->mask_bit].links[link->priority] = 0; + if (link == conn->data_links.link[link->priority]) + conn->data_links.link[link->priority] = 0; } } @@ -1453,41 +1459,67 @@ void qdr_process_addr_attributes_CT(qdr_core_t *core, qdr_address_t *addr) } +/** Setup a group of inter-router connections + * + * An inter-router connection group consists of one inter-router control connection and zero or more inter-router data + * connections. All these connections are generated by the same connector instance and therefore terminate at the same + * peer router. + * + * Connection groups are designed to scale the amount of traffic that can be sent between peer interior routers by + * allowing multiple connections to the peer. A message forwarded to the peer can use any connection in the group, + * which means that N messages can be forwarded concurrently (where N == the number of connections in the group). + * + * A group is identified by a 2-tuple consisting of the group correlator string and an integer ordinal. All connections + * in a group share the same (correlator, ordinal) value. The correlator is unique to the parent connector. The ordinal + * tracks updates to the connector's configuration (e.g. certificate refresh) with the connections with the highest + * ordinal value taking precedence over connections with lower ordinal values. The ordinal gives the router the ability + * to "rollover" a connection group to a new set of connections. + * + * This function is called when the inter-router control connection opens. It is considered the group "parent" + * connection. All inter-router data connections belonging to the group will be associated with the parent connection, + * and are referred to as "member" connections. The routing agent is only aware of the inter-router control connection + * therefore the group cannot exist without it. + */ static void qdr_connection_group_setup_CT(qdr_core_t *core, qdr_connection_t *conn) { const char *correlator = conn->connection_info->group_correlator; uint64_t ordinal = conn->connection_info->group_ordinal; assert(conn->role == QDR_ROLE_INTER_ROUTER); - qd_log(LOG_ROUTER_CORE, QD_LOG_DEBUG, - "Connection group '%s' setup parent [C%"PRIu64"] with ordinal=%"PRIu64" (host=%s)", - correlator, conn->identity, ordinal, conn->connection_info->host); - if (strnlen(correlator, QD_DISCRIMINATOR_SIZE) > 0) { -#if 0 // KAG TODO FIX: allow dupes for now: + if (!!correlator[0] && qd_bitmask_valid_bit_value(conn->mask_bit)) { + + qd_log(LOG_ROUTER_CORE, QD_LOG_DEBUG, + "Connection group '%s' setup parent [C%"PRIu64"] with ordinal=%"PRIu64" (host=%s)", + correlator, conn->identity, ordinal, conn->connection_info->host); + + assert(core->group_correlator_by_maskbit[conn->mask_bit][0] == '\0'); + assert(core->rnode_conns_by_mask_bit[conn->mask_bit] == conn); + // - // Check the existing set of correlators (by mask-bit) to determine if there exists another - // connection with the same correlator. If found, clear the old correlator and move the old - // connection's group members to the unallocated list. These will be picked up by the new connection. + // Sanity check: Expect the same group correlator is not in use by another connection. If so there's either a + // bug in how we manage the groups OR the globally unique discriminator is actually not unique which breaks an + // invariant. This should not happen. // for (int mask_bit = 0; mask_bit < qd_bitmask_width(); mask_bit++) { - qdr_connection_t *old_conn = core->rnode_conns_by_mask_bit[mask_bit]; - if (old_conn) { - if(strncmp(old_conn->connection_info->group_correlator, correlator, QD_DISCRIMINATOR_SIZE) == 0) { + if (strcmp(core->group_correlator_by_maskbit[mask_bit], correlator) == 0) { + qdr_connection_t *old_conn = core->rnode_conns_by_mask_bit[mask_bit]; + qd_log(LOG_ROUTER_CORE, QD_LOG_ERROR, + "CGROUP duplicate group control connection detected: correlator=%s conns=[C%"PRIu64"],[C%"PRIu64"]", + correlator, conn->identity, old_conn ? old_conn->identity : 0); + if (old_conn) { + // This is what is called a "hail mary pass" - google it - in an attempt to recover from this mess qdr_connection_group_cleanup_CT(core, old_conn); } + assert(false); // Fail if this is a debug build in order to catch this in CI } } -#else - assert(qd_bitmask_valid_bit_value(conn->mask_bit)); -#endif // // Record the group's correlator in the core record. // Check the unallocated member list for matching correlators. Import the // matches into this connection's group. // - assert(core->group_correlator_by_maskbit[conn->mask_bit][0] == '\0'); qd_log(LOG_ROUTER_CORE, QD_LOG_DEBUG, "Connection group '%s' recording correlator[mask-bit=%d]", correlator, conn->mask_bit); strncpy(core->group_correlator_by_maskbit[conn->mask_bit], correlator, QD_DISCRIMINATOR_SIZE); @@ -1500,6 +1532,7 @@ static void qdr_connection_group_setup_CT(qdr_core_t *core, qdr_connection_t *co correlator, member->identity, conn->identity); DEQ_REMOVE_N(GROUP, core->unallocated_group_members, member); DEQ_INSERT_HEAD_N(GROUP, conn->connection_group, member); + member->group_parent_mask_bit = conn->mask_bit; } member = next; } @@ -1509,46 +1542,64 @@ static void qdr_connection_group_setup_CT(qdr_core_t *core, qdr_connection_t *co } +/** Add an inter-router data connection to the connection group + * + * See the comment for qdr_connection_group_setup_CT() + * + * This is called when a new inter-router data connection comes up. + */ static void qdr_connection_group_member_setup_CT(qdr_core_t *core, qdr_connection_t *conn) { assert(conn->role == QDR_ROLE_INTER_ROUTER_DATA); - // - // Scan the correlators-by-maskbit to see if this member's correlator is active. - // If so, import this into the active group and reset the cursor. - // If not, put this member into the unallocated list. - // + const char *correlator = conn->connection_info->group_correlator; uint64_t ordinal = conn->connection_info->group_ordinal; - qdr_connection_t *parent = 0; + qdr_connection_t *parent = 0; // the inter-router control connection if present - qd_log(LOG_ROUTER_CORE, QD_LOG_DEBUG, "Connection group '%s' adding member [C%"PRIu64"] with ordinal=%"PRIu64, - correlator, conn->identity, ordinal); + if (!!correlator[0]) { - for (int maskbit = 0; maskbit < qd_bitmask_width(); maskbit++) { - if (strncmp(core->group_correlator_by_maskbit[maskbit], correlator, QD_DISCRIMINATOR_SIZE) == 0) { - assert(!!core->rnode_conns_by_mask_bit[maskbit]); - if (core->rnode_conns_by_mask_bit[maskbit]->connection_info->group_ordinal == ordinal) { - parent = core->rnode_conns_by_mask_bit[maskbit]; - break; + qd_log(LOG_ROUTER_CORE, QD_LOG_DEBUG, "Connection group '%s' adding member [C%"PRIu64"] with ordinal=%"PRIu64, + correlator, conn->identity, ordinal); + + // + // Scan the correlators-by-maskbit table to see if this member's correlator is active (i.e. control connection is + // up). If so, import this into the parent's active group and reset the cursor. If not, put this member into the + // unallocated list and wait for the control connection to come up. + // + for (int maskbit = 0; maskbit < qd_bitmask_width(); maskbit++) { + if (strncmp(core->group_correlator_by_maskbit[maskbit], correlator, QD_DISCRIMINATOR_SIZE) == 0) { + assert(!!core->rnode_conns_by_mask_bit[maskbit]); + if (core->rnode_conns_by_mask_bit[maskbit]->connection_info->group_ordinal == ordinal) { + parent = core->rnode_conns_by_mask_bit[maskbit]; + break; + } } } - } - if (!!parent) { - qd_log(LOG_ROUTER_CORE, QD_LOG_DEBUG, "Connection group '%s' member [C%"PRIu64"] added to parent [C%"PRIu64"]", - correlator, conn->identity, parent->identity); - assert(strncmp(parent->connection_info->group_correlator, correlator, QD_DISCRIMINATOR_SIZE) == 0); - assert(parent->connection_info->group_ordinal == ordinal); - DEQ_INSERT_TAIL_N(GROUP, parent->connection_group, conn); - parent->group_cursor = DEQ_HEAD(parent->connection_group); - } else { - qd_log(LOG_ROUTER_CORE, QD_LOG_DEBUG, "Connection group '%s' parent not present moving [C%"PRIu64"] to unallocated", - correlator, conn->identity); - DEQ_INSERT_TAIL_N(GROUP, core->unallocated_group_members, conn); + if (!!parent) { + qd_log(LOG_ROUTER_CORE, QD_LOG_DEBUG, "Connection group '%s' member [C%"PRIu64"] added to parent [C%"PRIu64"]", + correlator, conn->identity, parent->identity); + + assert(strncmp(parent->connection_info->group_correlator, correlator, QD_DISCRIMINATOR_SIZE) == 0); + assert(parent->connection_info->group_ordinal == ordinal); + DEQ_INSERT_TAIL_N(GROUP, parent->connection_group, conn); + conn->group_parent_mask_bit = parent->mask_bit; + parent->group_cursor = DEQ_HEAD(parent->connection_group); + } else { + qd_log(LOG_ROUTER_CORE, QD_LOG_DEBUG, "Connection group '%s' parent not present moving [C%"PRIu64"] to unallocated", + correlator, conn->identity); + DEQ_INSERT_TAIL_N(GROUP, core->unallocated_group_members, conn); + } } } +/** Teardown a group of inter-router connections + * + * See the comment for qdr_connection_group_setup_CT() + * + * This is called when the inter-router control connection is closed. + */ static void qdr_connection_group_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn) { assert(conn->role == QDR_ROLE_INTER_ROUTER); @@ -1559,12 +1610,16 @@ static void qdr_connection_group_cleanup_CT(qdr_core_t *core, qdr_connection_t * const char *correlator = conn->connection_info->group_correlator; uint64_t ordinal = conn->connection_info->group_ordinal; - qd_log(LOG_ROUTER_CORE, QD_LOG_DEBUG, "Connection group '%s' clean up parent [C%"PRIu64"] with ordinal=%"PRIu64, - correlator, conn->identity, ordinal); - if (strnlen(correlator, QD_DISCRIMINATOR_SIZE) > 0) { - assert(strncmp(core->group_correlator_by_maskbit[conn->mask_bit], correlator, QD_DISCRIMINATOR_SIZE) == 0); - qd_log(LOG_ROUTER_CORE, QD_LOG_DEBUG, "Connection group '%s' clearing correlator[mask-bit=%d]", correlator, conn->mask_bit); - core->group_correlator_by_maskbit[conn->mask_bit][0] = '\0'; + if (!!correlator[0]) { + + // It is possible that this connection is not the active group parent (like on connection upgrade). Only the + // active parent can unregister the correlator mask bit map in the core: + if (qd_bitmask_valid_bit_value(conn->mask_bit) && core->rnode_conns_by_mask_bit[conn->mask_bit] == conn) { + qd_log(LOG_ROUTER_CORE, QD_LOG_DEBUG, "Connection group '%s' clean up parent [C%"PRIu64"] with ordinal=%"PRIu64, + correlator, conn->identity, ordinal); + assert(strncmp(core->group_correlator_by_maskbit[conn->mask_bit], correlator, QD_DISCRIMINATOR_SIZE) == 0); + core->group_correlator_by_maskbit[conn->mask_bit][0] = '\0'; + } while (!!DEQ_HEAD(conn->connection_group)) { qd_log(LOG_ROUTER_CORE, QD_LOG_DEBUG, @@ -1573,55 +1628,56 @@ static void qdr_connection_group_cleanup_CT(qdr_core_t *core, qdr_connection_t * qdr_connection_t *member = DEQ_HEAD(conn->connection_group); DEQ_REMOVE_HEAD_N(GROUP, conn->connection_group); DEQ_INSERT_TAIL_N(GROUP, core->unallocated_group_members, member); + member->group_parent_mask_bit = -1; } conn->group_cursor = 0; } } +/** Remove an inter-router data connection from the connection group + * + * See the comment for qdr_connection_group_setup_CT() + * + * This is called when an existing inter-router data connection goes down. It must be removed from the group parent or + * the unallocated group list depending on where it resides. + */ static void qdr_connection_group_member_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn) { assert(conn->role == QDR_ROLE_INTER_ROUTER_DATA); - // - // Search for the correlator in the maskbit index. - // If found, get the parent connection and remove this connection from the group. Reset the cursor. - // If not found, remove this connection from the unallocated group - // + const char *correlator = conn->connection_info->group_correlator; uint64_t ordinal = conn->connection_info->group_ordinal; - qdr_connection_t *parent = 0; - - qd_log(LOG_ROUTER_CORE, QD_LOG_DEBUG, "Connection group '%s' cleanup member [C%"PRIu64"] with ordinal=%"PRIu64, - correlator, conn->identity, ordinal); - assert(strnlen(correlator, QD_DISCRIMINATOR_SIZE) > 0); - for (int maskbit = 0; maskbit < qd_bitmask_width(); maskbit++) { - if (strncmp(core->group_correlator_by_maskbit[maskbit], correlator, QD_DISCRIMINATOR_SIZE) == 0) { - assert(core->rnode_conns_by_mask_bit[maskbit]); - if (core->rnode_conns_by_mask_bit[maskbit]->connection_info->group_ordinal == ordinal) { - parent = core->rnode_conns_by_mask_bit[maskbit]; - break; - } - } - } - if (!!parent) { - qd_log(LOG_ROUTER_CORE, QD_LOG_DEBUG, "Connection group '%s' removing member [C%"PRIu64"] from parent [C%"PRIu64"]", - correlator, conn->identity, parent->identity); - assert(strncmp(parent->connection_info->group_correlator, correlator, QD_DISCRIMINATOR_SIZE) == 0); - assert(parent->connection_info->group_ordinal == ordinal); - DEQ_REMOVE_N(GROUP, parent->connection_group, conn); - parent->group_cursor = DEQ_HEAD(parent->connection_group); - } else { - qdr_connection_t *ptr = DEQ_HEAD(core->unallocated_group_members); - while (!!ptr) { - qdr_connection_t *next = DEQ_NEXT_N(GROUP, ptr); - if (ptr == conn) { - qd_log(LOG_ROUTER_CORE, QD_LOG_DEBUG, "Connection group '%s' removing member [C%"PRIu64"] from unallocated", - correlator, conn->identity); - DEQ_REMOVE_N(GROUP, core->unallocated_group_members, ptr); - break; + if (!!correlator[0]) { + + qd_log(LOG_ROUTER_CORE, QD_LOG_DEBUG, "Connection group '%s' cleanup member [C%"PRIu64"] with ordinal=%"PRIu64, + correlator, conn->identity, ordinal); + + if (qd_bitmask_valid_bit_value(conn->group_parent_mask_bit)) { + qdr_connection_t *parent = core->rnode_conns_by_mask_bit[conn->group_parent_mask_bit]; + assert(!!parent); + assert(strcmp(parent->connection_info->group_correlator, correlator) == 0); + assert(parent->connection_info->group_ordinal == ordinal); + + qd_log(LOG_ROUTER_CORE, QD_LOG_DEBUG, "Connection group '%s' removing member [C%"PRIu64"] from parent [C%"PRIu64"]", + correlator, conn->identity, parent->identity); + DEQ_REMOVE_N(GROUP, parent->connection_group, conn); + parent->group_cursor = DEQ_HEAD(parent->connection_group); + conn->group_parent_mask_bit = -1; + + } else { + qdr_connection_t *ptr = DEQ_HEAD(core->unallocated_group_members); + while (!!ptr) { + qdr_connection_t *next = DEQ_NEXT_N(GROUP, ptr); + if (ptr == conn) { + qd_log(LOG_ROUTER_CORE, QD_LOG_DEBUG, "Connection group '%s' removing member [C%"PRIu64"] from unallocated", + correlator, conn->identity); + DEQ_REMOVE_N(GROUP, core->unallocated_group_members, ptr); + break; + } + ptr = next; } - ptr = next; } } } @@ -1653,50 +1709,7 @@ static void qdr_connection_opened_CT(qdr_core_t *core, qdr_action_t *action, boo } if (conn->role == QDR_ROLE_INTER_ROUTER) { - // - // Assign a unique mask-bit to this connection as a reference to be used by - // the router module - // - if (qd_bitmask_first_set(core->neighbor_free_mask, &conn->mask_bit)) { - qd_bitmask_clear_bit(core->neighbor_free_mask, conn->mask_bit); - assert(core->rnode_conns_by_mask_bit[conn->mask_bit] == 0); - core->rnode_conns_by_mask_bit[conn->mask_bit] = conn; - } else { - qd_log(LOG_ROUTER_CORE, QD_LOG_CRITICAL, "Exceeded maximum inter-router connection count"); - conn->role = QDR_ROLE_NORMAL; - break; - } - - if (!conn->incoming) { - // - // The connector-side of inter-router connections is responsible for setting up the - // inter-router links: Two (in and out) for control, 2 * QDR_N_PRIORITIES for - // routed-message transfer. - // - (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, QD_INCOMING, - qdr_terminus_router_control(), qdr_terminus_router_control(), - QD_SSN_ROUTER_CONTROL, QDR_MAX_PRIORITY); - (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, QD_OUTGOING, - qdr_terminus_router_control(), qdr_terminus_router_control(), - QD_SSN_ROUTER_CONTROL, QDR_MAX_PRIORITY); - STATIC_ASSERT((QD_SSN_ROUTER_DATA_PRI_9 - QD_SSN_ROUTER_DATA_PRI_0 + 1) == QDR_N_PRIORITIES, PRIORITY_SESSION_NOT_SAME); - - for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority) { - // a session is reserved for each priority link - qd_session_class_t sc = (qd_session_class_t)(QD_SSN_ROUTER_DATA_PRI_0 + priority); - (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER, QD_INCOMING, - qdr_terminus_router_data(), qdr_terminus_router_data(), - sc, priority); - (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER, QD_OUTGOING, - qdr_terminus_router_data(), qdr_terminus_router_data(), - sc, priority); - } - } - - // - // Set up any connection group associated with this inter-router connection. - // - qdr_connection_group_setup_CT(core, conn); + setup_inter_router_control_conn_CT(core, conn); } if (conn->role == QDR_ROLE_INTER_ROUTER_DATA) { @@ -1815,13 +1828,19 @@ static void qdr_connection_notify_closed_CT(qdr_core_t *core, qdr_action_t *acti } // - // Give back the router mask-bit. + // See if the router mask-bit can be freed. // if (conn->role == QDR_ROLE_INTER_ROUTER) { - assert(qd_bitmask_valid_bit_value(conn->mask_bit)); - qdr_reset_sheaf(core, conn->mask_bit); - qd_bitmask_set_bit(core->neighbor_free_mask, conn->mask_bit); - core->rnode_conns_by_mask_bit[conn->mask_bit] = 0; + qdr_reset_sheaf(conn); + if (qd_bitmask_valid_bit_value(conn->mask_bit)) { + if (conn == core->rnode_conns_by_mask_bit[conn->mask_bit]) + core->rnode_conns_by_mask_bit[conn->mask_bit] = 0; + if (conn == core->pending_rnode_conns_by_mask_bit[conn->mask_bit]) + core->pending_rnode_conns_by_mask_bit[conn->mask_bit] = 0; + if (!core->rnode_conns_by_mask_bit[conn->mask_bit] && !core->pending_rnode_conns_by_mask_bit[conn->mask_bit]) + qd_bitmask_set_bit(core->neighbor_free_mask, conn->mask_bit); + conn->mask_bit = -1; + } } // @@ -1890,25 +1909,116 @@ static void qdr_connection_notify_closed_CT(qdr_core_t *core, qdr_action_t *acti // -// Handle the attachment and detachment of an inter-router control link +// Handle the attachment of an inter-router control link. // static void qdr_attach_link_control_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link) { - if (conn->role == QDR_ROLE_INTER_ROUTER) { + assert(link->link_type == QD_LINK_CONTROL); + assert(conn->role == QDR_ROLE_INTER_ROUTER); + assert(conn->control_links[link->link_direction] == 0); + + qd_log(LOG_ROUTER_CORE, QD_LOG_DEBUG, + "[C%"PRIu64"] Attaching %s router control link [L%"PRIu64"]", + conn->identity, link->link_direction == QD_OUTGOING ? "outgoing" : "incoming", + link->identity); + + conn->control_links[link->link_direction] = link; + if (link->link_direction == QD_OUTGOING) { link->owning_addr = core->hello_addr; qdr_add_link_ref(&core->hello_addr->rlinks, link, QDR_LINK_LIST_CLASS_ADDRESS); - core->control_links_by_mask_bit[conn->mask_bit] = link; + } + + // + // Once both control links are active the control connection is ready for forwarding. + // + if (!!conn->control_links[QD_INCOMING] && !!conn->control_links[QD_OUTGOING]) { + // + // If this connection is pending upgrade we can now to the switchover. + // + if (qd_bitmask_valid_bit_value(conn->mask_bit) && core->pending_rnode_conns_by_mask_bit[conn->mask_bit] == conn) { + qdr_connection_t *old_conn = core->rnode_conns_by_mask_bit[conn->mask_bit]; + + if (old_conn) { + // + // To disable routing over the old connection we invalidate the mask_bit and teardown the + // connection group + // + qd_log(LOG_ROUTER_CORE, QD_LOG_DEBUG, + "[C%"PRIu64"] Upgrading to primary router control connection, downgrading [C%"PRIu64"]", + conn->identity, old_conn->identity); + assert(old_conn != conn); + + qdr_connection_group_cleanup_CT(core, old_conn); + old_conn->mask_bit = -1; + + // + // Shut down the control link on the old connection in order to force forwarding to ignore this + // connection. The connector-side router is responsible for creating and destroying the control links. + // + if (!old_conn->incoming) { + if (old_conn->control_links[QD_INCOMING]) { + qd_log(LOG_ROUTER_CORE, QD_LOG_DEBUG, + "[C%"PRIu64"] detaching incoming control link [L%"PRIu64"]", old_conn->identity, + old_conn->control_links[QD_INCOMING]->identity); + qdr_link_outbound_detach_CT(core, old_conn->control_links[QD_INCOMING], 0, QDR_CONDITION_NONE); + } + if (old_conn->control_links[QD_OUTGOING]) { + qd_log(LOG_ROUTER_CORE, QD_LOG_DEBUG, + "[C%"PRIu64"] detaching outgoing control link [L%"PRIu64"]", old_conn->identity, + old_conn->control_links[QD_OUTGOING]->identity); + qdr_link_outbound_detach_CT(core, old_conn->control_links[QD_OUTGOING], 0, QDR_CONDITION_NONE); + } + } + } else { + qd_log(LOG_ROUTER_CORE, QD_LOG_DEBUG, + "[C%"PRIu64"] Upgrading to primary router control connection", conn->identity); + } + + core->pending_rnode_conns_by_mask_bit[conn->mask_bit] = 0; + core->rnode_conns_by_mask_bit[conn->mask_bit] = conn; + } + + // + // Set up any connection group associated with this inter-router connection. + // + qdr_connection_group_setup_CT(core, conn); } } +// +// Handle the detachment of an inter-router control link. +// static void qdr_detach_link_control_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link) { - if (conn->role == QDR_ROLE_INTER_ROUTER) { + assert(link->link_type == QD_LINK_CONTROL); + assert(conn->role == QDR_ROLE_INTER_ROUTER); + + qd_log(LOG_ROUTER_CORE, QD_LOG_DEBUG, + "[C%"PRIu64"] Detaching %s router control link [L%"PRIu64"]", + conn->identity, link->link_direction == QD_OUTGOING ? "outgoing" : "incoming", + link->identity); + + conn->control_links[link->link_direction] = 0; + if (link->link_direction == QD_OUTGOING && !!link->owning_addr) { + assert(link->owning_addr == core->hello_addr); qdr_del_link_ref(&core->hello_addr->rlinks, link, QDR_LINK_LIST_CLASS_ADDRESS); link->owning_addr = 0; - core->control_links_by_mask_bit[conn->mask_bit] = 0; - qdr_post_link_lost_CT(core, conn->mask_bit); + + // + // Optimization: immediately notify the routing agent that this path is no longer present (rather than wait for + // heartbeat timeout). This should only be done if the connection is the active control connection and there's + // no pending connection to take over: + // + if (qd_bitmask_valid_bit_value(conn->mask_bit) && + core->rnode_conns_by_mask_bit[conn->mask_bit] == conn && + core->pending_rnode_conns_by_mask_bit[conn->mask_bit] == 0) { + + qd_log(LOG_ROUTER_CORE, QD_LOG_DEBUG, + "[C%"PRIu64"] Notify routing agent of loss of control link [L%"PRIu64"]", + conn->identity, link->identity); + qdr_post_link_lost_CT(core, conn->mask_bit); + } } } @@ -1918,16 +2028,18 @@ static void qdr_detach_link_control_CT(qdr_core_t *core, qdr_connection_t *conn, // static void qdr_attach_link_data_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link) { + assert(conn->role == QDR_ROLE_INTER_ROUTER); assert(link->link_type == QD_LINK_ROUTER); + // The first 2 x QDR_N_PRIORITIES (10) QDR_LINK_ROUTER links to attach over // the inter-router connection are the shared priority links. These links // are attached in priority order starting at zero. if (link->link_direction == QD_OUTGOING) { - int next_pri = core->data_links_by_mask_bit[conn->mask_bit].count; + int next_pri = conn->data_links.count; if (next_pri < QDR_N_PRIORITIES) { link->priority = next_pri; - core->data_links_by_mask_bit[conn->mask_bit].links[next_pri] = link; - core->data_links_by_mask_bit[conn->mask_bit].count += 1; + conn->data_links.link[next_pri] = link; + conn->data_links.count += 1; } } else { if (conn->next_pri < QDR_N_PRIORITIES) { @@ -1939,10 +2051,14 @@ static void qdr_attach_link_data_CT(qdr_core_t *core, qdr_connection_t *conn, qd static void qdr_detach_link_data_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link) { + assert(conn->role == QDR_ROLE_INTER_ROUTER); + assert(link->link_type == QD_LINK_ROUTER); + // if this link is in the priority sheaf it needs to be removed - if (conn->role == QDR_ROLE_INTER_ROUTER) - if (link == core->data_links_by_mask_bit[conn->mask_bit].links[link->priority]) - core->data_links_by_mask_bit[conn->mask_bit].links[link->priority] = 0; + if (link == conn->data_links.link[link->priority]) { + assert(link->link_direction == QD_OUTGOING); + conn->data_links.link[link->priority] = 0; + } } @@ -2096,7 +2212,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act // listeners for normal traffic but will not prevent routed-links from being established. // if (conn->role == QDR_ROLE_INTER_ROUTER && link->link_type == QD_LINK_ENDPOINT && - core->control_links_by_mask_bit[conn->mask_bit] == 0) { + conn->control_links[QD_OUTGOING] == 0) { qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_WRONG_ROLE); qdr_terminus_free(source); qdr_terminus_free(target); @@ -2154,8 +2270,12 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act case QD_LINK_ROUTER: qdr_attach_link_data_CT(core, conn, link); - // fallthrough + qdr_link_outbound_second_attach_CT(core, link, source, target); + qdr_link_issue_credit_CT(core, link, link->capacity, false); + break; + case QD_LINK_CONTROL: + qdr_attach_link_control_CT(core, conn, link); qdr_link_outbound_second_attach_CT(core, link, source, target); qdr_link_issue_credit_CT(core, link, link->capacity, false); break; @@ -2273,8 +2393,11 @@ static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t *ac case QD_LINK_ROUTER: qdr_attach_link_data_CT(core, conn, link); - // fallthrough + qdr_link_issue_credit_CT(core, link, link->capacity, false); + break; + case QD_LINK_CONTROL: + qdr_attach_link_control_CT(core, conn, link); qdr_link_issue_credit_CT(core, link, link->capacity, false); break; @@ -2375,6 +2498,7 @@ static void qdr_link_process_detach(qdr_core_t *core, qdr_link_t *link, qdr_erro break; case QD_LINK_CONTROL: + qdr_detach_link_control_CT(core, conn, link); break; case QD_LINK_ROUTER: @@ -2513,3 +2637,132 @@ static void qdr_link_processing_complete_CT(qdr_core_t *core, qdr_action_t *acti qdr_link_cleanup_CT(core, link->conn, link, "Link cleanup deferred after IO processing"); } + +/** Set up the inter-router trunk message links. + * + * The connector-side of inter-router connections (i.e. outgoing connection) is responsible for setting up the + * inter-router trunk links: Two (in and out) for control messages, 2 * QDR_N_PRIORITIES for non-streaming message + * transfer. + */ +static void create_inter_router_message_links(qdr_core_t *core, qdr_connection_t *conn) +{ + assert(conn->role == QDR_ROLE_INTER_ROUTER); + + if (!conn->incoming) { + (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, QD_INCOMING, + qdr_terminus_router_control(), qdr_terminus_router_control(), + QD_SSN_ROUTER_CONTROL, QDR_MAX_PRIORITY); + (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, QD_OUTGOING, + qdr_terminus_router_control(), qdr_terminus_router_control(), + QD_SSN_ROUTER_CONTROL, QDR_MAX_PRIORITY); + STATIC_ASSERT((QD_SSN_ROUTER_DATA_PRI_9 - QD_SSN_ROUTER_DATA_PRI_0 + 1) == QDR_N_PRIORITIES, PRIORITY_SESSION_NOT_SAME); + + for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority) { + // a session is reserved for each priority link + qd_session_class_t sc = (qd_session_class_t)(QD_SSN_ROUTER_DATA_PRI_0 + priority); + (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER, QD_INCOMING, + qdr_terminus_router_data(), qdr_terminus_router_data(), + sc, priority); + (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER, QD_OUTGOING, + qdr_terminus_router_data(), qdr_terminus_router_data(), + sc, priority); + } + } +} + + +static void setup_inter_router_control_conn_CT(qdr_core_t *core, qdr_connection_t *conn) +{ + assert(conn->role == QDR_ROLE_INTER_ROUTER); + + const char *correlator = conn->connection_info->group_correlator; + qdr_connection_t *old_conn = 0; + qdr_connection_t *old_pending = 0; + + // + // Connection group upgrade: see if there are other connections present that use the same group correlator. + // + if (!!correlator[0]) { + for (int idx = 0; idx < qd_bitmask_width(); ++idx) { + qdr_connection_t *tmp; + tmp = core->rnode_conns_by_mask_bit[idx]; + if (!!tmp && strcmp(correlator, tmp->connection_info->group_correlator) == 0) { + old_conn = tmp; + } + tmp = core->pending_rnode_conns_by_mask_bit[idx]; + if (!!tmp && strcmp(correlator, tmp->connection_info->group_correlator) == 0) { + old_pending = tmp; + } + } + } + + if (old_pending) { + // A pending connection will have higher precedence than the old_conn (because its overriding old_conn). Check + // if the new conn has higher precedence than pending and either replace pending or ignore the new conn + + if (conn->connection_info->group_ordinal > old_pending->connection_info->group_ordinal) { + // Replace old_pending + // + qd_log(LOG_ROUTER_CORE, QD_LOG_INFO, "Overriding existing pending connection [C%"PRIu64"] with [C%"PRIu64"]", + old_pending->identity, conn->identity); + + // Re-use the mask bit. This is necessary to prevent the router from recalculating routes. + conn->mask_bit = old_pending->mask_bit; + old_pending->mask_bit = -1; + core->pending_rnode_conns_by_mask_bit[conn->mask_bit] = conn; + create_inter_router_message_links(core, conn); + + old_pending->error = qdr_error(QD_AMQP_COND_CONNECTION_FORCED, "New inter-router connection takes precedence"); + qdr_close_connection_CT(core, old_pending); + + } else { + // The existing pending connection has higher precedence, keep it. + // + qd_log(LOG_ROUTER_CORE, QD_LOG_INFO, "Existing pending connection [C%"PRIu64"] takes precedence, closing [C%"PRIu64"]", + old_pending->identity, conn->identity); + conn->error = qdr_error(QD_AMQP_COND_CONNECTION_FORCED, "Existing pending connection takes precedence"); + qdr_close_connection_CT(core, conn); + } + + } else if (old_conn) { + // Does the new conn take precedence over the existing conn? If so make it pending. + // + if (conn->connection_info->group_ordinal > old_conn->connection_info->group_ordinal) { + // Bring up new conn to succeed old_conn + // + qd_log(LOG_ROUTER_CORE, QD_LOG_INFO, "Upgrading connection [C%"PRIu64"] to [C%"PRIu64"]", + old_conn->identity, conn->identity); + + // Re-use the mask bit. This is necessary to prevent the router from recalculating routes. + conn->mask_bit = old_conn->mask_bit; + core->pending_rnode_conns_by_mask_bit[conn->mask_bit] = conn; + create_inter_router_message_links(core, conn); + + // Once the new inter-router control links come up we replace the old_conn + + } else { + qd_log(LOG_ROUTER_CORE, QD_LOG_INFO, "Existing connection [C%"PRIu64"] takes precedence, closing [C%"PRIu64"]", + old_conn->identity, conn->identity); + conn->error = qdr_error(QD_AMQP_COND_CONNECTION_FORCED, "Existing connection takes precedence"); + qdr_close_connection_CT(core, conn); + } + } else { + // + // Not upgrading so simply setup the new connection. Assign a unique mask-bit to this connection as a reference + // to be used by the router module. + // + if (qd_bitmask_first_set(core->neighbor_free_mask, &conn->mask_bit)) { + qd_bitmask_clear_bit(core->neighbor_free_mask, conn->mask_bit); + assert(core->rnode_conns_by_mask_bit[conn->mask_bit] == 0); + core->rnode_conns_by_mask_bit[conn->mask_bit] = conn; + create_inter_router_message_links(core, conn); + + } else { + // Too many inter-router connections provisioned! + // + qd_log(LOG_ROUTER_CORE, QD_LOG_CRITICAL, "Exceeded maximum supported number of inter-router connections"); + conn->error = qdr_error(QD_AMQP_COND_CONNECTION_FORCED, "Too many inter-router connections"); + qdr_close_connection_CT(core, conn); + } + } +} diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c index 4019f29a9..67d6803a4 100644 --- a/src/router_core/forwarder.c +++ b/src/router_core/forwarder.c @@ -39,10 +39,11 @@ DEQ_DECLARE(qdr_forward_deliver_info_t, qdr_forward_deliver_info_list_t); ALLOC_DEFINE(qdr_forward_deliver_info_t); -// get the control link for a given inter-router connection +// get the outgoing control link for a given inter-router connection static inline qdr_link_t *peer_router_control_link(qdr_core_t *core, int conn_mask) { - return (conn_mask >= 0) ? core->control_links_by_mask_bit[conn_mask] : 0; + qdr_connection_t *conn = (conn_mask >= 0) ? core->rnode_conns_by_mask_bit[conn_mask] : 0; + return (!!conn) ? conn->control_links[QD_OUTGOING] : 0; } @@ -54,11 +55,15 @@ static inline qdr_link_t *peer_router_data_link(qdr_core_t *core, if (conn_mask < 0 || priority < 0) return 0; + qdr_connection_t *conn = core->rnode_conns_by_mask_bit[conn_mask]; + if (!conn) + return 0; + // Try to return the requested priority link, but if it does // not exist, return the closest one that is lower. qdr_link_t * link = 0; while (1) { - if ((link = core->data_links_by_mask_bit[conn_mask].links[priority])) + if ((link = conn->data_links.link[priority])) return link; if (-- priority < 0) return 0; diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c index a622b97e2..6632e3ed1 100644 --- a/src/router_core/route_tables.c +++ b/src/router_core/route_tables.c @@ -240,23 +240,18 @@ void qdr_route_table_setup_CT(qdr_core_t *core) core->neighbor_free_mask = qd_bitmask(1); - core->routers_by_mask_bit = NEW_PTR_ARRAY(qdr_node_t, qd_bitmask_width()); - core->control_links_by_mask_bit = NEW_PTR_ARRAY(qdr_link_t, qd_bitmask_width()); - core->rnode_conns_by_mask_bit = NEW_PTR_ARRAY(qdr_connection_t, qd_bitmask_width()); - core->data_links_by_mask_bit = NEW_ARRAY(qdr_priority_sheaf_t, qd_bitmask_width()); + core->routers_by_mask_bit = NEW_PTR_ARRAY(qdr_node_t, qd_bitmask_width()); + core->rnode_conns_by_mask_bit = NEW_PTR_ARRAY(qdr_connection_t, qd_bitmask_width()); + core->pending_rnode_conns_by_mask_bit = NEW_PTR_ARRAY(qdr_connection_t, qd_bitmask_width()); + core->group_correlator_by_maskbit = NEW_PTR_ARRAY(char, qd_bitmask_width()); DEQ_INIT(core->unallocated_group_members); - core->group_correlator_by_maskbit = NEW_PTR_ARRAY(char, qd_bitmask_width()); for (int idx = 0; idx < qd_bitmask_width(); idx++) { - core->routers_by_mask_bit[idx] = 0; - core->control_links_by_mask_bit[idx] = 0; - core->data_links_by_mask_bit[idx].count = 0; - core->rnode_conns_by_mask_bit[idx] = 0; - for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority) { - core->data_links_by_mask_bit[idx].links[priority] = 0; - } - core->group_correlator_by_maskbit[idx] = (char*) malloc(QD_DISCRIMINATOR_SIZE); - core->group_correlator_by_maskbit[idx][0] = '\0'; + core->routers_by_mask_bit[idx] = 0; + core->rnode_conns_by_mask_bit[idx] = 0; + core->pending_rnode_conns_by_mask_bit[idx] = 0; + core->group_correlator_by_maskbit[idx] = (char*) qd_malloc(QD_DISCRIMINATOR_SIZE); + core->group_correlator_by_maskbit[idx][0] = '\0'; } } } @@ -441,7 +436,13 @@ static void qdr_set_link_CT(qdr_core_t *core, qdr_action_t *action, bool discard return; } - if (core->control_links_by_mask_bit[conn_maskbit] == 0) { + qdr_connection_t *conn = core->rnode_conns_by_mask_bit[conn_maskbit]; + if (conn == 0) { + qd_log(LOG_ROUTER_CORE, QD_LOG_CRITICAL, "set_link: Invalid conn reference: %d", conn_maskbit); + return; + } + + if (conn->control_links[QD_OUTGOING] == 0) { qd_log(LOG_ROUTER_CORE, QD_LOG_CRITICAL, "set_link: Invalid link reference: %d", conn_maskbit); return; } diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c index e372631e9..987045bc8 100644 --- a/src/router_core/router_core.c +++ b/src/router_core/router_core.c @@ -351,11 +351,10 @@ void qdr_core_free(qdr_core_t *core) assert(DEQ_IS_EMPTY(core->action_list_background)); assert(DEQ_IS_EMPTY(core->streaming_connections)); - if (core->routers_by_mask_bit) free(core->routers_by_mask_bit); - if (core->control_links_by_mask_bit) free(core->control_links_by_mask_bit); - if (core->data_links_by_mask_bit) free(core->data_links_by_mask_bit); - if (core->neighbor_free_mask) qd_bitmask_free(core->neighbor_free_mask); - if (core->rnode_conns_by_mask_bit) free(core->rnode_conns_by_mask_bit); + if (core->routers_by_mask_bit) free(core->routers_by_mask_bit); + if (core->neighbor_free_mask) qd_bitmask_free(core->neighbor_free_mask); + if (core->rnode_conns_by_mask_bit) free(core->rnode_conns_by_mask_bit); + if (core->pending_rnode_conns_by_mask_bit) free(core->pending_rnode_conns_by_mask_bit); if (core->group_correlator_by_maskbit) { for (int idx = 0; idx < qd_bitmask_width(); idx++) { free(core->group_correlator_by_maskbit[idx]); @@ -1063,11 +1062,10 @@ uint64_t qdr_identifier(qdr_core_t* core) return id; } -void qdr_reset_sheaf(qdr_core_t *core, uint8_t n) + +void qdr_reset_sheaf(qdr_connection_t *conn) { - qdr_priority_sheaf_t *sheaf = core->data_links_by_mask_bit + n; - sheaf->count = 0; - memset(sheaf->links, 0, QDR_N_PRIORITIES * sizeof(void *)); + conn->data_links = (qdr_priority_sheaf_t) {0}; } diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index c9cf1fe9d..30fba1afe 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -635,6 +635,14 @@ void qdr_core_remove_address_config(qdr_core_t *core, qdr_address_config_t *addr bool qdr_is_addr_treatment_multicast(qdr_address_t *addr); const char *get_address_treatment_string(qd_address_treatment_t treatment); +// non-streaming inter-router links sorted by priority +// +typedef struct qdr_priority_sheaf_t { + qdr_link_t *link[QDR_N_PRIORITIES]; + int count; +} qdr_priority_sheaf_t; + + // // Connection Information // @@ -682,26 +690,29 @@ struct qdr_connection_t { bool closed; // This bit is used in the case where a client is trying to force close this connection. uint8_t next_pri; // for incoming inter-router data links qdr_connection_role_t role; - int inter_router_cost; qdr_conn_identifier_t *conn_id; qdr_conn_identifier_t *alt_conn_id; bool strip_annotations_in; bool strip_annotations_out; + bool enable_protocol_trace; // Has trace level logging been turned on for this connection. + bool has_streaming_links; ///< one or more of this connection's links are for streaming messages + int inter_router_cost; int link_capacity; - int mask_bit; ///< set only if inter-router connection + int mask_bit; ///< set only if inter-router control connection + int group_parent_mask_bit; ///< if inter-router data connection maskbit of group parent inter-router control conn qdr_connection_work_list_t work_list; sys_mutex_t work_lock; qdr_link_ref_list_t links; qdr_link_ref_list_t links_with_work[QDR_N_PRIORITIES]; qdr_connection_info_t *connection_info; void *user_context; /* Updated from IO thread, use work_lock */ + qdr_link_t *control_links[2]; // QD_LINK_CONTROL links [QD_INCOMING/QD_OUTGOING] (inter-router conn only) + qdr_priority_sheaf_t data_links; // links for non-streaming messages (by priority) (inter-router conn only) qd_conn_oper_status_t oper_status; qd_conn_admin_status_t admin_status; qdr_error_t *error; uint32_t conn_uptime; // Timestamp which can be used to calculate the number of seconds this connection has been up and running. uint32_t last_delivery_time; // Timestamp which can be used to calculate the number of seconds since the last delivery arrived on this connection. - bool enable_protocol_trace; // Has trace level logging been turned on for this connection. - bool has_streaming_links; ///< one or more of this connection's links are for streaming messages qdr_link_list_t streaming_link_pool; ///< pool of links available for streaming messages const qd_policy_spec_t *policy_spec; qdr_connection_list_t connection_group; ///< List of associated connection group members @@ -767,11 +778,6 @@ struct qdr_conn_identifier_t { qdr_auto_link_list_t auto_link_refs; }; -typedef struct qdr_priority_sheaf_t { - qdr_link_t *links[QDR_N_PRIORITIES]; - int count; -} qdr_priority_sheaf_t; - struct qdr_protocol_adaptor_t { DEQ_LINKS(qdr_protocol_adaptor_t); @@ -882,14 +888,13 @@ struct qdr_core_t { qdr_address_t *router_addr_T; qdr_address_t *routerma_addr_T; - qdr_node_list_t routers; ///< List of routers, in order of cost, from lowest to highest - qd_bitmask_t *neighbor_free_mask; ///< bits available for new conns (qd_connection_t->mask_bit values) - qdr_node_t **routers_by_mask_bit; ///< indexed by qdr_node_t->mask_bit - qdr_connection_t **rnode_conns_by_mask_bit; ///< inter-router conns indexed by conn->mask_bit - qdr_link_t **control_links_by_mask_bit; ///< indexed by qdr_node_t->link_mask_bit, qdr_connection_t->mask_bit - qdr_priority_sheaf_t *data_links_by_mask_bit; ///< indexed by qdr_node_t->link_mask_bit, qdr_connection_t->mask_bit - qdr_connection_list_t unallocated_group_members; ///< List of unallocated group members (i.e. before the group is given a maskbit) - char **group_correlator_by_maskbit; ///< Group correlator number indexed by conn->maskbit + qdr_node_list_t routers; ///< List of routers, in order of cost, from lowest to highest + qd_bitmask_t *neighbor_free_mask; ///< bits available for new conns (qd_connection_t->mask_bit values) + qdr_node_t **routers_by_mask_bit; ///< indexed by qdr_node_t->mask_bit + qdr_connection_t **rnode_conns_by_mask_bit; ///< inter-router conns indexed by conn->mask_bit + qdr_connection_t **pending_rnode_conns_by_mask_bit; ///< higher precedence inter-router conns pending upgrade [conn->mask_bit] + qdr_connection_list_t unallocated_group_members; ///< List of unallocated group members (i.e. before the group is given a maskbit) + char **group_correlator_by_maskbit; ///< Group correlator number indexed by conn->maskbit uint64_t cost_epoch; uint64_t next_tag; @@ -1061,10 +1066,9 @@ void qdr_core_timer_free_CT(qdr_core_t *core, qdr_core_timer_t *timer); * Clears the sheaf of priority links in a connection. * Call this when a connection is being closed, when the mask-bit * for that sheaf is being returned to the core for re-use. - * @param core Pointer to the core object returned by qd_core() - * @param n uint8_t index for the sheaf to be reset prior to re-use. + * @param conn Pointer to the connection owning the sheaf */ -void qdr_reset_sheaf(qdr_core_t *core, uint8_t n); +void qdr_reset_sheaf(qdr_connection_t *conn); /** * Run in an IO thread. diff --git a/tests/system_test.py b/tests/system_test.py index 63ef4eddb..f38d34ef8 100755 --- a/tests/system_test.py +++ b/tests/system_test.py @@ -1204,6 +1204,66 @@ def get_inter_router_conns(self): conns = self.management.query(type=CONNECTION_TYPE).get_dicts() return [c for c in conns if 'inter-router' in c['role']] + def get_inter_router_data_conns(self): + """ + Return a list of all inter-router-data connections present + """ + dconns = self.get_inter_router_conns() + return [c for c in dconns if c['role'] == 'inter-router-data'] + + def get_inter_router_control_conns(self): + """ + Return a list of all inter-router control connections present + """ + dconns = self.get_inter_router_conns() + return [c for c in dconns if c['role'] == 'inter-router'] + + def get_links_by_conn_id(self, connection_id): + """ + Return a list of all active AMQP links for the given connection + """ + links = self.management.query(type=ROUTER_LINK_TYPE).get_dicts() + return [link for link in links if link['connectionId'] == connection_id] + + def get_active_inter_router_data_links(self): + # Get the list of active inter-router-data links for the router. These + # are the dynamically provisioned links for passing streaming data + # between routers. For example there will be at least one streaming + # data link for every TCP session passing through the router. + ir_conns = self.get_inter_router_data_conns() + links = [] + for conn in ir_conns: + links.extend([link for link in self.get_links_by_conn_id(conn['identity']) + if link['linkType'] == 'endpoint' and + link['operStatus'] == 'up']) + return links + + def get_active_inter_router_control_links(self): + # Get the list of active inter-router control links for the + # router. These are the links that carry the inter-router Hello message + # traffic as well as priority-based non-streaming messages. + # There will be two for each inter-router path (1 incoming link, 1 + # outgoing link) + ir_conns = self.get_inter_router_control_conns() + links = [] + for conn in ir_conns: + links.extend([link for link in self.get_links_by_conn_id(conn['identity']) + if link['linkType'] == 'router-control' and + link['operStatus'] == 'up']) + return links + + def get_last_topology_change(self): + """ + Get the timestamp when this router last re-computed topology. Returns + None if the router is not interior. + """ + try: + node_state = self.management.read(type=ROUTER_NODE_TYPE, + identity=f"router.node/{self.config.router_id}") + return node_state["lastTopoChange"] + except: + return None + class NcatException(Exception): def __init__(self, error=None): diff --git a/tests/system_tests_cert_rotation.py b/tests/system_tests_cert_rotation.py index ffe1c701a..ac2aecec1 100644 --- a/tests/system_tests_cert_rotation.py +++ b/tests/system_tests_cert_rotation.py @@ -22,10 +22,12 @@ """ import time +from http1_tests import wait_tcp_listeners_up from system_test import TestCase, main_module, Qdrouterd, unittest, retry from system_test import CA_CERT, SSL_PROFILE_TYPE from system_test import CLIENT_CERTIFICATE, CLIENT_PRIVATE_KEY, CLIENT_PRIVATE_KEY_PASSWORD from system_test import SERVER_CERTIFICATE, SERVER_PRIVATE_KEY, SERVER_PRIVATE_KEY_PASSWORD +from tcp_streamer import TcpStreamerThread class InterRouterCertRotationTest(TestCase): @@ -47,10 +49,33 @@ def router(self, name, test_config, data_connection_count, **kwargs): return self.tester.qdrouterd(name, Qdrouterd.Config(config), **kwargs) def wait_inter_router_conns(self, router, count): + # Wait until the number of inter-router connections equals count ok = retry(lambda rtr=router, ct=count: len(rtr.get_inter_router_conns()) == ct) self.assertTrue(ok, f"Failed to get {count} i.r. conns: {router.get_inter_router_conns()}") + def wait_control_links(self, router, group_ordinal): + # This function is used after the oldestValidOrdinal is advanced in + # order to block until the proper inter-router control links are + # active. + # NOTE restriction: only works for a router with a single inter-router + # connector or listener. In other words it blocks until there are only + # 2 inter-router control links present with the given group_ordinal. + def _control_link_test(router, ordinal): + clinks = router.get_active_inter_router_control_links() + if len(clinks) != 2: + return False + cid = clinks[0]['connectionId'] + if cid != clinks[1]['connectionId']: + return False # not the same parent conn + cconns = router.get_inter_router_control_conns() + for conn in cconns: + if conn['identity'] == cid and conn['groupOrdinal'] == ordinal: + return True + return False + return retry(lambda rtr=router, ordinal=group_ordinal: + _control_link_test(rtr, ordinal)) + def test_01_ordinal_updates(self): """ Verify that ordinal updates create new inter-router connections. Verify @@ -86,6 +111,7 @@ def test_01_ordinal_updates(self): 'sslProfile': 'ConnectorSslProfile'})], data_conn_count, wait=True) router_C.wait_router_connected("RouterL") + router_L.wait_router_connected("RouterC") # get the number of active inter-router conns, verify count and tlsOrdinal are 0 self.wait_inter_router_conns(router_C, data_conn_count + 1) @@ -98,18 +124,21 @@ def test_01_ordinal_updates(self): attributes={'ordinal': 3}, name='ConnectorSslProfile') self.wait_inter_router_conns(router_C, 2 * (data_conn_count + 1)) + self.wait_inter_router_conns(router_L, 2 * (data_conn_count + 1)) # Update oldestValidOrdinal to 3. Expect the older connections with an # ordinal value of 0 to be deleted router_C.management.update(type=SSL_PROFILE_TYPE, attributes={'oldestValidOrdinal': 3}, name='ConnectorSslProfile') + self.wait_inter_router_conns(router_L, data_conn_count + 1) self.wait_inter_router_conns(router_C, data_conn_count + 1) - # Verify all tlsOrdinals are 3 + # Verify all group Ordinals are 3 (same as connector tlsOrdinal) irc = router_C.get_inter_router_conns() - self.assertEqual(data_conn_count + 1, - len([c for c in irc if c['tlsOrdinal'] == 3]), + irc.extend(router_L.get_inter_router_conns()) + self.assertEqual(2 * (data_conn_count + 1), + len([c for c in irc if c['groupOrdinal'] == 3]), f"Unexpected conns: {irc}") router_L.teardown() router_C.teardown() @@ -149,8 +178,9 @@ def test_02_drop_old(self): 'sslProfile': 'ConnectorSslProfile'})], data_conn_count, wait=True) router_C.wait_router_connected("RouterL") + router_L.wait_router_connected("RouterC") - # wait for the inter-router connections to come up + # wait for all the inter-router connections to come up self.wait_inter_router_conns(router_C, data_conn_count + 1) # update tlsOrdinal to 3 and wait for new conns to appear @@ -158,6 +188,7 @@ def test_02_drop_old(self): attributes={'ordinal': 3}, name='ConnectorSslProfile') self.wait_inter_router_conns(router_C, 2 * (data_conn_count + 1)) + self.wait_inter_router_conns(router_L, 2 * (data_conn_count + 1)) # Destroy router_L - this will cause all connections to drop router_L.teardown() @@ -178,14 +209,321 @@ def test_02_drop_old(self): 'sslProfile': 'ListenerSslProfile'})], data_conn_count, wait=True) router_C.wait_router_connected("RouterL2") + router_L.wait_router_connected("RouterC") # expect only those connectors with ordinal == 3 are restored self.wait_inter_router_conns(router_C, data_conn_count + 1) + self.wait_inter_router_conns(router_L, data_conn_count + 1) time.sleep(1.0) # ensure no extra conns come up irc = router_C.get_inter_router_conns() - self.assertEqual(data_conn_count + 1, len(irc), f"Wrong conns: {irc}") - self.assertEqual(0, len([c for c in irc if c['tlsOrdinal'] != 3]), - f"tlsOrdinals !=3: {irc}") + irc.extend(router_L.get_inter_router_conns()) + self.assertEqual(2 * (data_conn_count + 1), + len([c for c in irc if c['groupOrdinal'] == 3]), + f"Unexpected conns: {irc}") + + router_L.teardown() + router_C.teardown() + + def test_03_tcp_streams(self): + """ + Verify that existing TCP streams are not interrupted when new + inter-router connections are established. + + This test sets up several TCP streaming connections through two + routers. It then does a certificate rotation and verifies that the + streams have not failed. + + It then creates another set of TCP streaming connections. It verifies + that these streams are sent over the upgraded connections. + + Lastly it expires the original certificates and verifies that the first + set of streaming TCP sessions have been dropped. It also verifies that + the second set of streaming TCP sessions are still active. + """ + data_conn_count = 4 + inter_router_port = self.tester.get_port() + tcp_listener_port_1 = self.tester.get_port() + tcp_listener_port_2 = self.tester.get_port() + tcp_connector_port_1 = self.tester.get_port() + tcp_connector_port_2 = self.tester.get_port() + + router_L = self.router("RouterL", + [('sslProfile', {'name': 'ListenerSslProfile', + 'caCertFile': CA_CERT, + 'certFile': SERVER_CERTIFICATE, + 'privateKeyFile': SERVER_PRIVATE_KEY, + 'password': SERVER_PRIVATE_KEY_PASSWORD}), + ('listener', {'name': 'Listener01', + 'role': 'inter-router', + 'host': '0.0.0.0', + 'port': inter_router_port, + 'requireSsl': 'yes', + 'sslProfile': 'ListenerSslProfile'}), + ('tcpListener', {'name': 'tcpListener01', + 'address': 'tcp/streaming/1', + 'port': tcp_listener_port_1}), + ('tcpListener', {'name': 'tcpListener02', + 'address': 'tcp/streaming/2', + 'port': tcp_listener_port_2})], + data_conn_count, wait=False) + router_C = self.router("RouterC", + [('sslProfile', {'name': "ConnectorSslProfile", + 'ordinal': 0, + 'oldestValidOrdinal': 0, + 'caCertFile': CA_CERT, + 'certFile': CLIENT_CERTIFICATE, + 'privateKeyFile': CLIENT_PRIVATE_KEY, + 'password': CLIENT_PRIVATE_KEY_PASSWORD}), + ('connector', {'role': 'inter-router', + 'host': 'localhost', + 'port': inter_router_port, + 'verifyHostname': 'yes', + 'sslProfile': 'ConnectorSslProfile'}), + ('tcpConnector', {'name': 'tcpConnector01', + 'address': 'tcp/streaming/1', + 'host': 'localhost', + 'port': tcp_connector_port_1}), + ('tcpConnector', {'name': 'tcpConnector02', + 'address': 'tcp/streaming/2', + 'host': 'localhost', + 'port': tcp_connector_port_2})], + data_conn_count, wait=True) + router_C.wait_router_connected("RouterL") + router_L.wait_router_connected("RouterC") + + # wait for all the inter-router data connections and the TCP listener + # ports to come up + self.wait_inter_router_conns(router_L, data_conn_count + 1) + wait_tcp_listeners_up(router_L.addresses[0]) + + # Verify all inter-router conns on Router_C are based on the same + # tlsOrdinal, which is zero. + ir_conns = router_C.get_inter_router_conns() + for ir_conn in ir_conns: + self.assertEqual(0, ir_conn['tlsOrdinal']) + + # This test allows the certificate rotation to complete before expiring + # the old inter-router connections. Therefore we expect that the + # router's topology does not change during this test. Let the topology + # settle before continuting the test. Using the default flux_interval + # which should be "long enough" (fingers crossed) + flux_interval = 4.1 # wait a bit longer than the interval to prevent races + last_topo_C = router_C.get_last_topology_change() + last_topo_L = router_L.get_last_topology_change() + deadline = time.time() + flux_interval + while deadline > time.time(): # test will timeout on failure + time.sleep(0.1) + topo_C = router_C.get_last_topology_change() + topo_L = router_L.get_last_topology_change() + if topo_C != last_topo_C or topo_L != last_topo_L: + last_topo_C = topo_C + last_topo_L = topo_L + deadline = time.time() + flux_interval + + # start TCP streaming connections across the routers + tcp_streamer = TcpStreamerThread(client_addr=('localhost', tcp_listener_port_1), + server_addr=('0.0.0.0', tcp_connector_port_1), + client_count=10, poll_timeout=0.2) + + # Now wait until the streaming client have connected and traffic is + # being sent + ok = retry(lambda: tcp_streamer.active_clients == 10) + self.assertTrue(ok, f"Streaming clients failed {tcp_streamer.active_clients}") + begin_recv = tcp_streamer.bytes_received + ok = retry(lambda: tcp_streamer.bytes_received > begin_recv) + self.assertTrue(ok, f"Failed to stream data {tcp_streamer.bytes_received}") + + # Expect 2 streaming links per TCP flow (links are uni-directional) + self.assertEqual(20, len(router_L.get_active_inter_router_data_links()), + f"Failed to get 20 links: {router_L.get_active_inter_router_data_links()}") + + # Now rotate the certs: update tlsOrdinal to 3 + router_C.management.update(type=SSL_PROFILE_TYPE, + attributes={'ordinal': 3}, + name='ConnectorSslProfile') + + # wait until the new control links are active and all the data + # connections have established + ok = self.wait_control_links(router_C, 3) + self.assertTrue(ok, f"Bad control links: {router_C.get_active_inter_router_control_links()}") + ok = self.wait_control_links(router_L, 3) + self.assertTrue(ok, f"Bad control links: {router_L.get_active_inter_router_control_links()}") + self.wait_inter_router_conns(router_L, 2 * (data_conn_count + 1)) + + # verify that the streamer is still running and the streams are still passing traffic + begin_recv = tcp_streamer.bytes_received + ok = retry(lambda: tcp_streamer.bytes_received > begin_recv) + self.assertTrue(ok, f"Failed to stream data {tcp_streamer.bytes_received}") + self.assertTrue(tcp_streamer.is_alive, "Streamer has failed!") + + # Now create a new streamer. Its TCP flows should use the new + # inter-router-data links + new_tcp_streamer = TcpStreamerThread(client_addr=('localhost', tcp_listener_port_2), + server_addr=('0.0.0.0', tcp_connector_port_2), + client_count=4, poll_timeout=0.2) + ok = retry(lambda: new_tcp_streamer.active_clients == 4) + self.assertTrue(ok, f"Streaming clients failed {new_tcp_streamer.active_clients}") + begin_recv = new_tcp_streamer.bytes_received + ok = retry(lambda: new_tcp_streamer.bytes_received > begin_recv) + self.assertTrue(ok, f"Failed to stream data {new_tcp_streamer.bytes_received}") + + # Expect an additional 2 streaming links per TCP flow (links are uni-directional) + self.assertEqual(28, len(router_L.get_active_inter_router_data_links()), + f"Failed to get 28 links: {router_L.get_active_inter_router_data_links()}") + + # Now expire the old inter-router connections by setting the + # oldestValidOrdinal to 3. Expect the connections that carry the + # old streaming data to close. + router_C.management.update(type=SSL_PROFILE_TYPE, + attributes={'oldestValidOrdinal': 3}, + name='ConnectorSslProfile') + self.wait_inter_router_conns(router_C, data_conn_count + 1) + ok = retry(lambda: tcp_streamer.is_alive is False) + self.assertTrue(ok, "Failed to terminate the streamer") + tcp_streamer.join() + + # Verify that the new TCP flows are still actively passing data + self.assertEqual(4, new_tcp_streamer.active_clients, + f"New flows failed: {new_tcp_streamer.active_clients}") + begin_recv = new_tcp_streamer.bytes_received + ok = retry(lambda: new_tcp_streamer.bytes_received > begin_recv) + self.assertTrue(ok, f"Streaming data failed {new_tcp_streamer.bytes_received}") + new_tcp_streamer.join() + + # Verify that the remaining inter-router conns (both data and control) + # share the group ordinal value (currently the same as the + # connector-side tlsOrdinal - may change in the future) + ir_conns = router_C.get_inter_router_conns() + ir_conns.extend(router_L.get_inter_router_conns()) + for ir_conn in ir_conns: + self.assertEqual(3, ir_conn['groupOrdinal'], f"Wrong ordinal {ir_conn}") + + # Lastly check that neither router has seen a topology change: + self.assertEqual(last_topo_C, router_C.get_last_topology_change(), + "Unexpected topology change for RouterC") + self.assertEqual(last_topo_L, router_L.get_last_topology_change(), + "Unexpected topology change for RouterL") + + router_L.teardown() + router_C.teardown() + + def test_04_rotate_storm(self): + """ + Similar to test_03_tcp_streams but stresses the router by requesting + back to back rotations while creating new TCP streams. This test does + not wait for inter-router connections to settle before expiring them. + """ + data_conn_count = 4 + inter_router_port = self.tester.get_port() + tcp_listener_port_1 = self.tester.get_port() + tcp_listener_port_2 = self.tester.get_port() + tcp_connector_port_1 = self.tester.get_port() + tcp_connector_port_2 = self.tester.get_port() + + router_L = self.router("RouterL", + [('sslProfile', {'name': 'ListenerSslProfile', + 'caCertFile': CA_CERT, + 'certFile': SERVER_CERTIFICATE, + 'privateKeyFile': SERVER_PRIVATE_KEY, + 'password': SERVER_PRIVATE_KEY_PASSWORD}), + ('listener', {'name': 'Listener01', + 'role': 'inter-router', + 'host': '0.0.0.0', + 'port': inter_router_port, + 'requireSsl': 'yes', + 'sslProfile': 'ListenerSslProfile'}), + ('tcpListener', {'name': 'tcpListener01', + 'address': 'tcp/streaming/1', + 'port': tcp_listener_port_1}), + ('tcpListener', {'name': 'tcpListener02', + 'address': 'tcp/streaming/2', + 'port': tcp_listener_port_2})], + data_conn_count, wait=False) + router_C = self.router("RouterC", + [('sslProfile', {'name': "ConnectorSslProfile", + 'ordinal': 0, + 'oldestValidOrdinal': 0, + 'caCertFile': CA_CERT, + 'certFile': CLIENT_CERTIFICATE, + 'privateKeyFile': CLIENT_PRIVATE_KEY, + 'password': CLIENT_PRIVATE_KEY_PASSWORD}), + ('connector', {'role': 'inter-router', + 'host': 'localhost', + 'port': inter_router_port, + 'verifyHostname': 'yes', + 'sslProfile': 'ConnectorSslProfile'}), + ('tcpConnector', {'name': 'tcpConnector01', + 'address': 'tcp/streaming/1', + 'host': 'localhost', + 'port': tcp_connector_port_1}), + ('tcpConnector', {'name': 'tcpConnector02', + 'address': 'tcp/streaming/2', + 'host': 'localhost', + 'port': tcp_connector_port_2})], + data_conn_count, wait=True) + router_C.wait_router_connected("RouterL") + router_L.wait_router_connected("RouterC") + + # wait for all the inter-router connections and the TCP listener ports + # to come up + self.wait_inter_router_conns(router_L, data_conn_count + 1) + wait_tcp_listeners_up(router_L.addresses[0]) + + # start TCP streaming connections across the routers + tcp_streamer = TcpStreamerThread(client_addr=('localhost', tcp_listener_port_1), + server_addr=('0.0.0.0', tcp_connector_port_1), + client_count=20, poll_timeout=0.2) + + # do several back to back rotations while the connections are coming + # up. Do not wait for anything to stabilize between updates + + for tls_ordinal in range(1, 11): # max ordinal == 10 + router_C.management.update(type=SSL_PROFILE_TYPE, + attributes={'ordinal': tls_ordinal}, + name='ConnectorSslProfile') + + # Immediately teardown all new connections but the last one (ordinal == + # 10) + router_C.management.update(type=SSL_PROFILE_TYPE, + attributes={'oldestValidOrdinal': 10}, + name='ConnectorSslProfile') + + # Wait for the carnage to subside by waiting until the control links + # have all closed with the exception of two links for ordinal 10. + ok = self.wait_control_links(router_C, 10) + self.assertTrue(ok, f"Bad control links: {router_C.get_active_inter_router_control_links()}") + ok = self.wait_control_links(router_L, 10) + self.assertTrue(ok, f"Bad control links: {router_L.get_active_inter_router_control_links()}") + + # wait for all data conns to come up and verify all inter-router conns + # have the same group ordinal + self.wait_inter_router_conns(router_L, data_conn_count + 1) + self.wait_inter_router_conns(router_C, data_conn_count + 1) + ir_conns = router_C.get_inter_router_conns() + ir_conns.extend(router_L.get_inter_router_conns()) + for ir_conn in ir_conns: + self.assertEqual(10, ir_conn['groupOrdinal'], f"Wrong ordinal {ir_conn}") + + # This test aggressively tears down inter-router connections without + # waiting for them to complete connection to the peer. Therefore it is + # likely the routing path was momentarily lost. Ensure the routers are + # visible to each other before starting new flows: + router_C.wait_router_connected("RouterL") + router_L.wait_router_connected("RouterC") + + # Test the inter-router path by firing up more TCP client flows + new_tcp_streamer = TcpStreamerThread(client_addr=('localhost', tcp_listener_port_2), + server_addr=('0.0.0.0', tcp_connector_port_2), + client_count=4, poll_timeout=0.2) + ok = retry(lambda: new_tcp_streamer.active_clients == 4) + self.assertTrue(ok, f"Streaming clients failed {new_tcp_streamer.active_clients}") + begin_recv = new_tcp_streamer.bytes_received + ok = retry(lambda: new_tcp_streamer.bytes_received > begin_recv) + self.assertTrue(ok, f"Failed to stream data {new_tcp_streamer.bytes_received}") + + tcp_streamer.join() + new_tcp_streamer.join() + router_L.teardown() router_C.teardown() diff --git a/tests/tcp_streamer.py b/tests/tcp_streamer.py new file mode 100755 index 000000000..a6d48bbe5 --- /dev/null +++ b/tests/tcp_streamer.py @@ -0,0 +1,188 @@ +#!/usr/bin/env python3 +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import argparse +import logging +import selectors +import socket +import sys +from threading import Thread +from system_test import TIMEOUT + + +logger = logging.getLogger(__name__) +logging.basicConfig(level=logging.WARNING) + + +class TcpStreamer: + """ + A test tool that establishes N TCP connections across the router. Data is + continually passed across these links at a very slow rate. The intent of + this tool is to verify that TCP streams are preserved while router + operations are performed. + """ + def __init__(self, client_addr, server_addr, client_count, poll_timeout=1.0): + self.client_addr = client_addr + self.server_addr = server_addr + self.client_count = client_count # two sockets will be created per client + self.selector = selectors.DefaultSelector() + self.poll_timeout = poll_timeout + self.clients = [] + self._bytes_received = 0 + self._shutdown = False + logger.debug("TcpStreamer created host=%s listen=%s", + f"{self.client_addr[0]}:{self.client_addr[1]}", + f"{self.server_addr[0]}:{self.server_addr[1]}") + + def run(self): + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server: + server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + server.setblocking(True) + server.bind(self.server_addr) + server.listen() + self.selector.register(server, selectors.EVENT_READ, self.on_accept) + + for index in range(self.client_count): + client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + client.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + client.setblocking(True) + while True: + try: + client.connect(self.client_addr) + break + except (ConnectionRefusedError, ConnectionAbortedError): + pass + self.selector.register(client, selectors.EVENT_WRITE, self.on_event) + self.clients.append(client) + + logger.debug("TcpStreamer begin sending traffic") + while self.clients and not self._shutdown: + events = self.selector.select(timeout=self.poll_timeout) + if events: + for key, mask in events: + cb = key.data + cb(key.fileobj, mask) + else: + for client in self.clients: + client.send(b'!') + logger.debug("TcpStreamer end sending traffic") + + for client in self.clients: + self.selector.unregister(client) + client.shutdown(socket.SHUT_RDWR) + client.close() + self.selector.unregister(server) + self.selector.close() + + def on_accept(self, server, mask): + conn, addr = server.accept() + logger.debug("TcpStreamer accepting %s", str(addr)) + self.selector.register(conn, selectors.EVENT_READ | selectors.EVENT_WRITE, self.on_event) + self.clients.append(conn) + + def on_event(self, conn, mask): + if mask & selectors.EVENT_WRITE == selectors.EVENT_WRITE: + conn.send(b'?') + self.selector.modify(conn, selectors.EVENT_READ, self.on_event) + + if mask & selectors.EVENT_READ == selectors.EVENT_READ: + data = conn.recv(10) + if data: + self._bytes_received += len(data) + else: + logger.debug("TcpStreamer closing %s", str(conn.getsockname())) + self.selector.unregister(conn) + self.clients.remove(conn) + conn.close() + + @property + def bytes_received(self): + return self._bytes_received + + @property + def active_clients(self): + return len(self.clients) / 2 # two sockets per client + + def shutdown(self): + self._shutdown = True + + +class TcpStreamerThread: + """ + A wrapper for running the TCP streamer in a thread. + """ + def __init__(self, **kwargs): + self._streamer = TcpStreamer(**kwargs) + self._thread = Thread(target=self._run) + self._thread.daemon = True + self._thread.start() + + def _run(self): + self._streamer.run() + + def join(self, timeout=TIMEOUT): + self._streamer.shutdown() + self._thread.join(timeout) + if self._thread.is_alive(): + raise Exception("TcpStreamer failed to join!") + + @property + def bytes_received(self): + return self._streamer.bytes_received + + @property + def active_clients(self): + if self.is_alive is False: + return 0 + return self._streamer.active_clients + + @property + def is_alive(self): + return self._thread.is_alive() + + +def main(argv): + p = argparse.ArgumentParser() + p.add_argument('--host', help="Host to connect to [127.0.0.1:10000]", default='127.0.0.1:10000') + p.add_argument('--listen', help="Address to listen on [0.0.0.0:20000]", default='0.0.0.0:20000') + p.add_argument('--count', help="Total clients to create", default=1, type=int) + + del argv[0] + args = p.parse_args(argv) + + if args.host is None: + raise Exception("NO HOST") + if args.listen is None: + raise Exception("NO LISTEN ADDRESS") + + host_address = args.host.split(':') + listen_address = args.listen.split(':') + streamer = TcpStreamer((host_address[0], int(host_address[1])), + (listen_address[0], int(listen_address[1])), + args.count) + try: + streamer.run() + except KeyboardInterrupt: + pass + return 0 + + +if __name__ == "__main__": + sys.exit(main(sys.argv)) From bf711c9116f552829d82bcbe6084f5f0644198c9 Mon Sep 17 00:00:00 2001 From: Kenneth Giusti Date: Fri, 21 Mar 2025 09:25:13 -0400 Subject: [PATCH 2/3] fixup: review feedback --- src/router_core/connections.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/router_core/connections.c b/src/router_core/connections.c index 58eae7125..f743f6556 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -1505,7 +1505,7 @@ static void qdr_connection_group_setup_CT(qdr_core_t *core, qdr_connection_t *co if (strcmp(core->group_correlator_by_maskbit[mask_bit], correlator) == 0) { qdr_connection_t *old_conn = core->rnode_conns_by_mask_bit[mask_bit]; qd_log(LOG_ROUTER_CORE, QD_LOG_ERROR, - "CGROUP duplicate group control connection detected: correlator=%s conns=[C%"PRIu64"],[C%"PRIu64"]", + "Connection group duplicate correlator detected: correlator=%s conns=[C%"PRIu64"],[C%"PRIu64"]", correlator, conn->identity, old_conn ? old_conn->identity : 0); if (old_conn) { // This is what is called a "hail mary pass" - google it - in an attempt to recover from this mess @@ -1933,7 +1933,7 @@ static void qdr_attach_link_control_CT(qdr_core_t *core, qdr_connection_t *conn, // if (!!conn->control_links[QD_INCOMING] && !!conn->control_links[QD_OUTGOING]) { // - // If this connection is pending upgrade we can now to the switchover. + // If this connection is pending upgrade we can now do the switchover. // if (qd_bitmask_valid_bit_value(conn->mask_bit) && core->pending_rnode_conns_by_mask_bit[conn->mask_bit] == conn) { qdr_connection_t *old_conn = core->rnode_conns_by_mask_bit[conn->mask_bit]; From 45fbb42af7f6587f83d456bf83628efe3005d36f Mon Sep 17 00:00:00 2001 From: Kenneth Giusti Date: Fri, 28 Mar 2025 11:52:51 -0400 Subject: [PATCH 3/3] fixup: document the connection upgrade implementation --- docs/notes/connection-upgrade.md | 177 +++++++++++++++++++++++++++++++ 1 file changed, 177 insertions(+) create mode 100644 docs/notes/connection-upgrade.md diff --git a/docs/notes/connection-upgrade.md b/docs/notes/connection-upgrade.md new file mode 100644 index 000000000..9e297312d --- /dev/null +++ b/docs/notes/connection-upgrade.md @@ -0,0 +1,177 @@ + + + + + + + + + + + + + + + + + +# Inter-Router Connection Upgrade + +## Introduction + +The router supports a mechanism that allows new inter-router +connections to replace existing connections without disturbing any +message flows in-flight. Once the new connections are established new +message flows will be routed over those new connections only. New +flows will never be forwarded over the older pre-existing connections. +Older connections are not torn down. They are preserved to allow older +message flows to continue without interruption. + +This capability is useful for those situations where the properties of +the inter-router connections need to be modified on the fly. For +example this allows an operator to update TLS credentials used for +inter-router connections without disturbing existing traffic. + +## Implementation + +It is a goal to preserve existing message flows during connection +upgrade. In order to do this the connection upgrade process should +avoid causing topology recomputations. Connection upgrade does not +require a topology change because there is no effective changes to the +routing path: the new connections are connecting to the same peer +router as the connections to be upgraded. + +This implementation avoids changing any routing state in the routing +engine. The connection upgrade process is done entirely within the +router core module. The router core basically hot swaps the +connections being upgraded in the forwarding data structures without +notifying the routing engine that the connections have changed. + +### Inter-Router Connector + +The inter-router connector is responsible for establishing +inter-router connections. A connector will open one connection of role +"inter-router" that serves as the control connection. The control +connection establishes the bi-directional control links that carry +HELLO routing protocol messages. In addition to the inter-router +control connection the connector will establish zero or more +connections of role "inter-router-data". The inter-router-data +connections are dedicated to forwarding streaming message flows. + +The connector labels all of its connections with a +"correlation-id". The correlation-id is used by the router core (on +both routers) to group all connections originating from the connector. +Since all of these connections are to the same peer router the core +considers the connection group a single "path" for routing purposes. + +The correlation-id is globally unique and does not change for the life +of the connector. + +### The Group Ordinal + +The connector also labels each connection with an integer value called +the "group ordinal". Initially, all connections in the group have the +same group ordinal value. + +However the group ordinal can be advanced by a management operation +(like certificate rotation). When the connector's group ordinal is +changed it triggers the connector to establish a new set of +connections. This new set of connections are labelled with the new +group ordinal value. + +Note that the group ordinal value can only be numerically +increased. In other words a group ordinal value will never be set to a +value less than its current value. + +The group ordinal is used by the connection upgrade process. During +connection upgrade the router core needs some way of determining that +a new connection replaces another existing connection. To do this the +router core compairs the group ordinal values associated with the +connections. + +The group ordinal effectively becomes the connections precedence. New +message flows are always routed over those connections with the +numerically highest group ordinal values within the connection +group. Those connections with lower group ordinal values remain up, +but no longer receive new message flows. + +### The Upgrade Process + +When an event occurs that requires inter-router connection upgrade - +such as certificate rotation - the connector's group ordinal value is +increased and the connector opens a new set of inter-router control +and data connections. These new connections are labelled with the +latest (and numerically greatest) group ordinal value. + +The router core is notified when these connections activate. The core +uses the correlation-id to determine which group the connection +belongs to. The core then checks the new connection's group +ordinal. If the new connection's ordinal is greater than the ordinals +of other connections in the group the core begins transitioning the +forwarder to the new higher valued connections. + +The core cannot perform the transition until: + +* a new higher-ordinal connection with the role of inter-router control has opened +* the control links on the new inter-router control connection have attached + +Until those two events happen the core will store all new higher +ordinal inter-router connections in a pending state. + +On the connector-side router when a new higher-ordinal control +connection has opened the core will initiate all AMQP links carried by +a control connection. This includes the control links and all +priority-based non-streaming message links. + +Once the core is notified that both control links are attached the +core performs the upgrade: + +* The new control connection uses the same "mask-bit" link identifer as the old control connection. +* The new control connection becomes the group "parent", replacing the old control connection. +* The new control connection replaces the old control connection in the core's next-hop forwarding table. +* The control links on the old control connection are detached. + +All new inter-router data connections with the same group ordinal as +the parent control connection are added to the parent's data +connection list. Since these connections are created asynchronously +their addition to the parent may happen after the upgrade process +completes. + +At this point the upgrade process is complete. New forwarding requests +will find the highest ordinal connection in the next hop forwarding +table and be forwarded across these connection. + +### Extra Credit: The Inter-router Link Identifier + +The link identifer - often referred in the code as the "mask bit" - is +used to identify a logical path between the current router and a +next-hop peer. In this case the term "link" has no relationship to an +AMQP link. For routing purposes the term "link" means the data path +between two entities. In the case of the router the link identifier +"identifies" the concrete connection group that comprises the message +flow between two routers. + +The term "mask bit" relates to an implementation detail: the router +code manages the pool of free link identifiers using a bitmask +utility. + +The link identifier serves as an abstraction for the routing +engine. Rather than having the routing engine deal with the complexity +of managing multiple connections to the same next hop (the connection +group), the router simply uses the link identifier to represent the +group as a whole. This is valid since all connections in the group +terminate at the same peer router. + +Therefore all path-related state shared between the routing engine and +any other part of the core (like the forwarder) use only the link +identifier. + +This is how the connection upgrade process can adopt a new set of +connections between routers without triggering routing topology +re-computation. The new control connection does not get a new link +identifer. Instead it takes the link identifer from the old control +connection that it replaces. This means that the routing engine sees +no change of state for that link and does not trigger a routing +topology update. + +