-- ---------------------------------------------------------------------------
-- Oracle Advanced Queuing (AQ) demo.
--
-- Creates an object payload type, a single queue (rx_queue) on its own queue
-- table, and the package pkg_queue that enqueues / dequeues single messages
-- and arrays of messages, each call in its own autonomous transaction.
--
-- The schema name "hr" is hard-coded in the synonyms and in the queue name,
-- so the script is meant to be run as HR from SQL*Plus (or a compatible tool
-- that honours the "/" block terminator).
-- ---------------------------------------------------------------------------

-- Teardown: clear previous queue definitions if needed.
-- This must run BEFORE the types are dropped: while the queue table exists it
-- depends on message_t, and DROP TYPE / CREATE OR REPLACE TYPE would fail.
-- On a first run (nothing to drop yet) this block simply reports an error.
BEGIN
    dbms_aqadm.stop_queue(queue_name => 'RX_QUEUE');
    dbms_aqadm.drop_queue(queue_name => 'RX_QUEUE');
    dbms_aqadm.drop_queue_table(queue_table => 'RX_QUEUE_TABLE');
END;
/

COMMIT;

-- Drop the collection type first: it depends on message_t.
DROP TYPE message_array_t;
DROP TYPE message_t;

-- Demo message payload: numeric id, free text and a date.
CREATE OR REPLACE TYPE message_t AS OBJECT (
    nid  NUMBER,
    vmsg VARCHAR2(1000),
    dtar DATE
);
/

CREATE OR REPLACE PUBLIC SYNONYM message_t FOR hr.message_t;
GRANT EXECUTE ON message_t TO PUBLIC;

-- Collection of payloads used by the array enqueue / dequeue calls.
CREATE OR REPLACE TYPE message_array_t AS TABLE OF message_t;
/

CREATE OR REPLACE PUBLIC SYNONYM message_array_t FOR hr.message_array_t;
GRANT EXECUTE ON message_array_t TO PUBLIC;

-- If you are not going to use an option, do not enable it in the queue
-- definitions: every feature has an additional cost.
BEGIN
    dbms_aqadm.create_queue_table(
        queue_table        => 'rx_queue_table',
        queue_payload_type => 'message_t',
        storage_clause     => 'INITRANS 111 PCTFREE 10 PCTUSED 89 STORAGE(MINEXTENTS 5 FREELISTS 11 PCTINCREASE 0)',
        comment            => 'Load queue table',
        compatible         => '10.0',
        auto_commit        => FALSE);
END;
/

BEGIN
    dbms_aqadm.create_queue(
        queue_name  => 'rx_queue',
        queue_table => 'rx_queue_table',
        comment     => 'Load Queue',
        auto_commit => FALSE);
END;
/

BEGIN
    dbms_aqadm.start_queue(queue_name => 'RX_QUEUE');
END;
/

-- The queue DDL above was issued with auto_commit => FALSE, so make it
-- permanent explicitly.
COMMIT;

CREATE OR REPLACE PACKAGE pkg_queue IS

    -- Enqueue one message on hr.rx_queue and commit it.
    PROCEDURE prc_enqueue(usermsg message_t);

    -- Dequeue (and discard) one message from hr.rx_queue without waiting.
    -- appname is passed to AQ as the consumer name.
    PROCEDURE prc_dequeue(appname VARCHAR2 DEFAULT NULL);

    -- Enqueue varray_size messages from usermsg in one call; msg_prop_array
    -- supplies the message properties for the messages.
    PROCEDURE prc_enqueue_array(usermsg        message_array_t,
                                varray_size    NUMBER,
                                msg_prop_array sys.dbms_aq.message_properties_array_t);

    -- Dequeue (and discard) up to varray_size messages in one call without
    -- waiting. appname is passed to AQ as the consumer name.
    PROCEDURE prc_dequeue_array(varray_size NUMBER,
                                appname     VARCHAR2 DEFAULT NULL);

END pkg_queue;
/

