IMS系统数据归档与清理策略完全指南

随着 IMS 系统长时间运行,数据量会不断增长。合理的数据归档与清理策略既能保证系统性能,又能满足数据合规要求。本文将详细介绍数据生命周期管理和自动化清理方案。

数据生命周期概述

IMS 系统中的数据通常分为以下几个阶段:

  • 热数据:最近 30 天的数据,需要频繁查询,存储在高性能存储中
  • 温数据:31-90 天的数据,偶尔查询,可使用普通存储
  • 冷数据:91 天 - 1 年的数据,很少查询,存储在低成本存储
  • 归档数据:超过 1 年的数据,仅用于审计合规,存储在归档存储

数据分类表设计

首先,我们需要设计支持数据分级的表结构:

/* 数据分类配置表 */
CREATE TABLE data_retention_policy (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    table_name VARCHAR(64) NOT NULL,        /* 表名 */
    column_name VARCHAR(64) NOT NULL,       /* 时间字段名 */
    hot_days INT DEFAULT 30,                 /* 热数据天数 */
    warm_days INT DEFAULT 90,                /* 温数据天数 */
    cold_days INT DEFAULT 365,               /* 冷数据天数 */
    archive_days INT DEFAULT 1825,           /* 归档天数(5年) */
    delete_after_archive TINYINT(1) DEFAULT 0,  /* 归档后是否删除 */
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,

    UNIQUE KEY uk_table (table_name)
);

/* 插入默认策略 */
INSERT INTO data_retention_policy (table_name, column_name) VALUES
('ims_order', 'created_at'),
('ims_log', 'log_time'),
('ims_audit_log', 'created_at'),
('ims_operation_log', 'operate_time'),
('ims_file', 'upload_time');

归档任务调度设计

/* data_archive_scheduler.py - 数据归档调度器 */
import schedule
import time
from datetime import datetime, timedelta

class DataArchiveScheduler:
    def __init__(self, db_config):
        self.db_config = db_config
        self.conn = None

    def connect(self):
        """连接数据库"""
        # MySQL 连接逻辑
        pass

    def get_retention_policy(self, table_name):
        """获取表的保留策略"""
        with self.conn.cursor() as cursor:
            cursor.execute(
                "SELECT * FROM data_retention_policy WHERE table_name = %s",
                (table_name,)
            )
            return cursor.fetchone()

    def archive_to_cold(self, table_name, column_name, cold_days):
        """将数据迁移到冷存储"""
        cutoff_date = datetime.now() - timedelta(days=cold_days)

        # 1. 导出到文件
        export_sql = f"""
SELECT * INTO OUTFILE '/backup/{table_name}_{{date}}.csv'
FROM {table_name}
WHERE {column_name} < '{cutoff_date.strftime('%Y-%m-%d')}'
"""

        # 2. 标记为已归档
        # ...

    def delete_from_hot(self, table_name, column_name, days):
        """从热数据中删除"""
        cutoff_date = datetime.now() - timedelta(days=days)

        with self.conn.cursor() as cursor:
            cursor.execute(
                f"DELETE FROM {table_name} WHERE {column_name} < %s",
                (cutoff_date,)
            )
            self.conn.commit()
            return cursor.rowcount

    def run_archive_task(self):
        """执行归档任务"""
        with self.conn.cursor() as cursor:
            cursor.execute("SELECT table_name FROM data_retention_policy")
            tables = cursor.fetchall()

        for (table_name,) in tables:
            policy = self.get_retention_policy(table_name)
            if policy:
                print(f"Processing {table_name}...")
                # 处理归档
                # 处理删除

    def start(self):
        """启动调度器"""
        schedule.every().day.at("02:00").do(self.run_archive_task)

        while True:
            schedule.run_pending()
            time.sleep(60)

分区表设计

使用分区表可以极大提升数据清理效率:

/* 使用 Range 分区表的订单表 */
CREATE TABLE ims_order (
    id BIGINT PRIMARY KEY,
    order_no VARCHAR(32) NOT NULL,
    customer_id BIGINT NOT NULL,
    amount DECIMAL(12,2),
    status TINYINT,
    created_at DATETIME NOT NULL,
    updated_at DATETIME,

    INDEX idx_created (created_at),
    INDEX idx_customer (customer_id)
)
PARTITION BY RANGE (TO_DAYS(created_at)) (
    /* 按月分区 */
    PARTITION p202605 VALUES LESS THAN (TO_DAYS('2026-06-01')),
    PARTITION p202606 VALUES LESS THAN (TO_DAYS('2026-07-01')),
    PARTITION p202607 VALUES LESS THAN (TO_DAYS('2026-08-01')),
    /* 更多分区... */
    PARTITION pmax VALUES LESS THAN MAXVALUE
);

