3个月前,我接到一个棘手的任务:
- 1000亿订单数据迁移
- 5个数据中心同步
- 99.99%数据一致性要求
- 0业务影响
今天,我要分享5种方案的实战对比!
一、同步双写:最简单但最危险
数据同步双写
实现方案
@Transactional(rollbackFor = Exception.class)
public void saveOrder(Order order) {
// 写入主库
mysqlOrderMapper.insert(order);
try {
// 写入目标库
targetOrderMapper.insert(order);
} catch (Exception e) {
// 记录失败并告警
alarmService.sendAlarm("数据同步失败: " + order.getId());
throw e;
}
}
实战结果
RT上升400%
主库压力翻倍
频繁事务回滚
果断放弃!
二、异步消息队列:分布式系统的标配
异步MQ
架构设计
应用服务 -> RocketMQ集群 -> 消费者集群 -> 目标库
|
-> 主库
核心代码
@Service
public class AsyncDataSyncService {
@Transactional
public void syncData(Order order) {
// 1. 主库写入
orderMapper.insert(order);
// 2. 发送同步消息
Message msg = new Message("SYNC_TOPIC", order.toBytes());
// 3. 设置消息延迟级别,防止主库事务还未提交
msg.setDelayTimeLevel(2);
rocketMQTemplate.asyncSend(msg, new SendCallback() {
@Override
public void onSuccess(SendResult result) {
log.info("消息发送成功: {}", order.getId());
}
@Override
public void onException(Throwable e) {
// 写入重试队列
retryQueue.offer(order);
alarmService.sendAlarm("消息发送失败");
}
});
}
}
实战效果
RT影响小于5%
支持削峰填谷
故障隔离
一致性延迟高
三、Binlog实时同步:互联网企业的首选
canal
binlog架构
架构设计
MySQL集群 -> Canal集群 -> Kafka集群 -> Flink处理 -> 目标库集群
|
-> 监控告警
核心实现
@Component
public class BinlogSyncService {
private final CanalConnector connector;
private final KafkaTemplate kafkaTemplate;
public void startSync() {
connector.connect();
connector.subscribe(".*\\.order");
while (running) {
Message message = connector.getWithoutAck(BATCH_SIZE);
long batchId = message.getId();
try {
// 解析binlog
List entries = message.getEntries();
for (CanalEntry.Entry entry : entries) {
if (entry.getEntryType() == EntryType.ROWDATA) {
// 发送到Kafka
publishToKafka(entry);
}
}
// 确认消息
connector.ack(batchId);
} catch (Exception e) {
// 回滚位点
connector.rollback(batchId);
handleError(message, e);
}
}
}
private void publishToKafka(CanalEntry.Entry entry) {
// 解析binlog数据
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
for (RowData rowData : rowChange.getRowDatasList()) {
// 构建同步消息
SyncMessage msg = SyncMessage.builder()
.table(entry.getHeader().getTableName())
.type(rowChange.getEventType())
.before(convertToJson(rowData.getBeforeColumnsList()))
.after(convertToJson(rowData.getAfterColumnsList()))
.build();
// 发送到Kafka
kafkaTemplate.send("sync_topic", msg.toJson());
}
}
}
实战效果
零代码侵入
毫秒级延迟
支持高并发
最终采用方案!
四、数据库直接同步:传统企业的选择
架构设计
源库 -> 定时任务 -> 批量读取 -> 批量写入 -> 目标库
|
-> 位点管理
核心代码
@Service
public class DatabaseSyncService {
@Scheduled(fixedRate = 5000)
public void syncData() {
// 获取同步位点
long lastId = checkPointService.getLastId();
// 分批查询数据
while (true) {
List orders = sourceMapper.queryBatch(lastId, BATCH_SIZE);
if (orders.isEmpty()) {
break;
}
try {
// 批量写入目标库
targetMapper.batchInsert(orders);
// 更新位点
lastId = orders.get(orders.size() - 1).getId();
checkPointService.updateLastId(lastId);
} catch (Exception e) {
handleSyncError(orders, e);
break;
}
}
}
}
实战效果
简单易维护
支持大批量
实时性差
资源消耗大
五、阿里云DTS:省心但不省钱
特点:
开箱即用
多数据源支持
运维成本低
费用高昂
定制性差
最终方案对比
方案 | 延迟 | 一致性 | 成本 | 推荐指数 |
同步双写 | 无 | 强 | 低 | |
消息队列 | 秒级 | 最终 | 中 | |
Binlog | 毫秒级 | 最终 | 中 | |
直接同步 | 分钟级 | 最终 | 低 | |
DTS | 秒级 | 最终 | 高 |
实战建议
- 选型考虑因素 数据量级 实时性要求 成本预算 运维能力
- 监控必备指标 同步延迟 数据一致性 资源使用率 错误告警
写在最后
通过这个项目,我们积累了宝贵的经验:
- 技术选型要全面评估
- 监控告警要提前到位
- 应急预案要实际演练
- 文档规范要同步跟进
#系统架构 #数据同步 #实战经验 #性能优化