# -*- coding: utf-8 -*-
# @Time : 2024/2/19 17:29
# @Author : Clown
# @File : demo_菜单时段销量数据.py
# @Software : PyCharm
"""Hourly ("时段") product-sales split and roll-up for the brand order tables.

Stage 1 joins one brand's order headers with its order line items for one
day and upserts per-store / per-product / per-hour rows into
``daily_shops_goods_sales_time_split``.

Stage 2 collapses that table for one brand over a date range into
(``时段``, ``标准商品名称``, ``时段销量``) rows and appends them to a report
table, optionally splitting the range across several processes.

Dates are passed around as ``yyyymmdd`` integers, e.g. ``20230101``.
"""
import pymysql
from dateutil.parser import parse
from datetime import datetime, timedelta
import time
from multiprocessing import Process, Queue

# MySQL error number for "Table '...' already exists" (ER_TABLE_EXISTS_ERROR).
_ER_TABLE_EXISTS = 1050


def _shift_date(date_no, days):
    """Return the ``yyyymmdd`` integer that is *days* calendar days after *date_no*.

    *date_no* may be anything whose ``str()`` dateutil can parse (e.g. 20230131);
    *days* may be negative.
    """
    shifted = parse(str(date_no)).date() + timedelta(days=days)
    return int(shifted.strftime('%Y%m%d'))


def _inclusive_dates(s_dateNo, e_dateNo):
    """List every ``yyyymmdd`` integer from *s_dateNo* through *e_dateNo*.

    The start is always included; days are then added while the current value
    is still below the end. A start after the end therefore yields just the
    start date.
    """
    end = int(e_dateNo)
    dates = [int(s_dateNo)]
    while dates[-1] < end:
        dates.append(_shift_date(dates[-1], 1))
    return dates


def _chunk_dates(s_dateNo, e_dateNo, step):
    """Cut ``s_dateNo..e_dateNo`` into contiguous, non-overlapping ``[start, end]`` pairs.

    Each pair spans ``step + 1`` days; the final pair ends exactly at
    *e_dateNo* and may be shorter. Requires ``step >= 0``.
    """
    chunks = []
    d_s = s_dateNo
    # Last start date from which a full (step + 1)-day chunk still fits strictly inside the range.
    d_e = _shift_date(e_dateNo, -step)
    while d_s < d_e:
        chunks.append([d_s, _shift_date(d_s, step)])
        d_s = _shift_date(d_s, step + 1)
    chunks.append([d_s, e_dateNo])
    return chunks


def linkTomySql(host, passwd, db_name, port):
    """Open a MySQL connection as ``root`` and return it.

    ``local_infile=1`` (added in v2) enables ``LOAD DATA LOCAL INFILE`` on this
    connection.

    Returns:
        A pymysql connection, or the string ``"连接失败"`` if the server could
        not be reached. The string sentinel is kept for backward compatibility;
        callers must check for it before using the result.
    """
    try:
        # Local connection: "localhost"; remote server: 124.222.188.59.
        return pymysql.connect(
            host=host,
            user="root",
            passwd=passwd,
            db=db_name,
            charset='utf8mb4',
            local_infile=1,
            port=port,
        )
    except pymysql.MySQLError as e:
        # Only database/connection errors are reported via the sentinel;
        # programming errors (bad argument types, ...) propagate.
        print(f"\ncould not connect to mysql server: {e}")
        return "连接失败"


