123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619 |
- # -*- codeing = utf-8 -*-
- # @Time : 2023/12/10 18:20
- # @Author : Clown
- # @File : demo_400AD.py
- # @Software : PyCharm
- import pymysql
- from datetime import datetime,timedelta,time
- from dateutil.parser import parse
- import traceback
- from all_key_table import update_key_value_pair_7qiaoPlus
- import json
- import requests
- import ddddocr
- import pandas as pd
- from bs4 import BeautifulSoup
- import execjs
- import time as tms
- from urllib.parse import quote
- import math
- def linkTomySql(host, passwd, db_name, port):
- '''连接至数据库返回【db】,v2新增local_infile=1 打开文件导入权限'''
- try:
- # 本地连接为:localhost 服务器连接为:124.222.188.59
- db = pymysql.connect (
- host=host, user="root",
- passwd=passwd,
- db=db_name,
- port=port,
- charset='utf8mb4',
- local_infile=1)
- print ('\nconnect to mysql server 成功')
- print ('---------------------------------------')
- except:
- print ("\ncould not connect to mysql server")
- db = "连接失败"
- return db
- def read_key_value_pair(db, brand_name, wm_plate, owner):
- '''按条件读取,数据库中all_key_table表里的key_value_pair字段中的值,以键值对的形式输出
- db:数据库,
- brand_name:品牌名,
- wm_plate:外卖平台MEITUAN或ELEME,
- owner:账号权限all或one
- '''
- cursor = db.cursor ()
- sql = f'SELECT key_value_pair FROM all_key_table WHERE brand_name = "{brand_name}" AND wm_plate = "{wm_plate}" AND owner = "{owner}";'
- cursor.execute (sql)
- pair = json.loads (cursor.fetchall ()[0][0])
- return pair
- def orderTo7qiao(db,lastCatchTime_ago_s,solutionCntAll,solutionJsonData):
- solutionjsondata = {'data':solutionJsonData}
- key_json = read_key_value_pair (db, '道一云', '7qiaoPlus', 'all')
- applicationId = '63631ee57005c0103426fdc7'
- processId = '642128bd80236836229b3b1d'
- Token = key_json['data']
- headers_api = {
- 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.85 Safari/537.36',
- "Content-type": "application/json",
- "X-Auth0-Token": Token}
- url = f'https://qiqiao.do1.com.cn/plus/cgi-bin/open/applications/{applicationId}/workflow/process_definitions/{processId}/start'
- params_json = {
- "nextNodesAndHandlers": [{"activityDefinitionId": "obj_02"}],
- "variables": {"任务触发时间": lastCatchTime_ago_s,'solutioncntall':solutionCntAll,'solutionjsondata':json.dumps(solutionjsondata,ensure_ascii=False)},
- "loginUserId":"92874ed35ecb45f51a58adcf18e99b5e"}
- try:
- resp = requests.post(url,headers =headers_api,json=params_json)
- msg = resp.json()['msg']
- print(resp.text)
- if msg == '执行成功':
- operation_results = '无异常'
- else:
- operation_results = f'[orderTo7qiao]-返回结果为{msg}'
- except Exception as e:
- operation_results = f'[orderTo7qiao]-道一云端无法连接{e}'
- return operation_results
- def sendMsgToRot(url,msg_txt):
- params_json = {
- "msgtype": "markdown",
- "markdown": {
- "content": msg_txt,
- "mentioned_list":["@all"]
- }}
- resp = requests.post(url,json=params_json).text
- print(resp)
- # 查询计时器时间,输出起始时间
- def select400_ad_time_meter(db,minutes):
- cursor = db.cursor ()
- sql = f'SELECT * FROM 400_ad_time_meter WHERE id = 1;'
- cursor.execute (sql)
- lastCatchTime_ago = cursor.fetchall ()[0][0]
- lastCatchTime_ago_s = (lastCatchTime_ago + timedelta(seconds=1)).strftime('%Y-%m-%d %H:%M:%S')
- # 更新间隔
- lastCatchTime_now_s = (lastCatchTime_ago + timedelta(minutes=minutes)).strftime('%Y-%m-%d %H:%M:%S')
- cursor.close()
- return lastCatchTime_ago_s,lastCatchTime_now_s
- # 更新计时器时间
- def update400_ad_time_meter(db,lastCatchTime_now_s):
- cursor = db.cursor ()
- sql = f'UPDATE 400_ad_time_meter SET lastCatchTime = "{lastCatchTime_now_s}" WHERE id = 1;'
- cursor.execute(sql)
- db.commit()
- cursor.close()
- # 新增记录至抓取记录表
- def insert400_ad_solution_catch(db,insert_valuse):
- cursor = db.cursor ()
- sql = f'INSERT INTO 400_ad_solution_catch VALUES (%s,%s,%s,%s,%s,%s,%s,%s);'
- cursor.execute (sql,insert_valuse)
- db.commit ()
- cursor.close ()
- # 新增记录至抓取明细记录表
- def insert400_ad_solution_info(db,insert_valuse):
- cursor = db.cursor ()
- sql = f'INSERT INTO 400_ad_solution_info VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);'
- cursor.execute (sql,insert_valuse)
- db.commit ()
- cursor.close ()
- # 图像识别
- ocr = ddddocr.DdddOcr(show_ad = False)
- def dOcrImage(image_path):
- with open(image_path,'rb') as f:
- img_bytes = f.read()
- text = ocr.classification(img_bytes)
- print(text)
- return text
- # 验证码验证之后获取对应的headers中的code码
- def getCodeAndCodeId(save_path,typeName,cookie):
- if typeName == 'xt4008':
- url = 'https://www.xt4008.com/api/xuntec/login/admin/code'
- headers = {
- 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36',
- 'content-type': 'application/x-www-form-urlencoded'}
- resp = requests.get(url, headers = headers)
- codeid = resp.headers.get('Codeid')
- data = resp.content
- with open(save_path+'code.png', mode = "wb") as f:
- f.write(data)
- # print(codeid)
- code = dOcrImage(save_path+'code.png')
- out_json = {'code':code,'codeId':codeid}
- elif typeName == '400pt':
- now = datetime.now()
- formatted_time = now.strftime("%a %b %d %Y %H:%M:%S GMT ") + "0800 (中国标准时间)"
- url = 'https://www.400pt.net/platform/pm/getVaildImg'
- data = f'now={formatted_time}'
- headers = {
- 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36',
- 'content-type': 'application/x-www-form-urlencoded',
- 'cookie':cookie}
- resp = requests.get(url, headers = headers,data=quote(data))
- data = resp.content
- with open(save_path+'code.png', mode = "wb") as f:
- f.write(data)
- code = dOcrImage(save_path+'code.png')
- out_json = {'code':code,'codeId':'codeid'}
- return out_json
- # step1网址'https://www.400pt.net/'获取待授权cookie_id信息
- def getCookie():
- url = 'https://www.400pt.net/'
- headers = {
- 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36',
- 'content-type': 'application/x-www-form-urlencoded'}
- resp = requests.get(url,headers=headers).headers.get('Set-Cookie')
- cookie_id = resp.split(';')[0]
- return cookie_id
- # step2网址'https://www.400pt.net/'授权cookie_id信息,使cookie_id信息变的有效
- def getInhtml(cookie_id):
- url = 'https://www.400pt.net/api/cust/auth/index'
- headers = {
- 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36',
- 'Referer': 'https://www.400pt.net/',
- 'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
- 'Cookie':cookie_id,
- 'Upgrade-Insecure-Requests':'1'}
- resp = requests.get(url,headers=headers)
- def getMd5(url,strIn):
- if url != '':
- url = url
- headers = {
- 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36'}
- resp = requests.get(url, headers = headers).text
- with open('md5.js', mode = 'w') as f:
- f.write(resp)
- else:
- with open("md5.js", "r") as file:
- resp = file.read()
- ctx = execjs.compile(resp)
- result = ctx.call("MD5",strIn)
- # print(result)
- return result
- def getJsScript():
- url = 'https://www.400pt.net/api/cust/auth/index'
- headers = {
- 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36'}
- resp = requests.get(url, headers = headers).text
- soup = BeautifulSoup(resp, 'html.parser')
- tags = soup.find_all(src=True)
- tag_dict = {}
- ii = ['md5','login.js']
- for tag in tags:
- tag_text = tag['src']
- for i in ii:
- if i in tag_text:
- tag_dict[i]='https://www.400pt.net/'+tag_text
- getMd5(tag_dict['md5'],'')
- def getToken(userName,pwd,typeName,img_path):
- cookie_id = ''
- if typeName == '400pt':
- cookie_id=getCookie()
- # print(cookie_id)
- code_json = getCodeAndCodeId(img_path, typeName,cookie_id)
- loginName = userName
- pwd = pwd
- md5 = getMd5('',pwd)
- # print(f'md5:{md5}')
- timestamp = int(tms.time()*1000)
- # print(timestamp)
- password = getMd5('',md5+str(timestamp))
- # print(password)
- url = 'https://www.400pt.net/platform/pm/admin/login'
- params = f'loginName={loginName}&password={password}&pwd={md5}&authCode={code_json["code"]}×tamp={int(timestamp)}&loginType=0'
- headers = {
- 'Accept':'application/json, text/javascript, */*; q=0.01',
- 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36',
- 'Content-Type':'application/x-www-form-urlencoded; charset=UTF-8',
- 'Cookie':cookie_id,
- 'Sec-Ch-Ua':'"Google Chrome";v="119", "Chromium";v="119", "Not?A_Brand";v="24"',
- 'Sec-Ch-Ua-Mobile':'?0',
- 'Sec-Ch-Ua-Platform':'"Windows"',
- 'Sec-Fetch-Dest':'empty',
- 'Sec-Fetch-Mode':'cors',
- 'Sec-Fetch-Site':'same-origin',
- 'X-Requested-With':'XMLHttpRequest'}
- resp = requests.post(url,headers=headers,params = params)
- getInhtml(cookie_id)
- elif typeName == 'xt4008':
- code_json = getCodeAndCodeId(img_path, typeName, cookie_id)
- url = 'https://www.xt4008.com/api/xuntec/login/admin/login'
- headers = {
- 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36',
- 'content-type': 'application/x-www-form-urlencoded'}
- params = {'userName': int(userName),
- 'passWord': pwd,
- 'code': code_json['code'],
- 'codeId': code_json['codeId']}
- resp = requests.post(url, headers = headers, params = params)
- # print(resp.text)
- cookie_id = resp.json()['data']['token']
- # print(resp.text)
- return cookie_id
- # 选择所有有效账户
- def selectAllUsers(db):
- cursor = db.cursor ()
- sql = f'''SELECT * FROM 400_ad_app_id_sct WHERE state = 1;'''
- cursor.execute (sql)
- out = cursor.fetchall ()
- list_out = []
- for row in out:
- dict_out = {'brandName':row[0],
- 'userName':row[2],
- 'secret':row[1],
- 'typeName':row[3]}
- list_out.append(dict_out)
- cursor.close()
- return list_out
- # 获取网站资源信息
- def p400AD(userName, secret, typeName, lastCatchTime_ago_s, lastCatchTime_now_s):
- # 此路径可进行自定义
- img_path = ''
- resp = {'data':[]}
- a = 0
- n = 0
- while a == 0 and n <= 5:
- try:
- token = getToken(userName, secret, typeName, img_path)
- a = 1
- except:
- error_results = traceback.format_exc()
- print(f'getToken:{error_results}')
- tms.sleep(2)
- a = 0
- n += 1
- if typeName == 'xt4008':
- pageNum = 1
- pages_cnt = 1
- resp = {'data':[],'solutionCnt':0}
- while pageNum <= pages_cnt:
- url = f'https://www.xt4008.com/api/xuntec/callAcdReport/queryCallDetailPd?token={token}&page={pageNum}'
- headers = {
- 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36'}
- params = {"msisdn": userName,
- "caller": "",
- "called": "",
- "callSts": "",
- "callerCode": "",
- "startTime": lastCatchTime_ago_s,
- "endTime": lastCatchTime_now_s,
- "remarkStatus": "",
- "end_code": ""}
- resp_0 = requests.post(url, headers = headers, json = params)
- # print(pageNum,resp_0.text)
- resp['data'].append(resp_0.json())
- if pageNum == 1:
- pages_cnt = resp_0.json()['data']['pageCount']
- resp['solutionCnt'] = resp_0.json()['data']['total']
- else:
- ...
- pageNum += 1
- elif typeName == '400pt':
- cookie = f'{token};'
- # print(cookie)
- url = 'https://www.400pt.net/api/cust/report/customerReportCallListExport'
- headers = {
- 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36',
- 'content-type': 'application/x-www-form-urlencoded',
- 'Cookie': f'{cookie}uid=c_10974'}
- params = {'status': 1,
- 'startDate': lastCatchTime_ago_s,
- 'endDate': lastCatchTime_now_s,
- 'rows': 80,
- 'isForm': 1,
- 'page': 1}
- resp = requests.post(url, headers = headers, params = params)
- # print(resp.text)
- data = resp.content
- save_path = ''
- with open(save_path + '记录.csv', mode = "wb") as f:
- f.write(data)
- df_json = pd.read_csv('记录.csv', encoding = 'gbk', keep_default_na = '').to_dict('records')
- resp = {'data':df_json,'solutionCnt':len(df_json)}
- return resp
- def dump400Josn(db, userName, brandName, routeName, typeName, response_json):
- solutionJsonDatas = []
- datas = response_json['data']
- if typeName == 'xt4008':
- for data in datas:
- rows = data['data']['rows']
- if rows is None:
- ...
- else:
- for row in rows:
- clueId = row['call_id']
- commitTime = parse(row['formatter_calltime']).strftime("%Y-%m-%d %H:%M:%S")
- solutionType = typeName
- clueName = '未知'
- cluePhoneNumber = row['caller_id']
- flowChannelName = '-'
- try:
- area = row['callerCityName']
- except:
- area = '未知区域'
- searchWord = '-'
- keyword = '-'
- solutionInfo = json.dumps(row, ensure_ascii = False)
- userName = userName
- insert_valuse = (clueId, commitTime, solutionType, clueName, cluePhoneNumber, flowChannelName, area, searchWord, keyword,solutionInfo, userName, brandName, routeName)
- insert400_ad_solution_info(db, insert_valuse)
- solutionJsonData = {'clueId': clueId,
- 'commitTime': commitTime,
- 'solutionType': solutionType,
- 'clueName': clueName,
- 'cluePhoneNumber': cluePhoneNumber,
- 'flowChannelName': flowChannelName,
- 'area': area,
- 'searchWord': searchWord,
- 'keyword': keyword,
- 'userName': userName,
- 'brandName': brandName,
- 'routeName': routeName}
- solutionJsonDatas.append(solutionJsonData)
- elif typeName == '400pt':
- for data in datas:
- commitTime = parse(data['来电时间']).strftime("%Y-%m-%d %H:%M:%S")
- solutionType = typeName
- clueName = '未知'
- cluePhoneNumber = str(data['主叫号码'])
- clueId = str(int(parse(commitTime).timestamp()))+ cluePhoneNumber
- flowChannelName = '-'
- try:
- area = data['主叫归属地']
- except:
- area = '未知区域'
- searchWord = '-'
- keyword = '-'
- solutionInfo = json.dumps(data, ensure_ascii = False)
- userName = userName
- insert_valuse = (
- clueId, commitTime, solutionType, clueName, cluePhoneNumber, flowChannelName, area, searchWord, keyword,
- solutionInfo, userName, brandName, routeName)
- insert400_ad_solution_info(db, insert_valuse)
- solutionJsonData = {'clueId': clueId,
- 'commitTime': commitTime,
- 'solutionType': solutionType,
- 'clueName': clueName,
- 'cluePhoneNumber': cluePhoneNumber,
- 'flowChannelName': flowChannelName,
- 'area': area,
- 'searchWord': searchWord,
- 'keyword': keyword,
- 'userName': userName,
- 'brandName': brandName,
- 'routeName': routeName}
- solutionJsonDatas.append(solutionJsonData)
- return solutionJsonDatas
- # 运行数据获取及数据库数据录入
- def run400AD(db, userName, secret, typeName, brandName, routeName, lastCatchTime_ago_s, lastCatchTime_now_s, result):
- solutionJsonDataList = []
- a = 0
- n = 0
- while a == 0 and n <= 10:
- try:
- response_json = p400AD(userName, secret, typeName, lastCatchTime_ago_s, lastCatchTime_now_s)
- a = 1
- except:
- tms.sleep(2)
- a = 0
- n += 1
- if n == 1:
- desc = f'[{result}]success'
- else:
- desc = f'[{result}]restart[{n}]times'
- solutionCnt = response_json['solutionCnt']
- solutionJsonDatas = dump400Josn(db, userName,brandName,routeName, typeName, response_json)
- solutionJsonDataList = solutionJsonDataList + solutionJsonDatas
- response_data = json.dumps(response_json,ensure_ascii = False)
- insert_valuse = (lastCatchTime_now_s, response_data, solutionCnt, desc, typeName, userName, brandName, routeName)
- insert400_ad_solution_catch(db, insert_valuse)
- return solutionCnt,solutionJsonDataList
- df_timeTable = pd.read_excel('timeTableForRunOrStop.xlsx',sheet_name = 'timeTable')
- list_date2str = lambda df_in : [data_out.strftime("%Y-%m-%d") for data_out in df_in]
- runDateList = list_date2str(list(df_timeTable[df_timeTable['status']=='run']['date']))
- stopDateList = list_date2str(list(df_timeTable[df_timeTable['status']=='stop']['date']))
- df_timeTable_dict = df_timeTable.to_dict('records')
- timeTable_df2dict = lambda df_in : {data_out['date'].strftime("%Y-%m-%d"):data_out['memo'] for data_out in df_in}
- timeTable_dict = timeTable_df2dict(df_timeTable_dict)
- def dateTimeStopRun(lastCatchTime_ago_s):
- weekday_text = ['周一','周二','周三','周四','周五','周六','周日']
- # url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=7d89dfd6-04f2-430f-9b14-e5c0ff38c9c8'
- url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=cf7139e1-e623-41ca-8541-5dc6e26d43b0'
- result = 'run'
- weekday_value = parse(lastCatchTime_ago_s).weekday()
- date_value = parse(lastCatchTime_ago_s).date().strftime("%Y-%m-%d")
- next_date_value = (parse(lastCatchTime_ago_s).date()+timedelta(days = 1)).strftime("%Y-%m-%d")
- time_value = (parse(lastCatchTime_ago_s )+ timedelta(seconds = -1)).time()
- start_time = time(9, 30)
- end_time = time(17, 50)
- if weekday_value in [0,1,2,3,4] or date_value in stopDateList:
- if start_time <= time_value <= end_time:
- result = 'stop'
- text = ''
- if time_value == start_time and date_value not in runDateList:
- text = f'**【{date_value}({weekday_text[weekday_value]})】\n自动值守【<font color=\"warning\">已关闭</font>】**\n请注意接听[09:30-18:00]期间的400来电,并即时录入推送!辛苦!'
- sendMsgToRot(url,text)
- elif time_value == end_time:
- if next_date_value in stopDateList:
- text = f'**【{date_value}({weekday_text[weekday_value]})】\n自动值守【<font color=\"info\">已开启</font>】**\n系统将自动推送[今日18:00-次日9:30]期间的400来电!\n' \
- f'**提醒:\n明天【{next_date_value}({weekday_text[weekday_value+1]})】为【<font color=\"warning\">工作日({timeTable_dict[next_date_value]})</font>】**\n请注意正常接听[09:30-18:00]期间的400来电,并即时录入推送!辛苦!'
- sendMsgToRot(url, text)
- elif next_date_value in runDateList:
- if date_value not in runDateList:
- text = f'**【{date_value}({weekday_text[weekday_value]})】\n自动值守【<font color=\"info\">已开启</font>】**\n系统将自动推送[今日18:00-假期结束次日9:30]期间的400来电!**\n预祝 {timeTable_dict[next_date_value]}假期愉快!**'
- sendMsgToRot(url, text)
- else:
- ...
- elif weekday_value+1 in [5]:
- text = f'**【{date_value}({weekday_text[weekday_value]})】\n自动值守【<font color=\"info\">已开启</font>】**\n系统将自动推送[今日18:00-下周一9:30]期间的400来电!**\n周末愉快!**'
- sendMsgToRot(url, text)
- else:
- text = f'**【{date_value}({weekday_text[weekday_value]})】\n自动值守【<font color=\"info\">已开启</font>】**\n系统将自动推送[今日18:00-次日9:30]期间的400来电!'
- sendMsgToRot(url, text)
- # print(text)
- else:
- if time_value == end_time and next_date_value in stopDateList:
- text = f'**提醒:\n明天【{next_date_value}({weekday_text[weekday_value + 1]})】为【<font color=\"warning\">工作日({timeTable_dict[next_date_value]})</font>】**\n请注意正常接听[09:30-18:00]期间的400来电,并即时录入推送!辛苦!'
- sendMsgToRot(url, text)
- if date_value in runDateList:
- result = 'run'
- return result
- # 多账号计时抓取投流数据
- def runADsolution(db,minutes):
- # 基础参数
- operation_results = '无异常'
- e = 'no'
- users_info = selectAllUsers(db)
- try:
- lastCatchTime_ago_s, lastCatchTime_now_s = select400_ad_time_meter(db,minutes)
- result = dateTimeStopRun(lastCatchTime_ago_s)
- for user_info in users_info:
- brandName = user_info['brandName']
- userName = user_info['userName']
- secret = user_info['secret']
- typeName = user_info['typeName']
- routeName = '400'
- solutionCntAll,solutionJsonDataList = run400AD(db, userName, secret, typeName, brandName, routeName, lastCatchTime_ago_s, lastCatchTime_now_s, result)
- if result == 'stop':
- print(result)
- elif result == 'run':
- print(result)
- if 1==0:
- if solutionCntAll > 0:
- if len(str(solutionJsonDataList)) >= 5000:
- for solutionJsonDataList_0 in solutionJsonDataList:
- operation_results = orderTo7qiao(db, lastCatchTime_now_s, 1, solutionJsonDataList_0)
- tms.sleep(0.5)
- else:
- operation_results = orderTo7qiao(db, lastCatchTime_now_s, solutionCntAll, solutionJsonDataList)
- else:
- operation_results = '无异常'
- except Exception as ee:
- e = ee
- operation_results = traceback.format_exc()
- # 发送报错信息
- if operation_results != '无异常':
- url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=cf7139e1-e623-41ca-8541-5dc6e26d43b0'
- msg_txt = f'<font color=\"warning\">【400数据同步】服务端出现异常,请管理员尽快处理!</font>\n>Exception->{e}\nDetail->{operation_results}'
- sendMsgToRot(url, msg_txt)
- a = 1/0
- else:
- update400_ad_time_meter(db, lastCatchTime_now_s)
- def run(db):
- # 初始化时间周期
- # 更新间隔
- minute = 10
- f_now = datetime.now ()
- f_mm = f_now.strftime ("%M")
- f_ss = f_now.strftime ("%S")
- f_c = int (f_mm) % minute
- wait_s = (minute - f_c - 1) * 60 + 59 - int (f_ss)
- tms.sleep (wait_s)
- # 执行循环
- while True:
- now = datetime.now ()
- mm = now.strftime ("%M")
- ss = now.strftime ("%S")
- c = int (mm) % minute
- if c == 0 and ss == '01':
- s = tms.time ()
- # print(now)
- runADsolution(db,minute)
- tms.sleep (minute * 60 - 0.5 - (tms.time () - s))
- if __name__ == '__main__':
- # host = 'clownted.top'
- # passwd = '111...Clown'
- # db_name = 'zuzu_data'
- # port = 63306
- if 1==1:
- host = 'localhost'
- passwd = '111???clown'
- db_name = 'hexingxing'
- port = 3306
- db = linkTomySql(host, passwd, db_name, port)
- # list_out = selectAllUsers(db)
- # print(list_out)
- if 1 == 1:
- userName = '4009017757'
- secret = '4N+0xuYCWiNoawOggredIQ=='
- typeName = 'xt4008'
- lastCatchTime_ago_s = '2023-12-01 00:00:00'
- lastCatchTime_now_s = '2023-12-07 00:00:59'
- brandName = '浆小白'
- routeName = '400'
- run400AD(db, userName, secret, typeName, brandName, routeName, lastCatchTime_ago_s, lastCatchTime_now_s, 'run')
- userName = '4009018187'
- secret = 'Abcd123456'
- typeName = '400pt'
- run400AD(db, userName, secret, typeName, brandName, routeName, lastCatchTime_ago_s, lastCatchTime_now_s, 'run')
- if 1 == 0:
- getJsScript()
- # 更新至当前,用于重启服务时,自动更新数据
- minutes = 10 #时间间隔
- # minutes_for_d = 10 #延时时间
- lastCatchTime_ago_s, lastCatchTime_now_s = select400_ad_time_meter(db,minutes)
- cnt = int((datetime.now().timestamp() - parse(lastCatchTime_ago_s).timestamp())/60/minutes)*minutes
- # lastCatchTime_ago_s, lastCatchTime_now_s = select400_ad_time_meter(db, cnt)
- # print(cnt)
- # print(lastCatchTime_now_s)
- if cnt > 0 :
- runADsolution(db,cnt)
- else:
- ...
- tms.sleep(10)
- # 正式运行
- print('正式运行')
- run(db)
- db.close()
- # a = '2023-12-12 17:50:01'
- # dateTimeStopRun(a)
|