# -*- coding: utf-8 -*-
# @Time : 2022/12/29 14:13
# @Author : Clown
# @File : demo_bi看板数据.py
# @Software : PyCharm
import pymysql
from datetime import timedelta, datetime
from dateutil.parser import parse
import time
import os
import pandas as pd
import json
import hashlib
import traceback


def linkTomySql(host, port, passwd, db_name):
    '''Connect to MySQL and return the connection object 【db】.

    v2: local_infile=1 enables the LOAD DATA LOCAL INFILE privilege.
    On failure the string '连接失败' is returned instead of a connection
    (original contract — callers check/crash on it), so keep that behavior.
    '''
    try:
        # localhost for local runs; the server address used elsewhere is 124.222.188.59.
        db = pymysql.connect(host=host,
                             user="root",
                             passwd=passwd,
                             db=db_name,
                             charset='utf8mb4',
                             local_infile=1,
                             port=port)
        print('\nconnect to mysql server 成功')
        print('------------------------------------')
    except Exception:
        print("\ncould not connect to mysql server")
        db = "连接失败"
    return db


def _readElmFile(path):
    # ELM (饿了么) exports are plain Excel files; read everything as text.
    return pd.read_excel(path, dtype=str, na_filter=False)


def _readMtFile(path):
    # MT (美团) exports are usually ANSI-encoded CSV; fall back to Excel.
    # NOTE(review): encoding='ansi' is Windows-only — confirm if this ever runs elsewhere.
    try:
        return pd.read_csv(path, header=0, dtype='str', encoding='ansi', na_filter=False)
    except Exception:
        return pd.read_excel(path, dtype=str, na_filter=False)


def _scanFormHeads(uploadFiles, read_file):
    '''Walk every directory in *uploadFiles*, fingerprint each report's sorted
    header list with MD5, and record the min/max report date per file.

    read_file: callable(path) -> DataFrame (platform-specific reader).
    Returns a list of dicts {'id', 'min_date', 'max_date', 'header_list'}.
    '''
    headers_dict_list = []
    for uploadFile in uploadFiles:
        for _, _, files in os.walk(uploadFile, topdown=False):
            for file in files:
                try:
                    path = str(uploadFile) + '/' + str(file)
                    df_in = read_file(path)
                    headers = sorted(df_in.keys())
                    md5 = hashlib.md5()
                    md5.update(str(headers).encode('utf-8'))
                    header_id = md5.hexdigest()
                    # The date column is usually named '日期'; otherwise ask the operator.
                    try:
                        date_list = list(set(df_in['日期']))
                    except Exception:
                        print(headers)
                        date_col_name = input('请输入日期字段名')
                        date_list = list(set(df_in[date_col_name]))
                    parsed_dates = [parse(str(v)).date() for v in date_list]
                    headers_dict_list.append({'id': header_id,
                                              'min_date': min(parsed_dates),
                                              'max_date': max(parsed_dates),
                                              'header_list': headers})
                except Exception as e:
                    # Best-effort scan: report the bad file and keep going.
                    print(file, e)
    return headers_dict_list


def _dedupFormHeads(headers_dict_list, pingtai):
    '''Collapse scan results to one row per header fingerprint, keeping the
    overall min/max date, and tag each row with the platform name *pingtai*.'''
    df_compute = pd.DataFrame(headers_dict_list)
    headers_only = list(set(df_compute['id']))
    print(headers_only)
    headers_list = []
    for i in headers_only:
        date_info = df_compute[df_compute['id'] == i]
        header_list = list(date_info['header_list'])[0]
        headers_list.append({'id': i,
                             'min_date': min(list(set(date_info['min_date']))),
                             'max_date': max(list(set(date_info['max_date']))),
                             'header_list': header_list,
                             'pingtai': pingtai})
    return headers_list


def selectShopDailyOperationFormHeadELM(db, uploadFiles, n):
    '''Summarize 饿了么 daily-report header fingerprints found under
    *uploadFiles* and dump the summary to a fixed Excel path.
    (db and n are kept for interface compatibility; they are unused.)'''
    heads = _scanFormHeads(uploadFiles, _readElmFile)
    rows = _dedupFormHeads(heads, '饿了么')
    pd.DataFrame(rows).to_excel(r'C:\Users\ClownHe\Desktop\导出\selectShopDailyOperationFormHeadELM.xlsx')


def selectShopDailyOperationFormHeadMT(db, uploadFiles, n):
    '''Summarize 美团 daily-report header fingerprints found under
    *uploadFiles* and dump the summary to a fixed Excel path.
    (db and n are kept for interface compatibility; they are unused.)'''
    heads = _scanFormHeads(uploadFiles, _readMtFile)
    rows = _dedupFormHeads(heads, '美团')
    pd.DataFrame(rows).to_excel(r'C:\Users\ClownHe\Desktop\导出\selectShopDailyOperationFormHeadMT.xlsx')


