接口自动化 #3

Merged
xiaohai merged 1 commits from feature/合并接口自动化 into master 2025-09-18 09:42:42 +00:00
73 changed files with 2390 additions and 5 deletions

View File

@ -0,0 +1,8 @@
# -*- coding: utf-8 -*-
"""
@Time : 2025/7/30 10:48
@Auth : 九月的海
@File : __init__.py.py
@IDE : PyCharm
@Motto : Catch as catch can....
"""

View File

@ -0,0 +1,8 @@
# -*- coding: utf-8 -*-
"""
@Time : 2025/8/4 14:10
@Auth : 九月的海
@File : __init__.py.py
@IDE : PyCharm
@Motto : Catch as catch can....
"""

View File

@ -0,0 +1,115 @@
# -*- coding: utf-8 -*-
"""
@Time : 2025/7/30 10:58
@Auth : 九月的海
@File : conftest.py.py
@IDE : PyCharm
@Motto : Catch as catch can....
"""
# conftest.py
import pytest
import logging
import time
import json
from pathlib import Path
from api_auto_framework.core.request_wrapper import RequestWrapper
# ========================== 全局日志配置 ==========================
def configure_logger(name):
"""配置全局日志记录器"""
logger = logging.getLogger(name)
logger.setLevel(logging.INFO)
# 创建一个流处理器(控制台输出)
stream_handler = logging.StreamHandler()
# 创建格式器并添加到处理器
formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
stream_handler.setFormatter(formatter)
# 确保只添加一个处理器
if not logger.hasHandlers():
logger.addHandler(stream_handler)
return logger
# ========================== 公共函数定义 ==========================
def log_request_details(test_data, logger):
"""记录请求详细信息到日志"""
logger.info(f"测试名称: {test_data['test_name']}")
logger.info(f"请求方法: {test_data['method']}")
logger.info(f"请求URL: {test_data['url']}")
if "headers" in test_data and test_data["headers"]:
logger.info("请求头:")
for k, v in test_data["headers"].items():
logger.info(f" {k}: {v}")
if "cookies" in test_data and test_data["cookies"]:
logger.info("Cookies:")
for k, v in test_data["cookies"].items():
logger.info(f" {k}: {v}")
if "body" in test_data and test_data.get("body"):
logger.info("请求体:" + test_data.get("body"))
else:
logger.info(" 无请求体")
def log_response_details(response, logger):
"""记录响应详细信息到日志"""
result = RequestWrapper.formatted_response(response)
logger.info(f"响应状态码: {result['status_code']}")
body_data = result.get("body", {})
logger.info(f"响应数据: {body_data}")
return result
# ========================== Fixture定义 ==========================
@pytest.fixture(scope="session")
def api_logger(request):
"""为每个测试提供专用日志记录器"""
logger_name = f"test_{request.node.name}"
return configure_logger(logger_name)
@pytest.fixture(scope="session")
def log_assert(api_logger):
"""带有日志记录的断言函数"""
def wrapper(condition, success_msg, fail_msg, actual_value=None, expected_value=None):
if condition:
api_logger.info(f"✔ PASS: {success_msg}")
if actual_value is not None and expected_value is not None:
api_logger.info(f" 实际值: {actual_value} | 期望值: {expected_value}")
else:
api_logger.error(f"✖ FAIL: {fail_msg}")
if actual_value is not None and expected_value is not None:
api_logger.error(f" 实际值: {actual_value} | 期望值: {expected_value}")
# 触发实际断言
assert condition, fail_msg
return wrapper
@pytest.fixture(scope="session")
def setup_teardown(request, api_logger):
"""测试的前置和后置操作"""
# 前置操作
api_logger.info(f"======= 开始测试: {request.node.name} =======")
start_time = time.time()
yield
# 后置操作
end_time = time.time()
duration = end_time - start_time
api_logger.info(f"测试执行时长: {duration:.2f}")
api_logger.info(f"======= 结束测试: {request.node.name} =======")

View File

@ -0,0 +1,8 @@
# -*- coding: utf-8 -*-
"""
@Time : 2025/7/30 10:45
@Auth : 九月的海
@File : __init__.py.py
@IDE : PyCharm
@Motto : Catch as catch can....
"""

View File

@ -0,0 +1,229 @@
# -*- coding: utf-8 -*-
"""
@Time : 2025/7/30 10:58
@Auth : 九月的海
@File : conftest.py.py
@IDE : PyCharm
@Motto : Catch as catch can....
"""
import re
import json
import ast
import os
from pathlib import Path
import uuid
from urllib.parse import urlparse
import yaml
def parse_cookies(cookie_str):
"""解析cookies字符串为字典"""
cookies = {}
if not cookie_str:
return cookies
items = cookie_str.split(';')
for item in items:
item = item.strip()
if '=' in item:
key, value = item.split('=', 1)
cookies[key.strip()] = value.strip()
return cookies
class CurlConverter:
@staticmethod
def parse_curl(curl_command):
"""解析cURL命令返回请求组件"""
# 方法解析
method = re.findall(r'-X\s+(\w+)', curl_command) or re.findall(r'--request\s+(\w+)', curl_command)
# URL解析
url_match = re.search(r"'(https?://[^']+)'|(https?://\S+)", curl_command)
# 头部解析
headers = dict(re.findall(r"-H\s+'([^:]+):\s*([^']+)'", curl_command))
# 数据处理
data_match = re.search(r"--data-raw\s+'(.*?)'", curl_command, re.DOTALL) # or \
# re.search(r"--data\s+'(.*?)'", curl_command, re.DOTALL) or \
# re.search(r"-d\s+'(.*?)'", curl_command, re.DOTALL)
# Cookie解析 (新增部分)
cookie_match = re.search(r"-b\s+'([^']+)'", curl_command) or \
re.search(r"--cookie\s+'([^']+)'", curl_command)
data = None
if data_match:
data_str = data_match.group(1).strip()
if data_str.startswith("{") or data_str.startswith("["):
try:
data = json.loads(data_str)
# 处理 conditions 字段(它实际上是字符串形式的 JSON
# if isinstance(data.get('conditions'), str):
# try:
# # 第二次解析:将字符串值转为真正的 JSON
# print(22222222222222)
# data['conditions'] = json.loads(data['conditions'])
# except json.JSONDecodeError:
# # 处理可能的解析错误
# pass
except json.JSONDecodeError as e:
print(e)
# 尝试Python dict解析
try:
data = ast.literal_eval(data_str)
except (SyntaxError, ValueError):
# 保留原始字符串
data = data_str
else:
data = data_str
# 解析Cookies
cookies = {}
if cookie_match:
cookies = parse_cookies(cookie_match.group(1))
return {
"method": method[0] if method else "GET",
"url": url_match.group(0).strip("'") if url_match else "",
"headers": headers,
"body": data,
"cookies": cookies # 新增的cookies字段
}
@staticmethod
def generate_test_case(curl_command, test_name="GeneratedTestCase"):
"""生成测试用例和测试数据"""
# 解析cURL命令
components = CurlConverter.parse_curl(curl_command)
URL = components["url"]
uri = urlparse(URL).path
# 生成唯一标识
# test_id = str(uuid.uuid4())[:8]
# safe_test_name = test_name.replace(" ", "_").lower()
# test_case_name = f"test_{safe_test_name}_{test_id}"
test_case_name = "test_" + uri.lstrip('/').replace('/', '_')
# 创建测试数据
test_data = {
"test_name": test_name,
"method": components["method"],
"url": components["url"],
"headers": components.get("headers", {}),
"cookies": components.get("cookies", {}), # 新增cookies字段
"body": components.get("body"),
"expected": {
"status_code": 200
}
}
print(components.get("body"))
# 创建文件路径
base_dir = Path(__file__).parents[1]
data_path = base_dir / "testdata" / f"{test_case_name}.yaml"
case_path = base_dir / "testcases" / f"{test_case_name}.py"
# 确保目录存在
data_path.parent.mkdir(parents=True, exist_ok=True)
case_path.parent.mkdir(parents=True, exist_ok=True)
# 写入测试数据文件UTF-8编码
with open(data_path, "w", encoding="utf-8") as f:
yaml.dump([test_data], f, sort_keys=False, allow_unicode=True)
# 生成测试用例模板包含cookies处理
test_case_template = f'''# -*- coding: utf-8 -*-
import pytest
import json
import os
import allure
from pathlib import Path
from api_auto_framework.core.request_wrapper import RequestWrapper
from api_auto_framework.utils.data_loader import load_test_data
from api_auto_framework.utils.assertion_tools import cookie
from api_auto_framework.utils.handle_path import testData_path
# 文件路径
TEST_DATA = Path(testData_path) / f"{test_case_name}.yaml"
@allure.feature("百度")
@allure.story("媒体账户")
@allure.title("{test_name}")
@allure.severity(allure.severity_level.NORMAL)
@pytest.mark.generated
def {test_case_name}():
"""Automatically generated test case: {test_name}"""
# 加载测试数据
test_data = load_test_data(TEST_DATA)[0]
# 准备请求参数
request_kwargs = {{
"method": test_data['method'],
"url": test_data['url'],
"headers": test_data.get('headers'),
"cookies": cookie() # 新增cookies参数
}}
# 智能选择请求体处理方式
body = test_data.get('body')
if body is not None:
if isinstance(body, (dict, list)):
request_kwargs["json"] = body
else:
request_kwargs["data"] = body
with allure.step("准备请求数据"):
allure.attach(f"URL: {URL}", "请求URL")
allure.attach(json.dumps(body), "请求体", allure.attachment_type.JSON)
# 发送请求并获取响应
with allure.step(f"发送请求:{test_case_name}"):
response = RequestWrapper.send_request(**request_kwargs)
if response.status_code != 200:
error_msg = f"请求失败,状态码: {{response.status_code}}"
allure.attach(error_msg, "请求错误", allure.attachment_type.TEXT)
pytest.fail(error_msg)
with allure.step("处理请求"):
try:
result = RequestWrapper.formatted_response(response)
except Exception as e:
error_msg = f"响应处理失败: {{str(e)}}"
allure.attach(error_msg, "响应处理错误", allure.attachment_type.TEXT)
allure.attach(response.text, "原始响应内容", allure.attachment_type.TEXT)
pytest.fail(error_msg)
with allure.step("响应结果"):
allure.attach(str(response.status_code), "响应状态", allure.attachment_type.TEXT)
if result and "body" in result and "code" in result["body"]:
allure.attach(str(result["body"]["code"]), "响应状态码", allure.attachment_type.TEXT)
if result and "headers" in result:
allure.attach(json.dumps(result["headers"], indent=2),
"响应头", allure.attachment_type.JSON)
if result and "body" in result:
body_content = result["body"]
if isinstance(body_content, dict):
body_str = json.dumps(body_content, indent=2, ensure_ascii=False)
else:
body_str = str(body_content)
allure.attach(body_str, "响应内容", allure.attachment_type.JSON)
# 基础状态码断言
try:
assert result["status_code"] == test_data["expected"]["status_code"], f"状态码不匹配: {{result['status_code']}}{{test_data['expected']['status_code']}}"
assert result["body"]["code"] == 0, f"业务码非零: {{result['body']['code']}}"
assert result["body"]["message"] == "成功", f"消息不符: {{result['body']['message']}}"
except AssertionError as e:
allure.attach(str(e), "断言失败详情", allure.attachment_type.TEXT)
raise
except KeyError as e:
error_msg = f"响应体中缺少关键字段: {{str(e)}}"
allure.attach(error_msg, "响应体结构错误", allure.attachment_type.TEXT)
pytest.fail(error_msg)
'''
# 用UTF-8编码写入测试用例文件
with open(case_path, "w", encoding="utf-8") as f:
f.write(test_case_template)
print(case_path)
print(data_path)
return str(case_path), str(data_path)

