Transforming Data with Spring Batch

While changing small amounts of data is easy with something like Spring Data, sometimes we need a bit more. For changing large amounts of data we use a process called ETL (Extract, Transform, Load): we read from a data source, do something with the data, and write the transformed data to a new data source.

For operations like this Spring has Spring Batch, which works as follows:

Spring Batch calls extract read, transform process and load write, and it does this for each item. The corresponding interfaces are ItemReader, ItemProcessor and ItemWriter. A combination of those three is called a Step, and a Step is one ETL process. Sometimes we want to run multiple ETL processes that depend on each other, which is why Spring Batch groups Steps in a Job. So when we want to run an ETL process we run a Job, which contains one or more Steps. We run the Job with a JobLauncher, and everything that happens during a Job is managed by a JobRepository.
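Here is a simplified sketch of those three interfaces (the real ones in Spring Batch declare a few more specific checked exceptions):

public interface ItemReader<T> {
    T read() throws Exception;          // returns null once the input is exhausted
}

public interface ItemProcessor<I, O> {
    O process(I item) throws Exception; // returning null drops the item
}

public interface ItemWriter<T> {
    void write(List<? extends T> items) throws Exception; // receives one chunk at a time
}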

In the following example we look at the most basic type of ETL process possible. There are ways to detect errors and to add special triggers, but we are not going to look at those this time. We are going to read a CSV file, transform the data and write it to a SQL database.

We are going to make use of the following Maven dependencies (a pom.xml sketch follows the list):

  • spring-boot-starter-jdbc
  • mysql-connector-java
  • spring-boot-starter-batch version 4.0.0.M2
  • lombok
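In pom.xml those dependencies could look roughly like this (a sketch: apart from the batch starter version mentioned above, the versions are assumed to be managed by the Spring Boot parent POM):

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
        <!-- version as stated in the list above -->
        <version>4.0.0.M2</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>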

We need a database with the following table in it:

CREATE TABLE IF NOT EXISTS `product` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(120) NOT NULL,
  `price` decimal(10,2) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1 AUTO_INCREMENT=1 ;

To connect with the database we write the following in our application.properties:

spring.datasource.url=jdbc:mysql://localhost:3307/databasename
spring.datasource.username=root
spring.datasource.password=usbw
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.name=databasename
spring.datasource.initialize=false

With this configuration Spring can create a connection to the database.

We also need a CSV file to read data from. Let's create a file called products.csv under resources and fill it with rows like pita,2.11, as many as you want. Make sure there are no empty rows, because we have no error handling.
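For example, products.csv could look like this (the rows are just made-up sample data):

pita,2.11
falafel,3.50
hummus,1.95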

Spring Batch needs an object to transfer the data. For this we create a Product class:

@Data
@NoArgsConstructor
public class Product{

    private Long id;

    private String name;

    private double price;
}

The annotations are from Lombok and generate the boilerplate code (getters, setters, equals, hashCode, toString and a no-argument constructor) for us.
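Roughly speaking, Lombok expands those two annotations into something like this (a sketch showing only the price accessors):

public class Product {

    private Long id;

    private String name;

    private double price;

    // generated by @NoArgsConstructor
    public Product() {}

    // generated by @Data; the same pattern applies to id and name
    public double getPrice() { return price; }
    public void setPrice(double price) { this.price = price; }

    // @Data also generates equals, hashCode and toString
}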

We create a class in which we will build our batch Job and give it the ability to run itself.

@EnableBatchProcessing
@SpringBootApplication
public class BatchConfiguration {
    public static void main(String[] args) {
        SpringApplication.run(BatchConfiguration.class, args);
    }

    private final JobBuilderFactory jobBuilderFactory;

    private final StepBuilderFactory stepBuilderFactory;

    private final DataSource dataSource;

    @Autowired
    BatchConfiguration(StepBuilderFactory stepBuilderFactory, JobBuilderFactory jobBuilderFactory,
                       DataSource dataSource) {
        this.stepBuilderFactory = stepBuilderFactory;
        this.jobBuilderFactory = jobBuilderFactory;
        this.dataSource = dataSource;
    }
}
  • @EnableBatchProcessing gives us the following beans: jobRepository, jobLauncher, jobRegistry, transactionManager, jobBuilders, stepBuilders, which make it easy to quickly run a Job.
  • We autowire the DataSource and the factories used to create Jobs and Steps.
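In this post we let Spring Boot launch the Job for us, but for reference, running a Job by hand with the jobLauncher bean would look roughly like this (a sketch with error handling omitted; the parameter name startedAt is made up):

// Each run needs unique JobParameters, otherwise the JobRepository
// considers the Job instance already completed
JobParameters parameters = new JobParametersBuilder()
        .addLong("startedAt", System.currentTimeMillis())
        .toJobParameters();
JobExecution execution = jobLauncher.run(job, parameters);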

As we just found out we need a Job, so let’s create a Job Bean.

@Bean
public Job ourJob() throws Exception {
     return jobBuilderFactory.get("jobname")
                            .incrementer(new RunIdIncrementer())
                            .start(createStep())
                            .build();
}
  • get gives the Job a name
  • incrementer gives each run of the Job a fresh id (via the RunIdIncrementer), so the same Job can be run again
  • start tells the Job with which Step to begin
  • build creates an instance of the Job
The Step that start refers to is built like this:

@Bean
public Step createStep() throws Exception {
    return stepBuilderFactory.get("createStep")
            .<Product, Product>chunk(100)
            .reader(readFromCSV())
            .processor(new DoubleProductPriceProcessor())
            .writer(writeProductsToDatabase())
            .build();
}
  • get gives the Step a name
  • chunk declares which item type we read in and which we write out. It also sets the chunk size: items are read and processed one at a time, and written out 100 at a time.
  • The following 3 lines configure our ETL, after which we build the Step instance.

For our reader we create the following Bean:

@Bean
public FlatFileItemReader<Product> readFromCSV() throws Exception {
    FlatFileItemReader<Product> reader = new FlatFileItemReader<>();
    Resource resource = new ClassPathResource("products.csv");
    reader.setResource(resource);
    reader.setLinesToSkip(0);
    reader.setLineMapper(new DefaultLineMapper<Product>() {{
        setLineTokenizer(new DelimitedLineTokenizer() {{
            setNames(new String[]{"name", "price"});
        }});
        setFieldSetMapper(new ProductFieldSetMapper());
    }});
    return reader;
}
  • First we create a reader that reads flat files like .csv or .txt.
  • To let the reader know which file to read, we give it a resource, which we create first.
  • Sometimes the first row of a CSV holds the column names, which is why you can set lines to skip. In our case there are no column names, so we skip 0 lines.
  • The LineTokenizer tells at which token lines should be split; the default is a comma. Each line breaks into strings and we give them a name, in this case name and price. After that the ProductFieldSetMapper makes a Product instance out of those strings.

The ProductFieldSetMapper looks like this:

class ProductFieldSetMapper implements FieldSetMapper<Product> {

    @Override
    public Product mapFieldSet(FieldSet fieldSet) throws BindException {
        Product product = new Product();
        product.setName(fieldSet.readString("name"));
        product.setPrice(fieldSet.readDouble("price"));
        return product;
    }
}
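As a side note: because our field names (name, price) match the bean properties of Product, Spring Batch's generic BeanWrapperFieldSetMapper could be used instead of writing our own (a sketch):

// Alternative to ProductFieldSetMapper: maps fields to bean properties by name
BeanWrapperFieldSetMapper<Product> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
fieldSetMapper.setTargetType(Product.class);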

For our processor we create the following class. There are no default concrete processor classes, because there are way too many possible processing options.

public class DoubleProductPriceProcessor implements ItemProcessor<Product, Product> {

    @Override
    public Product process(Product product) throws Exception {
        // Lombok's @Data setters return void, so mutate first and then return the product
        product.setPrice(product.getPrice() * 2);
        return product;
    }
}

The processor takes in a Product and gives a Product back. We take the old price, double it and return the Product.

For our writer we create the following Bean:

@Bean
JdbcBatchItemWriter<Product> writeProductsToDatabase() {
    JdbcBatchItemWriter<Product> writer = new JdbcBatchItemWriter<>();
    writer.setDataSource(dataSource);
    writer.setJdbcTemplate(new NamedParameterJdbcTemplate(dataSource));
    String QUERY_INSERT = "INSERT INTO product (name, price) VALUES (?, ?)";

    writer.setSql(QUERY_INSERT);

    writer.setItemPreparedStatementSetter(new ProductPreparedStatementSetter());

    return writer;
}
  • We create our writer and tell it where to write to and in which way. We do this by setting the DataSource and a JdbcTemplate.
  • To know where in the database the data should be placed, we give the writer an SQL query. In this case we fill our product table.
  • Since we use a PreparedStatement, which protects us against SQL injection, we need a ProductPreparedStatementSetter. Each question mark in the query has a number, starting at 1, and we tell the setter which value should be placed at each question mark.

The ProductPreparedStatementSetter looks like this:

class ProductPreparedStatementSetter implements ItemPreparedStatementSetter<Product> {
    @Override
    public void setValues(Product product, PreparedStatement preparedStatement) throws SQLException {
        preparedStatement.setString(1, product.getName());
        preparedStatement.setDouble(2, product.getPrice());
    }
}
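As an aside, JdbcBatchItemWriter can also resolve named parameters directly from the item's bean properties, which would make the statement setter unnecessary (a sketch that would replace the query and the setter above):

// Alternative: :name and :price are read from the Product's getters
writer.setSql("INSERT INTO product (name, price) VALUES (:name, :price)");
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());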

Now we have everything we need to run this basic batch application. Let's run it and watch the table fill with the data from our CSV, with doubled prices.
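With @EnableBatchProcessing on a Spring Boot application, the Job is launched automatically at startup, so starting the application is enough (the Maven goal assumes the standard Spring Boot plugin):

mvn spring-boot:run

If you ever want to start the application without running the Job, setting spring.batch.job.enabled=false in application.properties turns the automatic launch off.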
