如何使用Python脚本高效导入AI文章数据到Excel表格

当你面对大量AI生成的文章数据时,手动处理不仅效率低下,还容易出错。Python脚本提供了自动化解决方案,让你能够快速、准确地将这些数据导入Excel表格中。下面我们将详细介绍实现这一过程的具体方法和代码示例。

准备工作与环境配置

在开始之前,确保你的开发环境已经配置妥当。你需要安装Python以及几个关键库,这些库将帮助你处理数据并与Excel文件交互。


 安装必要的Python库
pip install pandas
pip install openpyxl
pip install xlwt
pip install requests

这些库中,pandas用于数据处理,openpyxl和xlwt用于Excel文件操作,requests则用于获取API数据。安装完成后,你就可以开始编写导入脚本了。

注意:根据你的Python版本,可能需要使用pip3而不是pip。如果遇到权限问题,可以尝试在命令前加上sudo(Linux/macOS)或以管理员身份运行命令提示符(Windows)。

获取AI生成的文章数据

AI文章数据通常来自两个主要渠道:API接口或本地文件。我们将分别讨论这两种情况的处理方法。

通过API获取AI文章数据

许多AI内容生成平台提供API接口,你可以通过这些接口直接获取生成的文章数据。以下是使用OpenAI API获取数据的示例代码:


import requests
import json

 设置API密钥和请求参数
api_key = "your_openai_api_key"
headers = {
    "Authorization": f"Bearer {api_key}",
    "Content-Type": "application/json"
}

 准备请求数据
data = {
    "model": "gpt-4-turbo",
    "messages": [
        {"role": "system", "content": "你是一个专业的文章生成助手"},
        {"role": "user", "content": "生成一篇关于人工智能发展趋势的文章"}
    ],
    "max_tokens": 2000
}

 发送请求
response = requests.post("https://api.openai.com/v1/chat/completions", 
                        headers=headers, 
                        data=json.dumps(data))

 解析响应
if response.status_code == 200:
    result = response.json()
    article_content = result['choices'][0]['message']['content']
    print("成功获取AI文章数据")
else:
    print(f"请求失败,状态码:{response.status_code}")
    print(response.text)

这段代码向OpenAI API发送请求,获取一篇关于人工智能发展趋势的文章。你可以根据需要修改请求参数,比如调整文章主题、长度等。

从本地文件读取AI文章数据

如果你已经将AI生成的文章保存在本地文件中(如JSON、TXT或CSV格式),可以使用以下代码读取:


import json
import pandas as pd

 从JSON文件读取数据
