如何使用Python脚本高效导入AI文章数据到Excel表格
- Linkreate AI插件 文章
- 2025-08-24 17:55:17
- 11阅读
当你面对大量AI生成的文章数据时,手动处理不仅效率低下,还容易出错。Python脚本提供了自动化解决方案,让你能够快速、准确地将这些数据导入Excel表格中。下面我们将详细介绍实现这一过程的具体方法和代码示例。
准备工作与环境配置
在开始之前,确保你的开发环境已经配置妥当。你需要安装Python以及几个关键库,这些库将帮助你处理数据并与Excel文件交互。
安装必要的Python库
pip install pandas
pip install openpyxl
pip install xlwt
pip install requests
这些库中,pandas用于数据处理,openpyxl和xlwt用于Excel文件操作,requests则用于获取API数据。安装完成后,你就可以开始编写导入脚本了。
注意:根据你的Python版本,可能需要使用pip3而不是pip。如果遇到权限问题,可以尝试在命令前加上sudo(Linux/macOS)或以管理员身份运行命令提示符(Windows)。
获取AI生成的文章数据
AI文章数据通常来自两个主要渠道:API接口或本地文件。我们将分别讨论这两种情况的处理方法。
通过API获取AI文章数据
许多AI内容生成平台提供API接口,你可以通过这些接口直接获取生成的文章数据。以下是使用OpenAI API获取数据的示例代码:
import requests
import json
设置API密钥和请求参数
api_key = "your_openai_api_key"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
准备请求数据
data = {
"model": "gpt-4-turbo",
"messages": [
{"role": "system", "content": "你是一个专业的文章生成助手"},
{"role": "user", "content": "生成一篇关于人工智能发展趋势的文章"}
],
"max_tokens": 2000
}
发送请求
response = requests.post("https://api.openai.com/v1/chat/completions",
headers=headers,
data=json.dumps(data))
解析响应
if response.status_code == 200:
result = response.json()
article_content = result['choices'][0]['message']['content']
print("成功获取AI文章数据")
else:
print(f"请求失败,状态码:{response.status_code}")
print(response.text)
这段代码向OpenAI API发送请求,获取一篇关于人工智能发展趋势的文章。你可以根据需要修改请求参数,比如调整文章主题、长度等。
从本地文件读取AI文章数据
如果你已经将AI生成的文章保存在本地文件中(如JSON、TXT或CSV格式),可以使用以下代码读取:
import json
import pandas as pd
从JSON文件读取数据
def read_from_json(file_path):
with open(file_path, 'r', encoding='utf-8') as file:
data = json.load(file)
return data
从TXT文件读取数据
def read_from_txt(file_path):
with open(file_path, 'r', encoding='utf-8') as file:
content = file.read()
return {"content": content}
从CSV文件读取数据
def read_from_csv(file_path):
df = pd.read_csv(file_path, encoding='utf-8')
return df.to_dict('records')
使用示例
json_data = read_from_json('ai_articles.json')
txt_data = read_from_txt('ai_article.txt')
csv_data = read_from_csv('ai_articles.csv')
这些函数分别处理不同格式的文件,将AI文章数据读取到Python变量中,为后续处理做准备。
数据清洗与预处理
AI生成的文章数据往往包含格式问题、特殊字符或不需要的内容,需要进行清洗和预处理。以下是常见的数据清洗操作:
import re
import pandas as pd
def clean_article_data(article_data):
如果是字典形式的数据
if isinstance(article_data, dict):
提取文章内容
content = article_data.get('content', '')
移除特殊字符和多余空格
content = re.sub(r's+', ' ', content) 替换多个空格为单个空格
content = re.sub(r'[^wsu4e00-u9fff.,!?;:()[]{}"'-]', '', content) 保留中英文、标点和基本符号
移除可能的AI生成标记
content = re.sub(r'AI生成|人工智能生成|由AI创作', '', content)
更新清洗后的内容
article_data['content'] = content.strip()
return article_data
如果是列表形式的数据
elif isinstance(article_data, list):
cleaned_data = []
for item in article_data:
cleaned_item = clean_article_data(item)
cleaned_data.append(cleaned_item)
return cleaned_data
return article_data
使用示例
cleaned_data = clean_article_data(json_data)
这段代码定义了一个数据清洗函数,可以处理字典或列表形式的数据,移除特殊字符、多余空格和可能的AI生成标记。你可以根据实际需求调整正则表达式模式,以适应不同的数据清洗需求。
警告:数据清洗是一个需要谨慎处理的过程。过度清洗可能导致重要信息丢失,而清洗不足则可能影响后续分析。建议在处理重要数据前,先在小样本上测试清洗效果。
将数据转换为DataFrame
将清洗后的数据转换为Pandas DataFrame是导入Excel的关键步骤。DataFrame提供了丰富的数据处理功能,使你能够轻松操作和分析数据。
import pandas as pd
from datetime import datetime
def convert_to_dataframe(article_data):
如果是单个字典,转换为单行DataFrame
if isinstance(article_data, dict):
添加时间戳
article_data['import_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
df = pd.DataFrame([article_data])
如果是字典列表,直接转换为DataFrame
elif isinstance(article_data, list):
为每篇文章添加时间戳
for item in article_data:
item['import_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
df = pd.DataFrame(article_data)
else:
raise ValueError("不支持的数据格式,请提供字典或字典列表")
return df
使用示例
df = convert_to_dataframe(cleaned_data)
print(f"成功创建DataFrame,包含{len(df)}行数据")
print(df.head())
这段代码将清洗后的文章数据转换为DataFrame,并自动添加导入时间戳。时间戳对于追踪数据导入历史非常有用,特别是在处理大量数据时。
导出到Excel表格
最后一步是将DataFrame导出到Excel文件。Pandas提供了简单易用的方法来实现这一功能,同时支持多种自定义选项。
def export_to_excel(df, file_path, sheet_name='AI文章数据'):
try:
导出到Excel文件
df.to_excel(file_path,
sheet_name=sheet_name,
index=False, 不包含行索引
engine='openpyxl') 使用openpyxl引擎
print(f"数据已成功导出到 {file_path}")
return True
except Exception as e:
print(f"导出Excel时出错: {str(e)}")
return False
使用示例
export_to_excel(df, 'ai_articles_imported.xlsx')
这段代码将DataFrame导出到Excel文件,使用openpyxl引擎,并排除行索引。你可以根据需要调整sheet名称和其他导出参数。
高级Excel导出选项
如果你需要更复杂的Excel导出功能,比如设置单元格格式、添加公式或创建多个工作表,可以使用以下高级方法:
from openpyxl import Workbook
from openpyxl.styles import Font, Alignment, PatternFill
from openpyxl.utils.dataframe import dataframe_to_rows
def advanced_export_to_excel(df, file_path):
创建一个新的工作簿
wb = Workbook()
ws = wb.active
ws.title = "AI文章数据"
设置标题行样式
header_font = Font(bold=True, color="FFFFFF")
header_fill = PatternFill(start_color="366092", end_color="366092", fill_type="solid")
header_alignment = Alignment(horizontal="center", vertical="center")
将DataFrame数据写入工作表
for r_idx, row in enumerate(dataframe_to_rows(df, index=False, header=True), 1):
for c_idx, value in enumerate(row, 1):
cell = ws.cell(row=r_idx, column=c_idx, value=value)
设置标题行样式
if r_idx == 1:
cell.font = header_font
cell.fill = header_fill
cell.alignment = header_alignment
自动调整列宽
if r_idx == 1:
ws.column_dimensions[chr(64 + c_idx)].width = 20
添加筛选功能
ws.auto_filter.ref = ws.dimensions
冻结首行
ws.freeze_panes = "A2"
保存工作簿
wb.save(file_path)
print(f"数据已成功导出到 {file_path},包含格式设置")
使用示例
advanced_export_to_excel(df, 'ai_articles_formatted.xlsx')
这段代码使用openpyxl库创建格式化的Excel文件,包括标题行样式、自动调整列宽、筛选功能和冻结首行等高级特性。这些功能使导出的Excel文件更加专业和易用。
批量处理与自动化
当你需要定期导入大量AI文章数据时,批量处理和自动化是必不可少的。以下是一个完整的批量处理脚本示例:
import os
import glob
import schedule
import time
from datetime import datetime
def batch_process_articles(input_folder, output_folder, file_pattern=".json"):
确保输出文件夹存在
os.makedirs(output_folder, exist_ok=True)
获取所有匹配的输入文件
input_files = glob.glob(os.path.join(input_folder, file_pattern))
if not input_files:
print(f"在 {input_folder} 中未找到匹配 {file_pattern} 的文件")
return
创建输出文件名(包含时间戳)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
output_file = os.path.join(output_folder, f"ai_articles_batch_{timestamp}.xlsx")
处理所有文件
all_articles = []
for file_path in input_files:
print(f"正在处理文件: {file_path}")
读取文件
try:
data = read_from_json(file_path)
清洗数据
cleaned_data = clean_article_data(data)
添加到总列表
if isinstance(cleaned_data, list):
all_articles.extend(cleaned_data)
else:
all_articles.append(cleaned_data)
except Exception as e:
print(f"处理文件 {file_path} 时出错: {str(e)}")
if not all_articles:
print("没有可用的文章数据")
return
转换为DataFrame
df = convert_to_dataframe(all_articles)
导出到Excel
export_success = export_to_excel(df, output_file)
if export_success:
print(f"批量处理完成,结果已保存到 {output_file}")
可选:处理完成后移动或删除原始文件
for file_path in input_files:
try:
移动到已处理文件夹
processed_folder = os.path.join(input_folder, "processed")
os.makedirs(processed_folder, exist_ok=True)
os.rename(file_path, os.path.join(processed_folder, os.path.basename(file_path)))
except Exception as e:
print(f"移动文件 {file_path} 时出错: {str(e)}")
def scheduled_batch_process():
input_folder = "./ai_articles_input"
output_folder = "./ai_articles_output"
print(f"开始定时批量处理: {datetime.now()}")
batch_process_articles(input_folder, output_folder)
print(f"定时批量处理完成: {datetime.now()}")
设置定时任务(每天凌晨2点执行)
schedule.every().day.at("02:00").do(scheduled_batch_process)
如果需要立即运行一次
scheduled_batch_process()
保持脚本运行,执行定时任务
while True:
schedule.run_pending()
time.sleep(60) 每分钟检查一次
这个脚本实现了完整的批量处理流程,包括读取多个文件、数据清洗、转换和导出。它还使用了schedule库来实现定时执行功能,你可以根据需要调整执行频率和时间。
提示:在生产环境中使用定时任务时,建议将脚本部署为系统服务或使用专门的任务调度工具(如cron、Windows任务计划程序或Airflow),而不是直接运行Python脚本。这样可以确保脚本的稳定运行和自动重启能力。
错误处理与日志记录
在处理大量数据时,错误处理和日志记录至关重要。它们帮助你追踪问题、分析失败原因并优化处理流程。
import logging
from logging.handlers import RotatingFileHandler
def setup_logging(log_file='ai_article_import.log', level=logging.INFO):
创建日志记录器
logger = logging.getLogger('AI文章导入')
logger.setLevel(level)
创建文件处理器,限制日志文件大小并保留备份
file_handler = RotatingFileHandler(
log_file,
maxBytes=1010241024, 10MB
backupCount=5
)
file_handler.setLevel(level)
创建控制台处理器
console_handler = logging.StreamHandler()
console_handler.setLevel(level)
创建格式化器
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
添加处理器到日志记录器
logger.addHandler(file_handler)
logger.addHandler(console_handler)
return logger
使用示例
logger = setup_logging()
def safe_export_to_excel(df, file_path):
try:
logger.info(f"开始导出数据到 {file_path}")
检查DataFrame是否为空
if df.empty:
logger.warning("DataFrame为空,没有数据可导出")
return False
检查文件是否已存在
if os.path.exists(file_path):
logger.warning(f"文件 {file_path} 已存在,将被覆盖")
导出到Excel
export_success = export_to_excel(df, file_path)
if export_success:
logger.info(f"成功导出 {len(df)} 行数据到 {file_path}")
return True
else:
logger.error(f"导出到 {file_path} 失败")
return False
except Exception as e:
logger.error(f"导出到 {file_path} 时发生异常: {str(e)}", exc_info=True)
return False
使用示例
logger.info("开始AI文章数据导入流程")
... 其他代码 ...
safe_export_to_excel(df, 'ai_articles_with_logging.xlsx')
logger.info("AI文章数据导入流程完成")
这段代码实现了完整的日志记录系统,包括文件和控制台输出,并提供了安全的Excel导出函数,带有详细的错误处理和日志记录。日志文件会自动轮转,避免占用过多磁盘空间。
性能优化与大数据处理
当处理大量AI文章数据时,性能优化变得尤为重要。以下是一些优化技术和代码示例:
import pandas as pd
import numpy as np
from tqdm import tqdm 进度条库
def optimize_dataframe(df):
"""优化DataFrame内存使用"""
转换对象类型为分类类型(适用于低基数字符串列)
for col in df.select_dtypes(include=['object']):
if df[col].nunique() / len(df[col]) < 0.5: 如果唯一值比例小于50%
df[col] = df[col].astype('category')
转换数值类型为更节省空间的类型
for col in df.select_dtypes(include=['int64']):
df[col] = pd.to_numeric(df[col], downcast='integer')
for col in df.select_dtypes(include=['float64']):
df[col] = pd.to_numeric(df[col], downcast='float')
return df
def chunked_export_to_excel(df, file_path, chunk_size=10000):
"""分块导出大数据到Excel"""
计算总行数和块数
total_rows = len(df)
num_chunks = (total_rows // chunk_size) + (1 if total_rows % chunk_size else 0)
创建Excel写入器
writer = pd.ExcelWriter(file_path, engine='openpyxl')
try:
分块处理和导出
for i, chunk in tqdm(enumerate(np.array_split(df, num_chunks)),
total=num_chunks,
desc="导出进度"):
第一块包含列名,后续块不包含
header = (i == 0)
如果不是第一块,追加到现有工作表
if not header:
获取现有工作表的起始行
startrow = writer.sheets['Sheet1'].max_row
else:
startrow = 0
写入当前块
chunk.to_excel(
writer,
sheet_name='Sheet1',
startrow=startrow,
header=header,
index=False
)
保存Excel文件
writer.close()
print(f"成功导出 {total_rows} 行数据到 {file_path}")
return True
except Exception as e:
print(f"分块导出时出错: {str(e)}")
writer.close()
return False
使用示例
假设有一个大型DataFrame
large_df = pd.DataFrame({
'id': range(1, 50001),
'title': [f"文章标题 {i}" for i in range(1, 50001)],
'content': [f"这是第 {i} 篇AI生成文章的内容..." for i in range(1, 50001)],
'category': np.random.choice(['科技', '教育', '娱乐', '健康'], 50001),
'publish_date': pd.date_range('2023-01-01', periods=50001)
})
优化DataFrame
optimized_df = optimize_dataframe(large_df)
分块导出到Excel
chunked_export_to_excel(optimized_df, 'large_ai_articles_dataset.xlsx')
这段代码展示了两种主要的性能优化技术:DataFrame内存优化和分块处理。内存优化通过转换数据类型来减少内存使用,而分块处理则将大数据集分割成小块,避免内存溢出和提高处理效率。
警告:对于非常大的数据集(超过100万行),Excel可能不是最佳选择。考虑使用数据库(如SQLite、MySQL)或专门的格式(如Parquet、HDF5)来存储和处理数据。如果必须使用Excel,可以考虑将数据分割到多个工作表或多个文件中。
数据验证与质量检查
确保导入的AI文章数据质量是整个流程中不可忽视的一环。以下代码实现了常见的数据验证和质量检查功能:
import pandas as pd
import re
from collections import Counter
def validate_article_data(df):
"""验证文章数据质量"""
validation_results = {
'total_records': len(df),
'valid_records': 0,
'invalid_records': 0,
'issues': []
}
检查必需列
required_columns = ['title', 'content']
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
validation_results['issues'].append(f"缺少必需列: {', '.join(missing_columns)}")
检查每条记录
for idx, row in df.iterrows():
is_valid = True
record_issues = []
检查标题
if 'title' in df.columns:
title = str(row['title']).strip()
if not title or title.lower() in ['nan', 'null', 'undefined']:
is_valid = False
record_issues.append("标题为空")
elif len(title) < 5:
is_valid = False
record_issues.append("标题过短")
elif len(title) > 100:
is_valid = False
record_issues.append("标题过长")
检查内容
if 'content' in df.columns:
content = str(row['content']).strip()
if not content or content.lower() in ['nan', 'null', 'undefined']:
is_valid = False
record_issues.append("内容为空")
elif len(content) < 50:
is_valid = False
record_issues.append("内容过短")
elif len(content) > 10000:
is_valid = False
record_issues.append("内容过长")
检查特殊字符
if 'content' in df.columns:
content = str(row['content'])
special_chars = re.findall(r'[^wsu4e00-u9fff.,!?;:()[]{}"'-]', content)
if special_chars:
统计特殊字符频率
char_counter = Counter(special_chars)
common_chars = char_counter.most_common(3)
is_valid = False
record_issues.append(f"包含特殊字符: {', '.join([f'{char}({count})' for char, count in common_chars])}")
检查AI生成标记
if 'content' in df.columns:
content = str(row['content']).lower()
ai_markers = ['ai生成', '人工智能生成', '由ai创作', 'ai写作']
found_markers = [marker for marker in ai_markers if marker in content]
if found_markers:
is_valid = False
record_issues.append(f"包含AI生成标记: {', '.join(found_markers)}")
更新验证结果
if is_valid:
validation_results['valid_records'] += 1
else:
validation_results['invalid_records'] += 1
validation_results['issues'].append(f"记录 {idx+1}: {', '.join(record_issues)}")
return validation_results
def generate_quality_report(df, output_file='data_quality_report.'):
"""生成数据质量报告"""
validation_results = validate_article_data(df)
计算质量指标
total_records = validation_results['total_records']
valid_records = validation_results['valid_records']
invalid_records = validation_results['invalid_records']
quality_score = (valid_records / total_records) 100 if total_records > 0 else 0
创建报告
html_content = f"""
<>
AI文章数据质量报告
AI文章数据质量报告
数据概览
总记录数: {total_records}
有效记录数: {valid_records}
无效记录数: {invalid_records}
质量评分: {quality_score:.1f}%
发现的问题
{''.join([f'{issue}' for issue in validation_results['issues']])}
>
"""
写入文件
with open(output_file, 'w', encoding='utf-8') as f:
f.write(html_content)
print(f"数据质量报告已生成: {output_file}")
return validation_results
使用示例
假设df是包含AI文章数据的DataFrame
quality_results = generate_quality_report(df)
如果质量评分低于阈值,可以采取相应措施
if quality_results['valid_records'] / quality_results['total_records'] < 0.8:
print("警告: 数据质量低于可接受水平,建议进行数据清洗或重新生成")
这段代码实现了全面的数据验证功能,检查标题和内容的长度、特殊字符、AI生成标记等问题,并生成详细的质量报告。你可以根据实际需求调整验证规则和阈值。
集成到现有工作流
将AI文章数据导入功能集成到现有工作流中,可以进一步提高效率和自动化程度。以下是一个完整的集成示例:
import os
import sys
import json
import argparse
from datetime import datetime
class AIArticleImporter:
"""AI文章数据导入器"""
def __init__(self, config_file=None):
"""初始化导入器"""
self.config = self.load_config(config_file)
self.logger = self.setup_logging()
def load_config(self, config_file):
"""加载配置文件"""
default_config = {
"input_folder": "./ai_articles_input",
"output_folder": "./ai_articles_output",
"processed_folder": "./ai_articles_processed",
"log_file": "./ai_article_import.log",
"file_pattern": ".json",
"chunk_size": 10000,
"quality_threshold": 0.8,
"api_settings": {
"enabled": False,
"endpoint": "",
"api_key": ""
}
}
if config_file and os.path.exists(config_file):
try:
with open(config_file, 'r', encoding='utf-8') as f:
user_config = json.load(f)
合并用户配置和默认配置
default_config.update(user_config)
except Exception as e:
print(f"加载配置文件出错: {str(e)},使用默认配置")
return default_config
def setup_logging(self):
"""设置日志记录"""
确保日志目录存在
log_dir = os.path.dirname(self.config['log_file'])
if log_dir and not os.path.exists(log_dir):
os.makedirs(log_dir)
创建日志记录器
import logging
from logging.handlers import RotatingFileHandler
logger = logging.getLogger('AIArticleImporter')
logger.setLevel(logging.INFO)
创建文件处理器
file_handler = RotatingFileHandler(
self.config['log_file'],
maxBytes=1010241024, 10MB
backupCount=5
)
file_handler.setLevel(logging.INFO)
创建控制台处理器
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
创建格式化器
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
添加处理器到日志记录器
logger.addHandler(file_handler)
logger.addHandler(console_handler)
return logger
def ensure_directories(self):
"""确保必要的目录存在"""
for folder in [self.config['input_folder'],
self.config['output_folder'],
self.config['processed_folder']]:
if not os.path.exists(folder):
os.makedirs(folder)
self.logger.info(f"创建目录: {folder}")
def process_articles(self):
"""处理AI文章数据"""
self.logger.info("开始AI文章数据导入流程")
确保目录存在
self.ensure_directories()
获取输入文件列表
import glob
input_files = glob.glob(os.path.join(self.config['input_folder'], self.config['file_pattern']))
if not input_files:
self.logger.warning(f"在 {self.config['input_folder']} 中未找到匹配 {self.config['file_pattern']} 的文件")
return False
处理每个文件
all_success = True
for file_path in input_files:
self.logger.info(f"处理文件: {file_path}")
try:
读取文件
data = self.read_file(file_path)
清洗数据
cleaned_data = self.clean_data(data)
转换为DataFrame
df = self.convert_to_dataframe(cleaned_data)
优化DataFrame
df = self.optimize_dataframe(df)
验证数据质量
quality_results = self.validate_data(df)
如果质量低于阈值,记录警告
quality_score = quality_results['valid_records'] / quality_results['total_records']
if quality_score < self.config['quality_threshold']:
self.logger.warning(f"数据质量评分 {quality_score:.2f} 低于阈值 {self.config['quality_threshold']}")
生成输出文件名
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
base_name = os.path.splitext(os.path.basename(file_path))[0]
output_file = os.path.join(
self.config['output_folder'],
f"{base_name}_processed_{timestamp}.xlsx"
)
导出到Excel
if len(df) > self.config['chunk_size']:
大数据集使用分块导出
export_success = self.chunked_export_to_excel(df, output_file, self.config['chunk_size'])
else:
小数据集使用常规导出
export_success = self.export_to_excel(df, output_file)
if export_success:
移动已处理文件
processed_file = os.path.join(
self.config['processed_folder'],
f"{base_name}_processed_{timestamp}{os.path.splitext(file_path)[1]}"
)
os.rename(file_path, processed_file)
self.logger.info(f"文件处理完成: {file_path} -> {output_file}")
else:
all_success = False
self.logger.error(f"文件处理失败: {file_path}")
except Exception as e:
all_success = False
self.logger.error(f"处理文件 {file_path} 时出错: {str(e)}", exc_info=True)
if all_success:
self.logger.info("AI文章数据导入流程成功完成")
else:
self.logger.warning("AI文章数据导入流程完成,但部分文件处理失败")
return all_success
以下是前面定义的各种方法的简化版本
def read_file(self, file_path):
"""读取文件"""
ext = os.path.splitext(file_path)[1].lower()
if ext == '.json':
with open(file_path, 'r', encoding='utf-8') as f:
return json.load(f)
elif ext == '.csv':
import pandas as pd
return pd.read_csv(file_path, encoding='utf-8').to_dict('records')
elif ext == '.txt':
with open(file_path, 'r', encoding='utf-8') as f:
return {"content": f.read()}
else:
raise ValueError(f"不支持的文件格式: {ext}")
def clean_data(self, data):
"""清洗数据"""
import re
if isinstance(data, dict):
content = data.get('content', '')
content = re.sub(r's+', ' ', content)
content = re.sub(r'[^wsu4e00-u9fff.,!?;:()[]{}"'-]', '', content)
content = re.sub(r'AI生成|人工智能生成|由AI创作', '', content)
data['content'] = content.strip()
return data
elif isinstance(data, list):
return [self.clean_data(item) for item in data]
else:
return data
def convert_to_dataframe(self, data):
"""转换为DataFrame"""
import pandas as pd
from datetime import datetime
if isinstance(data, dict):
data['import_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
return pd.DataFrame([data])
elif isinstance(data, list):
for item in data:
item['import_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
return pd.DataFrame(data)
else:
raise ValueError("不支持的数据格式")
def optimize_dataframe(self, df):
"""优化DataFrame"""
for col in df.select_dtypes(include=['object']):
if df[col].nunique() / len(df[col]) < 0.5:
df[col] = df[col].astype('category')
for col in df.select_dtypes(include=['int64']):
df[col] = pd.to_numeric(df[col], downcast='integer')
for col in df.select_dtypes(include=['float64']):
df[col] = pd.to_numeric(df[col], downcast='float')
return df
def validate_data(self, df):
"""验证数据质量"""
validation_results = {
'total_records': len(df),
'valid_records': 0,
'invalid_records': 0,
'issues': []
}
for idx, row in df.iterrows():
is_valid = True
if 'title' in df.columns:
title = str(row['title']).strip()
if not title or len(title) < 5 or len(title) > 100:
is_valid = False
if 'content' in df.columns:
content = str(row['content']).strip()
if not content or len(content) < 50 or len(content) > 10000:
is_valid = False
if is_valid:
validation_results['valid_records'] += 1
else:
validation_results['invalid_records'] += 1
return validation_results
def export_to_excel(self, df, file_path):
"""导出到Excel"""
try:
df.to_excel(file_path, index=False, engine='openpyxl')
self.logger.info(f"数据已成功导出到 {file_path}")
return True
except Exception as e:
self.logger.error(f"导出到 {file_path} 时出错: {str(e)}")
return False
def chunked_export_to_excel(self, df, file_path, chunk_size):
"""分块导出到Excel"""
import numpy as np
from tqdm import tqdm
total_rows = len(df)
num_chunks = (total_rows // chunk_size) + (1 if total_rows % chunk_size else 0)
writer = pd.ExcelWriter(file_path, engine='openpyxl')
try:
for i, chunk in tqdm(enumerate(np.array_split(df, num_chunks)),
total=num_chunks,
desc="导出进度"):
header = (i == 0)
startrow = writer.sheets['Sheet1'].max_row if not header and 'Sheet1' in writer.sheets else 0
chunk.to_excel(
writer,
sheet_name='Sheet1',
startrow=startrow,
header=header,
index=False
)
writer.close()
self.logger.info(f"成功导出 {total_rows} 行数据到 {file_path}")
return True
except Exception as e:
self.logger.error(f"分块导出时出错: {str(e)}")
writer.close()
return False
def main():
"""主函数"""
parser = argparse.ArgumentParser(description='AI文章数据导入工具')
parser.add_argument('--config', '-c', help='配置文件路径')
parser.add_argument('--input', '-i', help='输入文件夹路径')
parser.add_argument('--output', '-o', help='输出文件夹路径')
parser.add_argument('--pattern', '-p', help='文件匹配模式')
parser.add_argument('--validate', '-v', action='store_true', help='仅验证数据,不导出')
args = parser.parse_args()
创建导入器实例
importer = AIArticleImporter(args.config)
覆盖配置中的参数
if args.input:
importer.config['input_folder'] = args.input
if args.output:
importer.config['output_folder'] = args.output
if args.pattern:
importer.config['file_pattern'] = args.pattern
处理文章数据
if args.validate:
仅验证数据
print("验证模式:仅验证数据,不导出")
这里可以添加仅验证的逻辑
else:
完整处理流程
success = importer.process_articles()
sys.exit(0 if success else 1)
if __name__ == "__main__":
main()
这段代码实现了一个完整的AI文章数据导入器类,可以集成到现有工作流中。它支持配置文件、命令行参数、日志记录、数据验证、质量检查和批量处理等功能。你可以根据实际需求进一步扩展和定制这个类。
通过使用这个导入器,你可以轻松地将AI文章数据导入到Excel表格中,同时确保数据质量和处理效率。无论是小规模的数据处理还是大规模的批量导入,这个解决方案都能满足你的需求。