抽数服务中大屏(mongodb)优化为批量查重和插入
This commit is contained in:
parent
77070a7c99
commit
ae89050d8d
@ -4,8 +4,18 @@ package cn.iocoder.yudao.module.infra.dal.mongodb.checkticket;
|
||||
import cn.iocoder.yudao.module.infra.dal.dataobject.checkticket.CheckTicket;
|
||||
import com.baomidou.dynamic.datasource.annotation.DS;
|
||||
import org.springframework.data.mongodb.repository.MongoRepository;
|
||||
import org.springframework.data.mongodb.repository.Query;
|
||||
import org.springframework.stereotype.Repository;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Repository
|
||||
public interface CheckTicketRepository extends MongoRepository<CheckTicket,String> {
|
||||
/**
|
||||
* 根据dataId数组查找
|
||||
* @param dataIdList
|
||||
* @return java.util.List<cn.iocoder.yudao.module.infra.dal.dataobject.checkticket.CheckTicket>
|
||||
*/
|
||||
@Query(value = "{ 'dataId': { $in: ?0 } }", fields = "{ 'dataId': 1, '_id': 0 }")
|
||||
List<CheckTicket> findByDataIdIn(List<String> dataIdList);
|
||||
}
|
||||
|
@ -1,12 +1,22 @@
|
||||
package cn.iocoder.yudao.module.infra.dal.mongodb.saledata;
|
||||
|
||||
|
||||
import cn.iocoder.yudao.module.infra.dal.dataobject.checkticket.CheckTicket;
|
||||
import cn.iocoder.yudao.module.infra.dal.dataobject.saledata.SaleData;
|
||||
import com.baomidou.dynamic.datasource.annotation.DS;
|
||||
import org.springframework.data.mongodb.repository.MongoRepository;
|
||||
import org.springframework.data.mongodb.repository.Query;
|
||||
import org.springframework.stereotype.Repository;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Repository
|
||||
public interface SaleDataRepository extends MongoRepository<SaleData,String> {
|
||||
|
||||
/**
|
||||
* 更具dataId查找售票记录
|
||||
* @param dataIdList
|
||||
* @return java.util.List<cn.iocoder.yudao.module.infra.dal.dataobject.checkticket.CheckTicket>
|
||||
*/
|
||||
@Query(value = "{ 'dataId': { $in: ?0 } }", fields = "{ 'dataId': 1, '_id': 0 }")
|
||||
List<CheckTicket> findByDataIdIn(List<String> dataIdList);
|
||||
}
|
||||
|
@ -16,6 +16,7 @@ import cn.iocoder.yudao.module.infra.job.ticketing.vo.TicketingSamplingReqVO;
|
||||
import cn.iocoder.yudao.module.infra.job.ticketing.vo.TicketingSamplingRespVO;
|
||||
import cn.iocoder.yudao.module.infra.service.checkticket.CheckTicketService;
|
||||
import cn.iocoder.yudao.module.infra.service.saledata.SaleDataService;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.xxl.job.core.context.XxlJobHelper;
|
||||
import com.xxl.job.core.handler.annotation.XxlJob;
|
||||
@ -35,9 +36,17 @@ import java.util.Map;
|
||||
public class TicketDataMigration {
|
||||
private static Logger logger = LoggerFactory.getLogger(TicketDataMigration.class);
|
||||
// 目标地址
|
||||
private static String OBJECT_URL = "http://shinanlundu.pro.jiutianda.com/joytime-erp-eportal/console/openapi/handler";
|
||||
private static final String OBJECT_URL = "http://shinanlundu.pro.jiutianda.com/joytime-erp-eportal/console/openapi/handler";
|
||||
// 每次读取条数
|
||||
private static final int PAGE_SIZE = 500;
|
||||
// 检票请求服务名
|
||||
private static final String CHECK_TICKET_SERVICE = "apiCheckService";
|
||||
// 售票请求服务名
|
||||
private static final String SALE_DATA_SERVICE = "apiSaleService";
|
||||
// 检票请求方法名
|
||||
private static final String CHECK_TICKET_METHOD = "CheckDetail";
|
||||
// 售票请求方法名
|
||||
private static final String SALE_DATA_METHOD = "SaleDetail";
|
||||
@Resource
|
||||
private SaleDataService saleDataService;
|
||||
@Resource
|
||||
@ -51,25 +60,7 @@ public class TicketDataMigration {
|
||||
int failedCount = 0;
|
||||
int pageNumber = 1;
|
||||
XxlJobHelper.log("抽数服务:mysql销售数据开始抽数...");
|
||||
// 准备请求参数
|
||||
TicketingSamplingReqVO ticketingSamplingReqVO = new TicketingSamplingReqVO();
|
||||
ticketingSamplingReqVO.setService("apiSaleService");
|
||||
ticketingSamplingReqVO.setMethod("SaleDetail");
|
||||
// 抽取前一天的所有售票数据
|
||||
ticketingSamplingReqVO.setQueryDate(TickingDateUtils.getPreviousDayFormat());
|
||||
ticketingSamplingReqVO.setPageNumber(pageNumber);
|
||||
ticketingSamplingReqVO.setPageSize(PAGE_SIZE);
|
||||
// 先发起第一次请求
|
||||
String str = HttpUtil.post(OBJECT_URL, JSONUtil.toJsonStr(ticketingSamplingReqVO));
|
||||
// 解析读取响应数据
|
||||
TicketingSamplingRespVO ticketingSamplingRespVO = new ObjectMapper().readValue(str, TicketingSamplingRespVO.class);
|
||||
// 如果访问失败则让调度中心重新调用
|
||||
if (!ticketingSamplingRespVO.isPassflag()) {
|
||||
// 让调度中心重新调用
|
||||
XxlJobHelper.log("抽数服务:销售数据抽数失败,返回消息:" + ticketingSamplingRespVO.getRtnMsg());
|
||||
XxlJobHelper.handleFail("销售数据抽数失败,请重试");
|
||||
return;
|
||||
}
|
||||
TicketingSamplingRespVO ticketingSamplingRespVO = this.getUrlResponseData(SALE_DATA_SERVICE, SALE_DATA_METHOD, TickingDateUtils.getPreviousDayFormat(), pageNumber);
|
||||
int totalRows = ticketingSamplingRespVO.getTotalRows();
|
||||
List<Map<String, Object>> mapList = ticketingSamplingRespVO.getDataMapList();
|
||||
// 总条数小于等于每页条数 则直接按照总条数进行插入即可
|
||||
@ -84,15 +75,7 @@ public class TicketDataMigration {
|
||||
int totalPages = ticketingSamplingRespVO.getTotalPages();
|
||||
// 处理剩余页的数据
|
||||
for (; pageNumber <= totalPages; pageNumber++) {
|
||||
ticketingSamplingReqVO.setPageNumber(pageNumber);
|
||||
str = HttpUtil.post(OBJECT_URL, JSONUtil.toJsonStr(ticketingSamplingReqVO));
|
||||
ticketingSamplingRespVO = new ObjectMapper().readValue(str, TicketingSamplingRespVO.class);
|
||||
|
||||
if (!ticketingSamplingRespVO.isPassflag()) {
|
||||
XxlJobHelper.log("抽数服务:销售数据抽数失败,返回消息:" + ticketingSamplingRespVO.getRtnMsg());
|
||||
XxlJobHelper.handleFail("销售数据抽数失败,请重试");
|
||||
return;
|
||||
}
|
||||
ticketingSamplingRespVO = this.getUrlResponseData(SALE_DATA_SERVICE, SALE_DATA_METHOD, TickingDateUtils.getPreviousDayFormat(), pageNumber);
|
||||
|
||||
mapList = ticketingSamplingRespVO.getDataMapList();
|
||||
// 对最后一页进行处理,得到最后一页的实际条数
|
||||
@ -116,26 +99,7 @@ public class TicketDataMigration {
|
||||
int failedCount = 0;
|
||||
int pageNumber = 1;
|
||||
XxlJobHelper.log("抽数服务:mysql检票数据开始抽数...");
|
||||
// 准备请求参数
|
||||
TicketingSamplingReqVO ticketingSamplingReqVO = new TicketingSamplingReqVO();
|
||||
ticketingSamplingReqVO.setService("apiCheckService");
|
||||
ticketingSamplingReqVO.setMethod("CheckDetail");
|
||||
// 抽取前一天的所有检票数据
|
||||
ticketingSamplingReqVO.setQueryDate(TickingDateUtils.getPreviousDayFormat());
|
||||
ticketingSamplingReqVO.setPageNumber(pageNumber);
|
||||
ticketingSamplingReqVO.setPageSize(PAGE_SIZE);
|
||||
// 先发起第一次请求
|
||||
String str = HttpUtil.post(OBJECT_URL, JSONUtil.toJsonStr(ticketingSamplingReqVO));
|
||||
|
||||
// 解析读取响应数据
|
||||
TicketingSamplingRespVO ticketingSamplingRespVO = new ObjectMapper().readValue(str, TicketingSamplingRespVO.class);
|
||||
// 如果访问失败则让调度中心重新调用
|
||||
if (!ticketingSamplingRespVO.isPassflag()) {
|
||||
// 让调度中心重新调用
|
||||
XxlJobHelper.log("抽数服务:检票数据抽数失败,返回消息:" + ticketingSamplingRespVO.getRtnMsg());
|
||||
XxlJobHelper.handleFail("销售数据抽数失败,请重试");
|
||||
return;
|
||||
}
|
||||
TicketingSamplingRespVO ticketingSamplingRespVO = this.getUrlResponseData(CHECK_TICKET_SERVICE, CHECK_TICKET_METHOD, TickingDateUtils.getPreviousDayFormat(), pageNumber);
|
||||
int totalRows = ticketingSamplingRespVO.getTotalRows();
|
||||
List<Map<String, Object>> mapList = ticketingSamplingRespVO.getDataMapList();
|
||||
// 总条数小于等于每页条数 则直接按照总条数进行插入即可
|
||||
@ -150,16 +114,7 @@ public class TicketDataMigration {
|
||||
int totalPages = ticketingSamplingRespVO.getTotalPages();
|
||||
// 处理剩余页的数据
|
||||
for (; pageNumber <= totalPages; pageNumber++) {
|
||||
ticketingSamplingReqVO.setPageNumber(pageNumber);
|
||||
str = HttpUtil.post(OBJECT_URL, JSONUtil.toJsonStr(ticketingSamplingReqVO));
|
||||
ticketingSamplingRespVO = new ObjectMapper().readValue(str, TicketingSamplingRespVO.class);
|
||||
|
||||
if (!ticketingSamplingRespVO.isPassflag()) {
|
||||
XxlJobHelper.log("抽数服务:销售数据抽数失败,返回消息:" + ticketingSamplingRespVO.getRtnMsg());
|
||||
XxlJobHelper.handleFail("销售数据抽数失败,请重试");
|
||||
return;
|
||||
}
|
||||
|
||||
ticketingSamplingRespVO = this.getUrlResponseData(CHECK_TICKET_SERVICE, CHECK_TICKET_METHOD, TickingDateUtils.getPreviousDayFormat(), pageNumber);
|
||||
mapList = ticketingSamplingRespVO.getDataMapList();
|
||||
// 对最后一页进行处理,得到最后一页的实际条数
|
||||
if (pageNumber != totalPages) {
|
||||
@ -179,64 +134,29 @@ public class TicketDataMigration {
|
||||
*/
|
||||
@XxlJob("getSaleDataByMongoDB")
|
||||
public void getSaleDataByMongoDBHandler() throws Exception {
|
||||
int failedCount = 0;
|
||||
int successCount = 0;
|
||||
int duplicatesCount = 0;
|
||||
int pageNumber = 1;
|
||||
XxlJobHelper.log("抽数服务:mongodb销售数据开始抽数...");
|
||||
// 准备请求参数
|
||||
TicketingSamplingReqVO ticketingSamplingReqVO = new TicketingSamplingReqVO();
|
||||
ticketingSamplingReqVO.setService("apiSaleService");
|
||||
ticketingSamplingReqVO.setMethod("SaleDetail");
|
||||
// 抽取前一天的所有售票数据
|
||||
ticketingSamplingReqVO.setQueryDate(TickingDateUtils.getPreviousDayFormat());
|
||||
ticketingSamplingReqVO.setPageNumber(pageNumber);
|
||||
ticketingSamplingReqVO.setPageSize(PAGE_SIZE);
|
||||
// 先发起第一次请求
|
||||
String str = HttpUtil.post(OBJECT_URL, JSONUtil.toJsonStr(ticketingSamplingReqVO));
|
||||
// 解析读取响应数据
|
||||
TicketingSamplingRespVO ticketingSamplingRespVO = new ObjectMapper().readValue(str, TicketingSamplingRespVO.class);
|
||||
// 如果访问失败则让调度中心重新调用
|
||||
if (!ticketingSamplingRespVO.isPassflag()) {
|
||||
// 让调度中心重新调用
|
||||
XxlJobHelper.log("抽数服务:销售数据抽数失败,返回消息:" + ticketingSamplingRespVO.getRtnMsg());
|
||||
XxlJobHelper.handleFail("销售数据抽数失败,请重试");
|
||||
return;
|
||||
}
|
||||
TicketingSamplingRespVO ticketingSamplingRespVO = this.getUrlResponseData(SALE_DATA_SERVICE, SALE_DATA_METHOD, TickingDateUtils.getNowDayFormat(), pageNumber);
|
||||
int totalRows = ticketingSamplingRespVO.getTotalRows();
|
||||
List<Map<String, Object>> mapList = ticketingSamplingRespVO.getDataMapList();
|
||||
// 总条数小于等于每页条数 则直接按照总条数进行插入即可
|
||||
if (totalRows <= PAGE_SIZE) {
|
||||
failedCount += insertSaleDataByMongoDB(mapList, totalRows, pageNumber);
|
||||
} else {
|
||||
// 总条数大于每页条数 执行完第一次插入后根据返回的页数继续进行请求和插入
|
||||
// 插入第一页的数据
|
||||
failedCount += insertSaleDataByMongoDB(mapList, PAGE_SIZE, pageNumber);
|
||||
pageNumber++;
|
||||
// 获得总页数
|
||||
int totalPages = ticketingSamplingRespVO.getTotalPages();
|
||||
// 处理剩余页的数据
|
||||
for (; pageNumber <= totalPages; pageNumber++) {
|
||||
ticketingSamplingReqVO.setPageNumber(pageNumber);
|
||||
str = HttpUtil.post(OBJECT_URL, JSONUtil.toJsonStr(ticketingSamplingReqVO));
|
||||
ticketingSamplingRespVO = new ObjectMapper().readValue(str, TicketingSamplingRespVO.class);
|
||||
|
||||
if (!ticketingSamplingRespVO.isPassflag()) {
|
||||
XxlJobHelper.log("抽数服务:销售数据抽数失败,返回消息:" + ticketingSamplingRespVO.getRtnMsg());
|
||||
XxlJobHelper.handleFail("销售数据抽数失败,请重试");
|
||||
return;
|
||||
}
|
||||
|
||||
mapList = ticketingSamplingRespVO.getDataMapList();
|
||||
// 对最后一页进行处理,得到最后一页的实际条数
|
||||
if (pageNumber != totalPages) {
|
||||
failedCount += insertSaleDataByMongoDB(mapList, PAGE_SIZE, pageNumber);
|
||||
} else {
|
||||
failedCount += insertSaleDataByMongoDB(mapList, totalRows % PAGE_SIZE, pageNumber);
|
||||
}
|
||||
List<Map<String, Object>> mapList;
|
||||
// 获得总页数
|
||||
int totalPages = ticketingSamplingRespVO.getTotalPages();
|
||||
// 处理剩余页的数据
|
||||
for (; pageNumber <= totalPages; pageNumber++) {
|
||||
ticketingSamplingRespVO = this.getUrlResponseData(SALE_DATA_SERVICE, SALE_DATA_METHOD, TickingDateUtils.getNowDayFormat(), pageNumber);
|
||||
mapList = ticketingSamplingRespVO.getDataMapList();
|
||||
List<Integer> resultNum = saleDataService.insertByMongoDB(BeanUtils.toBean(mapList, SaleData.class));
|
||||
if (resultNum == null) {
|
||||
XxlJobHelper.log("数据批量插入异常!", pageNumber);
|
||||
} else {
|
||||
successCount += resultNum.get(0);
|
||||
duplicatesCount += resultNum.get(1);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
XxlJobHelper.log("抽数服务:销售数据抽数正常结束!总共:{}条数据,{}条数据插入失败!", totalRows, failedCount);
|
||||
XxlJobHelper.log("抽数服务:检票数据抽数正常结束!总共:{}条数据,{}条数据重复,{}条数据插入成功!", totalRows, duplicatesCount, successCount);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -244,65 +164,30 @@ public class TicketDataMigration {
|
||||
*/
|
||||
@XxlJob("getCheckTicketByMongoDB")
|
||||
public void getCheckTicketByMongoDBHandler() throws Exception {
|
||||
int failedCount = 0;
|
||||
int pageNumber = 1;
|
||||
XxlJobHelper.log("抽数服务:mongodb检票数据开始抽数...");
|
||||
// 准备请求参数
|
||||
TicketingSamplingReqVO ticketingSamplingReqVO = new TicketingSamplingReqVO();
|
||||
ticketingSamplingReqVO.setService("apiCheckService");
|
||||
ticketingSamplingReqVO.setMethod("CheckDetail");
|
||||
// 抽取前一天的所有检票数据
|
||||
ticketingSamplingReqVO.setQueryDate(TickingDateUtils.getPreviousDayFormat());
|
||||
ticketingSamplingReqVO.setPageNumber(pageNumber);
|
||||
ticketingSamplingReqVO.setPageSize(PAGE_SIZE);
|
||||
// 先发起第一次请求
|
||||
String str = HttpUtil.post(OBJECT_URL, JSONUtil.toJsonStr(ticketingSamplingReqVO));
|
||||
|
||||
// 解析读取响应数据
|
||||
TicketingSamplingRespVO ticketingSamplingRespVO = new ObjectMapper().readValue(str, TicketingSamplingRespVO.class);
|
||||
// 如果访问失败则让调度中心重新调用
|
||||
if (!ticketingSamplingRespVO.isPassflag()) {
|
||||
// 让调度中心重新调用
|
||||
XxlJobHelper.log("抽数服务:检票数据抽数失败,返回消息:" + ticketingSamplingRespVO.getRtnMsg());
|
||||
XxlJobHelper.handleFail("销售数据抽数失败,请重试");
|
||||
return;
|
||||
}
|
||||
int successCount = 0;
|
||||
int duplicatesCount = 0;
|
||||
int pageNumber = 1;
|
||||
String date = TickingDateUtils.getNowDayFormat();
|
||||
TicketingSamplingRespVO ticketingSamplingRespVO = this.getUrlResponseData(CHECK_TICKET_SERVICE, CHECK_TICKET_METHOD, date, pageNumber);
|
||||
int totalRows = ticketingSamplingRespVO.getTotalRows();
|
||||
List<Map<String, Object>> mapList = ticketingSamplingRespVO.getDataMapList();
|
||||
// 总条数小于等于每页条数 则直接按照总条数进行插入即可
|
||||
if (totalRows <= PAGE_SIZE) {
|
||||
failedCount += insertCheckTicketByMongoDB(mapList, totalRows, pageNumber);
|
||||
} else {
|
||||
// 总条数大于每页条数 执行完第一次插入后根据返回的页数继续进行请求和插入
|
||||
// 插入第一页的数据
|
||||
failedCount += insertCheckTicketByMongoDB(mapList, PAGE_SIZE, pageNumber);
|
||||
pageNumber++;
|
||||
// 获得总页数
|
||||
int totalPages = ticketingSamplingRespVO.getTotalPages();
|
||||
// 处理剩余页的数据
|
||||
for (; pageNumber <= totalPages; pageNumber++) {
|
||||
ticketingSamplingReqVO.setPageNumber(pageNumber);
|
||||
str = HttpUtil.post(OBJECT_URL, JSONUtil.toJsonStr(ticketingSamplingReqVO));
|
||||
ticketingSamplingRespVO = new ObjectMapper().readValue(str, TicketingSamplingRespVO.class);
|
||||
|
||||
if (!ticketingSamplingRespVO.isPassflag()) {
|
||||
XxlJobHelper.log("抽数服务:销售数据抽数失败,返回消息:" + ticketingSamplingRespVO.getRtnMsg());
|
||||
XxlJobHelper.handleFail("销售数据抽数失败,请重试");
|
||||
return;
|
||||
}
|
||||
|
||||
mapList = ticketingSamplingRespVO.getDataMapList();
|
||||
// 对最后一页进行处理,得到最后一页的实际条数
|
||||
if (pageNumber != totalPages) {
|
||||
failedCount += insertCheckTicketByMongoDB(mapList, PAGE_SIZE, pageNumber);
|
||||
} else {
|
||||
failedCount += insertCheckTicketByMongoDB(mapList, totalRows % PAGE_SIZE, pageNumber);
|
||||
}
|
||||
List<Map<String, Object>> mapList;
|
||||
// 获得总页数
|
||||
int totalPages = ticketingSamplingRespVO.getTotalPages();
|
||||
// 处理剩余页的数据
|
||||
for (; pageNumber <= totalPages; pageNumber++) {
|
||||
ticketingSamplingRespVO = this.getUrlResponseData(CHECK_TICKET_SERVICE, CHECK_TICKET_METHOD, date, pageNumber);
|
||||
mapList = ticketingSamplingRespVO.getDataMapList();
|
||||
List<Integer> resultNum = checkTicketService.insertByMongoDB(BeanUtils.toBean(mapList, CheckTicket.class));
|
||||
if (resultNum == null) {
|
||||
XxlJobHelper.log("数据批量插入异常!", pageNumber);
|
||||
} else {
|
||||
successCount += resultNum.get(0);
|
||||
duplicatesCount += resultNum.get(1);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
XxlJobHelper.log("抽数服务:检票数据抽数正常结束!总共:{}条数据,{}条数据插入失败!", totalRows, failedCount);
|
||||
XxlJobHelper.log("抽数服务:检票数据抽数正常结束!总共:{}条数据,{}条数据重复,{}条数据插入成功!", totalRows, duplicatesCount, successCount);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -348,66 +233,55 @@ public class TicketDataMigration {
|
||||
}
|
||||
|
||||
/**
|
||||
* 插入销售数据的方法(mongodb)
|
||||
* 请求目标url的响应数据
|
||||
*
|
||||
* @param mapList 数据集合数组
|
||||
* @param listSize 实际个数
|
||||
* @param pageNumber 每页显示条数
|
||||
* @return int 失败条数
|
||||
* @param service
|
||||
* @param method
|
||||
* @param date
|
||||
* @param pageNumber
|
||||
* @return cn.iocoder.yudao.module.infra.job.ticketing.vo.TicketingSamplingRespVO
|
||||
*/
|
||||
private int insertSaleDataByMongoDB(List<Map<String, Object>> mapList, int listSize, int pageNumber) {
|
||||
int failedCount = 0;
|
||||
int repeatingCount = 0;
|
||||
for (int i = 0; i < listSize; i++) {
|
||||
Map<String, Object> currentMap = mapList.get(i);
|
||||
SaleData saleData = BeanUtils.toBean(currentMap, SaleData.class);
|
||||
switch (saleDataService.createSaleDataByMongoDB(saleData)) {
|
||||
case "添加成功":
|
||||
break;
|
||||
case "重复数据":
|
||||
repeatingCount++;
|
||||
XxlJobHelper.log("第{}条数据重复,数据值为:{}", ((pageNumber - 1) * PAGE_SIZE + i + 1), currentMap);
|
||||
break;
|
||||
case "Mongo数据库写入异常":
|
||||
failedCount++;
|
||||
XxlJobHelper.log("第{}条数据插入失败,数据值为:{}", ((pageNumber - 1) * PAGE_SIZE + i + 1), currentMap);
|
||||
break;
|
||||
}
|
||||
private TicketingSamplingRespVO getUrlResponseData(String service, String method, String date, int pageNumber) {
|
||||
// 准备请求参数
|
||||
TicketingSamplingReqVO ticketingSamplingReqVO = new TicketingSamplingReqVO();
|
||||
ticketingSamplingReqVO.setService(service);
|
||||
ticketingSamplingReqVO.setMethod(method);
|
||||
ticketingSamplingReqVO.setQueryDate(date);
|
||||
ticketingSamplingReqVO.setPageNumber(pageNumber);
|
||||
ticketingSamplingReqVO.setPageSize(PAGE_SIZE);
|
||||
// 先发起第一次请求
|
||||
String str = HttpUtil.post(OBJECT_URL, JSONUtil.toJsonStr(ticketingSamplingReqVO));
|
||||
// 解析读取响应数据
|
||||
TicketingSamplingRespVO ticketingSamplingRespVO;
|
||||
try {
|
||||
ticketingSamplingRespVO = new ObjectMapper().readValue(str, TicketingSamplingRespVO.class);
|
||||
} catch (JsonProcessingException e) {
|
||||
XxlJobHelper.log("请求数据解析异常");
|
||||
throw new RuntimeException();
|
||||
}
|
||||
XxlJobHelper.log("共有{}条数据重复,已跳过存储。", repeatingCount);
|
||||
return failedCount;
|
||||
// TODO 需要再优化
|
||||
if (ticketingSamplingRespVO != null && !ticketingSamplingRespVO.isPassflag()) {
|
||||
// 让调度中心重新调用
|
||||
XxlJobHelper.log("抽数服务:检票数据抽数失败,返回消息:" + ticketingSamplingRespVO.getRtnMsg());
|
||||
XxlJobHelper.handleFail("销售数据抽数失败,请重试");
|
||||
throw new RuntimeException();
|
||||
}
|
||||
return ticketingSamplingRespVO;
|
||||
}
|
||||
|
||||
/**
|
||||
* 插入检票数据的方法(mongodb)
|
||||
* 请求失败消息
|
||||
*
|
||||
* @param mapList 数据集合数组
|
||||
* @param listSize 实际个数
|
||||
* @param pageNumber 每页显示条数
|
||||
* @return int 失败条数
|
||||
* @param
|
||||
* @return void
|
||||
*/
|
||||
private int insertCheckTicketByMongoDB(List<Map<String, Object>> mapList, int listSize, int pageNumber) {
|
||||
int failedCount = 0;
|
||||
int repeatingCount = 0;
|
||||
for (int i = 0; i < listSize; i++) {
|
||||
Map<String, Object> currentMap = mapList.get(i);
|
||||
// 把获取到的数据转为插入检票的参数类型
|
||||
CheckTicket checkTicket = BeanUtils.toBean(currentMap, CheckTicket.class);
|
||||
switch (checkTicketService.createCheckTicketByMongoDB(checkTicket)) {
|
||||
case "添加成功":
|
||||
break;
|
||||
case "重复数据":
|
||||
repeatingCount++;
|
||||
XxlJobHelper.log("第{}条数据重复,数据值为:{}", ((pageNumber - 1) * PAGE_SIZE + i + 1), currentMap);
|
||||
break;
|
||||
case "Mongo数据库写入异常":
|
||||
failedCount++;
|
||||
XxlJobHelper.log("第{}条数据插入失败,数据值为:{}", ((pageNumber - 1) * PAGE_SIZE + i + 1), currentMap);
|
||||
break;
|
||||
}
|
||||
private void requestFailedMessage(TicketingSamplingRespVO ticketingSamplingRespVO) {
|
||||
// 如果访问失败则让调度中心重新调用
|
||||
if (!ticketingSamplingRespVO.isPassflag()) {
|
||||
// 让调度中心重新调用
|
||||
XxlJobHelper.log("抽数服务:检票数据抽数失败,返回消息:" + ticketingSamplingRespVO.getRtnMsg());
|
||||
XxlJobHelper.handleFail("销售数据抽数失败,请重试");
|
||||
}
|
||||
XxlJobHelper.log("共有{}条数据重复,已跳过存储。", repeatingCount);
|
||||
return failedCount;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -24,5 +24,17 @@ public interface CheckTicketService {
|
||||
*/
|
||||
Long createCheckTicket(@Valid CheckTicketDO createReqVO);
|
||||
|
||||
String createCheckTicketByMongoDB(CheckTicket checkTicketSaveReqDTO);
|
||||
/**
|
||||
* 查询重复的dataId(mongodb)
|
||||
* @param dataIdList dataId数组
|
||||
* @return java.util.List<java.lang.Boolean>
|
||||
*/
|
||||
List<String> checkDuplicatesByMongoDB(List<String> dataIdList);
|
||||
|
||||
/**
|
||||
* 批量插入检票数据(mongodb)
|
||||
* @param checkTicketList
|
||||
* @return java.util.List<java.lang.Integer> 插入数据和重复数据
|
||||
*/
|
||||
List<Integer> insertByMongoDB(List<CheckTicket> checkTicketList);
|
||||
}
|
@ -12,6 +12,8 @@ import com.baomidou.dynamic.datasource.annotation.Slave;
|
||||
import com.mongodb.MongoException;
|
||||
import org.springframework.data.domain.Example;
|
||||
import org.springframework.data.domain.ExampleMatcher;
|
||||
import org.springframework.data.mongodb.core.query.Criteria;
|
||||
import org.springframework.data.mongodb.core.query.Query;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.validation.annotation.Validated;
|
||||
|
||||
@ -19,6 +21,7 @@ import javax.annotation.Resource;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.springframework.data.domain.ExampleMatcher.GenericPropertyMatchers.exact;
|
||||
|
||||
@ -45,23 +48,31 @@ public class CheckTicketServiceImpl implements CheckTicketService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public String createCheckTicketByMongoDB(CheckTicket checkTicket) {
|
||||
public List<String> checkDuplicatesByMongoDB(List<String> dataIdList) {
|
||||
// 查询数据库中存在的 dataId
|
||||
List<CheckTicket> existingTickets = checkTicketRepository.findByDataIdIn(dataIdList);
|
||||
// 把CheckTicket对象列表改为dataId字符串列表
|
||||
return existingTickets.stream().map(CheckTicket::getDataId).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Integer> insertByMongoDB(List<CheckTicket> checkTicketList) {
|
||||
List<Integer> resultNum = new ArrayList<>(2);
|
||||
// 把检票数据的dataId提取出来成数组
|
||||
List<String> dataIdList = checkTicketList.stream().map(CheckTicket::getDataId).collect(Collectors.toList());
|
||||
// 获取重复的dataId
|
||||
List<String> duplicatesDataId = this.checkDuplicatesByMongoDB(dataIdList);
|
||||
// 对checkTicketList过滤掉重复的dataId对应的数据,得到需要插入的不重复数据
|
||||
List<CheckTicket> insertCheckTicketList = checkTicketList.stream().filter(ticket -> !duplicatesDataId.contains(ticket.getDataId())).collect(Collectors.toList());
|
||||
try {
|
||||
checkTicketRepository.findOne(Example.of(checkTicket, ExampleMatcher.matching().
|
||||
withIgnorePaths("_id").withMatcher("dataId", exact())));
|
||||
return "重复数据";
|
||||
} catch (NoSuchElementException e) {
|
||||
try {
|
||||
// 执行插入操作
|
||||
checkTicketRepository.insert(checkTicket);
|
||||
return "添加成功";
|
||||
} catch (MongoException e2) {
|
||||
return "Mongo数据库写入异常";
|
||||
}
|
||||
// 批量插入
|
||||
checkTicketRepository.insert(insertCheckTicketList);
|
||||
} catch (MongoException e){
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
|
||||
resultNum.add(insertCheckTicketList.size());
|
||||
resultNum.add(duplicatesDataId.size());
|
||||
return resultNum;
|
||||
}
|
||||
|
||||
}
|
@ -3,6 +3,7 @@ package cn.iocoder.yudao.module.infra.service.saledata;
|
||||
import cn.iocoder.yudao.framework.common.pojo.PageResult;
|
||||
import cn.iocoder.yudao.module.infra.controller.saledata.vo.SaleDataPageReqVO;
|
||||
import cn.iocoder.yudao.module.infra.controller.saledata.vo.SaleDataSaveReqVO;
|
||||
import cn.iocoder.yudao.module.infra.dal.dataobject.checkticket.CheckTicket;
|
||||
import cn.iocoder.yudao.module.infra.dal.dataobject.saledata.SaleData;
|
||||
import cn.iocoder.yudao.module.infra.dal.dataobject.saledata.SaleDataDO;
|
||||
|
||||
@ -27,9 +28,16 @@ public interface SaleDataService {
|
||||
Long createSaleData(@Valid SaleDataDO saleDataDO);
|
||||
|
||||
/**
|
||||
* 插入售票信息到mongodb中
|
||||
* @param saleData
|
||||
* @return java.lang.String
|
||||
* 查询重复的dataId(mongodb)
|
||||
* @param dataIdList dataId数组
|
||||
* @return java.util.List<java.lang.Boolean>
|
||||
*/
|
||||
String createSaleDataByMongoDB(SaleData saleData);
|
||||
List<String> checkDuplicatesByMongoDB(List<String> dataIdList);
|
||||
|
||||
/**
|
||||
* 批量插入检票数据(mongodb)
|
||||
* @param saleDataList
|
||||
* @return java.util.List<java.lang.Integer>
|
||||
*/
|
||||
List<Integer> insertByMongoDB(List<SaleData> saleDataList);
|
||||
}
|
@ -5,13 +5,17 @@ import cn.iocoder.yudao.framework.common.util.object.BeanUtils;
|
||||
import cn.iocoder.yudao.framework.common.util.ticket.IdCardUtil;
|
||||
import cn.iocoder.yudao.framework.mybatis.core.query.LambdaQueryWrapperX;
|
||||
import cn.iocoder.yudao.module.infra.controller.saledata.vo.SaleDataSaveReqVO;
|
||||
import cn.iocoder.yudao.module.infra.dal.dataobject.checkticket.CheckTicket;
|
||||
import cn.iocoder.yudao.module.infra.dal.dataobject.saledata.SaleData;
|
||||
import cn.iocoder.yudao.module.infra.dal.dataobject.saledata.SaleDataDO;
|
||||
import cn.iocoder.yudao.module.infra.dal.mongodb.saledata.SaleDataRepository;
|
||||
import cn.iocoder.yudao.module.infra.dal.mysql.saledata.SaleDataMapper;
|
||||
import com.baomidou.dynamic.datasource.annotation.DS;
|
||||
import com.baomidou.dynamic.datasource.annotation.Slave;
|
||||
import com.mongodb.DuplicateKeyException;
|
||||
import com.mongodb.MongoException;
|
||||
import org.springframework.data.domain.Example;
|
||||
import org.springframework.data.domain.ExampleMatcher;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.validation.annotation.Validated;
|
||||
|
||||
@ -22,6 +26,9 @@ import java.text.SimpleDateFormat;
|
||||
import java.time.LocalDate;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.springframework.data.domain.ExampleMatcher.GenericPropertyMatchers.exact;
|
||||
|
||||
|
||||
/**
|
||||
@ -45,15 +52,33 @@ public class SaleDataServiceImpl implements SaleDataService {
|
||||
return saleDataDO.getId();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String createSaleDataByMongoDB(SaleData saleData) {
|
||||
try {
|
||||
saleDataRepository.insert(saleData);
|
||||
return "添加成功";
|
||||
} catch (MongoException e) {
|
||||
return "Mongo数据库写入异常";
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> checkDuplicatesByMongoDB(List<String> dataIdList) {
|
||||
// 查询数据库中存在的 dataId
|
||||
List<CheckTicket> existingTickets = saleDataRepository.findByDataIdIn(dataIdList);
|
||||
// 把CheckTicket对象列表改为dataId字符串列表
|
||||
return existingTickets.stream().map(CheckTicket::getDataId).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Integer> insertByMongoDB(List<SaleData> saleDataList) {
|
||||
List<Integer> resultNum = new ArrayList<>(2);
|
||||
// 把检票数据的dataId提取出来成数组
|
||||
List<String> dataIdList = saleDataList.stream().map(SaleData::getDataId).collect(Collectors.toList());
|
||||
// 获取重复的dataId
|
||||
List<String> duplicatesDataId = this.checkDuplicatesByMongoDB(dataIdList);
|
||||
// 对checkTicketList过滤掉重复的dataId对应的数据,得到需要插入的不重复数据
|
||||
List<SaleData> insertSaleDataList = saleDataList.stream().filter(saleData -> !duplicatesDataId.contains(saleData.getDataId())).collect(Collectors.toList());
|
||||
try {
|
||||
// 批量插入
|
||||
saleDataRepository.insert(insertSaleDataList);
|
||||
} catch (MongoException e){
|
||||
return null;
|
||||
}
|
||||
resultNum.add(insertSaleDataList.size());
|
||||
resultNum.add(duplicatesDataId.size());
|
||||
return resultNum;
|
||||
}
|
||||
|
||||
|
||||
|
@ -4,6 +4,7 @@ spring:
|
||||
mongodb:
|
||||
uri: mongodb://root:123456@120.46.37.243:27017/admin?authMechanism=SCRAM-SHA-256
|
||||
database: sn-lundu-db
|
||||
# uri: mongodb://127.0.0.1:27017/sn-lundu-db?
|
||||
# 数据源配置项
|
||||
autoconfigure:
|
||||
exclude:
|
||||
|
Loading…
Reference in New Issue
Block a user