
11. Spring Batch


Batch processing has been around for decades. The earliest widespread applications of technology for managing information (information technology) were applications of batch processing. These environments didn’t have interactive sessions and usually didn’t have the capability to load multiple applications in memory. Computers were expensive and bore no resemblance to today’s servers. Typically, machines were multiuser and in use during the day (time-shared). During the evening, however, the machines would sit idle, which was a tremendous waste. Businesses invested in ways to utilize the offline time to do work aggregated through the course of the day. Out of this practice emerged batch processing.

Batch processing solutions typically run offline, unaware of events in the system. In the past, batch processes ran offline out of necessity. Today, however, most batch processes are run offline because having work done at a predictable time, and in bulk, is a requirement of many architectures. A batch processing solution doesn't usually respond to requests, although there's no reason it couldn't be started as a consequence of a message or request. Batch processing solutions tend to be used on large data sets, where the duration of the processing is a critical factor in the architecture and implementation. A process might run for minutes, hours, or days! Jobs may have unbounded durations (i.e., run until all work is finished, even if this means running for a few days), or they may be strictly bounded (each row takes roughly constant time to process, which lets you, say, predict that a given job will finish in a certain time window).

Batch processing has had a long history that informs even modern batch processing solutions.

Mainframe applications used batch processing, and one of the largest modern-day environments for batch processing, CICS, runs on z/OS, which is still fundamentally a mainframe operating system. Customer Information Control System (CICS) is very well suited to a particular type of task: take input, process it, and write it to output. CICS is a transaction server, used mostly in financial institutions and government, that runs programs in a number of languages (COBOL, C, PL/I, and so on). It can easily support thousands of transactions per second. CICS was one of the first containers, a concept familiar to Spring and Java EE users, even though CICS itself debuted in 1969! A CICS installation is very expensive, and although IBM still sells and installs CICS, many other solutions have come along since then. These solutions are usually specific to a particular environment: COBOL/CICS on mainframes, C on Unix, and, today, Java on any number of environments. The problem is that there's very little standardized infrastructure for dealing with these types of batch processing solutions. Few people are even aware of what they're missing because there's very little native support on the Java platform for batch processing. Businesses that need a solution typically end up writing it in-house, resulting in fragile, domain-specific code.

The pieces are there, however: transaction support, fast I/O, schedulers such as Quartz, and solid threading support, as well as a very powerful concept of an application container in Java EE and Spring. It was only natural that Dave Syer and his team would come along and build Spring Batch, a batch processing solution for the Spring platform.

It’s important to think about the kinds of problems this framework solves before diving into the details. A technology is defined by its solution space. A typical Spring Batch application reads in a lot of data and then writes it back out in a modified form. Decisions about transactional barriers, input size, concurrency, and order of steps in processing are all dimensions of a typical integration.

A common requirement is loading data from a comma-separated value (CSV) file, perhaps as a business-to-business (B2B) transaction or perhaps as an integration technique with an older legacy application. Another common application is nontrivial processing on records in a database. Perhaps the output is an update of the database record itself. An example might be resizing images on the file system whose metadata is stored in a database, or needing to trigger another process based on some condition.

Note

Fixed-width data is a format of rows and cells, quite like a CSV file. CSV file cells are separated by commas or tabs, however, and fixed-width data works by presuming certain lengths for each value. The first value might be the first nine characters, the second value the next four characters after that, and so on.
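For example, a hypothetical fixed-width record whose first nine characters hold a name and whose next four hold a numeric code might look like this:

JANE DOE 0042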

Fixed-width data, which is often used with legacy or embedded systems, is a fine candidate for batch processing. Processing that deals with a resource that’s fundamentally nontransactional (e.g., a web service or a file) begs for batch processing because batch processing provides retry/skip/fail functionality that most web services do not.

It’s also important to understand what Spring Batch doesn’t do. Spring Batch is a flexible but not all-encompassing solution. Just as Spring doesn’t reinvent the wheel when it can be avoided, Spring Batch leaves a few important pieces to the discretion of the implementer. Case in point: Spring Batch provides a generic mechanism by which to launch a job, be it by the command line, a Unix cron, an operating system service, Quartz (discussed in Chapter 13), or in response to an event on an enterprise service bus (for example, the Mule ESB or Spring’s own ESB-like solution, Spring Integration , which is discussed in Chapter 15). Another example is the way Spring Batch manages the state of batch processes. Spring Batch requires a durable store. The only useful implementation of a JobRepository (an interface provided by Spring Batch for storing batch metadata entities) requires a database because a database is transactional and there’s no need to reinvent it. To which database you should deploy, however, is largely unspecified, although there are useful defaults provided for you, of course.

Note

The Java EE 7 specification includes JSR-352 (Batch Applications for the Java Platform). Spring Batch is the reference implementation of this specification.

Runtime Metadata Model

Spring Batch works with a JobRepository, which is the keeper of all the knowledge and metadata for each job (including component parts such as JobInstances, JobExecutions, and StepExecutions). Each job is composed of one or more steps, one after another. With Spring Batch, a step can conditionally follow another step, allowing for primitive workflows.

These steps can also be concurrent; two steps can run at the same time.

When a job is run, it’s often coupled with JobParameters to parameterize the runtime behavior of the Job itself. For example, a job might take a date parameter to determine which records to process. To identify a job run, a JobInstance is created. A JobInstance is unique because of the JobParameters associated with it. Each run of the same JobInstance (i.e., the same Job and JobParameters) is called a JobExecution, which is a runtime context for one attempt at the Job. Ideally, for every JobInstance there’d be only one JobExecution: the JobExecution created the first time the JobInstance ran. However, if there were any errors, the JobInstance should be restarted; the subsequent run would create another JobExecution. For every step in the original job, there is a StepExecution in the JobExecution.

Thus, you can see that Spring Batch has a mirrored object graph—one reflecting the design/build time view of a job and another reflecting the runtime view of a job . This split between the prototype and the instance is similar to the way many workflow engines, including jBPM, work.

For example, suppose that a daily report is generated at 2 a.m. The parameter to the job would be the date (most likely the previous day’s date). The job, in this case, would model a loading step, a summary step, and an output step. Each day the job is run, a new JobInstance and JobExecution would be created. If there are any retries of the same JobInstance, conceivably many JobExecutions would be created.
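To make this concrete, here is a minimal sketch of launching such a job; the dailyReportJob and jobLauncher names are assumptions, standing in for beans configured as in the recipes that follow:

JobParameters parameters = new JobParametersBuilder()
    .addDate("date", new Date())   // the parameters identify the JobInstance
    .toJobParameters();
JobExecution execution = jobLauncher.run(dailyReportJob, parameters);
// Running again with identical parameters addresses the same JobInstance and
// creates a new JobExecution, which is allowed only if the previous one failed.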

11-1. Set Up Spring Batch’s Infrastructure

Problem

Spring Batch provides a lot of flexibility and guarantees to your application, but it cannot work in a vacuum. To do its work, the JobRepository requires data storage, typically a SQL database. Additionally, Spring Batch requires several collaborators to do its work. This configuration is mostly boilerplate.

Solution

In this recipe, you’ll set up the Spring Batch database and also create a Spring application configuration that can be imported by subsequent solutions. This configuration is repetitive and largely uninteresting. It will also tell Spring Batch what database to use for the metadata it stores.

How It Works

The JobRepository interface is the first thing that you’ll have to deal with when setting up a Spring Batch process. You usually don’t deal with it in code, but in a Spring configuration it is key to getting everything else working. There’s only one really useful implementation of the JobRepository interface, SimpleJobRepository, which stores information about the state of the batch processes in a data store. Creation is done through a JobRepositoryFactoryBean. Another standard factory, MapJobRepositoryFactoryBean, is useful mainly for testing because its state is not durable—it’s an in-memory implementation. Both factories create an instance of SimpleJobRepository.

Because this JobRepository instance works on your database, you need to set up the schema for Spring Batch to work with. The schemas for different databases are in the Spring Batch distribution. The simplest way to initialize your database is to use a DataSourceInitializer in a Java config. The files can be found in the org/springframework/batch/core directory; there are several .sql files, each containing the data definition language (DDL, the subset of SQL used for defining and examining the structure of a database) for the required schema for the database of your choice. These examples will use H2 (an in-memory database), so you will use the DDL for H2: schema-h2.sql. Make sure you configure it and tell Spring Batch about it, as in the following configurations :

package com.apress.springrecipes.springbatch.config;

import javax.sql.DataSource;

import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor;
import org.springframework.batch.core.configuration.support.MapJobRegistry;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.env.Environment;
import org.springframework.core.io.ClassPathResource;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.jdbc.datasource.DriverManagerDataSource;
import org.springframework.jdbc.datasource.init.DataSourceInitializer;
import org.springframework.jdbc.datasource.init.DatabasePopulator;
import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator;

@Configuration
@ComponentScan("com.apress.springrecipes.springbatch")
@PropertySource("classpath:batch.properties")
public class BatchConfiguration {


    @Autowired
    private Environment env;


    @Bean
    public DataSource dataSource() {
        DriverManagerDataSource dataSource = new DriverManagerDataSource();
        dataSource.setUrl(env.getRequiredProperty("dataSource.url"));
        dataSource.setUsername(env.getRequiredProperty("dataSource.username"));
        dataSource.setPassword(env.getRequiredProperty("dataSource.password"));
        return dataSource;
    }


    @Bean
    public DataSourceInitializer dataSourceInitializer() {
        DataSourceInitializer initializer = new DataSourceInitializer();
        initializer.setDataSource(dataSource());
        initializer.setDatabasePopulator(databasePopulator());
        return initializer;
    }


    private DatabasePopulator databasePopulator() {
        ResourceDatabasePopulator databasePopulator = new ResourceDatabasePopulator();
        databasePopulator.setContinueOnError(true);
        databasePopulator.addScript(
            new ClassPathResource("org/springframework/batch/core/schema-h2.sql"));
        databasePopulator.addScript(
            new ClassPathResource("sql/reset_user_registration.sql"));
        return databasePopulator;
    }


    @Bean
    public DataSourceTransactionManager transactionManager() {
        return new DataSourceTransactionManager(dataSource());
    }


    @Bean
    public JobRepositoryFactoryBean jobRepository() {
        JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
        jobRepositoryFactoryBean.setDataSource(dataSource());
        jobRepositoryFactoryBean.setTransactionManager(transactionManager());
        return jobRepositoryFactoryBean;
    }


    @Bean
    public JobLauncher jobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository().getObject());
        return jobLauncher;
    }


    @Bean
    public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor() {
        JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor = new JobRegistryBeanPostProcessor();
        jobRegistryBeanPostProcessor.setJobRegistry(jobRegistry());
        return jobRegistryBeanPostProcessor;
    }


    @Bean
    public JobRegistry jobRegistry() {
        return new MapJobRegistry();
    }
}

The first few beans are related strictly to configuration. There’s nothing particularly novel or peculiar to Spring Batch: a data source, a transaction manager, and a data source initializer.

Eventually, you get to the declaration of a MapJobRegistry instance. This is critical—it is the central store for information regarding a given job, and it controls the “big picture” about all jobs in the system. Everything else works with this instance.

Next, you have a SimpleJobLauncher, whose sole purpose is to give you a mechanism to launch batch jobs, where a “job” in this case is your batch solution. The jobLauncher is used to specify the name of the batch solution to run as well as any parameters required. I’ll follow up more on that in the next recipe.

Next, you define a JobRegistryBeanPostProcessor. This bean scans your Spring context file and associates any configured jobs with the MapJobRegistry.

Finally, you get to the SimpleJobRepository (which is, in turn, created by the JobRepositoryFactoryBean). The JobRepository is an implementation of a “repository” (in the Patterns of Enterprise Application Architecture sense of the word): it handles persistence and retrieval for the domain models surrounding steps and jobs.

The @PropertySource annotation will instruct Spring to load your batch.properties file (located in src/main/resources). You are going to retrieve the properties you need using the Environment class.

Tip

You could also have used an @Value annotation to inject the individual properties, but when you need multiple properties in a configuration class, it is easier to use the Environment object.
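For a single property, the @Value alternative looks like this (a sketch; the field name is illustrative):

@Value("${dataSource.url}")
private String dataSourceUrl;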

The batch.properties file contains the following:

dataSource.url=jdbc:h2:~/batch
dataSource.driverClassName=org.h2.Driver
dataSource.username=sa
dataSource.password=

Although this works, Spring Batch also has support to configure these defaults out of the box using the @EnableBatchProcessing annotation. This makes things a little easier.

package com.apress.springrecipes.springbatch.config;

import org.apache.commons.dbcp2.BasicDataSource;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.env.Environment;
import org.springframework.core.io.ClassPathResource;
import org.springframework.jdbc.datasource.init.DataSourceInitializer;
import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator;


import javax.sql.DataSource;

@Configuration
@EnableBatchProcessing
@ComponentScan("com.apress.springrecipes.springbatch")
@PropertySource("classpath:/batch.properties")
public class BatchConfiguration {


    @Autowired
    private Environment env;


    @Bean
    public DataSource dataSource() {
        BasicDataSource dataSource = new BasicDataSource();
        dataSource.setUrl(env.getRequiredProperty("dataSource.url"));
        dataSource.setDriverClassName(
            env.getRequiredProperty("dataSource.driverClassName"));
        dataSource.setUsername(env.getProperty("dataSource.username"));
        dataSource.setPassword(env.getProperty("dataSource.password"));
        return dataSource;
    }


    @Bean
    public DataSourceInitializer databasePopulator() {
        ResourceDatabasePopulator populator = new ResourceDatabasePopulator();
        populator.addScript(new ClassPathResource("org/springframework/batch/core/schema-derby.sql"));
        populator.addScript(new ClassPathResource("sql/reset_user_registration.sql"));
        populator.setContinueOnError(true);
        populator.setIgnoreFailedDrops(true);


        DataSourceInitializer initializer = new DataSourceInitializer();
        initializer.setDatabasePopulator(populator);
        initializer.setDataSource(dataSource());
        return initializer;
    }
}

This class contains only two bean definitions: one for the data source and one for initializing the database; everything else has been taken care of because of the @EnableBatchProcessing annotation. The previous configuration class will bootstrap Spring Batch with some sensible defaults.

The default configuration will configure a JobRepository, JobRegistry, and JobLauncher.

If there are multiple data sources in your application, you need to add an explicit BatchConfigurer to select the data source to use for the batch part of your application.
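A minimal sketch of such a configurer, assuming a second DataSource bean named batchDataSource (the bean and class names are illustrative), can extend the DefaultBatchConfigurer class that Spring Batch provides:

package com.apress.springrecipes.springbatch.config;

import javax.sql.DataSource;

import org.springframework.batch.core.configuration.annotation.DefaultBatchConfigurer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

@Component
public class BatchDataSourceConfigurer extends DefaultBatchConfigurer {

    @Autowired
    public BatchDataSourceConfigurer(@Qualifier("batchDataSource") DataSource dataSource) {
        super(dataSource); // the Spring Batch metadata tables live in this database
    }
}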

The following Main class will use the Java-based configuration for running the batch application:

package com.apress.springrecipes.springbatch;

import com.apress.springrecipes.springbatch.config.BatchConfiguration;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;


public class Main {
    public static void main(String[] args) throws Throwable {
        ApplicationContext context = new AnnotationConfigApplicationContext(BatchConfiguration.class);


        JobRegistry jobRegistry = context.getBean("jobRegistry", JobRegistry.class);
        JobLauncher jobLauncher = context.getBean("jobLauncher", JobLauncher.class);
        JobRepository jobRepository = context.getBean("jobRepository", JobRepository.class);


        System.out.println("JobRegistry: " + jobRegistry);
        System.out.println("JobLauncher: " + jobLauncher);
        System.out.println("JobRepository: " + jobRepository);


    }
}

11-2. Read and Write Data

Problem

You want to insert data from a CSV file into a database. This solution will be one of the simplest solutions and will give you a chance to explore the moving pieces of a typical solution.

Solution

You’ll build a solution that does a minimal amount of work , while being a viable application of the technology. The solution will read in a file of arbitrary length and write out the data into a database. The end result will be almost 100 percent code free. You will rely on an existing model class and write one class (a class containing the public static void main(String [] args) method) to round out the example. There’s no reason why the model class couldn’t be a Hibernate class or something from your DAO layer; however, in this case it’s a brainless POJO. This solution will use the components you configured in recipe 11-1.

How It Works

This example demonstrates the simplest possible use of Spring Batch: to provide scalability. This program will do nothing but read data from a CSV file, with fields delimited by commas and rows delimited by newlines. It then inserts the records into a table. You are exploiting the intelligent infrastructure that Spring Batch provides to avoid worrying about scaling. This application could easily be done manually. You will not exploit any of the smart transactional functionality made available to you, nor will you worry about retries for the time being.

This solution is as simple as Spring Batch solutions get. Spring Batch models solutions declaratively, whether in XML or, as in this chapter, Java configuration. The abstractions and terms are in the spirit of classical batch processing solutions, so they will be portable from previous technologies and perhaps to subsequent technologies. Spring Batch provides useful default classes that you can override or selectively adjust. In the following example, you’ll use a lot of the utility implementations provided by Spring Batch. Fundamentally, most solutions look about the same and feature a combination of the same set of interfaces. It’s usually just a matter of picking and choosing the right ones.

When I ran this program, it worked on files with 20,000 rows, and it worked on files with 1 million rows. I experienced no increase in memory, which indicates there were no memory leaks. Naturally, it took a lot longer! (The application ran for several hours with the 1-million-row insert.)

Tip

Of course, it would be catastrophic if you worked with a million rows and it failed on the penultimate record. You’d lose all your work when the transaction rolled back! Read on for examples on chunking. Additionally, you might want to read Chapter 10 to brush up on transactions.

First, create the USER_REGISTRATION table that the job will write to:

create table USER_REGISTRATION (
    ID BIGINT NOT NULL PRIMARY KEY GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1),
    FIRST_NAME VARCHAR(255) not null,
    LAST_NAME VARCHAR(255) not null,
    COMPANY VARCHAR(255) not null,
    ADDRESS VARCHAR(255) not null,
    CITY VARCHAR(255) not null,
    STATE VARCHAR(255) not null,
    ZIP VARCHAR(255) not null,
    COUNTY VARCHAR(255) not null,
    URL VARCHAR(255) not null,
    PHONE_NUMBER VARCHAR(255) not null,
    FAX VARCHAR(255) not null
) ;

I didn’t tune the table at all. For example, there are no indexes on any of the columns besides the primary key. This is to avoid complicating the example. Great care should be taken with a table like this one in a nontrivial, production-bound application.

Spring Batch applications are workhorse applications and have the potential to reveal bottlenecks in your application you didn’t know you had. Imagine suddenly being able to achieve 1 million new database insertions every 10 minutes. Would your database grind to a halt? Insert speed can be a critical factor in the speed of your application. Software developers will (ideally) think about their database schema in terms of how well it enforces the constraints of the business logic and how well it serves the overall business model. However, it’s important to wear another hat, that of a DBA, when writing applications such as this one. A common solution is to create a denormalized table whose contents can be coerced into valid data once inside the database, perhaps by a trigger on inserts. This is typical in data warehousing. Later, you’ll explore using Spring Batch to do processing on a record before insertion. This lets the developer verify or override the input into the database. This processing, in tandem with a conservative application of constraints that are best expressed in the database, can make for applications that are very robust and quick.

The Job Configuration

The configuration for the job is as follows:

package com.apress.springrecipes.springbatch.config;

import com.apress.springrecipes.springbatch.UserRegistration;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.LineMapper;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;


import javax.sql.DataSource;

@Configuration
public class UserJob {


    private static final String INSERT_REGISTRATION_QUERY =
        "insert into USER_REGISTRATION (FIRST_NAME, LAST_NAME, COMPANY, ADDRESS,CITY,STATE,ZIP,COUNTY,URL,PHONE_NUMBER,FAX)" +
        " values " +
        "(:firstName,:lastName,:company,:address,:city,:state,:zip,:county,:url,:phoneNumber,:fax)";


    @Autowired
    private JobBuilderFactory jobs;


    @Autowired
    private StepBuilderFactory steps;


    @Autowired
    private DataSource dataSource;


    @Value("file:${user.home}/batches/registrations.csv")
    private Resource input;


    @Bean
    public Job insertIntoDbFromCsvJob() {
        return jobs.get("User Registration Import Job")
            .start(step1())
            .build();
    }


    @Bean
    public Step step1() {
        return steps.get("User Registration CSV To DB Step")
            .<UserRegistration,UserRegistration>chunk(5)
            .reader(csvFileReader())
            .writer(jdbcItemWriter())
            .build();
    }


    @Bean
    public FlatFileItemReader<UserRegistration> csvFileReader() {
        FlatFileItemReader<UserRegistration> itemReader = new FlatFileItemReader<>();
        itemReader.setLineMapper(lineMapper());
        itemReader.setResource(input);
        return itemReader;
    }


    @Bean
    public JdbcBatchItemWriter<UserRegistration> jdbcItemWriter() {
        JdbcBatchItemWriter<UserRegistration> itemWriter = new JdbcBatchItemWriter<>();
        itemWriter.setDataSource(dataSource);
        itemWriter.setSql(INSERT_REGISTRATION_QUERY);
        itemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        return itemWriter;
    }


    @Bean
    public DefaultLineMapper<UserRegistration> lineMapper() {
        DefaultLineMapper<UserRegistration> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer());
        lineMapper.setFieldSetMapper(fieldSetMapper());
        return lineMapper;
    }


    @Bean
    public BeanWrapperFieldSetMapper<UserRegistration> fieldSetMapper() {
        BeanWrapperFieldSetMapper<UserRegistration> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
        fieldSetMapper.setTargetType(UserRegistration.class);
        return fieldSetMapper;
    }


    @Bean
    public DelimitedLineTokenizer tokenizer() {
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setDelimiter(",");
        tokenizer.setNames(new String[]{"firstName","lastName","company","address","city","state","zip","county","url","phoneNumber","fax"});
        return tokenizer;
    }
}

As described earlier, a job consists of steps, which are the real workhorses of a given job. The steps can be as complex or as simple as you like. Indeed, a step could be considered the smallest unit of work for a job. Input (what’s read) is passed to the step and potentially processed; then output (what’s written) is created from the step. This processing is spelled out using a Tasklet (another interface provided by Spring Batch). You can provide your own Tasklet implementation or simply use one of the preconfigured options for different processing scenarios. These implementations are made available through the step builder’s configuration methods. One of the most important aspects of batch processing is chunk-oriented processing, which is employed here using the chunk() configuration method.

In chunk-oriented processing, input is read from a reader, optionally processed, and then aggregated. Finally, at a configurable interval (the commit interval, specified here as the argument to the chunk() method, which configures how many items will be processed before the transaction is committed), all the input is sent to the writer. If there is a transaction manager in play, the transaction is also committed. Right before a commit, the metadata in the database is updated to mark the progress of the job.
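Conceptually, and greatly simplified (ignoring restartability, skips, and retries), each chunk amounts to the following loop, executed within a single transaction. This sketch is illustrative, not actual framework code:

static <I, O> void processOneChunk(ItemReader<I> reader, ItemProcessor<I, O> processor,
        ItemWriter<O> writer, int commitInterval) throws Exception {
    List<O> buffer = new ArrayList<>();
    for (int i = 0; i < commitInterval; i++) {
        I item = reader.read();                 // null signals that the input is exhausted
        if (item == null) {
            break;
        }
        O processed = processor.process(item);  // null filters the item out
        if (processed != null) {
            buffer.add(processed);
        }
    }
    writer.write(buffer); // one write per chunk; the transaction then commits
}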

There are some nuances surrounding the aggregation of the input (read) values when a transaction-aware writer (or processor) rolls back. Spring Batch caches the values it reads and writes them to the writer. If the writer component is transactional, like a database, and the reader is not, there’s nothing inherently wrong with caching the read values and perhaps retrying or taking some alternative approach. If the reader itself is also transactional, then the values read from the resource will be rolled back and could conceivably change, rendering the in-memory cached values stale. If this happens, you can configure the chunk to not cache the values by calling readerIsTransactionalQueue() on the step builder (the equivalent of reader-transactional-queue="true" in the XML configuration).

Input

The first responsibility is reading a file from the file system. You use a provided implementation for the example. Reading CSV files is a common scenario, and Spring Batch’s support does not disappoint. The org.springframework.batch.item.file.FlatFileItemReader<T> class delegates the task of delimiting fields and records within a file to a LineMapper<T>, which in turn delegates the task of identifying the fields within that record to a LineTokenizer. You use an org.springframework.batch.item.file.transform.DelimitedLineTokenizer, which is configured to delineate fields separated by a comma (,) character.

The DefaultLineMapper also declares a fieldSetMapper attribute that requires an implementation of FieldSetMapper. This bean is responsible for taking the input name-value pairs and producing a type that will be given to the writer component.

In this case, you use a BeanWrapperFieldSetMapper that will create a JavaBean POJO of type UserRegistration. You name the fields so that you can reference them later in the configuration. These names don’t have to be the values of some header row in the input file; they just have to correspond to the order in which the fields are found in the input file. These names are also used by the FieldSetMapper to match properties on a POJO. As each record is read, the values are applied to an instance of a POJO, and that POJO is returned.
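For illustration, a hypothetical line from the input file, with values in the order the fields are named, might look like this:

Jane,Doe,Acme Corp,123 Main St,Springfield,IL,62701,Sangamon,https://acme.example,2175550199,2175550198

The corresponding bean definitions follow: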

@Bean                                                                          
public FlatFileItemReader<UserRegistration> csvFileReader() {
    FlatFileItemReader<UserRegistration> itemReader = new FlatFileItemReader<>();
    itemReader.setLineMapper(lineMapper());
    itemReader.setResource(input);
    return itemReader;
}


@Bean
public DefaultLineMapper<UserRegistration> lineMapper() {
    DefaultLineMapper<UserRegistration> lineMapper = new DefaultLineMapper<>();
    lineMapper.setLineTokenizer(tokenizer());
    lineMapper.setFieldSetMapper(fieldSetMapper());
    return lineMapper;
}


@Bean
public BeanWrapperFieldSetMapper<UserRegistration> fieldSetMapper() {
    BeanWrapperFieldSetMapper<UserRegistration> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
    fieldSetMapper.setTargetType(UserRegistration.class);
    return fieldSetMapper;
}


@Bean
public DelimitedLineTokenizer tokenizer() {
    DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
    tokenizer.setDelimiter(",");
    tokenizer.setNames(new String[]{"firstName","lastName","company","address","city","state","zip","county","url","phoneNumber","fax"});
    return tokenizer;
}

The class returned from the reader, UserRegistration, is a rather plain JavaBean.

package com.apress.springrecipes.springbatch;

import java.io.Serializable;

public class UserRegistration implements Serializable {

    private String firstName;
    private String lastName;
    private String company;
    private String address;
    private String city;
    private String state;
    private String zip;
    private String county;
    private String url;
    private String phoneNumber;
    private String fax;


   //... accessor / mutators omitted for brevity ...

}

Output

The next component to do work is the writer, which is responsible for taking the aggregated collection of items read from the reader. In this case, you might imagine that a new collection (java.util.List<UserRegistration>) is created, then written, and then reset each time the collection reaches the configured commit interval. Because you’re trying to write to a database, you use Spring Batch’s org.springframework.batch.item.database.JdbcBatchItemWriter. This class contains support for taking input and writing it to a database. It is up to the developer to provide the input and to specify what SQL should be run for the input. It will run the SQL specified by the sql property, in essence writing to the database, as many times as the commit interval dictates and then commit the whole transaction. Here, you’re doing a simple insert. The names and values for the named parameters are created by the bean configured for the itemSqlParameterSourceProvider property, which is an instance of BeanPropertyItemSqlParameterSourceProvider, whose sole job is to take JavaBean properties and make them available as named parameters corresponding to the property names on the JavaBean.

@Bean
public JdbcBatchItemWriter<UserRegistration> jdbcItemWriter() {
    JdbcBatchItemWriter<UserRegistration> itemWriter = new JdbcBatchItemWriter<>();
    itemWriter.setDataSource(dataSource);
    itemWriter.setSql(INSERT_REGISTRATION_QUERY);
    itemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
    return itemWriter;
}

That’s it! A working solution. With little configuration and no custom code, you’ve built a solution for taking large CSV files and reading them into a database. This solution is bare-bones and leaves a lot of edge cases uncared for. You might want to do processing on the item as it’s read (before it’s inserted), for example.

This exemplifies a simple job. It’s important to remember that there are similar classes for doing the exact opposite transformation: reading from a database and writing to a CSV file.
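A sketch of that reverse direction, using the Spring Batch 4 builder style shown later in this recipe (the bean names, SQL, and output file are illustrative):

@Bean
public JdbcCursorItemReader<UserRegistration> jdbcReader() {
    return new JdbcCursorItemReaderBuilder<UserRegistration>()
        .name("userRegistrationDbReader")
        .dataSource(dataSource)
        .sql("select FIRST_NAME, LAST_NAME from USER_REGISTRATION")
        .rowMapper(new BeanPropertyRowMapper<>(UserRegistration.class))
        .build();
}

@Bean
public FlatFileItemWriter<UserRegistration> csvFileWriter() {
    return new FlatFileItemWriterBuilder<UserRegistration>()
        .name("userRegistrationCsvWriter")
        .resource(new FileSystemResource("registrations-export.csv"))
        .delimited()
        .names(new String[]{"firstName", "lastName"})
        .build();
}

To recap, the job and step definitions for this recipe are as follows: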

@Bean
public Job insertIntoDbFromCsvJob() {
    return jobs.get("User Registration Import Job")
        .start(step1())
        .build();
}


@Bean
public Step step1() {
    return steps.get("User Registration CSV To DB Step")
        .<UserRegistration,UserRegistration>chunk(5)
        .reader(csvFileReader())
        .writer(jdbcItemWriter())
        .build();
}

To configure the step, you give it the name User Registration CSV To DB Step. You are using chunk-based processing, and you need to tell it that you want a chunk size of 5. Next you supply it with a reader and writer, and finally you tell the factory to build to the step. The configured step is finally wired as a starting point to your job, named User Registration Import Job, which consists only of this step.

Simplify the ItemReader and ItemWriter Configuration

Configuring the ItemReader and ItemWriter can be a daunting task. You need to know quite a lot about the internals of Spring Batch (which classes to use, etc.). As of Spring Batch 4, configuring the readers and writers has become easier, as there are now specific builders for the different readers and writers.

To configure the FlatFileItemReader, you can use the FlatFileItemReaderBuilder; instead of configuring four individual beans, it now takes six lines of code (mainly because of the formatting in the sample).

@Bean
public FlatFileItemReader<UserRegistration> csvFileReader() throws Exception {


    return new FlatFileItemReaderBuilder<UserRegistration>()
        .name(ClassUtils.getShortName(FlatFileItemReader.class))
        .resource(input)
        .targetType(UserRegistration.class)
        .delimited()
        .names(new String[]{"firstName","lastName","company","address","city","state","zip","county","url","phoneNumber","fax"})
        .build();
}

This builder will automatically create the DefaultLineMapper, BeanWrapperFieldSetMapper, and DelimitedLineTokenizer, and you don’t have to know that they are used internally. You can now basically describe your configuration rather than explicitly configuring all the different items.

The same can be applied to the JdbcBatchItemWriter using the JdbcBatchItemWriterBuilder.

@Bean
public JdbcBatchItemWriter<UserRegistration> jdbcItemWriter() {
    return new JdbcBatchItemWriterBuilder<UserRegistration>()
        .dataSource(dataSource)
        .sql(INSERT_REGISTRATION_QUERY)
        .beanMapped()
        .build();
}

11-3. Write a Custom ItemWriter and ItemReader

Problem

You want to talk to a resource (you might imagine an RSS feed or any other custom data format) that Spring Batch doesn’t know how to connect to.

Solution

You can easily write your own ItemWriter or ItemReader. The interfaces are drop-dead simple, and there’s not a lot of responsibility placed on the implementations.

How It Works

As easy as this is to do, it’s still usually better to reuse one of the numerous provided options. If you look, you’ll likely find something. There’s support for writing JMS (JmsItemWriter<T>), JPA (JpaItemWriter<T>), JDBC (JdbcBatchItemWriter<T>), files (FlatFileItemWriter<T>), Hibernate (HibernateItemWriter<T>), and more. There’s even support for writing by invoking a method on a bean (PropertyExtractingDelegatingItemWriter<T>) and passing to it as arguments the properties on the item to be written! One of the more useful writers lets you write to a set of numbered files. This implementation—MultiResourceItemWriter<T>—delegates to another ItemWriter<T> implementation for the actual writing but lets you write to multiple files, not just one very large one. There’s a slightly smaller but impressive set of ItemReader implementations. If one doesn’t seem to exist, look again. If you still can’t find it, consider writing your own. In this recipe, you will do just that.

Write a Custom ItemReader

The ItemReader example is trivial. Here, an ItemReader is created that knows how to retrieve UserRegistration objects from a remote procedure call (RPC) endpoint:

package com.apress.springrecipes.springbatch;

import org.springframework.batch.item.ItemReader;

import java.util.Collection;
import java.util.Date;


public class UserRegistrationItemReader implements ItemReader<UserRegistration> {

    private final UserRegistrationService userRegistrationService;

    public UserRegistrationItemReader(UserRegistrationService userRegistrationService) {
        this.userRegistrationService = userRegistrationService;
    }


    public UserRegistration read() throws Exception {
        final Date today = new Date();
        Collection<UserRegistration> registrations = userRegistrationService.getOutstandingUserRegistrationBatchForDate(1, today);
        return registrations.stream().findFirst().orElse(null);
    }
}

As you can see, the interface is trivial. In this case, you defer most work to a remote service to provide you with the input. The interface requires that you return one item per call to read(); returning null signals Spring Batch that the input is exhausted. The interface is parameterized to the type of object (the “item”) to be returned. All the read items will be aggregated and then passed to the ItemWriter.

Write a Custom ItemWriter

The ItemWriter example is also trivial. Imagine wanting to write by invoking a remote service using any of the numerous options for remoting that Spring provides. The ItemWriter<T> interface is parameterized by the type of item you’re expecting to write. Here, you expect a UserRegistration object from the ItemReader<T>. The interface consists of one method, which expects a List of the class’s parameterized type. These are the objects read from ItemReader<T> and aggregated. If your commit-interval were ten, you might expect ten or fewer items in the List.

package com.apress.springrecipes.springbatch;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemWriter;


import java.util.List;

public class UserRegistrationServiceItemWriter implements ItemWriter<UserRegistration> {

    private static final Logger logger = LoggerFactory.getLogger(UserRegistrationServiceItemWriter.class);

    private final UserRegistrationService userRegistrationService;

    public UserRegistrationServiceItemWriter(UserRegistrationService userRegistrationService) {
        this.userRegistrationService = userRegistrationService;
    }


    public void write(List<? extends UserRegistration> items) throws Exception {
        items.forEach(this::write);
    }


    private void write(UserRegistration userRegistration) {
        UserRegistration registeredUserRegistration = userRegistrationService.registerUser(userRegistration);
        logger.debug("Registered: {}", registeredUserRegistration);


    }
}

Here, you’ve wired in the service’s client interface. You simply loop through the UserRegistration objects and invoke the service, which in turn hands you back an identical instance of UserRegistration. If you remove the gratuitous spacing, curly brackets, and logging output, it becomes two lines of code to satisfy the requirement.

The interface for UserRegistrationService follows:

package com.apress.springrecipes.springbatch;

import java.util.Collection;
import java.util.Date;


public interface UserRegistrationService {

    Collection<UserRegistration> getOutstandingUserRegistrationBatchForDate(
        int quantity, Date date);


    UserRegistration registerUser(UserRegistration userRegistration);

}

In this example, you have no particular implementation for the interface, as it is irrelevant: it could be any interface that Spring Batch doesn’t know about already.

11-4. Process Input Before Writing

Problem

While transferring data directly from a spreadsheet or CSV dump might be useful, you can imagine having to do some sort of processing on the data before it’s written. Data in a CSV file, and more generally from any source, is not usually exactly the way you expect it to be or immediately suitable for writing. Just because Spring Batch can coerce it into a POJO on your behalf, that doesn’t mean the state of the data is correct. There may be additional data that you need to infer or fill in from other services before the data is suitable for writing.

Solution

Spring Batch will let you do processing on reader output. This processing can do virtually anything to the output before it gets passed to the writer, including changing the type of the data.

How It Works

Spring Batch gives the implementer a chance to perform any custom logic on the data read from the reader. The processor() method on the step builder expects a bean implementing the interface org.springframework.batch.item.ItemProcessor<I,O>. Thus, the revised definition of the job from the previous recipe looks like this:

@Bean
public Step step1() {
    return steps.get("User Registration CSV To DB Step")
        .<UserRegistration,UserRegistration>chunk(5)
        .reader(csvFileReader())
        .processor(userRegistrationValidationItemProcessor())
        .writer(jdbcItemWriter())
        .build();
}

The goal is to do certain validations on the data before you authorize it to be written to the database. If you determine the record is invalid, you can stop further processing by returning null from the ItemProcessor<I,O>. This is crucial and provides a necessary safeguard. One thing you want to do is ensure that the data is in the right format (for example, the schema may require a valid two-letter state abbreviation instead of the longer full state name). Telephone numbers are expected to follow a certain format, and you can use this processor to strip the telephone number of any extraneous characters, leaving only a valid (in the United States) ten-digit phone number. The same applies to U.S. zip codes, which consist of five digits and optionally a hyphen followed by a four-digit code. Finally, while a constraint guarding against duplicates is best implemented in the database, there may very well be some other eligibility criteria for a record that can be met only by querying the system before insertion.

Here’s the configuration for the ItemProcessor:

@Bean                                                          
public ItemProcessor<UserRegistration, UserRegistration> userRegistrationValidationItemProcessor() {
    return new UserRegistrationValidationItemProcessor();
}

In the interest of keeping this class short, I won’t reprint it in its entirety, but the salient bits should be obvious.

package com.apress.springrecipes.springbatch;

import java.util.Arrays;
import java.util.Collection;

import org.apache.commons.lang3.StringUtils;
import org.springframework.batch.item.ItemProcessor;


public class UserRegistrationValidationItemProcessor
    implements ItemProcessor<UserRegistration, UserRegistration> {


        private String stripNonNumbers(String input) { /* ... */ }

        private boolean isTelephoneValid(String telephone) { /* ... */ }

        private boolean isZipCodeValid(String zip) { /* ... */ }

        private boolean isValidState(String state) { /* ... */ }

        public UserRegistration process(UserRegistration input) throws Exception {
            String zipCode = stripNonNumbers(input.getZip());
            String telephone = stripNonNumbers(input.getPhoneNumber());
            String state = StringUtils.defaultString(input.getState());
            if (isTelephoneValid(telephone) && isZipCodeValid(zipCode) && isValidState(state)) {
                input.setZip(zipCode);
                input.setPhoneNumber(telephone);
                return input;
            }
            return null;
        }
}

The class is a parameterized type. The type information is the type of the input, as well as the type of the output. The input is what’s given to the method for processing, and the output is the returned data from the method. Because you’re not transforming anything in this example, the two parameterized types are the same. Once this process has completed, there’s a lot of useful information to be had in the Spring Batch metadata tables. Issue the following query on your database:

select * from BATCH_STEP_EXECUTION;

Among other things, you’ll get back the exit status of the job, how many commits occurred, how many items were read, and how many items were filtered. So, if the preceding job was run on a batch with 100 rows and the processor found 10 items invalid (it returned null 10 times), the value of the filter_count column would be 10. The read_count column would show that 100 items were read. The write_count column would reflect that 10 items didn’t make it and would show 90.

Chain Processors Together

Sometimes you might want to add extra processing that isn’t congruous with the goals of the processor you’ve already set up. Spring Batch provides a convenience class, CompositeItemProcessor<I,O>, that forwards the output of one processor to the input of the next. In this way, you can write many singly focused ItemProcessor<I,O> implementations and then reuse and chain them as necessary.

@Bean
public CompositeItemProcessor<Customer, Customer> compositeBankCustomerProcessor() {
    List<ItemProcessor<Customer, Customer>> delegates = Arrays.asList(creditScoreValidationProcessor(), salaryValidationProcessor(), customerEligibilityProcessor());
    CompositeItemProcessor<Customer, Customer> processor = new CompositeItemProcessor<>();
    processor.setDelegates(delegates);
    return processor;
}

The example created a simple workflow. The first ItemProcessor<T> will take an input of whatever’s coming from the ItemReader<T> configured for this job, presumably a Customer object. It will check the credit score of the Customer and, if approved, forward the Customer to the salary and income validation processor. If everything checks out there, the Customer will be forwarded to the eligibility processor, where the system is checked for duplicates or any other invalid data. It will finally be forwarded to the writer to be added to the output. If at any point in the three processors the Customer fails a check, the executing ItemProcessor can simply return null and arrest processing.
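Each delegate is just another ItemProcessor. A minimal sketch of the first delegate, assuming a hypothetical Customer type with a getCreditScore() property and an illustrative threshold, might look like this:

@Bean
public ItemProcessor<Customer, Customer> creditScoreValidationProcessor() {
    // Returning null stops further processing for this item.
    return customer -> customer.getCreditScore() >= 600 ? customer : null;
}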

11-5. Achieve Better Living Through Transactions

Problem

You want your reads and writes to be robust. Ideally, they’ll use transactions where appropriate and correctly react to exceptions.

Solution

Transaction capabilities are built on top of the first-class support already provided by the core Spring framework. Where relevant, Spring Batch surfaces the configuration so that you can control it. Within the context of chunk-oriented processing, it also exposes a lot of control over the frequency of commits, rollback semantics, and so on.

How It Works

First you explore how to make a step (or chunk) transactional; then you look at controlling rollback behavior for a step.

Transactions

Spring’s core framework provides first-class support for transactions. You simply wire up a PlatformTransactionManager and give Spring Batch a reference, just as you would in any regular JdbcTemplate or HibernateTemplate solution. As you build your Spring Batch solutions, you’ll be given opportunities to control how steps behave in a transaction. You’ve already seen some of the support for transactions baked right in.

The configuration used in all these examples established a DriverManagerDataSource and a DataSourceTransactionManager bean. The PlatformTransactionManager and DataSource were then wired to the JobRepository, which was in turn wired to the JobLauncher, which you used to launch all jobs thus far. This enabled all the metadata your jobs created to be written to the database in a transactional way.

You might wonder why there is no explicit mention of the transaction manager when you configured the JdbcBatchItemWriter with a reference to dataSource. The transaction manager reference can be specified, but in your solutions it wasn’t required because Spring Batch will, by default, pluck the PlatformTransactionManager named transactionManager from the context and use it. If you want to configure this explicitly, you can call the transactionManager() method on the step builder. A simple transaction manager for JDBC work might look like this:

@Bean
protected Step step1() {
    return steps.get("step1")
        .<UserRegistration,UserRegistration>chunk(5)
        .reader(csvFileReader())
        .processor(userRegistrationValidationItemProcessor())
        .writer(jdbcItemWriter())
        .transactionManager(new DataSourceTransactionManager(dataSource))
        .build();
}

Items read from an ItemReader<T> are normally aggregated. If a commit on the ItemWriter<T> fails, the aggregated items are kept and then resubmitted. This process is efficient and works most of the time. One place where it breaks semantics is when reading from a transactional resource (like a JMS queue or database). Reads from a message queue can and should be rolled back if the transaction they participate in (in this case, the transaction for the writer) fails .

@Bean
protected Step step1() {
    return steps.get("step1")
        .<UserRegistration,UserRegistration>chunk(5)
        .reader(csvFileReader()).readerIsTransactionalQueue()
        .processor(userRegistrationValidationItemProcessor())
        .writer(jdbcItemWriter())
        .transactionManager(new DataSourceTransactionManager(dataSource))
        .build();
}

Rollbacks

Handling the simple case (“read X items and commit a database transaction every Y items”) is easy. Spring Batch excels in the robustness it surfaces as simple configuration options for the edge and failure cases.

If a write fails on an ItemWriter, or some other exception occurs in processing, Spring Batch will roll back the transaction. This is valid handling for a majority of the cases. There may be some scenarios when you want to control which exceptional cases cause the transaction to roll back.

When using Java-based configuration to control rollbacks, the step first needs to be made fault-tolerant, which in turn lets you specify the no-rollback exceptions. First use faultTolerant() to obtain a fault-tolerant step builder; next, the skipLimit() method can be used to specify the number of failures to tolerate before actually stopping the job execution; and finally, the noRollback() method can be used to specify the exceptions that don’t trigger a rollback. To specify multiple exceptions, you can simply chain calls to the noRollback() method.

@Bean
protected Step step1() {
    return steps.get("step1")
        .<UserRegistration,UserRegistration>chunk(10)
            .faultTolerant()
                .noRollback(com.yourdomain.exceptions.YourBusinessException.class)
        .reader(csvFileReader())
        .processor(userRegistrationValidationItemProcessor())
        .writer(jdbcItemWriter())
        .build();
}

11-6. Retry

Problem

You are dealing with a requirement for functionality that may fail but is not transactional. Perhaps it is transactional but unreliable. You want to work with a resource that may fail when you try to read from or write to it. It may fail because of network connectivity, because an endpoint is down, or for any number of other reasons. You know that it will likely be back up soon, though, and that it should be retried.

Solution

Use Spring Batch’s retry capabilities to systematically retry the read or write.

How It Works

As you saw in the previous recipe , it’s easy to handle transactional resources with Spring Batch. When it comes to transient or unreliable resources, a different tack is required. Such resources tend to be distributed or manifest problems that eventually resolve themselves. Some (such as web services) cannot inherently participate in a transaction because of their distributed nature. There are products that can start a transaction on one server and propagate the transactional context to a distributed server and complete it there, although this tends to be very rare and inefficient. Alternatively, there’s good support for distributed (“global” or XA) transactions if you can use it. Sometimes, however, you may be dealing with a resource that isn’t either of those. A common example might be a call made to a remote service, such as an RMI service or a REST endpoint. Some invocations will fail but may be retried with some likelihood of success in a transactional scenario. For example, an update to the database resulting in org.springframework.dao.DeadlockLoserDataAccessException might be usefully retried.

Configure a Step

When using Java-based configuration to enable retries, the step first needs to be made fault-tolerant, which in turn lets you specify the retry limit and retryable exceptions. First use faultTolerant() to obtain a fault-tolerant step builder; next, the retryLimit() method can be used to specify the number of retry attempts; and finally, the retry() method can be used to specify the exceptions that trigger a retry. To specify multiple exceptions, you can simply chain calls to the retry() method.

@Bean
public Step step1() {
    return steps.get("User Registration CSV To DB Step")
        .<UserRegistration,UserRegistration>chunk(10)
            .faultTolerant()
                .retryLimit(3).retry(DeadlockLoserDataAccessException.class)
        .reader(csvFileReader())
        .writer(jdbcItemWriter())
        .transactionManager(transactionManager)
        .build();
}

Retry Template

Alternatively, you can leverage Spring Retry support for retries and recovery in your own code. For example, you can have a custom ItemWriter<T> in which retry functionality is desired or even an entire service interface for which retry support is desired.

Spring Retry supports these scenarios through the RetryTemplate, which (much like its various other Template cousins in Spring) isolates your logic from the nuances of retries and instead enables you to write the code as though you were going to attempt it only once. Let the RetryTemplate handle everything else through declarative configuration.

The RetryTemplate supports many use cases, with convenient APIs to wrap otherwise tedious retry/fail/recover cycles in concise, single-method invocations.

Let’s take a look at a modified version of the simple ItemWriter<T> from recipe 11-4, which showed how to write a custom ItemWriter<T>. That solution was simple enough and would ideally work all the time; it fails to handle the error cases for the service, however. When dealing with RPC, always proceed as if it’s almost impossible for things to go right: the service itself may surface a semantic or system violation, such as a duplicate database key, an invalid credit card number, and so on. This is true whether the service is distributed or in-VM, of course.

Next, the RPC layer below the system may also fault. Here’s the rewritten code, this time allowing for retries:

package com.apress.springrecipes.springbatch;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemWriter;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.support.RetryTemplate;


import java.util.List;

public class RetryableUserRegistrationServiceItemWriter implements ItemWriter<UserRegistration> {

    private static final Logger logger = LoggerFactory.getLogger(RetryableUserRegistrationServiceItemWriter.class);

    private final UserRegistrationService userRegistrationService;
    private final RetryTemplate retryTemplate;


    public RetryableUserRegistrationServiceItemWriter(UserRegistrationService userRegistrationService, RetryTemplate retryTemplate) {
        this.userRegistrationService = userRegistrationService;
        this.retryTemplate = retryTemplate;
    }


    public void write(List<? extends UserRegistration> items)
        throws Exception {
        for (final UserRegistration userRegistration : items) {
            UserRegistration registeredUserRegistration = retryTemplate.execute(
                (RetryCallback<UserRegistration, Exception>) context -> userRegistrationService.registerUser(userRegistration));


            logger.debug("Registered: {}", registeredUserRegistration);
        }
    }
}

As you can see, the code hasn’t changed much, and the result is much more robust. The RetryTemplate itself is configured in the Spring context, although it’s trivial to create in code. I declare it in the Spring context only because there is some surface area for configuration when creating the object, and I try to let Spring handle the configuration.
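
The execute(...) method also has an overload that takes a RecoveryCallback<T> (org.springframework.retry.RecoveryCallback), which is invoked once all retry attempts are exhausted. Here is a minimal sketch of what the call inside the writer might look like with a recovery path; the fallback behavior shown is hypothetical and not part of the original example:

UserRegistration registeredUserRegistration = retryTemplate.execute(
    (RetryCallback<UserRegistration, Exception>) context ->
        userRegistrationService.registerUser(userRegistration),
    (RecoveryCallback<UserRegistration>) context -> {
        // invoked only after the retries are exhausted
        logger.warn("Giving up on registration after {} attempts", context.getRetryCount());
        return null; // hypothetical fallback: skip this registration
    });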

One of the more useful settings for the RetryTemplate is the BackOffPolicy in use. The BackOffPolicy dictates how long the RetryTemplate should back off between retries. There’s even support for growing the delay between retries after each failed attempt, to avoid lock-stepping with other clients attempting the same invocation. This is great for situations in which there are potentially many concurrent attempts on the same resource and a race condition may ensue. There are other BackOffPolicy implementations as well, including FixedBackOffPolicy, which delays retries by a fixed amount.

@Bean
public RetryTemplate retryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setBackOffPolicy(backOffPolicy());
    return retryTemplate;
}


@Bean
public ExponentialBackOffPolicy backOffPolicy() {
    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(1000);
    backOffPolicy.setMaxInterval(10000);
    backOffPolicy.setMultiplier(2);
    return backOffPolicy;
}

You have configured the RetryTemplate’s BackOffPolicy so that it will wait 1 second (1,000 milliseconds) before the first retry. Each subsequent attempt doubles the previous delay (the growth is controlled by the multiplier) until maxInterval is reached, at which point all subsequent retries level off and occur at that constant interval. With these settings, the delays run 1, 2, 4, and 8 seconds, and then 10 seconds for every later attempt.
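
If a constant delay suits your situation better, a FixedBackOffPolicy can be swapped in instead; here is a minimal sketch (the 5-second period is an arbitrary example):

@Bean
public FixedBackOffPolicy fixedBackOffPolicy() {
    FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
    backOffPolicy.setBackOffPeriod(5000); // wait 5 seconds between every attempt
    return backOffPolicy;
}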

AOP-Based Retries

An alternative is the AOP-based retry advice provided by Spring Retry, which will wrap invocations of methods whose success is not guaranteed in retries, just as you did with the RetryTemplate. In the previous example, you rewrote an ItemWriter<T> to make use of the template. Another approach might be to merely advise the entire userRegistrationService proxy with this retry logic. In that case, the code could go back to the way it was in the original example, with no RetryTemplate!

To do so, you annotate the method (or methods) to be retried with the @Retryable annotation. To achieve the same behavior as the explicit RetryTemplate, you would add the following.

@Retryable(backoff = @Backoff(delay = 1000, maxDelay = 10000, multiplier = 2))
public UserRegistration registerUser(UserRegistration userRegistration) { ... }

Adding this annotation alone isn’t enough; you also need to enable annotation processing for it with the @EnableRetry annotation on your configuration.

@Configuration
@EnableBatchProcessing
@EnableRetry
@ComponentScan("com.apress.springrecipes.springbatch")
@PropertySource("classpath:/batch.properties")
public class BatchConfiguration { ... }

11-7. Control Step Execution

Problem

You want to control how steps are executed, perhaps to eliminate a needless waste of time by introducing concurrency or by executing steps only if a condition is true.

Solution

There are different ways to change the runtime profile of your jobs, mainly by exerting control over the way steps are executed: concurrent steps, decisions, and sequential steps.

How It Works

Thus far, you have explored running one step in a job. Typical jobs of almost any complexity will have multiple steps, however. A step provides a boundary (transactional or not) to the beans and logic it encloses. A step can have its own reader, writer, and processor. Each step helps decide what the next step will be. A step is isolated and provides focused functionality that can be assembled into sophisticated workflows using Spring Batch’s configuration options. In fact, some of the concepts and patterns you’re about to see will be familiar if you have an interest in business process management (BPM) systems and workflows; BPM provides many similar constructs for process or job control. A step often corresponds to a bullet point when you outline the definition of a job on paper. For example, a batch job to load the daily sales and produce a report might be proposed as follows:

  1. Load customers from the CSV file into the database.

  2. Calculate daily statistics and write to a report file.

  3. Send messages to the message queue to notify an external system of the successful registration for each of the newly loaded customers.

Sequential Steps

In the previous example, there’s an implied sequence between the first two steps: the report file can’t be written until all the registrations have been loaded. This sort of relationship is the default relationship between two steps; one occurs after the other. Each step executes with its own execution context and shares only a parent job execution context and an order.

@Bean
public Job nightlyRegistrationsJob() {
    return jobs.get("nightlyRegistrationsJob")
        .start(loadRegistrations())
        .next(reportStatistics())
        .next(...)
        .build();
}

Concurrency

The first version of Spring Batch was oriented toward batch processing inside the same thread and, with some alteration, perhaps inside the same virtual machine. There were workarounds, of course, but the situation was less than ideal.

In the outline for this example job, the first step had to come before the other two because they depend on it. The second and third steps, however, share no such dependency: there’s no reason why the report couldn’t be written at the same time as the JMS messages are being delivered. Spring Batch provides the capability to fork processing to enable just this sort of arrangement.

@Bean
public Job insertIntoDbFromCsvJob() {
    JobBuilder builder = jobs.get("insertIntoDbFromCsvJob");
    return builder
        .start(loadRegistrations())
        .split(taskExecutor())
            .add(
                builder.flow(reportStatistics()),
                builder.flow(sendJmsNotifications()))
        .build();
}

You can use the split() method on the job builder. To turn a step into a flow, the flow() method of the job builder can be used; to add more steps to a flow, chain them with the next() method. The split() method requires a TaskExecutor to be set; see recipe 2-23 for more information on scheduling and concurrency.

In this example, there’s nothing to prevent you from having many steps within each flow, nor is there anything preventing you from having more steps after the split. A split, like a step, can itself be followed by a next transition.
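
If you prefer to assemble a branch explicitly, a multistep Flow can also be built on its own with the FlowBuilder API. Here is a minimal sketch, assuming a hypothetical archiveReport() step that is not part of this recipe:

import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;

@Bean
public Flow reportFlow() {
    // two sequential steps that together form one branch of the split
    return new FlowBuilder<Flow>("reportFlow")
        .start(reportStatistics())
        .next(archiveReport()) // hypothetical step; not defined in this recipe
        .build();
}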

Spring Batch provides a mechanism to offload processing to another process. This distribution requires some sort of durable, reliable connection. This is a perfect use of JMS because it’s rock-solid, transactional, fast, and reliable. Spring Batch’s support is modeled at a slightly higher level, on top of Spring Integration’s channel abstractions. This support is not in the main Spring Batch code; it can be found in the spring-batch-integration project. Remote chunking lets individual steps read and aggregate items as usual in the main thread. This step is called the master. Items read are sent to the ItemProcessor<I,O>/ItemWriter<T> running in another process (called the slave). If the slave is an aggressive consumer, you have a simple, generic mechanism to scale: work is instantly farmed out over as many JMS clients as you can throw at it. The aggressive-consumer pattern refers to the arrangement of multiple JMS clients all consuming the same queue’s messages. If one client consumes a message and is busy processing, another idle client will get the message instead. As long as there’s a client that’s idle, the message will be processed instantly.

Additionally, Spring Batch supports implicitly scaling out using a feature called partitioning. This feature is interesting because it’s built in and generally very flexible. You replace your instance of a step with a subclass, PartitionStep, which knows how to coordinate distributed executors and maintains the metadata for the execution of the step, thus eliminating the need for a durable medium of communication as in the “remote chunking” technology.

The functionality here is also very generic. It could, conceivably, be used with any sort of grid fabric technology such as GridGain or Hadoop. Spring Batch ships with only a TaskExecutorPartitionHandler, which executes steps in multiple threads using a TaskExecutor strategy. This simple improvement might be enough of a justification for this feature! If you’re really hurting, however, you can extend it.
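
A locally partitioned step can be declared with the step builder. Here is a minimal sketch, assuming a hypothetical Partitioner bean named partitioner() that carves the input into per-partition execution contexts and a hypothetical workerStep() to run for each partition:

@Bean
public Step partitionedStep() {
    return steps.get("partitionedStep")
        .partitioner("workerStep", partitioner()) // hypothetical Partitioner bean
        .step(workerStep())                       // hypothetical step run once per partition
        .taskExecutor(taskExecutor())             // runs partitions on parallel threads
        .gridSize(4)                              // number of partitions to create
        .build();
}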

Conditional Steps with Statuses

Using the ExitStatus of a given job or step to determine the next step is the simplest example of a conditional flow. Spring Batch facilitates this through its transition methods: on() to match an ExitStatus pattern; to() to name the next step; and stop(), fail(), and end() to terminate. By default, assuming no intervention, a step will have an ExitStatus that matches its BatchStatus, a property whose values are defined in an enum and may be any of the following: COMPLETED, STARTING, STARTED, STOPPING, STOPPED, FAILED, ABANDONED, or UNKNOWN.

Let’s look at an example that executes one of two steps based on the success of a preceding step:

@Bean
public Job insertIntoDbFromCsvJob() {
    return jobs.get("User Registration Import Job")
        .start(step1())
            .on("COMPLETED").to(step2())
        .from(step1())
            .on("FAILED").to(failureStep())
        .end()
        .build();
}

It’s also possible to provide a wildcard. This is useful if you want to ensure a certain behavior for any number of ExitStatus values, perhaps in tandem with a more specific transition that matches only one status.

@Bean
public Job insertIntoDbFromCsvJob() {
    return jobs.get("User Registration Import Job")
        .start(step1())
            .on("COMPLETED").to(step2())
        .from(step1())
            .on("*").to(failureStep())
        .end()
        .build();
}

In this example, you are instructing Spring Batch to route any unaccounted-for ExitStatus to failureStep(). Another option is to just stop processing altogether with a BatchStatus of FAILED. You can do this using the fail() method. A less aggressive rewrite of the preceding example might be the following:

@Bean
public Job insertIntoDbFromCsvJob() {
    return jobs.get("User Registration Import Job")
        .start(step1())
            .on("COMPLETED").to(step2())
        .from(step1())
            .on("FAILED").fail()
        .end()
        .build();
}

In all these examples, you’re reacting to the standard ExitStatus values that the Spring Batch framework provides. But it’s also possible to raise your own ExitStatus. If, for example, you wanted the whole job to end with a custom ExitStatus of MAN DOWN when the first step fails, you might do something like this:

@Bean
public Job insertIntoDbFromCsvJob() {
    return jobs.get("User Registration Import Job")
        .start(step1())
            .on("COMPLETED").to(step2())
        .from(step1())
            .on("FAILED").end("MAN DOWN")
        .end()
        .build();
}
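
Transitions can also match statuses you mint yourself. If you want a step itself to surface a custom ExitStatus for on() to match, a StepExecutionListener can override the status after the step runs. A minimal sketch, with a hypothetical listener name:

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;

public class ManDownStepListener implements StepExecutionListener {

    @Override
    public void beforeStep(StepExecution stepExecution) {
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        // replace a plain FAILED status with a custom, more descriptive one
        if (ExitStatus.FAILED.getExitCode().equals(stepExecution.getExitStatus().getExitCode())) {
            return new ExitStatus("MAN DOWN");
        }
        return stepExecution.getExitStatus();
    }
}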

Finally, if all you want to do is end processing with a BatchStatus of COMPLETED, you can use the end() method. This is an explicit way of ending a flow as if it had run out of steps and incurred no errors.

@Bean
public Job insertIntoDbFromCsvJob() {
    return jobs.get("User Registration Import Job")
        .start(step1())
            .on("COMPLETED").end()
        .from(step1())
            .on("FAILED").to(errorStep())
        .end()
        .build();
}

Conditional Steps with Decisions

If you want to vary the execution flow based on logic more complex than a job’s ExitStatus values, you may give Spring Batch a helping hand by introducing a decision backed by an implementation of JobExecutionDecider.

package com.apress.springrecipes.springbatch;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.JobExecutionDecider;


public class HoroscopeDecider implements JobExecutionDecider {

    private boolean isMercuryInRetrograde() { return Math.random() > .9; }

    public FlowExecutionStatus decide(JobExecution jobExecution,
                                      StepExecution stepExecution) {
        if (isMercuryInRetrograde()) {
            return new FlowExecutionStatus("MERCURY_IN_RETROGRADE");
        }
        return FlowExecutionStatus.COMPLETED;
    }
}

All that remains is the configuration, shown here:

@Bean
public Job insertIntoDbFromCsvJob() {
    JobBuilder builder = jobs.get("insertIntoDbFromCsvJob");
    return builder
        .start(step1())
        .next(horoscopeDecider())
            .on("MERCURY_IN_RETROGRADE").to(step2())
        .from(horoscopeDecider())
            .on("COMPLETED").to(step3())
        .end()
        .build();
}

11-8. Launch a Job

Problem

What deployment scenarios does Spring Batch support? How does Spring Batch launch? How does Spring Batch work with a system scheduler, such as cron or autosys, or from a web application? You want to understand all this.

Solution

Spring Batch works well in any environment that Spring itself runs in: a public static void main, OSGi, a web application, anywhere! Some use cases are uniquely challenging, though: it is rarely practical to run a batch job on the thread serving an HTTP response, for example, because it might stall the response indefinitely. Spring Batch supports asynchronous execution for just this scenario. Spring Batch also provides a convenience class that can be readily used with cron or autosys to launch jobs. Additionally, Spring’s excellent scheduling support provides a great mechanism to schedule jobs.

How It Works

Before you get into creating a solution, it’s important to know what options are available for deploying and running these solutions. All solutions require, at minimum, a job and a JobLauncher. You already configured these components in the previous recipe. The job is configured in your Spring application context, as you’ll see later. The simplest example of launching a Spring Batch solution from Java code is about five lines of Java code (three if you’ve already got a handle to the ApplicationContext)!

package com.apress.springrecipes.springbatch;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Main {
    public static void main(String[] args) throws Throwable {
        ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("solution2.xml");


        JobLauncher jobLauncher = ctx.getBean("jobLauncher", JobLauncher.class);
        Job job = ctx.getBean("myJobName", Job.class);
        JobExecution jobExecution = jobLauncher.run(job, new JobParameters());
    }
}

As you can see, the JobLauncher reference you configured previously is obtained and used to then launch an instance of a Job. The result is a JobExecution. You can interrogate the JobExecution for information on the state of the Job, including its exit status and runtime status .

JobExecution jobExecution = jobLauncher.run(job, jobParameters);
while (jobExecution.getStatus().isRunning()) {
    System.out.println("Still running...");
    Thread.sleep(10 * 1000); // 10 seconds
}

You can also get the ExitStatus.

System.out.println( "Exit code: "+ jobExecution.getExitStatus().getExitCode());

The JobExecution also provides a lot of other useful information, such as the create time of the Job, the start time, the last updated date, and the end time, all as java.util.Date instances.
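
Reading them is straightforward; a quick sketch:

System.out.println("created: " + jobExecution.getCreateTime());
System.out.println("started: " + jobExecution.getStartTime());
System.out.println("ended:   " + jobExecution.getEndTime());

If you want to correlate the job back to the database, you’ll need the job instance and its ID.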

JobInstance jobInstance = jobExecution.getJobInstance();
System.out.println( "job instance Id: "+ jobInstance.getId());

In this simple example, you use an empty JobParameters instance. In practice, this will work only once: Spring Batch builds a unique key from the parameters and uses that key to distinguish one run of a given Job from another. You’ll learn about parameterizing a Job in detail in the next recipe.

Launch from a Web Application

Launching a job from a web application requires a slightly different approach because the client thread (presumably an HTTP request) can’t usually wait for a batch job to finish. The ideal solution is to have the job execute asynchronously when launched from a controller or action in the web tier, unattended by the client thread. Spring Batch supports this scenario through the use of a Spring TaskExecutor. This requires a simple change to the configuration for the JobLauncher, although the Java code can stay the same. Here, you will use a SimpleAsyncTaskExecutor that will spawn a thread of execution and manage that thread without blocking:

package com.apress.springrecipes.springbatch.config;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

@Configuration
@EnableBatchProcessing
@ComponentScan("com.apress.springrecipes.springbatch")
@PropertySource("classpath:/batch.properties")
public class BatchConfiguration {

    @Bean
    public SimpleAsyncTaskExecutor taskExecutor() {
        return new SimpleAsyncTaskExecutor();
    }
}

As you cannot use the default settings anymore, you need to add your own implementation of a BatchConfigurer to configure the TaskExecutor and add it to the SimpleJobLauncher. For this implementation, you extend DefaultBatchConfigurer and override only the createJobLauncher() method to add the TaskExecutor.

package com.apress.springrecipes.springbatch.config;

import org.springframework.batch.core.configuration.annotation.DefaultBatchConfigurer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.core.task.TaskExecutor;
import org.springframework.stereotype.Component;


@Component
public class CustomBatchConfigurer extends DefaultBatchConfigurer {


    private final TaskExecutor taskExecutor;

    public CustomBatchConfigurer(TaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }


    @Override
    protected JobLauncher createJobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(getJobRepository());
        jobLauncher.setTaskExecutor(this.taskExecutor);
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }
}

Run from the Command Line

Another common use case is deployment of a batch process from a system scheduler such as cron or autosys, or even the Windows task scheduler. Spring Batch provides a convenience class, CommandLineJobRunner, that takes as its parameters the name of the XML application context (which contains everything required to run a job) as well as the name of the job bean itself. Additional parameters may be provided and used to parameterize the job; these parameters must be in the form name=value. An example invocation of this class on the command line (on a Linux/Unix system), assuming that you set up the classpath, might look like this:

java CommandLineJobRunner jobs.xml hourlyReport date=`date +%m/%d/%Y` time=`date +%H`

The CommandLineJobRunner will even return system exit codes (0 for success, 1 for failure, and 2 for an issue with loading the batch job) so that a shell (such as the one used by most system schedulers) can react to the failure. More fine-grained return codes can be produced by creating and declaring a top-level bean that implements the ExitCodeMapper interface, in which you specify a translation of exit status messages to the integer-based codes that the shell sees on process exit.
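
A minimal sketch of such a mapper, with hypothetical integer values, might look like this:

package com.apress.springrecipes.springbatch;

import org.springframework.batch.core.launch.support.ExitCodeMapper;

public class BusinessExitCodeMapper implements ExitCodeMapper {

    @Override
    public int intValue(String exitCode) {
        // translate Spring Batch exit statuses into shell-visible codes
        switch (exitCode) {
            case "COMPLETED": return 0;
            case "MAN DOWN":  return 42; // hypothetical mapping for the custom status from recipe 11-7
            default:          return 1;
        }
    }
}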

Run on a Schedule

Spring has support for a scheduling framework (see also recipe 3-22). This framework lends itself perfectly to running Spring Batch jobs. First, let’s modify the existing application context configuration to enable scheduling by using the @EnableScheduling annotation and by adding a ThreadPoolTaskScheduler.

package com.apress.springrecipes.springbatch.config;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

@Configuration
@EnableBatchProcessing
@ComponentScan("com.apress.springrecipes.springbatch")
@PropertySource("classpath:/batch.properties")
@EnableScheduling
@EnableAsync
public class BatchConfiguration {

    @Bean
    public ThreadPoolTaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setThreadGroupName("batch-scheduler");
        taskScheduler.setPoolSize(10);
        return taskScheduler;
    }
}

These annotations enable the simplest possible support for scheduling and ensure that any appropriately annotated bean under the package com.apress.springrecipes.springbatch will be configured and scheduled as required. The scheduler bean is as follows:

package com.apress.springrecipes.springbatch.scheduler;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;


import java.util.Date;

@Component
public class JobScheduler {


    private final JobLauncher jobLauncher;
    private final Job job;


    public JobScheduler(JobLauncher jobLauncher, Job job) {
        this.jobLauncher = jobLauncher;
        this.job = job;
    }


    public void runRegistrationsJob(Date date) throws Throwable {
        System.out.println("Starting job at " + date.toString());


        JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
        jobParametersBuilder.addDate("date", date);
        jobParametersBuilder.addString("input.file", "registrations");


        JobParameters jobParameters = jobParametersBuilder.toJobParameters();

        JobExecution jobExecution = jobLauncher.run(job, jobParameters);

        System.out.println("jobExecution finished, exit code: " + jobExecution.getExitStatus().getExitCode());
    }


    @Scheduled(fixedDelay = 1000 * 10)
    public void runRegistrationsJobOnASchedule() throws Throwable {
        runRegistrationsJob(new Date());
    }
}

There is nothing particularly novel here; it’s a good study of how the different components of the Spring Framework work well together. The bean is recognized and becomes part of the application context because of the @Component annotation, which you enabled with the @ComponentScan annotation in your configuration class. There’s only one Job and only one JobLauncher in the application context, so you simply have those injected through the constructor. Finally, the logic for kicking off a batch run is inside the runRegistrationsJob(java.util.Date date) method. This method could be called from anywhere. Your only client for this functionality is the scheduled method runRegistrationsJobOnASchedule. The framework will invoke this method for you, according to the timeline dictated by the @Scheduled annotation.
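
If a fixed delay is too crude, the same annotation also accepts a cron expression. A minimal sketch, with a hypothetical method name and an arbitrary schedule:

@Scheduled(cron = "0 0 2 * * *") // hypothetical schedule: every day at 02:00
public void runRegistrationsJobNightly() throws Throwable {
    runRegistrationsJob(new Date());
}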

There are other options for this sort of thing; traditionally in the Java and Spring world, this sort of problem would be a good fit for Quartz. It might still be, as the Spring scheduling support isn’t designed to be as extensible as Quartz. If you are in an environment requiring more traditional, ops-friendly scheduling tools, there are of course old standbys like cron, autosys, and BMC, too.

11-9. Parameterize a Job

Problem

The previous examples work well enough, but they leave something to be desired in terms of flexibility. To apply the batch code to some other file, you’d have to edit the configuration and hard-code the name in there. The ability to parameterize the batch solution would be very helpful.

Solution

Use JobParameters to parameterize a job; the parameters are then available to your steps through Spring Batch’s expression language or via API calls.

How It Works

First you will see how to launch a job using JobParameters; after that, you will learn how to use and access the JobParameters in a Job and its configuration.

Launch a Job with Parameters

A Job is a prototype of a JobInstance. JobParameters are used to identify a unique run of a job (a JobInstance); they allow you to give input to your batch process, just as you would with a method definition in Java. You’ve seen JobParameters in previous examples, but not in detail. The JobParameters object is created as you launch the job using the JobLauncher. To launch a job called dailySalesFigures with the date for the job to work with, you would write something like this:

package com.apress.springrecipes.springbatch;

import com.apress.springrecipes.springbatch.config.BatchConfiguration;
import org.springframework.batch.core.*;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;


import java.util.Date;

public class Main {
    public static void main(String[] args) throws Throwable {


        ApplicationContext context =
            new AnnotationConfigApplicationContext(BatchConfiguration.class);


        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean("dailySalesFigures", Job.class);


        jobLauncher.run(job, new JobParametersBuilder()
            .addDate( "date", new Date() ).toJobParameters());
    }
}

Access JobParameters

Technically, you can get at the JobParameters from either the StepExecution or the JobExecution. Once you have them, you can access individual parameters in a type-safe way by calling getLong(), getString(), and so on. A simple way to do this is to bind to the @BeforeStep event, save the StepExecution, and iterate over the parameters. From there, you can inspect the parameters and do anything you want with them. Let’s look at that in terms of the ItemProcessor<I,O> you wrote earlier.

// ...                    
private StepExecution stepExecution;


@BeforeStep
public void saveStepExecution(StepExecution stepExecution) {
    this.stepExecution = stepExecution;
}


public UserRegistration process(UserRegistration input) throws Exception {

    Map<String, JobParameter> params = stepExecution.getJobParameters().getParameters();

    for (String jobParameterKey : params.keySet()) {
        System.out.println(String.format("%s=%s", jobParameterKey,
            params.get(jobParameterKey).getValue().toString()));
    }


    Date date = stepExecution.getJobParameters().getDate("date");
    // etc ...
}

This turns out to be of limited value, though. The 80 percent case is that you’ll need to bind parameters from the job’s launch to the Spring beans in the application context. These parameters are available only at runtime, whereas the beans in the application context are configured at design time. This comes up in many places: previous examples demonstrated ItemWriter<T> and ItemReader<T> implementations with hard-coded paths. That works fine unless you want to parameterize the file name, and a hard-coded path is hardly acceptable unless you plan on using a job just once!

The core Spring Framework features an enhanced expression language that Spring Batch uses to defer binding of the parameter until the correct time (or, in this case, until the bean is in the correct scope). Spring Batch has the “step” scope for just this purpose. Let’s take a look at how you’d rework the previous example to use a parameterized file name for the ItemReader’s resource:

@Bean
@StepScope
public ItemReader<UserRegistration> csvFileReader(@Value("file:${user.home}/batches/#{jobParameters['input.fileName']}.csv") Resource input) { ... }

All you did was scope the bean (the FlatFileItemReader<T>) to the life cycle of a step (at which point those JobParameters will resolve correctly) and then use the expression language syntax to parameterize the path to work from.
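
For context, a fuller sketch of such a step-scoped reader bean might look like the following; the line mapper setup is elided because it depends on your record format:

@Bean
@StepScope
public FlatFileItemReader<UserRegistration> csvFileReader(
        @Value("file:${user.home}/batches/#{jobParameters['input.fileName']}.csv") Resource input) {
    FlatFileItemReader<UserRegistration> reader = new FlatFileItemReader<>();
    reader.setResource(input); // the late-bound, parameterized resource
    // reader.setLineMapper(...); // configure to match your CSV layout
    return reader;
}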

Summary

This chapter introduced you to the concepts of batch processing, some of its history, and why it fits in a modern-day architecture. You learned about Spring Batch, the batch processing framework from SpringSource, and how to do reading and writing with ItemReader<T> and ItemWriter<T> implementations in your batch jobs. You wrote your own ItemReader<T> and ItemWriter<T> implementations as needed and saw how to control the execution of steps inside a job.