def insertUsefulTitleName(db, uploadFiles):
    '''Load hand-curated header-mapping workbooks (one row per header
    fingerprint, with a 'json' column holding the title mapping) into
    shop_daily_operation_data_title_log.'''
    cursor = db.cursor()
    for uploadFile in uploadFiles:
        df_in = pd.read_excel(uploadFile)
        rows = df_in.shape[0]
        df_in_dict = df_in.loc
        for i in range(rows):
            rowinfo = dict(df_in_dict[i])
            row_id = rowinfo['id']
            start_date = rowinfo['min_date']
            end_date = rowinfo['max_date']
            title_list = rowinfo['header_list']
            # Inject the platform name into the stored mapping before saving.
            json_col = json.loads(rowinfo['json'])
            json_col[rowinfo['id']]['平台'] = rowinfo['pingtai']
            useful_title_name = json.dumps(json_col, ensure_ascii=False)
            wm_plate = rowinfo['pingtai']
            sql = '''INSERT INTO shop_daily_operation_data_title_log VALUES (%s,%s,%s,%s,%s,%s);'''
            sql_value = (row_id, start_date, end_date, title_list, useful_title_name, wm_plate)
            cursor.execute(sql, sql_value)
            db.commit()


def insertSupplyChainsOrders(db, uploadFiles):
    '''Load every supply-chain order export under *uploadFiles* into
    supply_chains_orders_storage (raw row JSON) and supply_chains_orders_app
    (parsed columns). A row's MD5 over its JSON is the primary key, so
    duplicates fail the INSERT, are rolled back, and are counted/reported.'''
    for _, _, files in os.walk(uploadFiles, topdown=False):
        for file in files:
            path = str(uploadFiles) + '/' + str(file)
            print(path)
            df_in = pd.read_excel(path, dtype=str, na_filter=False)
            df_rows = df_in.shape[0]
            df_rows_info = df_in.loc
            n = 0        # inserted rows
            m = 0        # duplicate (rolled-back) rows
            m_list = []  # ids of the duplicates
            cursor = db.cursor()
            for i in range(df_rows):
                row_dict = dict(df_rows_info[i])
                row_json = json.dumps(row_dict, ensure_ascii=False)
                # Two known export schemas. BUGFIX: the original fell through
                # with stale values from the previous row when both schemas
                # failed; now such rows are reported and skipped.
                try:
                    customersId = row_dict['客户编码']
                    customersName = row_dict['客户名称']
                    ordersId = row_dict['订单编号']
                    ordersState = row_dict['订单状态']
                    ordersDateTime = row_dict['下单时间']
                    ordersDate = parse(str(ordersDateTime)).date()
                    skuId = row_dict['商品编码']
                    skuName = row_dict['商品名称']
                    skuOrdNum = row_dict['订购量']
                    skuPcs = row_dict['单位']
                except Exception:
                    try:
                        customersId = row_dict['订货门店编号']
                        customersName = row_dict['订货门店']
                        ordersId = row_dict['订货单号']
                        ordersState = row_dict['订货单状态']
                        ordersDateTime = row_dict['下单时间']
                        ordersDate = parse(str(ordersDateTime)).date()
                        skuId = row_dict['商品编号']
                        skuName = row_dict['商品名称']
                        skuOrdNum = row_dict['最小单位数量']
                        skuPcs = row_dict['最小单位']
                    except Exception as e:
                        print(file, i, e)
                        continue
                md5 = hashlib.md5()
                md5.update(str(row_json).encode('utf-8'))
                row_id = md5.hexdigest()
                row_save = (ordersDate, customersId, customersName, row_json, row_id)
                row_app = (customersId, customersName, ordersId, ordersState, ordersDateTime,
                           ordersDate, skuId, skuName, skuOrdNum, skuPcs, row_id)
                sql_save = 'INSERT INTO supply_chains_orders_storage VALUES (%s,%s,%s,%s,%s);'
                sql_app = 'INSERT INTO supply_chains_orders_app VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);'
                try:
                    cursor.execute(sql_save, row_save)
                    cursor.execute(sql_app, row_app)
                    db.commit()
                    n += 1
                except Exception as e:
                    print(e)
                    db.rollback()
                    m += 1
                    m_list.append(row_id)
            print(f'【{file}】完成上传,共计{n}条数据,重复{m}条,id【{m_list}】')


