123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242 |
- # -*- coding: utf-8 -*-
- # @Time : 2024/2/19 17:29
- # @Author : Clown
- # @File : demo_菜单时段销量数据.py
- # @Software : PyCharm
- import pymysql
- from dateutil.parser import parse
- from datetime import datetime, timedelta
- import time
- from multiprocessing import Process,Queue
def linkTomySql(host, passwd, db_name, port, user="root"):
    """Connect to a MySQL server and return the pymysql connection.

    The connection uses ``charset='utf8mb4'`` and ``local_infile=1`` (added in
    v2 to allow local-file imports such as ``LOAD DATA LOCAL INFILE``).

    Args:
        host: Server address (``localhost`` for a local instance, otherwise
            the server's IP).
        passwd: Password for *user*.
        db_name: Database (schema) to select.
        port: TCP port of the server.
        user: Login name; defaults to ``"root"`` as before.

    Returns:
        The open pymysql connection, or the sentinel string ``"连接失败"``
        ("connection failed") when pymysql raised an error while connecting.
        Callers that call ``.cursor()`` on the result without checking will
        fail on the sentinel.
    """
    try:
        db = pymysql.connect(
            host=host,
            user=user,
            password=passwd,
            database=db_name,
            charset='utf8mb4',
            local_infile=1,
            port=port,
        )
    except pymysql.MySQLError as e:
        # Narrowed from a bare ``except:`` so Ctrl-C / SystemExit and
        # programming errors are no longer swallowed; show why it failed.
        print("\ncould not connect to mysql server")
        print(e)
        db = "连接失败"
    return db
