数据工程师常需处理重复的数据管道搭建、格式转换等工作,以下 5 个 Python 脚本可直接复用,帮你节省大量时间。
1. 数据格式批量转换脚本(CSV ↔ JSON ↔ Parquet)
该脚本支持将多个文件在 CSV、JSON、Parquet 格式间批量转换,适配不同数据分析工具的格式需求。
python
运行
import pandas as pd
import os
from pathlib import Path
def batch_convert_files(input_dir, output_dir, input_format, output_format):
# 创建输出目录(若不存在)
Path(output_dir).mkdir(parents=True, exist_ok=True)
# 遍历输入目录下所有对应格式文件
for filename in os.listdir(input_dir):
if filename.endswith(f".{input_format.lower()}"):
file_path = os.path.join(input_dir, filename)
output_filename = Path(filename).stem + f".{output_format.lower()}"
output_path = os.path.join(output_dir, output_filename)
# 读取输入文件
try:
if input_format.lower() == "csv":
df = pd.read_csv(file_path)
elif input_format.lower() == "json":
df = pd.read_json(file_path)
elif input_format.lower() == "parquet":
df = pd.read_parquet(file_path)
else:
print(f"不支持的输入格式:{input_format}")
continue
# 写入输出文件
if output_format.lower() == "csv":
df.to_csv(output_path, index=False)
elif output_format.lower() == "json":
df.to_json(output_path, orient="records")
elif output_format.lower() == "parquet":
df.to_parquet(output_path, index=False)
else:
print(f"不支持的输出格式:{output_format}")
continue
print(f"成功转换:{filename} → {output_filename}")
except Exception as e:
print(f"转换失败 {filename}:{str(e)}")
# 使用示例:将 input 文件夹的 CSV 转换为 Parquet 并存入 output 文件夹
batch_convert_files(
input_dir="input_files",
output_dir="output_files",
input_format="CSV",
output_format="Parquet"
)
2. 数据质量校验脚本(缺失值、异常值、重复值检测)
快速校验数据集完整性,生成详细质量报告,避免脏数据流入下游流程。
python
运行
import pandas as pd
import numpy as np
def data_quality_check(df, output_report_path="data_quality_report.txt"):
report = []
report.append("="*50)
report.append("数据质量校验报告")
report.append("="*50)
# 1. 基本信息
report.append(f"\n1. 数据集基本信息:")
report.append(f" - 总行数:{df.shape[0]}")
report.append(f" - 总列数:{df.shape[1]}")
report.append(f" - 数据类型分布:\n{df.dtypes.value_counts().to_string()}")
# 2. 缺失值检测
missing_stats = df.isnull().sum()
missing_percent = (missing_stats / len(df)) * 100
report.append(f"\n2. 缺失值统计:")
for col, missing in missing_stats.items():
if missing > 0:
report.append(f" - {col}:{missing} 个缺失值({missing_percent[col]:.2f}%)")
# 3. 重复值检测
duplicate_count = df.duplicated().sum()
report.append(f"\n3. 重复值统计:")
report.append(f" - 完全重复行:{duplicate_count} 行({duplicate_count/len(df)*100:.2f}%)")
# 4. 数值型字段异常值检测(IQR 方法)
report.append(f"\n4. 数值型字段异常值统计(IQR 法则):")
numeric_cols = df.select_dtypes(include=[np.number]).columns
for col in numeric_cols:
q1 = df[col].quantile(0.25)
q3 = df[col].quantile(0.75)
iqr = q3 - q1
lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr
outliers = df[(df[col] < lower_bound) | (df[col] > upper_bound)].shape[0]
if outliers > 0:
report.append(f" - {col}:{outliers} 个异常值(范围:[{lower_bound:.2f}, {upper_bound:.2f}])")
# 保存报告
with open(output_report_path, "w", encoding="utf-8") as f:
f.write("\n".join(report))
print(f"质量报告已保存至:{output_report_path}")
return "\n".join(report)
# 使用示例
df = pd.read_csv("your_dataset.csv")
data_quality_check(df)
3. 数据库批量数据迁移脚本(MySQL → PostgreSQL)
实现不同数据库间的数据批量迁移,自动处理字段类型映射,支持增量同步。
python
运行
import pandas as pd
from sqlalchemy import create_engine
from datetime import datetime
def migrate_data_mysql_to_postgres(
mysql_conn_str, pg_conn_str,
mysql_table, pg_table,
incremental_col=None, last_sync_time=None
):
# 创建数据库连接
mysql_engine = create_engine(mysql_conn_str)
pg_engine = create_engine(pg_conn_str)
try:
# 构建查询语句(支持增量同步)
query = f"SELECT * FROM {mysql_table}"
if incremental_col and last_sync_time:
query += f" WHERE {incremental_col} > '{last_sync_time}'"
print(f"执行增量同步:仅迁移 {incremental_col} > {last_sync_time} 的数据")
# 批量读取 MySQL 数据(避免内存溢出)
chunk_size = 10000
chunk_iter = pd.read_sql(query, mysql_engine, chunksize=chunk_size)
# 批量写入 PostgreSQL
total_rows = 0
for chunk in chunk_iter:
chunk.to_sql(
name=pg_table,
con=pg_engine,
if_exists="append",
index=False,
chunksize=chunk_size
)
total_rows += len(chunk)
print(f"已迁移 {total_rows} 行数据")
print(f"数据迁移完成!共迁移 {total_rows} 行数据至 {pg_table}")
except Exception as e:
print(f"迁移失败:{str(e)}")
finally:
mysql_engine.dispose()
pg_engine.dispose()
# 使用示例
mysql_conn = "mysql+pymysql://username:password@mysql_host:3306/database_name"
pg_conn = "postgresql://username:password@pg_host:5432/database_name"
# 全量迁移
migrate_data_mysql_to_postgres(
mysql_conn_str=mysql_conn,
pg_conn_str=pg_conn,
mysql_table="source_table",
pg_table="target_table"
)
# 增量迁移(基于更新时间字段)
# migrate_data_mysql_to_postgres(
# mysql_conn_str=mysql_conn,
# pg_conn_str=pg_conn,
# mysql_table="source_table",
# pg_table="target_table",
# incremental_col="update_time",
# last_sync_time="2025-01-01 00:00:00"
# )
4. 日志解析与数据提取脚本(结构化日志转 CSV)
从非结构化日志文件中提取关键信息(如时间戳、请求 ID、状态码),转为结构化数据便于分析。
python
运行
import re
import csv
from pathlib import Path
def parse_logs_to_csv(log_file_path, output_csv_path, log_pattern):
# 编译日志匹配正则表达式
regex = re.compile(log_pattern)
# 提取日志字段名(从正则表达式分组中获取)
field_names = regex.groupindex.keys()
# 读取日志并提取数据
parsed_data = []
with open(log_file_path, "r", encoding="utf-8") as log_file:
for line_num, line in enumerate(log_file, 1):
match = regex.match(line.strip())
if match:
parsed_data.append(match.groupdict())
else:
print(f"第 {line_num} 行日志格式不匹配:{line.strip()}")
# 写入 CSV 文件
Path(output_csv_path).parent.mkdir(parents=True, exist_ok=True)
with open(output_csv_path, "w", newline="", encoding="utf-8") as csv_file:
writer = csv.DictWriter(csv_file, fieldnames=field_names)
writer.writeheader()
writer.writerows(parsed_data)
print(f"日志解析完成!共提取 {len(parsed_data)} 条结构化数据,保存至 {output_csv_path}")
# 使用示例:解析 Nginx 访问日志
nginx_log_pattern = (
r"(?P<ip>\d+\.\d+\.\d+\.\d+) "
r"-\s+-\s+ "
r"\[(?P<timestamp>.+?)\] "
r'"(?P<method>\w+) (?P<path>.+?) HTTP/\d+\.\d+" '
r"(?P<status_code>\d+) "
r"(?P<response_size>\d+) "
r'"(?P<referer>.+?)" '
r'"(?P<user_agent>.+?)"'
)
parse_logs_to_csv(
log_file_path="access.log",
output_csv_path="parsed_nginx_logs.csv",
log_pattern=nginx_log_pattern
)
5. 定时数据备份脚本(本地 + 云存储同步)
自动备份指定目录下的数据文件,支持本地归档和云存储(以 AWS S3 为例)同步,确保数据安全。
python
运行
import os
import shutil
import boto3
from datetime import datetime
from pathlib import Path
def backup_data(
source_dir, local_backup_dir,
s3_bucket=None, s3_prefix=None,
aws_access_key=None, aws_secret_key=None
):
# 生成备份文件名(带时间戳)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_filename = f"data_backup_{timestamp}.zip"
local_backup_path = os.path.join(local_backup_dir, backup_filename)
# 创建本地备份目录
Path(local_backup_dir).mkdir(parents=True, exist_ok=True)
try:
# 压缩源目录数据
print(f"开始本地备份:{source_dir} → {local_backup_path}")
shutil.make_archive(
base_name=os.path.splitext(local_backup_path)[0],
format="zip",
root_dir=source_dir
)
print("本地备份完成!")
# 同步至 AWS S3(可选)
if s3_bucket and s3_prefix:
s3_client = boto3.client(
"s3",
aws_access_key_id=aws_access_key,
aws_secret_access_key=aws_secret_key
)
s3_key = f"{s3_prefix}/{backup_filename}"
print(f"开始同步至 S3:{local_backup_path} → s3://{s3_bucket}/{s3_key}")
s3_client.upload_file(
Filename=local_backup_path,
Bucket=s3_bucket,
Key=s3_key
)
print("S3 同步完成!")
except Exception as e:
print(f"备份失败:{str(e)}")
# 使用示例
backup_data(
source_dir="data_to_backup", # 需要备份的目录
local_backup_dir="local_backups", # 本地备份存储目录
s3_bucket="your-s3-bucket", # S3 桶名(可选)
s3_prefix="data_backups", # S3 存储前缀(可选)
aws_access_key="your-aws-access-key", # AWS 访问密钥(可选)
aws_secret_key="your-aws-secret-key" # AWS 密钥(可选)
)
# 定时执行建议:结合 crontab(Linux)或任务计划程序(Windows),例如每天凌晨 2 点执行
原文链接:https://www.kdnuggets.com/5-useful-python-scripts-for-busy-data-engineers
