智谱AI开放平台API调用实战技巧与性能优化方法
- Linkreate AI插件 文章
- 2025-08-24 18:16:07
- 11阅读
获取API凭证与基础环境配置
要在你的项目中集成智谱AI模型能力,首先需要获取有效的API凭证。访问智谱AI开放平台(https://open.bigmodel.cn/)注册账号并创建应用,系统会为你分配API Key和Secret Key,这是后续所有调用的基础凭证。
import requests
import json
import time
import hashlib
import hmac
import base64
class ZhipuAI:
def __init__(self, api_key, secret_key):
self.api_key = api_key
self.secret_key = secret_key
self.base_url = "https://open.bigmodel.cn/api/paas/v3/model-api/chatglm_pro/invoke"
def generate_signature(self, timestamp):
"""生成签名"""
string_to_sign = f"{self.api_key}.{timestamp}"
hmac_code = hmac.new(
self.secret_key.encode('utf-8'),
string_to_sign.encode('utf-8'),
digestmod=hashlib.sha256
).digest()
return base64.b64encode(hmac_code).decode('utf-8')
def get_headers(self):
"""构建请求头"""
timestamp = str(int(time.time()))
signature = self.generate_signature(timestamp)
return {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.api_key}.{timestamp}.{signature}"
}
这段代码实现了智谱AI API的基础认证机制。签名生成过程使用了HMAC-SHA256算法,确保API调用的安全性。每次请求都需要生成新的时间戳和签名,防止重放攻击。
注意:请妥善保管你的API Key和Secret Key,不要在客户端代码中直接暴露这些敏感信息,建议通过后端服务代理API请求。
文本生成功能实现
智谱AI模型最常用的功能是文本生成,下面我们实现一个完整的对话接口:
def chat_completion(self, prompt, temperature=0.7, max_tokens=2048):
"""
发起对话请求
:param prompt: 用户输入的提示文本
:param temperature: 控制生成文本的随机性,0-1之间
:param max_tokens: 生成文本的最大长度
:return: 模型生成的回复
"""
payload = {
"prompt": prompt,
"temperature": temperature,
"max_tokens": max_tokens
}
headers = self.get_headers()
response = requests.post(
self.base_url,
headers=headers,
data=json.dumps(payload)
)
if response.status_code == 200:
result = response.json()
if result.get("success"):
return result.get("data", {}).get("choices", [{}])[0].get("content", "")
else:
raise Exception(f"API返回错误: {result.get('msg', '未知错误')}")
else:
raise Exception(f"请求失败,状态码: {response.status_code}, 错误信息: {response.text}")
这个方法实现了与智谱AI模型的对话功能。temperature参数控制生成文本的随机性,值越高输出越随机,值越低输出越确定。max_tokens参数限制了生成文本的长度,防止生成过长的响应。
批量处理与性能优化
当你需要处理大量请求时,简单的串行调用效率低下。下面我们实现一个批量处理机制,并加入请求缓存和重试逻辑:
import threading
import queue
from functools import lru_cache
class BatchZhipuAI(ZhipuAI):
def __init__(self, api_key, secret_key, max_workers=5):
super().__init__(api_key, secret_key)
self.task_queue = queue.Queue()
self.result_dict = {}
self.max_workers = max_workers
self.workers = []
self._init_workers()
def _init_workers(self):
"""初始化工作线程"""
for i in range(self.max_workers):
worker = threading.Thread(target=self._worker, daemon=True)
worker.start()
self.workers.append(worker)
def _worker(self):
"""工作线程处理函数"""
while True:
task_id, prompt, kwargs = self.task_queue.get()
try:
result = self.chat_completion(prompt, kwargs)
self.result_dict[task_id] = {"status": "success", "result": result}
except Exception as e:
self.result_dict[task_id] = {"status": "error", "error": str(e)}
finally:
self.task_queue.task_done()
@lru_cache(maxsize=1000)
def cached_chat_completion(self, prompt, temperature=0.7, max_tokens=2048):
"""带缓存的对话请求"""
return self.chat_completion(prompt, temperature, max_tokens)
def batch_request(self, prompts_list, use_cache=True, kwargs):
"""
批量处理对话请求
:param prompts_list: 提示文本列表
:param use_cache: 是否使用缓存
:param kwargs: 其他参数
:return: 结果列表
"""
task_ids = []
for i, prompt in enumerate(prompts_list):
task_id = f"task_{int(time.time()1000)}_{i}"
task_ids.append(task_id)
if use_cache:
try:
result = self.cached_chat_completion(prompt, kwargs)
self.result_dict[task_id] = {"status": "success", "result": result}
continue
except Exception:
pass
self.task_queue.put((task_id, prompt, kwargs))
self.task_queue.join()
results = []
for task_id in task_ids:
task_result = self.result_dict.get(task_id, {"status": "pending"})
if task_result["status"] == "success":
results.append(task_result["result"])
else:
results.append(f"Error: {task_result.get('error', 'Unknown error')}")
return results
这段代码实现了以下优化:
1. 使用线程池处理并发请求,提高吞吐量
2. 添加了LRU缓存机制,避免重复请求相同内容
3. 实现了任务队列和结果收集机制,便于批量处理
4. 每个任务都有唯一ID,便于追踪和调试
警告:智谱AI开放平台对API调用频率有限制,批量处理时请合理设置worker数量,避免触发频率限制。建议在正式环境使用前,先在测试环境验证性能表现。
错误处理与重试机制
网络请求难免会遇到各种异常情况,下面我们实现一个健壮的错误处理和重试机制:
import random
from datetime import datetime, timedelta
class RobustZhipuAI(BatchZhipuAI):
def __init__(self, api_key, secret_key, max_workers=5, max_retries=3, backoff_factor=1):
super().__init__(api_key, secret_key, max_workers)
self.max_retries = max_retries
self.backoff_factor = backoff_factor
self.last_request_time = None
self.min_interval = 0.1 请求间隔,秒
def _wait_if_needed(self):
"""控制请求频率"""
if self.last_request_time:
elapsed = time.time() - self.last_request_time
if elapsed < self.min_interval:
time.sleep(self.min_interval - elapsed)
self.last_request_time = time.time()
def chat_completion_with_retry(self, prompt, temperature=0.7, max_tokens=2048):
"""
带重试机制的对话请求
:param prompt: 用户输入的提示文本
:param temperature: 控制生成文本的随机性
:param max_tokens: 生成文本的最大长度
:return: 模型生成的回复
"""
self._wait_if_needed()
for attempt in range(self.max_retries + 1):
try:
return self.chat_completion(prompt, temperature, max_tokens)
except Exception as e:
if attempt == self.max_retries:
raise
指数退避策略
sleep_time = self.backoff_factor (2 attempt) + random.uniform(0, 1)
time.sleep(sleep_time)
记录重试日志
print(f"Retry attempt {attempt + 1}/{self.max_retries} after error: {str(e)}")
def health_check(self):
"""健康检查"""
try:
response = self.chat_completion_with_retry("你好", max_tokens=10)
return len(response) > 0
except Exception as e:
print(f"Health check failed: {str(e)}")
return False
这段代码实现了以下功能:
1. 指数退避重试机制,在网络不稳定时自动重试
2. 请求频率控制,避免短时间内发送过多请求
3. 健康检查功能,可用于监控API可用性
4. 详细的错误日志记录,便于问题排查
流式输出实现
对于长文本生成场景,一次性等待完整响应会带来较差的用户体验。智谱AI支持流式输出,我们可以实现一个实时显示生成内容的接口:
import sseclient
class StreamingZhipuAI(RobustZhipuAI):
def __init__(self, api_key, secret_key, max_workers=5, max_retries=3, backoff_factor=1):
super().__init__(api_key, secret_key, max_workers, max_retries, backoff_factor)
self.streaming_base_url = "https://open.bigmodel.cn/api/paas/v3/model-api/chatglm_pro/sse-invoke"
def stream_chat_completion(self, prompt, temperature=0.7, max_tokens=2048):
"""
流式对话请求
:param prompt: 用户输入的提示文本
:param temperature: 控制生成文本的随机性
:param max_tokens: 生成文本的最大长度
:return: 生成内容的迭代器
"""
payload = {
"prompt": prompt,
"temperature": temperature,
"max_tokens": max_tokens
}
headers = self.get_headers()
headers["Accept"] = "text/event-stream"
response = requests.post(
self.streaming_base_url,
headers=headers,
data=json.dumps(payload),
stream=True
)
if response.status_code == 200:
client = sseclient.SSEClient(response)
for event in client.events():
if event.event == "message":
try:
data = json.loads(event.data)
if data.get("success"):
content = data.get("data", {}).get("content", "")
if content:
yield content
else:
raise Exception(f"API返回错误: {data.get('msg', '未知错误')}")
except json.JSONDecodeError:
continue
else:
raise Exception(f"请求失败,状态码: {response.status_code}, 错误信息: {response.text}")
def interactive_chat(self):
"""交互式对话界面"""
print("智谱AI交互式对话(输入'quit'退出)")
while True:
user_input = input("n你: ")
if user_input.lower() == 'quit':
break
print("nAI: ", end="", flush=True)
try:
for content in self.stream_chat_completion(user_input):
print(content, end="", flush=True)
print() 换行
except Exception as e:
print(f"n错误: {str(e)}")
这段代码实现了以下功能:
1. 使用Server-Sent Events(SSE)协议接收流式输出
2. 提供生成内容的迭代器接口,便于集成到各种应用中
3. 实现了一个简单的交互式命令行对话界面
4. 实时显示生成内容,提升用户体验
性能监控与统计分析
为了更好地了解API使用情况和性能表现,我们可以添加监控和统计功能:
import statistics
from collections import defaultdict
class MonitoredZhipuAI(StreamingZhipuAI):
def __init__(self, api_key, secret_key, max_workers=5, max_retries=3, backoff_factor=1):
super().__init__(api_key, secret_key, max_workers, max_retries, backoff_factor)
self.request_stats = {
"total_requests": 0,
"successful_requests": 0,
"failed_requests": 0,
"total_tokens": 0,
"response_times": [],
"daily_usage": defaultdict(int),
"hourly_usage": defaultdict(int)
}
def chat_completion_with_retry(self, prompt, temperature=0.7, max_tokens=2048):
"""带监控的对话请求"""
start_time = time.time()
self.request_stats["total_requests"] += 1
记录使用情况
now = datetime.now()
date_key = now.strftime("%Y-%m-%d")
hour_key = now.strftime("%Y-%m-%d %H:00")
self.request_stats["daily_usage"][date_key] += 1
self.request_stats["hourly_usage"][hour_key] += 1
try:
result = super().chat_completion_with_retry(prompt, temperature, max_tokens)
记录成功请求
self.request_stats["successful_requests"] += 1
self.request_stats["total_tokens"] += len(result)
记录响应时间
response_time = time.time() - start_time
self.request_stats["response_times"].append(response_time)
return result
except Exception as e:
记录失败请求
self.request_stats["failed_requests"] += 1
raise
def get_stats(self):
"""获取统计信息"""
response_times = self.request_stats["response_times"]
avg_response_time = statistics.mean(response_times) if response_times else 0
max_response_time = max(response_times) if response_times else 0
min_response_time = min(response_times) if response_times else 0
success_rate = 0
if self.request_stats["total_requests"] > 0:
success_rate = self.request_stats["successful_requests"] / self.request_stats["total_requests"] 100
return {
"total_requests": self.request_stats["total_requests"],
"successful_requests": self.request_stats["successful_requests"],
"failed_requests": self.request_stats["failed_requests"],
"success_rate": f"{success_rate:.2f}%",
"total_tokens": self.request_stats["total_tokens"],
"avg_response_time": f"{avg_response_time:.2f}s",
"max_response_time": f"{max_response_time:.2f}s",
"min_response_time": f"{min_response_time:.2f}s",
"daily_usage": dict(self.request_stats["daily_usage"]),
"hourly_usage": dict(self.request_stats["hourly_usage"])
}
def print_stats(self):
"""打印统计信息"""
stats = self.get_stats()
print("n===== 智谱AI API使用统计 =====")
print(f"总请求数: {stats['total_requests']}")
print(f"成功请求数: {stats['successful_requests']}")
print(f"失败请求数: {stats['failed_requests']}")
print(f"成功率: {stats['success_rate']}")
print(f"总生成Token数: {stats['total_tokens']}")
print(f"平均响应时间: {stats['avg_response_time']}")
print(f"最大响应时间: {stats['max_response_time']}")
print(f"最小响应时间: {stats['min_response_time']}")
print("n最近7天使用情况:")
sorted_days = sorted(stats['daily_usage'].items())[-7:]
for date, count in sorted_days:
print(f" {date}: {count}次请求")
print("n最近24小时使用情况:")
sorted_hours = sorted(stats['hourly_usage'].items())[-24:]
for hour, count in sorted_hours:
print(f" {hour}: {count}次请求")
这段代码实现了以下功能:
1. 记录API请求的成功率、响应时间等关键指标
2. 按天和按小时统计API使用情况
3. 计算平均、最大、最小响应时间
4. 提供统计信息的格式化输出
实际应用场景示例
下面我们展示几个智谱AI模型的实际应用场景,帮助你更好地理解如何将其集成到自己的项目中。
智能客服系统
class CustomerServiceBot(MonitoredZhipuAI):
def __init__(self, api_key, secret_key, knowledge_base=None):
super().__init__(api_key, secret_key)
self.knowledge_base = knowledge_base or []
self.conversation_history = {}
def add_knowledge(self, question, answer):
"""添加知识库条目"""
self.knowledge_base.append({"question": question, "answer": answer})
def search_knowledge(self, query, threshold=0.8):
"""从知识库中搜索答案"""
这里简化处理,实际应用中可以使用向量相似度搜索
for item in self.knowledge_base:
if query.lower() in item["question"].lower() or item["question"].lower() in query.lower():
return item["answer"]
return None
def get_response(self, user_id, user_input):
"""获取客服回复"""
获取或创建会话历史
if user_id not in self.conversation_history:
self.conversation_history[user_id] = []
history = self.conversation_history[user_id]
先尝试从知识库中查找答案
knowledge_answer = self.search_knowledge(user_input)
if knowledge_answer:
history.append({"role": "user", "content": user_input})
history.append({"role": "assistant", "content": knowledge_answer})
return knowledge_answer
构建提示词,包含上下文
context = "n".join([f"{item['role']}: {item['content']}" for item in history[-5:]])
prompt = f"""你是一个专业的客服代表,请根据以下对话历史回答用户的问题。如果不知道答案,请诚实地表示你不知道,不要编造信息。
对话历史:
{context}
用户: {user_input}
客服:"""
try:
response = self.chat_completion_with_retry(prompt)
更新对话历史
history.append({"role": "user", "content": user_input})
history.append({"role": "assistant", "content": response})
限制历史长度,防止上下文过长
if len(history) > 20:
history = history[-20:]
self.conversation_history[user_id] = history
return response
except Exception as e:
return f"抱歉,我遇到了一些问题: {str(e)}"
这个智能客服系统实现了以下功能:
1. 基于知识库的快速问答
2. 维护多用户对话历史
3. 上下文感知的对话能力
4. 优雅的错误处理
内容生成工具
class ContentGenerator(MonitoredZhipuAI):
def __init__(self, api_key, secret_key):
super().__init__(api_key, secret_key)
def generate_blog_post(self, topic, word_count=800, style="informative"):
"""生成博客文章"""
prompt = f"""请写一篇关于"{topic}"的博客文章,字数约{word_count}字,风格为{style}。
文章应该包含引人入胜的标题、简短的引言、有条理的正文和总结性结尾。
请确保内容原创、信息丰富且易于理解。"""
return self.chat_completion_with_retry(prompt, max_tokens=word_count1.5)
def generate_product_description(self, product_name, features, target_audience):
"""生成产品描述"""
features_text = "n".join([f"- {feature}" for feature in features])
prompt = f"""请为以下产品撰写一个吸引人的产品描述:
产品名称: {product_name}
产品特点:
{features_text}
目标受众: {target_audience}
请突出产品的核心优势和价值主张,使用有说服力的语言,并包含明确的行动号召。"""
return self.chat_completion_with_retry(prompt)
def summarize_text(self, text, max_length=200):
"""文本摘要"""
prompt = f"""请将以下文本摘要成不超过{max_length}字的简短摘要,保留核心信息和关键点:
原文:
{text}
摘要:"""
return self.chat_completion_with_retry(prompt, max_tokens=max_length1.5)
这个内容生成工具提供了以下功能:
1. 博客文章生成,可指定主题、字数和风格
2. 产品描述生成,突出产品特点和目标受众
3. 文本摘要功能,提取核心信息
代码助手
class CodeAssistant(MonitoredZhipuAI):
def __init__(self, api_key, secret_key):
super().__init__(api_key, secret_key)
def generate_code(self, description, language="python"):
"""根据描述生成代码"""
prompt = f"""请根据以下描述生成{language}代码,确保代码清晰、高效且包含必要的注释:
需求描述:
{description}
生成的代码:"""
return self.chat_completion_with_retry(prompt)
def explain_code(self, code):
"""解释代码功能"""
prompt = f"""请解释以下代码的功能、工作原理和关键点,使用清晰易懂的语言:
代码:
python
{code}
解释:"""
return self.chat_completion_with_retry(prompt)
def optimize_code(self, code):
"""优化代码"""
prompt = f"""请优化以下代码,提高其性能、可读性和健壮性,并解释所做的改进:
原始代码:
python
{code}
优化后的代码和改进说明:"""
return self.chat_completion_with_retry(prompt)
这个代码助手提供了以下功能:
1. 根据自然语言描述生成代码
2. 解释代码功能和工作原理
3. 代码优化建议和实现
通过以上示例,你可以看到智谱AI模型在各种实际应用场景中的灵活性和强大能力。根据你的具体需求,可以进一步定制和扩展这些基础实现。