# -*- coding: utf-8 -*-
# @Time : 2023/12/18 16:44
# @Author : Clown
# @File : demo_ADCluesInspection.py
# @Software : PyCharm
"""Periodic inspection of advertising clues (leads).

Every cycle the script:

1. queries the qiqiao form API for clue phone numbers entered in a time
   window and records unseen ones in ``ad_clues_inspection_cycle``;
2. refreshes each clue's CRM data through the ``EC_招商CRM`` helpers and
   stores the result together with a status label;
3. advances the time marker kept in ``cycle_time_meter``.
"""
import json
import time as tms
from datetime import datetime, timedelta, time

import pymysql
import requests
from dateutil.parser import parse

from EC_招商CRM import runTrajectory, selectCustomsInfo, get_sign

# Seconds to wait for any HTTP response before giving up; without a timeout
# ``requests`` may block forever and stall the whole cycle.
_HTTP_TIMEOUT = 30

# Batch size used only for the informational batch-count print.
_BATCH_SIZE = 50

# Status written when the trajectory JSON contains the marker "电话".
_STATUS_KEPT = '正常保留'

# qiqiao application / form identifiers used by the clue query.
_QIQIAO_APPLICATION_ID = '63631ee57005c0103426fdc7'
_QIQIAO_FORM_MODEL_ID = '63631ef07c997625a7627ac1'


def linkTomySql(host, passwd, db_name, port):
    """Connect to MySQL and return the connection object.

    v2 added ``local_infile=1`` to enable file-import permission.

    Returns:
        The pymysql connection on success, or the string ``"连接失败"`` when
        the connection attempt fails (legacy sentinel kept for callers).
    """
    try:
        # Local connection: localhost; server connection: 124.222.188.59
        db = pymysql.connect(
            host=host,
            user="root",
            passwd=passwd,
            db=db_name,
            port=port,
            charset='utf8mb4',
            local_infile=1)
    except Exception as exc:
        # Not a bare ``except``: KeyboardInterrupt / SystemExit must propagate.
        print("\ncould not connect to mysql server")
        print(exc)
        return "连接失败"
    print('\nconnect to mysql server 成功')
    print('---------------------------------------')
    return db


def read_key_value_pair(db, brand_name, wm_plate, owner):
    """Read and decode ``key_value_pair`` from ``all_key_table``.

    Args:
        db: open database connection.
        brand_name: value matched against the ``brand_name`` column.
        wm_plate: value matched against the ``wm_plate`` column (platform).
        owner: value matched against the ``owner`` column (e.g. ``all``).

    Returns:
        The JSON-decoded ``key_value_pair`` of the first matching row.

    Raises:
        IndexError: if no row matches.
    """
    sql = ('SELECT key_value_pair FROM all_key_table '
           'WHERE brand_name = %s AND wm_plate = %s AND owner = %s;')
    with db.cursor() as cursor:
        cursor.execute(sql, (brand_name, wm_plate, owner))
        rows = cursor.fetchall()
    if not rows:
        raise IndexError(
            f'no all_key_table row for {brand_name!r}/{wm_plate!r}/{owner!r}')
    return json.loads(rows[0][0])


def sendMsgToRot(url, msg_txt):
    """Post ``msg_txt`` as a markdown message to webhook ``url`` and print the reply."""
    params_json = {
        "msgtype": "markdown",
        "markdown": {
            "content": msg_txt,
            "mentioned_list": ["@all"],
        },
    }
    resp = requests.post(url, json=params_json, timeout=_HTTP_TIMEOUT).text
    print(resp)


# Update the follower of a customer.
def updateCsFollower(corpid, sign, timestamp, crmId, followUserIds):
    """Call the EC ``customer/change/user`` endpoint.

    Returns:
        The ``data.success`` field of the JSON response.
    """
    url = 'https://open.workec.com/v2/customer/change/user'
    headers = {
        'Content-Type': 'application/json',
        'X-Ec-Cid': corpid,
        'X-Ec-Sign': sign,
        'X-Ec-TimeStamp': timestamp,
    }
    json_params = {
        'optUserId': 17409174,
        'crmIds': crmId,
        'followUserId': followUserIds,
    }
    resp = requests.post(
        url, headers=headers, json=json_params, timeout=_HTTP_TIMEOUT).json()
    return resp['data']['success']


