From 6471c4641d852cb8c2c90c3c1cae3538f9ef510f Mon Sep 17 00:00:00 2001 From: YunaiV Date: Sat, 18 Jun 2022 17:46:11 +0800 Subject: [PATCH] =?UTF-8?q?=E9=9B=86=E6=88=90=20spring-cloud-starter-strea?= =?UTF-8?q?m-rocketmq=20=E7=BB=84=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../yudao-spring-boot-starter-mq/pom.xml | 7 +++ .../permission/MenuRefreshConsumer.java | 7 ++- .../permission/RoleRefreshConsumer.java | 6 +-- .../permission/RoleRefreshMessage.java | 11 +---- .../mq/producer/permission/MenuProducer.java | 5 ++- .../mq/producer/permission/RoleProducer.java | 8 +++- .../src/main/resources/application.yaml | 43 +++++++++++++++++++ 7 files changed, 66 insertions(+), 21 deletions(-) diff --git a/yudao-framework/yudao-spring-boot-starter-mq/pom.xml b/yudao-framework/yudao-spring-boot-starter-mq/pom.xml index 54b22dc5f..d567c9634 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/pom.xml +++ b/yudao-framework/yudao-spring-boot-starter-mq/pom.xml @@ -21,6 +21,13 @@ cn.iocoder.cloud yudao-spring-boot-starter-redis + + + + com.alibaba.cloud + + spring-cloud-starter-stream-rocketmq + diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/MenuRefreshConsumer.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/MenuRefreshConsumer.java index a4b633512..406e3d0ba 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/MenuRefreshConsumer.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/MenuRefreshConsumer.java @@ -1,12 +1,12 @@ package cn.iocoder.yudao.module.system.mq.consumer.permission; -import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener; import cn.iocoder.yudao.module.system.mq.message.permission.MenuRefreshMessage; import cn.iocoder.yudao.module.system.service.permission.MenuService; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import javax.annotation.Resource; +import java.util.function.Consumer; /** * 针对 {@link MenuRefreshMessage} 的消费者 @@ -15,15 +15,14 @@ import javax.annotation.Resource; */ @Component @Slf4j -public class MenuRefreshConsumer extends AbstractChannelMessageListener { +public class MenuRefreshConsumer implements Consumer { @Resource private MenuService menuService; @Override - public void onMessage(MenuRefreshMessage message) { + public void accept(MenuRefreshMessage menuRefreshMessage) { log.info("[onMessage][收到 Menu 刷新消息]"); menuService.initLocalCache(); } - } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/RoleRefreshConsumer.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/RoleRefreshConsumer.java index bb53b7499..5acf367d3 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/RoleRefreshConsumer.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/RoleRefreshConsumer.java @@ -1,12 +1,12 @@ package cn.iocoder.yudao.module.system.mq.consumer.permission; -import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener; import cn.iocoder.yudao.module.system.mq.message.permission.RoleRefreshMessage; import cn.iocoder.yudao.module.system.service.permission.RoleService; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import javax.annotation.Resource; +import java.util.function.Consumer; /** * 针对 {@link RoleRefreshMessage} 的消费者 @@ -15,13 +15,13 @@ import javax.annotation.Resource; */ @Component @Slf4j -public class RoleRefreshConsumer extends AbstractChannelMessageListener { +public class RoleRefreshConsumer implements Consumer { @Resource private RoleService roleService; @Override - public void onMessage(RoleRefreshMessage message) { + public void accept(RoleRefreshMessage message) { log.info("[onMessage][收到 Role 刷新消息]"); roleService.initLocalCache(); } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/permission/RoleRefreshMessage.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/permission/RoleRefreshMessage.java index e80d8f30c..a1cf81fc3 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/permission/RoleRefreshMessage.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/permission/RoleRefreshMessage.java @@ -1,8 +1,6 @@ package cn.iocoder.yudao.module.system.mq.message.permission; -import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessage; import lombok.Data; -import lombok.EqualsAndHashCode; /** * 角色数据刷新 Message @@ -10,12 +8,5 @@ import lombok.EqualsAndHashCode; * @author 芋道源码 */ @Data -@EqualsAndHashCode(callSuper = true) -public class RoleRefreshMessage extends AbstractChannelMessage { - - @Override - public String getChannel() { - return "system.role.refresh"; - } - +public class RoleRefreshMessage { } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/permission/MenuProducer.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/permission/MenuProducer.java index 5764c872a..7f6939116 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/permission/MenuProducer.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/permission/MenuProducer.java @@ -2,6 +2,7 @@ package cn.iocoder.yudao.module.system.mq.producer.permission; import cn.iocoder.yudao.module.system.mq.message.permission.MenuRefreshMessage; import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; +import org.springframework.cloud.stream.function.StreamBridge; import org.springframework.stereotype.Component; import javax.annotation.Resource; @@ -13,14 +14,14 @@ import javax.annotation.Resource; public class MenuProducer { @Resource - private RedisMQTemplate redisMQTemplate; + private StreamBridge streamBridge; /** * 发送 {@link MenuRefreshMessage} 消息 */ public void sendMenuRefreshMessage() { MenuRefreshMessage message = new MenuRefreshMessage(); - redisMQTemplate.send(message); + streamBridge.send("demo02-output", message); } } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/permission/RoleProducer.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/permission/RoleProducer.java index c249d964e..def7e4157 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/permission/RoleProducer.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/permission/RoleProducer.java @@ -2,6 +2,10 @@ package cn.iocoder.yudao.module.system.mq.producer.permission; import cn.iocoder.yudao.module.system.mq.message.permission.RoleRefreshMessage; import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cloud.stream.function.StreamBridge; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; import javax.annotation.Resource; @@ -15,14 +19,14 @@ import javax.annotation.Resource; public class RoleProducer { @Resource - private RedisMQTemplate redisMQTemplate; + private StreamBridge streamBridge; /** * 发送 {@link RoleRefreshMessage} 消息 */ public void sendRoleRefreshMessage() { RoleRefreshMessage message = new RoleRefreshMessage(); - redisMQTemplate.send(message); + streamBridge.send("demo01-output", message); } } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/resources/application.yaml b/yudao-module-system/yudao-module-system-biz/src/main/resources/application.yaml index 5598ef1a5..b0f39ab29 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/resources/application.yaml +++ b/yudao-module-system/yudao-module-system-biz/src/main/resources/application.yaml @@ -50,6 +50,49 @@ dubbo: registry: address: spring-cloud://localhost # 设置使用 Spring Cloud 注册中心 +--- #################### MQ 消息队列相关配置 #################### +spring: + cloud: + # Spring Cloud Stream 配置项,对应 BindingServiceProperties 类 + stream: + function: + definition: roleRefreshConsumer;menuRefreshConsumer; + # Binding 配置项,对应 BindingProperties Map + bindings: + demo01-output: + destination: TEST + roleRefreshConsumer-in-0: + destination: TEST + group: roleRefreshConsumer + demo02-output: + destination: TEST2 + menuRefreshConsumer-in-0: + destination: TEST2 + group: menuRefreshConsumer + # Spring Cloud Stream RocketMQ 配置项 + rocketmq: + # RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类 + binder: + name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址 + # RocketMQ 自定义 Binding 配置项,对应 RocketMQBindingProperties Map + bindings: + demo01-output: + # RocketMQ Producer 配置项,对应 RocketMQProducerProperties 类 + producer: + group: test # 生产者分组 + sync: true # 是否同步发送消息,默认为 false 异步。 + roleRefreshConsumer-in-0: + consumer: + message-model: BROADCASTING + demo02-output: + # RocketMQ Producer 配置项,对应 RocketMQProducerProperties 类 + producer: + group: test # 生产者分组 + sync: true # 是否同步发送消息,默认为 false 异步。 + menuRefreshConsumer-in-0: + consumer: + message-model: BROADCASTING + --- #################### 芋道相关配置 #################### yudao: