test_FlowByHour_M.py 18 KB


  1. # -*- coding: utf-8 -*-
  2. # @Time : 2022/7/5 17:45
  3. # @Author : Clown
  4. # @File : test_FlowByHour_M.py
  5. # @Software : PyCharm
  6. import pymysql
  7. import json
  8. import requests
  9. import csv
  10. import os
  11. import time
  12. from datetime import datetime,timedelta
  13. import sys
  14. import traceback
  15. from all_key_table import update_key_value_pair
  16. import schedule
  17. def linkTomySql(host,passwd,db_name):
  18. '''连接至数据库返回【db】,v2新增local_infile=1 打开文件导入权限'''
  19. try:
  20. #本地连接为:localhost 服务器连接为:124.222.188.59
  21. db = pymysql.connect(
  22. host=host, user="root",
  23. passwd=passwd,
  24. db=db_name,
  25. charset='utf8mb4',
  26. local_infile=1,
  27. port=63306)
  28. print('\nconnect to mysql server 成功')
  29. print('---------------------------------------')
  30. except:
  31. print("\ncould not connect to mysql server")
  32. db = "连接失败"
  33. return db
  34. def read_key_value_pair(db,brand_name,wm_plate,owner):
  35. '''按条件读取,数据库中all_key_table表里的key_value_pair字段中的值,以键值对的形式输出
  36. db:数据库,
  37. brand_name:品牌名,
  38. wm_plate:外卖平台MEITUAN或ELEME,
  39. owner:账号权限all或one
  40. '''
  41. cursor = db.cursor()
  42. sql = f'SELECT key_value_pair FROM all_key_table WHERE brand_name = "{brand_name}" AND wm_plate = "{wm_plate}" AND owner = "{owner}";'
  43. cursor.execute(sql)
  44. pair = json.loads(cursor.fetchall()[0][0])
  45. return pair
  46. def loadDataInfile(file_path ,csv_name,db,table_name):
  47. '''v2新增 执行csv文件导入数据库,注意ENCLOSED BY '"' 防止误判'''
  48. cursor = db.cursor()
  49. sql0 = f"SET global local_infile = 1;"
  50. cursor.execute(sql0)
  51. db.commit()
  52. sql1 = f'''LOAD DATA LOCAL INFILE '{file_path+'/'+csv_name}' REPLACE INTO TABLE {table_name} FIELDS TERMINATED BY ',' ENCLOSED BY '"' LINES TERMINATED BY '\r\n' IGNORE 1 LINES ;'''
  53. cursor.execute(sql1)
  54. db.commit()
  55. sql2= f"SET global local_infile = 0;"
  56. cursor.execute(sql2)
  57. db.commit()
  58. cursor.close()
  59. def update_scriptrunninglogs(log_date,log_time,running_Time,operation_results,table_name,db):
  60. '''将脚本执行情况发送至【脚本执行日志】表
  61. 配套内容
  62. from datetime import datetime
  63. import sys
  64. import traceback
  65. time_begin = time.time()
  66. log_date = datetime.now().strftime("%Y-%m-%d") # 日志记录日期
  67. log_time = datetime.now().strftime('%H:%M:%S') # 日志记录时间
  68. df_name = '数据库表名'
  69. try:
  70. operation_results = '执行成功'
  71. except Exception as e:
  72. operation_results = traceback.format_exc()
  73. running_Time = time.time() - time_begin # 脚本运行时长
  74. '''
  75. try:
  76. sql = "INSERT INTO scriptrunninglogs_test VALUES (%s,%s,%s,%s,%s,%s,%s)"#date, time, running_Time, script_name, operation_results, table_name, id
  77. script_name = os.path.basename(sys.argv[0])
  78. value = (log_date,log_time,running_Time,script_name,operation_results,table_name,0)
  79. cursor = db.cursor()
  80. cursor.execute(sql, value) # 执行sql语句
  81. db.commit()
  82. cursor.close()
  83. except:
  84. result = "数据库连接失败"
  85. print(result)
  86. def reset_mysql_table(db,table_name):
  87. cursor = db.cursor()
  88. sql = f'DELETE FROM {table_name} WHERE log_date = CURRENT_DATE() '
  89. cursor.execute(sql)
  90. db.commit()
  91. print(f'【{table_name}】当日数据重置成功,可导入新数据')
  92. def get_shops_info_to_list(db,brand_name,wm_plate,key_name):
  93. '''获取门店信息表【shops_info_to_list】中的信息,
  94. 并返回表单shops_info_df【shop_id,shop_name,update_datetime,info_for_script】
  95. db:数据库信息
  96. brand_name:品牌
  97. wm_plate:外卖平台
  98. key_name:关键信息字段名,如无填‘’,如有填对应键值对的key
  99. '''
  100. cursor = db.cursor()
  101. if key_name == '':
  102. sql = f'SELECT shop_id,shop_name,update_datetime FROM shops_info_for_script WHERE brand_name = "{brand_name}" AND wm_plate = "{wm_plate}";'
  103. cursor.execute(sql)
  104. shops_info = cursor.fetchall()
  105. shops_info_df = []
  106. for shop_info in shops_info:
  107. shop_info_dict = {'shop_id':shop_info[0],
  108. 'shop_name':shop_info[1],
  109. 'update_datetime':shop_info[2],
  110. 'info_for_script':''}
  111. shops_info_df.append(shop_info_dict)
  112. return shops_info_df
  113. else:
  114. sql = f'SELECT shop_id,shop_name,update_datetime,info_for_script -> "$.{key_name}" FROM shops_info_for_script WHERE brand_name = "{brand_name}" AND wm_plate = "{wm_plate}";'
  115. cursor.execute(sql)
  116. shops_info = cursor.fetchall()
  117. shops_info_df = []
  118. for shop_info in shops_info:
  119. shop_info_dict = {'shop_id':shop_info[0],
  120. 'shop_name':shop_info[1],
  121. 'update_datetime':shop_info[2],
  122. f'{key_name}':shop_info[3]}
  123. shops_info_df.append(shop_info_dict)
  124. return shops_info_df
  125. def test_FlowByHour_M(file_path,brand_name,mt_pair,shops_info_df):
  126. '''
  127. return: error or (csv_name_storage, csv_name_application)
  128. v2优化:增加批量错误判断,方便终止此函数
  129. '''
  130. #创建储存表
  131. csv_name_storage = 'test_FlowByHour_M_storage.csv'
  132. title_storage = ['script_name', 'brand_name', 'shop_id', 'wm_plate', 'date', 'time', 'data_details', 'id']
  133. f_storage = open(file_path+'/'+csv_name_storage, mode='w', encoding='utf-8-sig',newline="")
  134. writer_storage = csv.writer(f_storage)
  135. writer_storage.writerow(title_storage)
  136. #创建生产表
  137. csv_name_application = 'test_FlowByHour_M_application.csv'
  138. title_application = ['id', 'log_date', 'date_of_data', 'brand_name', 'shop_id', 'shop_name', 'timeSign',
  139. 'today_exp_pv', 'today_exp_uv', 'today_clk_pv', 'today_clk_uv', 'today_valid_order_cnt', 'today_valid_order_user_cnt', 'today_clk_exp_rate', 'today_ord_clk_rate',
  140. 'last_day_exp_pv', 'last_day_exp_uv', 'last_day_clk_pv', 'last_day_clk_uv', 'last_day_valid_order_cnt', 'last_day_valid_order_user_cnt', 'last_day_clk_exp_rate', 'last_day_ord_clk_rate',
  141. 'totalExposureAmount', 'uvCount', 'totalCost', 'averageCost']
  142. #['id', '记录日期', '数据所在日期', '门店id', '店名', '时段', '曝光次数', '点击率%', '点击次数', '单次点击花费', '时段竞价花费']
  143. f_application = open(file_path + '/' + csv_name_application, mode='w', encoding='utf-8-sig', newline="")
  144. writer_application = csv.writer(f_application)
  145. writer_application.writerow(title_application)
  146. #通用参数
  147. script_name = 'test_FlowByHour_M'
  148. wm_plate = 'MEITUAN'
  149. date = datetime.now().strftime('%Y-%m-%d')
  150. time = datetime.now().strftime('%H:%M:%S')
  151. beginTime = (datetime.today()).strftime('%Y-%m-%d') #2022-06-23
  152. endTime = (datetime.today() + timedelta(days=-1)).strftime('%Y-%m-%d') #2022-06-23
  153. timeSign = datetime.now().strftime('%H:%M') #20220706优化
  154. # shops_cnt = len(shops_info_df) # v2优化
  155. shops_cnt = 1
  156. check_error_cnt = 0 # v2优化
  157. acctId = mt_pair['data']['session']['acctId']
  158. token = mt_pair['data']['session']['token']
  159. #门店信息
  160. # for shop_info in shops_info_df:
  161. # shop_id = shop_info['shop_id']
  162. # shop_name = shop_info['shop_name']
  163. # info_update_datetime = shop_info['update_datetime']
  164. shop_id = 8038750
  165. shop_name = '浆小白·粉面粥·豆浆夜市(周浦万达店)'
  166. info_update_datetime = datetime.today()
  167. #联网获取信息
  168. url_flow = f'https://waimaieapp.meituan.com/bizdata/flow/single/pc/overview?acctId={acctId}&wmPoiId={shop_id}&durationType=5&tabType=3&circleType=1'
  169. cookie = f'acctId={acctId}; ' \
  170. f'token={token}; ' \
  171. f'wmPoiId={shop_id}; '
  172. headers_flow = {
  173. 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.85 Safari/537.36',
  174. 'cookie': cookie}
  175. url_wager = f'https://waimaieapp.meituan.com/ad/v3/statistics/cpc/today/info?acctId={acctId}&wmPoiId={shop_id}&token={token}&platform=0&bizad_cityId=310100&bizad_second_city_id=310100&bizad_third_city_id=310115'
  176. headers_wager = {
  177. 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.85 Safari/537.36'}
  178. if int(info_update_datetime.strftime("%Y%m%d")) >= int(datetime.strptime(beginTime,"%Y-%m-%d").strftime("%Y%m%d")):
  179. try:
  180. data_details_flow = requests.get(url_flow, headers= headers_flow).json()
  181. data_details_wager = requests.get(url_wager, headers= headers_wager).json()
  182. data_details = {"data_flow":data_details_flow,
  183. "data_wager":data_details_wager}
  184. #储存信息至csv存储表
  185. row_storage = [script_name, brand_name, shop_id, wm_plate, date, time, json.dumps(data_details, ensure_ascii= False), 0]
  186. #解析数据存储至csv生产表
  187. try:
  188. data_all_flow = data_details_flow['data']
  189. today_exp_pv = data_all_flow['exposureCnt']['base'] #曝光次数
  190. today_exp_uv = data_all_flow['exposureNum']['base'] #曝光人数
  191. today_clk_pv = data_all_flow['visitCnt']['base'] #进店次数
  192. today_clk_uv = data_all_flow['visitNum']['base'] #进店人数
  193. today_valid_order_cnt = data_all_flow['orderCnt']['base'] #下单次数
  194. today_valid_order_user_cnt = data_all_flow['orderNum']['base'] #下单人数
  195. today_clk_exp_rate = data_all_flow['visitRate']['base'] #进店转化率
  196. today_ord_clk_rate = data_all_flow['orderRate']['base'] #下单转化率
  197. last_day_exp_pv = today_exp_pv - data_all_flow['exposureCnt']['baseDelta'] # 曝光次数
  198. last_day_exp_uv = today_exp_uv - data_all_flow['exposureNum']['baseDelta'] # 曝光人数
  199. last_day_clk_pv = today_clk_pv - data_all_flow['visitCnt']['baseDelta'] # 进店次数
  200. last_day_clk_uv = today_clk_uv - data_all_flow['visitNum']['baseDelta'] # 进店人数
  201. last_day_valid_order_cnt = today_valid_order_cnt - data_all_flow['orderCnt']['baseDelta'] # 下单次数
  202. last_day_valid_order_user_cnt = today_valid_order_user_cnt - data_all_flow['orderNum']['baseDelta'] # 下单人数
  203. last_day_clk_exp_rate = today_clk_exp_rate - data_all_flow['visitRate']['baseDelta'] # 进店转化率
  204. last_day_ord_clk_rate = today_ord_clk_rate - data_all_flow['orderRate']['baseDelta'] # 下单转化率
  205. data_all_wager = data_details_wager['data']
  206. totalExposureAmount = data_all_wager['showCount'] #累计曝光次数
  207. uvCount = data_all_wager['clickCount'] #竞价进店次数
  208. totalCost = data_all_wager['cost'] #累计竞价花费
  209. averageCost = data_all_wager['avgPrice'] #单次点击花费
  210. # 解析数据存储至csv生产表
  211. row_application = [0,date, beginTime, brand_name, shop_id, shop_name, timeSign,
  212. today_exp_pv, today_exp_uv, today_clk_pv, today_clk_uv, today_valid_order_cnt, today_valid_order_user_cnt, today_clk_exp_rate, today_ord_clk_rate,
  213. last_day_exp_pv, last_day_exp_uv, last_day_clk_pv, last_day_clk_uv, last_day_valid_order_cnt, last_day_valid_order_user_cnt, last_day_clk_exp_rate, last_day_ord_clk_rate,
  214. totalExposureAmount, uvCount, totalCost, averageCost]
  215. writer_application.writerow(row_application)
  216. except:
  217. # 解析数据存储至csv生产表
  218. row_application = [0, date, beginTime, brand_name, shop_id, shop_name, timeSign, -1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] #20220706优化
  219. writer_application.writerow(row_application)
  220. except:
  221. check_error_cnt += 1 # v2优化
  222. data_details = {'result': '此门店未打开营业or账户号已无效or相关界面无参数,请核实!'}
  223. # 储存信息至csv存储表
  224. row_storage = [script_name, brand_name, shop_id, wm_plate, date, time, json.dumps(data_details, ensure_ascii=False), 0]
  225. # 解析数据存储至csv生产表
  226. row_application = [0, date, beginTime, brand_name, shop_id, shop_name, timeSign, -1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] #20220706优化
  227. writer_application.writerow(row_application)
  228. else:
  229. check_error_cnt += 1 # v2优化
  230. data_details = {'result':'此门店未打开营业or账户号已无效or相关界面无参数,请核实!'}
  231. # 储存信息至csv存储表
  232. row_storage = [script_name, brand_name, shop_id, wm_plate, date, time,json.dumps(data_details, ensure_ascii=False), 0]
  233. # 解析数据存储至csv生产表
  234. row_application = [0, date, beginTime, brand_name, shop_id, shop_name, timeSign, -1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] #20220706优化
  235. writer_application.writerow(row_application)
  236. writer_storage.writerow(row_storage)
  237. if shops_cnt == check_error_cnt:#v2优化
  238. e = 0/0 #v2优化
  239. else: #v2优化
  240. print(f'{brand_name}\n【{csv_name_storage}】\n【{csv_name_application}】\n加载成功!\n')
  241. return csv_name_storage,csv_name_application
  242. def running():
  243. # 日志参数
  244. time_begin = time.time()
  245. log_date = datetime.now().strftime("%Y-%m-%d") # 日志记录日期
  246. log_time = datetime.now().strftime('%H:%M:%S') # 日志记录时间
  247. # 通用参数
  248. host = 'localhost'
  249. passwd = '111???clown'
  250. db_name = 'hexingxing'
  251. # brand_names = ['粥小鲜','浆小白']
  252. wm_plate = 'MEITUAN'
  253. owner = 'all'
  254. key_name = ''
  255. file_path = 'C:/Users/ClownHe/Desktop/goods'
  256. table_name = '[test_flowbyhour_m] Storage And App'
  257. table_name_storage = 'data_storage_test_flowbyhour_m'
  258. table_name_app = 'data_app_test_flowbyhour_m'
  259. operation_results = '执行成功'
  260. brand_name = '浆小白'
  261. # 脚本执行
  262. db = linkTomySql(host, passwd, db_name)
  263. # for brand_name in brand_names:
  264. mt_pair = read_key_value_pair(db, brand_name, wm_plate, owner)
  265. # shops_info_df = get_shops_info_to_list(db,brand_name,wm_plate,key_name)
  266. shops_info_df = 0
  267. try:
  268. try:
  269. csv_name_storage, csv_name_application = test_FlowByHour_M(file_path, brand_name, mt_pair,
  270. shops_info_df)
  271. except Exception as x:
  272. print('重载关键表',x)
  273. update_key_value_pair(db, brand_name, wm_plate, owner, '') # 报错更新【all_key_table】
  274. csv_name_storage, csv_name_application = test_FlowByHour_M(file_path, brand_name, mt_pair,
  275. shops_info_df)
  276. operation_results = operation_results + f',更新【{brand_name}】【{wm_plate}】【all_key_table】'
  277. loadDataInfile(file_path, csv_name_storage, db, table_name_storage)
  278. print(f'【{csv_name_storage}】导入成功')
  279. loadDataInfile(file_path, csv_name_application, db, table_name_app)
  280. print(f'【{csv_name_application}】导入成功')
  281. print(f'本次脚本执行总用时{(time.time() - time_begin) / 60} min')
  282. except Exception as e:
  283. operation_results = traceback.format_exc()
  284. running_Time = time.time() - time_begin # 脚本运行时长
  285. db = linkTomySql(host, passwd, db_name)
  286. update_scriptrunninglogs(log_date, log_time, running_Time, operation_results, table_name, db)
  287. print('Working done!')
  288. time_kill = time.time() - time_begin
  289. return time_kill
  290. if __name__ == '__main__':
  291. # schedule.every().hour.at("00:10").do(running) # 每个小时的n分n秒执行一次
  292. # schedule.every().hour.at("05:10").do(running) # 每个小时的n分n秒执行一次
  293. # schedule.every().hour.at("10:10").do(running) # 每个小时的n分n秒执行一次
  294. # schedule.every().hour.at("15:10").do(running) # 每个小时的n分n秒执行一次
  295. # schedule.every().hour.at("20:10").do(running) # 每个小时的n分n秒执行一次
  296. # schedule.every().hour.at("25:10").do(running) # 每个小时的n分n秒执行一次
  297. # schedule.every().hour.at("30:10").do(running) # 每个小时的n分n秒执行一次
  298. # schedule.every().hour.at("35:10").do(running) # 每个小时的n分n秒执行一次
  299. # schedule.every().hour.at("40:10").do(running) # 每个小时的n分n秒执行一次
  300. # schedule.every().hour.at("45:10").do(running) # 每个小时的n分n秒执行一次
  301. # schedule.every().hour.at("50:10").do(running) # 每个小时的n分n秒执行一次
  302. # schedule.every().hour.at("57:10").do(running) # 每个小时的n分n秒执行一次
  303. schedule.every().minute.at(":05").do(running) # 每个小时的n分n秒执行一次
  304. schedule.every().minute.at(":15").do(running) # 每个小时的n分n秒执行一次
  305. schedule.every().minute.at(":25").do(running) # 每个小时的n分n秒执行一次
  306. schedule.every().minute.at(":35").do(running) # 每个小时的n分n秒执行一次
  307. schedule.every().minute.at(":45").do(running) # 每个小时的n分n秒执行一次
  308. schedule.every().minute.at(":55").do(running) # 每个小时的n分n秒执行一次
  309. while True:
  310. schedule.run_pending() # 运行所有可运行的任务