分布式任务调度

返回 Spring Cloud

单机 @Scheduled 在微服务环境中会因多实例导致重复执行。分布式任务调度通过统一调度中心解决:任务只在一个节点执行、支持分片、提供执行记录与告警。


主流方案对比

特性XXL-JobElastic-JobQuartz(集群)
架构调度中心 + 执行器无中心(ZK 协调)共享数据库
管理控制台内置,功能丰富需自建
分片支持(广播/分片)原生支持不支持
失败重试支持支持需手动实现
动态配置支持,控制台修改支持需改数据库
上手难度
适用场景国内主流,中大型海量任务分片简单集群

XXL-Job

部署调度中心

# docker-compose.yml
services:
  xxl-job-admin:
    image: xuxueli/xxl-job-admin:2.4.1
    ports:
      - "8080:8080"
    environment:
      PARAMS: >
        --spring.datasource.url=jdbc:mysql://mysql:3306/xxl_job
        --spring.datasource.username=root
        --spring.datasource.password=123456
        --xxl.job.accessToken=my-token

执行器集成

<dependency>
    <groupId>com.xuxueli</groupId>
    <artifactId>xxl-job-core</artifactId>
    <version>2.4.1</version>
</dependency>
xxl:
  job:
    admin:
      addresses: http://xxl-job-admin:8080/xxl-job-admin
    accessToken: my-token
    executor:
      appname: order-service   # 执行器名称,控制台据此分组
      port: 9999
      logpath: /data/applogs/xxl-job/jobhandler
      logretentiondays: 30
@Configuration
public class XxlJobConfig {
 
    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;
 
    @Value("${xxl.job.accessToken}")
    private String accessToken;
 
    @Value("${xxl.job.executor.appname}")
    private String appname;
 
    @Value("${xxl.job.executor.port}")
    private int port;
 
    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        XxlJobSpringExecutor executor = new XxlJobSpringExecutor();
        executor.setAdminAddresses(adminAddresses);
        executor.setAccessToken(accessToken);
        executor.setAppname(appname);
        executor.setPort(port);
        return executor;
    }
}

定义 Job Handler

@Component
public class OrderJobHandler {
 
    @Autowired
    private OrderService orderService;
 
    /** 超时订单关闭,支持分片 */
    @XxlJob("closeTimeoutOrderJob")
    public void closeTimeoutOrder() {
        // 分片参数:当前分片 index 和总分片数
        int shardIndex = XxlJobHelper.getShardIndex();
        int shardTotal = XxlJobHelper.getShardTotal();
 
        // 每个实例只处理属于自己分片的数据
        // 例:order_id % shardTotal == shardIndex
        List<Long> orderIds = orderService.queryTimeoutOrders(shardIndex, shardTotal);
        orderIds.forEach(orderService::closeOrder);
 
        XxlJobHelper.log("分片 {}/{} 关闭订单 {} 条", shardIndex + 1, shardTotal, orderIds.size());
    }
 
    /** 数据报表统计(无需分片,由调度中心保证单次执行) */
    @XxlJob("dailyReportJob")
    public void dailyReport() {
        String param = XxlJobHelper.getJobParam();  // 控制台传入的参数
        LocalDate date = StringUtils.hasText(param)
            ? LocalDate.parse(param) : LocalDate.now().minusDays(1);
        orderService.generateDailyReport(date);
    }
}

在控制台配置

  1. 新建执行器:order-service
  2. 新建任务:
    • JobHandler:closeTimeoutOrderJob
    • Cron:0 */5 * * * ?(每 5 分钟)
    • 路由策略:分片广播(所有实例同时执行,各取各的分片)
    • 失败重试:2 次

Elastic-Job(ShardingSphere 生态)

依赖

<dependency>
    <groupId>org.apache.shardingsphere.elasticjob</groupId>
    <artifactId>elasticjob-lite-spring-boot-starter</artifactId>
    <version>3.0.3</version>
</dependency>

配置

elasticjob:
  zookeeper:
    server-lists: zk:2181
    namespace: elastic-job
  jobs:
    inventoryJob:
      elastic-job-class: com.example.InventoryJob
      cron: 0 0 2 * * ?
      sharding-total-count: 4         # 分片数,通常 = 实例数
      sharding-item-parameters: 0=A,1=B,2=C,3=D  # 分片参数

Job 实现

@Component
public class InventoryJob implements SimpleJob {
 
    @Override
    public void execute(ShardingContext ctx) {
        int item = ctx.getShardingItem();           // 0 / 1 / 2 / 3
        String param = ctx.getShardingParameter();  // A / B / C / D
        // 按分片参数查询不同范围的数据
        inventoryService.syncInventory(param);
    }
}

幂等性保障

任务失败重试时可能重复执行,必须保证幂等:

@XxlJob("sendCouponJob")
public void sendCoupon() {
    // Redis SETNX 加锁,同一任务执行 ID 只处理一次
    String lockKey = "job:sendCoupon:" + XxlJobHelper.getJobId();
    Boolean locked = redisTemplate.opsForValue()
        .setIfAbsent(lockKey, "1", Duration.ofMinutes(10));
    if (!Boolean.TRUE.equals(locked)) {
        XxlJobHelper.log("任务已执行,跳过");
        return;
    }
    couponService.batchSend();
}

单机降级方案

无法引入外部调度中心时,用数据库锁保证集群中只有一个实例执行:

@Scheduled(cron = "0 0 3 * * ?")
@SchedulerLock(name = "dailyCleanup", lockAtMostFor = "PT30M")
public void dailyCleanup() {
    // Spring Integration / ShedLock 实现
}
<dependency>
    <groupId>net.javacrumbs.shedlock</groupId>
    <artifactId>shedlock-spring</artifactId>
    <version>5.13.0</version>
</dependency>
<dependency>
    <groupId>net.javacrumbs.shedlock</groupId>
    <artifactId>shedlock-provider-redis-spring</artifactId>
    <version>5.13.0</version>
</dependency>

相关链接

本目录

Spring Boot

架构