程序员求职经验分享与学习资料整理平台

网站首页 > 文章精选 正文

5种数据同步方案,我的成功同步1000亿数据的实战经验!

balukai 2025-03-18 10:10:34 文章精选 13 ℃

3个月前,我接到一个棘手的任务:

  • 1000亿订单数据迁移
  • 5个数据中心同步
  • 99.99%数据一致性要求
  • 0业务影响

今天,我要分享5种方案的实战对比



一、同步双写:最简单但最危险

数据同步双写

实现方案

@Transactional(rollbackFor = Exception.class)
public void saveOrder(Order order) {
    // 写入主库
    mysqlOrderMapper.insert(order);
    try {
        // 写入目标库
        targetOrderMapper.insert(order);
    } catch (Exception e) {
        // 记录失败并告警
        alarmService.sendAlarm("数据同步失败: " + order.getId());
        throw e;
    }
}

实战结果

RT上升400%

主库压力翻倍

频繁事务回滚

果断放弃!




二、异步消息队列:分布式系统的标配


异步MQ

架构设计

应用服务 -> RocketMQ集群 -> 消费者集群 -> 目标库
         |
         -> 主库


核心代码

@Service
public class AsyncDataSyncService {
    
    @Transactional
    public void syncData(Order order) {
        // 1. 主库写入
        orderMapper.insert(order);
        
        // 2. 发送同步消息
        Message msg = new Message("SYNC_TOPIC", order.toBytes());
        // 3. 设置消息延迟级别,防止主库事务还未提交
        msg.setDelayTimeLevel(2);
        
        rocketMQTemplate.asyncSend(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult result) {
                log.info("消息发送成功: {}", order.getId());
            }
            
            @Override
            public void onException(Throwable e) {
                // 写入重试队列
                retryQueue.offer(order);
                alarmService.sendAlarm("消息发送失败");
            }
        });
    }
}


实战效果

RT影响小于5%

支持削峰填谷

故障隔离

一致性延迟高




三、Binlog实时同步:互联网企业的首选



canal



binlog架构



架构设计

MySQL集群 -> Canal集群 -> Kafka集群 -> Flink处理 -> 目标库集群
                          |
                          -> 监控告警


核心实现

@Component
public class BinlogSyncService {
    
    private final CanalConnector connector;
    private final KafkaTemplate kafkaTemplate;
    
    public void startSync() {
        connector.connect();
        connector.subscribe(".*\\.order");
        
        while (running) {
            Message message = connector.getWithoutAck(BATCH_SIZE);
            long batchId = message.getId();
            try {
                // 解析binlog
                List entries = message.getEntries();
                for (CanalEntry.Entry entry : entries) {
                    if (entry.getEntryType() == EntryType.ROWDATA) {
                        // 发送到Kafka
                        publishToKafka(entry);
                    }
                }
                // 确认消息
                connector.ack(batchId);
            } catch (Exception e) {
                // 回滚位点
                connector.rollback(batchId);
                handleError(message, e);
            }
        }
    }
    
    private void publishToKafka(CanalEntry.Entry entry) {
        // 解析binlog数据
        RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
        
        for (RowData rowData : rowChange.getRowDatasList()) {
            // 构建同步消息
            SyncMessage msg = SyncMessage.builder()
                .table(entry.getHeader().getTableName())
                .type(rowChange.getEventType())
                .before(convertToJson(rowData.getBeforeColumnsList()))
                .after(convertToJson(rowData.getAfterColumnsList()))
                .build();
                
            // 发送到Kafka
            kafkaTemplate.send("sync_topic", msg.toJson());
        }
    }
}


实战效果


零代码侵入

毫秒级延迟

支持高并发

最终采用方案!




四、数据库直接同步:传统企业的选择


架构设计

源库 -> 定时任务 -> 批量读取 -> 批量写入 -> 目标库
                    |
                    -> 位点管理

核心代码

@Service
public class DatabaseSyncService {
    
    @Scheduled(fixedRate = 5000)
    public void syncData() {
        // 获取同步位点
        long lastId = checkPointService.getLastId();
        
        // 分批查询数据
        while (true) {
            List orders = sourceMapper.queryBatch(lastId, BATCH_SIZE);
            if (orders.isEmpty()) {
                break;
            }
            
            try {
                // 批量写入目标库
                targetMapper.batchInsert(orders);
                // 更新位点
                lastId = orders.get(orders.size() - 1).getId();
                checkPointService.updateLastId(lastId);
            } catch (Exception e) {
                handleSyncError(orders, e);
                break;
            }
        }
    }
}


实战效果


简单易维护

支持大批量

实时性差

资源消耗大




五、阿里云DTS:省心但不省钱

特点:

开箱即用

多数据源支持

运维成本低

费用高昂

定制性差



最终方案对比


方案

延迟

一致性

成本

推荐指数

同步双写

消息队列

秒级

最终

Binlog

毫秒级

最终

直接同步

分钟级

最终

DTS

秒级

最终


实战建议

  1. 选型考虑因素 数据量级 实时性要求 成本预算 运维能力
  2. 监控必备指标 同步延迟 数据一致性 资源使用率 错误告警

写在最后

通过这个项目,我们积累了宝贵的经验:

  • 技术选型要全面评估
  • 监控告警要提前到位
  • 应急预案要实际演练
  • 文档规范要同步跟进

#系统架构 #数据同步 #实战经验 #性能优化

最近发表
标签列表