1+ --
2+ -- This sample demonstrates how to create a TEQ using PL/SQL
3+ --
14
2- CREATE type Message_type as object (subject VARCHAR2 (30 ), text VARCHAR2 (80 ));
3- /
4- -- Creating an Object type queue
5- BEGIN
6- DBMS_AQADM .CREATE_TRANSACTIONAL_EVENT_QUEUE (
7- queue_name => ' objType_TEQ' ,
8- storage_clause => null ,
9- multiple_consumers => true,
10- max_retries => 10 ,
11- comment => ' ObjectType for TEQ' ,
12- queue_payload_type => ' Message_type' ,
13- queue_properties => null ,
14- replication_mode => null );
15- DBMS_AQADM .START_QUEUE (queue_name=> ' objType_TEQ' , enqueue => TRUE, dequeue=> True);
16- END;
17- /
5+ -- There are various payload types supported, including user-defined object, raw, JMS and JSON.
6+ -- This sample uses the JSON payload type.
187
19- -- Creating a RAW type queue:
20- BEGIN
21- DBMS_AQADM .CREATE_TRANSACTIONAL_EVENT_QUEUE (
22- queue_name => ' rawType_TEQ' ,
23- storage_clause => null ,
24- multiple_consumers => true,
25- max_retries => 10 ,
26- comment => ' RAW type for TEQ' ,
27- queue_payload_type => ' RAW' ,
28- queue_properties => null ,
29- replication_mode => null );
30- DBMS_AQADM .START_QUEUE (queue_name=> ' rawType_TEQ' , enqueue => TRUE, dequeue=> True);
31- END;
32- /
8+ -- Execute permission on dbms_aqadm is required.
339
34- -- Creating JSON type queue:
35- BEGIN
36- DBMS_AQADM .CREATE_TRANSACTIONAL_EVENT_QUEUE (
37- queue_name => ' jsonType_TEQ' ,
38- storage_clause => null ,
39- multiple_consumers => true,
40- max_retries => 10 ,
41- comment => ' jsonType for TEQ' ,
42- queue_payload_type => ' JSON' ,
43- queue_properties => null ,
44- replication_mode => null );
45- DBMS_AQADM .START_QUEUE (queue_name=> ' jsonType_TEQ' , enqueue => TRUE, dequeue=> True);
46- END;
47- /
48- BEGIN
49- DBMS_AQADM .CREATE_TRANSACTIONAL_EVENT_QUEUE (
50- queue_name => ' JAVA_TEQ_PUBSUB_QUEUE' ,
51- storage_clause => null ,
52- multiple_consumers=> true,
53- max_retries => 10 ,
54- comment => ' JAVA_TEQ_PUBSUB_QUEUE' ,
55- queue_payload_type=> ' JMS' ,
56- queue_properties => null ,
57- replication_mode => null );
58- DBMS_AQADM .START_QUEUE (queue_name=> ' JAVA_TEQ_PUBSUB_QUEUE' , enqueue => TRUE, dequeue=> True);
59- END;
10+ begin
11+ -- create the TEQ
12+ dbms_aqadm .create_transactional_event_queue (
13+ -- note, in Oracle 19c this is called create_sharded_queue() but has the same parameters
14+ queue_name => ' my_json_teq' ,
15+ queue_payload_type => ' JSON' ,
16+ -- when mutiple_consumers is true, this will create a pub/sub "topic" - the default is false
17+ multiple_consumers => true,
18+ max_retries => 10 ,
19+ comment => ' A TEQ with JSON payload'
20+ );
21+
22+ -- start the TEQ
23+ dbms_aqadm .start_queue (
24+ queue_name => ' my_json_teq' ,
25+ -- these two parameters control whether enqueueing and dequeueing will be allowed
26+ enqueue => true,
27+ dequeue => true
28+ );
29+ end;
6030/
61- DECLARE
62- subscriber sys .aq $_agent;
63- BEGIN
64- dbms_aqadm .add_subscriber (queue_name => ' objType_TEQ' , subscriber => sys .aq $_agent(' teqBasicObjSubscriber' , null ,0 ), rule => ' correlation = ' ' teqBasicObjSubscriber' ' ' );
6531
66- dbms_aqadm .add_subscriber (queue_name => ' rawType_TEQ' , subscriber => sys .aq $_agent(' teqBasicRawSubscriber' , null ,0 ), rule => ' correlation = ' ' teqBasicRawSubscriber' ' ' );
32+ --
33+ -- You may also want to create a subscriber for the TEQ, pub/sub topics normally deliver
34+ -- messages only when the consumer/subscriber is present.
35+ --
6736
68- dbms_aqadm .add_subscriber (queue_name => ' jsonType_TEQ' , subscriber => sys .aq $_agent(' teqBasicJsonSubscriber' , null ,0 ), rule => ' correlation = ' ' teqBasicJsonSubscriber' ' ' );
69-
70- END;
71- /
72- CREATE OR REPLACE FUNCTION enqueueDequeueTEQ (subscriber varchar2 , queueName varchar2 , message Message_Typ) RETURN Message_Typ
73- IS
74- enqueue_options DBMS_AQ .enqueue_options_t ;
75- message_properties DBMS_AQ .message_properties_t ;
76- message_handle RAW(16 );
77- dequeue_options DBMS_AQ .dequeue_options_t ;
78- messageData Message_Typ;
79-
80- BEGIN
81- messageData := message;
82- message_properties .correlation := subscriber;
83- DBMS_AQ .ENQUEUE (
84- queue_name => queueName,
85- enqueue_options => enqueue_options,
86- message_properties => message_properties,
87- payload => messageData,
88- msgid => message_handle);
89- COMMIT ;
90- DBMS_OUTPUT .PUT_LINE (' ----------ENQUEUE Message: ' || ' ORDERID: ' || messageData .ORDERID || ' , OTP: ' || messageData .OTP || ' , DELIVERY_STATUS: ' || messageData .DELIVERY_STATUS );
91-
92- dequeue_options .dequeue_mode := DBMS_AQ .REMOVE ;
93- dequeue_options .wait := DBMS_AQ .NO_WAIT ;
94- dequeue_options .navigation := DBMS_AQ .FIRST_MESSAGE ;
95- dequeue_options .consumer_name := subscriber;
96- DBMS_AQ .DEQUEUE (
97- queue_name => queueName,
98- dequeue_options => dequeue_options,
99- message_properties => message_properties,
100- payload => messageData,
101- msgid => message_handle);
102- COMMIT ;
103- DBMS_OUTPUT .PUT_LINE (' ----------DEQUEUE Message: ' || ' ORDERID: ' || messageData .ORDERID || ' , OTP: ' || messageData .OTP || ' , DELIVERY_STATUS: ' || messageData .DELIVERY_STATUS );
104- RETURN messageData;
105- END;
106- /
107- EXIT;
37+ declare
38+ subscriber sys .aq $_agent;
39+ begin
40+ dbms_aqadm .add_subscriber (
41+ queue_name => ' my_json_teq' ,
42+ subscriber => sys .aq $_agent(
43+ ' my_subscriber' , -- the subscriber name
44+ null , -- address, only used for notifications
45+ 0 -- protocol
46+ ),
47+ rule => ' correlation = ' ' my_subscriber' ' '
48+ );
49+ end;
50+ /
0 commit comments