# -*- coding: utf-8 -*-
# @Time : 2023/3/22 13:36
# @Author : Clown
# @File : demo_baiduAd.py
# @Software : PyCharm
import requests
import json
import schedule
import threading
import time
import pymysql
from datetime import datetime, timedelta
from dateutil.parser import parse
import traceback
from all_key_table import update_key_value_pair_7qiaoPlus
from demo_feiyu_ad_tk_refresh import getAllFeiYuUsersList
def linkTomySql(host, passwd, db_name, port):
    """Open a MySQL connection as user ``root`` and return it.

    ``local_infile=1`` is passed to the driver (added in v2) so that file
    import is permitted on the connection.

    Args:
        host: Server address (the original note lists ``localhost`` for local
            use and ``124.222.188.59`` for the server).
        passwd: Password of the ``root`` account.
        db_name: Database (schema) to select.
        port: TCP port of the MySQL server.

    Returns:
        The pymysql connection on success, or the sentinel string
        ``"连接失败"`` when the connection could not be established.
    """
    try:
        db = pymysql.connect(
            host=host,
            user="root",
            passwd=passwd,
            db=db_name,
            port=port,
            charset='utf8mb4',
            local_infile=1)
    except Exception as exc:
        # `except Exception` (rather than a bare except) lets Ctrl-C and
        # SystemExit propagate, and the reason is printed instead of dropped.
        print("\ncould not connect to mysql server")
        print(f'reason: {exc!r}')
        return "连接失败"
    print('\nconnect to mysql server 成功')
    print('---------------------------------------')
    return db
def read_key_value_pair(db, brand_name, wm_plate, owner):
    """Read one ``key_value_pair`` entry from ``all_key_table`` and decode it.

    Args:
        db: Open database connection exposing ``cursor()``.
        brand_name: Brand name to match.
        wm_plate: Platform to match (the original docstring lists the
            delivery platforms MEITUAN or ELEME).
        owner: Account scope to match (``all`` or ``one`` per the original
            docstring).

    Returns:
        The JSON-decoded value of the first matching row's
        ``key_value_pair`` column.

    Raises:
        IndexError: If no row matches the three filters.
    """
    # Placeholders keep quotes inside the arguments from changing the query.
    sql = ('SELECT key_value_pair FROM all_key_table '
           'WHERE brand_name = %s AND wm_plate = %s AND owner = %s;')
    cursor = db.cursor()
    try:
        cursor.execute(sql, (brand_name, wm_plate, owner))
        row = cursor.fetchone()
    finally:
        cursor.close()
    if row is None:
        raise IndexError(
            f'no all_key_table row for brand_name={brand_name!r}, '
            f'wm_plate={wm_plate!r}, owner={owner!r}')
    return json.loads(row[0])
def orderTo7qiao(db, lastCatchTime_ago_s, solutionCntAll, solutionJsonData):
    """Start the 7qiao workflow process that receives the captured leads.

    The auth token is read from ``all_key_table`` through
    ``read_key_value_pair`` and sent in the ``X-Auth0-Token`` header.

    Args:
        db: Open database connection, used only to read the token.
        lastCatchTime_ago_s: Value sent as the ``任务触发时间`` workflow
            variable.
        solutionCntAll: Value sent as the ``solutioncntall`` variable.
        solutionJsonData: Lead payload; wrapped as ``{'data': ...}`` and sent
            JSON-encoded as the ``solutionjsondata`` variable.

    Returns:
        ``'无异常'`` when the API answers with ``msg == '执行成功'``, otherwise
        a string describing the returned message or the raised exception.
    """
    solutionjsondata = {'data': solutionJsonData}
    key_json = read_key_value_pair(db, '道一云', '7qiaoPlus', 'all')
    applicationId = '63631ee57005c0103426fdc7'
    processId = '642128bd80236836229b3b1d'
    Token = key_json['data']
    headers_api = {
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.85 Safari/537.36',
        "Content-type": "application/json",
        "X-Auth0-Token": Token}
    url = f'https://qiqiao.do1.com.cn/plus/cgi-bin/open/applications/{applicationId}/workflow/process_definitions/{processId}/start'
    params_json = {
        "nextNodesAndHandlers": [{"activityDefinitionId": "obj_02"}],
        "variables": {"任务触发时间": lastCatchTime_ago_s, 'solutioncntall': solutionCntAll,
                      'solutionjsondata': json.dumps(solutionjsondata, ensure_ascii=False)},
        "loginUserId": "92874ed35ecb45f51a58adcf18e99b5e"}
    try:
        # Without a timeout a stalled connection would block the caller
        # forever; a timeout surfaces as an exception and is reported below.
        resp = requests.post(url, headers=headers_api, json=params_json, timeout=30)
        msg = resp.json()['msg']
        print(resp.text)
    except Exception as e:
        return f'[orderTo7qiao]-道一云端无法连接{e}'
    if msg == '执行成功':
        return '无异常'
    return f'[orderTo7qiao]-返回结果为{msg}'