View File

@ -0,0 +1,37 @@
import re
import yaml
import json
curl_example = r"""
curl 'https://cli1.mobgi.com/Baidu/Advertiser/getList' \
-H 'Accept: application/json, text/plain, */*' \
-H 'Accept-Language: zh-CN,zh;q=0.9' \
-H 'Connection: keep-alive' \
-H 'Content-Type: application/json;charset=UTF-8' \
-b 'userId=100362; chuangliang_session=42YqwTsP4e98gPzwcQ3u83vQE984OnqvER2pnAxk; idea_token=b5555f58-6db0-11f0-b1e3-00163e4f2804' \
-H 'Origin: https://cl.mobgi.com' \
-H 'Referer: https://cl.mobgi.com/' \
-H 'Sec-Fetch-Dest: empty' \
-H 'Sec-Fetch-Mode: cors' \
-H 'Sec-Fetch-Site: same-site' \
-H 'User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36' \
-H 'client-user: 100362' \
-H 'ff-request-id: 20250731201346e309169d13454d12bca544a779fadbb0' \
-H 'main-user-id: 100362' \
-H 'sec-ch-ua: "Not)A;Brand";v="8", "Chromium";v="138", "Google Chrome";v="138"' \
-H 'sec-ch-ua-mobile: ?0' \
-H 'sec-ch-ua-platform: "Windows"' \
--data-raw '{"conditions":"{\"companys\":[],\"cl_project_id\":[],\"optimize_user_id\":[],\"keyword\":\"\"}","start_date":"2025-07-31","end_date":"2025-07-31","page":1,"page_size":20,"total_count":2,"total_page":1,"sort_field":"","sort_direction":"","data_type":"list"}'
"""
# 步骤1提取原始数据
pattern = r"--data-raw\s+'(.*?)'"
match = re.search(pattern, curl_example, re.DOTALL)
if not match:
raise ValueError("未找到--data-raw数据")
data_raw_str = match.group(1)
print(data_raw_str)
print(json.loads(data_raw_str))
yaml_data = yaml.dump(data_raw_str)
print(yaml_data)

View File

@ -0,0 +1,53 @@
# -*- coding: utf-8 -*-
"""
@Time : 2025/7/30 10:46
@Auth : 九月的海
@File : request_wrapper.py.py
@IDE : PyCharm
@Motto : Catch as catch can....
"""
import requests
from urllib.parse import urlparse, urlunparse
class RequestWrapper:
@staticmethod
def environment(url, env='prod'):
"""根据环境替换URL的域名部分"""
# 环境域名映射配置
env_mapping = {
'dev': '180.184.59.85', # 预发环境
'test': '180.184.58.89', # 测试环境
'prod': '180.213.188.227' # 生产环境
}
# 解析原始URL
parsed = urlparse(url)
# 获取新域名(默认为原始域名)
new_domain = env_mapping.get(env, parsed.netloc)
# 重建URL并返回
new_parsed = parsed._replace(netloc=new_domain)
return urlunparse(new_parsed)
@staticmethod
def send_request(method, url, **kwargs):
"""发送HTTP请求"""
try:
processed_url = RequestWrapper.environment(url, env=kwargs.pop('env', 'prod'))
response = requests.request(
method=method.upper(),
url=processed_url,
verify=False,
**kwargs
)
return response
except requests.RequestException as e:
raise ConnectionError(f"请求失败: {str(e)}")
@staticmethod
def formatted_response(response):
"""格式化响应结果"""
return {
"status_code": response.status_code,
"headers": dict(response.headers),
"body": response.json() if response.content else {}
}

View File

@ -0,0 +1,40 @@
# -*- coding: utf-8 -*-
"""
@Time : 2025/7/30 10:59
@Auth : 九月的海
@File : generate_test.py.py
@IDE : PyCharm
@Motto : Catch as catch can....
"""
from core.curl_converter import CurlConverter
curl_example = r"""
curl 'https://cli1.mobgi.com/Baidu/Common/baiduRequest' \
-H 'Accept: application/json, text/plain, */*' \
-H 'Accept-Language: zh-CN,zh;q=0.9' \
-H 'Cache-Control: no-cache' \
-H 'Connection: keep-alive' \
-H 'Content-Type: application/json;charset=UTF-8' \
-b 'chuangliang_session=UBPWTe0ZQisVQsJdmcZVblYLTtrm58Ccm3Yb4ofL; userId=100362; idea_token=cc39ab3c-892d-11f0-94a2-00163e5c152e' \
-H 'Origin: https://cl.mobgi.com' \
-H 'Pragma: no-cache' \
-H 'Referer: https://cl.mobgi.com/' \
-H 'Sec-Fetch-Dest: empty' \
-H 'Sec-Fetch-Mode: cors' \
-H 'Sec-Fetch-Site: same-site' \
-H 'User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36' \
-H 'client-user: 100362' \
-H 'ff-request-id: 202509042007234bbedf401c104521a788a0a4b70e58ec' \
-H 'main-user-id: 100362' \
-H 'sec-ch-ua: "Not;A=Brand";v="99", "Google Chrome";v="139", "Chromium";v="139"' \
-H 'sec-ch-ua-mobile: ?0' \
-H 'sec-ch-ua-platform: "Windows"' \
--data-raw '{"media_account_id":12472176893,"url":"ImageManagementService/qualityCheck","api_type":"feed","body":{"imageUrls":["https://feed-image.baidu.com/0/pic/-269653120_1670284112_-114565096.jpg"]}}'
"""
# 生成测试用例
case_path, data_path = CurlConverter.generate_test_case(curl_example, "百度检测低质封面")

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1 @@
URL: https://cli1.mobgi.com/Baidu/Common/baiduRequest

View File

@ -0,0 +1 @@
{"media_account_id": 12472176893, "url": "ImageManagementService/qualityCheck", "api_type": "feed", "body": {"imageUrls": ["https://feed-image.baidu.com/0/pic/-269653120_1670284112_-114565096.jpg"]}}

View File

@ -0,0 +1 @@
请求失败,状态码: 403

View File

@ -0,0 +1 @@
请求参数: {'media_account_id': 12472176893, 'url': 'ImageManagementService/qualityCheck', 'api_type': 'feed', 'body': {'imageUrls': ['https://feed-image.baidu.com/0/pic/-269653120_1670284112_-114565096.jpg']}}

View File

@ -0,0 +1,2 @@
"BROKEN","EPIC","FAILED","FEATURE","PASSED","SKIPPED","STORY","UNKNOWN"
"0","","1","百度","0","0","广告搭建","0"
1 BROKEN EPIC FAILED FEATURE PASSED SKIPPED STORY UNKNOWN
2 0 1 百度 0 0 广告搭建 0

View File

@ -0,0 +1 @@
{"uid":"b1a8273437954620fa374b796ffaacdd","name":"behaviors","children":[{"name":"百度","children":[{"name":"广告搭建","children":[{"name":"百度检测低质封面","uid":"cb81e5a5d63827b","parentUid":"0fc562644cfaa728f98869bb1e45f955","status":"failed","time":{"start":1757680872785,"stop":1757680873456,"duration":671},"flaky":false,"newFailed":false,"newPassed":false,"newBroken":false,"retriesCount":0,"retriesStatusChange":false,"parameters":[],"tags":["generated"]}],"uid":"0fc562644cfaa728f98869bb1e45f955"}],"uid":"ee29b3803b6a8097b75893210fac59bf"}]}

View File

@ -0,0 +1,2 @@
"BROKEN","CATEGORY","FAILED","PASSED","SKIPPED","UNKNOWN"
"0","Product defects","1","0","0","0"
1 BROKEN CATEGORY FAILED PASSED SKIPPED UNKNOWN
2 0 Product defects 1 0 0 0

View File

@ -0,0 +1 @@
{"uid":"4b4757e66a1912dae1a509f688f20b0f","name":"categories","children":[{"name":"Product defects","children":[{"name":"Failed: 请求失败,状态码: 403","children":[{"name":"百度检测低质封面","uid":"cb81e5a5d63827b","parentUid":"1c37a56cfe4a7673235a503e17d96376","status":"failed","time":{"start":1757680872785,"stop":1757680873456,"duration":671},"flaky":false,"newFailed":false,"newPassed":false,"newBroken":false,"retriesCount":0,"retriesStatusChange":false,"parameters":[],"tags":["generated"]}],"uid":"1c37a56cfe4a7673235a503e17d96376"}],"uid":"8fb3a91ba5aaf9de24cc8a92edc82b5d"}]}

View File

@ -0,0 +1 @@
{"uid":"83edc06c07f9ae9e47eb6dd1b683e4e2","name":"packages","children":[{"name":"test_Baidu_Common_baiduRequest","children":[{"name":"百度检测低质封面","uid":"cb81e5a5d63827b","parentUid":"f3bd642c0e98a29fa20daeb0eaafac81","status":"failed","time":{"start":1757680872785,"stop":1757680873456,"duration":671},"flaky":false,"newFailed":false,"newPassed":false,"newBroken":false,"retriesCount":0,"retriesStatusChange":false,"parameters":[],"tags":["generated"]}],"uid":"f3bd642c0e98a29fa20daeb0eaafac81"}]}

View File

