数据工程师常需处理重复的数据管道搭建、格式转换等工作,以下 5 个 Python 脚本可直接复用,帮你节省大量时间。

1. 数据格式批量转换脚本(CSV ↔ JSON ↔ Parquet)

该脚本支持将多个文件在 CSV、JSON、Parquet 格式间批量转换,适配不同数据分析工具的格式需求。

python

运行

import pandas as pd
import os
from pathlib import Path

def batch_convert_files(input_dir, output_dir, input_format, output_format):
    # 创建输出目录(若不存在)
    Path(output_dir).mkdir(parents=True, exist_ok=True)
    
    # 遍历输入目录下所有对应格式文件
    for filename in os.listdir(input_dir):
        if filename.endswith(f".{input_format.lower()}"):
            file_path = os.path.join(input_dir, filename)
            output_filename = Path(filename).stem + f".{output_format.lower()}"
            output_path = os.path.join(output_dir, output_filename)
            
            # 读取输入文件
            try:
                if input_format.lower() == "csv":
                    df = pd.read_csv(file_path)
                elif input_format.lower() == "json":
                    df = pd.read_json(file_path)
                elif input_format.lower() == "parquet":
                    df = pd.read_parquet(file_path)
                else:
                    print(f"不支持的输入格式:{input_format}")
                    continue
                
                # 写入输出文件
                if output_format.lower() == "csv":
                    df.to_csv(output_path, index=False)
                elif output_format.lower() == "json":
                    df.to_json(output_path, orient="records")
                elif output_format.lower() == "parquet":
                    df.to_parquet(output_path, index=False)
                else:
                    print(f"不支持的输出格式:{output_format}")
                    continue
                
                print(f"成功转换:{filename} → {output_filename}")
            except Exception as e:
                print(f"转换失败 {filename}:{str(e)}")

# 使用示例:将 input 文件夹的 CSV 转换为 Parquet 并存入 output 文件夹
batch_convert_files(
    input_dir="input_files",
    output_dir="output_files",
    input_format="CSV",
    output_format="Parquet"
)

2. 数据质量校验脚本(缺失值、异常值、重复值检测)

快速校验数据集完整性,生成详细质量报告,避免脏数据流入下游流程。

python

运行

import pandas as pd
import numpy as np

def data_quality_check(df, output_report_path="data_quality_report.txt"):
    report = []
    report.append("="*50)
    report.append("数据质量校验报告")
    report.append("="*50)
    
    # 1. 基本信息
    report.append(f"\n1. 数据集基本信息:")
    report.append(f"   - 总行数:{df.shape[0]}")
    report.append(f"   - 总列数:{df.shape[1]}")
    report.append(f"   - 数据类型分布:\n{df.dtypes.value_counts().to_string()}")
    
    # 2. 缺失值检测
    missing_stats = df.isnull().sum()
    missing_percent = (missing_stats / len(df)) * 100
    report.append(f"\n2. 缺失值统计:")
    for col, missing in missing_stats.items():
        if missing > 0:
            report.append(f"   - {col}:{missing} 个缺失值({missing_percent[col]:.2f}%)")
    
    # 3. 重复值检测
    duplicate_count = df.duplicated().sum()
    report.append(f"\n3. 重复值统计:")
    report.append(f"   - 完全重复行:{duplicate_count} 行({duplicate_count/len(df)*100:.2f}%)")
    
    # 4. 数值型字段异常值检测(IQR 方法)
    report.append(f"\n4. 数值型字段异常值统计(IQR 法则):")
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    for col in numeric_cols:
        q1 = df[col].quantile(0.25)
        q3 = df[col].quantile(0.75)
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr
        outliers = df[(df[col] < lower_bound) | (df[col] > upper_bound)].shape[0]
        if outliers > 0:
            report.append(f"   - {col}:{outliers} 个异常值(范围:[{lower_bound:.2f}, {upper_bound:.2f}])")
    
    # 保存报告
    with open(output_report_path, "w", encoding="utf-8") as f:
        f.write("\n".join(report))
    
    print(f"质量报告已保存至:{output_report_path}")
    return "\n".join(report)

# 使用示例
df = pd.read_csv("your_dataset.csv")
data_quality_check(df)

3. 数据库批量数据迁移脚本(MySQL → PostgreSQL)

实现不同数据库间的数据批量迁移,自动处理字段类型映射,支持增量同步。

python

运行

import pandas as pd
from sqlalchemy import create_engine
from datetime import datetime

def migrate_data_mysql_to_postgres(
    mysql_conn_str, pg_conn_str, 
    mysql_table, pg_table, 
    incremental_col=None, last_sync_time=None
):
    # 创建数据库连接
    mysql_engine = create_engine(mysql_conn_str)
    pg_engine = create_engine(pg_conn_str)
    
    try:
        # 构建查询语句(支持增量同步)
        query = f"SELECT * FROM {mysql_table}"
        if incremental_col and last_sync_time:
            query += f" WHERE {incremental_col} > '{last_sync_time}'"
            print(f"执行增量同步:仅迁移 {incremental_col} > {last_sync_time} 的数据")
        
        # 批量读取 MySQL 数据(避免内存溢出)
        chunk_size = 10000
        chunk_iter = pd.read_sql(query, mysql_engine, chunksize=chunk_size)
        
        # 批量写入 PostgreSQL
        total_rows = 0
        for chunk in chunk_iter:
            chunk.to_sql(
                name=pg_table,
                con=pg_engine,
                if_exists="append",
                index=False,
                chunksize=chunk_size
            )
            total_rows += len(chunk)
            print(f"已迁移 {total_rows} 行数据")
        
        print(f"数据迁移完成!共迁移 {total_rows} 行数据至 {pg_table}")
    
    except Exception as e:
        print(f"迁移失败:{str(e)}")
    finally:
        mysql_engine.dispose()
        pg_engine.dispose()