def sendMsgToRot(url, msg_txt):
    """Post a markdown message to a webhook and print the response body.

    Sending is best-effort: a network failure is printed rather than raised,
    because this function is called from error handlers and must not replace
    the error being reported with a new one.

    Args:
        url: Webhook URL to post to.
        msg_txt: Markdown text placed in ``markdown.content``.
    """
    params_json = {
        "msgtype": "markdown",
        "markdown": {
            "content": msg_txt,
            "mentioned_list": ["@all"]
        }}
    try:
        resp = requests.post(url, json=params_json, timeout=10).text
    except requests.RequestException as exc:
        print(f'[sendMsgToRot]-webhook request failed: {exc!r}')
        return
    print(resp)
# Query lead ("clue") data for the given advertisers
def feiyuAD(advertiser_ids, accessToken, lastCatchTime_ago_s, lastCatchTime_now_s):
    """Request leads created in a time window from the Ocean Engine open API.

    Sends a GET to ``2/tools/clue/get/`` with the filter as a JSON body.

    NOTE(review): only ``page=1`` with ``page_size=100`` is requested, so any
    leads beyond the first 100 in the window are not fetched here.

    Args:
        advertiser_ids: Advertiser ids to query.
        accessToken: Value for the ``Access-Token`` header.
        lastCatchTime_ago_s: Window start, sent as ``start_time``.
        lastCatchTime_now_s: Window end, sent as ``end_time``.

    Returns:
        The ``requests.Response`` on success, or the string ``'Error'`` when
        the request could not be completed.
    """
    open_api_url_prefix = "https://ad.oceanengine.com/open_api/"
    uri = "2/tools/clue/get/"
    url = open_api_url_prefix + uri
    user_payload = {'advertiser_ids': advertiser_ids,
                    'start_time': lastCatchTime_ago_s,
                    'end_time': lastCatchTime_now_s,
                    'page_size': 100,
                    'page': 1}
    http_headers = {
        "Access-Token": accessToken
    }
    try:
        # The timeout keeps a stalled connection from blocking the scheduler.
        return requests.get(url, json=user_payload, headers=http_headers, timeout=30)
    except Exception:
        # Same 'Error' sentinel as before, but KeyboardInterrupt/SystemExit
        # are no longer swallowed.
        return 'Error'
