Spring Batch

返回 Spring Boot 基础

Spring Batch 是企业级批处理框架,提供作业调度、分块处理、重试/跳过、状态持久化等能力,适合大规模数据迁移、报表生成、ETL 场景。可与 @Scheduled 或外部调度系统(Quartz、XXL-Job)配合触发。


核心概念

Job
 └── Step[]
       ├── Chunk-oriented Step: ItemReader → ItemProcessor → ItemWriter
       └── Tasklet Step: 单次执行任意逻辑
组件说明
Job批处理任务的顶层单元,由一个或多个 Step 组成
StepJob 的执行阶段,可串行或并行
ItemReader从数据源逐条读取数据
ItemProcessor对单条数据做转换/过滤(可选)
ItemWriter将一批数据写出到目标
JobRepository持久化 Job/Step 的执行状态(元数据)
JobLauncher启动 Job 的入口
JobParameters每次运行传入的参数(影响 Job 实例标识)
ExecutionContextJob/Step 级别的上下文,可存储状态用于重启

快速开始

引入依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<!-- 元数据需要数据库,开发时可用 H2 -->
<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <scope>runtime</scope>
</dependency>

配置:

spring:
  batch:
    job:
      enabled: false       # 禁止启动时自动执行,手动触发
    jdbc:
      initialize-schema: always   # 自动建批处理元数据表

Chunk 模式(读-处理-写)

最常用的 Step 类型,按 chunk-size 分批提交事务

@Configuration
@EnableBatchProcessing   // Spring Batch 3.x;4.x 以后 Boot 自动配置
public class CsvImportJobConfig {
 
    @Bean
    public Job importUserJob(JobRepository repo, Step csvStep) {
        return new JobBuilder("importUserJob", repo)
            .start(csvStep)
            .build();
    }
 
    @Bean
    public Step csvStep(JobRepository repo, PlatformTransactionManager tm,
                        ItemReader<UserCsv> reader,
                        ItemProcessor<UserCsv, User> processor,
                        ItemWriter<User> writer) {
        return new StepBuilder("csvStep", repo)
            .<UserCsv, User>chunk(500, tm)   // 每 500 条提交一次事务
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .faultTolerant()
            .skip(ParseException.class).skipLimit(100)   // 跳过解析异常,最多 100 次
            .retry(TransientDataAccessException.class).retryLimit(3)
            .build();
    }
}

ItemReader

FlatFileItemReader — 读 CSV/TXT

@Bean
@StepScope
public FlatFileItemReader<UserCsv> csvReader(
        @Value("#{jobParameters['inputFile']}") String file) {
    return new FlatFileItemReaderBuilder<UserCsv>()
        .name("csvReader")
        .resource(new FileSystemResource(file))
        .delimited().delimiter(",")
        .names("name", "email", "age")
        .targetType(UserCsv.class)
        .linesToSkip(1)          // 跳过表头
        .build();
}

JdbcCursorItemReader — 读数据库(游标)

@Bean
@StepScope
public JdbcCursorItemReader<User> dbReader(DataSource ds) {
    return new JdbcCursorItemReaderBuilder<User>()
        .name("dbReader")
        .dataSource(ds)
        .sql("SELECT * FROM users WHERE status = 'PENDING'")
        .rowMapper(new BeanPropertyRowMapper<>(User.class))
        .build();
}

JdbcPagingItemReader — 读数据库(分页,推荐大数据量)

@Bean
@StepScope
public JdbcPagingItemReader<User> pagingReader(DataSource ds) {
    Map<String, Order> sortKeys = Map.of("id", Order.ASCENDING);
    return new JdbcPagingItemReaderBuilder<User>()
        .name("pagingReader")
        .dataSource(ds)
        .selectClause("SELECT *")
        .fromClause("FROM users")
        .whereClause("WHERE status = 'PENDING'")
        .sortKeys(sortKeys)
        .pageSize(500)
        .rowMapper(new BeanPropertyRowMapper<>(User.class))
        .build();
}

JPA与Hibernate 整合可使用 JpaPagingItemReader;与 MyBatis 整合可使用 MyBatisCursorItemReader


ItemProcessor

@Component
public class UserProcessor implements ItemProcessor<UserCsv, User> {
 
