Authored by Gino Zhang

增加 Canal学习笔记 分享

# 1. 工作原理
mysql主备复制实现:
![mysql主备复制实现](http://www.programering.com/images/remote/ZnJvbT1pdGV5ZSZ1cmw9Y0djcTVDTTJJak16UUROM0F6TDRBek14QWpNdlFuYmwxR2FqRkdkMEYyTHQ5Mll1OEdkakZUTnVFelp0bDJMdm9EYzBSSGE.jpg)
从上层来看,复制分成三步:
- master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);
- slave将master的binary log events拷贝到它的中继日志(relay log);
- slave重做中继日志中的事件,将改变反映它自己的数据。
canal的工作原理:
![](https://camo.githubusercontent.com/46c626b4cde399db43b2634a7911a04aecf273a0/687474703a2f2f646c2e69746579652e636f6d2f75706c6f61642f6174746163686d656e742f303038302f333130372f63383762363762612d333934632d333038362d393537372d3964623035626530346339352e6a7067)
原理相对比较简单:
- canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
- mysql master收到dump请求,开始推送binary log给slave(也就是canal)
- canal解析binary log对象(原始为byte流)
# 2 架构
![](https://camo.githubusercontent.com/fad39e40f844ae4196035f4007e31ae2bd020616/687474703a2f2f646c2e69746579652e636f6d2f75706c6f61642f6174746163686d656e742f303038302f333132362f34393535303038352d306364322d333266612d383661362d6636373664623562353937622e6a7067)
说明:
- server代表一个canal运行实例,对应于一个jvm
- instance对应于一个数据队列 (1个server对应1..n个instance)
instance模块:
- eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
- eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
- eventStore (数据存储)
- metaManager (增量订阅&消费信息管理器)
# 3 知识科普
mysql的Binlay Log介绍
- http://dev.mysql.com/doc/refman/5.5/en/binary-log.html
- http://www.taobaodba.com/html/474_mysqls-binary-log_details.html
简单点说:
- mysql的binlog是多文件存储,定位一个LogEvent需要通过binlog filename + binlog position,进行定位
- mysql的binlog数据格式,按照生成的方式,主要分为:statement-based、row-based、mixed。
mysql> show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW |
+---------------+-------+
1 row in set (0.00 sec)
目前canal支持所有模式的增量订阅(但配合同步时,因为statement只有sql,没有数据,无法获取原始的变更日志,所以一般建议为ROW模式)
# 4 EventParser设计
大致过程:
![](http://kaimingwan.com/canal/_image/binlog%E6%97%A5%E5%BF%97%E8%A7%A3%E6%9E%90%E5%BC%80%E6%BA%90%E5%B7%A5%E5%85%B7canal%E6%BA%90%E7%A0%81%E6%B5%85%E6%9E%90%EF%BC%883\)%EF%BC%9Aparser%E5%92%8Cprotocol%E6%A8%A1%E5%9D%97/14-14-42.jpg)
整个parser过程大致可分为几步:
1. Connection获取上一次解析成功的位置 (如果第一次启动,则获取初始指定的位置或者是当前数据库的binlog位点)
2. Connection建立链接,发送BINLOG_DUMP指令
// 0. write command number
// 1. write 4 bytes bin-log position to start at
// 2. write 2 bytes bin-log flags
// 3. write 4 bytes server id of the slave
// 4. write bin-log file name
3. Mysql开始推送Binaly Log
4. 接收到的Binaly Log的通过Binlog parser进行协议解析,补充一些特定信息
// 补充字段名字,字段类型,主键信息,unsigned类型处理
5. 传递给EventSink模块进行数据存储,是一个阻塞操作,直到存储成功
6. 存储成功后,定时记录Binaly Log位置
mysql的Binlay Log网络协议:
![](https://camo.githubusercontent.com/5bb9d62abfd7c35302de1a1e159a77d9d7395620/687474703a2f2f646c2e69746579652e636f6d2f75706c6f61642f6174746163686d656e742f303038302f333137332f36333861383361652d336235632d336638332d393732322d3263313931326537636163362e706e67)
说明:
- 图中的协议4byte header,主要是描述整个binlog网络包的length
- binlog event structure,详细信息请参考: http://forge.mysql.com/wiki/MySQL_Internals_Binary_Log
# 5 EventSink设计
![](https://camo.githubusercontent.com/77829ab83df80538641d75e074cdcd68752f4c89/687474703a2f2f646c2e69746579652e636f6d2f75706c6f61642f6174746163686d656e742f303038302f333231342f30613266643637312d643665392d336564372d623131302d3661336237333861336362302e6a7067)
说明:
- 数据过滤:支持通配符的过滤模式,表名,字段内容等
- 数据路由/分发:解决1:n (1个parser对应多个store的模式)
- 数据归并:解决n:1 (多个parser对应1个store)
- 数据加工:在进入store之前进行额外的处理,比如join
##### 数据1:n业务
为了合理的利用数据库资源, 一般常见的业务都是按照schema进行隔离,然后在mysql上层或者dao这一层面上,进行一个数据源路由,屏蔽数据库物理位置对开发的影响,阿里系主要是通过cobar/tddl来解决数据源路由问题。
所以,一般一个数据库实例上,会部署多个schema,每个schema会有由1个或者多个业务方关注
##### 数据n:1业务
同样,当一个业务的数据规模达到一定的量级后,必然会涉及到水平拆分和垂直拆分的问题,针对这些拆分的数据需要处理时,就需要链接多个store进行处理,消费的位点就会变成多份,而且数据消费的进度无法得到尽可能有序的保证。
所以,在一定业务场景下,需要将拆分后的增量数据进行归并处理,比如按照时间戳/全局id进行排序归并.
# 6 EventStore设计
1. 目前仅实现了Memory内存模式,后续计划增加本地file存储,mixed混合模式
2. 借鉴了Disruptor的RingBuffer的实现思路
![](http://kaimingwan.com/canal/_image/binlog%E6%97%A5%E5%BF%97%E8%A7%A3%E6%9E%90%E5%BC%80%E6%BA%90%E5%B7%A5%E5%85%B7canal%E6%BA%90%E7%A0%81%E6%B5%85%E6%9E%90%EF%BC%886%EF%BC%89%EF%BC%9Astore%E6%A8%A1%E5%9D%97/14-05-44.jpg)
定义了3个cursor
- Put : Sink模块进行数据存储的最后一次写入位置
- Get : 数据订阅获取的最后一次提取位置
- Ack : 数据消费成功的最后一次消费位置
借鉴Disruptor的RingBuffer的实现,将RingBuffer拉直来看:
![](https://camo.githubusercontent.com/8684b7499fe7d4e022b8d9ffac1768f811caed01/687474703a2f2f646c2e69746579652e636f6d2f75706c6f61642f6174746163686d656e742f303038302f333233392f34663538393132642d376338652d333764652d623762382d3130646432316161366332322e6a7067)
实现说明:
- Put/Get/Ack cursor用于递增,采用long型存储
- buffer的get操作,通过取余或者与操作。(与操作: cusor & (size - 1) , size需要为2的指数,效率比较高)
# 7 Instance设计
![](https://camo.githubusercontent.com/a43b8352e880d0bb43c02b2fc1603095d11cb5dc/687474703a2f2f646c2e69746579652e636f6d2f75706c6f61642f6174746163686d656e742f303038302f333234372f35646531633961662d373739382d336434322d626334332d3563353464383263346462662e6a7067)
instance代表了一个实际运行的数据队列,包括了EventPaser,EventSink,EventStore等组件。
抽象了CanalInstanceGenerator,主要是考虑配置的管理方式:
- manager方式: 和你自己的内部web console/manager系统进行对接。(目前主要是公司内部使用)
- spring方式:基于spring xml + properties进行定义,构建spring配置.
# 8 Server设计
![](https://camo.githubusercontent.com/930fe71532c25812966359ed35ee2371ec4664e8/687474703a2f2f646c2e69746579652e636f6d2f75706c6f61642f6174746163686d656e742f303038302f333235372f66346466333862612d353965322d333938652d623565622d3162626662656363303637362e6a7067)
server代表了一个canal的运行实例,为了方便组件化使用,特意抽象了Embeded(嵌入式) / Netty(网络访问)的两种实现
- Embeded : 对latency和可用性都有比较高的要求,自己又能hold住分布式的相关技术(比如failover)
- Netty : 基于netty封装了一层网络协议,由canalserver保证其可用性,采用的pull模型,当然latency会稍微打点折扣,不过这个也视情况而定。(阿里系的notify和metaq,典型的push/pull模型,目前也逐步的在向pull模型靠拢,push在数据量大的时候会有一些问题)
# 9 增量订阅/消费设计
![](https://camo.githubusercontent.com/db1debcfa50f4ebea1f56a1fa0e18a4e960cafcc/687474703a2f2f646c2e69746579652e636f6d2f75706c6f61642f6174746163686d656e742f303038302f333239372f39643765643133652d366138362d333836642d393266342d3835323233386334373562662e6a7067)
具体的协议格式,可参见:[CanalProtocol.proto](https://github.com/alibaba/canal/blob/master/protocol/src/main/java/com/alibaba/otter/canal/protocol/CanalProtocol.proto)
get/ack/rollback协议介绍:
- Message getWithoutAck(int batchSize),允许指定batchSize,一次可以获取多条,每次返回的对象为Message,包含的内容为:
a. batch id 唯一标识
b. entries 具体的数据对象,对应的数据对象格式:[EntryProtocol.proto](https://github.com/alibaba/canal/blob/master/protocol/src/main/java/com/alibaba/otter/canal/protocol/EntryProtocol.proto)
- Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit),相比于getWithoutAck(int batchSize),允许设定获取数据的timeout超时时间
a. 拿够batchSize条记录或者超过timeout时间
b. timeout=0,阻塞等到足够的batchSize
- void rollback(long batchId),顾命思议,回滚上次的get请求,重新获取数据。基于get获取的batchId进行提交,避免误操作
- void ack(long batchId),顾命思议,确认已经消费成功,通知server删除数据。基于get获取的batchId进行提交,避免误操作
canal的get/ack/rollback协议和常规的jms协议有所不同,允许get/ack异步处理,比如可以连续调用get多次,后续异步按顺序提交ack/rollback,项目中称之为流式api.
流式api设计的好处:
- get/ack异步化,减少因ack带来的网络延迟和操作成本(99%的状态都是处于正常状态,异常的rollback属于个别情况,没必要为个别的case牺牲整个性能)
- get获取数据后,业务消费存在瓶颈或者需要多进程/多线程消费时,可以不停的轮询get数据,不停的往后发送任务,提高并行化. (作者在实际业务中的一个case:业务数据消费需要跨中美网络,所以一次操作基本在200ms以上,为了减少延迟,所以需要实施并行化)
流式api设计:
![](https://camo.githubusercontent.com/533903f4925be003abe78b259e77fd829fc58c2b/687474703a2f2f646c2e69746579652e636f6d2f75706c6f61642f6174746163686d656e742f303038302f333330302f37663739383665352d643863362d336131372d383739362d3731313636653662633264632e6a7067)
- 每次get操作都会在meta中产生一个mark,mark标记会递增,保证运行过程中mark的唯一性
- 每次的get操作,都会在上一次的mark操作记录的cursor继续往后取,如果mark不存在,则在last ack cursor继续往后取
- 进行ack时,需要按照mark的顺序进行数序ack,不能跳跃ack. ack会删除当前的mark标记,并将对应的mark位置更新为last ack cusor
- 一旦出现异常情况,客户端可发起rollback情况,重新置位:删除所有的mark, 清理get请求位置,下次请求会从last ack cursor继续往后取
流式api带来的异步响应模型:
![](https://camo.githubusercontent.com/99854985217eb95d4faa0eea00f8fe2631fe803a/687474703a2f2f646c322e69746579652e636f6d2f75706c6f61642f6174746163686d656e742f303039302f363437392f35323062313532392d366665612d333932372d393635362d3333363661316661333337622e6a7067)
# 10 数据对象格式:[EntryProtocol.proto](https://github.com/alibaba/canal/blob/master/protocol/src/main/java/com/alibaba/otter/canal/protocol/EntryProtocol.proto)
Entry
Header
logfileName [binlog文件名]
logfileOffset [binlog position]
executeTime [binlog里记录变更发生的时间戳]
schemaName [数据库实例]
tableName [表名]
eventType [insert/update/delete类型]
entryType [事务头BEGIN/事务尾END/数据ROWDATA]
storeValue [byte数据,可展开,对应的类型为RowChange]
RowChange
isDdl [是否是ddl变更操作,比如create table/drop table]
sql [具体的ddl sql]
rowDatas [具体insert/update/delete的变更数据,可为多条,1个binlog event事件可对应多条变更,比如批处理]
beforeColumns [Column类型的数组]
afterColumns [Column类型的数组]
Column
index [column序号]
sqlType [jdbc type]
name [column name]
isKey [是否为主键]
updated [是否发生过变更]
isNull [值是否为null]
value [具体的内容,注意为文本]
说明:
- 可以提供数据库变更前和变更后的字段内容,针对binlog中没有的name,isKey等信息进行补全
- 可以提供ddl的变更语句
# 11 HA机制设计
canal的ha分为两部分,canal server和canal client分别有对应的ha实现
- canal server: 为了减少对mysql dump的请求,不同server上的instance要求同一时间只能有一个处于running,其他的处于standby状态.
- canal client: 为了保证有序性,一份instance同一时间只能由一个canal client进行get/ack/rollback操作,否则客户端接收无法保证有序。
整个HA机制的控制主要是依赖了zookeeper的几个特性,watcher和EPHEMERAL节点(和session生命周期绑定)。
Canal Server:
![](https://camo.githubusercontent.com/c8f1d98268a307821273e94e7eefcd29a26f9b78/687474703a2f2f646c2e69746579652e636f6d2f75706c6f61642f6174746163686d656e742f303038302f333330332f64333230326332362d653935342d333563302d613331392d3537363034313032633537642e6a7067)
大致步骤:
1. canal server要启动某个canal instance时都先向zookeeper进行一次尝试启动判断 (实现:创建EPHEMERAL节点,谁创建成功就允许谁启动)
2. 创建zookeeper节点成功后,对应的canal server就启动对应的canal instance,没有创建成功的canal instance就会处于standby状态
3. 一旦zookeeper发现canal server A创建的节点消失后,立即通知其他的canal server再次进行步骤1的操作,重新选出一个canal server启动instance.
4. canal client每次进行connect时,会首先向zookeeper询问当前是谁启动了canal instance,然后和其建立链接,一旦链接不可用,会重新尝试connect.'
Canal Client的方式和canal server方式类似,也是利用zookeeper的抢占EPHEMERAL节点的方式进行控制.
# 12 mysql的配置
a. canal的原理是基于mysql binlog技术,所以这里一定需要开启mysql的binlog写入功能,建议配置binlog模式为row.
[mysqld]
log-bin=mysql-bin #添加这一行开启binlog
binlog-format=ROW #选择row模式
server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复
b. canal的原理是模拟自己为mysql slave,所以这里一定需要做为mysql slave的相关权限.
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
针对已有的账户可通过grant
# canal 配置
介绍配置之前,先了解下canal的配置加载方式:
![](https://camo.githubusercontent.com/3772f5e12fb46df1032685319c83c557b3f6a098/687474703a2f2f646c2e69746579652e636f6d2f75706c6f61642f6174746163686d656e742f303038312f383735392f34666161376332652d656439352d333633382d613365322d3862343831633865343036302e6a7067)
canal配置方式有两种:
- ManagerCanalInstanceGenerator:基于manager管理的配置方式,目前alibaba内部配置使用这种方式。大家可以实现CanalConfigClient,连接各自的管理系统,即可完成接入。
- SpringCanalInstanceGenerator:基于本地spring xml的配置方式,目前开源版本已经自带该功能所有代码,建议使用
### Spring配置
spring配置的原理是将整个配置抽象为两部分:
- xxxx-instance.xml (canal组件的配置定义,可以在多个instance配置中共享)
- xxxx.properties (每个instance通道都有各自一份定义,因为每个mysql的ip,帐号,密码等信息不会相同)
通过spring的PropertyPlaceholderConfigurer通过机制将其融合,生成一份instance实例对象,每个instance对应的组件都是相互独立的,互不影响
### properties配置文件
properties配置分为两部分:
- canal.properties (系统根配置文件)
- instance.properties (instance级别的配置文件,每个instance一份)
### canal.properties介绍:
canal配置主要分为两部分定义:
1. instance列表定义 (列出当前server上有多少个instance,每个instance的加载方式是spring/manager等)
参数名字 | 参数说明 | 默认值
--------------------------|---------------------------------------|--------------------------------------
canal.destinations | 当前server上部署的instance列表 |无
canal.conf.dir | conf/目录所在的路径 |../conf
canal.auto.scan |开启instance自动扫描,canal.conf.dir目录下的instance配置变化会自动触发处理|true
canal.auto.scan.interval | instance自动扫描的间隔时间,单位秒 |5
canal.instance.global.mode| 全局配置加载方式 |spring
canal.instance.global.lazy| 全局lazy模式 |false
canal.instance.global.manager.address| 全局的manager配置方式的链接信息 |无
canal.instance.global.spring.xml| 全局的spring配置方式的组件文件|classpath: spring/file-instance.xml
canal.instance.example.mode, canal.instance.example.lazy, canal.instance.example.spring.xml| instance级别的配置定义,如有配置,会自动覆盖全局配置定义模式。命名规则:canal.instance.{name}.xxx |无
2. common参数定义,比如可以将instance.properties的公用参数,抽取放置到这里,这样每个instance启动的时候就可以共享. 【instance.properties配置定义优先级高于canal.properties】
参数名字 |参数说明 |默认值
--------------------------|---------------------------------------|-----------------------------
canal.id |每个canal server实例的唯一标识,暂无实际意义| 1
canal.ip |canal server绑定的本地IP信息,如果不配置,默认选择一个本机IP进行启动服务 |无
canal.port |canal server提供socket服务的端口 |11111
canal.zkServers |canal server链接zookeeper集群的链接信息 例子:127.0.0.1:2181,127.0.0.1:2182 |无
canal.zookeeper.flush.period|canal持久化数据到zookeeper上的更新频率,单位毫秒 |1000
canal.file.data.dir canal |持久化数据到file上的目录 |../conf
canal.file.flush.period canal|持久化数据到file上的更新频率,单位毫秒|1000
canal.instance.memory.batch.mode|canal内存store中数据缓存模式。ITEMSIZE :根据buffer.size进行限制,只限制记录的数量; MEMSIZE: 根据buffer.size * buffer.memunit的大小,限制缓存记录的大小 |MEMSIZE
canal.instance.memory.buffer.size | canal内存store中可缓存buffer记录数,需要为2的指数 | 16384
canal.instance.memory.buffer.memunit|内存记录的单位大小,默认1KB,和buffer.size组合决定最终的内存使用大小|1024
canal.instance.transactionn.size|最大事务完整解析的长度支持。超过该长度后,一个事务可能会被拆分成多次提交到canal store中,无法保证事务的完整可见性|1024
canal.instance.fallbackIntervalInSeconds|canal发生mysql切换时,在新的mysql库上查找binlog时需要往前查找的时间,单位秒(说明:mysql主备库可能存在解析延迟或者时钟不统一,需要回退一段时间,保证数据不丢)|60
canal.instance.detecting.enable|是否开启心跳检查|false
canal.instance.detecting.sql |心跳检查sql |insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.interval.time | 心跳检查频率,单位秒 | 3
canal.instance.detecting.retry.threshold | 心跳检查失败重试次数 | 3
canal.instance.detecting.heartbeatHaEnable | 心跳检查失败后,是否开启自动mysql自动切换(说明:比如心跳检查失败超过阀值后,如果该配置为true,canal就会自动链到mysql备库获取binlog数据)| false
canal.instance.network.receiveBufferSize | 网络链接参数,SocketOptions.SO_RCVBUF | 16384
canal.instance.network.sendBufferSize | 网络链接参数,SocketOptions.SO_SNDBUF | 16384
canal.instance.network.soTimeout | 网络链接参数,SocketOptions.SO_TIMEOUT| 30
canal.instance.filter.query.dcl | 是否忽略DCL的query语句,比如grant/create user等 | false
canal.instance.filter.query.dml | 是否忽略DML的query语句,比如insert/update/delete table | false
canal.instance.filter.query.ddl | 是否忽略DDL的query语句,比如create table/alater table/drop table/rename table/create index/drop index. (目前支持的ddl类型主要为table级别的操作,create databases/trigger/procedure暂时划分为dcl类型) | false
canal.instance.get.ddl.isolation |ddl语句是否隔离发送,开启隔离可保证每次只返回发送一条ddl数据,不和其他dml语句混合返回.(otter ddl同步使用)| false
### instance.properties介绍:
在canal.properties定义了canal.destinations后,需要在canal.conf.dir对应的目录下建立同名的文件
比如:
canal.destinations = example1,example2
这时需要创建example1和example2两个目录,每个目录里各自有一份instance.properties.
ps. canal自带了一份instance.properties demo,可直接复制conf/example目录进行配置修改
cp -R example example1/
cp -R example example2/
instance.properties参数列表:
参数名字 |参数说明 |默认值
-----------------------------------|----------------------------------|---------------------------
canal.instance.mysql.slaveId |mysql集群配置中的serverId概念,需要保证和当前mysql集群中id唯一 | 1234
canal.instance.master.address |mysql主库链接地址 | 127.0.0.1:3306
canal.instance.master.journal.name |mysql主库链接时起始的binlog文件 |无
canal.instance.master.position |mysql主库链接时起始的binlog偏移量 |无
canal.instance.master.timestamp |mysql主库链接时起始的binlog的时间戳 |无
canal.instance.dbUsername |mysql数据库帐号 |canal
canal.instance.dbPassword |mysql数据库密码 |canal
canal.instance.defaultDatabaseName |mysql链接时默认schema |
canal.instance.connectionCharset |mysql 数据解析编码 |UTF-8
canal.instance.filter.regex mysql |数据解析关注的表,Perl正则表达式. |.*\\..*
几点说明:
1. mysql链接时的起始位置
- canal.instance.master.journal.name + canal.instance.master.position : 精确指定一个binlog位点,进行启动
- canal.instance.master.timestamp : 指定一个时间戳,canal会自动遍历mysql binlog,找到对应时间戳的binlog位点后,进行启动
- 不指定任何信息:默认从当前数据库的位点,进行启动。(show master status)
2. mysql解析关注表定义 标准的Perl正则,注意转义时需要双斜杠:\\
3. mysql链接的编码 目前canal版本仅支持一个数据库只有一种编码,如果一个库存在多个编码,需要通过filter.regex配置,将其拆分为多个canal instance,为每个instance指定不同的编码
### instance.xml配置文件
目前默认支持的instance.xml有以下几种:
spring/memory-instance.xml
spring/file-instance.xml
spring/default-instance.xml
spring/group-instance.xml
在介绍instance配置之前,先了解一下canal如何维护一份增量订阅&消费的关系信息:
- 解析位点 (parse模块会记录,上一次解析binlog到了什么位置,对应组件为:CanalLogPositionManager)
- 消费位点 (canal server在接收了客户端的ack后,就会记录客户端提交的最后位点,对应的组件为:CanalMetaManager)
对应的两个位点组件,目前都有几种实现:
- memory (memory-instance.xml中使用)
- zookeeper
- mixed
- file (file-instance.xml中使用,集合了file+memory模式,先写内存,定时刷新数据到本地file上)
- period (default-instance.xml中使用,集合了zookeeper+memory模式,先写内存,定时刷新数据到zookeeper上)
-------------------
**memory-instance.xml介绍:**
所有的组件(parser , sink , store)都选择了内存版模式,记录位点的都选择了memory模式,重启后又会回到初始位点进行解析
特点:速度最快,依赖最少(不需要zookeeper)
场景:一般应用在quickstart,或者是出现问题后,进行数据分析的场景,不应该将其应用于生产环境
**file-instance.xml介绍:**
所有的组件(parser , sink , store)都选择了基于file持久化模式,注意,不支持HA机制.
特点:支持单机持久化
场景:生产环境,无HA需求,简单可用.
**default-instance.xml介绍:**
所有的组件(parser , sink , store)都选择了持久化模式,目前持久化的方式主要是写入zookeeper,保证数据集群共享.
特点:支持HA
场景:生产环境,集群化部署.
**group-instance.xml介绍:**
主要针对需要进行多库合并时,可以将多个物理instance合并为一个逻辑instance,提供客户端访问。
场景:分库业务。 比如产品数据拆分了4个库,每个库会有一个instance,如果不用group,业务上要消费数据时,需要启动4个客户端,分别链接4个instance实例。使用group后,可以在canal server上合并为一个逻辑instance,只需要启动1个客户端,链接这个逻辑instance即可.
**instance.xml设计初衷:**
允许进行自定义扩展,比如实现了基于数据库的位点管理后,可以自定义一份自己的instance.xml,整个canal设计中最大的灵活性在于此
# 13 ClientAPI
在了解具体API之前,需要提前了解下canal client的类设计,这样才可以正确的使用好canal.
![](https://camo.githubusercontent.com/8cc684cf92e22d738d57b002c356afba96bcc4f5/687474703a2f2f646c322e69746579652e636f6d2f75706c6f61642f6174746163686d656e742f303039302f363435332f39326233343335302d323566632d333162332d626361362d3865326131653763356532322e6a7067)
大致分为几部分:
- ClientIdentity: canal client和server交互之间的身份标识,目前clientId写死为1001. (目前canal server上的一个instance只能有一个client消费,clientId的设计是为1个instance多client消费模式而预留的,暂时不需要理会)
- CanalConnector: SimpleCanalConnector/ClusterCanalConnector两种connector的实现,simple针对的是简单的ip直连模式,cluster针对多ip的模式,可依赖CanalNodeAccessStrategy进行failover控制
- CanalNodeAccessStrategy: SimpleNodeAccessStrategy/ClusterNodeAccessStrategy两种failover的实现,simple针对给定的初始ip列表进行failover选择,cluster基于zookeeper上的cluster节点动态选择正在运行的canal server.
- ClientRunningMonitor/ClientRunningListener/ClientRunningData: client running相关控制,主要为解决client自身的failover机制。canal client允许同时启动多个canal client,通过running机制,可保证只有一个client在工作,其他client做为冷备. 当运行中的client挂了,running会控制让冷备中的client转为工作模式,这样就可以确保canal client也不会是单点. 保证整个系统的高可用性.
# 14 代码框架 类图设计
**整体类图设计**
![](https://camo.githubusercontent.com/a9b2f3b6a6a6b08d315ca1d50edf54da825b88b1/687474703a2f2f646c2e69746579652e636f6d2f75706c6f61642f6174746163686d656e742f303038322f353139322f35306635326161322d643838362d333366322d613661362d3639343631316438363962612e6a7067)
说明:
- CanalLifeCycle为所有canal模块的生命周期接口
- CanalInstance组合parser,sink,store三个子模块,三个子模块的生命周期统一受CanalInstance管理
- CanalServer聚合了多个CanalInstance
**EventParser类图设计和扩展**
![](https://camo.githubusercontent.com/18166478784e086d703bb7121a2d8f7e93127243/687474703a2f2f646c2e69746579652e636f6d2f75706c6f61642f6174746163686d656e742f303038322f353139362f36376439326533362d366162302d333362362d386363662d3565663362373539303564302e6a7067)
每个EventParser都会关联两个内部组件: CanalLogPositionManager , CanalHAController
- CanalLogPositionManager : 记录binlog最后一次解析成功位置信息,主要是描述下一次canal启动的位点
- CanalHAController:控制EventParser的链接主机管理,判断当前该链接哪个mysql数据库.
说明:
1. 目前开源版本只有支持mysql的协议(LocalBinlog就是类似于relay log的那种模式,直接根据relay log进行数据消费)
2. 内部版本会有OracleEventParser,获取oracle增量变更信息,因为涉及一些政治,商业和产品关系,没有随canal开源。(oracle增量解析目前为c语言开发,提供socket方式供canal接入)
**CanalLogPositionManager类图设计**
![](https://camo.githubusercontent.com/9e1bf18f449a20c5183234b45bee33fdcdce9fb0/687474703a2f2f646c2e69746579652e636f6d2f75706c6f61642f6174746163686d656e742f303038322f353230322f63303331646238302d636564302d333634662d626430312d6636613162356436333462652e6a7067)
说明:
1. 如果CanalEventStore选择的是内存模式,可不保留解析位置,下一次canal启动时直接依赖CanalMetaManager记录的最后一次消费成功的位点即可. (最后一次ack提交的数据位点)
2. 如果CanalEventStore选择的是持久化模式,可通过zookeeper记录位点信息,canal instance发生failover切换到另一台机器,可通过读取zookeeper获取位点信息.
通过实现自己的CanalLogPositionManager,比如记录位点信息到本地文件/nas文件,简单可用的无HA的模式.
**CanalHAController类图设计**
![](https://camo.githubusercontent.com/de18dd87ba5f6a5c26b59bb9c18fefda78255440/687474703a2f2f646c2e69746579652e636f6d2f75706c6f61642f6174746163686d656e742f303038322f353232342f35633065336639372d306265392d333761342d626563662d6136373336623333316435622e6a7067)
说明:
1. 常见的就是基于心跳语句,定时请求当前链接的数据库,超过一定次数检测失败时,尝试切换到备机.
2. 比如阿里内部会有一套数据库主备信息管理系统,DBA做了数据库主备切换或者机器下线,推送配置到各个应用节点,HAController收到后,控制EventParser进行链接切换.
**EventSink类图设计和扩展**
![](https://camo.githubusercontent.com/593a6e8c7c0e9fd0ae1fc1edf1da32d1ffb02315/687474703a2f2f646c2e69746579652e636f6d2f75706c6f61642f6174746163686d656e742f303038322f353234302f35643637363930382d343261372d333966322d616333372d6537653366666538653931372e6a7067)
**EventStore类图设计和扩展**
![](https://camo.githubusercontent.com/74a5c58b7f6eecad6e965fa466d2d71e1a72b1ba/687474703a2f2f646c2e69746579652e636f6d2f75706c6f61642f6174746163686d656e742f303038322f353234342f30376161343038392d393962312d333731642d613438632d6433633562666330616365622e6a7067)
说明:
1. 抽象了CanalStoreScavenge , 解决数据的清理,比如定时清理,满了之后清理,每次ack清理等
2. CanalEventStore接口,主要包含put/get/ack/rollback的相关接口. put/get操作会组成一个生产者/消费者模式,每个store都会有存储大小设计,存储满了,put操作会阻塞等待get获取数据,所以不会无线占用存储,比如内存大小
a. 目前EventStore主要实现了memory模式,支持按照内存大小和内存记录数进行存储大小限制.
b. 后续可开发基于本地文件的存储模式
c. 基于文件存储和内存存储,开发mixed模式,做成两级队列,内存buffer有空位时,将文件的数据读入到内存buffer中。
*重要:实现基于mixed模式后,canal才可以说是完成真正的消费/订阅的模型 (取1份binlog数据,提供多个客户端消费,消费有快有慢,各自保留消费位点)*
**MetaManager类图设计和扩展**
![](https://camo.githubusercontent.com/320b9df34570fef32419b9034c768ff661f624d5/687474703a2f2f646c2e69746579652e636f6d2f75706c6f61642f6174746163686d656e742f303038322f353235352f35333134623836352d666437392d333834652d623737302d6339613232653433366335322e6a7067)
说明:metaManager目前同样支持了多种模式,最顶层的就是memory和zookeeper的模式,还有就是mixed模式,先写内存,再写zookeeper.通过实现自己的CanalMetaManager,比如记录位点信息到本地文件/nas文件,简单可用的无HA的模式.
**应用扩展 (Advanced Topic)**
上面介绍了相关模块的设计,这里介绍下如何将自己的扩展代码应用到canal中. https://github.com/alibaba/canal/wiki/DevGuide#%E5%BA%94%E7%94%A8%E6%89%A9%E5%B1%95
# 15 Canal源码分析
见博客链接: http://kaimingwan.com/tag/canal
# 16 Producer增量异常问题定位思路
按照上述的分析,增量异常通常有以下三种情况:
- mysql没有记录binlog:检查mysql的配置,更新数据后观察binlog的position是否增加
show variables like 'log_bin%';
show master status;
show binlog events in 'mysql-bin.000047';
- canal异常,没有去读取:观察canal.log和shops.log是否有异常日志;截取javacore观察canal的主要线程是否正常;
- producer的client没有发送getWithoutAck请求:观察producer的日志是否发送get请求
... ...