# -*- codeing = utf-8 -*-
# @Time : 2023/3/22 13:36
# @Author : Clown
# @File : demo_baiduAd.py
# @Software : PyCharm
import requests
import json
import schedule
import threading
import time
import pymysql
from datetime import datetime,timedelta
from dateutil.parser import parse
import traceback
from all_key_table import update_key_value_pair_7qiaoPlus
from demo_baidu_ad_tk_refresh import getAllBaiDuUsersList
def linkTomySql(host, passwd, db_name, port):
'''连接至数据库返回【db】,v2新增local_infile=1 打开文件导入权限'''
try:
# 本地连接为:localhost 服务器连接为:124.222.188.59
db = pymysql.connect (
host=host, user="root",
passwd=passwd,
db=db_name,
port=port,
charset='utf8mb4',
local_infile=1)
print ('\nconnect to mysql server 成功')
print ('---------------------------------------')
except:
print ("\ncould not connect to mysql server")
db = "连接失败"
return db
def read_key_value_pair(db, brand_name, wm_plate, owner):
'''按条件读取,数据库中all_key_table表里的key_value_pair字段中的值,以键值对的形式输出
db:数据库,
brand_name:品牌名,
wm_plate:外卖平台MEITUAN或ELEME,
owner:账号权限all或one
'''
cursor = db.cursor ()
sql = f'SELECT key_value_pair FROM all_key_table WHERE brand_name = "{brand_name}" AND wm_plate = "{wm_plate}" AND owner = "{owner}";'
cursor.execute (sql)
pair = json.loads (cursor.fetchall ()[0][0])
return pair
def orderTo7qiao(db,lastCatchTime_ago_s,solutionCntAll,solutionJsonData):
solutionjsondata = {'data':solutionJsonData}
key_json = read_key_value_pair (db, '道一云', '7qiaoPlus', 'all')
applicationId = '63631ee57005c0103426fdc7'
processId = '642128bd80236836229b3b1d'
Token = key_json['data']
headers_api = {
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.85 Safari/537.36',
"Content-type": "application/json",
"X-Auth0-Token": Token}
url = f'https://qiqiao.do1.com.cn/plus/cgi-bin/open/applications/{applicationId}/workflow/process_definitions/{processId}/start'
params_json = {
"nextNodesAndHandlers": [{"activityDefinitionId": "obj_02"}],
"variables": {"任务触发时间": lastCatchTime_ago_s,'solutioncntall':solutionCntAll,'solutionjsondata':json.dumps(solutionjsondata,ensure_ascii=False)},
"loginUserId":"92874ed35ecb45f51a58adcf18e99b5e"}
try:
resp = requests.post(url,headers =headers_api,json=params_json)
msg = resp.json()['msg']
print(resp.text)
if msg == '执行成功':
operation_results = '无异常'
else:
operation_results = f'[orderTo7qiao]-返回结果为{msg}'
except Exception as e:
operation_results = f'[orderTo7qiao]-道一云端无法连接{e}'
return operation_results
def sendMsgToRot(url,msg_txt):
params_json = {
"msgtype": "markdown",
"markdown": {
"content": msg_txt,
"mentioned_list":["@all"]
}}
resp = requests.post(url,json=params_json).text
print(resp)
def baiduAD(userName,accessToken,solutionType,lastCatchTime_ago_s,lastCatchTime_now_s):
url = "https://api.baidu.com/json/sms/service/LeadsNoticeService/getNoticeList"
user_payload = {
"header": {
"authorityType": 5,
"userName": userName,
"accessToken": accessToken,
"action": "API-PYTHON"
},
"body": {
"solutionType": solutionType,
"startDate": lastCatchTime_ago_s,
"endDate": lastCatchTime_now_s
}
}
http_headers = {
"Accept-Encoding": "gzip, deflate",
"Content-Type": "application/json",
"Accept": "application/json"
}
user_payload = json.dumps(user_payload)
try:
response = requests.request("POST", url, data=user_payload, headers=http_headers)
except:
response = 'Error'
return response
# 查询计时器时间,输出起始时间
def selectbaidu_ad_time_meter(db):
cursor = db.cursor ()
sql = f'SELECT * FROM baidu_ad_time_meter WHERE id = 1;'
cursor.execute (sql)
lastCatchTime_ago = cursor.fetchall ()[0][0]
lastCatchTime_ago_s = (lastCatchTime_ago + timedelta(seconds=1)).strftime('%Y-%m-%d %H:%M:%S')
# 更新间隔
lastCatchTime_now_s = (lastCatchTime_ago + timedelta(minutes=5)).strftime('%Y-%m-%d %H:%M:%S')
cursor.close()
return lastCatchTime_ago_s,lastCatchTime_now_s
# 更新计时器时间
def updatebaidu_ad_time_meter(db,lastCatchTime_now_s):
cursor = db.cursor ()
sql = f'UPDATE baidu_ad_time_meter SET lastCatchTime = "{lastCatchTime_now_s}" WHERE id = 1;'
cursor.execute(sql)
db.commit()
cursor.close()
# 新增记录至抓取记录表
def insertbaidu_ad_solution_catch(db,insert_valuse):
cursor = db.cursor ()
sql = f'INSERT INTO baidu_ad_solution_catch VALUES (%s,%s,%s,%s,%s,%s,%s,%s);'
cursor.execute (sql,insert_valuse)
db.commit ()
cursor.close ()
# 新增记录至抓取明细记录表
def insertbaidu_ad_solution_info(db,insert_valuse):
cursor = db.cursor ()
sql = f'INSERT INTO baidu_ad_solution_info VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);'
cursor.execute (sql,insert_valuse)
db.commit ()
cursor.close ()
# 解析分配数据至分配队列
def dumpBaiduJosn(db,userName,brandName,routeName,response_json):
solutionJsonDatas = []
datas = response_json['body']['data']
for data in datas:
noticeDetailList = data['noticeDetailList']
for solution in noticeDetailList:
clueId = solution['clueId']
commitTime = solution['commitTime']
solutionType = solution['solutionType']
try:
clueName = '未知'
formDetail = json.loads(solution['formDetail'])
for i in formDetail:
if i['type'] == 'name':
clueName = i['value']
except:
clueName = '未知'
cluePhoneNumber = solution['cluePhoneNumber']
try:
flowChannelName = solution['flowChannelName']
except:
flowChannelName = '其他'
try:
area = solution['area']
except:
area = '未知区域'
try:
searchWord = solution['searchWord']
except:
searchWord = ''
try:
keyword = solution['keyword']
except:
keyword = ''
solutionInfo = json.dumps(solution,ensure_ascii=False)
userName = userName
if userName == 'baidu-hp-cxb餐04yxB23HY08203':
keyword = '【定向加油包】'+keyword
insert_valuse = (clueId,commitTime,solutionType,clueName,cluePhoneNumber,flowChannelName,area,searchWord,keyword,solutionInfo,userName,brandName,routeName)
if 1==0:
insertbaidu_ad_solution_info (db, insert_valuse)
else:
print('测试中')
solutionJsonData = {'clueId':clueId,
'commitTime':commitTime,
'solutionType':solutionType,
'clueName':clueName,
'cluePhoneNumber':cluePhoneNumber,
'flowChannelName':flowChannelName,
'area':area,
'searchWord':searchWord,
'keyword':keyword,
'userName':userName,
'brandName':brandName,
'routeName':routeName}
solutionJsonDatas.append(solutionJsonData)
return solutionJsonDatas
# 执行数据抓取
def runBaiduAD(db,userName,accessToken,brandName,routeName,lastCatchTime_ago_s,lastCatchTime_now_s):
operation_results = '无异常'
solutionTypes = ["phone","form"]
solutionJsonDataList = []
for solutionType in solutionTypes:
response = baiduAD (userName, accessToken, solutionType, lastCatchTime_ago_s, lastCatchTime_now_s)
if response == 'Error':
response_data = 'Error'
operation_results = '[runBaiduAD]-[baiduAD]请求失败'
else:
response_data = response.text
response_json = response.json()
try:
desc = response_json['header']['desc']
solutionCnt = response_json['body']['data'][0]['totalNum']
solutionJsonDatas = dumpBaiduJosn (db, userName,brandName,routeName, response_json)
solutionJsonDataList = solutionJsonDataList + solutionJsonDatas
except:
operation_results = '[runBaiduAD]-[response_json]无法解析'
desc = 'fail'
solutionCnt = 0
insert_valuse = (lastCatchTime_now_s,response_data,solutionCnt,desc,solutionType,userName,brandName,routeName)
if 1==0:
insertbaidu_ad_solution_catch (db, insert_valuse)
else:
print('测试中')
print (response_data)
return operation_results,solutionJsonDataList
# 定时更新7巧密钥,每6小时更新一次
def update7qiaoToken(db):
now = datetime.now ()
mm = now.strftime ("%M")
hh = now.strftime ("%H")
if int (hh) % 6 == 0 and int (mm) <= 1:
update_key_value_pair_7qiaoPlus (db, '道一云', '7qiaoPlus', 'all', '')
# 多账号计时抓取投流数据
def runADsolution(db):
# 基础参数
operation_results = '无异常'
e = 'no'
# host = 'localhost'
# passwd = '111???clown'
# db_name = 'hexingxing'
# port = 3306
baidu_kisses = getAllBaiDuUsersList(db)
try:
# db = linkTomySql (host, passwd, db_name, port)
lastCatchTime_ago_s, lastCatchTime_now_s = selectbaidu_ad_time_meter (db)
for baidu_kis in baidu_kisses:
userName = baidu_kis['userName']
accessToken = baidu_kis['accessToken']
brandName = baidu_kis['brandName']
routeName = baidu_kis['routeName']
operation_results,solutionJsonDataList = runBaiduAD (db,userName, accessToken,brandName,routeName, lastCatchTime_ago_s, lastCatchTime_now_s)
solutionCntAll = len(solutionJsonDataList)
update7qiaoToken (db)
if solutionCntAll > 0:
if len(str(solutionJsonDataList)) >= 5000:
for solutionJsonDataList_0 in solutionJsonDataList:
operation_results = orderTo7qiao (db, lastCatchTime_now_s, 1, solutionJsonDataList_0)
time.sleep(0.5)
else:
operation_results = orderTo7qiao (db, lastCatchTime_now_s, solutionCntAll, solutionJsonDataList)
# 发送报错信息
if operation_results != '无异常':
url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=cf7139e1-e623-41ca-8541-5dc6e26d43b0'
msg_txt = f'【百度投流数据同步】服务端出现异常,请管理员尽快处理!\n>Exception->{e}\nDetail->{operation_results}'
sendMsgToRot (url, msg_txt)
else:
...
updatebaidu_ad_time_meter (db, lastCatchTime_now_s)
# db.close ()
except Exception as e:
operation_results = traceback.format_exc ()
e = 'no'
# 发送报错信息
if operation_results != '无异常':
url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=cf7139e1-e623-41ca-8541-5dc6e26d43b0'
msg_txt = f'【百度投流数据同步】服务端出现异常,请管理员尽快处理!\n>Exception->{e}\nDetail->{operation_results}'
sendMsgToRot (url, msg_txt)
else:
...
def run(db):
# 初始化时间周期
# 更新间隔
minute = 5
f_now = datetime.now ()
f_mm = f_now.strftime ("%M")
f_ss = f_now.strftime ("%S")
f_c = int (f_mm) % minute
wait_s = (minute - f_c - 1) * 60 + 59 - int (f_ss)
time.sleep (wait_s)
# 执行循环
while True:
now = datetime.now ()
mm = now.strftime ("%M")
ss = now.strftime ("%S")
c = int (mm) % minute
if c == 0 and ss == '01':
s = time.time ()
# print(now)
runADsolution(db)
time.sleep (minute * 60 - 0.5 - (time.time () - s))
if __name__ == '__main__':
# runBaiduAD ()
if 1==1:
host = 'localhost'
passwd = '111???clown'
db_name = 'hexingxing'
port = 3306
db = linkTomySql (host, passwd, db_name, port)
if 1 == 1:
userName = 'baidu-hp-cxb餐01yxB23HY08203'
accessToken = 'eyJhbGciOiJIUzM4NCJ9.eyJzdWIiOiJhY2MiLCJhdWQiOiLkv6Hmga_lkIzmraUiLCJ1aWQiOjQ3MzA1MjQ3LCJhcHBJZCI6ImY1NzVmZGM2YTdiYTNiMzEzOTE3YjAwZTA1ZjM5YTlkIiwiaXNzIjoi5ZWG5Lia5byA5Y-R6ICF5Lit5b-DIiwicGxhdGZvcm1JZCI6IjQ5NjAzNDU5NjU5NTg1NjE3OTQiLCJleHAiOjE2ODE3OTQyNDIsImp0aSI6Ijc0MDk1NDYwMTU2Mjc2ODk5OTgifQ.qP4ucmmos_BlRZk9jZykS48VXuEmGAS1YQO6Q2_AU92KvuZtpfhqLGppBXfhexAR'
brandName = '粥小鲜'
routeName = 'baidu'
lastCatchTime_ago_s = '2023-04-16 15:00:00'
lastCatchTime_now_s = '2023-04-16 15:59:00'
runBaiduAD (db, userName, accessToken, brandName, routeName, lastCatchTime_ago_s, lastCatchTime_now_s)
db.close()
if 1 == 0:
# 更新至当前,用于重启服务时,自动更新数据
lastCatchTime_ago_s,lastCatchTime_now_s = selectbaidu_ad_time_meter (db)
while parse(lastCatchTime_now_s) <= datetime.now():
runADsolution (db)
time.sleep(10)
lastCatchTime_ago_s, lastCatchTime_now_s = selectbaidu_ad_time_meter (db)
db.close ()
# 正式运行
print('正式运行')
db = linkTomySql (host, passwd, db_name, port)
run(db)
# lastCatchTime_ago_s = '2023-03-26 23:29:03'
# solutionCntAll = 0
# solutionJsonData = {}
# orderTo7qiao(db,lastCatchTime_ago_s,solutionCntAll,solutionJsonData)
if 1==0:
runADsolution ()
if 1==0:
url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=cf7139e1-e623-41ca-8541-5dc6e26d43b0'
ff = '''Traceback (most recent call last):
File "/home/python_flies/Num_Of_OrdersPerCapita_InRecentD30D90_E.py", line 293, in
csv_name_storage,csv_name_application = Num_Of_OrdersPerCapita_InRecentD30D90_E(file_path, brand_name, elm_pair, shops_info_df)
File "/home/python_flies/Num_Of_OrdersPerCapita_InRecentD30D90_E.py", line 253, in Num_Of_OrdersPerCapita_InRecentD30D90_E
e = 0/0 #v2优化
ZeroDivisionError: division by zero
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/python_flies/Num_Of_OrdersPerCapita_InRecentD30D90_E.py", line 298, in
csv_name_storage, csv_name_application = Num_Of_OrdersPerCapita_InRecentD30D90_E(file_path, brand_name, elm_pair, shops_info_df)
File "/home/python_flies/Num_Of_OrdersPerCapita_InRecentD30D90_E.py", line 253, in Num_Of_OrdersPerCapita_InRecentD30D90_E
e = 0/0 #v2优化
ZeroDivisionError: division by zero
'''
msg_txt = f'【百度投流数据同步】服务端出现异常,请管理员尽快处理!\n>Exception->测试\nDetail->{ff}'
sendMsgToRot (url, msg_txt)