Skip to content

Commit 71e90b5

Browse files
authored
Fixes #1756: Remove qd_node_type_t struct and replaced its callbacks … (#1757)
* Fixes #1756: Remove qd_node_type_t struct and replaced its callbacks with direct calls to AMQP functions * Fixed indent of AMQP_link_closed_handler
1 parent 8d20e94 commit 71e90b5

File tree

6 files changed

+90
-167
lines changed

6 files changed

+90
-167
lines changed

docs/notes/overview.adoc

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -203,11 +203,7 @@ Interaction among:
203203

204204
* qd_server_t - proactor AMQP event batch handler
205205
* qd_container_t - dispatches events to AMQP callbacks
206-
** defines a set of callbacks to handler AMQP events
207-
*** qd_node_type_t in include/qpid/dispatch/container.h
208-
** callbacks are the interface to the dispatch library “application”
209206
* qd_router_t - router “application”
210-
** registers callbacks with the container via a “qd_node_type_t”
211207
** see the router_node structure in router_node.c
212208

213209
== Data plane: Proactor event handling

include/qpid/dispatch/amqp_adaptor.h

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ typedef struct pn_link_t pn_link_t;
3535
typedef struct qd_connection_t qd_connection_t;
3636
typedef struct qd_link_t qd_link_t;
3737
typedef struct qd_session_t qd_session_t;
38-
38+
typedef struct qd_router_t qd_router_t;
3939

4040
// For use by message.c
4141

@@ -53,4 +53,57 @@ size_t qd_session_get_outgoing_capacity(const qd_session_t *qd_ssn);
5353
// Used by the log module
5454
void qd_amqp_connection_set_tracing(bool enabled);
5555

56+
57+
//
58+
// AMQP handlers
59+
//
60+
/**
61+
* Invoked when an attach for a new outgoing (sending) link is received.
62+
*/
63+
int AMQP_outgoing_link_handler(qd_router_t *router, qd_link_t *link);
64+
/**
65+
* Invoked when a new or existing received delivery is available for processing.
66+
*/
67+
bool AMQP_rx_handler(qd_router_t *router, qd_link_t *link);
68+
/**
69+
* Invoked when an existing delivery changes disposition or settlement state.
70+
*/
71+
void AMQP_disposition_handler(qd_router_t *router, qd_link_t *link, pn_delivery_t *pnd);
72+
/**
73+
* Invoked when an attach for a new incoming (receiving) link is received.
74+
*/
75+
int AMQP_incoming_link_handler(qd_router_t *router, qd_link_t *link);
76+
/**
77+
* Invoked when a link we created was opened by the peer
78+
*/
79+
int AMQP_link_attach_handler(qd_router_t *router, qd_link_t *link);
80+
/**
81+
* Invoked when link detached is received.
82+
*/
83+
int AMQP_link_detach_handler(qd_router_t *router, qd_link_t *link);
84+
/**
85+
* Invoked when an incoming connection (via listener) is opened.
86+
*/
87+
int AMQP_inbound_opened_handler(qd_router_t *router, qd_connection_t *conn, void *context);
88+
/**
89+
* Invoked when an outgoing connection (via connector) is opened.
90+
*/
91+
int AMQP_outbound_opened_handler(qd_router_t *router, qd_connection_t *conn, void *context);
92+
/**
93+
* Invoked when a connection is closed.
94+
*/
95+
int AMQP_closed_handler(qd_router_t *router, qd_connection_t *conn, void *context);
96+
/**
97+
* Invoked when an activated connection is available for writing.
98+
*/
99+
int AMQP_conn_wake_handler(qd_router_t *router, qd_connection_t *conn, void *context);
100+
/** The last callback issued for the given qd_link_t. The adaptor must clean up all state related to the qd_link_t
101+
* as it will be freed on return from this call. The forced flag is set to true if the link is being forced closed
102+
* due to the parent connection/session closing or on shutdown.
103+
*/
104+
void AMQP_link_closed_handler(qd_router_t *router, qd_link_t *qd_link, bool forced);
105+
/**
106+
* Invoked when a link receives a flow event
107+
*/
108+
int AMQP_link_flow_handler(qd_router_t *router, qd_link_t *link);
56109
#endif

src/adaptors/amqp/amqp_adaptor.c

Lines changed: 13 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
#include "qd_connection.h"
2323
#include "qd_listener.h"
2424
#include "container.h"
25-
#include "node_type.h"
2625

2726
#include "delivery.h"
2827
#include "dispatch_private.h"
@@ -331,7 +330,7 @@ static void clear_producer_activation(qdr_core_t *core, qdr_delivery_t *delivery
331330
}
332331

333332

334-
static int AMQP_conn_wake_handler(qd_router_t *router, qd_connection_t *conn, void *context)
333+
int AMQP_conn_wake_handler(qd_router_t *router, qd_connection_t *conn, void *context)
335334
{
336335
qdr_connection_t *qconn = (qdr_connection_t*) qd_connection_get_context(conn);
337336
int result = 0;
@@ -536,7 +535,7 @@ static void log_link_message(qd_connection_t *conn, pn_link_t *pn_link, qd_messa
536535
* ready for rx processing. This will cause the container to immediately
537536
* re-call this function with the next delivery.
538537
*/
539-
static bool AMQP_rx_handler(qd_router_t *router, qd_link_t *link)
538+
bool AMQP_rx_handler(qd_router_t *router, qd_link_t *link)
540539
{
541540
pn_link_t *pn_link = qd_link_pn(link);
542541

@@ -1003,7 +1002,7 @@ static void deferred_AMQP_rx_handler(void *context, bool discard)
10031002
/**
10041003
* Delivery Disposition Handler
10051004
*/
1006-
static void AMQP_disposition_handler(qd_router_t *router, qd_link_t *link, pn_delivery_t *pnd)
1005+
void AMQP_disposition_handler(qd_router_t *router, qd_link_t *link, pn_delivery_t *pnd)
10071006
{
10081007
qdr_delivery_t *delivery = qdr_node_delivery_qdr_from_pn(pnd);
10091008

@@ -1047,7 +1046,7 @@ static void AMQP_disposition_handler(qd_router_t *router, qd_link_t *link, pn_de
10471046
/**
10481047
* New Incoming Link Handler
10491048
*/
1050-
static int AMQP_incoming_link_handler(qd_router_t *router, qd_link_t *link)
1049+
int AMQP_incoming_link_handler(qd_router_t *router, qd_link_t *link)
10511050
{
10521051
qd_connection_t *conn = qd_link_connection(link);
10531052
uint64_t link_id;
@@ -1080,7 +1079,7 @@ static int AMQP_incoming_link_handler(qd_router_t *router, qd_link_t *link)
10801079
/**
10811080
* New Outgoing Link Handler
10821081
*/
1083-
static int AMQP_outgoing_link_handler(qd_router_t *router, qd_link_t *link)
1082+
int AMQP_outgoing_link_handler(qd_router_t *router, qd_link_t *link)
10841083
{
10851084
qd_connection_t *conn = qd_link_connection(link);
10861085
uint64_t link_id;
@@ -1111,7 +1110,7 @@ static int AMQP_outgoing_link_handler(qd_router_t *router, qd_link_t *link)
11111110
/**
11121111
* Handler for remote opening of links that we initiated.
11131112
*/
1114-
static int AMQP_link_attach_handler(qd_router_t *router, qd_link_t *link)
1113+
int AMQP_link_attach_handler(qd_router_t *router, qd_link_t *link)
11151114
{
11161115
qdr_link_t *qlink = (qdr_link_t*) qd_link_get_context(link);
11171116
qdr_link_second_attach(qlink,
@@ -1126,7 +1125,7 @@ static int AMQP_link_attach_handler(qd_router_t *router, qd_link_t *link)
11261125
* Handler for flow events on links. Flow updates include session window
11271126
* state, which needs to be checked for unblocking Q3.
11281127
*/
1129-
static int AMQP_link_flow_handler(qd_router_t *router, qd_link_t *link)
1128+
int AMQP_link_flow_handler(qd_router_t *router, qd_link_t *link)
11301129
{
11311130
pn_link_t *pnlink = qd_link_pn(link);
11321131
qdr_link_t *rlink = (qdr_link_t*) qd_link_get_context(link);
@@ -1236,7 +1235,7 @@ static void drain_link(qd_router_t *router, qd_link_t *qd_link)
12361235
/**
12371236
* Link Detached Handler
12381237
*/
1239-
static int AMQP_link_detach_handler(qd_router_t *router, qd_link_t *link)
1238+
int AMQP_link_detach_handler(qd_router_t *router, qd_link_t *link)
12401239
{
12411240
assert(link);
12421241

@@ -1270,7 +1269,7 @@ static int AMQP_link_detach_handler(qd_router_t *router, qd_link_t *link)
12701269
* This is the last callback for the given link - the link will be freed on return from this call! Forced is true if the
12711270
* link has not properly closed (detach handshake completed).
12721271
*/
1273-
static void AMQP_link_closed_handler(qd_router_t *router, qd_link_t *qd_link, bool forced)
1272+
void AMQP_link_closed_handler(qd_router_t *router, qd_link_t *qd_link, bool forced)
12741273
{
12751274
assert(qd_link);
12761275

@@ -1786,21 +1785,21 @@ static bool parse_failover_property_list(qd_router_t *router, qd_connection_t *c
17861785
}
17871786

17881787

1789-
static int AMQP_inbound_opened_handler(qd_router_t *router, qd_connection_t *conn, void *context)
1788+
int AMQP_inbound_opened_handler(qd_router_t *router, qd_connection_t *conn, void *context)
17901789
{
17911790
AMQP_opened_handler(router, conn, true);
17921791
return 0;
17931792
}
17941793

17951794

1796-
static int AMQP_outbound_opened_handler(qd_router_t *router, qd_connection_t *conn, void *context)
1795+
int AMQP_outbound_opened_handler(qd_router_t *router, qd_connection_t *conn, void *context)
17971796
{
17981797
AMQP_opened_handler(router, conn, false);
17991798
return 0;
18001799
}
18011800

18021801

1803-
static int AMQP_closed_handler(qd_router_t *router, qd_connection_t *conn, void *context)
1802+
int AMQP_closed_handler(qd_router_t *router, qd_connection_t *conn, void *context)
18041803
{
18051804
qdr_connection_t *qdrc = (qdr_connection_t*) qd_connection_get_context(conn);
18061805

@@ -1823,23 +1822,6 @@ static int AMQP_closed_handler(qd_router_t *router, qd_connection_t *conn, void
18231822
}
18241823

18251824

1826-
static const qd_node_type_t router_node = {"router", 0,
1827-
AMQP_rx_handler,
1828-
AMQP_disposition_handler,
1829-
AMQP_incoming_link_handler,
1830-
AMQP_outgoing_link_handler,
1831-
AMQP_conn_wake_handler,
1832-
AMQP_link_detach_handler,
1833-
AMQP_link_closed_handler,
1834-
AMQP_link_attach_handler,
1835-
AMQP_link_flow_handler,
1836-
0, // node_created_handler
1837-
0, // node_destroyed_handler
1838-
AMQP_inbound_opened_handler,
1839-
AMQP_outbound_opened_handler,
1840-
AMQP_closed_handler};
1841-
1842-
18431825
static void CORE_connection_activate(void *context, qdr_connection_t *conn)
18441826
{
18451827
qd_router_t *router = (qd_router_t*) context;
@@ -2361,7 +2343,7 @@ static void qd_amqp_adaptor_init(qdr_core_t *core, void **adaptor_context)
23612343
amqp_adaptor.dispatch = qd;
23622344
assert(qd->router); // ensure router has been initialized first
23632345
amqp_adaptor.router = qd->router;
2364-
amqp_adaptor.container = qd_container(qd->router, &router_node);
2346+
amqp_adaptor.container = qd_container(qd->router);
23652347
amqp_adaptor.adaptor = qdr_protocol_adaptor(core,
23662348
"amqp",
23672349
(void*) amqp_adaptor.router,

0 commit comments

Comments
 (0)