# -*- coding: utf-8 -*-
# @Time : 2023/3/22 13:36
# @Author : Clown
# @File : demo_baiduAd.py
# @Software : PyCharm
"""Fetch Baidu lead notices per account, forward them to a 7qiao workflow,
and post an alert to a qyapi.weixin.qq.com webhook when a step fails."""
import json
import threading
import time
import traceback
from datetime import datetime, timedelta

import pymysql
import requests
import schedule
from dateutil.parser import parse

from all_key_table import update_key_value_pair_7qiaoPlus
from demo_baidu_ad_tk_refresh import getAllBaiDuUsersList

# Result string used by every step of this module to mean "nothing went wrong".
RESULT_OK = '无异常'

# Upper bound (seconds) for every outgoing HTTP call, so a stalled peer cannot
# block the polling loop forever.
HTTP_TIMEOUT_S = 30

# While True, fetched data is only printed ('测试中') and nothing is inserted
# into the baidu_ad_solution_* tables.
DRY_RUN = True

# Webhook that receives failure alerts.
ALERT_WEBHOOK_URL = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=cf7139e1-e623-41ca-8541-5dc6e26d43b0'

_TIME_FORMAT = '%Y-%m-%d %H:%M:%S'


def linkTomySql(host, passwd, db_name, port):
    """Open a MySQL connection as user ``root``.

    ``local_infile=1`` is passed to the driver (added in v2 to allow file
    import).

    Returns:
        The pymysql connection, or the string '连接失败' if connecting failed.
    """
    try:
        # Local deployments use "localhost"; the server is 124.222.188.59.
        db = pymysql.connect(
            host=host,
            user="root",
            passwd=passwd,
            db=db_name,
            port=port,
            charset='utf8mb4',
            local_infile=1,
        )
    except Exception:
        print("\ncould not connect to mysql server")
        return "连接失败"
    print('\nconnect to mysql server 成功')
    print('---------------------------------------')
    return db


def read_key_value_pair(db, brand_name, wm_plate, owner):
    """Return the decoded ``key_value_pair`` JSON of one ``all_key_table`` row.

    Args:
        db: open database connection.
        brand_name: brand name to match.
        wm_plate: platform to match (the original notes MEITUAN or ELEME; this
            module passes '7qiaoPlus').
        owner: account scope to match ('all' or 'one').

    Raises:
        IndexError: no row matches the three filters.
    """
    # Parameterized so quotes inside the arguments cannot alter the statement.
    sql = ('SELECT key_value_pair FROM all_key_table '
           'WHERE brand_name = %s AND wm_plate = %s AND owner = %s;')
    with db.cursor() as cursor:
        cursor.execute(sql, (brand_name, wm_plate, owner))
        rows = cursor.fetchall()
    return json.loads(rows[0][0])


def orderTo7qiao(db, lastCatchTime_ago_s, solutionCntAll, solutionJsonData):
    """Start the 7qiao workflow process with the fetched lead data.

    Args:
        db: open database connection, used to read the 7qiao token.
        lastCatchTime_ago_s: value sent as the "任务触发时间" workflow variable.
        solutionCntAll: number of leads in this push.
        solutionJsonData: lead data; sent as JSON text wrapped in {'data': ...}.

    Returns:
        '无异常' when the service answers '执行成功', otherwise a description
        of the failure.
    """
    key_json = read_key_value_pair(db, '道一云', '7qiaoPlus', 'all')
    applicationId = '63631ee57005c0103426fdc7'
    processId = '642128bd80236836229b3b1d'
    headers_api = {
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.85 Safari/537.36',
        "Content-type": "application/json",
        "X-Auth0-Token": key_json['data'],
    }
    url = f'https://qiqiao.do1.com.cn/plus/cgi-bin/open/applications/{applicationId}/workflow/process_definitions/{processId}/start'
    params_json = {
        "nextNodesAndHandlers": [{"activityDefinitionId": "obj_02"}],
        "variables": {
            "任务触发时间": lastCatchTime_ago_s,
            'solutioncntall': solutionCntAll,
            'solutionjsondata': json.dumps({'data': solutionJsonData}, ensure_ascii=False),
        },
        "loginUserId": "92874ed35ecb45f51a58adcf18e99b5e",
    }
    try:
        resp = requests.post(url, headers=headers_api, json=params_json, timeout=HTTP_TIMEOUT_S)
        msg = resp.json()['msg']
        print(resp.text)
    except Exception as e:
        # Covers connection errors, timeouts and unexpected response bodies.
        return f'[orderTo7qiao]-道一云端无法连接{e}'
    if msg == '执行成功':
        return RESULT_OK
    return f'[orderTo7qiao]-返回结果为{msg}'


