-- Создаем тип данных
create or replace type XX_QUEUE_TYPE is object
(
text varchar2(5)
)
-- Таблица для очереди
begin
dbms_aqadm.create_queue_table(
queue_table => 'XX_QUEUE_TBL',
queue_payload_type => 'XX_QUEUE_TYPE'
);
end;
-- выборка записей из таблицы
select * from XX_QUEUE_TBL;
-- автоматически созданной представление вида aq$*
select * from AQ$XX_QUEUE_TBL;
-- Собственно очередь
begin
dbms_aqadm.create_queue(
queue_name => 'XX_QUEUE_Q',
queue_table => 'XX_QUEUE_TBL'
);
end;
-- Запуск очереди
begin
dbms_aqadm.start_queue(
queue_name => 'XX_QUEUE_Q'
);
end;
-- Вставка 9 сообщений
declare
l_msg_id raw(32767);
l_enq_opt dbms_aq.enqueue_options_t;
l_msg_prop dbms_aq.message_properties_t;
begin
for i in 1..9 loop
dbms_aq.enqueue(
queue_name => 'XX_QUEUE_Q',
enqueue_options => l_enq_opt,
message_properties => l_msg_prop,
payload => xx_queue_type('TEST'||to_char(i)),
msgid => l_msg_id
);
end loop;
commit;
end;
-- проверяем, что внесли 9 сообщений
select * from AQ$XX_QUEUE_TBL;
-- Выборка 8 сообщений
declare
l_msg_id raw(16);
l_deq_opt dbms_aq.dequeue_options_t;
l_msg_prop dbms_aq.message_properties_t;
l_payload xx_queue_type;
begin
for i in 1..8 loop
dbms_aq.dequeue(
queue_name => 'XX_QUEUE_Q',
dequeue_options => l_deq_opt,
message_properties => l_msg_prop,
payload => l_payload,
msgid => l_msg_id
);
dbms_output.put_line('Got a message: '||l_payload.text);
end loop;
commit;
end;
-- проверяем, что осталось 1 сообщение
select * from AQ$XX_QUEUE_TBL;
-- очистка таблицы
declare
po dbms_aqadm.aq$_purge_options_t;
begin
po.block := FALSE;
DBMS_AQADM.PURGE_QUEUE_TABLE(
queue_table => 'XX_QUEUE_TBL',
purge_condition => NULL,
purge_options => po);
end;
-- проверяем, что осталось 0 сообщений
select * from AQ$XX_QUEUE_TBL;
/*остановка очереди,
удаление очереди,
удаление таблицы,
удаление типа */
begin
dbms_aqadm.stop_queue(
queue_name => 'XX_QUEUE_Q'
);
dbms_aqadm.drop_queue(
queue_name => 'XX_QUEUE_Q'
);
dbms_aqadm.drop_queue_table(
queue_table => 'XX_QUEUE_TBL'
);
execute immediate 'drop type XX_QUEUE_TYPE';
end;