# -*- coding: utf-8 -*-
# @Time : 2022/12/29 14:13
# @Author : Clown
# @File : demo_bi看板数据.py
# @Software : PyCharm
- import pymysql
- from datetime import timedelta, datetime
- from dateutil.parser import parse
- import time
- import os
- import pandas as pd
- import json
- import hashlib
- import traceback
# Connect to the database
def linkTomySql (host, port, passwd, db_name):
    '''Connect to MySQL and return the connection object.

    v2: local_infile=1 grants the LOAD DATA LOCAL INFILE permission.

    Args:
        host: server host ('localhost' locally, 124.222.188.59 on the server).
        port: MySQL port.
        passwd: password for the 'root' user.
        db_name: schema to select.

    Returns:
        A pymysql connection on success, or the legacy string sentinel
        '连接失败' on failure (existing callers compare against it, so the
        mixed return type is kept for backward compatibility).
    '''
    try:
        db = pymysql.connect (
            host=host, user="root",
            passwd=passwd,
            db=db_name,
            charset='utf8mb4',
            local_infile=1,
            port=port)
        print ('\nconnect to mysql server 成功')
        print ('------------------------------------')
    except Exception as e:
        # Was a bare `except:` that swallowed even KeyboardInterrupt and
        # hid the failure reason; report the cause instead.
        print ("\ncould not connect to mysql server", e)
        db = "连接失败"
    return db
def selectShopDailyOperationFormHeadELM (db, uploadFiles, n):
    '''Scan 饿了么 (ELM) daily-operation Excel exports, fingerprint each
    distinct header layout with an MD5 id, record the min/max report date
    per layout, and export one summary row per layout to Excel.

    Args:
        db: unused; kept for interface compatibility with callers.
        uploadFiles: iterable of folder paths to walk for .xlsx files.
        n: unused; kept for interface compatibility with callers.

    Side effects: writes the summary workbook to a fixed path under
    C:\\Users\\ClownHe\\Desktop\\导出\\ and may prompt via input() when a
    sheet lacks the default '日期' column.
    '''
    headers_dict_list = []
    headers_list = []
    for uploadFile in uploadFiles:
        for a, b, files in os.walk (uploadFile, topdown=False):
            for file in files:
                try:
                    path = str (uploadFile) + '/' + str (file)
                    df_in = pd.read_excel (path, dtype=str, na_filter=False)
                    # Fingerprint the *sorted* header list so column order
                    # differences do not produce distinct ids.
                    headers = sorted (df_in.keys ())
                    md5 = hashlib.md5 ()
                    md5.update (str (headers).encode ('utf-8'))
                    header_id = md5.hexdigest ()  # avoid shadowing builtin `id`
                    # Pull the date column; ask the operator when the sheet
                    # does not use the default column name '日期'.
                    try:
                        date_list = list (set (df_in['日期']))
                    except KeyError:
                        print (headers)
                        date_col_name = input ('请输入日期字段名')
                        date_list = list (set (df_in[date_col_name]))
                    date_list_o = [parse (str (date_value)).date () for date_value in date_list]
                    headers_dict_list.append ({'id': header_id,
                                               'min_date': min (date_list_o),
                                               'max_date': max (date_list_o),
                                               'header_list': headers})
                except Exception as e:
                    # Best-effort per file: report and continue with the rest.
                    print (file, e)
    df_compute = pd.DataFrame (headers_dict_list)
    headers_only = list (set (df_compute['id']))
    print (headers_only)
    # Collapse per-file rows into one row per distinct header layout.
    for i in headers_only:
        date_info = df_compute[df_compute['id'] == i]
        header_list = list (date_info['header_list'])[0]
        date_info_min = min (list (set (date_info['min_date'])))
        date_info_max = max (list (set (date_info['max_date'])))
        headers_list.append ({'id': i, 'min_date': date_info_min, 'max_date': date_info_max,
                              'header_list': header_list, 'pingtai': '饿了么'})
    df_out = pd.DataFrame (headers_list)
    df_out.to_excel (r'C:\Users\ClownHe\Desktop\导出\selectShopDailyOperationFormHeadELM.xlsx')