# Query and combine one day of the order table and the order-goods table
# (e.g. [order_forms] / [order_formsgoods]) and return the combined result.
def selectDateFromOrderDetail(db, dateNo, tables):
    """Return one day's per-product sales split by the hour of the order time.

    Three session-scoped temporary tables are built on *db*:

    * ``ordergoodsdf``: goods rows of ``tables['goods']`` for *dateNo*, with
      a trimmed ``订单编号``;
    * ``ordersdf``: ``时段`` (``%H`` of ``下单时间``), ``时间`` (``%H:%i``)
      and trimmed ``订单编号`` from ``tables['orders']`` for *dateNo*;
    * ``ordergoodsdf_time``: the two RIGHT JOINed on order number, with
      ``销量`` summed per date / product / platform / city / store / hour /
      minute.

    The final query aggregates per hour and returns rows of
    ``(日期, 标准商品名称, 平台, 城市, 平台门店id, 时段, 时段销量, 时间销量集合)``,
    where ``时间销量集合`` is ``'HH:MM_qty'`` items joined with ``';'``.
    The temporary tables are always dropped, even if a statement fails.

    Args:
        db: Open pymysql connection.
        dateNo: Day to extract; compared as the string ``str(dateNo)``
            against the ``日期`` column of both tables.
        tables: Mapping with ``'orders'`` and ``'goods'`` table names. They
            are inserted into the SQL verbatim, so they must be trusted.

    Returns:
        The rows from ``cursor.fetchall()``.
    """
    orders = tables['orders']
    goods = tables['goods']
    # Table names cannot be bound as parameters, but the date can. The
    # statements executed with parameters go through pymysql's %-formatting,
    # so literal '%' characters in them are written as '%%'.
    date_param = (str(dateNo),)
    sql_create_ordergoodsdf = f'''
    CREATE TEMPORARY TABLE ordergoodsdf AS (SELECT `日期`, `标准商品名称`, `销量`, `平台`, `城市`, `平台门店id`, TRIM(`订单编号`) AS `订单编号` FROM {goods} WHERE `日期` = %s);'''
    sql_create_ordersdf = f'''
    CREATE TEMPORARY TABLE ordersdf AS (SELECT DATE_FORMAT( `下单时间`, '%%H' ) AS `时段`, DATE_FORMAT( `下单时间`, '%%H:%%i' ) AS `时间`, TRIM(`订单编号`) AS `订单编号` FROM {orders} WHERE `日期` = %s);'''
    # Both order-number columns were already TRIMmed above, so the join
    # compares them directly.
    # NOTE(review): the RIGHT JOIN keeps orders that have no goods row for the
    # day; such rows carry NULL 日期/标准商品名称/平台/城市/平台门店id/销量 —
    # confirm that this is intended.
    sql_create_ordergoodsdf_time = '''
    CREATE TEMPORARY TABLE ordergoodsdf_time AS (SELECT ordergoodsdf.`日期`,
    ordergoodsdf.`标准商品名称`,
    ordergoodsdf.`平台`,
    ordergoodsdf.`城市`,
    CAST( ordergoodsdf.`平台门店id` AS SIGNED ) AS `平台门店id`,
    CAST( ordersdf.`时段` AS SIGNED ) AS `时段`,
    ordersdf.`时间`,
    SUM(ordergoodsdf.`销量`) AS `销量`
    FROM ordergoodsdf RIGHT JOIN ordersdf ON ordergoodsdf.`订单编号` = ordersdf.`订单编号`
    GROUP BY
    ordergoodsdf.`日期`,
    ordergoodsdf.`标准商品名称`,
    ordergoodsdf.`平台`,
    ordergoodsdf.`城市`,
    `平台门店id`,
    `时段`,
    ordersdf.`时间`);'''
    # NOTE: GROUP_CONCAT output is truncated at the server's
    # group_concat_max_len (1024 bytes by default in MySQL).
    sql_select = '''
    SELECT
    `日期`,
    `标准商品名称`,
    `平台`,
    `城市`,
    `平台门店id`,
    `时段`,
    SUM( `销量` ) AS `时段销量`,
    GROUP_CONCAT( `时间销量` SEPARATOR ';' ) AS `时间销量集合`
    FROM
    (SELECT
    `日期`,
    `标准商品名称`,
    `平台`,
    `城市`,
    `平台门店id`,
    `时段`,
    `销量`,
    CONCAT( `时间`, '_', `销量` ) AS `时间销量`
    FROM ordergoodsdf_time)t1
    GROUP BY
    `日期`,
    `标准商品名称`,
    `平台`,
    `城市`,
    `平台门店id`,
    `时段`;
    '''
    # TEMPORARY + IF EXISTS: never touches a permanent table of the same name
    # and tolerates tables that were not created because of an earlier error.
    sql_delete = '''
    DROP TEMPORARY TABLE IF EXISTS ordergoodsdf, ordersdf, ordergoodsdf_time;
    '''
    with db.cursor() as cursor:
        try:
            cursor.execute(sql_create_ordergoodsdf, date_param)
            cursor.execute(sql_create_ordersdf, date_param)
            cursor.execute(sql_create_ordergoodsdf_time)
            cursor.execute(sql_select)
            out_put = cursor.fetchall()
        finally:
            # Temporary tables live as long as the session; drop them even on
            # failure so the next call on this connection can recreate them.
            cursor.execute(sql_delete)
        db.commit()
    return out_put
# Insert new rows into table [daily_shops_goods_sales_time_split].
def insertDaily_shops_goods_sales_time_split(db, data_in, brandName):
    """REPLACE *data_in* into ``daily_shops_goods_sales_time_split`` and commit.

    Each row gets *brandName* appended as its 9th (last) value.

    Args:
        db: Open pymysql connection.
        data_in: Iterable of 8-value rows (tuples or lists), e.g. the output
            of ``selectDateFromOrderDetail``.
        brandName: Brand label stored in the last column.

    Raises:
        Whatever pymysql raises for the statement. The transaction is rolled
        back first, so a failing batch leaves no half-inserted rows for a
        later, unrelated ``commit()`` to persist.
    """
    sql = 'REPLACE INTO daily_shops_goods_sales_time_split VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)'
    values = [(*row, brandName) for row in data_in]
    with db.cursor() as cursor:
        try:
            # executemany lets pymysql batch the rows into multi-row REPLACE
            # statements instead of one round trip per row.
            cursor.executemany(sql, values)
        except Exception:
            db.rollback()
            raise
    db.commit()