# Rebuild the known title-name mapping from title_log and try to match a new
# header set against it; matches are auto-registered, misses need manual work.
def selectTitleLogToDict(cursor, id, headers, wm_plate):
    '''Auto-register a new header fingerprint *id*.

    Reads every stored mapping from shop_daily_operation_data_title_log,
    inverts it (report header -> canonical field name), maps as many of
    *headers* as possible, and INSERTs the resulting mapping row. Unmapped
    headers are silently dropped ('品牌' is left blank for manual fill-in).
    '''
    sql_all = '''SELECT * FROM shop_daily_operation_data_title_log ;'''
    cursor.execute(sql_all)
    results_all = cursor.fetchall()
    results_list = []
    results_all_dict = {}
    try:
        for result in results_all:
            id_1 = result[0]
            json_1 = json.loads(result[4])[id_1]
            results_list.append(json_1)
        # Invert all stored mappings: header text -> canonical field name.
        df_result = pd.DataFrame(results_list)
        for df_result_title in df_result.keys():
            for key in set(df_result[df_result_title]):
                if key != '':
                    results_all_dict[key] = df_result_title
        out_dict = {id: {}}
        for header in headers:
            try:
                out_dict[id][results_all_dict[header]] = header
            except Exception:
                pass  # header not seen before; needs manual mapping later
        out_dict[id]['品牌'] = ''
        out_dict[id]['平台'] = wm_plate
        out_dict[id]['其他参数集合'] = ''
        start_date = datetime.now().strftime("%Y-%m-%d")
        end_date = datetime.now().strftime("%Y-%m-%d")
        title_list = str(headers)
        useful_title_name = json.dumps(out_dict, ensure_ascii=False)
        sql = '''INSERT INTO shop_daily_operation_data_title_log VALUES (%s,%s,%s,%s,%s,%s);'''
        sql_value = (id, start_date, end_date, title_list, useful_title_name, wm_plate)
        cursor.execute(sql, sql_value)
        # BUGFIX: the original called the module-global `db` here even though
        # only `cursor` is passed in; commit through the cursor's connection.
        cursor.connection.commit()
        print('有新增')
    except Exception as e1:
        print('selectTitleLogToDict错误', e1)


def _numOrZero(value):
    # Blank spreadsheet cells count as 0 for numeric metric columns.
    return 0 if value == '' else value


def insertShopDailyOperationDate(db, uploadFiles):
    '''Upload daily shop-operation reports into shop_daily_operation_data_app
    (normalized columns, REPLACE keyed by date+shop id) and
    shop_daily_operation_data_apps (raw row JSON + upload log info).

    uploadFiles: list of dicts {'path', 'wm_plate', 'brand_name'}.
    Header fingerprints are resolved via shop_daily_operation_data_title_log;
    unknown fingerprints are auto-registered through selectTitleLogToDict.
    '''
    cursor = db.cursor()
    for uploadFile_d in uploadFiles:
        uploadFile = uploadFile_d['path']
        wm_plate = uploadFile_d['wm_plate']
        brand_name = uploadFile_d['brand_name']
        print(wm_plate, brand_name)
        log_date = datetime.now().strftime("%Y-%m-%d")  # upload-log date
        log_time = datetime.now().strftime('%H:%M:%S')  # upload-log time
        for _, _, files in os.walk(uploadFile, topdown=False):
            for file in files:
                try:
                    path = str(uploadFile) + '/' + str(file)
                    try:
                        df_in = pd.read_csv(path, header=0, dtype='str', encoding='ansi', na_filter=False)
                    except Exception:
                        df_in = pd.read_excel(path, dtype=str, na_filter=False)
                    # 1. Fingerprint the sorted header list and look it up.
                    headers = sorted(df_in.keys())
                    md5 = hashlib.md5()
                    md5.update(str(headers).encode('utf-8'))
                    header_id = md5.hexdigest()
                    sql = '''SELECT * FROM shop_daily_operation_data_title_log WHERE `id` = %s;'''
                    try:
                        cursor.execute(sql, (header_id,))
                        row_value = cursor.fetchall()[0]
                        useful_title_name = json.loads(row_value[4])[header_id]
                        end_date_inform = row_value[2]
                    except Exception:
                        # Unknown fingerprint: auto-register, then re-query.
                        selectTitleLogToDict(cursor, header_id, headers, wm_plate)
                        cursor.execute(sql, (header_id,))
                        row_value = cursor.fetchall()[0]
                        useful_title_name = json.loads(row_value[4])[header_id]
                        end_date_inform = row_value[2]
                    # 2. Extend the fingerprint's end_date if this file is newer.
                    end_date = parse(str(max(list(set(df_in['日期']))))).date()
                    end_date_out = end_date if end_date > end_date_inform else end_date_inform
                    cursor.execute('''UPDATE shop_daily_operation_data_title_log SET `end_date` = %s WHERE `id` = %s;''',
                                   (end_date_out, header_id))
                    db.commit()
                    # 3. Upsert every row of the report.
                    rows = df_in.loc
                    rows_cnt = df_in.shape[0]
                    for i in range(rows_cnt):
                        row_info = rows[i]
                        row_dict = json.dumps(dict(row_info), ensure_ascii=False)
                        report_date = parse(row_info[useful_title_name['日期']]).date()
                        shop_id = row_info[useful_title_name['门店id']]
                        shop_name = row_info[useful_title_name['门店名称']]
                        city = row_info[useful_title_name['城市']]
                        gmv = _numOrZero(row_info[useful_title_name['GMV']])
                        income = _numOrZero(row_info[useful_title_name['收入']])
                        valid_orders = _numOrZero(row_info[useful_title_name['有效单量']])
                        try:
                            invalid_raw = row_info[useful_title_name['无效单量']]
                        except Exception:
                            invalid_raw = ''  # some exports lack this column
                        invalid_orders = _numOrZero(invalid_raw)
                        extra_json = json.dumps({'date': ''}, ensure_ascii=False)
                        # Unique row key: yyyymmdd + shop id.
                        row_key = str(int(parse(row_info[useful_title_name['日期']]).strftime('%Y%m%d'))) + str(shop_id)
                        l_row = (report_date, wm_plate, brand_name, shop_id, shop_name, city,
                                 gmv, income, valid_orders, invalid_orders, extra_json, row_key)
                        s_row = (brand_name, wm_plate, log_date, log_time, row_dict, row_key, header_id)
                        l_sql = '''REPLACE INTO shop_daily_operation_data_app VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);'''
                        s_sql = '''REPLACE INTO shop_daily_operation_data_apps VALUES (%s,%s,%s,%s,%s,%s,%s);'''
                        cursor.execute(l_sql, l_row)
                        cursor.execute(s_sql, s_row)
                        db.commit()
                except Exception as e:
                    print(file, e)


