MoveData.java 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426
  1. package com.qlm.job;
  2. import com.jfinal.plugin.activerecord.ActiveRecordPlugin;
  3. import com.jfinal.plugin.activerecord.Db;
  4. import com.jfinal.plugin.activerecord.Record;
  5. import com.jfinal.plugin.druid.DruidPlugin;
  6. import com.jfinal.weixin.sdk.kit.PaymentKit;
  7. import com.qlm.tools.LocalDateUtils;
  8. import com.qlm.tools.StateMentsUtils;
  9. import com.qlm.tools.WxUtil;
  10. import org.quartz.Job;
  11. import org.quartz.JobExecutionContext;
  12. import org.quartz.JobExecutionException;
  13. import org.slf4j.Logger;
  14. import org.slf4j.LoggerFactory;
  15. import javax.crypto.Mac;
  16. import javax.crypto.spec.SecretKeySpec;
  17. import java.math.BigDecimal;
  18. import java.nio.charset.StandardCharsets;
  19. import java.text.SimpleDateFormat;
  20. import java.time.LocalDateTime;
  21. import java.util.*;
  22. import java.util.concurrent.CompletableFuture;
  23. import java.util.concurrent.ExecutionException;
  24. import java.util.concurrent.ExecutorService;
  25. import java.util.concurrent.Executors;