# Delete the rows of one date from table [daily_shops_goods_sales_time_split].
def deleteDaily_shops_goods_sales_time_split(db, dateNo):
    """Delete all rows whose ``日期`` equals ``int(dateNo)`` and commit.

    Args:
        db: Open pymysql connection.
        dateNo: Date as ``yyyymmdd``; anything ``int()`` accepts.

    Prints ``<dateNo> 数据已重置`` ("data reset") when done.
    """
    sql = 'DELETE FROM daily_shops_goods_sales_time_split WHERE `日期` = %s;'
    # The cursor is closed even if the DELETE raises.
    with db.cursor() as cursor:
        cursor.execute(sql, (int(dateNo),))
    db.commit()
    print(dateNo, '数据已重置')
# Create a new table to store the output data (unfinished).
def create_new_sql_table(db, name, rows_in):
    """Unfinished helper intended to store *rows_in* in table *name*.

    NOTE(review): the original body opened a cursor (never closed it) and,
    for every row, built the string ``f'INSERT INTO {name}'`` and discarded
    it. No SQL was ever executed. This version keeps that no-op behaviour
    (it still iterates *rows_in* once) without leaking a cursor. No caller is
    visible in this file — implement it or remove it.

    Args:
        db: Open pymysql connection (currently unused).
        name: Target table name (currently unused).
        rows_in: Iterable of rows; consumed but not written anywhere.
    """
    for _row in rows_in:
        # TODO: build and execute the INSERT for this row.
        pass
# Compress table [daily_shops_goods_sales_time_split] by brand over a date range.
def create_goods_sales_time_split(host, passwd, db_name, port, brandName, s_dateNo, e_dateNo, task_no, report_name):
    """Sum one brand's hourly product sales over a date range into *report_name*.

    Opens its own connection, so it can be used as a ``Process`` target.
    Starting at *s_dateNo*, each day's ``SUM(时段销量)`` per
    (``时段``, ``标准商品名称``) is read from
    ``daily_shops_goods_sales_time_split`` until *e_dateNo* is reached
    (inclusive; only *s_dateNo* is read if it is not before *e_dateNo*).
    After each day is appended to a session temporary table, that table is
    re-aggregated into a fresh temporary table and the old one dropped, which
    keeps it at one row per key. The totals are finally appended to
    *report_name*, each row tagged ``'taskNN'`` (zero-padded *task_no*) in an
    extra ``标记`` column. *report_name* is created empty with the matching
    columns if it does not exist yet.

    Args:
        host, passwd, db_name, port: Passed to ``linkTomySql``.
        brandName: Value matched against the ``品牌`` column.
        s_dateNo, e_dateNo: Range bounds as ``yyyymmdd`` (int or digit string).
        task_no: Integer used in temporary-table names and in the tag.
        report_name: Destination table. It is inserted into the SQL verbatim,
            so it must be trusted.

    Raises:
        ConnectionError: if ``linkTomySql`` could not connect.
    """
    db = linkTomySql(host, passwd, db_name, port)
    if isinstance(db, str):
        # linkTomySql signals failure with a sentinel string instead of raising.
        raise ConnectionError(f'task{task_no:02d}: could not connect to mysql server {host}:{port}')
    # Normalise both bounds so the loop never compares int with str.
    s_dateNo = int(s_dateNo)
    e_dateNo = int(e_dateNo)
    tag = f'task{task_no:02d}'
    daily_sum = ('SELECT `时段`,`标准商品名称`,SUM(`时段销量`) AS `时段销量` '
                 'FROM daily_shops_goods_sales_time_split '
                 'WHERE `品牌` = %s AND `日期` = %s GROUP BY `时段`,`标准商品名称`')
    n = 1
    try:
        with db.cursor() as cursor:
            cursor.execute(f'CREATE TEMPORARY TABLE temp{task_no}_{n} AS ( {daily_sum});',
                           (brandName, s_dateNo))
            while s_dateNo < e_dateNo:
                s_dateNo = int((parse(str(s_dateNo)).date() + timedelta(days=1)).strftime('%Y%m%d'))
                cursor.execute(f'INSERT INTO temp{task_no}_{n} {daily_sum};', (brandName, s_dateNo))
                db.commit()
                # Re-aggregate into a new temp table so it stays one row per key.
                cursor.execute(f'CREATE TEMPORARY TABLE temp{task_no}_{n + 1} AS ( SELECT `时段`,`标准商品名称`,SUM(`时段销量`) AS `时段销量` FROM temp{task_no}_{n} GROUP BY `时段`,`标准商品名称`);')
                cursor.execute(f'DROP TEMPORARY TABLE temp{task_no}_{n};')
                db.commit()
                n += 1
            # Replaces the former bare try/except around CREATE TABLE: the
            # first task creates the (empty, WHERE 1=0) table, the others no-op.
            cursor.execute(f"CREATE TABLE IF NOT EXISTS {report_name} AS (SELECT *,'{tag}' AS `标记` FROM temp{task_no}_{n} where 1=0);")
            cursor.execute(f"INSERT INTO {report_name} SELECT *,'{tag}' AS `标记` FROM temp{task_no}_{n}")
            db.commit()
    finally:
        # Also drops the session's temporary tables.
        db.close()
    print(f'temp{task_no}_{n} has Done')
