From 3eca0681d9d3d8a057314e8b8bcf32ae49e23730 Mon Sep 17 00:00:00 2001 From: XinWei <2718030729@qq.com> Date: Mon, 2 Sep 2024 10:58:13 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8A=BD=E6=95=B0=EF=BC=9A=E9=AA=8C=E8=AF=81my?= =?UTF-8?q?sql=E5=92=8Cmongo=E6=95=B0=E9=87=8F=E6=98=AF=E5=90=A6=E4=B8=80?= =?UTF-8?q?=E8=87=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../CheckTicketTodayRepository.java | 16 +++-- .../saledata/SaleDataTodayRepository.java | 8 +++ .../job/ticketing/TicketDataMigration.java | 60 ++++++++++++------- .../checkticket/CheckTicketService.java | 7 +++ .../checkticket/CheckTicketServiceImpl.java | 5 ++ .../service/saledata/SaleDataService.java | 7 +++ .../service/saledata/SaleDataServiceImpl.java | 5 ++ 7 files changed, 83 insertions(+), 25 deletions(-) diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/dal/mongodb/checkticket/CheckTicketTodayRepository.java b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/dal/mongodb/checkticket/CheckTicketTodayRepository.java index aa1c5adf2..e239325c1 100644 --- a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/dal/mongodb/checkticket/CheckTicketTodayRepository.java +++ b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/dal/mongodb/checkticket/CheckTicketTodayRepository.java @@ -12,24 +12,29 @@ import java.util.List; @Repository -public interface CheckTicketTodayRepository extends MongoRepository { +public interface CheckTicketTodayRepository extends MongoRepository { /** * 根据dataId数组查找 + * * @param dataIdList * @return java.util.List */ @Query(value = "{ 'dataId': { $in: ?0 } }", fields = "{ 'dataId': 1, '_id': 0 }") List findByDataIdIn(List dataIdList); + /** * 找到所有不在时间段内的日期字段 + * * @param startDate * @param endDate * @return java.util.List */ @Query("{ 'checkticketdate': { $not: { $gte: ?0, $lt: ?1 } } }") List findNotInTimeRange(String startDate, String endDate); + /** * 查询所有去重的 checkticketdate 字段值 + * * @return 去重后的 checkticketdate 字段值列表 */ @Aggregation(pipeline = { @@ -39,9 +44,10 @@ public interface CheckTicketTodayRepository extends MongoRepository findDistinctDate(); /** - * 删除 checkticketdate 字段值在给定列表中的所有记录 - * @param dates 日期列表 + * 计算自定义日期的总数 + * @param date + * @return int */ - @Query("{ 'checkticketdate': { $in: ?0 } }") - void deleteByCheckticketdateIn(List dates); + @Query(value = "{ 'checkticketdate': ?0 }", count = true) + long countByCustomDate(String date); } diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/dal/mongodb/saledata/SaleDataTodayRepository.java b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/dal/mongodb/saledata/SaleDataTodayRepository.java index 241dc0158..401b110d5 100644 --- a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/dal/mongodb/saledata/SaleDataTodayRepository.java +++ b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/dal/mongodb/saledata/SaleDataTodayRepository.java @@ -87,4 +87,12 @@ public interface SaleDataTodayRepository extends MongoRepository findDistinctDate(); + + /** + * 根据日期计算总数 + * @param date + * @return java.lang.Long + */ + @Query(value = "{ 'sddate': ?0 }", count = true) + Long countByCustomDate(String date); } diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/job/ticketing/TicketDataMigration.java b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/job/ticketing/TicketDataMigration.java index 30a5504a7..b97bfee83 100644 --- a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/job/ticketing/TicketDataMigration.java +++ b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/job/ticketing/TicketDataMigration.java @@ -63,7 +63,11 @@ public class TicketDataMigration { int failedCount = 0; int pageNumber = 1; XxlJobHelper.log("抽数服务:mysql销售数据开始抽数..."); - TicketingSamplingRespVO ticketingSamplingRespVO = this.getUrlResponseData(SALE_DATA_SERVICE, SALE_DATA_METHOD, TickingDateUtils.getNowDayFormat(), pageNumber); + TicketingSamplingRespVO ticketingSamplingRespVO = this.getUrlResponseData(SALE_DATA_SERVICE, SALE_DATA_METHOD, TickingDateUtils.getPreviousDayFormat(), pageNumber); + if (ticketingSamplingRespVO == null) { + XxlJobHelper.log("暂无数据!"); + return; + } int totalRows = ticketingSamplingRespVO.getTotalRows(); List> mapList = ticketingSamplingRespVO.getDataMapList(); // 总条数小于等于每页条数 则直接按照总条数进行插入即可 @@ -78,7 +82,7 @@ public class TicketDataMigration { int totalPages = ticketingSamplingRespVO.getTotalPages(); // 处理剩余页的数据 for (; pageNumber <= totalPages; pageNumber++) { - ticketingSamplingRespVO = this.getUrlResponseData(SALE_DATA_SERVICE, SALE_DATA_METHOD, TickingDateUtils.getNowDayFormat(), pageNumber); + ticketingSamplingRespVO = this.getUrlResponseData(SALE_DATA_SERVICE, SALE_DATA_METHOD, TickingDateUtils.getPreviousDayFormat(), pageNumber); mapList = ticketingSamplingRespVO.getDataMapList(); // 对最后一页进行处理,得到最后一页的实际条数 @@ -90,7 +94,12 @@ public class TicketDataMigration { } } - + if (!saleDataService.dataValidation(TickingDateUtils.getPreviousDayFormat())) { + XxlJobHelper.log("mysql和mongo数据验证不一致!!!"); + XxlJobHelper.log("重新抽取({})mongo数据", TickingDateUtils.getPreviousDayFormat()); + getSaleDataByMongoDBHandler(); + XxlJobHelper.log("抽数完,比对结果:{}", saleDataService.dataValidation(TickingDateUtils.getPreviousDayFormat()) ? "相等" : "不想等"); + } XxlJobHelper.log("抽数服务:销售数据抽数正常结束!总共:{}条数据,{}条数据插入失败!", totalRows, failedCount); } @@ -102,7 +111,11 @@ public class TicketDataMigration { int failedCount = 0; int pageNumber = 1; XxlJobHelper.log("抽数服务:mysql检票数据开始抽数..."); - TicketingSamplingRespVO ticketingSamplingRespVO = this.getUrlResponseData(CHECK_TICKET_SERVICE, CHECK_TICKET_METHOD, TickingDateUtils.getNowDayFormat(), pageNumber); + TicketingSamplingRespVO ticketingSamplingRespVO = this.getUrlResponseData(CHECK_TICKET_SERVICE, CHECK_TICKET_METHOD, TickingDateUtils.getPreviousDayFormat(), pageNumber); + if (ticketingSamplingRespVO == null) { + XxlJobHelper.log("暂无数据!"); + return; + } int totalRows = ticketingSamplingRespVO.getTotalRows(); List> mapList = ticketingSamplingRespVO.getDataMapList(); // 总条数小于等于每页条数 则直接按照总条数进行插入即可 @@ -117,7 +130,7 @@ public class TicketDataMigration { int totalPages = ticketingSamplingRespVO.getTotalPages(); // 处理剩余页的数据 for (; pageNumber <= totalPages; pageNumber++) { - ticketingSamplingRespVO = this.getUrlResponseData(CHECK_TICKET_SERVICE, CHECK_TICKET_METHOD, TickingDateUtils.getNowDayFormat(), pageNumber); + ticketingSamplingRespVO = this.getUrlResponseData(CHECK_TICKET_SERVICE, CHECK_TICKET_METHOD, TickingDateUtils.getPreviousDayFormat(), pageNumber); mapList = ticketingSamplingRespVO.getDataMapList(); // 对最后一页进行处理,得到最后一页的实际条数 if (pageNumber != totalPages) { @@ -126,9 +139,13 @@ public class TicketDataMigration { failedCount += insertCheckTicket(mapList, totalRows % PAGE_SIZE, pageNumber); } } - } - + if (!checkTicketService.dataValidation(TickingDateUtils.getPreviousDayFormat())) { + XxlJobHelper.log("mysql和mongo数据验证不一致!!!"); + XxlJobHelper.log("重新抽取({})mongo数据", TickingDateUtils.getPreviousDayFormat()); + getCheckTicketByMongoDBHandler(); + XxlJobHelper.log("抽数完,比对结果:{}", checkTicketService.dataValidation(TickingDateUtils.getPreviousDayFormat()) ? "相等" : "不想等"); + } XxlJobHelper.log("抽数服务:检票数据抽数正常结束!总共:{}条数据,{}条数据插入失败!", totalRows, failedCount); } @@ -142,6 +159,10 @@ public class TicketDataMigration { int pageNumber = 1; XxlJobHelper.log("抽数服务:mongodb销售数据开始抽数..."); TicketingSamplingRespVO ticketingSamplingRespVO = this.getUrlResponseData(SALE_DATA_SERVICE, SALE_DATA_METHOD, TickingDateUtils.getNowDayFormat(), pageNumber); + if (ticketingSamplingRespVO == null) { + XxlJobHelper.log("暂无数据!"); + return; + } int totalRows = ticketingSamplingRespVO.getTotalRows(); List> mapList; // 获得总页数 @@ -173,6 +194,10 @@ public class TicketDataMigration { int pageNumber = 1; String date = TickingDateUtils.getNowDayFormat(); TicketingSamplingRespVO ticketingSamplingRespVO = this.getUrlResponseData(CHECK_TICKET_SERVICE, CHECK_TICKET_METHOD, date, pageNumber); + if (ticketingSamplingRespVO == null) { + XxlJobHelper.log("暂无数据!"); + return; + } int totalRows = ticketingSamplingRespVO.getTotalRows(); List> mapList; // 获得总页数 @@ -307,7 +332,7 @@ public class TicketDataMigration { * 删除(今年和去年)往前推31天外的数据和抽取去年明天的数据 */ @XxlJob("deleteTwoYearOneMonthAgoAndUpdateLastYearTomorrow") - public void deleteTwoYearOneMonthAgoAndUpdateLastYearTomorrow(){ + public void deleteTwoYearOneMonthAgoAndUpdateLastYearTomorrow() { XxlJobHelper.log("删除多余数据开始..."); // 检票数据今年去年多余删除 XxlJobHelper.log("删除检票日期为{}", checkTicketService.deleteTwoYearLastMonthAgo().toString()); @@ -401,7 +426,7 @@ public class TicketDataMigration { int currentDay = startDate.get(Calendar.DAY_OF_MONTH); result.add(String.format("抽数服务:%d年%d月%d日:" + methodName + "数据抽数正常结束!总共:%d条数据,%d条数据重复,%d条数据插入成功!", currentYear, currentMonth, currentDay, totalRows, duplicatesCount, successCount)); XxlJobHelper.log("抽数服务:{}年{}月{}日:{}数据抽数正常结束!总共:{}条数据,{}条数据重复,{}条数据插入成功!", currentYear, currentMonth, currentDay, methodName, totalRows, duplicatesCount, successCount); - if (startDate.equals(endDate)){ // 如果日期相等,退出循环 + if (startDate.equals(endDate)) { // 如果日期相等,退出循环 break; } // 日期递增一天 @@ -488,25 +513,20 @@ public class TicketDataMigration { XxlJobHelper.handleFail("数据抽数失败,请重试"); throw new RuntimeException(); } else if (ticketingSamplingRespVO.getPageNumber() == null || ticketingSamplingRespVO.getPageSize() == null || ticketingSamplingRespVO.getTotalPages() == null || ticketingSamplingRespVO.getTotalRows() == null || ticketingSamplingRespVO.getDataMapList().isEmpty()) { - XxlJobHelper.log("{},请求抽数成功,但返回参数值存在问题:{}", date, ticketingSamplingRespVO); + XxlJobHelper.log("{},请求抽数成功,但返回参数无值:{}", date, ticketingSamplingRespVO); return null; } return ticketingSamplingRespVO; } /** - * 请求失败消息 + * 校验数据 * - * @param - * @return void + * @param date + * @return java.lang.Boolean */ - private void requestFailedMessage(TicketingSamplingRespVO ticketingSamplingRespVO) { - // 如果访问失败则让调度中心重新调用 - if (!ticketingSamplingRespVO.isPassflag()) { - // 让调度中心重新调用 - XxlJobHelper.log("抽数服务:检票数据抽数失败,返回消息:" + ticketingSamplingRespVO.getRtnMsg()); - XxlJobHelper.handleFail("销售数据抽数失败,请重试"); - } + public Boolean dataValidation(String date) { + return checkTicketService.dataValidation(date) && saleDataService.dataValidation(date); } } diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/checkticket/CheckTicketService.java b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/checkticket/CheckTicketService.java index 6370b1eec..eb687dae3 100644 --- a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/checkticket/CheckTicketService.java +++ b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/checkticket/CheckTicketService.java @@ -56,4 +56,11 @@ public interface CheckTicketService { List deleteTwoYearLastMonthAgo(); List findDistinctDate(); + + /** + * mongo和mysql数据数量验证 + * @param date + * @return java.lang.Boolean + */ + Boolean dataValidation(String date); } \ No newline at end of file diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/checkticket/CheckTicketServiceImpl.java b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/checkticket/CheckTicketServiceImpl.java index 58c81f869..2e8659898 100644 --- a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/checkticket/CheckTicketServiceImpl.java +++ b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/checkticket/CheckTicketServiceImpl.java @@ -149,4 +149,9 @@ public class CheckTicketServiceImpl implements CheckTicketService { return checkTicketTodayRepository.findDistinctDate(); } + @Override + public Boolean dataValidation(String date) { + return checkTicketMapper.selectCount(CheckTicketDO::getCheckticketdate, date) == checkTicketTodayRepository.countByCustomDate(date); + } + } \ No newline at end of file diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/saledata/SaleDataService.java b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/saledata/SaleDataService.java index 4d2a2d318..2215f5912 100644 --- a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/saledata/SaleDataService.java +++ b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/saledata/SaleDataService.java @@ -55,4 +55,11 @@ public interface SaleDataService { * @return java.util.List */ List deleteTwoYearLastMonthAgo(); + + /** + * mongo和mysql数据数量验证 + * @param date + * @return boolean + */ + Boolean dataValidation(String date); } \ No newline at end of file diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/saledata/SaleDataServiceImpl.java b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/saledata/SaleDataServiceImpl.java index fad037a84..f80356acb 100644 --- a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/saledata/SaleDataServiceImpl.java +++ b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/saledata/SaleDataServiceImpl.java @@ -136,4 +136,9 @@ public class SaleDataServiceImpl implements SaleDataService { mongoTemplate.remove(query, "sale_data_today"); return filteredDate; } + + @Override + public Boolean dataValidation(String date) { + return Objects.equals(saleDataMapper.selectCount(SaleDataDO::getSddate, date), saleDataTodayRepository.countByCustomDate(date)); + } } \ No newline at end of file