本文最后更新于 2024-06-14,文章内容可能已经过时。

摘要

避免流量高峰,引入MQ去做异步调用、流量消峰

流程:

  • 用户在商品页面点击购买按钮,前端发送请求到后台,获取一个本次订单的唯一token返回给前端【就是防重token,防止用户多次点击造成数据错乱】

  • 前端拿到后即跳转到订单详情界面,此界面为了展示用户已选的商品和总金额,后台需要根据商品的id去查询商品的信息和每个商品的价格以及总金额返回给前端做回显,这里前端界面需要设置一个定时器提示用户支付最晚支付时间,后端通过rocketMQTemplate.syncSend向mq发送一个延迟消息,本身的订单服务和外部的支付服务作为两个消费者,如果超时检查订单仍为待支付,则修改订单服务和支付服务的订单状态为已取消,同时支付服务要关闭订单交易【Factory.Payment.Common().close(payOrder.getOrderNo)】,为防止意外,在支付宝回调通知中我们要去判断这个支付单的状态,如果支付单为已取消,则直接退款【Factory.Payment.Common().refund(payOrder.getOrderNo,dto.getTotal_amout())】,返回success给支付宝

  • 用户点击下单,将携带token和商品信息以及价格数量等信息到后台,后台接收后就去预创建订单,因为引进了MQ,所以本地不做数据库的写操作,而是将这个操作交给MQ去执行,本地只需要将必须的订单属性封装好发送事务消息给MQ即可,同时也要将消息的主题message封装好一起发送过去,用于支付模块消费者获取到去做新增支付订单操作,事务监听器监听到后获取Object的数据,JSON转换为对象后调用业务方法进行保存订单数据,使用try-catch包裹,如果没有发生异常则是直接提交本地事务,MQ会携带message将事务消息发送到MQ队列中,然后支付服务消费者去监听MQ队列的某个topic完成支付订单的添加

  • 消息发送成功,则会提示用户正在下单,前端定时器轮询去数据库中查询支付服务的支付订单,查询到后后端将单号返回给前端

  • 前端获取到后携带单号以及一些必要的属性向后端支付接口发起调用,支付服务去调用支付宝的支付接口,返回支付宝给的form表单给前端,剩下的交给支付宝去操作了

  • 支付完成后通过回调地址return_url作为支付完成或取消后的跳转页面,如果没有指定回调地址,则会跳转至支付宝的默认回调地址

  • 支付宝同时会根据notify_url对支付结果进行通知,最大努力通知8次,支付宝通知时会返回一大堆参数,总共有23个,主要有签名、支付金额、消息主体、订单号、支付宝自己的订单号、交易状态、签名方式等,我们通过dto去接收这些参数,然后去做验签,验签成功判断支付状态是否成功,金额是否正确、订单号是否匹配等校验,校验不通过返回fail让支付宝重新发送通知,校验成功则做业务处理:修改支付订单状态、添加支付流水、为保证支付服务和课程服务以及订单服务的一致性,使用mq发送事务消息让mq去操作本地事务

  • 成功后发送message到mq队列,以广播的形式让订单服务去消费,修改订单状态和添加course_user_learn记录,如果购买过,增加总时长,没有则新增记录,在mq成功执行本地事务并发送事务消息后为支付宝返回success,让它别再通知我们了,整个下单支付流程就结束了