def _split_date_ranges(start, end, chunk):
    """Split the inclusive date range [start, end] into ``[yyyymmdd, yyyymmdd]`` pairs.

    Consecutive chunks are ``[d, d + chunk]`` with the next one starting at
    ``d + chunk + 1``. Chunks are generated while ``d < end - chunk``, and a
    final ``[d, end]`` pair closes the range.
    """
    def to_int(d):
        return int(d.strftime('%Y%m%d'))

    ranges = []
    current = start
    stop = end - timedelta(days=chunk)
    while current < stop:
        ranges.append([to_int(current), to_int(current + timedelta(days=chunk))])
        current += timedelta(days=chunk + 1)
    ranges.append([to_int(current), to_int(end)])
    return ranges


# Run create_goods_sales_time_split over a date range using several processes.
def MRunpool(host, passwd, db_name, port, brandName, s_dateNo, e_dateNo, poolSize, maxDays, report_name):
    """Split [s_dateNo, e_dateNo] into chunks and compress each in its own process.

    If the range spans at most *maxDays* days (end minus start), it runs in
    the current process as task 0. Otherwise the chunk length is *maxDays*,
    or ``days // poolSize`` once ``days // maxDays`` reaches *poolSize*.
    One ``Process`` per chunk is started (task numbers 0, 1, 2, ...) and all
    of them are joined. Children exiting with a non-zero code are reported
    on stdout.

    Args:
        host, passwd, db_name, port, brandName, report_name: Forwarded to
            ``create_goods_sales_time_split``.
        s_dateNo, e_dateNo: Inclusive ``yyyymmdd`` range (int or digit string).
        poolSize: Used to size the chunks (roughly the number of processes).
        maxDays: Largest span handled without splitting.
    """
    start = parse(str(s_dateNo)).date()
    end = parse(str(e_dateNo)).date()
    days = (end - start).days
    if days <= maxDays:
        create_goods_sales_time_split(host, passwd, db_name, port, brandName, s_dateNo, e_dateNo, 0, report_name)
        return
    chunk = maxDays if days // maxDays < poolSize else days // poolSize
    date_list = _split_date_ranges(start, end, chunk)
    print(len(date_list))
    ps = []
    for task_no, s_e in enumerate(date_list):
        print(s_e)
        p = Process(target=create_goods_sales_time_split,
                    args=(host, passwd, db_name, port, brandName, s_e[0], s_e[1], task_no, report_name))
        p.start()
        ps.append(p)
    for task_no, (p, s_e) in enumerate(zip(ps, date_list)):
        p.join()
        # A crashed child only prints its traceback; surface it here so
        # missing rows in the report table do not go unnoticed.
        if p.exitcode != 0:
            print(f'task{task_no:02d} {s_e} failed (exitcode={p.exitcode})')