# Read the timer row and derive the next capture window
def selectfeiyu_ad_time_meter(db):
    """Return the next capture window based on the stored timer value.

    Reads the row with ``id = 1`` from ``feiyu_ad_time_meter`` and uses the
    first column of that row as the last capture time.
    NOTE(review): relies on the last capture time being the first column of
    ``SELECT *`` -- confirm against the table definition.

    Args:
        db: Open database connection exposing ``cursor()``.

    Returns:
        ``(start, end)`` as ``'%Y-%m-%d %H:%M:%S'`` strings, where ``start``
        is the stored time plus one second and ``end`` is the stored time
        plus five minutes (the update interval).

    Raises:
        IndexError: If the timer row does not exist.
    """
    time_format = '%Y-%m-%d %H:%M:%S'
    cursor = db.cursor()
    try:
        cursor.execute('SELECT * FROM feiyu_ad_time_meter WHERE id = 1;')
        rows = cursor.fetchall()
    finally:
        # Close the cursor even when the query fails.
        cursor.close()
    lastCatchTime_ago = rows[0][0]
    lastCatchTime_ago_s = (lastCatchTime_ago + timedelta(seconds=1)).strftime(time_format)
    # Update interval: the window ends five minutes after the stored time.
    lastCatchTime_now_s = (lastCatchTime_ago + timedelta(minutes=5)).strftime(time_format)
    return lastCatchTime_ago_s, lastCatchTime_now_s
# Advance the timer
def updatefeiyu_ad_time_meter(db, lastCatchTime_now_s):
    """Store ``lastCatchTime_now_s`` as ``lastCatchTime`` of timer row 1.

    Args:
        db: Open database connection exposing ``cursor()`` and ``commit()``.
        lastCatchTime_now_s: New timer value.
    """
    cursor = db.cursor()
    try:
        # Bound parameter instead of interpolating the value into the SQL.
        cursor.execute(
            'UPDATE feiyu_ad_time_meter SET lastCatchTime = %s WHERE id = 1;',
            (lastCatchTime_now_s,))
        db.commit()
    finally:
        cursor.close()
# Append one row to the capture-log table
def insertfeiyu_ad_solution_catch(db, insert_valuse):
    """Insert one 8-column row into ``feiyu_ad_solution_catch`` and commit.

    Args:
        db: Open database connection exposing ``cursor()`` and ``commit()``.
        insert_valuse: Sequence of eight values, in table column order.
    """
    sql = 'INSERT INTO feiyu_ad_solution_catch VALUES (%s,%s,%s,%s,%s,%s,%s,%s);'
    cursor = db.cursor()
    try:
        cursor.execute(sql, insert_valuse)
        db.commit()
    finally:
        # Previously the cursor stayed open whenever execute/commit raised.
        cursor.close()
# Upsert one row into the lead-detail table
def insertfeiyu_ad_solution_info(db, insert_valuse):
    """``REPLACE`` one 13-column row into ``feiyu_ad_solution_info`` and commit.

    Args:
        db: Open database connection exposing ``cursor()`` and ``commit()``.
        insert_valuse: Sequence of thirteen values, in table column order.
    """
    sql = 'REPLACE INTO feiyu_ad_solution_info VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);'
    cursor = db.cursor()
    try:
        cursor.execute(sql, insert_valuse)
        db.commit()
    finally:
        # Previously the cursor stayed open whenever execute/commit raised.
        cursor.close()
# Check whether a lead is already stored
def checkfeiyu_ad_solution_info_in_table(db, clueId):
    """Tell whether ``clueId`` is absent from ``feiyu_ad_solution_info``.

    Args:
        db: Open database connection exposing ``cursor()``.
        clueId: Lead id to look up.

    Returns:
        ``True`` when no row has this ``clueId`` (the lead still needs to be
        inserted), ``False`` when at least one row exists.
    """
    cursor = db.cursor()
    try:
        # The id originates from an external API response, so it is bound as
        # a parameter. str() keeps the string comparison the original query
        # performed by quoting the value.
        cursor.execute(
            'SELECT 1 FROM feiyu_ad_solution_info WHERE clueId = %s LIMIT 1;',
            (str(clueId),))
        return cursor.fetchone() is None
    finally:
        cursor.close()
