MySQL同步ES的五种方案!

前言

有些小伙伴在工作中可能遇到过数据库查询慢的问题,特别是模糊查询和复杂聚合查询,这时候引入ES(Elasticsearch)作为搜索引擎是个不错的选择。

今天我们来聊聊MySQL同步到ES(Elasticsearch)的5种常见方案。

希望对你会有所帮助。

一、为什么需要MySQL同步到ES?

在我们深入讨论方案之前,先明确一下为什么需要将MySQL数据同步到ES:

全文搜索能力:ES提供强大的全文搜索功能,远超MySQL的LIKE查询。复杂聚合分析:ES支持复杂的聚合查询,适合大数据分析。高性能查询:ES的倒排索引设计使查询速度极快。水平扩展:ES天生支持分布式,易于水平扩展。

先来看一下整体的同步架构图:

图片

接下来,我们详细分析每种方案的实现原理和优缺点。

二、方案一:双写方案

双写方案是最直接的同步方式,即在业务代码中同时向MySQL和ES写入数据。

示例代码:

复制
@Service public class UserService { @Autowired private UserMapper userMapper; @Autowired private ElasticsearchTemplate elasticsearchTemplate; @Transactional public void addUser(User user) { // 写入MySQL userMapper.insert(user); // 写入Elasticsearch IndexQuery indexQuery = new IndexQueryBuilder() .withObject(user) .withId(user.getId().toString()) .build(); elasticsearchTemplate.index(indexQuery); } @Transactional public void updateUser(User user) { // 更新MySQL userMapper.updateById(user); // 更新Elasticsearch IndexRequest request = new IndexRequest("user_index") .id(user.getId().toString()) .source(JSON.toJSONString(user), XContentType.JSON); elasticsearchTemplate.getClient().index(request, RequestOptions.DEFAULT); } }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.

优缺点分析

优点:

实现简单,不需要引入额外组件实时性高,数据立即同步

缺点:

数据一致性难保证,需要处理分布式事务问题代码侵入性强,业务逻辑复杂性能受影响,每次写操作都要等待ES响应

适用场景

适合数据量不大,对实时性要求高,且能够接受一定数据不一致的业务场景。

三、方案二:定时任务方案

定时任务方案通过定期扫描MySQL数据变化来同步到ES。

示例代码:

复制
@Component public class UserSyncTask { @Autowired private UserMapper userMapper; @Autowired private UserESRepository userESRepository; // 每5分钟执行一次 @Scheduled(fixedRate = 5 * 60 * 1000) public void syncUserToES() { // 查询最近更新的数据 Date lastSyncTime = getLastSyncTime(); List<User> updatedUsers = userMapper.selectUpdatedAfter(lastSyncTime); // 同步到ES for (User user : updatedUsers) { userESRepository.save(user); } // 更新最后同步时间 updateLastSyncTime(new Date()); } // 获取最后同步时间 private Date getLastSyncTime() { // 从数据库或Redis中获取 // ... } }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.

数据更新追踪策略

为了提高同步效率,通常需要设计良好的数据变更追踪机制:

图片

优缺点分析

优点:

实现简单,不需要修改现有业务代码对数据库压力可控,可以调整同步频率

缺点:

实时性差,数据同步有延迟可能遗漏数据,如果系统崩溃会丢失部分数据扫描全表可能对数据库造成压力

适用场景

适合对实时性要求不高,数据变更不频繁的场景。

四、方案三:Binlog同步方案

Binlog是MySQL的二进制日志,记录了所有数据变更操作。

通过解析Binlog可以实现数据同步。

示例代码:

复制
public class BinlogSyncService { public void startSync() { BinaryLogClient client = new BinaryLogClient("localhost", 3306, "username", "password"); client.registerEventListener(new BinaryLogClient.EventListener() { @Override public void onEvent(Event event) { EventData eventData = event.getData(); if (eventData instanceof WriteRowsEventData) { // 插入操作 WriteRowsEventData writeData = (WriteRowsEventData) eventData; processInsertEvent(writeData); } elseif (eventData instanceof UpdateRowsEventData) { // 更新操作 UpdateRowsEventData updateData = (UpdateRowsEventData) eventData; processUpdateEvent(updateData); } elseif (eventData instanceof DeleteRowsEventData) { // 删除操作 DeleteRowsEventData deleteData = (DeleteRowsEventData) eventData; processDeleteEvent(deleteData); } } }); client.connect(); } private void processInsertEvent(WriteRowsEventData eventData) { // 处理插入事件,同步到ES for (Serializable[] row : eventData.getRows()) { User user = convertRowToUser(row); syncToElasticsearch(user, "insert"); } } private void syncToElasticsearch(User user, String operation) { // 同步到ES的实现 // ... } }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.

优缺点分析

优点:

实时性高,几乎实时同步对业务代码无侵入,不需要修改现有代码性能好,不影响数据库性能

缺点:

实现复杂,需要解析Binlog格式需要考虑Binlog格式变更的兼容性问题主从切换时可能需要重新同步

适用场景

适合对实时性要求高,数据量大的场景。

五、方案四:Canal方案

Canal是阿里巴巴开源的MySQL Binlog增量订阅&消费组件,简化了Binlog同步的复杂性。

示例代码:

复制
# canal.properties 配置 canal.instance.master.address=127.0.0.1:3306 canal.instance.dbUsername=username canal.instance.dbPassword=password canal.instance.connectionCharset=UTF-8 canal.instance.filter.regex=.*\\..*1.2.3.4.5.6.
复制
public class CanalClientExample { public static void main(String[] args) { // 创建Canal连接 CanalConnector connector = CanalConnectors.newSingleConnector( new InetSocketAddress("127.0.0.1", 11111), "example", "", ""); try { connector.connect(); connector.subscribe(".*\\..*"); while (true) { Message message = connector.getWithoutAck(100); long batchId = message.getId(); if (batchId != -1 && !message.getEntries().isEmpty()) { processEntries(message.getEntries()); connector.ack(batchId); // 提交确认 } Thread.sleep(1000); } } finally { connector.disconnect(); } } private static void processEntries(List<CanalEntry.Entry> entries) { for (CanalEntry.Entry entry : entries) { if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) { CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { if (rowChange.getEventType() == CanalEntry.EventType.INSERT) { processInsert(rowData); } elseif (rowChange.getEventType() == CanalEntry.EventType.UPDATE) { processUpdate(rowData); } elseif (rowChange.getEventType() == CanalEntry.EventType.DELETE) { processDelete(rowData); } } } } } }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.

架构设计

Calan方案的架构如下:

图片

优缺点分析

优点:

实时性高,延迟低对业务系统无侵入阿里巴巴开源项目,社区活跃

缺点:

需要部署维护Canal服务器需要处理网络分区和故障恢复可能产生数据重复同步问题

适用场景

适合大数据量、高实时性要求的场景,且有专门团队维护中间件。

六、方案五:MQ异步方案

MQ异步方案通过消息队列解耦MySQL和ES的同步过程,提高系统的可靠性和扩展性。

示例代码:

复制
@Service public class UserService { @Autowired private UserMapper userMapper; @Autowired private RabbitTemplate rabbitTemplate; @Transactional public void addUser(User user) { // 写入MySQL userMapper.insert(user); // 发送消息到MQ rabbitTemplate.convertAndSend("user.exchange", "user.add", user); } @Transactional public void updateUser(User user) { // 更新MySQL userMapper.updateById(user); // 发送消息到MQ rabbitTemplate.convertAndSend("user.exchange", "user.update", user); } } @Component public class UserMQConsumer { @Autowired private UserESRepository userESRepository; @RabbitListener(queues = "user.queue") public void processUserAdd(User user) { userESRepository.save(user); } @RabbitListener(queues = "user.queue") public void processUserUpdate(User user) { userESRepository.save(user); } @RabbitListener(queues = "user.queue") public void processUserDelete(Long userId) { userESRepository.deleteById(userId); } }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.

消息队列选型对比

不同的消息队列产品有不同特点,下面是常见MQ的对比:

图片

优缺点分析

优点:

完全解耦,MySQL和ES同步过程相互独立高可用,MQ本身提供消息持久化和重试机制可扩展,可以方便地增加消费者处理消息

缺点:

系统复杂度增加,需要维护MQ集群可能产生消息顺序问题,需要处理消息顺序性数据一致性延迟,依赖于消息消费速度

适用场景

适合大型分布式系统,对可靠性和扩展性要求高的场景。

七、5种方案对比

为了更直观地比较这5种方案,我们来看一个综合对比表格:

方案名称

实时性

数据一致性

系统复杂度

性能影响

适用场景

双写方案

难保证

小规模应用

定时任务

最终一致

非实时场景

Binlog方案

最终一致

大数据量高实时

Canal方案

最终一致

大数据量高实时

MQ异步方案

最终一致

分布式大型系统

选择建议

有些小伙伴在工作中可能会纠结选择哪种方案,这里给出一些建议:

初创项目或小规模系统:可以选择双写方案或定时任务方案,实现简单。中大型系统:建议使用Canal方案或MQ异步方案,保证系统的可靠性和扩展性。大数据量高实时要求:Binlog方案或Canal方案是最佳选择。已有MQ基础设施:优先考虑MQ异步方案,充分利用现有资源。

注意事项

无论选择哪种方案,都需要注意以下几点:

幂等性处理:同步过程需要保证幂等性,防止重复数据。监控告警:建立完善的监控体系,及时发现同步延迟或失败。数据校验:定期校验MySQL和ES中的数据一致性。容错机制:设计良好的故障恢复机制,避免数据丢失。

总结

MySQL同步到ES(Elasticsearch)是现代应用开发中常见的需求,选择合适的同步方案对系统性能和可靠性至关重要。

本文介绍了5种常见方案,各有优缺点,适用于不同场景。

在实际项目中,可能需要根据具体需求组合使用多种方案,或者对某种方案进行定制化改造。

重要的是要理解每种方案的原理和特点,才能做出合理的技术选型。

阅读剩余
THE END