123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561 |
- # -*- codeing = utf-8 -*-
- # @Time : 2023/3/22 13:36
- # @Author : Clown
- # @File : demo_baiduAd.py
- # @Software : PyCharm
- import requests
- import json
- import schedule
- import threading
- import time
- import pymysql
- from datetime import datetime, timedelta
- from dateutil.parser import parse
- import traceback
- from all_key_table import update_key_value_pair_7qiaoPlus
- from demo_feiyu_ad_tk_refresh import getAllFeiYuUsersList
- def linkTomySql(host, passwd, db_name, port):
- '''连接至数据库返回【db】,v2新增local_infile=1 打开文件导入权限'''
- try:
- # 本地连接为:localhost 服务器连接为:124.222.188.59
- db = pymysql.connect (
- host=host, user="root",
- passwd=passwd,
- db=db_name,
- port=port,
- charset='utf8mb4',
- local_infile=1)
- print ('\nconnect to mysql server 成功')
- print ('---------------------------------------')
- except:
- print ("\ncould not connect to mysql server")
- db = "连接失败"
- return db
- def read_key_value_pair(db, brand_name, wm_plate, owner):
- '''按条件读取,数据库中all_key_table表里的key_value_pair字段中的值,以键值对的形式输出
- db:数据库,
- brand_name:品牌名,
- wm_plate:外卖平台MEITUAN或ELEME,
- owner:账号权限all或one
- '''
- cursor = db.cursor ()
- sql = f'SELECT key_value_pair FROM all_key_table WHERE brand_name = "{brand_name}" AND wm_plate = "{wm_plate}" AND owner = "{owner}";'
- cursor.execute (sql)
- pair = json.loads (cursor.fetchall ()[0][0])
- return pair
- def orderTo7qiao(db, lastCatchTime_ago_s, solutionCntAll, solutionJsonData):
- solutionjsondata = {'data': solutionJsonData}
- key_json = read_key_value_pair (db, '道一云', '7qiaoPlus', 'all')
- applicationId = '63631ee57005c0103426fdc7'
- processId = '642128bd80236836229b3b1d'
- Token = key_json['data']
- headers_api = {
- 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.85 Safari/537.36',
- "Content-type": "application/json",
- "X-Auth0-Token": Token}
- url = f'https://qiqiao.do1.com.cn/plus/cgi-bin/open/applications/{applicationId}/workflow/process_definitions/{processId}/start'
- params_json = {
- "nextNodesAndHandlers": [{"activityDefinitionId": "obj_02"}],
- "variables": {"任务触发时间": lastCatchTime_ago_s, 'solutioncntall': solutionCntAll,
- 'solutionjsondata': json.dumps (solutionjsondata, ensure_ascii=False)},
- "loginUserId": "92874ed35ecb45f51a58adcf18e99b5e"}
- try:
- resp = requests.post (url, headers=headers_api, json=params_json)
- msg = resp.json ()['msg']
- print (resp.text)
- if msg == '执行成功':
- operation_results = '无异常'
- else:
- operation_results = f'[orderTo7qiao]-返回结果为{msg}'
- except Exception as e:
- operation_results = f'[orderTo7qiao]-道一云端无法连接{e}'
- return operation_results
- def sendMsgToRot(url, msg_txt):
- params_json = {
- "msgtype": "markdown",
- "markdown": {
- "content": msg_txt,
- "mentioned_list": ["@all"]
- }}
- resp = requests.post (url, json=params_json).text
- print (resp)
- # 查询投放信息
- def feiyuAD(advertiser_ids, accessToken, lastCatchTime_ago_s, lastCatchTime_now_s):
- open_api_url_prefix = "https://ad.oceanengine.com/open_api/"
- uri = "2/tools/clue/get/"
- url = open_api_url_prefix + uri
- user_payload = {'advertiser_ids': advertiser_ids,
- 'start_time': lastCatchTime_ago_s,
- 'end_time': lastCatchTime_now_s,
- 'page_size': 100,
- 'page': 1}
- http_headers = {
- "Access-Token": accessToken
- }
- try:
- response = requests.get (url, json=user_payload, headers=http_headers)
- except:
- response = 'Error'
- return response
- # 查询计时器时间,输出起始时间
- def selectfeiyu_ad_time_meter(db):
- cursor = db.cursor ()
- sql = f'SELECT * FROM feiyu_ad_time_meter WHERE id = 1;'
- cursor.execute (sql)
- lastCatchTime_ago = cursor.fetchall ()[0][0]
- lastCatchTime_ago_s = (lastCatchTime_ago + timedelta (seconds=1)).strftime ('%Y-%m-%d %H:%M:%S')
- # 更新间隔
- lastCatchTime_now_s = (lastCatchTime_ago + timedelta (minutes=5)).strftime ('%Y-%m-%d %H:%M:%S')
- cursor.close ()
- return lastCatchTime_ago_s, lastCatchTime_now_s
- # 更新计时器时间
- def updatefeiyu_ad_time_meter(db, lastCatchTime_now_s):
- cursor = db.cursor ()
- sql = f'UPDATE feiyu_ad_time_meter SET lastCatchTime = "{lastCatchTime_now_s}" WHERE id = 1;'
- cursor.execute (sql)
- db.commit ()
- cursor.close ()
- # 新增记录至抓取记录表
- def insertfeiyu_ad_solution_catch(db, insert_valuse):
- cursor = db.cursor ()
- sql = f'INSERT INTO feiyu_ad_solution_catch VALUES (%s,%s,%s,%s,%s,%s,%s,%s);'
- cursor.execute (sql, insert_valuse)
- db.commit ()
- cursor.close ()
- # 新增记录至抓取明细记录表
- def insertfeiyu_ad_solution_info(db, insert_valuse):
- cursor = db.cursor ()
- sql = f'REPLACE INTO feiyu_ad_solution_info VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);'
- cursor.execute (sql, insert_valuse)
- db.commit ()
- cursor.close ()
- # 检查数据库中是否有数据
- def checkfeiyu_ad_solution_info_in_table(db,clueId):
- cursor = db.cursor ()
- sql = f'SELECT * FROM feiyu_ad_solution_info WHERE clueId = "{clueId}";'
- cursor.execute (sql)
- cnt = cursor.fetchall ()
- if len(cnt) == 0:
- result = True
- else:
- result = False
- return result
- # 解析分配数据至分配队列
- def dumpFeiYuJosn(db, userName, brandName, routeName, response_json, mode):
- '''
- userName 账号
- brandName 品牌
- routeName 投流渠道
- response_json 请求返回的json文件
- '''
- solutionJsonDatas = []
- datas = response_json['data']['list']
- for data in datas:
- clueId = data['clue_id']
- commitTime = data['create_time_detail']
- try:
- solutionNum = {'0': '表单提交',
- '1': '在线咨询',
- '2': '智能电话',
- '3': '网页回呼',
- '4': '卡券',
- '5': '抽奖'}
- solutionType = solutionNum[str (data['clue_type'])]
- except:
- solutionType = '其他'
- try:
- module_name = data['module_name']
- except:
- module_name = '其他'
- solutionType = f'{solutionType}-{module_name}'
- try:
- clueName = data['name']
- except:
- clueName = '未知'
- cluePhoneNumber = data['telephone']
- try:
- flowChannelNum = {'0': '外部流量',
- '1': '正常投放',
- '2': '外部导入',
- '3': '异常提交',
- '4': '广告预览',
- '5': '抖音私信',
- '6': '鲁班线索'}
- flowChannelName = flowChannelNum[str (data['clue_source'])]
- except:
- flowChannelName = '其他'
- try:
- app_name = data['app_name']
- except:
- app_name = '未知app'
- flowChannelName = f'{flowChannelName}-{app_name}'
- try:
- area = data['location']
- except:
- area = '未知区域'
- # try:
- # searchWord = solution['searchWord']
- # except:
- searchWord = ''
- # try:
- # keyword = solution['keyword']
- # except:
- keyword = ''
- solutionInfo = json.dumps (data, ensure_ascii=False)
- try:
- userName = data['advertiser_name']
- except:
- userName = '未知'
- insert_valuse = (
- clueId, commitTime, solutionType, clueName, cluePhoneNumber, flowChannelName, area, searchWord, keyword,
- solutionInfo, userName, brandName, routeName)
- if mode == 'check':
- check_result = checkfeiyu_ad_solution_info_in_table (db, clueId)
- if check_result:
- insertfeiyu_ad_solution_info (db, insert_valuse)
- solutionJsonData = {'clueId': clueId,
- 'commitTime': commitTime,
- 'solutionType': solutionType,
- 'clueName': clueName,
- 'cluePhoneNumber': cluePhoneNumber,
- 'flowChannelName': flowChannelName,
- 'area': area,
- 'searchWord': searchWord,
- 'keyword': keyword,
- 'userName': userName,
- 'brandName': brandName,
- 'routeName': routeName}
- solutionJsonDatas.append (solutionJsonData)
- else:
- insertfeiyu_ad_solution_info (db, insert_valuse)
- solutionJsonData = {'clueId': clueId,
- 'commitTime': commitTime,
- 'solutionType': solutionType,
- 'clueName': clueName,
- 'cluePhoneNumber': cluePhoneNumber,
- 'flowChannelName': flowChannelName,
- 'area': area,
- 'searchWord': searchWord,
- 'keyword': keyword,
- 'userName': userName,
- 'brandName': brandName,
- 'routeName': routeName}
- solutionJsonDatas.append (solutionJsonData)
- return solutionJsonDatas
- # 执行数据抓取
- def runFeiYuAD(db, userName, accessToken, brandName, routeName, lastCatchTime_ago_s, lastCatchTime_now_s,
- advertiser_ids, mode):
- operation_results = '无异常'
- solutionJsonDataList = []
- solutionType = 'all'
- response = feiyuAD (advertiser_ids, accessToken, lastCatchTime_ago_s, lastCatchTime_now_s)
- if response == 'Error':
- response_data = 'Error'
- operation_results = '[runBaiduAD]-[baiduAD]请求失败'
- else:
- response_data = response.text
- response_json = response.json ()
- try:
- desc = response_json['message']
- solutionCnt = response_json['data']['page_info']['total_number']
- solutionJsonDatas = dumpFeiYuJosn (db, userName, brandName, routeName, response_json, mode)
- solutionJsonDataList = solutionJsonDataList + solutionJsonDatas
- except:
- operation_results = '[runBaiduAD]-[response_json]无法解析'
- desc = 'fail'
- solutionCnt = 0
- insert_valuse = (
- lastCatchTime_now_s, response_data, solutionCnt, desc, solutionType, userName, brandName, routeName)
- insertfeiyu_ad_solution_catch (db, insert_valuse)
- print (response_data)
- return operation_results, solutionJsonDataList
- # 定时更新7巧密钥,每6小时更新一次
- def update7qiaoToken(db):
- now = datetime.now ()
- mm = now.strftime ("%M")
- hh = now.strftime ("%H")
- if int (hh) % 6 == 0 and int (mm) <= 1:
- update_key_value_pair_7qiaoPlus (db, '道一云', '7qiaoPlus', 'all', '')
- # 多账号计时抓取投流数据
- def runADsolution(db):
- # 基础参数
- operation_results = '无异常'
- e = 'no'
- mode = 'run'
- # host = 'localhost'
- # passwd = '111???clown'
- # db_name = 'hexingxing'
- # port = 3306
- feiyu_kisses = getAllFeiYuUsersList (db)
- try:
- # db = linkTomySql (host, passwd, db_name, port)
- lastCatchTime_ago_s, lastCatchTime_now_s = selectfeiyu_ad_time_meter (db)
- for feiyu_kis in feiyu_kisses:
- userName = feiyu_kis['userName']
- accessToken = feiyu_kis['accessToken']
- brandName = feiyu_kis['brandName']
- advertiser_ids = feiyu_kis['advertiser_ids']
- routeName = feiyu_kis['routeName']
- operation_results, solutionJsonDataList = runFeiYuAD (db, userName, accessToken, brandName, routeName,
- lastCatchTime_ago_s, lastCatchTime_now_s,
- advertiser_ids, mode)
- solutionCntAll = len (solutionJsonDataList)
- # update7qiaoToken (db)
- if 1 == 1:
- if solutionCntAll > 0:
- if len (str (solutionJsonDataList)) >= 5000:
- for solutionJsonDataList_0 in solutionJsonDataList:
- operation_results = orderTo7qiao (db, lastCatchTime_now_s, 1, solutionJsonDataList_0)
- time.sleep (0.5)
- else:
- operation_results = orderTo7qiao (db, lastCatchTime_now_s, solutionCntAll, solutionJsonDataList)
- # 发送报错信息
- if operation_results != '无异常':
- url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=cf7139e1-e623-41ca-8541-5dc6e26d43b0'
- msg_txt = f'<font color=\"warning\">【飞鱼投流数据同步】服务端出现异常,请管理员尽快处理!</font>\n>Exception->{e}\nDetail->{operation_results}'
- sendMsgToRot (url, msg_txt)
- else:
- ...
- updatefeiyu_ad_time_meter (db, lastCatchTime_now_s)
- # db.close ()
- except Exception as e:
- operation_results = traceback.format_exc ()
- e = 'no'
- # 发送报错信息
- if operation_results != '无异常':
- url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=cf7139e1-e623-41ca-8541-5dc6e26d43b0'
- msg_txt = f'<font color=\"warning\">【飞鱼投流数据同步】服务端出现异常,请管理员尽快处理!</font>\n>Exception->{e}\nDetail->{operation_results}'
- sendMsgToRot (url, msg_txt)
- else:
- ...
- # 检查未成功获取的数据
- def runADsolutionCheck(db,time_slot):
- # 基础参数
- operation_results = '无异常'
- e = 'no'
- mode = 'check'
- # host = 'localhost'
- # passwd = '111???clown'
- # db_name = 'hexingxing'
- # port = 3306
- feiyu_kisses = getAllFeiYuUsersList (db)
- try:
- # db = linkTomySql (host, passwd, db_name, port)
- now = datetime.now ().strftime ('%Y%m%d')
- lastCatchTime_ago_s = (parse (now) + timedelta (hours=time_slot[0])).strftime ('%Y-%m-%d %H:%M:%S')
- lastCatchTime_now_s = (parse (now) + timedelta (hours=time_slot[1])).strftime ('%Y-%m-%d %H:%M:%S')
- # lastCatchTime_ago_s, lastCatchTime_now_s = selectfeiyu_ad_time_meter (db)
- for feiyu_kis in feiyu_kisses:
- userName = feiyu_kis['userName']
- accessToken = feiyu_kis['accessToken']
- brandName = feiyu_kis['brandName']
- advertiser_ids = feiyu_kis['advertiser_ids']
- routeName = feiyu_kis['routeName']
- operation_results, solutionJsonDataList = runFeiYuAD (db, userName, accessToken, brandName, routeName,
- lastCatchTime_ago_s, lastCatchTime_now_s,
- advertiser_ids, mode)
- solutionCntAll = len (solutionJsonDataList)
- # update7qiaoToken (db)
- if 1 == 1:
- if solutionCntAll > 0:
- if len (str (solutionJsonDataList)) >= 5000:
- for solutionJsonDataList_0 in solutionJsonDataList:
- operation_results = orderTo7qiao (db, lastCatchTime_now_s, 1, solutionJsonDataList_0)
- time.sleep (0.5)
- else:
- operation_results = orderTo7qiao (db, lastCatchTime_now_s, solutionCntAll, solutionJsonDataList)
- # 发送报错信息
- if operation_results != '无异常':
- url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=cf7139e1-e623-41ca-8541-5dc6e26d43b0'
- msg_txt = f'<font color=\"warning\">【飞鱼投流数据同步】服务端出现异常,请管理员尽快处理!</font>\n>Exception->{e}\nDetail->{operation_results}'
- sendMsgToRot (url, msg_txt)
- else:
- ...
- # updatefeiyu_ad_time_meter (db, lastCatchTime_now_s)
- # db.close ()
- except Exception as e:
- operation_results = traceback.format_exc ()
- e = 'no'
- # 发送报错信息
- if operation_results != '无异常':
- url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=cf7139e1-e623-41ca-8541-5dc6e26d43b0'
- msg_txt = f'<font color=\"warning\">【飞鱼投流数据同步】服务端出现异常,请管理员尽快处理!</font>\n>Exception->{e}\nDetail->{operation_results}'
- sendMsgToRot (url, msg_txt)
- else:
- ...
- def run(db):
- # 初始化时间周期
- # 更新间隔
- minute = 5
- f_now = datetime.now ()
- f_mm = f_now.strftime ("%M")
- f_ss = f_now.strftime ("%S")
- f_c = int (f_mm) % minute
- wait_s = (minute - f_c - 1) * 60 + 59 - int (f_ss)
- time.sleep (wait_s)
- # 执行循环
- while True:
- now = datetime.now ()
- mm = now.strftime ("%M")
- ss = now.strftime ("%S")
- c = int (mm) % minute
- if c == 0 and ss == '01':
- s = time.time ()
- # print(now)
- runADsolution (db)
- time.sleep (minute * 60 - 0.5 - (time.time () - s))
- # 飞鱼推送
- def get_access_token():
- open_api_url_prefix = "https://open.oceanengine.com/open_api/"
- uri = "oauth2/app_access_token/"
- url = open_api_url_prefix + uri
- data = {
- "app_id": 1761788970859667,
- "secret": "dd148df1cb5feb6d0f25639ad7176bf387ef53f7"
- }
- rsp = requests.post (url, json=data)
- access_token = rsp.json ()['data']['access_token']
- return access_token
- def get_clue_list(token):
- open_api_url_prefix = "https://ad.oceanengine.com/open_api/"
- uri = "2/tools/clue/get/"
- url = open_api_url_prefix + uri
- headers = {"Access-Token": token}
- data = {
- 'advertiser_ids': ['2828422484857512'],
- 'start_time': '2022-04-01',
- 'end_time': '2022-04-11'
- }
- rsp = requests.get (url, json=data, headers=headers)
- rsp_data = rsp.json ()
- print (json.dumps (rsp_data, ensure_ascii=False))
- return rsp_data
- if __name__ == '__main__':
- if 1==1:
- advertiser_ids = ['1762396620978184','1762396621515784','1762396622070791']
- accessToken = 'c06ac76994e55d4951223e38d174b2591efb7065'
- open_api_url_prefix = "https://ad.oceanengine.com/open_api/"
- uri = "2/advertiser/info/"
- url = open_api_url_prefix + uri
- params = {
- "advertiser_ids": advertiser_ids,
- "fields": ["id", "name", "note"]
- }
- headers = {"Access-Token": accessToken}
- rsp = requests.get (url, json=params, headers=headers)
- rsp_data = rsp.json()
- print(json.dumps(rsp_data,ensure_ascii=False))
- if 1 == 0:
- host = '124.222.188.59'
- passwd = '111...Clown'
- db_name = 'zuzu_data'
- port = 63306
- db = linkTomySql (host, passwd, db_name, port)
- # 更新至当前,用于重启服务时,自动更新数据
- lastCatchTime_ago_s, lastCatchTime_now_s = selectfeiyu_ad_time_meter (db)
- while parse (lastCatchTime_now_s) <= (datetime.now () + timedelta (minutes=-10)):
- runADsolution (db)
- time.sleep (2)
- lastCatchTime_ago_s, lastCatchTime_now_s = selectfeiyu_ad_time_meter (db)
- db.close ()
- # 正式运行
- print ('正式运行')
- db = linkTomySql (host, passwd, db_name, port)
- run (db)
- db.close ()
- if 1 == 0:
- url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=cf7139e1-e623-41ca-8541-5dc6e26d43b0'
- ff = '''Traceback (most recent call last):
- File "/home/python_flies/Num_Of_OrdersPerCapita_InRecentD30D90_E.py", line 293, in <module>
- csv_name_storage,csv_name_application = Num_Of_OrdersPerCapita_InRecentD30D90_E(file_path, brand_name, elm_pair, shops_info_df)
- File "/home/python_flies/Num_Of_OrdersPerCapita_InRecentD30D90_E.py", line 253, in Num_Of_OrdersPerCapita_InRecentD30D90_E
- e = 0/0 #v2优化
- ZeroDivisionError: division by zero
- During handling of the above exception, another exception occurred:
- Traceback (most recent call last):
- File "/home/python_flies/Num_Of_OrdersPerCapita_InRecentD30D90_E.py", line 298, in <module>
- csv_name_storage, csv_name_application = Num_Of_OrdersPerCapita_InRecentD30D90_E(file_path, brand_name, elm_pair, shops_info_df)
- File "/home/python_flies/Num_Of_OrdersPerCapita_InRecentD30D90_E.py", line 253, in Num_Of_OrdersPerCapita_InRecentD30D90_E
- e = 0/0 #v2优化
- ZeroDivisionError: division by zero
- '''
- msg_txt = f'<font color=\"warning\">【百度投流数据同步】服务端出现异常,请管理员尽快处理!</font>\n>Exception->测试\nDetail->{ff}'
- sendMsgToRot (url, msg_txt)
- if 1==0:
- host = '124.222.188.59'
- passwd = '111...Clown'
- db_name = 'zuzu_data'
- port = 63306
- db = linkTomySql (host, passwd, db_name, port)
- # advertiser_ids = [1758512677972046, 1758589190448142, 1758589191073800, 1758589191664648, 1758589192210568, 1758589192767501]
- # accessToken = 'f14ee576aad9503b529e5a0123126cdd12169577'
- # lastCatchTime_ago_s = '2023-04-01 12:04:43'
- # lastCatchTime_now_s = '2023-04-19 12:04:43'
- # print(feiyuAD(advertiser_ids, accessToken, lastCatchTime_ago_s, lastCatchTime_now_s).text)
- print(getAllFeiYuUsersList (db))
- db.close ()
- if 1==0:
- now = datetime.now ().strftime('%Y%m%d')
- s_time = (parse(now)+timedelta(hours=0)).strftime('%Y-%m-%d %H:%M:%S')
- e_time = (parse(now)+timedelta(hours=2)).strftime('%Y-%m-%d %H:%M:%S')
- print(s_time,e_time)
|