Skip to content

Commit bef72e2

Browse files
committed
Issue #1748: Implement inter-router connection group upgrade
1 parent f026e00 commit bef72e2

File tree

9 files changed

+719
-191
lines changed

9 files changed

+719
-191
lines changed

python/skupper_router/management/skrouter.json

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1437,6 +1437,10 @@
14371437
"graph": true,
14381438
"description": "The total number of deliveries that have traversed this link."
14391439
},
1440+
"connectionId": {
1441+
"type": "integer",
1442+
"description": "The identity of the parent connection."
1443+
},
14401444
"presettledCount": {
14411445
"type": "integer",
14421446
"graph": true,

src/router_core/agent_connection.c

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -101,6 +101,7 @@ const char *qdr_connection_columns[] =
101101
"meshId",
102102
"tlsOrdinal",
103103
"groupCorrelationId",
104+
"groupOrdinal",
104105
0};
105106

106107
const char *CONNECTION_TYPE = "io.skupper.router.connection";

src/router_core/connections.c

Lines changed: 366 additions & 143 deletions
Large diffs are not rendered by default.

src/router_core/forwarder.c

Lines changed: 8 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -39,10 +39,11 @@ DEQ_DECLARE(qdr_forward_deliver_info_t, qdr_forward_deliver_info_list_t);
3939
ALLOC_DEFINE(qdr_forward_deliver_info_t);
4040

4141

42-
// get the control link for a given inter-router connection
42+
// get the outgoing control link for a given inter-router connection
4343
static inline qdr_link_t *peer_router_control_link(qdr_core_t *core, int conn_mask)
4444
{
45-
return (conn_mask >= 0) ? core->control_links_by_mask_bit[conn_mask] : 0;
45+
qdr_connection_t *conn = (conn_mask >= 0) ? core->rnode_conns_by_mask_bit[conn_mask] : 0;
46+
return (!!conn) ? conn->control_links[QD_OUTGOING] : 0;
4647
}
4748

4849

@@ -54,11 +55,15 @@ static inline qdr_link_t *peer_router_data_link(qdr_core_t *core,
5455
if (conn_mask < 0 || priority < 0)
5556
return 0;
5657

58+
qdr_connection_t *conn = core->rnode_conns_by_mask_bit[conn_mask];
59+
if (!conn)
60+
return 0;
61+
5762
// Try to return the requested priority link, but if it does
5863
// not exist, return the closest one that is lower.
5964
qdr_link_t * link = 0;
6065
while (1) {
61-
if ((link = core->data_links_by_mask_bit[conn_mask].links[priority]))
66+
if ((link = conn->data_links.link[priority]))
6267
return link;
6368
if (-- priority < 0)
6469
return 0;

src/router_core/route_tables.c

Lines changed: 16 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -240,23 +240,18 @@ void qdr_route_table_setup_CT(qdr_core_t *core)
240240

241241
core->neighbor_free_mask = qd_bitmask(1);
242242

243-
core->routers_by_mask_bit = NEW_PTR_ARRAY(qdr_node_t, qd_bitmask_width());
244-
core->control_links_by_mask_bit = NEW_PTR_ARRAY(qdr_link_t, qd_bitmask_width());
245-
core->rnode_conns_by_mask_bit = NEW_PTR_ARRAY(qdr_connection_t, qd_bitmask_width());
246-
core->data_links_by_mask_bit = NEW_ARRAY(qdr_priority_sheaf_t, qd_bitmask_width());
243+
core->routers_by_mask_bit = NEW_PTR_ARRAY(qdr_node_t, qd_bitmask_width());
244+
core->rnode_conns_by_mask_bit = NEW_PTR_ARRAY(qdr_connection_t, qd_bitmask_width());
245+
core->pending_rnode_conns_by_mask_bit = NEW_PTR_ARRAY(qdr_connection_t, qd_bitmask_width());
246+
core->group_correlator_by_maskbit = NEW_PTR_ARRAY(char, qd_bitmask_width());
247247
DEQ_INIT(core->unallocated_group_members);
248-
core->group_correlator_by_maskbit = NEW_PTR_ARRAY(char, qd_bitmask_width());
249248

250249
for (int idx = 0; idx < qd_bitmask_width(); idx++) {
251-
core->routers_by_mask_bit[idx] = 0;
252-
core->control_links_by_mask_bit[idx] = 0;
253-
core->data_links_by_mask_bit[idx].count = 0;
254-
core->rnode_conns_by_mask_bit[idx] = 0;
255-
for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority) {
256-
core->data_links_by_mask_bit[idx].links[priority] = 0;
257-
}
258-
core->group_correlator_by_maskbit[idx] = (char*) malloc(QD_DISCRIMINATOR_SIZE);
259-
core->group_correlator_by_maskbit[idx][0] = '\0';
250+
core->routers_by_mask_bit[idx] = 0;
251+
core->rnode_conns_by_mask_bit[idx] = 0;
252+
core->pending_rnode_conns_by_mask_bit[idx] = 0;
253+
core->group_correlator_by_maskbit[idx] = (char*) qd_malloc(QD_DISCRIMINATOR_SIZE);
254+
core->group_correlator_by_maskbit[idx][0] = '\0';
260255
}
261256
}
262257
}
@@ -441,7 +436,13 @@ static void qdr_set_link_CT(qdr_core_t *core, qdr_action_t *action, bool discard
441436
return;
442437
}
443438

