demo_baiduAd.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380
  1. # -*- codeing = utf-8 -*-
  2. # @Time : 2023/3/22 13:36
  3. # @Author : Clown
  4. # @File : demo_baiduAd.py
  5. # @Software : PyCharm
  6. import requests
  7. import json
  8. import schedule
  9. import threading
  10. import time
  11. import pymysql
  12. from datetime import datetime,timedelta
  13. from dateutil.parser import parse
  14. import traceback
  15. from all_key_table import update_key_value_pair_7qiaoPlus
  16. from demo_baidu_ad_tk_refresh import getAllBaiDuUsersList
  17. def linkTomySql(host, passwd, db_name, port):
  18. '''连接至数据库返回【db】,v2新增local_infile=1 打开文件导入权限'''
  19. try:
  20. # 本地连接为:localhost 服务器连接为:124.222.188.59
  21. db = pymysql.connect (
  22. host=host, user="root",
  23. passwd=passwd,
  24. db=db_name,
  25. port=port,
  26. charset='utf8mb4',
  27. local_infile=1)
  28. print ('\nconnect to mysql server 成功')
  29. print ('---------------------------------------')
  30. except:
  31. print ("\ncould not connect to mysql server")
  32. db = "连接失败"
  33. return db
  34. def read_key_value_pair(db, brand_name, wm_plate, owner):
  35. '''按条件读取,数据库中all_key_table表里的key_value_pair字段中的值,以键值对的形式输出
  36. db:数据库,
  37. brand_name:品牌名,
  38. wm_plate:外卖平台MEITUAN或ELEME,
  39. owner:账号权限all或one
  40. '''
  41. cursor = db.cursor ()
  42. sql = f'SELECT key_value_pair FROM all_key_table WHERE brand_name = "{brand_name}" AND wm_plate = "{wm_plate}" AND owner = "{owner}";'
  43. cursor.execute (sql)
  44. pair = json.loads (cursor.fetchall ()[0][0])
  45. return pair
  46. def orderTo7qiao(db,lastCatchTime_ago_s,solutionCntAll,solutionJsonData):
  47. solutionjsondata = {'data':solutionJsonData}
  48. key_json = read_key_value_pair (db, '道一云', '7qiaoPlus', 'all')
  49. applicationId = '63631ee57005c0103426fdc7'
  50. processId = '642128bd80236836229b3b1d'
  51. Token = key_json['data']
  52. headers_api = {
  53. 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.85 Safari/537.36',
  54. "Content-type": "application/json",
  55. "X-Auth0-Token": Token}
  56. url = f'https://qiqiao.do1.com.cn/plus/cgi-bin/open/applications/{applicationId}/workflow/process_definitions/{processId}/start'
  57. params_json = {
  58. "nextNodesAndHandlers": [{"activityDefinitionId": "obj_02"}],
  59. "variables": {"任务触发时间": lastCatchTime_ago_s,'solutioncntall':solutionCntAll,'solutionjsondata':json.dumps(solutionjsondata,ensure_ascii=False)},
  60. "loginUserId":"92874ed35ecb45f51a58adcf18e99b5e"}
  61. try:
  62. resp = requests.post(url,headers =headers_api,json=params_json)
  63. msg = resp.json()['msg']
  64. print(resp.text)
  65. if msg == '执行成功':
  66. operation_results = '无异常'
  67. else:
  68. operation_results = f'[orderTo7qiao]-返回结果为{msg}'
  69. except Exception as e:
  70. operation_results = f'[orderTo7qiao]-道一云端无法连接{e}'
  71. return operation_results
  72. def sendMsgToRot(url,msg_txt):
  73. params_json = {
  74. "msgtype": "markdown",
  75. "markdown": {
  76. "content": msg_txt,
  77. "mentioned_list":["@all"]
  78. }}
  79. resp = requests.post(url,json=params_json).text
  80. print(resp)
  81. def baiduAD(userName,accessToken,solutionType,lastCatchTime_ago_s,lastCatchTime_now_s):
  82. url = "https://api.baidu.com/json/sms/service/LeadsNoticeService/getNoticeList"
  83. user_payload = {
  84. "header": {
  85. "authorityType": 5,
  86. "userName": userName,
  87. "accessToken": accessToken,
  88. "action": "API-PYTHON"
  89. },
  90. "body": {
  91. "solutionType": solutionType,
  92. "startDate": lastCatchTime_ago_s,
  93. "endDate": lastCatchTime_now_s
  94. }
  95. }
  96. http_headers = {
  97. "Accept-Encoding": "gzip, deflate",
  98. "Content-Type": "application/json",
  99. "Accept": "application/json"
  100. }
  101. user_payload = json.dumps(user_payload)
  102. try:
  103. response = requests.request("POST", url, data=user_payload, headers=http_headers)
  104. except:
  105. response = 'Error'
  106. return response
  107. # 查询计时器时间,输出起始时间
  108. def selectbaidu_ad_time_meter(db):
  109. cursor = db.cursor ()
  110. sql = f'SELECT * FROM baidu_ad_time_meter WHERE id = 1;'
  111. cursor.execute (sql)
  112. lastCatchTime_ago = cursor.fetchall ()[0][0]
  113. lastCatchTime_ago_s = (lastCatchTime_ago + timedelta(seconds=1)).strftime('%Y-%m-%d %H:%M:%S')
  114. # 更新间隔
  115. lastCatchTime_now_s = (lastCatchTime_ago + timedelta(minutes=5)).strftime('%Y-%m-%d %H:%M:%S')
  116. cursor.close()
  117. return lastCatchTime_ago_s,lastCatchTime_now_s
  118. # 更新计时器时间
  119. def updatebaidu_ad_time_meter(db,lastCatchTime_now_s):
  120. cursor = db.cursor ()
  121. sql = f'UPDATE baidu_ad_time_meter SET lastCatchTime = "{lastCatchTime_now_s}" WHERE id = 1;'
  122. cursor.execute(sql)
  123. db.commit()
  124. cursor.close()
  125. # 新增记录至抓取记录表
  126. def insertbaidu_ad_solution_catch(db,insert_valuse):
  127. cursor = db.cursor ()
  128. sql = f'INSERT INTO baidu_ad_solution_catch VALUES (%s,%s,%s,%s,%s,%s,%s,%s);'
  129. cursor.execute (sql,insert_valuse)
  130. db.commit ()
  131. cursor.close ()
  132. # 新增记录至抓取明细记录表
  133. def insertbaidu_ad_solution_info(db,insert_valuse):
  134. cursor = db.cursor ()
  135. sql = f'INSERT INTO baidu_ad_solution_info VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);'
  136. cursor.execute (sql,insert_valuse)
  137. db.commit ()
  138. cursor.close ()
  139. # 解析分配数据至分配队列
  140. def dumpBaiduJosn(db,userName,brandName,routeName,response_json):
  141. solutionJsonDatas = []
  142. datas = response_json['body']['data']
  143. for data in datas:
  144. noticeDetailList = data['noticeDetailList']
  145. for solution in noticeDetailList:
  146. clueId = solution['clueId']
  147. commitTime = solution['commitTime']
  148. solutionType = solution['solutionType']
  149. try:
  150. clueName = '未知'
  151. formDetail = json.loads(solution['formDetail'])
  152. for i in formDetail:
  153. if i['type'] == 'name':
  154. clueName = i['value']
  155. except:
  156. clueName = '未知'
  157. cluePhoneNumber = solution['cluePhoneNumber']
  158. try:
  159. flowChannelName = solution['flowChannelName']
  160. except:
  161. flowChannelName = '其他'
  162. try:
  163. area = solution['area']
  164. except:
  165. area = '未知区域'
  166. try:
  167. searchWord = solution['searchWord']
  168. except:
  169. searchWord = ''
  170. try:
  171. keyword = solution['keyword']
  172. except:
  173. keyword = ''
  174. solutionInfo = json.dumps(solution,ensure_ascii=False)
  175. userName = userName
  176. if userName == 'baidu-hp-cxb餐04yxB23HY08203':
  177. keyword = '【定向加油包】'+keyword
  178. insert_valuse = (clueId,commitTime,solutionType,clueName,cluePhoneNumber,flowChannelName,area,searchWord,keyword,solutionInfo,userName,brandName,routeName)
  179. if 1==0:
  180. insertbaidu_ad_solution_info (db, insert_valuse)
  181. else:
  182. print('测试中')
  183. solutionJsonData = {'clueId':clueId,
  184. 'commitTime':commitTime,
  185. 'solutionType':solutionType,
  186. 'clueName':clueName,
  187. 'cluePhoneNumber':cluePhoneNumber,
  188. 'flowChannelName':flowChannelName,
  189. 'area':area,
  190. 'searchWord':searchWord,
  191. 'keyword':keyword,
  192. 'userName':userName,
  193. 'brandName':brandName,
  194. 'routeName':routeName}
  195. solutionJsonDatas.append(solutionJsonData)
  196. return solutionJsonDatas
  197. # 执行数据抓取
  198. def runBaiduAD(db,userName,accessToken,brandName,routeName,lastCatchTime_ago_s,lastCatchTime_now_s):
  199. operation_results = '无异常'
  200. solutionTypes = ["phone","form"]
  201. solutionJsonDataList = []
  202. for solutionType in solutionTypes:
  203. response = baiduAD (userName, accessToken, solutionType, lastCatchTime_ago_s, lastCatchTime_now_s)
  204. if response == 'Error':
  205. response_data = 'Error'
  206. operation_results = '[runBaiduAD]-[baiduAD]请求失败'
  207. else:
  208. response_data = response.text
  209. response_json = response.json()
  210. try:
  211. desc = response_json['header']['desc']
  212. solutionCnt = response_json['body']['data'][0]['totalNum']
  213. solutionJsonDatas = dumpBaiduJosn (db, userName,brandName,routeName, response_json)
  214. solutionJsonDataList = solutionJsonDataList + solutionJsonDatas
  215. except:
  216. operation_results = '[runBaiduAD]-[response_json]无法解析'
  217. desc = 'fail'
  218. solutionCnt = 0
  219. insert_valuse = (lastCatchTime_now_s,response_data,solutionCnt,desc,solutionType,userName,brandName,routeName)
  220. if 1==0:
  221. insertbaidu_ad_solution_catch (db, insert_valuse)
  222. else:
  223. print('测试中')
  224. print (response_data)
  225. return operation_results,solutionJsonDataList
  226. # 定时更新7巧密钥,每6小时更新一次
  227. def update7qiaoToken(db):
  228. now = datetime.now ()
  229. mm = now.strftime ("%M")
  230. hh = now.strftime ("%H")
  231. if int (hh) % 6 == 0 and int (mm) <= 1:
  232. update_key_value_pair_7qiaoPlus (db, '道一云', '7qiaoPlus', 'all', '')
  233. # 多账号计时抓取投流数据
  234. def runADsolution(db):
  235. # 基础参数
  236. operation_results = '无异常'
  237. e = 'no'
  238. # host = 'localhost'
  239. # passwd = '111???clown'
  240. # db_name = 'hexingxing'
  241. # port = 3306
  242. baidu_kisses = getAllBaiDuUsersList(db)
  243. try:
  244. # db = linkTomySql (host, passwd, db_name, port)
  245. lastCatchTime_ago_s, lastCatchTime_now_s = selectbaidu_ad_time_meter (db)
  246. for baidu_kis in baidu_kisses:
  247. userName = baidu_kis['userName']
  248. accessToken = baidu_kis['accessToken']
  249. brandName = baidu_kis['brandName']
  250. routeName = baidu_kis['routeName']
  251. operation_results,solutionJsonDataList = runBaiduAD (db,userName, accessToken,brandName,routeName, lastCatchTime_ago_s, lastCatchTime_now_s)
  252. solutionCntAll = len(solutionJsonDataList)
  253. update7qiaoToken (db)
  254. if solutionCntAll > 0:
  255. if len(str(solutionJsonDataList)) >= 5000:
  256. for solutionJsonDataList_0 in solutionJsonDataList:
  257. operation_results = orderTo7qiao (db, lastCatchTime_now_s, 1, solutionJsonDataList_0)
  258. time.sleep(0.5)
  259. else:
  260. operation_results = orderTo7qiao (db, lastCatchTime_now_s, solutionCntAll, solutionJsonDataList)
  261. # 发送报错信息
  262. if operation_results != '无异常':
  263. url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=cf7139e1-e623-41ca-8541-5dc6e26d43b0'
  264. msg_txt = f'<font color=\"warning\">【百度投流数据同步】服务端出现异常,请管理员尽快处理!</font>\n>Exception->{e}\nDetail->{operation_results}'
  265. sendMsgToRot (url, msg_txt)
  266. else:
  267. ...
  268. updatebaidu_ad_time_meter (db, lastCatchTime_now_s)
  269. # db.close ()
  270. except Exception as e:
  271. operation_results = traceback.format_exc ()
  272. e = 'no'
  273. # 发送报错信息
  274. if operation_results != '无异常':
  275. url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=cf7139e1-e623-41ca-8541-5dc6e26d43b0'
  276. msg_txt = f'<font color=\"warning\">【百度投流数据同步】服务端出现异常,请管理员尽快处理!</font>\n>Exception->{e}\nDetail->{operation_results}'
  277. sendMsgToRot (url, msg_txt)
  278. else:
  279. ...
  280. def run(db):
  281. # 初始化时间周期
  282. # 更新间隔
  283. minute = 5
  284. f_now = datetime.now ()
  285. f_mm = f_now.strftime ("%M")
  286. f_ss = f_now.strftime ("%S")
  287. f_c = int (f_mm) % minute
  288. wait_s = (minute - f_c - 1) * 60 + 59 - int (f_ss)
  289. time.sleep (wait_s)
  290. # 执行循环
  291. while True:
  292. now = datetime.now ()
  293. mm = now.strftime ("%M")
  294. ss = now.strftime ("%S")
  295. c = int (mm) % minute
  296. if c == 0 and ss == '01':
  297. s = time.time ()
  298. # print(now)
  299. runADsolution(db)
  300. time.sleep (minute * 60 - 0.5 - (time.time () - s))
  301. if __name__ == '__main__':
  302. # runBaiduAD ()
  303. if 1==1:
  304. host = 'localhost'
  305. passwd = '111???clown'
  306. db_name = 'hexingxing'
  307. port = 3306
  308. db = linkTomySql (host, passwd, db_name, port)
  309. if 1 == 1:
  310. userName = 'baidu-hp-cxb餐01yxB23HY08203'
  311. accessToken = 'eyJhbGciOiJIUzM4NCJ9.eyJzdWIiOiJhY2MiLCJhdWQiOiLkv6Hmga_lkIzmraUiLCJ1aWQiOjQ3MzA1MjQ3LCJhcHBJZCI6ImY1NzVmZGM2YTdiYTNiMzEzOTE3YjAwZTA1ZjM5YTlkIiwiaXNzIjoi5ZWG5Lia5byA5Y-R6ICF5Lit5b-DIiwicGxhdGZvcm1JZCI6IjQ5NjAzNDU5NjU5NTg1NjE3OTQiLCJleHAiOjE2ODE3OTQyNDIsImp0aSI6Ijc0MDk1NDYwMTU2Mjc2ODk5OTgifQ.qP4ucmmos_BlRZk9jZykS48VXuEmGAS1YQO6Q2_AU92KvuZtpfhqLGppBXfhexAR'
  312. brandName = '粥小鲜'
  313. routeName = 'baidu'
  314. lastCatchTime_ago_s = '2023-04-16 15:00:00'
  315. lastCatchTime_now_s = '2023-04-16 15:59:00'
  316. runBaiduAD (db, userName, accessToken, brandName, routeName, lastCatchTime_ago_s, lastCatchTime_now_s)
  317. db.close()
  318. if 1 == 0:
  319. # 更新至当前,用于重启服务时,自动更新数据
  320. lastCatchTime_ago_s,lastCatchTime_now_s = selectbaidu_ad_time_meter (db)
  321. while parse(lastCatchTime_now_s) <= datetime.now():
  322. runADsolution (db)
  323. time.sleep(10)
  324. lastCatchTime_ago_s, lastCatchTime_now_s = selectbaidu_ad_time_meter (db)
  325. db.close ()
  326. # 正式运行
  327. print('正式运行')
  328. db = linkTomySql (host, passwd, db_name, port)
  329. run(db)
  330. # lastCatchTime_ago_s = '2023-03-26 23:29:03'
  331. # solutionCntAll = 0
  332. # solutionJsonData = {}
  333. # orderTo7qiao(db,lastCatchTime_ago_s,solutionCntAll,solutionJsonData)
  334. if 1==0:
  335. runADsolution ()
  336. if 1==0:
  337. url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=cf7139e1-e623-41ca-8541-5dc6e26d43b0'
  338. ff = '''Traceback (most recent call last):
  339. File "/home/python_flies/Num_Of_OrdersPerCapita_InRecentD30D90_E.py", line 293, in <module>
  340. csv_name_storage,csv_name_application = Num_Of_OrdersPerCapita_InRecentD30D90_E(file_path, brand_name, elm_pair, shops_info_df)
  341. File "/home/python_flies/Num_Of_OrdersPerCapita_InRecentD30D90_E.py", line 253, in Num_Of_OrdersPerCapita_InRecentD30D90_E
  342. e = 0/0 #v2优化
  343. ZeroDivisionError: division by zero
  344. During handling of the above exception, another exception occurred:
  345. Traceback (most recent call last):
  346. File "/home/python_flies/Num_Of_OrdersPerCapita_InRecentD30D90_E.py", line 298, in <module>
  347. csv_name_storage, csv_name_application = Num_Of_OrdersPerCapita_InRecentD30D90_E(file_path, brand_name, elm_pair, shops_info_df)
  348. File "/home/python_flies/Num_Of_OrdersPerCapita_InRecentD30D90_E.py", line 253, in Num_Of_OrdersPerCapita_InRecentD30D90_E
  349. e = 0/0 #v2优化
  350. ZeroDivisionError: division by zero
  351. '''
  352. msg_txt = f'<font color=\"warning\">【百度投流数据同步】服务端出现异常,请管理员尽快处理!</font>\n>Exception->测试\nDetail->{ff}'
  353. sendMsgToRot (url, msg_txt)