    @Override
    public User process(UserCsv item) throws Exception {
        // 返回 null 表示过滤该条数据,不传给 Writer
        if (!isValid(item)) return null;
 
        return User.builder()
            .name(item.getName().trim())
            .email(item.getEmail().toLowerCase())
            .age(Integer.parseInt(item.getAge()))
            .build();
    }
}

链式 Processor

CompositeItemProcessor<UserCsv, User> composite = new CompositeItemProcessorBuilder<UserCsv, User>()
    .delegates(validateProcessor, transformProcessor, enrichProcessor)
    .build();

ItemWriter

JdbcBatchItemWriter — 批量写入数据库

@Bean
public JdbcBatchItemWriter<User> dbWriter(DataSource ds) {
    return new JdbcBatchItemWriterBuilder<User>()
        .dataSource(ds)
        .sql("INSERT INTO users (name, email, age) VALUES (:name, :email, :age)")
        .beanMapped()   // 使用 Bean 属性名映射 SQL 参数
        .build();
}

FlatFileItemWriter — 写 CSV/TXT

@Bean
@StepScope
public FlatFileItemWriter<User> csvWriter(
        @Value("#{jobParameters['outputFile']}") String file) {
    return new FlatFileItemWriterBuilder<User>()
        .name("csvWriter")
        .resource(new FileSystemResource(file))
        .delimited().delimiter(",")
        .names("name", "email", "age")
        .build();
}

Tasklet 模式

适合不需要 Read-Process-Write 的单次操作(如清理临时表、发送通知):

@Bean
public Step cleanupStep(JobRepository repo, PlatformTransactionManager tm) {
    return new StepBuilder("cleanupStep", repo)
        .tasklet((contribution, chunkContext) -> {
            jdbcTemplate.update("DELETE FROM temp_import");
            return RepeatStatus.FINISHED;
        }, tm)
        .build();
}

Job 流程控制

顺序执行

new JobBuilder("job", repo)
    .start(step1())
    .next(step2())
    .next(step3())
    .build();

条件分支(基于 ExitStatus)

new JobBuilder("job", repo)
    .start(step1())
        .on("COMPLETED").to(step2())
        .on("FAILED").to(errorStep())
        .on("*").to(step3())        // 通配符
    .end()
    .build();

并行执行 Step(Split)

Flow flow1 = new FlowBuilder<SimpleFlow>("flow1").start(step1()).build();
Flow flow2 = new FlowBuilder<SimpleFlow>("flow2").start(step2()).build();
 
new JobBuilder("job", repo)
    .start(new FlowBuilder<SimpleFlow>("parallel")
        .split(new SimpleAsyncTaskExecutor())
        .add(flow1, flow2)
        .build())
    .end()
    .build();

分区并行(Partitioning)

将大数据集按 ID 范围切片,每个分区在独立线程/节点执行,适合超大数据量:

@Bean
public Step partitionedStep(JobRepository repo, Step workerStep,
                             PartitionHandler handler) {
    return new StepBuilder("partitionedStep", repo)
        .partitioner("workerStep", new RangePartitioner(0, 1_000_000, 10))
        .partitionHandler(handler)
        .build();
}
 
@Bean
public PartitionHandler partitionHandler(Step workerStep) {
    TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
    handler.setStep(workerStep);
    handler.setTaskExecutor(new SimpleAsyncTaskExecutor());
    handler.setGridSize(10);     // 10 个分区
    return handler;
}
 
// workerStep 使用 @StepScope 获取分区参数
@Bean
@StepScope
public JdbcPagingItemReader<User> workerReader(
        @Value("#{stepExecutionContext['minId']}") Long minId,
        @Value("#{stepExecutionContext['maxId']}") Long maxId) { ... }

@Value 中的 SpEL 语法 用于从 stepExecutionContext 动态绑定分区参数。


多线程 Step

同一 Step 内多线程并发处理 chunk(Reader 需支持线程安全):

@Bean
public Step multiThreadedStep(JobRepository repo, PlatformTransactionManager tm) {
    return new StepBuilder("multiThreadedStep", repo)
        .<User, User>chunk(200, tm)
        .reader(synchronizedReader())    // 包装成线程安全
        .processor(processor())
        .writer(writer())
        .taskExecutor(new SimpleAsyncTaskExecutor())
        .throttleLimit(8)                // 最大并发线程数
        .build();
}

