微信公众号如何集成DeepSeek和豆包AI实现文章自动生成
- Linkreate AI插件 文章
- 2025-08-27 08:40:31
- 2阅读
微信公众号平台与AI模型API连接基础配置
微信公众号平台提供了开发者接口,允许通过API方式与外部系统进行数据交互。要实现AI文章自动生成,首先需要完成微信公众号开发者配置。
登录微信公众平台,进入"开发"-"基本配置"页面,获取AppID和AppSecret。这两个参数是后续API调用的基础凭证。在服务器配置中,填写你的服务器地址(URL)、Token和EncodingAESKey。
php
// 微信公众号验证服务器地址的有效性
$signature = $_GET["signature"];
$timestamp = $_GET["timestamp"];
$nonce = $_GET["nonce"];
$token = "你的Token";
$tmpArr = array($token, $timestamp, $nonce);
sort($tmpArr, SORT_STRING);
$tmpStr = implode( $tmpArr );
$tmpStr = sha1( $tmpStr );
if( $tmpStr == $signature ){
echo $_GET['echostr'];
exit;
}
DeepSeek API接入与认证流程
DeepSeek提供了强大的文本生成能力,适合用于公众号文章创作。首先需要在DeepSeek开放平台注册账号并创建应用,获取API Key。
python
import requests
import json
DeepSeek API认证
def deepseek_auth(api_key):
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}"
}
return headers
生成文章内容
def generate_article_with_deepseek(prompt, api_key):
url = "https://api.deepseek.com/v1/chat/completions"
headers = deepseek_auth(api_key)
data = {
"model": "deepseek-chat",
"messages": [{"role": "user", "content": prompt}],
"temperature": 0.7,
"max_tokens": 2000
}
response = requests.post(url, headers=headers, data=json.dumps(data))
if response.status_code == 200:
return response.json()['choices'][0]['message']['content']
else:
return f"Error: {response.status_code} - {response.text}"
注意:DeepSeek API有调用频率限制,免费用户每分钟最多请求10次,付费用户根据套餐不同限制有所提升。在公众号集成时,建议实现请求队列和缓存机制,避免触发限流。
豆包AI API集成与参数配置
豆包AI是字节跳动推出的AI模型,在中文内容生成方面表现优秀。接入豆包AI需要先在豆包开放平台申请API权限。
javascript
// 豆包AI API调用示例
const fetch = require('node-fetch');
async function generateWithDoubao(prompt, apiKey) {
const url = 'https://ark.cn-beijing.volces.com/api/v3/chat/completions';
const headers = {
'Content-Type': 'application/json',
'Authorization': `Bearer ${apiKey}`
};
const body = {
model: 'doubao-pro',
messages: [
{
role: 'user',
content: prompt
}
],
temperature: 0.7,
max_tokens: 2000
};
try {
const response = await fetch(url, {
method: 'POST',
headers: headers,
body: JSON.stringify(body)
});
const data = await response.json();
if (response.ok) {
return data.choices[0].message.content;
} else {
throw new Error(`Error: ${response.status} - ${data.error.message}`);
}
} catch (error) {
console.error('Doubao API Error:', error);
throw error;
}
}
豆包AI支持多种模型,doubao-pro适合生成高质量长文本,doubao-lite适合快速生成短文本。根据公众号内容需求选择合适的模型,可以在保证质量的同时控制成本。
微信公众号消息接收与AI生成触发机制
微信公众号可以通过被动回复和客服接口两种方式实现AI文章生成。被动回复适合简单的自动回复,客服接口则更适合主动推送AI生成的文章。
php
// 接收微信公众号消息
function receiveMessage() {
$postStr = file_get_contents("php://input");
if (!empty($postStr)) {
$postObj = simplexml_load_string($postStr, 'SimpleXMLElement', LIBXML_NOCDATA);
$fromUsername = $postObj->FromUserName;
$toUsername = $postObj->ToUserName;
$keyword = trim($postObj->Content);
$time = time();
// 判断是否为生成文章的指令
if (strpos($keyword, '生成文章') !== false || strpos($keyword, 'AI写作') !== false) {
// 提取主题
$topic = str_replace(['生成文章', 'AI写作'], '', $keyword);
// 调用AI生成文章
$article = generateArticle($topic);
// 返回生成的文章
$textTpl = "
%s
";
$resultStr = sprintf($textTpl, $fromUsername, $toUsername, $time, $article);
echo $resultStr;
}
}
}
AI文章生成提示词优化策略
提示词(Prompt)是影响AI生成文章质量的关键因素。针对微信公众号内容特点,设计结构化的提示词模板可以显著提高生成效果。
python
def create_prompt(topic, style="科普", length="中等"):
"""
创建针对微信公众号的AI文章生成提示词
参数:
topic: 文章主题
style: 文章风格,可选"科普"、"故事"、"教程"、"评论"等
length: 文章长度,可选"短"、"中等"、"长"
"""
length_map = {
"短": "500字左右",
"中等": "1000-1500字",
"长": "2000字以上"
}
style_map = {
"科普": "通俗易懂,适合大众阅读,包含必要的专业解释",
"故事": "以故事形式展开,生动有趣,有人物和情节",
"教程": "步骤清晰,实用性强,包含具体操作指导",
"评论": "观点鲜明,论据充分,逻辑性强"
}
prompt = f"""
请为一篇微信公众号文章撰写内容,具体要求如下:
主题:{topic}
风格:{style_map.get(style, "通俗易懂")}
长度:{length_map.get(length, "1000-1500字")}
文章结构要求:
1. 引人入胜的标题
2. 简短有力的开头段落
3. 3-5个主体段落,每段有明确的小标题
4. 总结性结尾
写作要求:
- 语言通俗易懂,避免过于专业的术语
- 段落简短,适合手机阅读
- 适当使用表情符号增加亲和力
- 加入1-2个实际案例或数据支持观点
- 保持客观中立的立场
"""
return prompt
AI生成内容的后处理与格式优化
AI生成的原始文本通常需要进一步处理才能适配微信公众号的展示要求。包括格式调整、错别字修正、敏感词过滤等。
javascript
// 处理AI生成的文章内容
function processGeneratedContent(content) {
// 分段处理,确保每段不会太长
let paragraphs = content.split('nn');
let processedContent = '';
paragraphs.forEach(paragraph => {
// 跳过空段落
if (paragraph.trim() === '') return;
// 分割长段落
if (paragraph.length > 200) {
let sentences = paragraph.split('。');
let tempParagraph = '';
sentences.forEach(sentence => {
if (tempParagraph.length + sentence.length > 200) {
processedContent += `
${tempParagraph}。
`;
tempParagraph = sentence;
} else {
tempParagraph += sentence + '。';
}
});
if (tempParagraph.trim() !== '') {
processedContent += `
${tempParagraph}
`;
}
} else {
processedContent += `
${paragraph}
`;
}
});
// 添加微信公众号支持的格式
processedContent = processedContent
.replace(/\(.?)\/g, '$1') // 加粗
.replace(/(.?)/g, '$1') // 斜体
.replace(/`(.?)`/g, '$1
'); // 代码
return processedContent;
}
微信公众号图文消息自动发布实现
将AI生成的文章自动发布到微信公众号,需要使用微信公众号的素材管理和消息发送API。
python
import requests
import json
import time
def upload_media_to_wechat(access_token, media_type, file_path):
"""上传媒体文件到微信服务器"""
url = f"https://api.weixin.qq.com/cgi-bin/media/upload?access_token={access_token}&type={media_type}"
with open(file_path, 'rb') as f:
files = {'media': f}
response = requests.post(url, files=files)
if response.status_code == 200:
return response.json().get('media_id')
else:
print(f"上传媒体文件失败: {response.text}")
return None
def publish_article_to_wechat(access_token, title, content, thumb_media_id, author="", digest="", source_url=""):
"""发布文章到微信公众号"""
url = f"https://api.weixin.qq.com/cgi-bin/material/add_news?access_token={access_token}"
articles = {
"articles": [
{
"title": title,
"author": author,
"digest": digest,
"content": content,
"content_source_url": source_url,
"thumb_media_id": thumb_media_id,
"show_cover_pic": 1,
"need_open_comment": 1,
"only_fans_can_comment": 0
}
]
}
response = requests.post(url, data=json.dumps(articles, ensure_ascii=False).encode('utf-8'))
if response.status_code == 200:
result = response.json()
if 'media_id' in result:
return result['media_id']
else:
print(f"发布文章失败: {result}")
return None
else:
print(f"请求失败: {response.text}")
return None
def send_news_to_wechat(access_token, media_id, user_id=""):
"""发送图文消息到用户"""
url = f"https://api.weixin.qq.com/cgi-bin/message/custom/send?access_token={access_token}"
data = {
"touser": user_id if user_id else "@all",
"msgtype": "mpnews",
"mpnews": {
"media_id": media_id
}
}
response = requests.post(url, data=json.dumps(data, ensure_ascii=False).encode('utf-8'))
if response.status_code == 200:
result = response.json()
if result.get('errcode') == 0:
return True
else:
print(f"发送消息失败: {result}")
return False
else:
print(f"请求失败: {response.text}")
return False
DeepSeek与豆包AI混合使用策略
结合DeepSeek和豆包AI的优势,可以实现更高质量的公众号文章生成。例如,使用DeepSeek生成文章框架和主要内容,再用豆包AI进行润色和优化。
python
def hybrid_article_generation(topic, deepseek_api_key, doubao_api_key):
第一步:使用DeepSeek生成文章框架和主要内容
outline_prompt = f"""
请为"{topic}"这个主题创建一个详细的微信公众号文章大纲,包括:
1. 引人入胜的标题(3-5个选项)
2. 文章结构(包含3-5个主要部分)
3. 每个部分的核心要点(2-3点)
"""
outline = generate_article_with_deepseek(outline_prompt, deepseek_api_key)
第二步:根据大纲生成详细内容
content_prompt = f"""
基于以下大纲,为微信公众号撰写一篇完整的文章:
{outline}
要求:
1. 内容丰富详实,每个部分都有足够的展开
2. 语言通俗易懂,适合大众阅读
3. 适当加入实例和数据支持观点
4. 段落简短,适合手机阅读
"""
draft_content = generate_article_with_deepseek(content_prompt, deepseek_api_key)
第三步:使用豆包AI进行润色和优化
polish_prompt = f"""
请对以下文章进行润色和优化,使其更适合微信公众号发布:
{draft_content}
优化要求:
1. 调整语言风格,使其更加生动有趣
2. 适当添加表情符号,增强亲和力
3. 确保段落长度适中,便于手机阅读
4. 检查并修正语法错误和不通顺的表达
5. 增强文章的吸引力和可读性
"""
final_content = generate_with_doubao(polish_prompt, doubao_api_key)
return final_content
定时任务与自动化工作流实现
实现AI文章的定时自动生成与发布,可以建立完整的内容自动化工作流。以下是基于Python的定时任务实现:
python
import schedule
import time
from datetime import datetime, timedelta
def automated_article_workflow():
"""自动化文章生成与发布工作流"""
1. 获取热门话题(可以从热搜、行业资讯等渠道获取)
hot_topics = get_hot_topics()
2. 选择一个热门话题作为文章主题
topic = select_topic(hot_topics)
3. 生成文章
article_content = hybrid_article_generation(
topic,
DEEPSEEK_API_KEY,
DOUBAO_API_KEY
)
4. 处理文章内容
processed_content = processGeneratedContent(article_content)
5. 生成文章标题(如果AI生成的标题不满意,可以再次优化)
title_prompt = f"为以下内容生成一个吸引人的微信公众号标题(不超过30字):n{article_content[:500]}"
title = generate_article_with_deepseek(title_prompt, DEEPSEEK_API_KEY)
6. 上传封面图片
cover_image = generate_cover_image(topic) 可以使用AI生成封面图
thumb_media_id = upload_media_to_wechat(ACCESS_TOKEN, "image", cover_image)
7. 发布文章
media_id = publish_article_to_wechat(
ACCESS_TOKEN,
title,
processed_content,
thumb_media_id
)
8. 发送给粉丝
if media_id:
send_news_to_wechat(ACCESS_TOKEN, media_id)
print(f"文章《{title}》已自动生成并发布")
def setup_scheduled_tasks():
"""设置定时任务"""
每天上午9点生成并发布一篇文章
schedule.every().day.at("09:00").do(automated_article_workflow)
每周一上午10点生成一周热门话题汇总
schedule.every().monday.at("10:00").do(generate_weekly_hot_topics)
每月最后一天生成月度总结
schedule.every().day.do(check_and_generate_monthly_summary)
while True:
schedule.run_pending()
time.sleep(60) 每分钟检查一次
def check_and_generate_monthly_summary():
"""检查是否需要生成月度总结"""
today = datetime.now()
last_day_of_month = (today.replace(month=today.month % 12 + 1, day=1) - timedelta(days=1)).day
if today.day == last_day_of_month:
generate_monthly_summary()
if __name__ == "__main__":
setup_scheduled_tasks()
错误处理与监控机制
在AI文章自动生成与发布系统中,完善的错误处理和监控机制至关重要,可以确保系统稳定运行并及时发现问题。
python
import logging
from functools import wraps
import traceback
配置日志系统
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
filename='wechat_ai_article.log'
)
logger = logging.getLogger('WeChatAI')
def error_handler(func):
"""错误处理装饰器"""
@wraps(func)
def wrapper(args, kwargs):
try:
return func(args, kwargs)
except Exception as e:
logger.error(f"Error in {func.__name__}: {str(e)}")
logger.error(traceback.format_exc())
发送错误通知
send_error_notification(f"Error in {func.__name__}: {str(e)}")
返回None或默认值
return None
return wrapper
def send_error_notification(error_message):
"""发送错误通知"""
这里可以实现邮件、微信、短信等通知方式
logger.error(f"Notification sent: {error_message}")
示例:通过微信公众号客服接口发送错误通知给管理员
url = f"https://api.weixin.qq.com/cgi-bin/message/custom/send?access_token={ACCESS_TOKEN}"
data = {
"touser": ADMIN_OPENID, 管理员的openid
"msgtype": "text",
"text": {
"content": f"系统错误通知:n{error_message}"
}
}
try:
response = requests.post(url, data=json.dumps(data, ensure_ascii=False).encode('utf-8'))
if response.status_code != 200 or response.json().get('errcode') != 0:
logger.error(f"Failed to send error notification: {response.text}")
except Exception as e:
logger.error(f"Exception when sending error notification: {str(e)}")
@error_handler
def safe_generate_article(topic, api_key):
"""带错误处理的文章生成函数"""
return generate_article_with_deepseek(topic, api_key)
@error_handler
def safe_publish_article(title, content, thumb_media_id):
"""带错误处理的文章发布函数"""
return publish_article_to_wechat(ACCESS_TOKEN, title, content, thumb_media_id)
def monitor_system_health():
"""监控系统健康状态"""
检查API连接状态
deepseek_status = check_api_status("https://api.deepseek.com/v1/models", DEEPSEEK_API_KEY)
doubao_status = check_api_status("https://ark.cn-beijing.volces.com/api/v3/models", DOUBAO_API_KEY)
wechat_status = check_wechat_api_status()
status_report = {
"timestamp": datetime.now().isoformat(),
"deepseek_api": "OK" if deepseek_status else "FAIL",
"doubao_api": "OK" if doubao_status else "FAIL",
"wechat_api": "OK" if wechat_status else "FAIL"
}
logger.info(f"System status: {status_report}")
如果有API不可用,发送通知
if not all([deepseek_status, doubao_status, wechat_status]):
send_error_notification(f"System health check failed: {status_report}")
return status_report
def check_api_status(url, api_key):
"""检查API状态"""
try:
headers = {"Authorization": f"Bearer {api_key}"}
response = requests.get(url, headers=headers, timeout=10)
return response.status_code == 200
except:
return False
def check_wechat_api_status():
"""检查微信API状态"""
try:
url = f"https://api.weixin.qq.com/cgi-bin/getcallbackip?access_token={ACCESS_TOKEN}"
response = requests.get(url, timeout=10)
return response.status_code == 200 and response.json().get('errcode') == 0
except:
return False
设置定时健康检查
schedule.every().hour.do(monitor_system_health)
成本控制与优化策略
AI文章生成系统需要考虑API调用成本,特别是当公众号文章生成频率较高时。以下是一些成本控制与优化策略:
python
import json
import os
from datetime import datetime, timedelta
from collections import defaultdict
class CostController:
def __init__(self, monthly_budget=1000):
self.monthly_budget = monthly_budget
self.usage_file = "api_usage.json"
self.usage_data = self.load_usage_data()
self.daily_limits = {
"deepseek": 100, DeepSeek每日调用次数限制
"doubao": 100 豆包AI每日调用次数限制
}
self.monthly_limits = {
"deepseek": 3000, DeepSeek每月调用次数限制
"doubao": 3000 豆包AI每月调用次数限制
}
def load_usage_data(self):
"""加载API使用数据"""
if os.path.exists(self.usage_file):
with open(self.usage_file, 'r', encoding='utf-8') as f:
return json.load(f)
else:
return {
"daily": defaultdict(int),
"monthly": defaultdict(int),
"costs": defaultdict(float),
"last_reset": datetime.now().isoformat()
}
def save_usage_data(self):
"""保存API使用数据"""
with open(self.usage_file, 'w', encoding='utf-8') as f:
json.dump(self.usage_data, f, ensure_ascii=False, indent=2)
def check_daily_limit(self, api_name):
"""检查是否超过每日限制"""
today = datetime.now().strftime("%Y-%m-%d")
if today != self.usage_data.get("last_reset_date"):
新的一天,重置每日计数
self.usage_data["daily"] = defaultdict(int)
self.usage_data["last_reset_date"] = today
self.save_usage_data()
return self.usage_data["daily"][api_name] < self.daily_limits.get(api_name, float('inf'))
def check_monthly_limit(self, api_name):
"""检查是否超过每月限制"""
current_month = datetime.now().strftime("%Y-%m")
if current_month != self.usage_data.get("current_month"):
新的月份,重置每月计数
self.usage_data["monthly"] = defaultdict(int)
self.usage_data["costs"] = defaultdict(float)
self.usage_data["current_month"] = current_month
self.save_usage_data()
return self.usage_data["monthly"][api_name] < self.monthly_limits.get(api_name, float('inf'))
def check_monthly_budget(self):
"""检查是否超过月度预算"""
current_month = datetime.now().strftime("%Y-%m")
if current_month != self.usage_data.get("current_month"):
return True
total_cost = sum(self.usage_data["costs"].values())
return total_cost < self.monthly_budget
def record_usage(self, api_name, tokens_used, cost):
"""记录API使用情况"""
today = datetime.now().strftime("%Y-%m-%d")
current_month = datetime.now().strftime("%Y-%m")
更新每日使用量
self.usage_data["daily"][api_name] += 1
更新每月使用量
self.usage_data["monthly"][api_name] += 1
更新成本
self.usage_data["costs"][api_name] += cost
更新时间戳
self.usage_data["last_reset_date"] = today
self.usage_data["current_month"] = current_month
self.usage_data["last_updated"] = datetime.now().isoformat()
self.save_usage_data()
def get_usage_report(self):
"""获取使用报告"""
current_month = datetime.now().strftime("%Y-%m")
total_cost = sum(self.usage_data["costs"].values())
budget_remaining = self.monthly_budget - total_cost
report = {
"current_month": current_month,
"monthly_budget": self.monthly_budget,
"total_cost": total_cost,
"budget_remaining": budget_remaining,
"budget_usage_percent": (total_cost / self.monthly_budget) 100,
"api_usage": {
"daily": dict(self.usage_data["daily"]),
"monthly": dict(self.usage_data["monthly"]),
"costs": dict(self.usage_data["costs"])
}
}
return report
使用示例
cost_controller = CostController(monthly_budget=1000)
def cost_aware_api_call(api_name, prompt, api_key, cost_per_call=0.01):
"""具有成本意识的API调用"""
检查限制
if not cost_controller.check_daily_limit(api_name):
logger.warning(f"已达到{api_name}的每日调用限制")
return None
if not cost_controller.check_monthly_limit(api_name):
logger.warning(f"已达到{api_name}的每月调用限制")
return None
if not cost_controller.check_monthly_budget():
logger.warning("已达到月度预算限制")
return None
执行API调用
if api_name == "deepseek":
result = generate_article_with_deepseek(prompt, api_key)
elif api_name == "doubao":
result = generate_with_doubao(prompt, api_key)
else:
logger.error(f"不支持的API: {api_name}")
return None
估算token使用量(简化示例,实际应根据API返回计算)
tokens_used = len(prompt) // 4 + len(result) // 4
记录使用情况
cost_controller.record_usage(api_name, tokens_used, cost_per_call)
return result
内容质量评估与反馈机制
为确保AI生成的文章质量达到要求,需要建立内容质量评估与反馈机制,不断优化生成效果。
python
import re
import jieba
import jieba.analyse
from collections import Counter
class ContentQualityAssessment:
def __init__(self):
加载自定义词典,包含行业术语等
jieba.load_userdict("custom_dict.txt")
质量评估标准
self.quality_standards = {
"min_length": 800, 最小字数
"max_length": 3000, 最大字数
"min_paragraphs": 5, 最小段落数
"max_paragraph_length": 200, 最大段落长度
"min_keywords": 3, 最小关键词数
"readability_score": 60 可读性分数阈值
}
加载停用词表
with open("stopwords.txt", 'r', encoding='utf-8') as f:
self.stopwords = set([line.strip() for line in f])
def assess_content_quality(self, content, topic):
"""评估内容质量"""
assessment_results = {
"length_check": self._check_length(content),
"paragraph_check": self._check_paragraphs(content),
"keyword_check": self._check_keywords(content, topic),
"readability_check": self._check_readability(content),
"originality_check": self._check_originality(content),
"overall_score": 0
}
计算总体得分
weights = {
"length_check": 0.2,
"paragraph_check": 0.2,
"keyword_check": 0.2,
"readability_check": 0.2,
"originality_check": 0.2
}
overall_score = sum(
assessment_results[check] weights[check]
for check in assessment_results
if check != "overall_score"
)
assessment_results["overall_score"] = overall_score
return assessment_results
def _check_length(self, content):
"""检查文章长度"""
length = len(content)
if length self.quality_standards["max_length"]:
return 0.7 太长,得分中等
else:
return 1.0 长度适中,得分高
def _check_paragraphs(self, content):
"""检查段落结构"""
paragraphs = re.split(r'nsn', content)
paragraph_count = len([p for p in paragraphs if p.strip()])
检查段落数量
if paragraph_count self.quality_standards["max_paragraph_length"]:
long_paragraphs += 1
if long_paragraphs > paragraph_count 0.3: 如果超过30%的段落过长
paragraph_score = 0.6
return paragraph_score
def _check_keywords(self, content, topic):
"""检查关键词覆盖"""
提取主题关键词
topic_keywords = jieba.analyse.extract_tags(topic, topK=5)
提取内容关键词
content_keywords = jieba.analyse.extract_tags(content, topK=10)
计算关键词覆盖率
covered_keywords = 0
for keyword in topic_keywords:
if keyword in content_keywords or keyword in content:
covered_keywords += 1
coverage = covered_keywords / len(topic_keywords) if topic_keywords else 0
根据覆盖率评分
if coverage >= 0.8:
return 1.0
elif coverage >= 0.6:
return 0.8
elif coverage >= 0.4:
return 0.6
else:
return 0.3
def _check_readability(self, content):
"""检查可读性"""
简单的可读性评估,基于句子长度和词汇复杂度
sentences = re.split(r'[。!?]', content)
sentences = [s.strip() for s in sentences if s.strip()]
if not sentences:
return 0
计算平均句子长度
avg_sentence_length = sum(len(s) for s in sentences) / len(sentences)
句子长度评分(中等长度最易读)
if 20 <= avg_sentence_length <= 50:
length_score = 1.0
elif 10 <= avg_sentence_length < 20 or 50 < avg_sentence_length 4: 简单假设超过4个字的词是生僻词
uncommon_words += 1
uncommon_ratio = uncommon_words / len(words)
词汇复杂度评分(生僻词比例越低越好)
if uncommon_ratio < 0.05:
complexity_score = 1.0
elif uncommon_ratio < 0.1:
complexity_score = 0.8
elif uncommon_ratio 1)
计算原创性得分
cliche_penalty = min(cliche_count 0.1, 0.5) 最多扣0.5分
repetition_penalty = min(repeated_phrases 0.05, 0.3) 最多扣0.3分
originality_score = 1.0 - cliche_penalty - repetition_penalty
return max(originality_score, 0) 确保得分不为负
def generate_feedback(self, assessment_results):
"""生成质量反馈"""
feedback = []
if assessment_results["length_check"] < 0.7:
if assessment_results["