123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361 |
- # -*- codeing = utf-8 -*-
- # @Time : 2023/12/18 16:44
- # @Author : Clown
- # @File : demo_ADCluesInspection.py
- # @Software : PyCharm
- import pymysql
- import requests
- import json
- from datetime import datetime,timedelta,time
- from dateutil.parser import parse
- from EC_招商CRM import runTrajectory,selectCustomsInfo,get_sign
- import time as tms
- def linkTomySql(host, passwd, db_name, port):
- '''连接至数据库返回【db】,v2新增local_infile=1 打开文件导入权限'''
- try:
- # 本地连接为:localhost 服务器连接为:124.222.188.59
- db = pymysql.connect (
- host=host, user="root",
- passwd=passwd,
- db=db_name,
- port=port,
- charset='utf8mb4',
- local_infile=1)
- print ('\nconnect to mysql server 成功')
- print ('---------------------------------------')
- except:
- print ("\ncould not connect to mysql server")
- db = "连接失败"
- return db
- def read_key_value_pair(db, brand_name, wm_plate, owner):
- '''按条件读取,数据库中all_key_table表里的key_value_pair字段中的值,以键值对的形式输出
- db:数据库,
- brand_name:品牌名,
- wm_plate:外卖平台MEITUAN或ELEME,
- owner:账号权限all或one
- '''
- cursor = db.cursor ()
- sql = f'SELECT key_value_pair FROM all_key_table WHERE brand_name = "{brand_name}" AND wm_plate = "{wm_plate}" AND owner = "{owner}";'
- cursor.execute (sql)
- pair = json.loads (cursor.fetchall ()[0][0])
- return pair
- def sendMsgToRot(url,msg_txt):
- params_json = {
- "msgtype": "markdown",
- "markdown": {
- "content": msg_txt,
- "mentioned_list":["@all"]
- }}
- resp = requests.post(url,json=params_json).text
- print(resp)
- #更新客户跟进人
- def updateCsFollower(corpid,sign,timestamp,crmId,followUserIds):
- url = 'https://open.workec.com/v2/customer/change/user'
- headers = {'Content-Type': 'application/json',
- 'X-Ec-Cid': corpid,
- 'X-Ec-Sign': sign,
- 'X-Ec-TimeStamp': timestamp}
- json_params = {'optUserId': 17409174,
- 'crmIds': crmId,
- 'followUserId': followUserIds}
- resp = requests.post (url, headers=headers, json=json_params).json ()
- success = resp['data']['success']
- return success
- # 查询计时器时间,输出起始时间
- def selectcycle_time_meter(db,minutes):
- cursor = db.cursor ()
- sql = f'SELECT * FROM cycle_time_meter WHERE id = 1;'
- cursor.execute (sql)
- lastCatchTime_ago = cursor.fetchall ()[0][0]
- lastCatchTime_ago_s = (lastCatchTime_ago + timedelta(seconds=1)).strftime('%Y-%m-%d %H:%M:%S')
- # 更新间隔
- lastCatchTime_now_s = (lastCatchTime_ago + timedelta(minutes=minutes)).strftime('%Y-%m-%d %H:%M:%S')
- cursor.close()
- return lastCatchTime_ago_s,lastCatchTime_now_s
- # 更新计时器时间
- def updatecycle_time_meter(db,lastCatchTime_now_s):
- cursor = db.cursor ()
- sql = f'UPDATE cycle_time_meter SET lastCatchTime = "{lastCatchTime_now_s}" WHERE id = 1;'
- cursor.execute(sql)
- db.commit()
- cursor.close()
- # 线索流传状态查询
- def ad_clues_inspection(crmIds):
- # url = 'http://clownted.top:63307/trajectory'
- params = {'crmIds': crmIds,
- 'memo': '0',
- 'lastContactTime': '0'}
- # headers = {"Content-Type": "application/json;charset=UTF-8"}
- # resp = requests.post(url, json = params, headers = headers).text
- resp = runTrajectory(params)
- return resp
- # 将线索资源写入数据库ad_clues_inspection_cycle
- def update_ad_clues_inspection(db,data_to_update):
- cursor = db.cursor()
- row_data = data_to_update
- sql = '''REPLACE INTO ad_clues_inspection_cycle VALUES (%s,%s,%s,%s,%s,%s,%s);'''
- cursor.execute(sql, row_data)
- db.commit ()
- def select_ad_clues_inspection(db,cluePhoneNumber):
- cursor = db.cursor()
- sql = f'''SELECT * FROM ad_clues_inspection_cycle WHERE cluePhoneNumber = '{cluePhoneNumber}';'''
- cursor.execute(sql)
- value_out_list = cursor.fetchall()
- cursor.close ()
- if len (value_out_list) == 0:
- result = 0
- else:
- result = 1
- return result
- def select_ad_clues_to_cycle(db,s_datetime,e_datetime):
- cursor = db.cursor()
- beginTime = '2023-12-25 09:30:00'
- sql = f'''SELECT * FROM ad_clues_inspection_cycle WHERE movingTime >= '{s_datetime}' and movingTime <= '{e_datetime}' and crmIdCreateTime >= '{beginTime}';'''
- # print(sql)
- cursor.execute(sql)
- value_out_list = cursor.fetchall()
- cursor.close()
- out_list = []
- for row in value_out_list:
- clue_phone = row[0]
- out_list.append(clue_phone)
- return out_list
- def select_ad_clues_error_state(db):
- cursor = db.cursor()
- sql = f'''SELECT * FROM (SELECT * FROM ad_clues_inspection_cycle WHERE crmId is NULL OR memo -> '$.result' = 'error')t1 WHERE SUBSTRING(cluePhoneNumber, 1, 1) <> '0';'''
- cursor.execute(sql)
- value_out_list = cursor.fetchall()
- cursor.close()
- out_list = []
- for row in value_out_list:
- clue_phone = row[0]
- out_list.append(clue_phone)
- return out_list
- def add_clues_to_cycle_list(db,s_datetime,e_datetime,cycle_hours):
- key_json = read_key_value_pair(db, '道一云', '7qiaoPlus', 'all')
- applicationId = '63631ee57005c0103426fdc7'
- Token = key_json['data']
- formModelId = '63631ef07c997625a7627ac1'
- s_datetime = (parse(s_datetime).timestamp()-60*20) * 1000
- e_datetime = parse(e_datetime).timestamp() * 1000
- page = 0
- pages = 1
- clue_photo_num_list = []
- while page < pages:
- page += 1
- url = f'https://qiqiao.do1.com.cn/plus/cgi-bin/open/applications/{applicationId}/forms/{formModelId}/query?page={page}'
- headers_api = {
- 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.85 Safari/537.36',
- "Content-type": "application/json",
- "X-Auth0-Token": Token}
- # params = [{"fieldName": "录入时间", "logic": "ge", "value": int(s_datetime)},
- # {"fieldName": "录入时间", "logic": "le", "value": int(e_datetime)}]
- params = [{"fieldName": "录入时间", "logic": "between", "value": f'{int(s_datetime)},{int(e_datetime)}'}]
- print(params)
- resp = requests.post(url, headers = headers_api, json = params)
- # print(resp.text)
- resp_json = resp.json()
- pages = resp_json['data']['totalPage']
- currPage = resp_json['data']['currPage']
- clues_list = resp_json['data']['list']
- for clue_info in clues_list:
- # print(clue_info)
- clue_photo_num = clue_info['variables']['客户手机']
- clue_photo_num_list.append(clue_photo_num)
- clue_photo_num_list = clue_photo_num_list + select_ad_clues_error_state(db)
- clue_photo_num_list = list(set(clue_photo_num_list))
- for clue_phone in clue_photo_num_list:
- result = select_ad_clues_inspection(db, clue_phone)
- if result == 0:
- clue_data = (str(clue_phone), None, None, None, None, None, '首次创建')
- update_ad_clues_inspection(db, clue_data)
- else:
- ...
- x = 50
- n = int((len(clue_photo_num_list) - 0) / x)
- print(n)
- clue_photo_num_group = []
- for i in range(n + 1):
- # print(i*x,(i+1)*x)
- # print(len(clue_photo_num_list[i*x:(i+1)*x]))
- group = ','.join(clue_photo_num_list[i * x:(i + 1) * x])
- clue_photo_num_group.append(group)
- # print(clue_photo_num_group)
- insert_list = selectCustomsInfo(clue_photo_num_list, cycle_hours)
- for insert_data in insert_list:
- insert_data = insert_data[:5] + (
- json.dumps(ad_clues_inspection(insert_data[1]), ensure_ascii = False),) + insert_data[6:]
- # print(insert_data)
- if "电话" in insert_data[5]:
- insert_data = insert_data[:6] + ('正常保留',)
- else:
- insert_data = insert_data[:6] + ('异常流转0',)
- update_ad_clues_inspection(db, insert_data)
- def cycle_clues_to_m(db,s_datetime,e_datetime,cycle_hours,followUserIds):
- corpid = '17409173'
- timestamp = str(int(round(tms.time() * 1000)))
- app_id = '914890189023739904'
- app_secret = 'Ur2K41t71RrxYn7eWhN'
- sign = get_sign(app_id, app_secret, str(timestamp))
- cycle_phone_num_list = select_ad_clues_to_cycle(db, s_datetime, e_datetime)
- x = 50
- n = int((len(cycle_phone_num_list) - 0) / x)
- print(n)
- clue_photo_num_group = []
- for i in range(n + 1):
- # print(i*x,(i+1)*x)
- # print(len(clue_photo_num_list[i*x:(i+1)*x]))
- group = ','.join(cycle_phone_num_list[i * x:(i + 1) * x])
- clue_photo_num_group.append(group)
- # print(clue_photo_num_group)
- insert_list = selectCustomsInfo(cycle_phone_num_list, cycle_hours)
- for insert_data in insert_list:
- insert_data = insert_data[:5] + (
- json.dumps(ad_clues_inspection(insert_data[1]), ensure_ascii = False),) + insert_data[6:]
- # print(insert_data)
- if "电话" in insert_data[5]:
- insert_data = insert_data[:6] + ('正常保留',)
- else:
- insert_data = insert_data[:6] + ('异常流转1',)
- crmId = insert_data[1]
- url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=48eaf1ae-1e29-4a7d-9279-b2aaa0e218cf'
- name = json.loads(insert_data[5])['followerName']
- msg_txt = f'来自【{name}】的线索【{insert_data[0]}】已超过15小时未跟进'
- # sendMsgToRot(url, msg_txt)
- # updateCsFollower(corpid, sign, timestamp, crmId, followUserIds)
- update_ad_clues_inspection(db, insert_data)
- def run_cycle(db):
- # 初始化时间周期
- # 更新间隔
- minute = 10
- f_now = datetime.now()
- f_mm = f_now.strftime("%M")
- f_ss = f_now.strftime("%S")
- f_c = int(f_mm) % minute
- wait_s = (minute - f_c - 1) * 60 + 59 - int(f_ss)
- tms.sleep(wait_s)
- # 执行循环
- while True:
- now = datetime.now()
- mm = now.strftime("%M")
- ss = now.strftime("%S")
- c = int(mm) % minute
- if c == 0 and ss == '01':
- s = tms.time()
- # print(now)
- cycle_hours = 15 # 重新轮转间隔小时
- # minutes = 10 # 时间间隔分钟
- followUserIds = '19710025'
- s_datetime, e_datetime = selectcycle_time_meter(db, minute)
- print(s_datetime, e_datetime)
- add_clues_to_cycle_list(db, s_datetime, e_datetime, cycle_hours)
- cycle_clues_to_m(db, s_datetime, e_datetime, cycle_hours, followUserIds)
- updatecycle_time_meter(db,e_datetime)
- tms.sleep(minute * 60 - 0.5 - (tms.time() - s))
- if __name__ == '__main__':
- # host = 'clownted.top'
- # passwd = '111...Clown'
- # db_name = 'zuzu_data'
- # port = 63306
- host = 'localhost'
- passwd = '111???clown'
- db_name = 'hexingxing'
- port = 3306
- db = linkTomySql(host, passwd, db_name, port)
- cycle_hours = 15 #重新轮转间隔小时
- minutes = 10 # 时间间隔分钟
- followUserIds = '19710025'
- lastCatchTime_ago_s, lastCatchTime_now_s = selectcycle_time_meter(db, minutes)
- cnt = int((datetime.now().timestamp() - parse(lastCatchTime_ago_s).timestamp()) / 60 / minutes) * minutes - minutes
- if cnt > 0:
- s_datetime,e_datetime = selectcycle_time_meter(db,cnt)
- # e_datetime = '2023-12-25 23:59:59'
- print(s_datetime,e_datetime)
- add_clues_to_cycle_list(db, s_datetime, e_datetime, cycle_hours)
- cycle_clues_to_m(db, s_datetime, e_datetime, cycle_hours,followUserIds)
- updatecycle_time_meter(db, e_datetime)
- else:
- print('正式执行')
- tms.sleep(10)
- run_cycle(db)
- # ad_clues_inspection()
- if 1==0:
- key_json = read_key_value_pair (db, '道一云', '7qiaoPlus', 'all')
- applicationId = '63631ee57005c0103426fdc7'
- Token = key_json['data']
- formModelId = '63631ef07c997625a7627ac1'
- s_datetime = parse('2023-12-25 09:30:00').timestamp()*1000
- e_datetime = parse('2023-12-25 23:59:59').timestamp()*1000
- cycle_hours = 15
- # print(s_datetime,e_datetime)
- page = 0
- pages = 1
- clue_photo_num_list = []
- while page < pages:
- page += 1
- url = f'https://qiqiao.do1.com.cn/plus/cgi-bin/open/applications/{applicationId}/forms/{formModelId}/query?page={page}'
- headers_api = {
- 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.85 Safari/537.36',
- "Content-type": "application/json",
- "X-Auth0-Token": Token}
- params = [ {"fieldName":"录入时间", "logic": "ge", "value":int(s_datetime)},{"fieldName":"录入时间", "logic": "le", "value":int(e_datetime)}]
- resp = requests.post(url,headers = headers_api,json=params)
- print(resp.text)
- resp_json = resp.json()
- pages = resp_json['data']['totalPage']
- currPage = resp_json['data']['currPage']
- clues_list = resp_json['data']['list']
- for clue_info in clues_list:
- # print(clue_info)
- clue_photo_num = clue_info['variables']['客户手机']
- clue_photo_num_list.append(clue_photo_num)
- clue_photo_num_list = list(set(clue_photo_num_list))
- for clue_phone in clue_photo_num_list:
- clue_data = (str(clue_phone),None,None,None,None,None,'首次创建')
- update_ad_clues_inspection(db, clue_data)
- if 1==1:
- x = 50
- n = int((len(clue_photo_num_list)-0)/x)
- print(n)
- clue_photo_num_group = []
- for i in range(n+1):
- # print(i*x,(i+1)*x)
- # print(len(clue_photo_num_list[i*x:(i+1)*x]))
- group = ','.join(clue_photo_num_list[i*x:(i+1)*x])
- clue_photo_num_group.append(group)
- print(clue_photo_num_group)
- insert_list = selectCustomsInfo(clue_photo_num_list,cycle_hours)
- for insert_data in insert_list:
- insert_data = insert_data[:5]+(json.dumps(ad_clues_inspection(insert_data[1]),ensure_ascii = False),)+insert_data[6:]
- print(insert_data)
- if "电话" in insert_data[5]:
- insert_data = insert_data[:6] + ('正常保留',)
- else:
- insert_data = insert_data[:6] + ('异常流转',)
- update_ad_clues_inspection(db, insert_data)
- db.close()
|