CREATE OR REPLACE PACKAGE BODY pkg_queue IS

    -- Single definition of the queue every procedure in this package uses.
    c_queue_name CONSTANT VARCHAR2(61) := 'hr.rx_queue';

    -- Ends the caller's (autonomous) transaction.
    PROCEDURE prc_do_commit AS
    BEGIN
        COMMIT;
    END prc_do_commit;

    -- Enqueue a single message with default enqueue options and message
    -- properties. Runs as an autonomous transaction, so the commit does not
    -- touch the caller's transaction.
    PROCEDURE prc_enqueue(usermsg message_t) AS
        PRAGMA AUTONOMOUS_TRANSACTION;
        eopt      dbms_aq.enqueue_options_t;
        mprop     dbms_aq.message_properties_t;
        enq_msgid RAW(16);
    BEGIN
        dbms_aq.enqueue(queue_name         => c_queue_name,
                        enqueue_options    => eopt,
                        message_properties => mprop,
                        payload            => usermsg,
                        msgid              => enq_msgid);
        prc_do_commit;
        -- Debug trace, enable if needed:
        -- dbms_output.put_line('Message was enqueued to ' || c_queue_name || ' : ' ||
        --                      rawtohex(enq_msgid));
    END prc_enqueue;

    -- Enqueue varray_size messages in one AQ call, then commit.
    -- Runs as an autonomous transaction.
    PROCEDURE prc_enqueue_array(usermsg        message_array_t,
                                varray_size    NUMBER,
                                msg_prop_array sys.dbms_aq.message_properties_array_t) AS
        PRAGMA AUTONOMOUS_TRANSACTION;
        eopt        dbms_aq.enqueue_options_t;
        msgid_array dbms_aq.msgid_array_t;
        -- enqueue_array is a function; its return value is captured only
        -- because PL/SQL requires it, and is otherwise unused here.
        retval      PLS_INTEGER;
    BEGIN
        retval := dbms_aq.enqueue_array(queue_name               => c_queue_name,
                                        enqueue_options          => eopt,
                                        array_size               => varray_size,
                                        message_properties_array => msg_prop_array,
                                        payload_array            => usermsg,
                                        msgid_array              => msgid_array);
        prc_do_commit;
        -- Debug trace, enable if needed:
        -- dbms_output.put_line(retval || ' message(s) were enqueued to ' || c_queue_name);
    END prc_enqueue_array;

    -- Dequeue one message without waiting and commit. The payload is read
    -- into a local variable and not returned to the caller (demo only).
    -- An empty queue (ORA-25228) is treated as a normal, silent outcome.
    PROCEDURE prc_dequeue(appname VARCHAR2 DEFAULT NULL) AS
        PRAGMA AUTONOMOUS_TRANSACTION;
        no_messages EXCEPTION;
        PRAGMA EXCEPTION_INIT(no_messages, -25228);
        dopt      dbms_aq.dequeue_options_t;
        mprop     dbms_aq.message_properties_t;
        payload_t message_t;
        deq_msgid RAW(16);
    BEGIN
        dopt.consumer_name := appname;
        dopt.wait          := dbms_aq.no_wait;
        dbms_aq.dequeue(queue_name         => c_queue_name,
                        dequeue_options    => dopt,
                        message_properties => mprop,
                        payload            => payload_t,
                        msgid              => deq_msgid);
        prc_do_commit;
        -- Debug trace, enable if needed:
        -- dbms_output.put_line('Message was dequeued from ' || c_queue_name || ' : ' ||
        --                      rawtohex(deq_msgid));
        -- dbms_output.put_line('payload_t.nid : ' || payload_t.nid || ' - ' ||
        --                      'payload_t.dtar : ' || payload_t.dtar || ' - ' ||
        --                      'payload_t.vmsg : ' || payload_t.vmsg);
    EXCEPTION
        WHEN no_messages THEN
            -- Nothing was dequeued. End the autonomous transaction explicitly
            -- so leaving this routine can never raise ORA-06519.
            ROLLBACK;
            -- dbms_output.put_line('No more messages in queue ' || c_queue_name);
    END prc_dequeue;

    -- Dequeue up to varray_size messages in one AQ call without waiting and
    -- commit. The payloads are read into local collections and not returned
    -- to the caller (demo only). An empty queue (ORA-25228) is silent.
    PROCEDURE prc_dequeue_array(varray_size NUMBER,
                                appname     VARCHAR2 DEFAULT NULL) AS
        PRAGMA AUTONOMOUS_TRANSACTION;
        no_messages EXCEPTION;
        PRAGMA EXCEPTION_INIT(no_messages, -25228);
        dopt           dbms_aq.dequeue_options_t;
        msg_prop_array dbms_aq.message_properties_array_t :=
                           dbms_aq.message_properties_array_t();
        payload_array  message_array_t;
        msgid_array    dbms_aq.msgid_array_t;
        -- dequeue_array is a function; its return value is captured only
        -- because PL/SQL requires it, and is otherwise unused here.
        retval         PLS_INTEGER;
    BEGIN
        dopt.consumer_name := appname;
        dopt.wait          := dbms_aq.no_wait;
        retval := dbms_aq.dequeue_array(queue_name               => c_queue_name,
                                        dequeue_options          => dopt,
                                        array_size               => varray_size,
                                        message_properties_array => msg_prop_array,
                                        payload_array            => payload_array,
                                        msgid_array              => msgid_array);
        prc_do_commit;
        -- Debug trace, enable if needed:
        -- dbms_output.put_line(retval || ' message(s) were dequeued from ' || c_queue_name);
    EXCEPTION
        WHEN no_messages THEN
            -- Nothing was dequeued. End the autonomous transaction explicitly
            -- so leaving this routine can never raise ORA-06519.
            ROLLBACK;
            -- dbms_output.put_line('No more messages in queue ' || c_queue_name);
    END prc_dequeue_array;