# Parse the API response into rows and queue entries
def dumpFeiYuJosn(db, userName, brandName, routeName, response_json, mode):
    """Store every lead of an API response and build the dispatch list.

    For each item of ``response_json['data']['list']`` a 13-value row is
    written to ``feiyu_ad_solution_info`` and a summary dict is appended to
    the result. In ``'check'`` mode a lead is only written and returned when
    it is not yet in the table; in any other mode every lead is written.

    Args:
        db: Open database connection passed to the table helpers.
        userName: Account name. NOTE(review): this argument is never used --
            the stored name is the item's ``advertiser_name`` or ``'未知'``;
            confirm whether falling back to this argument was intended.
        brandName: Brand stored with each lead.
        routeName: Traffic channel stored with each lead.
        response_json: Decoded API response.
        mode: ``'check'`` to skip leads already stored; anything else stores
            unconditionally.

    Returns:
        List of summary dicts, one per lead that was written.

    Raises:
        KeyError: If the response lacks ``data``/``list`` or an item lacks
            ``clue_id``, ``create_time_detail`` or ``telephone``.
    """
    # Code-to-label tables, built once instead of once per lead.
    clue_type_labels = {'0': '表单提交',
                        '1': '在线咨询',
                        '2': '智能电话',
                        '3': '网页回呼',
                        '4': '卡券',
                        '5': '抽奖'}
    clue_source_labels = {'0': '外部流量',
                          '1': '正常投放',
                          '2': '外部导入',
                          '3': '异常提交',
                          '4': '广告预览',
                          '5': '抖音私信',
                          '6': '鲁班线索'}
    solutionJsonDatas = []
    for data in response_json['data']['list']:
        clueId = data['clue_id']
        commitTime = data['create_time_detail']
        # Optional fields use .get() with the same fallbacks the former
        # bare try/except blocks produced.
        type_label = clue_type_labels.get(str(data.get('clue_type')), '其他')
        module_name = data.get('module_name', '其他')
        solutionType = f'{type_label}-{module_name}'
        clueName = data.get('name', '未知')
        cluePhoneNumber = data['telephone']
        source_label = clue_source_labels.get(str(data.get('clue_source')), '其他')
        app_name = data.get('app_name', '未知app')
        flowChannelName = f'{source_label}-{app_name}'
        area = data.get('location', '未知区域')
        # Stored as empty strings; the lookups for them are disabled.
        searchWord = ''
        keyword = ''
        solutionInfo = json.dumps(data, ensure_ascii=False)
        userName = data.get('advertiser_name', '未知')
        insert_valuse = (
            clueId, commitTime, solutionType, clueName, cluePhoneNumber, flowChannelName, area, searchWord, keyword,
            solutionInfo, userName, brandName, routeName)
        if mode == 'check' and not checkfeiyu_ad_solution_info_in_table(db, clueId):
            # Already stored: nothing to write and nothing to dispatch.
            continue
        insertfeiyu_ad_solution_info(db, insert_valuse)
        solutionJsonDatas.append({'clueId': clueId,
                                  'commitTime': commitTime,
                                  'solutionType': solutionType,
                                  'clueName': clueName,
                                  'cluePhoneNumber': cluePhoneNumber,
                                  'flowChannelName': flowChannelName,
                                  'area': area,
                                  'searchWord': searchWord,
                                  'keyword': keyword,
                                  'userName': userName,
                                  'brandName': brandName,
                                  'routeName': routeName})
    return solutionJsonDatas
