1. 秒杀场景和超卖问题

在一个高并发的场景下,比如说大营销服务的抽奖活动。可能会有很多用户在同一时间进行某个奖品的获得,比如某个奖品的库存只有10,而很多用户在同一时间都获得了该奖品,如果没有正确的并发控制,可能会卖出比库存更多的奖品

在传统库存扣减场景下如果使用数据库锁在高并发场景下可能会导致

  1. 锁导致很多用户等待同一行数据的资源。发生竞争锁资源
  2. 如果大量用户都在等待数据库操作,连接资源就会被耗尽,导致服务器响应时间过长甚至崩溃

在这种场景下使用redis管理库存,用一些原子操作避免锁的竞争和等待:

  • 高性能:redis是使用内存存储的,支持高并发,能够应对高访问量
  • 去锁化:在redis中使用一些原子操作避免锁的竞争和等待

Redis中的原子操作decrsetnx

decr: 用于将某个key的值减小1
setnx: set if not exists. 实现分布式锁,如果key不存在,就设置他并返回true,如果存在就返回false

库存消耗后更新数据库

当redis中的库存被消耗,库存变化需要更新到mysql库表中,但是不能每次redis库存变化就要同步mysql,会带来频繁的数据库写操作。使用延迟队列和定时任务。

  • 延迟队列:将redis库存扣减记录下来放到redission延迟队列中
  • 定时任务:定期消费这个延迟队列,将更新结果通过到数据库

代码开发

将库存装配到redis中

缓存奖品库存,用于decr扣减库存使用
不需要每次使用库存数据的时候都加载数据库连接

  1. 将库存逻辑加在StrategyArmoryDispatch
    1
    2
    3
    4
    5
    6
    //2.将数据库中的奖品库存缓存到redis内存中,用于decr原子操作扣减库存  
    for (StrategyAwardEntity strategyAward:strategyAwardEntities){
    Integer awardId = strategyAward.getAwardId();
    Integer awardCount = strategyAward.getAwardCount();
    cacheStrategyAwardCount(strategyId, awardId, awardCount);
    }
  2. 增加记录策略奖品库存缓存的key在constant中。
  3. 在StrategyArmoryDispatch中增加cacheStrategyAwardCount方法
    1
    2
    3
    4
    5
    private void cacheStrategyAwardCount(Long strategyId, Integer awardId, Integer awardCount){  
    //cacheKey: strategy_award_count_key_1000001_101
    String cacheKey = Constants.RedisKey.STRATEGY_AWARD_COUNT_KEY+strategyId+Constants.UNDERLINE+awardId;
    repository.cacheStrategyAwardCount(cacheKey, awardCount);
    }
  4. 在repository中增加cacheStrategyAwardCount方法
    1
    2
    3
    4
    public void cacheStrategyAwardCount(String cacheKey, Integer awardCount) {  
    if (redisService.isExists(cacheKey)) return;
    redisService.setAtomicLong(cacheKey, awardCount);
    }
  5. 在IRedisService中新增getAtmicLong方法,setAtomicLong方法.不set成atomic的话没法做原子化操作
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    /**  
    * 设置值
    *
    * @param key key 键
    * @param value 值
    */
    @Override
    public void setAtomicLong(String key, long value) {
    redissonClient.getAtomicLong(key).set(value);
    }

    /**
    * 获取值
    *
    * @param key key 键
    */
    @Override
    public Long getAtomicLong(String key) {
    return redissonClient.getAtomicLong(key).get();
    }

    @Override
    public void cacheStrategyAwardCount(String cacheKey, Integer awardCount) {
    if (redisService.isExists(cacheKey)) return;
    redisService.setAtomicLong(cacheKey, awardCount);
    }

增加库存扣减操作

  1. IStrategyDispatch定义库存扣减方法subtractionAwardStock。在StrategyArmoryDispatch中实现库存扣减方法
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    /**  
    * 根据策略ID和奖品ID,扣减奖品缓存库存
    *
    * @param strategyId 策略ID
    * @param awardId 奖品ID
    * @return 扣减结果
    */
    @Override
    public Boolean subtractionAwardStock(Long strategyId, Integer awardId) {
    String cacheKey = Constants.RedisKey.STRATEGY_AWARD_COUNT_KEY+strategyId+Constants.UNDERLINE+awardId;
    return repository.subtractionAwardStock(cacheKey);
    }
  2. 在repository中新增并实现库存扣减方法,并增加锁
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    /**  
    * 缓存key,decr 方式扣减库存
    *
    * @param cacheKey 缓存Key
    * @return 扣减结果
    */
    @Override
    public Boolean subtractionAwardStock(String cacheKey) {
    //将目前对应的库存值进行原子操作减1,并返回decr之后的库存值给surplus
    long surplus = redisService.decr(cacheKey);
    if (surplus < 0){
    //库存被扣减到0一下了,发生错误了
    //恢复库存为0个
    redisService.setValue(cacheKey,0);
    return false;
    }
    //组装库存锁的key。 cacheKey_库存扣减后的值
    String lockKey = cacheKey + Constants.UNDERLINE + surplus;
    Boolean lock = redisService.setNx(lockKey);
    //如果加锁失败
    if (!lock){
    log.info("策略奖品库存加锁失败 {}", lockKey);
    }
    return lock;
    }
  3. 使用trySet实现setNx方法在redissonservice中
    1
    2
    3
    4
    @Override  
    public Boolean setNx(String key) {
    return redissonClient.getBucket(key).trySet("lock");
    }

实现tree下的库存扣减操作

  1. 使用StrategyDispatch中的接口实现RuleStockLogicTreeNode
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    @Slf4j  
    @Component("rule_stock")
    public class RuleStockLogicTreeNode implements ILogicTreeNode {

    @Resource
    private IStrategyDispatch strategyDispatch;
    @Resource
    private IStrategyRepository strategyRepository;

    @Override
    public DefaultTreeFactory.TreeActionEntity logic(String userId, Long strategyId, Integer awardId, String ruleValue) {
    log.info("规则树过滤-库存扣减 userID:{} strategyID:{} awardID:{} ruleValue:{}", userId, strategyId, awardId, ruleValue);

    Boolean status = strategyDispatch.subtractionAwardStock(strategyId,awardId);
    if (status){
    log.info("规则过滤-库存扣减成功");

    //将扣减库存写入延迟队列,延迟消费更新数据库的记录
    //使用trigger层的job: UpdateAwardStockJob 消费延迟队列,更新数据库记录
    strategyRepository.awardStockConsumeSendQueue(StrategyAwardStockKeyVO.builder()
    .strategyId(strategyId)
    .awardId(awardId)
    .build());

    //库存扣减成功就被接管了
    return DefaultTreeFactory.TreeActionEntity.builder()
    .ruleLogicCheckTypeVO(RuleLogicCheckTypeVO.TAKE_OVER)
    .strategyAwardVO(DefaultTreeFactory.StrategyAwardVO.builder()
    .awardId(awardId)
    .awardRuleValue(ruleValue)
    .build())
    .build();
    }


    // 如果库存不足或者扣减库存失败(加锁失败),则直接返回放行
    log.warn("规则过滤-库存扣减-告警,库存不足。userId:{} strategyId:{} awardId:{}", userId, strategyId, awardId);
    return DefaultTreeFactory.TreeActionEntity.builder()
    .ruleLogicCheckTypeVO(RuleLogicCheckTypeVO.ALLOW)
    .build();
    }
    }
  2. 在valobj中增加StrategyAwardStockKeyVO
    1
    2
    3
    4
    5
    6
    7
    8
    @Data  
    @AllArgsConstructor
    @NoArgsConstructor
    @Builder
    public class StrategyAwardStockKeyVO {
    private Long strategyId;
    private Integer awardId;
    }
  3. 扣减状态成功的话,需要将扣减结果放进异步消息队列中,所以在repository中增加awardStockConsumeSendQueue 延迟队列操作。实现队列方法在repository中
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    /**  
    * 写入奖品库存消费队列
    *
    * @param strategyAwardStockKeyVO 对象值对象
    */
    @Override
    public void awardStockConsumeSendQueue(StrategyAwardStockKeyVO strategyAwardStockKeyVO) {
    //构建缓存键
    String cacheKey = Constants.RedisKey.STRATEGY_AWARD_COUNT_QUEUE_KEY;
    //获取阻塞队列,用于在延迟时间到达后存储需要处理的任务
    RBlockingQueue<StrategyAwardStockKeyVO> blockingQueue = redisService.getBlockingQueue(cacheKey);
    //获取延迟队列,strategyAwardStockKeyVO在等待后,转移到阻塞队列
    RDelayedQueue<StrategyAwardStockKeyVO> delayedQueue = redisService.getDelayedQueue(blockingQueue);
    //3秒将对象放入阻塞队列
    delayedQueue.offer(strategyAwardStockKeyVO, 3, TimeUnit.SECONDS);
    }

