SpringBoot与Canal整合,实现金融交易系统的实时数据同步功能

Canal是阿里巴巴开源的一个用于高效抓取 MySQL 数据库增量变更日志(binlog)并进行处理的中间件。它可以将 MySQL 的 binlog 解析为结构化的 JSON 格式,并提供多种方式将这些数据推送到下游系统。

我们为什么选择Canal?

实时性: Canal基于MySQL的binlog机制,能够在毫秒级内完成数据同步。批量获取数据:Canal支持批量获取数据库变更数据,减少网络开销和处理时间。多线程处理:Canal可以配置多线程来处理不同的数据变更事件,提高整体吞吐量。断点续传:Canal支持从断点继续消费数据,确保数据不会丢失。持久化存储:Canal可以将消费进度持久化到ZooKeeper中,保证在故障恢复后能够继续正常工作。容错机制:Canal内置了多种容错机制,如重试策略和自动恢复功能,提高了系统的可靠性。标准协议:Canal使用标准化的binlog协议,易于与其他系统集成。过滤机制:Canal支持灵活的过滤规则,可以选择性地订阅特定的数据库和表。动态配置:Canal支持动态配置,可以根据实际需求调整监控范围和处理逻辑。自定义处理:Canal允许开发者编写自定义的处理器,实现复杂的数据处理逻辑。精确同步:Canal能够精确地捕获和同步数据库的每一行变更,确保数据的一致性。事务支持:Canal能够处理复杂的事务场景,确保事务的原子性和完整性。冲突解决:Canal提供了多种冲突解决策略,避免数据同步过程中的冲突问题。

哪些公司使用了Canal?

阿里巴巴 :Canal 被用于多个业务部门的数据同步需求。腾讯 :在社交网络、游戏等业务中使用 Canal 进行数据同步。美团:在餐饮外卖、酒店预订等多个业务中使用 Canal 进行数据同步。小米 :在智能家居、手机销售等多种业务中使用 Canal 进行数据同步。滴滴出行:在网约车、共享单车等多种业务中使用 Canal 进行数据同步。网易:在游戏、音乐等多种业务中使用 Canal 进行数据同步。

代码实操

复制
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.7.5</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.example</groupId> <artifactId>canal-demo</artifactId> <version>0.0.1-SNAPSHOT</version> <name>canal-demo</name> <description>Demo project for Spring Boot with Canal</description> <properties> <java.version>11</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <scope>runtime</scope> </dependency> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.5</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.48.49.50.51.52.53.54.55.

application.properties

复制
# 数据源配置 spring.datasource.url=jdbc:mysql://localhost:3306/your_database?useSSL=false&serverTimezone=UTC spring.datasource.username=root spring.datasource.password=123456 spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver # Canal配置 canal.server.ip=127.0.0.1 canal.port=11111 canal.destination=example1.2.3.4.5.6.7.8.9.10.

交易实体类