# Run one capture for one account
def runFeiYuAD(db, userName, accessToken, brandName, routeName, lastCatchTime_ago_s, lastCatchTime_now_s,
               advertiser_ids, mode):
    """Fetch leads for one account, store them and log the capture.

    Calls ``feiyuAD`` for the window, hands a parsable response to
    ``dumpFeiYuJosn``, and always writes one row to
    ``feiyu_ad_solution_catch`` describing the attempt.

    Args:
        db: Open database connection.
        userName: Account name, stored in the capture log.
        accessToken: API access token of the account.
        brandName: Brand stored with the leads and the log row.
        routeName: Traffic channel stored with the leads and the log row.
        lastCatchTime_ago_s: Window start.
        lastCatchTime_now_s: Window end; also the log row's first value.
        advertiser_ids: Advertiser ids to query.
        mode: Passed through to ``dumpFeiYuJosn``.

    Returns:
        ``(operation_results, solutionJsonDataList)``: ``'无异常'`` or an
        error label, and the list of leads that were stored.
    """
    # NOTE(review): the '[runBaiduAD]' labels below look inherited from
    # another script; they are runtime strings and are left unchanged.
    operation_results = '无异常'
    solutionJsonDataList = []
    solutionType = 'all'
    # Defaults for the log row when nothing could be parsed.
    desc = 'fail'
    solutionCnt = 0
    response = feiyuAD(advertiser_ids, accessToken, lastCatchTime_ago_s, lastCatchTime_now_s)
    if response == 'Error':
        # Report the request failure itself rather than a follow-on parse
        # failure on a response that never arrived.
        response_data = 'Error'
        operation_results = '[runBaiduAD]-[baiduAD]请求失败'
    else:
        response_data = response.text
        try:
            # Decoding sits inside the try so a non-JSON body is reported as
            # unparsable instead of raising past the capture log.
            response_json = response.json()
            desc = response_json['message']
            solutionCnt = response_json['data']['page_info']['total_number']
            solutionJsonDataList = solutionJsonDataList + dumpFeiYuJosn(
                db, userName, brandName, routeName, response_json, mode)
        except Exception:
            operation_results = '[runBaiduAD]-[response_json]无法解析'
            desc = 'fail'
            solutionCnt = 0
    insert_valuse = (
        lastCatchTime_now_s, response_data, solutionCnt, desc, solutionType, userName, brandName, routeName)
    insertfeiyu_ad_solution_catch(db, insert_valuse)
    print(response_data)
    return operation_results, solutionJsonDataList
# Refresh the 7qiao key on a six-hour rhythm
def update7qiaoToken(db):
    """Refresh the stored 7qiao key when called near a six-hour mark.

    The refresh runs only when the current hour is a multiple of six and the
    current minute is 0 or 1; otherwise the call does nothing.

    Args:
        db: Open database connection passed to
            ``update_key_value_pair_7qiaoPlus``.
    """
    current = datetime.now()
    on_six_hour_mark = current.hour % 6 == 0
    in_first_two_minutes = current.minute <= 1
    if not (on_six_hour_mark and in_first_two_minutes):
        return
    update_key_value_pair_7qiaoPlus(db, '道一云', '7qiaoPlus', 'all', '')
