# -*- coding: utf-8 -*-
# @Time : 2023/4/11 19:08
# @Author : Clown
# @File : demo_feiyu_ad_tk_refresh.py
# @Software : PyCharm
"""Store and refresh Baidu advertising OAuth tokens kept in MySQL.

The raw JSON text returned by the Baidu OAuth endpoints is appended to the
``baidu_ad_tk_refresh`` table; the newest row per brand is treated as the
current token.  App ids and secrets are read from ``baidu_ad_app_id_sct``.
"""
import json
import os
from datetime import datetime, timedelta

import pymysql
import requests

# Value returned by linkTomySql() when the connection attempt fails.
DB_CONNECT_FAILED = "连接失败"

# Connection settings shared by the OAuth callback and the script entry point.
# The password may be overridden through the environment so it does not have
# to live in the source; the default keeps the previous behaviour.
DB_PASSWD = os.environ.get('ZUZU_DB_PASSWD', '111...Clown')
DB_NAME = 'zuzu_data'
DB_PORT = 63306
REMOTE_DB_HOST = '124.222.188.59'
LOCAL_DB_HOST = 'localhost'

ACCESS_TOKEN_URL = "https://u.baidu.com/oauth/accessToken"
REFRESH_TOKEN_URL = 'https://u.baidu.com/oauth/refreshToken'
USER_INFO_URL = 'https://u.baidu.com/oauth/getUserInfo'
MCC_USER_LIST_URL = 'https://api.baidu.com/json/feed/v1/MccFeedService/getUserListByMccid'

# NOTE(review): "charset:utf-8" is not the standard "charset=utf-8" form; it
# is kept byte-for-byte because the endpoints are currently called this way.
OAUTH_HEADERS = {'Content-Type': 'application/json;charset:utf-8'}

# Seconds to wait for Baidu before giving up; without it a stalled request
# would block the caller forever.
REQUEST_TIMEOUT = 30

# Keys that the rest of this module reads back from a stored token row.
_REQUIRED_TOKEN_FIELDS = ('accessToken', 'refreshToken', 'userId')


def linkTomySql(host, passwd, db_name, port):
    """Connect to MySQL as ``root`` and return the connection.

    ``local_infile=1`` (added in v2) enables file-import permission.
    Use ``localhost`` for a local connection and ``124.222.188.59`` for the
    server.

    :return: the open connection, or the string ``"连接失败"`` when the
        connection could not be established.
    """
    try:
        db = pymysql.connect(
            host=host,
            user="root",
            passwd=passwd,
            db=db_name,
            port=port,
            charset='utf8mb4',
            local_infile=1,
        )
    except Exception as exc:
        # The string sentinel is kept so existing callers that compare the
        # return value keep working; the reason is now shown instead of lost.
        print(f"\ncould not connect to mysql server: {exc}")
        return DB_CONNECT_FAILED
    print('\nconnect to mysql server 成功')
    print('---------------------------------------')
    return db


def _fetch_first_row(db, sql, params, description):
    """Run a parameterized query and return its first row.

    :raises IndexError: when the query matches no row (the same exception
        type the previous ``fetchall()[0]`` produced, with a usable message).
    """
    with db.cursor() as cursor:
        cursor.execute(sql, params)
        row = cursor.fetchone()
    if row is None:
        raise IndexError(f'no row found for {description}')
    return row


def _check_token_response(rsp_text, context):
    """Ensure ``rsp_text`` is a token response the module can read back later.

    :raises ValueError: when the text is not JSON or its ``data`` object lacks
        any of ``accessToken``, ``refreshToken`` or ``userId``.
    """
    try:
        payload = json.loads(rsp_text)
    except ValueError as exc:
        raise ValueError(f'{context}: response is not JSON: {rsp_text!r}') from exc
    data = payload.get('data') if isinstance(payload, dict) else None
    if not isinstance(data, dict):
        raise ValueError(f'{context}: response has no token data: {rsp_text!r}')
    missing = [key for key in _REQUIRED_TOKEN_FIELDS if key not in data]
    if missing:
        raise ValueError(f'{context}: response lacks {missing}: {rsp_text!r}')


