import requests from datetime import datetime, timedelta import json import logging # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) class MobgiAPIClient: def __init__(self): self.session = requests.Session() self.login_time = None self.expire_time = None self.api_base = "https://180.213.188.227" self.api_host = "cli1.mobgi.com" self.is_logged_in = False # 添加用于存储 cookie 的变量 self.session_cookie = None def login(self, email, password, expire_hours=24): """登录并保存会话状态""" # 清除可能的旧会话 self.session.cookies.clear() self.is_logged_in = False url = f"{self.api_base}/User/AdminUser/login" payload = {"email": email, "password": password, "product_version": 0} try: # 登录请求使用独立的 headers headers = {"Content-Type": "application/json", "Host": self.api_host} response = self.session.post( url, json=payload, headers=headers, verify=False, timeout=10 ) response.raise_for_status() # 解析响应 resp_data = response.json() if resp_data.get('code') != 0: logging.error(f"登录失败: {resp_data.get('message', '未知错误')}") return False # 提取并保存会话 cookie self.session_cookie = self._extract_session_cookie(response) if not self.session_cookie: logging.error("未能获取会话cookie") return False # 更新登录状态 self.login_time = datetime.now() self.expire_time = self.login_time + timedelta(hours=expire_hours) self.is_logged_in = True logging.info(f"登录成功! 会话有效期至 {self.expire_time.strftime('%Y-%m-%d %H:%M:%S')}") logging.info(f"cookie: {self.session_cookie}") # return self.session_cookie return True except requests.exceptions.RequestException as e: logging.error(f"登录请求异常: {str(e)}") return False except json.JSONDecodeError: logging.error("登录响应解析失败: 无效的JSON格式") return False def _extract_session_cookie(self, response): """从响应中提取会话cookie""" # 直接查找特定的 cookie 名称 cookie_value = response.cookies.get("chuangliang_session") if cookie_value: return f"chuangliang_session={cookie_value};" # 如果无法直接获取,尝试从 Set-Cookie 头解析 set_cookie = response.headers.get("Set-Cookie") if set_cookie: # 简单地从 Set-Cookie 中提取会话信息 # 实际处理可能需要更复杂的逻辑 if "chuangliang_session=" in set_cookie: parts = set_cookie.split(";") for part in parts: if "chuangliang_session=" in part: return part.strip() + ";" return None def _get_request_headers(self): """构造请求头,包含必要的cookie""" headers = { "Content-Type": "application/json", # 添加其他可能需要的通用头 } if self.session_cookie: # 将会话 cookie 添加到 headers headers["cookie"] = self.session_cookie return headers def check_session(self): """检查会话有效性""" if not self.is_logged_in or not self.expire_time: logging.warning("会话未初始化,请先登录") return False if datetime.now() > self.expire_time: logging.warning(f"会话已过期,有效期至 {self.expire_time.strftime('%Y-%m-%d %H:%M:%S')}") return False return True def __del__(self): """关闭会话""" self.session.close() logging.info("会话已关闭") def cookie(): client = MobgiAPIClient() email = "974509022@qq.com" # password = "0e3bc01f8f0409f4541015737925ed8e" # email = "chuangliang@lyzh.com" password = "0e3bc01f8f0409f4541015737925ed8e" client.login(email=email, password=password) cookies_str = client.session_cookie # 将字符串转换为字典 cookies_dict = {} parts = cookies_str.strip(';').split(';') for part in parts: part = part.strip() if '=' in part: key, value = part.split('=', 1) cookies_dict[key.strip()] = value.strip() return cookies_dict # 使用示例 # if __name__ == "__main__": # cookie = cookie() # # 初始化客户端 # client = MobgiAPIClient() # email = "974509022@qq.com" # password = "0e3bc01f8f0409f4541015737925ed8e" # client.login(email=email, password=password) # cookie = client.session_cookie # 登录(使用您提供的凭证) # if client.login( # email="974509022@qq.com", # password="0e3bc01f8f0409f4541015737925ed8e" # ): # logging.info(f"获取的会话Cookie: {client.session_cookie}") # # # 获取广告主列表 # advertiser_data = client.get_advertiser_list( # start_date="2025-07-01", # end_date="2025-07-31", # conditions=None, # page=1, # page_size=50 # ) # # if advertiser_data: # print("成功获取广告主列表:") # print(f"总记录数: {advertiser_data.get('total_count', 0)}") # print(f"总页数: {advertiser_data.get('total_page', 1)}") # print(f"当前页数据: {json.dumps(advertiser_data.get('list', []), indent=2, ensure_ascii=False)}") # else: # print("获取广告主列表失败") # else: # print("登录失败")