复制
package com.example.canaldemo.model; import lombok.Data; @Data public class Transaction { private Long id; // 主键ID private String transactionId; // 交易ID private Double amount; // 交易金额 private String status; // 交易状态 }1.2.3.4.5.6.7.8.9.10.11.

create table

复制
CREATE TABLE transaction ( id BIGINT AUTO_INCREMENT PRIMARY KEY, transaction_id VARCHAR(50) NOT NULL, amount DECIMAL(18, 2) NOT NULL, status VARCHAR(20) NOT NULL );1.2.3.4.5.6.

交易Mapper接口

复制
package com.example.canaldemo.mapper; import com.example.canaldemo.model.Transaction; import org.apache.ibatis.annotations.Insert; import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Update; /** * 交易Mapper接口 */ @Mapper public interface TransactionMapper { /** * 插入一条新的交易记录 * * @param transaction 交易对象 */ @Insert("INSERT INTO transaction(transaction_id, amount, status) VALUES(#{transaction.transactionId}, #{transaction.amount}, #{transaction.status})") void insert(@Param("transaction") Transaction transaction); /** * 更新一条交易记录 * * @param transaction 交易对象 */ @Update("UPDATE transaction SET amount=#{transaction.amount}, status=#{transaction.status} WHERE transaction_id=#{transaction.transactionId}") void update(@Param("transaction") Transaction transaction); }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.

Canal监听器类

复制
package com.example.canaldemo.listener; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.example.canaldemo.mapper.TransactionMapper; import com.example.canaldemo.model.Transaction; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.net.InetSocketAddress; import java.util.List; /** * Canal监听器类,用于监听数据库的变化并进行相应的处理 */ @Component public class CanalListener { private final String destination = "example"; // 这个值需要与Canal配置中的destination一致 private final String serverIp = "127.0.0.1"; private final int port = 11111; @Autowired private TransactionMapper transactionMapper; /** * 在Bean初始化后启动Canal监听器 */ @PostConstruct public void start() { // 创建Canal连接器 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(serverIp, port), destination, "", ""); try { // 连接到Canal服务器 connector.connect(); // 订阅所有数据库的所有表 connector.subscribe(".*\\..*"); // 回滚到上次中断的位置 connector.rollback(); while (true) { // 获取一批消息,最多100条 Message message = connector.getWithoutAck(100); long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { // 如果没有消息,则等待1秒 Thread.sleep(1000); } else { // 处理消息 processMessage(message.getEntries()); } // 提交确认 connector.ack(batchId); } } catch (Exception e) { e.printStackTrace(); } finally { // 断开连接 connector.disconnect(); } } /** * 处理Canal发送过来的消息 * * @param entryList 消息列表 */ private void processMessage(List<CanalEntry.Entry> entryList) { for (CanalEntry.Entry entry : entryList) { // 忽略事务开始和结束事件 if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) { continue; } CanalEntry.RowChange rowChage; try { // 解析RowChange数据 rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } CanalEntry.EventType eventType = rowChage.getEventType(); // 打印日志 System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); // 处理每一行数据变化 for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) { Transaction transaction = convertToTransaction(rowData.getAfterColumnsList()); if (eventType == CanalEntry.EventType.DELETE) { // 处理删除事件(如果需要) } elseif (eventType == CanalEntry.EventType.INSERT) { // 插入新记录 transactionMapper.insert(transaction); } else { // 更新现有记录 transactionMapper.update(transaction); } } } } /** * 将Canal列数据转换为Transaction对象 * * @param columns 列数据列表 * @return 转换后的Transaction对象 */ private Transaction convertToTransaction(List<CanalEntry.Column> columns) { Transaction transaction = new Transaction(); for (CanalEntry.Column column : columns) { switch (column.getName()) { case"id": transaction.setId(Long.parseLong(column.getValue())); break; case"transaction_id": transaction.setTransactionId(column.getValue()); break; case"amount": transaction.setAmount(Double.parseDouble(column.getValue())); break; case"status": transaction.setStatus(column.getValue()); break; } } return transaction; } }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.48.49.50.51.52.53.54.55.56.57.58.59.60.61.62.63.64.65.66.67.68.69.70.71.72.73.74.75.76.77.78.79.80.81.82.83.84.85.86.87.88.89.90.91.92.93.94.95.96.97.98.99.100.101.102.103.104.105.106.107.108.109.110.111.112.113.114.115.116.117.118.119.120.121.122.123.124.125.126.127.128.129.130.131.132.133.134.135.136.

Application

复制
package com.example.canaldemo; import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication @MapperScan("com.example.canaldemo.mapper") public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }1.2.3.4.5.6.7.8.9.10.11.12.13.

测试

插入一条交易记录

复制
curl -X POST http://localhost:8080/api/transactions \ -H "Content-Type: application/json" \ -d {"transactionId": "TX123", "amount": 100.00, "status": "PENDING"}1.2.3.

更新一条交易记录

复制
curl -X PUT http://localhost:8080/api/transactions/TX123 \ -H "Content-Type: application/json" \ -d {"transactionId": "TX123", "amount": 100.00, "status": "COMPLETED"}1.2.3.

观察后台日志

复制
================> binlog[mysql-bin.000001:1234] , name[your_database,transaction] , eventType : INSERT id : 1 update=true transaction_id : TX123 update=true amount : 100.00 update=true status : PENDING update=true ================> binlog[mysql-bin.000001:5678] , name[your_database,transaction] , eventType : UPDATE -------> before id : 1 update=false transaction_id : TX123 update=false amount : 100.00 update=false status : PENDING update=false -------> after id : 1 update=false transaction_id : TX123 update=false amount : 100.00 update=false status : COMPLETED update=true1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.

阅读剩余
THE END