# Save the token to the database
def save_oauth_to_db(db, rsp_data, brandName):
    """Append a token response for ``brandName`` to ``baidu_ad_tk_refresh``.

    The row is (current local time, brandName, rsp_data, 0).
    NOTE(review): the meaning of the trailing ``0`` column is not visible
    here; confirm against the table schema.
    """
    lastCatchTime = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    sql = '''INSERT INTO baidu_ad_tk_refresh VALUES (%s,%s,%s,%s);'''
    values = (lastCatchTime, brandName, rsp_data, 0)
    try:
        with db.cursor() as cursor:
            cursor.execute(sql, values)
        db.commit()
    except Exception:
        # Leave the connection clean for the caller's next statement.
        db.rollback()
        raise


# Look up the secret by app_id
def select_secret_by_app_id(db, app_id):
    """Return ``(secret, brandName)`` for ``app_id``.

    The secret is read from column index 2 and the brand name from column
    index 1 of the first matching ``baidu_ad_app_id_sct`` row.

    :raises IndexError: when no row matches.
    """
    sql = 'SELECT * FROM baidu_ad_app_id_sct WHERE app_id = %s;'
    row = _fetch_first_row(db, sql, (app_id,), f'app_id={app_id!r}')
    return row[2], row[1]


# Look up the app_id and its secret by brand
def select_appId_secret_by_brandName(db, brandName):
    """Return ``(app_id, secretKey)`` for ``brandName``.

    The app id is read from column index 0 and the secret from column
    index 2 of the first matching ``baidu_ad_app_id_sct`` row.

    :raises IndexError: when no row matches.
    """
    sql = 'SELECT * FROM baidu_ad_app_id_sct WHERE brandName = %s;'
    row = _fetch_first_row(db, sql, (brandName,), f'brandName={brandName!r}')
    return row[0], row[2]


# Reset the token with an authorization code
def get_access_token_by_auth_code(app_id, auth_code, userId):
    """Exchange ``auth_code`` for a Baidu access token and store the response.

    Opens its own connection to the server database, looks up the secret for
    ``app_id``, posts to the Baidu ``accessToken`` endpoint and saves the raw
    response text under the app's brand name.

    :param app_id: application id registered in ``baidu_ad_app_id_sct``.
    :param auth_code: authorization code to exchange.
    :param userId: user id sent along with the request.
    :return: None
    :raises ValueError: when Baidu's response does not contain token data;
        nothing is stored in that case, so the previous token stays current.
    """
    db = linkTomySql(REMOTE_DB_HOST, DB_PASSWD, DB_NAME, DB_PORT)
    try:
        secret, brandName = select_secret_by_app_id(db, app_id)
        data = {
            "appId": app_id,
            "secretKey": secret,
            "grantType": "auth_code",
            "authCode": auth_code,
            'userId': userId,
        }
        rsp = requests.post(ACCESS_TOKEN_URL, json=data, headers=OAUTH_HEADERS,
                            timeout=REQUEST_TIMEOUT)
        rsp_data = rsp.text
        _check_token_response(rsp_data, f'accessToken for {brandName}')
        # Persist access_token and the accompanying fields.
        save_oauth_to_db(db, rsp_data, brandName)
    finally:
        # linkTomySql returns a plain string on failure, which has no close().
        if db != DB_CONNECT_FAILED:
            db.close()
    return


# Fetch the JSON stored for a token
def selectAccessToken(db, brandName):
    """Return the most recently stored token response text for ``brandName``.

    The text is read from column index 2 of the ``baidu_ad_tk_refresh`` row
    with the greatest ``lastCatchTime``.

    :raises IndexError: when the brand has no stored token.
    """
    # LIMIT 1: only the newest row is used, and the table grows on every refresh.
    sql = ('SELECT * FROM baidu_ad_tk_refresh WHERE brandName = %s '
           'ORDER BY lastCatchTime DESC LIMIT 1;')
    row = _fetch_first_row(db, sql, (brandName,), f'token of brandName={brandName!r}')
    return row[2]


# Refresh the token
def refreshToken(db, brandName):
    """Refresh the stored token of ``brandName`` and save the new response.

    :raises ValueError: when Baidu's response does not contain token data.
        The response is not stored in that case; storing it would make it the
        newest row and break every later read for this brand.
    """
    stored = json.loads(selectAccessToken(db, brandName))['data']
    app_id, secretKey = select_appId_secret_by_brandName(db, brandName)
    data = {
        'appId': app_id,
        'refreshToken': stored['refreshToken'],
        'secretKey': secretKey,
        'userId': stored['userId'],
    }
    resp = requests.post(REFRESH_TOKEN_URL, headers=OAUTH_HEADERS, json=data,
                         timeout=REQUEST_TIMEOUT).text
    _check_token_response(resp, f'refreshToken for {brandName}')
    save_oauth_to_db(db, resp, brandName)


