Chapter Eleven. Message Correlation

Introduction

This chapter focuses on Java CAPS facilities that can be used to implement solutions involving message correlation. Rather than discussing the details of correlation and implementation for each [EIP] pattern individually, this chapter presents the Java CAPS material necessary to understand how [EIP] patterns that call for correlation are implemented in Java CAPS.

The chapter discusses correlation concepts, the facilities and techniques Java CAPS provides for message correlation, eInsight Business Process Execution Language (BPEL) Correlation services, and how correlation can be accomplished when eInsight is not available. Chapter 9, “Message Correlation,” in Part II (located on the accompanying CD-ROM), provides a number of detailed examples illustrating these topics.

Because configuration of BPEL Correlation is rather nonintuitive, this chapter walks you through various correlation scenarios, starting with an incorrect, naïve implementation and progressing through correct, increasingly complex implementations.

The chapter also introduces common Message Relationship patterns whose implementation requires correlation. Specifically, the following Message Relationship patterns are discussed:

  • Header-Items-Trailer

  • Any Order Two Items

  • Any Order Two Items with Timeout

  • Items-Trailer

  • Header-Counted-Items

  • Counted and Timed Items

  • Scatter-Gather

Part II provides implementation examples of each Message Relationship pattern.

In the absence of an eInsight correlation service, correlation in Java CAPS can still be accomplished using other techniques. One such technique, using JMS with dynamic selectors, is introduced here and illustrated in Part II. The technique, which uses the JMS API to manipulate JMS objects and which is discussed in detail in this chapter, is applied elsewhere to implement JMS Polling.

Along the way, discussion touches upon related [EIP] patterns like Scatter-Gather and Aggregator, which are discussed in Chapter 6, “Message Routing.”

Overview

The word correlation, used in statistics and similar sciences, denotes a degree of relationship between two or more variables. To the extent that it implies a continuous relationship, it is quite inappropriate for describing whether messages are, or are not, related, which is a strictly binary distinction. Be that as it may, Java CAPS has a number of features that can be used to implement correlation-based message processing. This chapter discusses both the technologies available in Java CAPS for message correlation and Java CAPS–based implementation of the common Message Relationship patterns.

Correlation is a process of collecting related messages, using specific values known as Correlation Identifiers within messages, in order to process them together.

In messaging systems, correlation is complex. It can be implemented in the messaging system itself, in the processing components, or in both. The parts of an integration solution that perform message correlation are called correlation processors.

Correlation of multiple related messages implies that there is some “initial” message used to establish the Correlation Identifier value, and there are “subsequent” messages whose Correlation Identifier values will be the same as that of the initial message.

The relationship between the initial message and subsequent messages may be simple: the initial message merely happens to arrive before the subsequent messages. Or it may be more complex: the initial message is somehow “special” among all messages with the same Correlation Identifier value, and messages with that value are processed differently before the initial message is received than after. These relationships could be described as Message Relationship patterns.

JMSCorrelationID

The Java Message Service (JMS) Specification [JMSSpec] provides for a number of predefined JMS Header fields, which are transmitted to JMS clients together with each message. Some of these fields are assigned by the sending client, others by the provider, and still others by the application that uses the JMS client.

JMSCorrelationID is one of the predefined JMS Message Header fields. Its value is essentially application-specific. The JMS Message Server will not assign a useful value to this field. In Java CAPS this field will carry the literal string “Sun-SeeBeyond” assigned to it by the JMS client code unless it is explicitly assigned a value by the application.

Despite the name given to this required JMS Message Header field, the JMS Specification [JMSSpec] does not describe any mechanism within a JMS provider that will implement correlation and does not require the use of the JMSCorrelationID field for anything. It is up to the application to assign a useful value to this field and implement a correlation mechanism.

The Sun SeeBeyond JMS Message Server does not implement a proprietary message correlation service.

Note

For all intents and purposes, the JMS Correlation Identifier is simply another JMS Message Header field that can be used by the application. A user-defined JMS Message property can be used for message correlation just as readily as the JMSCorrelationID field.
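Because JMSCorrelationID and user-defined properties are both addressable in JMS message selectors, a plain JMS consumer can pick out related messages with either. The following is a minimal sketch that builds such selector strings. CorrelationSelectors and its methods are hypothetical names; the sketch assumes only the selector syntax of the JMS specification, in which a string literal is enclosed in single quotes and an embedded single quote is doubled. The resulting string would be passed to a standard call such as Session.createConsumer(destination, selector) in a plain JMS client, not to the Java CAPS JMS OTD.

```java
/**
 * Hypothetical helper that builds JMS message selector expressions for
 * correlation. Selector syntax is based on a subset of SQL-92 conditional
 * expressions: string literals are single-quoted, an embedded quote is doubled.
 */
final class CorrelationSelectors {

    private CorrelationSelectors() {}

    /** Escapes a value as a selector string literal: O'Neil becomes 'O''Neil'. */
    static String literal(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    /** Selects messages whose JMSCorrelationID header equals the given value. */
    static String byCorrelationId(String correlationId) {
        return "JMSCorrelationID = " + literal(correlationId);
    }

    /** Selects messages whose user-defined string property equals the value. */
    static String byProperty(String propertyName, String value) {
        if (!isIdentifier(propertyName)) {
            throw new IllegalArgumentException("Not a valid property name: " + propertyName);
        }
        return propertyName + " = " + literal(value);
    }

    // Property names must be Java identifiers. Only that form is checked here;
    // reserved selector words such as AND or NULL are not rejected.
    private static boolean isIdentifier(String name) {
        if (name == null || name.isEmpty()
                || !Character.isJavaIdentifierStart(name.charAt(0))) {
            return false;
        }
        for (int i = 1; i < name.length(); i++) {
            if (!Character.isJavaIdentifierPart(name.charAt(i))) {
                return false;
            }
        }
        return true;
    }
}
```

For example, byCorrelationId("PO-1001") yields JMSCorrelationID = 'PO-1001', and byProperty("PONumber", "PO-1001") yields the equivalent selector on a user property, which is the interchangeability the note describes.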

eInsight Correlations

Business Process Execution Language for Web Services (BPEL4WS), implemented in Java CAPS eInsight, supports message correlation—the ability of a single Business Process instance to process multiple related messages based on the value of some user-defined message component(s), called correlation keys.

Message correlation is the BPEL4WS mechanism which allows processes to participate in stateful conversations. It can be used, for example, to match returning or known customers to long-running business processes. When a message arrives for a Web service which has been implemented using BPEL, that message must be delivered somewhere—either to a new or an existing instance of the process. The task of determining to which conversation a message belongs, in BPEL’s case the task of locating/instantiating the instance, is what message correlation is all about. [BPEL4WS06]

Subsequent sections discuss in more detail how eInsight Correlation facilities can be used to implement a variety of solutions that involve Message Relationship patterns.

Note

As of Java CAPS 5.1, eInsight Business Processes can be deployed in multiple-engine configurations for Business Process instance failover.

This does not apply to Business Processes that use the eInsight Correlation feature. In multiple-engine deployments, correlations will not work when the instance is failed over to another engine.

eInsight Correlation Processor: First Cut

An eInsight Business Process is triggered by receipt of a message at the endpoint that, wrapped as a Receive Service, is the initial receive activity in the process. The eInsight engine takes care of creating an instance of the Business Process and delivery of the message to the initial receive activity. From that point on, the process will continue invoking Web Services defined within it until it completes or is terminated. The initial receive activity has the special property Create Instance set to yes. At least one receive activity within a process must have this property set to yes in order to deploy a solution that contains the process.

A typical eInsight Business Process will have one initial receive activity that will cause instance creation. Figure 11-1 illustrates the receive activity property that determines whether instance creation will take place.

Figure 11-1. Create Instance property

Note

When using the Event-Based Decision construct, illustrated in Figure 11-2, the initial receive activity may be one of many alternative receive activities, but only one of them will cause instance creation, and other receive activities will be ignored for the process instance.

Figure 11-2. Create Instance property in the Event-Based Decision

The foregoing discussion sets the stage for introducing a “subsequent” receive activity and related matters.

Picture a process where two different messages are to be received, combined, and sent out. Assume JMS Destinations are the sources and destinations of the messages involved, though it does not really matter because most of the following discussion is applicable regardless of the types of sources and destinations. Both messages must be received before the process can do its work.

Note that the example being discussed will result in a solution in which correlation does not work. This example is intended to walk you through a naïve implementation to introduce concepts that will be used later to construct correlation solutions that do work as intended.

A naïve designer might design a Business Process in which there are two receive activities, one after the other, as illustrated in Figure 11-3.

Figure 11-3. __Book/MessageCorrelation/EInsightCorrelation/bpNaiveCorrelation

Each receive activity would be connected in a connectivity map to a different JMS queue, as shown in Figure 11-4. At runtime, a message in the first queue would cause the Business Process instance to be created and the message delivered to it. The process would then wait at the second receive activity and, when a message was put there and delivered to the process, it would combine both messages and send the combined message on its way.

Figure 11-4. Two different JMS queues provide input to a process

Let’s concatenate text messages from both JMS receive activities and send the combined message out, as shown in Figure 11-5.

Figure 11-5. Business Rule concatenating outputs of the two receive activities

The complete example is presented in Chapter 9, section 9.2, “eInsight Correlation Processor: First Cut,” in Part II.

If you were to manually submit a message to the qReceive_1 queue using the Enterprise Manager, the eInsight engine would create a Business Process instance and hand it the message, as shown in Figure 11-6.

Figure 11-6. Submit initial message

The first receive activity would complete and the second receive activity would be invoked. It would block, waiting for a message to be delivered to the qReceive_2 queue.

Manually submitting a message to the qReceive_2 queue would cause the eInsight engine to unblock the process, deliver the message to the second receive activity, and allow the process to continue.

The resulting message would end up in qSendCombined, as shown in Figure 11-7.

Figure 11-7. Concatenated message

Each receive activity received a message in turn. The process combined the two messages into a result message and delivered the result message to the outgoing queue.

Superficially all is well. In reality, the fact that the two messages were correctly received and combined is merely a function of the messages being queued in an appropriate order to the appropriate queues. The process did not actually use correlations to ensure correct messages are received and combined.

If you were to manually queue three messages to qReceive_1, with the contents a, b, and c, and then manually queue three messages to qReceive_2, with the contents b, c, and a, the resulting combined messages in qSendCombined would not pair like with like: no message would contain two a’s, two b’s, or two c’s. Rather, the messages would be a+b, b+c, and c+a. Messages from qReceive_2 would be combined with messages from qReceive_1 in the order in which they were submitted, regardless of their content.
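The brute-force pairing can be modeled in a few lines of plain Java, outside Java CAPS. NaivePairing and combine are hypothetical names. The model assumes, as the example above does, that waiting instances receive messages strictly in submission order; the engine gives no such guarantee (see issue 2 below).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical model of the naive process: pairs messages by arrival order only. */
final class NaivePairing {

    private NaivePairing() {}

    static List<String> combine(List<String> queue1, List<String> queue2) {
        Deque<String> first = new ArrayDeque<>(queue1);
        Deque<String> second = new ArrayDeque<>(queue2);
        List<String> combined = new ArrayList<>();
        // Each instance takes the next message from each queue; content is
        // never inspected, so unrelated messages end up combined.
        while (!first.isEmpty() && !second.isEmpty()) {
            combined.add(first.poll() + second.poll());
        }
        return combined;
    }
}
```

Calling combine with a, b, c for the first queue and b, c, a for the second returns ab, bc, ca, which reproduces the mismatched results described above.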

This is an example of combining messages from two queues by brute force, which may be valid in certain circumstances. As a general proposition, however, this approach is not likely to meet the requirements.

There are a few issues with this approach:

  1. Any two messages, one from qReceive_1 and one from qReceive_2, will be combined, whether or not they are in any way related. Neither the eInsight engine nor the Business Process cares whether the two messages in fact can be meaningfully combined.

  2. If there are multiple instances of the process waiting for a message to show up in qReceive_2, there is no way to tell to which instance it will be delivered. For all intents and purposes, the eInsight engine will choose one at random.

  3. If a message for the first receive activity arrives, and the Business Process instance is created, but a message for the second receive activity never arrives, the Business Process instance will wait. The wait will end when the Integration Server is shut down, the enterprise application of which it is a part is undeployed, or the instance is explicitly deleted, whichever comes first.

  4. If a message for the second activity arrives but there is no Business Process instance waiting for it, attempts to deliver it will continue according to the JMS Message Server global redelivery policy and Message Time-to-Live value, whether explicit or JMS Message Server global. If a File eWay, which should not be used for serious work, is used instead of a JMS receiver, and its Resend on Error property is set to true, the eWay will continue attempts to redeliver the file indefinitely; otherwise it will try once and give up. Behavior of other eWays may vary; see the eWay documentation for redelivery behavior, if any. This behavior differs from that of ICAN 5.0.5, where a message that could not be matched to a process instance would be unceremoniously dumped; this applies only to JMS receivers.

Issues 3 and 4 must be addressed by appropriate solution design, incorporating message expiry, process instance timeout, and endpoint redelivery behavior configuration. These issues are common to all Message Relationship patterns and are discussed later.
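One of the design elements named above, process instance timeout, can be sketched independently of eInsight as a table of waiting Correlation Identifiers, each with a deadline. ExpiringWaits and its methods are hypothetical; the clock is passed in explicitly so the behavior is deterministic, and what to do with an expired entry (logging, compensation, an error queue) is left to the caller. This is an illustration of the idea, not how the eInsight engine implements timeouts.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical table of waiting Correlation Identifiers with deadlines. */
final class ExpiringWaits {
    private final long timeoutMillis;
    // Insertion order is kept so expired entries are reported oldest first.
    private final Map<String, Long> deadlines = new LinkedHashMap<>();

    ExpiringWaits(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Records that an instance is now waiting for its partner message. */
    void startWaiting(String correlationId, long nowMillis) {
        deadlines.put(correlationId, nowMillis + timeoutMillis);
    }

    /** Returns true, and stops waiting, if the partner arrived before the deadline. */
    boolean match(String correlationId, long nowMillis) {
        Long deadline = deadlines.get(correlationId);
        if (deadline == null || nowMillis > deadline) {
            return false;
        }
        deadlines.remove(correlationId);
        return true;
    }

    /** Removes and returns the identifiers whose deadlines have passed. */
    List<String> sweep(long nowMillis) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = deadlines.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis > entry.getValue()) {
                expired.add(entry.getKey());
                it.remove();
            }
        }
        return expired;
    }
}
```

A periodic sweep bounds how long an orphaned instance can wait (issue 3); a partner message that arrives after its entry expired is rejected by match, which is where message expiry and endpoint redelivery settings (issue 4) take over.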

Issues 1 and 2 can be addressed with the aid of the Message Correlation facilities that the Java CAPS eInsight engine provides. Before addressing these issues, we discuss the notion of a Correlation Identifier in general and, more specifically, how eInsight uses it to ensure that related messages are delivered to the same eInsight Business Process instance.

Correlation Identifier

A Correlation Identifier is the part of a message that allows the integration solution to associate the message with another message whose Correlation Identifier contains the same value.

A Correlation Identifier may be derived from a simple value, such as the value of a field or a contiguous part of a message, or from a complex one, such as the values of multiple message fields or multiple noncontiguous message parts.
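A complex Correlation Identifier behaves as a single value built from several fields: two messages are related only if every part matches. The following minimal sketch shows that idea in plain Java. CompositeCorrelationId, the flattened Map representation of a message, and the field names in the usage example are all hypothetical. The field names are passed per message type because, as discussed later in this chapter, corresponding fields in different messages need not share a name.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical composite Correlation Identifier: several field values compared as a unit. */
final class CompositeCorrelationId {
    private final List<String> parts;

    private CompositeCorrelationId(List<String> parts) {
        this.parts = parts;
    }

    /** Derives the identifier from the named fields of a flattened message. */
    static CompositeCorrelationId from(Map<String, String> message, String... fieldNames) {
        String[] values = new String[fieldNames.length];
        for (int i = 0; i < fieldNames.length; i++) {
            String value = message.get(fieldNames[i]);
            if (value == null) {
                throw new IllegalArgumentException("Missing correlation field: " + fieldNames[i]);
            }
            values[i] = value;
        }
        return new CompositeCorrelationId(Arrays.asList(values));
    }

    // Equal only if all parts are equal, in the same order.
    @Override
    public boolean equals(Object o) {
        return o instanceof CompositeCorrelationId
                && parts.equals(((CompositeCorrelationId) o).parts);
    }

    @Override
    public int hashCode() {
        return Objects.hash(parts);
    }

    // For display only; equality is based on the list of parts.
    @Override
    public String toString() {
        return String.join("|", parts);
    }
}
```

For instance, a message with fields CustNo=42 and PONumber=1001 and another with Customer=42 and PORef=1001 yield equal identifiers when each is read with its own field names.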

For purpose-built Correlation Processors, whether implemented as Java Collaboration–based components or as eInsight Business Processes, the Correlation Identifier can be any value that is directly present within a message or that can be derived from data within a message. A purpose-built Correlation Processor in this case is a component that receives all messages of interest, isolates or derives the Correlation Identifier in each, and causes related messages to be processed together, however that is implemented. This is distinct from the eInsight engine built-in correlation facilities, discussed next.
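The behavior of such a purpose-built Correlation Processor can be sketched in plain Java as follows: every message passes through it, a caller-supplied function derives the Correlation Identifier, and a group is released once the expected number of related messages has arrived. CorrelationProcessor, accept, and pendingCount are hypothetical names; a Java Collaboration would need persistence and error handling that this sketch omits.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

/** Hypothetical purpose-built Correlation Processor that groups related messages. */
final class CorrelationProcessor<M, K> {
    private final Function<M, K> idExtractor;
    private final int groupSize;
    private final Map<K, List<M>> pending = new HashMap<>();

    CorrelationProcessor(Function<M, K> idExtractor, int groupSize) {
        if (groupSize < 1) {
            throw new IllegalArgumentException("groupSize must be positive");
        }
        this.idExtractor = idExtractor;
        this.groupSize = groupSize;
    }

    /** Accepts one message; returns the completed group if this message completed it. */
    Optional<List<M>> accept(M message) {
        K id = idExtractor.apply(message);
        List<M> group = pending.computeIfAbsent(id, k -> new ArrayList<>());
        group.add(message);
        if (group.size() < groupSize) {
            return Optional.empty();
        }
        pending.remove(id);
        return Optional.of(group);
    }

    /** Number of Correlation Identifiers still waiting for more messages. */
    int pendingCount() {
        return pending.size();
    }
}
```

Unlike the eInsight built-in facility, this sketch does not single out an initial message: whichever related message arrives first opens the group. Patterns in which the initial message is special would need an extra rule for messages that arrive before it.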

eInsight Correlation Processor: Second Cut

The eInsight Business Process Management (BPM) engine offers automatic correlation facilities. These facilities allow process designers to identify the parts of initial and subsequent messages whose values are to be used for correlation. The correlation process itself is delegated to the eInsight engine. It will ensure that subsequent messages are delivered to the Business Process instance created to process the corresponding initial message.

The eInsight engine is a BPEL4WS processor. Every activity within a Business Process is a Web Service invocation, whether implemented as an explicit SOAP Request/Reply invocation or using more efficient internal mechanisms. This implies that each activity has an input message and an output message, each with its own structure, which may be simple or complex.

For the eInsight engine to correlate messages on behalf of a Business Process, it must be configured to recognize Correlation Identifier values both in messages that cause creation of new Business Process instances and in messages destined for existing instances. This allows the engine to match the values in the latter against those recorded for the former and therefore to identify the process instances to which to deliver those messages.

A Business Process, bpSimpleCorrelation, shown in Figure 11-8, illustrates the concepts.

Figure 11-8. __Book/MessageCorrelation/EInsightCorrelation/bpSimpleCorrelation

So far, this process is identical to the naïve implementation presented earlier. Now we need to enable correlations and identify the Correlation Identifiers to use. Figure 11-9 illustrates access to Business Process properties, some of which govern the use of correlations.

Figure 11-9. Accessing Business Process properties

Correlations are enabled by creation of Correlation Keys and Correlation Sets through Business Process properties for the appropriate Business Process, and assignment of Correlation Sets to appropriate activities. The process is described in detail in the eInsight User Guide [eInsightUG].

In this example, we wish to use the entire message as the Correlation Identifier. To do so, it is necessary to add a Correlation Key and a Correlation Set naming the JMS message’s entire Text Message node as the Correlation Identifier, using a series of dialogue boxes, as shown in Figures 11-10 and 11-11.

Figure 11-10. Defining the Correlation Key

Figure 11-11. Defining the Correlation Set

First, the initial JMS receive activity’s Use Correlations property must be configured to use the Correlation Set defined previously, and its Initialize Set value must be set to Yes, as shown in Figure 11-12.

Figure 11-12. Configuring correlation for the initial receive activity

The second JMS receive activity’s Use Correlations property must be configured to use the Correlation Set defined previously. The Initialize Set property must be set to No, as illustrated in Figure 11-13.

Figure 11-13. Configuring correlation on the second receive activity

The complete example is presented in Chapter 9, section 9.4, “eInsight Correlation Processor: Second Cut,” in Part II.

With correlation properly configured, manually submitting messages containing a, b, and c to qReceive_1, and manually submitting messages containing b, c, and a to qReceive_2, will result in the messages being correctly correlated: each combined message will contain two messages with the same content (a+a, b+b, and c+c).

Notice that with JMS receive activities there was no opportunity to specify parts of the text message as the Correlation Identifier: the entire text message was used. Notice, too, that the JMS message consists of not just the text message but also the JMS header with a variety of fields, one of which is the JMSCorrelationID field, as illustrated in Figure 11-14.

Figure 11-14. JMS message properties that can be selected for use in correlations

Any one or more of these JMS header fields could have been used as a Correlation Key and as part of the Correlation Set. Note, however, that a JMS user property could not be directly used. There is nowhere to specify which of the possibly many user properties will carry the value to use.

Consider a receive activity in the bpNaiveCorrelationFiles process. Expand the Business Process properties, select the Correlations tab, and click the Create button to open the New Correlation Key dialogue box. Scroll to the entry that reads FileTextMessage, as illustrated in Figure 11-15.

Figure 11-15. __Book/MessageCorrelation/EInsightCorrelation/bpNaiveCorrelationFiles

Notice that for the File eWay there are only two alternatives for the Correlation Identifier: the entire text node or the entire byteArray node. If we wished to use only a fragment of the message as the Correlation Identifier, we could not do so with the JMS message (apart from its header fields), with the file message, or, for that matter, with any endpoint message in which discrete fields are not defined.

This issue and the issue of how to use specific parts of JMS messages or other eWay messages as parts of Correlation Sets are discussed in the next section.

Derived Correlation Identifiers

Regardless of which endpoint delivers a message to the Business Process, only directly accessible fields of the message can be used as part of the Correlation Identifier. The eInsight engine can access only those parts of incoming messages that are identifiable using internally defined XPath expressions; remember that eInsight works only with Web Services, whose input and output messages are XML. The New Correlation Key dialogue box provides no means of specifying an XPath expression directly. It formulates XPath expressions on the basis of the message fields the developer chooses in the node tree. If access to subcomponents of a message is required, then a new Web Service must be created, and a new, properly structured message must be defined for it to return.

The following discussion concentrates on receiving a structured message from JMS and using message structure to derive the appropriate Correlation Identifier. The same technique applies to any receive activity where the endpoint returns a payload as an unstructured message and that message must be parsed to obtain Correlation Identifiers.

eInsight Correlations are useful but poorly documented in the eInsight User’s Guide for Release 5.1.x. Therefore, Part II of this book delivers a step-by-step walkthrough of the implementation and use of eInsight Correlations. Subsequent sections, dealing with Message Relationship patterns, assume that the mechanics of configuring eInsight Correlations are well understood.

Let’s assume that a Business Process must match a Purchase Order to a corresponding Invoice using the Purchase Order Number field as the Correlation Identifier. Data for this component is present in both messages. Purchase Orders are delivered by some part of the solution to a JMS Destination qPOQueue as JMS text messages. Invoices are delivered to a JMS Destination qInvQueue as JMS text messages.

The issue lies in supplying the content of the JMS text message, containing the Purchase Order or the Invoice, to the eInsight engine in such a way that it can address the Purchase Order Number field using an XPath expression.
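Outside eInsight, the underlying point can be shown with the JDK's own XML APIs: once the text payload is parsed into an XML tree, the Purchase Order Number becomes addressable with a single XPath expression. The following minimal sketch does that with javax.xml.parsers and javax.xml.xpath; PONumberExtractor is a hypothetical name, and the expression is our own, not one generated by eInsight. It assumes instance documents shaped by the schemas in Listings 11-1 and 11-2, where only the root element is namespace-qualified because the schemas do not set elementFormDefault; local-name() sidesteps prefix binding either way.

```java
import java.io.IOException;
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Document;
import org.xml.sax.InputSource;
import org.xml.sax.SAXException;

/** Hypothetical extractor: parses a PO or Invoice text payload and reads PONumber. */
final class PONumberExtractor {
    // Matches PONumber directly under a PO or Inv root, ignoring namespace prefixes.
    private static final String EXPRESSION =
        "/*[local-name()='PO' or local-name()='Inv']/*[local-name()='PONumber']";

    private PONumberExtractor() {}

    static String extract(String xmlText) {
        String value;
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true);
            Document doc = factory.newDocumentBuilder()
                    .parse(new InputSource(new StringReader(xmlText)));
            XPath xpath = XPathFactory.newInstance().newXPath();
            // String value of the first matching node, or "" if nothing matches.
            value = xpath.evaluate(EXPRESSION, doc).trim();
        } catch (ParserConfigurationException | SAXException | IOException
                 | XPathExpressionException e) {
            throw new IllegalArgumentException("Cannot parse message body", e);
        }
        if (value.isEmpty()) {
            throw new IllegalArgumentException("No PONumber element found");
        }
        return value;
    }
}
```

The same expression works for both message types, so a Purchase Order and an Invoice carrying the same PONumber yield equal values, which is exactly what the correlation needs.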

The Purchase Order and the Invoice XML Schemas will be used to construct OTDs used to receive Purchase Orders from qPOQueue and Invoices from qInvQueue.

The XML Schema documents are shown in Listings 11-1 and 11-2.

SOAPRequestReply_PO.xsd is shown in Listing 11-1.

Example 11-1. Purchase Order XML Schema document

<?xml version="1.0" encoding="UTF-8"?>
<xsd:schema
    xmlns:xsd="http://www.w3.org/2001/XMLSchema"
    xmlns:tns="http://MCZ01:14000//SOAPRequestReply"
    targetNamespace="http://MCZ01:14000//SOAPRequestReply"
    >
    <xsd:element name="PO">
        <xsd:complexType>
            <xsd:sequence>
                <xsd:element name="PONumber" type="xsd:string"/>
                <xsd:element name="PODate" type="xsd:string"/>
                <xsd:element name="Items" maxOccurs="unbounded">
                    <xsd:complexType>
                        <xsd:sequence>
                            <xsd:element name="ItemNumber"
                                         type="xsd:string"/>
                            <xsd:element name="ItemQuantity"
                                         type="xsd:integer"/>
                        </xsd:sequence>
                    </xsd:complexType>
                </xsd:element>
            </xsd:sequence>
        </xsd:complexType>
    </xsd:element>
</xsd:schema>

SOAPRequestReply_Inv.xsd is shown in Listing 11-2.

Example 11-2. Invoice XML Schema document

<?xml version="1.0" encoding="UTF-8"?>
<xsd:schema
    xmlns:xsd="http://www.w3.org/2001/XMLSchema"
    xmlns:tns="http://MCZ01:14000//SOAPRequestReply"
    targetNamespace="http://MCZ01:14000//SOAPRequestReply"
    >
    <xsd:element name="Inv">
        <xsd:complexType>
            <xsd:sequence>
                <xsd:element name="PONumber" type="xsd:string"/>
                <xsd:element name="Items" maxOccurs="unbounded">
                    <xsd:complexType>
                        <xsd:sequence>
                            <xsd:element name="ItemNumber"
                                         type="xsd:string"/>
                            <xsd:element name="ItemQuantity"
                                         type="xsd:integer"/>
                            <xsd:element name="TotalPrice"
                                         type="xsd:double"/>
                        </xsd:sequence>
                    </xsd:complexType>
                </xsd:element>
                <xsd:element name="InvoiceTotal" type="xsd:double"/>
            </xsd:sequence>
        </xsd:complexType>
    </xsd:element>
</xsd:schema>

By itself, the JMS receive activity will return a text message or a bytes message, and a series of JMS Message Header properties. To allow the eInsight engine access to the structure of the body of the message, we must create a receive activity that returns the body as a structured message. One way to facilitate this is to wrap the JMS receive activity into a “notify”-style subprocess that returns the necessary XML structure. This subprocess will be used in place of the JMS receive activity in the main Business Process that processes correlated messages. This is illustrated in detail in Chapter 9, section 9.5, “Derived Correlation Identifiers,” in Part II.

The basic process, bpCorrelatePOandInvoice, receives a Purchase Order and an Invoice, concatenates them, and sends the result to a JMS Destination, as shown in Figure 11-16. A real process would do something more useful with the Purchase Orders and Invoices.

Figure 11-16. Constructing a result message from two input messages

It is necessary to identify the parts of the Purchase Order and the Invoice messages that are to be used for correlation and to configure the receive activities so that the eInsight engine can actually correlate messages.

We identified the PONumber fields in both the Purchase Order and the Invoice as the fields that will contain the values to match on. eInsight, implementing BPEL4WS techniques, requires us to define a Correlation Key and a Correlation Set.

Right-clicking the name of the process, bpCorrelatePOandInvoice, to open its Business Process Properties and selecting the Correlations tab gives access to the correlation properties.

You would create a new Correlation Key, ckPONumber, select the message reqPO, expand the entry until the PONumber node is visible, select it, and add it to the Correlation Key, as illustrated in Figure 11-17.

Figure 11-17. Selecting Correlation Key field from the Purchase Order message

The process is repeated for resInv, as illustrated in Figure 11-18.

Figure 11-18. Selecting Correlation Key field from the Invoice message

The reqPO is the structure returned by the sbpReceivePO. The resInv is the structure returned by the sbpReceiveInv. The Correlation Key defines which field values, in different messages, will be matched to determine if the messages are related. The names of the fields do not have to be the same. The datatypes and sizes must be the same because it is expected that values in these fields in different messages will be identical.

Having defined the Correlation Key, you would define the Correlation Set that references the Correlation Key, then configure the Use Correlations properties of the appropriate receive activities.

The entire process is presented in a step-by-step fashion in Chapter 9, section 9.5, “Derived Correlation Identifiers,” in Part II.

When a message that is to be delivered to a Business Process arrives, the eInsight engine will extract data contained in the part of the message designated as the Correlation ID. It will then create a Business Process instance and hand it the message. The Correlation ID data and the Business Process ID of the new Business Process instance will be used to create an entry in a list of executing Business Processes that are candidates for correlation. When a message for the second receive activity arrives, the eInsight engine will extract the Correlation ID value and attempt to find a matching process instance in the list. If it finds one, it will deliver the message to the receive activity of that Business Process instance, thus completing the correlation process. If it does not find a match, it will throw an exception. What ultimately happens to the message will be determined by the endpoint from which it came.
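The lookup just described can be modeled as a map from Correlation Identifier values to process instances. The following is a minimal sketch in plain Java, not the eInsight engine's implementation. CorrelationTable and its methods are hypothetical names, and instances are represented by integers. The entry is removed on the subsequent receive because the example process ends after its second receive activity. The sketch also rejects a duplicate initializing value, a choice the text does not specify for eInsight.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of the engine's list of instances that are candidates for correlation. */
final class CorrelationTable {
    private final Map<String, Integer> instancesById = new HashMap<>();
    private int nextInstanceId = 1;

    /** Initializing receive (Initialize Set = Yes): create an instance and record its value. */
    int receiveInitial(String correlationId) {
        // Assumption of this sketch: duplicates are rejected rather than overwritten.
        if (instancesById.containsKey(correlationId)) {
            throw new IllegalStateException("Correlation ID already in use: " + correlationId);
        }
        int instanceId = nextInstanceId++;
        instancesById.put(correlationId, instanceId);
        return instanceId;
    }

    /** Subsequent receive (Initialize Set = No): deliver to the matching instance or fail. */
    int receiveSubsequent(String correlationId) {
        Integer instanceId = instancesById.remove(correlationId);
        if (instanceId == null) {
            // No match: the endpoint that delivered the message decides what happens next.
            throw new IllegalStateException("No instance waiting for: " + correlationId);
        }
        return instanceId;
    }
}
```

Subsequent messages are routed by value, not by arrival order: an Invoice for PO 1002 reaches the second instance even if it arrives before the Invoice for PO 1001.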

Although we used JMS Destinations as sources of messages for the subprocesses, the method discussed above is a generic method; it can be used to wrap arbitrary endpoints. The point in all this is to allow the eInsight engine to locate the parts of messages it needs for correlation even if the endpoint does not deliver messages in a form that lends itself to XPath manipulation.

What we achieved, using subprocesses masquerading as receive activities, could have been achieved using independent processes and JMS Destinations with JMSCorrelationID properties, discussed in the next section.

Derived Correlation Identifiers: Alternative

Correlation using JMS receive activities, with JMS text messages as Correlation Identifiers, was discussed in section 11.7. It was pointed out that only XPath-accessible fields within messages returned by the receive activity could be used for correlation. It was also observed that the JMSCorrelationID JMS header property by default carries the literal string “Sun-SeeBeyond.” The JMS header properties are accessible using XPath expressions; therefore, we can use the JMSCorrelationID JMS header field to carry a Correlation Identifier value of our choosing. The trick to using JMS receive activities directly, without wrapping them in subprocesses, is to set correct values in the JMSCorrelationID fields of the messages to be correlated and to mark these fields for correlation.

In the previous section, we were receiving messages from endpoints in a subprocess, unmarshaling them, and returning the structured payload as the output message. Rather than doing that, we could preprocess messages in separate components to obtain and set Correlation Identifiers. We would receive messages from endpoints, unmarshal them in specialized components, extract the values to be used for correlation, set the JMSCorrelationID values in JMS messages, and finally send the JMS messages to the appropriate JMS Destinations. The main process would then deal with JMS receive activities, as described earlier, and the eInsight engine would use JMSCorrelationID values in JMS messages to determine which messages belong to which Business Process instances. There would be no need to create subprocesses as wrappers, thus making the solution simpler and more efficient.

To illustrate this notion, Figure 11-19 shows a connectivity map containing all relevant components.

Figure 11-19. __Book/MessageCorrelation/EInsightDerivedCorrelationIds/cm_EInsightDerivedCorrelationIdsAlt

Either an eInsight Business Process or a Java Collaboration can receive the message, unmarshal it, set the JMSCorrelationID and the JMS payload, and send the message on its way. For clarity and brevity, Java Collaboration sources are shown in Examples 11-3 and 11-4.

Example 11-3. Book/MessageCorrelation/EInsightDerivedCorrelationIds/jcdSetCorrelationID_PO

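public void receive
    (com.stc.connectors.jms.Message input
    ,com.stc.connectors.jms.JMS W_toJMS
    ,stcgen.fcxotd.http___MCZ01_14000__SOAPRequestReply.PODocument vPO )
        throws Throwable
{
    // Unmarshal the incoming text payload into the PO structure
    vPO.unmarshalFromString( input.getTextMessage() );
    // Create an outbound text message on the W_toJMS connection
    com.stc.connectors.jms.Message msgOut = W_toJMS.createTextMessage();
    // Use the Purchase Order Number as the Correlation Identifier
    msgOut.getMessageProperties().setCorrelationID
                                          ( vPO.getPO().getPONumber() );
    // Forward the original payload unchanged
    msgOut.setTextMessage( input.getTextMessage() );
    W_toJMS.send(msgOut);
}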

Example 11-4. Book/MessageCorrelation/EInsightDerivedCorrelationIds/jcdSetCorrelationID_Inv

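public void receive
    (com.stc.connectors.jms.Message input
    ,com.stc.connectors.jms.JMS W_toJMS
    ,stcgen.fcxotd.http___MCZ01_14000__SOAPRequestReply.InvDocument vInv )
        throws Throwable
{
    // Unmarshal the incoming text payload into the Invoice structure
    vInv.unmarshalFromString( input.getTextMessage() );
    // Create an outbound text message on the W_toJMS connection
    com.stc.connectors.jms.Message msgOut = W_toJMS.createTextMessage();
    // The Invoice carries the same PO Number, so both documents share a key
    msgOut.getMessageProperties().setCorrelationID
                                        ( vInv.getInv().getPONumber() );
    // Forward the original payload unchanged
    msgOut.setTextMessage( input.getTextMessage() );
    W_toJMS.send(msgOut);
}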

Copy eInsight Business Process bpSimpleCorrelation from the eInsightCorrelation project and modify its Correlation properties or create an equivalent new Business Process. Define the Correlation Key and the Correlation Set to use the JMSCorrelationID property of the JMS message, as shown in Figure 11-20.

Figure 11-20. __Book/MessageCorrelation/EInsightDerivedCorrelationIds/bpSimpleCorrelation

Make sure the Use Correlations properties on both JMS receive activities are set as appropriate—refer to section 11.7 for details.

The advantage of this approach is that no Web Services Description Language (WSDL) interface definitions are needed, since there are no subprocesses. The disadvantages are that two additional JMS queues are required and that messages must be unmarshaled again in the main process, even though the Java Collaborations already unmarshaled them to obtain values for the JMSCorrelationID properties. To minimize the number of queues, you could configure static JMS selectors, discussed in Chapter 5, so that only one additional JMS queue would be required. Ultimately, it is up to the implementer to determine the best way to address this issue.

Message Relationship Patterns

Correlation can be used to process messages that follow several distinct Message Relationship patterns. In the earlier versions of SeeBeyond products, this feature was called Event Linking and Sequencing.

Header-Items-Trailer Correlation

The Header-Items-Trailer Message Relationship pattern is one where a particular kind of message, called the Header Message, causes the correlation process for a series of messages to start. Subsequent messages with the same Correlation Identifier, called Items, are collected until another special message with the same Correlation Identifier, called the Trailer, is received. This causes collection of related messages to stop. The collected messages are then processed together.

In previous sections dealing with correlation, the relationship between correlated messages was very simple: the Purchase Order was expected to arrive first and the Invoice was expected to follow. The relationship between the Purchase Order message and the Invoice message can be considered a special case of the Header-Items-Trailer Message Relationship pattern. The Purchase Order is the Header, the message that initiates the correlation process, and the Invoice is the Trailer, the message that causes the correlation process to finish; no Items messages are present. While this may be a valid relationship, the simplistic processes that were implemented did not consider some runtime issues that might arise. The first and foremost was that if the Invoice arrived before the corresponding Purchase Order, the Invoice would be lost, because there would be no existing eInsight Business Process instance to which to deliver it. Subsequent arrival of the Purchase Order would result in creation of a new eInsight Business Process instance for which the Invoice would never arrive. What would happen to that instance depends on whether persistence was turned on for the process. If it was, the instance would exist until explicitly deleted; if it was not, the instance would be eliminated upon Integration Server shutdown.

What is actually required is that the two documents be correlated regardless of which arrives first. A more robust solution, one that would not lose an Invoice that arrived before its Purchase Order, would allow the documents to arrive in any order. To do so, rather than placing the receive activities one after the other, you would use an Event-Based Decision.

This discussion is illustrated with a detailed example in Chapter 9, section 9.7.1, “Header-Items-Trailer Correlation,” in Part II.

Whatever the order of submission, the messages will be correlated correctly.

Rather than being restricted to correlating two messages, the process can be extended to any number of messages coming in from distinct endpoints.

In the initial implementation, messages were required to arrive in a predefined order, and issues would arise if they did not. In this implementation, messages can arrive in any order, so this issue is avoided.

Another issue arises if an initial message arrives but a subsequent message does not. This will result in a process instance hanging around indefinitely, if persistence was enabled for it, or until the Integration Server is shut down if not. This is wasteful of resources. Furthermore, if persistence is not turned on, there is no way to see or operate on such in-flight instances. Without persistence, the Enterprise Manager will not display eInsight process instances and will not offer the “terminate” option to terminate them.

To work around this issue, a process instance must be able to time out receive activities on which it is blocked for an excessive amount of time.

The third variant of the process adds a Timer Event as an additional receive activity. When the timer expires, the Business Process will terminate regardless of whether all expected messages were received. Any messages that were received up to that point will be sent to an exception queue.

Any Order Two Items Correlation

We are required to match Purchase Orders and Invoices, one of each, in any order. It matters not whether the Invoice arrives first or the Purchase Order arrives first.

To implement this pattern, we need two sets of receive activities, one set that causes the Correlation Set to be initialized and one that does not. The process is shown in Figure 11-21. Since either the Purchase Order or the Invoice can arrive first, the initial set of receive activities will use the Event-Based Decision. Once we have one of the required messages, we will branch to the appropriate receive activity to receive the other.

Figure 11-21. Any Order Two Items correlation process

The connectivity map uses two inbound JMS queues and one outbound JMS queue, as shown in Figure 11-22.

Figure 11-22. Connectivity map for the Any Order Two Items correlation

Project __Book/MessageCorrelation/EInsightCorrelations/AnyOrder implements this example. The complete example is reproduced in Chapter 9, section 9.7.2, “Any Order Two Items Correlation,” in Part II.

This implementation accepts the two messages in any order. It will not lose an early-arriving message and leave the process instance waiting for it, as the simplistic solution shown previously would.
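To make the any-order logic concrete outside of eInsight, here is a minimal in-memory sketch. It is an assumption-laden illustration, not how the eInsight engine works internally: the class `AnyOrderCorrelator` and its methods are hypothetical, a `HashMap` keyed by Purchase Order Number stands in for the set of live process instances, and whichever document arrives first creates the entry, much as the first branch of the Event-Based Decision initializes the Correlation Set.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, in-memory illustration only: not a Java CAPS API.
final class AnyOrderCorrelator {

    // One entry per in-flight "process instance", keyed by Purchase Order Number.
    private final Map<String, Pending> instances = new HashMap<>();

    private static final class Pending {
        String purchaseOrder;
        String invoice;
    }

    // Either document may arrive first; the first one creates the instance
    // (the equivalent of initializing the Correlation Set).
    Optional<String[]> onPurchaseOrder(String poNumber, String document) {
        Pending p = instances.computeIfAbsent(poNumber, k -> new Pending());
        p.purchaseOrder = document;
        return completeIfReady(poNumber, p);
    }

    Optional<String[]> onInvoice(String poNumber, String document) {
        Pending p = instances.computeIfAbsent(poNumber, k -> new Pending());
        p.invoice = document;
        return completeIfReady(poNumber, p);
    }

    private Optional<String[]> completeIfReady(String poNumber, Pending p) {
        if (p.purchaseOrder == null || p.invoice == null) {
            return Optional.empty();          // still waiting for the other document
        }
        instances.remove(poNumber);           // the instance completes and goes away
        return Optional.of(new String[] { p.purchaseOrder, p.invoice });
    }

    int inFlight() {
        return instances.size();
    }
}
```

Calling `onInvoice("PO1", ...)` before `onPurchaseOrder("PO1", ...)` leaves one entry in flight, and the second call returns the matched pair. Note that nothing here ever removes an entry whose partner never arrives, which is exactly the weakness discussed next.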

This solution does, however, share one issue with the simple solution shown before: if the subsequent message never arrives, the process instance will wait indefinitely or until terminated by an external event such as an Integration Server shutdown or failure.

Any Order Two Items Correlation with Timeout

We are required to match Purchase Orders and Invoices, one of each, in any order. It matters not whether the Invoice arrives first or the Purchase Order arrives first, but if no match can be made within a set amount of time, the unmatched document must be processed by an exception process.

The implementation of this pattern is very similar to the Any Order Two Items Correlation pattern discussed previously. It is illustrated in Figure 11-23. The difference is the addition of two Event-Based Decisions, with a receive activity and a timer event activity each, to replace the two plain receive activities.

Figure 11-23. Any Order Two Items correlation with Timeout process

The Timer Event timeout value can be static or dynamic: either a Duration Literal or the value of a Business Process Attribute, which must also be a Duration Literal.
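As a rough aid to reading Duration Literals, the sketch below converts one to milliseconds with the standard `javax.xml.datatype` API. It assumes, without confirmation from this chapter, that eInsight Duration Literals follow the XML Schema `xsd:duration` lexical form used by BPEL timers (for example `PT5M` or `P3D`); the class `DurationLiterals` is hypothetical.

```java
import java.util.Date;
import javax.xml.datatype.DatatypeConfigurationException;
import javax.xml.datatype.DatatypeFactory;
import javax.xml.datatype.Duration;

// Hypothetical helper: converts an xsd:duration literal to milliseconds.
final class DurationLiterals {

    static long toMillis(String literal, Date start) {
        try {
            // Parses the lexical form PnYnMnDTnHnMnS, e.g. "PT5M" or "P3D".
            Duration d = DatatypeFactory.newInstance().newDuration(literal);
            // Years and months have no fixed length, so the result is computed
            // by adding the duration to a concrete start instant.
            return d.getTimeInMillis(start);
        } catch (DatatypeConfigurationException e) {
            throw new IllegalStateException("No DatatypeFactory available", e);
        }
    }
}
```

The start instant matters only for year and month components; a purely time-based literal such as `PT5M` always yields 300,000 ms.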

Some might ask why a timeout is associated with the second receive activity and not with the first. This is because until a message destined for the first receive activity arrives, the Business Process instance does not exist, so there is no process to time out.
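The point about where the timer lives can be sketched in plain Java. In this hypothetical in-memory model (`TimedAnyOrderCorrelator` is not a Java CAPS class), the deadline is fixed only when the first document creates an instance, and a separately invoked `expire` method plays the role of the timer branch, handing unmatched documents to an exception path. Time is passed in explicitly so the behavior is deterministic; the sketch assumes `expire` is called periodically.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical, in-memory illustration only: not a Java CAPS API.
final class TimedAnyOrderCorrelator {

    private final long timeoutMillis;
    private final Map<String, Pending> instances = new HashMap<>();

    TimedAnyOrderCorrelator(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    private static final class Pending {
        final long deadline;
        String purchaseOrder;
        String invoice;

        Pending(long deadline) {
            this.deadline = deadline;
        }
    }

    // kind is "PO" or "INV"; returns true when this message completes the pair.
    boolean onMessage(String kind, String poNumber, String document, long now) {
        // The deadline is fixed when the first message creates the instance;
        // before that there is nothing to time out.
        Pending p = instances.computeIfAbsent(poNumber, k -> new Pending(now + timeoutMillis));
        if ("PO".equals(kind)) {
            p.purchaseOrder = document;
        } else {
            p.invoice = document;
        }
        if (p.purchaseOrder != null && p.invoice != null) {
            instances.remove(poNumber);
            return true;
        }
        return false;
    }

    // Timer branch: drops expired instances and returns their unmatched
    // documents for an exception process.
    List<String> expire(long now) {
        List<String> unmatched = new ArrayList<>();
        Iterator<Pending> it = instances.values().iterator();
        while (it.hasNext()) {
            Pending p = it.next();
            if (now >= p.deadline) {
                if (p.purchaseOrder != null) unmatched.add(p.purchaseOrder);
                if (p.invoice != null) unmatched.add(p.invoice);
                it.remove();
            }
        }
        return unmatched;
    }
}
```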

Items-Trailer Correlation

We are required to collect related messages until a “special” trailer message, indicating that a batch of messages is finished, arrives. When it does, we are to process all messages collected so far and release them for further processing by the next component.

Let us assume that each ordinary message carries a Purchase Order Number, an Item Number, and a Quantity to be delivered. The trailer message carries the same Purchase Order Number, a dummy Item Number, and a count of items for cross-check. The messages are to be assembled into a Purchase Order with the Purchase Order Number and a repeating group of Item Number and Quantity. Ordinary messages will come from a JMS Destination, qItemsIn. The trailer message will come from another JMS Destination, qTrailerIn. This is a contrived example. With the Items-Trailer pattern, you would perhaps be better off using a single queue and a trailer message with a sentinel value in one of the fields, perhaps an ItemNumber value of TRAILER or similar.

Since the item messages are structured and only one field of each, the Purchase Order Number, will be used for correlation, it is necessary to preprocess messages as they arrive. We use JMS to receive messages, so we unmarshal them in a Java Collaboration and set the JMSCorrelationID JMS header property to the value of the Purchase Order Number so that the correlation logic can associate related messages. Depending on preferences, wrapper subprocesses could have been used instead.

The Business Process starts with a single receive activity with Use Correlations configured, and the Correlation Set Initialized property set to Yes, to receive the initial item. This is followed by a loop containing an Event-Based Decision with two receive activities, one for the remaining items and one for the trailer message. As items are processed, they will be counted and their item numbers and quantities will be set in the Purchase Order structure.

If a trailer arrives that cannot be matched to an existing set of items, it will not be delivered and will be processed according to the global JMS Server retry policy.
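The item and trailer branches of the loop can be modeled in a few lines of Java. This is a hypothetical in-memory sketch (`ItemsTrailerCollector` and `Item` are illustrative names, not generated OTD classes): items accumulate per Purchase Order Number, the trailer closes the batch after cross-checking its item count, and an unmatched trailer simply yields `null`, standing in for the message that eInsight would not deliver.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, in-memory illustration only: not a Java CAPS API.
final class ItemsTrailerCollector {

    static final class Item {
        final String itemNumber;
        final int quantity;

        Item(String itemNumber, int quantity) {
            this.itemNumber = itemNumber;
            this.quantity = quantity;
        }
    }

    // Items collected so far, one batch per Purchase Order Number.
    private final Map<String, List<Item>> batches = new HashMap<>();

    // Item branch: the first item for a PO Number starts a new batch.
    void onItem(String poNumber, String itemNumber, int quantity) {
        batches.computeIfAbsent(poNumber, k -> new ArrayList<>())
               .add(new Item(itemNumber, quantity));
    }

    // Trailer branch: returns the assembled lines and closes the batch.
    // Returns null if no batch matches (the trailer cannot be delivered),
    // and throws, leaving the batch in place, if the cross-check count differs.
    List<Item> onTrailer(String poNumber, int expectedCount) {
        List<Item> batch = batches.get(poNumber);
        if (batch == null) {
            return null;
        }
        if (batch.size() != expectedCount) {
            throw new IllegalStateException("PO " + poNumber + ": expected "
                    + expectedCount + " items but collected " + batch.size());
        }
        batches.remove(poNumber);
        return batch;
    }
}
```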

Since this solution does not use timers to restrict the amount of time given for the correlation to complete, it suffers from the same issue as other solutions that do not use timers. If the trailer message never arrives, the correlation will never complete. If no persistence is used and the Integration Server goes down, messages accumulated by the process instance will be lost. A simple modification that adds a timed branch would address this issue.

While this modified solution does address the issue of the trailer not arriving within a reasonable amount of time, it does so in a fairly simplistic manner. A more reasonable solution would be to have the partially completed Purchase Order sent to an alternate destination, perhaps an exception destination tied into a human workflow.

The Items-Trailer Message Relationship pattern described in this section could be looked at as a Gather component implementation in a Scatter-Gather pattern.

This discussion is illustrated with a complete example in Chapter 9, section 9.7.4, “Items-Trailer Correlation,” in Part II.

Header-Counted-Items Correlation

We are processing Purchase Orders to generate Invoices. Each Item in the Purchase Order is sent as a separate message to the ProvideItemPrice service, which has the smarts to work out volume discounts and, given the Item Number and Quantity, will return the Price for that Quantity of that Item. We need to then collect all the responses from the service, using the Purchase Order Number as a Correlation Key, assemble the Invoice message, and send it for further processing.

This narrative describes an implementation of a final part of a Scatter-Gather or a Composed Message Processor pattern. The Header-Counted-Items correlation implements the Gather part of the Scatter-Gather pattern or the Aggregator part of the Composed Message Processor pattern.

Assume that the Scatter part of the Scatter-Gather implementation breaks up the Purchase Order message into as many Item messages as there are items. Since the Scatter implementation knows how many items there are in each order, it will also assemble a header message, PricedItemsHeader, containing the PONumber and, in the ItemCount field, the number of items.

A Business Process will do the Gather work. It needs a receive activity to receive the CostedItemsHeader message and a receive activity in a while loop to receive the CostedItems messages. Figure 11-24 illustrates this process. The loop is controlled by the loop counter, which is incremented for every item received until it reaches the value of the count of items to receive, which is set from the ItemCount field in the header message. Item cost and invoice total are calculated as the loop executes.
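The header-driven loop can be approximated as follows. In this hypothetical sketch (`HeaderCountedItemsGatherer` is an illustrative name), the header sets the expected count, each priced item increments the loop counter and the running total, and the total is released once the counter reaches ItemCount. Prices are kept in integer cents to avoid floating-point rounding; that representation is an assumption, not something the original process specifies.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical, in-memory illustration only: not a Java CAPS API.
final class HeaderCountedItemsGatherer {

    private static final class Batch {
        final int expected;       // ItemCount from the header message
        int received;             // the loop counter
        long totalCents;          // running invoice total

        Batch(int expected) {
            this.expected = expected;
        }
    }

    private final Map<String, Batch> batches = new HashMap<>();

    // Header receive: starts the instance for this PO Number.
    void onHeader(String poNumber, int itemCount) {
        batches.put(poNumber, new Batch(itemCount));
    }

    // Loop body: accumulates one priced item; returns the invoice total
    // once the counter reaches ItemCount, otherwise empty.
    OptionalLong onPricedItem(String poNumber, long priceCents) {
        Batch b = batches.get(poNumber);
        if (b == null) {
            throw new IllegalStateException("No header received for PO " + poNumber);
        }
        b.received++;
        b.totalCents += priceCents;
        if (b.received < b.expected) {
            return OptionalLong.empty();
        }
        batches.remove(poNumber);
        return OptionalLong.of(b.totalCents);
    }
}
```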

Figure 11-24. Header-Counted-Items correlation process

This solution could potentially benefit from a timeout on the second receive activity to abort processing if not all messages arrived within a reasonable time. The complete example is presented in Chapter 9, section 9.7.5, “Header-Counted-Items Correlation,” in Part II.

Counted and Timed Items Correlation

We are running a Shoe Warehouse serving all states in the country. Shoes ship in boxes of the same size. For cost-efficiency, we only ship to a particular state if there are enough orders to fill a carton of 125 boxes, all going to the same state. As orders come in, we identify the State, the Order Number, and the Item Number and send a message to the warehousing system. When enough orders are available, the warehousing system prints the list of items to be assembled for shipment to a particular state so the carton can be filled and dispatched. We have a service level agreement with the states requiring us to deliver the goods within, at most, 3 days. If there are not enough boxes to fill a carton within 3 days, we make and ship a partial shipment.

The narrative above describes a Counted-Items Message Relationship pattern. All messages are the same and consist of the State, OrderNumber, and ItemNumber.

The State field will be used for correlation. The first message for which there is no Business Process instance will start the correlation process. In the example, the carton size and the deadline are scaled down: orders from a particular state are collected until there are 5 of them or a 5-minute timer expires, whichever comes first, at which point the list of items is sent for picking. This process is illustrated in Figure 11-25.
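The "count or timeout, whichever comes first" rule can be expressed as a small batcher keyed by State. This is a hypothetical in-memory sketch (`CountedAndTimedBatcher` is an illustrative name): the first order for a State opens a batch and fixes its deadline, reaching the batch size releases a full batch immediately, and `expire` models the timer branch that releases partial batches. Time is passed in explicitly, and `expire` is assumed to be called periodically.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical, in-memory illustration only: not a Java CAPS API.
final class CountedAndTimedBatcher {

    private final int batchSize;
    private final long timeoutMillis;
    private final Map<String, Batch> open = new HashMap<>();

    CountedAndTimedBatcher(int batchSize, long timeoutMillis) {
        this.batchSize = batchSize;
        this.timeoutMillis = timeoutMillis;
    }

    private static final class Batch {
        final long deadline;
        final List<String> items = new ArrayList<>();

        Batch(long deadline) {
            this.deadline = deadline;
        }
    }

    // Receive branch: the first order for a State starts the batch and its timer.
    // Returns the full batch when the count is reached, otherwise null.
    List<String> onOrder(String state, String itemNumber, long now) {
        Batch b = open.computeIfAbsent(state, k -> new Batch(now + timeoutMillis));
        b.items.add(itemNumber);
        if (b.items.size() < batchSize) {
            return null;
        }
        open.remove(state);
        return b.items;
    }

    // Timer branch: closes every batch whose deadline has passed, full or not.
    Map<String, List<String>> expire(long now) {
        Map<String, List<String>> partial = new HashMap<>();
        Iterator<Map.Entry<String, Batch>> it = open.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Batch> e = it.next();
            if (now >= e.getValue().deadline) {
                partial.put(e.getKey(), e.getValue().items);
                it.remove();
            }
        }
        return partial;
    }
}
```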

Figure 11-25. Counted and Timed Items correlation

The complete example is presented in Chapter 9, section 9.7.6, “Counted and Timed Items Correlation,” in Part II.

Timed Items Correlation

We batch like items for a period of time. When the time expires, we send the batch on regardless of how many items were accumulated.

Whatever the narrative excuse for a Timed Items Correlation, the Business Process that implements this pattern will be identical, with one exception, to the Counted and Timed Items pattern discussed previously. The sole difference will be the while loop conditional. Rather than counting items as they are received, the loop will accumulate items until the timer expires. At this point, the loop conditional will be reset to false so the loop can exit and the batch can be sent.
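The timer-driven while loop can be mirrored with a standard `BlockingQueue`. In this hypothetical single-key sketch (`TimedItemsLoop` is an illustrative name, and a local queue stands in for the correlated receive activity), each pass waits only for the time remaining in the window; when the wait returns `null` the loop conditional is reset to false, the loop exits, and the batch is released regardless of its size.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical, single-key illustration only: not a Java CAPS API.
final class TimedItemsLoop {

    static List<String> collectFor(BlockingQueue<String> source, long windowMillis) {
        List<String> batch = new ArrayList<>();
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(windowMillis);
        boolean collecting = true;                 // the while-loop conditional
        while (collecting) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                collecting = false;                // window already over
                continue;
            }
            try {
                String item = source.poll(remaining, TimeUnit.NANOSECONDS);
                if (item == null) {
                    collecting = false;            // timer expired: reset the conditional
                } else {
                    batch.add(item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                collecting = false;
            }
        }
        return batch;                              // sent on however many items arrived
    }
}
```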

Here, too, a preprocess collaboration or a subprocess wrapper will be used to ensure the correlation key value is available to the eInsight engine in the appropriate XPath-addressable structure.

Scatter-Gather Correlation

We order goods from suppliers. A Purchase Order (PO) is submitted to a supplier. Some time later, an Advanced Shipping Notice (ASN) is received, followed later still by the Delivery Notice (DN) indicating the goods were received. The process that tracks purchases and deliveries must collect all of these documents together for processing by the finance department. If the ASN is not received within a fixed period of time, an alert must be sent to the purchasing department to initiate a follow-up action. If goods are not received within a fixed period of time, an alert must be sent to the receiving department to initiate a follow-up action.

The narrative describes a Scatter-Gather pattern. A component sends one or more messages, then waits for one or more asynchronous response messages that are related to the messages sent.

The eInsight Business Process needs to be triggered by some event, let’s say an arrival of a Purchase Order. The Purchase Order Number is extracted from the Purchase Order message and used to initialize the Correlation Set. The Purchase Order message is then sent to a JMS queue. The receiving Business Process parses the PO, constructs the ASN and sends it to the ASN queue, then constructs a DN and sends it to the DN queue. In a more realistic implementation, the event that results in a DN message being generated would happen some time after the event that resulted in generation of the ASN message, so in all likelihood a separate process would handle the DN and deliver it for correlation.

The process that handles sending of the PO and correlation of the ASN and the DN might look like that shown in Figure 11-26.

Figure 11-26. Scatter-Gather correlation process

Note that we have four send activities and three receive activities, each associated with a distinct JMS queue.
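The deadlines and alerts from the narrative can be modeled as a small per-PO state tracker. This is a hypothetical in-memory sketch (`ScatterGatherTracker` is an illustrative name, not part of the project): sending the PO starts the ASN window, the ASN starts a fresh DN window, and a timer check raises one alert per missed deadline, addressed to purchasing or receiving. Two further assumptions are made: instances keep waiting after an alert, and a DN is only accepted after its ASN, matching the sequential receives in the process.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, in-memory illustration only: not a Java CAPS API.
final class ScatterGatherTracker {

    private final long asnWindowMillis;
    private final long dnWindowMillis;
    private final Map<String, Track> tracks = new HashMap<>();

    ScatterGatherTracker(long asnWindowMillis, long dnWindowMillis) {
        this.asnWindowMillis = asnWindowMillis;
        this.dnWindowMillis = dnWindowMillis;
    }

    private static final class Track {
        boolean asnReceived;
        long deadline;
        boolean alerted;
    }

    // The PO Number initializes the correlation when the PO is sent.
    void onPurchaseOrderSent(String poNumber, long now) {
        Track t = new Track();
        t.deadline = now + asnWindowMillis;
        tracks.put(poNumber, t);
    }

    // ASN received: start waiting for the DN with a fresh deadline.
    void onAsn(String poNumber, long now) {
        Track t = tracks.get(poNumber);
        if (t != null && !t.asnReceived) {
            t.asnReceived = true;
            t.deadline = now + dnWindowMillis;
            t.alerted = false;
        }
    }

    // DN received after the ASN: all documents collected, hand off to finance.
    boolean onDeliveryNotice(String poNumber) {
        Track t = tracks.get(poNumber);
        if (t == null || !t.asnReceived) {
            return false;
        }
        tracks.remove(poNumber);
        return true;
    }

    // Timer branch: one alert per missed deadline; instances keep waiting.
    List<String> checkTimers(long now) {
        List<String> alerts = new ArrayList<>();
        for (Map.Entry<String, Track> e : tracks.entrySet()) {
            Track t = e.getValue();
            if (!t.alerted && now >= t.deadline) {
                alerts.add((t.asnReceived ? "receiving:" : "purchasing:") + e.getKey());
                t.alerted = true;
            }
        }
        return alerts;
    }
}
```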

The complete step-by-step implementation of this example is included in Chapter 9, section 9.7.8, “Scatter-Gather Correlation,” in Part II.

This is one of the possible implementations of the Scatter-Gather pattern. Another implementation, using an asynchronous subprocess, is presented in Chapter 12, “Reusability,” section 12.4.2.

Message Relationship Patterns Summary

Message Relationship patterns discussed in the preceding sections represent basic relationships that exist between messages in messaging solutions. Other, more complex patterns can be broken down into these basic patterns. All Message Relationship patterns implemented using eInsight rely on the eInsight engine’s ability to identify key data values in messages and use that key data to associate messages with Business Processes and, in particular, with Business Process instances that are collecting related messages. To be effectively employed in integration solutions, all implementations must have a notion of when to stop waiting for related messages and to process the messages already collected. Common indicators are timeout expiration, collection of a predefined number of messages, or arrival of a sentinel message that indicates the end of collection. The basic examples discussed in this section are common to all implementations of a type and can be used as the basis for extended implementations. Custom logic will need to reflect the processing requirements of assembled collections of messages, such as selection of the lowest price or the highest bid, rather than how the collection is performed or when collection ends and processing begins.
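The three stop indicators named above can be captured as composable predicates. The interface `CompletionRule` is a hypothetical abstraction for illustration, not a Java CAPS facility: it simply makes explicit that count, timeout, and sentinel are interchangeable completion criteria that can be combined with "whichever comes first".

```java
import java.util.List;

// Hypothetical abstraction, not a Java CAPS API: decides when to stop
// collecting related messages and start processing them.
interface CompletionRule {

    boolean isComplete(List<String> collected, long elapsedMillis);

    // A predefined number of messages has been collected.
    static CompletionRule count(int n) {
        return (collected, elapsed) -> collected.size() >= n;
    }

    // The collection window has expired.
    static CompletionRule timeout(long millis) {
        return (collected, elapsed) -> elapsed >= millis;
    }

    // The most recent message is a sentinel (trailer) message.
    static CompletionRule sentinel(String marker) {
        return (collected, elapsed) ->
                !collected.isEmpty() && collected.get(collected.size() - 1).equals(marker);
    }

    // Whichever criterion is met first ends the collection.
    default CompletionRule or(CompletionRule other) {
        return (collected, elapsed) ->
                isComplete(collected, elapsed) || other.isComplete(collected, elapsed);
    }
}
```

For example, `CompletionRule.count(5).or(CompletionRule.timeout(300_000))` expresses the Counted and Timed Items rule, while `CompletionRule.sentinel("TRAILER")` corresponds to the single-queue Items-Trailer variant.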

eGate Correlation with Dynamic Selectors

eInsight-based correlation takes advantage of the eInsight engine’s correlation facilities. The eInsight engine ensures that processing components receive only messages they require. Without eInsight, correlation must be implemented differently.

As a general proposition, a correlating component would receive all messages, determine which are of interest, and discard or return to sender those that are not. It would then pass the messages of interest to the processing component or process them itself. The major issue with this approach is that the correlating component receives all messages even if only a small proportion of them are of interest to it. This imposes overhead, both in the resources required to receive all messages and in the latency added while the component determines, for each message, whether to process it or return it to the sender. The component could rapidly become a bottleneck. Resubmitting messages to the sender could result in the same messages being continually redelivered to the correlating component only to be continually returned to the sender. This issue could be addressed by having the correlating component subscribe to a JMS topic so that it receives copies of all messages and can safely discard those that are not of interest. That approach addresses one issue but introduces another: the additional resources required for copies of all messages. Ideally, we would like the correlating component to receive only messages that are of interest to it.

To further refine the problem, we must mention that the correlating component is interested only in messages that are to be correlated by it. That means not just specific kinds of messages, or messages from specific sources, but messages with specific Correlation Identifier values.

If we were to develop a solution that correlates messages without the benefit of the eInsight engine’s correlation facilities, the solution itself would have to implement the necessary correlation logic.

Message correlation processing consists of collecting and storing batches of related messages until batch completion criteria are met, and then submitting completed batches to the processing components. With eInsight-based correlation, all of the logic necessary to determine whether a message is related to any other message(s) is performed by the eInsight engine. All of the logic necessary to collect and store messages, evaluate completion criteria, and process the batch of messages is implemented by the eInsight engine and eInsight Business Processes.

The first fundamental piece of functionality would be the one that determines whether the current message is related to any message already known, or that isolates and makes available the pieces of data some other component needs to make that determination.

Another fundamental piece of functionality necessary for correlation would be the storage of batches of related messages, as they are being assembled, prior to invocation of the completed batch processing component.

Yet another fundamental piece would be the functionality that determines batch completion and delivers related messages for processing.

The final piece of functionality would be that which processes batches of related messages.

Short of developing a sophisticated and, most likely, large Java Collaboration that implements all of the required functionality, there is no real way to develop a generic Java Collaboration Definition (JCD)–based correlation processor. Not all is lost, however. We will discuss a re-implementation of one of the Message Relationship patterns using just the JMS Message Server and JCDs.

Bear in mind that it still is much easier to implement correlations using eInsight than it is to do so without it.

Items-Trailer Correlation

Let’s re-implement the Items-Trailer Message Relationship pattern without the use of eInsight and eInsight correlation functionality.

Recapping the scenario, we are required to collect related messages until a “special” trailer message, indicating that a batch of messages is finished, arrives. When it does, we are to process all messages collected so far and release them for further processing by the next component.

Let us assume that each ordinary message carries a Purchase Order Number, an Item Number, and a Quantity to be delivered. The trailer message carries the same Purchase Order Number, a dummy Item Number, and a count of items for cross-check. The messages are to be assembled into a Purchase Order with the Purchase Order Number and a repeating group of Item Number and Quantity. Ordinary messages will come from a JMS Destination, qItemsIn. The trailer message will come from another JMS Destination, qTrailerIn. This is a contrived example. With the Items-Trailer pattern, we would perhaps be better off using a single queue and a trailer message with a sentinel value in one of the fields, perhaps an ItemNumber value of “TRAILER” or similar.

With just eGate and the JMS Message Server, we need to figure out a way of collecting related items and triggering processing of related items once the trailer is received.

One way to implement storage of items is to use a JMS Destination with no current receiver or subscriber: all messages sent there will remain until they expire or until some component explicitly receives them, whichever comes first. One way to implement collection of related items, such that they can be retrieved as a collection of related messages, is to use the JMS Correlation ID property to store the Correlation ID of related items and to use the JMS selector mechanism to retrieve items that share a common Correlation ID. One way to trigger processing of related items is to create a Java Collaboration that is triggered by a trailer message containing the Correlation ID to use, and that retrieves and processes related items from the JMS queue using a dynamically constructed selector expression containing that Correlation ID.

Of the required infrastructure, only dynamic selectors are not available out-of-the-box. Review material presented in Chapter 5, section 5.6.7.2, before continuing with this discussion.
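The dynamic part of a dynamic selector is just the selector text, built at run time from the trailer's Correlation ID. The hypothetical helper `CorrelationSelectors` below shows one way to build it safely, following the JMS selector rule that a string literal is single-quoted and an embedded single quote is doubled. The resulting string would then be passed to a selective-receiver factory call such as the standard JMS `Session.createConsumer(Destination, String)`; how a Java Collaboration obtains the underlying session is covered in Chapter 5 and is not reproduced here.

```java
// Hypothetical helper: builds the dynamic selector text for one Correlation ID.
final class CorrelationSelectors {

    static String forCorrelationId(String correlationId) {
        // JMS selector string literals are enclosed in single quotes;
        // an embedded single quote is written as two single quotes.
        return "JMSCorrelationID = '" + correlationId.replace("'", "''") + "'";
    }
}
```

For a Purchase Order Number of `PO-1001`, this produces `JMSCorrelationID = 'PO-1001'`.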

The implementation schematic in Figure 11-27 shows the major user-developed components involved in the solution.

Figure 11-27. eGate-based Items-Trailer correlation schematic

Any items or trailer messages submitted to qItemsIn and qTrailerIn will be passed to the Add Correlation ID collaboration, where the Purchase Order Number will be extracted and assigned to the JMS header property Correlation ID. The Assemble PO collaboration will be invoked by the arrival of the trailer message. Using the Correlation ID in the trailer message, it will construct a dynamic selector expression, create a selective receiver, receive all messages with the matching Correlation ID from qItemsInCorr, combine them, and finally send the combined message to qPOOut.
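The store-then-select flow can be simulated without a JMS server to show why nothing is lost between items and trailer. In this hypothetical sketch, `CorrelatedStore` stands in for qItemsInCorr (it is not the JMS API), `send` corresponds to the Add Correlation ID step, and `AssemblePo.assemble` corresponds to the trailer-triggered collaboration: it drains only the messages whose correlation ID matches, in arrival order, and leaves unrelated messages queued. The `poNumber|item,item` output format is invented for the illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical, in-memory stand-in for qItemsInCorr: not the JMS API.
final class CorrelatedStore {

    private static final class Stored {
        final String correlationId;
        final String body;

        Stored(String correlationId, String body) {
            this.correlationId = correlationId;
            this.body = body;
        }
    }

    private final List<Stored> messages = new ArrayList<>();

    // Add Correlation ID step: store the item under its PO Number.
    void send(String correlationId, String body) {
        messages.add(new Stored(correlationId, body));
    }

    // Selective receive: removes and returns, in arrival order, only the
    // messages whose correlation ID matches; all others stay queued.
    List<String> receiveAll(String correlationId) {
        List<String> matched = new ArrayList<>();
        Iterator<Stored> it = messages.iterator();
        while (it.hasNext()) {
            Stored m = it.next();
            if (m.correlationId.equals(correlationId)) {
                matched.add(m.body);
                it.remove();
            }
        }
        return matched;
    }

    int depth() {
        return messages.size();
    }
}

// Assemble PO step, triggered by the trailer carrying poNumber.
final class AssemblePo {

    static String assemble(CorrelatedStore store, String poNumber) {
        return poNumber + "|" + String.join(",", store.receiveAll(poNumber));
    }
}
```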

Here the JMS Message Server is the means of storing messages while they are being correlated and retrieving related messages.

The Assemble PO collaboration receives all messages using the selective receiver and constructs the PO message. This is the business part of the correlation infrastructure—the rules that govern what is to be done with the related messages. In this case, the PO message is assembled. In other cases, item costs could be summed up and a summary message could be sent out. In still other cases, each message would be inspected to choose the highest bid, the lowest price, or whatever business requirement is being met by message correlation implementation.

The complete example is presented in Chapter 9, section 9.8.1, “Items-Trailer Correlation,” in Part II.

This implementation of the Items-Trailer Message Relationship pattern takes advantage of the JMS Message Server to support storage and selective retrieval of related messages. If messages were configured for discrete timeout, which could be done when jcdAddCorrelationID queues them, the JMS Message Server would discard expired messages for which no trailer arrived.

Chapter Summary

This chapter discussed message correlation in Java CAPS, the facilities and techniques used to accomplish message correlation, correlation concepts, eInsight BPEL Correlation services, and how correlation can be accomplished when eInsight is not available.

The chapter also discussed common Message Relationship patterns, implementation of which requires correlation. Specifically, the following Message Relationship patterns were discussed:

  • Header-Items-Trailer

  • Any Order Two Items

  • Any Order Two Items with Timeout

  • Items-Trailer

  • Header-Counted-Items

  • Counted and Timed Items

  • Scatter-Gather

An eGate-only correlation implementation technique, using JMS with dynamic selectors, was introduced. This technique, which uses JMS API to manipulate JMS objects, is applied elsewhere to implement JMS Polling.
