From 4381d938bec1b21c5bd67945b928eceab713b9dc Mon Sep 17 00:00:00 2001 From: YunaiV Date: Wed, 22 Jun 2022 23:59:19 +0800 Subject: [PATCH] =?UTF-8?q?1.=20=E6=8E=A5=E5=85=A5=20spring=20cloud=20stre?= =?UTF-8?q?am=EF=BC=8C=E6=94=AF=E6=8C=81=E5=A4=9A=E7=A7=9F=E6=88=B7=202.?= =?UTF-8?q?=20=E5=BC=B1=E5=8C=96=20spring=20cloud=20dubbo=20=E9=9B=86?= =?UTF-8?q?=E6=88=90=EF=BC=8C=E5=8F=AF=E9=80=9A=E8=BF=87=E5=8A=A0=E5=85=A5?= =?UTF-8?q?=E4=BE=9D=E8=B5=96=E8=87=AA=E5=8A=A8=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- yudao-dependencies/pom.xml | 7 ++ .../config/YudaoTenantAutoConfiguration.java | 16 ++- .../core/mq/TenantChannelInterceptor.java | 32 +++++ .../core/mq/TenantFunctionAroundWrapper.java | 36 ++++++ .../mq/TenantRedisMessageInterceptor.java | 42 ------- .../tenant/core/util/TenantUtils.java | 25 ++++ .../yudao-spring-boot-starter-mq/pom.xml | 6 - .../mq/config/YudaoMQAutoConfiguration.java | 16 --- .../framework/mq/core/RedisMQTemplate.java | 87 -------------- .../interceptor/RedisMessageInterceptor.java | 26 ---- .../mq/core/message/AbstractRedisMessage.java | 29 ----- .../yudao/framework/mq/core/package-info.java | 4 + .../core/pubsub/AbstractChannelMessage.java | 21 ---- .../AbstractChannelMessageListener.java | 103 ---------------- .../mq/core/stream/AbstractStreamMessage.java | 21 ---- .../stream/AbstractStreamMessageListener.java | 113 ------------------ .../yudao-spring-boot-starter-rpc/pom.xml | 9 +- .../src/main/resources/application.yaml | 1 + .../mq/consumer/mail/MailSendConsumer.java | 1 - .../mq/message/mail/MailSendMessage.java | 3 - .../src/main/resources/application.yaml | 1 + 21 files changed, 125 insertions(+), 474 deletions(-) create mode 100644 yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/TenantChannelInterceptor.java create mode 100644 yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/TenantFunctionAroundWrapper.java delete mode 100644 yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/TenantRedisMessageInterceptor.java delete mode 100644 yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/RedisMQTemplate.java delete mode 100644 yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/interceptor/RedisMessageInterceptor.java delete mode 100644 yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/message/AbstractRedisMessage.java create mode 100644 yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/package-info.java delete mode 100644 yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessage.java delete mode 100644 yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessageListener.java delete mode 100644 yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessage.java delete mode 100644 yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessageListener.java diff --git a/yudao-dependencies/pom.xml b/yudao-dependencies/pom.xml index c6cd6e496..09aac1ba8 100644 --- a/yudao-dependencies/pom.xml +++ b/yudao-dependencies/pom.xml @@ -29,6 +29,8 @@ 3.5.2 3.5.0 3.17.3 + + 2.7.15 1.9.2 @@ -233,6 +235,11 @@ + + org.apache.dubbo + dubbo-common + ${dubbo.version} + cn.iocoder.cloud yudao-spring-boot-starter-rpc diff --git a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/config/YudaoTenantAutoConfiguration.java b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/config/YudaoTenantAutoConfiguration.java index c1c969439..5fbacd93c 100644 --- a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/config/YudaoTenantAutoConfiguration.java +++ b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/config/YudaoTenantAutoConfiguration.java @@ -8,7 +8,8 @@ import cn.iocoder.yudao.framework.tenant.core.aop.TenantIgnoreAspect; import cn.iocoder.yudao.framework.tenant.core.db.TenantDatabaseInterceptor; import cn.iocoder.yudao.framework.tenant.core.job.TenantJob; import cn.iocoder.yudao.framework.tenant.core.job.TenantJobHandlerDecorator; -import cn.iocoder.yudao.framework.tenant.core.mq.TenantRedisMessageInterceptor; +import cn.iocoder.yudao.framework.tenant.core.mq.TenantChannelInterceptor; +import cn.iocoder.yudao.framework.tenant.core.mq.TenantFunctionAroundWrapper; import cn.iocoder.yudao.framework.tenant.core.security.TenantSecurityWebFilter; import cn.iocoder.yudao.framework.tenant.core.service.TenantFrameworkService; import cn.iocoder.yudao.framework.tenant.core.service.TenantFrameworkServiceImpl; @@ -23,8 +24,10 @@ import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.web.servlet.FilterRegistrationBean; +import org.springframework.cloud.function.context.catalog.FunctionAroundWrapper; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.integration.config.GlobalChannelInterceptor; @Configuration @ConditionalOnProperty(prefix = "yudao.tenant", value = "enable", matchIfMissing = true) // 允许使用 yudao.tenant.enable=false 禁用多租户 @@ -82,14 +85,19 @@ public class YudaoTenantAutoConfiguration { // ========== MQ ========== @Bean - public TenantRedisMessageInterceptor tenantRedisMessageInterceptor() { - return new TenantRedisMessageInterceptor(); + @GlobalChannelInterceptor // 必须添加在方法上,否则无法生效 + public TenantChannelInterceptor tenantChannelInterceptor() { + return new TenantChannelInterceptor(); + } + + @Bean + public FunctionAroundWrapper functionAroundWrapper() { + return new TenantFunctionAroundWrapper(); } // ========== Job ========== @Bean - @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") public BeanPostProcessor jobHandlerBeanPostProcessor(TenantFrameworkService tenantFrameworkService) { return new BeanPostProcessor() { diff --git a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/TenantChannelInterceptor.java b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/TenantChannelInterceptor.java new file mode 100644 index 000000000..4eb471f52 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/TenantChannelInterceptor.java @@ -0,0 +1,32 @@ +package cn.iocoder.yudao.framework.tenant.core.mq; + +import cn.hutool.core.util.ReflectUtil; +import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.support.ChannelInterceptor; + +import java.util.Map; + +import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID; + +/** + * 多租户的 {@link ChannelInterceptor} 实现类 + * 发送消息时,设置租户编号到 Header 上 + * + * @author 芋道源码 + */ +public class TenantChannelInterceptor implements ChannelInterceptor { + + @Override + @SuppressWarnings({"unchecked", "NullableProblems"}) + public Message preSend(Message message, MessageChannel channel) { + Long tenantId = TenantContextHolder.getTenantId(); + if (tenantId != null) { + Map headers = (Map) ReflectUtil.getFieldValue(message.getHeaders(), "headers"); + headers.put(HEADER_TENANT_ID, tenantId); + } + return message; + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/TenantFunctionAroundWrapper.java b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/TenantFunctionAroundWrapper.java new file mode 100644 index 000000000..e1db0be9b --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/TenantFunctionAroundWrapper.java @@ -0,0 +1,36 @@ +package cn.iocoder.yudao.framework.tenant.core.mq; + +import cn.hutool.core.map.MapUtil; +import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils; +import org.springframework.cloud.function.context.catalog.FunctionAroundWrapper; +import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry; +import org.springframework.messaging.Message; + +import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID; + +/** + * 多租户 FunctionAroundWrapper 实现类 + * 消费消息时,设置租户编号到 Context 上 + * + * @author 芋道源码 + */ +public class TenantFunctionAroundWrapper extends FunctionAroundWrapper { + + @Override + protected Object doApply(Object input, SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction) { + // 如果不是 MQ 消息,则直接跳过 + if (!(input instanceof Message)) { + return targetFunction.apply(input); + } + // 如果没有多租户,则直接跳过 + Message message = (Message) input; + Long tenantId = MapUtil.getLong(message.getHeaders(), HEADER_TENANT_ID); + if (tenantId == null) { + return targetFunction.apply(input); + } + + // 如果有多租户,则使用多租户上下文 + return TenantUtils.execute(tenantId, () -> targetFunction.apply(input)); + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/TenantRedisMessageInterceptor.java b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/TenantRedisMessageInterceptor.java deleted file mode 100644 index 15c72f992..000000000 --- a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/TenantRedisMessageInterceptor.java +++ /dev/null @@ -1,42 +0,0 @@ -package cn.iocoder.yudao.framework.tenant.core.mq; - -import cn.hutool.core.util.StrUtil; -import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor; -import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage; -import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder; - -/** - * 多租户 {@link AbstractRedisMessage} 拦截器 - * - * 1. Producer 发送消息时,将 {@link TenantContextHolder} 租户编号,添加到消息的 Header 中 - * 2. Consumer 消费消息时,将消息的 Header 的租户编号,添加到 {@link TenantContextHolder} 中 - * - * @author 芋道源码 - */ -public class TenantRedisMessageInterceptor implements RedisMessageInterceptor { - - private static final String HEADER_TENANT_ID = "tenant-id"; - - @Override - public void sendMessageBefore(AbstractRedisMessage message) { - Long tenantId = TenantContextHolder.getTenantId(); - if (tenantId != null) { - message.addHeader(HEADER_TENANT_ID, tenantId.toString()); - } - } - - @Override - public void consumeMessageBefore(AbstractRedisMessage message) { - String tenantIdStr = message.getHeader(HEADER_TENANT_ID); - if (StrUtil.isNotEmpty(tenantIdStr)) { - TenantContextHolder.setTenantId(Long.valueOf(tenantIdStr)); - } - } - - @Override - public void consumeMessageAfter(AbstractRedisMessage message) { - // 注意,Consumer 是一个逻辑的入口,所以不考虑原本上下文就存在租户编号的情况 - TenantContextHolder.clear(); - } - -} diff --git a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/util/TenantUtils.java b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/util/TenantUtils.java index 3ea29b227..e47288ab6 100644 --- a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/util/TenantUtils.java +++ b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/util/TenantUtils.java @@ -2,6 +2,8 @@ package cn.iocoder.yudao.framework.tenant.core.util; import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder; +import java.util.function.Supplier; + /** * 多租户 Util * @@ -32,4 +34,27 @@ public class TenantUtils { } } + /** + * 使用指定租户,执行对应的逻辑 + * + * 注意,如果当前是忽略租户的情况下,会被强制设置成不忽略租户 + * 当然,执行完成后,还是会恢复回去 + * + * @param tenantId 租户编号 + * @param supplier 逻辑 + */ + public static T execute(Long tenantId, Supplier supplier) { + Long oldTenantId = TenantContextHolder.getTenantId(); + Boolean oldIgnore = TenantContextHolder.isIgnore(); + try { + TenantContextHolder.setTenantId(tenantId); + TenantContextHolder.setIgnore(false); + // 执行逻辑 + return supplier.get(); + } finally { + TenantContextHolder.setTenantId(oldTenantId); + TenantContextHolder.setIgnore(oldIgnore); + } + } + } diff --git a/yudao-framework/yudao-spring-boot-starter-mq/pom.xml b/yudao-framework/yudao-spring-boot-starter-mq/pom.xml index 49afa2a5e..5a6b4b1ec 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/pom.xml +++ b/yudao-framework/yudao-spring-boot-starter-mq/pom.xml @@ -20,12 +20,6 @@ https://github.com/YunaiV/ruoyi-vue-pro - - - cn.iocoder.cloud - yudao-spring-boot-starter-redis - - com.alibaba.cloud diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java index a3595f859..515d21f21 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java @@ -1,15 +1,10 @@ package cn.iocoder.yudao.framework.mq.config; -import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; -import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor; -import cn.iocoder.yudao.framework.redis.config.YudaoRedisAutoConfiguration; import com.alibaba.cloud.stream.binder.rocketmq.convert.RocketMQMessageConverter; -import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.messaging.converter.*; import java.util.ArrayList; @@ -21,19 +16,8 @@ import java.util.List; * @author 芋道源码 */ @Configuration -@AutoConfigureAfter(YudaoRedisAutoConfiguration.class) -@Slf4j public class YudaoMQAutoConfiguration { - @Bean - public RedisMQTemplate redisMQTemplate(StringRedisTemplate redisTemplate, - List interceptors) { - RedisMQTemplate redisMQTemplate = new RedisMQTemplate(redisTemplate); - // 添加拦截器 - interceptors.forEach(redisMQTemplate::addInterceptor); - return redisMQTemplate; - } - /** * 覆盖 {@link RocketMQMessageConverter} 的配置,去掉 fastjson 的转换器,解决不兼容的问题 */ diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/RedisMQTemplate.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/RedisMQTemplate.java deleted file mode 100644 index 8a31feda7..000000000 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/RedisMQTemplate.java +++ /dev/null @@ -1,87 +0,0 @@ -package cn.iocoder.yudao.framework.mq.core; - -import cn.iocoder.yudao.framework.common.util.json.JsonUtils; -import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor; -import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage; -import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessage; -import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessage; -import lombok.AllArgsConstructor; -import lombok.Getter; -import org.springframework.data.redis.connection.stream.RecordId; -import org.springframework.data.redis.connection.stream.StreamRecords; -import org.springframework.data.redis.core.RedisTemplate; - -import java.util.ArrayList; -import java.util.List; - -/** - * Redis MQ 操作模板类 - * - * @author 芋道源码 - */ -@AllArgsConstructor -public class RedisMQTemplate { - - @Getter - private final RedisTemplate redisTemplate; - /** - * 拦截器数组 - */ - @Getter - private final List interceptors = new ArrayList<>(); - - /** - * 发送 Redis 消息,基于 Redis pub/sub 实现 - * - * @param message 消息 - */ - public void send(T message) { - try { - sendMessageBefore(message); - // 发送消息 - redisTemplate.convertAndSend(message.getChannel(), JsonUtils.toJsonString(message)); - } finally { - sendMessageAfter(message); - } - } - - /** - * 发送 Redis 消息,基于 Redis Stream 实现 - * - * @param message 消息 - * @return 消息记录的编号对象 - */ - public RecordId send(T message) { - try { - sendMessageBefore(message); - // 发送消息 - return redisTemplate.opsForStream().add(StreamRecords.newRecord() - .ofObject(JsonUtils.toJsonString(message)) // 设置内容 - .withStreamKey(message.getStreamKey())); // 设置 stream key - } finally { - sendMessageAfter(message); - } - } - - /** - * 添加拦截器 - * - * @param interceptor 拦截器 - */ - public void addInterceptor(RedisMessageInterceptor interceptor) { - interceptors.add(interceptor); - } - - private void sendMessageBefore(AbstractRedisMessage message) { - // 正序 - interceptors.forEach(interceptor -> interceptor.sendMessageBefore(message)); - } - - private void sendMessageAfter(AbstractRedisMessage message) { - // 倒序 - for (int i = interceptors.size() - 1; i >= 0; i--) { - interceptors.get(i).sendMessageAfter(message); - } - } - -} diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/interceptor/RedisMessageInterceptor.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/interceptor/RedisMessageInterceptor.java deleted file mode 100644 index 11d8e1337..000000000 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/interceptor/RedisMessageInterceptor.java +++ /dev/null @@ -1,26 +0,0 @@ -package cn.iocoder.yudao.framework.mq.core.interceptor; - -import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage; - -/** - * {@link AbstractRedisMessage} 消息拦截器 - * 通过拦截器,作为插件机制,实现拓展。 - * 例如说,多租户场景下的 MQ 消息处理 - * - * @author 芋道源码 - */ -public interface RedisMessageInterceptor { - - default void sendMessageBefore(AbstractRedisMessage message) { - } - - default void sendMessageAfter(AbstractRedisMessage message) { - } - - default void consumeMessageBefore(AbstractRedisMessage message) { - } - - default void consumeMessageAfter(AbstractRedisMessage message) { - } - -} diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/message/AbstractRedisMessage.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/message/AbstractRedisMessage.java deleted file mode 100644 index f02e89d6f..000000000 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/message/AbstractRedisMessage.java +++ /dev/null @@ -1,29 +0,0 @@ -package cn.iocoder.yudao.framework.mq.core.message; - -import lombok.Data; - -import java.util.HashMap; -import java.util.Map; - -/** - * Redis 消息抽象基类 - * - * @author 芋道源码 - */ -@Data -public abstract class AbstractRedisMessage { - - /** - * 头 - */ - private Map headers = new HashMap<>(); - - public String getHeader(String key) { - return headers.get(key); - } - - public void addHeader(String key, String value) { - headers.put(key, value); - } - -} diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/package-info.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/package-info.java new file mode 100644 index 000000000..9953ae6e0 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/package-info.java @@ -0,0 +1,4 @@ +/** + * TODO 芋艿,后续删除,临时占位 + */ +package cn.iocoder.yudao.framework.mq.core; diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessage.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessage.java deleted file mode 100644 index fbc2a2826..000000000 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessage.java +++ /dev/null @@ -1,21 +0,0 @@ -package cn.iocoder.yudao.framework.mq.core.pubsub; - -import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage; -import com.fasterxml.jackson.annotation.JsonIgnore; - -/** - * Redis Channel Message 抽象类 - * - * @author 芋道源码 - */ -public abstract class AbstractChannelMessage extends AbstractRedisMessage { - - /** - * 获得 Redis Channel - * - * @return Channel - */ - @JsonIgnore // 避免序列化。原因是,Redis 发布 Channel 消息的时候,已经会指定。 - public abstract String getChannel(); - -} diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessageListener.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessageListener.java deleted file mode 100644 index 8585aafe6..000000000 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessageListener.java +++ /dev/null @@ -1,103 +0,0 @@ -package cn.iocoder.yudao.framework.mq.core.pubsub; - -import cn.hutool.core.util.TypeUtil; -import cn.iocoder.yudao.framework.common.util.json.JsonUtils; -import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; -import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor; -import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage; -import lombok.Setter; -import lombok.SneakyThrows; -import org.springframework.data.redis.connection.Message; -import org.springframework.data.redis.connection.MessageListener; - -import java.lang.reflect.Type; -import java.util.List; - -/** - * Redis Pub/Sub 监听器抽象类,用于实现广播消费 - * - * @param 消息类型。一定要填写噢,不然会报错 - * - * @author 芋道源码 - */ -public abstract class AbstractChannelMessageListener implements MessageListener { - - /** - * 消息类型 - */ - private final Class messageType; - /** - * Redis Channel - */ - private final String channel; - /** - * RedisMQTemplate - */ - @Setter - private RedisMQTemplate redisMQTemplate; - - @SneakyThrows - protected AbstractChannelMessageListener() { - this.messageType = getMessageClass(); - this.channel = messageType.newInstance().getChannel(); - } - - /** - * 获得 Sub 订阅的 Redis Channel 通道 - * - * @return channel - */ - public final String getChannel() { - return channel; - } - - @Override - public final void onMessage(Message message, byte[] bytes) { - T messageObj = JsonUtils.parseObject(message.getBody(), messageType); - try { - consumeMessageBefore(messageObj); - // 消费消息 - this.onMessage(messageObj); - } finally { - consumeMessageAfter(messageObj); - } - } - - /** - * 处理消息 - * - * @param message 消息 - */ - public abstract void onMessage(T message); - - /** - * 通过解析类上的泛型,获得消息类型 - * - * @return 消息类型 - */ - @SuppressWarnings("unchecked") - private Class getMessageClass() { - Type type = TypeUtil.getTypeArgument(getClass(), 0); - if (type == null) { - throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName())); - } - return (Class) type; - } - - private void consumeMessageBefore(AbstractRedisMessage message) { - assert redisMQTemplate != null; - List interceptors = redisMQTemplate.getInterceptors(); - // 正序 - interceptors.forEach(interceptor -> interceptor.consumeMessageBefore(message)); - } - - private void consumeMessageAfter(AbstractRedisMessage message) { - assert redisMQTemplate != null; - List interceptors = redisMQTemplate.getInterceptors(); - // 倒序 - for (int i = interceptors.size() - 1; i >= 0; i--) { - interceptors.get(i).consumeMessageAfter(message); - } - } - -} diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessage.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessage.java deleted file mode 100644 index 29ea833f3..000000000 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessage.java +++ /dev/null @@ -1,21 +0,0 @@ -package cn.iocoder.yudao.framework.mq.core.stream; - -import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage; -import com.fasterxml.jackson.annotation.JsonIgnore; - -/** - * Redis Stream Message 抽象类 - * - * @author 芋道源码 - */ -public abstract class AbstractStreamMessage extends AbstractRedisMessage { - - /** - * 获得 Redis Stream Key - * - * @return Channel - */ - @JsonIgnore // 避免序列化 - public abstract String getStreamKey(); - -} diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessageListener.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessageListener.java deleted file mode 100644 index 1c4d91606..000000000 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessageListener.java +++ /dev/null @@ -1,113 +0,0 @@ -package cn.iocoder.yudao.framework.mq.core.stream; - -import cn.hutool.core.util.TypeUtil; -import cn.iocoder.yudao.framework.common.util.json.JsonUtils; -import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; -import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor; -import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage; -import lombok.Getter; -import lombok.Setter; -import lombok.SneakyThrows; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.data.redis.connection.stream.ObjectRecord; -import org.springframework.data.redis.stream.StreamListener; - -import java.lang.reflect.Type; -import java.util.List; - -/** - * Redis Stream 监听器抽象类,用于实现集群消费 - * - * @param 消息类型。一定要填写噢,不然会报错 - * - * @author 芋道源码 - */ -public abstract class AbstractStreamMessageListener - implements StreamListener> { - - /** - * 消息类型 - */ - private final Class messageType; - /** - * Redis Channel - */ - @Getter - private final String streamKey; - - /** - * Redis 消费者分组,默认使用 spring.application.name 名字 - */ - @Value("${spring.application.name}") - @Getter - private String group; - /** - * RedisMQTemplate - */ - @Setter - private RedisMQTemplate redisMQTemplate; - - @SneakyThrows - protected AbstractStreamMessageListener() { - this.messageType = getMessageClass(); - this.streamKey = messageType.newInstance().getStreamKey(); - } - - @Override - public void onMessage(ObjectRecord message) { - // 消费消息 - T messageObj = JsonUtils.parseObject(message.getValue(), messageType); - try { - consumeMessageBefore(messageObj); - // 消费消息 - this.onMessage(messageObj); - // ack 消息消费完成 - redisMQTemplate.getRedisTemplate().opsForStream().acknowledge(group, message); - // TODO 芋艿:需要额外考虑以下几个点: - // 1. 处理异常的情况 - // 2. 发送日志;以及事务的结合 - // 3. 消费日志;以及通用的幂等性 - // 4. 消费失败的重试,https://zhuanlan.zhihu.com/p/60501638 - } finally { - consumeMessageAfter(messageObj); - } - } - - /** - * 处理消息 - * - * @param message 消息 - */ - public abstract void onMessage(T message); - - /** - * 通过解析类上的泛型,获得消息类型 - * - * @return 消息类型 - */ - @SuppressWarnings("unchecked") - private Class getMessageClass() { - Type type = TypeUtil.getTypeArgument(getClass(), 0); - if (type == null) { - throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName())); - } - return (Class) type; - } - - private void consumeMessageBefore(AbstractRedisMessage message) { - assert redisMQTemplate != null; - List interceptors = redisMQTemplate.getInterceptors(); - // 正序 - interceptors.forEach(interceptor -> interceptor.consumeMessageBefore(message)); - } - - private void consumeMessageAfter(AbstractRedisMessage message) { - assert redisMQTemplate != null; - List interceptors = redisMQTemplate.getInterceptors(); - // 倒序 - for (int i = interceptors.size() - 1; i >= 0; i--) { - interceptors.get(i).consumeMessageAfter(message); - } - } - -} diff --git a/yudao-framework/yudao-spring-boot-starter-rpc/pom.xml b/yudao-framework/yudao-spring-boot-starter-rpc/pom.xml index 42389560c..5fba4bdf9 100644 --- a/yudao-framework/yudao-spring-boot-starter-rpc/pom.xml +++ b/yudao-framework/yudao-spring-boot-starter-rpc/pom.xml @@ -35,8 +35,13 @@ - com.alibaba.cloud - spring-cloud-starter-dubbo + org.apache.dubbo + dubbo-common + + + + + diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/resources/application.yaml b/yudao-module-infra/yudao-module-infra-biz/src/main/resources/application.yaml index a089dd6b4..8805738b2 100644 --- a/yudao-module-infra/yudao-module-infra-biz/src/main/resources/application.yaml +++ b/yudao-module-infra/yudao-module-infra-biz/src/main/resources/application.yaml @@ -1,6 +1,7 @@ spring: main: allow-circular-references: true # 允许循环依赖,因为项目是三层架构,无法避免这个情况。 + allow-bean-definition-overriding: true # 允许 Bean 覆盖,例如说 Dubbo 或者 Feign 等会存在重复定义的服务 # Servlet 配置 servlet: diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/mail/MailSendConsumer.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/mail/MailSendConsumer.java index ad52b4ed2..a2bdf90c8 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/mail/MailSendConsumer.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/mail/MailSendConsumer.java @@ -1,6 +1,5 @@ package cn.iocoder.yudao.module.system.mq.consumer.mail; -import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener; import cn.iocoder.yudao.module.system.mq.message.mail.MailSendMessage; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/mail/MailSendMessage.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/mail/MailSendMessage.java index 42a10ba83..544dd18d2 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/mail/MailSendMessage.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/mail/MailSendMessage.java @@ -1,8 +1,6 @@ package cn.iocoder.yudao.module.system.mq.message.mail; -import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessage; import lombok.Data; -import lombok.EqualsAndHashCode; import javax.validation.constraints.NotNull; import java.util.Map; @@ -39,5 +37,4 @@ public class MailSendMessage { */ private Integer userType; - } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/resources/application.yaml b/yudao-module-system/yudao-module-system-biz/src/main/resources/application.yaml index 1430cfd5e..8c17aa7d3 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/resources/application.yaml +++ b/yudao-module-system/yudao-module-system-biz/src/main/resources/application.yaml @@ -1,6 +1,7 @@ spring: main: allow-circular-references: true # 允许循环依赖,因为项目是三层架构,无法避免这个情况。 + allow-bean-definition-overriding: true # 允许 Bean 覆盖,例如说 Dubbo 或者 Feign 等会存在重复定义的服务 # Servlet 配置 servlet: