package com.qlm.job; import com.jfinal.plugin.activerecord.ActiveRecordPlugin; import com.jfinal.plugin.activerecord.Db; import com.jfinal.plugin.activerecord.Record; import com.jfinal.plugin.druid.DruidPlugin; import com.jfinal.weixin.sdk.kit.PaymentKit; import com.qlm.tools.LocalDateUtils; import com.qlm.tools.StateMentsUtils; import com.qlm.tools.WxUtil; import org.quartz.Job; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.crypto.Mac; import javax.crypto.spec.SecretKeySpec; import java.math.BigDecimal; import java.nio.charset.StandardCharsets; import java.text.SimpleDateFormat; import java.time.LocalDateTime; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @author nommpp * 每天10点开始 */ public class MoveData implements Job { protected final static Logger log = LoggerFactory.getLogger(MoveData.class); private static final int SINGLE_C = 100000; @Override public void execute(JobExecutionContext jobExecutionContext) { Calendar instance = Calendar.getInstance(); int hour = instance.get(Calendar.HOUR_OF_DAY); instance.add(Calendar.DAY_OF_MONTH, -1); Date time = instance.getTime(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy.MM.dd HH:mm:ss"); String format = sdf.format(time); try { if(hour>=9 && hour<=12){ MoveData.checkNo(format); } } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } private static void con(){ DruidPlugin dp = new DruidPlugin("jdbc:mysql://rm-bp1j1ibp4ra74p110to.mysql.rds.aliyuncs.com:3306/kongka?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull", "dnzc", "Wangzih2022+-"); ActiveRecordPlugin arp = new ActiveRecordPlugin(dp); // ��web����Ψһ�IJ�ͬ��Ҫ�ֶ�����һ����ز����start()���� dp.start(); arp.start(); } public static void main(String[] args) throws Exception { con(); String date = "2022.06.26"; checkNo(date); } public static void checkNo(String date) throws Exception { log.info("开始检查单号:"+date); String startTime = date+" 00:00:00"; String endTime = date+" 23:59:59"; readNo(date); String payCountSql = "select count(*) from t_order2 where pay_time >= '"+startTime+"' and pay_time <= '"+endTime+"' and amount <=100000"; Long payCountDb = Db.queryLong(payCountSql); if(payCountDb == null){ payCountDb = 0l; } if(payCountDb == 0){ log.info("暂无资金"); return; } Long scanCount = Db.queryLong("select count(*) from t_scanrecord where send_time >='"+startTime+"' and send_time <='"+endTime+"'"); if(scanCount == null){ scanCount = 0l; } if(!scanCount .equals(payCountDb)){ Map maps = new HashMap(); List find = Db.find("select * from t_order2 where pay_time >= '"+startTime+"' and pay_time <= '"+endTime + "' and amount <=100000"); for (Record record : find) { String wx_no = record.getStr("wx_no"); maps.put(wx_no, record); } List list = Db.find("select * from t_scanrecord where send_time >='"+startTime+"' and send_time <='"+endTime+"'"); for (Record record : list) { String weixin_no_ = record.getStr("weixin_no_"); maps.remove(weixin_no_); } Set keySet = maps.keySet(); for (String nos : keySet) { Record record = maps.get(nos); String pay_no = record.getStr("pay_no"); Record findFirst = Db.findFirst("select * from t_qcode_hb_a where no_ = ?",pay_no); if(findFirst != null){ String id = findFirst.getStr("id"); Date payDate = record.getDate("pay_time"); findFirst.set("send_time",payDate).set("send_state", 1); Db.update("t_qcode_hb_a", findFirst); Record findById = Db.findById("t_scanrecord", "qrcode_id", id); if(findById != null){ findById.set("send_time",payDate).set("send_state", 1).set("weixin_no_", nos); Db.update("t_scanrecord", "qrcode_id", findById); } log.info(id+" 更新完成"); } } } log.info(date+" 完成"); } /** * 核对数据 */ private static void check(String startTime,String endTime) { log.info("{}开始核对数据",startTime); ExecutorService executors = Executors.newFixedThreadPool(4); CompletableFuture> c1 = CompletableFuture.supplyAsync(()->{ log.info("开始查询 {} 表数据","t_order2"); String payCountSql = "select count(*) from t_order2 where pay_time >= '"+startTime+"' and pay_time <= '"+endTime+"'"; Long payCountDb = Db.queryLong(payCountSql); int payCount = payCountDb == null ? 0 : payCountDb.intValue(); if(payCount > SINGLE_C){ //总次数 int count = payCount / SINGLE_C; //剩余次数 int residue = payCount % SINGLE_C; List ls1 = new ArrayList<>(); int len = 0; for (int i = 0; i < count; i++) { String sqlPay = "select wx_no from t_order2 where pay_time >='" + startTime + "' and pay_time<='" + endTime + "' limit " + len + "," + SINGLE_C + ""; List payList = Db.find(sqlPay); for (Record record : payList) { ls1.add(record.getStr("wx_no")); } len += SINGLE_C; } if(residue > 0){ String sqlPay = "select wx_no from t_order2 where pay_time >='" + startTime + "' and pay_time<='" + endTime + "' limit " + len + "," + payCount + ""; List payList = Db.find(sqlPay); for (Record record : payList) { ls1.add(record.getStr("wx_no")); } } log.info("{} 表数据查询完成","t_order2"); return ls1; }else{ String sql1 = "select wx_no from t_order2 where pay_time >='"+startTime+"' and pay_time <='"+endTime+"'"; return Db.query(sql1); } },executors); CompletableFuture> c2 = CompletableFuture.supplyAsync(()->{ //查询发放成功的红包总个数 log.info("开始查询 {} 表数据","t_scanrecord"); String hbCountSql = "select count(*) from t_scanrecord where send_time >= '"+startTime+"' and send_time <= '"+endTime+"'"; Long hbCountDb = Db.queryLong(hbCountSql); int hbCount = hbCountDb == null ? 0 : hbCountDb.intValue(); if(hbCount > SINGLE_C){ //总次数 int count = hbCount / SINGLE_C; //剩余次数 int residue = hbCount % SINGLE_C; List ls2 = new ArrayList<>(); int len = 0; for (int i = 0; i < count; i++) { String sqlHb = "select weixin_no_ from t_scanrecord where send_time >='" + startTime + "' and send_time<='" + endTime + "' limit " + len + "," + SINGLE_C + ""; List hbList = Db.find(sqlHb); for (Record record : hbList) { ls2.add(record.getStr("weixin_no_")); } len += SINGLE_C; } if(residue > 0){ String sqlHb = "select weixin_no_ from t_scanrecord where send_time >='" + startTime + "' and send_time<='" + endTime + "' limit " + len + "," + hbCount + ""; List hbList = Db.find(sqlHb); for (Record record : hbList) { ls2.add(record.getStr("weixin_no_")); } } log.info("{} 表数据查询完成","t_scanrecord"); return ls2; }else{ String sql2 = "select weixin_no_ from t_scanrecord where send_time >='"+startTime+"' and send_time <='"+endTime+"'"; return Db.query(sql2); } },executors); c1.thenCombine(c2, MoveData::getDiffent).thenApplyAsync((lst3) -> { if (lst3.isEmpty()) { log.info("{} 数据准确,无需核对", startTime); return null; } List records = new ArrayList<>(lst3.size()); lst3.parallelStream().forEach(key -> { Record record = Db.findFirst("select * from t_order2 where wx_no = ?", key); if (record != null) { record.set("status_", 1); Db.update("t_order2", "pay_no", record); records.add(record); } else { log.info("{} 号微信对账单不存在此 wx_no:{}", startTime, key); } }); if (records.isEmpty()) { log.info("{} 号微信对账单有误", startTime); } return records; }, executors).thenApplyAsync(records -> { if (records == null) { return null; } List hbRecords = new ArrayList<>(records.size()); if (!records.isEmpty()) { records.parallelStream().forEach(r -> { String payNo = r.getStr("pay_no"); if(payNo != null && !payNo.startsWith("c")){ Record unPayRecord = Db.findFirst("select * from t_unpayinfo where pay_no = ? limit 1",payNo); String code; if(unPayRecord == null || unPayRecord.getStr("id") == null){ log.info("补发表没有订单号 {} 记录",payNo); /* 补发记录太多会占满cpu和内存 2020.03.12 lds String tableName = "t_qcode_hb_a"; Record hbRecord = Db.findFirst("select * from t_qcode_hb_a where no_ = ? limit 1",payNo); if (hbRecord != null) { hbRecord.set("send_state", 1).set("send_time", r.getDate("pay_time")); //Db.update("update t_qcode_hb_a set send_state = ? and send_time = ? where no_ = ?",1,r.getDate("pay_time"),r.getStr("pay_no")); Db.update(tableName, hbRecord); hbRecord.set("wx_no", r.getStr("wx_no")); hbRecords.add(hbRecord); } */ }else{ code = unPayRecord.getStr("id"); String prefix = code.substring(0,1); String tableName = "t_qcode_hb_"+prefix; Record hbRecord = Db.findById(tableName,code); if (hbRecord != null) { hbRecord.set("send_state", 1).set("send_time", r.getDate("pay_time")); //Db.update("update t_qcode_hb_a set send_state = ? and send_time = ? where no_ = ?",1,r.getDate("pay_time"),r.getStr("pay_no")); Db.update(tableName, hbRecord); hbRecord.set("wx_no", r.getStr("wx_no")); hbRecords.add(hbRecord); } } } }); if (hbRecords.isEmpty()) { log.info("{} 红包表没有记录,不能获取二维码,无法核对", startTime); } } return hbRecords; }, executors).thenApplyAsync(hbrecords -> { if (hbrecords == null) { return true; } List scanRecords = new ArrayList<>(); if (!hbrecords.isEmpty()) { hbrecords.parallelStream().forEach(r -> { log.info("有误差的二维码 {}", r.getStr("id")); Db.update("delete from t_unpayinfo where id = ?", r.getStr("id")); Record sr = Db.findById("t_scanrecord", "qrcode_id", r.getStr("id")); sr.set("send_state", 1).set("send_time", r.getDate("send_time")).set("weixin_no_", r.getStr("wx_no")); Db.update("t_scanrecord", "qrcode_id", sr); scanRecords.add(sr); }); if (scanRecords.isEmpty()) { log.info("{} 扫码表没有记录,无法核对", startTime); return false; } } return !scanRecords.isEmpty(); }, executors).handle((res, ex) -> { if (ex != null) { executors.shutdown(); log.info("{} 核对数据出现异常,原因 {}", startTime,ex.getMessage()); ex.printStackTrace(); } else { executors.shutdown(); if (res) { log.info("{} 核对数据成功", startTime); }else{ log.info("{} 核对数据失败", startTime); } } return res; }); } private static List getDiffent(List collmax, List collmin) { //使用LinkeList防止差异过大时,元素拷贝 List csReturn = new LinkedList(); List max = collmax; List min = collmin; //先比较大小,这样会减少后续map的if判断次数 if(collmax.size() map = new HashMap<>(max.size()); for (String object : min) { map.put(object, 1); } for (String object : max) { if(map.get(object)==null) { csReturn.add(object); } } return csReturn; } /** * 从微信读取指定日期的资金账单 * @param date 指定的日期 yyyy.MM.dd 格式 * @throws Exception */ private static void readNo(String date) throws Exception { String startTime = date+" 00:00:00"; String endTime = date+" 23:59:59"; String payCountSql = "select count(*) from t_order2 where pay_time >= '"+startTime+"' and pay_time <= '"+endTime+"' and amount <=100000"; Long payCountDb = Db.queryLong(payCountSql); if(payCountDb == null){ payCountDb = 0l; } if(payCountDb>0){ log.info("{} 资金账单已经存在",date); return; } log.info("{} 开始下载资金账单",date); Record config = Db.findFirst("select * from t_pay_config"); String mch_id =config.getStr("mch_id"); Map params = new HashMap(); params.put("appid", "wx83a34a499a3db4f1");//商户号 params.put("mch_id", mch_id); params.put("nonce_str", System.currentTimeMillis()+"");//随机字符串 String partner_key = config.getStr("partner_key"); String[] dd = date.split("\\."); StringBuilder sb = new StringBuilder(); for (String string : dd) { sb.append(string); } date = sb.toString(); params.put("bill_date", date); String certPath; certPath = "/apiclient_cert.p12"; params.put("account_type", "Operation");// String stringA = PaymentKit.packageSign(params, false); String stringSignTemp = stringA + "&key=" + partner_key; String sign = HMACSHA256(stringSignTemp,partner_key); params.put("sign", sign); java.net.URL resource = StateMentsUtils.class.getResource(certPath); String file = resource.getFile(); String xmlStr = PaymentKit.postSSL("https://api.mch.weixin.qq.com/pay/downloadfundflow", PaymentKit.toXml(params),file,mch_id); String[] split = xmlStr.split("\n"); int lineNo = split.length; List list = new ArrayList(); for (int i = 1; i < lineNo-2; i++) { Record r = new Record(); String line = split[i]; String[] split2 = line.split(","); if(split2.length<=6){ System.out.println(date); break; } String payTime = split2[0]; String payWxno = split2[2]; String payAmount = split2[6]; String payNo = split2[10]; String mcId = split2[8]; payAmount = payAmount.substring(1); payNo = payNo.substring(1); payWxno = payWxno.substring(1); if(WxUtil.isNull(payNo)){ continue; } BigDecimal multiply = new BigDecimal(payAmount).multiply(new BigDecimal(100)); int intValue = multiply.intValue(); payTime = payTime.substring(1); r.set("pay_time", payTime).set("amount", intValue) .set("pay_no", payNo).set("mc_id", mcId).set("wx_no", payWxno); list.add(r); } WxUtil.batchSave("t_order2", list); log.info("{} 资金账单下载结束",date); } public static String HMACSHA256(String data,String key) throws Exception { Mac sha256_HMAC = Mac.getInstance("HmacSHA256"); SecretKeySpec secret_key = new SecretKeySpec(key.getBytes(StandardCharsets.UTF_8), "HmacSHA256"); sha256_HMAC.init(secret_key); byte[] array = sha256_HMAC.doFinal(data.getBytes(StandardCharsets.UTF_8)); StringBuilder sb = new StringBuilder(); for (byte item : array) { sb.append(Integer.toHexString((item & 0xFF) | 0x100).substring(1, 3)); } return sb.toString().toUpperCase(); } }