集成 spring-cloud-starter-stream-rocketmq 组件

This commit is contained in:
YunaiV 2022-06-18 22:02:11 +08:00
parent 6471c4641d
commit 7b36eca609
13 changed files with 56 additions and 72 deletions

View File

@ -5,14 +5,16 @@ import cn.iocoder.yudao.module.system.mq.message.mail.MailSendMessage;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.function.Consumer;
// TODO 芋艿这个暂未实现 // TODO 芋艿这个暂未实现
@Component @Component
@Slf4j @Slf4j
public class MailSendConsumer extends AbstractStreamMessageListener<MailSendMessage> { public class MailSendConsumer implements Consumer<MailSendMessage> {
@Override @Override
public void onMessage(MailSendMessage message) { public void accept(MailSendMessage message) {
log.info("[onMessage][消息内容({})]", message); log.info("[accept][消息内容({})]", message);
} }
} }

View File

@ -22,7 +22,7 @@ public class MenuRefreshConsumer implements Consumer<MenuRefreshMessage> {
@Override @Override
public void accept(MenuRefreshMessage menuRefreshMessage) { public void accept(MenuRefreshMessage menuRefreshMessage) {
log.info("[onMessage][收到 Menu 刷新消息]"); log.info("[accept][收到 Menu 刷新消息]");
menuService.initLocalCache(); menuService.initLocalCache();
} }
} }

View File

@ -7,6 +7,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.function.Consumer;
/** /**
* 针对 {@link RoleMenuRefreshMessage} 的消费者 * 针对 {@link RoleMenuRefreshMessage} 的消费者
@ -15,15 +16,14 @@ import javax.annotation.Resource;
*/ */
@Component @Component
@Slf4j @Slf4j
public class RoleMenuRefreshConsumer extends AbstractChannelMessageListener<RoleMenuRefreshMessage> { public class RoleMenuRefreshConsumer implements Consumer<RoleMenuRefreshMessage> {
@Resource @Resource
private PermissionService permissionService; private PermissionService permissionService;
@Override @Override
public void onMessage(RoleMenuRefreshMessage message) { public void accept(RoleMenuRefreshMessage roleMenuRefreshMessage) {
log.info("[onMessage][收到 Role 与 Menu 的关联刷新消息]"); log.info("[accept][收到 Role 与 Menu 的关联刷新消息]");
permissionService.initLocalCache(); permissionService.initLocalCache();
} }
} }

View File

