Canal 同步数据库
1. 什么是 Canal?它的核心作用是什么?
考察点: 数据库同步工具理解、中间件基础知识
Canal 是阿里开源的基于 MySQL 数据库 binlog 的增量订阅&消费组件。其核心作用是:
- 数据库镜像:实时同步数据库变更
- 数据库实时备份:基于 binlog 的增量备份
- 索引构建和实时维护:如 Elasticsearch 索引更新
- 业务缓存刷新:如 Redis 缓存同步
- 增量数据 处理:带业务逻辑的数据变更处理
Canal 的工作原理:
- Canal 模拟 MySQL slave 的交互协议
- 向 MySQL Master 发送 dump 协议
- MySQL Master 推送 binary log 给 Canal
- Canal 解析 binary log 并发送到目标存储
2. Canal 的架构设计和工作流程是怎样的?
考察点: 系统架构设计、分布式系统理解
关键组件说明:
- Canal Server:负责连接 MySQL,解析 binlog
- Canal Client:消费 Canal Server 的数据,执行业务逻辑
- Binary Log:MySQL 的二进制日志,记录所有数据变更
3. 在 Go 中如何实现 Canal 客户端?请设计一个基本的架构
考察点: Go 语言实践、设计模式、错误处理
package main
import (
"context"
"log"
"time"
"github.com/withlin/canal-go/client"
"github.com/withlin/canal-go/protocol"
)
type CanalClient struct {
client *client.SimpleCanalConnector
handlers map[string]DataHandler
}
type DataHandler interface {
Handle(entry *protocol.Entry) error
}
type UserDataHandler struct{}
func (h *UserDataHandler) Handle(entry *protocol.Entry) error {
// 处理用户表数据变更
log.Printf("处理用户数据: %v", entry)
return nil
}
func NewCanalClient(addr string, port int, username, password, destination string) *CanalClient {
connector := client.NewSimpleCanalConnector(addr, port, username, password, destination, 60000, 60*60*1000)
return &CanalClient{
client: connector,
handlers: make(map[string]DataHandler),
}
}
func (c *CanalClient) RegisterHandler(tableName string, handler DataHandler) {
c.handlers[tableName] = handler
}
func (c *CanalClient) Start(ctx context.Context) error {
err := c.client.Connect()
if err != nil {
return err
}
defer c.client.DisConnect()
// 订阅所有表
err = c.client.Subscribe(".*\\..*")
if err != nil {
return err
}
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
message, err := c.client.Get(100, nil, nil)
if err != nil {
log.Printf("获取数据失败: %v", err)
time.Sleep(time.Second)
continue
}
if message.Id == -1 || len(message.Entries) == 0 {
time.Sleep(time.Second)
continue
}
c.processEntries(message.Entries)
c.client.Ack(message.Id)
}
}
}
func (c *CanalClient) processEntries(entries []*protocol.Entry) {
for _, entry := range entries {
if entry.GetEntryType() == protocol.EntryType_TRANSACTIONBEGIN ||
entry.GetEntryType() == protocol.EntryType_TRANSACTIONEND {
continue
}
tableName := entry.GetHeader().GetTableName()
if handler, exists := c.handlers[tableName]; exists {
if err := handler.Handle(entry); err != nil {
log.Printf("处理数据失败: %v", err)
}
}
}
}