egorius: (Default)
egorius ([personal profile] egorius) wrote2014-03-13 07:27 pm

Navigational Queueing

Развлекался тут с Advanced Queueing и нашел чудесную окаменелость.

Dequeue Options

navigation
The navigation attribute specifies the position of the dequeued message. If FIRST_MESSAGE is specified, then the first available message matching the search criteria is dequeued. If NEXT_MESSAGE is specified, then the next available message matching the search criteria is dequeued (the default).

Проявляется это, например, если пытаться выбирать сообщения из очереди, прикидываясь разными подписчиками:

ORA-25242: cannot change subscriber name from SUBSCRIBER1 to SUBSCRIBER2 without FIRST_MESSAGE option
*Cause:    An attempt was made to change the subscriber name while using the
           NEXT_MESSAGE or NEXT_TRANSACTION option for dequeuing.
*Action:   To use a subscriber name that is different from the previous
           dequeue call, reset the dequeuing position by using the
           FIRST_MESSAGE navigation option.

Ведь что получается? В недрах реляционки сидит, особо не скрываясь, пережиток навигационных баз данных и заставляет нас думать в терминах FIND FIRST, FIND NEXT!

Будь я постарше, мог бы испытать примерно то же, что испытал однажды, сохраняя файл на КПК. А так просто в тихом шоке.

Если вдруг кому интересно пощупать руками, то вот пример. Тип данных:

create or replace type Q_TYPE is object(
  text varchar2(1000)
);

Создание и запуск очереди:

begin
  dbms_aqadm.create_queue_table(
    queue_table        => 'QUEUE',
    queue_payload_type => 'Q_TYPE',
    multiple_consumers => true
  );
  dbms_aqadm.create_queue(
    queue_name         => 'QUEUE',
    queue_table        => 'QUEUE'
  );
  dbms_aqadm.start_queue(
    queue_name         => 'QUEUE'
  );
  dbms_aqadm.add_subscriber(
    queue_name         => 'QUEUE',
    subscriber         => sys.aq$_agent('subscriber1',null,null)
  );
  dbms_aqadm.add_subscriber(
    queue_name         => 'QUEUE',
    subscriber         => sys.aq$_agent('subscriber2',null,null)
  );
end;
/

Положим в очередь пару сообщений для двух подписчиков:

declare
  msg_id raw(16);
  enq_opt dbms_aq.enqueue_options_t;
  msg_prop dbms_aq.message_properties_t;
begin
  msg_prop.recipient_list(1) := sys.aq$_agent('subscriber1',null,null);
  dbms_aq.enqueue(
    queue_name         => 'QUEUE',
    enqueue_options    => enq_opt,
    message_properties => msg_prop,
    payload            => Q_TYPE('Hi first!'),
    msgid              => msg_id
  );
  msg_prop.recipient_list(1) := sys.aq$_agent('subscriber2',null,null);
  dbms_aq.enqueue(
    queue_name         => 'QUEUE',
    enqueue_options    => enq_opt,
    message_properties => msg_prop,
    payload            => Q_TYPE('Hi second!'),
    msgid              => msg_id
  );
  commit;
end;
/

Получение сообщений по очереди первым и вторым подписчиком. Чтобы заработало, надо убрать комментарии:

set serveroutput on
declare
  msg_id raw(16);
  deq_opt dbms_aq.dequeue_options_t;
  msg_prop dbms_aq.message_properties_t;
  msg Q_TYPE;
  
  aq_end_of_fetch exception;
  pragma exception_init(aq_end_of_fetch,-25228);
begin
  deq_opt.wait := 0;
  for i in 1 .. 2 loop
    deq_opt.consumer_name := 'subscriber'||to_char(i);
--    deq_opt.navigation := dbms_aq.first_message;
    loop
      begin
        dbms_aq.dequeue(
          queue_name         => 'QUEUE',
          dequeue_options    => deq_opt,
          message_properties => msg_prop,
          payload            => msg,
          msgid              => msg_id
        );
        dbms_output.put_line(deq_opt.consumer_name||' got '||msg.text);
--        deq_opt.navigation := dbms_aq.next_message;
      exception
        when aq_end_of_fetch then exit;
      end;
    end loop;
  end loop;
  commit;
end;
/