# -*- codeing = utf-8 -*- # @Time : 2023/5/9 16:37 # @Author : Clown # @File : demo_match_rules_update.py # @Software : PyCharm import requests import pymysql import json import time from datetime import datetime, timedelta from all_key_table import update_key_value_pair_7qiaoPlus from dateutil.parser import parse import traceback def linkTomySql(host, passwd, db_name, port): '''连接至数据库返回【db】,v2新增local_infile=1 打开文件导入权限''' try: # 本地连接为:localhost 服务器连接为:124.222.188.59 db = pymysql.connect ( host=host, user="root", passwd=passwd, db=db_name, port=port, charset='utf8mb4', local_infile=1) print ('\nconnect to mysql server 成功') print ('---------------------------------------') except: print ("\ncould not connect to mysql server") db = "连接失败" return db def read_key_value_pair(db, brand_name, wm_plate, owner): '''按条件读取,数据库中all_key_table表里的key_value_pair字段中的值,以键值对的形式输出 db:数据库, brand_name:品牌名, wm_plate:外卖平台MEITUAN或ELEME, owner:账号权限all或one ''' cursor = db.cursor () sql = f'SELECT key_value_pair FROM all_key_table WHERE brand_name = "{brand_name}" AND wm_plate = "{wm_plate}" AND owner = "{owner}";' cursor.execute (sql) pair = json.loads (cursor.fetchall ()[0][0]) return pair def sendMsgToRot(url,msg_txt): params_json = { "msgtype": "markdown", "markdown": { "content": msg_txt, "mentioned_list":["@all"] }} resp = requests.post(url,json=params_json).text print(resp) # 按要求长度对列表进行二次分组 def groupList(input_list,group_size): out_list = [] n = len (input_list) cnt = group_size s = n // cnt r = n % cnt if r > 0: s = s + 1 for i in range (s): b = input_list[(i) * cnt:(i + 1) * cnt] out_list.append(b) return out_list # 批量新增 def insert_7qiaoPlus_rows(Token, applicationId, formModelId, param_json): ''' :param Token: str() 密钥 :param applicationId: str() 应用id :param formModelId: str() 表单id :param param_json: dir() 表单参数 :return: 执行成功无结果,执行失败返回结果(错误报警) ''' out_list = groupList (param_json, 100) groupNum = 1 for param in out_list: print (f'新增批次:{groupNum}') groupNum += 1 headers_api = { 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.85 Safari/537.36', "Content-type": "application/json", "X-Auth0-Token": Token} url_insert = f'https://qiqiao.do1.com.cn/plus/cgi-bin/open/applications/{applicationId}/forms/{formModelId}/batch_save' n = 0 m = 0 while n == 0 and m <= 5: try: resp = requests.post (url_insert, data=json.dumps(param), headers=headers_api).json () if resp['code'] != 0: print ('resp: ',resp) else: ... # print ('Done') n = 1 except Exception as e: time.sleep (1) m += 1 n = 0 print('error: ',e) # 批量更新 def update_7qiaoPlus_rows(Token, applicationId, formModelId, param_json): ''' :param Token: str() 密钥 :param applicationId: str() 应用id :param formModelId: str() 表单id :param variables: 参数键值对,传入需更新的内容{ //"字段名" : "值" "fieldName": "fieldValue", "age": "22", "单行文本" : "文本内容", "数字" : 123, "单项选择" : "1", "多项选择" : ["1", "2", "3"], //人员、部门选择控件需要传入对应的id "人员单选" : "123412341243", "人员多选" : ["123","345","456"], "日期" : "2020-10-10", "日期时间" : "2020-10-10 10:10:00", "时间" : "10:10:00", "富文本" : "文本内容", //图片上传、文件上传、音频、视频可以通过调用文件上传接口,将返回的结果“data”值进行传参 "图片上传" : "data", //手写签名可以通过调用文件上传接口,将返回的结果“fileUrl”值进行传参 "手写签名" : "fileUrl" } :param row_id: 表单实例id :param version: 更新版本 需要通过函数 select_7qiaoPlus_row 获取 :return: ''' out_list = groupList (param_json, 100) groupNum = 1 for param in out_list: print(f'更新批次:{groupNum}') groupNum += 1 headers_api = { 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.85 Safari/537.36', "Content-type": "application/json", "X-Auth0-Token": Token} url_update = f'https://qiqiao.do1.com.cn/plus/cgi-bin/open/applications/{applicationId}/forms/{formModelId}/batch_update' param_json = param_json n = 0 m = 0 while n == 0 and m <= 5: try: resp = requests.post (url_update, data=json.dumps(param), headers=headers_api).json () if resp['code'] != 0: print ('resp: ',resp) else: ... n = 1 except Exception as e: time.sleep (1) m += 1 n = 0 print('error: ',e) # 按id查询 def select_7qiaoPlus_row(Token, applicationId, formModelId, row_id): ''' :param Token: str() 密钥 :param applicationId: str() 应用id :param formModelId: str() 表单id :param row_id: str() 表单实例id :return: resp json()格式 ''' headers_api = { 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.85 Safari/537.36', "Content-type": "application/json", "X-Auth0-Token": Token} url_select = f'https://qiqiao.do1.com.cn/plus/cgi-bin/open/applications/{applicationId}/forms/{formModelId}/{row_id}' resp = requests.get (url_select, headers=headers_api).json () return resp # 查询更新版本号 def selectRowVersion(Token, applicationId, formModelId, row_id): n = 0 m = 0 while n == 0 and m <=5: try: resp = select_7qiaoPlus_row(Token, applicationId, formModelId, row_id) n = 1 result = 0 except: time.sleep(1) m += 1 n = 0 result = 1 # print(resp) try: version = resp['data']['version'] variables = resp['data']['variables']['在售品牌'] newVersion = version+1 except: newVersion = -1-result return newVersion def selectGoodsHashFromMysql(db,keyName,date,Token, applicationId, formModelId): # data 格式为 '20230202' cursor = db.cursor () sql = f'''SELECT * FROM goods_hash WHERE `平台商品名称` LIKE '%{keyName}%' AND `最近一次活跃时间` > '{date}' ORDER BY `首次记录时间` DESC;''' cursor.execute (sql) value_out = cursor.fetchall () cursor.close () brandNameDict = {'JXB':'1','ZXX':'2','LLS':'3'} insertList = [] updateList = [] for goodsInfo in value_out: goodsName = goodsInfo[0] goodsHashId = goodsInfo[1] brandNames = goodsInfo[2] brandNameList = [] for i in ['JXB','ZXX','LLS']: if i in brandNames: brandNameNum = brandNameDict[i] brandNameList.append(brandNameNum) lastTime_date = goodsInfo[3] firstTime_date = goodsInfo[4] insertInfo = {"variables": { "平台菜单编号": goodsHashId, "平台菜单名称": goodsName, "在售品牌": brandNameList, "首次出现日期": int (parse (firstTime_date).timestamp ()) * 1000, "最近一次销售日期": int (parse (lastTime_date).timestamp ()) * 1000, "数据id": goodsHashId }, "id": goodsHashId, "version": 0, "loginUserId": "92874ed35ecb45f51a58adcf18e99b5e"} if int(firstTime_date) > int(date): # print(insertInfo) insertList.append(insertInfo) else: newVersion = selectRowVersion (Token, applicationId, formModelId, goodsHashId) if newVersion == -1: # print (insertInfo) insertList.append (insertInfo) elif newVersion == -2: print(f'【{goodsHashId}】请求错误') else: updateInfo = {"variables": { "在售品牌": brandNameList, "最近一次销售日期": int(parse (lastTime_date).timestamp ())*1000}, "id":goodsHashId , "version": newVersion, "loginUserId": "92874ed35ecb45f51a58adcf18e99b5e"} # print (updateInfo) updateList.append (updateInfo) print (f'总计{len(value_out)},插入{len(insertList)},更新{len(updateList)}') return insertList,updateList # 按最近一次销售日期/首次记录时间 查询倒序查询列表 def selectLogDate(Token, applicationId, formModelId, orderField): headers_api = { 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.85 Safari/537.36', "Content-type": "application/json", "X-Auth0-Token": Token} # orderField = '最近一次销售日期' url_select = f'https://qiqiao.do1.com.cn/plus/cgi-bin/open/applications/{applicationId}/forms/{formModelId}/query?orderField={orderField}' data_params = [] resp = requests.post (url_select, headers=headers_api,data=json.dumps(data_params)).json () lastTime_date = resp['data']['list'][0]['variables'][orderField] return lastTime_date def select_7qiaoPlus_row_by_orderField(Token, applicationId, formModelId, orderField,value): headers_api = { 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.85 Safari/537.36', "Content-type": "application/json", "X-Auth0-Token": Token} url_select = f'https://qiqiao.do1.com.cn/plus/cgi-bin/open/applications/{applicationId}/forms/{formModelId}/query' data_params = [{"fieldName":orderField, "logic": "eq", "value":value}] resp = requests.post (url_select, headers=headers_api, data=json.dumps (data_params)).json () return resp def selectResultToList(Token, applicationId, formModelId, orderField,value): resp = select_7qiaoPlus_row_by_orderField(Token, applicationId, formModelId, orderField,value) resp_list = resp['data']['list'] list_len = len(resp_list) dict_out = {} list_out1 = [] list_out2 = [] for info in resp_list: standardId = info['prettyValue']['标准菜单编号'] standardName = info['variables']['标准菜单名称'] dict_out[standardId] = {'平台菜单hash编号': info['prettyValue']['平台菜单编号'], '平台菜单名称': info['variables']['平台菜单名称'], '标准菜单编号': standardId, '标准菜单名称': info['variables']['标准菜单名称'], '标品数量': info['variables']['标品数量']} list_out1.append(standardId) list_out2.append(standardName) return list_len,dict_out,list_out1,list_out2 def select_goods_match_standard(db,goods_hash): cursor = db.cursor () sql = f'''SELECT * FROM goods_match_standard WHERE `平台菜单hash编号` = '{goods_hash}';''' cursor.execute (sql) value_out = cursor.fetchall () cursor.close () value_len = len(value_out) dict_out = {} list_out2 = [] for value in value_out: standardId = value[2] dict_out[standardId] = {'平台菜单hash编号':value[0], '平台菜单名称':value[1], '标准菜单编号':value[2], '标准菜单名称':value[3], '标品数量':value[4]} list_out2.append(standardId) return value_len,dict_out,list_out2 def insert_goods_match_standard(db,insert_dict): cursor = db.cursor () sql = f'''INSERT INTO goods_match_standard values (%s,%s,%s,%s,%s);''' insert_value = (insert_dict['平台菜单hash编号'],insert_dict['平台菜单名称'],insert_dict['标准菜单编号'], insert_dict['标准菜单名称'],insert_dict['标品数量']) cursor.execute (sql,insert_value) db.commit () cursor.close () def delete_goods_match_standard(db,delete_dict): cursor = db.cursor () sql = f'''DELETE FROM goods_match_standard WHERE `平台菜单hash编号` = "{delete_dict['平台菜单hash编号']}" AND `标准菜单编号` = "{delete_dict['标准菜单编号']}";''' cursor.execute (sql) db.commit () cursor.close () def update_goods_match_standard(db,update_dict): cursor = db.cursor () sql = f'''UPDATE goods_match_standard SET `标品数量` = {update_dict['标品数量']} WHERE `平台菜单hash编号` = "{update_dict['平台菜单hash编号']}" AND `标准菜单编号` = "{update_dict['标准菜单编号']}";''' cursor.execute (sql) db.commit () cursor.close () def update_goods_match_standard_by_goodsHash(goods_hash): goods_hash = goods_hash['goods_hash'] try: if 1==1: host = '124.222.188.59' passwd = '111...Clown' db_name = 'zuzu_data' port = 63306 if 1==0: host = 'localhost' passwd = '111???clown' db_name = 'hexingxing' port = 3306 db = linkTomySql (host, passwd, db_name, port) key_json = read_key_value_pair (db, '道一云', '7qiaoPlus', 'all') Token = key_json['data'] applicationId = '63fee5abc8584e7ec9cd0bd4' formModelId_2 = '63fefa6b4871e3647302de2e' # 平台标准菜单对照表 orderField = '平台菜单编号' resp_len, resp_dict_out, resp_list2, resp_list3 = selectResultToList(Token, applicationId, formModelId_2, orderField, goods_hash) print(resp_len,resp_dict_out,resp_list2,resp_list3) sql_len, sql_dict_out, sql_list2 = select_goods_match_standard (db, goods_hash) print (sql_len, sql_dict_out, sql_list2) update_list = list (set (resp_list2).intersection (set (sql_list2))) for update_info in update_list: update_dict = resp_dict_out[update_info] update_goods_match_standard(db,update_dict) insert_list = list (set (resp_list2).difference (set (sql_list2))) for insert_info in insert_list: insert_dict = resp_dict_out[insert_info] insert_goods_match_standard(db,insert_dict) delete_list = list (set (sql_list2).difference (set (resp_list2))) for delete_info in delete_list: delete_dict = sql_dict_out[delete_info] delete_goods_match_standard(db,delete_dict) print (update_list, insert_list, delete_list) db.close () if len(resp_list3)==0: memo = '' else: memo = ','.join(resp_list3) result = 'ok' except Exception as e: a = traceback.format_exc() print(a) memo = '错误报警' result = 'error' json_out = {'result': result, 'memo': memo} return json_out if __name__ == '__main__': if 1 == 1: value = {'goods_hash':'f9a7972ea13a3e84f76d0eb0a049dfb2'} memo = update_goods_match_standard_by_goodsHash(value) print(memo) if 1 == 0: s_time = time.time() host = '124.222.188.59' passwd = '111...Clown' db_name = 'zuzu_data' port = 63306 db = linkTomySql (host, passwd, db_name, port) log_date = datetime.now () + timedelta(days=-1) key_json = read_key_value_pair (db, '道一云', '7qiaoPlus', 'all') Token = key_json['data'] applicationId = '63fee5abc8584e7ec9cd0bd4' formModelId_1 = '63fefa77c74fdb28e2e02f40' #平台销售菜单表 formModelId_2 = '63fefa6b4871e3647302de2e' #平台标准菜单对照表 if 1 == 0: try: lastTime_date = selectLogDate (Token, applicationId, formModelId_1,'最近一次销售日期') lastTime_date_no = parse(lastTime_date).strftime('%Y%m%d') insertList,updateList =selectGoodsHashFromMysql (db, '肠粉', lastTime_date_no,Token, applicationId, formModelId_1) insert_7qiaoPlus_rows(Token, applicationId, formModelId_1,insertList) update_7qiaoPlus_rows(Token, applicationId, formModelId_1,updateList) except Exception as e: operation_results = traceback.format_exc () # 发送报错信息 url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=cf7139e1-e623-41ca-8541-5dc6e26d43b0' msg_txt = f'【7巧客户数据中心-平台菜单】服务端异常终止,请管理员尽快处理!\n>Exception->{e}\nDetail->{operation_results}' sendMsgToRot (url, msg_txt) db.close() spend_time = time.time()-s_time print(f'总用时{round(spend_time/60,2)}分钟')