Chapter 21. Spring Batch

Previous chapters discussed JMS and Spring Integration, which provide essential framework infrastructure for very common types of problems in an event-driven architecture (EDA). Another common kind of processing requirement is batch processing, which is both a complement to and sometimes a necessary extension of event-driven processing.

Batch processing has been around for decades. The earliest widespread applications of technology for managing information (information technology) were applications of batch processing. These environments didn't have interactive sessions and usually didn't have the capability to load multiple applications in memory. Computers were expensive and bore no resemblance to today's servers. Typically, machines were multiuser and in use during the day (time-shared). During the evening, however, the machines would sit idle, which was a tremendous waste. Businesses invested in ways to utilize the offline time to do work aggregated through the course of the day. Out of this practice emerged batch processing.

Batch processing solutions typically run offline, indifferent to events in the system. In the past, batch processes ran offline out of necessity. Today, however, most batch processes are run offline because having work done at a predictable time, in predictable chunks, is a requirement for a lot of architectures. A batch processing solution doesn't usually respond to requests, although there's no reason it couldn't be started as a consequence of a message or request. Batch processing solutions tend to be used on large datasets where the duration of the processing is a critical factor in its architecture and implementation. A process might run for minutes, hours, or days! Jobs may have unbounded durations (i.e., run until all work is finished, even if this means running for a few days), or they may be strictly bounded (each row takes roughly the same amount of time regardless of input size, which lets you predict that a given job will finish in a certain time window).

Batch processing has had a long history that informs even modern batch processing solutions.

Mainframe applications used batch processing, and one of the largest modern-day environments for batch processing, CICS, runs on z/OS, which is still fundamentally a mainframe operating system. Customer Information Control System (CICS) is very well suited to a particular type of task: take input, process it, and write it to output. CICS is a transaction server, used most in financial institutions and government, that runs programs in a number of languages (COBOL, C, PL/I, and so on). It can easily support thousands of transactions per second. CICS was one of the first containers, a concept familiar to Spring and Java EE users, even though CICS itself debuted in 1969! A CICS installation is very expensive, and although IBM still sells and installs CICS, many other solutions have come along since then. These solutions are usually specific to a particular environment: COBOL/CICS on mainframes, C on Unix, and, today, Java on any number of environments. The problem is that there's very little standardized infrastructure for dealing with these types of batch processing solutions. Very few people are even aware of what they're missing because there's very little native support on the Java platform for batch processing. Businesses that need a solution typically end up writing it in-house, resulting in fragile, domain-specific code.

The pieces are there, however: transaction support, fast I/O, schedulers such as Quartz, and solid threading support, as well as a very powerful concept of an application container in Java EE and Spring. It was only natural that Dave Syer and his team would come along and build Spring Batch, a batch processing solution for the Spring platform.

It's important to think about the kinds of problems this framework solves before diving into the details. A technology is defined by its solution space. A typical Spring Batch application reads in a lot of data and then writes it back out in a modified form. Decisions about transactional barriers, input size, concurrency, and order of steps in processing are all dimensions of a typical integration.

A common requirement is loading data from a comma-separated value (CSV) file, perhaps as a business-to-business (B2B) transaction, perhaps as an integration technique with an older legacy application. Another common application is nontrivial processing on records in a database. Perhaps the output is an update of the database record itself. Examples might be resizing images on the file system whose metadata is stored in a database, or triggering another process based on some condition.

Note

Fixed-width data is a format of rows and cells, quite like a CSV file. CSV cells are separated by commas or tabs, however, whereas fixed-width data presumes certain lengths for each value: the first value might be the first nine characters, the second value the next four characters after that, and so on.
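
Spring Batch handles this format with its FixedLengthTokenizer. The following minimal sketch (not one of this chapter's examples; the field names and column ranges are invented) shows how such a record might be carved up:

import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.batch.item.file.transform.FixedLengthTokenizer;
import org.springframework.batch.item.file.transform.Range;

public class FixedWidthExample {
    public static void main(String[] args) {
        FixedLengthTokenizer tokenizer = new FixedLengthTokenizer();
        tokenizer.setNames(new String[] { "accountId", "amount" });
        // columns 1 through 9 hold the first value; columns 10 through 13 the second
        tokenizer.setColumns(new Range[] { new Range(1, 9), new Range(10, 13) });

        FieldSet fields = tokenizer.tokenize("ABC1234564200");
        System.out.println(fields.readString("accountId")); // prints ABC123456
        System.out.println(fields.readString("amount"));    // prints 4200
    }
}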

Fixed-width data, which is often used with legacy or embedded systems, is a fine candidate for batch processing. Processing that deals with a resource that's fundamentally nontransactional (e.g., a web service or a file) begs for batch processing, because batch processing provides retry/skip/fail functionality that most web services will not.

It's also important to understand what Spring Batch doesn't do. Spring Batch is a flexible but not all-encompassing solution. Just as Spring doesn't reinvent the wheel when it can be avoided, Spring Batch leaves a few important pieces to the discretion of the implementor. Case in point: Spring Batch provides a generic mechanism by which to launch a job, be it by the command line, a Unix cron, an operating system service, Quartz (discussed in Chapter 18), or in response to an event on an enterprise service bus (for example, the Mule ESB or Spring's own ESB-like solution, Spring Integration, which is discussed in Chapter 20). Another example is the way Spring Batch manages the state of batch processes. Spring Batch requires a durable store. The only useful implementation of a JobRepository (an interface provided by Spring Batch for storing runtime data) requires a database, because a database is transactional and there's no need to reinvent it. Which database you deploy to, however, is largely up to you, although useful defaults are provided, of course.

Runtime Metadata Model

Spring Batch works with a JobRepository, which is the keeper of all the knowledge and metadata for each job (including component parts such as JobInstances, JobExecutions, and StepExecutions). Each job is composed of one or more steps, one after another. With Spring Batch 2.0, a step can conditionally follow another step, allowing for primitive workflows. Steps can also be concurrent: two steps can run at the same time.

When a job is run, it's often coupled with JobParameters to parameterize the behavior of the job itself. For example, a job might take a date parameter to determine which records to process. This coupling is called a JobInstance. A JobInstance is unique because of the JobParameters associated with it. Each run of the same JobInstance (i.e., the same job and JobParameters) creates a JobExecution, a runtime context for one attempt at the job. Ideally, for every JobInstance there'd be only one JobExecution: the JobExecution created the first time the JobInstance ran. However, if there were any errors, the JobInstance should be restarted; the subsequent run creates another JobExecution. For every step in the original job, there is a StepExecution in the JobExecution.

Thus, you can see that Spring Batch has a mirrored object graph, one reflecting the design/build time view of a job, and another reflecting the runtime view of a job. This split between the prototype and the instance is very similar to the way many workflow engines—including jBPM—work.

For example, suppose that a daily report is generated at 2 AM. The parameter to the job would be the date (most likely the previous day's date). The job, in this case, would model a loading step, a summary step, and an output step. Each day the job is run, a new JobInstance and JobExecution would be created. If there are any retries of the same JobInstance, conceivably many JobExecutions would be created.
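
In code, the relationship looks something like the following sketch (the job bean name dailySalesReportJob is invented for this example; jobLauncher is the bean configured later in this chapter):

import java.util.Date;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class DailyReportLauncher {
    public static void main(String[] args) throws Exception {
        ApplicationContext ctx = new ClassPathXmlApplicationContext("batch.xml");
        JobLauncher jobLauncher = (JobLauncher) ctx.getBean("jobLauncher");
        Job job = (Job) ctx.getBean("dailySalesReportJob"); // hypothetical job bean

        // The same job with the same parameters is the same JobInstance.
        JobParameters params = new JobParametersBuilder()
                .addDate("date", new Date())
                .toJobParameters();

        // Each launch of that JobInstance yields a JobExecution. If the first
        // run succeeds, it remains the only one; a failed run can be restarted,
        // creating a second JobExecution for the same JobInstance.
        JobExecution execution = jobLauncher.run(job, params);
        System.out.println(execution.getStatus());
    }
}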

Note

If you want to use Spring Batch, you need to add the appropriate libraries to the classpath. If you are using Maven, add the following dependency to your project:

<dependency>
  <groupId>org.springframework.batch</groupId>
  <artifactId>spring-batch-core</artifactId>
  <version>2.1.1.RELEASE</version>
</dependency>

Setting Up Spring Batch's Infrastructure

Problem

Spring Batch provides a lot of flexibility and guarantees to your application, but it cannot work in a vacuum. To do its work, the JobRepository requires a database. Additionally, there are several collaborators required for Spring Batch to do its work. This configuration is mostly boilerplate.

Solution

In this recipe, you'll set up the Spring Batch database and also create a Spring XML application context that can be imported by subsequent solutions. This configuration is repetitive and largely uninteresting. It will also tell Spring Batch what database to use for the metadata it stores.

How It Works

The JobRepository interface is the first thing that you'll have to deal with when setting up a Spring Batch process. You usually don't deal with it in code, but in Spring configuration it is key to getting everything else working. There's only one really useful implementation of the JobRepository interface, SimpleJobRepository, which stores information about the state of the batch processes in a database. Creation is done through a JobRepositoryFactoryBean. Another standard factory, MapJobRepositoryFactoryBean, is useful mainly for testing because its state is not durable; it's an in-memory implementation. Both factories create an instance of SimpleJobRepository.

Because this JobRepository instance works on your database, you need to set up the schema for Spring Batch to work with. The simplest way to get that schema is to download spring-batch-2.1.1.RELEASE-no-dependencies.zip and look in the sources folder at spring-batch-2.1.1.RELEASE/dist/org/springframework/batch/core. There you'll find a slew of .sql files, each containing the data definition language (DDL, the subset of SQL used for defining and examining the structure of a database) for the required schema for the database of your choice. In these examples, we will use PostgreSQL, so we will use the DDL for PostgreSQL: schema-postgresql.sql. Make sure you configure it and tell Spring Batch about it as in the following configuration:

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans
    xmlns="http://www.springframework.org/schema/batch"
    xmlns:beans="http://www.springframework.org/schema/beans"
    xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:p="http://www.springframework.org/schema/p"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:util="http://www.springframework.org/schema/util"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/batch
        http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
        http://www.springframework.org/schema/aop
        http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
        http://www.springframework.org/schema/tx
        http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-3.0.xsd
        http://www.springframework.org/schema/util
        http://www.springframework.org/schema/util/spring-util-3.0.xsd">

    <context:component-scan base-package="com.apress.springrecipes.springbatch"/>

    <beans:bean
        class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"
        p:location="batch.properties"
        p:ignoreUnresolvablePlaceholders="true"/>

    <beans:bean id="dataSource"
        class="org.apache.commons.dbcp.BasicDataSource"
        destroy-method="close"
        p:driverClassName="${dataSource.driverClassName}"
        p:username="${dataSource.username}"
        p:password="${dataSource.password}"
        p:url="${dataSource.url}"/>

    <beans:bean id="transactionManager"
        class="org.springframework.jdbc.datasource.DataSourceTransactionManager"
        p:dataSource-ref="dataSource"/>

    <beans:bean id="jobRegistry"
        class="org.springframework.batch.core.configuration.support.MapJobRegistry"/>

    <beans:bean id="jobLauncher"
        class="org.springframework.batch.core.launch.support.SimpleJobLauncher"
        p:jobRepository-ref="jobRepository"/>

    <beans:bean id="jobRegistryBeanPostProcessor"
        class="org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor"
        p:jobRegistry-ref="jobRegistry"/>

    <beans:bean id="jobRepository"
        class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean"
        p:dataSource-ref="dataSource"
        p:transactionManager-ref="transactionManager"/>
</beans:beans>

Because the implementation uses a database to persist the metadata, take care to configure a DataSource as well as a TransactionManager. In this example, you're using a PropertyPlaceholderConfigurer to load the contents of a properties file (batch.properties) whose values you use to configure the data source. You need to place values for your particular database in this file. This example uses Spring's property schema ("p") to abbreviate the tedious configuration. This Spring XML file will be referenced as batch.xml in subsequent examples. For us, the properties file looks like this:

hibernate.configFile=hibernate.cfg.xml
dataSource.password=sep
dataSource.username=sep
dataSource.databaseName=sep
dataSource.driverClassName=org.postgresql.Driver
dataSource.dialect=org.hibernate.dialect.PostgreSQLDialect
dataSource.serverName=studio:5432
dataSource.url=jdbc:postgresql://${dataSource.serverName}/${dataSource.databaseName}
dataSource.properties=user=${dataSource.username};databaseName=${dataSource.databaseName};serverName=${dataSource.serverName};password=${dataSource.password}

The first few beans (a data source, a transaction manager, and a properties resolver) are related strictly to configuration; nothing here is particularly novel or peculiar to Spring Batch.

Eventually, we get to the declaration of a MapJobRegistry instance. This is critical—it is the central store for information regarding a given Job, and it controls the "big picture" about all Jobs in the system. Everything else works with this instance.

Next, we have a SimpleJobLauncher, whose sole purpose is to give you a mechanism to launch batch jobs, where a "job" in this case is our batch solution. The jobLauncher is used to specify the name of the batch solution to run as well as any parameters required. We'll follow up more on that in the next recipe.

Next, you define a JobRegistryBeanPostProcessor. This bean scans your Spring context file and associates any configured Jobs with the MapJobRegistry.

Finally, we get to the SimpleJobRepository (which is, in turn, created by the JobRepositoryFactoryBean). The JobRepository is an implementation of a "repository" (in the Patterns of Enterprise Application Architecture sense of the word): it handles persistence and retrieval for the domain models surrounding Steps, Jobs, and so on.

Reading and Writing (but No Arithmetic)

Problem

You want to insert data from a file into a database. This solution will be one of the simplest solutions and will give you a chance to explore the moving pieces of a typical solution.

Solution

You'll build a solution that does a minimal amount of work while being a viable application of the technology. The solution will read in a file of arbitrary length and write out the data into a database. The end result will be almost 100 percent code free. You will rely on an existing model class and write one class (a class containing the public static void main(String[] args) method) to round out the example. There's no reason why the model class couldn't be a Hibernate class or something from your DAO layer, though in this case it's a brainless POJO. This solution will use the components configured in batch.xml.

How It Works

This example demonstrates the simplest possible use of Spring Batch: to provide scalability. This program will do nothing but read data from a CSV file, with fields delimited by commas and rows delimited by newlines, and then insert the records into a table. You are exploiting the intelligent infrastructure that Spring Batch provides to avoid worrying about scaling. This application could easily be done manually. You will not exploit any of the smart transactional functionality made available to you, nor will you worry about retries for the time being.

This solution is as simple as Spring Batch solutions get. Spring Batch models solutions using XML schema, which is new to Spring Batch 2.1. The abstractions and terms are in the spirit of classical batch processing solutions, so they will be portable from previous technologies and perhaps to subsequent technologies. Spring Batch provides useful default classes that you can override or selectively adjust. In the following example, you'll use a lot of the utility implementations provided by Spring Batch. Fundamentally, most solutions look about the same and feature a combination of the same set of interfaces; it's usually just a matter of picking and choosing the right ones.

When I ran this program, it worked on files with 20,000 rows, and it worked on files with 1 million rows. I experienced no increase in memory, which indicates there were no memory leaks. Naturally, it took a lot longer! (The application ran for several hours with the 1-million-row insert.)

Tip

Of course, it would be catastrophic if you worked with a million rows and it failed on the penultimate record, because you'd lose all your work when the transaction rolled back! Read on for examples on chunking. Additionally, you might want to read through Chapter 16 to brush up on transactions.

The following example inserts records into a table. I'm using PostgreSQL, which is a good mature open source database; you can use any database you want. (More information and downloads are available at www.postgresql.org.) The schema for the table is simple:

create table USER_REGISTRATION
(
  ID bigserial not null,
  FIRST_NAME character varying(255) not null,
  LAST_NAME character varying(255) not null,
  COMPANY character varying(255) not null,
  ADDRESS character varying(255) not null,
  CITY character varying(255) not null,
  STATE character varying(255) not null,
  ZIP character varying(255) not null,
  COUNTY character varying(255) not null,
  URL character varying(255) not null,
  PHONE_NUMBER character varying(255) not null,
  FAX character varying(255) not null,
  constraint USER_REGISTRATION_PKEY primary key (ID)
);

The Job Configuration

The configuration for the job is as follows:

<job
    id="insertIntoDbFromCsvJob"
    job-repository="jobRepository">
    <step id="step1">
        <tasklet transaction-manager="transactionManager">
            <chunk
                reader="csvFileReader"
                writer="jdbcItemWriter"
                commit-interval="5"/>
        </tasklet>
    </step>
</job>

As described earlier, a job consists of steps, which are the real workhorses of a given job. The steps can be as complex or as simple as you like. Indeed, a step could be considered the smallest unit of work for a job. Input (what's read) is passed to the step and potentially processed; then output (what's written) is created from the step. This processing is spelled out using a Tasklet. You can provide your own Tasklet implementation or simply use one of the preconfigured options for different processing scenarios, which are made available as subelements of the tasklet element. One of the most important aspects of batch processing is chunk-oriented processing, which is employed here using the chunk element.

In chunk-oriented processing, input is read from a reader, optionally processed, and then aggregated. Finally, at a configurable interval (the commit-interval attribute specifies how many items will be processed before the transaction is committed), all the aggregated input is sent to the writer. If there is a transaction manager in play, the transaction is also committed. Right before a commit, the metadata in the database is updated to mark the progress of the job.
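
Conceptually, the chunk loop behaves like the following sketch (an illustration only; Spring Batch's real implementation adds transactions, skip and retry handling, and metadata bookkeeping around these calls):

import java.util.ArrayList;
import java.util.List;

import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;

public class ChunkLoopSketch {
    public static <T> void process(ItemReader<T> reader, ItemWriter<T> writer,
            int commitInterval) throws Exception {
        List<T> chunk = new ArrayList<T>();
        T item;
        while ((item = reader.read()) != null) { // null signals end of input
            chunk.add(item);
            if (chunk.size() == commitInterval) {
                writer.write(chunk); // the transaction would commit here
                chunk.clear();
            }
        }
        if (!chunk.isEmpty()) {
            writer.write(chunk);     // the final, possibly smaller chunk
        }
    }
}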

There are some nuances surrounding the aggregation of the input (read) values when a transaction-aware writer (or processor) rolls back. Spring Batch caches the values it reads before sending them to the writer. If the writer component is transactional, like a database, and the reader is not, there's nothing inherently wrong with caching the read values and perhaps retrying or taking some alternative approach. If the reader itself is also transactional, then the values read from the resource will be rolled back and could conceivably change, rendering the in-memory cached values stale. If this happens, you can configure the chunk not to cache the values by setting reader-transactional-queue="true" on the chunk element.

Input

The first responsibility is reading a file from the file system. You use a provided implementation for the example. Reading CSV files is a very common scenario, and Spring Batch's support does not disappoint. The org.springframework.batch.item.file.FlatFileItemReader<T> class delegates the task of delimiting fields and records within a file to a LineMapper<T>, which in turn delegates the task of identifying the fields within that record to a LineTokenizer. You use an org.springframework.batch.item.file.transform.DelimitedLineTokenizer, which is configured to delineate fields separated by the "," character.

The DefaultLineMapper also declares a fieldSetMapper attribute that requires an implementation of FieldSetMapper. This bean is responsible for taking the input name/value pairs and producing a type that will be given to the writer component.

In this case, you use a BeanWrapperFieldSetMapper that will create a JavaBean POJO of type UserRegistration. You name the fields so that you can reference them later in the configuration. These names don't have to be the values of some header row in the input file; they just have to correspond to the order in which the fields are found in the input file. These names are also used by the FieldSetMapper to match properties on a POJO. As each record is read, the values are applied to an instance of a POJO, and that POJO is returned.
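
If the reflective mapping ever proves too opaque, the contract is easy to satisfy by hand. The following sketch (not used in this solution; it assumes the standard accessors that the POJO below omits for brevity) does for a few fields what BeanWrapperFieldSetMapper does for all of them:

import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;

import com.apress.springrecipes.springbatch.solution1.UserRegistration;

public class UserRegistrationFieldSetMapper
        implements FieldSetMapper<UserRegistration> {

    public UserRegistration mapFieldSet(FieldSet fieldSet) throws BindException {
        UserRegistration registration = new UserRegistration();
        registration.setFirstName(fieldSet.readString("firstName"));
        registration.setLastName(fieldSet.readString("lastName"));
        registration.setCompany(fieldSet.readString("company"));
        // ... and so on for the remaining fields ...
        return registration;
    }
}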

<beans:bean
    id="csvFileReader"
    class="org.springframework.batch.item.file.FlatFileItemReader"
    p:resource="file:${user.home}/batches/registrations.csv">
    <beans:property name="lineMapper">
        <beans:bean
            class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
            <beans:property name="lineTokenizer">
                <beans:bean
                    class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer"
                    p:delimiter=","
                    p:names="firstName,lastName,company,address,city,state,zip,county,url,phoneNumber,fax"/>
            </beans:property>
            <beans:property name="fieldSetMapper">
                <beans:bean
                    class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper"
                    p:targetType="com.apress.springrecipes.springbatch.UserRegistration"/>
            </beans:property>
        </beans:bean>
    </beans:property>
</beans:bean>

The class returned from the reader, UserRegistration, is a rather plain JavaBean.

package com.apress.springrecipes.springbatch.solution1;

import java.io.Serializable;

public class UserRegistration implements Serializable {

    private String firstName;
    private String lastName;
    private String company;
    private String address;
    private String city;
    private String state;
    private String zip;
    private String county;
    private String url;
    private String phoneNumber;
    private String fax;

   //... accessor / mutators omitted for brevity ...

}

Output

The next component to do work is the writer, which is responsible for taking the aggregated collection of items read from the reader. In this case, you might imagine that a new collection (java.util.List<UserRegistration>) is created, written, and then reset each time the collection reaches the commit-interval attribute on the chunk element. Because you're trying to write to a database, you use Spring Batch's org.springframework.batch.item.database.JdbcBatchItemWriter. This class contains support for taking input and writing it to a database. It is up to the developer to provide the input and to specify what SQL should be run for the input. It will run the SQL specified by the sql property, in essence writing to the database, as many times as specified by the chunk element's commit-interval, and then commit the whole transaction. Here, you're doing a simple insert. The names and values for the named parameters are created by the bean configured for the itemSqlParameterSourceProvider property, an instance of BeanPropertyItemSqlParameterSourceProvider, whose sole job is to take JavaBean properties and make them available as named parameters corresponding to the property names on the JavaBean.

<beans:bean id="jdbcItemWriter"class="org.springframework.batch.item.
Output
database.JdbcBatchItemWriter" p:assertUpdates="true" p:dataSource-ref="dataSource"> <beans:property name="sql"> <beans:value> <![CDATA[ insert into USER_REGISTRATION( FIRST_NAME, LAST_NAME, COMPANY, ADDRESS, CITY, STATE, ZIP, COUNTY, URL, PHONE_NUMBER, FAX ) values ( :firstName, :lastName, :company, :address, :city , :state, :
Output
zip, :county, :url, :phoneNumber, :fax ) ]]> </beans:value> </beans:property>
<beans:property name="itemSqlParameterSourceProvider">
<beans:bean
    class="org.springframework.batch.item.database.BeanPropertyItemSql
ParameterSourceProvider" />
</beans:property>
</beans:bean>

And that's it! A working solution. With little configuration and no custom code, you've built a solution for taking large CSV files and reading them into a database. This solution is bare bones and leaves a lot of edge cases unhandled. You might want to do processing on each item as it's read (before it's inserted), for example.

This exemplifies a simple job. It's important to remember that there are similar classes for doing the exact opposite transformation: reading from a database and writing to a CSV file.
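
For the reverse trip, you might pair a database reader such as JdbcCursorItemReader with a FlatFileItemWriter. Here's a minimal sketch (the output path and the abbreviated field list are assumptions for illustration):

import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor;
import org.springframework.batch.item.file.transform.DelimitedLineAggregator;
import org.springframework.core.io.FileSystemResource;

import com.apress.springrecipes.springbatch.solution1.UserRegistration;

public class CsvWriterFactory {

    public static FlatFileItemWriter<UserRegistration> csvWriter() {
        // pull named JavaBean properties off each item...
        BeanWrapperFieldExtractor<UserRegistration> extractor =
                new BeanWrapperFieldExtractor<UserRegistration>();
        extractor.setNames(new String[] { "firstName", "lastName", "company" });

        // ...and join them with commas to form each output line
        DelimitedLineAggregator<UserRegistration> aggregator =
                new DelimitedLineAggregator<UserRegistration>();
        aggregator.setDelimiter(",");
        aggregator.setFieldExtractor(extractor);

        FlatFileItemWriter<UserRegistration> writer =
                new FlatFileItemWriter<UserRegistration>();
        writer.setResource(new FileSystemResource("/tmp/registrations.csv"));
        writer.setLineAggregator(aggregator);
        return writer;
    }
}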

Writing a Custom ItemWriter and ItemReader

Problem

You want to talk to a resource (you might imagine an RSS feed, or any other custom data format) that Spring Batch doesn't know how to connect to.

Solution

You can easily write your own ItemWriter or ItemReader. The interfaces are drop-dead simple, and there's not a lot of responsibility placed on the implementations.

How It Works

As easy as this process is, it's still not better than simply reusing any of the numerous provided options; if you look, you'll likely find something. There's support for writing JMS (JmsItemWriter<T>), JPA (JpaItemWriter<T>), JDBC (JdbcBatchItemWriter<T>), files (FlatFileItemWriter<T>), iBATIS (IbatisBatchItemWriter<T>), Hibernate (HibernateItemWriter<T>), and more. There's even support for writing by invoking a method on a bean (PropertyExtractingDelegatingItemWriter<T>) and passing to it as arguments the properties of the item to be written! One of the more useful writers lets you write to a set of numbered files. This implementation, MultiResourceItemWriter<T>, delegates the work to another ItemWriter<T> implementation but lets you write to multiple files, not just one very large one. There's a slightly smaller but still impressive set of ItemReader implementations. If one doesn't seem to exist, look again. If you still can't find one, consider writing your own. In this recipe, we will do just that.

Writing a Custom ItemReader

The ItemReader example is trivial. Here, an ItemReader is created that knows how to retrieve UserRegistration objects from a remote procedure call (RPC) endpoint:

package com.apress.springrecipes.springbatch.solution2;

import java.util.Collection;
import java.util.Date;

import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.beans.factory.annotation.Autowired;

import com.apress.springrecipes.springbatch.UserRegistrationService;
import com.apress.springrecipes.springbatch.solution1.UserRegistration;

public class UserRegistrationItemReader
  implements ItemReader<UserRegistration> {

    @Autowired
    private UserRegistrationService userRegistrationService;

    public UserRegistration read() throws Exception,
            UnexpectedInputException, ParseException {
        Date today = new Date();
        Collection<UserRegistration> registrations =
                userRegistrationService.getOutstandingUserRegistrationBatchForDate(
                        1, today);
        if (registrations != null && !registrations.isEmpty())
            return registrations.iterator().next();
        return null;
    }
}

As you can see, the interface is trivial. In this case, you defer most work to a remote service to provide you with the input. The interface requires that you return one record. The interface is parameterized to the type of object (the "item") to be returned. All the read items will be aggregated and then passed to the ItemWriter.

Writing a Custom ItemWriter

The ItemWriter example is also trivial. Imagine wanting to write by invoking a remote service using any of the numerous options for remoting that Spring provides. The ItemWriter<T> interface is parameterized by the type of item you're expecting to write. Here, you expect a UserRegistration object from the ItemReader<T>. The interface consists of one method, which expects a List of the class's parameterized type. These are the objects read from ItemReader<T> and aggregated. If your commit-interval were ten, you might expect ten or fewer items in the List.

package com.apress.springrecipes.springbatch.solution2;

import java.util.List;

import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.log4j.Logger;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;

import com.apress.springrecipes.springbatch.UserRegistrationService;
import com.apress.springrecipes.springbatch.solution1.UserRegistration;

public class UserRegistrationServiceItemWriter implements
      ItemWriter<UserRegistration> {

   private static final Logger logger = Logger
         .getLogger(UserRegistrationServiceItemWriter.class);

   // this is the client interface to an HTTP Invoker service.
   @Autowired
   private UserRegistrationService userRegistrationService;

   /**
    * takes aggregated input from the reader and 'writes' them using a custom
    * implementation.
    */
   public void write(List<? extends UserRegistration> items)
   throws Exception {
      for (final UserRegistration userRegistration : items) {
         UserRegistration registeredUser = userRegistrationService
               .registerUser(userRegistration);
         logger.debug("Registered:"
               + ToStringBuilder.reflectionToString(registeredUser));
      }
   }
}

Here, you've wired in the service's client interface. You simply loop through the UserRegistration objects and invoke the service, which in turn hands you back an identical instance of UserRegistration. If you remove the gratuitous spacing, curly brackets and logging output, it becomes two lines of code to satisfy the requirement.

The interface for UserRegistrationService follows:

package com.apress.springrecipes.springbatch;

import java.util.Collection;
import java.util.Date;

public interface UserRegistrationService {
Collection<UserRegistration> getOutstandingUserRegistrationBatchForDate(
        int quantity, Date date);

    UserRegistration registerUser(
        UserRegistration userRegistrationRegistration);

}

In our example, we have no particular implementation for the interface, as it is irrelevant: it could be any interface that Spring Batch doesn't know about already.

Processing Input Before Writing

Problem

While transferring data directly from a spreadsheet or CSV dump might be useful, one can imagine having to do some sort of processing on the data before it's written. Data in a CSV file, and more generally from any source, is not usually exactly the way you expect it to be or immediately suitable for writing. Just because Spring Batch can coerce it into a POJO on your behalf, that doesn't mean the state of the data is correct. There may be additional data that you need to infer or fill in from other services before the data is suitable for writing.

Solution

Spring Batch will let you do processing on reader output. This processing can do virtually anything to the output before it gets passed to the writer, including changing the type of the data.

How It Works

Spring Batch gives the implementor a chance to perform any custom logic on the data read from reader. The processor attribute on the chunk element expects a reference to a bean of the interface org.springframework.batch.item.ItemProcessor<I,O>. Thus, the revised definition for the job from the previous recipe looks like this:

<job
    job-repository="jobRepository"
    id="insertIntoDbFromCsvJob">
    <step id="step1">
        <tasklet transaction-manager="transactionManager">
            <chunk
                reader="csvFileReader"
                processor="userRegistrationValidationProcessor"
                writer="jdbcItemWriter"
                commit-interval="5"/>
        </tasklet>
    </step>
</job>

The goal is to do certain validations on the data before you authorize it to be written to the database. If you determine that a record is invalid, you can stop further processing by returning null from the ItemProcessor<I,O>. This is crucial and provides a necessary safeguard. One thing you want to do is ensure that the data is in the right format (for example, the schema may require a valid two-letter state code instead of the full state name). Telephone numbers are expected to follow a certain format, and you can use this processor to strip the telephone number of any extraneous characters, leaving only a valid (in the United States) ten-digit phone number. The same applies to U.S. zip codes, which consist of five digits, optionally followed by a hyphen and a four-digit code. Finally, while a constraint guarding against duplicates is best implemented in the database, there may very well be some other eligibility criteria for a record that can be met only by querying the system before insertion.

Here's the configuration for the ItemProcessor:

<beans:bean id="userRegistrationValidationProcessor"
class="com.apress.springrecipes.springbatch.solution2.
How It Works
UserRegistrationValidationItemProcessor" />

In the interest of keeping this class short, I won't reprint it in its entirety, but the salient bits should be obvious:

package com.apress.springrecipes.springbatch.solution2;

import java.util.Arrays;
import java.util.Collection;

import org.apache.commons.lang.StringUtils;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.item.ItemProcessor;
import com.apress.springrecipes.springbatch.solution1.UserRegistration;

public class UserRegistrationValidationItemProcessor
  implements ItemProcessor<UserRegistration, UserRegistration> {

    private String stripNonNumbers(String input) { /* ... */ }

    private boolean isTelephoneValid(String telephone) { /* ... */ }

    private boolean isZipCodeValid(String zip) { /* ... */ }

    private boolean isValidState(String state) { /* ... */ }

    public UserRegistration process(UserRegistration input) throws Exception {
        String zipCode = stripNonNumbers(input.getZip());
        String telephone = stripNonNumbers(input.getPhoneNumber());
        String state = StringUtils.defaultString(input.getState());
        if (isTelephoneValid(telephone) && isZipCodeValid(zipCode)
                && isValidState(state)) {
            input.setZip(zipCode);
            input.setPhoneNumber(telephone);
            return input;
        }
        return null;
    }
}

The class is a parameterized type. The type information is the type of the input, as well as the type of the output. The input is what's given to the method for processing, and the output is the returned data from the method. Because you're not transforming anything in this example, the two parameterized types are the same.
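
The elided helper methods might be fleshed out along these lines (a sketch of one plausible implementation, leaning on the Arrays import above; a real version would check against the complete list of two-letter state codes):

    private String stripNonNumbers(String input) {
        // remove everything that isn't a digit, e.g., "(555) 555-1234" -> "5555551234"
        return input == null ? "" : input.replaceAll("\\D", "");
    }

    private boolean isTelephoneValid(String telephone) {
        // ten digits, as for a US number with area code
        return telephone.matches("\\d{10}");
    }

    private boolean isZipCodeValid(String zip) {
        // five digits, or nine once the optional hyphenated +4 part is stripped
        return zip.matches("\\d{5}(\\d{4})?");
    }

    private boolean isValidState(String state) {
        return Arrays.asList("AL", "AK", "AZ" /* ..., all two-letter codes */)
                .contains(state.trim().toUpperCase());
    }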

Once this process has completed, there's a lot of useful information to be had in the Spring Batch metadata tables. Issue the following query on your database:

select * from BATCH_STEP_EXECUTION;

Among other things, you'll get back the exit status of the job, how many commits occurred, how many items were read, and how many items were filtered. So if the preceding job was run on a batch of 100 rows, each item was read and passed through the processor, and the processor found 10 items invalid (it returned null 10 times), the value of the filter_count column would be 10. The read_count column would show that 100 items were read. The write_count column would reflect that 10 items didn't make it and would show 90.

Chaining Processors Together

Sometimes you might want to add extra processing that isn't congruous with the goals of the processor you've already set up. Spring Batch provides a convenience class, CompositeItemProcessor<I,O>, which forwards the output of the filter to the input of the successive filter. In this way, you can write many, singly focused ItemProcessor<I,O>s and then reuse them and chain them as necessary:

<beans:bean id="compositeBankCustomerProcessor"
    class="org.springframework.batch.
Chaining Processors Together
item.support.CompositeItemProcessor"> <beans:property name="delegates"> <beans:list> <bean ref="creditScoreValidationProcessor" /> <bean ref="salaryValidationProcessor" /> <bean ref="customerEligibilityProcessor" /> </beans:list> </beans:property> </beans:bean> <job job-repository="jobRepository" id="insertIntoDbFromCsvJob"> <step id="step1"> <tasklet transaction-manager="transactionManager">
<chunk
                    reader="csvFileReader"
                    processor="compositeBankCustomerProcessor"
                    writer="jdbcItemWriter"
                    commit-interval="5"
                 />
            </tasklet>
        </step>
    </job>

The example creates a very simple workflow. The first ItemProcessor<I,O> will take as input whatever comes from the ItemReader<T> configured for this job, presumably a Customer object. It will check the credit score of the Customer and, if approved, forward the Customer to the salary and income validation processor. If everything checks out there, the Customer will be forwarded to the eligibility processor, where the system is checked for duplicates or any other invalid data. It will finally be forwarded to the writer to be added to the output. If at any point in the three processors the Customer fails a check, the executing ItemProcessor can simply return null and arrest processing.

Better Living through Transactions

Problem

You want your reads and writes to be robust. Ideally, they'll use transactions where appropriate and correctly react to exceptions.

Solution

Transaction capabilities are built on top of the first-class support already provided by the core Spring framework. Where relevant, Spring Batch surfaces the configuration so that you can control it. Within the context of chunk-oriented processing, it also exposes a lot of control over the frequency of commits, rollback semantics, and so on.

How It Works

Transactions

Spring's core framework provides first-class support for transactions. You simply wire up a TransactionManager and give Spring Batch a reference, just as you would in any regular JdbcTemplate or HibernateTemplate solution. As you build your Spring Batch solutions, you'll be given opportunities to control how steps behave in a transaction. You've already seen some of the support for transactions baked right in.

The batch.xml file, used in all these examples, established a BasicDataSource and a DataSourceTransactionManager bean. The TransactionManager and BasicDataSource were then wired to the JobRepository, which was in turn wired to the JobLauncher, which you used to launch all jobs thus far. This enabled all the metadata your jobs create to be written to the database in a transactional way.

You might wonder why there is no explicit mention of the TransactionManager when you configured the JdbcBatchItemWriter with a reference to dataSource. The transaction manager reference can be specified, but in your solutions it wasn't required because Spring Batch will, by default, try to pluck the PlatformTransactionManager named transactionManager from the context and use it. If you want to configure this explicitly, you can specify the transaction-manager attribute on the tasklet element. A simple TransactionManager for JDBC work might look like this:

<bean id="myCustomTransactionManager" class="org.springframework.jdbc.datasource.
Transactions
DataSourceTransactionManager" p:dataSource-ref="dataSource" /> <job job-repository="jobRepository" id="insertIntoDbFromCsvJob"> <step id="step1"> <tasklet transaction-manager="myCustomTransactionManager" > <!-- ... --> </tasklet> </step> </job> ...

Items read from an ItemReader<T> are normally aggregated. If a commit on the ItemWriter<T> fails, the aggregated items are kept and then resubmitted. This process is efficient and works most of the time. One place where it breaks semantics is when reading from a transactional message queue. Reads from a message queue can and should be rolled back if the transaction they participate in (in this case, the transaction for the writer) fails:

<tasklet transaction-manager="customTransactionManager" >
        <chunk
            reader="jmsItemReader" is-reader-transactional-queue="true"
            processor="userRegistrationValidationProcessor"
            writer="jdbcItemWriter"
            commit-interval="5" />
    </tasklet>

Rollbacks

Handling the simple case ("read X items, and every Y items, commit a database transaction") is easy. Spring Batch excels in the robustness it surfaces as simple configuration options for the edge and failure cases.

If a write fails on an ItemWriter, or some other exception occurs in processing, Spring Batch will roll back the transaction. This is valid handling for a majority of the cases. There may be some scenarios when you want to control which exceptional cases cause the transaction to roll back.

You can use the no-rollback-exception-classes element to configure this for the step. The value is a list of Exception classes that should not cause the transaction to roll back:

<step id = "step2">
        <tasklet>
                    <chunk reader="reader" writer="writer" commit-interval="10" />
                <no-rollback-exception-classes>
                    <include class="com.yourdomain.exceptions.YourBusinessException"/>
                </no-rollback-exception-classes>
        </tasklet>
</step>

Retrying

Problem

You are dealing with a requirement for functionality that may fail but is not transactional. Perhaps it is transactional but unreliable. You want to work with a resource that may fail when you try to read from or write to it. It may fail because of network connectivity, because an endpoint is down, or for any number of other reasons. You know that it will likely be back up soon, though, and that the operation should be retried.

Solution

Use Spring Batch's retry capabilities to systematically retry the read or write.

How It Works

As you saw in the last recipe, it's easy to handle transactional resources with Spring Batch. When it comes to transient or unreliable resources, a different tack is required. Such resources tend to be distributed or manifest problems that eventually resolve themselves. Some (such as web services) cannot inherently participate in a transaction because of their distributed nature. There are products that can start a transaction on one server and propagate the transactional context to a distributed server and complete it there, although this tends to be very rare and inefficient. Alternatively, there's good support for distributed ("global" or XA) transactions if you can use it. Sometimes, however, you may be dealing with a resource that isn't either of those. A common example might be a call made to a remote service, such as an RMI service or a REST endpoint. Some invocations will fail but may be retried with some likelihood of success in a transactional scenario. For example, an update to the database resulting in org.springframework.dao.DeadlockLoserDataAccessException might be usefully retried.

Configuring a Step

The simplest example is in the configuration of a step. Here, you can specify exception classes on which to retry the operation, listing them with include elements just as you did for the rollback exceptions:

<step id = "step23">
<tasklet transaction-manager="transactionManager">
        <chunk reader="csvFileReader" writer="jdbcItemWriter" commit-interval="10"
Configuring a Step
retry-limit="3" cache-capacity="10"> <retryable-exception-classes> <include class="org.springframework.dao.DeadlockLoserDataAccessException"/> </retryable-exception-classes> </chunk> </tasklet> </step>

Retry Template

Alternatively, you can leverage Spring Batch's support for retries and recovery in your own code. For example, you can have a custom ItemWriter<T> in which retry functionality is desired or even an entire service interface for which retry support is desired.

Spring Batch supports these scenarios through the RetryTemplate that (much like its various other Template cousins) isolates your logic from the nuances of retries and instead enables you to write the code as though you were only going to attempt it once. Let Spring Batch handle everything else through declarative configuration.

The RetryTemplate supports many use cases, with convenient APIs to wrap otherwise tedious retry/fail/recover cycles in concise, single-method invocations.

Let's take a look at a modified version of the simple ItemWriter<T> from the earlier recipe on writing a custom ItemWriter<T>. The solution was simple enough and would ideally work all the time. It fails to handle the error cases for the service, however. When dealing with RPC, always proceed as if it's almost impossible for things to go right; the service itself may surface a semantic or system violation. An example might be a duplicate database key, an invalid credit card number, and so on. This is true whether the service is distributed or in-VM, of course.

Next, the RPC layer below the system may also fault. Here's the rewritten code, this time allowing for retries:

package com.apress.springrecipes.springbatch.solution2;

import java.util.List;

import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.log4j.Logger;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.retry.RetryCallback;
import org.springframework.batch.retry.RetryContext;
import org.springframework.batch.retry.support.RetryTemplate;
import org.springframework.beans.factory.annotation.Autowired;

import com.apress.springrecipes.springbatch.UserRegistrationService;
import com.apress.springrecipes.springbatch.solution1.UserRegistration;
/**
 * This class writes the user registration by calling an RPC service (whose
 * client interface is wired in using Spring).
 */
public class RetryableUserRegistrationServiceItemWriter implements
      ItemWriter<UserRegistration> {

   private static final Logger logger = Logger
         .getLogger(RetryableUserRegistrationServiceItemWriter.class);

   // this is the client interface to an HTTP Invoker service.
   @Autowired
   private UserRegistrationService userRegistrationService;

   @Autowired
   private RetryTemplate retryTemplate;

   /**
    * takes aggregated input from the reader and 'writes' them using a custom
    * implementation.
    */
   public void write(List<? extends UserRegistration> items)
    throws Exception {
      for (final UserRegistration userRegistration : items) {
         UserRegistration registeredUser = retryTemplate.execute(
               new RetryCallback<UserRegistration>() {
                  public UserRegistration doWithRetry(RetryContext context)
                        throws Exception {
                     return userRegistrationService.registerUser(userRegistration);
                  }
               });
         logger.debug("Registered:"
               + ToStringBuilder.reflectionToString(registeredUser));
      }
   }
}

As you can see, the code hasn't changed much, and the result is much more robust. The RetryTemplate itself is configured in the Spring context, although it's trivial to create in code. I declare it in the Spring context only because there is some surface area for configuration when creating the object, and I try to let Spring handle the configuration.

One of the more useful settings for the RetryTemplate is the BackOffPolicy in use. The BackOffPolicy dictates how long the RetryTemplate should back off between retries. Indeed, there's even support for growing the delay between retries after each failed attempt, to avoid lock-stepping with other clients attempting the same invocation. This is great for situations in which there are potentially many concurrent attempts on the same resource and a race condition may ensue. There are other BackOffPolicy implementations, including FixedBackOffPolicy, which delays retries by a fixed amount.

<beans:bean id="retryTemplate" class="org.springframework.batch.retry.
support.RetryTemplate">
     <beans:property name="backOffPolicy" >
          <beans:bean     class="org.springframework.batch.retry.backoff.
Retry Template
ExponentialBackOffPolicy" p:initialInterval="1000" p:maxInterval="10000" p:multiplier="2" /> </beans:property> </beans:bean>

You have configured the RetryTemplate's backOffPolicy so that it will wait 1 second (1,000 milliseconds) before the first retry. Each subsequent attempt doubles the previous wait (the growth is governed by the multiplier) until the maxInterval is reached, at which point all subsequent retries occur at that consistent interval.
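
As mentioned, the same template is trivial to create in code. Here's a sketch equivalent to the preceding XML:

import org.springframework.batch.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.batch.retry.support.RetryTemplate;

public class RetryTemplateFactory {

    public static RetryTemplate retryTemplate() {
        ExponentialBackOffPolicy backOff = new ExponentialBackOffPolicy();
        backOff.setInitialInterval(1000); // the first retry waits one second
        backOff.setMultiplier(2);         // each subsequent wait doubles...
        backOff.setMaxInterval(10000);    // ...until capped at ten seconds

        RetryTemplate template = new RetryTemplate();
        template.setBackOffPolicy(backOff);
        return template;
    }
}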

AOP-Based Retries

An alternative is an AOP advisor provided by Spring Batch that will wrap invocations of methods whose success is not guaranteed in retries, as you did with the RetryTemplate. In the previous example, you rewrote an ItemWriter<T> to make use of the template. Another approach might be to merely advise the entire userRegistrationService proxy with this retry logic. In this case, the code could go back to the way it was in the original example, with no RetryTemplate!

<aop:config>
    <aop:pointcut id="remote"
        expression="execution(* com..*RetryableUserRegistrationServiceItemWriter.*(..))"/>
    <aop:advisor pointcut-ref="remote" advice-ref="retryAdvice" order="-1"/>
</aop:config>

<beans:bean id="retryAdvice"
    class="org.springframework.batch.retry.interceptor.RetryOperationsInterceptor"/>

Controlling Step Execution

Problem

You want to control how steps are executed, perhaps to eliminate a needless waste of time by introducing concurrency or by executing steps only if a condition is true.

Solution

There are different ways to change the runtime profile of your jobs, mainly by exerting control over the way steps are executed: concurrent steps, decisions, and sequential steps.

How It Works

Thus far, you have explored running one step in a job. Typical jobs of almost any complexity will have multiple steps, however. A step provides a boundary (transactional or not) to the beans and logic it encloses. A step can have its own reader, writer, and processor. Each step helps decide what the next step will be. A step is isolated and provides focused functionality that can be assembled using the updated schema and configuration options in Spring Batch 2.1 in very sophisticated workflows. In fact, some of the concepts and patterns you're about to see will be very familiar if you have an interest in business process management (BPM) systems and workflows. (To learn more about BPM, and jBPM in particular, see Chapter 23.) BPM provides many constructs for process or job control that are similar to what you're seeing here.

A step often corresponds to a bullet point when you outline the definition of a job on paper. For example, a batch job to load the daily sales and produce a report might be proposed as follows:

Daily Sales Report Job

  1. Load customers from the CSV file into the database.

  2. Calculate daily statistics, and write to a report file.

  3. Send messages to the message queue to notify an external system of the successful registration for each of the newly loaded customers.

Sequential Steps

In the previous example, there's an implied sequence between the first two steps: the statistics report can't be written until all the registrations have been loaded. This sort of relationship is the default relationship between two steps; one occurs after the other. Each step executes with its own execution context and shares only a parent job execution context and an order.

<job id="nightlyRegistrationsJob"
     job-repository="jobRepository">
    <step id="loadRegistrations" next="reportStatistics">
        <tasklet ref="tasklet1"/>
    </step>
    <step id="reportStatistics" next="...">
        <tasklet ref="tasklet2"/>
    </step>
    <!-- ... other steps ... -->
</job>

Notice that you specify the next attribute on the step elements to tell processing which step to go to next.

Concurrency

The first version of Spring Batch was oriented toward batch processing inside the same thread and, with some alteration, perhaps elsewhere inside the same virtual machine. There were workarounds, of course, but the situation was less than ideal.

In the outline for this example job, the first step had to come before the second two because the second two are dependent on the first. The second two, however, do not share any such dependencies: there's no reason why the statistics report couldn't be written at the same time as the JMS messages are being delivered. Spring Batch provides the capability to fork processing to enable just this sort of arrangement:

<job job-repository="jobRepository" id="insertIntoDbFromCsvJob">
    <step id="loadRegistrations" next="finalizeRegistrations">
        <!-- ... -->
    </step>
    <split id="finalizeRegistrations">
        <flow>
            <step id="reportStatistics"><!-- ... --></step>
        </flow>
        <flow>
            <step id="sendJmsNotifications"><!-- ... --></step>
        </flow>
    </split>
</job>

In this example, there's nothing to prevent you from having many steps within the flow elements, nor was there anything preventing you from having more steps after the split element. The split element, like the step elements, takes a next attribute as well.

Spring Batch provides a mechanism to offload processing to another process. This feature, called remote chunking, is new in Spring Batch 2.x. This distribution requires some sort of durable, reliable connection, which makes it a perfect use of JMS because JMS is rock solid, transactional, fast, and reliable. Spring Batch's support is modeled at a slightly higher level, on top of the Spring Integration channel abstractions; this support is not in the main Spring Batch code, though. Remote chunking lets an individual step read and aggregate items as usual in the main thread. This step is called the Master. Items read are sent to the ItemProcessor<I,O>/ItemWriter<T> running in another process (this is called the Slave). If the Slave is an aggressive consumer, you have a simple, generic mechanism to scale: work is instantly farmed out over as many JMS clients as you can throw at it. The aggressive-consumer pattern refers to the arrangement of multiple JMS clients all consuming the same queue's messages. If one client consumes a message and is busy processing, another idle client will get the next message instead. As long as there's a client that's idle, the message will be processed instantly.

Additionally, Spring Batch supports scaling out implicitly through a feature called partitioning. This feature is interesting because it's built in and generally very flexible. You replace your instance of a step with a subclass, PartitionStep, which knows how to coordinate distributed executors and maintains the metadata for the execution of the step, thus eliminating the need for a durable medium of communication as required by remote chunking.

The functionality here is also very generic. It could, conceivably, be used with any sort of grid fabric technology such as GridGain or Hadoop. (For more on GridGain, see Chapter 22.) Spring Batch ships with only a TaskExecutorPartitionHandler, which executes steps in multiple threads using a TaskExecutor strategy. This simple improvement might be enough of a justification for this feature! If you're really hurting, however, you can extend it.
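To give a flavor of the configuration, here is a minimal sketch of a partitioned step using the XML namespace and the built-in multithreaded handler. The rangePartitioner and taskExecutor bean names are assumptions: the partitioner is any Partitioner implementation you supply, and grid-size controls how many partitions are created:

<step id="loadRegistrationsPartitioned">
    <partition step="loadRegistrations" partitioner="rangePartitioner">
        <handler grid-size="4" task-executor="taskExecutor"/>
    </partition>
</step>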

Conditional Steps with Statuses

Using the ExitStatus of a given job or step to determine the next step is the simplest example of a conditional flow. Spring Batch facilitates this through the use of the stop, next, fail, and end elements. By default, assuming no intervention, a step will have an ExitStatus that matches its BatchStatus, which is a property whose values are defined in an enum and may be any of the following: COMPLETED, STARTING, STARTED, STOPPING, STOPPED, FAILED, ABANDONED, or UNKNOWN.

Let's look at an example that executes one of two steps based on the success of a preceding step:

<step id="step1" >
    <next on="COMPLETED" to="step2" > <!--     ...  --></step>
        <next on="FAILED" to="failureStep" > <!--    ...     --></step>
</step>

It's also possible to provide a wildcard. This is useful if you want to ensure a certain behavior for any number of BatchStatuses, perhaps in tandem with a more specific next element that matches only one.

<step id="step1" >
    <next on="COMPLETED" to="step2" > <!--     ...  --></step>
        <next on="*" to="failureStep" > <!--  ...    --></step>
</step>

In this example, you are instructing Spring Batch to perform some step based on any unaccounted-for ExitStatus. Another option is to just stop processing altogether with a BatchStatus of FAILED. You can do this using the fail element. A less aggressive rewrite of the preceding example might be the following:

<step id="step1" >
    <next on="COMPLETED" to="step2" />
        <fail  on="FAILED"  />
        <!--     ...  -->
</step>

In all these examples, you're reacting to the standard BatchStatuses that the Spring Batch framework provides. But it's also possible to raise your own ExitStatus. If, for example, you wanted the whole job to fail with a custom ExitStatus of "MAN DOWN", you might do something like this:

<step id="step1" next="step2"><!--     ...  --></step>
  <step id="step2" ...>
    <fail on="FAILED" exit-code="MAN DOWN "/>
    <next on="*" to="step3"/>
  </step>
  <step id="step3"><!--  ...  --></step>

Finally, if all you want to do is end processing with a BatchStatus of COMPLETED, you can use the end element. This is an explicit way of ending a flow as if it had run out of steps and incurred no errors.

<next on="COMPLETED" to="step2" />
  <step id="step2" >
    <end on="COMPLETED"/>
    <next on="FAILED" to="errorStep"/>
    <!--     ...  -->
  </step>

Conditional Steps with Decisions

If you want to vary the execution flow based on some logic more complex than a job's ExitStatuses, you may give Spring Batch a helping hand by using a decision element and providing it with an implementation of a JobExecutionDecider.

package com.apress.springrecipes.springbatch.solution2;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.JobExecutionDecider;

public class HoroscopeDecider implements JobExecutionDecider {

    private boolean isMercuryInRetrograde() { return Math.random() > .9; }

    public FlowExecutionStatus decide(JobExecution jobExecution,
            StepExecution stepExecution) {
        if (isMercuryInRetrograde()) {
            return new FlowExecutionStatus("MERCURY_IN_RETROGRADE");
        }
        return FlowExecutionStatus.COMPLETED;
    }
}

All that remains is the XML configuration:

<beans:bean id="horoscopeDecider" class="com.apress.springrecipes.
springbatch.solution2.HoroscopeDecider"/>

 <job id="job">
    <step id="step1"     next="decision"  ><!--  ...    --></step>
    <decision id="decision" decider="horoscopeDecider">
      <next on="MERCURY_IN_RETROGRADE" to="step2" />
      <next on="COMPLETED" to="step3" />
    </decision>
    <step id="step2" next="step3"> <!--  ...  --> </step>
    <step id="step3" parent="s3">     <!--  ...  --> </step>
  </job>

Launching a Job

Problem

What deployment scenarios does Spring Batch support? How does Spring Batch launch? How does Spring Batch work with a system scheduler such as cron or autosys, or from a web application?

Solution

Spring Batch works well in any environment Spring itself runs in: a public static void main, OSGi, a web application, anywhere! Some use cases are uniquely challenging, though: it's rarely practical, for example, to run a batch job on the same thread as an HTTP request, because it could stall the response indefinitely. Spring Batch supports asynchronous execution for just this scenario. Spring Batch also provides a convenience class that can readily be used with cron or autosys to launch jobs. Additionally, Spring 3.0's excellent scheduler namespace provides a great mechanism for scheduling jobs.

How It Works

Before you get into creating a solution, it's important to know what options are available for deploying and running these solutions. All solutions require, at minimum, a job and a JobLauncher. You already configured these components in the previous recipe. The job is configured in your Spring XML application context, as you'll see later. The simplest example of launching a Spring Batch solution from Java code is about five lines of Java code, three if you've already got a handle to the ApplicationContext!

package com.apress.springrecipes.springbatch.solution1;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Main {

    public static void main(String[] args) throws Throwable {
        ClassPathXmlApplicationContext ctx =
                new ClassPathXmlApplicationContext("solution2.xml");
        ctx.start();
        JobLauncher jobLauncher = (JobLauncher) ctx.getBean("jobLauncher");
        Job job = (Job) ctx.getBean("myJobName");
        JobExecution jobExecution = jobLauncher.run(job, new JobParameters());
    }
}

As you can see, the JobLauncher reference you configured previously is obtained and used to then launch an instance of a Job. The result is a JobExecution. You can interrogate the JobExecution for information on the state of the Job, including its exit status and runtime status.

JobExecution jobExecution = jobLauncher.run(job, jobParameters);
while (jobExecution.getStatus().isRunning()) {
    System.out.println("Still running...");
    Thread.sleep(10 * 1000); // 10 seconds
}

You can also get the ExitStatus:

System.out.println( "Exit code: "+ jobExecution.getExitStatus().getExitCode());

The JobExecution also provides a lot of other very useful information, such as the Job's create time, start time, last-updated time, and end time, all as java.util.Date instances. If you want to correlate the job back to the database, you'll need the JobInstance and its ID:

JobInstance jobInstance = jobExecution.getJobInstance();
System.out.println("job instance Id: " + jobInstance.getId());

In this simple example, we used an empty JobParameters instance. In practice, this will work only once: Spring Batch builds a unique key from the parameters and uses it to distinguish one run of a given Job from another. You'll learn about parameterizing a Job in detail in the next recipe.
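As a quick preview of that recipe, here's a minimal sketch that adds a date parameter so each run yields a distinct JobInstance (the parameter name run.date is arbitrary):

JobParameters jobParameters = new JobParametersBuilder()
        .addDate("run.date", new Date())
        .toJobParameters();
JobExecution jobExecution = jobLauncher.run(job, jobParameters);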

Launching From a Web Application

Launching a job from a web application requires a slightly different approach because the client thread (presumably an HTTP request) can't usually wait for a batch job to finish. The ideal solution is to have the job execute asynchronously when launched from a controller or action in the web tier, unattended by the client thread. Spring Batch supports this scenario through the use of a Spring TaskExecutor. This requires a simple change to the JobLauncher configuration, although the Java code can stay the same. Here, we use a SimpleAsyncTaskExecutor, which spawns and manages a thread of execution without blocking the caller.

<beans:bean id="jobLauncher"
class="org.springframework.batch.execution.launch.support.SimpleJobLauncher"
        p:jobRepository-ref="jobRepository" >
      <beans:property name="taskExecutor">
      <beans:bean class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
      </beans:property>
</beans:bean>
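With the asynchronous JobLauncher in place, a web-tier component can launch the job and return immediately. Here is a minimal sketch, assuming Spring MVC and hypothetical bean names (registrationsJob in particular):

import java.util.Date;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
public class JobLaunchingController {

    @Autowired private JobLauncher jobLauncher;

    @Autowired private Job registrationsJob;

    @RequestMapping("/batch/launch")
    @ResponseBody
    public String launch() throws Exception {
        // with the asynchronous TaskExecutor, run() returns as soon as the job starts
        JobExecution jobExecution = jobLauncher.run(registrationsJob,
                new JobParametersBuilder().addDate("date", new Date()).toJobParameters());
        return "launched job execution: " + jobExecution.getId();
    }
}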

Running from the Command Line

Another common use case is launching a batch process from a system scheduler such as cron or autosys, or even the Windows task scheduler. Spring Batch provides a convenience class, CommandLineJobRunner, that takes as its parameters the name of the XML application context file (which contains everything required to run a job) as well as the name of the job bean itself. Additional parameters, which must be in the form name=value, may be provided to parameterize the job. An example invocation of this class on the command line (on a Linux/Unix system), assuming that you've set up the classpath, might look like this:

java org.springframework.batch.core.launch.support.CommandLineJobRunner \
    jobs.xml hourlyReport date=`date +%m/%d/%Y` time=`date +%H`

The CommandLineJobRunner even returns system exit codes (0 for success, 1 for failure, and 2 for a problem loading the batch job) so that a shell (such as the one most system schedulers use) can react to the failure. More sophisticated return codes can be produced by creating and declaring a top-level bean that implements the ExitCodeMapper interface, in which you specify a more useful translation from exit status messages to the integer codes the shell sees on process exit.
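Here's a minimal sketch of such a mapper; the class name and the numeric codes are our own inventions, and the bean simply needs to be declared top-level in the job's application context:

import org.springframework.batch.core.launch.support.ExitCodeMapper;

public class ReportingExitCodeMapper implements ExitCodeMapper {

    public int intValue(String exitCode) {
        if ("COMPLETED".equals(exitCode)) {
            return 0;  // success
        }
        if ("MAN DOWN".equals(exitCode)) {
            return 42; // the custom status raised earlier in the chapter
        }
        return 1;      // everything else is a generic failure
    }
}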

Running On A Schedule

Spring 3.0 debuts support for a scheduling framework. This framework lends itself perfectly to running Spring Batch. First, let's modify our existing application context batch.xml to use the Spring scheduling namespace. The additions consist mainly of changes to schema imports, as well as the declaration of a few beans. The application context file now starts off like so:

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans
        xmlns="http://www.springframework.org/schema/batch"
        xmlns:beans="http://www.springframework.org/schema/beans"
        xmlns:aop="http://www.springframework.org/schema/aop"
        xmlns:task="http://www.springframework.org/schema/task"
        xmlns:tx="http://www.springframework.org/schema/tx"
        xmlns:p="http://www.springframework.org/schema/p"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:util="http://www.springframework.org/schema/util"
        xmlns:context="http://www.springframework.org/schema/context"
        xsi:schemaLocation="http://www.springframework.org/schema/beans 
Running On A Schedule
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/batch
Running On A Schedule
http://www.springframework.org/schema/batch/spring-batch-2.1.xsd http://www.springframework.org/schema/aop
Running On A Schedule
http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/tx
Running On A Schedule
http://www.springframework.org/schema/tx/spring-tx-3.0.xsd http://www.springframework.org/schema/task
Running On A Schedule
http://www.springframework.org/schema/task/spring-task-3.0.xsd http://www.springframework.org/schema/util
Running On A Schedule
http://www.springframework.org/schema/util/spring-util-3.0.xsd
Running On A Schedule
http://www.springframework.org/schema/context
Running On A Schedule
http://www.springframework.org/schema/context/spring-context.xsd"> <context:component-scan annotation-config="true" base-package="com.apress.springrecipes.springbatch"/> <task:scheduler id="scheduler" pool-size="10"/> <task:executor id="executor" pool-size="10"/> <task:annotation-driven scheduler="scheduler" executor="executor"/> ...

These imports enable the simplest possible support for scheduling. The preceding declarations ensure that any annotated bean under the com.apress.springrecipes.springbatch package will be picked up, configured, and scheduled as required. Our bean is as follows:

package com.apress.springrecipes.springbatch.scheduler;

import org.apache.commons.lang.StringUtils;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
public class ScheduledMain {

    @Autowired private JobLauncher jobLauncher;

    @Autowired private Job job;

    public void runRegistrationsJob(Date date) throws Throwable {
        System.out.println(StringUtils.repeat("-", 100));
        System.out.println("Starting job at " + date.toString());
        JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
        jobParametersBuilder.addDate("date", date);
        jobParametersBuilder.addString("input.file", "registrations");
        JobParameters jobParameters = jobParametersBuilder.toJobParameters();
        JobExecution jobExecution = jobLauncher.run(job, jobParameters);
        System.out.println("jobExecution finished, exit code: " +
                jobExecution.getExitStatus().getExitCode());
    }

    @Scheduled(fixedDelay = 1000 * 10)
    void runRegistrationsJobOnASchedule() throws Throwable {
        runRegistrationsJob(new Date());
    }

    public static void main(String[] args) throws Throwable {
        ClassPathXmlApplicationContext classPathXmlApplicationContext =
                new ClassPathXmlApplicationContext("scheduled_batch.xml", "solution1.xml");
        classPathXmlApplicationContext.start();
    }
}

There is nothing particularly novel; it's a good study of how the different components of the Spring framework work well together. The bean is recognized and becomes part of the application context because of the @Component annotation, which we enabled with the context:component-scan element in our reworked batch.xml (which we're calling scheduled_batch.xml). There's only one Job in the solution1.xml file and only one JobLauncher, so we simply have those auto-wired into our bean. Finally, the logic for kicking off a batch run is inside the runRegistrationsJob(java.util.Date date) method. This method could be called from anywhere. Our only client for this functionality is the scheduled method runRegistrationsJobOnASchedule. The framework will invoke this method for us, according to the timeline dictated by the @Scheduled annotation.

There are other options for this sort of thing; traditionally in the Java and Spring world, this problem would have been a good fit for Quartz. It might still be, as the Spring scheduling support isn't designed to be as extensible as Quartz. And if you're in an environment that calls for more traditional, ops-friendly scheduling tools, there are of course old standbys such as cron, autosys, and BMC.

Parameterizing a Job

Problem

The previous examples work well enough, but they leave something to be desired in terms of flexibility. To apply the batch code to some other file, you'd have to edit the configuration and hard-code the name in there. The ability to parameterize the batch solution would be very helpful.

Solution

Use JobParameters to parameterize a job. The parameters are then available to your steps through Spring Batch's expression language or via API calls.

How It Works

Launching a Job with Parameters

A job is a prototype of a JobInstance. JobParameters are used to provide a way of identifying a unique run of a job (a JobInstance). These JobParameters allow you to give input to your batch process, just as you would with a method definition in Java. You've seen the JobParameters in previous examples but not in detail. The JobParameters object is created as you launch the job using the JobLauncher. To launch a job called dailySalesFigures, with the date for the job to work with, you would write something like this:

ClassPathXmlApplicationContext classPathXmlApplicationContext =
        new ClassPathXmlApplicationContext("solution2.xml");
classPathXmlApplicationContext.start();
JobLauncher jobLauncher = (JobLauncher)
        classPathXmlApplicationContext.getBean("jobLauncher");
Job job = (Job) classPathXmlApplicationContext.getBean("dailySalesFigures");
jobLauncher.run(job,
        new JobParametersBuilder().addDate("date", new Date()).toJobParameters());

Accessing JobParameters

Technically, you can get at JobParameters via any of the ExecutionContexts (step and job). Once you have it, you can access the parameters in a type-safe way by calling getLong(), getString(), and so on. A simple way to do this is to bind to the @BeforeStep event, save the StepExecution, and iterate over the parameters this way. From here, you can inspect the parameters and do anything you want with them. Let's look at that in terms of the ItemProcessor<I,O> you wrote earlier:

// ...
private StepExecution stepExecution;

@BeforeStep
public void saveStepExecution(StepExecution stepExecution) {
    this.stepExecution = stepExecution;
}

public UserRegistration process(UserRegistration input) throws Exception {

    Map<String, JobParameter> params =
            stepExecution.getJobParameters().getParameters();

    // iterate over all of the parameters
    for (String jobParameterKey : params.keySet()) {
        System.out.println(String.format("%s=%s", jobParameterKey,
                params.get(jobParameterKey).getValue().toString()));
    }

    // access specific parameters in a type-safe way
    Date date = stepExecution.getJobParameters().getDate("date");

    // etc ...
}

This turns out to be of limited value. The 80 percent case is that you'll need to bind parameters from the job's launch to the Spring beans in the application context. These parameters are available only at runtime, whereas the steps in the XML application context are configured at design time. This comes up in many places: previous examples demonstrated ItemReader<T> and ItemWriter<T> implementations with hard-coded paths. That works fine until you want to parameterize the file name, and a job you can't parameterize is hardly acceptable unless you plan on running it just once!

The core Spring Framework 3.0 features an enhanced expression language that Spring Batch 2.0 (which depends on Spring Framework 3.0) uses to defer binding of a parameter until the correct time, or, in this case, until the bean is in the correct scope. Spring Batch 2.0 introduces the step scope for just this purpose. Let's take a look at how you'd rework the previous example to use a parameterized file name for the ItemReader's resource:

<beans:bean id="csvFileReader"
        scope="step"
        class="org.springframework.batch.item.file.FlatFileItemReader"
        p:resource="file:${user.home}/batches/#{jobParameters['input.fileName']}.csv">
    <!-- ... this is the same as before ... -->
</beans:bean>

All you did was scope the bean (the FlatFileItemReader<T>) to the life cycle of a step, at which point the JobParameters resolve correctly, and then use the expression-language syntax to parameterize the resource path.
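Launching then supplies the matching parameter; a minimal sketch reusing the jobLauncher and job handles from earlier:

JobParameters jobParameters = new JobParametersBuilder()
        .addString("input.fileName", "registrations")
        .toJobParameters();
jobLauncher.run(job, jobParameters);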

Note that earlier versions of Spring Batch 2.0 featured an expression language in advance of the Spring Expression Language (SpEL) that debuted in the core framework with 3.0. That language is by and large similar to SpEL but not identical. In the preceding example, we reference the input variable as #{jobParameters['input.fileName']}, which works in Spring Batch with SpEL; in the original Spring Batch expression language, however, you could have left off the quotes.

Summary

This chapter introduced you to the concepts of batch processing, some of its history, and where it fits in a modern-day architecture. You learned about Spring Batch, the batch processing framework from SpringSource, and how to do reading and writing with ItemReader<T> and ItemWriter<T> implementations in your batch jobs. You wrote your own ItemReader<T> and ItemWriter<T> implementations as needed and saw how to control the execution of steps inside a job.

The next chapter will discuss Terracotta and GridGain. You'll learn how to use Terracotta to build a distributed cache and take your Spring applications onto the grid with GridGain.