END pkg_queue;
/

CREATE OR REPLACE PUBLIC SYNONYM pkg_queue FOR hr.pkg_queue;
GRANT EXECUTE ON pkg_queue TO PUBLIC;

-- Functional test, single message. If needed, enable the dbms_output lines
-- in the package body to see what happens.
DECLARE
    usermsg message_t := message_t(123456790, 'TONGUC DEMO', SYSDATE);
BEGIN
    pkg_queue.prc_enqueue(usermsg => usermsg);
END;
/

BEGIN
    pkg_queue.prc_dequeue;
END;
/

-- "No message in queue" test: the queue is empty now, the call must return
-- silently.
BEGIN
    pkg_queue.prc_dequeue;
END;
/

-- Functional test, array of ten messages.
DECLARE
    narray_size    PLS_INTEGER := 10;
    usermsg        message_array_t := message_array_t(
                       message_t(123456790, 'TONGUC DEMO1', SYSDATE),
                       message_t(123456790, 'TONGUC DEMO2', SYSDATE),
                       message_t(123456790, 'TONGUC DEMO3', SYSDATE),
                       message_t(123456790, 'TONGUC DEMO4', SYSDATE),
                       message_t(123456790, 'TONGUC DEMO5', SYSDATE),
                       message_t(123456790, 'TONGUC DEMO6', SYSDATE),
                       message_t(123456790, 'TONGUC DEMO7', SYSDATE),
                       message_t(123456790, 'TONGUC DEMO8', SYSDATE),
                       message_t(123456790, 'TONGUC DEMO9', SYSDATE),
                       message_t(123456790, 'TONGUC DEMO0', SYSDATE));
    -- Default message properties, reused for every message in the batch.
    msg_prop       dbms_aq.message_properties_t;
    msg_prop_array dbms_aq.message_properties_array_t;
BEGIN
    msg_prop_array := dbms_aq.message_properties_array_t(
                          msg_prop, msg_prop, msg_prop, msg_prop, msg_prop,
                          msg_prop, msg_prop, msg_prop, msg_prop, msg_prop);
    pkg_queue.prc_enqueue_array(usermsg, narray_size, msg_prop_array);
END;
/

DECLARE
    narray_size PLS_INTEGER := 10;
BEGIN
    pkg_queue.prc_dequeue_array(narray_size);
END;
/