@ -0,0 +1,2 @@
"DESCRIPTION","DURATION IN MS","NAME","PARENT SUITE","START TIME","STATUS","STOP TIME","SUB SUITE","SUITE","TEST CLASS","TEST METHOD"
"Automatically generated test case: 百度检测低质封面","671","百度检测低质封面","","Fri Sep 12 20:41:12 CST 2025","failed","Fri Sep 12 20:41:13 CST 2025","TestBaiduCommonBaiduRequest","test_Baidu_Common_baiduRequest","",""
1 DESCRIPTION DURATION IN MS NAME PARENT SUITE START TIME STATUS STOP TIME SUB SUITE SUITE TEST CLASS TEST METHOD
2 Automatically generated test case: 百度检测低质封面 671 百度检测低质封面 Fri Sep 12 20:41:12 CST 2025 failed Fri Sep 12 20:41:13 CST 2025 TestBaiduCommonBaiduRequest test_Baidu_Common_baiduRequest

View File

@ -0,0 +1 @@
{"uid":"98d3104e051c652961429bf95fa0b5d6","name":"suites","children":[{"name":"test_Baidu_Common_baiduRequest","children":[{"name":"TestBaiduCommonBaiduRequest","children":[{"name":"百度检测低质封面","uid":"cb81e5a5d63827b","parentUid":"28b6ea9b5a46d67120da780eed056407","status":"failed","time":{"start":1757680872785,"stop":1757680873456,"duration":671},"flaky":false,"newFailed":false,"newPassed":false,"newBroken":false,"retriesCount":0,"retriesStatusChange":false,"parameters":[],"tags":["generated"]}],"uid":"28b6ea9b5a46d67120da780eed056407"}],"uid":"d53672d342f8a81ff0478bab5fec903a"}]}

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1 @@
{"uid":"ab17fc5a4eb3bca4b216b548c7f9fcbc","name":"timeline","children":[{"name":"September","children":[{"name":"20148-MainThread","children":[{"name":"百度检测低质封面","uid":"cb81e5a5d63827b","parentUid":"330be67bd965875fe78a9b34e3bd82c4","status":"failed","time":{"start":1757680872785,"stop":1757680873456,"duration":671},"flaky":false,"newFailed":false,"newPassed":false,"newBroken":false,"retriesCount":0,"retriesStatusChange":false,"parameters":[],"tags":["generated"]}],"uid":"330be67bd965875fe78a9b34e3bd82c4"}],"uid":"8642e8684f9071a14d748e846af0185a"}]}

View File

@ -0,0 +1,14 @@
launch_status failed=1 1757680875000000000
launch_status broken=0 1757680875000000000
launch_status passed=0 1757680875000000000
launch_status skipped=0 1757680875000000000
launch_status unknown=0 1757680875000000000
launch_time duration=671 1757680875000000000
launch_time min_duration=671 1757680875000000000
launch_time max_duration=671 1757680875000000000
launch_time sum_duration=671 1757680875000000000
launch_time start=1757680872785 1757680875000000000
launch_time stop=1757680873456 1757680875000000000
launch_problems product_defects=1 1757680875000000000
launch_retries retries=0 1757680875000000000
launch_retries run=1 1757680875000000000

View File

@ -0,0 +1,10 @@
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>Allure Report summary mail</title>
</head>
<body>
Mail body
</body>
</html>

View File

@ -0,0 +1,14 @@
launch_status_failed 1
launch_status_broken 0
launch_status_passed 0
launch_status_skipped 0
launch_status_unknown 0
launch_time_duration 671
launch_time_min_duration 671
launch_time_max_duration 671
launch_time_sum_duration 671
launch_time_start 1757680872785
launch_time_stop 1757680873456
launch_problems_product_defects 1
launch_retries_retries 0
launch_retries_run 1

Binary file not shown.

After

Width:  |  Height:  |  Size: 15 KiB

View File

@ -0,0 +1 @@
[{"data":{"Product defects":1}}]

View File

@ -0,0 +1 @@
[{"data":{"duration":671}}]

View File

@ -0,0 +1 @@
[{"data":{"failed":1,"broken":0,"skipped":0,"passed":0,"unknown":0,"total":1}}]

View File

@ -0,0 +1 @@
{"9305aa8fe177b3ec8014321394155033":{"statistic":{"failed":1,"broken":0,"skipped":0,"passed":0,"unknown":0,"total":1},"items":[{"uid":"cb81e5a5d63827b","status":"failed","statusDetails":"Failed: 请求失败,状态码: 403","time":{"start":1757680872785,"stop":1757680873456,"duration":671}}]}}

View File

@ -0,0 +1 @@
[{"data":{"run":1,"retry":0}}]

View File

@ -0,0 +1,34 @@
<!DOCTYPE html>
<html dir="ltr" lang="en">
<head>
<meta charset="utf-8">
<title>Allure Report</title>
<link rel="icon" href="favicon.ico">
<link rel="stylesheet" type="text/css" href="styles.css">
<link rel="stylesheet" type="text/css" href="plugin/screen-diff/styles.css">
</head>
<body>
<div id="alert"></div>
<div id="content">
<span class="spinner">
<span class="spinner__circle"></span>
</span>
</div>
<div id="popup"></div>
<script src="app.js"></script>
<script src="plugin/behaviors/index.js"></script>
<script src="plugin/packages/index.js"></script>
<script src="plugin/screen-diff/index.js"></script>
<script async src="https://www.googletagmanager.com/gtag/js?id=G-FVWC4GKEYS"></script>
<script>
window.dataLayer = window.dataLayer || [];
function gtag(){dataLayer.push(arguments);}
gtag('js', new Date());
gtag('config', 'G-FVWC4GKEYS', {
'allureVersion': '2.30.0',
'reportUuid': '9c64cb40-62ff-4590-aacf-441ddce2b06d',
'single_file': false
});
</script>
</body>
</html>

View File

@ -0,0 +1,250 @@
'use strict';
allure.api.addTranslation('en', {
tab: {
behaviors: {
name: 'Behaviors'
}
},
widget: {
behaviors: {
name: 'Features by stories',
showAll: 'show all'
}
}
});
allure.api.addTranslation('ru', {
tab: {
behaviors: {
name: 'Функциональность'
}
},
widget: {
behaviors: {
name: 'Функциональность',
showAll: 'показать все'
}
}
});
allure.api.addTranslation('zh', {
tab: {
behaviors: {
name: '功能'
}
},
widget: {
behaviors: {
name: '特性场景',
showAll: '显示所有'
}
}
});
allure.api.addTranslation('de', {
tab: {
behaviors: {
name: 'Verhalten'
}
},
widget: {
behaviors: {
name: 'Features nach Stories',
showAll: 'Zeige alle'
}
}
});
allure.api.addTranslation('nl', {
tab: {
behaviors: {
name: 'Functionaliteit'
}
},
widget: {
behaviors: {
name: 'Features en storys',
showAll: 'Toon alle'
}
}
});
allure.api.addTranslation('he', {
tab: {
behaviors: {
name: 'התנהגויות'
}
},
widget: {
behaviors: {
name: 'תכונות לפי סיפורי משתמש',
showAll: 'הצג הכול'
}
}
});
allure.api.addTranslation('br', {
tab: {
behaviors: {
name: 'Comportamentos'
}
},
widget: {
behaviors: {
name: 'Funcionalidades por história',
showAll: 'Mostrar tudo'
}
}
});
allure.api.addTranslation('ja', {
tab: {
behaviors: {
name: '振る舞い'
}
},
widget: {
behaviors: {
name: 'ストーリー別の機能',
showAll: '全て表示'
}
}
});
allure.api.addTranslation('es', {
tab: {
behaviors: {
name: 'Funcionalidades'
}
},
widget: {
behaviors: {
name: 'Funcionalidades por Historias de Usuario',
showAll: 'mostrar todo'
}
}
});
allure.api.addTranslation('kr', {
tab: {
behaviors: {
name: '동작'
}
},
widget: {
behaviors: {
name: '스토리별 기능',
showAll: '전체 보기'
}
}
});
allure.api.addTranslation('fr', {
tab: {
behaviors: {
name: 'Comportements'
}
},
widget: {
behaviors: {
name: 'Thèmes par histoires',
showAll: 'Montrer tout'
}
}
});
allure.api.addTranslation('pl', {
tab: {
behaviors: {
name: 'Zachowania'
}
},
widget: {
behaviors: {
name: 'Funkcje według historii',
showAll: 'pokaż wszystko'
}
}
});
allure.api.addTranslation('az', {
tab: {
behaviors: {
name: 'Davranışlar'
}
},
widget: {
behaviors: {
name: 'Hekayələr üzrə xüsusiyyətlər',
showAll: 'hamısını göstər'
}
}
});
allure.api.addTranslation('sv', {
tab: {
behaviors: {
name: 'Beteenden'
}
},
widget: {
behaviors: {
name: 'Funktioner efter user stories',
showAll: 'visa allt'
}
}
});
allure.api.addTranslation('isv', {
tab: {
behaviors: {
name: 'Funkcionalnost',
}
},
widget: {
behaviors: {
name: 'Funkcionalnost',
showAll: 'pokaži vsěčto',
}
}
});
allure.api.addTranslation('ka', {
tab: {
behaviors: {
name: 'ფუნქციონალი',
}
},
widget: {
behaviors: {
name: 'ფუნქციონალი',
showAll: 'ყველას ჩვენება',
}
}
});
allure.api.addTab('behaviors', {
title: 'tab.behaviors.name', icon: 'fa fa-list',
route: 'behaviors(/)(:testGroup)(/)(:testResult)(/)(:testResultTab)(/)',
onEnter: (function (testGroup, testResult, testResultTab) {
return new allure.components.TreeLayout({
testGroup: testGroup,
testResult: testResult,
testResultTab: testResultTab,
tabName: 'tab.behaviors.name',
baseUrl: 'behaviors',
url: 'data/behaviors.json',
csvUrl: 'data/behaviors.csv'
});
})
});
allure.api.addWidget('widgets', 'behaviors', allure.components.WidgetStatusView.extend({
rowTag: 'a',
title: 'widget.behaviors.name',
baseUrl: 'behaviors',
showLinks: true
}));

View File