def sendMsgToRot(url, msg_txt):
    """POST ``msg_txt`` as a markdown message to the webhook ``url`` and print the reply."""
    params_json = {
        "msgtype": "markdown",
        "markdown": {
            "content": msg_txt,
            "mentioned_list": ["@all"],
        },
    }
    resp = requests.post(url, json=params_json, timeout=HTTP_TIMEOUT_S).text
    print(resp)


def baiduAD(userName, accessToken, solutionType, lastCatchTime_ago_s, lastCatchTime_now_s):
    """Request the Baidu lead-notice list for one account and time window.

    Returns:
        The ``requests.Response``, or the string 'Error' if the request could
        not be completed.
    """
    url = "https://api.baidu.com/json/sms/service/LeadsNoticeService/getNoticeList"
    user_payload = {
        "header": {
            "authorityType": 5,
            "userName": userName,
            "accessToken": accessToken,
            "action": "API-PYTHON",
        },
        "body": {
            "solutionType": solutionType,
            "startDate": lastCatchTime_ago_s,
            "endDate": lastCatchTime_now_s,
        },
    }
    http_headers = {
        "Accept-Encoding": "gzip, deflate",
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    try:
        return requests.request(
            "POST",
            url,
            data=json.dumps(user_payload),
            headers=http_headers,
            timeout=HTTP_TIMEOUT_S,
        )
    except requests.RequestException:
        return 'Error'


def selectbaidu_ad_time_meter(db):
    """Read the time meter and return the next fetch window.

    Returns:
        ``(start, end)`` as 'YYYY-MM-DD HH:MM:SS' strings, where start is the
        stored time plus one second and end is the stored time plus five
        minutes (the update interval).
    """
    with db.cursor() as cursor:
        cursor.execute('SELECT * FROM baidu_ad_time_meter WHERE id = 1;')
        # The first column of the row is used as the last catch time; it must
        # be a datetime because timedeltas are added to it below.
        last_catch = cursor.fetchall()[0][0]
    lastCatchTime_ago_s = (last_catch + timedelta(seconds=1)).strftime(_TIME_FORMAT)
    lastCatchTime_now_s = (last_catch + timedelta(minutes=5)).strftime(_TIME_FORMAT)
    return lastCatchTime_ago_s, lastCatchTime_now_s


def updatebaidu_ad_time_meter(db, lastCatchTime_now_s):
    """Store ``lastCatchTime_now_s`` as the new meter time (row id 1) and commit."""
    with db.cursor() as cursor:
        cursor.execute(
            'UPDATE baidu_ad_time_meter SET lastCatchTime = %s WHERE id = 1;',
            (lastCatchTime_now_s,),
        )
    db.commit()


def insertbaidu_ad_solution_catch(db, insert_valuse):
    """Insert one 8-value row into ``baidu_ad_solution_catch`` (fetch log) and commit."""
    sql = 'INSERT INTO baidu_ad_solution_catch VALUES (%s,%s,%s,%s,%s,%s,%s,%s);'
    with db.cursor() as cursor:
        cursor.execute(sql, insert_valuse)
    db.commit()


def insertbaidu_ad_solution_info(db, insert_valuse):
    """Insert one 13-value row into ``baidu_ad_solution_info`` (lead detail) and commit."""
    sql = 'INSERT INTO baidu_ad_solution_info VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);'
    with db.cursor() as cursor:
        cursor.execute(sql, insert_valuse)
    db.commit()


def _extract_clue_name(solution):
    """Return the 'name' field of the lead's JSON ``formDetail``, or '未知'."""
    try:
        clue_name = '未知'
        for field in json.loads(solution['formDetail']):
            if field['type'] == 'name':
                clue_name = field['value']
    except (KeyError, TypeError, ValueError):
        # Missing, non-text or malformed formDetail: fall back to "unknown".
        clue_name = '未知'
    return clue_name


def dumpBaiduJosn(db, userName, brandName, routeName, response_json):
    """Flatten a Baidu notice response into one dict per lead.

    Every entry of ``response_json['body']['data'][*]['noticeDetailList']``
    becomes one dict. Unless ``DRY_RUN`` is set, each lead is also inserted
    into ``baidu_ad_solution_info``.

    Raises:
        KeyError: a lead lacks clueId, commitTime, solutionType or
            cluePhoneNumber, or the response lacks the expected structure.
    """
    solutionJsonDatas = []
    for data in response_json['body']['data']:
        for solution in data['noticeDetailList']:
            keyword = solution.get('keyword', '')
            if userName == 'baidu-hp-cxb餐04yxB23HY08203':
                # Leads from this account get a fixed keyword prefix.
                keyword = '【定向加油包】' + keyword
            record = {
                'clueId': solution['clueId'],
                'commitTime': solution['commitTime'],
                'solutionType': solution['solutionType'],
                'clueName': _extract_clue_name(solution),
                'cluePhoneNumber': solution['cluePhoneNumber'],
                'flowChannelName': solution.get('flowChannelName', '其他'),
                'area': solution.get('area', '未知区域'),
                'searchWord': solution.get('searchWord', ''),
                'keyword': keyword,
                'userName': userName,
                'brandName': brandName,
                'routeName': routeName,
            }
            if DRY_RUN:
                print('测试中')
            else:
                solutionInfo = json.dumps(solution, ensure_ascii=False)
                insert_valuse = (
                    record['clueId'], record['commitTime'], record['solutionType'],
                    record['clueName'], record['cluePhoneNumber'],
                    record['flowChannelName'], record['area'], record['searchWord'],
                    record['keyword'], solutionInfo, userName, brandName, routeName,
                )
                insertbaidu_ad_solution_info(db, insert_valuse)
            solutionJsonDatas.append(record)
    return solutionJsonDatas


def runBaiduAD(db, userName, accessToken, brandName, routeName, lastCatchTime_ago_s, lastCatchTime_now_s):
    """Fetch "phone" and "form" leads of one account for the given window.

    Returns:
        ``(operation_results, solutionJsonDataList)``: '无异常' or the
        description of the last failure, and the leads gathered from every
        solution type that could be parsed.
    """
    operation_results = RESULT_OK
    solutionJsonDataList = []
    for solutionType in ("phone", "form"):
        # Values logged when the request or the parsing fails; set on every
        # iteration so a failed request never reuses (or lacks) them.
        desc = 'fail'
        solutionCnt = 0
        response = baiduAD(userName, accessToken, solutionType, lastCatchTime_ago_s, lastCatchTime_now_s)
        if response == 'Error':
            response_data = 'Error'
            operation_results = '[runBaiduAD]-[baiduAD]请求失败'
        else:
            response_data = response.text
            try:
                # Decoding is inside the try so a non-JSON body is reported as
                # a parse failure, not raised to the caller.
                response_json = response.json()
                parsed_desc = response_json['header']['desc']
                parsed_cnt = response_json['body']['data'][0]['totalNum']
                solutionJsonDataList = solutionJsonDataList + dumpBaiduJosn(
                    db, userName, brandName, routeName, response_json)
            except Exception:
                # Best effort: one unreadable response must not stop the other
                # solution type from being fetched.
                operation_results = '[runBaiduAD]-[response_json]无法解析'
            else:
                desc, solutionCnt = parsed_desc, parsed_cnt
        insert_valuse = (lastCatchTime_now_s, response_data, solutionCnt, desc,
                         solutionType, userName, brandName, routeName)
        if DRY_RUN:
            print('测试中')
        else:
            insertbaidu_ad_solution_catch(db, insert_valuse)
        print(response_data)
    return operation_results, solutionJsonDataList


def update7qiaoToken(db):
    """Refresh the 7qiao token when called in minute 0 or 1 of hours 0, 6, 12 or 18."""
    now = datetime.now()
    if now.hour % 6 == 0 and now.minute <= 1:
        update_key_value_pair_7qiaoPlus(db, '道一云', '7qiaoPlus', 'all', '')


def _send_alert(detail, exception_text='no'):
    """Post the failure alert carrying ``detail`` to the alert webhook."""
    msg_txt = f'【百度投流数据同步】服务端出现异常,请管理员尽快处理!\n>Exception->{exception_text}\nDetail->{detail}'
    sendMsgToRot(ALERT_WEBHOOK_URL, msg_txt)


def _push_leads(db, lastCatchTime_now_s, leads):
    """Forward ``leads`` to 7qiao and return the failure messages (empty if none)."""
    if not leads:
        return []
    if len(str(leads)) >= 5000:
        # Large batches are pushed one lead at a time, half a second apart.
        results = []
        for lead in leads:
            results.append(orderTo7qiao(db, lastCatchTime_now_s, 1, lead))
            time.sleep(0.5)
    else:
        results = [orderTo7qiao(db, lastCatchTime_now_s, len(leads), leads)]
    return [result for result in results if result != RESULT_OK]


def runADsolution(db):
    """Run one fetch cycle over every Baidu account and advance the time meter.

    For each account: fetch the leads of the current window, refresh the 7qiao
    token if due, push the leads to 7qiao, and send an alert listing every
    fetch/push failure. Any exception raised during the cycle is reported to
    the alert webhook with its traceback and is not re-raised; the time meter
    is then left unchanged.
    """
    try:
        baidu_kisses = getAllBaiDuUsersList(db)
        lastCatchTime_ago_s, lastCatchTime_now_s = selectbaidu_ad_time_meter(db)
        for baidu_kis in baidu_kisses:
            fetch_result, solutionJsonDataList = runBaiduAD(
                db,
                baidu_kis['userName'],
                baidu_kis['accessToken'],
                baidu_kis['brandName'],
                baidu_kis['routeName'],
                lastCatchTime_ago_s,
                lastCatchTime_now_s,
            )
            update7qiaoToken(db)
            # Collect every failure so a later successful push cannot hide an
            # earlier fetch or push problem.
            problems = [] if fetch_result == RESULT_OK else [fetch_result]
            problems += _push_leads(db, lastCatchTime_now_s, solutionJsonDataList)
            if problems:
                _send_alert('; '.join(problems))
        updatebaidu_ad_time_meter(db, lastCatchTime_now_s)
    except Exception:
        # The traceback text carries the detail; the "Exception" field of the
        # alert stays 'no'.
        _send_alert(traceback.format_exc())
def run(db):
    """Run ``runADsolution(db)`` once per 5-minute slot, forever.

    Waits for the end of the current 5-minute slot, then triggers a cycle at
    second 01 of every minute that is a multiple of five. Never returns.
    """
    # Update interval in minutes.
    minute = 5
    f_now = datetime.now()
    # Sleep until second 59 of the last minute of the current slot.
    wait_s = (minute - f_now.minute % minute - 1) * 60 + 59 - f_now.second
    time.sleep(wait_s)
    while True:
        now = datetime.now()
        if now.minute % minute == 0 and now.second == 1:
            s = time.time()
            runADsolution(db)
            # Clamp at zero: a cycle longer than the interval would otherwise
            # pass a negative value to time.sleep, which raises ValueError.
            time.sleep(max(0.0, minute * 60 - 0.5 - (time.time() - s)))
        else:
            # Short nap between polls so the wait does not spin a CPU core;
            # 0.1 s cannot skip the one-second trigger window.
            time.sleep(0.1)


if __name__ == '__main__':
    # Manual switches selecting what this script does when run directly.
    CONNECT_DB = True             # open the MySQL connection
    RUN_SINGLE_ACCOUNT_TEST = True  # one runBaiduAD call with fixed arguments
    RUN_SERVICE = False           # catch up to the present, then poll forever
    RUN_SINGLE_CYCLE = False      # one runADsolution cycle
    SEND_SAMPLE_ALERT = False     # post a sample traceback to the webhook

    if CONNECT_DB:
        # NOTE(review): credentials are hard-coded in source; consider loading
        # them from the environment or a config file kept out of the repo.
        host = 'localhost'
        passwd = '111???clown'
        db_name = 'hexingxing'
        port = 3306
        db = linkTomySql(host, passwd, db_name, port)
        # linkTomySql returns this string instead of a connection on failure.
        if db == "连接失败":
            raise SystemExit("could not connect to mysql server")

    if RUN_SINGLE_ACCOUNT_TEST:
        userName = 'baidu-hp-cxb餐01yxB23HY08203'
        # NOTE(review): hard-coded access token; same concern as the password.
        accessToken = 'eyJhbGciOiJIUzM4NCJ9.eyJzdWIiOiJhY2MiLCJhdWQiOiLkv6Hmga_lkIzmraUiLCJ1aWQiOjQ3MzA1MjQ3LCJhcHBJZCI6ImY1NzVmZGM2YTdiYTNiMzEzOTE3YjAwZTA1ZjM5YTlkIiwiaXNzIjoi5ZWG5Lia5byA5Y-R6ICF5Lit5b-DIiwicGxhdGZvcm1JZCI6IjQ5NjAzNDU5NjU5NTg1NjE3OTQiLCJleHAiOjE2ODE3OTQyNDIsImp0aSI6Ijc0MDk1NDYwMTU2Mjc2ODk5OTgifQ.qP4ucmmos_BlRZk9jZykS48VXuEmGAS1YQO6Q2_AU92KvuZtpfhqLGppBXfhexAR'
        brandName = '粥小鲜'
        routeName = 'baidu'
        lastCatchTime_ago_s = '2023-04-16 15:00:00'
        lastCatchTime_now_s = '2023-04-16 15:59:00'
        runBaiduAD(db, userName, accessToken, brandName, routeName, lastCatchTime_ago_s, lastCatchTime_now_s)
        db.close()

    if RUN_SERVICE:
        # Catch up to the present first, so a restarted service backfills the
        # windows it missed. Needs an open connection: do not combine with
        # RUN_SINGLE_ACCOUNT_TEST, which closes it.
        lastCatchTime_ago_s, lastCatchTime_now_s = selectbaidu_ad_time_meter(db)
        while parse(lastCatchTime_now_s) <= datetime.now():
            runADsolution(db)
            time.sleep(10)
            lastCatchTime_ago_s, lastCatchTime_now_s = selectbaidu_ad_time_meter(db)
        db.close()
        # Regular operation.
        print('正式运行')
        db = linkTomySql(host, passwd, db_name, port)
        run(db)

    if RUN_SINGLE_CYCLE:
        # runADsolution requires the connection; needs an open one.
        runADsolution(db)

    if SEND_SAMPLE_ALERT:
        url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=cf7139e1-e623-41ca-8541-5dc6e26d43b0'
        # Sample traceback text used only to try out the alert message layout.
        ff = '''Traceback (most recent call last):
  File "/home/python_flies/Num_Of_OrdersPerCapita_InRecentD30D90_E.py", line 293, in
    csv_name_storage,csv_name_application = Num_Of_OrdersPerCapita_InRecentD30D90_E(file_path, brand_name, elm_pair, shops_info_df)
  File "/home/python_flies/Num_Of_OrdersPerCapita_InRecentD30D90_E.py", line 253, in Num_Of_OrdersPerCapita_InRecentD30D90_E
    e = 0/0 #v2优化
ZeroDivisionError: division by zero

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/python_flies/Num_Of_OrdersPerCapita_InRecentD30D90_E.py", line 298, in
    csv_name_storage, csv_name_application = Num_Of_OrdersPerCapita_InRecentD30D90_E(file_path, brand_name, elm_pair, shops_info_df)
  File "/home/python_flies/Num_Of_OrdersPerCapita_InRecentD30D90_E.py", line 253, in Num_Of_OrdersPerCapita_InRecentD30D90_E
    e = 0/0 #v2优化
ZeroDivisionError: division by zero
'''
        msg_txt = f'【百度投流数据同步】服务端出现异常,请管理员尽快处理!\n>Exception->测试\nDetail->{ff}'
        sendMsgToRot(url, msg_txt)