/* 清理旧分区示例 */
-- 删除 2025 年的数据分区
ALTER TABLE ims_order DROP PARTITION p202501;

增量清理策略

对于大数据量的表,需要采用增量清理避免长时间锁表:

/* incremental_cleanup.py - 增量清理脚本 */
import pymysql
from datetime import datetime, timedelta

class IncrementalCleanup:
    BATCH_SIZE = 5000  # 每批删除数量
    MAX_RETRY = 3

    def __init__(self, db_config):
        self.db_config = db_config

    def cleanup_table(self, table_name, column_name, days):
        """增量清理表数据"""
        conn = pymysql.connect(**self.db_config)

        try:
            cutoff_date = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d')
            total_deleted = 0

            while True:
                with conn.cursor() as cursor:
                    # 分批删除
                    sql = f"""
DELETE FROM {table_name}
WHERE {column_name} < '{cutoff_date}'
LIMIT {self.BATCH_SIZE}
"""
                    cursor.execute(sql)
                    deleted = cursor.rowcount

                    if deleted == 0:
                        break

                    total_deleted += deleted
                    conn.commit()

                    print(f"Deleted {deleted} rows from {table_name}")

                    # 让出时间给其他操作
                    import time
                    time.sleep(0.1)

            print(f"{table_name}: Total deleted {total_deleted} rows")

        finally:
            conn.close()

    def run_cleanup(self):
        """执行清理任务"""
        tasks = [
            ("ims_order", "created_at", 365),
            ("ims_log", "log_time", 90),
            ("ims_audit_log", "created_at", 730),
        ]

        for table, column, days in tasks:
            try:
                self.cleanup_table(table, column, days)
            except Exception as e:
                print(f"Error cleaning {table}: {e}")

日志清理策略

/* 日志表清理策略 */
CREATE TABLE ims_log (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    level VARCHAR(10),
    message TEXT,
    logger VARCHAR(128),
    log_time DATETIME NOT NULL,
    INDEX idx_log_time (log_time),
    INDEX idx_level (level)
) ENGINE=InnoDB;

/* 清理策略 */
-- 错误日志保留 90 天
DELETE FROM ims_log
WHERE level = 'ERROR'
AND log_time < DATE_SUB(NOW(), INTERVAL 90 DAY);

-- 警告日志保留 60 天
DELETE FROM ims_log
WHERE level = 'WARN'
AND log_time < DATE_SUB(NOW(), INTERVAL 60 DAY);

-- 信息日志保留 30 天
DELETE FROM ims_log
WHERE level IN ('INFO', 'DEBUG')
AND log_time < DATE_SUB(NOW(), INTERVAL 30 DAY);

归档文件管理

/* 文件归档脚本 */
import os
import shutil
from datetime import datetime, timedelta

def archive_old_files(upload_dir, archive_dir, days=365):
    """归档旧的上传文件"""
    cutoff = datetime.now() - timedelta(days=days)

    for root, dirs, files in os.walk(upload_dir):
        for filename in files:
            filepath = os.path.join(root, filename)
            mtime = datetime.fromtimestamp(os.path.getmtime(filepath))

            if mtime < cutoff:
                # 移动到归档目录
                rel_path = os.path.relpath(filepath, upload_dir)
                dest_path = os.path.join(archive_dir, rel_path)

                # 确保目标目录存在
                os.makedirs(os.path.dirname(dest_path), exist_ok=True)

                shutil.move(filepath, dest_path)
                print(f"Archived: {filepath} -> {dest_path}")

/* 压缩归档文件 */
def compress_archive(archive_dir):
    """压缩归档目录"""
    for root, dirs, files in os.walk(archive_dir):
        for filename in files:
            if not filename.endswith('.gz'):
                filepath = os.path.join(root, filename)
                # 使用 gzip 压缩
                # ...

重要提示

  • 在执行删除操作前务必先备份数据
  • 选择业务低峰期执行清理任务
  • 定期验证归档数据的完整性和可用性
  • 遵守数据保护法规,重要数据不可随意删除
  • 记录所有清理操作,便于审计追溯

总结

数据归档与清理是 IMS 系统运维的重要环节。通过合理的数据生命周期管理、分区表设计、增量清理策略,可以在保证系统性能的同时满足数据合规要求。建议定期 review 清理策略,根据实际业务需求调整保留周期。