@ -0,0 +1,143 @@
'use strict';
allure.api.addTranslation('en', {
tab: {
packages: {
name: 'Packages'
}
}
});
allure.api.addTranslation('ru', {
tab: {
packages: {
name: 'Пакеты'
}
}
});
allure.api.addTranslation('zh', {
tab: {
packages: {
name: '包'
}
}
});
allure.api.addTranslation('de', {
tab: {
packages: {
name: 'Pakete'
}
}
});
allure.api.addTranslation('nl', {
tab: {
packages: {
name: 'Packages'
}
}
});
allure.api.addTranslation('he', {
tab: {
packages: {
name: 'חבילות'
}
}
});
allure.api.addTranslation('br', {
tab: {
packages: {
name: 'Pacotes'
}
}
});
allure.api.addTranslation('ja', {
tab: {
packages: {
name: 'パッケージ'
}
}
});
allure.api.addTranslation('es', {
tab: {
packages: {
name: 'Paquetes'
}
}
});
allure.api.addTranslation('kr', {
tab: {
packages: {
name: '패키지'
}
}
});
allure.api.addTranslation('fr', {
tab: {
packages: {
name: 'Paquets'
}
}
});
allure.api.addTranslation('pl', {
tab: {
packages: {
name: 'Pakiety'
}
}
});
allure.api.addTranslation('az', {
tab: {
packages: {
name: 'Paketlər'
}
}
});
allure.api.addTranslation('sv', {
tab: {
packages: {
name: 'Paket'
}
}
});
allure.api.addTranslation('isv', {
tab: {
packages: {
name: 'Pakety'
}
}
});
allure.api.addTranslation('ka', {
tab: {
packages: {
name: 'პაკეტები'
}
}
});
allure.api.addTab('packages', {
title: 'tab.packages.name', icon: 'fa fa-align-left',
route: 'packages(/)(:testGroup)(/)(:testResult)(/)(:testResultTab)(/)',
onEnter: (function (testGroup, testResult, testResultTab) {
return new allure.components.TreeLayout({
testGroup: testGroup,
testResult: testResult,
testResultTab: testResultTab,
tabName: 'tab.packages.name',
baseUrl: 'packages',
url: 'data/packages.json'
});
})
});

View File

@ -0,0 +1,200 @@
(function () {
var settings = allure.getPluginSettings('screen-diff', { diffType: 'diff' });
function renderImage(src) {
return (
'<div class="screen-diff__container">' +
'<img class="screen-diff__image" src="' +
src +
'">' +
'</div>'
);
}
function findImage(data, name) {
if (data.testStage && data.testStage.attachments) {
var matchedImage = data.testStage.attachments.filter(function (attachment) {
return attachment.name === name;
})[0];
if (matchedImage) {
return 'data/attachments/' + matchedImage.source;
}
}
return null;
}
function renderDiffContent(type, diffImage, actualImage, expectedImage) {
if (type === 'diff') {
if (diffImage) {
return renderImage(diffImage);
}
}
if (type === 'overlay' && expectedImage) {
return (
'<div class="screen-diff__overlay screen-diff__container">' +
'<img class="screen-diff__image" src="' +
expectedImage +
'">' +
'<div class="screen-diff__image-over">' +
'<img class="screen-diff__image" src="' +
actualImage +
'">' +
'</div>' +
'</div>'
);
}
if (actualImage) {
return renderImage(actualImage);
}
return 'No diff data provided';
}
var TestResultView = Backbone.Marionette.View.extend({
regions: {
subView: '.screen-diff-view',
},
template: function () {
return '<div class="screen-diff-view"></div>';
},
onRender: function () {
var data = this.model.toJSON();
var testType = data.labels.filter(function (label) {
return label.name === 'testType';
})[0];
var diffImage = findImage(data, 'diff');
var actualImage = findImage(data, 'actual');
var expectedImage = findImage(data, 'expected');
if (!testType || testType.value !== 'screenshotDiff') {
return;
}
this.showChildView(
'subView',
new ScreenDiffView({
diffImage: diffImage,
actualImage: actualImage,
expectedImage: expectedImage,
}),
);
},
});
var ErrorView = Backbone.Marionette.View.extend({
templateContext: function () {
return this.options;
},
template: function (data) {
return '<pre class="screen-diff-error">' + data.error + '</pre>';
},
});
var AttachmentView = Backbone.Marionette.View.extend({
regions: {
subView: '.screen-diff-view',
},
template: function () {
return '<div class="screen-diff-view"></div>';
},
onRender: function () {
jQuery
.getJSON(this.options.sourceUrl)
.then(this.renderScreenDiffView.bind(this), this.renderErrorView.bind(this));
},
renderErrorView: function (error) {
console.log(error);
this.showChildView(
'subView',
new ErrorView({
error: error.statusText,
}),
);
},
renderScreenDiffView: function (data) {
this.showChildView(
'subView',
new ScreenDiffView({
diffImage: data.diff,
actualImage: data.actual,
expectedImage: data.expected,
}),
);
},
});
var ScreenDiffView = Backbone.Marionette.View.extend({
className: 'pane__section',
events: function () {
return {
['click [name="screen-diff-type-' + this.cid + '"]']: 'onDiffTypeChange',
'mousemove .screen-diff__overlay': 'onOverlayMove',
};
},
initialize: function (options) {
this.diffImage = options.diffImage;
this.actualImage = options.actualImage;
this.expectedImage = options.expectedImage;
this.radioName = 'screen-diff-type-' + this.cid;
},
templateContext: function () {
return {
diffType: settings.get('diffType'),
diffImage: this.diffImage,
actualImage: this.actualImage,
expectedImage: this.expectedImage,
radioName: this.radioName,
};
},
template: function (data) {
if (!data.diffImage && !data.actualImage && !data.expectedImage) {
return '';
}
return (
'<h3 class="pane__section-title">Screen Diff</h3>' +
'<div class="screen-diff__content">' +
'<div class="screen-diff__switchers">' +
'<label><input type="radio" name="' +
data.radioName +
'" value="diff"> Show diff</label>' +
'<label><input type="radio" name="' +
data.radioName +
'" value="overlay"> Show overlay</label>' +
'</div>' +
renderDiffContent(
data.diffType,
data.diffImage,
data.actualImage,
data.expectedImage,
) +
'</div>'
);
},
adjustImageSize: function (event) {
var overImage = this.$(event.target);
overImage.width(overImage.width());
},
onRender: function () {
const diffType = settings.get('diffType');
this.$('[name="' + this.radioName + '"][value="' + diffType + '"]').prop(
'checked',
true,
);
if (diffType === 'overlay') {
this.$('.screen-diff__image-over img').on('load', this.adjustImageSize.bind(this));
}
},
onOverlayMove: function (event) {
var pageX = event.pageX;
var containerScroll = this.$('.screen-diff__container').scrollLeft();
var elementX = event.currentTarget.getBoundingClientRect().left;
var delta = pageX - elementX + containerScroll;
this.$('.screen-diff__image-over').width(delta);
},
onDiffTypeChange: function (event) {
settings.save('diffType', event.target.value);
this.render();
},
});
allure.api.addTestResultBlock(TestResultView, { position: 'before' });
allure.api.addAttachmentViewer('application/vnd.allure.image.diff', {
View: AttachmentView,
icon: 'fa fa-exchange',
});
})();

View File

@ -0,0 +1,30 @@
.screen-diff__switchers {
margin-bottom: 1em;
}
.screen-diff__switchers label + label {
margin-left: 1em;
}
.screen-diff__overlay {
position: relative;
cursor: col-resize;
}
.screen-diff__container {
overflow-x: auto;
}
.screen-diff__image-over {
top: 0;
left: 0;
bottom: 0;
background: #fff;
position: absolute;
overflow: hidden;
box-shadow: 2px 0 1px -1px #aaa;
}
.screen-diff-error {
color: #fd5a3e;
}

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1 @@
{"total":1,"items":[{"uid":"ee29b3803b6a8097b75893210fac59bf","name":"百度","statistic":{"failed":1,"broken":0,"skipped":0,"passed":0,"unknown":0,"total":1}}]}

View File

@ -0,0 +1 @@
[{"data":{"Product defects":1}}]

View File

@ -0,0 +1 @@
{"total":1,"items":[{"uid":"8fb3a91ba5aaf9de24cc8a92edc82b5d","name":"Product defects","statistic":{"failed":1,"broken":0,"skipped":0,"passed":0,"unknown":0,"total":1}}]}

View File

@ -0,0 +1 @@
[{"data":{"duration":671}}]

View File

@ -0,0 +1 @@
[{"uid":"cb81e5a5d63827b","name":"百度检测低质封面","time":{"start":1757680872785,"stop":1757680873456,"duration":671},"status":"failed","severity":"normal"}]

View File

@ -0,0 +1 @@
[]

View File

@ -0,0 +1 @@
[]

View File

@ -0,0 +1 @@
[{"data":{"failed":1,"broken":0,"skipped":0,"passed":0,"unknown":0,"total":1}}]

View File

@ -0,0 +1 @@
[]

View File

@ -0,0 +1 @@
[{"data":{"run":1,"retry":0}}]

View File

@ -0,0 +1 @@
[{"uid":"cb81e5a5d63827b","name":"百度检测低质封面","time":{"start":1757680872785,"stop":1757680873456,"duration":671},"status":"failed","severity":"normal"}]

View File

@ -0,0 +1 @@
[{"uid":"cb81e5a5d63827b","name":"百度检测低质封面","time":{"start":1757680872785,"stop":1757680873456,"duration":671},"status":"failed","severity":"normal"}]

View File

@ -0,0 +1 @@
{"total":1,"items":[{"uid":"d53672d342f8a81ff0478bab5fec903a","name":"test_Baidu_Common_baiduRequest","statistic":{"failed":1,"broken":0,"skipped":0,"passed":0,"unknown":0,"total":1}}]}

View File

@ -0,0 +1 @@
{"reportName":"Allure Report","testRuns":[],"statistic":{"failed":1,"broken":0,"skipped":0,"passed":0,"unknown":0,"total":1},"time":{"start":1757680872785,"stop":1757680873456,"duration":671,"minDuration":671,"maxDuration":671,"sumDuration":671}}

View File

@ -0,0 +1 @@
请求失败,状态码: 403

View File