def selectDateFromOrderDetail(db, dateNo, tables):
    """Return one day's product sales for one brand, split by order hour.

    Joins the line-item table ``tables['goods']`` with the order-header table
    ``tables['orders']`` on the trimmed ``订单编号`` and aggregates per
    (日期, 标准商品名称, 平台, 城市, 平台门店id, 时段).

    Args:
        db: open pymysql connection. The session temp tables ``ordergoodsdf``,
            ``ordersdf`` and ``ordergoodsdf_time`` are created on it and always
            dropped again before returning, even on error.
        dateNo: day to process; compared as the string ``'yyyymmdd'`` against
            each table's ``日期`` column.
        tables: dict with keys ``'orders'`` and ``'goods'`` naming the tables.
            The names are interpolated into SQL, so they must come from
            trusted configuration.

    Returns:
        Rows from ``cursor.fetchall()`` with columns ``(日期, 标准商品名称, 平台,
        城市, 平台门店id, 时段, 时段销量, 时间销量集合)``. ``时段`` is the order
        hour as an integer, and ``时间销量集合`` is a ``';'``-joined list of
        ``'HH:MM_销量'`` strings.
    """
    orders = tables['orders']
    goods = tables['goods']
    # The date is passed as a query parameter (quoted string, as before).
    date_arg = (str(dateNo),)

    sql_create_ordergoodsdf = f'''
        CREATE TEMPORARY TABLE ordergoodsdf AS (
            SELECT `日期`, `标准商品名称`, `销量`, `平台`, `城市`, `平台门店id`,
                   TRIM(`订单编号`) AS `订单编号`
            FROM {goods}
            WHERE `日期` = %s);'''
    # '%%' because pymysql %-formats the query when parameters are supplied;
    # MySQL receives DATE_FORMAT(..., '%H') / '%H:%i'.
    sql_create_ordersdf = f'''
        CREATE TEMPORARY TABLE ordersdf AS (
            SELECT DATE_FORMAT(`下单时间`, '%%H') AS `时段`,
                   DATE_FORMAT(`下单时间`, '%%H:%%i') AS `时间`,
                   TRIM(`订单编号`) AS `订单编号`
            FROM {orders}
            WHERE `日期` = %s);'''
    # NOTE(review): RIGHT JOIN keeps orders with no matching line items for the
    # day, producing rows whose goods columns are NULL; confirm this (vs an
    # INNER JOIN) is intended.
    sql_create_ordergoodsdf_time = '''
        CREATE TEMPORARY TABLE ordergoodsdf_time AS (
            SELECT ordergoodsdf.`日期`, ordergoodsdf.`标准商品名称`,
                   ordergoodsdf.`平台`, ordergoodsdf.`城市`,
                   CAST(ordergoodsdf.`平台门店id` AS SIGNED) AS `平台门店id`,
                   CAST(ordersdf.`时段` AS SIGNED) AS `时段`,
                   ordersdf.`时间`,
                   SUM(ordergoodsdf.`销量`) AS `销量`
            FROM ordergoodsdf
            RIGHT JOIN ordersdf
                ON TRIM(ordergoodsdf.`订单编号`) = TRIM(ordersdf.`订单编号`)
            GROUP BY ordergoodsdf.`日期`, ordergoodsdf.`标准商品名称`,
                     ordergoodsdf.`平台`, ordergoodsdf.`城市`,
                     `平台门店id`, `时段`, ordersdf.`时间`);'''
    # NOTE(review): GROUP_CONCAT output is capped by the server's
    # group_concat_max_len, so very busy hours may be truncated; confirm the
    # server setting.
    sql_select = '''
        SELECT `日期`, `标准商品名称`, `平台`, `城市`, `平台门店id`, `时段`,
               SUM(`销量`) AS `时段销量`,
               GROUP_CONCAT(`时间销量` SEPARATOR ';') AS `时间销量集合`
        FROM (SELECT `日期`, `标准商品名称`, `平台`, `城市`, `平台门店id`, `时段`,
                     `销量`, CONCAT(`时间`, '_', `销量`) AS `时间销量`
              FROM ordergoodsdf_time) t1
        GROUP BY `日期`, `标准商品名称`, `平台`, `城市`, `平台门店id`, `时段`;'''
    # TEMPORARY: never touches a permanent table that happens to share a name.
    # IF EXISTS: works even when an earlier CREATE failed part-way.
    sql_drop = 'DROP TEMPORARY TABLE IF EXISTS ordergoodsdf, ordersdf, ordergoodsdf_time;'

    cursor = db.cursor()
    try:
        cursor.execute(sql_create_ordergoodsdf, date_arg)
        cursor.execute(sql_create_ordersdf, date_arg)
        cursor.execute(sql_create_ordergoodsdf_time)
        cursor.execute(sql_select)
        out_put = cursor.fetchall()
    finally:
        # Temp tables live for the whole session; leaving them behind would
        # make the next call on this connection fail with "already exists".
        try:
            cursor.execute(sql_drop)
            db.commit()
        finally:
            cursor.close()
    return out_put