@ -22,7 +22,7 @@ public class RoleRefreshConsumer implements Consumer<RoleRefreshMessage> {
@Override @Override
public void accept(RoleRefreshMessage message) { public void accept(RoleRefreshMessage message) {
log.info("[onMessage][收到 Role 刷新消息]"); log.info("[accept][收到 Role 刷新消息]");
roleService.initLocalCache(); roleService.initLocalCache();
} }

View File

@ -1,12 +1,12 @@
package cn.iocoder.yudao.module.system.mq.consumer.permission; package cn.iocoder.yudao.module.system.mq.consumer.permission;
import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener;
import cn.iocoder.yudao.module.system.mq.message.permission.UserRoleRefreshMessage; import cn.iocoder.yudao.module.system.mq.message.permission.UserRoleRefreshMessage;
import cn.iocoder.yudao.module.system.service.permission.PermissionService; import cn.iocoder.yudao.module.system.service.permission.PermissionService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.function.Consumer;
/** /**
* 针对 {@link UserRoleRefreshMessage} 的消费者 * 针对 {@link UserRoleRefreshMessage} 的消费者
@ -15,15 +15,14 @@ import javax.annotation.Resource;
*/ */
@Component @Component
@Slf4j @Slf4j
public class UserRoleRefreshConsumer extends AbstractChannelMessageListener<UserRoleRefreshMessage> { public class UserRoleRefreshConsumer implements Consumer<UserRoleRefreshMessage> {
@Resource @Resource
private PermissionService permissionService; private PermissionService permissionService;
@Override @Override
public void onMessage(UserRoleRefreshMessage message) { public void accept(UserRoleRefreshMessage userRoleRefreshMessage) {
log.info("[onMessage][收到 User 与 Role 的关联刷新消息]"); log.info("[accept][收到 User 与 Role 的关联刷新消息]");
permissionService.initLocalCache(); permissionService.initLocalCache();
} }
} }

View File

@ -13,8 +13,7 @@ import java.util.Map;
* @author 芋道源码 * @author 芋道源码
*/ */
@Data @Data
@EqualsAndHashCode(callSuper = true) public class MailSendMessage {
public class MailSendMessage extends AbstractStreamMessage {
/** /**
* 邮箱地址 * 邮箱地址
@ -40,9 +39,5 @@ public class MailSendMessage extends AbstractStreamMessage {
*/ */
private Integer userType; private Integer userType;
@Override
public String getStreamKey() {
return "system.mail.send";
}
} }

View File

@ -10,12 +10,5 @@ import lombok.EqualsAndHashCode;
* @author 芋道源码 * @author 芋道源码
*/ */
@Data @Data
@EqualsAndHashCode(callSuper = true) public class MenuRefreshMessage {
public class MenuRefreshMessage extends AbstractChannelMessage {
@Override
public String getChannel() {
return "system.menu.refresh";
}
} }

View File

@ -10,12 +10,5 @@ import lombok.EqualsAndHashCode;
* @author 芋道源码 * @author 芋道源码
*/ */
@Data @Data
@EqualsAndHashCode(callSuper = true) public class RoleMenuRefreshMessage {
public class RoleMenuRefreshMessage extends AbstractChannelMessage {
@Override
public String getChannel() {
return "system.role-menu.refresh";
}
} }

View File

@ -1,8 +1,6 @@
package cn.iocoder.yudao.module.system.mq.message.permission; package cn.iocoder.yudao.module.system.mq.message.permission;
import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessage;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode;
/** /**
* 用户与角色的数据刷新 Message * 用户与角色的数据刷新 Message
@ -10,12 +8,5 @@ import lombok.EqualsAndHashCode;
* @author 芋道源码 * @author 芋道源码
*/ */
@Data @Data
@EqualsAndHashCode(callSuper = true) public class UserRoleRefreshMessage {
public class UserRoleRefreshMessage extends AbstractChannelMessage {
@Override
public String getChannel() {
return "system.user-role.refresh";
}
} }

View File

@ -21,7 +21,7 @@ public class MenuProducer {
*/ */
public void sendMenuRefreshMessage() { public void sendMenuRefreshMessage() {
MenuRefreshMessage message = new MenuRefreshMessage(); MenuRefreshMessage message = new MenuRefreshMessage();
streamBridge.send("demo02-output", message); streamBridge.send("menuRefresh-out-0", message);
} }
} }

View File

@ -3,6 +3,7 @@ package cn.iocoder.yudao.module.system.mq.producer.permission;
import cn.iocoder.yudao.module.system.mq.message.permission.RoleMenuRefreshMessage; import cn.iocoder.yudao.module.system.mq.message.permission.RoleMenuRefreshMessage;
import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate;
import cn.iocoder.yudao.module.system.mq.message.permission.UserRoleRefreshMessage; import cn.iocoder.yudao.module.system.mq.message.permission.UserRoleRefreshMessage;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
@ -14,14 +15,14 @@ import javax.annotation.Resource;
public class PermissionProducer { public class PermissionProducer {
@Resource @Resource
private RedisMQTemplate redisMQTemplate; private StreamBridge streamBridge;
/** /**
* 发送 {@link RoleMenuRefreshMessage} 消息 * 发送 {@link RoleMenuRefreshMessage} 消息
*/ */
public void sendRoleMenuRefreshMessage() { public void sendRoleMenuRefreshMessage() {
RoleMenuRefreshMessage message = new RoleMenuRefreshMessage(); RoleMenuRefreshMessage message = new RoleMenuRefreshMessage();
redisMQTemplate.send(message); streamBridge.send("roleMenuRefresh-out-0", message);
} }
/** /**
@ -29,7 +30,7 @@ public class PermissionProducer {
*/ */
public void sendUserRoleRefreshMessage() { public void sendUserRoleRefreshMessage() {
UserRoleRefreshMessage message = new UserRoleRefreshMessage(); UserRoleRefreshMessage message = new UserRoleRefreshMessage();
redisMQTemplate.send(message); streamBridge.send("userRoleRefresh-out-0", message);
} }
} }