def selectShopDailyOperationFormHeadMT (db, uploadFiles, n):
    '''Scan 美团 (MT) daily-operation exports (ANSI csv, falling back to
    xlsx), fingerprint each distinct header layout with an MD5 id, record
    the min/max report date per layout, and export one summary row per
    layout to Excel. Mirrors selectShopDailyOperationFormHeadELM.

    Args:
        db: unused; kept for interface compatibility with callers.
        uploadFiles: iterable of folder paths to walk.
        n: unused; kept for interface compatibility with callers.
    '''
    headers_dict_list = []
    headers_list = []
    for uploadFile in uploadFiles:
        for a, b, files in os.walk (uploadFile, topdown=False):
            for file in files:
                try:
                    path = str (uploadFile) + '/' + str (file)
                    # MT exports are usually ANSI-encoded csv; fall back to
                    # Excel for anything read_csv cannot parse.
                    try:
                        df_in = pd.read_csv (path, header=0, dtype='str', encoding='ansi', na_filter=False)
                    except Exception:
                        df_in = pd.read_excel (path, dtype=str, na_filter=False)
                    # Fingerprint the sorted header list (order-insensitive).
                    headers = sorted (df_in.keys ())
                    md5 = hashlib.md5 ()
                    md5.update (str (headers).encode ('utf-8'))
                    header_id = md5.hexdigest ()  # avoid shadowing builtin `id`
                    # Pull the date column; prompt when '日期' is absent.
                    try:
                        date_list = list (set (df_in['日期']))
                    except KeyError:
                        print (headers)
                        date_col_name = input ('请输入日期字段名')
                        date_list = list (set (df_in[date_col_name]))
                    date_list_o = [parse (str (date_value)).date () for date_value in date_list]
                    headers_dict_list.append ({'id': header_id,
                                               'min_date': min (date_list_o),
                                               'max_date': max (date_list_o),
                                               'header_list': headers})
                except Exception as e:
                    # Best-effort per file: report and continue.
                    print (file, e)
    df_compute = pd.DataFrame (headers_dict_list)
    headers_only = list (set (df_compute['id']))
    print (headers_only)
    # Collapse per-file rows into one row per distinct header layout.
    for i in headers_only:
        date_info = df_compute[df_compute['id'] == i]
        header_list = list (date_info['header_list'])[0]
        date_info_min = min (list (set (date_info['min_date'])))
        date_info_max = max (list (set (date_info['max_date'])))
        headers_list.append ({'id': i, 'min_date': date_info_min, 'max_date': date_info_max,
                              'header_list': header_list, 'pingtai': '美团'})
    df_out = pd.DataFrame (headers_list)
    df_out.to_excel (r'C:\Users\ClownHe\Desktop\导出\selectShopDailyOperationFormHeadMT.xlsx')
def insertUsefulTitleName (db, uploadFiles):
    '''Load header-mapping workbooks (output of the selectShopDaily...
    functions, manually enriched with a 'json' column) and insert one row
    per header layout into shop_daily_operation_data_title_log.

    Args:
        db: open pymysql connection.
        uploadFiles: iterable of .xlsx paths, each with columns
            id / min_date / max_date / header_list / json / pingtai.
    '''
    cursor = db.cursor ()
    # Parameterized statement, hoisted out of the loops (loop-invariant).
    sql = '''INSERT INTO shop_daily_operation_data_title_log VALUES (%s,%s,%s,%s,%s,%s);'''
    for uploadFile in uploadFiles:
        df_in = pd.read_excel (uploadFile)
        rows = df_in.shape[0]
        df_in_dict = df_in.loc
        for i in range (rows):
            rowinfo = dict (df_in_dict[i])
            row_id = rowinfo['id']  # avoid shadowing builtin `id`
            start_date = rowinfo['min_date']
            end_date = rowinfo['max_date']
            title_list = rowinfo['header_list']
            # Stamp the platform into the stored mapping before persisting.
            json_col = json.loads (rowinfo['json'])
            json_col[row_id]['平台'] = rowinfo['pingtai']
            useful_title_name = json.dumps (json_col, ensure_ascii=False)
            wm_plate = rowinfo['pingtai']
            sql_value = (row_id, start_date, end_date, title_list, useful_title_name, wm_plate)
            cursor.execute (sql, sql_value)
            # Commit per row so earlier rows survive a mid-run failure.
            db.commit ()