# 使用示例
mysql_conn = "mysql+pymysql://username:password@mysql_host:3306/database_name"
pg_conn = "postgresql://username:password@pg_host:5432/database_name"

# 全量迁移
migrate_data_mysql_to_postgres(
    mysql_conn_str=mysql_conn,
    pg_conn_str=pg_conn,
    mysql_table="source_table",
    pg_table="target_table"
)

# 增量迁移(基于更新时间字段)
# migrate_data_mysql_to_postgres(
#     mysql_conn_str=mysql_conn,
#     pg_conn_str=pg_conn,
#     mysql_table="source_table",
#     pg_table="target_table",
#     incremental_col="update_time",
#     last_sync_time="2025-01-01 00:00:00"
# )

4. 日志解析与数据提取脚本(结构化日志转 CSV)

从非结构化日志文件中提取关键信息(如时间戳、请求 ID、状态码),转为结构化数据便于分析。

python

运行

import re
import csv
from pathlib import Path

def parse_logs_to_csv(log_file_path, output_csv_path, log_pattern):
    # 编译日志匹配正则表达式
    regex = re.compile(log_pattern)
    
    # 提取日志字段名(从正则表达式分组中获取)
    field_names = regex.groupindex.keys()
    
    # 读取日志并提取数据
    parsed_data = []
    with open(log_file_path, "r", encoding="utf-8") as log_file:
        for line_num, line in enumerate(log_file, 1):
            match = regex.match(line.strip())
            if match:
                parsed_data.append(match.groupdict())
            else:
                print(f"第 {line_num} 行日志格式不匹配:{line.strip()}")
    
    # 写入 CSV 文件
    Path(output_csv_path).parent.mkdir(parents=True, exist_ok=True)
    with open(output_csv_path, "w", newline="", encoding="utf-8") as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=field_names)
        writer.writeheader()
        writer.writerows(parsed_data)
    
    print(f"日志解析完成!共提取 {len(parsed_data)} 条结构化数据,保存至 {output_csv_path}")

# 使用示例:解析 Nginx 访问日志
nginx_log_pattern = (
    r"(?P<ip>\d+\.\d+\.\d+\.\d+) "
    r"-\s+-\s+ "
    r"\[(?P<timestamp>.+?)\] "
    r'"(?P<method>\w+) (?P<path>.+?) HTTP/\d+\.\d+" '
    r"(?P<status_code>\d+) "
    r"(?P<response_size>\d+) "
    r'"(?P<referer>.+?)" '
    r'"(?P<user_agent>.+?)"'
)

parse_logs_to_csv(
    log_file_path="access.log",
    output_csv_path="parsed_nginx_logs.csv",
    log_pattern=nginx_log_pattern
)

5. 定时数据备份脚本(本地 + 云存储同步)

自动备份指定目录下的数据文件,支持本地归档和云存储(以 AWS S3 为例)同步,确保数据安全。

python

运行

import os
import shutil
import boto3
from datetime import datetime
from pathlib import Path

def backup_data(
    source_dir, local_backup_dir, 
    s3_bucket=None, s3_prefix=None, 
    aws_access_key=None, aws_secret_key=None
):
    # 生成备份文件名(带时间戳)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_filename = f"data_backup_{timestamp}.zip"
    local_backup_path = os.path.join(local_backup_dir, backup_filename)
    
    # 创建本地备份目录
    Path(local_backup_dir).mkdir(parents=True, exist_ok=True)
    
    try:
        # 压缩源目录数据
        print(f"开始本地备份:{source_dir} → {local_backup_path}")
        shutil.make_archive(
            base_name=os.path.splitext(local_backup_path)[0],
            format="zip",
            root_dir=source_dir
        )
        print("本地备份完成!")
        
        # 同步至 AWS S3(可选)
        if s3_bucket and s3_prefix:
            s3_client = boto3.client(
                "s3",
                aws_access_key_id=aws_access_key,
                aws_secret_access_key=aws_secret_key
            )
            s3_key = f"{s3_prefix}/{backup_filename}"
            print(f"开始同步至 S3:{local_backup_path} → s3://{s3_bucket}/{s3_key}")
            
            s3_client.upload_file(
                Filename=local_backup_path,
                Bucket=s3_bucket,
                Key=s3_key
            )
            print("S3 同步完成!")
    
    except Exception as e:
        print(f"备份失败:{str(e)}")

# 使用示例
backup_data(
    source_dir="data_to_backup",  # 需要备份的目录
    local_backup_dir="local_backups",  # 本地备份存储目录
    s3_bucket="your-s3-bucket",  # S3 桶名(可选)
    s3_prefix="data_backups",  # S3 存储前缀(可选)
    aws_access_key="your-aws-access-key",  # AWS 访问密钥(可选)
    aws_secret_key="your-aws-secret-key"  # AWS 密钥(可选)
)

# 定时执行建议:结合 crontab(Linux)或任务计划程序(Windows),例如每天凌晨 2 点执行

原文链接:https://www.kdnuggets.com/5-useful-python-scripts-for-busy-data-engineers

网站页脚示例