# Timed capture of lead data for all accounts
def runADsolution(db):
    """Capture the next timer window for every account and push the leads.

    For each account returned by ``getAllFeiYuUsersList`` the window from
    ``selectfeiyu_ad_time_meter`` is fetched with ``runFeiYuAD`` (mode
    ``'run'``) and the stored leads are pushed with ``orderTo7qiao``. A
    payload whose ``str()`` form is 5000 characters or longer is pushed one
    lead at a time with a 0.5 s pause. Any result other than ``'无异常'`` is
    sent to the alert webhook. The timer is advanced after all accounts were
    processed; an exception is reported to the webhook and leaves the timer
    untouched.

    Args:
        db: Open database connection.
    """
    ok = '无异常'
    webhook_url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=cf7139e1-e623-41ca-8541-5dc6e26d43b0'
    mode = 'run'
    feiyu_kisses = getAllFeiYuUsersList(db)
    try:
        lastCatchTime_ago_s, lastCatchTime_now_s = selectfeiyu_ad_time_meter(db)
        for feiyu_kis in feiyu_kisses:
            userName = feiyu_kis['userName']
            accessToken = feiyu_kis['accessToken']
            brandName = feiyu_kis['brandName']
            advertiser_ids = feiyu_kis['advertiser_ids']
            routeName = feiyu_kis['routeName']
            operation_results, solutionJsonDataList = runFeiYuAD(db, userName, accessToken, brandName, routeName,
                                                                 lastCatchTime_ago_s, lastCatchTime_now_s,
                                                                 advertiser_ids, mode)
            solutionCntAll = len(solutionJsonDataList)
            if solutionCntAll > 0:
                if len(str(solutionJsonDataList)) >= 5000:
                    # Large payload: push one lead per request.
                    for single_solution in solutionJsonDataList:
                        push_result = orderTo7qiao(db, lastCatchTime_now_s, 1, single_solution)
                        # Keep a failure even when a later push succeeds;
                        # previously only the last result survived.
                        if push_result != ok:
                            operation_results = push_result
                        time.sleep(0.5)
                else:
                    operation_results = orderTo7qiao(db, lastCatchTime_now_s, solutionCntAll, solutionJsonDataList)
            # Send the error report for this account.
            if operation_results != ok:
                msg_txt = f'【飞鱼投流数据同步】服务端出现异常,请管理员尽快处理!\n>Exception->no\nDetail->{operation_results}'
                sendMsgToRot(webhook_url, msg_txt)
        updatefeiyu_ad_time_meter(db, lastCatchTime_now_s)
    except Exception as exc:
        # Report the actual exception; it used to be overwritten with 'no'
        # before the message was built.
        operation_results = traceback.format_exc()
        msg_txt = f'【飞鱼投流数据同步】服务端出现异常,请管理员尽快处理!\n>Exception->{exc!r}\nDetail->{operation_results}'
        sendMsgToRot(webhook_url, msg_txt)
# Re-check a time slot of today for leads that were missed
def runADsolutionCheck(db, time_slot):
    """Re-fetch a slot of the current day and push leads not stored yet.

    The window runs from today's midnight plus ``time_slot[0]`` hours to
    today's midnight plus ``time_slot[1]`` hours. Every account returned by
    ``getAllFeiYuUsersList`` is fetched with ``runFeiYuAD`` in ``'check'``
    mode and the newly stored leads are pushed with ``orderTo7qiao``. A
    payload whose ``str()`` form is 5000 characters or longer is pushed one
    lead at a time with a 0.5 s pause. Results other than ``'无异常'`` and
    exceptions are sent to the alert webhook. The timer table is not
    modified.

    Args:
        db: Open database connection.
        time_slot: Indexable pair ``(start_hour, end_hour)`` of offsets from
            today's midnight.
    """
    ok = '无异常'
    webhook_url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=cf7139e1-e623-41ca-8541-5dc6e26d43b0'
    mode = 'check'
    time_format = '%Y-%m-%d %H:%M:%S'
    feiyu_kisses = getAllFeiYuUsersList(db)
    try:
        now = datetime.now().strftime('%Y%m%d')
        lastCatchTime_ago_s = (parse(now) + timedelta(hours=time_slot[0])).strftime(time_format)
        lastCatchTime_now_s = (parse(now) + timedelta(hours=time_slot[1])).strftime(time_format)
        for feiyu_kis in feiyu_kisses:
            userName = feiyu_kis['userName']
            accessToken = feiyu_kis['accessToken']
            brandName = feiyu_kis['brandName']
            advertiser_ids = feiyu_kis['advertiser_ids']
            routeName = feiyu_kis['routeName']
            operation_results, solutionJsonDataList = runFeiYuAD(db, userName, accessToken, brandName, routeName,
                                                                 lastCatchTime_ago_s, lastCatchTime_now_s,
                                                                 advertiser_ids, mode)
            solutionCntAll = len(solutionJsonDataList)
            if solutionCntAll > 0:
                if len(str(solutionJsonDataList)) >= 5000:
                    # Large payload: push one lead per request.
                    for single_solution in solutionJsonDataList:
                        push_result = orderTo7qiao(db, lastCatchTime_now_s, 1, single_solution)
                        # Keep a failure even when a later push succeeds;
                        # previously only the last result survived.
                        if push_result != ok:
                            operation_results = push_result
                        time.sleep(0.5)
                else:
                    operation_results = orderTo7qiao(db, lastCatchTime_now_s, solutionCntAll, solutionJsonDataList)
            # Send the error report for this account.
            if operation_results != ok:
                msg_txt = f'【飞鱼投流数据同步】服务端出现异常,请管理员尽快处理!\n>Exception->no\nDetail->{operation_results}'
                sendMsgToRot(webhook_url, msg_txt)
    except Exception as exc:
        # Report the actual exception; it used to be overwritten with 'no'
        # before the message was built.
        operation_results = traceback.format_exc()
        msg_txt = f'【飞鱼投流数据同步】服务端出现异常,请管理员尽快处理!\n>Exception->{exc!r}\nDetail->{operation_results}'
        sendMsgToRot(webhook_url, msg_txt)
