Using the AQ adapter to consume messages from the database

In this recipe, we will configure an inbound AQ adapter, so that it can be used to consume messages from a queue implemented in the database by Oracle AQ. The adapter will be used from a proxy service as shown in the following screenshot:

Using the AQ adapter to consume messages from the database

The queue EVENT_QUEUE that we will use has been set up with the OSB Cookbook standard environment using the following SQL and PL/SQL code:

--creating the object type, which defines payload 
CREATE OR REPLACE TYPE event_typ as OBJECT
(
   id NUMBER(19)
   , event_type VARCHAR2(30)
   , event_time TIMESTAMP
)

-- creating queue table
BEGIN
  sys.dbms_aqadm.create_queue_table(
           queue_table => 'EVENT_QUEUE_T',
           queue_payload_type => 'OSB_COOKBOOK.EVENT_TYP',
           sort_list => 'ENQ_TIME',
           compatible => '10.0.0',
           primary_instance => 0,
           secondary_instance => 0,
           storage_clause => 'tablespace USERS pctfree 10 initrans 1 maxtrans 255 storage ( initial 64K  minextents 1 maxextents unlimited )'),
END;
/

-- creating queue
BEGIN
  sys.dbms_aqadm.create_queue(
                queue_name => 'EVENT_QUEUE',
                queue_table => 'EVENT_QUEUE_T',
                queue_type => sys.dbms_aqadm.normal_queue,
                max_retries => 5,
                retry_delay => 0,
                retention_time => 0);
                
  sys.dbms_aqadm.start_queue( queue_name => 'EVENT_QUEUE', enqueue => TRUE, dequeue => TRUE);                
END;

Getting ready

For this recipe, we will use the database queue EVENT_QUEUE available with the OSB Cookbook standard environment. Make sure that a connection factory is set up for the AQ adapter configuration, as shown in the Introduction of this chapter.

You can import the OSB project containing the base setup for this recipe into Eclipse from chapter-7getting-readyusing-aq-adapter-to-dequeue-from-db.

How to do it...

First we create the AQ adapter, which will implement the dequeue functionality from the database. In JDeveloper perform, the following steps:

  1. Open the file composite.xml of the project DequeueFromDB.
  2. From the Component Palette, select the SOA components if not already done.
  3. Drag the AQ Adapter to the Exposed Services lane and click on Next.
  4. Enter DequeueEvent in the Service Name field and click on Next.
  5. Select the OsbCookbookConnection from the Connection listbox.
  6. Check that eis/aq/OsbCookbookConnection is the value of the JNDI Name field.
  7. Click on Next.
  8. On the Adapter Interface screen, select the Define from operation and schema (specified later) option and click on Next.
  9. On the Operation screen, select the Dequeue option and leave the value Dequeue for the Operation Name field.
  10. Click on Next.
  11. On the Queue Name screen, click on Browse.
  12. On the Select Queue screen, select EVENT_QUEUE and click on OK.
  13. Click on Next.
    How to do it...
  14. Leave the Queue Parameters screen as is and click on Next.
  15. On the Object Payload screen, select the Whole Object EVENT_TYP option and click on Next.
    How to do it...
  16. Click on Finish.
  17. Click the Save All button to save the SOA project.

    The definition of the AQ adapter for consuming messages from the EVENT_QUEUE is now ready. The work in JDeveloper is done by that and we can switch to Eclipse OEPE and perform the following steps to create a proxy service using the AQ adapter:

  18. On the OSB project hit Refresh (F5). The artifacts generated by JDeveloper in the previous steps will show up.
  19. Right-click on the DequeueEvent_aq.jca file and select Oracle Service Bus | Generate Service from the context menu.
  20. Select the proxy folder for the location of the proxy service.
  21. Leave the Service Name and WSDL name as suggested by the wizard.
  22. Click on OK.

    We now have a proxy service in place that uses the AQ adapter through the JCA transport. The interface of the proxy is defined by the DequeueEvent WSDL, which has been generated by the AQ adapter. Perform the following steps in Eclipse OEPE for implementing the message flow of the proxy:

  23. Navigate to the Message Flow tab of the DequeueEvent_aq proxy service.
  24. Insert a Pipeline Pair node and name it HandleEventPipelinePair.
  25. In the Request Pipeline insert a Stage node and name it HandleEventStage.
  26. Insert a Log action into the stage just created.
  27. On the Properties tab of the Log action, click on Expression and enter $body, $inbound into the Expression field, to log both the content of the body as well as the content of the inbound metadata.
  28. Click on OK.
  29. Select Warning for the Severity drop-down listbox.
  30. Deploy the project to the OSB server.
  31. The service can now be tested. In a SQL window (that is, within SQL Plus), execute the following PL/SQL code block:
    DECLARE
       enqueue_options dbms_aq.enqueue_options_t;
       message_properties dbms_aq.message_properties_t;
       msgid RAW(100);
       event event_typ;
    BEGIN
       event := event_typ(1, 'NEW_CUSTOMER', CURRENT_TIMESTAMP);
         
       dbms_aq.enqueue(queue_name => 'EVENT_QUEUE'
                    , enqueue_options => enqueue_options 
                    , message_properties => message_properties
                    , payload => event
                    , msgid => msgid);
    END;   
    /
    COMMIT;   

    The log statement showing the payload of the received message should be shown in the Service Bus Console window.

    How to do it...

How it works...

Using the AQ adapter allows us to implement a proxy service, which is automatically invoked when a message arrives in the queue on the database, implemented by using Oracle AQ.

Based on the object type payload defined on the database when creating the AQ queue, the AQ adapter is generating the corresponding XML schema. This schema is then used to define the WSDL for the DequeueEvent proxy service. When using the AQ adapter configuration to generate a proxy service, the proxy service will automatically get a WSDL-based interface, based on the generated WSDL of the JCA adapter. Therefore, the body variable will contain a message according to the XML schema generated by the AQ adapter.

Using the AQ adapter , we can offer an alternative way of processing events from the database. In the recipe, Using, DB, adapter, to, poll, for, changes, on, a, database, table, we have seen that the DB adapter can be used to poll for changes in a table.

Using this recipe and the AQ adapter, the same can be achieved in a more event-driven fashion, where a database trigger would be used on a table to enqueue a message into the EVENT_QUEUE whenever a change is applied to a database table.

Dequeuing of the message works in a transaction and can be combined with another modification on another transactional resource (such as a database table or JMS queue) by using an XA transaction.

There's more...

If we only want to consume messages which meet a specific condition, then the Dequeue Condition on the Queue Adapter screen of the AQ adapter wizard can be used.

The Dequeue Condition is a Boolean expression using a syntax similar to the WHERE clause of a SQL query, but without using the WHERE keyword. The expression can include conditions on message properties, user object payload data, and PL/SQL or SQL functions. Message properties include the columns of the queue table, such as priority and corrid.

If the Dequeue Condition is specified and no messages meet the condition, then no dequeue will happen.

To specify a Dequeue Condition on a message payload (object payload), use attributes of the object type in the clause. Each attribute must be prefixed with tab.user_data as a qualifier to indicate the specific column of the queue table that stores the payload.

The following screenshot shows a possible restriction on the recipe, so that only messages with event_type equals to UPD_CUSTOMER are consumed:

There's more...

See also

See the Chapter, Messaging with JMS transport, for how to use the WebLogic JMS server for implementing JMS messaging in the Oracle Service Bus.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset