智谱AI开放平台API调用实战技巧与性能优化方法

获取API凭证与基础环境配置

要在你的项目中集成智谱AI模型能力,首先需要获取有效的API凭证。访问智谱AI开放平台(https://open.bigmodel.cn/)注册账号并创建应用,系统会为你分配API Key和Secret Key,这是后续所有调用的基础凭证。


import requests
import json
import time
import hashlib
import hmac
import base64

class ZhipuAI:
    def __init__(self, api_key, secret_key):
        self.api_key = api_key
        self.secret_key = secret_key
        self.base_url = "https://open.bigmodel.cn/api/paas/v3/model-api/chatglm_pro/invoke"
    
    def generate_signature(self, timestamp):
        """生成签名"""
        string_to_sign = f"{self.api_key}.{timestamp}"
        hmac_code = hmac.new(
            self.secret_key.encode('utf-8'),
            string_to_sign.encode('utf-8'),
            digestmod=hashlib.sha256
        ).digest()
        return base64.b64encode(hmac_code).decode('utf-8')
    
    def get_headers(self):
        """构建请求头"""
        timestamp = str(int(time.time()))
        signature = self.generate_signature(timestamp)
        return {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}.{timestamp}.{signature}"
        }

这段代码实现了智谱AI API的基础认证机制。签名生成过程使用了HMAC-SHA256算法,确保API调用的安全性。每次请求都需要生成新的时间戳和签名,防止重放攻击。

注意:请妥善保管你的API Key和Secret Key,不要在客户端代码中直接暴露这些敏感信息,建议通过后端服务代理API请求。

文本生成功能实现

智谱AI模型最常用的功能是文本生成,下面我们实现一个完整的对话接口:


def chat_completion(self, prompt, temperature=0.7, max_tokens=2048):
    """
    发起对话请求
    :param prompt: 用户输入的提示文本
    :param temperature: 控制生成文本的随机性,0-1之间
    :param max_tokens: 生成文本的最大长度
    :return: 模型生成的回复
    """
    payload = {
        "prompt": prompt,
        "temperature": temperature,
        "max_tokens": max_tokens
    }
    
    headers = self.get_headers()
    response = requests.post(
        self.base_url,
        headers=headers,
        data=json.dumps(payload)
    )
    
    if response.status_code == 200:
        result = response.json()
        if result.get("success"):
            return result.get("data", {}).get("choices", [{}])[0].get("content", "")
        else:
            raise Exception(f"API返回错误: {result.get('msg', '未知错误')}")
    else:
        raise Exception(f"请求失败,状态码: {response.status_code}, 错误信息: {response.text}")

这个方法实现了与智谱AI模型的对话功能。temperature参数控制生成文本的随机性,值越高输出越随机,值越低输出越确定。max_tokens参数限制了生成文本的长度,防止生成过长的响应。

批量处理与性能优化

当你需要处理大量请求时,简单的串行调用效率低下。下面我们实现一个批量处理机制,并加入请求缓存和重试逻辑:


import threading
import queue
from functools import lru_cache

class BatchZhipuAI(ZhipuAI):
    def __init__(self, api_key, secret_key, max_workers=5):
        super().__init__(api_key, secret_key)
        self.task_queue = queue.Queue()
        self.result_dict = {}
        self.max_workers = max_workers
        self.workers = []
        self._init_workers()
    
    def _init_workers(self):
        """初始化工作线程"""
        for i in range(self.max_workers):
            worker = threading.Thread(target=self._worker, daemon=True)
            worker.start()
            self.workers.append(worker)
    
    def _worker(self):
        """工作线程处理函数"""
        while True:
            task_id, prompt, kwargs = self.task_queue.get()
            try:
                result = self.chat_completion(prompt, kwargs)
                self.result_dict[task_id] = {"status": "success", "result": result}
            except Exception as e:
                self.result_dict[task_id] = {"status": "error", "error": str(e)}
            finally:
                self.task_queue.task_done()
    
    @lru_cache(maxsize=1000)
    def cached_chat_completion(self, prompt, temperature=0.7, max_tokens=2048):
        """带缓存的对话请求"""
        return self.chat_completion(prompt, temperature, max_tokens)
    
    def batch_request(self, prompts_list, use_cache=True, kwargs):
        """
        批量处理对话请求
        :param prompts_list: 提示文本列表
        :param use_cache: 是否使用缓存
        :param kwargs: 其他参数
        :return: 结果列表
        """
        task_ids = []
        for i, prompt in enumerate(prompts_list):
            task_id = f"task_{int(time.time()1000)}_{i}"
            task_ids.append(task_id)
            
            if use_cache:
                try:
                    result = self.cached_chat_completion(prompt, kwargs)
                    self.result_dict[task_id] = {"status": "success", "result": result}
                    continue
                except Exception:
                    pass
            
            self.task_queue.put((task_id, prompt, kwargs))
        
        self.task_queue.join()
        
        results = []
        for task_id in task_ids:
            task_result = self.result_dict.get(task_id, {"status": "pending"})
            if task_result["status"] == "success":
                results.append(task_result["result"])
            else:
                results.append(f"Error: {task_result.get('error', 'Unknown error')}")
        
        return results

这段代码实现了以下优化:
1. 使用线程池处理并发请求,提高吞吐量
2. 添加了LRU缓存机制,避免重复请求相同内容
3. 实现了任务队列和结果收集机制,便于批量处理
4. 每个任务都有唯一ID,便于追踪和调试

警告:智谱AI开放平台对API调用频率有限制,批量处理时请合理设置worker数量,避免触发频率限制。建议在正式环境使用前,先在测试环境验证性能表现。

错误处理与重试机制

网络请求难免会遇到各种异常情况,下面我们实现一个健壮的错误处理和重试机制:


import random
from datetime import datetime, timedelta

class RobustZhipuAI(BatchZhipuAI):
    def __init__(self, api_key, secret_key, max_workers=5, max_retries=3, backoff_factor=1):
        super().__init__(api_key, secret_key, max_workers)
        self.max_retries = max_retries
        self.backoff_factor = backoff_factor
        self.last_request_time = None
        self.min_interval = 0.1   请求间隔,秒
    
    def _wait_if_needed(self):
        """控制请求频率"""
        if self.last_request_time:
            elapsed = time.time() - self.last_request_time
            if elapsed < self.min_interval:
                time.sleep(self.min_interval - elapsed)
        self.last_request_time = time.time()
    
    def chat_completion_with_retry(self, prompt, temperature=0.7, max_tokens=2048):
        """
        带重试机制的对话请求
        :param prompt: 用户输入的提示文本
        :param temperature: 控制生成文本的随机性
        :param max_tokens: 生成文本的最大长度
        :return: 模型生成的回复
        """
        self._wait_if_needed()
        
        for attempt in range(self.max_retries + 1):
            try:
                return self.chat_completion(prompt, temperature, max_tokens)
            except Exception as e:
                if attempt == self.max_retries:
                    raise
                
                 指数退避策略
                sleep_time = self.backoff_factor  (2  attempt) + random.uniform(0, 1)
                time.sleep(sleep_time)
                
                 记录重试日志
                print(f"Retry attempt {attempt + 1}/{self.max_retries} after error: {str(e)}")
    
    def health_check(self):
        """健康检查"""
        try:
            response = self.chat_completion_with_retry("你好", max_tokens=10)
            return len(response) > 0
        except Exception as e:
            print(f"Health check failed: {str(e)}")
            return False

这段代码实现了以下功能:
1. 指数退避重试机制,在网络不稳定时自动重试
2. 请求频率控制,避免短时间内发送过多请求
3. 健康检查功能,可用于监控API可用性
4. 详细的错误日志记录,便于问题排查

流式输出实现

对于长文本生成场景,一次性等待完整响应会带来较差的用户体验。智谱AI支持流式输出,我们可以实现一个实时显示生成内容的接口:


import sseclient

class StreamingZhipuAI(RobustZhipuAI):
    def __init__(self, api_key, secret_key, max_workers=5, max_retries=3, backoff_factor=1):
        super().__init__(api_key, secret_key, max_workers, max_retries, backoff_factor)
        self.streaming_base_url = "https://open.bigmodel.cn/api/paas/v3/model-api/chatglm_pro/sse-invoke"
    
    def stream_chat_completion(self, prompt, temperature=0.7, max_tokens=2048):
        """
        流式对话请求
        :param prompt: 用户输入的提示文本
        :param temperature: 控制生成文本的随机性
        :param max_tokens: 生成文本的最大长度
        :return: 生成内容的迭代器
        """
        payload = {
            "prompt": prompt,
            "temperature": temperature,
            "max_tokens": max_tokens
        }
        
        headers = self.get_headers()
        headers["Accept"] = "text/event-stream"
        
        response = requests.post(
            self.streaming_base_url,
            headers=headers,
            data=json.dumps(payload),
            stream=True
        )
        
        if response.status_code == 200:
            client = sseclient.SSEClient(response)
            for event in client.events():
                if event.event == "message":
                    try:
                        data = json.loads(event.data)
                        if data.get("success"):
                            content = data.get("data", {}).get("content", "")
                            if content:
                                yield content
                        else:
                            raise Exception(f"API返回错误: {data.get('msg', '未知错误')}")
                    except json.JSONDecodeError:
                        continue
        else:
            raise Exception(f"请求失败,状态码: {response.status_code}, 错误信息: {response.text}")
    
    def interactive_chat(self):
        """交互式对话界面"""
        print("智谱AI交互式对话(输入'quit'退出)")
        while True:
            user_input = input("n你: ")
            if user_input.lower() == 'quit':
                break
            
            print("nAI: ", end="", flush=True)
            try:
                for content in self.stream_chat_completion(user_input):
                    print(content, end="", flush=True)
                print()   换行
            except Exception as e:
                print(f"n错误: {str(e)}")

这段代码实现了以下功能:
1. 使用Server-Sent Events(SSE)协议接收流式输出
2. 提供生成内容的迭代器接口,便于集成到各种应用中
3. 实现了一个简单的交互式命令行对话界面
4. 实时显示生成内容,提升用户体验

性能监控与统计分析

为了更好地了解API使用情况和性能表现,我们可以添加监控和统计功能:


import statistics
from collections import defaultdict

class MonitoredZhipuAI(StreamingZhipuAI):
    def __init__(self, api_key, secret_key, max_workers=5, max_retries=3, backoff_factor=1):
        super().__init__(api_key, secret_key, max_workers, max_retries, backoff_factor)
        self.request_stats = {
            "total_requests": 0,
            "successful_requests": 0,
            "failed_requests": 0,
            "total_tokens": 0,
            "response_times": [],
            "daily_usage": defaultdict(int),
            "hourly_usage": defaultdict(int)
        }
    
    def chat_completion_with_retry(self, prompt, temperature=0.7, max_tokens=2048):
        """带监控的对话请求"""
        start_time = time.time()
        self.request_stats["total_requests"] += 1
        
         记录使用情况
        now = datetime.now()
        date_key = now.strftime("%Y-%m-%d")
        hour_key = now.strftime("%Y-%m-%d %H:00")
        self.request_stats["daily_usage"][date_key] += 1
        self.request_stats["hourly_usage"][hour_key] += 1
        
        try:
            result = super().chat_completion_with_retry(prompt, temperature, max_tokens)
            
             记录成功请求
            self.request_stats["successful_requests"] += 1
            self.request_stats["total_tokens"] += len(result)
            
             记录响应时间
            response_time = time.time() - start_time
            self.request_stats["response_times"].append(response_time)
            
            return result
        except Exception as e:
             记录失败请求
            self.request_stats["failed_requests"] += 1
            raise
    
    def get_stats(self):
        """获取统计信息"""
        response_times = self.request_stats["response_times"]
        avg_response_time = statistics.mean(response_times) if response_times else 0
        max_response_time = max(response_times) if response_times else 0
        min_response_time = min(response_times) if response_times else 0
        
        success_rate = 0
        if self.request_stats["total_requests"] > 0:
            success_rate = self.request_stats["successful_requests"] / self.request_stats["total_requests"]  100
        
        return {
            "total_requests": self.request_stats["total_requests"],
            "successful_requests": self.request_stats["successful_requests"],
            "failed_requests": self.request_stats["failed_requests"],
            "success_rate": f"{success_rate:.2f}%",
            "total_tokens": self.request_stats["total_tokens"],
            "avg_response_time": f"{avg_response_time:.2f}s",
            "max_response_time": f"{max_response_time:.2f}s",
            "min_response_time": f"{min_response_time:.2f}s",
            "daily_usage": dict(self.request_stats["daily_usage"]),
            "hourly_usage": dict(self.request_stats["hourly_usage"])
        }
    
    def print_stats(self):
        """打印统计信息"""
        stats = self.get_stats()
        print("n===== 智谱AI API使用统计 =====")
        print(f"总请求数: {stats['total_requests']}")
        print(f"成功请求数: {stats['successful_requests']}")
        print(f"失败请求数: {stats['failed_requests']}")
        print(f"成功率: {stats['success_rate']}")
        print(f"总生成Token数: {stats['total_tokens']}")
        print(f"平均响应时间: {stats['avg_response_time']}")
        print(f"最大响应时间: {stats['max_response_time']}")
        print(f"最小响应时间: {stats['min_response_time']}")
        
        print("n最近7天使用情况:")
        sorted_days = sorted(stats['daily_usage'].items())[-7:]
        for date, count in sorted_days:
            print(f"  {date}: {count}次请求")
        
        print("n最近24小时使用情况:")
        sorted_hours = sorted(stats['hourly_usage'].items())[-24:]
        for hour, count in sorted_hours:
            print(f"  {hour}: {count}次请求")

这段代码实现了以下功能:
1. 记录API请求的成功率、响应时间等关键指标
2. 按天和按小时统计API使用情况
3. 计算平均、最大、最小响应时间
4. 提供统计信息的格式化输出

实际应用场景示例

下面我们展示几个智谱AI模型的实际应用场景,帮助你更好地理解如何将其集成到自己的项目中。

智能客服系统


class CustomerServiceBot(MonitoredZhipuAI):
    def __init__(self, api_key, secret_key, knowledge_base=None):
        super().__init__(api_key, secret_key)
        self.knowledge_base = knowledge_base or []
        self.conversation_history = {}
    
    def add_knowledge(self, question, answer):
        """添加知识库条目"""
        self.knowledge_base.append({"question": question, "answer": answer})
    
    def search_knowledge(self, query, threshold=0.8):
        """从知识库中搜索答案"""
         这里简化处理,实际应用中可以使用向量相似度搜索
        for item in self.knowledge_base:
            if query.lower() in item["question"].lower() or item["question"].lower() in query.lower():
                return item["answer"]
        return None
    
    def get_response(self, user_id, user_input):
        """获取客服回复"""
         获取或创建会话历史
        if user_id not in self.conversation_history:
            self.conversation_history[user_id] = []
        
        history = self.conversation_history[user_id]
        
         先尝试从知识库中查找答案
        knowledge_answer = self.search_knowledge(user_input)
        if knowledge_answer:
            history.append({"role": "user", "content": user_input})
            history.append({"role": "assistant", "content": knowledge_answer})
            return knowledge_answer
        
         构建提示词,包含上下文
        context = "n".join([f"{item['role']}: {item['content']}" for item in history[-5:]])
        prompt = f"""你是一个专业的客服代表,请根据以下对话历史回答用户的问题。如果不知道答案,请诚实地表示你不知道,不要编造信息。

对话历史:
{context}

用户: {user_input}
客服:"""
        
        try:
            response = self.chat_completion_with_retry(prompt)
            
             更新对话历史
            history.append({"role": "user", "content": user_input})
            history.append({"role": "assistant", "content": response})
            
             限制历史长度,防止上下文过长
            if len(history) > 20:
                history = history[-20:]
                self.conversation_history[user_id] = history
            
            return response
        except Exception as e:
            return f"抱歉,我遇到了一些问题: {str(e)}"

