diff --git a/yudao-dependencies/pom.xml b/yudao-dependencies/pom.xml
index c6cd6e496..09aac1ba8 100644
--- a/yudao-dependencies/pom.xml
+++ b/yudao-dependencies/pom.xml
@@ -29,6 +29,8 @@
3.5.2
3.5.0
3.17.3
+
+ 2.7.15
1.9.2
@@ -233,6 +235,11 @@
+
+ org.apache.dubbo
+ dubbo-common
+ ${dubbo.version}
+
cn.iocoder.cloud
yudao-spring-boot-starter-rpc
diff --git a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/config/YudaoTenantAutoConfiguration.java b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/config/YudaoTenantAutoConfiguration.java
index c1c969439..5fbacd93c 100644
--- a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/config/YudaoTenantAutoConfiguration.java
+++ b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/config/YudaoTenantAutoConfiguration.java
@@ -8,7 +8,8 @@ import cn.iocoder.yudao.framework.tenant.core.aop.TenantIgnoreAspect;
import cn.iocoder.yudao.framework.tenant.core.db.TenantDatabaseInterceptor;
import cn.iocoder.yudao.framework.tenant.core.job.TenantJob;
import cn.iocoder.yudao.framework.tenant.core.job.TenantJobHandlerDecorator;
-import cn.iocoder.yudao.framework.tenant.core.mq.TenantRedisMessageInterceptor;
+import cn.iocoder.yudao.framework.tenant.core.mq.TenantChannelInterceptor;
+import cn.iocoder.yudao.framework.tenant.core.mq.TenantFunctionAroundWrapper;
import cn.iocoder.yudao.framework.tenant.core.security.TenantSecurityWebFilter;
import cn.iocoder.yudao.framework.tenant.core.service.TenantFrameworkService;
import cn.iocoder.yudao.framework.tenant.core.service.TenantFrameworkServiceImpl;
@@ -23,8 +24,10 @@ import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.web.servlet.FilterRegistrationBean;
+import org.springframework.cloud.function.context.catalog.FunctionAroundWrapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.config.GlobalChannelInterceptor;
@Configuration
@ConditionalOnProperty(prefix = "yudao.tenant", value = "enable", matchIfMissing = true) // 允许使用 yudao.tenant.enable=false 禁用多租户
@@ -82,14 +85,19 @@ public class YudaoTenantAutoConfiguration {
// ========== MQ ==========
@Bean
- public TenantRedisMessageInterceptor tenantRedisMessageInterceptor() {
- return new TenantRedisMessageInterceptor();
+ @GlobalChannelInterceptor // 必须添加在方法上,否则无法生效
+ public TenantChannelInterceptor tenantChannelInterceptor() {
+ return new TenantChannelInterceptor();
+ }
+
+ @Bean
+ public FunctionAroundWrapper functionAroundWrapper() {
+ return new TenantFunctionAroundWrapper();
}
// ========== Job ==========
@Bean
- @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
public BeanPostProcessor jobHandlerBeanPostProcessor(TenantFrameworkService tenantFrameworkService) {
return new BeanPostProcessor() {
diff --git a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/TenantChannelInterceptor.java b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/TenantChannelInterceptor.java
new file mode 100644
index 000000000..4eb471f52
--- /dev/null
+++ b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/TenantChannelInterceptor.java
@@ -0,0 +1,32 @@
+package cn.iocoder.yudao.framework.tenant.core.mq;
+
+import cn.hutool.core.util.ReflectUtil;
+import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.support.ChannelInterceptor;
+
+import java.util.Map;
+
+import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID;
+
+/**
+ * 多租户的 {@link ChannelInterceptor} 实现类
+ * 发送消息时,设置租户编号到 Header 上
+ *
+ * @author 芋道源码
+ */
+public class TenantChannelInterceptor implements ChannelInterceptor {
+
+ @Override
+ @SuppressWarnings({"unchecked", "NullableProblems"})
+ public Message> preSend(Message> message, MessageChannel channel) {
+ Long tenantId = TenantContextHolder.getTenantId();
+ if (tenantId != null) {
+ Map headers = (Map) ReflectUtil.getFieldValue(message.getHeaders(), "headers");
+ headers.put(HEADER_TENANT_ID, tenantId);
+ }
+ return message;
+ }
+
+}
diff --git a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/TenantFunctionAroundWrapper.java b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/TenantFunctionAroundWrapper.java
new file mode 100644
index 000000000..e1db0be9b
--- /dev/null
+++ b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/TenantFunctionAroundWrapper.java
@@ -0,0 +1,36 @@
+package cn.iocoder.yudao.framework.tenant.core.mq;
+
+import cn.hutool.core.map.MapUtil;
+import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils;
+import org.springframework.cloud.function.context.catalog.FunctionAroundWrapper;
+import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry;
+import org.springframework.messaging.Message;
+
+import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID;
+
+/**
+ * 多租户 FunctionAroundWrapper 实现类
+ * 消费消息时,设置租户编号到 Context 上
+ *
+ * @author 芋道源码
+ */
+public class TenantFunctionAroundWrapper extends FunctionAroundWrapper {
+
+ @Override
+ protected Object doApply(Object input, SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction) {
+ // 如果不是 MQ 消息,则直接跳过
+ if (!(input instanceof Message)) {
+ return targetFunction.apply(input);
+ }
+ // 如果没有多租户,则直接跳过
+ Message> message = (Message>) input;
+ Long tenantId = MapUtil.getLong(message.getHeaders(), HEADER_TENANT_ID);
+ if (tenantId == null) {
+ return targetFunction.apply(input);
+ }
+
+ // 如果有多租户,则使用多租户上下文
+ return TenantUtils.execute(tenantId, () -> targetFunction.apply(input));
+ }
+
+}
diff --git a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/TenantRedisMessageInterceptor.java b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/TenantRedisMessageInterceptor.java
deleted file mode 100644
index 15c72f992..000000000
--- a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/TenantRedisMessageInterceptor.java
+++ /dev/null
@@ -1,42 +0,0 @@
-package cn.iocoder.yudao.framework.tenant.core.mq;
-
-import cn.hutool.core.util.StrUtil;
-import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor;
-import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage;
-import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder;
-
-/**
- * 多租户 {@link AbstractRedisMessage} 拦截器
- *
- * 1. Producer 发送消息时,将 {@link TenantContextHolder} 租户编号,添加到消息的 Header 中
- * 2. Consumer 消费消息时,将消息的 Header 的租户编号,添加到 {@link TenantContextHolder} 中
- *
- * @author 芋道源码
- */
-public class TenantRedisMessageInterceptor implements RedisMessageInterceptor {
-
- private static final String HEADER_TENANT_ID = "tenant-id";
-
- @Override
- public void sendMessageBefore(AbstractRedisMessage message) {
- Long tenantId = TenantContextHolder.getTenantId();
- if (tenantId != null) {
- message.addHeader(HEADER_TENANT_ID, tenantId.toString());
- }
- }
-
- @Override
- public void consumeMessageBefore(AbstractRedisMessage message) {
- String tenantIdStr = message.getHeader(HEADER_TENANT_ID);
- if (StrUtil.isNotEmpty(tenantIdStr)) {
- TenantContextHolder.setTenantId(Long.valueOf(tenantIdStr));
- }
- }
-
- @Override
- public void consumeMessageAfter(AbstractRedisMessage message) {
- // 注意,Consumer 是一个逻辑的入口,所以不考虑原本上下文就存在租户编号的情况
- TenantContextHolder.clear();
- }
-
-}
diff --git a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/util/TenantUtils.java b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/util/TenantUtils.java
index 3ea29b227..e47288ab6 100644
--- a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/util/TenantUtils.java
+++ b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/util/TenantUtils.java
@@ -2,6 +2,8 @@ package cn.iocoder.yudao.framework.tenant.core.util;
import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder;
+import java.util.function.Supplier;
+
/**
* 多租户 Util
*
@@ -32,4 +34,27 @@ public class TenantUtils {
}
}
+ /**
+ * 使用指定租户,执行对应的逻辑
+ *
+ * 注意,如果当前是忽略租户的情况下,会被强制设置成不忽略租户
+ * 当然,执行完成后,还是会恢复回去
+ *
+ * @param tenantId 租户编号
+ * @param supplier 逻辑
+ */
+ public static T execute(Long tenantId, Supplier supplier) {
+ Long oldTenantId = TenantContextHolder.getTenantId();
+ Boolean oldIgnore = TenantContextHolder.isIgnore();
+ try {
+ TenantContextHolder.setTenantId(tenantId);
+ TenantContextHolder.setIgnore(false);
+ // 执行逻辑
+ return supplier.get();
+ } finally {
+ TenantContextHolder.setTenantId(oldTenantId);
+ TenantContextHolder.setIgnore(oldIgnore);
+ }
+ }
+
}
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/pom.xml b/yudao-framework/yudao-spring-boot-starter-mq/pom.xml
index 49afa2a5e..5a6b4b1ec 100644
--- a/yudao-framework/yudao-spring-boot-starter-mq/pom.xml
+++ b/yudao-framework/yudao-spring-boot-starter-mq/pom.xml
@@ -20,12 +20,6 @@
https://github.com/YunaiV/ruoyi-vue-pro
-
-
- cn.iocoder.cloud
- yudao-spring-boot-starter-redis
-
-
com.alibaba.cloud
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java
index a3595f859..515d21f21 100644
--- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java
+++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java
@@ -1,15 +1,10 @@
package cn.iocoder.yudao.framework.mq.config;
-import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate;
-import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor;
-import cn.iocoder.yudao.framework.redis.config.YudaoRedisAutoConfiguration;
import com.alibaba.cloud.stream.binder.rocketmq.convert.RocketMQMessageConverter;
-import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
-import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.messaging.converter.*;
import java.util.ArrayList;
@@ -21,19 +16,8 @@ import java.util.List;
* @author 芋道源码
*/
@Configuration
-@AutoConfigureAfter(YudaoRedisAutoConfiguration.class)
-@Slf4j
public class YudaoMQAutoConfiguration {
- @Bean
- public RedisMQTemplate redisMQTemplate(StringRedisTemplate redisTemplate,
- List interceptors) {
- RedisMQTemplate redisMQTemplate = new RedisMQTemplate(redisTemplate);
- // 添加拦截器
- interceptors.forEach(redisMQTemplate::addInterceptor);
- return redisMQTemplate;
- }
-
/**
* 覆盖 {@link RocketMQMessageConverter} 的配置,去掉 fastjson 的转换器,解决不兼容的问题
*/
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/RedisMQTemplate.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/RedisMQTemplate.java
deleted file mode 100644
index 8a31feda7..000000000
--- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/RedisMQTemplate.java
+++ /dev/null
@@ -1,87 +0,0 @@
-package cn.iocoder.yudao.framework.mq.core;
-
-import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
-import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor;
-import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage;
-import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessage;
-import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessage;
-import lombok.AllArgsConstructor;
-import lombok.Getter;
-import org.springframework.data.redis.connection.stream.RecordId;
-import org.springframework.data.redis.connection.stream.StreamRecords;
-import org.springframework.data.redis.core.RedisTemplate;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Redis MQ 操作模板类
- *
- * @author 芋道源码
- */
-@AllArgsConstructor
-public class RedisMQTemplate {
-
- @Getter
- private final RedisTemplate redisTemplate;
- /**
- * 拦截器数组
- */
- @Getter
- private final List interceptors = new ArrayList<>();
-
- /**
- * 发送 Redis 消息,基于 Redis pub/sub 实现
- *
- * @param message 消息
- */
- public void send(T message) {
- try {
- sendMessageBefore(message);
- // 发送消息
- redisTemplate.convertAndSend(message.getChannel(), JsonUtils.toJsonString(message));
- } finally {
- sendMessageAfter(message);
- }
- }
-
- /**
- * 发送 Redis 消息,基于 Redis Stream 实现
- *
- * @param message 消息
- * @return 消息记录的编号对象
- */
- public RecordId send(T message) {
- try {
- sendMessageBefore(message);
- // 发送消息
- return redisTemplate.opsForStream().add(StreamRecords.newRecord()
- .ofObject(JsonUtils.toJsonString(message)) // 设置内容
- .withStreamKey(message.getStreamKey())); // 设置 stream key
- } finally {
- sendMessageAfter(message);
- }
- }
-
- /**
- * 添加拦截器
- *
- * @param interceptor 拦截器
- */
- public void addInterceptor(RedisMessageInterceptor interceptor) {
- interceptors.add(interceptor);
- }
-
- private void sendMessageBefore(AbstractRedisMessage message) {
- // 正序
- interceptors.forEach(interceptor -> interceptor.sendMessageBefore(message));
- }
-
- private void sendMessageAfter(AbstractRedisMessage message) {
- // 倒序
- for (int i = interceptors.size() - 1; i >= 0; i--) {
- interceptors.get(i).sendMessageAfter(message);
- }
- }
-
-}
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/interceptor/RedisMessageInterceptor.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/interceptor/RedisMessageInterceptor.java
deleted file mode 100644
index 11d8e1337..000000000
--- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/interceptor/RedisMessageInterceptor.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package cn.iocoder.yudao.framework.mq.core.interceptor;
-
-import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage;
-
-/**
- * {@link AbstractRedisMessage} 消息拦截器
- * 通过拦截器,作为插件机制,实现拓展。
- * 例如说,多租户场景下的 MQ 消息处理
- *
- * @author 芋道源码
- */
-public interface RedisMessageInterceptor {
-
- default void sendMessageBefore(AbstractRedisMessage message) {
- }
-
- default void sendMessageAfter(AbstractRedisMessage message) {
- }
-
- default void consumeMessageBefore(AbstractRedisMessage message) {
- }
-
- default void consumeMessageAfter(AbstractRedisMessage message) {
- }
-
-}
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/message/AbstractRedisMessage.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/message/AbstractRedisMessage.java
deleted file mode 100644
index f02e89d6f..000000000
--- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/message/AbstractRedisMessage.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package cn.iocoder.yudao.framework.mq.core.message;
-
-import lombok.Data;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Redis 消息抽象基类
- *
- * @author 芋道源码
- */
-@Data
-public abstract class AbstractRedisMessage {
-
- /**
- * 头
- */
- private Map headers = new HashMap<>();
-
- public String getHeader(String key) {
- return headers.get(key);
- }
-
- public void addHeader(String key, String value) {
- headers.put(key, value);
- }
-
-}
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/package-info.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/package-info.java
new file mode 100644
index 000000000..9953ae6e0
--- /dev/null
+++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * TODO 芋艿,后续删除,临时占位
+ */
+package cn.iocoder.yudao.framework.mq.core;
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessage.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessage.java
deleted file mode 100644
index fbc2a2826..000000000
--- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessage.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package cn.iocoder.yudao.framework.mq.core.pubsub;
-
-import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
-/**
- * Redis Channel Message 抽象类
- *
- * @author 芋道源码
- */
-public abstract class AbstractChannelMessage extends AbstractRedisMessage {
-
- /**
- * 获得 Redis Channel
- *
- * @return Channel
- */
- @JsonIgnore // 避免序列化。原因是,Redis 发布 Channel 消息的时候,已经会指定。
- public abstract String getChannel();
-
-}
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessageListener.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessageListener.java
deleted file mode 100644
index 8585aafe6..000000000
--- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessageListener.java
+++ /dev/null
@@ -1,103 +0,0 @@
-package cn.iocoder.yudao.framework.mq.core.pubsub;
-
-import cn.hutool.core.util.TypeUtil;
-import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
-import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate;
-import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor;
-import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage;
-import lombok.Setter;
-import lombok.SneakyThrows;
-import org.springframework.data.redis.connection.Message;
-import org.springframework.data.redis.connection.MessageListener;
-
-import java.lang.reflect.Type;
-import java.util.List;
-
-/**
- * Redis Pub/Sub 监听器抽象类,用于实现广播消费
- *
- * @param 消息类型。一定要填写噢,不然会报错
- *
- * @author 芋道源码
- */
-public abstract class AbstractChannelMessageListener implements MessageListener {
-
- /**
- * 消息类型
- */
- private final Class messageType;
- /**
- * Redis Channel
- */
- private final String channel;
- /**
- * RedisMQTemplate
- */
- @Setter
- private RedisMQTemplate redisMQTemplate;
-
- @SneakyThrows
- protected AbstractChannelMessageListener() {
- this.messageType = getMessageClass();
- this.channel = messageType.newInstance().getChannel();
- }
-
- /**
- * 获得 Sub 订阅的 Redis Channel 通道
- *
- * @return channel
- */
- public final String getChannel() {
- return channel;
- }
-
- @Override
- public final void onMessage(Message message, byte[] bytes) {
- T messageObj = JsonUtils.parseObject(message.getBody(), messageType);
- try {
- consumeMessageBefore(messageObj);
- // 消费消息
- this.onMessage(messageObj);
- } finally {
- consumeMessageAfter(messageObj);
- }
- }
-
- /**
- * 处理消息
- *
- * @param message 消息
- */
- public abstract void onMessage(T message);
-
- /**
- * 通过解析类上的泛型,获得消息类型
- *
- * @return 消息类型
- */
- @SuppressWarnings("unchecked")
- private Class getMessageClass() {
- Type type = TypeUtil.getTypeArgument(getClass(), 0);
- if (type == null) {
- throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName()));
- }
- return (Class) type;
- }
-
- private void consumeMessageBefore(AbstractRedisMessage message) {
- assert redisMQTemplate != null;
- List interceptors = redisMQTemplate.getInterceptors();
- // 正序
- interceptors.forEach(interceptor -> interceptor.consumeMessageBefore(message));
- }
-
- private void consumeMessageAfter(AbstractRedisMessage message) {
- assert redisMQTemplate != null;
- List interceptors = redisMQTemplate.getInterceptors();
- // 倒序
- for (int i = interceptors.size() - 1; i >= 0; i--) {
- interceptors.get(i).consumeMessageAfter(message);
- }
- }
-
-}
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessage.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessage.java
deleted file mode 100644
index 29ea833f3..000000000
--- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessage.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package cn.iocoder.yudao.framework.mq.core.stream;
-
-import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
-/**
- * Redis Stream Message 抽象类
- *
- * @author 芋道源码
- */
-public abstract class AbstractStreamMessage extends AbstractRedisMessage {
-
- /**
- * 获得 Redis Stream Key
- *
- * @return Channel
- */
- @JsonIgnore // 避免序列化
- public abstract String getStreamKey();
-
-}
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessageListener.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessageListener.java
deleted file mode 100644
index 1c4d91606..000000000
--- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessageListener.java
+++ /dev/null
@@ -1,113 +0,0 @@
-package cn.iocoder.yudao.framework.mq.core.stream;
-
-import cn.hutool.core.util.TypeUtil;
-import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
-import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate;
-import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor;
-import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.SneakyThrows;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.data.redis.connection.stream.ObjectRecord;
-import org.springframework.data.redis.stream.StreamListener;
-
-import java.lang.reflect.Type;
-import java.util.List;
-
-/**
- * Redis Stream 监听器抽象类,用于实现集群消费
- *
- * @param 消息类型。一定要填写噢,不然会报错
- *
- * @author 芋道源码
- */
-public abstract class AbstractStreamMessageListener
- implements StreamListener> {
-
- /**
- * 消息类型
- */
- private final Class messageType;
- /**
- * Redis Channel
- */
- @Getter
- private final String streamKey;
-
- /**
- * Redis 消费者分组,默认使用 spring.application.name 名字
- */
- @Value("${spring.application.name}")
- @Getter
- private String group;
- /**
- * RedisMQTemplate
- */
- @Setter
- private RedisMQTemplate redisMQTemplate;
-
- @SneakyThrows
- protected AbstractStreamMessageListener() {
- this.messageType = getMessageClass();
- this.streamKey = messageType.newInstance().getStreamKey();
- }
-
- @Override
- public void onMessage(ObjectRecord message) {
- // 消费消息
- T messageObj = JsonUtils.parseObject(message.getValue(), messageType);
- try {
- consumeMessageBefore(messageObj);
- // 消费消息
- this.onMessage(messageObj);
- // ack 消息消费完成
- redisMQTemplate.getRedisTemplate().opsForStream().acknowledge(group, message);
- // TODO 芋艿:需要额外考虑以下几个点:
- // 1. 处理异常的情况
- // 2. 发送日志;以及事务的结合
- // 3. 消费日志;以及通用的幂等性
- // 4. 消费失败的重试,https://zhuanlan.zhihu.com/p/60501638
- } finally {
- consumeMessageAfter(messageObj);
- }
- }
-
- /**
- * 处理消息
- *
- * @param message 消息
- */
- public abstract void onMessage(T message);
-
- /**
- * 通过解析类上的泛型,获得消息类型
- *
- * @return 消息类型
- */
- @SuppressWarnings("unchecked")
- private Class getMessageClass() {
- Type type = TypeUtil.getTypeArgument(getClass(), 0);
- if (type == null) {
- throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName()));
- }
- return (Class) type;
- }
-
- private void consumeMessageBefore(AbstractRedisMessage message) {
- assert redisMQTemplate != null;
- List interceptors = redisMQTemplate.getInterceptors();
- // 正序
- interceptors.forEach(interceptor -> interceptor.consumeMessageBefore(message));
- }
-
- private void consumeMessageAfter(AbstractRedisMessage message) {
- assert redisMQTemplate != null;
- List interceptors = redisMQTemplate.getInterceptors();
- // 倒序
- for (int i = interceptors.size() - 1; i >= 0; i--) {
- interceptors.get(i).consumeMessageAfter(message);
- }
- }
-
-}
diff --git a/yudao-framework/yudao-spring-boot-starter-rpc/pom.xml b/yudao-framework/yudao-spring-boot-starter-rpc/pom.xml
index 42389560c..5fba4bdf9 100644
--- a/yudao-framework/yudao-spring-boot-starter-rpc/pom.xml
+++ b/yudao-framework/yudao-spring-boot-starter-rpc/pom.xml
@@ -35,8 +35,13 @@
- com.alibaba.cloud
- spring-cloud-starter-dubbo
+ org.apache.dubbo
+ dubbo-common
+
+
+
+
+
diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/resources/application.yaml b/yudao-module-infra/yudao-module-infra-biz/src/main/resources/application.yaml
index a089dd6b4..8805738b2 100644
--- a/yudao-module-infra/yudao-module-infra-biz/src/main/resources/application.yaml
+++ b/yudao-module-infra/yudao-module-infra-biz/src/main/resources/application.yaml
@@ -1,6 +1,7 @@
spring:
main:
allow-circular-references: true # 允许循环依赖,因为项目是三层架构,无法避免这个情况。
+ allow-bean-definition-overriding: true # 允许 Bean 覆盖,例如说 Dubbo 或者 Feign 等会存在重复定义的服务
# Servlet 配置
servlet:
diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/mail/MailSendConsumer.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/mail/MailSendConsumer.java
index ad52b4ed2..a2bdf90c8 100644
--- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/mail/MailSendConsumer.java
+++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/mail/MailSendConsumer.java
@@ -1,6 +1,5 @@
package cn.iocoder.yudao.module.system.mq.consumer.mail;
-import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener;
import cn.iocoder.yudao.module.system.mq.message.mail.MailSendMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/mail/MailSendMessage.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/mail/MailSendMessage.java
index 42a10ba83..544dd18d2 100644
--- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/mail/MailSendMessage.java
+++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/mail/MailSendMessage.java
@@ -1,8 +1,6 @@
package cn.iocoder.yudao.module.system.mq.message.mail;
-import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessage;
import lombok.Data;
-import lombok.EqualsAndHashCode;
import javax.validation.constraints.NotNull;
import java.util.Map;
@@ -39,5 +37,4 @@ public class MailSendMessage {
*/
private Integer userType;
-
}
diff --git a/yudao-module-system/yudao-module-system-biz/src/main/resources/application.yaml b/yudao-module-system/yudao-module-system-biz/src/main/resources/application.yaml
index 1430cfd5e..8c17aa7d3 100644
--- a/yudao-module-system/yudao-module-system-biz/src/main/resources/application.yaml
+++ b/yudao-module-system/yudao-module-system-biz/src/main/resources/application.yaml
@@ -1,6 +1,7 @@
spring:
main:
allow-circular-references: true # 允许循环依赖,因为项目是三层架构,无法避免这个情况。
+ allow-bean-definition-overriding: true # 允许 Bean 覆盖,例如说 Dubbo 或者 Feign 等会存在重复定义的服务
# Servlet 配置
servlet: