短信渠道的本地缓存,使用 Job 轮询,替代 MQ 广播

This commit is contained in:
YunaiV 2023-07-29 08:59:15 +08:00
parent e205129943
commit 91e0af0944
8 changed files with 86 additions and 130 deletions

View File

@ -6,6 +6,9 @@ import cn.iocoder.yudao.framework.mybatis.core.query.LambdaQueryWrapperX;
import cn.iocoder.yudao.module.system.controller.admin.sms.vo.channel.SmsChannelPageReqVO; import cn.iocoder.yudao.module.system.controller.admin.sms.vo.channel.SmsChannelPageReqVO;
import cn.iocoder.yudao.module.system.dal.dataobject.sms.SmsChannelDO; import cn.iocoder.yudao.module.system.dal.dataobject.sms.SmsChannelDO;
import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Select;
import java.time.LocalDateTime;
@Mapper @Mapper
public interface SmsChannelMapper extends BaseMapperX<SmsChannelDO> { public interface SmsChannelMapper extends BaseMapperX<SmsChannelDO> {
@ -18,4 +21,7 @@ public interface SmsChannelMapper extends BaseMapperX<SmsChannelDO> {
.orderByDesc(SmsChannelDO::getId)); .orderByDesc(SmsChannelDO::getId));
} }
@Select("SELECT COUNT(*) FROM system_sms_channel WHERE update_time > #{maxUpdateTime}")
Long selectCountByUpdateTimeGt(LocalDateTime maxTime);
} }

View File

@ -1,29 +0,0 @@
package cn.iocoder.yudao.module.system.mq.consumer.sms;
import cn.iocoder.yudao.module.system.mq.message.sms.SmsChannelRefreshMessage;
import cn.iocoder.yudao.module.system.service.sms.SmsChannelService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* 针对 {@link SmsChannelRefreshMessage} 的消费者
*
* @author 芋道源码
*/
@Component
@Slf4j
public class SmsChannelRefreshConsumer {
@Resource
private SmsChannelService smsChannelService;
@EventListener
public void execute(SmsChannelRefreshMessage message) {
log.info("[execute][收到 SmsChannel 刷新消息]");
smsChannelService.initLocalCache();
}
}

View File

@ -1,23 +0,0 @@
package cn.iocoder.yudao.module.system.mq.message.sms;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.springframework.cloud.bus.event.RemoteApplicationEvent;
/**
* 短信渠道的数据刷新 Message
*
* @author 芋道源码
*/
@Data
@EqualsAndHashCode(callSuper = true)
public class SmsChannelRefreshMessage extends RemoteApplicationEvent {
public SmsChannelRefreshMessage() {
}
public SmsChannelRefreshMessage(Object source, String originService, String destinationService) {
super(source, originService, DEFAULT_DESTINATION_FACTORY.getDestination(destinationService));
}
}

View File

@ -1,23 +0,0 @@
package cn.iocoder.yudao.module.system.mq.message.sms;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.springframework.cloud.bus.event.RemoteApplicationEvent;
/**
* 短信模板的数据刷新 Message
*
* @author 芋道源码
*/
@Data
@EqualsAndHashCode(callSuper = true)
public class SmsTemplateRefreshMessage extends RemoteApplicationEvent {
public SmsTemplateRefreshMessage() {
}
public SmsTemplateRefreshMessage(Object source, String originService, String destinationService) {
super(source, originService, DEFAULT_DESTINATION_FACTORY.getDestination(destinationService));
}
}

View File

@ -2,7 +2,6 @@ package cn.iocoder.yudao.module.system.mq.producer.sms;
import cn.iocoder.yudao.framework.common.core.KeyValue; import cn.iocoder.yudao.framework.common.core.KeyValue;
import cn.iocoder.yudao.framework.mq.core.bus.AbstractBusProducer; import cn.iocoder.yudao.framework.mq.core.bus.AbstractBusProducer;
import cn.iocoder.yudao.module.system.mq.message.sms.SmsChannelRefreshMessage;
import cn.iocoder.yudao.module.system.mq.message.sms.SmsSendMessage; import cn.iocoder.yudao.module.system.mq.message.sms.SmsSendMessage;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.function.StreamBridge; import org.springframework.cloud.stream.function.StreamBridge;
@ -24,13 +23,6 @@ public class SmsProducer extends AbstractBusProducer {
@Resource @Resource
private StreamBridge streamBridge; private StreamBridge streamBridge;
/**
* 发送 {@link SmsChannelRefreshMessage} 消息
*/
public void sendSmsChannelRefreshMessage() {
publishEvent(new SmsChannelRefreshMessage(this, getBusId(), selfDestinationService()));
}
/** /**
* 发送 {@link SmsSendMessage} 消息 * 发送 {@link SmsSendMessage} 消息
* *

View File

@ -1,5 +1,6 @@
package cn.iocoder.yudao.module.system.service.sms; package cn.iocoder.yudao.module.system.service.sms;
import cn.hutool.core.collection.CollUtil;
import cn.iocoder.yudao.framework.common.pojo.PageResult; import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.framework.sms.core.client.SmsClientFactory; import cn.iocoder.yudao.framework.sms.core.client.SmsClientFactory;
import cn.iocoder.yudao.framework.sms.core.property.SmsChannelProperties; import cn.iocoder.yudao.framework.sms.core.property.SmsChannelProperties;
@ -9,15 +10,20 @@ import cn.iocoder.yudao.module.system.controller.admin.sms.vo.channel.SmsChannel
import cn.iocoder.yudao.module.system.convert.sms.SmsChannelConvert; import cn.iocoder.yudao.module.system.convert.sms.SmsChannelConvert;
import cn.iocoder.yudao.module.system.dal.dataobject.sms.SmsChannelDO; import cn.iocoder.yudao.module.system.dal.dataobject.sms.SmsChannelDO;
import cn.iocoder.yudao.module.system.dal.mysql.sms.SmsChannelMapper; import cn.iocoder.yudao.module.system.dal.mysql.sms.SmsChannelMapper;
import cn.iocoder.yudao.module.system.mq.producer.sms.SmsProducer; import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit;
import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception; import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception;
import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.getMaxValue;
import static cn.iocoder.yudao.module.system.enums.ErrorCodeConstants.SMS_CHANNEL_HAS_CHILDREN; import static cn.iocoder.yudao.module.system.enums.ErrorCodeConstants.SMS_CHANNEL_HAS_CHILDREN;
import static cn.iocoder.yudao.module.system.enums.ErrorCodeConstants.SMS_CHANNEL_NOT_EXISTS; import static cn.iocoder.yudao.module.system.enums.ErrorCodeConstants.SMS_CHANNEL_NOT_EXISTS;
@ -30,6 +36,12 @@ import static cn.iocoder.yudao.module.system.enums.ErrorCodeConstants.SMS_CHANNE
@Slf4j @Slf4j
public class SmsChannelServiceImpl implements SmsChannelService { public class SmsChannelServiceImpl implements SmsChannelService {
/**
* 短信渠道列表的缓存
*/
@Getter
private volatile List<SmsChannelDO> channelCache = Collections.emptyList();
@Resource @Resource
private SmsClientFactory smsClientFactory; private SmsClientFactory smsClientFactory;
@ -39,9 +51,6 @@ public class SmsChannelServiceImpl implements SmsChannelService {
@Resource @Resource
private SmsTemplateService smsTemplateService; private SmsTemplateService smsTemplateService;
@Resource
private SmsProducer smsProducer;
@Override @Override
@PostConstruct @PostConstruct
public void initLocalCache() { public void initLocalCache() {
@ -52,6 +61,27 @@ public class SmsChannelServiceImpl implements SmsChannelService {
// 第二步构建缓存创建或更新短信 Client // 第二步构建缓存创建或更新短信 Client
List<SmsChannelProperties> propertiesList = SmsChannelConvert.INSTANCE.convertList02(channels); List<SmsChannelProperties> propertiesList = SmsChannelConvert.INSTANCE.convertList02(channels);
propertiesList.forEach(properties -> smsClientFactory.createOrUpdateSmsClient(properties)); propertiesList.forEach(properties -> smsClientFactory.createOrUpdateSmsClient(properties));
this.channelCache = channels;
}
/**
* 通过定时任务轮询刷新缓存
*
* 目的多节点部署时通过轮询通知所有节点进行刷新
*/
@Scheduled(initialDelay = 60, fixedRate = 60, timeUnit = TimeUnit.SECONDS)
public void refreshLocalCache() {
// 情况一如果缓存里没有数据则直接刷新缓存
if (CollUtil.isEmpty(channelCache)) {
initLocalCache();
return;
}
// 情况二如果缓存里数据则通过 updateTime 判断是否有数据变更有变更则刷新缓存
LocalDateTime maxTime = getMaxValue(channelCache, SmsChannelDO::getUpdateTime);
if (smsChannelMapper.selectCountByUpdateTimeGt(maxTime) > 0) {
initLocalCache();
}
} }
@Override @Override
@ -59,9 +89,9 @@ public class SmsChannelServiceImpl implements SmsChannelService {
// 插入 // 插入
SmsChannelDO smsChannel = SmsChannelConvert.INSTANCE.convert(createReqVO); SmsChannelDO smsChannel = SmsChannelConvert.INSTANCE.convert(createReqVO);
smsChannelMapper.insert(smsChannel); smsChannelMapper.insert(smsChannel);
// 发送刷新消息
smsProducer.sendSmsChannelRefreshMessage(); // 刷新缓存
// 返回 initLocalCache();
return smsChannel.getId(); return smsChannel.getId();
} }
@ -72,8 +102,9 @@ public class SmsChannelServiceImpl implements SmsChannelService {
// 更新 // 更新
SmsChannelDO updateObj = SmsChannelConvert.INSTANCE.convert(updateReqVO); SmsChannelDO updateObj = SmsChannelConvert.INSTANCE.convert(updateReqVO);
smsChannelMapper.updateById(updateObj); smsChannelMapper.updateById(updateObj);
// 发送刷新消息
smsProducer.sendSmsChannelRefreshMessage(); // 刷新缓存
initLocalCache();
} }
@Override @Override
@ -86,8 +117,9 @@ public class SmsChannelServiceImpl implements SmsChannelService {
} }
// 删除 // 删除
smsChannelMapper.deleteById(id); smsChannelMapper.deleteById(id);
// 发送刷新消息
smsProducer.sendSmsChannelRefreshMessage(); // 刷新缓存
initLocalCache();
} }
private void validateSmsChannelExists(Long id) { private void validateSmsChannelExists(Long id) {

View File

@ -39,6 +39,9 @@ import static cn.iocoder.yudao.framework.test.core.util.AssertUtils.assertServic
import static cn.iocoder.yudao.framework.test.core.util.RandomUtils.randomPojo; import static cn.iocoder.yudao.framework.test.core.util.RandomUtils.randomPojo;
import static cn.iocoder.yudao.framework.test.core.util.RandomUtils.randomString; import static cn.iocoder.yudao.framework.test.core.util.RandomUtils.randomString;
import static java.util.Arrays.asList; import static java.util.Arrays.asList;
import static org.hamcrest.CoreMatchers.anyOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.*; import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.ArgumentMatchers.isNull;
@ -289,7 +292,10 @@ public class OAuth2OpenControllerTest extends BaseMockitoUnitTest {
scope, redirectUri, true, state); scope, redirectUri, true, state);
// 断言 // 断言
assertEquals(0, result.getCode()); assertEquals(0, result.getCode());
assertEquals("https://www.iocoder.cn#access_token=test_access_token&token_type=bearer&state=test&expires_in=30&scope=read", result.getData()); assertThat(result.getData(), anyOf( // 29 30 都有一定概率主要是时间计算
is("https://www.iocoder.cn#access_token=test_access_token&token_type=bearer&state=test&expires_in=29&scope=read"),
is("https://www.iocoder.cn#access_token=test_access_token&token_type=bearer&state=test&expires_in=30&scope=read")
));
} }
@Test // autoApprove = false通过 + code @Test // autoApprove = false通过 + code

View File

@ -9,7 +9,6 @@ import cn.iocoder.yudao.module.system.controller.admin.sms.vo.channel.SmsChannel
import cn.iocoder.yudao.module.system.controller.admin.sms.vo.channel.SmsChannelUpdateReqVO; import cn.iocoder.yudao.module.system.controller.admin.sms.vo.channel.SmsChannelUpdateReqVO;
import cn.iocoder.yudao.module.system.dal.dataobject.sms.SmsChannelDO; import cn.iocoder.yudao.module.system.dal.dataobject.sms.SmsChannelDO;
import cn.iocoder.yudao.module.system.dal.mysql.sms.SmsChannelMapper; import cn.iocoder.yudao.module.system.dal.mysql.sms.SmsChannelMapper;
import cn.iocoder.yudao.module.system.mq.producer.sms.SmsProducer;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.springframework.boot.test.mock.mockito.MockBean; import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.context.annotation.Import; import org.springframework.context.annotation.Import;
@ -41,8 +40,6 @@ public class SmsChannelServiceTest extends BaseDbUnitTest {
private SmsClientFactory smsClientFactory; private SmsClientFactory smsClientFactory;
@MockBean @MockBean
private SmsTemplateService smsTemplateService; private SmsTemplateService smsTemplateService;
@MockBean
private SmsProducer smsProducer;
@Test @Test
public void testInitLocalCache_success() { public void testInitLocalCache_success() {
@ -59,6 +56,10 @@ public class SmsChannelServiceTest extends BaseDbUnitTest {
argThat(properties -> isPojoEquals(smsChannelDO01, properties))); argThat(properties -> isPojoEquals(smsChannelDO01, properties)));
verify(smsClientFactory, times(1)).createOrUpdateSmsClient( verify(smsClientFactory, times(1)).createOrUpdateSmsClient(
argThat(properties -> isPojoEquals(smsChannelDO02, properties))); argThat(properties -> isPojoEquals(smsChannelDO02, properties)));
// 断言 channelCache 缓存
assertEquals(2, smsChannelService.getChannelCache().size());
assertPojoEquals(smsChannelDO01, smsChannelService.getChannelCache().get(0));
assertPojoEquals(smsChannelDO02, smsChannelService.getChannelCache().get(1));
} }
@Test @Test
@ -73,8 +74,6 @@ public class SmsChannelServiceTest extends BaseDbUnitTest {
// 校验记录的属性是否正确 // 校验记录的属性是否正确
SmsChannelDO smsChannel = smsChannelMapper.selectById(smsChannelId); SmsChannelDO smsChannel = smsChannelMapper.selectById(smsChannelId);
assertPojoEquals(reqVO, smsChannel); assertPojoEquals(reqVO, smsChannel);
// 校验调用
verify(smsProducer, times(1)).sendSmsChannelRefreshMessage();
} }
@Test @Test
@ -94,8 +93,6 @@ public class SmsChannelServiceTest extends BaseDbUnitTest {
// 校验是否更新正确 // 校验是否更新正确
SmsChannelDO smsChannel = smsChannelMapper.selectById(reqVO.getId()); // 获取最新的 SmsChannelDO smsChannel = smsChannelMapper.selectById(reqVO.getId()); // 获取最新的
assertPojoEquals(reqVO, smsChannel); assertPojoEquals(reqVO, smsChannel);
// 校验调用
verify(smsProducer, times(1)).sendSmsChannelRefreshMessage();
} }
@Test @Test
@ -117,10 +114,8 @@ public class SmsChannelServiceTest extends BaseDbUnitTest {
// 调用 // 调用
smsChannelService.deleteSmsChannel(id); smsChannelService.deleteSmsChannel(id);
// 校验数据不存在了 // 校验数据不存在了
assertNull(smsChannelMapper.selectById(id)); assertNull(smsChannelMapper.selectById(id));
// 校验调用
verify(smsProducer, times(1)).sendSmsChannelRefreshMessage();
} }
@Test @Test
@ -177,31 +172,31 @@ public class SmsChannelServiceTest extends BaseDbUnitTest {
@Test @Test
public void testGetSmsChannelPage() { public void testGetSmsChannelPage() {
// mock 数据 // mock 数据
SmsChannelDO dbSmsChannel = randomPojo(SmsChannelDO.class, o -> { // 等会查询到 SmsChannelDO dbSmsChannel = randomPojo(SmsChannelDO.class, o -> { // 等会查询到
o.setSignature("芋道源码"); o.setSignature("芋道源码");
o.setStatus(CommonStatusEnum.ENABLE.getStatus()); o.setStatus(CommonStatusEnum.ENABLE.getStatus());
o.setCreateTime(buildTime(2020, 12, 12)); o.setCreateTime(buildTime(2020, 12, 12));
}); });
smsChannelMapper.insert(dbSmsChannel); smsChannelMapper.insert(dbSmsChannel);
// 测试 signature 不匹配 // 测试 signature 不匹配
smsChannelMapper.insert(cloneIgnoreId(dbSmsChannel, o -> o.setSignature("源码"))); smsChannelMapper.insert(cloneIgnoreId(dbSmsChannel, o -> o.setSignature("源码")));
// 测试 status 不匹配 // 测试 status 不匹配
smsChannelMapper.insert(cloneIgnoreId(dbSmsChannel, o -> o.setStatus(CommonStatusEnum.DISABLE.getStatus()))); smsChannelMapper.insert(cloneIgnoreId(dbSmsChannel, o -> o.setStatus(CommonStatusEnum.DISABLE.getStatus())));
// 测试 createTime 不匹配 // 测试 createTime 不匹配
smsChannelMapper.insert(cloneIgnoreId(dbSmsChannel, o -> o.setCreateTime(buildTime(2020, 11, 11)))); smsChannelMapper.insert(cloneIgnoreId(dbSmsChannel, o -> o.setCreateTime(buildTime(2020, 11, 11))));
// 准备参数 // 准备参数
SmsChannelPageReqVO reqVO = new SmsChannelPageReqVO(); SmsChannelPageReqVO reqVO = new SmsChannelPageReqVO();
reqVO.setSignature("芋道"); reqVO.setSignature("芋道");
reqVO.setStatus(CommonStatusEnum.ENABLE.getStatus()); reqVO.setStatus(CommonStatusEnum.ENABLE.getStatus());
reqVO.setCreateTime(buildBetweenTime(2020, 12, 1, 2020, 12, 24)); reqVO.setCreateTime(buildBetweenTime(2020, 12, 1, 2020, 12, 24));
// 调用 // 调用
PageResult<SmsChannelDO> pageResult = smsChannelService.getSmsChannelPage(reqVO); PageResult<SmsChannelDO> pageResult = smsChannelService.getSmsChannelPage(reqVO);
// 断言 // 断言
assertEquals(1, pageResult.getTotal()); assertEquals(1, pageResult.getTotal());
assertEquals(1, pageResult.getList().size()); assertEquals(1, pageResult.getList().size());
assertPojoEquals(dbSmsChannel, pageResult.getList().get(0)); assertPojoEquals(dbSmsChannel, pageResult.getList().get(0));
} }
} }