/**
 * Scheduled job; runs every day starting at 10 o'clock.
 *
 * @author nommpp
 */
  30. public class MoveData implements Job {
  31. protected final static Logger log = LoggerFactory.getLogger(MoveData.class);
  32. private static final int SINGLE_C = 100000;
  33. @Override
  34. public void execute(JobExecutionContext jobExecutionContext) {
  35. Calendar instance = Calendar.getInstance();
  36. int hour = instance.get(Calendar.HOUR_OF_DAY);
  37. instance.add(Calendar.DAY_OF_MONTH, -1);
  38. Date time = instance.getTime();
  39. SimpleDateFormat sdf = new SimpleDateFormat("yyyy.MM.dd HH:mm:ss");
  40. String format = sdf.format(time);
  41. try {
  42. if(hour>=9 && hour<=12){
  43. MoveData.checkNo(format);
  44. }
  45. } catch (Exception e) {
  46. // TODO Auto-generated catch block
  47. e.printStackTrace();
  48. }
  49. }
  50. private static void con(){
  51. DruidPlugin dp = new DruidPlugin("jdbc:mysql://rm-bp1j1ibp4ra74p110to.mysql.rds.aliyuncs.com:3306/kongka?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull", "dnzc", "Wangzih2022+-");
  52. ActiveRecordPlugin arp = new ActiveRecordPlugin(dp);
  53. // ��web����Ψһ�IJ�ͬ��Ҫ�ֶ�����һ����ز����start()����
  54. dp.start();
  55. arp.start();
  56. }
  57. public static void main(String[] args) throws Exception {
  58. con();
  59. String date = "2022.06.26";
  60. checkNo(date);
  61. }
  62. public static void checkNo(String date) throws Exception {
  63. log.info("开始检查单号:"+date);
  64. String startTime = date+" 00:00:00";
  65. String endTime = date+" 23:59:59";
  66. readNo(date);
  67. String payCountSql = "select count(*) from t_order2 where pay_time >= '"+startTime+"' and pay_time <= '"+endTime+"' and amount <=100000";
  68. Long payCountDb = Db.queryLong(payCountSql);
  69. if(payCountDb == null){
  70. payCountDb = 0l;
  71. }
  72. if(payCountDb == 0){
  73. log.info("暂无资金");
  74. return;
  75. }
  76. Long scanCount = Db.queryLong("select count(*) from t_scanrecord where send_time >='"+startTime+"' and send_time <='"+endTime+"'");
  77. if(scanCount == null){
  78. scanCount = 0l;
  79. }
  80. if(!scanCount .equals(payCountDb)){
  81. Map<String,Record> maps = new HashMap<String, Record>();
  82. List<Record> find = Db.find("select * from t_order2 where pay_time >= '"+startTime+"' and pay_time <= '"+endTime + "' and amount <=100000");
  83. for (Record record : find) {
  84. String wx_no = record.getStr("wx_no");
  85. maps.put(wx_no, record);
  86. }
  87. List<Record> list = Db.find("select * from t_scanrecord where send_time >='"+startTime+"' and send_time <='"+endTime+"'");
  88. for (Record record : list) {
  89. String weixin_no_ = record.getStr("weixin_no_");
  90. maps.remove(weixin_no_);
  91. }
  92. Set<String> keySet = maps.keySet();
  93. for (String nos : keySet) {
  94. Record record = maps.get(nos);
  95. String pay_no = record.getStr("pay_no");
  96. Record findFirst = Db.findFirst("select * from t_qcode_hb_a where no_ = ?",pay_no);
  97. if(findFirst != null){
  98. String id = findFirst.getStr("id");
  99. Date payDate = record.getDate("pay_time");
  100. findFirst.set("send_time",payDate).set("send_state", 1);
  101. Db.update("t_qcode_hb_a", findFirst);
  102. Record findById = Db.findById("t_scanrecord", "qrcode_id", id);
  103. if(findById != null){
  104. findById.set("send_time",payDate).set("send_state", 1).set("weixin_no_", nos);
  105. Db.update("t_scanrecord", "qrcode_id", findById);
  106. }
  107. log.info(id+" 更新完成");
  108. }
  109. }
  110. }
  111. log.info(date+" 完成");
  112. }
  113. /**
  114. * 核对数据
  115. */
  116. private static void check(String startTime,String endTime) {
  117. log.info("{}开始核对数据",startTime);
  118. ExecutorService executors = Executors.newFixedThreadPool(4);
  119. CompletableFuture<List<String>> c1 = CompletableFuture.supplyAsync(()->{
  120. log.info("开始查询 {} 表数据","t_order2");
  121. String payCountSql = "select count(*) from t_order2 where pay_time >= '"+startTime+"' and pay_time <= '"+endTime+"'";
  122. Long payCountDb = Db.queryLong(payCountSql);
  123. int payCount = payCountDb == null ? 0 : payCountDb.intValue();
  124. if(payCount > SINGLE_C){
  125. //总次数
  126. int count = payCount / SINGLE_C;
  127. //剩余次数
  128. int residue = payCount % SINGLE_C;
  129. List<String> ls1 = new ArrayList<>();
  130. int len = 0;
  131. for (int i = 0; i < count; i++) {
  132. String sqlPay = "select wx_no from t_order2 where pay_time >='" + startTime + "' and pay_time<='" + endTime + "' limit " + len + "," + SINGLE_C + "";
  133. List<Record> payList = Db.find(sqlPay);
  134. for (Record record : payList) {
  135. ls1.add(record.getStr("wx_no"));
  136. }
  137. len += SINGLE_C;
  138. }
  139. if(residue > 0){
  140. String sqlPay = "select wx_no from t_order2 where pay_time >='" + startTime + "' and pay_time<='" + endTime + "' limit " + len + "," + payCount + "";
  141. List<Record> payList = Db.find(sqlPay);
  142. for (Record record : payList) {
  143. ls1.add(record.getStr("wx_no"));
  144. }
  145. }
  146. log.info("{} 表数据查询完成","t_order2");
  147. return ls1;
  148. }else{
  149. String sql1 = "select wx_no from t_order2 where pay_time >='"+startTime+"' and pay_time <='"+endTime+"'";
  150. return Db.query(sql1);
  151. }
  152. },executors);
  153. CompletableFuture<List<String>> c2 = CompletableFuture.supplyAsync(()->{
  154. //查询发放成功的红包总个数
  155. log.info("开始查询 {} 表数据","t_scanrecord");
  156. String hbCountSql = "select count(*) from t_scanrecord where send_time >= '"+startTime+"' and send_time <= '"+endTime+"'";
  157. Long hbCountDb = Db.queryLong(hbCountSql);
  158. int hbCount = hbCountDb == null ? 0 : hbCountDb.intValue();
  159. if(hbCount > SINGLE_C){
  160. //总次数
  161. int count = hbCount / SINGLE_C;
  162. //剩余次数
  163. int residue = hbCount % SINGLE_C;
  164. List<String> ls2 = new ArrayList<>();
  165. int len = 0;
  166. for (int i = 0; i < count; i++) {
  167. String sqlHb = "select weixin_no_ from t_scanrecord where send_time >='" + startTime + "' and send_time<='" + endTime + "' limit " + len + "," + SINGLE_C + "";
  168. List<Record> hbList = Db.find(sqlHb);
  169. for (Record record : hbList) {
  170. ls2.add(record.getStr("weixin_no_"));
  171. }
  172. len += SINGLE_C;
  173. }
  174. if(residue > 0){
  175. String sqlHb = "select weixin_no_ from t_scanrecord where send_time >='" + startTime + "' and send_time<='" + endTime + "' limit " + len + "," + hbCount + "";
  176. List<Record> hbList = Db.find(sqlHb);
  177. for (Record record : hbList) {
  178. ls2.add(record.getStr("weixin_no_"));
  179. }
  180. }
  181. log.info("{} 表数据查询完成","t_scanrecord");
  182. return ls2;
  183. }else{
  184. String sql2 = "select weixin_no_ from t_scanrecord where send_time >='"+startTime+"' and send_time <='"+endTime+"'";
  185. return Db.query(sql2);
  186. }
  187. },executors);
  188. c1.thenCombine(c2, MoveData::getDiffent).thenApplyAsync((lst3) -> {
  189. if (lst3.isEmpty()) {
  190. log.info("{} 数据准确,无需核对", startTime);
  191. return null;
  192. }
  193. List<Record> records = new ArrayList<>(lst3.size());
  194. lst3.parallelStream().forEach(key -> {
  195. Record record = Db.findFirst("select * from t_order2 where wx_no = ?", key);
  196. if (record != null) {
  197. record.set("status_", 1);
  198. Db.update("t_order2", "pay_no", record);
  199. records.add(record);
  200. } else {
  201. log.info("{} 号微信对账单不存在此 wx_no:{}", startTime, key);
  202. }
  203. });
  204. if (records.isEmpty()) {
  205. log.info("{} 号微信对账单有误", startTime);
  206. }
  207. return records;
  208. }, executors).thenApplyAsync(records -> {
  209. if (records == null) {
  210. return null;
  211. }
  212. List<Record> hbRecords = new ArrayList<>(records.size());
  213. if (!records.isEmpty()) {
  214. records.parallelStream().forEach(r -> {
  215. String payNo = r.getStr("pay_no");
  216. if(payNo != null && !payNo.startsWith("c")){
  217. Record unPayRecord = Db.findFirst("select * from t_unpayinfo where pay_no = ? limit 1",payNo);
  218. String code;
  219. if(unPayRecord == null || unPayRecord.getStr("id") == null){
  220. log.info("补发表没有订单号 {} 记录",payNo);
  221. /* 补发记录太多会占满cpu和内存 2020.03.12 lds
  222. String tableName = "t_qcode_hb_a";
  223. Record hbRecord = Db.findFirst("select * from t_qcode_hb_a where no_ = ? limit 1",payNo);
  224. if (hbRecord != null) {
  225. hbRecord.set("send_state", 1).set("send_time", r.getDate("pay_time"));
  226. //Db.update("update t_qcode_hb_a set send_state = ? and send_time = ? where no_ = ?",1,r.getDate("pay_time"),r.getStr("pay_no"));
  227. Db.update(tableName, hbRecord);
  228. hbRecord.set("wx_no", r.getStr("wx_no"));
  229. hbRecords.add(hbRecord);
  230. }
  231. */
  232. }else{
  233. code = unPayRecord.getStr("id");
  234. String prefix = code.substring(0,1);
  235. String tableName = "t_qcode_hb_"+prefix;
  236. Record hbRecord = Db.findById(tableName,code);
  237. if (hbRecord != null) {
  238. hbRecord.set("send_state", 1).set("send_time", r.getDate("pay_time"));
  239. //Db.update("update t_qcode_hb_a set send_state = ? and send_time = ? where no_ = ?",1,r.getDate("pay_time"),r.getStr("pay_no"));
  240. Db.update(tableName, hbRecord);
  241. hbRecord.set("wx_no", r.getStr("wx_no"));
  242. hbRecords.add(hbRecord);
  243. }
  244. }
  245. }
  246. });
  247. if (hbRecords.isEmpty()) {
  248. log.info("{} 红包表没有记录,不能获取二维码,无法核对", startTime);
  249. }
  250. }
  251. return hbRecords;
  252. }, executors).thenApplyAsync(hbrecords -> {
  253. if (hbrecords == null) {
  254. return true;
  255. }
  256. List<Record> scanRecords = new ArrayList<>();
  257. if (!hbrecords.isEmpty()) {
  258. hbrecords.parallelStream().forEach(r -> {
  259. log.info("有误差的二维码 {}", r.getStr("id"));
  260. Db.update("delete from t_unpayinfo where id = ?", r.getStr("id"));
  261. Record sr = Db.findById("t_scanrecord", "qrcode_id", r.getStr("id"));
  262. sr.set("send_state", 1).set("send_time", r.getDate("send_time")).set("weixin_no_", r.getStr("wx_no"));
  263. Db.update("t_scanrecord", "qrcode_id", sr);
  264. scanRecords.add(sr);
  265. });
  266. if (scanRecords.isEmpty()) {
  267. log.info("{} 扫码表没有记录,无法核对", startTime);
  268. return false;
  269. }
  270. }
  271. return !scanRecords.isEmpty();
  272. }, executors).handle((res, ex) -> {
  273. if (ex != null) {
  274. executors.shutdown();
  275. log.info("{} 核对数据出现异常,原因 {}", startTime,ex.getMessage());
  276. ex.printStackTrace();
  277. } else {
  278. executors.shutdown();
  279. if (res) {
  280. log.info("{} 核对数据成功", startTime);
  281. }else{
  282. log.info("{} 核对数据失败", startTime);
  283. }
  284. }
  285. return res;
  286. });
  287. }
  288. private static List<String> getDiffent(List<String> collmax, List<String> collmin)
  289. {
  290. //使用LinkeList防止差异过大时,元素拷贝
  291. List<String> csReturn = new LinkedList();
  292. List<String> max = collmax;
  293. List<String> min = collmin;
  294. //先比较大小,这样会减少后续map的if判断次数
  295. if(collmax.size()<collmin.size())
  296. {
  297. max = collmin;
  298. min = collmax;
  299. }
  300. //直接指定大小,防止再散列
  301. Map<String,Integer> map = new HashMap<>(max.size());
  302. for (String object : min) {
  303. map.put(object, 1);
  304. }
  305. for (String object : max) {
  306. if(map.get(object)==null)
  307. {
  308. csReturn.add(object);
  309. }
  310. }
  311. return csReturn;
  312. }
  313. /**
  314. * 从微信读取指定日期的资金账单
  315. * @param date 指定的日期 yyyy.MM.dd 格式
  316. * @throws Exception
  317. */
  318. private static void readNo(String date) throws Exception {
  319. String startTime = date+" 00:00:00";
  320. String endTime = date+" 23:59:59";
  321. String payCountSql = "select count(*) from t_order2 where pay_time >= '"+startTime+"' and pay_time <= '"+endTime+"' and amount <=100000";
  322. Long payCountDb = Db.queryLong(payCountSql);
  323. if(payCountDb == null){
  324. payCountDb = 0l;
  325. }
  326. if(payCountDb>0){
  327. log.info("{} 资金账单已经存在",date);
  328. return;
  329. }
  330. log.info("{} 开始下载资金账单",date);
  331. Record config = Db.findFirst("select * from t_pay_config");
  332. String mch_id =config.getStr("mch_id");
  333. Map<String, String> params = new HashMap<String, String>();
  334. params.put("appid", "wx83a34a499a3db4f1");//商户号
  335. params.put("mch_id", mch_id);
  336. params.put("nonce_str", System.currentTimeMillis()+"");//随机字符串
  337. String partner_key = config.getStr("partner_key");
  338. String[] dd = date.split("\\.");
  339. StringBuilder sb = new StringBuilder();
  340. for (String string : dd) {
  341. sb.append(string);
  342. }
  343. date = sb.toString();
  344. params.put("bill_date", date);
  345. String certPath;
  346. certPath = "/apiclient_cert.p12";
  347. params.put("account_type", "Operation");//
  348. String stringA = PaymentKit.packageSign(params, false);
  349. String stringSignTemp = stringA + "&key=" + partner_key;
  350. String sign = HMACSHA256(stringSignTemp,partner_key);
  351. params.put("sign", sign);
  352. java.net.URL resource = StateMentsUtils.class.getResource(certPath);
  353. String file = resource.getFile();
  354. String xmlStr = PaymentKit.postSSL("https://api.mch.weixin.qq.com/pay/downloadfundflow", PaymentKit.toXml(params),file,mch_id);
  355. String[] split = xmlStr.split("\n");
  356. int lineNo = split.length;
  357. List<Record> list = new ArrayList<Record>();
  358. for (int i = 1; i < lineNo-2; i++) {
  359. Record r = new Record();
  360. String line = split[i];
  361. String[] split2 = line.split(",");
  362. if(split2.length<=6){
  363. System.out.println(date);
  364. break;
  365. }
  366. String payTime = split2[0];
  367. String payWxno = split2[2];
  368. String payAmount = split2[6];
  369. String payNo = split2[10];
  370. String mcId = split2[8];
  371. payAmount = payAmount.substring(1);
  372. payNo = payNo.substring(1);
  373. payWxno = payWxno.substring(1);
  374. if(WxUtil.isNull(payNo)){
  375. continue;
  376. }
  377. BigDecimal multiply = new BigDecimal(payAmount).multiply(new BigDecimal(100));
  378. int intValue = multiply.intValue();
  379. payTime = payTime.substring(1);
  380. r.set("pay_time", payTime).set("amount", intValue)
  381. .set("pay_no", payNo).set("mc_id", mcId).set("wx_no", payWxno);
  382. list.add(r);
  383. }
  384. WxUtil.batchSave("t_order2", list);
  385. log.info("{} 资金账单下载结束",date);
  386. }
  387. public static String HMACSHA256(String data,String key) throws Exception {
  388. Mac sha256_HMAC = Mac.getInstance("HmacSHA256");
  389. SecretKeySpec secret_key = new SecretKeySpec(key.getBytes(StandardCharsets.UTF_8), "HmacSHA256");
  390. sha256_HMAC.init(secret_key);
  391. byte[] array = sha256_HMAC.doFinal(data.getBytes(StandardCharsets.UTF_8));
  392. StringBuilder sb = new StringBuilder();
  393. for (byte item : array) {
  394. sb.append(Integer.toHexString((item & 0xFF) | 0x100).substring(1, 3));
  395. }
  396. return sb.toString().toUpperCase();
  397. }
  398. }