|
|
|
|
@ -14,11 +14,17 @@ import com.alibaba.dubbo.config.ApplicationConfig;
|
|
|
|
|
import com.alibaba.dubbo.config.ReferenceConfig;
|
|
|
|
|
import com.alibaba.dubbo.config.RegistryConfig;
|
|
|
|
|
import com.alibaba.dubbo.rpc.service.GenericService;
|
|
|
|
|
import com.google.common.cache.CacheBuilder;
|
|
|
|
|
import com.google.common.cache.CacheLoader;
|
|
|
|
|
import com.google.common.cache.LoadingCache;
|
|
|
|
|
import lombok.Data;
|
|
|
|
|
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
|
|
|
|
import org.apache.rocketmq.spring.core.RocketMQListener;
|
|
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
import org.springframework.transaction.annotation.Transactional;
|
|
|
|
|
import org.springframework.util.Assert;
|
|
|
|
|
|
|
|
|
|
import java.util.Calendar;
|
|
|
|
|
import java.util.Date;
|
|
|
|
|
@ -30,6 +36,26 @@ import java.util.Date;
|
|
|
|
|
)
|
|
|
|
|
public class PayTransactionPaySuccessConsumer implements RocketMQListener<PayTransactionPaySuccessMessage> {
|
|
|
|
|
|
|
|
|
|
@Data
|
|
|
|
|
private class ReferenceMeta {
|
|
|
|
|
|
|
|
|
|
private final ReferenceConfig config; // TODO 芋艿,后续需要做销毁
|
|
|
|
|
private final GenericService service;
|
|
|
|
|
private final String methodName;
|
|
|
|
|
|
|
|
|
|
private ReferenceMeta(ReferenceConfig config, GenericService service, String methodName) {
|
|
|
|
|
this.config = config;
|
|
|
|
|
this.service = service;
|
|
|
|
|
this.methodName = methodName;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Value("${dubbo.registry.address}")
|
|
|
|
|
private String dubboRegistryAddress;
|
|
|
|
|
@Value("${dubbo.application.name}")
|
|
|
|
|
private String dubboApplicationName;
|
|
|
|
|
|
|
|
|
|
@Autowired
|
|
|
|
|
private PayTransactionNotifyTaskMapper payTransactionNotifyTaskMapper;
|
|
|
|
|
@Autowired
|
|
|
|
|
@ -37,37 +63,55 @@ public class PayTransactionPaySuccessConsumer implements RocketMQListener<PayTra
|
|
|
|
|
@Autowired
|
|
|
|
|
private PayTransactionMapper payTransactionMapper;
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
@Transactional
|
|
|
|
|
public void onMessage(PayTransactionPaySuccessMessage message) {
|
|
|
|
|
// TODO 先简单写,后面重构
|
|
|
|
|
private LoadingCache<String, ReferenceMeta> referenceMetaCache = CacheBuilder.newBuilder()
|
|
|
|
|
.build(new CacheLoader<String, ReferenceMeta>() {
|
|
|
|
|
@Override
|
|
|
|
|
public ReferenceMeta load(String notifyUrl) {
|
|
|
|
|
return createGenericService(notifyUrl);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
private ReferenceMeta createGenericService(String notifyUrl) {
|
|
|
|
|
String[] notifyUrlParts = notifyUrl.split("#");
|
|
|
|
|
// 创建 ApplicationConfig 对象
|
|
|
|
|
ApplicationConfig application = new ApplicationConfig();
|
|
|
|
|
application.setName("api-generic-consumer");
|
|
|
|
|
|
|
|
|
|
application.setName(dubboApplicationName);
|
|
|
|
|
// 创建 RegistryConfig 对象
|
|
|
|
|
RegistryConfig registry = new RegistryConfig();
|
|
|
|
|
registry.setAddress("zookeeper://127.0.0.1:2181");
|
|
|
|
|
|
|
|
|
|
// registry.setAddress("zookeeper://127.0.0.1:2181");
|
|
|
|
|
registry.setAddress(dubboRegistryAddress);
|
|
|
|
|
application.setRegistry(registry);
|
|
|
|
|
|
|
|
|
|
// 创建 ReferenceConfig 对象
|
|
|
|
|
ReferenceConfig<GenericService> reference = new ReferenceConfig<>();
|
|
|
|
|
// 弱类型接口名
|
|
|
|
|
reference.setInterface("cn.iocoder.mall.pay.api.PayDemoService");
|
|
|
|
|
// 声明为泛化接口
|
|
|
|
|
reference.setGeneric(true);
|
|
|
|
|
|
|
|
|
|
reference.setInterface(notifyUrlParts[0]); // 弱类型接口名
|
|
|
|
|
reference.setGeneric(true); // 声明为泛化接口
|
|
|
|
|
reference.setApplication(application);
|
|
|
|
|
// 获得 GenericService 对象
|
|
|
|
|
GenericService genericService = reference.get();
|
|
|
|
|
// 构建最终的 ReferenceMeta 对象
|
|
|
|
|
return new ReferenceMeta(reference, genericService, notifyUrlParts[1]);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 用com.alibaba.dubbo.rpc.service.GenericService可以替代所有接口引用
|
|
|
|
|
GenericService genericService = reference.get(); // TODO 芋艿,要缓存,不然重复引用
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
@Transactional
|
|
|
|
|
public void onMessage(PayTransactionPaySuccessMessage message) {
|
|
|
|
|
// 获得 ReferenceMeta 对象
|
|
|
|
|
ReferenceMeta referenceMeta = referenceMetaCache.getUnchecked(message.getNotifyUrl());
|
|
|
|
|
Assert.notNull(referenceMeta, String.format("notifyUrl(%s) 不存在对应的 ReferenceMeta 对象", message.getNotifyUrl()));
|
|
|
|
|
GenericService genericService = referenceMeta.getService();
|
|
|
|
|
String methodName = referenceMeta.getMethodName();
|
|
|
|
|
// 查询支付交易
|
|
|
|
|
PayTransactionDO transaction = payTransactionMapper.selectById(message.getTransactionId());
|
|
|
|
|
Assert.notNull(transaction, String.format("回调消息(%s) 订单交易不能为空", message.toString()));
|
|
|
|
|
// 发起调用
|
|
|
|
|
String response = null; // RPC / HTTP 调用的响应
|
|
|
|
|
PayTransactionNotifyTaskDO updateTask = new PayTransactionNotifyTaskDO() // 更新 PayTransactionNotifyTaskDO 对象
|
|
|
|
|
.setId(message.getId())
|
|
|
|
|
.setLastExecuteTime(new Date())
|
|
|
|
|
.setNotifyTimes(message.getNotifyTimes() + 1);
|
|
|
|
|
try {
|
|
|
|
|
response = (String) genericService.$invoke("updatePaySuccess", new String[]{String.class.getName()}, new Object[]{message.getOrderId()});
|
|
|
|
|
response = (String) genericService.$invoke(methodName, new String[]{String.class.getName(), Integer.class.getName()},
|
|
|
|
|
new Object[]{message.getOrderId(), transaction.getPrice()});
|
|
|
|
|
if ("success".equals(response)) { // 情况一,请求成功且返回成功
|
|
|
|
|
// 更新通知成功
|
|
|
|
|
updateTask.setStatus(PayTransactionNotifyStatusEnum.SUCCESS.getValue());
|
|
|
|
|
@ -87,7 +131,7 @@ public class PayTransactionPaySuccessConsumer implements RocketMQListener<PayTra
|
|
|
|
|
handleFailure(updateTask, PayTransactionNotifyStatusEnum.REQUEST_FAILURE.getValue());
|
|
|
|
|
payTransactionNotifyTaskMapper.update(updateTask);
|
|
|
|
|
// 抛出异常,回滚事务
|
|
|
|
|
throw e;
|
|
|
|
|
throw e; // TODO 芋艿,此处不能抛出异常。因为,会导致 MQ + 定时任务多重试。此处的目标是,事务回滚 + 吃掉事务。另外,最后的 finally 的日志,要插入成功。
|
|
|
|
|
} finally {
|
|
|
|
|
// 插入 PayTransactionNotifyLogDO 日志
|
|
|
|
|
PayTransactionNotifyLogDO notifyLog = new PayTransactionNotifyLogDO().setNotifyId(message.getId())
|
|
|
|
|
@ -105,4 +149,4 @@ public class PayTransactionPaySuccessConsumer implements RocketMQListener<PayTra
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|