def insertSupplyChainsOrders (db, uploadFiles):
    '''Import supply-chain order spreadsheets into MySQL.

    Walks `uploadFiles` (a folder path), reads each workbook, and inserts
    every recognized row into supply_chains_orders_storage (raw row JSON)
    and supply_chains_orders_app (parsed columns). The MD5 of the raw row
    JSON is the id, so re-uploading the same row hits a duplicate-key
    error and is counted/reported rather than inserted twice.

    Args:
        db: open pymysql connection.
        uploadFiles: folder path containing the .xlsx exports.
    '''
    for a, b, files in os.walk (uploadFiles, topdown=False):
        for file in files:
            path = str (uploadFiles) + '/' + str (file)
            print (path)
            df_in = pd.read_excel (path, dtype=str, na_filter=False)
            df_rows = df_in.shape[0]
            df_rows_info = df_in.loc
            n = 0       # rows inserted
            m = 0       # rows rejected (duplicates / DB errors)
            m_list = []  # ids of rejected rows
            cursor = db.cursor ()
            # Loop-invariant SQL, hoisted out of the row loop.
            sql_save = 'INSERT INTO supply_chains_orders_storage VALUES (%s,%s,%s,%s,%s);'
            sql_app = 'INSERT INTO supply_chains_orders_app VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);'
            for i in range (df_rows):
                row_dict = dict (df_rows_info[i])
                row_json = json.dumps (row_dict, ensure_ascii=False)
                # Two known export schemas; try them in order.
                try:
                    customersId = row_dict['客户编码']
                    customersName = row_dict['客户名称']
                    ordersId = row_dict['订单编号']
                    ordersState = row_dict['订单状态']
                    ordersDateTime = row_dict['下单时间']
                    ordersDate = parse (str (ordersDateTime)).date ()
                    skuId = row_dict['商品编码']
                    skuName = row_dict['商品名称']
                    skuOrdNum = row_dict['订购量']
                    skuPcs = row_dict['单位']
                except Exception:
                    try:
                        customersId = row_dict['订货门店编号']
                        customersName = row_dict['订货门店']
                        ordersId = row_dict['订货单号']
                        ordersState = row_dict['订货单状态']
                        ordersDateTime = row_dict['下单时间']
                        ordersDate = parse (str (ordersDateTime)).date ()
                        skuId = row_dict['商品编号']
                        skuName = row_dict['商品名称']
                        skuOrdNum = row_dict['最小单位数量']
                        skuPcs = row_dict['最小单位']
                    except Exception as e:
                        # BUGFIX: the original fell through here with `...`,
                        # so the row was built from the previous row's stale
                        # values (or raised UnboundLocalError on row 0).
                        # Skip rows whose schema is not recognized.
                        print (file, i, '字段无法识别,已跳过', e)
                        continue
                md5 = hashlib.md5 ()
                md5.update (str (row_json).encode ('utf-8'))
                row_id = md5.hexdigest ()  # avoid shadowing builtin `id`
                row_save = (ordersDate, customersId, customersName, row_json, row_id)
                row_app = (customersId, customersName, ordersId, ordersState, ordersDateTime,
                           ordersDate, skuId, skuName, skuOrdNum, skuPcs, row_id)
                try:
                    cursor.execute (sql_save, row_save)
                    cursor.execute (sql_app, row_app)
                    db.commit ()
                    n += 1
                except Exception as e:
                    # Typically a duplicate primary key on re-upload.
                    print (e)
                    db.rollback ()
                    m += 1
                    m_list.append (row_id)
            print (f'【{file}】完成上传,共计{n}条数据,重复{m}条,id【{m_list}】')