def read_from_json(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        data = json.load(file)
    return data

 从TXT文件读取数据
def read_from_txt(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        content = file.read()
    return {"content": content}

 从CSV文件读取数据
def read_from_csv(file_path):
    df = pd.read_csv(file_path, encoding='utf-8')
    return df.to_dict('records')

 使用示例
json_data = read_from_json('ai_articles.json')
txt_data = read_from_txt('ai_article.txt')
csv_data = read_from_csv('ai_articles.csv')

这些函数分别处理不同格式的文件,将AI文章数据读取到Python变量中,为后续处理做准备。

数据清洗与预处理

AI生成的文章数据往往包含格式问题、特殊字符或不需要的内容,需要进行清洗和预处理。以下是常见的数据清洗操作:


import re
import pandas as pd

def clean_article_data(article_data):
     如果是字典形式的数据
    if isinstance(article_data, dict):
         提取文章内容
        content = article_data.get('content', '')
        
         移除特殊字符和多余空格
        content = re.sub(r's+', ' ', content)   替换多个空格为单个空格
        content = re.sub(r'[^wsu4e00-u9fff.,!?;:()[]{}"'-]', '', content)   保留中英文、标点和基本符号
        
         移除可能的AI生成标记
        content = re.sub(r'AI生成|人工智能生成|由AI创作', '', content)
        
         更新清洗后的内容
        article_data['content'] = content.strip()
        
        return article_data
    
     如果是列表形式的数据
    elif isinstance(article_data, list):
        cleaned_data = []
        for item in article_data:
            cleaned_item = clean_article_data(item)
            cleaned_data.append(cleaned_item)
        return cleaned_data
    
    return article_data

 使用示例
cleaned_data = clean_article_data(json_data)

这段代码定义了一个数据清洗函数,可以处理字典或列表形式的数据,移除特殊字符、多余空格和可能的AI生成标记。你可以根据实际需求调整正则表达式模式,以适应不同的数据清洗需求。

警告:数据清洗是一个需要谨慎处理的过程。过度清洗可能导致重要信息丢失,而清洗不足则可能影响后续分析。建议在处理重要数据前,先在小样本上测试清洗效果。

将数据转换为DataFrame

将清洗后的数据转换为Pandas DataFrame是导入Excel的关键步骤。DataFrame提供了丰富的数据处理功能,使你能够轻松操作和分析数据。


import pandas as pd
from datetime import datetime

def convert_to_dataframe(article_data):
     如果是单个字典,转换为单行DataFrame
    if isinstance(article_data, dict):
         添加时间戳
        article_data['import_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        df = pd.DataFrame([article_data])
    
     如果是字典列表,直接转换为DataFrame
    elif isinstance(article_data, list):
         为每篇文章添加时间戳
        for item in article_data:
            item['import_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        df = pd.DataFrame(article_data)
    
    else:
        raise ValueError("不支持的数据格式,请提供字典或字典列表")
    
    return df

 使用示例
df = convert_to_dataframe(cleaned_data)
print(f"成功创建DataFrame,包含{len(df)}行数据")
print(df.head())

这段代码将清洗后的文章数据转换为DataFrame,并自动添加导入时间戳。时间戳对于追踪数据导入历史非常有用,特别是在处理大量数据时。

导出到Excel表格

最后一步是将DataFrame导出到Excel文件。Pandas提供了简单易用的方法来实现这一功能,同时支持多种自定义选项。


def export_to_excel(df, file_path, sheet_name='AI文章数据'):
    try:
         导出到Excel文件
        df.to_excel(file_path, 
                   sheet_name=sheet_name, 
                   index=False,   不包含行索引
                   engine='openpyxl')   使用openpyxl引擎
        
        print(f"数据已成功导出到 {file_path}")
        return True
    except Exception as e:
        print(f"导出Excel时出错: {str(e)}")
        return False

 使用示例
export_to_excel(df, 'ai_articles_imported.xlsx')

这段代码将DataFrame导出到Excel文件,使用openpyxl引擎,并排除行索引。你可以根据需要调整sheet名称和其他导出参数。

高级Excel导出选项

如果你需要更复杂的Excel导出功能,比如设置单元格格式、添加公式或创建多个工作表,可以使用以下高级方法:


from openpyxl import Workbook
from openpyxl.styles import Font, Alignment, PatternFill
from openpyxl.utils.dataframe import dataframe_to_rows

def advanced_export_to_excel(df, file_path):
     创建一个新的工作簿
    wb = Workbook()
    ws = wb.active
    ws.title = "AI文章数据"
    
     设置标题行样式
    header_font = Font(bold=True, color="FFFFFF")
    header_fill = PatternFill(start_color="366092", end_color="366092", fill_type="solid")
    header_alignment = Alignment(horizontal="center", vertical="center")
    
     将DataFrame数据写入工作表
    for r_idx, row in enumerate(dataframe_to_rows(df, index=False, header=True), 1):
        for c_idx, value in enumerate(row, 1):
            cell = ws.cell(row=r_idx, column=c_idx, value=value)
            
             设置标题行样式
            if r_idx == 1:
                cell.font = header_font
                cell.fill = header_fill
                cell.alignment = header_alignment
            
             自动调整列宽
            if r_idx == 1:
                ws.column_dimensions[chr(64 + c_idx)].width = 20
    
     添加筛选功能
    ws.auto_filter.ref = ws.dimensions
    
     冻结首行
    ws.freeze_panes = "A2"
    
     保存工作簿
    wb.save(file_path)
    print(f"数据已成功导出到 {file_path},包含格式设置")

 使用示例
advanced_export_to_excel(df, 'ai_articles_formatted.xlsx')

这段代码使用openpyxl库创建格式化的Excel文件,包括标题行样式、自动调整列宽、筛选功能和冻结首行等高级特性。这些功能使导出的Excel文件更加专业和易用。

批量处理与自动化

当你需要定期导入大量AI文章数据时,批量处理和自动化是必不可少的。以下是一个完整的批量处理脚本示例:


import os
import glob
import schedule
import time
from datetime import datetime

def batch_process_articles(input_folder, output_folder, file_pattern=".json"):
     确保输出文件夹存在
    os.makedirs(output_folder, exist_ok=True)
    
     获取所有匹配的输入文件
    input_files = glob.glob(os.path.join(input_folder, file_pattern))
    
    if not input_files:
        print(f"在 {input_folder} 中未找到匹配 {file_pattern} 的文件")
        return
    
     创建输出文件名(包含时间戳)
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    output_file = os.path.join(output_folder, f"ai_articles_batch_{timestamp}.xlsx")
    
     处理所有文件
    all_articles = []
    
    for file_path in input_files:
        print(f"正在处理文件: {file_path}")
        
         读取文件
        try:
            data = read_from_json(file_path)
            
             清洗数据
            cleaned_data = clean_article_data(data)
            
             添加到总列表
            if isinstance(cleaned_data, list):
                all_articles.extend(cleaned_data)
            else:
                all_articles.append(cleaned_data)
                
        except Exception as e:
            print(f"处理文件 {file_path} 时出错: {str(e)}")
    
    if not all_articles:
        print("没有可用的文章数据")
        return
    
     转换为DataFrame
    df = convert_to_dataframe(all_articles)
    
     导出到Excel
    export_success = export_to_excel(df, output_file)
    
    if export_success:
        print(f"批量处理完成,结果已保存到 {output_file}")
        
         可选:处理完成后移动或删除原始文件
        for file_path in input_files:
            try:
                 移动到已处理文件夹
                processed_folder = os.path.join(input_folder, "processed")
                os.makedirs(processed_folder, exist_ok=True)
                os.rename(file_path, os.path.join(processed_folder, os.path.basename(file_path)))
            except Exception as e:
                print(f"移动文件 {file_path} 时出错: {str(e)}")

def scheduled_batch_process():
    input_folder = "./ai_articles_input"
    output_folder = "./ai_articles_output"
    
    print(f"开始定时批量处理: {datetime.now()}")
    batch_process_articles(input_folder, output_folder)
    print(f"定时批量处理完成: {datetime.now()}")

 设置定时任务(每天凌晨2点执行)
schedule.every().day.at("02:00").do(scheduled_batch_process)

 如果需要立即运行一次
scheduled_batch_process()

 保持脚本运行,执行定时任务
while True:
    schedule.run_pending()
    time.sleep(60)   每分钟检查一次

这个脚本实现了完整的批量处理流程,包括读取多个文件、数据清洗、转换和导出。它还使用了schedule库来实现定时执行功能,你可以根据需要调整执行频率和时间。

提示:在生产环境中使用定时任务时,建议将脚本部署为系统服务或使用专门的任务调度工具(如cron、Windows任务计划程序或Airflow),而不是直接运行Python脚本。这样可以确保脚本的稳定运行和自动重启能力。

错误处理与日志记录

在处理大量数据时,错误处理和日志记录至关重要。它们帮助你追踪问题、分析失败原因并优化处理流程。


import logging
from logging.handlers import RotatingFileHandler

def setup_logging(log_file='ai_article_import.log', level=logging.INFO):
     创建日志记录器
    logger = logging.getLogger('AI文章导入')
    logger.setLevel(level)
    
     创建文件处理器,限制日志文件大小并保留备份
    file_handler = RotatingFileHandler(
        log_file, 
        maxBytes=1010241024,   10MB
        backupCount=5
    )
    file_handler.setLevel(level)
    
     创建控制台处理器
    console_handler = logging.StreamHandler()
    console_handler.setLevel(level)
    
     创建格式化器
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    file_handler.setFormatter(formatter)
    console_handler.setFormatter(formatter)
    
     添加处理器到日志记录器
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    
    return logger

 使用示例
logger = setup_logging()

def safe_export_to_excel(df, file_path):
    try:
        logger.info(f"开始导出数据到 {file_path}")
        
         检查DataFrame是否为空
        if df.empty:
            logger.warning("DataFrame为空,没有数据可导出")
            return False
        
         检查文件是否已存在
        if os.path.exists(file_path):
            logger.warning(f"文件 {file_path} 已存在,将被覆盖")
        
         导出到Excel
        export_success = export_to_excel(df, file_path)
        
        if export_success:
            logger.info(f"成功导出 {len(df)} 行数据到 {file_path}")
            return True
        else:
            logger.error(f"导出到 {file_path} 失败")
            return False
            
    except Exception as e:
        logger.error(f"导出到 {file_path} 时发生异常: {str(e)}", exc_info=True)
        return False

 使用示例
logger.info("开始AI文章数据导入流程")
 ... 其他代码 ...
safe_export_to_excel(df, 'ai_articles_with_logging.xlsx')
logger.info("AI文章数据导入流程完成")

这段代码实现了完整的日志记录系统,包括文件和控制台输出,并提供了安全的Excel导出函数,带有详细的错误处理和日志记录。日志文件会自动轮转,避免占用过多磁盘空间。

性能优化与大数据处理

当处理大量AI文章数据时,性能优化变得尤为重要。以下是一些优化技术和代码示例:


import pandas as pd
import numpy as np
from tqdm import tqdm   进度条库

def optimize_dataframe(df):
    """优化DataFrame内存使用"""
     转换对象类型为分类类型(适用于低基数字符串列)
    for col in df.select_dtypes(include=['object']):
        if df[col].nunique() / len(df[col]) < 0.5:   如果唯一值比例小于50%
            df[col] = df[col].astype('category')
    
     转换数值类型为更节省空间的类型
    for col in df.select_dtypes(include=['int64']):
        df[col] = pd.to_numeric(df[col], downcast='integer')
    
    for col in df.select_dtypes(include=['float64']):
        df[col] = pd.to_numeric(df[col], downcast='float')
    
    return df

def chunked_export_to_excel(df, file_path, chunk_size=10000):
    """分块导出大数据到Excel"""
     计算总行数和块数
    total_rows = len(df)
    num_chunks = (total_rows // chunk_size) + (1 if total_rows % chunk_size else 0)
    
     创建Excel写入器
    writer = pd.ExcelWriter(file_path, engine='openpyxl')
    
    try:
         分块处理和导出
        for i, chunk in tqdm(enumerate(np.array_split(df, num_chunks)), 
                            total=num_chunks, 
                            desc="导出进度"):
             第一块包含列名,后续块不包含
            header = (i == 0)
            
             如果不是第一块,追加到现有工作表
            if not header:
                 获取现有工作表的起始行
                startrow = writer.sheets['Sheet1'].max_row
            else:
                startrow = 0
            
             写入当前块
            chunk.to_excel(
                writer, 
                sheet_name='Sheet1', 
                startrow=startrow, 
                header=header, 
                index=False
            )
        
         保存Excel文件
        writer.close()
        print(f"成功导出 {total_rows} 行数据到 {file_path}")
        return True
        
    except Exception as e:
        print(f"分块导出时出错: {str(e)}")
        writer.close()
        return False

 使用示例
 假设有一个大型DataFrame
large_df = pd.DataFrame({
    'id': range(1, 50001),
    'title': [f"文章标题 {i}" for i in range(1, 50001)],
    'content': [f"这是第 {i} 篇AI生成文章的内容..." for i in range(1, 50001)],
    'category': np.random.choice(['科技', '教育', '娱乐', '健康'], 50001),
    'publish_date': pd.date_range('2023-01-01', periods=50001)
})

 优化DataFrame
optimized_df = optimize_dataframe(large_df)

 分块导出到Excel
chunked_export_to_excel(optimized_df, 'large_ai_articles_dataset.xlsx')

这段代码展示了两种主要的性能优化技术:DataFrame内存优化和分块处理。内存优化通过转换数据类型来减少内存使用,而分块处理则将大数据集分割成小块,避免内存溢出和提高处理效率。

警告:对于非常大的数据集(超过100万行),Excel可能不是最佳选择。考虑使用数据库(如SQLite、MySQL)或专门的格式(如Parquet、HDF5)来存储和处理数据。如果必须使用Excel,可以考虑将数据分割到多个工作表或多个文件中。

数据验证与质量检查

确保导入的AI文章数据质量是整个流程中不可忽视的一环。以下代码实现了常见的数据验证和质量检查功能:


import pandas as pd
import re
from collections import Counter

def validate_article_data(df):
    """验证文章数据质量"""
    validation_results = {
        'total_records': len(df),
        'valid_records': 0,
        'invalid_records': 0,
        'issues': []
    }
    
     检查必需列
    required_columns = ['title', 'content']
    missing_columns = [col for col in required_columns if col not in df.columns]
    
    if missing_columns:
        validation_results['issues'].append(f"缺少必需列: {', '.join(missing_columns)}")
    
     检查每条记录
    for idx, row in df.iterrows():
        is_valid = True
        record_issues = []
        
         检查标题
        if 'title' in df.columns:
            title = str(row['title']).strip()
            if not title or title.lower() in ['nan', 'null', 'undefined']:
                is_valid = False
                record_issues.append("标题为空")
            elif len(title) < 5:
                is_valid = False
                record_issues.append("标题过短")
            elif len(title) > 100:
                is_valid = False
                record_issues.append("标题过长")
        
         检查内容
        if 'content' in df.columns:
            content = str(row['content']).strip()
            if not content or content.lower() in ['nan', 'null', 'undefined']:
                is_valid = False
                record_issues.append("内容为空")
            elif len(content) < 50:
                is_valid = False
                record_issues.append("内容过短")
            elif len(content) > 10000:
                is_valid = False
                record_issues.append("内容过长")
        
         检查特殊字符
        if 'content' in df.columns:
            content = str(row['content'])
            special_chars = re.findall(r'[^wsu4e00-u9fff.,!?;:()[]{}"'-]', content)
            if special_chars:
                 统计特殊字符频率
                char_counter = Counter(special_chars)
                common_chars = char_counter.most_common(3)
                is_valid = False
                record_issues.append(f"包含特殊字符: {', '.join([f'{char}({count})' for char, count in common_chars])}")
        
         检查AI生成标记
        if 'content' in df.columns:
            content = str(row['content']).lower()
            ai_markers = ['ai生成', '人工智能生成', '由ai创作', 'ai写作']
            found_markers = [marker for marker in ai_markers if marker in content]
            if found_markers:
                is_valid = False
                record_issues.append(f"包含AI生成标记: {', '.join(found_markers)}")
        
         更新验证结果
        if is_valid:
            validation_results['valid_records'] += 1
        else:
            validation_results['invalid_records'] += 1
            validation_results['issues'].append(f"记录 {idx+1}: {', '.join(record_issues)}")
    
    return validation_results

def generate_quality_report(df, output_file='data_quality_report.'):
    """生成数据质量报告"""
    validation_results = validate_article_data(df)
    
     计算质量指标
    total_records = validation_results['total_records']
    valid_records = validation_results['valid_records']
    invalid_records = validation_results['invalid_records']
    quality_score = (valid_records / total_records)  100 if total_records > 0 else 0
    
     创建报告
    html_content = f"""
    <>
    
        AI文章数据质量报告
        
    
    
        

AI文章数据质量报告

数据概览

总记录数: {total_records}
有效记录数: {valid_records}
无效记录数: {invalid_records}
质量评分: {quality_score:.1f}%

发现的问题

{''.join([f'
{issue}
' for issue in validation_results['issues']])}
""" 写入文件 with open(output_file, 'w', encoding='utf-8') as f: f.write(html_content) print(f"数据质量报告已生成: {output_file}") return validation_results 使用示例 假设df是包含AI文章数据的DataFrame quality_results = generate_quality_report(df) 如果质量评分低于阈值,可以采取相应措施 if quality_results['valid_records'] / quality_results['total_records'] < 0.8: print("警告: 数据质量低于可接受水平,建议进行数据清洗或重新生成")

这段代码实现了全面的数据验证功能,检查标题和内容的长度、特殊字符、AI生成标记等问题,并生成详细的质量报告。你可以根据实际需求调整验证规则和阈值。

如何使用Python脚本高效导入AI文章数据到Excel表格

集成到现有工作流

将AI文章数据导入功能集成到现有工作流中,可以进一步提高效率和自动化程度。以下是一个完整的集成示例:


import os
import sys
import json
import argparse
from datetime import datetime

class AIArticleImporter:
    """AI文章数据导入器"""
    
    def __init__(self, config_file=None):
        """初始化导入器"""
        self.config = self.load_config(config_file)
        self.logger = self.setup_logging()
        
    def load_config(self, config_file):
        """加载配置文件"""
        default_config = {
            "input_folder": "./ai_articles_input",
            "output_folder": "./ai_articles_output",
            "processed_folder": "./ai_articles_processed",
            "log_file": "./ai_article_import.log",
            "file_pattern": ".json",
            "chunk_size": 10000,
            "quality_threshold": 0.8,
            "api_settings": {
                "enabled": False,
                "endpoint": "",
                "api_key": ""
            }
        }
        
        if config_file and os.path.exists(config_file):
            try:
                with open(config_file, 'r', encoding='utf-8') as f:
                    user_config = json.load(f)
                     合并用户配置和默认配置
                    default_config.update(user_config)
            except Exception as e:
                print(f"加载配置文件出错: {str(e)},使用默认配置")
        
        return default_config
    
    def setup_logging(self):
        """设置日志记录"""
         确保日志目录存在
        log_dir = os.path.dirname(self.config['log_file'])
        if log_dir and not os.path.exists(log_dir):
            os.makedirs(log_dir)
        
         创建日志记录器
        import logging
        from logging.handlers import RotatingFileHandler
        
        logger = logging.getLogger('AIArticleImporter')
        logger.setLevel(logging.INFO)
        
         创建文件处理器
        file_handler = RotatingFileHandler(
            self.config['log_file'], 
            maxBytes=1010241024,   10MB
            backupCount=5
        )
        file_handler.setLevel(logging.INFO)
        
         创建控制台处理器
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)
        
         创建格式化器
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        file_handler.setFormatter(formatter)
        console_handler.setFormatter(formatter)
        
         添加处理器到日志记录器
        logger.addHandler(file_handler)
        logger.addHandler(console_handler)
        
        return logger
    
    def ensure_directories(self):
        """确保必要的目录存在"""
        for folder in [self.config['input_folder'], 
                      self.config['output_folder'], 
                      self.config['processed_folder']]:
            if not os.path.exists(folder):
                os.makedirs(folder)
                self.logger.info(f"创建目录: {folder}")
    
    def process_articles(self):
        """处理AI文章数据"""
        self.logger.info("开始AI文章数据导入流程")
        
         确保目录存在
        self.ensure_directories()
        
         获取输入文件列表
        import glob
        input_files = glob.glob(os.path.join(self.config['input_folder'], self.config['file_pattern']))
        
        if not input_files:
            self.logger.warning(f"在 {self.config['input_folder']} 中未找到匹配 {self.config['file_pattern']} 的文件")
            return False
        
         处理每个文件
        all_success = True
        for file_path in input_files:
            self.logger.info(f"处理文件: {file_path}")
            
            try:
                 读取文件
                data = self.read_file(file_path)
                
                 清洗数据
                cleaned_data = self.clean_data(data)
                
                 转换为DataFrame
                df = self.convert_to_dataframe(cleaned_data)
                
                 优化DataFrame
                df = self.optimize_dataframe(df)
                
                 验证数据质量
                quality_results = self.validate_data(df)
                
                 如果质量低于阈值,记录警告
                quality_score = quality_results['valid_records'] / quality_results['total_records']
                if quality_score < self.config['quality_threshold']:
                    self.logger.warning(f"数据质量评分 {quality_score:.2f} 低于阈值 {self.config['quality_threshold']}")
                
                 生成输出文件名
                timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
                base_name = os.path.splitext(os.path.basename(file_path))[0]
                output_file = os.path.join(
                    self.config['output_folder'], 
                    f"{base_name}_processed_{timestamp}.xlsx"
                )
                
                 导出到Excel
                if len(df) > self.config['chunk_size']:
                     大数据集使用分块导出
                    export_success = self.chunked_export_to_excel(df, output_file, self.config['chunk_size'])
                else:
                     小数据集使用常规导出
                    export_success = self.export_to_excel(df, output_file)
                
                if export_success:
                     移动已处理文件
                    processed_file = os.path.join(
                        self.config['processed_folder'], 
                        f"{base_name}_processed_{timestamp}{os.path.splitext(file_path)[1]}"
                    )
                    os.rename(file_path, processed_file)
                    self.logger.info(f"文件处理完成: {file_path} -> {output_file}")
                else:
                    all_success = False
                    self.logger.error(f"文件处理失败: {file_path}")
                
            except Exception as e:
                all_success = False
                self.logger.error(f"处理文件 {file_path} 时出错: {str(e)}", exc_info=True)
        
        if all_success:
            self.logger.info("AI文章数据导入流程成功完成")
        else:
            self.logger.warning("AI文章数据导入流程完成,但部分文件处理失败")
        
        return all_success
    
     以下是前面定义的各种方法的简化版本
    def read_file(self, file_path):
        """读取文件"""
        ext = os.path.splitext(file_path)[1].lower()
        
        if ext == '.json':
            with open(file_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        elif ext == '.csv':
            import pandas as pd
            return pd.read_csv(file_path, encoding='utf-8').to_dict('records')
        elif ext == '.txt':
            with open(file_path, 'r', encoding='utf-8') as f:
                return {"content": f.read()}
        else:
            raise ValueError(f"不支持的文件格式: {ext}")
    
    def clean_data(self, data):
        """清洗数据"""
        import re
        
        if isinstance(data, dict):
            content = data.get('content', '')
            content = re.sub(r's+', ' ', content)
            content = re.sub(r'[^wsu4e00-u9fff.,!?;:()[]{}"'-]', '', content)
            content = re.sub(r'AI生成|人工智能生成|由AI创作', '', content)
            data['content'] = content.strip()
            return data
        elif isinstance(data, list):
            return [self.clean_data(item) for item in data]
        else:
            return data
    
    def convert_to_dataframe(self, data):
        """转换为DataFrame"""
        import pandas as pd
        from datetime import datetime
        
        if isinstance(data, dict):
            data['import_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            return pd.DataFrame([data])
        elif isinstance(data, list):
            for item in data:
                item['import_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            return pd.DataFrame(data)
        else:
            raise ValueError("不支持的数据格式")
    
    def optimize_dataframe(self, df):
        """优化DataFrame"""
        for col in df.select_dtypes(include=['object']):
            if df[col].nunique() / len(df[col]) < 0.5:
                df[col] = df[col].astype('category')
        
        for col in df.select_dtypes(include=['int64']):
            df[col] = pd.to_numeric(df[col], downcast='integer')
        
        for col in df.select_dtypes(include=['float64']):
            df[col] = pd.to_numeric(df[col], downcast='float')
        
        return df
    
    def validate_data(self, df):
        """验证数据质量"""
        validation_results = {
            'total_records': len(df),
            'valid_records': 0,
            'invalid_records': 0,
            'issues': []
        }
        
        for idx, row in df.iterrows():
            is_valid = True
            
            if 'title' in df.columns:
                title = str(row['title']).strip()
                if not title or len(title) < 5 or len(title) > 100:
                    is_valid = False
            
            if 'content' in df.columns:
                content = str(row['content']).strip()
                if not content or len(content) < 50 or len(content) > 10000:
                    is_valid = False
            
            if is_valid:
                validation_results['valid_records'] += 1
            else:
                validation_results['invalid_records'] += 1
        
        return validation_results
    
    def export_to_excel(self, df, file_path):
        """导出到Excel"""
        try:
            df.to_excel(file_path, index=False, engine='openpyxl')
            self.logger.info(f"数据已成功导出到 {file_path}")
            return True
        except Exception as e:
            self.logger.error(f"导出到 {file_path} 时出错: {str(e)}")
            return False
    
    def chunked_export_to_excel(self, df, file_path, chunk_size):
        """分块导出到Excel"""
        import numpy as np
        from tqdm import tqdm
        
        total_rows = len(df)
        num_chunks = (total_rows // chunk_size) + (1 if total_rows % chunk_size else 0)
        
        writer = pd.ExcelWriter(file_path, engine='openpyxl')
        
        try:
            for i, chunk in tqdm(enumerate(np.array_split(df, num_chunks)), 
                                total=num_chunks, 
                                desc="导出进度"):
                header = (i == 0)
                startrow = writer.sheets['Sheet1'].max_row if not header and 'Sheet1' in writer.sheets else 0
                
                chunk.to_excel(
                    writer, 
                    sheet_name='Sheet1', 
                    startrow=startrow, 
                    header=header, 
                    index=False
                )
            
            writer.close()
            self.logger.info(f"成功导出 {total_rows} 行数据到 {file_path}")
            return True
            
        except Exception as e:
            self.logger.error(f"分块导出时出错: {str(e)}")
            writer.close()
            return False

def main():
    """主函数"""
    parser = argparse.ArgumentParser(description='AI文章数据导入工具')
    parser.add_argument('--config', '-c', help='配置文件路径')
    parser.add_argument('--input', '-i', help='输入文件夹路径')
    parser.add_argument('--output', '-o', help='输出文件夹路径')
    parser.add_argument('--pattern', '-p', help='文件匹配模式')
    parser.add_argument('--validate', '-v', action='store_true', help='仅验证数据,不导出')
    
    args = parser.parse_args()
    
     创建导入器实例
    importer = AIArticleImporter(args.config)
    
     覆盖配置中的参数
    if args.input:
        importer.config['input_folder'] = args.input
    if args.output:
        importer.config['output_folder'] = args.output
    if args.pattern:
        importer.config['file_pattern'] = args.pattern
    
     处理文章数据
    if args.validate:
         仅验证数据
        print("验证模式:仅验证数据,不导出")
         这里可以添加仅验证的逻辑
    else:
         完整处理流程
        success = importer.process_articles()
        sys.exit(0 if success else 1)

if __name__ == "__main__":
    main()

这段代码实现了一个完整的AI文章数据导入器类,可以集成到现有工作流中。它支持配置文件、命令行参数、日志记录、数据验证、质量检查和批量处理等功能。你可以根据实际需求进一步扩展和定制这个类。

通过使用这个导入器,你可以轻松地将AI文章数据导入到Excel表格中,同时确保数据质量和处理效率。无论是小规模的数据处理还是大规模的批量导入,这个解决方案都能满足你的需求。