这个智能客服系统实现了以下功能:
1. 基于知识库的快速问答
2. 维护多用户对话历史
3. 上下文感知的对话能力
4. 优雅的错误处理

内容生成工具


class ContentGenerator(MonitoredZhipuAI):
    def __init__(self, api_key, secret_key):
        super().__init__(api_key, secret_key)
    
    def generate_blog_post(self, topic, word_count=800, style="informative"):
        """生成博客文章"""
        prompt = f"""请写一篇关于"{topic}"的博客文章,字数约{word_count}字,风格为{style}。
文章应该包含引人入胜的标题、简短的引言、有条理的正文和总结性结尾。
请确保内容原创、信息丰富且易于理解。"""
        
        return self.chat_completion_with_retry(prompt, max_tokens=word_count1.5)
    
    def generate_product_description(self, product_name, features, target_audience):
        """生成产品描述"""
        features_text = "n".join([f"- {feature}" for feature in features])
        prompt = f"""请为以下产品撰写一个吸引人的产品描述:

产品名称: {product_name}
产品特点:
{features_text}
目标受众: {target_audience}

请突出产品的核心优势和价值主张,使用有说服力的语言,并包含明确的行动号召。"""
        
        return self.chat_completion_with_retry(prompt)
    
    def summarize_text(self, text, max_length=200):
        """文本摘要"""
        prompt = f"""请将以下文本摘要成不超过{max_length}字的简短摘要,保留核心信息和关键点:

原文:
{text}

摘要:"""
        
        return self.chat_completion_with_retry(prompt, max_tokens=max_length1.5)

这个内容生成工具提供了以下功能:
1. 博客文章生成,可指定主题、字数和风格
2. 产品描述生成,突出产品特点和目标受众
3. 文本摘要功能,提取核心信息

代码助手


class CodeAssistant(MonitoredZhipuAI):
    def __init__(self, api_key, secret_key):
        super().__init__(api_key, secret_key)
    
    def generate_code(self, description, language="python"):
        """根据描述生成代码"""
        prompt = f"""请根据以下描述生成{language}代码,确保代码清晰、高效且包含必要的注释:

需求描述:
{description}

生成的代码:"""
        
        return self.chat_completion_with_retry(prompt)
    
    def explain_code(self, code):
        """解释代码功能"""
        prompt = f"""请解释以下代码的功能、工作原理和关键点,使用清晰易懂的语言:

代码:
python
{code}


解释:"""
        
        return self.chat_completion_with_retry(prompt)
    
    def optimize_code(self, code):
        """优化代码"""
        prompt = f"""请优化以下代码,提高其性能、可读性和健壮性,并解释所做的改进:

原始代码:
python
{code}


优化后的代码和改进说明:"""
        
        return self.chat_completion_with_retry(prompt)

这个代码助手提供了以下功能:
1. 根据自然语言描述生成代码
2. 解释代码功能和工作原理
3. 代码优化建议和实现

通过以上示例,你可以看到智谱AI模型在各种实际应用场景中的灵活性和强大能力。根据你的具体需求,可以进一步定制和扩展这些基础实现。