# Query the cycle timer and derive the next time window.
def selectcycle_time_meter(db, minutes):
    """Return ``(start, end)`` of the next window as ``%Y-%m-%d %H:%M:%S`` strings.

    The first column of the ``cycle_time_meter`` row with ``id = 1`` is used
    as the last processed time; ``start`` is one second after it and ``end``
    is ``minutes`` minutes after it.
    """
    with db.cursor() as cursor:
        cursor.execute('SELECT * FROM cycle_time_meter WHERE id = 1;')
        last_catch_time = cursor.fetchall()[0][0]
    fmt = '%Y-%m-%d %H:%M:%S'
    lastCatchTime_ago_s = (last_catch_time + timedelta(seconds=1)).strftime(fmt)
    # Update interval
    lastCatchTime_now_s = (last_catch_time + timedelta(minutes=minutes)).strftime(fmt)
    return lastCatchTime_ago_s, lastCatchTime_now_s


# Update the cycle timer.
def updatecycle_time_meter(db, lastCatchTime_now_s):
    """Store ``lastCatchTime_now_s`` as ``lastCatchTime`` of timer row ``id = 1``."""
    with db.cursor() as cursor:
        cursor.execute(
            'UPDATE cycle_time_meter SET lastCatchTime = %s WHERE id = 1;',
            (lastCatchTime_now_s,))
    db.commit()


# Query the circulation state of clues.
def ad_clues_inspection(crmIds):
    """Return ``runTrajectory`` output for ``crmIds`` with memo/lastContactTime set to '0'."""
    params = {'crmIds': crmIds, 'memo': '0', 'lastContactTime': '0'}
    return runTrajectory(params)


# Write a clue row into table ad_clues_inspection_cycle.
def update_ad_clues_inspection(db, data_to_update):
    """``REPLACE INTO ad_clues_inspection_cycle`` one 7-value row and commit."""
    sql = '''REPLACE INTO ad_clues_inspection_cycle VALUES (%s,%s,%s,%s,%s,%s,%s);'''
    with db.cursor() as cursor:
        cursor.execute(sql, data_to_update)
    db.commit()


def select_ad_clues_inspection(db, cluePhoneNumber):
    """Return 1 if ``cluePhoneNumber`` already has a row in the table, else 0."""
    sql = 'SELECT * FROM ad_clues_inspection_cycle WHERE cluePhoneNumber = %s;'
    with db.cursor() as cursor:
        # str() keeps the comparison a string comparison, as the original
        # quoted literal was, even if the caller passes a number.
        cursor.execute(sql, (str(cluePhoneNumber),))
        rows = cursor.fetchall()
    return 1 if rows else 0


def select_ad_clues_to_cycle(db, s_datetime, e_datetime):
    """Return first-column values of rows whose ``movingTime`` lies in the window.

    Only rows with ``crmIdCreateTime`` at or after the fixed begin time
    ``2023-12-25 09:30:00`` are considered.
    """
    beginTime = '2023-12-25 09:30:00'
    sql = ('SELECT * FROM ad_clues_inspection_cycle '
           'WHERE movingTime >= %s and movingTime <= %s '
           'and crmIdCreateTime >= %s;')
    with db.cursor() as cursor:
        cursor.execute(sql, (s_datetime, e_datetime, beginTime))
        rows = cursor.fetchall()
    return [row[0] for row in rows]


def select_ad_clues_error_state(db):
    """Return first-column values of rows that need to be re-inspected.

    A row qualifies when ``crmId`` is NULL or its memo JSON has
    ``result = 'error'``, and its phone number does not start with '0'.
    """
    sql = '''SELECT * FROM (SELECT * FROM ad_clues_inspection_cycle
             WHERE crmId is NULL OR memo -> '$.result' = 'error')t1
             WHERE SUBSTRING(cluePhoneNumber, 1, 1) <> '0';'''
    with db.cursor() as cursor:
        cursor.execute(sql)
        rows = cursor.fetchall()
    return [row[0] for row in rows]


