企业级系统常需要将业务数据同步到数据仓库、缓存或下游系统。本文介绍 IMS 系统的数据同步方案,包括 CDC(Change Data Capture)技术、Binlog 解析与实时数据管道。
一、CDC 技术概述
CDC(Change Data Capture)通过捕获数据库变更日志,实现数据的实时同步。相比定时轮询,CDC 具有低延迟、低侵入性的优势。
CDC 优势
- 实时性:毫秒级延迟,数据变更即同步
- 低侵入:无需修改业务代码,不影响主业务
- 完整性:不遗漏任何变更,包括 UPDATE、DELETE
- 解耦性:生产系统与消费系统解耦
二、MySQL Binlog 解析
2.1 开启 Binlog
/* my.cnf */ [mysqld] server-id = 1 log_bin = /var/log/mysql/mysql-bin binlog_format = ROW binlog_row_image = FULL expire_logs_days = 7
2.2 Binlog 事件类型
- QUERY_EVENT:DDL 语句,如 CREATE、ALTER、DROP
- TABLE_MAP_EVENT:表结构元数据
- WRITE_ROWS_EVENT:INSERT 操作
- UPDATE_ROWS_EVENT:UPDATE 操作
- DELETE_ROWS_EVENT:DELETE 操作
三、Canal 中间件
Canal 是阿里巴巴开源的 MySQL Binlog 增量订阅与消费组件。
3.1 Canal 工作原理
/* Canal 架构 */
MySQL --> Canal Server --> Canal Client --> 目标系统
(解析 Binlog) (消费数据) (Redis/ES/数据仓库)
3.2 Canal 配置
/* canal.properties */ canal.destinations = example canal.instance.master.address = 127.0.0.1:3306 canal.instance.master.journal.name = mysql-bin.000001 canal.instance.master.position = 0 canal.instance.dbUsername = canal canal.instance.dbPassword = canal canal.instance.filter.regex = .*\\..*
3.3 Canal 客户端示例
/* Java 客户端消费 Canal 数据 */ public class CanalConsumer implements CanalConnector { public void run() { while (true) { connect(); while (hasNext()) { for (Entry entry : getEntries()) { if (entry.getEntryType() == EntryType.ROWDATA) { RowChange rowChange = parseFrom(entry.getStoreValue()); for (RowData rowData : rowChange.getRowDatasList()) { // 处理变更数据 if (rowChange.getEventType() == EventType.INSERT) { handleInsert(rowData); } else if (rowChange.getEventType() == EventType.UPDATE) { handleUpdate(rowData); } else if (rowChange.getEventType() == EventType.DELETE) { handleDelete(rowData); } } } } } disconnect(); } } }
四、Debezium 方案
Debezium 是 Red Hat 开源的 CDC 平台,支持 MySQL、PostgreSQL、MongoDB 等多种数据库。
Debezium MySQL 连接器配置
/* debezium-mysql.json */ { "name": "mysql-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "database.hostname": "mysql.ims.local", "database.port": "3306", "database.user": "debezium", "database.password": "dbz123", "database.server.id": "184054", "database.server.name": "ims-mysql", "database.include.list": "ims", "table.include.list": "ims.user,ims.order,ims.product", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "schema-changes.ims", "include.schema.changes": "true", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState" } }
五、数据同步场景
5.1 业务库到 Redis 缓存
/* 用户数据同步到 Redis */ // Canal 捕获 user 表变更,同步到 Redis void syncUserToRedis(RowData rowData, EventType eventType) { String userId = getColumnValue(rowData, "id"); switch(eventType) { case INSERT: case UPDATE: User user = buildUserFromRow(rowData); redis.hset("user:info", userId, toJson(user)); break; case DELETE: redis.hdel("user:info", userId); break; } }
5.2 业务库到 Elasticsearch 搜索
/* 订单数据同步到 ES */ // 将订单数据同步到 ES,支持全文检索 void syncOrderToES(RowData rowData, EventType eventType) { String orderId = getColumnValue(rowData, "id"); if (eventType == EventType.DELETE) { esClient.delete("ims-order", orderId); } else { Order order = buildOrderFromRow(rowData); esClient.index("ims-order", orderId, toJson(order)); } }
六、数据一致性保障
- 幂等性:消费端实现幂等,同一消息多次处理结果一致
- 补偿机制:定期比对源库与目标库差异,发现问题自动修复
- 死信队列:处理失败的消息进入死信队列,后续人工处理
- 监控告警:监控同步延迟、错误率,异常及时告警
CDC 实践建议:
- Binlog 格式优先使用 ROW,确保数据准确
- 同步任务幂等设计,防止数据重复
- 设置合理的同步批次大小,平衡延迟与吞吐
- 关键表变更前先在测试环境验证
七、总结
- CDC 技术实现数据库变更的实时捕获
- Canal 适合阿里生态,Debezium 适合云原生场景
- 典型场景:缓存预热、搜索索引更新、数据仓库同步
- 关键保障:幂等性、补偿机制、监控告警