ElasticJob

ElasticJob(现为 Apache ShardingSphere 子项目 ElasticJob-Lite / ElasticJob-Cloud)是一个分布式任务调度框架,以 Zookeeper 为注册中心,提供任务分片、弹性扩缩容、失效转移和任务事件追踪能力。

适合:需要水平扩展、分片并行处理大批量数据的定时任务;相比 XXL-Job 更偏向数据分片场景。


核心概念

概念说明
分片(Sharding)将任务数据集按分片总数拆分,每个实例只处理自己的分片
分片参数每个分片携带的自定义业务参数(如数据库范围、城市编码)
失效转移某实例宕机,其分片自动转移给在线实例
弹性扩缩新实例上线/下线后,分片自动重新分配
错过补偿misfire 机制补偿错过的触发次数
事件追踪任务执行历史记录(开始/成功/失败),持久化到数据库

架构

ElasticJob 实例 1     ElasticJob 实例 2     ElasticJob 实例 3
  分片 0, 1              分片 2, 3               分片 4
       │                     │                       │
       └──────────────────────┴───────────────────────┘
                              │
                        ZooKeeper(选主、分片协调、注册)

快速上手(ElasticJob-Lite + Spring Boot)

<dependency>
    <groupId>org.apache.shardingsphere.elasticjob</groupId>
    <artifactId>elasticjob-lite-spring-boot-starter</artifactId>
    <version>3.0.4</version>
</dependency>
elasticjob:
  reg-center:
    server-lists: 127.0.0.1:2181
    namespace: elastic-job-demo
  jobs:
    orderSyncJob:
      elasticJobClass: com.example.job.OrderSyncJob
      cron: "0 0/5 * * * ?"
      sharding-total-count: 3
      sharding-item-parameters: "0=BJ,1=SH,2=GZ"  # 分片参数
      overwrite: true                               # 启动时覆盖注册中心配置

实现分片任务

@Component
public class OrderSyncJob implements SimpleJob {
 
    @Autowired
    private OrderService orderService;
 
    @Override
    public void execute(ShardingContext ctx) {
        int shardingItem    = ctx.getShardingItem();         // 当前分片序号 (0/1/2)
        String shardingParam = ctx.getShardingParameter();   // 对应城市 (BJ/SH/GZ)
        int total           = ctx.getShardingTotalCount();   // 总分片数 3
 
        log.info("处理分片 {} / 城市 {}", shardingItem, shardingParam);
        orderService.syncByCity(shardingParam);
    }
}

数据分片模式(DataflowJob)

适用于持续消费数据队列的场景,自动处理取数 → 加工流程:

@Component
public class OrderDataflowJob implements DataflowJob<OrderDTO> {
 
    @Override
    public List<OrderDTO> fetchData(ShardingContext ctx) {
        // 每次拉取当前分片的待处理数据(返回空则停止本轮)
        return orderRepo.findPendingBySharding(
            ctx.getShardingItem(), ctx.getShardingTotalCount(), 100);
    }
 
    @Override
    public void processData(ShardingContext ctx, List<OrderDTO> orders) {
        orders.forEach(orderService::process);
    }
}

失效转移与错过补偿

elasticjob:
  jobs:
    orderSyncJob:
      failover: true       # 开启失效转移:宕机后分片立即转移给存活实例
      misfire: true        # 开启错过补偿:错过的触发在下次空闲时补跑

事件追踪(持久化执行历史)

elasticjob:
  tracing:
    type: RDB             # 使用关系型数据库追踪
spring:
  datasource:
    url: jdbc:mysql://localhost:3306/elastic_job_log
    username: root
    password: root

自动创建并写入 JOB_EXECUTION_LOGJOB_STATUS_TRACE_LOG 表。


任务类型对比

类型接口适用场景
SimpleJobSimpleJob通用定时任务,手动分片处理
DataflowJobDataflowJob<T>批量数据拉取加工,流式消费
ScriptJob无需实现接口Shell / Python 等脚本任务

ElasticJob vs XXL-Job

特性ElasticJobXXL-Job
注册中心ZooKeeperMySQL + 自研调度中心
数据分片原生支持不支持(需业务手动实现)
管理控制台轻量(ElasticJob-UI)功能完整(任务管理/日志/报警)
失效转移自动(ZK 监听)自动
动态分片实例变化自动重分片不涉及分片
上手难度中(需部署 ZK)低(MySQL 即可)
适用场景大数据分片处理普通业务定时任务

相关链接

  • XXL-Job — 更轻量、控制台功能更强,适合一般业务任务
  • Quartz — 单机/集群调度基础框架
  • Zookeeper — ElasticJob 的注册中心依赖
  • 任务调度 ← 返回任务调度目录