123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380 |
- # -*- codeing = utf-8 -*-
- # @Time : 2023/3/22 13:36
- # @Author : Clown
- # @File : demo_baiduAd.py
- # @Software : PyCharm
- import requests
- import json
- import schedule
- import threading
- import time
- import pymysql
- from datetime import datetime,timedelta
- from dateutil.parser import parse
- import traceback
- from all_key_table import update_key_value_pair_7qiaoPlus
- from demo_baidu_ad_tk_refresh import getAllBaiDuUsersList
- def linkTomySql(host, passwd, db_name, port):
- '''连接至数据库返回【db】,v2新增local_infile=1 打开文件导入权限'''
- try:
- # 本地连接为:localhost 服务器连接为:124.222.188.59
- db = pymysql.connect (
- host=host, user="root",
- passwd=passwd,
- db=db_name,
- port=port,
- charset='utf8mb4',
- local_infile=1)
- print ('\nconnect to mysql server 成功')
- print ('---------------------------------------')
- except:
- print ("\ncould not connect to mysql server")
- db = "连接失败"
- return db
- def read_key_value_pair(db, brand_name, wm_plate, owner):
- '''按条件读取,数据库中all_key_table表里的key_value_pair字段中的值,以键值对的形式输出
- db:数据库,
- brand_name:品牌名,
- wm_plate:外卖平台MEITUAN或ELEME,
- owner:账号权限all或one
- '''
- cursor = db.cursor ()
- sql = f'SELECT key_value_pair FROM all_key_table WHERE brand_name = "{brand_name}" AND wm_plate = "{wm_plate}" AND owner = "{owner}";'
- cursor.execute (sql)
- pair = json.loads (cursor.fetchall ()[0][0])
- return pair
- def orderTo7qiao(db,lastCatchTime_ago_s,solutionCntAll,solutionJsonData):
- solutionjsondata = {'data':solutionJsonData}
- key_json = read_key_value_pair (db, '道一云', '7qiaoPlus', 'all')
- applicationId = '63631ee57005c0103426fdc7'
- processId = '642128bd80236836229b3b1d'
- Token = key_json['data']
- headers_api = {
- 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.85 Safari/537.36',
- "Content-type": "application/json",
- "X-Auth0-Token": Token}
- url = f'https://qiqiao.do1.com.cn/plus/cgi-bin/open/applications/{applicationId}/workflow/process_definitions/{processId}/start'
- params_json = {
- "nextNodesAndHandlers": [{"activityDefinitionId": "obj_02"}],
- "variables": {"任务触发时间": lastCatchTime_ago_s,'solutioncntall':solutionCntAll,'solutionjsondata':json.dumps(solutionjsondata,ensure_ascii=False)},
- "loginUserId":"92874ed35ecb45f51a58adcf18e99b5e"}
- try:
- resp = requests.post(url,headers =headers_api,json=params_json)
- msg = resp.json()['msg']
- print(resp.text)
- if msg == '执行成功':
- operation_results = '无异常'
- else:
- operation_results = f'[orderTo7qiao]-返回结果为{msg}'
- except Exception as e:
- operation_results = f'[orderTo7qiao]-道一云端无法连接{e}'
- return operation_results
- def sendMsgToRot(url,msg_txt):
- params_json = {
- "msgtype": "markdown",
- "markdown": {
- "content": msg_txt,
- "mentioned_list":["@all"]
- }}
- resp = requests.post(url,json=params_json).text
- print(resp)
- def baiduAD(userName,accessToken,solutionType,lastCatchTime_ago_s,lastCatchTime_now_s):
- url = "https://api.baidu.com/json/sms/service/LeadsNoticeService/getNoticeList"
- user_payload = {
- "header": {
- "authorityType": 5,
- "userName": userName,
- "accessToken": accessToken,
- "action": "API-PYTHON"
- },
- "body": {
- "solutionType": solutionType,
- "startDate": lastCatchTime_ago_s,
- "endDate": lastCatchTime_now_s
- }
- }
- http_headers = {
- "Accept-Encoding": "gzip, deflate",
- "Content-Type": "application/json",
- "Accept": "application/json"
- }
- user_payload = json.dumps(user_payload)
- try:
- response = requests.request("POST", url, data=user_payload, headers=http_headers)
- except:
- response = 'Error'
- return response
- # 查询计时器时间,输出起始时间
- def selectbaidu_ad_time_meter(db):
- cursor = db.cursor ()
- sql = f'SELECT * FROM baidu_ad_time_meter WHERE id = 1;'
- cursor.execute (sql)
- lastCatchTime_ago = cursor.fetchall ()[0][0]
- lastCatchTime_ago_s = (lastCatchTime_ago + timedelta(seconds=1)).strftime('%Y-%m-%d %H:%M:%S')
- # 更新间隔
- lastCatchTime_now_s = (lastCatchTime_ago + timedelta(minutes=5)).strftime('%Y-%m-%d %H:%M:%S')
- cursor.close()
- return lastCatchTime_ago_s,lastCatchTime_now_s
- # 更新计时器时间
- def updatebaidu_ad_time_meter(db,lastCatchTime_now_s):
- cursor = db.cursor ()
- sql = f'UPDATE baidu_ad_time_meter SET lastCatchTime = "{lastCatchTime_now_s}" WHERE id = 1;'
- cursor.execute(sql)
- db.commit()
- cursor.close()
- # 新增记录至抓取记录表
- def insertbaidu_ad_solution_catch(db,insert_valuse):
- cursor = db.cursor ()
- sql = f'INSERT INTO baidu_ad_solution_catch VALUES (%s,%s,%s,%s,%s,%s,%s,%s);'
- cursor.execute (sql,insert_valuse)
- db.commit ()
- cursor.close ()
- # 新增记录至抓取明细记录表
- def insertbaidu_ad_solution_info(db,insert_valuse):
- cursor = db.cursor ()
- sql = f'INSERT INTO baidu_ad_solution_info VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);'
- cursor.execute (sql,insert_valuse)
- db.commit ()
- cursor.close ()
- # 解析分配数据至分配队列
- def dumpBaiduJosn(db,userName,brandName,routeName,response_json):
- solutionJsonDatas = []
- datas = response_json['body']['data']
- for data in datas:
- noticeDetailList = data['noticeDetailList']
- for solution in noticeDetailList:
- clueId = solution['clueId']
- commitTime = solution['commitTime']
- solutionType = solution['solutionType']
- try:
- clueName = '未知'
- formDetail = json.loads(solution['formDetail'])
- for i in formDetail:
- if i['type'] == 'name':
- clueName = i['value']
- except:
- clueName = '未知'
- cluePhoneNumber = solution['cluePhoneNumber']
- try:
- flowChannelName = solution['flowChannelName']
- except:
- flowChannelName = '其他'
- try:
- area = solution['area']
- except:
- area = '未知区域'
- try:
- searchWord = solution['searchWord']
- except:
- searchWord = ''
- try:
- keyword = solution['keyword']
- except:
- keyword = ''
- solutionInfo = json.dumps(solution,ensure_ascii=False)
- userName = userName
- if userName == 'baidu-hp-cxb餐04yxB23HY08203':
- keyword = '【定向加油包】'+keyword
- insert_valuse = (clueId,commitTime,solutionType,clueName,cluePhoneNumber,flowChannelName,area,searchWord,keyword,solutionInfo,userName,brandName,routeName)
- if 1==0:
- insertbaidu_ad_solution_info (db, insert_valuse)
- else:
- print('测试中')
- solutionJsonData = {'clueId':clueId,
- 'commitTime':commitTime,
- 'solutionType':solutionType,
- 'clueName':clueName,
- 'cluePhoneNumber':cluePhoneNumber,
- 'flowChannelName':flowChannelName,
- 'area':area,
- 'searchWord':searchWord,
- 'keyword':keyword,
- 'userName':userName,
- 'brandName':brandName,
- 'routeName':routeName}
- solutionJsonDatas.append(solutionJsonData)
- return solutionJsonDatas
- # 执行数据抓取
- def runBaiduAD(db,userName,accessToken,brandName,routeName,lastCatchTime_ago_s,lastCatchTime_now_s):
- operation_results = '无异常'
- solutionTypes = ["phone","form"]
- solutionJsonDataList = []
- for solutionType in solutionTypes:
- response = baiduAD (userName, accessToken, solutionType, lastCatchTime_ago_s, lastCatchTime_now_s)
- if response == 'Error':
- response_data = 'Error'
- operation_results = '[runBaiduAD]-[baiduAD]请求失败'
- else:
- response_data = response.text
- response_json = response.json()
- try:
- desc = response_json['header']['desc']
- solutionCnt = response_json['body']['data'][0]['totalNum']
- solutionJsonDatas = dumpBaiduJosn (db, userName,brandName,routeName, response_json)
- solutionJsonDataList = solutionJsonDataList + solutionJsonDatas
- except:
- operation_results = '[runBaiduAD]-[response_json]无法解析'
- desc = 'fail'
- solutionCnt = 0
- insert_valuse = (lastCatchTime_now_s,response_data,solutionCnt,desc,solutionType,userName,brandName,routeName)
- if 1==0:
- insertbaidu_ad_solution_catch (db, insert_valuse)
- else:
- print('测试中')
- print (response_data)
- return operation_results,solutionJsonDataList
- # 定时更新7巧密钥,每6小时更新一次
- def update7qiaoToken(db):
- now = datetime.now ()
- mm = now.strftime ("%M")
- hh = now.strftime ("%H")
- if int (hh) % 6 == 0 and int (mm) <= 1:
- update_key_value_pair_7qiaoPlus (db, '道一云', '7qiaoPlus', 'all', '')
- # 多账号计时抓取投流数据
- def runADsolution(db):
- # 基础参数
- operation_results = '无异常'
- e = 'no'
- # host = 'localhost'
- # passwd = '111???clown'
- # db_name = 'hexingxing'
- # port = 3306
- baidu_kisses = getAllBaiDuUsersList(db)
- try:
- # db = linkTomySql (host, passwd, db_name, port)
- lastCatchTime_ago_s, lastCatchTime_now_s = selectbaidu_ad_time_meter (db)
- for baidu_kis in baidu_kisses:
- userName = baidu_kis['userName']
- accessToken = baidu_kis['accessToken']
- brandName = baidu_kis['brandName']
- routeName = baidu_kis['routeName']
- operation_results,solutionJsonDataList = runBaiduAD (db,userName, accessToken,brandName,routeName, lastCatchTime_ago_s, lastCatchTime_now_s)
- solutionCntAll = len(solutionJsonDataList)
- update7qiaoToken (db)
- if solutionCntAll > 0:
- if len(str(solutionJsonDataList)) >= 5000:
- for solutionJsonDataList_0 in solutionJsonDataList:
- operation_results = orderTo7qiao (db, lastCatchTime_now_s, 1, solutionJsonDataList_0)
- time.sleep(0.5)
- else:
- operation_results = orderTo7qiao (db, lastCatchTime_now_s, solutionCntAll, solutionJsonDataList)
- # 发送报错信息
- if operation_results != '无异常':
- url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=cf7139e1-e623-41ca-8541-5dc6e26d43b0'
- msg_txt = f'<font color=\"warning\">【百度投流数据同步】服务端出现异常,请管理员尽快处理!</font>\n>Exception->{e}\nDetail->{operation_results}'
- sendMsgToRot (url, msg_txt)
- else:
- ...
- updatebaidu_ad_time_meter (db, lastCatchTime_now_s)
- # db.close ()
- except Exception as e:
- operation_results = traceback.format_exc ()
- e = 'no'
- # 发送报错信息
- if operation_results != '无异常':
- url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=cf7139e1-e623-41ca-8541-5dc6e26d43b0'
- msg_txt = f'<font color=\"warning\">【百度投流数据同步】服务端出现异常,请管理员尽快处理!</font>\n>Exception->{e}\nDetail->{operation_results}'
- sendMsgToRot (url, msg_txt)
- else:
- ...
- def run(db):
- # 初始化时间周期
- # 更新间隔
- minute = 5
- f_now = datetime.now ()
- f_mm = f_now.strftime ("%M")
- f_ss = f_now.strftime ("%S")
- f_c = int (f_mm) % minute
- wait_s = (minute - f_c - 1) * 60 + 59 - int (f_ss)
- time.sleep (wait_s)
- # 执行循环
- while True:
- now = datetime.now ()
- mm = now.strftime ("%M")
- ss = now.strftime ("%S")
- c = int (mm) % minute
- if c == 0 and ss == '01':
- s = time.time ()
- # print(now)
- runADsolution(db)
- time.sleep (minute * 60 - 0.5 - (time.time () - s))
- if __name__ == '__main__':
- # runBaiduAD ()
- if 1==1:
- host = 'localhost'
- passwd = '111???clown'
- db_name = 'hexingxing'
- port = 3306
- db = linkTomySql (host, passwd, db_name, port)
- if 1 == 1:
- userName = 'baidu-hp-cxb餐01yxB23HY08203'
- accessToken = 'eyJhbGciOiJIUzM4NCJ9.eyJzdWIiOiJhY2MiLCJhdWQiOiLkv6Hmga_lkIzmraUiLCJ1aWQiOjQ3MzA1MjQ3LCJhcHBJZCI6ImY1NzVmZGM2YTdiYTNiMzEzOTE3YjAwZTA1ZjM5YTlkIiwiaXNzIjoi5ZWG5Lia5byA5Y-R6ICF5Lit5b-DIiwicGxhdGZvcm1JZCI6IjQ5NjAzNDU5NjU5NTg1NjE3OTQiLCJleHAiOjE2ODE3OTQyNDIsImp0aSI6Ijc0MDk1NDYwMTU2Mjc2ODk5OTgifQ.qP4ucmmos_BlRZk9jZykS48VXuEmGAS1YQO6Q2_AU92KvuZtpfhqLGppBXfhexAR'
- brandName = '粥小鲜'
- routeName = 'baidu'
- lastCatchTime_ago_s = '2023-04-16 15:00:00'
- lastCatchTime_now_s = '2023-04-16 15:59:00'
- runBaiduAD (db, userName, accessToken, brandName, routeName, lastCatchTime_ago_s, lastCatchTime_now_s)
- db.close()
- if 1 == 0:
- # 更新至当前,用于重启服务时,自动更新数据
- lastCatchTime_ago_s,lastCatchTime_now_s = selectbaidu_ad_time_meter (db)
- while parse(lastCatchTime_now_s) <= datetime.now():
- runADsolution (db)
- time.sleep(10)
- lastCatchTime_ago_s, lastCatchTime_now_s = selectbaidu_ad_time_meter (db)
- db.close ()
- # 正式运行
- print('正式运行')
- db = linkTomySql (host, passwd, db_name, port)
- run(db)
- # lastCatchTime_ago_s = '2023-03-26 23:29:03'
- # solutionCntAll = 0
- # solutionJsonData = {}
- # orderTo7qiao(db,lastCatchTime_ago_s,solutionCntAll,solutionJsonData)
- if 1==0:
- runADsolution ()
- if 1==0:
- url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=cf7139e1-e623-41ca-8541-5dc6e26d43b0'
- ff = '''Traceback (most recent call last):
- File "/home/python_flies/Num_Of_OrdersPerCapita_InRecentD30D90_E.py", line 293, in <module>
- csv_name_storage,csv_name_application = Num_Of_OrdersPerCapita_InRecentD30D90_E(file_path, brand_name, elm_pair, shops_info_df)
- File "/home/python_flies/Num_Of_OrdersPerCapita_InRecentD30D90_E.py", line 253, in Num_Of_OrdersPerCapita_InRecentD30D90_E
- e = 0/0 #v2优化
- ZeroDivisionError: division by zero
- During handling of the above exception, another exception occurred:
- Traceback (most recent call last):
- File "/home/python_flies/Num_Of_OrdersPerCapita_InRecentD30D90_E.py", line 298, in <module>
- csv_name_storage, csv_name_application = Num_Of_OrdersPerCapita_InRecentD30D90_E(file_path, brand_name, elm_pair, shops_info_df)
- File "/home/python_flies/Num_Of_OrdersPerCapita_InRecentD30D90_E.py", line 253, in Num_Of_OrdersPerCapita_InRecentD30D90_E
- e = 0/0 #v2优化
- ZeroDivisionError: division by zero
- '''
- msg_txt = f'<font color=\"warning\">【百度投流数据同步】服务端出现异常,请管理员尽快处理!</font>\n>Exception->测试\nDetail->{ff}'
- sendMsgToRot (url, msg_txt)
|