# Select all active accounts
def selectAllUsers(db):
    """Return every ``baidu_ad_app_id_sct`` row with ``state = 1``.

    :return: list of ``{'brandName': ..., 'userName': ...}`` dicts, taken
        from column indexes 1 and 3 respectively.
    """
    sql = 'SELECT * FROM baidu_ad_app_id_sct WHERE state = 1;'
    with db.cursor() as cursor:
        cursor.execute(sql)
        rows = cursor.fetchall()
    return [{'brandName': row[1], 'userName': row[3]} for row in rows]


# Build the queue of account credentials in bulk
def getUserListByMccid(db, brandName, userName):
    """List the accounts returned by ``getUserListByMccid`` for ``userName``.

    Accounts whose ``remark`` is ``'资管'`` or empty are skipped; the remark
    of the remaining ones is used as their brand name.  When nothing remains,
    the queried account itself is returned as the only entry.

    :return: list of dicts with keys ``userName``, ``accessToken``,
        ``brandName`` and ``routeName`` (always ``'baidu'``).
    """
    accessToken = json.loads(selectAccessToken(db, brandName))['data']['accessToken']
    headers = {
        "Accept-Encoding": "gzip, deflate",
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    data = {
        "header": {
            "userName": userName,
            "accessToken": accessToken,
            "action": "API-PYTHON",
        },
        "body": {},
    }
    resp = requests.post(MCC_USER_LIST_URL, headers=headers, json=data,
                         timeout=REQUEST_TIMEOUT).json()
    userList = resp['body']['data']
    list_out = [
        {
            'userName': user['username'],
            'accessToken': accessToken,
            'brandName': user['remark'],
            'routeName': 'baidu',
        }
        for user in userList
        if user['remark'] not in ('资管', '')
    ]
    if not list_out:
        list_out = [{
            'userName': userName,
            'accessToken': accessToken,
            'brandName': brandName,
            'routeName': 'baidu',
        }]
    return list_out


# Collect every Baidu advertising account
def getAllBaiDuUsersList(db):
    """Return the account entries of every active user, and print them.

    Each user's entries are placed in front of those collected so far, so the
    result lists the last active user's accounts first.
    """
    out_list = []
    for userInfo in selectAllUsers(db):
        out = getUserListByMccid(db, userInfo['brandName'], userInfo['userName'])
        out_list = out + out_list
    print(out_list)
    return out_list


# Refresh tokens in bulk
def refreshAllBaiDuUsersToken(db):
    """Refresh the token of every active user, reporting failures per brand.

    A failure for one brand is printed and does not stop the remaining ones.
    """
    for userInfo in selectAllUsers(db):
        try:
            refreshToken(db, userInfo['brandName'])
        except Exception as exc:
            # Best-effort batch: report the reason and carry on.
            print(f'【{userInfo["brandName"]}】token 更新失败 {exc!r}')


def _print_oauth_user_info(db, brandName):
    """Manual debugging aid: print the stored token and Baidu's getUserInfo reply."""
    json_data = selectAccessToken(db, brandName)
    print(json_data)
    json_data = json.loads(json_data)['data']
    data = {
        'openId': json_data['openId'],
        'accessToken': json_data['accessToken'],
        'userId': json_data['userId'],
        'needSubList': True,
        'lastPageMaxUcId': 1,
        'pageSize': 500,
    }
    resp = requests.post(USER_INFO_URL, headers=OAUTH_HEADERS, json=data,
                         timeout=REQUEST_TIMEOUT).text
    print(resp)


if __name__ == '__main__':
    db = linkTomySql(LOCAL_DB_HOST, DB_PASSWD, DB_NAME, DB_PORT)
    if db == DB_CONNECT_FAILED:
        raise SystemExit(1)
    try:
        getAllBaiDuUsersList(db)
        # Manual checks previously kept here as disabled code, e.g.:
        #   refreshToken(db, '咀咀集团')
        #   _print_oauth_user_info(db, '咀咀集团')
    finally:
        db.close()