@ -0,0 +1 @@
{"name": "百度检测低质封面", "status": "failed", "statusDetails": {"message": "Failed: 请求失败,状态码: 403", "trace": "self = <api_auto_framework.testcases.test_Baidu_Common_baiduRequest.TestBaiduCommonBaiduRequest object at 0x000001ECEFE50F50>\n\n @allure.feature(\"百度\")\n @allure.story(\"广告搭建\")\n @allure.title(\"百度检测低质封面\")\n @pytest.mark.generated\n def test_baidu_common_baidu_request(self):\n \"\"\"Automatically generated test case: 百度检测低质封面\"\"\"\n # 加载测试数据\n test_data = load_test_data(self.TEST_DATA)[0]\n # 准备请求参数\n request_kwargs = {\n \"method\": test_data['method'],\n \"url\": test_data['url'],\n \"headers\": test_data.get('headers'),\n \"cookies\": cookie() # 新增cookies参数\n }\n # 智能选择请求体处理方式\n body = test_data.get('body')\n if body is not None:\n if isinstance(body, (dict, list)):\n request_kwargs[\"json\"] = body\n else:\n request_kwargs[\"data\"] = body\n with allure.step(\"准备请求数据\"):\n print(f\"请求参数: {request_kwargs['json']}\")\n allure.attach(f\"URL: https://cli1.mobgi.com/Baidu/Common/baiduRequest\", \"请求URL\")\n allure.attach(json.dumps(body), \"请求体\", allure.attachment_type.JSON)\n # 发送请求并获取响应\n with allure.step(f\"发送请求:test_baidu_common_baidu_request\"):\n response = RequestWrapper.send_request(**request_kwargs)\n if response.status_code != 200:\n error_msg = f\"请求失败,状态码: {response.status_code}\"\n allure.attach(error_msg, \"请求错误\", allure.attachment_type.TEXT)\n> pytest.fail(error_msg)\nE Failed: 请求失败,状态码: 403\n\ntestcases\\test_Baidu_Common_baiduRequest.py:48: Failed"}, "description": "Automatically generated test case: 百度检测低质封面", "steps": [{"name": "准备请求数据", "status": "passed", "attachments": [{"name": "请求URL", "source": "98e23859-5a1a-45fa-a0b0-5a0c53dfbb12-attachment.attach"}, {"name": "请求体", "source": "d9f5bc3c-1b40-4d1a-80ef-c8512b69960d-attachment.json", "type": "application/json"}], "start": 1757680873251, "stop": 1757680873252}, {"name": "发送请求:test_baidu_common_baidu_request", "status": "failed", "statusDetails": {"message": "Failed: 请求失败,状态码: 403\n", "trace": " File \"D:\\memorandum\\chuangliangProject\\api_auto_framework\\testcases\\test_Baidu_Common_baiduRequest.py\", line 48, in test_baidu_common_baidu_request\n pytest.fail(error_msg)\n ~~~~~~~~~~~^^^^^^^^^^^\n File \"D:\\memorandum\\chuangliangProject\\venv\\Lib\\site-packages\\_pytest\\outcomes.py\", line 177, in fail\n raise Failed(msg=reason, pytrace=pytrace)\n"}, "attachments": [{"name": "请求错误", "source": "0067adad-fcf4-4f52-9bc3-ad1f9137e430-attachment.txt", "type": "text/plain"}], "start": 1757680873252, "stop": 1757680873453}], "attachments": [{"name": "stdout", "source": "6f31d277-9246-44da-9b09-809238ade95a-attachment.txt", "type": "text/plain"}], "start": 1757680872785, "stop": 1757680873456, "uuid": "a7d89745-e90e-41b0-b947-166c41adb2d0", "historyId": "9305aa8fe177b3ec8014321394155033", "testCaseId": "9305aa8fe177b3ec8014321394155033", "fullName": "test_Baidu_Common_baiduRequest.TestBaiduCommonBaiduRequest#test_baidu_common_baidu_request", "labels": [{"name": "feature", "value": "百度"}, {"name": "story", "value": "广告搭建"}, {"name": "tag", "value": "generated"}, {"name": "suite", "value": "test_Baidu_Common_baiduRequest"}, {"name": "subSuite", "value": "TestBaiduCommonBaiduRequest"}, {"name": "host", "value": "September"}, {"name": "thread", "value": "20148-MainThread"}, {"name": "framework", "value": "pytest"}, {"name": "language", "value": "cpython3"}, {"name": "package", "value": "test_Baidu_Common_baiduRequest"}], "titlePath": ["test_Baidu_Common_baiduRequest.py", "TestBaiduCommonBaiduRequest"]}

View File

@ -0,0 +1 @@
请求参数: {'media_account_id': 12472176893, 'url': 'ImageManagementService/qualityCheck', 'api_type': 'feed', 'body': {'imageUrls': ['https://feed-image.baidu.com/0/pic/-269653120_1670284112_-114565096.jpg']}}

View File

@ -0,0 +1 @@
URL: https://cli1.mobgi.com/Baidu/Common/baiduRequest

View File

@ -0,0 +1 @@
{"media_account_id": 12472176893, "url": "ImageManagementService/qualityCheck", "api_type": "feed", "body": {"imageUrls": ["https://feed-image.baidu.com/0/pic/-269653120_1670284112_-114565096.jpg"]}}

View File

@ -0,0 +1,34 @@
# -*- coding: utf-8 -*-
"""
@Time : 2025/8/4 17:53
@Auth : 九月的海
@File : test_run.py
@IDE : PyCharm
@Motto : Catch as catch can....
"""
import time
from api_auto_framework.utils.handle_path import case_path, report_path, result_path
import pytest
import os
import threading
def open_report():
os.system(f"allure open {report_path}")
threading.Thread(target=open_report, daemon=True).start()
def test_run():
target = f"{case_path}/test_Baidu_Common_baiduRequest.py"
cases = case_path
os.system(f"pytest {target} --alluredir={result_path} --clean-alluredir")
os.system(f"allure generate {result_path} -o {report_path} --clean")
if __name__ == "__main__":
test_run()
open_report()

View File

@ -0,0 +1,8 @@
# -*- coding: utf-8 -*-
"""
@Time : 2025/7/30 10:52
@Auth : 九月的海
@File : __init__.py.py
@IDE : PyCharm
@Motto : Catch as catch can....
"""

View File

@ -0,0 +1,9 @@
# pytest.ini
[pytest]
markers =
generated: 标识自动生成的测试用例
api_test: API接口测试
smoke: 冒烟测试
# 添加其他您使用的自定义标记
filterwarnings =
ignore::urllib3.exceptions.InsecureRequestWarning

View File

@ -0,0 +1,124 @@
# -*- coding: utf-8 -*-
import json
import pytest
import allure
from pathlib import Path
from api_auto_framework.core.request_wrapper import RequestWrapper
from api_auto_framework.utils.data_loader import load_test_data
from api_auto_framework.utils.assertion_tools import cookie
from api_auto_framework.utils.handle_path import testData_path
# 文件路径
TEST_DATA = Path(testData_path) / "test_Baidu_Advertiser_getList.yaml"
def prepare_request_kwargs(test_data):
"""准备请求参数"""
kwargs = {
"method": test_data['method'],
"url": test_data['url'],
"headers": test_data.get('headers'),
"cookies": cookie()
}
# 智能处理请求体
body = test_data.get('body')
if body is not None:
if isinstance(body, (dict, list)):
kwargs["json"] = body
else:
kwargs["data"] = body
return kwargs, body
def attach_request_details(url, body):
"""在Allure中附加请求详情"""
with allure.step("准备请求数据"):
allure.attach(f"URL: {url}", "请求URL")
if body:
allure.attach(json.dumps(body, indent=2, ensure_ascii=False),
"请求体", allure.attachment_type.JSON)
def attach_response_details(response, result):
"""在Allure中附加响应详情"""
with allure.step("响应结果"):
allure.attach(str(response.status_code), "响应状态", allure.attachment_type.TEXT)
# allure.attach(response.text, "原始响应内容", allure.attachment_type.TEXT)
if result:
allure.attach(str(result["body"]["code"]), "响应状态码", allure.attachment_type.TEXT)
allure.attach(json.dumps(result["headers"], indent=2),
"响应头", allure.attachment_type.JSON)
if result["body"]:
body_str = json.dumps(result["body"], indent=2, ensure_ascii=False) if isinstance(result["body"],
dict) else str(
result["body"])
allure.attach(body_str, "响应内容", allure.attachment_type.JSON)
def perform_assertions(result, expected):
"""执行断言并处理失败情况"""
try:
# 状态码断言
assert result["status_code"] == expected["status_code"], \
f"状态码不匹配: {result['status_code']}{expected['status_code']}"
# 业务码断言
assert result["body"]["code"] == 0, f"业务码非零: {result['body']['code']}"
# 消息断言
assert result["body"]["message"] == "成功", f"消息不符: {result['body']['message']}"
except AssertionError as e:
# 在Allure中记录断言失败详情
allure.attach(str(e), "断言失败详情", allure.attachment_type.TEXT)
raise
except KeyError as e:
# 处理响应体中缺少必要字段的情况
error_msg = f"响应体中缺少关键字段: {str(e)}"
allure.attach(error_msg, "响应体结构错误", allure.attachment_type.TEXT)
pytest.fail(error_msg)
@allure.feature("百度")
@allure.story("媒体账户")
@allure.title("获取媒体账户列表")
@pytest.mark.generated
def test_Baidu_Advertiser_getList():
"""主面板查询媒体账户列表"""
# 加载测试数据
test_data = load_test_data(TEST_DATA)[0]
# 准备请求参数
request_kwargs, body = prepare_request_kwargs(test_data)
# 附加请求详情到报告
attach_request_details(request_kwargs["url"], body)
# 发送请求
with allure.step("发起请求: test_Baidu_Advertiser_getList"):
response = RequestWrapper.send_request(**request_kwargs)
# 检查状态码
if response.status_code != 200:
error_msg = f"请求失败,状态码: {response.status_code}"
allure.attach(error_msg, "请求错误", allure.attachment_type.TEXT)
pytest.fail(error_msg)
# 处理响应
with allure.step("处理响应"):
try:
result = RequestWrapper.formatted_response(response)
except Exception as e:
error_msg = f"响应处理失败: {str(e)}"
allure.attach(error_msg, "响应处理错误", allure.attachment_type.TEXT)
allure.attach(response.text, "原始响应内容", allure.attachment_type.TEXT)
pytest.fail(error_msg)
# 附加响应详情到报告
attach_response_details(response, result)
# 执行断言
perform_assertions(result, test_data["expected"])

View File

@ -0,0 +1,63 @@
# -*- coding: utf-8 -*-
import json
import pytest
import os
import allure
from pathlib import Path
from api_auto_framework.core.request_wrapper import RequestWrapper
from api_auto_framework.utils.data_loader import load_test_data
from api_auto_framework.utils.assertion_tools import cookie
from api_auto_framework.utils.handle_path import testData_path
TEST_DATA = Path(testData_path) / "test_Baidu_Campaign_getList.yaml"
@allure.feature("百度")
@allure.story("媒体账户")
@allure.title("主面板查询计划列表")
@pytest.mark.generated
def test_Baidu_Campaign_getList():
"""Automatically generated test case: 主面板查询计划列表"""
# 加载测试数据
test_data = load_test_data(TEST_DATA)[0]
# 准备请求参数
request_kwargs = {
"method": test_data['method'],
"url": test_data['url'],
"headers": test_data.get('headers'),
"cookies": cookie() # 新增cookies参数
}
# 智能选择请求体处理方式
body = test_data.get('body')
if body is not None:
if isinstance(body, (dict, list)):
request_kwargs["json"] = body
else:
request_kwargs["data"] = body
with allure.step("准备请求数据"):
allure.attach(f"URL: https://cli1.mobgi.com/Baidu/Campaign/getList", "请求URL")
allure.attach(json.dumps(body), "请求体", allure.attachment_type.JSON)
# 发送请求并获取响应
with allure.step(f"发送请求:test_Baidu_Campaign_getList"):
response = RequestWrapper.send_request(**request_kwargs)
with allure.step("处理请求"):
result = RequestWrapper.formatted_response(response)
with allure.step("响应结果"):
allure.attach(str(result["status_code"]), "响应状态", allure.attachment_type.TEXT)
allure.attach(json.dumps(result["headers"]), "响应头", allure.attachment_type.JSON)
allure.attach(json.dumps(result["body"], indent=2, ensure_ascii=False), "响应内容", allure.attachment_type.JSON)
# 基础状态码断言
try:
assert result["status_code"] == test_data["expected"][
"status_code"], f"状态码不匹配: {result['status_code']}{test_data['expected']['status_code']}"
assert result["body"]["code"] == 0, f"业务码非零: {result['body']['code']}"
assert result["body"]["message"] == "成功", f"消息不符: {result['body']['message']}"
except AssertionError as e:
allure.attach(str(e), "断言失败详情", allure.attachment_type.TEXT)
raise
# assert result["status_code"] == test_data["expected"]["status_code"], \
# f"期望状态码 {test_data['expected']['status_code']},实际得到 {result['status_code']}"
# assert result["body"]["code"] == 0
# assert result["body"]["message"] == "成功"

