Spring Batch tip – Partitioning

Basically, Spring Batch Job runs in single thread. To increase the throughput, we need to parallelize the job by partitioning. The core architecture of Job partitioning is as follows.

spring_batch_partition

  • Partition Step : The wrapper of common Step with a partitioner and grid size (or partition size)
  • Single Step : Common Step with a Reader, a Processor and a Writer
  • Partitioner : The core part of partitioning. Partitioner generates Step Execution Context for each Single Step
  • Step Execution Context : usually contains query parameters for each Step

By partitioning a Job, we can increase concurrency as mush as grid size (or partition size). Now, I am showing you an example.

Example table

CREATE TABLE TB_PARTITION_SOURCE(
    DAY_OF_WEEK NUMBER,
    COL1 NUMBER,
    COL2 VARCHAR2(10),
    PRIMARY KEY(DAY_OF_WEEK, COL1)
);

CREATE TABLE TB_PARTITION_TARGET(
    NEW_COL1 NUMBER,
    NEW_COL2 VARCHAR2(10),
    PRIMARY KEY(NEW_COL1)
);

Notice that source table (TB_PARTITION_SOURCE) has a partition column whose value is 1 (Sunday) to 7 (Saturday).

To split the query, I use the following sql.

SELECT COL1, COL2
FROM TB_PARTITION_SOURCE
WHERE DAY_OF_WEEK BETWEEN ? AND ?

Example partitioner

Partitioner is the implementation of org.springframework.batch.core.partition.support.Partitioner. The role is to make a StepExecutionContext for each Step.

import java.util.HashMap;
import java.util.Map;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
public class DayOfWeekPartitioner implements Partitioner {
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
if(gridSize >= 7) {
gridSize = 7;
}
int [] slots = calcSlotSize(gridSize);
int startIdx = 1;
Map<String, ExecutionContext> partitionMap = new HashMap<String, ExecutionContext>();
for(int i = 0; i < slots.length; i++) {
ExecutionContext context = new ExecutionContext();
context.put("StartDay", startIdx);
context.put("EndDay", startIdx + slots[i] - 1);
partitionMap.put("Partition-" + i, context);
startIdx += slots[i];
}
return partitionMap;
}
private int [] calcSlotSize(int gridSize) {
int [] slots = new int[gridSize];
int totalSlot = 7;
for(int i = 0; i < slots.length; i++) {
slots[i] = 0;
}
int idx = 0;
while(true) {
if(totalSlot == 0) break;
slots[idx % slots.length]++;
idx++;
totalSlot--;
}
return slots;
}
}

The example builds query parameters depending on the grid size.

Grid size Query Parameters (from, to)
1 1-7
2 1-4, 5-7
3 1-3, 4-5, 6-7
4 1-2, 3-4, 5-6, 7-7
5 1-2, 3-4, 5-5, 6-6, 7-7
6 1-2, 3-3, 4-4, 5-5, 6-6, 7-7
7 1-1, 2-2, 3-3, 4-4, 5-5, 6-6, 7-7

Example Spring Context

<batch:job id="TestPartitionJob" job-repository="jobRepository">
<batch:step id="TestPartitionStep">
<batch:partition step="SingleStep" partitioner="DayOfWeekPartitioner">
<batch:handler grid-size="7" task-executor="TaskExecutor" />
</batch:partition>
</batch:step>
</batch:job>
<bean id="DayOfWeekPartitioner"
class="test.reader.DayOfWeekPartitioner"
scope="step">
</bean>
<bean id="TaskExecutor"
class="org.springframework.core.task.SimpleAsyncTaskExecutor"
scope="step">
<constructor-arg type="java.lang.String" value="test-executor"/>
<property name="concurrencyLimit" value="20"/>
</bean>
<batch:step id="SingleStep">
<batch:tasklet transaction-manager="testTransactionManager" allow-start-if-complete="true">
<batch:chunk
reader="TestReader"
processor="TestProcessor"
writer="TestWriter"
commit-interval="1"
reader-transactional-queue="false"
/>
</batch:tasklet>
</batch:step>

Partitioned Step is referencing “SingleStep” with a partitioner and grid size.

<bean id="ParameterSetter" class="test.reader.TestPartitionParameterSetter" scope="step">
<property name="startParam" value="#{stepExecutionContext['StartDay']}"/>
<property name="endParam" value="#{stepExecutionContext['EndDay']}"/>
</bean>
<bean id="TestReader"
class="org.springframework.batch.item.database.JdbcCursorItemReader"
scope="step">
<property name="dataSource" ref="testDataSource"/>
<property name="sql"
value="SELECT COL1, COL2
FROM TB_PARTITION_SOURCE
WHERE DAY_OF_WEEK BETWEEN ? AND ?"/>
<property name="rowMapper"
ref="SourceMapper"/>
<property name="preparedStatementSetter" ref="ParameterSetter"/>
<property name="fetchSize" value="100" />
<property name="maxRows" value="0" />
</bean>

SingleStep is a common Step with a Reader, a Processor and a Writer.

Full source code

You can download full source code from https://github.com/tkstone/spring_batch_sample01. The files to check are

  • src/main/resources/spring/job-test-partition-context.xml – main spring context
  • src/main/java/test/main/TestPartitionRun.java – main job invoker
  • src/main/java/test/reader/DayOfWeekPartitioner.java – partitioner
  • src/main/java/test/reader/TestPartitionParameterSetter.java – parameter setter

Some points to consider

  • To use partition, you must write a partitioner. That is, you must build the logic to split source data. This is the weak point compared to Hadoop whose partition is done automatically.
  • The source data (i.e. table) must have suitable partition key (range or list). If source table’s key is auto increment, partitioning is not suitable. (applying hash function to the key can disable index)
  • If parallelizing a Job is required, consider using Hadoop. But in case of RDBMS to RDBMS ETL, Spring Batch could be the choice.
Advertisement

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

This site uses Akismet to reduce spam. Learn how your comment data is processed.