# Re-aggregate the parameters recorded in the title log and match them against new headers:
# if the key parameters are consistent, update automatically; otherwise a manual update is required.
def selectTitleLogToDict (cursor, id, headers, wm_plate):
    '''Learn a header→canonical-name mapping for a new header layout and
    insert it into shop_daily_operation_data_title_log.

    Reads every existing row of the log table, inverts the stored
    mappings into raw-header → canonical-name, applies them to `headers`,
    and persists the result under `id`. Headers with no known canonical
    name are silently dropped; 品牌 / 其他参数集合 are left blank for
    manual follow-up.

    Args:
        cursor: open pymysql cursor (commits via its own connection).
        id: MD5 fingerprint of the sorted header list (kept as a
            parameter name for caller compatibility; shadows the builtin).
        headers: sorted header list of the new layout.
        wm_plate: platform label (美团 / 饿了么).
    '''
    sql_all = '''SELECT * FROM shop_daily_operation_data_title_log ;'''
    cursor.execute (sql_all)
    results_all = cursor.fetchall ()
    results_list = []
    results_all_dict = {}
    try:
        # Collect each stored mapping dict (column 4 is the JSON payload
        # keyed by the layout id in column 0).
        for result in results_all:
            id_1 = result[0]
            results_list.append (json.loads (result[4])[id_1])
        df_result = pd.DataFrame (results_list)
        # Invert: raw header text -> canonical column name.
        for df_result_title in df_result.keys ():
            for key in set (df_result[df_result_title]):
                if key != '':
                    results_all_dict[key] = df_result_title
        out_dict = {id: {}}
        for header in headers:
            try:
                out_dict[id][results_all_dict[header]] = header
            except KeyError:
                ...  # header with no known canonical name: skip
        out_dict[id]['品牌'] = ''
        out_dict[id]['平台'] = wm_plate
        out_dict[id]['其他参数集合'] = ''
        start_date = datetime.now ().strftime ("%Y-%m-%d")
        end_date = datetime.now ().strftime ("%Y-%m-%d")
        title_list = str (headers)
        useful_title_name = json.dumps (out_dict, ensure_ascii=False)
        sql = '''INSERT INTO shop_daily_operation_data_title_log VALUES (%s,%s,%s,%s,%s,%s);'''
        sql_value = (id, start_date, end_date, title_list, useful_title_name, wm_plate)
        cursor.execute (sql, sql_value)
        # BUGFIX: commit through the cursor's own connection; the original
        # called db.commit() on a global `db` that only exists when the
        # module runs as a script (NameError otherwise).
        cursor.connection.commit ()
        print ('有新增')
    except Exception as e1:
        print ('selectTitleLogToDict错误', e1)