def run(db):
    """Run ``runADsolution`` once per five-minute mark, forever.

    First sleeps until one second before the next minute that is a multiple
    of five, then triggers a capture whenever the clock shows such a minute
    at second 01. After a capture it sleeps for the rest of the interval
    (minus 0.5 s) before polling again. This function never returns.

    Args:
        db: Open database connection passed to ``runADsolution``.
    """
    # Update interval in minutes.
    minute = 5
    started = datetime.now()
    wait_s = (minute - started.minute % minute - 1) * 60 + 59 - started.second
    time.sleep(wait_s)
    while True:
        now = datetime.now()
        if now.minute % minute == 0 and now.second == 1:
            s = time.time()
            runADsolution(db)
            # A capture that overruns the interval would make this negative,
            # and time.sleep() raises ValueError for negative values.
            time.sleep(max(0.0, minute * 60 - 0.5 - (time.time() - s)))
        else:
            # Poll several times per second instead of spinning a core; the
            # trigger window is a full second wide, so it cannot be missed.
            time.sleep(0.1)
# Feiyu push: app-level token
def get_access_token():
    """Request an app access token from the Ocean Engine open API.

    Posts the hard-coded ``app_id`` and ``secret`` to
    ``oauth2/app_access_token/``.

    Returns:
        The value of ``data.access_token`` in the JSON response.
    """
    endpoint = "https://open.oceanengine.com/open_api/" + "oauth2/app_access_token/"
    credentials = {
        "app_id": 1761788970859667,
        "secret": "dd148df1cb5feb6d0f25639ad7176bf387ef53f7"
    }
    reply = requests.post(endpoint, json=credentials).json()
    return reply['data']['access_token']
def get_clue_list(token):
    """Fetch and print the leads of a fixed advertiser and date range.

    Queries ``2/tools/clue/get/`` for advertiser ``2828422484857512`` between
    2022-04-01 and 2022-04-11.

    Args:
        token: Value for the ``Access-Token`` header.

    Returns:
        The decoded JSON response, which is also printed.
    """
    endpoint = "https://ad.oceanengine.com/open_api/" + "2/tools/clue/get/"
    query = {
        'advertiser_ids': ['2828422484857512'],
        'start_time': '2022-04-01',
        'end_time': '2022-04-11'
    }
    reply = requests.get(endpoint, json=query, headers={"Access-Token": token})
    rsp_data = reply.json()
    print(json.dumps(rsp_data, ensure_ascii=False))
    return rsp_data
