随着 IMS 系统长时间运行,数据量会不断增长。合理的数据归档与清理策略既能保证系统性能,又能满足数据合规要求。本文将详细介绍数据生命周期管理和自动化清理方案。
数据生命周期概述
IMS 系统中的数据通常分为以下几个阶段:
- 热数据:最近 30 天的数据,需要频繁查询,存储在高性能存储中
- 温数据:31-90 天的数据,偶尔查询,可使用普通存储
- 冷数据:91 天 - 1 年的数据,很少查询,存储在低成本存储
- 归档数据:超过 1 年的数据,仅用于审计合规,存储在归档存储
数据分类表设计
首先,我们需要设计支持数据分级的表结构:
/* 数据分类配置表 */
CREATE TABLE data_retention_policy (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
table_name VARCHAR(64) NOT NULL, /* 表名 */
column_name VARCHAR(64) NOT NULL, /* 时间字段名 */
hot_days INT DEFAULT 30, /* 热数据天数 */
warm_days INT DEFAULT 90, /* 温数据天数 */
cold_days INT DEFAULT 365, /* 冷数据天数 */
archive_days INT DEFAULT 1825, /* 归档天数(5年) */
delete_after_archive TINYINT(1) DEFAULT 0, /* 归档后是否删除 */
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
UNIQUE KEY uk_table (table_name)
);
/* 插入默认策略 */
INSERT INTO data_retention_policy (table_name, column_name) VALUES
('ims_order', 'created_at'),
('ims_log', 'log_time'),
('ims_audit_log', 'created_at'),
('ims_operation_log', 'operate_time'),
('ims_file', 'upload_time');
归档任务调度设计
/* data_archive_scheduler.py - 数据归档调度器 */
import schedule
import time
from datetime import datetime, timedelta
class DataArchiveScheduler:
def __init__(self, db_config):
self.db_config = db_config
self.conn = None
def connect(self):
"""连接数据库"""
# MySQL 连接逻辑
pass
def get_retention_policy(self, table_name):
"""获取表的保留策略"""
with self.conn.cursor() as cursor:
cursor.execute(
"SELECT * FROM data_retention_policy WHERE table_name = %s",
(table_name,)
)
return cursor.fetchone()
def archive_to_cold(self, table_name, column_name, cold_days):
"""将数据迁移到冷存储"""
cutoff_date = datetime.now() - timedelta(days=cold_days)
# 1. 导出到文件
export_sql = f"""
SELECT * INTO OUTFILE '/backup/{table_name}_{{date}}.csv'
FROM {table_name}
WHERE {column_name} < '{cutoff_date.strftime('%Y-%m-%d')}'
"""
# 2. 标记为已归档
# ...
def delete_from_hot(self, table_name, column_name, days):
"""从热数据中删除"""
cutoff_date = datetime.now() - timedelta(days=days)
with self.conn.cursor() as cursor:
cursor.execute(
f"DELETE FROM {table_name} WHERE {column_name} < %s",
(cutoff_date,)
)
self.conn.commit()
return cursor.rowcount
def run_archive_task(self):
"""执行归档任务"""
with self.conn.cursor() as cursor:
cursor.execute("SELECT table_name FROM data_retention_policy")
tables = cursor.fetchall()
for (table_name,) in tables:
policy = self.get_retention_policy(table_name)
if policy:
print(f"Processing {table_name}...")
# 处理归档
# 处理删除
def start(self):
"""启动调度器"""
schedule.every().day.at("02:00").do(self.run_archive_task)
while True:
schedule.run_pending()
time.sleep(60)
分区表设计
使用分区表可以极大提升数据清理效率:
/* 使用 Range 分区表的订单表 */
CREATE TABLE ims_order (
id BIGINT PRIMARY KEY,
order_no VARCHAR(32) NOT NULL,
customer_id BIGINT NOT NULL,
amount DECIMAL(12,2),
status TINYINT,
created_at DATETIME NOT NULL,
updated_at DATETIME,
INDEX idx_created (created_at),
INDEX idx_customer (customer_id)
)
PARTITION BY RANGE (TO_DAYS(created_at)) (
/* 按月分区 */
PARTITION p202605 VALUES LESS THAN (TO_DAYS('2026-06-01')),
PARTITION p202606 VALUES LESS THAN (TO_DAYS('2026-07-01')),
PARTITION p202607 VALUES LESS THAN (TO_DAYS('2026-08-01')),
/* 更多分区... */
PARTITION pmax VALUES LESS THAN MAXVALUE
);
/* 清理旧分区示例 */
-- 删除 2025 年的数据分区
ALTER TABLE ims_order DROP PARTITION p202501;
增量清理策略
对于大数据量的表,需要采用增量清理避免长时间锁表:
/* incremental_cleanup.py - 增量清理脚本 */
import pymysql
from datetime import datetime, timedelta
class IncrementalCleanup:
BATCH_SIZE = 5000 # 每批删除数量
MAX_RETRY = 3
def __init__(self, db_config):
self.db_config = db_config
def cleanup_table(self, table_name, column_name, days):
"""增量清理表数据"""
conn = pymysql.connect(**self.db_config)
try:
cutoff_date = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d')
total_deleted = 0
while True:
with conn.cursor() as cursor:
# 分批删除
sql = f"""
DELETE FROM {table_name}
WHERE {column_name} < '{cutoff_date}'
LIMIT {self.BATCH_SIZE}
"""
cursor.execute(sql)
deleted = cursor.rowcount
if deleted == 0:
break
total_deleted += deleted
conn.commit()
print(f"Deleted {deleted} rows from {table_name}")
# 让出时间给其他操作
import time
time.sleep(0.1)
print(f"{table_name}: Total deleted {total_deleted} rows")
finally:
conn.close()
def run_cleanup(self):
"""执行清理任务"""
tasks = [
("ims_order", "created_at", 365),
("ims_log", "log_time", 90),
("ims_audit_log", "created_at", 730),
]
for table, column, days in tasks:
try:
self.cleanup_table(table, column, days)
except Exception as e:
print(f"Error cleaning {table}: {e}")
日志清理策略
/* 日志表清理策略 */
CREATE TABLE ims_log (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
level VARCHAR(10),
message TEXT,
logger VARCHAR(128),
log_time DATETIME NOT NULL,
INDEX idx_log_time (log_time),
INDEX idx_level (level)
) ENGINE=InnoDB;
/* 清理策略 */
-- 错误日志保留 90 天
DELETE FROM ims_log
WHERE level = 'ERROR'
AND log_time < DATE_SUB(NOW(), INTERVAL 90 DAY);
-- 警告日志保留 60 天
DELETE FROM ims_log
WHERE level = 'WARN'
AND log_time < DATE_SUB(NOW(), INTERVAL 60 DAY);
-- 信息日志保留 30 天
DELETE FROM ims_log
WHERE level IN ('INFO', 'DEBUG')
AND log_time < DATE_SUB(NOW(), INTERVAL 30 DAY);
归档文件管理
/* 文件归档脚本 */
import os
import shutil
from datetime import datetime, timedelta
def archive_old_files(upload_dir, archive_dir, days=365):
"""归档旧的上传文件"""
cutoff = datetime.now() - timedelta(days=days)
for root, dirs, files in os.walk(upload_dir):
for filename in files:
filepath = os.path.join(root, filename)
mtime = datetime.fromtimestamp(os.path.getmtime(filepath))
if mtime < cutoff:
# 移动到归档目录
rel_path = os.path.relpath(filepath, upload_dir)
dest_path = os.path.join(archive_dir, rel_path)
# 确保目标目录存在
os.makedirs(os.path.dirname(dest_path), exist_ok=True)
shutil.move(filepath, dest_path)
print(f"Archived: {filepath} -> {dest_path}")
/* 压缩归档文件 */
def compress_archive(archive_dir):
"""压缩归档目录"""
for root, dirs, files in os.walk(archive_dir):
for filename in files:
if not filename.endswith('.gz'):
filepath = os.path.join(root, filename)
# 使用 gzip 压缩
# ...
重要提示
- 在执行删除操作前务必先备份数据
- 选择业务低峰期执行清理任务
- 定期验证归档数据的完整性和可用性
- 遵守数据保护法规,重要数据不可随意删除
- 记录所有清理操作,便于审计追溯
总结
数据归档与清理是 IMS 系统运维的重要环节。通过合理的数据生命周期管理、分区表设计、增量清理策略,可以在保证系统性能的同时满足数据合规要求。建议定期 review 清理策略,根据实际业务需求调整保留周期。