def insertShopDailyOperationDate (db, uploadFiles):
    '''Upload daily shop-operation spreadsheets into the app tables.

    For every file under each configured folder: fingerprint the header
    layout, look up its column mapping in
    shop_daily_operation_data_title_log (learning it via
    selectTitleLogToDict when unknown), extend the layout's recorded
    end_date, then REPLACE each data row into
    shop_daily_operation_data_app and shop_daily_operation_data_apps.

    Args:
        db: open pymysql connection.
        uploadFiles: list of dicts with keys 'path' (folder to scan),
            'wm_plate' (平台: 美团/饿了么) and 'brand_name' (品牌).
    '''
    def _blank_to_zero (value):
        # Cells are read as str; an empty string means "no data" → 0.
        return 0 if value == '' else value

    cursor = db.cursor ()
    for uploadFile_d in uploadFiles:
        uploadFile = uploadFile_d['path']
        wm_plate = uploadFile_d['wm_plate']
        brand_name = uploadFile_d['brand_name']
        print (wm_plate, brand_name)
        log_date = datetime.now ().strftime ("%Y-%m-%d")   # upload-log date
        log_time = datetime.now ().strftime ('%H:%M:%S')   # upload-log time
        for a, b, files in os.walk (uploadFile, topdown=False):
            for file in files:
                try:
                    path = str (uploadFile) + '/' + str (file)
                    # MT exports are ANSI csv; fall back to Excel.
                    try:
                        df_in = pd.read_csv (path, header=0, dtype='str', encoding='ansi', na_filter=False)
                    except Exception:
                        df_in = pd.read_excel (path, dtype=str, na_filter=False)
                    # Fingerprint the sorted header list (removed the
                    # original's dead `if 1 == 1:` wrapper).
                    headers = sorted (df_in.keys ())
                    md5 = hashlib.md5 ()
                    md5.update (str (headers).encode ('utf-8'))
                    header_id = md5.hexdigest ()  # avoid shadowing builtin `id`
                    # Look up the layout's column mapping; on a miss, learn
                    # it and retry once (selectTitleLogToDict inserts it).
                    sql = f'''SELECT * FROM shop_daily_operation_data_title_log WHERE `id` = '{header_id}';'''
                    try:
                        cursor.execute (sql)
                        row_value = cursor.fetchall ()[0]
                    except Exception:
                        selectTitleLogToDict (cursor, header_id, headers, wm_plate)
                        cursor.execute (sql)
                        row_value = cursor.fetchall ()[0]
                    useful_title_name = json.loads (row_value[4])[header_id]
                    end_date_inform = row_value[2]
                    # Push the layout's recorded end_date forward when this
                    # file contains newer data.
                    # NOTE(review): assumes end_date is a DATE column so
                    # pymysql returns datetime.date — confirm schema.
                    end_date = parse (str (max (set (df_in['日期'])))).date ()
                    end_date_out = end_date if end_date > end_date_inform else end_date_inform
                    # Parameterized (the original interpolated into the SQL).
                    sql_update = '''UPDATE shop_daily_operation_data_title_log SET `end_date` = %s WHERE `id` = %s;'''
                    cursor.execute (sql_update, (end_date_out, header_id))
                    db.commit ()
                    rows = df_in.loc
                    rows_cnt = df_in.shape[0]
                    l_sql = '''REPLACE INTO shop_daily_operation_data_app VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);'''
                    s_sql = '''REPLACE INTO shop_daily_operation_data_apps VALUES (%s,%s,%s,%s,%s,%s,%s);'''
                    for i in range (rows_cnt):
                        row_info = rows[i]
                        row_dict = json.dumps (dict (row_info), ensure_ascii=False)
                        l0 = parse (row_info[useful_title_name['日期']]).date ()
                        l3 = row_info[useful_title_name['门店id']]
                        l4 = row_info[useful_title_name['门店名称']]
                        l5 = row_info[useful_title_name['城市']]
                        l6 = _blank_to_zero (row_info[useful_title_name['GMV']])
                        l7 = _blank_to_zero (row_info[useful_title_name['收入']])
                        l8 = _blank_to_zero (row_info[useful_title_name['有效单量']])
                        # Some layouts have no 无效单量 column at all.
                        try:
                            l9 = _blank_to_zero (row_info[useful_title_name['无效单量']])
                        except KeyError:
                            l9 = 0
                        l10 = json.dumps ({'date': ''}, ensure_ascii=False)  # placeholder extras
                        # Row key: yyyymmdd + shop id.
                        l11 = str (int (parse (row_info[useful_title_name['日期']]).strftime ('%Y%m%d'))) + str (l3)
                        l_row = (l0, wm_plate, brand_name, l3, l4, l5, l6, l7, l8, l9, l10, l11)
                        s_row = (brand_name, wm_plate, log_date, log_time, row_dict, l11, header_id)
                        cursor.execute (l_sql, l_row)
                        cursor.execute (s_sql, s_row)
                        db.commit ()
                except Exception as e:
                    # Best-effort per file: report and continue.
                    print (file, e)
