Spring Batch
Spring Batch 是企业级批处理框架,提供作业调度、分块处理、重试/跳过、状态持久化等能力,适合大规模数据迁移、报表生成、ETL 场景。可与 @Scheduled 或外部调度系统(Quartz、XXL-Job)配合触发。
核心概念
Job
└── Step[]
├── Chunk-oriented Step: ItemReader → ItemProcessor → ItemWriter
└── Tasklet Step: 单次执行任意逻辑
| 组件 | 说明 |
|---|---|
| Job | 批处理任务的顶层单元,由一个或多个 Step 组成 |
| Step | Job 的执行阶段,可串行或并行 |
| ItemReader | 从数据源逐条读取数据 |
| ItemProcessor | 对单条数据做转换/过滤(可选) |
| ItemWriter | 将一批数据写出到目标 |
| JobRepository | 持久化 Job/Step 的执行状态(元数据) |
| JobLauncher | 启动 Job 的入口 |
| JobParameters | 每次运行传入的参数(影响 Job 实例标识) |
| ExecutionContext | Job/Step 级别的上下文,可存储状态用于重启 |
快速开始
引入依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<!-- 元数据需要数据库,开发时可用 H2 -->
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>配置:
spring:
batch:
job:
enabled: false # 禁止启动时自动执行,手动触发
jdbc:
initialize-schema: always # 自动建批处理元数据表Chunk 模式(读-处理-写)
最常用的 Step 类型,按 chunk-size 分批提交事务:
@Configuration
@EnableBatchProcessing // Spring Batch 3.x;4.x 以后 Boot 自动配置
public class CsvImportJobConfig {
@Bean
public Job importUserJob(JobRepository repo, Step csvStep) {
return new JobBuilder("importUserJob", repo)
.start(csvStep)
.build();
}
@Bean
public Step csvStep(JobRepository repo, PlatformTransactionManager tm,
ItemReader<UserCsv> reader,
ItemProcessor<UserCsv, User> processor,
ItemWriter<User> writer) {
return new StepBuilder("csvStep", repo)
.<UserCsv, User>chunk(500, tm) // 每 500 条提交一次事务
.reader(reader)
.processor(processor)
.writer(writer)
.faultTolerant()
.skip(ParseException.class).skipLimit(100) // 跳过解析异常,最多 100 次
.retry(TransientDataAccessException.class).retryLimit(3)
.build();
}
}ItemReader
FlatFileItemReader — 读 CSV/TXT
@Bean
@StepScope
public FlatFileItemReader<UserCsv> csvReader(
@Value("#{jobParameters['inputFile']}") String file) {
return new FlatFileItemReaderBuilder<UserCsv>()
.name("csvReader")
.resource(new FileSystemResource(file))
.delimited().delimiter(",")
.names("name", "email", "age")
.targetType(UserCsv.class)
.linesToSkip(1) // 跳过表头
.build();
}JdbcCursorItemReader — 读数据库(游标)
@Bean
@StepScope
public JdbcCursorItemReader<User> dbReader(DataSource ds) {
return new JdbcCursorItemReaderBuilder<User>()
.name("dbReader")
.dataSource(ds)
.sql("SELECT * FROM users WHERE status = 'PENDING'")
.rowMapper(new BeanPropertyRowMapper<>(User.class))
.build();
}JdbcPagingItemReader — 读数据库(分页,推荐大数据量)
@Bean
@StepScope
public JdbcPagingItemReader<User> pagingReader(DataSource ds) {
Map<String, Order> sortKeys = Map.of("id", Order.ASCENDING);
return new JdbcPagingItemReaderBuilder<User>()
.name("pagingReader")
.dataSource(ds)
.selectClause("SELECT *")
.fromClause("FROM users")
.whereClause("WHERE status = 'PENDING'")
.sortKeys(sortKeys)
.pageSize(500)
.rowMapper(new BeanPropertyRowMapper<>(User.class))
.build();
}与 JPA与Hibernate 整合可使用 JpaPagingItemReader;与 MyBatis 整合可使用 MyBatisCursorItemReader。
ItemProcessor
@Component
public class UserProcessor implements ItemProcessor<UserCsv, User> {
@Override
public User process(UserCsv item) throws Exception {
// 返回 null 表示过滤该条数据,不传给 Writer
if (!isValid(item)) return null;
return User.builder()
.name(item.getName().trim())
.email(item.getEmail().toLowerCase())
.age(Integer.parseInt(item.getAge()))
.build();
}
}链式 Processor
CompositeItemProcessor<UserCsv, User> composite = new CompositeItemProcessorBuilder<UserCsv, User>()
.delegates(validateProcessor, transformProcessor, enrichProcessor)
.build();ItemWriter
JdbcBatchItemWriter — 批量写入数据库
@Bean
public JdbcBatchItemWriter<User> dbWriter(DataSource ds) {
return new JdbcBatchItemWriterBuilder<User>()
.dataSource(ds)
.sql("INSERT INTO users (name, email, age) VALUES (:name, :email, :age)")
.beanMapped() // 使用 Bean 属性名映射 SQL 参数
.build();
}FlatFileItemWriter — 写 CSV/TXT
@Bean
@StepScope
public FlatFileItemWriter<User> csvWriter(
@Value("#{jobParameters['outputFile']}") String file) {
return new FlatFileItemWriterBuilder<User>()
.name("csvWriter")
.resource(new FileSystemResource(file))
.delimited().delimiter(",")
.names("name", "email", "age")
.build();
}Tasklet 模式
适合不需要 Read-Process-Write 的单次操作(如清理临时表、发送通知):
@Bean
public Step cleanupStep(JobRepository repo, PlatformTransactionManager tm) {
return new StepBuilder("cleanupStep", repo)
.tasklet((contribution, chunkContext) -> {
jdbcTemplate.update("DELETE FROM temp_import");
return RepeatStatus.FINISHED;
}, tm)
.build();
}Job 流程控制
顺序执行
new JobBuilder("job", repo)
.start(step1())
.next(step2())
.next(step3())
.build();条件分支(基于 ExitStatus)
new JobBuilder("job", repo)
.start(step1())
.on("COMPLETED").to(step2())
.on("FAILED").to(errorStep())
.on("*").to(step3()) // 通配符
.end()
.build();并行执行 Step(Split)
Flow flow1 = new FlowBuilder<SimpleFlow>("flow1").start(step1()).build();
Flow flow2 = new FlowBuilder<SimpleFlow>("flow2").start(step2()).build();
new JobBuilder("job", repo)
.start(new FlowBuilder<SimpleFlow>("parallel")
.split(new SimpleAsyncTaskExecutor())
.add(flow1, flow2)
.build())
.end()
.build();分区并行(Partitioning)
将大数据集按 ID 范围切片,每个分区在独立线程/节点执行,适合超大数据量:
@Bean
public Step partitionedStep(JobRepository repo, Step workerStep,
PartitionHandler handler) {
return new StepBuilder("partitionedStep", repo)
.partitioner("workerStep", new RangePartitioner(0, 1_000_000, 10))
.partitionHandler(handler)
.build();
}
@Bean
public PartitionHandler partitionHandler(Step workerStep) {
TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
handler.setStep(workerStep);
handler.setTaskExecutor(new SimpleAsyncTaskExecutor());
handler.setGridSize(10); // 10 个分区
return handler;
}
// workerStep 使用 @StepScope 获取分区参数
@Bean
@StepScope
public JdbcPagingItemReader<User> workerReader(
@Value("#{stepExecutionContext['minId']}") Long minId,
@Value("#{stepExecutionContext['maxId']}") Long maxId) { ... }@Value 中的 SpEL 语法 用于从 stepExecutionContext 动态绑定分区参数。
多线程 Step
同一 Step 内多线程并发处理 chunk(Reader 需支持线程安全):
@Bean
public Step multiThreadedStep(JobRepository repo, PlatformTransactionManager tm) {
return new StepBuilder("multiThreadedStep", repo)
.<User, User>chunk(200, tm)
.reader(synchronizedReader()) // 包装成线程安全
.processor(processor())
.writer(writer())
.taskExecutor(new SimpleAsyncTaskExecutor())
.throttleLimit(8) // 最大并发线程数
.build();
}更多异步处理模式参见 异步与线程池。
作业重启与跳过
重启
批处理失败后,使用相同 JobParameters 重新启动,会从失败的 Step 续跑(不重复已完成的 Step):
// 使 Job 可重启(默认即可重启)
new StepBuilder("step", repo)
.<User, User>chunk(500, tm)
.allowStartIfComplete(false) // 已完成的 Step 不重跑
...Skip(跳过异常记录)
.faultTolerant()
.skip(ParseException.class)
.skip(ConstraintViolationException.class)
.skipLimit(1000)
.skipPolicy(new AlwaysSkipItemSkipPolicy()) // 自定义策略Retry(重试)
.faultTolerant()
.retry(TransientDataAccessException.class)
.retryLimit(3)
.retryPolicy(new SimpleRetryPolicy(3, Map.of(
TransientDataAccessException.class, true
)))监听器
// Job 监听
public class JobNotificationListener implements JobExecutionListener {
@Override
public void afterJob(JobExecution jobExecution) {
if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
log.info("Job 完成,共处理 {} 条", jobExecution.getStepExecutions()
.stream().mapToLong(StepExecution::getWriteCount).sum());
}
}
}
// Step 监听
public class StepProgressListener implements StepExecutionListener {
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
log.info("Step {} 读 {} 条,写 {} 条,跳 {} 条",
stepExecution.getStepName(),
stepExecution.getReadCount(),
stepExecution.getWriteCount(),
stepExecution.getSkipCount());
return stepExecution.getExitStatus();
}
}
// Item 监听(记录被跳过的数据)
public class SkipLoggingListener implements SkipListener<UserCsv, User> {
@Override
public void onSkipInProcess(UserCsv item, Throwable t) {
log.warn("处理跳过: item={}, reason={}", item, t.getMessage());
}
}手动触发与外部集成
REST 接口触发
@RestController
@RequiredArgsConstructor
public class BatchController {
private final JobLauncher jobLauncher;
private final Job importUserJob;
@PostMapping("/batch/import")
public String triggerImport(@RequestParam String file) throws Exception {
JobParameters params = new JobParametersBuilder()
.addString("inputFile", file)
.addLong("timestamp", System.currentTimeMillis()) // 保证每次是新实例
.toJobParameters();
JobExecution execution = jobLauncher.run(importUserJob, params);
return execution.getStatus().toString();
}
}定时触发
@Scheduled(cron = "0 0 2 * * ?") // 每天凌晨 2 点
public void nightlyImport() throws Exception {
JobParameters params = new JobParametersBuilder()
.addLocalDate("date", LocalDate.now())
.toJobParameters();
jobLauncher.run(importUserJob, params);
}详见 定时任务。
元数据库表
Spring Batch 在数据库中自动创建以下表(initialize-schema: always):
| 表名 | 说明 |
|---|---|
BATCH_JOB_INSTANCE | Job 实例(Job 名 + 参数的唯一标识) |
BATCH_JOB_EXECUTION | 每次 Job 执行记录(状态、开始/结束时间) |
BATCH_JOB_EXECUTION_PARAMS | Job 执行参数 |
BATCH_STEP_EXECUTION | Step 执行记录(读/写/跳过计数) |
BATCH_STEP_EXECUTION_CONTEXT | Step 执行上下文(用于重启断点) |
BATCH_JOB_EXECUTION_CONTEXT | Job 执行上下文 |
注意事项
- @StepScope 和 @JobScope 是 Spring Batch 专属 Bean 作用域,必须与
@Value("#{...}")配合使用才能延迟绑定运行时参数 JobParameters相同则视为同一JobInstance;为了每次重跑加入时间戳参数- 多线程 Step 中
JdbcCursorItemReader非线程安全,必须使用SynchronizedItemStreamReader包装或改用JdbcPagingItemReader - Chunk 事务失败会回滚整个 chunk,与 Spring 事务 集成时注意传播行为
- 生产环境将元数据存储到专用数据库,避免与业务库混用