View File

@ -26,7 +26,7 @@ public class RoleProducer {
*/ */
public void sendRoleRefreshMessage() { public void sendRoleRefreshMessage() {
RoleRefreshMessage message = new RoleRefreshMessage(); RoleRefreshMessage message = new RoleRefreshMessage();
streamBridge.send("demo01-output", message); streamBridge.send("roleRefresh-out-0", message);
} }
} }

View File

@ -56,42 +56,52 @@ spring:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类 # Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream: stream:
function: function:
definition: roleRefreshConsumer;menuRefreshConsumer; definition: roleRefreshConsumer;menuRefreshConsumer;roleMenuRefreshConsumer;userRoleRefreshConsumer;
# Binding 配置项,对应 BindingProperties Map # Binding 配置项,对应 BindingProperties Map
bindings: bindings:
demo01-output: roleRefresh-out-0:
destination: TEST destination: system_role_refresh
roleRefreshConsumer-in-0: roleRefreshConsumer-in-0:
destination: TEST destination: system_role_refresh
group: roleRefreshConsumer group: system_role_refresh_consumer_group
demo02-output: menuRefresh-out-0:
destination: TEST2 destination: system_menu_refresh
menuRefreshConsumer-in-0: menuRefreshConsumer-in-0:
destination: TEST2 destination: system_menu_refresh
group: menuRefreshConsumer group: system_menu_refresh_consumer_group
roleMenuRefresh-out-0:
destination: system_role_menu_refresh
roleMenuRefreshConsumer-in-0:
destination: system_role_menu_refresh
group: system_role_menu_refresh_consumer_group
userRoleRefresh-out-0:
destination: system_user_role_refresh
userRoleRefreshConsumer-in-0:
destination: system_user_role_refresh
group: system_user_role_refresh_consumer_group
# Spring Cloud Stream RocketMQ 配置项 # Spring Cloud Stream RocketMQ 配置项
rocketmq: rocketmq:
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类 # RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
binder: binder:
name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址 name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址
default: # 默认 bindings 全局配置
producer: # RocketMQ Producer 配置项,对应 RocketMQProducerProperties 类
group: system_producer_group # 生产者分组
send-type: SYNC # 发送模式SYNC 同步
# RocketMQ 自定义 Binding 配置项,对应 RocketMQBindingProperties Map # RocketMQ 自定义 Binding 配置项,对应 RocketMQBindingProperties Map
bindings: bindings:
demo01-output:
# RocketMQ Producer 配置项,对应 RocketMQProducerProperties 类
producer:
group: test # 生产者分组
sync: true # 是否同步发送消息,默认为 false 异步。
roleRefreshConsumer-in-0: roleRefreshConsumer-in-0:
consumer: consumer:
message-model: BROADCASTING message-model: BROADCASTING # 广播消费
demo02-output:
# RocketMQ Producer 配置项,对应 RocketMQProducerProperties 类
producer:
group: test # 生产者分组
sync: true # 是否同步发送消息,默认为 false 异步。
menuRefreshConsumer-in-0: menuRefreshConsumer-in-0:
consumer: consumer:
message-model: BROADCASTING message-model: BROADCASTING # 广播消费
roleMenuRefreshConsumer-in-0:
consumer:
message-model: BROADCASTING # 广播消费
userRoleRefreshConsumer-in-0:
consumer:
message-model: BROADCASTING # 广播消费
--- #################### 芋道相关配置 #################### --- #################### 芋道相关配置 ####################