更改树引擎,实现树logic操作

库表有变化,之前在award中的rule value字段现在放在了ruleTreeNode,需要在引擎中获得

  1. 在DecisionTreeEngine中增加rulevalue,并且传递给LogicTreeNode.logic.更改每个treelogic的入参
  2. 实现RuleLuckAwardLogicTreeNode
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    @Slf4j  
    @Component("rule_luck_award")
    public class RuleLuckAwardLogicTreeNode implements ILogicTreeNode {



    @Override
    public DefaultTreeFactory.TreeActionEntity logic(String userId, Long strategyId, Integer awardId, String ruleValue) {
    log.info("规则过滤-兜底奖品 userId:{} strategyId:{} awardId:{} ruleValue:{}", userId, strategyId, awardId, ruleValue);

    //ruleValue: 101:1,100 split = ["101","1,100"]
    String[] split = ruleValue.split(Constants.COLON);
    log.info("测试的split数组:{}",split);
    if (split.length == 0){
    log.error("规则过滤-兜底奖品,兜底奖品未配置告警 userId:{} strategyId:{} awardId:{}", userId, strategyId, awardId);
    throw new RuntimeException("兜底奖品未配置"+ruleValue);
    }

    //将兜底奖品id和 ruleValue值拆出来
    Integer luckAwardId = Integer.valueOf(split[0]);
    String awardRuleValue = split.length > 1 ? split[1] : "";
    log.info("规则过滤-兜底奖品 userId:{} strategyId:{} awardId:{} awardRuleValue:{}", userId, strategyId, luckAwardId, awardRuleValue);

    //返回兜底奖品
    return DefaultTreeFactory.TreeActionEntity.builder()
    .ruleLogicCheckTypeVO(RuleLogicCheckTypeVO.TAKE_OVER)
    .strategyAwardVO(DefaultTreeFactory.StrategyAwardVO.builder()
    .awardId(luckAwardId)
    .awardRuleValue(awardRuleValue)
    .build())
    .build();
    }
    }
  3. 实现RuleStockLogicTreeNode
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    @Slf4j  
    @Component("rule_lock")
    public class RuleLockLogicTreeNode implements ILogicTreeNode {

    //用户抽奖个数,本应该从数据库中拿到,现在先自定义
    private Long userRaffleCount = 10L;

    @Override
    public DefaultTreeFactory.TreeActionEntity logic(String userId, Long strategyId, Integer awardId, String ruleValue) {
    log.info("规则过滤-次数锁 userId:{} strategyId:{} awardId:{} ruleValue:{}", userId, strategyId, awardId, ruleValue);
    //看数据库中拿到的raffleCount是否出现异常
    long raffleCount = 0L;
    try {
    raffleCount = Long.parseLong(ruleValue);
    }catch (Exception e){
    throw new RuntimeException("规则过滤-次数锁异常 ruleValue: " + ruleValue + " 配置不正确");
    }

    //检验用户抽奖次数和奖品次数锁的关系
    //如果大于,直接发放奖品并放行
    if (userRaffleCount >= raffleCount){
    return DefaultTreeFactory.TreeActionEntity.builder()
    .ruleLogicCheckTypeVO(RuleLogicCheckTypeVO.ALLOW)
    .build();
    }

    return DefaultTreeFactory.TreeActionEntity.builder()
    .ruleLogicCheckTypeVO(RuleLogicCheckTypeVO.TAKE_OVER)
    .build();
    }
    }

在trigger层增加消费队列的job