if __name__ == '__main__':
    starttime = time.time ()  # wall-clock start for the elapsed report below
    # --- database connection ---
    host = 'localhost'
    port = 3306
    passwd = '111???clown'  # NOTE(review): hard-coded credential — move to config/env
    db_name = 'hexingxing'
    db = linkTomySql (host, port, passwd, db_name)
    # --- supply-chain order import (active) ---
    if 1 == 1:
        uploadFiles = r'C:\Users\ClownHe\Desktop\门店外购监测\供应链订单 - 上传'
        try:
            insertSupplyChainsOrders (db, uploadFiles)
        except Exception as e:
            print ('错误')
            traceback.print_exc ()
    # BUGFIX: the connection used to be closed right here and then reused
    # by insertShopDailyOperationDate below; close only once, at the end.
    n = 0
    # --- one-off header-collection tasks (disabled; flip to 1 == 1 to run) ---
    if 1 == 0:
        uploadFiles = [r'F:\Aa数据库数据导入\每日营业数据历史\jxb_elm', r'F:\Aa数据库数据导入\每日营业数据历史\zxx_elm']
        selectShopDailyOperationFormHeadELM (db, uploadFiles, n)
    if 1 == 0:
        uploadFiles1 = [r'F:\Aa数据库数据导入\每日营业数据历史\jxb_mt', r'F:\Aa数据库数据导入\每日营业数据历史\zxx_mt']
        selectShopDailyOperationFormHeadMT (db, uploadFiles1, n)
    if 1 == 0:
        uploadFiles = [r'C:\Users\ClownHe\Desktop\导出\selectShopDailyOperationFormHeadELM.xlsx', r'C:\Users\ClownHe\Desktop\导出\selectShopDailyOperationFormHeadMT.xlsx']
        insertUsefulTitleName (db, uploadFiles)
    if 1 == 0:
        beginTime = 20230812
        endTime = 20230812
        os.system (f'python F:/cppc/cppc_python脚本/服务器脚本/daily_script_python_yun/shop_daily_operation_data_E.py {beginTime} {endTime}')
        os.system (f'python F:/cppc/cppc_python脚本/服务器脚本/daily_script_python_yun/shop_daily_operation_data_M.py {beginTime} {endTime}')
    # --- daily shop-operation data upload (per platform × brand) ---
    uploadFiles = [{'path': r'C:\Users\ClownHe\Desktop\导出\uploads\美团j', 'wm_plate': '美团', 'brand_name': '浆小白'},
                   {'path': r'C:\Users\ClownHe\Desktop\导出\uploads\饿了么j', 'wm_plate': '饿了么', 'brand_name': '浆小白'},
                   {'path': r'C:\Users\ClownHe\Desktop\导出\uploads\美团z', 'wm_plate': '美团', 'brand_name': '粥小鲜'},
                   {'path': r'C:\Users\ClownHe\Desktop\导出\uploads\饿了么z', 'wm_plate': '饿了么', 'brand_name': '粥小鲜'},
                   {'path': r'C:\Users\ClownHe\Desktop\导出\uploads\美团l', 'wm_plate': '美团', 'brand_name': '楼兰说'},
                   {'path': r'C:\Users\ClownHe\Desktop\导出\uploads\饿了么l', 'wm_plate': '饿了么', 'brand_name': '楼兰说'}]
    # uploadFiles = [{'path': r'C:\Users\ClownHe\Desktop\导出\uploads\新建文件夹', 'wm_plate': '美团', 'brand_name': '浆小白'}]
    insertShopDailyOperationDate (db, uploadFiles)
    db.close ()
    print (f'耗时{time.time () - starttime:.1f}秒')