View File

@ -0,0 +1,83 @@
# -*- coding: utf-8 -*-
import pytest
import json
import allure
from pathlib import Path
from api_auto_framework.core.request_wrapper import RequestWrapper
from api_auto_framework.utils.data_loader import load_test_data
from api_auto_framework.utils.assertion_tools import cookie
from api_auto_framework.utils.handle_path import testData_path
class TestBaiduCommonBaiduRequest:
# 文件路径
TEST_DATA = Path(testData_path) / "test_Baidu_Common_baiduRequest.yaml"
@allure.feature("百度")
@allure.story("广告搭建")
@allure.title("百度检测低质封面")
@pytest.mark.generated
def test_baidu_common_baidu_request(self):
"""Automatically generated test case: 百度检测低质封面"""
# 加载测试数据
test_data = load_test_data(self.TEST_DATA)[0]
# 准备请求参数
request_kwargs = {
"method": test_data['method'],
"url": test_data['url'],
"headers": test_data.get('headers'),
"cookies": cookie() # 新增cookies参数
}
# 智能选择请求体处理方式
body = test_data.get('body')
if body is not None:
if isinstance(body, (dict, list)):
request_kwargs["json"] = body
else:
request_kwargs["data"] = body
with allure.step("准备请求数据"):
print(f"请求参数: {request_kwargs['json']}")
allure.attach(f"URL: https://cli1.mobgi.com/Baidu/Common/baiduRequest", "请求URL")
allure.attach(json.dumps(body), "请求体", allure.attachment_type.JSON)
# 发送请求并获取响应
with allure.step(f"发送请求:test_baidu_common_baidu_request"):
response = RequestWrapper.send_request(**request_kwargs)
if response.status_code != 200:
error_msg = f"请求失败,状态码: {response.status_code}"
allure.attach(error_msg, "请求错误", allure.attachment_type.TEXT)
pytest.fail(error_msg)
with allure.step("处理请求"):
try:
result = RequestWrapper.formatted_response(response)
except Exception as e:
error_msg = f"响应处理失败: {str(e)}"
allure.attach(error_msg, "响应处理错误", allure.attachment_type.TEXT)
allure.attach(response.text, "原始响应内容", allure.attachment_type.TEXT)
pytest.fail(error_msg)
with allure.step("响应结果"):
allure.attach(str(response.status_code), "响应状态", allure.attachment_type.TEXT)
if result and "body" in result and "code" in result["body"]:
allure.attach(str(result["body"]["code"]), "响应状态码", allure.attachment_type.TEXT)
if result and "headers" in result:
allure.attach(json.dumps(result["headers"], indent=2),
"响应头", allure.attachment_type.JSON)
if result and "body" in result:
body_content = result["body"]
if isinstance(body_content, dict):
body_str = json.dumps(body_content, indent=2, ensure_ascii=False)
else:
body_str = str(body_content)
allure.attach(body_str, "响应内容", allure.attachment_type.JSON)
# 基础状态码断言
try:
assert result["status_code"] == test_data["expected"]["status_code"], f"状态码不匹配: {result['status_code']}{test_data['expected']['status_code']}"
assert result["body"]["code"] == 0, f"业务码非零: {result['body']['code']}"
assert result["body"]["message"] == "成功", f"消息不符: {result['body']['message']}"
except AssertionError as e:
allure.attach(str(e), "断言失败详情", allure.attachment_type.TEXT)
raise
except KeyError as e:
error_msg = f"响应体中缺少关键字段: {str(e)}"
allure.attach(error_msg, "响应体结构错误", allure.attachment_type.TEXT)
pytest.fail(error_msg)

View File

@ -0,0 +1,86 @@
# -*- coding: utf-8 -*-
"""
@Time : 2025/8/4 15:27
@Auth : 九月的海
@File : test_login.py
@IDE : PyCharm
@Motto : Catch as catch can....
"""
import os
import allure
import pytest
import requests
from api_auto_framework.utils.handle_path import report_path
@allure.feature('创量账户登录')
@allure.story('管理员登录功能')
class TestAdminLogin:
@allure.title('管理员邮箱正确登录')
@allure.severity(allure.severity_level.CRITICAL)
@allure.description('验证管理员使用正确邮箱和密码登录成功')
def test_admin_login_success(self):
# 准备测试数据
url = "https://180.213.188.227/User/AdminUser/login"
payload = {
"email": "974509022@qq.com",
"password": "0e3bc01f8f0409f4541015737925ed8e",
"product_version": 0
}
headers = {'Content-Type': 'application/json'}
# 添加请求详细信息到报告
with allure.step("准备请求数据"):
allure.attach(f"URL: {url}", "请求URL")
allure.attach(str(payload), "请求体", allure.attachment_type.JSON)
# 发送请求
with allure.step("发送登录请求"):
response = requests.post(url, json=payload, headers=headers, verify=False)
# 添加响应信息到报告
with allure.step("验证响应结果"):
allure.attach(f"状态码: {response.status_code}", "响应状态")
allure.attach(response.text, "响应内容", allure.attachment_type.JSON)
# 断言验证
assert response.status_code == 200
assert response.json().get("code") == 0 # 假设0表示成功
# assert "session" in response.json().get("data", {})
@allure.title('测试管理员错误密码登录')
@allure.severity(allure.severity_level.NORMAL)
def test_admin_login_wrong_password(self):
# 测试数据准备
url = "https://180.213.188.227/User/AdminUser/login"
payload = {
"email": "974509022@qq.com",
"password": "wrong_password",
"product_version": 0
}
headers = {'Content-Type': 'application/json'}
# 执行请求
response = requests.post(url, json=payload, headers=headers, verify=False)
# 添加步骤信息
allure.attach(str(response.status_code), name="状态码")
allure.attach(response.text, name="响应内容")
# 验证预期结果
assert response.status_code == 200
assert response.json().get("code") != 0 # 非0表示失败
if __name__ == "__main__":
# 运行测试并生成Allure报告
report_dir = report_path
pytest.main([
'-v', # 显示详细日志
'-s', # 打印输出信息
'--alluredir', report_dir
])
# 自动打开报告
os.system(f'allure open {report_dir}')

View File

@ -0,0 +1,8 @@
# -*- coding: utf-8 -*-
"""
@Time : 2025/7/30 10:54
@Auth : 九月的海
@File : __init__.py.py
@IDE : PyCharm
@Motto : Catch as catch can....
"""

View File

@ -0,0 +1,40 @@
- test_name: 主面板查询计划列表
method: GET
url: https://cli1.mobgi.com/Baidu/Advertiser/getList
headers:
Accept: application/json, text/plain, */*
Accept-Language: zh-CN,zh;q=0.9
Cache-Control: no-cache
Connection: keep-alive
Content-Type: application/json;charset=UTF-8
Origin: https://cl.mobgi.com
Pragma: no-cache
Referer: https://cl.mobgi.com/
Sec-Fetch-Dest: empty
Sec-Fetch-Mode: cors
Sec-Fetch-Site: same-site
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML,
like Gecko) Chrome/139.0.0.0 Safari/537.36
client-user: '100362'
ff-request-id: 20250903200435b94c2c50a35e483abfe36b342313c4e4
main-user-id: '100362'
sec-ch-ua: '"Not;A=Brand";v="99", "Google Chrome";v="139", "Chromium";v="139"'
sec-ch-ua-mobile: ?0
sec-ch-ua-platform: '"Windows"'
cookies:
chuangliang_session: jfv4vD7fNIPGUp54xWYoImXYvtrYOPCGLbtdwltC
userId: '100362'
idea_token: fb70aca4-8863-11f0-b8ad-00163e7724c5
body:
conditions: '{"companys":[],"cl_project_id":[],"optimize_user_id":[],"keyword":""}'
start_date: '2025-09-03'
end_date: '2025-09-03'
page: 1
page_size: 20
total_count: 3
total_page: 1
sort_field: ''
sort_direction: ''
data_type: list
expected:
status_code: 200

View File

@ -0,0 +1,41 @@
- test_name: 主面板查询计划列表
method: GET
url: https://180.213.188.227/Baidu/Campaign/getList
headers:
Accept: application/json, text/plain, */*
Accept-Language: zh-CN,zh;q=0.9
Cache-Control: no-cache
Connection: keep-alive
Content-Type: application/json;charset=UTF-8
Origin: https://cl.mobgi.com
Pragma: no-cache
Referer: https://cl.mobgi.com/
Sec-Fetch-Dest: empty
Sec-Fetch-Mode: cors
Sec-Fetch-Site: same-site
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML,
like Gecko) Chrome/138.0.0.0 Safari/537.36
client-user: '100362'
ff-request-id: 20250807205955f14bd8f6db62498bb37ee0bb3ccc6b44
main-user-id: '100362'
sec-ch-ua: '"Not)A;Brand";v="8", "Chromium";v="138", "Google Chrome";v="138"'
sec-ch-ua-mobile: ?0
sec-ch-ua-platform: '"Windows"'
cookies:
chuangliang_session: DWDhvsO2jnkDOlGGMcoaMOKFiJ0nHdvBdt65ptKF
userId: '100362'
idea_token: 26817840-7335-11f0-a0ae-00163e2677fe
body:
conditions: '{"cl_project_id":[],"optimize_user_id":[],"companys":[],"media_account_id":[],"extension_subject":[],"status":"","keyword":"","cdt_start_date":"2025-08-01
00:00:00","cdt_end_date":"2025-08-07 23:59:59"}'
start_date: '2025-08-07'
end_date: '2025-08-07'
page: 1
page_size: 20
total_count: 0
total_page: 0
sort_field: addtime
sort_direction: desc
data_type: list
expected:
status_code: 200