更新奖品库存任务:为了不让更新库存的压力打到数据库,采用redis更新缓存库存,异步队列更新数据库

  1. 在trigger-job中增加UpdateAwardStockJob,更新奖品库存任务

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    @Slf4j  
    @Component()
    public class UpdateAwardStockJob {
    @Resource
    private IRaffleStock raffleStock;

    @Scheduled(cron = "0/5 * * * * ?")
    public void exec(){
    try {
    log.info("定时任务,更新奖品消耗库存【延迟队列获取,降低对数据库的更新频次,不要产生竞争】");
    StrategyAwardStockKeyVO strategyAwardStockKeyVO = raffleStock.takeQueueValue();
    if (strategyAwardStockKeyVO == null)
    return;
    log.info("定时任务,更新奖品消耗库存 strategyId:{} awardId:{}", strategyAwardStockKeyVO.getStrategyId(), strategyAwardStockKeyVO.getAwardId());
    raffleStock.updateStrategyAwardStock(strategyAwardStockKeyVO.getStrategyId(), strategyAwardStockKeyVO.getAwardId());
    } catch (Exception e){
    log.error("定时任务,更新奖品消耗库存失败", e);
    }
    }
    }
  2. 在domain-strategy-service增加库存相关的服务,两个接口takeQueue() updateStrategyAwardStock()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    	public interface IRaffleStock {  

    /**
    * 获取奖品库存消耗队列
    *
    * @return 奖品库存Key信息
    * @throws InterruptedException 异常
    */
    StrategyAwardStockKeyVO takeQueueValue() throws InterruptedException;

    /**
    * 更新奖品库存消耗记录
    *
    * @param strategyId 策略ID
    * @param awardId 奖品ID
    */ void updateStrategyAwardStock(Long strategyId, Integer awardId);
    }
  3. 在抽象类和其子类中里增加实现库存相关的实现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    @Override  
    public StrategyAwardStockKeyVO takeQueueValue() throws InterruptedException {
    return repository.takeQueueValue();
    }

    /**
    * 更新奖品库存消耗记录
    *
    * @param strategyId 策略ID
    * @param awardId 奖品ID
    */@Override
    public void updateStrategyAwardStock(Long strategyId, Integer awardId) {
    repository.updateStrategyAwardStock(strategyId, awardId);
    }
  4. 在repository增加takeQueueValue获取队列值的操作,新增更新数据库的操作updateAwardStock

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    /**  
    * 获取奖品库存消费队列
    */
    @Override
    public StrategyAwardStockKeyVO takeQueueValue() throws InterruptedException {
    //构建缓存键
    String cacheKey = Constants.RedisKey.STRATEGY_AWARD_COUNT_QUEUE_KEY;
    RBlockingQueue<StrategyAwardStockKeyVO> destinationQueue = redisService.getBlockingQueue(cacheKey);
    return destinationQueue.poll();
    }

    /**
    * 更新奖品库存消耗
    *
    * @param strategyId 策略ID
    * @param awardId 奖品ID
    */@Override
    public void updateStrategyAwardStock(Long strategyId, Integer awardId) {
    StrategyAward strategyAward = new StrategyAward();
    strategyAward.setStrategyId(strategyId);
    strategyAward.setAwardId(awardId);
    strategyAwardDao.updateStrategyAwardStock(strategyAward);
    }
  5. 在StrategyAwardDao中增加更新数据库的操作,在mapper中增加对应操作

  6. 在application中增加enable注解

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    @SpringBootApplication  
    @Configurable
    @EnableScheduling
    public class Application {

    public static void main(String[] args){
    SpringApplication.run(Application.class);
    }

    }

测试

37分钟

核心掌握知识点

库存更新操作的原子性

在秒杀或抽奖场景中,库存更新操作必须是原子性的,以确保多线程或高并发情况下的正确性。这里的Redis缓存使用原子操作来实现库存扣减,避免超卖或重复扣减的风险。
redissonClient.getAtomicLong(key).set(value) 其中getAtomicLong方法会返回一个RAtomicLong对象,该对象是Redisson实现的一个分布式,线程安全的计数器。我们在项目中开发的setAtomicLong getAtomicLong方法是通过提供的key参数,将RAtomicLong对象与对应的某个数值键进行绑定,用于对该键执行原子性操作。

  • AtomicLong也提供了相应的线程安全的操作,确保扣减库存的时候不会因为多个请求同时访问而出现问题

setNx的开发和使用意义