if __name__ == '__main__':
    # Connection settings.
    # NOTE(review): the password is hard-coded in source; consider reading it
    # from an environment variable or a config file instead.
    host = '192.168.110.96'
    passwd = '111???clown'
    db_name = 'jxb-h'
    port = 3306

    # Which steps to run (formerly toggled with `if 1==0:` / `if 1==1:`).
    RUN_DAILY_SPLIT = False      # step 1: split each day's product sales by hour
    RUN_SINGLE_COMPRESS = False  # step 2: compress one date range in this process
    RUN_MULTI_COMPRESS = True    # step 3: compress a date range with several processes

    db = linkTomySql(host, passwd, db_name, port)
    s_time = time.time()
    try:
        # Step 1: reorganise existing data, splitting product sales by hour.
        if RUN_DAILY_SPLIT:
            tables_list = [{'orders': 'order_forms', 'goods': 'order_formsgoods', 'brandName': '浆小白'},
                           {'orders': 'order_forms_lls', 'goods': 'order_formsgoods_lls', 'brandName': '楼兰说'},
                           {'orders': 'order_forms_zzx', 'goods': 'order_formsgoods_zzx', 'brandName': '粥小鲜'}]
            end_date = 20220501  # end date (inclusive)
            dateNo = 20220501    # start date
            while dateNo <= end_date:
                print(dateNo)
                deleteDaily_shops_goods_sales_time_split(db, dateNo)
                for tables in tables_list:
                    brandName = tables['brandName']
                    out_put = selectDateFromOrderDetail(db, dateNo, tables)
                    try:
                        insertDaily_shops_goods_sales_time_split(db, out_put, brandName)
                    except Exception as e:
                        print(dateNo, f'错误:{e}')
                dateNo = int((parse(str(dateNo)).date() + timedelta(days=1)).strftime('%Y%m%d'))

        # Step 2: compress the hourly split table, keeping only 时段, 标准商品名称, 时段销量.
        if RUN_SINGLE_COMPRESS:
            brandName = '浆小白'
            s_dateNo = 20230101
            e_dateNo = 20230201
            task_no = 0
            report_name = 'goods_sales_time_split_v1'
            # Previously called without report_name (a TypeError) and printed
            # the function's return value, which is always None.
            create_goods_sales_time_split(host, passwd, db_name, port, brandName, s_dateNo, e_dateNo, task_no, report_name)
            print(report_name)

        # Step 3: run create_goods_sales_time_split in several processes.
        if RUN_MULTI_COMPRESS:
            brandName = '浆小白'  # brand to process
            s_dateNo = 20230101   # start date
            e_dateNo = 20240201   # end date
            poolSize = 12  # number of processes; base it on the CPU's hardware thread count and do not exceed it
            maxDays = 30   # maximum span in days handled without splitting
            report_name = 'goods_sales_time_split_v1'
            MRunpool(host, passwd, db_name, port, brandName, s_dateNo, e_dateNo, poolSize, maxDays, report_name)
    finally:
        # linkTomySql returns a plain string when the connection failed.
        if not isinstance(db, str):
            db.close()
    time_cnt = time.time() - s_time
    print(f'共计用时{int(time_cnt)}s,{int(time_cnt/60)}min,{int(time_cnt/3600)}h')
|