Calling Free AI Model APIs: Getting and Using Keys for DeepSeek, OpenAI, and Tongyi Qianwen

DeepSeek API Key Acquisition and Integration

To start using the DeepSeek API, first register an account and obtain an API key. Visit the DeepSeek website, complete registration, and open the developer console. There you can create new API keys; it is good practice to use a separate key for each project so they are easier to manage and revoke.

Once you have an API key, you can call the API with the following Python example:


import requests

api_key = "你的DeepSeek API密钥"
url = "https://api.deepseek.com/v1/chat/completions"

headers = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {api_key}"
}

data = {
    "model": "deepseek-chat",
    "messages": [
        {"role": "user", "content": "你好,请介绍一下你自己"}
    ],
    "temperature": 0.7
}

response = requests.post(url, headers=headers, json=data)
print(response.json())

Note: the DeepSeek API enforces rate limits; at the time of writing, free-tier users are limited to 60 requests per minute. If your application needs a higher call rate, consider upgrading to a paid plan.
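If you stay on the free tier, a simple client-side throttle helps you avoid hitting the limit and collecting 429 responses. Below is a minimal sketch assuming the 60-requests-per-minute figure quoted above; the RateLimiter class is our own illustration, not part of any DeepSeek SDK:

import time
import threading
import requests

class RateLimiter:
    """Client-side limiter: allow at most max_calls calls per period seconds."""

    def __init__(self, max_calls: int = 60, period: float = 60.0):
        self.max_calls = max_calls
        self.period = period
        self.calls = []  # timestamps of calls inside the current window
        self.lock = threading.Lock()

    def wait(self):
        with self.lock:
            now = time.time()
            # Drop timestamps that have fallen out of the window
            self.calls = [t for t in self.calls if now - t < self.period]
            if len(self.calls) >= self.max_calls:
                # Sleep until the oldest call leaves the window
                time.sleep(self.period - (now - self.calls[0]))
            self.calls.append(time.time())

limiter = RateLimiter(max_calls=60, period=60.0)

def throttled_post(url, **kwargs):
    limiter.wait()  # block until a request slot is free
    return requests.post(url, **kwargs)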

DeepSeek API Parameter Configuration

The DeepSeek API exposes a number of configurable parameters. Commonly used ones are listed below:

Parameter   | Type    | Default       | Description
model       | string  | deepseek-chat | Name of the model to use
temperature | number  | 0.7           | Randomness of the generated text; range 0-2
max_tokens  | integer | 2048          | Maximum number of tokens to generate
top_p       | number  | 1.0           | Nucleus-sampling parameter; restricts sampling to the top probability mass
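As a quick illustration of how these parameters combine, the two configurations below contrast a near-deterministic setup with a more creative one; the specific values are illustrative choices, not DeepSeek recommendations:

# Near-deterministic: suited to extraction, classification, or code edits
precise_params = {
    "model": "deepseek-chat",
    "temperature": 0.1,
    "top_p": 1.0,
    "max_tokens": 512
}

# Creative: suited to brainstorming or open-ended writing
creative_params = {
    "model": "deepseek-chat",
    "temperature": 1.2,
    "top_p": 0.9,
    "max_tokens": 2048
}

# Merge either configuration into the request body from the example above
data = {**precise_params, "messages": [{"role": "user", "content": "Summarize this text ..."}]}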

OpenAI API Key Acquisition and Integration

OpenAI provides a powerful API with access to a range of AI models. To use it, first create an OpenAI account and obtain an API key.

After logging in to the OpenAI website, open the API keys page and click "Create new secret key" to generate a new key. Store it securely: it is displayed only once, at creation time.
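Rather than hardcoding the key in source files (as the compact examples below do for brevity), it is safer to load it from an environment variable or a secrets manager. A minimal sketch using the conventional OPENAI_API_KEY variable name:

import os
import openai

# Read the key from the environment and fail fast if it is missing
api_key = os.environ.get("OPENAI_API_KEY")
if not api_key:
    raise RuntimeError("OPENAI_API_KEY is not set")

openai.api_key = api_key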

The following Python example calls the OpenAI API. Note that this article uses the legacy interface of the openai Python package (versions before 1.0); newer versions expose a different client API:


import openai

openai.api_key = "你的OpenAI API密钥"

response = openai.ChatCompletion.create(
    model="gpt-3.5-turbo",
    messages=[
        {"role": "system", "content": "你是一个有用的助手。"},
        {"role": "user", "content": "解释一下量子计算的基本原理"}
    ],
    temperature=0.7,
    max_tokens=1000
)

print(response.choices[0].message['content'])

Warning: the OpenAI API is billed by usage, and even the free allowance is limited. Monitor your usage to stay within budget; the Usage page in the OpenAI console shows detailed statistics.
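You can also track consumption programmatically: each chat completion response includes a usage object with token counts. Below is a small sketch that keeps a running total across calls; the counter and the chat_and_count helper are our own illustration, not part of the openai library:

import openai

total_tokens_used = 0

def chat_and_count(messages, **params):
    """Call the chat API and add this call's token usage to a running total."""
    global total_tokens_used
    response = openai.ChatCompletion.create(
        model="gpt-3.5-turbo", messages=messages, **params
    )
    usage = response["usage"]  # prompt_tokens, completion_tokens, total_tokens
    total_tokens_used += usage["total_tokens"]
    print(f"This call: {usage['total_tokens']} tokens; running total: {total_tokens_used}")
    return response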

Advanced OpenAI API Configuration

The OpenAI API supports a number of advanced options. Commonly used ones are shown below:


response = openai.ChatCompletion.create(
    model="gpt-4",
    messages=[
        {"role": "system", "content": "你是一个专业的技术文档撰写者。"},
        {"role": "user", "content": "为以下代码添加详细注释:nndef fibonacci(n):n    if n <= 1:n        return nn    else:n        return fibonacci(n-1) + fibonacci(n-2)"}
    ],
    temperature=0.3,
    max_tokens=1500,
    top_p=0.9,
    frequency_penalty=0.5,
    presence_penalty=0.5,
    stop=["nn", ""]
)

Among these, frequency_penalty and presence_penalty control how strongly the model is discouraged from repeating itself, and stop lists the strings at which generation halts.

Tongyi Qianwen (Qwen) API Key Acquisition and Integration

Tongyi Qianwen (Qwen) is a large language model developed by Alibaba, exposed to developers through a full-featured API. To use it, first register an Alibaba Cloud account and activate the Tongyi Qianwen service.

After logging in to the Alibaba Cloud console, search for "通义千问" and open the service page. Follow the prompts to activate the service, then create an AccessKey ID and AccessKey Secret on the AccessKey management page; these credentials authenticate your API calls.

The following Python example calls the Tongyi Qianwen API:


import requests
import json
import datetime
import hmac
import hashlib
import base64

# Configure your AccessKey credentials
access_key_id = "your AccessKey ID"
access_key_secret = "your AccessKey Secret"

# Request parameters
url = "https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation"
method = "POST"
accept = "application/json"
content_type = "application/json"

# Build the request body
body = {
    "model": "qwen-turbo",
    "input": {
        "messages": [
            {
                "role": "user",
                "content": "Please write a poem about autumn"
            }
        ]
    },
    "parameters": {
        "temperature": 0.7,
        "top_p": 0.8
    }
}

# Serialize the request body to a JSON string
body_str = json.dumps(body)

# Compute the request signature
def get_signature(method, accept, content_type, date, body_str, access_key_secret):
    string_to_sign = method + "\n" + accept + "\n" + content_type + "\n" + date + "\n" + body_str
    signature = hmac.new(access_key_secret.encode("utf-8"), string_to_sign.encode("utf-8"), hashlib.sha256).digest()
    return base64.b64encode(signature).decode("utf-8")

# Current time, formatted for the Date header
gmt_format = "%a, %d %b %Y %H:%M:%S GMT"
date = datetime.datetime.utcnow().strftime(gmt_format)

# Generate the signature
signature = get_signature(method, accept, content_type, date, body_str, access_key_secret)

# Build the request headers
headers = {
    "Date": date,
    "Accept": accept,
    "Content-Type": content_type,
    "Authorization": "acs " + access_key_id + ":" + signature,
    "X-DashScope-Async": "enable"
}

# Send the request
response = requests.post(url, headers=headers, data=body_str)
print(response.json())

Note: the signature process for this style of Alibaba Cloud API is fairly involved; implement the algorithm exactly as the official documentation describes, because an incorrect signature will make every call fail.
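Note also that the DashScope endpoint used above supports a much simpler authentication scheme: an API key created in the DashScope console, sent as a Bearer token. If that option is available to your account, it avoids the signature logic entirely. A minimal sketch, assuming the key is stored in an environment variable we name DASHSCOPE_API_KEY:

import os
import requests

api_key = os.environ["DASHSCOPE_API_KEY"]  # API key from the DashScope console

headers = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {api_key}"
}

body = {
    "model": "qwen-turbo",
    "input": {"messages": [{"role": "user", "content": "Please write a poem about autumn"}]},
    "parameters": {"temperature": 0.7, "top_p": 0.8}
}

response = requests.post(
    "https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation",
    headers=headers,
    json=body,
    timeout=30
)
print(response.json())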

Tongyi Qianwen API Parameters in Detail

The Tongyi Qianwen API offers a range of configuration parameters. The key ones are described below:

Parameter          | Type    | Allowed values                  | Description
model              | string  | qwen-turbo, qwen-plus, qwen-max | Which model to use; capability and price differ by model
temperature        | number  | 0.0-2.0                         | Randomness of the generated text; higher is more random
top_p              | number  | 0.0-1.0                         | Nucleus-sampling parameter; restricts sampling to the top probability mass
top_k              | integer | 1-100                           | Number of candidate tokens considered at each generation step
seed               | integer | any integer                     | Random seed, for deterministic, reproducible output
repetition_penalty | number  | 1.0-2.0                         | Penalty factor that reduces repetition in the output
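For example, to make runs reproducible while damping repetition, you could extend the parameters block of the earlier request body like this (the specific values are illustrative):

body["parameters"] = {
    "temperature": 0.7,
    "top_p": 0.8,
    "top_k": 50,
    "seed": 42,                 # a fixed seed makes output repeatable for the same input
    "repetition_penalty": 1.1   # values above 1.0 discourage repeated phrases
}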

Multi-Model API Integration and Error Handling

In real applications you may need to integrate the APIs of several AI models at once. Below is a simple multi-model integration framework:


import requests
import openai
import json
from typing import Dict, Any, Optional

class AIModelInterface:
    def __init__(self):
        self.models = {
            "deepseek": {
                "api_key": "你的DeepSeek API密钥",
                "url": "https://api.deepseek.com/v1/chat/completions",
                "headers": lambda key: {
                    "Content-Type": "application/json",
                    "Authorization": f"Bearer {key}"
                }
            },
            "openai": {
                "api_key": "你的OpenAI API密钥",
                "model": "gpt-3.5-turbo"
            },
            "qwen": {
                "access_key_id": "你的AccessKey ID",
                "access_key_secret": "你的AccessKey Secret",
                "url": "https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation"
            }
        }
    
    def call_deepseek(self, messages: list, temperature: float = 0.7, max_tokens: int = 1000) -> Optional[Dict[str, Any]]:
        try:
            config = self.models["deepseek"]
            data = {
                "model": "deepseek-chat",
                "messages": messages,
                "temperature": temperature,
                "max_tokens": max_tokens
            }
            
            response = requests.post(
                config["url"],
                headers=config["headers"](config["api_key"]),
                json=data
            )
            
            if response.status_code == 200:
                return response.json()
            else:
                print(f"DeepSeek API错误: {response.status_code} - {response.text}")
                return None
                
        except Exception as e:
            print(f"调用DeepSeek API时发生异常: {str(e)}")
            return None
    
    def call_openai(self, messages: list, temperature: float = 0.7, max_tokens: int = 1000) -> Optional[Dict[str, Any]]:
        try:
            openai.api_key = self.models["openai"]["api_key"]
            
            response = openai.ChatCompletion.create(
                model=self.models["openai"]["model"],
                messages=messages,
                temperature=temperature,
                max_tokens=max_tokens
            )
            
            return response
            
        except Exception as e:
            print(f"调用OpenAI API时发生异常: {str(e)}")
            return None
    
    def call_qwen(self, messages: list, temperature: float = 0.7, top_p: float = 0.8) -> Optional[Dict[str, Any]]:
        try:
            # The Qwen call is simplified here; a real application must implement the full signature algorithm
            config = self.models["qwen"]
            
            # Build the request body
            body = {
                "model": "qwen-turbo",
                "input": {
                    "messages": messages
                },
                "parameters": {
                    "temperature": temperature,
                    "top_p": top_p
                }
            }
            
            # A real application must implement proper signature generation here
            headers = {
                "Content-Type": "application/json",
                "Authorization": "Bearer " + config["access_key_id"] + ":" + config["access_key_secret"]
            }
            
            response = requests.post(
                config["url"],
                headers=headers,
                json=body
            )
            
            if response.status_code == 200:
                return response.json()
            else:
                print(f"通义千问API错误: {response.status_code} - {response.text}")
                return None
                
        except Exception as e:
            print(f"调用通义千问API时发生异常: {str(e)}")
            return None
    
    def call_model(self, model_name: str, messages: list, **kwargs) -> Optional[Dict[str, Any]]:
        if model_name == "deepseek":
            return self.call_deepseek(messages, **kwargs)
        elif model_name == "openai":
            return self.call_openai(messages, **kwargs)
        elif model_name == "qwen":
            return self.call_qwen(messages, **kwargs)
        else:
            print(f"Unsupported model: {model_name}")
            return None

# Usage example
ai_interface = AIModelInterface()

messages = [
    {"role": "user", "content": "Briefly outline the history of artificial intelligence"}
]

# Call each model
deepseek_result = ai_interface.call_model("deepseek", messages, temperature=0.7)
openai_result = ai_interface.call_model("openai", messages, temperature=0.7)
qwen_result = ai_interface.call_model("qwen", messages, temperature=0.7, top_p=0.8)

# Handle the results
if deepseek_result:
    print("DeepSeek result:", deepseek_result["choices"][0]["message"]["content"])

if openai_result:
    print("OpenAI result:", openai_result.choices[0].message['content'])

if qwen_result:
    print("Qwen result:", qwen_result["output"]["text"])

API Error Handling and Retry Logic

In production, API calls can fail in many ways: network timeouts, unavailable services, exhausted quotas, and so on. Implementing robust error handling and retry logic is essential:


import time
import random
from functools import wraps

def retry_on_error(max_retries=3, initial_delay=1, backoff_factor=2, jitter=True):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            retries = 0
            delay = initial_delay
            
            while retries <= max_retries:
                try:
                    return func(*args, **kwargs)
                except Exception:
                    retries += 1
                    if retries > max_retries:
                        raise
                    
                    # Compute the delay; random jitter keeps many clients from retrying in lockstep
                    if jitter:
                        actual_delay = delay * (0.5 + random.random() * 0.5)
                    else:
                        actual_delay = delay
                    
                    print(f"Call failed, retrying in {actual_delay:.1f}s (attempt {retries}/{max_retries})")
                    time.sleep(actual_delay)
                    
                    # Exponential backoff
                    delay *= backoff_factor
            
        return wrapper
    return decorator

# Use the decorator to harden the API-call methods
class RobustAIModelInterface(AIModelInterface):
    @retry_on_error(max_retries=3, initial_delay=1, backoff_factor=2)
    def call_deepseek(self, messages: list, temperature: float = 0.7, max_tokens: int = 1000) -> Optional[Dict[str, Any]]:
        # Verify that the API key is set
        if not self.models["deepseek"]["api_key"]:
            raise ValueError("DeepSeek API key is not set")
        
        try:
            config = self.models["deepseek"]
            data = {
                "model": "deepseek-chat",
                "messages": messages,
                "temperature": temperature,
                "max_tokens": max_tokens
            }
            
            response = requests.post(
                config["url"],
                headers=config["headers"](config["api_key"]),
                json=data,
                timeout=30  # request timeout in seconds
            )
            
            # Check the HTTP status code
            if response.status_code == 429:  # rate limited
                retry_after = int(response.headers.get("Retry-After", 5))
                raise Exception(f"Rate limited; retry after {retry_after} seconds")
            elif response.status_code >= 500:  # server error
                raise Exception(f"Server error: {response.status_code}")
            elif response.status_code != 200:  # other errors
                raise Exception(f"API error: {response.status_code} - {response.text}")
            
            return response.json()
            
        except requests.exceptions.Timeout:
            raise Exception("Request timed out")
        except requests.exceptions.ConnectionError:
            raise Exception("Connection error")
        except Exception as e:
            raise Exception(f"Exception while calling the DeepSeek API: {str(e)}")
    
    @retry_on_error(max_retries=3, initial_delay=1, backoff_factor=2)
    def call_openai(self, messages: list, temperature: float = 0.7, max_tokens: int = 1000) -> Optional[Dict[str, Any]]:
        # Verify that the API key is set
        if not self.models["openai"]["api_key"]:
            raise ValueError("OpenAI API key is not set")
        
        try:
            openai.api_key = self.models["openai"]["api_key"]
            
            response = openai.ChatCompletion.create(
                model=self.models["openai"]["model"],
                messages=messages,
                temperature=temperature,
                max_tokens=max_tokens,
                request_timeout=30  # request timeout in seconds
            )
            
            return response
            
        except openai.error.RateLimitError:
            raise Exception("OpenAI API rate limit exceeded")
        except openai.error.AuthenticationError:
            raise Exception("Invalid OpenAI API key")
        except openai.error.ServiceUnavailableError:
            raise Exception("OpenAI service unavailable")
        except Exception as e:
            raise Exception(f"Exception while calling the OpenAI API: {str(e)}")
    
    @retry_on_error(max_retries=3, initial_delay=1, backoff_factor=2)
    def call_qwen(self, messages: list, temperature: float = 0.7, top_p: float = 0.8) -> Optional[Dict[str, Any]]:
        # Verify that the AccessKey credentials are set
        if not self.models["qwen"]["access_key_id"] or not self.models["qwen"]["access_key_secret"]:
            raise ValueError("Tongyi Qianwen AccessKey is not set")
        
        try:
            config = self.models["qwen"]
            
            # Build the request body
            body = {
                "model": "qwen-turbo",
                "input": {
                    "messages": messages
                },
                "parameters": {
                    "temperature": temperature,
                    "top_p": top_p
                }
            }
            
            # A real application must implement proper signature generation here
            headers = {
                "Content-Type": "application/json",
                "Authorization": "Bearer " + config["access_key_id"] + ":" + config["access_key_secret"]
            }
            
            response = requests.post(
                config["url"],
                headers=headers,
                json=body,
                timeout=30  # request timeout in seconds
            )
            
            # Check the HTTP status code
            if response.status_code == 429:  # rate limited
                retry_after = int(response.headers.get("Retry-After", 5))
                raise Exception(f"Rate limited; retry after {retry_after} seconds")
            elif response.status_code >= 500:  # server error
                raise Exception(f"Server error: {response.status_code}")
            elif response.status_code != 200:  # other errors
                raise Exception(f"API error: {response.status_code} - {response.text}")
            
            return response.json()
            
        except requests.exceptions.Timeout:
            raise Exception("Request timed out")
        except requests.exceptions.ConnectionError:
            raise Exception("Connection error")
        except Exception as e:
            raise Exception(f"Exception while calling the Tongyi Qianwen API: {str(e)}")

# Usage example
robust_ai_interface = RobustAIModelInterface()

messages = [
    {"role": "user", "content": "Please explain what machine learning is"}
]

# Call each model; errors and retries are handled automatically
try:
    deepseek_result = robust_ai_interface.call_model("deepseek", messages, temperature=0.7)
    if deepseek_result:
        print("DeepSeek result:", deepseek_result["choices"][0]["message"]["content"])
except Exception as e:
    print(f"DeepSeek call failed: {str(e)}")

try:
    openai_result = robust_ai_interface.call_model("openai", messages, temperature=0.7)
    if openai_result:
        print("OpenAI result:", openai_result.choices[0].message['content'])
except Exception as e:
    print(f"OpenAI call failed: {str(e)}")

try:
    qwen_result = robust_ai_interface.call_model("qwen", messages, temperature=0.7, top_p=0.8)
    if qwen_result:
        print("Qwen result:", qwen_result["output"]["text"])
except Exception as e:
    print(f"Tongyi Qianwen call failed: {str(e)}")

API Call Monitoring and Optimization

Monitoring matters for keeping AI API usage stable and performant. Below is a simple example of recording API call statistics and tracking performance:


import time
import threading
from collections import defaultdict, deque
from datetime import datetime, timedelta

class APIMonitor:
    def __init__(self, max_history_size=1000):
        self.max_history_size = max_history_size
        self.call_history = defaultdict(lambda: deque(maxlen=max_history_size))
        self.error_counts = defaultdict(int)
        self.total_calls = defaultdict(int)
        self.lock = threading.Lock()
    
    def record_call(self, model_name: str, success: bool, duration: float, error_type: str = None):
        with self.lock:
            timestamp = datetime.now()
            self.call_history[model_name].append({
                "timestamp": timestamp,
                "success": success,
                "duration": duration,
                "error_type": error_type
            })
            
            self.total_calls[model_name] += 1
            
            if not success:
                self.error_counts[model_name] += 1
    
    def get_stats(self, model_name: str, time_window_minutes: int = 60) -> dict:
        with self.lock:
            now = datetime.now()
            time_window = timedelta(minutes=time_window_minutes)
            
            # Keep only the calls inside the time window
            recent_calls = [
                call for call in self.call_history[model_name]
                if now - call["timestamp"] <= time_window
            ]
            
            total_calls = len(recent_calls)
            successful_calls = sum(1 for call in recent_calls if call["success"])
            success_rate = successful_calls / total_calls if total_calls > 0 else 0
            error_rate = 1 - success_rate
            
            durations = [call["duration"] for call in recent_calls]
            avg_duration = sum(durations) / len(durations) if durations else 0
            
            # Tally the error types
            error_types = defaultdict(int)
            for call in recent_calls:
                if not call["success"] and call["error_type"]:
                    error_types[call["error_type"]] += 1
            
            return {
                "total_calls": total_calls,
                "success_rate": success_rate,
                "avg_duration": avg_duration,
                "error_rate": error_rate,
                "error_types": dict(error_types)
            }
    
    def print_stats(self, model_name: str, time_window_minutes: int = 60):
        stats = self.get_stats(model_name, time_window_minutes)
        print(f"\n{model_name} API call statistics (last {time_window_minutes} minutes):")
        print(f"Total calls: {stats['total_calls']}")
        print(f"Success rate: {stats['success_rate']:.2%}")
        print(f"Average latency: {stats['avg_duration']:.2f}s")
        print(f"Error rate: {stats['error_rate']:.2%}")
        
        if stats["error_types"]:
            print("Error type breakdown:")
            for error_type, count in stats["error_types"].items():
                print(f"  {error_type}: {count}")

# Create a monitor instance
api_monitor = APIMonitor()

# An AI model interface with built-in call monitoring
class MonitoredAIModelInterface(RobustAIModelInterface):
    def __init__(self, monitor: APIMonitor):
        super().__init__()
        self.monitor = monitor
    
    def call_deepseek(self, messages: list, temperature: float = 0.7, max_tokens: int = 1000) -> Optional[Dict[str, Any]]:
        start_time = time.time()
        success = False
        result = None
        error_type = None
        
        try:
            result = super().call_deepseek(messages, temperature, max_tokens)
            success = result is not None
            return result
        except Exception as e:
            error_type = type(e).__name__
            raise
        finally:
            duration = time.time() - start_time
            self.monitor.record_call("deepseek", success, duration, error_type)
    
    def call_openai(self, messages: list, temperature: float = 0.7, max_tokens: int = 1000) -> Optional[Dict[str, Any]]:
        start_time = time.time()
        success = False
        result = None
        error_type = None
        
        try:
            result = super().call_openai(messages, temperature, max_tokens)
            success = result is not None
            return result
        except Exception as e:
            error_type = type(e).__name__
            raise
        finally:
            duration = time.time() - start_time
            self.monitor.record_call("openai", success, duration, error_type)
    
    def call_qwen(self, messages: list, temperature: float = 0.7, top_p: float = 0.8) -> Optional[Dict[str, Any]]:
        start_time = time.time()
        success = False
        result = None
        error_type = None
        
        try:
            result = super().call_qwen(messages, temperature, top_p)
            success = result is not None
            return result
        except Exception as e:
            error_type = type(e).__name__
            raise
        finally:
            duration = time.time() - start_time
            self.monitor.record_call("qwen", success, duration, error_type)

# Usage example
monitored_ai_interface = MonitoredAIModelInterface(api_monitor)

# Simulate a series of API calls
for i in range(5):
    messages = [
        {"role": "user", "content": f"Briefly introduce application area #{i+1} of artificial intelligence"}
    ]
    
    try:
        deepseek_result = monitored_ai_interface.call_model("deepseek", messages, temperature=0.7)
        if deepseek_result:
            print(f"DeepSeek call {i+1} succeeded")
    except Exception as e:
        print(f"DeepSeek call {i+1} failed: {str(e)}")
    
    try:
        openai_result = monitored_ai_interface.call_model("openai", messages, temperature=0.7)
        if openai_result:
            print(f"OpenAI call {i+1} succeeded")
    except Exception as e:
        print(f"OpenAI call {i+1} failed: {str(e)}")
    
    try:
        qwen_result = monitored_ai_interface.call_model("qwen", messages, temperature=0.7, top_p=0.8)
        if qwen_result:
            print(f"Tongyi Qianwen call {i+1} succeeded")
    except Exception as e:
        print(f"Tongyi Qianwen call {i+1} failed: {str(e)}")
    
    # Short pause to avoid hitting rate limits
    time.sleep(1)

# Print the statistics
api_monitor.print_stats("deepseek")
api_monitor.print_stats("openai")
api_monitor.print_stats("qwen")

API Call Optimization Strategies

To improve call efficiency and reduce cost, consider the following strategies:

  1. Request batching: merge several small requests into one larger request to cut down the number of API calls.
  2. Result caching: cache responses to identical or similar requests to avoid redundant calls.
  3. Model selection: match the model to the task's complexity; use lightweight models for simple tasks.
  4. Parameter tuning: adjust temperature, max_tokens, and related parameters to balance quality against cost.
  5. Asynchronous processing: issue requests asynchronously to improve concurrency.

Below is an example that implements request batching and result caching:


import hashlib
import json
import asyncio
import aiohttp
from functools import lru_cache
from typing import List, Dict, Any, Optional, Tuple

class CachedBatchAIModelInterface(MonitoredAIModelInterface):
    def __init__(self, monitor: APIMonitor, cache_size=1000):
        super().__init__(monitor)
        self.cache_size = cache_size
    
    @lru_cache(maxsize=1000)
    def _get_cache_key(self, model_name: str, messages_json: str, **kwargs) -> str:
        # Serialize the parameters to JSON and hash them into a cache key
        params = {
            "model_name": model_name,
            "messages": json.loads(messages_json),
            **kwargs
        }
        params_json = json.dumps(params, sort_keys=True)
        return hashlib.md5(params_json.encode()).hexdigest()
    
    async def _batch_call_deepseek(self, batch_requests: List[Tuple[list, dict]]) -> List[Optional[Dict[str, Any]]]:
        # Simulate a batched call to the DeepSeek API.
        # Note: the API may not support true batching; this is for illustration only.
        results = []
        
        for messages, kwargs in batch_requests:
            try:
                # Check the cache first
                messages_json = json.dumps(messages)
                cache_key = self._get_cache_key("deepseek", messages_json, **kwargs)
                cached_result = self._get_cached_result(cache_key)
                
                if cached_result:
                    results.append(cached_result)
                    continue
                
                # Call the API
                result = await self._async_call_deepseek(messages, **kwargs)
                
                # Cache the result
                if result:
                    self._cache_result(cache_key, result)
                
                results.append(result)
            except Exception as e:
                print(f"Exception during batched DeepSeek call: {str(e)}")
                results.append(None)
        
        return results
    
    async def _async_call_deepseek(self, messages: list, **kwargs) -> Optional[Dict[str, Any]]:
        # Asynchronously call the DeepSeek API
        config = self.models["deepseek"]
        data = {
            "model": "deepseek-chat",
            "messages": messages,
            **kwargs
        }
        
        async with aiohttp.ClientSession() as session:
            async with session.post(
                config["url"],
                headers=config["headers"](config["api_key"]),
                json=data,
                timeout=aiohttp.ClientTimeout(total=30)
            ) as response:
                if response.status == 200:
                    return await response.json()
                else:
                    error_text = await response.text()
                    print(f"DeepSeek API错误: {response.status} - {error_text}")
                    return None
    
    def _get_cached_result(self, cache_key: str) -> Optional[Dict[str, Any]]:
        # Fetch a result from the cache.
        # A real application would use a dedicated cache such as Redis.
        return None
    
    def _cache_result(self, cache_key: str, result: Dict[str, Any]):
        # Store a result in the cache.
        # A real application would use a dedicated cache such as Redis.
        pass
    
    async def batch_process(self, model_name: str, request_batch: List[Tuple[list, dict]]) -> List[Optional[Dict[str, Any]]]:
        if model_name == "deepseek":
            return await self._batch_call_deepseek(request_batch)
        else:
            # For models without batch support, fall back to concurrent calls
            tasks = []
            for messages, kwargs in request_batch:
                if model_name == "openai":
                    task = self._async_call_openai(messages, **kwargs)
                elif model_name == "qwen":
                    task = self._async_call_qwen(messages, **kwargs)
                else:
                    task = asyncio.create_task(asyncio.sleep(0))
                    print(f"Unsupported model: {model_name}")
                
                tasks.append(task)
            
            return await asyncio.gather(*tasks, return_exceptions=True)
    
    async def _async_call_openai(self, messages: list, **kwargs) -> Optional[Dict[str, Any]]:
        # Asynchronously call the OpenAI API.
        # A real implementation would use OpenAI's async client.
        return None
    
    async def _async_call_qwen(self, messages: list, **kwargs) -> Optional[Dict[str, Any]]:
        # Asynchronously call the Tongyi Qianwen API.
        # A real implementation would use an async DashScope client.
        return None

# Usage example
async def main():
    cached_batch_interface = CachedBatchAIModelInterface(api_monitor)
    
    # Prepare the batch of requests
    batch_requests = []
    for i in range(5):
        messages = [
            {"role": "user", "content": f"Briefly explain important AI concept #{i+1}"}
        ]
        batch_requests.append((messages, {"temperature": 0.7}))
    
    # Process the whole batch
    results = await cached_batch_interface.batch_process("deepseek", batch_requests)
    
    # Handle the results
    for i, result in enumerate(results):
        if result:
            print(f"Request {i+1} result:", result["choices"][0]["message"]["content"])
        else:
            print(f"Request {i+1} failed")

# Run the async example
asyncio.run(main())
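The _get_cached_result and _cache_result hooks above are deliberate stubs. Before reaching for Redis, a small in-memory cache with a time-to-live is often enough for a single process. The sketch below is one possible backing store for those two hooks; the TTLCache class, its 5-minute TTL, and the eviction policy are all our own illustrative choices:

import time
from typing import Any, Dict, Optional

class TTLCache:
    """Minimal in-memory cache whose entries expire after `ttl` seconds."""

    def __init__(self, ttl: float = 300.0, max_entries: int = 1000):
        self.ttl = ttl
        self.max_entries = max_entries
        self._store: Dict[str, tuple] = {}  # key -> (expiry_time, value)

    def get(self, key: str) -> Optional[Any]:
        entry = self._store.get(key)
        if entry is None:
            return None
        expiry, value = entry
        if time.time() > expiry:  # expired: drop the entry and report a miss
            del self._store[key]
            return None
        return value

    def set(self, key: str, value: Any):
        if len(self._store) >= self.max_entries:
            # Evict the entry closest to expiry (simple policy, not strict LRU)
            oldest = min(self._store, key=lambda k: self._store[k][0])
            del self._store[oldest]
        self._store[key] = (time.time() + self.ttl, value)

# Wiring it into CachedBatchAIModelInterface:
#     cache = TTLCache(ttl=300)
#     def _get_cached_result(self, cache_key): return cache.get(cache_key)
#     def _cache_result(self, cache_key, result): cache.set(cache_key, result)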