setNx 方法用于实现一个分布式锁,目的是在高并发场景下控制奖品库存扣减的原子性,避免多个请求同时操作同一份库存数据,导致超卖等问题

setNx 的实现基于 RedissontrySet 方法,使用了 Redis 中的 SET NX 命令来实现“只在键不存在时设置键”的功能:
加锁:生成一个与剩余库存值相关的锁键(lockKey),然后调用 setNx 尝试获取锁,确保只有一个线程能够继续执行操作。

  • 如果锁获取失败,记录日志,说明该库存已被其他线程锁定,当前线程操作失败。
  • 如果成功获取锁,说明该线程拥有库存的修改权限,返回 true 表示成功。

将在缓存中扣减库存的信息写入延迟队列,定时任务写入数据库

分布式阻塞队列

Redis提供了RBlockingQueue对象,这是一个分布式阻塞队列,类似于Java的BlockingQueue。主要作用是获取一个支持阻塞操作的队列实例,可以在不同线程或分布式节点之间安全的传递数据

支持阻塞式的put take接口

  • put(T element): 如果队列已满,阻塞直到队列中有空间
  • take() 如果队列为空,阻塞直到有新元素加入队列
  • 可以用于生产者-消费者模式,在分布式环境中进行消息或任务的传递
    **redissonClient.getBlockingQueue(key)**:方法通过 redissonClient 获取与指定 key 关联的 RBlockingQueue 队列。如果该键已经存在,Redisson 会直接返回与该键绑定的队列;如果不存在,则创建一个新的阻塞队列。

延迟队列

getDelayedQueue使用RDelayedQueue对象,允许在指定延迟事件后才将元素推送到目标阻塞队列中。用于实现任务的延迟处理,比如定时任务和延迟消息处理
**RDelayedQueue<T>**:这是 Redisson 提供的延迟队列接口,它不直接存储元素,而是将元素存储在指定的 RBlockingQueue 中,但会在指定的延迟时间后才将元素推入目标队列中。

  • 延迟队列通常用于处理需要在将来某个时间执行的任务。数据在指定时间之前无法被消费,这样便可以控制处理的延迟。

RDelayedQueue使用方法:

  • **offer(T element, long delay, TimeUnit unit)**:将元素 element 添加到 RDelayedQueue,并指定延迟时间(delay)和时间单位(unit)。元素会在延迟时间之后被推送到目标 RBlockingQueue 中。
  • **remove(T element)**:从 RDelayedQueue 中移除指定元素,阻止其进入 RBlockingQueue
  • **destroy()**:清空 RDelayedQueue 并销毁队列,释放所有资源。

总结

  1. 构造缓存键
  2. 获取阻塞队列:该队列是一个支持阻塞操作的队列,用于在延迟时间到达后存储需要处理的任务。
  3. 获取延迟队列:元素在 RDelayedQueue 中等待指定的延迟时间,时间到达后,元素会被自动转移到 blockingQueue
  4. 添加任务并设置延迟:offer(strategyAwardStockKeyVO, 3, TimeUnit.SECONDS) 表示将 strategyAwardStockKeyVO 放入 RDelayedQueue 中,在 3 秒后,任务会自动被推送到关联的 blockingQueue 中,等待消费者取出处理

awardStockConsumeSendQueue 方法通过 RDelayedQueue 实现了延迟任务的推送,延迟指定时间后再进入阻塞队列供消费者消费。这种设计适合高并发下的任务控制,保证任务在指定延迟时间后再被处理。

Spring的定时任务调度机制

利用@Scheduled注解在后台按固定频率自动运行任务,从Redis的队列中获取库存扣减信息并更新数据库。

@EnableScheduling: 启用Spring的定时任务调度功能,让带有@Scheduled注解的方法按设定的时间频率执行。当Application类通过SpringApplication.run()启动时,Spring会扫描所有的组件,找到并启用@EnableScheduling来管理定时任务

@Scheduled(cron = "0/5 * * * * ?"): 指定方法的运行频率。这里使用cron表达式设置任务每5秒执行一次。Spring启动后,调度器会自动查找应用上下文中带有@Scheduled注解的方法并定期运行。在exec()方法中,每5秒获取库存扣减信息并更新数据库,避免数据库频繁操作,减少竞争
![[Pasted image 20241029233750.png]]