if __name__ == '__main__':
    # Manual switches: each section below is a separate ad-hoc task. Only
    # the advertiser-info query is enabled.
    QUERY_ADVERTISER_INFO = True
    RUN_SYNC_SERVICE = False
    SEND_TEST_ALERT = False
    LIST_FEIYU_USERS = False
    PRINT_TODAY_SLOT = False

    if QUERY_ADVERTISER_INFO:
        # Print id/name/note of a fixed set of advertisers.
        advertiser_ids = ['1762396620978184', '1762396621515784', '1762396622070791']
        accessToken = 'c06ac76994e55d4951223e38d174b2591efb7065'
        open_api_url_prefix = "https://ad.oceanengine.com/open_api/"
        uri = "2/advertiser/info/"
        url = open_api_url_prefix + uri
        params = {
            "advertiser_ids": advertiser_ids,
            "fields": ["id", "name", "note"]
        }
        headers = {"Access-Token": accessToken}
        rsp = requests.get(url, json=params, headers=headers)
        rsp_data = rsp.json()
        print(json.dumps(rsp_data, ensure_ascii=False))

    if RUN_SYNC_SERVICE:
        host = '124.222.188.59'
        passwd = '111...Clown'
        db_name = 'zuzu_data'
        port = 63306
        db = linkTomySql(host, passwd, db_name, port)
        # Catch up to the present first, so a restarted service backfills the
        # windows it missed.
        lastCatchTime_ago_s, lastCatchTime_now_s = selectfeiyu_ad_time_meter(db)
        while parse(lastCatchTime_now_s) <= (datetime.now() + timedelta(minutes=-10)):
            runADsolution(db)
            time.sleep(2)
            lastCatchTime_ago_s, lastCatchTime_now_s = selectfeiyu_ad_time_meter(db)
        db.close()
        # Regular operation.
        print('正式运行')
        db = linkTomySql(host, passwd, db_name, port)
        run(db)
        db.close()

    if SEND_TEST_ALERT:
        # Send a sample traceback to the alert webhook.
        url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=cf7139e1-e623-41ca-8541-5dc6e26d43b0'
        ff = '''Traceback (most recent call last):
File "/home/python_flies/Num_Of_OrdersPerCapita_InRecentD30D90_E.py", line 293, in
csv_name_storage,csv_name_application = Num_Of_OrdersPerCapita_InRecentD30D90_E(file_path, brand_name, elm_pair, shops_info_df)
File "/home/python_flies/Num_Of_OrdersPerCapita_InRecentD30D90_E.py", line 253, in Num_Of_OrdersPerCapita_InRecentD30D90_E
e = 0/0 #v2优化
ZeroDivisionError: division by zero
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/python_flies/Num_Of_OrdersPerCapita_InRecentD30D90_E.py", line 298, in
csv_name_storage, csv_name_application = Num_Of_OrdersPerCapita_InRecentD30D90_E(file_path, brand_name, elm_pair, shops_info_df)
File "/home/python_flies/Num_Of_OrdersPerCapita_InRecentD30D90_E.py", line 253, in Num_Of_OrdersPerCapita_InRecentD30D90_E
e = 0/0 #v2优化
ZeroDivisionError: division by zero
'''
        msg_txt = f'【百度投流数据同步】服务端出现异常,请管理员尽快处理!\n>Exception->测试\nDetail->{ff}'
        sendMsgToRot(url, msg_txt)

    if LIST_FEIYU_USERS:
        host = '124.222.188.59'
        passwd = '111...Clown'
        db_name = 'zuzu_data'
        port = 63306
        db = linkTomySql(host, passwd, db_name, port)
        # advertiser_ids = [1758512677972046, 1758589190448142, 1758589191073800, 1758589191664648, 1758589192210568, 1758589192767501]
        # accessToken = 'f14ee576aad9503b529e5a0123126cdd12169577'
        # lastCatchTime_ago_s = '2023-04-01 12:04:43'
        # lastCatchTime_now_s = '2023-04-19 12:04:43'
        # print(feiyuAD(advertiser_ids, accessToken, lastCatchTime_ago_s, lastCatchTime_now_s).text)
        print(getAllFeiYuUsersList(db))
        db.close()

    if PRINT_TODAY_SLOT:
        # Print the 00:00-02:00 slot of the current day.
        now = datetime.now().strftime('%Y%m%d')
        s_time = (parse(now) + timedelta(hours=0)).strftime('%Y-%m-%d %H:%M:%S')
        e_time = (parse(now) + timedelta(hours=2)).strftime('%Y-%m-%d %H:%M:%S')
        print(s_time, e_time)