Compare commits

..

No commits in common. "0d8fa5efb190a258ff982746ac40134a5b5c7be6" and "0f7f5797f8ac7e011dc9c7ecaf8636719c51d160" have entirely different histories.

7 changed files with 25 additions and 83 deletions

View File

@ -12,29 +12,24 @@ import java.util.List;
@Repository @Repository
public interface CheckTicketTodayRepository extends MongoRepository<CheckTicketToday, String> { public interface CheckTicketTodayRepository extends MongoRepository<CheckTicketToday,String> {
/** /**
* 根据dataId数组查找 * 根据dataId数组查找
*
* @param dataIdList * @param dataIdList
* @return java.util.List<cn.iocoder.yudao.module.infra.dal.dataobject.checkticket.CheckTicket> * @return java.util.List<cn.iocoder.yudao.module.infra.dal.dataobject.checkticket.CheckTicket>
*/ */
@Query(value = "{ 'dataId': { $in: ?0 } }", fields = "{ 'dataId': 1, '_id': 0 }") @Query(value = "{ 'dataId': { $in: ?0 } }", fields = "{ 'dataId': 1, '_id': 0 }")
List<CheckTicketToday> findByDataIdIn(List<String> dataIdList); List<CheckTicketToday> findByDataIdIn(List<String> dataIdList);
/** /**
* 找到所有不在时间段内的日期字段 * 找到所有不在时间段内的日期字段
*
* @param startDate * @param startDate
* @param endDate * @param endDate
* @return java.util.List<cn.iocoder.yudao.module.infra.dal.dataobject.checkticket.CheckTicketToday> * @return java.util.List<cn.iocoder.yudao.module.infra.dal.dataobject.checkticket.CheckTicketToday>
*/ */
@Query("{ 'checkticketdate': { $not: { $gte: ?0, $lt: ?1 } } }") @Query("{ 'checkticketdate': { $not: { $gte: ?0, $lt: ?1 } } }")
List<CheckTicketToday> findNotInTimeRange(String startDate, String endDate); List<CheckTicketToday> findNotInTimeRange(String startDate, String endDate);
/** /**
* 查询所有去重的 checkticketdate 字段值 * 查询所有去重的 checkticketdate 字段值
*
* @return 去重后的 checkticketdate 字段值列表 * @return 去重后的 checkticketdate 字段值列表
*/ */
@Aggregation(pipeline = { @Aggregation(pipeline = {
@ -44,10 +39,9 @@ public interface CheckTicketTodayRepository extends MongoRepository<CheckTicketT
List<String> findDistinctDate(); List<String> findDistinctDate();
/** /**
* 计算自定义日期的总数 * 删除 checkticketdate 字段值在给定列表中的所有记录
* @param date * @param dates 日期列表
* @return int
*/ */
@Query(value = "{ 'checkticketdate': ?0 }", count = true) @Query("{ 'checkticketdate': { $in: ?0 } }")
long countByCustomDate(String date); void deleteByCheckticketdateIn(List<String> dates);
} }

View File

@ -87,12 +87,4 @@ public interface SaleDataTodayRepository extends MongoRepository<SaleDataToday,S
"{$project: {_id: 0, sddate: '$_id'}}" "{$project: {_id: 0, sddate: '$_id'}}"
}) })
List<String> findDistinctDate(); List<String> findDistinctDate();
/**
* 根据日期计算总数
* @param date
* @return java.lang.Long
*/
@Query(value = "{ 'sddate': ?0 }", count = true)
Long countByCustomDate(String date);
} }

View File

@ -63,11 +63,7 @@ public class TicketDataMigration {
int failedCount = 0; int failedCount = 0;
int pageNumber = 1; int pageNumber = 1;
XxlJobHelper.log("抽数服务mysql销售数据开始抽数..."); XxlJobHelper.log("抽数服务mysql销售数据开始抽数...");
TicketingSamplingRespVO ticketingSamplingRespVO = this.getUrlResponseData(SALE_DATA_SERVICE, SALE_DATA_METHOD, TickingDateUtils.getPreviousDayFormat(), pageNumber); TicketingSamplingRespVO ticketingSamplingRespVO = this.getUrlResponseData(SALE_DATA_SERVICE, SALE_DATA_METHOD, TickingDateUtils.getNowDayFormat(), pageNumber);
if (ticketingSamplingRespVO == null) {
XxlJobHelper.log("暂无数据!");
return;
}
int totalRows = ticketingSamplingRespVO.getTotalRows(); int totalRows = ticketingSamplingRespVO.getTotalRows();
List<Map<String, Object>> mapList = ticketingSamplingRespVO.getDataMapList(); List<Map<String, Object>> mapList = ticketingSamplingRespVO.getDataMapList();
// 总条数小于等于每页条数 则直接按照总条数进行插入即可 // 总条数小于等于每页条数 则直接按照总条数进行插入即可
@ -82,7 +78,7 @@ public class TicketDataMigration {
int totalPages = ticketingSamplingRespVO.getTotalPages(); int totalPages = ticketingSamplingRespVO.getTotalPages();
// 处理剩余页的数据 // 处理剩余页的数据
for (; pageNumber <= totalPages; pageNumber++) { for (; pageNumber <= totalPages; pageNumber++) {
ticketingSamplingRespVO = this.getUrlResponseData(SALE_DATA_SERVICE, SALE_DATA_METHOD, TickingDateUtils.getPreviousDayFormat(), pageNumber); ticketingSamplingRespVO = this.getUrlResponseData(SALE_DATA_SERVICE, SALE_DATA_METHOD, TickingDateUtils.getNowDayFormat(), pageNumber);
mapList = ticketingSamplingRespVO.getDataMapList(); mapList = ticketingSamplingRespVO.getDataMapList();
// 对最后一页进行处理得到最后一页的实际条数 // 对最后一页进行处理得到最后一页的实际条数
@ -94,12 +90,7 @@ public class TicketDataMigration {
} }
} }
if (!saleDataService.dataValidation(TickingDateUtils.getPreviousDayFormat())) {
XxlJobHelper.log("mysql和mongo数据验证不一致");
XxlJobHelper.log("重新抽取({})mongo数据", TickingDateUtils.getPreviousDayFormat());
getSaleDataByMongoDBHandler();
XxlJobHelper.log("抽数完,比对结果:{}", saleDataService.dataValidation(TickingDateUtils.getPreviousDayFormat()) ? "相等" : "不想等");
}
XxlJobHelper.log("抽数服务:销售数据抽数正常结束!总共:{}条数据,{}条数据插入失败!", totalRows, failedCount); XxlJobHelper.log("抽数服务:销售数据抽数正常结束!总共:{}条数据,{}条数据插入失败!", totalRows, failedCount);
} }
@ -111,11 +102,7 @@ public class TicketDataMigration {
int failedCount = 0; int failedCount = 0;
int pageNumber = 1; int pageNumber = 1;
XxlJobHelper.log("抽数服务mysql检票数据开始抽数..."); XxlJobHelper.log("抽数服务mysql检票数据开始抽数...");
TicketingSamplingRespVO ticketingSamplingRespVO = this.getUrlResponseData(CHECK_TICKET_SERVICE, CHECK_TICKET_METHOD, TickingDateUtils.getPreviousDayFormat(), pageNumber); TicketingSamplingRespVO ticketingSamplingRespVO = this.getUrlResponseData(CHECK_TICKET_SERVICE, CHECK_TICKET_METHOD, TickingDateUtils.getNowDayFormat(), pageNumber);
if (ticketingSamplingRespVO == null) {
XxlJobHelper.log("暂无数据!");
return;
}
int totalRows = ticketingSamplingRespVO.getTotalRows(); int totalRows = ticketingSamplingRespVO.getTotalRows();
List<Map<String, Object>> mapList = ticketingSamplingRespVO.getDataMapList(); List<Map<String, Object>> mapList = ticketingSamplingRespVO.getDataMapList();
// 总条数小于等于每页条数 则直接按照总条数进行插入即可 // 总条数小于等于每页条数 则直接按照总条数进行插入即可
@ -130,7 +117,7 @@ public class TicketDataMigration {
int totalPages = ticketingSamplingRespVO.getTotalPages(); int totalPages = ticketingSamplingRespVO.getTotalPages();
// 处理剩余页的数据 // 处理剩余页的数据
for (; pageNumber <= totalPages; pageNumber++) { for (; pageNumber <= totalPages; pageNumber++) {
ticketingSamplingRespVO = this.getUrlResponseData(CHECK_TICKET_SERVICE, CHECK_TICKET_METHOD, TickingDateUtils.getPreviousDayFormat(), pageNumber); ticketingSamplingRespVO = this.getUrlResponseData(CHECK_TICKET_SERVICE, CHECK_TICKET_METHOD, TickingDateUtils.getNowDayFormat(), pageNumber);
mapList = ticketingSamplingRespVO.getDataMapList(); mapList = ticketingSamplingRespVO.getDataMapList();
// 对最后一页进行处理得到最后一页的实际条数 // 对最后一页进行处理得到最后一页的实际条数
if (pageNumber != totalPages) { if (pageNumber != totalPages) {
@ -139,13 +126,9 @@ public class TicketDataMigration {
failedCount += insertCheckTicket(mapList, totalRows % PAGE_SIZE, pageNumber); failedCount += insertCheckTicket(mapList, totalRows % PAGE_SIZE, pageNumber);
} }
} }
} }
if (!checkTicketService.dataValidation(TickingDateUtils.getPreviousDayFormat())) {
XxlJobHelper.log("mysql和mongo数据验证不一致");
XxlJobHelper.log("重新抽取({})mongo数据", TickingDateUtils.getPreviousDayFormat());
getCheckTicketByMongoDBHandler();
XxlJobHelper.log("抽数完,比对结果:{}", checkTicketService.dataValidation(TickingDateUtils.getPreviousDayFormat()) ? "相等" : "不想等");
}
XxlJobHelper.log("抽数服务:检票数据抽数正常结束!总共:{}条数据,{}条数据插入失败!", totalRows, failedCount); XxlJobHelper.log("抽数服务:检票数据抽数正常结束!总共:{}条数据,{}条数据插入失败!", totalRows, failedCount);
} }
@ -159,10 +142,6 @@ public class TicketDataMigration {
int pageNumber = 1; int pageNumber = 1;
XxlJobHelper.log("抽数服务mongodb销售数据开始抽数..."); XxlJobHelper.log("抽数服务mongodb销售数据开始抽数...");
TicketingSamplingRespVO ticketingSamplingRespVO = this.getUrlResponseData(SALE_DATA_SERVICE, SALE_DATA_METHOD, TickingDateUtils.getNowDayFormat(), pageNumber); TicketingSamplingRespVO ticketingSamplingRespVO = this.getUrlResponseData(SALE_DATA_SERVICE, SALE_DATA_METHOD, TickingDateUtils.getNowDayFormat(), pageNumber);
if (ticketingSamplingRespVO == null) {
XxlJobHelper.log("暂无数据!");
return;
}
int totalRows = ticketingSamplingRespVO.getTotalRows(); int totalRows = ticketingSamplingRespVO.getTotalRows();
List<Map<String, Object>> mapList; List<Map<String, Object>> mapList;
// 获得总页数 // 获得总页数
@ -194,10 +173,6 @@ public class TicketDataMigration {
int pageNumber = 1; int pageNumber = 1;
String date = TickingDateUtils.getNowDayFormat(); String date = TickingDateUtils.getNowDayFormat();
TicketingSamplingRespVO ticketingSamplingRespVO = this.getUrlResponseData(CHECK_TICKET_SERVICE, CHECK_TICKET_METHOD, date, pageNumber); TicketingSamplingRespVO ticketingSamplingRespVO = this.getUrlResponseData(CHECK_TICKET_SERVICE, CHECK_TICKET_METHOD, date, pageNumber);
if (ticketingSamplingRespVO == null) {
XxlJobHelper.log("暂无数据!");
return;
}
int totalRows = ticketingSamplingRespVO.getTotalRows(); int totalRows = ticketingSamplingRespVO.getTotalRows();
List<Map<String, Object>> mapList; List<Map<String, Object>> mapList;
// 获得总页数 // 获得总页数
@ -332,7 +307,7 @@ public class TicketDataMigration {
* 删除今年和去年往前推31天外的数据和抽取去年明天的数据 * 删除今年和去年往前推31天外的数据和抽取去年明天的数据
*/ */
@XxlJob("deleteTwoYearOneMonthAgoAndUpdateLastYearTomorrow") @XxlJob("deleteTwoYearOneMonthAgoAndUpdateLastYearTomorrow")
public void deleteTwoYearOneMonthAgoAndUpdateLastYearTomorrow() { public void deleteTwoYearOneMonthAgoAndUpdateLastYearTomorrow(){
XxlJobHelper.log("删除多余数据开始..."); XxlJobHelper.log("删除多余数据开始...");
// 检票数据今年去年多余删除 // 检票数据今年去年多余删除
XxlJobHelper.log("删除检票日期为{}", checkTicketService.deleteTwoYearLastMonthAgo().toString()); XxlJobHelper.log("删除检票日期为{}", checkTicketService.deleteTwoYearLastMonthAgo().toString());
@ -426,7 +401,7 @@ public class TicketDataMigration {
int currentDay = startDate.get(Calendar.DAY_OF_MONTH); int currentDay = startDate.get(Calendar.DAY_OF_MONTH);
result.add(String.format("抽数服务:%d年%d月%d日" + methodName + "数据抽数正常结束!总共:%d条数据%d条数据重复%d条数据插入成功", currentYear, currentMonth, currentDay, totalRows, duplicatesCount, successCount)); result.add(String.format("抽数服务:%d年%d月%d日" + methodName + "数据抽数正常结束!总共:%d条数据%d条数据重复%d条数据插入成功", currentYear, currentMonth, currentDay, totalRows, duplicatesCount, successCount));
XxlJobHelper.log("抽数服务:{}年{}月{}日:{}数据抽数正常结束!总共:{}条数据,{}条数据重复,{}条数据插入成功!", currentYear, currentMonth, currentDay, methodName, totalRows, duplicatesCount, successCount); XxlJobHelper.log("抽数服务:{}年{}月{}日:{}数据抽数正常结束!总共:{}条数据,{}条数据重复,{}条数据插入成功!", currentYear, currentMonth, currentDay, methodName, totalRows, duplicatesCount, successCount);
if (startDate.equals(endDate)) { // 如果日期相等退出循环 if (startDate.equals(endDate)){ // 如果日期相等退出循环
break; break;
} }
// 日期递增一天 // 日期递增一天
@ -513,20 +488,25 @@ public class TicketDataMigration {
XxlJobHelper.handleFail("数据抽数失败,请重试"); XxlJobHelper.handleFail("数据抽数失败,请重试");
throw new RuntimeException(); throw new RuntimeException();
} else if (ticketingSamplingRespVO.getPageNumber() == null || ticketingSamplingRespVO.getPageSize() == null || ticketingSamplingRespVO.getTotalPages() == null || ticketingSamplingRespVO.getTotalRows() == null || ticketingSamplingRespVO.getDataMapList().isEmpty()) { } else if (ticketingSamplingRespVO.getPageNumber() == null || ticketingSamplingRespVO.getPageSize() == null || ticketingSamplingRespVO.getTotalPages() == null || ticketingSamplingRespVO.getTotalRows() == null || ticketingSamplingRespVO.getDataMapList().isEmpty()) {
XxlJobHelper.log("{},请求抽数成功,但返回参数值:{}", date, ticketingSamplingRespVO); XxlJobHelper.log("{},请求抽数成功,但返回参数存在问题{}", date, ticketingSamplingRespVO);
return null; return null;
} }
return ticketingSamplingRespVO; return ticketingSamplingRespVO;
} }
/** /**
* 校验数据 * 请求失败消息
* *
* @param date * @param
* @return java.lang.Boolean * @return void
*/ */
public Boolean dataValidation(String date) { private void requestFailedMessage(TicketingSamplingRespVO ticketingSamplingRespVO) {
return checkTicketService.dataValidation(date) && saleDataService.dataValidation(date); // 如果访问失败则让调度中心重新调用
if (!ticketingSamplingRespVO.isPassflag()) {
// 让调度中心重新调用
XxlJobHelper.log("抽数服务:检票数据抽数失败,返回消息:" + ticketingSamplingRespVO.getRtnMsg());
XxlJobHelper.handleFail("销售数据抽数失败,请重试");
}
} }
} }

View File

@ -56,11 +56,4 @@ public interface CheckTicketService {
List<String> deleteTwoYearLastMonthAgo(); List<String> deleteTwoYearLastMonthAgo();
List<String> findDistinctDate(); List<String> findDistinctDate();
/**
* mongo和mysql数据数量验证
* @param date
* @return java.lang.Boolean
*/
Boolean dataValidation(String date);
} }

View File

@ -149,9 +149,4 @@ public class CheckTicketServiceImpl implements CheckTicketService {
return checkTicketTodayRepository.findDistinctDate(); return checkTicketTodayRepository.findDistinctDate();
} }
@Override
public Boolean dataValidation(String date) {
return checkTicketMapper.selectCount(CheckTicketDO::getCheckticketdate, date) == checkTicketTodayRepository.countByCustomDate(date);
}
} }

View File

@ -55,11 +55,4 @@ public interface SaleDataService {
* @return java.util.List<java.lang.String> * @return java.util.List<java.lang.String>
*/ */
List<String> deleteTwoYearLastMonthAgo(); List<String> deleteTwoYearLastMonthAgo();
/**
* mongo和mysql数据数量验证
* @param date
* @return boolean
*/
Boolean dataValidation(String date);
} }

View File

@ -136,9 +136,4 @@ public class SaleDataServiceImpl implements SaleDataService {
mongoTemplate.remove(query, "sale_data_today"); mongoTemplate.remove(query, "sale_data_today");
return filteredDate; return filteredDate;
} }
@Override
public Boolean dataValidation(String date) {
return Objects.equals(saleDataMapper.selectCount(SaleDataDO::getSddate, date), saleDataTodayRepository.countByCustomDate(date));
}
} }