if __name__ == '__main__':
    starttime = time.time()
    if 1 == 1:
        host = 'localhost'
        port = 3306
        # NOTE(review): credentials are hard-coded in source — move to config/env.
        passwd = '111???clown'
        db_name = 'hexingxing'
        db = linkTomySql(host, port, passwd, db_name)
    if 1 == 1:
        uploadFiles = r'C:\Users\ClownHe\Desktop\门店外购监测\供应链订单 - 上传'
        try:
            insertSupplyChainsOrders(db, uploadFiles)
        except Exception as e:
            print('错误')
            traceback.print_exc()
        db.close()
    n = 0
    # The branches below are disabled (1 == 0): kept as runnable examples of
    # the one-off header-scan / mapping / backfill tools defined above.
    if 1 == 0:
        uploadFiles = [r'F:\Aa数据库数据导入\每日营业数据历史\jxb_elm', r'F:\Aa数据库数据导入\每日营业数据历史\zxx_elm']
        selectShopDailyOperationFormHeadELM(db, uploadFiles, n)
    if 1 == 0:
        uploadFiles1 = [r'F:\Aa数据库数据导入\每日营业数据历史\jxb_mt', r'F:\Aa数据库数据导入\每日营业数据历史\zxx_mt']
        selectShopDailyOperationFormHeadMT(db, uploadFiles1, n)
    if 1 == 0:
        uploadFiles = [r'C:\Users\ClownHe\Desktop\导出\selectShopDailyOperationFormHeadELM.xlsx',
                       r'C:\Users\ClownHe\Desktop\导出\selectShopDailyOperationFormHeadMT.xlsx']
        insertUsefulTitleName(db, uploadFiles)
    if 1 == 0:
        beginTime = 20230812
        endTime = 20230812
        os.system(f'python F:/cppc/cppc_python脚本/服务器脚本/daily_script_python_yun/shop_daily_operation_data_E.py {beginTime} {endTime}')
        os.system(f'python F:/cppc/cppc_python脚本/服务器脚本/daily_script_python_yun/shop_daily_operation_data_M.py {beginTime} {endTime}')
        uploadFiles = [{'path': r'C:\Users\ClownHe\Desktop\导出\uploads\美团j', 'wm_plate': '美团', 'brand_name': '浆小白'},
                       {'path': r'C:\Users\ClownHe\Desktop\导出\uploads\饿了么j', 'wm_plate': '饿了么', 'brand_name': '浆小白'},
                       {'path': r'C:\Users\ClownHe\Desktop\导出\uploads\美团z', 'wm_plate': '美团', 'brand_name': '粥小鲜'},
                       {'path': r'C:\Users\ClownHe\Desktop\导出\uploads\饿了么z', 'wm_plate': '饿了么', 'brand_name': '粥小鲜'},
                       {'path': r'C:\Users\ClownHe\Desktop\导出\uploads\美团l', 'wm_plate': '美团', 'brand_name': '楼兰说'},
                       {'path': r'C:\Users\ClownHe\Desktop\导出\uploads\饿了么l', 'wm_plate': '饿了么', 'brand_name': '楼兰说'}]
        insertShopDailyOperationDate(db, uploadFiles)
        db.close()