通过RocketMQ的事务消息,我们去保证本地事务与发送消息的原子性,用户点击下单,后端通过订单服务预创建订单,然后将订单数据封装进dto发送事务消息,需要携带的参数有:txProducerGroup【消息的分组】、destination【消息的topic和tag,用:隔开】、message【发送给消费者的消息内容】、arg【扩展参数,将封装好的dto放到这儿传给MQ让MQ去执行本地事务的操作】,然后事务监听器通过注解@RocketMQTransactionListener的txProducerGroup属性去监听某一消息组,监听器类实现RocketMQLocalTransactionListener接口,重写两个方法,分别为执行本地事务操作的方法和事务回调的方法,在执行本地事务的方法中,用try-catch包裹代码,通过强转参数Obejct,调用业务层方法将这个参数携带过去完成添加,返回RocketMQLocalTransactionState.COMMIT,cache代表业务方法执行失败,则返回回滚ROLLBACK,在回调方法中,主要通过传过来的message与订单中一致的订单号去数据库中查询,如果查到这个数据则代表本地事务执行成功,提交,反之回滚,此方法一般不会被调用,除非MQ很久没有响应,本地事务执行成功后MQ会携带message发送消息到MQ队列中,然后支付服务创建一个消费者,通过@RocketMQMessageListener去创建一个消费组、指定监听的topic和tag以及消费模式,一般使用集群,一个只允许被消费一次,消费者去实现RocketMQListener,指定泛型MessageExt,然后在重写的方法里面通过携带过来的message去保存支付订单,需要注意的是,这里要去保证消息的幂等性,所以要先去判断数据库中是否已经创建了此订单,然后再去调用业务方法保存订单,这样就完成了MQ的异步下单

业务代码


/**
 * <p>
 *  服务实现类
 * </p>
 *
 * @author yang
 * @since 2022-09-16
 */
@Service
public class MarketOrderServiceImpl extends ServiceImpl<MarketOrderMapper, MarketOrder> implements IMarketOrderService {

    @Autowired
    private RedisTemplate<Object,Object> redisTemplate;

    @Autowired
    private MarketFeign marketFeign;