更多异步处理模式参见 异步与线程池


作业重启与跳过

重启

批处理失败后,使用相同 JobParameters 重新启动,会从失败的 Step 续跑(不重复已完成的 Step):

// 使 Job 可重启(默认即可重启)
new StepBuilder("step", repo)
    .<User, User>chunk(500, tm)
    .allowStartIfComplete(false)   // 已完成的 Step 不重跑
    ...

Skip(跳过异常记录)

.faultTolerant()
.skip(ParseException.class)
.skip(ConstraintViolationException.class)
.skipLimit(1000)
.skipPolicy(new AlwaysSkipItemSkipPolicy())   // 自定义策略

Retry(重试)

.faultTolerant()
.retry(TransientDataAccessException.class)
.retryLimit(3)
.retryPolicy(new SimpleRetryPolicy(3, Map.of(
    TransientDataAccessException.class, true
)))

监听器

// Job 监听
public class JobNotificationListener implements JobExecutionListener {
    @Override
    public void afterJob(JobExecution jobExecution) {
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            log.info("Job 完成,共处理 {} 条", jobExecution.getStepExecutions()
                .stream().mapToLong(StepExecution::getWriteCount).sum());
        }
    }
}
 
// Step 监听
public class StepProgressListener implements StepExecutionListener {
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        log.info("Step {} 读 {} 条,写 {} 条,跳 {} 条",
            stepExecution.getStepName(),
            stepExecution.getReadCount(),
            stepExecution.getWriteCount(),
            stepExecution.getSkipCount());
        return stepExecution.getExitStatus();
    }
}
 
// Item 监听(记录被跳过的数据)
public class SkipLoggingListener implements SkipListener<UserCsv, User> {
    @Override
    public void onSkipInProcess(UserCsv item, Throwable t) {
        log.warn("处理跳过: item={}, reason={}", item, t.getMessage());
    }
}

手动触发与外部集成

REST 接口触发

@RestController
@RequiredArgsConstructor
public class BatchController {
 
    private final JobLauncher jobLauncher;
    private final Job importUserJob;
 
    @PostMapping("/batch/import")
    public String triggerImport(@RequestParam String file) throws Exception {
        JobParameters params = new JobParametersBuilder()
            .addString("inputFile", file)
            .addLong("timestamp", System.currentTimeMillis())   // 保证每次是新实例
            .toJobParameters();
        JobExecution execution = jobLauncher.run(importUserJob, params);
        return execution.getStatus().toString();
    }
}

定时触发

@Scheduled(cron = "0 0 2 * * ?")   // 每天凌晨 2 点
public void nightlyImport() throws Exception {
    JobParameters params = new JobParametersBuilder()
        .addLocalDate("date", LocalDate.now())
        .toJobParameters();
    jobLauncher.run(importUserJob, params);
}

详见 定时任务


元数据库表

Spring Batch 在数据库中自动创建以下表(initialize-schema: always):

表名说明
BATCH_JOB_INSTANCEJob 实例(Job 名 + 参数的唯一标识)
BATCH_JOB_EXECUTION每次 Job 执行记录(状态、开始/结束时间)
BATCH_JOB_EXECUTION_PARAMSJob 执行参数
BATCH_STEP_EXECUTIONStep 执行记录(读/写/跳过计数)
BATCH_STEP_EXECUTION_CONTEXTStep 执行上下文(用于重启断点)
BATCH_JOB_EXECUTION_CONTEXTJob 执行上下文

注意事项

  • @StepScope@JobScope 是 Spring Batch 专属 Bean 作用域,必须与 @Value("#{...}") 配合使用才能延迟绑定运行时参数
  • JobParameters 相同则视为同一 JobInstance;为了每次重跑加入时间戳参数
  • 多线程 Step 中 JdbcCursorItemReader 非线程安全,必须使用 SynchronizedItemStreamReader 包装或改用 JdbcPagingItemReader
  • Chunk 事务失败会回滚整个 chunk,与 Spring 事务 集成时注意传播行为
  • 生产环境将元数据存储到专用数据库,避免与业务库混用