444-
if (core->control_links_by_mask_bit[conn_maskbit] == 0) {
439+
qdr_connection_t *conn = core->rnode_conns_by_mask_bit[conn_maskbit];
440+
if (conn == 0) {
441+
qd_log(LOG_ROUTER_CORE, QD_LOG_CRITICAL, "set_link: Invalid conn reference: %d", conn_maskbit);
442+
return;
443+
}
444+
445+
if (conn->control_links[QD_OUTGOING] == 0) {
445446
qd_log(LOG_ROUTER_CORE, QD_LOG_CRITICAL, "set_link: Invalid link reference: %d", conn_maskbit);
446447
return;
447448
}

src/router_core/router_core.c

Lines changed: 7 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -351,11 +351,10 @@ void qdr_core_free(qdr_core_t *core)
351351
assert(DEQ_IS_EMPTY(core->action_list_background));
352352
assert(DEQ_IS_EMPTY(core->streaming_connections));
353353

354-
if (core->routers_by_mask_bit) free(core->routers_by_mask_bit);
355-
if (core->control_links_by_mask_bit) free(core->control_links_by_mask_bit);
356-
if (core->data_links_by_mask_bit) free(core->data_links_by_mask_bit);
357-
if (core->neighbor_free_mask) qd_bitmask_free(core->neighbor_free_mask);
358-
if (core->rnode_conns_by_mask_bit) free(core->rnode_conns_by_mask_bit);
354+
if (core->routers_by_mask_bit) free(core->routers_by_mask_bit);
355+
if (core->neighbor_free_mask) qd_bitmask_free(core->neighbor_free_mask);
356+
if (core->rnode_conns_by_mask_bit) free(core->rnode_conns_by_mask_bit);
357+
if (core->pending_rnode_conns_by_mask_bit) free(core->pending_rnode_conns_by_mask_bit);
359358
if (core->group_correlator_by_maskbit) {
360359
for (int idx = 0; idx < qd_bitmask_width(); idx++) {
361360
free(core->group_correlator_by_maskbit[idx]);
@@ -1063,11 +1062,10 @@ uint64_t qdr_identifier(qdr_core_t* core)
10631062
return id;
10641063
}
10651064

1066-
void qdr_reset_sheaf(qdr_core_t *core, uint8_t n)
1065+
1066+
void qdr_reset_sheaf(qdr_connection_t *conn)
10671067
{
1068-
qdr_priority_sheaf_t *sheaf = core->data_links_by_mask_bit + n;
1069-
sheaf->count = 0;
1070-
memset(sheaf->links, 0, QDR_N_PRIORITIES * sizeof(void *));
1068+
conn->data_links = (qdr_priority_sheaf_t) {0};
10711069
}
10721070

10731071

src/router_core/router_core_private.h

Lines changed: 24 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -635,6 +635,14 @@ void qdr_core_remove_address_config(qdr_core_t *core, qdr_address_config_t *addr
635635
bool qdr_is_addr_treatment_multicast(qdr_address_t *addr);
636636
const char *get_address_treatment_string(qd_address_treatment_t treatment);
637637

638+
// non-streaming inter-router links sorted by priority
639+
//
640+
typedef struct qdr_priority_sheaf_t {
641+
qdr_link_t *link[QDR_N_PRIORITIES];
642+
int count;
643+
} qdr_priority_sheaf_t;
644+
645+
638646
//
639647
// Connection Information
640648
//
@@ -682,26 +690,29 @@ struct qdr_connection_t {
682690
bool closed; // This bit is used in the case where a client is trying to force close this connection.
683691
uint8_t next_pri; // for incoming inter-router data links
684692
qdr_connection_role_t role;
685-
int inter_router_cost;
686693
qdr_conn_identifier_t *conn_id;
687694
qdr_conn_identifier_t *alt_conn_id;
688695
bool strip_annotations_in;
689696
bool strip_annotations_out;
697+
bool enable_protocol_trace; // Has trace level logging been turned on for this connection.
698+
bool has_streaming_links; ///< one or more of this connection's links are for streaming messages
699+
int inter_router_cost;
690700
int link_capacity;
691-
int mask_bit; ///< set only if inter-router connection
701+
int mask_bit; ///< set only if inter-router control connection
702+
int group_parent_mask_bit; ///< if inter-router data connection maskbit of group parent inter-router control conn
692703
qdr_connection_work_list_t work_list;
693704
sys_mutex_t work_lock;
694705
qdr_link_ref_list_t links;
695706
qdr_link_ref_list_t links_with_work[QDR_N_PRIORITIES];
696707
qdr_connection_info_t *connection_info;
697708
void *user_context; /* Updated from IO thread, use work_lock */
709+
qdr_link_t *control_links[2]; // QD_LINK_CONTROL links [QD_INCOMING/QD_OUTGOING] (inter-router conn only)
710+
qdr_priority_sheaf_t data_links; // links for non-streaming messages (by priority) (inter-router conn only)
698711
qd_conn_oper_status_t oper_status;
699712
qd_conn_admin_status_t admin_status;
700713
qdr_error_t *error;
701714
uint32_t conn_uptime; // Timestamp which can be used to calculate the number of seconds this connection has been up and running.
702715
uint32_t last_delivery_time; // Timestamp which can be used to calculate the number of seconds since the last delivery arrived on this connection.
703-
bool enable_protocol_trace; // Has trace level logging been turned on for this connection.
704-
bool has_streaming_links; ///< one or more of this connection's links are for streaming messages
705716
qdr_link_list_t streaming_link_pool; ///< pool of links available for streaming messages
706717
const qd_policy_spec_t *policy_spec;
707718
qdr_connection_list_t connection_group; ///< List of associated connection group members
@@ -767,11 +778,6 @@ struct qdr_conn_identifier_t {
767778
qdr_auto_link_list_t auto_link_refs;
768779
};
769780

770-
typedef struct qdr_priority_sheaf_t {
771-
qdr_link_t *links[QDR_N_PRIORITIES];
772-
int count;
773-
} qdr_priority_sheaf_t;
774-
775781

776782
struct qdr_protocol_adaptor_t {
777783
DEQ_LINKS(qdr_protocol_adaptor_t);
@@ -882,14 +888,13 @@ struct qdr_core_t {
882888
qdr_address_t *router_addr_T;
883889
qdr_address_t *routerma_addr_T;
884890

885-
qdr_node_list_t routers; ///< List of routers, in order of cost, from lowest to highest
886-
qd_bitmask_t *neighbor_free_mask; ///< bits available for new conns (qd_connection_t->mask_bit values)
887-
qdr_node_t **routers_by_mask_bit; ///< indexed by qdr_node_t->mask_bit
888-
qdr_connection_t **rnode_conns_by_mask_bit; ///< inter-router conns indexed by conn->mask_bit
889-
qdr_link_t **control_links_by_mask_bit; ///< indexed by qdr_node_t->link_mask_bit, qdr_connection_t->mask_bit
890-
qdr_priority_sheaf_t *data_links_by_mask_bit; ///< indexed by qdr_node_t->link_mask_bit, qdr_connection_t->mask_bit
891-
qdr_connection_list_t unallocated_group_members; ///< List of unallocated group members (i.e. before the group is given a maskbit)
892-
char **group_correlator_by_maskbit; ///< Group correlator number indexed by conn->maskbit
891+
qdr_node_list_t routers; ///< List of routers, in order of cost, from lowest to highest
892+
qd_bitmask_t *neighbor_free_mask; ///< bits available for new conns (qd_connection_t->mask_bit values)
893+
qdr_node_t **routers_by_mask_bit; ///< indexed by qdr_node_t->mask_bit
894+
qdr_connection_t **rnode_conns_by_mask_bit; ///< inter-router conns indexed by conn->mask_bit
895+
qdr_connection_t **pending_rnode_conns_by_mask_bit; ///< higher precedence inter-router conns pending upgrade [conn->mask_bit]
896+
qdr_connection_list_t unallocated_group_members; ///< List of unallocated group members (i.e. before the group is given a maskbit)
897+
char **group_correlator_by_maskbit; ///< Group correlator number indexed by conn->maskbit
893898
uint64_t cost_epoch;
894899

895900
uint64_t next_tag;
@@ -1061,10 +1066,9 @@ void qdr_core_timer_free_CT(qdr_core_t *core, qdr_core_timer_t *timer);
10611066
* Clears the sheaf of priority links in a connection.
10621067
* Call this when a connection is being closed, when the mask-bit
10631068
* for that sheaf is being returned to the core for re-use.
1064-
* @param core Pointer to the core object returned by qd_core()
1065-
* @param n uint8_t index for the sheaf to be reset prior to re-use.
1069+
* @param conn Pointer to the connection owning the sheaf
10661070
*/
1067-
void qdr_reset_sheaf(qdr_core_t *core, uint8_t n);
1071+
void qdr_reset_sheaf(qdr_connection_t *conn);
10681072

10691073
/**
10701074
* Run in an IO thread.

tests/system_tests_cert_rotation.py

Lines changed: 108 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -23,9 +23,10 @@
2323

2424
import time
2525
from system_test import TestCase, main_module, Qdrouterd, unittest, retry
26-
from system_test import CA_CERT, SSL_PROFILE_TYPE, CONNECTION_TYPE
26+
from system_test import CA_CERT, SSL_PROFILE_TYPE, CONNECTION_TYPE, ROUTER_LINK_TYPE
2727
from system_test import CLIENT_CERTIFICATE, CLIENT_PRIVATE_KEY, CLIENT_PRIVATE_KEY_PASSWORD
2828
from system_test import SERVER_CERTIFICATE, SERVER_PRIVATE_KEY, SERVER_PRIVATE_KEY_PASSWORD
29+
from tcp_streamer import TcpStreamerThread
2930

3031

3132
class InterRouterCertRotationTest(TestCase):
@@ -56,6 +57,23 @@ def wait_inter_router_conns(self, router, count):
5657
len(self.get_inter_router_conns(rtr)) == ct)
5758
self.assertTrue(ok, f"Failed to get {count} i.r. conns: {self.get_inter_router_conns(router)}")
5859

60+
def get_inter_router_data_conns(self, router):
61+
dconns = self.get_inter_router_conns(router)
62+
return [c for c in dconns if c['role'] == 'inter-router-data']
63+
64+
def get_links_by_conn_id(self, router, conn_id):
65+
mgmt = router.management
66+
links = mgmt.query(type=ROUTER_LINK_TYPE).get_dicts()
67+
return [link for link in links if link['connectionId'] == conn_id]
68+
69+
def get_streaming_data_links(self, router):
70+
ir_conns = self.get_inter_router_data_conns(router)
71+
links = []
72+
for conn in ir_conns:
73+
links.extend([link for link in self.get_links_by_conn_id(router, conn['identity'])
74+
if link['linkType'] == 'endpoint'])
75+
return links
76+
5977
def test_01_ordinal_updates(self):
6078
"""
6179
Verify that ordinal updates create new inter-router connections. Verify
@@ -194,6 +212,95 @@ def test_02_drop_old(self):
194212
router_L.teardown()
195213
router_C.teardown()
196214

215+
def test_03_tcp_streams(self):
216+
"""
217+
Verify that existing TCP streams are not interrupted when new
218+
inter-router connections are established.
219+
"""
220+
data_conn_count = 4
221+
inter_router_port = self.tester.get_port()
222+
tcp_listener_port = self.tester.get_port()
223+
tcp_connector_port = self.tester.get_port()
224+
225+
router_L = self.router("RouterL",
226+
[('sslProfile', {'name': 'ListenerSslProfile',
227+
'caCertFile': CA_CERT,
228+
'certFile': SERVER_CERTIFICATE,
229+
'privateKeyFile': SERVER_PRIVATE_KEY,
230+
'password': SERVER_PRIVATE_KEY_PASSWORD}),
231+
('listener', {'name': 'Listener01',
232+
'role': 'inter-router',
233+
'host': '0.0.0.0',
234+
'port': inter_router_port,
235+
'requireSsl': 'yes',
236+
'sslProfile': 'ListenerSslProfile'}),
237+
('tcpListener', {'name': 'tcpListener01',
238+
'address': 'tcp/streaming',
239+
'port': tcp_listener_port})],
240+
data_conn_count, wait=False)
241+
router_C = self.router("RouterC",
242+
[('sslProfile', {'name': "ConnectorSslProfile",
243+
'ordinal': 0,
244+
'oldestValidOrdinal': 0,
245+
'caCertFile': CA_CERT,
246+
'certFile': CLIENT_CERTIFICATE,
247+
'privateKeyFile': CLIENT_PRIVATE_KEY,
248+
'password': CLIENT_PRIVATE_KEY_PASSWORD}),
249+
('connector', {'role': 'inter-router',
250+
'host': 'localhost',
251+
'port': inter_router_port,
252+
'verifyHostname': 'yes',
253+
'sslProfile': 'ConnectorSslProfile'}),
254+
('tcpConnector', {'name': 'tcpConnector01',
255+
'address': 'tcp/streaming',
256+
'host': 'localhost',
257+
'port': tcp_connector_port})],
258+
data_conn_count, wait=True)
259+
router_C.wait_router_connected("RouterL")
260+
261+
# wait for the inter-router connections to come up
262+
self.wait_inter_router_conns(router_C, data_conn_count + 1)
263+
264+
# start TCP streaming connections across the routers
265+
tcp_streamer = TcpStreamerThread(client_addr=('localhost', tcp_listener_port),
266+
server_addr=('0.0.0.0', tcp_connector_port),
267+
client_count=10, poll_timeout=0.2)
268+
269+
# Now wait until the streams have established (2 links per client) and
270+
# traffic is passing
271+
ok = retry(lambda rtr=router_C:
272+
len(self.get_streaming_data_links(rtr)) == 20)
273+
self.assertTrue(ok, f"Failed to get 20 links: {self.get_streaming_data_links(router_C)}")
274+
begin_recv = tcp_streamer.bytes_received
275+
ok = retry(lambda: tcp_streamer.bytes_received > begin_recv)
276+
self.assertTrue(ok, f"Failed to stream data {tcp_streamer.bytes_received}")
277+
278+
# Now rotate the certs
279+
# update tlsOrdinal to 3 and wait for new conns to appear
280+
router_C.management.update(type=SSL_PROFILE_TYPE,
281+
attributes={'ordinal': 3},
282+
name='ConnectorSslProfile')
283+
self.wait_inter_router_conns(router_C, 2 * (data_conn_count + 1))
284+
285+
# verify that the streamer is still running and the streams are still passing traffic
286+
begin_recv = tcp_streamer.bytes_received
287+
ok = retry(lambda: tcp_streamer.bytes_received > begin_recv)
288+
self.assertTrue(ok, f"Failed to stream data {tcp_streamer.bytes_received}")
289+
self.assertTrue(tcp_streamer.is_alive, "Streamer has failed!")
290+
291+
# Update oldestValidOrdinal to 3. Expect the connections that carry the
292+
# streaming data to close
293+
router_C.management.update(type=SSL_PROFILE_TYPE,
294+
attributes={'oldestValidOrdinal': 3},
295+
name='ConnectorSslProfile')
296+
self.wait_inter_router_conns(router_C, data_conn_count + 1)
297+
ok = retry(lambda: tcp_streamer.is_alive is False)
298+
self.assertTrue(ok, "Failed to terminate the streamer")
299+
300+
router_L.teardown()
301+
router_C.teardown()
302+
tcp_streamer.join()
303+
197304

198305
if __name__ == '__main__':
199306
unittest.main(main_module())

0 commit comments

Comments
 (0)