import json
import logging

from django.http import JsonResponse
from django.shortcuts import render
from django.utils import timezone  # Django's timezone utilities
from django.views.decorators.http import require_GET

# Project-local query class that loads the task data served by this view.
from chuanglinagBaidu.migrations.BaidutaskQuery import AsyncTaskDataQuery

logger = logging.getLogger(__name__)

# Keys of a task-log entry that may hold a JSON document encoded as a string.
_EMBEDDED_JSON_FIELDS = ("task_param", "result_data")


def _error_response(request, error_data, status, is_browser_request):
    """Build the error response for ``error_data`` using HTTP status ``status``.

    Browser requests get the ``json_formatter.html`` template, other clients
    get JSON. Both variants carry ``status`` as the real HTTP status code.
    """
    if is_browser_request:
        return render(
            request,
            'json_formatter.html',
            {
                "json_data": json.dumps(error_data, ensure_ascii=False),
                "status": status,
            },
            # Without this the error page would be delivered as HTTP 200.
            status=status,
        )
    return JsonResponse(
        error_data,
        status=status,
        json_dumps_params={'ensure_ascii': False},
    )


def _decode_embedded_json(log):
    """Decode, in place, the string fields of ``log`` that contain valid JSON."""
    for field in _EMBEDDED_JSON_FIELDS:
        raw_value = log.get(field)
        if not isinstance(raw_value, str):
            continue
        try:
            log[field] = json.loads(raw_value)
        except json.JSONDecodeError:
            # Best effort: a value that is not JSON is returned unchanged.
            pass


# NOTE(review): ``require_GET`` is imported but not applied, so every HTTP
# method reaches this view; confirm whether non-GET requests should be rejected.
def get_task_details(request):
    """Return the details and logs of one task, as JSON or as an HTML page.

    The task is selected by the ``task_id`` query parameter, which must be an
    integer. When the request's ``Accept`` header contains ``text/html`` the
    payload is rendered through a template; otherwise it is sent as JSON with
    non-ASCII characters left unescaped.

    Args:
        request: The incoming Django request.

    Returns:
        An HTTP response with status
        * 400 when ``task_id`` is missing, empty or not an integer,
        * 500 when querying or serialising the task data raises,
        * 200 otherwise, with the keys ``code``, ``message``, ``time``,
          ``task_id``, ``task_logs``, ``batch_id``, ``create_status`` and
          ``total_logs``.
    """
    task_id = request.GET.get('task_id')
    current_time = timezone.now().strftime("%Y-%m-%d %H:%M:%S")
    is_browser_request = 'text/html' in request.META.get('HTTP_ACCEPT', '')

    # Parameter validation: the parameter must be present and non-empty.
    if not task_id:
        return _error_response(
            request,
            {
                "code": 400,
                "message": "缺少参数: task_id",
                "time": current_time,
                "endpoint": request.path,
                "method": request.method,
            },
            400,
            is_browser_request,
        )

    try:
        task_id = int(task_id)
    except ValueError:
        # The assignment above failed, so ``task_id`` is still the raw string.
        return _error_response(
            request,
            {
                "code": 400,
                "message": "task_id 必须是整数",
                "time": current_time,
                "provided_value": task_id,
            },
            400,
            is_browser_request,
        )

    # The response construction stays inside the ``try`` so that rendering or
    # serialisation failures are also reported through the 500 payload.
    try:
        result = AsyncTaskDataQuery(task_id).get_task_details()

        # ``or []`` also covers a "task_logs" key that is present but None.
        task_logs = result.get("task_logs") or []
        for log in task_logs:
            _decode_embedded_json(log)

        response_data = {
            "code": 200,
            "message": "成功",
            "time": current_time,
            "task_id": task_id,
            "task_logs": task_logs,
            "batch_id": result.get("batch_id", ""),
            "create_status": result.get("create_status", ""),
            "total_logs": len(task_logs),
        }

        if is_browser_request:
            # The success template receives the payload as a dict, not as a
            # JSON string like the error template does.
            return render(request, 'json_formatter3.html', {
                "json_data": response_data,
                "status": 200,
                "task_id": task_id,
            })

        return JsonResponse(
            response_data,
            json_dumps_params={'ensure_ascii': False},
        )
    except Exception as exc:
        # Top-level boundary of the view: record the traceback, then answer
        # with a structured 500 instead of letting the exception propagate.
        logger.exception("Failed to build task details for task_id=%s", task_id)
        # NOTE(review): "error" exposes the raw exception text to the client;
        # confirm that this is acceptable for this endpoint.
        return _error_response(
            request,
            {
                "code": 500,
                "message": "服务器内部错误",
                "error": str(exc),
                "time": current_time,
                "task_id": task_id,
            },
            500,
            is_browser_request,
        )