def _fetch_form_clue_phones(token, start_ms, end_ms):
    """Page through the qiqiao form query and collect every '客户手机' value."""
    headers_api = {
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.85 Safari/537.36',
        "Content-type": "application/json",
        "X-Auth0-Token": token,
    }
    params = [{"fieldName": "录入时间", "logic": "between",
               "value": f'{start_ms},{end_ms}'}]
    phones = []
    page = 0
    pages = 1  # replaced by the real page count after the first response
    while page < pages:
        page += 1
        url = (f'https://qiqiao.do1.com.cn/plus/cgi-bin/open/applications/'
               f'{_QIQIAO_APPLICATION_ID}/forms/{_QIQIAO_FORM_MODEL_ID}'
               f'/query?page={page}')
        print(params)
        resp = requests.post(
            url, headers=headers_api, json=params, timeout=_HTTP_TIMEOUT)
        data = resp.json()['data']
        pages = data['totalPage']
        phones.extend(info['variables']['客户手机'] for info in data['list'])
    return phones


def _build_inspection_row(insert_data, abnormal_label):
    """Return the 7-value table row for one ``selectCustomsInfo`` record.

    Column 6 becomes the JSON-encoded trajectory of the record's crmId
    (``insert_data[1]``); column 7 is '正常保留' when that JSON contains
    "电话", otherwise ``abnormal_label``.
    """
    memo_json = json.dumps(ad_clues_inspection(insert_data[1]),
                           ensure_ascii=False)
    status = _STATUS_KEPT if "电话" in memo_json else abnormal_label
    return tuple(insert_data[:5]) + (memo_json, status)


