免费AI模型API接口调用:DeepSeek、OpenAI和通义千问的密钥获取与使用方法
- Linkreate AI插件 文章
- 2025-09-02 07:40:37
- 18阅读
DeepSeek API密钥获取与集成开发
要开始使用DeepSeek API,首先需要注册并获取API密钥。访问DeepSeek官方网站,完成账号注册后进入开发者控制台。在控制台中,你可以创建新的API密钥,建议为不同项目设置不同的密钥以便管理。
获取API密钥后,你可以使用以下Python代码示例进行API调用:
import requests
api_key = "你的DeepSeek API密钥"
url = "https://api.deepseek.com/v1/chat/completions"
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}"
}
data = {
"model": "deepseek-chat",
"messages": [
{"role": "user", "content": "你好,请介绍一下你自己"}
],
"temperature": 0.7
}
response = requests.post(url, headers=headers, json=data)
print(response.json())
注意:DeepSeek API有调用频率限制,免费用户每分钟最多请求60次。如果你的应用需要更高的调用频率,考虑升级到付费计划。
DeepSeek API参数配置
DeepSeek API提供了多个可配置参数,以下是一些常用参数及其说明:
参数名 | 类型 | 默认值 | 说明 |
---|---|---|---|
model | string | deepseek-chat | 指定使用的模型名称 |
temperature | number | 0.7 | 控制生成文本的随机性,范围0-2 |
max_tokens | integer | 2048 | 生成文本的最大长度 |
top_p | number | 1.0 | 核采样参数,控制词汇选择的概率范围 |
OpenAI API密钥获取与集成开发
OpenAI提供了强大的API接口,支持多种AI模型的调用。要使用OpenAI API,你需要先创建OpenAI账号并获取API密钥。
登录OpenAI官网后,进入API密钥管理页面,点击"Create new secret key"生成新的API密钥。请妥善保存你的密钥,因为它只会在创建时显示一次。
以下是使用OpenAI API的Python代码示例:
import openai
openai.api_key = "你的OpenAI API密钥"
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": "你是一个有用的助手。"},
{"role": "user", "content": "解释一下量子计算的基本原理"}
],
temperature=0.7,
max_tokens=1000
)
print(response.choices[0].message['content'])
警告:OpenAI API是按使用量计费的,即使是免费额度也有一定限制。请监控你的API使用情况,避免超出预算。你可以在OpenAI控制台的Usage页面查看详细的使用统计。
OpenAI API高级配置
OpenAI API支持多种高级配置选项,以下是一些常用的高级参数:
response = openai.ChatCompletion.create(
model="gpt-4",
messages=[
{"role": "system", "content": "你是一个专业的技术文档撰写者。"},
{"role": "user", "content": "为以下代码添加详细注释:nndef fibonacci(n):n if n <= 1:n return nn else:n return fibonacci(n-1) + fibonacci(n-2)"}
],
temperature=0.3,
max_tokens=1500,
top_p=0.9,
frequency_penalty=0.5,
presence_penalty=0.5,
stop=["nn", ""]
)
这些参数中,frequency_penalty和presence_penalty用于控制生成文本的重复性,stop参数用于指定停止生成的标记。
通义千问API密钥获取与集成开发
通义千问是阿里巴巴开发的大语言模型,提供了强大的API接口供开发者使用。要使用通义千问API,你需要先注册阿里云账号并开通通义千问服务。
登录阿里云控制台后,搜索"通义千问"并进入服务页面。按照指引开通服务后,在AccessKey管理页面创建AccessKey ID和AccessKey Secret。这些凭证将用于API调用认证。
以下是使用通义千问API的Python代码示例:
import requests
import json
import datetime
import hmac
import hashlib
import base64
from urllib.parse import quote
配置你的AccessKey信息
access_key_id = "你的AccessKey ID"
access_key_secret = "你的AccessKey Secret"
请求参数
url = "https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation"
method = "POST"
accept = "application/json"
content_type = "application/json"
构造请求体
body = {
"model": "qwen-turbo",
"input": {
"messages": [
{
"role": "user",
"content": "请写一首关于秋天的诗"
}
]
},
"parameters": {
"temperature": 0.7,
"top_p": 0.8
}
}
将请求体转换为JSON字符串
body_str = json.dumps(body)
生成签名
def get_signature(method, accept, content_type, date, body_str, access_key_secret):
string_to_sign = method + "n" + accept + "n" + content_type + "n" + date + "n" + body_str
signature = hmac.new(access_key_secret.encode("utf-8"), string_to_sign.encode("utf-8"), hashlib.sha256).digest()
return base64.b64encode(signature).decode("utf-8")
获取当前时间
gmt_format = "%a, %d %b %Y %H:%M:%S GMT"
date = datetime.datetime.utcnow().strftime(gmt_format)
生成签名
signature = get_signature(method, accept, content_type, date, body_str, access_key_secret)
构造请求头
headers = {
"Date": date,
"Accept": accept,
"Content-Type": content_type,
"Authorization": "acs " + access_key_id + ":" + signature,
"X-DashScope-Async": "enable"
}
发送请求
response = requests.post(url, headers=headers, data=body_str)
print(response.json())
注意:通义千问API的签名生成过程较为复杂,确保按照官方文档正确实现签名算法。错误的签名将导致API调用失败。
通义千问API参数详解
通义千问API提供了多种参数配置选项,以下是一些关键参数的详细说明:
参数名 | 类型 | 可选值 | 说明 |
---|---|---|---|
model | string | qwen-turbo, qwen-plus, qwen-max | 指定使用的模型,不同模型性能和价格不同 |
temperature | number | 0.0-2.0 | 控制生成文本的随机性,值越高越随机 |
top_p | number | 0.0-1.0 | 核采样参数,控制词汇选择的概率范围 |
top_k | integer | 1-100 | 限制每步生成时考虑的词汇数量 |
seed | integer | 任意整数 | 随机种子,用于控制生成结果的确定性 |
repetition_penalty | number | 1.0-2.0 | 重复惩罚系数,用于减少生成文本的重复性 |
多模型API集成与错误处理
在实际应用中,你可能需要同时集成多个AI模型的API。以下是一个简单的多模型API集成框架示例:
import requests
import openai
import json
from typing import Dict, Any, Optional
class AIModelInterface:
def __init__(self):
self.models = {
"deepseek": {
"api_key": "你的DeepSeek API密钥",
"url": "https://api.deepseek.com/v1/chat/completions",
"headers": lambda key: {
"Content-Type": "application/json",
"Authorization": f"Bearer {key}"
}
},
"openai": {
"api_key": "你的OpenAI API密钥",
"model": "gpt-3.5-turbo"
},
"qwen": {
"access_key_id": "你的AccessKey ID",
"access_key_secret": "你的AccessKey Secret",
"url": "https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation"
}
}
def call_deepseek(self, messages: list, temperature: float = 0.7, max_tokens: int = 1000) -> Optional[Dict[str, Any]]:
try:
config = self.models["deepseek"]
data = {
"model": "deepseek-chat",
"messages": messages,
"temperature": temperature,
"max_tokens": max_tokens
}
response = requests.post(
config["url"],
headers=config["headers"](config["api_key"]),
json=data
)
if response.status_code == 200:
return response.json()
else:
print(f"DeepSeek API错误: {response.status_code} - {response.text}")
return None
except Exception as e:
print(f"调用DeepSeek API时发生异常: {str(e)}")
return None
def call_openai(self, messages: list, temperature: float = 0.7, max_tokens: int = 1000) -> Optional[Dict[str, Any]]:
try:
openai.api_key = self.models["openai"]["api_key"]
response = openai.ChatCompletion.create(
model=self.models["openai"]["model"],
messages=messages,
temperature=temperature,
max_tokens=max_tokens
)
return response
except Exception as e:
print(f"调用OpenAI API时发生异常: {str(e)}")
return None
def call_qwen(self, messages: list, temperature: float = 0.7, top_p: float = 0.8) -> Optional[Dict[str, Any]]:
try:
这里简化了通义千问的调用过程,实际应用中需要实现完整的签名算法
config = self.models["qwen"]
构造请求体
body = {
"model": "qwen-turbo",
"input": {
"messages": messages
},
"parameters": {
"temperature": temperature,
"top_p": top_p
}
}
实际应用中需要实现签名生成逻辑
headers = {
"Content-Type": "application/json",
"Authorization": "Bearer " + config["access_key_id"] + ":" + config["access_key_secret"]
}
response = requests.post(
config["url"],
headers=headers,
json=body
)
if response.status_code == 200:
return response.json()
else:
print(f"通义千问API错误: {response.status_code} - {response.text}")
return None
except Exception as e:
print(f"调用通义千问API时发生异常: {str(e)}")
return None
def call_model(self, model_name: str, messages: list, kwargs) -> Optional[Dict[str, Any]]:
if model_name == "deepseek":
return self.call_deepseek(messages, kwargs)
elif model_name == "openai":
return self.call_openai(messages, kwargs)
elif model_name == "qwen":
return self.call_qwen(messages, kwargs)
else:
print(f"不支持的模型: {model_name}")
return None
使用示例
ai_interface = AIModelInterface()
messages = [
{"role": "user", "content": "请简要介绍人工智能的发展历程"}
]
调用不同模型
deepseek_result = ai_interface.call_model("deepseek", messages, temperature=0.7)
openai_result = ai_interface.call_model("openai", messages, temperature=0.7)
qwen_result = ai_interface.call_model("qwen", messages, temperature=0.7, top_p=0.8)
处理结果
if deepseek_result:
print("DeepSeek结果:", deepseek_result["choices"][0]["message"]["content"])
if openai_result:
print("OpenAI结果:", openai_result.choices[0].message['content'])
if qwen_result:
print("通义千问结果:", qwen_result["output"]["text"])
API调用错误处理与重试机制
在实际应用中,API调用可能会遇到各种错误,如网络超时、服务不可用、配额耗尽等。实现健壮的错误处理和重试机制非常重要:
import time
import random
from functools import wraps
def retry_on_error(max_retries=3, initial_delay=1, backoff_factor=2, jitter=True):
def decorator(func):
@wraps(func)
def wrapper(args, kwargs):
retries = 0
delay = initial_delay
while retries = max_retries:
raise
计算延迟时间,可以添加随机抖动以避免多个客户端同时重试
if jitter:
actual_delay = delay (0.5 + random.random() 0.5)
else:
actual_delay = delay
print(f"调用失败,{actual_delay}秒后重试 (重试次数: {retries}/{max_retries})")
time.sleep(actual_delay)
指数退避
delay = backoff_factor
return wrapper
return decorator
使用装饰器增强API调用方法
class RobustAIModelInterface(AIModelInterface):
@retry_on_error(max_retries=3, initial_delay=1, backoff_factor=2)
def call_deepseek(self, messages: list, temperature: float = 0.7, max_tokens: int = 1000) -> Optional[Dict[str, Any]]:
检查API密钥是否有效
if not self.models["deepseek"]["api_key"]:
raise ValueError("DeepSeek API密钥未设置")
try:
config = self.models["deepseek"]
data = {
"model": "deepseek-chat",
"messages": messages,
"temperature": temperature,
"max_tokens": max_tokens
}
response = requests.post(
config["url"],
headers=config["headers"](config["api_key"]),
json=data,
timeout=30 设置超时时间
)
检查HTTP状态码
if response.status_code == 429: 请求过于频繁
retry_after = int(response.headers.get("Retry-After", 5))
raise Exception(f"请求过于频繁,需要等待 {retry_after} 秒")
elif response.status_code >= 500: 服务器错误
raise Exception(f"服务器错误: {response.status_code}")
elif response.status_code != 200: 其他错误
raise Exception(f"API错误: {response.status_code} - {response.text}")
return response.json()
except requests.exceptions.Timeout:
raise Exception("请求超时")
except requests.exceptions.ConnectionError:
raise Exception("连接错误")
except Exception as e:
raise Exception(f"调用DeepSeek API时发生异常: {str(e)}")
@retry_on_error(max_retries=3, initial_delay=1, backoff_factor=2)
def call_openai(self, messages: list, temperature: float = 0.7, max_tokens: int = 1000) -> Optional[Dict[str, Any]]:
检查API密钥是否有效
if not self.models["openai"]["api_key"]:
raise ValueError("OpenAI API密钥未设置")
try:
openai.api_key = self.models["openai"]["api_key"]
response = openai.ChatCompletion.create(
model=self.models["openai"]["model"],
messages=messages,
temperature=temperature,
max_tokens=max_tokens,
request_timeout=30 设置超时时间
)
return response
except openai.error.RateLimitError:
raise Exception("OpenAI API调用频率限制")
except openai.error.InvalidAPIKey:
raise Exception("OpenAI API密钥无效")
except openai.error.ServiceUnavailableError:
raise Exception("OpenAI服务不可用")
except Exception as e:
raise Exception(f"调用OpenAI API时发生异常: {str(e)}")
@retry_on_error(max_retries=3, initial_delay=1, backoff_factor=2)
def call_qwen(self, messages: list, temperature: float = 0.7, top_p: float = 0.8) -> Optional[Dict[str, Any]]:
检查AccessKey是否有效
if not self.models["qwen"]["access_key_id"] or not self.models["qwen"]["access_key_secret"]:
raise ValueError("通义千问AccessKey未设置")
try:
config = self.models["qwen"]
构造请求体
body = {
"model": "qwen-turbo",
"input": {
"messages": messages
},
"parameters": {
"temperature": temperature,
"top_p": top_p
}
}
实际应用中需要实现签名生成逻辑
headers = {
"Content-Type": "application/json",
"Authorization": "Bearer " + config["access_key_id"] + ":" + config["access_key_secret"]
}
response = requests.post(
config["url"],
headers=headers,
json=body,
timeout=30 设置超时时间
)
检查HTTP状态码
if response.status_code == 429: 请求过于频繁
retry_after = int(response.headers.get("Retry-After", 5))
raise Exception(f"请求过于频繁,需要等待 {retry_after} 秒")
elif response.status_code >= 500: 服务器错误
raise Exception(f"服务器错误: {response.status_code}")
elif response.status_code != 200: 其他错误
raise Exception(f"API错误: {response.status_code} - {response.text}")
return response.json()
except requests.exceptions.Timeout:
raise Exception("请求超时")
except requests.exceptions.ConnectionError:
raise Exception("连接错误")
except Exception as e:
raise Exception(f"调用通义千问API时发生异常: {str(e)}")
使用示例
robust_ai_interface = RobustAIModelInterface()
messages = [
{"role": "user", "content": "请解释什么是机器学习"}
]
调用不同模型,自动处理错误和重试
try:
deepseek_result = robust_ai_interface.call_model("deepseek", messages, temperature=0.7)
if deepseek_result:
print("DeepSeek结果:", deepseek_result["choices"][0]["message"]["content"])
except Exception as e:
print(f"调用DeepSeek失败: {str(e)}")
try:
openai_result = robust_ai_interface.call_model("openai", messages, temperature=0.7)
if openai_result:
print("OpenAI结果:", openai_result.choices[0].message['content'])
except Exception as e:
print(f"调用OpenAI失败: {str(e)}")
try:
qwen_result = robust_ai_interface.call_model("qwen", messages, temperature=0.7, top_p=0.8)
if qwen_result:
print("通义千问结果:", qwen_result["output"]["text"])
except Exception as e:
print(f"调用通义千问失败: {str(e)}")
API调用监控与优化
为了确保AI模型API的稳定性和性能,实现监控和优化机制非常重要。以下是一个简单的API调用监控和性能优化的示例:
import time
import threading
from collections import defaultdict, deque
from datetime import datetime, timedelta
class APIMonitor:
def __init__(self, max_history_size=1000):
self.max_history_size = max_history_size
self.call_history = defaultdict(lambda: deque(maxlen=max_history_size))
self.error_counts = defaultdict(int)
self.total_calls = defaultdict(int)
self.lock = threading.Lock()
def record_call(self, model_name: str, success: bool, duration: float, error_type: str = None):
with self.lock:
timestamp = datetime.now()
self.call_history[model_name].append({
"timestamp": timestamp,
"success": success,
"duration": duration,
"error_type": error_type
})
self.total_calls[model_name] += 1
if not success:
self.error_counts[model_name] += 1
def get_stats(self, model_name: str, time_window_minutes: int = 60) -> dict:
with self.lock:
now = datetime.now()
time_window = timedelta(minutes=time_window_minutes)
筛选时间窗口内的调用记录
recent_calls = [
call for call in self.call_history[model_name]
if now - call["timestamp"] 0 else 0
error_rate = 1 - success_rate
durations = [call["duration"] for call in recent_calls]
avg_duration = sum(durations) / len(durations) if durations else 0
统计错误类型
error_types = defaultdict(int)
for call in recent_calls:
if not call["success"] and call["error_type"]:
error_types[call["error_type"]] += 1
return {
"total_calls": total_calls,
"success_rate": success_rate,
"avg_duration": avg_duration,
"error_rate": error_rate,
"error_types": dict(error_types)
}
def print_stats(self, model_name: str, time_window_minutes: int = 60):
stats = self.get_stats(model_name, time_window_minutes)
print(f"n{model_name} API调用统计 (最近{time_window_minutes}分钟):")
print(f"总调用次数: {stats['total_calls']}")
print(f"成功率: {stats['success_rate']:.2%}")
print(f"平均响应时间: {stats['avg_duration']:.2f}秒")
print(f"错误率: {stats['error_rate']:.2%}")
if stats["error_types"]:
print("错误类型分布:")
for error_type, count in stats["error_types"].items():
print(f" {error_type}: {count}次")
创建监控实例
api_monitor = APIMonitor()
创建一个带有监控功能的AI模型接口
class MonitoredAIModelInterface(RobustAIModelInterface):
def __init__(self, monitor: APIMonitor):
super().__init__()
self.monitor = monitor
def call_deepseek(self, messages: list, temperature: float = 0.7, max_tokens: int = 1000) -> Optional[Dict[str, Any]]:
start_time = time.time()
success = False
result = None
error_type = None
try:
result = super().call_deepseek(messages, temperature, max_tokens)
success = result is not None
return result
except Exception as e:
error_type = type(e).__name__
raise
finally:
duration = time.time() - start_time
self.monitor.record_call("deepseek", success, duration, error_type)
def call_openai(self, messages: list, temperature: float = 0.7, max_tokens: int = 1000) -> Optional[Dict[str, Any]]:
start_time = time.time()
success = False
result = None
error_type = None
try:
result = super().call_openai(messages, temperature, max_tokens)
success = result is not None
return result
except Exception as e:
error_type = type(e).__name__
raise
finally:
duration = time.time() - start_time
self.monitor.record_call("openai", success, duration, error_type)
def call_qwen(self, messages: list, temperature: float = 0.7, top_p: float = 0.8) -> Optional[Dict[str, Any]]:
start_time = time.time()
success = False
result = None
error_type = None
try:
result = super().call_qwen(messages, temperature, top_p)
success = result is not None
return result
except Exception as e:
error_type = type(e).__name__
raise
finally:
duration = time.time() - start_time
self.monitor.record_call("qwen", success, duration, error_type)
使用示例
monitored_ai_interface = MonitoredAIModelInterface(api_monitor)
模拟多次API调用
for i in range(5):
messages = [
{"role": "user", "content": f"请简单介绍人工智能的第{i+1}个应用领域"}
]
try:
deepseek_result = monitored_ai_interface.call_model("deepseek", messages, temperature=0.7)
if deepseek_result:
print(f"DeepSeek调用{i+1}成功")
except Exception as e:
print(f"DeepSeek调用{i+1}失败: {str(e)}")
try:
openai_result = monitored_ai_interface.call_model("openai", messages, temperature=0.7)
if openai_result:
print(f"OpenAI调用{i+1}成功")
except Exception as e:
print(f"OpenAI调用{i+1}失败: {str(e)}")
try:
qwen_result = monitored_ai_interface.call_model("qwen", messages, temperature=0.7, top_p=0.8)
if qwen_result:
print(f"通义千问调用{i+1}成功")
except Exception as e:
print(f"通义千问调用{i+1}失败: {str(e)}")
短暂延迟,避免触发API限制
time.sleep(1)
打印统计信息
api_monitor.print_stats("deepseek")
api_monitor.print_stats("openai")
api_monitor.print_stats("qwen")
API调用优化策略
为了提高API调用的效率和降低成本,可以采取以下优化策略:
- 请求批处理:将多个小请求合并为一个大请求,减少API调用次数。
- 结果缓存:对相同或相似的请求进行缓存,避免重复调用API。
- 模型选择:根据任务复杂度选择合适的模型,简单任务使用轻量级模型。
- 参数调优:调整temperature、max_tokens等参数,平衡质量和成本。
- 异步处理:使用异步请求提高并发性能。
以下是一个实现请求批处理和结果缓存的示例:
import hashlib
import json
import asyncio
import aiohttp
from functools import lru_cache
from typing import List, Dict, Any, Optional, Tuple
class CachedBatchAIModelInterface(MonitoredAIModelInterface):
def __init__(self, monitor: APIMonitor, cache_size=1000):
super().__init__(monitor)
self.cache_size = cache_size
@lru_cache(maxsize=1000)
def _get_cache_key(self, model_name: str, messages_json: str, kwargs) -> str:
将参数转换为JSON字符串并生成哈希键
params = {
"model_name": model_name,
"messages": json.loads(messages_json),
kwargs
}
params_json = json.dumps(params, sort_keys=True)
return hashlib.md5(params_json.encode()).hexdigest()
async def _batch_call_deepseek(self, batch_requests: List[Tuple[list, dict]]) -> List[Optional[Dict[str, Any]]]:
模拟批量调用DeepSeek API
注意:实际API可能不支持真正的批量处理,这里只是示例
results = []
for messages, kwargs in batch_requests:
try:
检查缓存
messages_json = json.dumps(messages)
cache_key = self._get_cache_key("deepseek", messages_json, kwargs)
cached_result = self._get_cached_result(cache_key)
if cached_result:
results.append(cached_result)
continue
调用API
result = await self._async_call_deepseek(messages, kwargs)
缓存结果
if result:
self._cache_result(cache_key, result)
results.append(result)
except Exception as e:
print(f"批量调用DeepSeek时发生异常: {str(e)}")
results.append(None)
return results
async def _async_call_deepseek(self, messages: list, kwargs) -> Optional[Dict[str, Any]]:
异步调用DeepSeek API
config = self.models["deepseek"]
data = {
"model": "deepseek-chat",
"messages": messages,
kwargs
}
async with aiohttp.ClientSession() as session:
async with session.post(
config["url"],
headers=config["headers"](config["api_key"]),
json=data,
timeout=aiohttp.ClientTimeout(total=30)
) as response:
if response.status == 200:
return await response.json()
else:
error_text = await response.text()
print(f"DeepSeek API错误: {response.status} - {error_text}")
return None
def _get_cached_result(self, cache_key: str) -> Optional[Dict[str, Any]]:
从缓存获取结果
实际应用中可以使用Redis等专门的缓存系统
return None
def _cache_result(self, cache_key: str, result: Dict[str, Any]):
缓存结果
实际应用中可以使用Redis等专门的缓存系统
pass
async def batch_process(self, model_name: str, requests: List[Tuple[list, dict]]) -> List[Optional[Dict[str, Any]]]:
if model_name == "deepseek":
return await self._batch_call_deepseek(requests)
else:
对于不支持批量处理的模型,使用并发调用
tasks = []
for messages, kwargs in requests:
if model_name == "openai":
task = self._async_call_openai(messages, kwargs)
elif model_name == "qwen":
task = self._async_call_qwen(messages, kwargs)
else:
task = asyncio.create_task(asyncio.sleep(0))
print(f"不支持的模型: {model_name}")
tasks.append(task)
return await asyncio.gather(tasks, return_exceptions=True)
async def _async_call_openai(self, messages: list, kwargs) -> Optional[Dict[str, Any]]:
异步调用OpenAI API
实际实现需要使用OpenAI的异步客户端
return None
async def _async_call_qwen(self, messages: list, kwargs) -> Optional[Dict[str, Any]]:
异步调用通义千问API
实际实现需要使用通义千问的异步客户端
return None
使用示例
async def main():
cached_batch_interface = CachedBatchAIModelInterface(api_monitor)
准备批量请求
batch_requests = []
for i in range(5):
messages = [
{"role": "user", "content": f"请简要解释人工智能的第{i+1}个重要概念"}
]
batch_requests.append((messages, {"temperature": 0.7}))
批量处理请求
results = await cached_batch_interface.batch_process("deepseek", batch_requests)
处理结果
for i, result in enumerate(results):
if result:
print(f"请求{i+1}结果:", result["choices"][0]["message"]["content"])
else:
print(f"请求{i+1}失败")
运行异步示例
asyncio.run(main())