
15. Spring Integration


In this chapter, you will learn the principles behind enterprise application integration (EAI), used by many modern applications to decouple dependencies between components. The Spring Framework provides a powerful and extensible framework called Spring Integration. Spring Integration provides the same level of decoupling for disparate systems and data that the core Spring Framework provides for components within an application. This chapter aims to give you the knowledge required to understand the patterns involved in EAI, to know what an enterprise service bus (ESB) is, and—ultimately—to build solutions using Spring Integration. If you’ve used an EAI server or an ESB, you’ll find that Spring Integration is markedly simpler than anything you’re likely to have used before.

After finishing this chapter, you will be able to write fairly sophisticated Spring Integration solutions to integrate applications so that they can share services and data. You will learn Spring Integration’s many options for configuration, too. Spring Integration can be configured entirely in a standard XML namespace, if you like, but you’ll probably find that a hybrid approach, using annotations and XML, is more natural. You will also learn why Spring Integration is a very attractive alternative for people coming from a classic enterprise application integration background. If you’ve used an ESB before, such as Mule or ServiceMix, or a classical EAI server, such as Axway’s Integrator or TIBCO’s ActiveMatrix, the idioms explained here should be familiar and the configuration refreshingly straightforward.

15-1. Integrate One System with Another Using EAI

Problem

You have two applications that need to talk to each other through external interfaces. You need to establish a connection between the applications’ services and/or their data.

Solution

You need to employ EAI, which is the discipline of integrating applications and data using a set of well-known patterns. These patterns are usefully summarized and embodied in a landmark book called Enterprise Integration Patterns by Gregor Hohpe and Bobby Woolf. Today the patterns are canonical and are the lingua franca of the modern-day ESB.

How It Works

There are several integration styles you can use: the file system, a shared database, messaging, or even remote procedure calls. Next, you will explore how you could implement the different integration styles and what choices exist alongside Spring Integration.

Pick an Integration Style

There are multiple integration styles, each best suited for certain types of applications and requirements. The basic premise is simple: your application can’t speak directly to the other system using the native mechanism in one system. So, you can devise a bridging connection, something to build on top of, abstract, or work around some characteristic about the other system in a way that’s advantageous to the invoking system. What you abstract is different for each application. Sometimes it’s the location, sometimes it’s the synchronous or asynchronous nature of the call, and sometimes it’s the messaging protocol. There are many criteria for choosing an integration style, related to how tightly coupled you want your application to be, to server affinity, to the demands of the messaging formats, and so on. In a way, TCP/IP is the most famous of all integration techniques because it decouples one application from another’s server.

You have probably built applications that use some or all of the following integration styles (using Spring, no less!). A shared database, for example, is easily achieved using Spring’s JDBC support; Remote Procedure Invocation is easily achieved using Spring’s exporter functionality.

The four integration styles are as follows:

  • File transfer: Have each application produce files of shared data for others to consume and to consume files that others have produced.

  • Shared database: Have the applications store the data they want to share in a common database. This usually takes the form of a database to which different applications have access. This is not usually a favored approach because it means exposing your data to different clients that might not respect the constraints you have in place (but not codified). Using views and stored procedures can often make this option possible, but it’s not ideal. There’s no particular support for talking to a database, per se, but you can build an endpoint that deals with new results in a SQL database as message payloads. Integration with databases doesn’t tend to be granular or message-oriented but batch-oriented instead. After all, a million new rows in a database isn’t an event so much as a batch! It’s no surprise, then, that Spring Batch (discussed in Chapter 11) includes terrific support for JDBC-oriented input and output.

  • Remote Procedure Invocation: Have each application expose some of its procedures so that they can be invoked remotely and have applications invoke them to initiate behavior and exchange data. There is specific support for optimizing RPC exchanges (which includes remote procedure calls such as SOAP, RMI, and HTTP Invoker) using Spring Integration.

  • Messaging: Have each application connect to a common messaging system and exchange data and invoke behavior using messages. This style, mostly enabled by JMS in the JEE world, also describes other asynchronous or multicast publish-subscribe architectures. In a way, an ESB or an EAI container such as Spring Integration lets you handle most of the other styles as though you were dealing with a messaging queue: a request comes in on a queue and is managed, responded to, or forwarded to another queue.

Build on an ESB Solution

Now that you know how you want to approach the integration, it’s all about implementing it. You have many choices in today’s world. If the requirement is common enough, most middleware or frameworks will accommodate it in some way. JEE, .NET, and others handle common cases very well via SOAP, XML-RPC, a binary layer such as EJB or binary remoting, JMS, or an MQ abstraction. If, however, the requirement is somewhat exotic or you have a lot of configuration to do, then perhaps an ESB is required. An ESB is middleware that provides a high-level approach to modeling integrations, in the spirit of the patterns described by EAI. The ESB provides a manageable configuration format for orchestrating the different pieces of an integration in a simple high-level format.

Spring Integration, an API in the SpringSource portfolio, provides a robust mechanism for modeling many of these integration scenarios in a way that works well with Spring. Spring Integration has many advantages over a lot of other ESBs, especially the lightweight nature of the framework. The nascent ESB market is filled with choices. Some are former EAI servers, reworked to address ESB-centric architectures. Some are genuine ESBs, built with that in mind. Some are little more than message queues with adapters.

Indeed, if you’re looking for an extraordinarily powerful EAI server (with almost no integration with the JEE platform and a very hefty price tag), you might consider Axway Integrator. There’s very little it can’t do. Vendors such as TIBCO and WebMethods made their marks (and were subsequently acquired) because they provided excellent tools for dealing with integration in the enterprise. These options, although powerful, are usually very expensive and middleware-centric; your integrations are deployed to the middleware.

Standardization attempts, such as Java Business Integration (JBI), have proven successful to an extent, and there are good compliant ESBs based on these standards (OpenESB and ServiceMix, for example). One of the thought leaders in the ESB market is the Mule ESB, which has a good reputation; it is free/open source friendly, community friendly, and lightweight. These characteristics also make Spring Integration attractive. Often, you simply need to talk to another open system, and you don’t want to requisition a purchase approval for middleware that’s more expensive than some houses!

Each Spring Integration application is completely embedded and needs no server infrastructure. In fact, you could deploy an integration inside another application, perhaps in your web application endpoint. Spring Integration flips the deployment paradigms of most ESBs on their head. You deploy Spring Integration into your application; you don’t deploy your application into Spring Integration. There are no start and stop scripts and no ports to guard. The simplest possible working Spring Integration application is a simple Java public static void main() method to bootstrap a Spring context.

package com.apress.springrecipes.springintegration;

import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class Main {
    public static void main(String [] args){
        ApplicationContext applicationContext = new AnnotationConfigApplicationContext(IntegrationConfiguration.class);
    }
}

You created a standard Spring application context and started it. The contents of the Spring application context will be discussed in subsequent recipes, but it’s helpful to see how simple it is. You might decide to hoist the context up in a web application, an EJB container, or anything else you want. Indeed, you can use Spring Integration to power the e-mail polling functionality in a Swing/JavaFX application! It’s as lightweight as you want it to be. In subsequent examples, the configuration shown should be put in a Java configuration class (like IntegrationConfiguration here) and that class passed to the AnnotationConfigApplicationContext when running this class. When the main method runs to completion, your context will start up the Spring Integration bus and start responding to requests on the components configured in the application context.

15-2. Integrate Two Systems Using JMS

Problem

You want to build an integration to connect one application to another using JMS, which provides locational and temporal decoupling on modern middleware for Java applications. You’re interested in applying more sophisticated routing and want to isolate your code from the specifics of the origin of the message (in this case, the JMS queue or topic).

Solution

While you can do this by using regular JMS code or EJB’s support for message-driven beans (MDBs) or by using core Spring’s message-driven POJO (MDP) support, all are necessarily coded for handling messages coming specifically from JMS. Your code is tied to JMS. Using an ESB lets you hide the origin of the message from the code that’s handling it. You’ll use this solution as an easy way to see how a Spring Integration solution can be built. Spring Integration provides an easy way to work with JMS, just as you might use MDPs in the core Spring container. Here, however, you could conceivably replace the JMS middleware with an e-mail, and the code that reacts to the message could stay the same.

How It Works

As you might recall from Chapter 14, Spring can replace EJB’s MDB functionality by using MDPs. This is a powerful solution for anyone wanting to build something that handles messages on a message queue. You’ll build an MDP, but you will configure it using Spring Integration’s more concise configuration and provide an example of a very rudimentary integration. All this integration will do is take an inbound JMS message (whose payload is of type Map<String,Object>) and write it to the log.

As with a standard MDP, a configuration for the ConnectionFactory class exists. Shown following is a configuration class. You can pass it in as a parameter to the Spring ApplicationContext instance on creation (as you did in the previous recipe, in the Main class).

package com.apress.springrecipes.springintegration;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.jms.dsl.Jms;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;


import javax.jms.ConnectionFactory;

@Configuration
@EnableIntegration
@ComponentScan
public class IntegrationConfiguration {


    @Bean
    public CachingConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        return new CachingConnectionFactory(connectionFactory);
    }


    @Bean
    public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {
        return new JmsTemplate(connectionFactory);
    }


    @Bean
    public InboundHelloWorldJMSMessageProcessor messageProcessor() {
        return new InboundHelloWorldJMSMessageProcessor();
    }


    @Bean
    public IntegrationFlow jmsInbound(ConnectionFactory connectionFactory) {
        return IntegrationFlows
            .from(Jms.messageDrivenChannelAdapter(connectionFactory)
                .extractPayload(true)
                .destination("recipe-15-2"))
            .handle(messageProcessor())
            .get();
    }
}

As you can see, the most intimidating part is the list of imports! The rest of the code is standard boilerplate. You define a connectionFactory exactly as if you were configuring a standard MDP.

Then, you define any beans specific to this solution, in this case, a bean that responds to messages coming into the bus from the message queue, messageProcessor. A service activator is a generic endpoint in Spring Integration that’s used to invoke functionality—whether it be an operation in a service, some routine in a regular POJO, or anything you want instead—in response to a message sent in on an input channel. Although this will be covered in some detail, it’s interesting here only because you are using it to respond to messages. These beans taken together are the collaborators in the solution, and this example is fairly representative of how most integrations look. You define your collaborating components, and then you define the flow using the Spring Integration Java DSL that configures the solution itself.

Tip

There is also a Spring Integration Groovy DSL.

The configuration starts with IntegrationFlows, which is used to define how the messages flow through the system. The flow starts with the definition of messageDrivenChannelAdapter, which basically receives messages from the recipe-15-2 destination and passes them to a Spring Integration channel. messageDrivenChannelAdapter is, as the name suggests, an adapter. An adapter is a component that knows how to speak to a specific type of subsystem and translate messages on that subsystem into something that can be used in the Spring Integration bus. Adapters also do the same in reverse, taking messages on the Spring Integration bus and translating them into something a specific subsystem will understand. This is different from a service activator (covered next) in that an adapter is meant to be a general connection between the bus and the foreign endpoint. A service activator, however, only helps you invoke your application’s business logic on receipt of a message. What you do in the business logic, connecting to another system or not, is up to you.

The next component, a service activator, listens for messages coming into that channel and invokes the bean referenced through the handle method, which in this case is the messageProcessor bean defined previously. Because of the @ServiceActivator annotation on the method of the component, Spring Integration knows which method to invoke.

package com.apress.springrecipes.springintegration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;


import java.util.Map;

public class InboundHelloWorldJMSMessageProcessor {

    private final Logger logger = LoggerFactory.getLogger(InboundHelloWorldJMSMessageProcessor.class);

    @ServiceActivator
    public void handleIncomingJmsMessage(Message<Map<String, Object>> inboundJmsMessage)
        throws Throwable {
        Map<String, Object> msg = inboundJmsMessage.getPayload();
        logger.info("firstName: {}, lastName: {}, id: {}", msg.get("firstName"),
                                                           msg.get("lastName"),
                                                           msg.get("id"));
    }
}

Notice that there is an annotation, @ServiceActivator, that tells Spring to configure this component and this method as the recipient of the message payload from the channel, which is passed to the method as Message<Map<String, Object>> inboundJmsMessage. In the previous configuration, extractPayload(true) tells Spring Integration to take the payload of the message from the JMS queue (in this case, a Map<String,Object>), extract it, and pass it as the payload of the message that’s being moved through Spring Integration’s channels as an org.springframework.messaging.Message<T>. The Spring Message interface is not to be confused with the JMS Message interface, although they have some similarities. Had you not specified the extractPayload option, the type of the payload on the Spring Message interface would have been javax.jms.Message. The onus of extracting the payload would have been on you, the developer, but sometimes getting access to that information is useful. Rewritten to handle unwrapping the javax.jms.Message interface, the example would look a little different, as shown here:

package com.apress.springrecipes.springintegration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;


import javax.jms.MapMessage;

public class InboundHelloWorldJMSMessageProcessor {

    private final Logger logger = LoggerFactory.getLogger(InboundHelloWorldJMSMessageProcessor.class);

    @ServiceActivator
    public void handleIncomingJmsMessageWithPayloadNotExtracted(
        Message<javax.jms.Message> msgWithJmsMessageAsPayload) throws Throwable {
        javax.jms.MapMessage jmsMessage = (MapMessage) msgWithJmsMessageAsPayload.getPayload();
        logger.debug("firstName: {}, lastName: {}, id: {}", jmsMessage.getString("firstName"),
                                                            jmsMessage.getString("lastName"),
                                                            jmsMessage.getLong("id"));
    }
}

You could have specified the payload type as the type of parameter passed into the method. If the payload of the message coming from JMS was of type Cat, for example, the method prototype could just as well have been public void handleIncomingJmsMessageWithPayloadNotExtracted( Cat inboundJmsMessage) throws Throwable. Spring Integration will figure out the right thing to do. In this case, we prefer access to Spring Message<T>, which has header values that can be useful to interrogate.

Also note that you don’t need to specify throws Throwable. Error handling can be as generic or as specific as you want in Spring Integration.

In the example, you use the @ServiceActivator annotation to invoke the functionality where the integration ends. However, you can forward the response from the activation on to the next channel by returning a value from the method. The type of the return value is what will be used to determine the next message sent in the system. If you return a Message<T>, that will be sent directly. If you return something other than Message<T>, that value will be wrapped as a payload in a Message<T> instance, and that will become the next message that is ultimately sent to the next component in the processing pipeline. This Message<T> interface will be sent on the output channel that’s configured on the service activator. There is no requirement to send a message on the output channel with the same type as the message that came on in the input channel; this is an effective way to transform the message type. A service activator is a very flexible component in which to put hooks to your system and to help mold the integration.
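For instance, a minimal sketch of a service activator that returns a different type than it receives might look like the following; the GreetingServiceActivator class is purely illustrative and not part of the recipe’s code. Whatever the method returns becomes the payload of the message sent to the next component.

package com.apress.springrecipes.springintegration;

import java.util.Map;

import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;

public class GreetingServiceActivator {

    @ServiceActivator
    public String buildGreeting(Message<Map<String, Object>> inboundMessage) {
        Map<String, Object> payload = inboundMessage.getPayload();
        // the returned String becomes the payload of the message sent on the output channel
        return "Hello, " + payload.get("firstName") + " " + payload.get("lastName") + "!";
    }
}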

This solution is pretty straightforward, and in terms of configuration for one JMS queue, it’s not really a win over straight MDPs because there’s an extra level of indirection to overcome. The Spring Integration facilities make building complex integrations easier than Spring Core or EJB3 could because the configuration is centralized. You have a bird’s-eye view of the entire integration, with routing and processing centralized, so you can better reposition the components in your integration. However, as you’ll see, Spring Integration wasn’t meant to compete with EJB and Spring Core; it shines at solutions that couldn’t naturally be built using EJB3 or Spring Core.
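If you want to see the flow in action, you can put a test message on the recipe-15-2 destination yourself. The following sketch shows one way to do that with the jmsTemplate bean defined in the configuration; the TestProducer class and the sample values are illustrative assumptions, not part of the recipe.

package com.apress.springrecipes.springintegration;

import java.util.HashMap;
import java.util.Map;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.jms.core.JmsTemplate;

public class TestProducer {

    public static void main(String[] args) throws Exception {
        try (AnnotationConfigApplicationContext context =
                 new AnnotationConfigApplicationContext(IntegrationConfiguration.class)) {
            JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class);

            // the keys match what InboundHelloWorldJMSMessageProcessor reads from the payload
            Map<String, Object> payload = new HashMap<>();
            payload.put("firstName", "John");
            payload.put("lastName", "Doe");
            payload.put("id", 1L);

            // the JmsTemplate's default converter turns the Map into a JMS MapMessage
            jmsTemplate.convertAndSend("recipe-15-2", payload);

            // give the message-driven adapter in the same context a moment to consume and log it
            Thread.sleep(1000);
        }
    }
}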

15-3. Interrogate Spring Integration Messages for Context Information

Problem

You want more information about the message coming into the Spring Integration processing pipeline than the type of the message implicitly can give you.

Solution

Interrogate the Spring Integration Message<T> interface for header information specific to the message. These values are enumerated as header values in a map (of type Map<String,Object>).

How It Works

The Spring Message<T> interface is a generic wrapper that contains a pointer to the actual payload of the message as well as to headers that provide contextual message metadata. You can manipulate or augment this metadata to enable/enhance the functionality of components that are downstream, too; for example, when sending a message through e-mail, it’s useful to specify the TO/FROM headers.

Any time you expose a class to the framework to handle some requirement (such as the logic you provide for the service activator component or a transformer component), there will be some chance to interact with Message<T> and with the message headers. Remember that Spring Integration pushes a Message<T> instance through a processing pipeline. Each component that interfaces with the Message<T> instance has to act on it, do something with it, or forward it on. One way of providing information to those components, and of getting information about what’s happened in the components up until that point, is to interrogate MessageHeaders.

You should be aware of several values when working with Spring Integration (see Tables 15-1 and 15-2). These constants are exposed on the org.springframework.messaging.MessageHeaders class and the org.springframework.integration.IntegrationMessageHeaderAccessor class.

Table 15-1. Common Headers Found in Core Spring Messaging

  • ID: This is a unique value assigned to the message by the Spring Integration engine.

  • TIMESTAMP: This is the timestamp assigned to the message.

  • REPLY_CHANNEL: This is the String name of the channel to which the output of the current component should be sent. This can be overridden.

  • ERROR_CHANNEL: This is the String name of the channel to which the output of the current component should be sent if an exception bubbles up into the runtime. This can be overridden.

  • CONTENT_TYPE: This is the content type (MIME type) of the message, mainly used for Web Socket messages.

Table 15-2. Common Headers Found in Spring Integration

  • CORRELATION_ID: This is optional and used by some components (such as aggregators) to group messages together in some sort of processing pipeline.

  • EXPIRATION_DATE: This is used by some components as a threshold for processing after which a component can wait no longer in processing.

  • PRIORITY: This is the priority of the message; higher numbers indicate a higher priority.

  • SEQUENCE_NUMBER: This is the order in which the message is to be sequenced; it is typically used with a sequencer.

  • SEQUENCE_SIZE: This is the size of the sequence so that an aggregator can know when to stop waiting for more messages and move forward. This is useful in implementing join functionality.

  • ROUTING_SLIP: This is the header containing the information when the Routing Slip pattern is used.

  • CLOSEABLE_RESOURCE: This is optional and used by some components to determine if the message payload can/should be closed (like a File or InputStream).

In addition to the headers defined by Spring Messaging, there are some commonly used headers in Spring Integration; these are defined in the org.springframework.integration.IntegrationMessageHeaderAccessor class (see Table 15-2).

Some header values are specific to the type of the source message’s payload; for example, payloads sourced from a file on the file system are different from those coming in from a JMS queue, which are different from messages coming from an e-mail system. These different components are typically packaged in their own JARs, and there’s usually some class that provides constants for accessing these headers. Examples of component-specific headers are the constants defined for files on org.springframework.integration.file.FileHeaders: FILENAME and PREFIX. Naturally, when in doubt, you can just enumerate the values manually because the headers are just a java.util.Map instance.

package com.apress.springrecipes.springintegration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;


import java.io.File;
import java.util.Map;


public class InboundFileMessageServiceActivator {
    private final Logger logger = LoggerFactory.getLogger(InboundFileMessageServiceActivator.class);


    @ServiceActivator
    public void interrogateMessage(Message<File> message) {
        MessageHeaders headers = message.getHeaders();
        for (Map.Entry<String, Object> header : headers.entrySet()) {
            logger.debug("{} : {}", header.getKey(), header.getValue() );
        }
    }
}

These headers let you interrogate the specific features of these messages without surfacing them as a concrete interface dependency if you don’t want them. They can also be used to help processing and allow you to specify custom metadata to downstream components. The act of providing extra data for the benefit of a downstream component is called message enrichment. Message enrichment is when you take the headers of a given message and add to them, usually to the benefit of components in the processing pipeline downstream. You might imagine processing a message to add a customer to a customer relationship management (CRM) system that makes a call to a third-party web site to establish credit ratings. This credit is added to the headers so the component downstream, which is tasked with adding or rejecting customers, can make its decisions on these header values.
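To make the idea concrete, here is a minimal sketch of such an enricher. The CustomerCreditEnricher class, the creditScore header name, and the hard-coded score are illustrative assumptions; it uses the MessageBuilder class, which is covered in more detail in recipe 15-5.

package com.apress.springrecipes.springintegration;

import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;

public class CustomerCreditEnricher {

    @Transformer
    public Message<?> enrich(Message<?> message) {
        // a call to the third-party credit-rating service would go here; a fixed
        // value keeps the sketch self-contained
        int creditScore = 700;

        // keep the original payload and headers and add the enrichment header
        return MessageBuilder.fromMessage(message)
            .setHeaderIfAbsent("creditScore", creditScore)
            .build();
    }
}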

Another way to get access to header metadata is to simply have it passed as parameters to your component’s method. You simply annotate the parameter with the @Header annotation, and Spring Integration will take care of the rest.

package com.apress.springrecipes.springintegration;

import java.io.File;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.file.FileHeaders;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Header;


public class InboundFileMessageServiceActivator {
    private final Logger logger = LoggerFactory.getLogger(InboundFileMessageServiceActivator.class);


    @ServiceActivator
    public void interrogateMessage(
        @Header(MessageHeaders.ID) String uuid,
        @Header(FileHeaders.FILENAME) String fileName, File file) {
        logger.debug("the id of the message is {}, and name of the file payload is {}", uuid, fileName);
    }
}

You can also have Spring Integration simply pass in all of the headers as a Map<String,Object> by annotating the parameter with @Headers.

package com.apress.springrecipes.springintegration;

import java.io.File;
import java.util.Map;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.file.FileHeaders;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;


public class InboundFileMessageServiceActivator {
    private final Logger logger = LoggerFactory.getLogger(InboundFileMessageServiceActivator.class);


    @ServiceActivator
    public void interrogateMessage(
        @Headers Map<String, Object> headers, File file) {
        logger.debug("the id of the message is {}, and name of the file payload is {}",
            headers.get(MessageHeaders.ID), headers.get(FileHeaders.FILENAME));
    }
}

15-4. Integrate Two Systems Using a File System

Problem

You want to build a solution that takes files on a well-known, shared file system and uses them as the conduit for integration with another system . An example might be that your application produces a comma-separated value (CSV) dump of all the customers added to a system every hour. The company’s third-party financial system is updated with these sales by a process that checks a shared folder, mounted over a network file system, and processes the CSV records. What’s required is a way to treat the presence of a new file as an event on the bus.

Solution

You have an idea of how this could be built by using standard techniques, but you want something more elegant. Let Spring Integration isolate you from the event-driven nature of the file system and from the file input/output requirements. Instead, let’s use it to focus on writing the code that deals with the java.io.File payload itself. With this approach, you can write unit-testable code that accepts an input and responds by adding the customers to the financial system. When the functionality is finished, you configure it in the Spring Integration pipeline and let Spring Integration invoke your functionality whenever a new file is recognized on the file system. This is an example of an event-driven architecture (EDA). EDAs let you ignore how events were generated and focus instead on reacting to them, in much the same way that event-driven GUIs let you change the focus of your code from controlling how a user triggers an action to actually reacting to the invocation itself. Spring Integration makes it a natural approach for loosely coupled solutions. In fact, this code should look similar to the solution you built for the JMS queue because it’s just another class that takes a parameter (a Spring Integration Message<T> interface, a parameter of the same type as the payload of the message, and so on).

How It Works

Building a solution to talk to JMS is old hat. Instead, let’s consider what building a solution using a shared file system might look like. Imagine how to build it without an ESB solution. You need some mechanism by which to poll the file system periodically and detect new files. Perhaps Quartz and some sort of cache? You need something to read in these files quickly and then pass the payload to your processing logic efficiently. Finally, your system needs to work with that payload.

Spring Integration frees you from all that infrastructure code; all you need to do is configure it. There are some issues with dealing with file system–based processing, however, that are up to you to resolve. Behind the scenes, Spring Integration is still dealing with polling the file system and detecting new files. It can’t possibly have a semantically correct idea for your application of when a file is “completely” written, and thus providing a way around that is up to you.

Several approaches exist. You might write out a file and then write another zero-byte file. The presence of that file would mean it’s safe to assume that the real payload is present. Configure Spring Integration to look for that file. If it finds it, it knows that there’s another file (perhaps with the same name and a different file extension?) and that it can start reading it/working with it. Another solution along the same line is to have the client (“producer”) write the file to the directory using a name that the glob pattern that Spring Integration is using to poll the directory won’t detect. Then, when it’s finished writing, issue an mv command if you trust your file system to do the right thing there.
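As an illustration of the first approach, the following is a minimal sketch of a custom FileListFilter that accepts a payload file only once a companion marker file is present. The .ready extension is an assumption, not something the adapter prescribes; you could then register such a filter on the inbound adapter (for example, with something like Files.inboundAdapter(dir).filter(new MarkerFileListFilter())).

package com.apress.springrecipes.springintegration;

import java.io.File;
import java.util.ArrayList;
import java.util.List;

import org.springframework.integration.file.filters.FileListFilter;

public class MarkerFileListFilter implements FileListFilter<File> {

    @Override
    public List<File> filterFiles(File[] files) {
        List<File> accepted = new ArrayList<>();
        for (File file : files) {
            // accept a payload file only when its companion marker file exists
            File marker = new File(file.getParentFile(), file.getName() + ".ready");
            if (!file.getName().endsWith(".ready") && marker.exists()) {
                accepted.add(file);
            }
        }
        return accepted;
    }
}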

Let’s revisit the first solution, but this time with a file-based adapter. The configuration looks conceptually the same as before, except the configuration for the adapter has changed, and with that has gone a lot of the configuration for the JMS adapter, like the connection factory. Instead, you tell Spring Integration about a different source from whence messages will come: the file system.

package com.apress.springrecipes.springintegration;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.file.dsl.Files;


import java.io.File;
import java.util.concurrent.TimeUnit;


@Configuration
@EnableIntegration
@ComponentScan
public class IntegrationConfiguration {


    @Bean
    public InboundHelloWorldFileMessageProcessor messageProcessor() {
        return new InboundHelloWorldFileMessageProcessor();
    }


    @Bean
    public IntegrationFlow inboundFileFlow(@Value("${user.home}/inboundFiles/new/") File directory) {
        return IntegrationFlows
            .from(
                Files.inboundAdapter(directory).patternFilter("*.csv"),
                c -> c.poller(Pollers.fixedRate(10, TimeUnit.SECONDS)))
            .handle(messageProcessor())
            .get();
    }
}

This is nothing you haven’t already seen, really. The code for Files.inboundAdapter is the only new element. The code for the @ServiceActivator annotation has changed to reflect the fact that you’re expecting a message with a payload of type java.io.File (that is, Message<java.io.File>).

package com.apress.springrecipes.springintegration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;


import java.io.File;

public class InboundHelloWorldFileMessageProcessor {
    private final Logger logger = LoggerFactory.getLogger(InboundHelloWorldFileMessageProcessor.class);


    @ServiceActivator
    public void handleIncomingFileMessage(Message<File> inboundFileMessage)
        throws Throwable {
        File filePayload = inboundFileMessage.getPayload();
        logger.debug("absolute path: {}, size: {}", filePayload.getAbsolutePath(), filePayload.length());
    }
}

15-5. Transform a Message from One Type to Another

Problem

You want to send a message into the bus and transform it before working with it further. Usually, this is done to adapt the message to the requirements of a component downstream. You might also want to transform a message by enriching it—adding extra headers or augmenting the payload so that components downstream in the processing pipeline can benefit from it.

Solution

Use a transformer component to take a Message<T> interface of a payload and send Message<T> out with a payload of a different type. You can also use the transformer to add extra headers or update the values of headers for the benefit of components downstream in the processing pipeline.

How It Works

Spring Integration provides a transformer message endpoint to permit the augmentation of the message headers or the transformation of the message itself. In Spring Integration, components are chained together, and output from one component is returned by way of the method invoked for that component. The return value of the method is passed out on the “reply channel” for the component to the next component, which receives it as an input parameter. A transformer component lets you change the type of the object being returned or add extra headers, and that updated object is what is passed to the next component in the chain.

Modify a Message’s Payload

The configuration of a transformer component is very much in keeping with everything you’ve seen so far.

package com.apress.springrecipes.springintegration;

import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;


import java.util.Map;

public class InboundJMSMessageToCustomerTransformer {

    @Transformer
    public Customer transformJMSMapToCustomer(
        Message<Map<String, Object>> inboundSpringIntegrationMessage) {
        Map<String, Object> jmsMessagePayload = inboundSpringIntegrationMessage.getPayload();
        Customer customer = new Customer();
        customer.setFirstName((String) jmsMessagePayload.get("firstName"));
        customer.setLastName((String) jmsMessagePayload.get("lastName"));
        customer.setId((Long) jmsMessagePayload.get("id"));
        return customer;
    }
}

Nothing terribly complex is happening here: a Message<T> interface of type Map<String,Object> is passed in. The values are manually extracted and used to build an object of type Customer. The Customer object is returned, which has the effect of passing it out on the reply channel for this component. The next component in the configuration will receive this object as its input Message<T>.

The solution is mostly the same as you’ve seen, but there is a new transform step in the flow.

package com.apress.springrecipes.springintegration;

import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.jms.dsl.Jms;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;


@Configuration
@EnableIntegration
@ComponentScan
public class IntegrationConfiguration {


    @Bean
    public CachingConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        return new CachingConnectionFactory(connectionFactory);
    }


    @Bean
    public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {
        return new JmsTemplate(connectionFactory);
    }


    @Bean
    public InboundJMSMessageToCustomerTransformer customerTransformer() {
        return new InboundJMSMessageToCustomerTransformer();
    }


    @Bean
    public InboundCustomerServiceActivator customerServiceActivator() {
        return new InboundCustomerServiceActivator();
    }


    @Bean
    public IntegrationFlow jmsInbound(ConnectionFactory connectionFactory) {
        return IntegrationFlows
            .from(Jms.messageDrivenChannelAdapter(connectionFactory).extractPayload(true).destination("recipe-15-5"))
            .transform(customerTransformer())
            .handle(customerServiceActivator())
            .get();
    }
}

Here, you’re specifying a messageDrivenChannelAdapter component that moves the incoming content to an InboundJMSMessageToCustomerTransformer, which transforms it into a Customer, and that Customer is sent to the InboundCustomerServiceActivator.

The code in the next component can now declare a dependency on the Customer type with impunity. You can, with transformers, receive messages from any number of sources and transform them into a Customer so that you can reuse the InboundCustomerServiceActivator instance.

package com.apress.springrecipes.springintegration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;


public class InboundCustomerServiceActivator {
    private static final Logger logger = LoggerFactory.getLogger(InboundCustomerServiceActivator.class);


    @ServiceActivator
    public void doSomethingWithCustomer(Message<Customer> customerMessage) {
        Customer customer = customerMessage.getPayload();
        logger.debug("id={}, firstName: {}, lastName: {}",
            customer.getId(), customer.getFirstName(), customer.getLastName());
    }
}

Modify a Message’s Headers

Sometimes changing a message’s payload isn’t enough. Sometimes you want to update the payload as well as the headers. Doing this is slightly more interesting because it involves using the MessageBuilder<T> class, which allows you to create new Message<T> objects with any specified payload and any specified header data. The configuration is identical in this case.

package com.apress.springrecipes.springintegration;

import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;


import java.util.Map;

public class InboundJMSMessageToCustomerTransformer {
    @Transformer
    public Message<Customer> transformJMSMapToCustomer(
        Message<Map<String, Object>> inboundSpringIntegrationMessage) {
        Map<String, Object> jmsMessagePayload =
           inboundSpringIntegrationMessage.getPayload();
        Customer customer = new Customer();
        customer.setFirstName((String) jmsMessagePayload.get("firstName"));
        customer.setLastName((String) jmsMessagePayload.get("lastName"));
        customer.setId((Long) jmsMessagePayload.get("id"));


        return MessageBuilder.withPayload(customer)
            .copyHeadersIfAbsent( inboundSpringIntegrationMessage.getHeaders())
            .setHeaderIfAbsent("randomlySelectedForSurvey", Math.random() > .5)
            .build();
    }
}

As before, this code is simply a method with an input and an output. The output is constructed dynamically using MessageBuilder<T> to create a message whose payload is the new Customer object, copying the existing headers from the input message and adding an extra header: randomlySelectedForSurvey.
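A downstream component can then read that header to decide what to do with the customer. The following is a minimal sketch of such a component; the CustomerSurveyServiceActivator class is illustrative and not shown in the recipe.

package com.apress.springrecipes.springintegration;

import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;

public class CustomerSurveyServiceActivator {

    @ServiceActivator
    public void processCustomer(Message<Customer> customerMessage) {
        // read the header added by the transformer upstream
        Boolean selected = customerMessage.getHeaders().get("randomlySelectedForSurvey", Boolean.class);
        if (Boolean.TRUE.equals(selected)) {
            // send the survey invitation here
        }
    }
}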

15-6. Handle Errors Using Spring Integration

Problem

Spring Integration brings together systems distributed across different nodes, computers, and service, protocol, and language stacks. Indeed, a Spring Integration solution might not even finish in remotely the same time period as when it started. Exception handling, then, can never be as simple as a language-level try/catch block in a single thread for any component with asynchronous behavior. This implies that many of the kinds of solutions you’re likely to build, with channels and queues of any kind, need a way of signaling an error that is distributed and natural to the component that created the error. Thus, an error might get sent over a JMS queue on a different continent, or in process, on a queue in a different thread.

Solution

Use Spring Integration’s support for an error channel , both implicitly and explicitly via code.

How It Works

Spring Integration provides the ability to catch exceptions and send them to an error channel of your choosing. By default, it’s a global channel called errorChannel. By default, Spring Integration registers a handler called LoggingHandler on this channel, which does nothing more than log the exception and stack trace. To make this work, you have to tell the message-driven channel adapter that you want errors to be sent to errorChannel; you can do this by calling errorChannel on the adapter, as shown here.

@Bean
public IntegrationFlow jmsInbound(ConnectionFactory connectionFactory) {
    return IntegrationFlows
        .from(Jms.messageDrivenChannelAdapter(connectionFactory).extractPayload(true).destination("recipe-15-6").errorChannel("errorChannel"))
        .transform(customerTransformer())
        .handle(customerServiceActivator())
        .get();
}

Use a Custom Handler to Handle Exceptions

Of course, you can also have components subscribe to messages from this channel to override the exception-handling behavior. You can create a class that will be invoked whenever a message comes in on the errorChannel channel.

@Bean
public IntegrationFlow errorFlow() {
    return IntegrationFlows
        .from("errorChannel")
        .handle(errorHandlingServiceActivator())
        .get();
}

The Java code is exactly as you’d expect it to be. Of course, the component that receives the error message from the errorChannel channel doesn’t need to be a service activator. You are just using it for convenience here. The code for the following service activator depicts some of the machinations you might go through to build a handler for errorChannel:

package com.apress.springrecipes.springintegration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;


public class DefaultErrorHandlingServiceActivator {
    private static final Logger logger = LoggerFactory.getLogger(DefaultErrorHandlingServiceActivator.class);


    @ServiceActivator
    public void handleThrowable(Message<Throwable> errorMessage)
        throws Throwable {
        Throwable throwable = errorMessage.getPayload();
        logger.debug("Message: {}", throwable.getMessage(), throwable);


        if (throwable instanceof MessagingException) {
            Message<?> failedMessage = ((MessagingException) throwable).getFailedMessage();


            if (failedMessage != null) {
                // do something with the original message
            }
        } else {
            // it's something that was thrown in the execution of code in some component you created
        }
    }
}

All errors thrown from Spring Integration components will be a subclass of MessagingException. MessagingException carries a pointer to the original Message that caused an error, which you can dissect for more context information. In the example, you’re doing a nasty instanceof. Clearly, being able to delegate to custom exception handlers based on the type of exception would be useful.

Route to Custom Handlers Based on the Type of Exception

Sometimes more specific error handling is required. In the following code, a router is configured as an exception-type router, which listens to errorChannel. It then splinters off, using the type of the exception as the predicate in determining which channel should get the results.

@Bean
public ErrorMessageExceptionTypeRouter exceptionTypeRouter() {
    ErrorMessageExceptionTypeRouter router = new ErrorMessageExceptionTypeRouter();
    router.setChannelMapping(MyCustomException.class.getName(), "customExceptionChannel");
    router.setChannelMapping(RuntimeException.class.getName(), "runtimeExceptionChannel");
    router.setChannelMapping(MessageHandlingException.class.getName(), "messageHandlingExceptionChannel");
    return router;
}


@Bean
public IntegrationFlow errorFlow() {
    return IntegrationFlows
        .from("errorChannel")
        .route(exceptionTypeRouter())
        .get();
}

Build a Solution with Multiple Error Channels

The preceding example might work fine for simple cases, but often different integrations require different error-handling approaches, which implies that sending all the errors to the same channel can eventually lead to a large switch-laden class that’s too complex to maintain. Instead, it’s better to selectively route error messages to the error channel most appropriate to each integration. This avoids centralizing all error handling. One way to do that is to explicitly specify on what channel errors for a given integration should go. The following example shows a component (service activator) that, upon receiving a message, adds a header indicating the name of the error channel. Spring Integration will use that header and forward errors encountered in the processing of this message to that channel.

package com.apress.springrecipes.springintegration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;


public class ServiceActivatorThatSpecifiesErrorChannel {
    private static final Logger logger = LoggerFactory.getLogger(
        ServiceActivatorThatSpecifiesErrorChannel.class);


    @ServiceActivator
    public Message<?> startIntegrationFlow(Message<?> firstMessage)
        throws Throwable {
        return MessageBuilder.fromMessage(firstMessage)
            .setHeaderIfAbsent(MessageHeaders.ERROR_CHANNEL, "errorChannelForMySolution")
            .build();
    }
}

Thus, all errors that come from the integration in which this component is used will be directed to errorChannelForMySolution, to which you can subscribe any component you like.
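Subscribing a handler to that channel looks just like the errorChannel flow shown earlier. The following sketch assumes a handler bean named customErrorHandlingServiceActivator() exists; it is not part of the recipe’s code.

@Bean
public IntegrationFlow customErrorFlow() {
    return IntegrationFlows
        .from("errorChannelForMySolution")
        .handle(customErrorHandlingServiceActivator())
        .get();
}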

15-7. Fork Integration Control: Splitters and Aggregators

Problem

You want to fork the process flow from one component to many, either all at once or to a single one based on a predicate condition.

Solution

You can use a splitter component (and maybe its cohort, the aggregator component) to fork and join (respectively) control of processing.

How It Works

One of the fundamental cornerstones of an ESB is routing. You’ve seen how components can be chained together to create sequences in which progression is mostly linear. Some solutions require the capability to split a message into many constituent parts. One reason might be that parts of the problem are parallel in nature and don’t depend on each other in order to complete. You should strive to achieve the efficiencies of parallelism wherever possible.

Use a Splitter

It’s often useful to divide large payloads into separate messages with separate processing flows. In Spring Integration, this is accomplished by using a splitter component. A splitter takes an input message and asks you, the user of the component, on what basis it should split Message<T>: you’re responsible for providing the split functionality. Once you’ve told Spring Integration how to split Message<T>, it forwards each result out on the output channel of the splitter component. In a few cases, Spring Integration ships with useful splitters that require no customization. One example is the splitter provided to partition an XML payload along an XPath query, XPathMessageSplitter.

One example of a useful application of a splitter might be a text file with rows of data, each of which must be processed. Your goal is to be able to submit each row to a service that will handle the processing. What’s required is a way to extract each row and forward each row as a new Message<T>. The configuration for such a solution looks like this:

@Configuration
@EnableIntegration
public class IntegrationConfiguration {


    @Bean
    public CustomerBatchFileSplitter splitter() {
        return new CustomerBatchFileSplitter();
    }


    @Bean
    public CustomerDeletionServiceActivator customerDeletionServiceActivator() {
        return new CustomerDeletionServiceActivator();
    }


    @Bean
    public IntegrationFlow fileSplitAndDelete(@Value("file:${user.home}/customerstoremove/new/") File inputDirectory) throws Exception {


        return IntegrationFlows.from(
            Files.inboundAdapter(inputDirectory).patternFilter("customerstoremove-*.txt"), c -> c.poller(Pollers.fixedRate(1, TimeUnit.SECONDS)))
            .split(splitter())
            .handle(customerDeletionServiceActivator())
            .get();
    }
}

The configuration for this is not terribly different from the previous solutions. The Java code is just about the same as well, except that the return type of the method annotated by the @Splitter annotation is of type java.util.Collection.

package com.apress.springrecipes.springintegration;

import org.springframework.integration.annotation.Splitter;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Collection;


public class CustomerBatchFileSplitter {

    @Splitter
    public Collection<String> splitAFile(File file) throws IOException {
        System.out.printf("Reading %s....%n", file.getAbsolutePath());
        return Files.readAllLines(file.toPath());
    }
}

A message payload is passed in as a java.io.File object, and the contents are read. The result (a collection or array value; in this case, a Collection<String>) is returned. Spring Integration executes a kind of foreach on the results, sending each value in the collection out on the output channel configured for the splitter. Often, you split messages so that the individual pieces can be forwarded to processing that’s more focused. Because the message is more manageable, the processing requirements are dampened. This is true in many different architectures. In map/reduce solutions, tasks are split and then processed in parallel, and the fork/join constructs in a BPM system let control flow proceed in parallel so that the total work product can be achieved more quickly.
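The CustomerDeletionServiceActivator referenced in the flow isn’t shown in the recipe, so the following is only a sketch of what it might look like. The comma-separated line format and the returned Customer are assumptions; returning a value is what allows the aggregator introduced next to collect the processed customers.

package com.apress.springrecipes.springintegration;

import org.springframework.integration.annotation.ServiceActivator;

public class CustomerDeletionServiceActivator {

    @ServiceActivator
    public Customer removeCustomer(String customerLine) {
        // each message payload is one line produced by the splitter
        String[] fields = customerLine.split(",");
        Customer customer = new Customer();
        customer.setId(Long.parseLong(fields[0].trim()));
        customer.setFirstName(fields[1].trim());
        customer.setLastName(fields[2].trim());
        // the actual deletion against the system of record would happen here
        return customer;
    }
}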

Use Aggregators

At some point you’ll need to do the reverse: combine many messages into one and create a single result that can be returned on the output channel. An @Aggregator collects a series of messages (based on some correlation that you help Spring Integration make between the messages) and publishes a single message to the components downstream. Suppose that you know you’re expecting 22 different messages from 22 actors in the system, but you don’t know when. This is similar to a company that auctions off a contract and collects all the bids from different vendors before choosing the ultimate vendor. The company can’t accept a bid until all bids have been received from all companies. Otherwise, there’s the risk of prematurely signing a contract that would not be in the best interest of the company. An aggregator is perfect for building this type of logic.

There are many ways for Spring Integration to correlate incoming messages. To determine how many messages to read before it can stop, it uses the SequenceSizeReleaseStrategy class, which reads a well-known header value. (Aggregators are often used after a splitter. Thus, the default header value is provided by the splitter, though there’s nothing stopping you from creating the header parameters yourself.) The SequenceSizeReleaseStrategy class calculates how many messages it should look for and notes the index of each message relative to the expected total count (e.g., 3/22).

For correlation when you might not have a size but know that you’re expecting messages that share a common header value within a known time, Spring Integration provides HeaderAttributeCorrelationStrategy. In this way, it knows that all messages with that value are from the same group, in the same way that your last name identifies you as being part of a larger group.
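As a sketch of what overriding the defaults might look like in the Java DSL, you could configure the aggregate step explicitly. The lineItems channel, the batchId header, and the group size of ten are assumptions for illustration.

@Bean
public IntegrationFlow correlatedAggregation() {
    return IntegrationFlows
        .from("lineItems")
        .aggregate(a -> a
            // group messages that share the same batchId header value
            .correlationStrategy(new HeaderAttributeCorrelationStrategy("batchId"))
            // release a group once ten messages with that batchId have arrived
            .releaseStrategy(group -> group.size() >= 10))
        .handle(summaryServiceActivator())
        .get();
}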

Let’s revisit the previous example. Suppose that the file was split (by lines, each belonging to a new customer) and subsequently processed. You now want to reunite the customers and do some cleanup with everyone at the same time. In this example, you use the default release strategy and correlation strategy, and as such you can use the default aggregate() in the integration flow configuration. The result is passed to another service activator, which will print a small summary.

package com.apress.springrecipes.springintegration;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.file.dsl.Files;


import java.io.File;
import java.util.concurrent.TimeUnit;


@Configuration
@EnableIntegration
public class IntegrationConfiguration {


    @Bean
    public CustomerBatchFileSplitter splitter() {
        return new CustomerBatchFileSplitter();
    }


    @Bean
    public CustomerDeletionServiceActivator customerDeletionServiceActivator() {
        return new CustomerDeletionServiceActivator();
    }


    @Bean
    public SummaryServiceActivator summaryServiceActivator() {
        return new SummaryServiceActivator();
    }


    @Bean
    public IntegrationFlow fileSplitAndDelete(@Value("file:${user.home}/customerstoremove/new/") File inputDirectory) throws Exception {


        return IntegrationFlows.from(
            Files.inboundAdapter(inputDirectory).patternFilter("customerstoremove-*.txt"), c -> c.poller(Pollers.fixedRate(1, TimeUnit.SECONDS)))
            .split(splitter())
            .handle(customerDeletionServiceActivator())
            .aggregate()
            .handle(summaryServiceActivator())
            .get();
    }
}
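
The CustomerDeletionServiceActivator used in the flow is not listed in this recipe; a minimal sketch could look like the following, assuming each line of the split file identifies one customer to remove (the parsing and the Customer accessors are hypothetical):

package com.apress.springrecipes.springintegration;

import org.springframework.integration.annotation.ServiceActivator;

public class CustomerDeletionServiceActivator {

    @ServiceActivator
    public Customer removeCustomer(String line) {
        // Hypothetical parsing: treat the whole line as the customer identifier.
        Customer customer = new Customer();
        customer.setId(line.trim());
        System.out.printf("Removing customer %s%n", customer.getId());
        // ... the actual deletion (a repository/DAO call) would go here ...
        return customer; // returned customers are regrouped by the aggregator downstream
    }
}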

The Java code for SummaryServiceActivator is quite simple.

package com.apress.springrecipes.springintegration;

import org.springframework.integration.annotation.ServiceActivator;

import java.util.Collection;

public class SummaryServiceActivator {

    @ServiceActivator
    public void summary(Collection<Customer> customers) {
        System.out.printf("Removed %s customers.%n", customers.size());
    }
}

15-8. Implement Conditional Routing with Routers

Problem

You want to conditionally move a message through different processes based on some criteria. This is the EAI equivalent of an if/else branch.

Solution

You can use a router component to alter the processing flow based on some predicate. You can also use a router to multicast a message to many subscribers (as you did with the splitter).

How It Works

With a router you can specify a known list of channels to which the incoming Message object should be passed. This has some powerful implications. It means you can change the flow of a process conditionally, and it also means you can forward a Message object to as many (or as few) channels as you want. There are some convenient default routers available to fill common needs, such as payload-type–based routing (PayloadTypeRouter) and routing to a group or list of channels (RecipientListRouter).
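
For instance, the Java DSL counterpart of RecipientListRouter is routeToRecipients(), which forwards each message to several channels at once. A minimal sketch, with channel names made up purely for illustration:

.routeToRecipients(r -> r
    // every incoming message is sent to both of these channels
    .recipient("auditChannel")
    .recipient("processingChannel"))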

Imagine, for example, a processing pipeline that routes customers with high credit scores to one service and customers with lower credit scores to another process in which the information is queued up for a human audit and verification cycle. The configuration is, as usual, straightforward. The following flow routes on the customer's credit score using an inline lambda and maps each outcome to a channel; an equivalent router class, CustomerCreditScoreRouter, is shown afterward.

@Bean
public IntegrationFlow fileSplitAndDelete(@Value("file:${user.home}/customerstoimport/new/") File inputDirectory) throws Exception {


    return IntegrationFlows.from(
        Files.inboundAdapter(inputDirectory).patternFilter("customers-*.txt"), c -> c.poller(Pollers.fixedRate(1, TimeUnit.SECONDS)))
        .split(splitter())
        .transform(transformer())
        .<Customer, Boolean>route(c -> c.getCreditScore() > 770,
            m -> m
                .channelMapping(Boolean.TRUE, "safeCustomerChannel")
                .channelMapping(Boolean.FALSE, "riskyCustomerChannel").applySequence(false)
        ).get();
}

You could use a class with a method annotated with @Router instead. It feels a lot like a workflow engine's conditional element, or even a JSF backing-bean method, in that it extricates the routing logic from the flow definition into a dedicated component, delaying the decision until runtime. In the example, the Strings returned are the names of the channels to which the Message component should be passed.

package com.apress.springrecipes.springintegration;

import org.springframework.integration.annotation.Router;

public class CustomerCreditScoreRouter {

    @Router
    public String routeByCustomerCreditScore(Customer customer) {
        if (customer.getCreditScore() > 770) {
            return "safeCustomerChannel";
        } else {
            return "riskyCustomerChannel";
        }
    }
}

If you decide that you’d rather not let Message<T> pass and want to stop processing, you can return null instead of a String.
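
To plug the annotated router into the flow, you could hand the bean to route() instead of using the inline lambda. A minimal sketch, assuming a customerCreditScoreRouter() @Bean method has been added to the configuration class:

    .split(splitter())
    .transform(transformer())
    // the @Router method returns the name of the channel to route to
    .route(customerCreditScoreRouter())
    .get();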

15-9. Stage Events Using Spring Batch

Problem

You have a file with a million records in it. This file is too big to handle as one event; it’s far more natural to react to each row as an event.

Solution

Spring Batch works very well with these types of solutions. It allows you to take an input file or a payload and reliably and systematically decompose it into events that an ESB can work with.

How It Works

Spring Integration does support reading files into the bus, and Spring Batch does support providing custom, unique endpoints for data. However, just like Mom always says, "just because you can doesn't mean you should." Although it seems as if there's a lot of overlap here, it turns out that there is a distinction (albeit a fine one). While both systems will work with files and message queues, or anything else you could conceivably write code to talk to, Spring Integration doesn't do well with large payloads because it's hard to treat something as large as a file with a million rows, which might require hours of work, as an event. That's simply too big a burden for an ESB. At that point, the term event has no meaning. A million records in a CSV file isn't an event on a bus; it's a file with a million records, each of which might in turn be an event. It's a subtle distinction.

A file with a million rows needs to be decomposed into smaller events. Spring Batch can help here: it allows you to systematically read through, apply validations, and optionally skip and retry invalid records. The processing can begin on an ESB such as Spring Integration. Spring Batch and Spring Integration can be used together to build truly scalable decoupled systems.

Staged event-driven architecture (SEDA) is an architectural style that deals with this sort of processing situation. In SEDA, you dampen the load on components of the architecture by staging work in queues and advancing only as much of it as the downstream components can handle. Put another way, imagine video processing. If you ran a site with a million users uploading videos that in turn needed to be transcoded and you had only ten servers, your system would fail if it attempted to process each video as soon as it was uploaded. Transcoding can take hours and pegs a CPU (or multiple CPUs!) while the system works. The most sensible thing to do would be to store the file and then, as capacity permits, process each one. In this way, the load on the nodes that handle transcoding is managed: there's always just enough work to keep the machines humming, but never enough to overrun them.

Similarly, no processing system (such as an ESB) can deal with a million records at once efficiently. Strive to decompose bigger events and messages into smaller ones. Let’s imagine a hypothetical solution designed to accommodate a drop of batch files representing hourly sales destined for fulfillment. The batch files are dropped onto a mount that Spring Integration is monitoring. Spring Integration kicks off processing as soon as it sees a new file. Spring Integration tells Spring Batch about the file and launches a Spring Batch job asynchronously.

Spring Batch reads the file, transforms the records into objects, and writes the output to a JMS topic with a key correlating the original batch to the JMS message. Naturally, this takes half a day to get done, but it does get done. Spring Integration, completely unaware that the job it started half a day ago is now finished, begins popping messages off the topic, one by one, and processing to fulfill the records, possibly involving multiple components on the ESB, begins.

If fulfillment is a long-lived process with a long-lived, conversational state involving many actors, perhaps the fulfillment for each record could be farmed out to a BPM engine. The BPM engine would thread together the different actors and work lists and allow work to continue over the course of days instead of the millisecond time frames Spring Integration is more geared to. In this example, Spring Batch was used as a springboard to dampen the load for components downstream. The component downstream was again a Spring Integration process that took the work and funneled it into a BPM engine, where final processing could begin. Spring Integration can use directory polling as a trigger to start a batch job and supply the name of the file to process. To launch a job from Spring Integration, Spring Batch provides the JobLaunchingMessageHandler class. This class takes a JobLaunchRequest instance to determine which job to start and with which parameters. You have to create a transformer to change the incoming Message<File> into a JobLaunchRequest instance.

The transformer could look like the following:

package com.apress.springrecipes.springintegration;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.integration.annotation.Transformer;


import java.io.File;

public class FileToJobLaunchRequestTransformer {

    private final Job job;
    private final String fileParameterName;


    public FileToJobLaunchRequestTransformer(Job job, String fileParameterName) {
        this.job = job;
        this.fileParameterName = fileParameterName;
    }


    @Transformer
    public JobLaunchRequest transform(File file) throws Exception {


        JobParametersBuilder builder = new JobParametersBuilder();
        builder.addString(fileParameterName, file.getAbsolutePath());
        return new JobLaunchRequest(job, builder.toJobParameters());
    }
}

The transformer needs a Job object and a filename parameter name to be constructed; this parameter is used in the Spring Batch job to determine which file needs to be loaded. The incoming message is transformed into a JobLaunchRequest using the absolute path of the file as the parameter value. This request can be used to launch a batch job.

To wire everything together, you can use the following configuration (note the Spring Batch setup is missing here; see Chapter 11 for information on setting up Spring Batch):

package com.apress.springrecipes.springintegration;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.integration.launch.JobLaunchingMessageHandler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.file.dsl.Files;


import java.io.File;
import java.util.concurrent.TimeUnit;


@Configuration
@EnableIntegration
public class IntegrationConfiguration {

    @Bean
    public FileToJobLaunchRequestTransformer transformer(Job job) {
        return new FileToJobLaunchRequestTransformer(job, "filename");
    }


    @Bean
    public JobLaunchingMessageHandler jobLaunchingMessageHandler(JobLauncher jobLauncher) {
        return new JobLaunchingMessageHandler(jobLauncher);
    }


    @Bean
    public IntegrationFlow fileToBatchFlow(@Value("file:${user.home}/customerstoimport/new/") File directory, FileToJobLaunchRequestTransformer transformer, JobLaunchingMessageHandler handler) {
        return IntegrationFlows
            .from(Files.inboundAdapter(directory).patternFilter("customers-*.txt"), c -> c.poller(Pollers.fixedRate(10, TimeUnit.SECONDS)))
            .transform(transformer)
            .handle(handler)
            .get();


    }

}

FileToJobLaunchRequestTransformer is configured, as is JobLaunchingMessageHandler. A file inbound channel adapter is used to poll for files. When a file is detected, a message is placed on a channel. The flow listens to that channel: each message is first transformed and then passed on to JobLaunchingMessageHandler.

Now a batch job will be launched to process the file. A typical job would probably use a FlatFileItemReader to actually read the file identified by the filename parameter. A JmsItemWriter could be used to write a message to a topic for each row read. In Spring Integration, a JMS inbound channel adapter could then be used to receive those messages and process them.
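
The batch side is covered in Chapter 11, but for orientation, a minimal sketch of such a job might look like the following. It assumes a Customer with firstName, lastName, and creditScore properties and a JmsTemplate whose default destination is the target topic; both are assumptions made only for this illustration.

package com.apress.springrecipes.springintegration;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.jms.JmsItemWriter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.jms.core.JmsTemplate;

@Configuration
@EnableBatchProcessing
public class CustomerImportJobConfiguration {

    @Bean
    @StepScope
    public FlatFileItemReader<Customer> customerReader(@Value("#{jobParameters['filename']}") String filename) {
        // Read the file whose absolute path was supplied by FileToJobLaunchRequestTransformer.
        FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
        reader.setResource(new FileSystemResource(filename));
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames("firstName", "lastName", "creditScore"); // hypothetical field names
        BeanWrapperFieldSetMapper<Customer> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
        fieldSetMapper.setTargetType(Customer.class);
        DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(fieldSetMapper);
        reader.setLineMapper(lineMapper);
        return reader;
    }

    @Bean
    public JmsItemWriter<Customer> customerWriter(JmsTemplate jmsTemplate) {
        // Publish one JMS message per Customer read from the file.
        JmsItemWriter<Customer> writer = new JmsItemWriter<>();
        writer.setJmsTemplate(jmsTemplate);
        return writer;
    }

    @Bean
    public Job customerImportJob(JobBuilderFactory jobs, StepBuilderFactory steps,
                                 FlatFileItemReader<Customer> reader, JmsItemWriter<Customer> writer) {
        Step step = steps.get("readCustomersStep")
            .<Customer, Customer>chunk(100)
            .reader(reader)
            .writer(writer)
            .build();
        return jobs.get("customerImportJob").start(step).build();
    }
}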

15-10. Use Gateways

Problem

You want to expose an interface to clients of your service, without betraying the fact that your service is implemented in terms of messaging middleware.

Solution

Use a gateway —a pattern from the classic book Enterprise Integration Patterns by Gregor Hohpe and Bobby Woolf—that enjoys rich support in Spring Integration.

How It Works

A gateway is a distinct animal, similar to a lot of other patterns but ultimately different enough to warrant its own consideration. You used adapters in previous examples to enable two systems to speak in terms of foreign, loosely coupled, middleware components. This foreign component can be anything: the file system, JMS queues/topics, Twitter, and so on.

You also know what a façade is: it abstracts away the functionality of other components behind an abbreviated interface that provides coarser-grained functionality. You might use a façade to build an interface oriented around vacation planning that in turn abstracts away the minutiae of using a car rental, hotel reservation, and airline reservation system.

You build a gateway, on the other hand, to provide an interface for your system that insulates clients from the middleware or messaging in your system so that they’re not dependent on JMS or Spring Integration APIs, for example. A gateway allows you to express compile-time constraints on the inputs and outputs of your system.

You might want to do this for several reasons. First, it's cleaner. Second, if you have the latitude to insist that clients comply with an interface, this is a good way to provide that interface. Your use of middleware can remain an implementation detail. Perhaps your architecture uses messaging middleware to exploit the performance gains of asynchronous messaging, but you didn't intend for those gains to come at the cost of a precise, explicit, external-facing interface.

This feature—the capability to hide messaging behind a POJO interface—is interesting and has been the focus of several other projects. Lingo, a project from Codehaus.org that is no longer under active development, had such a feature specific to JMS and the Java EE Connector Architecture (JCA; the acronym originally referred to the Java Cryptography Architecture but is now more commonly applied to the connector architecture). Since then, the developers have moved on to work on Apache Camel.

In this recipe, you’ll explore Spring Integration’s core support for messaging gateways and explore its support for message exchange patterns. Then, you’ll see how to completely remove implementation details from the client-facing interface.

SimpleMessagingGateway

The most fundamental support for gateways comes from the Spring Integration class SimpleMessagingGateway. The class lets you specify a channel on which requests are sent and a channel on which replies are expected. This gives you the ability to express in-out and in-only patterns on top of your existing messaging systems. The class works in terms of payloads, isolating you from the gory details of the messages being sent and received. That is already one level of abstraction. You could, conceivably, use SimpleMessagingGateway and Spring Integration's concept of channels to interface with file systems, JMS, e-mail, or any other system and deal simply with payloads and channels. There are implementations already provided to support some common endpoints, such as web services and JMS.

Let’s look at using a generic messaging gateway. In this example, you’ll send messages to a service activator and then receive the response. You manually interface with SimpleMessageGateway so that you can see how convenient it is.

package com.apress.springrecipes.springintegration;

import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.integration.gateway.SimpleMessagingGateway;
import org.springframework.messaging.MessageChannel;


public class Main {
    public static void main(String[] args) {
        ConfigurableApplicationContext ctx = new AnnotationConfigApplicationContext(AdditionConfiguration.class);
        MessageChannel request = ctx.getBean("request", MessageChannel.class);
        MessageChannel response = ctx.getBean("response", MessageChannel.class);


        SimpleMessagingGateway msgGateway = new SimpleMessagingGateway();
        msgGateway.setRequestChannel(request);
        msgGateway.setReplyChannel(response);
        msgGateway.setBeanFactory(ctx);
        msgGateway.afterPropertiesSet();
        msgGateway.start();


        Number result = msgGateway.convertSendAndReceive(new Operands(22, 4));

        System.out.printf("Result: %f%n", result.floatValue());

        ctx.close();

    }
}

The interface is straightforward. SimpleMessagingGateway needs a request channel and a reply channel, and it coordinates the rest. In this case, you're doing nothing but forwarding the request to a service activator, which adds the operands and sends the sum out on the reply channel. The configuration is sparse because most of the work is done in those few lines of Java code.

package com.apress.springrecipes.springintegration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;


@Configuration
@EnableIntegration
public class AdditionConfiguration {


    @Bean
    public AdditionService additionService() {
        return new AdditionService();
    }


    @Bean
    public IntegrationFlow additionFlow() {


        return IntegrationFlows
            .from("request")
            .handle(additionService(), "add")
            .channel("response")
            .get();
    }
}
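
The AdditionService itself isn't listed in this recipe; a minimal sketch, assuming Operands exposes its two values through getters (the accessor names are made up):

package com.apress.springrecipes.springintegration;

public class AdditionService {

    // The return value becomes the payload of the reply message on the "response" channel.
    public int add(Operands operands) {
        return operands.getFirst() + operands.getSecond(); // hypothetical accessors
    }
}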

Break the Interface Dependency

The previous example demonstrates what's happening behind the scenes. You're dealing only with Spring Integration interfaces and are isolated from the nuances of the endpoints. However, there are still plenty of inferred constraints that a client might easily fail to comply with. The simplest solution is to hide the messaging behind an interface. Let's look at building a fictional hotel reservation search engine. Searching for a hotel might take a long time, and ideally processing should be offloaded to a separate server. JMS is an ideal fit because you could implement the competing consumers pattern and scale simply by adding more consumers. The client would still block waiting for the result in this example, but the server (or servers) would not be overloaded or stuck in a blocking state.

You’ll build two Spring Integration solutions: one for the client (which will in turn contain the gateway) and one for the service itself, which, presumably, is on a separate host connected to the client only by way of well-known message queues.

Let’s look at the client configuration first. The first thing that the client configuration does is declare a ConnectionFactory. Then you declare the flow that starts with the gateway for the VacationService interface. The gateway element simply exists to identify the component and the interface, to which the proxy is cast and made available to clients. jms-outbound-gateway is the component that does most of the work. It takes the message you created and sends it to the request JMS destination, setting up the reply headers, and so on. Finally, you declare a generic gateway element, which does most of the magic.

package com.apress.springrecipes.springintegration;

import com.apress.springrecipes.springintegration.myholiday.VacationService;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.jms.dsl.Jms;
import org.springframework.jms.connection.CachingConnectionFactory;


import java.util.Arrays;

@Configuration
@EnableIntegration
public class ClientIntegrationContext {


    @Bean
    public CachingConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        connectionFactory.setTrustAllPackages(true);
        return new CachingConnectionFactory(connectionFactory);
    }


    @Bean
    public IntegrationFlow vacationGatewayFlow() {
        return IntegrationFlows
            .from(VacationService.class)
            .handle(
                Jms.outboundGateway(connectionFactory())
                    .requestDestination("inboundHotelReservationSearchDestination")
                    .replyDestination("outboundHotelReservationSearchResultsDestination"))
            .get();
    }


}

To be able to use VacationService as a gateway, it needs to be annotated with the @MessagingGateway annotation, and the method that serves as the entry point needs to be annotated with @Gateway.

package com.apress.springrecipes.springintegration.myholiday;

import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;


import java.util.List;

@MessagingGateway
public interface VacationService {


    @Gateway
    List<HotelReservation> findHotels(HotelReservationSearch hotelReservationSearch);
}

This is the client-facing interface. There is no coupling between the client-facing interface exposed via the gateway and the interface of the service that ultimately handles the messages; here the service happens to implement the same interface only to keep the example easy to follow. This is not like traditional, synchronous remoting, in which the service interface and the client interface must match.

In this example, you’re using two very simple objects for demonstration: HotelReservationSearch and HotelReservation. There is nothing interesting about these objects in the slightest; they are simple POJOs that implement Serializable and contain a few accessor/mutators to flesh out the example domain.

The following client Java code demonstrates how all of this comes together:

package com.apress.springrecipes.springintegration;

import com.apress.springrecipes.springintegration.myholiday.HotelReservation;
import com.apress.springrecipes.springintegration.myholiday.HotelReservationSearch;
import com.apress.springrecipes.springintegration.myholiday.VacationService;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;


import java.time.LocalDate;
import java.time.ZoneId;
import java.util.Date;
import java.util.List;


public class Main {
    public static void main(String[] args) throws Throwable {
        // Start server
        ConfigurableApplicationContext serverCtx = new AnnotationConfigApplicationContext(ServerIntegrationContext.class);


        // Start client and issue search
        ConfigurableApplicationContext clientCtx = new AnnotationConfigApplicationContext(ClientIntegrationContext.class);


        VacationService vacationService = clientCtx.getBean(VacationService.class);

        LocalDate now = LocalDate.now();
        Date start = Date.from(now.plusDays(1).atStartOfDay(ZoneId.systemDefault()).toInstant());
        Date stop = Date.from(now.plusDays(8).atStartOfDay(ZoneId.systemDefault()).toInstant());
        HotelReservationSearch hotelReservationSearch = new HotelReservationSearch(200f, 2, start, stop);
        List<HotelReservation> results = vacationService.findHotels(hotelReservationSearch);


        System.out.printf("Found %s results.%n", results.size());
        results.forEach(r -> System.out.printf(" %s%n", r));


        serverCtx.close();
        clientCtx.close();
    }
}

It just doesn’t get any cleaner than that! No Spring Integration interfaces whatsoever. You make a request, searching is done, and you get the result back when the processing is done. The service implementation for this setup is interesting, not because of what you’ve added but because of what’s not there.

package com.apress.springrecipes.springintegration;

import com.apress.springrecipes.springintegration.myholiday.VacationServiceImpl;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.jms.dsl.Jms;
import org.springframework.jms.connection.CachingConnectionFactory;


import java.util.Arrays;

@Configuration
@EnableIntegration
public class ServerIntegrationContext {


    @Bean
    public CachingConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        connectionFactory.setTrustAllPackages(true);
        return new CachingConnectionFactory(connectionFactory);
    }


    @Bean
    public VacationServiceImpl vacationService() {
        return new VacationServiceImpl();
    }


    @Bean
    public IntegrationFlow serverIntegrationFlow() {
        return IntegrationFlows.from(
            Jms.inboundGateway(connectionFactory()).destination("inboundHotelReservationSearchDestination"))
            .handle(vacationService())
            .get();
    }
}

Here, you’ve defined an inbound JMS gateway. The messages from the inbound JMS gateway are put on a channel, whose messages are forwarded to a service activator, as you would expect. The service activator is what handles actual processing. What’s interesting here is that there’s no mention of a response channel, either for the service activator or for the inbound JMS gateway. The service activator looks, and fails to find, a reply channel and so uses the reply channel created by the inbound JMS gateway component, which in turn has created the reply channel based on the header metadata in the inbound JMS message. Thus, everything just works without specification.

The implementation is a simple stub implementation of the interface.

package com.apress.springrecipes.springintegration.myholiday;

import org.springframework.integration.annotation.ServiceActivator;

import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.List;


public class VacationServiceImpl implements VacationService {
    private List<HotelReservation> hotelReservations;


    @PostConstruct
    public void afterPropertiesSet() throws Exception {
        hotelReservations = Arrays.asList(
                new HotelReservation("Bilton", 243.200F),
                new HotelReservation("East Western", 75.0F),
                new HotelReservation("Thairfield Inn", 70F),
                new HotelReservation("Park In The Inn", 200.00F));
    }


    @ServiceActivator
    public List<HotelReservation> findHotels(HotelReservationSearch searchMsg) {
        try {
            Thread.sleep(1000);
        } catch (Throwable th) {
        }


        return this.hotelReservations;
    }
}

Summary

This chapter discussed building an integration solution using Spring Integration, an ESB-like framework built on top of the Spring Framework. You were introduced to the core concepts of enterprise application integration. You learned how to handle a few integration scenarios, including JMS and file polling.

In the next chapter, you will explore the capabilities of Spring in the field of testing.
