畅购商城(十四):秒杀系统「下」
发表时间:2020-10-19
发布人:葵宇科技
浏览次数:69
好好学习,天天向上
本文已收录至我的Github仓库DayDayUP:github.com/RobodLee/DayDayUP,欢迎Star
- 畅购商城(一):环境搭建
- 畅购商城(二):分布式文件系统FastDFS
- 畅购商城(三):商品管理
- 畅购商城(四):Lua、OpenResty、Canal实现广告缓存与同步
- 畅购商城(五):Elasticsearch实现商品搜索
- 畅购商城(六):商品搜索
- 畅购商城(七):Thymeleaf实现静态页
- 畅购商城(八):微服务网关和JWT令牌
- 畅购商城(九):Spring Security Oauth2
- 畅购商城(十):购物车
- 畅购商城(十一):订单
- 畅购商城(十二):接入微信支付
- 畅购商城(十三):秒杀系统「上」
- 畅购商城(十四):秒杀系统「下」
防止秒杀重复排队
回顾一下上一篇文章中讲到的下单的流程。当用户点击下单之后,用户名和商品id就会组装成一个SeckillStatus对象存入Redis队列中等待被处理,这个过程叫做排队。所以说,只要用户点击了一次下单后不论最后是否下单成功,他都会进入到排队的状态。如果用户重复点击下单,那么Redis队列中就会有很多个相同的SeckillStatus对象,也就是一个用户排队多次,这显然是不符合逻辑的,一个用户应该只能排队一次。
为了避免用户重复排队的情况,可以为每个用户在Redis中设置一个自增值,每次排队的时候加1,如果大于1,说明重复排队了,那么直接抛出异常,告诉用户重复排队了。
//SeckillOrderServiceImpl
@Override
public boolean add(Long id, String time, String username) {
Long increment = redisTemplate.boundHashOps(SystemConstants.SEC_KILL_USER_QUEUE_COUNT).increment(username, 1);
if (increment>1) { //记录指定hashkey的增量,大于1说明排队次数超过1次,重复排队
throw new RuntimeException("重复排队");
}
…………
}
这段代码中,添加了对用户重复排队的判断,先自增1,再进行判断。这里的key设置的是username,因为一个用户只能下单一件商品,如果去下单其它商品,同样也是重复排队。
测试了一下,是成功的。但是有一个问题:如果用户在这里排队未成功,该怎么清理排队信息呢?这个下一步就会说,接着往下看👇
并发超卖问题解决
现在的代码看似很完美,但是漏洞百出,比如就存在并发超卖的问题。为什么这么说,看代码说话:
这个是多线程下单的方法,流程是查库存——>下单——>减库存。假如现在有件商品还剩1件,正好有多个线程同时走到了查询库存这一步,结果查出来都是一件,然后这三个线程就可以往下接着走,最后三个线程都成功下单了,不就多卖了两件嘛。所以这段代码还存在问题,那怎么解决呢?可不可以采用加锁的方法,不可以。因为如果是在集群环境下,一台机器上多个线程走到了同一步确实可以锁住防止超卖,但是不同机器上的线程走到了同一部就锁不住了。
所以可以采用Redis队列的方式去解决。
给每个sku创建一个队列,比如id为4399的商品数量为4,那么就在4399的队列里放入4件商品。然后每次查询就从队列里去取,假如现在有五个线程去查库存,因为只有4件商品,所以5个线程只有4个线程能够查询出库存。因为Redis是单线程的,所以不会出现多个线程同时访问数据出错的情况,这样就可以避免并发超卖的问题。
之前在SeckillGoodsPushTask中只是将商品存入Redis中,现在再加一步,为每个sku都创建一个队列并存入库存数量的数据到队列中。
//定时将秒杀商品加载到redis中
@Scheduled(cron = "0/5 * * * * ?")
public void loadGoodsPushRedis() {
…………
for (SeckillGoods seckillGood : seckillGoods) {
boundHashOperations.put(seckillGood.getId(),seckillGood); //把商品存入到redis
redisTemplate.boundListOps(SystemConstants.SEC_KILL_GOODS_COUNT_LIST + seckillGood.getId())
.leftPushAll(getGoodsNumber(seckillGood.getNum())); //存到Redis队列
}
}
}
//获取秒杀商品数量的数组
public Byte[] getGoodsNumber(int num) {
Byte[] arr = new Byte[num];
for (int i = 0; i < num; i++) {
arr[i] = '0';
}
return arr;
}
队列的内容就是商品数量的Byte,视频中用的是商品id,但是商品id是Long型的,Byte比Long要省空间,而且放什么无所谓关键是放几个,所以我就放了对应数量的Byte进去。
接下来就该在下单之前获取库存的信息:
@Async
public void createOrder() {
…………
//从秒杀商品队列中获取数据,如果获取不到则说明已经卖完了,清除掉排队信息
Object o = redisTemplate.boundListOps(SystemConstants.SEC_KILL_GOODS_COUNT_LIST + seckillGoods.getId())
.rightPop();
if (o == null) {
redisTemplate.boundHashOps(SystemConstants.SEC_KILL_USER_QUEUE_COUNT).delete(seckillStatus.getUsername()); //清除排队队列
redisTemplate.boundHashOps(SystemConstants.SEC_KILL_USER_STATUS_KEY).delete(seckillStatus.getUsername()); //排队状态队列
return;
}
//创建秒杀订单
…………
}
如果商品库存不足,那么应该清除掉排队的信息,否则用户该商品下不了单还不能下单其它商品。这里将排队的队列以及查询状态的队列清除了。
同步库存不精准问题
解决了并发超卖的问题之后,还有一个库存数量不精准的问题。这个问题出现的原因和超卖问题类似,假如现在同时有两个线程下单完成了开始递减库存,A线程查询出库存有3个,B线程也查询出库存有3个,然后它们同时递减,都是2个,写到了数据库中。其实此时库存应该还剩一个。
解决的办法也很简单,因为现在是调用**seckillGoods.getStockCount()**查询出的库存,那我们就不用这个查询,直接用上一节中的队列,队列中剩余多少就说明现在的库存是多少,绝对准确。
@Async
public void createOrder() {
…………
//减库存,如果库存没了就从redis中删除,并将库存数据写到MySQL中
//seckillGoods.setStockCount(seckillGoods.getStockCount()-1);
Long size = redisTemplate.boundListOps(SystemConstants.SEC_KILL_GOODS_COUNT_LIST + seckillGoods.getId()).size();//获取库存
//if (seckillGoods.getStockCount() <= 0) {
seckillGoods.setNum(size.intValue());
if (size <= 0) {
seckillGoodsBoundHashOps.delete(seckillStatus.getGoodsId());
seckillGoodsMapper.updateByPrimaryKeySelective(seckillGoods);
} else {
seckillGoodsBoundHashOps.put(seckillStatus.getGoodsId(),seckillGoods);
}
//创建秒杀订单
…………
}
秒杀支付
改造二维码创建及支付结果通知方法
秒杀支付的流程和之前做的类似,只不过现在秒杀订单的支付状态发送到Queue2中,普通订单还是发送到queue1中,但是我们怎么知道该将订单的支付状态发送给queue1还是queue2呢?如果微信服务器可以将MQ队列的exchange和routingKey返回给我们就好了,这样我们就可以动态地指定要发送的MQ了。
从微信支付的官方文档中我们可以知道在创建二维码和接收支付结果的参数中都有一个attach参数
这个是自定义的数据,也就是说我们在创建二维码的时候发送给微信服务器什么,返回支付结果的时候就会返回给我们什么。所以在创建二维码的时候由前端将指定的exchange和routingKey发送给后端,然后再添加到attach参数中。就可以实现将不同的订单动态地发送到指定的队列了。
普通订单:exchange:exchange.order routingKey:routing.order
秒杀订单:exchange:exchange.seckill_order routingKey:routing.seckill_order
由于之前写的createNative方法是接收一个order对象,所以在Order里面添加两个字段:
private String exchange; //mq交换机的名称
private String routingKey; //mq的路由键
修改之前**createNative()**的代码,添加attach参数,
@Override
public Map<String, String> createNative(Order order) {
…………
//获取exchange和routingKey,封装程map集合,添加到attach参数中
String exchange = order.getExchange();
String routingKey = order.getRoutingKey();
Map<String,String> attachMap = new HashMap<>(2);
attachMap.put("exchange",exchange);
attachMap.put("routingKey",routingKey);
String attach = JSON.toJSONString(attachMap);
map.put("attach",attach);
…………
}
然后再修改**WeChatPayController.notifyUrl()**方法,从服务器返回的Map集合中获取attach,并从attach中获取exchange和routingKey。
@RequestMapping("/notify/url")
public String notifyUrl(HttpServletRequest request) throws Exception {
…………
Map<String, String> xmlMap = WXPayUtil.xmlToMap(xmlString);
String attach = xmlMap.get("attach");
Map<String, String> attachMap = JSONObject.parseObject(attach, Map.class);
//将java对象转换成amqp消息发送出去,调用的是send方法
//rabbitTemplate.convertAndSend("exchange.order","routing.order", xmlString);
rabbitTemplate.convertAndSend(attachMap.get("exchange"),attachMap.get("routingKey"), xmlString);
…………
}
监听秒杀
前面已经将消息发送到消息队列中了现在就可以去监听消息队列了。
从流程图中可以看到,在写监听的方法之前,需要有两个方法:改订单状态和删除订单。
SeckillOrderServiceImpl.updatePayStatus
public void updatePayStatus(String username, String transactionId, String endTime) {
//从Redis中将订单信息查询出来
SeckillOrder order = (SeckillOrder) redisTemplate
.boundHashOps(SystemConstants.SEC_KILL_ORDER_KEY)
.get(username);
if (order != null) {
try {
order.setStatus("1");
order.setTransactionId(transactionId);
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
order.setPayTime(simpleDateFormat.parse(endTime));
seckillOrderMapper.insertSelective(order); //将订单信息存到mysql中
//删除redis中的订单信息
redisTemplate.boundHashOps(SystemConstants.SEC_KILL_ORDER_KEY).delete(username);
//删除用户的排队信息
redisTemplate.boundHashOps(SystemConstants.SEC_KILL_USER_QUEUE_COUNT).delete(username); //清除排队队列
redisTemplate.boundHashOps(SystemConstants.SEC_KILL_USER_STATUS_KEY).delete(username); //排队状态队列
} catch (ParseException e) {
e.printStackTrace();
}
}
}
首先将订单信息从Redis中查询出来,将订单状态改为已支付,然后将交易流水号和支付时间补充完整存入MySQL。这时候交易已经完成了,可以将订单信息从Redis中删除,并将用户的排队信息也一并删除。
SeckillOrderServiceImpl.deleteOrder
public void deleteOrder(String username) {
//删除Redis中的订单
redisTemplate.boundHashOps(SystemConstants.SEC_KILL_ORDER_KEY).delete(username);
//删除用户的排队信息
redisTemplate.boundHashOps(SystemConstants.SEC_KILL_USER_QUEUE_COUNT).delete(username); //清除排队队列
redisTemplate.boundHashOps(SystemConstants.SEC_KILL_USER_STATUS_KEY).delete(username); //排队状态队列
//查询出秒杀的状态信息
SeckillStatus seckillStatus = (SeckillStatus) redisTemplate.boundHashOps(SystemConstants.SEC_KILL_USER_STATUS_KEY)
.get(username);
//回滚库存
SeckillGoods seckillGoods = (SeckillGoods) redisTemplate
.boundHashOps(SystemConstants.SEC_KILL_GOODS_PREFIX + seckillStatus.getTime())
.get(seckillStatus.getGoodsId());
if (seckillGoods == null) {
seckillGoodsMapper.selectByPrimaryKey(seckillGoods.getId());
seckillGoods.setStockCount(seckillGoods.getStockCount()+1);
seckillGoodsMapper.updateByPrimaryKeySelective(seckillGoods);
} else {
seckillGoods.setStockCount(seckillGoods.getStockCount()+1);
}
redisTemplate
.boundHashOps(SystemConstants.SEC_KILL_GOODS_PREFIX + seckillStatus.getTime())
.put(seckillGoods.getId(),seckillGoods);
//将商品放入队列
redisTemplate.boundListOps(SystemConstants.SEC_KILL_GOODS_COUNT_LIST + seckillGoods.getId())
.leftPush("0");
}
支付失败了,应该将订单删除掉。首先将Redis中的订单删除,然后删除用户的排队信息。接着回滚库存,如果Redis中没有则说明已经卖完了,就从MySQL中查询出来然后将商品数量加1再存入MySQL;如果Redis中有数据就将Redis中的商品数量加1即可。上面讲防止并发超卖的时候不是为每个商品都在Redis队列中存放了一下么,所以最后将商品放回到队列中。
SeckillMessageListener
@Component
@RabbitListener(queues = "queue.seckillorder")
public class SeckillMessageListener {
@Autowired
private SeckillOrderService seckillOrderService;
//https://pay.weixin.qq.com/wiki/doc/api/native.php?chapter=9_7&index=8
@RabbitHandler
public void getMessage(String message) {
try {
Map<String, String> resultMap = JSON.parseObject(message,Map.class);
String returnCode = resultMap.get("return_code"); //状态码
if ("SUCCESS".equals(returnCode)) {
String resultCode = resultMap.get("result_code"); //业务结果
String attach = resultMap.get("attach");
Map<String,String> attachMap = JSON.parseObject(attach,Map.class);
if ("SUCCESS".equals(resultCode)) {
//改订单状态
seckillOrderService.updatePayStatus(attachMap.get("username"),
resultMap.get("transaction_id"),resultMap.get("time_end"));
} else {
//删除订单
seckillOrderService.deleteOrder(attachMap.get("username"));
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
这个方法使用来监听秒杀队列的消息的,exchange和queue需要我们手动地在RabbitMQ的网页中创建并进行绑定
在该方法中,首先去读取状态码和业务结果,如果都为“SUCCESS”的话则说明订单支付成功,修改订单的状态。反之订单支付失败,删除订单。
总结
文章鸽了快一个月了,终于补上了,主要是上篇文章写完后就在做个小东西,然后就是国庆节放假,在家待着有点懒。回校后又在参加电赛,没时间。所以一路鸽到现在。
这篇文章主要是将之前的秒杀流程进行一个完善,实现了防止秒杀重复排队,解决并发超卖的问题,并解决了同步库存不精准的问题。最后实现了秒杀支付。
码字不易,可以的话,给我来个
点赞
,收藏
,关注
如果你喜欢我的文章,欢迎关注微信公众号 『 R o b o d 』
代码:https://github.com/RobodLee/changgou
本文已收录至我的Github仓库DayDayUP:github.com/RobodLee/DayDayUP,欢迎Star