增加 RocketMQ 组件
支付成功后,回调通知业务线订单支付成功的逻辑,简单完成。后续,需要封装下,对不同业务线的回调。以及,http 回调的实现。
This commit is contained in:
parent
9d7af382ab
commit
eb760ab312
@ -60,6 +60,10 @@
|
||||
<artifactId>fastjson</artifactId>
|
||||
<version>1.2.56</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-lang3</artifactId>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
|
@ -1,10 +1,39 @@
|
||||
package cn.iocoder.common.framework.util;
|
||||
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Calendar;
|
||||
import java.util.Date;
|
||||
|
||||
public class DateUtil {
|
||||
|
||||
/**
|
||||
* 计算当期时间相差的日期
|
||||
*
|
||||
* @param field 日历字段.<br/>eg:Calendar.MONTH,Calendar.DAY_OF_MONTH,<br/>Calendar.HOUR_OF_DAY等.
|
||||
* @param amount 相差的数值
|
||||
* @return 计算后的日志
|
||||
*/
|
||||
public static Date addDate(int field, int amount) {
|
||||
return addDate(null, field, amount);
|
||||
}
|
||||
|
||||
/**
|
||||
* 计算当期时间相差的日期
|
||||
*
|
||||
* @param date 设置时间
|
||||
* @param field 日历字段.<br/>eg:Calendar.MONTH,Calendar.DAY_OF_MONTH,<br/>Calendar.HOUR_OF_DAY等.
|
||||
* @param amount 相差的数值
|
||||
* @return 计算后的日志
|
||||
*/
|
||||
public static Date addDate(Date date, int field, int amount) {
|
||||
Calendar c = Calendar.getInstance();
|
||||
if (date != null) {
|
||||
c.setTime(date);
|
||||
}
|
||||
c.add(field, amount);
|
||||
return c.getTime();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param date 时间。若为空,则返回空串
|
||||
* @param pattern 时间格式化
|
||||
|
@ -1,6 +1,7 @@
|
||||
package cn.iocoder.common.framework.util;
|
||||
|
||||
import cn.iocoder.common.framework.exception.ServiceException;
|
||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||
|
||||
import javax.validation.ConstraintViolationException;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
@ -44,4 +45,8 @@ public class ExceptionUtil {
|
||||
return null;
|
||||
}
|
||||
|
||||
public static String getRootCauseMessage(Throwable th) {
|
||||
return ExceptionUtils.getRootCauseMessage(th);
|
||||
}
|
||||
|
||||
}
|
@ -9,6 +9,7 @@ import com.alibaba.dubbo.config.annotation.Reference;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RequestParam;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
@ -25,7 +26,8 @@ public class PayDemoController {
|
||||
private PayTransactionService payTransactionService;
|
||||
|
||||
@PostMapping("/create_order")
|
||||
public void createOrder(HttpServletRequest request) {
|
||||
public void createOrder(HttpServletRequest request,
|
||||
@RequestParam("orderId") String orderId) {
|
||||
// 创建业务订单
|
||||
// ...
|
||||
|
||||
@ -33,7 +35,7 @@ public class PayDemoController {
|
||||
PayTransactionCreateDTO payTransactionCreateDTO = new PayTransactionCreateDTO()
|
||||
.setAppId("POd4RC6a")
|
||||
.setCreateIp(HttpUtil.getIp(request))
|
||||
.setOrderId("1")
|
||||
.setOrderId(orderId)
|
||||
.setOrderSubject("商品名" )
|
||||
.setOrderDescription("商品描述")
|
||||
.setOrderMemo("商品备注")
|
||||
|
@ -52,8 +52,8 @@ public class PayTransactionController {
|
||||
// JSONObject bodyObj = JSON.parseObject(sb.toString());
|
||||
// bodyObj.put("webhookId", bodyObj.remove("id"));
|
||||
// String body = bodyObj.toString();
|
||||
payService.updateTransactionPaySuccess(PayChannelEnum.PINGXX.getId(), sb.toString());
|
||||
return "";
|
||||
CommonResult<Boolean> result = payService.updateTransactionPaySuccess(PayChannelEnum.PINGXX.getId(), sb.toString());
|
||||
return result.isSuccess() ? "success" : "failure";
|
||||
}
|
||||
|
||||
}
|
@ -73,6 +73,12 @@
|
||||
<version>2.0.1</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.rocketmq</groupId>
|
||||
<artifactId>rocketmq-spring-boot-starter</artifactId>
|
||||
<version>2.0.1</version>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
@ -0,0 +1,13 @@
|
||||
package cn.iocoder.mall.pay.biz.constant;
|
||||
|
||||
/**
|
||||
* MQ 枚举类
|
||||
*/
|
||||
public class MQConstant {
|
||||
|
||||
/**
|
||||
* Topic - 支付交易单支付成功
|
||||
*/
|
||||
public static final String TOPIC_PAY_TRANSACTION_PAY_SUCCESS = "PAY_TRANSACTION_PAY_SUCCESS";
|
||||
|
||||
}
|
@ -5,6 +5,8 @@ import cn.iocoder.mall.pay.api.dto.PayTransactionCreateDTO;
|
||||
import cn.iocoder.mall.pay.api.dto.PayTransactionSubmitDTO;
|
||||
import cn.iocoder.mall.pay.biz.dataobject.PayTransactionDO;
|
||||
import cn.iocoder.mall.pay.biz.dataobject.PayTransactionExtensionDO;
|
||||
import cn.iocoder.mall.pay.biz.dataobject.PayTransactionNotifyTaskDO;
|
||||
import cn.iocoder.mall.pay.biz.mq.PayTransactionPaySuccessMessage;
|
||||
import org.mapstruct.Mapper;
|
||||
import org.mapstruct.Mappings;
|
||||
import org.mapstruct.factory.Mappers;
|
||||
@ -23,4 +25,7 @@ public interface PayTransactionConvert {
|
||||
@Mappings({})
|
||||
PayTransactionExtensionDO convert(PayTransactionSubmitDTO payTransactionSubmitDTO);
|
||||
|
||||
@Mappings({})
|
||||
PayTransactionPaySuccessMessage convert(PayTransactionNotifyTaskDO payTransactionNotifyTaskDO);
|
||||
|
||||
}
|
@ -0,0 +1,11 @@
|
||||
package cn.iocoder.mall.pay.biz.dao;
|
||||
|
||||
import cn.iocoder.mall.pay.biz.dataobject.PayTransactionNotifyLogDO;
|
||||
import org.springframework.stereotype.Repository;
|
||||
|
||||
@Repository
|
||||
public interface PayTransactionNotifyLogMapper {
|
||||
|
||||
void insert(PayTransactionNotifyLogDO entity);
|
||||
|
||||
}
|
@ -3,6 +3,8 @@ package cn.iocoder.mall.pay.biz.dao;
|
||||
import cn.iocoder.mall.pay.biz.dataobject.PayTransactionNotifyTaskDO;
|
||||
import org.springframework.stereotype.Repository;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Repository
|
||||
public interface PayTransactionNotifyTaskMapper {
|
||||
|
||||
@ -10,4 +12,15 @@ public interface PayTransactionNotifyTaskMapper {
|
||||
|
||||
int update(PayTransactionNotifyTaskDO entity);
|
||||
|
||||
/**
|
||||
* 获得需要通知的 PayTransactionNotifyTaskDO 记录。需要满足如下条件:
|
||||
*
|
||||
* 1. status 非成功
|
||||
* 2. nextNotifyTime 小于当前时间
|
||||
* 3. lastExecuteTime > nextNotifyTime
|
||||
*
|
||||
* @return PayTransactionNotifyTaskDO 数组
|
||||
*/
|
||||
List<PayTransactionNotifyTaskDO> selectByNotify();
|
||||
|
||||
}
|
@ -13,6 +13,10 @@ public class PayTransactionNotifyLogDO extends BaseDO {
|
||||
* 日志编号,自增
|
||||
*/
|
||||
private Integer id;
|
||||
/**
|
||||
* 通知编号
|
||||
*/
|
||||
private Integer notifyId;
|
||||
/**
|
||||
* 请求参数
|
||||
*/
|
||||
@ -64,4 +68,13 @@ public class PayTransactionNotifyLogDO extends BaseDO {
|
||||
return this;
|
||||
}
|
||||
|
||||
public Integer getNotifyId() {
|
||||
return notifyId;
|
||||
}
|
||||
|
||||
public PayTransactionNotifyLogDO setNotifyId(Integer notifyId) {
|
||||
this.notifyId = notifyId;
|
||||
return this;
|
||||
}
|
||||
|
||||
}
|
@ -1,6 +1,7 @@
|
||||
package cn.iocoder.mall.pay.biz.dataobject;
|
||||
|
||||
import cn.iocoder.common.framework.dataobject.BaseDO;
|
||||
import cn.iocoder.mall.pay.biz.service.PayServiceImpl;
|
||||
|
||||
import java.util.Date;
|
||||
|
||||
@ -9,6 +10,16 @@ import java.util.Date;
|
||||
*/
|
||||
public class PayTransactionNotifyTaskDO extends BaseDO {
|
||||
|
||||
/**
|
||||
* 通知频率,单位为秒。
|
||||
*
|
||||
* 算上首次的通知,实际是一共 1 + 8 = 9 次。
|
||||
*/
|
||||
public static final Integer[] NOTIFY_FREQUENCY = new Integer[]{
|
||||
15, 15, 30, 180,
|
||||
1800, 1800, 1800, 3600
|
||||
};
|
||||
|
||||
/**
|
||||
* 编号,自增
|
||||
*/
|
||||
@ -40,9 +51,26 @@ public class PayTransactionNotifyTaskDO extends BaseDO {
|
||||
*/
|
||||
private Integer status;
|
||||
/**
|
||||
* 最后一次通知时间
|
||||
* 下一次通知时间
|
||||
*/
|
||||
private Date lastNotifyTime;
|
||||
private Date nextNotifyTime;
|
||||
/**
|
||||
* 最后一次执行时间
|
||||
*
|
||||
* 这个字段,需要结合 {@link #nextNotifyTime} 一起使用。
|
||||
*
|
||||
* 1. 初始时,{@link PayServiceImpl#updateTransactionPaySuccess(Integer, String)}
|
||||
* nextNotifyTime 为当前时间 + 15 秒
|
||||
* lastExecuteTime 为空
|
||||
* 并发送给 MQ ,执行执行
|
||||
*
|
||||
* 2. MQ 消费时,更新 lastExecuteTime 为当时时间
|
||||
*
|
||||
* 3. 定时任务,扫描 nextNotifyTime < lastExecuteTime 的任务
|
||||
* nextNotifyTime 为当前时间 + N 秒。具体的 N ,由第几次通知决定
|
||||
* lastExecuteTime 为当前时间
|
||||
*/
|
||||
private Date lastExecuteTime;
|
||||
/**
|
||||
* 当前通知次数
|
||||
*/
|
||||
@ -92,12 +120,12 @@ public class PayTransactionNotifyTaskDO extends BaseDO {
|
||||
return this;
|
||||
}
|
||||
|
||||
public Date getLastNotifyTime() {
|
||||
return lastNotifyTime;
|
||||
public Date getNextNotifyTime() {
|
||||
return nextNotifyTime;
|
||||
}
|
||||
|
||||
public PayTransactionNotifyTaskDO setLastNotifyTime(Date lastNotifyTime) {
|
||||
this.lastNotifyTime = lastNotifyTime;
|
||||
public PayTransactionNotifyTaskDO setNextNotifyTime(Date nextNotifyTime) {
|
||||
this.nextNotifyTime = nextNotifyTime;
|
||||
return this;
|
||||
}
|
||||
|
||||
@ -145,4 +173,14 @@ public class PayTransactionNotifyTaskDO extends BaseDO {
|
||||
this.notifyUrl = notifyUrl;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Date getLastExecuteTime() {
|
||||
return lastExecuteTime;
|
||||
}
|
||||
|
||||
public PayTransactionNotifyTaskDO setLastExecuteTime(Date lastExecuteTime) {
|
||||
this.lastExecuteTime = lastExecuteTime;
|
||||
return this;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,108 @@
|
||||
package cn.iocoder.mall.pay.biz.mq;
|
||||
|
||||
import cn.iocoder.common.framework.util.DateUtil;
|
||||
import cn.iocoder.common.framework.util.ExceptionUtil;
|
||||
import cn.iocoder.mall.pay.api.constant.PayTransactionNotifyStatusEnum;
|
||||
import cn.iocoder.mall.pay.biz.constant.MQConstant;
|
||||
import cn.iocoder.mall.pay.biz.dao.PayTransactionMapper;
|
||||
import cn.iocoder.mall.pay.biz.dao.PayTransactionNotifyLogMapper;
|
||||
import cn.iocoder.mall.pay.biz.dao.PayTransactionNotifyTaskMapper;
|
||||
import cn.iocoder.mall.pay.biz.dataobject.PayTransactionDO;
|
||||
import cn.iocoder.mall.pay.biz.dataobject.PayTransactionNotifyLogDO;
|
||||
import cn.iocoder.mall.pay.biz.dataobject.PayTransactionNotifyTaskDO;
|
||||
import com.alibaba.dubbo.config.ApplicationConfig;
|
||||
import com.alibaba.dubbo.config.ReferenceConfig;
|
||||
import com.alibaba.dubbo.config.RegistryConfig;
|
||||
import com.alibaba.dubbo.rpc.service.GenericService;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.util.Calendar;
|
||||
import java.util.Date;
|
||||
|
||||
@Service
|
||||
@RocketMQMessageListener(
|
||||
topic = MQConstant.TOPIC_PAY_TRANSACTION_PAY_SUCCESS,
|
||||
consumerGroup = "pay-consumer-group-" + MQConstant.TOPIC_PAY_TRANSACTION_PAY_SUCCESS
|
||||
)
|
||||
public class PayTransactionPaySuccessConsumer implements RocketMQListener<PayTransactionPaySuccessMessage> {
|
||||
|
||||
@Autowired
|
||||
private PayTransactionNotifyTaskMapper payTransactionNotifyTaskMapper;
|
||||
@Autowired
|
||||
private PayTransactionNotifyLogMapper payTransactionNotifyLogMapper;
|
||||
@Autowired
|
||||
private PayTransactionMapper payTransactionMapper;
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public void onMessage(PayTransactionPaySuccessMessage message) {
|
||||
// TODO 先简单写,后面重构
|
||||
|
||||
ApplicationConfig application = new ApplicationConfig();
|
||||
application.setName("api-generic-consumer");
|
||||
|
||||
RegistryConfig registry = new RegistryConfig();
|
||||
registry.setAddress("zookeeper://127.0.0.1:2181");
|
||||
|
||||
application.setRegistry(registry);
|
||||
|
||||
ReferenceConfig<GenericService> reference = new ReferenceConfig<>();
|
||||
// 弱类型接口名
|
||||
reference.setInterface("cn.iocoder.mall.pay.api.PayDemoService");
|
||||
// 声明为泛化接口
|
||||
reference.setGeneric(true);
|
||||
|
||||
reference.setApplication(application);
|
||||
|
||||
// 用com.alibaba.dubbo.rpc.service.GenericService可以替代所有接口引用
|
||||
GenericService genericService = reference.get(); // TODO 芋艿,要缓存,不然重复引用
|
||||
|
||||
String response = null; // RPC / HTTP 调用的响应
|
||||
PayTransactionNotifyTaskDO updateTask = new PayTransactionNotifyTaskDO() // 更新 PayTransactionNotifyTaskDO 对象
|
||||
.setId(message.getId())
|
||||
.setLastExecuteTime(new Date())
|
||||
.setNotifyTimes(message.getNotifyTimes() + 1);
|
||||
try {
|
||||
response = (String) genericService.$invoke("updatePaySuccess", new String[]{String.class.getName()}, new Object[]{message.getOrderId()});
|
||||
if ("success".equals(response)) { // 情况一,请求成功且返回成功
|
||||
// 更新通知成功
|
||||
updateTask.setStatus(PayTransactionNotifyStatusEnum.SUCCESS.getValue());
|
||||
payTransactionNotifyTaskMapper.update(updateTask);
|
||||
// 需要更新支付交易单通知应用成功
|
||||
PayTransactionDO updateTransaction = new PayTransactionDO().setId(message.getTransactionId())
|
||||
.setFinishTime(new Date());
|
||||
payTransactionMapper.update(updateTransaction, null);
|
||||
} else { // 情况二,请求成功且返回失败
|
||||
// 更新通知请求成功,但是结果失败
|
||||
handleFailure(updateTask, PayTransactionNotifyStatusEnum.REQUEST_SUCCESS.getValue());
|
||||
payTransactionNotifyTaskMapper.update(updateTask);
|
||||
}
|
||||
} catch (Throwable e) { // 请求失败
|
||||
// 更新通知请求失败
|
||||
response = ExceptionUtil.getRootCauseMessage(e);
|
||||
handleFailure(updateTask, PayTransactionNotifyStatusEnum.REQUEST_FAILURE.getValue());
|
||||
payTransactionNotifyTaskMapper.update(updateTask);
|
||||
// 抛出异常,回滚事务
|
||||
throw e;
|
||||
} finally {
|
||||
// 插入 PayTransactionNotifyLogDO 日志
|
||||
PayTransactionNotifyLogDO notifyLog = new PayTransactionNotifyLogDO().setNotifyId(message.getId())
|
||||
.setRequest(message.getOrderId()).setResponse(response).setStatus(updateTask.getStatus());
|
||||
payTransactionNotifyLogMapper.insert(notifyLog);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleFailure(PayTransactionNotifyTaskDO updateTask, Integer defaultStatus) {
|
||||
if (updateTask.getNotifyTimes() >= PayTransactionNotifyTaskDO.NOTIFY_FREQUENCY.length) {
|
||||
updateTask.setStatus(PayTransactionNotifyStatusEnum.FAILURE.getValue());
|
||||
} else {
|
||||
updateTask.setNextNotifyTime(DateUtil.addDate(Calendar.SECOND, PayTransactionNotifyTaskDO.NOTIFY_FREQUENCY[updateTask.getNotifyTimes()]));
|
||||
updateTask.setStatus(defaultStatus);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,91 @@
|
||||
package cn.iocoder.mall.pay.biz.mq;
|
||||
|
||||
import cn.iocoder.mall.pay.biz.dataobject.PayTransactionDO;
|
||||
|
||||
/**
|
||||
* {@link cn.iocoder.mall.pay.biz.constant.MQConstant#TOPIC_PAY_TRANSACTION_PAY_SUCCESS} 的消息对象
|
||||
*/
|
||||
public class PayTransactionPaySuccessMessage {
|
||||
|
||||
/**
|
||||
* 编号,自增
|
||||
*/
|
||||
private Integer id;
|
||||
/**
|
||||
* 交易编号
|
||||
*
|
||||
* {@link PayTransactionDO#getId()}
|
||||
*/
|
||||
private Integer transactionId;
|
||||
/**
|
||||
* 应用编号
|
||||
*/
|
||||
private String appId;
|
||||
/**
|
||||
* 应用订单编号
|
||||
*/
|
||||
private String orderId;
|
||||
/**
|
||||
* 当前通知次数
|
||||
*/
|
||||
private Integer notifyTimes;
|
||||
/**
|
||||
* 通知地址
|
||||
*/
|
||||
private String notifyUrl;
|
||||
|
||||
public Integer getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public PayTransactionPaySuccessMessage setId(Integer id) {
|
||||
this.id = id;
|
||||
return this;
|
||||
}
|
||||
|
||||
public String getAppId() {
|
||||
return appId;
|
||||
}
|
||||
|
||||
public PayTransactionPaySuccessMessage setAppId(String appId) {
|
||||
this.appId = appId;
|
||||
return this;
|
||||
}
|
||||
|
||||
public String getOrderId() {
|
||||
return orderId;
|
||||
}
|
||||
|
||||
public PayTransactionPaySuccessMessage setOrderId(String orderId) {
|
||||
this.orderId = orderId;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Integer getNotifyTimes() {
|
||||
return notifyTimes;
|
||||
}
|
||||
|
||||
public PayTransactionPaySuccessMessage setNotifyTimes(Integer notifyTimes) {
|
||||
this.notifyTimes = notifyTimes;
|
||||
return this;
|
||||
}
|
||||
|
||||
public String getNotifyUrl() {
|
||||
return notifyUrl;
|
||||
}
|
||||
|
||||
public PayTransactionPaySuccessMessage setNotifyUrl(String notifyUrl) {
|
||||
this.notifyUrl = notifyUrl;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Integer getTransactionId() {
|
||||
return transactionId;
|
||||
}
|
||||
|
||||
public PayTransactionPaySuccessMessage setTransactionId(Integer transactionId) {
|
||||
this.transactionId = transactionId;
|
||||
return this;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1 @@
|
||||
package cn.iocoder.mall.pay.biz.mq;
|
@ -1,25 +0,0 @@
|
||||
package cn.iocoder.mall.pay.biz.scheduler;
|
||||
|
||||
import com.xxl.job.core.biz.model.ReturnT;
|
||||
import com.xxl.job.core.handler.IJobHandler;
|
||||
import com.xxl.job.core.handler.annotation.JobHandler;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* TODO
|
||||
*/
|
||||
@Component
|
||||
@JobHandler(value = "payNotifyAppJob")
|
||||
public class PayNotifyAppJob extends IJobHandler {
|
||||
|
||||
@Override
|
||||
public ReturnT<String> execute(String param) throws Exception {
|
||||
System.out.println("1");
|
||||
return null;
|
||||
}
|
||||
|
||||
// TODO 需要考虑下是基于 MQ 还是 Job
|
||||
// TODO 通知频率
|
||||
// TODO rpc 泛化回调
|
||||
|
||||
}
|
@ -0,0 +1,51 @@
|
||||
package cn.iocoder.mall.pay.biz.scheduler;
|
||||
|
||||
import cn.iocoder.mall.pay.biz.constant.MQConstant;
|
||||
import cn.iocoder.mall.pay.biz.convert.PayTransactionConvert;
|
||||
import cn.iocoder.mall.pay.biz.dao.PayTransactionNotifyTaskMapper;
|
||||
import cn.iocoder.mall.pay.biz.dataobject.PayTransactionNotifyTaskDO;
|
||||
import com.xxl.job.core.biz.model.ReturnT;
|
||||
import com.xxl.job.core.handler.IJobHandler;
|
||||
import com.xxl.job.core.handler.annotation.JobHandler;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 支付交易成功通知 Job
|
||||
*/
|
||||
@Component
|
||||
@JobHandler(value = "payTransactionNotifyJob")
|
||||
public class PayTransactionNotifyJob extends IJobHandler {
|
||||
|
||||
@Autowired
|
||||
private PayTransactionNotifyTaskMapper payTransactionNotifyTaskMapper;
|
||||
|
||||
@Resource
|
||||
private RocketMQTemplate rocketMQTemplate;
|
||||
|
||||
@Override
|
||||
public ReturnT<String> execute(String param) {
|
||||
// 获得需要通知的任务
|
||||
List<PayTransactionNotifyTaskDO> notifyTasks = payTransactionNotifyTaskMapper.selectByNotify();
|
||||
// 循环任务,发送通知
|
||||
for (PayTransactionNotifyTaskDO payTransactionNotifyTask : notifyTasks) {
|
||||
// 发送 MQ
|
||||
rocketMQTemplate.convertAndSend(MQConstant.TOPIC_PAY_TRANSACTION_PAY_SUCCESS,
|
||||
PayTransactionConvert.INSTANCE.convert(payTransactionNotifyTask));
|
||||
// 更新最后通知时间
|
||||
// 1. 这样操作,虽然可能会出现 MQ 消费快于下面 PayTransactionNotifyTaskDO 的更新语句。但是,因为更新字段不同,所以不会有问题。
|
||||
// 2. 换个视角,如果先更新 PayTransactionNotifyTaskDO ,再发送 MQ 消息。如果 MQ 消息发送失败,则 PayTransactionNotifyTaskDO 再也不会被轮询到了。
|
||||
// 3. 当然,最最最完美的话,就是做事务消息,不过这样又过于复杂~
|
||||
PayTransactionNotifyTaskDO updateNotifyTask = new PayTransactionNotifyTaskDO()
|
||||
.setId(payTransactionNotifyTask.getId()).setLastExecuteTime(new Date());
|
||||
payTransactionNotifyTaskMapper.update(updateNotifyTask);
|
||||
}
|
||||
return new ReturnT<>("执行通知数:" + notifyTasks.size());
|
||||
}
|
||||
|
||||
}
|
@ -9,7 +9,8 @@ public class PayDemoServiceImpl implements PayDemoService {
|
||||
|
||||
@Override
|
||||
public String updatePaySuccess(String orderId) {
|
||||
return "你好呀";
|
||||
// return "你好呀";
|
||||
return "success";
|
||||
}
|
||||
|
||||
}
|
@ -15,6 +15,7 @@ import cn.iocoder.mall.pay.api.dto.PayTransactionSubmitDTO;
|
||||
import cn.iocoder.mall.pay.biz.client.AbstractPaySDK;
|
||||
import cn.iocoder.mall.pay.biz.client.PaySDKFactory;
|
||||
import cn.iocoder.mall.pay.biz.client.TransactionPaySuccessBO;
|
||||
import cn.iocoder.mall.pay.biz.constant.MQConstant;
|
||||
import cn.iocoder.mall.pay.biz.convert.PayTransactionConvert;
|
||||
import cn.iocoder.mall.pay.biz.dao.PayTransactionExtensionMapper;
|
||||
import cn.iocoder.mall.pay.biz.dao.PayTransactionMapper;
|
||||
@ -23,12 +24,15 @@ import cn.iocoder.mall.pay.biz.dataobject.PayAppDO;
|
||||
import cn.iocoder.mall.pay.biz.dataobject.PayTransactionDO;
|
||||
import cn.iocoder.mall.pay.biz.dataobject.PayTransactionExtensionDO;
|
||||
import cn.iocoder.mall.pay.biz.dataobject.PayTransactionNotifyTaskDO;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.Calendar;
|
||||
import java.util.Date;
|
||||
|
||||
@Service
|
||||
@ -46,6 +50,9 @@ public class PayServiceImpl implements PayTransactionService {
|
||||
@Autowired
|
||||
private PayAppServiceImpl payAppService;
|
||||
|
||||
@Resource
|
||||
private RocketMQTemplate rocketMQTemplate;
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("Duplicates")
|
||||
public CommonResult<PayTransactionBO> createTransaction(PayTransactionCreateDTO payTransactionCreateDTO) {
|
||||
@ -158,14 +165,18 @@ public class PayServiceImpl implements PayTransactionService {
|
||||
if (updateCounts == 0) { // 校验状态,必须是待支付 TODO 这种类型,需要思考下。需要返回错误,但是又要保证事务回滚
|
||||
throw ServiceExceptionUtil.exception(PayErrorCodeEnum.PAY_TRANSACTION_STATUS_IS_NOT_WAITING.getCode());
|
||||
}
|
||||
// 3. 插入
|
||||
// 3.1 插入
|
||||
PayTransactionNotifyTaskDO payTransactionNotifyTask = new PayTransactionNotifyTaskDO()
|
||||
.setTransactionId(payTransactionExtension.getTransactionId()).setTransactionExtensionId(payTransactionExtension.getId())
|
||||
.setAppId(payTransactionDO.getAppId()).setOrderId(payTransactionDO.getOrderId())
|
||||
.setStatus(PayTransactionNotifyStatusEnum.WAITING.getValue())
|
||||
.setNotifyTimes(0).setMaxNotifyTimes(5)
|
||||
.setNotifyTimes(0).setMaxNotifyTimes(PayTransactionNotifyTaskDO.NOTIFY_FREQUENCY.length + 1)
|
||||
.setNextNotifyTime(DateUtil.addDate(Calendar.SECOND, PayTransactionNotifyTaskDO.NOTIFY_FREQUENCY[0]))
|
||||
.setNotifyUrl(payTransactionDO.getNotifyUrl());
|
||||
payTransactionNotifyTaskMapper.insert(payTransactionNotifyTask);
|
||||
// 3.2 发送 MQ
|
||||
rocketMQTemplate.convertAndSend(MQConstant.TOPIC_PAY_TRANSACTION_PAY_SUCCESS,
|
||||
PayTransactionConvert.INSTANCE.convert(payTransactionNotifyTask));
|
||||
// 返回结果
|
||||
return CommonResult.success(true);
|
||||
}
|
||||
|
@ -25,7 +25,6 @@ dubbo:
|
||||
base-packages: cn.iocoder.mall.pay.biz.service
|
||||
|
||||
# xxl-job
|
||||
|
||||
xxl:
|
||||
job:
|
||||
admin:
|
||||
@ -36,4 +35,10 @@ xxl:
|
||||
port: 0
|
||||
logpath: /Users/yunai/logs/xxl-job/
|
||||
logretentiondays: 1
|
||||
accessToken:
|
||||
accessToken:
|
||||
|
||||
# rocketmq
|
||||
rocketmq:
|
||||
name-server: 127.0.0.1:9876
|
||||
producer:
|
||||
group: pay-producer-group
|
||||
|
@ -38,6 +38,9 @@
|
||||
<if test="entity.paymentTime != null">
|
||||
, payment_time = #{entity.paymentTime}
|
||||
</if>
|
||||
<if test="entity.finishTime != null">
|
||||
, finish_time = #{entity.finishTime}
|
||||
</if>
|
||||
<if test="entity.notifyTime != null">
|
||||
, notify_time = #{entity.notifyTime}
|
||||
</if>
|
||||
|
@ -0,0 +1,46 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
|
||||
<mapper namespace="cn.iocoder.mall.pay.biz.dao.PayTransactionNotifyLogMapper">
|
||||
|
||||
<!--<sql id="FIELDS">-->
|
||||
<!--id, transaction_id, transaction_extension_id, app_id, order_id,-->
|
||||
<!--status, next_notify_time, last_execute_time, notify_times, max_notify_times,-->
|
||||
<!--create_time-->
|
||||
<!--</sql>-->
|
||||
|
||||
<insert id="insert" parameterType="PayTransactionNotifyLogDO" useGeneratedKeys="true" keyColumn="id" keyProperty="id">
|
||||
INSERT INTO transaction_notify_log (
|
||||
notify_id, request, response, status
|
||||
) VALUES (
|
||||
#{notifyId}, #{request}, #{response}, #{status}
|
||||
)
|
||||
</insert>
|
||||
|
||||
<!--<update id="update" parameterType="PayTransactionNotifyTaskDO">-->
|
||||
<!--UPDATE transaction_notify_task-->
|
||||
<!--<set>-->
|
||||
<!--<if test="status != null">-->
|
||||
<!--, status = #{status}-->
|
||||
<!--</if>-->
|
||||
<!--<if test="nextNotifyTime != null">-->
|
||||
<!--, last_notify_time = #{nextNotifyTime}-->
|
||||
<!--</if>-->
|
||||
<!--<if test="lastExecuteTime != null">-->
|
||||
<!--, last_execute_time = #{lastExecuteTime}-->
|
||||
<!--</if>-->
|
||||
<!--<if test="notifyTimes != null">-->
|
||||
<!--, notify_times = #{notifyTimes}-->
|
||||
<!--</if>-->
|
||||
<!--</set>-->
|
||||
<!--WHERE id = #{id}-->
|
||||
<!--</update>-->
|
||||
|
||||
<!--<select id="selectByTransactionCode" parameterType="String" resultType="PayTransactionExtensionDO">-->
|
||||
<!--SELECT-->
|
||||
<!--<include refid="FIELDS"/>-->
|
||||
<!--FROM transaction_extension-->
|
||||
<!--WHERE transaction_code = #{transactionCode}-->
|
||||
<!--LIMIT 1-->
|
||||
<!--</select>-->
|
||||
|
||||
</mapper>
|
@ -4,16 +4,17 @@
|
||||
|
||||
<sql id="FIELDS">
|
||||
id, transaction_id, transaction_extension_id, app_id, order_id,
|
||||
status, last_Notify_time, notify_times, max_notify_times, create_time
|
||||
status, next_notify_time, last_execute_time, notify_times, max_notify_times,
|
||||
create_time
|
||||
</sql>
|
||||
|
||||
<insert id="insert" parameterType="PayTransactionNotifyTaskDO" useGeneratedKeys="true" keyColumn="id" keyProperty="id">
|
||||
INSERT INTO transaction_notify_task (
|
||||
transaction_id, transaction_extension_id, app_id, order_id,
|
||||
status, last_notify_time, notify_times, max_notify_times
|
||||
status, next_notify_time, notify_times, max_notify_times
|
||||
) VALUES (
|
||||
#{transactionId}, #{transactionExtensionId}, #{appId}, #{orderId},
|
||||
#{status}, #{lastNotifyTime}, #{notifyTimes}, #{maxNotifyTimes}
|
||||
#{status}, #{nextNotifyTime}, #{notifyTimes}, #{maxNotifyTimes}
|
||||
)
|
||||
</insert>
|
||||
|
||||
@ -23,8 +24,11 @@
|
||||
<if test="status != null">
|
||||
, status = #{status}
|
||||
</if>
|
||||
<if test="lastNotifyTime != null">
|
||||
, last_notify_time = #{lastNotifyTime}
|
||||
<if test="nextNotifyTime != null">
|
||||
, next_notify_time = #{nextNotifyTime}
|
||||
</if>
|
||||
<if test="lastExecuteTime != null">
|
||||
, last_execute_time = #{lastExecuteTime}
|
||||
</if>
|
||||
<if test="notifyTimes != null">
|
||||
, notify_times = #{notifyTimes}
|
||||
@ -33,12 +37,13 @@
|
||||
WHERE id = #{id}
|
||||
</update>
|
||||
|
||||
<!--<select id="selectByTransactionCode" parameterType="String" resultType="PayTransactionExtensionDO">-->
|
||||
<!--SELECT-->
|
||||
<!--<include refid="FIELDS"/>-->
|
||||
<!--FROM transaction_extension-->
|
||||
<!--WHERE transaction_code = #{transactionCode}-->
|
||||
<!--LIMIT 1-->
|
||||
<!--</select>-->
|
||||
<select id="selectByNotify" resultType="PayTransactionNotifyTaskDO">
|
||||
SELECT
|
||||
<include refid="FIELDS"/>
|
||||
FROM transaction_notify_task
|
||||
WHERE status IN (1, 3, 4, 5)
|
||||
AND next_notify_time <![CDATA[ <= ]]> NOW()
|
||||
AND last_execute_time > next_notify_time
|
||||
</select>
|
||||
|
||||
</mapper>
|
Loading…
Reference in New Issue
Block a user