View File

@ -0,0 +1,36 @@
- test_name: 百度检测低质封面
method: GET
url: https://cli1.mobgi.com/Baidu/Common/baiduRequest
headers:
Accept: application/json, text/plain, */*
Accept-Language: zh-CN,zh;q=0.9
Cache-Control: no-cache
Connection: keep-alive
Content-Type: application/json;charset=UTF-8
Origin: https://cl.mobgi.com
Pragma: no-cache
Referer: https://cl.mobgi.com/
Sec-Fetch-Dest: empty
Sec-Fetch-Mode: cors
Sec-Fetch-Site: same-site
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML,
like Gecko) Chrome/139.0.0.0 Safari/537.36
client-user: '100362'
ff-request-id: 202509042007234bbedf401c104521a788a0a4b70e58ec
main-user-id: '100362'
sec-ch-ua: '"Not;A=Brand";v="99", "Google Chrome";v="139", "Chromium";v="139"'
sec-ch-ua-mobile: ?0
sec-ch-ua-platform: '"Windows"'
cookies:
chuangliang_session: UBPWTe0ZQisVQsJdmcZVblYLTtrm58Ccm3Yb4ofL
userId: '100362'
idea_token: cc39ab3c-892d-11f0-94a2-00163e5c152e
body:
media_account_id: 12472176893
url: ImageManagementService/qualityCheck
api_type: feed
body:
imageUrls:
- https://feed-image.baidu.com/0/pic/-269653120_1670284112_-114565096.jpg
expected:
status_code: 200

View File

@ -0,0 +1,8 @@
# -*- coding: utf-8 -*-
"""
@Time : 2025/7/30 10:47
@Auth : 九月的海
@File : __init__.py.py
@IDE : PyCharm
@Motto : Catch as catch can....
"""

View File

@ -0,0 +1,178 @@
import requests
from datetime import datetime, timedelta
import json
import logging
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
class MobgiAPIClient:
def __init__(self):
self.session = requests.Session()
self.login_time = None
self.expire_time = None
self.api_base = "https://180.213.188.227"
self.api_host = "cli1.mobgi.com"
self.is_logged_in = False
# 添加用于存储 cookie 的变量
self.session_cookie = None
def login(self, email, password, expire_hours=24):
"""登录并保存会话状态"""
# 清除可能的旧会话
self.session.cookies.clear()
self.is_logged_in = False
url = f"{self.api_base}/User/AdminUser/login"
payload = {"email": email, "password": password, "product_version": 0}
try:
# 登录请求使用独立的 headers
headers = {"Content-Type": "application/json", "Host": self.api_host}
response = self.session.post(
url,
json=payload,
headers=headers,
verify=False,
timeout=10
)
response.raise_for_status()
# 解析响应
resp_data = response.json()
if resp_data.get('code') != 0:
logging.error(f"登录失败: {resp_data.get('message', '未知错误')}")
return False
# 提取并保存会话 cookie
self.session_cookie = self._extract_session_cookie(response)
if not self.session_cookie:
logging.error("未能获取会话cookie")
return False
# 更新登录状态
self.login_time = datetime.now()
self.expire_time = self.login_time + timedelta(hours=expire_hours)
self.is_logged_in = True
logging.info(f"登录成功! 会话有效期至 {self.expire_time.strftime('%Y-%m-%d %H:%M:%S')}")
logging.info(f"cookie: {self.session_cookie}")
# return self.session_cookie
return True
except requests.exceptions.RequestException as e:
logging.error(f"登录请求异常: {str(e)}")
return False
except json.JSONDecodeError:
logging.error("登录响应解析失败: 无效的JSON格式")
return False
def _extract_session_cookie(self, response):
"""从响应中提取会话cookie"""
# 直接查找特定的 cookie 名称
cookie_value = response.cookies.get("chuangliang_session")
if cookie_value:
return f"chuangliang_session={cookie_value};"
# 如果无法直接获取,尝试从 Set-Cookie 头解析
set_cookie = response.headers.get("Set-Cookie")
if set_cookie:
# 简单地从 Set-Cookie 中提取会话信息
# 实际处理可能需要更复杂的逻辑
if "chuangliang_session=" in set_cookie:
parts = set_cookie.split(";")
for part in parts:
if "chuangliang_session=" in part:
return part.strip() + ";"
return None
def _get_request_headers(self):
"""构造请求头包含必要的cookie"""
headers = {
"Content-Type": "application/json",
# 添加其他可能需要的通用头
}
if self.session_cookie:
# 将会话 cookie 添加到 headers
headers["cookie"] = self.session_cookie
return headers
def check_session(self):
"""检查会话有效性"""
if not self.is_logged_in or not self.expire_time:
logging.warning("会话未初始化,请先登录")
return False
if datetime.now() > self.expire_time:
logging.warning(f"会话已过期,有效期至 {self.expire_time.strftime('%Y-%m-%d %H:%M:%S')}")
return False
return True
def __del__(self):
"""关闭会话"""
self.session.close()
logging.info("会话已关闭")
def cookie():
client = MobgiAPIClient()
email = "974509022@qq.com"
# password = "0e3bc01f8f0409f4541015737925ed8e"
# email = "chuangliang@lyzh.com"
password = "0e3bc01f8f0409f4541015737925ed8e"
client.login(email=email, password=password)
cookies_str = client.session_cookie
# 将字符串转换为字典
cookies_dict = {}
parts = cookies_str.strip(';').split(';')
for part in parts:
part = part.strip()
if '=' in part:
key, value = part.split('=', 1)
cookies_dict[key.strip()] = value.strip()
return cookies_dict
# 使用示例
# if __name__ == "__main__":
# cookie = cookie()
# # 初始化客户端
# client = MobgiAPIClient()
# email = "974509022@qq.com"
# password = "0e3bc01f8f0409f4541015737925ed8e"
# client.login(email=email, password=password)
# cookie = client.session_cookie
# 登录(使用您提供的凭证)
# if client.login(
# email="974509022@qq.com",
# password="0e3bc01f8f0409f4541015737925ed8e"
# ):
# logging.info(f"获取的会话Cookie: {client.session_cookie}")
#
# # 获取广告主列表
# advertiser_data = client.get_advertiser_list(
# start_date="2025-07-01",
# end_date="2025-07-31",
# conditions=None,
# page=1,
# page_size=50
# )
#
# if advertiser_data:
# print("成功获取广告主列表:")
# print(f"总记录数: {advertiser_data.get('total_count', 0)}")
# print(f"总页数: {advertiser_data.get('total_page', 1)}")
# print(f"当前页数据: {json.dumps(advertiser_data.get('list', []), indent=2, ensure_ascii=False)}")
# else:
# print("获取广告主列表失败")
# else:
# print("登录失败")

View File

@ -0,0 +1,340 @@
# # -*- coding: utf-8 -*-
# """
# @Time : 2025/8/1 10:53
# @Auth : 九月的海
# @File : cookie.py
# @IDE : PyCharm
# @Motto : Catch as catch can....
# """
# import requests
# from datetime import datetime, timedelta
# import json
# import logging
# import http.client
# import urllib3
#
# # 启用详细日志记录
# urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# http.client.HTTPConnection.debuglevel = 1
#
# # 配置更详细的日志
# logging.basicConfig(level=logging.DEBUG)
# logger = logging.getLogger('requests.packages.urllib3')
# logger.setLevel(logging.DEBUG)
# logger.propagate = True
#
#
# # 创建自定义的 HTTP 适配器用于记录请求/响应细节
# class DebugHTTPAdapter(requests.adapters.HTTPAdapter):
# def send(self, request, **kwargs):
# """发送请求并记录所有细节"""
# # 记录请求详情
# print("\n" + "=" * 40)
# print(f"请求方法: {request.method}")
# print(f"请求URL: {request.url}")
# print("请求头:")
# for key, value in request.headers.items():
# print(f" {key}: {value}")
#
# if request.body:
# print("请求体:")
# if isinstance(request.body, bytes):
# try:
# body = request.body.decode('utf-8')
# print(body)
# except UnicodeDecodeError:
# print("<无法解码的二进制数据>")
# else:
# print(request.body)
# else:
# print("无请求体")
#
# # 发送请求
# response = super().send(request, **kwargs)
#
# # 记录响应详情
# print("\n" + "=" * 40)
# print(f"响应状态码: {response.status_code}")
# print("响应头:")
# for key, value in response.headers.items():
# print(f" {key}: {value}")
#
# # 尝试解析JSON响应体
# try:
# if response.text:
# print("响应内容:")
# # 尝试格式化为可读JSON
# try:
# json_data = json.loads(response.text)
# print(json.dumps(json_data, indent=2, ensure_ascii=False))
# except json.JSONDecodeError:
# print(response.text)
# else:
# print("无响应内容")
# except Exception as e:
# print(f"解析响应失败: {e}")
#
# print("=" * 40 + "\n")
#
# return response
#
#
# class MobgiAPIClient:
# def __init__(self):
# self.session = requests.Session()
#
# # 添加调试适配器
# adapter = DebugHTTPAdapter()
# self.session.mount('http://', adapter)
# self.session.mount('https://', adapter)
#
# self.login_time = None
# self.expire_time = None
# self.api_base = "https://cli1.mobgi.com"
# self.is_logged_in = False
# self.session_cookie = None
# self.user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36"
#
# def login(self, email, password, expire_hours=24):
# """登录并保存会话状态"""
# # 清除可能的旧会话
# self.session.cookies.clear()
# self.is_logged_in = False
# url = f"{self.api_base}/User/AdminUser/login"
# payload = {"email": email, "password": password, "product_version": 0}
# try:
# # 登录请求使用独立的 headers
# headers = {
# "Content-Type": "application/json",
# "User-Agent": self.user_agent
# }
# # 打印登录请求细节
# print("\n" + "*" * 50)
# print("发送登录请求...")
# print("*" * 50 + "\n")
# response = self.session.post(
# url,
# json=payload,
# headers=headers,
# timeout=10
# )
# response.raise_for_status()
# # 解析响应
# resp_data = response.json()
# if resp_data.get('code') != 0:
# print(f"登录失败: {resp_data.get('message', '未知错误')}")
# print(f"完整响应: {json.dumps(resp_data, indent=2)}")
# return False
# # 提取并保存会话 cookie
# self.session_cookie = self._extract_session_cookie(response)
#
# if not self.session_cookie:
# print("ERROR: 未能获取会话cookie")
# if 'Set-Cookie' in response.headers:
# print(f"Set-Cookie 头: {response.headers['Set-Cookie']}")
# else:
# print("响应中没有 Set-Cookie 头")
# return False
#
# # 更新登录状态
# self.login_time = datetime.now()
# self.expire_time = self.login_time + timedelta(hours=expire_hours)
# self.is_logged_in = True
#
# print(f"登录成功! 会话有效期至 {self.expire_time.strftime('%Y-%m-%d %H:%M:%S')}")
# print(f"获取的会话Cookie: {self.session_cookie}")
# return True
#
# except requests.exceptions.RequestException as e:
# print(f"登录请求异常: {str(e)}")
# if e.response is not None:
# print(f"响应状态码: {e.response.status_code}")
# try:
# print(f"响应内容: {e.response.text}")
# except:
# pass
# return False
# except json.JSONDecodeError as e:
# print(f"登录响应解析失败: 无效的JSON格式, 原始内容: {response.text}")
# return False
# except Exception as e:
# print(f"登录过程中发生意外错误: {str(e)}")
# return False
#
# def _extract_session_cookie(self, response):
# """从响应中提取会话cookie"""
# # 方法1: 从cookies字典获取
# if 'chuangliang_session' in response.cookies:
# cookie_value = response.cookies['chuangliang_session']
# return f"chuangliang_session={cookie_value};"
#
# # 方法2: 从Set-Cookie头获取
# if 'Set-Cookie' in response.headers:
# set_cookie = response.headers['Set-Cookie']
# # 寻找chuangliang_session条目
# for cookie_str in set_cookie.split(','):
# if 'chuangliang_session' in cookie_str:
# # 提取cookie值
# parts = cookie_str.split(';')
# for part in parts:
# if 'chuangliang_session' in part:
# return part.strip() + ';'
#
# # 方法3: 如果cookie是多个值可能在cookie jar中
# for cookie in response.cookies:
# if cookie.name == 'chuangliang_session':
# return f"{cookie.name}={cookie.value};"
#
# return None
#
# def _get_request_headers(self):
# """构造请求头包含必要的cookie"""
# headers = {
# "Content-Type": "application/json",
# "User-Agent": self.user_agent,
# # 添加其他可能需要的通用头
# }
#
# if self.session_cookie:
# # 将会话 cookie 添加到 headers
# headers["Cookie"] = self.session_cookie
#
# return headers
#
# def check_session(self):
# """检查会话有效性"""
# if not self.is_logged_in or not self.expire_time:
# print("会话未初始化,请先登录")
# return False
#
# if datetime.now() > self.expire_time:
# print(f"会话已过期,有效期至 {self.expire_time.strftime('%Y-%m-%d %H:%M:%S')}")
# return False
#
# return True
#
# def get_advertiser_list(
# self,
# start_date="2025-07-31",
# end_date="2025-07-31",
# conditions=None,
# page=1,
# page_size=20
# ):
# """获取广告主列表数据"""
# if not self.check_session():
# return None
#
# # 准备请求参数
# params = {
# "conditions": json.dumps(conditions) if conditions else json.dumps({
# "companys": [],
# "cl_project_id": [],
# "optimize_user_id": [],
# "keyword": ""
# }),
# "start_date": start_date,
# "end_date": end_date,
# "page": page,
# "page_size": page_size,
# "total_count": 0,
# "total_page": 1,
# "sort_field": "",
# "sort_direction": "",
# "data_type": "list"
# }
#
# url = f"{self.api_base}/Baidu/Advertiser/getList"
#
# try:
# # 获取带 cookie 的 headers
# headers = self._get_request_headers()
#
# # 打印API请求细节
# print("\n" + "*" * 50)
# print(f"发送广告主列表请求到: {url}")
# print("*" * 50 + "\n")
#
# response = self.session.post(
# url,
# json=params,
# headers=headers,
# timeout=15
# )
# response.raise_for_status()
#
# # 检查是否有新的cookie
# if 'Set-Cookie' in response.headers:
# new_cookie = self._extract_session_cookie(response)
# if new_cookie:
# print(f"接收到新cookie: {new_cookie}")
# self.session_cookie = new_cookie
# else:
# print("响应包含Set-Cookie但未解析出有效值")
#
# resp_data = response.json()
# if resp_data.get('code') != 0:
# print(f"广告主列表请求失败: {resp_data.get('message', '未知错误')}")
# print(f"完整响应: {json.dumps(resp_data, indent=2)}")
# return None
#
# return resp_data.get('data', {})
#
# except requests.exceptions.RequestException as e:
# print(f"广告主列表请求异常: {str(e)}")
# if e.response is not None:
# print(f"响应状态码: {e.response.status_code}")
# try:
# print(f"响应内容: {e.response.text}")
# except:
# pass
# return None
# except json.JSONDecodeError:
# print(f"广告主列表响应解析失败: 无效的JSON格式, 原始内容: {response.text}")
# return None
# except Exception as e:
# print(f"广告主列表请求过程中发生意外错误: {str(e)}")
# return None
#
# def __del__(self):
# """关闭会话"""
# self.session.close()
# print("会话已关闭")
#
#
# # 使用示例
# if __name__ == "__main__":
# print("=" * 60)
# print("启动 API 客户端调试")
# print("=" * 60)
#
# # 初始化客户端
# client = MobgiAPIClient()
#
# # 登录(使用您提供的凭证)
# if client.login(
# email="974509022@qq.com",
# password="0e3bc01f8f0409f4541015737925ed8e"
# ):
# print("\n登录成功! 尝试获取广告主列表...")
#
# # 获取广告主列表
# advertiser_data = client.get_advertiser_list(
# start_date="2025-07-01",
# end_date="2025-07-31",
# conditions=None,
# page=1,
# page_size=50
# )
#
# if advertiser_data:
# print("\n成功获取广告主列表数据:")
# print(f"总记录数: {advertiser_data.get('total_count', 0)}")
# print(f"总页数: {advertiser_data.get('total_page', 1)}")
# print(f"返回记录数: {len(advertiser_data.get('list', []))}")
# else:
# print("\n获取广告主列表失败")
# else:
# print("\n登录失败无法继续")
#
# print("\n调试结束")

View File

@ -0,0 +1,20 @@
# -*- coding: utf-8 -*-
"""
@Time : 2025/7/30 10:51
@Auth : 九月的海
@File : data_loader.py.py
@IDE : PyCharm
@Motto : Catch as catch can....
"""
import yaml
import os
def load_test_data(file_path):
"""从YAML文件加载测试数据"""
if not os.path.exists(file_path):
raise FileNotFoundError(f"测试数据文件不存在: {file_path}")
with open(file_path, 'r', encoding='utf-8') as file:
return yaml.safe_load(file)

View File

@ -0,0 +1,27 @@
# -*- coding: utf-8 -*-
"""
@Time : 2025/8/4 14:08
@Auth : 九月的海
@File : handle_path.py
@IDE : PyCharm
@Motto : Catch as catch can....
"""
import os
project_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# 2- 配置路径
config_path = os.path.join(project_path, 'configs')
# 3- 测试数据路径
testData_path = os.path.join(project_path, 'testdata')
# 4- 测试报告路径
report_path = os.path.join(project_path, r'outFiles\report')
result_path = os.path.join(project_path, r'outFiles\result')
# 5- log路径
log_path = os.path.join(project_path, r'outFiles\log')
case_path = os.path.join(project_path, 'testcases')

View File

@ -1,3 +1,4 @@
import pandas as pd
from sshtunnel import SSHTunnelForwarder
import pymysql
@ -34,6 +35,21 @@ class JumpHostDatabaseConnector:
results = cursor.fetchall()
return results
def query_database_select_field_names(self, query):
with self.connect_to_jump_host() as server:
with self.connect_to_database(server) as conn:
with conn.cursor() as cursor:
cursor.execute(query)
results = cursor.fetchall()
field_names = [desc[0] for desc in cursor.description]
return {'field_names': field_names, 'results': results}
def query_database_select_to_dataframe(self, query):
with self.connect_to_jump_host() as server:
with self.connect_to_database(server) as conn:
df = pd.read_sql_query(query, conn)
return df.to_dict(orient='records')
# 配置字典
config_common = {
@ -107,6 +123,30 @@ config_async_create_ad_batch = {
'database_password': "Chuangliang@2023",
'database_name': "chuangliang_ad_task"
}
# 生产魔剪业务库配置
config_chuangliang_ad_magic_cut = {
'jump_host_ip': "180.184.103.38",
'jump_host_port': 2222,
'jump_host_user': "cl_ad",
'jump_host_password': "4CGbdPW2zkbewcp^",
'database_ip': "mysql2d9941b86eb8.rds.ivolces.com",
'database_port': 3306,
'database_user': "cl_readOnly",
'database_password': "Chuangliang@2023",
'database_name': "chuangliang_ad_magic_cut"
}
# 生产魔剪任务库配置
config_chuangliang_ad_task_skit = {
'jump_host_ip': "180.184.103.38",
'jump_host_port': 2222,
'jump_host_user': "cl_ad",
'jump_host_password': "4CGbdPW2zkbewcp^",
'database_ip': "mysqlc00f5833fdad.rds.ivolces.com",
'database_port': 3306,
'database_user': "cl_readOnly",
'database_password': "Chuangliang@2023",
'database_name': "chuangliang_ad_task_skit"
}
# 实例化数据库连接器
UC_common_db = JumpHostDatabaseConnector(config_common)
@ -115,12 +155,12 @@ Bd_task_db = JumpHostDatabaseConnector(config_baidu_task)
Material1_db = JumpHostDatabaseConnector(config_material1)
Material2_db = JumpHostDatabaseConnector(config_material2)
Async_create_ad_batch_db = JumpHostDatabaseConnector(config_async_create_ad_batch)
Chuangliang_ad_magic_cut = JumpHostDatabaseConnector(config_chuangliang_ad_magic_cut)
Chuangliang_ad_task_skit = JumpHostDatabaseConnector(config_chuangliang_ad_task_skit)
if __name__ == '__main__':
sql = "SELECT * FROM `task_log_ucx_batch_add_ad` where batch_id = '20241110104432b71b64129f0d11efb0d200163e0d187b'"
baidu_task_batchid_sql = "SELECT batch_id, async_task_id,create_status FROM `async_create_ad_batch` where " \
"async_task_id = 14027844245 "
sql = "SELECT * FROM chuangliang_ad_magic_cut.config_apollo WHERE `key` = 'client_task_num_day_limit'"
magic_sql = "SELECT task_id,batch_id,task_param,result_data,main_user_id,create_user_id FROM chuangliang_ad_task_skit.task_log_client_mix_shear WHERE task_id = 19110555"
# 执行查询
results = Async_create_ad_batch_db.query_database(baidu_task_batchid_sql)
results = Chuangliang_ad_task_skit.query_database_select_to_dataframe(magic_sql)
print(results)