|
@@ -0,0 +1,242 @@
|
|
|
+# -*- codeing = utf-8 -*-
|
|
|
+# @Time : 2024/2/19 17:29
|
|
|
+# @Author : Clown
|
|
|
+# @File : demo_菜单时段销量数据.py
|
|
|
+# @Software : PyCharm
|
|
|
+import pymysql
|
|
|
+from dateutil.parser import parse
|
|
|
+from datetime import datetime, timedelta
|
|
|
+import time
|
|
|
+from multiprocessing import Process,Queue
|
|
|
+
|
|
|
+def linkTomySql(host, passwd, db_name, port):
|
|
|
+ '''连接至数据库返回【db】,v2新增local_infile=1 打开文件导入权限'''
|
|
|
+ try:
|
|
|
+ # 本地连接为:localhost 服务器连接为:124.222.188.59
|
|
|
+ db = pymysql.connect (
|
|
|
+ host=host, user="root",
|
|
|
+ passwd=passwd,
|
|
|
+ db=db_name,
|
|
|
+ charset='utf8mb4',
|
|
|
+ local_infile=1,
|
|
|
+ port=port)
|
|
|
+ # print ('\nconnect to mysql server 成功')
|
|
|
+ # print ('---------------------------------------')
|
|
|
+
|
|
|
+ except:
|
|
|
+ print ("\ncould not connect to mysql server")
|
|
|
+ db = "连接失败"
|
|
|
+ return db
|
|
|
+
|
|
|
+# 查询整合数据库表格[order_forms]及[order_formsgoods]中的数据,并输出整合后的数据结果12
|
|
|
+def selectDateFromOrderDetail(db,dateNo,tables):
|
|
|
+ orders = tables['orders']
|
|
|
+ goods = tables['goods']
|
|
|
+ sql_create_ordergoodsdf = f'''
|
|
|
+ CREATE TEMPORARY TABLE ordergoodsdf AS (SELECT `日期`, `标准商品名称`, `销量`, `平台`, `城市`, `平台门店id`, TRIM(`订单编号`) AS `订单编号` FROM {goods} WHERE `日期` = '{dateNo}');'''
|
|
|
+ sql_create_ordersdf = f'''
|
|
|
+ CREATE TEMPORARY TABLE ordersdf AS (SELECT DATE_FORMAT( `下单时间`, '%H' ) AS `时段`, DATE_FORMAT( `下单时间`, '%H:%i' ) AS `时间`, TRIM(`订单编号`) AS `订单编号` FROM {orders} WHERE `日期` = '{dateNo}');'''
|
|
|
+ sql_create_ordergoodsdf_time = f'''
|
|
|
+ CREATE TEMPORARY TABLE ordergoodsdf_time AS (SELECT ordergoodsdf.`日期`,
|
|
|
+ ordergoodsdf.`标准商品名称`,
|
|
|
+ ordergoodsdf.`平台`,
|
|
|
+ ordergoodsdf.`城市`,
|
|
|
+ CAST( ordergoodsdf.`平台门店id` AS SIGNED ) AS `平台门店id`,
|
|
|
+ CAST( ordersdf.`时段` AS SIGNED ) AS `时段`,
|
|
|
+ ordersdf.`时间`,
|
|
|
+ SUM(ordergoodsdf.`销量`) AS `销量`
|
|
|
+ FROM ordergoodsdf RIGHT JOIN ordersdf ON TRIM(ordergoodsdf.`订单编号`) = TRIM(ordersdf.`订单编号`)
|
|
|
+ GROUP BY
|
|
|
+ ordergoodsdf.`日期`,
|
|
|
+ ordergoodsdf.`标准商品名称`,
|
|
|
+ ordergoodsdf.`平台`,
|
|
|
+ ordergoodsdf.`城市`,
|
|
|
+ `平台门店id`,
|
|
|
+ `时段`,
|
|
|
+ ordersdf.`时间`);'''
|
|
|
+ sql_select = '''
|
|
|
+ SELECT
|
|
|
+ `日期`,
|
|
|
+ `标准商品名称`,
|
|
|
+ `平台`,
|
|
|
+ `城市`,
|
|
|
+ `平台门店id`,
|
|
|
+ `时段`,
|
|
|
+ SUM( `销量` ) AS `时段销量`,
|
|
|
+ GROUP_CONCAT( `时间销量` SEPARATOR ';' ) AS `时间销量集合`
|
|
|
+ FROM
|
|
|
+ (SELECT
|
|
|
+ `日期`,
|
|
|
+ `标准商品名称`,
|
|
|
+ `平台`,
|
|
|
+ `城市`,
|
|
|
+ `平台门店id`,
|
|
|
+ `时段`,
|
|
|
+ `销量`,
|
|
|
+ CONCAT( `时间`, '_', `销量` ) AS `时间销量`
|
|
|
+ FROM ordergoodsdf_time)t1
|
|
|
+ GROUP BY
|
|
|
+ `日期`,
|
|
|
+ `标准商品名称`,
|
|
|
+ `平台`,
|
|
|
+ `城市`,
|
|
|
+ `平台门店id`,
|
|
|
+ `时段`;
|
|
|
+ '''
|
|
|
+ sql_delete = '''
|
|
|
+ DROP TABLE ordergoodsdf,ordersdf,ordergoodsdf_time;
|
|
|
+ '''
|
|
|
+ cursor = db.cursor()
|
|
|
+ cursor.execute(sql_create_ordergoodsdf)
|
|
|
+ cursor.execute(sql_create_ordersdf)
|
|
|
+ cursor.execute(sql_create_ordergoodsdf_time)
|
|
|
+ cursor.execute(sql_select)
|
|
|
+ out_put = cursor.fetchall()
|
|
|
+ cursor.execute(sql_delete)
|
|
|
+ db.commit()
|
|
|
+ cursor.close()
|
|
|
+ return out_put
|
|
|
+
|
|
|
+# 插入数据库表格[daily_shops_goods_sales_time_split]新数据
|
|
|
+def insertDaily_shops_goods_sales_time_split(db,data_in,brandName):
|
|
|
+ cursor = db.cursor()
|
|
|
+ sql = f'''REPLACE INTO daily_shops_goods_sales_time_split VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
|
|
|
+ for row in data_in:
|
|
|
+ value = row + (brandName,)
|
|
|
+ cursor.execute(sql,value)
|
|
|
+ db.commit()
|
|
|
+ cursor.close()
|
|
|
+
|
|
|
+# 删除数据库表格[daily_shops_goods_sales_time_split]指定日期数据
|
|
|
+def deleteDaily_shops_goods_sales_time_split(db,dateNo):
|
|
|
+ cursor = db.cursor()
|
|
|
+ sql = f'''DELETE FROM daily_shops_goods_sales_time_split WHERE `日期` = {int(dateNo)};'''
|
|
|
+ cursor.execute(sql)
|
|
|
+ db.commit()
|
|
|
+ cursor.close()
|
|
|
+ print(dateNo,'数据已重置')
|
|
|
+
|
|
|
+# 创建新表用来存储输出的数据
|
|
|
+def create_new_sql_table(db,name,rows_in):
|
|
|
+ cursor = db.cursor()
|
|
|
+
|
|
|
+ for row in rows_in:
|
|
|
+ sql_in = f'INSERT INTO {name}'
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+# 按照品牌、时间维度压缩数据库表格[daily_shops_goods_sales_time_split]
|
|
|
+def create_goods_sales_time_split(host,passwd,db_name,port,brandName,s_dateNo,e_dateNo,task_no, report_name):
|
|
|
+ db = linkTomySql(host, passwd, db_name, port)
|
|
|
+ cursor = db.cursor()
|
|
|
+ n = 1
|
|
|
+ sql_c_1 = f'''CREATE TEMPORARY TABLE temp{task_no}_{n} AS ( SELECT `时段`,`标准商品名称`,SUM(`时段销量`) AS `时段销量` FROM daily_shops_goods_sales_time_split WHERE `品牌` = '{brandName}' AND `日期` = {int(s_dateNo)} GROUP BY `时段`,`标准商品名称`);'''
|
|
|
+ cursor.execute(sql_c_1)
|
|
|
+
|
|
|
+ while s_dateNo < e_dateNo:
|
|
|
+ s_dateNo = int((parse(str(s_dateNo)).date() + timedelta(days = 1)).strftime('%Y%m%d'))
|
|
|
+ sql_i_n = f'''INSERT INTO temp{task_no}_{n} SELECT `时段`,`标准商品名称`,SUM(`时段销量`) AS `时段销量` FROM daily_shops_goods_sales_time_split WHERE `品牌` = '{brandName}' AND `日期` = {s_dateNo} GROUP BY `时段`,`标准商品名称`;'''
|
|
|
+ sql_c_n = f'''CREATE TEMPORARY TABLE temp{task_no}_{n+1} AS ( SELECT `时段`,`标准商品名称`,SUM(`时段销量`) AS `时段销量` FROM temp{task_no}_{n} GROUP BY `时段`,`标准商品名称`);'''
|
|
|
+ sql_d_n = f'''DROP TABLE temp{task_no}_{n};'''
|
|
|
+ cursor.execute(sql_i_n)
|
|
|
+ db.commit()
|
|
|
+ cursor.execute(sql_c_n)
|
|
|
+ cursor.execute(sql_d_n)
|
|
|
+ db.commit()
|
|
|
+ n += 1
|
|
|
+ try:
|
|
|
+ sql_c_out = f'''CREATE TABLE {report_name} AS (SELECT *,'task{task_no:02d}' AS `标记` FROM temp{task_no}_{n} where 1=0);'''
|
|
|
+ cursor.execute(sql_c_out)
|
|
|
+ except:
|
|
|
+ ...
|
|
|
+ sql_in = f'''INSERT INTO {report_name} SELECT *,'task{task_no:02d}' AS `标记` FROM temp{task_no}_{n}'''
|
|
|
+ cursor.execute(sql_in)
|
|
|
+ db.commit()
|
|
|
+ cursor.close()
|
|
|
+ db.close()
|
|
|
+ print(f'temp{task_no}_{n} has Done')
|
|
|
+
|
|
|
+#多进程运行代码create_goods_sales_time_split
|
|
|
+def MRunpool(host,passwd,db_name,port,brandName,s_dateNo,e_dateNo,poolSize,maxDays, report_name):
|
|
|
+ days = (parse(str(e_dateNo)).date() - parse(str(s_dateNo)).date()).days
|
|
|
+ if days <= maxDays:
|
|
|
+ task_no = 0
|
|
|
+ create_goods_sales_time_split(host,passwd,db_name,port, brandName, s_dateNo, e_dateNo, task_no, report_name)
|
|
|
+ else:
|
|
|
+ if days//maxDays < poolSize:
|
|
|
+ d_t_d = maxDays
|
|
|
+ else:
|
|
|
+ d_t_d = days // poolSize
|
|
|
+ date_list = []
|
|
|
+ d_s = s_dateNo
|
|
|
+ d_e = int((parse(str(e_dateNo)).date() + timedelta(days = - d_t_d)).strftime('%Y%m%d'))
|
|
|
+ while d_s < d_e:
|
|
|
+ date1 = d_s
|
|
|
+ date2 = int((parse(str(d_s)).date() + timedelta(days = d_t_d)).strftime('%Y%m%d'))
|
|
|
+ date_list.append([date1,date2])
|
|
|
+ d_s = int((parse(str(d_s)).date() + timedelta(days = d_t_d+1)).strftime('%Y%m%d'))
|
|
|
+ date_list.append([d_s,e_dateNo])
|
|
|
+ print(len(date_list))
|
|
|
+ ps = []
|
|
|
+ task_no = 0
|
|
|
+ for s_e in date_list:
|
|
|
+ print(s_e)
|
|
|
+ p = Process(target = create_goods_sales_time_split, args = (host,passwd,db_name,port,brandName,s_e[0],s_e[1],task_no, report_name))
|
|
|
+ task_no += 1
|
|
|
+ p.start()
|
|
|
+ ps.append(p)
|
|
|
+ for i in ps:
|
|
|
+ i.join()
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == '__main__':
|
|
|
+ host = '192.168.110.96'
|
|
|
+ passwd = '111???clown'
|
|
|
+ db_name = 'jxb-h'
|
|
|
+ port = 3306
|
|
|
+ db = linkTomySql(host, passwd, db_name, port)
|
|
|
+ s_time = time.time()
|
|
|
+
|
|
|
+ # 对现有数据库进行数据整理,将商品数据销售数据进行时段拆分
|
|
|
+ if 1==0:
|
|
|
+ tables_list = [{'orders':'order_forms','goods':'order_formsgoods','brandName':'浆小白'},
|
|
|
+ {'orders':'order_forms_lls','goods':'order_formsgoods_lls','brandName':'楼兰说'},
|
|
|
+ {'orders':'order_forms_zzx','goods':'order_formsgoods_zzx','brandName':'粥小鲜'}]
|
|
|
+
|
|
|
+ end_date = 20220501 #结束时间
|
|
|
+ dateNo = 20220501 #开始时间
|
|
|
+
|
|
|
+ while dateNo <= end_date:
|
|
|
+ print(dateNo)
|
|
|
+ deleteDaily_shops_goods_sales_time_split(db, dateNo)
|
|
|
+ for tables in tables_list:
|
|
|
+ brandName = tables['brandName']
|
|
|
+ out_put = selectDateFromOrderDetail(db, dateNo, tables)
|
|
|
+ try:
|
|
|
+ insertDaily_shops_goods_sales_time_split(db,out_put,brandName)
|
|
|
+ except Exception as e:
|
|
|
+ print(dateNo,f'错误:{e}')
|
|
|
+ dateNo = int((parse(str(dateNo)).date() + timedelta(days = 1)).strftime('%Y%m%d'))
|
|
|
+
|
|
|
+ # 对现商品数据销售数据时段拆分表,进行压缩,仅保留 时段,标准商品名称,时段销量
|
|
|
+ if 1==0:
|
|
|
+ brandName = '浆小白'
|
|
|
+ s_dateNo = 20230101
|
|
|
+ e_dateNo = 20230201
|
|
|
+ task_no = 0
|
|
|
+ tempName = create_goods_sales_time_split(host,passwd,db_name,port, brandName, s_dateNo, e_dateNo, task_no)
|
|
|
+ print(tempName)
|
|
|
+
|
|
|
+ # 多进程执行 create_goods_sales_time_split
|
|
|
+ if 1==1:
|
|
|
+ brandName = '浆小白' #指定品牌
|
|
|
+ s_dateNo = 20230101 #开始日期1
|
|
|
+ e_dateNo = 20240201 #结束日期
|
|
|
+ poolSize = 12 #开起进程数(此参数需依据电脑CPU的核心数对应的线程数设定,并不能超过CPU最大线程数)
|
|
|
+ maxDays = 30 #最大间隔天数
|
|
|
+ report_name = 'goods_sales_time_split_v1'
|
|
|
+ MRunpool(host,passwd,db_name,port, brandName, s_dateNo, e_dateNo, poolSize, maxDays, report_name)
|
|
|
+ db.close()
|
|
|
+ time_cnt = time.time() - s_time
|
|
|
+ print(f'共计用时{int(time_cnt)}s,{int(time_cnt/60)}min,{int(time_cnt/3600)}h')
|