    @Autowired
    private IMarketOrderItemService marketOrderItemService;

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 前端下单,保存订单信息
     */
    @Override
    public JSONResult placeOrder(MarketOrderDto marketOrderDto) {
        Long loginId = 3l;  //登录用户id暂时写死
        String marketIds = StringUtils.join(marketOrderDto.getMarketIds(),",");
        //1.判断防重token是否正确
        String key = loginId + ":" + marketIds + ":" +"token";
        Object tokenRedis = redisTemplate.opsForValue().get(key);
        AssertUtil.isNotNull(tokenRedis,GlobalErrorCode.TOKEN_EMPTY_ERROR);
        AssertUtil.isEquals(marketOrderDto.getToken(),tokenRedis.toString(),GlobalErrorCode.TOKEN_ERROR);
        //调用feign接口,查询当前下单的课程的信息和价格等信息
        JSONResult orderInfo = marketFeign.getOrderInfo(marketIds);
        AssertUtil.isNotNull(orderInfo.getData(),GlobalErrorCode.COURSE_SERVER_ERROR);
        //将返回结果转为原来的对象,这样做是为了传给保存订单和保存详情两个方法,避免查询两次
        MarketOrderVo marketOrderVo = JSON.parseObject(JSONObject.toJSONString(orderInfo.getData()), MarketOrderVo.class);
        //2.校验通过,保存数据到订单主表
        MarketOrder marketOrder = saveOrder(marketOrderDto,marketOrderVo);
        //3.保存数据到订单明细表【保存后要将结果设置到主表的title字段】
        MarketOrder marketOrderAndItem = saveDeatil(marketOrder, marketOrderVo);
        //使用MQ发送事务消息,完成MQ的异步下单(MQ需要保证本地事务执行和发送消息的原子性,所以本地事务的执行应交给MQ来完成)
        Message<MarketOrder2PayOrder> message = MessageBuilder.withPayload(new MarketOrder2PayOrder(
                marketOrder.getTotalAmount(),
                marketOrder.getPayType(),
                marketOrder.getOrderNo(),
                marketOrder.getUserId(),
                marketOrder.getTitle()
        )).build();
        //发送事务消息
        TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
                "txMarketOrder",  //消息的分组
                "order-topic:order-tag",  //消息的topic和tag
                message,  //消息主体,用于生成支付服务的支付单的必备参数,同时可以通过订单号检查本地事务回调,MQ需要将这个消息发送给pay服务
                marketOrderAndItem  //扩展参数,用于让MQ执行本地事务,将订单详情用list封装进主订单,再将主订单通过arg发送给消息监听器去执行添加订单操作
        );
        //获取本地事务执行状态
        LocalTransactionState localTransactionState = result.getLocalTransactionState();
        //获取消息发送状态
        SendStatus sendStatus = result.getSendStatus();
        Boolean isTrue = localTransactionState == LocalTransactionState.COMMIT_MESSAGE || sendStatus!= SendStatus.SEND_OK;
        AssertUtil.isTrue(isTrue,GlobalErrorCode.CREATE_ORDER_ERROR);
        //消息发送成功则删除redis中的token
        redisTemplate.delete(key);
        //将订单号传给前端
        return JSONResult.success(marketOrder.getOrderNo());
    }

    /**
     * RocketMQ异步下单,执行本地事务,最终的添加订单和详情是由MQ调此方法实现
     * @param order
     */
    @Override
    @Transactional
    public void saveMarketOrderAndItem(MarketOrder order) {
        List<MarketOrderItem> marketOrderItems = order.getMarketOrderItems();  //订单详情
        //保存主订单
        insert(order);
        //循环保存订单详情
        marketOrderItems.forEach(e->{
            e.setOrderId(order.getId());
            marketOrderItemService.insert(e);
        });
    }

    //封装订单明细表的数据
    private MarketOrder saveDeatil(MarketOrder marketOrder,MarketOrderVo marketOrderVo) {
        StringBuffer title = new StringBuffer();
        title.append("购买课程:【");
        //循环,传了多个课程就要保存多条记录
        List<cn.ybl.vo.MarketOrder> marketInfos = marketOrderVo.getMarketInfos();
        for (cn.ybl.vo.MarketOrder order:marketInfos){
            MarketOrderItem marketOrderItem = new MarketOrderItem();
            marketOrderItem.setOrderNo(marketOrder.getOrderNo());  //订单id
            marketOrderItem.setAmount(marketOrder.getTotalAmount());  //价格
            marketOrderItem.setCount(marketOrder.getTotalCount());  //数量
            marketOrderItem.setCreateTime(new Date());
            marketOrderItem.setMarketId(order.getMarket().getId());
            marketOrderItem.setMarketPic(order.getMarket().getPic());
            marketOrderItem.setMarketName(order.getMarket().getName());
            marketOrderItem.setVersion(0);
            //将订单明细数据封装进订单主表的list,因为一个主表可以对应多个订单明细
            marketOrder.getMarketOrderItems().add(marketOrderItem);
            title.append(order.getMarket().getName());
        }
        title.append("\"】,支付【"+marketOrder.getTotalAmount()+"】元");
        marketOrder.setTitle(title.toString());
        return marketOrder;
    }

    //封装订单主表的数据
    private MarketOrder saveOrder(MarketOrderDto marketOrderDto, MarketOrderVo marketOrderVo) {
        //下单课程id【1,2,3......】
        List<Long> marketIds = marketOrderDto.getMarketIds();
        String marketIdsArr = StringUtils.join(marketIds, ",");
        //支付方式
        Integer payType = marketOrderDto.getPayType();
        //普通订单
        Integer type = marketOrderDto.getType();
        MarketOrder marketOrder = new MarketOrder();
        marketOrder.setCreateTime(new Date()); // 订单创建时间
        marketOrder.setOrderNo(CodeGenerateUtils.generateOrderSn(3));  //根据用户id生成订单编号
        marketOrder.setTotalAmount(marketOrderVo.getTotalAmount()); //支付总金额
        marketOrder.setTotalCount(marketIdsArr.length());  //下单数量
        marketOrder.setStatusOrder(0);  //订单状态,下单成功待支付
        marketOrder.setUserId(3l);  //用户id
        marketOrder.getTitle();  //标题【需要先保存订单详情后将订单详情所有数据拼接起来】
        marketOrder.setVersion(0);
        marketOrder.setPayType(payType);
        return marketOrder;
    }
}

参考

RocketMQ异步下单+对接支付宝接口完成支付+支付宝通知处理