def add_clues_to_cycle_list(db, s_datetime, e_datetime, cycle_hours):
    """Register clues from the time window and refresh their inspection rows.

    Args:
        db: open database connection.
        s_datetime: window start, a date-time string.
        e_datetime: window end, a date-time string.
        cycle_hours: passed through to ``selectCustomsInfo``.
    """
    key_json = read_key_value_pair(db, '道一云', '7qiaoPlus', 'all')
    # The query window starts 20 minutes before s_datetime; the API takes
    # millisecond timestamps.
    start_ms = int((parse(s_datetime).timestamp() - 60 * 20) * 1000)
    end_ms = int(parse(e_datetime).timestamp() * 1000)

    phones = _fetch_form_clue_phones(key_json['data'], start_ms, end_ms)
    # Also retry clues whose previous inspection is missing or errored.
    phones.extend(select_ad_clues_error_state(db))
    clue_photo_num_list = list(set(phones))

    for clue_phone in clue_photo_num_list:
        if select_ad_clues_inspection(db, clue_phone) == 0:
            clue_data = (str(clue_phone), None, None, None, None, None, '首次创建')
            update_ad_clues_inspection(db, clue_data)

    # Informational only: number of full batches of _BATCH_SIZE clues.
    print(len(clue_photo_num_list) // _BATCH_SIZE)

    for insert_data in selectCustomsInfo(clue_photo_num_list, cycle_hours):
        update_ad_clues_inspection(
            db, _build_inspection_row(insert_data, '异常流转0'))


def cycle_clues_to_m(db, s_datetime, e_datetime, cycle_hours, followUserIds,
                     *, notify=False, reassign=False):
    """Re-inspect clues whose ``movingTime`` falls in the window.

    Args:
        db: open database connection.
        s_datetime: window start, a date-time string.
        e_datetime: window end, a date-time string.
        cycle_hours: passed through to ``selectCustomsInfo``.
        followUserIds: follower id handed to ``updateCsFollower`` when
            ``reassign`` is true.
        notify: when true, send the overdue message to the webhook for each
            abnormal clue. Off by default (the call was disabled).
        reassign: when true, reassign each abnormal clue through
            ``updateCsFollower``. Off by default (the call was disabled).
    """
    # NOTE(review): credentials and the webhook key are hard-coded here;
    # consider loading them from configuration.
    corpid = '17409173'
    timestamp = str(int(round(tms.time() * 1000)))
    app_id = '914890189023739904'
    app_secret = 'Ur2K41t71RrxYn7eWhN'
    sign = get_sign(app_id, app_secret, str(timestamp))
    url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=48eaf1ae-1e29-4a7d-9279-b2aaa0e218cf'

    cycle_phone_num_list = select_ad_clues_to_cycle(db, s_datetime, e_datetime)

    # Informational only: number of full batches of _BATCH_SIZE clues.
    print(len(cycle_phone_num_list) // _BATCH_SIZE)

    for insert_data in selectCustomsInfo(cycle_phone_num_list, cycle_hours):
        row = _build_inspection_row(insert_data, '异常流转1')
        if row[6] != _STATUS_KEPT:
            if notify:
                memo = json.loads(row[5])
                # A missing follower name must not abort the whole cycle.
                name = memo.get('followerName', '') if isinstance(memo, dict) else ''
                msg_txt = f'来自【{name}】的线索【{row[0]}】已超过15小时未跟进'
                sendMsgToRot(url, msg_txt)
            if reassign:
                updateCsFollower(corpid, sign, timestamp, row[1], followUserIds)
        update_ad_clues_inspection(db, row)


def run_cycle(db):
    """Run the inspection cycle forever, once per 10-minute boundary.

    A cycle starts when the wall clock shows a minute divisible by 10 and
    second 01. This function does not return.
    """
    # Update interval in minutes.
    minute = 10
    cycle_hours = 15  # hours before a clue is rotated again
    followUserIds = '19710025'

    # Initial alignment: sleep until one second before the next boundary.
    f_now = datetime.now()
    wait_s = (minute - f_now.minute % minute - 1) * 60 + 59 - f_now.second
    tms.sleep(wait_s)

    while True:
        now = datetime.now()
        if now.minute % minute == 0 and now.second == 1:
            s = tms.time()
            s_datetime, e_datetime = selectcycle_time_meter(db, minute)
            print(s_datetime, e_datetime)
            add_clues_to_cycle_list(db, s_datetime, e_datetime, cycle_hours)
            cycle_clues_to_m(db, s_datetime, e_datetime, cycle_hours, followUserIds)
            updatecycle_time_meter(db, e_datetime)
            # Sleep until just before the next boundary. Clamp at zero:
            # time.sleep() raises ValueError on a negative value, which
            # happened whenever a cycle ran longer than the interval.
            tms.sleep(max(0.0, minute * 60 - 0.5 - (tms.time() - s)))
        else:
            # Short poll instead of a busy loop; well below the one-second
            # trigger window, so the boundary is not missed.
            tms.sleep(0.2)


def main():
    """Catch up on missed windows, then enter the periodic cycle."""
    # A remote configuration was used previously:
    # host 'clownted.top', db_name 'zuzu_data', port 63306.
    # NOTE(review): the database password is hard-coded; consider moving it
    # to configuration.
    host = 'localhost'
    passwd = '111???clown'
    db_name = 'hexingxing'
    port = 3306
    db = linkTomySql(host, passwd, db_name, port)
    if isinstance(db, str):
        # linkTomySql returns a sentinel string on failure; stop here rather
        # than fail later with AttributeError on db.cursor().
        raise SystemExit(db)

    try:
        cycle_hours = 15  # hours before a clue is rotated again
        minutes = 10  # interval in minutes
        followUserIds = '19710025'

        lastCatchTime_ago_s, lastCatchTime_now_s = selectcycle_time_meter(db, minutes)
        # Whole intervals elapsed since the timer value, minus one interval
        # that is left for the regular cycle.
        elapsed_s = datetime.now().timestamp() - parse(lastCatchTime_ago_s).timestamp()
        cnt = int(elapsed_s / 60 / minutes) * minutes - minutes
        if cnt > 0:
            s_datetime, e_datetime = selectcycle_time_meter(db, cnt)
            print(s_datetime, e_datetime)
            add_clues_to_cycle_list(db, s_datetime, e_datetime, cycle_hours)
            cycle_clues_to_m(db, s_datetime, e_datetime, cycle_hours, followUserIds)
            updatecycle_time_meter(db, e_datetime)
        else:
            print('正式执行')
        tms.sleep(10)
        run_cycle(db)
    finally:
        # run_cycle never returns; this runs on interruption or error.
        db.close()


if __name__ == '__main__':
    main()