demo_菜单时段销量数据.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. # -*- codeing = utf-8 -*-
  2. # @Time : 2024/2/19 17:29
  3. # @Author : Clown
  4. # @File : demo_菜单时段销量数据.py
  5. # @Software : PyCharm
  6. import pymysql
  7. from dateutil.parser import parse
  8. from datetime import datetime, timedelta
  9. import time
  10. from multiprocessing import Process,Queue
  11. def linkTomySql(host, passwd, db_name, port):
  12. '''连接至数据库返回【db】,v2新增local_infile=1 打开文件导入权限'''
  13. try:
  14. # 本地连接为:localhost 服务器连接为:124.222.188.59
  15. db = pymysql.connect (
  16. host=host, user="root",
  17. passwd=passwd,
  18. db=db_name,
  19. charset='utf8mb4',
  20. local_infile=1,
  21. port=port)
  22. # print ('\nconnect to mysql server 成功')
  23. # print ('---------------------------------------')
  24. except:
  25. print ("\ncould not connect to mysql server")
  26. db = "连接失败"
  27. return db
  28. # 查询整合数据库表格[order_forms]及[order_formsgoods]中的数据,并输出整合后的数据结果12
  29. def selectDateFromOrderDetail(db,dateNo,tables):
  30. orders = tables['orders']
  31. goods = tables['goods']
  32. sql_create_ordergoodsdf = f'''
  33. CREATE TEMPORARY TABLE ordergoodsdf AS (SELECT `日期`, `标准商品名称`, `销量`, `平台`, `城市`, `平台门店id`, TRIM(`订单编号`) AS `订单编号` FROM {goods} WHERE `日期` = '{dateNo}');'''
  34. sql_create_ordersdf = f'''
  35. CREATE TEMPORARY TABLE ordersdf AS (SELECT DATE_FORMAT( `下单时间`, '%H' ) AS `时段`, DATE_FORMAT( `下单时间`, '%H:%i' ) AS `时间`, TRIM(`订单编号`) AS `订单编号` FROM {orders} WHERE `日期` = '{dateNo}');'''
  36. sql_create_ordergoodsdf_time = f'''
  37. CREATE TEMPORARY TABLE ordergoodsdf_time AS (SELECT ordergoodsdf.`日期`,
  38. ordergoodsdf.`标准商品名称`,
  39. ordergoodsdf.`平台`,
  40. ordergoodsdf.`城市`,
  41. CAST( ordergoodsdf.`平台门店id` AS SIGNED ) AS `平台门店id`,
  42. CAST( ordersdf.`时段` AS SIGNED ) AS `时段`,
  43. ordersdf.`时间`,
  44. SUM(ordergoodsdf.`销量`) AS `销量`
  45. FROM ordergoodsdf RIGHT JOIN ordersdf ON TRIM(ordergoodsdf.`订单编号`) = TRIM(ordersdf.`订单编号`)
  46. GROUP BY
  47. ordergoodsdf.`日期`,
  48. ordergoodsdf.`标准商品名称`,
  49. ordergoodsdf.`平台`,
  50. ordergoodsdf.`城市`,
  51. `平台门店id`,
  52. `时段`,
  53. ordersdf.`时间`);'''
  54. sql_select = '''
  55. SELECT
  56. `日期`,
  57. `标准商品名称`,
  58. `平台`,
  59. `城市`,
  60. `平台门店id`,
  61. `时段`,
  62. SUM( `销量` ) AS `时段销量`,
  63. GROUP_CONCAT( `时间销量` SEPARATOR ';' ) AS `时间销量集合`
  64. FROM
  65. (SELECT
  66. `日期`,
  67. `标准商品名称`,
  68. `平台`,
  69. `城市`,
  70. `平台门店id`,
  71. `时段`,
  72. `销量`,
  73. CONCAT( `时间`, '_', `销量` ) AS `时间销量`
  74. FROM ordergoodsdf_time)t1
  75. GROUP BY
  76. `日期`,
  77. `标准商品名称`,
  78. `平台`,
  79. `城市`,
  80. `平台门店id`,
  81. `时段`;
  82. '''
  83. sql_delete = '''
  84. DROP TABLE ordergoodsdf,ordersdf,ordergoodsdf_time;
  85. '''
  86. cursor = db.cursor()
  87. cursor.execute(sql_create_ordergoodsdf)
  88. cursor.execute(sql_create_ordersdf)
  89. cursor.execute(sql_create_ordergoodsdf_time)
  90. cursor.execute(sql_select)
  91. out_put = cursor.fetchall()
  92. cursor.execute(sql_delete)
  93. db.commit()
  94. cursor.close()
  95. return out_put
  96. # 插入数据库表格[daily_shops_goods_sales_time_split]新数据
  97. def insertDaily_shops_goods_sales_time_split(db,data_in,brandName):
  98. cursor = db.cursor()
  99. sql = f'''REPLACE INTO daily_shops_goods_sales_time_split VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
  100. for row in data_in:
  101. value = row + (brandName,)
  102. cursor.execute(sql,value)
  103. db.commit()
  104. cursor.close()
  105. # 删除数据库表格[daily_shops_goods_sales_time_split]指定日期数据
  106. def deleteDaily_shops_goods_sales_time_split(db,dateNo):
  107. cursor = db.cursor()
  108. sql = f'''DELETE FROM daily_shops_goods_sales_time_split WHERE `日期` = {int(dateNo)};'''
  109. cursor.execute(sql)
  110. db.commit()
  111. cursor.close()
  112. print(dateNo,'数据已重置')
  113. # 创建新表用来存储输出的数据
  114. def create_new_sql_table(db,name,rows_in):
  115. cursor = db.cursor()
  116. for row in rows_in:
  117. sql_in = f'INSERT INTO {name}'
  118. # 按照品牌、时间维度压缩数据库表格[daily_shops_goods_sales_time_split]
  119. def create_goods_sales_time_split(host,passwd,db_name,port,brandName,s_dateNo,e_dateNo,task_no, report_name):
  120. db = linkTomySql(host, passwd, db_name, port)
  121. cursor = db.cursor()
  122. n = 1
  123. sql_c_1 = f'''CREATE TEMPORARY TABLE temp{task_no}_{n} AS ( SELECT `时段`,`标准商品名称`,SUM(`时段销量`) AS `时段销量` FROM daily_shops_goods_sales_time_split WHERE `品牌` = '{brandName}' AND `日期` = {int(s_dateNo)} GROUP BY `时段`,`标准商品名称`);'''
  124. cursor.execute(sql_c_1)
  125. while s_dateNo < e_dateNo:
  126. s_dateNo = int((parse(str(s_dateNo)).date() + timedelta(days = 1)).strftime('%Y%m%d'))
  127. sql_i_n = f'''INSERT INTO temp{task_no}_{n} SELECT `时段`,`标准商品名称`,SUM(`时段销量`) AS `时段销量` FROM daily_shops_goods_sales_time_split WHERE `品牌` = '{brandName}' AND `日期` = {s_dateNo} GROUP BY `时段`,`标准商品名称`;'''
  128. sql_c_n = f'''CREATE TEMPORARY TABLE temp{task_no}_{n+1} AS ( SELECT `时段`,`标准商品名称`,SUM(`时段销量`) AS `时段销量` FROM temp{task_no}_{n} GROUP BY `时段`,`标准商品名称`);'''
  129. sql_d_n = f'''DROP TABLE temp{task_no}_{n};'''
  130. cursor.execute(sql_i_n)
  131. db.commit()
  132. cursor.execute(sql_c_n)
  133. cursor.execute(sql_d_n)
  134. db.commit()
  135. n += 1
  136. try:
  137. sql_c_out = f'''CREATE TABLE {report_name} AS (SELECT *,'task{task_no:02d}' AS `标记` FROM temp{task_no}_{n} where 1=0);'''
  138. cursor.execute(sql_c_out)
  139. except:
  140. ...
  141. sql_in = f'''INSERT INTO {report_name} SELECT *,'task{task_no:02d}' AS `标记` FROM temp{task_no}_{n}'''
  142. cursor.execute(sql_in)
  143. db.commit()
  144. cursor.close()
  145. db.close()
  146. print(f'temp{task_no}_{n} has Done')
  147. #多进程运行代码create_goods_sales_time_split
  148. def MRunpool(host,passwd,db_name,port,brandName,s_dateNo,e_dateNo,poolSize,maxDays, report_name):
  149. days = (parse(str(e_dateNo)).date() - parse(str(s_dateNo)).date()).days
  150. if days <= maxDays:
  151. task_no = 0
  152. create_goods_sales_time_split(host,passwd,db_name,port, brandName, s_dateNo, e_dateNo, task_no, report_name)
  153. else:
  154. if days//maxDays < poolSize:
  155. d_t_d = maxDays
  156. else:
  157. d_t_d = days // poolSize
  158. date_list = []
  159. d_s = s_dateNo
  160. d_e = int((parse(str(e_dateNo)).date() + timedelta(days = - d_t_d)).strftime('%Y%m%d'))
  161. while d_s < d_e:
  162. date1 = d_s
  163. date2 = int((parse(str(d_s)).date() + timedelta(days = d_t_d)).strftime('%Y%m%d'))
  164. date_list.append([date1,date2])
  165. d_s = int((parse(str(d_s)).date() + timedelta(days = d_t_d+1)).strftime('%Y%m%d'))
  166. date_list.append([d_s,e_dateNo])
  167. print(len(date_list))
  168. ps = []
  169. task_no = 0
  170. for s_e in date_list:
  171. print(s_e)
  172. p = Process(target = create_goods_sales_time_split, args = (host,passwd,db_name,port,brandName,s_e[0],s_e[1],task_no, report_name))
  173. task_no += 1
  174. p.start()
  175. ps.append(p)
  176. for i in ps:
  177. i.join()
  178. if __name__ == '__main__':
  179. host = '192.168.110.96'
  180. passwd = '111???clown'
  181. db_name = 'jxb-h'
  182. port = 3306
  183. db = linkTomySql(host, passwd, db_name, port)
  184. s_time = time.time()
  185. # 对现有数据库进行数据整理,将商品数据销售数据进行时段拆分
  186. if 1==0:
  187. tables_list = [{'orders':'order_forms','goods':'order_formsgoods','brandName':'浆小白'},
  188. {'orders':'order_forms_lls','goods':'order_formsgoods_lls','brandName':'楼兰说'},
  189. {'orders':'order_forms_zzx','goods':'order_formsgoods_zzx','brandName':'粥小鲜'}]
  190. end_date = 20220501 #结束时间
  191. dateNo = 20220501 #开始时间
  192. while dateNo <= end_date:
  193. print(dateNo)
  194. deleteDaily_shops_goods_sales_time_split(db, dateNo)
  195. for tables in tables_list:
  196. brandName = tables['brandName']
  197. out_put = selectDateFromOrderDetail(db, dateNo, tables)
  198. try:
  199. insertDaily_shops_goods_sales_time_split(db,out_put,brandName)
  200. except Exception as e:
  201. print(dateNo,f'错误:{e}')
  202. dateNo = int((parse(str(dateNo)).date() + timedelta(days = 1)).strftime('%Y%m%d'))
  203. # 对现商品数据销售数据时段拆分表,进行压缩,仅保留 时段,标准商品名称,时段销量
  204. if 1==0:
  205. brandName = '浆小白'
  206. s_dateNo = 20230101
  207. e_dateNo = 20230201
  208. task_no = 0
  209. tempName = create_goods_sales_time_split(host,passwd,db_name,port, brandName, s_dateNo, e_dateNo, task_no)
  210. print(tempName)
  211. # 多进程执行 create_goods_sales_time_split
  212. if 1==1:
  213. brandName = '浆小白' #指定品牌123
  214. s_dateNo = 20230101 #开始日期123
  215. e_dateNo = 20240201 #结束日期123
  216. poolSize = 12 #开起进程数(此参数需依据电脑CPU的核心数对应的线程数设定,并不能超过CPU最大线程数)
  217. maxDays = 30 #最大间隔天数
  218. report_name = 'goods_sales_time_split_v1'
  219. MRunpool(host,passwd,db_name,port, brandName, s_dateNo, e_dateNo, poolSize, maxDays, report_name)
  220. db.close()
  221. time_cnt = time.time() - s_time
  222. print(f'共计用时{int(time_cnt)}s,{int(time_cnt/60)}min,{int(time_cnt/3600)}h')