# -*- codeing = utf-8 -*- # @Time : 2023/3/22 13:36 # @Author : Clown # @File : demo_baiduAd.py # @Software : PyCharm import requests import json import schedule import threading import time import pymysql from datetime import datetime, timedelta from dateutil.parser import parse import traceback from all_key_table import update_key_value_pair_7qiaoPlus from demo_feiyu_ad_tk_refresh import getAllFeiYuUsersList def linkTomySql(host, passwd, db_name, port): '''连接至数据库返回【db】,v2新增local_infile=1 打开文件导入权限''' try: # 本地连接为:localhost 服务器连接为:124.222.188.59 db = pymysql.connect ( host=host, user="root", passwd=passwd, db=db_name, port=port, charset='utf8mb4', local_infile=1) print ('\nconnect to mysql server 成功') print ('---------------------------------------') except: print ("\ncould not connect to mysql server") db = "连接失败" return db def read_key_value_pair(db, brand_name, wm_plate, owner): '''按条件读取,数据库中all_key_table表里的key_value_pair字段中的值,以键值对的形式输出 db:数据库, brand_name:品牌名, wm_plate:外卖平台MEITUAN或ELEME, owner:账号权限all或one ''' cursor = db.cursor () sql = f'SELECT key_value_pair FROM all_key_table WHERE brand_name = "{brand_name}" AND wm_plate = "{wm_plate}" AND owner = "{owner}";' cursor.execute (sql) pair = json.loads (cursor.fetchall ()[0][0]) return pair def orderTo7qiao(db, lastCatchTime_ago_s, solutionCntAll, solutionJsonData): solutionjsondata = {'data': solutionJsonData} key_json = read_key_value_pair (db, '道一云', '7qiaoPlus', 'all') applicationId = '63631ee57005c0103426fdc7' processId = '642128bd80236836229b3b1d' Token = key_json['data'] headers_api = { 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.85 Safari/537.36', "Content-type": "application/json", "X-Auth0-Token": Token} url = f'https://qiqiao.do1.com.cn/plus/cgi-bin/open/applications/{applicationId}/workflow/process_definitions/{processId}/start' params_json = { "nextNodesAndHandlers": [{"activityDefinitionId": "obj_02"}], "variables": {"任务触发时间": lastCatchTime_ago_s, 'solutioncntall': solutionCntAll, 'solutionjsondata': json.dumps (solutionjsondata, ensure_ascii=False)}, "loginUserId": "92874ed35ecb45f51a58adcf18e99b5e"} try: resp = requests.post (url, headers=headers_api, json=params_json) msg = resp.json ()['msg'] print (resp.text) if msg == '执行成功': operation_results = '无异常' else: operation_results = f'[orderTo7qiao]-返回结果为{msg}' except Exception as e: operation_results = f'[orderTo7qiao]-道一云端无法连接{e}' return operation_results def sendMsgToRot(url, msg_txt): params_json = { "msgtype": "markdown", "markdown": { "content": msg_txt, "mentioned_list": ["@all"] }} resp = requests.post (url, json=params_json).text print (resp) # 查询投放信息 def feiyuAD(advertiser_ids, accessToken, lastCatchTime_ago_s, lastCatchTime_now_s): open_api_url_prefix = "https://ad.oceanengine.com/open_api/" uri = "2/tools/clue/get/" url = open_api_url_prefix + uri user_payload = {'advertiser_ids': advertiser_ids, 'start_time': lastCatchTime_ago_s, 'end_time': lastCatchTime_now_s, 'page_size': 100, 'page': 1} http_headers = { "Access-Token": accessToken } try: response = requests.get (url, json=user_payload, headers=http_headers) except: response = 'Error' return response # 查询计时器时间,输出起始时间 def selectfeiyu_ad_time_meter(db): cursor = db.cursor () sql = f'SELECT * FROM feiyu_ad_time_meter WHERE id = 1;' cursor.execute (sql) lastCatchTime_ago = cursor.fetchall ()[0][0] lastCatchTime_ago_s = (lastCatchTime_ago + timedelta (seconds=1)).strftime ('%Y-%m-%d %H:%M:%S') # 更新间隔 lastCatchTime_now_s = (lastCatchTime_ago + timedelta (minutes=5)).strftime ('%Y-%m-%d %H:%M:%S') cursor.close () return lastCatchTime_ago_s, lastCatchTime_now_s # 更新计时器时间 def updatefeiyu_ad_time_meter(db, lastCatchTime_now_s): cursor = db.cursor () sql = f'UPDATE feiyu_ad_time_meter SET lastCatchTime = "{lastCatchTime_now_s}" WHERE id = 1;' cursor.execute (sql) db.commit () cursor.close () # 新增记录至抓取记录表 def insertfeiyu_ad_solution_catch(db, insert_valuse): cursor = db.cursor () sql = f'INSERT INTO feiyu_ad_solution_catch VALUES (%s,%s,%s,%s,%s,%s,%s,%s);' cursor.execute (sql, insert_valuse) db.commit () cursor.close () # 新增记录至抓取明细记录表 def insertfeiyu_ad_solution_info(db, insert_valuse): cursor = db.cursor () sql = f'REPLACE INTO feiyu_ad_solution_info VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);' cursor.execute (sql, insert_valuse) db.commit () cursor.close () # 检查数据库中是否有数据 def checkfeiyu_ad_solution_info_in_table(db,clueId): cursor = db.cursor () sql = f'SELECT * FROM feiyu_ad_solution_info WHERE clueId = "{clueId}";' cursor.execute (sql) cnt = cursor.fetchall () if len(cnt) == 0: result = True else: result = False return result # 解析分配数据至分配队列 def dumpFeiYuJosn(db, userName, brandName, routeName, response_json, mode): ''' userName 账号 brandName 品牌 routeName 投流渠道 response_json 请求返回的json文件 ''' solutionJsonDatas = [] datas = response_json['data']['list'] for data in datas: clueId = data['clue_id'] commitTime = data['create_time_detail'] try: solutionNum = {'0': '表单提交', '1': '在线咨询', '2': '智能电话', '3': '网页回呼', '4': '卡券', '5': '抽奖'} solutionType = solutionNum[str (data['clue_type'])] except: solutionType = '其他' try: module_name = data['module_name'] except: module_name = '其他' solutionType = f'{solutionType}-{module_name}' try: clueName = data['name'] except: clueName = '未知' cluePhoneNumber = data['telephone'] try: flowChannelNum = {'0': '外部流量', '1': '正常投放', '2': '外部导入', '3': '异常提交', '4': '广告预览', '5': '抖音私信', '6': '鲁班线索'} flowChannelName = flowChannelNum[str (data['clue_source'])] except: flowChannelName = '其他' try: app_name = data['app_name'] except: app_name = '未知app' flowChannelName = f'{flowChannelName}-{app_name}' try: area = data['location'] except: area = '未知区域' # try: # searchWord = solution['searchWord'] # except: searchWord = '' # try: # keyword = solution['keyword'] # except: keyword = '' solutionInfo = json.dumps (data, ensure_ascii=False) try: userName = data['advertiser_name'] except: userName = '未知' insert_valuse = ( clueId, commitTime, solutionType, clueName, cluePhoneNumber, flowChannelName, area, searchWord, keyword, solutionInfo, userName, brandName, routeName) if mode == 'check': check_result = checkfeiyu_ad_solution_info_in_table (db, clueId) if check_result: insertfeiyu_ad_solution_info (db, insert_valuse) solutionJsonData = {'clueId': clueId, 'commitTime': commitTime, 'solutionType': solutionType, 'clueName': clueName, 'cluePhoneNumber': cluePhoneNumber, 'flowChannelName': flowChannelName, 'area': area, 'searchWord': searchWord, 'keyword': keyword, 'userName': userName, 'brandName': brandName, 'routeName': routeName} solutionJsonDatas.append (solutionJsonData) else: insertfeiyu_ad_solution_info (db, insert_valuse) solutionJsonData = {'clueId': clueId, 'commitTime': commitTime, 'solutionType': solutionType, 'clueName': clueName, 'cluePhoneNumber': cluePhoneNumber, 'flowChannelName': flowChannelName, 'area': area, 'searchWord': searchWord, 'keyword': keyword, 'userName': userName, 'brandName': brandName, 'routeName': routeName} solutionJsonDatas.append (solutionJsonData) return solutionJsonDatas # 执行数据抓取 def runFeiYuAD(db, userName, accessToken, brandName, routeName, lastCatchTime_ago_s, lastCatchTime_now_s, advertiser_ids, mode): operation_results = '无异常' solutionJsonDataList = [] solutionType = 'all' response = feiyuAD (advertiser_ids, accessToken, lastCatchTime_ago_s, lastCatchTime_now_s) if response == 'Error': response_data = 'Error' operation_results = '[runBaiduAD]-[baiduAD]请求失败' else: response_data = response.text response_json = response.json () try: desc = response_json['message'] solutionCnt = response_json['data']['page_info']['total_number'] solutionJsonDatas = dumpFeiYuJosn (db, userName, brandName, routeName, response_json, mode) solutionJsonDataList = solutionJsonDataList + solutionJsonDatas except: operation_results = '[runBaiduAD]-[response_json]无法解析' desc = 'fail' solutionCnt = 0 insert_valuse = ( lastCatchTime_now_s, response_data, solutionCnt, desc, solutionType, userName, brandName, routeName) insertfeiyu_ad_solution_catch (db, insert_valuse) print (response_data) return operation_results, solutionJsonDataList # 定时更新7巧密钥,每6小时更新一次 def update7qiaoToken(db): now = datetime.now () mm = now.strftime ("%M") hh = now.strftime ("%H") if int (hh) % 6 == 0 and int (mm) <= 1: update_key_value_pair_7qiaoPlus (db, '道一云', '7qiaoPlus', 'all', '') # 多账号计时抓取投流数据 def runADsolution(db): # 基础参数 operation_results = '无异常' e = 'no' mode = 'run' # host = 'localhost' # passwd = '111???clown' # db_name = 'hexingxing' # port = 3306 feiyu_kisses = getAllFeiYuUsersList (db) try: # db = linkTomySql (host, passwd, db_name, port) lastCatchTime_ago_s, lastCatchTime_now_s = selectfeiyu_ad_time_meter (db) for feiyu_kis in feiyu_kisses: userName = feiyu_kis['userName'] accessToken = feiyu_kis['accessToken'] brandName = feiyu_kis['brandName'] advertiser_ids = feiyu_kis['advertiser_ids'] routeName = feiyu_kis['routeName'] operation_results, solutionJsonDataList = runFeiYuAD (db, userName, accessToken, brandName, routeName, lastCatchTime_ago_s, lastCatchTime_now_s, advertiser_ids, mode) solutionCntAll = len (solutionJsonDataList) # update7qiaoToken (db) if 1 == 1: if solutionCntAll > 0: if len (str (solutionJsonDataList)) >= 5000: for solutionJsonDataList_0 in solutionJsonDataList: operation_results = orderTo7qiao (db, lastCatchTime_now_s, 1, solutionJsonDataList_0) time.sleep (0.5) else: operation_results = orderTo7qiao (db, lastCatchTime_now_s, solutionCntAll, solutionJsonDataList) # 发送报错信息 if operation_results != '无异常': url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=cf7139e1-e623-41ca-8541-5dc6e26d43b0' msg_txt = f'【飞鱼投流数据同步】服务端出现异常,请管理员尽快处理!\n>Exception->{e}\nDetail->{operation_results}' sendMsgToRot (url, msg_txt) else: ... updatefeiyu_ad_time_meter (db, lastCatchTime_now_s) # db.close () except Exception as e: operation_results = traceback.format_exc () e = 'no' # 发送报错信息 if operation_results != '无异常': url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=cf7139e1-e623-41ca-8541-5dc6e26d43b0' msg_txt = f'【飞鱼投流数据同步】服务端出现异常,请管理员尽快处理!\n>Exception->{e}\nDetail->{operation_results}' sendMsgToRot (url, msg_txt) else: ... # 检查未成功获取的数据 def runADsolutionCheck(db,time_slot): # 基础参数 operation_results = '无异常' e = 'no' mode = 'check' # host = 'localhost' # passwd = '111???clown' # db_name = 'hexingxing' # port = 3306 feiyu_kisses = getAllFeiYuUsersList (db) try: # db = linkTomySql (host, passwd, db_name, port) now = datetime.now ().strftime ('%Y%m%d') lastCatchTime_ago_s = (parse (now) + timedelta (hours=time_slot[0])).strftime ('%Y-%m-%d %H:%M:%S') lastCatchTime_now_s = (parse (now) + timedelta (hours=time_slot[1])).strftime ('%Y-%m-%d %H:%M:%S') # lastCatchTime_ago_s, lastCatchTime_now_s = selectfeiyu_ad_time_meter (db) for feiyu_kis in feiyu_kisses: userName = feiyu_kis['userName'] accessToken = feiyu_kis['accessToken'] brandName = feiyu_kis['brandName'] advertiser_ids = feiyu_kis['advertiser_ids'] routeName = feiyu_kis['routeName'] operation_results, solutionJsonDataList = runFeiYuAD (db, userName, accessToken, brandName, routeName, lastCatchTime_ago_s, lastCatchTime_now_s, advertiser_ids, mode) solutionCntAll = len (solutionJsonDataList) # update7qiaoToken (db) if 1 == 1: if solutionCntAll > 0: if len (str (solutionJsonDataList)) >= 5000: for solutionJsonDataList_0 in solutionJsonDataList: operation_results = orderTo7qiao (db, lastCatchTime_now_s, 1, solutionJsonDataList_0) time.sleep (0.5) else: operation_results = orderTo7qiao (db, lastCatchTime_now_s, solutionCntAll, solutionJsonDataList) # 发送报错信息 if operation_results != '无异常': url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=cf7139e1-e623-41ca-8541-5dc6e26d43b0' msg_txt = f'【飞鱼投流数据同步】服务端出现异常,请管理员尽快处理!\n>Exception->{e}\nDetail->{operation_results}' sendMsgToRot (url, msg_txt) else: ... # updatefeiyu_ad_time_meter (db, lastCatchTime_now_s) # db.close () except Exception as e: operation_results = traceback.format_exc () e = 'no' # 发送报错信息 if operation_results != '无异常': url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=cf7139e1-e623-41ca-8541-5dc6e26d43b0' msg_txt = f'【飞鱼投流数据同步】服务端出现异常,请管理员尽快处理!\n>Exception->{e}\nDetail->{operation_results}' sendMsgToRot (url, msg_txt) else: ... def run(db): # 初始化时间周期 # 更新间隔 minute = 5 f_now = datetime.now () f_mm = f_now.strftime ("%M") f_ss = f_now.strftime ("%S") f_c = int (f_mm) % minute wait_s = (minute - f_c - 1) * 60 + 59 - int (f_ss) time.sleep (wait_s) # 执行循环 while True: now = datetime.now () mm = now.strftime ("%M") ss = now.strftime ("%S") c = int (mm) % minute if c == 0 and ss == '01': s = time.time () # print(now) runADsolution (db) time.sleep (minute * 60 - 0.5 - (time.time () - s)) # 飞鱼推送 def get_access_token(): open_api_url_prefix = "https://open.oceanengine.com/open_api/" uri = "oauth2/app_access_token/" url = open_api_url_prefix + uri data = { "app_id": 1761788970859667, "secret": "dd148df1cb5feb6d0f25639ad7176bf387ef53f7" } rsp = requests.post (url, json=data) access_token = rsp.json ()['data']['access_token'] return access_token def get_clue_list(token): open_api_url_prefix = "https://ad.oceanengine.com/open_api/" uri = "2/tools/clue/get/" url = open_api_url_prefix + uri headers = {"Access-Token": token} data = { 'advertiser_ids': ['2828422484857512'], 'start_time': '2022-04-01', 'end_time': '2022-04-11' } rsp = requests.get (url, json=data, headers=headers) rsp_data = rsp.json () print (json.dumps (rsp_data, ensure_ascii=False)) return rsp_data if __name__ == '__main__': if 1==1: advertiser_ids = ['1762396620978184','1762396621515784','1762396622070791'] accessToken = 'c06ac76994e55d4951223e38d174b2591efb7065' open_api_url_prefix = "https://ad.oceanengine.com/open_api/" uri = "2/advertiser/info/" url = open_api_url_prefix + uri params = { "advertiser_ids": advertiser_ids, "fields": ["id", "name", "note"] } headers = {"Access-Token": accessToken} rsp = requests.get (url, json=params, headers=headers) rsp_data = rsp.json() print(json.dumps(rsp_data,ensure_ascii=False)) if 1 == 0: host = '124.222.188.59' passwd = '111...Clown' db_name = 'zuzu_data' port = 63306 db = linkTomySql (host, passwd, db_name, port) # 更新至当前,用于重启服务时,自动更新数据 lastCatchTime_ago_s, lastCatchTime_now_s = selectfeiyu_ad_time_meter (db) while parse (lastCatchTime_now_s) <= (datetime.now () + timedelta (minutes=-10)): runADsolution (db) time.sleep (2) lastCatchTime_ago_s, lastCatchTime_now_s = selectfeiyu_ad_time_meter (db) db.close () # 正式运行 print ('正式运行') db = linkTomySql (host, passwd, db_name, port) run (db) db.close () if 1 == 0: url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=cf7139e1-e623-41ca-8541-5dc6e26d43b0' ff = '''Traceback (most recent call last): File "/home/python_flies/Num_Of_OrdersPerCapita_InRecentD30D90_E.py", line 293, in csv_name_storage,csv_name_application = Num_Of_OrdersPerCapita_InRecentD30D90_E(file_path, brand_name, elm_pair, shops_info_df) File "/home/python_flies/Num_Of_OrdersPerCapita_InRecentD30D90_E.py", line 253, in Num_Of_OrdersPerCapita_InRecentD30D90_E e = 0/0 #v2优化 ZeroDivisionError: division by zero During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/home/python_flies/Num_Of_OrdersPerCapita_InRecentD30D90_E.py", line 298, in csv_name_storage, csv_name_application = Num_Of_OrdersPerCapita_InRecentD30D90_E(file_path, brand_name, elm_pair, shops_info_df) File "/home/python_flies/Num_Of_OrdersPerCapita_InRecentD30D90_E.py", line 253, in Num_Of_OrdersPerCapita_InRecentD30D90_E e = 0/0 #v2优化 ZeroDivisionError: division by zero ''' msg_txt = f'【百度投流数据同步】服务端出现异常,请管理员尽快处理!\n>Exception->测试\nDetail->{ff}' sendMsgToRot (url, msg_txt) if 1==0: host = '124.222.188.59' passwd = '111...Clown' db_name = 'zuzu_data' port = 63306 db = linkTomySql (host, passwd, db_name, port) # advertiser_ids = [1758512677972046, 1758589190448142, 1758589191073800, 1758589191664648, 1758589192210568, 1758589192767501] # accessToken = 'f14ee576aad9503b529e5a0123126cdd12169577' # lastCatchTime_ago_s = '2023-04-01 12:04:43' # lastCatchTime_now_s = '2023-04-19 12:04:43' # print(feiyuAD(advertiser_ids, accessToken, lastCatchTime_ago_s, lastCatchTime_now_s).text) print(getAllFeiYuUsersList (db)) db.close () if 1==0: now = datetime.now ().strftime('%Y%m%d') s_time = (parse(now)+timedelta(hours=0)).strftime('%Y-%m-%d %H:%M:%S') e_time = (parse(now)+timedelta(hours=2)).strftime('%Y-%m-%d %H:%M:%S') print(s_time,e_time)