def insertDaily_shops_goods_sales_time_split(db, data_in, brandName):
    """Upsert rows into ``daily_shops_goods_sales_time_split``, tagged with *brandName*.

    Args:
        db: open pymysql connection.
        data_in: iterable of 8-column rows as returned by
            ``selectDateFromOrderDetail``; *brandName* is appended as the 9th
            column.
        brandName: brand stored with every row.

    Raises:
        pymysql.MySQLError: after rolling back, so no partial batch is left
        pending to be committed by a later ``commit()`` on the same connection.
    """
    sql = '''REPLACE INTO daily_shops_goods_sales_time_split
             VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
    values = [tuple(row) + (brandName,) for row in data_in]
    cursor = db.cursor()
    try:
        # pymysql turns REPLACE ... VALUES into multi-row statements here.
        cursor.executemany(sql, values)
        db.commit()
    except Exception:
        db.rollback()
        raise
    finally:
        cursor.close()


def deleteDaily_shops_goods_sales_time_split(db, dateNo):
    """Delete every row of ``daily_shops_goods_sales_time_split`` for day *dateNo*.

    *dateNo* is converted with ``int()`` and compared numerically against
    ``日期``. The deletion is committed and a confirmation is printed.
    """
    sql = 'DELETE FROM daily_shops_goods_sales_time_split WHERE `日期` = %s;'
    cursor = db.cursor()
    try:
        cursor.execute(sql, (int(dateNo),))
        db.commit()
    finally:
        cursor.close()
    print(dateNo, '数据已重置')


def create_new_sql_table(db, name, rows_in):
    """Placeholder for storing *rows_in* in a new table called *name*.

    NOTE(review): unfinished. The original only built an unused
    ``INSERT INTO {name}`` prefix per row (and leaked a cursor); it never
    executed anything and is not called in this file. It is kept as a no-op so
    the name stays importable; implement or delete it.
    """
    # TODO: create `name` and insert rows_in once the column layout is decided.
    return None


def create_goods_sales_time_split(host, passwd, db_name, port, brandName,
                                  s_dateNo, e_dateNo, task_no, report_name):
    """Aggregate one brand's hourly product sales over a date range into *report_name*.

    Sums ``时段销量`` from ``daily_shops_goods_sales_time_split`` per
    (``时段``, ``标准商品名称``) over every day from *s_dateNo* through
    *e_dateNo* (``yyyymmdd``, both inclusive). The rows are then appended to
    *report_name* with ``标记 = 'task{task_no:02d}'``. *report_name* is created
    with matching columns if it does not exist yet.

    The function opens and closes its own connection, so it can run in a child
    process. Each call appends its own partial sums: when several tasks cover
    parts of one range, re-aggregate across ``标记`` to get range totals.

    Raises:
        RuntimeError: if the database connection cannot be opened.
        pymysql.MySQLError: for any SQL error other than "report table
            already exists".
    """
    dates = _inclusive_dates(s_dateNo, e_dateNo)
    tag = f'task{task_no:02d}'
    temp_name = f'temp{task_no}'  # temp tables are per-session anyway
    placeholders = ','.join(['%s'] * len(dates))

    # One grouped query over all days gives the same totals as accumulating
    # the days one by one (exactly so for integer/DECIMAL sales columns).
    sql_temp = f'''
        CREATE TEMPORARY TABLE {temp_name} AS (
            SELECT `时段`, `标准商品名称`, SUM(`时段销量`) AS `时段销量`
            FROM daily_shops_goods_sales_time_split
            WHERE `品牌` = %s AND `日期` IN ({placeholders})
            GROUP BY `时段`, `标准商品名称`);'''
    # Empty copy of the temp table's columns plus the task marker column.
    sql_create_report = (f"CREATE TABLE {report_name} AS "
                         f"(SELECT *, '{tag}' AS `标记` FROM {temp_name} WHERE 1=0);")
    sql_insert = f"INSERT INTO {report_name} SELECT *, '{tag}' AS `标记` FROM {temp_name}"

    db = linkTomySql(host, passwd, db_name, port)
    if isinstance(db, str):
        raise RuntimeError(f'{tag}: could not connect to MySQL at {host}:{port}')
    try:
        cursor = db.cursor()
        try:
            cursor.execute(sql_temp, (brandName, *dates))
            try:
                cursor.execute(sql_create_report)
            except pymysql.MySQLError as e:
                # Another task (or an earlier run) already created the report
                # table; anything else is a real error.
                if not e.args or e.args[0] != _ER_TABLE_EXISTS:
                    raise
            cursor.execute(sql_insert)
            db.commit()
        finally:
            cursor.close()
    finally:
        db.close()
    print(f'temp{task_no}_{len(dates)} has Done')


def MRunpool(host, passwd, db_name, port, brandName, s_dateNo, e_dateNo,
             poolSize, maxDays, report_name):
    """Run ``create_goods_sales_time_split`` over a date range, fanned out over processes.

    If the range spans at most *maxDays* days, it runs in-process as task 0.
    Otherwise the range is cut into contiguous chunks of ``step + 1`` days.
    ``step`` is *maxDays*, unless that would need *poolSize* or more chunks,
    in which case ``step`` is ``days // poolSize``. This keeps the chunk count
    at or below *poolSize*. Each chunk runs in its own ``Process`` as task
    0, 1, 2, ...

    Raises:
        ValueError: if the range has to be split and *maxDays* or *poolSize*
            is below 1.
        RuntimeError: after all processes have finished, if any of them exited
            with an error (the report table is then incomplete).
    """
    days = (parse(str(e_dateNo)).date() - parse(str(s_dateNo)).date()).days
    if days <= maxDays:
        # Small range: not worth spawning processes.
        create_goods_sales_time_split(host, passwd, db_name, port, brandName,
                                      s_dateNo, e_dateNo, 0, report_name)
        return

    if maxDays < 1 or poolSize < 1:
        raise ValueError(f'maxDays and poolSize must be >= 1 to split a range, '
                         f'got maxDays={maxDays}, poolSize={poolSize}')
    step = maxDays if days // maxDays < poolSize else days // poolSize
    date_list = _chunk_dates(s_dateNo, e_dateNo, step)
    print(len(date_list))

    ps = []
    for task_no, s_e in enumerate(date_list):
        print(s_e)
        p = Process(target=create_goods_sales_time_split,
                    args=(host, passwd, db_name, port, brandName,
                          s_e[0], s_e[1], task_no, report_name))
        p.start()
        ps.append(p)
    for p in ps:
        p.join()

    # A child that raised exits non-zero; surface that instead of silently
    # reporting success with missing chunks.
    failed = [s_e for p, s_e in zip(ps, date_list) if p.exitcode != 0]
    if failed:
        raise RuntimeError(f'{len(failed)} of {len(ps)} task(s) failed for date ranges '
                           f'{failed}; {report_name} is incomplete')


if __name__ == '__main__':
    host = '192.168.110.96'
    # NOTE(review): the database password is hard-coded in source; move it to
    # an environment variable or an untracked config file.
    passwd = '111???clown'
    db_name = 'jxb-h'
    port = 3306
    db = linkTomySql(host, passwd, db_name, port)
    s_time = time.time()

    # Stage 1 (toggle): split goods sales in the existing order tables into
    # hourly buckets.
    if 1 == 0:
        tables_list = [
            {'orders': 'order_forms', 'goods': 'order_formsgoods', 'brandName': '浆小白'},
            {'orders': 'order_forms_lls', 'goods': 'order_formsgoods_lls', 'brandName': '楼兰说'},
            {'orders': 'order_forms_zzx', 'goods': 'order_formsgoods_zzx', 'brandName': '粥小鲜'},
        ]
        end_date = 20220501  # end date (inclusive)
        dateNo = 20220501  # start date
        while dateNo <= end_date:
            print(dateNo)
            deleteDaily_shops_goods_sales_time_split(db, dateNo)
            for tables in tables_list:
                brandName = tables['brandName']
                # Log and continue so one brand's failure does not abort the run.
                try:
                    out_put = selectDateFromOrderDetail(db, dateNo, tables)
                    insertDaily_shops_goods_sales_time_split(db, out_put, brandName)
                except Exception as e:
                    print(dateNo, f'错误:{e}')
            dateNo = _shift_date(dateNo, 1)

    # Stage 2, single process (toggle): compress the hourly split table,
    # keeping only hour, standard product name and hourly sales.
    if 1 == 0:
        brandName = '浆小白'
        s_dateNo = 20230101
        e_dateNo = 20230201
        task_no = 0
        report_name = 'goods_sales_time_split_v1'
        create_goods_sales_time_split(host, passwd, db_name, port, brandName,
                                      s_dateNo, e_dateNo, task_no, report_name)

    # Stage 2, multi-process (toggle).
    if 1 == 1:
        brandName = '浆小白'  # brand to aggregate
        s_dateNo = 20230101  # start date
        e_dateNo = 20240201  # end date
        # Number of worker processes. The original note says to set it from the
        # CPU's hardware thread count and not exceed it. Each worker mostly
        # waits on MySQL, so the DB server's capacity may matter more; tune
        # accordingly.
        poolSize = 12
        maxDays = 30  # maximum day span handled by one task
        report_name = 'goods_sales_time_split_v1'
        MRunpool(host, passwd, db_name, port, brandName, s_dateNo, e_dateNo,
                 poolSize, maxDays, report_name)

    if not isinstance(db, str):
        db.close()
    time_cnt = time.time() - s_time
    print(f'共计用时{int(time_cnt)}s,{int(time_cnt/60)}min,{int(time_cnt/3600)}h')