© Marten Deinum, Daniel Rubio, and Josh Long 2017

Marten Deinum, Daniel Rubio and Josh Long, Spring 5 Recipes, https://doi.org/10.1007/978-1-4842-2790-9_14

14. Spring Messaging

Marten Deinum (1), Daniel Rubio (2) and Josh Long (3)

(1)Meppel, Drenthe, The Netherlands

(2)F. Bahia, Ensenada, Baja California, Mexico

(3)Apartment 205, Canyon Country, California, USA

In this chapter, you will learn about Spring’s support for messaging. Messaging is a very powerful technique for scaling applications. It allows work that would otherwise overwhelm a service to be queued up. It also encourages a decoupled architecture. A component, for example, might only consume messages with a single java.util.Map-based key-value pair. This loose contract makes it a viable hub of communication for multiple, disparate systems.

In this chapter, we’ll refer quite a bit to topics and queues. Messaging solutions are designed to solve two types of architecture requirements: messaging from one point in an application to another known point, and messaging from one point in an application to many other unknown points. These patterns are the middleware equivalents of telling somebody something face to face and saying something over a loudspeaker to a room of people, respectively.

If you want a message to be broadcast to an unknown set of clients who are “listening” for it (as in the loudspeaker analogy), you send the message on a topic. If you want the message delivered to a single, known client, you send it over a queue.
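To make the difference concrete, here is a minimal plain-Java sketch of the two delivery models. It does not use JMS or Spring at all: SimpleQueue and SimpleTopic are hypothetical in-memory stand-ins invented for this illustration, and a real broker adds persistence, acknowledgment, and concurrency that are ignored here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Plain-Java illustration only; none of these types belong to JMS or Spring.
class QueueVersusTopicSketch {

    // Point-to-point: a message is held until exactly one receiver takes it.
    static class SimpleQueue {
        private final Queue<String> messages = new ArrayDeque<>();

        void send(String message) {
            messages.add(message);
        }

        // Returns the oldest pending message, or null when none is waiting.
        String receive() {
            return messages.poll();
        }
    }

    // Publish-subscribe: a message is copied to every current subscriber.
    static class SimpleTopic {
        private final List<List<String>> subscribers = new ArrayList<>();

        // Registers a subscriber and returns the inbox that collects its copies.
        List<String> subscribe() {
            List<String> inbox = new ArrayList<>();
            subscribers.add(inbox);
            return inbox;
        }

        void publish(String message) {
            for (List<String> inbox : subscribers) {
                inbox.add(message);
            }
        }
    }

    public static void main(String[] args) {
        SimpleQueue queue = new SimpleQueue();
        queue.send("mail 1234");
        System.out.println("first receiver:  " + queue.receive());
        System.out.println("second receiver: " + queue.receive());

        SimpleTopic topic = new SimpleTopic();
        List<String> boardA = topic.subscribe();
        List<String> boardB = topic.subscribe();
        topic.publish("mail 1234");
        System.out.println("board A: " + boardA + ", board B: " + boardB);
    }
}
```

With the queue, only the first receiver obtains the message and the second finds nothing waiting. With the topic, both subscribers obtain their own copy.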

By the end of this chapter, you will be able to create and access message-based middleware using Spring. This chapter will also provide you with a working knowledge of messaging in general, which will help you when we discuss Spring Integration in the next chapter.

We will take a look at the messaging abstraction and how to use it to work with JMS, AMQP, and Apache Kafka. For each of the technologies, Spring simplifies the usage with a template-based approach for easy message sending and receiving. Moreover, Spring enables beans declared in its IoC container to listen for messages and react to them. It takes the same approach for each of these technologies.

Note

In the ch14\bin directory there are several scripts for starting a Dockerized version of the different messaging providers: ActiveMQ for JMS, RabbitMQ for AMQP, and finally Apache Kafka.

14-1. Send and Receive JMS Messages with Spring

Problem

To send or receive a JMS message, you have to perform the following tasks:

  1. Create a JMS connection factory on a message broker.

  2. Create a JMS destination, which can be either a queue or a topic.

  3. Open a JMS connection from the connection factory.

  4. Obtain a JMS session from the connection.

  5. Send or receive the JMS message with a message producer or consumer.

  6. Handle JMSException, which is a checked exception that must be handled.

  7. Close the JMS session and connection.

As you can see, a lot of coding is required to send or receive a simple JMS message. In fact, most of these tasks are boilerplate and require you to repeat them each time when dealing with JMS.

Solution

Spring offers a template-based solution to simplify JMS code. With a JMS template (the Spring Framework class JmsTemplate), you can send and receive JMS messages with much less code. The template handles the boilerplate tasks and also converts the JMS API’s JMSException hierarchy into Spring’s runtime exception org.springframework.jms.JmsException hierarchy.

How It Works

Suppose you are developing a post-office system that includes two subsystems: the front-desk subsystem and the back-office subsystem. When the front desk receives mail, it passes the mail to the back office for categorizing and delivering. At the same time, the front-desk subsystem sends a JMS message to the back-office subsystem, notifying it of new mail. The mail information is represented by the following class:

package com.apress.springrecipes.post;

public class Mail {

    private String mailId;
    private String country;
    private double weight;

    // Constructors, Getters and Setters
    ...
}

The methods for sending and receiving mail information are defined in the FrontDesk and BackOffice interfaces as follows:

package com.apress.springrecipes.post;

public interface FrontDesk {

    public void sendMail(Mail mail);
}

package com.apress.springrecipes.post;

public interface BackOffice {

    public Mail receiveMail();
}

Send and Receive Messages Without Spring’s JMS Template Support

Let’s look at how to send and receive JMS messages without Spring’s JMS template support. The following FrontDeskImpl class sends JMS messages with the JMS API directly.

package com.apress.springrecipes.post;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;

public class FrontDeskImpl implements FrontDesk {

    public void sendMail(Mail mail) {
        ConnectionFactory cf =
            new ActiveMQConnectionFactory("tcp://localhost:61616");
        Destination destination = new ActiveMQQueue("mail.queue");

        Connection conn = null;
        try {
            conn = cf.createConnection();
            Session session =
                conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(destination);

            MapMessage message = session.createMapMessage();
            message.setString("mailId", mail.getMailId());
            message.setString("country", mail.getCountry());
            message.setDouble("weight", mail.getWeight());
            producer.send(message);

            session.close();
        } catch (JMSException e) {
            throw new RuntimeException(e);
        } finally {
            if (conn != null) {
                try {
                    conn.close();
                } catch (JMSException e) {
                    // Ignored: nothing more can be done if closing fails
                }
            }
        }
    }
}

In the preceding sendMail() method, you first create JMS-specific ConnectionFactory and Destination objects with the classes provided by ActiveMQ. The message broker URL is the default for ActiveMQ if you run it on localhost. In JMS, there are two types of destinations: queue and topic.

As explained at the start of the chapter, a queue is for the point-to-point communication model, while a topic is for the publish-subscribe communication model. Because you are sending JMS messages point to point from front desk to back office, you should use a message queue. You can just as easily create a topic as a destination using the ActiveMQTopic class.

Next, you have to create a connection, session, and message producer before you can send your message. There are several types of messages defined in the JMS API, including TextMessage, MapMessage, BytesMessage, ObjectMessage, and StreamMessage. All of them are interfaces that extend the Message interface. MapMessage contains message content in key-value pairs like a map. While doing all this, you have to handle JMSException, which may be thrown by the JMS API. Finally, you must remember to close the session and connection to release system resources. Whenever a JMS connection is closed, all its open sessions are closed automatically, so you only have to ensure that the JMS connection is closed properly in the finally block.

On the other hand, the following BackOfficeImpl class receives JMS messages with the JMS API directly:

package com.apress.springrecipes.post;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;

public class BackOfficeImpl implements BackOffice {

    public Mail receiveMail() {
        ConnectionFactory cf =
            new ActiveMQConnectionFactory("tcp://localhost:61616");
        Destination destination = new ActiveMQQueue("mail.queue");

        Connection conn = null;
        try {
            conn = cf.createConnection();
            Session session =
                conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer(destination);

            conn.start();
            MapMessage message = (MapMessage) consumer.receive();
            Mail mail = new Mail();
            mail.setMailId(message.getString("mailId"));
            mail.setCountry(message.getString("country"));
            mail.setWeight(message.getDouble("weight"));
            session.close();
            return mail;
        } catch (JMSException e) {
            throw new RuntimeException(e);
        } finally {
            if (conn != null) {
                try {
                    conn.close();
                } catch (JMSException e) {
                    // Ignored: nothing more can be done if closing fails
                }
            }
        }
    }
}

Most of the code in this method is similar to that for sending JMS messages, except that you create a message consumer and receive a JMS message from it. Note that you used the connection’s start() method here, although you didn’t in the FrontDeskImpl example before.

When using a Connection to receive messages, you can register listeners that are invoked on receipt of a message, or you can block synchronously, waiting for a message to arrive. The JMS provider has no way of knowing which approach you will take, so it doesn’t start delivering messages until you’ve explicitly called start(). If you add listeners or do any kind of configuration, you do so before you invoke start().

Finally, let’s create two configuration classes: one for the front-desk subsystem (e.g., FrontOfficeConfiguration) and one for the back-office subsystem (e.g., BackOfficeConfiguration).

package com.apress.springrecipes.post.config;

import com.apress.springrecipes.post.FrontDeskImpl;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FrontOfficeConfiguration {

    @Bean
    public FrontDeskImpl frontDesk() {
        return new FrontDeskImpl();
    }
}

package com.apress.springrecipes.post.config;

import com.apress.springrecipes.post.BackOfficeImpl;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BackOfficeConfiguration {

    @Bean
    public BackOfficeImpl backOffice() {
        return new BackOfficeImpl();
    }
}

Now, the front-desk and back-office subsystems are almost ready to send and receive JMS messages. But before moving on to the final step, start up the ActiveMQ message broker (if not done already).

You can easily monitor the ActiveMQ messaging broker’s activity. In a default installation, you can open http://localhost:8161/admin/queueGraph.jsp to see what’s happening with mail.queue, the queue used in these examples. Alternatively, ActiveMQ exposes very useful beans and statistics through JMX. Simply run jconsole and drill down to org.apache.activemq in the MBeans tab.

Next, let’s create a couple of main classes to run the message system: one for the front-desk subsystem (the FrontDeskMain class) and another for the back-office subsystem (the BackOfficeMain class).

package com.apress.springrecipes.post;

import com.apress.springrecipes.post.config.FrontOfficeConfiguration;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class FrontDeskMain {

    public static void main(String[] args) {

        ApplicationContext context =
            new AnnotationConfigApplicationContext(FrontOfficeConfiguration.class);

        FrontDesk frontDesk = context.getBean(FrontDesk.class);
        frontDesk.sendMail(new Mail("1234", "US", 1.5));
    }
}

package com.apress.springrecipes.post;

import com.apress.springrecipes.post.config.BackOfficeConfiguration;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class BackOfficeMain {

    public static void main(String[] args) {

        ApplicationContext context =
            new AnnotationConfigApplicationContext(BackOfficeConfiguration.class);

        BackOffice backOffice = context.getBean(BackOffice.class);
        Mail mail = backOffice.receiveMail();
        System.out.println("Mail #" + mail.getMailId() + " received");
    }
}

Every time you run the front-desk application with the previous FrontDeskMain class, a message is sent to the broker, and every time you run the back-office application with the previous BackOfficeMain class, an attempt is made to pick a message from the broker.

Send and Receive Messages with Spring’s JMS Template

Spring offers a JMS template that can significantly simplify your JMS code. To send a JMS message with this template, you simply call the send() method and provide a message destination, as well as a MessageCreator object, which creates the JMS message you are going to send. The MessageCreator object is usually implemented as an anonymous inner class.

package com.apress.springrecipes.post;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class FrontDeskImpl implements FrontDesk {

    private JmsTemplate jmsTemplate;
    private Destination destination;

    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    public void setDestination(Destination destination) {
        this.destination = destination;
    }

    public void sendMail(final Mail mail) {
        jmsTemplate.send(destination, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                MapMessage message = session.createMapMessage();
                message.setString("mailId", mail.getMailId());
                message.setString("country", mail.getCountry());
                message.setDouble("weight", mail.getWeight());
                return message;
            }
        });
    }
}

Note that an inner class can only access arguments or variables of the enclosing method that are declared as final or, since Java 8, are effectively final. The MessageCreator interface declares only a createMessage() method for you to implement. In this method, you create and return your JMS message with the provided JMS session.

A JMS template helps you to obtain and release the JMS connection and session, and it sends the JMS message created by your MessageCreator object. Moreover, it converts the JMS API’s JMSException hierarchy into Spring’s JMS runtime exception hierarchy, whose base exception class is org.springframework.jms.JmsException. You can catch the JmsException thrown from send and the other send variants and then take action in the catch block if you want.
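The division of labor described here (the template acquires and releases the resource and translates checked exceptions, while your callback only builds the message) can be sketched in plain Java. This is an illustration of the pattern, not Spring’s implementation: BrokerTemplate, FakeSession, SessionCallback, and both exception classes are hypothetical names invented for the sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration of the template idea; every type here is a
// hypothetical stand-in and is not part of JMS or Spring.
class TemplateSketch {

    // Stands in for a checked API exception such as JMSException.
    static class CheckedBrokerException extends Exception {
        CheckedBrokerException(String message) {
            super(message);
        }
    }

    // Stands in for an unchecked exception such as Spring's JmsException.
    static class UncheckedBrokerException extends RuntimeException {
        UncheckedBrokerException(Throwable cause) {
            super(cause.getMessage(), cause);
        }
    }

    // Stands in for a session that must always be released.
    static class FakeSession {
        final List<String> sent = new ArrayList<>();
        boolean closed;

        void send(String message) {
            sent.add(message);
        }

        void close() {
            closed = true;
        }
    }

    // The callback, comparable in role to MessageCreator.
    interface SessionCallback {
        void doInSession(FakeSession session) throws CheckedBrokerException;
    }

    static class BrokerTemplate {
        FakeSession lastSession; // kept only so the demo can inspect it

        void execute(SessionCallback callback) {
            FakeSession session = new FakeSession();
            lastSession = session;
            try {
                callback.doInSession(session);
            } catch (CheckedBrokerException e) {
                // Translate the checked exception into an unchecked one.
                throw new UncheckedBrokerException(e);
            } finally {
                // Release the resource whether or not the callback failed.
                session.close();
            }
        }
    }

    public static void main(String[] args) {
        BrokerTemplate template = new BrokerTemplate();
        String mailId = "1234"; // effectively final, so the lambda may capture it
        template.execute(session -> session.send("mailId=" + mailId));
        System.out.println("sent: " + template.lastSession.sent
            + ", closed: " + template.lastSession.closed);
    }
}
```

Because the callback interface has a single method, the sketch passes a lambda instead of an anonymous inner class. The caller never writes a try/finally block and never declares a checked exception.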

In the front-desk subsystem’s configuration class, you declare a JMS template that refers to the JMS connection factory for opening connections. Then, you inject this template as well as the message destination into your front-desk bean.

package com.apress.springrecipes.post.config;

import com.apress.springrecipes.post.FrontDeskImpl;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.core.JmsTemplate;

import javax.jms.ConnectionFactory;
import javax.jms.Queue;

@Configuration
public class FrontOfficeConfiguration {

    @Bean
    public ConnectionFactory connectionFactory() {
        return new ActiveMQConnectionFactory("tcp://localhost:61616");
    }

    @Bean
    public Queue destination() {
        return new ActiveMQQueue("mail.queue");
    }

    @Bean
    public JmsTemplate jmsTemplate() {
        JmsTemplate jmsTemplate = new JmsTemplate();
        jmsTemplate.setConnectionFactory(connectionFactory());
        return jmsTemplate;
    }

    @Bean
    public FrontDeskImpl frontDesk() {
        FrontDeskImpl frontDesk = new FrontDeskImpl();
        frontDesk.setJmsTemplate(jmsTemplate());
        frontDesk.setDestination(destination());
        return frontDesk;
    }
}

To receive a JMS message with a JMS template, you call the receive() method and provide a message destination. This method returns a javax.jms.Message, which is the base interface of all JMS message types, so you have to cast it to the proper type before further processing.

package com.apress.springrecipes.post;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.JmsUtils;

public class BackOfficeImpl implements BackOffice {

    private JmsTemplate jmsTemplate;
    private Destination destination;

    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    public void setDestination(Destination destination) {
        this.destination = destination;
    }

    public Mail receiveMail() {
        MapMessage message = (MapMessage) jmsTemplate.receive(destination);
        try {
            if (message == null) {
                return null;
            }
            Mail mail = new Mail();
            mail.setMailId(message.getString("mailId"));
            mail.setCountry(message.getString("country"));
            mail.setWeight(message.getDouble("weight"));
            return mail;
        } catch (JMSException e) {
            throw JmsUtils.convertJmsAccessException(e);
        }
    }
}

However, when extracting information from the received MapMessage object, you still have to handle the JMS API’s JMSException. This is in stark contrast to the default behavior of the framework, where it automatically maps exceptions for you when invoking methods on JmsTemplate. To make the type of the exception thrown by this method consistent, you have to make a call to JmsUtils.convertJmsAccessException() to convert the JMS API’s JMSException into Spring’s JmsException.

In the back-office subsystem’s configuration class, you declare a JMS template and inject it together with the message destination into your back-office bean.

package com.apress.springrecipes.post.config;

import com.apress.springrecipes.post.BackOfficeImpl;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.core.JmsTemplate;

import javax.jms.ConnectionFactory;
import javax.jms.Queue;

@Configuration
public class BackOfficeConfiguration {

    @Bean
    public ConnectionFactory connectionFactory() {
        return new ActiveMQConnectionFactory("tcp://localhost:61616");
    }

    @Bean
    public Queue destination() {
        return new ActiveMQQueue("mail.queue");
    }

    @Bean
    public JmsTemplate jmsTemplate() {
        JmsTemplate jmsTemplate = new JmsTemplate();
        jmsTemplate.setConnectionFactory(connectionFactory());
        jmsTemplate.setReceiveTimeout(10000);
        return jmsTemplate;
    }

    @Bean
    public BackOfficeImpl backOffice() {
        BackOfficeImpl backOffice = new BackOfficeImpl();
        backOffice.setDestination(destination());
        backOffice.setJmsTemplate(jmsTemplate());
        return backOffice;
    }
}

Pay special attention to the JMS template’s receiveTimeout property, which specifies how long to wait in milliseconds. By default, this template waits for a JMS message at the destination forever, and the calling thread is blocked in the meantime. To avoid waiting that long for a message, you should specify a receive timeout for this template. If no message becomes available at the destination within that duration, the JMS template’s receive() method returns null.
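The “wait up to a limit, then return null” contract is easy to picture with a plain-Java analogy. The following sketch uses a java.util.concurrent.BlockingQueue in place of a JMS destination; the receive() helper is a hypothetical name for this illustration and is not the JmsTemplate method. The analogy holds only for positive timeouts.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Plain-Java analogy for a receive call with a timeout; not JMS or Spring code.
class ReceiveTimeoutSketch {

    // Waits up to timeoutMillis for a message and returns null if none arrives.
    static String receive(BlockingQueue<String> destination, long timeoutMillis) {
        try {
            return destination.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> mailQueue = new LinkedBlockingQueue<>();

        // Nothing has been sent yet, so this call gives up after 100 ms.
        System.out.println("empty queue: " + receive(mailQueue, 100));

        // Once a message is waiting, the call returns it immediately.
        mailQueue.add("mailId=1234");
        System.out.println("after send:  " + receive(mailQueue, 100));
    }
}
```

As with the null check in receiveMail(), the caller has to treat a null result as “no message arrived in time” rather than as an error.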

In your applications, you might receive a message synchronously because you’re expecting a response to something, or because you want to check for messages at an interval, handling the messages and then spinning down until the next interval. If you intend to receive messages and respond to them as a service, you’re likely going to want to use the message-driven POJO functionality described later in this chapter. There, we discuss a mechanism that will constantly sit and wait for messages, handling them by calling back into your application as the messages arrive.

Send and Receive Messages to and from a Default Destination

Instead of specifying a message destination for each JMS template’s send() and receive() method call, you can specify a default destination for a JMS template. Then, you no longer need to inject it into your message sender and receiver beans again.

@Configuration
public class FrontOfficeConfiguration {
    ...
    @Bean
    public JmsTemplate jmsTemplate() {
        JmsTemplate jmsTemplate = new JmsTemplate();
        jmsTemplate.setConnectionFactory(connectionFactory());
        jmsTemplate.setDefaultDestination(destination());
        return jmsTemplate;
    }

    @Bean
    public FrontDeskImpl frontDesk() {
        FrontDeskImpl frontDesk = new FrontDeskImpl();
        frontDesk.setJmsTemplate(jmsTemplate());
        return frontDesk;
    }
}

For the back office, the configuration would look like this:

@Configuration
public class BackOfficeConfiguration {
    ...
    @Bean
    public JmsTemplate jmsTemplate() {
        JmsTemplate jmsTemplate = new JmsTemplate();
        jmsTemplate.setConnectionFactory(connectionFactory());
        jmsTemplate.setDefaultDestination(destination());
        jmsTemplate.setReceiveTimeout(10000);
        return jmsTemplate;
    }

    @Bean
    public BackOfficeImpl backOffice() {
        BackOfficeImpl backOffice = new BackOfficeImpl();
        backOffice.setJmsTemplate(jmsTemplate());
        return backOffice;
    }
}

With the default destination specified for a JMS template, you can delete the setter method for a message destination from your message sender and receiver classes. Now, when you call the send() and receive() methods, you no longer need to specify a message destination.

package com.apress.springrecipes.post;
...
import org.springframework.jms.core.MessageCreator;

public class FrontDeskImpl implements FrontDesk {
    ...
    public void sendMail(final Mail mail) {
        jmsTemplate.send(new MessageCreator() {
            ...
        });
    }
}

package com.apress.springrecipes.post;
...
import javax.jms.MapMessage;
...

public class BackOfficeImpl implements BackOffice {
    ...
    public Mail receiveMail() {
        MapMessage message = (MapMessage) jmsTemplate.receive();
        ...
    }
}

In addition, instead of specifying an instance of the Destination interface for a JMS template, you can specify the destination name and let the JMS template resolve it for you, so you can delete the destination bean declaration from both configuration classes. This is done by setting the defaultDestinationName property.

@Bean
public JmsTemplate jmsTemplate() {
    JmsTemplate jmsTemplate = new JmsTemplate();
    ...
    jmsTemplate.setDefaultDestinationName("mail.queue");
    return jmsTemplate;
}

Extend the JmsGatewaySupport Class

JMS sender and receiver classes can also extend JmsGatewaySupport to retrieve a JMS template. You have the following two options for classes that extend JmsGatewaySupport to create their JMS template:

  • Inject a JMS connection factory for JmsGatewaySupport to create a JMS template on it automatically. However, if you do it this way, you won’t be able to configure the details of the JMS template.

  • Inject a JMS template for JmsGatewaySupport that is created and configured by you.

Of them, the second approach is more suitable if you have to configure the JMS template yourself. You can delete the private field jmsTemplate and its setter method from both your sender and receiver classes. When you need access to the JMS template, you just make a call to getJmsTemplate().

package com.apress.springrecipes.post;

import org.springframework.jms.core.support.JmsGatewaySupport;
...

public class FrontDeskImpl extends JmsGatewaySupport implements FrontDesk {
    ...
    public void sendMail(final Mail mail) {
        getJmsTemplate().send(new MessageCreator() {
            ...
        });
    }
}

package com.apress.springrecipes.post;
...
import org.springframework.jms.core.support.JmsGatewaySupport;

public class BackOfficeImpl extends JmsGatewaySupport implements BackOffice {

    public Mail receiveMail() {
        MapMessage message = (MapMessage) getJmsTemplate().receive();
        ...
    }
}

14-2. Convert JMS Messages

Problem

Your application receives messages from your message queue but needs to transform those messages from the JMS-specific type to a business-specific class.

Solution

Spring provides SimpleMessageConverter, an implementation of the MessageConverter interface, to handle the translation of a received JMS message to a business object and the translation of a business object to a JMS message. You can leverage the default or provide your own.

How It Works

The previous recipes handled the raw JMS messages. Spring’s JMS template can help you convert JMS messages to and from Java objects using a message converter. By default, the JMS template uses SimpleMessageConverter for converting TextMessage to or from a string, BytesMessage to or from a byte array, MapMessage to or from a map, and ObjectMessage to or from a serializable object.

For the front-desk and back-office classes of the previous recipe, you can send and receive a map using the convertAndSend() and receiveAndConvert() methods, where the map is converted to/from MapMessage.

package com.apress.springrecipes.post;
...
public class FrontDeskImpl extends JmsGatewaySupport implements FrontDesk {

    public void sendMail(Mail mail) {
        Map<String, Object> map = new HashMap<>();
        map.put("mailId", mail.getMailId());
        map.put("country", mail.getCountry());
        map.put("weight", mail.getWeight());
        getJmsTemplate().convertAndSend(map);
    }
}

package com.apress.springrecipes.post;
...
public class BackOfficeImpl extends JmsGatewaySupport implements BackOffice {

    public Mail receiveMail() {
        Map<?, ?> map = (Map<?, ?>) getJmsTemplate().receiveAndConvert();
        Mail mail = new Mail();
        mail.setMailId((String) map.get("mailId"));
        mail.setCountry((String) map.get("country"));
        mail.setWeight((Double) map.get("weight"));
        return mail;
    }
}

You can also create a custom message converter by implementing the MessageConverter interface for converting mail objects.

package com.apress.springrecipes.post;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.jms.support.converter.MessageConverter;

public class MailMessageConverter implements MessageConverter {

    public Object fromMessage(Message message) throws JMSException,
            MessageConversionException {
        MapMessage mapMessage = (MapMessage) message;
        Mail mail = new Mail();
        mail.setMailId(mapMessage.getString("mailId"));
        mail.setCountry(mapMessage.getString("country"));
        mail.setWeight(mapMessage.getDouble("weight"));
        return mail;
    }

    public Message toMessage(Object object, Session session) throws JMSException,
            MessageConversionException {
        Mail mail = (Mail) object;
        MapMessage message = session.createMapMessage();
        message.setString("mailId", mail.getMailId());
        message.setString("country", mail.getCountry());
        message.setDouble("weight", mail.getWeight());
        return message;
    }
}
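A converter is only useful if its two directions are inverses of each other: whatever toMessage() writes under a key, fromMessage() must read back from the same key with the same type. The following plain-Java sketch checks that round trip with a java.util.Map standing in for MapMessage. The nested Mail class and the toMap()/fromMap() helpers are simplified, hypothetical stand-ins for illustration; only the three keys are taken from the listing above.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java illustration; a Map stands in for a JMS MapMessage here.
class MailConversionSketch {

    // Simplified, hypothetical stand-in for the chapter's Mail class.
    static class Mail {
        final String mailId;
        final String country;
        final double weight;

        Mail(String mailId, String country, double weight) {
            this.mailId = mailId;
            this.country = country;
            this.weight = weight;
        }
    }

    // Mirrors toMessage(): copies each Mail property under a fixed key.
    static Map<String, Object> toMap(Mail mail) {
        Map<String, Object> map = new HashMap<>();
        map.put("mailId", mail.mailId);
        map.put("country", mail.country);
        map.put("weight", mail.weight);
        return map;
    }

    // Mirrors fromMessage(): reads the same keys back into a Mail.
    static Mail fromMap(Map<String, Object> map) {
        return new Mail(
            (String) map.get("mailId"),
            (String) map.get("country"),
            (Double) map.get("weight"));
    }

    public static void main(String[] args) {
        Mail original = new Mail("1234", "US", 1.5);
        Mail restored = fromMap(toMap(original));
        System.out.println("Mail #" + restored.mailId + " to " + restored.country
            + " weighing " + restored.weight);
    }
}
```

A mismatch in either a key name or a value type between the two methods would surface here as a null property or a ClassCastException, which is the kind of mistake worth catching in a unit test before a broker is involved.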

To apply this message converter, you have to declare it in both bean configuration classes and inject it into the JMS template.

@Configuration
public class BackOfficeConfiguration {
    ...
    @Bean
    public JmsTemplate jmsTemplate() {
        JmsTemplate jmsTemplate = new JmsTemplate();
        jmsTemplate.setMessageConverter(mailMessageConverter());
        ...
        return jmsTemplate;
    }

    @Bean
    public MailMessageConverter mailMessageConverter() {
        return new MailMessageConverter();
    }
}

When you set a message converter for a JMS template explicitly, it will override the default SimpleMessageConverter. Now, you can call the JMS template’s convertAndSend() and receiveAndConvert() methods to send and receive mail objects.

package com.apress.springrecipes.post;
...
public class FrontDeskImpl extends JmsGatewaySupport implements FrontDesk {

    public void sendMail(Mail mail) {
        getJmsTemplate().convertAndSend(mail);
    }
}

package com.apress.springrecipes.post;
...
public class BackOfficeImpl extends JmsGatewaySupport implements BackOffice {

    public Mail receiveMail() {
        return (Mail) getJmsTemplate().receiveAndConvert();
    }
}

14-3. Manage JMS Transactions

Problem

You want to participate in transactions with JMS so that the receipt and sending of messages are transactional.

Solution

You can use the same transaction strategy as you would for any Spring component. Leverage Spring’s PlatformTransactionManager implementations as needed and wire the behavior into beans.

How It Works

When producing or consuming multiple JMS messages in a single method, if an error occurs in the middle, the JMS messages produced or consumed at the destination may be left in an inconsistent state. You have to surround the method with a transaction to avoid this problem.
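The all-or-nothing behavior that a transaction provides can be pictured with a small plain-Java sketch. It is an illustration of the idea only: TransactedSession and sendAll() are hypothetical names, the destination is an in-memory list, and a null message is used as an artificial failure. Sends are buffered and reach the destination only on commit; a failure discards everything buffered so far.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration of transacted sending; not JMS or Spring code.
class TransactedSendSketch {

    // Buffers sends and makes them visible at the destination only on commit.
    static class TransactedSession {
        private final List<String> destination;
        private final List<String> pending = new ArrayList<>();

        TransactedSession(List<String> destination) {
            this.destination = destination;
        }

        void send(String message) {
            pending.add(message);
        }

        void commit() {
            destination.addAll(pending);
            pending.clear();
        }

        void rollback() {
            pending.clear();
        }
    }

    // Sends all messages or none: any failure rolls back what was buffered.
    static void sendAll(List<String> destination, List<String> messages) {
        TransactedSession session = new TransactedSession(destination);
        try {
            for (String message : messages) {
                if (message == null) {
                    throw new IllegalArgumentException("message must not be null");
                }
                session.send(message);
            }
            session.commit();
        } catch (RuntimeException e) {
            session.rollback();
            throw e;
        }
    }

    public static void main(String[] args) {
        List<String> mailQueue = new ArrayList<>();
        try {
            sendAll(mailQueue, Arrays.asList("mail 1", null, "mail 3"));
        } catch (IllegalArgumentException e) {
            System.out.println("rolled back, queue holds: " + mailQueue);
        }
        sendAll(mailQueue, Arrays.asList("mail 1", "mail 2"));
        System.out.println("committed, queue holds: " + mailQueue);
    }
}
```

Without the buffering, the first call would have left "mail 1" at the destination even though the method as a whole failed, which is exactly the inconsistent state described above.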

In Spring, JMS transaction management is consistent with other data access strategies. For example, you can annotate the methods that require transaction management with the @Transactional annotation.

package com.apress.springrecipes.post;

import org.springframework.jms.core.support.JmsGatewaySupport;
import org.springframework.transaction.annotation.Transactional;
...
public class FrontDeskImpl extends JmsGatewaySupport implements FrontDesk {

    @Transactional
    public void sendMail(Mail mail) {
        ...
    }
}

package com.apress.springrecipes.post;

import org.springframework.jms.core.support.JmsGatewaySupport;
import org.springframework.transaction.annotation.Transactional;
...
public class BackOfficeImpl extends JmsGatewaySupport implements BackOffice {

    @Transactional
    public Mail receiveMail() {
        ...
    }
}

Then, in both Java configuration classes, add the @EnableTransactionManagement annotation and declare a transaction manager. The corresponding transaction manager for local JMS transactions is JmsTransactionManager, which requires a reference to the JMS connection factory.

package com.apress.springrecipes.post.config;
...
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;

import javax.jms.ConnectionFactory;

@Configuration
@EnableTransactionManagement
public class BackOfficeConfiguration {

    @Bean
    public ConnectionFactory connectionFactory() { ... }

    @Bean
    public PlatformTransactionManager transactionManager() {
        return new JmsTransactionManager(connectionFactory());
    }
}

If you require transaction management across multiple resources, such as a data source and an ORM resource factory, or if you need distributed transaction management, you have to configure JTA transactions in your app server and use JtaTransactionManager. Note that for multiple-resource transaction support, the JMS connection factory must be XA compliant (i.e., it must support distributed transactions).

14-4. Create Message-Driven POJOs in Spring

Problem

When you call the receive() method on a JMS message consumer to receive a message, the calling thread is blocked until a message is available. The thread can do nothing but wait. This type of message reception is called synchronous reception because an application must wait for the message to arrive before it can finish its work. You can create a message-driven POJO (MDP) to support the asynchronous reception of JMS messages. Unlike an EJB message-driven bean, which is decorated with the @MessageDriven annotation, an MDP is an ordinary bean that needs no EJB container.

Note

A message-driven POJO or MDP in the context of this recipe refers to a POJO that can listen for JMS messages without any particular runtime requirements. It does not refer to message-driven beans (MDBs) aligned to the EJB specification that require an EJB container.

Solution

Spring allows beans declared in its IoC container to listen for JMS messages in the same way as MDBs, which are based on the EJB spec. Because Spring adds message-listening capabilities to POJOs, they are called message-driven POJOs (MDPs).

How It Works

Suppose you want to add an electronic board to the post office’s back office to display mail information in real time as it arrives from the front desk. As the front desk sends a JMS message along with mail, the back-office subsystem can listen for these messages and display them on the electronic board. For better system performance, you should apply the asynchronous JMS reception approach to avoid blocking the thread that receives these JMS messages.
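The asynchronous approach boils down to moving the blocking wait onto a dedicated thread and handing each message to a callback. The following plain-Java sketch shows that shape with a java.util.concurrent.BlockingQueue standing in for the JMS destination; the listen() helper is a hypothetical name for this illustration and is not how Spring implements its listener support.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Plain-Java illustration of asynchronous reception; not JMS or Spring code.
class AsyncListenerSketch {

    // Starts a daemon thread that takes messages from the queue and passes
    // each one to the listener, so the caller's own thread is not blocked.
    static Thread listen(BlockingQueue<String> destination, Consumer<String> listener) {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    listener.accept(destination.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop when interrupted
            }
        });
        worker.setDaemon(true);
        worker.start();
        return worker;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> mailQueue = new LinkedBlockingQueue<>();
        CountDownLatch displayed = new CountDownLatch(1);

        // Register the callback first; this call returns immediately.
        Thread worker = listen(mailQueue, mailId -> {
            System.out.println("Mail #" + mailId + " received");
            displayed.countDown();
        });

        // The main thread stays free to do other work, such as sending.
        mailQueue.add("1234");
        displayed.await(2, TimeUnit.SECONDS);
        worker.interrupt();
    }
}
```

The thread that registers the listener never waits for a message; only the worker does. That is the role a message listener container plays for your listener.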

Listen for JMS Messages with Message Listeners

First, you create a message listener to listen for JMS messages. The message listener is an alternative to the approach taken in BackOfficeImpl in previous recipes, which used JmsTemplate to consume messages from the broker. For example, the following MailListener listens for JMS messages that contain mail information:

package com.apress.springrecipes.post;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;

import org.springframework.jms.support.JmsUtils;

public class MailListener implements MessageListener {

    public void onMessage(Message message) {
        MapMessage mapMessage = (MapMessage) message;
        try {
            Mail mail = new Mail();
            mail.setMailId(mapMessage.getString("mailId"));
            mail.setCountry(mapMessage.getString("country"));
            mail.setWeight(mapMessage.getDouble("weight"));
            displayMail(mail);
        } catch (JMSException e) {
            throw JmsUtils.convertJmsAccessException(e);
        }
    }

    private void displayMail(Mail mail) {
        System.out.println("Mail #" + mail.getMailId() + " received");
    }
}

A message listener must implement the javax.jms.MessageListener interface. When a JMS message arrives, the onMessage() method will be called with the message as the method argument. In this sample, you simply display the mail information to the console. Note that when extracting message information from a MapMessage object, you need to handle the JMS API’s JMSException. You can make a call to JmsUtils.convertJmsAccessException() to convert it into Spring’s runtime exception JmsException.

Next, you have to configure this listener in the back office’s configuration. Declaring this listener alone is not enough to listen for JMS messages. You need a message listener container to monitor JMS messages at a message destination and trigger your message listener on message arrival.

package com.apress.springrecipes.post.config;

import com.apress.springrecipes.post.MailListener;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.listener.SimpleMessageListenerContainer;

import javax.jms.ConnectionFactory;

@Configuration
public class BackOfficeConfiguration {

    @Bean
    public ConnectionFactory connectionFactory() { ... }

    @Bean
    public MailListener mailListener() {
        return new MailListener();
    }

    // Declaring the concrete container type (rather than Object) exposes the bean's real type to Spring.
    @Bean
    public SimpleMessageListenerContainer container() {
        SimpleMessageListenerContainer smlc = new SimpleMessageListenerContainer();
        smlc.setConnectionFactory(connectionFactory());
        smlc.setDestinationName("mail.queue");
        smlc.setMessageListener(mailListener());
        return smlc;
    }
}

Spring provides several types of message listener containers for you to choose from in the org.springframework.jms.listener package, of which SimpleMessageListenerContainer and DefaultMessageListenerContainer are the most commonly used. SimpleMessageListenerContainer is the simplest one; it doesn’t support participation in externally managed transactions. If you have such a transaction requirement in receiving messages, you have to use DefaultMessageListenerContainer.

Now, you can start the message listener. Since you won’t need to invoke a bean to trigger message consumption—the listener will do it for you—the following main class, which only starts the Spring IoC container, is enough:

package com.apress.springrecipes.post;

import com.apress.springrecipes.post.config.BackOfficeConfiguration;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class BackOfficeMain {

    public static void main(String[] args) {
        new AnnotationConfigApplicationContext(BackOfficeConfiguration.class);
    }
}

When you start this back-office application, it will listen for messages on the message broker (i.e., ActiveMQ). As soon as the front-desk application sends a message to the broker, the back-office application will react and display the message to the console.
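To make the role of the listener container more concrete, the following is a toy stand-in written with java.util.concurrent only. It is not how Spring implements its containers; the class name ToyListenerContainer and the use of a BlockingQueue as the "destination" are assumptions made purely for illustration. The point it shows is that the container owns the thread that waits for messages, so the code that sends a message never blocks on reception.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Toy stand-in for a message listener container; not Spring's implementation. */
class ToyListenerContainer implements AutoCloseable {

    private final Thread worker;

    ToyListenerContainer(BlockingQueue<String> destination, Consumer<String> listener) {
        // The container, not the application code, owns the thread that waits for messages.
        worker = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    listener.accept(destination.take()); // blocks only this worker thread
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // shutdown was requested
            }
        }, "toy-listener");
        worker.setDaemon(true);
        worker.start();
    }

    @Override
    public void close() {
        worker.interrupt();
    }

    /** Sends the given messages and returns what the listener received. */
    static List<String> demo(String... messages) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(messages.length);
        try (ToyListenerContainer container = new ToyListenerContainer(queue, message -> {
            received.add(message);
            done.countDown();
        })) {
            for (String message : messages) {
                queue.add(message); // unbounded queue, so the sender never waits
            }
            done.await(2, TimeUnit.SECONDS); // wait only so the demo can report the result
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo("Mail #1234", "Mail #5678"));
    }
}
```

Spring's real containers add what this sketch leaves out, such as connection handling, error recovery, and transaction support.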

Listen for JMS Messages with POJOs

A listener that implements the MessageListener interface can listen for messages, and so can an arbitrary bean declared in the Spring IoC container. Doing so means that beans are decoupled from the Spring Framework interfaces as well as the JMS MessageListener interface. For a method of this bean to be triggered on message arrival, it must accept one of the following types as its sole method argument:

  • Raw JMS message type: For TextMessage, MapMessage, BytesMessage, and ObjectMessage

  • String: For TextMessage only

  • Map: For MapMessage only

  • byte[]: For BytesMessage only

  • Serializable: For ObjectMessage only

For example, to listen for MapMessage, you declare a method that accepts a Map as its argument and annotate it with @JmsListener. This class no longer needs to implement the MessageListener interface.

package com.apress.springrecipes.post;

import org.springframework.jms.annotation.JmsListener;

import java.util.Map;

public class MailListener {

    @JmsListener(destination = "mail.queue")
    public void displayMail(Map map) {
        Mail mail = new Mail();
        mail.setMailId((String) map.get("mailId"));
        mail.setCountry((String) map.get("country"));
        mail.setWeight((Double) map.get("weight"));
        System.out.println("Mail #" + mail.getMailId() + " received");
    }
}
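The casts in displayMail assume that every entry is present and has exactly the expected type. If a sender might omit a key or send the weight as an Integer, a small helper can fail with a clearer message. The following is only a sketch of that idea in plain Java; the class MailPayloads and its method names are hypothetical and are not part of Spring or of the recipe's code.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper for unpacking a Map payload; names are illustrative only. */
final class MailPayloads {

    private MailPayloads() {
    }

    /** Returns the entry as a String, or fails with a message that names the key. */
    static String requireString(Map<String, ?> payload, String key) {
        Object value = payload.get(key);
        if (!(value instanceof String)) {
            throw new IllegalArgumentException(
                "Expected a String for '" + key + "' but got: " + value);
        }
        return (String) value;
    }

    /** Accepts any Number, so an Integer or Float weight does not cause a ClassCastException. */
    static double requireDouble(Map<String, ?> payload, String key) {
        Object value = payload.get(key);
        if (!(value instanceof Number)) {
            throw new IllegalArgumentException(
                "Expected a number for '" + key + "' but got: " + value);
        }
        return ((Number) value).doubleValue();
    }

    public static void main(String[] args) {
        Map<String, Object> payload = new HashMap<>();
        payload.put("mailId", "1234");
        payload.put("country", "US");
        payload.put("weight", 1.5);
        System.out.println("Mail #" + requireString(payload, "mailId")
            + " weighs " + requireDouble(payload, "weight"));
    }
}
```

Inside the listener method, calls such as mail.setWeight(MailPayloads.requireDouble(map, "weight")) could then replace the direct casts.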

To detect the @JmsListener annotations, you need to put the @EnableJms annotation on the configuration class, and you need to register a JmsListenerContainerFactory, which by default is detected with the name jmsListenerContainerFactory.

A POJO is registered to a listener container through a JmsListenerContainerFactory. This factory creates and configures a MessageListenerContainer and registers the annotated method as a message listener to it. You could implement your own version of the JmsListenerContainerFactory, but it is generally enough to use one of the provided classes. SimpleJmsListenerContainerFactory creates an instance of the SimpleMessageListenerContainer, whereas DefaultJmsListenerContainerFactory creates a DefaultMessageListenerContainer.

For now you will use a SimpleJmsListenerContainerFactory. If the need arises, you can quite easily switch to the DefaultJmsListenerContainerFactory, for instance, when transactions or async processing with a TaskExecutor is needed.

package com.apress.springrecipes.post.config;

import com.apress.springrecipes.post.MailListener;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.SimpleJmsListenerContainerFactory;

import javax.jms.ConnectionFactory;

@Configuration
@EnableJms
public class BackOfficeConfiguration {

    @Bean
    public ConnectionFactory connectionFactory() {
        return new ActiveMQConnectionFactory("tcp://localhost:61616");
    }

    @Bean
    public MailListener mailListener() {
        return new MailListener();
    }

    @Bean
    public SimpleJmsListenerContainerFactory jmsListenerContainerFactory() {
        SimpleJmsListenerContainerFactory listenerContainerFactory = new SimpleJmsListenerContainerFactory();
        listenerContainerFactory.setConnectionFactory(connectionFactory());
        return listenerContainerFactory;
    }
}

Convert JMS Messages

You can also create a message converter that converts JMS messages containing mail information into Mail objects. Because message listeners receive messages only, the method toMessage() will not be called, so you can simply return null for it. However, if you use this message converter for sending messages too, you have to implement this method. The following example reprints the MailMessageConverter class written earlier:

package com.apress.springrecipes.post;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.jms.support.converter.MessageConverter;

public class MailMessageConverter implements MessageConverter {

    public Object fromMessage(Message message) throws JMSException,
        MessageConversionException {
        MapMessage mapMessage = (MapMessage) message;
        Mail mail = new Mail();
        mail.setMailId(mapMessage.getString("mailId"));
        mail.setCountry(mapMessage.getString("country"));
        mail.setWeight(mapMessage.getDouble("weight"));
        return mail;
    }

    public Message toMessage(Object object, Session session) throws JMSException,
        MessageConversionException {
        ...
    }
}

A message converter should be applied to the listener container factory for it to convert messages into objects before calling your POJO’s methods.

package com.apress.springrecipes.post.config;

import com.apress.springrecipes.post.MailListener;
import com.apress.springrecipes.post.MailMessageConverter;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.SimpleJmsListenerContainerFactory;

import javax.jms.ConnectionFactory;

@Configuration
@EnableJms
public class BackOfficeConfiguration {

    @Bean
    public ConnectionFactory connectionFactory() {
        return new ActiveMQConnectionFactory("tcp://localhost:61616");
    }

    @Bean
    public MailListener mailListener() {
        return new MailListener();
    }

    @Bean
    public MailMessageConverter mailMessageConverter() {
        return new MailMessageConverter();
    }

    @Bean
    public SimpleJmsListenerContainerFactory jmsListenerContainerFactory() {
        SimpleJmsListenerContainerFactory listenerContainerFactory = new SimpleJmsListenerContainerFactory();
        listenerContainerFactory.setConnectionFactory(connectionFactory());
        listenerContainerFactory.setMessageConverter(mailMessageConverter());
        return listenerContainerFactory;
    }
}

With this message converter, the listener method of your POJO can accept a mail object as the method argument.

package com.apress.springrecipes.post;

import org.springframework.jms.annotation.JmsListener;

public class MailListener {

    @JmsListener(destination = "mail.queue")
    public void displayMail(Mail mail) {
        System.out.println("Mail #" + mail.getMailId() + " received");
    }
}

Manage JMS Transactions

As mentioned, SimpleMessageListenerContainer doesn’t take part in externally managed transactions. So, if you need transaction management for your message listener method, you should use DefaultMessageListenerContainer instead. For local JMS transactions, you can simply enable its sessionTransacted property, and your listener method will run within a local JMS transaction (as opposed to XA transactions). To use a DefaultMessageListenerContainer, change the SimpleJmsListenerContainerFactory to a DefaultJmsListenerContainerFactory and configure said sessionTransacted property.

@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
    DefaultJmsListenerContainerFactory listenerContainerFactory = new DefaultJmsListenerContainerFactory();
    listenerContainerFactory.setConnectionFactory(cachingConnectionFactory());
    listenerContainerFactory.setMessageConverter(mailMessageConverter());
    listenerContainerFactory.setSessionTransacted(true);
    return listenerContainerFactory;
}

However, if you want your listener to participate in a JTA transaction, you need to declare a JtaTransactionManager instance and inject it into your listener container factory.
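A minimal sketch of such a configuration might look as follows. It rests on two assumptions that are not part of the recipe's code: the application runs in an environment that exposes a JTA UserTransaction through JNDI (which is where JtaTransactionManager looks by default), and an XA-capable ConnectionFactory bean is defined elsewhere. The class name JtaBackOfficeConfiguration is made up for this illustration.

```java
package com.apress.springrecipes.post.config;

import javax.jms.ConnectionFactory;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.jta.JtaTransactionManager;

@Configuration
@EnableJms
public class JtaBackOfficeConfiguration {

    // Assumption: the JTA UserTransaction is available through JNDI in this environment.
    @Bean
    public PlatformTransactionManager transactionManager() {
        return new JtaTransactionManager();
    }

    // Assumption: the injected ConnectionFactory is XA-capable and declared elsewhere.
    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(
            ConnectionFactory xaConnectionFactory,
            PlatformTransactionManager transactionManager) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(xaConnectionFactory);
        // Each message reception now runs inside a transaction driven by the JTA manager.
        factory.setTransactionManager(transactionManager);
        return factory;
    }
}
```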

14-5. Cache and Pool JMS Connections

Problem

Throughout this chapter, for the sake of simplicity, you’ve explored Spring’s JMS support with a simple instance of org.apache.activemq.ActiveMQConnectionFactory as the connection factory. This isn’t the best choice in practice. As with all things, there are performance considerations.

The crux of the issue is that JmsTemplate closes sessions and consumers on each invocation. This means that it tears down all those objects and frees the memory. This is “safe,” but not performant, as some of the objects created, such as consumers, are meant to be long lived. This behavior stems from the use of JmsTemplate in application server environments, where typically the application server’s connection factory is used, and it, internally, provides connection pooling. In this environment, closing the objects simply returns them to the pool, which is the desirable behavior.

Solution

There’s no “one-size-fits-all” solution to this. You need to weigh the qualities you’re looking for and react appropriately.

How It Works

Generally, you want a connection factory that provides pooling and caching of some sort when publishing messages using JmsTemplate. The first place to look for a pooled connection factory might be your application server. It may very well provide one by default.

In the examples in this chapter, you use ActiveMQ in a stand-alone configuration. ActiveMQ, like many vendors, provides a pooled connection factory class alternative. ActiveMQ provides two, actually: one for consuming messages with a JCA connector and another one for use outside of a JCA container. You can use these instead to handle caching producers and sessions when sending messages. The following configuration pools a connection factory in a stand-alone configuration. It’s a drop-in replacement for the previous examples when publishing messages.

@Bean(destroyMethod = "stop")
public ConnectionFactory connectionFactory() {
    ActiveMQConnectionFactory connectionFactoryToUse =
        new ActiveMQConnectionFactory("tcp://localhost:61616");
    PooledConnectionFactory connectionFactory = new PooledConnectionFactory();
    connectionFactory.setConnectionFactory(connectionFactoryToUse);
    return connectionFactory;
}

If you are receiving messages, there is still room for more efficiency because JmsTemplate constructs a new MessageConsumer each time as well. In this situation, you have a few alternatives: use one of Spring’s *MessageListenerContainer implementations (MDPs), because they cache consumers correctly, or use one of Spring’s ConnectionFactory implementations. The first implementation, org.springframework.jms.connection.SingleConnectionFactory, returns the same underlying JMS connection each time (which is thread-safe according to the JMS API) and ignores calls to the close() method.

Generally, this implementation works well with the JMS API. A newer alternative is org.springframework.jms.connection.CachingConnectionFactory. First, the obvious advantage is that it provides the ability to cache multiple instances. Second, it caches sessions, message producers, and message consumers. Finally, it works regardless of your JMS connection factory implementation.

@Bean
public ConnectionFactory cachingConnectionFactory() {
    return new CachingConnectionFactory(connectionFactory());
}
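If the default cache settings are not enough, CachingConnectionFactory can be tuned. The sketch below raises the session cache size and hands the caching factory to a JmsTemplate. The value 10 and the class name CachingConfiguration are illustrative assumptions, not recommendations from this recipe; measure with your own workload before settling on a number.

```java
package com.apress.springrecipes.post.config;

import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;

@Configuration
public class CachingConfiguration {

    @Bean
    public CachingConnectionFactory cachingConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(
            new ActiveMQConnectionFactory("tcp://localhost:61616"));
        // Illustrative value: allow up to 10 cached sessions instead of the default.
        connectionFactory.setSessionCacheSize(10);
        return connectionFactory;
    }

    // The template closes sessions after each call; with the caching factory
    // those sessions are kept for reuse instead of being torn down.
    @Bean
    public JmsTemplate jmsTemplate(ConnectionFactory cachingConnectionFactory) {
        return new JmsTemplate(cachingConnectionFactory);
    }
}
```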

14-6. Send and Receive AMQP Messages with Spring

Problem

You want to use RabbitMQ to send and receive messages.

Solution

The Spring AMQP project provides easy access to the AMQP protocol. It has support similar to that of Spring JMS. It comes with a RabbitTemplate, which provides basic send and receive options; it also comes with a MessageListenerContainer option that mimics Spring JMS.

How It Works

Let’s look at how you can send a message using RabbitTemplate. To get access to RabbitTemplate, it is simplest to extend RabbitGatewaySupport. You’ll use FrontDeskImpl, which uses RabbitTemplate.

Send and Receive Message Without Spring’s Template Support

Let’s look at how to send and receive messages without Spring’s template support. The following FrontDeskImpl class sends a message to RabbitMQ using the plain API:

package com.apress.springrecipes.post;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class FrontDeskImpl implements FrontDesk {

    private static final String QUEUE_NAME = "mail.queue";

    public void sendMail(final Mail mail) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setPort(5672);


        Connection connection = null;
        Channel channel = null;
        try {


            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            String message = new ObjectMapper().writeValueAsString(mail);
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));


        } catch (IOException | TimeoutException e) {
            throw new RuntimeException(e);
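        } finally {
            if (channel != null) {
                try {
                    channel.close();
                } catch (IOException | TimeoutException e) {
                    // Ignored: nothing useful can be done if closing the channel fails.
                }
            }

            if (connection != null) {
                try {
                    connection.close();
                } catch (IOException e) {
                    // Ignored: nothing useful can be done if closing the connection fails.
                }
            }
        }
    }
}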

First you create a ConnectionFactory to obtain a connection to RabbitMQ; here we configured it for localhost and provided a username/password combination. Next you need to obtain a Channel to finally create a queue. Then the passed-in Mail message is converted to JSON using a Jackson ObjectMapper and finally sent to the queue. When creating connections and sending messages, you need to take care of the different exceptions that can occur, and after sending, you need to properly close and release Connection again, which also can throw an exception.

Before you can send and receive AMQP messages, you need to install an AMQP message broker.

Note

In the bin directory is a rabbitmq.sh file, which downloads and starts a RabbitMQ broker in a Docker container.

The following BackOfficeImpl class receives messages using the plain RabbitMQ API:

package com.apress.springrecipes.post;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.*;
import org.springframework.stereotype.Service;


import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.concurrent.TimeoutException;


@Service
public class BackOfficeImpl implements BackOffice {


    private static final String QUEUE_NAME = "mail.queue";

    private MailListener mailListener = new MailListener();
    private Connection connection;


    @Override
    public Mail receiveMail() {


        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setPort(5672);


        Channel channel = null;
        try {


            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);


            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                    Mail mail = new ObjectMapper().readValue(body, Mail.class);
                    mailListener.displayMail(mail);
                }
            };
            channel.basicConsume(QUEUE_NAME, true, consumer);


        } catch (IOException | TimeoutException e) {
            throw new RuntimeException(e);
        }


        return null;
    }


    @PreDestroy
    public void destroy() {
        if (this.connection != null) {
            try {
                this.connection.close();
            } catch (IOException e) {
            }
        }
    }
}

This code is largely the same as FrontDeskImpl except that you now register a Consumer object to retrieve the messages. In this consumer, you use Jackson to map the message to the Mail object again and pass it to MailListener, which in turn prints the converted message to the console. When using a channel, you can add a consumer that will be invoked when a message is received. The consumer will be ready as soon as it is registered with the channel using the basicConsume method.

If you already have the FrontDeskImpl running, you will see the messages coming in quite quickly.

Send Messages with Spring’s Template Support

The FrontDeskImpl class extends RabbitGatewaySupport, which configures a RabbitTemplate based on the configuration you pass in. To send a message, you call the getRabbitOperations method to obtain the template and then call its convertAndSend method. This method first uses a MessageConverter to convert the message into JSON and then sends it to the queue you have configured.

package com.apress.springrecipes.post;

import org.springframework.amqp.rabbit.core.RabbitGatewaySupport;

public class FrontDeskImpl extends RabbitGatewaySupport implements FrontDesk {

    public void sendMail(final Mail mail) {
        getRabbitOperations().convertAndSend(mail);
    }
}

Let’s take a look at the configuration:

package com.apress.springrecipes.post.config;

import com.apress.springrecipes.post.FrontDeskImpl;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class FrontOfficeConfiguration {


    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("127.0.0.1");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setPort(5672);
        return connectionFactory;
    }


    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory());
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        rabbitTemplate.setRoutingKey("mail.queue");
        return rabbitTemplate;
    }


    @Bean
    public FrontDeskImpl frontDesk() {
        FrontDeskImpl frontDesk = new FrontDeskImpl();
        frontDesk.setRabbitOperations(rabbitTemplate());
        return frontDesk;
    }
}

The configuration is quite similar to the JMS configuration. You need a ConnectionFactory to connect to your RabbitMQ broker. You use a CachingConnectionFactory so that you can reuse your connections. Next there is the RabbitTemplate that uses the connection and has a MessageConverter to convert the message. The message is being converted into JSON using the Jackson2 library, which is the reason for the configuration of the Jackson2JsonMessageConverter. Finally, RabbitTemplate is passed into the FrontDeskImpl class so that it is available for usage.

package com.apress.springrecipes.post;

import com.apress.springrecipes.post.config.FrontOfficeConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;


public class FrontDeskMain {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context =
            new AnnotationConfigApplicationContext(FrontOfficeConfiguration.class);


        FrontDesk frontDesk = context.getBean(FrontDesk.class);
        frontDesk.sendMail(new Mail("1234", "US", 1.5));


        System.in.read();

        context.close();
    }
}
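The plain-API example declared the queue explicitly with queueDeclare, whereas the template-based configuration assumes that mail.queue already exists on the broker. If that may not be the case, one option is to let Spring AMQP declare it. The following is a sketch under that assumption; the class name QueueDeclarationConfiguration is hypothetical, and the ConnectionFactory is expected to be the bean defined in FrontOfficeConfiguration.

```java
package com.apress.springrecipes.post.config;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueDeclarationConfiguration {

    // RabbitAdmin declares the Queue beans found in the application context
    // when a connection to the broker is opened.
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    // Durable, non-exclusive, non-auto-delete: the same settings as
    // queueDeclare(QUEUE_NAME, true, false, false, null) in the plain-API example.
    @Bean
    public Queue mailQueue() {
        return new Queue("mail.queue", true);
    }
}
```

Registering this class next to FrontOfficeConfiguration when creating the AnnotationConfigApplicationContext would be enough to pick it up.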

Listen for AMQP Messages with Message Listeners

Spring AMQP supports MessageListenerContainers for retrieving messages in the same way as Spring JMS does for JMS. Spring AMQP has the @RabbitListener annotation to indicate an AMQP-based message listener. Let’s take a look at the MessageListener that is used.

package com.apress.springrecipes.post;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

public class MailListener {

    @RabbitListener(queues = "mail.queue")
    public void displayMail(Mail mail) {
        System.out.println("Received: " + mail);
    }
}

MailListener is almost the same as the one created in recipe 14-4 for receiving JMS messages; it uses @RabbitListener instead of @JmsListener. The main difference is in the configuration.

package com.apress.springrecipes.post.config;

import com.apress.springrecipes.post.MailListener;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
@EnableRabbit
public class BackOfficeConfiguration {


    @Bean
    public RabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
        containerFactory.setConnectionFactory(connectionFactory());
        containerFactory.setMessageConverter(new Jackson2JsonMessageConverter());
        return containerFactory;
    }


    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("127.0.0.1");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setPort(5672);
        return connectionFactory;
    }


    @Bean
    public MailListener mailListener() {
        return new MailListener();
    }


}

To enable AMQP annotation-based listeners, you add the @EnableRabbit annotation to the configuration class. As each listener requires a MessageListenerContainer, you need to configure a RabbitListenerContainerFactory, which takes care of creating those containers. By default, the @EnableRabbit logic looks for a bean named rabbitListenerContainerFactory.

RabbitListenerContainerFactory needs a ConnectionFactory. For this you are using CachingConnectionFactory. Before the MailListener.displayMail method is invoked by MessageListenerContainer, the message payload, in JSON, needs to be converted into a Mail object using Jackson2JsonMessageConverter.

To listen to messages, create a class with a main method that only needs to construct the application context.

package com.apress.springrecipes.post;

import com.apress.springrecipes.post.config.BackOfficeConfiguration;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;


public class BackOfficeMain {

    public static void main(String[] args) {
        new AnnotationConfigApplicationContext(BackOfficeConfiguration.class);
    }
}

14-7. Send and Receive Messages with Spring Kafka

Problem

You want to use Apache Kafka to send and receive messages.

Solution

The Spring Kafka project provides easy access to Apache Kafka. It has support similar to that of Spring JMS using the Spring Messaging abstraction. It comes with KafkaTemplate, which provides basic send options; it also comes with a MessageListenerContainer option that mimics Spring JMS and can be enabled by @EnableKafka.

How It Works

First you will see how to set up the KafkaTemplate to send messages and how to listen to messages using a KafkaListener. Finally you will look at how to convert objects into message payloads using MessageConverters.

Send Messages with Spring’s Template Support

Let’s start by rewriting the FrontDeskImpl class to use KafkaTemplate to send a message. To do so, you actually want an object that implements KafkaOperations, which is the interface implemented by KafkaTemplate.

package com.apress.springrecipes.post;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;


public class FrontDeskImpl implements FrontDesk {

    private final KafkaOperations<Integer, String> kafkaOperations;

    public FrontDeskImpl(KafkaOperations<Integer, String> kafkaOperations) {
        this.kafkaOperations = kafkaOperations;
    }


    public void sendMail(final Mail mail) {

        ListenableFuture<SendResult<Integer, String>> future =
            kafkaOperations.send("mails", convertToJson(mail));
        future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {


            @Override
            public void onFailure(Throwable ex) {
                ex.printStackTrace();
            }


            @Override
            public void onSuccess(SendResult<Integer, String> result) {
                System.out.println("Result (success): " + result.getRecordMetadata());
            }
        });
    }


    private String convertToJson(Mail mail) {
        try {
            return new ObjectMapper().writeValueAsString(mail);
        } catch (JsonProcessingException e) {
            throw new IllegalArgumentException(e);
        }
    }
}

Notice the kafkaOperations field, which takes KafkaOperations<Integer, String>. This means the record key is of type Integer and the record value is of type String. The send call in this example passes only a topic and a value, so the record is sent without a key. Because the value is a String, you need to convert the incoming Mail instance to a String. This is taken care of by the convertToJson method using a Jackson2 ObjectMapper. The message will be sent to the mails topic, which is the first argument in the send method; the second one is the payload to send (the converted Mail message).

Sending a message using Kafka is generally an async operation, and the KafkaOperations.send methods reflect this in returning a ListenableFuture. It is a normal Future, so you could use the call to get() to make it a blocking operation or register ListenableFutureCallback to get notified of the success or failure of the operation.
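The two styles, blocking on the future or registering a callback, can be compared in a small stand-alone sketch. Note the swap: because ListenableFuture and KafkaOperations need Spring and a broker, this example uses the JDK's CompletableFuture as a stand-in, and fakeSend is a hypothetical method that merely pretends to send and completes with a made-up metadata string.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

/** Stand-in demo: CompletableFuture plays the role of the future returned by send(). */
class SendResultHandling {

    /** Hypothetical async send; completes with a fake metadata string and ignores the payload. */
    static CompletableFuture<String> fakeSend(String topic, String payload) {
        return CompletableFuture.supplyAsync(() -> topic + "-0@0");
    }

    /** Blocking style: wait, with a timeout, until the outcome of the send is known. */
    static String sendAndWait(String topic, String payload) {
        try {
            return fakeSend(topic, payload).get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the send", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("Send failed", e);
        }
    }

    /** Callback style: return immediately and react once the outcome is known. */
    static CompletableFuture<String> sendWithCallback(String topic, String payload,
                                                      Consumer<String> onSuccess) {
        return fakeSend(topic, payload).whenComplete((metadata, failure) -> {
            if (failure != null) {
                failure.printStackTrace();
            } else {
                onSuccess.accept(metadata);
            }
        });
    }

    public static void main(String[] args) {
        System.out.println("Blocking result: " + sendAndWait("mails", "{}"));
        // join() is used here only so the demo does not exit before the callback has run.
        sendWithCallback("mails", "{}", m -> System.out.println("Callback result: " + m)).join();
    }
}
```

The blocking style is simpler to reason about but ties up the calling thread; the callback style keeps the caller free, which is why FrontDeskImpl registers a ListenableFutureCallback.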

Next you need to create a configuration class to configure KafkaTemplate to use in FrontDeskImpl.

package com.apress.springrecipes.post.config;

import com.apress.springrecipes.post.FrontDeskImpl;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;


import java.util.HashMap;
import java.util.Map;


@Configuration
public class FrontOfficeConfiguration {


    @Bean
    public KafkaTemplate<Integer, String> kafkaTemplate() {
        KafkaTemplate<Integer, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
        return kafkaTemplate;
    }


    @Bean
    public ProducerFactory<Integer, String> producerFactory() {
        DefaultKafkaProducerFactory<Integer, String> producerFactory = new DefaultKafkaProducerFactory<>(producerFactoryProperties());
        return producerFactory;
    }


    @Bean
    public Map<String, Object> producerFactoryProperties() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return properties;
    }


    @Bean
    public FrontDeskImpl frontDesk() {
        return new FrontDeskImpl(kafkaTemplate());
    }


}

This configuration creates a minimally configured KafkaTemplate. You need to configure the ProducerFactory used by KafkaTemplate; it requires at least the address (host and port) of a broker to connect to and the serializers to use for the message key and value. The address is specified with ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, which can take one or more servers. ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG and ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG configure the key and value serializers, respectively. As you want to use an Integer for the key and a String for the value, they are set to IntegerSerializer and StringSerializer.
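
As a minimal sketch of the multiple-server case, the same properties can be written with the plain string keys behind the ProducerConfig constants (bootstrap.servers, key.serializer, and value.serializer), passing the servers as a comma-separated list and the serializers as class names. The broker host names are placeholders, and the class name ProducerProps is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: producer properties using the string keys behind the ProducerConfig constants.
class ProducerProps {

    static Map<String, Object> producerProperties(String... brokers) {
        Map<String, Object> properties = new HashMap<>();
        // Several servers are passed as one comma-separated list.
        properties.put("bootstrap.servers", String.join(",", brokers));
        // Serializers can be given as fully qualified class names instead of Class objects.
        properties.put("key.serializer",
                "org.apache.kafka.common.serialization.IntegerSerializer");
        properties.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return properties;
    }

    public static void main(String[] args) {
        // Placeholder host names; replace them with the addresses of your own brokers.
        System.out.println(
                producerProperties("broker1.example.com:9092", "broker2.example.com:9092"));
    }
}
```

A map built this way could be passed to DefaultKafkaProducerFactory in place of the one returned by producerFactoryProperties().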

Finally, the constructed KafkaTemplate is passed to FrontDeskImpl. To run the front-desk application, the following Main class is all that is needed:

package com.apress.springrecipes.post;

import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;


import com.apress.springrecipes.post.config.FrontOfficeConfiguration;

public class FrontDeskMain {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context =
            new AnnotationConfigApplicationContext(FrontOfficeConfiguration.class);
        context.registerShutdownHook();


        FrontDesk frontDesk = context.getBean(FrontDesk.class);
        frontDesk.sendMail(new Mail("1234", "US", 1.5));


        System.in.read();

    }
}

This will launch the front-desk application and send a message through Kafka.

Listen to Messages Using Spring Kafka

Spring Kafka also has message listener containers for listening to messages on topics just like Spring JMS and Spring AMQP. To enable the use of these containers, you need to put @EnableKafka on your configuration class and create and configure your Kafka consumer using @KafkaListener.

First let’s create the listener, which is as easy as annotating a method with a single argument with @KafkaListener.

package com.apress.springrecipes.post;

import org.springframework.kafka.annotation.KafkaListener;

public class MailListener {

    @KafkaListener(topics = "mails")
    public void displayMail(String mail) {
        System.out.println("Received: " + mail);
    }
}

For now you are interested in the raw String-based payload as that is what is being sent.

Next you need to configure the listener container.

package com.apress.springrecipes.post.config;

import com.apress.springrecipes.post.MailListener;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;


import java.util.HashMap;
import java.util.Map;


@Configuration
@EnableKafka
public class BackOfficeConfiguration {


    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }


    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfiguration());
    }


    @Bean
    public Map<String, Object> consumerConfiguration() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        return properties;
    }


    @Bean
    public MailListener mailListener() {
        return new MailListener();
    }


}

The configuration is similar to that of the client: you need to pass the address (or addresses) of the Apache Kafka broker to connect to, and as you want to deserialize messages, you need to specify a key and a value deserializer. The address is passed with ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG. The key deserializer is IntegerDeserializer (as the key is an Integer), and as the payload is a String, the value deserializer is StringDeserializer. Finally, you need to set a group ID with ConsumerConfig.GROUP_ID_CONFIG, or you won’t be able to connect to Kafka.

With these properties, you can configure KafkaListenerContainerFactory, which is a factory used to create a Kafka-based MessageListenerContainer. The container is internally used by the functionality enabled by adding the @EnableKafka annotation. For each method annotated with @KafkaListener, a MessageListenerContainer is created.

To run the back-office application, you would need to load this configuration and let it listen:

package com.apress.springrecipes.post;

import com.apress.springrecipes.post.config.BackOfficeConfiguration;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;


public class BackOfficeMain {

    public static void main(String[] args) {
        new AnnotationConfigApplicationContext(BackOfficeConfiguration.class);
    }
}

Now when the front-office application is started, the Mail message will be converted to a String and sent through Kafka to the back office, resulting in the following output:

Received: {"mailId":"1234","country":"US","weight":1.5}

Use a MessageConverter to Convert Payloads into Objects

The listener now receives a String, but it would be nicer if it were automatically converted into a Mail object again. This is quite easily done with some tweaks to the configuration. The KafkaListenerContainerFactory used here accepts a MessageConverter, and to automatically turn a String into the desired object, you can pass it a StringJsonMessageConverter. This takes the String and converts it into the type of the argument of the @KafkaListener-annotated method.

First update the configuration.

package com.apress.springrecipes.post.config;

import org.springframework.kafka.support.converter.StringJsonMessageConverter;

@Configuration
@EnableKafka
public class BackOfficeConfiguration {


    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setMessageConverter(new StringJsonMessageConverter());
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
    ...
}

Next you need to modify the MailListener to use a Mail object instead of the plain String.

package com.apress.springrecipes.post;

import org.springframework.kafka.annotation.KafkaListener;

public class MailListener {

    @KafkaListener(topics = "mails")
    public void displayMail(Mail mail) {
        System.out.println("Mail #" + mail.getMailId() + " received");
    }
}

When running the back office and front office, the message will still be sent and received.
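
StringJsonMessageConverter relies on Jackson, which by default instantiates the target class through a no-argument constructor and populates it through setters. The following is a sketch of a Mail class that would satisfy this, inferred from the constructor call in FrontDeskMain and the JSON output shown earlier; the chapter's actual Mail class may differ.

```java
// Hypothetical sketch of a Jackson-friendly Mail class; the chapter's own class may differ.
class Mail {

    private String mailId;
    private String country;
    private double weight;

    // Jackson's default deserialization creates the object with a no-argument constructor.
    public Mail() {
    }

    // Convenience constructor matching new Mail("1234", "US", 1.5) in FrontDeskMain.
    public Mail(String mailId, String country, double weight) {
        this.mailId = mailId;
        this.country = country;
        this.weight = weight;
    }

    // The getter names determine the JSON property names: mailId, country, and weight.
    public String getMailId() {
        return mailId;
    }

    public void setMailId(String mailId) {
        this.mailId = mailId;
    }

    public String getCountry() {
        return country;
    }

    public void setCountry(String country) {
        this.country = country;
    }

    public double getWeight() {
        return weight;
    }

    public void setWeight(double weight) {
        this.weight = weight;
    }

    public static void main(String[] args) {
        Mail mail = new Mail("1234", "US", 1.5);
        System.out.println("Mail #" + mail.getMailId() + " to " + mail.getCountry());
    }
}
```

In a real project the class would normally be declared public in its own file in the com.apress.springrecipes.post package; it is package-private here only to keep the sketch self-contained.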

Convert Objects to Payloads

In the front office, the Mail instance is manually being converted to a JSON string. Although it’s not hard, it would be nice if the framework could do this transparently. This is possible by configuring JsonSerializer instead of StringSerializer.

package com.apress.springrecipes.post.config;

import com.apress.springrecipes.post.FrontDeskImpl;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;


import java.util.HashMap;
import java.util.Map;

@Configuration
public class FrontOfficeConfiguration {


    @Bean
    public KafkaTemplate<Integer, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }


    @Bean
    public ProducerFactory<Integer, Object> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerFactoryProperties());
    }


    @Bean
    public Map<String, Object> producerFactoryProperties() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return properties;
    }


    @Bean
    public FrontDeskImpl frontDesk() {
        return new FrontDeskImpl(kafkaTemplate());
    }
}

Instead of KafkaTemplate<Integer, String>, you now use KafkaTemplate<Integer, Object> because JsonSerializer lets you send arbitrary objects, which are serialized to JSON before being sent to Kafka.

The FrontDeskImpl class can also be cleaned up now because the conversion to JSON is handled by the JsonSerializer configured for KafkaTemplate.

package com.apress.springrecipes.post;

import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;


public class FrontDeskImpl implements FrontDesk {

    private final KafkaOperations<Integer, Object> kafkaOperations;

    public FrontDeskImpl(KafkaOperations<Integer, Object> kafkaOperations) {
        this.kafkaOperations = kafkaOperations;
    }


    public void sendMail(final Mail mail) {

        ListenableFuture<SendResult<Integer, Object>> future = kafkaOperations.send("mails", mail);
        future.addCallback(new ListenableFutureCallback<SendResult<Integer, Object>>() {


            @Override
            public void onFailure(Throwable ex) {
                ex.printStackTrace();
            }


            @Override
            public void onSuccess(SendResult<Integer, Object> result) {
                System.out.println("Result (success): " + result.getRecordMetadata());
            }
        });
    }
}

Summary

This chapter explored Spring’s messaging support and how to use it to build message-oriented architectures. You learned how to produce and consume messages using different messaging solutions. For each of these solutions, you looked at how to build message-driven POJOs using a MessageListenerContainer.

You looked at JMS and AMQP with ActiveMQ, a reliable open source message queue, and you briefly looked at Apache Kafka.

The next chapter will explore Spring Integration, which is an ESB-like framework for building application integration solutions, similar to Mule ESB and ServiceMix. You will be able to leverage the knowledge gained in this chapter to take your message-oriented applications to new heights with Spring Integration.
